I guess you only need file:///{os.getcwd()}/lib/flink-sql-avro-1.12.2.jar. It is 
an uber jar that already bundles flink-avro and avro, so loading all three jars 
can lead to class loading conflicts. Could you remove flink-avro-1.12.2.jar and 
avro-1.10.2.jar and try again?
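
For reference, the configuration would then look something like this (a minimal 
sketch, assuming table_env is created as in your sample code):

import os

# Only the SQL uber jar is needed; it already bundles the Avro classes.
table_env \
    .get_config() \
    .get_configuration() \
    .set_string(
        "pipeline.jars",
        f"file:///{os.getcwd()}/lib/flink-sql-avro-1.12.2.jar"
    )

You could also print os.path.exists(f"{os.getcwd()}/lib/flink-sql-avro-1.12.2.jar") 
first, just to make sure the path actually resolves to the jar on disk.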

Regards,
Dian

> On Apr 24, 2021, at 8:29 AM, Edward Yang <eddiepy...@gmail.com> wrote:
> 
> I've been trying to write to the avro format with pyflink 1.12.2 on ubuntu. I've 
> tested my code with an iterator writing to csv and everything works as expected. 
> Reading through the flink documentation, I see that I should add jar dependencies 
> to work with avro, so I downloaded three jar files that I believe are required 
> and loaded them like so:
> 
> table_env\
>     .get_config()\
>     .get_configuration()\
>     .set_string(
>         "pipeline.jars",
>         rf"file:///{os.getcwd()}/lib/flink-sql-avro-1.12.2.jar;file:///{os.getcwd()}/lib/flink-avro-1.12.2.jar;file:///{os.getcwd()}/lib/avro-1.10.2.jar"
>     )
> 
> I suspect I'm not loading the jar files correctly, but it's unclear what I'm 
> supposed to do, as I'm not familiar with java. When I switch the sink format 
> to avro I get an unexpected error:
> Py4JJavaError: An error occurred while calling o746.executeInsert.
> : java.lang.NoClassDefFoundError: org/apache/avro/io/DatumWriter
>       at org.apache.flink.formats.avro.AvroFileFormatFactory$1.createRuntimeEncoder(AvroFileFormatFactory.java:71)
>       at org.apache.flink.formats.avro.AvroFileFormatFactory$1.createRuntimeEncoder(AvroFileFormatFactory.java:61)
>       at org.apache.flink.table.filesystem.FileSystemTableSink.createWriter(FileSystemTableSink.java:373)
>       at org.apache.flink.table.filesystem.FileSystemTableSink.createOutputFormatFactory(FileSystemTableSink.java:365)
>       at org.apache.flink.table.filesystem.FileSystemTableSink.createBatchSink(FileSystemTableSink.java:163)
>       at org.apache.flink.table.filesystem.FileSystemTableSink.consume(FileSystemTableSink.java:139)
>       at org.apache.flink.table.filesystem.FileSystemTableSink.lambda$getSinkRuntimeProvider$0(FileSystemTableSink.java:134)
>       at org.apache.flink.table.planner.plan.nodes.common.CommonPhysicalSink.createSinkTransformation(CommonPhysicalSink.scala:95)
>       at org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSink.translateToPlanInternal(BatchExecSink.scala:87)
>       at org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSink.translateToPlanInternal(BatchExecSink.scala:42)
>       at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
>       at org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSink.translateToPlan(BatchExecSink.scala:42)
>       at org.apache.flink.table.planner.delegation.BatchPlanner$$anonfun$translateToPlan$1.apply(BatchPlanner.scala:86)
>       at org.apache.flink.table.planner.delegation.BatchPlanner$$anonfun$translateToPlan$1.apply(BatchPlanner.scala:85)
>       at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>       at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>       at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>       at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>       at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>       at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>       at org.apache.flink.table.planner.delegation.BatchPlanner.translateToPlan(BatchPlanner.scala:85)
>       at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:162)
>       at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1329)
>       at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:676)
>       at org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:572)
>       at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>       at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>       at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>       at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>       at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>       at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>       at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>       at java.base/java.lang.Thread.run(Thread.java:834)
> Caused by: java.lang.ClassNotFoundException: org.apache.avro.io.DatumWriter
>       at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
>       at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
>       at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
> 
> My sample code is as follows:
> 
> import os
> 
> from pyflink.dataset import ExecutionEnvironment
> from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment, EnvironmentSettings
> 
> env_settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
> table_env = BatchTableEnvironment.create(environment_settings=env_settings)
> 
> table_env\
>     .get_config()\
>     .get_configuration()\
>     .set_string(
>         "pipeline.jars",
>         rf"file:///{os.getcwd()}/lib/flink-sql-avro-1.12.2.jar;file:///{os.getcwd()}/lib/flink-avro-1.12.2.jar;file:///{os.getcwd()}/lib/avro-1.10.2.jar"
>     )
> 
> # sample values for 'a' and 'result_path' (defined elsewhere in the full script)
> a = [('hello', 'world'), ('foo', 'bar')]
> result_path = f'{os.getcwd()}/result'
> 
> table = table_env.from_elements(
>     a,
>     schema=DataTypes.ROW([
>         DataTypes.FIELD('text', DataTypes.STRING()),
>         DataTypes.FIELD('text1', DataTypes.STRING())
>     ])
> )
> sink_ddl = f"""
>     create table Results(
>         a STRING,
>         b STRING
>     ) with (
>         'connector' = 'filesystem',
>         'path' = '{result_path}',
>         'format' = 'avro'
>     )
> """
> 
> table_env.execute_sql(sink_ddl)
> table.execute_insert("Results").wait()
> 
> Could someone help or point me in the right direction?
