Hi
If you asked any DB developer, s/he would give you this construct:
select * from (
  select userid, time, state,
         rank() over (partition by userId order by time desc) r
  from event) t
where r = 1
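For anyone less familiar with the SQL above, here is a plain-Python sketch (no Spark involved) of what rank() over (partition by userId order by time desc) ... where r = 1 computes. rank_latest is a hypothetical helper, and the rows are a made-up (userid, time, state) sample; the "S2X" row is invented only to show a tie. Note that rank() gives tied rows the same rank, so a user with two records at the same latest time gets both back, whereas the RDD approach below keeps exactly one record per user.

```python
from itertools import groupby


def rank_latest(rows):
    """Emulate: rank() over (partition by userid order by time desc) r ... where r = 1.

    rows: iterable of (userid, time, state) tuples (hypothetical sample format).
    Returns every row with rank 1, i.e. all rows tied for a user's latest time.
    """
    result = []
    # "partition by userid": groupby needs its input sorted by the group key
    ordered = sorted(rows, key=lambda r: r[0])
    for _user, group in groupby(ordered, key=lambda r: r[0]):
        # "order by time desc": latest record first (sort is stable on ties)
        group = sorted(group, key=lambda r: r[1], reverse=True)
        latest_time = group[0][1]
        # rank() assigns tied rows the same rank, so r = 1 may match several rows
        result.extend(r for r in group if r[1] == latest_time)
    return result


# hypothetical sample rows; "S2X" ties with "S2C2" on purpose
rows = [(1, 20150101, "S1"), (1, 20150303, "S1C3"),
        (2, 20150501, "S2C2"), (2, 20150501, "S2X")]
latest_rows = rank_latest(rows)
print(latest_rows)
```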
I am not sure whether DataFrame supports it, though I am sure we could extend
the functions to implement it.
But here is a solution that does not use DF, so that we have access to
repartitionAndSortWithinPartitions, which is exactly the tool we need.
The idea is:
- Partition using userId, making sure we get all records of a userid in one
partition
def customPartitioner(k):
    # partition using user id (the part of the key before "~")
    return int(k.split("~")[0])
- Sort the data by userid and then by time, descending
def getSortingKey(k):
    # sort using the "userid~time" string itself
    return k
- Just pick up the first record for each userid
def getFinal(itr):
    # records arrive sorted by "userid~time" descending, so the first
    # record seen for each userid is its latest one
    prev_user = None
    for k, v in itr:
        user, time = k.split("~")
        if user != prev_user:
            prev_user = user
            yield (user, time, v)
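To see the sort and pick-first steps without a cluster, here is a plain-Python sketch of what happens inside one partition once the partitioner has put all of a user's records together. first_per_user is a hypothetical name that restates getFinal's logic, and the partition contents are taken from the sample data further down. It relies on the time part being a fixed-width yyyymmdd string, so that sorting the "userid~time" strings descending also sorts each user's records latest-first; a variable-width time would need zero-padding first.

```python
def first_per_user(itr):
    """Restatement of getFinal: keep the first (latest) record per userid."""
    prev_user = None
    for k, v in itr:
        user, time = k.split("~")
        if user != prev_user:
            prev_user = user
            yield (user, time, v)


# one partition's worth of (key, value) records, in arbitrary arrival order
partition = [("1~20150101", "S1"), ("3~20150101", "S3"),
             ("1~20150303", "S1C3"), ("3~20150201", "S3C1"),
             ("1~20150112", "S1C2")]

# mimic ascending=False with keyfunc=getSortingKey: sort on the key string, descending
sorted_partition = sorted(partition, key=lambda kv: kv[0], reverse=True)
latest = list(first_per_user(sorted_partition))
print(latest)
```

Because "~" sorts after every digit, all keys of one userid stay adjacent even when one id is a prefix of another (e.g. "1" and "10"), which is what the pick-first step needs.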
Let's set up some data and test it.
data = [[1, 20150101, "S1"], [2, 20150101, "S2"], [3, 20150101, "S3"],
        [1, 20150102, "S1C1"], [1, 20150112, "S1C2"], [1, 20150303, "S1C3"],
        [2, 20150105, "S2C1"], [2, 20150501, "S2C2"], [3, 20150201, "S3C1"]]
# key each record as "userid~time"; the value is the state
userSchemaRDD = sc.parallelize(data).map(lambda t:
    (str(t[0]) + "~" + str(t[1]), t[2]))
for i in userSchemaRDD.collect():
    print(i)
rep = userSchemaRDD.repartitionAndSortWithinPartitions(numPartitions=2,
    partitionFunc=customPartitioner, ascending=False, keyfunc=getSortingKey)
res = rep.mapPartitions(getFinal)
for j in res.collect():
    print(j)
Data:
('1~20150101', 'S1')
('2~20150101', 'S2')
('3~20150101', 'S3')
('1~20150102', 'S1C1')
('1~20150112', 'S1C2')
('1~20150303', 'S1C3')
('2~20150105', 'S2C1')
('2~20150501', 'S2C2')
('3~20150201', 'S3C1')
Result:
('2', '20150501', 'S2C2')
('3', '20150201', 'S3C1')
('1', '20150303', 'S1C3')
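The order of the Result lines follows from where each userid lands. Assuming PySpark puts a key into partition partitionFunc(key) % numPartitions (which, as far as I know, is what partitionBy does), user 2 goes to partition 0 and users 1 and 3 go to partition 1. collect() returns partition 0 first, and the descending sort puts user 3 ahead of user 1 within partition 1. A quick plain-Python check of the assignment:

```python
def customPartitioner(k):
    # partition using user id (the part of the key before "~")
    return int(k.split("~")[0])


num_partitions = 2
keys = ["1~20150101", "2~20150101", "3~20150101"]

# assumption: the partition index is partitionFunc(key) % numPartitions
assignment = {k.split("~")[0]: customPartitioner(k) % num_partitions for k in keys}
print(assignment)
```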
Of course this is rough code, and I really did not try to get any fancier
than needed, but it should convey the idea.
Kindly let me know whether this works (or not :) ).
To the devs: any idea whether analytical functions (as they are known in the
DB world) are on the roadmap?
Best
Ayan
On Sat, May 16, 2015 at 7:49 AM, Justin Yip <[email protected]> wrote:
> Hi Ayan,
>
> I have a DF constructed from the following case class Event:
>
> case class State(attr1: String, ....)
>
> case class Event(
> userId: String,
> time: Long,
> state: State
> )
>
> I would like to generate a DF which contains the latest state of each
> userId. I could first compute the latest time of each user and join
> it back to the original data frame, but that involves two shuffles. Hence I
> would like to see if there are ways to improve the performance.
>
> Thanks.
>
> Justin
>
>
> On Fri, May 15, 2015 at 6:32 AM, ayan guha <[email protected]> wrote:
>
>> Can you kindly elaborate on this? It should be possible to write UDAFs along
>> the lines of sum/min etc.
>>
>> On Fri, May 15, 2015 at 5:49 AM, Justin Yip <[email protected]>
>> wrote:
>>
>>> Hello,
>>>
>>> May I know if there is a way to implement an aggregate function for grouped
>>> data in DataFrame? I dug into the docs but didn't find anything apart from the
>>> UDF functions, which apply to a Row. Maybe I have missed something. Thanks.
>>>
>>> Justin
>>>
>>> ------------------------------
>>> View this message in context: Custom Aggregate Function for DataFrame
>>> <http://apache-spark-user-list.1001560.n3.nabble.com/Custom-Aggregate-Function-for-DataFrame-tp22893.html>
>>> Sent from the Apache Spark User List mailing list archive
>>> <http://apache-spark-user-list.1001560.n3.nabble.com/> at Nabble.com.
>>>
>>
>>
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>
>
--
Best Regards,
Ayan Guha