Thanks Luke for the explanation. That helps.
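
For anyone finding this thread later, here is a minimal plain-Java sketch of the behavior Luke describes in the quoted text below. It is only a toy model under stated assumptions, not Beam's actual implementation: the names StateCacheSketch, ToyState and the "token-N" cache keys are hypothetical, and the "durable" field merely stands in for whatever the runner persists. It shows why a mutation without write() can look persisted while the cache key stays the same, and why it disappears once the key changes.

```java
import java.util.HashMap;
import java.util.Map;

public class StateCacheSketch {

    /** Mutable value object standing in for the POJO from the thread. */
    static final class Pojo {
        private int field1;

        Pojo(int field1) { this.field1 = field1; }

        int getField1() { return field1; }

        void setField1(int value) { this.field1 = value; }

        Pojo copy() { return new Pojo(field1); }
    }

    /**
     * Hypothetical state cell: a "durable" value plus an in-process cache
     * keyed by a token that the (imaginary) runner hands to the worker.
     */
    static final class ToyState {
        private Pojo durable; // stands in for what the runner has persisted
        private final Map<String, Pojo> cache = new HashMap<>();

        ToyState(Pojo initial) { this.durable = initial.copy(); }

        /** A cache hit returns the SAME instance; a miss reloads a fresh copy. */
        Pojo read(String cacheKey) {
            return cache.computeIfAbsent(cacheKey, k -> durable.copy());
        }

        /** Persists a snapshot and refreshes the cache entry for this key. */
        void write(String cacheKey, Pojo value) {
            durable = value.copy();
            cache.put(cacheKey, value);
        }
    }

    public static void main(String[] args) {
        ToyState state = new ToyState(new Pojo(0));

        // Mutate the object returned by read() without calling write().
        Pojo a = state.read("token-1");
        a.setField1(123);

        // Same cache key: the mutated instance comes back, so it looks persisted.
        System.out.println(state.read("token-1").getField1()); // 123

        // New cache key (e.g. after a failed bundle): the mutation is gone.
        System.out.println(state.read("token-2").getField1()); // 0

        // With an explicit write(), the value survives a key change.
        Pojo b = state.read("token-2");
        b.setField1(123);
        state.write("token-2", b);
        System.out.println(state.read("token-3").getField1()); // 123
    }
}
```

In this model the "luck" is simply whether the next read happens under the same cache key, which matches why the bug would not reproduce reliably in a local run.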

On Fri, Feb 18, 2022 at 3:58 PM Luke Cwik <[email protected]> wrote:

> Yes, the POJO instance is stored in memory in a cache associated with a
> cache key within the process. As long as bundles are being processed
> successfully, the runner keeps giving the worker the same cache key so that
> you will keep getting back the same POJO instance. If a bundle fails to
> process or the runner moves the work to another worker, then it knows to
> change this cache key, effectively invalidating the entry. Eventually the
> entry will be evicted since the cache is LRU based.
>
> This[1] goes into a lot of detail about how this is done when using the
> portability APIs, but much of it works the same way pre-portability, where
> method calls are used between the runner-owned portion and the SDK-owned
> portion of the worker.
>
> 1:
> https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m
>
> On Fri, Feb 18, 2022 at 3:38 PM gaurav mishra <
> [email protected]> wrote:
>
>>
>>
>> On Fri, Feb 18, 2022 at 3:27 PM Luke Cwik <[email protected]> wrote:
>>
>>> You are most likely mutating an object stored in a cache and by luck the
>>> object is still cached when the next bundle starts processing.
>>>
>>> Anything you read from state should not be mutated without the
>>> corresponding write call.
>>> Anything you write to state should not be mutated after the call to
>>> write.
>>>
>>
>> That is what my understanding was too. I am currently debugging an issue
>> in our production where weird things are happening, and that kind of code
>> is present in that pipeline. The code sample above is a simplified version
>> of the code I was debugging. When I ran that code locally I was expecting
>> the bug to reproduce every time, but that "luck" element is giving me a
>> hard time.
>>
>> So the takeaway I guess is that I should add those .write() calls at the
>> end of processElement once all mutation is done.
>>
>> "You are most likely mutating an object stored in a cache and by luck the
>> object is still cached when the next bundle starts processing."
>> Can you please elaborate a bit more here? Do you mean that the Pojo
>> instance is kept alive across calls to processElement?
>>
>>
>>> Apache Beam could guard against both of these cases, but it would require
>>> giving a copy during read() (reduces performance) and making a copy
>>> during write() (increases memory usage until the writes are materialized).
>>>
>>>
>>> On Fri, Feb 18, 2022 at 3:07 PM gaurav mishra <
>>> [email protected]> wrote:
>>>
>>>> Hi,
>>>> I have a stateful DoFn where I am storing a POJO. In my processElement
>>>> I am doing something like this:
>>>>
>>>> Pojo a = state.read();
>>>> .....
>>>> ....
>>>> a.setField1(123);
>>>> ....
>>>> end of function
>>>>
>>>> As you can see, I don't have a state.write(a);
>>>>
>>>> When I read that state in my onTimer() method I get the updated
>>>> value.
>>>>
>>>> Pojo a = state.read();
>>>> log.info(a.getField1()); // 123
>>>>
>>>> I would imagine that state.read() and state.write() are both network
>>>> calls. Without the .write() call, the state should have held stale data.
>>>> Is my understanding of the read() and write() calls wrong here?
>>>> I was expecting log.info(a.getField1()) to print some old value here.
>>>>
>>>>
>>>>
