This may be what you want:

val conf = new SparkConf().setMaster("local").setAppName("test")
val sc = new SparkContext(conf)

val inputRdd = sc.parallelize(Array(("key_1", "a"), ("key_1","b"),
("key_2","c"), ("key_2", "d")))

val result = inputRdd.groupByKey().flatMap { case (key, values) =>
  values.zipWithIndex.map { case (value, index) => (key, index, value) }
}

result.collect() foreach println


// output

(key_2,0,c)
(key_2,1,d)
(key_1,0,a)
(key_1,1,b)
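
Note that zipWithIndex is zero-based, while the desired output in the original question starts counting at 1; adding one to the index inside the map fixes that. A minimal sketch of the 1-based variant, using plain Scala collections in place of an RDD (the same transformation applies inside the flatMap above):

```scala
// Group pairs by key, then index the values within each group (1-based).
val input = Seq(("key-1", "a"), ("key-1", "b"), ("key-2", "a"), ("key-2", "c"))

val result = input.groupBy(_._1).toSeq.flatMap { case (key, pairs) =>
  // pairs preserves encounter order within the group;
  // idx + 1 shifts zipWithIndex's 0-based index to 1-based
  pairs.map(_._2).zipWithIndex.map { case (value, idx) => (key, idx + 1, value) }
}

result foreach println
```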


On Thu, Jul 30, 2015 at 10:19 AM, ayan guha <guha.a...@gmail.com> wrote:

> Is there a relationship between data and index? I.e with a,b,c to 1,2,3?
> On 30 Jul 2015 12:13, "askformore" <askf0rm...@163.com> wrote:
>
>> I have some data like this: RDD[(String, String)] = ((*key-1*, a), (
>> *key-1*,b), (*key-2*,a), (*key-2*,c), (*key-3*,b), (*key-4*,d)) and I want
>> to group the data by key and, for each group, add an index field to the
>> group members, so that I can transform the data into: RDD[(String,
>> *Int*, String)] = ((key-1,*1*,a), (key-1,*2*,b), (key-2,*1*,a), (key-2,
>> *2*,c), (key-3,*1*,b), (key-4,*1*,d)). I tried groupByKey first, which
>> gave me an RDD[(String, Iterable[String])], but I don't know how to
>> apply the zipWithIndex function to each Iterable... thanks.
>> ------------------------------
>> View this message in context: help plz! how to use zipWithIndex to each
>> subset of a RDD
>> <http://apache-spark-user-list.1001560.n3.nabble.com/help-plz-how-to-use-zipWithIndex-to-each-subset-of-a-RDD-tp24071.html>
>> Sent from the Apache Spark User List mailing list archive
>> <http://apache-spark-user-list.1001560.n3.nabble.com/> at Nabble.com.
>>
>


-- 
Best Regards

Jeff Zhang
