The issue is explained in depth here: https://medium.com/@manuzhang/the-hidden-cost-of-spark-withcolumn-8ffea517c015

On 19.12.19 at 23:33, Chris Teoh wrote:
As far as I'm aware it isn't any better. The logic all gets processed by the same engine so to confirm, compare the DAGs generated from both approaches and see if they're identical.
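For example, with a toy DataFrame (the column names here are invented for illustration), something like this can be pasted into spark-shell to compare the plans:

    import org.apache.spark.sql.functions.col

    val df = spark.range(10).selectExpr("id as a", "id as b")

    // the same derived columns, built both ways
    val viaWithColumn = df
      .withColumn("a2", col("a") * 2)
      .withColumn("b2", col("b") * 2)
    val viaSelect = df.select(col("*"),
      (col("a") * 2).as("a2"), (col("b") * 2).as("b2"))

    // if the optimised plans printed here are identical, Catalyst has
    // rewritten both approaches into the same plan
    viaWithColumn.explain(true)
    viaSelect.explain(true)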

On Fri, 20 Dec 2019, 8:56 am ayan guha <guha.a...@gmail.com> wrote:

    Quick question: Why is it better to use one SQL statement vs multiple
    withColumn calls? Isn't everything eventually rewritten by Catalyst?

    On Wed, 18 Dec 2019 at 9:14 pm, Enrico Minack
    <m...@enrico.minack.dev> wrote:

        How many withColumn statements do you have? Note that it is
        better to use a single select rather than lots of withColumn
        calls. This also makes the drops redundant.
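        For illustration, a sketch with made-up column names (not taken
        from your job), where df is the input DataFrame:

            import org.apache.spark.sql.functions._

            // lots of withColumn, followed by a drop:
            val viaWithColumn = df
              .withColumn("size_mb", col("size") / 1024 / 1024)
              .withColumn("ext", regexp_extract(col("path"), "\\.([^.]+)$", 1))
              .drop("size")

            // a single select that keeps exactly the columns you need,
            // so the drop becomes redundant:
            val viaSelect = df.select(
              col("path"),
              (col("size") / 1024 / 1024).as("size_mb"),
              regexp_extract(col("path"), "\\.([^.]+)$", 1).as("ext")
            )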

        Reading 25m CSV lines and writing to Parquet in 5 minutes on
        32 cores is really slow. Can you try this on a single machine,
        i.e. run with "local[*]"?

        Can you rule out the writing part by counting the rows? I
        presume this all happens in a single stage.
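        A minimal sketch of that experiment (the path and the header
        option are assumptions about your input):

            import org.apache.spark.sql.SparkSession

            val spark = SparkSession.builder()
              .master("local[*]")    // all cores of one machine
              .appName("csv-read-test")
              .getOrCreate()

            val df = spark.read.option("header", "true").csv("/tmp/fs-dump.csv.bz2")
            println(df.count())      // full read and parse, no Parquet written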

        Enrico


        On 18.12.19 at 10:56, Antoine DUBOIS wrote:
        Hello

        I'm working on an ETL that reads CSV files describing file
        systems and transforms them into Parquet, so I can work on
        them easily and extract information.
        I'm using Mr. Powers' framework Daria to do so. I have quite
        different inputs and a lot of transformations, and the
        framework helps organize the code.
        I have a standalone Spark 2.3.2 cluster composed of 4 nodes
        with 8 cores and 32GB of memory each.
        The storage is handled by a CephFS volume mounted on all nodes.
        First, a small description of my algorithm (it's quite simple;
        a code sketch follows the list):

            Use SparkContext to load the csv.bz2 file,
            Chain a lot of withColumn() statement,
            Drop all unnecessary columns,
            Write parquet file to CephFS
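        In code, inside a spark-shell session, the pipeline looks
        roughly like this (paths and column logic are simplified
        placeholders; the real withColumn chain is much longer):

            import org.apache.spark.sql.functions._

            val df = spark.read
              .option("header", "true")
              .csv("/mnt/cephfs/fs-dump.csv.bz2")

            val result = df
              .withColumn("size_gb", col("size").cast("long") / (1024L * 1024 * 1024))
              .withColumn("owner", lower(col("owner")))
              .drop("size")    // drop the columns no longer needed

            result.write.mode("overwrite").parquet("/mnt/cephfs/fs-dump.parquet")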

        This treatment can take several hours depending on how many
        lines the CSV has, and I wanted to identify whether bz2 or the
        network could be an issue,
        so I ran the following tests (several times, with consistent
        results).
        I tried the following scenarios with 20 cores and 2 cores per
        task (session configuration sketched after the list):

          * Read the csv.bz2 from CephFS, with each node connected at
            1Gb/s: ~5 minutes.
          * Read the csv.bz2 from TMPFS (set up to look like a shared
            storage space): ~5 minutes.
          * From the two previous tests I concluded that uncompressing
            the file was part of the bottleneck, so I decided to
            uncompress the file and store it in TMPFS as well;
            result: ~5.9 minutes.
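        For reference, the session for these tests was configured
        roughly like this (the master URL is a placeholder):

            import org.apache.spark.sql.SparkSession

            val spark = SparkSession.builder()
              .master("spark://master:7077")
              .config("spark.cores.max", "20")  // cap the job at 20 cores
              .config("spark.task.cpus", "2")   // 2 cores per task, so at most 10 concurrent tasks
              .getOrCreate()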

        The test file has 25'833'369 lines and is 370MB compressed
        and 3700MB uncompressed. These results have been reproduced
        several times each.
        My question here is: what is the bottleneck in this case?

        I thought that reading the uncompressed file from RAM would be
        the fastest. Is it possible that my program reads the CSV
        suboptimally?
        In the execution logs on the cluster I see 5 to 10 seconds of
        GC time at most, and the timeline shows mainly CPU time (no
        shuffling, and no other visible overhead either).
        I also noticed that memory storage is never used during the
        execution. I know from several hours of research that bz2 is
        the only common compression codec that is splittable, and
        therefore the only one Spark can decompress in parallel.
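        One way to check that the bz2 file is really split and read in
        parallel (same placeholder path as above):

            val df = spark.read.option("header", "true").csv("/mnt/cephfs/fs-dump.csv.bz2")
            println(df.rdd.getNumPartitions)  // greater than 1 means the read is parallel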

        Do you have any idea why it behaves this way,
        and any idea how to improve this treatment?

        Cheers

        Antoine


--
Best Regards,
Ayan Guha

