Since SPARK-8406 is serious, we hope to ship 1.4.1 ASAP, possibly next
week, but I can't promise that yet. However, you can cherry-pick the
commit as soon as the fix is merged into branch-1.4. Sorry for the
trouble!
Cheng
On 6/17/15 1:42 AM, Nathan McCarthy wrote:
Thanks Cheng. Nice find!
Let me know if there is anything we can do to help on this end with
contributing a fix or testing.
Side note - any ideas on the 1.4.1 eta? There are a few bug fixes we
need in there.
Cheers,
Nathan
From: Cheng Lian
Date: Wednesday, 17 June 2015 6:25 pm
To: Nathan, "[email protected]"
Subject: Re: Spark 1.4 DataFrame Parquet file writing - missing random
rows/partitions
Hi Nathan,
Thanks a lot for the detailed report, especially the information about
nonconsecutive part numbers. It's confirmed to be a race condition bug,
and I just filed https://issues.apache.org/jira/browse/SPARK-8406 to
track it. I will deliver a fix ASAP, and it will be included in 1.4.1.
Best,
Cheng
On 6/16/15 12:30 AM, Nathan McCarthy wrote:
Hi all,
Looks like data frame parquet writing is very broken in Spark 1.4.0.
We had no problems with Spark 1.3.
The problem shows up when saving a data frame with *569610608* rows:
dfc.write.format("parquet").save("/data/map_parquet_file")
We get random results between runs. Caching the data frame in memory
makes no difference. It looks like the write misses some of the RDD
partitions. We have an RDD with *6750* partitions. When we write it
out, we get fewer files than the number of partitions. When reading
the data back in and running a count, we get a smaller number of rows.
I’ve tried counting the rows in several different ways. All return the
same result, *560214031* rows, missing about 9.4 million rows (1.65%).
qc.read.parquet("/data/map_parquet_file").count
qc.read.parquet("/data/map_parquet_file").rdd.count
qc.read.parquet("/data/map_parquet_file").mapPartitions{itr => var c = 0; itr.foreach(_ => c = c + 1); Seq(c).toIterator }.reduce(_ + _)
Looking at the files on HDFS, there are *6643* .parquet files, so 107
partitions are missing (about 1.6%).
Then writing out the same cached DF again to a new file gives *6717*
files on HDFS (33 files missing, or about 0.5%):
dfc.write.parquet("/data/map_parquet_file_2")
And we get *566670107* rows back (about 3 million missing, ~0.5%):
qc.read.parquet("/data/map_parquet_file_2").count
Writing the same df out to json writes the expected number (*6750*)
of json files and returns the right number of rows, *569610608*.
dfc.write.format("json").save("/data/map_parquet_file_3")
qc.read.format("json").load("/data/map_parquet_file_3").count
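As a quick sanity check on the figures above, the following minimal sketch recomputes the number of missing rows and files, and their share of the expected totals, for each of the three writes. It is plain Scala with no Spark dependency; the object name `LossCheck`, the helper `missing`, and the run labels are hypothetical names chosen for illustration, and the numbers are simply the ones reported in this message.

```scala
// Illustrative only: LossCheck and missing are hypothetical names,
// and the figures are copied from the counts reported in this thread.
object LossCheck {
  val expectedRows  = 569610608L
  val expectedFiles = 6750

  /** Returns (number missing, percentage of the expected total). */
  def missing(expected: Long, actual: Long): (Long, Double) = {
    val diff = expected - actual
    (diff, 100.0 * diff / expected)
  }

  def main(args: Array[String]): Unit = {
    // (label, rows read back, files found on HDFS)
    val runs = Seq(
      ("parquet run 1", 560214031L, 6643),
      ("parquet run 2", 566670107L, 6717),
      ("json",          569610608L, 6750)
    )
    for ((label, rows, files) <- runs) {
      val (rowDiff, rowPct)   = missing(expectedRows, rows)
      val (fileDiff, filePct) = missing(expectedFiles, files)
      println(f"$label%-14s rows missing: $rowDiff%d ($rowPct%.2f%%), " +
              f"files missing: $fileDiff%d ($filePct%.2f%%)")
    }
  }
}
```

Comparing the row percentage with the file percentage for the same run shows whether the lost rows are roughly in proportion to the lost part files, which is what one would expect if whole partitions are being dropped.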
One thing to note is that the parquet part files on HDFS do not have
the normal sequential part numbers that we see for the json output,
and for the parquet output in Spark 1.3.
part-r-06151.gz.parquet part-r-118401.gz.parquet
part-r-146249.gz.parquet part-r-196755.gz.parquet
part-r-35811.gz.parquet part-r-55628.gz.parquet
part-r-73497.gz.parquet part-r-97237.gz.parquet
part-r-06161.gz.parquet part-r-118406.gz.parquet
part-r-146254.gz.parquet part-r-196763.gz.parquet
part-r-35826.gz.parquet part-r-55647.gz.parquet
part-r-73500.gz.parquet _SUCCESS
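A minimal sketch of how the part numbers in a listing like this could be checked programmatically. It is plain Scala working on file names only. The names `PartNumbers`, `indices`, and `isSequential` are hypothetical, the file-name pattern is inferred from the listing above, and "sequential" is assumed here to mean that the indices run from 0 up to one less than the number of part files.

```scala
// Illustrative only: object and method names are hypothetical.
object PartNumbers {
  // Pattern assumed from the file names in the listing above.
  private val PartFile = """part-r-(\d+)\.gz\.parquet""".r

  /** Extracts the numeric index from each name that matches the pattern;
    * other entries such as _SUCCESS are ignored. */
  def indices(names: Seq[String]): Seq[Int] =
    names.collect { case PartFile(n) => n.toInt }

  /** True when the sorted indices are exactly 0, 1, ..., n-1
    * (an assumption about what "sequential" means here). */
  def isSequential(ids: Seq[Int]): Boolean =
    ids.sorted.zipWithIndex.forall { case (id, i) => id == i }

  def main(args: Array[String]): Unit = {
    // A few names taken from the listing above.
    val sample = Seq(
      "part-r-06151.gz.parquet",
      "part-r-06161.gz.parquet",
      "part-r-35811.gz.parquet",
      "_SUCCESS"
    )
    val ids = indices(sample)
    println(s"indices: ${ids.sorted.mkString(", ")}; sequential: ${isSequential(ids)}")
  }
}
```

Fed the full directory listing, the same check would distinguish output whose part numbers run in order from output like the listing above.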
We are using MapR 4.0.2 for hdfs.
Any ideas?
Cheers,
Nathan