[jira] [Comment Edited] (SPARK-32317) Parquet file loading with different schema(Decimal(N, P)) in files is not working as expected

2020-07-19 Thread Chen Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-32317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17160207#comment-17160207
 ] 

Chen Zhang edited comment on SPARK-32317 at 7/19/20, 6:40 PM:
--

The DECIMAL type in Parquet is stored by INT32, INT64, FIXED_LEN_BYTE_ARRAY, or 
BINARY.

Taking 19500.00 stored as INT64 as an example, the parquet file is stored as 
DECIMAL(15,2) -> 195, DECIMAL(15,6) -> 195.

 


was (Author: chen zhang):
The DECIMAL type in Parquet is stored by INT32, INT64, FIXED_LEN_BYTE_ARRAY, or 
BINARY.

Taking 19500.00 stored as INT64 as an example, the parquet file is stored as 
DECIMAL(15,2) -> 195, DECIMAL(15,6) -> 195.

Spark uses requiredSchema to convert the read parquet file data. It does not 
consider that the requiredSchema and the schema stored in the file may be 
different. This may potentially cause data correctness problems.

I think it is necessary to consider the mapping between the requiredSchema and 
the schema stored in the file. I will try to modify the code, and if it goes 
well, I can submit a PR.

> Parquet file loading with different schema(Decimal(N, P)) in files is not 
> working as expected
> -
>
> Key: SPARK-32317
> URL: https://issues.apache.org/jira/browse/SPARK-32317
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 3.0.0
> Environment: Its failing in all environments that I tried.
>Reporter: Krish
>Priority: Major
>  Labels: easyfix
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> Hi,
>  
> We generate parquet files which are partitioned on Date on a daily basis, and 
> we send updates to historical data some times, what we noticed is due to some 
> configuration error the patch data schema is inconsistent to earlier files.
> Assuming we had files generated with schema having ID and Amount as fields. 
> Historical data is having schema like ID INT, AMOUNT DECIMAL(15,6) and the 
> files we send as updates has schema like DECIMAL(15,2). 
>  
> Having two different schema in a Date partition and when we load the data of 
> a Date into spark, it is loading the data but the amount is getting 
> manipulated.
>  
> file1.snappy.parquet
>  ID: INT
>  AMOUNT: DECIMAL(15,6)
>  Content:
>  1,19500.00
>  2,198.34
> file2.snappy.parquet
>  ID: INT
>  AMOUNT: DECIMAL(15,2)
>  Content:
>  1,19500.00
>  3,198.34
> Load these two files togeather
> df3 = spark.read.parquet("output/")
> df3.show() #-we can see amount getting manipulated here,
> +-+---+
> |ID|   AMOUNT|
> +-+---+
> |1|1.95|
> |3|0.019834|
> |1|19500.00|
> |2|198.34|
> +-+---+
>  x
> Options Tried:
> We tried to give schema as String for all fields, but that didt work
> df3 = spark.read.format("parquet").schema(schema).load("output/")
> Error: "org.apache.spark.sql.execution.QueryExecutionException: Parquet 
> column cannot be converted in file file*.snappy.parquet. Column: 
> [AMOUNT], Expected: string, Found: INT64"
>  
> I know merge schema works if it finds few extra columns in one file but the 
> fileds which are in common needs to have same schema. That might nort work 
> here.
>  
> Looking for some work around solution here. Or if there is an option which I 
> havent tried you can point me to that.
>  
> With schema merging I got below eeror:
> An error occurred while calling o2272.parquet. : 
> org.apache.spark.SparkException: Failed merging schema: root |-- ID: string 
> (nullable = true) |-- AMOUNT: decimal(15,6) (nullable = true) at 
> org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.$anonfun$mergeSchemasInParallel$5(SchemaMergeUtils.scala:100)
>  at 
> org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.$anonfun$mergeSchemasInParallel$5$adapted(SchemaMergeUtils.scala:95)
>  at 
> scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36) at 
> scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33) 
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198) at 
> org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.mergeSchemasInParallel(SchemaMergeUtils.scala:95)
>  at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.mergeSchemasInParallel(ParquetFileFormat.scala:485)
>  at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetUtils$.inferSchema(ParquetUtils.scala:107)
>  at 
> org.apache.spark.sql.execution.datasources.v2.parquet.ParquetTable.inferSchema(ParquetTable.scala:44)
>  at 
> org.apache.spark.sql.execution.datasources.v2.FileTable.$anonfun$dataSchema$4(FileTable.scala:69)
>  at scala.Option.orElse(Option.scala:447) at 
> 

[jira] [Comment Edited] (SPARK-32317) Parquet file loading with different schema(Decimal(N, P)) in files is not working as expected

2020-07-19 Thread Chen Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-32317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17160207#comment-17160207
 ] 

Chen Zhang edited comment on SPARK-32317 at 7/19/20, 6:38 PM:
--

The DECIMAL type in Parquet is stored by INT32, INT64, FIXED_LEN_BYTE_ARRAY, or 
BINARY.

Taking 19500.00 stored as INT64 as an example, the parquet file is stored as 
DECIMAL(15,2) -> 195, DECIMAL(15,6) -> 195.

Spark uses requiredSchema to convert the read parquet file data. It does not 
consider that the requiredSchema and the schema stored in the file may be 
different. This may potentially cause data correctness problems.

I think it is necessary to consider the mapping between the requiredSchema and 
the schema stored in the file. I will try to modify the code, and if it goes 
well, I can submit a PR.


was (Author: chen zhang):
The DECIMAL type in Parquet is stored by INT32, INT64, FIXED_LEN_BYTE_ARRAY, or 
BINARY.

Taking 19500.00 stored as INT64 as an example, the parquet file is stored as 
DECIMAL(15,2) -> 195, DECIMAL(15,6) -> 195.

Spark uses requiredSchema to convert the read parquet file data. It does not 
consider that the requiredSchema and the schema stored in the file may be 
different. This may potentially cause data correctness problems.

I think it is necessary to consider the mapping between the requiredSchema and 
the schema stored in the file. I will try to modify the code, and if it goes 
well, I can submit a PR.

> Parquet file loading with different schema(Decimal(N, P)) in files is not 
> working as expected
> -
>
> Key: SPARK-32317
> URL: https://issues.apache.org/jira/browse/SPARK-32317
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 3.0.0
> Environment: Its failing in all environments that I tried.
>Reporter: Krish
>Priority: Major
>  Labels: easyfix
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> Hi,
>  
> We generate parquet files which are partitioned on Date on a daily basis, and 
> we send updates to historical data some times, what we noticed is due to some 
> configuration error the patch data schema is inconsistent to earlier files.
> Assuming we had files generated with schema having ID and Amount as fields. 
> Historical data is having schema like ID INT, AMOUNT DECIMAL(15,6) and the 
> files we send as updates has schema like DECIMAL(15,2). 
>  
> Having two different schema in a Date partition and when we load the data of 
> a Date into spark, it is loading the data but the amount is getting 
> manipulated.
>  
> file1.snappy.parquet
>  ID: INT
>  AMOUNT: DECIMAL(15,6)
>  Content:
>  1,19500.00
>  2,198.34
> file2.snappy.parquet
>  ID: INT
>  AMOUNT: DECIMAL(15,2)
>  Content:
>  1,19500.00
>  3,198.34
> Load these two files togeather
> df3 = spark.read.parquet("output/")
> df3.show() #-we can see amount getting manipulated here,
> +-+---+
> |ID|   AMOUNT|
> +-+---+
> |1|1.95|
> |3|0.019834|
> |1|19500.00|
> |2|198.34|
> +-+---+
>  x
> Options Tried:
> We tried to give schema as String for all fields, but that didt work
> df3 = spark.read.format("parquet").schema(schema).load("output/")
> Error: "org.apache.spark.sql.execution.QueryExecutionException: Parquet 
> column cannot be converted in file file*.snappy.parquet. Column: 
> [AMOUNT], Expected: string, Found: INT64"
>  
> I know merge schema works if it finds few extra columns in one file but the 
> fileds which are in common needs to have same schema. That might nort work 
> here.
>  
> Looking for some work around solution here. Or if there is an option which I 
> havent tried you can point me to that.
>  
> With schema merging I got below eeror:
> An error occurred while calling o2272.parquet. : 
> org.apache.spark.SparkException: Failed merging schema: root |-- ID: string 
> (nullable = true) |-- AMOUNT: decimal(15,6) (nullable = true) at 
> org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.$anonfun$mergeSchemasInParallel$5(SchemaMergeUtils.scala:100)
>  at 
> org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.$anonfun$mergeSchemasInParallel$5$adapted(SchemaMergeUtils.scala:95)
>  at 
> scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36) at 
> scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33) 
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198) at 
> org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.mergeSchemasInParallel(SchemaMergeUtils.scala:95)
>  at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.mergeSchemasInParallel(ParquetFileFormat.scala:485)
>  

[jira] [Comment Edited] (SPARK-32317) Parquet file loading with different schema(Decimal(N, P)) in files is not working as expected

2020-07-19 Thread Chen Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-32317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17160207#comment-17160207
 ] 

Chen Zhang edited comment on SPARK-32317 at 7/19/20, 6:10 PM:
--

The DECIMAL type in Parquet is stored by INT32, INT64, FIXED_LEN_BYTE_ARRAY, or 
BINARY.

Taking 19500.00 stored as INT64 as an example, the parquet file is stored as 
DECIMAL(15,2) -> 195, DECIMAL(15,6) -> 195.

Spark uses requiredSchema to convert the read parquet file data. It does not 
consider that the requiredSchema and the schema stored in the file may be 
different. This may potentially cause data correctness problems.

I think it is necessary to consider the mapping between the requiredSchema and 
the schema stored in the file. I will try to modify the code, and if it goes 
well, I can submit a PR.


was (Author: chen zhang):
The DECIMAL type in Parquet is stored by INT32, INT64, FIXED_LEN_BYTE_ARRAY, or 
BINARY.

Taking 19500.00 stored as INT64 as an example, the parquet file is stored as 
DECIMAL(15,2) -> 195, DECIMAL(15,6) -> 195.

I see that the spark source code 
org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter use 
catalystType to convert the decimal type. Maybe we should use the schema in the 
parquet file to convert.

something like:
{code:java}
//case t: DecimalType if parquetType.asPrimitiveType().getPrimitiveTypeName == 
INT64 =>
//  new ParquetIntDictionaryAwareDecimalConverter(t.precision, t.scale, updater)
case t: DecimalType if parquetType.asPrimitiveType().getPrimitiveTypeName == 
INT64 =>
  val mate = parquetType.asPrimitiveType().getDecimalMetadata()
  new ParquetLongDictionaryAwareDecimalConverter(mate.getPrecision, 
mate.getScale, updater)
{code}
I will do some validation later. 

> Parquet file loading with different schema(Decimal(N, P)) in files is not 
> working as expected
> -
>
> Key: SPARK-32317
> URL: https://issues.apache.org/jira/browse/SPARK-32317
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 3.0.0
> Environment: Its failing in all environments that I tried.
>Reporter: Krish
>Priority: Major
>  Labels: easyfix
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> Hi,
>  
> We generate parquet files which are partitioned on Date on a daily basis, and 
> we send updates to historical data some times, what we noticed is due to some 
> configuration error the patch data schema is inconsistent to earlier files.
> Assuming we had files generated with schema having ID and Amount as fields. 
> Historical data is having schema like ID INT, AMOUNT DECIMAL(15,6) and the 
> files we send as updates has schema like DECIMAL(15,2). 
>  
> Having two different schema in a Date partition and when we load the data of 
> a Date into spark, it is loading the data but the amount is getting 
> manipulated.
>  
> file1.snappy.parquet
>  ID: INT
>  AMOUNT: DECIMAL(15,6)
>  Content:
>  1,19500.00
>  2,198.34
> file2.snappy.parquet
>  ID: INT
>  AMOUNT: DECIMAL(15,2)
>  Content:
>  1,19500.00
>  3,198.34
> Load these two files togeather
> df3 = spark.read.parquet("output/")
> df3.show() #-we can see amount getting manipulated here,
> +-+---+
> |ID|   AMOUNT|
> +-+---+
> |1|1.95|
> |3|0.019834|
> |1|19500.00|
> |2|198.34|
> +-+---+
>  x
> Options Tried:
> We tried to give schema as String for all fields, but that didt work
> df3 = spark.read.format("parquet").schema(schema).load("output/")
> Error: "org.apache.spark.sql.execution.QueryExecutionException: Parquet 
> column cannot be converted in file file*.snappy.parquet. Column: 
> [AMOUNT], Expected: string, Found: INT64"
>  
> I know merge schema works if it finds few extra columns in one file but the 
> fileds which are in common needs to have same schema. That might nort work 
> here.
>  
> Looking for some work around solution here. Or if there is an option which I 
> havent tried you can point me to that.
>  
> With schema merging I got below eeror:
> An error occurred while calling o2272.parquet. : 
> org.apache.spark.SparkException: Failed merging schema: root |-- ID: string 
> (nullable = true) |-- AMOUNT: decimal(15,6) (nullable = true) at 
> org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.$anonfun$mergeSchemasInParallel$5(SchemaMergeUtils.scala:100)
>  at 
> org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.$anonfun$mergeSchemasInParallel$5$adapted(SchemaMergeUtils.scala:95)
>  at 
> scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36) at 
> scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33) 
> at 

[jira] [Comment Edited] (SPARK-32317) Parquet file loading with different schema(Decimal(N, P)) in files is not working as expected

2020-07-18 Thread JinxinTang (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-32317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17160370#comment-17160370
 ] 

JinxinTang edited comment on SPARK-32317 at 7/18/20, 8:55 AM:
--

Please try use `spark.read.*option("mergeSchema", true)*.parquet`if need to 
check the schema, and we can get the following exception when decimal is not 
compatible
{code:java}
scala> spark.sql("select cast(19500.00 as decimal(15,2)) as 
b").write.mode("append").parquet("file:///tmp/schema")

scala> spark.sql("select cast(19500.00 as decimal(15,6)) as 
b").write.mode("append").parquet("file:///tmp/schema")
scala> 
spark.read.option("mergeSchema",true).parquet("file:///tmp/schema").show()
org.apache.spark.SparkException: Failed merging schema:
root
 |-- b: decimal(15,6) (nullable = false)  at 
org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.$anonfun$mergeSchemasInParallel$5(SchemaMergeUtils.scala:100)
  at 
...
Caused by: org.apache.spark.SparkException: Failed to merge fields 'b' and 'b'. 
Failed to merge decimal types with incompatible scala 2 and 6
  at 
org.apache.spark.sql.types.StructType$.$anonfun$merge$2(StructType.scala:593){code}
test version: spark-2.4.6-bin-hadoop2.7, spark-3.0.0-bin-hadoop2.7


was (Author: jinxintang):
Please try use `spark.read.*option("mergeSchema", true)*.parquet`if need to 
check the schema, and we can get the following exception when decimal is not 
compatible
{code:java}
scala> spark.sql("select cast(19500.00 as decimal(15,2)) as 
b").write.mode("append").parquet("file:///tmp/schema")

scala> spark.sql("select cast(19500.00 as decimal(15,6)) as 
b").write.mode("append").parquet("file:///tmp/schema")
scala> spark.read.option("mergeSchema", 
true).parquet("file:///tmp/schema").show()
org.apache.spark.SparkException: Failed merging schema:
root
 |-- b: decimal(15,6) (nullable = false)  at 
org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.$anonfun$mergeSchemasInParallel$5(SchemaMergeUtils.scala:100)
  at 
...
Caused by: org.apache.spark.SparkException: Failed to merge fields 'b' and 'b'. 
Failed to merge decimal types with incompatible scala 2 and 6
  at 
org.apache.spark.sql.types.StructType$.$anonfun$merge$2(StructType.scala:593){code}
test version: spark-2.4.6-bin-hadoop2.7, spark-3.0.0-bin-hadoop2.7

> Parquet file loading with different schema(Decimal(N, P)) in files is not 
> working as expected
> -
>
> Key: SPARK-32317
> URL: https://issues.apache.org/jira/browse/SPARK-32317
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 3.0.0
> Environment: Its failing in all environments that I tried.
>Reporter: Krish
>Priority: Major
>  Labels: easyfix
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> Hi,
>  
> We generate parquet files which are partitioned on Date on a daily basis, and 
> we send updates to historical data some times, what we noticed is due to some 
> configuration error the patch data schema is inconsistent to earlier files.
> Assuming we had files generated with schema having ID and Amount as fields. 
> Historical data is having schema like ID INT, AMOUNT DECIMAL(15,6) and the 
> files we send as updates has schema like DECIMAL(15,2). 
>  
> Having two different schema in a Date partition and when we load the data of 
> a Date into spark, it is loading the data but the amount is getting 
> manipulated.
>  
> file1.snappy.parquet
>  ID: INT
>  AMOUNT: DECIMAL(15,6)
>  Content:
>  1,19500.00
>  2,198.34
> file2.snappy.parquet
>  ID: INT
>  AMOUNT: DECIMAL(15,2)
>  Content:
>  1,19500.00
>  3,198.34
> Load these two files togeather
> df3 = spark.read.parquet("output/")
> df3.show() #-we can see amount getting manipulated here,
> +-+---+
> |ID|   AMOUNT|
> +-+---+
> |1|1.95|
> |3|0.019834|
> |1|19500.00|
> |2|198.34|
> +-+---+
>  x
> Options Tried:
> We tried to give schema as String for all fields, but that didt work
> df3 = spark.read.format("parquet").schema(schema).load("output/")
> Error: "org.apache.spark.sql.execution.QueryExecutionException: Parquet 
> column cannot be converted in file file*.snappy.parquet. Column: 
> [AMOUNT], Expected: string, Found: INT64"
>  
> I know merge schema works if it finds few extra columns in one file but the 
> fileds which are in common needs to have same schema. That might nort work 
> here.
>  
> Looking for some work around solution here. Or if there is an option which I 
> havent tried you can point me to that.
>  
> With schema 

[jira] [Comment Edited] (SPARK-32317) Parquet file loading with different schema(Decimal(N, P)) in files is not working as expected

2020-07-18 Thread JinxinTang (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-32317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17160370#comment-17160370
 ] 

JinxinTang edited comment on SPARK-32317 at 7/18/20, 8:54 AM:
--

Please try use `spark.read.*option("mergeSchema", true)*.parquet`if need to 
check the schema, and we can get the following exception when decimal is not 
compatible
{code:java}
scala> spark.sql("select cast(19500.00 as decimal(15,2)) as 
b").write.mode("append").parquet("file:///tmp/schema")

scala> spark.sql("select cast(19500.00 as decimal(15,6)) as 
b").write.mode("append").parquet("file:///tmp/schema")
scala> spark.read.option("mergeSchema", 
true).parquet("file:///tmp/schema").show()
org.apache.spark.SparkException: Failed merging schema:
root
 |-- b: decimal(15,6) (nullable = false)  at 
org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.$anonfun$mergeSchemasInParallel$5(SchemaMergeUtils.scala:100)
  at 
...
Caused by: org.apache.spark.SparkException: Failed to merge fields 'b' and 'b'. 
Failed to merge decimal types with incompatible scala 2 and 6
  at 
org.apache.spark.sql.types.StructType$.$anonfun$merge$2(StructType.scala:593){code}
test version: spark-2.4.6-bin-hadoop2.7, spark-3.0.0-bin-hadoop2.7


was (Author: jinxintang):
Please try use `spark.read.*option("mergeSchema", true)*.parquet`if need to 
check the schema, and we can get the following exception when decimal is not 
compatible
{code:java}
scala> spark.sql("select cast(19500.00 as decimal(15,2)) as 
b").write.mode("append").parquet("file:///tmp/schema")

scala> spark.sql("select cast(19500.00 as decimal(15,6)) as 
b").write.mode("append").parquet("file:///tmp/schema")
scala> spark.read.option("mergeSchema", 
true).parquet("file:///tmp/schema").show()
org.apache.spark.SparkException: Failed merging schema:
root
 |-- b: decimal(15,6) (nullable = false)  at 
org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.$anonfun$mergeSchemasInParallel$5(SchemaMergeUtils.scala:100)
  at 
...
Caused by: org.apache.spark.SparkException: Failed to merge fields 'b' and 'b'. 
Failed to merge decimal types with incompatible scala 2 and 6
  at 
org.apache.spark.sql.types.StructType$.$anonfun$merge$2(StructType.scala:593){code}
test version: spark-3.0.0-bin-hadoop2.7, spark-3.0.0-bin-hadoop2.7

> Parquet file loading with different schema(Decimal(N, P)) in files is not 
> working as expected
> -
>
> Key: SPARK-32317
> URL: https://issues.apache.org/jira/browse/SPARK-32317
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 3.0.0
> Environment: Its failing in all environments that I tried.
>Reporter: Krish
>Priority: Major
>  Labels: easyfix
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> Hi,
>  
> We generate parquet files which are partitioned on Date on a daily basis, and 
> we send updates to historical data some times, what we noticed is due to some 
> configuration error the patch data schema is inconsistent to earlier files.
> Assuming we had files generated with schema having ID and Amount as fields. 
> Historical data is having schema like ID INT, AMOUNT DECIMAL(15,6) and the 
> files we send as updates has schema like DECIMAL(15,2). 
>  
> Having two different schema in a Date partition and when we load the data of 
> a Date into spark, it is loading the data but the amount is getting 
> manipulated.
>  
> file1.snappy.parquet
>  ID: INT
>  AMOUNT: DECIMAL(15,6)
>  Content:
>  1,19500.00
>  2,198.34
> file2.snappy.parquet
>  ID: INT
>  AMOUNT: DECIMAL(15,2)
>  Content:
>  1,19500.00
>  3,198.34
> Load these two files togeather
> df3 = spark.read.parquet("output/")
> df3.show() #-we can see amount getting manipulated here,
> +-+---+
> |ID|   AMOUNT|
> +-+---+
> |1|1.95|
> |3|0.019834|
> |1|19500.00|
> |2|198.34|
> +-+---+
>  x
> Options Tried:
> We tried to give schema as String for all fields, but that didt work
> df3 = spark.read.format("parquet").schema(schema).load("output/")
> Error: "org.apache.spark.sql.execution.QueryExecutionException: Parquet 
> column cannot be converted in file file*.snappy.parquet. Column: 
> [AMOUNT], Expected: string, Found: INT64"
>  
> I know merge schema works if it finds few extra columns in one file but the 
> fileds which are in common needs to have same schema. That might nort work 
> here.
>  
> Looking for some work around solution here. Or if there is an option which I 
> havent tried you can point me to that.
>  
> With schema 

[jira] [Comment Edited] (SPARK-32317) Parquet file loading with different schema(Decimal(N, P)) in files is not working as expected

2020-07-18 Thread JinxinTang (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-32317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17160370#comment-17160370
 ] 

JinxinTang edited comment on SPARK-32317 at 7/18/20, 8:53 AM:
--

Please try use `spark.read.*option("mergeSchema", true)*.parquet`if need to 
check the schema, and we can get the following exception when decimal is not 
compatible
{code:java}
scala> spark.sql("select cast(19500.00 as decimal(15,2)) as 
b").write.mode("append").parquet("file:///tmp/schema")

scala> spark.sql("select cast(19500.00 as decimal(15,6)) as 
b").write.mode("append").parquet("file:///tmp/schema")
scala> spark.read.option("mergeSchema", 
true).parquet("file:///tmp/schema").show()
org.apache.spark.SparkException: Failed merging schema:
root
 |-- b: decimal(15,6) (nullable = false)  at 
org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.$anonfun$mergeSchemasInParallel$5(SchemaMergeUtils.scala:100)
  at 
...
Caused by: org.apache.spark.SparkException: Failed to merge fields 'b' and 'b'. 
Failed to merge decimal types with incompatible scala 2 and 6
  at 
org.apache.spark.sql.types.StructType$.$anonfun$merge$2(StructType.scala:593){code}
test version: spark-3.0.0-bin-hadoop2.7, spark-3.0.0-bin-hadoop2.7


was (Author: jinxintang):
Please try use `spark.read.*option("mergeSchema", true)*.parquet`if need to 
check the schema, and we can get the following exception when decimal is not 
compatible
{code:java}
//
scala> spark.sql("select cast(19500.00 as decimal(15,2)) as 
b").write.mode("append").parquet("file:///tmp/schema")

scala> spark.sql("select cast(19500.00 as decimal(15,6)) as 
b").write.mode("append").parquet("file:///tmp/schema")scala> 
spark.read.option("mergeSchema", true).parquet("file:///tmp/schema").show()
org.apache.spark.SparkException: Failed merging schema:
root
 |-- b: decimal(15,6) (nullable = false)  at 
org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.$anonfun$mergeSchemasInParallel$5(SchemaMergeUtils.scala:100)
  at 
org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.$anonfun$mergeSchemasInParallel$5$adapted(SchemaMergeUtils.scala:95)
  at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
  at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
  at 
org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.mergeSchemasInParallel(SchemaMergeUtils.scala:95)
  at 
org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.mergeSchemasInParallel(ParquetFileFormat.scala:493)
  at 
org.apache.spark.sql.execution.datasources.parquet.ParquetUtils$.inferSchema(ParquetUtils.scala:107)
  at 
org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.inferSchema(ParquetFileFormat.scala:163)
  at 
org.apache.spark.sql.execution.datasources.DataSource.$anonfun$getOrInferFileFormatSchema$11(DataSource.scala:193)
  at scala.Option.orElse(Option.scala:447)
  at 
org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:190)
  at 
org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:401)
  at 
org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:279)
  at 
org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:268)
  at scala.Option.getOrElse(Option.scala:189)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:268)
  at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:737)
  at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:715)
  ... 47 elided
Caused by: org.apache.spark.SparkException: Failed to merge fields 'b' and 'b'. 
Failed to merge decimal types with incompatible scala 2 and 6
  at 
org.apache.spark.sql.types.StructType$.$anonfun$merge$2(StructType.scala:593){code}
test version: spark-3.0.0-bin-hadoop2.7, spark-3.0.0-bin-hadoop2.7

> Parquet file loading with different schema(Decimal(N, P)) in files is not 
> working as expected
> -
>
> Key: SPARK-32317
> URL: https://issues.apache.org/jira/browse/SPARK-32317
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 3.0.0
> Environment: Its failing in all environments that I tried.
>Reporter: Krish
>Priority: Major
>  Labels: easyfix
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> Hi,
>  
> We generate parquet files which are partitioned on Date on a daily basis, and 
> we send updates to historical data some times, what we noticed is 

[jira] [Comment Edited] (SPARK-32317) Parquet file loading with different schema(Decimal(N, P)) in files is not working as expected

2020-07-18 Thread JinxinTang (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-32317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17160370#comment-17160370
 ] 

JinxinTang edited comment on SPARK-32317 at 7/18/20, 8:51 AM:
--

Please try use `spark.read.*option("mergeSchema", true)*.parquet`if need to 
check the schema, and we can get the following exception when decimal is not 
compatible
{code:java}
//
scala> spark.sql("select cast(19500.00 as decimal(15,2)) as 
b").write.mode("append").parquet("file:///tmp/schema")

scala> spark.sql("select cast(19500.00 as decimal(15,6)) as 
b").write.mode("append").parquet("file:///tmp/schema")scala> 
spark.read.option("mergeSchema", true).parquet("file:///tmp/schema").show()
org.apache.spark.SparkException: Failed merging schema:
root
 |-- b: decimal(15,6) (nullable = false)  at 
org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.$anonfun$mergeSchemasInParallel$5(SchemaMergeUtils.scala:100)
  at 
org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.$anonfun$mergeSchemasInParallel$5$adapted(SchemaMergeUtils.scala:95)
  at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
  at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
  at 
org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.mergeSchemasInParallel(SchemaMergeUtils.scala:95)
  at 
org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.mergeSchemasInParallel(ParquetFileFormat.scala:493)
  at 
org.apache.spark.sql.execution.datasources.parquet.ParquetUtils$.inferSchema(ParquetUtils.scala:107)
  at 
org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.inferSchema(ParquetFileFormat.scala:163)
  at 
org.apache.spark.sql.execution.datasources.DataSource.$anonfun$getOrInferFileFormatSchema$11(DataSource.scala:193)
  at scala.Option.orElse(Option.scala:447)
  at 
org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:190)
  at 
org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:401)
  at 
org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:279)
  at 
org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:268)
  at scala.Option.getOrElse(Option.scala:189)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:268)
  at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:737)
  at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:715)
  ... 47 elided
Caused by: org.apache.spark.SparkException: Failed to merge fields 'b' and 'b'. 
Failed to merge decimal types with incompatible scala 2 and 6
  at 
org.apache.spark.sql.types.StructType$.$anonfun$merge$2(StructType.scala:593){code}
test version: spark-3.0.0-bin-hadoop2.7, spark-3.0.0-bin-hadoop2.7


was (Author: jinxintang):
Please try use `spark.read.*option("mergeSchema", true)*.parquet`if need to 
check the schema, and we can get the following exception when decimal is not 
compatible `org.apache.spark.SparkException: Failed to merge fields 'b' and 
'b'. Failed to merge decimal types with incompatible scala 6 and 2`,

test enviroment: spark-3.0.0-bin-hadoop2.7 and spark-3.0.0-bin-hadoop2.7

> Parquet file loading with different schema(Decimal(N, P)) in files is not 
> working as expected
> -
>
> Key: SPARK-32317
> URL: https://issues.apache.org/jira/browse/SPARK-32317
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 3.0.0
> Environment: Its failing in all environments that I tried.
>Reporter: Krish
>Priority: Major
>  Labels: easyfix
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> Hi,
>  
> We generate parquet files which are partitioned on Date on a daily basis, and 
> we send updates to historical data some times, what we noticed is due to some 
> configuration error the patch data schema is inconsistent to earlier files.
> Assuming we had files generated with schema having ID and Amount as fields. 
> Historical data is having schema like ID INT, AMOUNT DECIMAL(15,6) and the 
> files we send as updates has schema like DECIMAL(15,2). 
>  
> Having two different schema in a Date partition and when we load the data of 
> a Date into spark, it is loading the data but the amount is getting 
> manipulated.
>  
> file1.snappy.parquet
>  ID: INT
>  AMOUNT: DECIMAL(15,6)
>  Content:
>  1,19500.00
>  2,198.34
> file2.snappy.parquet
>  ID: INT
>  AMOUNT: DECIMAL(15,2)
>  Content:
>  1,19500.00
>  3,198.34
> Load these two files togeather
> df3 = 

[jira] [Comment Edited] (SPARK-32317) Parquet file loading with different schema(Decimal(N, P)) in files is not working as expected

2020-07-17 Thread JinxinTang (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-32317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17159864#comment-17159864
 ] 

JinxinTang edited comment on SPARK-32317 at 7/17/20, 11:12 AM:
---

Thanks for your advice, maybe we should check the decimal schema and throw 
related exception if not consistent. Meanwhile, write decimal to float or 
double seems not realistic. I will try this later.


was (Author: jinxintang):
Thanks for your advice, maybe we should check the decimal schema and throw 
related exception if not consistent, meanwhile write decimal to float or double 
seems not realistic. I will try this later.

> Parquet file loading with different schema(Decimal(N, P)) in files is not 
> working as expected
> -
>
> Key: SPARK-32317
> URL: https://issues.apache.org/jira/browse/SPARK-32317
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 3.0.0
> Environment: Its failing in all environments that I tried.
>Reporter: Krish
>Priority: Major
>  Labels: easyfix
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> Hi,
>  
> We generate parquet files which are partitioned on Date on a daily basis, and 
> we send updates to historical data some times, what we noticed is due to some 
> configuration error the patch data schema is inconsistent to earlier files.
> Assuming we had files generated with schema having ID and Amount as fields. 
> Historical data is having schema like ID INT, AMOUNT DECIMAL(15,6) and the 
> files we send as updates has schema like DECIMAL(15,2). 
>  
> Having two different schema in a Date partition and when we load the data of 
> a Date into spark, it is loading the data but the amount is getting 
> manipulated.
>  
> file1.snappy.parquet
>  ID: INT
>  AMOUNT: DECIMAL(15,6)
>  Content:
>  1,19500.00
>  2,198.34
> file2.snappy.parquet
>  ID: INT
>  AMOUNT: DECIMAL(15,2)
>  Content:
>  1,19500.00
>  3,198.34
> Load these two files togeather
> df3 = spark.read.parquet("output/")
> df3.show() #-we can see amount getting manipulated here,
> +-+---+
> |ID|   AMOUNT|
> +-+---+
> |1|1.95|
> |3|0.019834|
> |1|19500.00|
> |2|198.34|
> +-+---+
>  x
> Options Tried:
> We tried to give schema as String for all fields, but that didt work
> df3 = spark.read.format("parquet").schema(schema).load("output/")
> Error: "org.apache.spark.sql.execution.QueryExecutionException: Parquet 
> column cannot be converted in file file*.snappy.parquet. Column: 
> [AMOUNT], Expected: string, Found: INT64"
>  
> I know merge schema works if it finds few extra columns in one file but the 
> fileds which are in common needs to have same schema. That might nort work 
> here.
>  
> Looking for some work around solution here. Or if there is an option which I 
> havent tried you can point me to that.
>  
> With schema merging I got below eeror:
> An error occurred while calling o2272.parquet. : 
> org.apache.spark.SparkException: Failed merging schema: root |-- ID: string 
> (nullable = true) |-- AMOUNT: decimal(15,6) (nullable = true) at 
> org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.$anonfun$mergeSchemasInParallel$5(SchemaMergeUtils.scala:100)
>  at 
> org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.$anonfun$mergeSchemasInParallel$5$adapted(SchemaMergeUtils.scala:95)
>  at 
> scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36) at 
> scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33) 
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198) at 
> org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.mergeSchemasInParallel(SchemaMergeUtils.scala:95)
>  at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.mergeSchemasInParallel(ParquetFileFormat.scala:485)
>  at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetUtils$.inferSchema(ParquetUtils.scala:107)
>  at 
> org.apache.spark.sql.execution.datasources.v2.parquet.ParquetTable.inferSchema(ParquetTable.scala:44)
>  at 
> org.apache.spark.sql.execution.datasources.v2.FileTable.$anonfun$dataSchema$4(FileTable.scala:69)
>  at scala.Option.orElse(Option.scala:447) at 
> org.apache.spark.sql.execution.datasources.v2.FileTable.dataSchema$lzycompute(FileTable.scala:69)
>  at 
> org.apache.spark.sql.execution.datasources.v2.FileTable.dataSchema(FileTable.scala:63)
>  at 
> org.apache.spark.sql.execution.datasources.v2.FileTable.schema$lzycompute(FileTable.scala:82)
>  at 
> org.apache.spark.sql.execution.datasources.v2.FileTable.schema(FileTable.scala:80)
>  at 
> 

[jira] [Comment Edited] (SPARK-32317) Parquet file loading with different schema(Decimal(N, P)) in files is not working as expected

2020-07-17 Thread JinxinTang (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-32317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17159864#comment-17159864
 ] 

JinxinTang edited comment on SPARK-32317 at 7/17/20, 11:11 AM:
---

Thanks for your advice, maybe we should check the decimal schema and throw 
related exception if not consistent, meanwhile write decimal to float or double 
seems not realistic. I will try this later.


was (Author: jinxintang):
Yes, maybe we should check the decimal schema and throw related exception if 
not consistent. And write decimal to float or double seems not realistic.

> Parquet file loading with different schema(Decimal(N, P)) in files is not 
> working as expected
> -
>
> Key: SPARK-32317
> URL: https://issues.apache.org/jira/browse/SPARK-32317
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 3.0.0
> Environment: Its failing in all environments that I tried.
>Reporter: Krish
>Priority: Major
>  Labels: easyfix
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> Hi,
>  
> We generate parquet files which are partitioned on Date on a daily basis, and 
> we send updates to historical data some times, what we noticed is due to some 
> configuration error the patch data schema is inconsistent to earlier files.
> Assuming we had files generated with schema having ID and Amount as fields. 
> Historical data is having schema like ID INT, AMOUNT DECIMAL(15,6) and the 
> files we send as updates has schema like DECIMAL(15,2). 
>  
> Having two different schema in a Date partition and when we load the data of 
> a Date into spark, it is loading the data but the amount is getting 
> manipulated.
>  
> file1.snappy.parquet
>  ID: INT
>  AMOUNT: DECIMAL(15,6)
>  Content:
>  1,19500.00
>  2,198.34
> file2.snappy.parquet
>  ID: INT
>  AMOUNT: DECIMAL(15,2)
>  Content:
>  1,19500.00
>  3,198.34
> Load these two files togeather
> df3 = spark.read.parquet("output/")
> df3.show() #-we can see amount getting manipulated here,
> +-+---+
> |ID|   AMOUNT|
> +-+---+
> |1|1.95|
> |3|0.019834|
> |1|19500.00|
> |2|198.34|
> +-+---+
>  x
> Options Tried:
> We tried to give schema as String for all fields, but that didt work
> df3 = spark.read.format("parquet").schema(schema).load("output/")
> Error: "org.apache.spark.sql.execution.QueryExecutionException: Parquet 
> column cannot be converted in file file*.snappy.parquet. Column: 
> [AMOUNT], Expected: string, Found: INT64"
>  
> I know merge schema works if it finds few extra columns in one file but the 
> fileds which are in common needs to have same schema. That might nort work 
> here.
>  
> Looking for some work around solution here. Or if there is an option which I 
> havent tried you can point me to that.
>  
> With schema merging I got below eeror:
> An error occurred while calling o2272.parquet. : 
> org.apache.spark.SparkException: Failed merging schema: root |-- ID: string 
> (nullable = true) |-- AMOUNT: decimal(15,6) (nullable = true) at 
> org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.$anonfun$mergeSchemasInParallel$5(SchemaMergeUtils.scala:100)
>  at 
> org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.$anonfun$mergeSchemasInParallel$5$adapted(SchemaMergeUtils.scala:95)
>  at 
> scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36) at 
> scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33) 
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198) at 
> org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.mergeSchemasInParallel(SchemaMergeUtils.scala:95)
>  at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.mergeSchemasInParallel(ParquetFileFormat.scala:485)
>  at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetUtils$.inferSchema(ParquetUtils.scala:107)
>  at 
> org.apache.spark.sql.execution.datasources.v2.parquet.ParquetTable.inferSchema(ParquetTable.scala:44)
>  at 
> org.apache.spark.sql.execution.datasources.v2.FileTable.$anonfun$dataSchema$4(FileTable.scala:69)
>  at scala.Option.orElse(Option.scala:447) at 
> org.apache.spark.sql.execution.datasources.v2.FileTable.dataSchema$lzycompute(FileTable.scala:69)
>  at 
> org.apache.spark.sql.execution.datasources.v2.FileTable.dataSchema(FileTable.scala:63)
>  at 
> org.apache.spark.sql.execution.datasources.v2.FileTable.schema$lzycompute(FileTable.scala:82)
>  at 
> org.apache.spark.sql.execution.datasources.v2.FileTable.schema(FileTable.scala:80)
>  at 
> org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation$.create(DataSourceV2Relation.scala:141)
>  at 
> 

[jira] [Comment Edited] (SPARK-32317) Parquet file loading with different schema(Decimal(N, P)) in files is not working as expected

2020-07-17 Thread JinxinTang (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-32317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17159864#comment-17159864
 ] 

JinxinTang edited comment on SPARK-32317 at 7/17/20, 11:07 AM:
---

Yes, maybe we should check the decimal schema and throw related exception if 
not consistent. And write decimal to float or double seems not realistic.


was (Author: jinxintang):
Yes, maybe we should check the decimal schema and throw related exception if 
not consistent. And write decimal to float or double seems not realistic only 
int32 or int64 is allowed in parquet.

> Parquet file loading with different schema(Decimal(N, P)) in files is not 
> working as expected
> -
>
> Key: SPARK-32317
> URL: https://issues.apache.org/jira/browse/SPARK-32317
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 3.0.0
> Environment: Its failing in all environments that I tried.
>Reporter: Krish
>Priority: Major
>  Labels: easyfix
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> Hi,
>  
> We generate parquet files which are partitioned on Date on a daily basis, and 
> we send updates to historical data some times, what we noticed is due to some 
> configuration error the patch data schema is inconsistent to earlier files.
> Assuming we had files generated with schema having ID and Amount as fields. 
> Historical data is having schema like ID INT, AMOUNT DECIMAL(15,6) and the 
> files we send as updates has schema like DECIMAL(15,2). 
>  
> Having two different schema in a Date partition and when we load the data of 
> a Date into spark, it is loading the data but the amount is getting 
> manipulated.
>  
> file1.snappy.parquet
>  ID: INT
>  AMOUNT: DECIMAL(15,6)
>  Content:
>  1,19500.00
>  2,198.34
> file2.snappy.parquet
>  ID: INT
>  AMOUNT: DECIMAL(15,2)
>  Content:
>  1,19500.00
>  3,198.34
> Load these two files togeather
> df3 = spark.read.parquet("output/")
> df3.show() #-we can see amount getting manipulated here,
> +-+---+
> |ID|   AMOUNT|
> +-+---+
> |1|1.95|
> |3|0.019834|
> |1|19500.00|
> |2|198.34|
> +-+---+
>  x
> Options Tried:
> We tried to give schema as String for all fields, but that didt work
> df3 = spark.read.format("parquet").schema(schema).load("output/")
> Error: "org.apache.spark.sql.execution.QueryExecutionException: Parquet 
> column cannot be converted in file file*.snappy.parquet. Column: 
> [AMOUNT], Expected: string, Found: INT64"
>  
> I know merge schema works if it finds few extra columns in one file but the 
> fileds which are in common needs to have same schema. That might nort work 
> here.
>  
> Looking for some work around solution here. Or if there is an option which I 
> havent tried you can point me to that.
>  
> With schema merging I got below eeror:
> An error occurred while calling o2272.parquet. : 
> org.apache.spark.SparkException: Failed merging schema: root |-- ID: string 
> (nullable = true) |-- AMOUNT: decimal(15,6) (nullable = true) at 
> org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.$anonfun$mergeSchemasInParallel$5(SchemaMergeUtils.scala:100)
>  at 
> org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.$anonfun$mergeSchemasInParallel$5$adapted(SchemaMergeUtils.scala:95)
>  at 
> scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36) at 
> scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33) 
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198) at 
> org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.mergeSchemasInParallel(SchemaMergeUtils.scala:95)
>  at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.mergeSchemasInParallel(ParquetFileFormat.scala:485)
>  at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetUtils$.inferSchema(ParquetUtils.scala:107)
>  at 
> org.apache.spark.sql.execution.datasources.v2.parquet.ParquetTable.inferSchema(ParquetTable.scala:44)
>  at 
> org.apache.spark.sql.execution.datasources.v2.FileTable.$anonfun$dataSchema$4(FileTable.scala:69)
>  at scala.Option.orElse(Option.scala:447) at 
> org.apache.spark.sql.execution.datasources.v2.FileTable.dataSchema$lzycompute(FileTable.scala:69)
>  at 
> org.apache.spark.sql.execution.datasources.v2.FileTable.dataSchema(FileTable.scala:63)
>  at 
> org.apache.spark.sql.execution.datasources.v2.FileTable.schema$lzycompute(FileTable.scala:82)
>  at 
> org.apache.spark.sql.execution.datasources.v2.FileTable.schema(FileTable.scala:80)
>  at 
> org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation$.create(DataSourceV2Relation.scala:141)
>  at 
>