[jira] [Resolved] (KAFKA-6014) new consumer mirror maker halts after committing offsets to a deleted topic

2023-02-24 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6014?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-6014.

Resolution: Fixed

> new consumer mirror maker halts after committing offsets to a deleted topic
> ---
>
> Key: KAFKA-6014
> URL: https://issues.apache.org/jira/browse/KAFKA-6014
> Project: Kafka
>  Issue Type: Bug
>Reporter: Onur Karaman
>Assignee: Jason Gustafson
>Priority: Major
>
> New consumer throws an unexpected KafkaException when trying to commit to a 
> topic that has been deleted. MirrorMaker.commitOffsets doesn't attempt to 
> catch the KafkaException and just kills the process. We didn't see this in 
> the old consumer because old consumer just silently drops failed offset 
> commits.
> I ran a quick experiment locally to prove the behavior. The experiment:
> 1. start up a single broker
> 2. create a single-partition topic t
> 3. create a new consumer that consumes topic t
> 4. make the consumer commit every few seconds
> 5. delete topic t
> 6. expect: KafkaException that kills the process.
> Here's my script:
> {code}
> package org.apache.kafka.clients.consumer;
> import org.apache.kafka.common.TopicPartition;
> import java.util.Collections;
> import java.util.List;
> import java.util.Properties;
> public class OffsetCommitTopicDeletionTest {
> public static void main(String[] args) throws InterruptedException {
> Properties props = new Properties();
> props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
> "localhost:9090");
> props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "g");
> props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
> "org.apache.kafka.common.serialization.ByteArrayDeserializer");
> props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
> "org.apache.kafka.common.serialization.ByteArrayDeserializer");
> props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
> KafkaConsumer kafkaConsumer = new 
> KafkaConsumer<>(props);
> TopicPartition partition = new TopicPartition("t", 0);
> List partitions = 
> Collections.singletonList(partition);
> kafkaConsumer.assign(partitions);
> while (true) {
> kafkaConsumer.commitSync(Collections.singletonMap(partition, new 
> OffsetAndMetadata(0, "")));
> Thread.sleep(1000);
> }
> }
> }
> {code}
> Here are the other commands:
> {code}
> > rm -rf /tmp/zookeeper/ /tmp/kafka-logs* logs*
> > ./gradlew clean jar
> > ./bin/zookeeper-server-start.sh config/zookeeper.properties
> > export LOG_DIR=logs0 && ./bin/kafka-server-start.sh 
> > config/server0.properties
> > ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic t 
> > --partitions 1 --replication-factor 1
> > ./bin/kafka-run-class.sh 
> > org.apache.kafka.clients.consumer.OffsetCommitTopicDeletionTest
> > ./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic t
> {code}
> Here is the output:
> {code}
> [2017-10-04 20:00:14,451] ERROR [Consumer clientId=consumer-1, groupId=g] 
> Offset commit failed on partition t-0 at offset 0: This server does not host 
> this topic-partition. 
> (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
> Exception in thread "main" org.apache.kafka.common.KafkaException: Partition 
> t-0 may not exist or user may not have Describe access to topic
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:789)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:734)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:808)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:788)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:506)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:353)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:268)
>   at 
> 

[jira] [Resolved] (KAFKA-6014) new consumer mirror maker halts after committing offsets to a deleted topic

2018-07-25 Thread Jason Gustafson (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6014?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-6014.

Resolution: Resolved

We changed the behavior of the consumer in KAFKA-6829 so that unknown topic 
errors are retriable in order to be consistent with how the error is handled 
everywhere else and because there are cases where this error may be based on 
stale metadata. That should resolve this issue, but someone can reopen if 
necessary.

> new consumer mirror maker halts after committing offsets to a deleted topic
> ---
>
> Key: KAFKA-6014
> URL: https://issues.apache.org/jira/browse/KAFKA-6014
> Project: Kafka
>  Issue Type: Bug
>Reporter: Onur Karaman
>Assignee: Jason Gustafson
>Priority: Major
>
> New consumer throws an unexpected KafkaException when trying to commit to a 
> topic that has been deleted. MirrorMaker.commitOffsets doesn't attempt to 
> catch the KafkaException and just kills the process. We didn't see this in 
> the old consumer because old consumer just silently drops failed offset 
> commits.
> I ran a quick experiment locally to prove the behavior. The experiment:
> 1. start up a single broker
> 2. create a single-partition topic t
> 3. create a new consumer that consumes topic t
> 4. make the consumer commit every few seconds
> 5. delete topic t
> 6. expect: KafkaException that kills the process.
> Here's my script:
> {code}
> package org.apache.kafka.clients.consumer;
> import org.apache.kafka.common.TopicPartition;
> import java.util.Collections;
> import java.util.List;
> import java.util.Properties;
> public class OffsetCommitTopicDeletionTest {
> public static void main(String[] args) throws InterruptedException {
> Properties props = new Properties();
> props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
> "localhost:9090");
> props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "g");
> props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
> "org.apache.kafka.common.serialization.ByteArrayDeserializer");
> props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
> "org.apache.kafka.common.serialization.ByteArrayDeserializer");
> props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
> KafkaConsumer kafkaConsumer = new 
> KafkaConsumer<>(props);
> TopicPartition partition = new TopicPartition("t", 0);
> List partitions = 
> Collections.singletonList(partition);
> kafkaConsumer.assign(partitions);
> while (true) {
> kafkaConsumer.commitSync(Collections.singletonMap(partition, new 
> OffsetAndMetadata(0, "")));
> Thread.sleep(1000);
> }
> }
> }
> {code}
> Here are the other commands:
> {code}
> > rm -rf /tmp/zookeeper/ /tmp/kafka-logs* logs*
> > ./gradlew clean jar
> > ./bin/zookeeper-server-start.sh config/zookeeper.properties
> > export LOG_DIR=logs0 && ./bin/kafka-server-start.sh 
> > config/server0.properties
> > ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic t 
> > --partitions 1 --replication-factor 1
> > ./bin/kafka-run-class.sh 
> > org.apache.kafka.clients.consumer.OffsetCommitTopicDeletionTest
> > ./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic t
> {code}
> Here is the output:
> {code}
> [2017-10-04 20:00:14,451] ERROR [Consumer clientId=consumer-1, groupId=g] 
> Offset commit failed on partition t-0 at offset 0: This server does not host 
> this topic-partition. 
> (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
> Exception in thread "main" org.apache.kafka.common.KafkaException: Partition 
> t-0 may not exist or user may not have Describe access to topic
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:789)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:734)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:808)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:788)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:506)
>   at 
>