aarti gupta created KAFKA-1657:
----------------------------------

             Summary: Fetch request using Simple consumer fails due to failed 
due to Leader not local for partition
                 Key: KAFKA-1657
                 URL: https://issues.apache.org/jira/browse/KAFKA-1657
             Project: Kafka
          Issue Type: Bug
    Affects Versions: 0.8.1.1
            Reporter: aarti gupta


I have a three node Kafka cluster, running on the same physical machine, (on 
different ports)
 with replication factor = 3, and a single topic with 3 partitions.
Multiple producers write to the topic, and a custom partitioner is used to 
direct messages to a given partition.

I use the simple consumer to read from a given partition of the topic, and have 
three instances of my consumer running

The code snippet for the simple consumer suggests, that having any node in the 
cluster, (not necessarily the leader for that partition) is sufficient to find 
the leader for the partition, however, on running this, I find, that given a 
different node in the cluster, a null pointer exception is thrown, and the logs 
show the error


[2014-09-28 20:40:20,984] WARN [KafkaApi-1] Fetch request with correlation id 0 
from client testClient on partition [VCCTask,1] failed due to Leader not local 
for partition [VCCTask,1] on broker 1 (kafka.server.KafkaApis)


bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic VCCTask
Topic:VCCTask   PartitionCount:3        ReplicationFactor:3     Configs:
        Topic: VCCTask  Partition: 0    Leader: 1       Replicas: 2,3,1 Isr: 
1,2,3
        Topic: VCCTask  Partition: 1    Leader: 1       Replicas: 3,1,2 Isr: 
1,2,3
        Topic: VCCTask  Partition: 2    Leader: 1       Replicas: 1,2,3 Isr: 
1,2,3


If i specify the leader for the partition, instead of any node in the cluster, 
everything works great, but this is an operational nightmare.

I was able to reproduce this using a simple test, where a producer writes 
numbers from 1 to 999999, and the consumers, consume from a specific partition.

Here are the code snippets

public class TestConsumerStoreOffsetZookeeper {


    public static void main(String[] args) throws JSONException {

        TestConsumerStoreOffsetZookeeper testConsumer = new 
TestConsumerStoreOffsetZookeeper();
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("topicName", "VCCTask");
        jsonObject.put("clientName", "testClient");
        jsonObject.put("partition", args[0]);
        jsonObject.put("hostPort", "172.16.78.171");
        jsonObject.put("znodeName", "VCCTask");
        jsonObject.put("port", args[1]);
        testConsumer.initialize(jsonObject);
        final long startTime = System.currentTimeMillis();
        testConsumer.startReceiving(new FutureCallback<byte[]>() {
            int noOfMessagesConsumed= 0;
            @Override
            public void onSuccess(byte[] result) {
                LOG.info("YES!! " + ByteBuffer.wrap(result).getLong());
                ++noOfMessagesConsumed;
                LOG.info("# Messages consumed "+ noOfMessagesConsumed +" Time 
elapsed"+ (System.currentTimeMillis()-startTime )/1000 +" seconds");
            }

            @Override
            public void onFailure(Throwable t) {
                LOG.info("NO!! " + t.fillInStackTrace().getMessage());
            }
        });
    }


    private String topicToRead;
    private static Logger LOG = 
Logger.getLogger("TestConsumerStoreOffsetZookeeper");
    List<String> seedBrokers = Lists.newArrayList("localhost");
    private int port;
    private SimpleConsumer consumer;
    Integer partition;
    String clientName;
    private Broker currentLeader;
    private String counter;
    CuratorFramework zooKeeper;


    public void startReceiving(final FutureCallback<byte[]> futureCallback) {

        findLeaderAndUpdateSelfPointers(seedBrokers, port, topicToRead, 
partition);
        LOG.info("Kafka consumer delegate listening on topic " + topicToRead + 
" and partition " + partition);
        int numErrors = 0;
        while (true) {
            long latestOffset = 0;
            Stat stat = null;
            final String path = "/" + topicToRead + "/"+partition;
            try {
                //************************Read top of the
                stat = zooKeeper.checkExists().forPath(path);
                if (stat == null) {
                    latestOffset = 
getLastOffsetFromBeginningOfStream(this.consumer, topicToRead, partition, 
OffsetRequest.EarliestTime(), clientName);
                    byte b[] = new byte[8];
                    ByteBuffer byteBuffer = ByteBuffer.wrap(b);
                    byteBuffer.putLong(latestOffset);
                    final String s = 
zooKeeper.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(path);
                    LOG.info(" Zookeeper create string is "+ s);
                    stat = zooKeeper.checkExists().forPath(path);
                    if (stat == null) {
                        LOG.info("Stat was null");
                        throw new RuntimeException("Stat in zookeeper was null, 
cannot continue as message stream cannot be persisted");
                    }
                } else {
                    final byte[] data = 
zooKeeper.getData().storingStatIn(stat).forPath(path);
                    if(data.length>0){
                    latestOffset = ByteBuffer.wrap(data).getLong();
                    }else {
                        latestOffset = 
getLastOffsetFromBeginningOfStream(this.consumer,topicToRead,partition,OffsetRequest.EarliestTime(),clientName);
                    }


                }
            } catch (Exception e) {
                throw new RuntimeException(e.fillInStackTrace().getMessage());
            }
            LOG.info("Topic name is " + topicToRead);
            LOG.info("Last offset is " + latestOffset);
            LOG.info("Constructing new fetch request on  " + topicToRead + " 
from offset" + latestOffset);
            FetchRequest request = new 
FetchRequestBuilder().clientId(clientName).addFetch(topicToRead, partition, 
latestOffset, 100000).build();
            FetchResponse fetchResponse = consumer.fetch(request);
            if (fetchResponse.hasError()) {
                numErrors++;
                final short code = fetchResponse.errorCode(topicToRead, 
partition);
                LOG.info("Error fetching data from broker: " + consumer.host() 
+ " Reason " + code);
                if (code == ErrorMapping.OffsetOutOfRangeCode()) {
                    LOG.info("Offset out of range error: calculating offset 
again");
                    throw new RuntimeException("Offset is out of range, 
multiple consumers are not allowed, this consumer will exit");
                }
                if (numErrors > 5 && code!=3) {
                    consumer.close();
                    consumer = null;
                    findLeaderAndUpdateSelfPointers(seedBrokers, port, 
topicToRead, partition);
                    numErrors = 0;
                }
                continue;
            }
            final ByteBufferMessageSet messageAndOffsets = 
fetchResponse.messageSet(topicToRead, partition);
            final int validBytes = messageAndOffsets.validBytes();
            LOG.info("Received fetch response on topic  " + topicToRead + " 
from offset" + latestOffset + " fetch response valid bytes is " + validBytes);
            try {
                if (validBytes == 0) {
                    LOG.info("No message received");
                    //Don't keep hammering Kafka
                    Thread.sleep(1000);
                    continue;
                }
                for (MessageAndOffset messageAndOffset : messageAndOffsets) {
                    LOG.info("Processing offset");
                    final long currentOffset = messageAndOffset.offset();
                    LOG.info("Processing offset " + currentOffset);
                    //in case of compression entire block may be received
                    if (currentOffset < latestOffset) {
                        LOG.info("Found an old offset: " + currentOffset + 
"Expecting:" + latestOffset);
                        continue;
                    }
                    final ByteBuffer payload = 
messageAndOffset.message().payload();
                    byte[] bytes = new byte[payload.limit()];
                    payload.get(bytes);
                    LOG.info(this.getClass().getName() + " Received message 
from offset" + String.valueOf(latestOffset) + new String(bytes, "UTF-8"));
                    LOG.info(this.getClass().getName() + " Executing future 
callback");
                    //TODO ***************this should be atomic with writing 
job in db***********************
                    futureCallback.onSuccess(bytes);
                    try {
                        long nextOffset = messageAndOffset.nextOffset();
                        incrementOffset(nextOffset, stat, path);
                    } catch (KeeperException | InterruptedException e) {
                        LOG.info("Encountered exception in writing to" + 
e.fillInStackTrace().getMessage());
                    }
                    
//****************************************************************************************
                }
                LOG.info("Outside for loop");
            } catch (Exception e1) {
                LOG.info("Error in processing message or running callback " + 
e1.getMessage());
                futureCallback.onFailure(e1);
                throw new RuntimeException(e1);
            }
        }

    }

    private void incrementOffset(long nextOffset, Stat stat, String path) 
throws Exception {
        if (stat == null) {
            throw new RuntimeException("Given stat was null");
        }
        byte b[] = new byte[8];
        ByteBuffer byteBuffer = ByteBuffer.wrap(b);
        byteBuffer.putLong(nextOffset);
        LOG.info("Offset consumed successfully: Setting offset in zookeeper as 
next offset: "+ nextOffset);
        final Stat statWrite = zooKeeper.setData().forPath(path, b);
        if(statWrite.getDataLength() ==0){
            throw new RuntimeException("Unable to save offset in zookeeper");
        }

    }


    //TODO: agupta adapters should not have an initialize method, rename and 
merge with startListening
    public void initialize(JSONObject configData) {
        try {
            final String hostPort = configData.getString("hostPort");
            zooKeeper = CuratorFrameworkFactory.newClient(hostPort,new 
ExponentialBackoffRetry(10, 3000));
            zooKeeper.start();
            this.counter = configData.getString("znodeName");
            this.topicToRead = configData.getString("topicName");
            LOG.info("Topic name is " + topicToRead);
            //TODO: agupta: read seedbrokers from zookeeper
            //*ZkClient zkClient = new ZkClient("localhost:2108", 4000, 6000, 
new BytesPushThroughSerializer());
            //List<String> brokerList = zkClient.getChildren("/brokers/ips");
            List<String> seedBrokers = Lists.newArrayList("localhost");
            this.seedBrokers = seedBrokers;
            this.port = configData.getInt("port");
            this.partition= configData.getInt("partition");
            this.clientName = configData.getString("clientName");
            LOG.info("Finding leader with for partition " + partition + " 
clientName " + clientName);
        } catch (JSONException | IOException e) {
            e.printStackTrace();
            LOG.info("Error parsing configuration" + e.getMessage());
        } catch (Exception e) {
            LOG.info("Error starting zookeeper" + e.getMessage());

        }
    }


    /**
     * Find last offset to define where to start reading if this is the first 
read
     *
     * @param consumer
     * @param topic
     * @param partition
     * @param whichTime
     * @param clientName
     * @return
     */

    public static long getLastOffsetFromBeginningOfStream(SimpleConsumer 
consumer, String topic, int partition,
                                                          long whichTime, 
String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, 
partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new 
HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(topicAndPartition, new 
PartitionOffsetRequestInfo(whichTime, 1));
        kafka.javaapi.OffsetRequest request = new 
kafka.javaapi.OffsetRequest(requestInfo, 
kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);
        if (response.hasError()) {
            System.out.println("Error fetching data Offset Data the Broker. 
Reason: " + response.errorCode(topic, partition));
            return 0;
        }
        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }


    /**
     * Return the lead broker for a given topic and partition
     *
     * @param seedBrokers
     * @param port
     * @param topic
     * @param partition
     * @return
     */
    private PartitionMetadata findLeaderAndUpdateSelfPointers(List<String> 
seedBrokers, int port, String topic, int partition) {
        PartitionMetadata returnMetaData = null;
        loop:
        for (String seed : seedBrokers) {
            SimpleConsumer consumer = null;
            try {
                this.consumer = new SimpleConsumer(seed, port, 100000, 64 * 
1024, "leaderLookup");

                List<String> topics = Collections.singletonList(topic);
                TopicMetadataRequest req = new TopicMetadataRequest(topics);
                kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

                List<TopicMetadata> metaData = resp.topicsMetadata();

                for (TopicMetadata item : metaData) {
                    for (PartitionMetadata part : item.partitionsMetadata()) {
                        if (part.partitionId() == partition) {
                            returnMetaData = part;
                            LOG.info("Found leader " + 
returnMetaData.leader().host());
                            break loop;
                        }
                    }
                }
            } catch (Exception e) {
                LOG.info("Error communicating with Broker [" + seed + "] to 
find Leader for [" + topic
                        + ", " + partition + "] Reason: " + e);
            } finally {
                if (consumer != null) consumer.close();
            }
        }
        LOG.info("KafkaConsumerDelegate initializing self pointers ");
        if (returnMetaData != null) {
            currentLeader = returnMetaData.leader();
            if (currentLeader != null) {
                this.consumer = new SimpleConsumer(currentLeader.host(), 
currentLeader.port(), 100000, 64 * 1024, clientName);
            }
        }
        LOG.info("KafkaConsumerDelegate: returning metadata");
        return returnMetaData;
    }



*******************************






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to