[ https://issues.apache.org/jira/browse/SPARK-18510?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Burak Yavuz updated SPARK-18510:
--------------------------------
    Description: 
Not sure if this is a regression from 2.0 to 2.1. I was investigating this for 
Structured Streaming, but it seems it affects batch data as well.

Here's the issue:
If I specify my own schema when reading, e.g.
{code}
spark.read
  .schema(someSchemaWherePartitionColumnsAreStrings)
{code}

and partition inference can infer a partition column as IntegerType (or, I 
assume, LongType or DoubleType; basically any fixed-size type), then the data 
is corrupted once UnsafeRows are generated.
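
To illustrate why a fixed-size type read back as a string could produce garbage, here is a minimal sketch in plain Scala with no Spark dependency. It assumes a simplified row layout in which every field owns one 8-byte slot, fixed-size values are stored directly in the slot, and variable-length values store an (offset, size) pair packed into the slot. The object `SlotSketch` and its methods are hypothetical names for illustration only; this is a model of the general idea, not Spark's actual UnsafeRow code, and it is not confirmed to be the exact cause of this bug.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

// Simplified, hypothetical model of a row buffer (NOT Spark's real code):
// each field owns one 8-byte slot. Fixed-size values live in the slot itself;
// variable-length values store (offset << 32 | size) in the slot and keep
// their bytes elsewhere in the same buffer.
object SlotSketch {
  val SlotBytes = 8

  // Store an integer value directly in the slot of field `field`.
  def writeInt(buf: ByteBuffer, field: Int, value: Int): Unit =
    buf.putLong(field * SlotBytes, value.toLong)

  // Interpret the same slot as a string reference: the upper 32 bits are
  // treated as an offset and the lower 32 bits as a byte length.
  def readAsString(buf: ByteBuffer, field: Int): String = {
    val word = buf.getLong(field * SlotBytes)
    val offset = (word >>> 32).toInt
    val size = word.toInt
    val bytes = new Array[Byte](size)
    var i = 0
    while (i < size) {
      bytes(i) = buf.get(offset + i)
      i += 1
    }
    new String(bytes, StandardCharsets.UTF_8)
  }
}
```

In this model, writing the integer 5 into a slot and then calling `readAsString` on it yields a 5-byte string made of whatever bytes sit at the start of the buffer, rather than the text "5".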

Reproduction:
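{code}
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.types._
import spark.implicits._

// Assumption: `src` is any writable scratch directory; a temp dir is used here.
val src = java.nio.file.Files.createTempDirectory("spark-18510")

// Builds the strings "1" to length, so that each row is long enough.
val createArray = udf { (length: Long) =>
  for (i <- 1 to length.toInt) yield i.toString
}
// Write the data partitioned by "part" and "id" (both hold numeric values).
spark.range(10)
  .select(createArray('id + 1) as 'ex, 'id, 'id % 4 as 'part)
  .coalesce(1)
  .write
  .partitionBy("part", "id")
  .mode("overwrite")
  .parquet(src.toString)
// The user-specified schema declares the "id" partition column as a string.
val schema = new StructType()
  .add("id", StringType)
  .add("part", IntegerType)
  .add("ex", ArrayType(StringType))
spark.read
  .schema(schema)
  .format("parquet")
  .load(src.toString)
  .show()
{code}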
The UDF creates rows long enough that you don't hit other weird 
NullPointerExceptions, which I believe have the same cause.
Output:
{code}
+---------+----+--------------------+
|       id|part|                  ex|
+---------+----+--------------------+
|�|   1|[1, 2, 3, 4, 5, 6...|
| |   0|[1, 2, 3, 4, 5, 6...|
|  |   3|[1, 2, 3, 4, 5, 6...|
|   |   2|[1, 2, 3, 4, 5, 6...|
|    |   1|  [1, 2, 3, 4, 5, 6]|
|     |   0|     [1, 2, 3, 4, 5]|
|      |   3|        [1, 2, 3, 4]|
|       |   2|           [1, 2, 3]|
|        |   1|              [1, 2]|
|         |   0|                 [1]|
+---------+----+--------------------+
{code}

I was hoping to fix the issue as part of SPARK-18407, but it seems it's not 
only applicable to Structured Streaming and deserves its own JIRA.


> Partition schema inference corrupts data
> ----------------------------------------
>
>                 Key: SPARK-18510
>                 URL: https://issues.apache.org/jira/browse/SPARK-18510
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL, Structured Streaming
>    Affects Versions: 2.1.0
>            Reporter: Burak Yavuz
>            Priority: Blocker



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
