[
https://issues.apache.org/jira/browse/SPARK-16032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15347172#comment-15347172
]
Ryan Blue edited comment on SPARK-16032 at 6/23/16 10:59 PM:
-
bq. I am not sure apply by-name resolution just to partition columns is a good idea.
I'm not sure about this. After looking into it more, I agree in principle that in the long term we don't want to mix by-position column matching with by-name partitioning. But I'm less certain it's a good idea right now. The more I look at it, the more I agree with you about what is "right". But I'm still concerned about how to move forward from where we are, given the way people are currently using the API.
I think we've already established that it isn't clear that the {{DataFrameWriter}} API relies on position. I suspect most people aren't thinking about the choice between by-position and by-name resolution at all; they use whatever they can get working. My first use of the API was to build a partitioned table from an unpartitioned table, and the write failed. When I went looking for a solution,
{{partitionBy}} was the obvious choice (suggested by my IDE) and, sure enough,
it fixed the problem by [moving the partition columns by
name|https://github.com/apache/spark/blob/v1.6.1/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala#L180]
to the end. This solution is common because it works, and because it is more obvious than reasoning about column order; as I noted above, it isn't clear that {{insertInto}} matches columns by position.
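As a rough sketch of what that reordering does (plain Scala, not Spark's actual implementation; the object and method names here are mine), {{partitionBy}} effectively pulls the named partition columns out of the column list and appends them after the data columns:

```scala
// Hypothetical model of the by-name normalization: partition columns are
// removed from wherever they appear and appended at the end, in the order
// given to partitionBy. Data columns keep their relative order.
object PartitionReorder {
  def normalize(columns: Seq[String], partitionCols: Seq[String]): Seq[String] = {
    val dataCols = columns.filterNot(partitionCols.contains)
    dataCols ++ partitionCols
  }
}
```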
The pattern of using {{partitionBy}} with {{insertInto}} has also become a best
practice for maintaining ETL jobs in Spark. Consider this table setup, where
data lands in {{src}} in batches and we move it to {{dest}} for long-term
storage in Parquet. Here's some example DDL:
{code:lang=sql}
CREATE TABLE src (id string, timestamp bigint, other_properties map<string,string>);
CREATE TABLE dest (id string, timestamp bigint, c1 string, c2 int)
PARTITIONED BY (date int, hour int);
{code}
The Spark code for this ETL job should be this:
{code:lang=scala}
import org.apache.spark.sql.functions.hour
import org.apache.spark.sql.types.IntegerType
import spark.implicits._

spark.table("src")
  .withColumn("c1", $"other_properties".getItem("c1"))
  .withColumn("c2", $"other_properties".getItem("c2").cast(IntegerType))
  .withColumn("date", dateint($"timestamp"))  // dateint is a date-formatting helper UDF
  .withColumn("hour", hour($"timestamp"))
  .drop("other_properties")
  .write.insertInto("dest")
{code}
But users are likely to try this next version instead, because it isn't obvious that partition columns go after data columns: they are declared in a separate list in the DDL.
{code:lang=scala}
spark.table("src")
  .withColumn("date", dateint($"timestamp"))
  .withColumn("hour", hour($"timestamp"))
  .withColumn("c1", $"other_properties".getItem("c1"))
  .withColumn("c2", $"other_properties".getItem("c2").cast(IntegerType))
  .drop("other_properties")
  .write.insertInto("dest")
{code}
And again, the most obvious fix is to add {{partitionBy}} to specify the
partition columns, which appears to users as a match for Hive's
{{PARTITION("date", "hour")}} syntax. Users then get the impression that
{{partitionBy}} is equivalent to {{PARTITION}}, though in reality Hive operates
by position.
{code:lang=scala}
spark.table("src")
  .withColumn("date", dateint($"timestamp"))
  .withColumn("hour", hour($"timestamp"))
  .withColumn("c1", $"other_properties".getItem("c1"))
  .withColumn("c2", $"other_properties".getItem("c2").cast(IntegerType))
  .drop("other_properties")
  .write.partitionBy("date", "hour").insertInto("dest")
{code}
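To make the two resolution strategies concrete, here is a small, self-contained sketch (plain Scala, not Spark or Hive code; the function names are mine) of the same reordered input resolved both ways:

```scala
// By position (what Hive-style INSERT does): the i-th input value lands
// in the i-th target column, regardless of names.
def resolveByPosition(input: Seq[(String, Any)], target: Seq[String]): Map[String, Any] =
  target.zip(input.map(_._2)).toMap

// By name (what users expect partitionBy to mean): each value is matched
// to the target column with the same name.
def resolveByName(input: Seq[(String, Any)], target: Seq[String]): Map[String, Any] = {
  val byName = input.toMap
  target.map(c => c -> byName(c)).toMap
}
```

With input columns {{(date, id)}} and target {{(id, date)}}, by-position silently swaps the two values while by-name gets them right.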
Another reason to use {{partitionBy}} is maintaining ETL jobs over time. When the structure of the job changes, so does the column order. Say I want to add a dedup step so that I get just one row per ID per day. My first attempt, building on a query whose column order was correct to begin with, looks like this:
{code:lang=scala}
// column order changes, breaking the positional insert
spark.table("src")
  .withColumn("date", dateint($"timestamp"))  // moved to before dropDuplicates
  .dropDuplicates(Seq("date", "id"))          // added to dedup records
  .withColumn("c1", $"other_properties".getItem("c1"))
  .withColumn("c2", $"other_properties".getItem("c2").cast(IntegerType))
  .withColumn("hour", hour($"timestamp"))
  .drop("other_properties")
  .write.insertInto("dest")
{code}
The result is that I get crazy partitioning, because the values of {{c2}} and {{hour}} end up in the partition columns. The most obvious symptom of the wrong column order is bad partitioning, and when I look into it, I find that {{partitionBy}} fixes it. In many cases that's the first method I'll try, because what I see is bad partition values. This doesn't always fix the whole query, but it does solve the partitioning problem I observed. (Also: other ordering problems are hidden because Spark inserts {{Cast}} instead of the safer {{UpCast}}.)
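A sketch of why the permissive cast hides the problem (plain Scala, not Spark's analyzer; the function names are mine): a {{Cast}}-style conversion turns a mismatched value into null so the job "succeeds", while an {{UpCast}}-style check would only allow safe widenings and fail fast:

```scala
// Permissive, Cast-style: a string that isn't a valid int silently
// becomes None (Spark would write null), so the bad column order
// produces bad data instead of an error.
def permissiveToInt(v: Any): Option[Int] = v match {
  case i: Int    => Some(i)
  case s: String => scala.util.Try(s.toInt).toOption
  case _         => None
}

// Strict, UpCast-style: only values already safe to widen are accepted;
// anything else is rejected up front, surfacing the ordering bug.
def strictToInt(v: Any): Either[String, Int] = v match {
  case i: Int => Right(i)
  case other  => Left(s"cannot safely up-cast ${other.getClass.getSimpleName} to int")
}
```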
Users will also _choose_ this over the right solution,