Github user dongjinleekr commented on a diff in the pull request: https://github.com/apache/spark/pull/22282#discussion_r214198620 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala --- @@ -131,9 +158,25 @@ private[kafka010] abstract class KafkaRowWriter( throw new IllegalStateException(s"${KafkaWriter.VALUE_ATTRIBUTE_NAME} " + s"attribute unsupported type ${t.catalogString}") } + val headersExpression = inputSchema + .find(_.name == KafkaWriter.HEADERS_ATTRIBUTE_NAME).getOrElse( + Literal(CatalystTypeConverters.convertToCatalyst(null), MapType(StringType, BinaryType)) + ) + headersExpression.dataType match { + case MapType(StringType, BinaryType, true) => // good + case t => + throw new IllegalStateException(s"${KafkaWriter.HEADERS_ATTRIBUTE_NAME} " + --- End diff -- Just a typo.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org