KAFKA-4148; Support ListOffsetRequest v1 and search offsets by timestamp in consumer (KIP-79)
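For clients, the user-visible change is three new methods on the Consumer interface — offsetsForTimes(Map<TopicPartition, Long>), beginningOffsets(Collection<TopicPartition>), and endOffsets(Collection<TopicPartition>) — backed by ListOffsetRequest v1, which drops v0's max_num_offsets field in favor of a single target timestamp per partition and returns a (timestamp, offset) pair instead of an offset list. A minimal usage sketch against the API added in the diff below; the broker address, topic name, lookback window, and deserializer settings are illustrative placeholders:

    import java.util.Collections;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.record.OffsetAndTimestamp;

    public class OffsetsForTimesExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
            props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                TopicPartition tp = new TopicPartition("my-topic", 0); // placeholder partition
                // Find the earliest offset whose timestamp is >= one hour ago. The consumer need
                // not be assigned the partition; the mapped value is null when the partition's
                // message format predates 0.10.0 or no such message exists.
                long target = System.currentTimeMillis() - 60 * 60 * 1000L;
                Map<TopicPartition, OffsetAndTimestamp> byTime =
                        consumer.offsetsForTimes(Collections.singletonMap(tp, target));
                OffsetAndTimestamp found = byTime.get(tp);
                if (found != null)
                    System.out.println("offset=" + found.offset() + ", timestamp=" + found.timestamp());
                // Earliest available offset, and end offset (last available message + 1).
                Map<TopicPartition, Long> beginning = consumer.beginningOffsets(Collections.singleton(tp));
                Map<TopicPartition, Long> end = consumer.endOffsets(Collections.singleton(tp));
                System.out.println("beginning=" + beginning.get(tp) + ", end=" + end.get(tp));
            }
        }
    }

All three calls block until the lookup completes and, as the javadoc below notes, may block indefinitely if a requested partition does not exist.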
Author: Jiangjie Qin <becket....@gmail.com> Reviewers: Jun Rao <jun...@gmail.com>, Ismael Juma <ism...@juma.me.uk>, Jason Gustafson <ja...@confluent.io> Closes #1852 from becketqin/KAFKA-4148 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/eaaa433f Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/eaaa433f Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/eaaa433f Branch: refs/heads/trunk Commit: eaaa433fc97b86450833d8fcc8c9289ea35d47c0 Parents: cf8f4a7 Author: Jiangjie Qin <becket....@gmail.com> Authored: Mon Sep 19 18:38:17 2016 -0700 Committer: Jason Gustafson <ja...@confluent.io> Committed: Mon Sep 19 18:38:27 2016 -0700 ---------------------------------------------------------------------- .../apache/kafka/clients/consumer/Consumer.java | 16 ++ .../kafka/clients/consumer/KafkaConsumer.java | 54 ++++++ .../kafka/clients/consumer/MockConsumer.java | 30 ++++ .../clients/consumer/internals/Fetcher.java | 179 +++++++++++++------ .../apache/kafka/common/protocol/Protocol.java | 40 ++++- .../kafka/common/record/OffsetAndTimestamp.java | 58 ++++++ .../common/requests/ListOffsetRequest.java | 109 ++++++++--- .../common/requests/ListOffsetResponse.java | 65 ++++++- .../clients/consumer/KafkaConsumerTest.java | 4 +- .../internals/AbstractCoordinatorTest.java | 10 +- .../clients/consumer/internals/FetcherTest.java | 31 ++-- .../common/requests/RequestResponseTest.java | 43 +++-- core/src/main/scala/kafka/api/ApiVersion.scala | 11 +- .../main/scala/kafka/log/FileMessageSet.scala | 33 +--- core/src/main/scala/kafka/log/Log.scala | 83 ++++----- core/src/main/scala/kafka/log/LogSegment.scala | 22 +-- .../src/main/scala/kafka/server/KafkaApis.scala | 130 +++++++++++--- .../kafka/server/ReplicaFetcherThread.scala | 31 ++-- .../kafka/api/PlaintextConsumerTest.scala | 84 ++++++++- .../scala/unit/kafka/log/LogSegmentTest.scala | 21 +-- .../src/test/scala/unit/kafka/log/LogTest.scala | 8 +- docs/upgrade.html | 26 ++- 22 files changed, 837 insertions(+), 251 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/eaaa433f/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java index 6f5a6b6..06e1bec 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java @@ -16,6 +16,7 @@ import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.record.OffsetAndTimestamp; import java.io.Closeable; import java.util.Collection; @@ -151,6 +152,21 @@ public interface Consumer<K, V> extends Closeable { public void resume(Collection<TopicPartition> partitions); /** + * @see KafkaConsumer#offsetsForTimes(java.util.Map) + */ + public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch); + + /** + * @see KafkaConsumer#beginningOffsets(java.util.Collection) + */ + public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions); + + /** + * @see KafkaConsumer#endOffsets(java.util.Collection) + */ + public 
Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions); + + /** * @see KafkaConsumer#close() */ public void close(); http://git-wip-us.apache.org/repos/asf/kafka/blob/eaaa433f/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 108c0cb..889aad8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -37,6 +37,7 @@ import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.MetricsReporter; import org.apache.kafka.common.network.ChannelBuilder; import org.apache.kafka.common.network.Selector; +import org.apache.kafka.common.record.OffsetAndTimestamp; import org.apache.kafka.common.requests.MetadataRequest; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.utils.AppInfoParser; @@ -1400,6 +1401,59 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { } /** + * Look up the offsets for the given partitions by timestamp. The returned offset for each partition is the + * earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition. + * + * This is a blocking call. The consumer does not have to be assigned the partitions. + * If the message format version in a partition is before 0.10.0, i.e. the messages do not have timestamps, null + * will be returned for that partition. + * + * Notice that this method may block indefinitely if the partition does not exist. + * + * @param timestampsToSearch the mapping from partition to the timestamp to look up. + * @return a mapping from partition to the timestamp and offset of the first message with timestamp greater + * than or equal to the target timestamp. {@code null} will be returned for the partition if there is no + * such message. + * @throws IllegalArgumentException if the target timestamp is negative. + */ + @Override + public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) { + for (Map.Entry<TopicPartition, Long> entry : timestampsToSearch.entrySet()) { + if (entry.getValue() < 0) + throw new IllegalArgumentException("The target time for partition " + entry.getKey() + " is " + + entry.getValue() + ". The target time cannot be negative."); + } + return fetcher.getOffsetsByTimes(timestampsToSearch); + } + + /** + * Get the earliest available offsets for the given partitions. + * + * Notice that this method may block indefinitely if the partition does not exist. + * + * @param partitions the partitions to get the earliest offsets. + * @return The earliest available offsets for the given partitions + */ + @Override + public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions) { + return fetcher.earliestOffsets(partitions); + } + + /** + * Get the end offsets for the given partitions. The end offset of a partition is the offset of the upcoming + * message, i.e. the offset of the last available message + 1. + * + * Notice that this method may block indefinitely if the partition does not exist. + * + * @param partitions the partitions to get the end offsets. + * @return The end offsets for the given partitions. 
+ */ + @Override + public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) { + return fetcher.latestOffsets(partitions); + } + + /** * Close the consumer, waiting indefinitely for any needed cleanup. If auto-commit is enabled, this * will commit the current offsets. Note that {@link #wakeup()} cannot be used to interrupt close. */ http://git-wip-us.apache.org/repos/asf/kafka/blob/eaaa433f/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java index 62eb77d..3af2344 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java @@ -20,6 +20,7 @@ import org.apache.kafka.common.MetricName; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.record.OffsetAndTimestamp; import java.util.ArrayList; import java.util.Collection; @@ -299,6 +300,35 @@ public class MockConsumer<K, V> implements Consumer<K, V> { } @Override + public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) { + return null; + } + + @Override + public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions) { + Map<TopicPartition, Long> result = new HashMap<>(); + for (TopicPartition tp : partitions) { + Long beginningOffset = beginningOffsets.get(tp); + if (beginningOffset == null) + throw new IllegalStateException("The partition " + tp + " does not have a beginning offset."); + result.put(tp, beginningOffset); + } + return result; + } + + @Override + public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) { + Map<TopicPartition, Long> result = new HashMap<>(); + for (TopicPartition tp : partitions) { + Long endOffset = endOffsets.get(tp); + if (endOffset == null) + throw new IllegalStateException("The partition " + tp + " does not have an end offset."); + result.put(tp, endOffset); + } + return result; + } + + @Override public void close() { ensureNotClosed(); this.closed = true; http://git-wip-us.apache.org/repos/asf/kafka/blob/eaaa433f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index 202c0ad..14f7c5d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -41,6 +41,7 @@ import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.record.InvalidRecordException; import org.apache.kafka.common.record.LogEntry; import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.OffsetAndTimestamp; import org.apache.kafka.common.record.Record; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.requests.FetchRequest; @@ -57,6 +58,7 @@ import org.slf4j.LoggerFactory; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Collection; import 
java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -351,23 +353,16 @@ public class Fetcher<K, V> { throw new NoOffsetForPartitionException(partition); log.debug("Resetting offset for partition {} to {} offset.", partition, strategy.name().toLowerCase(Locale.ROOT)); - long offset = listOffset(partition, timestamp); + long offset = getOffsetsByTimes(Collections.singletonMap(partition, timestamp)).get(partition).offset(); // we might lose the assignment while fetching the offset, so check it is still active if (subscriptions.isAssigned(partition)) this.subscriptions.seek(partition, offset); } - /** - * Fetch a single offset before the given timestamp for the partition. - * - * @param partition The partition that needs fetching offset. - * @param timestamp The timestamp for fetching offset. - * @return The offset of the message that is published before the given timestamp - */ - private long listOffset(TopicPartition partition, long timestamp) { + public Map<TopicPartition, OffsetAndTimestamp> getOffsetsByTimes(Map<TopicPartition, Long> timestampsToSearch) { while (true) { - RequestFuture<Long> future = sendListOffsetRequest(partition, timestamp); + RequestFuture<Map<TopicPartition, OffsetAndTimestamp>> future = sendListOffsetRequests(timestampsToSearch); client.poll(future); if (future.succeeded()) @@ -383,6 +378,25 @@ public class Fetcher<K, V> { } } + public Map<TopicPartition, Long> earliestOffsets(Collection<TopicPartition> partitions) { + return earliestOrLatestOffset(partitions, ListOffsetRequest.EARLIEST_TIMESTAMP); + } + + public Map<TopicPartition, Long> latestOffsets(Collection<TopicPartition> partitions) { + return earliestOrLatestOffset(partitions, ListOffsetRequest.LATEST_TIMESTAMP); + } + + private Map<TopicPartition, Long> earliestOrLatestOffset(Collection<TopicPartition> partitions, long timestamp) { + Map<TopicPartition, Long> timestampsToSearch = new HashMap<>(); + for (TopicPartition tp : partitions) + timestampsToSearch.put(tp, timestamp); + Map<TopicPartition, Long> result = new HashMap<>(); + for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : getOffsetsByTimes(timestampsToSearch).entrySet()) + result.put(entry.getKey(), entry.getValue().offset()); + + return result; + } + /** * Return the fetched records, empty the record buffer and update the consumed position. * @@ -457,64 +471,121 @@ public class Fetcher<K, V> { } /** - * Fetch a single offset before the given timestamp for the partition. + * Search the offsets by target times for the specified partitions. * - * @param topicPartition The partition that needs fetching offset. - * @param timestamp The timestamp for fetching offset. - * @return A response which can be polled to obtain the corresponding offset. + * @param timestampsToSearch the mapping between partitions and target time + * @return A response which can be polled to obtain the corresponding timestamps and offsets. 
*/ - private RequestFuture<Long> sendListOffsetRequest(final TopicPartition topicPartition, long timestamp) { - Map<TopicPartition, ListOffsetRequest.PartitionData> partitions = new HashMap<>(1); - partitions.put(topicPartition, new ListOffsetRequest.PartitionData(timestamp, 1)); - PartitionInfo info = metadata.fetch().partition(topicPartition); - if (info == null) { - metadata.add(topicPartition.topic()); - log.debug("Partition {} is unknown for fetching offset, wait for metadata refresh", topicPartition); - return RequestFuture.staleMetadata(); - } else if (info.leader() == null) { - log.debug("Leader for partition {} unavailable for fetching offset, wait for metadata refresh", topicPartition); - return RequestFuture.leaderNotAvailable(); - } else { - Node node = info.leader(); - ListOffsetRequest request = new ListOffsetRequest(-1, partitions); - return client.send(node, ApiKeys.LIST_OFFSETS, request) - .compose(new RequestFutureAdapter<ClientResponse, Long>() { + private RequestFuture<Map<TopicPartition, OffsetAndTimestamp>> sendListOffsetRequests(final Map<TopicPartition, Long> timestampsToSearch) { + // Group the partitions by node. + final Map<Node, Map<TopicPartition, Long>> timestampsToSearchByNode = new HashMap<>(); + for (Map.Entry<TopicPartition, Long> entry: timestampsToSearch.entrySet()) { + TopicPartition tp = entry.getKey(); + PartitionInfo info = metadata.fetch().partition(tp); + if (info == null) { + metadata.add(tp.topic()); + log.debug("Partition {} is unknown for fetching offset, wait for metadata refresh", tp); + return RequestFuture.staleMetadata(); + } else if (info.leader() == null) { + log.debug("Leader for partition {} unavailable for fetching offset, wait for metadata refresh", tp); + return RequestFuture.leaderNotAvailable(); + } else { + Node node = info.leader(); + Map<TopicPartition, Long> topicData = timestampsToSearchByNode.get(node); + if (topicData == null) { + topicData = new HashMap<>(); + timestampsToSearchByNode.put(node, topicData); + } + topicData.put(entry.getKey(), entry.getValue()); + } + } + + final RequestFuture<Map<TopicPartition, OffsetAndTimestamp>> listOffsetRequestsFuture = new RequestFuture<>(); + final Map<TopicPartition, OffsetAndTimestamp> fetchedTimestampOffsets = new HashMap<>(); + for (Map.Entry<Node, Map<TopicPartition, Long>> entry : timestampsToSearchByNode.entrySet()) { + sendListOffsetRequest(entry.getKey(), entry.getValue()) + .addListener(new RequestFutureListener<Map<TopicPartition, OffsetAndTimestamp>>() { @Override - public void onSuccess(ClientResponse response, RequestFuture<Long> future) { - handleListOffsetResponse(topicPartition, response, future); + public void onSuccess(Map<TopicPartition, OffsetAndTimestamp> value) { + synchronized (listOffsetRequestsFuture) { + fetchedTimestampOffsets.putAll(value); + if (fetchedTimestampOffsets.size() == timestampsToSearch.size() && !listOffsetRequestsFuture.isDone()) + listOffsetRequestsFuture.complete(fetchedTimestampOffsets); + } + } + + @Override + public void onFailure(RuntimeException e) { + synchronized (listOffsetRequestsFuture) { + // This may cause all the requests to be retried, but should be rare. + if (!listOffsetRequestsFuture.isDone()) + listOffsetRequestsFuture.raise(e); + } } }); } + return listOffsetRequestsFuture; + } + + /** + * Send the ListOffsetRequest to a specific broker for the partitions and target timestamps. + * + * @param node The node to send the ListOffsetRequest to. 
+ * @param timestampsToSearch The mapping from partitions to the target timestamps. + * @return A response which can be polled to obtain the corresponding timestamps and offsets. + */ + private RequestFuture<Map<TopicPartition, OffsetAndTimestamp>> sendListOffsetRequest(Node node, + final Map<TopicPartition, Long> timestampsToSearch) { + ListOffsetRequest request = new ListOffsetRequest(timestampsToSearch, ListOffsetRequest.CONSUMER_REPLICA_ID); + log.trace("Sending ListOffsetRequest {} to broker {}", request, node); + return client.send(node, ApiKeys.LIST_OFFSETS, request) + .compose(new RequestFutureAdapter<ClientResponse, Map<TopicPartition, OffsetAndTimestamp>>() { + @Override + public void onSuccess(ClientResponse response, RequestFuture<Map<TopicPartition, OffsetAndTimestamp>> future) { + handleListOffsetResponse(timestampsToSearch, response, future); + } + }); } /** * Callback for the response of the list offset call above. - * @param topicPartition The partition that was fetched + * @param timestampsToSearch The mapping from partitions to target timestamps * @param clientResponse The response from the server. + * @param future The future to be completed by the response. */ - private void handleListOffsetResponse(TopicPartition topicPartition, + private void handleListOffsetResponse(Map<TopicPartition, Long> timestampsToSearch, ClientResponse clientResponse, - RequestFuture<Long> future) { + RequestFuture<Map<TopicPartition, OffsetAndTimestamp>> future) { ListOffsetResponse lor = new ListOffsetResponse(clientResponse.responseBody()); - short errorCode = lor.responseData().get(topicPartition).errorCode; - if (errorCode == Errors.NONE.code()) { - List<Long> offsets = lor.responseData().get(topicPartition).offsets; - if (offsets.size() != 1) - throw new IllegalStateException("This should not happen."); - long offset = offsets.get(0); - log.debug("Fetched offset {} for partition {}", offset, topicPartition); - - future.complete(offset); - } else if (errorCode == Errors.NOT_LEADER_FOR_PARTITION.code() - || errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) { - log.debug("Attempt to fetch offsets for partition {} failed due to obsolete leadership information, retrying.", - topicPartition); - future.raise(Errors.forCode(errorCode)); - } else { - log.warn("Attempt to fetch offsets for partition {} failed due to: {}", - topicPartition, Errors.forCode(errorCode).message()); - future.raise(new StaleMetadataException()); + Map<TopicPartition, OffsetAndTimestamp> timestampOffsetMap = new HashMap<>(); + for (Map.Entry<TopicPartition, Long> entry : timestampsToSearch.entrySet()) { + TopicPartition topicPartition = entry.getKey(); + ListOffsetResponse.PartitionData partitionData = lor.responseData().get(topicPartition); + Errors error = Errors.forCode(partitionData.errorCode); + if (error == Errors.NONE) { + OffsetAndTimestamp offsetAndTimestamp = null; + if (partitionData.offset != ListOffsetResponse.UNKNOWN_OFFSET) + offsetAndTimestamp = new OffsetAndTimestamp(partitionData.offset, partitionData.timestamp); + log.debug("Fetched {} for partition {}", offsetAndTimestamp, topicPartition); + timestampOffsetMap.put(topicPartition, offsetAndTimestamp); + } else if (error == Errors.INVALID_REQUEST) { + // The message format on the broker side is before 0.10.0, we simply put null in the response. 
+ log.debug("Cannot search by timestamp for partition {} because the message format version " + + "is before 0.10.0", topicPartition); + timestampOffsetMap.put(topicPartition, null); + } else if (error == Errors.NOT_LEADER_FOR_PARTITION + || error == Errors.UNKNOWN_TOPIC_OR_PARTITION) { + log.debug("Attempt to fetch offsets for partition {} failed due to obsolete leadership information, retrying.", + topicPartition); + future.raise(error); + } else { + log.warn("Attempt to fetch offsets for partition {} failed due to: {}", + topicPartition, error.message()); + future.raise(new StaleMetadataException()); + } } + if (!future.isDone()) + future.complete(timestampOffsetMap); } private List<TopicPartition> fetchablePartitions() { @@ -550,7 +621,7 @@ public class Fetcher<K, V> { fetch.put(partition, new FetchRequest.PartitionData(position, this.fetchSize)); log.trace("Added fetch request for partition {} at offset {}", partition, position); } else { - log.trace("Skipping fetch for partition {} because there is an inflight request to {}", partition, node); + log.trace("Skipping fetch for partition {} because there is an in-flight request to {}", partition, node); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/eaaa433f/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java index d64cf6d..5abf125 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java @@ -375,6 +375,12 @@ public class Protocol { new Field("max_num_offsets", INT32, "Maximum offsets to return.")); + public static final Schema LIST_OFFSET_REQUEST_PARTITION_V1 = new Schema(new Field("partition", + INT32, + "Topic partition id."), + new Field("timestamp", + INT64, + "The target timestamp for the partition.")); public static final Schema LIST_OFFSET_REQUEST_TOPIC_V0 = new Schema(new Field("topic", STRING, @@ -382,6 +388,12 @@ public class Protocol { new Field("partitions", new ArrayOf(LIST_OFFSET_REQUEST_PARTITION_V0), "Partitions to list offset.")); + public static final Schema LIST_OFFSET_REQUEST_TOPIC_V1 = new Schema(new Field("topic", + STRING, + "Topic to list offset."), + new Field("partitions", + new ArrayOf(LIST_OFFSET_REQUEST_PARTITION_V1), + "Partitions to list offset.")); public static final Schema LIST_OFFSET_REQUEST_V0 = new Schema(new Field("replica_id", INT32, @@ -389,6 +401,12 @@ public class Protocol { new Field("topics", new ArrayOf(LIST_OFFSET_REQUEST_TOPIC_V0), "Topics to list offsets.")); + public static final Schema LIST_OFFSET_REQUEST_V1 = new Schema(new Field("replica_id", + INT32, + "Broker id of the follower. 
For normal consumers, use -1."), + new Field("topics", + new ArrayOf(LIST_OFFSET_REQUEST_TOPIC_V1), + "Topics to list offsets.")); public static final Schema LIST_OFFSET_RESPONSE_PARTITION_V0 = new Schema(new Field("partition", INT32, @@ -398,15 +416,33 @@ public class Protocol { new ArrayOf(INT64), "A list of offsets.")); + public static final Schema LIST_OFFSET_RESPONSE_PARTITION_V1 = new Schema(new Field("partition", + INT32, + "Topic partition id."), + new Field("error_code", INT16), + new Field("timestamp", + INT64, + "The timestamp associated with the returned offset"), + new Field("offset", + INT64, + "offsets found")); + public static final Schema LIST_OFFSET_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", STRING), new Field("partition_responses", new ArrayOf(LIST_OFFSET_RESPONSE_PARTITION_V0))); + public static final Schema LIST_OFFSET_RESPONSE_TOPIC_V1 = new Schema(new Field("topic", STRING), + new Field("partition_responses", + new ArrayOf(LIST_OFFSET_RESPONSE_PARTITION_V1))); + public static final Schema LIST_OFFSET_RESPONSE_V0 = new Schema(new Field("responses", new ArrayOf(LIST_OFFSET_RESPONSE_TOPIC_V0))); - public static final Schema[] LIST_OFFSET_REQUEST = new Schema[] {LIST_OFFSET_REQUEST_V0}; - public static final Schema[] LIST_OFFSET_RESPONSE = new Schema[] {LIST_OFFSET_RESPONSE_V0}; + public static final Schema LIST_OFFSET_RESPONSE_V1 = new Schema(new Field("responses", + new ArrayOf(LIST_OFFSET_RESPONSE_TOPIC_V1))); + + public static final Schema[] LIST_OFFSET_REQUEST = new Schema[] {LIST_OFFSET_REQUEST_V0, LIST_OFFSET_REQUEST_V1}; + public static final Schema[] LIST_OFFSET_RESPONSE = new Schema[] {LIST_OFFSET_RESPONSE_V0, LIST_OFFSET_RESPONSE_V1}; /* Fetch api */ public static final Schema FETCH_REQUEST_PARTITION_V0 = new Schema(new Field("partition", http://git-wip-us.apache.org/repos/asf/kafka/blob/eaaa433f/clients/src/main/java/org/apache/kafka/common/record/OffsetAndTimestamp.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/OffsetAndTimestamp.java b/clients/src/main/java/org/apache/kafka/common/record/OffsetAndTimestamp.java new file mode 100644 index 0000000..562585b --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/record/OffsetAndTimestamp.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.record; + +import java.util.Objects; + +/** + * A container class for offset and timestamp. 
+ */ +public final class OffsetAndTimestamp { + private final long timestamp; + private final long offset; + + public OffsetAndTimestamp(long offset, long timestamp) { + this.offset = offset; + this.timestamp = timestamp; + } + + public long timestamp() { + return timestamp; + } + + public long offset() { + return offset; + } + + @Override + public String toString() { + return "{Timestamp = " + timestamp + ", Offset = " + offset + "}"; + } + + @Override + public int hashCode() { + return Objects.hash(timestamp, offset); + } + + @Override + public boolean equals(Object o) { + if (o == null || !(o instanceof OffsetAndTimestamp)) + return false; + OffsetAndTimestamp other = (OffsetAndTimestamp) o; + return this.timestamp == other.timestamp() && this.offset == other.offset(); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/eaaa433f/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java index a3777e2..7e586a4 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java @@ -26,9 +26,12 @@ import org.apache.kafka.common.utils.CollectionUtils; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; public class ListOffsetRequest extends AbstractRequest { @@ -53,7 +56,13 @@ public class ListOffsetRequest extends AbstractRequest { private final int replicaId; private final Map<TopicPartition, PartitionData> offsetData; + private final Map<TopicPartition, Long> partitionTimestamps; + private final Set<TopicPartition> duplicatePartitions; + /** + * This class is only used by ListOffsetRequest v0 which has been deprecated. + */ + @Deprecated public static final class PartitionData { public final long timestamp; public final int maxNumOffsets; @@ -64,40 +73,76 @@ public class ListOffsetRequest extends AbstractRequest { } } + /** + * Constructor for ListOffsetRequest v0 + */ + @Deprecated public ListOffsetRequest(Map<TopicPartition, PartitionData> offsetData) { - this(CONSUMER_REPLICA_ID, offsetData); + this(CONSUMER_REPLICA_ID, offsetData, 0); } + /** + * Constructor for ListOffsetRequest v0 + */ + @Deprecated public ListOffsetRequest(int replicaId, Map<TopicPartition, PartitionData> offsetData) { - super(new Struct(CURRENT_SCHEMA)); - Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(offsetData); + this(replicaId, offsetData, 0); + } + + /** + * Constructor for ListOffsetRequest v1. + */ + public ListOffsetRequest(Map<TopicPartition, ?> targetTimes, int replicaId) { + this(replicaId, targetTimes, 1); + } + + /** + * Private constructor with a specified version. 
+ */ + @SuppressWarnings("unchecked") + private ListOffsetRequest(int replicaId, Map<TopicPartition, ?> targetTimes, int version) { + super(new Struct(ProtoUtils.requestSchema(ApiKeys.LIST_OFFSETS.id, version))); + Map<String, Map<Integer, Object>> topicsData = + CollectionUtils.groupDataByTopic((Map<TopicPartition, Object>) targetTimes); struct.set(REPLICA_ID_KEY_NAME, replicaId); List<Struct> topicArray = new ArrayList<Struct>(); - for (Map.Entry<String, Map<Integer, PartitionData>> topicEntry: topicsData.entrySet()) { + for (Map.Entry<String, Map<Integer, Object>> topicEntry: topicsData.entrySet()) { Struct topicData = struct.instance(TOPICS_KEY_NAME); topicData.set(TOPIC_KEY_NAME, topicEntry.getKey()); List<Struct> partitionArray = new ArrayList<Struct>(); - for (Map.Entry<Integer, PartitionData> partitionEntry : topicEntry.getValue().entrySet()) { - PartitionData offsetPartitionData = partitionEntry.getValue(); - Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME); - partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey()); - partitionData.set(TIMESTAMP_KEY_NAME, offsetPartitionData.timestamp); - partitionData.set(MAX_NUM_OFFSETS_KEY_NAME, offsetPartitionData.maxNumOffsets); - partitionArray.add(partitionData); + for (Map.Entry<Integer, Object> partitionEntry : topicEntry.getValue().entrySet()) { + if (version == 0) { + PartitionData offsetPartitionData = (PartitionData) partitionEntry.getValue(); + Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME); + partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey()); + partitionData.set(TIMESTAMP_KEY_NAME, offsetPartitionData.timestamp); + partitionData.set(MAX_NUM_OFFSETS_KEY_NAME, offsetPartitionData.maxNumOffsets); + partitionArray.add(partitionData); + } else { + Long timestamp = (Long) partitionEntry.getValue(); + Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME); + partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey()); + partitionData.set(TIMESTAMP_KEY_NAME, timestamp); + partitionArray.add(partitionData); + } } topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray()); topicArray.add(topicData); } struct.set(TOPICS_KEY_NAME, topicArray.toArray()); this.replicaId = replicaId; - this.offsetData = offsetData; + this.offsetData = version == 0 ? (Map<TopicPartition, PartitionData>) targetTimes : null; + this.partitionTimestamps = version == 1 ? 
(Map<TopicPartition, Long>) targetTimes : null; + this.duplicatePartitions = Collections.emptySet(); } public ListOffsetRequest(Struct struct) { super(struct); + Set<TopicPartition> duplicatePartitions = new HashSet<>(); replicaId = struct.getInt(REPLICA_ID_KEY_NAME); - offsetData = new HashMap<TopicPartition, PartitionData>(); + offsetData = new HashMap<>(); + partitionTimestamps = new HashMap<>(); for (Object topicResponseObj : struct.getArray(TOPICS_KEY_NAME)) { Struct topicResponse = (Struct) topicResponseObj; String topic = topicResponse.getString(TOPIC_KEY_NAME); @@ -105,25 +150,40 @@ public class ListOffsetRequest extends AbstractRequest { Struct partitionResponse = (Struct) partitionResponseObj; int partition = partitionResponse.getInt(PARTITION_KEY_NAME); long timestamp = partitionResponse.getLong(TIMESTAMP_KEY_NAME); - int maxNumOffsets = partitionResponse.getInt(MAX_NUM_OFFSETS_KEY_NAME); - PartitionData partitionData = new PartitionData(timestamp, maxNumOffsets); - offsetData.put(new TopicPartition(topic, partition), partitionData); + TopicPartition tp = new TopicPartition(topic, partition); + if (partitionResponse.hasField(MAX_NUM_OFFSETS_KEY_NAME)) { + int maxNumOffsets = partitionResponse.getInt(MAX_NUM_OFFSETS_KEY_NAME); + PartitionData partitionData = new PartitionData(timestamp, maxNumOffsets); + offsetData.put(tp, partitionData); + } else { + if (partitionTimestamps.put(tp, timestamp) != null) + duplicatePartitions.add(tp); + } } } + this.duplicatePartitions = duplicatePartitions; } @Override public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) { Map<TopicPartition, ListOffsetResponse.PartitionData> responseData = new HashMap<TopicPartition, ListOffsetResponse.PartitionData>(); - for (Map.Entry<TopicPartition, PartitionData> entry: offsetData.entrySet()) { - ListOffsetResponse.PartitionData partitionResponse = new ListOffsetResponse.PartitionData(Errors.forException(e).code(), new ArrayList<Long>()); - responseData.put(entry.getKey(), partitionResponse); + if (versionId == 0) { + for (Map.Entry<TopicPartition, PartitionData> entry : offsetData.entrySet()) { + ListOffsetResponse.PartitionData partitionResponse = new ListOffsetResponse.PartitionData(Errors.forException(e).code(), new ArrayList<Long>()); + responseData.put(entry.getKey(), partitionResponse); + } + } else { + for (Map.Entry<TopicPartition, Long> entry : partitionTimestamps.entrySet()) { + ListOffsetResponse.PartitionData partitionResponse = new ListOffsetResponse.PartitionData(Errors.forException(e).code(), -1L, -1L); + responseData.put(entry.getKey(), partitionResponse); + } } switch (versionId) { case 0: - return new ListOffsetResponse(responseData); + case 1: + return new ListOffsetResponse(responseData, versionId); default: throw new IllegalArgumentException(String.format("Version %d is not valid. 
Valid versions for %s are 0 to %d", versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.LIST_OFFSETS.id))); @@ -134,10 +194,19 @@ public class ListOffsetRequest extends AbstractRequest { return replicaId; } + @Deprecated public Map<TopicPartition, PartitionData> offsetData() { return offsetData; } + public Map<TopicPartition, Long> partitionTimestamps() { + return partitionTimestamps; + } + + public Set<TopicPartition> duplicatePartitions() { + return duplicatePartitions; + } + public static ListOffsetRequest parse(ByteBuffer buffer, int versionId) { return new ListOffsetRequest(ProtoUtils.parseRequest(ApiKeys.LIST_OFFSETS.id, versionId, buffer)); } http://git-wip-us.apache.org/repos/asf/kafka/blob/eaaa433f/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java index 5befe14..bc8c8d6 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java @@ -30,6 +30,8 @@ import java.util.List; import java.util.Map; public class ListOffsetResponse extends AbstractRequestResponse { + public static final long UNKNOWN_TIMESTAMP = -1L; + public static final long UNKNOWN_OFFSET = -1L; private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.LIST_OFFSETS.id); private static final String RESPONSES_KEY_NAME = "responses"; @@ -47,25 +49,58 @@ public class ListOffsetResponse extends AbstractRequestResponse { * * UNKNOWN_TOPIC_OR_PARTITION (3) * NOT_LEADER_FOR_PARTITION (6) + * INVALID_REQUEST (42) * UNKNOWN (-1) */ + // This key is only used by ListOffsetResponse v0 + @Deprecated private static final String OFFSETS_KEY_NAME = "offsets"; + private static final String TIMESTAMP_KEY_NAME = "timestamp"; + private static final String OFFSET_KEY_NAME = "offset"; private final Map<TopicPartition, PartitionData> responseData; public static final class PartitionData { public final short errorCode; + // The offsets list is only used in ListOffsetResponse v0. + @Deprecated public final List<Long> offsets; + public final Long timestamp; + public final Long offset; + /** + * Constructor for ListOffsetResponse v0 + */ + @Deprecated public PartitionData(short errorCode, List<Long> offsets) { this.errorCode = errorCode; this.offsets = offsets; + this.timestamp = null; + this.offset = null; + } + + /** + * Constructor for ListOffsetResponse v1 + */ + public PartitionData(short errorCode, long timestamp, long offset) { + this.errorCode = errorCode; + this.timestamp = timestamp; + this.offset = offset; + this.offsets = null; } } + /** + * Constructor for ListOffsetResponse v0. 
+ */ + @Deprecated public ListOffsetResponse(Map<TopicPartition, PartitionData> responseData) { - super(new Struct(CURRENT_SCHEMA)); + this(responseData, 0); + } + + public ListOffsetResponse(Map<TopicPartition, PartitionData> responseData, int version) { + super(new Struct(ProtoUtils.responseSchema(ApiKeys.LIST_OFFSETS.id, version))); Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(responseData); List<Struct> topicArray = new ArrayList<Struct>(); @@ -78,7 +113,12 @@ public class ListOffsetResponse extends AbstractRequestResponse { Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME); partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey()); partitionData.set(ERROR_CODE_KEY_NAME, offsetPartitionData.errorCode); - partitionData.set(OFFSETS_KEY_NAME, offsetPartitionData.offsets.toArray()); + if (version == 0) + partitionData.set(OFFSETS_KEY_NAME, offsetPartitionData.offsets.toArray()); + else { + partitionData.set(TIMESTAMP_KEY_NAME, offsetPartitionData.timestamp); + partitionData.set(OFFSET_KEY_NAME, offsetPartitionData.offset); + } partitionArray.add(partitionData); } topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray()); @@ -98,11 +138,18 @@ public class ListOffsetResponse extends AbstractRequestResponse { Struct partitionResponse = (Struct) partitionResponseObj; int partition = partitionResponse.getInt(PARTITION_KEY_NAME); short errorCode = partitionResponse.getShort(ERROR_CODE_KEY_NAME); - Object[] offsets = partitionResponse.getArray(OFFSETS_KEY_NAME); - List<Long> offsetsList = new ArrayList<Long>(); - for (Object offset: offsets) - offsetsList.add((Long) offset); - PartitionData partitionData = new PartitionData(errorCode, offsetsList); + PartitionData partitionData; + if (partitionResponse.hasField(OFFSETS_KEY_NAME)) { + Object[] offsets = partitionResponse.getArray(OFFSETS_KEY_NAME); + List<Long> offsetsList = new ArrayList<Long>(); + for (Object offset : offsets) + offsetsList.add((Long) offset); + partitionData = new PartitionData(errorCode, offsetsList); + } else { + long timestamp = partitionResponse.getLong(TIMESTAMP_KEY_NAME); + long offset = partitionResponse.getLong(OFFSET_KEY_NAME); + partitionData = new PartitionData(errorCode, timestamp, offset); + } responseData.put(new TopicPartition(topic, partition), partitionData); } } @@ -115,4 +162,8 @@ public class ListOffsetResponse extends AbstractRequestResponse { public static ListOffsetResponse parse(ByteBuffer buffer) { return new ListOffsetResponse(CURRENT_SCHEMA.read(buffer)); } + + public static ListOffsetResponse parse(ByteBuffer buffer, int version) { + return new ListOffsetResponse(ProtoUtils.responseSchema(ApiKeys.LIST_OFFSETS.id, version).read(buffer)); + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/eaaa433f/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index 2408c11..0096e72 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -1066,9 +1066,9 @@ public class KafkaConsumerTest { Map<TopicPartition, ListOffsetResponse.PartitionData> partitionData = new HashMap<>(); for (Map.Entry<TopicPartition, Long> partitionOffset : offsets.entrySet()) { 
partitionData.put(partitionOffset.getKey(), new ListOffsetResponse.PartitionData(error, - singletonList(partitionOffset.getValue()))); + 1L, partitionOffset.getValue())); } - return new ListOffsetResponse(partitionData).toStruct(); + return new ListOffsetResponse(partitionData, 1).toStruct(); } private Struct fetchResponse(Map<TopicPartition, FetchInfo> fetches) { http://git-wip-us.apache.org/repos/asf/kafka/blob/eaaa433f/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java index 777b67f..4f8425a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java @@ -125,10 +125,12 @@ public class AbstractCoordinatorTest { synchronized (coordinator) { coordinator.notify(); } - Thread.sleep(100); - - coordinator.pollHeartbeat(mockTime.milliseconds()); - fail("Expected pollHeartbeat to raise an error"); + long startMs = System.currentTimeMillis(); + while (System.currentTimeMillis() - startMs < 1000) { + Thread.sleep(10); + coordinator.pollHeartbeat(mockTime.milliseconds()); + } + fail("Expected pollHeartbeat to raise an error in 1 second"); } catch (RuntimeException exception) { assertEquals(exception, e); } http://git-wip-us.apache.org/repos/asf/kafka/blob/eaaa433f/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index 88a0526..d14488c 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -41,6 +41,7 @@ import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.Compressor; import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.OffsetAndTimestamp; import org.apache.kafka.common.record.Record; import org.apache.kafka.common.requests.FetchRequest; import org.apache.kafka.common.requests.FetchResponse; @@ -491,7 +492,7 @@ public class FetcherTest { // with no commit position, we should reset using the default strategy defined above (EARLIEST) client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.EARLIEST_TIMESTAMP), - listOffsetResponse(Errors.NONE, Arrays.asList(5L))); + listOffsetResponse(Errors.NONE, 1L, 5L)); fetcher.updateFetchPositions(singleton(tp)); assertFalse(subscriptions.isOffsetResetNeeded(tp)); assertTrue(subscriptions.isFetchable(tp)); @@ -504,7 +505,7 @@ public class FetcherTest { subscriptions.needOffsetReset(tp, OffsetResetStrategy.LATEST); client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP), - listOffsetResponse(Errors.NONE, Arrays.asList(5L))); + listOffsetResponse(Errors.NONE, 1L, 5L)); fetcher.updateFetchPositions(singleton(tp)); assertFalse(subscriptions.isOffsetResetNeeded(tp)); assertTrue(subscriptions.isFetchable(tp)); @@ -517,7 +518,7 @@ 
public class FetcherTest { subscriptions.needOffsetReset(tp, OffsetResetStrategy.EARLIEST); client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.EARLIEST_TIMESTAMP), - listOffsetResponse(Errors.NONE, Arrays.asList(5L))); + listOffsetResponse(Errors.NONE, 1L, 5L)); fetcher.updateFetchPositions(singleton(tp)); assertFalse(subscriptions.isOffsetResetNeeded(tp)); assertTrue(subscriptions.isFetchable(tp)); @@ -531,11 +532,11 @@ public class FetcherTest { // First request gets a disconnect client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP), - listOffsetResponse(Errors.NONE, Arrays.asList(5L)), true); + listOffsetResponse(Errors.NONE, 1L, 5L), true); // Next one succeeds client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP), - listOffsetResponse(Errors.NONE, Arrays.asList(5L))); + listOffsetResponse(Errors.NONE, 1L, 5L)); fetcher.updateFetchPositions(singleton(tp)); assertFalse(subscriptions.isOffsetResetNeeded(tp)); assertTrue(subscriptions.isFetchable(tp)); @@ -635,23 +636,33 @@ public class FetcherTest { assertEquals(300, maxMetric.value(), EPSILON); } + @Test + public void testGetOffsetsByTimes() { + client.prepareResponseFrom(listOffsetResponse(Errors.NONE, 100L, 100L), cluster.leaderFor(tp)); + + Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestampMap = + fetcher.getOffsetsByTimes(Collections.singletonMap(tp, 0L)); + assertEquals(offsetAndTimestampMap.get(tp).timestamp(), 100L); + assertEquals(offsetAndTimestampMap.get(tp).offset(), 100L); + + } + private MockClient.RequestMatcher listOffsetRequestMatcher(final long timestamp) { // matches any list offset request with the provided timestamp return new MockClient.RequestMatcher() { @Override public boolean matches(ClientRequest request) { ListOffsetRequest req = new ListOffsetRequest(request.request().body()); - ListOffsetRequest.PartitionData partitionData = req.offsetData().get(tp); - return partitionData != null && partitionData.timestamp == timestamp; + return timestamp == req.partitionTimestamps().get(tp); } }; } - private Struct listOffsetResponse(Errors error, List<Long> offsets) { - ListOffsetResponse.PartitionData partitionData = new ListOffsetResponse.PartitionData(error.code(), offsets); + private Struct listOffsetResponse(Errors error, long timestamp, long offset) { + ListOffsetResponse.PartitionData partitionData = new ListOffsetResponse.PartitionData(error.code(), timestamp, offset); Map<TopicPartition, ListOffsetResponse.PartitionData> allPartitionData = new HashMap<>(); allPartitionData.put(tp, partitionData); - ListOffsetResponse response = new ListOffsetResponse(allPartitionData); + ListOffsetResponse response = new ListOffsetResponse(allPartitionData, 1); return response.toStruct(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/eaaa433f/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index b3baa63..574f52d 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -69,9 +69,9 @@ public class RequestResponseTest { createDescribeGroupRequest(), createDescribeGroupRequest().getErrorResponse(0, new UnknownServerException()), 
createDescribeGroupResponse(), - createListOffsetRequest(), - createListOffsetRequest().getErrorResponse(0, new UnknownServerException()), - createListOffsetResponse(), + createListOffsetRequest(1), + createListOffsetRequest(1).getErrorResponse(1, new UnknownServerException()), + createListOffsetResponse(1), MetadataRequest.allTopics(), createMetadataRequest(Arrays.asList("topic1")), createMetadataRequest(Arrays.asList("topic1")).getErrorResponse(2, new UnknownServerException()), @@ -129,6 +129,9 @@ public class RequestResponseTest { checkSerialization(createUpdateMetadataRequest(1, null), 1); checkSerialization(createUpdateMetadataRequest(1, "rack1"), 1); checkSerialization(createUpdateMetadataRequest(1, null).getErrorResponse(1, new UnknownServerException()), 1); + checkSerialization(createListOffsetRequest(0), 0); + checkSerialization(createListOffsetRequest(0).getErrorResponse(0, new UnknownServerException()), 0); + checkSerialization(createListOffsetResponse(0), 0); } private void checkOlderFetchVersions() throws Exception { @@ -151,7 +154,7 @@ public class RequestResponseTest { Method deserializer = req.getClass().getDeclaredMethod("parse", ByteBuffer.class, Integer.TYPE); deserialized = (AbstractRequestResponse) deserializer.invoke(null, buffer, version); } - assertEquals("The original and deserialized of " + req.getClass().getSimpleName() + " should be the same.", req, deserialized); + assertEquals("The original and deserialized of " + req.getClass().getSimpleName() + "(version " + version + ") should be the same.", req, deserialized); assertEquals("The original and deserialized of " + req.getClass().getSimpleName() + " should have the same hashcode.", req.hashCode(), deserialized.hashCode()); } @@ -304,16 +307,32 @@ public class RequestResponseTest { return new LeaveGroupResponse(Errors.NONE.code()); } - private AbstractRequest createListOffsetRequest() { - Map<TopicPartition, ListOffsetRequest.PartitionData> offsetData = new HashMap<>(); - offsetData.put(new TopicPartition("test", 0), new ListOffsetRequest.PartitionData(1000000L, 10)); - return new ListOffsetRequest(-1, offsetData); + private AbstractRequest createListOffsetRequest(int version) { + if (version == 0) { + Map<TopicPartition, ListOffsetRequest.PartitionData> offsetData = new HashMap<>(); + offsetData.put(new TopicPartition("test", 0), new ListOffsetRequest.PartitionData(1000000L, 10)); + return new ListOffsetRequest(offsetData); + } else if (version == 1) { + Map<TopicPartition, Long> offsetData = new HashMap<>(); + offsetData.put(new TopicPartition("test", 0), 1000000L); + return new ListOffsetRequest(offsetData, ListOffsetRequest.CONSUMER_REPLICA_ID); + } else { + throw new IllegalArgumentException("Illegal ListOffsetRequest version " + version); + } } - private AbstractRequestResponse createListOffsetResponse() { - Map<TopicPartition, ListOffsetResponse.PartitionData> responseData = new HashMap<>(); - responseData.put(new TopicPartition("test", 0), new ListOffsetResponse.PartitionData(Errors.NONE.code(), Arrays.asList(100L))); - return new ListOffsetResponse(responseData); + private AbstractRequestResponse createListOffsetResponse(int version) { + if (version == 0) { + Map<TopicPartition, ListOffsetResponse.PartitionData> responseData = new HashMap<>(); + responseData.put(new TopicPartition("test", 0), new ListOffsetResponse.PartitionData(Errors.NONE.code(), Arrays.asList(100L))); + return new ListOffsetResponse(responseData); + } else if (version == 1) { + Map<TopicPartition, ListOffsetResponse.PartitionData> 
responseData = new HashMap<>(); + responseData.put(new TopicPartition("test", 0), new ListOffsetResponse.PartitionData(Errors.NONE.code(), 10000L, 100L)); + return new ListOffsetResponse(responseData, 1); + } else { + throw new IllegalArgumentException("Illegal ListOffsetResponse version " + version); + } } private AbstractRequest createMetadataRequest(List<String> topics) { http://git-wip-us.apache.org/repos/asf/kafka/blob/eaaa433f/core/src/main/scala/kafka/api/ApiVersion.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/api/ApiVersion.scala b/core/src/main/scala/kafka/api/ApiVersion.scala index cdd418d..0d9775a 100644 --- a/core/src/main/scala/kafka/api/ApiVersion.scala +++ b/core/src/main/scala/kafka/api/ApiVersion.scala @@ -57,7 +57,10 @@ object ApiVersion { "0.10.1-IV0" -> KAFKA_0_10_1_IV0, // 0.10.1-IV1 is introduced for KIP-74(fetch response size limit). "0.10.1-IV1" -> KAFKA_0_10_1_IV1, - "0.10.1" -> KAFKA_0_10_1_IV1 + // introduced ListOffsetRequest v1 in KIP-79 + "0.10.1-IV2" -> KAFKA_0_10_1_IV2, + "0.10.1" -> KAFKA_0_10_1_IV2 + ) private val versionPattern = "\\.".r @@ -129,3 +132,9 @@ case object KAFKA_0_10_1_IV1 extends ApiVersion { val messageFormatVersion: Byte = Message.MagicValue_V1 val id: Int = 7 } + +case object KAFKA_0_10_1_IV2 extends ApiVersion { val version: String = "0.10.1-IV2" val messageFormatVersion: Byte = Message.MagicValue_V1 val id: Int = 8 +} http://git-wip-us.apache.org/repos/asf/kafka/blob/eaaa433f/core/src/main/scala/kafka/log/FileMessageSet.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala index c76653a..55dbfee 100755 --- a/core/src/main/scala/kafka/log/FileMessageSet.scala +++ b/core/src/main/scala/kafka/log/FileMessageSet.scala @@ -159,49 +159,34 @@ class FileMessageSet private[kafka](@volatile var file: File, /** * Search forward for the message whose timestamp is greater than or equals to the target timestamp. * - * The search will stop immediately when it sees a message in format version before 0.10.0. This is to avoid - * scanning the entire log when all the messages are still in old format. - * * @param targetTimestamp The timestamp to search for. * @param startingPosition The starting position to search. - * @return None, if no message exists at or after the starting position. - * Some(the_next_offset_to_read) otherwise. + * @return The timestamp and offset of the message found. None, if no message is found. */ - def searchForTimestamp(targetTimestamp: Long, startingPosition: Int): Option[Long] = { - var maxTimestampChecked = Message.NoTimestamp + def searchForTimestamp(targetTimestamp: Long, startingPosition: Int): Option[TimestampOffset] = { var lastOffsetChecked = -1L val messagesToSearch = read(startingPosition, sizeInBytes) for (messageAndOffset <- messagesToSearch) { val message = messageAndOffset.message lastOffsetChecked = messageAndOffset.offset - // Stop searching once we see message format before 0.10.0. - // This equivalent as treating message without timestamp has the largest timestamp. - // We do this to avoid scanning the entire log if no message has a timestamp. 
- if (message.magic == Message.MagicValue_V0) - return Some(messageAndOffset.offset) - else if (message.timestamp >= targetTimestamp) { + if (message.timestamp >= targetTimestamp) { // We found a message message.compressionCodec match { case NoCompressionCodec => - return Some(messageAndOffset.offset) + return Some(TimestampOffset(messageAndOffset.message.timestamp, messageAndOffset.offset)) case _ => // Iterate over the inner messages to get the exact offset. - for (innerMessage <- ByteBufferMessageSet.deepIterator(messageAndOffset)) { - val timestamp = innerMessage.message.timestamp + for (innerMessageAndOffset <- ByteBufferMessageSet.deepIterator(messageAndOffset)) { + val timestamp = innerMessageAndOffset.message.timestamp if (timestamp >= targetTimestamp) - return Some(innerMessage.offset) + return Some(TimestampOffset(innerMessageAndOffset.message.timestamp, innerMessageAndOffset.offset)) } throw new IllegalStateException(s"The message set (max timestamp = ${message.timestamp}, max offset = ${messageAndOffset.offset}" + s" should contain target timestamp $targetTimestamp but it does not.") } - } else - maxTimestampChecked = math.max(maxTimestampChecked, message.timestamp) + } } - - if (lastOffsetChecked >= 0) - Some(lastOffsetChecked + 1) - else - None + None } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/eaaa433f/core/src/main/scala/kafka/log/Log.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index cfd0472..6043b01 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -17,7 +17,7 @@ package kafka.log -import kafka.api.OffsetRequest +import kafka.api.KAFKA_0_10_0_IV0 import kafka.utils._ import kafka.message._ import kafka.common._ @@ -28,8 +28,9 @@ import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap} import java.util.concurrent.atomic._ import java.text.NumberFormat -import org.apache.kafka.common.errors.{CorruptRecordException, OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException} +import org.apache.kafka.common.errors.{InvalidRequestException, CorruptRecordException, OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException} import org.apache.kafka.common.record.TimestampType +import org.apache.kafka.common.requests.ListOffsetRequest import scala.collection.{Seq, JavaConversions} import com.yammer.metrics.core.Gauge @@ -583,59 +584,41 @@ class Log(val dir: File, * `NOTE:` OffsetRequest V0 does not use this method, the behavior of OffsetRequest V0 remains the same as before * , i.e. it only gives back the timestamp based on the last modification time of the log segments. * - * @param timestamp The given timestamp for offset fetching. + * @param targetTimestamp The given timestamp for offset fetching. * @return The offset of the first message whose timestamp is greater than or equals to the given timestamp. + * None if no such message is found. */ - def fetchOffsetsByTimestamp(timestamp: Long): Long = { - debug(s"Searching offset for timestamp $timestamp") - val segsArray = logSegments.toArray - if (timestamp == OffsetRequest.EarliestTime) - return segsArray(0).baseOffset + def fetchOffsetsByTimestamp(targetTimestamp: Long): Option[TimestampOffset] = { + debug(s"Searching offset for timestamp $targetTimestamp") - // set the target timestamp to be Long.MaxValue if we need to find from the latest. 
http://git-wip-us.apache.org/repos/asf/kafka/blob/eaaa433f/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index cfd0472..6043b01 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -17,7 +17,7 @@
 package kafka.log
 
-import kafka.api.OffsetRequest
+import kafka.api.KAFKA_0_10_0_IV0
 import kafka.utils._
 import kafka.message._
 import kafka.common._
@@ -28,8 +28,9 @@ import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap}
 import java.util.concurrent.atomic._
 import java.text.NumberFormat
 
-import org.apache.kafka.common.errors.{CorruptRecordException, OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException}
+import org.apache.kafka.common.errors.{InvalidRequestException, CorruptRecordException, OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException}
 import org.apache.kafka.common.record.TimestampType
+import org.apache.kafka.common.requests.ListOffsetRequest
 
 import scala.collection.{Seq, JavaConversions}
 import com.yammer.metrics.core.Gauge
@@ -583,59 +584,41 @@ class Log(val dir: File,
    * `NOTE:` OffsetRequest V0 does not use this method; the behavior of OffsetRequest V0 remains the same as before,
    * i.e. it only gives back the timestamp based on the last modification time of the log segments.
    *
-   * @param timestamp The given timestamp for offset fetching.
+   * @param targetTimestamp The given timestamp for offset fetching.
    * @return The offset of the first message whose timestamp is greater than or equal to the given timestamp.
+   *         None if no such message is found.
    */
-  def fetchOffsetsByTimestamp(timestamp: Long): Long = {
-    debug(s"Searching offset for timestamp $timestamp")
-    val segsArray = logSegments.toArray
-    if (timestamp == OffsetRequest.EarliestTime)
-      return segsArray(0).baseOffset
+  def fetchOffsetsByTimestamp(targetTimestamp: Long): Option[TimestampOffset] = {
+    debug(s"Searching offset for timestamp $targetTimestamp")
 
-    // set the target timestamp to be Long.MaxValue if we need to find from the latest.
-    val targetTimestamp = timestamp match {
-      case OffsetRequest.LatestTime => Long.MaxValue
-      case _ => timestamp
-    }
+    if (config.messageFormatVersion < KAFKA_0_10_0_IV0 &&
+        targetTimestamp != ListOffsetRequest.EARLIEST_TIMESTAMP &&
+        targetTimestamp != ListOffsetRequest.LATEST_TIMESTAMP)
+      throw new InvalidRequestException(s"Cannot search offsets based on timestamp because message format version " +
+        s"for partition $topicAndPartition is ${config.messageFormatVersion} which is earlier than the minimum " +
+        s"required version $KAFKA_0_10_0_IV0")
 
-    var foundOffset: Long = -1L
-    // We have this while loop here to make sure we are returning the valid offsets to our best knowledge.
-    // This while loop is to handle the case where the log is truncated during the timestamp search and we did not
-    // find any message. In this case, we need to retry the search.
-    do {
-      val targetSeg = {
-        // Get all the segments whose largest timestamp is smaller than target timestamp
-        val earlierSegs = segsArray.takeWhile(_.largestTimestamp < targetTimestamp)
-        // We need to search the first segment whose largest timestamp is greater than the target timestamp if there is one.
-        if (earlierSegs.length < segsArray.length)
-          segsArray(earlierSegs.length)
-        else
-          earlierSegs.last
-      }
+    // For the earliest and latest, we do not need to return the timestamp.
+    val segsArray = logSegments.toArray
+    if (targetTimestamp == ListOffsetRequest.EARLIEST_TIMESTAMP)
+      return Some(TimestampOffset(Message.NoTimestamp, segsArray(0).baseOffset))
+    else if (targetTimestamp == ListOffsetRequest.LATEST_TIMESTAMP)
+      return Some(TimestampOffset(Message.NoTimestamp, logEndOffset))
+
+    val targetSeg = {
+      // Get all the segments whose largest timestamp is smaller than target timestamp
+      val earlierSegs = segsArray.takeWhile(_.largestTimestamp < targetTimestamp)
+      // We need to search the first segment whose largest timestamp is greater than the target timestamp if there is one.
+      if (earlierSegs.length < segsArray.length)
+        Some(segsArray(earlierSegs.length))
+      else
+        None
+    }
 
-      // First cache the current log end offset
-      val leo = logEndOffset
-      foundOffset = {
-        // Use the cached log end offsets if
-        // 1. user is asking for latest messages, or,
-        // 2. we are searching on the active segment and the target timestamp is greater than the largestTimestamp
-        //    after we cached the log end offset. (We have to use the cached log end offsets because it is possible that
-        //    some messages with a larger timestamp are appended after we check the largest timestamp. Using log end offset
-        //    after the timestamp check might skip those messages.)
-        if (targetTimestamp == Long.MaxValue
-            || (targetTimestamp > targetSeg.largestTimestamp && targetSeg == activeSegment))
-          leo
-        else
-          // The findOffsetByTimestamp() method may return None when the log is truncated during the timestamp search.
-          // In that case we simply set the foundOffset to -1 so that we will search the timestamp again in the
-          // while loop.
-          targetSeg.findOffsetByTimestamp(targetTimestamp) match {
-            case Some(offset) => offset
-            case None => -1L
-          }
-      }
-    } while (foundOffset < 0)
-    foundOffset
+    targetSeg match {
+      case Some(segment) => segment.findOffsetByTimestamp(targetTimestamp)
+      case None => None
+    }
   }
 
   /**
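The control flow of the new fetchOffsetsByTimestamp is easy to restate: earliest and latest are answered from segment metadata alone, and a real timestamp search only ever touches the first segment whose largest timestamp reaches the target. A toy model of that segment selection (Segment is an illustrative stand-in for kafka.log.LogSegment):

object SegmentSelectionSketch {
  case class Segment(baseOffset: Long, largestTimestamp: Long)

  def selectTargetSegment(segments: Vector[Segment], targetTimestamp: Long): Option[Segment] = {
    // Skip every segment whose largest timestamp is below the target...
    val earlierSegs = segments.takeWhile(_.largestTimestamp < targetTimestamp)
    // ...and search the first remaining segment, if any. Unlike the old code,
    // which fell back to the last segment and retried in a loop, a miss is now None.
    if (earlierSegs.length < segments.length) Some(segments(earlierSegs.length)) else None
  }
}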
http://git-wip-us.apache.org/repos/asf/kafka/blob/eaaa433f/core/src/main/scala/kafka/log/LogSegment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 3f76a84..1c39acf 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -387,21 +387,15 @@ class LogSegment(val log: FileMessageSet,
    * the truncated log and maybe retry or even do the search on another log segment.
    *
    * @param timestamp The timestamp to search for.
-   * @return an offset which points to the first message whose timestamp is larger than or equal to the
-   *         target timestamp.
-   *         None may be returned when the log is truncated.
+   * @return the timestamp and offset of the first message whose timestamp is larger than or equal to the
+   *         target timestamp. None will be returned if there is no such message.
    */
-  def findOffsetByTimestamp(timestamp: Long): Option[Long] = {
-    if (log.end == log.start) {
-      // The log segment is empty, just return base offset with no timestamp.
-      Some(baseOffset)
-    } else {
-      // Get the index entry with a timestamp less than or equal to the target timestamp
-      val timestampOffset = timeIndex.lookup(timestamp)
-      val position = index.lookup(timestampOffset.offset).position
-      // Search the timestamp
-      log.searchForTimestamp(timestamp, position)
-    }
+  def findOffsetByTimestamp(timestamp: Long): Option[TimestampOffset] = {
+    // Get the index entry with a timestamp less than or equal to the target timestamp
+    val timestampOffset = timeIndex.lookup(timestamp)
+    val position = index.lookup(timestampOffset.offset).position
+    // Search the timestamp
+    log.searchForTimestamp(timestamp, position)
   }
 
   /**
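findOffsetByTimestamp is now a two-phase lookup: the time index gives the greatest indexed entry at or below the target, the offset index turns that entry's offset into a file position, and FileMessageSet.searchForTimestamp finishes with a forward scan from there. A simplified sketch of that flow, assuming a toy sorted index rather than Kafka's TimeIndex/OffsetIndex API:

object FindOffsetByTimestampSketch {
  case class TimestampOffset(timestamp: Long, offset: Long) // as in the earlier sketch
  case class IndexEntry(timestamp: Long, position: Int)     // toy merge of the time and offset index

  def findOffsetByTimestamp(timeIndex: Seq[IndexEntry],
                            scanFrom: Int => Option[TimestampOffset], // stands in for log.searchForTimestamp
                            timestamp: Long): Option[TimestampOffset] = {
    // Greatest indexed entry at or below the target (the index is sorted by timestamp).
    val entry = timeIndex.takeWhile(_.timestamp <= timestamp).lastOption
    // Scan forward from that position, or from the start of the segment if none exists.
    scanFrom(entry.map(_.position).getOrElse(0))
  }
}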
http://git-wip-us.apache.org/repos/asf/kafka/blob/eaaa433f/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 51c9eab..15e5b62 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -35,11 +35,11 @@ import kafka.network._
 import kafka.network.RequestChannel.{Response, Session}
 import kafka.security.auth
 import kafka.security.auth.{Authorizer, ClusterAction, Create, Describe, Group, Operation, Read, Resource, Write, Delete}
-import kafka.server.QuotaType._
 import kafka.utils.{Logging, SystemTime, ZKGroupTopicDirs, ZkUtils}
-import org.apache.kafka.common.errors.{ClusterAuthorizationException, NotLeaderForPartitionException, UnknownTopicOrPartitionException, TopicExistsException}
+import org.apache.kafka.common.errors.{InvalidRequestException, ClusterAuthorizationException, NotLeaderForPartitionException, UnknownTopicOrPartitionException, TopicExistsException}
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.{ApiKeys, Errors, Protocol, SecurityProtocol}
+import org.apache.kafka.common.requests.ListOffsetResponse.PartitionData
 import org.apache.kafka.common.requests.{ApiVersionsResponse, DescribeGroupsRequest, DescribeGroupsResponse, GroupCoordinatorRequest, GroupCoordinatorResponse, HeartbeatRequest, HeartbeatResponse, JoinGroupRequest, JoinGroupResponse, LeaderAndIsrRequest, LeaderAndIsrResponse, LeaveGroupRequest, LeaveGroupResponse, ListGroupsResponse, ListOffsetRequest, ListOffsetResponse, MetadataRequest, MetadataResponse, OffsetCommitRequest, OffsetCommitResponse, OffsetFetchRequest, OffsetFetchResponse, ProduceRequest, ProduceResponse, ResponseHeader, ResponseSend, StopReplicaRequest, StopReplicaResponse, SyncGroupRequest, SyncGroupResponse, UpdateMetadataRequest, UpdateMetadataResponse, CreateTopicsRequest, CreateTopicsResponse, DeleteTopicsRequest, DeleteTopicsResponse}
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.Utils
@@ -531,6 +531,22 @@ class KafkaApis(val requestChannel: RequestChannel,
    */
   def handleOffsetRequest(request: RequestChannel.Request) {
     val correlationId = request.header.correlationId
+    val version = request.header.apiVersion()
+
+    val mergedResponseMap =
+      if (version == 0)
+        handleOffsetRequestV0(request)
+      else
+        handleOffsetRequestV1(request)
+
+    val responseHeader = new ResponseHeader(correlationId)
+    val response = new ListOffsetResponse(mergedResponseMap.asJava, version)
+
+    requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, response)))
+  }
+
+  private def handleOffsetRequestV0(request: RequestChannel.Request): Map[TopicPartition, ListOffsetResponse.PartitionData] = {
+    val correlationId = request.header.correlationId
     val clientId = request.header.clientId
     val offsetRequest = request.body.asInstanceOf[ListOffsetRequest]
@@ -539,11 +555,10 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
 
     val unauthorizedResponseStatus = unauthorizedRequestInfo.mapValues(_ =>
-      new ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED.code, List[JLong]().asJava)
+      new PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED.code, List[JLong]().asJava)
     )
 
-    val responseMap = authorizedRequestInfo.map(elem => {
-      val (topicPartition, partitionData) = elem
+    val responseMap = authorizedRequestInfo.map({ case (topicPartition, partitionData) =>
       try {
         // ensure leader exists
         val localReplica = if (offsetRequest.replicaId != ListOffsetRequest.DEBUGGING_REPLICA_ID)
@@ -552,9 +567,9 @@ class KafkaApis(val requestChannel: RequestChannel,
           replicaManager.getReplicaOrException(topicPartition.topic, topicPartition.partition)
         val offsets = {
           val allOffsets = fetchOffsets(replicaManager.logManager,
-            topicPartition,
-            partitionData.timestamp,
-            partitionData.maxNumOffsets)
+                                        topicPartition,
+                                        partitionData.timestamp,
+                                        partitionData.maxNumOffsets)
           if (offsetRequest.replicaId != ListOffsetRequest.CONSUMER_REPLICA_ID) {
             allOffsets
           } else {
@@ -569,26 +584,91 @@ class KafkaApis(val requestChannel: RequestChannel,
       } catch {
         // NOTE: UnknownTopicOrPartitionException and NotLeaderForPartitionException are special cased since these error messages
         // are typically transient and there is no value in logging the entire stack trace for the same
-        case utpe: UnknownTopicOrPartitionException =>
+        case e @ (_: UnknownTopicOrPartitionException | _: NotLeaderForPartitionException) =>
           debug("Offset request with correlation id %d from client %s on partition %s failed due to %s".format(
-            correlationId, clientId, topicPartition, utpe.getMessage))
-          (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(utpe).code, List[JLong]().asJava))
-        case nle: NotLeaderForPartitionException =>
-          debug("Offset request with correlation id %d from client %s on partition %s failed due to %s".format(
-            correlationId, clientId, topicPartition, nle.getMessage))
-          (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(nle).code, List[JLong]().asJava))
+            correlationId, clientId, topicPartition, e.getMessage))
+          (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e).code, List[JLong]().asJava))
         case e: Throwable =>
           error("Error while responding to offset request", e)
           (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e).code, List[JLong]().asJava))
       }
     })
+    responseMap ++ unauthorizedResponseStatus
+  }
 
-    val mergedResponseMap = responseMap ++ unauthorizedResponseStatus
+  private def handleOffsetRequestV1(request: RequestChannel.Request): Map[TopicPartition, ListOffsetResponse.PartitionData] = {
+    val correlationId = request.header.correlationId
+    val clientId = request.header.clientId
+    val offsetRequest = request.body.asInstanceOf[ListOffsetRequest]
 
-    val responseHeader = new ResponseHeader(correlationId)
-    val response = new ListOffsetResponse(mergedResponseMap.asJava)
+    val (authorizedRequestInfo, unauthorizedRequestInfo) = offsetRequest.partitionTimestamps.asScala.partition {
+      case (topicPartition, _) => authorize(request.session, Describe, new Resource(auth.Topic, topicPartition.topic))
+    }
 
-    requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, response)))
+    val unauthorizedResponseStatus = unauthorizedRequestInfo.mapValues(_ => {
+      new PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED.code,
+                        ListOffsetResponse.UNKNOWN_TIMESTAMP,
+                        ListOffsetResponse.UNKNOWN_OFFSET)
+    })
+
+    val responseMap = authorizedRequestInfo.map({ case (topicPartition, timestamp) =>
+      if (offsetRequest.duplicatePartitions().contains(topicPartition)) {
+        debug(s"OffsetRequest with correlation id $correlationId from client $clientId on partition $topicPartition " +
+          s"failed because the partition is duplicated in the request.")
+        (topicPartition, new PartitionData(Errors.INVALID_REQUEST.code,
+                                           ListOffsetResponse.UNKNOWN_TIMESTAMP,
+                                           ListOffsetResponse.UNKNOWN_OFFSET))
+      } else {
+        try {
+          // ensure leader exists
+          val localReplica = if (offsetRequest.replicaId != ListOffsetRequest.DEBUGGING_REPLICA_ID)
+            replicaManager.getLeaderReplicaIfLocal(topicPartition.topic, topicPartition.partition)
+          else
+            replicaManager.getReplicaOrException(topicPartition.topic, topicPartition.partition)
+          val found = {
+            fetchOffsetForTimestamp(replicaManager.logManager, topicPartition, timestamp) match {
+              case Some(timestampOffset) =>
+                // The request is not from a consumer client
+                if (offsetRequest.replicaId != ListOffsetRequest.CONSUMER_REPLICA_ID)
+                  timestampOffset
+                // The request is from a consumer client
+                else {
+                  // the found offset is smaller than or equal to the high watermark
+                  if (timestampOffset.offset <= localReplica.highWatermark.messageOffset)
+                    timestampOffset
+                  // the consumer wants the latest offset
+                  else if (timestamp == ListOffsetRequest.LATEST_TIMESTAMP)
+                    TimestampOffset(Message.NoTimestamp, localReplica.highWatermark.messageOffset)
+                  // The found offset is higher than the high watermark and the consumer is not asking for the end offset.
+                  else
+                    TimestampOffset(ListOffsetResponse.UNKNOWN_TIMESTAMP, ListOffsetResponse.UNKNOWN_OFFSET)
+                }
+
+              case None =>
+                TimestampOffset(ListOffsetResponse.UNKNOWN_TIMESTAMP, ListOffsetResponse.UNKNOWN_OFFSET)
+            }
+          }
+          (topicPartition, new ListOffsetResponse.PartitionData(Errors.NONE.code, found.timestamp, found.offset))
+        } catch {
+          // NOTE: These exceptions are special cased since these error messages are typically transient or the client
+          // would have received a clear exception and there is no value in logging the entire stack trace for the same
+          case e @ (_: UnknownTopicOrPartitionException |
+                    _: NotLeaderForPartitionException |
+                    _: InvalidRequestException) =>
+            debug(s"Offset request with correlation id $correlationId from client $clientId on " +
+              s"partition $topicPartition failed due to ${e.getMessage}")
+            (topicPartition, new PartitionData(Errors.forException(e).code,
+                                               ListOffsetResponse.UNKNOWN_TIMESTAMP,
+                                               ListOffsetResponse.UNKNOWN_OFFSET))
+          case e: Throwable =>
+            error("Error while responding to offset request", e)
+            (topicPartition, new PartitionData(Errors.forException(e).code,
+                                               ListOffsetResponse.UNKNOWN_TIMESTAMP,
+                                               ListOffsetResponse.UNKNOWN_OFFSET))
+        }
+      }
+    })
+    responseMap ++ unauthorizedResponseStatus
   }
 
   def fetchOffsets(logManager: LogManager, topicPartition: TopicPartition, timestamp: Long, maxNumOffsets: Int): Seq[Long] = {
@@ -603,6 +683,15 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
+  private def fetchOffsetForTimestamp(logManager: LogManager, topicPartition: TopicPartition, timestamp: Long): Option[TimestampOffset] = {
+    logManager.getLog(TopicAndPartition(topicPartition.topic, topicPartition.partition)) match {
+      case Some(log) =>
+        log.fetchOffsetsByTimestamp(timestamp)
+      case _ =>
+        throw new UnknownTopicOrPartitionException(s"$topicPartition does not exist on the broker.")
+    }
+  }
+
   private[server] def fetchOffsetsBefore(log: Log, timestamp: Long, maxNumOffsets: Int): Seq[Long] = {
     val segsArray = log.logSegments.toArray
     var offsetTimeArray: Array[(Long, Long)] = null
@@ -1063,8 +1152,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         (topic, Errors.CLUSTER_AUTHORIZATION_FAILED)
       }
       sendResponseCallback(results)
-    }
-    else {
+    } else {
       val (validTopics, duplicateTopics) = createTopicsRequest.topics.asScala.partition { case (topic, _) =>
         !createTopicsRequest.duplicateTopics.contains(topic)
       }
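The heart of handleOffsetRequestV1 above is the high-watermark clamp: a consumer must never be handed an offset it cannot yet fetch, so a search result past the high watermark becomes UNKNOWN unless the consumer asked for the latest offset, in which case the high watermark itself is returned; replica fetchers skip the clamp entirely. A condensed model of just the consumer path (all names are illustrative stand-ins for the broker's types and the protocol constants):

object ListOffsetV1ClampSketch {
  case class TimestampOffset(timestamp: Long, offset: Long)

  val LatestTimestamp: Long = -1L                          // stands in for ListOffsetRequest.LATEST_TIMESTAMP
  val NoTimestamp: Long = -1L                              // stands in for Message.NoTimestamp
  val Unknown: TimestampOffset = TimestampOffset(-1L, -1L) // UNKNOWN_TIMESTAMP / UNKNOWN_OFFSET

  def resolveForConsumer(found: Option[TimestampOffset],
                         requestedTimestamp: Long,
                         highWatermark: Long): TimestampOffset =
    found match {
      // At or below the high watermark: safe to return as-is.
      case Some(to) if to.offset <= highWatermark => to
      // Past the high watermark, but the consumer asked for "latest": clamp to the high watermark.
      case Some(_) if requestedTimestamp == LatestTimestamp => TimestampOffset(NoTimestamp, highWatermark)
      // Past the high watermark on a timestamp search, or nothing found at all.
      case _ => Unknown
    }
}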