Hi,

I need to generate some flag columns based on certain existing columns and add them back to the SchemaRDD for further operations. Do I have to use a case class (either via reflection or by specifying the schema programmatically)? I am working with parquet files, so the schema is derived automatically, which is a great feature, thanks to the Spark developers. However, if a subsequent createSchemaRDD on the transformed rows doesn't work, the feature seems hard to use for anything beyond the basics. I hope there is a way to do this and that I am simply missing something. Appreciate your help.
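To make the question concrete, here is roughly what I think the case-class (reflection) route would look like. This is only a sketch: the case class, its field names, and the column positions are made up for illustration, and it reuses the same sqlContext and the date helpers shown further down.

// Hypothetical case class; field names and column positions are placeholders,
// my real parquet schema is wider.
case class PurchaseWithYr(custId: String, purchaseDt: String, returnDt: String, yr: String)

import sqlContext.createSchemaRDD

// Map each parquet Row into the case class, deriving the YR flag along the way.
val yrIndvPurchase = indv_purchase.map { row =>
  val v_YR = scala.math.ceil(
    monthsBetween(row.getString(2), row.getString(5))(dateFormats.YYYYMMDD))
  PurchaseWithYr(row.getString(0), row.getString(2), row.getString(5), "YR" + v_YR.toString)
}
createSchemaRDD(yrIndvPurchase).registerTempTable("YRIndvPurchase")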
Here is what I am actually attempting:

import sqlContext.createSchemaRDD

val YR_indv_purchase = createSchemaRDD(
  indv_purchase.map { row =>
    val v_YR = scala.math.ceil(
      monthsBetween(row.getString(2), row.getString(5))(dateFormats.YYYYMMDD))
    val YR = "YR" + v_YR.toString()
    (row, YR)   // pair of (original Row, new flag)
  }
).registerTempTable("YRIndvPurchase")

----------------------------------------------------------------------
Just for completeness, here are the helper functions being used:

import com.github.nscala_time.time.Imports._
import org.joda.time.format.DateTimeFormat
import org.joda.time.format.DateTimeFormatter
import org.joda.time.Months

object dateFormats {
  val YYYYMMDD = DateTimeFormat.forPattern("YYYYMMDD")
}

def toDateTime(dtString: String)(implicit fmt: DateTimeFormatter): DateTime =
  fmt.parseDateTime(dtString)

def monthsBetween(FromDT: String, ToDT: String)(implicit fmt: DateTimeFormatter): Int =
  Months.monthsBetween(toDateTime(FromDT)(fmt), toDateTime(ToDT)(fmt)).getMonths
----------------------------------------------------------------------

This compiles fine but throws the following exception at runtime:

Exception in thread "main" scala.MatchError: org.apache.spark.sql.Row (of class scala.reflect.internal.Types$TypeRef$$anon$3)
        at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:72)
        at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:64)
        at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:62)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
        at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:62)
        at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:50)
        at org.apache.spark.sql.catalyst.ScalaReflection$.attributesFor(ScalaReflection.scala:44)
        at org.apache.spark.sql.execution.ExistingRdd$.fromProductRdd(basicOperators.scala:229)
        at org.apache.spark.sql.SQLContext.createSchemaRDD(SQLContext.scala:94)
        at croevss.StageJoin$.vsswf(StageJoin.scala:162)
        at croevss.StageJoin$.main(StageJoin.scala:41)
        at croevss.StageJoin.main(StageJoin.scala)

regards,
Sunita Koppar
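P.S. By "programmatically" I mean the applySchema / StructType route from the SQL programming guide. My rough understanding of how that would apply here is below; again, the column names are invented and only a few columns are shown, so this is a sketch rather than my actual code.

import org.apache.spark.sql._   // StructType, StructField, StringType, Row

// Hypothetical schema: just the columns I care about plus the derived YR flag.
val yrSchema = StructType(Seq(
  StructField("cust_id",     StringType, true),
  StructField("purchase_dt", StringType, true),
  StructField("return_dt",   StringType, true),
  StructField("YR",          StringType, true)))

// Build plain Rows carrying the selected columns plus the new flag.
val yrRowRDD = indv_purchase.map { row =>
  val v_YR = scala.math.ceil(
    monthsBetween(row.getString(2), row.getString(5))(dateFormats.YYYYMMDD))
  Row(row.getString(0), row.getString(2), row.getString(5), "YR" + v_YR.toString)
}

sqlContext.applySchema(yrRowRDD, yrSchema).registerTempTable("YRIndvPurchase")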