Hi Kevin,

Currently, POJO types are not supported in the Python DataStream API
because it is hard to handle the conversion between Python objects and
Java objects. Maybe you can use a Row type to represent the POJO class,
e.g. Types.ROW_NAMED(['id', 'created_at', 'updated_at'], [Types.LONG(),
Types.LONG(), Types.LONG()]). We will try to support POJO types in the
future.
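
For example, a rough sketch of what that could look like (assuming
Flink 1.12; the from_collection input here is just a hypothetical
stand-in for your custom source, which would need to emit rows rather
than the Scala case class):

```
from pyflink.common import Row
from pyflink.common.typeinfo import Types
from pyflink.datastream import MapFunction, StreamExecutionEnvironment

# A Row type mirroring the Scala Record's fields; names are plain strings.
record_type = Types.ROW_NAMED(
    ['id', 'created_at', 'updated_at'],
    [Types.LONG(), Types.LONG(), Types.LONG()])

class Mutate(MapFunction):
    def map(self, value):
        # Fields arrive in the declared order: id, created_at, updated_at.
        print(value[0])
        # Return a new Row rather than mutating the input in place.
        return Row(123, value[1], value[2])

env = StreamExecutionEnvironment.get_execution_environment()
# In-memory stand-in for the custom source.
records = env.from_collection([(1, 100, 200)], type_info=record_type)
records = records.map(Mutate(), output_type=record_type)
records.print()
env.execute('row_type_example')
```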

Best,
Shuiqiang

Kevin Lam <kevin....@shopify.com> wrote on Mon, Mar 15, 2021, at 10:46 PM:

> Hi all,
>
> Looking to use PyFlink to work with some Scala-defined objects being
> emitted from a custom source. When trying to manipulate the objects in a
> PyFlink-defined MapFunction
> <https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/python/pyflink.datastream.html#pyflink.datastream.MapFunction>,
> I'm hitting an error like:
>
> Caused by: java.lang.UnsupportedOperationException: The type information:
> Option[<...>$Record(id: Long, created_at: Option[Long], updated_at:
> Option[Long])] is not supported in PyFlink currently.
>
> The scala object is defined something like:
>
> ```
> object <...> {
>   case class Record(
>     id: Long,
>     created_at: Option[Long],
>     updated_at: Option[Long],
>     ...
>   )
> }
> ```
>
> The pyflink code is something like:
>
> ```
> class Mutate(MapFunction):
>   def map(self, value):
>     print(value.id)
>     value.id = 123
>     return value
>
> ...
>
> records = env.add_source(..)
> records = records.map(Mutate())
> ```
>
> Can you provide any advice on how to work with these kinds of objects in
> Pyflink?
>
> Thanks in advance!
>
