ableegoldman commented on code in PR #16147: URL: https://github.com/apache/kafka/pull/16147#discussion_r1621532246
########## streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java: ########## @@ -481,10 +479,24 @@ private static int getCrossRackTrafficCost(final Set<TaskTopicPartition> topicPa */ private static boolean canPerformRackAwareOptimization(final ApplicationState applicationState, final AssignedTask.Type taskType) { - final String rackAwareAssignmentStrategy = applicationState.assignmentConfigs().rackAwareAssignmentStrategy(); + final AssignmentConfigs assignmentConfigs = applicationState.assignmentConfigs(); + final String rackAwareAssignmentStrategy = assignmentConfigs.rackAwareAssignmentStrategy(); if (StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE.equals(rackAwareAssignmentStrategy)) { + LOG.warn("Rack aware task assignment optimization disabled: rack aware strategy was set to {}", + rackAwareAssignmentStrategy); + return false; + } + + if (!assignmentConfigs.rackAwareTrafficCost().isPresent()) { + LOG.warn("Rack aware task assignment optimization unavailable: the traffic cost configuration was not set."); Review Comment: We should log the exact config name since otherwise people won't necessarily know what this is referring to (especially since they already forgot to set this config). ```suggestion LOG.warn("Rack aware task assignment optimization unavailable: must configure {}", StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG); ``` ########## streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java: ########## @@ -481,10 +479,24 @@ private static int getCrossRackTrafficCost(final Set<TaskTopicPartition> topicPa */ private static boolean canPerformRackAwareOptimization(final ApplicationState applicationState, final AssignedTask.Type taskType) { - final String rackAwareAssignmentStrategy = applicationState.assignmentConfigs().rackAwareAssignmentStrategy(); + final AssignmentConfigs assignmentConfigs = applicationState.assignmentConfigs(); + final String rackAwareAssignmentStrategy = assignmentConfigs.rackAwareAssignmentStrategy(); if (StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE.equals(rackAwareAssignmentStrategy)) { + LOG.warn("Rack aware task assignment optimization disabled: rack aware strategy was set to {}", + rackAwareAssignmentStrategy); + return false; + } + + if (!assignmentConfigs.rackAwareTrafficCost().isPresent()) { + LOG.warn("Rack aware task assignment optimization unavailable: the traffic cost configuration was not set."); return false; } + + if (!assignmentConfigs.rackAwareNonOverlapCost().isPresent()) { + LOG.warn("Rack aware task assignment optimization unavailable: the non-overlap cost configuration was not set."); Review Comment: ```suggestion LOG.warn("Rack aware task assignment optimization unavailable: must configure {}", StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG); ``` ########## streams/src/main/java/org/apache/kafka/streams/processor/assignment/AssignmentConfigs.java: ########## @@ -40,8 +41,8 @@ public AssignmentConfigs(final StreamsConfig configs) { configs.getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG), configs.getLong(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG), configs.getList(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG), - configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG), - configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG), + Optional.ofNullable(configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG)), + Optional.ofNullable(configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG)), Review Comment: don't we need to check `if (assignorClassName.equals("org.apache.kafka.streams.processor.assignment.assignors.StickyTaskAssignor"))` and set these to the sticky assignor defaults if true? Where `assignorClassName` is equal to `streamsConfig.getString(StreamsConfig.TASK_ASSIGNOR_CLASS_CONFIG)` -- I guess maybe we do want the public `AssignmentConfigs` constructor to take in the StreamsConfig after all? ########## streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java: ########## @@ -481,10 +479,24 @@ private static int getCrossRackTrafficCost(final Set<TaskTopicPartition> topicPa */ private static boolean canPerformRackAwareOptimization(final ApplicationState applicationState, final AssignedTask.Type taskType) { - final String rackAwareAssignmentStrategy = applicationState.assignmentConfigs().rackAwareAssignmentStrategy(); + final AssignmentConfigs assignmentConfigs = applicationState.assignmentConfigs(); + final String rackAwareAssignmentStrategy = assignmentConfigs.rackAwareAssignmentStrategy(); if (StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE.equals(rackAwareAssignmentStrategy)) { + LOG.warn("Rack aware task assignment optimization disabled: rack aware strategy was set to {}", + rackAwareAssignmentStrategy); + return false; + } + + if (!assignmentConfigs.rackAwareTrafficCost().isPresent()) { Review Comment: nit (here and for the check below): ```suggestion if (assignmentConfigs.rackAwareTrafficCost().isEmpty()) { ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org