Generally +1 The one use case I've seen of union state I've seen in production (outside of sources and sinks) is as a "poor mans" broadcast state. This was obviously before that feature was added which is now a few years ago so I don't know if those pipelines still exist. FWIW, if they do the state processor api can provide a migration path as it supports rewriting union state as broadcast state.
Seth On Wed, Sep 9, 2020 at 10:21 AM Arvid Heise <ar...@ververica.com> wrote: > +1 to getting rid of non-keyed state as is in general and for union state > in particular. I had a hard time to wrap my head around the semantics of > non-keyed state when designing the rescale of unaligned checkpoint. > > The only plausible use cases are legacy source and sinks. Both should also > be reworked in deprecated. > > My main question is how to represent state in these two cases. For sources, > state should probably be bound to splits. In that regard, split (id) may > act as a key. More generally, there should be probably a concept that > supersedes keys and includes splits. > > For sinks, I can see two cases: > - Either we are in a keyed context, then state should be bound to the key. > - Or we are in a non-keyed context, then state might be bound to the split > (?) in case of a source->sink chaining. > - Maybe it should also be a new(?) concept like output partition. > > It's not clear to me if there are more cases and if we can always find a > good way to bind state to some sort of key, especially for arbitrary > communication patterns (which we may need to replace as well potentially). > > On Wed, Sep 9, 2020 at 4:09 PM Aljoscha Krettek <aljos...@apache.org> > wrote: > > > Hi Devs, > > > > @Users: I'm cc'ing the user ML to see if there are any users that are > > relying on this feature. Please comment here if that is the case. > > > > I'd like to discuss the deprecation and eventual removal of UnionList > > Operator State, aka Operator State with Union Redistribution. If you > > don't know what I'm talking about you can take a look in the > > documentation: [1]. It's not documented thoroughly because it started > > out as mostly an internal feature. > > > > The immediate main reason for removing this is also mentioned in the > > documentation: "Do not use this feature if your list may have high > > cardinality. Checkpoint metadata will store an offset to each list > > entry, which could lead to RPC framesize or out-of-memory errors." The > > insidious part of this limitation is that you will only notice that > > there is a problem when it is too late. Checkpointing will still work > > and a program can continue when the state size is too big. The system > > will only fail when trying to restore from a snapshot that has union > > state that is too big. This could be fixed by working around that issue > > but I think there are more long-term issues with this type of state. > > > > I think we need to deprecate and remove API for state that is not tied > > to a key. Keyed state is easy to reason about, the system can > > re-partition state and also re-partition records and therefore scale the > > system in and out. Operator state, on the other hand is not tied to a > > key but an operator. This is a more "physical" concept, if you will, > > that potentially ties business logic closer to the underlying runtime > > execution model, which in turns means less degrees of freedom for the > > framework, that is Flink. This is future work, though, but we should > > start with deprecating union list state because it is the potentially > > most dangerous type of state. > > > > We currently use this state type internally in at least the > > StreamingFileSink, FlinkKafkaConsumer, and FlinkKafkaProducer. However, > > we're in the process of hopefully getting rid of it there with our work > > on sources and sinks. Before we fully remove it, we should of course > > signal this to users by deprecating it. > > > > What do you think? > > > > Best, > > Aljoscha > > > > > -- > > Arvid Heise | Senior Java Developer > > <https://www.ververica.com/> > > Follow us @VervericaData > > -- > > Join Flink Forward <https://flink-forward.org/> - The Apache Flink > Conference > > Stream Processing | Event Driven | Real Time > > -- > > Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany > > -- > Ververica GmbH > Registered at Amtsgericht Charlottenburg: HRB 158244 B > Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji > (Toni) Cheng >