[
https://issues.apache.org/jira/browse/KAFKA-7030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511971#comment-16511971
]
ASF GitHub Bot commented on KAFKA-7030:
---------------------------------------
hachikuji closed pull request #5192: KAFKA-7030: Add configuration to disable
message down-conversion (KIP-283)
URL: https://github.com/apache/kafka/pull/5192
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
index d6b70032626..fb2208c0328 100755
--- a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
@@ -165,4 +165,11 @@
"the timestamp when a broker receives a message and the timestamp
specified in the message. If " +
"message.timestamp.type=CreateTime, a message will be rejected if the
difference in timestamp " +
"exceeds this threshold. This configuration is ignored if
message.timestamp.type=LogAppendTime.";
+
+ public static final String MESSAGE_DOWNCONVERSION_ENABLE_CONFIG =
"message.downconversion.enable";
+ public static final String MESSAGE_DOWNCONVERSION_ENABLE_DOC = "This
configuration controls whether " +
+ "down-conversion of message formats is enabled to satisfy consume
requests. When set to <code>false</code>, " +
+ "broker will not perform down-conversion for consumers expecting an
older message format. The broker responds " +
+ "with <code>UNSUPPORTED_VERSION</code> error for consume requests from
such older clients. This configuration" +
+ "does not apply to any message format conversion that might be
required for replication to followers.";
}
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala
b/core/src/main/scala/kafka/log/LogConfig.scala
index 158209a1fc0..59269fe18d3 100755
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -63,6 +63,7 @@ object Defaults {
val LeaderReplicationThrottledReplicas = Collections.emptyList[String]()
val FollowerReplicationThrottledReplicas = Collections.emptyList[String]()
val MaxIdMapSnapshots = kafka.server.Defaults.MaxIdMapSnapshots
+ val MessageDownConversionEnable =
kafka.server.Defaults.MessageDownConversionEnable
}
case class LogConfig(props: java.util.Map[_, _], overriddenConfigs:
Set[String] = Set.empty)
@@ -96,6 +97,7 @@ case class LogConfig(props: java.util.Map[_, _],
overriddenConfigs: Set[String]
val messageTimestampDifferenceMaxMs =
getLong(LogConfig.MessageTimestampDifferenceMaxMsProp).longValue
val LeaderReplicationThrottledReplicas =
getList(LogConfig.LeaderReplicationThrottledReplicasProp)
val FollowerReplicationThrottledReplicas =
getList(LogConfig.FollowerReplicationThrottledReplicasProp)
+ val messageDownConversionEnable =
getBoolean(LogConfig.MessageDownConversionEnableProp)
def randomSegmentJitter: Long =
if (segmentJitterMs == 0) 0 else Utils.abs(scala.util.Random.nextInt()) %
math.min(segmentJitterMs, segmentMs)
@@ -131,6 +133,7 @@ object LogConfig {
val MessageFormatVersionProp = TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG
val MessageTimestampTypeProp = TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG
val MessageTimestampDifferenceMaxMsProp =
TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG
+ val MessageDownConversionEnableProp =
TopicConfig.MESSAGE_DOWNCONVERSION_ENABLE_CONFIG
// Leave these out of TopicConfig for now as they are replication quota
configs
val LeaderReplicationThrottledReplicasProp =
"leader.replication.throttled.replicas"
@@ -158,6 +161,7 @@ object LogConfig {
val MessageFormatVersionDoc = TopicConfig.MESSAGE_FORMAT_VERSION_DOC
val MessageTimestampTypeDoc = TopicConfig.MESSAGE_TIMESTAMP_TYPE_DOC
val MessageTimestampDifferenceMaxMsDoc =
TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_DOC
+ val MessageDownConversionEnableDoc =
TopicConfig.MESSAGE_DOWNCONVERSION_ENABLE_DOC
val LeaderReplicationThrottledReplicasDoc = "A list of replicas for which
log replication should be throttled on " +
"the leader side. The list should describe a set of replicas in the form "
+
@@ -262,6 +266,8 @@ object LogConfig {
LeaderReplicationThrottledReplicasDoc,
LeaderReplicationThrottledReplicasProp)
.define(FollowerReplicationThrottledReplicasProp, LIST,
Defaults.FollowerReplicationThrottledReplicas, ThrottledReplicaListValidator,
MEDIUM,
FollowerReplicationThrottledReplicasDoc,
FollowerReplicationThrottledReplicasProp)
+ .define(MessageDownConversionEnableProp, BOOLEAN,
Defaults.MessageDownConversionEnable, LOW,
+ MessageDownConversionEnableDoc,
KafkaConfig.LogMessageDownConversionEnableProp)
}
def apply(): LogConfig = LogConfig(new Properties())
@@ -325,7 +331,8 @@ object LogConfig {
PreAllocateEnableProp -> KafkaConfig.LogPreAllocateProp,
MessageFormatVersionProp -> KafkaConfig.LogMessageFormatVersionProp,
MessageTimestampTypeProp -> KafkaConfig.LogMessageTimestampTypeProp,
- MessageTimestampDifferenceMaxMsProp ->
KafkaConfig.LogMessageTimestampDifferenceMaxMsProp
+ MessageTimestampDifferenceMaxMsProp ->
KafkaConfig.LogMessageTimestampDifferenceMaxMsProp,
+ MessageDownConversionEnableProp ->
KafkaConfig.LogMessageDownConversionEnableProp
)
}
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index ae7845b5166..ae80029e79c 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -43,7 +43,6 @@ import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.FatalExitError
import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME,
TRANSACTION_STATE_TOPIC_NAME, isInternal}
-import org.apache.kafka.common.requests.FetchMetadata.INVALID_SESSION_ID
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.{ListenerName, Send}
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
@@ -507,44 +506,41 @@ class KafkaApis(val requestChannel: RequestChannel,
fetchRequest.toForget(),
fetchRequest.isFromFollower())
+ def errorResponse[T >: MemoryRecords <: BaseRecords](error: Errors):
FetchResponse.PartitionData[T] = {
+ new FetchResponse.PartitionData[T](error,
FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET,
+ FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY)
+ }
+
val erroneous = mutable.ArrayBuffer[(TopicPartition,
FetchResponse.PartitionData[Records])]()
val interesting = mutable.ArrayBuffer[(TopicPartition,
FetchRequest.PartitionData)]()
if (fetchRequest.isFromFollower()) {
// The follower must have ClusterAction on ClusterResource in order to
fetch partition data.
if (authorize(request.session, ClusterAction, Resource.ClusterResource))
{
- fetchContext.foreachPartition((topicPartition, data) => {
- if (!metadataCache.contains(topicPartition)) {
- erroneous += topicPartition -> new
FetchResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION,
- FetchResponse.INVALID_HIGHWATERMARK,
FetchResponse.INVALID_LAST_STABLE_OFFSET,
- FetchResponse.INVALID_LOG_START_OFFSET, null,
MemoryRecords.EMPTY)
- } else {
+ fetchContext.foreachPartition { (topicPartition, data) =>
+ if (!metadataCache.contains(topicPartition))
+ erroneous += topicPartition ->
errorResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION)
+ else
interesting += (topicPartition -> data)
- }
- })
+ }
} else {
- fetchContext.foreachPartition((part, _) => {
- erroneous += part -> new
FetchResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED,
- FetchResponse.INVALID_HIGHWATERMARK,
FetchResponse.INVALID_LAST_STABLE_OFFSET,
- FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY)
- })
+ fetchContext.foreachPartition { (part, _) =>
+ erroneous += part -> errorResponse(Errors.TOPIC_AUTHORIZATION_FAILED)
+ }
}
} else {
// Regular Kafka consumers need READ permission on each partition they
are fetching.
- fetchContext.foreachPartition((topicPartition, data) => {
+ fetchContext.foreachPartition { (topicPartition, data) =>
if (!authorize(request.session, Read, Resource(Topic,
topicPartition.topic, LITERAL)))
- erroneous += topicPartition -> new
FetchResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED,
- FetchResponse.INVALID_HIGHWATERMARK,
FetchResponse.INVALID_LAST_STABLE_OFFSET,
- FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY)
+ erroneous += topicPartition ->
errorResponse(Errors.TOPIC_AUTHORIZATION_FAILED)
else if (!metadataCache.contains(topicPartition))
- erroneous += topicPartition -> new
FetchResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION,
- FetchResponse.INVALID_HIGHWATERMARK,
FetchResponse.INVALID_LAST_STABLE_OFFSET,
- FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY)
+ erroneous += topicPartition ->
errorResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION)
else
interesting += (topicPartition -> data)
- })
+ }
}
- def convertRecords(tp: TopicPartition, unconvertedRecords: Records):
BaseRecords = {
+ def maybeConvertFetchedData(tp: TopicPartition,
+ partitionData:
FetchResponse.PartitionData[Records]): FetchResponse.PartitionData[BaseRecords]
= {
// Down-conversion of the fetched records is needed when the stored
magic version is
// greater than that supported by the client (as indicated by the fetch
request version). If the
// configured magic version for the topic is less than or equal to that
supported by the version of the
@@ -552,8 +548,10 @@ class KafkaApis(val requestChannel: RequestChannel,
// know it must be supported. However, if the magic version is changed
from a higher version back to a
// lower version, this check will no longer be valid and we will fail to
down-convert the messages
// which were written in the new format prior to the version downgrade.
- replicaManager.getMagic(tp).flatMap { magic =>
- val downConvertMagic = {
+ val unconvertedRecords = partitionData.records
+ val logConfig = replicaManager.getLogConfig(tp)
+ val downConvertMagic =
+ logConfig.map(_.messageFormatVersion.recordVersion.value).flatMap {
magic =>
if (magic > RecordBatch.MAGIC_VALUE_V0 && versionId <= 1 &&
!unconvertedRecords.hasCompatibleMagic(RecordBatch.MAGIC_VALUE_V0))
Some(RecordBatch.MAGIC_VALUE_V0)
else if (magic > RecordBatch.MAGIC_VALUE_V1 && versionId <= 3 &&
!unconvertedRecords.hasCompatibleMagic(RecordBatch.MAGIC_VALUE_V1))
@@ -562,28 +560,36 @@ class KafkaApis(val requestChannel: RequestChannel,
None
}
- downConvertMagic.map { magic =>
- trace(s"Down converting records from partition $tp to message format
version $magic for fetch request from $clientId")
-
- // Because down-conversion is extremely memory intensive, we want to
try and delay the down-conversion as much
- // as possible. With KIP-283, we have the ability to lazily
down-convert in a chunked manner. The lazy, chunked
- // down-conversion always guarantees that at least one batch of
messages is down-converted and sent out to the
- // client.
- new LazyDownConversionRecords(tp, unconvertedRecords, magic,
fetchContext.getFetchOffset(tp).get, time)
- }
- }.getOrElse(unconvertedRecords)
+ // For fetch requests from clients, check if down-conversion is disabled
for the particular partition
+ if (downConvertMagic.isDefined && !fetchRequest.isFromFollower &&
!logConfig.forall(_.messageDownConversionEnable)) {
+ trace(s"Conversion to message format ${downConvertMagic.get} is
disabled for partition $tp. Sending unsupported version response to $clientId.")
+ errorResponse(Errors.UNSUPPORTED_VERSION)
+ } else {
+ val convertedRecords =
+ downConvertMagic.map { magic =>
+ trace(s"Down converting records from partition $tp to message
format version $magic for fetch request from $clientId")
+ // Because down-conversion is extremely memory intensive, we want
to try and delay the down-conversion as much
+ // as possible. With KIP-283, we have the ability to lazily
down-convert in a chunked manner. The lazy, chunked
+ // down-conversion always guarantees that at least one batch of
messages is down-converted and sent out to the
+ // client.
+ new LazyDownConversionRecords(tp, unconvertedRecords, magic,
fetchContext.getFetchOffset(tp).get, time)
+ }.getOrElse(unconvertedRecords)
+ new FetchResponse.PartitionData[BaseRecords](partitionData.error,
partitionData.highWatermark,
+ FetchResponse.INVALID_LAST_STABLE_OFFSET,
partitionData.logStartOffset, partitionData.abortedTransactions,
+ convertedRecords)
+ }
}
// the callback for process a fetch response, invoked before throttling
def processResponseCallback(responsePartitionData: Seq[(TopicPartition,
FetchPartitionData)]): Unit = {
val partitions = new util.LinkedHashMap[TopicPartition,
FetchResponse.PartitionData[Records]]
- responsePartitionData.foreach{ case (tp, data) =>
+ responsePartitionData.foreach { case (tp, data) =>
val abortedTransactions = data.abortedTransactions.map(_.asJava).orNull
val lastStableOffset =
data.lastStableOffset.getOrElse(FetchResponse.INVALID_LAST_STABLE_OFFSET)
partitions.put(tp, new FetchResponse.PartitionData(data.error,
data.highWatermark, lastStableOffset,
data.logStartOffset, abortedTransactions, data.records))
}
- erroneous.foreach{case (tp, data) => partitions.put(tp, data)}
+ erroneous.foreach { case (tp, data) => partitions.put(tp, data) }
// When this callback is triggered, the remote API call has completed.
// Record time before any byte-rate throttling.
@@ -598,14 +604,10 @@ class KafkaApis(val requestChannel: RequestChannel,
if (unconvertedPartitionData.error != Errors.NONE)
debug(s"Fetch request with correlation id
${request.header.correlationId} from client $clientId " +
s"on partition $tp failed due to
${unconvertedPartitionData.error.exceptionName}")
- val convertedRecords = convertRecords(tp,
unconvertedPartitionData.records)
- val convertedPartitionData = new
FetchResponse.PartitionData[BaseRecords](unconvertedPartitionData.error,
- unconvertedPartitionData.highWatermark,
FetchResponse.INVALID_LAST_STABLE_OFFSET,
unconvertedPartitionData.logStartOffset,
- unconvertedPartitionData.abortedTransactions, convertedRecords)
- convertedData.put(tp, convertedPartitionData)
+ convertedData.put(tp, maybeConvertFetchedData(tp,
unconvertedPartitionData))
}
- // Prepare fetch resopnse from converted data
+ // Prepare fetch response from converted data
val response = new FetchResponse(unconvertedFetchResponse.error(),
convertedData, throttleTimeMs,
unconvertedFetchResponse.sessionId())
response.responseData.asScala.foreach { case (topicPartition, data) =>
@@ -1455,7 +1457,7 @@ class KafkaApis(val requestChannel: RequestChannel,
duplicateTopics.keySet.map((_, new
ApiError(Errors.INVALID_REQUEST, errorMessage))).toMap
} else Map.empty
- val unauthorizedTopicsResults = unauthorizedTopics.keySet.map(_ -> new
ApiError(Errors.TOPIC_AUTHORIZATION_FAILED, null))
+ val unauthorizedTopicsResults = unauthorizedTopics.keySet.map(_ -> new
ApiError(Errors.TOPIC_AUTHORIZATION_FAILED, null))
val completeResults = results ++ duplicatedTopicsResults ++
unauthorizedTopicsResults
sendResponseCallback(completeResults)
}
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 19bb8074958..5a021919dd6 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -116,6 +116,7 @@ object Defaults {
val NumRecoveryThreadsPerDataDir = 1
val AutoCreateTopicsEnable = true
val MinInSyncReplicas = 1
+ val MessageDownConversionEnable = true
/** ********* Replication configuration ***********/
val ControllerSocketTimeoutMs = RequestTimeoutMs
@@ -330,6 +331,7 @@ object KafkaConfig {
val MinInSyncReplicasProp = "min.insync.replicas"
val CreateTopicPolicyClassNameProp = "create.topic.policy.class.name"
val AlterConfigPolicyClassNameProp = "alter.config.policy.class.name"
+ val LogMessageDownConversionEnableProp = LogConfigPrefix +
"message.downconversion.enable"
/** ********* Replication configuration ***********/
val ControllerSocketTimeoutMsProp = "controller.socket.timeout.ms"
val DefaultReplicationFactorProp = "default.replication.factor"
@@ -599,6 +601,7 @@ object KafkaConfig {
"implement the
<code>org.apache.kafka.server.policy.CreateTopicPolicy</code> interface."
val AlterConfigPolicyClassNameDoc = "The alter configs policy class that
should be used for validation. The class should " +
"implement the
<code>org.apache.kafka.server.policy.AlterConfigPolicy</code> interface."
+ val LogMessageDownConversionEnableDoc =
TopicConfig.MESSAGE_DOWNCONVERSION_ENABLE_DOC;
/** ********* Replication configuration ***********/
val ControllerSocketTimeoutMsDoc = "The socket timeout for
controller-to-broker channels"
@@ -862,6 +865,7 @@ object KafkaConfig {
.define(LogMessageTimestampDifferenceMaxMsProp, LONG,
Defaults.LogMessageTimestampDifferenceMaxMs, MEDIUM,
LogMessageTimestampDifferenceMaxMsDoc)
.define(CreateTopicPolicyClassNameProp, CLASS, null, LOW,
CreateTopicPolicyClassNameDoc)
.define(AlterConfigPolicyClassNameProp, CLASS, null, LOW,
AlterConfigPolicyClassNameDoc)
+ .define(LogMessageDownConversionEnableProp, BOOLEAN,
Defaults.MessageDownConversionEnable, LOW, LogMessageDownConversionEnableDoc)
/** ********* Replication configuration ***********/
.define(ControllerSocketTimeoutMsProp, INT,
Defaults.ControllerSocketTimeoutMs, MEDIUM, ControllerSocketTimeoutMsDoc)
@@ -1135,6 +1139,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog:
Boolean, dynamicConfigO
val logMessageFormatVersion = ApiVersion(logMessageFormatVersionString)
def logMessageTimestampType =
TimestampType.forName(getString(KafkaConfig.LogMessageTimestampTypeProp))
def logMessageTimestampDifferenceMaxMs: Long =
getLong(KafkaConfig.LogMessageTimestampDifferenceMaxMsProp)
+ def logMessageDownConversionEnable: Boolean =
getBoolean(KafkaConfig.LogMessageDownConversionEnableProp)
/** ********* Replication configuration ***********/
val controllerSocketTimeoutMs: Int =
getInt(KafkaConfig.ControllerSocketTimeoutMsProp)
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala
b/core/src/main/scala/kafka/server/KafkaServer.scala
index 57bca697437..f73ede619dd 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -79,6 +79,7 @@ object KafkaServer {
logProps.put(LogConfig.MessageFormatVersionProp,
kafkaConfig.logMessageFormatVersion.version)
logProps.put(LogConfig.MessageTimestampTypeProp,
kafkaConfig.logMessageTimestampType.name)
logProps.put(LogConfig.MessageTimestampDifferenceMaxMsProp,
kafkaConfig.logMessageTimestampDifferenceMaxMs: java.lang.Long)
+ logProps.put(LogConfig.MessageDownConversionEnableProp,
kafkaConfig.logMessageDownConversionEnable: java.lang.Boolean)
logProps
}
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 24f3235570f..965595b2c2e 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -25,7 +25,7 @@ import com.yammer.metrics.core.Gauge
import kafka.api._
import kafka.cluster.{BrokerEndPoint, Partition, Replica}
import kafka.controller.{KafkaController, StateChangeLogger}
-import kafka.log.{Log, LogAppendInfo, LogManager}
+import kafka.log.{Log, LogAppendInfo, LogConfig, LogManager}
import kafka.metrics.KafkaMetricsGroup
import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota}
import kafka.server.checkpoints.OffsetCheckpointFile
@@ -995,8 +995,9 @@ class ReplicaManager(val config: KafkaConfig,
quota.isThrottled(topicPartition) && quota.isQuotaExceeded &&
!isReplicaInSync
}
- def getMagic(topicPartition: TopicPartition): Option[Byte] =
-
getReplica(topicPartition).flatMap(_.log.map(_.config.messageFormatVersion.recordVersion.value))
+ def getLogConfig(topicPartition: TopicPartition): Option[LogConfig] =
getReplica(topicPartition).flatMap(_.log.map(_.config))
+
+ def getMagic(topicPartition: TopicPartition): Option[Byte] =
getLogConfig(topicPartition).map(_.messageFormatVersion.recordVersion.value)
def maybeUpdateMetadataCache(correlationId: Int, updateMetadataRequest:
UpdateMetadataRequest) : Seq[TopicPartition] = {
replicaStateChangeLock synchronized {
diff --git
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 45b3fdc74bd..69ca31703ef 100644
---
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -369,6 +369,7 @@ class DynamicBrokerReconfigurationTest extends
ZooKeeperTestHarness with SaslSet
props.put(KafkaConfig.LogPreAllocateProp, true.toString)
props.put(KafkaConfig.LogMessageTimestampTypeProp,
TimestampType.LOG_APPEND_TIME.toString)
props.put(KafkaConfig.LogMessageTimestampDifferenceMaxMsProp, "1000")
+ props.put(KafkaConfig.LogMessageDownConversionEnableProp, "false")
reconfigureServers(props, perBrokerConfig = false,
(KafkaConfig.LogSegmentBytesProp, "4000"))
// Verify that all broker defaults have been updated
diff --git
a/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala
b/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala
new file mode 100644
index 00000000000..e5ef9858cf7
--- /dev/null
+++
b/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package unit.kafka.server
+
+import java.util
+import java.util.Properties
+
+import kafka.log.LogConfig
+import kafka.server.{BaseRequestTest, KafkaConfig}
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.record.MemoryRecords
+import org.apache.kafka.common.requests.{FetchRequest, FetchResponse}
+import org.apache.kafka.common.serialization.StringSerializer
+import org.junit.Assert._
+import org.junit.Test
+
+class FetchRequestDownConversionConfigTest extends BaseRequestTest {
+ private var producer: KafkaProducer[String, String] = null
+ override def numBrokers: Int = 1
+
+ override def setUp(): Unit = {
+ super.setUp()
+ initProducer()
+ }
+
+ override def tearDown(): Unit = {
+ if (producer != null)
+ producer.close()
+ super.tearDown()
+ }
+
+ override protected def propertyOverrides(properties: Properties): Unit = {
+ super.propertyOverrides(properties)
+ properties.put(KafkaConfig.LogMessageDownConversionEnableProp, "false")
+ }
+
+ private def initProducer(): Unit = {
+ producer =
TestUtils.createNewProducer(TestUtils.getBrokerListStrFromServers(servers),
+ retries = 5, keySerializer = new StringSerializer, valueSerializer = new
StringSerializer)
+ }
+
+ private def createTopics(numTopics: Int, numPartitions: Int,
+ configs: Map[String, String] = Map.empty,
topicSuffixStart: Int = 0): Map[TopicPartition, Int] = {
+ val topics = (0 until numTopics).map(t => s"topic${t + topicSuffixStart}")
+ val topicConfig = new Properties
+ topicConfig.setProperty(LogConfig.MinInSyncReplicasProp, 1.toString)
+ configs.foreach { case (k, v) => topicConfig.setProperty(k, v) }
+ topics.flatMap { topic =>
+ val partitionToLeader = createTopic(topic, numPartitions =
numPartitions, replicationFactor = 1,
+ topicConfig = topicConfig)
+ partitionToLeader.map { case (partition, leader) => new
TopicPartition(topic, partition) -> leader }
+ }.toMap
+ }
+
+ private def createPartitionMap(maxPartitionBytes: Int, topicPartitions:
Seq[TopicPartition],
+ offsetMap: Map[TopicPartition, Long] =
Map.empty): util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData] = {
+ val partitionMap = new util.LinkedHashMap[TopicPartition,
FetchRequest.PartitionData]
+ topicPartitions.foreach { tp =>
+ partitionMap.put(tp, new
FetchRequest.PartitionData(offsetMap.getOrElse(tp, 0), 0L, maxPartitionBytes))
+ }
+ partitionMap
+ }
+
+ private def sendFetchRequest(leaderId: Int, request: FetchRequest):
FetchResponse[MemoryRecords] = {
+ val response = connectAndSend(request, ApiKeys.FETCH, destination =
brokerSocketServer(leaderId))
+ FetchResponse.parse(response, request.version)
+ }
+
+ /**
+ * Tests that fetch requests that require down-conversion return an
error response when down-conversion is disabled on the broker.
+ */
+ @Test
+ def testV1FetchWithDownConversionDisabled(): Unit = {
+ val topicMap = createTopics(numTopics = 5, numPartitions = 1)
+ val topicPartitions = topicMap.keySet.toSeq
+ topicPartitions.foreach(tp => producer.send(new ProducerRecord(tp.topic(),
"key", "value")).get())
+ val fetchRequest = FetchRequest.Builder.forConsumer(Int.MaxValue, 0,
createPartitionMap(1024,
+ topicPartitions)).build(1)
+ val fetchResponse = sendFetchRequest(topicMap.head._2, fetchRequest)
+ topicPartitions.foreach(tp => assertEquals(Errors.UNSUPPORTED_VERSION,
fetchResponse.responseData().get(tp).error))
+ }
+
+ /**
+ * Tests that "message.downconversion.enable" has no effect when
down-conversion is not required.
+ */
+ @Test
+ def testLatestFetchWithDownConversionDisabled(): Unit = {
+ val topicMap = createTopics(numTopics = 5, numPartitions = 1)
+ val topicPartitions = topicMap.keySet.toSeq
+ topicPartitions.foreach(tp => producer.send(new ProducerRecord(tp.topic(),
"key", "value")).get())
+ val fetchRequest = FetchRequest.Builder.forConsumer(Int.MaxValue, 0,
createPartitionMap(1024,
+ topicPartitions)).build()
+ val fetchResponse = sendFetchRequest(topicMap.head._2, fetchRequest)
+ topicPartitions.foreach(tp => assertEquals(Errors.NONE,
fetchResponse.responseData().get(tp).error))
+ }
+
+ /**
+ * Tests that "message.downconversion.enable" can be set at topic level, and
its configuration is obeyed for client
+ * fetch requests.
+ */
+ @Test
+ def testV1FetchWithTopicLevelOverrides(): Unit = {
+ // create topics with default down-conversion configuration (i.e.
conversion disabled)
+ val conversionDisabledTopicsMap = createTopics(numTopics = 5,
numPartitions = 1, topicSuffixStart = 0)
+ val conversionDisabledTopicPartitions =
conversionDisabledTopicsMap.keySet.toSeq
+
+ // create topics with down-conversion configuration enabled
+ val topicConfig = Map(LogConfig.MessageDownConversionEnableProp -> "true")
+ val conversionEnabledTopicsMap = createTopics(numTopics = 5, numPartitions
= 1, topicConfig, topicSuffixStart = 5)
+ val conversionEnabledTopicPartitions =
conversionEnabledTopicsMap.keySet.toSeq
+
+ val allTopics = conversionDisabledTopicPartitions ++
conversionEnabledTopicPartitions
+ val leaderId = conversionDisabledTopicsMap.head._2
+
+ allTopics.foreach(tp => producer.send(new ProducerRecord(tp.topic(),
"key", "value")).get())
+ val fetchRequest = FetchRequest.Builder.forConsumer(Int.MaxValue, 0,
createPartitionMap(1024,
+ allTopics)).build(1)
+ val fetchResponse = sendFetchRequest(leaderId, fetchRequest)
+
+ conversionDisabledTopicPartitions.foreach(tp =>
assertEquals(Errors.UNSUPPORTED_VERSION,
fetchResponse.responseData().get(tp).error))
+ conversionEnabledTopicPartitions.foreach(tp => assertEquals(Errors.NONE,
fetchResponse.responseData().get(tp).error))
+ }
+
+ /**
+ * Tests that "message.downconversion.enable" has no effect on fetch
requests from replicas.
+ */
+ @Test
+ def testV1FetchFromReplica(): Unit = {
+ // create topics with default down-conversion configuration (i.e.
conversion disabled)
+ val conversionDisabledTopicsMap = createTopics(numTopics = 5,
numPartitions = 1, topicSuffixStart = 0)
+ val conversionDisabledTopicPartitions =
conversionDisabledTopicsMap.keySet.toSeq
+
+ // create topics with down-conversion configuration enabled
+ val topicConfig = Map(LogConfig.MessageDownConversionEnableProp -> "true")
+ val conversionEnabledTopicsMap = createTopics(numTopics = 5, numPartitions
= 1, topicConfig, topicSuffixStart = 5)
+ val conversionEnabledTopicPartitions =
conversionEnabledTopicsMap.keySet.toSeq
+
+ val allTopicPartitions = conversionDisabledTopicPartitions ++
conversionEnabledTopicPartitions
+ val leaderId = conversionDisabledTopicsMap.head._2
+
+ allTopicPartitions.foreach(tp => producer.send(new
ProducerRecord(tp.topic(), "key", "value")).get())
+ val fetchRequest = FetchRequest.Builder.forReplica(1, 1, Int.MaxValue, 0,
+ createPartitionMap(1024, allTopicPartitions)).build()
+ val fetchResponse = sendFetchRequest(leaderId, fetchRequest)
+
+ allTopicPartitions.foreach(tp => assertEquals(Errors.NONE,
fetchResponse.responseData().get(tp).error))
+ }
+}
diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
index 424b8c79fe4..1ba388ef4d2 100644
--- a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
@@ -426,7 +426,7 @@ class FetchRequestTest extends BaseRequestTest {
}
private def createTopics(numTopics: Int, numPartitions: Int, configs:
Map[String, String] = Map.empty): Map[TopicPartition, Int] = {
- val topics = (0 until numPartitions).map(t => s"topic$t")
+ val topics = (0 until numTopics).map(t => s"topic$t")
val topicConfig = new Properties
topicConfig.setProperty(LogConfig.MinInSyncReplicasProp, 2.toString)
configs.foreach { case (k, v) => topicConfig.setProperty(k, v) }
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
> Add configuration to disable message down-conversion
> ----------------------------------------------------
>
> Key: KAFKA-7030
> URL: https://issues.apache.org/jira/browse/KAFKA-7030
> Project: Kafka
> Issue Type: Sub-task
> Reporter: Dhruvil Shah
> Assignee: Dhruvil Shah
> Priority: Major
> Fix For: 2.0.0
>
>
> Add configuration to disable message down-conversion as described in
> [KIP-283|https://cwiki.apache.org/confluence/display/KAFKA/KIP-283%3A+Efficient+Memory+Usage+for+Down-Conversion].
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)