Antoni,

The problem in using last_value function() as far as I observed is, if I
use it on multiple columns in a single query, its not retrieving results as
expected.

 Input:
ID (Int) Date_Time (timestamp) A (Int) B (String) C (String)
1 0 1 NULL NULL
1 1 2 Hi NULL
1 3 4 Hello Hi
1 2 5 NULL NULL
1 4 NULL NULL Zero

Expected Output:


ID (Int) A (Int) B (String) C (String)
1 4 Hello Zero


Query executed:

select id, last_value(a) over(partition by id order by date_time desc) as
a, last_value(b) over(partition by id order by date_time desc) as b,
last_value(c) over(partition by id order by date_time desc) as c from
udf_test where a is not null and b is not null and c is not null;


Output I am getting:

+----+---+-------+----+

| id | a | b     | c  |

+----+---+-------+----+

| 1  | 4 | Hello | Hi ||

+----+---+-------+----+

Hopefully, I am clear with the problem above.

Thanks,
Ravi

On 20 June 2017 at 22:05, Ravi Kanth <[email protected]> wrote:

> Antoni,
>
> Thanks for the suggestion. Let me have a look at it and hopefully we can
> use it in our use case.
>
> Thanks,
> Ravi
>
> On Tue, Jun 20, 2017 at 21:53 Antoni Ivanov <[email protected]> wrote:
>
>> Hi Ravi,
>>
>> I am curious why you are not using already existing last_value function
>> in Impala to get "latest non null value for the column”
>>
>> e.g
>> last_value(column_a ignore nulls) over(partition by ID  order by
>> Date_Time)
>>
>> Thanks,
>> Antoni
>>
>>
>>
>>
>> On Jun 21, 2017, at 1:17 AM, Tim Armstrong <[email protected]>
>> wrote:
>>
>> This was double-posted to http://community.cloudera.com/
>> t5/Interactive-Short-cycle-SQL/Creating-Impala-UDA/m-p/
>> 56201/highlight/false#M3073
>> <https://urldefense.proofpoint.com/v2/url?u=http-3A__community.cloudera.com_t5_Interactive-2DShort-2Dcycle-2DSQL_Creating-2DImpala-2DUDA_m-2Dp_56201_highlight_false-23M3073&d=DwMFaQ&c=uilaK90D4TOVoH58JNXRgQ&r=8j7tFznmoySfY9RiEAZHgvi3kzcbJ_Zy6Hp9HYX4dDE&m=hJl6J5oweVVguuh7TmOZhr9mMy6SvYUZ7kpQT_oTPY8&s=ySJ0TF-4OH9kZJebUfH8uNEFpwKylGthB9pmsRSfXJc&e=>
>> also. I'll continue the discussion here.
>>
>> > Can we have the flexibility of declaring the variable globally in UDF?
>> Globally, I mean outside the function?
>>
>> > And, the reason I am declaring a static variable is to restore the
>> value of timestamp for every record so that I can perform a comparison of
>> the timestamps. Is there an alternative approach for this?
>>
>> Updating a global or static variable in a UDAF is guaranteed not to do
>> what you expect - the function can be invoked concurrently by multiple
>> threads.
>>
>> It seems like you probably want to store some additional state in the
>> intermediate value. There are some sample UDAs here (see Avg()) where
>> additional intermediate state is stored in a StringVal:
>> https://github.com/cloudera/impala-udf-samples/blob/
>> master/uda-sample.cc#L61
>> <https://urldefense.proofpoint.com/v2/url?u=https-3A__github.com_cloudera_impala-2Dudf-2Dsamples_blob_master_uda-2Dsample.cc-23L61&d=DwMFaQ&c=uilaK90D4TOVoH58JNXRgQ&r=8j7tFznmoySfY9RiEAZHgvi3kzcbJ_Zy6Hp9HYX4dDE&m=hJl6J5oweVVguuh7TmOZhr9mMy6SvYUZ7kpQT_oTPY8&s=5NvnQzufO43--_qRtdwsm7ukEoVM2Z3bwVv5R9HBM1Q&e=>
>>
>> On Tue, Jun 20, 2017 at 2:40 PM, Ravi Kanth <[email protected]>
>> wrote:
>>
>>> Thanks Bharath. Can you check if the logic I am implementing is correct
>>> or needed any modification in it as well? I am very new to Impala UDF & C++
>>> and having some hard time figuring out the problems.
>>>
>>> On 20 June 2017 at 14:27, Bharath Vissapragada <[email protected]>
>>> wrote:
>>>
>>>> You need to allocate memory for tsTemp, else it can segfault. That
>>>> could be the issue here.
>>>>
>>>>  static TimestampVal* tsTemp;
>>>>       tsTemp->date = 0;
>>>>       tsTemp->time_of_day = 0;
>>>>
>>>>
>>>> On Tue, Jun 20, 2017 at 2:15 PM, Ravi Kanth <[email protected]>
>>>> wrote:
>>>>
>>>>> Hi All,
>>>>> We are using Impala to do various processings in our systems. We have
>>>>> a requirement recently, wherein we have to handle the updates on the 
>>>>> events
>>>>> i.e, we have an 'e_update' table which has the partial updates received 
>>>>> for
>>>>> various events. The fields that are not updated are being stored as NULL
>>>>> values.
>>>>>
>>>>>
>>>>> Ex:
>>>>>
>>>>>
>>>>> ID (Int) Date_Time (timestamp) A (Int) B (String) C (String)
>>>>> 1 0 1 NULL NULL
>>>>> 1 1 2 Hi NULL
>>>>> 1 3 4 Hello Hi
>>>>> 1 2 5 NULL NULL
>>>>> 1 4 NULL NULL Zero
>>>>>
>>>>>
>>>>> P.S: Please consider Date_time as valid timestamp type values. For
>>>>> easy understanding, mentioned them as 0,1,2,3,4,5
>>>>>
>>>>>
>>>>> As seen in the above table, the events have a unique id and as we get
>>>>> an update to a particular event, we are storing the date_time at which
>>>>> update has happened and also storing the partial updated values. Apart 
>>>>> from
>>>>> the updated values, the rest are stored as NULL values.
>>>>>
>>>>>
>>>>> We are planning to mimic inplace updates on the table, so that it
>>>>> would retrieve the resulting table as follows using the query below: We
>>>>> don't delete the data.
>>>>>
>>>>>
>>>>> > SELECT id, current_val(A,date_time) as A, current_val(B,date_time)
>>>>> as B, current_val(C,date_time) as C from e_update GROUP BY ID;
>>>>>
>>>>>
>>>>> where, current_val is a custom impala UDA we are planning to
>>>>> implement. i.e. get* latest non null value for the column.*
>>>>>
>>>>>
>>>>> ID (Int) A (Int) B (String) C (String)
>>>>> 1 4 Hello Zero
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> Implemented current_val UDA:
>>>>> The below code is only for int type inputs:
>>>>>
>>>>>
>>>>> uda-currentval.h
>>>>>
>>>>> //This is a sample for retrieving the current value of e_update table
>>>>> //
>>>>> void CurrentValueInit(FunctionContext* context, IntVal* val);
>>>>> void CurrentValueUpdate(FunctionContext* context, const IntVal& input, 
>>>>> const TimestampVal& ts, IntVal* val);
>>>>> void CurrentValueMerge(FunctionContext* context, const IntVal& src, 
>>>>> IntVal* dst);
>>>>> IntVal CurrentValueFinalize(FunctionContext* context, const IntVal& val);
>>>>>
>>>>> uda-currentval.cc
>>>>>
>>>>> // 
>>>>> -----------------------------------------------------------------------------------------------
>>>>> // This is a sample for retrieving the current value of e_update table
>>>>> //-----------------------------------------------------------------------------------------------
>>>>> void CurrentValueInit(FunctionContext* context, IntVal* val) {
>>>>>       val->is_null = false;
>>>>>       val->val = 0;
>>>>> }
>>>>>
>>>>> void CurrentValueUpdate(FunctionContext* context, const IntVal& input, 
>>>>> const TimestampVal& ts, IntVal* val) {
>>>>>       static TimestampVal* tsTemp;
>>>>>       tsTemp->date = 0;
>>>>>       tsTemp->time_of_day = 0;
>>>>>       if(tsTemp->date==0 && tsTemp->time_of_day==0){
>>>>>         tsTemp->date = ts.date;
>>>>>         tsTemp->time_of_day = ts.time_of_day;
>>>>>         val->val = input.val;
>>>>>         return;
>>>>>       }
>>>>>       if(ts.date > tsTemp->date && ts.time_of_day > tsTemp->time_of_day){
>>>>>         tsTemp->date = ts.date;
>>>>>         tsTemp->time_of_day = ts.time_of_day;
>>>>>         val->val = input.val;
>>>>>         return;
>>>>>       }
>>>>> }
>>>>>
>>>>> void CurrentValueMerge(FunctionContext* context, const IntVal& src, 
>>>>> IntVal* dst) {
>>>>>      dst->val += src.val;
>>>>> }
>>>>>
>>>>> IntVal CurrentValueFinalize(FunctionContext* context, const IntVal& val) {
>>>>>      return val;
>>>>> }
>>>>>
>>>>>
>>>>> We are able to build and create an aggregate function in impala, but
>>>>> when trying to run the select query similar to the one above, it is
>>>>> bringing down couple of impala deamons and throwing the error below and
>>>>> getting terminated.
>>>>>
>>>>>
>>>>> WARNINGS: Cancelled due to unreachable impalad(s):
>>>>> hadoop102.**.**.**.com:22000
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> We have impalad running on 14 instances.
>>>>>
>>>>>
>>>>> Can someone help resolve us this problem and a better way to achieve a
>>>>> solution for the scenario explained.
>>>>>
>>>>
>>>>
>>>
>>
>>

Reply via email to