I'm attempting to use the DynamoDB sink located at
https://github.com/apache/flink-connector-aws

The example
<https://github.com/apache/flink-connector-aws/blob/main/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/examples/SinkIntoDynamoDb.java>
in the repo works as expected; however, when I try to create a nested
data structure, I receive a Kryo serialization error:

Caused by: com.esotericsoftware.kryo.KryoException:
java.lang.UnsupportedOperationException
Serialization trace:
m (software.amazon.awssdk.services.dynamodb.model.AttributeValue)
at
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)


The value that cannot be serialized is produced by this code:

import com.google.common.collect.ImmutableMap;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;

// A nested structure: a map attribute ("M") holding a string attribute ("S").
AttributeValue nested = AttributeValue.builder().m(
  ImmutableMap.of(
    "innerkey", AttributeValue.builder().s("innervalue").build()
  )
).build();


There are tests in the connector repo
<https://github.com/apache/flink-connector-aws/blob/3798aabfcc6f78645bf3d7255dfd6c336cd497f0/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/util/DynamoDbSerializationUtilTest.java#L70-L84>
for nested map structures, but they do not test that the structure can be
serialized and deserialized by Flink itself, which I believe happens when
the operator that produces the value is separate from the sink operator.
Judging by the trace above (an UnsupportedOperationException while Kryo
reads the "m" field), my suspicion is that Kryo's default reflective
serializer cannot repopulate the unmodifiable map the SDK stores
internally. A sketch of how I understand the failure is triggered follows.
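
This is an untested sketch, not code from the repo: forcing a network
shuffle (e.g. with rebalance()) between the producing operator and the next
operator should make Flink serialize the AttributeValue across the
boundary, falling back to Kryo since AttributeValue is not a Flink POJO:

import com.google.common.collect.ImmutableMap;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.fromElements("innervalue")
    // Produce the nested AttributeValue in one operator...
    .map(v -> AttributeValue.builder()
        .m(ImmutableMap.of("innerkey", AttributeValue.builder().s(v).build()))
        .build())
    .returns(AttributeValue.class) // help type extraction for the lambda
    // ...and force a serialization boundary before the value moves on.
    .rebalance()
    .print();

env.execute();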

Given that this is a fairly simple data type, I should be able to register
a custom serializer with Flink, but since I'm new to Java I'm having
trouble making sense of the docs
<https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/datastream/fault-tolerance/serialization/types_serialization/>
and was hoping someone more knowledgeable in this area could give me some
pointers on what else to start reading.
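
One thing I was going to try, untested: if AttributeValue is
java.io.Serializable (it appears to be in AWS SDK v2), Flink can be told to
fall back to Java serialization for just that class, using Flink's own
Kryo-compatible JavaSerializer:

import org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Serialize AttributeValue with Java serialization instead of Kryo's
// field-by-field reflection, which appears to choke on the unmodifiable map.
env.getConfig().addDefaultKryoSerializer(AttributeValue.class, JavaSerializer.class);

Does that sound like a reasonable direction, or is there a more idiomatic fix?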

Thanks
Matt
