Github user dongjinleekr commented on a diff in the pull request:
https://github.com/apache/spark/pull/22282#discussion_r214345173
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala ---
@@ -131,9 +158,25 @@ private[kafka010] abstract class KafkaRowWriter(
          throw new IllegalStateException(s"${KafkaWriter.VALUE_ATTRIBUTE_NAME} " +
            s"attribute unsupported type ${t.catalogString}")
      }
+     val headersExpression = inputSchema
+       .find(_.name == KafkaWriter.HEADERS_ATTRIBUTE_NAME).getOrElse(
+         Literal(CatalystTypeConverters.convertToCatalyst(null), MapType(StringType, BinaryType))
+       )
+     headersExpression.dataType match {
+       case MapType(StringType, BinaryType, true) => // good
+       case t =>
+         throw new IllegalStateException(s"${KafkaWriter.HEADERS_ATTRIBUTE_NAME} " +
--- End diff ---
Oh, I misunderstood; after reviewing the code, I found that
`KafkaRowWriter#createProjection` throws `IllegalStateException` while
`KafkaWriter#validateQuery` throws `AnalysisException`. I think this comes
down to the difference between the two methods: the former detects the error
from the state of the `InternalRow`, while the latter does so by analyzing
the expression's schema.
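
To make the contrast concrete, here is a minimal, self-contained sketch of the two failure paths. This is not the actual Spark code: `HeadersCheckSketch`, `HeadersAttr`, and `AnalysisExceptionStandIn` are hypothetical names, and the stand-in exception exists only because `AnalysisException`'s constructor is not public outside `org.apache.spark.sql`.

```scala
import org.apache.spark.sql.types._

// Stand-in for org.apache.spark.sql.AnalysisException, whose constructor is
// protected[sql]; used only so this sketch compiles on its own.
class AnalysisExceptionStandIn(msg: String) extends Exception(msg)

object HeadersCheckSketch {
  // Stands in for KafkaWriter.HEADERS_ATTRIBUTE_NAME.
  val HeadersAttr = "headers"

  // validateQuery-style check: runs at analysis time against the query
  // schema, before any rows exist, so a bad type surfaces as a user-facing
  // analysis error.
  def validateHeaders(schema: StructType): Unit =
    schema.find(_.name == HeadersAttr).foreach { field =>
      field.dataType match {
        case MapType(StringType, BinaryType, _) => // good
        case t => throw new AnalysisExceptionStandIn(
          s"$HeadersAttr attribute must be map<string, binary>, got ${t.catalogString}")
      }
    }

  // createProjection-style check: runs while building the row writer from
  // already-analyzed expressions, so a mismatch here means an invalid
  // internal state rather than a user error.
  def checkHeadersExpressionType(dataType: DataType): Unit = dataType match {
    case MapType(StringType, BinaryType, true) => // good
    case t => throw new IllegalStateException(
      s"$HeadersAttr attribute unsupported type ${t.catalogString}")
  }
}
```

That split seems reasonable to me: user mistakes should surface at analysis time as `AnalysisException`, while the projection-time check guards an invariant that analysis should already have enforced.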
---