[ 
https://issues.apache.org/jira/browse/FLINK-31099?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Huang Xingbo resolved FLINK-31099.
----------------------------------
    Resolution: Fixed

Merged into master via ca770b3d905936d8a93071210bd6542b6733221d
Merged into release-1.17 via c7c035a2413c04cd75948d8364e0770b97499901
Merged into release-1.16 via e3c0060e7fca53e0e01cb91e00607c8146b85604

> Chained WindowOperator throws NPE in PyFlink ThreadMode
> -------------------------------------------------------
>
>                 Key: FLINK-31099
>                 URL: https://issues.apache.org/jira/browse/FLINK-31099
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Python
>    Affects Versions: 1.17.0, 1.16.1
>            Reporter: Huang Xingbo
>            Assignee: Huang Xingbo
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.17.0, 1.16.2
>
>
> Test case
> {code:python}
> from abc import ABC
>
> from pyflink.common import Configuration, Time, Types, WatermarkStrategy
> from pyflink.common.watermark_strategy import TimestampAssigner
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.datastream.window import TumblingEventTimeWindows
>
> config = Configuration()
> # As reported, the mode is "process"; the issue title refers to thread mode ("thread").
> config.set_string("python.execution-mode", "process")
> env = StreamExecutionEnvironment.get_execution_environment(config)
>
> class MyTimestampAssigner(TimestampAssigner, ABC):
>     # Use the first tuple field (epoch milliseconds) as the event timestamp.
>     def extract_timestamp(self, value: tuple, record_timestamp: int) -> int:
>         return value[0]
>
> ds = env.from_collection(
>     [(1676461680000, "a1", "b1", 1), (1676461680000, "a1", "b1", 1),
>      (1676461680000, "a2", "b2", 1), (1676461680000, "a1", "b2", 1),
>      (1676461740000, "a1", "b1", 1), (1676461740000, "a2", "b2", 1)]
> ).assign_timestamps_and_watermarks(
>     WatermarkStrategy.for_monotonous_timestamps()
>     .with_timestamp_assigner(MyTimestampAssigner())
> )
>
> # Window reduce followed directly by a chained map.
> ds.key_by(
>     lambda x: (x[0], x[1], x[2])
> ).window(
>     TumblingEventTimeWindows.of(Time.minutes(1))
> ).reduce(
>     lambda x, y: (x[0], x[1], x[2], x[3] + y[3]),
>     output_type=Types.TUPLE(
>         [Types.LONG(), Types.STRING(), Types.STRING(), Types.INT()])
> # ).filter(
> #     lambda x: x[1] == "a1"
> ).map(
>     lambda x: (x[0], x[1], x[3]),
>     output_type=Types.TUPLE([Types.LONG(), Types.STRING(), Types.INT()])
> ).print()
>
> env.execute()
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
