Hello,
I have some activity data that I've processed with Pig to generate a
sequence of bags, one per user. Each bag contains that user's tuples of the
form (timestamp, activity id), ordered by time.
From each bag, I would like to produce a new bag of k-tuples, where each
k-tuple contains a run of k consecutive activities from the original
ordered bag. As a first attempt, I wrote the following Jython UDF:
@outputSchemaFunction("schema")
def k_tuple_expansion(activities, k):
    """Scans through a time-ordered bag of tuples of the form
    (timestamp, activity id) for a given user and returns a bag of
    k-tuples containing every run of k consecutive activity ids.
    """
    tups = []
    # Each window ends at index i and covers activities[i-k+1] .. activities[i]
    for i in xrange(k - 1, len(activities)):
        # Keep only the activity id (field 1), oldest first
        actseq = [activities[i - j][1] for j in range(k - 1, -1, -1)]
        tups.append(tuple(actseq))
    # Pig converts the returned list of tuples into a bag
    return tups

@schemaFunction("schema")
def schema(input):
    # Return whatever type we were handed
    return input
This code works as intended. Problems arise when I try to process the
results further in Pig. Because I'm not specifying a static output schema
(it depends on k), Pig doesn't know the structure of what is being
returned.
In particular, my attempt to flatten each user's bag of k-tuples fails.
Since I could build a string representation of the output schema once k
is known, is there some way to construct that string and pass it back to
Pig? My use of the schema function above follows the only example I've
found, here:
https://cwiki.apache.org/PIG/udfsusingscriptinglanguages.html
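For reference, building the schema string itself for a known k is straightforward. Below is a minimal sketch in plain Python (it should also run under Jython). The function name k_tuple_schema, the field names a0..a{k-1}, the bag and tuple aliases, and chararray as the activity id type are all assumptions for illustration; whether and how Pig will accept such a string from a schema function is exactly the open question, so this only covers constructing the string.

```python
def k_tuple_schema(k, field_type="chararray", bag_name="kseqs", tuple_name="t"):
    """Build a Pig schema string for a bag of k-tuples (hypothetical helper).

    Assumptions: fields are named a0..a{k-1} and all share field_type
    (chararray by default); bag_name and tuple_name are arbitrary aliases.
    """
    if k < 1:
        raise ValueError("k must be a positive integer")
    # One "name:type" entry per position in the k-tuple
    fields = ",".join("a%d:%s" % (i, field_type) for i in range(k))
    # Pig schema syntax: a bag {...} holding tuples (...)
    return "%s:{%s:(%s)}" % (bag_name, tuple_name, fields)
```

For example, k_tuple_schema(2) returns kseqs:{t:(a0:chararray,a1:chararray)}.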
If a Jython UDF is not the best approach, is there a native Pig way to
attack this problem?
Any pointers would be most appreciated!
Chris