Dear Flink Community,

I have attempted to use Elasticsearch8AsyncSink from the newly released flink-elasticsearch-connector 3.1.0 with Flink 1.20.0. It works well, except for a state deserialization issue.
If the Flink job takes a checkpoint while Elasticsearch operations are still buffered and then fails, it cannot recover from that checkpoint. The following error prevents recovery:

```
com.esotericsoftware.kryo.KryoException: Unable to find class: co.elastic.clients.elasticsearch.core.bulk.UpdateOperation
Serialization trace:
bulkOperationVariant (org.apache.flink.connector.elasticsearch.sink.Operation)
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
    at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
    at org.apache.flink.connector.elasticsearch.sink.OperationSerializer.deserialize(OperationSerializer.java:51)
    at org.apache.flink.connector.elasticsearch.sink.Elasticsearch8AsyncSinkSerializer.deserializeRequestFromStream(Elasticsearch8AsyncSinkSerializer.java:39)
    at org.apache.flink.connector.elasticsearch.sink.Elasticsearch8AsyncSinkSerializer.deserializeRequestFromStream(Elasticsearch8AsyncSinkSerializer.java:30)
    at org.apache.flink.connector.base.sink.writer.AsyncSinkWriterStateSerializer.deserialize(AsyncSinkWriterStateSerializer.java:81)
    at org.apache.flink.connector.base.sink.writer.AsyncSinkWriterStateSerializer.deserialize(AsyncSinkWriterStateSerializer.java:39)
    at org.apache.flink.core.io.SimpleVersionedSerialization.readVersionAndDeSerialize(SimpleVersionedSerialization.java:227)
    at org.apache.flink.streaming.api.operators.util.SimpleVersionedListState$DeserializingIterator.next(SimpleVersionedListState.java:138)
    at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
    at org.apache.flink.util.CollectionUtil.iterableToList(CollectionUtil.java:119)
    at org.apache.flink.streaming.runtime.operators.sink.StatefulSinkWriterStateHandler.createWriter(StatefulSinkWriterStateHandler.java:103)
    at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.initializeState(SinkWriterOperator.java:148)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:147)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:294)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:972)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:941)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:765)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:577)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassNotFoundException: co.elastic.clients.elasticsearch.core.bulk.UpdateOperation
    at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
    at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
    at java.base/java.lang.Class.forName0(Native Method)
    at java.base/java.lang.Class.forName(Class.java:398)
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
    ... 29 more
```

The UpdateOperation class that fails to load does exist in the JAR. I have attempted to dig deeper, and it seems that this class is not visible from the context of the Elasticsearch connector's OperationSerializer class. Since OperationSerializer initializes its own instance of Kryo, I'm not sure how I can fix or otherwise influence this.

I have produced a minimal reproduction example in this GitHub repository: https://github.com/mayorandrew/flink-elasticsearch-deserialization-bug.

Is this a bug in the connector, or am I missing some configuration on my side?

Thank you in advance for your support.

Kind regards,
Andrey Starostin
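
P.S. To illustrate what I mean by "not visible", here is a minimal sketch in plain Java, without Flink or Kryo. It is not the connector's code; the class name `ClassVisibilityDemo` and the helper `isVisible` are made up for this example. It only shows the general JVM behaviour that a class can be present and loadable through one class loader while a lookup through a parent class loader fails with ClassNotFoundException, which resembles the AppClassLoader lookup in the trace above. Whether this is really what happens inside OperationSerializer is my assumption.

```java
public class ClassVisibilityDemo {

    /** Returns true if the given class loader can resolve the class name. */
    public static boolean isVisible(String className, ClassLoader loader) {
        try {
            // Same kind of lookup as in the stack trace: Class.forName with an explicit loader.
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String name = ClassVisibilityDemo.class.getName();

        // The loader that actually defined this class (stands in for Flink's user-code loader).
        ClassLoader own = ClassVisibilityDemo.class.getClassLoader();

        // A loader higher up the hierarchy (stands in for the loader Kryo ends up using).
        // ClassLoader.getPlatformClassLoader() requires Java 9 or later.
        ClassLoader parent = ClassLoader.getPlatformClassLoader();

        System.out.println("visible from own loader:    " + isVisible(name, own));
        System.out.println("visible from parent loader: " + isVisible(name, parent));
    }
}
```

The class is found through the loader that defined it but not through the parent loader, even though the class file is clearly present. Delegation only goes from a loader up to its parents, never down to its children.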