Thomas,

The version of Impala we are using is 2.5.0 with CDH 5.7.0, and I see that ignore nulls was added in Impala 2.7.0. Also, would adding ignore nulls make a big difference in the expected result?

Ravi

On 21 June 2017 at 11:20, Thomas Tauber-Marshall <[email protected]> wrote:

> Ravi,
>
> Instead of using the "where ... is not null", have you tried
> 'last_value(... ignore nulls)'?
>
> On Wed, Jun 21, 2017 at 1:08 PM Ravi Kanth <[email protected]>
> wrote:
>
>> Antoni,
>>
>> The problem with using the last_value() function, as far as I have
>> observed, is that if I use it on multiple columns in a single query, it
>> does not return the expected results.
>>
>> Input:
>>
>> ID (Int)  Date_Time (timestamp)  A (Int)  B (String)  C (String)
>> 1         0                      1        NULL        NULL
>> 1         1                      2        Hi          NULL
>> 1         3                      4        Hello       Hi
>> 1         2                      5        NULL        NULL
>> 1         4                      NULL     NULL        Zero
>>
>> Expected Output:
>>
>> ID (Int)  A (Int)  B (String)  C (String)
>> 1         4        Hello       Zero
>>
>> Query executed:
>>
>> select id,
>>   last_value(a) over(partition by id order by date_time desc) as a,
>>   last_value(b) over(partition by id order by date_time desc) as b,
>>   last_value(c) over(partition by id order by date_time desc) as c
>> from udf_test
>> where a is not null and b is not null and c is not null;
>>
>> Output I am getting:
>>
>> +----+---+-------+----+
>> | id | a | b     | c  |
>> +----+---+-------+----+
>> | 1  | 4 | Hello | Hi |
>> +----+---+-------+----+
>>
>> Hopefully, the problem above is clear.
>>
>> Thanks,
>> Ravi
>>
>> On 20 June 2017 at 22:05, Ravi Kanth <[email protected]> wrote:
>>
>>> Antoni,
>>>
>>> Thanks for the suggestion. Let me have a look at it and hopefully we can
>>> use it in our use case.
>>>
>>> Thanks,
>>> Ravi
>>>
>>> On Tue, Jun 20, 2017 at 21:53 Antoni Ivanov <[email protected]> wrote:
>>>
>>>> Hi Ravi,
>>>>
>>>> I am curious why you are not using the already existing last_value
>>>> function in Impala to get the "latest non null value for the column",
>>>>
>>>> e.g.
>>>> last_value(column_a ignore nulls) over(partition by ID order by
>>>> Date_Time)
>>>>
>>>> Thanks,
>>>> Antoni
>>>>
>>>> On Jun 21, 2017, at 1:17 AM, Tim Armstrong <[email protected]>
>>>> wrote:
>>>>
>>>> This was double-posted to
>>>> http://community.cloudera.com/t5/Interactive-Short-cycle-SQL/Creating-Impala-UDA/m-p/56201/highlight/false#M3073
>>>> also. I'll continue the discussion here.
>>>>
>>>> > Can we have the flexibility of declaring the variable globally in
>>>> UDF? Globally, I mean outside the function?
>>>>
>>>> > And, the reason I am declaring a static variable is to restore the
>>>> value of timestamp for every record so that I can perform a comparison of
>>>> the timestamps. Is there an alternative approach for this?
>>>>
>>>> Updating a global or static variable in a UDAF is guaranteed not to do
>>>> what you expect - the function can be invoked concurrently by multiple
>>>> threads.
>>>>
>>>> It seems like you probably want to store some additional state in the
>>>> intermediate value.
>>>> There are some sample UDAs here (see Avg()) where
>>>> additional intermediate state is stored in a StringVal:
>>>> https://github.com/cloudera/impala-udf-samples/blob/master/uda-sample.cc#L61
>>>>
>>>> On Tue, Jun 20, 2017 at 2:40 PM, Ravi Kanth <[email protected]>
>>>> wrote:
>>>>
>>>>> Thanks Bharath. Can you also check whether the logic I am implementing
>>>>> is correct or needs any modification? I am very new to Impala UDFs and
>>>>> C++ and am having a hard time figuring out the problems.
>>>>>
>>>>> On 20 June 2017 at 14:27, Bharath Vissapragada <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> You need to allocate memory for tsTemp, else it can segfault. That
>>>>>> could be the issue here.
>>>>>>
>>>>>> static TimestampVal* tsTemp;
>>>>>> tsTemp->date = 0;
>>>>>> tsTemp->time_of_day = 0;
>>>>>>
>>>>>> On Tue, Jun 20, 2017 at 2:15 PM, Ravi Kanth <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi All,
>>>>>>>
>>>>>>> We are using Impala for various kinds of processing in our systems.
>>>>>>> We recently got a requirement to handle updates on events, i.e. we
>>>>>>> have an 'e_update' table which holds the partial updates received
>>>>>>> for various events. The fields that are not updated are stored as
>>>>>>> NULL values.
>>>>>>>
>>>>>>> Ex:
>>>>>>>
>>>>>>> ID (Int)  Date_Time (timestamp)  A (Int)  B (String)  C (String)
>>>>>>> 1         0                      1        NULL        NULL
>>>>>>> 1         1                      2        Hi          NULL
>>>>>>> 1         3                      4        Hello       Hi
>>>>>>> 1         2                      5        NULL        NULL
>>>>>>> 1         4                      NULL     NULL        Zero
>>>>>>>
>>>>>>> P.S: Please consider Date_time as valid timestamp type values.
>>>>>>> For easy understanding, they are written here as 0,1,2,3,4,5.
>>>>>>>
>>>>>>> As seen in the table above, each event has a unique id. When we get
>>>>>>> an update to a particular event, we store the date_time at which the
>>>>>>> update happened along with the partially updated values. Apart from
>>>>>>> the updated values, the rest are stored as NULL.
>>>>>>>
>>>>>>> We are planning to mimic in-place updates on the table, so that the
>>>>>>> query below would return the resulting table shown after it. We
>>>>>>> don't delete the data.
>>>>>>>
>>>>>>> > SELECT id, current_val(A,date_time) as A, current_val(B,date_time)
>>>>>>> as B, current_val(C,date_time) as C from e_update GROUP BY ID;
>>>>>>>
>>>>>>> where current_val is a custom Impala UDA we are planning to
>>>>>>> implement, i.e. get the *latest non null value for the column*.
>>>>>>>
>>>>>>> ID (Int)  A (Int)  B (String)  C (String)
>>>>>>> 1         4        Hello       Zero
>>>>>>>
>>>>>>> Implemented current_val UDA:
>>>>>>> The code below is only for int type inputs:
>>>>>>>
>>>>>>> uda-currentval.h
>>>>>>>
>>>>>>> //This is a sample for retrieving the current value of e_update table
>>>>>>> //
>>>>>>> void CurrentValueInit(FunctionContext* context, IntVal* val);
>>>>>>> void CurrentValueUpdate(FunctionContext* context, const IntVal& input,
>>>>>>>     const TimestampVal& ts, IntVal* val);
>>>>>>> void CurrentValueMerge(FunctionContext* context, const IntVal& src,
>>>>>>>     IntVal* dst);
>>>>>>> IntVal CurrentValueFinalize(FunctionContext* context, const IntVal& val);
>>>>>>>
>>>>>>> uda-currentval.cc
>>>>>>>
>>>>>>> // -----------------------------------------------------------------------------------------------
>>>>>>> // This is a sample for retrieving the current value of e_update table
>>>>>>> // -----------------------------------------------------------------------------------------------
>>>>>>> void CurrentValueInit(FunctionContext* context, IntVal* val) {
>>>>>>>   val->is_null = false;
>>>>>>>   val->val = 0;
>>>>>>> }
>>>>>>>
>>>>>>> void CurrentValueUpdate(FunctionContext* context, const IntVal& input,
>>>>>>>     const TimestampVal& ts, IntVal* val) {
>>>>>>>   static TimestampVal* tsTemp;
>>>>>>>   tsTemp->date = 0;
>>>>>>>   tsTemp->time_of_day = 0;
>>>>>>>   if(tsTemp->date==0 && tsTemp->time_of_day==0){
>>>>>>>     tsTemp->date = ts.date;
>>>>>>>     tsTemp->time_of_day = ts.time_of_day;
>>>>>>>     val->val = input.val;
>>>>>>>     return;
>>>>>>>   }
>>>>>>>   if(ts.date > tsTemp->date && ts.time_of_day > tsTemp->time_of_day){
>>>>>>>     tsTemp->date = ts.date;
>>>>>>>     tsTemp->time_of_day = ts.time_of_day;
>>>>>>>     val->val = input.val;
>>>>>>>     return;
>>>>>>>   }
>>>>>>> }
>>>>>>>
>>>>>>> void CurrentValueMerge(FunctionContext* context, const IntVal& src,
>>>>>>>     IntVal* dst) {
>>>>>>>   dst->val += src.val;
>>>>>>> }
>>>>>>>
>>>>>>> IntVal CurrentValueFinalize(FunctionContext* context, const IntVal& val) {
>>>>>>>   return val;
>>>>>>> }
>>>>>>>
>>>>>>> We are able to build and create the aggregate function in Impala, but
>>>>>>> when we try to run a select query similar to the one above, it brings
>>>>>>> down a couple of Impala daemons, throws the error below, and gets
>>>>>>> terminated.
>>>>>>>
>>>>>>> WARNINGS: Cancelled due to unreachable impalad(s):
>>>>>>> hadoop102.**.**.**.com:22000
>>>>>>>
>>>>>>> We have impalad running on 14 instances.
>>>>>>>
>>>>>>> Can someone help us resolve this problem and suggest a better way to
>>>>>>> achieve a solution for the scenario explained?
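
Tim's suggestion of keeping the timestamp in the intermediate value, instead of in a static variable, can be sketched roughly as follows. This is a minimal standalone C++ model of the init/update/merge/finalize phases, not code written against the Impala UDF headers: Ts, NullableInt, CurrentValState and the CurrentVal* functions are hypothetical stand-ins used only for illustration. It assumes that timestamps order by date first and time_of_day second, that NULL inputs are skipped, and that merge should keep the newer of two partial states rather than add them.

```cpp
#include <cstdint>
#include <tuple>

// Hypothetical stand-in for a timestamp (NOT Impala's TimestampVal).
struct Ts {
  int32_t date;
  int64_t time_of_day;
};

// Hypothetical stand-in for a nullable int column value (NOT Impala's IntVal).
struct NullableInt {
  bool is_null;
  int32_t val;
};

// Intermediate state: the latest non-NULL value seen so far plus the
// timestamp it arrived with. Each aggregation instance owns its own copy,
// so nothing is shared between threads.
struct CurrentValState {
  bool has_value;
  Ts ts;
  int32_t val;
};

// True if a is strictly later than b: date decides, time_of_day breaks ties.
inline bool Newer(const Ts& a, const Ts& b) {
  return std::tie(a.date, a.time_of_day) > std::tie(b.date, b.time_of_day);
}

void CurrentValInit(CurrentValState* st) {
  st->has_value = false;
  st->ts = Ts{0, 0};
  st->val = 0;
}

// Called once per input row; NULL inputs never replace the stored value.
void CurrentValUpdate(const NullableInt& input, const Ts& ts,
                      CurrentValState* st) {
  if (input.is_null) return;
  if (!st->has_value || Newer(ts, st->ts)) {
    st->has_value = true;
    st->ts = ts;
    st->val = input.val;
  }
}

// Combines two partial states by keeping whichever holds the newer value.
void CurrentValMerge(const CurrentValState& src, CurrentValState* dst) {
  if (!src.has_value) return;
  if (!dst->has_value || Newer(src.ts, dst->ts)) *dst = src;
}

// Returns NULL if no non-NULL input was ever seen.
NullableInt CurrentValFinalize(const CurrentValState& st) {
  if (!st.has_value) return NullableInt{true, 0};
  return NullableInt{false, st.val};
}
```

In an actual UDA the state struct would presumably live in the intermediate buffer (for example via the StringVal approach in the sample Tim linked), which is what gives each aggregation instance its own private copy.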
