Thanks for that, Matthew. I don't mean to go against your suggestion; I just wanted to try a custom Impala UDA and see whether it gives us the expected result. Even if it doesn't, I would at least learn how to write custom UDAs for problems other than this one.
I am having trouble returning any type other than StringVal from the finalize function of the UDA. Registering the handlers with Impala and creating the function fails with this error:

ERROR: AnalysisException: Could not find function CurrentValOneUpdate(INT, TIMESTAMP) returns INT in: hdfs://**************com:8020/dwh/impala/udf/libudasample.so
Check that function name, arguments, and return type are correct.

The same function registers fine when finalize returns a StringVal.

Impala version in use: Impala Shell v2.5.0-cdh5.7.0

Waiting for your suggestions.

Thanks,
Ravi

On 22 June 2017 at 11:58, Matthew Jacobs <[email protected]> wrote:

> I haven't looked at the code, but this isn't going to solve your
> earlier issue because this is an aggregation function; it will not
> work for analytic functions. I was saying in an earlier email that we
> don't expose an interface for registering analytic functions, so you
> should probably just upgrade Impala. I posted the patch of the
> analytic function we added to illustrate that it will be difficult and
> not a matter of registering a UDA.
>
> On Thu, Jun 22, 2017 at 11:25 AM, Ravi Kanth <[email protected]>
> wrote:
> > Hi All,
> >
> > I wrote the below lines of code to achieve this functionality:
> >
> > // Copyright 2012 Cloudera Inc.
> > //
> > // Licensed under the Apache License, Version 2.0 (the "License");
> > // you may not use this file except in compliance with the License.
> > // You may obtain a copy of the License at
> > //
> > // http://www.apache.org/licenses/LICENSE-2.0
> > //
> > // Unless required by applicable law or agreed to in writing, software
> > // distributed under the License is distributed on an "AS IS" BASIS,
> > // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> > // See the License for the specific language governing permissions and
> > // limitations under the License.
> > > > #include "uda-sample.h" > > #include <assert.h> > > #include <sstream> > > > > using namespace impala_udf; > > using namespace std; > > > > > > StringVal ToStringVal(FunctionContext* context, const IntVal& val) { > > stringstream ss; > > ss << val.val; > > string str = ss.str(); > > StringVal string_val(context, str.size()); > > memcpy(string_val.ptr, str.c_str(), str.size()); > > return string_val; > > } > > > > // > > ------------------------------------------------------------ > --------------------------- > > // // This is an aggregate function for retrieving the latest non-null > value > > for e_update table > > // // > > ------------------------------------------------------------ > --------------------------- > > > > struct CurrentValStruct { > > IntVal value; > > TimestampVal tsTemp; > > }; > > > > // Initialize the StringVal intermediate to a zero'd AvgStruct > > void CurrentValInit(FunctionContext* context, StringVal* val) { > > val->is_null = false; > > val->len = sizeof(CurrentValStruct); > > val->ptr = context->Allocate(val->len); > > memset(val->ptr, 0, val->len); > > CurrentValStruct* cur = reinterpret_cast<CurrentValStruct*>(val->ptr); > > cur->value.is_null = false; > > cur->tsTemp.is_null = false; > > } > > > > void CurrentValUpdate(FunctionContext* context, const IntVal& input, > const > > TimestampVal& ts, StringVal* val) { > > assert(!val->is_null); > > assert(val->len == sizeof(CurrentValStruct)); > > CurrentValStruct* cur = reinterpret_cast<CurrentValStruct*>(val->ptr); > > //checking if the incoming input value is null > > if(!input.is_null){ > > if(ts.date >= cur->tsTemp.date && ts.time_of_day > > > cur->tsTemp.time_of_day){ > > cur->value = input.val; > > cur->tsTemp.date = ts.date; > > cur->tsTemp.time_of_day = ts.time_of_day; > > } > > } > > } > > > > void CurrentValMerge(FunctionContext* context, const StringVal& src, > > StringVal* dst) { > > if (src.is_null) return; > > const CurrentValStruct* src_cur = reinterpret_cast<const 
> > CurrentValStruct*>(src.ptr); > > CurrentValStruct* dst_cur = reinterpret_cast< > CurrentValStruct*>(dst->ptr); > > if(dst_cur->tsTemp.is_null){ > > dst_cur->value = src_cur->value; > > dst_cur->tsTemp.date = src_cur->tsTemp.date; > > dst_cur->tsTemp.time_of_day = src_cur->tsTemp.time_of_day; > > dst_cur->tsTemp.is_null = false; > > dst_cur->value.is_null = false; > > } > > else{ > > if(src_cur->tsTemp.date >= dst_cur->tsTemp.date && > > src_cur->tsTemp.time_of_day > dst_cur->tsTemp.time_of_day){ > > dst_cur->value = src_cur->value; > > dst_cur->tsTemp.date = src_cur->tsTemp.date; > > dst_cur->tsTemp.time_of_day = src_cur->tsTemp.time_of_day; > > } > > } > > } > > > > StringVal CurrentValSerialize(FunctionContext* context, const StringVal& > > val) { > > assert(!val.is_null); > > StringVal result(context, val.len); > > memcpy(result.ptr, val.ptr, val.len); > > context->Free(val.ptr); > > return result; > > } > > > > StringVal CurrentValFinalize(FunctionContext* context, const StringVal& > val) > > { > > //IntVal intResult; > > assert(!val.is_null); > > assert(val.len == sizeof(CurrentValStruct)); > > CurrentValStruct* cur = reinterpret_cast<CurrentValStruct*>(val.ptr); > > StringVal result; > > if (cur->value == 0) { > > result = StringVal::null(); > > //intResult = 0; > > } else { > > // intResult = cur->value.val; > > // Copies the result to memory owned by Impala > > result = ToStringVal(context, cur->value.val); > > // intResult = atoi(result.c_str()); > > // std::istringstream(result) >> intResult; > > } > > context->Free(val.ptr); > > return result; > > } > > > > Queries: > > > > create aggregate function current_val(int,timestamp) returns string > location > > '/impala/udf/libudasample.so' init_fn='CurrentValInit' > > update_fn='CurrentValUpdate' merge_fn='CurrentValMerge' > > serialize_fn='CurrentValSerialize' finalize_fn='CurrentValFinalize'; > > > > > > select id, current_val(a,date_time) as a from udf_test GROUP BY id; > > > > > > The above is working 
fine and I am able to achieve my requirement. But, > is > > there any possibility that we can return an IntVal type rather than > > StringVal type? If so where can I make the changes? > > > > I tried changing the below: > > > > IntVal CurrentValSerialize(FunctionContext* context, const StringVal& > val) { > > > > assert(!val.is_null); > > > > StringVal result(context, val.len); > > > > memcpy(result.ptr, val.ptr, val.len); > > > > context->Free(val.ptr); > > > > IntVal intResult; > > > > CurrentValStruct* cur = reinterpret_cast<CurrentValStruct*>(val.ptr); > > > > intResult = cur->value.val; > > > > return intResult; > > > > } > > > > > > IntVal CurrentValFinalize(FunctionContext* context, const StringVal& > val) { > > > > IntVal intResult; > > > > assert(!val.is_null); > > > > assert(val.len == sizeof(CurrentValStruct)); > > > > CurrentValStruct* cur = reinterpret_cast<CurrentValStruct*>(val.ptr); > > > > //StringVal result; > > > > if (cur->value == 0) { > > > > //result = StringVal::null(); > > > > intResult = 0; > > > > } else { > > > > intResult = cur->value.val; > > > > // Copies the result to memory owned by Impala > > > > //result = ToStringVal(context, cur->value.val); > > > > // intResult = atoi(result.c_str()); > > > > // std::istringstream(result) >> intResult; > > > > } > > > > context->Free(val.ptr); > > > > return intResult; > > > > } > > > > > > But, when trying to create aggregate function I am facing, > > > > create aggregate function current_val(int,timestamp) returns int location > > '/impala/udf/libudasample.so' init_fn='CurrentValInit' > > update_fn='CurrentValUpdate' merge_fn='CurrentValMerge' > > serialize_fn='CurrentValSerialize' finalize_fn='CurrentValFinalize'; > > > > > > Query: create aggregate function current_val(int,timestamp) returns int > > location '/dwh/impala/udf/libudasample.so' init_fn='CurrentValInit' > > update_fn='CurrentValUpdate' merge_fn='CurrentValMerge' > > serialize_fn='CurrentValSerialize' 
finalize_fn='CurrentValFinalize' > > > > > > ERROR: AnalysisException: Could not find function CurrentValUpdate(INT, > > TIMESTAMP) returns INT in: > > hdfs://***************:8020/impala/udf/libudasample.so > > > > Check that function name, arguments, and return type are correct. > > > > > > I changed the header file function definition also accordingly. Can > someone > > suggest if I am missing something here? > > > > Thanks, > > Ravi > > > > On 21 June 2017 at 13:42, Ravi Kanth <[email protected]> wrote: > >> > >> Thanks for the suggestion Matthew. Let me look into the patch. I am > >> currently working on building a custom UDA. Hopefully the information > you > >> provided and the discussion we had might be useful to me. > >> > >> On 21 June 2017 at 13:39, Matthew Jacobs <[email protected]> wrote: > >>> > >>> I'd strongly recommend the latter (upgrading). We don't really expose > >>> the analytic function interface, so you'd end up writing an Impala > >>> patch, and analytic functions are particularly tricky. > >>> > >>> Here's Thomas' patch to add 'ignore nulls' in first/last value: > >>> https://gerrit.cloudera.org/#/c/3328/ > >>> > >>> On Wed, Jun 21, 2017 at 1:08 PM, Ravi Kanth <[email protected]> > >>> wrote: > >>> > Thanks All. I will think of a possible solution either by > implementing > >>> > a > >>> > Custom UDA or would update the version. > >>> > > >>> > On Wed, Jun 21, 2017 at 13:04 Thomas Tauber-Marshall > >>> > <[email protected]> wrote: > >>> >> > >>> >> On Wed, Jun 21, 2017 at 2:33 PM Ravi Kanth <[email protected] > > > >>> >> wrote: > >>> >>> > >>> >>> Ya. I agree with you Thomas. Probably that's what I'm doing wrong. > >>> >>> > >>> >>> Unfortunately, as mentioned the version of impala we are using I > >>> >>> belive > >>> >>> it doesn't support ignore nulls. > >>> >>> > >>> >>> But, my question is would last_value function retrieve a latest not > >>> >>> null > >>> >>> value irrespective of using ignore nulls? 
> >>> >> > >>> >> > >>> >> Not sure I follow - if you use last_value without ignore nulls, > you'll > >>> >> get > >>> >> the latest value taking all values into consideration, which may or > >>> >> may not > >>> >> be null. > >>> >> > >>> >>> > >>> >>> > >>> >>> Ravi > >>> >>> > >>> >>> On Wed, Jun 21, 2017 at 12:23 Matthew Jacobs <[email protected]> > wrote: > >>> >>>> > >>> >>>> Ah I think Thomas is right. I read the expected results and the > >>> >>>> query > >>> >>>> too quickly, so my comment about the asc/desc is probably wrong. > >>> >>>> Clearly my point about analytic functions being tricky holds true > :) > >>> >>>> > >>> >>>> On Wed, Jun 21, 2017 at 12:12 PM, Thomas Tauber-Marshall > >>> >>>> <[email protected]> wrote: > >>> >>>> > > >>> >>>> > > >>> >>>> > On Wed, Jun 21, 2017 at 1:52 PM Ravi Kanth > >>> >>>> > <[email protected]> > >>> >>>> > wrote: > >>> >>>> >> > >>> >>>> >> Thomas, > >>> >>>> >> > >>> >>>> >> The version of Impala we are using is 2.5.0 with CDH 5.7.0 and > I > >>> >>>> >> see > >>> >>>> >> ignore nulls has been added in Impala 2.7.0. And, does adding > >>> >>>> >> ignore > >>> >>>> >> nulls > >>> >>>> >> would make a big difference in the expected result? > >>> >>>> > > >>> >>>> > > >>> >>>> > That's too bad. I think that 'ignore nulls' would give you what > >>> >>>> > you > >>> >>>> > want - > >>> >>>> > the problem with the query that you posted is that it eliminates > >>> >>>> > rows > >>> >>>> > that > >>> >>>> > don't match the where clause, so for example the row with "Zero" > >>> >>>> > in it > >>> >>>> > is > >>> >>>> > eliminated because it is filtered out by the "where a is not > >>> >>>> > null", > >>> >>>> > whereas > >>> >>>> > "ignore nulls" only affects the values that could be returned by > >>> >>>> > the > >>> >>>> > specific analytic function that the ignore is applied to. 
> >>> >>>> > > >>> >>>> >> > >>> >>>> >> > >>> >>>> >> Ravi > >>> >>>> >> > >>> >>>> >> On 21 June 2017 at 11:20, Thomas Tauber-Marshall > >>> >>>> >> <[email protected]> > >>> >>>> >> wrote: > >>> >>>> >>> > >>> >>>> >>> Ravi, > >>> >>>> >>> > >>> >>>> >>> Instead of using the "where ... is not null", have you tried > >>> >>>> >>> 'last_value(... ignore nulls)' > >>> >>>> >>> > >>> >>>> >>> On Wed, Jun 21, 2017 at 1:08 PM Ravi Kanth > >>> >>>> >>> <[email protected]> > >>> >>>> >>> wrote: > >>> >>>> >>>> > >>> >>>> >>>> Antoni, > >>> >>>> >>>> > >>> >>>> >>>> The problem in using last_value function() as far as I > observed > >>> >>>> >>>> is, > >>> >>>> >>>> if I > >>> >>>> >>>> use it on multiple columns in a single query, its not > >>> >>>> >>>> retrieving > >>> >>>> >>>> results as > >>> >>>> >>>> expected. > >>> >>>> >>>> > >>> >>>> >>>> Input: > >>> >>>> >>>> > >>> >>>> >>>> ID (Int)Date_Time (timestamp)A (Int)B (String)C (String) > >>> >>>> >>>> 101NULLNULL > >>> >>>> >>>> 112HiNULL > >>> >>>> >>>> 134HelloHi > >>> >>>> >>>> 125NULLNULL > >>> >>>> >>>> 14NULLNULLZero > >>> >>>> >>>> > >>> >>>> >>>> Expected Output: > >>> >>>> >>>> > >>> >>>> >>>> > >>> >>>> >>>> > >>> >>>> >>>> ID (Int)A (Int)B (String)C (String) > >>> >>>> >>>> 14HelloZero > >>> >>>> >>>> > >>> >>>> >>>> > >>> >>>> >>>> Query executed: > >>> >>>> >>>> > >>> >>>> >>>> select id, last_value(a) over(partition by id order by > >>> >>>> >>>> date_time > >>> >>>> >>>> desc) > >>> >>>> >>>> as a, last_value(b) over(partition by id order by date_time > >>> >>>> >>>> desc) > >>> >>>> >>>> as b, > >>> >>>> >>>> last_value(c) over(partition by id order by date_time desc) > as > >>> >>>> >>>> c > >>> >>>> >>>> from > >>> >>>> >>>> udf_test where a is not null and b is not null and c is not > >>> >>>> >>>> null; > >>> >>>> >>>> > >>> >>>> >>>> > >>> >>>> >>>> > >>> >>>> >>>> Output I am getting: > >>> >>>> >>>> > >>> >>>> >>>> +----+---+-------+----+ > >>> >>>> >>>> > >>> >>>> >>>> | id | a | b | c | > 
>>> >>>> >>>> > >>> >>>> >>>> +----+---+-------+----+ > >>> >>>> >>>> > >>> >>>> >>>> | 1 | 4 | Hello | Hi || > >>> >>>> >>>> > >>> >>>> >>>> +----+---+-------+----+ > >>> >>>> >>>> > >>> >>>> >>>> > >>> >>>> >>>> Hopefully, I am clear with the problem above. > >>> >>>> >>>> > >>> >>>> >>>> Thanks, > >>> >>>> >>>> Ravi > >>> >>>> >>>> > >>> >>>> >>>> On 20 June 2017 at 22:05, Ravi Kanth < > [email protected]> > >>> >>>> >>>> wrote: > >>> >>>> >>>>> > >>> >>>> >>>>> Antoni, > >>> >>>> >>>>> > >>> >>>> >>>>> Thanks for the suggestion. Let me have a look at it and > >>> >>>> >>>>> hopefully > >>> >>>> >>>>> we > >>> >>>> >>>>> can use it in our use case. > >>> >>>> >>>>> > >>> >>>> >>>>> Thanks, > >>> >>>> >>>>> Ravi > >>> >>>> >>>>> > >>> >>>> >>>>> On Tue, Jun 20, 2017 at 21:53 Antoni Ivanov > >>> >>>> >>>>> <[email protected]> > >>> >>>> >>>>> wrote: > >>> >>>> >>>>>> > >>> >>>> >>>>>> Hi Ravi, > >>> >>>> >>>>>> > >>> >>>> >>>>>> I am curious why you are not using already existing > >>> >>>> >>>>>> last_value > >>> >>>> >>>>>> function in Impala to get "latest non null value for the > >>> >>>> >>>>>> column” > >>> >>>> >>>>>> > >>> >>>> >>>>>> e.g > >>> >>>> >>>>>> last_value(column_a ignore nulls) over(partition by ID > order > >>> >>>> >>>>>> by > >>> >>>> >>>>>> Date_Time) > >>> >>>> >>>>>> > >>> >>>> >>>>>> Thanks, > >>> >>>> >>>>>> Antoni > >>> >>>> >>>>>> > >>> >>>> >>>>>> > >>> >>>> >>>>>> > >>> >>>> >>>>>> > >>> >>>> >>>>>> On Jun 21, 2017, at 1:17 AM, Tim Armstrong > >>> >>>> >>>>>> <[email protected]> > >>> >>>> >>>>>> wrote: > >>> >>>> >>>>>> > >>> >>>> >>>>>> This was double-posted to > >>> >>>> >>>>>> > >>> >>>> >>>>>> > >>> >>>> >>>>>> http://community.cloudera.com/t5/Interactive-Short-cycle- > SQL/Creating-Impala-UDA/m-p/56201/highlight/false#M3073 > >>> >>>> >>>>>> also. I'll continue the discussion here. 
> >>> >>>> >>>>>> > >>> >>>> >>>>>> > Can we have the flexibility of declaring the variable > >>> >>>> >>>>>> > globally > >>> >>>> >>>>>> > in > >>> >>>> >>>>>> > UDF? Globally, I mean outside the function? > >>> >>>> >>>>>> > >>> >>>> >>>>>> > And, the reason I am declaring a static variable is to > >>> >>>> >>>>>> > restore > >>> >>>> >>>>>> > the > >>> >>>> >>>>>> > value of timestamp for every record so that I can > perform a > >>> >>>> >>>>>> > comparison of > >>> >>>> >>>>>> > the timestamps. Is there an alternative approach for > this? > >>> >>>> >>>>>> > >>> >>>> >>>>>> Updating a global or static variable in a UDAF is > guaranteed > >>> >>>> >>>>>> not > >>> >>>> >>>>>> to do > >>> >>>> >>>>>> what you expect - the function can be invoked concurrently > by > >>> >>>> >>>>>> multiple > >>> >>>> >>>>>> threads. > >>> >>>> >>>>>> > >>> >>>> >>>>>> It seems like you probably want to store some additional > >>> >>>> >>>>>> state in > >>> >>>> >>>>>> the > >>> >>>> >>>>>> intermediate value. There are some sample UDAs here (see > >>> >>>> >>>>>> Avg()) > >>> >>>> >>>>>> where > >>> >>>> >>>>>> additional intermediate state is stored in a StringVal: > >>> >>>> >>>>>> > >>> >>>> >>>>>> > >>> >>>> >>>>>> https://github.com/cloudera/impala-udf-samples/blob/ > master/uda-sample.cc#L61 > >>> >>>> >>>>>> > >>> >>>> >>>>>> > >>> >>>> >>>>>> On Tue, Jun 20, 2017 at 2:40 PM, Ravi Kanth > >>> >>>> >>>>>> <[email protected]> > >>> >>>> >>>>>> wrote: > >>> >>>> >>>>>>> > >>> >>>> >>>>>>> Thanks Bharath. Can you check if the logic I am > implementing > >>> >>>> >>>>>>> is > >>> >>>> >>>>>>> correct or needed any modification in it as well? I am > very > >>> >>>> >>>>>>> new > >>> >>>> >>>>>>> to Impala > >>> >>>> >>>>>>> UDF & C++ and having some hard time figuring out the > >>> >>>> >>>>>>> problems. 
> >>> >>>> >>>>>>> > >>> >>>> >>>>>>> On 20 June 2017 at 14:27, Bharath Vissapragada > >>> >>>> >>>>>>> <[email protected]> wrote: > >>> >>>> >>>>>>>> > >>> >>>> >>>>>>>> You need to allocate memory for tsTemp, else it can > >>> >>>> >>>>>>>> segfault. > >>> >>>> >>>>>>>> That > >>> >>>> >>>>>>>> could be the issue here. > >>> >>>> >>>>>>>> > >>> >>>> >>>>>>>> static TimestampVal* tsTemp; > >>> >>>> >>>>>>>> tsTemp->date = 0; > >>> >>>> >>>>>>>> tsTemp->time_of_day = 0; > >>> >>>> >>>>>>>> > >>> >>>> >>>>>>>> > >>> >>>> >>>>>>>> On Tue, Jun 20, 2017 at 2:15 PM, Ravi Kanth > >>> >>>> >>>>>>>> <[email protected]> wrote: > >>> >>>> >>>>>>>>> > >>> >>>> >>>>>>>>> Hi All, > >>> >>>> >>>>>>>>> We are using Impala to do various processings in our > >>> >>>> >>>>>>>>> systems. > >>> >>>> >>>>>>>>> We > >>> >>>> >>>>>>>>> have a requirement recently, wherein we have to handle > the > >>> >>>> >>>>>>>>> updates on the > >>> >>>> >>>>>>>>> events i.e, we have an 'e_update' table which has the > >>> >>>> >>>>>>>>> partial > >>> >>>> >>>>>>>>> updates > >>> >>>> >>>>>>>>> received for various events. The fields that are not > >>> >>>> >>>>>>>>> updated > >>> >>>> >>>>>>>>> are being > >>> >>>> >>>>>>>>> stored as NULL values. > >>> >>>> >>>>>>>>> > >>> >>>> >>>>>>>>> > >>> >>>> >>>>>>>>> > >>> >>>> >>>>>>>>> Ex: > >>> >>>> >>>>>>>>> > >>> >>>> >>>>>>>>> > >>> >>>> >>>>>>>>> > >>> >>>> >>>>>>>>> ID (Int) Date_Time (timestamp) A (Int) B (String) C > >>> >>>> >>>>>>>>> (String) > >>> >>>> >>>>>>>>> 1 0 1 NULL NULL > >>> >>>> >>>>>>>>> 1 1 2 Hi NULL > >>> >>>> >>>>>>>>> 1 3 4 Hello Hi > >>> >>>> >>>>>>>>> 1 2 5 NULL NULL > >>> >>>> >>>>>>>>> 1 4 NULL NULL Zero > >>> >>>> >>>>>>>>> > >>> >>>> >>>>>>>>> > >>> >>>> >>>>>>>>> > >>> >>>> >>>>>>>>> P.S: Please consider Date_time as valid timestamp type > >>> >>>> >>>>>>>>> values. 
> >>> >>>> >>>>>>>>> For > >>> >>>> >>>>>>>>> easy understanding, mentioned them as 0,1,2,3,4,5 > >>> >>>> >>>>>>>>> > >>> >>>> >>>>>>>>> > >>> >>>> >>>>>>>>> > >>> >>>> >>>>>>>>> As seen in the above table, the events have a unique id > >>> >>>> >>>>>>>>> and as > >>> >>>> >>>>>>>>> we > >>> >>>> >>>>>>>>> get an update to a particular event, we are storing the > >>> >>>> >>>>>>>>> date_time at which > >>> >>>> >>>>>>>>> update has happened and also storing the partial updated > >>> >>>> >>>>>>>>> values. Apart from > >>> >>>> >>>>>>>>> the updated values, the rest are stored as NULL values. > >>> >>>> >>>>>>>>> > >>> >>>> >>>>>>>>> > >>> >>>> >>>>>>>>> > >>> >>>> >>>>>>>>> We are planning to mimic inplace updates on the table, > so > >>> >>>> >>>>>>>>> that > >>> >>>> >>>>>>>>> it > >>> >>>> >>>>>>>>> would retrieve the resulting table as follows using the > >>> >>>> >>>>>>>>> query > >>> >>>> >>>>>>>>> below: We > >>> >>>> >>>>>>>>> don't delete the data. > >>> >>>> >>>>>>>>> > >>> >>>> >>>>>>>>> > >>> >>>> >>>>>>>>> > >>> >>>> >>>>>>>>> > SELECT id, current_val(A,date_time) as A, > >>> >>>> >>>>>>>>> > current_val(B,date_time) as B, > current_val(C,date_time) > >>> >>>> >>>>>>>>> > as C > >>> >>>> >>>>>>>>> > from e_update > >>> >>>> >>>>>>>>> > GROUP BY ID; > >>> >>>> >>>>>>>>> > >>> >>>> >>>>>>>>> > >>> >>>> >>>>>>>>> > >>> >>>> >>>>>>>>> where, current_val is a custom impala UDA we are > planning > >>> >>>> >>>>>>>>> to > >>> >>>> >>>>>>>>> implement. i.e. get latest non null value for the > column. 
> >>> >>>> >>>>>>>>> > >>> >>>> >>>>>>>>> > >>> >>>> >>>>>>>>> > >>> >>>> >>>>>>>>> ID (Int) A (Int) B (String) C (String) > >>> >>>> >>>>>>>>> 1 4 Hello Zero > >>> >>>> >>>>>>>>> > >>> >>>> >>>>>>>>> > >>> >>>> >>>>>>>>> > >>> >>>> >>>>>>>>> > >>> >>>> >>>>>>>>> > >>> >>>> >>>>>>>>> Implemented current_val UDA: > >>> >>>> >>>>>>>>> The below code is only for int type inputs: > >>> >>>> >>>>>>>>> > >>> >>>> >>>>>>>>> > >>> >>>> >>>>>>>>> > >>> >>>> >>>>>>>>> uda-currentval.h > >>> >>>> >>>>>>>>> > >>> >>>> >>>>>>>>> //This is a sample for retrieving the current value of > >>> >>>> >>>>>>>>> e_update > >>> >>>> >>>>>>>>> table > >>> >>>> >>>>>>>>> // > >>> >>>> >>>>>>>>> void CurrentValueInit(FunctionContext* context, IntVal* > >>> >>>> >>>>>>>>> val); > >>> >>>> >>>>>>>>> void CurrentValueUpdate(FunctionContext* context, const > >>> >>>> >>>>>>>>> IntVal& > >>> >>>> >>>>>>>>> input, const TimestampVal& ts, IntVal* val); > >>> >>>> >>>>>>>>> void CurrentValueMerge(FunctionContext* context, const > >>> >>>> >>>>>>>>> IntVal& > >>> >>>> >>>>>>>>> src, > >>> >>>> >>>>>>>>> IntVal* dst); > >>> >>>> >>>>>>>>> IntVal CurrentValueFinalize(FunctionContext* context, > >>> >>>> >>>>>>>>> const > >>> >>>> >>>>>>>>> IntVal& > >>> >>>> >>>>>>>>> val); > >>> >>>> >>>>>>>>> > >>> >>>> >>>>>>>>> uda-currentval.cc > >>> >>>> >>>>>>>>> > >>> >>>> >>>>>>>>> // > >>> >>>> >>>>>>>>> > >>> >>>> >>>>>>>>> > >>> >>>> >>>>>>>>> ------------------------------ > ----------------------------------------------------------------- > >>> >>>> >>>>>>>>> // This is a sample for retrieving the current value of > >>> >>>> >>>>>>>>> e_update > >>> >>>> >>>>>>>>> table > >>> >>>> >>>>>>>>> > >>> >>>> >>>>>>>>> > >>> >>>> >>>>>>>>> > >>> >>>> >>>>>>>>> //---------------------------- > ------------------------------------------------------------------- > >>> >>>> >>>>>>>>> void CurrentValueInit(FunctionContext* context, IntVal* > >>> >>>> >>>>>>>>> val) { > >>> >>>> >>>>>>>>> val->is_null = false; > >>> 
>>>> >>>>>>>>> val->val = 0; > >>> >>>> >>>>>>>>> } > >>> >>>> >>>>>>>>> > >>> >>>> >>>>>>>>> void CurrentValueUpdate(FunctionContext* context, const > >>> >>>> >>>>>>>>> IntVal& > >>> >>>> >>>>>>>>> input, const TimestampVal& ts, IntVal* val) { > >>> >>>> >>>>>>>>> static TimestampVal* tsTemp; > >>> >>>> >>>>>>>>> tsTemp->date = 0; > >>> >>>> >>>>>>>>> tsTemp->time_of_day = 0; > >>> >>>> >>>>>>>>> if(tsTemp->date==0 && tsTemp->time_of_day==0){ > >>> >>>> >>>>>>>>> tsTemp->date = ts.date; > >>> >>>> >>>>>>>>> tsTemp->time_of_day = ts.time_of_day; > >>> >>>> >>>>>>>>> val->val = input.val; > >>> >>>> >>>>>>>>> return; > >>> >>>> >>>>>>>>> } > >>> >>>> >>>>>>>>> if(ts.date > tsTemp->date && ts.time_of_day > > >>> >>>> >>>>>>>>> tsTemp->time_of_day){ > >>> >>>> >>>>>>>>> tsTemp->date = ts.date; > >>> >>>> >>>>>>>>> tsTemp->time_of_day = ts.time_of_day; > >>> >>>> >>>>>>>>> val->val = input.val; > >>> >>>> >>>>>>>>> return; > >>> >>>> >>>>>>>>> } > >>> >>>> >>>>>>>>> } > >>> >>>> >>>>>>>>> > >>> >>>> >>>>>>>>> void CurrentValueMerge(FunctionContext* context, const > >>> >>>> >>>>>>>>> IntVal& > >>> >>>> >>>>>>>>> src, > >>> >>>> >>>>>>>>> IntVal* dst) { > >>> >>>> >>>>>>>>> dst->val += src.val; > >>> >>>> >>>>>>>>> } > >>> >>>> >>>>>>>>> > >>> >>>> >>>>>>>>> IntVal CurrentValueFinalize(FunctionContext* context, > >>> >>>> >>>>>>>>> const > >>> >>>> >>>>>>>>> IntVal& > >>> >>>> >>>>>>>>> val) { > >>> >>>> >>>>>>>>> return val; > >>> >>>> >>>>>>>>> } > >>> >>>> >>>>>>>>> > >>> >>>> >>>>>>>>> > >>> >>>> >>>>>>>>> > >>> >>>> >>>>>>>>> We are able to build and create an aggregate function in > >>> >>>> >>>>>>>>> impala, > >>> >>>> >>>>>>>>> but when trying to run the select query similar to the > one > >>> >>>> >>>>>>>>> above, it is > >>> >>>> >>>>>>>>> bringing down couple of impala deamons and throwing the > >>> >>>> >>>>>>>>> error > >>> >>>> >>>>>>>>> below and > >>> >>>> >>>>>>>>> getting terminated. 
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>> WARNINGS: Cancelled due to unreachable impalad(s):
> >>> >>>> >>>>>>>>> hadoop102.**.**.**.com:22000
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>> We have impalad running on 14 instances.
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>> Can someone help us resolve this problem and suggest a better
> >>> >>>> >>>>>>>>> way to achieve a solution for the scenario explained?
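
A side note on the timestamp check used in CurrentValUpdate and CurrentValMerge quoted above: the condition `date >= ... && time_of_day > ...` compares the two fields independently, so a row from a later day with an earlier time of day is not treated as newer. Below is a minimal sketch of a field-by-field (lexicographic) comparison. It assumes only the two fields the thread already uses; `Ts` is a stand-in struct so the snippet compiles without the Impala UDF headers, and `IsLater` and `ThreadCondition` are hypothetical helper names, not part of the Impala API.

```cpp
#include <cstdint>
#include <tuple>

// Stand-in for impala_udf::TimestampVal (an assumption made so the sketch is
// self-contained); only the two fields used in the thread are mirrored.
struct Ts {
  int32_t date;         // day component
  int64_t time_of_day;  // time within the day
};

// Hypothetical helper: true when a is strictly later than b.
// std::tie compares date first and falls back to time_of_day on a tie.
inline bool IsLater(const Ts& a, const Ts& b) {
  return std::tie(a.date, a.time_of_day) > std::tie(b.date, b.time_of_day);
}

// The condition as written in the thread, kept only for comparison.
inline bool ThreadCondition(const Ts& a, const Ts& b) {
  return a.date >= b.date && a.time_of_day > b.time_of_day;
}
```

With a helper like this, the update and merge steps would replace the stored value only when `IsLater(incoming, stored)` holds, which also covers the case where the day advances but the time of day is smaller.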
