Hi, list. I'm trying to re-process a topic in Kafka but when I request for earliest offsets. The code below always returns the same value as latest offsets (if I replace OffsetRequest.EarliestTime() to OffsetRequest.LatestTime()).
Is there something that I missing? I'm pretty sure that this code worked for me at some point in our project. Today, we're using Kafka 0.10 but our library is spark-streaming-kafka_2.10:1.6.3 and that depends on kafka-clients:0.8.2.1. And also, our application is running on Spark 1.6.3. Any thoughts are welcome. // Get the partitions and offsets TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partMetadata.partitionId()); PartitionOffsetRequestInfo partitionRequestInfo = new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.EarliestTime(), 1); Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = Collections.singletonMap(topicAndPartition, partitionRequestInfo); OffsetRequest offsetRequest = new OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), "spark"); OffsetResponse offsetResponse = getConsumer(partMetadata.leader().host(), partMetadata.leader().port()).getOffsetsBefore(offsetRequest); System.out.println(offsetResponse.hasError()); long[] offsets = offsetResponse.offsets(topic, partMetadata.partitionId()); System.out.println("offsets.size: " + offsets.length); if (offsets.length > 0) { StringBuilder result = new StringBuilder(); result.append("topic: ").append(topic).append("; "); result.append("partitionId: ").append(partMetadata.partitionId()).append("; "); result.append("offset: ").append(offsets[0]).append("; "); result.append("offsetSize: ").append(offsets.length).append(";"); System.out.println(result.toString() + "\n"); }