This won't work:

rdd2 = rdd.flatMap(splitf)

rdd2.take(1)
[u'WARC/1.0\r']

rdd2.count()
508310

If I then apply a map to rdd2, the map only works on each individual line. I need to create a state machine, as in my second function: that is, I need to apply a key to each line, but the key is determined by a previous line.
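
A minimal sketch of the state machine I mean, using mapPartitions so the
running key survives from one line to the next (within a single partition;
the names here are illustrative):

def tag_lines(lines):
    the_id = "init"
    for line in lines:
        if line[0:15] == 'WARC-Record-ID:':
            the_id = line[15:]
        # carry the key set by an earlier line forward to this line
        yield (the_id, line)

rdd3 = rdd2.mapPartitions(tag_lines)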

My first function below always uses the same id. That was the point: to show that the first function succeeded while the second failed. Yes, the dictionary grows, but it has at most 508,310 keys, and most likely only about a tenth of that or fewer. I ran the exact same code on the same file in pure Python, without Spark, and it finished in under 1 second.

Thanks!


Henry

On 02/26/2017 11:37 PM, Pavel Plotnikov wrote:

Hi, Henry

In the first example the dict d always contains only one value because the_id is the same; in the second case the dict grows very quickly. So I suggest first applying a map function to split your file's string into rows, then repartitioning, and then applying your custom logic.

Example:

def splitf(s):
    return s.split("\n")

rdd.flatMap(splitf).repartition(1000).map(your_function)

Best,
Pavel


On Mon, 27 Feb 2017, 06:28 Henry Tremblay <paulhtremb...@gmail.com> wrote:

    Not sure where you want me to put the yield. My first try caused an
    error in Spark saying it could not pickle generator objects.


    On 02/26/2017 03:25 PM, ayan guha wrote:
    Hi

    We are doing similar stuff, but with a large number of small-ish
    files. What we do is write a function to parse a complete file,
    similar to your parse function, but we use yield instead of return
    and call flatMap on top of it. Can you give it a try and let us
    know if it works?
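
    A rough sketch of the idea, assuming rdd1 holds (path, text) pairs
    (the names are illustrative, not our actual code):

    from pyspark.sql import Row

    def parse_file(pair):
        path, text = pair
        for line in text.split("\n"):
            # yield one Row at a time instead of building a full list
            yield Row(id=path, line=line)

    rdd2 = rdd1.flatMap(parse_file)  # flatMap consumes the generator lazily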

    On Mon, Feb 27, 2017 at 9:02 AM, Koert Kuipers <ko...@tresata.com> wrote:

        Using wholeTextFiles to process formats that cannot be split per
        line is not "old", and there are plenty of problems for which
        RDD is still better suited than Dataset or DataFrame currently
        (this might change in the near future when Dataset gets some
        crucial optimizations fixed).

        On Sun, Feb 26, 2017 at 3:14 PM, Gourav Sengupta
        <gourav.sengu...@gmail.com> wrote:

            Hi Henry,

            Those guys in Databricks training are nuts and still use
            Spark 1.x for their exams. Learning SPARK is a VERY VERY
            VERY old way of solving problems using SPARK.

            The core engine of SPARK, which even I understand, has
            gone through several fundamental changes.

            Just try reading the file using dataframes and try using
            SPARK 2.1.
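
            For instance (a sketch of the DataFrame route, untested
            against your file; input_file_name() recovers the source
            path):

            from pyspark.sql.functions import input_file_name

            # one row per line, with the originating file path attached
            df = spark.read.text("/mnt/temp") \
                      .withColumn("path", input_file_name())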

            In other words, it may be of tremendous benefit if you were
            learning to solve problems which exist rather than problems
            which do not exist any more.

            Please let me know in case I can be of any further help.

            Regards,
            Gourav

            On Sun, Feb 26, 2017 at 7:09 PM, Henry Tremblay
            <paulhtremb...@gmail.com> wrote:

                The file is so small that a stand-alone Python script,
                independent of Spark, can process it in under a second.

                Also, the following fails:

                1. Read the whole file in with wholeTextFiles

                2. Use flatMap to get 50,000 rows that look like:
                Row(id="path", line="line")

                3. Save the results as CSV to HDFS

                4. Read the files (there are 20) from HDFS into a df
                using sqlContext.read.csv(<path>)

                5. Convert the df to an rdd.

                6. Create key-value pairs with the key being the file
                path and the value being the line.

                7. Iterate through the values (rough sketch below).
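
                In code, the pipeline is roughly this (a condensed
                sketch, not my exact script; the HDFS output path is a
                placeholder):

                from pyspark.sql import Row

                rows = sc.wholeTextFiles("/mnt/temp") \
                    .flatMap(lambda s: [Row(id=s[0], line=l)
                                        for l in s[1].split("\n")])
                sqlContext.createDataFrame(rows).write.csv("hdfs:///tmp/rows")  # steps 1-3
                df = sqlContext.read.csv("hdfs:///tmp/rows")   # step 4
                kv = df.rdd.map(lambda r: (r[0], r[1]))        # steps 5-6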

                What happens is Spark either runs out of memory, or,
                in my last try with a slight variation, just hangs
                for 12 hours.

                Henry


                On 02/26/2017 03:31 AM, 颜发才(Yan Facai) wrote:
                Hi, Tremblay.
                Your file is in .gz format, which is not splittable for
                Hadoop, so perhaps the file is loaded by only one
                executor. How many executors do you start? Perhaps the
                repartition method could solve it, I guess.
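
                For example, something like this (the partition count is
                only a guess):

                rdd1 = sc.wholeTextFiles("/mnt/temp")
                # split into lines first, then spread them over executors
                lines = rdd1.flatMap(lambda s: s[1].split("\n")).repartition(100)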


                On Sun, Feb 26, 2017 at 3:33 AM, Henry Tremblay
                <paulhtremb...@gmail.com> wrote:

                    I am reading in a single small file from Hadoop
                    with wholeTextFiles. If I process each line and
                    create a row with two cells, the first equal to the
                    name of the file and the second equal to the line,
                    the code runs fine.

                    But if I just add two lines of code and change the
                    first cell based on parsing a line, Spark runs out
                    of memory. Any idea why such a simple process,
                    which would finish quickly in a non-Spark
                    application, fails?

                    Thanks!

                    Henry

                    CODE:

                    [hadoop@ip-172-31-35-67 ~]$ hadoop fs -du /mnt/temp
                    3816096  /mnt/temp/CC-MAIN-20170116095123-00570-ip-10-171-10-70.ec2.internal.warc.gz


                    In [1]: rdd1 = sc.wholeTextFiles("/mnt/temp")
                    In [2]: rdd1.count()
                    Out[2]: 1


                    In [4]: def process_file(s):
                       ...:     text = s[1]
                       ...:     the_id = s[0]
                       ...:     d = {}
                       ...:     l = text.split("\n")
                       ...:     final = []
                       ...:     for line in l:
                       ...:         d[the_id] = line
                       ...:         final.append(Row(**d))
                       ...:     return final
                       ...:

                    In [5]: rdd2 = rdd1.map(process_file)

                    In [6]: rdd2.count()
                    Out[6]: 1

                    In [7]: rdd3 = rdd2.flatMap(lambda x: x)

                    In [8]: rdd3.count()
                    Out[8]: 508310

                    In [9]: rdd3.take(1)
                    Out[9]: [Row(hdfs://ip-172-31-35-67.us-west-2.compute.internal:8020/mnt/temp/CC-MAIN-20170116095123-00570-ip-10-171-10-70.ec2.internal.warc.gz='WARC/1.0\r')]

                    In [10]: def process_file(s):
                        ...:     text = s[1]
                        ...:     d = {}
                        ...:     l = text.split("\n")
                        ...:     final = []
                        ...:     the_id = "init"
                        ...:     for line in l:
                        ...:         if line[0:15] == 'WARC-Record-ID:':
                        ...:             the_id = line[15:]
                        ...:         d[the_id] = line
                        ...:         final.append(Row(**d))
                        ...:     return final

                    In [12]: rdd2 = rdd1.map(process_file)

                    In [13]: rdd2.count()
                    17/02/25 19:03:03 ERROR YarnScheduler: Lost executor 5 on ip-172-31-41-89.us-west-2.compute.internal: Container killed by YARN for exceeding memory limits. 10.3 GB of 10.3 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
                    17/02/25 19:03:03 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Container killed by YARN for exceeding memory limits. 10.3 GB of 10.3 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
                    17/02/25 19:03:03 WARN TaskSetManager: Lost task 0.0 in stage 5.0 (TID 5, ip-172-31-41-89.us-west-2.compute.internal, executor 5): ExecutorLostFailure (executor 5 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 10.3 GB of 10.3 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
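
                    One way to act on that hint is at submit time, e.g.
                    (the value and script name are placeholders):

                    spark-submit --conf spark.yarn.executor.memoryOverhead=2048 job.py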










    --
    Best Regards,
    Ayan Guha



--
Henry Tremblay
Robert Half Technology
