Hi Forideal,

When using RocksDB, Flink has to serialize the state (to store it on disk), whereas with the memory backend the data (in this case the RedConcat.ConcatString instances) stays on the heap as plain objects, so we don't run into this issue there.
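To make the difference concrete, here is a rough sketch in plain Java. It is not Flink code: HeapStore and BytesStore are hypothetical stand-ins for the two kinds of state backend, and JDK serialization stands in for Kryo (which is why this ConcatString implements Serializable, unlike the one in the thread). It only shows that a heap-style store hands back the same object, while a bytes-style store must go through a serializer on every write and read, which is where a serializer problem can surface.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StateBackendSketch {

    /** Stand-in accumulator; Serializable only so the JDK can write it (assumption). */
    public static class ConcatString implements Serializable {
        private static final long serialVersionUID = 1L;
        public List<String> list = new ArrayList<>();

        public void add(String s) {
            if (list.size() < 100) {
                list.add(s);
            }
        }
    }

    /** Hypothetical heap-style store: keeps the object reference itself. */
    public static class HeapStore {
        private final Map<String, ConcatString> state = new HashMap<>();

        public void put(String key, ConcatString value) { state.put(key, value); }
        public ConcatString get(String key) { return state.get(key); }
    }

    /** Hypothetical disk-style store: keeps bytes only, like an on-disk backend would. */
    public static class BytesStore {
        private final Map<String, byte[]> state = new HashMap<>();

        public void put(String key, ConcatString value) { state.put(key, serialize(value)); }
        public ConcatString get(String key) { return deserialize(state.get(key)); }
    }

    static byte[] serialize(ConcatString value) {
        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(value);
            out.flush();
            return bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static ConcatString deserialize(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (ConcatString) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ConcatString acc = new ConcatString();
        acc.add("a");

        HeapStore heap = new HeapStore();
        heap.put("k", acc);
        BytesStore bytes = new BytesStore();
        bytes.put("k", acc);

        System.out.println(heap.get("k") == acc);   // true: same instance, no serializer involved
        System.out.println(bytes.get("k") == acc);  // false: rebuilt from bytes on every read
        System.out.println(bytes.get("k").list);    // [a]
    }
}
```

In the bytes-style store every read depends on the deserializer being able to reconstruct the class, so a mismatch between what was written and what the reader expects only shows up there.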
Are you registering your custom types in the ExecutionConfig? (If so, that makes this error more likely.)

Could you share the code of RedConcat.ConcatString as well?

I would not be surprised if this is a bug in Flink. Using a UDAF with custom types is probably not a very common use case.

Best,
Robert

On Fri, Aug 14, 2020 at 12:39 PM forideal <fszw...@163.com> wrote:

> Hi
>
> I wrote a UDAF following this article:
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#aggregation-functions
> When using in-memory state, the task runs normally. However, when I
> chose RocksDB as the state backend, I encountered this error. Thank you for
> helping me look into this problem.
>
> The following is the error content:
>
> com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 87
> Serialization trace:
> list (com.red.data.platform.RedConcat$ConcatString)
>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
>     at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
>     at org.apache.flink.util.InstantiationUtil.deserializeFromByt
>
> public class RedConcat extends AggregateFunction<String, RedConcat.ConcatString> {
>
>     public class ConcatString {
>         public List<String> list = new ArrayList<>();
>
>         public void add(String toString) {
>             if (list != null) {
>                 if (list.size() < 100) {
>                     list.add(toString);
>                 }
>             }
>         }
>     }
>
>     @Override
>     public boolean isDeterministic() {
>         return false;
>     }
>
>     @Override
>     public ConcatString createAccumulator() {
>         return new ConcatString();
>     }
>
>     @Override
>     public void open(FunctionContext context) throws Exception {
>     }
>
> Best
> forideal