Hello.

I have come across some odd behavior when writing to persistent Hive tables
in Spark using dynamic partitioning.

Basically, I create a table. Using Spark Streaming, I compute counts of events
by ID and source. For each RDD I register a temporary table, then select
from the temporary table into the actual table.

(1) Create model:
import java.sql.Timestamp

case class HiveEventIdCount(eventGenerationTime: Timestamp,
  eventIdAndSource: String, count: Int, eventMonthBin: Long,
  eventHourBin: Long)

object HiveEventIdCount {
  def apply(eventIdCount : EventIdCount): HiveEventIdCount = {
    HiveEventIdCount(
      eventIdCount.eventGenerationTime,
      eventIdCount.eventIdAndSource,
      eventIdCount.count,
      eventIdCount.eventMonthBin.getMillis,
      eventIdCount.eventHourBin.getMillis
    )
  }
}

(2) Create counts:

val eventCounts = eventAndDomainCounts
      .map(x => Tuple2(x.eventIdAndSource, x.count))
      .reduceByKey(_ + _)
      .map(x =>
      EventIdCount(
        eventGenerationTime = DateTime.now(DateTimeZone.UTC),
        eventIdAndSource = x._1,
        count = x._2))

(3) Create the table:

val hc = new HiveContext(sc)
import hc.implicits._

hc.sql("use default")
val eventIdTableCreateStatement = ("""CREATE TABLE IF NOT EXISTS %s
  |(
  |eventGenerationTime timestamp,
  |count bigint
  |) PARTITIONED BY (eventMonthBin bigint, eventHourBin bigint, %s string)
   """ format(eventIdCountTable, eventIdPartitionField)).stripMargin
hc.sql(eventIdTableCreateStatement)
hc.sql("set hive.exec.dynamic.partition.mode=nonstrict")

(4) Insert data from each RDD:
eventCounts.map(x => HiveEventIdCount(x)).foreachRDD(rdd => {
  // nextInt(Int.MaxValue) is never negative; nextInt().abs can be (Int.MinValue.abs < 0)
  val tempTableName = "rawevents" + Random.nextInt(Int.MaxValue).toString
  val eventCountsDF = rdd.toDF()
  eventCountsDF.registerTempTable(tempTableName)
  val eventMonthBinPartition = rdd.map(x => x.eventMonthBin).max()
  val eventHourBinPartition = rdd.map(x => x.eventHourBin).max()
  val insertStatement = ("insert into table %s partition(eventMonthBin = %s, " +
    "eventHourBin = %s, eventIdAndSource) select %s from %s")
    .format(eventIdCountTable, eventMonthBinPartition, eventHourBinPartition,
      eventCountsDF.columns.filter(x => x != "eventMonthBin" &&
        x != "eventHourBin" && x != "eventIdAndSource").mkString(","),
      tempTableName)
  hc.sql(insertStatement)
})
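
To make it easier to see what step (4) sends to Hive, here is a minimal sketch
that reproduces only the string formatting as a pure function, with no Spark
involved. The object and function names and the temporary table name are made
up for illustration; the table name is taken from the HDFS path shown further
down.

```scala
object InsertStatementSketch {
  // Hypothetical helper: mirrors the string formatting in step (4), nothing more.
  def buildInsert(table: String, monthBin: Long, hourBin: Long,
                  columns: Seq[String], tempTable: String): String = {
    // Same filter as in step (4): all three partition columns are dropped
    // from the select list.
    val selected = columns.filter(x => x != "eventMonthBin" &&
      x != "eventHourBin" && x != "eventIdAndSource").mkString(",")
    ("insert into table %s partition(eventMonthBin = %s, " +
      "eventHourBin = %s, eventIdAndSource) select %s from %s")
      .format(table, monthBin, hourBin, selected, tempTable)
  }
}
```

With the columns in case class order (eventGenerationTime, eventIdAndSource,
count, eventMonthBin, eventHourBin), the select list comes out as
eventGenerationTime,count.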

This works. However, the input data has event IDs like '710:Source'. When
I look at the data created in HDFS, I see the following:

/user/hive/warehouse/eventidcounts2/eventmonthbin=1446336000000/eventhourbin=1446757200000/eventidandsource=3835

Obviously the manually calculated partition fields are correct. However, the
dynamically calculated (string) partition for eventIdAndSource holds what looks
like a random field from my case class rather than the ID and source. I've
reproduced this with several other classes and seen the same result (I use
this example because it's very simple).
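
For what it's worth, the Hive documentation says that dynamic partition
columns are taken by position: they must come last in the select list, in the
same order as in the partition clause. The select list built in step (4)
filters eventIdAndSource out, so that may be related. Below is an untested
sketch of a variant that appends the dynamic partition column at the end. The
object and function names are hypothetical, and I have not confirmed that this
resolves the behavior.

```scala
object DynamicPartitionSketch {
  // Hypothetical helper: builds the insert with the dynamic partition column
  // placed last in the select list. Assumption: Hive's positional rule for
  // dynamic partitions applies to this insert path as well.
  def buildInsert(table: String, monthBin: Long, hourBin: Long,
                  dataColumns: Seq[String], dynamicPartitionColumn: String,
                  tempTable: String): String = {
    // Data columns first, dynamic partition column at the very end.
    val selectList = (dataColumns :+ dynamicPartitionColumn).mkString(", ")
    s"insert into table $table " +
      s"partition(eventMonthBin = $monthBin, eventHourBin = $hourBin, " +
      s"$dynamicPartitionColumn) select $selectList from $tempTable"
  }
}
```

The two static partition values stay in the partition clause as before; only
the select list changes.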

Any idea if this is a known bug? Is there a workaround?

Regards,

Bryan Jeffrey
