Hi Rodrigo,

Flink 1.11 doesn't provide an Avro uber jar, so you need to add all the
dependency jars manually, such as avro, jackson-core-asl, jackson-mapper-asl
and joda-time. The ClassNotFoundException for org.apache.avro.io.DatumWriter
means that the avro jar itself is missing from the classpath.
However, I found a JIRA issue [1], opened a few days ago, that provides a
default version of the Avro uber jar.
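Here is a rough sketch of building the pipeline.jars value for this setup. It assumes all the jars have been downloaded into one local directory. The directory and jar file names below are hypothetical placeholders, so use the real, versioned file names. The flink-avro entry is my assumption about where the Avro format itself comes from. Multiple jars are separated by ";", as in your snippet:

```python
from pathlib import PurePosixPath

# Hypothetical directory; replace it with wherever the jars actually live.
JAR_DIR = PurePosixPath("/path/to/jars")

# Hypothetical, unversioned file names; use the real jar file names.
JAR_NAMES = [
    "flink-sql-connector-kafka.jar",  # Kafka connector
    "flink-avro.jar",                 # Avro format (assumed to be needed as well)
    "avro.jar",                       # contains org.apache.avro.io.DatumWriter
    "jackson-core-asl.jar",
    "jackson-mapper-asl.jar",
    "joda-time.jar",
]


def pipeline_jars_value(jar_dir, names):
    """Join absolute jar paths into a ';'-separated list of file:// URLs."""
    return ";".join("file://" + str(jar_dir / name) for name in names)


value = pipeline_jars_value(JAR_DIR, JAR_NAMES)
print(value)

# In the PyFlink job, the value would then be set before the job is submitted:
# t_env.get_config().get_configuration().set_string("pipeline.jars", value)
```

The plain "file://" + path concatenation assumes absolute POSIX paths with no spaces or other characters that would need URL encoding.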


[1] https://issues.apache.org/jira/browse/FLINK-18802

Best,
Xingbo

Rodrigo de Souza Oliveira Brochado <[email protected]>
wrote on Tue, Aug 11, 2020 at 7:30 AM:

> Hi guys,
>
> Could you help me set up a Kafka sink connector using the Avro format? I'm
> using PyFlink 1.11 and OpenJDK 11, and I'm executing the job locally (python
> my_script.py).
>
> My code is simple, with a call to a UDTF that currently only yields a fixed,
> small binary value. I use Kafka source and sink connectors. The source uses
> the Json format and works as expected. The sink works with the Csv format:
>
> from pyflink.table import DataTypes
> from pyflink.table.descriptors import Csv, Kafka, Schema
>
> (t_env.connect(  # declare the external system to connect to
>     Kafka()
>     .version("universal")
>     .topic("output")
>     .property("bootstrap.servers", "localhost:9092")
>     .property("zookeeper.connect", "localhost:2181")
> )
>     .with_format(  # declare a format for this system
>         Csv()  # Csv converts bytes to a base64 string
>     )
>     .with_schema(  # declare the schema of the table
>         Schema()
>         .field("outf", DataTypes.BYTES())
>     )
>     .create_temporary_table("mySink"))
>
> However, the Csv format converts the bytes (VARBINARY) to a base64 string,
> which is expected but not what I want.
> With the Avro format, I only get errors.
>
> - I replaced Csv() in the code above with Avro() and added Avro's
> dependency with
> conf = t_env.get_config().get_configuration()
> conf.set_string("pipeline.jars",
> "file://<path_to_kafka_connector>.jar;file://<path_to_kafka_connector>.jar")
> I got:
> *org.apache.flink.table.api.ValidationException: A definition of an Avro
> specific record class or Avro schema is required.*
>
> - After looking at the PyFlink source code, I also passed an
> avro_schema argument to the constructor,
> Avro(avro_schema=<my_schema_in_string>), and got
> *java.lang.ClassNotFoundException: org.apache.avro.io.DatumWriter*
>
> - Using the SQL (DDL) declaration documented in [1], I got the same
> error.
>
> I also have some doubts about how to create the schema, but I need to get
> Avro working first.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/avro.html
>
> Thanks,
> Rodrigo
>
>
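On the schema question, here is a minimal sketch of an Avro schema string for the single-column sink above. It assumes your consumers don't care about the record name and namespace, both of which are hypothetical. Since DataTypes.BYTES() is nullable, I modelled `outf` as a union of null and bytes; you can drop the union if the column is never null. I haven't run this against the legacy Avro() descriptor, so treat it as a starting point:

```python
import json

# Hypothetical record name and namespace; only the field name "outf"
# comes from the sink schema in your code.
OUTPUT_SCHEMA = {
    "type": "record",
    "name": "OutputRecord",
    "namespace": "example.flink",
    "fields": [
        # Nullable column modelled as a ["null", "bytes"] union (an assumption
        # about the desired nullability); the default must match the first branch.
        {"name": "outf", "type": ["null", "bytes"], "default": None},
    ],
}

avro_schema_str = json.dumps(OUTPUT_SCHEMA)
print(avro_schema_str)

# The string could then be passed to the descriptor, e.g.
# .with_format(Avro(avro_schema=avro_schema_str))
```

The schema only fixes the ValidationException. The avro jar still has to be added to pipeline.jars, otherwise the ClassNotFoundException remains.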
