ijuma commented on code in PR #18295:
URL: https://github.com/apache/kafka/pull/18295#discussion_r1896287215
##########
clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java:
##########
@@ -503,63 +499,51 @@ private void handleSaslToken(byte[] clientToken) throws
IOException {
}
}
- private boolean handleKafkaRequest(byte[] requestBytes) throws
IOException, AuthenticationException {
- boolean isKafkaRequest = false;
- String clientMechanism = null;
+ /**
+ * @throws InvalidRequestException if the request is not in Kafka format
or if the API key is invalid. Clients
+ * that support SASL without support for KIP-43 (e.g. Kafka Clients 0.9.x)
are in the former bucket - the first
+ * packet such clients send is a GSSAPI token starting with 0x60.
+ */
+ private void handleKafkaRequest(byte[] requestBytes) throws IOException,
AuthenticationException {
try {
ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
RequestHeader header = RequestHeader.parse(requestBuffer);
ApiKeys apiKey = header.apiKey();
- // A valid Kafka request header was received. SASL authentication
tokens are now expected only
- // following a SaslHandshakeRequest since this is not a GSSAPI
client token from a Kafka 0.9.0.x client.
- if (saslState == SaslState.INITIAL_REQUEST)
- setSaslState(SaslState.HANDSHAKE_OR_VERSIONS_REQUEST);
- isKafkaRequest = true;
-
// Raise an error prior to parsing if the api cannot be handled at
this layer. This avoids
// unnecessary exposure to some of the more complex schema types.
if (apiKey != ApiKeys.API_VERSIONS && apiKey !=
ApiKeys.SASL_HANDSHAKE)
- throw new IllegalSaslStateException("Unexpected Kafka request
of type " + apiKey + " during SASL handshake.");
+ throw new InvalidRequestException("Unexpected Kafka request of
type " + apiKey + " during SASL handshake.");
LOG.debug("Handling Kafka request {} during {}", apiKey,
reauthInfo.authenticationOrReauthenticationText());
-
RequestContext requestContext = new RequestContext(header,
connectionId, clientAddress(), Optional.of(clientPort()),
KafkaPrincipal.ANONYMOUS, listenerName, securityProtocol,
ClientInformation.EMPTY, false);
RequestAndSize requestAndSize =
requestContext.parseRequest(requestBuffer);
+
+ // A valid Kafka request was received, we can now update the sasl
state
+ if (saslState == SaslState.INITIAL_REQUEST)
+ setSaslState(SaslState.HANDSHAKE_OR_VERSIONS_REQUEST);
+
if (apiKey == ApiKeys.API_VERSIONS)
handleApiVersionsRequest(requestContext, (ApiVersionsRequest)
requestAndSize.request);
- else
- clientMechanism = handleHandshakeRequest(requestContext,
(SaslHandshakeRequest) requestAndSize.request);
+ else {
+ String clientMechanism =
handleHandshakeRequest(requestContext, (SaslHandshakeRequest)
requestAndSize.request);
+ if (!reauthInfo.reauthenticating() ||
reauthInfo.saslMechanismUnchanged(clientMechanism)) {
+ createSaslServer(clientMechanism);
+ setSaslState(SaslState.AUTHENTICATE);
+ }
+ }
} catch (InvalidRequestException e) {
if (saslState == SaslState.INITIAL_REQUEST) {
- // InvalidRequestException is thrown if the request is not in
Kafka format or if the API key
- // is invalid. For compatibility with 0.9.0.x where the first
packet is a GSSAPI token
- // starting with 0x60, revert to GSSAPI for both these
exceptions.
- if (LOG.isDebugEnabled()) {
- StringBuilder tokenBuilder = new StringBuilder();
- for (byte b : requestBytes) {
- tokenBuilder.append(String.format("%02x", b));
- if (tokenBuilder.length() >= 20)
- break;
- }
- LOG.debug("Received client packet of length {} starting
with bytes 0x{}, process as GSSAPI packet", requestBytes.length, tokenBuilder);
- }
- if (enabledMechanisms.contains(SaslConfigs.GSSAPI_MECHANISM)) {
- LOG.debug("First client packet is not a SASL mechanism
request, using default mechanism GSSAPI");
- clientMechanism = SaslConfigs.GSSAPI_MECHANISM;
- } else
- throw new UnsupportedSaslMechanismException("Exception
handling first SASL packet from client, GSSAPI is not supported by server", e);
- } else
- throw e;
- }
- if (clientMechanism != null && (!reauthInfo.reauthenticating()
- || reauthInfo.saslMechanismUnchanged(clientMechanism))) {
- createSaslServer(clientMechanism);
- setSaslState(SaslState.AUTHENTICATE);
+ // InvalidRequestException is thrown if the request is not in
Kafka format or if the API key is invalid.
+ // If it's the initial request, this could be an ancient
client (see method documentation for more details),
+ // a client configured with the wrong security protocol or a
non kafka-client altogether (eg http client).
+ throw new InvalidRequestException("Invalid request, potential
reasons: kafka client configured with the " +
Review Comment:
Please take a look at the error message and check if it makes sense. This
will only be captured in the broker, but still good to try and make it as
informative as possible.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]