Hi, list.

I'm trying to re-process a topic in Kafka but when I request for earliest
offsets. The code below always returns the same value as latest offsets (if
I replace OffsetRequest.EarliestTime() to OffsetRequest.LatestTime()).

Is there something that I missing? I'm pretty sure that this code worked
for me at some point in our project. Today, we're using Kafka 0.10 but our
library is spark-streaming-kafka_2.10:1.6.3 and that depends on
kafka-clients:0.8.2.1. And also, our application is running on Spark 1.6.3.

Any thoughts are welcome.

// Get the partitions and offsets
TopicAndPartition topicAndPartition = new TopicAndPartition(topic,
partMetadata.partitionId());
PartitionOffsetRequestInfo partitionRequestInfo = new
PartitionOffsetRequestInfo(kafka.api.OffsetRequest.EarliestTime(), 1);

Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
Collections.singletonMap(topicAndPartition, partitionRequestInfo);

OffsetRequest offsetRequest = new OffsetRequest(requestInfo,
kafka.api.OffsetRequest.CurrentVersion(), "spark");
OffsetResponse offsetResponse =
getConsumer(partMetadata.leader().host(),
partMetadata.leader().port()).getOffsetsBefore(offsetRequest);

System.out.println(offsetResponse.hasError());
long[] offsets = offsetResponse.offsets(topic, partMetadata.partitionId());
System.out.println("offsets.size: " + offsets.length);
if (offsets.length > 0) {
    StringBuilder result = new StringBuilder();
    result.append("topic: ").append(topic).append("; ");
    result.append("partitionId:
").append(partMetadata.partitionId()).append("; ");
    result.append("offset: ").append(offsets[0]).append("; ");
    result.append("offsetSize: ").append(offsets.length).append(";");

    System.out.println(result.toString() + "\n");
}

Reply via email to