Hello. I have come across some odd behavior when writing to persistent Hive tables in Spark using dynamic partitioning.
Basically, I create a table, use Spark Streaming to build counts of events by ID and source, register a temporary table for each RDD, and then select from the temporary table into the actual table.

(1) Create the model:

case class HiveEventIdCount(
  eventGenerationTime: Timestamp,
  eventIdAndSource: String,
  count: Int,
  eventMonthBin: Long,
  eventHourBin: Long)

object HiveEventIdCount {
  def apply(eventIdCount: EventIdCount): HiveEventIdCount = {
    HiveEventIdCount(
      eventIdCount.eventGenerationTime,
      eventIdCount.eventIdAndSource,
      eventIdCount.count,
      eventIdCount.eventMonthBin.getMillis,
      eventIdCount.eventHourBin.getMillis)
  }
}

(2) Create the counts:

val eventCounts = eventAndDomainCounts
  .map(x => Tuple2(x.eventIdAndSource, x.count))
  .reduceByKey(_ + _)
  .map(x => EventIdCount(
    eventGenerationTime = DateTime.now(DateTimeZone.UTC),
    eventIdAndSource = x._1,
    count = x._2))

(3) Create the table:

val hc = new HiveContext(sc)
import hc.implicits._
hc.sql("use default")

val eventIdTableCreateStatement =
  ("""CREATE TABLE IF NOT EXISTS %s
     |(
     |eventGenerationTime timestamp,
     |count bigint
     |) PARTITIONED BY (eventMonthBin bigint, eventHourBin bigint, %s string)
   """ format(eventIdCountTable, eventIdPartitionField)).stripMargin

hc.sql(eventIdTableCreateStatement)
hc.sql("set hive.exec.dynamic.partition.mode=nonstrict")

(4) Insert data from each RDD:

eventCounts.map(x => HiveEventIdCount(x)).foreachRDD(rdd => {
  val tempTableName = "rawevents" + Random.nextInt().abs.toString
  val eventCountsDF = rdd.toDF()
  eventCountsDF.registerTempTable(tempTableName)

  val eventMonthBinPartition = rdd.map(x => x.eventMonthBin).max()
  val eventHourBinPartition = rdd.map(x => x.eventHourBin).max()

  val insertStatement =
    "insert into table %s partition(eventMonthBin = %s, eventHourBin = %s, eventIdAndSource) select %s from %s"
      .format(
        eventIdCountTable,
        eventMonthBinPartition,
        eventHourBinPartition,
        eventCountsDF.columns
          .filter(x => x != "eventMonthBin" && x != "eventHourBin" && x != "eventIdAndSource")
          .mkString(","),
        tempTableName)

  hc.sql(insertStatement)
})

This works and data is written. However, the input data has event IDs like '710:Source', and when I look at the data created in HDFS I see the following:

/user/hive/warehouse/eventidcounts2/eventmonthbin=1446336000000/eventhourbin=1446757200000/eventidandsource=3835

The manually calculated partition fields are correct. However, the dynamically calculated (string) partition for eventIdAndSource is populated from a seemingly random field within my case class. I've duplicated this with several other classes and seen the same result (I use this example because it's very simple). Any idea if this is a known bug? Is there a workaround?

Regards,
Bryan Jeffrey
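For concreteness, here is a small standalone sketch (outside the streaming job, using a hypothetical temp table name and the partition values from the HDFS path above) that renders the insert statement the foreachRDD body builds. Note that the column filter drops eventIdAndSource from the select list entirely:

```scala
// Standalone sketch of the statement built in foreachRDD. The table name,
// temp table name, and bin values are hypothetical stand-ins, not the job itself.
object RenderInsert {
  def render: String = {
    val eventIdCountTable = "eventidcounts2"   // hypothetical table name
    val tempTableName = "rawevents12345"       // hypothetical temp table name
    val eventMonthBinPartition = 1446336000000L
    val eventHourBinPartition = 1446757200000L
    // DataFrame columns appear in case-class declaration order
    val columns = Array("eventGenerationTime", "eventIdAndSource", "count",
                        "eventMonthBin", "eventHourBin")
    val selectList = columns
      .filter(x => x != "eventMonthBin" && x != "eventHourBin" && x != "eventIdAndSource")
      .mkString(",")
    "insert into table %s partition(eventMonthBin = %s, eventHourBin = %s, eventIdAndSource) select %s from %s"
      .format(eventIdCountTable, eventMonthBinPartition, eventHourBinPartition,
              selectList, tempTableName)
  }

  def main(args: Array[String]): Unit = println(render)
}
```

The rendered select list ends with count rather than eventIdAndSource. If Hive fills dynamic partition columns positionally from the tail of the select list rather than by name, that could be consistent with the eventidandsource=3835 paths I'm seeing, but I'd appreciate confirmation.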