AndrewJSchofield commented on code in PR #22241:
URL: https://github.com/apache/kafka/pull/22241#discussion_r3208942148
##########
core/src/main/java/kafka/server/share/ShareCoordinatorMetadataCacheHelperImpl.java:
##########
@@ -62,6 +67,68 @@ public boolean containsTopic(String topic) {
return false;
}
+ @Override
+ public Optional<String> shareGroupDlqTopic(String groupId) {
+ Properties props = metadataCache.groupConfig(groupId);
+ if (props == null || props.isEmpty()) {
+ return Optional.empty();
+ }
+ Object topicName =
props.get(GroupConfig.ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG);
+ if (topicName instanceof String) {
+ return Optional.of((String) topicName);
+ }
+ return Optional.empty();
+ }
+
+ @Override
+ public boolean isDlqAutoTopicCreateEnabled() {
+ Optional<Integer> someBrokerId =
metadataCache.getRandomAliveBrokerId();
+ if (someBrokerId.isEmpty() || someBrokerId.get() < 0) {
+ return false;
+ }
+ Properties props = metadataCache.brokerConfig(someBrokerId.get());
+
+ if (props == null || props.isEmpty()) {
+ return false;
+ }
+ Object isEnabled =
props.get(GroupCoordinatorConfig.ERRORS_DEADLETTERQUEUE_AUTO_CREATE_TOPICS_ENABLE_CONFIG);
+ if (isEnabled instanceof Boolean) {
+ return (boolean) isEnabled;
+ }
+ return false;
+ }
+
+ @Override
+ public Optional<String> shareGroupDlqTopicPrefix() {
+ Optional<Integer> someBrokerId =
metadataCache.getRandomAliveBrokerId();
Review Comment:
I wonder why this is not using `GroupConfigManager` like the rest of the
share group code. The random alive broker thing looks fantastic, but that's not
necessarily a good thing.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]