Repository: spark Updated Branches: refs/heads/master 1d04dc95c -> f6fcb4874
[SPARK-11477] [SQL] support create Dataset from RDD Author: Wenchen Fan <wenc...@databricks.com> Closes #9434 from cloud-fan/rdd2ds and squashes the following commits: 0892d72 [Wenchen Fan] support create Dataset from RDD Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f6fcb487 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f6fcb487 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f6fcb487 Branch: refs/heads/master Commit: f6fcb4874ce20a1daa91b7434cf9c0254a89e979 Parents: 1d04dc9 Author: Wenchen Fan <wenc...@databricks.com> Authored: Wed Nov 4 00:15:50 2015 +0100 Committer: Michael Armbrust <mich...@databricks.com> Committed: Wed Nov 4 00:15:50 2015 +0100 ---------------------------------------------------------------------- .../src/main/scala/org/apache/spark/sql/SQLContext.scala | 9 +++++++++ .../src/main/scala/org/apache/spark/sql/SQLImplicits.scala | 4 ++++ .../src/test/scala/org/apache/spark/sql/DatasetSuite.scala | 7 +++++++ 3 files changed, 20 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/f6fcb487/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 2cb9443..5ad3871 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -499,6 +499,15 @@ class SQLContext private[sql]( new Dataset[T](this, plan) } + def createDataset[T : Encoder](data: RDD[T]): Dataset[T] = { + val enc = encoderFor[T] + val attributes = enc.schema.toAttributes + val encoded = data.map(d => enc.toRow(d)) + val plan = LogicalRDD(attributes, encoded)(self) + + new Dataset[T](this, plan) + } + /** * Creates a DataFrame from an RDD[Row]. User can specify whether the input rows should be * converted to Catalyst rows. http://git-wip-us.apache.org/repos/asf/spark/blob/f6fcb487/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala index f460a86..f2904e2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala @@ -48,6 +48,10 @@ abstract class SQLImplicits { implicit def newBooleanEncoder: Encoder[Boolean] = ExpressionEncoder[Boolean](flat = true) implicit def newStringEncoder: Encoder[String] = ExpressionEncoder[String](flat = true) + implicit def rddToDatasetHolder[T : Encoder](rdd: RDD[T]): DatasetHolder[T] = { + DatasetHolder(_sqlContext.createDataset(rdd)) + } + implicit def localSeqToDatasetHolder[T : Encoder](s: Seq[T]): DatasetHolder[T] = { DatasetHolder(_sqlContext.createDataset(s)) } http://git-wip-us.apache.org/repos/asf/spark/blob/f6fcb487/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index 5973fa7..3e9b621 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -34,6 +34,13 @@ class DatasetSuite extends QueryTest with SharedSQLContext { data: _*) } + test("toDS with RDD") { + val ds = sparkContext.makeRDD(Seq("a", "b", "c"), 3).toDS() + checkAnswer( + ds.mapPartitions(_ => Iterator(1)), + 1, 1, 1) + } + test("as tuple") { val data = Seq(("a", 1), ("b", 2)).toDF("a", "b") checkAnswer( --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org