Chia-Ping Tsai created KAFKA-10438: -------------------------------------- Summary: Lazy initialization of record header to reduce memory usage in validating records Key: KAFKA-10438 URL: https://issues.apache.org/jira/browse/KAFKA-10438 Project: Kafka Issue Type: Improvement Reporter: Chia-Ping Tsai Assignee: Chia-Ping Tsai
{code} private def validateRecord(batch: RecordBatch, topicPartition: TopicPartition, record: Record, batchIndex: Int, now: Long, timestampType: TimestampType, timestampDiffMaxMs: Long, compactedTopic: Boolean, brokerTopicStats: BrokerTopicStats): Option[ApiRecordError] = { if (!record.hasMagic(batch.magic)) { brokerTopicStats.allTopicsStats.invalidMagicNumberRecordsPerSec.mark() return Some(ApiRecordError(Errors.INVALID_RECORD, new RecordError(batchIndex, s"Record $record's magic does not match outer magic ${batch.magic} in topic partition $topicPartition."))) } // verify the record-level CRC only if this is one of the deep entries of a compressed message // set for magic v0 and v1. For non-compressed messages, there is no inner record for magic v0 and v1, // so we depend on the batch-level CRC check in Log.analyzeAndValidateRecords(). For magic v2 and above, // there is no record-level CRC to check. if (batch.magic <= RecordBatch.MAGIC_VALUE_V1 && batch.isCompressed) { try { record.ensureValid() } catch { case e: InvalidRecordException => brokerTopicStats.allTopicsStats.invalidMessageCrcRecordsPerSec.mark() throw new CorruptRecordException(e.getMessage + s" in topic partition $topicPartition.") } } validateKey(record, batchIndex, topicPartition, compactedTopic, brokerTopicStats).orElse { validateTimestamp(batch, record, batchIndex, now, timestampType, timestampDiffMaxMs) } } {code} There is no checks for header key so instantiating key (bytes to string) is unnecessary. -- This message was sent by Atlassian Jira (v8.3.4#803005)