Antoni, Thanks for the suggestion. Let me have a look at it and hopefully we can use it in our use case.
Thanks, Ravi On Tue, Jun 20, 2017 at 21:53 Antoni Ivanov <[email protected]> wrote: > Hi Ravi, > > I am curious why you are not using already existing last_value function in > Impala to get "latest non null value for the column” > > e.g > last_value(column_a ignore nulls) over(partition by ID order by Date_Time) > > Thanks, > Antoni > > > > > On Jun 21, 2017, at 1:17 AM, Tim Armstrong <[email protected]> > wrote: > > This was double-posted to > http://community.cloudera.com/t5/Interactive-Short-cycle-SQL/Creating-Impala-UDA/m-p/56201/highlight/false#M3073 > <https://urldefense.proofpoint.com/v2/url?u=http-3A__community.cloudera.com_t5_Interactive-2DShort-2Dcycle-2DSQL_Creating-2DImpala-2DUDA_m-2Dp_56201_highlight_false-23M3073&d=DwMFaQ&c=uilaK90D4TOVoH58JNXRgQ&r=8j7tFznmoySfY9RiEAZHgvi3kzcbJ_Zy6Hp9HYX4dDE&m=hJl6J5oweVVguuh7TmOZhr9mMy6SvYUZ7kpQT_oTPY8&s=ySJ0TF-4OH9kZJebUfH8uNEFpwKylGthB9pmsRSfXJc&e=> > also. I'll continue the discussion here. > > > Can we have the flexibility of declaring the variable globally in UDF? > Globally, I mean outside the function? > > > And, the reason I am declaring a static variable is to restore the value > of timestamp for every record so that I can perform a comparison of the > timestamps. Is there an alternative approach for this? > > Updating a global or static variable in a UDAF is guaranteed not to do > what you expect - the function can be invoked concurrently by multiple > threads. > > It seems like you probably want to store some additional state in the > intermediate value. There are some sample UDAs here (see Avg()) where > additional intermediate state is stored in a StringVal: > https://github.com/cloudera/impala-udf-samples/blob/master/uda-sample.cc#L61 > <https://urldefense.proofpoint.com/v2/url?u=https-3A__github.com_cloudera_impala-2Dudf-2Dsamples_blob_master_uda-2Dsample.cc-23L61&d=DwMFaQ&c=uilaK90D4TOVoH58JNXRgQ&r=8j7tFznmoySfY9RiEAZHgvi3kzcbJ_Zy6Hp9HYX4dDE&m=hJl6J5oweVVguuh7TmOZhr9mMy6SvYUZ7kpQT_oTPY8&s=5NvnQzufO43--_qRtdwsm7ukEoVM2Z3bwVv5R9HBM1Q&e=> > > On Tue, Jun 20, 2017 at 2:40 PM, Ravi Kanth <[email protected]> > wrote: > >> Thanks Bharath. Can you check if the logic I am implementing is correct >> or needed any modification in it as well? I am very new to Impala UDF & C++ >> and having some hard time figuring out the problems. >> >> On 20 June 2017 at 14:27, Bharath Vissapragada <[email protected]> >> wrote: >> >>> You need to allocate memory for tsTemp, else it can segfault. That could >>> be the issue here. >>> >>> static TimestampVal* tsTemp; >>> tsTemp->date = 0; >>> tsTemp->time_of_day = 0; >>> >>> >>> On Tue, Jun 20, 2017 at 2:15 PM, Ravi Kanth <[email protected]> >>> wrote: >>> >>>> Hi All, >>>> We are using Impala to do various processings in our systems. We have a >>>> requirement recently, wherein we have to handle the updates on the events >>>> i.e, we have an 'e_update' table which has the partial updates received for >>>> various events. The fields that are not updated are being stored as NULL >>>> values. >>>> >>>> >>>> Ex: >>>> >>>> >>>> ID (Int) Date_Time (timestamp) A (Int) B (String) C (String) >>>> 1 0 1 NULL NULL >>>> 1 1 2 Hi NULL >>>> 1 3 4 Hello Hi >>>> 1 2 5 NULL NULL >>>> 1 4 NULL NULL Zero >>>> >>>> >>>> P.S: Please consider Date_time as valid timestamp type values. For easy >>>> understanding, mentioned them as 0,1,2,3,4,5 >>>> >>>> >>>> As seen in the above table, the events have a unique id and as we get >>>> an update to a particular event, we are storing the date_time at which >>>> update has happened and also storing the partial updated values. Apart from >>>> the updated values, the rest are stored as NULL values. >>>> >>>> >>>> We are planning to mimic inplace updates on the table, so that it would >>>> retrieve the resulting table as follows using the query below: We don't >>>> delete the data. >>>> >>>> >>>> > SELECT id, current_val(A,date_time) as A, current_val(B,date_time) as >>>> B, current_val(C,date_time) as C from e_update GROUP BY ID; >>>> >>>> >>>> where, current_val is a custom impala UDA we are planning to implement. >>>> i.e. get* latest non null value for the column.* >>>> >>>> >>>> ID (Int) A (Int) B (String) C (String) >>>> 1 4 Hello Zero >>>> >>>> >>>> >>>> >>>> Implemented current_val UDA: >>>> The below code is only for int type inputs: >>>> >>>> >>>> uda-currentval.h >>>> >>>> //This is a sample for retrieving the current value of e_update table >>>> // >>>> void CurrentValueInit(FunctionContext* context, IntVal* val); >>>> void CurrentValueUpdate(FunctionContext* context, const IntVal& input, >>>> const TimestampVal& ts, IntVal* val); >>>> void CurrentValueMerge(FunctionContext* context, const IntVal& src, >>>> IntVal* dst); >>>> IntVal CurrentValueFinalize(FunctionContext* context, const IntVal& val); >>>> >>>> uda-currentval.cc >>>> >>>> // >>>> ----------------------------------------------------------------------------------------------- >>>> // This is a sample for retrieving the current value of e_update table >>>> //----------------------------------------------------------------------------------------------- >>>> void CurrentValueInit(FunctionContext* context, IntVal* val) { >>>> val->is_null = false; >>>> val->val = 0; >>>> } >>>> >>>> void CurrentValueUpdate(FunctionContext* context, const IntVal& input, >>>> const TimestampVal& ts, IntVal* val) { >>>> static TimestampVal* tsTemp; >>>> tsTemp->date = 0; >>>> tsTemp->time_of_day = 0; >>>> if(tsTemp->date==0 && tsTemp->time_of_day==0){ >>>> tsTemp->date = ts.date; >>>> tsTemp->time_of_day = ts.time_of_day; >>>> val->val = input.val; >>>> return; >>>> } >>>> if(ts.date > tsTemp->date && ts.time_of_day > tsTemp->time_of_day){ >>>> tsTemp->date = ts.date; >>>> tsTemp->time_of_day = ts.time_of_day; >>>> val->val = input.val; >>>> return; >>>> } >>>> } >>>> >>>> void CurrentValueMerge(FunctionContext* context, const IntVal& src, >>>> IntVal* dst) { >>>> dst->val += src.val; >>>> } >>>> >>>> IntVal CurrentValueFinalize(FunctionContext* context, const IntVal& val) { >>>> return val; >>>> } >>>> >>>> >>>> We are able to build and create an aggregate function in impala, but >>>> when trying to run the select query similar to the one above, it is >>>> bringing down couple of impala deamons and throwing the error below and >>>> getting terminated. >>>> >>>> >>>> WARNINGS: Cancelled due to unreachable impalad(s): >>>> hadoop102.**.**.**.com:22000 >>>> >>>> >>>> >>>> >>>> We have impalad running on 14 instances. >>>> >>>> >>>> Can someone help resolve us this problem and a better way to achieve a >>>> solution for the scenario explained. >>>> >>> >>> >> > >
