Hi Shuiqiang, meneldor,
1. In fact, there is a problem with using a Python `Named Row` as the return
value of a user-defined function in PyFlink.
When a Row is serialized, the field serializers are applied in the order of
the Row's fields. However, the fields of a Python `Named Row` are sorted by
field name, a design chosen to make comparing Named Rows and computing their
hash values easier. As a result, the serializers no longer line up with the
fields they belong to, which leads to serialization/deserialization errors.
Serializers are matched by position rather than by field name for performance
reasons; `Named Row` support could be added at the cost of a little
performance in exchange for ease of use. For the current example, I suggest
returning a list or a normal (positional) Row instead of a Named Row.
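2. In PyFlink 1.12.0, both `process_element` and `on_timer` receive a
`Collector` as their last argument: `def process_element(self, value, ctx:
'KeyedProcessFunction.Context', out: Collector)` and `def on_timer(self,
timestamp, ctx: 'KeyedProcessFunction.OnTimerContext', out: Collector)`[1].
To emit data, including from `on_timer`, call `Collector.collect` instead of
using `yield`, e.g.

    def process_element(self, value, ctx: 'KeyedProcessFunction.Context',
                        out: Collector):
        out.collect(Row('a', 'b', 'c'))

    def on_timer(self, timestamp, ctx: 'KeyedProcessFunction.OnTimerContext',
                 out: Collector):
        out.collect(Row('a', 'b', 'c'))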
3. >>> I am not sure if the timestamp field should be included in
output_type_info as I did now.
If you return data with a `timestamp` field, `output_type_info` must also
declare a `timestamp` field. For example, the rows returned in your example
contain `timestamp`, so your `output_type_info` needs to include that field.
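The ordering problem in point 1 can be reproduced without Flink. The sketch below is a simplified model, not PyFlink code: `make_named_row` is a hypothetical stand-in that stores keyword fields sorted by name (the behavior described in point 1), and `SERIALIZERS` is a hypothetical list of per-field encoders applied by position. `serialize_by_name` illustrates the slower, name-based lookup that would make Named Rows safe.

```python
from typing import Any, Callable, List, Tuple

# Hypothetical model of a Named Row: keyword fields are stored sorted by name,
# as described in point 1. This is NOT pyflink.common.Row.
def make_named_row(**fields: Any) -> Tuple[List[str], List[Any]]:
    names = sorted(fields)
    return names, [fields[n] for n in names]

# Declared output type, in the order the user wrote it.
DECLARED_NAMES = ['id', 'tp', 'account', 'device_ts', 'timestamp']

def encode_string(v: Any) -> bytes:
    return str(v).encode('utf-8')

def encode_long(v: Any) -> bytes:
    # int() raises ValueError if a non-numeric string ends up here.
    return int(v).to_bytes(8, 'big', signed=True)

# Hypothetical per-field serializers, one per declared field, matched by index.
SERIALIZERS: List[Callable[[Any], bytes]] = [
    encode_string, encode_string, encode_string, encode_long, encode_long,
]

def serialize_positional(values: List[Any]) -> List[bytes]:
    # Fast path: serializer i is applied to value i; field names are ignored.
    return [ser(v) for ser, v in zip(SERIALIZERS, values)]

def serialize_by_name(names: List[str], values: List[Any]) -> List[bytes]:
    # Slower path: look each declared field up by name before encoding it.
    by_name = dict(zip(names, values))
    return [ser(by_name[n]) for ser, n in zip(SERIALIZERS, DECLARED_NAMES)]

if __name__ == '__main__':
    names, values = make_named_row(id='k1', tp='t1', account='TEST',
                                   device_ts=1111111111, timestamp=1500)
    print(names)  # ['account', 'device_ts', 'id', 'timestamp', 'tp']
    try:
        serialize_positional(values)
    except ValueError as e:
        print('positional serialization failed:', e)
    print(serialize_by_name(names, values)[0])  # b'k1'
```

With the fields from the question, positional serialization silently hands `device_ts` to a STRING encoder and then passes the `tp` string to the `timestamp` LONG encoder, which fails; the name-based variant encodes every field with its intended serializer.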
[1]
https://github.com/apache/flink/blob/release-1.12.0/flink-python/pyflink/datastream/functions.py#L759
Best,
Xingbo
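The Collector-based style from point 2 can be exercised in plain Python as a quick sanity check. This is a minimal sketch under stated assumptions: `ListCollector` and `FakeContext` are hypothetical stand-ins for PyFlink's `Collector` and `KeyedProcessFunction.Context`/`OnTimerContext`, and plain tuples stand in for positional `Row(...)` objects. Only the method shapes follow the 1.12.0 signatures described in point 2.

```python
from typing import Any, List, Tuple


class ListCollector:
    """Hypothetical stand-in for a Collector: it just records what is collected."""

    def __init__(self) -> None:
        self.records: List[Tuple[Any, ...]] = []

    def collect(self, record: Tuple[Any, ...]) -> None:
        self.records.append(record)


class FakeContext:
    """Hypothetical stand-in for KeyedProcessFunction.Context / OnTimerContext."""

    def __init__(self, key: Tuple[str, str], ts: int) -> None:
        self._key = key
        self._ts = ts
        self.timers: List[int] = []

    def get_current_key(self) -> Tuple[str, str]:
        return self._key

    def timestamp(self) -> int:
        return self._ts

    def timer_service(self) -> 'FakeContext':
        # The real API returns a timer service; here the context plays that role.
        return self

    def register_event_time_timer(self, ts: int) -> None:
        self.timers.append(ts)


class MyProcessFunction:
    # Collector style: every record is emitted with out.collect(), not yield.
    def process_element(self, value, ctx, out: ListCollector) -> None:
        key = ctx.get_current_key()
        # Positional record in declared order: id, tp, account, device_ts, timestamp.
        out.collect((key[0], key[1], 'TEST', value[3][2], ctx.timestamp()))
        ctx.timer_service().register_event_time_timer(ctx.timestamp() + 1500)

    def on_timer(self, timestamp, ctx, out: ListCollector) -> None:
        key = ctx.get_current_key()
        # No input value is available here, so device_ts is a placeholder.
        out.collect((key[0], key[1], 'TEST', 1111111111, timestamp))


if __name__ == '__main__':
    fn, out = MyProcessFunction(), ListCollector()
    ctx = FakeContext(key=('k1', 't1'), ts=1000)
    fn.process_element(('a', 'b', 'c', (0, 0, 42)), ctx, out)
    fn.on_timer(ctx.timers[0], ctx, out)
    print(out.records)
    # [('k1', 't1', 'TEST', 42, 1000), ('k1', 't1', 'TEST', 1111111111, 2500)]
```

Both methods emit five-field records in the same positional order, which is what keeps them consistent with a single `output_type_info`.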
> On Jan 18, 2021, at 9:21 PM, meneldor wrote:
>
> Actually, the output_type_info is OK; it was a copy/paste typo. I changed the
> function to:
> class MyProcessFunction(KeyedProcessFunction):
>     def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
>         yield types.Row(id=ctx.get_current_key()[0],
>                         tp=ctx.get_current_key()[1], account="TEST",
>                         device_ts=value[3][2], timestamp=ctx.timestamp())
>         ctx.timer_service().register_event_time_timer(ctx.timestamp() + 1500)
>
>     def on_timer(self, timestamp, ctx: 'KeyedProcessFunction.OnTimerContext'):
>         yield types.Row(id=ctx.get_current_key()[0],
>                         tp=ctx.get_current_key()[1], account="TEST",
>                         device_ts=1111111111, timestamp=timestamp)
> And the type to:
> output_type_info = Types.ROW_NAMED(
>     ['id', 'tp', 'account', 'device_ts', 'timestamp'],
>     [Types.STRING(), Types.STRING(), Types.STRING(), Types.LONG(),
>      Types.LONG()])
> I can't return the same data in on_timer() because there is no value
> parameter. That's why I hardcoded device_ts. However, the exception persists.
> I am not sure whether the timestamp field should be included in
> output_type_info as I did now.
> Regards
>
> On Mon, Jan 18, 2021 at 2:57 PM Shuiqiang Chen wrote:
> Hi meneldor,
>
> Actually, the return type of on_timer() must be the same as that of
> process_element(). It seems that the value yielded by process_element() is
> missing the `timestamp` field. Also, `output_type_info` has four field
> names but five field types. Could you align them?
>
> Best,
> Shuiqiang
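The points above all come down to keeping the returned fields, the declared field names, and the declared field types in step. A small pure-Python check like the following could catch such mismatches before a job is submitted. `check_output_row` is a hypothetical helper, not part of PyFlink, and it uses strings such as `'STRING'` and `'LONG'` as stand-ins for `Types.STRING()` and `Types.LONG()`.

```python
from typing import Any, Dict, List


def check_output_row(field_names: List[str], field_types: List[str],
                     row: Dict[str, Any]) -> List[str]:
    """Return a list of problems; an empty list means the row matches the declared type."""
    problems: List[str] = []
    # Every declared name needs exactly one declared type.
    if len(field_names) != len(field_types):
        problems.append(
            f"{len(field_names)} field names but {len(field_types)} field types")
    # Every declared field must be returned, and nothing undeclared may be returned.
    missing = [n for n in field_names if n not in row]
    extra = [n for n in row if n not in field_names]
    if missing:
        problems.append(f"missing fields: {missing}")
    if extra:
        problems.append(f"undeclared fields: {extra}")
    return problems


if __name__ == '__main__':
    row = {'id': 'k1', 'tp': 't1', 'account': 'TEST',
           'device_ts': 1111111111, 'timestamp': 1500}
    types5 = ['STRING', 'STRING', 'STRING', 'LONG', 'LONG']
    # Four names, five types, and a returned `timestamp` that is not declared.
    print(check_output_row(['id', 'tp', 'account', 'device_ts'], types5, row))
    # Aligned declaration: no problems reported.
    print(check_output_row(['id', 'tp', 'account', 'device_ts', 'timestamp'],
                           types5, row))
```

The first call reports both the name/type count mismatch and the undeclared `timestamp` field; the second, aligned declaration reports nothing.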