dajac commented on a change in pull request #10483: URL: https://github.com/apache/kafka/pull/10483#discussion_r610581181
########## File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java ########## @@ -1508,6 +1508,27 @@ default DescribeProducersResult describeProducers(Collection<TopicPartition> par */ DescribeProducersResult describeProducers(Collection<TopicPartition> partitions, DescribeProducersOptions options); + /** + * Describe the state of a set of transactionalIds. See Review comment: nit: I would use `transactional ids` rather than `transactionalIds` in the description. There are few other cases in this file. ########## File path: clients/src/main/java/org/apache/kafka/clients/admin/DescribeTransactionsResult.java ########## @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.admin; + +import org.apache.kafka.clients.admin.internals.CoordinatorKey; +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.annotation.InterfaceStability; +import org.apache.kafka.common.internals.KafkaFutureImpl; +import org.apache.kafka.common.requests.FindCoordinatorRequest; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutionException; + +@InterfaceStability.Evolving +public class DescribeTransactionsResult { + private final Map<CoordinatorKey, KafkaFutureImpl<TransactionDescription>> futures; + + DescribeTransactionsResult(Map<CoordinatorKey, KafkaFutureImpl<TransactionDescription>> futures) { + this.futures = futures; + } + + public KafkaFuture<TransactionDescription> transactionalIdResult(String transactionalId) { Review comment: `transactionalIdResult` looks weird to me. How about simply using `transaction`? ########## File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/CoordinatorStrategy.java ########## @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.admin.internals; + +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.errors.TransactionalIdAuthorizationException; +import org.apache.kafka.common.message.FindCoordinatorRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.AbstractResponse; +import org.apache.kafka.common.requests.FindCoordinatorRequest; +import org.apache.kafka.common.requests.FindCoordinatorResponse; +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; + +import java.util.Objects; +import java.util.Set; + +public class CoordinatorStrategy implements AdminApiLookupStrategy<CoordinatorKey> { + private final Logger log; + + public CoordinatorStrategy( + LogContext logContext + ) { + this.log = logContext.logger(CoordinatorStrategy.class); + } + + @Override + public ApiRequestScope lookupScope(CoordinatorKey key) { + // The `FindCoordinator` API does not support batched lookups, so we use a + // separate lookup context for each coordinator key we need to lookup + return new LookupRequestScope(key); + } + + @Override + public FindCoordinatorRequest.Builder buildRequest(Set<CoordinatorKey> keys) { + CoordinatorKey key = requireSingleton(keys); + return new FindCoordinatorRequest.Builder( + new FindCoordinatorRequestData() + .setKey(key.idValue) + .setKeyType(key.type.id()) + ); + } + + @Override + public LookupResult<CoordinatorKey> handleResponse( + Set<CoordinatorKey> keys, + AbstractResponse abstractResponse + ) { + CoordinatorKey key = requireSingleton(keys); + FindCoordinatorResponse response = (FindCoordinatorResponse) abstractResponse; + Errors error = response.error(); + + switch (error) { + case NONE: + return LookupResult.mapped(key, response.data().nodeId()); + + case COORDINATOR_NOT_AVAILABLE: + case COORDINATOR_LOAD_IN_PROGRESS: + log.debug("FindCoordinator request for key {} returned topic-level error {}. 
Will retry", + key, error); + return LookupResult.empty(); + + case GROUP_AUTHORIZATION_FAILED: + return LookupResult.failed(key, new GroupAuthorizationException("FindCoordinator request for groupId " + + "`" + key + "` failed due to authorization failure", key.idValue)); + + case TRANSACTIONAL_ID_AUTHORIZATION_FAILED: + return LookupResult.failed(key, new TransactionalIdAuthorizationException("FindCoordinator request for " + + "transactionalId `" + key + "` failed due to authorization failure")); + + default: + return LookupResult.failed(key, error.exception("FindCoordinator request for key " + + "`" + key + "` failed due to an unexpected error")); Review comment: I would mention the error in the message here as well. ########## File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeTransactionsHandler.java ########## @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.admin.internals; + +import org.apache.kafka.clients.admin.TransactionDescription; +import org.apache.kafka.clients.admin.TransactionState; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.TransactionalIdAuthorizationException; +import org.apache.kafka.common.errors.TransactionalIdNotFoundException; +import org.apache.kafka.common.message.DescribeTransactionsRequestData; +import org.apache.kafka.common.message.DescribeTransactionsResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.AbstractResponse; +import org.apache.kafka.common.requests.DescribeTransactionsRequest; +import org.apache.kafka.common.requests.DescribeTransactionsResponse; +import org.apache.kafka.common.requests.FindCoordinatorRequest; +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.OptionalLong; +import java.util.Set; +import java.util.stream.Collectors; + +public class DescribeTransactionsHandler implements AdminApiHandler<CoordinatorKey, TransactionDescription> { + private final LogContext logContext; + private final Logger log; + private final Set<CoordinatorKey> keys; + + public DescribeTransactionsHandler( + Collection<String> transactionalIds, + LogContext logContext + ) { + this.keys = buildKeySet(transactionalIds); + this.log = logContext.logger(DescribeTransactionsHandler.class); + this.logContext = logContext; + } + + private static Set<CoordinatorKey> buildKeySet(Collection<String> transactionalIds) { + return transactionalIds.stream() + .map(DescribeTransactionsHandler::asCoordinatorKey) + .collect(Collectors.toSet()); + } + + @Override + public String apiName() { + return "describeTransactions"; + } + + @Override + public Keys<CoordinatorKey> 
initializeKeys() { + return Keys.dynamicMapped(keys, new CoordinatorStrategy(logContext)); + } + + @Override + public DescribeTransactionsRequest.Builder buildRequest( + Integer brokerId, + Set<CoordinatorKey> keys + ) { + DescribeTransactionsRequestData request = new DescribeTransactionsRequestData(); + List<String> transactionalIds = keys.stream().map(key -> key.idValue).collect(Collectors.toList()); + request.setTransactionalIds(transactionalIds); + return new DescribeTransactionsRequest.Builder(request); + } + + @Override + public ApiResult<CoordinatorKey, TransactionDescription> handleResponse( + Integer brokerId, + Set<CoordinatorKey> keys, + AbstractResponse abstractResponse + ) { + DescribeTransactionsResponse response = (DescribeTransactionsResponse) abstractResponse; + Map<CoordinatorKey, TransactionDescription> completed = new HashMap<>(); + Map<CoordinatorKey, Throwable> failed = new HashMap<>(); + List<CoordinatorKey> unmapped = new ArrayList<>(); + + for (DescribeTransactionsResponseData.TransactionState transactionState : response.data().transactionStates()) { + CoordinatorKey transactionalIdKey = asCoordinatorKey(transactionState.transactionalId()); + Errors error = Errors.forCode(transactionState.errorCode()); + + if (error != Errors.NONE) { + handleError(transactionalIdKey, error, failed, unmapped); + continue; + } + + OptionalLong transactionStartTimeMs = transactionState.transactionStartTimeMs() < 0 ? 
+ OptionalLong.empty() : + OptionalLong.of(transactionState.transactionStartTimeMs()); + + completed.put(transactionalIdKey, new TransactionDescription( + brokerId, + TransactionState.parse(transactionState.transactionState()), + transactionState.producerId(), + transactionState.producerEpoch(), + transactionState.transactionTimeoutMs(), + transactionStartTimeMs, + collectTopicPartitions(transactionState) + )); + } + + return new ApiResult<>(completed, failed, unmapped); + } + + private Set<TopicPartition> collectTopicPartitions( + DescribeTransactionsResponseData.TransactionState transactionState + ) { + Set<TopicPartition> res = new HashSet<>(); + for (DescribeTransactionsResponseData.TopicData topicData : transactionState.topics()) { + String topic = topicData.topic(); + for (Integer partitionId : topicData.partitions()) { + res.add(new TopicPartition(topic, partitionId)); + } + } + return res; + } + + public static CoordinatorKey asCoordinatorKey(String transactionalId) { + return new CoordinatorKey(transactionalId, FindCoordinatorRequest.CoordinatorType.TRANSACTION); + } + + private void handleError( + CoordinatorKey transactionalIdKey, + Errors error, + Map<CoordinatorKey, Throwable> failed, + List<CoordinatorKey> unmapped + ) { + switch (error) { + case TRANSACTIONAL_ID_AUTHORIZATION_FAILED: + failed.put(transactionalIdKey, new TransactionalIdAuthorizationException( + "DescribeTransactions request for transactionalId `" + transactionalIdKey.idValue + "` " + + "failed due to authorization failure")); + break; + + case TRANSACTIONAL_ID_NOT_FOUND: + failed.put(transactionalIdKey, new TransactionalIdNotFoundException( + "DescribeTransactions request for transactionalId `" + transactionalIdKey.idValue + "` " + + "failed because the ID could not be found")); + break; + + case COORDINATOR_LOAD_IN_PROGRESS: + // If the coordinator is in the middle of loading, then we just need to retry + log.debug("DescribeTransactions request for transactionalId `{}` failed because 
the " + + "coordinator is still in the process of loading state. Will retry", + transactionalIdKey.idValue); + break; + + case NOT_COORDINATOR: + case COORDINATOR_NOT_AVAILABLE: + // If the coordinator is unavailable or there was a coordinator change, then we unmap + // the key so that we retry the `FindCoordinator` request + unmapped.add(transactionalIdKey); + log.debug("DescribeTransactions request for transactionalId `{}` returned error {}. Will retry", + transactionalIdKey.idValue, error); + break; + + default: + failed.put(transactionalIdKey, error.exception("DescribeTransactions request for " + + "transactionalId `" + transactionalIdKey.idValue + "` failed due to unexpected error")); Review comment: I would mention the error in the message. ########## File path: clients/src/main/java/org/apache/kafka/clients/admin/DescribeTransactionsResult.java ########## @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.admin; + +import org.apache.kafka.clients.admin.internals.CoordinatorKey; +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.annotation.InterfaceStability; +import org.apache.kafka.common.internals.KafkaFutureImpl; +import org.apache.kafka.common.requests.FindCoordinatorRequest; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutionException; + +@InterfaceStability.Evolving +public class DescribeTransactionsResult { Review comment: nit: Should we add some Javadoc to the public methods? ########## File path: clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeTransactionsHandlerTest.java ########## @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.admin.internals; + +import org.apache.kafka.clients.admin.TransactionDescription; +import org.apache.kafka.clients.admin.internals.AdminApiHandler.ApiResult; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.DescribeTransactionsResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.DescribeTransactionsRequest; +import org.apache.kafka.common.requests.DescribeTransactionsResponse; +import org.apache.kafka.common.requests.FindCoordinatorRequest; +import org.apache.kafka.common.utils.LogContext; +import org.junit.jupiter.api.Test; + +import java.util.HashSet; +import java.util.Set; +import java.util.stream.Collectors; + +import static java.util.Arrays.asList; +import static java.util.Collections.emptyList; +import static java.util.Collections.emptyMap; +import static java.util.Collections.singletonList; +import static org.apache.kafka.common.utils.Utils.mkSet; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class DescribeTransactionsHandlerTest { + private final LogContext logContext = new LogContext(); + + @Test + public void testBuildRequest() { + String transactionalId1 = "foo"; + String transactionalId2 = "bar"; + String transactionalId3 = "baz"; + + Set<String> transactionalIds = mkSet(transactionalId1, transactionalId2, transactionalId3); + DescribeTransactionsHandler handler = new DescribeTransactionsHandler(transactionalIds, logContext); + + assertLookup(handler, transactionalIds); + assertLookup(handler, mkSet(transactionalId1)); + assertLookup(handler, mkSet(transactionalId2, transactionalId3)); + } + + @Test + public void testHandleSuccessfulResponse() { + int brokerId = 1; + String transactionalId1 = "foo"; + String transactionalId2 = "bar"; + + Set<String> transactionalIds = mkSet(transactionalId1, transactionalId2); + DescribeTransactionsHandler 
handler = new DescribeTransactionsHandler(transactionalIds, logContext); + + DescribeTransactionsResponseData.TransactionState transactionState1 = + sampleTransactionState1(transactionalId1); + DescribeTransactionsResponseData.TransactionState transactionState2 = + sampleTransactionState2(transactionalId2); + + Set<CoordinatorKey> keys = coordinatorKeys(transactionalIds); + DescribeTransactionsResponse response = new DescribeTransactionsResponse(new DescribeTransactionsResponseData() + .setTransactionStates(asList(transactionState1, transactionState2))); + + ApiResult<CoordinatorKey, TransactionDescription> result = handler.handleResponse( + brokerId, keys, response); + + assertEquals(keys, result.completedKeys.keySet()); + assertMatchingTransactionState(brokerId, transactionState1, + result.completedKeys.get(coordinatorKey(transactionalId1))); + assertMatchingTransactionState(brokerId, transactionState2, + result.completedKeys.get(coordinatorKey(transactionalId2))); + } + + @Test + public void testHandleErrorResponse() { + String transactionalId = "foo"; + Set<String> transactionalIds = mkSet(transactionalId); + DescribeTransactionsHandler handler = new DescribeTransactionsHandler(transactionalIds, logContext); + assertFatalError(handler, transactionalId, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED); + assertFatalError(handler, transactionalId, Errors.TRANSACTIONAL_ID_NOT_FOUND); + assertFatalError(handler, transactionalId, Errors.UNKNOWN_SERVER_ERROR); + assertRetriableError(handler, transactionalId, Errors.COORDINATOR_LOAD_IN_PROGRESS); + assertUnmappedKey(handler, transactionalId, Errors.NOT_COORDINATOR); + assertUnmappedKey(handler, transactionalId, Errors.COORDINATOR_NOT_AVAILABLE); + } + + private void assertFatalError( + DescribeTransactionsHandler handler, + String transactionalId, + Errors error + ) { + CoordinatorKey key = coordinatorKey(transactionalId); + ApiResult<CoordinatorKey, TransactionDescription> result = handleResponseError(handler, 
transactionalId, error); + assertEquals(emptyList(), result.unmappedKeys); + assertEquals(mkSet(key), result.failedKeys.keySet()); + + Throwable throwable = result.failedKeys.get(key); + assertTrue(error.exception().getClass().isInstance(throwable)); + } + + private void assertRetriableError( + DescribeTransactionsHandler handler, + String transactionalId, + Errors error + ) { + ApiResult<CoordinatorKey, TransactionDescription> result = handleResponseError(handler, transactionalId, error); + assertEquals(emptyList(), result.unmappedKeys); + assertEquals(emptyMap(), result.failedKeys); + } + + private void assertUnmappedKey( + DescribeTransactionsHandler handler, + String transactionalId, + Errors error + ) { + CoordinatorKey key = coordinatorKey(transactionalId); + ApiResult<CoordinatorKey, TransactionDescription> result = handleResponseError(handler, transactionalId, error); + assertEquals(emptyMap(), result.failedKeys); + assertEquals(singletonList(key), result.unmappedKeys); + } + + private ApiResult<CoordinatorKey, TransactionDescription> handleResponseError( + DescribeTransactionsHandler handler, + String transactionalId, + Errors error + ) { + int brokerId = 1; + + CoordinatorKey key = coordinatorKey(transactionalId); + Set<CoordinatorKey> keys = mkSet(key); + + DescribeTransactionsResponseData.TransactionState transactionState = new DescribeTransactionsResponseData.TransactionState() + .setErrorCode(error.code()) + .setTransactionalId(transactionalId); + + DescribeTransactionsResponse response = new DescribeTransactionsResponse(new DescribeTransactionsResponseData() + .setTransactionStates(singletonList(transactionState))); + + ApiResult<CoordinatorKey, TransactionDescription> result = handler.handleResponse(brokerId, keys, response); + assertEquals(emptyMap(), result.completedKeys); + return result; + } + + private void assertLookup( + DescribeTransactionsHandler handler, + Set<String> transactionalIds + ) { + Set<CoordinatorKey> keys = 
coordinatorKeys(transactionalIds); + DescribeTransactionsRequest.Builder request = handler.buildRequest(1, keys); + assertEquals(transactionalIds, new HashSet<>(request.data.transactionalIds())); + } + + private static CoordinatorKey coordinatorKey(String transactionalId) { + return new CoordinatorKey(transactionalId, FindCoordinatorRequest.CoordinatorType.TRANSACTION); + } Review comment: nit: We have the same helper methods in a few places. I wonder if we could have only one and use it everywhere? `CoordinatorKey` might be a good place (e.g. `CoordinatorKey#transactionalKey`). ########## File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java ########## @@ -5432,6 +5437,47 @@ public void testDescribeProducersRetryAfterDisconnect() throws Exception { } } + @Test + public void testDescribeTransactions() throws Exception { + try (AdminClientUnitTestEnv env = mockClientEnv()) { + String transactionalId = "foo"; + Node coordinator = env.cluster().nodes().iterator().next(); + TransactionDescription expected = new TransactionDescription( + coordinator.id(), TransactionState.COMPLETE_COMMIT, 12345L, + 15, 10000L, OptionalLong.empty(), emptySet()); + Review comment: nit: One empty line could be removed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org