smjn commented on code in PR #18014: URL: https://github.com/apache/kafka/pull/18014#discussion_r1867985680
########## share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java: ########## @@ -240,9 +249,43 @@ public void startup( log.info("Starting up."); numPartitions = shareGroupTopicPartitionCount.getAsInt(); + setupRecordPruning(); log.info("Startup complete."); } + private void setupRecordPruning() { + timer.add(new TimerTask(config.shareCoordinatorTopicPruneIntervalMs()) { + @Override + public void run() { + for (int i = 0; i < numPartitions; i++) { Review Comment: No - the shard (state machine) does not maintain that mapping. The information is maintained by the runtime which calls the appropriate shards, based on the context. This information is not exposed outside. We need access to https://github.com/apache/kafka/blob/trunk/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java#L1877 to expose this information. Even then the shard cannot do this. The runtime is encapsulated in the ShareCoordinatorService and only it can issue calls to the runtime. The Shard only serves to provide data related to partitions. Using the loop approach - for a specific internal topic-partition only the correct Shard will honour the request and the others will fail silently due to NOT_COORDINATOR. Flow is ``` ShareCoordinatorShard.callback | | add task task with correct shard ShareCoordinatorService ----> Runtime -----> ==================== ----> EventProcessor | QUEUE | obtain shard from TP in task ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org