On Fri, Feb 18, 2022 at 3:27 PM Luke Cwik <[email protected]> wrote:

> You are most likely mutating an object stored in a cache and by luck the
> object is still cached when the next bundle starts processing.
>
> Anything you read from state should not be mutated without the
> corresponding write call.
> Anything you write to state should not be mutated after the call to write.
>

That was my understanding too. I am currently debugging an issue in our
production pipeline where weird things are happening, and that kind of code
is present in it. The code sample in my original message is a simplified
version of the code I was debugging. When I ran it locally I expected the
bug to reproduce every time, but that "luck" element is giving me a hard
time.

So the takeaway I guess is that I should add those .write() calls at the
end of processElement once all mutation is done.
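
To make the "luck" element concrete, here is a toy model in plain Java. It
is not Beam's implementation: ToyState, evict() and the "key" entry are
hypothetical names invented for illustration, and the toy snapshots the
value on write() for simplicity. It only shows why a mutation without
write() is visible while the object stays cached and lost once the cache
entry is dropped.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: NOT Beam's implementation, just an illustration of aliasing.
public class CacheAliasingDemo {

    static class Pojo {
        int field1;
        Pojo(int field1) { this.field1 = field1; }
        Pojo copy() { return new Pojo(field1); }
    }

    // Hypothetical stand-in for a state cell: a durable store plus an in-memory cache.
    static class ToyState {
        private final Map<String, Pojo> durable = new HashMap<>();
        private Pojo cached; // may be dropped at any time

        Pojo read() {
            if (cached == null) {
                Pojo stored = durable.get("key");
                cached = (stored == null) ? new Pojo(0) : stored.copy();
            }
            return cached; // the caller gets the cached instance itself, not a copy
        }

        void write(Pojo value) {
            cached = value;
            durable.put("key", value.copy()); // persist a snapshot of the value
        }

        void evict() { cached = null; } // simulates the cache losing the entry
    }

    // Mutates the object returned by read() and never calls write().
    static int mutateWithoutWrite(boolean evictBetweenCalls) {
        ToyState state = new ToyState();
        state.write(new Pojo(0));
        Pojo a = state.read();
        a.field1 = 123; // mutation without a matching write()
        if (evictBetweenCalls) {
            state.evict();
        }
        return state.read().field1;
    }

    // Mutates the object and then writes it back before the cache is dropped.
    static int mutateThenWrite() {
        ToyState state = new ToyState();
        state.write(new Pojo(0));
        Pojo a = state.read();
        a.field1 = 123;
        state.write(a); // write once all mutation is done
        state.evict();
        return state.read().field1;
    }

    public static void main(String[] args) {
        System.out.println(mutateWithoutWrite(false)); // 123: still cached, "by luck"
        System.out.println(mutateWithoutWrite(true));  // 0: cache dropped, mutation lost
        System.out.println(mutateThenWrite());         // 123: survives the eviction
    }
}
```

In this toy, whether the unwritten mutation is visible depends only on
whether the cached instance survived between the two reads, which would
explain why the bug does not reproduce every time.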

"You are most likely mutating an object stored in a cache and by luck the
object is still cached when the next bundle starts processing."
Can you please elaborate a bit more on this? Do you mean that the Pojo
instance is kept alive across calls to processElement?


> Apache Beam could guard against both of these cases but it would require
> giving a copy during read() (reduces performance) or making a copy
> during write() (increases memory usage until the writes are materialized).
>
>
> On Fri, Feb 18, 2022 at 3:07 PM gaurav mishra <
> [email protected]> wrote:
>
>> Hi,
>> I have a stateful DoFn in which I am storing a POJO. In my processElement
>> I am doing something like this:
>>
>> Pojo a = state.read();
>> .....
>> ....
>> a.setField1(123);
>> ....
>> // end of function
>>
>> As you can see I don't have a state.write(a);
>>
>> When I read that statespec in my onTimer() method I get the updated value.
>>
>> Pojo a = state.read();
>> log.info(a.getField1()) // 123
>>
>> I would imagine that state.read() and state.write() are both network
>> calls. Without the .write() call, the statespec should have held stale data.
>> Is my understanding of the read() and write() calls wrong here?
>> I was expecting log.info(a.getField1()) to print some old value.
>>
>>
>>
