I'm attempting to use the dynamodb sink located at https://github.com/apache/flink-connector-aws
The example <https://github.com/apache/flink-connector-aws/blob/main/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/examples/SinkIntoDynamoDb.java> in the repo is working as expected, however when I try to create a nested data structure, I receive a Kryo serialization error message: Caused by: com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException Serialization trace: m (software.amazon.awssdk.services.dynamodb.model.AttributeValue) at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125) The value that cannot be serialized is produced by this code: import software.amazon.awssdk.services.dynamodb.model.AttributeValue; AttributeValue.builder().m( ImmutableMap.of( "innerkey", AttributeValue.builder().s("innervalue").build() ) ).build(); There are tests in the connector repo <https://github.com/apache/flink-connector-aws/blob/3798aabfcc6f78645bf3d7255dfd6c336cd497f0/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/util/DynamoDbSerializationUtilTest.java#L70-L84> for nested map structures, but they do not test that the structure can be ser/de by Flink, which I believe occurs when the operator that produces the value is separate to the sink operator. Given that this is a fairly simple data type, I should be able to register a custom serializer with Flink, but since I'm new to java I'm having trouble making sense of the docs <https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/datastream/fault-tolerance/serialization/types_serialization/> and was hoping to find someone more knowledgeable in this area for some pointers on what else I could start reading Thanks Matt