Github user FurcyPin commented on the pull request:
https://github.com/apache/spark/pull/5713#issuecomment-153306275
Hi @rayortigas,
I have been working on a similar feature.
So far I have something that works similarly, but it is a little less easy
to use. Here is a sample demo:
```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.types._
// PermutationPlan and RowToCaseClassTransformer come from my own
// (not yet open-sourced) code.

case class DemoCC(int: Int, boolean: Boolean, string: Option[String])
  extends Serializable

object Demo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext()
    val sqlContext = new SQLContext(sc)

    val inputData: Seq[Row] = Seq(
      Row(true, 1, "A", 2.0),
      Row(true, 2, null, 4.0),
      Row(false, 3, "C", 9.0)
    )
    val schema = StructType(Seq(
      StructField("boolean", BooleanType),
      StructField("int", IntegerType),
      StructField("string", StringType),
      StructField("double", DoubleType)
    ))
    val rdd: RDD[Row] = sc.parallelize(inputData, 3)
    val df: DataFrame = sqlContext.createDataFrame(rdd, schema)

    /* The permutationPlan can be serialized, so we generate it once and
       for all on the driver. This also performs a preliminary check. */
    val permutationPlan = PermutationPlan[DemoCC](df)

    /* The transformer cannot be serialized because TypeTag is not (really)
       serializable in Scala 2.10 */
    @transient lazy val transformer =
      new RowToCaseClassTransformer[DemoCC](permutationPlan)

    /* Using "df.map(transformer)" instead would not work... */
    val res = df.map { r => transformer(r) }
    res.collect.foreach(println)
  }
}
```
I tried implementing `dataframe.toCaseClassRDD[CaseClass]` with an implicit
conversion, but I bumped into a serialization exception because TypeTags are
not (really) serializable in Scala 2.10
(https://issues.scala-lang.org/browse/SI-5919), which is why the transformer
in the example has to be transient.
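For readers unfamiliar with the `@transient lazy val` workaround, here is a minimal sketch of the idea using plain Java serialization instead of Spark. `NotSerializableHelper` and `LazyTransformer` are hypothetical stand-ins (the helper plays the role of the TypeTag-holding transformer): only the serializable `plan` is shipped, and the helper is rebuilt lazily after deserialization, which is roughly what happens on each executor.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for something that cannot be serialized
// (e.g. a transformer holding a TypeTag in Scala 2.10). Note: no Serializable.
class NotSerializableHelper(val plan: Array[Int]) {
  // Picks the values of `values` at the indices listed in `plan`.
  def apply(values: Seq[Any]): Seq[Any] = plan.toSeq.map(values)
}

// Only `plan` is written out. `helper` is @transient, so it is skipped by
// serialization, and being lazy it is rebuilt on first access afterwards.
class LazyTransformer(val plan: Array[Int]) extends Serializable {
  @transient lazy val helper: NotSerializableHelper = new NotSerializableHelper(plan)
  def apply(values: Seq[Any]): Seq[Any] = helper(values)
}

object TransientLazyDemo {
  // Round-trips an object through Java serialization, as Spark does with closures.
  def roundTrip[T](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val t = new LazyTransformer(Array(1, 0))
    val copy = roundTrip(t)      // succeeds: the non-serializable helper is not written
    println(copy(Seq("a", "b"))) // the helper is built here, on the deserialized copy
  }
}
```

Dropping `@transient` and forcing `helper` before serialization would make `writeObject` fail with a `NotSerializableException`, which is the kind of error described above.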
On the other hand, I tried to be as generic as possible: my implementation
supports nested case classes and maps the DataFrame's column names to the
case class's field names. Thus the fields do not need to be defined in the
same order as the columns, and the case class may have fewer fields than the
DataFrame (as shown in the example).
It also throws an error at initialization if there is a type or name
incompatibility.
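The name-based mapping described above can be sketched as a small, Spark-free function. This is an illustrative assumption, not the actual implementation: `FieldSpec`, `buildPlan` and `applyPlan` are hypothetical names, types are compared as plain strings, nested case classes and `Option` handling are left out, and the case class's field names and types are assumed to have been extracted beforehand (e.g. via reflection, not shown).

```scala
// Hypothetical description of a column or a case class field.
final case class FieldSpec(name: String, dataType: String)

object PlanSketch {
  /** For each target field (in declaration order), returns the index of the
    * source column with the same name. Throws IllegalArgumentException
    * up front if a column is missing or has an incompatible type. */
  def buildPlan(columns: Seq[FieldSpec], targetFields: Seq[FieldSpec]): Array[Int] = {
    val byName: Map[String, (FieldSpec, Int)] =
      columns.zipWithIndex.map { case (c, i) => c.name -> (c, i) }.toMap
    targetFields.map { f =>
      byName.get(f.name) match {
        case None =>
          throw new IllegalArgumentException(s"No column named '${f.name}'")
        case Some((c, _)) if c.dataType != f.dataType =>
          throw new IllegalArgumentException(
            s"Column '${f.name}' has type ${c.dataType}, expected ${f.dataType}")
        case Some((_, i)) => i
      }
    }.toArray
  }

  /** Applies a plan to one row's values: reorders them and drops unused columns. */
  def applyPlan(plan: Array[Int], row: Seq[Any]): Seq[Any] = plan.toSeq.map(row)
}
```

With the demo schema (`boolean`, `int`, `string`, `double`) and target fields `int`, `boolean`, `string`, `buildPlan` returns the indices 1, 0, 2, and the `double` column is simply never read. Since the plan is just an array of integers, it is cheap to compute once on the driver and ship to the executors.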
I've just found this pull request and would love to contribute, but I am not
sure how to proceed. I wanted to open-source my code earlier, but I hoped to
find a way to reduce the boilerplate first.
I would love to discuss this further with you, though I am not sure this is
the most suitable place to do so.
Regards,
Furcy
PS: thanks for the `ScalaReflectionLock.synchronized`; I bumped into the same
issue and was looking for a solution ;-)