On Fri, Feb 18, 2022 at 3:27 PM Luke Cwik <[email protected]> wrote:

> You are most likely mutating an object stored in a cache and by luck the
> object is still cached when the next bundle starts processing.
>
> Anything you read from state should not be mutated without the
> corresponding write call.
> Anything you write to state should not be mutated after the call to write.
>
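The cache behavior described above can be modeled in plain Java. This is a sketch under stated assumptions, not Beam's implementation: `CachedValueState`, `commit()` and `evict()` are hypothetical names, `Pojo` is a stand-in with a single `int` field, and the model assumes the worker caches the decoded object across bundles and only persists values that were passed to `write()`. Whether the next `read()` sees an unwritten mutation then depends only on whether the cache entry survived.

```java
// Stand-in for the thread's Pojo; the single int field is an assumption.
class Pojo {
    private int field1;

    Pojo(int field1) { this.field1 = field1; }

    // Copy constructor: models decoding a fresh object from persisted bytes.
    Pojo(Pojo other) { this(other.field1); }

    int getField1() { return field1; }

    void setField1(int field1) { this.field1 = field1; }
}

// Hypothetical model of a cached value state (NOT Beam's implementation):
// a persisted copy plus a worker-side cache that hands out object references.
class CachedValueState {
    private Pojo persisted;  // durable copy, the "network" side
    private Pojo cached;     // reference held in the worker cache, or null
    private boolean dirty;   // true if write() was called since the last commit

    CachedValueState(Pojo initial) { this.persisted = new Pojo(initial); }

    Pojo read() {
        if (cached == null) {
            cached = new Pojo(persisted);  // cache miss: decode a fresh copy
        }
        return cached;                     // cache hit: the same object every time
    }

    // No copy is made here, so mutating value after write() but before
    // commit() changes what gets persisted.
    void write(Pojo value) {
        cached = value;
        dirty = true;
    }

    // End of bundle: only values passed to write() are persisted.
    void commit() {
        if (dirty) {
            persisted = new Pojo(cached);
            dirty = false;
        }
    }

    // The cache entry may be dropped at any time (for example under memory pressure).
    void evict() { cached = null; }
}

class CacheDemo {
    // Reads, mutates, optionally writes, ends the bundle, optionally evicts,
    // then returns what the next read() (for example in onTimer) observes.
    static int scenario(boolean callWrite, boolean evictBeforeNextRead) {
        CachedValueState state = new CachedValueState(new Pojo(0));
        Pojo a = state.read();
        a.setField1(123);
        if (callWrite) {
            state.write(a);
        }
        state.commit();
        if (evictBeforeNextRead) {
            state.evict();
        }
        return state.read().getField1();
    }

    public static void main(String[] args) {
        System.out.println(scenario(false, false)); // 123: the cache hides the missing write
        System.out.println(scenario(false, true));  // 0: evicted, the persisted value is stale
        System.out.println(scenario(true, true));   // 123: write() made the change durable
    }
}
```

The first scenario reproduces the symptom from the thread (the unwritten change appears to persist); the second shows what happens once the cache entry is gone, which is why such a bug does not reproduce reliably in local runs.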
That was my understanding too. I am currently debugging an issue in production where strange things are happening, and that kind of code is present in that pipeline. The code sample in my original message (quoted below) is a simplified version of the code I was debugging. When I ran it locally I expected the bug to reproduce every time, but that "luck" element is giving me a hard time. So the takeaway, I guess, is that I should add those .write() calls at the end of processElement once all mutation is done.

"You are most likely mutating an object stored in a cache and by luck the object is still cached when the next bundle starts processing."

Could you elaborate a bit on this? Do you mean that the Pojo instance is kept alive across calls to processElement?

> Apache Beam could guard against both of these cases but it would require
> giving a copy during read() (reduces performance) and making a copy
> during write() (increases memory usage until the writes are materialized).
>
>
> On Fri, Feb 18, 2022 at 3:07 PM gaurav mishra <
> [email protected]> wrote:
>
>> Hi,
>> I have a stateful DoFn where I am storing a pojo. In my processElement I
>> am doing something like this:
>>
>> Pojo a = state.read();
>> .....
>> ....
>> a.setField1(123);
>> ....
>> end of function
>>
>> As you can see, I don't have a state.write(a);
>>
>> When I read that state in my onTimer() method, I get the correct value.
>>
>> Pojo a = state.read();
>> log.info(a.getField1()) // 123
>>
>> I would imagine that state.read() and state.write() are both network
>> calls. Without the .write() call, the state should have had stale data.
>> Is my understanding of the read() and write() calls correct or wrong here?
>> I was expecting log.info(a.getField1()) to print some old value here.
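The copy-based guard mentioned in the thread can be sketched the same way. `CopyingValueState` is a hypothetical wrapper, not a Beam class; it assumes the caller supplies a deep-copy function, and `StringBuilder` is used only as a convenient mutable value. Each `read()` pays for a copy and each `write()` keeps an extra instance alive, which correspond to the performance and memory costs described above.

```java
import java.util.function.UnaryOperator;

// Hypothetical state wrapper (NOT a Beam class) that applies both guards:
// a defensive copy on read() and another on write().
class CopyingValueState<T> {
    private final UnaryOperator<T> copier;  // deep copy, assumed supplied by the caller
    private T stored;                       // private instance nobody else references

    CopyingValueState(T initial, UnaryOperator<T> copier) {
        this.copier = copier;
        this.stored = copier.apply(initial);
    }

    // Copy on read: mutating the result cannot change state (costs a copy per read).
    T read() { return copier.apply(stored); }

    // Copy on write: mutating the argument later cannot change state
    // (costs memory for the extra instance until it is persisted).
    void write(T value) { stored = copier.apply(value); }
}

class CopyDemo {
    static CopyingValueState<StringBuilder> newState() {
        return new CopyingValueState<>(new StringBuilder("old"), sb -> new StringBuilder(sb));
    }

    // Mutation without write(): the stored value is untouched.
    static String mutateWithoutWrite() {
        CopyingValueState<StringBuilder> state = newState();
        StringBuilder a = state.read();
        a.replace(0, a.length(), "new");
        return state.read().toString();
    }

    // Mutation after write(): only the value at the time of write() is kept.
    static String mutateAfterWrite() {
        CopyingValueState<StringBuilder> state = newState();
        StringBuilder a = state.read();
        a.replace(0, a.length(), "new");
        state.write(a);
        a.append("-later");
        return state.read().toString();
    }

    public static void main(String[] args) {
        System.out.println(mutateWithoutWrite()); // old
        System.out.println(mutateAfterWrite());   // new
    }
}
```

With both copies in place, neither an unwritten mutation nor a mutation after `write()` leaks into state, so the pattern from the original message would fail every time instead of depending on the cache. Without such a guard, the practical rule is the one reached in the thread: finish all mutation, then call `write()` once at the end of processElement.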
