Thanks Luke for the explanation. That helps.

On Fri, Feb 18, 2022 at 3:58 PM Luke Cwik wrote:
> Yes, the POJO instance is stored in memory in a cache associated with a
> cache key within the process. As long as bundles are being processed
> successfully, the runner keeps giving the worker the same cache key, so
> you will keep getting back the same POJO instance. If a bundle fails to
> process, or the runner moves the work to another worker, then it knows to
> change this cache key, effectively invalidating the entry. Eventually the
> entry will be evicted, since the cache is LRU based.
>
> This doc [1] goes into detail on how this is done when using the
> portability APIs, but much of it is the same pre-portability, except that
> the runner-owned and SDK-owned portions of the worker communicate through
> method calls.
>
> 1:
> https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m
>
> On Fri, Feb 18, 2022 at 3:38 PM gaurav mishra wrote:
>
>>
>>
>> On Fri, Feb 18, 2022 at 3:27 PM Luke Cwik wrote:
>>
>>> You are most likely mutating an object stored in a cache and by luck the
>>> object is still cached when the next bundle starts processing.
>>>
>>> Anything you read from state should not be mutated without the
>>> corresponding write call.
>>> Anything you write to state should not be mutated after the call to
>>> write.
>>>
>>
>> That was my understanding too. I am currently debugging an issue in
>> production where weird things are happening, and that kind of code is
>> present in that pipeline. The code sample in my original message is a
>> simplified version of the code I was debugging. When I ran it locally I
>> expected the bug to reproduce every time, but that "luck" element is
>> giving me a hard time.
>>
>> So the takeaway, I guess, is that I should add those .write() calls at the
>> end of processElement once all mutation is done.
>>
>> "You are most likely mutating an object stored in a cache and by luck the
>> object is still cached when the next bundle starts processing."
>> Can you please elaborate a bit more here? Do you mean that the Pojo
>> instance is kept alive across calls to processElement?
>>
>>
>>> Apache Beam could guard against both of these cases, but it would require
>>> either giving a copy during read() (reduces performance) or making a copy
>>> during write() (increases memory usage until the writes are materialized).
>>>
>>>
>>> On Fri, Feb 18, 2022 at 3:07 PM gaurav mishra wrote:
>>>
>>>> Hi,
>>>> I have a stateful DoFn where I am storing a POJO. In my processElement
>>>> I am doing something like this:
>>>>
>>>> Pojo a = state.read();
>>>> ...
>>>> a.setField1(123);
>>>> ...
>>>> // end of function
>>>>
>>>> As you can see, I don't have a state.write(a);
>>>>
>>>> When I read that state in my onTimer() method, I get the correct
>>>> value:
>>>>
>>>> Pojo a = state.read();
>>>> log.info(a.getField1()); // 123
>>>>
>>>> I would imagine that state.read() and state.write() are both network
>>>> calls. Without the .write() call, the state should have had stale data.
>>>> Is my understanding of read() and write() correct or wrong here?
>>>> I was expecting log.info(a.getField1()) to print some old value here.
>>>>
>>>>
>>>>
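To make Luke's description of the cache concrete, here is a minimal plain-Java model of the behavior he describes. It is a sketch, not Beam's implementation: a per-process LRU cache keyed by a cache token plus a state key, where a cache hit on `read()` hands back the cached instance itself and only `write()` updates the durable copy. All names (`StateCacheModel`, `changeCacheToken`, the `backend` map, the `"user-1"` key) are hypothetical, and treating `write()` as immediately durable is a simplification; a real runner persists writes when the bundle completes.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of the SDK-side state cache described in the thread; not Beam code.
public class StateCacheModel {
    // Hypothetical mutable POJO standing in for "Pojo" from the original question.
    static final class Pojo {
        int field1;
        Pojo(int field1) { this.field1 = field1; }
        Pojo copy() { return new Pojo(field1); }
    }

    // Stand-in for durable runner state; only write() updates it (simplification:
    // a real runner persists writes when the bundle commits).
    private final Map<String, Pojo> backend = new HashMap<>();

    // Per-process LRU cache: an access-ordered LinkedHashMap that drops its
    // least recently used entry once it grows past the capacity.
    private final Map<String, Pojo> cache;

    // Cache token supplied by the runner; entries stored under an old token become
    // unreachable and eventually age out of the LRU.
    private String cacheToken = "token-1";

    StateCacheModel(int capacity) {
        this.cache = new LinkedHashMap<String, Pojo>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Pojo> eldest) {
                return size() > capacity;
            }
        };
    }

    Pojo read(String stateKey) {
        String key = cacheToken + "/" + stateKey;
        Pojo cached = cache.get(key);
        if (cached != null) {
            return cached; // cache hit: the very same instance, no copy
        }
        // Cache miss: load from the backend; the copy models decoding fresh bytes.
        Pojo stored = backend.get(stateKey);
        Pojo loaded = (stored == null) ? new Pojo(0) : stored.copy();
        cache.put(key, loaded);
        return loaded;
    }

    void write(String stateKey, Pojo value) {
        backend.put(stateKey, value.copy());           // snapshot persisted at write time
        cache.put(cacheToken + "/" + stateKey, value); // cache keeps the caller's instance
    }

    // Models the runner handing out a new token after a failed bundle or a work move.
    void changeCacheToken(String newToken) {
        cacheToken = newToken;
    }

    public static void main(String[] args) {
        StateCacheModel state = new StateCacheModel(100);
        state.write("user-1", new Pojo(1));

        // Bundle 1: mutate the object returned by read() but never call write().
        Pojo a = state.read("user-1");
        a.field1 = 123;

        // Next bundle, same cache token: the mutation is visible "by luck".
        System.out.println(state.read("user-1").field1); // prints 123

        // The runner changes the cache token: the cached mutation is no longer reachable.
        state.changeCacheToken("token-2");
        System.out.println(state.read("user-1").field1); // prints 1
    }
}
```

In this model the un-written mutation survives only while the same cache token is in use and the entry has not been evicted, which is consistent with the bug reproducing only some of the time.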

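Luke notes that Beam could guard against both cases by copying on `read()` or on `write()`. The hypothetical `CopyingValueState` below is illustrative only, not a Beam class; it merely mirrors the `read()`/`write(T)` shape of Beam's `ValueState` and takes a caller-supplied copy function. The `main` method also follows the read, mutate, then `write()` discipline suggested in the thread.

```java
import java.util.function.UnaryOperator;

// Hypothetical sketch of the copy-on-read / copy-on-write guard Luke describes; not a Beam class.
public class CopyingStateSketch {
    // Mirrors only the read()/write(T) shape of Beam's ValueState.
    static final class CopyingValueState<T> {
        private final UnaryOperator<T> copier;
        private final boolean copyOnRead;
        private final boolean copyOnWrite;
        private T value;

        CopyingValueState(UnaryOperator<T> copier, boolean copyOnRead, boolean copyOnWrite) {
            this.copier = copier;
            this.copyOnRead = copyOnRead;
            this.copyOnWrite = copyOnWrite;
        }

        T read() {
            // Copy on read: callers may mutate the result freely, at the cost of a copy per read.
            return (copyOnRead && value != null) ? copier.apply(value) : value;
        }

        void write(T newValue) {
            // Copy on write: later mutations of newValue cannot leak in,
            // at the cost of holding an extra snapshot until it is persisted.
            value = (copyOnWrite && newValue != null) ? copier.apply(newValue) : newValue;
        }
    }

    // Hypothetical mutable POJO standing in for "Pojo" from the original question.
    static final class Pojo {
        int field1;
        Pojo(int field1) { this.field1 = field1; }
        Pojo copy() { return new Pojo(field1); }
    }

    public static void main(String[] args) {
        CopyingValueState<Pojo> state = new CopyingValueState<>(Pojo::copy, true, true);
        state.write(new Pojo(1));

        Pojo a = state.read();
        a.field1 = 123;                          // mutated, but no write()
        System.out.println(state.read().field1); // prints 1: the guard hides the mutation

        a.field1 = 456;
        state.write(a);                          // write() after the last mutation
        a.field1 = 789;                          // mutation after write()
        System.out.println(state.read().field1); // prints 456
    }
}
```

The trade-off matches Luke's description: copy-on-read pays for a copy on every `read()`, while copy-on-write keeps an extra snapshot alive until the write is materialized. With both flags set to `false`, the model behaves like the unguarded case, and the mutation without `write()` becomes visible to later reads.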