http://git-wip-us.apache.org/repos/asf/kafka/blob/d04b0998/core/src/main/scala/kafka/api/FetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala
index f74bd1c..f100d4b 100644
--- a/core/src/main/scala/kafka/api/FetchRequest.scala
+++ b/core/src/main/scala/kafka/api/FetchRequest.scala
@@ -23,19 +23,24 @@ import kafka.common.TopicAndPartition
 import kafka.consumer.ConsumerConfig
 import kafka.network.RequestChannel
 import kafka.message.MessageSet
-
 import java.util.concurrent.atomic.AtomicInteger
 import java.nio.ByteBuffer
+
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import scala.collection.immutable.Map
+import scala.collection.mutable.ArrayBuffer
+import scala.util.Random
 
 case class PartitionFetchInfo(offset: Long, fetchSize: Int)
 
 object FetchRequest {
-  val CurrentVersion = 2.shortValue
+
+  private val random = new Random
+
+  val CurrentVersion = 3.shortValue
   val DefaultMaxWait = 0
   val DefaultMinBytes = 0
+  val DefaultMaxBytes = Int.MaxValue
   val DefaultCorrelationId = 0
 
   def readFrom(buffer: ByteBuffer): FetchRequest = {
@@ -45,6 +50,7 @@ object FetchRequest {
     val replicaId = buffer.getInt
     val maxWait = buffer.getInt
     val minBytes = buffer.getInt
+    val maxBytes = if (versionId < 3) DefaultMaxBytes else buffer.getInt
     val topicCount = buffer.getInt
     val pairs = (1 to topicCount).flatMap(_ => {
       val topic = readShortString(buffer)
@@ -56,8 +62,22 @@ object FetchRequest {
         (TopicAndPartition(topic, partitionId), PartitionFetchInfo(offset, fetchSize))
       })
     })
-    FetchRequest(versionId, correlationId, clientId, replicaId, maxWait, minBytes, Map(pairs:_*))
+    FetchRequest(versionId, correlationId, clientId, replicaId, maxWait, minBytes, maxBytes, Vector(pairs:_*))
   }
+
+  def shuffle(requestInfo: Seq[(TopicAndPartition, PartitionFetchInfo)]): Seq[(TopicAndPartition, PartitionFetchInfo)] =
+    random.shuffle(requestInfo)
+
+  def batchByTopic[T](s: Seq[(TopicAndPartition, T)]): Seq[(String, Seq[(Int, T)])] = {
+    val result = new ArrayBuffer[(String, ArrayBuffer[(Int, T)])]
+    s.foreach { case (TopicAndPartition(t, p), value) =>
+      if (result.isEmpty || result.last._1 != t)
+        result += (t -> new ArrayBuffer)
+      result.last._2 += (p -> value)
+    }
+    result
+  }
+
 }
 
 case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
@@ -66,29 +86,50 @@ case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
                         replicaId: Int = Request.OrdinaryConsumerId,
                         maxWait: Int = FetchRequest.DefaultMaxWait,
                         minBytes: Int = FetchRequest.DefaultMinBytes,
-                        requestInfo: Map[TopicAndPartition, PartitionFetchInfo])
+                        maxBytes: Int = FetchRequest.DefaultMaxBytes,
+                        requestInfo: Seq[(TopicAndPartition, PartitionFetchInfo)])
         extends RequestOrResponse(Some(ApiKeys.FETCH.id)) {
 
   /**
-   * Partitions the request info into a map of maps (one for each topic).
-   */
-  lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_._1.topic)
+   * Partitions the request info into a list of lists (one for each topic) while preserving request info ordering
+   */
+  private type PartitionInfos = Seq[(Int, PartitionFetchInfo)]
+  private lazy val requestInfoGroupedByTopic: Seq[(String, PartitionInfos)] = FetchRequest.batchByTopic(requestInfo)
 
-  /**
-   * Public constructor for the clients
-   */
+  /** Public constructor for the clients */
+  @deprecated("The order of partitions in `requestInfo` is relevant, so this constructor is deprecated in favour of the " +
+    "one that takes a Seq", since = "0.10.1.0")
   def this(correlationId: Int,
            clientId: String,
            maxWait: Int,
            minBytes: Int,
+           maxBytes: Int,
            requestInfo: Map[TopicAndPartition, PartitionFetchInfo]) {
     this(versionId = FetchRequest.CurrentVersion,
          correlationId = correlationId,
          clientId = clientId,
         replicaId = Request.OrdinaryConsumerId,
         maxWait = maxWait,
-        minBytes= minBytes,
-        requestInfo = requestInfo)
+        minBytes = minBytes,
+        maxBytes = maxBytes,
+        requestInfo = FetchRequest.shuffle(requestInfo.toSeq))
+  }
+
+  /** Public constructor for the clients */
+  def this(correlationId: Int,
+           clientId: String,
+           maxWait: Int,
+           minBytes: Int,
+           maxBytes: Int,
+           requestInfo: Seq[(TopicAndPartition, PartitionFetchInfo)]) {
+    this(versionId = FetchRequest.CurrentVersion,
+         correlationId = correlationId,
+         clientId = clientId,
+         replicaId = Request.OrdinaryConsumerId,
+         maxWait = maxWait,
+         minBytes = minBytes,
+         maxBytes = maxBytes,
+         requestInfo = requestInfo)
   }
 
   def writeTo(buffer: ByteBuffer) {
@@ -98,13 +139,15 @@ case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
     buffer.putInt(replicaId)
     buffer.putInt(maxWait)
     buffer.putInt(minBytes)
+    if (versionId >= 3)
+      buffer.putInt(maxBytes)
     buffer.putInt(requestInfoGroupedByTopic.size) // topic count
     requestInfoGroupedByTopic.foreach {
       case (topic, partitionFetchInfos) =>
         writeShortString(buffer, topic)
         buffer.putInt(partitionFetchInfos.size) // partition count
         partitionFetchInfos.foreach {
-          case (TopicAndPartition(_, partition), PartitionFetchInfo(offset, fetchSize)) =>
+          case (partition, PartitionFetchInfo(offset, fetchSize)) =>
             buffer.putInt(partition)
             buffer.putLong(offset)
             buffer.putInt(fetchSize)
@@ -119,6 +162,7 @@ case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
     4 + /* replicaId */
     4 + /* maxWait */
     4 + /* minBytes */
+    (if (versionId >= 3) 4 /* maxBytes */ else 0) +
     4 + /* topic count */
     requestInfoGroupedByTopic.foldLeft(0)((foldedTopics, currTopic) => {
       val (topic, partitionFetchInfos) = currTopic
@@ -165,6 +209,7 @@ case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
     fetchRequest.append("; ReplicaId: " + replicaId)
     fetchRequest.append("; MaxWait: " + maxWait + " ms")
     fetchRequest.append("; MinBytes: " + minBytes + " bytes")
+    fetchRequest.append("; MaxBytes:" + maxBytes + " bytes")
     if(details)
       fetchRequest.append("; RequestInfo: " + requestInfo.mkString(","))
     fetchRequest.toString()
@@ -179,10 +224,11 @@ class FetchRequestBuilder() {
   private var replicaId = Request.OrdinaryConsumerId
   private var maxWait = FetchRequest.DefaultMaxWait
   private var minBytes = FetchRequest.DefaultMinBytes
-  private val requestMap = new collection.mutable.HashMap[TopicAndPartition, PartitionFetchInfo]
+  private var maxBytes = FetchRequest.DefaultMaxBytes
+  private val requestMap = new collection.mutable.ArrayBuffer[(TopicAndPartition, PartitionFetchInfo)]
 
   def addFetch(topic: String, partition: Int, offset: Long, fetchSize: Int) = {
-    requestMap.put(TopicAndPartition(topic, partition), PartitionFetchInfo(offset, fetchSize))
+    requestMap.append((TopicAndPartition(topic, partition), PartitionFetchInfo(offset, fetchSize)))
     this
   }
 
@@ -209,13 +255,19 @@ class FetchRequestBuilder() {
     this
   }
 
+  def maxBytes(maxBytes: Int): FetchRequestBuilder = {
+    this.maxBytes = maxBytes
+    this
+  }
+
   def requestVersion(versionId: Short): FetchRequestBuilder = {
     this.versionId = versionId
     this
   }
 
   def build() = {
-    val fetchRequest = FetchRequest(versionId, correlationId.getAndIncrement, clientId, replicaId, maxWait, minBytes, requestMap.toMap)
+    val fetchRequest = FetchRequest(versionId, correlationId.getAndIncrement, clientId, replicaId, maxWait, minBytes,
+      maxBytes, new ArrayBuffer() ++ requestMap)
    requestMap.clear()
    fetchRequest
  }
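
Worth noting for reviewers: unlike the `groupBy` it replaces, `batchByTopic` only merges consecutive entries that share a topic, which is exactly what lets the serialized request preserve the caller's partition ordering. A minimal standalone sketch of that behaviour (hypothetical topics and values, with a plain `(String, Int)` pair standing in for `TopicAndPartition`):

    import scala.collection.mutable.ArrayBuffer

    // Simplified copy of FetchRequest.batchByTopic for illustration:
    // consecutive entries sharing a topic are batched, but the overall
    // ordering of the sequence is preserved.
    def batchByTopic[T](s: Seq[((String, Int), T)]): Seq[(String, Seq[(Int, T)])] = {
      val result = new ArrayBuffer[(String, ArrayBuffer[(Int, T)])]
      s.foreach { case ((t, p), value) =>
        if (result.isEmpty || result.last._1 != t)
          result += (t -> new ArrayBuffer)
        result.last._2 += (p -> value)
      }
      result
    }

    // "b" appears twice because its partitions are not adjacent in the request:
    val grouped = batchByTopic(Seq(("b", 0) -> "x", ("a", 1) -> "y", ("b", 2) -> "z"))
    assert(grouped.map(_._1) == Seq("b", "a", "b"))

A consequence visible in `writeTo` above is that the topic count written on the wire is the number of batches, so a topic may legitimately occur more than once in a serialized request when its partitions are interleaved.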
http://git-wip-us.apache.org/repos/asf/kafka/blob/d04b0998/core/src/main/scala/kafka/api/FetchResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchResponse.scala b/core/src/main/scala/kafka/api/FetchResponse.scala
index 1066d7f..d99bbcd 100644
--- a/core/src/main/scala/kafka/api/FetchResponse.scala
+++ b/core/src/main/scala/kafka/api/FetchResponse.scala
@@ -28,6 +28,7 @@ import org.apache.kafka.common.network.{Send, MultiSend}
 import org.apache.kafka.common.protocol.Errors
 
 import scala.collection._
+import JavaConverters._
 
 object FetchResponsePartitionData {
   def readFrom(buffer: ByteBuffer): FetchResponsePartitionData = {
@@ -100,7 +101,7 @@ object TopicData {
       val partitionData = FetchResponsePartitionData.readFrom(buffer)
       (partitionId, partitionData)
     })
-    TopicData(topic, Map(topicPartitionDataPairs:_*))
+    TopicData(topic, Seq(topicPartitionDataPairs:_*))
   }
 
   def headerSize(topic: String) =
@@ -108,9 +109,11 @@ object TopicData {
     4 /* partition count */
 }
 
-case class TopicData(topic: String, partitionData: Map[Int, FetchResponsePartitionData]) {
+case class TopicData(topic: String, partitionData: Seq[(Int, FetchResponsePartitionData)]) {
   val sizeInBytes =
-    TopicData.headerSize(topic) + partitionData.values.foldLeft(0)(_ + _.sizeInBytes + 4)
+    TopicData.headerSize(topic) + partitionData.foldLeft(0)((folded, data) => {
+      folded + data._2.sizeInBytes + 4
+    } /*_ + _.sizeInBytes + 4*/)
 
   val headerSize = TopicData.headerSize(topic)
 }
@@ -168,14 +171,18 @@ object FetchResponse {
     val topicCount = buffer.getInt
     val pairs = (1 to topicCount).flatMap(_ => {
       val topicData = TopicData.readFrom(buffer)
-      topicData.partitionData.map {
-        case (partitionId, partitionData) =>
-          (TopicAndPartition(topicData.topic, partitionId), partitionData)
+      topicData.partitionData.map { case (partitionId, partitionData) =>
+        (TopicAndPartition(topicData.topic, partitionId), partitionData)
       }
     })
-    FetchResponse(correlationId, Map(pairs:_*), requestVersion, throttleTime)
+    FetchResponse(correlationId, Vector(pairs:_*), requestVersion, throttleTime)
   }
 
+  type FetchResponseEntry = (Int, FetchResponsePartitionData)
+
+  def batchByTopic(data: Seq[(TopicAndPartition, FetchResponsePartitionData)]): Seq[(String, Seq[FetchResponseEntry])] =
+    FetchRequest.batchByTopic(data)
+
   // Returns the size of the response header
   def headerSize(requestVersion: Int): Int = {
     val throttleTimeSize = if (requestVersion > 0) 4 else 0
@@ -185,12 +192,12 @@ object FetchResponse {
   }
 
   // Returns the size of entire fetch response in bytes (including the header size)
-  def responseSize(dataGroupedByTopic: Map[String, Map[TopicAndPartition, FetchResponsePartitionData]],
+  def responseSize(dataGroupedByTopic: Seq[(String, Seq[FetchResponseEntry])],
                    requestVersion: Int): Int = {
     headerSize(requestVersion) +
     dataGroupedByTopic.foldLeft(0) { case (folded, (topic, partitionDataMap)) =>
       val topicData = TopicData(topic, partitionDataMap.map {
-        case (topicAndPartition, partitionData) => (topicAndPartition.partition, partitionData)
+        case (partitionId, partitionData) => (partitionId, partitionData)
       })
       folded + topicData.sizeInBytes
     }
@@ -198,7 +205,7 @@ object FetchResponse {
 }
 
 case class FetchResponse(correlationId: Int,
-                         data: Map[TopicAndPartition, FetchResponsePartitionData],
+                         data: Seq[(TopicAndPartition, FetchResponsePartitionData)],
                          requestVersion: Int = 0,
                          throttleTimeMs: Int = 0)
   extends RequestOrResponse() {
@@ -206,7 +213,8 @@ case class FetchResponse(correlationId: Int,
   /**
    * Partitions the data into a map of maps (one for each topic).
    */
-  lazy val dataGroupedByTopic = data.groupBy{ case (topicAndPartition, fetchData) => topicAndPartition.topic }
+  private lazy val dataByTopicAndPartition = data.toMap
+  lazy val dataGroupedByTopic = FetchResponse.batchByTopic(data)
   val headerSizeInBytes = FetchResponse.headerSize(requestVersion)
   lazy val sizeInBytes = FetchResponse.responseSize(dataGroupedByTopic, requestVersion)
@@ -234,7 +242,7 @@ case class FetchResponse(correlationId: Int,
 
   private def partitionDataFor(topic: String, partition: Int): FetchResponsePartitionData = {
     val topicAndPartition = TopicAndPartition(topic, partition)
-    data.get(topicAndPartition) match {
+    dataByTopicAndPartition.get(topicAndPartition) match {
       case Some(partitionData) => partitionData
       case _ =>
         throw new IllegalArgumentException(
@@ -247,7 +255,7 @@ case class FetchResponse(correlationId: Int,
   def highWatermark(topic: String, partition: Int) = partitionDataFor(topic, partition).hw
 
-  def hasError = data.values.exists(_.error != Errors.NONE.code)
+  def hasError = dataByTopicAndPartition.values.exists(_.error != Errors.NONE.code)
 
   def errorCode(topic: String, partition: Int) = partitionDataFor(topic, partition).error
 }
@@ -274,10 +282,9 @@ class FetchResponseSend(val dest: String, val fetchResponse: FetchResponse) exte
   fetchResponse.writeHeaderTo(buffer)
   buffer.rewind()
 
-  private val sends = new MultiSend(dest, JavaConversions.seqAsJavaList(fetchResponse.dataGroupedByTopic.toList.map {
-    case(topic, data) => new TopicDataSend(dest, TopicData(topic,
-      data.map{case(topicAndPartition, message) => (topicAndPartition.partition, message)}))
-  }))
+  private val sends = new MultiSend(dest, fetchResponse.dataGroupedByTopic.map {
+    case (topic, data) => new TopicDataSend(dest, TopicData(topic, data)): Send
+  }.asJava)
 
   override def writeTo(channel: GatheringByteChannel): Long = {
     if (completed)

http://git-wip-us.apache.org/repos/asf/kafka/blob/d04b0998/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
index e1c792d..fd8983c 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
@@ -38,6 +38,7 @@ object ConsumerConfig extends Config {
   val AutoOffsetReset = OffsetRequest.LargestTimeString
   val ConsumerTimeoutMs = -1
   val MinFetchBytes = 1
+  val MaxFetchBytes = 50 * 1024 * 1024
   val MaxFetchWaitMs = 100
   val MirrorTopicsWhitelist = ""
   val MirrorTopicsBlacklist = ""
@@ -119,7 +120,7 @@ class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig(
   /** the socket receive buffer for network requests */
   val socketReceiveBufferBytes = props.getInt("socket.receive.buffer.bytes", SocketBufferSize)
 
-  /** the number of bytes of messages to attempt to fetch */
+  /** the number of bytes of messages to attempt to fetch from each partition */
   val fetchMessageMaxBytes = props.getInt("fetch.message.max.bytes", FetchSize)
 
   /** the number threads used to fetch data */
@@ -140,6 +141,9 @@ class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig(
   /** the minimum amount of data the server should return for a fetch request. If insufficient data is available the request will block */
   val fetchMinBytes = props.getInt("fetch.min.bytes", MinFetchBytes)
 
+  /** the maximum amount of data the server should return for a fetch request */
+  val fetchMaxBytes = props.getInt("fetch.max.bytes", MaxFetchBytes)
+
   /** the maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy fetch.min.bytes */
   val fetchWaitMaxMs = props.getInt("fetch.wait.max.ms", MaxFetchWaitMs)
   require(fetchWaitMaxMs <= socketTimeoutMs, "socket.timeout.ms should always be at least fetch.wait.max.ms" +
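
For context, the new `fetch.max.bytes` consumer property bounds the whole fetch response, while the pre-existing `fetch.message.max.bytes` bounds what is fetched from each partition. A minimal sketch of an old-consumer configuration exercising both (hypothetical connection settings, everything else left at defaults):

    import java.util.Properties
    import kafka.consumer.ConsumerConfig

    // fetch.message.max.bytes bounds each partition's share of the response,
    // while fetch.max.bytes bounds the entire response.
    val props = new Properties()
    props.put("zookeeper.connect", "localhost:2181") // assumed local setup
    props.put("group.id", "example-group")           // hypothetical group
    props.put("fetch.message.max.bytes", (1024 * 1024).toString)  // 1 MiB per partition
    props.put("fetch.max.bytes", (50 * 1024 * 1024).toString)     // 50 MiB per response (the default)
    val config = new ConsumerConfig(props)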
http://git-wip-us.apache.org/repos/asf/kafka/blob/d04b0998/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
index e73faf2..5b5fe0d 100755
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
@@ -17,10 +17,10 @@
 package kafka.consumer
 
-import org.I0Itec.zkclient.ZkClient
 import kafka.server.{BrokerAndInitialOffset, AbstractFetcherThread, AbstractFetcherManager}
 import kafka.cluster.{BrokerEndPoint, Cluster}
 import org.apache.kafka.common.protocol.SecurityProtocol
+import org.apache.kafka.common.TopicPartition
 import scala.collection.immutable
 import collection.mutable.HashMap
 import scala.collection.mutable
@@ -28,7 +28,6 @@ import java.util.concurrent.locks.ReentrantLock
 import kafka.utils.CoreUtils.inLock
 import kafka.utils.ZkUtils
 import kafka.utils.{ShutdownableThread, SystemTime}
-import kafka.common.TopicAndPartition
 import kafka.client.ClientUtils
 import java.util.concurrent.atomic.AtomicInteger
 
@@ -42,9 +41,9 @@ class ConsumerFetcherManager(private val consumerIdString: String,
                              private val zkUtils : ZkUtils)
         extends AbstractFetcherManager("ConsumerFetcherManager-%d".format(SystemTime.milliseconds),
                                        config.clientId, config.numConsumerFetchers) {
-  private var partitionMap: immutable.Map[TopicAndPartition, PartitionTopicInfo] = null
+  private var partitionMap: immutable.Map[TopicPartition, PartitionTopicInfo] = null
   private var cluster: Cluster = null
-  private val noLeaderPartitionSet = new mutable.HashSet[TopicAndPartition]
+  private val noLeaderPartitionSet = new mutable.HashSet[TopicPartition]
   private val lock = new ReentrantLock
   private val cond = lock.newCondition()
   private var leaderFinderThread: ShutdownableThread = null
@@ -53,7 +52,7 @@ class ConsumerFetcherManager(private val consumerIdString: String,
   private class LeaderFinderThread(name: String) extends ShutdownableThread(name) {
     // thread responsible for adding the fetcher to the right broker when leader is available
     override def doWork() {
-      val leaderForPartitionsMap = new HashMap[TopicAndPartition, BrokerEndPoint]
+      val leaderForPartitionsMap = new HashMap[TopicPartition, BrokerEndPoint]
       lock.lock()
       try {
         while (noLeaderPartitionSet.isEmpty) {
@@ -72,7 +71,7 @@ class ConsumerFetcherManager(private val consumerIdString: String,
         topicsMetadata.foreach { tmd =>
           val topic = tmd.topic
           tmd.partitionsMetadata.foreach { pmd =>
-            val topicAndPartition = TopicAndPartition(topic, pmd.partitionId)
+            val topicAndPartition = new TopicPartition(topic, pmd.partitionId)
             if(pmd.leader.isDefined && noLeaderPartitionSet.contains(topicAndPartition)) {
               val leaderBroker = pmd.leader.get
               leaderForPartitionsMap.put(topicAndPartition, leaderBroker)
@@ -92,9 +91,8 @@ class ConsumerFetcherManager(private val consumerIdString: String,
       }
 
       try {
-        addFetcherForPartitions(leaderForPartitionsMap.map{
-          case (topicAndPartition, broker) =>
-            topicAndPartition -> BrokerAndInitialOffset(broker, partitionMap(topicAndPartition).getFetchOffset())}
+        addFetcherForPartitions(leaderForPartitionsMap.map { case (topicPartition, broker) =>
+          topicPartition -> BrokerAndInitialOffset(broker, partitionMap(topicPartition).getFetchOffset())}
         )
       } catch {
         case t: Throwable => {
@@ -125,9 +123,9 @@ class ConsumerFetcherManager(private val consumerIdString: String,
       leaderFinderThread.start()
 
     inLock(lock) {
-      partitionMap = topicInfos.map(tpi => (TopicAndPartition(tpi.topic, tpi.partitionId), tpi)).toMap
+      partitionMap = topicInfos.map(tpi => (new TopicPartition(tpi.topic, tpi.partitionId), tpi)).toMap
      this.cluster = cluster
-      noLeaderPartitionSet ++= topicInfos.map(tpi => TopicAndPartition(tpi.topic, tpi.partitionId))
+      noLeaderPartitionSet ++= topicInfos.map(tpi => new TopicPartition(tpi.topic, tpi.partitionId))
      cond.signalAll()
    }
  }
@@ -154,7 +152,7 @@ class ConsumerFetcherManager(private val consumerIdString: String,
    info("All connections stopped")
  }
 
-  def addPartitionsWithError(partitionList: Iterable[TopicAndPartition]) {
+  def addPartitionsWithError(partitionList: Iterable[TopicPartition]) {
    debug("adding partitions with error %s".format(partitionList))
    inLock(lock) {
      if (partitionMap != null) {
@@ -163,4 +161,4 @@ class ConsumerFetcherManager(private val consumerIdString: String,
      }
    }
  }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d04b0998/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
index c33122b..c47efb7 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
@@ -24,11 +24,12 @@ import kafka.server.{PartitionFetchState, AbstractFetcherThread}
 import kafka.common.{ErrorMapping, TopicAndPartition}
 import scala.collection.Map
 import ConsumerFetcherThread._
+import org.apache.kafka.common.TopicPartition
 
 class ConsumerFetcherThread(name: String,
                             val config: ConsumerConfig,
                             sourceBroker: BrokerEndPoint,
-                            partitionMap: Map[TopicAndPartition, PartitionTopicInfo],
+                            partitionMap: Map[TopicPartition, PartitionTopicInfo],
                             val consumerFetcherManager: ConsumerFetcherManager)
         extends AbstractFetcherThread(name = name,
                                       clientId = config.clientId,
@@ -65,55 +66,59 @@ class ConsumerFetcherThread(name: String,
   }
 
   // process fetched data
-  def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long, partitionData: PartitionData) {
-    val pti = partitionMap(topicAndPartition)
+  def processPartitionData(topicPartition: TopicPartition, fetchOffset: Long, partitionData: PartitionData) {
+    val pti = partitionMap(topicPartition)
     if (pti.getFetchOffset != fetchOffset)
       throw new RuntimeException("Offset doesn't match for partition [%s,%d] pti offset: %d fetch offset: %d"
-        .format(topicAndPartition.topic, topicAndPartition.partition, pti.getFetchOffset, fetchOffset))
+        .format(topicPartition.topic, topicPartition.partition, pti.getFetchOffset, fetchOffset))
     pti.enqueue(partitionData.underlying.messages.asInstanceOf[ByteBufferMessageSet])
   }
 
   // handle a partition whose offset is out of range and return a new fetch offset
-  def handleOffsetOutOfRange(topicAndPartition: TopicAndPartition): Long = {
+  def handleOffsetOutOfRange(topicPartition: TopicPartition): Long = {
     val startTimestamp = config.autoOffsetReset match {
       case OffsetRequest.SmallestTimeString => OffsetRequest.EarliestTime
       case OffsetRequest.LargestTimeString => OffsetRequest.LatestTime
       case _ => OffsetRequest.LatestTime
     }
+    val topicAndPartition = new TopicAndPartition(topicPartition.topic, topicPartition.partition)
     val newOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, startTimestamp, Request.OrdinaryConsumerId)
-    val pti = partitionMap(topicAndPartition)
+    val pti = partitionMap(topicPartition)
     pti.resetFetchOffset(newOffset)
     pti.resetConsumeOffset(newOffset)
     newOffset
   }
 
   // any logic for partitions whose leader has changed
-  def handlePartitionsWithErrors(partitions: Iterable[TopicAndPartition]) {
+  def handlePartitionsWithErrors(partitions: Iterable[TopicPartition]) {
     removePartitions(partitions.toSet)
     consumerFetcherManager.addPartitionsWithError(partitions)
   }
 
-  protected def buildFetchRequest(partitionMap: collection.Map[TopicAndPartition, PartitionFetchState]): FetchRequest = {
-    partitionMap.foreach { case ((topicAndPartition, partitionFetchState)) =>
+  protected def buildFetchRequest(partitionMap: collection.Seq[(TopicPartition, PartitionFetchState)]): FetchRequest = {
+    partitionMap.foreach { case ((topicPartition, partitionFetchState)) =>
       if (partitionFetchState.isActive)
-        fetchRequestBuilder.addFetch(topicAndPartition.topic, topicAndPartition.partition, partitionFetchState.offset,
+        fetchRequestBuilder.addFetch(topicPartition.topic, topicPartition.partition, partitionFetchState.offset,
           fetchSize)
     }
 
     new FetchRequest(fetchRequestBuilder.build())
   }
 
-  protected def fetch(fetchRequest: FetchRequest): collection.Map[TopicAndPartition, PartitionData] =
-    simpleConsumer.fetch(fetchRequest.underlying).data.map { case (key, value) =>
-      key -> new PartitionData(value)
+  protected def fetch(fetchRequest: FetchRequest): Seq[(TopicPartition, PartitionData)] =
+    simpleConsumer.fetch(fetchRequest.underlying).data.map { case (TopicAndPartition(t, p), value) =>
+      new TopicPartition(t, p) -> new PartitionData(value)
     }
 }
 
 object ConsumerFetcherThread {
 
   class FetchRequest(val underlying: kafka.api.FetchRequest) extends AbstractFetcherThread.FetchRequest {
+    private lazy val tpToOffset: Map[TopicPartition, Long] = underlying.requestInfo.map { case (tp, fetchInfo) =>
+      new TopicPartition(tp.topic, tp.partition) -> fetchInfo.offset
+    }.toMap
     def isEmpty: Boolean = underlying.requestInfo.isEmpty
-    def offset(topicAndPartition: TopicAndPartition): Long = underlying.requestInfo(topicAndPartition).offset
+    def offset(topicPartition: TopicPartition): Long = tpToOffset(topicPartition)
   }
 
   class PartitionData(val underlying: FetchResponsePartitionData) extends AbstractFetcherThread.PartitionData {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d04b0998/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
index cf8ae91..1dc2a49 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
@@ -410,7 +410,7 @@ class GroupMetadataManager(val brokerId: Int,
         while (currOffset < getHighWatermark(offsetsPartition) && !shuttingDown.get()) {
           buffer.clear()
-          val messages = log.read(currOffset, config.loadBufferSize).messageSet.asInstanceOf[FileMessageSet]
+          val messages = log.read(currOffset, config.loadBufferSize, minOneMessage = true).messageSet.asInstanceOf[FileMessageSet]
           messages.readInto(buffer, 0)
           val messageSet = new ByteBufferMessageSet(buffer)
           messageSet.foreach { msgAndOffset =>
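
The `minOneMessage = true` argument matters here because the offsets loader reads in a loop bounded by `config.loadBufferSize`: if a read could return zero messages, the loop would stop advancing. A self-contained toy model of that progress guarantee (not the real `Log.read`, just the shape of the argument):

    // Each "message" is just (offset, sizeInBytes). readAtLeastOne mimics
    // log.read(..., minOneMessage = true): it returns as many whole messages
    // as fit in the buffer, but never zero.
    case class Msg(offset: Long, size: Int)
    val log = Vector(Msg(0, 100), Msg(1, 5000), Msg(2, 300)) // hypothetical data

    def readAtLeastOne(from: Long, bufferSize: Int): Vector[Msg] = {
      val rest = log.dropWhile(_.offset < from)
      val cumulative = rest.scanLeft(0)(_ + _.size).tail
      val fitting = cumulative.zip(rest).takeWhile(_._1 <= bufferSize).map(_._2)
      if (fitting.nonEmpty) fitting else rest.take(1) // the minOneMessage guarantee
    }

    var curr = 0L
    while (curr < log.size) {
      val batch = readAtLeastOne(curr, bufferSize = 1024)
      curr = batch.last.offset + 1 // always advances, even past the 5000-byte message
    }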
http://git-wip-us.apache.org/repos/asf/kafka/blob/d04b0998/core/src/main/scala/kafka/javaapi/FetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/FetchRequest.scala b/core/src/main/scala/kafka/javaapi/FetchRequest.scala
index 4060077..fb9fa8e 100644
--- a/core/src/main/scala/kafka/javaapi/FetchRequest.scala
+++ b/core/src/main/scala/kafka/javaapi/FetchRequest.scala
@@ -17,32 +17,44 @@
 package kafka.javaapi
 
-import java.nio.ByteBuffer
+import java.util
+
 import kafka.common.TopicAndPartition
-import kafka.api.{Request, PartitionFetchInfo}
-import scala.collection.mutable
+import kafka.api.{PartitionFetchInfo, Request}
+
+import scala.collection.JavaConverters._
+
+object FetchRequest {
+
+  private def seqToLinkedHashMap[K, V](s: Seq[(K, V)]): util.LinkedHashMap[K, V] = {
+    val map = new util.LinkedHashMap[K, V]
+    s.foreach { case (k, v) => map.put(k, v) }
+    map
+  }
+
+}
 
 class FetchRequest(correlationId: Int,
                    clientId: String,
                    maxWait: Int,
                    minBytes: Int,
-                   requestInfo: java.util.Map[TopicAndPartition, PartitionFetchInfo]) {
-
-  val underlying = {
-    val scalaMap: Map[TopicAndPartition, PartitionFetchInfo] = {
-      import scala.collection.JavaConversions._
-      (requestInfo: mutable.Map[TopicAndPartition, PartitionFetchInfo]).toMap
-    }
-    kafka.api.FetchRequest(
-      correlationId = correlationId,
-      clientId = clientId,
-      replicaId = Request.OrdinaryConsumerId,
-      maxWait = maxWait,
-      minBytes = minBytes,
-      requestInfo = scalaMap
-    )
+                   requestInfo: util.LinkedHashMap[TopicAndPartition, PartitionFetchInfo]) {
+
+  @deprecated("The order of partitions in `requestInfo` is relevant, so this constructor is deprecated in favour of the " +
+    "one that takes a LinkedHashMap", since = "0.10.1.0")
+  def this(correlationId: Int, clientId: String, maxWait: Int, minBytes: Int,
+           requestInfo: java.util.Map[TopicAndPartition, PartitionFetchInfo]) {
+    this(correlationId, clientId, maxWait, minBytes,
+      FetchRequest.seqToLinkedHashMap(kafka.api.FetchRequest.shuffle(requestInfo.asScala.toSeq)))
   }
 
+  val underlying = kafka.api.FetchRequest(
+    correlationId = correlationId,
+    clientId = clientId,
+    replicaId = Request.OrdinaryConsumerId,
+    maxWait = maxWait,
+    minBytes = minBytes,
+    requestInfo = requestInfo.asScala.toBuffer
+  )
+
   override def toString = underlying.toString
 
   override def equals(other: Any) = canEqual(other) && {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d04b0998/core/src/main/scala/kafka/log/FileMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala
index 5763042..c76653a 100755
--- a/core/src/main/scala/kafka/log/FileMessageSet.scala
+++ b/core/src/main/scala/kafka/log/FileMessageSet.scala
@@ -128,11 +128,13 @@ class FileMessageSet private[kafka](@volatile var file: File,
 
   /**
    * Search forward for the file position of the last offset that is greater than or equal to the target offset
-   * and return its physical position. If no such offsets are found, return null.
+   * and return its physical position and the size of the message (including log overhead) at the returned offset. If
+   * no such offsets are found, return null.
+   *
    * @param targetOffset The offset to search for.
    * @param startingPosition The starting position in the file to begin searching from.
    */
-  def searchForOffset(targetOffset: Long, startingPosition: Int): OffsetPosition = {
+  def searchForOffsetWithSize(targetOffset: Long, startingPosition: Int): (OffsetPosition, Int) = {
     var position = startingPosition
     val buffer = ByteBuffer.allocate(MessageSet.LogOverhead)
     val size = sizeInBytes()
@@ -144,11 +146,11 @@ class FileMessageSet private[kafka](@volatile var file: File,
           .format(targetOffset, startingPosition, file.getAbsolutePath))
       buffer.rewind()
       val offset = buffer.getLong()
-      if(offset >= targetOffset)
-        return OffsetPosition(offset, position)
       val messageSize = buffer.getInt()
-      if(messageSize < Message.MinMessageOverhead)
+      if (messageSize < Message.MinMessageOverhead)
         throw new IllegalStateException("Invalid message size: " + messageSize)
+      if (offset >= targetOffset)
+        return (OffsetPosition(offset, position), messageSize + MessageSet.LogOverhead)
       position += MessageSet.LogOverhead + messageSize
     }
     null
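
The size returned alongside the position includes the log overhead, i.e. the 12-byte entry header (8-byte offset plus 4-byte size field) that precedes each message on disk. Illustrative arithmetic only, assuming `MessageSet.LogOverhead` keeps its usual definition:

    // Each on-disk entry is [8-byte offset][4-byte size][message of `size` bytes],
    // so the size reported by searchForOffsetWithSize is messageSize + LogOverhead.
    val logOverhead = 8 + 4            // MessageSet.LogOverhead
    val messageSize = 1000             // hypothetical value of the size field
    val entrySize = messageSize + logOverhead
    assert(entrySize == 1012)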
http://git-wip-us.apache.org/repos/asf/kafka/blob/d04b0998/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 894beab..cfd0472 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -517,11 +517,12 @@ class Log(val dir: File,
    * @param startOffset The offset to begin reading at
    * @param maxLength The maximum number of bytes to read
    * @param maxOffset The offset to read up to, exclusive. (i.e. this offset NOT included in the resulting message set)
+   * @param minOneMessage If this is true, the first message will be returned even if it exceeds `maxLength` (if one exists)
    *
    * @throws OffsetOutOfRangeException If startOffset is beyond the log end offset or before the base offset of the first segment.
    * @return The fetch data information including fetch starting offset metadata and messages read.
    */
-  def read(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None): FetchDataInfo = {
+  def read(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None, minOneMessage: Boolean = false): FetchDataInfo = {
     trace("Reading %d bytes from offset %d in log %s of length %d bytes".format(maxLength, startOffset, name, size))
 
     // Because we don't use lock for reading, the synchronization is a little bit tricky.
@@ -558,7 +559,7 @@ class Log(val dir: File,
           entry.getValue.size
         }
       }
-      val fetchInfo = entry.getValue.read(startOffset, maxOffset, maxLength, maxPosition)
+      val fetchInfo = entry.getValue.read(startOffset, maxOffset, maxLength, maxPosition, minOneMessage)
       if(fetchInfo == null) {
         entry = segments.higherEntry(entry.getKey)
       } else {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d04b0998/core/src/main/scala/kafka/log/LogSegment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index ccc2472..0eb4330 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -119,12 +119,13 @@ class LogSegment(val log: FileMessageSet,
    * @param offset The offset we want to translate
    * @param startingFilePosition A lower bound on the file position from which to begin the search. This is purely an optimization and
    * when omitted, the search will begin at the position in the offset index.
-   * @return The position in the log storing the message with the least offset >= the requested offset or null if no message meets this criteria.
+   * @return The position in the log storing the message with the least offset >= the requested offset and the size of the
+   * message or null if no message meets this criteria.
    */
   @threadsafe
-  private[log] def translateOffset(offset: Long, startingFilePosition: Int = 0): OffsetPosition = {
+  private[log] def translateOffset(offset: Long, startingFilePosition: Int = 0): (OffsetPosition, Int) = {
     val mapping = index.lookup(offset)
-    log.searchForOffset(offset, max(mapping.position, startingFilePosition))
+    log.searchForOffsetWithSize(offset, max(mapping.position, startingFilePosition))
   }
 
   /**
@@ -135,50 +136,58 @@ class LogSegment(val log: FileMessageSet,
    * @param maxSize The maximum number of bytes to include in the message set we read
    * @param maxOffset An optional maximum offset for the message set we read
    * @param maxPosition The maximum position in the log segment that should be exposed for read
+   * @param minOneMessage If this is true, the first message will be returned even if it exceeds `maxSize` (if one exists)
    *
    * @return The fetched data and the offset metadata of the first message whose offset is >= startOffset,
    *         or null if the startOffset is larger than the largest offset in this log
    */
   @threadsafe
-  def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int, maxPosition: Long = size): FetchDataInfo = {
-    if(maxSize < 0)
+  def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int, maxPosition: Long = size,
+           minOneMessage: Boolean = false): FetchDataInfo = {
+    if (maxSize < 0)
       throw new IllegalArgumentException("Invalid max size for log read (%d)".format(maxSize))
 
     val logSize = log.sizeInBytes // this may change, need to save a consistent copy
-    val startPosition = translateOffset(startOffset)
+    val startOffsetAndSize = translateOffset(startOffset)
 
     // if the start position is already off the end of the log, return null
-    if(startPosition == null)
+    if (startOffsetAndSize == null)
       return null
 
+    val (startPosition, messageSetSize) = startOffsetAndSize
     val offsetMetadata = new LogOffsetMetadata(startOffset, this.baseOffset, startPosition.position)
 
-    // if the size is zero, still return a log segment but with zero size
-    if(maxSize == 0)
+    val adjustedMaxSize =
+      if (minOneMessage) math.max(maxSize, messageSetSize)
+      else maxSize
+
+    // return a log segment but with zero size in the case below
+    if (adjustedMaxSize == 0)
       return FetchDataInfo(offsetMetadata, MessageSet.Empty)
 
     // calculate the length of the message set to read based on whether or not they gave us a maxOffset
     val length = maxOffset match {
       case None =>
         // no max offset, just read until the max position
-        min((maxPosition - startPosition.position).toInt, maxSize)
+        min((maxPosition - startPosition.position).toInt, adjustedMaxSize)
       case Some(offset) =>
         // there is a max offset, translate it to a file position and use that to calculate the max read size;
         // when the leader of a partition changes, it's possible for the new leader's high watermark to be less than the
         // true high watermark in the previous leader for a short window. In this window, if a consumer fetches on an
         // offset between new leader's high watermark and the log end offset, we want to return an empty response.
-        if(offset < startOffset)
-          return FetchDataInfo(offsetMetadata, MessageSet.Empty)
+        if (offset < startOffset)
+          return FetchDataInfo(offsetMetadata, MessageSet.Empty, firstMessageSetIncomplete = false)
         val mapping = translateOffset(offset, startPosition.position)
         val endPosition =
-          if(mapping == null)
+          if (mapping == null)
             logSize // the max offset is off the end of the log, use the end of the file
           else
-            mapping.position
-        min(min(maxPosition, endPosition) - startPosition.position, maxSize).toInt
+            mapping._1.position
+        min(min(maxPosition, endPosition) - startPosition.position, adjustedMaxSize).toInt
     }
 
-    FetchDataInfo(offsetMetadata, log.read(startPosition.position, length))
+    FetchDataInfo(offsetMetadata, log.read(startPosition.position, length),
+      firstMessageSetIncomplete = adjustedMaxSize < messageSetSize)
   }
 
   /**
@@ -260,14 +269,15 @@ class LogSegment(val log: FileMessageSet,
   @nonthreadsafe
   def truncateTo(offset: Long): Int = {
     val mapping = translateOffset(offset)
-    if(mapping == null)
+    if (mapping == null)
       return 0
     index.truncateTo(offset)
     timeIndex.truncateTo(offset)
     // after truncation, reset and allocate more space for the (new currently active) index
     index.resize(index.maxIndexSize)
     timeIndex.resize(timeIndex.maxIndexSize)
-    val bytesTruncated = log.truncateTo(mapping.position)
+    val (offsetPosition, _) = mapping
+    val bytesTruncated = log.truncateTo(offsetPosition.position)
     if(log.sizeInBytes == 0) {
       created = time.milliseconds
       rollingBasedTimestamp = None
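
The `adjustedMaxSize` computation above is the heart of the change: when `minOneMessage` is set, the cap is raised to the size of the first message set, so a single oversized message can no longer starve a fetch. A standalone restatement of that logic with hypothetical sizes (5 MiB cap, 6 MiB first message set):

    // Mirrors the LogSegment.read adjustment (illustration, not the broker code).
    def adjustedMaxSize(maxSize: Int, firstMessageSetSize: Int, minOneMessage: Boolean): Int =
      if (minOneMessage) math.max(maxSize, firstMessageSetSize) else maxSize

    // Without the flag the read is capped below the message size and returns a
    // partial first message set; with it, the oversized message set is returned
    // in full (and FetchDataInfo.firstMessageSetIncomplete stays false).
    assert(adjustedMaxSize(5 << 20, 6 << 20, minOneMessage = false) == (5 << 20))
    assert(adjustedMaxSize(5 << 20, 6 << 20, minOneMessage = true) == (6 << 20))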
http://git-wip-us.apache.org/repos/asf/kafka/blob/d04b0998/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
index ec40516..8f6b84f 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
@@ -23,8 +23,8 @@ import scala.collection.Map
 import kafka.utils.Logging
 import kafka.cluster.BrokerEndPoint
 import kafka.metrics.KafkaMetricsGroup
-import kafka.common.TopicAndPartition
 import com.yammer.metrics.core.Gauge
+import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.utils.Utils
 
 abstract class AbstractFetcherManager(protected val name: String, clientId: String, numFetchers: Int = 1)
@@ -71,7 +71,7 @@ abstract class AbstractFetcherManager(protected val name: String, clientId: Stri
   // to be defined in subclass to create a specific fetcher
   def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): AbstractFetcherThread
 
-  def addFetcherForPartitions(partitionAndOffsets: Map[TopicAndPartition, BrokerAndInitialOffset]) {
+  def addFetcherForPartitions(partitionAndOffsets: Map[TopicPartition, BrokerAndInitialOffset]) {
     mapLock synchronized {
       val partitionsPerFetcher = partitionAndOffsets.groupBy{ case(topicAndPartition, brokerAndInitialOffset) =>
         BrokerAndFetcherId(brokerAndInitialOffset.broker, getFetcherId(topicAndPartition.topic, topicAndPartition.partition))}
@@ -85,8 +85,8 @@ abstract class AbstractFetcherManager(protected val name: String, clientId: Stri
           fetcherThread.start
         }
 
-        fetcherThreadMap(brokerAndFetcherId).addPartitions(partitionAndOffsets.map { case (topicAndPartition, brokerAndInitOffset) =>
-          topicAndPartition -> brokerAndInitOffset.initOffset
+        fetcherThreadMap(brokerAndFetcherId).addPartitions(partitionAndOffsets.map { case (tp, brokerAndInitOffset) =>
+          tp -> brokerAndInitOffset.initOffset
         })
       }
     }
@@ -95,11 +95,10 @@ abstract class AbstractFetcherManager(protected val name: String, clientId: Stri
       "[" + topicAndPartition + ", initOffset " + brokerAndInitialOffset.initOffset + " to broker " + brokerAndInitialOffset.broker + "] "}))
   }
 
-  def removeFetcherForPartitions(partitions: Set[TopicAndPartition]) {
+  def removeFetcherForPartitions(partitions: Set[TopicPartition]) {
     mapLock synchronized {
-      for ((key, fetcher) <- fetcherThreadMap) {
+      for ((key, fetcher) <- fetcherThreadMap)
         fetcher.removePartitions(partitions)
-      }
     }
     info("Removed fetcher for partitions %s".format(partitions.mkString(",")))
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d04b0998/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 36baf1f..2f2cb4b 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -21,19 +21,23 @@ import java.util.concurrent.locks.ReentrantLock
 
 import kafka.cluster.BrokerEndPoint
 import kafka.consumer.PartitionTopicInfo
-import kafka.message.{MessageAndOffset, ByteBufferMessageSet}
-import kafka.utils.{Pool, ShutdownableThread, DelayedItem}
-import kafka.common.{KafkaException, ClientIdAndBroker, TopicAndPartition}
+import kafka.message.ByteBufferMessageSet
+import kafka.utils.{DelayedItem, Pool, ShutdownableThread}
+import kafka.common.{ClientIdAndBroker, KafkaException}
 import kafka.metrics.KafkaMetricsGroup
 import kafka.utils.CoreUtils.inLock
 import org.apache.kafka.common.errors.CorruptRecordException
 import org.apache.kafka.common.protocol.Errors
 import AbstractFetcherThread._
-import scala.collection.{mutable, Set, Map}
+
+import scala.collection.{Map, Set, mutable}
+import scala.collection.JavaConverters._
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicLong
 import com.yammer.metrics.core.Gauge
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.internals.PartitionStates
 
 /**
  * Abstract class for fetching data from multiple partitions from the same broker.
@@ -48,7 +52,7 @@ abstract class AbstractFetcherThread(name: String,
   type REQ <: FetchRequest
   type PD <: PartitionData
 
-  private val partitionMap = new mutable.HashMap[TopicAndPartition, PartitionFetchState] // a (topic, partition) -> partitionFetchState map
+  private val partitionStates = new PartitionStates[PartitionFetchState]
   private val partitionMapLock = new ReentrantLock
   private val partitionMapCond = partitionMapLock.newCondition()
@@ -59,17 +63,17 @@ abstract class AbstractFetcherThread(name: String,
   /* callbacks to be defined in subclass */
 
   // process fetched data
-  def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long, partitionData: PD)
+  def processPartitionData(topicPartition: TopicPartition, fetchOffset: Long, partitionData: PD)
 
   // handle a partition whose offset is out of range and return a new fetch offset
-  def handleOffsetOutOfRange(topicAndPartition: TopicAndPartition): Long
+  def handleOffsetOutOfRange(topicPartition: TopicPartition): Long
 
   // deal with partitions with errors, potentially due to leadership changes
-  def handlePartitionsWithErrors(partitions: Iterable[TopicAndPartition])
+  def handlePartitionsWithErrors(partitions: Iterable[TopicPartition])
 
-  protected def buildFetchRequest(partitionMap: Map[TopicAndPartition, PartitionFetchState]): REQ
+  protected def buildFetchRequest(partitionMap: Seq[(TopicPartition, PartitionFetchState)]): REQ
 
-  protected def fetch(fetchRequest: REQ): Map[TopicAndPartition, PD]
+  protected def fetch(fetchRequest: REQ): Seq[(TopicPartition, PD)]
 
   override def shutdown(){
     initiateShutdown()
@@ -86,7 +90,9 @@ abstract class AbstractFetcherThread(name: String,
   override def doWork() {
 
     val fetchRequest = inLock(partitionMapLock) {
-      val fetchRequest = buildFetchRequest(partitionMap)
+      val fetchRequest = buildFetchRequest(partitionStates.partitionStates.asScala.map { state =>
+        state.topicPartition -> state.value
+      })
       if (fetchRequest.isEmpty) {
         trace("There are no active partitions. Back off for %d ms before sending a fetch request".format(fetchBackOffMs))
         partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
@@ -98,8 +104,14 @@ abstract class AbstractFetcherThread(name: String,
   }
 
   private def processFetchRequest(fetchRequest: REQ) {
-    val partitionsWithError = new mutable.HashSet[TopicAndPartition]
-    var responseData: Map[TopicAndPartition, PD] = Map.empty
+    val partitionsWithError = mutable.Set[TopicPartition]()
+
+    def updatePartitionsWithError(partition: TopicPartition): Unit = {
+      partitionsWithError += partition
+      partitionStates.moveToEnd(partition)
+    }
+
+    var responseData: Seq[(TopicPartition, PD)] = Seq.empty
 
     try {
       trace("Issuing to broker %d of fetch request %s".format(sourceBroker.id, fetchRequest))
@@ -109,8 +121,10 @@ abstract class AbstractFetcherThread(name: String,
       if (isRunning.get) {
         warn(s"Error in fetch $fetchRequest", t)
         inLock(partitionMapLock) {
-          partitionsWithError ++= partitionMap.keys
+          partitionStates.partitionSet.asScala.foreach(updatePartitionsWithError)
           // there is an error occurred while fetching partitions, sleep a while
+          // note that `ReplicaFetcherThread.handlePartitionsWithError` will also introduce the same delay for every
+          // partition with error effectively doubling the delay. It would be good to improve this.
           partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
         }
       }
@@ -121,25 +135,28 @@ abstract class AbstractFetcherThread(name: String,
     // process fetched data
     inLock(partitionMapLock) {
-      responseData.foreach { case (topicAndPartition, partitionData) =>
-        val TopicAndPartition(topic, partitionId) = topicAndPartition
-        partitionMap.get(topicAndPartition).foreach(currentPartitionFetchState =>
+      responseData.foreach { case (topicPartition, partitionData) =>
+        val topic = topicPartition.topic
+        val partitionId = topicPartition.partition
+        Option(partitionStates.stateValue(topicPartition)).foreach(currentPartitionFetchState =>
           // we append to the log if the current offset is defined and it is the same as the offset requested during fetch
-          if (fetchRequest.offset(topicAndPartition) == currentPartitionFetchState.offset) {
+          if (fetchRequest.offset(topicPartition) == currentPartitionFetchState.offset) {
             Errors.forCode(partitionData.errorCode) match {
               case Errors.NONE =>
                 try {
                   val messages = partitionData.toByteBufferMessageSet
-                  val validBytes = messages.validBytes
                   val newOffset = messages.shallowIterator.toSeq.lastOption match {
-                    case Some(m: MessageAndOffset) => m.nextOffset
-                    case None => currentPartitionFetchState.offset
+                    case Some(m) =>
+                      partitionStates.updateAndMoveToEnd(topicPartition, new PartitionFetchState(m.nextOffset))
+                      fetcherStats.byteRate.mark(messages.validBytes)
+                      m.nextOffset
+                    case None =>
+                      currentPartitionFetchState.offset
                   }
-                  partitionMap.put(topicAndPartition, new PartitionFetchState(newOffset))
+
                   fetcherLagStats.getAndMaybePut(topic, partitionId).lag = Math.max(0L, partitionData.highWatermark - newOffset)
-                  fetcherStats.byteRate.mark(validBytes)
                   // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
-                  processPartitionData(topicAndPartition, currentPartitionFetchState.offset, partitionData)
+                  processPartitionData(topicPartition, currentPartitionFetchState.offset, partitionData)
                 } catch {
                   case ime: CorruptRecordException =>
                     // we log the error and continue. This ensures two things
@@ -153,20 +170,20 @@ abstract class AbstractFetcherThread(name: String,
               case Errors.OFFSET_OUT_OF_RANGE =>
                 try {
-                  val newOffset = handleOffsetOutOfRange(topicAndPartition)
-                  partitionMap.put(topicAndPartition, new PartitionFetchState(newOffset))
+                  val newOffset = handleOffsetOutOfRange(topicPartition)
+                  partitionStates.updateAndMoveToEnd(topicPartition, new PartitionFetchState(newOffset))
                   error("Current offset %d for partition [%s,%d] out of range; reset offset to %d"
                     .format(currentPartitionFetchState.offset, topic, partitionId, newOffset))
                 } catch {
                   case e: Throwable =>
                     error("Error getting offset for partition [%s,%d] to broker %d".format(topic, partitionId, sourceBroker.id), e)
-                    partitionsWithError += topicAndPartition
+                    updatePartitionsWithError(topicPartition)
                 }
               case _ =>
                 if (isRunning.get) {
                   error("Error for partition [%s,%d] to broker %d:%s".format(topic, partitionId, sourceBroker.id,
                     partitionData.exception.get))
-                  partitionsWithError += topicAndPartition
+                  updatePartitionsWithError(topicPartition)
                 }
             }
           })
@@ -180,47 +197,52 @@ abstract class AbstractFetcherThread(name: String,
     }
   }
 
-  def addPartitions(partitionAndOffsets: Map[TopicAndPartition, Long]) {
+  def addPartitions(partitionAndOffsets: Map[TopicPartition, Long]) {
     partitionMapLock.lockInterruptibly()
     try {
-      for ((topicAndPartition, offset) <- partitionAndOffsets) {
-        // If the partitionMap already has the topic/partition, then do not update the map with the old offset
-        if (!partitionMap.contains(topicAndPartition))
-          partitionMap.put(
-            topicAndPartition,
-            if (PartitionTopicInfo.isOffsetInvalid(offset)) new PartitionFetchState(handleOffsetOutOfRange(topicAndPartition))
-            else new PartitionFetchState(offset)
-          )}
+      // If the partitionMap already has the topic/partition, then do not update the map with the old offset
+      val newPartitionToState = partitionAndOffsets.filter { case (tp, _) =>
+        !partitionStates.contains(tp)
+      }.map { case (tp, offset) =>
+        val fetchState =
+          if (PartitionTopicInfo.isOffsetInvalid(offset)) new PartitionFetchState(handleOffsetOutOfRange(tp))
+          else new PartitionFetchState(offset)
+        tp -> fetchState
+      }
+      val existingPartitionToState = partitionStates.partitionStates.asScala.map { state =>
+        state.topicPartition -> state.value
+      }.toMap
+      partitionStates.set((existingPartitionToState ++ newPartitionToState).asJava)
       partitionMapCond.signalAll()
     } finally partitionMapLock.unlock()
   }
 
-  def delayPartitions(partitions: Iterable[TopicAndPartition], delay: Long) {
+  def delayPartitions(partitions: Iterable[TopicPartition], delay: Long) {
     partitionMapLock.lockInterruptibly()
     try {
       for (partition <- partitions) {
-        partitionMap.get(partition).foreach (currentPartitionFetchState =>
+        Option(partitionStates.stateValue(partition)).foreach (currentPartitionFetchState =>
           if (currentPartitionFetchState.isActive)
-            partitionMap.put(partition, new PartitionFetchState(currentPartitionFetchState.offset, new DelayedItem(delay)))
+            partitionStates.updateAndMoveToEnd(partition, new PartitionFetchState(currentPartitionFetchState.offset, new DelayedItem(delay)))
         )
       }
       partitionMapCond.signalAll()
     } finally partitionMapLock.unlock()
   }
 
-  def removePartitions(topicAndPartitions: Set[TopicAndPartition]) {
+  def removePartitions(topicPartitions: Set[TopicPartition]) {
     partitionMapLock.lockInterruptibly()
     try {
-      topicAndPartitions.foreach { topicAndPartition =>
-        partitionMap.remove(topicAndPartition)
-        fetcherLagStats.unregister(topicAndPartition.topic, topicAndPartition.partition)
+      topicPartitions.foreach { topicPartition =>
+        partitionStates.remove(topicPartition)
+        fetcherLagStats.unregister(topicPartition.topic, topicPartition.partition)
       }
     } finally partitionMapLock.unlock()
   }
 
   def partitionCount() = {
     partitionMapLock.lockInterruptibly()
-    try partitionMap.size
+    try partitionStates.size
     finally partitionMapLock.unlock()
   }
 
@@ -230,7 +252,7 @@ object AbstractFetcherThread {
 
   trait FetchRequest {
     def isEmpty: Boolean
-    def offset(topicAndPartition: TopicAndPartition): Long
+    def offset(topicPartition: TopicPartition): Long
   }
 
   trait PartitionData {
@@ -315,13 +337,13 @@ case class ClientIdTopicPartition(clientId: String, topic: String, partitionId:
 }
 
 /**
- * case class to keep partition offset and its state(active , inactive)
+ * case class to keep partition offset and its state(active, inactive)
 */
 case class PartitionFetchState(offset: Long, delay: DelayedItem) {
 
   def this(offset: Long) = this(offset, new DelayedItem(0))
 
-  def isActive: Boolean = { delay.getDelay(TimeUnit.MILLISECONDS) == 0 }
+  def isActive: Boolean = delay.getDelay(TimeUnit.MILLISECONDS) == 0
 
   override def toString = "%d-%b".format(offset, isActive)
 }
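
The switch from a plain HashMap to `PartitionStates` is what makes the new response-size limit fair: `updateAndMoveToEnd` rotates a partition to the back of the fetch order once it has received data (or hit an error), so successive size-capped fetches do not keep draining the same partition first. A toy model of that rotation, assuming only that `PartitionStates` keeps insertion order like a linked map:

    import scala.collection.mutable

    // Not the real org.apache.kafka.common.internals.PartitionStates, just the
    // ordering behaviour this thread relies on.
    val order = mutable.LinkedHashMap("t-0" -> 0L, "t-1" -> 0L, "t-2" -> 0L)
    def updateAndMoveToEnd(partition: String, newOffset: Long): Unit = {
      order.remove(partition)
      order.put(partition, newOffset)
    }

    updateAndMoveToEnd("t-0", 42L) // t-0 got data; the next request starts with t-1
    assert(order.keys.toList == List("t-1", "t-2", "t-0"))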
http://git-wip-us.apache.org/repos/asf/kafka/blob/d04b0998/core/src/main/scala/kafka/server/DelayedFetch.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala
index cf3a48f..4b17e81 100644
--- a/core/src/main/scala/kafka/server/DelayedFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -37,10 +37,12 @@ case class FetchPartitionStatus(startOffsetMetadata: LogOffsetMetadata, fetchInf
  * The fetch metadata maintained by the delayed fetch operation
  */
 case class FetchMetadata(fetchMinBytes: Int,
+                         fetchMaxBytes: Int,
+                         hardMaxBytesLimit: Boolean,
                          fetchOnlyLeader: Boolean,
                          fetchOnlyCommitted: Boolean,
                          isFromFollower: Boolean,
-                         fetchPartitionStatus: Map[TopicAndPartition, FetchPartitionStatus]) {
+                         fetchPartitionStatus: Seq[(TopicAndPartition, FetchPartitionStatus)]) {
 
   override def toString = "[minBytes: " + fetchMinBytes + ", " +
     "onlyLeader:" + fetchOnlyLeader + ", "
@@ -55,7 +57,7 @@ class DelayedFetch(delayMs: Long,
                    fetchMetadata: FetchMetadata,
                    replicaManager: ReplicaManager,
                    quota: ReplicaQuota,
-                   responseCallback: Map[TopicAndPartition, FetchResponsePartitionData] => Unit)
+                   responseCallback: Seq[(TopicAndPartition, FetchResponsePartitionData)] => Unit)
   extends DelayedOperation(delayMs) {
 
   /**
@@ -136,12 +138,18 @@ class DelayedFetch(delayMs: Long,
    * Upon completion, read whatever data is available and pass to the complete callback
    */
   override def onComplete() {
-    val logReadResults = replicaManager.readFromLocalLog(fetchMetadata.fetchOnlyLeader,
+    val logReadResults = replicaManager.readFromLocalLog(
+      fetchMetadata.fetchOnlyLeader,
       fetchMetadata.fetchOnlyCommitted,
-      fetchMetadata.fetchPartitionStatus.mapValues(status => status.fetchInfo), quota)
-
-    val fetchPartitionData = logReadResults.mapValues(result =>
-      FetchResponsePartitionData(result.errorCode, result.hw, result.info.messageSet))
+      fetchMetadata.fetchMaxBytes,
+      fetchMetadata.hardMaxBytesLimit,
+      fetchMetadata.fetchPartitionStatus.map { case (tp, status) => tp -> status.fetchInfo },
+      quota
+    )
+
+    val fetchPartitionData = logReadResults.map { case (tp, result) =>
+      tp -> FetchResponsePartitionData(result.errorCode, result.hw, result.info.messageSet)
+    }
 
     responseCallback(fetchPartitionData)
   }
http://git-wip-us.apache.org/repos/asf/kafka/blob/d04b0998/core/src/main/scala/kafka/server/FetchDataInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/FetchDataInfo.scala b/core/src/main/scala/kafka/server/FetchDataInfo.scala
index 1a8a604..9d6d437 100644
--- a/core/src/main/scala/kafka/server/FetchDataInfo.scala
+++ b/core/src/main/scala/kafka/server/FetchDataInfo.scala
@@ -19,4 +19,5 @@ package kafka.server
 
 import kafka.message.MessageSet
 
-case class FetchDataInfo(fetchOffsetMetadata: LogOffsetMetadata, messageSet: MessageSet)
+case class FetchDataInfo(fetchOffsetMetadata: LogOffsetMetadata, messageSet: MessageSet,
+                         firstMessageSetIncomplete: Boolean = false)

http://git-wip-us.apache.org/repos/asf/kafka/blob/d04b0998/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index d3ba5ef..51c9eab 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -146,7 +146,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     val responseHeader = new ResponseHeader(correlationId)
     val leaderAndIsrResponse =
       if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
-      val result = replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest, metadataCache, onLeadershipChange)
+        val result = replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest, metadataCache, onLeadershipChange)
         new LeaderAndIsrResponse(result.errorCode, result.responseMap.mapValues(new JShort(_)).asJava)
       } else {
         val result = leaderAndIsrRequest.partitionStates.asScala.keys.map((_, new JShort(Errors.CLUSTER_AUTHORIZATION_FAILED.code))).toMap
@@ -437,12 +437,12 @@ class KafkaApis(val requestChannel: RequestChannel,
       case (topicAndPartition, _) => authorize(request.session, Read, new Resource(auth.Topic, topicAndPartition.topic))
     }
 
-    val unauthorizedPartitionData = unauthorizedRequestInfo.mapValues { _ =>
-      FetchResponsePartitionData(Errors.TOPIC_AUTHORIZATION_FAILED.code, -1, MessageSet.Empty)
+    val unauthorizedPartitionData = unauthorizedRequestInfo.map { case (tp, _) =>
+      (tp, FetchResponsePartitionData(Errors.TOPIC_AUTHORIZATION_FAILED.code, -1, MessageSet.Empty))
     }
 
     // the callback for sending a fetch response
-    def sendResponseCallback(responsePartitionData: Map[TopicAndPartition, FetchResponsePartitionData]) {
+    def sendResponseCallback(responsePartitionData: Seq[(TopicAndPartition, FetchResponsePartitionData)]) {
       val convertedPartitionData =
         // Need to down-convert message when consumer only takes magic value 0.
@@ -480,8 +480,8 @@
       def fetchResponseCallback(delayTimeMs: Int) {
         trace(s"Sending fetch response to client ${fetchRequest.clientId} of " +
-          s"${convertedPartitionData.values.map(_.messages.sizeInBytes).sum} bytes")
-        val response = FetchResponse(fetchRequest.correlationId, mergedPartitionData, fetchRequest.versionId, delayTimeMs)
+          s"${convertedPartitionData.map { case (_, v) => v.messages.sizeInBytes }.sum} bytes")
+        val response = FetchResponse(fetchRequest.correlationId, mergedPartitionData.toSeq, fetchRequest.versionId, delayTimeMs)
         requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(request.connectionId, response)))
       }
 
@@ -490,32 +490,37 @@
       if (fetchRequest.isFromFollower) {
         //We've already evaluated against the quota and are good to go. Just need to record it now.
-        val size = sizeOfThrottledPartitions(fetchRequest, mergedPartitionData, quotas.leader)
-        quotas.leader.record(size)
+        val responseSize = sizeOfThrottledPartitions(fetchRequest, mergedPartitionData, quotas.leader)
+        quotas.leader.record(responseSize)
         fetchResponseCallback(0)
       } else {
-        val size = FetchResponse.responseSize(mergedPartitionData.groupBy(_._1.topic), fetchRequest.versionId)
-        quotas.fetch.recordAndMaybeThrottle(request.session.sanitizedUser, fetchRequest.clientId, size, fetchResponseCallback)
+        val responseSize = FetchResponse.responseSize(FetchResponse.batchByTopic(mergedPartitionData),
+          fetchRequest.versionId)
+        quotas.fetch.recordAndMaybeThrottle(request.session.sanitizedUser, fetchRequest.clientId, responseSize, fetchResponseCallback)
       }
     }
 
     if (authorizedRequestInfo.isEmpty)
-      sendResponseCallback(Map.empty)
+      sendResponseCallback(Seq.empty)
     else {
       // call the replica manager to fetch messages from the local replica
       replicaManager.fetchMessages(
         fetchRequest.maxWait.toLong,
         fetchRequest.replicaId,
         fetchRequest.minBytes,
+        fetchRequest.maxBytes,
+        fetchRequest.versionId <= 2,
         authorizedRequestInfo,
         replicationQuota(fetchRequest),
         sendResponseCallback)
     }
   }
 
-  private def sizeOfThrottledPartitions(fetchRequest: FetchRequest, mergedPartitionData: Map[TopicAndPartition, FetchResponsePartitionData], quota: ReplicationQuotaManager): Int = {
+  private def sizeOfThrottledPartitions(fetchRequest: FetchRequest,
+                                        mergedPartitionData: Seq[(TopicAndPartition, FetchResponsePartitionData)],
+                                        quota: ReplicationQuotaManager): Int = {
     val throttledPartitions = mergedPartitionData.filter { case (partition, _) => quota.isThrottled(partition) }
-    FetchResponse.responseSize(throttledPartitions.groupBy(_._1.topic), fetchRequest.versionId)
+    FetchResponse.responseSize(FetchRequest.batchByTopic(throttledPartitions), fetchRequest.versionId)
   }
 
   def replicationQuota(fetchRequest: FetchRequest): ReplicaQuota =
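
One detail that is easy to miss in the `replicaManager.fetchMessages` call above: `fetchRequest.versionId <= 2` feeds the new `hardMaxBytesLimit` flag, so only v3+ fetch requests get the relaxed behaviour where the first message set may exceed the byte limit. A one-line restatement (illustrative helper, with Int used for the version for brevity):

    // Pre-v3 requests did not carry maxBytes, and clients speaking those
    // versions expect the old semantics, so their byte limits stay "hard".
    def hardMaxBytesLimit(requestVersion: Int): Boolean = requestVersion <= 2
    assert(hardMaxBytesLimit(2) && !hardMaxBytesLimit(3))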
http://git-wip-us.apache.org/repos/asf/kafka/blob/d04b0998/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index b37be5b..9cd05f1 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -115,6 +115,7 @@ object Defaults {
   val ReplicaFetchMaxBytes = ConsumerConfig.FetchSize
   val ReplicaFetchWaitMaxMs = 500
   val ReplicaFetchMinBytes = 1
+  val ReplicaFetchResponseMaxBytes = 10 * 1024 * 1024
   val NumReplicaFetchers = 1
   val ReplicaFetchBackoffMs = 1000
   val ReplicaHighWatermarkCheckpointIntervalMs = 5000L
@@ -282,6 +283,7 @@ object KafkaConfig {
   val ReplicaFetchMaxBytesProp = "replica.fetch.max.bytes"
   val ReplicaFetchWaitMaxMsProp = "replica.fetch.wait.max.ms"
   val ReplicaFetchMinBytesProp = "replica.fetch.min.bytes"
+  val ReplicaFetchResponseMaxBytesProp = "replica.fetch.response.max.bytes"
   val ReplicaFetchBackoffMsProp = "replica.fetch.backoff.ms"
   val NumReplicaFetchersProp = "num.replica.fetchers"
   val ReplicaHighWatermarkCheckpointIntervalMsProp = "replica.high.watermark.checkpoint.interval.ms"
@@ -480,10 +482,17 @@ object KafkaConfig {
     " the leader will remove the follower from isr"
   val ReplicaSocketTimeoutMsDoc = "The socket timeout for network requests. Its value should be at least replica.fetch.wait.max.ms"
   val ReplicaSocketReceiveBufferBytesDoc = "The socket receive buffer for network requests"
-  val ReplicaFetchMaxBytesDoc = "The number of bytes of messages to attempt to fetch"
+  val ReplicaFetchMaxBytesDoc = "The number of bytes of messages to attempt to fetch for each partition. This is not an absolute maximum, " +
+    "if the first message in the first non-empty partition of the fetch is larger than this value, the message will still be returned " +
+    "to ensure that progress can be made. The maximum message size accepted by the broker is defined via " +
+    "<code>message.max.bytes</code> (broker config) or <code>max.message.bytes</code> (topic config)."
   val ReplicaFetchWaitMaxMsDoc = "max wait time for each fetcher request issued by follower replicas. This value should always be less than the " +
     "replica.lag.time.max.ms at all times to prevent frequent shrinking of ISR for low throughput topics"
   val ReplicaFetchMinBytesDoc = "Minimum bytes expected for each fetch response. If not enough bytes, wait up to replicaMaxWaitTimeMs"
+  val ReplicaFetchResponseMaxBytesDoc = "Maximum bytes expected for the entire fetch response. This is not an absolute maximum, " +
+    "if the first message in the first non-empty partition of the fetch is larger than this value, the message will still be returned " +
+    "to ensure that progress can be made. The maximum message size accepted by the broker is defined via " +
+    "<code>message.max.bytes</code> (broker config) or <code>max.message.bytes</code> (topic config)."
   val NumReplicaFetchersDoc = "Number of fetcher threads used to replicate messages from a source broker. " +
     "Increasing this value can increase the degree of I/O parallelism in the follower broker."
   val ReplicaFetchBackoffMsDoc = "The amount of time to sleep when fetch partition error occurs."
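Illustration (not part of the patch): the two limits compose, one per partition and one per response, and both are soft in the same way. Some back-of-the-envelope arithmetic with the defaults named above (values here are illustrative):

    // with a 1 MiB per-partition cap (replica.fetch.max.bytes) and the new
    // 10 MiB response cap (replica.fetch.response.max.bytes default), at most
    // the first ~10 partitions of a fetch can be served at their full cap;
    // later partitions wait for the next round, which the preserved request
    // order keeps fair
    val perPartitionCap = 1048576
    val responseCap = 10 * 1024 * 1024
    val partitionsServedFully = responseCap / perPartitionCap  // 10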
@@ -671,10 +680,11 @@ object KafkaConfig {
       .define(ReplicaLagTimeMaxMsProp, LONG, Defaults.ReplicaLagTimeMaxMs, HIGH, ReplicaLagTimeMaxMsDoc)
       .define(ReplicaSocketTimeoutMsProp, INT, Defaults.ReplicaSocketTimeoutMs, HIGH, ReplicaSocketTimeoutMsDoc)
       .define(ReplicaSocketReceiveBufferBytesProp, INT, Defaults.ReplicaSocketReceiveBufferBytes, HIGH, ReplicaSocketReceiveBufferBytesDoc)
-      .define(ReplicaFetchMaxBytesProp, INT, Defaults.ReplicaFetchMaxBytes, HIGH, ReplicaFetchMaxBytesDoc)
+      .define(ReplicaFetchMaxBytesProp, INT, Defaults.ReplicaFetchMaxBytes, atLeast(0), MEDIUM, ReplicaFetchMaxBytesDoc)
       .define(ReplicaFetchWaitMaxMsProp, INT, Defaults.ReplicaFetchWaitMaxMs, HIGH, ReplicaFetchWaitMaxMsDoc)
       .define(ReplicaFetchBackoffMsProp, INT, Defaults.ReplicaFetchBackoffMs, atLeast(0), MEDIUM, ReplicaFetchBackoffMsDoc)
       .define(ReplicaFetchMinBytesProp, INT, Defaults.ReplicaFetchMinBytes, HIGH, ReplicaFetchMinBytesDoc)
+      .define(ReplicaFetchResponseMaxBytesProp, INT, Defaults.ReplicaFetchResponseMaxBytes, atLeast(0), MEDIUM, ReplicaFetchResponseMaxBytesDoc)
       .define(NumReplicaFetchersProp, INT, Defaults.NumReplicaFetchers, HIGH, NumReplicaFetchersDoc)
       .define(ReplicaHighWatermarkCheckpointIntervalMsProp, LONG, Defaults.ReplicaHighWatermarkCheckpointIntervalMs, HIGH, ReplicaHighWatermarkCheckpointIntervalMsDoc)
       .define(FetchPurgatoryPurgeIntervalRequestsProp, INT, Defaults.FetchPurgatoryPurgeIntervalRequests, MEDIUM, FetchPurgatoryPurgeIntervalRequestsDoc)
@@ -881,6 +891,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra
   val replicaFetchMaxBytes = getInt(KafkaConfig.ReplicaFetchMaxBytesProp)
   val replicaFetchWaitMaxMs = getInt(KafkaConfig.ReplicaFetchWaitMaxMsProp)
   val replicaFetchMinBytes = getInt(KafkaConfig.ReplicaFetchMinBytesProp)
+  val replicaFetchResponseMaxBytes = getInt(KafkaConfig.ReplicaFetchResponseMaxBytesProp)
   val replicaFetchBackoffMs = getInt(KafkaConfig.ReplicaFetchBackoffMsProp)
   val numReplicaFetchers = getInt(KafkaConfig.NumReplicaFetchersProp)
   val replicaHighWatermarkCheckpointIntervalMs = getLong(KafkaConfig.ReplicaHighWatermarkCheckpointIntervalMsProp)
@@ -1042,7 +1053,6 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra
     require(logCleanerDedupeBufferSize / logCleanerThreads > 1024 * 1024, "log.cleaner.dedupe.buffer.size must be at least 1MB per cleaner thread.")
     require(replicaFetchWaitMaxMs <= replicaSocketTimeoutMs, "replica.socket.timeout.ms should always be at least replica.fetch.wait.max.ms" +
       " to prevent unnecessary socket timeouts")
-    require(replicaFetchMaxBytes >= messageMaxBytes, "replica.fetch.max.bytes should be equal or greater than message.max.bytes")
     require(replicaFetchWaitMaxMs <= replicaLagTimeMaxMs, "replica.fetch.wait.max.ms should always be at least replica.lag.time.max.ms" +
       " to prevent frequent changes in ISR")
     require(offsetCommitRequiredAcks >= -1 && offsetCommitRequiredAcks <= offsetsTopicReplicationFactor,
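Illustration (not part of the patch): the dropped require encoded the old failure mode, where a message larger than replica.fetch.max.bytes would wedge replication, so such configurations were rejected at broker startup. Under the new semantics the combination is legal and merely worth a heads-up; a hypothetical log-only check in the same spirit, not taken from this patch:

    // hypothetical: no longer a correctness requirement, but oversized messages
    // will make individual fetches exceed the configured sizes
    if (replicaFetchMaxBytes < messageMaxBytes)
      warn(s"replica.fetch.max.bytes ($replicaFetchMaxBytes) is less than message.max.bytes " +
        s"($messageMaxBytes); larger messages will still replicate, one at a time.")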
http://git-wip-us.apache.org/repos/asf/kafka/blob/d04b0998/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index b0bd070..78e00df 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -18,24 +18,26 @@ package kafka.server
 
 import java.net.SocketTimeoutException
+import java.util
 
 import kafka.admin.AdminUtils
 import kafka.cluster.BrokerEndPoint
 import kafka.log.LogConfig
 import kafka.message.ByteBufferMessageSet
-import kafka.api.{KAFKA_0_10_0_IV0, KAFKA_0_9_0}
+import kafka.api.{KAFKA_0_9_0, KAFKA_0_10_0_IV0, KAFKA_0_10_1_IV1}
 import kafka.common.{KafkaStorageException, TopicAndPartition}
 import ReplicaFetcherThread._
 import org.apache.kafka.clients.{ManualMetadataUpdater, NetworkClient, ClientRequest, ClientResponse}
 import org.apache.kafka.common.network.{LoginType, Selectable, ChannelBuilders, NetworkReceive, Selector, Mode}
-import org.apache.kafka.common.requests.{FetchRequest => JFetchRequest, _}
+import org.apache.kafka.common.requests.{ListOffsetResponse, FetchResponse, RequestSend, AbstractRequest, ListOffsetRequest}
+import org.apache.kafka.common.requests.{FetchRequest => JFetchRequest}
 import org.apache.kafka.common.{Node, TopicPartition}
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.{Errors, ApiKeys}
 import org.apache.kafka.common.utils.Time
 
-import scala.collection.{JavaConverters, Map, mutable}
+import scala.collection.{JavaConverters, Map}
 import JavaConverters._
 
 class ReplicaFetcherThread(name: String,
@@ -45,8 +47,7 @@ class ReplicaFetcherThread(name: String,
                            replicaMgr: ReplicaManager,
                            metrics: Metrics,
                            time: Time,
-                           quota: ReplicationQuotaManager
-                          )
+                           quota: ReplicationQuotaManager)
   extends AbstractFetcherThread(name = name,
                                 clientId = name,
                                 sourceBroker = sourceBroker,
@@ -57,13 +58,15 @@ class ReplicaFetcherThread(name: String,
   type PD = PartitionData
 
   private val fetchRequestVersion: Short =
-    if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_10_0_IV0) 2
+    if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_10_1_IV1) 3
+    else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_10_0_IV0) 2
     else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_9_0) 1
     else 0
   private val socketTimeout: Int = brokerConfig.replicaSocketTimeoutMs
   private val replicaId = brokerConfig.brokerId
   private val maxWait = brokerConfig.replicaFetchWaitMaxMs
   private val minBytes = brokerConfig.replicaFetchMinBytes
+  private val maxBytes = brokerConfig.replicaFetchResponseMaxBytes
   private val fetchSize = brokerConfig.replicaFetchMaxBytes
 
   private def clientId = name
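Illustration (not part of the patch): restating the version cascade above on its own, since it decides everything that follows in this file; the fetcher speaks the newest fetch version its inter-broker protocol setting allows. fetchVersionFor is a hypothetical helper, and kafka.api.ApiVersion is the type of interBrokerProtocolVersion:

    def fetchVersionFor(ibp: ApiVersion): Short =
      if (ibp >= KAFKA_0_10_1_IV1) 3       // adds the response-wide max_bytes field (KIP-74)
      else if (ibp >= KAFKA_0_10_0_IV0) 2  // message format v1 (timestamps)
      else if (ibp >= KAFKA_0_9_0) 1
      else 0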
@@ -111,22 +114,24 @@ class ReplicaFetcherThread(name: String,
   }
 
   // process fetched data
-  def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long, partitionData: PartitionData) {
+  def processPartitionData(topicPartition: TopicPartition, fetchOffset: Long, partitionData: PartitionData) {
     try {
-      val TopicAndPartition(topic, partitionId) = topicAndPartition
+      val topic = topicPartition.topic
+      val partitionId = topicPartition.partition
       val replica = replicaMgr.getReplica(topic, partitionId).get
       val messageSet = partitionData.toByteBufferMessageSet
-      warnIfMessageOversized(messageSet, topicAndPartition)
+
+      maybeWarnIfMessageOversized(messageSet, topicPartition)
 
       if (fetchOffset != replica.logEndOffset.messageOffset)
-        throw new RuntimeException("Offset mismatch for partition %s: fetched offset = %d, log end offset = %d.".format(topicAndPartition, fetchOffset, replica.logEndOffset.messageOffset))
+        throw new RuntimeException("Offset mismatch for partition %s: fetched offset = %d, log end offset = %d.".format(topicPartition, fetchOffset, replica.logEndOffset.messageOffset))
       if (logger.isTraceEnabled)
         trace("Follower %d has replica log end offset %d for partition %s. Received %d messages and leader hw %d"
-          .format(replica.brokerId, replica.logEndOffset.messageOffset, topicAndPartition, messageSet.sizeInBytes, partitionData.highWatermark))
+          .format(replica.brokerId, replica.logEndOffset.messageOffset, topicPartition, messageSet.sizeInBytes, partitionData.highWatermark))
       replica.log.get.append(messageSet, assignOffsets = false)
       if (logger.isTraceEnabled)
         trace("Follower %d has replica log end offset %d after appending %d bytes of messages for partition %s"
-          .format(replica.brokerId, replica.logEndOffset.messageOffset, messageSet.sizeInBytes, topicAndPartition))
+          .format(replica.brokerId, replica.logEndOffset.messageOffset, messageSet.sizeInBytes, topicPartition))
       val followerHighWatermark = replica.logEndOffset.messageOffset.min(partitionData.highWatermark)
       // for the follower replica, we do not need to keep
       // its segment base offset the physical position,
@@ -135,18 +140,19 @@ class ReplicaFetcherThread(name: String,
       if (logger.isTraceEnabled)
         trace("Follower %d set replica high watermark for partition [%s,%d] to %s"
           .format(replica.brokerId, topic, partitionId, followerHighWatermark))
-      if (quota.isThrottled(topicAndPartition))
+      if (quota.isThrottled(new TopicAndPartition(topic, partitionId)))
         quota.record(messageSet.sizeInBytes)
     } catch {
       case e: KafkaStorageException =>
-        fatal(s"Disk error while replicating data for $topicAndPartition", e)
+        fatal(s"Disk error while replicating data for $topicPartition", e)
         Runtime.getRuntime.halt(1)
     }
   }
 
-  def warnIfMessageOversized(messageSet: ByteBufferMessageSet, topicAndPartition: TopicAndPartition): Unit = {
-    if (messageSet.sizeInBytes > 0 && messageSet.validBytes <= 0)
-      error(s"Replication is failing due to a message that is greater than replica.fetch.max.bytes for partition $topicAndPartition. " +
+  def maybeWarnIfMessageOversized(messageSet: ByteBufferMessageSet, topicPartition: TopicPartition): Unit = {
+    // oversized messages don't cause replication to fail from fetch request version 3 (KIP-74)
+    if (fetchRequestVersion <= 2 && messageSet.sizeInBytes > 0 && messageSet.validBytes <= 0)
+      error(s"Replication is failing due to a message that is greater than replica.fetch.max.bytes for partition $topicPartition. " +
        "This generally occurs when the max.message.bytes has been overridden to exceed this value and a suitably large " +
        "message has also been sent. To fix this problem increase replica.fetch.max.bytes in your broker config to be " +
        "equal or larger than your settings for max.message.bytes, both at a broker and topic level.")
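Illustration (not part of the patch): the new guard reads subtly. A fetch whose size cap is smaller than the first message returns a partial message, which deserializes with sizeInBytes > 0 but validBytes <= 0. Before version 3 that state is permanent and replication is stuck; from version 3 the first message of the first non-empty partition is always returned whole, so the same byte pattern is transient and no longer an error. Condensed into a hypothetical helper:

    // replication is only provably stuck on this signature for pre-KIP-74 fetches
    def replicationStuck(ms: ByteBufferMessageSet, fetchVersion: Short): Boolean =
      fetchVersion <= 2 && ms.sizeInBytes > 0 && ms.validBytes <= 0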
@@ -155,8 +161,9 @@ class ReplicaFetcherThread(name: String,
   /**
    * Handle a partition whose offset is out of range and return a new fetch offset.
    */
-  def handleOffsetOutOfRange(topicAndPartition: TopicAndPartition): Long = {
-    val replica = replicaMgr.getReplica(topicAndPartition.topic, topicAndPartition.partition).get
+  def handleOffsetOutOfRange(topicPartition: TopicPartition): Long = {
+    val topicAndPartition = TopicAndPartition(topicPartition.topic, topicPartition.partition)
+    val replica = replicaMgr.getReplica(topicPartition.topic, topicPartition.partition).get
 
     /**
      * Unclean leader election: A follower goes down, in the meanwhile the leader keeps appending messages. The follower comes back up
@@ -168,7 +175,7 @@ class ReplicaFetcherThread(name: String,
      *
      * There is a potential for a mismatch between the logs of the two replicas here. We don't fix this mismatch as of now.
      */
-    val leaderEndOffset: Long = earliestOrLatestOffset(topicAndPartition, ListOffsetRequest.LATEST_TIMESTAMP,
+    val leaderEndOffset: Long = earliestOrLatestOffset(topicPartition, ListOffsetRequest.LATEST_TIMESTAMP,
       brokerConfig.brokerId)
 
     if (leaderEndOffset < replica.logEndOffset.messageOffset) {
@@ -176,16 +183,16 @@ class ReplicaFetcherThread(name: String,
       // This situation could only happen if the unclean election configuration for a topic changes while a replica is down. Otherwise,
       // we should never encounter this situation since a non-ISR leader cannot be elected if disallowed by the broker configuration.
       if (!LogConfig.fromProps(brokerConfig.originals, AdminUtils.fetchEntityConfig(replicaMgr.zkUtils,
-        ConfigType.Topic, topicAndPartition.topic)).uncleanLeaderElectionEnable) {
+        ConfigType.Topic, topicPartition.topic)).uncleanLeaderElectionEnable) {
         // Log a fatal error and shutdown the broker to ensure that data loss does not unexpectedly occur.
-        fatal("Exiting because log truncation is not allowed for partition %s,".format(topicAndPartition) +
+        fatal("Exiting because log truncation is not allowed for partition %s,".format(topicPartition) +
           " Current leader %d's latest offset %d is less than replica %d's latest offset %d"
           .format(sourceBroker.id, leaderEndOffset, brokerConfig.brokerId, replica.logEndOffset.messageOffset))
         System.exit(1)
       }
 
       warn("Replica %d for partition %s reset its fetch offset from %d to current leader %d's latest offset %d"
-        .format(brokerConfig.brokerId, topicAndPartition, replica.logEndOffset.messageOffset, sourceBroker.id, leaderEndOffset))
+        .format(brokerConfig.brokerId, topicPartition, replica.logEndOffset.messageOffset, sourceBroker.id, leaderEndOffset))
       replicaMgr.logManager.truncateTo(Map(topicAndPartition -> leaderEndOffset))
       leaderEndOffset
     } else {
@@ -211,10 +218,10 @@ class ReplicaFetcherThread(name: String,
        * and the current leader's log start offset.
        *
        */
-      val leaderStartOffset: Long = earliestOrLatestOffset(topicAndPartition, ListOffsetRequest.EARLIEST_TIMESTAMP,
+      val leaderStartOffset: Long = earliestOrLatestOffset(topicPartition, ListOffsetRequest.EARLIEST_TIMESTAMP,
        brokerConfig.brokerId)
       warn("Replica %d for partition %s reset its fetch offset from %d to current leader %d's start offset %d"
-        .format(brokerConfig.brokerId, topicAndPartition, replica.logEndOffset.messageOffset, sourceBroker.id, leaderStartOffset))
+        .format(brokerConfig.brokerId, topicPartition, replica.logEndOffset.messageOffset, sourceBroker.id, leaderStartOffset))
       val offsetToFetch = Math.max(leaderStartOffset, replica.logEndOffset.messageOffset)
       // Only truncate log when current leader's log start offset is greater than follower's log end offset.
       if (leaderStartOffset > replica.logEndOffset.messageOffset)
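Illustration (not part of the patch): stripped of the unclean-leader-election checks and logging, the out-of-range handling above reduces to two cases; a hypothetical condensation:

    // follower ahead of the leader (seen after unclean leader election):
    //   truncate to the leader's end offset and fetch from there
    // follower behind the leader's start (fell off the retention window):
    //   resume from the larger of the leader's start offset and the follower's LEO
    def nextFetchOffset(followerLEO: Long, leaderStart: Long, leaderEnd: Long): Long =
      if (leaderEnd < followerLEO) leaderEnd
      else Math.max(leaderStart, followerLEO)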
@@ -224,14 +231,14 @@ class ReplicaFetcherThread(name: String,
   }
 
   // any logic for partitions whose leader has changed
-  def handlePartitionsWithErrors(partitions: Iterable[TopicAndPartition]) {
+  def handlePartitionsWithErrors(partitions: Iterable[TopicPartition]) {
     delayPartitions(partitions, brokerConfig.replicaFetchBackoffMs.toLong)
   }
 
-  protected def fetch(fetchRequest: FetchRequest): Map[TopicAndPartition, PartitionData] = {
+  protected def fetch(fetchRequest: FetchRequest): Seq[(TopicPartition, PartitionData)] = {
     val clientResponse = sendRequest(ApiKeys.FETCH, Some(fetchRequestVersion), fetchRequest.underlying)
-    new FetchResponse(clientResponse.responseBody).responseData.asScala.map { case (key, value) =>
-      TopicAndPartition(key.topic, key.partition) -> new PartitionData(value)
+    new FetchResponse(clientResponse.responseBody).responseData.asScala.toSeq.map { case (key, value) =>
+      key -> new PartitionData(value)
     }
   }
 
@@ -252,10 +259,10 @@ class ReplicaFetcherThread(name: String,
         networkClient.close(sourceBroker.id.toString)
         throw e
     }
+  }
 
-  private def earliestOrLatestOffset(topicAndPartition: TopicAndPartition, earliestOrLatest: Long, consumerId: Int): Long = {
-    val topicPartition = new TopicPartition(topicAndPartition.topic, topicAndPartition.partition)
+  private def earliestOrLatestOffset(topicPartition: TopicPartition, earliestOrLatest: Long, consumerId: Int): Long = {
     val partitions = Map(
       topicPartition -> new ListOffsetRequest.PartitionData(earliestOrLatest, 1)
     )
@@ -269,23 +276,31 @@ class ReplicaFetcherThread(name: String,
     }
   }
 
-  protected def buildFetchRequest(partitionMap: Map[TopicAndPartition, PartitionFetchState]): FetchRequest = {
-    val requestMap = mutable.Map.empty[TopicPartition, JFetchRequest.PartitionData]
+  protected def buildFetchRequest(partitionMap: Seq[(TopicPartition, PartitionFetchState)]): FetchRequest = {
+    val requestMap = new util.LinkedHashMap[TopicPartition, JFetchRequest.PartitionData]
+
     val quotaExceeded = quota.isQuotaExceeded
-    partitionMap.foreach { case ((partition, partitionFetchState)) =>
-      if (partitionFetchState.isActive && !(quota.isThrottled(partition) && quotaExceeded))
-        requestMap(new TopicPartition(partition.topic, partition.partition)) = new JFetchRequest.PartitionData(partitionFetchState.offset, fetchSize)
+    partitionMap.foreach { case (topicPartition, partitionFetchState) =>
+      val topicAndPartition = new TopicAndPartition(topicPartition.topic, topicPartition.partition)
+      if (partitionFetchState.isActive && !(quota.isThrottled(topicAndPartition) && quotaExceeded))
+        requestMap.put(topicPartition, new JFetchRequest.PartitionData(partitionFetchState.offset, fetchSize))
    }
-    new FetchRequest(new JFetchRequest(replicaId, maxWait, minBytes, requestMap.asJava))
+
+    val request =
+      if (fetchRequestVersion >= 3) JFetchRequest.fromReplica(replicaId, maxWait, minBytes, maxBytes, requestMap)
+      else JFetchRequest.fromReplica(replicaId, maxWait, minBytes, requestMap)
+
+    new FetchRequest(request)
   }
+
+}
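Illustration (not part of the patch): two details of buildFetchRequest above are easy to miss. The java.util.LinkedHashMap keeps the insertion order handed in by the fetcher, where the mutable.Map it replaces guaranteed none, and that order is what lets the leader fill a bounded response fairly; the constructor is then picked by version so max_bytes is only serialized when the wire format has a field for it. A toy demonstration of the ordering point:

    val ordered = new util.LinkedHashMap[Int, Long]()
    ordered.put(3, 0L); ordered.put(1, 0L); ordered.put(2, 0L)
    ordered.keySet()  // [3, 1, 2]: insertion order preserved, unlike a hash map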
 
 object ReplicaFetcherThread {
 
   private[server] class FetchRequest(val underlying: JFetchRequest) extends AbstractFetcherThread.FetchRequest {
     def isEmpty: Boolean = underlying.fetchData.isEmpty
-    def offset(topicAndPartition: TopicAndPartition): Long =
-      underlying.fetchData.asScala(new TopicPartition(topicAndPartition.topic, topicAndPartition.partition)).offset
+    def offset(topicPartition: TopicPartition): Long =
+      underlying.fetchData.asScala(topicPartition).offset
   }
 
   private[server] class PartitionData(val underlying: FetchResponse.PartitionData) extends AbstractFetcherThread.PartitionData {