I haven't looked at the code, but this isn't going to solve your earlier issue: this is an aggregate function, and it will not work as an analytic function. As I said in an earlier email, we don't expose an interface for registering analytic functions, so you should probably just upgrade Impala. I posted the patch for the analytic function we added to illustrate that this will be difficult and is not simply a matter of registering a UDA.
On Thu, Jun 22, 2017 at 11:25 AM, Ravi Kanth <[email protected]> wrote: > Hi All, > > I wrote the below lines of code to achieve this functionality: > > // Copyright 2012 Cloudera Inc. > // > // Licensed under the Apache License, Version 2.0 (the "License"); > // you may not use this file except in compliance with the License. > // You may obtain a copy of the License at > // > // http://www.apache.org/licenses/LICENSE-2.0 > // > // Unless required by applicable law or agreed to in writing, software > // distributed under the License is distributed on an "AS IS" BASIS, > // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. > // See the License for the specific language governing permissions and > // limitations under the License. > > #include "uda-sample.h" > #include <assert.h> > #include <sstream> > > using namespace impala_udf; > using namespace std; > > > StringVal ToStringVal(FunctionContext* context, const IntVal& val) { > stringstream ss; > ss << val.val; > string str = ss.str(); > StringVal string_val(context, str.size()); > memcpy(string_val.ptr, str.c_str(), str.size()); > return string_val; > } > > // > --------------------------------------------------------------------------------------- > // // This is an aggregate function for retrieving the latest non-null value > for e_update table > // // > --------------------------------------------------------------------------------------- > > struct CurrentValStruct { > IntVal value; > TimestampVal tsTemp; > }; > > // Initialize the StringVal intermediate to a zero'd AvgStruct > void CurrentValInit(FunctionContext* context, StringVal* val) { > val->is_null = false; > val->len = sizeof(CurrentValStruct); > val->ptr = context->Allocate(val->len); > memset(val->ptr, 0, val->len); > CurrentValStruct* cur = reinterpret_cast<CurrentValStruct*>(val->ptr); > cur->value.is_null = false; > cur->tsTemp.is_null = false; > } > > void CurrentValUpdate(FunctionContext* context, const IntVal& input, 
const > TimestampVal& ts, StringVal* val) { > assert(!val->is_null); > assert(val->len == sizeof(CurrentValStruct)); > CurrentValStruct* cur = reinterpret_cast<CurrentValStruct*>(val->ptr); > //checking if the incoming input value is null > if(!input.is_null){ > if(ts.date >= cur->tsTemp.date && ts.time_of_day > > cur->tsTemp.time_of_day){ > cur->value = input.val; > cur->tsTemp.date = ts.date; > cur->tsTemp.time_of_day = ts.time_of_day; > } > } > } > > void CurrentValMerge(FunctionContext* context, const StringVal& src, > StringVal* dst) { > if (src.is_null) return; > const CurrentValStruct* src_cur = reinterpret_cast<const > CurrentValStruct*>(src.ptr); > CurrentValStruct* dst_cur = reinterpret_cast<CurrentValStruct*>(dst->ptr); > if(dst_cur->tsTemp.is_null){ > dst_cur->value = src_cur->value; > dst_cur->tsTemp.date = src_cur->tsTemp.date; > dst_cur->tsTemp.time_of_day = src_cur->tsTemp.time_of_day; > dst_cur->tsTemp.is_null = false; > dst_cur->value.is_null = false; > } > else{ > if(src_cur->tsTemp.date >= dst_cur->tsTemp.date && > src_cur->tsTemp.time_of_day > dst_cur->tsTemp.time_of_day){ > dst_cur->value = src_cur->value; > dst_cur->tsTemp.date = src_cur->tsTemp.date; > dst_cur->tsTemp.time_of_day = src_cur->tsTemp.time_of_day; > } > } > } > > StringVal CurrentValSerialize(FunctionContext* context, const StringVal& > val) { > assert(!val.is_null); > StringVal result(context, val.len); > memcpy(result.ptr, val.ptr, val.len); > context->Free(val.ptr); > return result; > } > > StringVal CurrentValFinalize(FunctionContext* context, const StringVal& val) > { > //IntVal intResult; > assert(!val.is_null); > assert(val.len == sizeof(CurrentValStruct)); > CurrentValStruct* cur = reinterpret_cast<CurrentValStruct*>(val.ptr); > StringVal result; > if (cur->value == 0) { > result = StringVal::null(); > //intResult = 0; > } else { > // intResult = cur->value.val; > // Copies the result to memory owned by Impala > result = ToStringVal(context, cur->value.val); > // 
intResult = atoi(result.c_str()); > // std::istringstream(result) >> intResult; > } > context->Free(val.ptr); > return result; > } > > Queries: > > create aggregate function current_val(int,timestamp) returns string location > '/impala/udf/libudasample.so' init_fn='CurrentValInit' > update_fn='CurrentValUpdate' merge_fn='CurrentValMerge' > serialize_fn='CurrentValSerialize' finalize_fn='CurrentValFinalize'; > > > select id, current_val(a,date_time) as a from udf_test GROUP BY id; > > > The above is working fine and I am able to achieve my requirement. But, is > there any possibility that we can return an IntVal type rather than > StringVal type? If so where can I make the changes? > > I tried changing the below: > > IntVal CurrentValSerialize(FunctionContext* context, const StringVal& val) { > > assert(!val.is_null); > > StringVal result(context, val.len); > > memcpy(result.ptr, val.ptr, val.len); > > context->Free(val.ptr); > > IntVal intResult; > > CurrentValStruct* cur = reinterpret_cast<CurrentValStruct*>(val.ptr); > > intResult = cur->value.val; > > return intResult; > > } > > > IntVal CurrentValFinalize(FunctionContext* context, const StringVal& val) { > > IntVal intResult; > > assert(!val.is_null); > > assert(val.len == sizeof(CurrentValStruct)); > > CurrentValStruct* cur = reinterpret_cast<CurrentValStruct*>(val.ptr); > > //StringVal result; > > if (cur->value == 0) { > > //result = StringVal::null(); > > intResult = 0; > > } else { > > intResult = cur->value.val; > > // Copies the result to memory owned by Impala > > //result = ToStringVal(context, cur->value.val); > > // intResult = atoi(result.c_str()); > > // std::istringstream(result) >> intResult; > > } > > context->Free(val.ptr); > > return intResult; > > } > > > But, when trying to create aggregate function I am facing, > > create aggregate function current_val(int,timestamp) returns int location > '/impala/udf/libudasample.so' init_fn='CurrentValInit' > update_fn='CurrentValUpdate' 
merge_fn='CurrentValMerge' > serialize_fn='CurrentValSerialize' finalize_fn='CurrentValFinalize'; > > > Query: create aggregate function current_val(int,timestamp) returns int > location '/dwh/impala/udf/libudasample.so' init_fn='CurrentValInit' > update_fn='CurrentValUpdate' merge_fn='CurrentValMerge' > serialize_fn='CurrentValSerialize' finalize_fn='CurrentValFinalize' > > > ERROR: AnalysisException: Could not find function CurrentValUpdate(INT, > TIMESTAMP) returns INT in: > hdfs://***************:8020/impala/udf/libudasample.so > > Check that function name, arguments, and return type are correct. > > > I changed the header file function definition also accordingly. Can someone > suggest if I am missing something here? > > Thanks, > Ravi > > On 21 June 2017 at 13:42, Ravi Kanth <[email protected]> wrote: >> >> Thanks for the suggestion Matthew. Let me look into the patch. I am >> currently working on building a custom UDA. Hopefully the information you >> provided and the discussion we had might be useful to me. >> >> On 21 June 2017 at 13:39, Matthew Jacobs <[email protected]> wrote: >>> >>> I'd strongly recommend the latter (upgrading). We don't really expose >>> the analytic function interface, so you'd end up writing an Impala >>> patch, and analytic functions are particularly tricky. >>> >>> Here's Thomas' patch to add 'ignore nulls' in first/last value: >>> https://gerrit.cloudera.org/#/c/3328/ >>> >>> On Wed, Jun 21, 2017 at 1:08 PM, Ravi Kanth <[email protected]> >>> wrote: >>> > Thanks All. I will think of a possible solution either by implementing >>> > a >>> > Custom UDA or would update the version. >>> > >>> > On Wed, Jun 21, 2017 at 13:04 Thomas Tauber-Marshall >>> > <[email protected]> wrote: >>> >> >>> >> On Wed, Jun 21, 2017 at 2:33 PM Ravi Kanth <[email protected]> >>> >> wrote: >>> >>> >>> >>> Ya. I agree with you Thomas. Probably that's what I'm doing wrong. 
>>> >>> >>> >>> Unfortunately, as mentioned the version of impala we are using I >>> >>> belive >>> >>> it doesn't support ignore nulls. >>> >>> >>> >>> But, my question is would last_value function retrieve a latest not >>> >>> null >>> >>> value irrespective of using ignore nulls? >>> >> >>> >> >>> >> Not sure I follow - if you use last_value without ignore nulls, you'll >>> >> get >>> >> the latest value taking all values into consideration, which may or >>> >> may not >>> >> be null. >>> >> >>> >>> >>> >>> >>> >>> Ravi >>> >>> >>> >>> On Wed, Jun 21, 2017 at 12:23 Matthew Jacobs <[email protected]> wrote: >>> >>>> >>> >>>> Ah I think Thomas is right. I read the expected results and the >>> >>>> query >>> >>>> too quickly, so my comment about the asc/desc is probably wrong. >>> >>>> Clearly my point about analytic functions being tricky holds true :) >>> >>>> >>> >>>> On Wed, Jun 21, 2017 at 12:12 PM, Thomas Tauber-Marshall >>> >>>> <[email protected]> wrote: >>> >>>> > >>> >>>> > >>> >>>> > On Wed, Jun 21, 2017 at 1:52 PM Ravi Kanth >>> >>>> > <[email protected]> >>> >>>> > wrote: >>> >>>> >> >>> >>>> >> Thomas, >>> >>>> >> >>> >>>> >> The version of Impala we are using is 2.5.0 with CDH 5.7.0 and I >>> >>>> >> see >>> >>>> >> ignore nulls has been added in Impala 2.7.0. And, does adding >>> >>>> >> ignore >>> >>>> >> nulls >>> >>>> >> would make a big difference in the expected result? >>> >>>> > >>> >>>> > >>> >>>> > That's too bad. 
I think that 'ignore nulls' would give you what >>> >>>> > you >>> >>>> > want - >>> >>>> > the problem with the query that you posted is that it eliminates >>> >>>> > rows >>> >>>> > that >>> >>>> > don't match the where clause, so for example the row with "Zero" >>> >>>> > in it >>> >>>> > is >>> >>>> > eliminated because it is filtered out by the "where a is not >>> >>>> > null", >>> >>>> > whereas >>> >>>> > "ignore nulls" only affects the values that could be returned by >>> >>>> > the >>> >>>> > specific analytic function that the ignore is applied to. >>> >>>> > >>> >>>> >> >>> >>>> >> >>> >>>> >> Ravi >>> >>>> >> >>> >>>> >> On 21 June 2017 at 11:20, Thomas Tauber-Marshall >>> >>>> >> <[email protected]> >>> >>>> >> wrote: >>> >>>> >>> >>> >>>> >>> Ravi, >>> >>>> >>> >>> >>>> >>> Instead of using the "where ... is not null", have you tried >>> >>>> >>> 'last_value(... ignore nulls)' >>> >>>> >>> >>> >>>> >>> On Wed, Jun 21, 2017 at 1:08 PM Ravi Kanth >>> >>>> >>> <[email protected]> >>> >>>> >>> wrote: >>> >>>> >>>> >>> >>>> >>>> Antoni, >>> >>>> >>>> >>> >>>> >>>> The problem in using last_value function() as far as I observed >>> >>>> >>>> is, >>> >>>> >>>> if I >>> >>>> >>>> use it on multiple columns in a single query, its not >>> >>>> >>>> retrieving >>> >>>> >>>> results as >>> >>>> >>>> expected. 
>>> >>>> >>>> >>> >>>> >>>> Input: >>> >>>> >>>> >>> >>>> >>>> ID (Int)Date_Time (timestamp)A (Int)B (String)C (String) >>> >>>> >>>> 101NULLNULL >>> >>>> >>>> 112HiNULL >>> >>>> >>>> 134HelloHi >>> >>>> >>>> 125NULLNULL >>> >>>> >>>> 14NULLNULLZero >>> >>>> >>>> >>> >>>> >>>> Expected Output: >>> >>>> >>>> >>> >>>> >>>> >>> >>>> >>>> >>> >>>> >>>> ID (Int)A (Int)B (String)C (String) >>> >>>> >>>> 14HelloZero >>> >>>> >>>> >>> >>>> >>>> >>> >>>> >>>> Query executed: >>> >>>> >>>> >>> >>>> >>>> select id, last_value(a) over(partition by id order by >>> >>>> >>>> date_time >>> >>>> >>>> desc) >>> >>>> >>>> as a, last_value(b) over(partition by id order by date_time >>> >>>> >>>> desc) >>> >>>> >>>> as b, >>> >>>> >>>> last_value(c) over(partition by id order by date_time desc) as >>> >>>> >>>> c >>> >>>> >>>> from >>> >>>> >>>> udf_test where a is not null and b is not null and c is not >>> >>>> >>>> null; >>> >>>> >>>> >>> >>>> >>>> >>> >>>> >>>> >>> >>>> >>>> Output I am getting: >>> >>>> >>>> >>> >>>> >>>> +----+---+-------+----+ >>> >>>> >>>> >>> >>>> >>>> | id | a | b | c | >>> >>>> >>>> >>> >>>> >>>> +----+---+-------+----+ >>> >>>> >>>> >>> >>>> >>>> | 1 | 4 | Hello | Hi || >>> >>>> >>>> >>> >>>> >>>> +----+---+-------+----+ >>> >>>> >>>> >>> >>>> >>>> >>> >>>> >>>> Hopefully, I am clear with the problem above. >>> >>>> >>>> >>> >>>> >>>> Thanks, >>> >>>> >>>> Ravi >>> >>>> >>>> >>> >>>> >>>> On 20 June 2017 at 22:05, Ravi Kanth <[email protected]> >>> >>>> >>>> wrote: >>> >>>> >>>>> >>> >>>> >>>>> Antoni, >>> >>>> >>>>> >>> >>>> >>>>> Thanks for the suggestion. Let me have a look at it and >>> >>>> >>>>> hopefully >>> >>>> >>>>> we >>> >>>> >>>>> can use it in our use case. 
>>> >>>> >>>>> >>> >>>> >>>>> Thanks, >>> >>>> >>>>> Ravi >>> >>>> >>>>> >>> >>>> >>>>> On Tue, Jun 20, 2017 at 21:53 Antoni Ivanov >>> >>>> >>>>> <[email protected]> >>> >>>> >>>>> wrote: >>> >>>> >>>>>> >>> >>>> >>>>>> Hi Ravi, >>> >>>> >>>>>> >>> >>>> >>>>>> I am curious why you are not using already existing >>> >>>> >>>>>> last_value >>> >>>> >>>>>> function in Impala to get "latest non null value for the >>> >>>> >>>>>> column” >>> >>>> >>>>>> >>> >>>> >>>>>> e.g >>> >>>> >>>>>> last_value(column_a ignore nulls) over(partition by ID order >>> >>>> >>>>>> by >>> >>>> >>>>>> Date_Time) >>> >>>> >>>>>> >>> >>>> >>>>>> Thanks, >>> >>>> >>>>>> Antoni >>> >>>> >>>>>> >>> >>>> >>>>>> >>> >>>> >>>>>> >>> >>>> >>>>>> >>> >>>> >>>>>> On Jun 21, 2017, at 1:17 AM, Tim Armstrong >>> >>>> >>>>>> <[email protected]> >>> >>>> >>>>>> wrote: >>> >>>> >>>>>> >>> >>>> >>>>>> This was double-posted to >>> >>>> >>>>>> >>> >>>> >>>>>> >>> >>>> >>>>>> http://community.cloudera.com/t5/Interactive-Short-cycle-SQL/Creating-Impala-UDA/m-p/56201/highlight/false#M3073 >>> >>>> >>>>>> also. I'll continue the discussion here. >>> >>>> >>>>>> >>> >>>> >>>>>> > Can we have the flexibility of declaring the variable >>> >>>> >>>>>> > globally >>> >>>> >>>>>> > in >>> >>>> >>>>>> > UDF? Globally, I mean outside the function? >>> >>>> >>>>>> >>> >>>> >>>>>> > And, the reason I am declaring a static variable is to >>> >>>> >>>>>> > restore >>> >>>> >>>>>> > the >>> >>>> >>>>>> > value of timestamp for every record so that I can perform a >>> >>>> >>>>>> > comparison of >>> >>>> >>>>>> > the timestamps. Is there an alternative approach for this? >>> >>>> >>>>>> >>> >>>> >>>>>> Updating a global or static variable in a UDAF is guaranteed >>> >>>> >>>>>> not >>> >>>> >>>>>> to do >>> >>>> >>>>>> what you expect - the function can be invoked concurrently by >>> >>>> >>>>>> multiple >>> >>>> >>>>>> threads. 
>>> >>>> >>>>>> >>> >>>> >>>>>> It seems like you probably want to store some additional >>> >>>> >>>>>> state in >>> >>>> >>>>>> the >>> >>>> >>>>>> intermediate value. There are some sample UDAs here (see >>> >>>> >>>>>> Avg()) >>> >>>> >>>>>> where >>> >>>> >>>>>> additional intermediate state is stored in a StringVal: >>> >>>> >>>>>> >>> >>>> >>>>>> >>> >>>> >>>>>> https://github.com/cloudera/impala-udf-samples/blob/master/uda-sample.cc#L61 >>> >>>> >>>>>> >>> >>>> >>>>>> >>> >>>> >>>>>> On Tue, Jun 20, 2017 at 2:40 PM, Ravi Kanth >>> >>>> >>>>>> <[email protected]> >>> >>>> >>>>>> wrote: >>> >>>> >>>>>>> >>> >>>> >>>>>>> Thanks Bharath. Can you check if the logic I am implementing >>> >>>> >>>>>>> is >>> >>>> >>>>>>> correct or needed any modification in it as well? I am very >>> >>>> >>>>>>> new >>> >>>> >>>>>>> to Impala >>> >>>> >>>>>>> UDF & C++ and having some hard time figuring out the >>> >>>> >>>>>>> problems. >>> >>>> >>>>>>> >>> >>>> >>>>>>> On 20 June 2017 at 14:27, Bharath Vissapragada >>> >>>> >>>>>>> <[email protected]> wrote: >>> >>>> >>>>>>>> >>> >>>> >>>>>>>> You need to allocate memory for tsTemp, else it can >>> >>>> >>>>>>>> segfault. >>> >>>> >>>>>>>> That >>> >>>> >>>>>>>> could be the issue here. >>> >>>> >>>>>>>> >>> >>>> >>>>>>>> static TimestampVal* tsTemp; >>> >>>> >>>>>>>> tsTemp->date = 0; >>> >>>> >>>>>>>> tsTemp->time_of_day = 0; >>> >>>> >>>>>>>> >>> >>>> >>>>>>>> >>> >>>> >>>>>>>> On Tue, Jun 20, 2017 at 2:15 PM, Ravi Kanth >>> >>>> >>>>>>>> <[email protected]> wrote: >>> >>>> >>>>>>>>> >>> >>>> >>>>>>>>> Hi All, >>> >>>> >>>>>>>>> We are using Impala to do various processings in our >>> >>>> >>>>>>>>> systems. >>> >>>> >>>>>>>>> We >>> >>>> >>>>>>>>> have a requirement recently, wherein we have to handle the >>> >>>> >>>>>>>>> updates on the >>> >>>> >>>>>>>>> events i.e, we have an 'e_update' table which has the >>> >>>> >>>>>>>>> partial >>> >>>> >>>>>>>>> updates >>> >>>> >>>>>>>>> received for various events. 
The fields that are not >>> >>>> >>>>>>>>> updated >>> >>>> >>>>>>>>> are being >>> >>>> >>>>>>>>> stored as NULL values. >>> >>>> >>>>>>>>> >>> >>>> >>>>>>>>> >>> >>>> >>>>>>>>> >>> >>>> >>>>>>>>> Ex: >>> >>>> >>>>>>>>> >>> >>>> >>>>>>>>> >>> >>>> >>>>>>>>> >>> >>>> >>>>>>>>> ID (Int) Date_Time (timestamp) A (Int) B (String) C >>> >>>> >>>>>>>>> (String) >>> >>>> >>>>>>>>> 1 0 1 NULL NULL >>> >>>> >>>>>>>>> 1 1 2 Hi NULL >>> >>>> >>>>>>>>> 1 3 4 Hello Hi >>> >>>> >>>>>>>>> 1 2 5 NULL NULL >>> >>>> >>>>>>>>> 1 4 NULL NULL Zero >>> >>>> >>>>>>>>> >>> >>>> >>>>>>>>> >>> >>>> >>>>>>>>> >>> >>>> >>>>>>>>> P.S: Please consider Date_time as valid timestamp type >>> >>>> >>>>>>>>> values. >>> >>>> >>>>>>>>> For >>> >>>> >>>>>>>>> easy understanding, mentioned them as 0,1,2,3,4,5 >>> >>>> >>>>>>>>> >>> >>>> >>>>>>>>> >>> >>>> >>>>>>>>> >>> >>>> >>>>>>>>> As seen in the above table, the events have a unique id >>> >>>> >>>>>>>>> and as >>> >>>> >>>>>>>>> we >>> >>>> >>>>>>>>> get an update to a particular event, we are storing the >>> >>>> >>>>>>>>> date_time at which >>> >>>> >>>>>>>>> update has happened and also storing the partial updated >>> >>>> >>>>>>>>> values. Apart from >>> >>>> >>>>>>>>> the updated values, the rest are stored as NULL values. >>> >>>> >>>>>>>>> >>> >>>> >>>>>>>>> >>> >>>> >>>>>>>>> >>> >>>> >>>>>>>>> We are planning to mimic inplace updates on the table, so >>> >>>> >>>>>>>>> that >>> >>>> >>>>>>>>> it >>> >>>> >>>>>>>>> would retrieve the resulting table as follows using the >>> >>>> >>>>>>>>> query >>> >>>> >>>>>>>>> below: We >>> >>>> >>>>>>>>> don't delete the data. 
>>> >>>> >>>>>>>>> >>> >>>> >>>>>>>>> >>> >>>> >>>>>>>>> >>> >>>> >>>>>>>>> > SELECT id, current_val(A,date_time) as A, >>> >>>> >>>>>>>>> > current_val(B,date_time) as B, current_val(C,date_time) >>> >>>> >>>>>>>>> > as C >>> >>>> >>>>>>>>> > from e_update >>> >>>> >>>>>>>>> > GROUP BY ID; >>> >>>> >>>>>>>>> >>> >>>> >>>>>>>>> >>> >>>> >>>>>>>>> >>> >>>> >>>>>>>>> where, current_val is a custom impala UDA we are planning >>> >>>> >>>>>>>>> to >>> >>>> >>>>>>>>> implement. i.e. get latest non null value for the column. >>> >>>> >>>>>>>>> >>> >>>> >>>>>>>>> >>> >>>> >>>>>>>>> >>> >>>> >>>>>>>>> ID (Int) A (Int) B (String) C (String) >>> >>>> >>>>>>>>> 1 4 Hello Zero >>> >>>> >>>>>>>>> >>> >>>> >>>>>>>>> >>> >>>> >>>>>>>>> >>> >>>> >>>>>>>>> >>> >>>> >>>>>>>>> >>> >>>> >>>>>>>>> Implemented current_val UDA: >>> >>>> >>>>>>>>> The below code is only for int type inputs: >>> >>>> >>>>>>>>> >>> >>>> >>>>>>>>> >>> >>>> >>>>>>>>> >>> >>>> >>>>>>>>> uda-currentval.h >>> >>>> >>>>>>>>> >>> >>>> >>>>>>>>> //This is a sample for retrieving the current value of >>> >>>> >>>>>>>>> e_update >>> >>>> >>>>>>>>> table >>> >>>> >>>>>>>>> // >>> >>>> >>>>>>>>> void CurrentValueInit(FunctionContext* context, IntVal* >>> >>>> >>>>>>>>> val); >>> >>>> >>>>>>>>> void CurrentValueUpdate(FunctionContext* context, const >>> >>>> >>>>>>>>> IntVal& >>> >>>> >>>>>>>>> input, const TimestampVal& ts, IntVal* val); >>> >>>> >>>>>>>>> void CurrentValueMerge(FunctionContext* context, const >>> >>>> >>>>>>>>> IntVal& >>> >>>> >>>>>>>>> src, >>> >>>> >>>>>>>>> IntVal* dst); >>> >>>> >>>>>>>>> IntVal CurrentValueFinalize(FunctionContext* context, >>> >>>> >>>>>>>>> const >>> >>>> >>>>>>>>> IntVal& >>> >>>> >>>>>>>>> val); >>> >>>> >>>>>>>>> >>> >>>> >>>>>>>>> uda-currentval.cc >>> >>>> >>>>>>>>> >>> >>>> >>>>>>>>> // >>> >>>> >>>>>>>>> >>> >>>> >>>>>>>>> >>> >>>> >>>>>>>>> ----------------------------------------------------------------------------------------------- >>> >>>> >>>>>>>>> // This is a 
sample for retrieving the current value of >>> >>>> >>>>>>>>> e_update >>> >>>> >>>>>>>>> table >>> >>>> >>>>>>>>> >>> >>>> >>>>>>>>> >>> >>>> >>>>>>>>> >>> >>>> >>>>>>>>> //----------------------------------------------------------------------------------------------- >>> >>>> >>>>>>>>> void CurrentValueInit(FunctionContext* context, IntVal* >>> >>>> >>>>>>>>> val) { >>> >>>> >>>>>>>>> val->is_null = false; >>> >>>> >>>>>>>>> val->val = 0; >>> >>>> >>>>>>>>> } >>> >>>> >>>>>>>>> >>> >>>> >>>>>>>>> void CurrentValueUpdate(FunctionContext* context, const >>> >>>> >>>>>>>>> IntVal& >>> >>>> >>>>>>>>> input, const TimestampVal& ts, IntVal* val) { >>> >>>> >>>>>>>>> static TimestampVal* tsTemp; >>> >>>> >>>>>>>>> tsTemp->date = 0; >>> >>>> >>>>>>>>> tsTemp->time_of_day = 0; >>> >>>> >>>>>>>>> if(tsTemp->date==0 && tsTemp->time_of_day==0){ >>> >>>> >>>>>>>>> tsTemp->date = ts.date; >>> >>>> >>>>>>>>> tsTemp->time_of_day = ts.time_of_day; >>> >>>> >>>>>>>>> val->val = input.val; >>> >>>> >>>>>>>>> return; >>> >>>> >>>>>>>>> } >>> >>>> >>>>>>>>> if(ts.date > tsTemp->date && ts.time_of_day > >>> >>>> >>>>>>>>> tsTemp->time_of_day){ >>> >>>> >>>>>>>>> tsTemp->date = ts.date; >>> >>>> >>>>>>>>> tsTemp->time_of_day = ts.time_of_day; >>> >>>> >>>>>>>>> val->val = input.val; >>> >>>> >>>>>>>>> return; >>> >>>> >>>>>>>>> } >>> >>>> >>>>>>>>> } >>> >>>> >>>>>>>>> >>> >>>> >>>>>>>>> void CurrentValueMerge(FunctionContext* context, const >>> >>>> >>>>>>>>> IntVal& >>> >>>> >>>>>>>>> src, >>> >>>> >>>>>>>>> IntVal* dst) { >>> >>>> >>>>>>>>> dst->val += src.val; >>> >>>> >>>>>>>>> } >>> >>>> >>>>>>>>> >>> >>>> >>>>>>>>> IntVal CurrentValueFinalize(FunctionContext* context, >>> >>>> >>>>>>>>> const >>> >>>> >>>>>>>>> IntVal& >>> >>>> >>>>>>>>> val) { >>> >>>> >>>>>>>>> return val; >>> >>>> >>>>>>>>> } >>> >>>> >>>>>>>>> >>> >>>> >>>>>>>>> >>> >>>> >>>>>>>>> >>> >>>> >>>>>>>>> We are able to build and create an aggregate function in >>> >>>> >>>>>>>>> impala, >>> >>>> >>>>>>>>> but 
when trying to run the select query similar to the one >>> >>>> >>>>>>>>> above, it is >>> >>>> >>>>>>>>> bringing down couple of impala deamons and throwing the >>> >>>> >>>>>>>>> error >>> >>>> >>>>>>>>> below and >>> >>>> >>>>>>>>> getting terminated. >>> >>>> >>>>>>>>> >>> >>>> >>>>>>>>> >>> >>>> >>>>>>>>> >>> >>>> >>>>>>>>> WARNINGS: Cancelled due to unreachable impalad(s): >>> >>>> >>>>>>>>> hadoop102.**.**.**.com:22000 >>> >>>> >>>>>>>>> >>> >>>> >>>>>>>>> >>> >>>> >>>>>>>>> >>> >>>> >>>>>>>>> >>> >>>> >>>>>>>>> >>> >>>> >>>>>>>>> We have impalad running on 14 instances. >>> >>>> >>>>>>>>> >>> >>>> >>>>>>>>> >>> >>>> >>>>>>>>> >>> >>>> >>>>>>>>> Can someone help resolve us this problem and a better way >>> >>>> >>>>>>>>> to >>> >>>> >>>>>>>>> achieve a solution for the scenario explained. >>> >>>> >>>>>>>> >>> >>>> >>>>>>>> >>> >>>> >>>>>>> >>> >>>> >>>>>> >>> >>>> >>>>>> >>> >>>> >>>> >>> >>>> >> >>> >>>> > >> >> >
