Yeah, it'll be basically the same for both: Init() is called for each intermediate value in both cases, before the update or merge function is called.
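To make the answer concrete, here is a toy model of the call order described above. It is not Impala's executor and does not use udf.h: ToyContext, Init, Update, Merge, FindOrInit, UpdatePhase and MergePhase are hypothetical names, and the intermediate value is just a long sum. It assumes one context per "thread" in the update phase and a separate one for the merge phase, and shows Init() running once per intermediate value (one per group key) before the first Update() or Merge() touches that value.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for one per-thread FunctionContext; it only counts
// how many intermediate values have been initialized through it.
struct ToyContext {
  int init_calls = 0;
};

// Hypothetical UDA computing a sum, with a plain long as intermediate value.
void Init(ToyContext* ctx, long* dst) {
  ++ctx->init_calls;
  *dst = 0;
}
void Update(ToyContext*, long value, long* dst) { *dst += value; }
void Merge(ToyContext*, long src, long* dst) { *dst += src; }

using Groups = std::map<std::string, long>;

// Returns the intermediate value for `key`; the first time a key is seen,
// a fresh intermediate value is created and Init() runs on it.
long* FindOrInit(ToyContext* ctx, Groups* groups, const std::string& key) {
  assert(ctx != nullptr && groups != nullptr);
  auto it = groups->find(key);
  if (it == groups->end()) {
    it = groups->emplace(key, 0L).first;
    Init(ctx, &it->second);
  }
  return &it->second;
}

// Update phase for one thread: Init() once per new group, then Update() per row.
Groups UpdatePhase(ToyContext* ctx,
                   const std::vector<std::pair<std::string, long>>& rows) {
  Groups groups;
  for (const auto& row : rows) {
    long* dst = FindOrInit(ctx, &groups, row.first);
    Update(ctx, row.second, dst);
  }
  return groups;
}

// Merge phase: again Init() once per group, then Merge() each partial value.
Groups MergePhase(ToyContext* ctx, const std::vector<Groups>& partials) {
  Groups groups;
  for (const auto& partial : partials) {
    for (const auto& entry : partial) {
      long* dst = FindOrInit(ctx, &groups, entry.first);
      Merge(ctx, entry.second, dst);
    }
  }
  return groups;
}
```

For example, feeding rows a, b, a to one update context and b, c to another gives two Init() calls on each, and merging both partial results on a third context gives three more Init() calls (one each for a, b and c), which is the "multiple times for each FunctionContext" behaviour in both phases.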
On Fri, Jun 5, 2020 at 4:47 PM Shuhao Tan <johnmave...@gmail.com> wrote:
> Thank you. This helps a lot.
>
> One last question: Is this behavior the same for both the update phase and
> the merge phase?
>
> On Fri, Jun 5, 2020 at 6:47 PM Tim Armstrong <tarmstr...@cloudera.com>
> wrote:
>
>> I think what you're seeing is expected.
>>
>> There is a different 'context' per aggregate function, per aggregation
>> operator, per thread. So it's expected there are multiple FunctionContext
>> objects.
>>
>> Init() initializes the intermediate value, so if you have an aggregation
>> with a group by, it will be called multiple times for each FunctionContext.
>>
>> On Fri, Jun 5, 2020 at 12:11 PM Shuhao Tan <johnmave...@gmail.com> wrote:
>>
>>> Sorry, there was a typo in the code. It should be:
>>>
>>> void Init(FunctionContext* context, StringVal* result) {
>>>   DebugPrint("Init: current state 0x%llx",
>>>       reinterpret_cast<std::size_t>(context->GetFunctionState(FunctionContext::THREAD_LOCAL)));
>>>   auto ptr = context->Allocate(4);
>>>   context->SetFunctionState(FunctionContext::THREAD_LOCAL, ptr);
>>>   DebugPrint("Init: store 0x%llx", reinterpret_cast<std::size_t>(ptr));
>>> }
>>>
>>> On Fri, Jun 5, 2020 at 3:09 PM Shuhao Tan <johnmave...@gmail.com> wrote:
>>>
>>>> Thanks. Knowing that Init() would be called many times per thread
>>>> really helps.
>>>>
>>>> Basically I did something like this:
>>>>
>>>> void Init(FunctionContext* context, StringVal* result) {
>>>>   DebugPrint("Init: current state 0x%llx",
>>>>       reinterpret_cast<std::size_t>(context->GetFunctionState(FunctionContext::THREAD_LOCAL)));
>>>>   auto ptr = context->Allocate(4);
>>>>   DebugPrint("Init: store 0x%llx",
>>>>       reinterpret_cast<std::size_t>(context->GetFunctionState(FunctionContext::THREAD_LOCAL)));
>>>>   context->SetFunctionState(FunctionContext::THREAD_LOCAL, ptr);
>>>> }
>>>>
>>>> I left update, serialize, merge, and finalize as no-ops.
>>>>
>>>> DebugPrint writes to a temporary file, prefixing each line with a
>>>> timestamp and the thread id from syscall(SYS_gettid).
>>>> The output in the file looks like:
>>>>
>>>> 2020-06-05 16:13:07[26318]: Init: current state 0x0
>>>> 2020-06-05 16:13:07[26318]: Init: store 0xf474008
>>>> 2020-06-05 16:13:07[26318]: Init: current state 0x0
>>>> 2020-06-05 16:13:07[26318]: Init: store 0xea7d008
>>>> 2020-06-05 16:13:07[26318]: Init: current state 0x0
>>>> 2020-06-05 16:13:07[26318]: Init: store 0xea7c008
>>>> 2020-06-05 16:13:07[26318]: Init: current state 0x0
>>>> 2020-06-05 16:13:07[26318]: Init: store 0xe9f6008
>>>> 2020-06-05 16:13:07[26318]: Init: current state 0x0
>>>> 2020-06-05 16:13:07[26318]: Init: store 0xe9f7008
>>>> 2020-06-05 16:13:07[26318]: Init: current state 0xea7d008
>>>> 2020-06-05 16:13:07[26318]: Init: store 0xea7d0c0
>>>> 2020-06-05 16:13:07[26318]: Init: current state 0x0
>>>> 2020-06-05 16:13:07[26318]: Init: store 0xf475008
>>>> 2020-06-05 16:13:07[26318]: Init: current state 0xea7c008
>>>> 2020-06-05 16:13:07[26318]: Init: store 0xea7c0c0
>>>> 2020-06-05 16:13:07[26318]: Init: current state 0x0
>>>> 2020-06-05 16:13:07[26318]: Init: store 0xff28008
>>>> 2020-06-05 16:13:07[26318]: Init: current state 0xea7c0c0
>>>> 2020-06-05 16:13:07[26318]: Init: store 0xea7c178
>>>> 2020-06-05 16:13:07[26318]: Init: current state 0xff28008
>>>>
>>>> It does seem that the same thread calls Init many times, sometimes
>>>> seeing the same FunctionState and sometimes a different one. In other
>>>> words, the Init() calls appear to be grouped: different groups have
>>>> different THREAD_LOCAL FunctionState storage, and calls within the same
>>>> group share the same THREAD_LOCAL FunctionState storage.
>>>>
>>>> Does my observation make any sense?
>>>>
>>>> On Fri, Jun 5, 2020 at 2:55 PM Tim Armstrong <tarmstr...@cloudera.com>
>>>> wrote:
>>>>
>>>>> I think it would be easier to understand what you're seeing if you
>>>>> provided an example of what the code for your aggregate function looks
>>>>> like. If you call SetFunctionState(THREAD_LOCAL), I don't see a way that
>>>>> the pointer you set would be returned from GetFunctionState(THREAD_LOCAL)
>>>>> in a different thread.
>>>>>
>>>>> Init() is called for every aggregate tuple, so it can be called many
>>>>> times per thread for aggregations with a grouping key.
>>>>>
>>>>> Setting the FRAGMENT_LOCAL state only really makes sense for UDFs when
>>>>> Prepare(FRAGMENT_LOCAL) is called. After that the state is copied to any
>>>>> thread-local FunctionContexts. Calling SetFunctionState(FRAGMENT_LOCAL)
>>>>> later on is only going to modify the thread-local FunctionContext anyway.
>>>>>
>>>>> On Fri, Jun 5, 2020 at 11:06 AM Shuhao Tan <johnmave...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> I recently wrote some UDAs and noticed that in be/src/udf/udf.h the
>>>>>> comment for SetFunctionState starts with
>>>>>> > Methods for maintaining state across UDF/UDA function calls.
>>>>>> I presume GetFunctionState/SetFunctionState should work for UDAs as
>>>>>> well.
>>>>>>
>>>>>> I first tried to find an example in the repo, but the function is
>>>>>> used exclusively by UDFs.
>>>>>> I then implemented a simple UDA just to test its behaviour. My
>>>>>> current findings are:
>>>>>> Even when calling SetFunctionState(THREAD_LOCAL, some_ptr) only in
>>>>>> Init, other threads (presumably in the same fragment) can still see the
>>>>>> pointer with GetFunctionState(THREAD_LOCAL) if their Init is invoked
>>>>>> later.
>>>>>> Currently it seems that threads in the same fragment call Init
>>>>>> sequentially, without a race condition on FunctionState.
>>>>>>
>>>>>> My questions: Are GetFunctionState/SetFunctionState well-defined in
>>>>>> UDAs?
>>>>>> If so, what are the semantics and execution guarantees? How does
>>>>>> passing a different FunctionStateScope change the behavior? Is it
>>>>>> guaranteed to be thread-safe?
>>>>>>
>>>>>> Thanks.
>>>>>>
>>>>>
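Putting the answers in this thread together: each FunctionContext (one per aggregate function, per aggregation operator, per thread) has its own THREAD_LOCAL slot, and Init() runs on that context once per intermediate value. So an Init() that unconditionally calls SetFunctionState(THREAD_LOCAL, ...) replaces whatever the previous Init() on the same context stored, which matches log pairs such as "current state 0xea7d008" followed by "store 0xea7d0c0". The sketch below models only that overwrite; ToyContext, Scratch, Arena, InitAlwaysSet, InitSetIfAbsent and AllocationsFor are hypothetical names, not Impala's FunctionContext or anything in udf.h. InitSetIfAbsent is just one possible way to avoid the overwrite, assuming per-context state is what you actually want; the thread does not settle how real UDA state should be allocated or freed.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <vector>

// Hypothetical stand-in for one FunctionContext with a single THREAD_LOCAL
// slot (NOT Impala's FunctionContext).
struct ToyContext {
  void* thread_local_state = nullptr;
};

// Hypothetical per-context scratch object.
struct Scratch {
  int groups_seen = 0;
};

// Owns every Scratch allocated so the toy model does not leak.
using Arena = std::vector<std::unique_ptr<Scratch>>;

// Pattern from the thread: every Init() allocates and sets new state,
// replacing whatever the previous Init() on this context stored.
void InitAlwaysSet(ToyContext* ctx, Arena* arena, long* intermediate) {
  arena->push_back(std::make_unique<Scratch>());
  ctx->thread_local_state = arena->back().get();
  *intermediate = 0;
}

// Alternative: create the context-level state only once; anything that
// belongs to a single group is kept in the intermediate value instead.
void InitSetIfAbsent(ToyContext* ctx, Arena* arena, long* intermediate) {
  if (ctx->thread_local_state == nullptr) {
    arena->push_back(std::make_unique<Scratch>());
    ctx->thread_local_state = arena->back().get();
  }
  static_cast<Scratch*>(ctx->thread_local_state)->groups_seen++;
  *intermediate = 0;
}

// Simulates a GROUP BY with `groups` keys on one context (one Init() per
// group) and returns how many Scratch objects were allocated.
std::size_t AllocationsFor(int groups, bool set_if_absent) {
  ToyContext ctx;
  Arena arena;
  std::vector<long> intermediates(static_cast<std::size_t>(groups));
  for (int i = 0; i < groups; ++i) {
    long* dst = &intermediates[static_cast<std::size_t>(i)];
    if (set_if_absent) {
      InitSetIfAbsent(&ctx, &arena, dst);
    } else {
      InitAlwaysSet(&ctx, &arena, dst);
    }
  }
  // In both variants the slot ends up pointing at the most recent allocation.
  assert(arena.empty() || ctx.thread_local_state == arena.back().get());
  return arena.size();
}
```

With five groups on one context, the always-set variant allocates five Scratch objects and only the last one stays reachable through the slot, while the set-if-absent variant allocates one and reuses it for all five Init() calls.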