Hi Dian,

Thanks for trying it out; that ruled out a problem with the Python code. I
double-checked the jar path and included only the jar you referenced, but
had no luck. However, after creating a Python 3.7 environment for pyflink
(I had been on 3.8), the code ran without any errors!
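
For anyone hitting the same thing, a quick way to confirm which interpreter
and pyflink install a script actually picks up (nothing here beyond the
standard library and importing the pyflink package):

import sys
import pyflink

print(sys.version)       # interpreter the script runs on, e.g. 3.7.x vs 3.8.x
print(pyflink.__file__)  # which environment's pyflink install is being used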


On Sun, Apr 25, 2021, 10:09 PM Dian Fu <dian0511...@gmail.com> wrote:

> Hi Eddie,
>
> I have tried your program with the following changes and it could execute
> successfully:
> - Replace
> `rf"file:///{os.getcwd()}/lib/flink-sql-avro-1.12.2.jar;file:///{os.getcwd()}/lib/flink-avro-1.12.2.jar;file:///{os.getcwd()}/lib/avro-1.10.2.jar"`
> with
> `rf"file:///Users/dianfu/code/src/apache/flink/flink-sql-avro-1.12.3.jar"` (the full setting is shown after this list)
> - Use flink-sql-avro-1.12.3.jar [1] instead of flink-sql-avro-1.12.2.jar
> as I encountered issue FLINK-21012 [2] which has been addressed in 1.12.3
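>
> For completeness, the full pipeline.jars setting after these changes looks
> roughly like this (the path is simply where the 1.12.3 jar sits on my
> machine; adjust it to your own location):
>
> table_env\
>     .get_config()\
>     .get_configuration()\
>     .set_string(
>         "pipeline.jars",
>         "file:///Users/dianfu/code/src/apache/flink/flink-sql-avro-1.12.3.jar"
>     )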
>
> For your problem, I suspect that
> `file:///{os.getcwd()}/lib/flink-sql-avro-1.12.2.jar` may not actually
> exist. Could you double check that?
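>
> A quick way to check that from within the same script (standard library
> only, nothing pyflink-specific):
>
> import os
>
> # the same path passed to pipeline.jars, minus the file:/// scheme
> jar_path = os.path.join(os.getcwd(), "lib", "flink-sql-avro-1.12.2.jar")
> print(jar_path, os.path.isfile(jar_path))  # should print True if the jar is really there
>
> Keep in mind that os.getcwd() is the directory the script is launched from,
> which is not necessarily the directory the script file lives in.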
>
> [1]
> https://repository.apache.org/content/repositories/orgapacheflink-1419/org/apache/flink/flink-sql-avro/1.12.3/flink-sql-avro-1.12.3.jar
> [2] https://issues.apache.org/jira/browse/FLINK-21012
>
> Regards,
> Dian
>
> On Apr 25, 2021, at 11:56 PM, Edward Yang <eddiepy...@gmail.com> wrote:
>
> Hi Dian,
>
> I tried your suggestion but unfortunately got the same error message. I
> also tried file:/ and file:// with the same error. I'm not sure what's
> going on; I assume writing to Avro works fine in Java and Scala?
>
> Eddie
>
> On Sat, Apr 24, 2021 at 10:03 PM Dian Fu <dian0511...@gmail.com> wrote:
>
>> I guess you only need file:///{os.getcwd()}/lib/flink-sql-avro-1.12.2.jar.
>> Could you remove flink-avro-1.12.2.jar and avro-1.10.2.jar and try again?
>>
>> Regards,
>> Dian
>>
>> On Apr 24, 2021, at 8:29 AM, Edward Yang <eddiepy...@gmail.com> wrote:
>>
>> I've been trying to write to the Avro format with pyflink 1.12.2 on
>> Ubuntu. I've tested my code with an iterator writing to CSV, and
>> everything works as expected. Reading through the Flink documentation, I
>> see that I should add jar dependencies to work with Avro, so I downloaded
>> the three jar files that I believe are required and registered them like so:
>>
>> table_env\
>>     .get_config()\
>>     .get_configuration()\
>>     .set_string(
>>         "pipeline.jars",
>>         rf"file:///{os.getcwd()}/lib/flink-sql-avro-1.12.2.jar;file:///{os.getcwd()}/lib/flink-avro-1.12.2.jar;file:///{os.getcwd()}/lib/avro-1.10.2.jar"
>>     )
>>
>> I suspect I'm not loading the jar files correctly, but it's unclear what
>> I'm supposed to do, as I'm not familiar with Java. When I switch the sink
>> format to avro I get some unexpected errors:
>>
>> Py4JJavaError: An error occurred while calling o746.executeInsert.
>> : java.lang.NoClassDefFoundError: org/apache/avro/io/DatumWriter
>>      at org.apache.flink.formats.avro.AvroFileFormatFactory$1.createRuntimeEncoder(AvroFileFormatFactory.java:71)
>>      at org.apache.flink.formats.avro.AvroFileFormatFactory$1.createRuntimeEncoder(AvroFileFormatFactory.java:61)
>>      at org.apache.flink.table.filesystem.FileSystemTableSink.createWriter(FileSystemTableSink.java:373)
>>      at org.apache.flink.table.filesystem.FileSystemTableSink.createOutputFormatFactory(FileSystemTableSink.java:365)
>>      at org.apache.flink.table.filesystem.FileSystemTableSink.createBatchSink(FileSystemTableSink.java:163)
>>      at org.apache.flink.table.filesystem.FileSystemTableSink.consume(FileSystemTableSink.java:139)
>>      at org.apache.flink.table.filesystem.FileSystemTableSink.lambda$getSinkRuntimeProvider$0(FileSystemTableSink.java:134)
>>      at org.apache.flink.table.planner.plan.nodes.common.CommonPhysicalSink.createSinkTransformation(CommonPhysicalSink.scala:95)
>>      at org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSink.translateToPlanInternal(BatchExecSink.scala:87)
>>      at org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSink.translateToPlanInternal(BatchExecSink.scala:42)
>>      at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
>>      at org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSink.translateToPlan(BatchExecSink.scala:42)
>>      at org.apache.flink.table.planner.delegation.BatchPlanner$$anonfun$translateToPlan$1.apply(BatchPlanner.scala:86)
>>      at org.apache.flink.table.planner.delegation.BatchPlanner$$anonfun$translateToPlan$1.apply(BatchPlanner.scala:85)
>>      at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>      at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>      at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>>      at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>>      at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>>      at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>>      at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>>      at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>>      at org.apache.flink.table.planner.delegation.BatchPlanner.translateToPlan(BatchPlanner.scala:85)
>>      at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:162)
>>      at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1329)
>>      at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:676)
>>      at org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:572)
>>      at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>      at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>      at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>      at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>>      at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>>      at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>>      at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>>      at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>>      at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>>      at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>>      at java.base/java.lang.Thread.run(Thread.java:834)
>> Caused by: java.lang.ClassNotFoundException: org.apache.avro.io.DatumWriter
>>      at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
>>      at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
>>      at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
>>
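>> In case it helps with diagnosis: the class the error complains about can
>> be searched for in the downloaded jars with something like this (plain
>> zipfile from the standard library, no Flink involved; the file names below
>> are just the three jars from the snippet above):
>>
>> import zipfile
>>
>> for jar in ["flink-sql-avro-1.12.2.jar", "flink-avro-1.12.2.jar", "avro-1.10.2.jar"]:
>>     with zipfile.ZipFile(f"lib/{jar}") as zf:
>>         # print every entry whose name ends with the missing class
>>         print(jar, [n for n in zf.namelist() if n.endswith("avro/io/DatumWriter.class")])
>>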
>> My sample code is as follows:
>>
>> import os
>>
>> from pyflink.dataset import ExecutionEnvironment
>> from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment, EnvironmentSettings
>>
>> env_settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
>> table_env = BatchTableEnvironment.create(environment_settings=env_settings)
>>
>> table_env\
>>     .get_config()\
>>     .get_configuration()\
>>     .set_string(
>>         "pipeline.jars",
>>         rf"file:///{os.getcwd()}/lib/flink-sql-avro-1.12.2.jar;file:///{os.getcwd()}/lib/flink-avro-1.12.2.jar;file:///{os.getcwd()}/lib/avro-1.10.2.jar"
>>     )
>>
>> table = table_env.from_elements(
>>     a,
>>     schema=DataTypes.ROW([
>>         DataTypes.FIELD('text', DataTypes.STRING()),
>>         DataTypes.FIELD('text1', DataTypes.STRING())
>>     ])
>> )
>>
>> sink_ddl = f"""
>>     create table Results(
>>         a STRING,
>>         b STRING
>>     ) with (
>>         'connector' = 'filesystem',
>>         'path' = '{result_path}',
>>         'format' = 'avro'
>>     )
>> """
>>
>> table_env.execute_sql(sink_ddl)
>> table.execute_insert("Results").wait()
>>
>> Could someone help or point me in the right direction?
>>
>>
>>
>
