Hi Ajeet,

Which version of Kafka are you using? I remember the OffsetCommitRequest's requestInfo should be a map of topicPartition -> OffsetAndMetadata, not OffsetMetadataAndError.
Guozhang

On Tue, Jan 27, 2015 at 3:31 AM, ajeet singh <ajeetpr.si...@gmail.com> wrote:

> Hi ,
>
> I am new to Kafka, I have a use case in which My Consumer can't use auto
> commit offset feature, I have to go with option of manual Commit. With High
> level Consumer I have have constrain that consumer can commit only current
> offset, but in my case I will be committing some previous off-set value.
>
> So only possible solution seems like to use Simple Consumer. This is how I
> am using Simple Consumer for Commit offset :
>
> TopicAndPartition topicAndPartition = new TopicAndPartition(topic,partition);
> Map<TopicAndPartition, OffsetMetadataAndError> requestInfo = new HashMap<TopicAndPartition, OffsetMetadataAndError>();
> requestInfo.put(topicAndPartition, new OffsetMetadataAndError(0L,"no_metadata", (short) 0));
> kafka.javaapi.OffsetCommitRequest request = new kafka.javaapi.OffsetCommitRequest("test", requestInfo1,kafka.api.OffsetRequest.CurrentVersion(), 0, clientName);
> kafka.javaapi.OffsetCommitResponse response = consumer.commitOffsets(request);
>
>
> I am getting EOFException
>
> Oops:java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
> java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
> at kafka.utils.Utils$.read(Utils.scala:376)
> at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
> at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81)
> at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> at kafka.consumer.SimpleConsumer.fetchOffsets(SimpleConsumer.scala:138)
> at kafka.javaapi.consumer.SimpleConsumer.fetchOffsets(SimpleConsumer.scala:99)
> at com.vizury.rtb.realtimelogging.OfflineLogConsumer.commitOffsetTest(OfflineLogConsumer.java:205)
> at com.vizury.rtb.realtimelogging.OfflineLogConsumer.run(OfflineLogConsumer.java:147)
> at com.vizury.rtb.realtimelogging.OfflineLogConsumer.main(OfflineLogConsumer.java:31)
>
>
> Any help ?? same error I am getting with fetchOffsets() method, where as
> getOffsetsBefore() is working fine.
>

--
-- Guozhang
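
For reference, the map shape described in the reply (one entry per topic-partition, with an offset-plus-metadata value rather than an offset-metadata-error value) might be sketched roughly as below. This is only an illustration and does not call Kafka: TopicPartitionKey and OffsetWithMetadata are hypothetical stand-ins for Kafka's TopicAndPartition and OffsetAndMetadata, and the real constructor arguments depend on the Kafka version in use, so check the javaapi classes of your release before adapting it.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class CommitInfoSketch {

    /** Hypothetical stand-in for Kafka's TopicAndPartition (NOT the real class). */
    public static final class TopicPartitionKey {
        public final String topic;
        public final int partition;

        public TopicPartitionKey(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        // equals/hashCode are needed so the type works as a HashMap key.
        @Override
        public boolean equals(Object o) {
            if (!(o instanceof TopicPartitionKey)) return false;
            TopicPartitionKey other = (TopicPartitionKey) o;
            return partition == other.partition && topic.equals(other.topic);
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, partition);
        }
    }

    /** Hypothetical stand-in for Kafka's OffsetAndMetadata: no error code field. */
    public static final class OffsetWithMetadata {
        public final long offset;
        public final String metadata;

        public OffsetWithMetadata(long offset, String metadata) {
            this.offset = offset;
            this.metadata = metadata;
        }
    }

    /** Builds the per-partition commit info; the offset may be an earlier one. */
    public static Map<TopicPartitionKey, OffsetWithMetadata> buildRequestInfo(
            String topic, int partition, long offsetToCommit) {
        Map<TopicPartitionKey, OffsetWithMetadata> requestInfo = new HashMap<>();
        requestInfo.put(new TopicPartitionKey(topic, partition),
                new OffsetWithMetadata(offsetToCommit, "no_metadata"));
        return requestInfo;
    }

    public static void main(String[] args) {
        Map<TopicPartitionKey, OffsetWithMetadata> info = buildRequestInfo("test-topic", 0, 5L);
        System.out.println("entries to commit: " + info.size());
    }
}
```

With the real client, a map of this shape would then be handed to the OffsetCommitRequest constructor in place of the OffsetMetadataAndError map from the quoted code.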