I need to take a DataFrame of events and explode it row-wise so that there is at least one row per time interval (usually a day) between events.
Here's a simplified version of the problem, which I have gotten to work in spark-shell:

case class Meal(food: String, calories: Double, date: Int)
case class AsOfDate(dt: Int)

val m1 = new Meal("steak", 33, 20170101)
val m2 = new Meal("peach", 25, 20170105)
val mDf = sc.parallelize(Seq(m1, m2)).toDF

scala> mDf.show
+-----+--------+--------+
| food|calories|    date|
+-----+--------+--------+
|steak|      33|20170101|
|peach|      25|20170105|
+-----+--------+--------+

Now, I want to explode the DataFrame so that there are no gaps in days:

mDf.where($"date" < 20170105).explode(mDf("date")) {
  case Row(date: Int) => (date to 20170104).map(AsOfDate(_))
}

scala> res0.show
+-----+--------+--------+
| food|calories|    date|
+-----+--------+--------+
|steak|      33|20170101|
|steak|      33|20170102|
|steak|      33|20170103|
|steak|      33|20170104|
+-----+--------+--------+

mDf.where($"date" >= 20170105).explode(mDf("date")) {
  case Row(date: Int) => (date to 20170105).map(AsOfDate(_))
}

scala> res1.show
+-----+--------+--------+
| food|calories|    date|
+-----+--------+--------+
|peach|      25|20170105|
+-----+--------+--------+

val exploded = res0.union(res1)

scala> exploded.show
+-----+--------+--------+
| food|calories|    date|
+-----+--------+--------+
|steak|      33|20170101|
|steak|      33|20170102|
|steak|      33|20170103|
|steak|      33|20170104|
|peach|      25|20170105|
+-----+--------+--------+

So that gives me what I want, but I'd like to be able to define the function that does the date iteration elsewhere and pass it in to the call to explode() (the shape I'm after is sketched at the end of this message). Part of the reason is that in the real DataFrame "date" is a java.sql.Timestamp, which involves more manipulation to add the right number of time intervals (a rough sketch of that is also at the end).

I've tried defining the function like this, but I cannot get it to work:

val fn1 = (x: Row) => {
  case Row(date: Int) => (date to 20170104).map(AsOfDate(_))
}

but I get this:

error: missing parameter type for expanded function
The argument types of an anonymous function must be fully known. (SLS 8.5)
Expected type was: ?

So I tried it this way instead:

val fn1 = (x: Row) => x match {
  case Row(date: Int) => (date to 20170104).map(AsOfDate(_))
}

which seemed to work, until I attempted to use it with explode():

mDf.where($"date" < 20170105).explode(mDf("date"))fn1

error: missing argument list for method explode in class Dataset

I don't see how I can pass fn1 to explode, given that it expects a Row as its only input. Is what I want to do possible, or do I need to write the Row-manipulation functions inline in the call to explode(), as I've done in the working examples above?

As a side question, I also wonder why I need the AsOfDate case class to do the explosion, since all I want to do is set the value of a column whose type I already know. Before using AsOfDate, I tried one of the working examples like this:

mDf.where($"date" < 20170105).explode(mDf("date")) {
  case Row(date: Int) => (date to 20170104).map(_.asInstanceOf[Int])
}

but that gave me this:

error: inferred type arguments [Int] do not conform to method explode's type parameter bounds [A <: Product]

I'm not sure what that means, or how using the case class resolves it.
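
To make the goal concrete, here is the shape I am hoping is possible (mentioned above): the iteration logic defined separately with an explicit function type, and then passed in explode()'s second argument list. This is only a guess, not something I have verified; the explicit Row => TraversableOnce[AsOfDate] annotation and the parentheses around the function are the parts I am unsure about.

import org.apache.spark.sql.Row

// Guess: give the pattern-matching literal an explicit function type so the
// compiler can expand it...
val fn2: Row => TraversableOnce[AsOfDate] = {
  case Row(date: Int) => (date to 20170104).map(AsOfDate(_))
}

// ...and pass it in the second (curried) argument list of explode().
val res2 = mDf.where($"date" < 20170105).explode(mDf("date"))(fn2)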
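
And for context on the Timestamp remark above, the per-row date iteration on the real data would look roughly like the following. This is an untested sketch; the helper name and the choice to step by whole days at local midnight are just illustrative.

import java.sql.Timestamp
import java.time.LocalDate

// Untested sketch: enumerate day-granularity Timestamps from start (inclusive)
// up to, but not including, endExclusive.
def daysBetween(start: Timestamp, endExclusive: Timestamp): Seq[Timestamp] = {
  val firstDay = start.toLocalDateTime.toLocalDate
  val lastDay  = endExclusive.toLocalDateTime.toLocalDate
  Iterator.iterate(firstDay)(_.plusDays(1))
    .takeWhile(_.isBefore(lastDay))
    .map(day => Timestamp.valueOf(day.atStartOfDay))
    .toSeq
}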