Hi Prasanna,
It could be a bug where the ExecutionConfig is not forwarded properly to
all locations where the KryoSerializer is used.
As a first step for debugging, I would recommend creating a custom
TypeInformation (most methods are not relevant except for
createSerializer and getTypeClass) that instantiates the
KryoSerializer manually. You can then pass this type information to all
operators (using dataStream.returns() or in the state descriptors). This
ensures that all locations use the same KryoSerializer configuration and
that no registered IDs get lost in the stack.
You can also check locally (either in your IDE or with a remote
debugger) that all KryoSerializer instances have the same registry
entries for class ID 97.
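To illustrate why mismatched registries produce errors like "unregistered class ID", here is a toy model in plain Java. It is not Kryo's API: `Registry`, `of`, `writeClass` and `readClass` are hypothetical names. It only mimics the idea that the writer emits a small integer per registered class and the reader has to map that integer back using the same registrations, in the same order:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;

// Toy model of ID-based class registration. NOT Kryo's API: Registry and its
// methods are hypothetical names used only for this illustration.
public class ClassIdRegistryDemo {

    public static final class Registry {
        private final Map<Class<?>, Integer> idsByClass = new HashMap<>();
        private final List<Class<?>> classesById = new ArrayList<>();

        // IDs are handed out in registration order, so the order matters.
        public static Registry of(Class<?>... classes) {
            Registry registry = new Registry();
            for (Class<?> type : classes) {
                if (!registry.idsByClass.containsKey(type)) {
                    registry.idsByClass.put(type, registry.classesById.size());
                    registry.classesById.add(type);
                }
            }
            return registry;
        }

        // Writer side: class -> ID.
        public int writeClass(Class<?> type) {
            Integer id = idsByClass.get(type);
            if (id == null) {
                throw new IllegalArgumentException("Unregistered class: " + type.getName());
            }
            return id;
        }

        // Reader side: ID -> class, failing when the ID is unknown.
        public Class<?> readClass(int id) {
            if (id < 0 || id >= classesById.size()) {
                throw new IllegalStateException("Encountered unregistered class ID: " + id);
            }
            return classesById.get(id);
        }
    }

    public static void main(String[] args) {
        Registry writer = Registry.of(String.class, HashMap.class, HashSet.class);
        Registry sameConfig = Registry.of(String.class, HashMap.class, HashSet.class);
        Registry missingEntry = Registry.of(String.class, HashMap.class);
        Registry otherOrder = Registry.of(String.class, HashSet.class, HashMap.class);

        int id = writer.writeClass(HashSet.class);
        System.out.println("written ID: " + id);
        System.out.println("same config reads: " + sameConfig.readClass(id).getSimpleName());
        // Different order: no error at the lookup, but the wrong class is decoded.
        System.out.println("other order reads: " + otherOrder.readClass(id).getSimpleName());
        try {
            missingEntry.readClass(id);
        } catch (IllegalStateException e) {
            System.out.println("missing entry: " + e.getMessage());
        }
    }
}
```

In this model, a reader with a different registration order does not fail at the lookup at all; it silently decodes the wrong class, and any error only shows up later while reading that class's fields.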
Where is UnmodifiableCollectionsSerializer coming from?
Regards,
Timo
On 01.11.21 11:34, Prasanna kumar wrote:
Any thoughts on these?
Thanks,
Prasanna.
On Sat, Oct 30, 2021 at 7:25 PM Prasanna kumar
<prasannakumarram...@gmail.com> wrote:
Hi,
We have the following Flink job that processes records from Kafka
based on rules that we load from S3 files into broadcast state.
Earlier we were able to run the job with any task parallelism
without any issues.
Recently we changed the structure of the broadcast state, and it now
works well only up to a parallelism of 3.
If we set a parallelism of 4 or more, we get serialization
exceptions that make the job fail (Block 4 in the image).
Also, if the leader JobManager dies and a new one comes up, the
other jobs are restarted automatically, but this job dies with
serialization errors.
When we start it manually with a parallelism <= 3, it works.
Functionally, the code works in all the test cases we have tried.
How do we debug the serialization issues that we are facing?
I have attached the exception logs and the related code.
Let me know if any more details are required.
*KRYO SERIALIZER INITIALISATION*
Class<?> unmodifiableCollectionsSerializer =
Class.forName("java.util.Collections$UnmodifiableCollection");
env.getConfig().addDefaultKryoSerializer(
unmodifiableCollectionsSerializer,
UnmodifiableCollectionsSerializer.class
);
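The registration above is keyed on `java.util.Collections$UnmodifiableCollection`. Assuming Kryo applies a default serializer to the registered class and its subclasses (as the Javadoc of Kryo's `addDefaultSerializer` describes), the following plain-Java check shows which JDK unmodifiable wrappers such a registration would cover. `UnmodifiableTypeCheck` and `coveredByRegistration` are illustrative names, not part of Flink or Kryo:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;

public class UnmodifiableTypeCheck {

    // Same class name as in the snippet above; the nested JDK class is private,
    // but Class.forName can still load it by name.
    private static final String REGISTERED_TYPE = "java.util.Collections$UnmodifiableCollection";

    // True if `candidate` is the registered type or a subclass of it, i.e. a
    // default serializer matched by subclass would also be picked for it.
    public static boolean coveredByRegistration(Class<?> candidate) {
        try {
            return Class.forName(REGISTERED_TYPE).isAssignableFrom(candidate);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("JDK class not found: " + REGISTERED_TYPE, e);
        }
    }

    public static void main(String[] args) {
        Object[] samples = {
            Collections.unmodifiableSet(new HashSet<String>()),
            Collections.unmodifiableList(new ArrayList<String>()),
            Collections.unmodifiableMap(new HashMap<String, String>())
        };
        for (Object sample : samples) {
            System.out.println(sample.getClass().getName() + " -> " + coveredByRegistration(sample.getClass()));
        }
    }
}
```

On the JDK, the set and list wrappers extend `UnmodifiableCollection`, while the wrapper returned by `Collections.unmodifiableMap` (used in `getCurrentState()` below) does not, so it would not pick up this default serializer.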
*CONFIGSTATE INTERFACE (USED IN BROADCAST STATE)*
public interface EventConfigState {
    void createOrUpdateState(String key, DataPair dataPair);
    List<OutputMessage> executeRule(InputMessage inputMessage);
    Map<String, Set<DataPair>> getCurrentState();
}
*DERIVED EVENT CONFIG STATE IMPLEMENTATION*
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DerivedEventConfigState implements EventConfigState {
    // Static, so a field-based serializer (e.g. Kryo's FieldSerializer) does not
    // try to serialize the logger together with the state.
    private static final Logger logger = LoggerFactory.getLogger(DerivedEventConfigState.class);

    private Map<String, Set<DataPair>> derivedConfigMap;

    public DerivedEventConfigState() {
        derivedConfigMap = new HashMap<>();
    }

    @Override
    public void createOrUpdateState(String key, DataPair dataPair) {
        Set<DataPair> dataPairs = derivedConfigMap.computeIfAbsent(key, k -> new HashSet<>());
        // HashSet.add() keeps an existing equal element, so remove it first
        // to replace it with the new instance.
        dataPairs.remove(dataPair);
        dataPairs.add(dataPair);
    }

    @Override
    public List<OutputMessage> executeRule(InputMessage inputMessage) {
        String key = inputMessage.getKey();
        List<OutputMessage> outputMessageList = new ArrayList<>();
        if (derivedConfigMap.isEmpty()) {
            logger.error("DerivedEventConfigMap is empty");
            return outputMessageList;
        }
        Set<DataPair> dataPairs = derivedConfigMap.get(key);
        if (dataPairs == null) {
            return outputMessageList;
        }
        for (DataPair dataPair : dataPairs) {
            IRule rule = dataPair.getRule();
            if (rule.isSatisfied(inputMessage)) {
                IMessageBuilder messageBuilder =
                        MessageBuilderFactory.getMessageBuilder("OutputMessage");
                OutputMessage outputMessage = messageBuilder.build(
                        inputMessage,
                        dataPair.getEventMessageDefinition()
                );
                outputMessageList.add(outputMessage);
            }
        }
        return outputMessageList;
    }

    @Override
    public Map<String, Set<DataPair>> getCurrentState() {
        return Collections.unmodifiableMap(derivedConfigMap);
    }

    @Override
    public String toString() {
        return "DerivedEventConfigState{"
                + "derivedConfigMap=" + derivedConfigMap
                + '}';
    }
}
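The third exception below fails while reading a field named `logger` in `AppEventConfigState` (not shown here), which suggests that class keeps its logger in an instance field. By default, Kryo's `FieldSerializer` writes non-static, non-transient fields, so such a logger would travel with the broadcast state. The plain-Java sketch below approximates that default field selection with reflection; the two state classes and `serializedFieldNames` are hypothetical, and unlike the real serializer it only looks at declared fields, not superclasses:

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.logging.Logger;

public class SerializedFieldsDemo {

    // Hypothetical state class: logger held in an instance field.
    public static class InstanceLoggerState {
        Logger logger = Logger.getLogger(InstanceLoggerState.class.getName());
        private Map<String, Set<String>> configMap = new HashMap<>();
    }

    // Same state with the logger made static, the usual idiom.
    public static class StaticLoggerState {
        private static final Logger LOGGER = Logger.getLogger(StaticLoggerState.class.getName());
        private Map<String, Set<String>> configMap = new HashMap<>();
    }

    // Approximates default field-based selection: skip static, transient and
    // compiler-generated fields. Declared fields only; superclasses are ignored.
    public static List<String> serializedFieldNames(Class<?> type) {
        List<String> names = new ArrayList<>();
        for (Field field : type.getDeclaredFields()) {
            int modifiers = field.getModifiers();
            if (Modifier.isStatic(modifiers) || Modifier.isTransient(modifiers) || field.isSynthetic()) {
                continue;
            }
            names.add(field.getName());
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println("instance logger: " + serializedFieldNames(InstanceLoggerState.class));
        System.out.println("static logger:   " + serializedFieldNames(StaticLoggerState.class));
    }
}
```

In this model, declaring the logger `private static final` (or `transient`) is enough to keep it out of the selected fields.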
Attached are three exceptions that are thrown randomly:
*Caused by: org.apache.flink.util.SerializedThrowable: TABLE-OP
at java.net.URLClassLoader.findClass(Unknown Source) ~[?:?]
at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:172) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at java.lang.Class.forName0(Native Method) ~[?:?]
at java.lang.Class.forName(Unknown Source) ~[?:?]
at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:273) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:111) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.runtime.state.HeapBroadcastState.<init>(HeapBroadcastState.java:69) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.runtime.state.HeapBroadcastState.deepCopy(HeapBroadcastState.java:84) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.runtime.state.HeapBroadcastState.deepCopy(HeapBroadcastState.java:40) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.snapshot(DefaultOperatorStateBackendSnapshotStrategy.java:100) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:234) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:213) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:162) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:686) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:607) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:572) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:298) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$9(StreamTask.java:1004) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:988) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:947) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
... 14 more*
*Caused by: org.apache.flink.util.SerializedThrowable: Encountered unregistered class ID: 97
Serialization trace:
derivedConfigMap (com.org.app.model.producer.DerivedEventConfigState)
at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:273) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:111) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.runtime.state.HeapBroadcastState.<init>(HeapBroadcastState.java:69) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.runtime.state.HeapBroadcastState.deepCopy(HeapBroadcastState.java:84) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.runtime.state.HeapBroadcastState.deepCopy(HeapBroadcastState.java:40) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.snapshot(DefaultOperatorStateBackendSnapshotStrategy.java:100) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:234) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:213) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:162) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:686) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:607) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:572) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:298) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$9(StreamTask.java:1004) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:988) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:947) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
... 14 more*
*Caused by: org.apache.flink.util.SerializedThrowable: Buffer underflow.
Serialization trace:
logger (com.org.app.model.producer.AppEventConfigState)
at com.esotericsoftware.kryo.io.Input.require(Input.java:181) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at com.esotericsoftware.kryo.io.Input.readVarInt(Input.java:355) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:109) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:273) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:111) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.runtime.state.HeapBroadcastState.<init>(HeapBroadcastState.java:69) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.runtime.state.HeapBroadcastState.deepCopy(HeapBroadcastState.java:84) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.runtime.state.HeapBroadcastState.deepCopy(HeapBroadcastState.java:40) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.snapshot(DefaultOperatorStateBackendSnapshotStrategy.java:100) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:234) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:213) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
... 24 more*
Thanks,
Prasanna