Hi All-
I am very new to Spark (and Scala), but I was hoping to get some help
creating a distributed job. Essentially I have a set of files in HDFS
(about 8,000 files at ~300 MB each) that were created in Hive with the form:
Int
Array<Map<int,string>>

I want to create a Spark job that reads these files and produces every
pairwise combination of the ints in each record's array. Meaning it iterates
over every int in the Array<Map<int,string>> (ignoring the strings) and
emits a [k,v] pair of ((int_1, int_2), 1). It then reduces by the key and
sums the values to produce a final result over the large dataset of:

(int_1, int_2), count
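
For example (a made-up record, not from my actual data): if one record's
array contains the ints 3, 1, and 2, then after sorting the job would emit
((1,2), 1), ((1,3), 1), and ((2,3), 1), and the reduce step sums those 1s
for each pair across every record.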

In other words, it produces the set of all possible pairs for each given
record. To get started, I downloaded a single file locally and am running
Spark on my own computer to try to process it. The file is 382,715 lines
(~400 MB), though some of the arrays can be quite large.

My Spark job looks like the following:

import collection.mutable.HashMap

def getCombinations(a: List[Int]): HashMap[(Int, Int), Int] = {
  val comboMap = new HashMap[(Int, Int), Int]()
  // An Array gives O(1) indexing; indexing into a List made this O(n^3)
  val sorted = a.sorted.toArray
  for (i <- 0 until sorted.length - 1; j <- i + 1 until sorted.length) {
    // Each sorted pair appears at most once per record (duplicates overwrite)
    comboMap((sorted(i), sorted(j))) = 1
  }
  comboMap
}
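
(As an aside, I also sketched a lazier variant using Scala's built-in
combinations method, hoping to emit pairs without materializing a HashMap
per record; I haven't swapped it in yet, so treat it as untested:)

def getCombinationsLazy(a: List[Int]): Iterator[((Int, Int), Int)] =
  // combinations(2) walks each 2-element combination of the sorted list once
  a.sorted.combinations(2).map { case List(x, y) => ((x, y), 1) }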

def getArray(line: String): List[Int] = {
  // Hive's default text delimiters: \x01 between columns,
  // \x02 between array elements, \x03 between a map key and its value
  val entries = line.split("\\x01")(1).split("\\x02")
  entries.map(e => e.split("\\x03")(0).toInt).toList
}

val textFile = sc.textFile("slices.txt")
val counts = textFile.map(line => getArray(line))
                     .flatMap(a => getCombinations(a))
                     .reduceByKey(_ + _)
counts.saveAsTextFile("spark_test")
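
(One idea I had, since the failure is in the map-side combine, was to spread
the work over more partitions so each task builds a smaller map. The counts
below are guesses, not tuned values:)

val counts = sc.textFile("slices.txt", 64)   // minPartitions: more, smaller input splits
               .map(line => getArray(line))
               .flatMap(a => getCombinations(a))
               .reduceByKey(_ + _, 64)       // numPartitions for the reduce side
counts.saveAsTextFile("spark_test")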


I am using getArray() to parse each line out of its Hive storage format,
then getCombinations() to create the [k,v] list, and finally reducing over
that list. Eventually I will move this to a distributed cluster, but I was
hoping to determine if there is any way to parallelize or optimize this job
for my local computer. Currently it fails with:

java.lang.OutOfMemoryError (java.lang.OutOfMemoryError: Java heap space)

org.apache.spark.util.AppendOnlyMap.growTable(AppendOnlyMap.scala:200)
org.apache.spark.util.AppendOnlyMap.incrementSize(AppendOnlyMap.scala:180)
org.apache.spark.util.AppendOnlyMap.changeValue(AppendOnlyMap.scala:130)
org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:42)
org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:91)
org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:91)
org.apache.spark.rdd.RDD$$anonfun$1.apply(RDD.scala:444)
org.apache.spark.rdd.RDD$$anonfun$1.apply(RDD.scala:444)
Thanks,
Brad
