On this note, you can do something smarter than the basic lookup function.
You could convert each partition of the key-value pair RDD into a hashmap
using something like:
import scala.collection.mutable.{ArrayBuffer, HashMap}

val rddOfHashmaps = pairRDD.mapPartitions(iterator => {
  // Build one hashmap per partition, grouping values by key
  val hashmap = new HashMap[String, ArrayBuffer[Double]]
  iterator.foreach { case (key, value) =>
    hashmap.getOrElseUpdate(key, new ArrayBuffer[Double]) += value
  }
  Iterator(hashmap)
}, preservesPartitioning = true)
And then you can do a variation of the lookup function
<https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L549>
to look up the right partition, and then within that partition directly
look up the hashmap and return the value (rather than scanning the whole
partition). That gives practically O(1) lookup time instead of O(N). But I
doubt it will match what a dedicated lookup system like memcached
would achieve.
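
A rough sketch of that lookup variation (illustrative only; the name
fastLookup is made up, and it mirrors the runJob pattern of the linked
lookup implementation, so adjust for your Spark version):

  import org.apache.spark.{Partitioner, SparkContext}
  import org.apache.spark.rdd.RDD
  import scala.collection.mutable.{ArrayBuffer, HashMap}

  // Assumes rddOfHashmaps was built as above and pairRDD had a partitioner.
  def fastLookup(sc: SparkContext,
                 rddOfHashmaps: RDD[HashMap[String, ArrayBuffer[Double]]],
                 partitioner: Partitioner,
                 key: String): Seq[Double] = {
    val index = partitioner.getPartition(key)   // only this partition can hold the key
    val process = (it: Iterator[HashMap[String, ArrayBuffer[Double]]]) =>
      it.flatMap(_.get(key)).flatten.toSeq      // O(1) hashmap lookup, no scan
    // Run the job on just that single partition, like lookup() does
    val res = sc.runJob(rddOfHashmaps, process, Seq(index), false)
    res.head
  }
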
TD
On Fri, Jan 24, 2014 at 1:36 PM, Andrew Ash <[email protected]> wrote:
> By my reading of the code, it uses the partitioner to decide which worker
> the key lands on, then does an O(N) scan of that partition. I think we're
> saying the same thing.
>
>
> https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L549
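>
> In simplified form, the linked method does something like this (an
> illustrative paraphrase, not a verbatim copy):
>
>   val index = partitioner.getPartition(key)            // pick the partition
>   def process(it: Iterator[(K, V)]): Seq[V] =
>     it.filter(_._1 == key).map(_._2).toSeq             // the O(N) scan
>   val res = self.context.runJob(self, process _, Array(index), false)
>   res(0)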
>
>
> On Fri, Jan 24, 2014 at 1:26 PM, Cheng Lian <[email protected]> wrote:
>
>> PairRDDFunctions.lookup is good enough in Spark; it's just that its time
>> complexity is O(N). Of course, for RDDs equipped with a partitioner, N is
>> the average size of a partition.
>>
>>
>> On Sat, Jan 25, 2014 at 5:16 AM, Andrew Ash <[email protected]> wrote:
>>
>>> If you have a pair RDD (an RDD[(A, B)]), then you can use the .lookup()
>>> method on it for faster access.
>>>
>>>
>>> http://spark.incubator.apache.org/docs/latest/api/core/index.html#org.apache.spark.rdd.PairRDDFunctions
>>>
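>>> For example (a minimal sketch):
>>>
>>>   val pairs = sc.parallelize(Seq(("a", 1.0), ("b", 2.0), ("a", 3.0)))
>>>   val byKey = pairs.partitionBy(new org.apache.spark.HashPartitioner(4))
>>>   byKey.lookup("a")   // all values for key "a", i.e. 1.0 and 3.0
>>>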
>>> Spark's strength is running computations across a large set of data. If
>>> you're trying to do fast lookup of a few individual keys, I'd recommend
>>> something more like memcached or Elasticsearch.
>>>
>>>
>>> On Fri, Jan 24, 2014 at 1:11 PM, Manoj Samel
>>> <[email protected]>wrote:
>>>
>>>> Yes, that works.
>>>>
>>>> But then the fast key lookup of the hashmap is gone, and the search
>>>> becomes a linear scan using an iterator. Not sure if Spark internally
>>>> adds optimizations for Seq, but otherwise one has to assume this becomes
>>>> a List/Array without the fast key lookup of a hashmap or a B-tree.
>>>>
>>>> Any thoughts?
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Fri, Jan 24, 2014 at 1:00 PM, Frank Austin Nothaft <
>>>> [email protected]> wrote:
>>>>
>>>>> Manoj,
>>>>>
>>>>> I assume you’re trying to create an RDD[(String, Double)]? Couldn’t
>>>>> you just do:
>>>>>
>>>>> val cr_rdd = sc.parallelize(cr.toSeq)
>>>>>
>>>>> The toSeq would convert the HashMap[String,Double] into a Seq[(String,
>>>>> Double)] before calling the parallelize function.
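>>>>>
>>>>> For example (a minimal, illustrative sketch):
>>>>>
>>>>>   import scala.collection.mutable.HashMap
>>>>>   val cr = HashMap("x" -> 1.0, "y" -> 2.0)
>>>>>   val cr_rdd = sc.parallelize(cr.toSeq)   // RDD[(String, Double)]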
>>>>>
>>>>> Regards,
>>>>>
>>>>> Frank Austin Nothaft
>>>>> [email protected]
>>>>> [email protected]
>>>>> 202-340-0466
>>>>>
>>>>> On Jan 24, 2014, at 12:56 PM, Manoj Samel <[email protected]>
>>>>> wrote:
>>>>>
>>>>> > Is there a way to create an RDD over a hashmap?
>>>>> >
>>>>> > If I have a hash map and try sc.parallelize, it gives
>>>>> >
>>>>> > <console>:17: error: type mismatch;
>>>>> > found : scala.collection.mutable.HashMap[String,Double]
>>>>> > required: Seq[?]
>>>>> > Error occurred in an application involving default arguments.
>>>>> > val cr_rdd = sc.parallelize(cr)
>>>>> > ^
>>>>>
>>>>>
>>>>
>>>
>>
>