Hello,
I'm trying to process text documents and build a context vector for each
term/feature in the corpus. A context vector is a vector of the features
that appear "around" a term in the corpus, within a distance of x.
Now, when I run this code, the process hits the GC overhead limit and I get
an OOM while it builds this featureContextVector.
Some configuration details for perusal:
Cluster type: Stand alone cluster
Number of nodes: 1 (master and slave on the same node)
Job memory: tried with the default (512m), 1G and 2G as well.
Data size: 12.8M documents, each 50-100 chars long - about 1.2G of raw
data, not counting any JVM overhead.
So, while it seems reasonable that the job simply needs more memory, I was
wondering why the records in the RDD don't spill over to disk to reduce the
memory pressure. I also tried changing the StorageLevel of the parent RDD
(in this case the docs RDD - see below) to DISK_ONLY, but to no effect.
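For reference, the persistence change amounted to roughly this (a sketch; docs is the RDD from the snippet below):

```scala
import org.apache.spark.storage.StorageLevel

// Keep the parent RDD's partitions on disk only, instead of in memory.
// Note this controls how docs itself is cached; as I understand it, it
// does not change where the shuffle data produced by the downstream
// groupBy is buffered.
docs.persist(StorageLevel.DISK_ONLY)
```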
1. I want to know if this is expected behavior, and if so, why.
2. When does RDD spill-to-disk kick in? Is it something we should enable
while constructing the RDD or the SparkContext? Kindly clarify.
3. On a slightly unrelated note, I also want to know if there's an elegant
way to create incremental document ids when processing documents with
Spark. The problem I face is that when I iterate over an RDD, the
processing may get distributed over nodes, so I can't have an id that's
unique and auto-incrementing across those nodes. I tried
.collect().zipWithIndex(), but that has the limitation of pulling all the
data into driver memory, which is not desirable for large-scale
processing. Am I missing something?
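On question 3: unless I'm mistaken, since Spark 1.0 the RDD API itself offers zipWithIndex (and zipWithUniqueId), which assign ids without collecting to the driver - roughly:

```scala
// RDD.zipWithIndex assigns consecutive Long ids without a collect():
// it runs one extra small job to count the elements in each partition,
// then offsets the ids of each partition accordingly.
val docsWithIds = docs.zipWithIndex().map { case (doc, id) => (id, doc) }

// zipWithUniqueId() skips the extra job, but its ids are only unique,
// not consecutive (the k-th item of partition n gets id k*numPartitions + n).
val docsWithUniqueIds = docs.zipWithUniqueId()
```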
Below is the snippet that does the job...
Some types/glossary:
*docs: RDD[Document], where Document is a case class holding a document
plus some metadata.*
*list2FreqMap: transforms a List[T] to a Map[T, Int], where the Int value
is the number of times the key occurred in the list.*
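For completeness, list2FreqMap is essentially this (a minimal sketch of the helper described above):

```scala
// Count the occurrences of each element in a list: group equal elements
// together, then map each group to its size.
def list2FreqMap[T](xs: List[T]): Map[T, Int] =
  xs.groupBy(identity).map { case (k, vs) => k -> vs.size }
```

For example, list2FreqMap(List("a", "b", "a")) yields Map("a" -> 2, "b" -> 1).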
=== Snippet ===
val featureContextVector = docs.flatMap { doc =>
  val terms = doc.tokenize.toList
  // pair every term with the document's full token list and its position
  terms.zipWithIndex.map { case (term, index) => term -> (terms, index) }
}.groupBy(_._1)
 .map { kv => (kv._1, kv._2.map(_._2)) }
 .map { case (term, termsIndexPairs) =>
   // contexts are generated from the term, its index in the document and
   // the window size: we pick "window" items "around" the current position
   val contextList = termsIndexPairs.flatMap { case (terms, index) =>
     terms.slice(indexWithinBounds(index - windowSize, terms.size), index) ++
       terms.slice(index + 1, indexWithinBounds(index + windowSize, terms.size) + 1)
   }
   val contextVectorForTerm = list2FreqMap(contextList).toIterable
   term -> contextVectorForTerm
 }
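In case it helps frame the question, here is a reduceByKey-based variant I'm considering (a sketch reusing the same docs RDD, tokenize, windowSize and indexWithinBounds as above). It emits ((term, contextTerm), 1) pairs and reduces them, which avoids groupBy materializing every (terms, index) pair of a term in memory at once, and also avoids shuffling each document's full token list once per term:

```scala
val featureContextVector = docs.flatMap { doc =>
  val terms = doc.tokenize.toList
  terms.zipWithIndex.flatMap { case (term, index) =>
    // compute the window around this occurrence right away, so only the
    // small ((term, contextTerm), 1) pairs go through the shuffle
    val context =
      terms.slice(indexWithinBounds(index - windowSize, terms.size), index) ++
        terms.slice(index + 1, indexWithinBounds(index + windowSize, terms.size) + 1)
    context.map(contextTerm => ((term, contextTerm), 1))
  }
}.reduceByKey(_ + _) // combines map-side, so far less data is shuffled
 .map { case ((term, contextTerm), count) => term -> (contextTerm, count) }
 .groupByKey()       // values are now small (feature, count) pairs
```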
--
It's just about how deep your longing is!