smjn commented on code in PR #20884:
URL: https://github.com/apache/kafka/pull/20884#discussion_r2526951229
##########
server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java:
##########
@@ -399,14 +401,24 @@ public void onComplete(ClientResponse response) {
}
// Visibility for testing
- Optional<Errors> checkNetworkError(ClientResponse response,
BiConsumer<Errors, Exception> errorConsumer) {
+ Optional<Errors> checkResponseError(ClientResponse response,
BiConsumer<Errors, Exception> errorConsumer) {
if (response.hasResponse()) {
return Optional.empty();
}
- log.debug("Response for RPC {} with key {} is invalid - {}.",
name(), this.partitionKey, response);
-
- if (response.wasDisconnected()) {
+ log.debug("Response for RPC {} with key {} is invalid - {}",
name(), this.partitionKey, response);
+
+ if (response.authenticationException() != null) {
+ log.error("Authentication exception",
response.authenticationException());
+ Errors error =
Errors.forException(response.authenticationException());
+ errorConsumer.accept(error, new
AuthenticationException(String.format("Server response for %s indicates
authentication exception.", this.partitionKey)));
+ return Optional.of(error);
Review Comment:
@apoorvmittal10 Thanks for the question.
Specifically for SharePartition, no changes are needed are it does not
explicitly handle error codes.
For read RPC it checks:
https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/server/share/SharePartition.java#L457
and for write:
https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/server/share/SharePartition.java#L2660
so no special handling is needed. We do want to log these instances though
as these are rare situations and we must be aware when they arise. That is
already being done in here -
https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/server/share/SharePartition.java#L2936
Furthermore, these are general exceptions not really tied to any specific to
the request. Other sender impls like `AddPartitionsToTxnManager` handle these
similarly.
These 2 exceptions are not included in the json spec either because of
generality.
##########
server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java:
##########
@@ -399,14 +401,24 @@ public void onComplete(ClientResponse response) {
}
// Visibility for testing
- Optional<Errors> checkNetworkError(ClientResponse response,
BiConsumer<Errors, Exception> errorConsumer) {
+ Optional<Errors> checkResponseError(ClientResponse response,
BiConsumer<Errors, Exception> errorConsumer) {
if (response.hasResponse()) {
return Optional.empty();
}
- log.debug("Response for RPC {} with key {} is invalid - {}.",
name(), this.partitionKey, response);
-
- if (response.wasDisconnected()) {
+ log.debug("Response for RPC {} with key {} is invalid - {}",
name(), this.partitionKey, response);
+
+ if (response.authenticationException() != null) {
+ log.error("Authentication exception",
response.authenticationException());
+ Errors error =
Errors.forException(response.authenticationException());
+ errorConsumer.accept(error, new
AuthenticationException(String.format("Server response for %s indicates
authentication exception.", this.partitionKey)));
+ return Optional.of(error);
Review Comment:
@apoorvmittal10 Thanks for the question.
Specifically for SharePartition, no changes are needed as it does not
explicitly handle error codes.
For read RPC it checks:
https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/server/share/SharePartition.java#L457
and for write:
https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/server/share/SharePartition.java#L2660
so no special handling is needed. We do want to log these instances though
as these are rare situations and we must be aware when they arise. That is
already being done in here -
https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/server/share/SharePartition.java#L2936
Furthermore, these are general exceptions not really tied to any specific to
the request. Other sender impls like `AddPartitionsToTxnManager` handle these
similarly.
These 2 exceptions are not included in the json spec either because of
generality.
##########
server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java:
##########
@@ -399,14 +401,24 @@ public void onComplete(ClientResponse response) {
}
// Visibility for testing
- Optional<Errors> checkNetworkError(ClientResponse response,
BiConsumer<Errors, Exception> errorConsumer) {
+ Optional<Errors> checkResponseError(ClientResponse response,
BiConsumer<Errors, Exception> errorConsumer) {
if (response.hasResponse()) {
return Optional.empty();
}
- log.debug("Response for RPC {} with key {} is invalid - {}.",
name(), this.partitionKey, response);
-
- if (response.wasDisconnected()) {
+ log.debug("Response for RPC {} with key {} is invalid - {}",
name(), this.partitionKey, response);
+
+ if (response.authenticationException() != null) {
+ log.error("Authentication exception",
response.authenticationException());
+ Errors error =
Errors.forException(response.authenticationException());
+ errorConsumer.accept(error, new
AuthenticationException(String.format("Server response for %s indicates
authentication exception.", this.partitionKey)));
+ return Optional.of(error);
Review Comment:
@apoorvmittal10 Thanks for the question.
Specifically for SharePartition, no changes are needed as it does not
explicitly handle error codes.
For read RPC it checks:
https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/server/share/SharePartition.java#L457
and for write:
https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/server/share/SharePartition.java#L2660
so no special handling is needed. We do want to log these instances though
as these are rare situations and we must be aware when they arise. That is
already being done here -
https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/server/share/SharePartition.java#L2936
Furthermore, these are general exceptions not really tied to any specific to
the request. Other sender impls like `AddPartitionsToTxnManager` handle these
similarly.
These 2 exceptions are not included in the json spec either because of
generality.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]