Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]
dajac commented on code in PR #19523: URL: https://github.com/apache/kafka/pull/19523#discussion_r2088351150

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java:
## @@ -324,4 +329,98 @@ static void throwIfRegularExpressionIsInvalid(
                 regex, ex.getDescription()));
         }
     }
+
+    /**
+     * The magic byte used to identify the version of the topic hash function.
+     */
+    static final byte TOPIC_HASH_MAGIC_BYTE = 0x00;
+
+    /**
+     * Computes the hash of the topics in a group.
+     *
+     * The computed hash value is stored as the metadata hash in the *GroupMetadataValue.
+     *
+     * If there is no topic, the hash value is set to 0.
+     * The hashing process involves the following steps:
+     * 1. Sort the topic hashes by topic name.
+     * 2. Write each topic hash in order.
+     *
+     * @param topicHashes The map of topic hashes. Key is topic name and value is the topic hash.
+     * @return The hash of the group.
+     */
+    static long computeGroupHash(Map<String, Long> topicHashes) {
+        if (topicHashes.isEmpty()) {
+            return 0;
+        }
+
+        // Sort entries by topic name
+        List<Map.Entry<String, Long>> sortedEntries = new ArrayList<>(topicHashes.entrySet());
+        sortedEntries.sort(Map.Entry.comparingByKey());
+
+        HashStream64 hasher = Hashing.xxh3_64().hashStream();
+        for (Map.Entry<String, Long> entry : sortedEntries) {
+            hasher.putLong(entry.getValue());
+        }
+
+        return hasher.getAsLong();
+    }
+
+    /**
+     * Computes the hash of the topic id, name, number of partitions, and partition racks by streaming XXH3.
+     *
+     * The computed hash value for the topic is utilized in conjunction with the {@link #computeGroupHash(Map)}
+     * method and is stored as part of the metadata hash in the *GroupMetadataValue.
+     * It is important to note that if the hash algorithm is changed, the magic byte must be updated to reflect the
+     * new hash version.
+     *
+     * For non-existent topics, the hash value is set to 0.
+     * For existent topics, the hashing process involves the following steps:
+     * 1. Write a magic byte to denote the version of the hash function.
+     * 2. Write the hash code of the topic ID.
+     * 3. Write the topic name.
+     * 4. Write the number of partitions associated with the topic.
+     * 5. For each partition, write the partition ID and a sorted list of rack identifiers.
+     *    - Rack identifiers are formatted as "<length1><value1><length2><value2>" to prevent issues with simple separators.
+     *
+     * @param topicName     The topic name.
+     * @param metadataImage The metadata image.
+     * @return The hash of the topic.
+     */
+    static long computeTopicHash(String topicName, MetadataImage metadataImage) {
+        TopicImage topicImage = metadataImage.topics().getTopic(topicName);
+        if (topicImage == null) {
+            return 0;
+        }
+
+        HashStream64 hasher = Hashing.xxh3_64().hashStream();
+        hasher = hasher
+            .putByte(TOPIC_HASH_MAGIC_BYTE) // magic byte

Review Comment:
   I wonder whether we should remove the inline comments, as the code is pretty self-explanatory and you already explain the format in the javadoc.
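The sort-then-write scheme quoted above makes the group hash deterministic regardless of the map's iteration order. A minimal standalone sketch of that property, using a simple FNV-1a-style combiner as a stand-in for hash4j's streaming XXH3 (which the PR actually uses — `GroupHashSketch` and `combine` are illustrative names, not PR code):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupHashSketch {
    // Stand-in 64-bit combiner (FNV-1a over the value's bytes); the PR uses hash4j's XXH3 instead.
    static long combine(long seed, long value) {
        long h = seed;
        for (int i = 0; i < 8; i++) {
            h ^= (value >>> (i * 8)) & 0xff;
            h *= 0x100000001b3L;
        }
        return h;
    }

    static long computeGroupHash(Map<String, Long> topicHashes) {
        if (topicHashes.isEmpty()) {
            return 0;
        }
        // Sorting by topic name is what makes the result order-independent.
        List<Map.Entry<String, Long>> sorted = new ArrayList<>(topicHashes.entrySet());
        sorted.sort(Map.Entry.comparingByKey());
        long h = 0xcbf29ce484222325L; // FNV offset basis as the stream seed
        for (Map.Entry<String, Long> e : sorted) {
            h = combine(h, e.getValue());
        }
        return h;
    }

    public static void main(String[] args) {
        Map<String, Long> a = new LinkedHashMap<>();
        a.put("foo", 1L);
        a.put("bar", 2L);
        Map<String, Long> b = new LinkedHashMap<>();
        b.put("bar", 2L);
        b.put("foo", 1L);
        // Same entries inserted in different orders yield the same group hash.
        if (computeGroupHash(a) != computeGroupHash(b)) throw new AssertionError();
        // The empty group hashes to 0, matching the javadoc.
        if (computeGroupHash(Map.of()) != 0L) throw new AssertionError();
        System.out.println("ok");
    }
}
```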
Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]
dajac commented on PR #19523: URL: https://github.com/apache/kafka/pull/19523#issuecomment-2879230830 @FrankYang0529 Thanks for the update. Could you please also update the description of the PR? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]
FrankYang0529 commented on code in PR #19523: URL: https://github.com/apache/kafka/pull/19523#discussion_r2087028173

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java:
## @@ -324,4 +329,84 @@ static void throwIfRegularExpressionIsInvalid(
                 regex, ex.getDescription()));
         }
     }
+
+    /**
+     * The magic byte used to identify the version of the topic hash function.
+     */
+    static final byte TOPIC_HASH_MAGIC_BYTE = 0x00;
+
+    /**
+     * Computes the hash of the topics in a group.
+     *
+     * The computed hash value is stored as part of the metadata hash in the *GroupMetadataValue.
+     *
+     * The hashing process involves the following steps:
+     * 1. Sort the topic hashes by topic name.
+     * 2. Write each topic hash in order.
+     *
+     * @param topicHashes The map of topic hashes. Key is topic name and value is the topic hash.
+     * @return The hash of the group.
+     */
+    static long computeGroupHash(Map<String, Long> topicHashes) {
+        // Sort entries by topic name
+        List<Map.Entry<String, Long>> sortedEntries = new ArrayList<>(topicHashes.entrySet());
+        sortedEntries.sort(Map.Entry.comparingByKey());
+
+        HashStream64 hasher = Hashing.xxh3_64().hashStream();
+        for (Map.Entry<String, Long> entry : sortedEntries) {
+            hasher.putLong(entry.getValue());
+        }
+
+        return hasher.getAsLong();
+    }
+
+    /**
+     * Computes the hash of the topic id, name, number of partitions, and partition racks by XXHash64.
+     *
+     * The computed hash value for the topic is utilized in conjunction with the {@link #computeGroupHash(Map)}
+     * method and is stored as part of the metadata hash in the *GroupMetadataValue.
+     * It is important to note that if the hash algorithm is changed, the magic byte must be updated to reflect the
+     * new hash version.
+     *
+     * The hashing process involves the following steps:
+     * 1. Write a magic byte to denote the version of the hash function.
+     * 2. Write the hash code of the topic ID.
+     * 3. Write the topic name.
+     * 4. Write the number of partitions associated with the topic.
+     * 5. For each partition, write the partition ID and a sorted list of rack identifiers.
+     *    - Rack identifiers are formatted as "<length1><value1><length2><value2>" to prevent issues with simple separators.
+     *
+     * @param topicImage   The topic image.
+     * @param clusterImage The cluster image.
+     * @return The hash of the topic.
+     */
+    static long computeTopicHash(TopicImage topicImage, ClusterImage clusterImage) throws IOException {
+        HashStream64 hasher = Hashing.xxh3_64().hashStream();
+        hasher = hasher.putByte(TOPIC_HASH_MAGIC_BYTE) // magic byte

Review Comment:
   Updated it.
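The length-prefix format described in step 5 exists because rack identifiers may contain any character, so a bare separator is ambiguous: one rack `",,,"` would serialize identically to two racks `","` and `","`. A small sketch illustrating the collision and the fix (`joinWithComma` and `lengthPrefixed` are hypothetical helper names for illustration, not the PR's code):

```java
import java.util.List;

public class RackEncodingSketch {
    // Ambiguous: join racks with a bare separator that racks may themselves contain.
    static String joinWithComma(List<String> racks) {
        return String.join(",", racks);
    }

    // Unambiguous: prefix each rack with its length, i.e. "<length1><value1><length2><value2>".
    static String lengthPrefixed(List<String> racks) {
        StringBuilder sb = new StringBuilder();
        for (String rack : racks) {
            sb.append(rack.length()).append(rack);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        List<String> oneRack = List.of(",,,");       // a single rack of three commas
        List<String> twoRacks = List.of(",", ",");   // two racks of one comma each

        // Comma-joined forms collide: both produce ",,,".
        if (!joinWithComma(oneRack).equals(joinWithComma(twoRacks))) throw new AssertionError();

        // Length-prefixed forms stay distinct: "3,,," vs "1,1,".
        if (lengthPrefixed(oneRack).equals(lengthPrefixed(twoRacks))) throw new AssertionError();
        System.out.println("ok");
    }
}
```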
Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]
dajac commented on code in PR #19523: URL: https://github.com/apache/kafka/pull/19523#discussion_r2087258500

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java:
## @@ -324,4 +329,84 @@ static void throwIfRegularExpressionIsInvalid(
                 regex, ex.getDescription()));
         }
     }
+
+    /**
+     * The magic byte used to identify the version of the topic hash function.
+     */
+    static final byte TOPIC_HASH_MAGIC_BYTE = 0x00;
+
+    /**
+     * Computes the hash of the topics in a group.
+     *
+     * The computed hash value is stored as part of the metadata hash in the *GroupMetadataValue.
+     *
+     * The hashing process involves the following steps:
+     * 1. Sort the topic hashes by topic name.
+     * 2. Write each topic hash in order.
+     *
+     * @param topicHashes The map of topic hashes. Key is topic name and value is the topic hash.
+     * @return The hash of the group.
+     */
+    static long computeGroupHash(Map<String, Long> topicHashes) {

Review Comment:
   Ok. It works for me.
Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]
dajac commented on code in PR #19523: URL: https://github.com/apache/kafka/pull/19523#discussion_r2087259372 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/UtilsTest.java: ## @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.image.MetadataImage;
+
+import com.dynatrace.hash4j.hashing.Hashing;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.IOException;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+
+public class UtilsTest {
+    private static final Uuid FOO_TOPIC_ID = Uuid.randomUuid();
+    private static final String FOO_TOPIC_NAME = "foo";
+    private static final String BAR_TOPIC_NAME = "bar";
+    private static final int FOO_NUM_PARTITIONS = 2;
+    private static final MetadataImage FOO_METADATA_IMAGE = new MetadataImageBuilder()
+        .addTopic(FOO_TOPIC_ID, FOO_TOPIC_NAME, FOO_NUM_PARTITIONS)
+        .addRacks()
+        .build();
+
+    @Test
+    void testComputeTopicHash() throws IOException {
+        long result = Utils.computeTopicHash(FOO_METADATA_IMAGE.topics().getTopic(FOO_TOPIC_ID), FOO_METADATA_IMAGE.cluster());

Review Comment:
   I wonder if we could return 0 when the topic does not exist. Would it work?
Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]
dajac commented on code in PR #19523: URL: https://github.com/apache/kafka/pull/19523#discussion_r2087257495

## build.gradle:
## @@ -1414,10 +1414,12 @@ project(':group-coordinator') {
     implementation project(':coordinator-common')
     implementation libs.jacksonDatabind
     implementation libs.jacksonJDK8Datatypes
+    implementation libs.lz4
     implementation libs.metrics
     implementation libs.hdrHistogram
     implementation libs.re2j
     implementation libs.slf4jApi
+    implementation libs.hash4j

Review Comment:
   This is fine. Thanks.
Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]
FrankYang0529 commented on code in PR #19523: URL: https://github.com/apache/kafka/pull/19523#discussion_r2087024185

## build.gradle:
## @@ -1414,10 +1414,12 @@ project(':group-coordinator') {
     implementation project(':coordinator-common')
     implementation libs.jacksonDatabind
     implementation libs.jacksonJDK8Datatypes
+    implementation libs.lz4
     implementation libs.metrics
     implementation libs.hdrHistogram
     implementation libs.re2j
     implementation libs.slf4jApi
+    implementation libs.hash4j

Review Comment:
   Yes, I added it to LICENSE-binary and `python committer-tools/verify_license.py` can pass. Do I need to modify other files? Thanks.
Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]
FrankYang0529 commented on code in PR #19523: URL: https://github.com/apache/kafka/pull/19523#discussion_r2087031341

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java:
## @@ -324,4 +329,84 @@ static void throwIfRegularExpressionIsInvalid(
                 regex, ex.getDescription()));
         }
     }
+
+    /**
+     * The magic byte used to identify the version of the topic hash function.
+     */
+    static final byte TOPIC_HASH_MAGIC_BYTE = 0x00;
+
+    /**
+     * Computes the hash of the topics in a group.
+     *
+     * The computed hash value is stored as part of the metadata hash in the *GroupMetadataValue.
+     *
+     * The hashing process involves the following steps:
+     * 1. Sort the topic hashes by topic name.
+     * 2. Write each topic hash in order.
+     *
+     * @param topicHashes The map of topic hashes. Key is topic name and value is the topic hash.
+     * @return The hash of the group.
+     */
+    static long computeGroupHash(Map<String, Long> topicHashes) {

Review Comment:
   IIUC, this function will only be used in `GroupMetadataManager` like `throwIfRegularExpressionIsInvalid`. I think we can add `public` when we need it. WDYT?

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java:
## @@ -324,4 +329,84 @@ (same hunk as above)
+    /**
+     * Computes the hash of the topic id, name, number of partitions, and partition racks by XXHash64.
+     *
+     * The computed hash value for the topic is utilized in conjunction with the {@link #computeGroupHash(Map)}
+     * method and is stored as part of the metadata hash in the *GroupMetadataValue.
+     * It is important to note that if the hash algorithm is changed, the magic byte must be updated to reflect the
+     * new hash version.
+     *
+     * The hashing process involves the following steps:
+     * 1. Write a magic byte to denote the version of the hash function.
+     * 2. Write the hash code of the topic ID.
+     * 3. Write the topic name.
+     * 4. Write the number of partitions associated with the topic.
+     * 5. For each partition, write the partition ID and a sorted list of rack identifiers.
+     *    - Rack identifiers are formatted as "<length1><value1><length2><value2>" to prevent issues with simple separators.
+     *
+     * @param topicImage   The topic image.
+     * @param clusterImage The cluster image.
+     * @return The hash of the topic.
+     */
+    static long computeTopicHash(TopicImage topicImage, ClusterImage clusterImage) throws IOException {

Review Comment:
   Same as above.
Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]
FrankYang0529 commented on code in PR #19523: URL: https://github.com/apache/kafka/pull/19523#discussion_r2087037433 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/UtilsTest.java: ## @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.image.MetadataImage;
+
+import com.dynatrace.hash4j.hashing.Hashing;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.IOException;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+
+public class UtilsTest {
+    private static final Uuid FOO_TOPIC_ID = Uuid.randomUuid();
+    private static final String FOO_TOPIC_NAME = "foo";
+    private static final String BAR_TOPIC_NAME = "bar";
+    private static final int FOO_NUM_PARTITIONS = 2;
+    private static final MetadataImage FOO_METADATA_IMAGE = new MetadataImageBuilder()
+        .addTopic(FOO_TOPIC_ID, FOO_TOPIC_NAME, FOO_NUM_PARTITIONS)
+        .addRacks()
+        .build();
+
+    @Test
+    void testComputeTopicHash() throws IOException {
+        long result = Utils.computeTopicHash(FOO_METADATA_IMAGE.topics().getTopic(FOO_TOPIC_ID), FOO_METADATA_IMAGE.cluster());

Review Comment:
   If input is topic name and metadata image, we need to handle null `TopicImage` in `computeTopicHash`. Do we want to handle this error in it?

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/UtilsTest.java:
## @@ -0,0 +1,207 @@ (same license header, imports, and test scaffolding as above)
+        long expected = Hashing.xxh3_64().hashStream()
+            .putByte((byte) 0) // magic byte
+            .putLong(FOO_TOPIC_ID.hashCode()) // topic ID
+            .putString(FOO_TOPIC_NAME) // topic name
+            .putInt(FOO_NUM_PARTITIONS) // number of partitions
Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]
FrankYang0529 commented on code in PR #19523: URL: https://github.com/apache/kafka/pull/19523#discussion_r2087027573

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java:
## @@ -324,4 +329,84 @@ static void throwIfRegularExpressionIsInvalid(
                 regex, ex.getDescription()));
         }
     }
+
+    /**
+     * The magic byte used to identify the version of the topic hash function.
+     */
+    static final byte TOPIC_HASH_MAGIC_BYTE = 0x00;
+
+    (computeGroupHash javadoc and body as quoted above)
+
+    static long computeTopicHash(TopicImage topicImage, ClusterImage clusterImage) throws IOException {
+        HashStream64 hasher = Hashing.xxh3_64().hashStream();
+        hasher = hasher.putByte(TOPIC_HASH_MAGIC_BYTE) // magic byte
+            .putLong(topicImage.id().hashCode()) // topic ID
+            .putString(topicImage.name()) // topic name
+            .putInt(topicImage.partitions().size()); // number of partitions
+
+        for (int i = 0; i < topicImage.partitions().size(); i++) {
+            hasher = hasher.putInt(i); // partition id
+            // The rack string combination cannot use a simple separator like ",", because there is no limitation on rack characters.
+            // A simple separator like "," may hit edge cases such as ",," + ",,," colliding with ",,," + ",,".
+            // Add the length before each rack string to avoid this.
+            List<String> racks = new ArrayList<>();

Review Comment:
   Yes, I initialize `racks` outside the for-loop and call `clear` before adding data.

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java:
## @@ -324,4 +329,84 @@ (same hunk as above)
+    /**
+     * Computes the hash of the topic id, name, number of partitions, and partition racks by XXHash64.
+     *
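The list-reuse refactor described in this reply (declare the list once, `clear()` it per iteration) is a standard allocation-avoidance pattern. A hedged sketch under assumed names (`racksOf` is a hypothetical stand-in for looking up a partition's racks; this is not the PR's actual code):

```java
import java.util.ArrayList;
import java.util.List;

public class RackListReuse {
    // Hypothetical stand-in for fetching the rack identifiers of one partition.
    static List<String> racksOf(int partitionId) {
        return List.of("rack-" + (partitionId % 2), "rack-shared");
    }

    // Reuse one list across all partitions instead of allocating a new one per iteration.
    static List<String> lastRacks(int numPartitions) {
        List<String> racks = new ArrayList<>();  // declared once, before the loop
        for (int i = 0; i < numPartitions; i++) {
            racks.clear();                       // reset instead of reallocating
            racks.addAll(racksOf(i));
            racks.sort(String::compareTo);       // racks are written in sorted order
        }
        return racks;                            // racks of the final partition, for inspection
    }

    public static void main(String[] args) {
        System.out.println(lastRacks(3));
    }
}
```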
Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]
FrankYang0529 commented on code in PR #19523: URL: https://github.com/apache/kafka/pull/19523#discussion_r2087026067

## build.gradle:
## @@ -1414,10 +1414,12 @@ project(':group-coordinator') {
     implementation project(':coordinator-common')
     implementation libs.jacksonDatabind
     implementation libs.jacksonJDK8Datatypes
+    implementation libs.lz4

Review Comment:
   Removed it. Thanks.
Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]
dajac commented on code in PR #19523: URL: https://github.com/apache/kafka/pull/19523#discussion_r2086685337 ## build.gradle: ## @@ -1414,10 +1414,12 @@ project(':group-coordinator') { implementation project(':coordinator-common') implementation libs.jacksonDatabind implementation libs.jacksonJDK8Datatypes +implementation libs.lz4 Review Comment: Do we need this change? ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java: ## @@ -324,4 +329,84 @@ static void throwIfRegularExpressionIsInvalid( regex, ex.getDescription())); } } + +/** + * The magic byte used to identify the version of topic hash function. + */ +static final byte TOPIC_HASH_MAGIC_BYTE = 0x00; + +/** + * Computes the hash of the topics in a group. + * + * The computed hash value is stored as part of the metadata hash in the *GroupMetadataValue. + * + * The hashing process involves the following steps: + * 1. Sort the topic hashes by topic name. + * 2. Write each topic hash in order. + * + * @param topicHashes The map of topic hashes. Key is topic name and value is the topic hash. + * @return The hash of the group. + */ +static long computeGroupHash(Map topicHashes) { +// Sort entries by topic name +List> sortedEntries = new ArrayList<>(topicHashes.entrySet()); +sortedEntries.sort(Map.Entry.comparingByKey()); + +HashStream64 hasher = Hashing.xxh3_64().hashStream(); +for (Map.Entry entry : sortedEntries) { +hasher.putLong(entry.getValue()); +} + +return hasher.getAsLong(); +} + +/** + * Computes the hash of the topic id, name, number of partitions, and partition racks by XXHash64. + * + * The computed hash value for the topic is utilized in conjunction with the {@link #computeGroupHash(Map)} + * method and is stored as part of the metadata hash in the *GroupMetadataValue. + * It is important to note that if the hash algorithm is changed, the magic byte must be updated to reflect the + * new hash version. + * + * The hashing process involves the following steps: + * 1. 
Write a magic byte to denote the version of the hash function. + * 2. Write the hash code of the topic ID. + * 3. Write the topic name. + * 4. Write the number of partitions associated with the topic. + * 5. For each partition, write the partition ID and a sorted list of rack identifiers. + *- Rack identifiers are formatted as "" to prevent issues with simple separators. + * + * @param topicImage The topic image. + * @param clusterImage The cluster image. + * @return The hash of the topic. + */ +static long computeTopicHash(TopicImage topicImage, ClusterImage clusterImage) throws IOException { +HashStream64 hasher = Hashing.xxh3_64().hashStream(); +hasher = hasher.putByte(TOPIC_HASH_MAGIC_BYTE) // magic byte +.putLong(topicImage.id().hashCode()) // topic ID +.putString(topicImage.name()) // topic name +.putInt(topicImage.partitions().size()); // number of partitions + +for (int i = 0; i < topicImage.partitions().size(); i++) { +hasher = hasher.putInt(i); // partition id +// The rack string combination cannot use simple separator like ",", because there is no limitation for rack character. +// If using simple separator like "," it may hit edge case like ",," and ",,," / ",,," and ",,". +// Add length before the rack string to avoid the edge case. +List racks = new ArrayList<>(); Review Comment: nit: I wonder whether we could reuse this list. We could declare it before the loop and clear it before using it. What do you think? ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java: ## @@ -324,4 +329,84 @@ static void throwIfRegularExpressionIsInvalid( regex, ex.getDescription())); } } + +/** + * The magic byte used to identify the version of topic hash function. + */ +static final byte TOPIC_HASH_MAGIC_BYTE = 0x00; + +/** + * Computes the hash of the topics in a group. + * + * The computed hash value is stored as part of the metadata hash in the *GroupMetadataValue. + * + * The hashing process involves the following steps: + * 1. 
Sort the topic hashes by topic name. + * 2. Write each topic hash in order. + * + * @param topicHashes The map of topic hashes. Key is topic name and value is the topic hash. + * @return The hash of the group. + */ +static long computeGroupHash(Map<String, Long> topicHashes) { +// Sort entries by topic name +List<Map.Entry<String, Long>> sortedEntries = new ArrayList<>(topicHashes.entrySet()); +sortedEntries.sort(Map.Entry.comparingByKey());
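The sort-then-stream scheme quoted above has one essential property: the group hash depends only on the (topic name → topic hash) contents, never on map iteration order. A minimal stdlib sketch of the same structure, with `java.util.zip.CRC32` standing in for hash4j's streaming XXH3 (which is not in the JDK):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.zip.CRC32;

public class GroupHashSketch {
    // Combine per-topic hashes deterministically: sort entries by topic name,
    // then feed each 64-bit hash to a streaming hasher in little-endian byte order.
    static long computeGroupHash(Map<String, Long> topicHashes) {
        List<Map.Entry<String, Long>> sorted = new ArrayList<>(topicHashes.entrySet());
        sorted.sort(Map.Entry.comparingByKey());
        CRC32 hasher = new CRC32(); // stand-in for Hashing.xxh3_64().hashStream()
        for (Map.Entry<String, Long> e : sorted) {
            long v = e.getValue();
            for (int shift = 0; shift < 64; shift += 8) {
                hasher.update((int) (v >>> shift) & 0xFF);
            }
        }
        return hasher.getValue();
    }
}
```

Because entries are sorted first, a `HashMap` and a `LinkedHashMap` holding the same topics yield the same value, which is what makes the result safe to persist in the *GroupMetadataValue.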
Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]
FrankYang0529 commented on code in PR #19523: URL: https://github.com/apache/kafka/pull/19523#discussion_r2084021758 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/UtilsTest.java: ## @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.utils.ByteBufferOutputStream; +import org.apache.kafka.image.MetadataImage; + +import net.jpountz.xxhash.XXHash64; +import net.jpountz.xxhash.XXHashFactory; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; + +public class UtilsTest { +private static final Uuid FOO_TOPIC_ID = Uuid.randomUuid(); +private static final String FOO_TOPIC_NAME = "foo"; +private static final String BAR_TOPIC_NAME = "bar"; +private static final int FOO_NUM_PARTITIONS = 2; +private static final MetadataImage FOO_METADATA_IMAGE = new MetadataImageBuilder() +.addTopic(FOO_TOPIC_ID, FOO_TOPIC_NAME, FOO_NUM_PARTITIONS) +.addRacks() +.build(); +private static final XXHash64 LZ4_HASH_INSTANCE = XXHashFactory.fastestInstance().hash64(); Review Comment: I will update the PR to use hash4j with streaming XXH3. Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]
dajac commented on code in PR #19523: URL: https://github.com/apache/kafka/pull/19523#discussion_r2083981644 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/UtilsTest.java: ## @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.utils.ByteBufferOutputStream; +import org.apache.kafka.image.MetadataImage; + +import net.jpountz.xxhash.XXHash64; +import net.jpountz.xxhash.XXHashFactory; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; + +public class UtilsTest { +private static final Uuid FOO_TOPIC_ID = Uuid.randomUuid(); +private static final String FOO_TOPIC_NAME = "foo"; +private static final String BAR_TOPIC_NAME = "bar"; +private static final int FOO_NUM_PARTITIONS = 2; +private static final MetadataImage FOO_METADATA_IMAGE = new MetadataImageBuilder() +.addTopic(FOO_TOPIC_ID, FOO_TOPIC_NAME, FOO_NUM_PARTITIONS) +.addRacks() +.build(); +private static final XXHash64 LZ4_HASH_INSTANCE = XXHashFactory.fastestInstance().hash64(); Review Comment: I confirm that it is server-side only.
Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]
ijuma commented on code in PR #19523: URL: https://github.com/apache/kafka/pull/19523#discussion_r2083606336 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/UtilsTest.java: ## @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.utils.ByteBufferOutputStream; +import org.apache.kafka.image.MetadataImage; + +import net.jpountz.xxhash.XXHash64; +import net.jpountz.xxhash.XXHashFactory; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; + +public class UtilsTest { +private static final Uuid FOO_TOPIC_ID = Uuid.randomUuid(); +private static final String FOO_TOPIC_NAME = "foo"; +private static final String BAR_TOPIC_NAME = "bar"; +private static final int FOO_NUM_PARTITIONS = 2; +private static final MetadataImage FOO_METADATA_IMAGE = new MetadataImageBuilder() +.addTopic(FOO_TOPIC_ID, FOO_TOPIC_NAME, FOO_NUM_PARTITIONS) +.addRacks() +.build(); +private static final XXHash64 LZ4_HASH_INSTANCE = XXHashFactory.fastestInstance().hash64(); Review Comment: If this is a server-side dependency, it seems fine. If it's also client-side, we probably need to think more about it.
Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]
ijuma commented on code in PR #19523: URL: https://github.com/apache/kafka/pull/19523#discussion_r2083606229 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/UtilsTest.java: ## @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.utils.ByteBufferOutputStream; +import org.apache.kafka.image.MetadataImage; + +import net.jpountz.xxhash.XXHash64; +import net.jpountz.xxhash.XXHashFactory; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; + +public class UtilsTest { +private static final Uuid FOO_TOPIC_ID = Uuid.randomUuid(); +private static final String FOO_TOPIC_NAME = "foo"; +private static final String BAR_TOPIC_NAME = "bar"; +private static final int FOO_NUM_PARTITIONS = 2; +private static final MetadataImage FOO_METADATA_IMAGE = new MetadataImageBuilder() +.addTopic(FOO_TOPIC_ID, FOO_TOPIC_NAME, FOO_NUM_PARTITIONS) +.addRacks() +.build(); +private static final XXHash64 LZ4_HASH_INSTANCE = XXHashFactory.fastestInstance().hash64(); Review Comment: It looks like a high quality library from their description of why they didn't use existing ones: https://www.dynatrace.com/news/blog/hash4j-new-library-java/
Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]
dajac commented on code in PR #19523: URL: https://github.com/apache/kafka/pull/19523#discussion_r2081899976 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/UtilsTest.java: ## @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.utils.ByteBufferOutputStream; +import org.apache.kafka.image.MetadataImage; + +import net.jpountz.xxhash.XXHash64; +import net.jpountz.xxhash.XXHashFactory; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; + +public class UtilsTest { +private static final Uuid FOO_TOPIC_ID = Uuid.randomUuid(); +private static final String FOO_TOPIC_NAME = "foo"; +private static final String BAR_TOPIC_NAME = "bar"; +private static final int FOO_NUM_PARTITIONS = 2; +private static final MetadataImage FOO_METADATA_IMAGE = new MetadataImageBuilder() +.addTopic(FOO_TOPIC_ID, FOO_TOPIC_NAME, FOO_NUM_PARTITIONS) +.addRacks() +.build(); +private static final XXHash64 LZ4_HASH_INSTANCE = XXHashFactory.fastestInstance().hash64(); Review Comment: Thanks. I am fine with taking `hash4j` as a dependency. It is a small one without too much risk but let's see what @ijuma and @chia7712 think about it too.
Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]
dajac commented on code in PR #19523: URL: https://github.com/apache/kafka/pull/19523#discussion_r2081683481 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/UtilsTest.java: ## @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.utils.ByteBufferOutputStream; +import org.apache.kafka.image.MetadataImage; + +import net.jpountz.xxhash.XXHash64; +import net.jpountz.xxhash.XXHashFactory; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; + +public class UtilsTest { +private static final Uuid FOO_TOPIC_ID = Uuid.randomUuid(); +private static final String FOO_TOPIC_NAME = "foo"; +private static final String BAR_TOPIC_NAME = "bar"; +private static final int FOO_NUM_PARTITIONS = 2; +private static final MetadataImage FOO_METADATA_IMAGE = new MetadataImageBuilder() +.addTopic(FOO_TOPIC_ID, FOO_TOPIC_NAME, FOO_NUM_PARTITIONS) +.addRacks() +.build(); +private static final XXHash64 LZ4_HASH_INSTANCE = XXHashFactory.fastestInstance().hash64(); Review Comment: Using streaming XXH3 seems pretty interesting to me given the results. Is `com.dynatrace.hash4j` the only way to get it? The library seems reasonable to be taken as a dependency on the server.
Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]
FrankYang0529 commented on code in PR #19523: URL: https://github.com/apache/kafka/pull/19523#discussion_r2081721119 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/UtilsTest.java: ## @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.utils.ByteBufferOutputStream; +import org.apache.kafka.image.MetadataImage; + +import net.jpountz.xxhash.XXHash64; +import net.jpountz.xxhash.XXHashFactory; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; + +public class UtilsTest { +private static final Uuid FOO_TOPIC_ID = Uuid.randomUuid(); +private static final String FOO_TOPIC_NAME = "foo"; +private static final String BAR_TOPIC_NAME = "bar"; +private static final int FOO_NUM_PARTITIONS = 2; +private static final MetadataImage FOO_METADATA_IMAGE = new MetadataImageBuilder() +.addTopic(FOO_TOPIC_ID, FOO_TOPIC_NAME, FOO_NUM_PARTITIONS) +.addRacks() +.build(); +private static final XXHash64 LZ4_HASH_INSTANCE = XXHashFactory.fastestInstance().hash64(); Review Comment: From https://xxhash.com/, there are three Java libraries. Only zero-allocation-hashing and hash4j provide XXH3. However, only hash4j has a streaming hash class.
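For context on why a streaming hash class matters here: it lets the coordinator feed fields into the hasher as it walks the metadata image instead of first serializing everything into an intermediate buffer, while producing the same value as hashing the full buffer. A sketch of that pattern using the JDK's `CRC32` purely as a stand-in (hash4j's `HashStream64` is the actual proposal and is not part of the JDK):

```java
import java.util.zip.CRC32;

public class StreamingSketch {
    // One-shot: requires the whole payload materialized up front.
    static long oneShot(byte[] data) {
        CRC32 c = new CRC32();
        c.update(data, 0, data.length);
        return c.getValue();
    }

    // Streaming: fields are fed incrementally, no intermediate buffer needed.
    static long streamed(byte[]... fields) {
        CRC32 c = new CRC32();
        for (byte[] f : fields) {
            c.update(f, 0, f.length);
        }
        return c.getValue();
    }
}
```

Both paths return the same value for the same byte sequence, which is the property the PR relies on when it replaces `DataOutputStream` serialization with `putByte`/`putLong`/`putString` calls on the hash stream.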
Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]
dajac commented on code in PR #19523: URL: https://github.com/apache/kafka/pull/19523#discussion_r2081650493 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java: ## @@ -324,4 +336,106 @@ static void throwIfRegularExpressionIsInvalid( regex, ex.getDescription())); } } + +/** + * The magic byte used to identify the version of topic hash function. + */ +static final byte TOPIC_HASH_MAGIC_BYTE = 0x00; +static final XXHash64 LZ4_HASH_INSTANCE = XXHashFactory.fastestInstance().hash64(); + +/** + * Computes the hash of the topics in a group. + * + * The computed hash value is stored as part of the metadata hash in the *GroupMetadataValue. + * + * The hashing process involves the following steps: + * 1. Sort the topic hashes by topic name. + * 2. Convert each long hash value into a byte array. + * 3. Combine the sorted byte arrays to produce a final hash for the group. + * + * @param topicHashes The map of topic hashes. Key is topic name and value is the topic hash. + * @return The hash of the group. + */ +static long computeGroupHash(Map<String, Long> topicHashes) { +// Convert long to byte array. This is taken from guava LongHashCode#asBytes. +// https://github.com/google/guava/blob/bdf2a9d05342fca852645278d474082905e09d94/guava/src/com/google/common/hash/HashCode.java#L187-L199 +LongFunction<byte[]> longToBytes = (long value) -> new byte[] { +(byte) value, +(byte) (value >> 8), +(byte) (value >> 16), +(byte) (value >> 24), +(byte) (value >> 32), +(byte) (value >> 40), +(byte) (value >> 48), +(byte) (value >> 56) +}; + +// Combine the sorted topic hashes. +byte[] resultBytes = new byte[8]; +topicHashes.entrySet() +.stream() +.sorted(Map.Entry.comparingByKey()) // sort by topic name +.map(Map.Entry::getValue) +.map(longToBytes::apply) +.forEach(nextBytes -> { +// Combine ordered hashes. This is taken from guava Hashing#combineOrdered.
+// https://github.com/google/guava/blob/bdf2a9d05342fca852645278d474082905e09d94/guava/src/com/google/common/hash/Hashing.java#L689-L712 +for (int i = 0; i < nextBytes.length; i++) { +resultBytes[i] = (byte) (resultBytes[i] * 37 ^ nextBytes[i]); +} +}); + +return LZ4_HASH_INSTANCE.hash(resultBytes, 0, resultBytes.length, 0); +} + +/** + * Computes the hash of the topic id, name, number of partitions, and partition racks by XXHash64. + * + * The computed hash value for the topic is utilized in conjunction with the {@link #computeGroupHash(Map)} + * method and is stored as part of the metadata hash in the *GroupMetadataValue. + * It is important to note that if the hash algorithm is changed, the magic byte must be updated to reflect the + * new hash version. + * + * The hashing process involves the following steps: + * 1. Write a magic byte to denote the version of the hash function. + * 2. Write the hash code of the topic ID. + * 3. Write the UTF-8 encoded topic name. + * 4. Write the number of partitions associated with the topic. + * 5. For each partition, write the partition ID and a sorted list of rack identifiers. + *- Rack identifiers are formatted as "length1:value1,length2:value2" to prevent issues with simple separators. + * + * @param topicImage The topic image. + * @param clusterImage The cluster image. + * @return The hash of the topic. 
+ */ +static long computeTopicHash(TopicImage topicImage, ClusterImage clusterImage) throws IOException { +try (ByteBufferOutputStream bbos = new ByteBufferOutputStream(512); + DataOutputStream dos = new DataOutputStream(bbos)) { +dos.writeByte(TOPIC_HASH_MAGIC_BYTE); // magic byte +dos.writeLong(topicImage.id().hashCode()); // topic ID +dos.writeUTF(topicImage.name()); // topic name +dos.writeInt(topicImage.partitions().size()); // number of partitions +for (int i = 0; i < topicImage.partitions().size(); i++) { +dos.writeInt(i); // partition id +List<String> sortedRacksList = Arrays.stream(topicImage.partitions().get(i).replicas) +.mapToObj(clusterImage::broker) +.filter(Objects::nonNull) +.map(BrokerRegistration::rack) +.filter(Optional::isPresent) +.map(Optional::get) +.sorted() +.toList(); + +// The rack string combination cannot use simple separator like ",", because there is no limitation for rack character. +
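The separator problem called out in the quoted comment can be made concrete: joining rack ids with a bare "," is ambiguous because rack ids may themselves contain commas, whereas a length prefix makes the encoding injective. A small illustrative sketch (the `encode` helper is hypothetical, not the PR's code):

```java
import java.util.List;

public class RackEncodingSketch {
    // Length-prefix each rack id so that distinct lists can never
    // produce the same encoded string, regardless of rack contents.
    static String encode(List<String> racks) {
        StringBuilder sb = new StringBuilder();
        for (String rack : racks) {
            sb.append(rack.length()).append(':').append(rack);
        }
        return sb.toString();
    }
}
```

With a naive join, `["a,b"]` and `["a", "b"]` both become `a,b`; with the length prefix they encode as `3:a,b` and `1:a1:b`.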
Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]
dajac commented on code in PR #19523: URL: https://github.com/apache/kafka/pull/19523#discussion_r2081623145 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java: ## @@ -324,4 +336,106 @@ static void throwIfRegularExpressionIsInvalid( regex, ex.getDescription())); } } + +/** + * The magic byte used to identify the version of topic hash function. + */ +static final byte TOPIC_HASH_MAGIC_BYTE = 0x00; +static final XXHash64 LZ4_HASH_INSTANCE = XXHashFactory.fastestInstance().hash64(); + +/** + * Computes the hash of the topics in a group. + * + * The computed hash value is stored as part of the metadata hash in the *GroupMetadataValue. + * + * The hashing process involves the following steps: + * 1. Sort the topic hashes by topic name. + * 2. Convert each long hash value into a byte array. + * 3. Combine the sorted byte arrays to produce a final hash for the group. + * + * @param topicHashes The map of topic hashes. Key is topic name and value is the topic hash. + * @return The hash of the group. + */ +static long computeGroupHash(Map<String, Long> topicHashes) { +// Convert long to byte array. This is taken from guava LongHashCode#asBytes. +// https://github.com/google/guava/blob/bdf2a9d05342fca852645278d474082905e09d94/guava/src/com/google/common/hash/HashCode.java#L187-L199 +LongFunction<byte[]> longToBytes = (long value) -> new byte[] { +(byte) value, +(byte) (value >> 8), +(byte) (value >> 16), +(byte) (value >> 24), +(byte) (value >> 32), +(byte) (value >> 40), +(byte) (value >> 48), +(byte) (value >> 56) +}; + +// Combine the sorted topic hashes. +byte[] resultBytes = new byte[8]; +topicHashes.entrySet() +.stream() +.sorted(Map.Entry.comparingByKey()) // sort by topic name +.map(Map.Entry::getValue) +.map(longToBytes::apply) +.forEach(nextBytes -> { +// Combine ordered hashes. This is taken from guava Hashing#combineOrdered.
+// https://github.com/google/guava/blob/bdf2a9d05342fca852645278d474082905e09d94/guava/src/com/google/common/hash/Hashing.java#L689-L712 +for (int i = 0; i < nextBytes.length; i++) { +resultBytes[i] = (byte) (resultBytes[i] * 37 ^ nextBytes[i]); +} +}); + +return LZ4_HASH_INSTANCE.hash(resultBytes, 0, resultBytes.length, 0); +} + +/** + * Computes the hash of the topic id, name, number of partitions, and partition racks by XXHash64. + * + * The computed hash value for the topic is utilized in conjunction with the {@link #computeGroupHash(Map)} + * method and is stored as part of the metadata hash in the *GroupMetadataValue. + * It is important to note that if the hash algorithm is changed, the magic byte must be updated to reflect the + * new hash version. + * + * The hashing process involves the following steps: + * 1. Write a magic byte to denote the version of the hash function. + * 2. Write the hash code of the topic ID. + * 3. Write the UTF-8 encoded topic name. + * 4. Write the number of partitions associated with the topic. + * 5. For each partition, write the partition ID and a sorted list of rack identifiers. + *- Rack identifiers are formatted as "length1:value1,length2:value2" to prevent issues with simple separators. + * + * @param topicImage The topic image. + * @param clusterImage The cluster image. + * @return The hash of the topic. 
+ */ +static long computeTopicHash(TopicImage topicImage, ClusterImage clusterImage) throws IOException { +try (ByteBufferOutputStream bbos = new ByteBufferOutputStream(512); + DataOutputStream dos = new DataOutputStream(bbos)) { +dos.writeByte(TOPIC_HASH_MAGIC_BYTE); // magic byte +dos.writeLong(topicImage.id().hashCode()); // topic ID +dos.writeUTF(topicImage.name()); // topic name +dos.writeInt(topicImage.partitions().size()); // number of partitions +for (int i = 0; i < topicImage.partitions().size(); i++) { +dos.writeInt(i); // partition id +List<String> sortedRacksList = Arrays.stream(topicImage.partitions().get(i).replicas) +.mapToObj(clusterImage::broker) +.filter(Objects::nonNull) +.map(BrokerRegistration::rack) +.filter(Optional::isPresent) +.map(Optional::get) +.sorted() +.toList(); + +// The rack string combination cannot use simple separator like ",", because there is no limitation for rack character. +
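The Guava-derived pieces in the quoted hunk — the little-endian long-to-bytes conversion and the order-sensitive `result[i] * 37 ^ next[i]` fold — reduce to two small pure functions. A self-contained sketch:

```java
public class CombineOrderedSketch {
    // Little-endian long -> byte[8], as in Guava's HashCode#asBytes.
    static byte[] longToBytes(long value) {
        byte[] out = new byte[8];
        for (int i = 0; i < 8; i++) {
            out[i] = (byte) (value >>> (8 * i));
        }
        return out;
    }

    // Order-sensitive combination, as in Guava's Hashing#combineOrdered:
    // swapping two inputs changes the result, unlike a plain XOR.
    static byte[] combineOrdered(long[] hashes) {
        byte[] result = new byte[8];
        for (long h : hashes) {
            byte[] next = longToBytes(h);
            for (int i = 0; i < 8; i++) {
                result[i] = (byte) (result[i] * 37 ^ next[i]);
            }
        }
        return result;
    }
}
```

A plain XOR of the two hashes would be order-insensitive; the multiply-by-37 step before each XOR is what makes `combineOrdered({h1, h2})` differ from `combineOrdered({h2, h1})`.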
Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]
FrankYang0529 commented on code in PR #19523: URL: https://github.com/apache/kafka/pull/19523#discussion_r2073493044 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/UtilsTest.java: ## @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.utils.ByteBufferOutputStream; +import org.apache.kafka.image.MetadataImage; + +import net.jpountz.xxhash.XXHash64; +import net.jpountz.xxhash.XXHashFactory; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; + +public class UtilsTest { +private static final Uuid FOO_TOPIC_ID = Uuid.randomUuid(); +private static final String FOO_TOPIC_NAME = "foo"; +private static final String BAR_TOPIC_NAME = "bar"; +private static final int FOO_NUM_PARTITIONS = 2; +private static final MetadataImage FOO_METADATA_IMAGE = new MetadataImageBuilder() +.addTopic(FOO_TOPIC_ID, FOO_TOPIC_NAME, FOO_NUM_PARTITIONS) +.addRacks() +.build(); +private static final XXHash64 LZ4_HASH_INSTANCE = XXHashFactory.fastestInstance().hash64(); Review Comment: Thanks for the suggestion. I benchmarked streaming XXH3 / streaming XXH64 / non-streaming XXH3 / non-streaming XXH64. Streaming XXH3 gets the best result. However, it requires adding a new library, `com.dynatrace.hash4j`. Do we want to import it?
cc @chia7712 @dajac
```
Benchmark                                      (numReplicasPerBroker) (partitionsPerTopic) (replicationFactor) Mode Cnt     Score      Error Units
TopicHashBenchmark.testDynatraceStreamingXXH3                      10                   10                   3 avgt   5   879.241 ±    6.788 ns/op
TopicHashBenchmark.testDynatraceStreamingXXH3                      10                   50                   3 avgt   5  4192.380 ±  195.424 ns/op
TopicHashBenchmark.testDynatraceStreamingXXH3                      10                  100                   3 avgt   5  8027.227 ±  210.403 ns/op
TopicHashBenchmark.testDynatraceXXH3                               10                   10                   3 avgt   5  1676.398 ±    2.249 ns/op
TopicHashBenchmark.testDynatraceXXH3                               10                   50                   3 avgt   5  9256.175 ±   45.298 ns/op
TopicHashBenchmark.testDynatraceXXH3                               10                  100                   3 avgt   5 20195.772 ±   37.651 ns/op
TopicHashBenchmark.testLz4StreamingXXHash64                        10                   10                   3 avgt   5  9739.833 ±  188.303 ns/op
TopicHashBenchmark.testLz4StreamingXXHash64                        10                   50                   3 avgt   5 45540.195 ±  455.747 ns/op
TopicHashBenchmark.testLz4StreamingXXHash64                        10                  100                   3 avgt   5 89084.689 ± 2164.862 ns/op
TopicHashBenchmark.testLz4XXHash64                                 10                   10                   3 avgt   5  1755.391 ±    6.436 ns/op
TopicHashBenchmark.testLz4XXHash64                                 10                   50                   3 avgt   5  9421.643 ±   79.838 ns/op
TopicHashBenchmark.testLz4XXHash64                                 10                  100                   3 avgt   5 19461.960 ±  425.881 ns/op

JMH benchmarks done
```
TopicHashBenchmark.java ```java package org.apache.kafka.jmh.metadata; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.metadata.RegisterBrokerRecord; i
Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]
ijuma commented on code in PR #19523: URL: https://github.com/apache/kafka/pull/19523#discussion_r2070951806 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java: ## @@ -324,4 +336,106 @@ static void throwIfRegularExpressionIsInvalid( regex, ex.getDescription())); } } + +/** + * The magic byte used to identify the version of topic hash function. + */ +static final byte TOPIC_HASH_MAGIC_BYTE = 0x00; +static final XXHash64 LZ4_HASH_INSTANCE = XXHashFactory.fastestInstance().hash64(); + +/** + * Computes the hash of the topics in a group. + * + * The computed hash value is stored as part of the metadata hash in the *GroupMetadataValue. + * + * The hashing process involves the following steps: + * 1. Sort the topic hashes by topic name. + * 2. Convert each long hash value into a byte array. + * 3. Combine the sorted byte arrays to produce a final hash for the group. + * + * @param topicHashes The map of topic hashes. Key is topic name and value is the topic hash. + * @return The hash of the group. + */ +static long computeGroupHash(Map<String, Long> topicHashes) { +// Convert long to byte array. This is taken from guava LongHashCode#asBytes. +// https://github.com/google/guava/blob/bdf2a9d05342fca852645278d474082905e09d94/guava/src/com/google/common/hash/HashCode.java#L187-L199 +LongFunction<byte[]> longToBytes = (long value) -> new byte[] { +(byte) value, +(byte) (value >> 8), +(byte) (value >> 16), +(byte) (value >> 24), +(byte) (value >> 32), +(byte) (value >> 40), +(byte) (value >> 48), +(byte) (value >> 56) +}; + +// Combine the sorted topic hashes. +byte[] resultBytes = new byte[8]; +topicHashes.entrySet() +.stream() +.sorted(Map.Entry.comparingByKey()) // sort by topic name +.map(Map.Entry::getValue) +.map(longToBytes::apply) +.forEach(nextBytes -> { +// Combine ordered hashes. This is taken from guava Hashing#combineOrdered. 
+// https://github.com/google/guava/blob/bdf2a9d05342fca852645278d474082905e09d94/guava/src/com/google/common/hash/Hashing.java#L689-L712 +for (int i = 0; i < nextBytes.length; i++) { +resultBytes[i] = (byte) (resultBytes[i] * 37 ^ nextBytes[i]); +} +}); + +return LZ4_HASH_INSTANCE.hash(resultBytes, 0, resultBytes.length, 0); +} + +/** + * Computes the hash of the topic id, name, number of partitions, and partition racks by XXHash64. + * + * The computed hash value for the topic is utilized in conjunction with the {@link #computeGroupHash(Map)} + * method and is stored as part of the metadata hash in the *GroupMetadataValue. + * It is important to note that if the hash algorithm is changed, the magic byte must be updated to reflect the + * new hash version. + * + * The hashing process involves the following steps: + * 1. Write a magic byte to denote the version of the hash function. + * 2. Write the hash code of the topic ID. + * 3. Write the UTF-8 encoded topic name. + * 4. Write the number of partitions associated with the topic. + * 5. For each partition, write the partition ID and a sorted list of rack identifiers. + *- Rack identifiers are formatted as "length1:value1,length2:value2" to prevent issues with simple separators. + * + * @param topicImage The topic image. + * @param clusterImage The cluster image. + * @return The hash of the topic. 
+ */ +static long computeTopicHash(TopicImage topicImage, ClusterImage clusterImage) throws IOException { +try (ByteBufferOutputStream bbos = new ByteBufferOutputStream(512); + DataOutputStream dos = new DataOutputStream(bbos)) { +dos.writeByte(TOPIC_HASH_MAGIC_BYTE); // magic byte +dos.writeLong(topicImage.id().hashCode()); // topic ID +dos.writeUTF(topicImage.name()); // topic name +dos.writeInt(topicImage.partitions().size()); // number of partitions +for (int i = 0; i < topicImage.partitions().size(); i++) { +dos.writeInt(i); // partition id +List<String> sortedRacksList = Arrays.stream(topicImage.partitions().get(i).replicas) +.mapToObj(clusterImage::broker) +.filter(Objects::nonNull) +.map(BrokerRegistration::rack) +.filter(Optional::isPresent) +.map(Optional::get) +.sorted() +.toList(); + +// The rack string combination cannot use simple separator like ",", because there is no limitation for rack character. +
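The write sequence described above (magic byte, topic id, topic name, partition count, then per-partition id plus sorted racks) can be exercised outside of Kafka. The sketch below is hypothetical: plain maps stand in for `TopicImage`/`ClusterImage`, and `java.util.zip.CRC32` stands in for XXHash64 so nothing beyond the JDK is needed; only the byte layout mirrors the PR discussion.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.zip.CRC32;

public class TopicHashSketch {
    static final byte TOPIC_HASH_MAGIC_BYTE = 0x00;

    // Hypothetical stand-in for Utils#computeTopicHash. The map key is the
    // partition id; the value is that partition's sorted rack list.
    public static long computeTopicHash(String topicId,
                                        String topicName,
                                        Map<Integer, List<String>> sortedRacksByPartition) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream(512);
        try (DataOutputStream dos = new DataOutputStream(baos)) {
            dos.writeByte(TOPIC_HASH_MAGIC_BYTE);            // 1. hash version
            dos.writeLong(topicId.hashCode());               // 2. topic id
            dos.writeUTF(topicName);                         // 3. topic name
            dos.writeInt(sortedRacksByPartition.size());     // 4. number of partitions
            // TreeMap iteration gives ascending partition ids.
            for (Map.Entry<Integer, List<String>> e : new TreeMap<>(sortedRacksByPartition).entrySet()) {
                dos.writeInt(e.getKey());                    // 5. partition id
                for (String rack : e.getValue()) {
                    byte[] utf8 = rack.getBytes(StandardCharsets.UTF_8);
                    dos.writeInt(utf8.length);               // length prefix: no separator
                    dos.write(utf8);                         // characters can be anything
                }
            }
        }
        CRC32 crc = new CRC32();                             // stand-in for XXHash64
        crc.update(baos.toByteArray());
        return crc.getValue();
    }

    public static void main(String[] args) throws IOException {
        long fooHash = computeTopicHash("topic-id-1", "foo", Map.of(0, List.of("rack0", "rack1")));
        long barHash = computeTopicHash("topic-id-1", "bar", Map.of(0, List.of("rack0", "rack1")));
        if (fooHash == barHash) throw new AssertionError("different topic names should hash differently");
        System.out.println("ok");
    }
}
```

Swapping CRC32 back for a 64-bit hash does not change the layout; only step 6 (the hash over the accumulated bytes) differs.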
Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]
ijuma commented on code in PR #19523: URL: https://github.com/apache/kafka/pull/19523#discussion_r2070950389 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/UtilsTest.java: ## @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.utils.ByteBufferOutputStream; +import org.apache.kafka.image.MetadataImage; + +import net.jpountz.xxhash.XXHash64; +import net.jpountz.xxhash.XXHashFactory; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; + +public class UtilsTest { +private static final Uuid FOO_TOPIC_ID = Uuid.randomUuid(); +private static final String FOO_TOPIC_NAME = "foo"; +private static final String BAR_TOPIC_NAME = "bar"; +private static final int FOO_NUM_PARTITIONS = 2; +private static final MetadataImage FOO_METADATA_IMAGE = new MetadataImageBuilder() +.addTopic(FOO_TOPIC_ID, FOO_TOPIC_NAME, FOO_NUM_PARTITIONS) +.addRacks() +.build(); +private static final XXHash64 LZ4_HASH_INSTANCE = XXHashFactory.fastestInstance().hash64(); Review Comment: `XXH3` seems to be the fastest implementation. Did we consider using that? https://github.com/Cyan4973/xxHash -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]
ijuma commented on code in PR #19523: URL: https://github.com/apache/kafka/pull/19523#discussion_r2070948145 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java: ## @@ -324,4 +336,106 @@ static void throwIfRegularExpressionIsInvalid( regex, ex.getDescription())); } } + +/** + * The magic byte used to identify the version of topic hash function. + */ +static final byte TOPIC_HASH_MAGIC_BYTE = 0x00; +static final XXHash64 LZ4_HASH_INSTANCE = XXHashFactory.fastestInstance().hash64(); + +/** + * Computes the hash of the topics in a group. + * + * The computed hash value is stored as part of the metadata hash in the *GroupMetadataValue. + * + * The hashing process involves the following steps: + * 1. Sort the topic hashes by topic name. + * 2. Convert each long hash value into a byte array. + * 3. Combine the sorted byte arrays to produce a final hash for the group. + * + * @param topicHashes The map of topic hashes. Key is topic name and value is the topic hash. + * @return The hash of the group. + */ +static long computeGroupHash(Map topicHashes) { +// Convert long to byte array. This is taken from guava LongHashCode#asBytes. +// https://github.com/google/guava/blob/bdf2a9d05342fca852645278d474082905e09d94/guava/src/com/google/common/hash/HashCode.java#L187-L199 +LongFunction longToBytes = (long value) -> new byte[] { +(byte) value, +(byte) (value >> 8), +(byte) (value >> 16), +(byte) (value >> 24), +(byte) (value >> 32), +(byte) (value >> 40), +(byte) (value >> 48), +(byte) (value >> 56) +}; + +// Combine the sorted topic hashes. +byte[] resultBytes = new byte[8]; +topicHashes.entrySet() +.stream() +.sorted(Map.Entry.comparingByKey()) // sort by topic name +.map(Map.Entry::getValue) +.map(longToBytes::apply) +.forEach(nextBytes -> { Review Comment: I think we're adding a lot of unnecessary overhead for the hash computation (multiple map calls, etc.). We should probably just use an old school loop. 
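An "old school loop" rendition of the combine step might look like the sketch below. It is a hypothetical standalone version: sorting, the long-to-bytes conversion, and the `* 37 ^` mixing are done in plain loops, and the final XXHash64 pass over `resultBytes` is deliberately replaced with the Guava-style little-endian fold so the snippet compiles with the JDK alone.

```java
import java.util.Map;
import java.util.TreeMap;

public class GroupHashSketch {
    // Plain-loop combine of per-topic hashes, sorted by topic name,
    // mirroring Guava's Hashing#combineOrdered byte mixing.
    public static long combineTopicHashes(Map<String, Long> topicHashes) {
        byte[] result = new byte[8];
        // TreeMap iterates keys (topic names) in ascending order.
        for (long hash : new TreeMap<>(topicHashes).values()) {
            for (int i = 0; i < 8; i++) {
                byte next = (byte) (hash >> (i * 8));        // little-endian byte i of the topic hash
                result[i] = (byte) (result[i] * 37 ^ next);  // combineOrdered mixing step
            }
        }
        // The PR feeds result to XXHash64; here we fold it back into a long
        // (Guava BytesHashCode#asLong style) to stay dependency-free.
        long out = result[0] & 0xFF;
        for (int i = 1; i < 8; i++) {
            out |= (result[i] & 0xFFL) << (i * 8);
        }
        return out;
    }

    public static void main(String[] args) {
        long h1 = combineTopicHashes(Map.of("foo", 1L, "bar", 2L));
        long h2 = combineTopicHashes(Map.of("bar", 2L, "foo", 1L));
        if (h1 != h2) throw new AssertionError("result must not depend on map insertion order");
        System.out.println("combined hash: " + h1);
    }
}
```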
Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]
squah-confluent commented on code in PR #19523: URL: https://github.com/apache/kafka/pull/19523#discussion_r2070408828 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java: ## @@ -209,4 +219,50 @@ void validateOffsetFetch( default boolean shouldExpire() { return true; } + +/** + * Computes the hash of the topics in a group. + * + * @param topicHashes The map of topic hashes. Key is topic name and value is the topic hash. + * @return The hash of the group. + */ +static long computeGroupHash(Map topicHashes) { +return Hashing.combineOrdered( +topicHashes.entrySet() +.stream() +.sorted(Map.Entry.comparingByKey()) +.map(e -> HashCode.fromLong(e.getValue())) +.toList() +).asLong(); +} + +/** + * Computes the hash of the topic id, name, number of partitions, and partition racks by Murmur3. + * + * @param topicImage The topic image. + * @param clusterImage The cluster image. + * @return The hash of the topic. + */ +static long computeTopicHash(TopicImage topicImage, ClusterImage clusterImage) { +HashFunction hf = Hashing.murmur3_128(); +Hasher topicHasher = hf.newHasher() +.putByte((byte) 0) // magic byte +.putLong(topicImage.id().hashCode()) // topic Id +.putString(topicImage.name(), StandardCharsets.UTF_8) // topic name +.putInt(topicImage.partitions().size()); // number of partitions + + topicImage.partitions().entrySet().stream().sorted(Map.Entry.comparingByKey()).forEach(entry -> { +topicHasher.putInt(entry.getKey()); // partition id +String racks = Arrays.stream(entry.getValue().replicas) +.mapToObj(clusterImage::broker) +.filter(Objects::nonNull) +.map(BrokerRegistration::rack) +.filter(Optional::isPresent) +.map(Optional::get) +.sorted() +.collect(Collectors.joining(";")); Review Comment: Thanks for taking the suggestion. I think it's fine now. Small nit though, I was actually thinking of writing the length in binary, using `writeInt` and dropping the `:` and `,` separators entirely. Apologies if I wasn't clear enough earlier. 
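Writing the length in binary with `writeInt` and dropping the separators entirely, as suggested, could look like the following sketch (class and method names are hypothetical, not PR code):

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

public class RackEncodingSketch {
    // Write each rack as a 4-byte binary length prefix followed by its UTF-8
    // bytes, so no separator characters are needed at all.
    public static byte[] encodeRacks(List<String> sortedRacks) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (DataOutputStream dos = new DataOutputStream(baos)) {
            dos.writeInt(sortedRacks.size());      // number of racks
            for (String rack : sortedRacks) {
                byte[] utf8 = rack.getBytes(StandardCharsets.UTF_8);
                dos.writeInt(utf8.length);         // binary length prefix
                dos.write(utf8);                   // raw bytes, no separator
            }
        }
        return baos.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        // ["a,b"] (one rack) and ["a", "b"] (two racks) now encode differently,
        // which a comma-joined string cannot guarantee.
        byte[] one = encodeRacks(List.of("a,b"));
        byte[] two = encodeRacks(List.of("a", "b"));
        if (Arrays.equals(one, two)) throw new AssertionError("encodings must be distinct");
        System.out.println("distinct encodings: " + one.length + " vs " + two.length + " bytes");
    }
}
```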
Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]
chia7712 commented on code in PR #19523: URL: https://github.com/apache/kafka/pull/19523#discussion_r2069288073 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java: ## @@ -324,4 +337,87 @@ static void throwIfRegularExpressionIsInvalid( regex, ex.getDescription())); } } + +/** + * The magic byte used to identify the version of topic hash function. + */ +static final byte TOPIC_HASH_MAGIC_BYTE = 0x00; +static final XXHash64 LZ4_HASH_INSTANCE = XXHashFactory.fastestInstance().hash64(); + +/** + * Computes the hash of the topics in a group. + * + * @param topicHashes The map of topic hashes. Key is topic name and value is the topic hash. + * @return The hash of the group. + */ +static long computeGroupHash(Map topicHashes) { +// Convert long to byte array. This is taken from guava LongHashCode#asBytes. +// https://github.com/google/guava/blob/bdf2a9d05342fca852645278d474082905e09d94/guava/src/com/google/common/hash/HashCode.java#L187-L199 +LongFunction longToBytes = (long value) -> new byte[] { +(byte) value, +(byte) (value >> 8), +(byte) (value >> 16), +(byte) (value >> 24), +(byte) (value >> 32), +(byte) (value >> 40), +(byte) (value >> 48), +(byte) (value >> 56) +}; + +// Combine the sorted topic hashes. +byte[] resultBytes = new byte[8]; +topicHashes.entrySet() +.stream() +.sorted(Map.Entry.comparingByKey()) // sort by topic name +.map(Map.Entry::getValue) +.map(longToBytes::apply) +.forEach(nextBytes -> { +// Combine ordered hashes. This is taken from guava Hashing#combineOrdered. +// https://github.com/google/guava/blob/bdf2a9d05342fca852645278d474082905e09d94/guava/src/com/google/common/hash/Hashing.java#L689-L712 +for (int i = 0; i < nextBytes.length; i++) { +resultBytes[i] = (byte) (resultBytes[i] * 37 ^ nextBytes[i]); +} +}); + +return LZ4_HASH_INSTANCE.hash(resultBytes, 0, resultBytes.length, 0); +} + +/** + * Computes the hash of the topic id, name, number of partitions, and partition racks by XXHash64. 
+ * + * @param topicImage The topic image. + * @param clusterImage The cluster image. + * @return The hash of the topic. + */ +static long computeTopicHash(TopicImage topicImage, ClusterImage clusterImage) throws IOException { Review Comment: > If the hashing function is ever changed, is there a version field that should be updated? Yes, there is a magic byte that serves as the version.
Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]
squah-confluent commented on code in PR #19523: URL: https://github.com/apache/kafka/pull/19523#discussion_r2069188025 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java: ## @@ -209,4 +219,50 @@ void validateOffsetFetch( default boolean shouldExpire() { return true; } + +/** + * Computes the hash of the topics in a group. + * + * @param topicHashes The map of topic hashes. Key is topic name and value is the topic hash. + * @return The hash of the group. + */ +static long computeGroupHash(Map<String, Long> topicHashes) { +return Hashing.combineOrdered( +topicHashes.entrySet() +.stream() +.sorted(Map.Entry.comparingByKey()) +.map(e -> HashCode.fromLong(e.getValue())) +.toList() +).asLong(); +} + +/** + * Computes the hash of the topic id, name, number of partitions, and partition racks by Murmur3. + * + * @param topicImage The topic image. + * @param clusterImage The cluster image. + * @return The hash of the topic. + */ +static long computeTopicHash(TopicImage topicImage, ClusterImage clusterImage) { +HashFunction hf = Hashing.murmur3_128(); +Hasher topicHasher = hf.newHasher() +.putByte((byte) 0) // magic byte +.putLong(topicImage.id().hashCode()) // topic Id +.putString(topicImage.name(), StandardCharsets.UTF_8) // topic name +.putInt(topicImage.partitions().size()); // number of partitions + + topicImage.partitions().entrySet().stream().sorted(Map.Entry.comparingByKey()).forEach(entry -> { +topicHasher.putInt(entry.getKey()); // partition id +String racks = Arrays.stream(entry.getValue().replicas) +.mapToObj(clusterImage::broker) +.filter(Objects::nonNull) +.map(BrokerRegistration::rack) +.filter(Optional::isPresent) +.map(Optional::get) +.sorted() +.collect(Collectors.joining(";")); Review Comment: I think this is fine for preventing accidental collisions. It's still possible to _intentionally_ come up with rack names that create collisions, but I believe you'd only be impacting your own cluster. 
To rule out any ambiguity, we could pretend this was a serialization format and either prefix strings with their length, or null-terminate them. The same goes for variable-length lists of strings: these can either be length-prefixed or terminated with an invalid string that cannot occur (""? but not sure on this).
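The ambiguity under discussion can be shown concretely. In the sketch below (illustrative names, not PR code), `naiveJoin` collapses one rack literally named `"a;b"` and two racks `"a"`, `"b"` into the same string, while the `length:value` formatting that the PR settled on keeps them distinct:

```java
import java.util.List;
import java.util.stream.Collectors;

public class RackJoinAmbiguity {
    // Naive joining: collides whenever a rack name contains the separator.
    public static String naiveJoin(List<String> racks) {
        return String.join(";", racks);
    }

    // Length-prefixed formatting as discussed in the PR:
    // "length1:value1,length2:value2".
    public static String lengthPrefixed(List<String> racks) {
        return racks.stream()
            .map(r -> r.length() + ":" + r)
            .collect(Collectors.joining(","));
    }

    public static void main(String[] args) {
        System.out.println(naiveJoin(List.of("a;b")));          // a;b
        System.out.println(naiveJoin(List.of("a", "b")));       // a;b   (collision!)
        System.out.println(lengthPrefixed(List.of("a;b")));     // 3:a;b
        System.out.println(lengthPrefixed(List.of("a", "b")));  // 1:a,1:b
    }
}
```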
Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]
squah-confluent commented on code in PR #19523: URL: https://github.com/apache/kafka/pull/19523#discussion_r2069190560 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java: ## @@ -324,4 +337,87 @@ static void throwIfRegularExpressionIsInvalid( regex, ex.getDescription())); } } + +/** + * The magic byte used to identify the version of topic hash function. + */ +static final byte TOPIC_HASH_MAGIC_BYTE = 0x00; +static final XXHash64 LZ4_HASH_INSTANCE = XXHashFactory.fastestInstance().hash64(); + +/** + * Computes the hash of the topics in a group. + * + * @param topicHashes The map of topic hashes. Key is topic name and value is the topic hash. + * @return The hash of the group. + */ +static long computeGroupHash(Map topicHashes) { +// Convert long to byte array. This is taken from guava LongHashCode#asBytes. +// https://github.com/google/guava/blob/bdf2a9d05342fca852645278d474082905e09d94/guava/src/com/google/common/hash/HashCode.java#L187-L199 +LongFunction longToBytes = (long value) -> new byte[] { +(byte) value, +(byte) (value >> 8), +(byte) (value >> 16), +(byte) (value >> 24), +(byte) (value >> 32), +(byte) (value >> 40), +(byte) (value >> 48), +(byte) (value >> 56) +}; + +// Combine the sorted topic hashes. +byte[] resultBytes = new byte[8]; +topicHashes.entrySet() +.stream() +.sorted(Map.Entry.comparingByKey()) // sort by topic name +.map(Map.Entry::getValue) +.map(longToBytes::apply) +.forEach(nextBytes -> { +// Combine ordered hashes. This is taken from guava Hashing#combineOrdered. +// https://github.com/google/guava/blob/bdf2a9d05342fca852645278d474082905e09d94/guava/src/com/google/common/hash/Hashing.java#L689-L712 +for (int i = 0; i < nextBytes.length; i++) { +resultBytes[i] = (byte) (resultBytes[i] * 37 ^ nextBytes[i]); +} +}); + +return LZ4_HASH_INSTANCE.hash(resultBytes, 0, resultBytes.length, 0); +} + +/** + * Computes the hash of the topic id, name, number of partitions, and partition racks by XXHash64. 
+ * + * @param topicImage The topic image. + * @param clusterImage The cluster image. + * @return The hash of the topic. + */ +static long computeTopicHash(TopicImage topicImage, ClusterImage clusterImage) throws IOException { Review Comment: If the hashing function is ever changed, is there a version field that should be updated?
Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]
squah-confluent commented on code in PR #19523: URL: https://github.com/apache/kafka/pull/19523#discussion_r2069188025 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java: ## @@ -209,4 +219,50 @@ void validateOffsetFetch( default boolean shouldExpire() { return true; } + +/** + * Computes the hash of the topics in a group. + * + * @param topicHashes The map of topic hashes. Key is topic name and value is the topic hash. + * @return The hash of the group. + */ +static long computeGroupHash(Map<String, Long> topicHashes) { +return Hashing.combineOrdered( +topicHashes.entrySet() +.stream() +.sorted(Map.Entry.comparingByKey()) +.map(e -> HashCode.fromLong(e.getValue())) +.toList() +).asLong(); +} + +/** + * Computes the hash of the topic id, name, number of partitions, and partition racks by Murmur3. + * + * @param topicImage The topic image. + * @param clusterImage The cluster image. + * @return The hash of the topic. + */ +static long computeTopicHash(TopicImage topicImage, ClusterImage clusterImage) { +HashFunction hf = Hashing.murmur3_128(); +Hasher topicHasher = hf.newHasher() +.putByte((byte) 0) // magic byte +.putLong(topicImage.id().hashCode()) // topic Id +.putString(topicImage.name(), StandardCharsets.UTF_8) // topic name +.putInt(topicImage.partitions().size()); // number of partitions + + topicImage.partitions().entrySet().stream().sorted(Map.Entry.comparingByKey()).forEach(entry -> { +topicHasher.putInt(entry.getKey()); // partition id +String racks = Arrays.stream(entry.getValue().replicas) +.mapToObj(clusterImage::broker) +.filter(Objects::nonNull) +.map(BrokerRegistration::rack) +.filter(Optional::isPresent) +.map(Optional::get) +.sorted() +.collect(Collectors.joining(";")); Review Comment: I think this is fine for preventing accidental collisions. It's still possible to _intentionally_ come up with rack names that create collisions, but I believe you'd only be impacting your own cluster. 
To rule out any ambiguity, I'd strongly prefer if we pretended this was a serialization format and either prefixed strings with their length, or null-terminated them. The same goes for variable-length lists of strings: these can either be length-prefixed or terminated with an invalid string that cannot occur (""? but not sure on this).
Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]
chia7712 commented on code in PR #19523: URL: https://github.com/apache/kafka/pull/19523#discussion_r2068885949 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java: ## @@ -324,4 +337,87 @@ static void throwIfRegularExpressionIsInvalid( regex, ex.getDescription())); } } + +/** + * The magic byte used to identify the version of topic hash function. + */ +static final byte TOPIC_HASH_MAGIC_BYTE = 0x00; +static final XXHash64 LZ4_HASH_INSTANCE = XXHashFactory.fastestInstance().hash64(); + +/** + * Computes the hash of the topics in a group. + * + * @param topicHashes The map of topic hashes. Key is topic name and value is the topic hash. + * @return The hash of the group. + */ +static long computeGroupHash(Map topicHashes) { +// Convert long to byte array. This is taken from guava LongHashCode#asBytes. +// https://github.com/google/guava/blob/bdf2a9d05342fca852645278d474082905e09d94/guava/src/com/google/common/hash/HashCode.java#L187-L199 +LongFunction longToBytes = (long value) -> new byte[] { +(byte) value, +(byte) (value >> 8), +(byte) (value >> 16), +(byte) (value >> 24), +(byte) (value >> 32), +(byte) (value >> 40), +(byte) (value >> 48), +(byte) (value >> 56) +}; + +// Combine the sorted topic hashes. +byte[] resultBytes = new byte[8]; +topicHashes.entrySet() +.stream() +.sorted(Map.Entry.comparingByKey()) // sort by topic name +.map(Map.Entry::getValue) +.map(longToBytes::apply) +.forEach(nextBytes -> { +// Combine ordered hashes. This is taken from guava Hashing#combineOrdered. +// https://github.com/google/guava/blob/bdf2a9d05342fca852645278d474082905e09d94/guava/src/com/google/common/hash/Hashing.java#L689-L712 +for (int i = 0; i < nextBytes.length; i++) { +resultBytes[i] = (byte) (resultBytes[i] * 37 ^ nextBytes[i]); +} +}); + +return LZ4_HASH_INSTANCE.hash(resultBytes, 0, resultBytes.length, 0); +} + +/** + * Computes the hash of the topic id, name, number of partitions, and partition racks by XXHash64. 
+ * + * @param topicImage The topic image. + * @param clusterImage The cluster image. + * @return The hash of the topic. + */ +static long computeTopicHash(TopicImage topicImage, ClusterImage clusterImage) throws IOException { Review Comment: Please add documentation to remind developers that the hash is stored as part of the state. Changing the implementation of the hashing function may break compatibility with existing states.
Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]
FrankYang0529 commented on code in PR #19523: URL: https://github.com/apache/kafka/pull/19523#discussion_r2068862253 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java: ## @@ -209,4 +221,90 @@ void validateOffsetFetch( default boolean shouldExpire() { return true; } + +/** + * The magic byte used to identify the version of topic hash function. + */ +byte TOPIC_HASH_MAGIC_BYTE = 0x00; +XXHash64 LZ4_HASH_INSTANCE = XXHashFactory.fastestInstance().hash64(); + +/** + * Computes the hash of the topics in a group. + * + * @param topicHashes The map of topic hashes. Key is topic name and value is the topic hash. + * @return The hash of the group. + */ +static long computeGroupHash(Map topicHashes) { +// Convert long to byte array. This is taken from guava LongHashCode#asBytes. +// https://github.com/google/guava/blob/bdf2a9d05342fca852645278d474082905e09d94/guava/src/com/google/common/hash/HashCode.java#L187-L199 +LongFunction longToBytes = (long value) -> new byte[] { +(byte) value, +(byte) (value >> 8), +(byte) (value >> 16), +(byte) (value >> 24), +(byte) (value >> 32), +(byte) (value >> 40), +(byte) (value >> 48), +(byte) (value >> 56) +}; + +// Combine the sorted topic hashes. +byte[] resultBytes = new byte[8]; +topicHashes.entrySet() +.stream() +.sorted(Map.Entry.comparingByKey()) // sort by topic name +.map(Map.Entry::getValue) +.map(longToBytes::apply) +.forEach(nextBytes -> { +// Combine ordered hashes. This is taken from guava Hashing#combineOrdered. +// https://github.com/google/guava/blob/bdf2a9d05342fca852645278d474082905e09d94/guava/src/com/google/common/hash/Hashing.java#L689-L712 +for (int i = 0; i < nextBytes.length; i++) { +resultBytes[i] = (byte) (resultBytes[i] * 37 ^ nextBytes[i]); +} +}); + +// Convert the byte array to long. This is taken from guava BytesHashCode#asLong. 
+// https://github.com/google/guava/blob/bdf2a9d05342fca852645278d474082905e09d94/guava/src/com/google/common/hash/HashCode.java#L279-L295 +long retVal = (resultBytes[0] & 0xFF); +for (int i = 1; i < resultBytes.length; i++) { +retVal |= (resultBytes[i] & 0xFFL) << (i * 8); +} +return retVal; +} + +/** + * Computes the hash of the topic id, name, number of partitions, and partition racks by Murmur3. + * + * @param topicImage The topic image. + * @param clusterImage The cluster image. + * @return The hash of the topic. + */ +static long computeTopicHash(TopicImage topicImage, ClusterImage clusterImage) throws IOException { +try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); Review Comment: Sorry, I misunderstood ByteBufferOutputStream. I thought it used a fixed capacity even when the buffer was not big enough. After checking the source code, I see that it expands the buffer when it is not big enough. Updated it. Thanks.
Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]
FrankYang0529 commented on code in PR #19523: URL: https://github.com/apache/kafka/pull/19523#discussion_r2066485713 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java: ## @@ -209,4 +221,90 @@ void validateOffsetFetch( default boolean shouldExpire() { return true; } + +/** + * The magic byte used to identify the version of topic hash function. + */ +byte TOPIC_HASH_MAGIC_BYTE = 0x00; +XXHash64 LZ4_HASH_INSTANCE = XXHashFactory.fastestInstance().hash64(); + +/** + * Computes the hash of the topics in a group. + * + * @param topicHashes The map of topic hashes. Key is topic name and value is the topic hash. + * @return The hash of the group. + */ +static long computeGroupHash(Map topicHashes) { +// Convert long to byte array. This is taken from guava LongHashCode#asBytes. +// https://github.com/google/guava/blob/bdf2a9d05342fca852645278d474082905e09d94/guava/src/com/google/common/hash/HashCode.java#L187-L199 +LongFunction longToBytes = (long value) -> new byte[] { +(byte) value, +(byte) (value >> 8), +(byte) (value >> 16), +(byte) (value >> 24), +(byte) (value >> 32), +(byte) (value >> 40), +(byte) (value >> 48), +(byte) (value >> 56) +}; + +// Combine the sorted topic hashes. +byte[] resultBytes = new byte[8]; +topicHashes.entrySet() +.stream() +.sorted(Map.Entry.comparingByKey()) // sort by topic name +.map(Map.Entry::getValue) +.map(longToBytes::apply) +.forEach(nextBytes -> { +// Combine ordered hashes. This is taken from guava Hashing#combineOrdered. +// https://github.com/google/guava/blob/bdf2a9d05342fca852645278d474082905e09d94/guava/src/com/google/common/hash/Hashing.java#L689-L712 +for (int i = 0; i < nextBytes.length; i++) { +resultBytes[i] = (byte) (resultBytes[i] * 37 ^ nextBytes[i]); +} +}); + +// Convert the byte array to long. This is taken from guava BytesHashCode#asLong. Review Comment: Yes, we can use it. Thanks. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
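The long-to-bytes split, ordered combine, and bytes-to-long reassembly quoted above can be sketched end to end as one self-contained method. This is only a sketch of the guava-derived logic under discussion, not the final Kafka code, and `combineOrderedHashes` is a hypothetical name:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

public class GroupHashSketch {

    // Combine per-topic hashes in topic-name order, byte by byte, following
    // guava's Hashing#combineOrdered: result[i] = result[i] * 37 ^ next[i].
    static long combineOrderedHashes(Map<String, Long> topicHashes) {
        byte[] result = new byte[8];
        // TreeMap iterates keys (topic names) in sorted order.
        for (long value : new TreeMap<>(topicHashes).values()) {
            for (int i = 0; i < 8; i++) {
                byte next = (byte) (value >> (8 * i)); // little-endian split (LongHashCode#asBytes)
                result[i] = (byte) (result[i] * 37 ^ next);
            }
        }
        // Reassemble the bytes into a long (BytesHashCode#asLong).
        long retVal = result[0] & 0xFF;
        for (int i = 1; i < 8; i++) {
            retVal |= (result[i] & 0xFFL) << (i * 8);
        }
        return retVal;
    }

    public static void main(String[] args) {
        Map<String, Long> hashes = new LinkedHashMap<>();
        hashes.put("topic-b", 2L);
        hashes.put("topic-a", 1L);
        // Insertion order does not matter: entries are sorted by topic name first.
        System.out.println(combineOrderedHashes(hashes));
    }
}
```

Because the entries are sorted before combining, the result depends only on the map contents, not on iteration order of the input map.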
Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]
FrankYang0529 commented on code in PR #19523: URL: https://github.com/apache/kafka/pull/19523#discussion_r2066476357 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java: ## @@ -209,4 +221,90 @@ void validateOffsetFetch( default boolean shouldExpire() { return true; } + +/** + * The magic byte used to identify the version of topic hash function. + */ +byte TOPIC_HASH_MAGIC_BYTE = 0x00; +XXHash64 LZ4_HASH_INSTANCE = XXHashFactory.fastestInstance().hash64(); Review Comment: Move to `org.apache.kafka.coordinator.group.Utils`. Thanks. ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java: ## @@ -209,4 +221,90 @@ void validateOffsetFetch( default boolean shouldExpire() { return true; } + +/** + * The magic byte used to identify the version of topic hash function. + */ +byte TOPIC_HASH_MAGIC_BYTE = 0x00; +XXHash64 LZ4_HASH_INSTANCE = XXHashFactory.fastestInstance().hash64(); + +/** + * Computes the hash of the topics in a group. + * + * @param topicHashes The map of topic hashes. Key is topic name and value is the topic hash. + * @return The hash of the group. + */ +static long computeGroupHash(Map topicHashes) { +// Convert long to byte array. This is taken from guava LongHashCode#asBytes. +// https://github.com/google/guava/blob/bdf2a9d05342fca852645278d474082905e09d94/guava/src/com/google/common/hash/HashCode.java#L187-L199 +LongFunction longToBytes = (long value) -> new byte[] { +(byte) value, +(byte) (value >> 8), +(byte) (value >> 16), +(byte) (value >> 24), +(byte) (value >> 32), +(byte) (value >> 40), +(byte) (value >> 48), +(byte) (value >> 56) +}; + +// Combine the sorted topic hashes. +byte[] resultBytes = new byte[8]; +topicHashes.entrySet() +.stream() +.sorted(Map.Entry.comparingByKey()) // sort by topic name +.map(Map.Entry::getValue) +.map(longToBytes::apply) +.forEach(nextBytes -> { +// Combine ordered hashes. This is taken from guava Hashing#combineOrdered. 
+// https://github.com/google/guava/blob/bdf2a9d05342fca852645278d474082905e09d94/guava/src/com/google/common/hash/Hashing.java#L689-L712 +for (int i = 0; i < nextBytes.length; i++) { +resultBytes[i] = (byte) (resultBytes[i] * 37 ^ nextBytes[i]); +} +}); + +// Convert the byte array to long. This is taken from guava BytesHashCode#asLong. +// https://github.com/google/guava/blob/bdf2a9d05342fca852645278d474082905e09d94/guava/src/com/google/common/hash/HashCode.java#L279-L295 +long retVal = (resultBytes[0] & 0xFF); +for (int i = 1; i < resultBytes.length; i++) { +retVal |= (resultBytes[i] & 0xFFL) << (i * 8); +} +return retVal; +} + +/** + * Computes the hash of the topic id, name, number of partitions, and partition racks by Murmur3. Review Comment: Updated it. Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]
FrankYang0529 commented on code in PR #19523: URL: https://github.com/apache/kafka/pull/19523#discussion_r2066410256 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java: ## @@ -209,4 +221,90 @@ void validateOffsetFetch( default boolean shouldExpire() { return true; } + +/** + * The magic byte used to identify the version of topic hash function. + */ +byte TOPIC_HASH_MAGIC_BYTE = 0x00; +XXHash64 LZ4_HASH_INSTANCE = XXHashFactory.fastestInstance().hash64(); + +/** + * Computes the hash of the topics in a group. + * + * @param topicHashes The map of topic hashes. Key is topic name and value is the topic hash. + * @return The hash of the group. + */ +static long computeGroupHash(Map topicHashes) { +// Convert long to byte array. This is taken from guava LongHashCode#asBytes. +// https://github.com/google/guava/blob/bdf2a9d05342fca852645278d474082905e09d94/guava/src/com/google/common/hash/HashCode.java#L187-L199 +LongFunction longToBytes = (long value) -> new byte[] { +(byte) value, +(byte) (value >> 8), +(byte) (value >> 16), +(byte) (value >> 24), +(byte) (value >> 32), +(byte) (value >> 40), +(byte) (value >> 48), +(byte) (value >> 56) +}; + +// Combine the sorted topic hashes. +byte[] resultBytes = new byte[8]; +topicHashes.entrySet() +.stream() +.sorted(Map.Entry.comparingByKey()) // sort by topic name +.map(Map.Entry::getValue) +.map(longToBytes::apply) +.forEach(nextBytes -> { +// Combine ordered hashes. This is taken from guava Hashing#combineOrdered. +// https://github.com/google/guava/blob/bdf2a9d05342fca852645278d474082905e09d94/guava/src/com/google/common/hash/Hashing.java#L689-L712 +for (int i = 0; i < nextBytes.length; i++) { +resultBytes[i] = (byte) (resultBytes[i] * 37 ^ nextBytes[i]); +} +}); + +// Convert the byte array to long. This is taken from guava BytesHashCode#asLong. 
+// https://github.com/google/guava/blob/bdf2a9d05342fca852645278d474082905e09d94/guava/src/com/google/common/hash/HashCode.java#L279-L295
+long retVal = (resultBytes[0] & 0xFF);
+for (int i = 1; i < resultBytes.length; i++) {
+    retVal |= (resultBytes[i] & 0xFFL) << (i * 8);
+}
+return retVal;
+}
+
+/**
+ * Computes the hash of the topic id, name, number of partitions, and partition racks by Murmur3.
+ *
+ * @param topicImage   The topic image.
+ * @param clusterImage The cluster image.
+ * @return The hash of the topic.
+ */
+static long computeTopicHash(TopicImage topicImage, ClusterImage clusterImage) throws IOException {
+    try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+         DataOutputStream dos = new DataOutputStream(baos)) {
+        dos.writeByte(TOPIC_HASH_MAGIC_BYTE); // magic byte
+        dos.writeLong(topicImage.id().hashCode()); // topic ID
+        dos.writeUTF(topicImage.name()); // topic name
+        dos.writeInt(topicImage.partitions().size()); // number of partitions
+        for (int i = 0; i < topicImage.partitions().size(); i++) {
+            dos.writeInt(i); // partition id
+            List<String> sortedRacksList = Arrays.stream(topicImage.partitions().get(i).replicas)
+                .mapToObj(clusterImage::broker)
+                .filter(Objects::nonNull)
+                .map(BrokerRegistration::rack)
+                .filter(Optional::isPresent)
+                .map(Optional::get)
+                .sorted()
+                .toList();
+
+            String racks = IntStream.range(0, sortedRacksList.size())

Review Comment: There is no limitation on the rack string, so any character can be part of it. I can update the KIP if needed.
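For reference, the byte layout being discussed (magic byte, topic id, name, partition count, then per-partition id plus sorted racks) can be sketched without the Kafka image types. `topicBytes` and its parameters are hypothetical stand-ins; in the PR these bytes are then fed to a 64-bit hash function rather than returned:

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.List;

public class TopicHashLayout {
    static final byte TOPIC_HASH_MAGIC_BYTE = 0x00;

    // Serialize the fields in the order discussed in the review. The resulting
    // byte array is what would be handed to the 64-bit hash function.
    static byte[] topicBytes(long topicIdHash, String topicName,
                             List<List<String>> racksPerPartition) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (DataOutputStream dos = new DataOutputStream(baos)) {
            dos.writeByte(TOPIC_HASH_MAGIC_BYTE);   // hash-version magic byte
            dos.writeLong(topicIdHash);             // topic ID
            dos.writeUTF(topicName);                // topic name
            dos.writeInt(racksPerPartition.size()); // number of partitions
            for (int i = 0; i < racksPerPartition.size(); i++) {
                dos.writeInt(i);                    // partition id
                // racks for this partition, assumed already sorted here
                dos.writeUTF(String.join(";", racksPerPartition.get(i)));
            }
        }
        return baos.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] bytes = topicBytes(42L, "foo", List.of(List.of("rack-a", "rack-b")));
        System.out.println(bytes.length); // magic byte is bytes[0]
    }
}
```

Note that `writeUTF` prefixes each string with its length, which is one reason an exact capacity for a pre-sized buffer is hard to compute up front.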
Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]
chia7712 commented on code in PR #19523: URL: https://github.com/apache/kafka/pull/19523#discussion_r2066374101

## gradle/dependencies.gradle: ## @@ -147,6 +148,7 @@ libs += [
caffeine: "com.github.ben-manes.caffeine:caffeine:$versions.caffeine",
classgraph: "io.github.classgraph:classgraph:$versions.classgraph",
commonsValidator: "commons-validator:commons-validator:$versions.commonsValidator",
+ guava: "com.google.guava:guava:$versions.guava",

Review Comment: > The ByteBufferOutputStream needs a ByteBuffer with capacity. I wonder whether we can calculate an accurate capacity. For example, a rack string can contain any character, and each character takes 1 to 4 bytes in UTF-8.

The initial capacity can be discussed later. In fact, it may not be an issue if we adopt a growable buffer: the buffer will eventually be big enough for each hash computation.
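The growable-buffer idea can be illustrated with a minimal sketch in plain `java.nio`, assuming single-threaded use per shard. `ReusableBuffer` is a hypothetical name, not Kafka's `GrowableBufferSupplier` or `ByteBufferOutputStream`:

```java
import java.nio.ByteBuffer;

public class ReusableBuffer {
    private ByteBuffer buffer = ByteBuffer.allocate(64);

    // Return a cleared buffer of at least `needed` bytes, growing (to the next
    // power of two) only when the current capacity is too small. Over time the
    // buffer becomes big enough for every hash computation and stops growing,
    // so an exact up-front capacity estimate is not required.
    ByteBuffer get(int needed) {
        if (buffer.capacity() < needed) {
            int newCapacity = Integer.highestOneBit(needed - 1) << 1;
            buffer = ByteBuffer.allocate(newCapacity);
        }
        buffer.clear();
        return buffer;
    }

    public static void main(String[] args) {
        ReusableBuffer rb = new ReusableBuffer();
        ByteBuffer first = rb.get(100); // grows: 64 < 100
        ByteBuffer second = rb.get(80); // reused, no new allocation
        System.out.println(first == second); // same instance once grown
    }
}
```

This mirrors the trade-off in the thread: a reusable buffer avoids per-call allocation, but sharing it across callers would require either thread confinement (one per shard) or a supplier that hands out buffers per thread.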
Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]
chia7712 commented on code in PR #19523: URL: https://github.com/apache/kafka/pull/19523#discussion_r2066288123 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java: ## @@ -209,4 +221,90 @@ void validateOffsetFetch( default boolean shouldExpire() { return true; } + +/** + * The magic byte used to identify the version of topic hash function. + */ +byte TOPIC_HASH_MAGIC_BYTE = 0x00; +XXHash64 LZ4_HASH_INSTANCE = XXHashFactory.fastestInstance().hash64(); + +/** + * Computes the hash of the topics in a group. + * + * @param topicHashes The map of topic hashes. Key is topic name and value is the topic hash. + * @return The hash of the group. + */ +static long computeGroupHash(Map topicHashes) { +// Convert long to byte array. This is taken from guava LongHashCode#asBytes. +// https://github.com/google/guava/blob/bdf2a9d05342fca852645278d474082905e09d94/guava/src/com/google/common/hash/HashCode.java#L187-L199 +LongFunction longToBytes = (long value) -> new byte[] { +(byte) value, +(byte) (value >> 8), +(byte) (value >> 16), +(byte) (value >> 24), +(byte) (value >> 32), +(byte) (value >> 40), +(byte) (value >> 48), +(byte) (value >> 56) +}; + +// Combine the sorted topic hashes. +byte[] resultBytes = new byte[8]; +topicHashes.entrySet() +.stream() +.sorted(Map.Entry.comparingByKey()) // sort by topic name +.map(Map.Entry::getValue) +.map(longToBytes::apply) +.forEach(nextBytes -> { +// Combine ordered hashes. This is taken from guava Hashing#combineOrdered. +// https://github.com/google/guava/blob/bdf2a9d05342fca852645278d474082905e09d94/guava/src/com/google/common/hash/Hashing.java#L689-L712 +for (int i = 0; i < nextBytes.length; i++) { +resultBytes[i] = (byte) (resultBytes[i] * 37 ^ nextBytes[i]); +} +}); + +// Convert the byte array to long. This is taken from guava BytesHashCode#asLong. 
+// https://github.com/google/guava/blob/bdf2a9d05342fca852645278d474082905e09d94/guava/src/com/google/common/hash/HashCode.java#L279-L295 +long retVal = (resultBytes[0] & 0xFF); +for (int i = 1; i < resultBytes.length; i++) { +retVal |= (resultBytes[i] & 0xFFL) << (i * 8); +} +return retVal; +} + +/** + * Computes the hash of the topic id, name, number of partitions, and partition racks by Murmur3. Review Comment: please update the docs ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java: ## @@ -209,4 +221,90 @@ void validateOffsetFetch( default boolean shouldExpire() { return true; } + +/** + * The magic byte used to identify the version of topic hash function. + */ +byte TOPIC_HASH_MAGIC_BYTE = 0x00; +XXHash64 LZ4_HASH_INSTANCE = XXHashFactory.fastestInstance().hash64(); + +/** + * Computes the hash of the topics in a group. + * + * @param topicHashes The map of topic hashes. Key is topic name and value is the topic hash. + * @return The hash of the group. + */ +static long computeGroupHash(Map topicHashes) { +// Convert long to byte array. This is taken from guava LongHashCode#asBytes. +// https://github.com/google/guava/blob/bdf2a9d05342fca852645278d474082905e09d94/guava/src/com/google/common/hash/HashCode.java#L187-L199 +LongFunction longToBytes = (long value) -> new byte[] { +(byte) value, +(byte) (value >> 8), +(byte) (value >> 16), +(byte) (value >> 24), +(byte) (value >> 32), +(byte) (value >> 40), +(byte) (value >> 48), +(byte) (value >> 56) +}; + +// Combine the sorted topic hashes. +byte[] resultBytes = new byte[8]; +topicHashes.entrySet() +.stream() +.sorted(Map.Entry.comparingByKey()) // sort by topic name +.map(Map.Entry::getValue) +.map(longToBytes::apply) +.forEach(nextBytes -> { +// Combine ordered hashes. This is taken from guava Hashing#combineOrdered. 
+// https://github.com/google/guava/blob/bdf2a9d05342fca852645278d474082905e09d94/guava/src/com/google/common/hash/Hashing.java#L689-L712 +for (int i = 0; i < nextBytes.length; i++) { +resultBytes[i] = (byte) (resultBytes[i] * 37 ^ nextBytes[i]); +} +}); + +// Convert the byte array to long. This is taken from guava BytesHashCode#asLong. +// https://github.com/google/guava/blob/bdf2a9d05342fca852645278d474082905e09d94/gua
Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]
FrankYang0529 commented on code in PR #19523: URL: https://github.com/apache/kafka/pull/19523#discussion_r2066325978

## gradle/dependencies.gradle: ## @@ -147,6 +148,7 @@ libs += [
caffeine: "com.github.ben-manes.caffeine:caffeine:$versions.caffeine",
classgraph: "io.github.classgraph:classgraph:$versions.classgraph",
commonsValidator: "commons-validator:commons-validator:$versions.commonsValidator",
+ guava: "com.google.guava:guava:$versions.guava",

Review Comment: > I also wonder what is the impact of putting all the data to a byte array before hashing it. Do you have thoughts on this?

Based on KIP-1101, it minimizes how often a topic hash is calculated, since the result can be shared between groups. I think we can keep this function simple for now.

> I suggest that `EventProcessorThread` can leverage `GrowableBufferSupplier` to reuse the buffer as much as possible.

With a `BufferSupplier`, the hash function would need to be thread-safe in order to reuse the buffer. We can revisit this in the future.

> Additionally, `Group#computeTopicHash` should use `ByteBufferOutputStream` to generate the byte array, as `ByteBufferOutputStream#buffer#array` can avoid the extra array copy of `ByteArrayOutputStream#toByteArray`.

The `ByteBufferOutputStream` needs a `ByteBuffer` with a capacity. I wonder whether we can calculate an accurate capacity. For example, a rack string can contain any character, and each character takes 1 to 4 bytes in UTF-8.
Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]
chia7712 commented on code in PR #19523: URL: https://github.com/apache/kafka/pull/19523#discussion_r2066249125

## gradle/dependencies.gradle: ## @@ -147,6 +148,7 @@ libs += [
caffeine: "com.github.ben-manes.caffeine:caffeine:$versions.caffeine",
classgraph: "io.github.classgraph:classgraph:$versions.classgraph",
commonsValidator: "commons-validator:commons-validator:$versions.commonsValidator",
+ guava: "com.google.guava:guava:$versions.guava",

Review Comment: > I agree with using a BufferSupplier in order to reuse buffers. However, EventProcessorThread may be too low level to hold it. Having it in Shard may be enough.

We can revisit this when the critical code is used in production :)

@FrankYang0529 thanks for the updates. The result LGTM.
Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]
FrankYang0529 commented on code in PR #19523: URL: https://github.com/apache/kafka/pull/19523#discussion_r2066166230 ## gradle/dependencies.gradle: ## @@ -147,6 +148,7 @@ libs += [ caffeine: "com.github.ben-manes.caffeine:caffeine:$versions.caffeine", classgraph: "io.github.classgraph:classgraph:$versions.classgraph", commonsValidator: "commons-validator:commons-validator:$versions.commonsValidator", + guava: "com.google.guava:guava:$versions.guava", Review Comment: Thanks for the suggestion. Updated benchmark result. ``` Benchmark (numReplicasPerBroker) (partitionsPerTopic) (replicationFactor) Mode Cnt ScoreError Units TopicHashBenchmark.testLz4 1010 3 avgt 15 166.389 ± 1.542 ns/op TopicHashBenchmark.testLz4 1050 3 avgt 15 375.660 ± 2.771 ns/op TopicHashBenchmark.testLz4 10 100 3 avgt 15 636.176 ± 8.305 ns/op TopicHashBenchmark.testMurmur 1010 3 avgt 15 238.242 ± 1.664 ns/op TopicHashBenchmark.testMurmur 1050 3 avgt 15 1143.583 ± 5.981 ns/op TopicHashBenchmark.testMurmur 10 100 3 avgt 15 2278.680 ± 29.007 ns/op ``` TopicHashBenchmark.java ```java package org.apache.kafka.jmh.metadata; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.metadata.RegisterBrokerRecord; import org.apache.kafka.image.ClusterDelta; import org.apache.kafka.image.ClusterImage; import org.apache.kafka.image.TopicsDelta; import org.apache.kafka.image.TopicImage; import org.apache.kafka.metadata.BrokerRegistration; import org.apache.kafka.streams.state.internals.Murmur3; import net.jpountz.xxhash.XXHash64; import net.jpountz.xxhash.XXHashFactory; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; import org.openjdk.jmh.annotations.Fork; import org.openjdk.jmh.annotations.Level; import org.openjdk.jmh.annotations.Measurement; import org.openjdk.jmh.annotations.Mode; import org.openjdk.jmh.annotations.OutputTimeUnit; import org.openjdk.jmh.annotations.Param; import org.openjdk.jmh.annotations.Scope; import 
org.openjdk.jmh.annotations.Setup; import org.openjdk.jmh.annotations.State; import org.openjdk.jmh.annotations.Warmup; import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; import java.io.IOException; import java.util.Arrays; import java.util.List; import java.util.Objects; import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.IntStream; import static org.apache.kafka.jmh.metadata.TopicsImageSnapshotLoadBenchmark.getInitialTopicsDelta; import static org.apache.kafka.jmh.metadata.TopicsImageSnapshotLoadBenchmark.getNumBrokers; @State(Scope.Benchmark) @Fork(value = 1) @Warmup(iterations = 5) @Measurement(iterations = 15) @BenchmarkMode(Mode.AverageTime) @OutputTimeUnit(TimeUnit.NANOSECONDS) public class TopicHashBenchmark { @Param({"10", "50", "100"}) private int partitionsPerTopic; @Param({"3"}) private int replicationFactor; @Param({"10"}) private int numReplicasPerBroker; private byte[] topicBytes; @Setup(Level.Trial) public void setup() throws IOException { TopicsDelta topicsDelta = getInitialTopicsDelta(1, partitionsPerTopic, replicationFactor, numReplicasPerBroker); int numBrokers = getNumBrokers(1, partitionsPerTopic, replicationFactor, numReplicasPerBroker); ClusterDelta clusterDelta = new ClusterDelta(ClusterImage.EMPTY); for (int i = 0; i < numBrokers; i++) { clusterDelta.replay(new RegisterBrokerRecord() .setBrokerId(i) .setRack(Uuid.randomUuid().toString()) ); } TopicImage topicImage = topicsDelta.apply().topicsById().values().stream().findFirst().get(); ClusterImage clusterImage = clusterDelta.apply(); try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputStream dos = new DataOutputStream(baos)) { dos.writeByte(0); // magic byte dos.writeLong(topicImage.id().hashCode()); // topic ID dos.writeUTF(topicImage.name()); // topic name dos.writeInt(topicImage.partitions().size()); // number of partitions for (int i = 0; i < topicImage.partitions().size(); 
i++) {
Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]
dajac commented on code in PR #19523: URL: https://github.com/apache/kafka/pull/19523#discussion_r2066162858

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java: ## @@ -209,4 +221,90 @@ void validateOffsetFetch( default boolean shouldExpire() { return true; }
+
+/**
+ * The magic byte used to identify the version of topic hash function.
+ */
+byte TOPIC_HASH_MAGIC_BYTE = 0x00;
+XXHash64 LZ4_HASH_INSTANCE = XXHashFactory.fastestInstance().hash64();

Review Comment: Would it be possible to put these constants and the new methods into a separate class? Having them in `Group` is weird because they won't be used by all the group types.
Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]
dajac commented on code in PR #19523: URL: https://github.com/apache/kafka/pull/19523#discussion_r2066160108

## gradle/dependencies.gradle: ## @@ -147,6 +148,7 @@ libs += [
caffeine: "com.github.ben-manes.caffeine:caffeine:$versions.caffeine",
classgraph: "io.github.classgraph:classgraph:$versions.classgraph",
commonsValidator: "commons-validator:commons-validator:$versions.commonsValidator",
+ guava: "com.google.guava:guava:$versions.guava",

Review Comment: > IIRC, lz4-java is not maintained anymore, so I am not sure whether maintaining that code is a risk.
>
> > I also wonder what is the impact of putting all the data to a byte array before hashing it. Do you have thoughts on this?
>
> I suggest that `EventProcessorThread` can leverage `GrowableBufferSupplier` to reuse the buffer as much as possible. Additionally, `Group#computeTopicHash` should use `ByteBufferOutputStream` to generate the byte array, as `ByteBufferOutputStream#buffer#array` can avoid the extra array copy of `ByteArrayOutputStream#toByteArray`.

I agree with using a BufferSupplier in order to reuse buffers. However, EventProcessorThread may be too low level to hold it. Having it in Shard may be enough.
Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]
chia7712 commented on code in PR #19523: URL: https://github.com/apache/kafka/pull/19523#discussion_r2066122048

## gradle/dependencies.gradle: ## @@ -147,6 +148,7 @@ libs += [
caffeine: "com.github.ben-manes.caffeine:caffeine:$versions.caffeine",
classgraph: "io.github.classgraph:classgraph:$versions.classgraph",
commonsValidator: "commons-validator:commons-validator:$versions.commonsValidator",
+ guava: "com.google.guava:guava:$versions.guava",

Review Comment: @FrankYang0529 could you please move `baos.toByteArray()` out of the benchmark?
Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]
FrankYang0529 commented on code in PR #19523: URL: https://github.com/apache/kafka/pull/19523#discussion_r2066016813 ## gradle/dependencies.gradle: ## @@ -147,6 +148,7 @@ libs += [ caffeine: "com.github.ben-manes.caffeine:caffeine:$versions.caffeine", classgraph: "io.github.classgraph:classgraph:$versions.classgraph", commonsValidator: "commons-validator:commons-validator:$versions.commonsValidator", + guava: "com.google.guava:guava:$versions.guava", Review Comment: Based on benchmark result, lz4 has better performance in our case. I will change to use it. ``` Benchmark (numReplicasPerBroker) (partitionsPerTopic) (replicationFactor) Mode Cnt ScoreError Units TopicHashBenchmark.testLz4 1010 3 avgt 15 194.553 ± 1.631 ns/op TopicHashBenchmark.testLz4 1050 3 avgt 15 484.640 ± 1.721 ns/op TopicHashBenchmark.testLz4 10 100 3 avgt 15 883.435 ± 4.001 ns/op TopicHashBenchmark.testMurmur 1010 3 avgt 15 205.529 ± 0.701 ns/op TopicHashBenchmark.testMurmur 1050 3 avgt 15 1066.528 ± 42.856 ns/op TopicHashBenchmark.testMurmur 10 100 3 avgt 15 2082.821 ± 10.935 ns/op ``` TopicHashBenchmark.java ```java package org.apache.kafka.jmh.metadata; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.internals.Murmur3; import org.apache.kafka.common.metadata.RegisterBrokerRecord; import org.apache.kafka.image.ClusterDelta; import org.apache.kafka.image.ClusterImage; import org.apache.kafka.image.TopicsDelta; import org.apache.kafka.image.TopicImage; import org.apache.kafka.metadata.BrokerRegistration; import net.jpountz.xxhash.XXHash64; import net.jpountz.xxhash.XXHashFactory; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; import org.openjdk.jmh.annotations.Fork; import org.openjdk.jmh.annotations.Level; import org.openjdk.jmh.annotations.Measurement; import org.openjdk.jmh.annotations.Mode; import org.openjdk.jmh.annotations.OutputTimeUnit; import org.openjdk.jmh.annotations.Param; import org.openjdk.jmh.annotations.Scope; 
import org.openjdk.jmh.annotations.Setup; import org.openjdk.jmh.annotations.State; import org.openjdk.jmh.annotations.TearDown; import org.openjdk.jmh.annotations.Warmup; import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; import java.io.IOException; import java.util.Arrays; import java.util.List; import java.util.Objects; import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.IntStream; import static org.apache.kafka.jmh.metadata.TopicsImageSnapshotLoadBenchmark.getInitialTopicsDelta; import static org.apache.kafka.jmh.metadata.TopicsImageSnapshotLoadBenchmark.getNumBrokers; @State(Scope.Benchmark) @Fork(value = 1) @Warmup(iterations = 5) @Measurement(iterations = 15) @BenchmarkMode(Mode.AverageTime) @OutputTimeUnit(TimeUnit.NANOSECONDS) public class TopicHashBenchmark { @Param({"10", "50", "100"}) private int partitionsPerTopic; @Param({"3"}) private int replicationFactor; @Param({"10"}) private int numReplicasPerBroker; private final ByteArrayOutputStream baos = new ByteArrayOutputStream(); private final DataOutputStream dos = new DataOutputStream(baos); @Setup(Level.Trial) public void setup() throws IOException { TopicsDelta topicsDelta = getInitialTopicsDelta(1, partitionsPerTopic, replicationFactor, numReplicasPerBroker); int numBrokers = getNumBrokers(1, partitionsPerTopic, replicationFactor, numReplicasPerBroker); ClusterDelta clusterDelta = new ClusterDelta(ClusterImage.EMPTY); for (int i = 0; i < numBrokers; i++) { clusterDelta.replay(new RegisterBrokerRecord() .setBrokerId(i) .setRack(Uuid.randomUuid().toString()) ); } TopicImage topicImage = topicsDelta.apply().topicsById().values().stream().findFirst().get(); ClusterImage clusterImage = clusterDelta.apply(); dos.writeByte(0); // magic byte dos.writeLong(topicImage.id().hashCode()); // topic ID dos.writeUTF(topicImage.name()); // topic name dos.writeInt(topicImage.partitions().size()); // number of partitions for 
(int i = 0; i < topicImag
Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]
ijuma commented on code in PR #19523: URL: https://github.com/apache/kafka/pull/19523#discussion_r2064071619

## gradle/dependencies.gradle: ## @@ -147,6 +148,7 @@ libs += [
caffeine: "com.github.ben-manes.caffeine:caffeine:$versions.caffeine",
classgraph: "io.github.classgraph:classgraph:$versions.classgraph",
commonsValidator: "commons-validator:commons-validator:$versions.commonsValidator",
+ guava: "com.google.guava:guava:$versions.guava",

Review Comment: This hash function is used by zstd too. It's pretty safe to rely on it, given that lz4 and zstd are the most popular compression algorithms and we will be supporting them for the foreseeable future. Which particular implementation we use is a fair question.
Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]
chia7712 commented on code in PR #19523: URL: https://github.com/apache/kafka/pull/19523#discussion_r2064046103

## gradle/dependencies.gradle: ## @@ -147,6 +148,7 @@ libs += [
caffeine: "com.github.ben-manes.caffeine:caffeine:$versions.caffeine",
classgraph: "io.github.classgraph:classgraph:$versions.classgraph",
commonsValidator: "commons-validator:commons-validator:$versions.commonsValidator",
+ guava: "com.google.guava:guava:$versions.guava",

Review Comment: IIRC, lz4-java is not maintained anymore, so I am not sure whether maintaining that code is a risk.

> I also wonder what is the impact of putting all the data to a byte array before hashing it. Do you have thoughts on this?

I suggest that `EventProcessorThread` can leverage `GrowableBufferSupplier` to reuse the buffer as much as possible. Additionally, `Group#computeTopicHash` should use `ByteBufferOutputStream` to generate the byte array, as `ByteBufferOutputStream#buffer#array` can avoid the extra array copy of `ByteArrayOutputStream#toByteArray`.
Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]
dajac commented on code in PR #19523: URL: https://github.com/apache/kafka/pull/19523#discussion_r2063449780 ## gradle/dependencies.gradle: ## @@ -147,6 +148,7 @@ libs += [ caffeine: "com.github.ben-manes.caffeine:caffeine:$versions.caffeine", classgraph: "io.github.classgraph:classgraph:$versions.classgraph", commonsValidator: "commons-validator:commons-validator:$versions.commonsValidator", + guava: "com.google.guava:guava:$versions.guava", Review Comment: Thanks! While talking to @ijuma about it, he has suggested to look into https://github.com/lz4/lz4-java/tree/master/src/java/net/jpountz/xxhash. We get it via lz4 and it is apparently much faster than Murmur3. It may be worth running a few benchmarks to compare then. What do you think? I also wonder what is the impact of putting all the data to a byte array before hashing it. Do you have thoughts on this? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]
FrankYang0529 commented on code in PR #19523: URL: https://github.com/apache/kafka/pull/19523#discussion_r2063441471

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java: ## @@ -209,4 +219,50 @@ void validateOffsetFetch( default boolean shouldExpire() { return true; }
+
+/**
+ * Computes the hash of the topics in a group.
+ *
+ * @param topicHashes The map of topic hashes. Key is topic name and value is the topic hash.
+ * @return The hash of the group.
+ */
+static long computeGroupHash(Map<String, Long> topicHashes) {
+    return Hashing.combineOrdered(
+        topicHashes.entrySet()
+            .stream()
+            .sorted(Map.Entry.comparingByKey())
+            .map(e -> HashCode.fromLong(e.getValue()))
+            .toList()
+    ).asLong();
+}
+
+/**
+ * Computes the hash of the topic id, name, number of partitions, and partition racks by Murmur3.
+ *
+ * @param topicImage   The topic image.
+ * @param clusterImage The cluster image.
+ * @return The hash of the topic.
+ */
+static long computeTopicHash(TopicImage topicImage, ClusterImage clusterImage) {
+    HashFunction hf = Hashing.murmur3_128();
+    Hasher topicHasher = hf.newHasher()
+        .putByte((byte) 0) // magic byte

Review Comment: Yes, added `TOPIC_HASH_MAGIC_BYTE`.
Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]
FrankYang0529 commented on code in PR #19523: URL: https://github.com/apache/kafka/pull/19523#discussion_r2063441134

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java:
## @@ -209,4 +219,50 @@ void validateOffsetFetch(
 default boolean shouldExpire() {
 return true;
 }
+
+/**
+ * Computes the hash of the topics in a group.
+ *
+ * @param topicHashes The map of topic hashes. Key is topic name and value is the topic hash.
+ * @return The hash of the group.
+ */
+static long computeGroupHash(Map<String, Long> topicHashes) {
+    return Hashing.combineOrdered(
+        topicHashes.entrySet()
+            .stream()
+            .sorted(Map.Entry.comparingByKey())
+            .map(e -> HashCode.fromLong(e.getValue()))
+            .toList()
+    ).asLong();
+}
+
+/**
+ * Computes the hash of the topic id, name, number of partitions, and partition racks by Murmur3.
+ *
+ * @param topicImage The topic image.
+ * @param clusterImage The cluster image.
+ * @return The hash of the topic.
+ */
+static long computeTopicHash(TopicImage topicImage, ClusterImage clusterImage) {
+    HashFunction hf = Hashing.murmur3_128();
+    Hasher topicHasher = hf.newHasher()
+        .putByte((byte) 0) // magic byte
+        .putLong(topicImage.id().hashCode()) // topic Id
+        .putString(topicImage.name(), StandardCharsets.UTF_8) // topic name
+        .putInt(topicImage.partitions().size()); // number of partitions
+
+    topicImage.partitions().entrySet().stream().sorted(Map.Entry.comparingByKey()).forEach(entry -> {

Review Comment: Good suggestion! Thanks. Updated it.
Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]
FrankYang0529 commented on code in PR #19523: URL: https://github.com/apache/kafka/pull/19523#discussion_r2063440444

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java:
## @@ -209,4 +219,50 @@ void validateOffsetFetch(
 default boolean shouldExpire() {
 return true;
 }
+
+/**
+ * Computes the hash of the topics in a group.
+ *
+ * @param topicHashes The map of topic hashes. Key is topic name and value is the topic hash.
+ * @return The hash of the group.
+ */
+static long computeGroupHash(Map<String, Long> topicHashes) {
+    return Hashing.combineOrdered(
+        topicHashes.entrySet()
+            .stream()
+            .sorted(Map.Entry.comparingByKey())
+            .map(e -> HashCode.fromLong(e.getValue()))
+            .toList()
+    ).asLong();
+}
+
+/**
+ * Computes the hash of the topic id, name, number of partitions, and partition racks by Murmur3.
+ *
+ * @param topicImage The topic image.
+ * @param clusterImage The cluster image.
+ * @return The hash of the topic.
+ */
+static long computeTopicHash(TopicImage topicImage, ClusterImage clusterImage) {
+    HashFunction hf = Hashing.murmur3_128();
+    Hasher topicHasher = hf.newHasher()
+        .putByte((byte) 0) // magic byte
+        .putLong(topicImage.id().hashCode()) // topic Id
+        .putString(topicImage.name(), StandardCharsets.UTF_8) // topic name
+        .putInt(topicImage.partitions().size()); // number of partitions
+
+    topicImage.partitions().entrySet().stream().sorted(Map.Entry.comparingByKey()).forEach(entry -> {
+        topicHasher.putInt(entry.getKey()); // partition id
+        String racks = Arrays.stream(entry.getValue().replicas)
+            .mapToObj(clusterImage::broker)
+            .filter(Objects::nonNull)
+            .map(BrokerRegistration::rack)
+            .filter(Optional::isPresent)
+            .map(Optional::get)
+            .sorted()
+            .collect(Collectors.joining(";"));

Review Comment: It looks like any character can be valid. I changed the combination to the following format:
```
0:,1:, ...
```
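For illustration, a sketch of a per-partition rack encoding along the lines described in the comment above; the helper name (`encode`), the sample rack names, and the exact separators are assumptions for this sketch, not necessarily what the PR ended up with:

```java
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class RackString {
    // Encode each partition as "<partitionId>:<racks sorted and joined by a comma>",
    // then join the partition entries so the partition id anchors each rack list.
    static String encode(SortedMap<Integer, List<String>> racksByPartition) {
        return racksByPartition.entrySet().stream()
            .map(e -> e.getKey() + ":"
                + e.getValue().stream().sorted().collect(Collectors.joining(",")))
            .collect(Collectors.joining(";"));
    }

    public static void main(String[] args) {
        SortedMap<Integer, List<String>> racks = new TreeMap<>();
        racks.put(0, List.of("rack-b", "rack-a")); // unsorted on purpose
        racks.put(1, List.of("rack-c"));
        System.out.println(encode(racks)); // 0:rack-a,rack-b;1:rack-c
    }
}
```

Prefixing each rack list with its partition id is what makes the encoding unambiguous even when rack names contain the join characters' neighbours; as dajac notes, relying on a separator character alone does not.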
Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]
FrankYang0529 commented on code in PR #19523: URL: https://github.com/apache/kafka/pull/19523#discussion_r2063438718

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java:
## @@ -209,4 +219,50 @@ void validateOffsetFetch(
 default boolean shouldExpire() {
 return true;
 }
+
+/**
+ * Computes the hash of the topics in a group.
+ *
+ * @param topicHashes The map of topic hashes. Key is topic name and value is the topic hash.
+ * @return The hash of the group.
+ */
+static long computeGroupHash(Map<String, Long> topicHashes) {

Review Comment:
> I wonder whether it is worth inlining the implementation from Guava or something similar to combine the hashes. It would avoid the extra collections.

Yes, I copied some of the implementation into this function.

> In the KIP, you also mentioned combining the index with the hash. Is this something done within `combineOrdered`?

No, `computeGroupHash` sorts topics by name and uses that order to merge the hashes. I also added the test cases `testComputeGroupHashWithDifferentOrder` and `testComputeGroupHashWithSameKeyButDifferentValue` to verify it.
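A minimal sketch of the two properties those tests check: sorting by topic name makes the result independent of insertion order, while an order-sensitive fold keeps it sensitive to which topic carries which hash. The combiner below is a simplified stand-in, not Guava's actual `combineOrdered`:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

public class GroupHashSketch {
    // Sort topic hashes by topic name, then fold them with an
    // order-sensitive step (result * 31 + hash), so swapping which
    // topic owns which hash changes the combined value.
    static long combine(Map<String, Long> topicHashes) {
        long result = 0;
        for (Map.Entry<String, Long> e : new TreeMap<>(topicHashes).entrySet()) {
            result = result * 31 + e.getValue();
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Long> a = new LinkedHashMap<>();
        a.put("foo", 1L);
        a.put("bar", 2L);
        Map<String, Long> b = new LinkedHashMap<>();
        b.put("bar", 2L);
        b.put("foo", 1L);
        System.out.println(combine(a) == combine(b)); // true: insertion order is irrelevant

        Map<String, Long> c = Map.of("foo", 2L, "bar", 1L); // same keys, swapped values
        System.out.println(combine(a) == combine(c)); // false: value placement matters
    }
}
```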
Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]
FrankYang0529 commented on code in PR #19523: URL: https://github.com/apache/kafka/pull/19523#discussion_r2063434753

## gradle/dependencies.gradle:
## @@ -147,6 +148,7 @@ libs += [
 caffeine: "com.github.ben-manes.caffeine:caffeine:$versions.caffeine",
 classgraph: "io.github.classgraph:classgraph:$versions.classgraph",
 commonsValidator: "commons-validator:commons-validator:$versions.commonsValidator",
+ guava: "com.google.guava:guava:$versions.guava",

Review Comment: I updated the PR to remove Guava. I think we can put all the data into a byte array and use Murmur3 to hash it, so we don't rely on Guava.
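A sketch of the "serialize everything into one byte array first" approach using only the JDK; `topicBytes` and its field layout are hypothetical here, and the Murmur3 step itself is left out because the JDK ships no implementation (the resulting array would be handed to whatever Murmur3 utility the project settles on):

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class TopicBytes {
    // Serialize the hash inputs into a single byte array; a Murmur3 (or xxHash)
    // implementation would then hash this array in one call.
    static byte[] topicBytes(long topicIdHash, String name, int numPartitions) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.writeByte(0);                                 // magic byte: hash version
        out.writeLong(topicIdHash);                       // topic id
        out.write(name.getBytes(StandardCharsets.UTF_8)); // topic name
        out.writeInt(numPartitions);                      // number of partitions
        out.flush();
        return bos.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] bytes = topicBytes(42L, "topic-a", 16);
        System.out.println(bytes.length); // 1 + 8 + 7 + 4 = 20
    }
}
```

This is the allocation dajac's earlier question is about: a streaming hasher would avoid materializing this array at all.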
Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]
dajac commented on code in PR #19523: URL: https://github.com/apache/kafka/pull/19523#discussion_r2059807814

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java:
## @@ -209,4 +219,50 @@ void validateOffsetFetch(
 default boolean shouldExpire() {
 return true;
 }
+
+/**
+ * Computes the hash of the topics in a group.
+ *
+ * @param topicHashes The map of topic hashes. Key is topic name and value is the topic hash.
+ * @return The hash of the group.
+ */
+static long computeGroupHash(Map<String, Long> topicHashes) {
+    return Hashing.combineOrdered(
+        topicHashes.entrySet()
+            .stream()
+            .sorted(Map.Entry.comparingByKey())
+            .map(e -> HashCode.fromLong(e.getValue()))
+            .toList()
+    ).asLong();
+}
+
+/**
+ * Computes the hash of the topic id, name, number of partitions, and partition racks by Murmur3.
+ *
+ * @param topicImage The topic image.
+ * @param clusterImage The cluster image.
+ * @return The hash of the topic.
+ */
+static long computeTopicHash(TopicImage topicImage, ClusterImage clusterImage) {
+    HashFunction hf = Hashing.murmur3_128();
+    Hasher topicHasher = hf.newHasher()
+        .putByte((byte) 0) // magic byte
+        .putLong(topicImage.id().hashCode()) // topic Id
+        .putString(topicImage.name(), StandardCharsets.UTF_8) // topic name
+        .putInt(topicImage.partitions().size()); // number of partitions
+
+    topicImage.partitions().entrySet().stream().sorted(Map.Entry.comparingByKey()).forEach(entry -> {
+        topicHasher.putInt(entry.getKey()); // partition id
+        String racks = Arrays.stream(entry.getValue().replicas)
+            .mapToObj(clusterImage::broker)
+            .filter(Objects::nonNull)
+            .map(BrokerRegistration::rack)
+            .filter(Optional::isPresent)
+            .map(Optional::get)
+            .sorted()
+            .collect(Collectors.joining(";"));

Review Comment: `;` is allowed in the `rack` field too, so it doesn't really protect us.

## gradle/dependencies.gradle:
## @@ -147,6 +148,7 @@ libs += [
 caffeine: "com.github.ben-manes.caffeine:caffeine:$versions.caffeine",
 classgraph: "io.github.classgraph:classgraph:$versions.classgraph",
 commonsValidator: "commons-validator:commons-validator:$versions.commonsValidator",
+ guava: "com.google.guava:guava:$versions.guava",

Review Comment: This is something that we haven't really discussed in the KIP because it is an implementation detail, but we should discuss whether we really want to take a dependency on Guava.

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java:
## @@ -209,4 +219,50 @@ void validateOffsetFetch(
 default boolean shouldExpire() {
 return true;
 }
+
+/**
+ * Computes the hash of the topics in a group.
+ *
+ * @param topicHashes The map of topic hashes. Key is topic name and value is the topic hash.
+ * @return The hash of the group.
+ */
+static long computeGroupHash(Map<String, Long> topicHashes) {

Review Comment: I wonder whether it is worth inlining the implementation from Guava or something similar to combine the hashes. It would avoid the extra collections. I am not sure whether it makes a real difference though. What are your thoughts?

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java:
## @@ -209,4 +219,50 @@ void validateOffsetFetch(
 default boolean shouldExpire() {
 return true;
 }
+
+/**
+ * Computes the hash of the topics in a group.
+ *
+ * @param topicHashes The map of topic hashes. Key is topic name and value is the topic hash.
+ * @return The hash of the group.
+ */
+static long computeGroupHash(Map<String, Long> topicHashes) {
+    return Hashing.combineOrdered(
+        topicHashes.entrySet()
+            .stream()
+            .sorted(Map.Entry.comparingByKey())
+            .map(e -> HashCode.fromLong(e.getValue()))
+            .toList()
+    ).asLong();
+}
+
+/**
+ * Computes the hash of the topic id, name, number of partitions, and partition racks by Murmur3.
+ *
+ * @param topicImage The topic image.
+ * @param clusterImage The cluster image.
+ * @return The hash of the topic.
+ */
+static long computeTopicHash(TopicImage topicImage, ClusterImage clusterImage) {
+    HashFunction hf = Hashing.murmur3_128();
+    Hasher topicHasher = hf.newHasher()
+        .putByte((byte) 0) // magic byte
+        .putLong(topicImage.id().hashCode()) // topic Id
+        .putString(topicImage.name(), StandardCharsets.UTF_8) // topic name
+        .putInt(topicImage.partitions().size()); // number of partitions