On Tue, Sep 9, 2014 at 9:56 AM, Oleg Ruchovets <oruchov...@gmail.com> wrote:
> Hi ,
>
>    I came from map/reduce background and try to do quite trivial thing:
>
> I have a lot of files ( on hdfs ) - format is :
>
>    1 , 2 , 3
>    2 , 3 , 5
>    1 , 3,  5
>     2, 3 , 4
>     2 , 5, 1
>
>   I am actually need to group by key (first column) :
>   key   values
>   1 --> (2,3),(3,5)
>   2 --> (3,5),(3,4),(5,1)
>
>   and I need to process (pass)  values to the function f ( my custom
> function)
>   outcome of  function f()  should be  to hdfs with corresponding key:
>     1 --> f() outcome
>     2 --> f() outcome.
>
> My code is :
>
>       def doSplit(x):
>         y = x.split(',')
>         if(len(y)==3):
>            return  y[0],(y[1],y[2])
>
>
>     lines = sc.textFile(filename,1)
>     counts = lines.map(doSplit).groupByKey()
>     output = counts.collect()
>
>     for (key, value) in output:
>         print 'build model for key ->' , key
>         print value
>         f(str(key) , value))
>
>
> Questions:
>    1) lines.map(doSplit).groupByKey() - I didn't  find the option to use
> groupByKey( f() ) to process grouped values? how can I process grouped keys
> by custom function? function f has some not trivial logic.

The result of groupByKey() is still RDD with (key, ResultIterable(values)),
so you can continue to call map() or mapValues() on it:

lines.map(doSplit).groupByKey().map(f)

But your `f` need two parameters, the map() will assume that `f`
take one parameter, so you need to build a wrapper for `f`:

lines.map(doSplit).groupByKey().map(lambda (k, vs): f(k, vs))

If the `f` only accept values as list, then you need to convert `vs` into list:

result = lines.map(doSplit).groupByKey().map(lambda (k, vs): f(k, list(vs)))

finally, you could save the `result` into HDFS:

result.saveAsPickleFile(path, batch=1024)

>     2) Using output ( I really don't like this approach )  to pass to
> function looks like not scalable and executed only on one machine?  What is
> the way using PySpark process grouped keys in distributed fashion.
> Multiprocessing and on different machine of the cluster.
>
> 3)In case of  processing output how data can be stored on hdfs?

Currently, it's not easy to access files in HDFS, you could do it by

sc.parallelize(local_data).map(str).saveAsTextFile()

> Thanks
> Oleg.
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Reply via email to