KAFKA-4148; Support ListOffsetRequest v1 and search offsets by timestamp in consumer (KIP-79)

Author: Jiangjie Qin <becket....@gmail.com>

Reviewers: Jun Rao <jun...@gmail.com>, Ismael Juma <ism...@juma.me.uk>, Jason Gustafson <ja...@confluent.io>

Closes #1852 from becketqin/KAFKA-4148


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/eaaa433f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/eaaa433f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/eaaa433f

Branch: refs/heads/trunk
Commit: eaaa433fc97b86450833d8fcc8c9289ea35d47c0
Parents: cf8f4a7
Author: Jiangjie Qin <becket....@gmail.com>
Authored: Mon Sep 19 18:38:17 2016 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Mon Sep 19 18:38:27 2016 -0700

----------------------------------------------------------------------
 .../apache/kafka/clients/consumer/Consumer.java |  16 ++
 .../kafka/clients/consumer/KafkaConsumer.java   |  54 ++++++
 .../kafka/clients/consumer/MockConsumer.java    |  30 ++++
 .../clients/consumer/internals/Fetcher.java     | 179 +++++++++++++------
 .../apache/kafka/common/protocol/Protocol.java  |  40 ++++-
 .../kafka/common/record/OffsetAndTimestamp.java |  58 ++++++
 .../common/requests/ListOffsetRequest.java      | 109 ++++++++---
 .../common/requests/ListOffsetResponse.java     |  65 ++++++-
 .../clients/consumer/KafkaConsumerTest.java     |   4 +-
 .../internals/AbstractCoordinatorTest.java      |  10 +-
 .../clients/consumer/internals/FetcherTest.java |  31 ++--
 .../common/requests/RequestResponseTest.java    |  43 +++--
 core/src/main/scala/kafka/api/ApiVersion.scala  |  11 +-
 .../main/scala/kafka/log/FileMessageSet.scala   |  33 +---
 core/src/main/scala/kafka/log/Log.scala         |  83 ++++-----
 core/src/main/scala/kafka/log/LogSegment.scala  |  22 +--
 .../src/main/scala/kafka/server/KafkaApis.scala | 130 +++++++++++---
 .../kafka/server/ReplicaFetcherThread.scala     |  31 ++--
 .../kafka/api/PlaintextConsumerTest.scala       |  84 ++++++++-
 .../scala/unit/kafka/log/LogSegmentTest.scala   |  21 +--
 .../src/test/scala/unit/kafka/log/LogTest.scala |   8 +-
 docs/upgrade.html                               |  26 ++-
 22 files changed, 837 insertions(+), 251 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/eaaa433f/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
index 6f5a6b6..06e1bec 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
@@ -16,6 +16,7 @@ import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.record.OffsetAndTimestamp;
 
 import java.io.Closeable;
 import java.util.Collection;
@@ -151,6 +152,21 @@ public interface Consumer<K, V> extends Closeable {
     public void resume(Collection<TopicPartition> partitions);
 
     /**
+     * @see KafkaConsumer#offsetsForTimes(java.util.Map)
+     */
+    public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch);
+
+    /**
+     * @see KafkaConsumer#beginningOffsets(java.util.Collection)
+     */
+    public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions);
+
+    /**
+     * @see KafkaConsumer#endOffsets(java.util.Collection)
+     */
+    public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions);
+
+    /**
      * @see KafkaConsumer#close()
      */
     public void close();

http://git-wip-us.apache.org/repos/asf/kafka/blob/eaaa433f/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 108c0cb..889aad8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -37,6 +37,7 @@ import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.MetricsReporter;
 import org.apache.kafka.common.network.ChannelBuilder;
 import org.apache.kafka.common.network.Selector;
+import org.apache.kafka.common.record.OffsetAndTimestamp;
 import org.apache.kafka.common.requests.MetadataRequest;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.utils.AppInfoParser;
@@ -1400,6 +1401,59 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     }
 
     /**
+     * Look up the offsets for the given partitions by timestamp. The returned offset for each partition is the
+     * earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition.
+     *
+     * This is a blocking call. The consumer does not have to be assigned the partitions.
+     * If the message format version in a partition is before 0.10.0, i.e. the messages do not have timestamps, null
+     * will be returned for that partition.
+     *
+     * Notice that this method may block indefinitely if the partition does not exist.
+     *
+     * @param timestampsToSearch the mapping from partition to the timestamp to look up.
+     * @return a mapping from partition to the timestamp and offset of the first message with timestamp greater
+     *         than or equal to the target timestamp. {@code null} will be returned for the partition if there is no
+     *         such message.
+     * @throws IllegalArgumentException if the target timestamp is negative.
+     */
+    @Override
+    public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) {
+        for (Map.Entry<TopicPartition, Long> entry : timestampsToSearch.entrySet()) {
+            if (entry.getValue() < 0)
+                throw new IllegalArgumentException("The target time for partition " + entry.getKey() + " is " +
+                        entry.getValue() + ". The target time cannot be negative.");
+        }
+        return fetcher.getOffsetsByTimes(timestampsToSearch);
+    }
+
+    /**
+     * Get the earliest available offsets for the given partitions.
+     *
+     * Notice that this method may block indefinitely if the partition does not exist.
+     *
+     * @param partitions the partitions to get the earliest offsets.
+     * @return The earliest available offsets for the given partitions
+     */
+    @Override
+    public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions) {
+        return fetcher.earliestOffsets(partitions);
+    }
+
+    /**
+     * Get the end offsets for the given partitions. The end offset of a partition is the offset of the upcoming
+     * message, i.e. the offset of the last available message + 1.
+     *
+     * Notice that this method may block indefinitely if the partition does not exist.
+     *
+     * @param partitions the partitions to get the end offsets.
+     * @return The end offsets for the given partitions.
+     */
+    @Override
+    public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) {
+        return fetcher.latestOffsets(partitions);
+    }
+
+    /**
      * Close the consumer, waiting indefinitely for any needed cleanup. If auto-commit is enabled, this
      * will commit the current offsets. Note that {@link #wakeup()} cannot be use to interrupt close.
      */
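
For illustration, a minimal sketch of how the new consumer methods documented above might be used. The enclosing method name, topic, partition, and timestamp value are assumptions for this sketch only, not part of the patch:

    import java.util.Collections;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.record.OffsetAndTimestamp;

    // Sketch only: "consumer" is assumed to be an already-configured consumer.
    static void seekToTimestamp(KafkaConsumer<String, String> consumer) {
        TopicPartition tp = new TopicPartition("my-topic", 0);
        Map<TopicPartition, OffsetAndTimestamp> result =
                consumer.offsetsForTimes(Collections.singletonMap(tp, 1474243200000L)); // target time in ms
        OffsetAndTimestamp found = result.get(tp);              // null if no such message or pre-0.10.0 format
        if (found != null)
            consumer.seek(tp, found.offset());                  // first message with timestamp >= target

        Map<TopicPartition, Long> earliest = consumer.beginningOffsets(Collections.singleton(tp));
        Map<TopicPartition, Long> latest = consumer.endOffsets(Collections.singleton(tp));
        System.out.printf("partition %s spans offsets [%d, %d)%n", tp, earliest.get(tp), latest.get(tp));
    }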

http://git-wip-us.apache.org/repos/asf/kafka/blob/eaaa433f/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index 62eb77d..3af2344 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.record.OffsetAndTimestamp;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -299,6 +300,35 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     }
 
     @Override
+    public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) {
+        return null;
+    }
+
+    @Override
+    public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions) {
+        Map<TopicPartition, Long> result = new HashMap<>();
+        for (TopicPartition tp : partitions) {
+            Long beginningOffset = beginningOffsets.get(tp);
+            if (beginningOffset == null)
+                throw new IllegalStateException("The partition " + tp + " does not have a beginning offset.");
+            result.put(tp, beginningOffset);
+        }
+        return result;
+    }
+
+    @Override
+    public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) {
+        Map<TopicPartition, Long> result = new HashMap<>();
+        for (TopicPartition tp : partitions) {
+            Long endOffset = endOffsets.get(tp);
+            if (endOffset == null)
+                throw new IllegalStateException("The partition " + tp + " does not have an end offset.");
+            result.put(tp, endOffset);
+        }
+        return result;
+    }
+
+    @Override
     public void close() {
         ensureNotClosed();
         this.closed = true;

http://git-wip-us.apache.org/repos/asf/kafka/blob/eaaa433f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 202c0ad..14f7c5d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -41,6 +41,7 @@ import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.InvalidRecordException;
 import org.apache.kafka.common.record.LogEntry;
 import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.OffsetAndTimestamp;
 import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.requests.FetchRequest;
@@ -57,6 +58,7 @@ import org.slf4j.LoggerFactory;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -351,23 +353,16 @@ public class Fetcher<K, V> {
             throw new NoOffsetForPartitionException(partition);
 
         log.debug("Resetting offset for partition {} to {} offset.", 
partition, strategy.name().toLowerCase(Locale.ROOT));
-        long offset = listOffset(partition, timestamp);
+        long offset = getOffsetsByTimes(Collections.singletonMap(partition, timestamp)).get(partition).offset();
 
         // we might lose the assignment while fetching the offset, so check it is still active
         if (subscriptions.isAssigned(partition))
             this.subscriptions.seek(partition, offset);
     }
 
-    /**
-     * Fetch a single offset before the given timestamp for the partition.
-     *
-     * @param partition The partition that needs fetching offset.
-     * @param timestamp The timestamp for fetching offset.
-     * @return The offset of the message that is published before the given timestamp
-     */
-    private long listOffset(TopicPartition partition, long timestamp) {
+    public Map<TopicPartition, OffsetAndTimestamp> getOffsetsByTimes(Map<TopicPartition, Long> timestampsToSearch) {
         while (true) {
-            RequestFuture<Long> future = sendListOffsetRequest(partition, timestamp);
+            RequestFuture<Map<TopicPartition, OffsetAndTimestamp>> future = sendListOffsetRequests(timestampsToSearch);
             client.poll(future);
 
             if (future.succeeded())
@@ -383,6 +378,25 @@ public class Fetcher<K, V> {
         }
     }
 
+    public Map<TopicPartition, Long> earliestOffsets(Collection<TopicPartition> partitions) {
+        return earliestOrLatestOffset(partitions, ListOffsetRequest.EARLIEST_TIMESTAMP);
+    }
+
+    public Map<TopicPartition, Long> latestOffsets(Collection<TopicPartition> partitions) {
+        return earliestOrLatestOffset(partitions, ListOffsetRequest.LATEST_TIMESTAMP);
+    }
+
+    private Map<TopicPartition, Long> earliestOrLatestOffset(Collection<TopicPartition> partitions, long timestamp) {
+        Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
+        for (TopicPartition tp : partitions)
+            timestampsToSearch.put(tp, timestamp);
+        Map<TopicPartition, Long> result = new HashMap<>();
+        for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : getOffsetsByTimes(timestampsToSearch).entrySet())
+            result.put(entry.getKey(), entry.getValue().offset());
+
+        return result;
+    }
+
     /**
      * Return the fetched records, empty the record buffer and update the consumed position.
      *
@@ -457,64 +471,121 @@ public class Fetcher<K, V> {
     }
 
     /**
-     * Fetch a single offset before the given timestamp for the partition.
+     * Search the offsets by target times for the specified partitions.
      *
-     * @param topicPartition The partition that needs fetching offset.
-     * @param timestamp The timestamp for fetching offset.
-     * @return A response which can be polled to obtain the corresponding offset.
+     * @param timestampsToSearch the mapping between partitions and target time
+     * @return A response which can be polled to obtain the corresponding timestamps and offsets.
      */
-    private RequestFuture<Long> sendListOffsetRequest(final TopicPartition topicPartition, long timestamp) {
-        Map<TopicPartition, ListOffsetRequest.PartitionData> partitions = new HashMap<>(1);
-        partitions.put(topicPartition, new ListOffsetRequest.PartitionData(timestamp, 1));
-        PartitionInfo info = metadata.fetch().partition(topicPartition);
-        if (info == null) {
-            metadata.add(topicPartition.topic());
-            log.debug("Partition {} is unknown for fetching offset, wait for metadata refresh", topicPartition);
-            return RequestFuture.staleMetadata();
-        } else if (info.leader() == null) {
-            log.debug("Leader for partition {} unavailable for fetching offset, wait for metadata refresh", topicPartition);
-            return RequestFuture.leaderNotAvailable();
-        } else {
-            Node node = info.leader();
-            ListOffsetRequest request = new ListOffsetRequest(-1, partitions);
-            return client.send(node, ApiKeys.LIST_OFFSETS, request)
-                    .compose(new RequestFutureAdapter<ClientResponse, Long>() {
+    private RequestFuture<Map<TopicPartition, OffsetAndTimestamp>> sendListOffsetRequests(final Map<TopicPartition, Long> timestampsToSearch) {
+        // Group the partitions by node.
+        final Map<Node, Map<TopicPartition, Long>> timestampsToSearchByNode = new HashMap<>();
+        for (Map.Entry<TopicPartition, Long> entry: timestampsToSearch.entrySet()) {
+            TopicPartition tp  = entry.getKey();
+            PartitionInfo info = metadata.fetch().partition(tp);
+            if (info == null) {
+                metadata.add(tp.topic());
+                log.debug("Partition {} is unknown for fetching offset, wait for metadata refresh", tp);
+                return RequestFuture.staleMetadata();
+            } else if (info.leader() == null) {
+                log.debug("Leader for partition {} unavailable for fetching offset, wait for metadata refresh", tp);
+                return RequestFuture.leaderNotAvailable();
+            } else {
+                Node node = info.leader();
+                Map<TopicPartition, Long> topicData = timestampsToSearchByNode.get(node);
+                if (topicData == null) {
+                    topicData = new HashMap<>();
+                    timestampsToSearchByNode.put(node, topicData);
+                }
+                topicData.put(entry.getKey(), entry.getValue());
+            }
+        }
+
+        final RequestFuture<Map<TopicPartition, OffsetAndTimestamp>> listOffsetRequestsFuture = new RequestFuture<>();
+        final Map<TopicPartition, OffsetAndTimestamp> fetchedTimestampOffsets = new HashMap<>();
+        for (Map.Entry<Node, Map<TopicPartition, Long>> entry : timestampsToSearchByNode.entrySet()) {
+            sendListOffsetRequest(entry.getKey(), entry.getValue())
+                    .addListener(new RequestFutureListener<Map<TopicPartition, OffsetAndTimestamp>>() {
                         @Override
-                        public void onSuccess(ClientResponse response, RequestFuture<Long> future) {
-                            handleListOffsetResponse(topicPartition, response, future);
+                        public void onSuccess(Map<TopicPartition, OffsetAndTimestamp> value) {
+                            synchronized (listOffsetRequestsFuture) {
+                                fetchedTimestampOffsets.putAll(value);
+                                if (fetchedTimestampOffsets.size() == timestampsToSearch.size() && !listOffsetRequestsFuture.isDone())
+                                    listOffsetRequestsFuture.complete(fetchedTimestampOffsets);
+                            }
+                        }
+
+                        @Override
+                        public void onFailure(RuntimeException e) {
+                            synchronized (listOffsetRequestsFuture) {
+                                // This may cause all the requests to be retried, but should be rare.
+                                if (!listOffsetRequestsFuture.isDone())
+                                    listOffsetRequestsFuture.raise(e);
+                            }
                         }
                     });
         }
+        return listOffsetRequestsFuture;
+    }
+
+    /**
+     * Send the ListOffsetRequest to a specific broker for the partitions and target timestamps.
+     *
+     * @param node The node to send the ListOffsetRequest to.
+     * @param timestampsToSearch The mapping from partitions to the target timestamps.
+     * @return A response which can be polled to obtain the corresponding timestamps and offsets.
+     */
+    private RequestFuture<Map<TopicPartition, OffsetAndTimestamp>> sendListOffsetRequest(Node node,
+                                                                                         final Map<TopicPartition, Long> timestampsToSearch) {
+        ListOffsetRequest request = new ListOffsetRequest(timestampsToSearch, ListOffsetRequest.CONSUMER_REPLICA_ID);
+        log.trace("Sending ListOffsetRequest {} to broker {}", request, node);
+        return client.send(node, ApiKeys.LIST_OFFSETS, request)
+                .compose(new RequestFutureAdapter<ClientResponse, Map<TopicPartition, OffsetAndTimestamp>>() {
+                    @Override
+                    public void onSuccess(ClientResponse response, RequestFuture<Map<TopicPartition, OffsetAndTimestamp>> future) {
+                        handleListOffsetResponse(timestampsToSearch, response, future);
+                    }
+                });
     }
 
     /**
      * Callback for the response of the list offset call above.
-     * @param topicPartition The partition that was fetched
+     * @param timestampsToSearch The mapping from partitions to target timestamps
      * @param clientResponse The response from the server.
+     * @param future The future to be completed by the response.
      */
-    private void handleListOffsetResponse(TopicPartition topicPartition,
+    private void handleListOffsetResponse(Map<TopicPartition, Long> timestampsToSearch,
                                           ClientResponse clientResponse,
-                                          RequestFuture<Long> future) {
+                                          RequestFuture<Map<TopicPartition, OffsetAndTimestamp>> future) {
         ListOffsetResponse lor = new ListOffsetResponse(clientResponse.responseBody());
-        short errorCode = lor.responseData().get(topicPartition).errorCode;
-        if (errorCode == Errors.NONE.code()) {
-            List<Long> offsets = lor.responseData().get(topicPartition).offsets;
-            if (offsets.size() != 1)
-                throw new IllegalStateException("This should not happen.");
-            long offset = offsets.get(0);
-            log.debug("Fetched offset {} for partition {}", offset, topicPartition);
-
-            future.complete(offset);
-        } else if (errorCode == Errors.NOT_LEADER_FOR_PARTITION.code()
-                || errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) {
-            log.debug("Attempt to fetch offsets for partition {} failed due to obsolete leadership information, retrying.",
-                    topicPartition);
-            future.raise(Errors.forCode(errorCode));
-        } else {
-            log.warn("Attempt to fetch offsets for partition {} failed due to: {}",
-                    topicPartition, Errors.forCode(errorCode).message());
-            future.raise(new StaleMetadataException());
+        Map<TopicPartition, OffsetAndTimestamp> timestampOffsetMap = new HashMap<>();
+        for (Map.Entry<TopicPartition, Long> entry : timestampsToSearch.entrySet()) {
+            TopicPartition topicPartition = entry.getKey();
+            ListOffsetResponse.PartitionData partitionData = lor.responseData().get(topicPartition);
+            Errors error = Errors.forCode(partitionData.errorCode);
+            if (error == Errors.NONE) {
+                OffsetAndTimestamp offsetAndTimestamp = null;
+                if (partitionData.offset != ListOffsetResponse.UNKNOWN_OFFSET)
+                    offsetAndTimestamp = new OffsetAndTimestamp(partitionData.offset, partitionData.timestamp);
+                log.debug("Fetched {} for partition {}", offsetAndTimestamp, topicPartition);
+                timestampOffsetMap.put(topicPartition, offsetAndTimestamp);
+            } else if (error == Errors.INVALID_REQUEST) {
+                // The message format on the broker side is before 0.10.0, we simply put null in the response.
+                log.debug("Cannot search by timestamp for partition {} because the message format version " +
+                        "is before 0.10.0", topicPartition);
+                timestampOffsetMap.put(topicPartition, null);
+            } else if (error == Errors.NOT_LEADER_FOR_PARTITION
+                    || error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
+                log.debug("Attempt to fetch offsets for partition {} failed due to obsolete leadership information, retrying.",
+                        topicPartition);
+                future.raise(error);
+            } else {
+                log.warn("Attempt to fetch offsets for partition {} failed due to: {}",
+                        topicPartition, error.message());
+                future.raise(new StaleMetadataException());
+            }
         }
+        if (!future.isDone())
+            future.complete(timestampOffsetMap);
     }
 
     private List<TopicPartition> fetchablePartitions() {
@@ -550,7 +621,7 @@ public class Fetcher<K, V> {
                 fetch.put(partition, new FetchRequest.PartitionData(position, this.fetchSize));
                 log.trace("Added fetch request for partition {} at offset {}", partition, position);
             } else {
-                log.trace("Skipping fetch for partition {} because there is an inflight request to {}", partition, node);
+                log.trace("Skipping fetch for partition {} because there is an in-flight request to {}", partition, node);
             }
         }
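
The new sendListOffsetRequests path above groups the requested partitions by their current leader, sends one ListOffsetRequest per broker, and completes a single future once every partition has been answered. A standalone sketch of that grouping step, with partitions reduced to plain strings and the metadata lookup abstracted behind a caller-supplied function (both assumptions for illustration, not code from this patch):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.function.Function;

    final class GroupByLeaderSketch {
        // Groups partition -> target timestamp entries by the node returned from leaderFor.
        static <N> Map<N, Map<String, Long>> groupByLeader(Map<String, Long> timestampsByPartition,
                                                           Function<String, N> leaderFor) {
            Map<N, Map<String, Long>> byNode = new HashMap<>();
            for (Map.Entry<String, Long> e : timestampsByPartition.entrySet()) {
                N leader = leaderFor.apply(e.getKey());        // stands in for the cluster metadata lookup
                byNode.computeIfAbsent(leader, n -> new HashMap<>())
                      .put(e.getKey(), e.getValue());          // partition -> target timestamp
            }
            return byNode;
        }
    }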
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/eaaa433f/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index d64cf6d..5abf125 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -375,6 +375,12 @@ public class Protocol {
                                                                              new Field("max_num_offsets",
                                                                                        INT32,
                                                                                        "Maximum offsets to return."));
+    public static final Schema LIST_OFFSET_REQUEST_PARTITION_V1 = new Schema(new Field("partition",
+                                                                                        INT32,
+                                                                                        "Topic partition id."),
+                                                                              new Field("timestamp",
+                                                                                        INT64,
+                                                                                        "The target timestamp for the partition."));
 
     public static final Schema LIST_OFFSET_REQUEST_TOPIC_V0 = new Schema(new Field("topic",
                                                                                     STRING,
@@ -382,6 +388,12 @@
                                                                          new Field("partitions",
                                                                                    new ArrayOf(LIST_OFFSET_REQUEST_PARTITION_V0),
                                                                                    "Partitions to list offset."));
+    public static final Schema LIST_OFFSET_REQUEST_TOPIC_V1 = new Schema(new Field("topic",
+                                                                                    STRING,
+                                                                                    "Topic to list offset."),
+                                                                         new Field("partitions",
+                                                                                   new ArrayOf(LIST_OFFSET_REQUEST_PARTITION_V1),
+                                                                                   "Partitions to list offset."));
 
     public static final Schema LIST_OFFSET_REQUEST_V0 = new Schema(new Field("replica_id",
                                                                              INT32,
@@ -389,6 +401,12 @@
                                                                    new Field("topics",
                                                                              new ArrayOf(LIST_OFFSET_REQUEST_TOPIC_V0),
                                                                              "Topics to list offsets."));
+    public static final Schema LIST_OFFSET_REQUEST_V1 = new Schema(new Field("replica_id",
+                                                                             INT32,
+                                                                             "Broker id of the follower. For normal consumers, use -1."),
+                                                                   new Field("topics",
+                                                                             new ArrayOf(LIST_OFFSET_REQUEST_TOPIC_V1),
+                                                                             "Topics to list offsets."));
 
     public static final Schema LIST_OFFSET_RESPONSE_PARTITION_V0 = new Schema(new Field("partition",
                                                                                         INT32,
@@ -398,15 +416,33 @@
                                                                                         new ArrayOf(INT64),
                                                                                         "A list of offsets."));
 
+    public static final Schema LIST_OFFSET_RESPONSE_PARTITION_V1 = new Schema(new Field("partition",
+                                                                                        INT32,
+                                                                                        "Topic partition id."),
+                                                                              new Field("error_code", INT16),
+                                                                              new Field("timestamp",
+                                                                                        INT64,
+                                                                                        "The timestamp associated with the returned offset"),
+                                                                              new Field("offset",
+                                                                                        INT64,
+                                                                                        "offsets found"));
+
     public static final Schema LIST_OFFSET_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", STRING),
                                                                           new Field("partition_responses",
                                                                                     new ArrayOf(LIST_OFFSET_RESPONSE_PARTITION_V0)));
 
+    public static final Schema LIST_OFFSET_RESPONSE_TOPIC_V1 = new Schema(new Field("topic", STRING),
+                                                                          new Field("partition_responses",
+                                                                                    new ArrayOf(LIST_OFFSET_RESPONSE_PARTITION_V1)));
+
     public static final Schema LIST_OFFSET_RESPONSE_V0 = new Schema(new Field("responses",
                                                                               new ArrayOf(LIST_OFFSET_RESPONSE_TOPIC_V0)));
 
-    public static final Schema[] LIST_OFFSET_REQUEST = new Schema[] {LIST_OFFSET_REQUEST_V0};
-    public static final Schema[] LIST_OFFSET_RESPONSE = new Schema[] {LIST_OFFSET_RESPONSE_V0};
+    public static final Schema LIST_OFFSET_RESPONSE_V1 = new Schema(new Field("responses",
+                                                                              new ArrayOf(LIST_OFFSET_RESPONSE_TOPIC_V1)));
+
+    public static final Schema[] LIST_OFFSET_REQUEST = new Schema[] {LIST_OFFSET_REQUEST_V0, LIST_OFFSET_REQUEST_V1};
+    public static final Schema[] LIST_OFFSET_RESPONSE = new Schema[] {LIST_OFFSET_RESPONSE_V0, LIST_OFFSET_RESPONSE_V1};
 
     /* Fetch api */
     public static final Schema FETCH_REQUEST_PARTITION_V0 = new Schema(new Field("partition",

http://git-wip-us.apache.org/repos/asf/kafka/blob/eaaa433f/clients/src/main/java/org/apache/kafka/common/record/OffsetAndTimestamp.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/OffsetAndTimestamp.java b/clients/src/main/java/org/apache/kafka/common/record/OffsetAndTimestamp.java
new file mode 100644
index 0000000..562585b
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/record/OffsetAndTimestamp.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import java.util.Objects;
+
+/**
+ * A container class for offset and timestamp.
+ */
+public final class OffsetAndTimestamp {
+    private final long timestamp;
+    private final long offset;
+
+    public OffsetAndTimestamp(long offset, long timestamp) {
+        this.offset = offset;
+        this.timestamp = timestamp;
+    }
+
+    public long timestamp() {
+        return timestamp;
+    }
+
+    public long offset() {
+        return offset;
+    }
+
+    @Override
+    public String toString() {
+        return "{Timestamp = " + timestamp + ", Offset = " + offset + "}";
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(timestamp, offset);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == null || !(o instanceof OffsetAndTimestamp))
+            return false;
+        OffsetAndTimestamp other = (OffsetAndTimestamp) o;
+        return this.timestamp == other.timestamp() && this.offset == other.offset();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/eaaa433f/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
index a3777e2..7e586a4 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
@@ -26,9 +26,12 @@ import org.apache.kafka.common.utils.CollectionUtils;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 public class ListOffsetRequest extends AbstractRequest {
 
@@ -53,7 +56,13 @@ public class ListOffsetRequest extends AbstractRequest {
 
     private final int replicaId;
     private final Map<TopicPartition, PartitionData> offsetData;
+    private final Map<TopicPartition, Long> partitionTimestamps;
+    private final Set<TopicPartition> duplicatePartitions;
 
+    /**
+     * This class is only used by ListOffsetRequest v0 which has been 
deprecated.
+     */
+    @Deprecated
     public static final class PartitionData {
         public final long timestamp;
         public final int maxNumOffsets;
@@ -64,40 +73,76 @@ public class ListOffsetRequest extends AbstractRequest {
         }
     }
 
+    /**
+     * Constructor for ListOffsetRequest v0
+     */
+    @Deprecated
     public ListOffsetRequest(Map<TopicPartition, PartitionData> offsetData) {
-        this(CONSUMER_REPLICA_ID, offsetData);
+        this(CONSUMER_REPLICA_ID, offsetData, 0);
     }
 
+    /**
+     * Constructor for ListOffsetRequest v0
+     */
+    @Deprecated
     public ListOffsetRequest(int replicaId, Map<TopicPartition, PartitionData> 
offsetData) {
-        super(new Struct(CURRENT_SCHEMA));
-        Map<String, Map<Integer, PartitionData>> topicsData = 
CollectionUtils.groupDataByTopic(offsetData);
+        this(replicaId, offsetData, 0);
+    }
+
+    /**
+     * Constructor for ListOffsetRequest v1.
+     */
+    public ListOffsetRequest(Map<TopicPartition, ?> targetTimes, int 
replicaId) {
+        this(replicaId, targetTimes, 1);
+    }
+
+    /**
+     * Private constructor with a specified version.
+     */
+    @SuppressWarnings("unchecked")
+    private ListOffsetRequest(int replicaId, Map<TopicPartition, ?> 
targetTimes, int version) {
+        super(new Struct(ProtoUtils.requestSchema(ApiKeys.LIST_OFFSETS.id, 
version)));
+        Map<String, Map<Integer, Object>> topicsData =
+                CollectionUtils.groupDataByTopic((Map<TopicPartition, Object>) 
targetTimes);
 
         struct.set(REPLICA_ID_KEY_NAME, replicaId);
         List<Struct> topicArray = new ArrayList<Struct>();
-        for (Map.Entry<String, Map<Integer, PartitionData>> topicEntry: 
topicsData.entrySet()) {
+        for (Map.Entry<String, Map<Integer, Object>> topicEntry: 
topicsData.entrySet()) {
             Struct topicData = struct.instance(TOPICS_KEY_NAME);
             topicData.set(TOPIC_KEY_NAME, topicEntry.getKey());
             List<Struct> partitionArray = new ArrayList<Struct>();
-            for (Map.Entry<Integer, PartitionData> partitionEntry : 
topicEntry.getValue().entrySet()) {
-                PartitionData offsetPartitionData = partitionEntry.getValue();
-                Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
-                partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
-                partitionData.set(TIMESTAMP_KEY_NAME, 
offsetPartitionData.timestamp);
-                partitionData.set(MAX_NUM_OFFSETS_KEY_NAME, 
offsetPartitionData.maxNumOffsets);
-                partitionArray.add(partitionData);
+            for (Map.Entry<Integer, Object> partitionEntry : 
topicEntry.getValue().entrySet()) {
+                if (version == 0) {
+                    PartitionData offsetPartitionData = (PartitionData) 
partitionEntry.getValue();
+                    Struct partitionData = 
topicData.instance(PARTITIONS_KEY_NAME);
+                    partitionData.set(PARTITION_KEY_NAME, 
partitionEntry.getKey());
+                    partitionData.set(TIMESTAMP_KEY_NAME, 
offsetPartitionData.timestamp);
+                    partitionData.set(MAX_NUM_OFFSETS_KEY_NAME, 
offsetPartitionData.maxNumOffsets);
+                    partitionArray.add(partitionData);
+                } else {
+                    Long timestamp = (Long) partitionEntry.getValue();
+                    Struct partitionData = 
topicData.instance(PARTITIONS_KEY_NAME);
+                    partitionData.set(PARTITION_KEY_NAME, 
partitionEntry.getKey());
+                    partitionData.set(TIMESTAMP_KEY_NAME, timestamp);
+                    partitionArray.add(partitionData);
+                }
             }
             topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray());
             topicArray.add(topicData);
         }
         struct.set(TOPICS_KEY_NAME, topicArray.toArray());
         this.replicaId = replicaId;
-        this.offsetData = offsetData;
+        this.offsetData = version == 0 ? (Map<TopicPartition, PartitionData>) 
targetTimes : null;
+        this.partitionTimestamps = version == 1 ? (Map<TopicPartition, Long>) 
targetTimes : null;
+        this.duplicatePartitions = Collections.emptySet();
     }
 
     public ListOffsetRequest(Struct struct) {
         super(struct);
+        Set<TopicPartition> duplicatePartitions = new HashSet<>();
         replicaId = struct.getInt(REPLICA_ID_KEY_NAME);
-        offsetData = new HashMap<TopicPartition, PartitionData>();
+        offsetData = new HashMap<>();
+        partitionTimestamps = new HashMap<>();
         for (Object topicResponseObj : struct.getArray(TOPICS_KEY_NAME)) {
             Struct topicResponse = (Struct) topicResponseObj;
             String topic = topicResponse.getString(TOPIC_KEY_NAME);
@@ -105,25 +150,40 @@ public class ListOffsetRequest extends AbstractRequest {
                 Struct partitionResponse = (Struct) partitionResponseObj;
                 int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
                 long timestamp = partitionResponse.getLong(TIMESTAMP_KEY_NAME);
-                int maxNumOffsets = 
partitionResponse.getInt(MAX_NUM_OFFSETS_KEY_NAME);
-                PartitionData partitionData = new PartitionData(timestamp, 
maxNumOffsets);
-                offsetData.put(new TopicPartition(topic, partition), 
partitionData);
+                TopicPartition tp = new TopicPartition(topic, partition);
+                if (partitionResponse.hasField(MAX_NUM_OFFSETS_KEY_NAME)) {
+                    int maxNumOffsets = 
partitionResponse.getInt(MAX_NUM_OFFSETS_KEY_NAME);
+                    PartitionData partitionData = new PartitionData(timestamp, 
maxNumOffsets);
+                    offsetData.put(tp, partitionData);
+                } else {
+                    if (partitionTimestamps.put(tp, timestamp) != null)
+                        duplicatePartitions.add(tp);
+                }
             }
         }
+        this.duplicatePartitions = duplicatePartitions;
     }
 
     @Override
     public AbstractRequestResponse getErrorResponse(int versionId, Throwable 
e) {
         Map<TopicPartition, ListOffsetResponse.PartitionData> responseData = 
new HashMap<TopicPartition, ListOffsetResponse.PartitionData>();
 
-        for (Map.Entry<TopicPartition, PartitionData> entry: 
offsetData.entrySet()) {
-            ListOffsetResponse.PartitionData partitionResponse = new 
ListOffsetResponse.PartitionData(Errors.forException(e).code(), new 
ArrayList<Long>());
-            responseData.put(entry.getKey(), partitionResponse);
+        if (versionId == 0) {
+            for (Map.Entry<TopicPartition, PartitionData> entry : 
offsetData.entrySet()) {
+                ListOffsetResponse.PartitionData partitionResponse = new 
ListOffsetResponse.PartitionData(Errors.forException(e).code(), new 
ArrayList<Long>());
+                responseData.put(entry.getKey(), partitionResponse);
+            }
+        } else {
+            for (Map.Entry<TopicPartition, Long> entry : 
partitionTimestamps.entrySet()) {
+                ListOffsetResponse.PartitionData partitionResponse = new 
ListOffsetResponse.PartitionData(Errors.forException(e).code(), -1L, -1L);
+                responseData.put(entry.getKey(), partitionResponse);
+            }
         }
 
         switch (versionId) {
             case 0:
-                return new ListOffsetResponse(responseData);
+            case 1:
+                return new ListOffsetResponse(responseData, versionId);
             default:
                 throw new IllegalArgumentException(String.format("Version %d 
is not valid. Valid versions for %s are 0 to %d",
                         versionId, this.getClass().getSimpleName(), 
ProtoUtils.latestVersion(ApiKeys.LIST_OFFSETS.id)));
@@ -134,10 +194,19 @@ public class ListOffsetRequest extends AbstractRequest {
         return replicaId;
     }
 
+    @Deprecated
     public Map<TopicPartition, PartitionData> offsetData() {
         return offsetData;
     }
 
+    public Map<TopicPartition, Long> partitionTimestamps() {
+        return partitionTimestamps;
+    }
+
+    public Set<TopicPartition> duplicatePartitions() {
+        return duplicatePartitions;
+    }
+
     public static ListOffsetRequest parse(ByteBuffer buffer, int versionId) {
         return new ListOffsetRequest(ProtoUtils.parseRequest(ApiKeys.LIST_OFFSETS.id, versionId, buffer));
     }
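
Compared with v0, a v1 request carries just a target timestamp per partition (no max_num_offsets). A minimal sketch of building one with the v1 constructor added above; the topic, partition numbers, and timestamp are made-up values, and imports are omitted:

    // Hypothetical values; CONSUMER_REPLICA_ID and LATEST_TIMESTAMP are constants on ListOffsetRequest.
    Map<TopicPartition, Long> targetTimes = new HashMap<>();
    targetTimes.put(new TopicPartition("my-topic", 0), 1474243200000L);                      // search by timestamp
    targetTimes.put(new TopicPartition("my-topic", 1), ListOffsetRequest.LATEST_TIMESTAMP);  // or use a sentinel

    ListOffsetRequest request = new ListOffsetRequest(targetTimes, ListOffsetRequest.CONSUMER_REPLICA_ID);
    Map<TopicPartition, Long> echoed = request.partitionTimestamps(); // v1 accessor; offsetData() is now deprecated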

http://git-wip-us.apache.org/repos/asf/kafka/blob/eaaa433f/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
index 5befe14..bc8c8d6 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
@@ -30,6 +30,8 @@ import java.util.List;
 import java.util.Map;
 
 public class ListOffsetResponse extends AbstractRequestResponse {
+    public static final long UNKNOWN_TIMESTAMP = -1L;
+    public static final long UNKNOWN_OFFSET = -1L;
     
     private static final Schema CURRENT_SCHEMA = 
ProtoUtils.currentResponseSchema(ApiKeys.LIST_OFFSETS.id);
     private static final String RESPONSES_KEY_NAME = "responses";
@@ -47,25 +49,58 @@ public class ListOffsetResponse extends AbstractRequestResponse {
      *
      *  UNKNOWN_TOPIC_OR_PARTITION (3)
      *  NOT_LEADER_FOR_PARTITION (6)
+     *  INVALID_REQUEST (42)
      *  UNKNOWN (-1)
      */
 
+    // This key is only used by ListOffsetResponse v0
+    @Deprecated
     private static final String OFFSETS_KEY_NAME = "offsets";
+    private static final String TIMESTAMP_KEY_NAME = "timestamp";
+    private static final String OFFSET_KEY_NAME = "offset";
 
     private final Map<TopicPartition, PartitionData> responseData;
 
     public static final class PartitionData {
         public final short errorCode;
+        // The offsets list is only used in ListOffsetResponse v0.
+        @Deprecated
         public final List<Long> offsets;
+        public final Long timestamp;
+        public final Long offset;
 
+        /**
+         * Constructor for ListOffsetResponse v0
+         */
+        @Deprecated
         public PartitionData(short errorCode, List<Long> offsets) {
             this.errorCode = errorCode;
             this.offsets = offsets;
+            this.timestamp = null;
+            this.offset = null;
+        }
+
+        /**
+         * Constructor for ListOffsetResponse v1
+         */
+        public PartitionData(short errorCode, long timestamp, long offset) {
+            this.errorCode = errorCode;
+            this.timestamp = timestamp;
+            this.offset = offset;
+            this.offsets = null;
         }
     }
 
+    /**
+     * Constructor for ListOffsetResponse v0.
+     */
+    @Deprecated
     public ListOffsetResponse(Map<TopicPartition, PartitionData> responseData) {
-        super(new Struct(CURRENT_SCHEMA));
+        this(responseData, 0);
+    }
+
+    public ListOffsetResponse(Map<TopicPartition, PartitionData> responseData, int version) {
+        super(new Struct(ProtoUtils.responseSchema(ApiKeys.LIST_OFFSETS.id, version)));
         Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(responseData);
 
         List<Struct> topicArray = new ArrayList<Struct>();
@@ -78,7 +113,12 @@ public class ListOffsetResponse extends AbstractRequestResponse {
                 Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
                 partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
                 partitionData.set(ERROR_CODE_KEY_NAME, 
offsetPartitionData.errorCode);
-                partitionData.set(OFFSETS_KEY_NAME, 
offsetPartitionData.offsets.toArray());
+                if (version == 0)
+                    partitionData.set(OFFSETS_KEY_NAME, 
offsetPartitionData.offsets.toArray());
+                else {
+                    partitionData.set(TIMESTAMP_KEY_NAME, 
offsetPartitionData.timestamp);
+                    partitionData.set(OFFSET_KEY_NAME, 
offsetPartitionData.offset);
+                }
                 partitionArray.add(partitionData);
             }
             topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray());
@@ -98,11 +138,18 @@ public class ListOffsetResponse extends AbstractRequestResponse {
                 Struct partitionResponse = (Struct) partitionResponseObj;
                 int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
                 short errorCode = 
partitionResponse.getShort(ERROR_CODE_KEY_NAME);
-                Object[] offsets = 
partitionResponse.getArray(OFFSETS_KEY_NAME);
-                List<Long> offsetsList = new ArrayList<Long>();
-                for (Object offset: offsets)
-                    offsetsList.add((Long) offset);
-                PartitionData partitionData = new PartitionData(errorCode, 
offsetsList);
+                PartitionData partitionData;
+                if (partitionResponse.hasField(OFFSETS_KEY_NAME)) {
+                    Object[] offsets = 
partitionResponse.getArray(OFFSETS_KEY_NAME);
+                    List<Long> offsetsList = new ArrayList<Long>();
+                    for (Object offset : offsets)
+                        offsetsList.add((Long) offset);
+                    partitionData = new PartitionData(errorCode, offsetsList);
+                } else {
+                    long timestamp = 
partitionResponse.getLong(TIMESTAMP_KEY_NAME);
+                    long offset = partitionResponse.getLong(OFFSET_KEY_NAME);
+                    partitionData = new PartitionData(errorCode, timestamp, 
offset);
+                }
                 responseData.put(new TopicPartition(topic, partition), 
partitionData);
             }
         }
@@ -115,4 +162,8 @@ public class ListOffsetResponse extends AbstractRequestResponse {
     public static ListOffsetResponse parse(ByteBuffer buffer) {
         return new ListOffsetResponse(CURRENT_SCHEMA.read(buffer));
     }
+
+    public static ListOffsetResponse parse(ByteBuffer buffer, int version) {
+        return new ListOffsetResponse(ProtoUtils.responseSchema(ApiKeys.LIST_OFFSETS.id, version).read(buffer));
+    }
 }
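
A v1 response carries a single (timestamp, offset) pair per partition, with UNKNOWN_TIMESTAMP and UNKNOWN_OFFSET as sentinels. A short sketch of round-tripping one through the version-aware constructor and parse method added above; the partition and values are illustrative, and imports are omitted:

    // Illustrative values only.
    Map<TopicPartition, ListOffsetResponse.PartitionData> data = new HashMap<>();
    data.put(new TopicPartition("my-topic", 0),
             new ListOffsetResponse.PartitionData(Errors.NONE.code(), 1474243200100L, 42L));

    ListOffsetResponse response = new ListOffsetResponse(data, 1);   // serialize with the v1 schema
    Struct struct = response.toStruct();
    ByteBuffer buffer = ByteBuffer.allocate(struct.sizeOf());
    struct.writeTo(buffer);
    buffer.flip();
    ListOffsetResponse parsed = ListOffsetResponse.parse(buffer, 1); // version-aware parse added in this patch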

http://git-wip-us.apache.org/repos/asf/kafka/blob/eaaa433f/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 2408c11..0096e72 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -1066,9 +1066,9 @@ public class KafkaConsumerTest {
         Map<TopicPartition, ListOffsetResponse.PartitionData> partitionData = new HashMap<>();
         for (Map.Entry<TopicPartition, Long> partitionOffset : offsets.entrySet()) {
             partitionData.put(partitionOffset.getKey(), new ListOffsetResponse.PartitionData(error,
-                    singletonList(partitionOffset.getValue())));
+                    1L, partitionOffset.getValue()));
         }
-        return new ListOffsetResponse(partitionData).toStruct();
+        return new ListOffsetResponse(partitionData, 1).toStruct();
     }
 
     private Struct fetchResponse(Map<TopicPartition, FetchInfo> fetches) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/eaaa433f/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index 777b67f..4f8425a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -125,10 +125,12 @@ public class AbstractCoordinatorTest {
             synchronized (coordinator) {
                 coordinator.notify();
             }
-            Thread.sleep(100);
-
-            coordinator.pollHeartbeat(mockTime.milliseconds());
-            fail("Expected pollHeartbeat to raise an error");
+            long startMs = System.currentTimeMillis();
+            while (System.currentTimeMillis() - startMs < 1000) {
+                Thread.sleep(10);
+                coordinator.pollHeartbeat(mockTime.milliseconds());
+            }
+            fail("Expected pollHeartbeat to raise an error in 1 second");
         } catch (RuntimeException exception) {
             assertEquals(exception, e);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/eaaa433f/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 88a0526..d14488c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -41,6 +41,7 @@ import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.Compressor;
 import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.OffsetAndTimestamp;
 import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.requests.FetchRequest;
 import org.apache.kafka.common.requests.FetchResponse;
@@ -491,7 +492,7 @@ public class FetcherTest {
         // with no commit position, we should reset using the default strategy 
defined above (EARLIEST)
 
         
client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.EARLIEST_TIMESTAMP),
-                               listOffsetResponse(Errors.NONE, 
Arrays.asList(5L)));
+                               listOffsetResponse(Errors.NONE, 1L, 5L));
         fetcher.updateFetchPositions(singleton(tp));
         assertFalse(subscriptions.isOffsetResetNeeded(tp));
         assertTrue(subscriptions.isFetchable(tp));
@@ -504,7 +505,7 @@ public class FetcherTest {
         subscriptions.needOffsetReset(tp, OffsetResetStrategy.LATEST);
 
         
client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP),
-                               listOffsetResponse(Errors.NONE, 
Arrays.asList(5L)));
+                               listOffsetResponse(Errors.NONE, 1L, 5L));
         fetcher.updateFetchPositions(singleton(tp));
         assertFalse(subscriptions.isOffsetResetNeeded(tp));
         assertTrue(subscriptions.isFetchable(tp));
@@ -517,7 +518,7 @@ public class FetcherTest {
         subscriptions.needOffsetReset(tp, OffsetResetStrategy.EARLIEST);
 
         
client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.EARLIEST_TIMESTAMP),
-                               listOffsetResponse(Errors.NONE, 
Arrays.asList(5L)));
+                               listOffsetResponse(Errors.NONE, 1L, 5L));
         fetcher.updateFetchPositions(singleton(tp));
         assertFalse(subscriptions.isOffsetResetNeeded(tp));
         assertTrue(subscriptions.isFetchable(tp));
@@ -531,11 +532,11 @@ public class FetcherTest {
 
         // First request gets a disconnect
         
client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP),
-                               listOffsetResponse(Errors.NONE, 
Arrays.asList(5L)), true);
+                               listOffsetResponse(Errors.NONE, 1L, 5L), true);
 
         // Next one succeeds
         
client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP),
-                               listOffsetResponse(Errors.NONE, 
Arrays.asList(5L)));
+                               listOffsetResponse(Errors.NONE, 1L, 5L));
         fetcher.updateFetchPositions(singleton(tp));
         assertFalse(subscriptions.isOffsetResetNeeded(tp));
         assertTrue(subscriptions.isFetchable(tp));
@@ -635,23 +636,33 @@ public class FetcherTest {
         assertEquals(300, maxMetric.value(), EPSILON);
     }
 
+    @Test
+    public void testGetOffsetsByTimes() {
+        client.prepareResponseFrom(listOffsetResponse(Errors.NONE, 100L, 100L), cluster.leaderFor(tp));
+
+        Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestampMap =
+                fetcher.getOffsetsByTimes(Collections.singletonMap(tp, 0L));
+        assertEquals(100L, offsetAndTimestampMap.get(tp).timestamp());
+        assertEquals(100L, offsetAndTimestampMap.get(tp).offset());
+    }
+
     private MockClient.RequestMatcher listOffsetRequestMatcher(final long 
timestamp) {
         // matches any list offset request with the provided timestamp
         return new MockClient.RequestMatcher() {
             @Override
             public boolean matches(ClientRequest request) {
                 ListOffsetRequest req = new 
ListOffsetRequest(request.request().body());
-                ListOffsetRequest.PartitionData partitionData = 
req.offsetData().get(tp);
-                return partitionData != null && partitionData.timestamp == 
timestamp;
+                Long partitionTimestamp = req.partitionTimestamps().get(tp);
+                return partitionTimestamp != null && partitionTimestamp == timestamp;
             }
         };
     }
 
-    private Struct listOffsetResponse(Errors error, List<Long> offsets) {
-        ListOffsetResponse.PartitionData partitionData = new 
ListOffsetResponse.PartitionData(error.code(), offsets);
+    private Struct listOffsetResponse(Errors error, long timestamp, long 
offset) {
+        ListOffsetResponse.PartitionData partitionData = new 
ListOffsetResponse.PartitionData(error.code(), timestamp, offset);
         Map<TopicPartition, ListOffsetResponse.PartitionData> allPartitionData 
= new HashMap<>();
         allPartitionData.put(tp, partitionData);
-        ListOffsetResponse response = new ListOffsetResponse(allPartitionData);
+        ListOffsetResponse response = new ListOffsetResponse(allPartitionData, 
1);
         return response.toStruct();
     }
 

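The helper change above reflects the new wire format: a v0 ListOffset response carries a list of offsets per partition, while a v1 response carries exactly one timestamp/offset pair. A rough Scala sketch of the two per-partition payload shapes (illustrative case classes, not the real ListOffsetResponse.PartitionData constructors):

    // v0: error code plus up to maxNumOffsets offsets, no timestamps
    case class PartitionDataV0(errorCode: Short, offsets: List[Long])
    // v1: error code plus a single (timestamp, offset) pair, as asserted in testGetOffsetsByTimes
    case class PartitionDataV1(errorCode: Short, timestamp: Long, offset: Long)
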
http://git-wip-us.apache.org/repos/asf/kafka/blob/eaaa433f/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index b3baa63..574f52d 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -69,9 +69,9 @@ public class RequestResponseTest {
                 createDescribeGroupRequest(),
                 createDescribeGroupRequest().getErrorResponse(0, new 
UnknownServerException()),
                 createDescribeGroupResponse(),
-                createListOffsetRequest(),
-                createListOffsetRequest().getErrorResponse(0, new 
UnknownServerException()),
-                createListOffsetResponse(),
+                createListOffsetRequest(1),
+                createListOffsetRequest(1).getErrorResponse(1, new 
UnknownServerException()),
+                createListOffsetResponse(1),
                 MetadataRequest.allTopics(),
                 createMetadataRequest(Arrays.asList("topic1")),
                 
createMetadataRequest(Arrays.asList("topic1")).getErrorResponse(2, new 
UnknownServerException()),
@@ -129,6 +129,9 @@ public class RequestResponseTest {
         checkSerialization(createUpdateMetadataRequest(1, null), 1);
         checkSerialization(createUpdateMetadataRequest(1, "rack1"), 1);
         checkSerialization(createUpdateMetadataRequest(1, 
null).getErrorResponse(1, new UnknownServerException()), 1);
+        checkSerialization(createListOffsetRequest(0), 0);
+        checkSerialization(createListOffsetRequest(0).getErrorResponse(0, new 
UnknownServerException()), 0);
+        checkSerialization(createListOffsetResponse(0), 0);
     }
 
     private void checkOlderFetchVersions() throws Exception {
@@ -151,7 +154,7 @@ public class RequestResponseTest {
             Method deserializer = req.getClass().getDeclaredMethod("parse", 
ByteBuffer.class, Integer.TYPE);
             deserialized = (AbstractRequestResponse) deserializer.invoke(null, 
buffer, version);
         }
-        assertEquals("The original and deserialized of " + 
req.getClass().getSimpleName() + " should be the same.", req, deserialized);
+        assertEquals("The original and deserialized of " + 
req.getClass().getSimpleName() + "(version " + version + ") should be the 
same.", req, deserialized);
         assertEquals("The original and deserialized of " + 
req.getClass().getSimpleName() + " should have the same hashcode.",
                 req.hashCode(), deserialized.hashCode());
     }
@@ -304,16 +307,32 @@ public class RequestResponseTest {
         return new LeaveGroupResponse(Errors.NONE.code());
     }
 
-    private AbstractRequest createListOffsetRequest() {
-        Map<TopicPartition, ListOffsetRequest.PartitionData> offsetData = new 
HashMap<>();
-        offsetData.put(new TopicPartition("test", 0), new 
ListOffsetRequest.PartitionData(1000000L, 10));
-        return new ListOffsetRequest(-1, offsetData);
+    private AbstractRequest createListOffsetRequest(int version) {
+        if (version == 0) {
+            Map<TopicPartition, ListOffsetRequest.PartitionData> offsetData = 
new HashMap<>();
+            offsetData.put(new TopicPartition("test", 0), new 
ListOffsetRequest.PartitionData(1000000L, 10));
+            return new ListOffsetRequest(offsetData);
+        } else if (version == 1) {
+            Map<TopicPartition, Long> offsetData = new HashMap<>();
+            offsetData.put(new TopicPartition("test", 0), 1000000L);
+            return new ListOffsetRequest(offsetData, 
ListOffsetRequest.CONSUMER_REPLICA_ID);
+        } else {
+            throw new IllegalArgumentException("Illegal ListOffsetRequest 
version " + version);
+        }
     }
 
-    private AbstractRequestResponse createListOffsetResponse() {
-        Map<TopicPartition, ListOffsetResponse.PartitionData> responseData = 
new HashMap<>();
-        responseData.put(new TopicPartition("test", 0), new 
ListOffsetResponse.PartitionData(Errors.NONE.code(), Arrays.asList(100L)));
-        return new ListOffsetResponse(responseData);
+    private AbstractRequestResponse createListOffsetResponse(int version) {
+        if (version == 0) {
+            Map<TopicPartition, ListOffsetResponse.PartitionData> responseData 
= new HashMap<>();
+            responseData.put(new TopicPartition("test", 0), new 
ListOffsetResponse.PartitionData(Errors.NONE.code(), Arrays.asList(100L)));
+            return new ListOffsetResponse(responseData);
+        } else if (version == 1) {
+            Map<TopicPartition, ListOffsetResponse.PartitionData> responseData 
= new HashMap<>();
+            responseData.put(new TopicPartition("test", 0), new 
ListOffsetResponse.PartitionData(Errors.NONE.code(), 10000L, 100L));
+            return new ListOffsetResponse(responseData, 1);
+        } else {
+            throw new IllegalArgumentException("Illegal ListOffsetResponse 
version " + version);
+        }
     }
 
     private AbstractRequest createMetadataRequest(List<String> topics) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/eaaa433f/core/src/main/scala/kafka/api/ApiVersion.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ApiVersion.scala 
b/core/src/main/scala/kafka/api/ApiVersion.scala
index cdd418d..0d9775a 100644
--- a/core/src/main/scala/kafka/api/ApiVersion.scala
+++ b/core/src/main/scala/kafka/api/ApiVersion.scala
@@ -57,7 +57,10 @@ object ApiVersion {
     "0.10.1-IV0" -> KAFKA_0_10_1_IV0,
     // 0.10.1-IV1 is introduced for KIP-74(fetch response size limit).
     "0.10.1-IV1" -> KAFKA_0_10_1_IV1,
-    "0.10.1" -> KAFKA_0_10_1_IV1
+    // 0.10.1-IV2 is introduced for KIP-79 (ListOffsetRequest v1).
+    "0.10.1-IV2" -> KAFKA_0_10_1_IV2,
+    "0.10.1" -> KAFKA_0_10_1_IV2
+
   )
 
   private val versionPattern = "\\.".r
@@ -129,3 +132,9 @@ case object KAFKA_0_10_1_IV1 extends ApiVersion {
   val messageFormatVersion: Byte = Message.MagicValue_V1
   val id: Int = 7
 }
+
+case object KAFKA_0_10_1_IV2 extends ApiVersion {
+  val version: String = "0.10.1-IV2"
+  val messageFormatVersion: Byte = Message.MagicValue_V1
+  val id: Int = 8
+}
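
The new 0.10.1-IV2 entry is how brokers advertise internal support for ListOffsetRequest v1. A minimal sketch of how a broker-side caller might pick the request version from the configured inter-broker protocol version (assuming ApiVersion's string factory and its ordering by id behave as elsewhere in this file):

    import kafka.api.{ApiVersion, KAFKA_0_10_1_IV2}

    // Sketch only: derive the ListOffsetRequest version from inter.broker.protocol.version.
    val interBrokerProtocolVersion = ApiVersion("0.10.1-IV2")
    val listOffsetRequestVersion: Short =
      if (interBrokerProtocolVersion >= KAFKA_0_10_1_IV2) 1 else 0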

http://git-wip-us.apache.org/repos/asf/kafka/blob/eaaa433f/core/src/main/scala/kafka/log/FileMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala 
b/core/src/main/scala/kafka/log/FileMessageSet.scala
index c76653a..55dbfee 100755
--- a/core/src/main/scala/kafka/log/FileMessageSet.scala
+++ b/core/src/main/scala/kafka/log/FileMessageSet.scala
@@ -159,49 +159,34 @@ class FileMessageSet private[kafka](@volatile var file: 
File,
   /**
   * Search forward for the message whose timestamp is greater than or equal to the target timestamp.
    *
-   * The search will stop immediately when it sees a message in format version 
before 0.10.0. This is to avoid
-   * scanning the entire log when all the messages are still in old format.
-   *
    * @param targetTimestamp The timestamp to search for.
    * @param startingPosition The starting position to search.
-   * @return None, if no message exists at or after the starting position.
-   *         Some(the_next_offset_to_read) otherwise.
+   * @return The timestamp and offset of the message found, or None if no message is found.
    */
-  def searchForTimestamp(targetTimestamp: Long, startingPosition: Int): 
Option[Long] = {
-    var maxTimestampChecked = Message.NoTimestamp
+  def searchForTimestamp(targetTimestamp: Long, startingPosition: Int): 
Option[TimestampOffset] = {
     var lastOffsetChecked = -1L
     val messagesToSearch = read(startingPosition, sizeInBytes)
     for (messageAndOffset <- messagesToSearch) {
       val message = messageAndOffset.message
       lastOffsetChecked = messageAndOffset.offset
-      // Stop searching once we see message format before 0.10.0.
-      // This equivalent as treating message without timestamp has the largest 
timestamp.
-      // We do this to avoid scanning the entire log if no message has a 
timestamp.
-      if (message.magic == Message.MagicValue_V0)
-        return Some(messageAndOffset.offset)
-      else if (message.timestamp >= targetTimestamp) {
+      if (message.timestamp >= targetTimestamp) {
         // We found a message
         message.compressionCodec match {
           case NoCompressionCodec =>
-            return Some(messageAndOffset.offset)
+            return Some(TimestampOffset(messageAndOffset.message.timestamp, 
messageAndOffset.offset))
           case _ =>
             // Iterate over the inner messages to get the exact offset.
-            for (innerMessage <- 
ByteBufferMessageSet.deepIterator(messageAndOffset)) {
-              val timestamp = innerMessage.message.timestamp
+            for (innerMessageAndOffset <- 
ByteBufferMessageSet.deepIterator(messageAndOffset)) {
+              val timestamp = innerMessageAndOffset.message.timestamp
               if (timestamp >= targetTimestamp)
-                return Some(innerMessage.offset)
+                return 
Some(TimestampOffset(innerMessageAndOffset.message.timestamp, 
innerMessageAndOffset.offset))
             }
             throw new IllegalStateException(s"The message set (max timestamp = 
${message.timestamp}, max offset = ${messageAndOffset.offset}" +
                 s" should contain target timestamp $targetTimestamp but it 
does not.")
         }
-      } else
-        maxTimestampChecked = math.max(maxTimestampChecked, message.timestamp)
+      }
     }
-
-    if (lastOffsetChecked >= 0)
-      Some(lastOffsetChecked + 1)
-    else
-      None
+    None
   }
 
   /**

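Stripped of compression handling and the Kafka message classes, the rewritten scan above is simply "return the first entry whose timestamp is at or after the target". A self-contained sketch with a hypothetical Entry type standing in for MessageAndOffset:

    case class Entry(timestamp: Long, offset: Long)

    // Forward scan: first entry at or after the target timestamp, None if nothing qualifies.
    def searchForTimestamp(entries: Seq[Entry], targetTimestamp: Long): Option[Entry] =
      entries.find(_.timestamp >= targetTimestamp)

Note that the old fallback of returning lastOffsetChecked + 1 when nothing matched is gone; a scan that finds nothing now reports None and leaves the decision to the caller.
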
http://git-wip-us.apache.org/repos/asf/kafka/blob/eaaa433f/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala 
b/core/src/main/scala/kafka/log/Log.scala
index cfd0472..6043b01 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -17,7 +17,7 @@
 
 package kafka.log
 
-import kafka.api.OffsetRequest
+import kafka.api.KAFKA_0_10_0_IV0
 import kafka.utils._
 import kafka.message._
 import kafka.common._
@@ -28,8 +28,9 @@ import java.util.concurrent.{ConcurrentNavigableMap, 
ConcurrentSkipListMap}
 import java.util.concurrent.atomic._
 import java.text.NumberFormat
 
-import org.apache.kafka.common.errors.{CorruptRecordException, 
OffsetOutOfRangeException, RecordBatchTooLargeException, 
RecordTooLargeException}
+import org.apache.kafka.common.errors.{InvalidRequestException, 
CorruptRecordException, OffsetOutOfRangeException, 
RecordBatchTooLargeException, RecordTooLargeException}
 import org.apache.kafka.common.record.TimestampType
+import org.apache.kafka.common.requests.ListOffsetRequest
 
 import scala.collection.{Seq, JavaConversions}
 import com.yammer.metrics.core.Gauge
@@ -583,59 +584,41 @@ class Log(val dir: File,
   * `NOTE:` OffsetRequest V0 does not use this method; the behavior of OffsetRequest V0 remains the same as
   * before, i.e. it only gives back the timestamp based on the last modification time of the log segments.
    *
-   * @param timestamp The given timestamp for offset fetching.
+   * @param targetTimestamp The given timestamp for offset fetching.
   * @return The offset of the first message whose timestamp is greater than or equal to the given timestamp.
+   *         None if no such message is found.
    */
-  def fetchOffsetsByTimestamp(timestamp: Long): Long = {
-    debug(s"Searching offset for timestamp $timestamp")
-    val segsArray = logSegments.toArray
-    if (timestamp == OffsetRequest.EarliestTime)
-      return segsArray(0).baseOffset
+  def fetchOffsetsByTimestamp(targetTimestamp: Long): Option[TimestampOffset] 
= {
+    debug(s"Searching offset for timestamp $targetTimestamp")
 
-    // set the target timestamp to be Long.MaxValue if we need to find from 
the latest.
-    val targetTimestamp = timestamp match {
-      case OffsetRequest.LatestTime => Long.MaxValue
-      case _ => timestamp
-    }
+    if (config.messageFormatVersion < KAFKA_0_10_0_IV0 &&
+        targetTimestamp != ListOffsetRequest.EARLIEST_TIMESTAMP &&
+        targetTimestamp != ListOffsetRequest.LATEST_TIMESTAMP)
+      throw new InvalidRequestException(s"Cannot search offsets based on 
timestamp because message format version " +
+          s"for partition $topicAndPartition is ${config.messageFormatVersion} 
which is earlier than the minimum " +
+          s"required version $KAFKA_0_10_0_IV0")
 
-    var foundOffset: Long = -1L
-    // We have this while loop here to make sure we are returning the valid 
offsets to our best knowledge.
-    // This while loop is to handle the case where the log is truncated during 
the timestamp search and we did not
-    // find any message. In this case, we need to retry the search.
-    do {
-      val targetSeg = {
-        // Get all the segments whose largest timestamp is smaller than target 
timestamp
-        val earlierSegs = segsArray.takeWhile(_.largestTimestamp < 
targetTimestamp)
-        // We need to search the first segment whose largest timestamp is 
greater than the target timestamp if there is one.
-        if (earlierSegs.length < segsArray.length)
-          segsArray(earlierSegs.length)
-        else
-          earlierSegs.last
-      }
+    // For the earliest and latest, we do not need to return the timestamp.
+    val segsArray = logSegments.toArray
+    if (targetTimestamp == ListOffsetRequest.EARLIEST_TIMESTAMP)
+      return Some(TimestampOffset(Message.NoTimestamp, segsArray(0).baseOffset))
+    else if (targetTimestamp == ListOffsetRequest.LATEST_TIMESTAMP)
+      return Some(TimestampOffset(Message.NoTimestamp, logEndOffset))
+
+    val targetSeg = {
+      // Get all the segments whose largest timestamp is smaller than target 
timestamp
+      val earlierSegs = segsArray.takeWhile(_.largestTimestamp < 
targetTimestamp)
+      // We need to search the first segment whose largest timestamp is 
greater than the target timestamp if there is one.
+      if (earlierSegs.length < segsArray.length)
+        Some(segsArray(earlierSegs.length))
+      else
+        None
+    }
 
-      // First cache the current log end offset
-      val leo = logEndOffset
-      foundOffset = {
-        // Use the cached log end offsets if
-        // 1. user is asking for latest messages, or,
-        // 2. we are searching on the active segment and the target timestamp 
is greater than the largestTimestamp
-        // after we cached the log end offset. (We have to use the cached log 
end offsets because it is possible that
-        // some messages with a larger timestamp are appended after we check 
the largest timestamp. Using log end offset
-        // after the timestamp check might skip those messages.)
-        if (targetTimestamp == Long.MaxValue
-          || (targetTimestamp > targetSeg.largestTimestamp && targetSeg == 
activeSegment))
-          leo
-        else
-        // The findOffsetByTimestamp() method may return None when the log is 
truncated during the timestamp search.
-        // In that case we simply set the foundOffset to -1 so that we will 
search the timestamp again in the
-        // while loop.
-          targetSeg.findOffsetByTimestamp(targetTimestamp) match {
-            case Some(offset) => offset
-            case None => -1L
-          }
-      }
-    } while (foundOffset < 0)
-    foundOffset
+    targetSeg match {
+      case Some(segment) => segment.findOffsetByTimestamp(targetTimestamp)
+      case None => None
+    }
   }
 
   /**

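The segment selection in the new fetchOffsetsByTimestamp relies on each segment tracking its largest timestamp: skip every segment whose largest timestamp is below the target, then search the first remaining segment. A compact sketch over hypothetical (baseOffset, largestTimestamp) pairs standing in for LogSegment instances:

    case class Seg(baseOffset: Long, largestTimestamp: Long)

    // Returns the first segment that may contain a message at or after the target timestamp.
    def pickSegmentToSearch(segments: Vector[Seg], targetTimestamp: Long): Option[Seg] = {
      val earlierSegs = segments.takeWhile(_.largestTimestamp < targetTimestamp)
      if (earlierSegs.length < segments.length) Some(segments(earlierSegs.length)) else None
    }

EARLIEST_TIMESTAMP and LATEST_TIMESTAMP short-circuit before this step, returning the first segment's base offset and the log end offset respectively, with no timestamp attached.
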
http://git-wip-us.apache.org/repos/asf/kafka/blob/eaaa433f/core/src/main/scala/kafka/log/LogSegment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala 
b/core/src/main/scala/kafka/log/LogSegment.scala
index 3f76a84..1c39acf 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -387,21 +387,15 @@ class LogSegment(val log: FileMessageSet,
    * the truncated log and maybe retry or even do the search on another log 
segment.
    *
    * @param timestamp The timestamp to search for.
-   * @return an offset which points to the first message whose timestamp is 
larger than or equals to the
-   *         target timestamp.
-   *         None maybe returned when the log is truncated.
+   * @return the timestamp and offset of the first message whose timestamp is greater than or equal to the
+   *         target timestamp, or None if there is no such message.
    */
-  def findOffsetByTimestamp(timestamp: Long): Option[Long] = {
-    if (log.end == log.start) {
-      // The log segment is empty, just return base offset with no timestamp.
-      Some(baseOffset)
-    } else {
-      // Get the index entry with a timestamp less than or equal to the target 
timestamp
-      val timestampOffset = timeIndex.lookup(timestamp)
-      val position = index.lookup(timestampOffset.offset).position
-      // Search the timestamp
-      log.searchForTimestamp(timestamp, position)
-    }
+  def findOffsetByTimestamp(timestamp: Long): Option[TimestampOffset] = {
+    // Get the index entry with a timestamp less than or equal to the target 
timestamp
+    val timestampOffset = timeIndex.lookup(timestamp)
+    val position = index.lookup(timestampOffset.offset).position
+    // Search the timestamp
+    log.searchForTimestamp(timestamp, position)
   }
 
   /**

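findOffsetByTimestamp now chains two index lookups before the linear scan: the time index narrows the search to an offset at or before the target timestamp, and the offset index turns that offset into a byte position for FileMessageSet.searchForTimestamp. A sketch with the two indexes modelled as plain functions (the real types are TimeIndex and OffsetIndex):

    // Illustrative signatures; the (Long, Long) tuple stands in for TimestampOffset.
    def findOffsetByTimestamp(timeIndexLookup: Long => Long,   // timestamp -> offset of nearest earlier entry
                              offsetIndexLookup: Long => Int,  // offset -> byte position in the segment file
                              scanFrom: (Long, Int) => Option[(Long, Long)],
                              timestamp: Long): Option[(Long, Long)] = {
      val nearestOffset = timeIndexLookup(timestamp)
      val startPosition = offsetIndexLookup(nearestOffset)
      scanFrom(timestamp, startPosition)
    }
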
http://git-wip-us.apache.org/repos/asf/kafka/blob/eaaa433f/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 51c9eab..15e5b62 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -35,11 +35,11 @@ import kafka.network._
 import kafka.network.RequestChannel.{Response, Session}
 import kafka.security.auth
 import kafka.security.auth.{Authorizer, ClusterAction, Create, Describe, 
Group, Operation, Read, Resource, Write, Delete}
-import kafka.server.QuotaType._
 import kafka.utils.{Logging, SystemTime, ZKGroupTopicDirs, ZkUtils}
-import org.apache.kafka.common.errors.{ClusterAuthorizationException, 
NotLeaderForPartitionException, UnknownTopicOrPartitionException, 
TopicExistsException}
+import org.apache.kafka.common.errors.{InvalidRequestException, 
ClusterAuthorizationException, NotLeaderForPartitionException, 
UnknownTopicOrPartitionException, TopicExistsException}
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.{ApiKeys, Errors, Protocol, 
SecurityProtocol}
+import org.apache.kafka.common.requests.ListOffsetResponse.PartitionData
 import org.apache.kafka.common.requests.{ApiVersionsResponse, 
DescribeGroupsRequest, DescribeGroupsResponse, GroupCoordinatorRequest, 
GroupCoordinatorResponse, HeartbeatRequest, HeartbeatResponse, 
JoinGroupRequest, JoinGroupResponse, LeaderAndIsrRequest, LeaderAndIsrResponse, 
LeaveGroupRequest, LeaveGroupResponse, ListGroupsResponse, ListOffsetRequest, 
ListOffsetResponse, MetadataRequest, MetadataResponse, OffsetCommitRequest, 
OffsetCommitResponse, OffsetFetchRequest, OffsetFetchResponse, ProduceRequest, 
ProduceResponse, ResponseHeader, ResponseSend, StopReplicaRequest, 
StopReplicaResponse, SyncGroupRequest, SyncGroupResponse, 
UpdateMetadataRequest, UpdateMetadataResponse, CreateTopicsRequest, 
CreateTopicsResponse, DeleteTopicsRequest, DeleteTopicsResponse}
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.Utils
@@ -531,6 +531,22 @@ class KafkaApis(val requestChannel: RequestChannel,
    */
   def handleOffsetRequest(request: RequestChannel.Request) {
     val correlationId = request.header.correlationId
+    val version = request.header.apiVersion()
+
+    val mergedResponseMap =
+      if (version == 0)
+        handleOffsetRequestV0(request)
+      else
+        handleOffsetRequestV1(request)
+
+    val responseHeader = new ResponseHeader(correlationId)
+    val response = new ListOffsetResponse(mergedResponseMap.asJava, version)
+
+    requestChannel.sendResponse(new RequestChannel.Response(request, new 
ResponseSend(request.connectionId, responseHeader, response)))
+  }
+
+  private def handleOffsetRequestV0(request: RequestChannel.Request): Map[TopicPartition, ListOffsetResponse.PartitionData] = {
+    val correlationId = request.header.correlationId
     val clientId = request.header.clientId
     val offsetRequest = request.body.asInstanceOf[ListOffsetRequest]
 
@@ -539,11 +555,10 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
 
     val unauthorizedResponseStatus = unauthorizedRequestInfo.mapValues(_ =>
-      new 
ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED.code, 
List[JLong]().asJava)
+      new PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED.code, 
List[JLong]().asJava)
     )
 
-    val responseMap = authorizedRequestInfo.map(elem => {
-      val (topicPartition, partitionData) = elem
+    val responseMap = authorizedRequestInfo.map({case (topicPartition, 
partitionData) =>
       try {
         // ensure leader exists
         val localReplica = if (offsetRequest.replicaId != 
ListOffsetRequest.DEBUGGING_REPLICA_ID)
@@ -552,9 +567,9 @@ class KafkaApis(val requestChannel: RequestChannel,
           replicaManager.getReplicaOrException(topicPartition.topic, 
topicPartition.partition)
         val offsets = {
           val allOffsets = fetchOffsets(replicaManager.logManager,
-                                        topicPartition,
-                                        partitionData.timestamp,
-                                        partitionData.maxNumOffsets)
+            topicPartition,
+            partitionData.timestamp,
+            partitionData.maxNumOffsets)
           if (offsetRequest.replicaId != 
ListOffsetRequest.CONSUMER_REPLICA_ID) {
             allOffsets
           } else {
@@ -569,26 +584,91 @@ class KafkaApis(val requestChannel: RequestChannel,
       } catch {
         // NOTE: UnknownTopicOrPartitionException and 
NotLeaderForPartitionException are special cased since these error messages
         // are typically transient and there is no value in logging the entire 
stack trace for the same
-        case utpe: UnknownTopicOrPartitionException =>
+        case e @ ( _ : UnknownTopicOrPartitionException | _ : 
NotLeaderForPartitionException) =>
           debug("Offset request with correlation id %d from client %s on 
partition %s failed due to %s".format(
-               correlationId, clientId, topicPartition, utpe.getMessage))
-          (topicPartition, new 
ListOffsetResponse.PartitionData(Errors.forException(utpe).code, 
List[JLong]().asJava))
-        case nle: NotLeaderForPartitionException =>
-          debug("Offset request with correlation id %d from client %s on 
partition %s failed due to %s".format(
-               correlationId, clientId, topicPartition,nle.getMessage))
-          (topicPartition, new 
ListOffsetResponse.PartitionData(Errors.forException(nle).code, 
List[JLong]().asJava))
+            correlationId, clientId, topicPartition, e.getMessage))
+          (topicPartition, new 
ListOffsetResponse.PartitionData(Errors.forException(e).code, 
List[JLong]().asJava))
         case e: Throwable =>
           error("Error while responding to offset request", e)
           (topicPartition, new 
ListOffsetResponse.PartitionData(Errors.forException(e).code, 
List[JLong]().asJava))
       }
     })
+    responseMap ++ unauthorizedResponseStatus
+  }
 
-    val mergedResponseMap = responseMap ++ unauthorizedResponseStatus
+  private def handleOffsetRequestV1(request: RequestChannel.Request): Map[TopicPartition, ListOffsetResponse.PartitionData] = {
+    val correlationId = request.header.correlationId
+    val clientId = request.header.clientId
+    val offsetRequest = request.body.asInstanceOf[ListOffsetRequest]
 
-    val responseHeader = new ResponseHeader(correlationId)
-    val response = new ListOffsetResponse(mergedResponseMap.asJava)
+    val (authorizedRequestInfo, unauthorizedRequestInfo) = 
offsetRequest.partitionTimestamps.asScala.partition {
+      case (topicPartition, _) => authorize(request.session, Describe, new 
Resource(auth.Topic, topicPartition.topic))
+    }
 
-    requestChannel.sendResponse(new RequestChannel.Response(request, new 
ResponseSend(request.connectionId, responseHeader, response)))
+    val unauthorizedResponseStatus = unauthorizedRequestInfo.mapValues(_ => {
+      new PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED.code,
+                        ListOffsetResponse.UNKNOWN_TIMESTAMP,
+                        ListOffsetResponse.UNKNOWN_OFFSET)
+    })
+
+    val responseMap = authorizedRequestInfo.map({case (topicPartition, 
timestamp) =>
+      if (offsetRequest.duplicatePartitions().contains(topicPartition)) {
+        debug(s"OffsetRequest with correlation id $correlationId from client 
$clientId on partition $topicPartition " +
+            s"failed because the partition is duplicated in the request.")
+        (topicPartition, new PartitionData(Errors.INVALID_REQUEST.code(),
+                                           
ListOffsetResponse.UNKNOWN_TIMESTAMP,
+                                           ListOffsetResponse.UNKNOWN_OFFSET))
+      } else {
+        try {
+          // ensure leader exists
+          val localReplica = if (offsetRequest.replicaId != 
ListOffsetRequest.DEBUGGING_REPLICA_ID)
+            replicaManager.getLeaderReplicaIfLocal(topicPartition.topic, 
topicPartition.partition)
+          else
+            replicaManager.getReplicaOrException(topicPartition.topic, 
topicPartition.partition)
+          val found = {
+            fetchOffsetForTimestamp(replicaManager.logManager, topicPartition, 
timestamp) match {
+              case Some(timestampOffset) =>
+                // The request is not from a consumer client
+                if (offsetRequest.replicaId != 
ListOffsetRequest.CONSUMER_REPLICA_ID)
+                  timestampOffset
+                // The request is from a consumer client
+                else {
+                  // the found offset is smaller than or equal to the high watermark
+                  if (timestampOffset.offset <= 
localReplica.highWatermark.messageOffset)
+                    timestampOffset
+                  // the consumer wants the latest offset.
+                  else if (timestamp == ListOffsetRequest.LATEST_TIMESTAMP)
+                    TimestampOffset(Message.NoTimestamp, 
localReplica.highWatermark.messageOffset)
+                  // The found offset is higher than the high watermark and 
the consumer is not asking for the end offset.
+                  else
+                    TimestampOffset(ListOffsetResponse.UNKNOWN_TIMESTAMP, 
ListOffsetResponse.UNKNOWN_OFFSET)
+                }
+
+              case None =>
+                TimestampOffset(ListOffsetResponse.UNKNOWN_TIMESTAMP, 
ListOffsetResponse.UNKNOWN_OFFSET)
+            }
+          }
+          (topicPartition, new 
ListOffsetResponse.PartitionData(Errors.NONE.code, found.timestamp, 
found.offset))
+        } catch {
+          // NOTE: These exceptions are special cased since these error 
messages are typically transient or the client
+          // would have received a clear exception and there is no value in 
logging the entire stack trace for the same
+          case e @ (_ : UnknownTopicOrPartitionException |
+                    _ : NotLeaderForPartitionException |
+                    _ : InvalidRequestException) =>
+            debug(s"Offset request with correlation id $correlationId from 
client $clientId on " +
+                s"partition $topicPartition failed due to ${e.getMessage}")
+            (topicPartition, new PartitionData(Errors.forException(e).code,
+                                               
ListOffsetResponse.UNKNOWN_TIMESTAMP,
+                                               
ListOffsetResponse.UNKNOWN_OFFSET))
+          case e: Throwable =>
+            error("Error while responding to offset request", e)
+            (topicPartition, new PartitionData(Errors.forException(e).code,
+                                               
ListOffsetResponse.UNKNOWN_TIMESTAMP,
+                                               
ListOffsetResponse.UNKNOWN_OFFSET))
+        }
+      }
+    })
+    responseMap ++ unauthorizedResponseStatus
   }
 
   def fetchOffsets(logManager: LogManager, topicPartition: TopicPartition, 
timestamp: Long, maxNumOffsets: Int): Seq[Long] = {
@@ -603,6 +683,15 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
+  private def fetchOffsetForTimestamp(logManager: LogManager, topicPartition: TopicPartition, timestamp: Long): Option[TimestampOffset] = {
+    logManager.getLog(TopicAndPartition(topicPartition.topic, 
topicPartition.partition)) match {
+      case Some(log) =>
+        log.fetchOffsetsByTimestamp(timestamp)
+      case _ =>
+        throw new UnknownTopicOrPartitionException(s"$topicPartition does not 
exist on the broker.")
+    }
+  }
+
   private[server] def fetchOffsetsBefore(log: Log, timestamp: Long, 
maxNumOffsets: Int): Seq[Long] = {
     val segsArray = log.logSegments.toArray
     var offsetTimeArray: Array[(Long, Long)] = null
@@ -1063,8 +1152,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         (topic, Errors.CLUSTER_AUTHORIZATION_FAILED)
       }
       sendResponseCallback(results)
-    }
-    else {
+    } else {
       val (validTopics, duplicateTopics) = 
createTopicsRequest.topics.asScala.partition { case (topic, _) =>
         !createTopicsRequest.duplicateTopics.contains(topic)
       }
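
For consumer-originated v1 lookups, handleOffsetRequestV1 refuses to expose offsets beyond the high watermark: a found offset at or below the high watermark is returned as-is, a "latest" lookup is capped at the high watermark, and anything else comes back as unknown. A sketch of that rule with plain longs (assumption: the UNKNOWN_TIMESTAMP/UNKNOWN_OFFSET sentinels in ListOffsetResponse are -1):

    def clampForConsumer(foundTimestamp: Long, foundOffset: Long,
                         highWatermark: Long, isLatestLookup: Boolean): (Long, Long) =
      if (foundOffset <= highWatermark) (foundTimestamp, foundOffset)  // fully replicated, safe to expose
      else if (isLatestLookup) (-1L, highWatermark)                    // "latest" is capped at the high watermark
      else (-1L, -1L)                                                  // not yet visible to consumers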
