Thank you. This helps a lot.

One last question: Is this behavior the same for both the update phase and
the merge phase?

On Fri, Jun 5, 2020 at 6:47 PM Tim Armstrong <tarmstr...@cloudera.com>
wrote:

> I think what you're seeing is expected.
>
> There is a different 'context' per aggregate function, per aggregation
> operator, per thread. So it's expected there are multiple FunctionContext
> objects.
>
> Init() initializes the intermediate value, so if you have an aggregation
> with a group by, it will be called multiple times for each FunctionContext.
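
The description above can be modelled with a small standalone sketch. It does not use Impala's udf.h: ToyContext, ToyInit, InitStats, and Simulate are hypothetical names, and the interleaving of calls is an assumption. It only shows why, on a single thread, Init() would see a null state the first time it runs on a given context and the previously stored pointer on later calls, which is the pattern in the log quoted further down.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Toy stand-in for one per-function, per-operator, per-thread context.
// This is NOT Impala's FunctionContext, just a holder for one pointer.
struct ToyContext {
  void* thread_local_state = nullptr;
  void* GetState() const { return thread_local_state; }
  void SetState(void* p) { thread_local_state = p; }
};

// Counts what each Init() call observed before it stored its own pointer.
struct InitStats {
  int saw_null = 0;      // state was still unset (first Init on that context)
  int saw_previous = 0;  // state had been set by an earlier Init on that context
};

// Mirrors the Init() in this thread: read the current state, then overwrite it.
void ToyInit(ToyContext* ctx, void* slot, InitStats* stats) {
  if (ctx->GetState() == nullptr) {
    ++stats->saw_null;
  } else {
    ++stats->saw_previous;
  }
  ctx->SetState(slot);
}

// One thread, several contexts, one Init() per group per context.
// The calls are interleaved across contexts (an assumed ordering).
InitStats Simulate(int num_contexts, int groups_per_context) {
  assert(num_contexts > 0 && groups_per_context > 0);
  std::vector<ToyContext> contexts(static_cast<std::size_t>(num_contexts));
  std::vector<int> slots(static_cast<std::size_t>(num_contexts) *
                         static_cast<std::size_t>(groups_per_context));
  InitStats stats;
  for (int g = 0; g < groups_per_context; ++g) {
    for (int c = 0; c < num_contexts; ++c) {
      ToyInit(&contexts[c], &slots[g * num_contexts + c], &stats);
    }
  }
  return stats;
}
```

With 3 contexts and 4 groups each, Simulate(3, 4) counts 3 calls that saw a null state and 9 that saw a pointer stored by an earlier Init() on the same context, all from one thread.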
>
> On Fri, Jun 5, 2020 at 12:11 PM Shuhao Tan <johnmave...@gmail.com> wrote:
>
>> Sorry for a typo in the code.
>> It should be:
>> void Init(FunctionContext* context, StringVal* result) {
>>   DebugPrint("Init: current state 0x%llx",
>>       reinterpret_cast<std::size_t>(context->GetFunctionState(FunctionContext::THREAD_LOCAL)));
>>   auto ptr = context->Allocate(4);
>>   context->SetFunctionState(FunctionContext::THREAD_LOCAL, ptr);
>>   DebugPrint("Init: store 0x%llx", reinterpret_cast<std::size_t>(ptr));
>> }
>>
>> On Fri, Jun 5, 2020 at 3:09 PM Shuhao Tan <johnmave...@gmail.com> wrote:
>>
>>> Thanks. Knowing that Init() would be called many times per thread really
>>> helps.
>>>
>>> Basically I did something like this:
>>>
>>> void Init(FunctionContext* context, StringVal* result) {
>>>   DebugPrint("Init: current state 0x%llx",
>>>       reinterpret_cast<std::size_t>(context->GetFunctionState(FunctionContext::THREAD_LOCAL)));
>>>   auto ptr = context->Allocate(4);
>>>   DebugPrint("Init: store 0x%llx",
>>>       reinterpret_cast<std::size_t>(context->GetFunctionState(FunctionContext::THREAD_LOCAL)));
>>>   context->SetFunctionState(FunctionContext::THREAD_LOCAL, ptr);
>>> }
>>> The update, serialize, merge, and finalize functions are all no-ops.
>>>
>>> DebugPrint writes to a temporary file, prefixing each line with a timestamp
>>> and the thread id obtained from syscall(SYS_gettid).
>>> The output in the file looks like:
>>> 2020-06-05 16:13:07[26318]: Init: current state 0x0
>>> 2020-06-05 16:13:07[26318]: Init: store 0xf474008
>>> 2020-06-05 16:13:07[26318]: Init: current state 0x0
>>> 2020-06-05 16:13:07[26318]: Init: store 0xea7d008
>>> 2020-06-05 16:13:07[26318]: Init: current state 0x0
>>> 2020-06-05 16:13:07[26318]: Init: store 0xea7c008
>>> 2020-06-05 16:13:07[26318]: Init: current state 0x0
>>> 2020-06-05 16:13:07[26318]: Init: store 0xe9f6008
>>> 2020-06-05 16:13:07[26318]: Init: current state 0x0
>>> 2020-06-05 16:13:07[26318]: Init: store 0xe9f7008
>>> 2020-06-05 16:13:07[26318]: Init: current state 0xea7d008
>>> 2020-06-05 16:13:07[26318]: Init: store 0xea7d0c0
>>> 2020-06-05 16:13:07[26318]: Init: current state 0x0
>>> 2020-06-05 16:13:07[26318]: Init: store 0xf475008
>>> 2020-06-05 16:13:07[26318]: Init: current state 0xea7c008
>>> 2020-06-05 16:13:07[26318]: Init: store 0xea7c0c0
>>> 2020-06-05 16:13:07[26318]: Init: current state 0x0
>>> 2020-06-05 16:13:07[26318]: Init: store 0xff28008
>>> 2020-06-05 16:13:07[26318]: Init: current state 0xea7c0c0
>>> 2020-06-05 16:13:07[26318]: Init: store 0xea7c178
>>> 2020-06-05 16:13:07[26318]: Init: current state 0xff28008
>>>
>>> It does seem that the same thread is calling Init many times, sometimes
>>> seeing the same FunctionState and sometimes a different one. In other words,
>>> the Init() calls appear to be grouped: calls within a group share the same
>>> THREAD_LOCAL FunctionState storage, and different groups have different
>>> storage.
>>>
>>> Does my observation make any sense?
>>>
>>> On Fri, Jun 5, 2020 at 2:55 PM Tim Armstrong <tarmstr...@cloudera.com>
>>> wrote:
>>>
>>>> I think it would be easier to understand what you're seeing if you
>>>> provided an example of what the code for your aggregate function looks
>>>> like. If you call SetFunctionState(THREAD_LOCAL), I don't see a way that
>>>> the pointer you set would be returned from GetFunctionState(THREAD_LOCAL)
>>>> in a different thread.
>>>>
>>>> Init() is called for every aggregate tuple, so it can be called many
>>>> times per thread for aggregations with a grouping key.
>>>>
>>>> Setting the FRAGMENT_LOCAL state only really makes sense for UDFs when
>>>> Prepare(FRAGMENT_LOCAL) is called. After that the state is copied to any
>>>> thread-local FunctionContexts. Calling SetFunctionState(FRAGMENT_LOCAL)
>>>> later on is only going to modify the thread-local FunctionContext anyway.
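
The copy behaviour described above can be illustrated with a small standalone model. It is a sketch under assumptions, not Impala's implementation: ToyContext, CloneForThread, CloneResult, and DemonstrateFragmentLocalCopy are hypothetical names, and the only behaviour modelled is what this message states, namely that state set during the prepare step is copied into each thread-local context, and that a later set only changes the context it is called on.

```cpp
#include <cassert>

// Toy stand-in for a function context; NOT Impala's FunctionContext.
struct ToyContext {
  void* fragment_local = nullptr;
  void* thread_local_state = nullptr;

  // Hypothetical clone step: the fragment-local pointer is copied,
  // the thread-local slot starts out empty.
  ToyContext CloneForThread() const {
    ToyContext clone;
    clone.fragment_local = fragment_local;
    return clone;
  }
};

struct CloneResult {
  bool clones_saw_prepared;    // both clones received the prepared pointer
  bool late_set_stayed_local;  // a later set on one clone did not leak out
};

CloneResult DemonstrateFragmentLocalCopy() {
  static int prepared_state = 1;
  static int late_state = 2;

  ToyContext original;
  original.fragment_local = &prepared_state;  // set during the prepare step

  ToyContext a = original.CloneForThread();
  ToyContext b = original.CloneForThread();
  bool saw_prepared = a.fragment_local == &prepared_state &&
                      b.fragment_local == &prepared_state;

  // A "late" set on one clone changes only that clone's copy.
  a.fragment_local = &late_state;
  bool stayed_local = b.fragment_local == &prepared_state &&
                      original.fragment_local == &prepared_state;

  return {saw_prepared, stayed_local};
}
```

In this model both flags come out true: each clone starts with the prepared pointer, and overwriting it on one clone leaves the other clone and the original untouched.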
>>>>
>>>> On Fri, Jun 5, 2020 at 11:06 AM Shuhao Tan <johnmave...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> I recently wrote some UDAs and I noticed that in be/src/udf/udf.h the
>>>>> comment for SetFunctionState starts with
>>>>> > Methods for maintaining state across UDF/UDA function calls.
>>>>> I presume GetFunctionState/SetFunctionState should work for UDAs as
>>>>> well.
>>>>>
>>>>> I first tried to find an example in the repo, but I found that the
>>>>> functions are used exclusively by UDFs.
>>>>> I then implemented a simple UDA just to test the behaviour. My current
>>>>> findings are:
>>>>> Even when SetFunctionState(THREAD_LOCAL, some_ptr) is called only in
>>>>> Init, other threads (presumably in the same fragment) can still see the
>>>>> pointer with GetFunctionState(THREAD_LOCAL) if their Init is invoked later.
>>>>> Currently it seems that threads in the same fragment call Init
>>>>> sequentially, without a race condition on the FunctionState.
>>>>>
>>>>> My questions: Are GetFunctionState/SetFunctionState well-defined for
>>>>> UDAs?
>>>>> If so, what are the semantics and execution guarantees? How does
>>>>> passing a different FunctionStateScope change the behavior? Is it
>>>>> guaranteed to be thread-safe?
>>>>>
>>>>> Thanks.
>>>>>
>>>>
