Dear Spark user mailing list members,
I have quite messy input data, so it is difficult to load it as a DataFrame directly. What I did instead was to load it as an RDD of strings first, convert it to an RDD of pyspark.sql.Row objects, and then call the toDF() method:

    mydf = myrdd.map(parse).toDF()

I didn't expect any problem from this very simple code at first, but when I tested it against a larger batch of data, I found that this approach fails with the following exception:

    java.lang.IllegalStateException: Input row doesn't have expected number of
    values required by the schema. 10 fields are required while 9 values are
    provided.
        at org.apache.spark.sql.execution.python.EvaluatePython$.fromJava(EvaluatePython.scala:147)
        at org.apache.spark.sql.SparkSession$$anonfun$7.apply(SparkSession.scala:665)
        at org.apache.spark.sql.SparkSession$$anonfun$7.apply(SparkSession.scala:665)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
        ...

The exception comes from the fact that some Row objects in the RDD have missing fields. For example, the following code fails with the same exception:

    >>> d1 = [Row(k1="value1.1", k2="value1.2")]
    >>> d2 = [Row(k1="value2.1")]
    >>> rdd1 = spark.sparkContext.parallelize(d1)
    >>> rdd2 = spark.sparkContext.parallelize(d2)
    >>> urdd = rdd1.union(rdd2)
    >>> urdd.collect()
    [Row(k1='value1.1', k2='value1.2'), Row(k1='value2.1')]
    >>> urdd.toDF()
    DataFrame[k1: string, k2: string]
    >>> urdd.toDF().show()    # fails with the same exception

While digging into the code, I found that a Row object raises an exception when it is asked for a key it does not have:

    # spark/python/pyspark/sql/types.py
    def _verify_type(obj, dataType, nullable=True):
        ...
        elif isinstance(dataType, StructType):
            ...
            elif isinstance(obj, Row) and getattr(obj, "__from_dict__", False):
                # the order in obj could be different than dataType.fields
                for f in dataType.fields:
                    _verify_type(obj[f.name], f.dataType, f.nullable)

Here, obj[f.name] raises a ValueError(item) exception if the key does not exist. I think that raising an exception in this situation is a reasonable approach. However, if I use an RDD of dict objects instead of Row objects, the conversion succeeds, filling the missing columns with null values:

    >>> dict1 = [{"k1": "v1.1", "k2": "v1.2"}]
    >>> dict2 = [{"k1": "v2.1"}]
    >>> rdd1 = spark.sparkContext.parallelize(dict1)
    >>> rdd2 = spark.sparkContext.parallelize(dict2)
    >>> rdd1.collect()
    [{'k2': 'v1.2', 'k1': 'v1.1'}]
    >>> rdd2.collect()
    [{'k1': 'v2.1'}]
    >>> urdd = rdd1.union(rdd2)
    >>> urdd.collect()
    [{'k2': 'v1.2', 'k1': 'v1.1'}, {'k1': 'v2.1'}]
    >>> spark.createDataFrame(urdd).show()
    +----+----+
    |  k1|  k2|
    +----+----+
    |v1.1|v1.2|
    |v2.1|null|
    +----+----+
    >>> urdd.toDF().show()
    +----+----+
    |  k1|  k2|
    +----+----+
    |v1.1|v1.2|
    |v2.1|null|
    +----+----+

I wonder whether this difference in behavior is expected or not.

Best wishes,
Han-Cheol

Han-Cheol Cho
Data Laboratory / Data Scientist
Shinjuku Eastside Square 13F, 6-27-30 Shinjuku, Shinjuku-ku, Tokyo 160-0022, Japan
Email: hancheol....@nhn-techorus.com
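
P.S. For what it's worth, here is a minimal sketch of the workaround I am using for now: padding short records with None inside parse() so that every Row carries the full set of keys. FIELDS and the tab delimiter are made-up placeholders for this sketch; only parse, myrdd, and mydf come from the code above.

    from pyspark.sql import Row

    # Hypothetical list of the expected column names for the messy input.
    FIELDS = ["k1", "k2"]

    def parse(line):
        # Assumes tab-separated records; adjust the delimiter as needed.
        values = line.split("\t")
        # Pad short records with None so every Row has all the keys
        # that _verify_type will look up later.
        values += [None] * (len(FIELDS) - len(values))
        return Row(**dict(zip(FIELDS, values)))

    mydf = myrdd.map(parse).toDF()

With this, missing fields show up as null in the resulting DataFrame instead of triggering the IllegalStateException.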
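
Alternatively, since the dict code path tolerates missing keys, parse() can return plain dicts and the schema can be pinned down explicitly so the column types do not depend on inference. The StructType below is an assumed two-string-column schema matching my toy example, and parse_to_dict is a hypothetical helper:

    from pyspark.sql.types import StructType, StructField, StringType

    # Assumed schema for the toy example; absent keys end up as null
    # because the dict branch looks fields up with .get().
    schema = StructType([
        StructField("k1", StringType(), True),
        StructField("k2", StringType(), True),
    ])

    def parse_to_dict(line):
        values = line.split("\t")
        return dict(zip(["k1", "k2"], values))  # short records simply omit keys

    mydf = spark.createDataFrame(myrdd.map(parse_to_dict), schema=schema)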