Repository: kafka Updated Branches: refs/heads/trunk 07a428e0c -> 27336192f
MINOR: Include response in request log It's implemented such that there is no overhead if request logging is disabled. Also: - Reduce metrics computation duplication in `updateRequestMetrics` - Change a couple of log calls to use string interpolation instead of `format` - Fix a few compiler warnings related to unused imports and unused default arguments. Author: Ismael Juma <ism...@juma.me.uk> Reviewers: Roger Hoover <roger.hoo...@gmail.com>, Jason Gustafson <ja...@confluent.io>, Rajini Sivaram <rajinisiva...@googlemail.com> Closes #3801 from ijuma/log-response-in-request-log Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/27336192 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/27336192 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/27336192 Branch: refs/heads/trunk Commit: 27336192ff62d43d0beabc5dab2855b87befcc55 Parents: 07a428e Author: Ismael Juma <ism...@juma.me.uk> Authored: Fri Sep 8 04:40:24 2017 +0100 Committer: Ismael Juma <ism...@juma.me.uk> Committed: Fri Sep 8 04:40:24 2017 +0100 ---------------------------------------------------------------------- .../ZkNodeChangeNotificationListener.scala | 2 - .../scala/kafka/network/RequestChannel.scala | 93 +++++++++++--------- .../main/scala/kafka/network/SocketServer.scala | 17 ++-- .../src/main/scala/kafka/server/KafkaApis.scala | 9 +- .../kafka/server/KafkaRequestHandler.scala | 4 +- .../kafka/api/ConsumerBounceTest.scala | 2 +- .../kafka/api/EndToEndClusterIdTest.scala | 2 +- .../kafka/api/TransactionsTest.scala | 2 +- .../kafka/log/LogCleanerIntegrationTest.scala | 2 +- .../unit/kafka/log/LogCleanerManagerTest.scala | 4 +- .../scala/unit/kafka/log/LogCleanerTest.scala | 4 +- .../scala/unit/kafka/log/LogSegmentTest.scala | 2 +- .../scala/unit/kafka/log/LogValidatorTest.scala | 4 +- .../unit/kafka/network/SocketServerTest.scala | 6 +- 14 files changed, 83 insertions(+), 70 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/27336192/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala index 99b5103..b4ee1fd 100644 --- a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala +++ b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala @@ -24,8 +24,6 @@ import org.I0Itec.zkclient.exception.ZkInterruptedException import org.I0Itec.zkclient.{IZkChildListener, IZkStateListener} import org.apache.kafka.common.utils.Time -import scala.collection.JavaConverters._ - /** * Handle the notificationMessage. */ http://git-wip-us.apache.org/repos/asf/kafka/blob/27336192/core/src/main/scala/kafka/network/RequestChannel.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index 1d95fa0..2d6370a 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -27,8 +27,8 @@ import kafka.network.RequestChannel.{ShutdownRequest, BaseRequest} import kafka.server.QuotaId import kafka.utils.{Logging, NotNothing} import org.apache.kafka.common.memory.MemoryPool -import org.apache.kafka.common.network.{ListenerName, Send} -import org.apache.kafka.common.protocol.{ApiKeys, Protocol, SecurityProtocol} +import org.apache.kafka.common.network.Send +import org.apache.kafka.common.protocol.{ApiKeys, Protocol} import org.apache.kafka.common.requests._ import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.utils.Time @@ -39,6 +39,8 @@ import scala.reflect.ClassTag object RequestChannel extends Logging { private val requestLogger = Logger.getLogger("kafka.request.logger") + def isRequestLoggingEnabled: Boolean = requestLogger.isDebugEnabled + sealed trait BaseRequest case object ShutdownRequest extends BaseRequest @@ -90,7 +92,7 @@ object RequestChannel extends Logging { math.max(apiLocalCompleteTimeNanos - requestDequeueTimeNanos, 0L) } - def updateRequestMetrics(networkThreadTimeNanos: Long) { + def updateRequestMetrics(networkThreadTimeNanos: Long, response: Response) { val endTimeNanos = Time.SYSTEM.nanoseconds // In some corner cases, apiLocalCompleteTimeNanos may not be set when the request completes if the remote // processing time is really small. This value is set in KafkaApis from a request handling thread. @@ -103,15 +105,23 @@ object RequestChannel extends Logging { if (apiRemoteCompleteTimeNanos < 0) apiRemoteCompleteTimeNanos = responseCompleteTimeNanos - def nanosToMs(nanos: Long) = math.max(TimeUnit.NANOSECONDS.toMillis(nanos), 0) + /** + * Converts nanos to millis with micros precision as additional decimal places in the request log have low + * signal to noise ratio. When it comes to metrics, there is little difference either way as we round the value + * to the nearest long. + */ + def nanosToMs(nanos: Long): Double = { + val positiveNanos = math.max(nanos, 0) + TimeUnit.NANOSECONDS.toMicros(positiveNanos).toDouble / TimeUnit.MILLISECONDS.toMicros(1) + } - val requestQueueTime = nanosToMs(requestDequeueTimeNanos - startTimeNanos) - val apiLocalTime = nanosToMs(apiLocalCompleteTimeNanos - requestDequeueTimeNanos) - val apiRemoteTime = nanosToMs(apiRemoteCompleteTimeNanos - apiLocalCompleteTimeNanos) - val apiThrottleTime = nanosToMs(responseCompleteTimeNanos - apiRemoteCompleteTimeNanos) - val responseQueueTime = nanosToMs(responseDequeueTimeNanos - responseCompleteTimeNanos) - val responseSendTime = nanosToMs(endTimeNanos - responseDequeueTimeNanos) - val totalTime = nanosToMs(endTimeNanos - startTimeNanos) + val requestQueueTimeMs = nanosToMs(requestDequeueTimeNanos - startTimeNanos) + val apiLocalTimeMs = nanosToMs(apiLocalCompleteTimeNanos - requestDequeueTimeNanos) + val apiRemoteTimeMs = nanosToMs(apiRemoteCompleteTimeNanos - apiLocalCompleteTimeNanos) + val apiThrottleTimeMs = nanosToMs(responseCompleteTimeNanos - apiRemoteCompleteTimeNanos) + val responseQueueTimeMs = nanosToMs(responseDequeueTimeNanos - responseCompleteTimeNanos) + val responseSendTimeMs = nanosToMs(endTimeNanos - responseDequeueTimeNanos) + val totalTimeMs = nanosToMs(endTimeNanos - startTimeNanos) val fetchMetricNames = if (header.apiKey == ApiKeys.FETCH) { val isFromFollower = body[FetchRequest].isFromFollower @@ -125,13 +135,13 @@ object RequestChannel extends Logging { metricNames.foreach { metricName => val m = RequestMetrics.metricsMap(metricName) m.requestRate.mark() - m.requestQueueTimeHist.update(requestQueueTime) - m.localTimeHist.update(apiLocalTime) - m.remoteTimeHist.update(apiRemoteTime) - m.throttleTimeHist.update(apiThrottleTime) - m.responseQueueTimeHist.update(responseQueueTime) - m.responseSendTimeHist.update(responseSendTime) - m.totalTimeHist.update(totalTime) + m.requestQueueTimeHist.update(Math.round(requestQueueTimeMs)) + m.localTimeHist.update(Math.round(apiLocalTimeMs)) + m.remoteTimeHist.update(Math.round(apiRemoteTimeMs)) + m.throttleTimeHist.update(Math.round(apiThrottleTimeMs)) + m.responseQueueTimeHist.update(Math.round(responseQueueTimeMs)) + m.responseSendTimeHist.update(Math.round(responseSendTimeMs)) + m.totalTimeHist.update(Math.round(totalTimeMs)) } // Records network handler thread usage. This is included towards the request quota for the @@ -142,28 +152,26 @@ object RequestChannel extends Logging { // the total time spent on authentication, which may be significant for SASL/SSL. recordNetworkThreadTimeCallback.foreach(record => record(networkThreadTimeNanos)) - if (requestLogger.isDebugEnabled) { + if (isRequestLoggingEnabled) { val detailsEnabled = requestLogger.isTraceEnabled - def nanosToMs(nanos: Long) = TimeUnit.NANOSECONDS.toMicros(math.max(nanos, 0)).toDouble / TimeUnit.MILLISECONDS.toMicros(1) - val totalTimeMs = nanosToMs(endTimeNanos - startTimeNanos) - val requestQueueTimeMs = nanosToMs(requestDequeueTimeNanos - startTimeNanos) - val apiLocalTimeMs = nanosToMs(apiLocalCompleteTimeNanos - requestDequeueTimeNanos) - val apiRemoteTimeMs = nanosToMs(apiRemoteCompleteTimeNanos - apiLocalCompleteTimeNanos) - val apiThrottleTimeMs = nanosToMs(responseCompleteTimeNanos - apiRemoteCompleteTimeNanos) - val responseQueueTimeMs = nanosToMs(responseDequeueTimeNanos - responseCompleteTimeNanos) - val responseSendTimeMs = nanosToMs(endTimeNanos - responseDequeueTimeNanos) - - requestLogger.debug(s"Completed request:${requestDesc(detailsEnabled)} from connection ${context.connectionId};" + - s"totalTime:$totalTimeMs," + - s"requestQueueTime:$requestQueueTimeMs," + - s"localTime:$apiLocalTimeMs," + - s"remoteTime:$apiRemoteTimeMs," + - s"throttleTime:$apiThrottleTimeMs," + - s"responseQueueTime:$responseQueueTimeMs," + - s"sendTime:$responseSendTimeMs," + - s"securityProtocol:${context.securityProtocol}," + - s"principal:${context.principal}," + - s"listener:${context.listenerName.value}") + val responseString = response.responseAsString.getOrElse( + throw new IllegalStateException("responseAsString should always be defined if request logging is enabled")) + + val builder = new StringBuilder(256) + builder.append("Completed request:").append(requestDesc(detailsEnabled)) + .append(",response:").append(responseString) + .append(" from connection ").append(context.connectionId) + .append(";totalTime:").append(totalTimeMs) + .append(",requestQueueTime:").append(requestQueueTimeMs) + .append(",localTime:").append(apiLocalTimeMs) + .append(",remoteTime:").append(apiRemoteTimeMs) + .append(",throttleTime:").append(apiThrottleTimeMs) + .append(",responseQueueTime:").append(responseQueueTimeMs) + .append(",sendTime:").append(responseSendTimeMs) + .append(",securityProtocol:").append(context.securityProtocol) + .append(",principal:").append(session.principal) + .append(",listener:").append(context.listenerName.value) + requestLogger.debug(builder.toString) } } @@ -183,13 +191,16 @@ object RequestChannel extends Logging { } - class Response(val request: Request, val responseSend: Option[Send], val responseAction: ResponseAction) { + /** responseAsString should only be defined if request logging is enabled */ + class Response(val request: Request, val responseSend: Option[Send], val responseAction: ResponseAction, + val responseAsString: Option[String]) { request.responseCompleteTimeNanos = Time.SYSTEM.nanoseconds if (request.apiLocalCompleteTimeNanos == -1L) request.apiLocalCompleteTimeNanos = Time.SYSTEM.nanoseconds def processor: Int = request.processor - override def toString = s"Response(request=$request, responseSend=$responseSend, responseAction=$responseAction)" + override def toString = + s"Response(request=$request, responseSend=$responseSend, responseAction=$responseAction), responseAsString=$responseAsString" } trait ResponseAction http://git-wip-us.apache.org/repos/asf/kafka/blob/27336192/core/src/main/scala/kafka/network/SocketServer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index f1bc926..3d54c8a 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -36,8 +36,8 @@ import org.apache.kafka.common.memory.{MemoryPool, SimpleMemoryPool} import org.apache.kafka.common.metrics._ import org.apache.kafka.common.metrics.stats.Rate import org.apache.kafka.common.network.{ChannelBuilders, KafkaChannel, ListenerName, Selectable, Send, Selector => KSelector} +import org.apache.kafka.common.protocol.SecurityProtocol import org.apache.kafka.common.security.auth.KafkaPrincipal -import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol} import org.apache.kafka.common.protocol.types.SchemaException import org.apache.kafka.common.requests.{RequestContext, RequestHeader} import org.apache.kafka.common.utils.{KafkaThread, Time} @@ -484,7 +484,7 @@ private[kafka] class Processor(val id: Int, case RequestChannel.NoOpAction => // There is no response to send to the client, we need to read more pipelined requests // that are sitting in the server's socket buffer - updateRequestMetrics(curr.request) + updateRequestMetrics(curr) trace("Socket server received empty response to send, registering for read: " + curr) val channelId = curr.request.context.connectionId if (selector.channel(channelId) != null || selector.closingChannel(channelId) != null) @@ -494,7 +494,7 @@ private[kafka] class Processor(val id: Int, throw new IllegalStateException(s"responseSend must be defined for SendAction, response: $curr")) sendResponse(curr, responseSend) case RequestChannel.CloseConnectionAction => - updateRequestMetrics(curr.request) + updateRequestMetrics(curr) trace("Closing socket connection actively according to the response code.") close(selector, curr.request.context.connectionId) } @@ -511,7 +511,7 @@ private[kafka] class Processor(val id: Int, // `channel` can be None if the connection was closed remotely or if selector closed it for being idle for too long if (channel(connectionId).isEmpty) { warn(s"Attempting to send response via channel for which there is no open connection, connection id $connectionId") - response.request.updateRequestMetrics(0L) + response.request.updateRequestMetrics(0L, response) } // Invoke send for closingChannel as well so that the send is failed and the channel closed properly and // removed from the Selector after discarding any pending staged receives. @@ -561,14 +561,15 @@ private[kafka] class Processor(val id: Int, val resp = inflightResponses.remove(send.destination).getOrElse { throw new IllegalStateException(s"Send for ${send.destination} completed, but not in `inflightResponses`") } - updateRequestMetrics(resp.request) + updateRequestMetrics(resp) selector.unmute(send.destination) } } - private def updateRequestMetrics(request: RequestChannel.Request) { + private def updateRequestMetrics(response: RequestChannel.Response) { + val request = response.request val networkThreadTimeNanos = openOrClosingChannel(request.context.connectionId).fold(0L)(_.getAndResetNetworkThreadTimeNanos()) - request.updateRequestMetrics(networkThreadTimeNanos) + request.updateRequestMetrics(networkThreadTimeNanos, response) } private def processDisconnected() { @@ -576,7 +577,7 @@ private[kafka] class Processor(val id: Int, val remoteHost = ConnectionId.fromString(connectionId).getOrElse { throw new IllegalStateException(s"connectionId has unexpected format: $connectionId") }.remoteHost - inflightResponses.remove(connectionId).foreach(response => updateRequestMetrics(response.request)) + inflightResponses.remove(connectionId).foreach(updateRequestMetrics) // the channel has been closed by the selector but the quotas still need to be updated connectionQuotas.dec(InetAddress.getByName(remoteHost)) } http://git-wip-us.apache.org/repos/asf/kafka/blob/27336192/core/src/main/scala/kafka/server/KafkaApis.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 4fd906f..2c5517f 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -2001,16 +2001,19 @@ class KafkaApis(val requestChannel: RequestChannel, private def closeConnection(request: RequestChannel.Request): Unit = { // This case is used when the request handler has encountered an error, but the client // does not expect a response (e.g. when produce request has acks set to 0) - requestChannel.sendResponse(new RequestChannel.Response(request, None, CloseConnectionAction)) + requestChannel.sendResponse(new RequestChannel.Response(request, None, CloseConnectionAction, None)) } private def sendResponse(request: RequestChannel.Request, responseOpt: Option[AbstractResponse]): Unit = { responseOpt match { case Some(response) => val responseSend = request.context.buildResponse(response) - requestChannel.sendResponse(new RequestChannel.Response(request, Some(responseSend), SendAction)) + val responseString = + if (RequestChannel.isRequestLoggingEnabled) Some(response.toString(request.context.header.apiVersion)) + else None + requestChannel.sendResponse(new RequestChannel.Response(request, Some(responseSend), SendAction, responseString)) case None => - requestChannel.sendResponse(new RequestChannel.Response(request, None, NoOpAction)) + requestChannel.sendResponse(new RequestChannel.Response(request, None, NoOpAction, None)) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/27336192/core/src/main/scala/kafka/server/KafkaRequestHandler.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala index 356ccae..f055762 100755 --- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala +++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala @@ -54,14 +54,14 @@ class KafkaRequestHandler(id: Int, req match { case RequestChannel.ShutdownRequest => - debug("Kafka request handler %d on broker %d received shut down command".format(id, brokerId)) + debug(s"Kafka request handler $id on broker $brokerId received shut down command") latch.countDown() return case request: RequestChannel.Request => try { request.requestDequeueTimeNanos = endTime - trace("Kafka request handler %d on broker %d handling request %s".format(id, brokerId, request)) + trace(s"Kafka request handler $id on broker $brokerId handling request $request") apis.handle(request) } catch { case e: FatalExitError => http://git-wip-us.apache.org/repos/asf/kafka/blob/27336192/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala index 27cafd7..0b42118 100644 --- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala +++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala @@ -177,7 +177,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging { }, 2, TimeUnit.SECONDS) consumer.poll(0) - def sendRecords(numRecords: Int, topic: String = this.topic) { + def sendRecords(numRecords: Int, topic: String) { var remainingRecords = numRecords val endTimeMs = System.currentTimeMillis + 20000 while (remainingRecords > 0 && System.currentTimeMillis < endTimeMs) { http://git-wip-us.apache.org/repos/asf/kafka/blob/27336192/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala index 2cd0df2..7ec2feb 100644 --- a/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala +++ b/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala @@ -201,7 +201,7 @@ class EndToEndClusterIdTest extends KafkaServerTestHarness { } private def consumeRecords(consumer: Consumer[Array[Byte], Array[Byte]], - numRecords: Int = 1, + numRecords: Int, startingOffset: Int = 0, topic: String = topic, part: Int = part) { http://git-wip-us.apache.org/repos/asf/kafka/blob/27336192/core/src/test/scala/integration/kafka/api/TransactionsTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala index 760cc39..d978961 100644 --- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala +++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala @@ -529,7 +529,7 @@ class TransactionsTest extends KafkaServerTestHarness { consumer } - private def createReadUncommittedConsumer(group: String = "group") = { + private def createReadUncommittedConsumer(group: String) = { val props = new Properties() props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_uncommitted") props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") http://git-wip-us.apache.org/repos/asf/kafka/blob/27336192/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala index ff1e8e2..b20622f 100755 --- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala @@ -268,7 +268,7 @@ class LogCleanerIntegrationTest(compressionCodec: String) extends AbstractLogCle } private def writeDupsSingleMessageSet(numKeys: Int, numDups: Int, log: Log, codec: CompressionType, - startKey: Int = 0, magicValue: Byte = RecordBatch.CURRENT_MAGIC_VALUE): Seq[(Int, String, Long)] = { + startKey: Int = 0, magicValue: Byte): Seq[(Int, String, Long)] = { val kvs = for (_ <- 0 until numDups; key <- startKey until (startKey + numKeys)) yield { val payload = counter.toString counter += 1 http://git-wip-us.apache.org/repos/asf/kafka/blob/27336192/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala index c9f5441..1146029 100644 --- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala @@ -222,7 +222,7 @@ class LogCleanerManagerTest extends JUnitSuite with Logging { cleanerManager } - private def createLog(segmentSize: Int, cleanupPolicy: String = "delete"): Log = { + private def createLog(segmentSize: Int, cleanupPolicy: String): Log = { val logProps = new Properties() logProps.put(LogConfig.SegmentBytesProp, segmentSize: Integer) logProps.put(LogConfig.RetentionMsProp, 1: Integer) @@ -243,7 +243,7 @@ class LogCleanerManagerTest extends JUnitSuite with Logging { log } - private def makeLog(dir: File = logDir, config: LogConfig = logConfig) = + private def makeLog(dir: File = logDir, config: LogConfig) = Log(dir = dir, config = config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time, brokerTopicStats = new BrokerTopicStats, maxProducerIdExpirationMs = 60 * 60 * 1000, producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs, http://git-wip-us.apache.org/repos/asf/kafka/blob/27336192/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala index c29ece5..517e876 100755 --- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala @@ -1270,12 +1270,12 @@ class LogCleanerTest extends JUnitSuite { partitionLeaderEpoch, new SimpleRecord(key.toString.getBytes, value.toString.getBytes)) } - private def appendTransactionalAsLeader(log: Log, producerId: Long, producerEpoch: Short = 0): Seq[Int] => LogAppendInfo = { + private def appendTransactionalAsLeader(log: Log, producerId: Long, producerEpoch: Short): Seq[Int] => LogAppendInfo = { appendIdempotentAsLeader(log, producerId, producerEpoch, isTransactional = true) } private def appendIdempotentAsLeader(log: Log, producerId: Long, - producerEpoch: Short = 0, + producerEpoch: Short, isTransactional: Boolean = false): Seq[Int] => LogAppendInfo = { var sequence = 0 keys: Seq[Int] => { http://git-wip-us.apache.org/repos/asf/kafka/blob/27336192/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala index 30ccc8b..2a07532 100644 --- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala @@ -333,7 +333,7 @@ class LogSegmentTest { private def endTxnRecords(controlRecordType: ControlRecordType, producerId: Long, producerEpoch: Short, - offset: Long = 0L, + offset: Long, partitionLeaderEpoch: Int = 0, coordinatorEpoch: Int = 0, timestamp: Long = RecordBatch.NO_TIMESTAMP): MemoryRecords = { http://git-wip-us.apache.org/repos/asf/kafka/blob/27336192/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala index a5d415e..96f7bfc 100644 --- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala @@ -947,9 +947,9 @@ class LogValidatorTest { isFromClient = true) } - private def createRecords(magicValue: Byte = RecordBatch.CURRENT_MAGIC_VALUE, + private def createRecords(magicValue: Byte, timestamp: Long = RecordBatch.NO_TIMESTAMP, - codec: CompressionType = CompressionType.NONE): MemoryRecords = { + codec: CompressionType): MemoryRecords = { val buf = ByteBuffer.allocate(512) val builder = MemoryRecords.builder(buf, magicValue, codec, TimestampType.CREATE_TIME, 0L) builder.appendWithOffset(0, timestamp, null, "hello".getBytes) http://git-wip-us.apache.org/repos/asf/kafka/blob/27336192/core/src/test/scala/unit/kafka/network/SocketServerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index 6fbb0c9..9b32635 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -107,7 +107,7 @@ class SocketServerTest extends JUnitSuite { byteBuffer.rewind() val send = new NetworkSend(request.context.connectionId, byteBuffer) - channel.sendResponse(new RequestChannel.Response(request, Some(send), SendAction)) + channel.sendResponse(new RequestChannel.Response(request, Some(send), SendAction, None)) } def connect(s: SocketServer = server, protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT) = { @@ -187,7 +187,7 @@ class SocketServerTest extends JUnitSuite { for (_ <- 0 until 10) { val request = receiveRequest(server.requestChannel) assertNotNull("receiveRequest timed out", request) - server.requestChannel.sendResponse(new RequestChannel.Response(request, None, RequestChannel.NoOpAction)) + server.requestChannel.sendResponse(new RequestChannel.Response(request, None, RequestChannel.NoOpAction, None)) } } @@ -510,7 +510,7 @@ class SocketServerTest extends JUnitSuite { // detected. If the buffer is larger than 102400 bytes, a second write is attempted and it fails with an // IOException. val send = new NetworkSend(request.context.connectionId, ByteBuffer.allocate(550000)) - channel.sendResponse(new RequestChannel.Response(request, Some(send), SendAction)) + channel.sendResponse(new RequestChannel.Response(request, Some(send), SendAction, None)) TestUtils.waitUntilTrue(() => totalTimeHistCount() == expectedTotalTimeCount, s"request metrics not updated, expected: $expectedTotalTimeCount, actual: ${totalTimeHistCount()}")