http://git-wip-us.apache.org/repos/asf/kafka/blob/d04b0998/core/src/main/scala/kafka/api/FetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala
index f74bd1c..f100d4b 100644
--- a/core/src/main/scala/kafka/api/FetchRequest.scala
+++ b/core/src/main/scala/kafka/api/FetchRequest.scala
@@ -23,19 +23,24 @@ import kafka.common.TopicAndPartition
 import kafka.consumer.ConsumerConfig
 import kafka.network.RequestChannel
 import kafka.message.MessageSet
-
 import java.util.concurrent.atomic.AtomicInteger
 import java.nio.ByteBuffer
+
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 
-import scala.collection.immutable.Map
+import scala.collection.mutable.ArrayBuffer
+import scala.util.Random
 
 case class PartitionFetchInfo(offset: Long, fetchSize: Int)
 
 object FetchRequest {
-  val CurrentVersion = 2.shortValue
+
+  private val random = new Random
+
+  val CurrentVersion = 3.shortValue
   val DefaultMaxWait = 0
   val DefaultMinBytes = 0
+  val DefaultMaxBytes = Int.MaxValue
   val DefaultCorrelationId = 0
 
   def readFrom(buffer: ByteBuffer): FetchRequest = {
@@ -45,6 +50,7 @@ object FetchRequest {
     val replicaId = buffer.getInt
     val maxWait = buffer.getInt
     val minBytes = buffer.getInt
+    val maxBytes = if (versionId < 3) DefaultMaxBytes else buffer.getInt
     val topicCount = buffer.getInt
     val pairs = (1 to topicCount).flatMap(_ => {
       val topic = readShortString(buffer)
@@ -56,8 +62,22 @@ object FetchRequest {
         (TopicAndPartition(topic, partitionId), PartitionFetchInfo(offset, 
fetchSize))
       })
     })
-    FetchRequest(versionId, correlationId, clientId, replicaId, maxWait, 
minBytes, Map(pairs:_*))
+    FetchRequest(versionId, correlationId, clientId, replicaId, maxWait, 
minBytes, maxBytes, Vector(pairs:_*))
   }
+
+  def shuffle(requestInfo: Seq[(TopicAndPartition, PartitionFetchInfo)]): 
Seq[(TopicAndPartition, PartitionFetchInfo)] =
+    random.shuffle(requestInfo)
+
+  def batchByTopic[T](s: Seq[(TopicAndPartition, T)]): Seq[(String, Seq[(Int, 
T)])] = {
+    val result = new ArrayBuffer[(String, ArrayBuffer[(Int, T)])]
+    s.foreach { case (TopicAndPartition(t, p), value) =>
+      if (result.isEmpty || result.last._1 != t)
+        result += (t -> new ArrayBuffer)
+      result.last._2 += (p -> value)
+    }
+    result
+  }
+
 }
 
 case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
@@ -66,29 +86,50 @@ case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
                         replicaId: Int = Request.OrdinaryConsumerId,
                         maxWait: Int = FetchRequest.DefaultMaxWait,
                         minBytes: Int = FetchRequest.DefaultMinBytes,
-                        requestInfo: Map[TopicAndPartition, 
PartitionFetchInfo])
+                        maxBytes: Int = FetchRequest.DefaultMaxBytes,
+                        requestInfo: Seq[(TopicAndPartition, 
PartitionFetchInfo)])
         extends RequestOrResponse(Some(ApiKeys.FETCH.id)) {
 
   /**
-   * Partitions the request info into a map of maps (one for each topic).
-   */
-  lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_._1.topic)
+    * Partitions the request info into a list of lists (one for each topic) 
while preserving request info ordering
+    */
+  private type PartitionInfos = Seq[(Int, PartitionFetchInfo)]
+  private lazy val requestInfoGroupedByTopic: Seq[(String, PartitionInfos)] = 
FetchRequest.batchByTopic(requestInfo)
 
-  /**
-   *  Public constructor for the clients
-   */
+  /** Public constructor for the clients */
+  @deprecated("The order of partitions in `requestInfo` is relevant, so this 
constructor is deprecated in favour of the " +
+    "one that takes a Seq", since = "0.10.1.0")
   def this(correlationId: Int,
            clientId: String,
            maxWait: Int,
            minBytes: Int,
+           maxBytes: Int,
            requestInfo: Map[TopicAndPartition, PartitionFetchInfo]) {
     this(versionId = FetchRequest.CurrentVersion,
          correlationId = correlationId,
          clientId = clientId,
          replicaId = Request.OrdinaryConsumerId,
          maxWait = maxWait,
-         minBytes= minBytes,
-         requestInfo = requestInfo)
+         minBytes = minBytes,
+         maxBytes = maxBytes,
+         requestInfo = FetchRequest.shuffle(requestInfo.toSeq))
+  }
+
+  /** Public constructor for the clients */
+  def this(correlationId: Int,
+           clientId: String,
+           maxWait: Int,
+           minBytes: Int,
+           maxBytes: Int,
+           requestInfo: Seq[(TopicAndPartition, PartitionFetchInfo)]) {
+    this(versionId = FetchRequest.CurrentVersion,
+      correlationId = correlationId,
+      clientId = clientId,
+      replicaId = Request.OrdinaryConsumerId,
+      maxWait = maxWait,
+      minBytes = minBytes,
+      maxBytes = maxBytes,
+      requestInfo = requestInfo)
   }
 
   def writeTo(buffer: ByteBuffer) {
@@ -98,13 +139,15 @@ case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
     buffer.putInt(replicaId)
     buffer.putInt(maxWait)
     buffer.putInt(minBytes)
+    if (versionId >= 3)
+      buffer.putInt(maxBytes)
     buffer.putInt(requestInfoGroupedByTopic.size) // topic count
     requestInfoGroupedByTopic.foreach {
       case (topic, partitionFetchInfos) =>
         writeShortString(buffer, topic)
         buffer.putInt(partitionFetchInfos.size) // partition count
         partitionFetchInfos.foreach {
-          case (TopicAndPartition(_, partition), PartitionFetchInfo(offset, 
fetchSize)) =>
+          case (partition, PartitionFetchInfo(offset, fetchSize)) =>
             buffer.putInt(partition)
             buffer.putLong(offset)
             buffer.putInt(fetchSize)
@@ -119,6 +162,7 @@ case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
     4 + /* replicaId */
     4 + /* maxWait */
     4 + /* minBytes */
+    (if (versionId >= 3) 4 /* maxBytes */ else 0) +
     4 + /* topic count */
     requestInfoGroupedByTopic.foldLeft(0)((foldedTopics, currTopic) => {
       val (topic, partitionFetchInfos) = currTopic
@@ -165,6 +209,7 @@ case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
     fetchRequest.append("; ReplicaId: " + replicaId)
     fetchRequest.append("; MaxWait: " + maxWait + " ms")
     fetchRequest.append("; MinBytes: " + minBytes + " bytes")
+    fetchRequest.append("; MaxBytes:" + maxBytes + " bytes")
     if(details)
       fetchRequest.append("; RequestInfo: " + requestInfo.mkString(","))
     fetchRequest.toString()
@@ -179,10 +224,11 @@ class FetchRequestBuilder() {
   private var replicaId = Request.OrdinaryConsumerId
   private var maxWait = FetchRequest.DefaultMaxWait
   private var minBytes = FetchRequest.DefaultMinBytes
-  private val requestMap = new collection.mutable.HashMap[TopicAndPartition, 
PartitionFetchInfo]
+  private var maxBytes = FetchRequest.DefaultMaxBytes
+  private val requestMap = new 
collection.mutable.ArrayBuffer[(TopicAndPartition, PartitionFetchInfo)]
 
   def addFetch(topic: String, partition: Int, offset: Long, fetchSize: Int) = {
-    requestMap.put(TopicAndPartition(topic, partition), 
PartitionFetchInfo(offset, fetchSize))
+    requestMap.append((TopicAndPartition(topic, partition), 
PartitionFetchInfo(offset, fetchSize)))
     this
   }
 
@@ -209,13 +255,19 @@ class FetchRequestBuilder() {
     this
   }
 
+  def maxBytes(maxBytes: Int): FetchRequestBuilder = {
+    this.maxBytes = maxBytes
+    this
+  }
+
   def requestVersion(versionId: Short): FetchRequestBuilder = {
     this.versionId = versionId
     this
   }
 
   def build() = {
-    val fetchRequest = FetchRequest(versionId, correlationId.getAndIncrement, 
clientId, replicaId, maxWait, minBytes, requestMap.toMap)
+    val fetchRequest = FetchRequest(versionId, correlationId.getAndIncrement, 
clientId, replicaId, maxWait, minBytes,
+      maxBytes, new ArrayBuffer() ++ requestMap)
     requestMap.clear()
     fetchRequest
   }
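
For context, a minimal usage sketch of the builder extended above. The `maxWait`, `minBytes`, `addFetch`, `build` and new `maxBytes` methods are the ones in this file; the client id, topic, offsets and sizes are illustrative.

    import kafka.api.FetchRequestBuilder

    val fetchRequest = new FetchRequestBuilder()
      .clientId("example-consumer")              // existing builder setter
      .maxWait(500)                              // ms to wait for minBytes to accumulate
      .minBytes(1)
      .maxBytes(50 * 1024 * 1024)                // new in request version 3: cap on the whole response
      .addFetch("my-topic", 0, 0L, 1024 * 1024)  // per-partition fetch size still applies
      .addFetch("my-topic", 1, 0L, 1024 * 1024)
      .build()

Because the builder now collects entries in an ArrayBuffer rather than a HashMap, the order of the `addFetch` calls is the partition order carried in the request.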

http://git-wip-us.apache.org/repos/asf/kafka/blob/d04b0998/core/src/main/scala/kafka/api/FetchResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchResponse.scala b/core/src/main/scala/kafka/api/FetchResponse.scala
index 1066d7f..d99bbcd 100644
--- a/core/src/main/scala/kafka/api/FetchResponse.scala
+++ b/core/src/main/scala/kafka/api/FetchResponse.scala
@@ -28,6 +28,7 @@ import org.apache.kafka.common.network.{Send, MultiSend}
 import org.apache.kafka.common.protocol.Errors
 
 import scala.collection._
+import JavaConverters._
 
 object FetchResponsePartitionData {
   def readFrom(buffer: ByteBuffer): FetchResponsePartitionData = {
@@ -100,7 +101,7 @@ object TopicData {
       val partitionData = FetchResponsePartitionData.readFrom(buffer)
       (partitionId, partitionData)
     })
-    TopicData(topic, Map(topicPartitionDataPairs:_*))
+    TopicData(topic, Seq(topicPartitionDataPairs:_*))
   }
 
   def headerSize(topic: String) =
@@ -108,9 +109,11 @@ object TopicData {
     4 /* partition count */
 }
 
-case class TopicData(topic: String, partitionData: Map[Int, 
FetchResponsePartitionData]) {
+case class TopicData(topic: String, partitionData: Seq[(Int, 
FetchResponsePartitionData)]) {
   val sizeInBytes =
-    TopicData.headerSize(topic) + partitionData.values.foldLeft(0)(_ + 
_.sizeInBytes + 4)
+    TopicData.headerSize(topic) + partitionData.foldLeft(0) { case (folded, (_, data)) =>
+      folded + data.sizeInBytes + 4
+    }
 
   val headerSize = TopicData.headerSize(topic)
 }
@@ -168,14 +171,18 @@ object FetchResponse {
     val topicCount = buffer.getInt
     val pairs = (1 to topicCount).flatMap(_ => {
       val topicData = TopicData.readFrom(buffer)
-      topicData.partitionData.map {
-        case (partitionId, partitionData) =>
-          (TopicAndPartition(topicData.topic, partitionId), partitionData)
+      topicData.partitionData.map { case (partitionId, partitionData) =>
+        (TopicAndPartition(topicData.topic, partitionId), partitionData)
       }
     })
-    FetchResponse(correlationId, Map(pairs:_*), requestVersion, throttleTime)
+    FetchResponse(correlationId, Vector(pairs:_*), requestVersion, 
throttleTime)
   }
 
+  type FetchResponseEntry = (Int, FetchResponsePartitionData)
+
+  def batchByTopic(data: Seq[(TopicAndPartition, 
FetchResponsePartitionData)]): Seq[(String, Seq[FetchResponseEntry])] =
+    FetchRequest.batchByTopic(data)
+
   // Returns the size of the response header
   def headerSize(requestVersion: Int): Int = {
     val throttleTimeSize = if (requestVersion > 0) 4 else 0
@@ -185,12 +192,12 @@ object FetchResponse {
   }
 
   // Returns the size of entire fetch response in bytes (including the header 
size)
-  def responseSize(dataGroupedByTopic: Map[String, Map[TopicAndPartition, 
FetchResponsePartitionData]],
+  def responseSize(dataGroupedByTopic: Seq[(String, Seq[FetchResponseEntry])],
                    requestVersion: Int): Int = {
     headerSize(requestVersion) +
     dataGroupedByTopic.foldLeft(0) { case (folded, (topic, partitionDataMap)) 
=>
       val topicData = TopicData(topic, partitionDataMap.map {
-        case (topicAndPartition, partitionData) => 
(topicAndPartition.partition, partitionData)
+        case (partitionId, partitionData) => (partitionId, partitionData)
       })
       folded + topicData.sizeInBytes
     }
@@ -198,7 +205,7 @@ object FetchResponse {
 }
 
 case class FetchResponse(correlationId: Int,
-                         data: Map[TopicAndPartition, 
FetchResponsePartitionData],
+                         data: Seq[(TopicAndPartition, 
FetchResponsePartitionData)],
                          requestVersion: Int = 0,
                          throttleTimeMs: Int = 0)
   extends RequestOrResponse() {
@@ -206,7 +213,8 @@ case class FetchResponse(correlationId: Int,
   /**
    * Partitions the data into a map of maps (one for each topic).
    */
-  lazy val dataGroupedByTopic = data.groupBy{ case (topicAndPartition, 
fetchData) => topicAndPartition.topic }
+  private lazy val dataByTopicAndPartition = data.toMap
+  lazy val dataGroupedByTopic = FetchResponse.batchByTopic(data)
   val headerSizeInBytes = FetchResponse.headerSize(requestVersion)
   lazy val sizeInBytes = FetchResponse.responseSize(dataGroupedByTopic, 
requestVersion)
 
@@ -234,7 +242,7 @@ case class FetchResponse(correlationId: Int,
 
   private def partitionDataFor(topic: String, partition: Int): 
FetchResponsePartitionData = {
     val topicAndPartition = TopicAndPartition(topic, partition)
-    data.get(topicAndPartition) match {
+    dataByTopicAndPartition.get(topicAndPartition) match {
       case Some(partitionData) => partitionData
       case _ =>
         throw new IllegalArgumentException(
@@ -247,7 +255,7 @@ case class FetchResponse(correlationId: Int,
 
   def highWatermark(topic: String, partition: Int) = partitionDataFor(topic, 
partition).hw
 
-  def hasError = data.values.exists(_.error != Errors.NONE.code)
+  def hasError = dataByTopicAndPartition.values.exists(_.error != 
Errors.NONE.code)
 
   def errorCode(topic: String, partition: Int) = partitionDataFor(topic, 
partition).error
 }
@@ -274,10 +282,9 @@ class FetchResponseSend(val dest: String, val fetchResponse: FetchResponse) exte
   fetchResponse.writeHeaderTo(buffer)
   buffer.rewind()
 
-  private val sends = new MultiSend(dest, 
JavaConversions.seqAsJavaList(fetchResponse.dataGroupedByTopic.toList.map {
-    case(topic, data) => new TopicDataSend(dest, TopicData(topic,
-                                                     
data.map{case(topicAndPartition, message) => (topicAndPartition.partition, 
message)}))
-    }))
+  private val sends = new MultiSend(dest, fetchResponse.dataGroupedByTopic.map 
{
+    case (topic, data) => new TopicDataSend(dest, TopicData(topic, data)): Send
+  }.asJava)
 
   override def writeTo(channel: GatheringByteChannel): Long = {
     if (completed)
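
A self-contained sketch of the grouping behaviour both requests and responses now rely on (`FetchResponse.batchByTopic` simply delegates to `FetchRequest.batchByTopic`): adjacent entries for the same topic are batched while the overall ordering is preserved, which the previous `Map`/`groupBy` representation could not guarantee. The `TopicAndPartition` case class below is a stand-in for `kafka.common.TopicAndPartition` so the snippet runs on its own.

    import scala.collection.mutable.ArrayBuffer

    case class TopicAndPartition(topic: String, partition: Int)  // stand-in for kafka.common.TopicAndPartition

    def batchByTopic[T](s: Seq[(TopicAndPartition, T)]): Seq[(String, Seq[(Int, T)])] = {
      val result = new ArrayBuffer[(String, ArrayBuffer[(Int, T)])]
      s.foreach { case (TopicAndPartition(t, p), value) =>
        if (result.isEmpty || result.last._1 != t)
          result += (t -> new ArrayBuffer)   // start a new batch whenever the topic changes
        result.last._2 += (p -> value)
      }
      result
    }

    val ordered = Seq(
      TopicAndPartition("b", 1) -> "x",
      TopicAndPartition("b", 0) -> "y",
      TopicAndPartition("a", 2) -> "z")

    // prints roughly: ArrayBuffer((b,ArrayBuffer((1,x), (0,y))), (a,ArrayBuffer((2,z))))
    println(batchByTopic(ordered))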

http://git-wip-us.apache.org/repos/asf/kafka/blob/d04b0998/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
index e1c792d..fd8983c 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
@@ -38,6 +38,7 @@ object ConsumerConfig extends Config {
   val AutoOffsetReset = OffsetRequest.LargestTimeString
   val ConsumerTimeoutMs = -1
   val MinFetchBytes = 1
+  val MaxFetchBytes = 50 * 1024 * 1024
   val MaxFetchWaitMs = 100
   val MirrorTopicsWhitelist = ""
   val MirrorTopicsBlacklist = ""
@@ -119,7 +120,7 @@ class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig(
   /** the socket receive buffer for network requests */
   val socketReceiveBufferBytes = props.getInt("socket.receive.buffer.bytes", 
SocketBufferSize)
   
-  /** the number of bytes of messages to attempt to fetch */
+  /** the number of bytes of messages to attempt to fetch from each partition 
*/
   val fetchMessageMaxBytes = props.getInt("fetch.message.max.bytes", FetchSize)
 
   /** the number threads used to fetch data */
@@ -140,6 +141,9 @@ class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig(
   /** the minimum amount of data the server should return for a fetch request. 
If insufficient data is available the request will block */
   val fetchMinBytes = props.getInt("fetch.min.bytes", MinFetchBytes)
   
+  /** the maximum amount of data the server should return for a fetch request 
*/
+  val fetchMaxBytes = props.getInt("fetch.max.bytes", MaxFetchBytes)
+
   /** the maximum amount of time the server will block before answering the 
fetch request if there isn't sufficient data to immediately satisfy 
fetch.min.bytes */
   val fetchWaitMaxMs = props.getInt("fetch.wait.max.ms", MaxFetchWaitMs)
   require(fetchWaitMaxMs <= socketTimeoutMs, "socket.timeout.ms should always 
be at least fetch.wait.max.ms" +
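
A configuration sketch for the old high-level consumer showing the settings touched above; the property keys are the ones defined in this file, while the surrounding keys and all values are illustrative (with `fetch.max.bytes` at its 50 MB default).

    import java.util.Properties

    val props = new Properties()
    props.put("zookeeper.connect", "localhost:2181")
    props.put("group.id", "example-group")
    props.put("fetch.min.bytes", "1")                              // lower bound per fetch request
    props.put("fetch.wait.max.ms", "100")                          // must stay <= socket.timeout.ms
    props.put("fetch.message.max.bytes", (1024 * 1024).toString)   // limit per partition
    props.put("fetch.max.bytes", (50 * 1024 * 1024).toString)      // new: limit for the whole response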

http://git-wip-us.apache.org/repos/asf/kafka/blob/d04b0998/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
index e73faf2..5b5fe0d 100755
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
@@ -17,10 +17,10 @@
 
 package kafka.consumer
 
-import org.I0Itec.zkclient.ZkClient
 import kafka.server.{BrokerAndInitialOffset, AbstractFetcherThread, 
AbstractFetcherManager}
 import kafka.cluster.{BrokerEndPoint, Cluster}
 import org.apache.kafka.common.protocol.SecurityProtocol
+import org.apache.kafka.common.TopicPartition
 import scala.collection.immutable
 import collection.mutable.HashMap
 import scala.collection.mutable
@@ -28,7 +28,6 @@ import java.util.concurrent.locks.ReentrantLock
 import kafka.utils.CoreUtils.inLock
 import kafka.utils.ZkUtils
 import kafka.utils.{ShutdownableThread, SystemTime}
-import kafka.common.TopicAndPartition
 import kafka.client.ClientUtils
 import java.util.concurrent.atomic.AtomicInteger
 
@@ -42,9 +41,9 @@ class ConsumerFetcherManager(private val consumerIdString: String,
                              private val zkUtils : ZkUtils)
         extends 
AbstractFetcherManager("ConsumerFetcherManager-%d".format(SystemTime.milliseconds),
                                        config.clientId, 
config.numConsumerFetchers) {
-  private var partitionMap: immutable.Map[TopicAndPartition, 
PartitionTopicInfo] = null
+  private var partitionMap: immutable.Map[TopicPartition, PartitionTopicInfo] 
= null
   private var cluster: Cluster = null
-  private val noLeaderPartitionSet = new mutable.HashSet[TopicAndPartition]
+  private val noLeaderPartitionSet = new mutable.HashSet[TopicPartition]
   private val lock = new ReentrantLock
   private val cond = lock.newCondition()
   private var leaderFinderThread: ShutdownableThread = null
@@ -53,7 +52,7 @@ class ConsumerFetcherManager(private val consumerIdString: String,
   private class LeaderFinderThread(name: String) extends 
ShutdownableThread(name) {
     // thread responsible for adding the fetcher to the right broker when 
leader is available
     override def doWork() {
-      val leaderForPartitionsMap = new HashMap[TopicAndPartition, 
BrokerEndPoint]
+      val leaderForPartitionsMap = new HashMap[TopicPartition, BrokerEndPoint]
       lock.lock()
       try {
         while (noLeaderPartitionSet.isEmpty) {
@@ -72,7 +71,7 @@ class ConsumerFetcherManager(private val consumerIdString: String,
         topicsMetadata.foreach { tmd =>
           val topic = tmd.topic
           tmd.partitionsMetadata.foreach { pmd =>
-            val topicAndPartition = TopicAndPartition(topic, pmd.partitionId)
+            val topicAndPartition = new TopicPartition(topic, pmd.partitionId)
             if(pmd.leader.isDefined && 
noLeaderPartitionSet.contains(topicAndPartition)) {
               val leaderBroker = pmd.leader.get
               leaderForPartitionsMap.put(topicAndPartition, leaderBroker)
@@ -92,9 +91,8 @@ class ConsumerFetcherManager(private val consumerIdString: String,
       }
 
       try {
-        addFetcherForPartitions(leaderForPartitionsMap.map{
-          case (topicAndPartition, broker) =>
-            topicAndPartition -> BrokerAndInitialOffset(broker, 
partitionMap(topicAndPartition).getFetchOffset())}
+        addFetcherForPartitions(leaderForPartitionsMap.map { case 
(topicPartition, broker) =>
+          topicPartition -> BrokerAndInitialOffset(broker, 
partitionMap(topicPartition).getFetchOffset())}
         )
       } catch {
         case t: Throwable => {
@@ -125,9 +123,9 @@ class ConsumerFetcherManager(private val consumerIdString: String,
     leaderFinderThread.start()
 
     inLock(lock) {
-      partitionMap = topicInfos.map(tpi => (TopicAndPartition(tpi.topic, 
tpi.partitionId), tpi)).toMap
+      partitionMap = topicInfos.map(tpi => (new TopicPartition(tpi.topic, 
tpi.partitionId), tpi)).toMap
       this.cluster = cluster
-      noLeaderPartitionSet ++= topicInfos.map(tpi => 
TopicAndPartition(tpi.topic, tpi.partitionId))
+      noLeaderPartitionSet ++= topicInfos.map(tpi => new 
TopicPartition(tpi.topic, tpi.partitionId))
       cond.signalAll()
     }
   }
@@ -154,7 +152,7 @@ class ConsumerFetcherManager(private val consumerIdString: String,
     info("All connections stopped")
   }
 
-  def addPartitionsWithError(partitionList: Iterable[TopicAndPartition]) {
+  def addPartitionsWithError(partitionList: Iterable[TopicPartition]) {
     debug("adding partitions with error %s".format(partitionList))
     inLock(lock) {
       if (partitionMap != null) {
@@ -163,4 +161,4 @@ class ConsumerFetcherManager(private val consumerIdString: String,
       }
     }
   }
-}
\ No newline at end of file
+}
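
The change above is mostly a mechanical swap of the key type, from core's `kafka.common.TopicAndPartition` to the Java client's `org.apache.kafka.common.TopicPartition`. Where the two worlds still meet (for example in `handleOffsetOutOfRange` in the fetcher thread), the conversion looks like the following sketch; the helper names are invented for illustration.

    import kafka.common.TopicAndPartition
    import org.apache.kafka.common.TopicPartition

    def asTopicPartition(tap: TopicAndPartition): TopicPartition =
      new TopicPartition(tap.topic, tap.partition)

    def asTopicAndPartition(tp: TopicPartition): TopicAndPartition =
      TopicAndPartition(tp.topic, tp.partition)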

http://git-wip-us.apache.org/repos/asf/kafka/blob/d04b0998/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
index c33122b..c47efb7 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
@@ -24,11 +24,12 @@ import kafka.server.{PartitionFetchState, AbstractFetcherThread}
 import kafka.common.{ErrorMapping, TopicAndPartition}
 import scala.collection.Map
 import ConsumerFetcherThread._
+import org.apache.kafka.common.TopicPartition
 
 class ConsumerFetcherThread(name: String,
                             val config: ConsumerConfig,
                             sourceBroker: BrokerEndPoint,
-                            partitionMap: Map[TopicAndPartition, 
PartitionTopicInfo],
+                            partitionMap: Map[TopicPartition, 
PartitionTopicInfo],
                             val consumerFetcherManager: ConsumerFetcherManager)
         extends AbstractFetcherThread(name = name,
                                       clientId = config.clientId,
@@ -65,55 +66,59 @@ class ConsumerFetcherThread(name: String,
   }
 
   // process fetched data
-  def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: 
Long, partitionData: PartitionData) {
-    val pti = partitionMap(topicAndPartition)
+  def processPartitionData(topicPartition: TopicPartition, fetchOffset: Long, 
partitionData: PartitionData) {
+    val pti = partitionMap(topicPartition)
     if (pti.getFetchOffset != fetchOffset)
       throw new RuntimeException("Offset doesn't match for partition [%s,%d] 
pti offset: %d fetch offset: %d"
-                                .format(topicAndPartition.topic, 
topicAndPartition.partition, pti.getFetchOffset, fetchOffset))
+                                .format(topicPartition.topic, 
topicPartition.partition, pti.getFetchOffset, fetchOffset))
     
pti.enqueue(partitionData.underlying.messages.asInstanceOf[ByteBufferMessageSet])
   }
 
   // handle a partition whose offset is out of range and return a new fetch 
offset
-  def handleOffsetOutOfRange(topicAndPartition: TopicAndPartition): Long = {
+  def handleOffsetOutOfRange(topicPartition: TopicPartition): Long = {
     val startTimestamp = config.autoOffsetReset match {
       case OffsetRequest.SmallestTimeString => OffsetRequest.EarliestTime
       case OffsetRequest.LargestTimeString => OffsetRequest.LatestTime
       case _ => OffsetRequest.LatestTime
     }
+    val topicAndPartition = new TopicAndPartition(topicPartition.topic, 
topicPartition.partition)
     val newOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, 
startTimestamp, Request.OrdinaryConsumerId)
-    val pti = partitionMap(topicAndPartition)
+    val pti = partitionMap(topicPartition)
     pti.resetFetchOffset(newOffset)
     pti.resetConsumeOffset(newOffset)
     newOffset
   }
 
   // any logic for partitions whose leader has changed
-  def handlePartitionsWithErrors(partitions: Iterable[TopicAndPartition]) {
+  def handlePartitionsWithErrors(partitions: Iterable[TopicPartition]) {
     removePartitions(partitions.toSet)
     consumerFetcherManager.addPartitionsWithError(partitions)
   }
 
-  protected def buildFetchRequest(partitionMap: 
collection.Map[TopicAndPartition, PartitionFetchState]): FetchRequest = {
-    partitionMap.foreach { case ((topicAndPartition, partitionFetchState)) =>
+  protected def buildFetchRequest(partitionMap: 
collection.Seq[(TopicPartition, PartitionFetchState)]): FetchRequest = {
+    partitionMap.foreach { case ((topicPartition, partitionFetchState)) =>
       if (partitionFetchState.isActive)
-        fetchRequestBuilder.addFetch(topicAndPartition.topic, 
topicAndPartition.partition, partitionFetchState.offset,
+        fetchRequestBuilder.addFetch(topicPartition.topic, 
topicPartition.partition, partitionFetchState.offset,
           fetchSize)
     }
 
     new FetchRequest(fetchRequestBuilder.build())
   }
 
-  protected def fetch(fetchRequest: FetchRequest): 
collection.Map[TopicAndPartition, PartitionData] =
-    simpleConsumer.fetch(fetchRequest.underlying).data.map { case (key, value) 
=>
-      key -> new PartitionData(value)
+  protected def fetch(fetchRequest: FetchRequest): Seq[(TopicPartition, 
PartitionData)] =
+    simpleConsumer.fetch(fetchRequest.underlying).data.map { case 
(TopicAndPartition(t, p), value) =>
+      new TopicPartition(t, p) -> new PartitionData(value)
     }
 }
 
 object ConsumerFetcherThread {
 
   class FetchRequest(val underlying: kafka.api.FetchRequest) extends 
AbstractFetcherThread.FetchRequest {
+    private lazy val tpToOffset: Map[TopicPartition, Long] = 
underlying.requestInfo.map { case (tp, fetchInfo) =>
+      new TopicPartition(tp.topic, tp.partition) -> fetchInfo.offset
+    }.toMap
     def isEmpty: Boolean = underlying.requestInfo.isEmpty
-    def offset(topicAndPartition: TopicAndPartition): Long = 
underlying.requestInfo(topicAndPartition).offset
+    def offset(topicPartition: TopicPartition): Long = 
tpToOffset(topicPartition)
   }
 
   class PartitionData(val underlying: FetchResponsePartitionData) extends 
AbstractFetcherThread.PartitionData {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d04b0998/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
index cf8ae91..1dc2a49 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
@@ -410,7 +410,7 @@ class GroupMetadataManager(val brokerId: Int,
 
             while (currOffset < getHighWatermark(offsetsPartition) && 
!shuttingDown.get()) {
               buffer.clear()
-              val messages = log.read(currOffset, 
config.loadBufferSize).messageSet.asInstanceOf[FileMessageSet]
+              val messages = log.read(currOffset, config.loadBufferSize, 
minOneMessage = true).messageSet.asInstanceOf[FileMessageSet]
               messages.readInto(buffer, 0)
               val messageSet = new ByteBufferMessageSet(buffer)
               messageSet.foreach { msgAndOffset =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/d04b0998/core/src/main/scala/kafka/javaapi/FetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/FetchRequest.scala b/core/src/main/scala/kafka/javaapi/FetchRequest.scala
index 4060077..fb9fa8e 100644
--- a/core/src/main/scala/kafka/javaapi/FetchRequest.scala
+++ b/core/src/main/scala/kafka/javaapi/FetchRequest.scala
@@ -17,32 +17,44 @@
 
 package kafka.javaapi
 
-import java.nio.ByteBuffer
+import java.util
+
 import kafka.common.TopicAndPartition
-import kafka.api.{Request, PartitionFetchInfo}
-import scala.collection.mutable
+import kafka.api.{PartitionFetchInfo, Request}
+
+import scala.collection.JavaConverters._
+
+object FetchRequest {
+  private def seqToLinkedHashMap[K, V](s: Seq[(K, V)]): util.LinkedHashMap[K, 
V] = {
+    val map = new util.LinkedHashMap[K, V]
+    s.foreach { case (k, v) => map.put(k, v) }
+    map
+  }
+}
 
 class FetchRequest(correlationId: Int,
                    clientId: String,
                    maxWait: Int,
                    minBytes: Int,
-                   requestInfo: java.util.Map[TopicAndPartition, 
PartitionFetchInfo]) {
-
-  val underlying = {
-    val scalaMap: Map[TopicAndPartition, PartitionFetchInfo] = {
-      import scala.collection.JavaConversions._
-      (requestInfo: mutable.Map[TopicAndPartition, PartitionFetchInfo]).toMap
-    }
-    kafka.api.FetchRequest(
-      correlationId = correlationId,
-      clientId = clientId,
-      replicaId = Request.OrdinaryConsumerId,
-      maxWait = maxWait,
-      minBytes = minBytes,
-      requestInfo = scalaMap
-    )
+                   requestInfo: util.LinkedHashMap[TopicAndPartition, 
PartitionFetchInfo]) {
+
+  @deprecated("The order of partitions in `requestInfo` is relevant, so this 
constructor is deprecated in favour of the " +
+    "one that takes a LinkedHashMap", since = "0.10.1.0")
+  def this(correlationId: Int, clientId: String, maxWait: Int, minBytes: Int,
+    requestInfo: java.util.Map[TopicAndPartition, PartitionFetchInfo]) {
+    this(correlationId, clientId, maxWait, minBytes,
+      
FetchRequest.seqToLinkedHashMap(kafka.api.FetchRequest.shuffle(requestInfo.asScala.toSeq)))
   }
 
+  val underlying = kafka.api.FetchRequest(
+    correlationId = correlationId,
+    clientId = clientId,
+    replicaId = Request.OrdinaryConsumerId,
+    maxWait = maxWait,
+    minBytes = minBytes,
+    requestInfo = requestInfo.asScala.toBuffer
+  )
+
   override def toString = underlying.toString
 
   override def equals(other: Any) = canEqual(other) && {
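
A usage sketch of the constructor kept above: passing a `LinkedHashMap` preserves the caller's partition ordering end to end, while the deprecated `java.util.Map` constructor shuffles the entries so that no fixed iteration order keeps the same partitions at the back of every request. Topic names, offsets and sizes are illustrative.

    import java.util
    import kafka.api.PartitionFetchInfo
    import kafka.common.TopicAndPartition

    val requestInfo = new util.LinkedHashMap[TopicAndPartition, PartitionFetchInfo]
    requestInfo.put(TopicAndPartition("my-topic", 0), PartitionFetchInfo(0L, 1024 * 1024))
    requestInfo.put(TopicAndPartition("my-topic", 1), PartitionFetchInfo(0L, 1024 * 1024))

    val request = new kafka.javaapi.FetchRequest(
      1,                 // correlationId
      "example-client",  // clientId
      500,               // maxWait in ms
      1,                 // minBytes
      requestInfo)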

http://git-wip-us.apache.org/repos/asf/kafka/blob/d04b0998/core/src/main/scala/kafka/log/FileMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala
index 5763042..c76653a 100755
--- a/core/src/main/scala/kafka/log/FileMessageSet.scala
+++ b/core/src/main/scala/kafka/log/FileMessageSet.scala
@@ -128,11 +128,13 @@ class FileMessageSet private[kafka](@volatile var file: File,
 
   /**
    * Search forward for the file position of the last offset that is greater 
than or equal to the target offset
-   * and return its physical position. If no such offsets are found, return 
null.
+   * and return its physical position and the size of the message (including 
log overhead) at the returned offset. If
+   * no such offsets are found, return null.
+   *
    * @param targetOffset The offset to search for.
    * @param startingPosition The starting position in the file to begin 
searching from.
    */
-  def searchForOffset(targetOffset: Long, startingPosition: Int): 
OffsetPosition = {
+  def searchForOffsetWithSize(targetOffset: Long, startingPosition: Int): 
(OffsetPosition, Int) = {
     var position = startingPosition
     val buffer = ByteBuffer.allocate(MessageSet.LogOverhead)
     val size = sizeInBytes()
@@ -144,11 +146,11 @@ class FileMessageSet private[kafka](@volatile var file: File,
           .format(targetOffset, startingPosition, file.getAbsolutePath))
       buffer.rewind()
       val offset = buffer.getLong()
-      if(offset >= targetOffset)
-        return OffsetPosition(offset, position)
       val messageSize = buffer.getInt()
-      if(messageSize < Message.MinMessageOverhead)
+      if (messageSize < Message.MinMessageOverhead)
         throw new IllegalStateException("Invalid message size: " + messageSize)
+      if (offset >= targetOffset)
+        return (OffsetPosition(offset, position), messageSize + 
MessageSet.LogOverhead)
       position += MessageSet.LogOverhead + messageSize
     }
     null
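
The second element of the returned pair is the full on-disk size of the entry at the located offset: the message size read from the size field plus `MessageSet.LogOverhead` (the 8-byte offset field plus the 4-byte size field). A worked example with an illustrative message size:

    val offsetFieldBytes = 8                                  // Long offset stored before every entry
    val sizeFieldBytes   = 4                                  // Int size field
    val logOverhead      = offsetFieldBytes + sizeFieldBytes  // MessageSet.LogOverhead == 12

    val messageSize      = 1000                               // illustrative value read from the size field
    val entrySizeOnDisk  = messageSize + logOverhead          // 1012: the Int returned alongside OffsetPosition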

http://git-wip-us.apache.org/repos/asf/kafka/blob/d04b0998/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 894beab..cfd0472 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -517,11 +517,12 @@ class Log(val dir: File,
    * @param startOffset The offset to begin reading at
    * @param maxLength The maximum number of bytes to read
    * @param maxOffset The offset to read up to, exclusive. (i.e. this offset 
NOT included in the resulting message set)
+   * @param minOneMessage If this is true, the first message will be returned 
even if it exceeds `maxLength` (if one exists)
    *
    * @throws OffsetOutOfRangeException If startOffset is beyond the log end 
offset or before the base offset of the first segment.
    * @return The fetch data information including fetch starting offset 
metadata and messages read.
    */
-  def read(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None): 
FetchDataInfo = {
+  def read(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None, 
minOneMessage: Boolean = false): FetchDataInfo = {
     trace("Reading %d bytes from offset %d in log %s of length %d 
bytes".format(maxLength, startOffset, name, size))
 
     // Because we don't use lock for reading, the synchronization is a little 
bit tricky.
@@ -558,7 +559,7 @@ class Log(val dir: File,
           entry.getValue.size
         }
       }
-      val fetchInfo = entry.getValue.read(startOffset, maxOffset, maxLength, 
maxPosition)
+      val fetchInfo = entry.getValue.read(startOffset, maxOffset, maxLength, 
maxPosition, minOneMessage)
       if(fetchInfo == null) {
         entry = segments.higherEntry(entry.getKey)
       } else {
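
A call-site sketch of the new flag, using the `read` signature shown above (the wrapping function and the 64 KB limit are assumptions for illustration): with `minOneMessage = true` the read returns at least the first message at `startOffset` even when it is larger than `maxLength`, which is what lets callers such as the offsets-loading loop in `GroupMetadataManager` make progress past an oversized message.

    def readAtLeastOneMessage(log: kafka.log.Log, offset: Long): kafka.server.FetchDataInfo =
      log.read(startOffset = offset, maxLength = 64 * 1024, minOneMessage = true)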

http://git-wip-us.apache.org/repos/asf/kafka/blob/d04b0998/core/src/main/scala/kafka/log/LogSegment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index ccc2472..0eb4330 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -119,12 +119,13 @@ class LogSegment(val log: FileMessageSet,
    * @param offset The offset we want to translate
    * @param startingFilePosition A lower bound on the file position from which 
to begin the search. This is purely an optimization and
    * when omitted, the search will begin at the position in the offset index.
-   * @return The position in the log storing the message with the least offset 
>= the requested offset or null if no message meets this criteria.
+   * @return The position in the log storing the message with the least offset 
>= the requested offset and the size of the
+    *        message or null if no message meets this criteria.
    */
   @threadsafe
-  private[log] def translateOffset(offset: Long, startingFilePosition: Int = 
0): OffsetPosition = {
+  private[log] def translateOffset(offset: Long, startingFilePosition: Int = 
0): (OffsetPosition, Int) = {
     val mapping = index.lookup(offset)
-    log.searchForOffset(offset, max(mapping.position, startingFilePosition))
+    log.searchForOffsetWithSize(offset, max(mapping.position, 
startingFilePosition))
   }
 
   /**
@@ -135,50 +136,58 @@ class LogSegment(val log: FileMessageSet,
    * @param maxSize The maximum number of bytes to include in the message set 
we read
    * @param maxOffset An optional maximum offset for the message set we read
    * @param maxPosition The maximum position in the log segment that should be 
exposed for read
+   * @param minOneMessage If this is true, the first message will be returned 
even if it exceeds `maxSize` (if one exists)
    *
    * @return The fetched data and the offset metadata of the first message 
whose offset is >= startOffset,
    *         or null if the startOffset is larger than the largest offset in 
this log
    */
   @threadsafe
-  def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int, 
maxPosition: Long = size): FetchDataInfo = {
-    if(maxSize < 0)
+  def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int, 
maxPosition: Long = size,
+           minOneMessage: Boolean = false): FetchDataInfo = {
+    if (maxSize < 0)
       throw new IllegalArgumentException("Invalid max size for log read 
(%d)".format(maxSize))
 
     val logSize = log.sizeInBytes // this may change, need to save a 
consistent copy
-    val startPosition = translateOffset(startOffset)
+    val startOffsetAndSize = translateOffset(startOffset)
 
     // if the start position is already off the end of the log, return null
-    if(startPosition == null)
+    if (startOffsetAndSize == null)
       return null
 
+    val (startPosition, messageSetSize) = startOffsetAndSize
     val offsetMetadata = new LogOffsetMetadata(startOffset, this.baseOffset, 
startPosition.position)
 
-    // if the size is zero, still return a log segment but with zero size
-    if(maxSize == 0)
+    val adjustedMaxSize =
+      if (minOneMessage) math.max(maxSize, messageSetSize)
+      else maxSize
+
+    // return a log segment but with zero size in the case below
+    if (adjustedMaxSize == 0)
       return FetchDataInfo(offsetMetadata, MessageSet.Empty)
 
     // calculate the length of the message set to read based on whether or not 
they gave us a maxOffset
     val length = maxOffset match {
       case None =>
         // no max offset, just read until the max position
-        min((maxPosition - startPosition.position).toInt, maxSize)
+        min((maxPosition - startPosition.position).toInt, adjustedMaxSize)
       case Some(offset) =>
         // there is a max offset, translate it to a file position and use that 
to calculate the max read size;
         // when the leader of a partition changes, it's possible for the new 
leader's high watermark to be less than the
         // true high watermark in the previous leader for a short window. In 
this window, if a consumer fetches on an
         // offset between new leader's high watermark and the log end offset, 
we want to return an empty response.
-        if(offset < startOffset)
-          return FetchDataInfo(offsetMetadata, MessageSet.Empty)
+        if (offset < startOffset)
+          return FetchDataInfo(offsetMetadata, MessageSet.Empty, 
firstMessageSetIncomplete = false)
         val mapping = translateOffset(offset, startPosition.position)
         val endPosition =
-          if(mapping == null)
+          if (mapping == null)
             logSize // the max offset is off the end of the log, use the end 
of the file
           else
-            mapping.position
-        min(min(maxPosition, endPosition) - startPosition.position, 
maxSize).toInt
+            mapping._1.position
+        min(min(maxPosition, endPosition) - startPosition.position, 
adjustedMaxSize).toInt
     }
 
-    FetchDataInfo(offsetMetadata, log.read(startPosition.position, length))
+    FetchDataInfo(offsetMetadata, log.read(startPosition.position, length),
+      firstMessageSetIncomplete = adjustedMaxSize < messageSetSize)
   }
 
   /**
@@ -260,14 +269,15 @@ class LogSegment(val log: FileMessageSet,
   @nonthreadsafe
   def truncateTo(offset: Long): Int = {
     val mapping = translateOffset(offset)
-    if(mapping == null)
+    if (mapping == null)
       return 0
     index.truncateTo(offset)
     timeIndex.truncateTo(offset)
     // after truncation, reset and allocate more space for the (new currently  
active) index
     index.resize(index.maxIndexSize)
     timeIndex.resize(timeIndex.maxIndexSize)
-    val bytesTruncated = log.truncateTo(mapping.position)
+    val (offsetPosition, _) = mapping
+    val bytesTruncated = log.truncateTo(offsetPosition.position)
     if(log.sizeInBytes == 0) {
       created = time.milliseconds
       rollingBasedTimestamp = None
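
The heart of this change is the size adjustment: when `minOneMessage` is set, the effective read size is raised to at least the size of the first message set at the start offset, so one oversized message can no longer cause an empty result on every retry. A standalone sketch of that rule (the function name is illustrative):

    def effectiveMaxSize(maxSize: Int, firstEntrySize: Int, minOneMessage: Boolean): Int =
      if (minOneMessage) math.max(maxSize, firstEntrySize) else maxSize

    assert(effectiveMaxSize(maxSize = 4096, firstEntrySize = 10000, minOneMessage = true)  == 10000)
    assert(effectiveMaxSize(maxSize = 4096, firstEntrySize = 10000, minOneMessage = false) == 4096)
    assert(effectiveMaxSize(maxSize = 4096, firstEntrySize = 100,   minOneMessage = true)  == 4096)

The `firstMessageSetIncomplete` flag passed to `FetchDataInfo` records the remaining case where the effective limit is still smaller than that first entry, which can only happen when `minOneMessage` is false.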

http://git-wip-us.apache.org/repos/asf/kafka/blob/d04b0998/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
index ec40516..8f6b84f 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
@@ -23,8 +23,8 @@ import scala.collection.Map
 import kafka.utils.Logging
 import kafka.cluster.BrokerEndPoint
 import kafka.metrics.KafkaMetricsGroup
-import kafka.common.TopicAndPartition
 import com.yammer.metrics.core.Gauge
+import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.utils.Utils
 
 abstract class AbstractFetcherManager(protected val name: String, clientId: 
String, numFetchers: Int = 1)
@@ -71,7 +71,7 @@ abstract class AbstractFetcherManager(protected val name: String, clientId: Stri
   // to be defined in subclass to create a specific fetcher
   def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): 
AbstractFetcherThread
 
-  def addFetcherForPartitions(partitionAndOffsets: Map[TopicAndPartition, 
BrokerAndInitialOffset]) {
+  def addFetcherForPartitions(partitionAndOffsets: Map[TopicPartition, 
BrokerAndInitialOffset]) {
     mapLock synchronized {
       val partitionsPerFetcher = partitionAndOffsets.groupBy{ 
case(topicAndPartition, brokerAndInitialOffset) =>
         BrokerAndFetcherId(brokerAndInitialOffset.broker, 
getFetcherId(topicAndPartition.topic, topicAndPartition.partition))}
@@ -85,8 +85,8 @@ abstract class AbstractFetcherManager(protected val name: String, clientId: Stri
             fetcherThread.start
         }
 
-        
fetcherThreadMap(brokerAndFetcherId).addPartitions(partitionAndOffsets.map { 
case (topicAndPartition, brokerAndInitOffset) =>
-          topicAndPartition -> brokerAndInitOffset.initOffset
+        
fetcherThreadMap(brokerAndFetcherId).addPartitions(partitionAndOffsets.map { 
case (tp, brokerAndInitOffset) =>
+          tp -> brokerAndInitOffset.initOffset
         })
       }
     }
@@ -95,11 +95,10 @@ abstract class AbstractFetcherManager(protected val name: String, clientId: Stri
       "[" + topicAndPartition + ", initOffset " + 
brokerAndInitialOffset.initOffset + " to broker " + 
brokerAndInitialOffset.broker + "] "}))
   }
 
-  def removeFetcherForPartitions(partitions: Set[TopicAndPartition]) {
+  def removeFetcherForPartitions(partitions: Set[TopicPartition]) {
     mapLock synchronized {
-      for ((key, fetcher) <- fetcherThreadMap) {
+      for ((key, fetcher) <- fetcherThreadMap)
         fetcher.removePartitions(partitions)
-      }
     }
     info("Removed fetcher for partitions %s".format(partitions.mkString(",")))
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d04b0998/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 36baf1f..2f2cb4b 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -21,19 +21,23 @@ import java.util.concurrent.locks.ReentrantLock
 
 import kafka.cluster.BrokerEndPoint
 import kafka.consumer.PartitionTopicInfo
-import kafka.message.{MessageAndOffset, ByteBufferMessageSet}
-import kafka.utils.{Pool, ShutdownableThread, DelayedItem}
-import kafka.common.{KafkaException, ClientIdAndBroker, TopicAndPartition}
+import kafka.message.ByteBufferMessageSet
+import kafka.utils.{DelayedItem, Pool, ShutdownableThread}
+import kafka.common.{ClientIdAndBroker, KafkaException}
 import kafka.metrics.KafkaMetricsGroup
 import kafka.utils.CoreUtils.inLock
 import org.apache.kafka.common.errors.CorruptRecordException
 import org.apache.kafka.common.protocol.Errors
 import AbstractFetcherThread._
-import scala.collection.{mutable, Set, Map}
+
+import scala.collection.{Map, Set, mutable}
+import scala.collection.JavaConverters._
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicLong
 
 import com.yammer.metrics.core.Gauge
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.internals.PartitionStates
 
 /**
  *  Abstract class for fetching data from multiple partitions from the same 
broker.
@@ -48,7 +52,7 @@ abstract class AbstractFetcherThread(name: String,
   type REQ <: FetchRequest
   type PD <: PartitionData
 
-  private val partitionMap = new mutable.HashMap[TopicAndPartition, 
PartitionFetchState] // a (topic, partition) -> partitionFetchState map
+  private val partitionStates = new PartitionStates[PartitionFetchState]
   private val partitionMapLock = new ReentrantLock
   private val partitionMapCond = partitionMapLock.newCondition()
 
@@ -59,17 +63,17 @@ abstract class AbstractFetcherThread(name: String,
   /* callbacks to be defined in subclass */
 
   // process fetched data
-  def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: 
Long, partitionData: PD)
+  def processPartitionData(topicPartition: TopicPartition, fetchOffset: Long, 
partitionData: PD)
 
   // handle a partition whose offset is out of range and return a new fetch 
offset
-  def handleOffsetOutOfRange(topicAndPartition: TopicAndPartition): Long
+  def handleOffsetOutOfRange(topicPartition: TopicPartition): Long
 
   // deal with partitions with errors, potentially due to leadership changes
-  def handlePartitionsWithErrors(partitions: Iterable[TopicAndPartition])
+  def handlePartitionsWithErrors(partitions: Iterable[TopicPartition])
 
-  protected def buildFetchRequest(partitionMap: Map[TopicAndPartition, 
PartitionFetchState]): REQ
+  protected def buildFetchRequest(partitionMap: Seq[(TopicPartition, 
PartitionFetchState)]): REQ
 
-  protected def fetch(fetchRequest: REQ): Map[TopicAndPartition, PD]
+  protected def fetch(fetchRequest: REQ): Seq[(TopicPartition, PD)]
 
   override def shutdown(){
     initiateShutdown()
@@ -86,7 +90,9 @@ abstract class AbstractFetcherThread(name: String,
   override def doWork() {
 
     val fetchRequest = inLock(partitionMapLock) {
-      val fetchRequest = buildFetchRequest(partitionMap)
+      val fetchRequest = 
buildFetchRequest(partitionStates.partitionStates.asScala.map { state =>
+        state.topicPartition -> state.value
+      })
       if (fetchRequest.isEmpty) {
         trace("There are no active partitions. Back off for %d ms before 
sending a fetch request".format(fetchBackOffMs))
         partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
@@ -98,8 +104,14 @@ abstract class AbstractFetcherThread(name: String,
   }
 
   private def processFetchRequest(fetchRequest: REQ) {
-    val partitionsWithError = new mutable.HashSet[TopicAndPartition]
-    var responseData: Map[TopicAndPartition, PD] = Map.empty
+    val partitionsWithError = mutable.Set[TopicPartition]()
+
+    def updatePartitionsWithError(partition: TopicPartition): Unit = {
+      partitionsWithError += partition
+      partitionStates.moveToEnd(partition)
+    }
+
+    var responseData: Seq[(TopicPartition, PD)] = Seq.empty
 
     try {
       trace("Issuing to broker %d of fetch request %s".format(sourceBroker.id, 
fetchRequest))
@@ -109,8 +121,10 @@ abstract class AbstractFetcherThread(name: String,
         if (isRunning.get) {
           warn(s"Error in fetch $fetchRequest", t)
           inLock(partitionMapLock) {
-            partitionsWithError ++= partitionMap.keys
+            
partitionStates.partitionSet.asScala.foreach(updatePartitionsWithError)
             // there is an error occurred while fetching partitions, sleep a 
while
+            // note that `ReplicaFetcherThread.handlePartitionsWithError` will 
also introduce the same delay for every
+            // partition with error effectively doubling the delay. It would 
be good to improve this.
             partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
           }
         }
@@ -121,25 +135,28 @@ abstract class AbstractFetcherThread(name: String,
       // process fetched data
       inLock(partitionMapLock) {
 
-        responseData.foreach { case (topicAndPartition, partitionData) =>
-          val TopicAndPartition(topic, partitionId) = topicAndPartition
-          
partitionMap.get(topicAndPartition).foreach(currentPartitionFetchState =>
+        responseData.foreach { case (topicPartition, partitionData) =>
+          val topic = topicPartition.topic
+          val partitionId = topicPartition.partition
+          
Option(partitionStates.stateValue(topicPartition)).foreach(currentPartitionFetchState
 =>
             // we append to the log if the current offset is defined and it is 
the same as the offset requested during fetch
-            if (fetchRequest.offset(topicAndPartition) == 
currentPartitionFetchState.offset) {
+            if (fetchRequest.offset(topicPartition) == 
currentPartitionFetchState.offset) {
               Errors.forCode(partitionData.errorCode) match {
                 case Errors.NONE =>
                   try {
                     val messages = partitionData.toByteBufferMessageSet
-                    val validBytes = messages.validBytes
                     val newOffset = messages.shallowIterator.toSeq.lastOption 
match {
-                      case Some(m: MessageAndOffset) => m.nextOffset
-                      case None => currentPartitionFetchState.offset
+                      case Some(m) =>
+                        partitionStates.updateAndMoveToEnd(topicPartition, new 
PartitionFetchState(m.nextOffset))
+                        fetcherStats.byteRate.mark(messages.validBytes)
+                        m.nextOffset
+                      case None =>
+                        currentPartitionFetchState.offset
                     }
-                    partitionMap.put(topicAndPartition, new 
PartitionFetchState(newOffset))
+
                     fetcherLagStats.getAndMaybePut(topic, partitionId).lag = 
Math.max(0L, partitionData.highWatermark - newOffset)
-                    fetcherStats.byteRate.mark(validBytes)
                     // Once we hand off the partition data to the subclass, we 
can't mess with it any more in this thread
-                    processPartitionData(topicAndPartition, 
currentPartitionFetchState.offset, partitionData)
+                    processPartitionData(topicPartition, 
currentPartitionFetchState.offset, partitionData)
                   } catch {
                     case ime: CorruptRecordException =>
                       // we log the error and continue. This ensures two things
@@ -153,20 +170,20 @@ abstract class AbstractFetcherThread(name: String,
                   }
                 case Errors.OFFSET_OUT_OF_RANGE =>
                   try {
-                    val newOffset = handleOffsetOutOfRange(topicAndPartition)
-                    partitionMap.put(topicAndPartition, new 
PartitionFetchState(newOffset))
+                    val newOffset = handleOffsetOutOfRange(topicPartition)
+                    partitionStates.updateAndMoveToEnd(topicPartition, new 
PartitionFetchState(newOffset))
                     error("Current offset %d for partition [%s,%d] out of 
range; reset offset to %d"
                       .format(currentPartitionFetchState.offset, topic, 
partitionId, newOffset))
                   } catch {
                     case e: Throwable =>
                       error("Error getting offset for partition [%s,%d] to 
broker %d".format(topic, partitionId, sourceBroker.id), e)
-                      partitionsWithError += topicAndPartition
+                      updatePartitionsWithError(topicPartition)
                   }
                 case _ =>
                   if (isRunning.get) {
                     error("Error for partition [%s,%d] to broker 
%d:%s".format(topic, partitionId, sourceBroker.id,
                       partitionData.exception.get))
-                    partitionsWithError += topicAndPartition
+                    updatePartitionsWithError(topicPartition)
                   }
               }
             })
@@ -180,47 +197,52 @@ abstract class AbstractFetcherThread(name: String,
     }
   }
 
-  def addPartitions(partitionAndOffsets: Map[TopicAndPartition, Long]) {
+  def addPartitions(partitionAndOffsets: Map[TopicPartition, Long]) {
     partitionMapLock.lockInterruptibly()
     try {
-      for ((topicAndPartition, offset) <- partitionAndOffsets) {
-        // If the partitionMap already has the topic/partition, then do not 
update the map with the old offset
-        if (!partitionMap.contains(topicAndPartition))
-          partitionMap.put(
-            topicAndPartition,
-            if (PartitionTopicInfo.isOffsetInvalid(offset)) new 
PartitionFetchState(handleOffsetOutOfRange(topicAndPartition))
-            else new PartitionFetchState(offset)
-          )}
+      // If the partitionMap already has the topic/partition, then do not 
update the map with the old offset
+      val newPartitionToState = partitionAndOffsets.filter { case (tp, _) =>
+        !partitionStates.contains(tp)
+      }.map { case (tp, offset) =>
+        val fetchState =
+          if (PartitionTopicInfo.isOffsetInvalid(offset)) new 
PartitionFetchState(handleOffsetOutOfRange(tp))
+          else new PartitionFetchState(offset)
+        tp -> fetchState
+      }
+      val existingPartitionToState = 
partitionStates.partitionStates.asScala.map { state =>
+        state.topicPartition -> state.value
+      }.toMap
+      partitionStates.set((existingPartitionToState ++ 
newPartitionToState).asJava)
       partitionMapCond.signalAll()
     } finally partitionMapLock.unlock()
   }
 
-  def delayPartitions(partitions: Iterable[TopicAndPartition], delay: Long) {
+  def delayPartitions(partitions: Iterable[TopicPartition], delay: Long) {
     partitionMapLock.lockInterruptibly()
     try {
       for (partition <- partitions) {
-        partitionMap.get(partition).foreach (currentPartitionFetchState =>
+        Option(partitionStates.stateValue(partition)).foreach 
(currentPartitionFetchState =>
           if (currentPartitionFetchState.isActive)
-            partitionMap.put(partition, new 
PartitionFetchState(currentPartitionFetchState.offset, new DelayedItem(delay)))
+            partitionStates.updateAndMoveToEnd(partition, new 
PartitionFetchState(currentPartitionFetchState.offset, new DelayedItem(delay)))
         )
       }
       partitionMapCond.signalAll()
     } finally partitionMapLock.unlock()
   }
 
-  def removePartitions(topicAndPartitions: Set[TopicAndPartition]) {
+  def removePartitions(topicPartitions: Set[TopicPartition]) {
     partitionMapLock.lockInterruptibly()
     try {
-      topicAndPartitions.foreach { topicAndPartition =>
-        partitionMap.remove(topicAndPartition)
-        fetcherLagStats.unregister(topicAndPartition.topic, 
topicAndPartition.partition)
+      topicPartitions.foreach { topicPartition =>
+        partitionStates.remove(topicPartition)
+        fetcherLagStats.unregister(topicPartition.topic, 
topicPartition.partition)
       }
     } finally partitionMapLock.unlock()
   }
 
   def partitionCount() = {
     partitionMapLock.lockInterruptibly()
-    try partitionMap.size
+    try partitionStates.size
     finally partitionMapLock.unlock()
   }
 
@@ -230,7 +252,7 @@ object AbstractFetcherThread {
 
   trait FetchRequest {
     def isEmpty: Boolean
-    def offset(topicAndPartition: TopicAndPartition): Long
+    def offset(topicPartition: TopicPartition): Long
   }
 
   trait PartitionData {
@@ -315,13 +337,13 @@ case class ClientIdTopicPartition(clientId: String, topic: String, partitionId:
 }
 
 /**
-  * case class to keep partition offset and its state(active , inactive)
+  * case class to keep partition offset and its state(active, inactive)
   */
 case class PartitionFetchState(offset: Long, delay: DelayedItem) {
 
   def this(offset: Long) = this(offset, new DelayedItem(0))
 
-  def isActive: Boolean = { delay.getDelay(TimeUnit.MILLISECONDS) == 0 }
+  def isActive: Boolean = delay.getDelay(TimeUnit.MILLISECONDS) == 0
 
   override def toString = "%d-%b".format(offset, isActive)
 }
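
PartitionFetchState above pairs a fetch offset with a DelayedItem, and isActive simply means the delay has expired. A minimal, self-contained sketch of that behaviour; the DelayedItem below is a simplified stand-in written for the example, not the kafka.utils class:

    import java.util.concurrent.TimeUnit

    // Simplified stand-in for the real DelayedItem: remembers when the delay expires.
    class DelayedItem(delayMs: Long) {
      private val dueMs = System.currentTimeMillis() + delayMs
      def getDelay(unit: TimeUnit): Long =
        unit.convert(math.max(dueMs - System.currentTimeMillis(), 0L), TimeUnit.MILLISECONDS)
    }

    case class PartitionFetchState(offset: Long, delay: DelayedItem) {
      def this(offset: Long) = this(offset, new DelayedItem(0))
      def isActive: Boolean = delay.getDelay(TimeUnit.MILLISECONDS) == 0
      override def toString = "%d-%b".format(offset, isActive)
    }

    object PartitionFetchStateSketch extends App {
      val active  = new PartitionFetchState(42L)                       // fetchable right away
      val delayed = PartitionFetchState(42L, new DelayedItem(30000L))  // backed off for 30 seconds
      println(active)    // 42-true
      println(delayed)   // 42-false
    }
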

http://git-wip-us.apache.org/repos/asf/kafka/blob/d04b0998/core/src/main/scala/kafka/server/DelayedFetch.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala 
b/core/src/main/scala/kafka/server/DelayedFetch.scala
index cf3a48f..4b17e81 100644
--- a/core/src/main/scala/kafka/server/DelayedFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -37,10 +37,12 @@ case class FetchPartitionStatus(startOffsetMetadata: 
LogOffsetMetadata, fetchInf
  * The fetch metadata maintained by the delayed fetch operation
  */
 case class FetchMetadata(fetchMinBytes: Int,
+                         fetchMaxBytes: Int,
+                         hardMaxBytesLimit: Boolean,
                          fetchOnlyLeader: Boolean,
                          fetchOnlyCommitted: Boolean,
                          isFromFollower: Boolean,
-                         fetchPartitionStatus: Map[TopicAndPartition, 
FetchPartitionStatus]) {
+                         fetchPartitionStatus: Seq[(TopicAndPartition, 
FetchPartitionStatus)]) {
 
   override def toString = "[minBytes: " + fetchMinBytes + ", " +
                           "onlyLeader:" + fetchOnlyLeader + ", "
@@ -55,7 +57,7 @@ class DelayedFetch(delayMs: Long,
                    fetchMetadata: FetchMetadata,
                    replicaManager: ReplicaManager,
                    quota: ReplicaQuota,
-                   responseCallback: Map[TopicAndPartition, 
FetchResponsePartitionData] => Unit)
+                   responseCallback: Seq[(TopicAndPartition, 
FetchResponsePartitionData)] => Unit)
   extends DelayedOperation(delayMs) {
 
   /**
@@ -136,12 +138,18 @@ class DelayedFetch(delayMs: Long,
    * Upon completion, read whatever data is available and pass to the complete 
callback
    */
   override def onComplete() {
-    val logReadResults = 
replicaManager.readFromLocalLog(fetchMetadata.fetchOnlyLeader,
+    val logReadResults = replicaManager.readFromLocalLog(
+      fetchMetadata.fetchOnlyLeader,
       fetchMetadata.fetchOnlyCommitted,
-      fetchMetadata.fetchPartitionStatus.mapValues(status => 
status.fetchInfo), quota)
-
-    val fetchPartitionData = logReadResults.mapValues(result =>
-      FetchResponsePartitionData(result.errorCode, result.hw, 
result.info.messageSet))
+      fetchMetadata.fetchMaxBytes,
+      fetchMetadata.hardMaxBytesLimit,
+      fetchMetadata.fetchPartitionStatus.map { case (tp, status) => tp -> 
status.fetchInfo },
+      quota
+    )
+
+    val fetchPartitionData = logReadResults.map { case (tp, result) =>
+      tp -> FetchResponsePartitionData(result.errorCode, result.hw, 
result.info.messageSet)
+    }
 
     responseCallback(fetchPartitionData)
   }
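
onComplete above re-reads the local log using the metadata captured when the fetch was parked, now including fetchMaxBytes and hardMaxBytesLimit, and hands the per-partition results to the response callback as an ordered sequence of pairs rather than a map. A simplified sketch of that completion shape, using made-up stand-in types (Partition, ReadResult, ResponseData) instead of the real replica-manager classes:

    final case class Partition(topic: String, id: Int)
    final case class ReadResult(error: Short, highWatermark: Long, bytes: Array[Byte])
    final case class ResponseData(error: Short, highWatermark: Long, bytes: Array[Byte])

    final class DelayedFetchSketch(
        captured: Seq[(Partition, Long)],                              // partition -> fetch offset, captured at creation
        readLog: Seq[(Partition, Long)] => Seq[(Partition, ReadResult)],
        callback: Seq[(Partition, ResponseData)] => Unit) {

      // On completion: read whatever is available now, convert it for the response,
      // and keep the sequence (and therefore the partition ordering) intact.
      def onComplete(): Unit = {
        val results = readLog(captured)
        val response = results.map { case (p, r) => p -> ResponseData(r.error, r.highWatermark, r.bytes) }
        callback(response)
      }
    }

    object DelayedFetchSketchDemo extends App {
      val fetch = new DelayedFetchSketch(
        captured = Seq(Partition("t0", 0) -> 100L),
        readLog  = _.map { case (p, off) => p -> ReadResult(0.toShort, off + 10, Array.emptyByteArray) },
        callback = resp => println(resp.map { case (p, d) => s"${p.topic}-${p.id} hw=${d.highWatermark}" }.mkString(", ")))
      fetch.onComplete()   // t0-0 hw=110
    }
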

http://git-wip-us.apache.org/repos/asf/kafka/blob/d04b0998/core/src/main/scala/kafka/server/FetchDataInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/FetchDataInfo.scala 
b/core/src/main/scala/kafka/server/FetchDataInfo.scala
index 1a8a604..9d6d437 100644
--- a/core/src/main/scala/kafka/server/FetchDataInfo.scala
+++ b/core/src/main/scala/kafka/server/FetchDataInfo.scala
@@ -19,4 +19,5 @@ package kafka.server
 
 import kafka.message.MessageSet
 
-case class FetchDataInfo(fetchOffsetMetadata: LogOffsetMetadata, messageSet: 
MessageSet)
+case class FetchDataInfo(fetchOffsetMetadata: LogOffsetMetadata, messageSet: 
MessageSet,
+                         firstMessageSetIncomplete: Boolean = false)

http://git-wip-us.apache.org/repos/asf/kafka/blob/d04b0998/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index d3ba5ef..51c9eab 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -146,7 +146,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       val responseHeader = new ResponseHeader(correlationId)
       val leaderAndIsrResponse =
         if (authorize(request.session, ClusterAction, 
Resource.ClusterResource)) {
-          val  result = replicaManager.becomeLeaderOrFollower(correlationId, 
leaderAndIsrRequest, metadataCache, onLeadershipChange)
+          val result = replicaManager.becomeLeaderOrFollower(correlationId, 
leaderAndIsrRequest, metadataCache, onLeadershipChange)
           new LeaderAndIsrResponse(result.errorCode, 
result.responseMap.mapValues(new JShort(_)).asJava)
         } else {
           val result = 
leaderAndIsrRequest.partitionStates.asScala.keys.map((_, new 
JShort(Errors.CLUSTER_AUTHORIZATION_FAILED.code))).toMap
@@ -437,12 +437,12 @@ class KafkaApis(val requestChannel: RequestChannel,
       case (topicAndPartition, _) => authorize(request.session, Read, new 
Resource(auth.Topic, topicAndPartition.topic))
     }
 
-    val unauthorizedPartitionData = unauthorizedRequestInfo.mapValues { _ =>
-      FetchResponsePartitionData(Errors.TOPIC_AUTHORIZATION_FAILED.code, -1, 
MessageSet.Empty)
+    val unauthorizedPartitionData = unauthorizedRequestInfo.map { case (tp, _) 
=>
+      (tp, FetchResponsePartitionData(Errors.TOPIC_AUTHORIZATION_FAILED.code, 
-1, MessageSet.Empty))
     }
 
     // the callback for sending a fetch response
-    def sendResponseCallback(responsePartitionData: Map[TopicAndPartition, 
FetchResponsePartitionData]) {
+    def sendResponseCallback(responsePartitionData: Seq[(TopicAndPartition, 
FetchResponsePartitionData)]) {
 
       val convertedPartitionData =
         // Need to down-convert message when consumer only takes magic value 0.
@@ -480,8 +480,8 @@ class KafkaApis(val requestChannel: RequestChannel,
 
       def fetchResponseCallback(delayTimeMs: Int) {
         trace(s"Sending fetch response to client ${fetchRequest.clientId} of " 
+
-          s"${convertedPartitionData.values.map(_.messages.sizeInBytes).sum} 
bytes")
-        val response = FetchResponse(fetchRequest.correlationId, 
mergedPartitionData, fetchRequest.versionId, delayTimeMs)
+          s"${convertedPartitionData.map { case (_, v) => 
v.messages.sizeInBytes }.sum} bytes")
+        val response = FetchResponse(fetchRequest.correlationId, 
mergedPartitionData.toSeq, fetchRequest.versionId, delayTimeMs)
         requestChannel.sendResponse(new RequestChannel.Response(request, new 
FetchResponseSend(request.connectionId, response)))
       }
 
@@ -490,32 +490,37 @@ class KafkaApis(val requestChannel: RequestChannel,
 
       if (fetchRequest.isFromFollower) {
         //We've already evaluated against the quota and are good to go. Just 
need to record it now.
-        val size = sizeOfThrottledPartitions(fetchRequest, 
mergedPartitionData, quotas.leader)
-        quotas.leader.record(size)
+        val responseSize = sizeOfThrottledPartitions(fetchRequest, 
mergedPartitionData, quotas.leader)
+        quotas.leader.record(responseSize)
         fetchResponseCallback(0)
       } else {
-        val size = 
FetchResponse.responseSize(mergedPartitionData.groupBy(_._1.topic), 
fetchRequest.versionId)
-        quotas.fetch.recordAndMaybeThrottle(request.session.sanitizedUser, 
fetchRequest.clientId, size, fetchResponseCallback)
+        val responseSize = 
FetchResponse.responseSize(FetchResponse.batchByTopic(mergedPartitionData),
+          fetchRequest.versionId)
+        quotas.fetch.recordAndMaybeThrottle(request.session.sanitizedUser, 
fetchRequest.clientId, responseSize, fetchResponseCallback)
       }
     }
 
     if (authorizedRequestInfo.isEmpty)
-      sendResponseCallback(Map.empty)
+      sendResponseCallback(Seq.empty)
     else {
       // call the replica manager to fetch messages from the local replica
       replicaManager.fetchMessages(
         fetchRequest.maxWait.toLong,
         fetchRequest.replicaId,
         fetchRequest.minBytes,
+        fetchRequest.maxBytes,
+        fetchRequest.versionId <= 2,
         authorizedRequestInfo,
         replicationQuota(fetchRequest),
         sendResponseCallback)
     }
   }
 
-  private def sizeOfThrottledPartitions(fetchRequest: FetchRequest, 
mergedPartitionData: Map[TopicAndPartition, FetchResponsePartitionData], quota: 
ReplicationQuotaManager): Int = {
+  private def sizeOfThrottledPartitions(fetchRequest: FetchRequest,
+                                        mergedPartitionData: 
Seq[(TopicAndPartition, FetchResponsePartitionData)],
+                                        quota: ReplicationQuotaManager): Int = 
{
     val throttledPartitions = mergedPartitionData.filter { case (partition, _) 
=> quota.isThrottled(partition) }
-    FetchResponse.responseSize(throttledPartitions.groupBy(_._1.topic), 
fetchRequest.versionId)
+    FetchResponse.responseSize(FetchRequest.batchByTopic(throttledPartitions), 
fetchRequest.versionId)
   }
 
   def replicationQuota(fetchRequest: FetchRequest): ReplicaQuota =
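
In the fetch handler above, a follower fetch only records the bytes of throttled partitions against the leader replication quota, while a client fetch records the whole response size and may be delayed through the callback. A hedged sketch of the throttled-size accounting, using a stand-in predicate and counter instead of ReplicationQuotaManager, and simply summing message bytes rather than calling the real FetchResponse.responseSize (which also counts protocol overhead):

    object ThrottleAccountingSketch extends App {
      final case class Partition(topic: String, id: Int)
      final case class PartitionData(sizeInBytes: Int)

      // Stand-ins for quota.isThrottled and quota.record.
      val throttledTopics = Set("logs")
      def isThrottled(p: Partition): Boolean = throttledTopics.contains(p.topic)
      var recordedBytes = 0L
      def record(bytes: Long): Unit = recordedBytes += bytes

      val responseData: Seq[(Partition, PartitionData)] = Seq(
        Partition("logs", 0)    -> PartitionData(4096),
        Partition("metrics", 0) -> PartitionData(1024),
        Partition("logs", 1)    -> PartitionData(2048))

      // Only throttled partitions count against the replication quota.
      val throttledSize = responseData.collect { case (p, d) if isThrottled(p) => d.sizeInBytes }.sum
      record(throttledSize)
      println(recordedBytes)   // 6144
    }
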

http://git-wip-us.apache.org/repos/asf/kafka/blob/d04b0998/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index b37be5b..9cd05f1 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -115,6 +115,7 @@ object Defaults {
   val ReplicaFetchMaxBytes = ConsumerConfig.FetchSize
   val ReplicaFetchWaitMaxMs = 500
   val ReplicaFetchMinBytes = 1
+  val ReplicaFetchResponseMaxBytes = 10 * 1024 * 1024
   val NumReplicaFetchers = 1
   val ReplicaFetchBackoffMs = 1000
   val ReplicaHighWatermarkCheckpointIntervalMs = 5000L
@@ -282,6 +283,7 @@ object KafkaConfig {
   val ReplicaFetchMaxBytesProp = "replica.fetch.max.bytes"
   val ReplicaFetchWaitMaxMsProp = "replica.fetch.wait.max.ms"
   val ReplicaFetchMinBytesProp = "replica.fetch.min.bytes"
+  val ReplicaFetchResponseMaxBytesProp = "replica.fetch.response.max.bytes"
   val ReplicaFetchBackoffMsProp = "replica.fetch.backoff.ms"
   val NumReplicaFetchersProp = "num.replica.fetchers"
   val ReplicaHighWatermarkCheckpointIntervalMsProp = 
"replica.high.watermark.checkpoint.interval.ms"
@@ -480,10 +482,17 @@ object KafkaConfig {
   " the leader will remove the follower from isr"
   val ReplicaSocketTimeoutMsDoc = "The socket timeout for network requests. 
Its value should be at least replica.fetch.wait.max.ms"
   val ReplicaSocketReceiveBufferBytesDoc = "The socket receive buffer for 
network requests"
-  val ReplicaFetchMaxBytesDoc = "The number of bytes of messages to attempt to 
fetch"
+  val ReplicaFetchMaxBytesDoc = "The number of bytes of messages to attempt to 
fetch for each partition. This is not an absolute maximum; " +
+    "if the first message in the first non-empty partition of the fetch is 
larger than this value, the message will still be returned " +
+    "to ensure that progress can be made. The maximum message size accepted by 
the broker is defined via " +
+    "<code>message.max.bytes</code> (broker config) or 
<code>max.message.bytes</code> (topic config)."
   val ReplicaFetchWaitMaxMsDoc = "max wait time for each fetcher request 
issued by follower replicas. This value should always be less than the " +
   "replica.lag.time.max.ms at all times to prevent frequent shrinking of ISR 
for low throughput topics"
   val ReplicaFetchMinBytesDoc = "Minimum bytes expected for each fetch 
response. If not enough bytes, wait up to replicaMaxWaitTimeMs"
+  val ReplicaFetchResponseMaxBytesDoc = "Maximum bytes expected for the entire 
fetch response. This is not an absolute maximum; " +
+    "if the first message in the first non-empty partition of the fetch is 
larger than this value, the message will still be returned " +
+    "to ensure that progress can be made. The maximum message size accepted by 
the broker is defined via " +
+    "<code>message.max.bytes</code> (broker config) or 
<code>max.message.bytes</code> (topic config)."
   val NumReplicaFetchersDoc = "Number of fetcher threads used to replicate 
messages from a source broker. " +
   "Increasing this value can increase the degree of I/O parallelism in the 
follower broker."
   val ReplicaFetchBackoffMsDoc = "The amount of time to sleep when fetch 
partition error occurs."
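
The two new doc strings above spell out the KIP-74 limit semantics: both the per-partition limit and the new response-level limit are soft in exactly one way, namely that the first message of the first non-empty partition is always returned even if it alone exceeds the limit, so the fetcher can always make progress. A simplified sketch of that accumulation rule (byte counts only, ignoring how the broker actually slices message sets):

    object SoftLimitSketch extends App {
      // (partition, size of its next message set in bytes), in request order.
      val available: Seq[(String, Int)] =
        Seq("t0-0" -> 0, "t1-0" -> 8000000, "t1-1" -> 3000000, "t2-0" -> 4000000)

      def limitResponse(maxBytes: Int): Seq[(String, Int)] = {
        var remaining = maxBytes
        var includedAny = false
        available.flatMap { case (partition, size) =>
          if (size == 0) {
            Some(partition -> 0)          // empty partitions cost nothing
          } else if (!includedAny) {
            includedAny = true            // first non-empty partition: always included,
            remaining -= size             // even if it blows through the limit
            Some(partition -> size)
          } else if (size <= remaining) {
            remaining -= size
            Some(partition -> size)
          } else None                     // everything else must fit in what is left
        }
      }

      println(limitResponse(maxBytes = 12000000))
      // List((t0-0,0), (t1-0,8000000), (t1-1,3000000)): t2-0 no longer fits
      println(limitResponse(maxBytes = 1000000))
      // List((t0-0,0), (t1-0,8000000)): the oversized first message set is still returned so progress can be made
    }
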
@@ -671,10 +680,11 @@ object KafkaConfig {
       .define(ReplicaLagTimeMaxMsProp, LONG, Defaults.ReplicaLagTimeMaxMs, 
HIGH, ReplicaLagTimeMaxMsDoc)
       .define(ReplicaSocketTimeoutMsProp, INT, 
Defaults.ReplicaSocketTimeoutMs, HIGH, ReplicaSocketTimeoutMsDoc)
       .define(ReplicaSocketReceiveBufferBytesProp, INT, 
Defaults.ReplicaSocketReceiveBufferBytes, HIGH, 
ReplicaSocketReceiveBufferBytesDoc)
-      .define(ReplicaFetchMaxBytesProp, INT, Defaults.ReplicaFetchMaxBytes, 
HIGH, ReplicaFetchMaxBytesDoc)
+      .define(ReplicaFetchMaxBytesProp, INT, Defaults.ReplicaFetchMaxBytes, 
atLeast(0), MEDIUM, ReplicaFetchMaxBytesDoc)
       .define(ReplicaFetchWaitMaxMsProp, INT, Defaults.ReplicaFetchWaitMaxMs, 
HIGH, ReplicaFetchWaitMaxMsDoc)
       .define(ReplicaFetchBackoffMsProp, INT, Defaults.ReplicaFetchBackoffMs, 
atLeast(0), MEDIUM, ReplicaFetchBackoffMsDoc)
       .define(ReplicaFetchMinBytesProp, INT, Defaults.ReplicaFetchMinBytes, 
HIGH, ReplicaFetchMinBytesDoc)
+      .define(ReplicaFetchResponseMaxBytesProp, INT, 
Defaults.ReplicaFetchResponseMaxBytes, atLeast(0), MEDIUM, 
ReplicaFetchResponseMaxBytesDoc)
       .define(NumReplicaFetchersProp, INT, Defaults.NumReplicaFetchers, HIGH, 
NumReplicaFetchersDoc)
       .define(ReplicaHighWatermarkCheckpointIntervalMsProp, LONG, 
Defaults.ReplicaHighWatermarkCheckpointIntervalMs, HIGH, 
ReplicaHighWatermarkCheckpointIntervalMsDoc)
       .define(FetchPurgatoryPurgeIntervalRequestsProp, INT, 
Defaults.FetchPurgatoryPurgeIntervalRequests, MEDIUM, 
FetchPurgatoryPurgeIntervalRequestsDoc)
@@ -881,6 +891,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: 
Boolean) extends Abstra
   val replicaFetchMaxBytes = getInt(KafkaConfig.ReplicaFetchMaxBytesProp)
   val replicaFetchWaitMaxMs = getInt(KafkaConfig.ReplicaFetchWaitMaxMsProp)
   val replicaFetchMinBytes = getInt(KafkaConfig.ReplicaFetchMinBytesProp)
+  val replicaFetchResponseMaxBytes = 
getInt(KafkaConfig.ReplicaFetchResponseMaxBytesProp)
   val replicaFetchBackoffMs = getInt(KafkaConfig.ReplicaFetchBackoffMsProp)
   val numReplicaFetchers = getInt(KafkaConfig.NumReplicaFetchersProp)
   val replicaHighWatermarkCheckpointIntervalMs = 
getLong(KafkaConfig.ReplicaHighWatermarkCheckpointIntervalMsProp)
@@ -1042,7 +1053,6 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: 
Boolean) extends Abstra
     require(logCleanerDedupeBufferSize / logCleanerThreads > 1024 * 1024, 
"log.cleaner.dedupe.buffer.size must be at least 1MB per cleaner thread.")
     require(replicaFetchWaitMaxMs <= replicaSocketTimeoutMs, 
"replica.socket.timeout.ms should always be at least replica.fetch.wait.max.ms" 
+
       " to prevent unnecessary socket timeouts")
-    require(replicaFetchMaxBytes >= messageMaxBytes, "replica.fetch.max.bytes 
should be equal or greater than message.max.bytes")
     require(replicaFetchWaitMaxMs <= replicaLagTimeMaxMs, 
"replica.fetch.wait.max.ms should always be at least replica.lag.time.max.ms" +
       " to prevent frequent changes in ISR")
     require(offsetCommitRequiredAcks >= -1 && offsetCommitRequiredAcks <= 
offsetsTopicReplicationFactor,
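
With the new key defined, the per-partition replica.fetch.max.bytes and the response-wide replica.fetch.response.max.bytes sit side by side; note that the patch also drops the old startup require that replica.fetch.max.bytes be at least message.max.bytes, so the per-partition value may now legitimately be smaller than the largest message the broker accepts. A hedged sketch of the relevant broker properties (hypothetical values; only the fetch-related keys are the point):

    import java.util.Properties

    object BrokerFetchConfigSketch extends App {
      val props = new Properties()
      props.put("zookeeper.connect", "localhost:2181")                           // hypothetical connection settings
      props.put("broker.id", "0")
      props.put("replica.fetch.max.bytes", (1024 * 1024).toString)               // per-partition cap (soft)
      props.put("replica.fetch.response.max.bytes", (10 * 1024 * 1024).toString) // whole-response cap (new, soft)
      println(props)
    }
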

http://git-wip-us.apache.org/repos/asf/kafka/blob/d04b0998/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 
b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index b0bd070..78e00df 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -18,24 +18,26 @@
 package kafka.server
 
 import java.net.SocketTimeoutException
+import java.util
 
 import kafka.admin.AdminUtils
 import kafka.cluster.BrokerEndPoint
 import kafka.log.LogConfig
 import kafka.message.ByteBufferMessageSet
-import kafka.api.{KAFKA_0_10_0_IV0, KAFKA_0_9_0}
+import kafka.api.{KAFKA_0_9_0, KAFKA_0_10_0_IV0, KAFKA_0_10_1_IV1}
 import kafka.common.{KafkaStorageException, TopicAndPartition}
 import ReplicaFetcherThread._
 
 import org.apache.kafka.clients.{ManualMetadataUpdater, NetworkClient, 
ClientRequest, ClientResponse}
 import org.apache.kafka.common.network.{LoginType, Selectable, 
ChannelBuilders, NetworkReceive, Selector, Mode}
-import org.apache.kafka.common.requests.{FetchRequest => JFetchRequest, _}
+import org.apache.kafka.common.requests.{ListOffsetResponse, FetchResponse, 
RequestSend, AbstractRequest, ListOffsetRequest}
+import org.apache.kafka.common.requests.{FetchRequest => JFetchRequest}
 import org.apache.kafka.common.{Node, TopicPartition}
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.{Errors, ApiKeys}
 import org.apache.kafka.common.utils.Time
 
-import scala.collection.{JavaConverters, Map, mutable}
+import scala.collection.{JavaConverters, Map}
 import JavaConverters._
 
 class ReplicaFetcherThread(name: String,
@@ -45,8 +47,7 @@ class ReplicaFetcherThread(name: String,
                            replicaMgr: ReplicaManager,
                            metrics: Metrics,
                            time: Time,
-                           quota: ReplicationQuotaManager
-                          )
+                           quota: ReplicationQuotaManager)
   extends AbstractFetcherThread(name = name,
                                 clientId = name,
                                 sourceBroker = sourceBroker,
@@ -57,13 +58,15 @@ class ReplicaFetcherThread(name: String,
   type PD = PartitionData
 
   private val fetchRequestVersion: Short =
-    if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_10_0_IV0) 2
+    if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_10_1_IV1) 3
+    else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_10_0_IV0) 2
     else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_9_0) 1
     else 0
   private val socketTimeout: Int = brokerConfig.replicaSocketTimeoutMs
   private val replicaId = brokerConfig.brokerId
   private val maxWait = brokerConfig.replicaFetchWaitMaxMs
   private val minBytes = brokerConfig.replicaFetchMinBytes
+  private val maxBytes = brokerConfig.replicaFetchResponseMaxBytes
   private val fetchSize = brokerConfig.replicaFetchMaxBytes
 
   private def clientId = name
@@ -111,22 +114,24 @@ class ReplicaFetcherThread(name: String,
   }
 
   // process fetched data
-  def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: 
Long, partitionData: PartitionData) {
+  def processPartitionData(topicPartition: TopicPartition, fetchOffset: Long, 
partitionData: PartitionData) {
     try {
-      val TopicAndPartition(topic, partitionId) = topicAndPartition
+      val topic = topicPartition.topic
+      val partitionId = topicPartition.partition
       val replica = replicaMgr.getReplica(topic, partitionId).get
       val messageSet = partitionData.toByteBufferMessageSet
-      warnIfMessageOversized(messageSet, topicAndPartition)
+
+      maybeWarnIfMessageOversized(messageSet, topicPartition)
 
       if (fetchOffset != replica.logEndOffset.messageOffset)
-        throw new RuntimeException("Offset mismatch for partition %s: fetched 
offset = %d, log end offset = %d.".format(topicAndPartition, fetchOffset, 
replica.logEndOffset.messageOffset))
+        throw new RuntimeException("Offset mismatch for partition %s: fetched 
offset = %d, log end offset = %d.".format(topicPartition, fetchOffset, 
replica.logEndOffset.messageOffset))
       if (logger.isTraceEnabled)
         trace("Follower %d has replica log end offset %d for partition %s. 
Received %d messages and leader hw %d"
-          .format(replica.brokerId, replica.logEndOffset.messageOffset, 
topicAndPartition, messageSet.sizeInBytes, partitionData.highWatermark))
+          .format(replica.brokerId, replica.logEndOffset.messageOffset, 
topicPartition, messageSet.sizeInBytes, partitionData.highWatermark))
       replica.log.get.append(messageSet, assignOffsets = false)
       if (logger.isTraceEnabled)
         trace("Follower %d has replica log end offset %d after appending %d 
bytes of messages for partition %s"
-          .format(replica.brokerId, replica.logEndOffset.messageOffset, 
messageSet.sizeInBytes, topicAndPartition))
+          .format(replica.brokerId, replica.logEndOffset.messageOffset, 
messageSet.sizeInBytes, topicPartition))
       val followerHighWatermark = 
replica.logEndOffset.messageOffset.min(partitionData.highWatermark)
       // for the follower replica, we do not need to keep
       // its segment base offset the physical position,
@@ -135,18 +140,19 @@ class ReplicaFetcherThread(name: String,
       if (logger.isTraceEnabled)
         trace("Follower %d set replica high watermark for partition [%s,%d] to 
%s"
           .format(replica.brokerId, topic, partitionId, followerHighWatermark))
-      if (quota.isThrottled(topicAndPartition))
+      if (quota.isThrottled(new TopicAndPartition(topic, partitionId)))
         quota.record(messageSet.sizeInBytes)
     } catch {
       case e: KafkaStorageException =>
-        fatal(s"Disk error while replicating data for $topicAndPartition", e)
+        fatal(s"Disk error while replicating data for $topicPartition", e)
         Runtime.getRuntime.halt(1)
     }
   }
 
-  def warnIfMessageOversized(messageSet: ByteBufferMessageSet, 
topicAndPartition: TopicAndPartition): Unit = {
-    if (messageSet.sizeInBytes > 0 && messageSet.validBytes <= 0)
-      error(s"Replication is failing due to a message that is greater than 
replica.fetch.max.bytes for partition $topicAndPartition. " +
+  def maybeWarnIfMessageOversized(messageSet: ByteBufferMessageSet, 
topicPartition: TopicPartition): Unit = {
+    // oversized messages don't cause replication to fail from fetch request 
version 3 (KIP-74)
+    if (fetchRequestVersion <= 2 && messageSet.sizeInBytes > 0 && 
messageSet.validBytes <= 0)
+      error(s"Replication is failing due to a message that is greater than 
replica.fetch.max.bytes for partition $topicPartition. " +
         "This generally occurs when the max.message.bytes has been overridden 
to exceed this value and a suitably large " +
         "message has also been sent. To fix this problem increase 
replica.fetch.max.bytes in your broker config to be " +
         "equal or larger than your settings for max.message.bytes, both at a 
broker and topic level.")
@@ -155,8 +161,9 @@ class ReplicaFetcherThread(name: String,
   /**
    * Handle a partition whose offset is out of range and return a new fetch 
offset.
    */
-  def handleOffsetOutOfRange(topicAndPartition: TopicAndPartition): Long = {
-    val replica = replicaMgr.getReplica(topicAndPartition.topic, 
topicAndPartition.partition).get
+  def handleOffsetOutOfRange(topicPartition: TopicPartition): Long = {
+    val topicAndPartition = TopicAndPartition(topicPartition.topic, 
topicPartition.partition)
+    val replica = replicaMgr.getReplica(topicPartition.topic, 
topicPartition.partition).get
 
     /**
      * Unclean leader election: A follower goes down, in the meanwhile the 
leader keeps appending messages. The follower comes back up
@@ -168,7 +175,7 @@ class ReplicaFetcherThread(name: String,
      *
      * There is a potential for a mismatch between the logs of the two 
replicas here. We don't fix this mismatch as of now.
      */
-    val leaderEndOffset: Long = earliestOrLatestOffset(topicAndPartition, 
ListOffsetRequest.LATEST_TIMESTAMP,
+    val leaderEndOffset: Long = earliestOrLatestOffset(topicPartition, 
ListOffsetRequest.LATEST_TIMESTAMP,
       brokerConfig.brokerId)
 
     if (leaderEndOffset < replica.logEndOffset.messageOffset) {
@@ -176,16 +183,16 @@ class ReplicaFetcherThread(name: String,
       // This situation could only happen if the unclean election 
configuration for a topic changes while a replica is down. Otherwise,
       // we should never encounter this situation since a non-ISR leader 
cannot be elected if disallowed by the broker configuration.
       if (!LogConfig.fromProps(brokerConfig.originals, 
AdminUtils.fetchEntityConfig(replicaMgr.zkUtils,
-        ConfigType.Topic, 
topicAndPartition.topic)).uncleanLeaderElectionEnable) {
+        ConfigType.Topic, topicPartition.topic)).uncleanLeaderElectionEnable) {
         // Log a fatal error and shutdown the broker to ensure that data loss 
does not unexpectedly occur.
-        fatal("Exiting because log truncation is not allowed for partition 
%s,".format(topicAndPartition) +
+        fatal("Exiting because log truncation is not allowed for partition 
%s,".format(topicPartition) +
           " Current leader %d's latest offset %d is less than replica %d's 
latest offset %d"
           .format(sourceBroker.id, leaderEndOffset, brokerConfig.brokerId, 
replica.logEndOffset.messageOffset))
         System.exit(1)
       }
 
       warn("Replica %d for partition %s reset its fetch offset from %d to 
current leader %d's latest offset %d"
-        .format(brokerConfig.brokerId, topicAndPartition, 
replica.logEndOffset.messageOffset, sourceBroker.id, leaderEndOffset))
+        .format(brokerConfig.brokerId, topicPartition, 
replica.logEndOffset.messageOffset, sourceBroker.id, leaderEndOffset))
       replicaMgr.logManager.truncateTo(Map(topicAndPartition -> 
leaderEndOffset))
       leaderEndOffset
     } else {
@@ -211,10 +218,10 @@ class ReplicaFetcherThread(name: String,
        * and the current leader's log start offset.
        *
        */
-      val leaderStartOffset: Long = earliestOrLatestOffset(topicAndPartition, 
ListOffsetRequest.EARLIEST_TIMESTAMP,
+      val leaderStartOffset: Long = earliestOrLatestOffset(topicPartition, 
ListOffsetRequest.EARLIEST_TIMESTAMP,
         brokerConfig.brokerId)
       warn("Replica %d for partition %s reset its fetch offset from %d to 
current leader %d's start offset %d"
-        .format(brokerConfig.brokerId, topicAndPartition, 
replica.logEndOffset.messageOffset, sourceBroker.id, leaderStartOffset))
+        .format(brokerConfig.brokerId, topicPartition, 
replica.logEndOffset.messageOffset, sourceBroker.id, leaderStartOffset))
       val offsetToFetch = Math.max(leaderStartOffset, 
replica.logEndOffset.messageOffset)
       // Only truncate log when current leader's log start offset is greater 
than follower's log end offset.
       if (leaderStartOffset > replica.logEndOffset.messageOffset)
@@ -224,14 +231,14 @@ class ReplicaFetcherThread(name: String,
   }
 
   // any logic for partitions whose leader has changed
-  def handlePartitionsWithErrors(partitions: Iterable[TopicAndPartition]) {
+  def handlePartitionsWithErrors(partitions: Iterable[TopicPartition]) {
     delayPartitions(partitions, brokerConfig.replicaFetchBackoffMs.toLong)
   }
 
-  protected def fetch(fetchRequest: FetchRequest): Map[TopicAndPartition, 
PartitionData] = {
+  protected def fetch(fetchRequest: FetchRequest): Seq[(TopicPartition, 
PartitionData)] = {
     val clientResponse = sendRequest(ApiKeys.FETCH, Some(fetchRequestVersion), 
fetchRequest.underlying)
-    new FetchResponse(clientResponse.responseBody).responseData.asScala.map { 
case (key, value) =>
-      TopicAndPartition(key.topic, key.partition) -> new PartitionData(value)
+    new 
FetchResponse(clientResponse.responseBody).responseData.asScala.toSeq.map { 
case (key, value) =>
+      key -> new PartitionData(value)
     }
   }
 
@@ -252,10 +259,10 @@ class ReplicaFetcherThread(name: String,
         networkClient.close(sourceBroker.id.toString)
         throw e
     }
+
   }
 
-  private def earliestOrLatestOffset(topicAndPartition: TopicAndPartition, 
earliestOrLatest: Long, consumerId: Int): Long = {
-    val topicPartition = new TopicPartition(topicAndPartition.topic, 
topicAndPartition.partition)
+  private def earliestOrLatestOffset(topicPartition: TopicPartition, 
earliestOrLatest: Long, consumerId: Int): Long = {
     val partitions = Map(
       topicPartition -> new ListOffsetRequest.PartitionData(earliestOrLatest, 
1)
     )
@@ -269,23 +276,31 @@ class ReplicaFetcherThread(name: String,
     }
   }
 
-  protected def buildFetchRequest(partitionMap: Map[TopicAndPartition, 
PartitionFetchState]): FetchRequest = {
-    val requestMap = mutable.Map.empty[TopicPartition, 
JFetchRequest.PartitionData]
+  protected def buildFetchRequest(partitionMap: Seq[(TopicPartition, 
PartitionFetchState)]): FetchRequest = {
+    val requestMap = new util.LinkedHashMap[TopicPartition, 
JFetchRequest.PartitionData]
+
     val quotaExceeded = quota.isQuotaExceeded
-    partitionMap.foreach { case ((partition, partitionFetchState)) =>
-      if (partitionFetchState.isActive && !(quota.isThrottled(partition) && 
quotaExceeded))
-        requestMap(new TopicPartition(partition.topic, partition.partition)) = 
new JFetchRequest.PartitionData(partitionFetchState.offset, fetchSize)
+    partitionMap.foreach { case (topicPartition, partitionFetchState) =>
+      val topicAndPartition = new TopicAndPartition(topicPartition.topic, 
topicPartition.partition)
+      if (partitionFetchState.isActive && 
!(quota.isThrottled(topicAndPartition) && quotaExceeded))
+        requestMap.put(topicPartition, new 
JFetchRequest.PartitionData(partitionFetchState.offset, fetchSize))
     }
-    new FetchRequest(new JFetchRequest(replicaId, maxWait, minBytes, 
requestMap.asJava))
+
+    val request =
+      if (fetchRequestVersion >= 3) JFetchRequest.fromReplica(replicaId, 
maxWait, minBytes, maxBytes, requestMap)
+      else JFetchRequest.fromReplica(replicaId, maxWait, minBytes, requestMap)
+
+    new FetchRequest(request)
   }
+
 }
 
 object ReplicaFetcherThread {
 
   private[server] class FetchRequest(val underlying: JFetchRequest) extends 
AbstractFetcherThread.FetchRequest {
     def isEmpty: Boolean = underlying.fetchData.isEmpty
-    def offset(topicAndPartition: TopicAndPartition): Long =
-      underlying.fetchData.asScala(new TopicPartition(topicAndPartition.topic, 
topicAndPartition.partition)).offset
+    def offset(topicPartition: TopicPartition): Long =
+      underlying.fetchData.asScala(topicPartition).offset
   }
 
   private[server] class PartitionData(val underlying: 
FetchResponse.PartitionData) extends AbstractFetcherThread.PartitionData {
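
buildFetchRequest above collects the partition data into a java.util.LinkedHashMap and only passes the response-level maxBytes when the negotiated fetch version is at least 3. The LinkedHashMap matters because it preserves the order in which the fetcher supplied the partitions (already shuffled or rotated upstream), which is what gives every partition a fair chance of fitting under the response limit. A small sketch of just that ordering property, with made-up partition names:

    import java.util.{HashMap => JHashMap, LinkedHashMap => JLinkedHashMap}
    import scala.collection.JavaConverters._

    object RequestMapOrderSketch extends App {
      // Hypothetical partitions with their fetch offsets, in the order the fetcher supplied them.
      val partitions = Seq("t2-0" -> "100", "t0-0" -> "200", "t1-0" -> "300", "t0-1" -> "400", "t3-0" -> "500")

      val hashed = new JHashMap[String, String]
      val linked = new JLinkedHashMap[String, String]
      partitions.foreach { case (p, offset) =>
        hashed.put(p, offset)
        linked.put(p, offset)
      }

      // LinkedHashMap iterates in insertion order, so the wire request keeps the
      // (shuffled/rotated) partition order it was built with; HashMap gives no such guarantee.
      println(linked.keySet.asScala.mkString(", "))   // t2-0, t0-0, t1-0, t0-1, t3-0
      println(hashed.keySet.asScala.mkString(", "))   // some hash-dependent order
    }
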
