collect() brings the entire RDD back to the driver, which is costly. Instead of collect() + parallelize, you could call rdd1.checkpoint() followed by a cheaper action such as rdd1.count(): checkpointing truncates the lineage, and the count materializes it without moving any data to the driver. You can do this inside the for loop.
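Something along these lines, as a minimal sketch against the second code snippet below (untested; the checkpoint directory path, the interval of 50, and the use of Java 8 lambdas against the Spark Java API are my assumptions, so adjust to your setup):

    // placeholder path: any reliable filesystem visible to all executors
    jsc.setCheckpointDir("hdfs:///tmp/spark-checkpoints");

    JavaRDD<Integer> r = jsc.parallelize(list, 1).cache();
    for (int i = 0; i < m; ++i) {
        r = r.map(x -> {
            double a = Math.random() * 2 - 1;
            double b = Math.random() * 2 - 1;
            return (a * a + b * b < 1) ? 1 : 0;
        }).cache();

        if (i % 50 == 0) {   // checkpoint periodically, not on every pass
            r.checkpoint();  // marks r; its lineage is truncated once the
                             // RDD is materialized
            r.count();       // cheap action that triggers the checkpoint
                             // without shipping any data to the driver
        }
    }
    long total = r.count();  // a single action at the very end

Because the lineage never grows beyond the checkpoint interval, this should also avoid the java.lang.StackOverflowError you hit after ~350 chained maps.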
Hopefully you are using the Kryo serializer already.

Regards
Sab

On Mon, Dec 21, 2015 at 5:51 PM, Zhiliang Zhu <zchl.j...@yahoo.com.invalid> wrote:

> Dear All,
>
> I have some kind of iteration job, that is, the next stage's input is
> the previous stage's output, and it must run quite a lot of iterations.
>
>     JavaRDD<T> rdd1 = ....; // rdd1 may have one or more partitions
>     JavaRDD<T> rdd2 = rdd1;
>     for (int i = 0; i < N; ++i) {
>         JavaRDD<T> rdd3 = rdd2.map(new MapName1(...)); // 1
>         JavaRDD<T> rdd4 = rdd3.map(new MapName2(...)); // 2
>
>         // however, N is very big, so this collect is VERY COSTLY
>         List<T> list = rdd4.collect();
>         rdd2 = jsc.parallelize(list, M).cache();
>     }
>
> Is there a way to properly improve the run speed?
>
> In fact, I would like to apply Spark to mathematical optimization by a
> genetic algorithm. In the code above, the RDD holds vector lines of the
> form <Y, x1, x2, ..., xn>; map // 1 computes the fitness number, and
> map // 2 breeds and mutates.
> To get a good solution, the iteration count will be big (for example,
> more than 1000) ...
>
> Thanks in advance!
> Zhiliang
>
>
> On Monday, December 21, 2015 7:44 PM, Zhiliang Zhu
> <zchl.j...@yahoo.com.INVALID> wrote:
>
> Dear All,
>
> I need to iterate some job / RDD quite a lot of times, but I am stuck on
> the problem that Spark only accepts around 350 chained map calls before
> it meets one action Function; besides, dozens of actions obviously
> increase the run time.
> Is there any proper way ...
>
> As tested, here is a piece of the code:
>
>     ......
>     int count = 0;
>     // with only 1 partition
>     JavaRDD<Integer> dataSet = jsc.parallelize(list, 1).cache();
>     int m = 350;
>     JavaRDD<Integer> r = dataSet.cache();
>     JavaRDD<Integer> t = null;
>
>     // outer loop to periodically replace the RDD r with t
>     for (int j = 0; j < m; ++j) {
>         if (null != t) {
>             r = t;
>         }
>         // inner loop to call map 350 times; if m is much more than 350
>         // (for instance, around 400), the job throws:
>         // "15/12/21 19:36:17 ERROR yarn.ApplicationMaster: User class
>         // threw exception: java.lang.StackOverflowError"
>         for (int i = 0; i < m; ++i) {
>             r = r.map(new Function<Integer, Integer>() {
>                 @Override
>                 public Integer call(Integer integer) {
>                     double x = Math.random() * 2 - 1;
>                     double y = Math.random() * 2 - 1;
>                     return (x * x + y * y < 1) ? 1 : 0;
>                 }
>             });
>         }
>
>         // collect this RDD to seed another one; however, dozens of
>         // actions such as collect are VERY COSTLY
>         List<Integer> lt = r.collect();
>         t = jsc.parallelize(lt, 1).cache();
>     }
>     ......
>
> Thanks very much in advance!
> Zhiliang

--
Architect - Big Data
Ph: +91 99805 99458
Manthan Systems | Company of the year - Analytics (2014 Frost and Sullivan India ICT)