Thanks for that Matthew. Not to do anything against your suggestion but, I
just wanted to try impala custom UDA and see if it might give us any
expected result. If not this, I would learn to write our own custom UDA to
solve problems other than this.

I am facing issues when trying to return any return type other than
StringVal in the finalize method of UDA. Its throwing error when
registering the handlers with impala and creating a function.

ERROR: AnalysisException: Could not find function CurrentValOneUpdate(INT,
TIMESTAMP) returns INT in:
hdfs://**************com:8020/dwh/impala/udf/libudasample.so

Check that function name, arguments, and return type are correct.

The above works fine when returning StringVal type.
Impala version using: Impala Shell v2.5.0-cdh5.7.0

Waiting for your suggestion

Thanks,
Ravi

On 22 June 2017 at 11:58, Matthew Jacobs <[email protected]> wrote:

> I haven't looked at the code, but this isn't going to solve your
> earlier issue because this is an aggregation function, this will not
> work for analytic functions. I was saying in an earlier email that we
> don't expose an interface for registering analytic functions, so you
> should probably just upgrade Impala. I posted the patch of the
> analytic function we added to illustrate that it will be difficult and
> not a matter of registering a UDA.
>
> On Thu, Jun 22, 2017 at 11:25 AM, Ravi Kanth <[email protected]>
> wrote:
> > Hi All,
> >
> > I wrote the below lines of code to achieve this functionality:
> >
> > // Copyright 2012 Cloudera Inc.
> > //
> > // Licensed under the Apache License, Version 2.0 (the "License");
> > // you may not use this file except in compliance with the License.
> > // You may obtain a copy of the License at
> > //
> > // http://www.apache.org/licenses/LICENSE-2.0
> > //
> > // Unless required by applicable law or agreed to in writing, software
> > // distributed under the License is distributed on an "AS IS" BASIS,
> > // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
> implied.
> > // See the License for the specific language governing permissions and
> > // limitations under the License.
> >
> > #include "uda-sample.h"
> > #include <assert.h>
> > #include <sstream>
> >
> > using namespace impala_udf;
> > using namespace std;
> >
> >
> > StringVal ToStringVal(FunctionContext* context, const IntVal& val) {
> >   stringstream ss;
> >   ss << val.val;
> >   string str = ss.str();
> >   StringVal string_val(context, str.size());
> >   memcpy(string_val.ptr, str.c_str(), str.size());
> >   return string_val;
> > }
> >
> > //
> > ------------------------------------------------------------
> ---------------------------
> > // // This is an aggregate function for retrieving the latest non-null
> value
> > for e_update table
> > // //
> > ------------------------------------------------------------
> ---------------------------
> >
> > struct CurrentValStruct {
> >   IntVal value;
> >   TimestampVal tsTemp;
> > };
> >
> > // Initialize the StringVal intermediate to a zero'd AvgStruct
> > void CurrentValInit(FunctionContext* context, StringVal* val) {
> >   val->is_null = false;
> >   val->len = sizeof(CurrentValStruct);
> >   val->ptr = context->Allocate(val->len);
> >   memset(val->ptr, 0, val->len);
> >   CurrentValStruct* cur = reinterpret_cast<CurrentValStruct*>(val->ptr);
> >   cur->value.is_null = false;
> >   cur->tsTemp.is_null = false;
> > }
> >
> > void CurrentValUpdate(FunctionContext* context, const IntVal& input,
> const
> > TimestampVal& ts, StringVal* val) {
> >   assert(!val->is_null);
> >   assert(val->len == sizeof(CurrentValStruct));
> >   CurrentValStruct* cur = reinterpret_cast<CurrentValStruct*>(val->ptr);
> >   //checking if the incoming input value is null
> >   if(!input.is_null){
> >     if(ts.date >= cur->tsTemp.date && ts.time_of_day >
> > cur->tsTemp.time_of_day){
> >     cur->value = input.val;
> >       cur->tsTemp.date = ts.date;
> >       cur->tsTemp.time_of_day = ts.time_of_day;
> >     }
> >   }
> > }
> >
> > void CurrentValMerge(FunctionContext* context, const StringVal& src,
> > StringVal* dst) {
> >   if (src.is_null) return;
> >   const CurrentValStruct* src_cur = reinterpret_cast<const
> > CurrentValStruct*>(src.ptr);
> >   CurrentValStruct* dst_cur = reinterpret_cast<
> CurrentValStruct*>(dst->ptr);
> >   if(dst_cur->tsTemp.is_null){
> >     dst_cur->value = src_cur->value;
> >     dst_cur->tsTemp.date = src_cur->tsTemp.date;
> >     dst_cur->tsTemp.time_of_day = src_cur->tsTemp.time_of_day;
> >   dst_cur->tsTemp.is_null = false;
> >   dst_cur->value.is_null = false;
> >  }
> >   else{
> >     if(src_cur->tsTemp.date >= dst_cur->tsTemp.date &&
> > src_cur->tsTemp.time_of_day > dst_cur->tsTemp.time_of_day){
> >     dst_cur->value = src_cur->value;
> >     dst_cur->tsTemp.date = src_cur->tsTemp.date;
> >       dst_cur->tsTemp.time_of_day = src_cur->tsTemp.time_of_day;
> >     }
> >   }
> > }
> >
> > StringVal CurrentValSerialize(FunctionContext* context, const StringVal&
> > val) {
> >   assert(!val.is_null);
> >   StringVal result(context, val.len);
> >   memcpy(result.ptr, val.ptr, val.len);
> >   context->Free(val.ptr);
> >   return result;
> > }
> >
> > StringVal CurrentValFinalize(FunctionContext* context, const StringVal&
> val)
> > {
> >   //IntVal intResult;
> >   assert(!val.is_null);
> >   assert(val.len == sizeof(CurrentValStruct));
> >   CurrentValStruct* cur = reinterpret_cast<CurrentValStruct*>(val.ptr);
> >   StringVal result;
> >   if (cur->value == 0) {
> >     result = StringVal::null();
> >     //intResult = 0;
> >   } else {
> >     // intResult = cur->value.val;
> >     // Copies the result to memory owned by Impala
> >     result = ToStringVal(context, cur->value.val);
> > //  intResult = atoi(result.c_str());
> >   //  std::istringstream(result) >> intResult;
> >   }
> >   context->Free(val.ptr);
> >   return result;
> > }
> >
> > Queries:
> >
> > create aggregate function current_val(int,timestamp) returns string
> location
> > '/impala/udf/libudasample.so' init_fn='CurrentValInit'
> > update_fn='CurrentValUpdate' merge_fn='CurrentValMerge'
> > serialize_fn='CurrentValSerialize' finalize_fn='CurrentValFinalize';
> >
> >
> > select id, current_val(a,date_time) as a from udf_test GROUP BY id;
> >
> >
> > The above is working fine and I am able to achieve my requirement. But,
> is
> > there any possibility that we can return an IntVal type rather than
> > StringVal type? If so where can I make the changes?
> >
> > I tried changing the below:
> >
> > IntVal CurrentValSerialize(FunctionContext* context, const StringVal&
> val) {
> >
> >   assert(!val.is_null);
> >
> >   StringVal result(context, val.len);
> >
> >   memcpy(result.ptr, val.ptr, val.len);
> >
> >   context->Free(val.ptr);
> >
> >   IntVal intResult;
> >
> >   CurrentValStruct* cur = reinterpret_cast<CurrentValStruct*>(val.ptr);
> >
> >   intResult = cur->value.val;
> >
> >   return intResult;
> >
> > }
> >
> >
> > IntVal CurrentValFinalize(FunctionContext* context, const StringVal&
> val) {
> >
> >   IntVal intResult;
> >
> >   assert(!val.is_null);
> >
> >   assert(val.len == sizeof(CurrentValStruct));
> >
> >   CurrentValStruct* cur = reinterpret_cast<CurrentValStruct*>(val.ptr);
> >
> >   //StringVal result;
> >
> >   if (cur->value == 0) {
> >
> >     //result = StringVal::null();
> >
> >     intResult = 0;
> >
> >   } else {
> >
> >     intResult = cur->value.val;
> >
> >     // Copies the result to memory owned by Impala
> >
> >     //result = ToStringVal(context, cur->value.val);
> >
> > //  intResult = atoi(result.c_str());
> >
> >   //  std::istringstream(result) >> intResult;
> >
> >   }
> >
> >   context->Free(val.ptr);
> >
> >   return intResult;
> >
> > }
> >
> >
> > But, when trying to create aggregate function I am facing,
> >
> > create aggregate function current_val(int,timestamp) returns int location
> > '/impala/udf/libudasample.so' init_fn='CurrentValInit'
> > update_fn='CurrentValUpdate' merge_fn='CurrentValMerge'
> > serialize_fn='CurrentValSerialize' finalize_fn='CurrentValFinalize';
> >
> >
> > Query: create aggregate function current_val(int,timestamp) returns int
> > location '/dwh/impala/udf/libudasample.so' init_fn='CurrentValInit'
> > update_fn='CurrentValUpdate' merge_fn='CurrentValMerge'
> > serialize_fn='CurrentValSerialize' finalize_fn='CurrentValFinalize'
> >
> >
> > ERROR: AnalysisException: Could not find function CurrentValUpdate(INT,
> > TIMESTAMP) returns INT in:
> > hdfs://***************:8020/impala/udf/libudasample.so
> >
> > Check that function name, arguments, and return type are correct.
> >
> >
> > I changed the header file function definition also accordingly. Can
> someone
> > suggest if I am missing something here?
> >
> > Thanks,
> > Ravi
> >
> > On 21 June 2017 at 13:42, Ravi Kanth <[email protected]> wrote:
> >>
> >> Thanks for the suggestion Matthew. Let me look into the patch. I am
> >> currently working on building a custom UDA. Hopefully the information
> you
> >> provided and the discussion we had might be useful to me.
> >>
> >> On 21 June 2017 at 13:39, Matthew Jacobs <[email protected]> wrote:
> >>>
> >>> I'd strongly recommend the latter (upgrading). We don't really expose
> >>> the analytic function interface, so you'd end up writing an Impala
> >>> patch, and analytic functions are particularly tricky.
> >>>
> >>> Here's Thomas' patch to add 'ignore nulls' in first/last value:
> >>> https://gerrit.cloudera.org/#/c/3328/
> >>>
> >>> On Wed, Jun 21, 2017 at 1:08 PM, Ravi Kanth <[email protected]>
> >>> wrote:
> >>> > Thanks All. I will think of a possible solution either by
> implementing
> >>> > a
> >>> > Custom UDA or would update the version.
> >>> >
> >>> > On Wed, Jun 21, 2017 at 13:04 Thomas Tauber-Marshall
> >>> > <[email protected]> wrote:
> >>> >>
> >>> >> On Wed, Jun 21, 2017 at 2:33 PM Ravi Kanth <[email protected]
> >
> >>> >> wrote:
> >>> >>>
> >>> >>> Ya. I agree with you Thomas. Probably that's what I'm doing wrong.
> >>> >>>
> >>> >>> Unfortunately, as mentioned the version of impala we are using I
> >>> >>> belive
> >>> >>> it doesn't support ignore nulls.
> >>> >>>
> >>> >>> But, my question is would last_value function retrieve a latest not
> >>> >>> null
> >>> >>> value irrespective of using ignore nulls?
> >>> >>
> >>> >>
> >>> >> Not sure I follow - if you use last_value without ignore nulls,
> you'll
> >>> >> get
> >>> >> the latest value taking all values into consideration, which may or
> >>> >> may not
> >>> >> be null.
> >>> >>
> >>> >>>
> >>> >>>
> >>> >>> Ravi
> >>> >>>
> >>> >>> On Wed, Jun 21, 2017 at 12:23 Matthew Jacobs <[email protected]>
> wrote:
> >>> >>>>
> >>> >>>> Ah I think Thomas is right. I read the expected results and the
> >>> >>>> query
> >>> >>>> too quickly, so my comment about the asc/desc is probably wrong.
> >>> >>>> Clearly my point about analytic functions being tricky holds true
> :)
> >>> >>>>
> >>> >>>> On Wed, Jun 21, 2017 at 12:12 PM, Thomas Tauber-Marshall
> >>> >>>> <[email protected]> wrote:
> >>> >>>> >
> >>> >>>> >
> >>> >>>> > On Wed, Jun 21, 2017 at 1:52 PM Ravi Kanth
> >>> >>>> > <[email protected]>
> >>> >>>> > wrote:
> >>> >>>> >>
> >>> >>>> >> Thomas,
> >>> >>>> >>
> >>> >>>> >> The version of Impala we are using is 2.5.0 with CDH 5.7.0 and
> I
> >>> >>>> >> see
> >>> >>>> >> ignore nulls has been added in Impala 2.7.0. And, does adding
> >>> >>>> >> ignore
> >>> >>>> >> nulls
> >>> >>>> >> would make a big difference in the expected result?
> >>> >>>> >
> >>> >>>> >
> >>> >>>> > That's too bad. I think that 'ignore nulls' would give you what
> >>> >>>> > you
> >>> >>>> > want -
> >>> >>>> > the problem with the query that you posted is that it eliminates
> >>> >>>> > rows
> >>> >>>> > that
> >>> >>>> > don't match the where clause, so for example the row with "Zero"
> >>> >>>> > in it
> >>> >>>> > is
> >>> >>>> > eliminated because it is filtered out by the "where a is not
> >>> >>>> > null",
> >>> >>>> > whereas
> >>> >>>> > "ignore nulls" only affects the values that could be returned by
> >>> >>>> > the
> >>> >>>> > specific analytic function that the ignore is applied to.
> >>> >>>> >
> >>> >>>> >>
> >>> >>>> >>
> >>> >>>> >> Ravi
> >>> >>>> >>
> >>> >>>> >> On 21 June 2017 at 11:20, Thomas Tauber-Marshall
> >>> >>>> >> <[email protected]>
> >>> >>>> >> wrote:
> >>> >>>> >>>
> >>> >>>> >>> Ravi,
> >>> >>>> >>>
> >>> >>>> >>> Instead of using the "where ... is not null", have you tried
> >>> >>>> >>> 'last_value(... ignore nulls)'
> >>> >>>> >>>
> >>> >>>> >>> On Wed, Jun 21, 2017 at 1:08 PM Ravi Kanth
> >>> >>>> >>> <[email protected]>
> >>> >>>> >>> wrote:
> >>> >>>> >>>>
> >>> >>>> >>>> Antoni,
> >>> >>>> >>>>
> >>> >>>> >>>> The problem in using last_value function() as far as I
> observed
> >>> >>>> >>>> is,
> >>> >>>> >>>> if I
> >>> >>>> >>>> use it on multiple columns in a single query, its not
> >>> >>>> >>>> retrieving
> >>> >>>> >>>> results as
> >>> >>>> >>>> expected.
> >>> >>>> >>>>
> >>> >>>> >>>>  Input:
> >>> >>>> >>>>
> >>> >>>> >>>> ID (Int)Date_Time (timestamp)A (Int)B (String)C (String)
> >>> >>>> >>>> 101NULLNULL
> >>> >>>> >>>> 112HiNULL
> >>> >>>> >>>> 134HelloHi
> >>> >>>> >>>> 125NULLNULL
> >>> >>>> >>>> 14NULLNULLZero
> >>> >>>> >>>>
> >>> >>>> >>>> Expected Output:
> >>> >>>> >>>>
> >>> >>>> >>>>
> >>> >>>> >>>>
> >>> >>>> >>>> ID (Int)A (Int)B (String)C (String)
> >>> >>>> >>>> 14HelloZero
> >>> >>>> >>>>
> >>> >>>> >>>>
> >>> >>>> >>>> Query executed:
> >>> >>>> >>>>
> >>> >>>> >>>> select id, last_value(a) over(partition by id order by
> >>> >>>> >>>> date_time
> >>> >>>> >>>> desc)
> >>> >>>> >>>> as a, last_value(b) over(partition by id order by date_time
> >>> >>>> >>>> desc)
> >>> >>>> >>>> as b,
> >>> >>>> >>>> last_value(c) over(partition by id order by date_time desc)
> as
> >>> >>>> >>>> c
> >>> >>>> >>>> from
> >>> >>>> >>>> udf_test where a is not null and b is not null and c is not
> >>> >>>> >>>> null;
> >>> >>>> >>>>
> >>> >>>> >>>>
> >>> >>>> >>>>
> >>> >>>> >>>> Output I am getting:
> >>> >>>> >>>>
> >>> >>>> >>>> +----+---+-------+----+
> >>> >>>> >>>>
> >>> >>>> >>>> | id | a | b     | c  |
> >>> >>>> >>>>
> >>> >>>> >>>> +----+---+-------+----+
> >>> >>>> >>>>
> >>> >>>> >>>> | 1  | 4 | Hello | Hi ||
> >>> >>>> >>>>
> >>> >>>> >>>> +----+---+-------+----+
> >>> >>>> >>>>
> >>> >>>> >>>>
> >>> >>>> >>>> Hopefully, I am clear with the problem above.
> >>> >>>> >>>>
> >>> >>>> >>>> Thanks,
> >>> >>>> >>>> Ravi
> >>> >>>> >>>>
> >>> >>>> >>>> On 20 June 2017 at 22:05, Ravi Kanth <
> [email protected]>
> >>> >>>> >>>> wrote:
> >>> >>>> >>>>>
> >>> >>>> >>>>> Antoni,
> >>> >>>> >>>>>
> >>> >>>> >>>>> Thanks for the suggestion. Let me have a look at it and
> >>> >>>> >>>>> hopefully
> >>> >>>> >>>>> we
> >>> >>>> >>>>> can use it in our use case.
> >>> >>>> >>>>>
> >>> >>>> >>>>> Thanks,
> >>> >>>> >>>>> Ravi
> >>> >>>> >>>>>
> >>> >>>> >>>>> On Tue, Jun 20, 2017 at 21:53 Antoni Ivanov
> >>> >>>> >>>>> <[email protected]>
> >>> >>>> >>>>> wrote:
> >>> >>>> >>>>>>
> >>> >>>> >>>>>> Hi Ravi,
> >>> >>>> >>>>>>
> >>> >>>> >>>>>> I am curious why you are not using already existing
> >>> >>>> >>>>>> last_value
> >>> >>>> >>>>>> function in Impala to get "latest non null value for the
> >>> >>>> >>>>>> column”
> >>> >>>> >>>>>>
> >>> >>>> >>>>>> e.g
> >>> >>>> >>>>>> last_value(column_a ignore nulls) over(partition by ID
> order
> >>> >>>> >>>>>> by
> >>> >>>> >>>>>> Date_Time)
> >>> >>>> >>>>>>
> >>> >>>> >>>>>> Thanks,
> >>> >>>> >>>>>> Antoni
> >>> >>>> >>>>>>
> >>> >>>> >>>>>>
> >>> >>>> >>>>>>
> >>> >>>> >>>>>>
> >>> >>>> >>>>>> On Jun 21, 2017, at 1:17 AM, Tim Armstrong
> >>> >>>> >>>>>> <[email protected]>
> >>> >>>> >>>>>> wrote:
> >>> >>>> >>>>>>
> >>> >>>> >>>>>> This was double-posted to
> >>> >>>> >>>>>>
> >>> >>>> >>>>>>
> >>> >>>> >>>>>> http://community.cloudera.com/t5/Interactive-Short-cycle-
> SQL/Creating-Impala-UDA/m-p/56201/highlight/false#M3073
> >>> >>>> >>>>>> also. I'll continue the discussion here.
> >>> >>>> >>>>>>
> >>> >>>> >>>>>> > Can we have the flexibility of declaring the variable
> >>> >>>> >>>>>> > globally
> >>> >>>> >>>>>> > in
> >>> >>>> >>>>>> > UDF? Globally, I mean outside the function?
> >>> >>>> >>>>>>
> >>> >>>> >>>>>> > And, the reason I am declaring a static variable is to
> >>> >>>> >>>>>> > restore
> >>> >>>> >>>>>> > the
> >>> >>>> >>>>>> > value of timestamp for every record so that I can
> perform a
> >>> >>>> >>>>>> > comparison of
> >>> >>>> >>>>>> > the timestamps. Is there an alternative approach for
> this?
> >>> >>>> >>>>>>
> >>> >>>> >>>>>> Updating a global or static variable in a UDAF is
> guaranteed
> >>> >>>> >>>>>> not
> >>> >>>> >>>>>> to do
> >>> >>>> >>>>>> what you expect - the function can be invoked concurrently
> by
> >>> >>>> >>>>>> multiple
> >>> >>>> >>>>>> threads.
> >>> >>>> >>>>>>
> >>> >>>> >>>>>> It seems like you probably want to store some additional
> >>> >>>> >>>>>> state in
> >>> >>>> >>>>>> the
> >>> >>>> >>>>>> intermediate value. There are some sample UDAs here (see
> >>> >>>> >>>>>> Avg())
> >>> >>>> >>>>>> where
> >>> >>>> >>>>>> additional intermediate state is stored in a StringVal:
> >>> >>>> >>>>>>
> >>> >>>> >>>>>>
> >>> >>>> >>>>>> https://github.com/cloudera/impala-udf-samples/blob/
> master/uda-sample.cc#L61
> >>> >>>> >>>>>>
> >>> >>>> >>>>>>
> >>> >>>> >>>>>> On Tue, Jun 20, 2017 at 2:40 PM, Ravi Kanth
> >>> >>>> >>>>>> <[email protected]>
> >>> >>>> >>>>>> wrote:
> >>> >>>> >>>>>>>
> >>> >>>> >>>>>>> Thanks Bharath. Can you check if the logic I am
> implementing
> >>> >>>> >>>>>>> is
> >>> >>>> >>>>>>> correct or needed any modification in it as well? I am
> very
> >>> >>>> >>>>>>> new
> >>> >>>> >>>>>>> to Impala
> >>> >>>> >>>>>>> UDF & C++ and having some hard time figuring out the
> >>> >>>> >>>>>>> problems.
> >>> >>>> >>>>>>>
> >>> >>>> >>>>>>> On 20 June 2017 at 14:27, Bharath Vissapragada
> >>> >>>> >>>>>>> <[email protected]> wrote:
> >>> >>>> >>>>>>>>
> >>> >>>> >>>>>>>> You need to allocate memory for tsTemp, else it can
> >>> >>>> >>>>>>>> segfault.
> >>> >>>> >>>>>>>> That
> >>> >>>> >>>>>>>> could be the issue here.
> >>> >>>> >>>>>>>>
> >>> >>>> >>>>>>>>  static TimestampVal* tsTemp;
> >>> >>>> >>>>>>>>       tsTemp->date = 0;
> >>> >>>> >>>>>>>>       tsTemp->time_of_day = 0;
> >>> >>>> >>>>>>>>
> >>> >>>> >>>>>>>>
> >>> >>>> >>>>>>>> On Tue, Jun 20, 2017 at 2:15 PM, Ravi Kanth
> >>> >>>> >>>>>>>> <[email protected]> wrote:
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>> Hi All,
> >>> >>>> >>>>>>>>> We are using Impala to do various processings in our
> >>> >>>> >>>>>>>>> systems.
> >>> >>>> >>>>>>>>> We
> >>> >>>> >>>>>>>>> have a requirement recently, wherein we have to handle
> the
> >>> >>>> >>>>>>>>> updates on the
> >>> >>>> >>>>>>>>> events i.e, we have an 'e_update' table which has the
> >>> >>>> >>>>>>>>> partial
> >>> >>>> >>>>>>>>> updates
> >>> >>>> >>>>>>>>> received for various events. The fields that are not
> >>> >>>> >>>>>>>>> updated
> >>> >>>> >>>>>>>>> are being
> >>> >>>> >>>>>>>>> stored as NULL values.
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>> Ex:
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>> ID (Int) Date_Time (timestamp) A (Int) B (String) C
> >>> >>>> >>>>>>>>> (String)
> >>> >>>> >>>>>>>>> 1 0 1 NULL NULL
> >>> >>>> >>>>>>>>> 1 1 2 Hi NULL
> >>> >>>> >>>>>>>>> 1 3 4 Hello Hi
> >>> >>>> >>>>>>>>> 1 2 5 NULL NULL
> >>> >>>> >>>>>>>>> 1 4 NULL NULL Zero
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>> P.S: Please consider Date_time as valid timestamp type
> >>> >>>> >>>>>>>>> values.
> >>> >>>> >>>>>>>>> For
> >>> >>>> >>>>>>>>> easy understanding, mentioned them as 0,1,2,3,4,5
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>> As seen in the above table, the events have a unique id
> >>> >>>> >>>>>>>>> and as
> >>> >>>> >>>>>>>>> we
> >>> >>>> >>>>>>>>> get an update to a particular event, we are storing the
> >>> >>>> >>>>>>>>> date_time at which
> >>> >>>> >>>>>>>>> update has happened and also storing the partial updated
> >>> >>>> >>>>>>>>> values. Apart from
> >>> >>>> >>>>>>>>> the updated values, the rest are stored as NULL values.
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>> We are planning to mimic inplace updates on the table,
> so
> >>> >>>> >>>>>>>>> that
> >>> >>>> >>>>>>>>> it
> >>> >>>> >>>>>>>>> would retrieve the resulting table as follows using the
> >>> >>>> >>>>>>>>> query
> >>> >>>> >>>>>>>>> below: We
> >>> >>>> >>>>>>>>> don't delete the data.
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>> > SELECT id, current_val(A,date_time) as A,
> >>> >>>> >>>>>>>>> > current_val(B,date_time) as B,
> current_val(C,date_time)
> >>> >>>> >>>>>>>>> > as C
> >>> >>>> >>>>>>>>> > from e_update
> >>> >>>> >>>>>>>>> > GROUP BY ID;
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>> where, current_val is a custom impala UDA we are
> planning
> >>> >>>> >>>>>>>>> to
> >>> >>>> >>>>>>>>> implement. i.e. get latest non null value for the
> column.
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>> ID (Int) A (Int) B (String) C (String)
> >>> >>>> >>>>>>>>> 1 4 Hello Zero
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>> Implemented current_val UDA:
> >>> >>>> >>>>>>>>> The below code is only for int type inputs:
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>> uda-currentval.h
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>> //This is a sample for retrieving the current value of
> >>> >>>> >>>>>>>>> e_update
> >>> >>>> >>>>>>>>> table
> >>> >>>> >>>>>>>>> //
> >>> >>>> >>>>>>>>> void CurrentValueInit(FunctionContext* context, IntVal*
> >>> >>>> >>>>>>>>> val);
> >>> >>>> >>>>>>>>> void CurrentValueUpdate(FunctionContext* context, const
> >>> >>>> >>>>>>>>> IntVal&
> >>> >>>> >>>>>>>>> input, const TimestampVal& ts, IntVal* val);
> >>> >>>> >>>>>>>>> void CurrentValueMerge(FunctionContext* context, const
> >>> >>>> >>>>>>>>> IntVal&
> >>> >>>> >>>>>>>>> src,
> >>> >>>> >>>>>>>>> IntVal* dst);
> >>> >>>> >>>>>>>>> IntVal CurrentValueFinalize(FunctionContext* context,
> >>> >>>> >>>>>>>>> const
> >>> >>>> >>>>>>>>> IntVal&
> >>> >>>> >>>>>>>>> val);
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>> uda-currentval.cc
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>> //
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>> ------------------------------
> -----------------------------------------------------------------
> >>> >>>> >>>>>>>>> // This is a sample for retrieving the current value of
> >>> >>>> >>>>>>>>> e_update
> >>> >>>> >>>>>>>>> table
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>> //----------------------------
> -------------------------------------------------------------------
> >>> >>>> >>>>>>>>> void CurrentValueInit(FunctionContext* context, IntVal*
> >>> >>>> >>>>>>>>> val) {
> >>> >>>> >>>>>>>>>       val->is_null = false;
> >>> >>>> >>>>>>>>>       val->val = 0;
> >>> >>>> >>>>>>>>> }
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>> void CurrentValueUpdate(FunctionContext* context, const
> >>> >>>> >>>>>>>>> IntVal&
> >>> >>>> >>>>>>>>> input, const TimestampVal& ts, IntVal* val) {
> >>> >>>> >>>>>>>>>       static TimestampVal* tsTemp;
> >>> >>>> >>>>>>>>>       tsTemp->date = 0;
> >>> >>>> >>>>>>>>>       tsTemp->time_of_day = 0;
> >>> >>>> >>>>>>>>>       if(tsTemp->date==0 && tsTemp->time_of_day==0){
> >>> >>>> >>>>>>>>>         tsTemp->date = ts.date;
> >>> >>>> >>>>>>>>>         tsTemp->time_of_day = ts.time_of_day;
> >>> >>>> >>>>>>>>>         val->val = input.val;
> >>> >>>> >>>>>>>>>         return;
> >>> >>>> >>>>>>>>>       }
> >>> >>>> >>>>>>>>>       if(ts.date > tsTemp->date && ts.time_of_day >
> >>> >>>> >>>>>>>>> tsTemp->time_of_day){
> >>> >>>> >>>>>>>>>         tsTemp->date = ts.date;
> >>> >>>> >>>>>>>>>         tsTemp->time_of_day = ts.time_of_day;
> >>> >>>> >>>>>>>>>         val->val = input.val;
> >>> >>>> >>>>>>>>>         return;
> >>> >>>> >>>>>>>>>       }
> >>> >>>> >>>>>>>>> }
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>> void CurrentValueMerge(FunctionContext* context, const
> >>> >>>> >>>>>>>>> IntVal&
> >>> >>>> >>>>>>>>> src,
> >>> >>>> >>>>>>>>> IntVal* dst) {
> >>> >>>> >>>>>>>>>      dst->val += src.val;
> >>> >>>> >>>>>>>>> }
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>> IntVal CurrentValueFinalize(FunctionContext* context,
> >>> >>>> >>>>>>>>> const
> >>> >>>> >>>>>>>>> IntVal&
> >>> >>>> >>>>>>>>> val) {
> >>> >>>> >>>>>>>>>      return val;
> >>> >>>> >>>>>>>>> }
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>> We are able to build and create an aggregate function in
> >>> >>>> >>>>>>>>> impala,
> >>> >>>> >>>>>>>>> but when trying to run the select query similar to the
> one
> >>> >>>> >>>>>>>>> above, it is
> >>> >>>> >>>>>>>>> bringing down couple of impala deamons and throwing the
> >>> >>>> >>>>>>>>> error
> >>> >>>> >>>>>>>>> below and
> >>> >>>> >>>>>>>>> getting terminated.
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>> WARNINGS: Cancelled due to unreachable impalad(s):
> >>> >>>> >>>>>>>>> hadoop102.**.**.**.com:22000
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>> We have impalad running on 14 instances.
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>> Can someone help resolve us this problem and a better
> way
> >>> >>>> >>>>>>>>> to
> >>> >>>> >>>>>>>>> achieve a solution for the scenario explained.
> >>> >>>> >>>>>>>>
> >>> >>>> >>>>>>>>
> >>> >>>> >>>>>>>
> >>> >>>> >>>>>>
> >>> >>>> >>>>>>
> >>> >>>> >>>>
> >>> >>>> >>
> >>> >>>> >
> >>
> >>
> >
>

Reply via email to