Hi,

Please try this:

1. I used a tab-separated input file as follows:

cheolsoo@localhost:~/workspace/pig-svn $cat tag_count_ts_pro_userpair
('a','b','c','d') 3 {('2012-03-04 10:10:10'),('2013-03-04 10:10:11')}

2. My udf is as follows:

import datetime

@outputSchema("days_from_start:bag{t:tuple(cnt:int)}")
def daysFromStart(startDate, aBagOfDates):
        if aBagOfDates is None: return None
        result=[]
        for someDate in aBagOfDates:
            if someDate is None: continue
            someDate = ''.join(someDate)
            if len(someDate)==21: result.append(diffTime(startDate,
someDate))
        return result

@outputSchema("diff:int")
def diffTime(dateFrom, dateTil):
    dateSmall = datetime.datetime.strptime(dateFrom, "%Y-%m-%d %H:%M:%S")
    dateBig = datetime.datetime.strptime(dateTil[1:-1], "%Y-%m-%d %H:%M:%S")
    delta = dateBig - dateSmall
    return delta.days

3. My pig script is as follows:

register 'udf.py' using jython as moins;

x = load 'tag_count_ts_pro_userpair' using PigStorage('\t') as (group:(),
cnt:int, times:{(chararray)});
y = foreach x generate *, moins.daysFromStart('2011-06-01 00:00:00', times);
dump y;

This returns:

(('a','b','c','d'),3,{('2012-03-04 10:10:10'),('2013-03-04
10:10:11')},{(277),(642)})

Thanks,
Cheolsoo

On Mon, Oct 1, 2012 at 7:42 AM, Björn-Elmar Macek <[email protected]>wrote:

> Hi,
>
> i am currently writing a PIG script that works with a bags of timestamp
> tuples. So i am basically working on a datastructure like this:
> (tuple(chararray)), int, bag{tuple(chararray)})
>
> for example:
> ( ('a','b','c','d'), 3, {('2012-03-04 10:10:10'), ('2012-03-04 10:10:11')}
> )
>
> When loading the data i add a schema, so pig knows what data is coming in:
> x = load 'tag_count_ts_pro_userpair' as (group:tuple(),cnt:int,times:**
> bag{});
>
> I then want to change the content of the times-bag, by replacing every
> timestamp with an integer, based on the time distance to a certain date,
> which i do with the follwing UDFs:
> ###### myUDF.py ##############
> from org.apache.pig.scripting import *
> import datetime
> import math
>
>
> @outputSchema("days_from_**start:bag{t:tuple(cnt:int)}")
> def daysFromStart(startDate, aBagOfDates):
>         if aBagOfDates is None: return None;
>         result=[]
>         for somedate in aBagOfDates:
>             if somedate is None: continue
>             aDateString = ''.join(somedate)
>             #ALTERNATIVELY I USED ALSO: aDateString = ''.join(somedate[0])
> // aDateString = ''.join(somedate[1])
>             if len(aDateString==16): result.append(diffTime(**startDate,
> aDateString))
>         return result
>
>
> @outputSchema("diff:int")
> def diffTime(dateFrom,dateTil):
>     dateSmall = datetime.datetime.strptime(**dateFrom,"%Y-%m-%d
> %H:%M:%S");
>     dateBig = datetime.datetime.strptime(**dateTil,"%Y-%m-%d %H:%M:%S");
>     delta = dateBig-dateSmall
>     return delta.days
>
> ##########################
>
> I do this by executing the following command in the grunt:
> y = foreach x generate *, moins.daysFromStart('2011-06-**01 00:00:00',
> times);
>
> But when i try to store y, i get the following error message:
>
> ######## LOG #############
> 2012-10-01 16:35:03,499 [main] ERROR 
> org.apache.pig.tools.pigstats.**SimplePigStats
> - ERROR 2997: Unable to recreate exception from backed error:
> org.apache.pig.backend.**executionengine.ExecException: ERROR 0: Error
> executing function
>     at org.apache.pig.scripting.**jython.JythonFunction.exec(**
> JythonFunction.java:106)
>     at org.apache.pig.backend.hadoop.**executionengine.physicalLayer.**
> expressionOperators.**POUserFunc.getNext(POUserFunc.**java:216)
>     at org.apache.pig.backend.hadoop.**executionengine.physicalLayer.**
> expressionOperators.**POUserFunc.getNext(POUserFunc.**java:258)
>     at org.apache.pig.backend.hadoop.**executionengine.physicalLayer.**
> PhysicalOperator.getNext(**PhysicalOperator.java:316)
>     at org.apache.pig.backend.hadoop.**executionengine.physicalLayer.**
> relationalOperators.POForEach.**processPlan(POForEach.java:**332)
>     at org.apache.pig.backend.hadoop.**executionengine.physicalLayer.**
> relationalOperators.POForEach.**getNext(POForEach.java:284)
>     at org.apache.pig.backend.hadoop.**executionengine.**mapReduceLayer.**
> PigGenericMapBase.runPipeline(**PigGenericMapBase.java:271)
>     at org.apache.pig.backend.hadoop.**executionengine.**mapReduceLayer.**
> PigGenericMapBase.map(**PigGenericMapBase.java:266)
>     at org.apache.pig.backend.hadoop.**executionengine.**mapReduceLayer.**
> PigGenericMapBase.map(**PigGenericMapBase.java:64)
>     at org.apache.hadoop.mapreduce.**Mapper.run(Mapper.java:144)
>     at org.apache.hadoop.mapred.**MapTask.runNewMapper(MapTask.**java:764)
>     at org.apache.hadoop.mapred.**MapTask.run(MapTask.java:370)
>     at org.apache.hadoop.mapred.**Child$4.run(Child.java:255)
>     at java.security.**AccessController.doPrivileged(**Native Method)
>     at javax.security.auth.Subject.**doAs(Subject.java:415)
>     at org.apache.hadoop.security.**UserGroupInformation.doAs(**
> UserGroupInformation.java:**1121)
>     at org.apache.hadoop.mapred.**Child.main(Child.java:249)
> Caused by: Traceback (most recent call last):
>   File "/gpfs/home02/fb16/bmacek/**myUDF.py", line 38, in daysFromStart
>     aDateString = ''.join(somedate[0])
> TypeError: sequence item 0: expected string, int found
>
>     at org.python.core.Py.TypeError(**Py.java:235)
>     at org.python.core.PyString.str_**join(PyString.java:1900)
>     at org.python.core.PyString$str_**join_exposer.__call__(Unknown
> Source)
>     at org.python.core.PyObject.__**call__(PyObject.java:391)
>     at org.python.pycode._pyx3.**daysFromStart$3(/gpfs/home02/**
> fb16/bmacek/myUDF.py:40)
>     at org.python.pycode._pyx3.call_**function(/gpfs/home02/fb16/**
> bmacek/myUDF.py)
>     at org.python.core.PyTableCode.**call(PyTableCode.java:165)
>     at org.python.core.PyBaseCode.**call(PyBaseCode.java:301)
>     at org.python.core.PyFunction.**function___call__(PyFunction.**
> java:376)
>     at org.python.core.PyFunction.__**call__(PyFunction.java:371)
>     at org.python.core.PyFunction.__**call__(PyFunction.java:361)
>     at org.python.core.PyFunction.__**call__(PyFunction.java:356)
>     at org.apache.pig.scripting.**jython.JythonFunction.exec(**
> JythonFunction.java:103)
>     ... 16 more
> ##############################
>
> Depending on the line...
> ---
> aDateString = ''.join(somedate)
>             #ALTERNATIVELY I USED ALSO: aDateString = ''.join(somedate[0])
> // aDateString = ''.join(somedate[1])
> ---
> ... i get different error messages: when i use index 0, the error msg
> above is given, if i use 1 it is outofbounds, and if i omit the
> squarebrackets, the error message says:
>
> 2012-10-01 16:40:46,280 [main] ERROR 
> org.apache.pig.tools.pigstats.**SimplePigStats
> - ERROR 2997: Unable to recreate exception from backed error:
> org.apache.pig.backend.**executionengine.ExecException: ERROR 0: Error
> executing function
>     at org.apache.pig.scripting.**jython.JythonFunction.exec(**
> JythonFunction.java:106)
>     at org.apache.pig.backend.hadoop.**executionengine.physicalLayer.**
> expressionOperators.**POUserFunc.getNext(POUserFunc.**java:216)
>     at org.apache.pig.backend.hadoop.**executionengine.physicalLayer.**
> expressionOperators.**POUserFunc.getNext(POUserFunc.**java:258)
>     at org.apache.pig.backend.hadoop.**executionengine.physicalLayer.**
> PhysicalOperator.getNext(**PhysicalOperator.java:316)
>     at org.apache.pig.backend.hadoop.**executionengine.physicalLayer.**
> relationalOperators.POForEach.**processPlan(POForEach.java:**332)
>     at org.apache.pig.backend.hadoop.**executionengine.physicalLayer.**
> relationalOperators.POForEach.**getNext(POForEach.java:284)
>     at org.apache.pig.backend.hadoop.**executionengine.**mapReduceLayer.**
> PigGenericMapBase.runPipeline(**PigGenericMapBase.java:271)
>     at org.apache.pig.backend.hadoop.**executionengine.**mapReduceLayer.**
> PigGenericMapBase.map(**PigGenericMapBase.java:266)
>     at org.apache.pig.backend.hadoop.**executionengine.**mapReduceLayer.**
> PigGenericMapBase.map(**PigGenericMapBase.java:64)
>     at org.apache.hadoop.mapreduce.**Mapper.run(Mapper.java:144)
>     at org.apache.hadoop.mapred.**MapTask.runNewMapper(MapTask.**java:764)
>     at org.apache.hadoop.mapred.**MapTask.run(MapTask.java:370)
>     at org.apache.hadoop.mapred.**Child$4.run(Child.java:255)
>     at java.security.**AccessController.doPrivileged(**Native Method)
>     at javax.security.auth.Subject.**doAs(Subject.java:415)
>     at org.apache.hadoop.security.**UserGroupInformation.doAs(**
> UserGroupInformation.java:**1121)
>     at org.apache.hadoop.mapred.**Child.main(Child.java:249)
> Caused by: Traceback (most recent call last):
>   File "/gpfs/home02/fb16/bmacek/**myUDF.py", line 38, in daysFromStart
>     aDateString = ''.join(somedate)
> TypeError: sequence item 0: expected string, array.array found
>
>     at org.python.core.Py.TypeError(**Py.java:235)
>     at org.python.core.PyString.str_**join(PyString.java:1900)
>     at org.python.core.PyString$str_**join_exposer.__call__(Unknown
> Source)
>     at org.python.core.PyObject.__**call__(PyObject.java:391)
>     at org.python.pycode._pyx3.**daysFromStart$3(/gpfs/home02/**
> fb16/bmacek/myUDF.py:40)
>     at org.python.pycode._pyx3.call_**function(/gpfs/home02/fb16/**
> bmacek/myUDF.py)
>     at org.python.core.PyTableCode.**call(PyTableCode.java:165)
>     at org.python.core.PyBaseCode.**call(PyBaseCode.java:301)
>     at org.python.core.PyFunction.**function___call__(PyFunction.**
> java:376)
>     at org.python.core.PyFunction.__**call__(PyFunction.java:371)
>     at org.python.core.PyFunction.__**call__(PyFunction.java:361)
>     at org.python.core.PyFunction.__**call__(PyFunction.java:356)
>     at org.apache.pig.scripting.**jython.JythonFunction.exec(**
> JythonFunction.java:103)
>     ... 16 more
>
>
>
> Can anybody please tell me, what i am doing wrong here?
>
> Thanks for your time and help in advance!
> Björn
>
>
>

Reply via email to