The types expected by applySchema are documented in the type reference section: http://spark.apache.org/docs/latest/sql-programming-guide.html#spark-sql-datatype-reference
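For example, with the 1.2-era API that looks roughly like this (an untested sketch; the column names and values are placeholders, and sc/sqlc are the contexts from your example below):

import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.types._

// applySchema wants an RDD[Row]; build the rows with Row.fromSeq
val rows = sc.parallelize(Seq(
  Row.fromSeq(1 :: "a" :: Nil),
  Row.fromSeq(2 :: "b" :: Nil)))
// every field uses one of the DataTypes from the reference above
val schema = StructType(
  StructField("id", IntegerType, false) ::
  StructField("name", StringType, true) :: Nil)
val schemaRdd = sqlc.applySchema(rows, schema)
schemaRdd.registerTempTable("example")
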
I'd certainly accept a PR to improve the docs and add a link to this from the applySchema section :) Can you explain why you are using mapPartitions and why UDFs don't work for you? (If the blocker is the setup code, there is a rough sketch of one possible UDF pattern below the quoted message.) SQL doesn't really have great support for partitions in general... We do have support for Hive TGFs, though, and we could possibly add better Scala syntax for this concept or something else.

On Mon, Jan 5, 2015 at 9:52 PM, Tobias Pfeiffer <[email protected]> wrote:
> Hi,
>
> I have a SchemaRDD where I want to add a column with a value that is
> computed from the rest of the row. As the computation involves a
> network operation and requires setup code, I can't use
>   "SELECT *, myUDF(*) FROM rdd",
> but I wanted to use a combination of:
>
>  - get the schema of the input SchemaRDD
>  - issue a mapPartitions call (including the setup code), obtaining a
>    new RDD[Row]
>  - extend the schema manually
>  - create a new RDD by combining the RDD[Row] with the extended
>    schema.
>
> This works very well, but I run into trouble querying that resulting
> SchemaRDD with SQL if:
>
>  - the result of my computation is a case class
>  - and I want to use values in this case class in the SQL query.
>
> In particular, while
>
>   SELECT column FROM resultrdd
>
> works well,
>
>   SELECT column.key_name FROM resultrdd
>
> gives a
>
>   java.lang.ClassCastException: example.MyCaseClass cannot be cast to
>   org.apache.spark.sql.catalyst.expressions.Row
>
> Here is an example to illustrate that:
>
> -----------------------------------
>
> import org.apache.spark._
> import org.apache.spark.sql._
> import org.apache.spark.sql.catalyst.types._
>
> val sc = new SparkContext("local[3]", "Test")
> val sqlc = new SQLContext(sc)
> import sqlc._
>
> // this is the case class that my operation is returning
> case class Result(string_values: Map[String, String],
>                   num_values: Map[String, Double])
>
> // dummy result data
> val data = (Result(Map("team" -> "a"), Map("score" -> 0.8)) ::
>             Result(Map("team" -> "b"), Map("score" -> 0.5)) :: Nil)
> val rdd = sc.parallelize(data)
>
> // simulate my computation by creating an RDD[Row] and creating
> // a schema programmatically
> val rowRdd = rdd.map(dr => Row.fromSeq(7 :: dr :: Nil))
> val progSchema = StructType(StructField("hello", IntegerType, false) ::
>                             StructField("newcol", rdd.schema, true) :: Nil)
> val progRdd = sqlc.applySchema(rowRdd, progSchema)
> progRdd.registerTempTable("progrdd")
>
> // the following call will *fail* with a ClassCastException
> sqlc.sql("SELECT newcol.string_values['team'] FROM progrdd").foreach(println)
>
> // however, the schema I specified is correct. see how embedding
> // my result in a proper case class works:
> case class ResultContainer(hello: Int, newcol: Result)
> val caseClassRdd = rdd.map(dr => ResultContainer(7, dr))
> caseClassRdd.registerTempTable("caseclassrdd")
>
> // the following call will *work*
> sqlc.sql("SELECT newcol.string_values['team'] FROM caseclassrdd").foreach(println)
>
> // even though the schema for both RDDs is the same:
> progRdd.schema == caseClassRdd.schema
>
> -----------------------------------
>
> It turns out that I cannot use the case class directly, but I have to
> convert it to a Row as well.
> That is, instead of
>
>   val rowRdd = rdd.map(dr => Row.fromSeq(7 :: dr :: Nil))
>
> I have to use
>
>   val rowRdd = rdd.map(dr =>
>     Row.fromSeq(7 :: Row.fromSeq(dr.productIterator.toSeq) :: Nil))
>
> and then, I can use
>
>   SELECT newcol.string_values['team'] FROM progrdd
>
> So now I found that out and I'm happy that it works, but it was quite
> hard to track it down, so I was wondering if this is the most
> intuitive way to add a column to a SchemaRDD using mapPartitions (as
> opposed to using a UDF, where the conversion "case class -> Row"
> seems to happen automatically).
>
> Or, even if there is no more intuitive way, I just wanted to have this
> documented ;-)
>
> Thanks
> Tobias
>
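
On the UDF question above: if the only reason for mapPartitions is the setup code, one pattern that might work is to keep the client in an object behind a lazy val, so it is initialized once per executor JVM the first time the UDF runs there. A rough, untested sketch follows; ExpensiveClient and its lookup method are made-up placeholders for the network resource, and I believe registerFunction is the right call on SQLContext in 1.2:

// hypothetical wrapper around the network client that needs setup
object Enrichment {
  // initialized lazily, once per executor JVM, on first use
  lazy val client = new ExpensiveClient()
  def lookupTeam(key: String): String = client.lookup(key)
}

// make the function callable from SQL
sqlc.registerFunction("lookupTeam", (key: String) => Enrichment.lookupTeam(key))

// assumes the input SchemaRDD has been registered as a table named "rdd"
sqlc.sql("SELECT *, lookupTeam(team) FROM rdd")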
