On Tue, Sep 9, 2014 at 9:56 AM, Oleg Ruchovets <oruchov...@gmail.com> wrote:
> Hi,
>
> I come from a map/reduce background and am trying to do a quite trivial
> thing:
>
> I have a lot of files (on hdfs) in this format:
>
>     1, 2, 3
>     2, 3, 5
>     1, 3, 5
>     2, 3, 4
>     2, 5, 1
>
> I actually need to group by key (the first column):
>
>     key   values
>     1 --> (2,3),(3,5)
>     2 --> (3,5),(3,4),(5,1)
>
> and pass the values to my custom function f. The outcome of f() should be
> written to hdfs with the corresponding key:
>
>     1 --> f() outcome
>     2 --> f() outcome
>
> My code is:
>
>     def doSplit(x):
>         y = x.split(',')
>         if len(y) == 3:
>             return y[0], (y[1], y[2])
>
>     lines = sc.textFile(filename, 1)
>     counts = lines.map(doSplit).groupByKey()
>     output = counts.collect()
>
>     for (key, value) in output:
>         print 'build model for key ->', key
>         print value
>         f(str(key), value)
>
> Questions:
>
> 1) lines.map(doSplit).groupByKey() - I didn't find an option like
> groupByKey(f()) to process the grouped values. How can I process the
> grouped keys with a custom function? Function f has some non-trivial
> logic.
The result of groupByKey() is still an RDD of (key, ResultIterable(values)),
so you can continue to call map() or mapValues() on it:

    lines.map(doSplit).groupByKey().map(f)

But your `f` needs two parameters, while map() assumes that `f` takes a
single parameter, so you need to build a wrapper around `f`:

    lines.map(doSplit).groupByKey().map(lambda (k, vs): f(k, vs))

If `f` only accepts the values as a list, then you also need to convert
`vs` into a list:

    result = lines.map(doSplit).groupByKey().map(lambda (k, vs): f(k, list(vs)))

Finally, you can save `result` into HDFS:

    result.saveAsPickleFile(path, batch=1024)

> 2) Using output (I really don't like this approach) to pass values to the
> function looks not scalable and is executed only on one machine. What is
> the way to process grouped keys with PySpark in a distributed fashion,
> with multiprocessing and on different machines of the cluster?

The pipeline above already does that: map(), groupByKey() and the final
map() all run distributed across the cluster. It is collect() that pulls
everything back to a single machine (the driver), so avoid it for large
results.

> 3) In the case of processing output, how can the data be stored on hdfs?

Currently, it's not easy to access files in HDFS directly, but you could do
it with:

    sc.parallelize(local_data).map(str).saveAsTextFile(path)

> Thanks
> Oleg.
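For completeness, here is a minimal end-to-end sketch of the whole pipeline.
In it, `filename`, `output_path` and the body of f() are placeholders for
your own, and `sc` is the SparkContext from the pyspark shell; doSplit()
returns None for malformed lines, so they are filtered out before grouping:

    def doSplit(x):
        # (key, (v1, v2)) for well-formed lines, None otherwise
        y = [s.strip() for s in x.split(',')]
        if len(y) == 3:
            return y[0], (y[1], y[2])

    def f(key, values):
        # placeholder for your custom per-key logic; `values` is a plain list
        return '%s -> %d pairs' % (key, len(values))

    lines = sc.textFile(filename)
    result = (lines.map(doSplit)
                   .filter(lambda kv: kv is not None)  # drop malformed lines
                   .groupByKey()                       # (key, ResultIterable)
                   .map(lambda kv: f(kv[0], list(kv[1]))))
    result.saveAsTextFile(output_path)

saveAsTextFile() writes one line of str(...) output per record, which is
fine when f() returns strings; if f() returns arbitrary Python objects, use
saveAsPickleFile() as above instead.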