Thank you. This helps a lot. One last question: is this behavior the same in both the update phase and the merge phase?
On Fri, Jun 5, 2020 at 6:47 PM Tim Armstrong <tarmstr...@cloudera.com> wrote:

> I think what you're seeing is expected.
>
> There is a different 'context' per aggregate function, per aggregation
> operator, per thread. So it's expected there are multiple FunctionContext
> objects.
>
> Init() initializes the intermediate value, so if you have an aggregation
> with a group by, it will be called multiple times for each FunctionContext.
>
> On Fri, Jun 5, 2020 at 12:11 PM Shuhao Tan <johnmave...@gmail.com> wrote:
>
>> Sorry for a typo in the code
>> It should be
>> void Init(FunctionContext* context, StringVal* result) {
>>   DebugPrint("Init: current state 0x%llx",
>>       reinterpret_cast<std::size_t>(context->GetFunctionState(FunctionContext::THREAD_LOCAL)));
>>   auto ptr = context->allocate(4);
>>   context->SetFunctionState(FunctionContext::THREAD_LOCAL, ptr);
>>   DebugPrint("Init: store 0x%llx", ptr);
>> }
>>
>> On Fri, Jun 5, 2020 at 3:09 PM Shuhao Tan <johnmave...@gmail.com> wrote:
>>
>>> Thanks. Knowing that Init() would be called many times per thread really
>>> helps.
>>>
>>> Basically I did something like this:
>>>
>>> void Init(FunctionContext* context, StringVal* result) {
>>>   DebugPrint("Init: current state 0x%llx",
>>>       reinterpret_cast<std::size_t>(context->GetFunctionState(FunctionContext::THREAD_LOCAL)));
>>>   auto ptr = context->allocate(4);
>>>   DebugPrint("Init: store 0x%llx",
>>>       reinterpret_cast<std::size_t>(context->GetFunctionState(FunctionContext::THREAD_LOCAL)));
>>>   context->SetFunctionState(FunctionContext::THREAD_LOCAL, ptr);
>>> }
>>> And just nop in the update, serialize, merge, and finalize.
>>>
>>> The DebugPrint is printing to a temporary file with timestamp and thread
>>> id with syscall(SYS_gettid)
>>> The output in the file looks like:
>>> 2020-06-05 16:13:07[26318]: Init: current state 0x0
>>> 2020-06-05 16:13:07[26318]: Init: store 0xf474008
>>> 2020-06-05 16:13:07[26318]: Init: current state 0x0
>>> 2020-06-05 16:13:07[26318]: Init: store 0xea7d008
>>> 2020-06-05 16:13:07[26318]: Init: current state 0x0
>>> 2020-06-05 16:13:07[26318]: Init: store 0xea7c008
>>> 2020-06-05 16:13:07[26318]: Init: current state 0x0
>>> 2020-06-05 16:13:07[26318]: Init: store 0xe9f6008
>>> 2020-06-05 16:13:07[26318]: Init: current state 0x0
>>> 2020-06-05 16:13:07[26318]: Init: store 0xe9f7008
>>> 2020-06-05 16:13:07[26318]: Init: current state 0xea7d008
>>> 2020-06-05 16:13:07[26318]: Init: store 0xea7d0c0
>>> 2020-06-05 16:13:07[26318]: Init: current state 0x0
>>> 2020-06-05 16:13:07[26318]: Init: store 0xf475008
>>> 2020-06-05 16:13:07[26318]: Init: current state 0xea7c008
>>> 2020-06-05 16:13:07[26318]: Init: store 0xea7c0c0
>>> 2020-06-05 16:13:07[26318]: Init: current state 0x0
>>> 2020-06-05 16:13:07[26318]: Init: store 0xff28008
>>> 2020-06-05 16:13:07[26318]: Init: current state 0xea7c0c0
>>> 2020-06-05 16:13:07[26318]: Init: store 0xea7c178
>>> 2020-06-05 16:13:07[26318]: Init: current state 0xff28008
>>>
>>> It does seem that the same thread is calling Init many times, with both
>>> same FunctionStates and different FunctionStates. In other words, it seems
>>> that these Init() calls are grouped, where different groups have different
>>> THREAD_LOCAL FunctionState storage and calls within the same group share
>>> the same THREAD_LOCAL FunctionState storage.
>>>
>>> Does my observation make any sense?
>>>
>>> On Fri, Jun 5, 2020 at 2:55 PM Tim Armstrong <tarmstr...@cloudera.com> wrote:
>>>
>>>> I think it would be easier to understand what you're seeing if you
>>>> provided an example of what the code for your aggregate function looks
>>>> like.
>>>> If you call SetFunctionState(THREAD_LOCAL), I don't see a way that
>>>> the pointer you set would be returned from GetFunctionState(THREAD_LOCAL)
>>>> in a different thread.
>>>>
>>>> Init() is called for every aggregate tuple, so it can be called many
>>>> times per thread for aggregations with a grouping key.
>>>>
>>>> Setting the FRAGMENT_LOCAL state only really makes sense for UDFs when
>>>> Prepare(FRAGMENT_LOCAL) is called. After that the state is copied to any
>>>> thread-local FunctionContexts. Calling SetFunctionState(FRAGMENT_LOCAL)
>>>> later on is only going to modify the thread-local FunctionContext anyway.
>>>>
>>>> On Fri, Jun 5, 2020 at 11:06 AM Shuhao Tan <johnmave...@gmail.com> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> I recently wrote some UDA and I noticed that in be/src/udf/udf.h the
>>>>> comments for SetFunctionState starts with
>>>>> > Methods for maintaining state across UDF/UDA function calls.
>>>>> I presume GetFunctionState/SetFunctionState should work for UDA as
>>>>> well.
>>>>>
>>>>> I first tried to find an example in the repo, but I found the function
>>>>> is exclusively used by UDF.
>>>>> I then implemented a simple UDA just to test its behaviour. My current
>>>>> findings are:
>>>>> Even when using SetFunctionState(THREAD_LOCAL, some_ptr) only in the
>>>>> Init, other threads (presumably in the same fragment) can still see it
>>>>> with GetFunctionState(THREAD_LOCAL) if their Init were invoked later.
>>>>> Currently it seems that threads in the same fragment were calling Init
>>>>> sequentially without race condition on FunctionState.
>>>>>
>>>>> My questions: Are GetFunctionState/SetFunctionState well-defined in
>>>>> UDA?
>>>>> If so, what are the semantics and execution guarantees? How does
>>>>> passing different FunctionStateScope change the behavior? Is it guaranteed
>>>>> thread-safe?
>>>>>
>>>>> Thanks.
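
The grouping seen in the log earlier in this thread is consistent with each
FunctionContext keeping its own THREAD_LOCAL slot, while Init() runs once per
group on whichever context owns that group. Below is a minimal sketch of that
reading. MockFunctionContext, InitObservation and DemoInitGrouping are
hypothetical names invented for illustration; the mock only imitates the
GetFunctionState/SetFunctionState/Allocate shape and is not Impala's
FunctionContext, so it says nothing about how Impala schedules contexts or
frees memory.

```cpp
#include <cassert>
#include <cstdint>
#include <memory>
#include <vector>

// Hypothetical stand-in for a function context. It models only the two state
// slots discussed in the thread; it is NOT Impala's real class.
class MockFunctionContext {
 public:
  enum FunctionStateScope { FRAGMENT_LOCAL, THREAD_LOCAL };

  void* GetFunctionState(FunctionStateScope scope) const {
    return scope == THREAD_LOCAL ? thread_local_state_ : fragment_local_state_;
  }

  void SetFunctionState(FunctionStateScope scope, void* ptr) {
    if (scope == THREAD_LOCAL) {
      thread_local_state_ = ptr;
    } else {
      fragment_local_state_ = ptr;
    }
  }

  // Memory is owned by the context and released when the context is destroyed.
  std::uint8_t* Allocate(int byte_size) {
    allocations_.push_back(
        std::unique_ptr<std::uint8_t[]>(new std::uint8_t[byte_size]));
    return allocations_.back().get();
  }

 private:
  void* thread_local_state_ = nullptr;
  void* fragment_local_state_ = nullptr;
  std::vector<std::unique_ptr<std::uint8_t[]>> allocations_;
};

// What one Init() call saw before storing, and what it stored.
struct InitObservation {
  void* before;
  void* stored;
};

// Mirrors the Init() from the thread, returning values instead of logging.
InitObservation Init(MockFunctionContext* context) {
  InitObservation obs;
  obs.before = context->GetFunctionState(MockFunctionContext::THREAD_LOCAL);
  obs.stored = context->Allocate(4);
  context->SetFunctionState(MockFunctionContext::THREAD_LOCAL, obs.stored);
  return obs;
}

// Two contexts on one thread, with Init() called once per group.
void DemoInitGrouping() {
  MockFunctionContext ctx_a;
  MockFunctionContext ctx_b;
  InitObservation a1 = Init(&ctx_a);  // first group on context A
  InitObservation b1 = Init(&ctx_b);  // first group on context B
  InitObservation a2 = Init(&ctx_a);  // second group on context A
  assert(a1.before == nullptr);    // fresh context: "current state 0x0"
  assert(b1.before == nullptr);    // B does not see the pointer A stored
  assert(a2.before == a1.stored);  // same context: earlier pointer is visible
  assert(b1.stored != a1.stored);  // each context holds its own allocation
}
```

Calling DemoInitGrouping() from a test or a main() exercises the pattern: a
non-zero "current state" only appears when Init() runs again on a context that
already stored a pointer, which matches the mix of 0x0 and non-zero lines in
the log.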