Yeah, it'll be basically the same for both: Init() is called for each
intermediate value in both cases, before the update or merge function is
called.
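
A toy model of this follows. It is a hedged sketch, not Impala's actual code. `Intermediate`, `Aggregation`, `Rows` and `RunTwoPhase` are hypothetical names, the "intermediate value" is just a running sum, and each thread's pre-aggregation is modelled as a separate `Aggregation` object. It illustrates the rule above: whichever phase creates a new intermediate value calls Init() on it first, and only then calls Update() (pre-aggregation) or Merge() (merge aggregation).

```cpp
#include <cassert>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a UDA intermediate value (here, a running sum).
struct Intermediate {
  bool initialized = false;
  long sum = 0;
};

using Rows = std::vector<std::pair<std::string, long>>;

// Toy aggregation operator: a table of groups in which Init() runs exactly
// once per new group, before the first Update() or Merge() on that group.
class Aggregation {
 public:
  int init_calls = 0;

  void Update(const std::string& key, long value) { FindOrInit(key).sum += value; }

  void Merge(const std::string& key, const Intermediate& partial) {
    FindOrInit(key).sum += partial.sum;
  }

  const std::map<std::string, Intermediate>& groups() const { return groups_; }

 private:
  Intermediate& FindOrInit(const std::string& key) {
    auto it = groups_.find(key);
    if (it == groups_.end()) {
      Intermediate fresh;
      fresh.initialized = true;  // plays the role of the UDA's Init()
      ++init_calls;
      it = groups_.emplace(key, fresh).first;
    }
    assert(it->second.initialized);  // Update/Merge never see an un-Init'ed value
    return it->second;
  }

  std::map<std::string, Intermediate> groups_;
};

// Update phase: each thread pre-aggregates its own rows.
// Merge phase: a second aggregation combines the partial results
// (serialization is omitted in this model).
Aggregation RunTwoPhase(const std::vector<Rows>& rows_per_thread,
                        std::vector<int>* update_init_calls) {
  Aggregation merge_agg;
  for (const Rows& rows : rows_per_thread) {
    Aggregation pre_agg;
    for (const auto& row : rows) pre_agg.Update(row.first, row.second);
    update_init_calls->push_back(pre_agg.init_calls);
    for (const auto& group : pre_agg.groups()) merge_agg.Merge(group.first, group.second);
  }
  return merge_agg;
}
```

With two threads whose rows have keys {a, b, a} and {b, c}, each pre-aggregation calls Init() twice. The merge aggregation calls it three times, once per distinct key it ends up holding.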

On Fri, Jun 5, 2020 at 4:47 PM Shuhao Tan <johnmave...@gmail.com> wrote:

> Thank you. This helps a lot.
>
> One last question: Is this behavior the same for both updating phase and
> merging phase?
>
> On Fri, Jun 5, 2020 at 6:47 PM Tim Armstrong <tarmstr...@cloudera.com>
> wrote:
>
>> I think what you're seeing is expected.
>>
>> There is a different 'context' per aggregate function, per aggregation
>> operator, per thread. So it's expected there are multiple FunctionContext
>> objects.
>>
>> Init() initializes the intermediate value, so if you have an aggregation
>> with a GROUP BY, it will be called once for each group, which means
>> multiple times for each FunctionContext.
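
The per-context model can be made concrete with a toy C++ sketch. `MockFunctionContext`, `ContextRegistry`, `InitTrace` and `MockInit` are hypothetical names, not Impala APIs. The model takes only two assumptions from this thread: there is one context per (aggregate function, aggregation operator, thread), and the THREAD_LOCAL slot survives from one Init() call to the next on the same context. Under that model, one thread driving two aggregation operators produces the "grouped" Init() pattern in the log quoted further down. Each context starts at null and then sees the pointer stored by its own previous Init().

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <memory>
#include <string>
#include <tuple>
#include <vector>

// Hypothetical mock, not Impala's FunctionContext: it only models the
// THREAD_LOCAL function-state slot that each context owns.
class MockFunctionContext {
 public:
  void* GetThreadLocalState() const { return thread_local_state_; }
  void SetThreadLocalState(void* ptr) { thread_local_state_ = ptr; }

 private:
  void* thread_local_state_ = nullptr;
};

// Assumed model: one context per (aggregate function, aggregation operator, thread).
using ContextKey = std::tuple<std::string, int, long>;

class ContextRegistry {
 public:
  MockFunctionContext& Get(const std::string& fn, int op_id, long tid) {
    return contexts_[ContextKey(fn, op_id, tid)];  // created on first use
  }
  std::size_t size() const { return contexts_.size(); }

 private:
  std::map<ContextKey, MockFunctionContext> contexts_;
};

// Mirrors the Init() posted in the thread: read the old state, store a new one.
struct InitTrace {
  void* previous;
  void* stored;
};

InitTrace MockInit(MockFunctionContext* ctx,
                   std::vector<std::unique_ptr<char[]>>* arena) {
  InitTrace trace{ctx->GetThreadLocalState(), nullptr};
  arena->push_back(std::make_unique<char[]>(4));  // stands in for Allocate(4)
  trace.stored = arena->back().get();
  ctx->SetThreadLocalState(trace.stored);
  return trace;
}
```

Calling `MockInit` on operator 1, then operator 2, then operator 1 again (all on the same thread id) yields null, null, and then the pointer stored by the first call. This mirrors the `current state 0x0` / `current state 0xea7d008` alternation in the log.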
>>
>> On Fri, Jun 5, 2020 at 12:11 PM Shuhao Tan <johnmave...@gmail.com> wrote:
>>
>>> Sorry, there was a typo in the code. It should be:
>>>
>>> void Init(FunctionContext* context, StringVal* result) {
>>>   // Log whatever THREAD_LOCAL state this context already holds.
>>>   DebugPrint("Init: current state 0x%zx",
>>>       reinterpret_cast<std::size_t>(context->GetFunctionState(FunctionContext::THREAD_LOCAL)));
>>>   auto ptr = context->Allocate(4);
>>>   context->SetFunctionState(FunctionContext::THREAD_LOCAL, ptr);
>>>   // Log the newly stored pointer itself.
>>>   DebugPrint("Init: store 0x%zx", reinterpret_cast<std::size_t>(ptr));
>>> }
>>>
>>> On Fri, Jun 5, 2020 at 3:09 PM Shuhao Tan <johnmave...@gmail.com> wrote:
>>>
>>>> Thanks. Knowing that Init() would be called many times per thread
>>>> really helps.
>>>>
>>>> Basically I did something like this:
>>>>
>>>> void Init(FunctionContext* context, StringVal* result) {
>>>>   DebugPrint("Init: current state 0x%zx",
>>>>       reinterpret_cast<std::size_t>(context->GetFunctionState(FunctionContext::THREAD_LOCAL)));
>>>>   auto ptr = context->Allocate(4);
>>>>   DebugPrint("Init: store 0x%zx",
>>>>       reinterpret_cast<std::size_t>(context->GetFunctionState(FunctionContext::THREAD_LOCAL)));
>>>>   context->SetFunctionState(FunctionContext::THREAD_LOCAL, ptr);
>>>> }
>>>> The update, serialize, merge, and finalize functions are just no-ops.
>>>>
>>>> DebugPrint writes to a temporary file, prefixing each line with a
>>>> timestamp and the thread ID obtained from syscall(SYS_gettid).
>>>> The output in the file looks like:
>>>> 2020-06-05 16:13:07[26318]: Init: current state 0x0
>>>> 2020-06-05 16:13:07[26318]: Init: store 0xf474008
>>>> 2020-06-05 16:13:07[26318]: Init: current state 0x0
>>>> 2020-06-05 16:13:07[26318]: Init: store 0xea7d008
>>>> 2020-06-05 16:13:07[26318]: Init: current state 0x0
>>>> 2020-06-05 16:13:07[26318]: Init: store 0xea7c008
>>>> 2020-06-05 16:13:07[26318]: Init: current state 0x0
>>>> 2020-06-05 16:13:07[26318]: Init: store 0xe9f6008
>>>> 2020-06-05 16:13:07[26318]: Init: current state 0x0
>>>> 2020-06-05 16:13:07[26318]: Init: store 0xe9f7008
>>>> 2020-06-05 16:13:07[26318]: Init: current state 0xea7d008
>>>> 2020-06-05 16:13:07[26318]: Init: store 0xea7d0c0
>>>> 2020-06-05 16:13:07[26318]: Init: current state 0x0
>>>> 2020-06-05 16:13:07[26318]: Init: store 0xf475008
>>>> 2020-06-05 16:13:07[26318]: Init: current state 0xea7c008
>>>> 2020-06-05 16:13:07[26318]: Init: store 0xea7c0c0
>>>> 2020-06-05 16:13:07[26318]: Init: current state 0x0
>>>> 2020-06-05 16:13:07[26318]: Init: store 0xff28008
>>>> 2020-06-05 16:13:07[26318]: Init: current state 0xea7c0c0
>>>> 2020-06-05 16:13:07[26318]: Init: store 0xea7c178
>>>> 2020-06-05 16:13:07[26318]: Init: current state 0xff28008
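
The DebugPrint helper itself was not posted. Below is a hypothetical reconstruction that produces lines in the same `timestamp[tid]: message` shape as the log above. It is Linux-only because it relies on syscall(SYS_gettid). The log path, the buffer size, and the single-write() design are assumptions, not the poster's code.

```cpp
#include <cassert>
#include <cstdarg>
#include <cstdio>
#include <cstring>
#include <ctime>
#include <fcntl.h>
#include <sys/syscall.h>
#include <unistd.h>

// Hypothetical log location; the post only says "a temporary file".
static const char* kDebugLogPath = "/tmp/uda_debug.log";

// Hypothetical reconstruction of DebugPrint (Linux-only). Each call appends one
// line of the form "YYYY-MM-DD HH:MM:SS[tid]: message".
void DebugPrint(const char* fmt, ...) {
  assert(fmt != nullptr);
  char line[512];
  std::time_t now = std::time(nullptr);
  std::tm local_tm;
  localtime_r(&now, &local_tm);  // reentrant, unlike std::localtime
  std::size_t len = std::strftime(line, sizeof(line), "%Y-%m-%d %H:%M:%S", &local_tm);

  // SYS_gettid returns the kernel thread id, which differs between threads
  // of one process (getpid() would not).
  long tid = syscall(SYS_gettid);
  len += std::snprintf(line + len, sizeof(line) - len, "[%ld]: ", tid);

  va_list args;
  va_start(args, fmt);
  std::vsnprintf(line + len, sizeof(line) - len, fmt, args);
  va_end(args);

  // Build the whole line first, then append it with one write() on an
  // O_APPEND descriptor, so each line lands as a single append.
  len = std::strlen(line);
  if (len + 1 < sizeof(line)) line[len++] = '\n';
  int fd = open(kDebugLogPath, O_WRONLY | O_CREAT | O_APPEND, 0644);
  if (fd < 0) return;
  ssize_t ignored = write(fd, line, len);
  (void)ignored;
  close(fd);
}
```

When logging pointers with a helper like this, convert them first (for example `reinterpret_cast<std::size_t>(ptr)` with `%zx`), or pass the pointer itself with `%p`. Handing a raw pointer to `%llx` is undefined behavior.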
>>>>
>>>> It does seem that the same thread is calling Init many times, with both
>>>> the same and different FunctionStates. In other words, the Init() calls
>>>> appear to be grouped: different groups have different THREAD_LOCAL
>>>> FunctionState storage, while calls within the same group share the same
>>>> THREAD_LOCAL FunctionState storage.
>>>>
>>>> Does my observation make any sense?
>>>>
>>>> On Fri, Jun 5, 2020 at 2:55 PM Tim Armstrong <tarmstr...@cloudera.com>
>>>> wrote:
>>>>
>>>>> I think it would be easier to understand what you're seeing if you
>>>>> provided an example of what the code for your aggregate function looks
>>>>> like. If you call SetFunctionState(THREAD_LOCAL), I don't see a way that
>>>>> the pointer you set would be returned from GetFunctionState(THREAD_LOCAL)
>>>>> in a different thread.
>>>>>
>>>>> Init() is called for every aggregate tuple, so it can be called many
>>>>> times per thread for aggregations with a grouping key.
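
The number of Init() calls can be modelled with a short sketch that counts them per thread. `InitCallsForThread` is a hypothetical helper, and treating the no-GROUP-BY case as a single aggregate tuple is an assumption. It shows why a grouping key multiplies the calls: each distinct key creates one aggregate tuple, and each new tuple gets one Init().

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <unordered_set>
#include <vector>

// Hypothetical model of how often one thread's FunctionContext sees Init():
// once per aggregate tuple, i.e. once per distinct grouping key it encounters.
std::size_t InitCallsForThread(const std::vector<std::string>& keys,
                               bool has_group_by) {
  // Assumption: without a grouping key there is a single aggregate tuple.
  if (!has_group_by) return 1;
  std::unordered_set<std::string> seen;
  std::size_t init_calls = 0;
  for (const std::string& key : keys) {
    if (seen.insert(key).second) ++init_calls;  // new group -> new tuple -> Init()
  }
  return init_calls;
}
```

For keys {x, y, x, z, y} the grouped case yields 3 calls and the ungrouped case yields 1.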
>>>>>
>>>>> Setting the FRAGMENT_LOCAL state only really makes sense for UDFs when
>>>>> Prepare(FRAGMENT_LOCAL) is called. After that the state is copied to any
>>>>> thread-local FunctionContexts. Calling SetFunctionState(FRAGMENT_LOCAL)
>>>>> later on is only going to modify the thread-local FunctionContext anyway.
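
The copy semantics described above can be sketched as follows. `MockContext`, `Scope` and `CloneForThreads` are hypothetical and only model the two state slots. They are not Impala's FunctionContext. In Impala the real UDF prepare function does receive a `FunctionContext::FunctionStateScope` argument, which is how it can tell the FRAGMENT_LOCAL call apart. In this model, a SetFunctionState(FRAGMENT_LOCAL) issued after cloning changes only the calling thread's copy.

```cpp
#include <cassert>
#include <vector>

// Hypothetical mock of the two state slots, not Impala's FunctionContext.
enum class Scope { kFragmentLocal, kThreadLocal };

struct MockContext {
  void* fragment_local = nullptr;
  void* thread_local_state = nullptr;

  void SetFunctionState(Scope scope, void* ptr) {
    if (scope == Scope::kFragmentLocal) {
      fragment_local = ptr;
    } else {
      thread_local_state = ptr;
    }
  }
};

// Assumed flow: Prepare(FRAGMENT_LOCAL) runs once on the fragment-level
// context; per-thread contexts are then cloned from it, copying only the
// fragment-local pointer.
std::vector<MockContext> CloneForThreads(const MockContext& fragment_ctx,
                                         int num_threads) {
  MockContext clone;
  clone.fragment_local = fragment_ctx.fragment_local;
  return std::vector<MockContext>(num_threads, clone);
}
```

After cloning, every thread sees the pointer set during Prepare. A later SetFunctionState(FRAGMENT_LOCAL) on one thread's context leaves the other threads and the fragment-level context untouched.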
>>>>>
>>>>> On Fri, Jun 5, 2020 at 11:06 AM Shuhao Tan <johnmave...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> I recently wrote some UDAs and noticed that in be/src/udf/udf.h the
>>>>>> comments for SetFunctionState start with
>>>>>> > Methods for maintaining state across UDF/UDA function calls.
>>>>>> I presume GetFunctionState/SetFunctionState should work for UDAs as
>>>>>> well.
>>>>>>
>>>>>> I first tried to find an example in the repo, but found that the
>>>>>> function is used exclusively by UDFs.
>>>>>> I then implemented a simple UDA just to test its behaviour. My
>>>>>> current findings are:
>>>>>> Even when SetFunctionState(THREAD_LOCAL, some_ptr) is called only in
>>>>>> Init, other threads (presumably in the same fragment) can still see the
>>>>>> pointer with GetFunctionState(THREAD_LOCAL) if their Init is invoked later.
>>>>>> It also seems that threads in the same fragment call Init sequentially,
>>>>>> without any race condition on the FunctionState.
>>>>>>
>>>>>> My questions: Are GetFunctionState/SetFunctionState well-defined in
>>>>>> UDAs? If so, what are the semantics and execution guarantees? How does
>>>>>> passing a different FunctionStateScope change the behavior? Are they
>>>>>> guaranteed to be thread-safe?
>>>>>>
>>>>>> Thanks.
>>>>>>
>>>>>
