On Sun, Aug 17, 2014 at 11:21 AM, Chengi Liu <chengi.liu...@gmail.com> wrote:
> Hi,
>   Thanks for the response.
> In the second case, f2,
> foo will have to be declared globally, right?
>
> My function is something like:
> def indexing(splitIndex, iterator):
>   count = 0
>   offset = sum(offset_lists[:splitIndex]) if splitIndex else 0
>   indexed = []
>   for i, e in enumerate(iterator):
>     index = count + offset + i
>     for j, ele in enumerate(e):
>       indexed.append((index, j, ele))
>   yield indexed

In this function `indexing`, `offset_lists` should be global.

> def another_funct(offset_lists):
>     #get that damn offset_lists
>     rdd.mapPartitionsWithSplit(indexing)
> But then, the issue is how indexing gets access to offset_lists.
> Any suggestions?

Basically, you can do what you would do in a normal Python program:
PySpark sends the global variables and closures that your function
refers to over to the worker processes automatically.

So you can pass it as an extra argument through a lambda:

def indexing(splitIndex, iterator, offset_lists):
    pass

def another_func(offset_lists):
    rdd.mapPartitionsWithSplit(
        lambda index, it: indexing(index, it, offset_lists))

Or:

def indexing(splitIndex, iterator):
    # access offset_lists
    pass

def another_func(offset):
    global offset_lists
    offset_lists = offset
    rdd.mapPartitionsWithSplit(indexing)

Or:

def another_func(offset_lists):
    def indexing(index, iterator):
        # access offset_lists
        pass
    rdd.mapPartitionsWithIndex(indexing)
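
Another way to bind the extra argument, which should behave like the
lambda version, is functools.partial. Below is a minimal sketch that
runs without Spark. Note the assumptions: map_partitions_with_index is
a hypothetical local stand-in for rdd.mapPartitionsWithIndex(f).collect(),
the sample partitions are made up, and unlike the original function this
version yields tuples one at a time instead of one list per partition.

```python
from functools import partial
from itertools import chain


def indexing(split_index, iterator, offset_lists):
    """Yield (row_index, column_index, element) for each element of a partition."""
    # The row counts of all earlier partitions give this partition's starting row.
    offset = sum(offset_lists[:split_index])
    for i, row in enumerate(iterator):
        for j, ele in enumerate(row):
            yield (offset + i, j, ele)


def map_partitions_with_index(partitions, f):
    """Hypothetical stand-in: call f(index, iterator) per partition and flatten."""
    return list(chain.from_iterable(f(i, iter(p)) for i, p in enumerate(partitions)))


# Made-up data: two partitions holding 2 rows and 1 row respectively.
partitions = [[["a", "b"], ["c"]], [["d", "e"]]]
offset_lists = [len(p) for p in partitions]

# partial fixes the third argument, leaving the (index, iterator) signature
# that the callback is expected to have.
result = map_partitions_with_index(
    partitions, partial(indexing, offset_lists=offset_lists))
```

With a real RDD the call would presumably be
rdd.mapPartitionsWithIndex(partial(indexing, offset_lists=offset_lists)),
assuming the partial object is serialized like any other closure.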


>
> On Sun, Aug 17, 2014 at 11:15 AM, Davies Liu <dav...@databricks.com> wrote:
>>
>> The callback function f only accepts 2 arguments. If you want to pass
>> another object to it, you need a closure, such as:
>>
>> foo=xxx
>> def f(index, iterator, foo):
>>      yield (index, foo)
>> rdd.mapPartitionsWithIndex(lambda index, it: f(index, it, foo))
>>
>> You can also make f itself a closure:
>>
>> def f2(index, iterator):
>>     yield (index, foo)
>> rdd.mapPartitionsWithIndex(f2)
>>
>> On Sun, Aug 17, 2014 at 10:25 AM, Chengi Liu <chengi.liu...@gmail.com>
>> wrote:
>> > Hi,
>> >   In this example:
>> >
>> > http://www.cs.berkeley.edu/~pwendell/strataconf/api/pyspark/pyspark.rdd.RDD-class.html#mapPartitionsWithSplit
>> > Let's say f takes three arguments:
>> > def f(splitIndex, iterator, foo): yield splitIndex
>> > Now, how do I send this foo parameter to this method?
>> > rdd.mapPartitionsWithSplit(f)
>> > Thanks
>> >
>
>
