Dear list,
I’m having my first go at using Flink and quickly stumbled over a problem I
can't find an easy way around. I hope you can help me.
I am trying to read an Avro-encoded Kafka topic. When I do, I get a
NoClassDefFoundError: “Could not initialize class org.apache.avro.SchemaBuilder”.
This class should be included in the provided avro-1.9.2.jar, and the jar is
picked up correctly: if I remove that dependency, I get
“java.lang.ClassNotFoundException: org.apache.avro.generic.IndexedRecord” instead.
[…]
Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.avro.SchemaBuilder
    at org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:240)
    at org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:179)
    at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.open(AvroRowDataDeserializationSchema.java:136)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.open(KafkaDeserializationSchemaWrapper.java:46)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:694)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:748)
The code I am using is the following:
from pyflink.table import EnvironmentSettings, StreamTableEnvironment

# Streaming mode with the Blink planner (Flink 1.11)
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
table_env = StreamTableEnvironment.create(environment_settings=env_settings)

# Jars shipped with the job via 'pipeline.jars' (a semicolon-separated list of URLs)
jars = ["flink-sql-connector-kafka_2.11-1.11.2.jar",
        "flink-avro-1.11.2-sql-jar.jar",
        "avro-1.9.2.jar"]
jar_base_path = "file:///path/to/my/jars/"
table_env.get_config().get_configuration().set_string(
    'pipeline.jars', ';'.join([jar_base_path + j for j in jars]))

# Kafka source table whose messages are decoded with the Avro format
table_env.execute_sql("""
    CREATE TABLE test (
        a STRING,
        b INT,
        c TIMESTAMP
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'my_topic',
        'properties.bootstrap.servers' = 'kafka.broker.com:9192',
        'properties.group.id' = 'something',
        'format' = 'avro'
    )
""")

test = table_env.from_path('test')
test.to_pandas()  # the NoClassDefFoundError above is raised here
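To rule out a packaging problem, one can check which of the jars listed above actually contain org/apache/avro/SchemaBuilder.class. A minimal Python sketch of such a check follows; the helper name jars_containing is hypothetical, and it assumes the jars are readable from the local filesystem. It only tests whether the class file is present in each archive, not why its static initializer might fail.

```python
import zipfile


def jars_containing(class_name, jars):
    """Return the elements of `jars` whose archive contains the given class.

    `class_name` is a fully qualified Java class name such as
    "org.apache.avro.SchemaBuilder"; each element of `jars` is anything
    zipfile.ZipFile accepts (a filesystem path or a binary file-like object).
    """
    # A jar is a zip archive; classes are stored as package/path/Name.class
    entry = class_name.replace(".", "/") + ".class"
    hits = []
    for jar in jars:
        with zipfile.ZipFile(jar) as zf:
            if entry in zf.namelist():
                hits.append(jar)
    return hits
```

For example, with the placeholder directory from the job above, jars_containing("org.apache.avro.SchemaBuilder", [Path("/path/to/my/jars") / j for j in jars]) (with Path from pathlib) would return the jar paths that bundle the class. If more than one jar shows up, there may be two copies of Avro on the classpath.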
Any help would be appreciated.
Thanks in advance
Thilo