Hey,

Why do you say that the way you did it does not work? The logs you posted say that the class cannot be handled by Flink's built-in mechanism for serializing POJOs, so it falls back to a GenericType, which is serialized with Kryo and should go through your registered serializer.
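
To illustrate why those INFO lines show up: Flink's documented POJO rule requires every non-static, non-transient field to be either public or reachable through a getter and a setter. The sketch below is a simplified, JDK-only approximation of that rule, not Flink's actual TypeExtractor code (which applies more detailed checks). The classes `GeneratedLikeEvent` and `PlainEvent` and the helper `fieldsBreakingPojoRule` are hypothetical names made up for the example. `GeneratedLikeEvent` only mimics the shape of a generated protobuf class, with a private field and no matching accessors.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

public class PojoFieldCheck {

    /** Hypothetical stand-in for a generated protobuf message: private field, no accessors. */
    public static class GeneratedLikeEvent {
        private int payloadCase_ = 0;
    }

    /** Hypothetical class that satisfies the simplified rule. */
    public static class PlainEvent {
        public String id;
        private long timestamp;
        public long getTimestamp() { return timestamp; }
        public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
    }

    /** Returns the names of fields that are neither public nor covered by a getter/setter pair. */
    public static List<String> fieldsBreakingPojoRule(Class<?> clazz) {
        List<String> offenders = new ArrayList<>();
        for (Field field : clazz.getDeclaredFields()) {
            int mods = field.getModifiers();
            // Static, transient and compiler-generated fields are not part of the serialized state.
            if (Modifier.isStatic(mods) || Modifier.isTransient(mods) || field.isSynthetic()) {
                continue;
            }
            // A public, non-final field is accepted as is.
            if (Modifier.isPublic(mods) && !Modifier.isFinal(mods)) {
                continue;
            }
            String suffix = capitalize(field.getName());
            boolean hasGetter = hasPublicMethod(clazz, "get" + suffix, 0);
            boolean hasSetter = hasPublicMethod(clazz, "set" + suffix, 1);
            if (!(hasGetter && hasSetter)) {
                offenders.add(field.getName());
            }
        }
        return offenders;
    }

    private static boolean hasPublicMethod(Class<?> clazz, String name, int parameterCount) {
        for (Method method : clazz.getMethods()) {
            if (method.getName().equals(name) && method.getParameterCount() == parameterCount) {
                return true;
            }
        }
        return false;
    }

    private static String capitalize(String name) {
        return name.substring(0, 1).toUpperCase() + name.substring(1);
    }

    public static void main(String[] args) {
        System.out.println(fieldsBreakingPojoRule(GeneratedLikeEvent.class)); // prints [payloadCase_]
        System.out.println(fieldsBreakingPojoRule(PlainEvent.class));         // prints []
    }
}
```

A class with a non-empty result here is the kind of class Flink treats as a GenericType and hands over to Kryo. That is the path on which a serializer registered with `registerTypeWithKryoSerializer` is expected to be used.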

Best,
Dawid

On 14/02/2021 11:44, Svend Vanderveken wrote:
> Hi all,
>
> I'm failing to set up an example of wire serialization with Protobuf,
> could you help me figure out what I'm doing wrong?
>
> I'm using a simple protobuf schema:
>
> ```
> syntax = "proto3";
>
> import "google/protobuf/wrappers.proto";
> option java_multiple_files = true;
>
> message DemoUserEvent {
>   Metadata metadata = 1;
>   oneof payload {
>     Created created = 10;
>     Updated updated = 11;
>   }
>   message Created {...}
>   message Updated {...}
>   ...
> }
> ```
>
> From which I'm generating Java with this Gradle plugin:
>
> ```
> plugins { id "com.google.protobuf" version "0.8.15" }
> ```
>
> And I'm generating DemoUserEvent instances with a Java Iterator looking
> like this:
>
> ```
> public class UserEventGenerator implements Iterator<DemoUserEvent>, Serializable {
>   transient public final static Faker faker = new Faker();
>   ...
>   @Override
>   public DemoUserEvent next() {
>     return randomCreatedEvent();
>   }
>   ...
> ```
>
> I read those two pieces of documentation:
> * https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/types_serialization.html
> * https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/custom_serializers.html
>
> And tried the demo app below:
>
> ```
> import com.twitter.chill.protobuf.ProtobufSerializer;
> ...
> public static void main(String[] args) {
>   final StreamExecutionEnvironment flinkEnv =
>       StreamExecutionEnvironment.getExecutionEnvironment();
>   flinkEnv.getConfig().registerTypeWithKryoSerializer(DemoUserEvent.class, ProtobufSerializer.class);
>   flinkEnv.fromCollection(new UserEventGenerator(), DemoUserEvent.class).print();
> }
> ```
>
> But the serialization mechanism still fails to handle my protobuf class:
>
> 11:22:45,822 INFO org.apache.flink.api.java.typeutils.TypeExtractor [] - class live.schema.event.user.v1.DemoUserEvent does not contain a getter for field payloadCase_
> 11:22:45,822 INFO org.apache.flink.api.java.typeutils.TypeExtractor [] - class live.schema.event.user.v1.DemoUserEvent does not contain a setter for field payloadCase_
> 11:22:45,822 INFO org.apache.flink.api.java.typeutils.TypeExtractor [] - Class class live.schema.event.user.v1.DemoUserEvent cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
>
> I've also tried this, without success:
>
> ```
> flinkEnv.getConfig().addDefaultKryoSerializer(DemoUserEvent.class, ProtobufSerializer.class);
> ```
>
> I'm using those versions:
>
> ```
> ext {
>   javaVersion = '11'
>   flinkVersion = '1.12.1'
>   scalaBinaryVersion = '2.12'
> }
>
> dependencies {
>   compileOnly "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"
>   implementation("com.twitter:chill-protobuf:0.9.5") {
>     exclude group: 'com.esotericsoftware.kryo', module: 'kryo'
>   }
>   implementation "com.google.protobuf:protobuf-java:3.14.0"
>   implementation 'com.github.javafaker:javafaker:1.0.2'
> }
> ```
>
> Any idea what I should try next?
>
> Thanks in advance!