http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index ec8e1eb..1d95fa0 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -19,19 +19,16 @@ package kafka.network
 
 import java.net.InetAddress
 import java.nio.ByteBuffer
-import java.util.Collections
 import java.util.concurrent._
 
 import com.yammer.metrics.core.Gauge
 import kafka.metrics.KafkaMetricsGroup
+import kafka.network.RequestChannel.{ShutdownRequest, BaseRequest}
 import kafka.server.QuotaId
 import kafka.utils.{Logging, NotNothing}
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.errors.InvalidRequestException
 import org.apache.kafka.common.memory.MemoryPool
 import org.apache.kafka.common.network.{ListenerName, Send}
 import org.apache.kafka.common.protocol.{ApiKeys, Protocol, SecurityProtocol}
-import org.apache.kafka.common.record.{MemoryRecords, RecordBatch}
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.utils.Time
@@ -40,25 +37,20 @@ import org.apache.log4j.Logger
 import scala.reflect.ClassTag
 
 object RequestChannel extends Logging {
-  val AllDone = new Request(processor = 1, connectionId = "2", Session(KafkaPrincipal.ANONYMOUS, InetAddress.getLocalHost),
-    startTimeNanos = 0, listenerName = new ListenerName(""), securityProtocol = SecurityProtocol.PLAINTEXT,
-    MemoryPool.NONE, shutdownReceive)
   private val requestLogger = Logger.getLogger("kafka.request.logger")
 
-  private def shutdownReceive: ByteBuffer = {
-    val emptyProduceRequest = new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, 0, 0,
-      Collections.emptyMap[TopicPartition, MemoryRecords]).build()
-    val emptyRequestHeader = new RequestHeader(ApiKeys.PRODUCE.id, emptyProduceRequest.version, "", 0)
-    emptyProduceRequest.serialize(emptyRequestHeader)
-  }
+  sealed trait BaseRequest
+  case object ShutdownRequest extends BaseRequest
 
   case class Session(principal: KafkaPrincipal, clientAddress: InetAddress) {
     val sanitizedUser = QuotaId.sanitize(principal.getName)
   }
 
-  class Request(val processor: Int, val connectionId: String, val session: Session, startTimeNanos: Long,
-                val listenerName: ListenerName, val securityProtocol: SecurityProtocol, memoryPool: MemoryPool,
-                @volatile private var buffer: ByteBuffer) {
+  class Request(val processor: Int,
+                val context: RequestContext,
+                val startTimeNanos: Long,
+                memoryPool: MemoryPool,
+                @volatile private var buffer: ByteBuffer) extends BaseRequest {
     // These need to be volatile because the readers are in the network thread and the writers are in the request
     // handler threads or the purgatory threads
     @volatile var requestDequeueTimeNanos = -1L
@@ -68,31 +60,17 @@ object RequestChannel extends Logging {
     @volatile var apiRemoteCompleteTimeNanos = -1L
     @volatile var recordNetworkThreadTimeCallback: Option[Long => Unit] = None
 
-    val header: RequestHeader = try {
-      RequestHeader.parse(buffer)
-    } catch {
-      case ex: Throwable =>
-        throw new InvalidRequestException(s"Error parsing request header. Our best guess of the apiKey is: ${buffer.getShort(0)}", ex)
-    }
+    val session = Session(context.principal, context.clientAddress)
+    private val bodyAndSize: RequestAndSize = context.parseRequest(buffer)
 
-    val bodyAndSize: RequestAndSize =
-      try {
-        // For unsupported version of ApiVersionsRequest, create a dummy request to enable an error response to be returned later
-        if (header.apiKey == ApiKeys.API_VERSIONS.id && !Protocol.apiVersionSupported(header.apiKey, header.apiVersion)) {
-          new RequestAndSize(new ApiVersionsRequest.Builder().build(), 0)
-        }
-        else
-          AbstractRequest.getRequest(header.apiKey, header.apiVersion, buffer)
-      } catch {
-        case ex: Throwable =>
-          throw new InvalidRequestException(s"Error getting request for apiKey: ${header.apiKey} and apiVersion: ${header.apiVersion}", ex)
-      }
+    def header: RequestHeader = context.header
+    def sizeOfBodyInBytes: Int = bodyAndSize.size
 
     //most request types are parsed entirely into objects at this point. for those we can release the underlying buffer.
     //some (like produce, or any time the schema contains fields of types BYTES or NULLABLE_BYTES) retain a reference
     //to the buffer. for those requests we cannot release the buffer early, but only when request processing is done.
-    if (!Protocol.requiresDelayedDeallocation(header.apiKey)) {
-      dispose()
+    if (!Protocol.requiresDelayedDeallocation(header.apiKey.id)) {
+      releaseBuffer()
     }
 
     def requestDesc(details: Boolean): String = s"$header -- ${body[AbstractRequest].toString(details)}"
@@ -105,7 +83,7 @@ object RequestChannel extends Logging {
       }
     }
 
-    trace("Processor %d received request : %s".format(processor, requestDesc(true)))
+    trace(s"Processor $processor received request: ${requestDesc(true)}")
 
     def requestThreadTimeNanos = {
       if (apiLocalCompleteTimeNanos == -1L) apiLocalCompleteTimeNanos = Time.SYSTEM.nanoseconds
@@ -121,7 +99,7 @@ object RequestChannel extends Logging {
       if (apiLocalCompleteTimeNanos < 0)
         apiLocalCompleteTimeNanos = responseCompleteTimeNanos
       // If the apiRemoteCompleteTimeNanos is not set (i.e., for requests that do not go through a purgatory), then it is
-      // the same as responseCompleteTimeNans.
+      // the same as responseCompleteTimeNanos.
       if (apiRemoteCompleteTimeNanos < 0)
         apiRemoteCompleteTimeNanos = responseCompleteTimeNanos
 
@@ -135,7 +113,7 @@ object RequestChannel extends Logging {
       val responseSendTime = nanosToMs(endTimeNanos - responseDequeueTimeNanos)
       val totalTime = nanosToMs(endTimeNanos - startTimeNanos)
       val fetchMetricNames =
-        if (header.apiKey == ApiKeys.FETCH.id) {
+        if (header.apiKey == ApiKeys.FETCH) {
           val isFromFollower = body[FetchRequest].isFromFollower
           Seq(
             if (isFromFollower) RequestMetrics.followFetchMetricName
@@ -143,7 +121,7 @@ object RequestChannel extends Logging {
           )
         }
         else Seq.empty
-      val metricNames = fetchMetricNames :+ ApiKeys.forId(header.apiKey).name
+      val metricNames = fetchMetricNames :+ header.apiKey.name
       metricNames.foreach { metricName =>
         val m = RequestMetrics.metricsMap(metricName)
         m.requestRate.mark()
@@ -174,12 +152,22 @@ object RequestChannel extends Logging {
         val apiThrottleTimeMs = nanosToMs(responseCompleteTimeNanos - apiRemoteCompleteTimeNanos)
         val responseQueueTimeMs = nanosToMs(responseDequeueTimeNanos - responseCompleteTimeNanos)
         val responseSendTimeMs = nanosToMs(endTimeNanos - responseDequeueTimeNanos)
-        requestLogger.debug("Completed request:%s from connection %s;totalTime:%f,requestQueueTime:%f,localTime:%f,remoteTime:%f,throttleTime:%f,responseQueueTime:%f,sendTime:%f,securityProtocol:%s,principal:%s,listener:%s"
-          .format(requestDesc(detailsEnabled), connectionId, totalTimeMs, requestQueueTimeMs, apiLocalTimeMs, apiRemoteTimeMs, apiThrottleTimeMs, responseQueueTimeMs, responseSendTimeMs, securityProtocol, session.principal, listenerName.value))
+
+        requestLogger.debug(s"Completed request:${requestDesc(detailsEnabled)} from connection ${context.connectionId};" +
+          s"totalTime:$totalTimeMs," +
+          s"requestQueueTime:$requestQueueTimeMs," +
+          s"localTime:$apiLocalTimeMs," +
+          s"remoteTime:$apiRemoteTimeMs," +
+          s"throttleTime:$apiThrottleTimeMs," +
+          s"responseQueueTime:$responseQueueTimeMs," +
+          s"sendTime:$responseSendTimeMs," +
+          s"securityProtocol:${context.securityProtocol}," +
+          s"principal:${context.principal}," +
+          s"listener:${context.listenerName.value}")
       }
     }
 
-    def dispose(): Unit = {
+    def releaseBuffer(): Unit = {
       if (buffer != null) {
         memoryPool.release(buffer)
         buffer = null
@@ -187,30 +175,14 @@ object RequestChannel extends Logging {
     }
 
     override def toString = s"Request(processor=$processor, " +
-      s"connectionId=$connectionId, " +
+      s"connectionId=${context.connectionId}, " +
       s"session=$session, " +
-      s"listenerName=$listenerName, " +
-      s"securityProtocol=$securityProtocol, " +
+      s"listenerName=${context.listenerName}, " +
+      s"securityProtocol=${context.securityProtocol}, " +
       s"buffer=$buffer)"
 
   }
 
-  object Response {
-
-    def apply(request: Request, responseSend: Send): Response = {
-      require(request != null, "request should be non null")
-      require(responseSend != null, "responseSend should be non null")
-      new Response(request, Some(responseSend), SendAction)
-    }
-
-    def apply(request: Request, response: AbstractResponse): Response = {
-      require(request != null, "request should be non null")
-      require(response != null, "response should be non null")
-      apply(request, response.toSend(request.connectionId, request.header))
-    }
-
-  }
-
   class Response(val request: Request, val responseSend: Option[Send], val responseAction: ResponseAction) {
     request.responseCompleteTimeNanos = Time.SYSTEM.nanoseconds
     if (request.apiLocalCompleteTimeNanos == -1L) request.apiLocalCompleteTimeNanos = Time.SYSTEM.nanoseconds
@@ -228,7 +200,7 @@ object RequestChannel extends Logging {
 
 class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMetricsGroup {
   private var responseListeners: List[(Int) => Unit] = Nil
-  private val requestQueue = new ArrayBlockingQueue[RequestChannel.Request](queueSize)
+  private val requestQueue = new ArrayBlockingQueue[BaseRequest](queueSize)
   private val responseQueues = new Array[BlockingQueue[RequestChannel.Response]](numProcessors)
   for(i <- 0 until numProcessors)
     responseQueues(i) = new LinkedBlockingQueue[RequestChannel.Response]()
@@ -260,17 +232,23 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe
 
   /** Send a response back to the socket server to be sent over the network */
   def sendResponse(response: RequestChannel.Response) {
+    if (isTraceEnabled) {
+      val requestHeader = response.request.header
+      trace(s"Sending ${requestHeader.apiKey} response to client ${requestHeader.clientId} of " +
+        s"${response.responseSend.size} bytes.")
+    }
+
     responseQueues(response.processor).put(response)
     for(onResponse <- responseListeners)
       onResponse(response.processor)
   }
 
   /** Get the next request or block until specified time has elapsed */
-  def receiveRequest(timeout: Long): RequestChannel.Request =
+  def receiveRequest(timeout: Long): RequestChannel.BaseRequest =
     requestQueue.poll(timeout, TimeUnit.MILLISECONDS)
 
   /** Get the next request or block until there is one */
-  def receiveRequest(): RequestChannel.Request =
+  def receiveRequest(): RequestChannel.BaseRequest =
     requestQueue.take()
 
   /** Get a response for the given processor if there is one */
@@ -288,6 +266,9 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe
   def shutdown() {
     requestQueue.clear()
   }
+
+  def sendShutdownRequest(): Unit = requestQueue.put(ShutdownRequest)
+
 }
 
 object RequestMetrics {
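
The refactor above replaces the old AllDone sentinel, which had to fabricate and serialize an empty produce request, with a typed ShutdownRequest object under a sealed BaseRequest trait. A minimal, self-contained sketch of that sentinel pattern (the Work class and queue size are illustrative, not from the commit):

    import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}

    // The queue carries either real work or a typed shutdown marker, so no
    // dummy payload ever needs to be built or parsed.
    sealed trait BaseRequest
    case object ShutdownRequest extends BaseRequest
    final case class Work(payload: String) extends BaseRequest

    object SentinelDemo extends App {
      val queue = new ArrayBlockingQueue[BaseRequest](8)
      queue.put(Work("first"))
      queue.put(ShutdownRequest)

      var running = true
      while (running) {
        queue.poll(300, TimeUnit.MILLISECONDS) match {
          case ShutdownRequest => running = false   // typed sentinel: no parsing
          case Work(p)         => println(s"handling $p")
          case null            =>                   // poll timed out; try again
        }
      }
    }

Matching on the sealed trait lets the compiler check that every consumer of the queue handles the shutdown case.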

http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index e6f6662..f1bc926 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -37,8 +37,9 @@ import org.apache.kafka.common.metrics._
 import org.apache.kafka.common.metrics.stats.Rate
 import org.apache.kafka.common.network.{ChannelBuilders, KafkaChannel, ListenerName, Selectable, Send, Selector => KSelector}
 import org.apache.kafka.common.security.auth.KafkaPrincipal
-import org.apache.kafka.common.protocol.SecurityProtocol
+import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol}
 import org.apache.kafka.common.protocol.types.SchemaException
+import org.apache.kafka.common.requests.{RequestContext, RequestHeader}
 import org.apache.kafka.common.utils.{KafkaThread, Time}
 
 import scala.collection._
@@ -485,7 +486,7 @@ private[kafka] class Processor(val id: Int,
             // that are sitting in the server's socket buffer
             updateRequestMetrics(curr.request)
             trace("Socket server received empty response to send, registering for read: " + curr)
-            val channelId = curr.request.connectionId
+            val channelId = curr.request.context.connectionId
             if (selector.channel(channelId) != null || selector.closingChannel(channelId) != null)
                 selector.unmute(channelId)
           case RequestChannel.SendAction =>
@@ -495,7 +496,7 @@ private[kafka] class Processor(val id: Int,
           case RequestChannel.CloseConnectionAction =>
             updateRequestMetrics(curr.request)
             trace("Closing socket connection actively according to the response code.")
-            close(selector, curr.request.connectionId)
+            close(selector, curr.request.context.connectionId)
         }
       } finally {
         curr = requestChannel.receiveResponse(id)
@@ -505,7 +506,7 @@ private[kafka] class Processor(val id: Int,
 
   /* `protected` for test usage */
   protected[network] def sendResponse(response: RequestChannel.Response, responseSend: Send) {
-    val connectionId = response.request.connectionId
+    val connectionId = response.request.context.connectionId
     trace(s"Socket server received response to send to $connectionId, registering for write and sending data: $response")
     // `channel` can be None if the connection was closed remotely or if selector closed it for being idle for too long
     if (channel(connectionId).isEmpty) {
@@ -515,7 +516,7 @@ private[kafka] class Processor(val id: Int,
     // Invoke send for closingChannel as well so that the send is failed and the channel closed properly and
     // removed from the Selector after discarding any pending staged receives.
     // `openOrClosingChannel` can be None if the selector closed the connection because it was idle for too long
-    if (!openOrClosingChannel(connectionId).isEmpty) {
+    if (openOrClosingChannel(connectionId).isDefined) {
       selector.send(responseSend)
       inflightResponses += (connectionId -> response)
     }
@@ -538,11 +539,12 @@ private[kafka] class Processor(val id: Int,
         val openChannel = selector.channel(receive.source)
         // Only methods that are safe to call on a disconnected channel should be invoked on 'openOrClosingChannel'.
         val openOrClosingChannel = if (openChannel != null) openChannel else selector.closingChannel(receive.source)
-        val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, openOrClosingChannel.principal.getName), openOrClosingChannel.socketAddress)
-
-        val req = new RequestChannel.Request(processor = id, connectionId = receive.source, session = session,
-          startTimeNanos = time.nanoseconds, listenerName = listenerName, securityProtocol = securityProtocol,
-          memoryPool, receive.payload)
+        val principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, openOrClosingChannel.principal.getName)
+        val header = RequestHeader.parse(receive.payload)
+        val context = new RequestContext(header, receive.source, openOrClosingChannel.socketAddress,
+          principal, listenerName, securityProtocol)
+        val req = new RequestChannel.Request(processor = id, context = context,
+          startTimeNanos = time.nanoseconds, memoryPool, receive.payload)
         requestChannel.sendRequest(req)
         selector.mute(receive.source)
       } catch {
@@ -565,7 +567,7 @@ private[kafka] class Processor(val id: Int,
   }
 
   private def updateRequestMetrics(request: RequestChannel.Request) {
-    val networkThreadTimeNanos = openOrClosingChannel(request.connectionId).fold(0L) (_.getAndResetNetworkThreadTimeNanos())
+    val networkThreadTimeNanos = openOrClosingChannel(request.context.connectionId).fold(0L)(_.getAndResetNetworkThreadTimeNanos())
     request.updateRequestMetrics(networkThreadTimeNanos)
   }
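
Note how the processor now parses the request header once on the network thread and bundles it with the connection metadata into a RequestContext. A simplified stand-in (these types are hypothetical, not Kafka's actual RequestContext API) showing the shape of that parse-once flow:

    import java.nio.ByteBuffer

    // Hypothetical stand-ins: a header parsed once on the network thread
    // travels with the connection metadata instead of being re-parsed later.
    final case class Header(apiKey: Short, clientId: String)
    final case class Context(header: Header, connectionId: String)

    object ParseOnceDemo extends App {
      def parseHeader(buf: ByteBuffer): Header =
        Header(buf.getShort(), "demo-client")   // read the key eagerly, once

      val payload = ByteBuffer.allocate(2).putShort(0, 0.toShort)
      val ctx = Context(parseHeader(payload.duplicate()), "127.0.0.1:9092-1")
      println(s"request for api ${ctx.header.apiKey} on ${ctx.connectionId}")
    }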
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala b/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala
index fbc1881..f454483 100644
--- a/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala
@@ -18,14 +18,15 @@ package kafka.server
 
 import java.util.concurrent.TimeUnit
 
+import kafka.network.RequestChannel
 import org.apache.kafka.common.MetricName
 import org.apache.kafka.common.metrics._
 import org.apache.kafka.common.utils.Time
 
 
 class ClientRequestQuotaManager(private val config: ClientQuotaManagerConfig,
-                         private val metrics: Metrics,
-                         private val time: Time) extends ClientQuotaManager(config, metrics, QuotaType.Request, time) {
+                                private val metrics: Metrics,
+                                private val time: Time) extends ClientQuotaManager(config, metrics, QuotaType.Request, time) {
   val maxThrottleTimeMs = TimeUnit.SECONDS.toMillis(this.config.quotaWindowSizeSeconds)
   def exemptSensor = getOrCreateSensor(exemptSensorName, exemptMetricName)
 
@@ -33,25 +34,29 @@ class ClientRequestQuotaManager(private val config: ClientQuotaManagerConfig,
     exemptSensor.record(value)
   }
 
-  def maybeRecordAndThrottle(sanitizedUser: String, clientId: String, requestThreadTimeNanos: Long,
-      sendResponseCallback: Int => Unit, recordNetworkThreadTimeCallback: (Long => Unit) => Unit): Unit = {
+  def maybeRecordAndThrottle(request: RequestChannel.Request, sendResponseCallback: Int => Unit): Unit = {
+    if (request.apiRemoteCompleteTimeNanos == -1) {
+      // When this callback is triggered, the remote API call has completed
+      request.apiRemoteCompleteTimeNanos = time.nanoseconds
+    }
+
     if (quotasEnabled) {
-      val quotaSensors = getOrCreateQuotaSensors(sanitizedUser, clientId)
-      recordNetworkThreadTimeCallback(timeNanos => recordNoThrottle(quotaSensors, nanosToPercentage(timeNanos)))
+      val quotaSensors = getOrCreateQuotaSensors(request.session.sanitizedUser, request.header.clientId)
+      request.recordNetworkThreadTimeCallback = Some(timeNanos => recordNoThrottle(quotaSensors, nanosToPercentage(timeNanos)))
 
       recordAndThrottleOnQuotaViolation(
           quotaSensors,
-          nanosToPercentage(requestThreadTimeNanos),
+          nanosToPercentage(request.requestThreadTimeNanos),
           sendResponseCallback)
     } else {
       sendResponseCallback(0)
     }
   }
 
-  def maybeRecordExempt(requestThreadTimeNanos: Long, recordNetworkThreadTimeCallback: (Long => Unit) => Unit): Unit = {
+  def maybeRecordExempt(request: RequestChannel.Request): Unit = {
     if (quotasEnabled) {
-      recordNetworkThreadTimeCallback(timeNanos => recordExempt(nanosToPercentage(timeNanos)))
-      recordExempt(nanosToPercentage(requestThreadTimeNanos))
+      request.recordNetworkThreadTimeCallback = Some(timeNanos => recordExempt(nanosToPercentage(timeNanos)))
+      recordExempt(nanosToPercentage(request.requestThreadTimeNanos))
     }
   }
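
The quota manager now takes the whole Request instead of four pre-extracted values and installs the network-thread-time callback itself. The parameter-object refactor in miniature (types simplified and hypothetical):

    // Hypothetical, simplified types illustrating the parameter-object refactor.
    final class Req(val user: String, val clientId: String, val threadTimeNanos: Long) {
      @volatile var networkTimeCallback: Option[Long => Unit] = None
    }

    object QuotaDemo {
      // Before: every field extracted by the caller, callback threaded through.
      def maybeRecordOld(user: String, clientId: String, threadTimeNanos: Long,
                         registerCallback: (Long => Unit) => Unit): Unit = {
        registerCallback(nanos => println(s"$user/$clientId network time: $nanos"))
        println(s"recorded ${threadTimeNanos}ns")
      }

      // After: one argument; the manager reaches into the request directly.
      def maybeRecordNew(req: Req): Unit = {
        req.networkTimeCallback = Some(nanos => println(s"${req.user}/${req.clientId} network time: $nanos"))
        println(s"recorded ${req.threadTimeNanos}ns")
      }
    }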
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 4537898..1a85222 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -34,6 +34,7 @@ import kafka.coordinator.group.{GroupCoordinator, JoinGroupResult}
 import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinator}
 import kafka.log.{Log, LogManager, TimestampOffset}
 import kafka.network.RequestChannel
+import kafka.network.RequestChannel.{CloseConnectionAction, NoOpAction, SendAction}
 import kafka.security.SecurityUtils
 import kafka.security.auth._
 import kafka.utils.{CoreUtils, Logging, ZKGroupTopicDirs, ZkUtils}
@@ -42,7 +43,7 @@ import org.apache.kafka.common.internals.FatalExitError
 import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME, isInternal}
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.ListenerName
-import org.apache.kafka.common.protocol.{ApiKeys, Errors, Protocol}
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.{ControlRecordType, EndTransactionMarker, MemoryRecords, RecordBatch}
 import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse
 import org.apache.kafka.common.requests.DeleteAclsResponse.{AclDeletionResult, AclFilterResponse}
@@ -91,9 +92,9 @@ class KafkaApis(val requestChannel: RequestChannel,
    */
   def handle(request: RequestChannel.Request) {
     try {
-      trace("Handling request:%s from connection %s;securityProtocol:%s,principal:%s".
-        format(request.requestDesc(true), request.connectionId, request.securityProtocol, request.session.principal))
-      ApiKeys.forId(request.header.apiKey) match {
+      trace(s"Handling request:${request.requestDesc(true)} from connection ${request.context.connectionId};" +
+        s"securityProtocol:${request.context.securityProtocol},principal:${request.context.principal}")
+      request.header.apiKey match {
         case ApiKeys.PRODUCE => handleProduceRequest(request)
         case ApiKeys.FETCH => handleFetchRequest(request)
         case ApiKeys.LIST_OFFSETS => handleListOffsetRequest(request)
@@ -166,7 +167,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
       val result = replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest, onLeadershipChange)
       val leaderAndIsrResponse = new LeaderAndIsrResponse(result.error, result.responseMap.asJava)
-      sendResponseExemptThrottle(RequestChannel.Response(request, leaderAndIsrResponse))
+      sendResponseExemptThrottle(request, leaderAndIsrResponse)
     } else {
       val result = leaderAndIsrRequest.partitionStates.asScala.keys.map((_, Errors.CLUSTER_AUTHORIZATION_FAILED)).toMap
       sendResponseMaybeThrottle(request, _ =>
@@ -193,8 +194,7 @@ class KafkaApis(val requestChannel: RequestChannel,
           groupCoordinator.handleGroupEmigration(topicPartition.partition)
         }
       }
-      val response = new StopReplicaResponse(error, result.asJava)
-      sendResponseExemptThrottle(RequestChannel.Response(request, response))
+      sendResponseExemptThrottle(request, new StopReplicaResponse(error, result.asJava))
     } else {
       val result = stopReplicaRequest.partitions.asScala.map((_, Errors.CLUSTER_AUTHORIZATION_FAILED)).toMap
       sendResponseMaybeThrottle(request, _ =>
@@ -218,7 +218,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         adminManager.tryCompleteDelayedTopicOperations(topic)
         }
       }
-      sendResponseExemptThrottle(RequestChannel.Response(request, new UpdateMetadataResponse(Errors.NONE)))
+      sendResponseExemptThrottle(request, new UpdateMetadataResponse(Errors.NONE))
     } else {
       sendResponseMaybeThrottle(request, _ => new UpdateMetadataResponse(Errors.CLUSTER_AUTHORIZATION_FAILED))
     }
@@ -232,14 +232,14 @@ class KafkaApis(val requestChannel: RequestChannel,
     authorizeClusterAction(request)
 
     def controlledShutdownCallback(controlledShutdownResult: Try[Set[TopicAndPartition]]): Unit = {
-      controlledShutdownResult match {
+      val response = controlledShutdownResult match {
         case Success(partitionsRemaining) =>
-          val controlledShutdownResponse = new ControlledShutdownResponse(Errors.NONE,
-            partitionsRemaining.map(_.asTopicPartition).asJava)
-          sendResponseExemptThrottle(RequestChannel.Response(request, controlledShutdownResponse))
+          new ControlledShutdownResponse(Errors.NONE, partitionsRemaining.map(_.asTopicPartition).asJava)
+
         case Failure(throwable) =>
-          sendResponseExemptThrottle(RequestChannel.Response(request, controlledShutdownRequest.getErrorResponse(throwable)))
+          controlledShutdownRequest.getErrorResponse(throwable)
       }
+      sendResponseExemptThrottle(request, response)
     }
     controller.shutdownBroker(controlledShutdownRequest.brokerId, controlledShutdownCallback)
   }
@@ -362,22 +362,17 @@ class KafkaApis(val requestChannel: RequestChannel,
    */
   def handleProduceRequest(request: RequestChannel.Request) {
     val produceRequest = request.body[ProduceRequest]
-    val numBytesAppended = request.header.toStruct.sizeOf + request.bodyAndSize.size
-
-    def sendErrorResponse(error: Errors): Unit = {
-      sendResponseMaybeThrottle(request, requestThrottleMs =>
-        produceRequest.getErrorResponse(requestThrottleMs, error.exception))
-    }
+    val numBytesAppended = request.header.toStruct.sizeOf + request.sizeOfBodyInBytes
 
     if (produceRequest.isTransactional) {
       if (!authorize(request.session, Write, new Resource(TransactionalId, produceRequest.transactionalId))) {
-        sendErrorResponse(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED)
+        sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
         return
       }
       // Note that authorization to a transactionalId implies ProducerId authorization
 
     } else if (produceRequest.isIdempotent && !authorize(request.session, IdempotentWrite, Resource.ClusterResource)) {
-      sendErrorResponse(Errors.CLUSTER_AUTHORIZATION_FAILED)
+      sendErrorResponseMaybeThrottle(request, Errors.CLUSTER_AUTHORIZATION_FAILED.exception)
       return
     }
 
@@ -415,7 +410,6 @@ class KafkaApis(val requestChannel: RequestChannel,
           // no operation needed if producer request.required.acks = 0; however, if there is any error in handling
           // the request, since no response is expected by the producer, the server will close socket server so that
           // the producer client will know that some error has happened and will refresh its metadata
-          val action =
           if (errorInResponse) {
             val exceptionsSummary = mergedResponseStatus.map { case (topicPartition, status) =>
               topicPartition -> status.error.exceptionName
@@ -425,9 +419,10 @@ class KafkaApis(val requestChannel: RequestChannel,
                 s"from client id ${request.header.clientId} with ack=0\n" +
                 s"Topic and partition to exceptions: $exceptionsSummary"
             )
-            RequestChannel.CloseConnectionAction
-          } else RequestChannel.NoOpAction
-          sendResponseExemptThrottle(new RequestChannel.Response(request, None, action))
+            closeConnection(request)
+          } else {
+            sendNoOpResponseExemptThrottle(request)
+          }
         } else {
           sendResponseMaybeThrottle(request, requestThrottleMs =>
             new ProduceResponse(mergedResponseStatus.asJava, bandwidthThrottleTimeMs + requestThrottleMs))
@@ -459,7 +454,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         responseCallback = sendResponseCallback)
 
       // if the request is put into the purgatory, it will have a held reference and hence cannot be garbage collected;
-      // hence we clear its data here inorder to let GC re-claim its memory since it is already appended to log
+      // hence we clear its data here in order to let GC reclaim its memory since it is already appended to log
       produceRequest.clearPartitionRecords()
     }
   }
@@ -544,28 +539,23 @@ class KafkaApis(val requestChannel: RequestChannel,
 
       // fetch response callback invoked after any throttling
       def fetchResponseCallback(bandwidthThrottleTimeMs: Int) {
-        def createResponse(requestThrottleTimeMs: Int): RequestChannel.Response = {
+        def createResponse(requestThrottleTimeMs: Int): FetchResponse = {
           val convertedData = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData]
           fetchedPartitionData.asScala.foreach { case (tp, partitionData) =>
             convertedData.put(tp, convertedPartitionData(tp, partitionData))
           }
-          val response = new FetchResponse(convertedData, 0)
-          val responseSend = response.toSend(bandwidthThrottleTimeMs + requestThrottleTimeMs, request.connectionId, request.header)
-
-          trace(s"Sending fetch response to client $clientId of ${responseSend.size} bytes.")
+          val response = new FetchResponse(convertedData, bandwidthThrottleTimeMs + requestThrottleTimeMs)
           response.responseData.asScala.foreach { case (topicPartition, data) =>
             // record the bytes out metrics only when the response is being sent
             brokerTopicStats.updateBytesOut(topicPartition.topic, fetchRequest.isFromFollower, data.records.sizeInBytes)
           }
-
-          RequestChannel.Response(request, responseSend)
+          response
         }
 
         if (fetchRequest.isFromFollower)
-          sendResponseExemptThrottle(createResponse(0))
+          sendResponseExemptThrottle(request, createResponse(0))
         else
-          sendResponseMaybeThrottle(request, request.header.clientId, requestThrottleMs =>
-            requestChannel.sendResponse(createResponse(requestThrottleMs)))
+          sendResponseMaybeThrottle(request, requestThrottleMs => createResponse(requestThrottleMs))
       }
 
       // When this callback is triggered, the remote API call has completed.
@@ -912,12 +902,12 @@ class KafkaApis(val requestChannel: RequestChannel,
    */
   def handleTopicMetadataRequest(request: RequestChannel.Request) {
     val metadataRequest = request.body[MetadataRequest]
-    val requestVersion = request.header.apiVersion()
+    val requestVersion = request.header.apiVersion
 
     val topics =
       // Handle old metadata request logic. Version 0 has no way to specify "no topics".
       if (requestVersion == 0) {
-        if (metadataRequest.topics() == null || metadataRequest.topics().isEmpty)
+        if (metadataRequest.topics() == null || metadataRequest.topics.isEmpty)
           metadataCache.getAllTopics()
         else
           metadataRequest.topics.asScala.toSet
@@ -963,7 +953,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       if (authorizedTopics.isEmpty)
         Seq.empty[MetadataResponse.TopicMetadata]
       else
-        getTopicMetadata(metadataRequest.allowAutoTopicCreation, authorizedTopics, request.listenerName,
+        getTopicMetadata(metadataRequest.allowAutoTopicCreation, authorizedTopics, request.context.listenerName,
           errorUnavailableEndpoints)
 
     val completeTopicMetadata = topicMetadata ++ unauthorizedForCreateTopicMetadata ++ unauthorizedForDescribeTopicMetadata
@@ -976,7 +966,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     sendResponseMaybeThrottle(request, requestThrottleMs =>
       new MetadataResponse(
         requestThrottleMs,
-        brokers.map(_.getNode(request.listenerName)).asJava,
+        brokers.map(_.getNode(request.context.listenerName)).asJava,
         clusterId,
         metadataCache.getControllerId.getOrElse(MetadataResponse.NO_CONTROLLER_ID),
         completeTopicMetadata.asJava
@@ -1062,26 +1052,23 @@ class KafkaApis(val requestChannel: RequestChannel,
   def handleFindCoordinatorRequest(request: RequestChannel.Request) {
     val findCoordinatorRequest = request.body[FindCoordinatorRequest]
 
-    def sendErrorResponse(error: Errors): Unit =
-      sendResponseMaybeThrottle(request, requestThrottleMs => new FindCoordinatorResponse(error, Node.noNode))
-
     if (findCoordinatorRequest.coordinatorType == FindCoordinatorRequest.CoordinatorType.GROUP &&
         !authorize(request.session, Describe, new Resource(Group, findCoordinatorRequest.coordinatorKey)))
-      sendErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED)
+      sendErrorResponseMaybeThrottle(request, Errors.GROUP_AUTHORIZATION_FAILED.exception)
     else if (findCoordinatorRequest.coordinatorType == FindCoordinatorRequest.CoordinatorType.TRANSACTION &&
         !authorize(request.session, Describe, new Resource(TransactionalId, findCoordinatorRequest.coordinatorKey)))
-      sendErrorResponse(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED)
+      sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
     else {
       // get metadata (and create the topic if necessary)
       val (partition, topicMetadata) = findCoordinatorRequest.coordinatorType match {
         case FindCoordinatorRequest.CoordinatorType.GROUP =>
           val partition = groupCoordinator.partitionFor(findCoordinatorRequest.coordinatorKey)
-          val metadata = getOrCreateInternalTopic(GROUP_METADATA_TOPIC_NAME, request.listenerName)
+          val metadata = getOrCreateInternalTopic(GROUP_METADATA_TOPIC_NAME, request.context.listenerName)
           (partition, metadata)
 
         case FindCoordinatorRequest.CoordinatorType.TRANSACTION =>
           val partition = txnCoordinator.partitionFor(findCoordinatorRequest.coordinatorKey)
-          val metadata = getOrCreateInternalTopic(TRANSACTION_STATE_TOPIC_NAME, request.listenerName)
+          val metadata = getOrCreateInternalTopic(TRANSACTION_STATE_TOPIC_NAME, request.context.listenerName)
           (partition, metadata)
 
         case _ =>
@@ -1201,9 +1188,9 @@ class KafkaApis(val requestChannel: RequestChannel,
       sendResponseCallback(Array[Byte](), Errors.GROUP_AUTHORIZATION_FAILED)
     } else {
       groupCoordinator.handleSyncGroup(
-        syncGroupRequest.groupId(),
-        syncGroupRequest.generationId(),
-        syncGroupRequest.memberId(),
+        syncGroupRequest.groupId,
+        syncGroupRequest.generationId,
+        syncGroupRequest.memberId,
         syncGroupRequest.groupAssignment().asScala.mapValues(Utils.toArray),
         sendResponseCallback
       )
@@ -1230,9 +1217,9 @@ class KafkaApis(val requestChannel: RequestChannel,
     } else {
       // let the coordinator to handle heartbeat
       groupCoordinator.handleHeartbeat(
-        heartbeatRequest.groupId(),
-        heartbeatRequest.memberId(),
-        heartbeatRequest.groupGenerationId(),
+        heartbeatRequest.groupId,
+        heartbeatRequest.memberId,
+        heartbeatRequest.groupGenerationId,
         sendResponseCallback)
     }
   }
@@ -1257,8 +1244,8 @@ class KafkaApis(val requestChannel: RequestChannel,
     } else {
       // let the coordinator to handle leave-group
       groupCoordinator.handleLeaveGroup(
-        leaveGroupRequest.groupId(),
-        leaveGroupRequest.memberId(),
+        leaveGroupRequest.groupId,
+        leaveGroupRequest.memberId,
         sendResponseCallback)
     }
   }
@@ -1274,14 +1261,14 @@ class KafkaApis(val requestChannel: RequestChannel,
     // If this is considered to leak information about the broker version a workaround is to use SSL
     // with client authentication which is performed at an earlier stage of the connection where the
     // ApiVersionRequest is not available.
-    def sendResponseCallback(requestThrottleMs: Int) {
-      val responseSend =
-        if (Protocol.apiVersionSupported(ApiKeys.API_VERSIONS.id, request.header.apiVersion))
-          ApiVersionsResponse.apiVersionsResponse(requestThrottleMs, config.interBrokerProtocolVersion.messageFormatVersion).toSend(request.connectionId, request.header)
-        else ApiVersionsResponse.unsupportedVersionSend(request.connectionId, request.header)
-      requestChannel.sendResponse(RequestChannel.Response(request, responseSend))
-    }
-    sendResponseMaybeThrottle(request, request.header.clientId, sendResponseCallback)
+    def createResponseCallback(requestThrottleMs: Int): ApiVersionsResponse = {
+      val apiVersionRequest = request.body[ApiVersionsRequest]
+      if (apiVersionRequest.hasUnsupportedRequestVersion)
+        apiVersionRequest.getErrorResponse(requestThrottleMs, Errors.UNSUPPORTED_VERSION.exception)
+      else
+        ApiVersionsResponse.apiVersionsResponse(requestThrottleMs, config.interBrokerProtocolVersion.messageFormatVersion)
+    }
+    sendResponseMaybeThrottle(request, createResponseCallback)
   }
 
   def handleCreateTopicsRequest(request: RequestChannel.Request) {
@@ -1427,17 +1414,13 @@ class KafkaApis(val requestChannel: RequestChannel,
     val initProducerIdRequest = request.body[InitProducerIdRequest]
     val transactionalId = initProducerIdRequest.transactionalId
 
-    def sendErrorResponse(error: Errors): Unit = {
-      sendResponseMaybeThrottle(request, requestThrottleMs => new InitProducerIdResponse(requestThrottleMs, error))
-    }
-
     if (transactionalId != null) {
       if (!authorize(request.session, Write, new Resource(TransactionalId, transactionalId))) {
-        sendErrorResponse(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED)
+        sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
         return
       }
     } else if (!authorize(request.session, IdempotentWrite, Resource.ClusterResource)) {
-      sendErrorResponse(Errors.CLUSTER_AUTHORIZATION_FAILED)
+      sendErrorResponseMaybeThrottle(request, Errors.CLUSTER_AUTHORIZATION_FAILED.exception)
       return
     }
 
@@ -1486,7 +1469,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     val numAppends = new AtomicInteger(markers.size)
 
     if (numAppends.get == 0) {
-      sendResponseExemptThrottle(RequestChannel.Response(request, new WriteTxnMarkersResponse(errors)))
+      sendResponseExemptThrottle(request, new WriteTxnMarkersResponse(errors))
       return
     }
 
@@ -1525,7 +1508,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
 
       if (numAppends.decrementAndGet() == 0)
-        sendResponseExemptThrottle(RequestChannel.Response(request, new WriteTxnMarkersResponse(errors)))
+        sendResponseExemptThrottle(request, new WriteTxnMarkersResponse(errors))
     }
 
     // TODO: The current append API makes doing separate writes per producerId a little easier, but it would
@@ -1579,7 +1562,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     // No log appends were written as all partitions had incorrect log format
     // so we need to send the error response
     if (skippedMarkers == markers.size())
-      sendResponseExemptThrottle(request, () => sendResponse(request, new WriteTxnMarkersResponse(errors)))
+      sendResponseExemptThrottle(request, new WriteTxnMarkersResponse(errors))
   }
 
   def ensureInterBrokerVersion(version: ApiVersion): Unit = {
@@ -1679,17 +1662,12 @@ class KafkaApis(val requestChannel: RequestChannel,
     val header = request.header
     val txnOffsetCommitRequest = request.body[TxnOffsetCommitRequest]
 
-    def sendErrorResponse(error: Errors): Unit = {
-      sendResponseMaybeThrottle(request, requestThrottleMs =>
-        txnOffsetCommitRequest.getErrorResponse(requestThrottleMs, error.exception))
-    }
-
     // authorize for the transactionalId and the consumer group. Note that we skip producerId authorization
     // since it is implied by transactionalId authorization
     if (!authorize(request.session, Write, new Resource(TransactionalId, txnOffsetCommitRequest.transactionalId)))
-      sendErrorResponse(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED)
+      sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
     else if (!authorize(request.session, Read, new Resource(Group, txnOffsetCommitRequest.consumerGroupId)))
-      sendErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED)
+      sendErrorResponseMaybeThrottle(request, Errors.GROUP_AUTHORIZATION_FAILED.exception)
     else {
       val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = txnOffsetCommitRequest.offsets.asScala.toMap.partition {
         case (topicPartition, _) =>
@@ -1873,30 +1851,8 @@ class KafkaApis(val requestChannel: RequestChannel,
     val requestInfo = offsetForLeaderEpoch.epochsByTopicPartition()
     authorizeClusterAction(request)
 
-    val responseBody = new OffsetsForLeaderEpochResponse(
-      replicaManager.lastOffsetForLeaderEpoch(requestInfo.asScala).asJava
-    )
-    sendResponseExemptThrottle(RequestChannel.Response(request, responseBody))
-  }
-
-  private def handleError(request: RequestChannel.Request, e: Throwable) {
-    val mayThrottle = e.isInstanceOf[ClusterAuthorizationException] || !ApiKeys.forId(request.header.apiKey).clusterAction
-
-    def createResponse(requestThrottleMs: Int): RequestChannel.Response = {
-      val response = request.body[AbstractRequest].getErrorResponse(requestThrottleMs, e)
-      /* If request doesn't have a default error response, we just close the connection.
-         For example, when produce request has acks set to 0 */
-      if (response == null)
-        new RequestChannel.Response(request, None, RequestChannel.CloseConnectionAction)
-      else RequestChannel.Response(request, response)
-    }
-    error("Error when handling request %s".format(request.body[AbstractRequest]), e)
-    if (mayThrottle)
-      sendResponseMaybeThrottle(request, request.header.clientId, { requestThrottleMs =>
-        requestChannel.sendResponse(createResponse(requestThrottleMs))
-      })
-    else
-      sendResponseExemptThrottle(createResponse(0))
+    val lastOffsetForLeaderEpoch = replicaManager.lastOffsetForLeaderEpoch(requestInfo.asScala).asJava
+    sendResponseExemptThrottle(request, new OffsetsForLeaderEpochResponse(lastOffsetForLeaderEpoch))
   }
 
   def handleAlterConfigsRequest(request: RequestChannel.Request): Unit = {
@@ -1969,34 +1925,61 @@ class KafkaApis(val requestChannel: RequestChannel,
       throw new ClusterAuthorizationException(s"Request $request is not authorized.")
   }
 
-  private def sendResponseMaybeThrottle(request: RequestChannel.Request, createResponse: Int => AbstractResponse) {
-    sendResponseMaybeThrottle(request, request.header.clientId, { requestThrottleMs =>
-      sendResponse(request, createResponse(requestThrottleMs))
-    })
+  private def handleError(request: RequestChannel.Request, e: Throwable) {
+    val mayThrottle = e.isInstanceOf[ClusterAuthorizationException] || !request.header.apiKey.clusterAction
+    error("Error when handling request %s".format(request.body[AbstractRequest]), e)
+    if (mayThrottle)
+      sendErrorResponseMaybeThrottle(request, e)
+    else
+      sendErrorResponseExemptThrottle(request, e)
   }
 
-  private def sendResponseMaybeThrottle(request: RequestChannel.Request, clientId: String, sendResponseCallback: Int => Unit) {
+  private def sendResponseMaybeThrottle(request: RequestChannel.Request, createResponse: Int => AbstractResponse): Unit = {
+    quotas.request.maybeRecordAndThrottle(request,
+      throttleTimeMs => sendResponse(request, Some(createResponse(throttleTimeMs))))
+  }
 
-    if (request.apiRemoteCompleteTimeNanos == -1) {
-      // When this callback is triggered, the remote API call has completed
-      request.apiRemoteCompleteTimeNanos = time.nanoseconds
-    }
-    quotas.request.maybeRecordAndThrottle(request.session.sanitizedUser, clientId,
-        request.requestThreadTimeNanos, sendResponseCallback,
-        callback => request.recordNetworkThreadTimeCallback = Some(callback))
+  private def sendErrorResponseMaybeThrottle(request: RequestChannel.Request, error: Throwable) {
+    quotas.request.maybeRecordAndThrottle(request, sendErrorOrCloseConnection(request, error))
+  }
+
+  private def sendResponseExemptThrottle(request: RequestChannel.Request, response: AbstractResponse): Unit = {
+    quotas.request.maybeRecordExempt(request)
+    sendResponse(request, Some(response))
   }
 
-  private def sendResponseExemptThrottle(response: RequestChannel.Response) {
-    sendResponseExemptThrottle(response.request, () => requestChannel.sendResponse(response))
+  private def sendErrorResponseExemptThrottle(request: RequestChannel.Request, error: Throwable): Unit = {
+    quotas.request.maybeRecordExempt(request)
+    sendErrorOrCloseConnection(request, error)(throttleMs = 0)
+  }
+
+  private def sendErrorOrCloseConnection(request: RequestChannel.Request, error: Throwable)(throttleMs: Int): Unit = {
+    val response = request.body[AbstractRequest].getErrorResponse(throttleMs, error)
+    if (response == null)
+      closeConnection(request)
+    else
+      sendResponse(request, Some(response))
   }
 
-  private def sendResponseExemptThrottle(request: RequestChannel.Request, sendResponseCallback: () => Unit) {
-    quotas.request.maybeRecordExempt(request.requestThreadTimeNanos,
-        callback => request.recordNetworkThreadTimeCallback = Some(callback))
-    sendResponseCallback()
+  private def sendNoOpResponseExemptThrottle(request: RequestChannel.Request): Unit = {
+    quotas.request.maybeRecordExempt(request)
+    sendResponse(request, None)
   }
 
-  private def sendResponse(request: RequestChannel.Request, response: AbstractResponse) {
-    requestChannel.sendResponse(RequestChannel.Response(request, response))
+  private def closeConnection(request: RequestChannel.Request): Unit = {
+    // This case is used when the request handler has encountered an error, but the client
+    // does not expect a response (e.g. when produce request has acks set to 0)
+    requestChannel.sendResponse(new RequestChannel.Response(request, None, CloseConnectionAction))
   }
+
+  private def sendResponse(request: RequestChannel.Request, responseOpt: Option[AbstractResponse]): Unit = {
+    responseOpt match {
+      case Some(response) =>
+        val responseSend = request.context.buildResponse(response)
+        requestChannel.sendResponse(new RequestChannel.Response(request, Some(responseSend), SendAction))
+      case None =>
+        requestChannel.sendResponse(new RequestChannel.Response(request, None, NoOpAction))
+    }
+  }
+
 }
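
The scattered per-handler sendErrorResponse closures collapse into one curried helper, sendErrorOrCloseConnection(request, error)(throttleMs): throttled paths supply throttleMs later through the quota callback, while exempt paths pass 0 immediately. A sketch of the currying idiom with stand-in types (illustrative, not the broker's classes):

    sealed trait Outcome
    case object CloseConnection extends Outcome
    final case class Send(body: String) extends Outcome

    object ResponseDemo extends App {
      // Returns null when a request type has no error response (e.g. acks=0 produce).
      def errorResponse(throttleMs: Int, error: Throwable): String =
        if (error.getMessage == "acks0") null else s"error=${error.getMessage},throttle=$throttleMs"

      // Curried: the throttle time can be supplied later by quota machinery.
      def sendErrorOrClose(error: Throwable)(throttleMs: Int): Outcome = {
        val resp = errorResponse(throttleMs, error)
        if (resp == null) CloseConnection else Send(resp)
      }

      val deferred: Int => Outcome = sendErrorOrClose(new Exception("auth"))
      println(deferred(42))                                // Send(error=auth,throttle=42)
      println(sendErrorOrClose(new Exception("acks0"))(0)) // CloseConnection
    }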

http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
index b3f98d1..356ccae 100755
--- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -41,42 +41,43 @@ class KafkaRequestHandler(id: Int,
 
   def run() {
     while(true) {
-      var req: RequestChannel.Request = null
-      try {
-        while (req == null) {
-          // We use a single meter for aggregate idle percentage for the thread pool.
-          // Since meter is calculated as total_recorded_value / time_window and
-          // time_window is independent of the number of threads, each recorded idle
-          // time should be discounted by # threads.
-          val startSelectTime = time.nanoseconds
-          req = requestChannel.receiveRequest(300)
-          val endTime = time.nanoseconds
-          if (req != null)
-            req.requestDequeueTimeNanos = endTime
-          val idleTime = endTime - startSelectTime
-          aggregateIdleMeter.mark(idleTime / totalHandlerThreads)
-        }
-
-        if (req eq RequestChannel.AllDone) {
+      // We use a single meter for aggregate idle percentage for the thread pool.
+      // Since meter is calculated as total_recorded_value / time_window and
+      // time_window is independent of the number of threads, each recorded idle
+      // time should be discounted by # threads.
+      val startSelectTime = time.nanoseconds
+
+      val req = requestChannel.receiveRequest(300)
+      val endTime = time.nanoseconds
+      val idleTime = endTime - startSelectTime
+      aggregateIdleMeter.mark(idleTime / totalHandlerThreads)
+
+      req match {
+        case RequestChannel.ShutdownRequest =>
           debug("Kafka request handler %d on broker %d received shut down command".format(id, brokerId))
           latch.countDown()
           return
-        }
-        trace("Kafka request handler %d on broker %d handling request %s".format(id, brokerId, req))
-        apis.handle(req)
-      } catch {
-        case e: FatalExitError =>
-          latch.countDown()
-          Exit.exit(e.statusCode)
-        case e: Throwable => error("Exception when handling request", e)
-      } finally {
-        if (req != null)
-          req.dispose()
+
+        case request: RequestChannel.Request =>
+          try {
+            request.requestDequeueTimeNanos = endTime
+            trace("Kafka request handler %d on broker %d handling request %s".format(id, brokerId, request))
+            apis.handle(request)
+          } catch {
+            case e: FatalExitError =>
+              latch.countDown()
+              Exit.exit(e.statusCode)
+            case e: Throwable => error("Exception when handling request", e)
+          } finally {
+              request.releaseBuffer()
+          }
+
+        case null => // continue
       }
     }
   }
 
-  def initiateShutdown(): Unit = requestChannel.sendRequest(RequestChannel.AllDone)
+  def initiateShutdown(): Unit = requestChannel.sendShutdownRequest()
 
   def awaitShutdown(): Unit = latch.await()
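
The shutdown handshake pairs the queue sentinel with a countdown latch: initiateShutdown enqueues ShutdownRequest, the handler counts the latch down, and awaitShutdown blocks on it. A compact sketch of that handshake (names illustrative):

    import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue}

    object ShutdownHandshakeDemo extends App {
      val queue = new LinkedBlockingQueue[AnyRef]()
      val Shutdown: AnyRef = new Object       // stands in for ShutdownRequest
      val latch = new CountDownLatch(1)

      val worker = new Thread(() => {
        var live = true
        while (live)
          if (queue.take() eq Shutdown) { latch.countDown(); live = false }
      })
      worker.start()

      queue.put(Shutdown)   // initiateShutdown()
      latch.await()         // awaitShutdown()
      println("worker stopped cleanly")
    }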
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/core/src/main/scala/kafka/utils/Logging.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Logging.scala b/core/src/main/scala/kafka/utils/Logging.scala
index 2df5878..f2cd4e9 100755
--- a/core/src/main/scala/kafka/utils/Logging.scala
+++ b/core/src/main/scala/kafka/utils/Logging.scala
@@ -49,6 +49,8 @@ trait Logging {
 
   def isDebugEnabled: Boolean = logger.isDebugEnabled
 
+  def isTraceEnabled: Boolean = logger.isTraceEnabled
+
   def debug(msg: => String): Unit = {
     if (logger.isDebugEnabled())
       logger.debug(msgWithLogIdent(msg))
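
isTraceEnabled exists so callers such as the new RequestChannel.sendResponse can skip building an expensive log message when tracing is off. The guard-before-build idiom in isolation (stand-in values, for illustration):

    object TraceGuardDemo extends App {
      def isTraceEnabled: Boolean = false     // stand-in for logger.isTraceEnabled

      def expensiveDescription(): String = {
        Thread.sleep(10)                      // simulate costly string building
        "large response summary"
      }

      // The message is only constructed when trace logging is actually on.
      if (isTraceEnabled)
        println(s"TRACE: sending ${expensiveDescription()}")
    }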

http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/core/src/test/scala/other/kafka/TestOffsetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestOffsetManager.scala b/core/src/test/scala/other/kafka/TestOffsetManager.scala
index 69531d4..c8f9397 100644
--- a/core/src/test/scala/other/kafka/TestOffsetManager.scala
+++ b/core/src/test/scala/other/kafka/TestOffsetManager.scala
@@ -263,8 +263,7 @@ object TestOffsetManager {
       }
 
       fetchThread = new FetchThread(threadCount, fetchIntervalMs, zkUtils)
-
-      val statsThread = new StatsThread(reportingIntervalMs, commitThreads, fetchThread)
+      statsThread = new StatsThread(reportingIntervalMs, commitThreads, fetchThread)
 
       Runtime.getRuntime.addShutdownHook(new Thread() {
         override def run() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/core/src/test/scala/unit/kafka/admin/AdminRackAwareTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AdminRackAwareTest.scala b/core/src/test/scala/unit/kafka/admin/AdminRackAwareTest.scala
index 27ff4d4..ef85c6d 100644
--- a/core/src/test/scala/unit/kafka/admin/AdminRackAwareTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminRackAwareTest.scala
@@ -16,12 +16,11 @@
  */
 package kafka.admin
 
-import kafka.utils.{Logging, TestUtils}
-import kafka.zk.ZooKeeperTestHarness
+import kafka.utils.Logging
 import org.junit.Assert._
 import org.junit.Test
 
-import scala.collection.{Map, Seq}
+import scala.collection.Map
 
 class AdminRackAwareTest extends RackAwareTest with Logging {
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala b/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
index effcd92..6594b6e 100644
--- a/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
@@ -14,17 +14,12 @@ package kafka.admin
 
 import java.io.{BufferedWriter, File, FileWriter}
 import java.text.{ParseException, SimpleDateFormat}
-import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
-import java.util.{Calendar, Collections, Date, Properties}
+import java.util.{Calendar, Date, Properties}
 
 import kafka.admin.ConsumerGroupCommand.{ConsumerGroupCommandOptions, KafkaConsumerGroupService}
-import kafka.admin.{AdminUtils, ConsumerGroupCommand}
 import kafka.integration.KafkaServerTestHarness
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
-import org.apache.kafka.clients.consumer.KafkaConsumer
-import org.apache.kafka.common.errors.WakeupException
-import org.apache.kafka.common.serialization.StringDeserializer
 import org.junit.{After, Before, Test}
 
 import scala.collection.mutable.ArrayBuffer
@@ -330,7 +325,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
     AdminUtils.deleteTopic(zkUtils, topic1)
   }
 
-  private def produceConsumeAndShutdown(consumerGroupCommand: KafkaConsumerGroupService, numConsumers: Int = 1, topic: String, totalMessages: Int) {
+  private def produceConsumeAndShutdown(consumerGroupCommand: KafkaConsumerGroupService, numConsumers: Int, topic: String, totalMessages: Int) {
     TestUtils.produceMessages(servers, topic, totalMessages, acks = 1, 100 * 1000)
     val executor = createConsumerGroupExecutor(brokerList, numConsumers, group, topic)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
index e7fbb83..0bc1c9f 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
@@ -28,7 +28,7 @@ import org.junit.Assert._
 import org.junit.Test
 import com.yammer.metrics.Metrics
 import kafka.common.RequestAndCompletionHandler
-import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -296,7 +296,8 @@ class TransactionMarkerChannelManagerTest {
 
     val response = new WriteTxnMarkersResponse(createPidErrorMap(Errors.NONE))
     for (requestAndHandler <- requestAndHandlers) {
-      requestAndHandler.handler.onComplete(new ClientResponse(new RequestHeader(0, 0, "client", 1), null, null, 0, 0, false, null, response))
+      requestAndHandler.handler.onComplete(new ClientResponse(new RequestHeader(ApiKeys.PRODUCE, 0, "client", 1),
+        null, null, 0, 0, false, null, response))
     }
 
     EasyMock.verify(txnStateManager)
@@ -344,7 +345,8 @@ class TransactionMarkerChannelManagerTest {
 
     val response = new WriteTxnMarkersResponse(createPidErrorMap(Errors.NONE))
     for (requestAndHandler <- requestAndHandlers) {
-      requestAndHandler.handler.onComplete(new ClientResponse(new 
RequestHeader(0, 0, "client", 1), null, null, 0, 0, false, null, response))
+      requestAndHandler.handler.onComplete(new ClientResponse(new 
RequestHeader(ApiKeys.PRODUCE, 0, "client", 1),
+        null, null, 0, 0, false, null, response))
     }
 
     EasyMock.verify(txnStateManager)
@@ -398,7 +400,8 @@ class TransactionMarkerChannelManagerTest {
 
     val response = new WriteTxnMarkersResponse(createPidErrorMap(Errors.NONE))
     for (requestAndHandler <- requestAndHandlers) {
-      requestAndHandler.handler.onComplete(new ClientResponse(new 
RequestHeader(0, 0, "client", 1), null, null, 0, 0, false, null, response))
+      requestAndHandler.handler.onComplete(new ClientResponse(new 
RequestHeader(ApiKeys.PRODUCE, 0, "client", 1),
+        null, null, 0, 0, false, null, response))
     }
 
     // call this again so that append log will be retried
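
The recurring edit in this file swaps a raw Short for an ApiKeys constant when building a RequestHeader. As a minimal sketch of what that buys, not part of the commit itself (the wrapper object is hypothetical; it assumes the kafka-clients classes from this revision on the classpath):

    import org.apache.kafka.common.protocol.ApiKeys
    import org.apache.kafka.common.requests.RequestHeader

    object HeaderSketch extends App {
      // Previously any Short compiled, e.g. new RequestHeader(0, 0, "client", 1),
      // where 0 happened to be ApiKeys.PRODUCE.id. The enum-typed constructor
      // names the intended API and makes unknown ids unrepresentable.
      val header = new RequestHeader(ApiKeys.PRODUCE, 0, "client", 1)
      println(header)
    }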

http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala
index 6323d15..41ec159 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala
@@ -20,7 +20,7 @@ import java.{lang, util}
 
 import org.apache.kafka.clients.ClientResponse
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.{RequestHeader, TransactionResult, WriteTxnMarkersRequest, WriteTxnMarkersResponse}
 import org.apache.kafka.common.utils.Utils
 import org.easymock.{EasyMock, IAnswer}
@@ -71,7 +71,8 @@ class TransactionMarkerRequestCompletionHandlerTest {
       producerId, producerEpoch, txnResult, coordinatorEpoch, Set[TopicPartition](topicPartition)))
     EasyMock.replay(markerChannelManager)
 
-    handler.onComplete(new ClientResponse(new RequestHeader(0, 0, "client", 1), null, null, 0, 0, true, null, null))
+    handler.onComplete(new ClientResponse(new RequestHeader(ApiKeys.PRODUCE, 0, "client", 1),
+      null, null, 0, 0, true, null, null))
 
     EasyMock.verify(markerChannelManager)
   }
@@ -84,7 +85,8 @@ class TransactionMarkerRequestCompletionHandlerTest {
     val response = new WriteTxnMarkersResponse(new java.util.HashMap[java.lang.Long, java.util.Map[TopicPartition, Errors]]())
 
     try {
-      handler.onComplete(new ClientResponse(new RequestHeader(0, 0, "client", 1), null, null, 0, 0, false, null, response))
+      handler.onComplete(new ClientResponse(new RequestHeader(ApiKeys.PRODUCE, 0, "client", 1),
+        null, null, 0, 0, false, null, response))
       fail("should have thrown illegal argument exception")
     } catch {
       case _: IllegalStateException => // ok
@@ -200,8 +202,9 @@ class TransactionMarkerRequestCompletionHandlerTest {
       producerId, producerEpoch, txnResult, coordinatorEpoch, Set[TopicPartition](topicPartition)))
     EasyMock.replay(markerChannelManager)
 
-    val response = new WriteTxnMarkersResponse(createPidErrorMap(error))
-    handler.onComplete(new ClientResponse(new RequestHeader(0, 0, "client", 1), null, null, 0, 0, false, null, response))
+    val response = new WriteTxnMarkersResponse(createProducerIdErrorMap(error))
+    handler.onComplete(new ClientResponse(new RequestHeader(ApiKeys.PRODUCE, 0, "client", 1),
+      null, null, 0, 0, false, null, response))
 
     assertEquals(txnMetadata.topicPartitions, mutable.Set[TopicPartition](topicPartition))
     EasyMock.verify(markerChannelManager)
@@ -210,9 +213,10 @@ class TransactionMarkerRequestCompletionHandlerTest {
   private def verifyThrowIllegalStateExceptionOnError(error: Errors) = {
     mockCache()
 
-    val response = new WriteTxnMarkersResponse(createPidErrorMap(error))
+    val response = new WriteTxnMarkersResponse(createProducerIdErrorMap(error))
     try {
-      handler.onComplete(new ClientResponse(new RequestHeader(0, 0, "client", 1), null, null, 0, 0, false, null, response))
+      handler.onComplete(new ClientResponse(new RequestHeader(ApiKeys.PRODUCE, 0, "client", 1),
+        null, null, 0, 0, false, null, response))
       fail("should have thrown illegal state exception")
     } catch {
       case _: IllegalStateException => // ok
@@ -231,8 +235,9 @@ class TransactionMarkerRequestCompletionHandlerTest {
       .once()
     EasyMock.replay(markerChannelManager)
 
-    val response = new WriteTxnMarkersResponse(createPidErrorMap(error))
-    handler.onComplete(new ClientResponse(new RequestHeader(0, 0, "client", 1), null, null, 0, 0, false, null, response))
+    val response = new WriteTxnMarkersResponse(createProducerIdErrorMap(error))
+    handler.onComplete(new ClientResponse(new RequestHeader(ApiKeys.PRODUCE, 0, "client", 1),
+      null, null, 0, 0, false, null, response))
 
     assertTrue(txnMetadata.topicPartitions.isEmpty)
     assertTrue(completed)
@@ -250,14 +255,15 @@ class TransactionMarkerRequestCompletionHandlerTest {
       .once()
     EasyMock.replay(markerChannelManager)
 
-    val response = new WriteTxnMarkersResponse(createPidErrorMap(error))
-    handler.onComplete(new ClientResponse(new RequestHeader(0, 0, "client", 1), null, null, 0, 0, false, null, response))
+    val response = new WriteTxnMarkersResponse(createProducerIdErrorMap(error))
+    handler.onComplete(new ClientResponse(new RequestHeader(ApiKeys.PRODUCE, 0, "client", 1),
+      null, null, 0, 0, false, null, response))
 
     assertTrue(removed)
   }
 
 
-  private def createPidErrorMap(errors: Errors) = {
+  private def createProducerIdErrorMap(errors: Errors) = {
     val pidMap = new java.util.HashMap[lang.Long, util.Map[TopicPartition, Errors]]()
     val errorsMap = new util.HashMap[TopicPartition, Errors]()
     errorsMap.put(topicPartition, errors)
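
For reference, the renamed helper assembles the nested map that WriteTxnMarkersResponse takes: producerId to a per-partition error map. A standalone sketch, not part of the commit (hypothetical wrapper object; this revision's kafka-clients on the classpath; the errors accessor mirrors how the completion handler reads the response):

    import java.{lang, util}
    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.protocol.Errors
    import org.apache.kafka.common.requests.WriteTxnMarkersResponse

    object ErrorMapSketch extends App {
      val errorsByPartition = new util.HashMap[TopicPartition, Errors]()
      errorsByPartition.put(new TopicPartition("topic", 0), Errors.NONE)
      // Outer map: producerId -> (partition -> error), as the response constructor expects.
      val errorsByProducerId = new util.HashMap[lang.Long, util.Map[TopicPartition, Errors]]()
      errorsByProducerId.put(1L, errorsByPartition)
      val response = new WriteTxnMarkersResponse(errorsByProducerId)
      println(response.errors(1L))
    }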

http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index ceeaffc..c005c72 100755
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -22,7 +22,7 @@ import java.util.Arrays
 
 import kafka.common.KafkaException
 import kafka.server._
-import kafka.utils.{CoreUtils, TestUtils}
+import kafka.utils.TestUtils
 import kafka.zk.ZooKeeperTestHarness
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.kafka.common.security.auth.KafkaPrincipal

http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
index bc0b81a..9c4d08f 100755
--- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
@@ -24,12 +24,11 @@ import kafka.api.{FetchRequest, FetchRequestBuilder, PartitionFetchInfo}
 import kafka.server.{KafkaConfig, KafkaRequestHandler}
 import kafka.producer.{KeyedMessage, Producer}
 import org.apache.log4j.{Level, Logger}
-import kafka.zk.ZooKeeperTestHarness
 import org.junit.Test
 
 import scala.collection._
 import kafka.common.{ErrorMapping, OffsetOutOfRangeException, TopicAndPartition, UnknownTopicOrPartitionException}
-import kafka.utils.{CoreUtils, StaticPartitioner, TestUtils}
+import kafka.utils.{StaticPartitioner, TestUtils}
 import kafka.serializer.StringEncoder
 import java.util.Properties
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
index 66103cc..6e9e8ff 100644
--- a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
@@ -171,7 +171,7 @@ class TopicMetadataTest extends ZooKeeperTestHarness {
       2000, 0).topicsMetadata
     assertEquals(Errors.NONE, topicsMetadata.head.error)
     assertEquals(Errors.NONE, topicsMetadata.head.partitionsMetadata.head.error)
-    var partitionMetadata = topicsMetadata.head.partitionsMetadata
+    val partitionMetadata = topicsMetadata.head.partitionsMetadata
     assertEquals("Expecting metadata for 1 partition", 1, partitionMetadata.size)
     assertEquals("Expecting partition id to be 0", 0, partitionMetadata.head.partitionId)
     assertEquals(1, partitionMetadata.head.replicas.size)

http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
index 2a0525b..ede4ff7 100644
--- a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
@@ -27,7 +27,6 @@ import kafka.javaapi.producer.Producer
 import kafka.utils.IntEncoder
 import kafka.utils.{Logging, TestUtils}
 import kafka.consumer.{KafkaStream, ConsumerConfig}
-import kafka.zk.ZooKeeperTestHarness
 import kafka.common.MessageStreamsExistException
 import org.junit.Test
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
index 9a324aa..bb41380 100644
--- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
@@ -20,7 +20,7 @@ package kafka.log
 import java.io.File
 import java.nio.ByteBuffer
 import java.nio.channels.FileChannel
-import java.nio.file.{OpenOption, StandardOpenOption}
+import java.nio.file.StandardOpenOption
 
 import kafka.server.LogOffsetMetadata
 import kafka.utils.TestUtils

http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index e59ce58..6fbb0c9 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -20,12 +20,12 @@ package kafka.network
 import java.io._
 import java.net._
 import java.nio.ByteBuffer
-import java.nio.channels.SocketChannel
 import java.util.{HashMap, Random}
 import javax.net.ssl._
 
 import com.yammer.metrics.core.Gauge
 import com.yammer.metrics.{Metrics => YammerMetrics}
+import kafka.network.RequestChannel.SendAction
 import kafka.security.CredentialProvider
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
@@ -37,7 +37,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol}
 import org.apache.kafka.common.record.{MemoryRecords, RecordBatch}
 import org.apache.kafka.common.requests.{AbstractRequest, ProduceRequest, RequestHeader}
 import org.apache.kafka.common.security.auth.KafkaPrincipal
-import org.apache.kafka.common.utils.{Time, MockTime}
+import org.apache.kafka.common.utils.{MockTime, Time}
 import org.junit.Assert._
 import org.junit._
 import org.scalatest.junit.JUnitSuite
@@ -89,19 +89,25 @@ class SocketServerTest extends JUnitSuite {
     response
   }
 
+  private def receiveRequest(channel: RequestChannel, timeout: Long = 2000L): RequestChannel.Request = {
+    channel.receiveRequest(timeout) match {
+      case request: RequestChannel.Request => request
+      case RequestChannel.ShutdownRequest => fail("Unexpected shutdown received")
+      case null => fail("receiveRequest timed out")
+    }
+  }
+
   /* A simple request handler that just echos back the response */
   def processRequest(channel: RequestChannel) {
-    val request = channel.receiveRequest(2000)
-    assertNotNull("receiveRequest timed out", request)
-    processRequest(channel, request)
+    processRequest(channel, receiveRequest(channel))
   }
 
   def processRequest(channel: RequestChannel, request: RequestChannel.Request) {
     val byteBuffer = request.body[AbstractRequest].serialize(request.header)
     byteBuffer.rewind()
 
-    val send = new NetworkSend(request.connectionId, byteBuffer)
-    channel.sendResponse(RequestChannel.Response(request, send))
+    val send = new NetworkSend(request.context.connectionId, byteBuffer)
+    channel.sendResponse(new RequestChannel.Response(request, Some(send), SendAction))
   }
 
   def connect(s: SocketServer = server, protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT) = {
@@ -119,7 +125,6 @@ class SocketServerTest extends JUnitSuite {
   }
 
   private def producerRequestBytes: Array[Byte] = {
-    val apiKey: Short = 0
     val correlationId = -1
     val clientId = ""
     val ackTimeoutMs = 10000
@@ -127,7 +132,7 @@ class SocketServerTest extends JUnitSuite {
 
     val emptyRequest = new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, ack, ackTimeoutMs,
       new HashMap[TopicPartition, MemoryRecords]()).build()
-    val emptyHeader = new RequestHeader(apiKey, emptyRequest.version, clientId, correlationId)
+    val emptyHeader = new RequestHeader(ApiKeys.PRODUCE, emptyRequest.version, clientId, correlationId)
     val byteBuffer = emptyRequest.serialize(emptyHeader)
     byteBuffer.rewind()
 
@@ -180,7 +185,7 @@ class SocketServerTest extends JUnitSuite {
       sendRequest(plainSocket, serializedBytes)
     plainSocket.close()
     for (_ <- 0 until 10) {
-      val request = server.requestChannel.receiveRequest(2000)
+      val request = receiveRequest(server.requestChannel)
       assertNotNull("receiveRequest timed out", request)
       server.requestChannel.sendResponse(new RequestChannel.Response(request, None, RequestChannel.NoOpAction))
     }
@@ -193,10 +198,10 @@ class SocketServerTest extends JUnitSuite {
 
     val requests = sockets.map{socket =>
       sendRequest(socket, serializedBytes)
-      server.requestChannel.receiveRequest(2000)
+      receiveRequest(server.requestChannel)
     }
     requests.zipWithIndex.foreach { case (request, i) =>
-      val index = request.connectionId.split("-").last
+      val index = request.context.connectionId.split("-").last
       assertEquals(i.toString, index)
     }
 
@@ -212,9 +217,9 @@ class SocketServerTest extends JUnitSuite {
     val overrideServer = new SocketServer(KafkaConfig.fromProps(props), serverMetrics, time, credentialProvider)
 
     def openChannel(request: RequestChannel.Request): Option[KafkaChannel] =
-      overrideServer.processor(request.processor).channel(request.connectionId)
+      overrideServer.processor(request.processor).channel(request.context.connectionId)
     def openOrClosingChannel(request: RequestChannel.Request): Option[KafkaChannel] =
-      overrideServer.processor(request.processor).openOrClosingChannel(request.connectionId)
+      overrideServer.processor(request.processor).openOrClosingChannel(request.context.connectionId)
 
     try {
       overrideServer.startup()
@@ -223,7 +228,7 @@ class SocketServerTest extends JUnitSuite {
       // Connection with no staged receives
       val socket1 = connect(overrideServer, protocol = SecurityProtocol.PLAINTEXT)
       sendRequest(socket1, serializedBytes)
-      val request1 = overrideServer.requestChannel.receiveRequest(2000)
+      val request1 = receiveRequest(overrideServer.requestChannel)
       assertTrue("Channel not open", openChannel(request1).nonEmpty)
       assertEquals(openChannel(request1), openOrClosingChannel(request1))
 
@@ -317,10 +322,10 @@ class SocketServerTest extends JUnitSuite {
     def sendTwoRequestsReceiveOne(): RequestChannel.Request = {
       sendRequest(socket, requestBytes, flush = false)
       sendRequest(socket, requestBytes, flush = true)
-      server.requestChannel.receiveRequest(2000)
+      receiveRequest(server.requestChannel)
     }
     val (request, hasStagedReceives) = TestUtils.computeUntilTrue(sendTwoRequestsReceiveOne()) { req =>
-      val connectionId = req.connectionId
+      val connectionId = req.context.connectionId
       val hasStagedReceives = server.processor(0).numStagedReceives(connectionId) > 0
       if (!hasStagedReceives) {
         processRequest(server.requestChannel, req)
@@ -347,7 +352,7 @@ class SocketServerTest extends JUnitSuite {
     // the following sleep is necessary to reliably detect the connection close when we send data below
     Thread.sleep(200L)
     // make sure the sockets are open
-    server.acceptors.values.map(acceptor => assertFalse(acceptor.serverChannel.socket.isClosed))
+    server.acceptors.values.foreach(acceptor => assertFalse(acceptor.serverChannel.socket.isClosed))
     // then shutdown the server
     server.shutdown()
 
@@ -376,7 +381,7 @@ class SocketServerTest extends JUnitSuite {
     // now try one more (should fail)
     val conn = connect()
     conn.setSoTimeout(3000)
-    assertEquals(-1, conn.getInputStream().read())
+    assertEquals(-1, conn.getInputStream.read())
     conn.close()
 
     // it should succeed after closing one connection
@@ -437,14 +442,13 @@ class SocketServerTest extends JUnitSuite {
         overrideServer.boundPort(ListenerName.forSecurityProtocol(SecurityProtocol.SSL))).asInstanceOf[SSLSocket]
       sslSocket.setNeedClientAuth(false)
 
-      val apiKey = ApiKeys.PRODUCE.id
       val correlationId = -1
       val clientId = ""
       val ackTimeoutMs = 10000
       val ack = 0: Short
       val emptyRequest = new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, ack, ackTimeoutMs,
         new HashMap[TopicPartition, MemoryRecords]()).build()
-      val emptyHeader = new RequestHeader(apiKey, emptyRequest.version, clientId, correlationId)
+      val emptyHeader = new RequestHeader(ApiKeys.PRODUCE, emptyRequest.version, clientId, correlationId)
 
       val byteBuffer = emptyRequest.serialize(emptyHeader)
       byteBuffer.rewind()
@@ -466,7 +470,7 @@ class SocketServerTest extends JUnitSuite {
     val socket = connect()
     val bytes = new Array[Byte](40)
     sendRequest(socket, bytes, Some(0))
-    assertEquals(KafkaPrincipal.ANONYMOUS, server.requestChannel.receiveRequest(2000).session.principal)
+    assertEquals(KafkaPrincipal.ANONYMOUS, receiveRequest(server.requestChannel).session.principal)
   }
 
   /* Test that we update request metrics if the client closes the connection while the broker response is in flight. */
@@ -494,9 +498,9 @@ class SocketServerTest extends JUnitSuite {
       sendRequest(conn, serializedBytes)
 
       val channel = overrideServer.requestChannel
-      val request = channel.receiveRequest(2000)
+      val request = receiveRequest(channel)
 
-      val requestMetrics = RequestMetrics.metricsMap(ApiKeys.forId(request.header.apiKey).name)
+      val requestMetrics = RequestMetrics.metricsMap(request.header.apiKey.name)
       def totalTimeHistCount(): Long = requestMetrics.totalTimeHist.count
       val expectedTotalTimeCount = totalTimeHistCount() + 1
 
@@ -505,8 +509,8 @@ class SocketServerTest extends JUnitSuite {
       // write. If the buffer is smaller than this, the write is considered complete and the disconnection is not
       // detected. If the buffer is larger than 102400 bytes, a second write is attempted and it fails with an
       // IOException.
-      val send = new NetworkSend(request.connectionId, ByteBuffer.allocate(550000))
-      channel.sendResponse(RequestChannel.Response(request, send))
+      val send = new NetworkSend(request.context.connectionId, ByteBuffer.allocate(550000))
+      channel.sendResponse(new RequestChannel.Response(request, Some(send), SendAction))
       TestUtils.waitUntilTrue(() => totalTimeHistCount() == expectedTotalTimeCount,
         s"request metrics not updated, expected: $expectedTotalTimeCount, actual: ${totalTimeHistCount()}")
 
@@ -533,12 +537,12 @@ class SocketServerTest extends JUnitSuite {
       val serializedBytes = producerRequestBytes
       sendRequest(conn, serializedBytes)
       val channel = overrideServer.requestChannel
-      val request = channel.receiveRequest(2000)
+      val request = receiveRequest(channel)
 
-      TestUtils.waitUntilTrue(() => overrideServer.processor(request.processor).channel(request.connectionId).isEmpty,
-        s"Idle connection `${request.connectionId}` was not closed by selector")
+      TestUtils.waitUntilTrue(() => overrideServer.processor(request.processor).channel(request.context.connectionId).isEmpty,
+        s"Idle connection `${request.context.connectionId}` was not closed by selector")
 
-      val requestMetrics = RequestMetrics.metricsMap(ApiKeys.forId(request.header.apiKey).name)
+      val requestMetrics = RequestMetrics.metricsMap(request.header.apiKey.name)
       def totalTimeHistCount(): Long = requestMetrics.totalTimeHist.count
       val expectedTotalTimeCount = totalTimeHistCount() + 1
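
The receiveRequest helper introduced at the top of this file is the test-side view of the sealed BaseRequest hierarchy: polling the channel now yields a real Request, the ShutdownRequest sentinel, or null on timeout, so an exhaustive match replaces the old assertNotNull check. A reduced, self-contained sketch of the same shape (the Poller trait is a hypothetical stand-in for RequestChannel):

    sealed trait BaseRequest
    case object ShutdownRequest extends BaseRequest
    final case class Request(connectionId: String) extends BaseRequest

    // Stand-in for RequestChannel.receiveRequest, which may also return null on timeout.
    trait Poller { def receiveRequest(timeoutMs: Long): BaseRequest }

    def nextRequest(poller: Poller, timeoutMs: Long = 2000L): Request =
      poller.receiveRequest(timeoutMs) match {
        case request: Request => request                               // normal path
        case ShutdownRequest  => sys.error("unexpected shutdown")      // sentinel, not a client request
        case null             => sys.error("receiveRequest timed out")
      }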
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
index b212a74..fffe3a8 100644
--- a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
@@ -155,7 +155,7 @@ abstract class BaseRequestTest extends KafkaServerTestHarness {
 
   def nextRequestHeader(apiKey: ApiKeys, apiVersion: Short): RequestHeader = {
     correlationId += 1
-    new RequestHeader(apiKey.id, apiVersion, "client-id", correlationId)
+    new RequestHeader(apiKey, apiVersion, "client-id", correlationId)
   }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
index c5d40f6..329772b 100644
--- a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
@@ -27,7 +27,7 @@ import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.{Record, RecordBatch}
-import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, IsolationLevel}
+import org.apache.kafka.common.requests.{FetchRequest, FetchResponse}
 import org.apache.kafka.common.serialization.StringSerializer
 import org.junit.Assert._
 import org.junit.Test
@@ -49,7 +49,7 @@ class FetchRequestTest extends BaseRequestTest {
   }
 
   private def createFetchRequest(maxResponseBytes: Int, maxPartitionBytes: Int, topicPartitions: Seq[TopicPartition],
-                                 offsetMap: Map[TopicPartition, Long] = Map.empty): FetchRequest =
+                                 offsetMap: Map[TopicPartition, Long]): FetchRequest =
     FetchRequest.Builder.forConsumer(Int.MaxValue, 0, createPartitionMap(maxPartitionBytes, topicPartitions, offsetMap))
       .setMaxBytes(maxResponseBytes).build()
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 6cc3ede..deea586 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -28,7 +28,6 @@ import kafka.coordinator.group.GroupCoordinator
 import kafka.coordinator.transaction.TransactionCoordinator
 import kafka.log.{Log, TimestampOffset}
 import kafka.network.RequestChannel
-import kafka.network.RequestChannel.Session
 import kafka.security.auth.Authorizer
 import kafka.server.QuotaFactory.QuotaManagers
 import kafka.server._
@@ -393,10 +392,13 @@ class KafkaApisTest {
 
   private def buildRequest[T <: AbstractRequest](builder: AbstractRequest.Builder[T]): (T, RequestChannel.Request) = {
     val request = builder.build()
-    val header = new RequestHeader(builder.apiKey.id, request.version, "", 0)
-    val buffer = request.serialize(header)
-    val session = Session(KafkaPrincipal.ANONYMOUS, InetAddress.getLocalHost)
-    (request, new RequestChannel.Request(1, "1", session, 0, new ListenerName(""), SecurityProtocol.PLAINTEXT,
+    val buffer = request.serialize(new RequestHeader(builder.apiKey, request.version, "", 0))
+
+    // read the header from the buffer first so that the body can be read next from the Request constructor
+    val header = RequestHeader.parse(buffer)
+    val context = new RequestContext(header, "1", InetAddress.getLocalHost, KafkaPrincipal.ANONYMOUS,
+      new ListenerName(""), SecurityProtocol.PLAINTEXT)
+    (request, new RequestChannel.Request(processor = 1, context = context, startTimeNanos = 0,
       MemoryPool.NONE, buffer))
   }
 
@@ -408,16 +410,13 @@ class KafkaApisTest {
     channel.buffer.getInt() // read the size
     ResponseHeader.parse(channel.buffer)
     val struct = api.responseSchema(request.version).read(channel.buffer)
-    AbstractResponse.getResponse(api, struct)
+    AbstractResponse.parseResponse(api, struct)
   }
 
   private def expectThrottleCallbackAndInvoke(capturedThrottleCallback: Capture[Int => Unit]): Unit = {
     EasyMock.expect(clientRequestQuotaManager.maybeRecordAndThrottle(
-      EasyMock.anyString(),
-      EasyMock.anyString(),
-      EasyMock.anyLong(),
-      EasyMock.capture(capturedThrottleCallback),
-      EasyMock.anyObject[(Long => Unit) => Unit]()))
+      EasyMock.anyObject[RequestChannel.Request](),
+      EasyMock.capture(capturedThrottleCallback)))
       .andAnswer(new IAnswer[Unit] {
         override def answer(): Unit = {
           val callback = capturedThrottleCallback.getValue
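
The buildRequest rewrite above relies on a ByteBuffer detail: RequestHeader.parse advances the buffer's position past the header bytes, so the buffer handed to RequestChannel.Request afterwards begins exactly at the request body. A small round-trip sketch, not from the commit (hypothetical wrapper object; toStruct/writeTo are the Struct-based serialization this era of the codebase uses):

    import java.nio.ByteBuffer
    import org.apache.kafka.common.protocol.ApiKeys
    import org.apache.kafka.common.requests.RequestHeader

    object HeaderParseSketch extends App {
      val header = new RequestHeader(ApiKeys.PRODUCE, 0, "client", 1)
      val struct = header.toStruct
      val buffer = ByteBuffer.allocate(struct.sizeOf)
      struct.writeTo(buffer)
      buffer.rewind()
      val parsed = RequestHeader.parse(buffer)  // consumes the header bytes
      assert(parsed.apiKey == ApiKeys.PRODUCE)
      assert(buffer.remaining == 0)             // a request body would start here
    }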

http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index 916129e..8786b19 100755
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -31,7 +31,7 @@ import kafka.utils._
 import kafka.zk.ZooKeeperTestHarness
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.utils.{Time, Utils}
+import org.apache.kafka.common.utils.Time
 import org.easymock.{EasyMock, IAnswer}
 import org.junit.Assert._
 import org.junit.{After, Before, Test}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
index e1d4d75..9c51a10 100755
--- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
@@ -23,7 +23,7 @@ import TestUtils._
 import kafka.zk.ZooKeeperTestHarness
 import java.io.File
 
-import kafka.server.checkpoints.{OffsetCheckpoint, OffsetCheckpointFile}
+import kafka.server.checkpoints.OffsetCheckpointFile
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.serialization.{IntegerSerializer, StringSerializer}
@@ -230,7 +230,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
     assertEquals(hw, hwFile2.read.getOrElse(topicPartition, 0L))
   }
 
-  private def sendMessages(n: Int = 1) {
+  private def sendMessages(n: Int) {
     (0 until n).map(_ => producer.send(new ProducerRecord(topic, 0, message))).foreach(_.get)
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
index 244ef78..bb7e9fe 100755
--- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
@@ -24,7 +24,7 @@ import kafka.utils._
 import kafka.utils.TestUtils._
 import kafka.zk.ZooKeeperTestHarness
 import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.utils.{Time, Utils}
+import org.apache.kafka.common.utils.Time
 import org.junit.{After, Before, Test}
 import org.junit.Assert._
 import java.util.Properties

http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
index dd683e1..b8c45a5 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
@@ -17,9 +17,6 @@
 
 package kafka.server
 
-import java.io.File
-
-import org.apache.kafka.common.protocol.SecurityProtocol
 import org.junit.{After, Before, Test}
 import kafka.zk.ZooKeeperTestHarness
 import kafka.utils.TestUtils

http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
index e7d2a64..f162492 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
@@ -30,7 +30,7 @@ import kafka.zk.ZooKeeperTestHarness
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.TopicPartition
 import org.junit.Assert._
-import org.junit.{After, Before, Test}
+import org.junit.{After, Test}
 import scala.collection.JavaConverters._
 
 /**
