Hi Shuiqiang Chen,

Thanks for the quick response. Ah I see, that's too bad that POJO types aren't
currently supported.

I'd like to check if I understand your suggestion about RowType. You're
suggesting something like:

1/ Define subclasses of RowType in Java/Scala to hold the Java objects we
want to manipulate in Python.
2/ When datastreams/sources emit objects of this type, PyFlink can read from
and mutate these Java-defined RowTypes as needed, because Python doesn't know
how to handle arbitrary POJOs but does know how to handle RowType objects.

Is that correct? A simple example of extending/using RowType would be
helpful if you have a chance.
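
For concreteness, here's a rough sketch of what I imagine the Python side
looking like. I'm guessing at the details (the field names/types, whether Row
fields are accessed by position or by name, and the output_type argument), so
please correct anything that's off:

```
# Rough sketch only: the field names/types and the from_collection stand-in
# source are my assumptions, not our real pipeline.
from pyflink.common import Row
from pyflink.common.typeinfo import Types
from pyflink.datastream import MapFunction, StreamExecutionEnvironment

# The Scala Record(id, created_at, updated_at) expressed as a named row type.
record_type = Types.ROW_NAMED(
    ['id', 'created_at', 'updated_at'],
    [Types.LONG(), Types.LONG(), Types.LONG()])


class Mutate(MapFunction):
    def map(self, value):
        # value should arrive as a Row; indexing by position here since I'm
        # not sure field-name access survives serialization.
        print(value[0])
        # Return a new Row rather than mutating the incoming one in place.
        return Row(123, value[1], value[2])


env = StreamExecutionEnvironment.get_execution_environment()
# Stand-in for our custom source, just to keep the example self-contained.
records = env.from_collection(
    [Row(1, 1615800000, 1615800001)], type_info=record_type)
records = records.map(Mutate(), output_type=record_type)
records.print()
env.execute("row_type_sketch")
```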

Thanks again for all your help, here and in the other threads on this
mailing list, really appreciate it!!

On Mon, Mar 15, 2021 at 11:59 AM Shuiqiang Chen <acqua....@gmail.com> wrote:

> Hi Kevin,
>
> Currently, POJO type is not supported in Python DataStream API because it
> is hard to deal with the conversion between Python Objects and Java
> Objects. Maybe you can use a RowType to represent the POJO class such as
> Types.ROW_NAMED(['id', 'created_at', 'updated_at'], [Types.LONG(), Types.LONG(),
> Types.LONG()]). We will try to support the POJO type in the future.
>
> Best,
> Shuiqiang
>
> Kevin Lam <kevin....@shopify.com> 于2021年3月15日周一 下午10:46写道:
>
>> Hi all,
>>
>> Looking to use Pyflink to work with some scala-defined objects being
>> emitted from a custom source. When trying to manipulate the objects in a
>> pyflink defined MapFunction
>> <https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/python/pyflink.datastream.html#pyflink.datastream.MapFunction>,
>> I'm hitting an error like:
>>
>> Caused by: java.lang.UnsupportedOperationException: The type information:
>> Option[<...>$Record(id: Long, created_at: Option[Long], updated_at:
>> Option[Long])] is not supported in PyFlink currently.
>>
>> The scala object is defined something like:
>>
>> ```
>> object <...> {
>>   case class Record(
>>     id: Long,
>>     created_at: Option[Long],
>>     updated_at: Option[Long],
>>     ...
>>   )
>> }
>> ```
>>
>> The pyflink code is something like:
>>
>> ```
>> class Mutate(MapFunction):
>>   def map(self, value):
>>     print(value.id)
>>     value.id = 123
>>
>> ...
>>
>> records = env.add_source(..)
>> records = records.map(Mutate())
>> ```
>>
>> Can you provide any advice on how to work with these kinds of objects in
>> Pyflink?
>>
>> Thanks in advance!
>>
>
