Hi , I am new to Kafka, I have a use case in which My Consumer can't use auto commit offset feature, I have to go with option of manual Commit. With High level Consumer I have have constrain that consumer can commit only current offset, but in my case I will be committing some previous off-set value.
So only possible solution seems like to use Simple Consumer. This is how I am using Simple Consumer for Commit offset : TopicAndPartition topicAndPartition = new TopicAndPartition(topic,partition); Map<TopicAndPartition, OffsetMetadataAndError> requestInfo = new HashMap<TopicAndPartition, OffsetMetadataAndError>(); requestInfo.put(topicAndPartition, new OffsetMetadataAndError(0L,"no_metadata", (short) 0)); kafka.javaapi.OffsetCommitRequest request = new kafka.javaapi.OffsetCommitRequest("test", requestInfo1,kafka.api.OffsetRequest.CurrentVersion(), 0, clientName); kafka.javaapi.OffsetCommitResponse response = consumer.commitOffsets(request); I am getting EOFException Oops:java.io.EOFException: Received -1 when reading from channel, socket has likely been closed. java.io.EOFException: Received -1 when reading from channel, socket has likely been closed. at kafka.utils.Utils$.read(Utils.scala:376) at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) at kafka.network.Receive$class.readCompletely(Transmission.scala:56) at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29) at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100) at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81) at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71) at kafka.consumer.SimpleConsumer.fetchOffsets(SimpleConsumer.scala:138) at kafka.javaapi.consumer.SimpleConsumer.fetchOffsets(SimpleConsumer.scala:99) at com.vizury.rtb.realtimelogging.OfflineLogConsumer.commitOffsetTest(OfflineLogConsumer.java:205) at com.vizury.rtb.realtimelogging.OfflineLogConsumer.run(OfflineLogConsumer.java:147) at com.vizury.rtb.realtimelogging.OfflineLogConsumer.main(OfflineLogConsumer.java:31) Any help ?? same error I am getting with fetchOffsets() method, where as getOffsetsBefore() is working fine.