Never mind... I didn't realize you were referring to the first table as df. So you want to do a join between the first table and an RDD? The right way to do it within the DataFrame construct is to think of it as a join: map the second RDD to a DataFrame and do an inner join on ip.
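To make the suggestion above concrete, here is a plain-Python sketch of what an inner join on ip computes (this is not Spark code; in PySpark it would be something like df.join(ips_df, "ip"), and the variable names below are hypothetical):

```python
# The frequency table plays the role of the first DataFrame; the list of
# IPs plays the role of the second RDD mapped to a DataFrame.
freq_table = {
    "10.226.93.67": 1,
    "161.168.251.101": 4,
}

ips = ["208.51.22.18", "161.168.251.101", "10.226.93.67"]

# An inner join keeps only IPs present on both sides and pairs each
# with its frequency.
joined = [(ip, freq_table[ip]) for ip in ips if ip in freq_table]
print(joined)
```

The key point is that nothing here touches the driver per-element: in Spark the join is a distributed operation, unlike calling df.where() inside a map.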
On Thu, May 21, 2015 at 10:54 AM, Ram Sriharsha <sriharsha....@gmail.com> wrote:

> Your original code snippet seems incomplete, and there isn't enough
> information to figure out what problem you actually ran into.
>
> In your original snippet there is an rdd variable that is well defined
> and a df variable that is not defined in the code you sent.
>
> One way to make this work is as below (until the last line is executed
> you are not actually collecting anything on the driver, and if your
> dataset is too big to collect on the driver for inspection, just do a
> take(n) on the result):
>
> from pyspark.sql import Row, SQLContext
> from pyspark.sql.functions import count
>
> sqlContext = SQLContext(sc)
>
> # convert list of ips into a data frame with column ip
> Record = Row("ip")
> df = sc.parallelize(map(lambda x: Record(x), ['208.51.22.18',
>     '31.207.6.173', '208.51.22.18'])).toDF()
>
> # obtain ip -> frequency and inspect
> df.groupBy(df.ip).agg(count(df.ip)).show()
>
> +------------+---------+
> |          ip|COUNT(ip)|
> +------------+---------+
> |208.51.22.18|        2|
> |31.207.6.173|        1|
> +------------+---------+
>
> What exactly is the issue you are running into when you say it doesn't
> get through?
>
> On Thu, May 21, 2015 at 10:47 AM, ping yan <sharon...@gmail.com> wrote:
>
>> Thanks. I suspected that, but a df query inside a map sounds so
>> intuitive that I didn't just want to give up.
>>
>> I've tried join, and even better, with a DStream.transform() it works!
>>
>> freqs = testips.transform(lambda rdd: rdd.join(kvrdd).map(lambda (x, y): y[1]))
>>
>> Thank you for the help!
>>
>> Ping
>>
>> On Thu, May 21, 2015 at 10:40 AM, Holden Karau <hol...@pigscanfly.ca> wrote:
>>
>>> So DataFrames, like RDDs, can only be accessed from the driver. If
>>> your IP frequency table is small enough, you could collect it and
>>> distribute it as a hashmap with broadcast, or you could also join
>>> your rdd with the ip frequency table.
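For readers following ping's working line, the pair-RDD join it relies on can be sketched in plain Python (not Spark code; this assumes testips holds (ip, payload) pairs and kvrdd holds (ip, freq) pairs, which is what rdd.join requires):

```python
# rdd.join(kvrdd) on pair RDDs yields (ip, (payload, freq)) for every
# key present on both sides; the final map keeps only the frequency,
# i.e. y[1] in ping's lambda.
testips = [("208.51.22.18", "req-a"), ("31.207.6.173", "req-b"),
           ("208.51.22.18", "req-c")]
kvrdd = [("208.51.22.18", 2), ("31.207.6.173", 1)]

freq_of = dict(kvrdd)
joined = [(ip, (payload, freq_of[ip])) for ip, payload in testips
          if ip in freq_of]
freqs = [y[1] for _, y in joined]
print(freqs)
```

Inside DStream.transform() the same logic runs per micro-batch, entirely on the executors, which is why it works where the df-query-inside-a-map did not.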
>>> Hope that helps :)
>>>
>>> On Thursday, May 21, 2015, ping yan <sharon...@gmail.com> wrote:
>>>
>>>> I have a dataframe as a reference table for IP frequencies, e.g.:
>>>>
>>>> ip               freq
>>>> 10.226.93.67        1
>>>> 10.226.93.69        1
>>>> 161.168.251.101     4
>>>> 10.236.70.2         1
>>>> 161.168.251.105    14
>>>>
>>>> All I need is to query the df in a map:
>>>>
>>>> rdd = sc.parallelize(['208.51.22.18', '31.207.6.173', '208.51.22.18'])
>>>> freqs = rdd.map(lambda x: df.where(df.ip == x).first())
>>>>
>>>> It doesn't get through. I would appreciate any help.
>>>>
>>>> Thanks!
>>>> Ping
>>>>
>>>> --
>>>> Ping Yan
>>>> Ph.D. in Management
>>>> Dept. of Management Information Systems
>>>> University of Arizona
>>>> Tucson, AZ 85721
>>>
>>> --
>>> Cell : 425-233-8271
>>> Twitter: https://twitter.com/holdenkarau
>>> Linked In: https://www.linkedin.com/in/holdenkarau
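As a footnote to the thread: Holden's broadcast-hashmap alternative can be sketched in plain Python (not Spark code; in PySpark the dict below is what sc.broadcast(dict(df.collect())) would ship to executors, and the names are hypothetical):

```python
# The small frequency table is collected into an ordinary dict, and each
# map task then does a cheap local lookup instead of querying the
# DataFrame from inside a map (which is not allowed off the driver).
ip_freq = {"10.226.93.67": 1, "161.168.251.101": 4, "10.236.70.2": 1}

rdd = ["161.168.251.101", "10.236.70.2", "161.168.251.101"]
freqs = [ip_freq.get(ip) for ip in rdd]
print(freqs)
```

This only makes sense when the table fits comfortably in memory on every executor; otherwise the join approach discussed above is the right tool.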