Your original code snippet seems incomplete, and there isn't enough information to figure out what problem you actually ran into.
From your original code snippet, there is an rdd variable which is well defined and a df variable that is not defined in the snippet you sent. One way to make this work is as below. (Until the last line is executed you are not actually collecting anything on the driver; if your dataset is too big to collect on the driver for inspection, just do a take(n) on the result.)

from pyspark.sql import Row, SQLContext
from pyspark.sql.functions import count

sqlContext = SQLContext(sc)

# convert a list of IPs into a DataFrame with column "ip"
Record = Row("ip")
df = sc.parallelize(map(lambda x: Record(x),
                        ['208.51.22.18', '31.207.6.173', '208.51.22.18'])).toDF()

# obtain ip -> frequency and inspect
df.groupBy(df.ip).agg(count(df.ip)).show()

+------------+---------+
|          ip|COUNT(ip)|
+------------+---------+
|208.51.22.18|        2|
|31.207.6.173|        1|
+------------+---------+

What exactly is the issue you are running into when you say it doesn't get through?

On Thu, May 21, 2015 at 10:47 AM, ping yan <sharon...@gmail.com> wrote:

> Thanks. I suspected that, but figured that a df query inside a map sounds
> so intuitive that I didn't want to just give up.
>
> I've tried join, and even better, with a DStream.transform() it works!
>
> freqs = testips.transform(lambda rdd: rdd.join(kvrdd).map(lambda (x, y): y[1]))
>
> Thank you for the help!
>
> Ping
>
> On Thu, May 21, 2015 at 10:40 AM, Holden Karau <hol...@pigscanfly.ca>
> wrote:
>
>> So DataFrames, like RDDs, can only be accessed from the driver. If your
>> IP frequency table is small enough, you could collect it and distribute
>> it as a hashmap with broadcast, or you could also join your rdd with the
>> IP frequency table. Hope that helps :)
>>
>> On Thursday, May 21, 2015, ping yan <sharon...@gmail.com> wrote:
>>
>>> I have a dataframe as a reference table for IP frequencies.
>>> e.g.,
>>>
>>> ip              freq
>>> 10.226.93.67    1
>>> 10.226.93.69    1
>>> 161.168.251.101 4
>>> 10.236.70.2     1
>>> 161.168.251.105 14
>>>
>>> All I need is to query the df in a map:
>>>
>>> rdd = sc.parallelize(['208.51.22.18', '31.207.6.173', '208.51.22.18'])
>>> freqs = rdd.map(lambda x: df.where(df.ip == x).first())
>>>
>>> It doesn't get through.. I would appreciate any help.
>>>
>>> Thanks!
>>> Ping
>>>
>>> --
>>> Ping Yan
>>> Ph.D. in Management
>>> Dept. of Management Information Systems
>>> University of Arizona
>>> Tucson, AZ 85721
>>
>> --
>> Cell : 425-233-8271
>> Twitter: https://twitter.com/holdenkarau
>> Linked In: https://www.linkedin.com/in/holdenkarau
>
> --
> Ping Yan
> Ph.D. in Management
> Dept. of Management Information Systems
> University of Arizona
> Tucson, AZ 85721
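P.S. For anyone finding this thread later: the two driver-side alternatives Holden mentions (collect the small table and broadcast it as a map, or join the two key/value RDDs) boil down to the lookup logic sketched below. This is a rough sketch in plain Python so it is runnable without a cluster; the freq_table dict is a made-up stand-in for the collected DataFrame, and the comments note the corresponding Spark calls.

```python
# The ip -> frequency reference table, as if collected from the DataFrame
# on the driver (small enough to fit in memory).
freq_table = {'208.51.22.18': 2, '31.207.6.173': 1}

ips = ['208.51.22.18', '31.207.6.173', '208.51.22.18']

# Option 1: broadcast the small table and look it up inside the map.
# In Spark (roughly): bc = sc.broadcast(freq_table)
#                     rdd.map(lambda ip: bc.value.get(ip))
freqs_via_broadcast = [freq_table.get(ip) for ip in ips]

# Option 2: join the two key/value datasets and keep the frequency.
# In Spark (roughly): rdd.map(lambda ip: (ip, None)).join(kvrdd)
#                        .map(lambda kv: kv[1][1])
freqs_via_join = [freq for ip in ips
                  for (k, freq) in freq_table.items() if k == ip]

print(freqs_via_broadcast)  # [2, 1, 2]
print(freqs_via_join)       # [2, 1, 2]
```

Either way, the per-element lookup happens against plain local data on each executor, instead of trying to call DataFrame methods from inside a map closure (which fails because df only exists on the driver).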