Thanks! That works:

from pyspark.sql import Row

def process_file(my_iter):
    # my_iter yields (path, whole_file_text) pairs for every file in the partition
    the_id = "init"
    final = []
    for chunk in my_iter:
        lines = chunk[1].split("\n")
        for line in lines:
            if line[0:15] == 'WARC-Record-ID:':
                the_id = line[15:]
            # the_id persists across lines until the next WARC-Record-ID header
            final.append(Row(the_id=the_id, line=line))
    return iter(final)

rdd2 = rdd.mapPartitions(process_file)

Can anyone explain why this solution works? I am aware that an iterator is lazily evaluated, but my understanding of what is happening in this case is still vague.
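
Here is my rough mental model so far, as a toy outside of Spark (the path and WARC text below are made up); please correct me if this is off:

# Simulate one partition holding a single (path, whole_file_text) pair,
# which is the shape wholeTextFiles produces.
fake_partition = iter([
    ("hdfs://example/file.warc",
     "WARC/1.0\r\nWARC-Record-ID:<id-1>\r\nsome payload line\r\n"),
])

# mapPartitions hands process_file this iterator exactly once per
# partition, so the_id assigned while handling one line is still in
# scope when the next line is handled; map would call the function
# once per record instead, with no shared local state.
for row in process_file(fake_partition):
    print(row)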

Henry


On 02/27/2017 12:50 AM, 颜发才(Yan Facai) wrote:
Hi, Tremblay,
map processes one record at a time, so it is not the method you need.

However, mapPartitions with an iterator can help you maintain state across records, like:
http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html#mapPartitions
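
For example, a toy in the same spirit (it assumes a live SparkContext named sc; the data is made up):

def number_lines(lines):
    # called once per partition; n survives from one element to the next
    n = 0
    for line in lines:
        n += 1
        yield (n, line)

rdd = sc.parallelize(["a", "b", "c", "d"], 2)
print(rdd.mapPartitions(number_lines).collect())
# something like [(1, 'a'), (2, 'b'), (1, 'c'), (2, 'd')] -- the counter resets per partition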




On Mon, Feb 27, 2017 at 4:24 PM, Henry Tremblay <paulhtremb...@gmail.com> wrote:

    This won't work:

    rdd2 = rdd.flatMap(splitf)

    rdd2.take(1)

    [u'WARC/1.0\r']

    rdd2.count()

    508310

    If I then try to apply a map to rdd2, the map only sees each
    individual line. I need to create a state machine, as in my second
    function: I need to apply a key to each line, but the key is
    determined by a previous line.

    My first function below always has the same id. That was the
    point, to show that the first function succeeded while the second
    failed. In the second, the dictionary grows, but it has at most
    508,310 keys; in fact, it will most likely have only about a tenth
    of that or fewer. I ran the exact same code on the same file in
    pure Python, without Spark, and the process finished in under a
    second.

    Thanks!


    Henry

    On 02/26/2017 11:37 PM, Pavel Plotnikov wrote:

    Hi, Henry

    In the first example the dict d always contains only one value
    because the_id is always the same; in the second case the dict
    grows very quickly. So I would suggest first applying a map
    function to split your file string into rows, then repartitioning,
    and then applying your custom logic.

    Example:

    def splitf(s):
        return s.split("\n")

    rdd.flatMap(splitf).repartition(1000).map(your_function)
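
    For instance (illustrative only; tag_line is just a stand-in for your own per-line logic):

    def tag_line(line):
        # per-line logic; note it only sees one line at a time
        return (len(line), line)

    rdd2 = rdd.flatMap(splitf).repartition(1000).map(tag_line)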

    Best,
    Pavel


    On Mon, 27 Feb 2017, 06:28 Henry Tremblay <paulhtremb...@gmail.com> wrote:

        Not sure where you want me to put yield. My first try caused
        an error in Spark that it could not pickle generator objects.


        On 02/26/2017 03:25 PM, ayan guha wrote:
        Hi

        We are doing similar stuff, but with a large number of
        small-ish files. What we do is write a function to parse a
        complete file, similar to your parse function, but we use
        yield instead of return, and then flatMap on top of it. Can
        you give it a try and let us know if it works?
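
        Something along these lines, as a rough sketch only (it assumes
        an existing SparkContext sc and the path from earlier in the
        thread; adjust the parsing to your format):

        def parse_file(pair):
            # one call per (path, whole_file_text) record, so the_id
            # carries over from line to line within that file
            path, text = pair
            the_id = "init"
            for line in text.split("\n"):
                if line.startswith('WARC-Record-ID:'):
                    the_id = line[15:]
                yield (the_id, line)      # or yield a Row here

        rdd2 = sc.wholeTextFiles("/mnt/temp").flatMap(parse_file)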

        On Mon, Feb 27, 2017 at 9:02 AM, Koert Kuipers <ko...@tresata.com> wrote:

            using wholeTextFiles to process formats that cannot be
            split per line is not "old"

            and there are plenty of problems for which RDD is still
            better suited than Dataset or DataFrame currently (this
            might change in near future when Dataset gets some
            crucial optimizations fixed).

            On Sun, Feb 26, 2017 at 3:14 PM, Gourav Sengupta <gourav.sengu...@gmail.com> wrote:

                Hi Henry,

                Those guys in Databricks training are nuts and still
                use Spark 1.x for their exams. Learning SPARK that way
                is a VERY VERY VERY old way of solving problems with SPARK.

                The core engine of SPARK, which even I understand,
                has gone through several fundamental changes.

                Just try reading the file using dataframes and try
                using SPARK 2.1.
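
                A minimal sketch, assuming a SparkSession named spark
                and the path from your earlier mail:

                df = spark.read.text("/mnt/temp")
                # one row per line, single column named "value"
                df.show(5, truncate=False)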

                In other words, it may be of tremendous benefit if
                you were learning to solve problems which exist
                rather than problems which do not exist any more.

                Please let me know in case I can be of any further help.

                Regards,
                Gourav

                On Sun, Feb 26, 2017 at 7:09 PM, Henry Tremblay <paulhtremb...@gmail.com> wrote:

                    The file is so small that a stand alone python
                    script, independent of spark, can process the
                    file in under a second.

                    Also, the following fails (a rough sketch of the
                    steps is included after the list):

                    1. Read the whole file in with wholeTextFiles.

                    2. Use flatMap to get 50,000 rows that look
                    like: Row(id="path", line="line").

                    3. Save the results as CSV to HDFS.

                    4. Read the files (there are 20) from HDFS into
                    a df using sqlContext.read.csv(<path>).

                    5. Convert the df to an rdd.

                    6. Create key-value pairs with the key being the
                    file path and the value being the line.

                    7. Iterate through the values.
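
                    Roughly, the shape of what I ran (a reconstruction,
                    not the exact code; the helper name, output path,
                    and the final iteration step are illustrative):

                    from pyspark.sql import Row

                    def to_rows(pair):
                        path, text = pair
                        return [Row(id=path, line=line)
                                for line in text.split("\n")]

                    rdd1 = sc.wholeTextFiles("/mnt/temp")               # 1
                    rows = rdd1.flatMap(to_rows)                        # 2
                    sqlContext.createDataFrame(rows).write.csv("/mnt/rows_csv")   # 3
                    df = sqlContext.read.csv("/mnt/rows_csv")           # 4
                    rdd2 = df.rdd                                       # 5
                    kv = rdd2.map(lambda r: (r[0], r[1]))               # 6
                    for path, line in kv.toLocalIterator():             # 7
                        pass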

                    What happens is Spark either runs out of memory,
                    or, in my last try with a slight variation, just
                    hangs for 12 hours.

                    Henry


                    On 02/26/2017 03:31 AM, 颜发才(Yan Facai) wrote:
                    Hi, Tremblay.
                    Your file is in .gz format, which is not
                    splittable for Hadoop, so perhaps the file is
                    loaded by only one executor.
                    How many executors did you start?
                    Perhaps the repartition method could solve it, I guess.
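
                    Something like this might be worth checking (100 is
                    an arbitrary number):

                    print(rdd1.getNumPartitions())   # a single .gz usually lands in one partition
                    rdd2 = rdd1.repartition(100)
                    print(rdd2.getNumPartitions())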


                    On Sun, Feb 26, 2017 at 3:33 AM, Henry Tremblay <paulhtremb...@gmail.com> wrote:

                        I am reading in a single small file from
                        Hadoop with wholeTextFiles. When I process
                        each line and create a row with two cells,
                        the first cell equal to the name of the file
                        and the second cell equal to the line, that
                        code runs fine.

                        But if I just add two lines of code and
                        change the first cell based on parsing a
                        line, Spark runs out of memory. Any idea
                        why such a simple process, which would
                        succeed quickly in a non-Spark application,
                        fails?

                        Thanks!

                        Henry

                        CODE:

                        [hadoop@ip-172-31-35-67 ~]$ hadoop fs -du /mnt/temp
                        3816096  /mnt/temp/CC-MAIN-20170116095123-00570-ip-10-171-10-70.ec2.internal.warc.gz


                        In [1]: rdd1 = sc.wholeTextFiles("/mnt/temp")
                        In [2]: rdd1.count()
                        Out[2]: 1


                        In [4]: def process_file(s):
                           ...:     text = s[1]
                           ...:     the_id = s[0]
                           ...:     d = {}
                           ...:     l = text.split("\n")
                           ...:     final = []
                           ...:     for line in l:
                           ...:         d[the_id] = line
                           ...:         final.append(Row(**d))
                           ...:     return final
                           ...:

                        In [5]: rdd2 = rdd1.map(process_file)

                        In [6]: rdd2.count()
                        Out[6]: 1

                        In [7]: rdd3 = rdd2.flatMap(lambda x: x)

                        In [8]: rdd3.count()
                        Out[8]: 508310

                        In [9]: rdd3.take(1)
                        Out[9]: [Row(hdfs://ip-172-31-35-67.us-west-2.compute.internal:8020/mnt/temp/CC-MAIN-20170116095123-00570-ip-10-171-10-70.ec2.internal.warc.gz='WARC/1.0\r')]

                        In [10]: def process_file(s):
                            ...:     text = s[1]
                            ...:     d = {}
                            ...:     l = text.split("\n")
                            ...:     final = []
                            ...:     the_id = "init"
                            ...:     for line in l:
                            ...:         if line[0:15] == 'WARC-Record-ID:':
                            ...:             the_id = line[15:]
                            ...:         d[the_id] = line
                            ...:         final.append(Row(**d))
                            ...:     return final

                        In [12]: rdd2 = rdd1.map(process_file)

                        In [13]: rdd2.count()
                        17/02/25 19:03:03 ERROR YarnScheduler: Lost executor 5 on ip-172-31-41-89.us-west-2.compute.internal: Container killed by YARN for exceeding memory limits. 10.3 GB of 10.3 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
                        17/02/25 19:03:03 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Container killed by YARN for exceeding memory limits. 10.3 GB of 10.3 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
                        17/02/25 19:03:03 WARN TaskSetManager: Lost task 0.0 in stage 5.0 (TID 5, ip-172-31-41-89.us-west-2.compute.internal, executor 5): ExecutorLostFailure (executor 5 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 10.3 GB of 10.3 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.


                        --
                        Henry Tremblay
                        Robert Half Technology


                        



                    --
                    Henry Tremblay
                    Robert Half Technology






        --
        Best Regards,
        Ayan Guha

        --
        Henry Tremblay
        Robert Half Technology


    --
    Henry Tremblay
    Robert Half Technology



--
Henry Tremblay
Robert Half Technology
