I don’t think this is a bug either. For an empty JSON array [],
there’s simply no way to infer its actual data type, and in this case
Spark SQL just tries to fill in the “safest” type, which is
StringType, because basically you can cast any data type to StringType.
In general, schema inference/evolution is a hard problem, especially
when schemaless data formats like JSON are used in the data pipeline,
because type information gets lost along the way. Spark SQL tries to
minimize the effort required, but it can’t do all the work for you if
the type information of your data is intrinsically incomplete, or if the
schema is evolving in an incompatible way (required columns become
optional, or existing columns change their data types).
Cheng
On 7/24/15 12:23 AM, Akhil Das wrote:
Currently, the only way would be to define a proper schema for the
data. This is not a bug, but you could open a JIRA feature request
(it would help others with similar use cases), and it could be
implemented and included in a future version.
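For illustration, a minimal sketch of what "a proper schema" could look like for the details field from the question, assuming the Spark 1.4-style DataFrameReader API. The object name ExplicitEventSchema and the helper readEvents are made up for this example, and only the one field shown in the thread is declared; a real schema would have to list every field of the events.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.types._

// Hypothetical helper object, not part of Spark.
object ExplicitEventSchema {
  // Element type of the "details" array, copied from the non-empty case in the thread.
  val detailType: StructType = StructType(Seq(
    StructField("key", StringType, nullable = true),
    StructField("value", StringType, nullable = true)))

  // Assumption: only "details" is declared here; the other event fields are unknown.
  val eventSchema: StructType = StructType(Seq(
    StructField("details", ArrayType(detailType, containsNull = true), nullable = true)))

  // With a schema supplied, inference is skipped, so an empty array
  // keeps the declared element type instead of falling back to StringType.
  def readEvents(sqlContext: SQLContext, json: RDD[String]): DataFrame =
    sqlContext.read.schema(eventSchema).json(json)
}
```

Inside foreachRDD this would replace the plain read.json call, e.g. ExplicitEventSchema.readEvents(sql, dataRdd). Every batch is then written with the same schema, at the cost of dropping any JSON field that the schema does not list.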
Thanks
Best Regards
On Tue, Jul 21, 2015 at 4:41 PM, Krzysztof Zarzycki
<k.zarzy...@gmail.com> wrote:
Hi everyone,
I have a pretty challenging problem with reading/writing multiple
Parquet files with streaming, but let me first describe my data flow:
I have a lot of JSON events streaming to my platform. All of them
have the same structure, but the fields are mostly optional. Some of
the fields are arrays with structs inside.
These arrays can be empty, but sometimes they contain data
(structs).
Now I'm using Spark SQL & Streaming to:
0. Stream data from Kafka:
    val stream = KafkaUtils.createDirectStream ...
1. Read the JSON data into a DataFrame:
    stream.foreachRDD( rdd => {
      val dataRdd: RDD[String] = myTransform(rdd)
      val jsonDf = sql.read.json(dataRdd)  // schema is inferred per batch
2. Write jsonDf to Parquet files:
      if (firstRun) {
        jsonDf.write.parquet("parquet-events")
        firstRun = false
      } else { // the table has to exist to be able to append data.
        jsonDf.write.mode(SaveMode.Append).parquet("parquet-events")
      }
    })
All the writing goes fine. It produces multiple files, each for
one batch of data.
But the problem is on reading the data:
scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
scala> val events = sqlContext.read.parquet("parquet-events")
org.apache.spark.SparkException: Failed to merge incompatible
schemas StructType...
Caused by: org.apache.spark.SparkException: Failed to merge
incompatible data types StringType and
StructType(StructField(key,StringType,true),
StructField(value,StringType,true))
Indeed, the printed schemas contain mismatched types for a few fields,
e.g.:
StructField(details,ArrayType(StringType,true),true)
vs
StructField(details,ArrayType(StructType(StructField(key,StringType,true),
StructField(value,StringType,true)),true),true)
It seems that something went wrong in read.json: it recognized my
array fields differently. When an array is empty, it is inferred as
containing strings; when it is filled with data, as containing structs.
The code of json/InferSchema indeed suggests that:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/json/InferSchema.scala#L127
where the canonicalizeType method replaces NullType with StringType in
my empty arrays.
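To make the effect concrete, here is a toy model of that replacement step in plain Scala. It is not Spark's code: the types JNull, JString, JArray and JStruct and the function canonicalize are hypothetical stand-ins that only mirror the behavior described above (a leftover "unknown" type becomes a string type).

```scala
// Toy model only: these types are hypothetical stand-ins, not Spark's classes.
object EmptyArrayInference {
  sealed trait JType
  case object JNull extends JType   // "no information", e.g. the element type of []
  case object JString extends JType
  final case class JArray(element: JType) extends JType
  final case class JStruct(fields: Map[String, JType]) extends JType

  // Replace every leftover JNull with JString, recursing into containers.
  def canonicalize(t: JType): JType = t match {
    case JNull           => JString
    case JArray(element) => JArray(canonicalize(element))
    case JStruct(fields) => JStruct(fields.map { case (k, v) => k -> canonicalize(v) })
    case other           => other
  }

  // Batch A only ever saw "details": []
  val batchA: JType = canonicalize(JStruct(Map("details" -> JArray(JNull))))

  // Batch B saw "details": [{"key": "...", "value": "..."}]
  val keyValue: JType = JStruct(Map("key" -> JString, "value" -> JString))
  val batchB: JType = canonicalize(JStruct(Map("details" -> JArray(keyValue))))

  def main(args: Array[String]): Unit = {
    println(batchA)
    println(batchB)
    println(batchA == batchB) // the two batches end up with different schemas
  }
}
```

Because each batch is inferred on its own, a batch with only empty arrays commits to an array of strings before any struct has been seen, and the two resulting schemas can no longer be merged.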
This is a serious problem for someone trying to stream data from
JSON to Parquet tables. Does anyone have ideas on how to handle this
problem? My ideas are (some are not mutually exclusive):
1. Have the schema fully defined for my data. This is a last resort,
as I wanted to create a schema-less solution.
2. Write my own schema inference that removes empty arrays from the
schema, then pass the schema directly to the read method. I could even
use & modify the InferSchema class from the Spark source, but it is
private, unfortunately... so I would need to copy-paste it.
3. Submit a bug to Spark about it. Do you also think it is a bug?
It's a blocker for me currently, any help will be appreciated!
Cheers,
Krzysztof