Github user jose-torres commented on a diff in the pull request:
https://github.com/apache/spark/pull/20445#discussion_r165522181
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala ---
@@ -89,7 +96,7 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
def addData(data: TraversableOnce[A]): Offset = {
val encoded = data.toVector.map(d => encoder.toRow(d).copy())
- val plan = new LocalRelation(schema.toAttributes, encoded, isStreaming = true)
+ val plan = new LocalRelation(attributes, encoded, isStreaming = false)
val ds = Dataset[A](sqlContext.sparkSession, plan)
logDebug(s"Adding ds: $ds")
--- End diff ---
Do we still need to store the batches as datasets, now that we're just
collect()ing them back out in createDataReaderFactories()?
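For illustration, a minimal sketch of what skipping the Dataset wrapper could look like, reusing the encoder.toRow(d).copy() pattern from the diff above. The class, field, and method names here are hypothetical, not the actual MemoryStream code:

import scala.collection.mutable.ListBuffer
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

class EncodedBatchBuffer[A](encoder: ExpressionEncoder[A]) {
  // Keep the already-encoded rows per batch, so the read path can hand them
  // straight to the reader factories without a collect() round trip.
  private val batches = new ListBuffer[Vector[InternalRow]]()

  def addData(data: TraversableOnce[A]): Int = synchronized {
    // copy() because toRow reuses a mutable row buffer internally
    val encoded = data.toVector.map(d => encoder.toRow(d).copy())
    batches += encoded
    batches.size - 1 // ordinal of the batch just added
  }

  def batchesBetween(startOrdinal: Int, endOrdinal: Int): Seq[Vector[InternalRow]] =
    synchronized { batches.slice(startOrdinal, endOrdinal + 1) }
}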