Repository: kafka
Updated Branches:
  refs/heads/trunk 07a428e0c -> 27336192f


MINOR: Include response in request log

It's implemented such that there is no overhead if request logging is
disabled.
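
A minimal sketch of the guard (the names mirror the KafkaApis hunk below;
treat this as illustrative rather than the exact code):

    // The response string is only rendered when the "kafka.request.logger"
    // logger is at DEBUG or finer, so the disabled path costs a single
    // boolean check plus a None reference per request.
    val responseString: Option[String] =
      if (RequestChannel.isRequestLoggingEnabled)
        Some(response.toString(request.context.header.apiVersion))
      else
        None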

Also:
- Reduce metrics computation duplication in `updateRequestMetrics`
- Change a couple of log calls to use string interpolation instead of `format`
  (a before/after sketch follows this list)
- Fix a few compiler warnings related to unused imports and unused default arguments.
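
For the interpolation change, a before/after sketch (mirroring the
KafkaRequestHandler hunks below):

    // Before: positional format string
    debug("Kafka request handler %d on broker %d received shut down command".format(id, brokerId))
    // After: Scala string interpolation
    debug(s"Kafka request handler $id on broker $brokerId received shut down command")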

Author: Ismael Juma <ism...@juma.me.uk>

Reviewers: Roger Hoover <roger.hoo...@gmail.com>, Jason Gustafson <ja...@confluent.io>, Rajini Sivaram <rajinisiva...@googlemail.com>

Closes #3801 from ijuma/log-response-in-request-log


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/27336192
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/27336192
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/27336192

Branch: refs/heads/trunk
Commit: 27336192ff62d43d0beabc5dab2855b87befcc55
Parents: 07a428e
Author: Ismael Juma <ism...@juma.me.uk>
Authored: Fri Sep 8 04:40:24 2017 +0100
Committer: Ismael Juma <ism...@juma.me.uk>
Committed: Fri Sep 8 04:40:24 2017 +0100

----------------------------------------------------------------------
 .../ZkNodeChangeNotificationListener.scala      |  2 -
 .../scala/kafka/network/RequestChannel.scala    | 93 +++++++++++---------
 .../main/scala/kafka/network/SocketServer.scala | 17 ++--
 .../src/main/scala/kafka/server/KafkaApis.scala |  9 +-
 .../kafka/server/KafkaRequestHandler.scala      |  4 +-
 .../kafka/api/ConsumerBounceTest.scala          |  2 +-
 .../kafka/api/EndToEndClusterIdTest.scala       |  2 +-
 .../kafka/api/TransactionsTest.scala            |  2 +-
 .../kafka/log/LogCleanerIntegrationTest.scala   |  2 +-
 .../unit/kafka/log/LogCleanerManagerTest.scala  |  4 +-
 .../scala/unit/kafka/log/LogCleanerTest.scala   |  4 +-
 .../scala/unit/kafka/log/LogSegmentTest.scala   |  2 +-
 .../scala/unit/kafka/log/LogValidatorTest.scala |  4 +-
 .../unit/kafka/network/SocketServerTest.scala   |  6 +-
 14 files changed, 83 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/27336192/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
index 99b5103..b4ee1fd 100644
--- a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
+++ b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
@@ -24,8 +24,6 @@ import org.I0Itec.zkclient.exception.ZkInterruptedException
 import org.I0Itec.zkclient.{IZkChildListener, IZkStateListener}
 import org.apache.kafka.common.utils.Time
 
-import scala.collection.JavaConverters._
-
 /**
  * Handle the notificationMessage.
  */

http://git-wip-us.apache.org/repos/asf/kafka/blob/27336192/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 1d95fa0..2d6370a 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -27,8 +27,8 @@ import kafka.network.RequestChannel.{ShutdownRequest, BaseRequest}
 import kafka.server.QuotaId
 import kafka.utils.{Logging, NotNothing}
 import org.apache.kafka.common.memory.MemoryPool
-import org.apache.kafka.common.network.{ListenerName, Send}
-import org.apache.kafka.common.protocol.{ApiKeys, Protocol, SecurityProtocol}
+import org.apache.kafka.common.network.Send
+import org.apache.kafka.common.protocol.{ApiKeys, Protocol}
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.utils.Time
@@ -39,6 +39,8 @@ import scala.reflect.ClassTag
 object RequestChannel extends Logging {
   private val requestLogger = Logger.getLogger("kafka.request.logger")
 
+  def isRequestLoggingEnabled: Boolean = requestLogger.isDebugEnabled
+
   sealed trait BaseRequest
   case object ShutdownRequest extends BaseRequest
 
@@ -90,7 +92,7 @@ object RequestChannel extends Logging {
       math.max(apiLocalCompleteTimeNanos - requestDequeueTimeNanos, 0L)
     }
 
-    def updateRequestMetrics(networkThreadTimeNanos: Long) {
+    def updateRequestMetrics(networkThreadTimeNanos: Long, response: Response) {
       val endTimeNanos = Time.SYSTEM.nanoseconds
      // In some corner cases, apiLocalCompleteTimeNanos may not be set when the request completes if the remote
      // processing time is really small. This value is set in KafkaApis from a request handling thread.
@@ -103,15 +105,23 @@ object RequestChannel extends Logging {
       if (apiRemoteCompleteTimeNanos < 0)
         apiRemoteCompleteTimeNanos = responseCompleteTimeNanos
 
-      def nanosToMs(nanos: Long) = math.max(TimeUnit.NANOSECONDS.toMillis(nanos), 0)
+      /**
+       * Converts nanos to millis with micros precision as additional decimal places in the request log have low
+       * signal to noise ratio. When it comes to metrics, there is little difference either way as we round the value
+       * to the nearest long.
+       */
+      def nanosToMs(nanos: Long): Double = {
+        val positiveNanos = math.max(nanos, 0)
+        TimeUnit.NANOSECONDS.toMicros(positiveNanos).toDouble / TimeUnit.MILLISECONDS.toMicros(1)
+      }
 
-      val requestQueueTime = nanosToMs(requestDequeueTimeNanos - startTimeNanos)
-      val apiLocalTime = nanosToMs(apiLocalCompleteTimeNanos - requestDequeueTimeNanos)
-      val apiRemoteTime = nanosToMs(apiRemoteCompleteTimeNanos - apiLocalCompleteTimeNanos)
-      val apiThrottleTime = nanosToMs(responseCompleteTimeNanos - apiRemoteCompleteTimeNanos)
-      val responseQueueTime = nanosToMs(responseDequeueTimeNanos - responseCompleteTimeNanos)
-      val responseSendTime = nanosToMs(endTimeNanos - responseDequeueTimeNanos)
-      val totalTime = nanosToMs(endTimeNanos - startTimeNanos)
+      val requestQueueTimeMs = nanosToMs(requestDequeueTimeNanos - startTimeNanos)
+      val apiLocalTimeMs = nanosToMs(apiLocalCompleteTimeNanos - requestDequeueTimeNanos)
+      val apiRemoteTimeMs = nanosToMs(apiRemoteCompleteTimeNanos - apiLocalCompleteTimeNanos)
+      val apiThrottleTimeMs = nanosToMs(responseCompleteTimeNanos - apiRemoteCompleteTimeNanos)
+      val responseQueueTimeMs = nanosToMs(responseDequeueTimeNanos - responseCompleteTimeNanos)
+      val responseSendTimeMs = nanosToMs(endTimeNanos - responseDequeueTimeNanos)
+      val totalTimeMs = nanosToMs(endTimeNanos - startTimeNanos)
       val fetchMetricNames =
         if (header.apiKey == ApiKeys.FETCH) {
           val isFromFollower = body[FetchRequest].isFromFollower
@@ -125,13 +135,13 @@ object RequestChannel extends Logging {
       metricNames.foreach { metricName =>
         val m = RequestMetrics.metricsMap(metricName)
         m.requestRate.mark()
-        m.requestQueueTimeHist.update(requestQueueTime)
-        m.localTimeHist.update(apiLocalTime)
-        m.remoteTimeHist.update(apiRemoteTime)
-        m.throttleTimeHist.update(apiThrottleTime)
-        m.responseQueueTimeHist.update(responseQueueTime)
-        m.responseSendTimeHist.update(responseSendTime)
-        m.totalTimeHist.update(totalTime)
+        m.requestQueueTimeHist.update(Math.round(requestQueueTimeMs))
+        m.localTimeHist.update(Math.round(apiLocalTimeMs))
+        m.remoteTimeHist.update(Math.round(apiRemoteTimeMs))
+        m.throttleTimeHist.update(Math.round(apiThrottleTimeMs))
+        m.responseQueueTimeHist.update(Math.round(responseQueueTimeMs))
+        m.responseSendTimeHist.update(Math.round(responseSendTimeMs))
+        m.totalTimeHist.update(Math.round(totalTimeMs))
       }
 
      // Records network handler thread usage. This is included towards the request quota for the
@@ -142,28 +152,26 @@ object RequestChannel extends Logging {
      // the total time spent on authentication, which may be significant for SASL/SSL.
      recordNetworkThreadTimeCallback.foreach(record => record(networkThreadTimeNanos))
 
-      if (requestLogger.isDebugEnabled) {
+      if (isRequestLoggingEnabled) {
         val detailsEnabled = requestLogger.isTraceEnabled
-        def nanosToMs(nanos: Long) = TimeUnit.NANOSECONDS.toMicros(math.max(nanos, 0)).toDouble / TimeUnit.MILLISECONDS.toMicros(1)
-        val totalTimeMs = nanosToMs(endTimeNanos - startTimeNanos)
-        val requestQueueTimeMs = nanosToMs(requestDequeueTimeNanos - startTimeNanos)
-        val apiLocalTimeMs = nanosToMs(apiLocalCompleteTimeNanos - requestDequeueTimeNanos)
-        val apiRemoteTimeMs = nanosToMs(apiRemoteCompleteTimeNanos - apiLocalCompleteTimeNanos)
-        val apiThrottleTimeMs = nanosToMs(responseCompleteTimeNanos - apiRemoteCompleteTimeNanos)
-        val responseQueueTimeMs = nanosToMs(responseDequeueTimeNanos - responseCompleteTimeNanos)
-        val responseSendTimeMs = nanosToMs(endTimeNanos - responseDequeueTimeNanos)
-
-        requestLogger.debug(s"Completed request:${requestDesc(detailsEnabled)} from connection ${context.connectionId};" +
-          s"totalTime:$totalTimeMs," +
-          s"requestQueueTime:$requestQueueTimeMs," +
-          s"localTime:$apiLocalTimeMs," +
-          s"remoteTime:$apiRemoteTimeMs," +
-          s"throttleTime:$apiThrottleTimeMs," +
-          s"responseQueueTime:$responseQueueTimeMs," +
-          s"sendTime:$responseSendTimeMs," +
-          s"securityProtocol:${context.securityProtocol}," +
-          s"principal:${context.principal}," +
-          s"listener:${context.listenerName.value}")
+        val responseString = response.responseAsString.getOrElse(
+          throw new IllegalStateException("responseAsString should always be defined if request logging is enabled"))
+
+        val builder = new StringBuilder(256)
+        builder.append("Completed 
request:").append(requestDesc(detailsEnabled))
+          .append(",response:").append(responseString)
+          .append(" from connection ").append(context.connectionId)
+          .append(";totalTime:").append(totalTimeMs)
+          .append(",requestQueueTime:").append(requestQueueTimeMs)
+          .append(",localTime:").append(apiLocalTimeMs)
+          .append(",remoteTime:").append(apiRemoteTimeMs)
+          .append(",throttleTime:").append(apiThrottleTimeMs)
+          .append(",responseQueueTime:").append(responseQueueTimeMs)
+          .append(",sendTime:").append(responseSendTimeMs)
+          .append(",securityProtocol:").append(context.securityProtocol)
+          .append(",principal:").append(session.principal)
+          .append(",listener:").append(context.listenerName.value)
+        requestLogger.debug(builder.toString)
       }
     }
 
@@ -183,13 +191,16 @@ object RequestChannel extends Logging {
 
   }
 
-  class Response(val request: Request, val responseSend: Option[Send], val responseAction: ResponseAction) {
+  /** responseAsString should only be defined if request logging is enabled */
+  class Response(val request: Request, val responseSend: Option[Send], val responseAction: ResponseAction,
+                 val responseAsString: Option[String]) {
     request.responseCompleteTimeNanos = Time.SYSTEM.nanoseconds
     if (request.apiLocalCompleteTimeNanos == -1L) request.apiLocalCompleteTimeNanos = Time.SYSTEM.nanoseconds
 
     def processor: Int = request.processor
 
-    override def toString = s"Response(request=$request, responseSend=$responseSend, responseAction=$responseAction)"
+    override def toString =
+      s"Response(request=$request, responseSend=$responseSend, responseAction=$responseAction), responseAsString=$responseAsString"
   }
 
   trait ResponseAction
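
A quick illustration of the new nanosToMs precision (hypothetical values):

    // Request-log values keep microsecond precision:
    //   1,234,567 ns -> 1,234 us -> 1.234 ms
    nanosToMs(1234567L)              // == 1.234
    // Metric histograms round to the nearest whole millisecond:
    Math.round(nanosToMs(1234567L))  // == 1L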

http://git-wip-us.apache.org/repos/asf/kafka/blob/27336192/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index f1bc926..3d54c8a 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -36,8 +36,8 @@ import org.apache.kafka.common.memory.{MemoryPool, SimpleMemoryPool}
 import org.apache.kafka.common.metrics._
 import org.apache.kafka.common.metrics.stats.Rate
 import org.apache.kafka.common.network.{ChannelBuilders, KafkaChannel, ListenerName, Selectable, Send, Selector => KSelector}
+import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.kafka.common.security.auth.KafkaPrincipal
-import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol}
 import org.apache.kafka.common.protocol.types.SchemaException
 import org.apache.kafka.common.requests.{RequestContext, RequestHeader}
 import org.apache.kafka.common.utils.{KafkaThread, Time}
@@ -484,7 +484,7 @@ private[kafka] class Processor(val id: Int,
           case RequestChannel.NoOpAction =>
            // There is no response to send to the client, we need to read more pipelined requests
            // that are sitting in the server's socket buffer
-            updateRequestMetrics(curr.request)
+            updateRequestMetrics(curr)
            trace("Socket server received empty response to send, registering for read: " + curr)
            val channelId = curr.request.context.connectionId
            if (selector.channel(channelId) != null || selector.closingChannel(channelId) != null)
@@ -494,7 +494,7 @@ private[kafka] class Processor(val id: Int,
               throw new IllegalStateException(s"responseSend must be defined 
for SendAction, response: $curr"))
             sendResponse(curr, responseSend)
           case RequestChannel.CloseConnectionAction =>
-            updateRequestMetrics(curr.request)
+            updateRequestMetrics(curr)
             trace("Closing socket connection actively according to the 
response code.")
             close(selector, curr.request.context.connectionId)
         }
@@ -511,7 +511,7 @@ private[kafka] class Processor(val id: Int,
    // `channel` can be None if the connection was closed remotely or if selector closed it for being idle for too long
    if (channel(connectionId).isEmpty) {
      warn(s"Attempting to send response via channel for which there is no open connection, connection id $connectionId")
-      response.request.updateRequestMetrics(0L)
+      response.request.updateRequestMetrics(0L, response)
    }
    // Invoke send for closingChannel as well so that the send is failed and the channel closed properly and
     // removed from the Selector after discarding any pending staged receives.
@@ -561,14 +561,15 @@ private[kafka] class Processor(val id: Int,
       val resp = inflightResponses.remove(send.destination).getOrElse {
         throw new IllegalStateException(s"Send for ${send.destination} 
completed, but not in `inflightResponses`")
       }
-      updateRequestMetrics(resp.request)
+      updateRequestMetrics(resp)
       selector.unmute(send.destination)
     }
   }
 
-  private def updateRequestMetrics(request: RequestChannel.Request) {
+  private def updateRequestMetrics(response: RequestChannel.Response) {
+    val request = response.request
    val networkThreadTimeNanos = openOrClosingChannel(request.context.connectionId).fold(0L)(_.getAndResetNetworkThreadTimeNanos())
-    request.updateRequestMetrics(networkThreadTimeNanos)
+    request.updateRequestMetrics(networkThreadTimeNanos, response)
   }
 
   private def processDisconnected() {
@@ -576,7 +577,7 @@ private[kafka] class Processor(val id: Int,
       val remoteHost = ConnectionId.fromString(connectionId).getOrElse {
         throw new IllegalStateException(s"connectionId has unexpected format: 
$connectionId")
       }.remoteHost
-      inflightResponses.remove(connectionId).foreach(response => 
updateRequestMetrics(response.request))
+      inflightResponses.remove(connectionId).foreach(updateRequestMetrics)
       // the channel has been closed by the selector but the quotas still need 
to be updated
       connectionQuotas.dec(InetAddress.getByName(remoteHost))
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/27336192/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 4fd906f..2c5517f 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -2001,16 +2001,19 @@ class KafkaApis(val requestChannel: RequestChannel,
   private def closeConnection(request: RequestChannel.Request): Unit = {
    // This case is used when the request handler has encountered an error, but the client
    // does not expect a response (e.g. when produce request has acks set to 0)
-    requestChannel.sendResponse(new RequestChannel.Response(request, None, CloseConnectionAction))
+    requestChannel.sendResponse(new RequestChannel.Response(request, None, CloseConnectionAction, None))
   }
 
  private def sendResponse(request: RequestChannel.Request, responseOpt: Option[AbstractResponse]): Unit = {
     responseOpt match {
       case Some(response) =>
         val responseSend = request.context.buildResponse(response)
-        requestChannel.sendResponse(new RequestChannel.Response(request, Some(responseSend), SendAction))
+        val responseString =
+          if (RequestChannel.isRequestLoggingEnabled) Some(response.toString(request.context.header.apiVersion))
+          else None
+        requestChannel.sendResponse(new RequestChannel.Response(request, Some(responseSend), SendAction, responseString))
      case None =>
-        requestChannel.sendResponse(new RequestChannel.Response(request, None, NoOpAction))
+        requestChannel.sendResponse(new RequestChannel.Response(request, None, NoOpAction, None))
     }
   }
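
Design note: the response string is rendered eagerly in sendResponse, but only
inside the isRequestLoggingEnabled branch, so the allocation is skipped
entirely when the request logger is above DEBUG. updateRequestMetrics can then
assume responseAsString is defined whenever logging is enabled, which is what
the IllegalStateException in RequestChannel guards.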
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/27336192/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
index 356ccae..f055762 100755
--- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -54,14 +54,14 @@ class KafkaRequestHandler(id: Int,
 
       req match {
         case RequestChannel.ShutdownRequest =>
-          debug("Kafka request handler %d on broker %d received shut down 
command".format(id, brokerId))
+          debug(s"Kafka request handler $id on broker $brokerId received shut 
down command")
           latch.countDown()
           return
 
         case request: RequestChannel.Request =>
           try {
             request.requestDequeueTimeNanos = endTime
-            trace("Kafka request handler %d on broker %d handling request 
%s".format(id, brokerId, request))
+            trace(s"Kafka request handler $id on broker $brokerId handling 
request $request")
             apis.handle(request)
           } catch {
             case e: FatalExitError =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/27336192/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index 27cafd7..0b42118 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -177,7 +177,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
       }, 2, TimeUnit.SECONDS)
     consumer.poll(0)
 
-    def sendRecords(numRecords: Int, topic: String = this.topic) {
+    def sendRecords(numRecords: Int, topic: String) {
       var remainingRecords = numRecords
       val endTimeMs = System.currentTimeMillis + 20000
       while (remainingRecords > 0 && System.currentTimeMillis < endTimeMs) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/27336192/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala
index 2cd0df2..7ec2feb 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala
@@ -201,7 +201,7 @@ class EndToEndClusterIdTest extends KafkaServerTestHarness {
   }
 
   private def consumeRecords(consumer: Consumer[Array[Byte], Array[Byte]],
-                             numRecords: Int = 1,
+                             numRecords: Int,
                              startingOffset: Int = 0,
                              topic: String = topic,
                              part: Int = part) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/27336192/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
index 760cc39..d978961 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
@@ -529,7 +529,7 @@ class TransactionsTest extends KafkaServerTestHarness {
     consumer
   }
 
-  private def createReadUncommittedConsumer(group: String = "group") = {
+  private def createReadUncommittedConsumer(group: String) = {
     val props = new Properties()
     props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_uncommitted")
     props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")

http://git-wip-us.apache.org/repos/asf/kafka/blob/27336192/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
index ff1e8e2..b20622f 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -268,7 +268,7 @@ class LogCleanerIntegrationTest(compressionCodec: String) extends AbstractLogCle
   }
 
   private def writeDupsSingleMessageSet(numKeys: Int, numDups: Int, log: Log, codec: CompressionType,
-                                        startKey: Int = 0, magicValue: Byte = RecordBatch.CURRENT_MAGIC_VALUE): Seq[(Int, String, Long)] = {
+                                        startKey: Int = 0, magicValue: Byte): Seq[(Int, String, Long)] = {
     val kvs = for (_ <- 0 until numDups; key <- startKey until (startKey + numKeys)) yield {
       val payload = counter.toString
       counter += 1

http://git-wip-us.apache.org/repos/asf/kafka/blob/27336192/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
index c9f5441..1146029 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
@@ -222,7 +222,7 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
     cleanerManager
   }
 
-  private def createLog(segmentSize: Int, cleanupPolicy: String = "delete"): Log = {
+  private def createLog(segmentSize: Int, cleanupPolicy: String): Log = {
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, segmentSize: Integer)
     logProps.put(LogConfig.RetentionMsProp, 1: Integer)
@@ -243,7 +243,7 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
     log
   }
 
-  private def makeLog(dir: File = logDir, config: LogConfig = logConfig) =
+  private def makeLog(dir: File = logDir, config: LogConfig) =
     Log(dir = dir, config = config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
       time = time, brokerTopicStats = new BrokerTopicStats, maxProducerIdExpirationMs = 60 * 60 * 1000,
       producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,

http://git-wip-us.apache.org/repos/asf/kafka/blob/27336192/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index c29ece5..517e876 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -1270,12 +1270,12 @@ class LogCleanerTest extends JUnitSuite {
      partitionLeaderEpoch, new SimpleRecord(key.toString.getBytes, value.toString.getBytes))
   }
 
-  private def appendTransactionalAsLeader(log: Log, producerId: Long, producerEpoch: Short = 0): Seq[Int] => LogAppendInfo = {
+  private def appendTransactionalAsLeader(log: Log, producerId: Long, producerEpoch: Short): Seq[Int] => LogAppendInfo = {
     appendIdempotentAsLeader(log, producerId, producerEpoch, isTransactional = true)
   }
 
   private def appendIdempotentAsLeader(log: Log, producerId: Long,
-                                       producerEpoch: Short = 0,
+                                       producerEpoch: Short,
                                        isTransactional: Boolean = false): Seq[Int] => LogAppendInfo = {
     var sequence = 0
     keys: Seq[Int] => {

http://git-wip-us.apache.org/repos/asf/kafka/blob/27336192/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index 30ccc8b..2a07532 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -333,7 +333,7 @@ class LogSegmentTest {
   private def endTxnRecords(controlRecordType: ControlRecordType,
                             producerId: Long,
                             producerEpoch: Short,
-                            offset: Long = 0L,
+                            offset: Long,
                             partitionLeaderEpoch: Int = 0,
                             coordinatorEpoch: Int = 0,
                             timestamp: Long = RecordBatch.NO_TIMESTAMP): MemoryRecords = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/27336192/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
index a5d415e..96f7bfc 100644
--- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
@@ -947,9 +947,9 @@ class LogValidatorTest {
       isFromClient = true)
   }
 
-  private def createRecords(magicValue: Byte = RecordBatch.CURRENT_MAGIC_VALUE,
+  private def createRecords(magicValue: Byte,
                             timestamp: Long = RecordBatch.NO_TIMESTAMP,
-                            codec: CompressionType = CompressionType.NONE): MemoryRecords = {
+                            codec: CompressionType): MemoryRecords = {
     val buf = ByteBuffer.allocate(512)
     val builder = MemoryRecords.builder(buf, magicValue, codec, TimestampType.CREATE_TIME, 0L)
     builder.appendWithOffset(0, timestamp, null, "hello".getBytes)

http://git-wip-us.apache.org/repos/asf/kafka/blob/27336192/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 6fbb0c9..9b32635 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -107,7 +107,7 @@ class SocketServerTest extends JUnitSuite {
     byteBuffer.rewind()
 
     val send = new NetworkSend(request.context.connectionId, byteBuffer)
-    channel.sendResponse(new RequestChannel.Response(request, Some(send), SendAction))
+    channel.sendResponse(new RequestChannel.Response(request, Some(send), SendAction, None))
   }
 
  def connect(s: SocketServer = server, protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT) = {
@@ -187,7 +187,7 @@ class SocketServerTest extends JUnitSuite {
     for (_ <- 0 until 10) {
       val request = receiveRequest(server.requestChannel)
       assertNotNull("receiveRequest timed out", request)
-      server.requestChannel.sendResponse(new RequestChannel.Response(request, None, RequestChannel.NoOpAction))
+      server.requestChannel.sendResponse(new RequestChannel.Response(request, None, RequestChannel.NoOpAction, None))
     }
   }
 
@@ -510,7 +510,7 @@ class SocketServerTest extends JUnitSuite {
      // detected. If the buffer is larger than 102400 bytes, a second write is attempted and it fails with an
      // IOException.
      val send = new NetworkSend(request.context.connectionId, ByteBuffer.allocate(550000))
-      channel.sendResponse(new RequestChannel.Response(request, Some(send), SendAction, None))
      TestUtils.waitUntilTrue(() => totalTimeHistCount() == expectedTotalTimeCount,
        s"request metrics not updated, expected: $expectedTotalTimeCount, actual: ${totalTimeHistCount()}")
 
