Hi Sri,

yield() comes from Scala, not Spark. A for comprehension builds and returns a
new collection, and yield adds one element to that collection on each
iteration.
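For example, in plain Scala (no Spark involved), the same pattern used in the
word-count snippet quoted below:

  val words = "one two one".split(" ")
  val pairs = for (word <- words) yield (word, 1)
  // pairs: Array[(String, Int)] = Array((one,1), (two,1), (one,1))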
To output strings instead of tuple data, you could use aggregateByKey instead
of groupByKey:

val output = wordCounts.
  map({case ((user, word), count) => (user, (word, count))}).
  aggregateByKey("")(
    // seqOp: append one (word, count) pair to the partial string
    seqOp = (s: String, t: (String, Int)) => s + t._1 + " [" + t._2 + "] ",
    // combOp: concatenate partial strings from different partitions
    combOp = _ + _
  ).
  map({case (user, result) => user + " " + result})

Hope it helps,
Huy.

On Tue, Sep 22, 2015 at 2:35 AM sri hari kali charan Tummala <
kali.tumm...@gmail.com> wrote:

> Hi Huy,
>
> That worked like a charm. Can we remove CompactBuffer from the output?
>
> (u2,CompactBuffer((three,2), (four,1)))
> (u1,CompactBuffer((one,2), (two,1)))
>
> Also, what does yield do in Spark? Can you please explain?
>
> Thanks
> Sri
>
> On Mon, Sep 21, 2015 at 6:13 AM, Huy Banh <huy.b...@gmail.com> wrote:
>
>> Hi,
>>
>> If your input format is user -> comment, then you could:
>>
>> val comments = sc.parallelize(List(("u1", "one two one"),
>>   ("u2", "three four three")))
>> val wordCounts = comments.
>>   flatMap({case (user, comment) =>
>>     for (word <- comment.split(" ")) yield ((user, word), 1) }).
>>   reduceByKey(_ + _)
>>
>> val output = wordCounts.
>>   map({case ((user, word), count) => (user, (word, count))}).
>>   groupByKey()
>>
>> By the way, Aniket: if we group by user first, couldn't it run out of
>> memory when Spark tries to put all of a user's words in a single
>> sequence?
>>
>> On Sat, Sep 19, 2015 at 11:05 PM Aniket Bhatnagar <
>> aniket.bhatna...@gmail.com> wrote:
>>
>>> Using the Scala API, you can first group by user and then use
>>> combineByKey.
>>>
>>> Thanks,
>>> Aniket
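A minimal sketch of Aniket's combineByKey suggestion, assuming the same
(user, comment) input as in Huy's example and a Spark shell where sc is
available; the per-user Map accumulator and the final formatting step are
illustrative, not from the thread:

  val comments = sc.parallelize(List(("u1", "one two one"),
    ("u2", "three four three")))
  val output = comments.
    flatMap({case (user, comment) => comment.split(" ").map(word => (user, word))}).
    combineByKey(
      // createCombiner: start a per-user count map from the first word
      (word: String) => Map(word -> 1),
      // mergeValue: count one more word within a partition
      (counts: Map[String, Int], word: String) =>
        counts + (word -> (counts.getOrElse(word, 0) + 1)),
      // mergeCombiners: merge per-partition maps for the same user
      (m1: Map[String, Int], m2: Map[String, Int]) =>
        m2.foldLeft(m1) { case (m, (w, c)) => m + (w -> (m.getOrElse(w, 0) + c)) }
    ).
    map({case (user, counts) =>
      user + " " + counts.map({case (w, c) => w + " [" + c + "]"}).mkString(" ")})

  // prints e.g. "u1 one [2] two [1]" (word order is not guaranteed)
  output.collect().foreach(println)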
>>> On Sat, Sep 19, 2015, 6:41 PM kali.tumm...@gmail.com <
>>> kali.tumm...@gmail.com> wrote:
>>>
>>>> Hi All,
>>>>
>>>> I would like to achieve the output below using Spark. I managed to
>>>> write it in Hive and call that from Spark, but not in just Spark
>>>> (Scala). How do I group word counts by a particular user (column)?
>>>> Imagine users and their tweets: I want to do a word count per user
>>>> name.
>>>>
>>>> Input:-
>>>> kali A,B,A,B,B
>>>> james B,A,A,A,B
>>>>
>>>> Output:-
>>>> kali A [Count] B [Count]
>>>> james A [Count] B [Count]
>>>>
>>>> My Hive Answer:-
>>>> CREATE EXTERNAL TABLE TEST
>>>> (
>>>>   user_name string,
>>>>   COMMENTS STRING
>>>> ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\001' STORED AS TEXTFILE
>>>> LOCATION '/data/kali/test'; -- HDFS folder (create the HDFS folder and
>>>> a text file with the data mentioned in this email)
>>>>
>>>> use default;
>>>> select user_name, COLLECT_SET(text) from
>>>>   (select user_name, concat(sub, ' ', count(comments)) as text
>>>>    from test LATERAL VIEW explode(split(comments, ',')) subView AS sub
>>>>    group by user_name, sub) w
>>>> group by user_name;
>>>>
>>>> Spark With Hive:-
>>>> package com.examples
>>>>
>>>> /**
>>>>  * Created by kalit_000 on 17/09/2015.
>>>>  */
>>>> import org.apache.log4j.{Level, Logger}
>>>> import org.apache.spark.sql.SQLContext
>>>> import org.apache.spark.sql.hive.HiveContext
>>>> import org.apache.spark.{SparkConf, SparkContext}
>>>>
>>>> object HiveWordCount {
>>>>
>>>>   def main(args: Array[String]): Unit = {
>>>>     Logger.getLogger("org").setLevel(Level.WARN)
>>>>     Logger.getLogger("akka").setLevel(Level.WARN)
>>>>
>>>>     val conf = new SparkConf().setMaster("local")
>>>>       .setAppName("HiveWordCount").set("spark.executor.memory", "1g")
>>>>     val sc = new SparkContext(conf)
>>>>     val sqlContext = new SQLContext(sc)
>>>>     val hc = new HiveContext(sc)
>>>>
>>>>     hc.sql("CREATE EXTERNAL TABLE IF NOT EXISTS default.TEST " +
>>>>       "(user_name string, COMMENTS STRING) ROW FORMAT DELIMITED " +
>>>>       "FIELDS TERMINATED BY '\\001' STORED AS TEXTFILE " +
>>>>       "LOCATION '/data/kali/test'")
>>>>
>>>>     val op = hc.sql("select user_name, COLLECT_SET(text) from " +
>>>>       "(select user_name, concat(sub, ' ', count(comments)) as text " +
>>>>       "from default.test LATERAL VIEW explode(split(comments, ',')) " +
>>>>       "subView AS sub group by user_name, sub) w group by user_name")
>>>>
>>>>     op.collect.foreach(println)
>>>>   }
>>>> }
>>>>
>>>> Thanks
>>>>
>
> --
> Thanks & Regards
> Sri Tummala
>