Github user HyukjinKwon commented on a diff in the pull request:
https://github.com/apache/spark/pull/21477#discussion_r193299622
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterProvider.scala
---
@@ -44,40 +51,61 @@ case class ForeachWriterProvider[T: Encoder](writer:
ForeachWriter[T]) extends S
override def abort(epochId: Long, messages:
Array[WriterCommitMessage]): Unit = {}
override def createInternalRowWriterFactory():
DataWriterFactory[InternalRow] = {
- val encoder = encoderFor[T].resolveAndBind(
- schema.toAttributes,
- SparkSession.getActiveSession.get.sessionState.analyzer)
- ForeachWriterFactory(writer, encoder)
+ val rowConverter: InternalRow => T = converter match {
+ case Left(enc) =>
+ val boundEnc = enc.resolveAndBind(
+ schema.toAttributes,
+ SparkSession.getActiveSession.get.sessionState.analyzer)
+ boundEnc.fromRow
+ case Right(func) =>
--- End diff --
Can we at least note that this path is Python-specific?
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org