Thomas,

The version of Impala we are using is 2.5.0 with CDH 5.7.0, and I see that
ignore nulls was only added in Impala 2.7.0. Would adding ignore nulls make a
big difference to the result?
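
To make the question concrete, here is a minimal C++ sketch that emulates
last_value with and without ignore nulls on the sample rows quoted below. It
is not Impala code: Row, SampleRows and LastValue are hypothetical names, and
the sketch assumes rows ordered by date_time ascending with a window frame
that covers the whole partition.

```cpp
#include <algorithm>
#include <cassert>
#include <optional>
#include <string>
#include <vector>

// One row of the sample table; std::nullopt stands in for SQL NULL.
struct Row {
  int id;
  int date_time;
  std::optional<int> a;
  std::optional<std::string> b;
  std::optional<std::string> c;
};

// The five sample rows for id = 1, sorted by date_time ascending.
std::vector<Row> SampleRows() {
  std::vector<Row> rows = {
      {1, 0, 1, std::nullopt, std::nullopt},
      {1, 1, 2, std::string("Hi"), std::nullopt},
      {1, 3, 4, std::string("Hello"), std::string("Hi")},
      {1, 2, 5, std::nullopt, std::nullopt},
      {1, 4, std::nullopt, std::nullopt, std::string("Zero")},
  };
  std::sort(rows.begin(), rows.end(),
            [](const Row& x, const Row& y) { return x.date_time < y.date_time; });
  return rows;
}

// Emulates last_value(col) or last_value(col ignore nulls) over one partition.
// Assumption: `sorted` is ordered by date_time and the frame is the whole partition.
template <typename T>
std::optional<T> LastValue(const std::vector<Row>& sorted,
                           std::optional<T> Row::*col, bool ignore_nulls) {
  assert(std::is_sorted(sorted.begin(), sorted.end(),
                        [](const Row& x, const Row& y) {
                          return x.date_time < y.date_time;
                        }));
  // Walk backwards from the latest row; skip NULLs only when asked to.
  for (auto it = sorted.rbegin(); it != sorted.rend(); ++it) {
    const std::optional<T>& v = (*it).*col;
    if (v.has_value() || !ignore_nulls) return v;
  }
  return std::nullopt;
}
```

Under these assumptions, LastValue(SampleRows(), &Row::a, false) yields NULL,
because the latest row has no value for A, while passing true yields 4. With
ignore nulls, B and C come out as Hello and Zero, which is the expected output
from my earlier mail.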

Ravi

On 21 June 2017 at 11:20, Thomas Tauber-Marshall <[email protected]>
wrote:

> Ravi,
>
> Instead of using "where ... is not null", have you tried
> 'last_value(... ignore nulls)'?
>
> On Wed, Jun 21, 2017 at 1:08 PM Ravi Kanth <[email protected]>
> wrote:
>
>> Antoni,
>>
>> The problem with the last_value() function, as far as I have observed, is
>> that if I use it on multiple columns in a single query, it does not return
>> the expected results.
>>
>> Input:
>> | ID (Int) | Date_Time (timestamp) | A (Int) | B (String) | C (String) |
>> | 1        | 0                     | 1       | NULL       | NULL       |
>> | 1        | 1                     | 2       | Hi         | NULL       |
>> | 1        | 3                     | 4       | Hello      | Hi         |
>> | 1        | 2                     | 5       | NULL       | NULL       |
>> | 1        | 4                     | NULL    | NULL       | Zero       |
>>
>> Expected Output:
>>
>>
>> | ID (Int) | A (Int) | B (String) | C (String) |
>> | 1        | 4       | Hello      | Zero       |
>>
>>
>> Query executed:
>>
>> select id, last_value(a) over(partition by id order by date_time desc) as
>> a, last_value(b) over(partition by id order by date_time desc) as b,
>> last_value(c) over(partition by id order by date_time desc) as c from
>> udf_test where a is not null and b is not null and c is not null;
>>
>>
>> Output I am getting:
>>
>> +----+---+-------+----+
>> | id | a | b     | c  |
>> +----+---+-------+----+
>> | 1  | 4 | Hello | Hi |
>> +----+---+-------+----+
>>
>> Hopefully the problem above is clear.
>>
>> Thanks,
>> Ravi
>>
>> On 20 June 2017 at 22:05, Ravi Kanth <[email protected]> wrote:
>>
>>> Antoni,
>>>
>>> Thanks for the suggestion. Let me have a look at it and hopefully we can
>>> use it in our use case.
>>>
>>> Thanks,
>>> Ravi
>>>
>>> On Tue, Jun 20, 2017 at 21:53 Antoni Ivanov <[email protected]> wrote:
>>>
>>>> Hi Ravi,
>>>>
>>>> I am curious why you are not using the existing last_value function
>>>> in Impala to get the "latest non null value for the column",
>>>>
>>>> e.g.
>>>> last_value(column_a ignore nulls) over(partition by ID  order by
>>>> Date_Time)
>>>>
>>>> Thanks,
>>>> Antoni
>>>>
>>>>
>>>>
>>>>
>>>> On Jun 21, 2017, at 1:17 AM, Tim Armstrong <[email protected]>
>>>> wrote:
>>>>
>>>> This was double-posted to
>>>> http://community.cloudera.com/t5/Interactive-Short-cycle-SQL/Creating-Impala-UDA/m-p/56201/highlight/false#M3073
>>>> also. I'll continue the discussion here.
>>>>
>>>> > Can we have the flexibility of declaring the variable globally in
>>>> > UDF? Globally, I mean outside the function?
>>>>
>>>> > And, the reason I am declaring a static variable is to restore the
>>>> > value of timestamp for every record so that I can perform a comparison of
>>>> > the timestamps. Is there an alternative approach for this?
>>>>
>>>> Updating a global or static variable in a UDAF is guaranteed not to do
>>>> what you expect - the function can be invoked concurrently by multiple
>>>> threads.
>>>>
>>>> It seems like you probably want to store some additional state in the
>>>> intermediate value. There are some sample UDAs here (see Avg()) where
>>>> additional intermediate state is stored in a StringVal:
>>>> https://github.com/cloudera/impala-udf-samples/blob/master/uda-sample.cc#L61
>>>>
>>>> On Tue, Jun 20, 2017 at 2:40 PM, Ravi Kanth <[email protected]>
>>>> wrote:
>>>>
>>>>> Thanks Bharath. Can you also check whether the logic I am implementing
>>>>> is correct or needs any modification? I am very new to Impala UDFs and
>>>>> C++ and am having a hard time figuring out the problems.
>>>>>
>>>>> On 20 June 2017 at 14:27, Bharath Vissapragada <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> You need to allocate memory for tsTemp, else it can segfault. That
>>>>>> could be the issue here.
>>>>>>
>>>>>>  static TimestampVal* tsTemp;
>>>>>>       tsTemp->date = 0;
>>>>>>       tsTemp->time_of_day = 0;
>>>>>>
>>>>>>
>>>>>> On Tue, Jun 20, 2017 at 2:15 PM, Ravi Kanth <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi All,
>>>>>>> We are using Impala for various kinds of processing in our systems.
>>>>>>> We recently got a requirement to handle updates to events, i.e., we
>>>>>>> have an 'e_update' table which holds the partial updates received for
>>>>>>> various events. The fields that are not updated are stored as NULL
>>>>>>> values.
>>>>>>>
>>>>>>>
>>>>>>> Ex:
>>>>>>>
>>>>>>>
>>>>>>> | ID (Int) | Date_Time (timestamp) | A (Int) | B (String) | C (String) |
>>>>>>> | 1        | 0                     | 1       | NULL       | NULL       |
>>>>>>> | 1        | 1                     | 2       | Hi         | NULL       |
>>>>>>> | 1        | 3                     | 4       | Hello      | Hi         |
>>>>>>> | 1        | 2                     | 5       | NULL       | NULL       |
>>>>>>> | 1        | 4                     | NULL    | NULL       | Zero       |
>>>>>>>
>>>>>>>
>>>>>>> P.S.: Please treat the Date_Time values as valid timestamps. For ease
>>>>>>> of understanding, I have written them as 0, 1, 2, 3, 4, 5.
>>>>>>>
>>>>>>>
>>>>>>> As seen in the table above, the events have a unique id. When we get
>>>>>>> an update to a particular event, we store the date_time at which the
>>>>>>> update happened together with the partially updated values. All the
>>>>>>> other fields are stored as NULL values.
>>>>>>>
>>>>>>>
>>>>>>> We are planning to mimic in-place updates on the table (we don't
>>>>>>> delete any data), so that the query below would return the resulting
>>>>>>> table shown after it:
>>>>>>>
>>>>>>>
>>>>>>> > SELECT id, current_val(A,date_time) as A, current_val(B,date_time)
>>>>>>> as B, current_val(C,date_time) as C from e_update GROUP BY ID;
>>>>>>>
>>>>>>>
>>>>>>> where current_val is a custom Impala UDA we are planning to
>>>>>>> implement, i.e., get the *latest non-null value for the column*.
>>>>>>>
>>>>>>>
>>>>>>> | ID (Int) | A (Int) | B (String) | C (String) |
>>>>>>> | 1        | 4       | Hello      | Zero       |
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Implemented current_val UDA:
>>>>>>> The below code is only for int type inputs:
>>>>>>>
>>>>>>>
>>>>>>> uda-currentval.h
>>>>>>>
>>>>>>> // This is a sample for retrieving the current value of e_update table
>>>>>>> //
>>>>>>> void CurrentValueInit(FunctionContext* context, IntVal* val);
>>>>>>> void CurrentValueUpdate(FunctionContext* context, const IntVal& input,
>>>>>>>     const TimestampVal& ts, IntVal* val);
>>>>>>> void CurrentValueMerge(FunctionContext* context, const IntVal& src,
>>>>>>>     IntVal* dst);
>>>>>>> IntVal CurrentValueFinalize(FunctionContext* context, const IntVal& val);
>>>>>>>
>>>>>>> uda-currentval.cc
>>>>>>>
>>>>>>> // -----------------------------------------------------------------------------------------------
>>>>>>> // This is a sample for retrieving the current value of e_update table
>>>>>>> // -----------------------------------------------------------------------------------------------
>>>>>>> void CurrentValueInit(FunctionContext* context, IntVal* val) {
>>>>>>>       val->is_null = false;
>>>>>>>       val->val = 0;
>>>>>>> }
>>>>>>>
>>>>>>> void CurrentValueUpdate(FunctionContext* context, const IntVal& input,
>>>>>>>     const TimestampVal& ts, IntVal* val) {
>>>>>>>       static TimestampVal* tsTemp;
>>>>>>>       tsTemp->date = 0;
>>>>>>>       tsTemp->time_of_day = 0;
>>>>>>>       if(tsTemp->date==0 && tsTemp->time_of_day==0){
>>>>>>>         tsTemp->date = ts.date;
>>>>>>>         tsTemp->time_of_day = ts.time_of_day;
>>>>>>>         val->val = input.val;
>>>>>>>         return;
>>>>>>>       }
>>>>>>>       if(ts.date > tsTemp->date && ts.time_of_day > tsTemp->time_of_day){
>>>>>>>         tsTemp->date = ts.date;
>>>>>>>         tsTemp->time_of_day = ts.time_of_day;
>>>>>>>         val->val = input.val;
>>>>>>>         return;
>>>>>>>       }
>>>>>>> }
>>>>>>>
>>>>>>> void CurrentValueMerge(FunctionContext* context, const IntVal& src,
>>>>>>>     IntVal* dst) {
>>>>>>>      dst->val += src.val;
>>>>>>> }
>>>>>>>
>>>>>>> IntVal CurrentValueFinalize(FunctionContext* context, const IntVal& val) {
>>>>>>>      return val;
>>>>>>> }
>>>>>>>
>>>>>>>
>>>>>>> We are able to build the code and create the aggregate function in
>>>>>>> Impala, but when we run a select query similar to the one above, it
>>>>>>> brings down a couple of Impala daemons, reports the error below, and
>>>>>>> is terminated.
>>>>>>>
>>>>>>>
>>>>>>> WARNINGS: Cancelled due to unreachable impalad(s):
>>>>>>> hadoop102.**.**.**.com:22000
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> We have impalad running on 14 instances.
>>>>>>>
>>>>>>>
>>>>>>> Can someone help us resolve this problem, or suggest a better way to
>>>>>>> achieve a solution for the scenario explained?
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>
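
For reference, Tim's suggestion in the thread above (keep the extra state in
the intermediate value instead of a static variable) could look roughly like
the sketch below. This is only an illustration under stated assumptions: the
IntVal and TimestampVal structs are simplified stand-ins for the Impala UDF
types so that the code compiles on its own, CurrentValState and IsLater are
hypothetical names, and the FunctionContext argument and the StringVal that
would carry the state in a real UDA are left out.

```cpp
#include <cstdint>

// Simplified stand-ins for the Impala UDF value types (assumption: only the
// fields used in this thread are modelled).
struct IntVal {
  bool is_null = true;
  int32_t val = 0;
};
struct TimestampVal {
  bool is_null = true;
  int32_t date = 0;
  int64_t time_of_day = 0;
};

// Hypothetical intermediate state: the latest non-NULL value seen so far and
// the timestamp it arrived with. Each aggregation instance owns its own copy,
// so nothing is shared between threads.
struct CurrentValState {
  bool has_value = false;
  int32_t val = 0;
  int32_t date = 0;
  int64_t time_of_day = 0;
};

// True when (date, time_of_day) is strictly later than the timestamp in `s`.
// The date is compared first; time_of_day only breaks ties on the same date.
bool IsLater(int32_t date, int64_t time_of_day, const CurrentValState& s) {
  if (date != s.date) return date > s.date;
  return time_of_day > s.time_of_day;
}

void CurrentValInit(CurrentValState* state) { *state = CurrentValState(); }

// Called once per input row: NULL inputs are skipped, otherwise the row wins
// if it is the first value or carries a later timestamp.
void CurrentValUpdate(const IntVal& input, const TimestampVal& ts,
                      CurrentValState* state) {
  if (input.is_null || ts.is_null) return;
  if (!state->has_value || IsLater(ts.date, ts.time_of_day, *state)) {
    state->has_value = true;
    state->val = input.val;
    state->date = ts.date;
    state->time_of_day = ts.time_of_day;
  }
}

// Combines two partial results: keep whichever side has the later timestamp.
void CurrentValMerge(const CurrentValState& src, CurrentValState* dst) {
  if (!src.has_value) return;
  if (!dst->has_value || IsLater(src.date, src.time_of_day, *dst)) *dst = src;
}

// Produces the final value; NULL if no non-NULL input was ever seen.
IntVal CurrentValFinalize(const CurrentValState& state) {
  IntVal result;
  result.is_null = !state.has_value;
  result.val = state.val;
  return result;
}
```

Two design points differ from the code originally posted: the merge step picks
the later of the two partial states rather than adding the values, and the
timestamp comparison looks at time_of_day only when the dates are equal.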
