[jira] [Commented] (FLINK-29461) ProcessDataStreamStreamingTests.test_process_function unstable

2022-12-12 Thread Xingbo Huang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646050#comment-17646050
 ] 

Xingbo Huang commented on FLINK-29461:
--

[~mapohl] I have prepared a PR to make the test more stable.

> ProcessDataStreamStreamingTests.test_process_function unstable
> --
>
> Key: FLINK-29461
> URL: https://issues.apache.org/jira/browse/FLINK-29461
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Huang Xingbo
>Assignee: Huang Xingbo
>Priority: Critical
>  Labels: pull-request-available, test-stability
>
> {code:java}
> 2022-09-29T02:10:45.3571648Z Sep 29 02:10:45 self = 
>  testMethod=test_process_function>
> 2022-09-29T02:10:45.3572279Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3572810Z Sep 29 02:10:45 def 
> test_process_function(self):
> 2022-09-29T02:10:45.3573495Z Sep 29 02:10:45 
> self.env.set_parallelism(1)
> 2022-09-29T02:10:45.3574148Z Sep 29 02:10:45 
> self.env.get_config().set_auto_watermark_interval(2000)
> 2022-09-29T02:10:45.3580634Z Sep 29 02:10:45 
> self.env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
> 2022-09-29T02:10:45.3583194Z Sep 29 02:10:45 data_stream = 
> self.env.from_collection([(1, '1603708211000'),
> 2022-09-29T02:10:45.3584515Z Sep 29 02:10:45  
>(2, '1603708224000'),
> 2022-09-29T02:10:45.3585957Z Sep 29 02:10:45  
>(3, '1603708226000'),
> 2022-09-29T02:10:45.3587132Z Sep 29 02:10:45  
>(4, '1603708289000')],
> 2022-09-29T02:10:45.3588094Z Sep 29 02:10:45  
>   type_info=Types.ROW([Types.INT(), Types.STRING()]))
> 2022-09-29T02:10:45.3589090Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3589949Z Sep 29 02:10:45 class 
> MyProcessFunction(ProcessFunction):
> 2022-09-29T02:10:45.3590710Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3591856Z Sep 29 02:10:45 def 
> process_element(self, value, ctx):
> 2022-09-29T02:10:45.3592873Z Sep 29 02:10:45 
> current_timestamp = ctx.timestamp()
> 2022-09-29T02:10:45.3593862Z Sep 29 02:10:45 
> current_watermark = ctx.timer_service().current_watermark()
> 2022-09-29T02:10:45.3594915Z Sep 29 02:10:45 yield "current 
> timestamp: {}, current watermark: {}, current_value: {}"\
> 2022-09-29T02:10:45.3596201Z Sep 29 02:10:45 
> .format(str(current_timestamp), str(current_watermark), str(value))
> 2022-09-29T02:10:45.3597089Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3597942Z Sep 29 02:10:45 watermark_strategy = 
> WatermarkStrategy.for_monotonous_timestamps()\
> 2022-09-29T02:10:45.3599260Z Sep 29 02:10:45 
> .with_timestamp_assigner(SecondColumnTimestampAssigner())
> 2022-09-29T02:10:45.3600611Z Sep 29 02:10:45 
> data_stream.assign_timestamps_and_watermarks(watermark_strategy)\
> 2022-09-29T02:10:45.3601877Z Sep 29 02:10:45 
> .process(MyProcessFunction(), 
> output_type=Types.STRING()).add_sink(self.test_sink)
> 2022-09-29T02:10:45.3603527Z Sep 29 02:10:45 self.env.execute('test 
> process function')
> 2022-09-29T02:10:45.3604445Z Sep 29 02:10:45 results = 
> self.test_sink.get_results()
> 2022-09-29T02:10:45.3605684Z Sep 29 02:10:45 expected = ["current 
> timestamp: 1603708211000, current watermark: "
> 2022-09-29T02:10:45.3607157Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=1, f1='1603708211000')",
> 2022-09-29T02:10:45.3608256Z Sep 29 02:10:45 "current 
> timestamp: 1603708224000, current watermark: "
> 2022-09-29T02:10:45.3609650Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=2, f1='1603708224000')",
> 2022-09-29T02:10:45.3610854Z Sep 29 02:10:45 "current 
> timestamp: 1603708226000, current watermark: "
> 2022-09-29T02:10:45.3612279Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=3, f1='1603708226000')",
> 2022-09-29T02:10:45.3613382Z Sep 29 02:10:45 "current 
> timestamp: 1603708289000, current watermark: "
> 2022-09-29T02:10:45.3615683Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=4, f1='1603708289000')"]
> 2022-09-29T02:10:45.3617687Z Sep 29 02:10:45 >   
> self.assert_equals_sorted(expected, results)
> 2022-09-29T02:10:45.3618620Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3619425Z Sep 29 02:10:45 
> pyflink/datastream/tests/test_data_stream.py:986: 
> 2022-09-29T02:10:45.3620424Z Sep 29 02:10:45 _ _ _ _ _ _ _ _ _ _ 

[jira] [Commented] (FLINK-29461) ProcessDataStreamStreamingTests.test_process_function unstable

2022-12-12 Thread Leonard Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646011#comment-17646011
 ] 

Leonard Xu commented on FLINK-29461:


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=43871=logs=9cada3cb-c1d3-5621-16da-0f718fb86602=c67e71ed-6451-5d26-8920-5a8cf9651901

> ProcessDataStreamStreamingTests.test_process_function unstable
> --
>
> Key: FLINK-29461
> URL: https://issues.apache.org/jira/browse/FLINK-29461
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Huang Xingbo
>Assignee: Huang Xingbo
>Priority: Critical
>  Labels: test-stability
>
> {code:java}
> 2022-09-29T02:10:45.3571648Z Sep 29 02:10:45 self = 
>  testMethod=test_process_function>
> 2022-09-29T02:10:45.3572279Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3572810Z Sep 29 02:10:45 def 
> test_process_function(self):
> 2022-09-29T02:10:45.3573495Z Sep 29 02:10:45 
> self.env.set_parallelism(1)
> 2022-09-29T02:10:45.3574148Z Sep 29 02:10:45 
> self.env.get_config().set_auto_watermark_interval(2000)
> 2022-09-29T02:10:45.3580634Z Sep 29 02:10:45 
> self.env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
> 2022-09-29T02:10:45.3583194Z Sep 29 02:10:45 data_stream = 
> self.env.from_collection([(1, '1603708211000'),
> 2022-09-29T02:10:45.3584515Z Sep 29 02:10:45  
>(2, '1603708224000'),
> 2022-09-29T02:10:45.3585957Z Sep 29 02:10:45  
>(3, '1603708226000'),
> 2022-09-29T02:10:45.3587132Z Sep 29 02:10:45  
>(4, '1603708289000')],
> 2022-09-29T02:10:45.3588094Z Sep 29 02:10:45  
>   type_info=Types.ROW([Types.INT(), Types.STRING()]))
> 2022-09-29T02:10:45.3589090Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3589949Z Sep 29 02:10:45 class 
> MyProcessFunction(ProcessFunction):
> 2022-09-29T02:10:45.3590710Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3591856Z Sep 29 02:10:45 def 
> process_element(self, value, ctx):
> 2022-09-29T02:10:45.3592873Z Sep 29 02:10:45 
> current_timestamp = ctx.timestamp()
> 2022-09-29T02:10:45.3593862Z Sep 29 02:10:45 
> current_watermark = ctx.timer_service().current_watermark()
> 2022-09-29T02:10:45.3594915Z Sep 29 02:10:45 yield "current 
> timestamp: {}, current watermark: {}, current_value: {}"\
> 2022-09-29T02:10:45.3596201Z Sep 29 02:10:45 
> .format(str(current_timestamp), str(current_watermark), str(value))
> 2022-09-29T02:10:45.3597089Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3597942Z Sep 29 02:10:45 watermark_strategy = 
> WatermarkStrategy.for_monotonous_timestamps()\
> 2022-09-29T02:10:45.3599260Z Sep 29 02:10:45 
> .with_timestamp_assigner(SecondColumnTimestampAssigner())
> 2022-09-29T02:10:45.3600611Z Sep 29 02:10:45 
> data_stream.assign_timestamps_and_watermarks(watermark_strategy)\
> 2022-09-29T02:10:45.3601877Z Sep 29 02:10:45 
> .process(MyProcessFunction(), 
> output_type=Types.STRING()).add_sink(self.test_sink)
> 2022-09-29T02:10:45.3603527Z Sep 29 02:10:45 self.env.execute('test 
> process function')
> 2022-09-29T02:10:45.3604445Z Sep 29 02:10:45 results = 
> self.test_sink.get_results()
> 2022-09-29T02:10:45.3605684Z Sep 29 02:10:45 expected = ["current 
> timestamp: 1603708211000, current watermark: "
> 2022-09-29T02:10:45.3607157Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=1, f1='1603708211000')",
> 2022-09-29T02:10:45.3608256Z Sep 29 02:10:45 "current 
> timestamp: 1603708224000, current watermark: "
> 2022-09-29T02:10:45.3609650Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=2, f1='1603708224000')",
> 2022-09-29T02:10:45.3610854Z Sep 29 02:10:45 "current 
> timestamp: 1603708226000, current watermark: "
> 2022-09-29T02:10:45.3612279Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=3, f1='1603708226000')",
> 2022-09-29T02:10:45.3613382Z Sep 29 02:10:45 "current 
> timestamp: 1603708289000, current watermark: "
> 2022-09-29T02:10:45.3615683Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=4, f1='1603708289000')"]
> 2022-09-29T02:10:45.3617687Z Sep 29 02:10:45 >   
> self.assert_equals_sorted(expected, results)
> 2022-09-29T02:10:45.3618620Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3619425Z Sep 29 02:10:45 
> pyflink/datastream/tests/test_data_stream.py:986: 

[jira] [Commented] (FLINK-29461) ProcessDataStreamStreamingTests.test_process_function unstable

2022-12-09 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17645147#comment-17645147
 ] 

Matthias Pohl commented on FLINK-29461:
---

[~dianfu] [~hxbks2ks] any updates on that one? Or is there anyone else we can 
ping to have a look at it?

> ProcessDataStreamStreamingTests.test_process_function unstable
> --
>
> Key: FLINK-29461
> URL: https://issues.apache.org/jira/browse/FLINK-29461
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Huang Xingbo
>Assignee: Huang Xingbo
>Priority: Critical
>  Labels: test-stability
>
> {code:java}
> 2022-09-29T02:10:45.3571648Z Sep 29 02:10:45 self = 
>  testMethod=test_process_function>
> 2022-09-29T02:10:45.3572279Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3572810Z Sep 29 02:10:45 def 
> test_process_function(self):
> 2022-09-29T02:10:45.3573495Z Sep 29 02:10:45 
> self.env.set_parallelism(1)
> 2022-09-29T02:10:45.3574148Z Sep 29 02:10:45 
> self.env.get_config().set_auto_watermark_interval(2000)
> 2022-09-29T02:10:45.3580634Z Sep 29 02:10:45 
> self.env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
> 2022-09-29T02:10:45.3583194Z Sep 29 02:10:45 data_stream = 
> self.env.from_collection([(1, '1603708211000'),
> 2022-09-29T02:10:45.3584515Z Sep 29 02:10:45  
>(2, '1603708224000'),
> 2022-09-29T02:10:45.3585957Z Sep 29 02:10:45  
>(3, '1603708226000'),
> 2022-09-29T02:10:45.3587132Z Sep 29 02:10:45  
>(4, '1603708289000')],
> 2022-09-29T02:10:45.3588094Z Sep 29 02:10:45  
>   type_info=Types.ROW([Types.INT(), Types.STRING()]))
> 2022-09-29T02:10:45.3589090Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3589949Z Sep 29 02:10:45 class 
> MyProcessFunction(ProcessFunction):
> 2022-09-29T02:10:45.3590710Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3591856Z Sep 29 02:10:45 def 
> process_element(self, value, ctx):
> 2022-09-29T02:10:45.3592873Z Sep 29 02:10:45 
> current_timestamp = ctx.timestamp()
> 2022-09-29T02:10:45.3593862Z Sep 29 02:10:45 
> current_watermark = ctx.timer_service().current_watermark()
> 2022-09-29T02:10:45.3594915Z Sep 29 02:10:45 yield "current 
> timestamp: {}, current watermark: {}, current_value: {}"\
> 2022-09-29T02:10:45.3596201Z Sep 29 02:10:45 
> .format(str(current_timestamp), str(current_watermark), str(value))
> 2022-09-29T02:10:45.3597089Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3597942Z Sep 29 02:10:45 watermark_strategy = 
> WatermarkStrategy.for_monotonous_timestamps()\
> 2022-09-29T02:10:45.3599260Z Sep 29 02:10:45 
> .with_timestamp_assigner(SecondColumnTimestampAssigner())
> 2022-09-29T02:10:45.3600611Z Sep 29 02:10:45 
> data_stream.assign_timestamps_and_watermarks(watermark_strategy)\
> 2022-09-29T02:10:45.3601877Z Sep 29 02:10:45 
> .process(MyProcessFunction(), 
> output_type=Types.STRING()).add_sink(self.test_sink)
> 2022-09-29T02:10:45.3603527Z Sep 29 02:10:45 self.env.execute('test 
> process function')
> 2022-09-29T02:10:45.3604445Z Sep 29 02:10:45 results = 
> self.test_sink.get_results()
> 2022-09-29T02:10:45.3605684Z Sep 29 02:10:45 expected = ["current 
> timestamp: 1603708211000, current watermark: "
> 2022-09-29T02:10:45.3607157Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=1, f1='1603708211000')",
> 2022-09-29T02:10:45.3608256Z Sep 29 02:10:45 "current 
> timestamp: 1603708224000, current watermark: "
> 2022-09-29T02:10:45.3609650Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=2, f1='1603708224000')",
> 2022-09-29T02:10:45.3610854Z Sep 29 02:10:45 "current 
> timestamp: 1603708226000, current watermark: "
> 2022-09-29T02:10:45.3612279Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=3, f1='1603708226000')",
> 2022-09-29T02:10:45.3613382Z Sep 29 02:10:45 "current 
> timestamp: 1603708289000, current watermark: "
> 2022-09-29T02:10:45.3615683Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=4, f1='1603708289000')"]
> 2022-09-29T02:10:45.3617687Z Sep 29 02:10:45 >   
> self.assert_equals_sorted(expected, results)
> 2022-09-29T02:10:45.3618620Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3619425Z Sep 29 02:10:45 
> pyflink/datastream/tests/test_data_stream.py:986: 
> 2022-09-29T02:10:45.3620424Z Sep 29 

[jira] [Commented] (FLINK-29461) ProcessDataStreamStreamingTests.test_process_function unstable

2022-12-09 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17645145#comment-17645145
 ] 

Matthias Pohl commented on FLINK-29461:
---

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=43817=logs=3e4dd1a2-fe2f-5e5d-a581-48087e718d53=b4612f28-e3b5-5853-8a8b-610ae894217a=28358

> ProcessDataStreamStreamingTests.test_process_function unstable
> --
>
> Key: FLINK-29461
> URL: https://issues.apache.org/jira/browse/FLINK-29461
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Huang Xingbo
>Assignee: Huang Xingbo
>Priority: Critical
>  Labels: test-stability
>
> {code:java}
> 2022-09-29T02:10:45.3571648Z Sep 29 02:10:45 self = 
>  testMethod=test_process_function>
> 2022-09-29T02:10:45.3572279Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3572810Z Sep 29 02:10:45 def 
> test_process_function(self):
> 2022-09-29T02:10:45.3573495Z Sep 29 02:10:45 
> self.env.set_parallelism(1)
> 2022-09-29T02:10:45.3574148Z Sep 29 02:10:45 
> self.env.get_config().set_auto_watermark_interval(2000)
> 2022-09-29T02:10:45.3580634Z Sep 29 02:10:45 
> self.env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
> 2022-09-29T02:10:45.3583194Z Sep 29 02:10:45 data_stream = 
> self.env.from_collection([(1, '1603708211000'),
> 2022-09-29T02:10:45.3584515Z Sep 29 02:10:45  
>(2, '1603708224000'),
> 2022-09-29T02:10:45.3585957Z Sep 29 02:10:45  
>(3, '1603708226000'),
> 2022-09-29T02:10:45.3587132Z Sep 29 02:10:45  
>(4, '1603708289000')],
> 2022-09-29T02:10:45.3588094Z Sep 29 02:10:45  
>   type_info=Types.ROW([Types.INT(), Types.STRING()]))
> 2022-09-29T02:10:45.3589090Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3589949Z Sep 29 02:10:45 class 
> MyProcessFunction(ProcessFunction):
> 2022-09-29T02:10:45.3590710Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3591856Z Sep 29 02:10:45 def 
> process_element(self, value, ctx):
> 2022-09-29T02:10:45.3592873Z Sep 29 02:10:45 
> current_timestamp = ctx.timestamp()
> 2022-09-29T02:10:45.3593862Z Sep 29 02:10:45 
> current_watermark = ctx.timer_service().current_watermark()
> 2022-09-29T02:10:45.3594915Z Sep 29 02:10:45 yield "current 
> timestamp: {}, current watermark: {}, current_value: {}"\
> 2022-09-29T02:10:45.3596201Z Sep 29 02:10:45 
> .format(str(current_timestamp), str(current_watermark), str(value))
> 2022-09-29T02:10:45.3597089Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3597942Z Sep 29 02:10:45 watermark_strategy = 
> WatermarkStrategy.for_monotonous_timestamps()\
> 2022-09-29T02:10:45.3599260Z Sep 29 02:10:45 
> .with_timestamp_assigner(SecondColumnTimestampAssigner())
> 2022-09-29T02:10:45.3600611Z Sep 29 02:10:45 
> data_stream.assign_timestamps_and_watermarks(watermark_strategy)\
> 2022-09-29T02:10:45.3601877Z Sep 29 02:10:45 
> .process(MyProcessFunction(), 
> output_type=Types.STRING()).add_sink(self.test_sink)
> 2022-09-29T02:10:45.3603527Z Sep 29 02:10:45 self.env.execute('test 
> process function')
> 2022-09-29T02:10:45.3604445Z Sep 29 02:10:45 results = 
> self.test_sink.get_results()
> 2022-09-29T02:10:45.3605684Z Sep 29 02:10:45 expected = ["current 
> timestamp: 1603708211000, current watermark: "
> 2022-09-29T02:10:45.3607157Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=1, f1='1603708211000')",
> 2022-09-29T02:10:45.3608256Z Sep 29 02:10:45 "current 
> timestamp: 1603708224000, current watermark: "
> 2022-09-29T02:10:45.3609650Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=2, f1='1603708224000')",
> 2022-09-29T02:10:45.3610854Z Sep 29 02:10:45 "current 
> timestamp: 1603708226000, current watermark: "
> 2022-09-29T02:10:45.3612279Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=3, f1='1603708226000')",
> 2022-09-29T02:10:45.3613382Z Sep 29 02:10:45 "current 
> timestamp: 1603708289000, current watermark: "
> 2022-09-29T02:10:45.3615683Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=4, f1='1603708289000')"]
> 2022-09-29T02:10:45.3617687Z Sep 29 02:10:45 >   
> self.assert_equals_sorted(expected, results)
> 2022-09-29T02:10:45.3618620Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3619425Z Sep 29 02:10:45 
> 

[jira] [Commented] (FLINK-29461) ProcessDataStreamStreamingTests.test_process_function unstable

2022-12-06 Thread Qingsheng Ren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17644132#comment-17644132
 ] 

Qingsheng Ren commented on FLINK-29461:
---

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=43771=logs=e92ecf6d-e207-5a42-7ff7-528ff0c5b259=40fc352e-9b4c-5fd8-363f-628f24b01ec2=28618

> ProcessDataStreamStreamingTests.test_process_function unstable
> --
>
> Key: FLINK-29461
> URL: https://issues.apache.org/jira/browse/FLINK-29461
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Huang Xingbo
>Assignee: Huang Xingbo
>Priority: Critical
>  Labels: test-stability
>
> {code:java}
> 2022-09-29T02:10:45.3571648Z Sep 29 02:10:45 self = 
>  testMethod=test_process_function>
> 2022-09-29T02:10:45.3572279Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3572810Z Sep 29 02:10:45 def 
> test_process_function(self):
> 2022-09-29T02:10:45.3573495Z Sep 29 02:10:45 
> self.env.set_parallelism(1)
> 2022-09-29T02:10:45.3574148Z Sep 29 02:10:45 
> self.env.get_config().set_auto_watermark_interval(2000)
> 2022-09-29T02:10:45.3580634Z Sep 29 02:10:45 
> self.env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
> 2022-09-29T02:10:45.3583194Z Sep 29 02:10:45 data_stream = 
> self.env.from_collection([(1, '1603708211000'),
> 2022-09-29T02:10:45.3584515Z Sep 29 02:10:45  
>(2, '1603708224000'),
> 2022-09-29T02:10:45.3585957Z Sep 29 02:10:45  
>(3, '1603708226000'),
> 2022-09-29T02:10:45.3587132Z Sep 29 02:10:45  
>(4, '1603708289000')],
> 2022-09-29T02:10:45.3588094Z Sep 29 02:10:45  
>   type_info=Types.ROW([Types.INT(), Types.STRING()]))
> 2022-09-29T02:10:45.3589090Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3589949Z Sep 29 02:10:45 class 
> MyProcessFunction(ProcessFunction):
> 2022-09-29T02:10:45.3590710Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3591856Z Sep 29 02:10:45 def 
> process_element(self, value, ctx):
> 2022-09-29T02:10:45.3592873Z Sep 29 02:10:45 
> current_timestamp = ctx.timestamp()
> 2022-09-29T02:10:45.3593862Z Sep 29 02:10:45 
> current_watermark = ctx.timer_service().current_watermark()
> 2022-09-29T02:10:45.3594915Z Sep 29 02:10:45 yield "current 
> timestamp: {}, current watermark: {}, current_value: {}"\
> 2022-09-29T02:10:45.3596201Z Sep 29 02:10:45 
> .format(str(current_timestamp), str(current_watermark), str(value))
> 2022-09-29T02:10:45.3597089Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3597942Z Sep 29 02:10:45 watermark_strategy = 
> WatermarkStrategy.for_monotonous_timestamps()\
> 2022-09-29T02:10:45.3599260Z Sep 29 02:10:45 
> .with_timestamp_assigner(SecondColumnTimestampAssigner())
> 2022-09-29T02:10:45.3600611Z Sep 29 02:10:45 
> data_stream.assign_timestamps_and_watermarks(watermark_strategy)\
> 2022-09-29T02:10:45.3601877Z Sep 29 02:10:45 
> .process(MyProcessFunction(), 
> output_type=Types.STRING()).add_sink(self.test_sink)
> 2022-09-29T02:10:45.3603527Z Sep 29 02:10:45 self.env.execute('test 
> process function')
> 2022-09-29T02:10:45.3604445Z Sep 29 02:10:45 results = 
> self.test_sink.get_results()
> 2022-09-29T02:10:45.3605684Z Sep 29 02:10:45 expected = ["current 
> timestamp: 1603708211000, current watermark: "
> 2022-09-29T02:10:45.3607157Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=1, f1='1603708211000')",
> 2022-09-29T02:10:45.3608256Z Sep 29 02:10:45 "current 
> timestamp: 1603708224000, current watermark: "
> 2022-09-29T02:10:45.3609650Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=2, f1='1603708224000')",
> 2022-09-29T02:10:45.3610854Z Sep 29 02:10:45 "current 
> timestamp: 1603708226000, current watermark: "
> 2022-09-29T02:10:45.3612279Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=3, f1='1603708226000')",
> 2022-09-29T02:10:45.3613382Z Sep 29 02:10:45 "current 
> timestamp: 1603708289000, current watermark: "
> 2022-09-29T02:10:45.3615683Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=4, f1='1603708289000')"]
> 2022-09-29T02:10:45.3617687Z Sep 29 02:10:45 >   
> self.assert_equals_sorted(expected, results)
> 2022-09-29T02:10:45.3618620Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3619425Z Sep 29 02:10:45 
> 

[jira] [Commented] (FLINK-29461) ProcessDataStreamStreamingTests.test_process_function unstable

2022-12-06 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17643720#comment-17643720
 ] 

Martijn Visser commented on FLINK-29461:


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=43742=logs=e92ecf6d-e207-5a42-7ff7-528ff0c5b259=40fc352e-9b4c-5fd8-363f-628f24b01ec2=28926

> ProcessDataStreamStreamingTests.test_process_function unstable
> --
>
> Key: FLINK-29461
> URL: https://issues.apache.org/jira/browse/FLINK-29461
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Huang Xingbo
>Assignee: Huang Xingbo
>Priority: Critical
>  Labels: test-stability
>
> {code:java}
> 2022-09-29T02:10:45.3571648Z Sep 29 02:10:45 self = 
>  testMethod=test_process_function>
> 2022-09-29T02:10:45.3572279Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3572810Z Sep 29 02:10:45 def 
> test_process_function(self):
> 2022-09-29T02:10:45.3573495Z Sep 29 02:10:45 
> self.env.set_parallelism(1)
> 2022-09-29T02:10:45.3574148Z Sep 29 02:10:45 
> self.env.get_config().set_auto_watermark_interval(2000)
> 2022-09-29T02:10:45.3580634Z Sep 29 02:10:45 
> self.env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
> 2022-09-29T02:10:45.3583194Z Sep 29 02:10:45 data_stream = 
> self.env.from_collection([(1, '1603708211000'),
> 2022-09-29T02:10:45.3584515Z Sep 29 02:10:45  
>(2, '1603708224000'),
> 2022-09-29T02:10:45.3585957Z Sep 29 02:10:45  
>(3, '1603708226000'),
> 2022-09-29T02:10:45.3587132Z Sep 29 02:10:45  
>(4, '1603708289000')],
> 2022-09-29T02:10:45.3588094Z Sep 29 02:10:45  
>   type_info=Types.ROW([Types.INT(), Types.STRING()]))
> 2022-09-29T02:10:45.3589090Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3589949Z Sep 29 02:10:45 class 
> MyProcessFunction(ProcessFunction):
> 2022-09-29T02:10:45.3590710Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3591856Z Sep 29 02:10:45 def 
> process_element(self, value, ctx):
> 2022-09-29T02:10:45.3592873Z Sep 29 02:10:45 
> current_timestamp = ctx.timestamp()
> 2022-09-29T02:10:45.3593862Z Sep 29 02:10:45 
> current_watermark = ctx.timer_service().current_watermark()
> 2022-09-29T02:10:45.3594915Z Sep 29 02:10:45 yield "current 
> timestamp: {}, current watermark: {}, current_value: {}"\
> 2022-09-29T02:10:45.3596201Z Sep 29 02:10:45 
> .format(str(current_timestamp), str(current_watermark), str(value))
> 2022-09-29T02:10:45.3597089Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3597942Z Sep 29 02:10:45 watermark_strategy = 
> WatermarkStrategy.for_monotonous_timestamps()\
> 2022-09-29T02:10:45.3599260Z Sep 29 02:10:45 
> .with_timestamp_assigner(SecondColumnTimestampAssigner())
> 2022-09-29T02:10:45.3600611Z Sep 29 02:10:45 
> data_stream.assign_timestamps_and_watermarks(watermark_strategy)\
> 2022-09-29T02:10:45.3601877Z Sep 29 02:10:45 
> .process(MyProcessFunction(), 
> output_type=Types.STRING()).add_sink(self.test_sink)
> 2022-09-29T02:10:45.3603527Z Sep 29 02:10:45 self.env.execute('test 
> process function')
> 2022-09-29T02:10:45.3604445Z Sep 29 02:10:45 results = 
> self.test_sink.get_results()
> 2022-09-29T02:10:45.3605684Z Sep 29 02:10:45 expected = ["current 
> timestamp: 1603708211000, current watermark: "
> 2022-09-29T02:10:45.3607157Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=1, f1='1603708211000')",
> 2022-09-29T02:10:45.3608256Z Sep 29 02:10:45 "current 
> timestamp: 1603708224000, current watermark: "
> 2022-09-29T02:10:45.3609650Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=2, f1='1603708224000')",
> 2022-09-29T02:10:45.3610854Z Sep 29 02:10:45 "current 
> timestamp: 1603708226000, current watermark: "
> 2022-09-29T02:10:45.3612279Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=3, f1='1603708226000')",
> 2022-09-29T02:10:45.3613382Z Sep 29 02:10:45 "current 
> timestamp: 1603708289000, current watermark: "
> 2022-09-29T02:10:45.3615683Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=4, f1='1603708289000')"]
> 2022-09-29T02:10:45.3617687Z Sep 29 02:10:45 >   
> self.assert_equals_sorted(expected, results)
> 2022-09-29T02:10:45.3618620Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3619425Z Sep 29 02:10:45 
> 

[jira] [Commented] (FLINK-29461) ProcessDataStreamStreamingTests.test_process_function unstable

2022-12-05 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17643282#comment-17643282
 ] 

Matthias Pohl commented on FLINK-29461:
---

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=43711=logs=bf5e383b-9fd3-5f02-ca1c-8f788e2e76d3=85189c57-d8a0-5c9c-b61d-fc05cfac62cf=29177

> ProcessDataStreamStreamingTests.test_process_function unstable
> --
>
> Key: FLINK-29461
> URL: https://issues.apache.org/jira/browse/FLINK-29461
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Huang Xingbo
>Assignee: Huang Xingbo
>Priority: Critical
>  Labels: test-stability
>
> {code:java}
> 2022-09-29T02:10:45.3571648Z Sep 29 02:10:45 self = 
>  testMethod=test_process_function>
> 2022-09-29T02:10:45.3572279Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3572810Z Sep 29 02:10:45 def 
> test_process_function(self):
> 2022-09-29T02:10:45.3573495Z Sep 29 02:10:45 
> self.env.set_parallelism(1)
> 2022-09-29T02:10:45.3574148Z Sep 29 02:10:45 
> self.env.get_config().set_auto_watermark_interval(2000)
> 2022-09-29T02:10:45.3580634Z Sep 29 02:10:45 
> self.env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
> 2022-09-29T02:10:45.3583194Z Sep 29 02:10:45 data_stream = 
> self.env.from_collection([(1, '1603708211000'),
> 2022-09-29T02:10:45.3584515Z Sep 29 02:10:45  
>(2, '1603708224000'),
> 2022-09-29T02:10:45.3585957Z Sep 29 02:10:45  
>(3, '1603708226000'),
> 2022-09-29T02:10:45.3587132Z Sep 29 02:10:45  
>(4, '1603708289000')],
> 2022-09-29T02:10:45.3588094Z Sep 29 02:10:45  
>   type_info=Types.ROW([Types.INT(), Types.STRING()]))
> 2022-09-29T02:10:45.3589090Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3589949Z Sep 29 02:10:45 class 
> MyProcessFunction(ProcessFunction):
> 2022-09-29T02:10:45.3590710Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3591856Z Sep 29 02:10:45 def 
> process_element(self, value, ctx):
> 2022-09-29T02:10:45.3592873Z Sep 29 02:10:45 
> current_timestamp = ctx.timestamp()
> 2022-09-29T02:10:45.3593862Z Sep 29 02:10:45 
> current_watermark = ctx.timer_service().current_watermark()
> 2022-09-29T02:10:45.3594915Z Sep 29 02:10:45 yield "current 
> timestamp: {}, current watermark: {}, current_value: {}"\
> 2022-09-29T02:10:45.3596201Z Sep 29 02:10:45 
> .format(str(current_timestamp), str(current_watermark), str(value))
> 2022-09-29T02:10:45.3597089Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3597942Z Sep 29 02:10:45 watermark_strategy = 
> WatermarkStrategy.for_monotonous_timestamps()\
> 2022-09-29T02:10:45.3599260Z Sep 29 02:10:45 
> .with_timestamp_assigner(SecondColumnTimestampAssigner())
> 2022-09-29T02:10:45.3600611Z Sep 29 02:10:45 
> data_stream.assign_timestamps_and_watermarks(watermark_strategy)\
> 2022-09-29T02:10:45.3601877Z Sep 29 02:10:45 
> .process(MyProcessFunction(), 
> output_type=Types.STRING()).add_sink(self.test_sink)
> 2022-09-29T02:10:45.3603527Z Sep 29 02:10:45 self.env.execute('test 
> process function')
> 2022-09-29T02:10:45.3604445Z Sep 29 02:10:45 results = 
> self.test_sink.get_results()
> 2022-09-29T02:10:45.3605684Z Sep 29 02:10:45 expected = ["current 
> timestamp: 1603708211000, current watermark: "
> 2022-09-29T02:10:45.3607157Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=1, f1='1603708211000')",
> 2022-09-29T02:10:45.3608256Z Sep 29 02:10:45 "current 
> timestamp: 1603708224000, current watermark: "
> 2022-09-29T02:10:45.3609650Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=2, f1='1603708224000')",
> 2022-09-29T02:10:45.3610854Z Sep 29 02:10:45 "current 
> timestamp: 1603708226000, current watermark: "
> 2022-09-29T02:10:45.3612279Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=3, f1='1603708226000')",
> 2022-09-29T02:10:45.3613382Z Sep 29 02:10:45 "current 
> timestamp: 1603708289000, current watermark: "
> 2022-09-29T02:10:45.3615683Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=4, f1='1603708289000')"]
> 2022-09-29T02:10:45.3617687Z Sep 29 02:10:45 >   
> self.assert_equals_sorted(expected, results)
> 2022-09-29T02:10:45.3618620Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3619425Z Sep 29 02:10:45 
> 

[jira] [Commented] (FLINK-29461) ProcessDataStreamStreamingTests.test_process_function unstable

2022-12-05 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17643273#comment-17643273
 ] 

Matthias Pohl commented on FLINK-29461:
---

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=43709=logs=9cada3cb-c1d3-5621-16da-0f718fb86602=c67e71ed-6451-5d26-8920-5a8cf9651901=28522

> ProcessDataStreamStreamingTests.test_process_function unstable
> --
>
> Key: FLINK-29461
> URL: https://issues.apache.org/jira/browse/FLINK-29461
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Huang Xingbo
>Assignee: Huang Xingbo
>Priority: Critical
>  Labels: test-stability
>
> {code:java}
> 2022-09-29T02:10:45.3571648Z Sep 29 02:10:45 self = 
>  testMethod=test_process_function>
> 2022-09-29T02:10:45.3572279Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3572810Z Sep 29 02:10:45 def 
> test_process_function(self):
> 2022-09-29T02:10:45.3573495Z Sep 29 02:10:45 
> self.env.set_parallelism(1)
> 2022-09-29T02:10:45.3574148Z Sep 29 02:10:45 
> self.env.get_config().set_auto_watermark_interval(2000)
> 2022-09-29T02:10:45.3580634Z Sep 29 02:10:45 
> self.env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
> 2022-09-29T02:10:45.3583194Z Sep 29 02:10:45 data_stream = 
> self.env.from_collection([(1, '1603708211000'),
> 2022-09-29T02:10:45.3584515Z Sep 29 02:10:45  
>(2, '1603708224000'),
> 2022-09-29T02:10:45.3585957Z Sep 29 02:10:45  
>(3, '1603708226000'),
> 2022-09-29T02:10:45.3587132Z Sep 29 02:10:45  
>(4, '1603708289000')],
> 2022-09-29T02:10:45.3588094Z Sep 29 02:10:45  
>   type_info=Types.ROW([Types.INT(), Types.STRING()]))
> 2022-09-29T02:10:45.3589090Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3589949Z Sep 29 02:10:45 class 
> MyProcessFunction(ProcessFunction):
> 2022-09-29T02:10:45.3590710Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3591856Z Sep 29 02:10:45 def 
> process_element(self, value, ctx):
> 2022-09-29T02:10:45.3592873Z Sep 29 02:10:45 
> current_timestamp = ctx.timestamp()
> 2022-09-29T02:10:45.3593862Z Sep 29 02:10:45 
> current_watermark = ctx.timer_service().current_watermark()
> 2022-09-29T02:10:45.3594915Z Sep 29 02:10:45 yield "current 
> timestamp: {}, current watermark: {}, current_value: {}"\
> 2022-09-29T02:10:45.3596201Z Sep 29 02:10:45 
> .format(str(current_timestamp), str(current_watermark), str(value))
> 2022-09-29T02:10:45.3597089Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3597942Z Sep 29 02:10:45 watermark_strategy = 
> WatermarkStrategy.for_monotonous_timestamps()\
> 2022-09-29T02:10:45.3599260Z Sep 29 02:10:45 
> .with_timestamp_assigner(SecondColumnTimestampAssigner())
> 2022-09-29T02:10:45.3600611Z Sep 29 02:10:45 
> data_stream.assign_timestamps_and_watermarks(watermark_strategy)\
> 2022-09-29T02:10:45.3601877Z Sep 29 02:10:45 
> .process(MyProcessFunction(), 
> output_type=Types.STRING()).add_sink(self.test_sink)
> 2022-09-29T02:10:45.3603527Z Sep 29 02:10:45 self.env.execute('test 
> process function')
> 2022-09-29T02:10:45.3604445Z Sep 29 02:10:45 results = 
> self.test_sink.get_results()
> 2022-09-29T02:10:45.3605684Z Sep 29 02:10:45 expected = ["current 
> timestamp: 1603708211000, current watermark: "
> 2022-09-29T02:10:45.3607157Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=1, f1='1603708211000')",
> 2022-09-29T02:10:45.3608256Z Sep 29 02:10:45 "current 
> timestamp: 1603708224000, current watermark: "
> 2022-09-29T02:10:45.3609650Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=2, f1='1603708224000')",
> 2022-09-29T02:10:45.3610854Z Sep 29 02:10:45 "current 
> timestamp: 1603708226000, current watermark: "
> 2022-09-29T02:10:45.3612279Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=3, f1='1603708226000')",
> 2022-09-29T02:10:45.3613382Z Sep 29 02:10:45 "current 
> timestamp: 1603708289000, current watermark: "
> 2022-09-29T02:10:45.3615683Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=4, f1='1603708289000')"]
> 2022-09-29T02:10:45.3617687Z Sep 29 02:10:45 >   
> self.assert_equals_sorted(expected, results)
> 2022-09-29T02:10:45.3618620Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3619425Z Sep 29 02:10:45 
> 

[jira] [Commented] (FLINK-29461) ProcessDataStreamStreamingTests.test_process_function unstable

2022-12-05 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17643194#comment-17643194
 ] 

Matthias Pohl commented on FLINK-29461:
---

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=43692=logs=bf5e383b-9fd3-5f02-ca1c-8f788e2e76d3=85189c57-d8a0-5c9c-b61d-fc05cfac62cf=26932

> ProcessDataStreamStreamingTests.test_process_function unstable
> --
>
> Key: FLINK-29461
> URL: https://issues.apache.org/jira/browse/FLINK-29461
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Huang Xingbo
>Assignee: Huang Xingbo
>Priority: Critical
>  Labels: test-stability
>
> {code:java}
> 2022-09-29T02:10:45.3571648Z Sep 29 02:10:45 self = 
>  testMethod=test_process_function>
> 2022-09-29T02:10:45.3572279Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3572810Z Sep 29 02:10:45 def 
> test_process_function(self):
> 2022-09-29T02:10:45.3573495Z Sep 29 02:10:45 
> self.env.set_parallelism(1)
> 2022-09-29T02:10:45.3574148Z Sep 29 02:10:45 
> self.env.get_config().set_auto_watermark_interval(2000)
> 2022-09-29T02:10:45.3580634Z Sep 29 02:10:45 
> self.env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
> 2022-09-29T02:10:45.3583194Z Sep 29 02:10:45 data_stream = 
> self.env.from_collection([(1, '1603708211000'),
> 2022-09-29T02:10:45.3584515Z Sep 29 02:10:45  
>(2, '1603708224000'),
> 2022-09-29T02:10:45.3585957Z Sep 29 02:10:45  
>(3, '1603708226000'),
> 2022-09-29T02:10:45.3587132Z Sep 29 02:10:45  
>(4, '1603708289000')],
> 2022-09-29T02:10:45.3588094Z Sep 29 02:10:45  
>   type_info=Types.ROW([Types.INT(), Types.STRING()]))
> 2022-09-29T02:10:45.3589090Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3589949Z Sep 29 02:10:45 class 
> MyProcessFunction(ProcessFunction):
> 2022-09-29T02:10:45.3590710Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3591856Z Sep 29 02:10:45 def 
> process_element(self, value, ctx):
> 2022-09-29T02:10:45.3592873Z Sep 29 02:10:45 
> current_timestamp = ctx.timestamp()
> 2022-09-29T02:10:45.3593862Z Sep 29 02:10:45 
> current_watermark = ctx.timer_service().current_watermark()
> 2022-09-29T02:10:45.3594915Z Sep 29 02:10:45 yield "current 
> timestamp: {}, current watermark: {}, current_value: {}"\
> 2022-09-29T02:10:45.3596201Z Sep 29 02:10:45 
> .format(str(current_timestamp), str(current_watermark), str(value))
> 2022-09-29T02:10:45.3597089Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3597942Z Sep 29 02:10:45 watermark_strategy = 
> WatermarkStrategy.for_monotonous_timestamps()\
> 2022-09-29T02:10:45.3599260Z Sep 29 02:10:45 
> .with_timestamp_assigner(SecondColumnTimestampAssigner())
> 2022-09-29T02:10:45.3600611Z Sep 29 02:10:45 
> data_stream.assign_timestamps_and_watermarks(watermark_strategy)\
> 2022-09-29T02:10:45.3601877Z Sep 29 02:10:45 
> .process(MyProcessFunction(), 
> output_type=Types.STRING()).add_sink(self.test_sink)
> 2022-09-29T02:10:45.3603527Z Sep 29 02:10:45 self.env.execute('test 
> process function')
> 2022-09-29T02:10:45.3604445Z Sep 29 02:10:45 results = 
> self.test_sink.get_results()
> 2022-09-29T02:10:45.3605684Z Sep 29 02:10:45 expected = ["current 
> timestamp: 1603708211000, current watermark: "
> 2022-09-29T02:10:45.3607157Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=1, f1='1603708211000')",
> 2022-09-29T02:10:45.3608256Z Sep 29 02:10:45 "current 
> timestamp: 1603708224000, current watermark: "
> 2022-09-29T02:10:45.3609650Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=2, f1='1603708224000')",
> 2022-09-29T02:10:45.3610854Z Sep 29 02:10:45 "current 
> timestamp: 1603708226000, current watermark: "
> 2022-09-29T02:10:45.3612279Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=3, f1='1603708226000')",
> 2022-09-29T02:10:45.3613382Z Sep 29 02:10:45 "current 
> timestamp: 1603708289000, current watermark: "
> 2022-09-29T02:10:45.3615683Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=4, f1='1603708289000')"]
> 2022-09-29T02:10:45.3617687Z Sep 29 02:10:45 >   
> self.assert_equals_sorted(expected, results)
> 2022-09-29T02:10:45.3618620Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3619425Z Sep 29 02:10:45 
> 

[jira] [Commented] (FLINK-29461) ProcessDataStreamStreamingTests.test_process_function unstable

2022-12-02 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17642448#comment-17642448
 ] 

Matthias Pohl commented on FLINK-29461:
---

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=43664=logs=3e4dd1a2-fe2f-5e5d-a581-48087e718d53=b4612f28-e3b5-5853-8a8b-610ae894217a=28979

> ProcessDataStreamStreamingTests.test_process_function unstable
> --
>
> Key: FLINK-29461
> URL: https://issues.apache.org/jira/browse/FLINK-29461
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Huang Xingbo
>Assignee: Huang Xingbo
>Priority: Critical
>  Labels: test-stability
>
> {code:java}
> 2022-09-29T02:10:45.3571648Z Sep 29 02:10:45 self = 
>  testMethod=test_process_function>
> 2022-09-29T02:10:45.3572279Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3572810Z Sep 29 02:10:45 def 
> test_process_function(self):
> 2022-09-29T02:10:45.3573495Z Sep 29 02:10:45 
> self.env.set_parallelism(1)
> 2022-09-29T02:10:45.3574148Z Sep 29 02:10:45 
> self.env.get_config().set_auto_watermark_interval(2000)
> 2022-09-29T02:10:45.3580634Z Sep 29 02:10:45 
> self.env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
> 2022-09-29T02:10:45.3583194Z Sep 29 02:10:45 data_stream = 
> self.env.from_collection([(1, '1603708211000'),
> 2022-09-29T02:10:45.3584515Z Sep 29 02:10:45  
>(2, '1603708224000'),
> 2022-09-29T02:10:45.3585957Z Sep 29 02:10:45  
>(3, '1603708226000'),
> 2022-09-29T02:10:45.3587132Z Sep 29 02:10:45  
>(4, '1603708289000')],
> 2022-09-29T02:10:45.3588094Z Sep 29 02:10:45  
>   type_info=Types.ROW([Types.INT(), Types.STRING()]))
> 2022-09-29T02:10:45.3589090Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3589949Z Sep 29 02:10:45 class 
> MyProcessFunction(ProcessFunction):
> 2022-09-29T02:10:45.3590710Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3591856Z Sep 29 02:10:45 def 
> process_element(self, value, ctx):
> 2022-09-29T02:10:45.3592873Z Sep 29 02:10:45 
> current_timestamp = ctx.timestamp()
> 2022-09-29T02:10:45.3593862Z Sep 29 02:10:45 
> current_watermark = ctx.timer_service().current_watermark()
> 2022-09-29T02:10:45.3594915Z Sep 29 02:10:45 yield "current 
> timestamp: {}, current watermark: {}, current_value: {}"\
> 2022-09-29T02:10:45.3596201Z Sep 29 02:10:45 
> .format(str(current_timestamp), str(current_watermark), str(value))
> 2022-09-29T02:10:45.3597089Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3597942Z Sep 29 02:10:45 watermark_strategy = 
> WatermarkStrategy.for_monotonous_timestamps()\
> 2022-09-29T02:10:45.3599260Z Sep 29 02:10:45 
> .with_timestamp_assigner(SecondColumnTimestampAssigner())
> 2022-09-29T02:10:45.3600611Z Sep 29 02:10:45 
> data_stream.assign_timestamps_and_watermarks(watermark_strategy)\
> 2022-09-29T02:10:45.3601877Z Sep 29 02:10:45 
> .process(MyProcessFunction(), 
> output_type=Types.STRING()).add_sink(self.test_sink)
> 2022-09-29T02:10:45.3603527Z Sep 29 02:10:45 self.env.execute('test 
> process function')
> 2022-09-29T02:10:45.3604445Z Sep 29 02:10:45 results = 
> self.test_sink.get_results()
> 2022-09-29T02:10:45.3605684Z Sep 29 02:10:45 expected = ["current 
> timestamp: 1603708211000, current watermark: "
> 2022-09-29T02:10:45.3607157Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=1, f1='1603708211000')",
> 2022-09-29T02:10:45.3608256Z Sep 29 02:10:45 "current 
> timestamp: 1603708224000, current watermark: "
> 2022-09-29T02:10:45.3609650Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=2, f1='1603708224000')",
> 2022-09-29T02:10:45.3610854Z Sep 29 02:10:45 "current 
> timestamp: 1603708226000, current watermark: "
> 2022-09-29T02:10:45.3612279Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=3, f1='1603708226000')",
> 2022-09-29T02:10:45.3613382Z Sep 29 02:10:45 "current 
> timestamp: 1603708289000, current watermark: "
> 2022-09-29T02:10:45.3615683Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=4, f1='1603708289000')"]
> 2022-09-29T02:10:45.3617687Z Sep 29 02:10:45 >   
> self.assert_equals_sorted(expected, results)
> 2022-09-29T02:10:45.3618620Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3619425Z Sep 29 02:10:45 
> 

[jira] [Commented] (FLINK-29461) ProcessDataStreamStreamingTests.test_process_function unstable

2022-12-02 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17642430#comment-17642430
 ] 

Matthias Pohl commented on FLINK-29461:
---

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=43662=logs=e92ecf6d-e207-5a42-7ff7-528ff0c5b259=40fc352e-9b4c-5fd8-363f-628f24b01ec2=28078

> ProcessDataStreamStreamingTests.test_process_function unstable
> --
>
> Key: FLINK-29461
> URL: https://issues.apache.org/jira/browse/FLINK-29461
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Huang Xingbo
>Assignee: Huang Xingbo
>Priority: Critical
>  Labels: test-stability
>
> {code:java}
> 2022-09-29T02:10:45.3571648Z Sep 29 02:10:45 self = 
>  testMethod=test_process_function>
> 2022-09-29T02:10:45.3572279Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3572810Z Sep 29 02:10:45 def 
> test_process_function(self):
> 2022-09-29T02:10:45.3573495Z Sep 29 02:10:45 
> self.env.set_parallelism(1)
> 2022-09-29T02:10:45.3574148Z Sep 29 02:10:45 
> self.env.get_config().set_auto_watermark_interval(2000)
> 2022-09-29T02:10:45.3580634Z Sep 29 02:10:45 
> self.env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
> 2022-09-29T02:10:45.3583194Z Sep 29 02:10:45 data_stream = 
> self.env.from_collection([(1, '1603708211000'),
> 2022-09-29T02:10:45.3584515Z Sep 29 02:10:45  
>(2, '1603708224000'),
> 2022-09-29T02:10:45.3585957Z Sep 29 02:10:45  
>(3, '1603708226000'),
> 2022-09-29T02:10:45.3587132Z Sep 29 02:10:45  
>(4, '1603708289000')],
> 2022-09-29T02:10:45.3588094Z Sep 29 02:10:45  
>   type_info=Types.ROW([Types.INT(), Types.STRING()]))
> 2022-09-29T02:10:45.3589090Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3589949Z Sep 29 02:10:45 class 
> MyProcessFunction(ProcessFunction):
> 2022-09-29T02:10:45.3590710Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3591856Z Sep 29 02:10:45 def 
> process_element(self, value, ctx):
> 2022-09-29T02:10:45.3592873Z Sep 29 02:10:45 
> current_timestamp = ctx.timestamp()
> 2022-09-29T02:10:45.3593862Z Sep 29 02:10:45 
> current_watermark = ctx.timer_service().current_watermark()
> 2022-09-29T02:10:45.3594915Z Sep 29 02:10:45 yield "current 
> timestamp: {}, current watermark: {}, current_value: {}"\
> 2022-09-29T02:10:45.3596201Z Sep 29 02:10:45 
> .format(str(current_timestamp), str(current_watermark), str(value))
> 2022-09-29T02:10:45.3597089Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3597942Z Sep 29 02:10:45 watermark_strategy = 
> WatermarkStrategy.for_monotonous_timestamps()\
> 2022-09-29T02:10:45.3599260Z Sep 29 02:10:45 
> .with_timestamp_assigner(SecondColumnTimestampAssigner())
> 2022-09-29T02:10:45.3600611Z Sep 29 02:10:45 
> data_stream.assign_timestamps_and_watermarks(watermark_strategy)\
> 2022-09-29T02:10:45.3601877Z Sep 29 02:10:45 
> .process(MyProcessFunction(), 
> output_type=Types.STRING()).add_sink(self.test_sink)
> 2022-09-29T02:10:45.3603527Z Sep 29 02:10:45 self.env.execute('test 
> process function')
> 2022-09-29T02:10:45.3604445Z Sep 29 02:10:45 results = 
> self.test_sink.get_results()
> 2022-09-29T02:10:45.3605684Z Sep 29 02:10:45 expected = ["current 
> timestamp: 1603708211000, current watermark: "
> 2022-09-29T02:10:45.3607157Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=1, f1='1603708211000')",
> 2022-09-29T02:10:45.3608256Z Sep 29 02:10:45 "current 
> timestamp: 1603708224000, current watermark: "
> 2022-09-29T02:10:45.3609650Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=2, f1='1603708224000')",
> 2022-09-29T02:10:45.3610854Z Sep 29 02:10:45 "current 
> timestamp: 1603708226000, current watermark: "
> 2022-09-29T02:10:45.3612279Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=3, f1='1603708226000')",
> 2022-09-29T02:10:45.3613382Z Sep 29 02:10:45 "current 
> timestamp: 1603708289000, current watermark: "
> 2022-09-29T02:10:45.3615683Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=4, f1='1603708289000')"]
> 2022-09-29T02:10:45.3617687Z Sep 29 02:10:45 >   
> self.assert_equals_sorted(expected, results)
> 2022-09-29T02:10:45.3618620Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3619425Z Sep 29 02:10:45 
> 

[jira] [Commented] (FLINK-29461) ProcessDataStreamStreamingTests.test_process_function unstable

2022-12-01 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17641775#comment-17641775
 ] 

Martijn Visser commented on FLINK-29461:


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=43636=logs=e92ecf6d-e207-5a42-7ff7-528ff0c5b259=40fc352e-9b4c-5fd8-363f-628f24b01ec2=27587

> ProcessDataStreamStreamingTests.test_process_function unstable
> --
>
> Key: FLINK-29461
> URL: https://issues.apache.org/jira/browse/FLINK-29461
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Huang Xingbo
>Assignee: Huang Xingbo
>Priority: Critical
>  Labels: test-stability
>
> {code:java}
> 2022-09-29T02:10:45.3571648Z Sep 29 02:10:45 self = 
>  testMethod=test_process_function>
> 2022-09-29T02:10:45.3572279Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3572810Z Sep 29 02:10:45 def 
> test_process_function(self):
> 2022-09-29T02:10:45.3573495Z Sep 29 02:10:45 
> self.env.set_parallelism(1)
> 2022-09-29T02:10:45.3574148Z Sep 29 02:10:45 
> self.env.get_config().set_auto_watermark_interval(2000)
> 2022-09-29T02:10:45.3580634Z Sep 29 02:10:45 
> self.env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
> 2022-09-29T02:10:45.3583194Z Sep 29 02:10:45 data_stream = 
> self.env.from_collection([(1, '1603708211000'),
> 2022-09-29T02:10:45.3584515Z Sep 29 02:10:45  
>(2, '1603708224000'),
> 2022-09-29T02:10:45.3585957Z Sep 29 02:10:45  
>(3, '1603708226000'),
> 2022-09-29T02:10:45.3587132Z Sep 29 02:10:45  
>(4, '1603708289000')],
> 2022-09-29T02:10:45.3588094Z Sep 29 02:10:45  
>   type_info=Types.ROW([Types.INT(), Types.STRING()]))
> 2022-09-29T02:10:45.3589090Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3589949Z Sep 29 02:10:45 class 
> MyProcessFunction(ProcessFunction):
> 2022-09-29T02:10:45.3590710Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3591856Z Sep 29 02:10:45 def 
> process_element(self, value, ctx):
> 2022-09-29T02:10:45.3592873Z Sep 29 02:10:45 
> current_timestamp = ctx.timestamp()
> 2022-09-29T02:10:45.3593862Z Sep 29 02:10:45 
> current_watermark = ctx.timer_service().current_watermark()
> 2022-09-29T02:10:45.3594915Z Sep 29 02:10:45 yield "current 
> timestamp: {}, current watermark: {}, current_value: {}"\
> 2022-09-29T02:10:45.3596201Z Sep 29 02:10:45 
> .format(str(current_timestamp), str(current_watermark), str(value))
> 2022-09-29T02:10:45.3597089Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3597942Z Sep 29 02:10:45 watermark_strategy = 
> WatermarkStrategy.for_monotonous_timestamps()\
> 2022-09-29T02:10:45.3599260Z Sep 29 02:10:45 
> .with_timestamp_assigner(SecondColumnTimestampAssigner())
> 2022-09-29T02:10:45.3600611Z Sep 29 02:10:45 
> data_stream.assign_timestamps_and_watermarks(watermark_strategy)\
> 2022-09-29T02:10:45.3601877Z Sep 29 02:10:45 
> .process(MyProcessFunction(), 
> output_type=Types.STRING()).add_sink(self.test_sink)
> 2022-09-29T02:10:45.3603527Z Sep 29 02:10:45 self.env.execute('test 
> process function')
> 2022-09-29T02:10:45.3604445Z Sep 29 02:10:45 results = 
> self.test_sink.get_results()
> 2022-09-29T02:10:45.3605684Z Sep 29 02:10:45 expected = ["current 
> timestamp: 1603708211000, current watermark: "
> 2022-09-29T02:10:45.3607157Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=1, f1='1603708211000')",
> 2022-09-29T02:10:45.3608256Z Sep 29 02:10:45 "current 
> timestamp: 1603708224000, current watermark: "
> 2022-09-29T02:10:45.3609650Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=2, f1='1603708224000')",
> 2022-09-29T02:10:45.3610854Z Sep 29 02:10:45 "current 
> timestamp: 1603708226000, current watermark: "
> 2022-09-29T02:10:45.3612279Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=3, f1='1603708226000')",
> 2022-09-29T02:10:45.3613382Z Sep 29 02:10:45 "current 
> timestamp: 1603708289000, current watermark: "
> 2022-09-29T02:10:45.3615683Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=4, f1='1603708289000')"]
> 2022-09-29T02:10:45.3617687Z Sep 29 02:10:45 >   
> self.assert_equals_sorted(expected, results)
> 2022-09-29T02:10:45.3618620Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3619425Z Sep 29 02:10:45 
> 

[jira] [Commented] (FLINK-29461) ProcessDataStreamStreamingTests.test_process_function unstable

2022-11-29 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17641099#comment-17641099
 ] 

Matthias Pohl commented on FLINK-29461:
---

No worries. Thanks for picking it up. :)

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=43604=logs=e92ecf6d-e207-5a42-7ff7-528ff0c5b259=40fc352e-9b4c-5fd8-363f-628f24b01ec2=29496

> ProcessDataStreamStreamingTests.test_process_function unstable
> --
>
> Key: FLINK-29461
> URL: https://issues.apache.org/jira/browse/FLINK-29461
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Huang Xingbo
>Assignee: Huang Xingbo
>Priority: Critical
>  Labels: test-stability
>
> {code:java}
> 2022-09-29T02:10:45.3571648Z Sep 29 02:10:45 self = 
>  testMethod=test_process_function>
> 2022-09-29T02:10:45.3572279Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3572810Z Sep 29 02:10:45 def 
> test_process_function(self):
> 2022-09-29T02:10:45.3573495Z Sep 29 02:10:45 
> self.env.set_parallelism(1)
> 2022-09-29T02:10:45.3574148Z Sep 29 02:10:45 
> self.env.get_config().set_auto_watermark_interval(2000)
> 2022-09-29T02:10:45.3580634Z Sep 29 02:10:45 
> self.env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
> 2022-09-29T02:10:45.3583194Z Sep 29 02:10:45 data_stream = 
> self.env.from_collection([(1, '1603708211000'),
> 2022-09-29T02:10:45.3584515Z Sep 29 02:10:45  
>(2, '1603708224000'),
> 2022-09-29T02:10:45.3585957Z Sep 29 02:10:45  
>(3, '1603708226000'),
> 2022-09-29T02:10:45.3587132Z Sep 29 02:10:45  
>(4, '1603708289000')],
> 2022-09-29T02:10:45.3588094Z Sep 29 02:10:45  
>   type_info=Types.ROW([Types.INT(), Types.STRING()]))
> 2022-09-29T02:10:45.3589090Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3589949Z Sep 29 02:10:45 class 
> MyProcessFunction(ProcessFunction):
> 2022-09-29T02:10:45.3590710Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3591856Z Sep 29 02:10:45 def 
> process_element(self, value, ctx):
> 2022-09-29T02:10:45.3592873Z Sep 29 02:10:45 
> current_timestamp = ctx.timestamp()
> 2022-09-29T02:10:45.3593862Z Sep 29 02:10:45 
> current_watermark = ctx.timer_service().current_watermark()
> 2022-09-29T02:10:45.3594915Z Sep 29 02:10:45 yield "current 
> timestamp: {}, current watermark: {}, current_value: {}"\
> 2022-09-29T02:10:45.3596201Z Sep 29 02:10:45 
> .format(str(current_timestamp), str(current_watermark), str(value))
> 2022-09-29T02:10:45.3597089Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3597942Z Sep 29 02:10:45 watermark_strategy = 
> WatermarkStrategy.for_monotonous_timestamps()\
> 2022-09-29T02:10:45.3599260Z Sep 29 02:10:45 
> .with_timestamp_assigner(SecondColumnTimestampAssigner())
> 2022-09-29T02:10:45.3600611Z Sep 29 02:10:45 
> data_stream.assign_timestamps_and_watermarks(watermark_strategy)\
> 2022-09-29T02:10:45.3601877Z Sep 29 02:10:45 
> .process(MyProcessFunction(), 
> output_type=Types.STRING()).add_sink(self.test_sink)
> 2022-09-29T02:10:45.3603527Z Sep 29 02:10:45 self.env.execute('test 
> process function')
> 2022-09-29T02:10:45.3604445Z Sep 29 02:10:45 results = 
> self.test_sink.get_results()
> 2022-09-29T02:10:45.3605684Z Sep 29 02:10:45 expected = ["current 
> timestamp: 1603708211000, current watermark: "
> 2022-09-29T02:10:45.3607157Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=1, f1='1603708211000')",
> 2022-09-29T02:10:45.3608256Z Sep 29 02:10:45 "current 
> timestamp: 1603708224000, current watermark: "
> 2022-09-29T02:10:45.3609650Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=2, f1='1603708224000')",
> 2022-09-29T02:10:45.3610854Z Sep 29 02:10:45 "current 
> timestamp: 1603708226000, current watermark: "
> 2022-09-29T02:10:45.3612279Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=3, f1='1603708226000')",
> 2022-09-29T02:10:45.3613382Z Sep 29 02:10:45 "current 
> timestamp: 1603708289000, current watermark: "
> 2022-09-29T02:10:45.3615683Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=4, f1='1603708289000')"]
> 2022-09-29T02:10:45.3617687Z Sep 29 02:10:45 >   
> self.assert_equals_sorted(expected, results)
> 2022-09-29T02:10:45.3618620Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3619425Z Sep 29 02:10:45 

[jira] [Commented] (FLINK-29461) ProcessDataStreamStreamingTests.test_process_function unstable

2022-11-29 Thread Xingbo Huang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17640473#comment-17640473
 ] 

Xingbo Huang commented on FLINK-29461:
--

[~mapohl] Sorry for the late reply, I will take a look into this issue in these 
two days.

> ProcessDataStreamStreamingTests.test_process_function unstable
> --
>
> Key: FLINK-29461
> URL: https://issues.apache.org/jira/browse/FLINK-29461
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Huang Xingbo
>Assignee: Huang Xingbo
>Priority: Critical
>  Labels: test-stability
>
> {code:java}
> 2022-09-29T02:10:45.3571648Z Sep 29 02:10:45 self = 
>  testMethod=test_process_function>
> 2022-09-29T02:10:45.3572279Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3572810Z Sep 29 02:10:45 def 
> test_process_function(self):
> 2022-09-29T02:10:45.3573495Z Sep 29 02:10:45 
> self.env.set_parallelism(1)
> 2022-09-29T02:10:45.3574148Z Sep 29 02:10:45 
> self.env.get_config().set_auto_watermark_interval(2000)
> 2022-09-29T02:10:45.3580634Z Sep 29 02:10:45 
> self.env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
> 2022-09-29T02:10:45.3583194Z Sep 29 02:10:45 data_stream = 
> self.env.from_collection([(1, '1603708211000'),
> 2022-09-29T02:10:45.3584515Z Sep 29 02:10:45  
>(2, '1603708224000'),
> 2022-09-29T02:10:45.3585957Z Sep 29 02:10:45  
>(3, '1603708226000'),
> 2022-09-29T02:10:45.3587132Z Sep 29 02:10:45  
>(4, '1603708289000')],
> 2022-09-29T02:10:45.3588094Z Sep 29 02:10:45  
>   type_info=Types.ROW([Types.INT(), Types.STRING()]))
> 2022-09-29T02:10:45.3589090Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3589949Z Sep 29 02:10:45 class 
> MyProcessFunction(ProcessFunction):
> 2022-09-29T02:10:45.3590710Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3591856Z Sep 29 02:10:45 def 
> process_element(self, value, ctx):
> 2022-09-29T02:10:45.3592873Z Sep 29 02:10:45 
> current_timestamp = ctx.timestamp()
> 2022-09-29T02:10:45.3593862Z Sep 29 02:10:45 
> current_watermark = ctx.timer_service().current_watermark()
> 2022-09-29T02:10:45.3594915Z Sep 29 02:10:45 yield "current 
> timestamp: {}, current watermark: {}, current_value: {}"\
> 2022-09-29T02:10:45.3596201Z Sep 29 02:10:45 
> .format(str(current_timestamp), str(current_watermark), str(value))
> 2022-09-29T02:10:45.3597089Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3597942Z Sep 29 02:10:45 watermark_strategy = 
> WatermarkStrategy.for_monotonous_timestamps()\
> 2022-09-29T02:10:45.3599260Z Sep 29 02:10:45 
> .with_timestamp_assigner(SecondColumnTimestampAssigner())
> 2022-09-29T02:10:45.3600611Z Sep 29 02:10:45 
> data_stream.assign_timestamps_and_watermarks(watermark_strategy)\
> 2022-09-29T02:10:45.3601877Z Sep 29 02:10:45 
> .process(MyProcessFunction(), 
> output_type=Types.STRING()).add_sink(self.test_sink)
> 2022-09-29T02:10:45.3603527Z Sep 29 02:10:45 self.env.execute('test 
> process function')
> 2022-09-29T02:10:45.3604445Z Sep 29 02:10:45 results = 
> self.test_sink.get_results()
> 2022-09-29T02:10:45.3605684Z Sep 29 02:10:45 expected = ["current 
> timestamp: 1603708211000, current watermark: "
> 2022-09-29T02:10:45.3607157Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=1, f1='1603708211000')",
> 2022-09-29T02:10:45.3608256Z Sep 29 02:10:45 "current 
> timestamp: 1603708224000, current watermark: "
> 2022-09-29T02:10:45.3609650Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=2, f1='1603708224000')",
> 2022-09-29T02:10:45.3610854Z Sep 29 02:10:45 "current 
> timestamp: 1603708226000, current watermark: "
> 2022-09-29T02:10:45.3612279Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=3, f1='1603708226000')",
> 2022-09-29T02:10:45.3613382Z Sep 29 02:10:45 "current 
> timestamp: 1603708289000, current watermark: "
> 2022-09-29T02:10:45.3615683Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=4, f1='1603708289000')"]
> 2022-09-29T02:10:45.3617687Z Sep 29 02:10:45 >   
> self.assert_equals_sorted(expected, results)
> 2022-09-29T02:10:45.3618620Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3619425Z Sep 29 02:10:45 
> pyflink/datastream/tests/test_data_stream.py:986: 
> 2022-09-29T02:10:45.3620424Z Sep 29 02:10:45 _ _ _ _ _ _ _ 

[jira] [Commented] (FLINK-29461) ProcessDataStreamStreamingTests.test_process_function unstable

2022-11-28 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17640409#comment-17640409
 ] 

Matthias Pohl commented on FLINK-29461:
---

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=43572=logs=bf5e383b-9fd3-5f02-ca1c-8f788e2e76d3=85189c57-d8a0-5c9c-b61d-fc05cfac62cf=27194

> ProcessDataStreamStreamingTests.test_process_function unstable
> --
>
> Key: FLINK-29461
> URL: https://issues.apache.org/jira/browse/FLINK-29461
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Huang Xingbo
>Assignee: Huang Xingbo
>Priority: Critical
>  Labels: test-stability
>
> {code:java}
> 2022-09-29T02:10:45.3571648Z Sep 29 02:10:45 self = 
>  testMethod=test_process_function>
> 2022-09-29T02:10:45.3572279Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3572810Z Sep 29 02:10:45 def 
> test_process_function(self):
> 2022-09-29T02:10:45.3573495Z Sep 29 02:10:45 
> self.env.set_parallelism(1)
> 2022-09-29T02:10:45.3574148Z Sep 29 02:10:45 
> self.env.get_config().set_auto_watermark_interval(2000)
> 2022-09-29T02:10:45.3580634Z Sep 29 02:10:45 
> self.env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
> 2022-09-29T02:10:45.3583194Z Sep 29 02:10:45 data_stream = 
> self.env.from_collection([(1, '1603708211000'),
> 2022-09-29T02:10:45.3584515Z Sep 29 02:10:45  
>(2, '1603708224000'),
> 2022-09-29T02:10:45.3585957Z Sep 29 02:10:45  
>(3, '1603708226000'),
> 2022-09-29T02:10:45.3587132Z Sep 29 02:10:45  
>(4, '1603708289000')],
> 2022-09-29T02:10:45.3588094Z Sep 29 02:10:45  
>   type_info=Types.ROW([Types.INT(), Types.STRING()]))
> 2022-09-29T02:10:45.3589090Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3589949Z Sep 29 02:10:45 class 
> MyProcessFunction(ProcessFunction):
> 2022-09-29T02:10:45.3590710Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3591856Z Sep 29 02:10:45 def 
> process_element(self, value, ctx):
> 2022-09-29T02:10:45.3592873Z Sep 29 02:10:45 
> current_timestamp = ctx.timestamp()
> 2022-09-29T02:10:45.3593862Z Sep 29 02:10:45 
> current_watermark = ctx.timer_service().current_watermark()
> 2022-09-29T02:10:45.3594915Z Sep 29 02:10:45 yield "current 
> timestamp: {}, current watermark: {}, current_value: {}"\
> 2022-09-29T02:10:45.3596201Z Sep 29 02:10:45 
> .format(str(current_timestamp), str(current_watermark), str(value))
> 2022-09-29T02:10:45.3597089Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3597942Z Sep 29 02:10:45 watermark_strategy = 
> WatermarkStrategy.for_monotonous_timestamps()\
> 2022-09-29T02:10:45.3599260Z Sep 29 02:10:45 
> .with_timestamp_assigner(SecondColumnTimestampAssigner())
> 2022-09-29T02:10:45.3600611Z Sep 29 02:10:45 
> data_stream.assign_timestamps_and_watermarks(watermark_strategy)\
> 2022-09-29T02:10:45.3601877Z Sep 29 02:10:45 
> .process(MyProcessFunction(), 
> output_type=Types.STRING()).add_sink(self.test_sink)
> 2022-09-29T02:10:45.3603527Z Sep 29 02:10:45 self.env.execute('test 
> process function')
> 2022-09-29T02:10:45.3604445Z Sep 29 02:10:45 results = 
> self.test_sink.get_results()
> 2022-09-29T02:10:45.3605684Z Sep 29 02:10:45 expected = ["current 
> timestamp: 1603708211000, current watermark: "
> 2022-09-29T02:10:45.3607157Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=1, f1='1603708211000')",
> 2022-09-29T02:10:45.3608256Z Sep 29 02:10:45 "current 
> timestamp: 1603708224000, current watermark: "
> 2022-09-29T02:10:45.3609650Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=2, f1='1603708224000')",
> 2022-09-29T02:10:45.3610854Z Sep 29 02:10:45 "current 
> timestamp: 1603708226000, current watermark: "
> 2022-09-29T02:10:45.3612279Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=3, f1='1603708226000')",
> 2022-09-29T02:10:45.3613382Z Sep 29 02:10:45 "current 
> timestamp: 1603708289000, current watermark: "
> 2022-09-29T02:10:45.3615683Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=4, f1='1603708289000')"]
> 2022-09-29T02:10:45.3617687Z Sep 29 02:10:45 >   
> self.assert_equals_sorted(expected, results)
> 2022-09-29T02:10:45.3618620Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3619425Z Sep 29 02:10:45 
> 

[jira] [Commented] (FLINK-29461) ProcessDataStreamStreamingTests.test_process_function unstable

2022-11-28 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17640019#comment-17640019
 ] 

Matthias Pohl commented on FLINK-29461:
---

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=43541=logs=9cada3cb-c1d3-5621-16da-0f718fb86602=c67e71ed-6451-5d26-8920-5a8cf9651901=26458

> ProcessDataStreamStreamingTests.test_process_function unstable
> --
>
> Key: FLINK-29461
> URL: https://issues.apache.org/jira/browse/FLINK-29461
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Huang Xingbo
>Assignee: Huang Xingbo
>Priority: Critical
>  Labels: test-stability
>
> {code:java}
> 2022-09-29T02:10:45.3571648Z Sep 29 02:10:45 self = 
>  testMethod=test_process_function>
> 2022-09-29T02:10:45.3572279Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3572810Z Sep 29 02:10:45 def 
> test_process_function(self):
> 2022-09-29T02:10:45.3573495Z Sep 29 02:10:45 
> self.env.set_parallelism(1)
> 2022-09-29T02:10:45.3574148Z Sep 29 02:10:45 
> self.env.get_config().set_auto_watermark_interval(2000)
> 2022-09-29T02:10:45.3580634Z Sep 29 02:10:45 
> self.env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
> 2022-09-29T02:10:45.3583194Z Sep 29 02:10:45 data_stream = 
> self.env.from_collection([(1, '1603708211000'),
> 2022-09-29T02:10:45.3584515Z Sep 29 02:10:45  
>(2, '1603708224000'),
> 2022-09-29T02:10:45.3585957Z Sep 29 02:10:45  
>(3, '1603708226000'),
> 2022-09-29T02:10:45.3587132Z Sep 29 02:10:45  
>(4, '1603708289000')],
> 2022-09-29T02:10:45.3588094Z Sep 29 02:10:45  
>   type_info=Types.ROW([Types.INT(), Types.STRING()]))
> 2022-09-29T02:10:45.3589090Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3589949Z Sep 29 02:10:45 class 
> MyProcessFunction(ProcessFunction):
> 2022-09-29T02:10:45.3590710Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3591856Z Sep 29 02:10:45 def 
> process_element(self, value, ctx):
> 2022-09-29T02:10:45.3592873Z Sep 29 02:10:45 
> current_timestamp = ctx.timestamp()
> 2022-09-29T02:10:45.3593862Z Sep 29 02:10:45 
> current_watermark = ctx.timer_service().current_watermark()
> 2022-09-29T02:10:45.3594915Z Sep 29 02:10:45 yield "current 
> timestamp: {}, current watermark: {}, current_value: {}"\
> 2022-09-29T02:10:45.3596201Z Sep 29 02:10:45 
> .format(str(current_timestamp), str(current_watermark), str(value))
> 2022-09-29T02:10:45.3597089Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3597942Z Sep 29 02:10:45 watermark_strategy = 
> WatermarkStrategy.for_monotonous_timestamps()\
> 2022-09-29T02:10:45.3599260Z Sep 29 02:10:45 
> .with_timestamp_assigner(SecondColumnTimestampAssigner())
> 2022-09-29T02:10:45.3600611Z Sep 29 02:10:45 
> data_stream.assign_timestamps_and_watermarks(watermark_strategy)\
> 2022-09-29T02:10:45.3601877Z Sep 29 02:10:45 
> .process(MyProcessFunction(), 
> output_type=Types.STRING()).add_sink(self.test_sink)
> 2022-09-29T02:10:45.3603527Z Sep 29 02:10:45 self.env.execute('test 
> process function')
> 2022-09-29T02:10:45.3604445Z Sep 29 02:10:45 results = 
> self.test_sink.get_results()
> 2022-09-29T02:10:45.3605684Z Sep 29 02:10:45 expected = ["current 
> timestamp: 1603708211000, current watermark: "
> 2022-09-29T02:10:45.3607157Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=1, f1='1603708211000')",
> 2022-09-29T02:10:45.3608256Z Sep 29 02:10:45 "current 
> timestamp: 1603708224000, current watermark: "
> 2022-09-29T02:10:45.3609650Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=2, f1='1603708224000')",
> 2022-09-29T02:10:45.3610854Z Sep 29 02:10:45 "current 
> timestamp: 1603708226000, current watermark: "
> 2022-09-29T02:10:45.3612279Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=3, f1='1603708226000')",
> 2022-09-29T02:10:45.3613382Z Sep 29 02:10:45 "current 
> timestamp: 1603708289000, current watermark: "
> 2022-09-29T02:10:45.3615683Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=4, f1='1603708289000')"]
> 2022-09-29T02:10:45.3617687Z Sep 29 02:10:45 >   
> self.assert_equals_sorted(expected, results)
> 2022-09-29T02:10:45.3618620Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3619425Z Sep 29 02:10:45 
> 

[jira] [Commented] (FLINK-29461) ProcessDataStreamStreamingTests.test_process_function unstable

2022-11-28 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17639914#comment-17639914
 ] 

Matthias Pohl commented on FLINK-29461:
---

Same build, two failures:
* 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=43512=logs=e92ecf6d-e207-5a42-7ff7-528ff0c5b259=40fc352e-9b4c-5fd8-363f-628f24b01ec2=29899
* 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=43512=logs=bf5e383b-9fd3-5f02-ca1c-8f788e2e76d3=85189c57-d8a0-5c9c-b61d-fc05cfac62cf=27869

> ProcessDataStreamStreamingTests.test_process_function unstable
> --
>
> Key: FLINK-29461
> URL: https://issues.apache.org/jira/browse/FLINK-29461
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Huang Xingbo
>Assignee: Huang Xingbo
>Priority: Critical
>  Labels: test-stability
>
> {code:java}
> 2022-09-29T02:10:45.3571648Z Sep 29 02:10:45 self = 
>  testMethod=test_process_function>
> 2022-09-29T02:10:45.3572279Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3572810Z Sep 29 02:10:45 def 
> test_process_function(self):
> 2022-09-29T02:10:45.3573495Z Sep 29 02:10:45 
> self.env.set_parallelism(1)
> 2022-09-29T02:10:45.3574148Z Sep 29 02:10:45 
> self.env.get_config().set_auto_watermark_interval(2000)
> 2022-09-29T02:10:45.3580634Z Sep 29 02:10:45 
> self.env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
> 2022-09-29T02:10:45.3583194Z Sep 29 02:10:45 data_stream = 
> self.env.from_collection([(1, '1603708211000'),
> 2022-09-29T02:10:45.3584515Z Sep 29 02:10:45  
>(2, '1603708224000'),
> 2022-09-29T02:10:45.3585957Z Sep 29 02:10:45  
>(3, '1603708226000'),
> 2022-09-29T02:10:45.3587132Z Sep 29 02:10:45  
>(4, '1603708289000')],
> 2022-09-29T02:10:45.3588094Z Sep 29 02:10:45  
>   type_info=Types.ROW([Types.INT(), Types.STRING()]))
> 2022-09-29T02:10:45.3589090Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3589949Z Sep 29 02:10:45 class 
> MyProcessFunction(ProcessFunction):
> 2022-09-29T02:10:45.3590710Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3591856Z Sep 29 02:10:45 def 
> process_element(self, value, ctx):
> 2022-09-29T02:10:45.3592873Z Sep 29 02:10:45 
> current_timestamp = ctx.timestamp()
> 2022-09-29T02:10:45.3593862Z Sep 29 02:10:45 
> current_watermark = ctx.timer_service().current_watermark()
> 2022-09-29T02:10:45.3594915Z Sep 29 02:10:45 yield "current 
> timestamp: {}, current watermark: {}, current_value: {}"\
> 2022-09-29T02:10:45.3596201Z Sep 29 02:10:45 
> .format(str(current_timestamp), str(current_watermark), str(value))
> 2022-09-29T02:10:45.3597089Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3597942Z Sep 29 02:10:45 watermark_strategy = 
> WatermarkStrategy.for_monotonous_timestamps()\
> 2022-09-29T02:10:45.3599260Z Sep 29 02:10:45 
> .with_timestamp_assigner(SecondColumnTimestampAssigner())
> 2022-09-29T02:10:45.3600611Z Sep 29 02:10:45 
> data_stream.assign_timestamps_and_watermarks(watermark_strategy)\
> 2022-09-29T02:10:45.3601877Z Sep 29 02:10:45 
> .process(MyProcessFunction(), 
> output_type=Types.STRING()).add_sink(self.test_sink)
> 2022-09-29T02:10:45.3603527Z Sep 29 02:10:45 self.env.execute('test 
> process function')
> 2022-09-29T02:10:45.3604445Z Sep 29 02:10:45 results = 
> self.test_sink.get_results()
> 2022-09-29T02:10:45.3605684Z Sep 29 02:10:45 expected = ["current 
> timestamp: 1603708211000, current watermark: "
> 2022-09-29T02:10:45.3607157Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=1, f1='1603708211000')",
> 2022-09-29T02:10:45.3608256Z Sep 29 02:10:45 "current 
> timestamp: 1603708224000, current watermark: "
> 2022-09-29T02:10:45.3609650Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=2, f1='1603708224000')",
> 2022-09-29T02:10:45.3610854Z Sep 29 02:10:45 "current 
> timestamp: 1603708226000, current watermark: "
> 2022-09-29T02:10:45.3612279Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=3, f1='1603708226000')",
> 2022-09-29T02:10:45.3613382Z Sep 29 02:10:45 "current 
> timestamp: 1603708289000, current watermark: "
> 2022-09-29T02:10:45.3615683Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=4, f1='1603708289000')"]
> 2022-09-29T02:10:45.3617687Z Sep 29 02:10:45 

[jira] [Commented] (FLINK-29461) ProcessDataStreamStreamingTests.test_process_function unstable

2022-11-24 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17638491#comment-17638491
 ] 

Matthias Pohl commented on FLINK-29461:
---

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=43483=logs=9cada3cb-c1d3-5621-16da-0f718fb86602=c67e71ed-6451-5d26-8920-5a8cf9651901=28464

> ProcessDataStreamStreamingTests.test_process_function unstable
> --
>
> Key: FLINK-29461
> URL: https://issues.apache.org/jira/browse/FLINK-29461
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Huang Xingbo
>Assignee: Huang Xingbo
>Priority: Critical
>  Labels: test-stability
>
> {code:java}
> 2022-09-29T02:10:45.3571648Z Sep 29 02:10:45 self = 
>  testMethod=test_process_function>
> 2022-09-29T02:10:45.3572279Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3572810Z Sep 29 02:10:45 def 
> test_process_function(self):
> 2022-09-29T02:10:45.3573495Z Sep 29 02:10:45 
> self.env.set_parallelism(1)
> 2022-09-29T02:10:45.3574148Z Sep 29 02:10:45 
> self.env.get_config().set_auto_watermark_interval(2000)
> 2022-09-29T02:10:45.3580634Z Sep 29 02:10:45 
> self.env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
> 2022-09-29T02:10:45.3583194Z Sep 29 02:10:45 data_stream = 
> self.env.from_collection([(1, '1603708211000'),
> 2022-09-29T02:10:45.3584515Z Sep 29 02:10:45  
>(2, '1603708224000'),
> 2022-09-29T02:10:45.3585957Z Sep 29 02:10:45  
>(3, '1603708226000'),
> 2022-09-29T02:10:45.3587132Z Sep 29 02:10:45  
>(4, '1603708289000')],
> 2022-09-29T02:10:45.3588094Z Sep 29 02:10:45  
>   type_info=Types.ROW([Types.INT(), Types.STRING()]))
> 2022-09-29T02:10:45.3589090Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3589949Z Sep 29 02:10:45 class 
> MyProcessFunction(ProcessFunction):
> 2022-09-29T02:10:45.3590710Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3591856Z Sep 29 02:10:45 def 
> process_element(self, value, ctx):
> 2022-09-29T02:10:45.3592873Z Sep 29 02:10:45 
> current_timestamp = ctx.timestamp()
> 2022-09-29T02:10:45.3593862Z Sep 29 02:10:45 
> current_watermark = ctx.timer_service().current_watermark()
> 2022-09-29T02:10:45.3594915Z Sep 29 02:10:45 yield "current 
> timestamp: {}, current watermark: {}, current_value: {}"\
> 2022-09-29T02:10:45.3596201Z Sep 29 02:10:45 
> .format(str(current_timestamp), str(current_watermark), str(value))
> 2022-09-29T02:10:45.3597089Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3597942Z Sep 29 02:10:45 watermark_strategy = 
> WatermarkStrategy.for_monotonous_timestamps()\
> 2022-09-29T02:10:45.3599260Z Sep 29 02:10:45 
> .with_timestamp_assigner(SecondColumnTimestampAssigner())
> 2022-09-29T02:10:45.3600611Z Sep 29 02:10:45 
> data_stream.assign_timestamps_and_watermarks(watermark_strategy)\
> 2022-09-29T02:10:45.3601877Z Sep 29 02:10:45 
> .process(MyProcessFunction(), 
> output_type=Types.STRING()).add_sink(self.test_sink)
> 2022-09-29T02:10:45.3603527Z Sep 29 02:10:45 self.env.execute('test 
> process function')
> 2022-09-29T02:10:45.3604445Z Sep 29 02:10:45 results = 
> self.test_sink.get_results()
> 2022-09-29T02:10:45.3605684Z Sep 29 02:10:45 expected = ["current 
> timestamp: 1603708211000, current watermark: "
> 2022-09-29T02:10:45.3607157Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=1, f1='1603708211000')",
> 2022-09-29T02:10:45.3608256Z Sep 29 02:10:45 "current 
> timestamp: 1603708224000, current watermark: "
> 2022-09-29T02:10:45.3609650Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=2, f1='1603708224000')",
> 2022-09-29T02:10:45.3610854Z Sep 29 02:10:45 "current 
> timestamp: 1603708226000, current watermark: "
> 2022-09-29T02:10:45.3612279Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=3, f1='1603708226000')",
> 2022-09-29T02:10:45.3613382Z Sep 29 02:10:45 "current 
> timestamp: 1603708289000, current watermark: "
> 2022-09-29T02:10:45.3615683Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=4, f1='1603708289000')"]
> 2022-09-29T02:10:45.3617687Z Sep 29 02:10:45 >   
> self.assert_equals_sorted(expected, results)
> 2022-09-29T02:10:45.3618620Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3619425Z Sep 29 02:10:45 
> 

[jira] [Commented] (FLINK-29461) ProcessDataStreamStreamingTests.test_process_function unstable

2022-11-23 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17638140#comment-17638140
 ] 

Matthias Pohl commented on FLINK-29461:
---

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=43436=logs=3e4dd1a2-fe2f-5e5d-a581-48087e718d53=b4612f28-e3b5-5853-8a8b-610ae894217a=27416

> ProcessDataStreamStreamingTests.test_process_function unstable
> --
>
> Key: FLINK-29461
> URL: https://issues.apache.org/jira/browse/FLINK-29461
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Huang Xingbo
>Assignee: Huang Xingbo
>Priority: Critical
>  Labels: test-stability
>
> {code:java}
> 2022-09-29T02:10:45.3571648Z Sep 29 02:10:45 self = 
>  testMethod=test_process_function>
> 2022-09-29T02:10:45.3572279Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3572810Z Sep 29 02:10:45 def 
> test_process_function(self):
> 2022-09-29T02:10:45.3573495Z Sep 29 02:10:45 
> self.env.set_parallelism(1)
> 2022-09-29T02:10:45.3574148Z Sep 29 02:10:45 
> self.env.get_config().set_auto_watermark_interval(2000)
> 2022-09-29T02:10:45.3580634Z Sep 29 02:10:45 
> self.env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
> 2022-09-29T02:10:45.3583194Z Sep 29 02:10:45 data_stream = 
> self.env.from_collection([(1, '1603708211000'),
> 2022-09-29T02:10:45.3584515Z Sep 29 02:10:45  
>(2, '1603708224000'),
> 2022-09-29T02:10:45.3585957Z Sep 29 02:10:45  
>(3, '1603708226000'),
> 2022-09-29T02:10:45.3587132Z Sep 29 02:10:45  
>(4, '1603708289000')],
> 2022-09-29T02:10:45.3588094Z Sep 29 02:10:45  
>   type_info=Types.ROW([Types.INT(), Types.STRING()]))
> 2022-09-29T02:10:45.3589090Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3589949Z Sep 29 02:10:45 class 
> MyProcessFunction(ProcessFunction):
> 2022-09-29T02:10:45.3590710Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3591856Z Sep 29 02:10:45 def 
> process_element(self, value, ctx):
> 2022-09-29T02:10:45.3592873Z Sep 29 02:10:45 
> current_timestamp = ctx.timestamp()
> 2022-09-29T02:10:45.3593862Z Sep 29 02:10:45 
> current_watermark = ctx.timer_service().current_watermark()
> 2022-09-29T02:10:45.3594915Z Sep 29 02:10:45 yield "current 
> timestamp: {}, current watermark: {}, current_value: {}"\
> 2022-09-29T02:10:45.3596201Z Sep 29 02:10:45 
> .format(str(current_timestamp), str(current_watermark), str(value))
> 2022-09-29T02:10:45.3597089Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3597942Z Sep 29 02:10:45 watermark_strategy = 
> WatermarkStrategy.for_monotonous_timestamps()\
> 2022-09-29T02:10:45.3599260Z Sep 29 02:10:45 
> .with_timestamp_assigner(SecondColumnTimestampAssigner())
> 2022-09-29T02:10:45.3600611Z Sep 29 02:10:45 
> data_stream.assign_timestamps_and_watermarks(watermark_strategy)\
> 2022-09-29T02:10:45.3601877Z Sep 29 02:10:45 
> .process(MyProcessFunction(), 
> output_type=Types.STRING()).add_sink(self.test_sink)
> 2022-09-29T02:10:45.3603527Z Sep 29 02:10:45 self.env.execute('test 
> process function')
> 2022-09-29T02:10:45.3604445Z Sep 29 02:10:45 results = 
> self.test_sink.get_results()
> 2022-09-29T02:10:45.3605684Z Sep 29 02:10:45 expected = ["current 
> timestamp: 1603708211000, current watermark: "
> 2022-09-29T02:10:45.3607157Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=1, f1='1603708211000')",
> 2022-09-29T02:10:45.3608256Z Sep 29 02:10:45 "current 
> timestamp: 1603708224000, current watermark: "
> 2022-09-29T02:10:45.3609650Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=2, f1='1603708224000')",
> 2022-09-29T02:10:45.3610854Z Sep 29 02:10:45 "current 
> timestamp: 1603708226000, current watermark: "
> 2022-09-29T02:10:45.3612279Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=3, f1='1603708226000')",
> 2022-09-29T02:10:45.3613382Z Sep 29 02:10:45 "current 
> timestamp: 1603708289000, current watermark: "
> 2022-09-29T02:10:45.3615683Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=4, f1='1603708289000')"]
> 2022-09-29T02:10:45.3617687Z Sep 29 02:10:45 >   
> self.assert_equals_sorted(expected, results)
> 2022-09-29T02:10:45.3618620Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3619425Z Sep 29 02:10:45 
> 

[jira] [Commented] (FLINK-29461) ProcessDataStreamStreamingTests.test_process_function unstable

2022-11-22 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17637085#comment-17637085
 ] 

Matthias Pohl commented on FLINK-29461:
---

[~hxbks2ks] any updates on this issue?

> ProcessDataStreamStreamingTests.test_process_function unstable
> --
>
> Key: FLINK-29461
> URL: https://issues.apache.org/jira/browse/FLINK-29461
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Huang Xingbo
>Assignee: Huang Xingbo
>Priority: Critical
>  Labels: test-stability
>
> {code:java}
> 2022-09-29T02:10:45.3571648Z Sep 29 02:10:45 self = 
>  testMethod=test_process_function>
> 2022-09-29T02:10:45.3572279Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3572810Z Sep 29 02:10:45 def 
> test_process_function(self):
> 2022-09-29T02:10:45.3573495Z Sep 29 02:10:45 
> self.env.set_parallelism(1)
> 2022-09-29T02:10:45.3574148Z Sep 29 02:10:45 
> self.env.get_config().set_auto_watermark_interval(2000)
> 2022-09-29T02:10:45.3580634Z Sep 29 02:10:45 
> self.env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
> 2022-09-29T02:10:45.3583194Z Sep 29 02:10:45 data_stream = 
> self.env.from_collection([(1, '1603708211000'),
> 2022-09-29T02:10:45.3584515Z Sep 29 02:10:45  
>(2, '1603708224000'),
> 2022-09-29T02:10:45.3585957Z Sep 29 02:10:45  
>(3, '1603708226000'),
> 2022-09-29T02:10:45.3587132Z Sep 29 02:10:45  
>(4, '1603708289000')],
> 2022-09-29T02:10:45.3588094Z Sep 29 02:10:45  
>   type_info=Types.ROW([Types.INT(), Types.STRING()]))
> 2022-09-29T02:10:45.3589090Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3589949Z Sep 29 02:10:45 class 
> MyProcessFunction(ProcessFunction):
> 2022-09-29T02:10:45.3590710Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3591856Z Sep 29 02:10:45 def 
> process_element(self, value, ctx):
> 2022-09-29T02:10:45.3592873Z Sep 29 02:10:45 
> current_timestamp = ctx.timestamp()
> 2022-09-29T02:10:45.3593862Z Sep 29 02:10:45 
> current_watermark = ctx.timer_service().current_watermark()
> 2022-09-29T02:10:45.3594915Z Sep 29 02:10:45 yield "current 
> timestamp: {}, current watermark: {}, current_value: {}"\
> 2022-09-29T02:10:45.3596201Z Sep 29 02:10:45 
> .format(str(current_timestamp), str(current_watermark), str(value))
> 2022-09-29T02:10:45.3597089Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3597942Z Sep 29 02:10:45 watermark_strategy = 
> WatermarkStrategy.for_monotonous_timestamps()\
> 2022-09-29T02:10:45.3599260Z Sep 29 02:10:45 
> .with_timestamp_assigner(SecondColumnTimestampAssigner())
> 2022-09-29T02:10:45.3600611Z Sep 29 02:10:45 
> data_stream.assign_timestamps_and_watermarks(watermark_strategy)\
> 2022-09-29T02:10:45.3601877Z Sep 29 02:10:45 
> .process(MyProcessFunction(), 
> output_type=Types.STRING()).add_sink(self.test_sink)
> 2022-09-29T02:10:45.3603527Z Sep 29 02:10:45 self.env.execute('test 
> process function')
> 2022-09-29T02:10:45.3604445Z Sep 29 02:10:45 results = 
> self.test_sink.get_results()
> 2022-09-29T02:10:45.3605684Z Sep 29 02:10:45 expected = ["current 
> timestamp: 1603708211000, current watermark: "
> 2022-09-29T02:10:45.3607157Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=1, f1='1603708211000')",
> 2022-09-29T02:10:45.3608256Z Sep 29 02:10:45 "current 
> timestamp: 1603708224000, current watermark: "
> 2022-09-29T02:10:45.3609650Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=2, f1='1603708224000')",
> 2022-09-29T02:10:45.3610854Z Sep 29 02:10:45 "current 
> timestamp: 1603708226000, current watermark: "
> 2022-09-29T02:10:45.3612279Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=3, f1='1603708226000')",
> 2022-09-29T02:10:45.3613382Z Sep 29 02:10:45 "current 
> timestamp: 1603708289000, current watermark: "
> 2022-09-29T02:10:45.3615683Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=4, f1='1603708289000')"]
> 2022-09-29T02:10:45.3617687Z Sep 29 02:10:45 >   
> self.assert_equals_sorted(expected, results)
> 2022-09-29T02:10:45.3618620Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3619425Z Sep 29 02:10:45 
> pyflink/datastream/tests/test_data_stream.py:986: 
> 2022-09-29T02:10:45.3620424Z Sep 29 02:10:45 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
> _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

[jira] [Commented] (FLINK-29461) ProcessDataStreamStreamingTests.test_process_function unstable

2022-11-22 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17637084#comment-17637084
 ] 

Matthias Pohl commented on FLINK-29461:
---

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=43167=logs=9cada3cb-c1d3-5621-16da-0f718fb86602=c67e71ed-6451-5d26-8920-5a8cf9651901=24422

> ProcessDataStreamStreamingTests.test_process_function unstable
> --
>
> Key: FLINK-29461
> URL: https://issues.apache.org/jira/browse/FLINK-29461
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Huang Xingbo
>Assignee: Huang Xingbo
>Priority: Critical
>  Labels: test-stability
>
> {code:java}
> 2022-09-29T02:10:45.3571648Z Sep 29 02:10:45 self = 
>  testMethod=test_process_function>
> 2022-09-29T02:10:45.3572279Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3572810Z Sep 29 02:10:45 def 
> test_process_function(self):
> 2022-09-29T02:10:45.3573495Z Sep 29 02:10:45 
> self.env.set_parallelism(1)
> 2022-09-29T02:10:45.3574148Z Sep 29 02:10:45 
> self.env.get_config().set_auto_watermark_interval(2000)
> 2022-09-29T02:10:45.3580634Z Sep 29 02:10:45 
> self.env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
> 2022-09-29T02:10:45.3583194Z Sep 29 02:10:45 data_stream = 
> self.env.from_collection([(1, '1603708211000'),
> 2022-09-29T02:10:45.3584515Z Sep 29 02:10:45  
>(2, '1603708224000'),
> 2022-09-29T02:10:45.3585957Z Sep 29 02:10:45  
>(3, '1603708226000'),
> 2022-09-29T02:10:45.3587132Z Sep 29 02:10:45  
>(4, '1603708289000')],
> 2022-09-29T02:10:45.3588094Z Sep 29 02:10:45  
>   type_info=Types.ROW([Types.INT(), Types.STRING()]))
> 2022-09-29T02:10:45.3589090Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3589949Z Sep 29 02:10:45 class 
> MyProcessFunction(ProcessFunction):
> 2022-09-29T02:10:45.3590710Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3591856Z Sep 29 02:10:45 def 
> process_element(self, value, ctx):
> 2022-09-29T02:10:45.3592873Z Sep 29 02:10:45 
> current_timestamp = ctx.timestamp()
> 2022-09-29T02:10:45.3593862Z Sep 29 02:10:45 
> current_watermark = ctx.timer_service().current_watermark()
> 2022-09-29T02:10:45.3594915Z Sep 29 02:10:45 yield "current 
> timestamp: {}, current watermark: {}, current_value: {}"\
> 2022-09-29T02:10:45.3596201Z Sep 29 02:10:45 
> .format(str(current_timestamp), str(current_watermark), str(value))
> 2022-09-29T02:10:45.3597089Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3597942Z Sep 29 02:10:45 watermark_strategy = 
> WatermarkStrategy.for_monotonous_timestamps()\
> 2022-09-29T02:10:45.3599260Z Sep 29 02:10:45 
> .with_timestamp_assigner(SecondColumnTimestampAssigner())
> 2022-09-29T02:10:45.3600611Z Sep 29 02:10:45 
> data_stream.assign_timestamps_and_watermarks(watermark_strategy)\
> 2022-09-29T02:10:45.3601877Z Sep 29 02:10:45 
> .process(MyProcessFunction(), 
> output_type=Types.STRING()).add_sink(self.test_sink)
> 2022-09-29T02:10:45.3603527Z Sep 29 02:10:45 self.env.execute('test 
> process function')
> 2022-09-29T02:10:45.3604445Z Sep 29 02:10:45 results = 
> self.test_sink.get_results()
> 2022-09-29T02:10:45.3605684Z Sep 29 02:10:45 expected = ["current 
> timestamp: 1603708211000, current watermark: "
> 2022-09-29T02:10:45.3607157Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=1, f1='1603708211000')",
> 2022-09-29T02:10:45.3608256Z Sep 29 02:10:45 "current 
> timestamp: 1603708224000, current watermark: "
> 2022-09-29T02:10:45.3609650Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=2, f1='1603708224000')",
> 2022-09-29T02:10:45.3610854Z Sep 29 02:10:45 "current 
> timestamp: 1603708226000, current watermark: "
> 2022-09-29T02:10:45.3612279Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=3, f1='1603708226000')",
> 2022-09-29T02:10:45.3613382Z Sep 29 02:10:45 "current 
> timestamp: 1603708289000, current watermark: "
> 2022-09-29T02:10:45.3615683Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=4, f1='1603708289000')"]
> 2022-09-29T02:10:45.3617687Z Sep 29 02:10:45 >   
> self.assert_equals_sorted(expected, results)
> 2022-09-29T02:10:45.3618620Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3619425Z Sep 29 02:10:45 
> 

[jira] [Commented] (FLINK-29461) ProcessDataStreamStreamingTests.test_process_function unstable

2022-11-21 Thread Leonard Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17637056#comment-17637056
 ] 

Leonard Xu commented on FLINK-29461:


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=43369=logs=bf5e383b-9fd3-5f02-ca1c-8f788e2e76d3=85189c57-d8a0-5c9c-b61d-fc05cfac62cf

> ProcessDataStreamStreamingTests.test_process_function unstable
> --
>
> Key: FLINK-29461
> URL: https://issues.apache.org/jira/browse/FLINK-29461
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Huang Xingbo
>Assignee: Huang Xingbo
>Priority: Critical
>  Labels: test-stability
>
> {code:java}
> 2022-09-29T02:10:45.3571648Z Sep 29 02:10:45 self = 
>  testMethod=test_process_function>
> 2022-09-29T02:10:45.3572279Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3572810Z Sep 29 02:10:45 def 
> test_process_function(self):
> 2022-09-29T02:10:45.3573495Z Sep 29 02:10:45 
> self.env.set_parallelism(1)
> 2022-09-29T02:10:45.3574148Z Sep 29 02:10:45 
> self.env.get_config().set_auto_watermark_interval(2000)
> 2022-09-29T02:10:45.3580634Z Sep 29 02:10:45 
> self.env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
> 2022-09-29T02:10:45.3583194Z Sep 29 02:10:45 data_stream = 
> self.env.from_collection([(1, '1603708211000'),
> 2022-09-29T02:10:45.3584515Z Sep 29 02:10:45  
>(2, '1603708224000'),
> 2022-09-29T02:10:45.3585957Z Sep 29 02:10:45  
>(3, '1603708226000'),
> 2022-09-29T02:10:45.3587132Z Sep 29 02:10:45  
>(4, '1603708289000')],
> 2022-09-29T02:10:45.3588094Z Sep 29 02:10:45  
>   type_info=Types.ROW([Types.INT(), Types.STRING()]))
> 2022-09-29T02:10:45.3589090Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3589949Z Sep 29 02:10:45 class 
> MyProcessFunction(ProcessFunction):
> 2022-09-29T02:10:45.3590710Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3591856Z Sep 29 02:10:45 def 
> process_element(self, value, ctx):
> 2022-09-29T02:10:45.3592873Z Sep 29 02:10:45 
> current_timestamp = ctx.timestamp()
> 2022-09-29T02:10:45.3593862Z Sep 29 02:10:45 
> current_watermark = ctx.timer_service().current_watermark()
> 2022-09-29T02:10:45.3594915Z Sep 29 02:10:45 yield "current 
> timestamp: {}, current watermark: {}, current_value: {}"\
> 2022-09-29T02:10:45.3596201Z Sep 29 02:10:45 
> .format(str(current_timestamp), str(current_watermark), str(value))
> 2022-09-29T02:10:45.3597089Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3597942Z Sep 29 02:10:45 watermark_strategy = 
> WatermarkStrategy.for_monotonous_timestamps()\
> 2022-09-29T02:10:45.3599260Z Sep 29 02:10:45 
> .with_timestamp_assigner(SecondColumnTimestampAssigner())
> 2022-09-29T02:10:45.3600611Z Sep 29 02:10:45 
> data_stream.assign_timestamps_and_watermarks(watermark_strategy)\
> 2022-09-29T02:10:45.3601877Z Sep 29 02:10:45 
> .process(MyProcessFunction(), 
> output_type=Types.STRING()).add_sink(self.test_sink)
> 2022-09-29T02:10:45.3603527Z Sep 29 02:10:45 self.env.execute('test 
> process function')
> 2022-09-29T02:10:45.3604445Z Sep 29 02:10:45 results = 
> self.test_sink.get_results()
> 2022-09-29T02:10:45.3605684Z Sep 29 02:10:45 expected = ["current 
> timestamp: 1603708211000, current watermark: "
> 2022-09-29T02:10:45.3607157Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=1, f1='1603708211000')",
> 2022-09-29T02:10:45.3608256Z Sep 29 02:10:45 "current 
> timestamp: 1603708224000, current watermark: "
> 2022-09-29T02:10:45.3609650Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=2, f1='1603708224000')",
> 2022-09-29T02:10:45.3610854Z Sep 29 02:10:45 "current 
> timestamp: 1603708226000, current watermark: "
> 2022-09-29T02:10:45.3612279Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=3, f1='1603708226000')",
> 2022-09-29T02:10:45.3613382Z Sep 29 02:10:45 "current 
> timestamp: 1603708289000, current watermark: "
> 2022-09-29T02:10:45.3615683Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=4, f1='1603708289000')"]
> 2022-09-29T02:10:45.3617687Z Sep 29 02:10:45 >   
> self.assert_equals_sorted(expected, results)
> 2022-09-29T02:10:45.3618620Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3619425Z Sep 29 02:10:45 
> pyflink/datastream/tests/test_data_stream.py:986: 

[jira] [Commented] (FLINK-29461) ProcessDataStreamStreamingTests.test_process_function unstable

2022-11-21 Thread Leonard Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17637042#comment-17637042
 ] 

Leonard Xu commented on FLINK-29461:


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=43367=logs=9cada3cb-c1d3-5621-16da-0f718fb86602=c67e71ed-6451-5d26-8920-5a8cf9651901

> ProcessDataStreamStreamingTests.test_process_function unstable
> --
>
> Key: FLINK-29461
> URL: https://issues.apache.org/jira/browse/FLINK-29461
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Huang Xingbo
>Assignee: Huang Xingbo
>Priority: Critical
>  Labels: test-stability
>
> {code:java}
> 2022-09-29T02:10:45.3571648Z Sep 29 02:10:45 self = 
>  testMethod=test_process_function>
> 2022-09-29T02:10:45.3572279Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3572810Z Sep 29 02:10:45 def 
> test_process_function(self):
> 2022-09-29T02:10:45.3573495Z Sep 29 02:10:45 
> self.env.set_parallelism(1)
> 2022-09-29T02:10:45.3574148Z Sep 29 02:10:45 
> self.env.get_config().set_auto_watermark_interval(2000)
> 2022-09-29T02:10:45.3580634Z Sep 29 02:10:45 
> self.env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
> 2022-09-29T02:10:45.3583194Z Sep 29 02:10:45 data_stream = 
> self.env.from_collection([(1, '1603708211000'),
> 2022-09-29T02:10:45.3584515Z Sep 29 02:10:45  
>(2, '1603708224000'),
> 2022-09-29T02:10:45.3585957Z Sep 29 02:10:45  
>(3, '1603708226000'),
> 2022-09-29T02:10:45.3587132Z Sep 29 02:10:45  
>(4, '1603708289000')],
> 2022-09-29T02:10:45.3588094Z Sep 29 02:10:45  
>   type_info=Types.ROW([Types.INT(), Types.STRING()]))
> 2022-09-29T02:10:45.3589090Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3589949Z Sep 29 02:10:45 class 
> MyProcessFunction(ProcessFunction):
> 2022-09-29T02:10:45.3590710Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3591856Z Sep 29 02:10:45 def 
> process_element(self, value, ctx):
> 2022-09-29T02:10:45.3592873Z Sep 29 02:10:45 
> current_timestamp = ctx.timestamp()
> 2022-09-29T02:10:45.3593862Z Sep 29 02:10:45 
> current_watermark = ctx.timer_service().current_watermark()
> 2022-09-29T02:10:45.3594915Z Sep 29 02:10:45 yield "current 
> timestamp: {}, current watermark: {}, current_value: {}"\
> 2022-09-29T02:10:45.3596201Z Sep 29 02:10:45 
> .format(str(current_timestamp), str(current_watermark), str(value))
> 2022-09-29T02:10:45.3597089Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3597942Z Sep 29 02:10:45 watermark_strategy = 
> WatermarkStrategy.for_monotonous_timestamps()\
> 2022-09-29T02:10:45.3599260Z Sep 29 02:10:45 
> .with_timestamp_assigner(SecondColumnTimestampAssigner())
> 2022-09-29T02:10:45.3600611Z Sep 29 02:10:45 
> data_stream.assign_timestamps_and_watermarks(watermark_strategy)\
> 2022-09-29T02:10:45.3601877Z Sep 29 02:10:45 
> .process(MyProcessFunction(), 
> output_type=Types.STRING()).add_sink(self.test_sink)
> 2022-09-29T02:10:45.3603527Z Sep 29 02:10:45 self.env.execute('test 
> process function')
> 2022-09-29T02:10:45.3604445Z Sep 29 02:10:45 results = 
> self.test_sink.get_results()
> 2022-09-29T02:10:45.3605684Z Sep 29 02:10:45 expected = ["current 
> timestamp: 1603708211000, current watermark: "
> 2022-09-29T02:10:45.3607157Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=1, f1='1603708211000')",
> 2022-09-29T02:10:45.3608256Z Sep 29 02:10:45 "current 
> timestamp: 1603708224000, current watermark: "
> 2022-09-29T02:10:45.3609650Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=2, f1='1603708224000')",
> 2022-09-29T02:10:45.3610854Z Sep 29 02:10:45 "current 
> timestamp: 1603708226000, current watermark: "
> 2022-09-29T02:10:45.3612279Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=3, f1='1603708226000')",
> 2022-09-29T02:10:45.3613382Z Sep 29 02:10:45 "current 
> timestamp: 1603708289000, current watermark: "
> 2022-09-29T02:10:45.3615683Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=4, f1='1603708289000')"]
> 2022-09-29T02:10:45.3617687Z Sep 29 02:10:45 >   
> self.assert_equals_sorted(expected, results)
> 2022-09-29T02:10:45.3618620Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3619425Z Sep 29 02:10:45 
> pyflink/datastream/tests/test_data_stream.py:986: 

[jira] [Commented] (FLINK-29461) ProcessDataStreamStreamingTests.test_process_function unstable

2022-11-20 Thread Leonard Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17636431#comment-17636431
 ] 

Leonard Xu commented on FLINK-29461:


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=43332=logs=e92ecf6d-e207-5a42-7ff7-528ff0c5b259=40fc352e-9b4c-5fd8-363f-628f24b01ec2

> ProcessDataStreamStreamingTests.test_process_function unstable
> --
>
> Key: FLINK-29461
> URL: https://issues.apache.org/jira/browse/FLINK-29461
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Huang Xingbo
>Assignee: Huang Xingbo
>Priority: Critical
>  Labels: test-stability
>
> {code:java}
> 2022-09-29T02:10:45.3571648Z Sep 29 02:10:45 self = 
>  testMethod=test_process_function>
> 2022-09-29T02:10:45.3572279Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3572810Z Sep 29 02:10:45 def 
> test_process_function(self):
> 2022-09-29T02:10:45.3573495Z Sep 29 02:10:45 
> self.env.set_parallelism(1)
> 2022-09-29T02:10:45.3574148Z Sep 29 02:10:45 
> self.env.get_config().set_auto_watermark_interval(2000)
> 2022-09-29T02:10:45.3580634Z Sep 29 02:10:45 
> self.env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
> 2022-09-29T02:10:45.3583194Z Sep 29 02:10:45 data_stream = 
> self.env.from_collection([(1, '1603708211000'),
> 2022-09-29T02:10:45.3584515Z Sep 29 02:10:45  
>(2, '1603708224000'),
> 2022-09-29T02:10:45.3585957Z Sep 29 02:10:45  
>(3, '1603708226000'),
> 2022-09-29T02:10:45.3587132Z Sep 29 02:10:45  
>(4, '1603708289000')],
> 2022-09-29T02:10:45.3588094Z Sep 29 02:10:45  
>   type_info=Types.ROW([Types.INT(), Types.STRING()]))
> 2022-09-29T02:10:45.3589090Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3589949Z Sep 29 02:10:45 class 
> MyProcessFunction(ProcessFunction):
> 2022-09-29T02:10:45.3590710Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3591856Z Sep 29 02:10:45 def 
> process_element(self, value, ctx):
> 2022-09-29T02:10:45.3592873Z Sep 29 02:10:45 
> current_timestamp = ctx.timestamp()
> 2022-09-29T02:10:45.3593862Z Sep 29 02:10:45 
> current_watermark = ctx.timer_service().current_watermark()
> 2022-09-29T02:10:45.3594915Z Sep 29 02:10:45 yield "current 
> timestamp: {}, current watermark: {}, current_value: {}"\
> 2022-09-29T02:10:45.3596201Z Sep 29 02:10:45 
> .format(str(current_timestamp), str(current_watermark), str(value))
> 2022-09-29T02:10:45.3597089Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3597942Z Sep 29 02:10:45 watermark_strategy = 
> WatermarkStrategy.for_monotonous_timestamps()\
> 2022-09-29T02:10:45.3599260Z Sep 29 02:10:45 
> .with_timestamp_assigner(SecondColumnTimestampAssigner())
> 2022-09-29T02:10:45.3600611Z Sep 29 02:10:45 
> data_stream.assign_timestamps_and_watermarks(watermark_strategy)\
> 2022-09-29T02:10:45.3601877Z Sep 29 02:10:45 
> .process(MyProcessFunction(), 
> output_type=Types.STRING()).add_sink(self.test_sink)
> 2022-09-29T02:10:45.3603527Z Sep 29 02:10:45 self.env.execute('test 
> process function')
> 2022-09-29T02:10:45.3604445Z Sep 29 02:10:45 results = 
> self.test_sink.get_results()
> 2022-09-29T02:10:45.3605684Z Sep 29 02:10:45 expected = ["current 
> timestamp: 1603708211000, current watermark: "
> 2022-09-29T02:10:45.3607157Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=1, f1='1603708211000')",
> 2022-09-29T02:10:45.3608256Z Sep 29 02:10:45 "current 
> timestamp: 1603708224000, current watermark: "
> 2022-09-29T02:10:45.3609650Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=2, f1='1603708224000')",
> 2022-09-29T02:10:45.3610854Z Sep 29 02:10:45 "current 
> timestamp: 1603708226000, current watermark: "
> 2022-09-29T02:10:45.3612279Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=3, f1='1603708226000')",
> 2022-09-29T02:10:45.3613382Z Sep 29 02:10:45 "current 
> timestamp: 1603708289000, current watermark: "
> 2022-09-29T02:10:45.3615683Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=4, f1='1603708289000')"]
> 2022-09-29T02:10:45.3617687Z Sep 29 02:10:45 >   
> self.assert_equals_sorted(expected, results)
> 2022-09-29T02:10:45.3618620Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3619425Z Sep 29 02:10:45 
> pyflink/datastream/tests/test_data_stream.py:986: 

[jira] [Commented] (FLINK-29461) ProcessDataStreamStreamingTests.test_process_function unstable

2022-11-11 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17632189#comment-17632189
 ] 

Martijn Visser commented on FLINK-29461:


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=43043=logs=3e4dd1a2-fe2f-5e5d-a581-48087e718d53=b4612f28-e3b5-5853-8a8b-610ae894217a=25767

> ProcessDataStreamStreamingTests.test_process_function unstable
> --
>
> Key: FLINK-29461
> URL: https://issues.apache.org/jira/browse/FLINK-29461
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Huang Xingbo
>Assignee: Huang Xingbo
>Priority: Critical
>  Labels: test-stability
>
> {code:java}
> 2022-09-29T02:10:45.3571648Z Sep 29 02:10:45 self = 
>  testMethod=test_process_function>
> 2022-09-29T02:10:45.3572279Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3572810Z Sep 29 02:10:45 def 
> test_process_function(self):
> 2022-09-29T02:10:45.3573495Z Sep 29 02:10:45 
> self.env.set_parallelism(1)
> 2022-09-29T02:10:45.3574148Z Sep 29 02:10:45 
> self.env.get_config().set_auto_watermark_interval(2000)
> 2022-09-29T02:10:45.3580634Z Sep 29 02:10:45 
> self.env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
> 2022-09-29T02:10:45.3583194Z Sep 29 02:10:45 data_stream = 
> self.env.from_collection([(1, '1603708211000'),
> 2022-09-29T02:10:45.3584515Z Sep 29 02:10:45  
>(2, '1603708224000'),
> 2022-09-29T02:10:45.3585957Z Sep 29 02:10:45  
>(3, '1603708226000'),
> 2022-09-29T02:10:45.3587132Z Sep 29 02:10:45  
>(4, '1603708289000')],
> 2022-09-29T02:10:45.3588094Z Sep 29 02:10:45  
>   type_info=Types.ROW([Types.INT(), Types.STRING()]))
> 2022-09-29T02:10:45.3589090Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3589949Z Sep 29 02:10:45 class 
> MyProcessFunction(ProcessFunction):
> 2022-09-29T02:10:45.3590710Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3591856Z Sep 29 02:10:45 def 
> process_element(self, value, ctx):
> 2022-09-29T02:10:45.3592873Z Sep 29 02:10:45 
> current_timestamp = ctx.timestamp()
> 2022-09-29T02:10:45.3593862Z Sep 29 02:10:45 
> current_watermark = ctx.timer_service().current_watermark()
> 2022-09-29T02:10:45.3594915Z Sep 29 02:10:45 yield "current 
> timestamp: {}, current watermark: {}, current_value: {}"\
> 2022-09-29T02:10:45.3596201Z Sep 29 02:10:45 
> .format(str(current_timestamp), str(current_watermark), str(value))
> 2022-09-29T02:10:45.3597089Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3597942Z Sep 29 02:10:45 watermark_strategy = 
> WatermarkStrategy.for_monotonous_timestamps()\
> 2022-09-29T02:10:45.3599260Z Sep 29 02:10:45 
> .with_timestamp_assigner(SecondColumnTimestampAssigner())
> 2022-09-29T02:10:45.3600611Z Sep 29 02:10:45 
> data_stream.assign_timestamps_and_watermarks(watermark_strategy)\
> 2022-09-29T02:10:45.3601877Z Sep 29 02:10:45 
> .process(MyProcessFunction(), 
> output_type=Types.STRING()).add_sink(self.test_sink)
> 2022-09-29T02:10:45.3603527Z Sep 29 02:10:45 self.env.execute('test 
> process function')
> 2022-09-29T02:10:45.3604445Z Sep 29 02:10:45 results = 
> self.test_sink.get_results()
> 2022-09-29T02:10:45.3605684Z Sep 29 02:10:45 expected = ["current 
> timestamp: 1603708211000, current watermark: "
> 2022-09-29T02:10:45.3607157Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=1, f1='1603708211000')",
> 2022-09-29T02:10:45.3608256Z Sep 29 02:10:45 "current 
> timestamp: 1603708224000, current watermark: "
> 2022-09-29T02:10:45.3609650Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=2, f1='1603708224000')",
> 2022-09-29T02:10:45.3610854Z Sep 29 02:10:45 "current 
> timestamp: 1603708226000, current watermark: "
> 2022-09-29T02:10:45.3612279Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=3, f1='1603708226000')",
> 2022-09-29T02:10:45.3613382Z Sep 29 02:10:45 "current 
> timestamp: 1603708289000, current watermark: "
> 2022-09-29T02:10:45.3615683Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=4, f1='1603708289000')"]
> 2022-09-29T02:10:45.3617687Z Sep 29 02:10:45 >   
> self.assert_equals_sorted(expected, results)
> 2022-09-29T02:10:45.3618620Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3619425Z Sep 29 02:10:45 
> 

[jira] [Commented] (FLINK-29461) ProcessDataStreamStreamingTests.test_process_function unstable

2022-11-07 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17630240#comment-17630240
 ] 

Matthias Pohl commented on FLINK-29461:
---

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=42908=logs=3e4dd1a2-fe2f-5e5d-a581-48087e718d53=b4612f28-e3b5-5853-8a8b-610ae894217a=27019

> ProcessDataStreamStreamingTests.test_process_function unstable
> --
>
> Key: FLINK-29461
> URL: https://issues.apache.org/jira/browse/FLINK-29461
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Huang Xingbo
>Assignee: Huang Xingbo
>Priority: Critical
>  Labels: test-stability
>
> {code:java}
> 2022-09-29T02:10:45.3571648Z Sep 29 02:10:45 self = 
>  testMethod=test_process_function>
> 2022-09-29T02:10:45.3572279Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3572810Z Sep 29 02:10:45 def 
> test_process_function(self):
> 2022-09-29T02:10:45.3573495Z Sep 29 02:10:45 
> self.env.set_parallelism(1)
> 2022-09-29T02:10:45.3574148Z Sep 29 02:10:45 
> self.env.get_config().set_auto_watermark_interval(2000)
> 2022-09-29T02:10:45.3580634Z Sep 29 02:10:45 
> self.env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
> 2022-09-29T02:10:45.3583194Z Sep 29 02:10:45 data_stream = 
> self.env.from_collection([(1, '1603708211000'),
> 2022-09-29T02:10:45.3584515Z Sep 29 02:10:45  
>(2, '1603708224000'),
> 2022-09-29T02:10:45.3585957Z Sep 29 02:10:45  
>(3, '1603708226000'),
> 2022-09-29T02:10:45.3587132Z Sep 29 02:10:45  
>(4, '1603708289000')],
> 2022-09-29T02:10:45.3588094Z Sep 29 02:10:45  
>   type_info=Types.ROW([Types.INT(), Types.STRING()]))
> 2022-09-29T02:10:45.3589090Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3589949Z Sep 29 02:10:45 class 
> MyProcessFunction(ProcessFunction):
> 2022-09-29T02:10:45.3590710Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3591856Z Sep 29 02:10:45 def 
> process_element(self, value, ctx):
> 2022-09-29T02:10:45.3592873Z Sep 29 02:10:45 
> current_timestamp = ctx.timestamp()
> 2022-09-29T02:10:45.3593862Z Sep 29 02:10:45 
> current_watermark = ctx.timer_service().current_watermark()
> 2022-09-29T02:10:45.3594915Z Sep 29 02:10:45 yield "current 
> timestamp: {}, current watermark: {}, current_value: {}"\
> 2022-09-29T02:10:45.3596201Z Sep 29 02:10:45 
> .format(str(current_timestamp), str(current_watermark), str(value))
> 2022-09-29T02:10:45.3597089Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3597942Z Sep 29 02:10:45 watermark_strategy = 
> WatermarkStrategy.for_monotonous_timestamps()\
> 2022-09-29T02:10:45.3599260Z Sep 29 02:10:45 
> .with_timestamp_assigner(SecondColumnTimestampAssigner())
> 2022-09-29T02:10:45.3600611Z Sep 29 02:10:45 
> data_stream.assign_timestamps_and_watermarks(watermark_strategy)\
> 2022-09-29T02:10:45.3601877Z Sep 29 02:10:45 
> .process(MyProcessFunction(), 
> output_type=Types.STRING()).add_sink(self.test_sink)
> 2022-09-29T02:10:45.3603527Z Sep 29 02:10:45 self.env.execute('test 
> process function')
> 2022-09-29T02:10:45.3604445Z Sep 29 02:10:45 results = 
> self.test_sink.get_results()
> 2022-09-29T02:10:45.3605684Z Sep 29 02:10:45 expected = ["current 
> timestamp: 1603708211000, current watermark: "
> 2022-09-29T02:10:45.3607157Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=1, f1='1603708211000')",
> 2022-09-29T02:10:45.3608256Z Sep 29 02:10:45 "current 
> timestamp: 1603708224000, current watermark: "
> 2022-09-29T02:10:45.3609650Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=2, f1='1603708224000')",
> 2022-09-29T02:10:45.3610854Z Sep 29 02:10:45 "current 
> timestamp: 1603708226000, current watermark: "
> 2022-09-29T02:10:45.3612279Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=3, f1='1603708226000')",
> 2022-09-29T02:10:45.3613382Z Sep 29 02:10:45 "current 
> timestamp: 1603708289000, current watermark: "
> 2022-09-29T02:10:45.3615683Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=4, f1='1603708289000')"]
> 2022-09-29T02:10:45.3617687Z Sep 29 02:10:45 >   
> self.assert_equals_sorted(expected, results)
> 2022-09-29T02:10:45.3618620Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3619425Z Sep 29 02:10:45 
> 

[jira] [Commented] (FLINK-29461) ProcessDataStreamStreamingTests.test_process_function unstable

2022-11-07 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17629665#comment-17629665
 ] 

Matthias Pohl commented on FLINK-29461:
---

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=42867=logs=bf5e383b-9fd3-5f02-ca1c-8f788e2e76d3=85189c57-d8a0-5c9c-b61d-fc05cfac62cf=27223

> ProcessDataStreamStreamingTests.test_process_function unstable
> --
>
> Key: FLINK-29461
> URL: https://issues.apache.org/jira/browse/FLINK-29461
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Huang Xingbo
>Assignee: Huang Xingbo
>Priority: Critical
>  Labels: test-stability
>
> {code:java}
> 2022-09-29T02:10:45.3571648Z Sep 29 02:10:45 self = 
>  testMethod=test_process_function>
> 2022-09-29T02:10:45.3572279Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3572810Z Sep 29 02:10:45 def 
> test_process_function(self):
> 2022-09-29T02:10:45.3573495Z Sep 29 02:10:45 
> self.env.set_parallelism(1)
> 2022-09-29T02:10:45.3574148Z Sep 29 02:10:45 
> self.env.get_config().set_auto_watermark_interval(2000)
> 2022-09-29T02:10:45.3580634Z Sep 29 02:10:45 
> self.env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
> 2022-09-29T02:10:45.3583194Z Sep 29 02:10:45 data_stream = 
> self.env.from_collection([(1, '1603708211000'),
> 2022-09-29T02:10:45.3584515Z Sep 29 02:10:45  
>(2, '1603708224000'),
> 2022-09-29T02:10:45.3585957Z Sep 29 02:10:45  
>(3, '1603708226000'),
> 2022-09-29T02:10:45.3587132Z Sep 29 02:10:45  
>(4, '1603708289000')],
> 2022-09-29T02:10:45.3588094Z Sep 29 02:10:45  
>   type_info=Types.ROW([Types.INT(), Types.STRING()]))
> 2022-09-29T02:10:45.3589090Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3589949Z Sep 29 02:10:45 class 
> MyProcessFunction(ProcessFunction):
> 2022-09-29T02:10:45.3590710Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3591856Z Sep 29 02:10:45 def 
> process_element(self, value, ctx):
> 2022-09-29T02:10:45.3592873Z Sep 29 02:10:45 
> current_timestamp = ctx.timestamp()
> 2022-09-29T02:10:45.3593862Z Sep 29 02:10:45 
> current_watermark = ctx.timer_service().current_watermark()
> 2022-09-29T02:10:45.3594915Z Sep 29 02:10:45 yield "current 
> timestamp: {}, current watermark: {}, current_value: {}"\
> 2022-09-29T02:10:45.3596201Z Sep 29 02:10:45 
> .format(str(current_timestamp), str(current_watermark), str(value))
> 2022-09-29T02:10:45.3597089Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3597942Z Sep 29 02:10:45 watermark_strategy = 
> WatermarkStrategy.for_monotonous_timestamps()\
> 2022-09-29T02:10:45.3599260Z Sep 29 02:10:45 
> .with_timestamp_assigner(SecondColumnTimestampAssigner())
> 2022-09-29T02:10:45.3600611Z Sep 29 02:10:45 
> data_stream.assign_timestamps_and_watermarks(watermark_strategy)\
> 2022-09-29T02:10:45.3601877Z Sep 29 02:10:45 
> .process(MyProcessFunction(), 
> output_type=Types.STRING()).add_sink(self.test_sink)
> 2022-09-29T02:10:45.3603527Z Sep 29 02:10:45 self.env.execute('test 
> process function')
> 2022-09-29T02:10:45.3604445Z Sep 29 02:10:45 results = 
> self.test_sink.get_results()
> 2022-09-29T02:10:45.3605684Z Sep 29 02:10:45 expected = ["current 
> timestamp: 1603708211000, current watermark: "
> 2022-09-29T02:10:45.3607157Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=1, f1='1603708211000')",
> 2022-09-29T02:10:45.3608256Z Sep 29 02:10:45 "current 
> timestamp: 1603708224000, current watermark: "
> 2022-09-29T02:10:45.3609650Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=2, f1='1603708224000')",
> 2022-09-29T02:10:45.3610854Z Sep 29 02:10:45 "current 
> timestamp: 1603708226000, current watermark: "
> 2022-09-29T02:10:45.3612279Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=3, f1='1603708226000')",
> 2022-09-29T02:10:45.3613382Z Sep 29 02:10:45 "current 
> timestamp: 1603708289000, current watermark: "
> 2022-09-29T02:10:45.3615683Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=4, f1='1603708289000')"]
> 2022-09-29T02:10:45.3617687Z Sep 29 02:10:45 >   
> self.assert_equals_sorted(expected, results)
> 2022-09-29T02:10:45.3618620Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3619425Z Sep 29 02:10:45 
> 

[jira] [Commented] (FLINK-29461) ProcessDataStreamStreamingTests.test_process_function unstable

2022-11-07 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17629654#comment-17629654
 ] 

Matthias Pohl commented on FLINK-29461:
---

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=42858=logs=9cada3cb-c1d3-5621-16da-0f718fb86602=c67e71ed-6451-5d26-8920-5a8cf9651901=28095

> ProcessDataStreamStreamingTests.test_process_function unstable
> --
>
> Key: FLINK-29461
> URL: https://issues.apache.org/jira/browse/FLINK-29461
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Huang Xingbo
>Assignee: Huang Xingbo
>Priority: Critical
>  Labels: test-stability
>
> {code:java}
> 2022-09-29T02:10:45.3571648Z Sep 29 02:10:45 self = 
>  testMethod=test_process_function>
> 2022-09-29T02:10:45.3572279Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3572810Z Sep 29 02:10:45 def 
> test_process_function(self):
> 2022-09-29T02:10:45.3573495Z Sep 29 02:10:45 
> self.env.set_parallelism(1)
> 2022-09-29T02:10:45.3574148Z Sep 29 02:10:45 
> self.env.get_config().set_auto_watermark_interval(2000)
> 2022-09-29T02:10:45.3580634Z Sep 29 02:10:45 
> self.env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
> 2022-09-29T02:10:45.3583194Z Sep 29 02:10:45 data_stream = 
> self.env.from_collection([(1, '1603708211000'),
> 2022-09-29T02:10:45.3584515Z Sep 29 02:10:45  
>(2, '1603708224000'),
> 2022-09-29T02:10:45.3585957Z Sep 29 02:10:45  
>(3, '1603708226000'),
> 2022-09-29T02:10:45.3587132Z Sep 29 02:10:45  
>(4, '1603708289000')],
> 2022-09-29T02:10:45.3588094Z Sep 29 02:10:45  
>   type_info=Types.ROW([Types.INT(), Types.STRING()]))
> 2022-09-29T02:10:45.3589090Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3589949Z Sep 29 02:10:45 class 
> MyProcessFunction(ProcessFunction):
> 2022-09-29T02:10:45.3590710Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3591856Z Sep 29 02:10:45 def 
> process_element(self, value, ctx):
> 2022-09-29T02:10:45.3592873Z Sep 29 02:10:45 
> current_timestamp = ctx.timestamp()
> 2022-09-29T02:10:45.3593862Z Sep 29 02:10:45 
> current_watermark = ctx.timer_service().current_watermark()
> 2022-09-29T02:10:45.3594915Z Sep 29 02:10:45 yield "current 
> timestamp: {}, current watermark: {}, current_value: {}"\
> 2022-09-29T02:10:45.3596201Z Sep 29 02:10:45 
> .format(str(current_timestamp), str(current_watermark), str(value))
> 2022-09-29T02:10:45.3597089Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3597942Z Sep 29 02:10:45 watermark_strategy = 
> WatermarkStrategy.for_monotonous_timestamps()\
> 2022-09-29T02:10:45.3599260Z Sep 29 02:10:45 
> .with_timestamp_assigner(SecondColumnTimestampAssigner())
> 2022-09-29T02:10:45.3600611Z Sep 29 02:10:45 
> data_stream.assign_timestamps_and_watermarks(watermark_strategy)\
> 2022-09-29T02:10:45.3601877Z Sep 29 02:10:45 
> .process(MyProcessFunction(), 
> output_type=Types.STRING()).add_sink(self.test_sink)
> 2022-09-29T02:10:45.3603527Z Sep 29 02:10:45 self.env.execute('test 
> process function')
> 2022-09-29T02:10:45.3604445Z Sep 29 02:10:45 results = 
> self.test_sink.get_results()
> 2022-09-29T02:10:45.3605684Z Sep 29 02:10:45 expected = ["current 
> timestamp: 1603708211000, current watermark: "
> 2022-09-29T02:10:45.3607157Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=1, f1='1603708211000')",
> 2022-09-29T02:10:45.3608256Z Sep 29 02:10:45 "current 
> timestamp: 1603708224000, current watermark: "
> 2022-09-29T02:10:45.3609650Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=2, f1='1603708224000')",
> 2022-09-29T02:10:45.3610854Z Sep 29 02:10:45 "current 
> timestamp: 1603708226000, current watermark: "
> 2022-09-29T02:10:45.3612279Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=3, f1='1603708226000')",
> 2022-09-29T02:10:45.3613382Z Sep 29 02:10:45 "current 
> timestamp: 1603708289000, current watermark: "
> 2022-09-29T02:10:45.3615683Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=4, f1='1603708289000')"]
> 2022-09-29T02:10:45.3617687Z Sep 29 02:10:45 >   
> self.assert_equals_sorted(expected, results)
> 2022-09-29T02:10:45.3618620Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3619425Z Sep 29 02:10:45 
> 

[jira] [Commented] (FLINK-29461) ProcessDataStreamStreamingTests.test_process_function unstable

2022-11-04 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17629022#comment-17629022
 ] 

Matthias Pohl commented on FLINK-29461:
---

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=42827=logs=9cada3cb-c1d3-5621-16da-0f718fb86602=c67e71ed-6451-5d26-8920-5a8cf9651901=25085

> ProcessDataStreamStreamingTests.test_process_function unstable
> --
>
> Key: FLINK-29461
> URL: https://issues.apache.org/jira/browse/FLINK-29461
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Huang Xingbo
>Assignee: Huang Xingbo
>Priority: Critical
>  Labels: test-stability
>
> {code:java}
> 2022-09-29T02:10:45.3571648Z Sep 29 02:10:45 self = 
>  testMethod=test_process_function>
> 2022-09-29T02:10:45.3572279Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3572810Z Sep 29 02:10:45 def 
> test_process_function(self):
> 2022-09-29T02:10:45.3573495Z Sep 29 02:10:45 
> self.env.set_parallelism(1)
> 2022-09-29T02:10:45.3574148Z Sep 29 02:10:45 
> self.env.get_config().set_auto_watermark_interval(2000)
> 2022-09-29T02:10:45.3580634Z Sep 29 02:10:45 
> self.env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
> 2022-09-29T02:10:45.3583194Z Sep 29 02:10:45 data_stream = 
> self.env.from_collection([(1, '1603708211000'),
> 2022-09-29T02:10:45.3584515Z Sep 29 02:10:45  
>(2, '1603708224000'),
> 2022-09-29T02:10:45.3585957Z Sep 29 02:10:45  
>(3, '1603708226000'),
> 2022-09-29T02:10:45.3587132Z Sep 29 02:10:45  
>(4, '1603708289000')],
> 2022-09-29T02:10:45.3588094Z Sep 29 02:10:45  
>   type_info=Types.ROW([Types.INT(), Types.STRING()]))
> 2022-09-29T02:10:45.3589090Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3589949Z Sep 29 02:10:45 class 
> MyProcessFunction(ProcessFunction):
> 2022-09-29T02:10:45.3590710Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3591856Z Sep 29 02:10:45 def 
> process_element(self, value, ctx):
> 2022-09-29T02:10:45.3592873Z Sep 29 02:10:45 
> current_timestamp = ctx.timestamp()
> 2022-09-29T02:10:45.3593862Z Sep 29 02:10:45 
> current_watermark = ctx.timer_service().current_watermark()
> 2022-09-29T02:10:45.3594915Z Sep 29 02:10:45 yield "current 
> timestamp: {}, current watermark: {}, current_value: {}"\
> 2022-09-29T02:10:45.3596201Z Sep 29 02:10:45 
> .format(str(current_timestamp), str(current_watermark), str(value))
> 2022-09-29T02:10:45.3597089Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3597942Z Sep 29 02:10:45 watermark_strategy = 
> WatermarkStrategy.for_monotonous_timestamps()\
> 2022-09-29T02:10:45.3599260Z Sep 29 02:10:45 
> .with_timestamp_assigner(SecondColumnTimestampAssigner())
> 2022-09-29T02:10:45.3600611Z Sep 29 02:10:45 
> data_stream.assign_timestamps_and_watermarks(watermark_strategy)\
> 2022-09-29T02:10:45.3601877Z Sep 29 02:10:45 
> .process(MyProcessFunction(), 
> output_type=Types.STRING()).add_sink(self.test_sink)
> 2022-09-29T02:10:45.3603527Z Sep 29 02:10:45 self.env.execute('test 
> process function')
> 2022-09-29T02:10:45.3604445Z Sep 29 02:10:45 results = 
> self.test_sink.get_results()
> 2022-09-29T02:10:45.3605684Z Sep 29 02:10:45 expected = ["current 
> timestamp: 1603708211000, current watermark: "
> 2022-09-29T02:10:45.3607157Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=1, f1='1603708211000')",
> 2022-09-29T02:10:45.3608256Z Sep 29 02:10:45 "current 
> timestamp: 1603708224000, current watermark: "
> 2022-09-29T02:10:45.3609650Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=2, f1='1603708224000')",
> 2022-09-29T02:10:45.3610854Z Sep 29 02:10:45 "current 
> timestamp: 1603708226000, current watermark: "
> 2022-09-29T02:10:45.3612279Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=3, f1='1603708226000')",
> 2022-09-29T02:10:45.3613382Z Sep 29 02:10:45 "current 
> timestamp: 1603708289000, current watermark: "
> 2022-09-29T02:10:45.3615683Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=4, f1='1603708289000')"]
> 2022-09-29T02:10:45.3617687Z Sep 29 02:10:45 >   
> self.assert_equals_sorted(expected, results)
> 2022-09-29T02:10:45.3618620Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3619425Z Sep 29 02:10:45 
> 

[jira] [Commented] (FLINK-29461) ProcessDataStreamStreamingTests.test_process_function unstable

2022-11-03 Thread Qingsheng Ren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17628626#comment-17628626
 ] 

Qingsheng Ren commented on FLINK-29461:
---

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=42803=logs=9cada3cb-c1d3-5621-16da-0f718fb86602=c67e71ed-6451-5d26-8920-5a8cf9651901=24482

> ProcessDataStreamStreamingTests.test_process_function unstable
> --
>
> Key: FLINK-29461
> URL: https://issues.apache.org/jira/browse/FLINK-29461
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Huang Xingbo
>Assignee: Huang Xingbo
>Priority: Critical
>  Labels: test-stability
>
> {code:java}
> 2022-09-29T02:10:45.3571648Z Sep 29 02:10:45 self = 
>  testMethod=test_process_function>
> 2022-09-29T02:10:45.3572279Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3572810Z Sep 29 02:10:45 def 
> test_process_function(self):
> 2022-09-29T02:10:45.3573495Z Sep 29 02:10:45 
> self.env.set_parallelism(1)
> 2022-09-29T02:10:45.3574148Z Sep 29 02:10:45 
> self.env.get_config().set_auto_watermark_interval(2000)
> 2022-09-29T02:10:45.3580634Z Sep 29 02:10:45 
> self.env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
> 2022-09-29T02:10:45.3583194Z Sep 29 02:10:45 data_stream = 
> self.env.from_collection([(1, '1603708211000'),
> 2022-09-29T02:10:45.3584515Z Sep 29 02:10:45  
>(2, '1603708224000'),
> 2022-09-29T02:10:45.3585957Z Sep 29 02:10:45  
>(3, '1603708226000'),
> 2022-09-29T02:10:45.3587132Z Sep 29 02:10:45  
>(4, '1603708289000')],
> 2022-09-29T02:10:45.3588094Z Sep 29 02:10:45  
>   type_info=Types.ROW([Types.INT(), Types.STRING()]))
> 2022-09-29T02:10:45.3589090Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3589949Z Sep 29 02:10:45 class 
> MyProcessFunction(ProcessFunction):
> 2022-09-29T02:10:45.3590710Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3591856Z Sep 29 02:10:45 def 
> process_element(self, value, ctx):
> 2022-09-29T02:10:45.3592873Z Sep 29 02:10:45 
> current_timestamp = ctx.timestamp()
> 2022-09-29T02:10:45.3593862Z Sep 29 02:10:45 
> current_watermark = ctx.timer_service().current_watermark()
> 2022-09-29T02:10:45.3594915Z Sep 29 02:10:45 yield "current 
> timestamp: {}, current watermark: {}, current_value: {}"\
> 2022-09-29T02:10:45.3596201Z Sep 29 02:10:45 
> .format(str(current_timestamp), str(current_watermark), str(value))
> 2022-09-29T02:10:45.3597089Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3597942Z Sep 29 02:10:45 watermark_strategy = 
> WatermarkStrategy.for_monotonous_timestamps()\
> 2022-09-29T02:10:45.3599260Z Sep 29 02:10:45 
> .with_timestamp_assigner(SecondColumnTimestampAssigner())
> 2022-09-29T02:10:45.3600611Z Sep 29 02:10:45 
> data_stream.assign_timestamps_and_watermarks(watermark_strategy)\
> 2022-09-29T02:10:45.3601877Z Sep 29 02:10:45 
> .process(MyProcessFunction(), 
> output_type=Types.STRING()).add_sink(self.test_sink)
> 2022-09-29T02:10:45.3603527Z Sep 29 02:10:45 self.env.execute('test 
> process function')
> 2022-09-29T02:10:45.3604445Z Sep 29 02:10:45 results = 
> self.test_sink.get_results()
> 2022-09-29T02:10:45.3605684Z Sep 29 02:10:45 expected = ["current 
> timestamp: 1603708211000, current watermark: "
> 2022-09-29T02:10:45.3607157Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=1, f1='1603708211000')",
> 2022-09-29T02:10:45.3608256Z Sep 29 02:10:45 "current 
> timestamp: 1603708224000, current watermark: "
> 2022-09-29T02:10:45.3609650Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=2, f1='1603708224000')",
> 2022-09-29T02:10:45.3610854Z Sep 29 02:10:45 "current 
> timestamp: 1603708226000, current watermark: "
> 2022-09-29T02:10:45.3612279Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=3, f1='1603708226000')",
> 2022-09-29T02:10:45.3613382Z Sep 29 02:10:45 "current 
> timestamp: 1603708289000, current watermark: "
> 2022-09-29T02:10:45.3615683Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=4, f1='1603708289000')"]
> 2022-09-29T02:10:45.3617687Z Sep 29 02:10:45 >   
> self.assert_equals_sorted(expected, results)
> 2022-09-29T02:10:45.3618620Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3619425Z Sep 29 02:10:45 
> 

[jira] [Commented] (FLINK-29461) ProcessDataStreamStreamingTests.test_process_function unstable

2022-11-02 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17627520#comment-17627520
 ] 

Martijn Visser commented on FLINK-29461:


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=42724=logs=3e4dd1a2-fe2f-5e5d-a581-48087e718d53=b4612f28-e3b5-5853-8a8b-610ae894217a=25197

> ProcessDataStreamStreamingTests.test_process_function unstable
> --
>
> Key: FLINK-29461
> URL: https://issues.apache.org/jira/browse/FLINK-29461
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.16.0
>Reporter: Huang Xingbo
>Assignee: Huang Xingbo
>Priority: Critical
>  Labels: test-stability
>
> {code:java}
> 2022-09-29T02:10:45.3571648Z Sep 29 02:10:45 self = 
>  testMethod=test_process_function>
> 2022-09-29T02:10:45.3572279Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3572810Z Sep 29 02:10:45 def 
> test_process_function(self):
> 2022-09-29T02:10:45.3573495Z Sep 29 02:10:45 
> self.env.set_parallelism(1)
> 2022-09-29T02:10:45.3574148Z Sep 29 02:10:45 
> self.env.get_config().set_auto_watermark_interval(2000)
> 2022-09-29T02:10:45.3580634Z Sep 29 02:10:45 
> self.env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
> 2022-09-29T02:10:45.3583194Z Sep 29 02:10:45 data_stream = 
> self.env.from_collection([(1, '1603708211000'),
> 2022-09-29T02:10:45.3584515Z Sep 29 02:10:45  
>(2, '1603708224000'),
> 2022-09-29T02:10:45.3585957Z Sep 29 02:10:45  
>(3, '1603708226000'),
> 2022-09-29T02:10:45.3587132Z Sep 29 02:10:45  
>(4, '1603708289000')],
> 2022-09-29T02:10:45.3588094Z Sep 29 02:10:45  
>   type_info=Types.ROW([Types.INT(), Types.STRING()]))
> 2022-09-29T02:10:45.3589090Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3589949Z Sep 29 02:10:45 class 
> MyProcessFunction(ProcessFunction):
> 2022-09-29T02:10:45.3590710Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3591856Z Sep 29 02:10:45 def 
> process_element(self, value, ctx):
> 2022-09-29T02:10:45.3592873Z Sep 29 02:10:45 
> current_timestamp = ctx.timestamp()
> 2022-09-29T02:10:45.3593862Z Sep 29 02:10:45 
> current_watermark = ctx.timer_service().current_watermark()
> 2022-09-29T02:10:45.3594915Z Sep 29 02:10:45 yield "current 
> timestamp: {}, current watermark: {}, current_value: {}"\
> 2022-09-29T02:10:45.3596201Z Sep 29 02:10:45 
> .format(str(current_timestamp), str(current_watermark), str(value))
> 2022-09-29T02:10:45.3597089Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3597942Z Sep 29 02:10:45 watermark_strategy = 
> WatermarkStrategy.for_monotonous_timestamps()\
> 2022-09-29T02:10:45.3599260Z Sep 29 02:10:45 
> .with_timestamp_assigner(SecondColumnTimestampAssigner())
> 2022-09-29T02:10:45.3600611Z Sep 29 02:10:45 
> data_stream.assign_timestamps_and_watermarks(watermark_strategy)\
> 2022-09-29T02:10:45.3601877Z Sep 29 02:10:45 
> .process(MyProcessFunction(), 
> output_type=Types.STRING()).add_sink(self.test_sink)
> 2022-09-29T02:10:45.3603527Z Sep 29 02:10:45 self.env.execute('test 
> process function')
> 2022-09-29T02:10:45.3604445Z Sep 29 02:10:45 results = 
> self.test_sink.get_results()
> 2022-09-29T02:10:45.3605684Z Sep 29 02:10:45 expected = ["current 
> timestamp: 1603708211000, current watermark: "
> 2022-09-29T02:10:45.3607157Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=1, f1='1603708211000')",
> 2022-09-29T02:10:45.3608256Z Sep 29 02:10:45 "current 
> timestamp: 1603708224000, current watermark: "
> 2022-09-29T02:10:45.3609650Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=2, f1='1603708224000')",
> 2022-09-29T02:10:45.3610854Z Sep 29 02:10:45 "current 
> timestamp: 1603708226000, current watermark: "
> 2022-09-29T02:10:45.3612279Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=3, f1='1603708226000')",
> 2022-09-29T02:10:45.3613382Z Sep 29 02:10:45 "current 
> timestamp: 1603708289000, current watermark: "
> 2022-09-29T02:10:45.3615683Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=4, f1='1603708289000')"]
> 2022-09-29T02:10:45.3617687Z Sep 29 02:10:45 >   
> self.assert_equals_sorted(expected, results)
> 2022-09-29T02:10:45.3618620Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3619425Z Sep 29 02:10:45 
> 

[jira] [Commented] (FLINK-29461) ProcessDataStreamStreamingTests.test_process_function unstable

2022-11-01 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17627008#comment-17627008
 ] 

Martijn Visser commented on FLINK-29461:


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=42680=logs=e92ecf6d-e207-5a42-7ff7-528ff0c5b259=40fc352e-9b4c-5fd8-363f-628f24b01ec2=25187

> ProcessDataStreamStreamingTests.test_process_function unstable
> --
>
> Key: FLINK-29461
> URL: https://issues.apache.org/jira/browse/FLINK-29461
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.16.0
>Reporter: Huang Xingbo
>Priority: Major
>  Labels: test-stability
>
> {code:java}
> 2022-09-29T02:10:45.3571648Z Sep 29 02:10:45 self = 
>  testMethod=test_process_function>
> 2022-09-29T02:10:45.3572279Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3572810Z Sep 29 02:10:45 def 
> test_process_function(self):
> 2022-09-29T02:10:45.3573495Z Sep 29 02:10:45 
> self.env.set_parallelism(1)
> 2022-09-29T02:10:45.3574148Z Sep 29 02:10:45 
> self.env.get_config().set_auto_watermark_interval(2000)
> 2022-09-29T02:10:45.3580634Z Sep 29 02:10:45 
> self.env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
> 2022-09-29T02:10:45.3583194Z Sep 29 02:10:45 data_stream = 
> self.env.from_collection([(1, '1603708211000'),
> 2022-09-29T02:10:45.3584515Z Sep 29 02:10:45  
>(2, '1603708224000'),
> 2022-09-29T02:10:45.3585957Z Sep 29 02:10:45  
>(3, '1603708226000'),
> 2022-09-29T02:10:45.3587132Z Sep 29 02:10:45  
>(4, '1603708289000')],
> 2022-09-29T02:10:45.3588094Z Sep 29 02:10:45  
>   type_info=Types.ROW([Types.INT(), Types.STRING()]))
> 2022-09-29T02:10:45.3589090Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3589949Z Sep 29 02:10:45 class 
> MyProcessFunction(ProcessFunction):
> 2022-09-29T02:10:45.3590710Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3591856Z Sep 29 02:10:45 def 
> process_element(self, value, ctx):
> 2022-09-29T02:10:45.3592873Z Sep 29 02:10:45 
> current_timestamp = ctx.timestamp()
> 2022-09-29T02:10:45.3593862Z Sep 29 02:10:45 
> current_watermark = ctx.timer_service().current_watermark()
> 2022-09-29T02:10:45.3594915Z Sep 29 02:10:45 yield "current 
> timestamp: {}, current watermark: {}, current_value: {}"\
> 2022-09-29T02:10:45.3596201Z Sep 29 02:10:45 
> .format(str(current_timestamp), str(current_watermark), str(value))
> 2022-09-29T02:10:45.3597089Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3597942Z Sep 29 02:10:45 watermark_strategy = 
> WatermarkStrategy.for_monotonous_timestamps()\
> 2022-09-29T02:10:45.3599260Z Sep 29 02:10:45 
> .with_timestamp_assigner(SecondColumnTimestampAssigner())
> 2022-09-29T02:10:45.3600611Z Sep 29 02:10:45 
> data_stream.assign_timestamps_and_watermarks(watermark_strategy)\
> 2022-09-29T02:10:45.3601877Z Sep 29 02:10:45 
> .process(MyProcessFunction(), 
> output_type=Types.STRING()).add_sink(self.test_sink)
> 2022-09-29T02:10:45.3603527Z Sep 29 02:10:45 self.env.execute('test 
> process function')
> 2022-09-29T02:10:45.3604445Z Sep 29 02:10:45 results = 
> self.test_sink.get_results()
> 2022-09-29T02:10:45.3605684Z Sep 29 02:10:45 expected = ["current 
> timestamp: 1603708211000, current watermark: "
> 2022-09-29T02:10:45.3607157Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=1, f1='1603708211000')",
> 2022-09-29T02:10:45.3608256Z Sep 29 02:10:45 "current 
> timestamp: 1603708224000, current watermark: "
> 2022-09-29T02:10:45.3609650Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=2, f1='1603708224000')",
> 2022-09-29T02:10:45.3610854Z Sep 29 02:10:45 "current 
> timestamp: 1603708226000, current watermark: "
> 2022-09-29T02:10:45.3612279Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=3, f1='1603708226000')",
> 2022-09-29T02:10:45.3613382Z Sep 29 02:10:45 "current 
> timestamp: 1603708289000, current watermark: "
> 2022-09-29T02:10:45.3615683Z Sep 29 02:10:45 
> "-9223372036854775808, current_value: Row(f0=4, f1='1603708289000')"]
> 2022-09-29T02:10:45.3617687Z Sep 29 02:10:45 >   
> self.assert_equals_sorted(expected, results)
> 2022-09-29T02:10:45.3618620Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3619425Z Sep 29 02:10:45 
> pyflink/datastream/tests/test_data_stream.py:986: 
> 2022-09-29T02:10:45.3620424Z