I thought about it some more and simplified the problem space for
discussion:

Given: JavaPairRDD<String, Integer> c1; // c1.count() == 8000.

Goal: JavaPairRDD<Tuple2<String,Integer>, Tuple2<String,Integer>> c2; // all
lexicographically ordered pairs
Where: for all c1_i, c1_j in c1 with (c1_i._1().compareTo(c1_j._1()) < 0),
c2 contains new Tuple2<Tuple2<String,Integer>, Tuple2<String,Integer>>(c1_i, c1_j);
// i.e., each unordered pair of distinct keys appears exactly once, with the
// lexicographically smaller key first (e.g., keys a < b < c yield (a,b), (a,c), (b,c))

I'm not sure how to generate c2 efficiently.
c1.cartesian(c1).filter(... c1_i._1().compareTo(c1_j._1()) < 0 ...) was
just terrible performance-wise.
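
One idea I haven't tried yet: since c1 is only ~8000 entries, collect and
broadcast it once, then have each element emit only the pairs whose right-hand
key compares strictly greater. A minimal, untested sketch (assuming sc is the
JavaSparkContext and Broadcast is org.apache.spark.broadcast.Broadcast):

// Untested sketch: broadcast the small key space once, then each element
// emits only the pairs (left, right) with left's key strictly before right's.
final Broadcast<List<Tuple2<String, Integer>>> bAll =
        sc.broadcast(new ArrayList<>(c1.collect()));

JavaPairRDD<Tuple2<String, Integer>, Tuple2<String, Integer>> c2 =
        c1.flatMapToPair((Tuple2<String, Integer> left) -> {
            List<Tuple2<Tuple2<String, Integer>, Tuple2<String, Integer>>> out =
                    new ArrayList<>();
            for (Tuple2<String, Integer> right : bAll.value()) {
                if (left._1().compareTo(right._1()) < 0) {
                    out.add(new Tuple2<>(left, right));
                }
            }
            return out;
        });

The broadcast is only ~8000 Tuple2s, so it should be cheap to ship to the
executors, and the pair generation stays distributed instead of filtering a
full 64,000,000-element cartesian product.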

Thanks, -Dan




On Thu, Apr 30, 2015 at 11:58 AM, Dan DeCapria, CivicScience <dan.decap...@civicscience.com> wrote:

> I am trying to generate all (N-1)(N)/2 lexicographic 2-tuples from a glom()'d
> JavaPairRDD<AQ, Integer> (i.e. a JavaRDD<List<Tuple2<AQ, Integer>>>).  The
> construction of the initial JavaPairRDD<AQ, Integer> space from the case
> classes I provide (AQ, AQV, AQQ, CT) is well formed and performant; minimized
> code:
>
> SparkConf conf = new SparkConf()
>         .setAppName("JavaTestMain")
>         .set("spark.driver.maxResultSize", "0")
>         .set("spark.akka.frameSize", "512");
> ...
> JavaRDD<AQV> aqv = sc.textFile(data_account_question_value_file).map((String line) -> {
>     String[] s = line.trim().split(",");
>     AQV key = new AQV(Integer.parseInt(s[0].trim()), s[1].trim(), s[2].trim()); // (0,1,2)
>     return key;
> });
> JavaPairRDD<AQ, Integer> c0 = aqv.distinct().mapToPair((AQV c) -> {
>         return new Tuple2<AQ, Integer>(new AQ(c.getAccount(), c.getQuestion()), 1);
> });
> JavaPairRDD<AQ, Integer> c1 = c0.reduceByKey((Integer i1, Integer i2) -> {
>         return (i1 + i2);
> });
> logger.info(c1.count());
>
> The code snippet above works well and logs JavaTestMain: 8010 in a few
> seconds, which is perfect.  When I try to generate the lexicographic pair
> space (32,076,045 = 8010 * 8009 / 2 elements), it is not performant and
> throws java.lang.OutOfMemoryError: Java heap space; minimized code
> continued:
>
> JavaRDD<List<Tuple2<AQ, Integer>>> c2 = c1.glom();
> JavaRDD<CT> c3 = c2.flatMap((List<Tuple2<AQ, Integer>> cl) -> {
>         List<CT> output = new ArrayList<>((cl.size() - 1) * (cl.size()) / 2);
>         Tuple2<AQ, Integer> key1, key2;
>         // outer loop over each element of the partition's list
>         for (ListIterator<Tuple2<AQ, Integer>> cloit = cl.listIterator(); cloit.hasNext(); ) {
>             key1 = cloit.next();
>             // inner loop over the elements that follow key1, if any
>             for (ListIterator<Tuple2<AQ, Integer>> cliit = cl.listIterator(cloit.nextIndex()); cliit.hasNext(); ) {
>                 key2 = cliit.next();
>                 output.add(new CT(new AQQ(key1._1(), key2._1()), key1._2(), key2._2()));
>             }
>         }
>         return output;
> });
> c3.collect().stream().forEach(System.out::println);
>
>
> 15/04/30 11:29:59 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 4)
> java.lang.OutOfMemoryError: Java heap space
> at java.util.Arrays.copyOf(Arrays.java:3236)
> at java.io.ByteArrayOutputStream.toByteArray(ByteArrayOutputStream.java:191)
> at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:82)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> 15/04/30 11:29:59 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-0,5,main]
> java.lang.OutOfMemoryError: Java heap space
> at java.util.Arrays.copyOf(Arrays.java:3236)
> at java.io.ByteArrayOutputStream.toByteArray(ByteArrayOutputStream.java:191)
> at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:82)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> 15/04/30 11:29:59 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 4, localhost): java.lang.OutOfMemoryError: Java heap space
> at java.util.Arrays.copyOf(Arrays.java:3236)
> at java.io.ByteArrayOutputStream.toByteArray(ByteArrayOutputStream.java:191)
> at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:82)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
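>
> If it's the collect() of all ~32 million CT objects back to the driver that
> is blowing things up, I could presumably keep the results on the executors
> instead of collecting them; untested alternatives to the last line, where
> output_path is just a placeholder:
>
> logger.info(c3.count());          // materialize and count only, or
> c3.saveAsTextFile(output_path);   // write the pairs out from the executors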
>
>
> Any thoughts here? I would like a well-formed parallel approach in which the
> unit of work for each position i in the List is pairing element_i with the
> elements that follow it. Since each position's unit of work is independent of
> the others, this lexicographic pair generation should be parallelizable over
> N. I am new to Spark, so it's very possible I'm glossing over something
> glaringly apparent which is causing the parallelism gains of the framework to
> break down.
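>
> For concreteness, here is an untested sketch of the shape I have in mind:
> index every element once with zipWithIndex(), cache it, broadcast the (small)
> indexed list, and let position i pair only with the positions after it (bcAll
> is just a placeholder name):
>
> // Untested sketch of the intended per-position unit of work.
> JavaPairRDD<Tuple2<AQ, Integer>, Long> indexed = c1.zipWithIndex().cache();
> final Broadcast<List<Tuple2<Tuple2<AQ, Integer>, Long>>> bcAll =
>         sc.broadcast(new ArrayList<>(indexed.collect()));
>
> JavaRDD<CT> pairs = indexed.flatMap((Tuple2<Tuple2<AQ, Integer>, Long> left) -> {
>         List<CT> out = new ArrayList<>();
>         for (Tuple2<Tuple2<AQ, Integer>, Long> right : bcAll.value()) {
>             if (left._2() < right._2()) {  // pair only with later positions
>                 out.add(new CT(new AQQ(left._1()._1(), right._1()._1()),
>                                left._1()._2(), right._1()._2()));
>             }
>         }
>         return out;
> });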
>
> Thanks, -Dan
>
>
>


-- 
Dan DeCapria
CivicScience, Inc.
Back-End Data IS/BI/DM/ML Specialist
