Re: Writing to Avro from pyflink

2021-04-26 Thread Edward Yang
Hi Dian,

Thanks for trying it out; that ruled out a problem with the Python code. I
double-checked the jar path and included only the jar you referenced, but
still had no luck. However, I then created a Python 3.7 environment (I had
been on 3.8) for PyFlink, and the code ran without any errors!
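
For reference, a small guard along these lines (a sketch; treating 3.7 as the only "verified" version is an assumption based on what worked here) surfaces this kind of interpreter mismatch up front:

import sys

# The job failed under a Python 3.8 environment but ran cleanly under 3.7,
# so make the interpreter explicit before building the pipeline.
print(f"PyFlink job running under {sys.executable} "
      f"(Python {sys.version_info.major}.{sys.version_info.minor})")
if sys.version_info[:2] != (3, 7):
    raise RuntimeError("this job has only been verified under Python 3.7")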


On Sun, Apr 25, 2021, 10:09 PM Dian Fu wrote:

> [quoted reply trimmed; Dian's message of 2021-04-25 appears in full below in this thread]

Re: Writing to Avro from pyflink

2021-04-25 Thread Dian Fu
Hi Eddie,

I tried your program with the following changes and it executed
successfully:
- Replaced
`rf"file:///{os.getcwd()}/lib/flink-sql-avro-1.12.2.jar;file:///{os.getcwd()}/lib/flink-avro-1.12.2.jar;file:///{os.getcwd()}/lib/avro-1.10.2.jar"`
with
`rf"file:///Users/dianfu/code/src/apache/flink/flink-sql-avro-1.12.3.jar"`
- Used flink-sql-avro-1.12.3.jar [1] instead of flink-sql-avro-1.12.2.jar, as
I had encountered FLINK-21012 [2], which is addressed in 1.12.3

For your problem, I suspect that
`file:///{os.getcwd()}/lib/flink-sql-avro-1.12.2.jar` may not actually
exist. Could you double-check that?

[1] 
https://repository.apache.org/content/repositories/orgapacheflink-1419/org/apache/flink/flink-sql-avro/1.12.3/flink-sql-avro-1.12.3.jar
[2] https://issues.apache.org/jira/browse/FLINK-21012 


Regards,
Dian
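
A quick way to perform that existence check before handing the URL to pipeline.jars, as a sketch (the lib/ location and jar name are assumptions carried over from Eddie's snippet):

import os

# Verify the file behind the file:/// URL exists; a missing jar only shows
# up much later as a NoClassDefFoundError inside the JVM.
jar_path = os.path.join(os.getcwd(), "lib", "flink-sql-avro-1.12.2.jar")
assert os.path.isfile(jar_path), f"jar not found: {jar_path}"
jar_url = "file://" + jar_path  # jar_path is absolute, so this yields file:///...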

> On Apr 25, 2021, at 11:56 PM, Edward Yang wrote:
>
> [quoted reply trimmed; Edward's message of 2021-04-25 appears in full below in this thread]

Re: Writing to Avro from pyflink

2021-04-25 Thread Edward Yang
Hi Dian,

I tried your suggestion but unfortunately got the same error message. I
also tried file:/ and file:// with the same error, so I'm not sure what's
going on. I assume writing to Avro works fine in Java and Scala?

Eddie
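
One note on those URL variants, as an illustrative sketch (the jar path is an assumption): on Linux, os.getcwd() already begins with "/", so the original rf"file:///{os.getcwd()}/..." string ends up with four slashes (file:////home/...), which some URI parsers may resolve differently from the canonical file:///home/... form. pathlib builds the canonical form directly:

import os
import pathlib

# as_uri() produces the standard file:///absolute/path form for an
# absolute path, avoiding hand-assembled slash counts.
jar_url = pathlib.Path(os.getcwd(), "lib", "flink-sql-avro-1.12.2.jar").as_uri()
print(jar_url)  # e.g. file:///home/eddie/project/lib/flink-sql-avro-1.12.2.jar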

On Sat, Apr 24, 2021 at 10:03 PM Dian Fu wrote:

> [quoted reply trimmed; Dian's message of 2021-04-24 appears in full below in this thread]

Re: Writing to Avro from pyflink

2021-04-24 Thread Dian Fu
I guess you only need file:///{os.getcwd()}/lib/flink-sql-avro-1.12.2.jar.
Could you remove flink-avro-1.12.2.jar and avro-1.10.2.jar and try again?

Regards,
Dian
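
In code, that suggestion amounts to something like the following sketch (the environment construction and lib/ path are assumptions; flink-sql-avro is the self-contained bundle for the Avro format, so it should be the only Avro jar needed):

import os
from pyflink.table import EnvironmentSettings, TableEnvironment

# Register only the single flink-sql-avro fat jar, which bundles
# flink-avro and avro, instead of listing all three jars separately.
env_settings = EnvironmentSettings.new_instance().in_batch_mode().build()
table_env = TableEnvironment.create(env_settings)
table_env.get_config().get_configuration().set_string(
    "pipeline.jars",
    "file://" + os.path.join(os.getcwd(), "lib", "flink-sql-avro-1.12.2.jar"),
)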

> On Apr 24, 2021, at 8:29 AM, Edward Yang wrote:
> 
> I've been trying to write to the Avro format with PyFlink 1.12.2 on Ubuntu.
> I've tested my code with an iterator writing to CSV, and everything works as
> expected. Reading through the Flink documentation, I see that I should add
> jar dependencies to work with Avro. I downloaded three jar files that I
> believe are required for Avro, like so:
> 
> table_env \
>     .get_config() \
>     .get_configuration() \
>     .set_string(
>         "pipeline.jars",
>         rf"file:///{os.getcwd()}/lib/flink-sql-avro-1.12.2.jar;file:///{os.getcwd()}/lib/flink-avro-1.12.2.jar;file:///{os.getcwd()}/lib/avro-1.10.2.jar"
>     )
> 
> I suspect I'm not loading the jar files correctly, but it's unclear what I'm
> supposed to do, as I'm not familiar with Java. When I switch the sink format
> to avro I get some unexpected errors:
> Py4JJavaError: An error occurred while calling o746.executeInsert.
> : java.lang.NoClassDefFoundError: org/apache/avro/io/DatumWriter
>   at org.apache.flink.formats.avro.AvroFileFormatFactory$1.createRuntimeEncoder(AvroFileFormatFactory.java:71)
>   at org.apache.flink.formats.avro.AvroFileFormatFactory$1.createRuntimeEncoder(AvroFileFormatFactory.java:61)
>   at org.apache.flink.table.filesystem.FileSystemTableSink.createWriter(FileSystemTableSink.java:373)
>   at org.apache.flink.table.filesystem.FileSystemTableSink.createOutputFormatFactory(FileSystemTableSink.java:365)
>   at org.apache.flink.table.filesystem.FileSystemTableSink.createBatchSink(FileSystemTableSink.java:163)
>   at org.apache.flink.table.filesystem.FileSystemTableSink.consume(FileSystemTableSink.java:139)
>   at org.apache.flink.table.filesystem.FileSystemTableSink.lambda$getSinkRuntimeProvider$0(FileSystemTableSink.java:134)
>   at org.apache.flink.table.planner.plan.nodes.common.CommonPhysicalSink.createSinkTransformation(CommonPhysicalSink.scala:95)
>   at org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSink.translateToPlanInternal(BatchExecSink.scala:87)
>   at org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSink.translateToPlanInternal(BatchExecSink.scala:42)
>   at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
>   at org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSink.translateToPlan(BatchExecSink.scala:42)
>   at org.apache.flink.table.planner.delegation.BatchPlanner$$anonfun$translateToPlan$1.apply(BatchPlanner.scala:86)
>   at org.apache.flink.table.planner.delegation.BatchPlanner$$anonfun$translateToPlan$1.apply(BatchPlanner.scala:85)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at org.apache.flink.table.planner.delegation.BatchPlanner.translateToPlan(BatchPlanner.scala:85)
>   at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:162)
>   at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1329)
>   at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:676)
>   at org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:572)
>   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>   at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>   at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>   at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>   at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>   at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)