Hello!
I'm using Kafka 0.9.1. Suppose I have created a topic "my-topic" with
1 partition. The code below assigns the consumer to partition 6, which does
not exist. With it, I get a StaleMetadataException in the Fetcher.listOffset
method, and the thread blocks forever in a while (true) loop.
I ran into this by mistake, so what should be done in such cases? Should the
client check in ZooKeeper that the partition really exists and throw an
exception if it does not, or should it retry up to a maximum number of
attempts (or until a timeout expires) and then leave the loop?
I think a JIRA should be raised.
Regards,
Florin
import java.util.Arrays;
import java.util.Properties;
import java.util.UUID;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class BadAssignedPartitionKConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test");
        props.put("client.id", UUID.randomUUID().toString());
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        String topic = "my-topic";

        // "my-topic" has only partition 0, so partition 6 does not exist.
        consumer.assign(Arrays.asList(new TopicPartition(topic, 6)));
        // This call never returns.
        consumer.position(new TopicPartition(topic, 6));
        consumer.close();
    }
}
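As a client-side workaround, the partition could be validated before calling assign(). The following is only a minimal sketch: PartitionCheck and requireKnownPartition are hypothetical names, and the list of known partition ids is assumed to have been collected by the caller, for example from the PartitionInfo entries returned by consumer.partitionsFor(topic). It does not touch Kafka itself, so it only shows the shape of the check.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of the Kafka client.
final class PartitionCheck {

    // Throws if the requested partition is not among the ids the caller
    // has obtained for the topic (assumed to come from cluster metadata).
    static void requireKnownPartition(String topic, int partition,
                                      List<Integer> knownPartitions) {
        if (knownPartitions == null || !knownPartitions.contains(partition)) {
            throw new IllegalArgumentException(
                "Partition " + partition + " of topic " + topic
                + " is not in the known partitions " + knownPartitions);
        }
    }

    public static void main(String[] args) {
        // "my-topic" was created with a single partition, id 0.
        List<Integer> known = Arrays.asList(0);
        requireKnownPartition("my-topic", 0, known); // passes
        requireKnownPartition("my-topic", 6, known); // throws
    }
}
```

Failing fast like this avoids reaching position() with a partition the cluster does not know about, but it does not help if the metadata changes after the check.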
Here is the Fetcher's method
private long listOffset(TopicPartition partition, long timestamp) {
    while (true) { // infinite loop
        RequestFuture<Long> future = sendListOffsetRequest(partition, timestamp);
        client.poll(future);

        if (future.succeeded())
            return future.value();

        if (!future.isRetriable())
            throw future.exception();

        // Here I get the StaleMetadataException, and we never escape the loop.
        if (future.exception() instanceof InvalidMetadataException)
            client.awaitMetadataUpdate();
        else
            time.sleep(retryBackoffMs);
    }
}
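To illustrate the timeout option raised in the question, here is a rough sketch of a retry loop that gives up once a deadline passes. It is not the Fetcher code and not a proposed patch: BoundedRetry, retryUntil, timeoutMs and backoffMs are made-up names, every RuntimeException is treated as retriable for simplicity, and the failing request is simulated with a plain Supplier.

```java
import java.util.function.Supplier;

// Hypothetical helper, not part of Kafka: retries an action until it
// succeeds or the timeout expires, instead of looping forever.
final class BoundedRetry {

    static <T> T retryUntil(Supplier<T> action, long timeoutMs, long backoffMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (true) {
            try {
                return action.get();
            } catch (RuntimeException e) {
                // In this sketch every RuntimeException counts as retriable.
                long remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0) {
                    throw new IllegalStateException(
                        "gave up after " + timeoutMs + " ms", e);
                }
                sleep(Math.min(backoffMs, remaining));
            }
        }
    }

    private static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException ie) {
            // Restore the interrupt flag and stop retrying.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while retrying", ie);
        }
    }

    public static void main(String[] args) {
        // Simulates a request that keeps failing with stale metadata.
        try {
            BoundedRetry.<Long>retryUntil(() -> {
                throw new RuntimeException("stale metadata");
            }, 200L, 50L);
        } catch (IllegalStateException e) {
            System.out.println("Escaped the loop: " + e.getMessage());
        }
    }
}
```

With a bound like this, a lookup for a partition that never appears in the metadata would surface as an exception to the caller rather than blocking the thread indefinitely.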