KAFKA-2063; Bound fetch response size (KIP-74)

This PR is an implementation of 
[KIP-74](https://cwiki.apache.org/confluence/display/KAFKA/KIP-74%3A+Add+Fetch+Response+Size+Limit+in+Bytes)
 which was originally motivated by 
[KAFKA-2063](https://issues.apache.org/jira/browse/KAFKA-2063).

Author: Andrey Neporada <nepor...@gmail.com>
Author: Ismael Juma <ism...@juma.me.uk>

Reviewers: Jun Rao <jun...@gmail.com>, Jiangjie Qin <becket....@gmail.com>, 
Jason Gustafson <ja...@confluent.io>

Closes #1812 from nepal/kip-74


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d04b0998
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d04b0998
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d04b0998

Branch: refs/heads/trunk
Commit: d04b0998c043a6a430921585ffd4c42572a3bf5a
Parents: 6fb25f0
Author: Andrey Neporada <nepor...@gmail.com>
Authored: Sun Sep 18 09:12:53 2016 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Sun Sep 18 09:15:16 2016 -0700

----------------------------------------------------------------------
 .../kafka/clients/consumer/ConsumerConfig.java  |  23 +-
 .../kafka/clients/consumer/KafkaConsumer.java   |   1 +
 .../clients/consumer/internals/Fetcher.java     |  42 ++--
 .../consumer/internals/SubscriptionState.java   |  82 ++++---
 .../kafka/common/internals/PartitionStates.java | 174 ++++++++++++++
 .../apache/kafka/common/protocol/Protocol.java  |  25 +-
 .../kafka/common/requests/FetchRequest.java     |  96 ++++++--
 .../kafka/common/requests/FetchResponse.java    |  91 ++++----
 .../clients/consumer/KafkaConsumerTest.java     |   2 +
 .../clients/consumer/internals/FetcherTest.java |  23 +-
 .../common/internals/PartitionStatesTest.java   | 219 +++++++++++++++++
 .../common/requests/RequestResponseTest.java    |  26 ++-
 .../main/scala/kafka/admin/ConfigCommand.scala  |  18 +-
 .../main/scala/kafka/admin/TopicCommand.scala   |  48 ----
 core/src/main/scala/kafka/api/ApiVersion.scala  |  11 +-
 .../src/main/scala/kafka/api/FetchRequest.scala |  86 +++++--
 .../main/scala/kafka/api/FetchResponse.scala    |  41 ++--
 .../scala/kafka/consumer/ConsumerConfig.scala   |   6 +-
 .../kafka/consumer/ConsumerFetcherManager.scala |  24 +-
 .../kafka/consumer/ConsumerFetcherThread.scala  |  33 +--
 .../coordinator/GroupMetadataManager.scala      |   2 +-
 .../main/scala/kafka/javaapi/FetchRequest.scala |  48 ++--
 .../main/scala/kafka/log/FileMessageSet.scala   |  12 +-
 core/src/main/scala/kafka/log/Log.scala         |   5 +-
 core/src/main/scala/kafka/log/LogSegment.scala  |  46 ++--
 .../kafka/server/AbstractFetcherManager.scala   |  13 +-
 .../kafka/server/AbstractFetcherThread.scala    | 118 ++++++----
 .../main/scala/kafka/server/DelayedFetch.scala  |  22 +-
 .../main/scala/kafka/server/FetchDataInfo.scala |   3 +-
 .../src/main/scala/kafka/server/KafkaApis.scala |  31 +--
 .../main/scala/kafka/server/KafkaConfig.scala   |  16 +-
 .../kafka/server/ReplicaFetcherThread.scala     |  93 ++++----
 .../scala/kafka/server/ReplicaManager.scala     | 199 +++++++++-------
 .../kafka/api/AuthorizerIntegrationTest.scala   |   5 +-
 .../kafka/api/BaseConsumerTest.scala            |  30 +--
 .../kafka/api/PlaintextConsumerTest.scala       | 133 +++++++++--
 .../kafka/api/ProducerFailureHandlingTest.scala |  94 ++++----
 .../api/RequestResponseSerializationTest.scala  |   8 +-
 .../kafka/integration/PrimitiveApiTest.scala    |   6 +-
 .../unit/kafka/log/FileMessageSetTest.scala     |  38 +--
 .../scala/unit/kafka/log/LogSegmentTest.scala   |   2 +-
 .../src/test/scala/unit/kafka/log/LogTest.scala |  58 +++++
 .../server/AbstractFetcherThreadTest.scala      |  26 +--
 .../unit/kafka/server/FetchRequestTest.scala    | 233 +++++++++++++++++++
 .../unit/kafka/server/KafkaConfigTest.scala     |   1 +
 .../kafka/server/ReplicaManagerQuotasTest.scala |  30 +--
 .../unit/kafka/server/ReplicaManagerTest.scala  |  26 ++-
 .../unit/kafka/server/SimpleFetchTest.scala     |  25 +-
 48 files changed, 1716 insertions(+), 678 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d04b0998/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 4ce908e..ae791b0 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -113,6 +113,17 @@ public class ConsumerConfig extends AbstractConfig {
     private static final String FETCH_MIN_BYTES_DOC = "The minimum amount of 
data the server should return for a fetch request. If insufficient data is 
available the request will wait for that much data to accumulate before 
answering the request. The default setting of 1 byte means that fetch requests 
are answered as soon as a single byte of data is available or the fetch request 
times out waiting for data to arrive. Setting this to something greater than 1 
will cause the server to wait for larger amounts of data to accumulate which 
can improve server throughput a bit at the cost of some additional latency.";
 
     /**
+     * <code>fetch.max.bytes</code>
+     */
+    public static final String FETCH_MAX_BYTES_CONFIG = "fetch.max.bytes";
+    private static final String FETCH_MAX_BYTES_DOC = "The maximum amount of 
data the server should return for a fetch request. " +
+            "This is not an absolute maximum, if the first message in the 
first non-empty partition of the fetch is larger than " +
+            "this value, the message will still be returned to ensure that the 
consumer can make progress. " +
+            "The maximum message size accepted by the broker is defined via 
<code>message.max.bytes</code> (broker config) or " +
+            "<code>max.message.bytes</code> (topic config). Note that the 
consumer performs multiple fetches in parallel.";
+    public static final int DEFAULT_FETCH_MAX_BYTES = 50 * 1024 * 1024;
+
+    /**
      * <code>fetch.max.wait.ms</code>
      */
     public static final String FETCH_MAX_WAIT_MS_CONFIG = "fetch.max.wait.ms";
@@ -125,7 +136,11 @@ public class ConsumerConfig extends AbstractConfig {
      * <code>max.partition.fetch.bytes</code>
      */
     public static final String MAX_PARTITION_FETCH_BYTES_CONFIG = 
"max.partition.fetch.bytes";
-    private static final String MAX_PARTITION_FETCH_BYTES_DOC = "The maximum 
amount of data per-partition the server will return. The maximum total memory 
used for a request will be <code>#partitions * 
max.partition.fetch.bytes</code>. This size must be at least as large as the 
maximum message size the server allows or else it is possible for the producer 
to send messages larger than the consumer can fetch. If that happens, the 
consumer can get stuck trying to fetch a large message on a certain partition.";
+    private static final String MAX_PARTITION_FETCH_BYTES_DOC = "The maximum 
amount of data per-partition the server " +
+            "will return. If the first message in the first non-empty 
partition of the fetch is larger than this limit, the " +
+            "message will still be returned to ensure that the consumer can 
make progress. The maximum message size " +
+            "accepted by the broker is defined via 
<code>message.max.bytes</code> (broker config) or " +
+            "<code>max.message.bytes</code> (topic config). See " + 
FETCH_MAX_BYTES_CONFIG + " for limiting the consumer request size";
     public static final int DEFAULT_MAX_PARTITION_FETCH_BYTES = 1 * 1024 * 
1024;
 
     /** <code>send.buffer.bytes</code> */
@@ -265,6 +280,12 @@ public class ConsumerConfig extends AbstractConfig {
                                         atLeast(0),
                                         Importance.HIGH,
                                         FETCH_MIN_BYTES_DOC)
+                                .define(FETCH_MAX_BYTES_CONFIG,
+                                        Type.INT,
+                                        DEFAULT_FETCH_MAX_BYTES,
+                                        atLeast(0),
+                                        Importance.MEDIUM,
+                                        FETCH_MAX_BYTES_DOC)
                                 .define(FETCH_MAX_WAIT_MS_CONFIG,
                                         Type.INT,
                                         500,

http://git-wip-us.apache.org/repos/asf/kafka/blob/d04b0998/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 8fa7ab7..f7f2d20 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -672,6 +672,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                     
config.getBoolean(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG));
             this.fetcher = new Fetcher<>(this.client,
                     config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG),
+                    config.getInt(ConsumerConfig.FETCH_MAX_BYTES_CONFIG),
                     config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG),
                     
config.getInt(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG),
                     config.getInt(ConsumerConfig.MAX_POLL_RECORDS_CONFIG),

http://git-wip-us.apache.org/repos/asf/kafka/blob/d04b0998/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index dd9d084..202c0ad 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -26,7 +26,6 @@ import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.InvalidMetadataException;
 import org.apache.kafka.common.errors.InvalidTopicException;
-import org.apache.kafka.common.errors.RecordTooLargeException;
 import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.errors.TimeoutException;
@@ -62,6 +61,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -79,6 +79,7 @@ public class Fetcher<K, V> {
     private final ConsumerNetworkClient client;
     private final Time time;
     private final int minBytes;
+    private final int maxBytes;
     private final int maxWaitMs;
     private final int fetchSize;
     private final long retryBackoffMs;
@@ -96,6 +97,7 @@ public class Fetcher<K, V> {
 
     public Fetcher(ConsumerNetworkClient client,
                    int minBytes,
+                   int maxBytes,
                    int maxWaitMs,
                    int fetchSize,
                    int maxPollRecords,
@@ -113,6 +115,7 @@ public class Fetcher<K, V> {
         this.metadata = metadata;
         this.subscriptions = subscriptions;
         this.minBytes = minBytes;
+        this.maxBytes = maxBytes;
         this.maxWaitMs = maxWaitMs;
         this.fetchSize = fetchSize;
         this.maxPollRecords = maxPollRecords;
@@ -152,7 +155,7 @@ public class Fetcher<K, V> {
      * an in-flight fetch or pending fetch data.
      */
     public void sendFetches() {
-        for (Map.Entry<Node, FetchRequest> fetchEntry: 
createFetchRequests().entrySet()) {
+        for (Map.Entry<Node, FetchRequest> fetchEntry : 
createFetchRequests().entrySet()) {
             final FetchRequest request = fetchEntry.getValue();
             final Node fetchTarget = fetchEntry.getKey();
 
@@ -514,8 +517,8 @@ public class Fetcher<K, V> {
         }
     }
 
-    private Set<TopicPartition> fetchablePartitions() {
-        Set<TopicPartition> fetchable = subscriptions.fetchablePartitions();
+    private List<TopicPartition> fetchablePartitions() {
+        List<TopicPartition> fetchable = subscriptions.fetchablePartitions();
         if (nextInLineRecords != null && !nextInLineRecords.isEmpty())
             fetchable.remove(nextInLineRecords.partition);
         for (CompletedFetch completedFetch : completedFetches)
@@ -530,16 +533,16 @@ public class Fetcher<K, V> {
     private Map<Node, FetchRequest> createFetchRequests() {
         // create the fetch info
         Cluster cluster = metadata.fetch();
-        Map<Node, Map<TopicPartition, FetchRequest.PartitionData>> fetchable = 
new HashMap<>();
+        Map<Node, LinkedHashMap<TopicPartition, FetchRequest.PartitionData>> 
fetchable = new LinkedHashMap<>();
         for (TopicPartition partition : fetchablePartitions()) {
             Node node = cluster.leaderFor(partition);
             if (node == null) {
                 metadata.requestUpdate();
             } else if (this.client.pendingRequestCount(node) == 0) {
                 // if there is a leader and no in-flight requests, issue a new 
fetch
-                Map<TopicPartition, FetchRequest.PartitionData> fetch = 
fetchable.get(node);
+                LinkedHashMap<TopicPartition, FetchRequest.PartitionData> 
fetch = fetchable.get(node);
                 if (fetch == null) {
-                    fetch = new HashMap<>();
+                    fetch = new LinkedHashMap<>();
                     fetchable.put(node, fetch);
                 }
 
@@ -553,9 +556,9 @@ public class Fetcher<K, V> {
 
         // create the fetches
         Map<Node, FetchRequest> requests = new HashMap<>();
-        for (Map.Entry<Node, Map<TopicPartition, FetchRequest.PartitionData>> 
entry : fetchable.entrySet()) {
+        for (Map.Entry<Node, LinkedHashMap<TopicPartition, 
FetchRequest.PartitionData>> entry : fetchable.entrySet()) {
             Node node = entry.getKey();
-            FetchRequest fetch = new FetchRequest(this.maxWaitMs, 
this.minBytes, entry.getValue());
+            FetchRequest fetch = new FetchRequest(this.maxWaitMs, 
this.minBytes, this.maxBytes, entry.getValue());
             requests.put(node, fetch);
         }
         return requests;
@@ -590,14 +593,11 @@ public class Fetcher<K, V> {
                 ByteBuffer buffer = partition.recordSet;
                 MemoryRecords records = MemoryRecords.readableRecords(buffer);
                 List<ConsumerRecord<K, V>> parsed = new ArrayList<>();
-                boolean skippedRecords = false;
                 for (LogEntry logEntry : records) {
                     // Skip the messages earlier than current position.
                     if (logEntry.offset() >= position) {
                         parsed.add(parseRecord(tp, logEntry));
                         bytes += logEntry.size();
-                    } else {
-                        skippedRecords = true;
                     }
                 }
 
@@ -609,19 +609,6 @@ public class Fetcher<K, V> {
                     parsedRecords = new PartitionRecords<>(fetchOffset, tp, 
parsed);
                     ConsumerRecord<K, V> record = parsed.get(parsed.size() - 
1);
                     
this.sensors.recordsFetchLag.record(partition.highWatermark - record.offset());
-                } else if (buffer.limit() > 0 && !skippedRecords) {
-                    // we did not read a single message from a non-empty buffer
-                    // because that message's size is larger than fetch size, 
in this case
-                    // record this exception
-                    Map<TopicPartition, Long> recordTooLargePartitions = 
Collections.singletonMap(tp, fetchOffset);
-                    throw new RecordTooLargeException("There are some messages 
at [Partition=Offset]: "
-                            + recordTooLargePartitions
-                            + " whose size is larger than the fetch size "
-                            + this.fetchSize
-                            + " and hence cannot be ever returned."
-                            + " Increase the fetch size on the client (using 
max.partition.fetch.bytes),"
-                            + " or decrease the maximum message size the 
broker will allow (using message.max.bytes).",
-                            recordTooLargePartitions);
                 }
             } else if (partition.errorCode == 
Errors.NOT_LEADER_FOR_PARTITION.code()
                     || partition.errorCode == 
Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) {
@@ -648,6 +635,11 @@ public class Fetcher<K, V> {
             completedFetch.metricAggregator.record(tp, bytes, recordsCount);
         }
 
+        // we move the partition to the end if we received some bytes or if 
there was an error. This way, it's more
+        // likely that partitions for the same topic can remain together 
(allowing for more efficient serialization).
+        if (bytes > 0 || partition.errorCode != Errors.NONE.code())
+            subscriptions.movePartitionToEnd(tp);
+
         return parsedRecords;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d04b0998/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index dd0bce9..9029417 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -16,18 +16,21 @@ import 
org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.internals.PartitionStates;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.regex.Pattern;
 
 /**
  * A class for tracking the topics, partitions, and offsets for the consumer. 
A partition
- * is "assigned" either directly with {@link #assignFromUser(Collection)} 
(manual assignment)
+ * is "assigned" either directly with {@link #assignFromUser(Set)} (manual 
assignment)
  * or with {@link #assignFromSubscribed(Collection)} (automatic assignment 
from subscription).
  *
  * Once assigned, the partition is not considered "fetchable" until its 
initial position has
@@ -68,8 +71,8 @@ public class SubscriptionState {
     /* the list of topics the group has subscribed to (set only for the leader 
on join group completion) */
     private final Set<String> groupSubscription;
 
-    /* the list of partitions currently assigned */
-    private final Map<TopicPartition, TopicPartitionState> assignment;
+    /* the partitions that are currently assigned, note that the order of 
partition matters (see FetchBuilder for more details) */
+    private final PartitionStates<TopicPartitionState> assignment;
 
     /* do we need to request the latest committed offsets from the 
coordinator? */
     private boolean needsFetchCommittedOffsets;
@@ -84,7 +87,7 @@ public class SubscriptionState {
         this.defaultResetStrategy = defaultResetStrategy;
         this.subscription = Collections.emptySet();
         this.userAssignment = Collections.emptySet();
-        this.assignment = new HashMap<>();
+        this.assignment = new PartitionStates<>();
         this.groupSubscription = new HashSet<>();
         this.needsFetchCommittedOffsets = true; // initialize to true for the 
consumers to fetch offset upon starting up
         this.subscribedPattern = null;
@@ -156,13 +159,17 @@ public class SubscriptionState {
     public void assignFromUser(Set<TopicPartition> partitions) {
         setSubscriptionType(SubscriptionType.USER_ASSIGNED);
 
-        if (!this.assignment.keySet().equals(partitions)) {
+        if (!this.assignment.partitionSet().equals(partitions)) {
             this.userAssignment = partitions;
 
-            for (TopicPartition partition : partitions)
-                if (!assignment.containsKey(partition))
-                    addAssignedPartition(partition);
-            this.assignment.keySet().retainAll(this.userAssignment);
+            Map<TopicPartition, TopicPartitionState> partitionToState = new 
HashMap<>();
+            for (TopicPartition partition : partitions) {
+                TopicPartitionState state = assignment.stateValue(partition);
+                if (state == null)
+                    state = new TopicPartitionState();
+                partitionToState.put(partition, state);
+            }
+            this.assignment.set(partitionToState);
             this.needsFetchCommittedOffsets = true;
         }
     }
@@ -179,13 +186,18 @@ public class SubscriptionState {
             if (!this.subscription.contains(tp.topic()))
                 throw new IllegalArgumentException("Assigned partition " + tp 
+ " for non-subscribed topic.");
 
-        // after rebalancing, we always reinitialize the assignment state
-        this.assignment.clear();
-        for (TopicPartition tp: assignments)
-            addAssignedPartition(tp);
+        // after rebalancing, we always reinitialize the assignment value
+        this.assignment.set(partitionToStateMap(assignments));
         this.needsFetchCommittedOffsets = true;
     }
 
+    private Map<TopicPartition, TopicPartitionState> 
partitionToStateMap(Collection<TopicPartition> assignments) {
+        Map<TopicPartition, TopicPartitionState> map = new 
HashMap<>(assignments.size());
+        for (TopicPartition tp : assignments)
+            map.put(tp, new TopicPartitionState());
+        return map;
+    }
+
     public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) 
{
         if (listener == null)
             throw new IllegalArgumentException("RebalanceListener cannot be 
null");
@@ -218,11 +230,9 @@ public class SubscriptionState {
 
     public Set<TopicPartition> pausedPartitions() {
         HashSet<TopicPartition> paused = new HashSet<>();
-        for (Map.Entry<TopicPartition, TopicPartitionState> entry : 
assignment.entrySet()) {
-            final TopicPartition tp = entry.getKey();
-            final TopicPartitionState state = entry.getValue();
-            if (state.paused) {
-                paused.add(tp);
+        for (PartitionStates.PartitionState<TopicPartitionState> state : 
assignment.partitionStates()) {
+            if (state.value().paused) {
+                paused.add(state.topicPartition());
             }
         }
         return paused;
@@ -243,7 +253,7 @@ public class SubscriptionState {
     }
 
     private TopicPartitionState assignedState(TopicPartition tp) {
-        TopicPartitionState state = this.assignment.get(tp);
+        TopicPartitionState state = this.assignment.stateValue(tp);
         if (state == null)
             throw new IllegalStateException("No current assignment for 
partition " + tp);
         return state;
@@ -274,14 +284,14 @@ public class SubscriptionState {
     }
 
     public Set<TopicPartition> assignedPartitions() {
-        return this.assignment.keySet();
+        return this.assignment.partitionSet();
     }
 
-    public Set<TopicPartition> fetchablePartitions() {
-        Set<TopicPartition> fetchable = new HashSet<>();
-        for (Map.Entry<TopicPartition, TopicPartitionState> entry : 
assignment.entrySet()) {
-            if (entry.getValue().isFetchable())
-                fetchable.add(entry.getKey());
+    public List<TopicPartition> fetchablePartitions() {
+        List<TopicPartition> fetchable = new ArrayList<>();
+        for (PartitionStates.PartitionState<TopicPartitionState> state : 
assignment.partitionStates()) {
+            if (state.value().isFetchable())
+                fetchable.add(state.topicPartition());
         }
         return fetchable;
     }
@@ -300,10 +310,9 @@ public class SubscriptionState {
 
     public Map<TopicPartition, OffsetAndMetadata> allConsumed() {
         Map<TopicPartition, OffsetAndMetadata> allConsumed = new HashMap<>();
-        for (Map.Entry<TopicPartition, TopicPartitionState> entry : 
assignment.entrySet()) {
-            TopicPartitionState state = entry.getValue();
-            if (state.hasValidPosition())
-                allConsumed.put(entry.getKey(), new 
OffsetAndMetadata(state.position));
+        for (PartitionStates.PartitionState<TopicPartitionState> state : 
assignment.partitionStates()) {
+            if (state.value().hasValidPosition())
+                allConsumed.put(state.topicPartition(), new 
OffsetAndMetadata(state.value().position));
         }
         return allConsumed;
     }
@@ -329,7 +338,7 @@ public class SubscriptionState {
     }
 
     public boolean hasAllFetchPositions() {
-        for (TopicPartitionState state : assignment.values())
+        for (TopicPartitionState state : assignment.partitionStateValues())
             if (!state.hasValidPosition())
                 return false;
         return true;
@@ -337,14 +346,15 @@ public class SubscriptionState {
 
     public Set<TopicPartition> missingFetchPositions() {
         Set<TopicPartition> missing = new HashSet<>();
-        for (Map.Entry<TopicPartition, TopicPartitionState> entry : 
assignment.entrySet())
-            if (!entry.getValue().hasValidPosition())
-                missing.add(entry.getKey());
+        for (PartitionStates.PartitionState<TopicPartitionState> state : 
assignment.partitionStates()) {
+            if (!state.value().hasValidPosition())
+                missing.add(state.topicPartition());
+        }
         return missing;
     }
 
     public boolean isAssigned(TopicPartition tp) {
-        return assignment.containsKey(tp);
+        return assignment.contains(tp);
     }
 
     public boolean isPaused(TopicPartition tp) {
@@ -363,8 +373,8 @@ public class SubscriptionState {
         assignedState(tp).resume();
     }
 
-    private void addAssignedPartition(TopicPartition tp) {
-        this.assignment.put(tp, new TopicPartitionState());
+    public void movePartitionToEnd(TopicPartition tp) {
+        assignment.moveToEnd(tp);
     }
 
     public ConsumerRebalanceListener listener() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d04b0998/clients/src/main/java/org/apache/kafka/common/internals/PartitionStates.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/internals/PartitionStates.java 
b/clients/src/main/java/org/apache/kafka/common/internals/PartitionStates.java
new file mode 100644
index 0000000..49823c0
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/common/internals/PartitionStates.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.internals;
+
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * This class is a useful building block for doing fetch requests where topic 
partitions have to be rotated via
+ * round-robin to ensure fairness and some level of determinism given the 
existence of a limit on the fetch response
+ * size. Because the serialization of fetch requests is more efficient if all 
partitions for the same topic are grouped
+ * together, we do such grouping in the method `set`.
+ *
+ * As partitions are moved to the end, the same topic may be repeated more 
than once. In the optimal case, a single
+ * topic would "wrap around" and appear twice. However, as partitions are 
fetched in different orders and partition
+ * leadership changes, we will deviate from the optimal. If this turns out to 
be an issue in practice, we can improve
+ * it by tracking the partitions per node or calling `set` every so often.
+ */
+public class PartitionStates<S> {
+
+    private final LinkedHashMap<TopicPartition, S> map = new LinkedHashMap<>();
+
+    public PartitionStates() {}
+
+    public void moveToEnd(TopicPartition topicPartition) {
+        S state = map.remove(topicPartition);
+        if (state != null)
+            map.put(topicPartition, state);
+    }
+
+    public void updateAndMoveToEnd(TopicPartition topicPartition, S state) {
+        map.remove(topicPartition);
+        map.put(topicPartition, state);
+    }
+
+    public void remove(TopicPartition topicPartition) {
+        map.remove(topicPartition);
+    }
+
+    /**
+     * Returns the partitions in random order.
+     */
+    public Set<TopicPartition> partitionSet() {
+        return new HashSet<>(map.keySet());
+    }
+
+    public void clear() {
+        map.clear();
+    }
+
+    public boolean contains(TopicPartition topicPartition) {
+        return map.containsKey(topicPartition);
+    }
+
+    /**
+     * Returns the partition states in order.
+     */
+    public List<PartitionState<S>> partitionStates() {
+        List<PartitionState<S>> result = new ArrayList<>();
+        for (Map.Entry<TopicPartition, S> entry : map.entrySet()) {
+            result.add(new PartitionState<>(entry.getKey(), entry.getValue()));
+        }
+        return result;
+    }
+
+    /**
+     * Returns the partition state values in order.
+     */
+    public List<S> partitionStateValues() {
+        return new ArrayList<>(map.values());
+    }
+
+    public S stateValue(TopicPartition topicPartition) {
+        return map.get(topicPartition);
+    }
+
+    public int size() {
+        return map.size();
+    }
+
+    /**
+     * Update the builder to have the received map as its state (i.e. the 
previous state is cleared). The builder will
+     * "batch by topic", so if we have a, b and c, each with two partitions, 
we may end up with something like the
+     * following (the order of topics and partitions within topics is 
dependent on the iteration order of the received
+     * map): a0, a1, b1, b0, c0, c1.
+     */
+    public void set(Map<TopicPartition, S> partitionToState) {
+        map.clear();
+        update(partitionToState);
+    }
+
+    private void update(Map<TopicPartition, S> partitionToState) {
+        LinkedHashMap<String, List<TopicPartition>> topicToPartitions = new 
LinkedHashMap<>();
+        for (TopicPartition tp : partitionToState.keySet()) {
+            List<TopicPartition> partitions = 
topicToPartitions.get(tp.topic());
+            if (partitions == null) {
+                partitions = new ArrayList<>();
+                topicToPartitions.put(tp.topic(), partitions);
+            }
+            partitions.add(tp);
+        }
+        for (Map.Entry<String, List<TopicPartition>> entry : 
topicToPartitions.entrySet()) {
+            for (TopicPartition tp : entry.getValue()) {
+                S state = partitionToState.get(tp);
+                map.put(tp, state);
+            }
+        }
+    }
+
+    public static class PartitionState<S> {
+        private final TopicPartition topicPartition;
+        private final S value;
+
+        public PartitionState(TopicPartition topicPartition, S state) {
+            this.topicPartition = Objects.requireNonNull(topicPartition);
+            this.value = Objects.requireNonNull(state);
+        }
+
+        public S value() {
+            return value;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o)
+                return true;
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            PartitionState<?> that = (PartitionState<?>) o;
+
+            return topicPartition.equals(that.topicPartition) && 
value.equals(that.value);
+        }
+
+        @Override
+        public int hashCode() {
+            int result = topicPartition.hashCode();
+            result = 31 * result + value.hashCode();
+            return result;
+        }
+
+        public TopicPartition topicPartition() {
+            return topicPartition;
+        }
+
+        @Override
+        public String toString() {
+            return "PartitionState(" + topicPartition + "=" + value + ')';
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d04b0998/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 
b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index bda4757..d64cf6d 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -444,6 +444,26 @@ public class Protocol {
     // Only the version number is incremented to indicate the client support 
message format V1 which uses
     // relative offset and has timestamp.
     public static final Schema FETCH_REQUEST_V2 = FETCH_REQUEST_V1;
+    // FETCH_REQUEST_V3 added a top-level max_bytes field - the total size of partition data to accumulate in the response.
+    // The partition ordering is now relevant - partitions will be processed in the order they appear in the request.
+    public static final Schema FETCH_REQUEST_V3 = new Schema(new 
Field("replica_id",
+                                                                       INT32,
+                                                                       "Broker 
id of the follower. For normal consumers, use -1."),
+                                                             new 
Field("max_wait_time",
+                                                                       INT32,
+                                                                       
"Maximum time in ms to wait for the response."),
+                                                             new 
Field("min_bytes",
+                                                                       INT32,
+                                                                       
"Minimum bytes to accumulate in the response."),
+                                                             new 
Field("max_bytes",
+                                                                       INT32,
+                                                                       
"Maximum bytes to accumulate in the response. Note that this is not an absolute 
maximum, " +
+                                                                       "if the 
first message in the first non-empty partition of the fetch is larger than this 
" +
+                                                                       "value, 
the message will still be returned to ensure that progress can be made."),
+                                                             new 
Field("topics",
+                                                                       new 
ArrayOf(FETCH_REQUEST_TOPIC_V0),
+                                                                       "Topics 
to fetch in the order provided."));
+
     public static final Schema FETCH_RESPONSE_PARTITION_V0 = new Schema(new 
Field("partition",
                                                                                
   INT32,
                                                                                
   "Topic partition id."),
@@ -470,9 +490,10 @@ public class Protocol {
     // record set only includes messages of v0 (magic byte 0). In v2, record 
set can include messages of v0 and v1
     // (magic byte 0 and 1). For details, see ByteBufferMessageSet.
     public static final Schema FETCH_RESPONSE_V2 = FETCH_RESPONSE_V1;
+    public static final Schema FETCH_RESPONSE_V3 = FETCH_RESPONSE_V2;
 
-    public static final Schema[] FETCH_REQUEST = new Schema[] 
{FETCH_REQUEST_V0, FETCH_REQUEST_V1, FETCH_REQUEST_V2};
-    public static final Schema[] FETCH_RESPONSE = new Schema[] 
{FETCH_RESPONSE_V0, FETCH_RESPONSE_V1, FETCH_RESPONSE_V2};
+    public static final Schema[] FETCH_REQUEST = new Schema[] 
{FETCH_REQUEST_V0, FETCH_REQUEST_V1, FETCH_REQUEST_V2, FETCH_REQUEST_V3};
+    public static final Schema[] FETCH_RESPONSE = new Schema[] 
{FETCH_RESPONSE_V0, FETCH_RESPONSE_V1, FETCH_RESPONSE_V2, FETCH_RESPONSE_V3};
 
     /* List groups api */
     public static final Schema LIST_GROUPS_REQUEST_V0 = new Schema();

http://git-wip-us.apache.org/repos/asf/kafka/blob/d04b0998/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java 
b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
index f8b7fe3..dcdfd9c 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
@@ -14,7 +14,7 @@ package org.apache.kafka.common.requests;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -24,7 +24,6 @@ import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
 import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.utils.CollectionUtils;
 
 public class FetchRequest extends AbstractRequest {
 
@@ -35,6 +34,9 @@ public class FetchRequest extends AbstractRequest {
     private static final String MIN_BYTES_KEY_NAME = "min_bytes";
     private static final String TOPICS_KEY_NAME = "topics";
 
+    // request and partition level name
+    private static final String MAX_BYTES_KEY_NAME = "max_bytes";
+
     // topic level field names
     private static final String TOPIC_KEY_NAME = "topic";
     private static final String PARTITIONS_KEY_NAME = "partitions";
@@ -42,12 +44,15 @@ public class FetchRequest extends AbstractRequest {
     // partition level field names
     private static final String PARTITION_KEY_NAME = "partition";
     private static final String FETCH_OFFSET_KEY_NAME = "fetch_offset";
-    private static final String MAX_BYTES_KEY_NAME = "max_bytes";
+
+    // default values for older versions where a request level limit did not 
exist
+    public static final int DEFAULT_RESPONSE_MAX_BYTES = Integer.MAX_VALUE;
 
     private final int replicaId;
     private final int maxWait;
     private final int minBytes;
-    private final Map<TopicPartition, PartitionData> fetchData;
+    private final int maxBytes;
+    private final LinkedHashMap<TopicPartition, PartitionData> fetchData;
 
     public static final class PartitionData {
         public final long offset;
@@ -59,29 +64,79 @@ public class FetchRequest extends AbstractRequest {
         }
     }
 
+    /**
+     * The partition-level data of one topic, with partitions kept in insertion order.
+     *
+     * @param <T> type of the per-partition payload
+     */
+    static final class TopicAndPartitionData<T> {
+        public final String topic;
+        public final LinkedHashMap<Integer, T> partitions;
+
+        public TopicAndPartitionData(String topic) {
+            this.topic = topic;
+            this.partitions = new LinkedHashMap<>();
+        }
+
+        /**
+         * Batches consecutive entries of {@code data} that share a topic, preserving the iteration
+         * order of {@code data}. Only adjacent entries are merged: if entries of one topic are
+         * separated by entries of another topic, that topic appears more than once in the result.
+         *
+         * @param data per-partition payloads in the order they should be emitted
+         * @return one element per run of same-topic entries, in encounter order
+         */
+        public static <T> List<TopicAndPartitionData<T>> batchByTopic(LinkedHashMap<TopicPartition, T> data) {
+            List<TopicAndPartitionData<T>> topics = new ArrayList<>();
+            // The batch currently being filled; always the last element of topics once non-null.
+            TopicAndPartitionData<T> current = null;
+            for (Map.Entry<TopicPartition, T> topicEntry : data.entrySet()) {
+                String topic = topicEntry.getKey().topic();
+                if (current == null || !current.topic.equals(topic)) {
+                    // Explicit type argument: the raw type would add an unchecked element to the list.
+                    current = new TopicAndPartitionData<T>(topic);
+                    topics.add(current);
+                }
+                current.partitions.put(topicEntry.getKey().partition(), topicEntry.getValue());
+            }
+            return topics;
+        }
+    }
+
     /**
-     * Create a non-replica fetch request
+     * Create a non-replica fetch request for versions 0, 1 or 2 (the version 
is set via the RequestHeader).
      */
+    @Deprecated
     public FetchRequest(int maxWait, int minBytes, Map<TopicPartition, 
PartitionData> fetchData) {
-        this(CONSUMER_REPLICA_ID, maxWait, minBytes, fetchData);
+        // Any of 0, 1 or 2 would do here since the schemas for these versions 
are identical
+        this(2, CONSUMER_REPLICA_ID, maxWait, minBytes, 
DEFAULT_RESPONSE_MAX_BYTES, new LinkedHashMap<>(fetchData));
     }
 
     /**
-     * Create a replica fetch request
+     * Create a non-replica fetch request for the current version.
      */
-    public FetchRequest(int replicaId, int maxWait, int minBytes, 
Map<TopicPartition, PartitionData> fetchData) {
-        super(new Struct(CURRENT_SCHEMA));
-        Map<String, Map<Integer, PartitionData>> topicsData = 
CollectionUtils.groupDataByTopic(fetchData);
+    public FetchRequest(int maxWait, int minBytes, int maxBytes, 
LinkedHashMap<TopicPartition, PartitionData> fetchData) {
+        this(ProtoUtils.latestVersion(ApiKeys.FETCH.id), CONSUMER_REPLICA_ID, 
maxWait, minBytes, maxBytes, fetchData);
+    }
+
+    /**
+     * Create a replica fetch request for versions 0, 1 or 2 (the actual 
version is determined by the RequestHeader).
+     */
+    @Deprecated
+    public static FetchRequest fromReplica(int replicaId, int maxWait, int 
minBytes,
+                                           Map<TopicPartition, PartitionData> 
fetchData) {
+        // Any of 0, 1 or 2 would do here since the schemas for these versions 
are identical
+        return new FetchRequest(2, replicaId, maxWait, minBytes, 
DEFAULT_RESPONSE_MAX_BYTES, new LinkedHashMap<>(fetchData));
+    }
+
+    /**
+     * Create a replica fetch request for the current version.
+     */
+    public static FetchRequest fromReplica(int replicaId, int maxWait, int 
minBytes, int maxBytes,
+                                           LinkedHashMap<TopicPartition, 
PartitionData> fetchData) {
+        return new FetchRequest(ProtoUtils.latestVersion(ApiKeys.FETCH.id), 
replicaId, maxWait, minBytes, maxBytes, fetchData);
+    }
+
+    private FetchRequest(int version, int replicaId, int maxWait, int 
minBytes, int maxBytes,
+                         LinkedHashMap<TopicPartition, PartitionData> 
fetchData) {
+        super(new Struct(ProtoUtils.requestSchema(ApiKeys.FETCH.id, version)));
+        List<TopicAndPartitionData<PartitionData>> topicsData = 
TopicAndPartitionData.batchByTopic(fetchData);
 
         struct.set(REPLICA_ID_KEY_NAME, replicaId);
         struct.set(MAX_WAIT_KEY_NAME, maxWait);
         struct.set(MIN_BYTES_KEY_NAME, minBytes);
+        if (version >= 3)
+            struct.set(MAX_BYTES_KEY_NAME, maxBytes);
         List<Struct> topicArray = new ArrayList<Struct>();
-        for (Map.Entry<String, Map<Integer, PartitionData>> topicEntry : 
topicsData.entrySet()) {
+        for (TopicAndPartitionData<PartitionData> topicEntry : topicsData) {
             Struct topicData = struct.instance(TOPICS_KEY_NAME);
-            topicData.set(TOPIC_KEY_NAME, topicEntry.getKey());
+            topicData.set(TOPIC_KEY_NAME, topicEntry.topic);
             List<Struct> partitionArray = new ArrayList<Struct>();
-            for (Map.Entry<Integer, PartitionData> partitionEntry : 
topicEntry.getValue().entrySet()) {
+            for (Map.Entry<Integer, PartitionData> partitionEntry : 
topicEntry.partitions.entrySet()) {
                 PartitionData fetchPartitionData = partitionEntry.getValue();
                 Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
                 partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
@@ -96,6 +151,7 @@ public class FetchRequest extends AbstractRequest {
         this.replicaId = replicaId;
         this.maxWait = maxWait;
         this.minBytes = minBytes;
+        this.maxBytes = maxBytes;
         this.fetchData = fetchData;
     }
 
@@ -104,7 +160,11 @@ public class FetchRequest extends AbstractRequest {
         replicaId = struct.getInt(REPLICA_ID_KEY_NAME);
         maxWait = struct.getInt(MAX_WAIT_KEY_NAME);
         minBytes = struct.getInt(MIN_BYTES_KEY_NAME);
-        fetchData = new HashMap<TopicPartition, PartitionData>();
+        if (struct.hasField(MAX_BYTES_KEY_NAME))
+            maxBytes = struct.getInt(MAX_BYTES_KEY_NAME);
+        else
+            maxBytes = DEFAULT_RESPONSE_MAX_BYTES;
+        fetchData = new LinkedHashMap<>();
         for (Object topicResponseObj : struct.getArray(TOPICS_KEY_NAME)) {
             Struct topicResponse = (Struct) topicResponseObj;
             String topic = topicResponse.getString(TOPIC_KEY_NAME);
@@ -121,7 +181,7 @@ public class FetchRequest extends AbstractRequest {
 
     @Override
     public AbstractRequestResponse getErrorResponse(int versionId, Throwable 
e) {
-        Map<TopicPartition, FetchResponse.PartitionData> responseData = new 
HashMap<TopicPartition, FetchResponse.PartitionData>();
+        Map<TopicPartition, FetchResponse.PartitionData> responseData = new 
LinkedHashMap<>();
 
         for (Map.Entry<TopicPartition, PartitionData> entry: 
fetchData.entrySet()) {
             FetchResponse.PartitionData partitionResponse = new 
FetchResponse.PartitionData(Errors.forException(e).code(),
@@ -134,6 +194,8 @@ public class FetchRequest extends AbstractRequest {
             case 0:
                 return new FetchResponse(responseData);
             case 1:
+            case 2:
+            case 3:
                 return new FetchResponse(responseData, 0);
             default:
                 throw new IllegalArgumentException(String.format("Version %d 
is not valid. Valid versions for %s are 0 to %d",
@@ -153,6 +215,10 @@ public class FetchRequest extends AbstractRequest {
         return minBytes;
     }
 
+    public int maxBytes() {
+        return maxBytes;
+    }
+
     public Map<TopicPartition, PartitionData> fetchData() {
         return fetchData;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d04b0998/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java 
b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
index f28472f..111d197 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
@@ -21,11 +21,10 @@ import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.ProtoUtils;
 import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.utils.CollectionUtils;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -65,7 +64,7 @@ public class FetchResponse extends AbstractRequestResponse {
     public static final long INVALID_HIGHWATERMARK = -1L;
     public static final ByteBuffer EMPTY_RECORD_SET = ByteBuffer.allocate(0);
 
-    private final Map<TopicPartition, PartitionData> responseData;
+    private final LinkedHashMap<TopicPartition, PartitionData> responseData;
     private final int throttleTime;
 
     public static final class PartitionData {
@@ -85,28 +84,61 @@ public class FetchResponse extends AbstractRequestResponse {
      * @param responseData fetched data grouped by topic-partition
      */
     public FetchResponse(Map<TopicPartition, PartitionData> responseData) {
-        super(new Struct(ProtoUtils.responseSchema(ApiKeys.FETCH.id, 0)));
-        initCommonFields(responseData);
-        this.responseData = responseData;
-        this.throttleTime = DEFAULT_THROTTLE_TIME;
+        this(0, new LinkedHashMap<>(responseData), DEFAULT_THROTTLE_TIME);
     }
 
-  /**
-   * Constructor for Version 1
-   * @param responseData fetched data grouped by topic-partition
-   * @param throttleTime Time in milliseconds the response was throttled
-   */
+    /**
+     * Constructor for Version 1 and 2
+     * @param responseData fetched data grouped by topic-partition
+     * @param throttleTime Time in milliseconds the response was throttled
+     */
     public FetchResponse(Map<TopicPartition, PartitionData> responseData, int 
throttleTime) {
-        super(new Struct(CURRENT_SCHEMA));
-        initCommonFields(responseData);
-        struct.set(THROTTLE_TIME_KEY_NAME, throttleTime);
+        // the schema for versions 1 and 2 is the same, so we pick 2 here
+        this(2, new LinkedHashMap<>(responseData), throttleTime);
+    }
+
+    /**
+     * Constructor for Version 3
+     * @param responseData fetched data grouped by topic-partition
+     * @param throttleTime Time in milliseconds the response was throttled
+     */
+    public FetchResponse(LinkedHashMap<TopicPartition, PartitionData> 
responseData, int throttleTime) {
+        this(3, responseData, throttleTime);
+    }
+
+    private FetchResponse(int version, LinkedHashMap<TopicPartition, 
PartitionData> responseData, int throttleTime) {
+        super(new Struct(ProtoUtils.responseSchema(ApiKeys.FETCH.id, 
version)));
+
+        List<FetchRequest.TopicAndPartitionData<PartitionData>> topicsData = 
FetchRequest.TopicAndPartitionData.batchByTopic(responseData);
+        List<Struct> topicArray = new ArrayList<>();
+        for (FetchRequest.TopicAndPartitionData<PartitionData> topicEntry: 
topicsData) {
+            Struct topicData = struct.instance(RESPONSES_KEY_NAME);
+            topicData.set(TOPIC_KEY_NAME, topicEntry.topic);
+            List<Struct> partitionArray = new ArrayList<>();
+            for (Map.Entry<Integer, PartitionData> partitionEntry : 
topicEntry.partitions.entrySet()) {
+                PartitionData fetchPartitionData = partitionEntry.getValue();
+                Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
+                partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
+                partitionData.set(ERROR_CODE_KEY_NAME, 
fetchPartitionData.errorCode);
+                partitionData.set(HIGH_WATERMARK_KEY_NAME, 
fetchPartitionData.highWatermark);
+                partitionData.set(RECORD_SET_KEY_NAME, 
fetchPartitionData.recordSet);
+                partitionArray.add(partitionData);
+            }
+            topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray());
+            topicArray.add(topicData);
+        }
+        struct.set(RESPONSES_KEY_NAME, topicArray.toArray());
+
+        if (version >= 1)
+            struct.set(THROTTLE_TIME_KEY_NAME, throttleTime);
+
         this.responseData = responseData;
         this.throttleTime = throttleTime;
     }
 
     public FetchResponse(Struct struct) {
         super(struct);
-        responseData = new HashMap<TopicPartition, PartitionData>();
+        LinkedHashMap<TopicPartition, PartitionData> responseData = new 
LinkedHashMap<>();
         for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) {
             Struct topicResponse = (Struct) topicResponseObj;
             String topic = topicResponse.getString(TOPIC_KEY_NAME);
@@ -120,34 +152,11 @@ public class FetchResponse extends 
AbstractRequestResponse {
                 responseData.put(new TopicPartition(topic, partition), 
partitionData);
             }
         }
+        this.responseData = responseData;
         this.throttleTime = struct.hasField(THROTTLE_TIME_KEY_NAME) ? 
struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME;
     }
 
-    private void initCommonFields(Map<TopicPartition, PartitionData> 
responseData) {
-        Map<String, Map<Integer, PartitionData>> topicsData = 
CollectionUtils.groupDataByTopic(responseData);
-
-        List<Struct> topicArray = new ArrayList<Struct>();
-        for (Map.Entry<String, Map<Integer, PartitionData>> topicEntry: 
topicsData.entrySet()) {
-            Struct topicData = struct.instance(RESPONSES_KEY_NAME);
-            topicData.set(TOPIC_KEY_NAME, topicEntry.getKey());
-            List<Struct> partitionArray = new ArrayList<Struct>();
-            for (Map.Entry<Integer, PartitionData> partitionEntry : 
topicEntry.getValue().entrySet()) {
-                PartitionData fetchPartitionData = partitionEntry.getValue();
-                Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
-                partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
-                partitionData.set(ERROR_CODE_KEY_NAME, 
fetchPartitionData.errorCode);
-                partitionData.set(HIGH_WATERMARK_KEY_NAME, 
fetchPartitionData.highWatermark);
-                partitionData.set(RECORD_SET_KEY_NAME, 
fetchPartitionData.recordSet);
-                partitionArray.add(partitionData);
-            }
-            topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray());
-            topicArray.add(topicData);
-        }
-        struct.set(RESPONSES_KEY_NAME, topicArray.toArray());
-    }
-
-
-    public Map<TopicPartition, PartitionData> responseData() {
+    public LinkedHashMap<TopicPartition, PartitionData> responseData() {
         return responseData;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d04b0998/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index a910f2f..fd0794c 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -1078,6 +1078,7 @@ public class KafkaConsumerTest {
         long requestTimeoutMs = 30000;
         boolean excludeInternalTopics = true;
         int minBytes = 1;
+        int maxBytes = Integer.MAX_VALUE;
         int maxWaitMs = 500;
         int fetchSize = 1024 * 1024;
         int maxPollRecords = Integer.MAX_VALUE;
@@ -1116,6 +1117,7 @@ public class KafkaConsumerTest {
         Fetcher<String, String> fetcher = new Fetcher<>(
                 consumerClient,
                 minBytes,
+                maxBytes,
                 maxWaitMs,
                 fetchSize,
                 maxPollRecords,

http://git-wip-us.apache.org/repos/asf/kafka/blob/d04b0998/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 15bd9a2..88a0526 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -31,7 +31,6 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.InvalidTopicException;
-import org.apache.kafka.common.errors.RecordTooLargeException;
 import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
@@ -64,7 +63,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
 
 import static java.util.Collections.singleton;
 import static org.junit.Assert.assertEquals;
@@ -80,6 +78,7 @@ public class FetcherTest {
     private final String metricGroup = "consumer" + groupId + 
"-fetch-manager-metrics";
     private TopicPartition tp = new TopicPartition(topicName, 0);
     private int minBytes = 1;
+    private int maxBytes = Integer.MAX_VALUE;
     private int maxWaitMs = 0;
     private int fetchSize = 1000;
     private long retryBackoffMs = 100;
@@ -315,25 +314,6 @@ public class FetcherTest {
         assertEquals(30L, consumerRecords.get(2).offset());
     }
 
-    @Test(expected = RecordTooLargeException.class)
-    public void testFetchRecordTooLarge() {
-        subscriptions.assignFromUser(singleton(tp));
-        subscriptions.seek(tp, 0);
-
-        // prepare large record
-        MemoryRecords records = 
MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE);
-        byte[] bytes = new byte[this.fetchSize];
-        new Random().nextBytes(bytes);
-        records.append(1L, 0L, null, bytes);
-        records.close();
-
-        // resize the limit of the buffer to pretend it is only fetch-size 
large
-        fetcher.sendFetches();
-        client.prepareResponse(fetchResponse((ByteBuffer) 
records.buffer().limit(this.fetchSize), Errors.NONE.code(), 100L, 0));
-        consumerClient.poll(0);
-        fetcher.fetchedRecords();
-    }
-
     @Test
     public void testUnauthorizedTopic() {
         subscriptions.assignFromUser(singleton(tp));
@@ -721,6 +701,7 @@ public class FetcherTest {
                                                int maxPollRecords) {
         return new Fetcher<>(consumerClient,
                 minBytes,
+                maxBytes,
                 maxWaitMs,
                 fetchSize,
                 maxPollRecords,

http://git-wip-us.apache.org/repos/asf/kafka/blob/d04b0998/clients/src/test/java/org/apache/kafka/common/internals/PartitionStatesTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/common/internals/PartitionStatesTest.java
 
b/clients/src/test/java/org/apache/kafka/common/internals/PartitionStatesTest.java
new file mode 100644
index 0000000..66c7abc
--- /dev/null
+++ 
b/clients/src/test/java/org/apache/kafka/common/internals/PartitionStatesTest.java
@@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for PartitionStates with String state values. Every test starts from the fixture built
+ * by createMap(), whose entries interleave the topics foo, blah and baz.
+ */
+public class PartitionStatesTest {
+
+    /**
+     * set() is expected to regroup the interleaved fixture so that partitions of the same topic
+     * are adjacent (topics in first-seen order), and setting an empty map is expected to leave
+     * no state behind.
+     */
+    @Test
+    public void testSet() {
+        PartitionStates<String> states = new PartitionStates<>();
+        LinkedHashMap<TopicPartition, String> map = createMap();
+        states.set(map);
+        LinkedHashMap<TopicPartition, String> expected = new LinkedHashMap<>();
+        expected.put(new TopicPartition("foo", 2), "foo 2");
+        expected.put(new TopicPartition("foo", 0), "foo 0");
+        expected.put(new TopicPartition("blah", 2), "blah 2");
+        expected.put(new TopicPartition("blah", 1), "blah 1");
+        expected.put(new TopicPartition("baz", 2), "baz 2");
+        expected.put(new TopicPartition("baz", 3), "baz 3");
+        checkState(states, expected);
+
+        states.set(new LinkedHashMap<TopicPartition, String>());
+        checkState(states, new LinkedHashMap<TopicPartition, String>());
+    }
+
+    /**
+     * Fixture: six partitions across three topics, inserted so that the foo partitions and the
+     * baz partitions are not contiguous.
+     */
+    private LinkedHashMap<TopicPartition, String> createMap() {
+        LinkedHashMap<TopicPartition, String> map = new LinkedHashMap<>();
+        map.put(new TopicPartition("foo", 2), "foo 2");
+        map.put(new TopicPartition("blah", 2), "blah 2");
+        map.put(new TopicPartition("blah", 1), "blah 1");
+        map.put(new TopicPartition("baz", 2), "baz 2");
+        map.put(new TopicPartition("foo", 0), "foo 0");
+        map.put(new TopicPartition("baz", 3), "baz 3");
+        return map;
+    }
+
+    /**
+     * Asserts that states holds exactly the expected partitions and values: the partition set,
+     * the size, contains() for every expected key, and the list of partition states.
+     * statesList is built in the iteration order of expected, so the final assertion is the one
+     * intended to pin ordering -- presumably partitionStates() returns a List; confirm against
+     * PartitionStates.
+     */
+    private void checkState(PartitionStates<String> states, 
LinkedHashMap<TopicPartition, String> expected) {
+        assertEquals(expected.keySet(), states.partitionSet());
+        assertEquals(expected.size(), states.size());
+        List<PartitionStates.PartitionState<String>> statesList = new 
ArrayList<>();
+        for (Map.Entry<TopicPartition, String> entry : expected.entrySet()) {
+            statesList.add(new 
PartitionStates.PartitionState<>(entry.getKey(), entry.getValue()));
+            assertTrue(states.contains(entry.getKey()));
+        }
+        assertEquals(statesList, states.partitionStates());
+    }
+
+    /**
+     * moveToEnd() on an existing partition is expected to move it to the last position. For the
+     * partition that is already last, and for an unknown partition or topic, the expected state
+     * is unchanged.
+     */
+    @Test
+    public void testMoveToEnd() {
+        PartitionStates<String> states = new PartitionStates<>();
+        LinkedHashMap<TopicPartition, String> map = createMap();
+        states.set(map);
+
+        states.moveToEnd(new TopicPartition("baz", 2));
+        LinkedHashMap<TopicPartition, String> expected = new LinkedHashMap<>();
+        expected.put(new TopicPartition("foo", 2), "foo 2");
+        expected.put(new TopicPartition("foo", 0), "foo 0");
+        expected.put(new TopicPartition("blah", 2), "blah 2");
+        expected.put(new TopicPartition("blah", 1), "blah 1");
+        expected.put(new TopicPartition("baz", 3), "baz 3");
+        expected.put(new TopicPartition("baz", 2), "baz 2");
+        checkState(states, expected);
+
+        states.moveToEnd(new TopicPartition("foo", 2));
+        expected = new LinkedHashMap<>();
+        expected.put(new TopicPartition("foo", 0), "foo 0");
+        expected.put(new TopicPartition("blah", 2), "blah 2");
+        expected.put(new TopicPartition("blah", 1), "blah 1");
+        expected.put(new TopicPartition("baz", 3), "baz 3");
+        expected.put(new TopicPartition("baz", 2), "baz 2");
+        expected.put(new TopicPartition("foo", 2), "foo 2");
+        checkState(states, expected);
+
+        // no-op
+        states.moveToEnd(new TopicPartition("foo", 2));
+        checkState(states, expected);
+
+        // partition doesn't exist
+        states.moveToEnd(new TopicPartition("baz", 5));
+        checkState(states, expected);
+
+        // topic doesn't exist
+        states.moveToEnd(new TopicPartition("aaa", 2));
+        checkState(states, expected);
+    }
+
+    /**
+     * updateAndMoveToEnd() is expected to replace the value and place the partition last. The
+     * expectations show that topic grouping is not re-established afterwards (foo 0 ends up
+     * separated from foo 2), and that an unknown partition or topic is appended as a new entry.
+     */
+    @Test
+    public void testUpdateAndMoveToEnd() {
+        PartitionStates<String> states = new PartitionStates<>();
+        LinkedHashMap<TopicPartition, String> map = createMap();
+        states.set(map);
+
+        states.updateAndMoveToEnd(new TopicPartition("foo", 0), "foo 0 
updated");
+        LinkedHashMap<TopicPartition, String> expected = new LinkedHashMap<>();
+        expected.put(new TopicPartition("foo", 2), "foo 2");
+        expected.put(new TopicPartition("blah", 2), "blah 2");
+        expected.put(new TopicPartition("blah", 1), "blah 1");
+        expected.put(new TopicPartition("baz", 2), "baz 2");
+        expected.put(new TopicPartition("baz", 3), "baz 3");
+        expected.put(new TopicPartition("foo", 0), "foo 0 updated");
+        checkState(states, expected);
+
+        states.updateAndMoveToEnd(new TopicPartition("baz", 2), "baz 2 
updated");
+        expected = new LinkedHashMap<>();
+        expected.put(new TopicPartition("foo", 2), "foo 2");
+        expected.put(new TopicPartition("blah", 2), "blah 2");
+        expected.put(new TopicPartition("blah", 1), "blah 1");
+        expected.put(new TopicPartition("baz", 3), "baz 3");
+        expected.put(new TopicPartition("foo", 0), "foo 0 updated");
+        expected.put(new TopicPartition("baz", 2), "baz 2 updated");
+        checkState(states, expected);
+
+        // partition doesn't exist
+        states.updateAndMoveToEnd(new TopicPartition("baz", 5), "baz 5 new");
+        expected = new LinkedHashMap<>();
+        expected.put(new TopicPartition("foo", 2), "foo 2");
+        expected.put(new TopicPartition("blah", 2), "blah 2");
+        expected.put(new TopicPartition("blah", 1), "blah 1");
+        expected.put(new TopicPartition("baz", 3), "baz 3");
+        expected.put(new TopicPartition("foo", 0), "foo 0 updated");
+        expected.put(new TopicPartition("baz", 2), "baz 2 updated");
+        expected.put(new TopicPartition("baz", 5), "baz 5 new");
+        checkState(states, expected);
+
+        // topic doesn't exist
+        states.updateAndMoveToEnd(new TopicPartition("aaa", 2), "aaa 2 new");
+        expected = new LinkedHashMap<>();
+        expected.put(new TopicPartition("foo", 2), "foo 2");
+        expected.put(new TopicPartition("blah", 2), "blah 2");
+        expected.put(new TopicPartition("blah", 1), "blah 1");
+        expected.put(new TopicPartition("baz", 3), "baz 3");
+        expected.put(new TopicPartition("foo", 0), "foo 0 updated");
+        expected.put(new TopicPartition("baz", 2), "baz 2 updated");
+        expected.put(new TopicPartition("baz", 5), "baz 5 new");
+        expected.put(new TopicPartition("aaa", 2), "aaa 2 new");
+        checkState(states, expected);
+    }
+
+    /**
+     * partitionStateValues() is expected to return the values in the same topic-grouped order
+     * that testSet() expects for the partitions.
+     */
+    @Test
+    public void testPartitionValues() {
+        PartitionStates<String> states = new PartitionStates<>();
+        LinkedHashMap<TopicPartition, String> map = createMap();
+        states.set(map);
+        List<String> expected = new ArrayList<>();
+        expected.add("foo 2");
+        expected.add("foo 0");
+        expected.add("blah 2");
+        expected.add("blah 1");
+        expected.add("baz 2");
+        expected.add("baz 3");
+        assertEquals(expected, states.partitionStateValues());
+    }
+
+    /**
+     * clear() is expected to leave the container empty.
+     */
+    @Test
+    public void testClear() {
+        PartitionStates<String> states = new PartitionStates<>();
+        LinkedHashMap<TopicPartition, String> map = createMap();
+        states.set(map);
+        states.clear();
+        checkState(states, new LinkedHashMap<TopicPartition, String>());
+    }
+
+    /**
+     * remove() is expected to drop only the given partition and keep the relative order of the
+     * remaining ones; exercised for the first, a middle and the last partition.
+     */
+    @Test
+    public void testRemove() {
+        PartitionStates<String> states = new PartitionStates<>();
+        LinkedHashMap<TopicPartition, String> map = createMap();
+        states.set(map);
+
+        states.remove(new TopicPartition("foo", 2));
+        LinkedHashMap<TopicPartition, String> expected = new LinkedHashMap<>();
+        expected.put(new TopicPartition("foo", 0), "foo 0");
+        expected.put(new TopicPartition("blah", 2), "blah 2");
+        expected.put(new TopicPartition("blah", 1), "blah 1");
+        expected.put(new TopicPartition("baz", 2), "baz 2");
+        expected.put(new TopicPartition("baz", 3), "baz 3");
+        checkState(states, expected);
+
+        states.remove(new TopicPartition("blah", 1));
+        expected = new LinkedHashMap<>();
+        expected.put(new TopicPartition("foo", 0), "foo 0");
+        expected.put(new TopicPartition("blah", 2), "blah 2");
+        expected.put(new TopicPartition("baz", 2), "baz 2");
+        expected.put(new TopicPartition("baz", 3), "baz 3");
+        checkState(states, expected);
+
+        states.remove(new TopicPartition("baz", 3));
+        expected = new LinkedHashMap<>();
+        expected.put(new TopicPartition("foo", 0), "foo 0");
+        expected.put(new TopicPartition("blah", 2), "blah 2");
+        expected.put(new TopicPartition("baz", 2), "baz 2");
+        checkState(states, expected);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d04b0998/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index dd77e03..b3baa63 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -30,6 +30,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -49,8 +50,8 @@ public class RequestResponseTest {
                 createControlledShutdownRequest(),
                 createControlledShutdownResponse(),
                 createControlledShutdownRequest().getErrorResponse(1, new UnknownServerException()),
-                createFetchRequest(),
-                createFetchRequest().getErrorResponse(1, new UnknownServerException()),
+                createFetchRequest(3),
+                createFetchRequest(3).getErrorResponse(3, new UnknownServerException()),
                 createFetchResponse(),
                 createHeartBeatRequest(),
                 createHeartBeatRequest().getErrorResponse(0, new UnknownServerException()),
@@ -112,11 +113,12 @@ public class RequestResponseTest {
         for (AbstractRequestResponse req : requestResponseList)
             checkSerialization(req, null);
 
+
+        checkOlderFetchVersions();
         checkSerialization(createMetadataResponse(0), 0);
         checkSerialization(createMetadataResponse(1), 1);
         checkSerialization(createMetadataRequest(Arrays.asList("topic1")).getErrorResponse(0, new UnknownServerException()), 0);
         checkSerialization(createMetadataRequest(Arrays.asList("topic1")).getErrorResponse(1, new UnknownServerException()), 1);
-        checkSerialization(createFetchRequest().getErrorResponse(0, new UnknownServerException()), 0);
         checkSerialization(createOffsetCommitRequest(0), 0);
         checkSerialization(createOffsetCommitRequest(0).getErrorResponse(0, new UnknownServerException()), 0);
         checkSerialization(createOffsetCommitRequest(1), 1);
@@ -129,6 +131,14 @@ public class RequestResponseTest {
         checkSerialization(createUpdateMetadataRequest(1, null).getErrorResponse(1, new UnknownServerException()), 1);
     }
 
+    private void checkOlderFetchVersions() throws Exception {
+        int latestVersion = ProtoUtils.latestVersion(ApiKeys.FETCH.id);
+        for (int i = 0; i < latestVersion; ++i) {
+            checkSerialization(createFetchRequest(i).getErrorResponse(i, new UnknownServerException()), i);
+            checkSerialization(createFetchRequest(i), i);
+        }
+    }
+
     private void checkSerialization(AbstractRequestResponse req, Integer version) throws Exception {
         ByteBuffer buffer = ByteBuffer.allocate(req.sizeOf());
         req.writeTo(buffer);
@@ -219,11 +229,15 @@ public class RequestResponseTest {
         return new GroupCoordinatorResponse(Errors.NONE.code(), new Node(10, "host1", 2014));
     }
 
-    private AbstractRequest createFetchRequest() {
-        Map<TopicPartition, FetchRequest.PartitionData> fetchData = new HashMap<>();
+    @SuppressWarnings("deprecation")
+    private AbstractRequest createFetchRequest(int version) {
+        LinkedHashMap<TopicPartition, FetchRequest.PartitionData> fetchData = new LinkedHashMap<>();
         fetchData.put(new TopicPartition("test1", 0), new FetchRequest.PartitionData(100, 1000000));
         fetchData.put(new TopicPartition("test2", 0), new FetchRequest.PartitionData(200, 1000000));
-        return new FetchRequest(-1, 100, 100000, fetchData);
+        if (version < 3)
+            return new FetchRequest(100, 100000, fetchData);
+        else
+            return new FetchRequest(100, 1000, 1000000, fetchData);
     }
 
     private AbstractRequestResponse createFetchResponse() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d04b0998/core/src/main/scala/kafka/admin/ConfigCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index 58bdb7a..700c54d 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -82,7 +82,6 @@ object ConfigCommand extends Config {
     val entity = parseEntity(opts)
     val entityType = entity.root.entityType
     val entityName = entity.fullSanitizedName
-    warnOnMaxMessagesChange(configsToBeAdded, opts.options.has(opts.forceOpt))
 
     // compile the final set of configs
     val configs = utils.fetchEntityConfig(zkUtils, entityType, entityName)
@@ -99,22 +98,9 @@ object ConfigCommand extends Config {
     println(s"Updated config for entity: $entity.")
   }
 
-  def warnOnMaxMessagesChange(configs: Properties, force: Boolean): Unit = {
-    val maxMessageBytes = configs.get(LogConfig.MaxMessageBytesProp) match {
-      case n: String => n.toInt
-      case _ => -1
-    }
-    if (maxMessageBytes > Defaults.MaxMessageSize){
-      error(TopicCommand.longMessageSizeWarning(maxMessageBytes))
-      if (!force)
-        TopicCommand.askToProceed
-    }
-  }
-
   private def parseBroker(broker: String): Int = {
-    try {
-      broker.toInt
-    }catch {
+    try broker.toInt
+    catch {
       case e: NumberFormatException =>
         throw new IllegalArgumentException(s"Error parsing broker $broker. The broker's Entity Name must be a single integer value")
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d04b0998/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index 421486c..a35a989 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -97,13 +97,11 @@ object TopicCommand extends Logging {
     try {
       if (opts.options.has(opts.replicaAssignmentOpt)) {
         val assignment = parseReplicaAssignment(opts.options.valueOf(opts.replicaAssignmentOpt))
-        warnOnMaxMessagesChange(configs, assignment.valuesIterator.next().length, opts.options.has(opts.forceOpt))
         AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, assignment, configs, update = false)
       } else {
         CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.partitionsOpt, opts.replicationFactorOpt)
         val partitions = opts.options.valueOf(opts.partitionsOpt).intValue
         val replicas = opts.options.valueOf(opts.replicationFactorOpt).intValue
-        warnOnMaxMessagesChange(configs, replicas, opts.options.has(opts.forceOpt))
         val rackAwareMode = if (opts.options.has(opts.disableRackAware)) RackAwareMode.Disabled
                             else RackAwareMode.Enforced
         AdminUtils.createTopic(zkUtils, topic, partitions, replicas, configs, rackAwareMode)
@@ -357,20 +355,6 @@ object TopicCommand extends Logging {
       CommandLineUtils.checkInvalidArgs(parser, options, ifNotExistsOpt, allTopicLevelOpts -- Set(createOpt))
     }
   }
-  def warnOnMaxMessagesChange(configs: Properties, replicas: Integer, force: Boolean): Unit = {
-    val maxMessageBytes =  configs.get(LogConfig.MaxMessageBytesProp) match {
-      case n: String => n.toInt
-      case _ => -1
-    }
-    if (maxMessageBytes > Defaults.MaxMessageSize)
-      if (replicas > 1) {
-        error(longMessageSizeWarning(maxMessageBytes))
-        if (!force)
-          askToProceed
-      }
-      else
-        warn(shortMessageSizeWarning(maxMessageBytes))
-  }
 
   def askToProceed: Unit = {
     println("Are you sure you want to continue? [y/n]")
@@ -380,37 +364,5 @@ object TopicCommand extends Logging {
     }
   }
 
-  def shortMessageSizeWarning(maxMessageBytes: Int): String = {
-    "\n\n" +
-      
"*****************************************************************************************************\n"
 +
-      "*** WARNING: you are creating a topic where the max.message.bytes is 
greater than the broker's    ***\n" +
-      "*** default max.message.bytes. This operation is potentially dangerous. 
Consumers will get        ***\n" +
-      s"*** failures if their fetch.message.max.bytes (old consumer) or 
${NewConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG}         ***\n"+
-      "*** (new consumer) < the value you are using.                           
                          ***\n" +
-      
"*****************************************************************************************************\n"
 +
-      s"- value set here: $maxMessageBytes\n" +
-      s"- Default Old Consumer fetch.message.max.bytes: 
${OldConsumerConfig.FetchSize}\n" +
-      s"- Default New Consumer 
${NewConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG}: 
${NewConsumerConfig.DEFAULT_MAX_PARTITION_FETCH_BYTES}\n" +
-      s"- Default Broker max.message.bytes: 
${kafka.server.Defaults.MessageMaxBytes}\n\n"
-  }
-
-  def longMessageSizeWarning(maxMessageBytes: Int): String = {
-    "\n\n" +
-      
"*****************************************************************************************************\n"
 +
-      "*** WARNING: you are creating a topic where the max.message.bytes is 
greater than the broker's    ***\n" +
-      "*** default max.message.bytes. This operation is dangerous. There are 
two potential side effects: ***\n" +
-      "*** - Consumers will get failures if their fetch.message.max.bytes (old 
consumer) or              ***\n" +
-      s"***   ${NewConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG} (new 
consumer) < the value you are using                          ***\n" +
-      "*** - Producer requests larger than replica.fetch.max.bytes will not 
replicate and hence have     ***\n" +
-      "***   a higher risk of data loss                                        
                          ***\n" +
-      "*** You should ensure both of these settings are greater than the value 
set here before using     ***\n" +
-      "*** this topic.                                                         
                          ***\n" +
-      
"*****************************************************************************************************\n"
 +
-      s"- value set here: $maxMessageBytes\n" +
-      s"- Default Broker replica.fetch.max.bytes: 
${kafka.server.Defaults.ReplicaFetchMaxBytes}\n" +
-      s"- Default Broker max.message.bytes: 
${kafka.server.Defaults.MessageMaxBytes}\n" +
-      s"- Default Old Consumer fetch.message.max.bytes: 
${OldConsumerConfig.FetchSize}\n" +
-      s"- Default New Consumer 
${NewConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG}: 
${NewConsumerConfig.DEFAULT_MAX_PARTITION_FETCH_BYTES}\n\n"
-  }
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d04b0998/core/src/main/scala/kafka/api/ApiVersion.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ApiVersion.scala b/core/src/main/scala/kafka/api/ApiVersion.scala
index d955225..cdd418d 100644
--- a/core/src/main/scala/kafka/api/ApiVersion.scala
+++ b/core/src/main/scala/kafka/api/ApiVersion.scala
@@ -54,7 +54,10 @@ object ApiVersion {
     "0.10.0" -> KAFKA_0_10_0_IV1,
 
     // introduced for JoinGroup protocol change in KIP-62
-    "0.10.1-IV0" -> KAFKA_0_10_1_IV0
+    "0.10.1-IV0" -> KAFKA_0_10_1_IV0,
+    // 0.10.1-IV1 is introduced for KIP-74(fetch response size limit).
+    "0.10.1-IV1" -> KAFKA_0_10_1_IV1,
+    "0.10.1" -> KAFKA_0_10_1_IV1
   )
 
   private val versionPattern = "\\.".r
@@ -120,3 +123,9 @@ case object KAFKA_0_10_1_IV0 extends ApiVersion {
   val messageFormatVersion: Byte = Message.MagicValue_V1
   val id: Int = 6
 }
+
+case object KAFKA_0_10_1_IV1 extends ApiVersion {
+  val version: String = "0.10.1-IV1"
+  val messageFormatVersion: Byte = Message.MagicValue_V1
+  val id: Int = 7
+}

Reply via email to