You usually don't need to do so explicitly since the implicit conversions
in Spark will take care of that for you.  Any RDD[(K, V)] is a PairRDD; so,
e.g., sc.parallelize(1 to 10).map(i => (i, i.toString)) is just one of many
ways to generate a PairRDD .


On Fri, Jan 24, 2014 at 2:23 PM, Manoj Samel <[email protected]>wrote:

> How would I create a PairRDD ?
>
>
> On Fri, Jan 24, 2014 at 1:54 PM, Tathagata Das <
> [email protected]> wrote:
>
>> On this note, you can do something smarter that the basic lookup
>> function. You could convert each partition of the key-value pair RDD into a
>> hashmap using something like
>>
>> val rddOfHashmaps = pairRDD.mapPartitions(iterator => {
>>    val hashmap = new HashMap[String, ArrayBuffer[Double]]
>>    iterator.foreach { case (key, value}  => hashmap.getOrElseUpdate(key,
>> new ArrayBuffer[Double]) += value
>>    Iterator(hashmap)
>>  }, preserveParitioning = true)
>>
>> And then you can do a variation of the lookup 
>> function<https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L549>to
>>  lookup the right partition, and then within that partition directly
>> lookup the hashmap and return the value (rather than scanning the whole
>> partition). That give practically O(1) lookup time instead of O(N). But i
>> doubt it will match something that a dedicated lookup system like memcached
>> would achieve.
>>
>> TD
>>
>>
>>
>>
>> On Fri, Jan 24, 2014 at 1:36 PM, Andrew Ash <[email protected]> wrote:
>>
>>> By my reading of the code, it uses the partitioner to decide which
>>> worker the key lands on, then does an O(N) scan of that partition.  I think
>>> we're saying the same thing.
>>>
>>>
>>> https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L549
>>>
>>>
>>> On Fri, Jan 24, 2014 at 1:26 PM, Cheng Lian <[email protected]>wrote:
>>>
>>>> PairRDDFunctions.lookup is good enough in Spark, it's just that its
>>>> time complexity is O(N).  Of course, for RDDs equipped with a partitioner,
>>>> N is the average size of a partition.
>>>>
>>>>
>>>> On Sat, Jan 25, 2014 at 5:16 AM, Andrew Ash <[email protected]>wrote:
>>>>
>>>>> If you have a pair RDD (an RDD[A,B]) then you can use the .lookup()
>>>>> method on it for faster access.
>>>>>
>>>>>
>>>>> http://spark.incubator.apache.org/docs/latest/api/core/index.html#org.apache.spark.rdd.PairRDDFunctions
>>>>>
>>>>> Spark's strength is running computations across a large set of data.
>>>>>  If you're trying to do fast lookup of a few individual keys, I'd 
>>>>> recommend
>>>>> something more like memcached or Elasticsearch.
>>>>>
>>>>>
>>>>> On Fri, Jan 24, 2014 at 1:11 PM, Manoj Samel <[email protected]
>>>>> > wrote:
>>>>>
>>>>>> Yes, that works.
>>>>>>
>>>>>> But then the hashmap functionality of the fast key lookup etc. is
>>>>>> gone and the search will be linear using a iterator etc. Not sure if 
>>>>>> Spark
>>>>>> internally creates additional optimizations for Seq but otherwise one has
>>>>>> to assume this becomes a List/Array without a fast key lookup of a 
>>>>>> hashmap
>>>>>> or b-tree
>>>>>>
>>>>>> Any thoughts ?
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Jan 24, 2014 at 1:00 PM, Frank Austin Nothaft <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> Manoj,
>>>>>>>
>>>>>>> I assume you’re trying to create an RDD[(String, Double)]? Couldn’t
>>>>>>> you just do:
>>>>>>>
>>>>>>> val cr_rdd = sc.parallelize(cr.toSeq)
>>>>>>>
>>>>>>> The toSeq would convert the HashMap[String,Double] into a
>>>>>>> Seq[(String, Double)] before calling the parallelize function.
>>>>>>>
>>>>>>> Regards,
>>>>>>>
>>>>>>> Frank Austin Nothaft
>>>>>>> [email protected]
>>>>>>> [email protected]
>>>>>>> 202-340-0466
>>>>>>>
>>>>>>> On Jan 24, 2014, at 12:56 PM, Manoj Samel <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>> > Is there a way to create RDD over a hashmap ?
>>>>>>> >
>>>>>>> > If I have a hash map and try sc.parallelize, it gives
>>>>>>> >
>>>>>>> > <console>:17: error: type mismatch;
>>>>>>> >  found   : scala.collection.mutable.HashMap[String,Double]
>>>>>>> >  required: Seq[?]
>>>>>>> > Error occurred in an application involving default arguments.
>>>>>>> >        val cr_rdd = sc.parallelize(cr)
>>>>>>> >                                    ^
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Reply via email to