AndrewJSchofield commented on code in PR #22241:
URL: https://github.com/apache/kafka/pull/22241#discussion_r3217925882
##########
server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManager.java:
##########
@@ -105,21 +108,73 @@ public void stop() throws Exception {
* @return A future completing normally on successful DLQ, exceptionally
otherwise.
*/
public CompletableFuture<Void> dlq(ShareGroupDLQRecordParameter param) {
- ProduceRequestHandler requestHandler = new
ProduceRequestHandler(param);
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ ProduceRequestHandler requestHandler = new
ProduceRequestHandler(param, future);
sender.enqueue(requestHandler);
- return requestHandler.result().thenAccept(response -> {
- });
+ return future;
}
private abstract class ShareGroupDLQStateManagerHandler implements
RequestCompletionHandler {
+ private final ShareGroupDLQRecordParameter param;
+
+ ShareGroupDLQStateManagerHandler(ShareGroupDLQRecordParameter param) {
+ this.param = param;
+ }
+
protected abstract AbstractRequest.Builder<? extends AbstractRequest>
requestBuilder();
protected abstract CompletableFuture<? extends AbstractResponse>
result();
+
+ protected abstract String name();
+
+ protected abstract void createTopicErrorResponse(Exception exception);
+
+ protected AbstractRequest.Builder<CreateTopicsRequest>
createTopicBuilder() {
+ return new CreateTopicsRequest.Builder(new
CreateTopicsRequestData());
+ }
+
+ public Optional<Throwable> validateDlqTopic() {
+ Optional<String> topicNameOpt =
cacheHelper.shareGroupDlqTopic(param.groupId());
+ Optional<String> topicPrefix =
cacheHelper.shareGroupDlqTopicPrefix();
+
+ if (topicNameOpt.isEmpty()) {
+ return Optional.of(new ConfigException("Configured DLQ topic
name in share group " + param.groupId() + " is empty."));
+ } else if (topicNameOpt.get().indexOf("__") == 0) {
+ return Optional.of(new ConfigException("Configured DLQ topic
name in share group " + param.groupId() + " is invalid: " +
topicNameOpt.get()));
+ }
+
+ String topicName = topicNameOpt.get();
+
+ if (cacheHelper.containsTopic(topicName) &&
!cacheHelper.isDlqEnabledOnTopic(topicName)) {
+ return Optional.of(new ConfigException("DLQ is not enabled on
configured DLQ topic for share group " + param.groupId() + " topic: ." +
topicName));
+ }
+
+ return topicPrefix.map(prefix -> {
+ if (!prefix.isEmpty() && topicName.indexOf(prefix) != 0) {
Review Comment:
Ditto: I suggest `String.startsWith(String)` instead of `String.indexOf(String)` here too.
##########
server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManager.java:
##########
@@ -105,21 +108,73 @@ public void stop() throws Exception {
* @return A future completing normally on successful DLQ, exceptionally
otherwise.
*/
public CompletableFuture<Void> dlq(ShareGroupDLQRecordParameter param) {
- ProduceRequestHandler requestHandler = new
ProduceRequestHandler(param);
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ ProduceRequestHandler requestHandler = new
ProduceRequestHandler(param, future);
sender.enqueue(requestHandler);
- return requestHandler.result().thenAccept(response -> {
- });
+ return future;
}
private abstract class ShareGroupDLQStateManagerHandler implements
RequestCompletionHandler {
+ private final ShareGroupDLQRecordParameter param;
+
+ ShareGroupDLQStateManagerHandler(ShareGroupDLQRecordParameter param) {
+ this.param = param;
+ }
+
protected abstract AbstractRequest.Builder<? extends AbstractRequest>
requestBuilder();
protected abstract CompletableFuture<? extends AbstractResponse>
result();
+
+ protected abstract String name();
+
+ protected abstract void createTopicErrorResponse(Exception exception);
+
+ protected AbstractRequest.Builder<CreateTopicsRequest>
createTopicBuilder() {
+ return new CreateTopicsRequest.Builder(new
CreateTopicsRequestData());
+ }
+
+ public Optional<Throwable> validateDlqTopic() {
+ Optional<String> topicNameOpt =
cacheHelper.shareGroupDlqTopic(param.groupId());
+ Optional<String> topicPrefix =
cacheHelper.shareGroupDlqTopicPrefix();
+
+ if (topicNameOpt.isEmpty()) {
+ return Optional.of(new ConfigException("Configured DLQ topic
name in share group " + param.groupId() + " is empty."));
+ } else if (topicNameOpt.get().indexOf("__") == 0) {
Review Comment:
I suggest `String.startsWith(String)` instead of `String.indexOf(String)`.
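A minimal sketch of what the change could look like, assuming the two checks in `validateDlqTopic()` are pulled into small helpers. The class and method names below are hypothetical and not part of the PR. `startsWith` states the intent directly and only compares at the start of the string, whereas `indexOf(...) == 0` searches the whole string before comparing the result with zero.

```java
public class DlqTopicNameCheck {

    // Hypothetical helper: equivalent to topicName.indexOf("__") == 0.
    static boolean hasReservedPrefix(String topicName) {
        return topicName.startsWith("__");
    }

    // Hypothetical helper: equivalent to
    // !prefix.isEmpty() && topicName.indexOf(prefix) != 0.
    static boolean violatesPrefix(String topicName, String prefix) {
        return !prefix.isEmpty() && !topicName.startsWith(prefix);
    }

    public static void main(String[] args) {
        System.out.println(hasReservedPrefix("__consumer_offsets")); // true
        System.out.println(hasReservedPrefix("orders__dlq"));        // false
        System.out.println(violatesPrefix("dlq.orders", "dlq."));    // false
        System.out.println(violatesPrefix("orders.dlq", "dlq."));    // true
        System.out.println(violatesPrefix("anything", ""));          // false
    }
}
```

The behaviour is unchanged in both cases; only readability and the amount of scanning differ.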
##########
server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManager.java:
##########
@@ -149,12 +214,44 @@ private static class SendThread extends
InterBrokerSendThread {
@Override
public Collection<RequestAndCompletionHandler> generateRequests() {
+ if (!queue.isEmpty()) {
+ ShareGroupDLQStateManager.ShareGroupDLQStateManagerHandler
handler = queue.poll();
+ if (!handler.dlqTopicExists()) {
+ Node randomNode = randomNode();
+ if (randomNode == Node.noNode()) {
+ log.error("Unable to find node to use for coordinator
lookup.");
Review Comment:
As I understand it, this isn't looking up a coordinator. It's trying to find a
random node to auto-create the DLQ topic, so the log message should say that.