[jira] [Commented] (SPARK-9278) DataFrameWriter.insertInto inserts incorrect data
[ https://issues.apache.org/jira/browse/SPARK-9278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16374073#comment-16374073 ]

Xiaoju Wu commented on SPARK-9278:
----------------------------------

Created a new ticket to track this issue: SPARK-23493

> DataFrameWriter.insertInto inserts incorrect data
> -------------------------------------------------
>
>                 Key: SPARK-9278
>                 URL: https://issues.apache.org/jira/browse/SPARK-9278
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.4.0
>        Environment: Linux, S3, Hive Metastore
>            Reporter: Steve Lindemann
>            Assignee: Cheng Lian
>            Priority: Critical
>
> After creating a partitioned Hive table (stored as Parquet) via the
> DataFrameWriter.createTable command, subsequent attempts to insert additional
> data into new partitions of this table result in inserting incorrect data
> rows. Reordering the columns in the data to be written seems to avoid this
> issue.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-9278) DataFrameWriter.insertInto inserts incorrect data
[ https://issues.apache.org/jira/browse/SPARK-9278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16374048#comment-16374048 ]

Hyukjin Kwon commented on SPARK-9278:
-------------------------------------

Mind opening another JIRA ticket with its affected version and the reproducer above?
[jira] [Commented] (SPARK-9278) DataFrameWriter.insertInto inserts incorrect data
[ https://issues.apache.org/jira/browse/SPARK-9278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16374040#comment-16374040 ]

Xiaoju Wu commented on SPARK-9278:
----------------------------------

It seems the issue still exists. Here's the test:

{code}
val data = Seq(
  (7, "test1", 1.0),
  (8, "test#test", 0.0),
  (9, "test3", 0.0)
)

import spark.implicits._
val table = "default.tbl"

spark
  .createDataset(data)
  .toDF("col1", "col2", "col3")
  .write
  .partitionBy("col1")
  .saveAsTable(table)

val data2 = Seq(
  (7, "test2", 1.0),
  (8, "test#test", 0.0),
  (9, "test3", 0.0)
)

spark
  .createDataset(data2)
  .toDF("col1", "col2", "col3")
  .write
  .insertInto(table)

sql("select * from " + table).show()
{code}

{code}
+---------+----+----+
|     col2|col3|col1|
+---------+----+----+
|test#test| 0.0|   8|
|    test1| 1.0|   7|
|    test3| 0.0|   9|
|        8|null|   0|
|        9|null|   0|
|        7|null|   1|
+---------+----+----+
{code}

No exception was thrown, since I ran insertInto without partitionBy, but the data were inserted incorrectly. The issue is related to column order: if I instead partition by col3, which is the last column in order, it works.
{code}
val data = Seq(
  (7, "test1", 1.0),
  (8, "test#test", 0.0),
  (9, "test3", 0.0)
)

import spark.implicits._
val table = "default.tbl"

spark
  .createDataset(data)
  .toDF("col1", "col2", "col3")
  .write
  .partitionBy("col3")
  .saveAsTable(table)

val data2 = Seq(
  (7, "test2", 1.0),
  (8, "test#test", 0.0),
  (9, "test3", 0.0)
)

spark
  .createDataset(data2)
  .toDF("col1", "col2", "col3")
  .write
  .insertInto(table)

sql("select * from " + table).show()
{code}

{code}
+----+---------+----+
|col1|     col2|col3|
+----+---------+----+
|   8|test#test| 0.0|
|   9|    test3| 0.0|
|   8|test#test| 0.0|
|   9|    test3| 0.0|
|   7|    test1| 1.0|
|   7|    test2| 1.0|
+----+---------+----+
{code}
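The two runs above are consistent with insertInto matching columns by position against the table's stored layout (which moves the partition column to the end) rather than by name. Below is a minimal pure-Python sketch of that positional mapping; the helper names and schema lists are illustrative, not Spark internals.

{code}
# Illustrative sketch (not Spark code): how a positional insert can scramble
# values when the table's stored column order differs from the DataFrame's.
# Hive-style storage puts the partition column last: (col2, col3, col1).

def positional_insert(table_layout, df_columns, row):
    # Values are taken in DataFrame column order and zipped onto the
    # table layout by position -- column names are ignored.
    values = [row[c] for c in df_columns]
    return dict(zip(table_layout, values))

def by_name_insert(table_layout, row):
    # Workaround: select columns in the table's order first, so that
    # position and name agree.
    return {c: row[c] for c in table_layout}

table_layout = ["col2", "col3", "col1"]  # partition column col1 moved last
df_columns = ["col1", "col2", "col3"]    # DataFrame's original order
row = {"col1": 7, "col2": "test2", "col3": 1.0}

print(positional_insert(table_layout, df_columns, row))
# col1's value lands in col2, col2's in col3, col3's in the partition column

print(by_name_insert(table_layout, row))
# every value lands in the column of the same name
{code}

When the table is partitioned by col3, the last column, the DataFrame order and the stored order happen to coincide, which would explain why the second run comes out correct.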
[jira] [Commented] (SPARK-9278) DataFrameWriter.insertInto inserts incorrect data
[ https://issues.apache.org/jira/browse/SPARK-9278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15047830#comment-15047830 ]

Hyukjin Kwon commented on SPARK-9278:
-------------------------------------

My result may well differ from yours, since I ran the code below on the master branch of Spark, in a local environment without S3, using the Scala API on Mac OS. Still, I will leave a comment about what I tested in case you want to test outside those environments. Here is the code I ran:

{code}
// Create data.
val alphabets = Seq("a", "e", "i", "o", "u")
val partA = (0 to 4).map(i => Seq(alphabets(i % 5), "a", i))
val partB = (5 to 9).map(i => Seq(alphabets(i % 5), "b", i))
val partC = (10 to 14).map(i => Seq(alphabets(i % 5), "c", i))
val data = partA ++ partB ++ partC

// Create an RDD of Rows.
val rowsRDD = sc.parallelize(data.map(Row.fromSeq))

// Create a DataFrame.
val schema = StructType(List(
  StructField("k", StringType, true),
  StructField("pk", StringType, true),
  StructField("v", IntegerType, true))
)
val sdf = sqlContext.createDataFrame(rowsRDD, schema)

// Create an empty table.
sdf.filter("FALSE")
  .write
  .format("parquet")
  .option("path", "foo")
  .partitionBy("pk")
  .saveAsTable("foo")

// Insert a partition of data.
sdf.filter("pk = 'a'")
  .write
  .partitionBy("pk")
  .insertInto("foo")

// Select all.
val foo = sqlContext.table("foo")
foo.show()
{code}

And the result was correct, as below.
{code}
+---+---+---+
|  k|  v| pk|
+---+---+---+
|  a|  0|  a|
|  e|  1|  a|
|  i|  2|  a|
|  o|  3|  a|
|  u|  4|  a|
+---+---+---+
{code}
[jira] [Commented] (SPARK-9278) DataFrameWriter.insertInto inserts incorrect data
[ https://issues.apache.org/jira/browse/SPARK-9278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15004626#comment-15004626 ]

Davies Liu commented on SPARK-9278:
-----------------------------------

cc [~lian cheng]
[jira] [Commented] (SPARK-9278) DataFrameWriter.insertInto inserts incorrect data
[ https://issues.apache.org/jira/browse/SPARK-9278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14638843#comment-14638843 ]

Steve Lindemann commented on SPARK-9278:
----------------------------------------

Here are the steps to reproduce the issue. First, create a Hive table with the desired schema:

{noformat}
In [1]: hc = pyspark.sql.HiveContext(sqlContext)

In [2]: pdf = pd.DataFrame({'pk': ['a']*5+['b']*5+['c']*5, 'k': ['a', 'e', 'i', 'o', 'u']*3, 'v': range(15)})

In [3]: sdf = hc.createDataFrame(pdf)

In [4]: sdf.show()
+-+--+--+
|k|pk| v|
+-+--+--+
|a| a| 0|
|e| a| 1|
|i| a| 2|
|o| a| 3|
|u| a| 4|
|a| b| 5|
|e| b| 6|
|i| b| 7|
|o| b| 8|
|u| b| 9|
|a| c|10|
|e| c|11|
|i| c|12|
|o| c|13|
|u| c|14|
+-+--+--+

In [5]: sdf.filter('FALSE').write.partitionBy('pk').saveAsTable('foo', format='parquet', path='s3a://eglp-core-temp/tmp/foo')
{noformat}

A table has been created:

{noformat}
In [33]: print('\n'.join(r.result for r in hc.sql('SHOW CREATE TABLE foo').collect()))
CREATE EXTERNAL TABLE `foo`(
  `col` array COMMENT 'from deserializer')
PARTITIONED BY (
  `pk` string COMMENT '')
ROW FORMAT DELIMITED
STORED AS INPUTFORMAT
  'org.apache.hadoop.mapred.SequenceFileInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat'
LOCATION
  's3a://eglp-core-data/hive/warehouse/foo'
TBLPROPERTIES (
  'spark.sql.sources.schema.part.0'='{\"type\":\"struct\",\"fields\":[{\"name\":\"k\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"v\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"pk\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}',
  'transient_lastDdlTime'='1437657391',
  'spark.sql.sources.schema.numParts'='1',
  'spark.sql.sources.provider'='parquet')
{noformat}

Now, write a new partition of data (note that this is from the same DataFrame from which the table was created):

{noformat}
sdf.filter(sdf.pk == 'a').write.partitionBy('pk').insertInto('foo')
{noformat}

Then, select the data:

{noformat}
In [7]: foo = hc.table('foo')

In [8]: foo.show()
+-+----+--+
|k|   v|pk|
+-+----+--+
|a|null| 0|
|o|null| 3|
|i|null| 2|
|e|null| 1|
|u|null| 4|
+-+----+--+

In [9]: sdf.filter(sdf.pk == 'a').show()
+-+--+-+
|k|pk|v|
+-+--+-+
|a| a|0|
|e| a|1|
|i| a|2|
|o| a|3|
|u| a|4|
+-+--+-+
{noformat}

So it clearly inserted incorrect data. By reordering the columns, we can insert the data properly:

{noformat}
In [10]: pdf2 = pdf[['k', 'v', 'pk']]

In [11]: sdf2 = hc.createDataFrame(pdf2)

In [12]: sdf2.filter(sdf2.pk == 'a').write.partitionBy('pk').insertInto('foo')

In [13]: hc.refreshTable('foo')

In [14]: foo = hc.table('foo')

In [15]: foo.show()
+-+----+--+
|k|   v|pk|
+-+----+--+
|a|null| 0|
|o|null| 3|
|i|null| 2|
|e|null| 1|
|u|null| 4|
|o|   3| a|
|u|   4| a|
|a|   0| a|
|e|   1| a|
|i|   2| a|
+-+----+--+
{noformat}
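The manual reorder via pdf[['k', 'v', 'pk']] in the workaround above generalizes: put the DataFrame's columns into the target table's column order (partition column last) before calling insertInto. A small pure-Python sketch of that reorder step, using a dict-of-lists stand-in rather than a real DataFrame (the helper name is illustrative; in Spark itself the equivalent would be selecting columns in table order before the write):

{code}
# Stand-in for a DataFrame: column name -> list of values.

def reorder_columns(df, table_columns):
    # Rebuild the "DataFrame" with columns in the table's order, so a
    # positional insert lines each value up with the right column.
    missing = [c for c in table_columns if c not in df]
    if missing:
        raise KeyError("DataFrame lacks table columns: %s" % missing)
    return {c: df[c] for c in table_columns}

df = {'pk': ['a'] * 5,
      'k': ['a', 'e', 'i', 'o', 'u'],
      'v': list(range(5))}

# The target table stores the partition column 'pk' last.
fixed = reorder_columns(df, ['k', 'v', 'pk'])
print(list(fixed))  # columns now in table order: ['k', 'v', 'pk']
{code}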