Pengwei created KAFKA-2995:
------------------------------

             Summary: In 0.9.0.0, the old consumer's commitOffsets with specified 
partitions can commit nonexistent topics and partitions to ZooKeeper
                 Key: KAFKA-2995
                 URL: https://issues.apache.org/jira/browse/KAFKA-2995
             Project: Kafka
          Issue Type: Bug
          Components: consumer
    Affects Versions: 0.9.0.0
            Reporter: Pengwei
            Assignee: Neha Narkhede
             Fix For: 0.9.1.0


In version 0.9.0.0, the old consumer's commit interface is as follows:

def commitOffsets(offsetsToCommit: immutable.Map[TopicAndPartition, OffsetAndMetadata], isAutoCommit: Boolean) {
  trace("OffsetMap: %s".format(offsetsToCommit))
  // no retries for commits from auto-commit
  var retriesRemaining = 1 + (if (isAutoCommit) 0 else config.offsetsCommitMaxRetries)
  var done = false
  while (!done) {
    val committed = offsetsChannelLock synchronized {
      // committed when we receive either no error codes or only MetadataTooLarge errors
      if (offsetsToCommit.size > 0) {
        if (config.offsetsStorage == "zookeeper") {
          offsetsToCommit.foreach { case (topicAndPartition, offsetAndMetadata) =>
            commitOffsetToZooKeeper(topicAndPartition, offsetAndMetadata.offset)
          }
      ........

This interface does not validate the offsetsToCommit parameter. If 
offsetsToCommit contains a topic or partition that does not exist in Kafka, 
the commit still creates an entry under the 
/consumers/[group]/offsets/[nonexistent topic] directory in ZooKeeper.
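
To make the effect concrete, here is a minimal sketch of how such an entry's 
path could be derived. TopicAndPartition below is a local stand-in for the 
Kafka class, and OffsetPaths.offsetPath is a hypothetical helper, not part of 
the consumer. The trailing partition segment is an assumption based on the 
usual old-consumer ZooKeeper layout.

```scala
// Local stand-in for kafka.common.TopicAndPartition so the sketch compiles on its own.
case class TopicAndPartition(topic: String, partition: Int)

object OffsetPaths {
  // Hypothetical helper: builds the ZooKeeper path an offset commit would write to.
  // Nothing here checks that the topic or partition actually exists, which is
  // the behavior this report describes.
  def offsetPath(group: String, tp: TopicAndPartition): String =
    s"/consumers/$group/offsets/${tp.topic}/${tp.partition}"
}
```

Under these assumptions, committing an offset for a topic named "ghost" that 
was never created would still target a path under 
/consumers/[group]/offsets/ghost.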

Should we check that each topic and partition in offsetsToCommit actually 
exists, or just check that it is contained in topicRegistry or 
checkpointedZkOffsets?
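
One possible shape for such a check, as a rough sketch only: split the 
requested offsets into those whose partition is known and those that are not, 
and commit only the former. TopicAndPartition is a local stand-in, and 
CommitValidation.partitionByKnown and the knownPartitions set are hypothetical 
names. How that set would be populated (from topicRegistry, 
checkpointedZkOffsets, or a metadata lookup) is left open.

```scala
// Local stand-in for kafka.common.TopicAndPartition so the sketch compiles on its own.
case class TopicAndPartition(topic: String, partition: Int)

object CommitValidation {
  // Hypothetical helper: returns (valid, rejected), where `valid` holds the entries
  // whose partition appears in `knownPartitions` and `rejected` holds the rest.
  def partitionByKnown[V](
      offsetsToCommit: Map[TopicAndPartition, V],
      knownPartitions: Set[TopicAndPartition]
  ): (Map[TopicAndPartition, V], Map[TopicAndPartition, V]) =
    offsetsToCommit.partition { case (tp, _) => knownPartitions.contains(tp) }
}
```

A caller could then commit only the valid entries and log or reject the rest, 
instead of writing every entry to ZooKeeper unconditionally.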



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
