[jira] [Commented] (FLINK-29461) ProcessDataStreamStreamingTests.test_process_function unstable
[ https://issues.apache.org/jira/browse/FLINK-29461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646050#comment-17646050 ]

Xingbo Huang commented on FLINK-29461:
--

[~mapohl] I have prepared a PR to make the test more stable.

> ProcessDataStreamStreamingTests.test_process_function unstable
> --
>
> Key: FLINK-29461
> URL: https://issues.apache.org/jira/browse/FLINK-29461
> Project: Flink
> Issue Type: Bug
> Components: API / Python
> Affects Versions: 1.16.0, 1.17.0
> Reporter: Huang Xingbo
> Assignee: Huang Xingbo
> Priority: Critical
> Labels: pull-request-available, test-stability
>
> {code:java}
> 2022-09-29T02:10:45.3571648Z Sep 29 02:10:45 self = testMethod=test_process_function>
> 2022-09-29T02:10:45.3572279Z Sep 29 02:10:45
> 2022-09-29T02:10:45.3572810Z Sep 29 02:10:45     def test_process_function(self):
> 2022-09-29T02:10:45.3573495Z Sep 29 02:10:45         self.env.set_parallelism(1)
> 2022-09-29T02:10:45.3574148Z Sep 29 02:10:45         self.env.get_config().set_auto_watermark_interval(2000)
> 2022-09-29T02:10:45.3580634Z Sep 29 02:10:45         self.env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
> 2022-09-29T02:10:45.3583194Z Sep 29 02:10:45         data_stream = self.env.from_collection([(1, '1603708211000'),
> 2022-09-29T02:10:45.3584515Z Sep 29 02:10:45                                                 (2, '1603708224000'),
> 2022-09-29T02:10:45.3585957Z Sep 29 02:10:45                                                 (3, '1603708226000'),
> 2022-09-29T02:10:45.3587132Z Sep 29 02:10:45                                                 (4, '1603708289000')],
> 2022-09-29T02:10:45.3588094Z Sep 29 02:10:45                                                type_info=Types.ROW([Types.INT(), Types.STRING()]))
> 2022-09-29T02:10:45.3589090Z Sep 29 02:10:45
> 2022-09-29T02:10:45.3589949Z Sep 29 02:10:45         class MyProcessFunction(ProcessFunction):
> 2022-09-29T02:10:45.3590710Z Sep 29 02:10:45
> 2022-09-29T02:10:45.3591856Z Sep 29 02:10:45             def process_element(self, value, ctx):
> 2022-09-29T02:10:45.3592873Z Sep 29 02:10:45                 current_timestamp = ctx.timestamp()
> 2022-09-29T02:10:45.3593862Z Sep 29 02:10:45                 current_watermark = ctx.timer_service().current_watermark()
> 2022-09-29T02:10:45.3594915Z Sep 29 02:10:45                 yield "current timestamp: {}, current watermark: {}, current_value: {}"\
> 2022-09-29T02:10:45.3596201Z Sep 29 02:10:45                     .format(str(current_timestamp), str(current_watermark), str(value))
> 2022-09-29T02:10:45.3597089Z Sep 29 02:10:45
> 2022-09-29T02:10:45.3597942Z Sep 29 02:10:45         watermark_strategy = WatermarkStrategy.for_monotonous_timestamps()\
> 2022-09-29T02:10:45.3599260Z Sep 29 02:10:45             .with_timestamp_assigner(SecondColumnTimestampAssigner())
> 2022-09-29T02:10:45.3600611Z Sep 29 02:10:45         data_stream.assign_timestamps_and_watermarks(watermark_strategy)\
> 2022-09-29T02:10:45.3601877Z Sep 29 02:10:45             .process(MyProcessFunction(), output_type=Types.STRING()).add_sink(self.test_sink)
> 2022-09-29T02:10:45.3603527Z Sep 29 02:10:45         self.env.execute('test process function')
> 2022-09-29T02:10:45.3604445Z Sep 29 02:10:45         results = self.test_sink.get_results()
> 2022-09-29T02:10:45.3605684Z Sep 29 02:10:45         expected = ["current timestamp: 1603708211000, current watermark: "
> 2022-09-29T02:10:45.3607157Z Sep 29 02:10:45                     "-9223372036854775808, current_value: Row(f0=1, f1='1603708211000')",
> 2022-09-29T02:10:45.3608256Z Sep 29 02:10:45                     "current timestamp: 1603708224000, current watermark: "
> 2022-09-29T02:10:45.3609650Z Sep 29 02:10:45                     "-9223372036854775808, current_value: Row(f0=2, f1='1603708224000')",
> 2022-09-29T02:10:45.3610854Z Sep 29 02:10:45                     "current timestamp: 1603708226000, current watermark: "
> 2022-09-29T02:10:45.3612279Z Sep 29 02:10:45                     "-9223372036854775808, current_value: Row(f0=3, f1='1603708226000')",
> 2022-09-29T02:10:45.3613382Z Sep 29 02:10:45                     "current timestamp: 1603708289000, current watermark: "
> 2022-09-29T02:10:45.3615683Z Sep 29 02:10:45                     "-9223372036854775808, current_value: Row(f0=4, f1='1603708289000')"]
> 2022-09-29T02:10:45.3617687Z Sep 29 02:10:45 >       self.assert_equals_sorted(expected, results)
> 2022-09-29T02:10:45.3618620Z Sep 29 02:10:45
> 2022-09-29T02:10:45.3619425Z Sep 29 02:10:45 pyflink/datastream/tests/test_data_stream.py:986:
> 2022-09-29T02:10:45.3620424Z Sep 29 02:10:45 _ _ _ _ _ _ _ _ _ _
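
The expected strings in the quoted test assume that every element sees the initial watermark, -9223372036854775808 (Java's Long.MIN_VALUE, i.e. -2**63), while set_auto_watermark_interval(2000) schedules periodic watermark emission on wall-clock time. The quoted log is cut off before the assertion diff, so the actual cause is not shown here; the sketch below only illustrates one plausible way such a test could become timing dependent. It is a plain-Python model, not PyFlink code, and the names observed_watermarks and arrival_ms, as well as the rule "emit max timestamp minus 1 on each tick", are assumptions made for illustration.

```python
LONG_MIN = -2**63  # Java Long.MIN_VALUE, the watermark before any has been emitted


def observed_watermarks(event_timestamps, arrival_ms, interval_ms=2000):
    """Return the watermark each element would observe in this simplified model.

    Hypothetical model (not Flink's implementation): a periodic generator
    fires every interval_ms of wall-clock time and emits
    (largest event timestamp seen so far) - 1.
    """
    seen = []
    max_ts = None
    watermark = LONG_MIN
    next_fire = interval_ms
    for ts, arrived in zip(event_timestamps, arrival_ms):
        # Fire every periodic emission that is due before this element arrives.
        while next_fire <= arrived:
            if max_ts is not None:
                watermark = max(watermark, max_ts - 1)
            next_fire += interval_ms
        # The element observes the watermark as it stands on arrival.
        seen.append(watermark)
        max_ts = ts if max_ts is None else max(max_ts, ts)
    return seen


timestamps = [1603708211000, 1603708224000, 1603708226000, 1603708289000]

# Fast run: all four elements arrive before the first 2000 ms tick.
fast = observed_watermarks(timestamps, arrival_ms=[10, 20, 30, 40])

# Slow run: the last element arrives after the first tick has fired.
slow = observed_watermarks(timestamps, arrival_ms=[10, 20, 30, 2500])

print(fast)
print(slow)
```

In this model the fast run reports Long.MIN_VALUE for all four elements, which matches the test's expectation, while the slow run reports 1603708225999 for the last element and would fail the sorted comparison.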
[jira] [Commented] (FLINK-29461) ProcessDataStreamStreamingTests.test_process_function unstable
[ https://issues.apache.org/jira/browse/FLINK-29461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646011#comment-17646011 ]

Leonard Xu commented on FLINK-29461:

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=43871=logs=9cada3cb-c1d3-5621-16da-0f718fb86602=c67e71ed-6451-5d26-8920-5a8cf9651901
[jira] [Commented] (FLINK-29461) ProcessDataStreamStreamingTests.test_process_function unstable
[ https://issues.apache.org/jira/browse/FLINK-29461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17645147#comment-17645147 ]

Matthias Pohl commented on FLINK-29461:
---

[~dianfu] [~hxbks2ks] any updates on that one? Or is there anyone else we can ping to have a look at it?
[jira] [Commented] (FLINK-29461) ProcessDataStreamStreamingTests.test_process_function unstable
[ https://issues.apache.org/jira/browse/FLINK-29461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17645145#comment-17645145 ]

Matthias Pohl commented on FLINK-29461:
---

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=43817=logs=3e4dd1a2-fe2f-5e5d-a581-48087e718d53=b4612f28-e3b5-5853-8a8b-610ae894217a=28358
[jira] [Commented] (FLINK-29461) ProcessDataStreamStreamingTests.test_process_function unstable
[ https://issues.apache.org/jira/browse/FLINK-29461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17644132#comment-17644132 ]

Qingsheng Ren commented on FLINK-29461:
---

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=43771=logs=e92ecf6d-e207-5a42-7ff7-528ff0c5b259=40fc352e-9b4c-5fd8-363f-628f24b01ec2=28618
[jira] [Commented] (FLINK-29461) ProcessDataStreamStreamingTests.test_process_function unstable
[ https://issues.apache.org/jira/browse/FLINK-29461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17643720#comment-17643720 ]

Martijn Visser commented on FLINK-29461:

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=43742=logs=e92ecf6d-e207-5a42-7ff7-528ff0c5b259=40fc352e-9b4c-5fd8-363f-628f24b01ec2=28926
[jira] [Commented] (FLINK-29461) ProcessDataStreamStreamingTests.test_process_function unstable
[ https://issues.apache.org/jira/browse/FLINK-29461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17643282#comment-17643282 ]

Matthias Pohl commented on FLINK-29461:
---

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=43711=logs=bf5e383b-9fd3-5f02-ca1c-8f788e2e76d3=85189c57-d8a0-5c9c-b61d-fc05cfac62cf=29177
[jira] [Commented] (FLINK-29461) ProcessDataStreamStreamingTests.test_process_function unstable
[ https://issues.apache.org/jira/browse/FLINK-29461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17643273#comment-17643273 ]

Matthias Pohl commented on FLINK-29461:
---------------------------------------

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=43709=logs=9cada3cb-c1d3-5621-16da-0f718fb86602=c67e71ed-6451-5d26-8920-5a8cf9651901=28522

> ProcessDataStreamStreamingTests.test_process_function unstable
> --------------------------------------------------------------
>
> Key: FLINK-29461
> URL: https://issues.apache.org/jira/browse/FLINK-29461
> Project: Flink
> Issue Type: Bug
> Components: API / Python
> Affects Versions: 1.16.0, 1.17.0
> Reporter: Huang Xingbo
> Assignee: Huang Xingbo
> Priority: Critical
> Labels: test-stability
>
> {code:java}
> 2022-09-29T02:10:45.3571648Z Sep 29 02:10:45 self = > testMethod=test_process_function>
> 2022-09-29T02:10:45.3572279Z Sep 29 02:10:45
> 2022-09-29T02:10:45.3572810Z Sep 29 02:10:45     def test_process_function(self):
> 2022-09-29T02:10:45.3573495Z Sep 29 02:10:45         self.env.set_parallelism(1)
> 2022-09-29T02:10:45.3574148Z Sep 29 02:10:45         self.env.get_config().set_auto_watermark_interval(2000)
> 2022-09-29T02:10:45.3580634Z Sep 29 02:10:45         self.env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
> 2022-09-29T02:10:45.3583194Z Sep 29 02:10:45         data_stream = self.env.from_collection([(1, '1603708211000'),
> 2022-09-29T02:10:45.3584515Z Sep 29 02:10:45                                                 (2, '1603708224000'),
> 2022-09-29T02:10:45.3585957Z Sep 29 02:10:45                                                 (3, '1603708226000'),
> 2022-09-29T02:10:45.3587132Z Sep 29 02:10:45                                                 (4, '1603708289000')],
> 2022-09-29T02:10:45.3588094Z Sep 29 02:10:45                                                type_info=Types.ROW([Types.INT(), Types.STRING()]))
> 2022-09-29T02:10:45.3589090Z Sep 29 02:10:45
> 2022-09-29T02:10:45.3589949Z Sep 29 02:10:45         class MyProcessFunction(ProcessFunction):
> 2022-09-29T02:10:45.3590710Z Sep 29 02:10:45
> 2022-09-29T02:10:45.3591856Z Sep 29 02:10:45             def process_element(self, value, ctx):
> 2022-09-29T02:10:45.3592873Z Sep 29 02:10:45                 current_timestamp = ctx.timestamp()
> 2022-09-29T02:10:45.3593862Z Sep 29 02:10:45                 current_watermark = ctx.timer_service().current_watermark()
> 2022-09-29T02:10:45.3594915Z Sep 29 02:10:45                 yield "current timestamp: {}, current watermark: {}, current_value: {}"\
> 2022-09-29T02:10:45.3596201Z Sep 29 02:10:45                     .format(str(current_timestamp), str(current_watermark), str(value))
> 2022-09-29T02:10:45.3597089Z Sep 29 02:10:45
> 2022-09-29T02:10:45.3597942Z Sep 29 02:10:45         watermark_strategy = WatermarkStrategy.for_monotonous_timestamps()\
> 2022-09-29T02:10:45.3599260Z Sep 29 02:10:45             .with_timestamp_assigner(SecondColumnTimestampAssigner())
> 2022-09-29T02:10:45.3600611Z Sep 29 02:10:45         data_stream.assign_timestamps_and_watermarks(watermark_strategy)\
> 2022-09-29T02:10:45.3601877Z Sep 29 02:10:45             .process(MyProcessFunction(), output_type=Types.STRING()).add_sink(self.test_sink)
> 2022-09-29T02:10:45.3603527Z Sep 29 02:10:45         self.env.execute('test process function')
> 2022-09-29T02:10:45.3604445Z Sep 29 02:10:45         results = self.test_sink.get_results()
> 2022-09-29T02:10:45.3605684Z Sep 29 02:10:45         expected = ["current timestamp: 1603708211000, current watermark: "
> 2022-09-29T02:10:45.3607157Z Sep 29 02:10:45                     "-9223372036854775808, current_value: Row(f0=1, f1='1603708211000')",
> 2022-09-29T02:10:45.3608256Z Sep 29 02:10:45                     "current timestamp: 1603708224000, current watermark: "
> 2022-09-29T02:10:45.3609650Z Sep 29 02:10:45                     "-9223372036854775808, current_value: Row(f0=2, f1='1603708224000')",
> 2022-09-29T02:10:45.3610854Z Sep 29 02:10:45                     "current timestamp: 1603708226000, current watermark: "
> 2022-09-29T02:10:45.3612279Z Sep 29 02:10:45                     "-9223372036854775808, current_value: Row(f0=3, f1='1603708226000')",
> 2022-09-29T02:10:45.3613382Z Sep 29 02:10:45                     "current timestamp: 1603708289000, current watermark: "
> 2022-09-29T02:10:45.3615683Z Sep 29 02:10:45                     "-9223372036854775808, current_value: Row(f0=4, f1='1603708289000')"]
> 2022-09-29T02:10:45.3617687Z Sep 29 02:10:45 >       self.assert_equals_sorted(expected, results)
> 2022-09-29T02:10:45.3618620Z Sep 29 02:10:45
> 2022-09-29T02:10:45.3619425Z Sep 29 02:10:45
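The quoted test expects every element to observe the initial watermark, -9223372036854775808 (the minimum 64-bit value), while it also sets an auto-watermark interval of 2000 ms. The sketch below is plain Python, not PyFlink code, and it is not the confirmed root cause of this ticket. It only illustrates how a periodic watermark emit that happens to fire between two elements could change what later elements observe. The names MonotonousWatermarks, observed_watermarks and emit_after are hypothetical and introduced for illustration; the model assumes a generator that tracks the largest timestamp seen and emits that timestamp minus 1 on each periodic tick.

```python
LONG_MIN = -(2 ** 63)  # -9223372036854775808, the value the test expects


class MonotonousWatermarks:
    """Toy model (assumption) of a periodic generator for ascending timestamps."""

    def __init__(self):
        # Chosen so that a tick before any event still yields LONG_MIN.
        self.max_ts = LONG_MIN + 1
        self.current_watermark = LONG_MIN

    def on_event(self, ts):
        # Remember the largest event timestamp seen so far.
        self.max_ts = max(self.max_ts, ts)

    def on_periodic_emit(self):
        # Watermarks never move backwards.
        self.current_watermark = max(self.current_watermark, self.max_ts - 1)


def observed_watermarks(timestamps, emit_after):
    """Return the watermark each element observes.

    emit_after is a set of element indices after which the periodic
    timer is assumed to fire (hypothetical knob for this sketch).
    """
    gen = MonotonousWatermarks()
    seen = []
    for i, ts in enumerate(timestamps):
        seen.append(gen.current_watermark)  # what the process function would read
        gen.on_event(ts)
        if i in emit_after:
            gen.on_periodic_emit()
    return seen


timestamps = [1603708211000, 1603708224000, 1603708226000, 1603708289000]

# No tick between elements: all four observe the initial watermark.
no_tick = observed_watermarks(timestamps, set())

# One tick right after the first element: later elements see 1603708210999.
one_tick = observed_watermarks(timestamps, {0})
```

Under these assumptions, the expected list in the test matches only the first scenario, so any run slow enough for a tick to land between elements would produce a different result.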
[jira] [Commented] (FLINK-29461) ProcessDataStreamStreamingTests.test_process_function unstable
[ https://issues.apache.org/jira/browse/FLINK-29461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17643194#comment-17643194 ] Matthias Pohl commented on FLINK-29461: --- https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=43692=logs=bf5e383b-9fd3-5f02-ca1c-8f788e2e76d3=85189c57-d8a0-5c9c-b61d-fc05cfac62cf=26932
[jira] [Commented] (FLINK-29461) ProcessDataStreamStreamingTests.test_process_function unstable
[ https://issues.apache.org/jira/browse/FLINK-29461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17642448#comment-17642448 ] Matthias Pohl commented on FLINK-29461: --- https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=43664=logs=3e4dd1a2-fe2f-5e5d-a581-48087e718d53=b4612f28-e3b5-5853-8a8b-610ae894217a=28979
[jira] [Commented] (FLINK-29461) ProcessDataStreamStreamingTests.test_process_function unstable
[ https://issues.apache.org/jira/browse/FLINK-29461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17642430#comment-17642430 ] Matthias Pohl commented on FLINK-29461: --- https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=43662=logs=e92ecf6d-e207-5a42-7ff7-528ff0c5b259=40fc352e-9b4c-5fd8-363f-628f24b01ec2=28078
[jira] [Commented] (FLINK-29461) ProcessDataStreamStreamingTests.test_process_function unstable
[ https://issues.apache.org/jira/browse/FLINK-29461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17641775#comment-17641775 ] Martijn Visser commented on FLINK-29461: https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=43636=logs=e92ecf6d-e207-5a42-7ff7-528ff0c5b259=40fc352e-9b4c-5fd8-363f-628f24b01ec2=27587
[jira] [Commented] (FLINK-29461) ProcessDataStreamStreamingTests.test_process_function unstable
[ https://issues.apache.org/jira/browse/FLINK-29461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17641099#comment-17641099 ] Matthias Pohl commented on FLINK-29461: --- No worries. Thanks for picking it up. :) https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=43604=logs=e92ecf6d-e207-5a42-7ff7-528ff0c5b259=40fc352e-9b4c-5fd8-363f-628f24b01ec2=29496
[jira] [Commented] (FLINK-29461) ProcessDataStreamStreamingTests.test_process_function unstable
[ https://issues.apache.org/jira/browse/FLINK-29461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17640473#comment-17640473 ] Xingbo Huang commented on FLINK-29461: -- [~mapohl] Sorry for the late reply, I will take a look into this issue in these two days. > ProcessDataStreamStreamingTests.test_process_function unstable > -- > > Key: FLINK-29461 > URL: https://issues.apache.org/jira/browse/FLINK-29461 > Project: Flink > Issue Type: Bug > Components: API / Python >Affects Versions: 1.16.0, 1.17.0 >Reporter: Huang Xingbo >Assignee: Huang Xingbo >Priority: Critical > Labels: test-stability > > {code:java} > 2022-09-29T02:10:45.3571648Z Sep 29 02:10:45 self = > testMethod=test_process_function> > 2022-09-29T02:10:45.3572279Z Sep 29 02:10:45 > 2022-09-29T02:10:45.3572810Z Sep 29 02:10:45 def > test_process_function(self): > 2022-09-29T02:10:45.3573495Z Sep 29 02:10:45 > self.env.set_parallelism(1) > 2022-09-29T02:10:45.3574148Z Sep 29 02:10:45 > self.env.get_config().set_auto_watermark_interval(2000) > 2022-09-29T02:10:45.3580634Z Sep 29 02:10:45 > self.env.set_stream_time_characteristic(TimeCharacteristic.EventTime) > 2022-09-29T02:10:45.3583194Z Sep 29 02:10:45 data_stream = > self.env.from_collection([(1, '1603708211000'), > 2022-09-29T02:10:45.3584515Z Sep 29 02:10:45 >(2, '1603708224000'), > 2022-09-29T02:10:45.3585957Z Sep 29 02:10:45 >(3, '1603708226000'), > 2022-09-29T02:10:45.3587132Z Sep 29 02:10:45 >(4, '1603708289000')], > 2022-09-29T02:10:45.3588094Z Sep 29 02:10:45 > type_info=Types.ROW([Types.INT(), Types.STRING()])) > 2022-09-29T02:10:45.3589090Z Sep 29 02:10:45 > 2022-09-29T02:10:45.3589949Z Sep 29 02:10:45 class > MyProcessFunction(ProcessFunction): > 2022-09-29T02:10:45.3590710Z Sep 29 02:10:45 > 2022-09-29T02:10:45.3591856Z Sep 29 02:10:45 def > process_element(self, value, ctx): > 2022-09-29T02:10:45.3592873Z Sep 29 02:10:45 > current_timestamp = ctx.timestamp() > 2022-09-29T02:10:45.3593862Z Sep 29 02:10:45 > 
current_watermark = ctx.timer_service().current_watermark() > 2022-09-29T02:10:45.3594915Z Sep 29 02:10:45 yield "current > timestamp: {}, current watermark: {}, current_value: {}"\ > 2022-09-29T02:10:45.3596201Z Sep 29 02:10:45 > .format(str(current_timestamp), str(current_watermark), str(value)) > 2022-09-29T02:10:45.3597089Z Sep 29 02:10:45 > 2022-09-29T02:10:45.3597942Z Sep 29 02:10:45 watermark_strategy = > WatermarkStrategy.for_monotonous_timestamps()\ > 2022-09-29T02:10:45.3599260Z Sep 29 02:10:45 > .with_timestamp_assigner(SecondColumnTimestampAssigner()) > 2022-09-29T02:10:45.3600611Z Sep 29 02:10:45 > data_stream.assign_timestamps_and_watermarks(watermark_strategy)\ > 2022-09-29T02:10:45.3601877Z Sep 29 02:10:45 > .process(MyProcessFunction(), > output_type=Types.STRING()).add_sink(self.test_sink) > 2022-09-29T02:10:45.3603527Z Sep 29 02:10:45 self.env.execute('test > process function') > 2022-09-29T02:10:45.3604445Z Sep 29 02:10:45 results = > self.test_sink.get_results() > 2022-09-29T02:10:45.3605684Z Sep 29 02:10:45 expected = ["current > timestamp: 1603708211000, current watermark: " > 2022-09-29T02:10:45.3607157Z Sep 29 02:10:45 > "-9223372036854775808, current_value: Row(f0=1, f1='1603708211000')", > 2022-09-29T02:10:45.3608256Z Sep 29 02:10:45 "current > timestamp: 1603708224000, current watermark: " > 2022-09-29T02:10:45.3609650Z Sep 29 02:10:45 > "-9223372036854775808, current_value: Row(f0=2, f1='1603708224000')", > 2022-09-29T02:10:45.3610854Z Sep 29 02:10:45 "current > timestamp: 1603708226000, current watermark: " > 2022-09-29T02:10:45.3612279Z Sep 29 02:10:45 > "-9223372036854775808, current_value: Row(f0=3, f1='1603708226000')", > 2022-09-29T02:10:45.3613382Z Sep 29 02:10:45 "current > timestamp: 1603708289000, current watermark: " > 2022-09-29T02:10:45.3615683Z Sep 29 02:10:45 > "-9223372036854775808, current_value: Row(f0=4, f1='1603708289000')"] > 2022-09-29T02:10:45.3617687Z Sep 29 02:10:45 > > self.assert_equals_sorted(expected, 
results) > 2022-09-29T02:10:45.3618620Z Sep 29 02:10:45 > 2022-09-29T02:10:45.3619425Z Sep 29 02:10:45 > pyflink/datastream/tests/test_data_stream.py:986: > 2022-09-29T02:10:45.3620424Z Sep 29 02:10:45 _ _ _ _ _ _ _
[jira] [Commented] (FLINK-29461) ProcessDataStreamStreamingTests.test_process_function unstable
[ https://issues.apache.org/jira/browse/FLINK-29461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17640409#comment-17640409 ] Matthias Pohl commented on FLINK-29461: --- https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=43572=logs=bf5e383b-9fd3-5f02-ca1c-8f788e2e76d3=85189c57-d8a0-5c9c-b61d-fc05cfac62cf=27194
[jira] [Commented] (FLINK-29461) ProcessDataStreamStreamingTests.test_process_function unstable
[ https://issues.apache.org/jira/browse/FLINK-29461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17640019#comment-17640019 ]

Matthias Pohl commented on FLINK-29461:
---

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=43541=logs=9cada3cb-c1d3-5621-16da-0f718fb86602=c67e71ed-6451-5d26-8920-5a8cf9651901=26458

> ProcessDataStreamStreamingTests.test_process_function unstable
> --
>
>                 Key: FLINK-29461
>                 URL: https://issues.apache.org/jira/browse/FLINK-29461
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Python
>    Affects Versions: 1.16.0, 1.17.0
>            Reporter: Huang Xingbo
>            Assignee: Huang Xingbo
>            Priority: Critical
>              Labels: test-stability
>
> {code:java}
> 2022-09-29T02:10:45.3571648Z Sep 29 02:10:45 self = testMethod=test_process_function>
> 2022-09-29T02:10:45.3572279Z Sep 29 02:10:45
> 2022-09-29T02:10:45.3572810Z Sep 29 02:10:45     def test_process_function(self):
> 2022-09-29T02:10:45.3573495Z Sep 29 02:10:45         self.env.set_parallelism(1)
> 2022-09-29T02:10:45.3574148Z Sep 29 02:10:45         self.env.get_config().set_auto_watermark_interval(2000)
> 2022-09-29T02:10:45.3580634Z Sep 29 02:10:45         self.env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
> 2022-09-29T02:10:45.3583194Z Sep 29 02:10:45         data_stream = self.env.from_collection([(1, '1603708211000'),
> 2022-09-29T02:10:45.3584515Z Sep 29 02:10:45                                                 (2, '1603708224000'),
> 2022-09-29T02:10:45.3585957Z Sep 29 02:10:45                                                 (3, '1603708226000'),
> 2022-09-29T02:10:45.3587132Z Sep 29 02:10:45                                                 (4, '1603708289000')],
> 2022-09-29T02:10:45.3588094Z Sep 29 02:10:45                                                type_info=Types.ROW([Types.INT(), Types.STRING()]))
> 2022-09-29T02:10:45.3589090Z Sep 29 02:10:45
> 2022-09-29T02:10:45.3589949Z Sep 29 02:10:45         class MyProcessFunction(ProcessFunction):
> 2022-09-29T02:10:45.3590710Z Sep 29 02:10:45
> 2022-09-29T02:10:45.3591856Z Sep 29 02:10:45             def process_element(self, value, ctx):
> 2022-09-29T02:10:45.3592873Z Sep 29 02:10:45                 current_timestamp = ctx.timestamp()
> 2022-09-29T02:10:45.3593862Z Sep 29 02:10:45                 current_watermark = ctx.timer_service().current_watermark()
> 2022-09-29T02:10:45.3594915Z Sep 29 02:10:45                 yield "current timestamp: {}, current watermark: {}, current_value: {}"\
> 2022-09-29T02:10:45.3596201Z Sep 29 02:10:45                     .format(str(current_timestamp), str(current_watermark), str(value))
> 2022-09-29T02:10:45.3597089Z Sep 29 02:10:45
> 2022-09-29T02:10:45.3597942Z Sep 29 02:10:45         watermark_strategy = WatermarkStrategy.for_monotonous_timestamps()\
> 2022-09-29T02:10:45.3599260Z Sep 29 02:10:45             .with_timestamp_assigner(SecondColumnTimestampAssigner())
> 2022-09-29T02:10:45.3600611Z Sep 29 02:10:45         data_stream.assign_timestamps_and_watermarks(watermark_strategy)\
> 2022-09-29T02:10:45.3601877Z Sep 29 02:10:45             .process(MyProcessFunction(), output_type=Types.STRING()).add_sink(self.test_sink)
> 2022-09-29T02:10:45.3603527Z Sep 29 02:10:45         self.env.execute('test process function')
> 2022-09-29T02:10:45.3604445Z Sep 29 02:10:45         results = self.test_sink.get_results()
> 2022-09-29T02:10:45.3605684Z Sep 29 02:10:45         expected = ["current timestamp: 1603708211000, current watermark: "
> 2022-09-29T02:10:45.3607157Z Sep 29 02:10:45                     "-9223372036854775808, current_value: Row(f0=1, f1='1603708211000')",
> 2022-09-29T02:10:45.3608256Z Sep 29 02:10:45                     "current timestamp: 1603708224000, current watermark: "
> 2022-09-29T02:10:45.3609650Z Sep 29 02:10:45                     "-9223372036854775808, current_value: Row(f0=2, f1='1603708224000')",
> 2022-09-29T02:10:45.3610854Z Sep 29 02:10:45                     "current timestamp: 1603708226000, current watermark: "
> 2022-09-29T02:10:45.3612279Z Sep 29 02:10:45                     "-9223372036854775808, current_value: Row(f0=3, f1='1603708226000')",
> 2022-09-29T02:10:45.3613382Z Sep 29 02:10:45                     "current timestamp: 1603708289000, current watermark: "
> 2022-09-29T02:10:45.3615683Z Sep 29 02:10:45                     "-9223372036854775808, current_value: Row(f0=4, f1='1603708289000')"]
> 2022-09-29T02:10:45.3617687Z Sep 29 02:10:45 >       self.assert_equals_sorted(expected, results)
> 2022-09-29T02:10:45.3618620Z Sep 29 02:10:45
> 2022-09-29T02:10:45.3619425Z Sep 29 02:10:45 pyflink/datastream/tests/test_data_stream.py:986:
[jira] [Commented] (FLINK-29461) ProcessDataStreamStreamingTests.test_process_function unstable
[ https://issues.apache.org/jira/browse/FLINK-29461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17639914#comment-17639914 ]

Matthias Pohl commented on FLINK-29461:
---

Same build, two failures:
* https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=43512=logs=e92ecf6d-e207-5a42-7ff7-528ff0c5b259=40fc352e-9b4c-5fd8-363f-628f24b01ec2=29899
* https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=43512=logs=bf5e383b-9fd3-5f02-ca1c-8f788e2e76d3=85189c57-d8a0-5c9c-b61d-fc05cfac62cf=27869
[jira] [Commented] (FLINK-29461) ProcessDataStreamStreamingTests.test_process_function unstable
[ https://issues.apache.org/jira/browse/FLINK-29461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17638491#comment-17638491 ]

Matthias Pohl commented on FLINK-29461:
---

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=43483=logs=9cada3cb-c1d3-5621-16da-0f718fb86602=c67e71ed-6451-5d26-8920-5a8cf9651901=28464
[jira] [Commented] (FLINK-29461) ProcessDataStreamStreamingTests.test_process_function unstable
[ https://issues.apache.org/jira/browse/FLINK-29461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17638140#comment-17638140 ]

Matthias Pohl commented on FLINK-29461:
---

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=43436=logs=3e4dd1a2-fe2f-5e5d-a581-48087e718d53=b4612f28-e3b5-5853-8a8b-610ae894217a=27416
[jira] [Commented] (FLINK-29461) ProcessDataStreamStreamingTests.test_process_function unstable
[ https://issues.apache.org/jira/browse/FLINK-29461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17637085#comment-17637085 ]

Matthias Pohl commented on FLINK-29461:
---

[~hxbks2ks] any updates on this issue?
[jira] [Commented] (FLINK-29461) ProcessDataStreamStreamingTests.test_process_function unstable
[ https://issues.apache.org/jira/browse/FLINK-29461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17637084#comment-17637084 ]

Matthias Pohl commented on FLINK-29461:
---

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=43167=logs=9cada3cb-c1d3-5621-16da-0f718fb86602=c67e71ed-6451-5d26-8920-5a8cf9651901=24422
[jira] [Commented] (FLINK-29461) ProcessDataStreamStreamingTests.test_process_function unstable
[ https://issues.apache.org/jira/browse/FLINK-29461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17637056#comment-17637056 ]

Leonard Xu commented on FLINK-29461:

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=43369=logs=bf5e383b-9fd3-5f02-ca1c-8f788e2e76d3=85189c57-d8a0-5c9c-b61d-fc05cfac62cf
[jira] [Commented] (FLINK-29461) ProcessDataStreamStreamingTests.test_process_function unstable
[ https://issues.apache.org/jira/browse/FLINK-29461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17637042#comment-17637042 ]

Leonard Xu commented on FLINK-29461:

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=43367=logs=9cada3cb-c1d3-5621-16da-0f718fb86602=c67e71ed-6451-5d26-8920-5a8cf9651901
[jira] [Commented] (FLINK-29461) ProcessDataStreamStreamingTests.test_process_function unstable
[ https://issues.apache.org/jira/browse/FLINK-29461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17636431#comment-17636431 ]

Leonard Xu commented on FLINK-29461:
------------------------------------

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=43332=logs=e92ecf6d-e207-5a42-7ff7-528ff0c5b259=40fc352e-9b4c-5fd8-363f-628f24b01ec2

> ProcessDataStreamStreamingTests.test_process_function unstable
> --------------------------------------------------------------
>
>                 Key: FLINK-29461
>                 URL: https://issues.apache.org/jira/browse/FLINK-29461
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Python
>    Affects Versions: 1.16.0, 1.17.0
>            Reporter: Huang Xingbo
>            Assignee: Huang Xingbo
>            Priority: Critical
>              Labels: test-stability
[jira] [Commented] (FLINK-29461) ProcessDataStreamStreamingTests.test_process_function unstable
[ https://issues.apache.org/jira/browse/FLINK-29461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17632189#comment-17632189 ]

Martijn Visser commented on FLINK-29461:
----------------------------------------

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=43043=logs=3e4dd1a2-fe2f-5e5d-a581-48087e718d53=b4612f28-e3b5-5853-8a8b-610ae894217a=25767

> ProcessDataStreamStreamingTests.test_process_function unstable
> --------------------------------------------------------------
>
>                 Key: FLINK-29461
>                 URL: https://issues.apache.org/jira/browse/FLINK-29461
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Python
>    Affects Versions: 1.16.0, 1.17.0
>            Reporter: Huang Xingbo
>            Assignee: Huang Xingbo
>            Priority: Critical
>              Labels: test-stability
[jira] [Commented] (FLINK-29461) ProcessDataStreamStreamingTests.test_process_function unstable
[ https://issues.apache.org/jira/browse/FLINK-29461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17630240#comment-17630240 ]

Matthias Pohl commented on FLINK-29461:
---------------------------------------

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=42908=logs=3e4dd1a2-fe2f-5e5d-a581-48087e718d53=b4612f28-e3b5-5853-8a8b-610ae894217a=27019

> ProcessDataStreamStreamingTests.test_process_function unstable
> --------------------------------------------------------------
>
>                 Key: FLINK-29461
>                 URL: https://issues.apache.org/jira/browse/FLINK-29461
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Python
>    Affects Versions: 1.16.0, 1.17.0
>            Reporter: Huang Xingbo
>            Assignee: Huang Xingbo
>            Priority: Critical
>              Labels: test-stability
[jira] [Commented] (FLINK-29461) ProcessDataStreamStreamingTests.test_process_function unstable
[ https://issues.apache.org/jira/browse/FLINK-29461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17629665#comment-17629665 ]

Matthias Pohl commented on FLINK-29461:
---------------------------------------

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=42867=logs=bf5e383b-9fd3-5f02-ca1c-8f788e2e76d3=85189c57-d8a0-5c9c-b61d-fc05cfac62cf=27223

> ProcessDataStreamStreamingTests.test_process_function unstable
> --------------------------------------------------------------
>
>                 Key: FLINK-29461
>                 URL: https://issues.apache.org/jira/browse/FLINK-29461
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Python
>    Affects Versions: 1.16.0, 1.17.0
>            Reporter: Huang Xingbo
>            Assignee: Huang Xingbo
>            Priority: Critical
>              Labels: test-stability
[jira] [Commented] (FLINK-29461) ProcessDataStreamStreamingTests.test_process_function unstable
[ https://issues.apache.org/jira/browse/FLINK-29461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17629654#comment-17629654 ]

Matthias Pohl commented on FLINK-29461:
---------------------------------------

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=42858=logs=9cada3cb-c1d3-5621-16da-0f718fb86602=c67e71ed-6451-5d26-8920-5a8cf9651901=28095

> ProcessDataStreamStreamingTests.test_process_function unstable
> --------------------------------------------------------------
>
>                 Key: FLINK-29461
>                 URL: https://issues.apache.org/jira/browse/FLINK-29461
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Python
>    Affects Versions: 1.16.0, 1.17.0
>            Reporter: Huang Xingbo
>            Assignee: Huang Xingbo
>            Priority: Critical
>              Labels: test-stability
[jira] [Commented] (FLINK-29461) ProcessDataStreamStreamingTests.test_process_function unstable
[ https://issues.apache.org/jira/browse/FLINK-29461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17629022#comment-17629022 ]

Matthias Pohl commented on FLINK-29461:
---------------------------------------

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=42827=logs=9cada3cb-c1d3-5621-16da-0f718fb86602=c67e71ed-6451-5d26-8920-5a8cf9651901=25085

> ProcessDataStreamStreamingTests.test_process_function unstable
> --------------------------------------------------------------
>
>                 Key: FLINK-29461
>                 URL: https://issues.apache.org/jira/browse/FLINK-29461
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Python
>    Affects Versions: 1.16.0, 1.17.0
>            Reporter: Huang Xingbo
>            Assignee: Huang Xingbo
>            Priority: Critical
>              Labels: test-stability
[jira] [Commented] (FLINK-29461) ProcessDataStreamStreamingTests.test_process_function unstable
[ https://issues.apache.org/jira/browse/FLINK-29461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17628626#comment-17628626 ]

Qingsheng Ren commented on FLINK-29461:
---------------------------------------

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=42803=logs=9cada3cb-c1d3-5621-16da-0f718fb86602=c67e71ed-6451-5d26-8920-5a8cf9651901=24482

> ProcessDataStreamStreamingTests.test_process_function unstable
> --------------------------------------------------------------
>
>                 Key: FLINK-29461
>                 URL: https://issues.apache.org/jira/browse/FLINK-29461
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Python
>    Affects Versions: 1.16.0, 1.17.0
>            Reporter: Huang Xingbo
>            Assignee: Huang Xingbo
>            Priority: Critical
>              Labels: test-stability
[jira] [Commented] (FLINK-29461) ProcessDataStreamStreamingTests.test_process_function unstable
[ https://issues.apache.org/jira/browse/FLINK-29461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17627520#comment-17627520 ]

Martijn Visser commented on FLINK-29461:
----------------------------------------

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=42724=logs=3e4dd1a2-fe2f-5e5d-a581-48087e718d53=b4612f28-e3b5-5853-8a8b-610ae894217a=25197

> ProcessDataStreamStreamingTests.test_process_function unstable
> --------------------------------------------------------------
>
>                 Key: FLINK-29461
>                 URL: https://issues.apache.org/jira/browse/FLINK-29461
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Python
>    Affects Versions: 1.16.0
>            Reporter: Huang Xingbo
>            Assignee: Huang Xingbo
>            Priority: Critical
>              Labels: test-stability
[jira] [Commented] (FLINK-29461) ProcessDataStreamStreamingTests.test_process_function unstable
[ https://issues.apache.org/jira/browse/FLINK-29461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17627008#comment-17627008 ]

Martijn Visser commented on FLINK-29461:

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=42680=logs=e92ecf6d-e207-5a42-7ff7-528ff0c5b259=40fc352e-9b4c-5fd8-363f-628f24b01ec2=25187

> ProcessDataStreamStreamingTests.test_process_function unstable
> --
>
> Key: FLINK-29461
> URL: https://issues.apache.org/jira/browse/FLINK-29461
> Project: Flink
> Issue Type: Bug
> Components: API / Python
> Affects Versions: 1.16.0
> Reporter: Huang Xingbo
> Priority: Major
> Labels: test-stability
>
> {code:java}
> 2022-09-29T02:10:45.3571648Z Sep 29 02:10:45 self = testMethod=test_process_function>
> 2022-09-29T02:10:45.3572279Z Sep 29 02:10:45
> 2022-09-29T02:10:45.3572810Z Sep 29 02:10:45     def test_process_function(self):
> 2022-09-29T02:10:45.3573495Z Sep 29 02:10:45         self.env.set_parallelism(1)
> 2022-09-29T02:10:45.3574148Z Sep 29 02:10:45         self.env.get_config().set_auto_watermark_interval(2000)
> 2022-09-29T02:10:45.3580634Z Sep 29 02:10:45         self.env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
> 2022-09-29T02:10:45.3583194Z Sep 29 02:10:45         data_stream = self.env.from_collection([(1, '1603708211000'),
> 2022-09-29T02:10:45.3584515Z Sep 29 02:10:45                                                 (2, '1603708224000'),
> 2022-09-29T02:10:45.3585957Z Sep 29 02:10:45                                                 (3, '1603708226000'),
> 2022-09-29T02:10:45.3587132Z Sep 29 02:10:45                                                 (4, '1603708289000')],
> 2022-09-29T02:10:45.3588094Z Sep 29 02:10:45                                                type_info=Types.ROW([Types.INT(), Types.STRING()]))
> 2022-09-29T02:10:45.3589090Z Sep 29 02:10:45
> 2022-09-29T02:10:45.3589949Z Sep 29 02:10:45         class MyProcessFunction(ProcessFunction):
> 2022-09-29T02:10:45.3590710Z Sep 29 02:10:45
> 2022-09-29T02:10:45.3591856Z Sep 29 02:10:45             def process_element(self, value, ctx):
> 2022-09-29T02:10:45.3592873Z Sep 29 02:10:45                 current_timestamp = ctx.timestamp()
> 2022-09-29T02:10:45.3593862Z Sep 29 02:10:45                 current_watermark = ctx.timer_service().current_watermark()
> 2022-09-29T02:10:45.3594915Z Sep 29 02:10:45                 yield "current timestamp: {}, current watermark: {}, current_value: {}"\
> 2022-09-29T02:10:45.3596201Z Sep 29 02:10:45                     .format(str(current_timestamp), str(current_watermark), str(value))
> 2022-09-29T02:10:45.3597089Z Sep 29 02:10:45
> 2022-09-29T02:10:45.3597942Z Sep 29 02:10:45         watermark_strategy = WatermarkStrategy.for_monotonous_timestamps()\
> 2022-09-29T02:10:45.3599260Z Sep 29 02:10:45             .with_timestamp_assigner(SecondColumnTimestampAssigner())
> 2022-09-29T02:10:45.3600611Z Sep 29 02:10:45         data_stream.assign_timestamps_and_watermarks(watermark_strategy)\
> 2022-09-29T02:10:45.3601877Z Sep 29 02:10:45             .process(MyProcessFunction(), output_type=Types.STRING()).add_sink(self.test_sink)
> 2022-09-29T02:10:45.3603527Z Sep 29 02:10:45         self.env.execute('test process function')
> 2022-09-29T02:10:45.3604445Z Sep 29 02:10:45         results = self.test_sink.get_results()
> 2022-09-29T02:10:45.3605684Z Sep 29 02:10:45         expected = ["current timestamp: 1603708211000, current watermark: "
> 2022-09-29T02:10:45.3607157Z Sep 29 02:10:45                     "-9223372036854775808, current_value: Row(f0=1, f1='1603708211000')",
> 2022-09-29T02:10:45.3608256Z Sep 29 02:10:45                     "current timestamp: 1603708224000, current watermark: "
> 2022-09-29T02:10:45.3609650Z Sep 29 02:10:45                     "-9223372036854775808, current_value: Row(f0=2, f1='1603708224000')",
> 2022-09-29T02:10:45.3610854Z Sep 29 02:10:45                     "current timestamp: 1603708226000, current watermark: "
> 2022-09-29T02:10:45.3612279Z Sep 29 02:10:45                     "-9223372036854775808, current_value: Row(f0=3, f1='1603708226000')",
> 2022-09-29T02:10:45.3613382Z Sep 29 02:10:45                     "current timestamp: 1603708289000, current watermark: "
> 2022-09-29T02:10:45.3615683Z Sep 29 02:10:45                     "-9223372036854775808, current_value: Row(f0=4, f1='1603708289000')"]
> 2022-09-29T02:10:45.3617687Z Sep 29 02:10:45 >       self.assert_equals_sorted(expected, results)
> 2022-09-29T02:10:45.3618620Z Sep 29 02:10:45
> 2022-09-29T02:10:45.3619425Z Sep 29 02:10:45 pyflink/datastream/tests/test_data_stream.py:986:
> 2022-09-29T02:10:45.3620424Z