[GitHub] [kafka] lihaosky commented on a diff in pull request #13851: KAFKA-15022: [1/N] initial implementation of rack aware assignor

2023-07-07 Thread via GitHub


lihaosky commented on code in PR #13851:
URL: https://github.com/apache/kafka/pull/13851#discussion_r1255324708


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java:
##
@@ -409,6 +411,43 @@ private String getBrokerSideConfigValue(final Config brokerSideTopicConfig,
         return brokerSideConfigEntry.value();
     }
 
+    public Map<String, List<TopicPartitionInfo>> getTopicPartitionInfo(final Set<String> topics) {
+        log.debug("Starting to describe topics {} in partition assignor.", topics);
+
+        long currentWallClockMs = time.milliseconds();
+        final long deadlineMs = currentWallClockMs + retryTimeoutMs;
+
+        Set<String> topicsToDescribe = new HashSet<>(topics);
+        final Map<String, List<TopicPartitionInfo>> topicPartitionInfo = new HashMap<>();
+
+        while (!topicsToDescribe.isEmpty()) {
+            final Map<String, List<TopicPartitionInfo>> existed = getTopicPartitionInfo(topicsToDescribe, null);
+            topicPartitionInfo.putAll(existed);
+            topicsToDescribe.removeAll(topicPartitionInfo.keySet());
+            if (!topicsToDescribe.isEmpty()) {
+                currentWallClockMs = time.milliseconds();
+
+                if (currentWallClockMs >= deadlineMs) {
+                    final String timeoutError = String.format(
+                        "Could not create topics within %d milliseconds. " +
+                        "This can happen if the Kafka cluster is temporarily not available.",
+                        retryTimeoutMs);
+                    log.error(timeoutError);

Review Comment:
   I feel this log is OK since it logs the internal `retryTimeoutMs` here, which makes the message clear. The caller would have to dig that value out in order to log it.
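
   For context, a minimal sketch of the caller-side alternative being discussed (hypothetical caller and stand-in interface, not the actual Kafka Streams code): because `retryTimeoutMs` is private to `InternalTopicManager`, a caller catching the `TimeoutException` could only log a less specific message.

```java
// Hedged sketch, assumed names: illustrates why logging inside getTopicPartitionInfo
// (where retryTimeoutMs is known) carries more information than a caller-side log.
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.errors.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class DescribeTopicsCallerSketch {
    private static final Logger log = LoggerFactory.getLogger(DescribeTopicsCallerSketch.class);

    // Stand-in for InternalTopicManager so the sketch is self-contained.
    interface InternalTopicManagerLike {
        Map<String, List<TopicPartitionInfo>> getTopicPartitionInfo(Set<String> topics);
    }

    static Map<String, List<TopicPartitionInfo>> describe(final InternalTopicManagerLike manager,
                                                          final Set<String> topics) {
        try {
            return manager.getTopicPartitionInfo(topics);
        } catch (final TimeoutException e) {
            // The retry timeout is private to the manager, so the caller can only log a
            // generic message; the "%d milliseconds" detail has to come from the log line
            // inside getTopicPartitionInfo itself.
            log.error("Describing topics {} did not complete in time.", topics, e);
            throw e;
        }
    }

    private DescribeTopicsCallerSketch() { }
}
```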






[GitHub] [kafka] lihaosky commented on a diff in pull request #13851: KAFKA-15022: [1/N] initial implementation of rack aware assignor

2023-07-07 Thread via GitHub


lihaosky commented on code in PR #13851:
URL: https://github.com/apache/kafka/pull/13851#discussion_r1255322606


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java:
##
@@ -279,6 +279,28 @@ public CreateTopicsResult createTopics(final Collection<NewTopic> newTopics,
             + " You can change the replication.factor config or upgrade your brokers to version 2.4 or newer to avoid this error."));
     }
 
+    @Test
+    public void shouldThrowTimeoutExceptionInGetPartitionInfo() {
+        setupTopicInMockAdminClient(topic1, Collections.emptyMap());
+        final MockTime time = new MockTime(
+            (Integer) config.get(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG)) / 15

Review Comment:
   Sure, I'll change it to 5.
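
   For reference, a self-contained sketch (the `maxPollIntervalMs` value and class name are assumed, for illustration only) of how `MockTime`'s auto-tick interacts with a retry deadline: a smaller divisor means a bigger jump per `milliseconds()` call, so the mocked clock crosses the deadline in fewer loop iterations.

```java
import org.apache.kafka.common.utils.MockTime;

public final class MockTimeAutoTickSketch {
    public static void main(final String[] args) {
        final long maxPollIntervalMs = 30_000L;                     // assumed value, for illustration only
        final MockTime time = new MockTime(maxPollIntervalMs / 5);  // mock clock jumps 6s on every milliseconds() call

        final long deadlineMs = time.milliseconds() + maxPollIntervalMs;
        int clockReads = 0;
        while (time.milliseconds() < deadlineMs) {                  // each read auto-ticks the mock clock
            clockReads++;
        }
        // With divisor 5 the deadline is crossed after a handful of reads; with 15 the
        // loop would run proportionally more iterations before the timeout path fires.
        System.out.println("deadline crossed after " + clockReads + " further clock reads");
    }
}
```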






[GitHub] [kafka] lihaosky commented on a diff in pull request #13851: KAFKA-15022: [1/N] initial implementation of rack aware assignor

2023-07-07 Thread via GitHub


lihaosky commented on code in PR #13851:
URL: https://github.com/apache/kafka/pull/13851#discussion_r1255320753


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignorTest.java:
##
@@ -0,0 +1,264 @@
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.spy;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import org.apache.kafka.clients.admin.MockAdminClient;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsConfig.InternalConfig;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.RackAwareTaskAssignor;
+import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
+import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
+import org.apache.kafka.test.MockClientSupplier;
+import org.apache.kafka.test.MockInternalTopicManager;
+import org.junit.Test;
+
+public class RackAwareTaskAssignorTest {
+    private final static String TOPIC0 = "topic0";
+    private final static String TOPIC1 = "topic1";
+    private static final String USER_END_POINT = "localhost:8080";
+    private static final String APPLICATION_ID = "stream-partition-assignor-test";
+
+    private final Node node0 = new Node(0, "node0", 1, "rack1");
+    private final Node node1 = new Node(1, "node1", 1, "rack2");
+    private final Node node2 = new Node(2, "node2", 1, "rack3");
+    private final Node[] replicas = new Node[] {node0, node1, node2};
+
+    private final PartitionInfo partitionInfo00 = new PartitionInfo(TOPIC0, 0, node0, replicas, replicas);
+    private final PartitionInfo partitionInfo01 = new PartitionInfo(TOPIC0, 1, node0, replicas, replicas);
+
+    private final TopicPartition tp00 = new TopicPartition(TOPIC0, 0);
+    private final TopicPartition tp01 = new TopicPartition(TOPIC0, 1);
+    private final TopicPartition tp10 = new TopicPartition(TOPIC1, 0);
+
+    private final UUID process0UUID = UUID.randomUUID();
+    private final UUID process1UUID = UUID.randomUUID();
+
+    private final Subtopology subtopology1 = new Subtopology(1, "topology1");
+
+    private final MockTime time = new MockTime();
+    private final StreamsConfig streamsConfig = new StreamsConfig(configProps());
+    private final MockClientSupplier mockClientSupplier = new MockClientSupplier();
+
+    private final MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager(
+        time,
+        streamsConfig,
+        mockClientSupplier.restoreConsumer,
+        false
+    );
+
+    private Map<String, Object> configProps() {
+        final Map<String, Object> configurationMap = new HashMap<>();
+        configurationMap.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
+        configurationMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, USER_END_POINT);
+
+        final ReferenceContainer referenceContainer = new ReferenceContainer();
+        /*
+        referenceContainer.mainConsumer = consumer;
+        referenceContainer.adminClient = adminClient;
+        referenceContainer.taskManager = taskManager;
+        referenceContainer.streamsMetadataState = streamsMetadataState;
+        referenceContainer.time = time;
+        */
+        configurationMap.put(InternalConfig.REFERENCE_CONTAINER_PARTITION_ASSIGNOR, referenceContainer);
+        return configurationMap;
+    }
+
+    @Test
+    public void disableActiveSinceMissingClusterInfo() {
+        final Cluster metadata = new Cluster(
+            "cluster",
+            new HashSet<>(Arrays.asList(node0, node1, node2)),
+            new HashSet<>(Arrays.asList(partitionInfo00, partitionInfo01)),
+            Collections.emptySet(),
+            Collections.emptySet()
+        );
+
+        final Map<UUID, Map<String, Optional<String>>> processRacks = new HashMap<>();
+
+        processRacks.put(process0UUID, Collections.singletonMap("consumer1", Optional.of("rack1")));
+
+        final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
+            metadata,
+            Collections.singletonMap(new TaskId(1, 1), new HashSet<>(Arrays.asList(tp00, tp10))),
+            Collections.singletonMap(subtopology1, Collections.singleton(new TaskId(1, 1))),
+

[GitHub] [kafka] lihaosky commented on a diff in pull request #13851: KAFKA-15022: [1/N] initial implementation of rack aware assignor

2023-07-07 Thread via GitHub


lihaosky commented on code in PR #13851:
URL: https://github.com/apache/kafka/pull/13851#discussion_r1255321572



[GitHub] [kafka] lihaosky commented on a diff in pull request #13851: KAFKA-15022: [1/N] initial implementation of rack aware assignor

2023-06-27 Thread via GitHub


lihaosky commented on code in PR #13851:
URL: https://github.com/apache/kafka/pull/13851#discussion_r1244450143


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java:
##
@@ -409,6 +411,43 @@ private String getBrokerSideConfigValue(final Config brokerSideTopicConfig,
         return brokerSideConfigEntry.value();
     }
 
+    public Map<String, List<TopicPartitionInfo>> getTopicPartitionInfo(final Set<String> topics) {
+        log.debug("Starting to describe topics {} in partition assignor.", topics);
+
+        long currentWallClockMs = time.milliseconds();
+        final long deadlineMs = currentWallClockMs + retryTimeoutMs;
+
+        Set<String> topicsToDescribe = new HashSet<>(topics);
+        final Map<String, List<TopicPartitionInfo>> topicPartitionInfo = new HashMap<>();
+
+        while (!topicsToDescribe.isEmpty()) {
+            final Map<String, List<TopicPartitionInfo>> existed = getTopicPartitionInfo(topicsToDescribe, null);
+            topicPartitionInfo.putAll(existed);
+            topicsToDescribe.removeAll(topicPartitionInfo.keySet());
+            if (!topicsToDescribe.isEmpty()) {
+                currentWallClockMs = time.milliseconds();
+
+                if (currentWallClockMs >= deadlineMs) {
+                    final String timeoutError = String.format(
+                        "Could not create topics within %d milliseconds. " +
+                        "This can happen if the Kafka cluster is temporarily not available.",
+                        retryTimeoutMs);
+                    log.error(timeoutError);

Review Comment:
   Is this mimicking the `makeReady` function's logs?



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java:
##
@@ -409,6 +411,43 @@ private String getBrokerSideConfigValue(final Config brokerSideTopicConfig,
         return brokerSideConfigEntry.value();
     }
 
+    public Map<String, List<TopicPartitionInfo>> getTopicPartitionInfo(final Set<String> topics) {
+        log.debug("Starting to describe topics {} in partition assignor.", topics);
+
+        long currentWallClockMs = time.milliseconds();
+        final long deadlineMs = currentWallClockMs + retryTimeoutMs;
+
+        Set<String> topicsToDescribe = new HashSet<>(topics);
+        final Map<String, List<TopicPartitionInfo>> topicPartitionInfo = new HashMap<>();
+
+        while (!topicsToDescribe.isEmpty()) {
+            final Map<String, List<TopicPartitionInfo>> existed = getTopicPartitionInfo(topicsToDescribe, null);
+            topicPartitionInfo.putAll(existed);
+            topicsToDescribe.removeAll(topicPartitionInfo.keySet());
+            if (!topicsToDescribe.isEmpty()) {
+                currentWallClockMs = time.milliseconds();
+
+                if (currentWallClockMs >= deadlineMs) {
+                    final String timeoutError = String.format(
+                        "Could not create topics within %d milliseconds. " +
+                        "This can happen if the Kafka cluster is temporarily not available.",
+                        retryTimeoutMs);
+                    log.error(timeoutError);
+                    throw new TimeoutException(timeoutError);
+                }
+                log.info(

Review Comment:
   Is this mimicking the `makeReady` function's logs?
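
   For comparison, the retry-until-deadline shape shared by `makeReady` and the new method, as a hedged, generic sketch (illustrative names, not the actual `makeReady` implementation):

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Function;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.Time;

final class RetryUntilDeadlineSketch {

    // Generic shape of the loop quoted above: retry `attempt` on the remaining items until
    // everything succeeds or retryTimeoutMs of (possibly mocked) time has elapsed.
    // A production version would also back off between attempts; that is omitted here.
    static <T> void retryUntilDone(final Time time,
                                   final long retryTimeoutMs,
                                   final Set<T> items,
                                   final Function<Set<T>, Set<T>> attempt) {
        final long deadlineMs = time.milliseconds() + retryTimeoutMs;
        final Set<T> remaining = new HashSet<>(items);
        while (!remaining.isEmpty()) {
            remaining.removeAll(attempt.apply(remaining));   // attempt returns the items that succeeded
            if (!remaining.isEmpty() && time.milliseconds() >= deadlineMs) {
                // Log-and-throw with the timeout value in the message, since only this layer
                // knows retryTimeoutMs; this mirrors the makeReady-style logging quoted above.
                throw new TimeoutException(
                    String.format("Could not finish within %d milliseconds.", retryTimeoutMs));
            }
        }
    }

    private RetryUntilDeadlineSketch() { }
}
```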



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/RackAwareTaskAssignor.java:
##
@@ -0,0 +1,164 @@
+package org.apache.kafka.streams.processor.internals;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
+import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RackAwareTaskAssignor {
+    private static final Logger log = LoggerFactory.getLogger(RackAwareTaskAssignor.class);
+
+    private final Cluster fullMetadata;
+    private final Map<TaskId, Set<TopicPartition>> partitionsForTask;
+    private final Map<UUID, Map<String, Optional<String>>> processRacks;
+    private final AssignmentConfigs assignmentConfigs;
+    private final Map<TopicPartition, Set<String>> racksForPartition;
+    private final InternalTopicManager internalTopicManager;
+    private Boolean canEnableForActive;
+
+    public RackAwareTaskAssignor(final Cluster fullMetadata,
+                                 final Map<TaskId, Set<TopicPartition>> partitionsForTask,
+                                 final Map<Subtopology, Set<TaskId>> tasksForTopicGroup,
+                                 final Map<UUID, Map<String, Optional<String>>> processRacks,
+                                 final InternalTopicManager