[jira] [Comment Edited] (SPARK-16032) Audit semantics of various insertion operations related to partitioned tables

2016-06-23 Thread Ryan Blue (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15347172#comment-15347172
 ] 

Ryan Blue edited comment on SPARK-16032 at 6/23/16 10:59 PM:
-

bq. I am not sure applying by-name resolution just to partition columns is a good idea.

I'm not sure about this. After looking into it more, I agree in principle that, in the 
long term, we don't want to mix by-position column matching with by-name partitioning. 
But I'm less certain that it's a good idea right now. The more I look at it, the more I 
agree with you about what is "right"; I'm still concerned, though, about how to move 
forward from where we are, given the way people are currently using the API.

I think we've already established that it isn't clear that the DataFrameWriter 
API relies on position. I suspect most people aren't consciously choosing between 
by-position and by-name resolution; they use whatever they can get working. My 
first use of the API was to build a partitioned table from an unpartitioned 
table, and it failed. When I went looking for a solution, {{partitionBy}} was the 
obvious choice (suggested by my IDE) and, sure enough, it fixed the problem by 
[moving the partition columns by name|https://github.com/apache/spark/blob/v1.6.1/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala#L180] 
to the end. This solution is common because it works and because it is more 
obvious than reasoning about column order; as I noted above, it isn't clear that 
{{insertInto}} uses position.
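
Roughly, that by-name reordering amounts to the following (an illustrative sketch, 
not the actual {{DataFrameWriter}} code; {{movePartitionColsToEnd}} is just a 
made-up name for it):

{code:lang=java}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Illustration only: the columns named in partitionBy are moved, by name, to the
// end of the row, and the positional insert then lines them up with the table's
// partition columns.
def movePartitionColsToEnd(df: DataFrame, partitionCols: Seq[String]): DataFrame = {
  val dataCols = df.columns.filterNot(partitionCols.contains)
  df.select((dataCols ++ partitionCols).map(col): _*)
}
{code}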

The pattern of using {{partitionBy}} with {{insertInto}} has also become a best 
practice for maintaining ETL jobs in Spark. Consider this table setup, where 
data lands in {{src}} in batches and we move it to {{dest}} for long-term 
storage in Parquet. Here's some example DDL:

{code:lang=sql}
CREATE TABLE src (id string, timestamp bigint, other_properties map<string, string>);
CREATE TABLE dest (id string, timestamp bigint, c1 string, c2 int)
  PARTITIONED BY (utc_dateint int, utc_hour int);
{code}

The Spark code for this ETL job should be this:

{code:lang=java}
spark.table("src")
  .withColumn("c1", $"other_properties".getItem("c1"))
  .withColumn("c2", $"other_properties".getItem("c2").cast(IntegerType))
  .withColumn("date", dateint($"timetamp"))
  .withColumn("hour", hour($"timestamp"))
  .drop("other_properties")
  .write.insertInto("dest")
{code}

But users are likely to try this next version instead, because it isn't obvious 
that the partition columns go after the data columns; they are declared as two 
separate lists in the DDL.

{code:lang=java}
spark.table("src")
  .withColumn("date", dateint($"timetamp"))
  .withColumn("hour", hour($"timestamp"))
  .withColumn("c1", $"other_properties".getItem("c1"))
  .withColumn("c2", $"other_properties".getItem("c2").cast(IntegerType))
  .drop("other_properties")
  .write.insertInto("dest")
{code}
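
To make the mismatch concrete: matched by position, the frame above lines up with 
{{dest}} as id -> id, timestamp -> timestamp, date -> c1, hour -> c2, 
c1 -> utc_dateint, and c2 -> utc_hour. An explicit {{select}} in the table's 
column order would also fix it (a sketch; {{misordered}} is just a name for the 
DataFrame built above):

{code:lang=java}
// Sketch only: "misordered" stands for the DataFrame produced by the version above.
misordered
  .select($"id", $"timestamp", $"c1", $"c2", $"date", $"hour")
  .write.insertInto("dest")
{code}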

And again, the most obvious fix is to add {{partitionBy}} to specify the 
partition columns, which appears to users as a match for Hive's 
{{PARTITION("date", "hour")}} syntax. Users then get the impression that 
{{partitionBy}} is equivalent to {{PARTITION}}, though in reality Hive operates 
by position.

{code:lang=java}
spark.table("src")
  .withColumn("date", dateint($"timetamp"))
  .withColumn("hour", hour($"timestamp"))
  .withColumn("c1", $"other_properties".getItem("c1"))
  .withColumn("c2", $"other_properties".getItem("c2").cast(IntegerType))
  .drop("other_properties")
  .write.partitionBy("date", "hour").insertInto("dest")Another case 
{code}
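
For comparison, here is roughly the Hive-style statement users have in mind when 
they reach for {{partitionBy}} (a sketch; {{src_cleaned}} is a placeholder for the 
transformed data registered as a table). Even though {{PARTITION}} names the 
partition columns, the SELECT list is still matched to the table's columns, and 
then to its partition columns, by position:

{code:lang=java}
spark.sql("""
  INSERT INTO TABLE dest PARTITION (utc_dateint, utc_hour)
  SELECT id, `timestamp`, c1, c2, `date`, `hour`
  FROM src_cleaned
""")
{code}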

Another reason to use {{partitionBy}} is maintaining ETL jobs over time: when the 
job's structure changes, so does its column order. Say I want to add a dedup step 
so that I get just one row per ID per day. My first attempt, starting from the 
version that had the column order right, looks like this:

{code:lang=java}
// column orders change, causing the query to break
spark.table("src")
  .withColumn("date", dateint($"timetamp")) // moved to before dropDuplicates
  .dropDuplicates($"date", $"id")   // added to dedup records
  .withColumn("c1", $"other_properties".getItem("c1"))
  .withColumn("c2", $"other_properties".getItem("c2").cast(IntegerType))
  .withColumn("hour", hour($"timestamp"))
  .drop("other_properties")
  .write.insertInto("dest")
{code}

The result is that I get nonsensical partitioning, because the c2 and hour columns 
end up as the partition values. The most obvious symptom of wrong column order is 
bad partitioning, and when I look into it I find that {{partitionBy}} fixes it; in 
many cases it's the first method I'll try because what I see is bad partition 
values. This doesn't always fix the whole query, but it does solve the 
partitioning problem I observed. (Other column-order problems are hidden because 
the insert adds {{Cast}} rather than the safer {{UpCast}}.)
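
To illustrate the {{Cast}} point, here is a standalone sketch (not the insert path 
itself): a string that lands, by position, in an int column is silently cast and 
comes back as null instead of causing an analysis error, which is how other 
ordering mistakes stay hidden.

{code:lang=java}
import org.apache.spark.sql.types.IntegerType
import spark.implicits._

// "c1-value" cannot be parsed as an int, so Cast quietly produces null;
// UpCast would instead reject the string -> int coercion up front.
val misplaced = Seq("c1-value", "another-string").toDF("value")
misplaced.select($"value".cast(IntegerType)).collect()
// both rows come back as null instead of raising an error
{code}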

Users will also _choose_ this over the right solution, 

[jira] [Comment Edited] (SPARK-16032) Audit semantics of various insertion operations related to partitioned tables

2016-06-21 Thread Wenchen Fan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15343029#comment-15343029
 ] 

Wenchen Fan edited comment on SPARK-16032 at 6/22/16 1:15 AM:
--

I think it doesn't make sense to use `partitionBy` with `insertInto`, because we 
cannot map `DataFrameWriter.insertInto` to SQL INSERT, for two reasons:

1. `DataFrameWriter` doesn't support static partitions.
2. `DataFrameWriter` specifies the partition columns of the data being inserted, 
not of the table being inserted into.
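
For the first point, a static partition insert looks like this in SQL (a sketch 
reusing the `dest` table from the example above; `src_cleaned` is a placeholder), 
and there is no way to express it through `DataFrameWriter`:

{code:lang=java}
// Static partition values are literals in the PARTITION clause, not columns of
// the inserted data; DataFrameWriter has no counterpart for this.
spark.sql("""
  INSERT OVERWRITE TABLE dest PARTITION (utc_dateint = 20160623, utc_hour = 10)
  SELECT id, `timestamp`, c1, c2
  FROM src_cleaned
""")
{code}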

And it's already (mostly) broken in 1.6. According to the test cases at 
https://gist.github.com/cloud-fan/14ada3f2b3225b5db52ccaa12aacfbd4, the only case 
that seems reasonable in 1.6 is when the data being inserted has the same schema 
as the target table and `partitionBy` specifies the correct partition columns. 
But I think it's worth breaking that case to make the overall semantics clearer.

Maybe we are wrong; it would be good if we could come up with clean semantics that 
explain the behavior of `DataFrame.insertInto`. But after spending a lot of time 
on it, we failed, and that's why we want to make these changes and rush them into 
2.0.


was (Author: cloud_fan):
I think it's nonsense to use `partitionBy` with `insertInto`, because we cannot 
map `DataFrameWriter.insertInto` to SQL INSERT, for two reasons:

1. `DataFrameWriter` doesn't support static partitions.
2. `DataFrameWriter` specifies the partition columns of the data being inserted, 
not of the table being inserted into.

And it's already (mostly) broken in 1.6. According to the test cases at 
https://gist.github.com/cloud-fan/14ada3f2b3225b5db52ccaa12aacfbd4, the only case 
that seems reasonable in 1.6 is when the data being inserted has the same schema 
as the target table and `partitionBy` specifies the correct partition columns. 
But I think it's worth breaking that case to make the overall semantics clearer.

Maybe we are wrong; it would be good if we could come up with clean semantics that 
explain the behavior of `DataFrame.insertInto`. But after spending a lot of time 
on it, we failed, and that's why we want to make these changes and rush them into 
2.0.

> Audit semantics of various insertion operations related to partitioned tables
> -
>
> Key: SPARK-16032
> URL: https://issues.apache.org/jira/browse/SPARK-16032
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0
>Reporter: Cheng Lian
>Assignee: Wenchen Fan
>Priority: Critical
> Attachments: [SPARK-16032] Spark SQL table insertion auditing - 
> Google Docs.pdf
>
>
> We found that the semantics of various insertion operations related to 
> partitioned tables can be inconsistent. This is an umbrella ticket for all 
> related tickets.






[jira] [Comment Edited] (SPARK-16032) Audit semantics of various insertion operations related to partitioned tables

2016-06-20 Thread Yin Huai (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15340001#comment-15340001
 ] 

Yin Huai edited comment on SPARK-16032 at 6/21/16 5:11 AM:
---

I am attaching a summary of the results of the auditing work. These original 
tests were done before we merged 
https://github.com/apache/spark/commit/57feaa572db62059555b573e145449d0063b08c8.



was (Author: yhuai):
I am attaching a summary of the results of the auditing work. These original 
tests were done using 
https://github.com/apache/spark/commit/ebb9a3b6fd834e2c856a192b4455aab83e9c4dc8.



> Audit semantics of various insertion operations related to partitioned tables
> -
>
> Key: SPARK-16032
> URL: https://issues.apache.org/jira/browse/SPARK-16032
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0
>Reporter: Cheng Lian
>Assignee: Wenchen Fan
>Priority: Blocker
> Attachments: [SPARK-16032] Spark SQL table insertion auditing - 
> Google Docs.pdf
>
>
> We found that the semantics of various insertion operations related to 
> partitioned tables can be inconsistent. This is an umbrella ticket for all 
> related tickets.


