On Sun, Aug 17, 2014 at 11:21 AM, Chengi Liu <chengi.liu...@gmail.com> wrote:
> Hi,
>   Thanks for the response..
> In the second case, f2, foo will have to be declared globally, right?
>
> My function is something like:
>
> def indexing(splitIndex, iterator):
>     count = 0
>     offset = sum(offset_lists[:splitIndex]) if splitIndex else 0
>     indexed = []
>     for i, e in enumerate(iterator):
>         index = count + offset + i
>         for j, ele in enumerate(e):
>             indexed.append((index, j, ele))
>     yield indexed
> In this function `indexing`, `offset_lists` should be global.
>
> def another_funct(offset_lists):
>     # get that damn offset_lists
>     rdd.mapPartitionsWithSplit(indexing)
>
> But then, the issue is: how does `indexing` get that offset_lists?
> Any suggestions?

Basically, you can do what you do in a normal Python program; PySpark will
send the global variables or closures to the worker processes automatically.
So you can:

def indexing(splitIndex, iterator, offset_lists):
    pass

def another_func(offset_lists):
    rdd.mapPartitionsWithSplit(lambda index, it: indexing(index, it, offset_lists))

Or:

def indexing(splitIndex, iterator):
    # access offset_lists
    pass

def another_func(offset):
    global offset_lists
    offset_lists = offset
    rdd.mapPartitionsWithSplit(indexing)

Or:

def another_func(offset_lists):
    def indexing(index, iterator):
        # access offset_lists
        pass
    rdd.mapPartitionsWithIndex(indexing)

> On Sun, Aug 17, 2014 at 11:15 AM, Davies Liu <dav...@databricks.com> wrote:
>>
>> The callback function f only accepts 2 arguments; if you want to pass
>> other objects to it, you need a closure, such as:
>>
>> foo = xxx
>> def f(index, iterator, foo):
>>     yield (index, foo)
>> rdd.mapPartitionsWithIndex(lambda index, it: f(index, it, foo))
>>
>> Also, you can make f a closure (here f2):
>>
>> def f2(index, iterator):
>>     yield (index, foo)
>> rdd.mapPartitionsWithIndex(f2)
>>
>> On Sun, Aug 17, 2014 at 10:25 AM, Chengi Liu <chengi.liu...@gmail.com> wrote:
>> > Hi,
>> >   In this example:
>> > http://www.cs.berkeley.edu/~pwendell/strataconf/api/pyspark/pyspark.rdd.RDD-class.html#mapPartitionsWithSplit
>> > Let's say f takes three arguments:
>> > def f(splitIndex, iterator, foo): yield splitIndex
>> > Now, how do I send this foo parameter to this method?
>> > rdd.mapPartitionsWithSplit(f)
>> > Thanks
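
For reference, a minimal, self-contained sketch of the closure approach above
applied to the indexing problem from this thread. The sample data, the local
SparkContext, and the counting pass that builds offset_lists are assumptions
for illustration; only the indexing/offset_lists names come from the thread,
and mapPartitionsWithIndex is used in place of the deprecated
mapPartitionsWithSplit:

from pyspark import SparkContext

sc = SparkContext("local[2]", "indexing-example")

# Hypothetical sample data: an RDD of rows, split across 2 partitions.
rdd = sc.parallelize([["a", "b"], ["c"], ["d", "e"], ["f"]], 2)

# First pass (assumed): count the rows in each partition, so each partition
# knows how many rows come before it globally.
offset_lists = rdd.mapPartitions(lambda it: [sum(1 for _ in it)]).collect()

def indexing(splitIndex, iterator, offset_lists):
    # offset = total number of rows in all earlier partitions
    offset = sum(offset_lists[:splitIndex]) if splitIndex else 0
    for i, e in enumerate(iterator):
        for j, ele in enumerate(e):
            yield (offset + i, j, ele)

# Capture offset_lists in the lambda's closure; PySpark serializes it and
# ships it to the worker processes along with the function.
indexed = rdd.mapPartitionsWithIndex(
    lambda index, it: indexing(index, it, offset_lists))

print(indexed.collect())
# -> [(0, 0, 'a'), (0, 1, 'b'), (1, 0, 'c'), (2, 0, 'd'), (2, 1, 'e'), (3, 0, 'f')]

The global-variable and nested-function variants shown earlier in the thread
work the same way; the closure version simply keeps offset_lists out of the
module's global namespace.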