You usually don't need to create one explicitly, since the implicit conversions in Spark take care of that for you. Any RDD[(K, V)] is a PairRDD; so, e.g., sc.parallelize(1 to 10).map(i => (i, i.toString)) is just one of many ways to generate one.
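To make that concrete, here is a minimal sketch (assuming a live SparkContext named sc; the SparkContext._ import was required for these implicits in the Spark releases current at the time of this thread, while later releases pick them up automatically):

    import org.apache.spark.SparkContext._  // PairRDDFunctions implicits on pre-1.3 Spark

    val pairs = sc.parallelize(1 to 10).map(i => (i % 3, i.toDouble))

    // reduceByKey is defined on PairRDDFunctions, not on RDD itself, so this
    // call compiles only because of the implicit conversion above.
    val sums = pairs.reduceByKey(_ + _)
    sums.collect().foreach(println)  // (0,18.0), (1,22.0), (2,15.0)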
On Fri, Jan 24, 2014 at 2:23 PM, Manoj Samel <[email protected]> wrote:

> How would I create a PairRDD?

On Fri, Jan 24, 2014 at 1:54 PM, Tathagata Das <[email protected]> wrote:

> On this note, you can do something smarter than the basic lookup function. You could convert each partition of the key-value pair RDD into a hashmap, using something like
>
>     import scala.collection.mutable.{ArrayBuffer, HashMap}
>
>     val rddOfHashmaps = pairRDD.mapPartitions(iterator => {
>       val hashmap = new HashMap[String, ArrayBuffer[Double]]
>       iterator.foreach { case (key, value) =>
>         hashmap.getOrElseUpdate(key, new ArrayBuffer[Double]) += value
>       }
>       Iterator(hashmap)
>     }, preservesPartitioning = true)
>
> And then you can do a variation of the lookup function (https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L549) to find the right partition, and then within that partition directly look up the hashmap and return the value (rather than scanning the whole partition). That gives practically O(1) lookup time instead of O(N). But I doubt it will match something that a dedicated lookup system like memcached would achieve.
>
> TD

On Fri, Jan 24, 2014 at 1:36 PM, Andrew Ash <[email protected]> wrote:

> By my reading of the code, it uses the partitioner to decide which worker the key lands on, then does an O(N) scan of that partition. I think we're saying the same thing.
>
> https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L549

On Fri, Jan 24, 2014 at 1:26 PM, Cheng Lian <[email protected]> wrote:

> PairRDDFunctions.lookup is good enough in Spark; it's just that its time complexity is O(N). Of course, for RDDs equipped with a partitioner, N is the average size of a partition.

On Sat, Jan 25, 2014 at 5:16 AM, Andrew Ash <[email protected]> wrote:

> If you have a pair RDD (an RDD[(A, B)]), then you can use the .lookup() method on it for faster access.
>
> http://spark.incubator.apache.org/docs/latest/api/core/index.html#org.apache.spark.rdd.PairRDDFunctions
>
> Spark's strength is running computations across a large set of data. If you're trying to do fast lookups of a few individual keys, I'd recommend something more like memcached or Elasticsearch.

On Fri, Jan 24, 2014 at 1:11 PM, Manoj Samel <[email protected]> wrote:

> Yes, that works.
>
> But then the hashmap functionality of the fast key lookup is gone, and the search will be linear, using an iterator etc. Not sure if Spark internally creates additional optimizations for Seq, but otherwise one has to assume this becomes a List/Array without the fast key lookup of a hashmap or b-tree.
>
> Any thoughts?
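Pulling the suggestions in this exchange together, a minimal sketch of the partitioned-lookup approach (the variable name cr follows the thread below; the HashPartitioner and its partition count are assumptions):

    import org.apache.spark.HashPartitioner
    import org.apache.spark.SparkContext._  // pair-RDD implicits on pre-1.3 Spark
    import scala.collection.mutable

    val cr = mutable.HashMap("a" -> 1.0, "b" -> 2.0, "c" -> 3.0)

    // toSeq turns the map into a Seq[(String, Double)]; partitionBy gives the
    // RDD a partitioner, so lookup() scans only the one matching partition
    // instead of the whole RDD.
    val cr_rdd = sc.parallelize(cr.toSeq)
      .partitionBy(new HashPartitioner(4))
      .cache()

    cr_rdd.lookup("b")  // Seq(2.0)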
On Fri, Jan 24, 2014 at 1:00 PM, Frank Austin Nothaft <[email protected]> wrote:

> Manoj,
>
> I assume you're trying to create an RDD[(String, Double)]? Couldn't you just do:
>
>     val cr_rdd = sc.parallelize(cr.toSeq)
>
> The toSeq would convert the HashMap[String, Double] into a Seq[(String, Double)] before calling the parallelize function.
>
> Regards,
>
> Frank Austin Nothaft
> [email protected]
> [email protected]
> 202-340-0466

On Jan 24, 2014, at 12:56 PM, Manoj Samel <[email protected]> wrote:

> Is there a way to create an RDD over a hashmap?
>
> If I have a hash map and try sc.parallelize, it gives
>
>     <console>:17: error: type mismatch;
>      found   : scala.collection.mutable.HashMap[String,Double]
>      required: Seq[?]
>     Error occurred in an application involving default arguments.
>            val cr_rdd = sc.parallelize(cr)
>                                        ^
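For completeness, a speculative sketch of the lookup variation TD describes above; this is not code from the thread. fastLookup is a hypothetical helper, and the three-argument runJob overload used here comes from Spark releases later than the one under discussion:

    import scala.collection.mutable.{ArrayBuffer, HashMap}
    import org.apache.spark.Partitioner
    import org.apache.spark.rdd.RDD

    // Assumes rddOfHashmaps was built from pairRDD with
    // preservesPartitioning = true, and that pairRDD had a partitioner.
    def fastLookup(rdd: RDD[HashMap[String, ArrayBuffer[Double]]],
                   partitioner: Partitioner,
                   key: String): Seq[Double] = {
      // The partitioner says which partition holds the key; run a job on
      // that single partition and probe its hashmap directly, instead of
      // scanning every element the way PairRDDFunctions.lookup does.
      val p = partitioner.getPartition(key)
      val results = rdd.context.runJob(
        rdd,
        (it: Iterator[HashMap[String, ArrayBuffer[Double]]]) =>
          it.flatMap(_.getOrElse(key, Seq.empty[Double])).toArray,
        Seq(p))
      results.head.toSeq
    }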
