smjn commented on code in PR #22284:
URL: https://github.com/apache/kafka/pull/22284#discussion_r3242665395
##########
server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManager.java:
##########
@@ -165,53 +236,104 @@ public Optional<Throwable> validateDlqTopic() {
});
}
- public ShareGroupDLQRecordParameter recordParam() {
- return param;
- }
-
public boolean dlqTopicExists() {
Optional<String> shareGroupDlqTopic =
cacheHelper.shareGroupDlqTopic(param.groupId());
return
shareGroupDlqTopic.filter(cacheHelper::containsTopic).isPresent();
}
- }
- private class ProduceRequestHandler extends
ShareGroupDLQStateManagerHandler {
- private final CompletableFuture<Void> result;
- private static final Logger LOG =
LoggerFactory.getLogger(ShareGroupDLQStateManager.ProduceRequestHandler.class);
-
- public ProduceRequestHandler(ShareGroupDLQRecordParameter param,
CompletableFuture<Void> result) {
- super(param);
- this.result = result;
+ public String dlqTopicName() {
+ return
cacheHelper.shareGroupDlqTopic(param.groupId()).orElse("UNKNOWN");
}
- @Override
- protected AbstractRequest.Builder<? extends AbstractRequest>
requestBuilder() {
- return null;
- }
-
- @Override
- protected CompletableFuture<? extends AbstractResponse> result() {
- return CompletableFuture.completedFuture(null);
- }
+ // Visibility for testing
+ Optional<Errors> checkResponseError(ClientResponse response) {
+ if (response.hasResponse()) {
+ return Optional.empty();
+ }
- @Override
- protected String name() {
- return "ProduceRequestHandler";
+ LOG.debug("Response for RPC {} with DLQ topic {} is invalid - {}",
name(), dlqTopicName(), response);
+
+ if (response.authenticationException() != null) {
+ LOG.error("Authentication exception",
response.authenticationException());
+ Errors error =
Errors.forException(response.authenticationException());
+ return Optional.of(error);
+ } else if (response.versionMismatch() != null) {
+ LOG.error("Version mismatch exception",
response.versionMismatch());
+ Errors error = Errors.forException(response.versionMismatch());
+ return Optional.of(error);
+ } else if (response.wasDisconnected()) { // Retriable
+ return Optional.of(Errors.NETWORK_EXCEPTION);
+ } else if (response.wasTimedOut()) { // Retriable
+ LOG.debug("Response for RPC {} with DLQ topic {} timed out -
{}.", name(), dlqTopicName(), response);
+ return Optional.of(Errors.REQUEST_TIMED_OUT);
+ } else {
+ return Optional.of(Errors.UNKNOWN_SERVER_ERROR);
+ }
}
- @Override
- protected void createTopicErrorResponse(Exception exception) {
- this.result.completeExceptionally(exception);
- }
+ private void handleCreateTopicsResponse(ClientResponse response) {
+ LOG.debug("Received CreateTopicsResponse {}", response);
+ createTopicsBackoff.incrementAttempt();
+ Errors clientResponseError =
checkResponseError(response).orElse(Errors.NONE);
+ String clientResponseErrorMessage = clientResponseError.message();
+ String dlqTopicName = dlqTopicName();
+
+ switch (clientResponseError) {
+ case NONE:
+ // Topic has been created
+ CreateTopicsResponse createTopicsResponse =
((CreateTopicsResponse) response.responseBody());
+ Optional<CreateTopicsResponseData.CreatableTopicResult>
topicResultOpt = createTopicsResponse.data().topics().stream().findFirst();
+ if (topicResultOpt.isEmpty() ||
!topicResultOpt.get().name().equals(dlqTopicName)) {
+ LOG.error("DLQ topic not found in create topic
response {}.", dlqTopicName);
+
requestErrorResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION.exception());
Review Comment:
OK, I will remove this check for now.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]