dajac commented on a change in pull request #11695: URL: https://github.com/apache/kafka/pull/11695#discussion_r795497752
########## File path: core/src/main/scala/kafka/tools/ConsoleProducer.scala ########## @@ -362,11 +375,21 @@ object ConsoleProducer { } private def splitHeaders(headers: String): Array[(String, Array[Byte])] = { - headerSeparatorPattern.split(headers).map { pair => - (pair.indexOf(headerKeySeparator), ignoreError) match { + headersSeparatorPattern.split(headers).map { pair => + (pair.indexOf(headersKeySeparator), ignoreError) match { case (-1, false) => throw new KafkaException(s"No header key separator found in pair '$pair' on line number $lineNumber") case (-1, true) => (pair, null) - case (i, _) => (pair.substring(0, i), pair.substring(i + headerKeySeparator.length).getBytes(StandardCharsets.UTF_8)) + case (i, _) => + val headerKey = pair.substring(0, i) match { + case k => + if (k == nullMarker) + throw new KafkaException(s"Header keys should not be equal to the null marker '$nullMarker' as they can't be null") + else k Review comment: nit: We could use `case k if (k == nullMarker) =>` followed by `case k =>` for the default case. That seems a bit more intuitive when using pattern matching. 
########## File path: core/src/main/scala/kafka/tools/ConsoleProducer.scala ########## @@ -302,22 +304,33 @@ object ConsoleProducer { if (props.containsKey("key.separator")) keySeparator = props.getProperty("key.separator") if (props.containsKey("parse.headers")) - parseHeader = props.getProperty("parse.headers").trim.equalsIgnoreCase("true") + parseHeaders = props.getProperty("parse.headers").trim.equalsIgnoreCase("true") if (props.containsKey("headers.delimiter")) headersDelimiter = props.getProperty("headers.delimiter") if (props.containsKey("headers.separator")) headersSeparator = props.getProperty("headers.separator") - headerSeparatorPattern = Pattern.compile(headersSeparator) + headersSeparatorPattern = Pattern.compile(headersSeparator) if (props.containsKey("headers.key.separator")) - headerKeySeparator = props.getProperty("headers.key.separator") + headersKeySeparator = props.getProperty("headers.key.separator") if (props.containsKey("ignore.error")) ignoreError = props.getProperty("ignore.error").trim.equalsIgnoreCase("true") if (headersDelimiter == headersSeparator) throw new KafkaException("headers.delimiter and headers.separator may not be equal") - if (headersDelimiter == headerKeySeparator) + if (headersDelimiter == headersKeySeparator) throw new KafkaException("headers.delimiter and headers.key.separator may not be equal") - if (headersSeparator == headerKeySeparator) + if (headersSeparator == headersKeySeparator) throw new KafkaException("headers.separator and headers.key.separator may not be equal") + if (props.containsKey("null.marker")) { + nullMarker = props.getProperty("null.marker") + } Review comment: nit: I would remove the curly braces here to stay in line with the style of the other `if`s. 
########## File path: core/src/main/scala/kafka/tools/ConsoleProducer.scala ########## @@ -362,11 +375,21 @@ object ConsoleProducer { } private def splitHeaders(headers: String): Array[(String, Array[Byte])] = { - headerSeparatorPattern.split(headers).map { pair => - (pair.indexOf(headerKeySeparator), ignoreError) match { + headersSeparatorPattern.split(headers).map { pair => + (pair.indexOf(headersKeySeparator), ignoreError) match { case (-1, false) => throw new KafkaException(s"No header key separator found in pair '$pair' on line number $lineNumber") case (-1, true) => (pair, null) - case (i, _) => (pair.substring(0, i), pair.substring(i + headerKeySeparator.length).getBytes(StandardCharsets.UTF_8)) + case (i, _) => + val headerKey = pair.substring(0, i) match { + case k => + if (k == nullMarker) + throw new KafkaException(s"Header keys should not be equal to the null marker '$nullMarker' as they can't be null") + else k + } + val headerValue = pair.substring(i + headersKeySeparator.length) match { + case v => if (v == nullMarker) null else v.getBytes(StandardCharsets.UTF_8) Review comment: nit: ditto. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org