Thank you, Xingbo.

1. I will try to use a normal list instead of a named Row. Thanks!
2. There is a new 1.12.1 version of PyFlink, which uses
   process_element(self, value, ctx: 'KeyedProcessFunction.Context')
   with no `out` parameter.
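The signature change in point 2 comes down to how records are emitted: in 1.12.0 the function receives an `out: Collector` and calls `out.collect(...)`, while with the 1.12.1 signature there is no `out` parameter and the function `yield`s its results instead. The plain-Python sketch below contrasts the two styles without PyFlink; `ListCollector`, `CollectorStyle`, `GeneratorStyle` and the two `run_*` drivers are hypothetical stand-ins for the runtime (not PyFlink APIs), and tuples replace `Row`.

```python
# Toy sketch only: ListCollector and the run_* drivers are hypothetical
# stand-ins for the PyFlink runtime, and tuples stand in for Row.
from typing import Any, Iterable, List


class ListCollector:
    """Mimics the collect() method of the 1.12.0 `out: Collector` argument."""

    def __init__(self) -> None:
        self.records: List[Any] = []

    def collect(self, record: Any) -> None:
        self.records.append(record)


class CollectorStyle:
    # 1.12.0 style: results are pushed into `out`; the method returns None.
    def process_element(self, value, ctx, out):
        out.collect((value, 'a', 'b'))


class GeneratorStyle:
    # 1.12.1 style: no `out` parameter; results are produced with `yield`.
    def process_element(self, value, ctx):
        yield (value, 'a', 'b')


def run_collector_style(fn, values: Iterable[Any]) -> List[Any]:
    out = ListCollector()
    for v in values:
        fn.process_element(v, None, out)  # ctx is unused in this toy example
    return out.records


def run_generator_style(fn, values: Iterable[Any]) -> List[Any]:
    records: List[Any] = []
    for v in values:
        # A generator method does nothing until it is iterated, so the
        # driver must consume it to obtain the emitted records.
        records.extend(fn.process_element(v, None))
    return records


print(run_collector_style(CollectorStyle(), [1, 2]))  # [(1, 'a', 'b'), (2, 'a', 'b')]
print(run_generator_style(GeneratorStyle(), [1, 2]))  # [(1, 'a', 'b'), (2, 'a', 'b')]
```

One consequence of the generator style is that the method body runs lazily: nothing in it, including side effects placed after a `yield`, executes until the caller iterates the result.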
And what about on_timer(self, timestamp, ctx: 'KeyedProcessFunction.OnTimerContext')?
Can I access the value through ctx, for example, as in process_element()?

Thank you!

On Mon, Jan 18, 2021 at 4:18 PM Xingbo Huang <hxbks...@gmail.com> wrote:

> Hi Shuiqiang, meneldor,
>
> 1. In fact, there is a problem with using a Python `Named Row` as the
> return value of a user-defined function in PyFlink.
>
> When a Row is serialized, the serializer of each field is applied in the
> order of the Row fields. But the fields of a Python `Named Row` are sorted
> by field name, which was designed to make comparing Named Rows and
> computing their hash values easier. This can lead to
> serialization/deserialization errors (the correspondence between
> serializer and field is wrong). For performance reasons, serializers are
> not matched by field name, but `Named Row` support could be added, at the
> expense of a little performance, for ease of use. For the current example,
> I suggest returning a list or a normal Row instead of a Named Row.
>
> 2. In PyFlink 1.12.0, the method signature of `on_timer` should be
> `def process_element(self, value, ctx: 'KeyedProcessFunction.Context', out: Collector)`[1].
> If you want to send data in `on_timer`, you can use `Collector.collect`,
> e.g.
>
> def process_element(self, value, ctx: 'KeyedProcessFunction.Context', out: Collector):
>     out.collect(Row('a', 'b', 'c'))
>
> 3. >>> I am not sure if the timestamp field should be included in
> output_type_info as I did now.
>
> If you return data with a `timestamp` field, `output_type_info` needs to
> have the `timestamp` field as well. For example, the data returned in your
> example contains `timestamp`, so your `output_type_info` needs to include
> the information of this field.
>
> [1] https://github.com/apache/flink/blob/release-1.12.0/flink-python/pyflink/datastream/functions.py#L759
>
> Best,
> Xingbo
>
> On Jan 18, 2021, at 9:21 PM, meneldor <menel...@gmail.com> wrote:
>
> Actually, the *output_type_info* is OK; it was a copy/paste typo. I changed
> the function to:
>
> class MyProcessFunction(KeyedProcessFunction):
>     def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
>         yield types.Row(id=ctx.get_current_key()[0],
>                         tp=ctx.get_current_key()[1], account="TEST",
>                         device_ts=value[3][2], timestamp=ctx.timestamp())
>         ctx.timer_service().register_event_time_timer(ctx.timestamp() + 1500)
>
>     def on_timer(self, timestamp, ctx: 'KeyedProcessFunction.OnTimerContext'):
>         yield types.Row(id=ctx.get_current_key()[0],
>                         tp=ctx.get_current_key()[1], account="TEST",
>                         device_ts=1111111111, timestamp=timestamp)
>
> And the type to:
>
> output_type_info = Types.ROW_NAMED(['id', 'tp', 'account', 'device_ts', 'timestamp'],
>                                    [Types.STRING(), Types.STRING(),
>                                     Types.STRING(), Types.LONG(), Types.LONG()])
>
> I can't return the same data in *on_timer()* because there is no value
> parameter. That's why I hardcoded *device_ts*. However, the exception
> persists.
>
> I am not sure if the timestamp field should be included in *output_type_info*
> as I did here.
>
> Regards
>
>
> On Mon, Jan 18, 2021 at 2:57 PM Shuiqiang Chen <acqua....@gmail.com>
> wrote:
>
>> Hi meneldor,
>>
>> Actually, the return type of on_timer() must be the same as that of
>> process_element(). It seems that the value yielded by process_element() is
>> missing the `timestamp` field. Also, the `output_type_info` has four field
>> names but five field types. Could you align them?
>>
>> Best,
>> Shuiqiang
>>
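Xingbo's first point can be reproduced without PyFlink. As he explains, a named `Row` stores its values sorted by field name, while the serializers are applied positionally in the order declared in `output_type_info`. The plain-Python sketch below uses the field names and order from meneldor's `Types.ROW_NAMED(...)` declaration; `declared`, `record`, `check_alignment` and the sample values are hypothetical, and Python `str`/`int` stand in for `Types.STRING()`/`Types.LONG()`.

```python
# Plain-Python sketch; no PyFlink required. Python types stand in for
# Types.STRING() / Types.LONG(); the record values are made up.
from typing import Any, List, Sequence, Tuple

# Field order as declared in meneldor's Types.ROW_NAMED(...) call.
declared: List[Tuple[str, type]] = [
    ('id', str), ('tp', str), ('account', str),
    ('device_ts', int), ('timestamp', int),
]

record = dict(id='key-1', tp='type-a', account='TEST',
              device_ts=1111111111, timestamp=1610978400000)


def check_alignment(values: Sequence[Any]) -> List[str]:
    """Return the declared fields whose positional value has the wrong type."""
    if len(values) != len(declared):
        # Shuiqiang's point: the number of names, types and values must agree.
        return ['field count mismatch']
    return [name for (name, typ), v in zip(declared, values)
            if not isinstance(v, typ)]


# What a named Row effectively sends: values ordered by sorted field name
# (account, device_ts, id, timestamp, tp).
named_row_order = [record[k] for k in sorted(record)]
# What a plain Row or list sends when built in the declared order.
positional_order = [record[name] for name, _ in declared]

print(check_alignment(named_row_order))       # ['tp', 'timestamp']
print(check_alignment(positional_order))      # []
print(check_alignment(positional_order[:4]))  # ['field count mismatch']
```

Note that the type check only catches part of the damage: in the sorted order, `id` silently receives the `account` value `'TEST'`. Building the row positionally in the declared order, as Xingbo suggests (for example a normal `types.Row(...)` whose arguments follow the `output_type_info` field order, or a list), keeps each value paired with the right serializer.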