Dear Flink Community,

I have been trying out Elasticsearch8AsyncSink from the newly released
flink-elasticsearch-connector 3.1.0 with Flink 1.20.0. It works great,
except for a state deserialization issue.

If the Flink job takes a checkpoint while Elasticsearch operations are
still buffered in the sink, and the job then fails, it cannot recover from
that checkpoint. The following error prevents recovery:

```
com.esotericsoftware.kryo.KryoException: Unable to find class: co.elastic.clients.elasticsearch.core.bulk.UpdateOperation
Serialization trace:
bulkOperationVariant (org.apache.flink.connector.elasticsearch.sink.Operation)
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
    at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
    at org.apache.flink.connector.elasticsearch.sink.OperationSerializer.deserialize(OperationSerializer.java:51)
    at org.apache.flink.connector.elasticsearch.sink.Elasticsearch8AsyncSinkSerializer.deserializeRequestFromStream(Elasticsearch8AsyncSinkSerializer.java:39)
    at org.apache.flink.connector.elasticsearch.sink.Elasticsearch8AsyncSinkSerializer.deserializeRequestFromStream(Elasticsearch8AsyncSinkSerializer.java:30)
    at org.apache.flink.connector.base.sink.writer.AsyncSinkWriterStateSerializer.deserialize(AsyncSinkWriterStateSerializer.java:81)
    at org.apache.flink.connector.base.sink.writer.AsyncSinkWriterStateSerializer.deserialize(AsyncSinkWriterStateSerializer.java:39)
    at org.apache.flink.core.io.SimpleVersionedSerialization.readVersionAndDeSerialize(SimpleVersionedSerialization.java:227)
    at org.apache.flink.streaming.api.operators.util.SimpleVersionedListState$DeserializingIterator.next(SimpleVersionedListState.java:138)
    at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
    at org.apache.flink.util.CollectionUtil.iterableToList(CollectionUtil.java:119)
    at org.apache.flink.streaming.runtime.operators.sink.StatefulSinkWriterStateHandler.createWriter(StatefulSinkWriterStateHandler.java:103)
    at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.initializeState(SinkWriterOperator.java:148)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:147)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:294)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:972)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:941)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:765)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:577)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassNotFoundException: co.elastic.clients.elasticsearch.core.bulk.UpdateOperation
    at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
    at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
    at java.base/java.lang.Class.forName0(Native Method)
    at java.base/java.lang.Class.forName(Class.java:398)
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
    ... 29 more
```

The UpdateOperation class that fails to load is present in the job JAR.
I have dug a bit deeper, and it seems that this class is not visible from
the context of the Elasticsearch connector's OperationSerializer class.
Since OperationSerializer initializes its own Kryo instance, I'm not sure
how I can fix or otherwise influence this.
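
To illustrate the kind of visibility problem suspected here, below is a
minimal sketch in plain Java. It is not the connector's code and does not use
Kryo or Flink; the class name ClassLoaderVisibility and the helper isVisible
are hypothetical and exist only for this example. It shows that a class can
be on the classpath and still fail to resolve when the lookup goes through a
class loader higher up the hierarchy. The assumption is that something
similar happens in the trace above, where the lookup ends up in
ClassLoaders$AppClassLoader.

```java
public class ClassLoaderVisibility {

    /** Returns true if the named class can be resolved through the given loader. */
    public static boolean isVisible(String className, ClassLoader loader) {
        try {
            // initialize=false: resolve the class only, do not run static initializers.
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String name = ClassLoaderVisibility.class.getName();

        // The loader that actually defined this class.
        ClassLoader own = ClassLoaderVisibility.class.getClassLoader();

        // The parent of the system (application) class loader. It sits above
        // the loader that holds application classes, so it cannot see them.
        ClassLoader parentOfSystem = ClassLoader.getSystemClassLoader().getParent();

        System.out.println("visible via own loader:    " + isVisible(name, own));
        System.out.println("visible via parent loader: " + isVisible(name, parentOfSystem));
    }
}
```

When run from the regular classpath, the first lookup should succeed and the
second should fail, even though the class file is clearly present. If the job
JAR is loaded by a child class loader while the serializer resolves classes
through the application class loader, the same effect would explain the
ClassNotFoundException, but that is a guess on my part.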

I have put together a minimal reproduction in this GitHub repository:
https://github.com/mayorandrew/flink-elasticsearch-deserialization-bug.

Is this a bug in the connector, or am I missing some configuration on my
side?

Thank you in advance for your support.

Kind regards,
Andrey Starostin
