Your original code snippet seems incomplete, and there isn't enough
information to figure out what problem you actually ran into.

In the snippet you sent there is an rdd variable, which is well defined,
and a df variable that is never defined.

One way to make this work is shown below. Until the last line executes,
nothing is actually collected on the driver; and if your dataset is too
big to collect on the driver for inspection, just do a take(n) on the
result instead:

from pyspark.sql import Row,SQLContext
from pyspark.sql.functions import count

sqlContext = SQLContext(sc)

# convert list of ip into a data frame with column ip
Record = Row("ip")
df = sc.parallelize(map(lambda x: Record(x), ['208.51.22.18',
'31.207.6.173', '208.51.22.18'])).toDF()

# obtain ip -> frequency and inspect
df.groupBy(df.ip).agg(count(df.ip)).show()

+------------+---------+
|          ip|COUNT(ip)|
+------------+---------+
|208.51.22.18|        2|
|31.207.6.173|        1|
+------------+---------+
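As a side note, the aggregation above is just a frequency count. What groupBy(df.ip).agg(count(df.ip)) computes can be illustrated in plain Python with collections.Counter (Counter here is only a local stand-in for the distributed count, not part of the Spark API):

```python
from collections import Counter

# Same ip -> frequency aggregation as groupBy(ip).agg(count(ip)),
# computed locally for illustration only.
ips = ['208.51.22.18', '31.207.6.173', '208.51.22.18']
freqs = Counter(ips)

print(freqs['208.51.22.18'])  # 2
print(freqs['31.207.6.173'])  # 1
```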

What exactly is the issue you run into when you say it "doesn't get
through"?
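If the frequency table really is small, another option (a rough sketch, not tested against your data) is to collect it into a dict and broadcast it; the per-record lookup then becomes a plain dict lookup inside the map. The Spark-side calls are shown in comments, and the table contents here are hypothetical:

```python
# Sketch of the "collect the small table and broadcast it" approach.
# On the Spark side you would do something like:
#   bc = sc.broadcast(dict(df.rdd.map(lambda r: (r.ip, r.freq)).collect()))
# and inside the map use bc.value.get(ip).
freq_table = {'208.51.22.18': 2, '31.207.6.173': 1}  # hypothetical collected table

ips = ['208.51.22.18', '31.207.6.173', '208.51.22.18']
# Per-record lookup, as rdd.map(lambda ip: bc.value.get(ip, 0)) would do:
freqs = [freq_table.get(ip, 0) for ip in ips]
print(freqs)  # [2, 1, 2]
```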

On Thu, May 21, 2015 at 10:47 AM, ping yan <sharon...@gmail.com> wrote:

> Thanks. I suspected that, but figured that df query inside a map sounds so
> intuitive that I don't just want to give up.
>
> I've tried join and even better with a DStream.transform() and it works!
> freqs = testips.transform(lambda rdd: rdd.join(kvrdd).map(lambda (x,y):
> y[1]))
>
> Thank you for the help!
>
> Ping
>
> On Thu, May 21, 2015 at 10:40 AM, Holden Karau <hol...@pigscanfly.ca>
> wrote:
>
>> So DataFrames, like RDDs, can only be accessed from the driver. If your
>> IP Frequency table is small enough you could collect it and distribute it
>> as a hashmap with broadcast or you could also join your rdd with the ip
>> frequency table. Hope that helps :)
>>
>>
>> On Thursday, May 21, 2015, ping yan <sharon...@gmail.com> wrote:
>>
>>> I have a dataframe as a reference table for IP frequencies.
>>> e.g.,
>>>
>>> ip                  freq
>>> 10.226.93.67           1
>>> 10.226.93.69           1
>>> 161.168.251.101        4
>>> 10.236.70.2            1
>>> 161.168.251.105       14
>>>
>>>
>>> All I need is to query the df in a map.
>>>
>>> rdd = sc.parallelize(['208.51.22.18', '31.207.6.173', '208.51.22.18'])
>>>
>>> freqs = rdd.map(lambda x: df.where(df.ip ==x ).first())
>>>
>>> It doesn't get through.. would appreciate any help.
>>>
>>> Thanks!
>>> Ping
>>>
>>>
>>>
>>>
>>> --
>>> Ping Yan
>>> Ph.D. in Management
>>> Dept. of Management Information Systems
>>> University of Arizona
>>> Tucson, AZ 85721
>>>
>>>
>>
>> --
>> Cell : 425-233-8271
>> Twitter: https://twitter.com/holdenkarau
>> Linked In: https://www.linkedin.com/in/holdenkarau
>>
>>
>
>
> --
> Ping Yan
> Ph.D. in Management
> Dept. of Management Information Systems
> University of Arizona
> Tucson, AZ 85721
>
>
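For readers following the thread: the join that Ping reports working above can be mimicked in plain Python to see what it computes. This mirrors rdd.join(kvrdd).map(lambda (x, y): y[1]) on keyed pairs, where y is the (left value, right value) tuple produced by the join; the sample data and names here are illustrative:

```python
# Plain-Python mirror of: rdd.join(kvrdd).map(lambda (x, y): y[1])
# testips pairs: (ip, payload); kvrdd pairs: (ip, freq) -- illustrative data.
testips = [('208.51.22.18', 'req-a'), ('31.207.6.173', 'req-b')]
kvrdd = {'208.51.22.18': 2, '31.207.6.173': 1}

# Join on the key: each match yields (key, (left value, right value)).
joined = [(ip, (payload, kvrdd[ip])) for ip, payload in testips if ip in kvrdd]

# Keep the frequency side of the joined value, i.e. y[1].
freqs = [y[1] for _, y in joined]
print(freqs)  # [2, 1]
```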
