Thank you, Xingbo.

1. I will try using a normal list instead of a named Row. Thanks!
2. There is a new PyFlink version, 1.12.1, in which the signature is
`process_element(self, value, ctx: 'KeyedProcessFunction.Context')`, with no
`out: Collector` parameter.
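You can reproduce the field-ordering problem from point 1 of Xingbo's quoted reply without Flink. The following is a pure-Python simulation, not PyFlink code. It assumes, as Xingbo describes, that a keyword-built Row keeps its values sorted by field name while serializers are matched to fields by position. `make_named_row` and `check_against_types` are hypothetical helpers, and the type list mirrors the `output_type_info` quoted below.

```python
# Simulation only (not the PyFlink API): a keyword-built "named row" stores its
# values sorted by field name, while per-field serializers are matched by position.

DECLARED_NAMES = ['id', 'tp', 'account', 'device_ts', 'timestamp']
DECLARED_TYPES = [str, str, str, int, int]  # STRING, STRING, STRING, LONG, LONG


def make_named_row(**fields):
    """Hypothetical stand-in for a named Row: values ordered by sorted field name."""
    names = sorted(fields)
    return names, [fields[name] for name in names]


def check_against_types(values, types):
    """Return the positions where a value does not match the positional type."""
    return [i for i, (v, t) in enumerate(zip(values, types)) if not isinstance(v, t)]


names, named_values = make_named_row(
    id='k1', tp='t1', account='TEST', device_ts=1111111111, timestamp=1500)
# A plain list keeps the declared order, so positions line up with the types.
plain_values = ['k1', 't1', 'TEST', 1111111111, 1500]

print(names)  # → ['account', 'device_ts', 'id', 'timestamp', 'tp']
print(check_against_types(named_values, DECLARED_TYPES))  # → [1, 4]
print(check_against_types(plain_values, DECLARED_TYPES))  # → []
```

The mismatched positions are exactly where a serializer built for one field would receive a value belonging to another. This is why returning a plain list or a positional Row avoids the error.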

And what about `on_timer(self, timestamp, ctx:
'KeyedProcessFunction.OnTimerContext')`? Can I access the element value there,
for example through the ctx, the way process_element() receives it?
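As far as I know, the timer context does not carry the element that registered the timer. A common pattern is to save whatever `on_timer()` will need into keyed state inside `process_element()`, for example a `ValueState` (assuming your PyFlink version exposes state access), and read it back when the timer fires. The sketch below only simulates that pattern in plain Python. The class and its dict-based "state" are hypothetical stand-ins, not the PyFlink API, and both methods emit with `yield` as in 1.12.1.

```python
# Simulation only: a dict keyed by the current key stands in for Flink keyed
# state (e.g. a ValueState); none of these names are PyFlink APIs.
from typing import Dict, Iterator, List, Tuple


class SimulatedKeyedProcessFunction:
    """Hypothetical stand-in showing how on_timer can reuse data from process_element."""

    def __init__(self) -> None:
        self.last_value: Dict[str, Tuple] = {}   # per-key "state"
        self.timers: List[Tuple[int, str]] = []  # registered (fire_time, key) pairs

    def process_element(self, key: str, value: Tuple, ts: int) -> Iterator[list]:
        self.last_value[key] = value           # remember the element for on_timer
        self.timers.append((ts + 1500, key))   # analogous to register_event_time_timer
        yield [key, value[0], ts]

    def on_timer(self, key: str, timestamp: int) -> Iterator[list]:
        value = self.last_value.get(key)       # no element is passed in; read "state"
        if value is not None:
            yield [key, value[0], timestamp]   # same shape as process_element output


fn = SimulatedKeyedProcessFunction()
out = list(fn.process_element('k1', (1111111111,), 1000))
for fire_at, key in sorted(fn.timers):        # fire timers in time order
    out.extend(fn.on_timer(key, fire_at))
print(out)  # → [['k1', 1111111111, 1000], ['k1', 1111111111, 2500]]
```

With this approach, `device_ts` in `on_timer()` comes from the stored element rather than a hardcoded value, and both methods emit rows of the same shape.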

Thank you!

On Mon, Jan 18, 2021 at 4:18 PM Xingbo Huang <hxbks...@gmail.com> wrote:

> Hi Shuiqiang, meneldor,
>
> 1. In fact, there is a problem with using a Python `Named Row` as the return
> value of a user-defined function in PyFlink.
>
> When Row data is serialized, the serializer for each field is assigned
> according to the order of the Row fields. However, the fields of a Python
> `Named Row` are sorted by field name, a design meant to make comparing Named
> Rows and computing their hash values easier. This can lead to
> serialization/deserialization errors (the correspondence between serializers
> and fields is wrong). For performance reasons, serializers are not matched
> by field name; `Named Row` support could be added at the cost of a little
> performance for ease of use. For the current example, I suggest returning a
> list or a normal Row instead of a Named Row.
>
>
> 2. In PyFlink 1.12.0, the `KeyedProcessFunction` methods take an extra
> `out: Collector` parameter, e.g. `def process_element(self, value, ctx:
> 'KeyedProcessFunction.Context', out: Collector)`[1]; `on_timer` likewise
> takes `out: Collector`. If you want to send data, including from `on_timer`,
> use `Collector.collect`, e.g.:
>
> def process_element(self, value, ctx: 'KeyedProcessFunction.Context', out: Collector):
>     out.collect(Row('a', 'b', 'c'))
>
> 3. >>> I am not sure if the timestamp field should be included in
> output_type_info as I did now.
>
> If you return data with a `timestamp` field, `output_type_info` needs to
> declare that field as well. The data returned in your example contains
> `timestamp`, so your `output_type_info` must include the type information
> for this field.
>
>
> [1]
> https://github.com/apache/flink/blob/release-1.12.0/flink-python/pyflink/datastream/functions.py#L759
>
> Best,
> Xingbo
>
> On Jan 18, 2021, at 9:21 PM, meneldor <menel...@gmail.com> wrote:
>
> Actually, the *output_type_info* is OK; it was a copy/paste typo. I changed
> the function to:
>
> class MyProcessFunction(KeyedProcessFunction):
>     def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
>         yield types.Row(id=ctx.get_current_key()[0],
>                         tp=ctx.get_current_key()[1], account="TEST",
>                         device_ts=value[3][2], timestamp=ctx.timestamp())
>         ctx.timer_service().register_event_time_timer(ctx.timestamp() + 1500)
>
>     def on_timer(self, timestamp, ctx: 'KeyedProcessFunction.OnTimerContext'):
>         yield types.Row(id=ctx.get_current_key()[0],
>                         tp=ctx.get_current_key()[1], account="TEST",
>                         device_ts=1111111111, timestamp=timestamp)
>
> And the type to:
>
> output_type_info = Types.ROW_NAMED(['id', 'tp', 'account', 'device_ts', 'timestamp'],
>                                    [Types.STRING(), Types.STRING(), Types.STRING(),
>                                     Types.LONG(), Types.LONG()])
>
> I can't return the same data in *on_timer()* because it has no value
> parameter. That's why I hardcoded *device_ts*. However, the exception
> persists.
>
> I am not sure whether the timestamp field should be included in
> *output_type_info*, as I have done now.
>
> Regards
>
>
> On Mon, Jan 18, 2021 at 2:57 PM Shuiqiang Chen <acqua....@gmail.com>
> wrote:
>
>> Hi meneldor,
>>
>> Actually, the return type of on_timer() must be the same as that of
>> process_element(). It seems that the value yielded by process_element() is
>> missing the `timestamp` field. Also, `output_type_info` has four field
>> names but five field types. Could you align them?
>>
>> Best,
>> Shuiqiang
>>
>
>
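This thread raises two consistency requirements. First, the field names and field types in `output_type_info` must line up. Second, every field the function emits, including `timestamp`, must be declared. Both can be checked mechanically before running a job. The following plain-Python sketch uses a hypothetical helper, `check_output_type_info`, and writes type names as strings rather than PyFlink `Types` objects.

```python
# Simulation only: a plain-Python sanity check, not a PyFlink API.
from typing import List, Sequence


def check_output_type_info(names: Sequence[str], types: Sequence[str],
                           row: Sequence) -> List[str]:
    """Return the problems found between declared type info and an emitted row."""
    problems = []
    if len(names) != len(types):
        problems.append(f'{len(names)} field names but {len(types)} field types')
    if len(row) != len(types):
        problems.append(f'row has {len(row)} values but {len(types)} field types')
    return problems


types_ = ['STRING', 'STRING', 'STRING', 'LONG', 'LONG']
four_names = ['id', 'tp', 'account', 'device_ts']
five_names = four_names + ['timestamp']
row = ['k1', 't1', 'TEST', 1111111111, 1500]

print(check_output_type_info(four_names, types_, row))      # → ['4 field names but 5 field types']
print(check_output_type_info(five_names, types_, row))      # → []
print(check_output_type_info(five_names, types_, row[:4]))  # → ['row has 4 values but 5 field types']
```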
