Hello,

I have a few spark jobs that are doing the same aggregations. I want to
factorize the aggregation logic. For that I want to use a Trait.
When I run this job extending my Trait (over yarn, in client mode), I get
a NotSerializableException (in attachment).
If I change my Trait to an Object, the job runs fine and I don't have
a NotSerializableException.

Could you explain me why ? I don't understand this behavior

Thnaks
Jean

------------------

object SparkJob extends App {

  val conf = new SparkConf()
  val sparkSession: SparkSession = SparkSession.builder()
    .appName("aggregateAdTechImpressions")
    .config(conf)
    .enableHiveSupport()
    .getOrCreate()

    ...

  val impressionsAdtechDF =
MyUtil.prepareAggregationDataFrame(impressionsAdtechRawDF,
"timestamp")

  val impressionsAggregationDF: DataFrame =
MyUtil.aggregateImpressions(impressionsAdtechDF)

    ...

}

object MyUtil {

  private def parseTs(ts: Int): Int = {
    val tsMilli: Long = ts.toLong * 1000L
    val date: Date = new Date(tsMilli)
    val dateFormat = new SimpleDateFormat("yyyyMMdd")
    val dateStr = dateFormat.format(date)
    if (dateStr == null) 19000101 else dateStr.toInt
  }
  private def udfParseTs: UserDefinedFunction = udf(parseTs _)

  def prepareAggregationDataFrame(rawDF: DataFrame,
timestampColumnName: String): DataFrame = {

    rawDF
      .withColumn("original_placement_id", col("placementid"))
      .withColumn("date", udfParseTs(col(timestampColumnName)))
      .withColumn("placement_id", col("placementid") cast StringType)
      .withColumnRenamed("campaignid", "campaign_id")
      .withColumnRenamed("placementSizeTypeId", "size_id")
      .drop("placementid")
      .drop(timestampColumnName)
  }

  def aggregateImpressions(inputDF: DataFrame): DataFrame = {

    inputDF.groupBy(
      col("date"),
      col("campaign_id"),
      col("original_placement_id"),
      col("placement_id"),
      col("size_id"))
      .agg(count(lit(1)).alias("cnt"))
      .withColumn("type", lit(1))
      .withColumn("revenue_chf", lit(0) cast DoubleType)
      .withColumn("revenue_eur", lit(0) cast DoubleType)
      .withColumn("source", lit(0)) // 0 for AdTech
  }
}

----

object SparkJob2 extends App with MyTrait {

  val conf = new SparkConf()
  val sparkSession: SparkSession = SparkSession.builder()
    .appName("aggregateAdTechImpressions")
    .config(conf)
    .enableHiveSupport()
    .getOrCreate()

    ...

  val impressionsAdtechDF =
prepareAggregationDataFrame(impressionsAdtechRawDF, "timestamp")

  val impressionsAggregationDF: DataFrame =
aggregateImpressions(impressionsAdtechDF)

    ...

}

trait MyTrait {

  private def parseTs(ts: Int): Int = {
    val tsMilli: Long = ts.toLong * 1000L
    val date: Date = new Date(tsMilli)
    val dateFormat = new SimpleDateFormat("yyyyMMdd")
    val dateStr = dateFormat.format(date)
    if (dateStr == null) 19000101 else dateStr.toInt
  }
  private def udfParseTs: UserDefinedFunction = udf(parseTs _)

  def prepareAggregationDataFrame(rawDF: DataFrame,
timestampColumnName: String): DataFrame = {

    rawDF
      .withColumn("original_placement_id", col("placementid"))
      .withColumn("date", udfParseTs(col(timestampColumnName)))
      .withColumn("placement_id", col("placementid") cast StringType)
      .withColumnRenamed("campaignid", "campaign_id")
      .withColumnRenamed("placementSizeTypeId", "size_id")
      .drop("placementid")
      .drop(timestampColumnName)
  }

  def aggregateImpressions(inputDF: DataFrame): DataFrame = {

    inputDF.groupBy(
      col("date"),
      col("campaign_id"),
      col("original_placement_id"),
      col("placement_id"),
      col("size_id"))
      .agg(count(lit(1)).alias("cnt"))
      .withColumn("type", lit(1))
      .withColumn("revenue_chf", lit(0) cast DoubleType)
      .withColumn("revenue_eur", lit(0) cast DoubleType)
      .withColumn("source", lit(0)) // 0 for AdTech
  }
}

Attachment: spark-NotSerializableException.log
Description: Binary data

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org

Reply via email to