mumrah commented on code in PR #18218:
URL: https://github.com/apache/kafka/pull/18218#discussion_r1890777304
##########
clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java:
##########
@@ -42,27 +42,16 @@
public class ProduceRequest extends AbstractRequest {
public static final short LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2 = 11;
- public static Builder forMagic(byte magic, ProduceRequestData data, boolean useTransactionV1Version) {
Review Comment:
Nice to see this record version stuff going away 😄
##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -834,81 +832,6 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
- def maybeConvertFetchedData(tp: TopicIdPartition,
-                             partitionData: FetchResponseData.PartitionData): FetchResponseData.PartitionData = {
-   // We will never return a logConfig when the topic is unresolved and the name is null. This is ok since we won't have any records to convert.
-   val logConfig = replicaManager.getLogConfig(tp.topicPartition)
-
-   if (logConfig.exists(_.compressionType == BrokerCompressionType.ZSTD) && versionId < 10) {
-     trace(s"Fetching messages is disabled for ZStandard compressed partition $tp. Sending unsupported version response to $clientId.")
-     FetchResponse.partitionResponse(tp, Errors.UNSUPPORTED_COMPRESSION_TYPE)
Review Comment:
IIUC, we aren't doing on-the-fly down-conversion anymore since we've dropped
all of the Fetch RPC versions that didn't support record version 2.
And extant data written with older record versions is forward
compatible with any newer Fetch RPC version.
Is that right?
##########
clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java:
##########
@@ -225,33 +214,28 @@ public void clearPartitionRecords() {
data = null;
}
+ //FIXME Check this logic
Review Comment:
Is this still valid?
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -344,8 +343,7 @@ public RecordAppendResult append(String topic,
}
if (buffer == null) {
- byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
- int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression.type(), key, value, headers));
+ int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(RecordBatch.CURRENT_MAGIC_VALUE, compression.type(), key, value, headers));
Review Comment:
nit: can you break this long line?
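For illustration, one way the long line could be wrapped. This is only a sketch: `estimateSizeInBytesUpperBound` below is a placeholder stand-in (a trivial size estimate), not Kafka's real `AbstractRecords.estimateSizeInBytesUpperBound`, and the magic-byte and compression arguments are dummies.

```java
public class LineBreakExample {
    // Placeholder stand-in for AbstractRecords.estimateSizeInBytesUpperBound:
    // a rough upper bound of key + value bytes plus fixed batch overhead.
    static int estimateSizeInBytesUpperBound(byte magic, int compressionType,
                                             byte[] key, byte[] value,
                                             int headerCount) {
        int keyLen = (key == null) ? 0 : key.length;
        int valueLen = (value == null) ? 0 : value.length;
        return keyLen + valueLen + 64;
    }

    public static void main(String[] args) {
        int batchSize = 16384;         // dummy value for producer batch.size
        byte currentMagic = 2;         // stand-in for RecordBatch.CURRENT_MAGIC_VALUE

        // One possible wrapping: break after Math.max( and give each
        // argument of the estimate call its own line.
        int size = Math.max(
            batchSize,
            estimateSizeInBytesUpperBound(
                currentMagic,
                0,                     // dummy compression type
                "key".getBytes(),
                "value".getBytes(),
                0));

        System.out.println(size);      // batchSize dominates the small estimate
    }
}
```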
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]