Thought about it some more, and simplified the problem space for discussion:
Given:

    JavaPairRDD<String, Integer> c1;  // c1.count() == 8000

Goal:

    JavaPairRDD<Tuple2<String, Integer>, Tuple2<String, Integer>> c2;  // all lexicographically ordered pairs

Where c2 holds every ordered combination of elements of c1 whose first keys compare lexicographically:

    (c1_i._1().compareTo(c1_j._1()) < 0) ->
        new Tuple2<Tuple2<String, Integer>, Tuple2<String, Integer>>(c1_i, c1_j);
    // for all c1_i < c1_j in c1

Not sure how to efficiently generate c2. c1.cartesian(c1).filter(...) with the comparison above was just terrible performance-wise.

Thanks, -Dan
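P.S. Spelled out with full types, that cartesian attempt is roughly the following (a minimal sketch, not my exact code):

    // Builds all ~N^2 (~64M) candidate pairs and then discards roughly half
    // of them -- presumably why it performed so badly.
    JavaPairRDD<Tuple2<String, Integer>, Tuple2<String, Integer>> c2 =
        c1.cartesian(c1)
          .filter((Tuple2<Tuple2<String, Integer>, Tuple2<String, Integer>> p) ->
              p._1()._1().compareTo(p._2()._1()) < 0);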
On Thu, Apr 30, 2015 at 11:58 AM, Dan DeCapria, CivicScience
<dan.decap...@civicscience.com> wrote:

> I am trying to generate all N(N-1)/2 lexicographic 2-tuples from a
> glom()'d JavaRDD<List<Tuple2<AQ, Integer>>>. The construction of the
> initial Tuple2 space JavaPairRDD<AQ, Integer> is well formed from the case
> classes I provide it (AQ, AQV, AQQ, CT) and is performant; minimized code:
>
>     SparkConf conf = new SparkConf()
>             .setAppName("JavaTestMain")
>             .set("spark.driver.maxResultSize", "0")
>             .set("spark.akka.frameSize", "512");
>     ...
>     JavaRDD<AQV> aqv = sc.textFile(data_account_question_value_file)
>             .map((String line) -> {
>         String[] s = line.trim().split(",");
>         AQV key = new AQV(Integer.parseInt(s[0].trim()), s[1].trim(),
>                 s[2].trim()); // (0,1,2)
>         return key;
>     });
>     JavaPairRDD<AQ, Integer> c0 = aqv.distinct().mapToPair((AQV c) -> {
>         return new Tuple2<AQ, Integer>(new AQ(c.getAccount(),
>                 c.getQuestion()), 1);
>     });
>     JavaPairRDD<AQ, Integer> c1 = c0.reduceByKey((Integer i1, Integer i2) -> {
>         return (i1 + i2);
>     });
>     logger.info(c1.count());
>
> This code snippet works well and returns JavaTestMain: 8010 in a few
> seconds, which is perfect. But when I try to generate the full
> lexicographic combination space (32,076,045 elements), it is not
> performant and throws java.lang.OutOfMemoryError: Java heap space;
> minimized code continued:
>
>     JavaRDD<List<Tuple2<AQ, Integer>>> c2 = c1.glom();
>     JavaRDD<CT> c3 = c2.flatMap((List<Tuple2<AQ, Integer>> cl) -> {
>         List<CT> output = new ArrayList<>((cl.size() - 1) * cl.size() / 2);
>         Tuple2<AQ, Integer> key1, key2;
>         for (ListIterator<Tuple2<AQ, Integer>> cloit = cl.listIterator();
>                 cloit.hasNext(); ) { // outer loop
>             key1 = cloit.next();
>             for (ListIterator<Tuple2<AQ, Integer>> cliit =
>                     cl.listIterator(cloit.nextIndex()); cliit.hasNext(); ) { // inner loop
>                 key2 = cliit.next();
>                 output.add(new CT(new AQQ(key1._1(), key2._1()),
>                         key1._2(), key2._2()));
>             }
>         }
>         return output;
>     });
>     c3.collect().stream().forEach(System.out::println);
>
>     15/04/30 11:29:59 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 4)
>     java.lang.OutOfMemoryError: Java heap space
>         at java.util.Arrays.copyOf(Arrays.java:3236)
>         at java.io.ByteArrayOutputStream.toByteArray(ByteArrayOutputStream.java:191)
>         at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:82)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
>     15/04/30 11:29:59 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-0,5,main]
>     java.lang.OutOfMemoryError: Java heap space
>         (same stack trace as above)
>     15/04/30 11:29:59 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 4, localhost): java.lang.OutOfMemoryError: Java heap space
>         (same stack trace as above)
>
> Any thoughts here? I would like a well-formed parallel approach in which
> the unit of work for each iterated pointer into the List is pairing its
> element with the elements that follow it. Put another way: for each
> position in the List (pointer_i -> element_i), the sublist of size (N - i)
> following element_i is the only work required at pointer_i, for all N
> pointers, so this pairing process should be parallelizable over N. I am
> new to Spark, so it's very possible I'm glossing over something glaringly
> apparent that is causing the framework's parallelism gains to break down.
>
> Thanks, -Dan
>
>
> --
> Dan DeCapria
> CivicScience, Inc.
> Back-End Data IS/BI/DM/ML Specialist
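P.P.S. To make that per-pointer unit of work concrete, here is a minimal sketch of the shape I have in mind, stated against the simplified String-keyed c1 above. It assumes the ~8000 distinct elements fit comfortably on the driver so they can be collected and broadcast once; the names (items, all, positions) and the broadcast-plus-index framing are illustrative, untested code:

    import org.apache.spark.broadcast.Broadcast;  // plus the usual java.util and scala.Tuple2 imports

    // Collect the (small) element list once, in lexicographic key order.
    List<Tuple2<String, Integer>> items = c1.sortByKey().collect();

    // Broadcast it so every task can read the list without it being reshipped.
    final Broadcast<List<Tuple2<String, Integer>>> all = sc.broadcast(items);

    // Parallelize over the N positions; task i builds only the pairs
    // (element_i, element_j) for j > i -- the per-pointer unit of work.
    List<Integer> positions = new ArrayList<>();
    for (int i = 0; i < items.size(); i++) {
        positions.add(i);
    }

    JavaRDD<Tuple2<Tuple2<String, Integer>, Tuple2<String, Integer>>> c2 =
        sc.parallelize(positions, sc.defaultParallelism()).flatMap((Integer i) -> {
            List<Tuple2<String, Integer>> xs = all.value();
            List<Tuple2<Tuple2<String, Integer>, Tuple2<String, Integer>>> out =
                    new ArrayList<>(xs.size() - 1 - i);
            Tuple2<String, Integer> left = xs.get(i);
            for (int j = i + 1; j < xs.size(); j++) {
                out.add(new Tuple2<>(left, xs.get(j)));
            }
            return out;  // a List is an Iterable, which flatMap expects here
        });

Even then, c2 still holds ~32M pairs, so collecting it back to the driver (as c3.collect() does above) would presumably hit the same heap wall; I'd keep it distributed and write it out, e.g. with c2.saveAsTextFile(...), rather than collect it.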