Thanks for the clarification. I'll try to find some time to write a reproducible test case and submit a ticket. While Flink may not be able to delete the non-referenced state, I'm surprised it's exponentially replicating it, so that seems worth documenting in a ticket.
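If I'm reading your workaround right, it amounts to something like this (a minimal sketch, untested; I'm assuming the state elements are Long offsets and that re-declaring the old descriptor is enough to get the state restored):

    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.runtime.state.FunctionInitializationContext;
    import org.apache.flink.runtime.state.FunctionSnapshotContext;
    import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

    // Mixed into the operator that used to own the state.
    public class DrainLegacyUnionState implements CheckpointedFunction {

        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            // Re-declare the legacy descriptor so Flink restores the old state...
            ListState<Long> legacy = context.getOperatorStateStore().getUnionListState(
                    new ListStateDescriptor<>(
                            "com.stripe.flink.backfill.kafka-archive-file-progress",
                            Long.class));
            // ...then clear it and never reference it again, so the next
            // checkpoint/savepoint writes it out empty.
            legacy.clear();
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            // Intentionally empty: nothing is added back.
        }
    }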
On Wed, Nov 27, 2019 at 12:15 PM Gyula Fóra <gyula.f...@gmail.com> wrote:
>
> You are right Aaron.
>
> I would say this is by design: Flink doesn't require you to initialize
> state in the open method, so it has no safe way to delete the
> non-referenced state.
>
> What you can do is restore the state and clear it on all operators, and
> never reference it again. I know this feels like a workaround, but I have
> no better idea at the moment.
>
> Cheers,
> Gyula
>
> On Wed, Nov 27, 2019 at 6:08 PM Aaron Levin <aaronle...@stripe.com> wrote:
>>
>> Hi,
>>
>> Yes, we're using UNION state. I would assume, though, that if you are
>> not reading the UNION state it would either stick around as a constant
>> factor in your state size, or get cleared.
>>
>> Looks like I should try to recreate a small example and submit a bug
>> if this is true. Otherwise it's impossible to remove union state from
>> your operators.
>>
>> On Wed, Nov 27, 2019 at 6:50 AM Congxian Qiu <qcx978132...@gmail.com> wrote:
>> >
>> > Hi
>> >
>> > Do you use UNION state in your scenario? When using UNION state, the
>> > JM may encounter OOM because each TDD (TaskDeploymentDescriptor) will
>> > contain all the state of all subtasks [1].
>> >
>> > [1]
>> > https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#using-managed-operator-state
>> >
>> > Best,
>> > Congxian
>> >
>> >
>> > Aaron Levin <aaronle...@stripe.com> wrote on Wed, Nov 27, 2019 at 3:55 AM:
>> >>
>> >> Hi,
>> >>
>> >> Some context: after a refactoring, we were unable to restart our jobs.
>> >> They started fine and checkpointed fine, but once the job restarted
>> >> owing to a transient failure, the application was unable to start. The
>> >> Job Manager was OOM'ing (even when I gave it 256GB of RAM!). The
>> >> `_metadata` file for the checkpoint was 1.3GB (usually 11MB). Inside
>> >> the `_metadata` file we saw `- 1402496 offsets:
>> >> com.stripe.flink.backfill.kafka-archive-file-progress`. This happened
>> >> to be the operator state we were no longer initializing or
>> >> snapshotting after the refactoring.
>> >>
>> >> Before I dig further into this and try to find a smaller reproducible
>> >> test case, I thought I would ask if someone knows what the expected
>> >> behaviour is for the following scenario:
>> >>
>> >> Suppose you have an operator (in this case a Source) which has some
>> >> operator ListState. Suppose you run your Flink job for some time and
>> >> then later refactor your job such that you no longer use that state
>> >> (so after the refactoring you're no longer initializing this operator
>> >> state in initializeState, nor are you snapshotting the operator state
>> >> in snapshotState). If you launch your new code from a recent
>> >> savepoint, what do we expect to happen to the state? Do we anticipate
>> >> the behaviour I explained above?
>> >>
>> >> My assumption would be that Flink would not read this state and so it
>> >> would be removed from the next checkpoint or savepoint. Alternatively,
>> >> I might assume it would not be read but would linger around in every
>> >> future checkpoint or savepoint. However, it seems that what is
>> >> actually happening is it's not read and then possibly replicated by
>> >> every instance of the task each time a checkpoint happens (hence the
>> >> accidentally exponential behaviour).
>> >>
>> >> Thoughts?
>> >>
>> >> PS - in case someone asks: I was sure that we were calling `.clear()`
>> >> appropriately in `snapshotState` (we, uh, already learned that lesson
>> >> :D)
>> >>
>> >> Best,
>> >>
>> >> Aaron Levin
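For whoever picks up the ticket, here's my current guess at a minimal reproduction (a sketch, untested; the state name and types are illustrative, not our real job): run the source below and take a savepoint, then delete the bodies of initializeState/snapshotState, restore from the savepoint, and watch the `_metadata` file grow across subsequent restore/checkpoint cycles.

    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.runtime.state.FunctionInitializationContext;
    import org.apache.flink.runtime.state.FunctionSnapshotContext;
    import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
    import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

    // Step 1: a parallel source with union operator state, snapshotted
    // correctly (clear() before re-adding). Run it, take a savepoint.
    // Step 2: delete the initializeState/snapshotState bodies and
    // restart from that savepoint.
    public class UnionStateSource extends RichParallelSourceFunction<Long>
            implements CheckpointedFunction {

        private volatile boolean running = true;
        private transient ListState<Long> progress; // union operator state
        private long offset;

        @Override
        public void run(SourceContext<Long> ctx) throws Exception {
            while (running) {
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(offset++);
                }
                Thread.sleep(10);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }

        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            progress = context.getOperatorStateStore().getUnionListState(
                    new ListStateDescriptor<>("union-progress", Long.class));
            // Union state: every subtask is handed the entries of all subtasks.
            for (Long restored : progress.get()) {
                offset = Math.max(offset, restored);
            }
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            progress.clear(); // the lesson we already learned :)
            progress.add(offset);
        }
    }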