Never mind... I didn't realize you were referring to the first table as df.
So you want to do a join between the first table and an RDD?
The right way to do it within the DataFrame construct is to think of it as
a join: map the second RDD to a DataFrame and do an inner join on ip, as in
the sketch below.
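
A minimal sketch of that join (the freq values and the freq_df / ips_df
names here are illustrative, not from the thread):

from pyspark.sql import Row, SQLContext

sqlContext = SQLContext(sc)

# reference table: ip -> freq (made-up values for illustration)
Freq = Row("ip", "freq")
freq_df = sc.parallelize([Freq("208.51.22.18", 2),
                          Freq("31.207.6.173", 1)]).toDF()

# the ips to look up, mapped from a plain RDD into a one-column DataFrame
Record = Row("ip")
ips_df = sc.parallelize([Record("208.51.22.18"),
                         Record("31.207.6.173"),
                         Record("208.51.22.18")]).toDF()

# inner join on ip pairs each looked-up ip with its frequency;
# nothing is pulled back to the driver until show()/collect()
ips_df.join(freq_df, ips_df.ip == freq_df.ip).select(freq_df.freq).show()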

On Thu, May 21, 2015 at 10:54 AM, Ram Sriharsha <sriharsha....@gmail.com>
wrote:

> Your original code snippet seems incomplete, and there isn't enough
> information to figure out what problem you actually ran into.
>
> In your original snippet there is an rdd variable which is well defined,
> and a df variable that is not defined in the code you sent.
>
> One way to make this work is as below. (Until the last line is executed,
> you are actually not collecting anything on the driver; and if your
> dataset is too big to collect on the driver for inspection, just do a
> take(n) on the result.)
>
> from pyspark.sql import Row, SQLContext
> from pyspark.sql.functions import count
>
> sqlContext = SQLContext(sc)
>
> # convert the list of ips into a DataFrame with a single column, ip
> Record = Row("ip")
> ips = ['208.51.22.18', '31.207.6.173', '208.51.22.18']
> df = sc.parallelize([Record(ip) for ip in ips]).toDF()
>
> # obtain ip -> frequency and inspect
> df.groupBy(df.ip).agg(count(df.ip)).show()
>
> +------------+---------+
> |          ip|COUNT(ip)|
> +------------+---------+
> |208.51.22.18|        2|
> |31.207.6.173|        1|
> +------------+---------+
>
> What exactly is the issue you are running into when you say it "doesn't
> get through"?
>
> On Thu, May 21, 2015 at 10:47 AM, ping yan <sharon...@gmail.com> wrote:
>
>> Thanks. I suspected that, but a df query inside a map sounds so
>> intuitive that I didn't want to just give up.
>>
>> I've tried a join, and even better, a DStream.transform(), and it works!
>>
>> freqs = testips.transform(
>>     lambda rdd: rdd.join(kvrdd).map(lambda kv: kv[1][1]))
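>>
>> For reference, here is how that join lines up batch by batch (a sketch
>> with illustrative data; it assumes each batch RDD and kvrdd both hold
>> (ip, value) pairs):
>>
>> # reference table as (ip, freq) key-value pairs
>> kvrdd = sc.parallelize([('208.51.22.18', 2), ('31.207.6.173', 1)])
>>
>> # one batch worth of ips, keyed by ip with a dummy value
>> batch = sc.parallelize([('208.51.22.18', 1), ('31.207.6.173', 1)])
>>
>> # join yields (ip, (dummy, freq)); kv[1][1] extracts the frequency
>> print(batch.join(kvrdd).map(lambda kv: kv[1][1]).collect())  # e.g. [2, 1]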
>>
>> Thank you for the help!
>>
>> Ping
>>
>> On Thu, May 21, 2015 at 10:40 AM, Holden Karau <hol...@pigscanfly.ca>
>> wrote:
>>
>>> So DataFrames, like RDDs, can only be accessed from the driver. If your
>>> IP frequency table is small enough, you could collect it and distribute
>>> it as a hashmap with a broadcast variable, or you could join your rdd
>>> with the IP frequency table, as sketched below. Hope that helps :)
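>>>
>>> A minimal sketch of the broadcast approach, reusing the df and rdd from
>>> your snippet (freq_map and freq_bc are illustrative names):
>>>
>>> # collect the small ip -> frequency table to the driver as a dict
>>> freq_map = dict(df.rdd.map(lambda row: (row.ip, row.freq)).collect())
>>>
>>> # ship it to every executor once as a read-only broadcast variable
>>> freq_bc = sc.broadcast(freq_map)
>>>
>>> # each ip is now looked up locally on the executors, not via the driver
>>> freqs = rdd.map(lambda ip: freq_bc.value.get(ip))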
>>>
>>>
>>> On Thursday, May 21, 2015, ping yan <sharon...@gmail.com> wrote:
>>>
>>>> I have a dataframe as a reference table for IP frequencies.
>>>> e.g.,
>>>>
>>>> ip                 freq
>>>> 10.226.93.67          1
>>>> 10.226.93.69          1
>>>> 161.168.251.101       4
>>>> 10.236.70.2           1
>>>> 161.168.251.105      14
>>>>
>>>>
>>>> All I need is to query the df inside a map:
>>>>
>>>> rdd = sc.parallelize(['208.51.22.18', '31.207.6.173', '208.51.22.18'])
>>>>
>>>> freqs = rdd.map(lambda x: df.where(df.ip == x).first())
>>>>
>>>> It doesn't get through... I would appreciate any help.
>>>>
>>>> Thanks!
>>>> Ping
>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> Ping Yan
>>>> Ph.D. in Management
>>>> Dept. of Management Information Systems
>>>> University of Arizona
>>>> Tucson, AZ 85721
>>>>
>>>>
>>>
>>> --
>>> Cell : 425-233-8271
>>> Twitter: https://twitter.com/holdenkarau
>>> Linked In: https://www.linkedin.com/in/holdenkarau
>>>
>>>
>>
>>
>> --
>> Ping Yan
>> Ph.D. in Management
>> Dept. of Management Information Systems
>> University of Arizona
>> Tucson, AZ 85721
>>
>>
>
