Improvements to the documentation are always welcome.

In this particular case we need to be careful, though, as this is not
always the expected behavior. Since you are registering your own Kryo
serializer, it is expected in your case.

More often, however, you do not want a GenericType but a PojoType, and
this message helps you identify a problem with your POJO declaration.
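
To illustrate what the message is checking, here is a rough sketch in plain
Java of the documented POJO rules (public class, public no-argument
constructor, every field either public or reachable through a getter and a
setter). It is not Flink's actual TypeExtractor; the class and method names
(PojoRulesSketch, GoodEvent, ProtoLikeEvent, looksLikePojo) are made up for
this example, and it only looks at fields declared directly on the class.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

// Hypothetical illustration only: approximates the POJO rules, not Flink code.
public class PojoRulesSketch {

    // Follows the rules: public no-arg constructor, and each field is
    // either public or has a matching getter/setter pair.
    public static class GoodEvent {
        public String id;
        private long timestamp;

        public GoodEvent() {}

        public long getTimestamp() { return timestamp; }
        public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
    }

    // Shaped like generated Protobuf code: private constructor and a private
    // field with a trailing underscore that has no matching getter or setter.
    public static class ProtoLikeEvent {
        private int payloadCase_;

        private ProtoLikeEvent() {}

        public int getPayloadCase() { return payloadCase_; }
    }

    static boolean hasPublicNoArgConstructor(Class<?> c) {
        try {
            c.getConstructor(); // only finds public constructors
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    // Looks for public getX()/isX() and setX(..) methods for field x.
    static boolean hasAccessors(Class<?> c, Field f) {
        String name = f.getName();
        String suffix = Character.toUpperCase(name.charAt(0)) + name.substring(1);
        boolean getter = false;
        boolean setter = false;
        for (Method m : c.getMethods()) {
            if (m.getParameterCount() == 0
                    && (m.getName().equals("get" + suffix) || m.getName().equals("is" + suffix))) {
                getter = true;
            }
            if (m.getParameterCount() == 1 && m.getName().equals("set" + suffix)) {
                setter = true;
            }
        }
        return getter && setter;
    }

    public static boolean looksLikePojo(Class<?> c) {
        if (!Modifier.isPublic(c.getModifiers()) || !hasPublicNoArgConstructor(c)) {
            return false;
        }
        for (Field f : c.getDeclaredFields()) {
            int mod = f.getModifiers();
            // Static, transient and compiler-generated fields are not checked.
            if (Modifier.isStatic(mod) || Modifier.isTransient(mod) || f.isSynthetic()) {
                continue;
            }
            if (!Modifier.isPublic(mod) && !hasAccessors(c, f)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println("GoodEvent: " + looksLikePojo(GoodEvent.class));
        System.out.println("ProtoLikeEvent: " + looksLikePojo(ProtoLikeEvent.class));
    }
}
```

With these two sample classes the check passes for GoodEvent and fails for
ProtoLikeEvent, which mirrors the situation in your logs: a field such as
payloadCase_ without a matching getter and setter is enough to rule out the
POJO path.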

Best,

Dawid

On 15/02/2021 11:50, Svend Vanderveken wrote:
> Oh!
>
> Indeed, my program was just not starting because I omitted the
> flink.execute() part! I confirm it works now.
>
> Thanks for the quick response.
>
> Do you mind if I submit a small PR to the Flink doc to clarify that
> those INFO logs are indeed the expected behavior? For example
> here: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/custom_serializers.html
>
> Svend
>
>
>
> On Mon, Feb 15, 2021 at 10:03 AM Dawid Wysakowicz
> <dwysakow...@apache.org> wrote:
>
>     Hey,
>
>     Why do you say the way you did it does not work? The logs you
>     posted say that the classes cannot be handled by Flink's built-in
>     mechanism for serializing POJOs, so Flink falls back to a
>     GenericType, which is serialized with Kryo and should go through
>     your registered serializer.
>
>     Best,
>
>     Dawid
>
>
>     On 14/02/2021 11:44, Svend Vanderveken wrote:
>>
>>
>>     Hi all,
>>
>>     I'm failing to set up an example of wire serialization with
>>     Protobuf. Could you help me figure out what I'm doing wrong?
>>
>>     I'm using a simple protobuf schema:
>>     ```
>>     syntax = "proto3";
>>
>>     import "google/protobuf/wrappers.proto";
>>     option java_multiple_files = true;
>>     message DemoUserEvent {
>>       Metadata metadata = 1;
>>       oneof payload {
>>         Created created = 10;
>>         Updated updated = 11;
>>       }
>>       message Created {...}
>>       message Updated {...}
>>       ...
>>     }
>>     ```
>>
>>     I'm generating the Java classes from it with this Gradle plugin:
>>
>>
>>     ```
>>     plugins { id "com.google.protobuf" version "0.8.15"}
>>     ```
>>     And I'm generating DemoUserEvent instances with a Java Iterator
>>     that looks like this:
>>     ```
>>     public class UserEventGenerator implements Iterator<DemoUserEvent>, Serializable {
>>         transient public final static Faker faker = new Faker();
>>         ...
>>         @Override
>>         public DemoUserEvent next() {
>>             return randomCreatedEvent();
>>         }
>>         ...
>>     ```
>>
>>     I read those two pieces of documentation:
>>     * 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/types_serialization.html
>>     * 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/custom_serializers.html
>>
>>     And tried the demo app below:
>>
>>     ```
>>     import com.twitter.chill.protobuf.ProtobufSerializer;
>>     ...
>>     public static void main(String[] args) {
>>         final StreamExecutionEnvironment flinkEnv =
>>             StreamExecutionEnvironment.getExecutionEnvironment();
>>         flinkEnv.getConfig().registerTypeWithKryoSerializer(
>>             DemoUserEvent.class, ProtobufSerializer.class);
>>         flinkEnv.fromCollection(new UserEventGenerator(), DemoUserEvent.class).print();
>>     }
>>     ```
>>     But the serialization mechanism still fails to handle my protobuf class:
>>     11:22:45,822 INFO  org.apache.flink.api.java.typeutils.TypeExtractor [] - class live.schema.event.user.v1.DemoUserEvent does not contain a getter for field payloadCase_
>>     11:22:45,822 INFO  org.apache.flink.api.java.typeutils.TypeExtractor [] - class live.schema.event.user.v1.DemoUserEvent does not contain a setter for field payloadCase_
>>     11:22:45,822 INFO  org.apache.flink.api.java.typeutils.TypeExtractor [] - Class class live.schema.event.user.v1.DemoUserEvent cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
>>
>>     I've also tried this, without success:
>>
>>     ```
>>     flinkEnv.getConfig().addDefaultKryoSerializer(DemoUserEvent.class, ProtobufSerializer.class);
>>     ```
>>
>>     I'm using those versions:
>>
>>     ```
>>     ext {
>>         javaVersion = '11'
>>         flinkVersion = '1.12.1'
>>         scalaBinaryVersion = '2.12'
>>     }
>>     dependencies {
>>         compileOnly "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"
>>         implementation("com.twitter:chill-protobuf:0.9.5") {
>>             exclude group: 'com.esotericsoftware.kryo', module: 'kryo'
>>         }
>>         implementation "com.google.protobuf:protobuf-java:3.14.0"
>>         implementation 'com.github.javafaker:javafaker:1.0.2'
>>     }
>>     ```
>>
>>     Any idea what I should try next?
>>
>>     Thanks in advance!
>
>
>
> -- 
> Svend Vanderveken
> Kelesia SPRL - BE 0839 049 010
> blog: https://svend.kelesia.com <http://svend.kelesia.com/>
> Twitter: @sv3ndk <https://twitter.com/sv3ndk>
>
