Just a heads up: I have a cleaner version (with tests!) here: https://issues.apache.org/jira/browse/PIG-2364
If you're still using this, I heavily suggest using the new version. 2011/11/4 Marco Cadetg <[email protected]> > Yeha, that is awesome. Thank you very much Jonathan. > -Marco > > On Wed, Nov 2, 2011 at 7:52 PM, Jonathan Coveney <[email protected]> > wrote: > > > I'll make it less hideous and submit a patch this weekend, then :) > > > > 2011/11/2 Ashutosh Chauhan <[email protected]> > > > > > Hey Jon, > > > > > > Your windowing udf will be very useful outside of this particular > > usecase. > > > It will be great if you can contribute it to PiggyBank. > > > > > > Thanks, > > > Ashutosh > > > > > > On Tue, Nov 1, 2011 at 10:44, Jonathan Coveney <[email protected]> > > wrote: > > > > > > > Okie dokie. So first, let's clarify and simplify the problem a > little, > > > > especially to ensure that I know what is going on. > > > > > > > > Let's first just focus on a particular class. This is ok since > > presumably > > > > each class is independent. Now, we have user_id, start_time, and > > end_time > > > > (start_time+duration). If I understand correctly, a user_id should be > > > > included up to end_time+30s, since this is a 30s moving window. As > > such, > > > > we'll just ignore that side of things for now, because you can just > > > > transform people's start times accordingly. Further, the assumption > is > > > that > > > > for a given user_id, you will not have overlapping start and end > > > > times...you can have multiple entries, ie "user 1, start 1, end 3; > user > > > 1, > > > > start 5, end 7;" but you can't have them in this form: "user 1, start > > 1, > > > > end 3; user 1, start 2, end 4." > > > > > > > > So we have simplified the question to this: given: user_id, > start_time, > > > > and end_time (which never overlap), how can I get a count of unique > > users > > > > for every second? So now we will design a UDF to generate that output > > as > > > a > > > > bag of (time, # of people) pairs, for every second from > min(start_time) > > > to > > > > max(end_time). The UDF will accept a bag sorted on the start time. > Now, > > > as > > > > I write it it's going to be a simple evalfunc, but it should be an > > > > accumulator. It's easy to make the transition. > > > > > > > > Here is what you do. Initialize a PriorityQueue. The natural ordering > > for > > > > int and long is fine, as it will ensure that when we poll it, we'll > get > > > the > > > > earliest end time, which is what we want. > > > > > > > > So step one is to pull the first tuple, and get the start_time and > > > > end_time. The start time will set our time to start_time (which is > > > > min(start_time) since it was sorted on start_time), and we add the > > > end_time > > > > to the priority queue. We have a counter "uniques" which we > increment. > > > > > > > > Now, before we actually do increment, we grab the next tuple. Why do > > you > > > > do this instead of go to the next end time? Because we don't know if > > > > someone starts in between now and the next end time. So we grab the > > tuple > > > > and get its start and end time. Now there are two cases. > > > > > > > > Case 1: the start time is less than the head of the priority queue, > > via a > > > > peek. If this is the case, then we can safely increment up to the > > > > start_time we just got, and then go from there. This is because it's > > > > impossible for there to be a new end_time less than the start_time we > > > just > > > > got, because they are ordered by start_time and end_time>start_time. > So > > > we > > > > add the new end_time, and then we increment our timer until we get to > > the > > > > new start_time we just got, and add (timer,unique) at each step. When > > we > > > > get to start_time, we unique++. Now we get the next tuple and repeat. > > > > > > > > Case 2: the start time comes after the head of the priority queue, > via > > a > > > > peek. If this is the case, then we need to increment up to the > current > > > > head, emitting (timer,unique). Then when we get to the time_value > equal > > > to > > > > that end_time, we unique--, and check again if the start_time comes > > > before > > > > than the head of the priority queue. Until it does, we repeat step 2. > > > Once > > > > it does, we do step 1. > > > > > > > > I've attached a crude, untested UDF that does this. Buyer beware. But > > it > > > > shows the general flow, and should be better than exploding the data > (I > > > > really hate exploding data like that unless it's absolutely > necessary). > > > > > > > > To use, generate some data, then... > > > > > > > > register window.jar; > > > > define window com.jcoveney.Window('30'); > > > > a = load 'data' using PigStorage(',') as > > (uid:long,start:long,end:long); > > > > b = foreach (group a all) { > > > > ord = order a by start asc; > > > > generate flatten(window(ord)); > > > > } > > > > dump b; > > > > > > > > to generate data, I first did just a small subsample just to think > > about > > > > it, then I did (in python) > > > > > > > > import random > > > > f=open("data","w") > > > > for i in range(0,1000000): > > > > v1=random.randint(1,10000000) > > > > v2=random.randint(1,10000000) > > > > start=min(v1,v2) > > > > stop=max(v1,v2) > > > > print >>f,"%i,%i,%i" % (i,start,stop) > > > > > > > > If this function is at all useful, I can clean it up and put in in > the > > > > piggybank. Let me know if the logic doesn't make sense, or if it > isn't > > > > quite what you had in mind. > > > > > > > > Jon > > > > > > > > > > > > 2011/11/1 Marco Cadetg <[email protected]> > > > > > > > >> Thanks again for all your comments. > > > >> > > > >> Jonathan, would you mind to enlighten me on the way you would keep > > track > > > >> of the > > > >> people you need to "eject". I don't get the min heap based tuple... > > > >> > > > >> Cheers > > > >> -Marco > > > >> > > > >> > > > >> On Mon, Oct 31, 2011 at 6:15 PM, Jonathan Coveney < > [email protected] > > > >wrote: > > > >> > > > >>> Perhaps I'm misunderstanding your use case, and this depends on the > > > >>> amount > > > >>> of data, but you could consider something like this (to avoid > > exploding > > > >>> the > > > >>> data, which could perhaps be inavoidable but I hate resorting to > that > > > if > > > >>> I > > > >>> don't have to). > > > >>> > > > >>> a = foreach yourdata generate student_id, start_time, > > > start_time+duration > > > >>> as end_time, course; > > > >>> b = group a by course; > > > >>> c = foreach b { > > > >>> ord = order a by start_time; > > > >>> generate yourudf.process(ord); > > > >>> } > > > >>> > > > >>> Here is generally what process could do. It would be an accumulator > > UDF > > > >>> that expected tuples sorted on start_time. Now you basically need a > > way > > > >>> to > > > >>> know who the distinct users are. Now, since you want 30s windows, > > your > > > >>> first window will presumably be 30s after the first start_time in > > your > > > >>> data, and you would just tick ahead in 1s and write to a bag which > > > would > > > >>> have second, # of distinct student_ids. To know when to eject > people, > > > you > > > >>> could have any number of data structures... perhaps a min heap > based > > on > > > >>> end_time, and of course instead of "ticking" ahead, you would grab > a > > > new > > > >>> tuple (since this is the only thing that would change the state of > > the > > > # > > > >>> of > > > >>> distinct ids), and then do all of the ticking ahead as you adjust > the > > > >>> heap > > > >>> and write the seconds in between the current time pointer and the > > > >>> start_time of the new tuple, making sure in each step to check > > against > > > >>> the > > > >>> min heap to eject any users that expired. > > > >>> > > > >>> That was a little rambly, I could quickly put together some more > > > >>> reasonable > > > >>> pseudocode if that would help. I think the general idea is clear > > > >>> though... > > > >>> > > > >>> 2011/10/31 Guy Bayes <[email protected]> > > > >>> > > > >>> > ahh TV that explains it > > > >>> > > > > >>> > 12G data file is a bit too big for R unless you sample, not sure > if > > > >>> the use > > > >>> > case is conducive to sampling? > > > >>> > > > > >>> > If it is, could sample it down and structure in pig/hadoop and > then > > > >>> load it > > > >>> > into the analytical/visualization tool of choice... > > > >>> > > > > >>> > Guy > > > >>> > > > > >>> > On Mon, Oct 31, 2011 at 8:55 AM, Marco Cadetg <[email protected]> > > > >>> wrote: > > > >>> > > > > >>> > > The data is not about students but about television ;) > Regarding > > > the > > > >>> > size. > > > >>> > > The raw input data size is about 150m although when I 'explode' > > the > > > >>> > > timeseries > > > >>> > > it will be around 80x bigger. I guess the average user duration > > > will > > > >>> be > > > >>> > > around > > > >>> > > 40 Minutes which means when sampling it at a 30s interval will > > > >>> increase > > > >>> > the > > > >>> > > size by ~12GB. > > > >>> > > > > > >>> > > I think that is a size which my hadoop cluster with five > 8-core x > > > >>> 8GB x > > > >>> > 2TB > > > >>> > > HD > > > >>> > > should be able to cope with. > > > >>> > > > > > >>> > > I don't know about R. Are you able to handle 12Gb > > > >>> > > files well in R (off course it depends on your computer so > assume > > > an > > > >>> > > average business computer e.g. 2-core 2GHz 4GB ram)? > > > >>> > > > > > >>> > > Cheers > > > >>> > > -Marco > > > >>> > > > > > >>> > > On Fri, Oct 28, 2011 at 5:02 PM, Guy Bayes < > > [email protected]> > > > >>> > wrote: > > > >>> > > > > > >>> > > > if it fits in R, it's trivial, draw a density plot or a > > > histogram, > > > >>> > about > > > >>> > > > three lines of R code > > > >>> > > > > > > >>> > > > why I was wondering about the data volume. > > > >>> > > > > > > >>> > > > His example is students attending classes, if that is really > > the > > > >>> data > > > >>> > > hard > > > >>> > > > to believe it's super huge? > > > >>> > > > > > > >>> > > > Guy > > > >>> > > > > > > >>> > > > On Fri, Oct 28, 2011 at 6:12 AM, Norbert Burger < > > > >>> > > [email protected] > > > >>> > > > >wrote: > > > >>> > > > > > > >>> > > > > Perhaps another way to approach this problem is to > visualize > > it > > > >>> > > > > geometrically. You have a long series of class session > > > >>> instances, > > > >>> > > where > > > >>> > > > > each class session is like 1D line segment, > > beginning/stopping > > > at > > > >>> > some > > > >>> > > > > start/end time. > > > >>> > > > > > > > >>> > > > > These segments naturally overlap, and I think the question > > > you're > > > >>> > > asking > > > >>> > > > is > > > >>> > > > > equivalent to finding the number of overlaps at every > > > subsegment. > > > >>> > > > > > > > >>> > > > > To answer this, you want to first break every class session > > > into > > > >>> a > > > >>> > full > > > >>> > > > > list > > > >>> > > > > of subsegments, where a subsegment is created by "breaking" > > > each > > > >>> > class > > > >>> > > > > session/segment into multiple parts at the start/end point > of > > > any > > > >>> > other > > > >>> > > > > class session. You can create this full set of subsegments > > in > > > >>> one > > > >>> > pass > > > >>> > > > by > > > >>> > > > > comparing pairwise (CROSS) each start/end point with your > > > >>> original > > > >>> > list > > > >>> > > > of > > > >>> > > > > class sessions. > > > >>> > > > > > > > >>> > > > > Once you have the full list of "broken" segments, then a > > final > > > >>> GROUP > > > >>> > > > > BY/COUNT(*) will you give you the number of overlaps. > Seems > > > like > > > >>> > > > approach > > > >>> > > > > would be faster than the previous approach if your class > > > >>> sessions are > > > >>> > > > very > > > >>> > > > > long, or there are many overlaps. > > > >>> > > > > > > > >>> > > > > Norbert > > > >>> > > > > > > > >>> > > > > On Thu, Oct 27, 2011 at 4:05 PM, Guy Bayes < > > > >>> [email protected]> > > > >>> > > > wrote: > > > >>> > > > > > > > >>> > > > > > how big is your dataset? > > > >>> > > > > > > > > >>> > > > > > On Thu, Oct 27, 2011 at 9:23 AM, Marco Cadetg < > > > >>> [email protected]> > > > >>> > > > wrote: > > > >>> > > > > > > > > >>> > > > > > > Thanks Bill and Norbert that seems like what I was > > looking > > > >>> for. > > > >>> > > I'm a > > > >>> > > > > bit > > > >>> > > > > > > worried about > > > >>> > > > > > > how much data/io this could create. But I'll see ;) > > > >>> > > > > > > > > > >>> > > > > > > Cheers > > > >>> > > > > > > -Marco > > > >>> > > > > > > > > > >>> > > > > > > On Thu, Oct 27, 2011 at 6:03 PM, Norbert Burger < > > > >>> > > > > > [email protected] > > > >>> > > > > > > >wrote: > > > >>> > > > > > > > > > >>> > > > > > > > In case what you're looking for is an analysis over > the > > > >>> full > > > >>> > > > learning > > > >>> > > > > > > > duration, and not just the start interval, then one > > > further > > > >>> > > insight > > > >>> > > > > is > > > >>> > > > > > > > that each original record can be transformed into a > > > >>> sequence of > > > >>> > > > > > > > records, where the size of the sequence corresponds > to > > > the > > > >>> > > session > > > >>> > > > > > > > duration. In other words, you can use a UDF to > > "explode" > > > >>> the > > > >>> > > > > original > > > >>> > > > > > > > record: > > > >>> > > > > > > > > > > >>> > > > > > > > 1,marco,1319708213,500,math > > > >>> > > > > > > > > > > >>> > > > > > > > into: > > > >>> > > > > > > > > > > >>> > > > > > > > 1,marco,1319708190,500,math > > > >>> > > > > > > > 1,marco,1319708220,500,math > > > >>> > > > > > > > 1,marco,1319708250,500,math > > > >>> > > > > > > > 1,marco,1319708280,500,math > > > >>> > > > > > > > 1,marco,1319708310,500,math > > > >>> > > > > > > > 1,marco,1319708340,500,math > > > >>> > > > > > > > 1,marco,1319708370,500,math > > > >>> > > > > > > > 1,marco,1319708400,500,math > > > >>> > > > > > > > 1,marco,1319708430,500,math > > > >>> > > > > > > > 1,marco,1319708460,500,math > > > >>> > > > > > > > 1,marco,1319708490,500,math > > > >>> > > > > > > > 1,marco,1319708520,500,math > > > >>> > > > > > > > 1,marco,1319708550,500,math > > > >>> > > > > > > > 1,marco,1319708580,500,math > > > >>> > > > > > > > 1,marco,1319708610,500,math > > > >>> > > > > > > > 1,marco,1319708640,500,math > > > >>> > > > > > > > 1,marco,1319708670,500,math > > > >>> > > > > > > > 1,marco,1319708700,500,math > > > >>> > > > > > > > > > > >>> > > > > > > > and then use Bill's suggestion to group by course, > > > >>> interval. > > > >>> > > > > > > > > > > >>> > > > > > > > Norbert > > > >>> > > > > > > > > > > >>> > > > > > > > On Thu, Oct 27, 2011 at 11:05 AM, Bill Graham < > > > >>> > > > [email protected]> > > > >>> > > > > > > > wrote: > > > >>> > > > > > > > > You can pass your time to a udf that rounds it down > > to > > > >>> the > > > >>> > > > nearest > > > >>> > > > > 30 > > > >>> > > > > > > > second > > > >>> > > > > > > > > interval and then group by course, interval to get > > > >>> counts for > > > >>> > > > each > > > >>> > > > > > > > course, > > > >>> > > > > > > > > interval. > > > >>> > > > > > > > > > > > >>> > > > > > > > > On Thursday, October 27, 2011, Marco Cadetg < > > > >>> > [email protected]> > > > >>> > > > > > wrote: > > > >>> > > > > > > > >> I have a problem where I don't know how or if pig > is > > > >>> even > > > >>> > > > suitable > > > >>> > > > > > to > > > >>> > > > > > > > > solve > > > >>> > > > > > > > >> it. > > > >>> > > > > > > > >> > > > >>> > > > > > > > >> I have a schema like this: > > > >>> > > > > > > > >> > > > >>> > > > > > > > >> student-id,student-name,start-time,duration,course > > > >>> > > > > > > > >> 1,marco,1319708213,500,math > > > >>> > > > > > > > >> 2,ralf,1319708111,112,english > > > >>> > > > > > > > >> 3,greg,1319708321,333,french > > > >>> > > > > > > > >> 4,diva,1319708444,80,english > > > >>> > > > > > > > >> 5,susanne,1319708123,2000,math > > > >>> > > > > > > > >> 1,marco,1319708564,500,french > > > >>> > > > > > > > >> 2,ralf,1319708789,123,french > > > >>> > > > > > > > >> 7,fred,1319708213,5675,french > > > >>> > > > > > > > >> 8,laura,1319708233,123,math > > > >>> > > > > > > > >> 10,sab,1319708999,777,math > > > >>> > > > > > > > >> 11,fibo,1319708789,565,math > > > >>> > > > > > > > >> 6,dan,1319708456,50,english > > > >>> > > > > > > > >> 9,marco,1319708123,60,english > > > >>> > > > > > > > >> 12,bo,1319708456,345,math > > > >>> > > > > > > > >> 1,marco,1319708789,673,math > > > >>> > > > > > > > >> ... > > > >>> > > > > > > > >> ... > > > >>> > > > > > > > >> > > > >>> > > > > > > > >> I would like to retrieve a graph (interpolation) > > over > > > >>> time > > > >>> > > > grouped > > > >>> > > > > > by > > > >>> > > > > > > > >> course. Meaning how many students are learning > for a > > > >>> course > > > >>> > > > based > > > >>> > > > > on > > > >>> > > > > > a > > > >>> > > > > > > > 30 > > > >>> > > > > > > > >> sec interval. > > > >>> > > > > > > > >> The grouping by course is easy but from there I've > > no > > > >>> clue > > > >>> > > how I > > > >>> > > > > > would > > > >>> > > > > > > > >> achieve the rest. I guess the rest needs to be > > > achieved > > > >>> via > > > >>> > > some > > > >>> > > > > UDF > > > >>> > > > > > > > >> or is there any way how to this in pig? I often > > think > > > >>> that I > > > >>> > > > need > > > >>> > > > > a > > > >>> > > > > > > "for > > > >>> > > > > > > > >> loop" or something similar in pig. > > > >>> > > > > > > > >> > > > >>> > > > > > > > >> Thanks for your help! > > > >>> > > > > > > > >> -Marco > > > >>> > > > > > > > >> > > > >>> > > > > > > > > > > > >>> > > > > > > > > > > >>> > > > > > > > > > >>> > > > > > > > > >>> > > > > > > > >>> > > > > > > >>> > > > > > >>> > > > > >>> > > > >> > > > >> > > > > > > > > > >
