Antoni,

The problem with using the last_value() function, as far as I have observed, is that if I use it on multiple columns in a single query, it does not return the expected results.
Input:

ID (Int)  Date_Time (timestamp)  A (Int)  B (String)  C (String)
1         0                      1        NULL        NULL
1         1                      2        Hi          NULL
1         3                      4        Hello       Hi
1         2                      5        NULL        NULL
1         4                      NULL     NULL        Zero

Expected Output:

ID (Int)  A (Int)  B (String)  C (String)
1         4        Hello       Zero

Query executed:

select id,
       last_value(a) over(partition by id order by date_time desc) as a,
       last_value(b) over(partition by id order by date_time desc) as b,
       last_value(c) over(partition by id order by date_time desc) as c
from udf_test
where a is not null and b is not null and c is not null;

Output I am getting:

+----+---+-------+----+
| id | a | b     | c  |
+----+---+-------+----+
| 1  | 4 | Hello | Hi |
+----+---+-------+----+

Hopefully, I am clear with the problem above.

Thanks,
Ravi

On 20 June 2017 at 22:05, Ravi Kanth <[email protected]> wrote:

> Antoni,
>
> Thanks for the suggestion. Let me have a look at it and hopefully we can
> use it in our use case.
>
> Thanks,
> Ravi
>
> On Tue, Jun 20, 2017 at 21:53 Antoni Ivanov <[email protected]> wrote:
>
>> Hi Ravi,
>>
>> I am curious why you are not using already existing last_value function
>> in Impala to get "latest non null value for the column"
>>
>> e.g
>> last_value(column_a ignore nulls) over(partition by ID order by
>> Date_Time)
>>
>> Thanks,
>> Antoni
>>
>>
>> On Jun 21, 2017, at 1:17 AM, Tim Armstrong <[email protected]>
>> wrote:
>>
>> This was double-posted to
>> http://community.cloudera.com/t5/Interactive-Short-cycle-SQL/Creating-Impala-UDA/m-p/56201/highlight/false#M3073
>> also. I'll continue the discussion here.
>>
>> > Can we have the flexibility of declaring the variable globally in UDF?
>> Globally, I mean outside the function?
>>
>> > And, the reason I am declaring a static variable is to restore the
>> value of timestamp for every record so that I can perform a comparison of
>> the timestamps. Is there an alternative approach for this?
>>
>> Updating a global or static variable in a UDAF is guaranteed not to do
>> what you expect - the function can be invoked concurrently by multiple
>> threads.
>>
>> It seems like you probably want to store some additional state in the
>> intermediate value. There are some sample UDAs here (see Avg()) where
>> additional intermediate state is stored in a StringVal:
>> https://github.com/cloudera/impala-udf-samples/blob/master/uda-sample.cc#L61
>>
>> On Tue, Jun 20, 2017 at 2:40 PM, Ravi Kanth <[email protected]>
>> wrote:
>>
>>> Thanks Bharath. Can you check if the logic I am implementing is correct
>>> or needed any modification in it as well? I am very new to Impala UDF & C++
>>> and having some hard time figuring out the problems.
>>>
>>> On 20 June 2017 at 14:27, Bharath Vissapragada <[email protected]>
>>> wrote:
>>>
>>>> You need to allocate memory for tsTemp, else it can segfault. That
>>>> could be the issue here.
>>>>
>>>> static TimestampVal* tsTemp;
>>>> tsTemp->date = 0;
>>>> tsTemp->time_of_day = 0;
>>>>
>>>>
>>>> On Tue, Jun 20, 2017 at 2:15 PM, Ravi Kanth <[email protected]>
>>>> wrote:
>>>>
>>>>> Hi All,
>>>>> We are using Impala to do various processings in our systems. We have
>>>>> a requirement recently, wherein we have to handle the updates on the
>>>>> events i.e, we have an 'e_update' table which has the partial updates
>>>>> received for various events.
>>>>> The fields that are not updated are being stored as NULL values.
>>>>>
>>>>> Ex:
>>>>>
>>>>> ID (Int)  Date_Time (timestamp)  A (Int)  B (String)  C (String)
>>>>> 1         0                      1        NULL        NULL
>>>>> 1         1                      2        Hi          NULL
>>>>> 1         3                      4        Hello       Hi
>>>>> 1         2                      5        NULL        NULL
>>>>> 1         4                      NULL     NULL        Zero
>>>>>
>>>>> P.S: Please consider Date_time as valid timestamp type values. For
>>>>> easy understanding, mentioned them as 0,1,2,3,4,5
>>>>>
>>>>> As seen in the above table, the events have a unique id and as we get
>>>>> an update to a particular event, we are storing the date_time at which
>>>>> update has happened and also storing the partial updated values. Apart
>>>>> from the updated values, the rest are stored as NULL values.
>>>>>
>>>>> We are planning to mimic inplace updates on the table, so that it
>>>>> would retrieve the resulting table as follows using the query below: We
>>>>> don't delete the data.
>>>>>
>>>>> > SELECT id, current_val(A,date_time) as A, current_val(B,date_time)
>>>>> as B, current_val(C,date_time) as C from e_update GROUP BY ID;
>>>>>
>>>>> where, current_val is a custom impala UDA we are planning to
>>>>> implement. i.e.
>>>>> get *latest non null value for the column.*
>>>>>
>>>>> ID (Int)  A (Int)  B (String)  C (String)
>>>>> 1         4        Hello       Zero
>>>>>
>>>>> Implemented current_val UDA:
>>>>> The below code is only for int type inputs:
>>>>>
>>>>> uda-currentval.h
>>>>>
>>>>> //This is a sample for retrieving the current value of e_update table
>>>>> //
>>>>> void CurrentValueInit(FunctionContext* context, IntVal* val);
>>>>> void CurrentValueUpdate(FunctionContext* context, const IntVal& input,
>>>>>     const TimestampVal& ts, IntVal* val);
>>>>> void CurrentValueMerge(FunctionContext* context, const IntVal& src,
>>>>>     IntVal* dst);
>>>>> IntVal CurrentValueFinalize(FunctionContext* context, const IntVal& val);
>>>>>
>>>>> uda-currentval.cc
>>>>>
>>>>> // -------------------------------------------------------------------
>>>>> // This is a sample for retrieving the current value of e_update table
>>>>> // -------------------------------------------------------------------
>>>>> void CurrentValueInit(FunctionContext* context, IntVal* val) {
>>>>>   val->is_null = false;
>>>>>   val->val = 0;
>>>>> }
>>>>>
>>>>> void CurrentValueUpdate(FunctionContext* context, const IntVal& input,
>>>>>     const TimestampVal& ts, IntVal* val) {
>>>>>   static TimestampVal* tsTemp;
>>>>>   tsTemp->date = 0;
>>>>>   tsTemp->time_of_day = 0;
>>>>>   if(tsTemp->date==0 && tsTemp->time_of_day==0){
>>>>>     tsTemp->date = ts.date;
>>>>>     tsTemp->time_of_day = ts.time_of_day;
>>>>>     val->val = input.val;
>>>>>     return;
>>>>>   }
>>>>>   if(ts.date > tsTemp->date && ts.time_of_day > tsTemp->time_of_day){
>>>>>     tsTemp->date = ts.date;
>>>>>     tsTemp->time_of_day = ts.time_of_day;
>>>>>     val->val = input.val;
>>>>>     return;
>>>>>   }
>>>>> }
>>>>>
>>>>> void CurrentValueMerge(FunctionContext* context, const IntVal& src,
>>>>>     IntVal* dst) {
>>>>>   dst->val += src.val;
>>>>> }
>>>>>
>>>>> IntVal CurrentValueFinalize(FunctionContext* context, const IntVal& val) {
>>>>>   return val;
>>>>> }
>>>>>
>>>>> We are able to build and create an aggregate function in Impala, but
>>>>> when we try to run a select query similar to the one above, it brings
>>>>> down a couple of Impala daemons, throws the error below, and gets
>>>>> terminated.
>>>>>
>>>>> WARNINGS: Cancelled due to unreachable impalad(s):
>>>>> hadoop102.**.**.**.com:22000
>>>>>
>>>>> We have impalad running on 14 instances.
>>>>>
>>>>> Can someone help us resolve this problem and suggest a better way to
>>>>> achieve a solution for the scenario explained?
