This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new ce5ce2d MINOR: A few logging improvements in the broker (#6773) ce5ce2d is described below commit ce5ce2d569dd9fead42974d81bc7adfc5e6c7a22 Author: Jason Gustafson <ja...@confluent.io> AuthorDate: Tue May 21 14:50:24 2019 -0700 MINOR: A few logging improvements in the broker (#6773) Reviewers: Boyang Chen <bche...@outlook.com>, Rajini Sivaram <rajinisiva...@googlemail.com> --- core/src/main/scala/kafka/log/Log.scala | 7 ------- core/src/main/scala/kafka/server/AbstractFetcherManager.scala | 3 ++- core/src/main/scala/kafka/server/ReplicaManager.scala | 11 ++++++++--- .../main/scala/kafka/server/epoch/LeaderEpochFileCache.scala | 5 ++++- 4 files changed, 14 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index ef786be..56b2969 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -230,13 +230,6 @@ class Log(@volatile var dir: File, } def updateConfig(updatedKeys: Set[String], newConfig: LogConfig): Unit = { - if ((updatedKeys.contains(LogConfig.RetentionMsProp) - || updatedKeys.contains(LogConfig.MessageTimestampDifferenceMaxMsProp)) - && topicPartition.partition == 0 // generate warnings only for one partition of each topic - && newConfig.retentionMs < newConfig.messageTimestampDifferenceMaxMs) - warn(s"${LogConfig.RetentionMsProp} for topic ${topicPartition.topic} is set to ${newConfig.retentionMs}. It is smaller than " + - s"${LogConfig.MessageTimestampDifferenceMaxMsProp}'s value ${newConfig.messageTimestampDifferenceMaxMs}. 
" + - s"This may result in frequent log rolling.") val oldConfig = this.config this.config = newConfig if (updatedKeys.contains(LogConfig.MessageFormatVersionProp)) { diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala index a5faf0e..53152eb 100755 --- a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala @@ -174,7 +174,8 @@ abstract class AbstractFetcherManager[T <: AbstractFetcherThread](val name: Stri fetcher.removePartitions(partitions) failedPartitions.removeAll(partitions) } - info(s"Removed fetcher for partitions $partitions") + if (partitions.nonEmpty) + info(s"Removed fetcher for partitions $partitions") } def shutdownIdleFetcherThreads() { diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index cccccfe..2023a97 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -1084,14 +1084,19 @@ class ReplicaManager(val config: KafkaConfig, s"in assigned replica list ${stateInfo.basePartitionState.replicas.asScala.mkString(",")}") responseMap.put(topicPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION) } - } else { - // Otherwise record the error code in response + } else if (requestLeaderEpoch < currentLeaderEpoch) { stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from " + s"controller $controllerId with correlation id $correlationId " + s"epoch $controllerEpoch for partition $topicPartition since its associated " + - s"leader epoch $requestLeaderEpoch is not higher than the current " + + s"leader epoch $requestLeaderEpoch is smaller than the current " + s"leader epoch $currentLeaderEpoch") responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH) + } else { + stateChangeLogger.debug(s"Ignoring LeaderAndIsr request from " + + s"controller 
$controllerId with correlation id $correlationId " + + s"epoch $controllerEpoch for partition $topicPartition since its associated " + + s"leader epoch $requestLeaderEpoch matches the current leader epoch") + responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH) } } diff --git a/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala index 0c885b7..7219ee1 100644 --- a/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala +++ b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala @@ -78,7 +78,10 @@ class LeaderEpochFileCache(topicPartition: TopicPartition, if (removedEpochs.isEmpty) { debug(s"Appended new epoch entry $entryToAppend. Cache now contains ${epochs.size} entries.") - } else { + } else if (removedEpochs.size > 1 || removedEpochs.head.startOffset != entryToAppend.startOffset) { + // Only log a warning if there were non-trivial removals. If the start offset of the new entry + // matches the start offset of the removed epoch, then no data has been written and the truncation + // is expected. warn(s"New epoch entry $entryToAppend caused truncation of conflicting entries $removedEpochs. " + s"Cache now contains ${epochs.size} entries.") }