Hi all, I'm failing to setup an example of wire serialization with Protobuf, could you help me figure out what I'm doing wrong?
I'm using a simple protobuf schema: ``` syntax = "proto3"; import "google/protobuf/wrappers.proto"; option java_multiple_files = true; message DemoUserEvent { Metadata metadata = 1; oneof payload { Created created = 10; Updated updated = 11; } message Created {...} message Updated {...} ... } ``` >From which I'm generating java from this Gradle plugin: ``` plugins { id "com.google.protobuf" version "0.8.15" } ``` And I'm generating DemoUserEvent instances with Java Iterator looking like this: ``` public class UserEventGenerator implements Iterator<DemoUserEvent>, Serializable { transient public final static Faker faker = new Faker(); ... @Override public DemoUserEvent next() { return randomCreatedEvent(); } ... ``` I read those two pieces of documentation: * https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/types_serialization.html * https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/custom_serializers.html And tried the demo app below: ``` import com.twitter.chill.protobuf.ProtobufSerializer; ... public static void main(String[] args) { final StreamExecutionEnvironment flinkEnv = StreamExecutionEnvironment.getExecutionEnvironment(); flinkEnv.getConfig().registerTypeWithKryoSerializer(DemoUserEvent.class, ProtobufSerializer.class); flinkEnv.fromCollection(new UserEventGenerator(), DemoUserEvent.class).print(); } ``` But the serialization mechanism still fails to handle my protobuf class: 11:22:45,822 INFO org.apache.flink.api.java.typeutils.TypeExtractor [] - class live.schema.event.user.v1.DemoUserEvent does not contain a getter for field payloadCase_ 11:22:45,822 INFO org.apache.flink.api.java.typeutils.TypeExtractor [] - class live.schema.event.user.v1.DemoUserEvent does not contain a setter for field payloadCase_ 11:22:45,822 INFO org.apache.flink.api.java.typeutils.TypeExtractor [] - Class class live.schema.event.user.v1.DemoUserEvent cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance. I've also tried this, without success: ``` flinkEnv.getConfig().addDefaultKryoSerializer(DemoUserEvent.class, ProtobufSerializer.class); ``` I'm using those versions: ``` ext { javaVersion = '11' flinkVersion = '1.12.1' scalaBinaryVersion = '2.12' } dependencies { compileOnly "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}" implementation ("com.twitter:chill-protobuf:0.9.5") { exclude group: 'com.esotericsoftware.kryo', module: 'kryo' } implementation "com.google.protobuf:protobuf-java:3.14.0" implementation 'com.github.javafaker:javafaker:1.0.2' } ``` Any idea what I should try next? Thanks in advance!