Pengwei created KAFKA-2995:
------------------------------

             Summary: In 0.9.0.0, the old consumer's commitOffsets with specified partitions can commit nonexistent topics and partitions to ZooKeeper
                 Key: KAFKA-2995
                 URL: https://issues.apache.org/jira/browse/KAFKA-2995
             Project: Kafka
          Issue Type: Bug
          Components: consumer
    Affects Versions: 0.9.0.0
            Reporter: Pengwei
            Assignee: Neha Narkhede
             Fix For: 0.9.1.0

In version 0.9.0.0, the old consumer's commit interface is as follows:

    def commitOffsets(offsetsToCommit: immutable.Map[TopicAndPartition, OffsetAndMetadata], isAutoCommit: Boolean) {
      trace("OffsetMap: %s".format(offsetsToCommit))
      var retriesRemaining = 1 + (if (isAutoCommit) 0 else config.offsetsCommitMaxRetries) // no retries for commits from auto-commit
      var done = false
      while (!done) {
        val committed = offsetsChannelLock synchronized {
          // committed when we receive either no error codes or only MetadataTooLarge errors
          if (offsetsToCommit.size > 0) {
            if (config.offsetsStorage == "zookeeper") {
              offsetsToCommit.foreach { case (topicAndPartition, offsetAndMetadata) =>
                commitOffsetToZooKeeper(topicAndPartition, offsetAndMetadata.offset)
              }
    ........

This interface does not validate the offsetsToCommit parameter. If offsetsToCommit contains a topic or partition that does not exist in Kafka, the commit still goes through and creates an entry under /consumers/[group]/offsets/[nonexistent topic] in ZooKeeper.

Should we check that every topic and partition in offsetsToCommit actually exists, or just check that it is contained in topicRegistry or checkpointedZkOffsets?
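
For illustration only, the second option (checking against what the consumer already knows) could look roughly like the sketch below. It is not the actual fix. The case classes are simplified stand-ins for Kafka's TopicAndPartition and OffsetAndMetadata so that the snippet compiles on its own. The helper name splitByKnown and the knownPartitions parameter are hypothetical. How the known set would be derived from topicRegistry or checkpointedZkOffsets is left open here.

```scala
// Simplified stand-ins for kafka.common.TopicAndPartition and
// kafka.common.OffsetAndMetadata, so the sketch compiles on its own.
final case class TopicAndPartition(topic: String, partition: Int)
final case class OffsetAndMetadata(offset: Long, metadata: String = "")

object CommitValidation {
  type Offsets = Map[TopicAndPartition, OffsetAndMetadata]

  // Hypothetical helper: splits the requested commits into those whose
  // topic-partition is in the known set and those that are not.
  // The first element of the result is safe to commit; the second holds
  // the entries that would otherwise create stray ZooKeeper nodes.
  def splitByKnown(offsetsToCommit: Offsets,
                   knownPartitions: Set[TopicAndPartition]): (Offsets, Offsets) =
    offsetsToCommit.partition { case (tp, _) => knownPartitions.contains(tp) }
}
```

commitOffsets could then commit only the first map and log or reject the second. Whether unknown entries should be dropped silently or fail the whole call is a design choice this sketch does not settle.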