I'm not too familiar with Spark, but the "earliest"/"latest" setting (auto.offset.reset) only matters when your consumer does not hold a valid committed offset.
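To make that rule concrete, here is a small, simplified model in Java of how a consumer chooses where to start reading one partition. It is not Kafka's actual code: the class, enum and method names are made up for illustration. It assumes the reset policy applies both when no offset is committed and when the committed offset falls outside the range the broker still retains.

```java
import java.util.Optional;

/**
 * Hypothetical, simplified model (not Kafka's implementation) of how a
 * consumer picks its starting position in a single partition.
 */
final class OffsetResetModel {

    /** Mirrors the auto.offset.reset choices: earliest, latest, none. */
    enum ResetPolicy { EARLIEST, LATEST, NONE }

    /**
     * @param committed offset committed by this consumer group, if any
     * @param logStart  earliest offset the broker still retains
     * @param logEnd    offset the next produced message will receive
     * @param policy    what to do when there is no usable committed offset
     */
    static long startingOffset(Optional<Long> committed, long logStart,
                               long logEnd, ResetPolicy policy) {
        // A committed offset inside the retained range always wins;
        // the reset policy is never consulted in that case.
        if (committed.isPresent()) {
            long c = committed.get();
            if (c >= logStart && c <= logEnd) {
                return c;
            }
        }
        // No committed offset, or one that is out of range: use the policy.
        switch (policy) {
            case EARLIEST:
                return logStart;
            case LATEST:
                return logEnd;
            default:
                throw new IllegalStateException(
                        "no valid offset and reset policy is NONE");
        }
    }
}
```

Note the out-of-range case in this model: if old segments were already deleted (logStart > 0), a committed offset of 0 is no longer valid, so with LATEST the consumer would jump to the end instead of reprocessing.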
If you read up to offset N and then restart, you'll resume from N. A new consumer has no committed offset, and that's when the setting above takes effect. To reprocess a topic you need to either reset your consumer's offset to the beginning (offset 0, or the earliest offset still retained if old data has already been deleted) or switch to a consumer group name that doesn't exist yet. I believe the former is preferable.

On Wed, May 17, 2017 at 1:38 PM Marcelo Oikawa <marcelo.oik...@webradar.com> wrote:
> Hi, list.
>
> I'm trying to re-process a topic in Kafka but when I request for earliest
> offsets. The code below always returns the same value as latest offsets (if
> I replace OffsetRequest.EarliestTime() to OffsetRequest.LatestTime()).
>
> Is there something that I missing? I'm pretty sure that this code worked
> for me at some point in our project. Today, we're using Kafka 0.10 but our
> library is spark-streaming-kafka_2.10:1.6.3 and that depends on
> kafka-clients:0.8.2.1. And also, our application is running on Spark 1.6.3.
>
> Any thoughts are welcome.
>
> // Get the partitions and offsets
> TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partMetadata.partitionId());
> PartitionOffsetRequestInfo partitionRequestInfo =
>     new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.EarliestTime(), 1);
>
> Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
>     Collections.singletonMap(topicAndPartition, partitionRequestInfo);
>
> OffsetRequest offsetRequest = new OffsetRequest(requestInfo,
>     kafka.api.OffsetRequest.CurrentVersion(), "spark");
> OffsetResponse offsetResponse = getConsumer(partMetadata.leader().host(),
>     partMetadata.leader().port()).getOffsetsBefore(offsetRequest);
>
> System.out.println(offsetResponse.hasError());
> long[] offsets = offsetResponse.offsets(topic, partMetadata.partitionId());
> System.out.println("offsets.size: " + offsets.length);
> if (offsets.length > 0) {
>     StringBuilder result = new StringBuilder();
>     result.append("topic: ").append(topic).append("; ");
>     result.append("partitionId: ").append(partMetadata.partitionId()).append("; ");
>     result.append("offset: ").append(offsets[0]).append("; ");
>     result.append("offsetSize: ").append(offsets.length).append(";");
>
>     System.out.println(result.toString() + "\n");
> }
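For the option of using a consumer group name that doesn't exist yet, the consumer settings could look roughly like the sketch below. It assumes the 0.9+ Java consumer, where the reset values are "earliest"/"latest"; the old 0.8 consumer that spark-streaming-kafka 1.6.3 relies on expects "smallest"/"largest" instead. The helper class, method name and random group suffix are hypothetical.

```java
import java.util.Properties;
import java.util.UUID;

/** Hypothetical helper: consumer settings for re-reading a topic from the start. */
final class ReprocessConfig {

    static Properties forFreshGroup(String bootstrapServers, String groupPrefix) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // A group name that has never committed offsets, so the reset
        // policy below is what decides where consumption starts.
        props.setProperty("group.id", groupPrefix + "-" + UUID.randomUUID());
        // 0.9+ Java consumer value; the old 0.8 consumer used "smallest".
        props.setProperty("auto.offset.reset", "earliest");
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
}
```

These Properties would then be passed to new KafkaConsumer<>(props). If you'd rather keep the existing group, the Java consumer also has seekToBeginning(), called after partitions are assigned; its parameter type differs between client versions.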