I am having a strange interaction with the simple consumer API in Kafka 0.8.2.

Running the following code:


public FetchedMessageList send(String topic, int partition, long offset, int fetchSize) throws KafkaException {
        try {
            FetchedMessageList results = new FetchedMessageList();
            FetchRequest req = new FetchRequestBuilder()
                    .clientId(clientId)
                    .addFetch(topic, partition, offset, fetchSize)
                    .minBytes(1)
                    .maxWait(250)
                    .build();
            FetchResponse resp = consumer.fetch(req);
            if (resp.hasError()) {
                int code = resp.errorCode(topic, partition);
                if (code == 9) {
                    logger.warn("Fetch response contained error code: {}", resp.errorCode(topic, partition));
                } else {
                    throw new RuntimeException(String.format("Fetch response contained error code %d", code));
                }
            }
            }

Following a leader election, the FetchResponse contains an error and takes the
(code == 9) branch, so the program emits a message via logger.warn.
However, the emitted log message reads “Fetch response contained error code: 6”.
I looked over the Kafka source code and don’t see how resp.errorCode could
return a different value on the second call.
Has anyone seen this before?
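
To narrow this down, here is a minimal sketch of logging the value that was
already read into code, together with a symbolic name, rather than calling
resp.errorCode a second time. FetchErrorNames and describe are hypothetical
names, not part of Kafka. The numeric values are the ones I understand
kafka.common.ErrorMapping to use in 0.8.x, and that mapping is an assumption
that should be checked against the source.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper (not part of Kafka): turns a numeric fetch error code
// into a readable string for log messages.
final class FetchErrorNames {
    // Assumed subset of the kafka.common.ErrorMapping codes in 0.8.x.
    private static final Map<Integer, String> NAMES = new HashMap<Integer, String>();

    static {
        NAMES.put(0, "NoError");
        NAMES.put(5, "LeaderNotAvailable");
        NAMES.put(6, "NotLeaderForPartition");
        NAMES.put(9, "ReplicaNotAvailable");
    }

    private FetchErrorNames() {
    }

    // Returns e.g. "6 (NotLeaderForPartition)"; codes missing from the table
    // are reported as "unrecognized" instead of failing.
    static String describe(int code) {
        String name = NAMES.get(code);
        return code + " (" + (name != null ? name : "unrecognized") + ")";
    }
}
```

In the method above, the warn call would then become
logger.warn("Fetch response contained error code: {}", FetchErrorNames.describe(code));
so the logged value is the same one the (code == 9) test saw.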

Thanks,

Matt Coolbeth
