Hi Rodrigo,

Flink 1.11 doesn't ship an Avro uber jar, so you need to add all the dependency jars manually, such as avro, jackson-core-asl, jackson-mapper-asl and joda-time. However, I found a JIRA issue [1], filed a few days ago, to provide a default Avro uber jar.
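A rough sketch of the manual approach: collect the jars into the semicolon-separated list of file:// URLs that the `pipeline.jars` option takes. This is my own assumption about how to organize it, not something from the JIRA issue. The directory /opt/flink-jars and the un-versioned file names are hypothetical, so use the paths of the jars you actually downloaded, including the flink-avro format jar and the Kafka connector jar:

```python
import os
from pathlib import Path


def build_pipeline_jars(jar_paths):
    """Join local jar paths into the semicolon-separated list of
    file:// URLs used as the value of the "pipeline.jars" option."""
    return ";".join(Path(os.path.abspath(p)).as_uri() for p in jar_paths)


# Hypothetical locations and names: replace with your real jar files.
JARS = [
    "/opt/flink-jars/flink-sql-connector-kafka.jar",
    "/opt/flink-jars/flink-avro.jar",
    "/opt/flink-jars/avro.jar",
    "/opt/flink-jars/jackson-core-asl.jar",
    "/opt/flink-jars/jackson-mapper-asl.jar",
    "/opt/flink-jars/joda-time.jar",
]

pipeline_jars = build_pipeline_jars(JARS)
print(pipeline_jars)

# With PyFlink 1.11 installed, pass the string to the table config
# (t_env is your TableEnvironment):
# conf = t_env.get_config().get_configuration()
# conf.set_string("pipeline.jars", pipeline_jars)
```

If org.apache.avro.io.DatumWriter is still not found after this, the avro jar is the one missing from the list.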
[1] https://issues.apache.org/jira/browse/FLINK-18802

Best,
Xingbo

Rodrigo de Souza Oliveira Brochado <[email protected]> wrote on Tue, Aug 11, 2020 at 7:30 AM:
> Hi guys,
>
> Could you help me set up a Kafka sink connector using the Avro format? I'm
> using PyFlink 1.11, OpenJDK 11, and executing the job locally (python
> my_script.py).
>
> My code is simple, with a call to a UDTF that for now only yields a fixed,
> small binary. I use source and sink Kafka connectors. The source uses the Json
> format, which works as expected. The sink works with the Csv format:
>
> (t_env.connect(  # declare the external system to connect to
>     Kafka()
>     .version("universal")
>     .topic("output")
>     .property("bootstrap.servers", "localhost:9092")
>     .property("zookeeper.connect", "localhost:2181")
> ) \
>     .with_format(  # declare a format for this system
>         Csv()  # Csv converts bytes to a base64 string....
>     ) \
>     .with_schema(  # declare the schema of the table
>         Schema()
>         .field("outf", DataTypes.BYTES())
>     ) \
>     .create_temporary_table("mySink"))
>
> But the Csv format converts the bytes (varbinary) to a base64 string, which is
> expected but not desired.
> With the Avro format, I just get errors.
>
> - Just replacing Csv() in the code above with Avro(), and adding Avro's
>   dependency with
>       conf = t_env.get_config().get_configuration()
>       conf.set_string("pipeline.jars",
>           "file://<path_to_kafka_connector>.jar;file://<path_to_kafka_connector>.jar"
>       )
>   I got:
>   *org.apache.flink.table.api.ValidationException: A definition of an Avro
>   specific record class or Avro schema is required.*
>
> - After looking at the PyFlink source code, I also passed an
>   avro_schema argument to the constructor:
>   Avro(avro_schema=<my_schema_in_string>) and got
>   *java.lang.ClassNotFoundException: org.apache.avro.io.DatumWriter*
>
> - Using the SQL (DDL) declaration documented in [1], I got the same
>   last error.
>
> I also have some doubts about how to create the schema, but I need
> Avro to work first.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/avro.html
>
> Thanks,
> Rodrigo
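On the schema question in the quoted message: for the single-column sink table ("outf" BYTES), a minimal Avro record schema string for Avro(avro_schema=...) could look like the sketch below. The record name OutputRecord is a hypothetical choice, and mapping DataTypes.BYTES() to the Avro "bytes" type is an assumption; if the column can contain nulls, a ["null", "bytes"] union may be needed instead.

```python
import json

# Avro record schema for the single-column sink table.
# "OutputRecord" is a hypothetical record name.
OUTPUT_SCHEMA = json.dumps({
    "type": "record",
    "name": "OutputRecord",
    "fields": [
        # Assumed mapping: DataTypes.BYTES() -> Avro "bytes"
        {"name": "outf", "type": "bytes"},
    ],
})
print(OUTPUT_SCHEMA)

# With PyFlink 1.11 installed, the string goes into the format descriptor:
# .with_format(Avro(avro_schema=OUTPUT_SCHEMA))
```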
