Could you please elaborate on what kind of errors those bad Parquet files are causing? In what ways are they miswritten?

Cheng

On 9/28/15 4:03 PM, jordan.tho...@accenture.com wrote:

Ah, yes, I see that it has been turned off now, that’s why it wasn’t working. Thank you, this is helpful! The problem now is to filter out bad (miswritten) Parquet files, as they are causing this operation to fail.

Any suggestions on detecting them quickly and easily?
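
One cheap first-pass check, offered as a sketch rather than a complete validator: every Parquet file begins and ends with the 4-byte magic string "PAR1", so a file whose write was cut short will usually be missing the trailing marker. The names ParquetMagic and hasParquetMagic are hypothetical, and the code assumes the files sit on a local filesystem; on HDFS the same bytes would have to be read through the Hadoop FileSystem API instead. It will not catch a corrupt footer or bad column data.

```scala
import java.io.RandomAccessFile
import java.nio.charset.StandardCharsets
import scala.util.Try

// Hypothetical helper, not part of Spark or Parquet.
object ParquetMagic {
  private val Magic: Array[Byte] = "PAR1".getBytes(StandardCharsets.US_ASCII)

  /** True if the local file starts and ends with the Parquet magic "PAR1". */
  def hasParquetMagic(path: String): Boolean = Try {
    val file = new RandomAccessFile(path, "r")
    try {
      val len = file.length()
      // Head magic + 4-byte footer length + tail magic is the bare minimum.
      if (len < 12) false
      else {
        val head = new Array[Byte](4)
        val tail = new Array[Byte](4)
        file.seek(0)
        file.readFully(head)
        file.seek(len - 4)
        file.readFully(tail)
        java.util.Arrays.equals(head, Magic) && java.util.Arrays.equals(tail, Magic)
      }
    } finally file.close()
  }.getOrElse(false) // unreadable or missing files count as bad
}
```

Files that fail this test can be dropped from the list before it is handed to sqlContext.read.parquet; files that pass may still be unreadable for other reasons.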

*From:*Cheng Lian [mailto:lian.cs....@gmail.com]
*Sent:* Monday, September 28, 2015 5:56 PM
*To:* Thomas, Jordan <jordan.tho...@accenture.com>; mich...@databricks.com
*Cc:* user@spark.apache.org
*Subject:* Re: Performance when iterating over many parquet files

Also, you may find more details in the programming guide:

- http://spark.apache.org/docs/latest/sql-programming-guide.html#schema-merging
- http://spark.apache.org/docs/latest/sql-programming-guide.html#configuration

Cheng

On 9/28/15 3:54 PM, Cheng Lian wrote:

    I guess you're probably using Spark 1.5? Spark SQL does support
    schema merging, but we disabled it by default since 1.5 because it
    introduces extra performance costs (it's turned on by default in
    1.4 and 1.3).

    You may enable schema merging via either the Parquet data source
    specific option "mergeSchema":

      sqlContext.read.option("mergeSchema", "true").parquet(path)

    or the global SQL option "spark.sql.parquet.mergeSchema":

      sqlContext.sql("SET spark.sql.parquet.mergeSchema=true")
      sqlContext.read.parquet(path)

    Cheng

    On 9/28/15 3:45 PM, jordan.tho...@accenture.com
    <mailto:jordan.tho...@accenture.com> wrote:

        Dear Michael,

        Thank you very much for your help.

        I should have mentioned in my original email, I did try the
        sequence notation.  It doesn’t seem to have the desired
        effect.  Maybe I should say that each one of these files has a
        different schema.  When I use that call, I’m not ending up
        with a data frame with columns from all of the files taken
        together, but just one of them.  I’m tracing through the code
        trying to understand exactly what is happening with the
        Seq[String] call. Maybe you know?  Is it trying to do some
        kind of schema merging?

        Also, it seems that even if I could get it to work, it would
        require some parsing of the resulting schemas to find the
        invalid files.  I would like to capture these errors on read.

        The parquet files  currently average about 60 MB in size, with
        min about 40 MB and max about 500 or so.  I could coalesce,
        but they do correspond to logical entities and there are a
        number of use-case specific reasons to keep them separate.

        Thanks,

        Jordan

        *From:*Michael Armbrust [mailto:mich...@databricks.com]
        *Sent:* Monday, September 28, 2015 4:02 PM
        *To:* Thomas, Jordan <jordan.tho...@accenture.com>
        <mailto:jordan.tho...@accenture.com>
        *Cc:* user <user@spark.apache.org> <mailto:user@spark.apache.org>
        *Subject:* Re: Performance when iterating over many parquet files

        Another note: for best performance you are going to want your
        parquet files to be pretty big (100s of mb).  You could
        coalesce them and write them out for more efficient repeat
        querying.

        On Mon, Sep 28, 2015 at 2:00 PM, Michael Armbrust
        <mich...@databricks.com <mailto:mich...@databricks.com>> wrote:

            sqlContext.read.parquet
            <https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala#L258>
            takes lists of files.

            // this works but using spark is possibly overkill
            val fileList = sc.textFile("file_list.txt").collect()
            val dataFrame = sqlContext.read.parquet(fileList: _*)

            On Mon, Sep 28, 2015 at 1:35 PM, jwthomas
            <jordan.tho...@accenture.com
            <mailto:jordan.tho...@accenture.com>> wrote:

                We are working with use cases where we need to do batch
                processing on a large number (hundreds of thousands) of
                Parquet files.  The processing is quite similar per file.
                There are many aggregates that are very SQL-friendly
                (computing averages, maxima, minima, aggregations on
                single columns with some selection criteria).  There is
                also some more advanced time-series processing
                (continuous wavelet transforms and the like).  This all
                seems like a good use case for Spark.

                But I'm having performance problems. Let's take a look
                at something very
                simple, which simply checks whether the parquet files
                are readable.

                Code that seems natural but doesn't work:

                import scala.util.{Try, Success, Failure}
                val parquetFiles = sc.textFile("file_list.txt")
                val successes = parquetFiles
                  .map(x => (x, Try(sqlContext.read.parquet(x))))
                  .filter(_._2.isSuccess)
                  .map(x => x._1)

                My understanding is that this doesn't work because
                sqlContext can't be used inside of a transformation like
                "map" (or inside an action); it only makes sense in the
                driver.  Thus, it becomes a null reference in the above
                code, so all reads fail.

                Code that works:

                import scala.util.{Try, Success, Failure}
                val parquetFiles = sc.textFile("file_list.txt")
                val successes = parquetFiles.collect()
                  .map(x => (x, Try(sqlContext.read.parquet(x))))
                  .filter(_._2.isSuccess)
                  .map(x => x._1)


                This works because the collect() means that everything
                happens back on the
                driver.  So the sqlContext object makes sense and
                everything works fine.

                But it is slow.  I'm using yarn-client mode on a 6-node
                cluster with 17 executors, 40 GB RAM on the driver, and
                19 GB on the executors.  It takes about 1 minute to
                execute for 100 parquet files, which is too long.  Recall
                we need to do this across hundreds of thousands of files.

                I realize it is possible to parallelize after the read:

                import scala.util.{Try, Success, Failure}
                val parquetFiles = sc.textFile("file_list.txt")
                val intermediate_successes = parquetFiles.collect()
                  .map(x => (x, Try(sqlContext.read.parquet(x))))
                // Distribute the (path, result) pairs computed on the driver.
                val dist_successes = sc.parallelize(intermediate_successes)
                val successes = dist_successes.filter(_._2.isSuccess).map(x => x._1)


                But this does not improve performance substantially.  It
                seems the bottleneck is that the reads are happening
                sequentially.

                Is there a better way to do this?
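
One way to attack the sequential bottleneck while keeping every read on the driver is to issue the checks from a pool of driver threads. The sketch below is only an illustration: ParallelCheck and readablePaths are hypothetical names, the `read` argument stands in for whatever call is being tested, and whether concurrent reads through one sqlContext behave well is an assumption to verify on your Spark version.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration
import scala.util.Try

// Hypothetical helper, not part of Spark.
object ParallelCheck {
  /** Returns, in input order, the paths for which `read` does not throw. */
  def readablePaths[A](paths: Seq[String], parallelism: Int)(read: String => A): Seq[String] = {
    // A dedicated fixed-size pool caps how many reads run at once.
    val pool = Executors.newFixedThreadPool(parallelism)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      // One future per path; Try turns a non-fatal failure into `false`.
      val checks = paths.map(p => Future(p -> Try(read(p)).isSuccess))
      Await.result(Future.sequence(checks), Duration.Inf).collect { case (p, true) => p }
    } finally pool.shutdown()
  }
}
```

With the file list collected on the driver, a call might look like ParallelCheck.readablePaths(fileList, 16)(p => sqlContext.read.parquet(p)), where 16 is an arbitrary starting point for the thread count rather than a recommendation.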

                Thanks,
                Jordan






