Hi All,

We are using Impala for various processing tasks in our systems. We recently
got a requirement to handle updates on events, i.e. we have an 'e_update'
table that holds the partial updates received for various events. Fields
that are not updated are stored as NULL values.



Ex:


ID (Int)  Date_Time (timestamp)  A (Int)  B (String)  C (String)
1         0                      1        NULL        NULL
1         1                      2        Hi          NULL
1         3                      4        Hello       Hi
1         2                      5        NULL        NULL
1         4                      NULL     NULL        Zero



P.S.: Please treat the Date_Time values as valid timestamp values. For ease
of understanding, they are shown here as 0, 1, 2, 3, 4.



As seen in the table above, each event has a unique id. Whenever we get an
update to a particular event, we store the date_time at which the update
happened together with the partially updated values. All columns other than
the updated ones are stored as NULL.



We are planning to mimic in-place updates on the table without deleting any
data, so that the query below returns the resulting table shown after it:



> SELECT id, current_val(A,date_time) as A, current_val(B,date_time) as B,
current_val(C,date_time) as C from e_update GROUP BY ID;



where current_val is a custom Impala UDA we are planning to implement, i.e.
it returns the latest non-NULL value for the column.


ID (Int)  A (Int)  B (String)  C (String)
1         4        Hello       Zero
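
To make the intended semantics of current_val concrete, here is a minimal
standalone sketch in plain C++. It does not use the Impala UDA API; the Cell
struct and the CurrentVal function are hypothetical names used only for
illustration, and date_time is modeled as an integer as in the example table.

```cpp
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical model of one column value taken from one e_update row.
struct Cell {
    int64_t date_time;   // integer stand-in for the TIMESTAMP column
    bool is_null;        // true models SQL NULL
    std::string value;   // value rendered as text, ignored when is_null
};

// Returns the non-NULL cell with the greatest date_time.
// If every cell is NULL, the returned cell has is_null == true.
Cell CurrentVal(const std::vector<Cell>& cells) {
    Cell latest{0, true, ""};
    for (const Cell& c : cells) {
        if (c.is_null) continue;  // NULLs never overwrite a value
        if (latest.is_null || c.date_time > latest.date_time) latest = c;
    }
    return latest;
}
```

Applied to column A of the example (values 1, 2, 4, 5, NULL at date_time
0, 1, 3, 2, 4), this picks the value 4, because the row at date_time 4 is
NULL for A and the row at date_time 3 is the latest non-NULL one.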





Implemented current_val UDA:

The below code is only for int type inputs:



uda-currentval.h

//This is a sample for retrieving the current value of e_update table
//
void CurrentValueInit(FunctionContext* context, IntVal* val);
void CurrentValueUpdate(FunctionContext* context, const IntVal& input,
const TimestampVal& ts, IntVal* val);
void CurrentValueMerge(FunctionContext* context, const IntVal& src,
IntVal* dst);
IntVal CurrentValueFinalize(FunctionContext* context, const IntVal& val);

uda-currentval.cc

// -----------------------------------------------------------------------------------------------
// This is a sample for retrieving the current value of e_update table
// -----------------------------------------------------------------------------------------------
void CurrentValueInit(FunctionContext* context, IntVal* val) {
      val->is_null = false;
      val->val = 0;
}

void CurrentValueUpdate(FunctionContext* context, const IntVal& input,
const TimestampVal& ts, IntVal* val) {
      static TimestampVal* tsTemp;
      tsTemp->date = 0;
      tsTemp->time_of_day = 0;
      if(tsTemp->date==0 && tsTemp->time_of_day==0){
        tsTemp->date = ts.date;
        tsTemp->time_of_day = ts.time_of_day;
        val->val = input.val;
        return;
      }
      if(ts.date > tsTemp->date && ts.time_of_day > tsTemp->time_of_day){
        tsTemp->date = ts.date;
        tsTemp->time_of_day = ts.time_of_day;
        val->val = input.val;
        return;
      }
}

void CurrentValueMerge(FunctionContext* context, const IntVal& src,
IntVal* dst) {
     dst->val += src.val;
}

IntVal CurrentValueFinalize(FunctionContext* context, const IntVal& val) {
     return val;
}
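
A note on the code above: tsTemp in CurrentValueUpdate is a static pointer
that is never pointed at any storage, so the writes through it dereference a
null pointer, which could plausibly be what takes the daemons down. Also,
CurrentValueMerge adds the partial values instead of keeping the later one,
and the timestamp comparison requires both date and time_of_day to be
greater. One possible direction is to carry the timestamp inside the
intermediate state. The sketch below models that idea in plain C++ only; it
is not the Impala UDA API, and CurrentValState, IsLater, StateInit,
StateUpdate and StateMerge are hypothetical names. How such a state would be
stored in a real UDA's intermediate value is left open here.

```cpp
#include <cstdint>

// Hypothetical per-group intermediate state (no statics, no shared pointers).
struct CurrentValState {
    bool has_value;       // false until a non-NULL input has been seen
    int64_t date;         // mirrors the 'date' field used above
    int64_t time_of_day;  // mirrors the 'time_of_day' field used above
    int32_t value;        // latest non-NULL value seen so far
};

// True if (date_a, tod_a) is strictly later than (date_b, tod_b):
// compare dates first, and time_of_day only when the dates are equal.
bool IsLater(int64_t date_a, int64_t tod_a, int64_t date_b, int64_t tod_b) {
    return date_a > date_b || (date_a == date_b && tod_a > tod_b);
}

void StateInit(CurrentValState* s) {
    s->has_value = false;
    s->date = 0;
    s->time_of_day = 0;
    s->value = 0;
}

// Called once per input row; NULL inputs leave the state untouched.
void StateUpdate(CurrentValState* s, bool input_is_null, int32_t input,
                 int64_t date, int64_t time_of_day) {
    if (input_is_null) return;
    if (!s->has_value || IsLater(date, time_of_day, s->date, s->time_of_day)) {
        s->has_value = true;
        s->date = date;
        s->time_of_day = time_of_day;
        s->value = input;
    }
}

// Combines two partial states by keeping whichever one is later.
void StateMerge(const CurrentValState& src, CurrentValState* dst) {
    if (!src.has_value) return;
    if (!dst->has_value ||
        IsLater(src.date, src.time_of_day, dst->date, dst->time_of_day)) {
        *dst = src;
    }
}
```

With this shape, two partial states built on different nodes merge to the
same answer regardless of how the rows were split between them.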



We are able to build the UDA and create the aggregate function in Impala,
but when we run a select query similar to the one above, it brings down a
couple of Impala daemons, and the query is terminated with the error below.



WARNINGS: Cancelled due to unreachable impalad(s):
hadoop102.**.**.**.com:22000





We have impalad running on 14 instances.



Can someone help us resolve this problem, or suggest a better way to achieve
a solution for the scenario explained above?
