Okie dokie. First, let's clarify and simplify the problem a little, mainly
to make sure I understand what is going on.

Let's focus on a single class first. This is fine, since presumably each
class is independent. Now, we have user_id, start_time, and end_time
(start_time + duration). If I understand correctly, a user_id should be
counted up until end_time + 30s, since this is a 30s moving window. We'll
ignore that side of things for now, because you can just transform the end
times accordingly. Further, the assumption is that for a given user_id you
will not have overlapping start and end times: you can have multiple
entries, e.g. "user 1, start 1, end 3; user 1, start 5, end 7;" but you
can't have them in this form: "user 1, start 1, end 3; user 1, start 2,
end 4."

So we have simplified the question to this: given user_id, start_time, and
end_time (which never overlap for a given user), how can I get a count of
unique users for every second? We will design a UDF to generate that output
as a bag of (time, # of users) pairs, for every second from min(start_time)
to max(end_time). The UDF will accept a bag sorted on start_time. As
written it's a simple EvalFunc, but it really should be an Accumulator; the
transition is easy.
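To make the contract concrete, here is a minimal sketch of what such an
EvalFunc could look like. The class name, the "pad" constructor argument,
and the sweep() helper are my own placeholders (this is not the attached
UDF); it just shows the shape: a bag sorted by start_time in, a bag of
(time, count) tuples out.

import java.io.IOException;
import org.apache.pig.EvalFunc;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

// Hypothetical skeleton; names are placeholders, not the attached UDF.
public class Window extends EvalFunc<DataBag> {
    private static final TupleFactory tf = TupleFactory.getInstance();
    private static final BagFactory bf = BagFactory.getInstance();

    // The '30' from the DEFINE, added to every end_time (my assumption
    // about what that constructor argument means).
    private final long pad;

    public Window(String pad) {
        this.pad = Long.parseLong(pad);
    }

    @Override
    public DataBag exec(Tuple input) throws IOException {
        // input.get(0) is the bag of (uid, start, end) tuples, sorted by start.
        DataBag sessions = (DataBag) input.get(0);
        DataBag output = bf.newDefaultBag();
        sweep(sessions, output);   // the priority-queue sweep described below
        return output;
    }

    private void sweep(DataBag sessions, DataBag output) throws ExecException {
        // See the sketch after the case 1 / case 2 walkthrough.
    }
}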

Here is what you do. Initialize a PriorityQueue of end times. The natural
ordering for Integer and Long is fine, since it ensures that when we poll
the queue, we get the earliest end time, which is what we want.

Step one is to pull the first tuple and get its start_time and end_time.
That start_time becomes our current time (it is min(start_time), since the
bag is sorted on start_time), and we add the end_time to the priority
queue. We keep a counter, "uniques", which we increment.

Now, before we actually start advancing the clock, we grab the next tuple.
Why do we do this instead of jumping straight to the next end time? Because
we don't know whether someone starts between now and that end time. So we
grab the tuple and get its start and end time. Now there are two cases.

Case 1: the start time is less than the head of the priority queue (via a
peek). If this is the case, then we can safely tick forward up to the
start_time we just got, and go from there. This is because it's impossible
for a new end_time to be less than the start_time we just got: the tuples
are ordered by start_time, and end_time > start_time. So we add the new
end_time to the queue, then advance our timer one second at a time until we
reach the new start_time, emitting (timer, uniques) at each step. When we
reach the new start_time, we do uniques++. Then we grab the next tuple and
repeat.

Case 2: the start time comes after the head of the priority queue (via a
peek). If this is the case, then we need to tick up to the current head,
emitting (timer, uniques) at each step. When we reach the time value equal
to that end_time, we do uniques-- and check again whether the start_time
comes before the new head of the priority queue. Until it does, we repeat
case 2. Once it does, we do case 1.
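Here is a rough, untested sketch of that sweep in Java, under the same
assumptions (bag sorted by start_time, end_time > start_time, no per-user
overlap). It would be the body of the hypothetical sweep() from the
skeleton above, with emit() just appending a (time, count) tuple to the
output bag; it is illustration, not the attached UDF.

// Illustrative sketch only; would live in the skeleton above (needs
// java.util.PriorityQueue in addition to the Pig imports).
private void sweep(DataBag sessions, DataBag output) throws ExecException {
    PriorityQueue<Long> ends = new PriorityQueue<Long>(); // min-heap of end times
    long timer = 0;      // the current second
    long uniques = 0;    // users active at "timer"
    boolean first = true;

    for (Tuple t : sessions) {               // assumed sorted by start, ascending
        long start = (Long) t.get(1);
        long end = (Long) t.get(2) + pad;    // pad the end by the 30s window

        if (first) {                         // first tuple: set the clock
            timer = start;
            ends.add(end);
            uniques++;
            first = false;
            continue;
        }

        // Case 2: one or more sessions end before this one starts. Tick up to
        // each such end time, emitting counts, then drop that user.
        while (!ends.isEmpty() && ends.peek() < start) {
            long head = ends.poll();
            for (; timer < head; timer++) {
                emit(output, timer, uniques);
            }
            uniques--;
        }

        // Case 1: nothing left ends before this start. Add the new end time,
        // tick up to the new start, then count the new user.
        ends.add(end);
        for (; timer < start; timer++) {
            emit(output, timer, uniques);
        }
        uniques++;
    }

    // Drain whatever is still in the queue after the last tuple.
    while (!ends.isEmpty()) {
        long head = ends.poll();
        for (; timer < head; timer++) {
            emit(output, timer, uniques);
        }
        uniques--;
    }
}

private void emit(DataBag output, long time, long count) throws ExecException {
    Tuple t = tf.newTuple(2);    // tf = the TupleFactory from the skeleton
    t.set(0, time);
    t.set(1, count);
    output.add(t);
}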

I've attached a crude, untested UDF that does this. Buyer beware. But it
shows the general flow, and should be better than exploding the data (I
really hate exploding data like that unless it's absolutely necessary).

To use, generate some data, then...

register window.jar;
define window com.jcoveney.Window('30');
a = load 'data' using PigStorage(',') as (uid:long,start:long,end:long);
b = foreach (group a all) {
  ord = order a by start asc;
  generate flatten(window(ord));
}
dump b;

To generate data, I first made a small sample just to think it through, and
then did this (in Python):

import random

# Generate a million fake sessions with random start/stop times (Python 2).
f = open("data", "w")
for i in range(0, 1000000):
  v1 = random.randint(1, 10000000)
  v2 = random.randint(1, 10000000)
  start = min(v1, v2)
  stop = max(v1, v2)
  print >>f, "%i,%i,%i" % (i, start, stop)
f.close()

If this function is at all useful, I can clean it up and put it in the
piggybank. Let me know if the logic doesn't make sense, or if it isn't
quite what you had in mind.

Jon

2011/11/1 Marco Cadetg <[email protected]>

> Thanks again for all your comments.
>
> Jonathan, would you mind enlightening me on the way you would keep track
> of the people you need to "eject"? I don't get the min heap based tuple...
>
> Cheers
> -Marco
>
>
> On Mon, Oct 31, 2011 at 6:15 PM, Jonathan Coveney <[email protected]> wrote:
>
>> Perhaps I'm misunderstanding your use case, and this depends on the amount
>> of data, but you could consider something like this (to avoid exploding the
>> data, which could perhaps be unavoidable, but I hate resorting to that if I
>> don't have to).
>>
>> a = foreach yourdata generate student_id, start_time, start_time+duration
>> as end_time, course;
>> b = group a by course;
>> c = foreach b {
>>  ord = order a by start_time;
>>  generate yourudf.process(ord);
>> }
>>
>> Here is generally what process could do. It would be an accumulator UDF
>> that expected tuples sorted on start_time. Now you basically need a way to
>> know who the distinct users are. Now, since you want 30s windows, your
>> first window will presumably be 30s after the first start_time in your
>> data, and you would just tick ahead in 1s and write to a bag which would
>> have second, # of distinct student_ids. To know when to eject people, you
>> could have any number of data structures... perhaps a min heap based on
>> end_time, and of course instead of "ticking" ahead, you would grab a new
>> tuple (since this is the only thing that would change the state of the #
>> of
>> distinct ids), and then do all of the ticking ahead as you adjust the heap
>> and write the seconds in between the current time pointer and the
>> start_time of the new tuple, making sure in each step to check against the
>> min heap to eject any users that expired.
>>
>> That was a little rambly, I could quickly put together some more
>> reasonable
>> pseudocode if that would help. I think the general idea is clear though...
>>
>> 2011/10/31 Guy Bayes <[email protected]>
>>
>> > ahh TV that explains it
>> >
>> > 12G data file is a bit too big for R unless you sample, not sure if the
>> use
>> > case is conducive to sampling?
>> >
>> > If it is, could sample it down and structure in pig/hadoop and then
>> load it
>> > into the analytical/visualization tool of choice...
>> >
>> > Guy
>> >
>> > On Mon, Oct 31, 2011 at 8:55 AM, Marco Cadetg <[email protected]> wrote:
>> >
>> > > The data is not about students but about television ;) Regarding the
>> > size.
>> > > The raw input data size is about 150m although when I 'explode' the
>> > > timeseries
>> > > it will be around 80x bigger. I guess the average user duration will
>> be
>> > > around
>> > > 40 Minutes which means when sampling it at a 30s interval will
>> increase
>> > the
>> > > size by ~12GB.
>> > >
>> > > I think that is a size which my hadoop cluster with five 8-core x 8GB
>> x
>> > 2TB
>> > > HD
>> > > should be able to cope with.
>> > >
>> > > I don't know about R. Are you able to handle 12GB files well in R (of
>> > > course it depends on your computer, so assume an average business
>> > > computer, e.g. 2-core 2GHz 4GB RAM)?
>> > >
>> > > Cheers
>> > > -Marco
>> > >
>> > > On Fri, Oct 28, 2011 at 5:02 PM, Guy Bayes <[email protected]>
>> > wrote:
>> > >
>> > > > if it fits in R, it's trivial, draw a density plot or a histogram,
>> > about
>> > > > three lines of R code
>> > > >
>> > > > why I was wondering about the data volume.
>> > > >
>> > > > His example is students attending classes; if that is really the data,
>> > > > it's hard to believe it's super huge?
>> > > >
>> > > > Guy
>> > > >
>> > > > On Fri, Oct 28, 2011 at 6:12 AM, Norbert Burger <
>> > > [email protected]
>> > > > >wrote:
>> > > >
>> > > > > Perhaps another way to approach this problem is to visualize it
>> > > > > geometrically.  You have a long series of class session instances,
>> > > where
>> > > > > each class session is like 1D line segment, beginning/stopping at
>> > some
>> > > > > start/end time.
>> > > > >
>> > > > > These segments naturally overlap, and I think the question you're
>> > > asking
>> > > > is
>> > > > > equivalent to finding the number of overlaps at every subsegment.
>> > > > >
>> > > > > To answer this, you want to first break every class session into a
>> > full
>> > > > > list
>> > > > > of subsegments, where a subsegment is created by "breaking" each
>> > class
>> > > > > session/segment into multiple parts at the start/end point of any
>> > other
>> > > > > class session.  You can create this full set of subsegments in one
>> > pass
>> > > > by
>> > > > > comparing pairwise (CROSS) each start/end point with your original
>> > list
>> > > > of
>> > > > > class sessions.
>> > > > >
>> > > > > Once you have the full list of "broken" segments, then a final GROUP
>> > > > > BY/COUNT(*) will give you the number of overlaps.  Seems like this
>> > > > > approach would be faster than the previous approach if your class
>> > > > > sessions are very long, or there are many overlaps.
>> > > > >
>> > > > > Norbert
>> > > > >
>> > > > > On Thu, Oct 27, 2011 at 4:05 PM, Guy Bayes <[email protected]
>> >
>> > > > wrote:
>> > > > >
>> > > > > > how big is your dataset?
>> > > > > >
>> > > > > > On Thu, Oct 27, 2011 at 9:23 AM, Marco Cadetg <[email protected]
>> >
>> > > > wrote:
>> > > > > >
>> > > > > > > Thanks Bill and Norbert that seems like what I was looking
>> for.
>> > > I'm a
>> > > > > bit
>> > > > > > > worried about
>> > > > > > > how much data/io this could create. But I'll see ;)
>> > > > > > >
>> > > > > > > Cheers
>> > > > > > > -Marco
>> > > > > > >
>> > > > > > > On Thu, Oct 27, 2011 at 6:03 PM, Norbert Burger <
>> > > > > > [email protected]
>> > > > > > > >wrote:
>> > > > > > >
>> > > > > > > > In case what you're looking for is an analysis over the full
>> > > > learning
>> > > > > > > > duration, and not just the start interval, then one further
>> > > insight
>> > > > > is
>> > > > > > > > that each original record can be transformed into a
>> sequence of
>> > > > > > > > records, where the size of the sequence corresponds to the
>> > > session
>> > > > > > > > duration.  In other words, you can use a UDF to "explode"
>> the
>> > > > > original
>> > > > > > > > record:
>> > > > > > > >
>> > > > > > > > 1,marco,1319708213,500,math
>> > > > > > > >
>> > > > > > > > into:
>> > > > > > > >
>> > > > > > > > 1,marco,1319708190,500,math
>> > > > > > > > 1,marco,1319708220,500,math
>> > > > > > > > 1,marco,1319708250,500,math
>> > > > > > > > 1,marco,1319708280,500,math
>> > > > > > > > 1,marco,1319708310,500,math
>> > > > > > > > 1,marco,1319708340,500,math
>> > > > > > > > 1,marco,1319708370,500,math
>> > > > > > > > 1,marco,1319708400,500,math
>> > > > > > > > 1,marco,1319708430,500,math
>> > > > > > > > 1,marco,1319708460,500,math
>> > > > > > > > 1,marco,1319708490,500,math
>> > > > > > > > 1,marco,1319708520,500,math
>> > > > > > > > 1,marco,1319708550,500,math
>> > > > > > > > 1,marco,1319708580,500,math
>> > > > > > > > 1,marco,1319708610,500,math
>> > > > > > > > 1,marco,1319708640,500,math
>> > > > > > > > 1,marco,1319708670,500,math
>> > > > > > > > 1,marco,1319708700,500,math
>> > > > > > > >
>> > > > > > > > and then use Bill's suggestion to group by course, interval.
>> > > > > > > >
>> > > > > > > > Norbert
>> > > > > > > >
>> > > > > > > > On Thu, Oct 27, 2011 at 11:05 AM, Bill Graham <
>> > > > [email protected]>
>> > > > > > > > wrote:
>> > > > > > > > > You can pass your time to a udf that rounds it down to the
>> > > > nearest
>> > > > > 30
>> > > > > > > > second
>> > > > > > > > > interval and then group by course, interval to get counts
>> for
>> > > > each
>> > > > > > > > course,
>> > > > > > > > > interval.
>> > > > > > > > >
>> > > > > > > > > On Thursday, October 27, 2011, Marco Cadetg <
>> > [email protected]>
>> > > > > > wrote:
>> > > > > > > > >> I have a problem where I don't know how or if pig is even
>> > > > suitable
>> > > > > > to
>> > > > > > > > > solve
>> > > > > > > > >> it.
>> > > > > > > > >>
>> > > > > > > > >> I have a schema like this:
>> > > > > > > > >>
>> > > > > > > > >> student-id,student-name,start-time,duration,course
>> > > > > > > > >> 1,marco,1319708213,500,math
>> > > > > > > > >> 2,ralf,1319708111,112,english
>> > > > > > > > >> 3,greg,1319708321,333,french
>> > > > > > > > >> 4,diva,1319708444,80,english
>> > > > > > > > >> 5,susanne,1319708123,2000,math
>> > > > > > > > >> 1,marco,1319708564,500,french
>> > > > > > > > >> 2,ralf,1319708789,123,french
>> > > > > > > > >> 7,fred,1319708213,5675,french
>> > > > > > > > >> 8,laura,1319708233,123,math
>> > > > > > > > >> 10,sab,1319708999,777,math
>> > > > > > > > >> 11,fibo,1319708789,565,math
>> > > > > > > > >> 6,dan,1319708456,50,english
>> > > > > > > > >> 9,marco,1319708123,60,english
>> > > > > > > > >> 12,bo,1319708456,345,math
>> > > > > > > > >> 1,marco,1319708789,673,math
>> > > > > > > > >> ...
>> > > > > > > > >> ...
>> > > > > > > > >>
>> > > > > > > > >> I would like to retrieve a graph (interpolation) over
>> time
>> > > > grouped
>> > > > > > by
>> > > > > > > > >> course. Meaning how many students are learning for a
>> course
>> > > > based
>> > > > > on
>> > > > > > a
>> > > > > > > > 30
>> > > > > > > > >> sec interval.
>> > > > > > > > >> The grouping by course is easy but from there I've no
>> clue
>> > > how I
>> > > > > > would
>> > > > > > > > >> achieve the rest. I guess the rest needs to be achieved
>> via
>> > > some
>> > > > > UDF
>> > > > > > >> or is there any way to do this in pig? I often think
>> that I
>> > > > need
>> > > > > a
>> > > > > > > "for
>> > > > > > > > >> loop" or something similar in pig.
>> > > > > > > > >>
>> > > > > > > > >> Thanks for your help!
>> > > > > > > > >> -Marco
>> > > > > > > > >>
>> > > > > > > > >
>> > > > > > > >
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>
>
