[
https://issues.apache.org/jira/browse/KAFKA-7296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16582798#comment-16582798
]
ASF GitHub Bot commented on KAFKA-7296:
---------------------------------------
hachikuji closed pull request #5514: KAFKA-7296; Handle coordinator loading
error in TxnOffsetCommit
URL: https://github.com/apache/kafka/pull/5514
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it would not otherwise be shown due to GitHub's handling of merged fork PRs):
diff --git
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index b242d5a65a6..c0685c9cb2d 100644
---
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -1272,50 +1272,49 @@ String coordinatorKey() {
public void handleResponse(AbstractResponse response) {
TxnOffsetCommitResponse txnOffsetCommitResponse =
(TxnOffsetCommitResponse) response;
boolean coordinatorReloaded = false;
- boolean hadFailure = false;
Map<TopicPartition, Errors> errors =
txnOffsetCommitResponse.errors();
+ log.debug("Received TxnOffsetCommit response for consumer group
{}: {}", builder.consumerGroupId(),
+ errors);
+
for (Map.Entry<TopicPartition, Errors> entry : errors.entrySet()) {
TopicPartition topicPartition = entry.getKey();
Errors error = entry.getValue();
if (error == Errors.NONE) {
- log.debug("Successfully added offsets {} from consumer
group {} to transaction.",
- builder.offsets(), builder.consumerGroupId());
pendingTxnOffsetCommits.remove(topicPartition);
} else if (error == Errors.COORDINATOR_NOT_AVAILABLE
|| error == Errors.NOT_COORDINATOR
|| error == Errors.REQUEST_TIMED_OUT) {
- hadFailure = true;
if (!coordinatorReloaded) {
coordinatorReloaded = true;
lookupCoordinator(FindCoordinatorRequest.CoordinatorType.GROUP,
builder.consumerGroupId());
}
- } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
- hadFailure = true;
+ } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION
+ || error == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
+ // If the topic is unknown or the coordinator is loading,
retry with the current coordinator
+ continue;
} else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
abortableError(new
GroupAuthorizationException(builder.consumerGroupId()));
- return;
+ break;
} else if (error ==
Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED
|| error == Errors.INVALID_PRODUCER_EPOCH
|| error == Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT) {
fatalError(error.exception());
- return;
+ break;
} else {
fatalError(new KafkaException("Unexpected error in
TxnOffsetCommitResponse: " + error.message()));
- return;
+ break;
}
}
- if (!hadFailure || !result.isSuccessful()) {
- // all attempted partitions were either successful, or there
was a fatal failure.
- // either way, we are not retrying, so complete the request.
+ if (result.isCompleted()) {
+ pendingTxnOffsetCommits.clear();
+ } else if (pendingTxnOffsetCommits.isEmpty()) {
result.done();
- return;
- }
-
- // retry the commits which failed with a retriable error.
- if (!pendingTxnOffsetCommits.isEmpty())
+ } else {
+ // Retry the commits which failed with a retriable error
reenqueue();
+ }
}
}
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 550d003406b..606fa714a34 100644
---
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -118,7 +118,6 @@ public void setup() {
Map<String, String> metricTags = new LinkedHashMap<>();
metricTags.put("client-id", CLIENT_ID);
int batchSize = 16 * 1024;
- int requestTimeoutMs = 1500;
long deliveryTimeoutMs = 3000L;
long totalSize = 1024 * 1024;
String metricGrpName = "producer-metrics";
@@ -129,7 +128,9 @@ public void setup() {
Metrics metrics = new Metrics(metricConfig, time);
SenderMetricsRegistry senderMetrics = new
SenderMetricsRegistry(metrics);
- this.accumulator = new RecordAccumulator(logContext, batchSize,
CompressionType.NONE, 0L, 0L, deliveryTimeoutMs, metrics, metricGrpName, time,
apiVersions, transactionManager, new BufferPool(totalSize, batchSize, metrics,
time, metricGrpName));
+ this.accumulator = new RecordAccumulator(logContext, batchSize,
CompressionType.NONE, 0L, 0L,
+ deliveryTimeoutMs, metrics, metricGrpName, time, apiVersions,
transactionManager,
+ new BufferPool(totalSize, batchSize, metrics, time,
metricGrpName));
this.sender = new Sender(logContext, this.client, this.metadata,
this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL,
MAX_RETRIES, senderMetrics, this.time, REQUEST_TIMEOUT, 50,
transactionManager, apiVersions);
this.metadata.update(this.cluster, Collections.<String>emptySet(),
time.milliseconds());
@@ -920,13 +921,13 @@ public void
testGroupAuthorizationFailureInTxnOffsetCommit() {
final String consumerGroupId = "consumer";
final long pid = 13131L;
final short epoch = 1;
- final TopicPartition tp = new TopicPartition("foo", 0);
+ final TopicPartition tp1 = new TopicPartition("foo", 0);
doInitTransactions(pid, epoch);
transactionManager.beginTransaction();
TransactionalRequestResult sendOffsetsResult =
transactionManager.sendOffsetsToTransaction(
- singletonMap(tp, new OffsetAndMetadata(39L)), consumerGroupId);
+ singletonMap(tp1, new OffsetAndMetadata(39L)),
consumerGroupId);
prepareAddOffsetsToTxnResponse(Errors.NONE, consumerGroupId, pid,
epoch);
sender.run(time.milliseconds()); // AddOffsetsToTxn Handled,
TxnOffsetCommit Enqueued
@@ -935,7 +936,7 @@ public void
testGroupAuthorizationFailureInTxnOffsetCommit() {
prepareFindCoordinatorResponse(Errors.NONE, false,
CoordinatorType.GROUP, consumerGroupId);
sender.run(time.milliseconds()); // FindCoordinator Returned
- prepareTxnOffsetCommitResponse(consumerGroupId, pid, epoch,
singletonMap(tp, Errors.GROUP_AUTHORIZATION_FAILED));
+ prepareTxnOffsetCommitResponse(consumerGroupId, pid, epoch,
singletonMap(tp1, Errors.GROUP_AUTHORIZATION_FAILED));
sender.run(time.milliseconds()); // TxnOffsetCommit Handled
assertTrue(transactionManager.hasError());
@@ -943,6 +944,7 @@ public void
testGroupAuthorizationFailureInTxnOffsetCommit() {
assertTrue(sendOffsetsResult.isCompleted());
assertFalse(sendOffsetsResult.isSuccessful());
assertTrue(sendOffsetsResult.error() instanceof
GroupAuthorizationException);
+ assertFalse(transactionManager.hasPendingOffsetCommits());
GroupAuthorizationException exception = (GroupAuthorizationException)
sendOffsetsResult.error();
assertEquals(consumerGroupId, exception.groupId());
@@ -1753,7 +1755,16 @@ public void
testHandlingOfUnknownTopicPartitionErrorOnAddPartitions() throws Int
}
@Test
- public void testHandlingOfUnknownTopicPartitionErrorOnTxnOffsetCommit()
throws InterruptedException {
+ public void testHandlingOfUnknownTopicPartitionErrorOnTxnOffsetCommit() {
+ testRetriableErrorInTxnOffsetCommit(Errors.UNKNOWN_TOPIC_OR_PARTITION);
+ }
+
+ @Test
+ public void testHandlingOfCoordinatorLoadingErrorOnTxnOffsetCommit() {
+
testRetriableErrorInTxnOffsetCommit(Errors.COORDINATOR_LOAD_IN_PROGRESS);
+ }
+
+ private void testRetriableErrorInTxnOffsetCommit(Errors error) {
final long pid = 13131L;
final short epoch = 1;
@@ -1762,6 +1773,7 @@ public void
testHandlingOfUnknownTopicPartitionErrorOnTxnOffsetCommit() throws I
transactionManager.beginTransaction();
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+ offsets.put(tp0, new OffsetAndMetadata(1));
offsets.put(tp1, new OffsetAndMetadata(1));
final String consumerGroupId = "myconsumergroup";
@@ -1773,12 +1785,13 @@ public void
testHandlingOfUnknownTopicPartitionErrorOnTxnOffsetCommit() throws I
assertFalse(addOffsetsResult.isCompleted()); // The request should
complete only after the TxnOffsetCommit completes.
Map<TopicPartition, Errors> txnOffsetCommitResponse = new HashMap<>();
- txnOffsetCommitResponse.put(tp1, Errors.UNKNOWN_TOPIC_OR_PARTITION);
+ txnOffsetCommitResponse.put(tp0, Errors.NONE);
+ txnOffsetCommitResponse.put(tp1, error);
prepareFindCoordinatorResponse(Errors.NONE, false,
CoordinatorType.GROUP, consumerGroupId);
prepareTxnOffsetCommitResponse(consumerGroupId, pid, epoch,
txnOffsetCommitResponse);
- assertEquals(null,
transactionManager.coordinator(CoordinatorType.GROUP));
+ assertNull(transactionManager.coordinator(CoordinatorType.GROUP));
sender.run(time.milliseconds()); // try to send
TxnOffsetCommitRequest, but find we don't have a group coordinator.
sender.run(time.milliseconds()); // send find coordinator for group
request
assertNotNull(transactionManager.coordinator(CoordinatorType.GROUP));
@@ -1803,7 +1816,7 @@ public void
shouldNotAddPartitionsToTransactionWhenTopicAuthorizationFailed() th
}
@Test
- public void
shouldNotSendAbortTxnRequestWhenOnlyAddPartitionsRequestFailed() throws
Exception {
+ public void
shouldNotSendAbortTxnRequestWhenOnlyAddPartitionsRequestFailed() {
final long pid = 13131L;
final short epoch = 1;
@@ -2399,7 +2412,9 @@ public boolean matches(AbstractRequest body) {
};
}
- private void prepareAddOffsetsToTxnResponse(Errors error, final String
consumerGroupId, final long producerId,
+ private void prepareAddOffsetsToTxnResponse(final Errors error,
+ final String consumerGroupId,
+ final long producerId,
final short producerEpoch) {
client.prepareResponse(new MockClient.RequestMatcher() {
@Override
@@ -2449,7 +2464,7 @@ private void doInitTransactions(long pid, short epoch) {
private void assertAbortableError(Class<? extends RuntimeException> cause)
{
try {
- transactionManager.beginTransaction();
+ transactionManager.beginCommit();
fail("Should have raised " + cause.getSimpleName());
} catch (KafkaException e) {
assertTrue(cause.isAssignableFrom(e.getCause().getClass()));
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Producer should handle COORDINATOR_LOADING error in TxnOffsetCommit
> -------------------------------------------------------------------
>
> Key: KAFKA-7296
> URL: https://issues.apache.org/jira/browse/KAFKA-7296
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 2.0.0
> Reporter: Jason Gustafson
> Assignee: Jason Gustafson
> Priority: Major
> Fix For: 2.0.1
>
>
> The producer should check for the COORDINATOR_LOADING error when handling the
> TxnOffsetCommit response. We fixed the logic in GroupCoordinator to correctly
> check for offset loading in https://github.com/apache/kafka/pull/4788. Prior
> to this fix, it was not possible to see this error, so we were able to get
> away with not having the check.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)