I'm not sure where you want me to put the yield. My first try caused an error in Spark saying it could not pickle generator objects.
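
A minimal sketch of where I think the yield would go (parse_file is a placeholder name): the generator function is handed straight to flatMap, so no generator object is ever returned through map and Spark never has to pickle one.

    from pyspark.sql import Row

    def parse_file(pair):
        # pair is (path, whole_file_text), as produced by wholeTextFiles
        path, text = pair
        for line in text.split("\n"):
            # yield one Row per line instead of building a list and returning it
            yield Row(id=path, line=line)

    rdd = sc.wholeTextFiles("/mnt/temp")
    rows = rdd.flatMap(parse_file)  # flatMap consumes the generator lazily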

On 02/26/2017 03:25 PM, ayan guha wrote:
Hi

We are doing similar stuff, but with a large number of small-ish files. What we do is write a function to parse a complete file, similar to your parse function, but we use yield instead of return and run flatMap on top of it. Can you give it a try and let us know if it works?

On Mon, Feb 27, 2017 at 9:02 AM, Koert Kuipers <ko...@tresata.com> wrote:

    using wholeTextFiles to process formats that cannot be split per line
    is not "old"

    and there are plenty of problems for which the RDD API is still better
    suited than Dataset or DataFrame currently (this might change in the
    near future once Dataset gets some crucial optimizations fixed).

    On Sun, Feb 26, 2017 at 3:14 PM, Gourav Sengupta
    <gourav.sengu...@gmail.com> wrote:

        Hi Henry,

        Those guys in Databricks training are nuts and still use Spark
        1.x for their exams. Learning SPARK that way is a VERY VERY VERY
        old way of solving problems using SPARK.

        The core engine of SPARK, which even I understand, has gone
        through several fundamental changes.

        Just try reading the file using dataframes and try using SPARK
        2.1.
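
        For example, a rough sketch of that kind of read (assuming a
        SparkSession named spark, and using input_file_name() to keep the
        source path next to each line):

            from pyspark.sql.functions import input_file_name

            # one row per line, with the originating file path in a second column
            df = spark.read.text("/mnt/temp").withColumn("path", input_file_name())
            df.show(5, truncate=False)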

        In other words, it may be of tremendous benefit if you were
        learning to solve problems which exist rather than problems
        which do not exist any more.

        Please let me know in case I can be of any further help.

        Regards,
        Gourav

        On Sun, Feb 26, 2017 at 7:09 PM, Henry Tremblay
        <paulhtremb...@gmail.com> wrote:

            The file is so small that a standalone Python script,
            independent of Spark, can process the file in under a second.

            Also, the following fails:

            1. Read the whole file in with wholeTextFiles.

            2. Use flatMap to get 50,000 rows that look like:
            Row(id="path", line="line")

            3. Save the results as CSV to HDFS.

            4. Read the files (there are 20) from HDFS into a df using
            sqlContext.read.csv(<path>).

            5. Convert the df to an rdd.

            6. Create key-value pairs with the key being the file path
            and the value being the line.

            7. Iterate through the values.

            What happens is Spark either runs out of memory, or, in my
            last try with a slight variation, just hangs for 12 hours.
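
            In code, the steps look roughly like this (the CSV output
            path here is just a placeholder):

                from pyspark.sql import Row

                # steps 1-2: read whole files, explode into one Row per line
                rows = sc.wholeTextFiles("/mnt/temp").flatMap(
                    lambda kv: [Row(id=kv[0], line=ln) for ln in kv[1].split("\n")])

                # step 3: save as CSV to HDFS
                sqlContext.createDataFrame(rows).write.csv("/mnt/temp_csv")

                # steps 4-6: read the CSV back and drop to (path, line) pairs
                df = sqlContext.read.csv("/mnt/temp_csv")
                pairs = df.rdd.map(lambda row: (row[0], row[1]))

                # step 7: iterate through the values
                for path, line in pairs.toLocalIterator():
                    pass  # per-line processing goes here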

            Henry


            On 02/26/2017 03:31 AM, 颜发才(Yan Facai) wrote:
            Hi, Tremblay.
            Your file is in .gz format, which is not splittable for
            Hadoop, so perhaps the file is loaded by only one executor.
            How many executors did you start?
            Perhaps the repartition method could solve it, I guess.
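
            For example (8 is just an illustrative partition count, and
            where the repartition goes is a guess):

                rdd1 = sc.wholeTextFiles("/mnt/temp")
                # spread the per-line records over more partitions
                rdd3 = rdd1.flatMap(process_file).repartition(8)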


            On Sun, Feb 26, 2017 at 3:33 AM, Henry Tremblay
            <paulhtremb...@gmail.com> wrote:

                I am reading in a single small file from Hadoop with
                wholeTextFiles. I process each line and create a row
                with two cells: the first cell equal to the name of
                the file, the second cell equal to the line. That
                code runs fine.

                But if I just add two lines of code and change the
                first cell based on parsing a line, Spark runs out of
                memory. Any idea why such a simple process, which would
                succeed quickly in a non-Spark application, fails?

                Thanks!

                Henry

                CODE:

                [hadoop@ip-172-31-35-67 ~]$ hadoop fs -du /mnt/temp
                3816096  /mnt/temp/CC-MAIN-20170116095123-00570-ip-10-171-10-70.ec2.internal.warc.gz


                In [1]: rdd1 = sc.wholeTextFiles("/mnt/temp")
                In [2]: rdd1.count()
                Out[2]: 1


                In [4]: def process_file(s):
                   ...:     text = s[1]
                   ...:     the_id = s[0]
                   ...:     d = {}
                   ...:     l = text.split("\n")
                   ...:     final = []
                   ...:     for line in l:
                   ...:         d[the_id] = line
                   ...:         final.append(Row(**d))
                   ...:     return final
                   ...:

                In [5]: rdd2 = rdd1.map(process_file)

                In [6]: rdd2.count()
                Out[6]: 1

                In [7]: rdd3 = rdd2.flatMap(lambda x: x)

                In [8]: rdd3.count()
                Out[8]: 508310

                In [9]: rdd3.take(1)
                Out[9]: [Row(hdfs://ip-172-31-35-67.us-west-2.compute.internal:8020/mnt/temp/CC-MAIN-20170116095123-00570-ip-10-171-10-70.ec2.internal.warc.gz='WARC/1.0\r')]

                In [10]: def process_file(s):
                    ...:     text = s[1]
                    ...:     d = {}
                    ...:     l = text.split("\n")
                    ...:     final = []
                    ...:     the_id = "init"
                    ...:     for line in l:
                    ...:         if line[0:15] == 'WARC-Record-ID:':
                    ...:             the_id = line[15:]
                    ...:         d[the_id] = line
                    ...:         final.append(Row(**d))
                    ...:     return final

                In [12]: rdd2 = rdd1.map(process_file)

                In [13]: rdd2.count()
                17/02/25 19:03:03 ERROR YarnScheduler: Lost executor
                5 on ip-172-31-41-89.us-west-2.compute.internal:
                Container killed by YARN for exceeding memory limits.
                10.3 GB of 10.3 GB physical memory used. Consider
                boosting spark.yarn.executor.memoryOverhead.
                17/02/25 19:03:03 WARN
                YarnSchedulerBackend$YarnSchedulerEndpoint: Container
                killed by YARN for exceeding memory limits. 10.3 GB
                of 10.3 GB physical memory used. Consider boosting
                spark.yarn.executor.memoryOverhead.
                17/02/25 19:03:03 WARN TaskSetManager: Lost task 0.0
                in stage 5.0 (TID 5,
                ip-172-31-41-89.us-west-2.compute.internal, executor
                5): ExecutorLostFailure (executor 5 exited caused by
                one of the running tasks) Reason: Container killed by
                YARN for exceeding memory limits. 10.3 GB of 10.3 GB
                physical memory used. Consider boosting
                spark.yarn.executor.memoryOverhead.
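
                (The overhead the log refers to can be raised when the
                SparkContext is created; a sketch only, with 2048 MB as a
                purely illustrative value:)

                    from pyspark import SparkConf, SparkContext

                    # must be set before the context is created; value is illustrative
                    conf = SparkConf().set("spark.yarn.executor.memoryOverhead", "2048")
                    sc = SparkContext(conf=conf)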


                --
                Henry Tremblay
                Robert Half Technology


                



            --
            Henry Tremblay
            Robert Half Technology






--
Best Regards,
Ayan Guha

--
Henry Tremblay
Robert Half Technology
