cmccabe commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r468874664
##########
File path:
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -4071,6 +4081,168 @@ void handleFailure(Throwable throwable) {
return new
AlterClientQuotasResult(Collections.unmodifiableMap(futures));
}
+ @Override
+ public DescribeUserScramCredentialsResult
describeUserScramCredentials(List<String> users,
DescribeUserScramCredentialsOptions options) {
+ final KafkaFutureImpl<Map<String, UserScramCredentialsDescription>>
future = new KafkaFutureImpl<>();
+ final long now = time.milliseconds();
+ Call call = new Call("describeUserScramCredentials",
calcDeadlineMs(now, options.timeoutMs()),
+ new ControllerNodeProvider()) {
+ @Override
+ public DescribeUserScramCredentialsRequest.Builder
createRequest(int timeoutMs) {
+ return new DescribeUserScramCredentialsRequest.Builder(
+ new
DescribeUserScramCredentialsRequestData().setUsers(users.stream().map(user ->
+ new
DescribeUserScramCredentialsRequestData.UserName().setName(user)).collect(Collectors.toList())));
+ }
+
+ @Override
+ public void handleResponse(AbstractResponse abstractResponse) {
+ DescribeUserScramCredentialsResponse response =
(DescribeUserScramCredentialsResponse) abstractResponse;
+ Errors error = Errors.forCode(response.data().error());
+ switch (error) {
+ case NONE:
+ DescribeUserScramCredentialsResponseData data =
response.data();
+
future.complete(data.userScramCredentials().stream().collect(Collectors.toMap(
+
DescribeUserScramCredentialsResponseData.UserScramCredential::name,
+ userScramCredential -> {
+ List<ScramCredentialInfo> scramCredentialInfos
= userScramCredential.credentialInfos().stream().map(
+ credentialInfo -> new
ScramCredentialInfo(ScramMechanism.from(credentialInfo.mechanism()),
credentialInfo.iterations()))
+ .collect(Collectors.toList());
+ return new
UserScramCredentialsDescription(userScramCredential.name(),
scramCredentialInfos);
+ })));
+ break;
+ case NOT_CONTROLLER:
+ handleNotControllerError(error);
+ break;
+ default:
+ future.completeExceptionally(new ApiError(error,
response.data().errorMessage()).exception());
+ break;
+ }
+ }
+
+ @Override
+ void handleFailure(Throwable throwable) {
+ future.completeExceptionally(throwable);
+ }
+ };
+ runnable.call(call, now);
+ return new DescribeUserScramCredentialsResult(future);
+ }
+
+ @Override
+ public AlterUserScramCredentialsResult
alterUserScramCredentials(List<UserScramCredentialAlteration> alterations,
+
AlterUserScramCredentialsOptions options) {
+ final long now = time.milliseconds();
+ final Map<String, KafkaFutureImpl<Void>> futures = new HashMap<>();
+ for (UserScramCredentialAlteration alteration: alterations) {
+ futures.put(alteration.getUser(), new KafkaFutureImpl<>());
+ }
+ final Map<String, Exception> userIllegalAlterationExceptions = new
HashMap<>();
+ // We need to keep track of users with deletions of an unknown SCRAM
mechanism
+ alterations.stream().filter(a -> a instanceof
UserScramCredentialDeletion).forEach(alteration -> {
+ UserScramCredentialDeletion deletion =
(UserScramCredentialDeletion) alteration;
+ ScramMechanism mechanism = deletion.getMechanism();
+ if (mechanism == null || mechanism == ScramMechanism.UNKNOWN) {
+ userIllegalAlterationExceptions.put(deletion.getUser(), new
IllegalArgumentException("Unknown SCRAM mechanism"));
+ }
+ });
+ // Creating an upsertion may throw InvalidKeyException or
NoSuchAlgorithmException,
+ // so keep track of which users are affected by such a failure and
immediately fail all their alterations
+ final Map<String, Map<ScramMechanism,
AlterUserScramCredentialsRequestData.ScramCredentialUpsertion>> userInsertions
= new HashMap<>();
+ alterations.stream().filter(a -> a instanceof
UserScramCredentialUpsertion)
+ .filter(alteration ->
!userIllegalAlterationExceptions.containsKey(alteration.getUser()))
+ .forEach(alteration -> {
+ UserScramCredentialUpsertion upsertion =
(UserScramCredentialUpsertion) alteration;
+ String user = upsertion.getUser();
+ try {
+ ScramMechanism mechanism =
upsertion.getInfo().getMechanism();
+ if (mechanism == null || mechanism ==
ScramMechanism.UNKNOWN)
+ throw new IllegalArgumentException("Unknown SCRAM
mechanism");
+ userInsertions.putIfAbsent(user, new HashMap<>());
+ userInsertions.get(user).put(mechanism,
getScramCredentialUpsertion(upsertion));
+ } catch (Exception e) {
+ // we might overwrite aa exception from a previous
upsertion, but we don't really care
+ // since we just needs to mark this user as having at
least one illegal alteration
+ // and make an exception instance available for
completing the corresponding future exceptionally
+ userIllegalAlterationExceptions.put(user, e);
Review comment:
The pattern in Kafka has always been to have very specific error codes
and exceptions. Unfortunately INVALID_REQUEST is documented like this:
```
"This most likely occurs because of a request being malformed by the " +
"client library or the message was sent to an incompatible broker. See the
broker logs " +
"for more details.",
```
So we probably need a new error code here
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]