Thanks, Matei.

In the context of this discussion, it seems mapPartitions is essential:
it's the only way I'll be able to process each file as a whole, in our
example of a large number of small XML files that need to be parsed as
whole files because records are not required to fit on a single line.

The theory makes sense, but I'm still utterly lost as to how to implement
it.  Unfortunately there's only a single use of mapPartitions among the
Python example programs: the logistic regression example, which I can't
run because it requires Python 2.7 and I'm on Python 2.6.  (Aside: I
couldn't find any statement that Python 2.6 is unsupported...is it?)

I'd really love to see a real-life example of mapPartitions in Python.  I
do appreciate the very simple examples you provided, but (perhaps because
of my novice status with Python) I can't figure out how to translate them
to a real-world situation in which I'm building RDDs from files rather
than from inline collections like [(1,2),(2,3)].

Also, you say that the function passed to mapPartitions can return a
collection OR an iterator.  I tried returning an iterator by calling
ElementTree's getiterator function, but I still got the error telling me
my object was not an iterator.

If anyone has a real-life example of mapPartitions returning a Python
iterator, that would be fabulous.
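
For what it's worth, here is roughly the shape I'm aiming for, pieced
together from your earlier replies.  This is only a sketch: the directory
name and the "record" tag are placeholders, and I'm assuming each small
file ends up as its own partition, as you described.

import xml.etree.ElementTree as ET

def parse_file(lines):
    # 'lines' is the iterator of lines for one partition, i.e. one small file.
    # Join them back into a single string, parse it as XML, and yield one
    # dict of attributes per <record> element.
    xml_text = "\n".join(lines)
    root = ET.fromstring(xml_text)
    for record in root.getiterator('record'):
        yield record.attrib    # a generator function returns an iterator

records = sc.textFile("xmldir").mapPartitions(parse_file)
print records.take(2)

Is that the right general idea, or am I still missing something about what
mapPartitions expects?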

Diana


On Mon, Mar 17, 2014 at 6:17 PM, Matei Zaharia <matei.zaha...@gmail.com> wrote:

> Oh, I see, the problem is that the function you pass to mapPartitions must
> itself return an iterator or a collection. It works this way so that you
> can return multiple output records for each input record. You can
> implement most of Spark's existing map-like operations, such as map,
> filter and flatMap, with mapPartitions, as well as new ones that might,
> for example, do a sliding window over each partition or accumulate data
> across elements (e.g. to compute a sum).
>
> For example, if you have data = sc.parallelize([1, 2, 3, 4], 2), this will
> work:
>
> >>> data.mapPartitions(lambda x: x).collect()
> [1, 2, 3, 4]   # Just return the same iterator, doing nothing
>
> >>> data.mapPartitions(lambda x: [list(x)]).collect()
> [[1, 2], [3, 4]]   # Group together the elements of each partition in a
> single list (like glom)
>
> >>> data.mapPartitions(lambda x: [sum(x)]).collect()
> [3, 7]   # Sum each partition separately
>
> However, something like data.mapPartitions(lambda x: sum(x)).collect() will
> *not* work, because sum returns a number, not an iterator. That's why I put
> sum(x) inside a list above.
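>
> A generator counts as an iterator too, so a function that uses yield also
> works; here's a minimal, untested sketch:
>
> >>> def sum_partition(it):
> ...     yield sum(it)
> ...
> >>> data.mapPartitions(sum_partition).collect()
> [3, 7]   # should match the [sum(x)] version above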
>
> In practice mapPartitions is most useful if you want to share some data or
> work across the elements. For example, maybe you want to load a lookup
> table once from an external file and then check each element against it,
> or sum up a bunch of elements without allocating a lot of vector objects.
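>
> A bare-bones sketch of that lookup-table pattern (the file name
> whitelist.txt is hypothetical, and it's assumed to be present on each
> worker node):
>
> def keep_known_ids(iterator):
>     # Load the lookup table once per partition, not once per element
>     with open("whitelist.txt") as f:
>         allowed = set(line.strip() for line in f)
>     for record in iterator:
>         if record in allowed:
>             yield record
>
> ids = sc.textFile("ids.txt")   # hypothetical input, one ID per line
> known = ids.mapPartitions(keep_known_ids)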
>
> Matei
>
>
> On Mar 17, 2014, at 11:25 AM, Diana Carroll <dcarr...@cloudera.com> wrote:
>
> > "There's also mapPartitions, which gives you an iterator for each
> partition instead of an array. You can then return an iterator or list of
> objects to produce from that."
> >
> > I confess, I was hoping for an example of just that, because I've not
> yet been able to figure out how to use mapPartitions.  No doubt this is
> because I'm a rank newcomer to Python and haven't fully wrapped my head
> around iterators.  All I get so far in my attempts to use mapPartitions is
> the darned "such-and-such is not an iterator" error.
> >
> > def myfunction(iterator): return [1,2,3]
> > mydata.mapPartitions(lambda x: myfunction(x)).take(2)
> >
> >
> >
> >
> >
> > On Mon, Mar 17, 2014 at 1:57 PM, Matei Zaharia <matei.zaha...@gmail.com>
> wrote:
> > Here's an example of gathering all the lines in a file into one string:
> >
> > $ cat dir/a.txt
> > Hello
> > world!
> >
> > $ cat dir/b.txt
> > What's
> > up??
> >
> > $ bin/pyspark
> > >>> files = sc.textFile("dir")
> >
> > >>> files.collect()
> > [u'Hello', u'world!', u"What's", u'up??']   # one element per line, not
> what we want
> >
> > >>> files.glom().collect()
> > [[u'Hello', u'world!'], [u"What's", u'up??']]   # one element per file,
> which is an array of lines
> >
> > >>> files.glom().map(lambda a: "\n".join(a)).collect()
> > [u'Hello\nworld!', u"What's\nup??"]    # join back each file into a
> single string
> >
> > The glom() method groups all the elements of each partition of an RDD
> into an array, giving you an RDD of arrays of objects. If your input is
> small files, you always have one partition per file.
> >
> > There's also mapPartitions, which gives you an iterator for each
> partition instead of an array. You can then return an iterator or list of
> objects to produce from that.
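> >
> > To make that concrete, here's a rough, untested sketch of the same thing
> > done with mapPartitions instead of glom:
> >
> > >>> files.mapPartitions(lambda lines: ["\n".join(lines)]).collect()
> > [u'Hello\nworld!', u"What's\nup??"]   # should match the glom version above
> >
> > The joined string is wrapped in a one-element list so that the function
> > returns a collection rather than a bare string.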
> >
> > Matei
> >
> >
> > On Mar 17, 2014, at 10:46 AM, Diana Carroll <dcarr...@cloudera.com>
> wrote:
> >
> > > Thanks, Matei.  That makes sense.  I have a dataset of many, many
> smallish XML files, so using mapPartitions that way would fit well.  I'd
> love to see a code example, though...it's not as obvious to me how to do
> that as it probably should be.
> > >
> > > Thanks,
> > > Diana
> > >
> > >
> > > On Mon, Mar 17, 2014 at 1:02 PM, Matei Zaharia <
> matei.zaha...@gmail.com> wrote:
> > > Hi Diana,
> > >
> > > Non-text input formats are only supported in Java and Scala right now,
> where you can use sparkContext.hadoopFile or .hadoopDataset to load data
> with any InputFormat that Hadoop MapReduce supports. In Python, you
> unfortunately only have textFile, which gives you one record per line. For
> JSON, you'd have to fit the whole JSON object on one line as you said.
> Hopefully we'll also have some other forms of input soon.
> > >
> > > If your input is a collection of separate files (say many .xml files),
> you can also use mapPartitions on it to group together the lines because
> each input file will end up being a single dataset partition (or map task).
> This will let you concatenate the lines in each file and parse them as one
> XML object.
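> > >
> > > A bare-bones sketch of that approach (hypothetical directory name, and
> > > it assumes each small file is its own partition):
> > >
> > > import xml.etree.ElementTree as ET
> > >
> > > def count_records(lines):
> > >     # one partition == one small file: join its lines and parse them together
> > >     root = ET.fromstring("\n".join(lines))
> > >     return [len(list(root))]   # e.g. number of top-level elements in the file
> > >
> > > counts = sc.textFile("xmldir").mapPartitions(count_records)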
> > >
> > > Matei
> > >
> > > On Mar 17, 2014, at 9:52 AM, Diana Carroll <dcarr...@cloudera.com>
> wrote:
> > >
> > >> Thanks, Krakna, very helpful.  The way I read the code, it looks like
> you are assuming that each line in foo.json contains a complete JSON
> object?  (That is, that the data doesn't contain any records that are
> split across multiple lines.)  If so, is that because you know that to be
> true of your data?  Or did you do as Nicholas suggested and run some
> preprocessing on the text input to flatten the data in that way?
> > >>
> > >> Thanks,
> > >> Diana
> > >>
> > >>
> > >> On Mon, Mar 17, 2014 at 12:09 PM, Krakna H <shankark+...@gmail.com>
> wrote:
> > >> Katrina,
> > >>
> > >> Not sure if this is what you had in mind, but here's some simple
> pyspark code that I recently wrote to deal with JSON files.
> > >>
> > >> from pyspark import SparkContext, SparkConf
> > >> from operator import add
> > >> import json
> > >> import random
> > >> import numpy as np
> > >>
> > >>
> > >> def concatenate_paragraphs(sentence_array):
> > >>     return ' '.join(sentence_array).split(' ')
> > >>
> > >>
> > >> logFile = 'foo.json'
> > >> conf = SparkConf()
> > >> conf.setMaster("spark://cluster-master:7077").setAppName("example").set("spark.executor.memory", "1g")
> > >> sc = SparkContext(conf=conf)
> > >>
> > >> logData = sc.textFile(logFile).cache()
> > >> num_lines = logData.count()
> > >> print 'Number of lines: %d' % num_lines
> > >>
> > >> # JSON object has the structure: {"key": {'paragraphs': [sentence1, sentence2, ...]}}
> > >> tm = logData.map(lambda s: (json.loads(s)['key'],
> > >>                             len(concatenate_paragraphs(json.loads(s)['paragraphs']))))
> > >> tm = tm.reduceByKey(add)
> > >>
> > >> op = tm.collect()
> > >> for key, num_words in op:
> > >>     print 'state: %s, num_words: %d' % (key, num_words)
> > >>
> > >> On Mon, Mar 17, 2014 at 11:58 AM, Diana Carroll [via Apache Spark
> User List] <[hidden email]> wrote:
> > >> I don't actually have any data.  I'm writing a course that teaches
> students how to do this sort of thing and am interested in looking at a
> variety of real life examples of people doing things like that.  I'd love
> to see some working code implementing the "obvious work-around" you
> mention...do you have any to share?  It's an approach that makes a lot of
> sense, and as I said, I'd love to not have to re-invent the wheel if
> someone else has already written that code.  Thanks!
> > >>
> > >> Diana
> > >>
> > >>
> > >> On Mon, Mar 17, 2014 at 11:35 AM, Nicholas Chammas <[hidden email]>
> wrote:
> > >> There was a previous discussion about this here:
> > >>
> > >>
> http://apache-spark-user-list.1001560.n3.nabble.com/Having-Spark-read-a-JSON-file-td1963.html
> > >>
> > >> How big are the XML or JSON files you're looking to deal with?
> > >>
> > >> It may not be practical to deserialize the entire document at once.
> In that case an obvious work-around would be to have some kind of
> pre-processing step that separates XML nodes/JSON objects with newlines so
> that you can analyze the data with Spark in a "line-oriented format". Your
> preprocessor wouldn't have to parse/deserialize the massive document; it
> would just have to track open/closed tags/braces to know when to insert a
> newline.
> > >>
> > >> Then you'd just open the line-delimited result and deserialize the
> individual objects/nodes with map().
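> > >>
> > >> A rough sketch of such a preprocessor for the JSON case (only a sketch;
> > >> it assumes the input is a series of concatenated top-level objects, not
> > >> one big array):
> > >>
> > >> def add_newlines_between_objects(text):
> > >>     # Track brace depth, staying aware of quoted strings, and emit a
> > >>     # newline each time a top-level object closes.
> > >>     out = []
> > >>     depth = 0
> > >>     in_string = False
> > >>     escaped = False
> > >>     for ch in text:
> > >>         out.append(ch)
> > >>         if in_string:
> > >>             if escaped:
> > >>                 escaped = False
> > >>             elif ch == '\\':
> > >>                 escaped = True
> > >>             elif ch == '"':
> > >>                 in_string = False
> > >>         elif ch == '"':
> > >>             in_string = True
> > >>         elif ch == '{':
> > >>             depth += 1
> > >>         elif ch == '}':
> > >>             depth -= 1
> > >>             if depth == 0:
> > >>                 out.append('\n')
> > >>     return ''.join(out)
> > >>
> > >> After writing the preprocessed text back out, sc.textFile(...).map(json.loads)
> > >> gives you one deserialized object per line.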
> > >>
> > >> Nick
> > >>
> > >>
> > >> On Mon, Mar 17, 2014 at 11:18 AM, Diana Carroll <[hidden email]>
> wrote:
> > >> Has anyone got a working example of a Spark application that analyzes
> data in a non-line-oriented format, such as XML or JSON?  I'd like to do
> this without re-inventing the wheel...anyone care to share?  Thanks!
> > >>
> > >> Diana
> > >>
> > >>
> > >>
> > >>
> > >>
> > >
> > >
> >
> >
>
>
