[ https://issues.apache.org/jira/browse/FLINK-18053?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sergii Mikhtoniuk updated FLINK-18053:
--------------------------------------
Description:

Flink produces an invalid result when a streaming SQL aggregation is stopped and resumed from a savepoint.

*Steps to reproduce:*

1) Create an assembly from the attached file. This job reads CSV files as a stream. The files contain fake stock tickers, which are aggregated with the following tumbling-window query:
{code:java}
SELECT
  TUMBLE_START(event_time, INTERVAL '1' DAY) as event_time,
  symbol as symbol,
  min(price) as `min`,
  max(price) as `max`
FROM Tickers
GROUP BY TUMBLE(event_time, INTERVAL '1' DAY), symbol
{code}
The stream uses punctuated watermarks with a max lateness of 1 day.

2) Create two CSV files with fake stock tickers:

{{1.csv}}:
{code:java}
2000-01-01 01:00:00.0,A,10
2000-01-01 01:00:00.0,B,20
2000-01-01 02:00:00.0,A,10
2000-01-01 02:00:00.0,B,21
2000-01-02 01:00:00.0,A,12
2000-01-02 01:00:00.0,B,22
2000-01-02 02:00:00.0,A,13
2000-01-02 02:00:00.0,B,23
2000-01-01 03:00:00.0,A,11 // Late arrival - still above watermark
2000-01-03 01:00:00.0,A,14
2000-01-03 01:00:00.0,B,24
2000-01-03 02:00:00.0,A,15
2000-01-03 02:00:00.0,B,25
{code}

{{2.csv}}:
{code:java}
2000-01-01 04:00:00.0,A,12 // Late arrival - under watermark
2000-01-04 01:00:00.0,A,16 // Next values won't be visible in the result, they only push watermark up
2000-01-04 01:00:00.0,B,26
2000-01-04 02:00:00.0,A,17
2000-01-04 02:00:00.0,B,27
2000-01-05 01:00:00.0,A,18
2000-01-05 01:00:00.0,B,28
{code}

3) Run the job on the folder containing both files. The observed result is as expected:
{code:java}
2000-01-01,A,10,11
2000-01-01,B,20,21
2000-01-02,A,12,13
2000-01-02,B,22,23
2000-01-03,A,14,15
2000-01-03,B,24,25
{code}

4) Now run the job with only {{1.csv}} in the directory. The result is still correct:
{code:java}
2000-01-01,A,10,11
2000-01-01,B,20,21
{code}

5) Cancel the job with a savepoint, move {{2.csv}} into the directory, and restart the job from the savepoint.
This produces an incorrect result:
{code:java}
2000-01-01,A,12,12
2000-01-02,A,12,13
2000-01-02,B,22,23
2000-01-03,A,14,15
2000-01-03,B,24,25
{code}

*Expectation:*

The {{2000-01-01,A,12,12}} record should not appear, as it should not have passed the watermark check. This suggests that Flink did not save the watermark in the savepoint.
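To make the hypothesis concrete, here is a minimal sketch in plain Java (no Flink dependencies; the class name {{WatermarkSim}} and the simplified drop rule are illustrative assumptions, not Flink's actual window-operator code). It models the watermark as the maximum event time seen minus the 1-day allowed lateness, and drops a record once the end of its 1-day tumbling window is at or below the watermark. Starting from the watermark that {{1.csv}} would leave behind, the late {{2000-01-01 04:00:00.0,A,12}} record is dropped; starting from {{Long.MIN_VALUE}} (a watermark lost during restore), the same record is accepted, which matches the spurious {{2000-01-01,A,12,12}} row:

```java
import java.util.ArrayList;
import java.util.List;

// Minimal sketch, NOT Flink code: simulates the job's event-time logic to show
// why losing the watermark across a savepoint/restore admits the late record.
public class WatermarkSim {
    static final long DAY_MS = 24L * 60 * 60 * 1000;

    // Replays event timestamps (ms), dropping any record whose 1-day tumbling
    // window has already closed (window end <= current watermark). The
    // punctuated watermark is max event time seen minus 1 day of lateness.
    static List<Long> accepted(List<Long> eventTimesMs, long initialWatermark) {
        long watermark = initialWatermark; // Long.MIN_VALUE models "no saved watermark"
        List<Long> out = new ArrayList<>();
        for (long ts : eventTimesMs) {
            long windowEnd = (ts / DAY_MS + 1) * DAY_MS; // 1-day tumbling window
            if (windowEnd > watermark) {
                out.add(ts); // window still open: record contributes to the aggregate
            }
            watermark = Math.max(watermark, ts - DAY_MS); // advance punctuated watermark
        }
        return out;
    }

    public static void main(String[] args) {
        long lateTs = 4L * 3600_000;                       // 2000-01-01 04:00 from 2.csv (relative epoch)
        long maxTsFile1 = 2L * DAY_MS + 2L * 3600_000;     // 2000-01-03 02:00, the max in 1.csv
        long savedWatermark = maxTsFile1 - DAY_MS;         // watermark a savepoint should carry

        // Resumed WITH the saved watermark: the late record is dropped.
        System.out.println(accepted(List.of(lateTs), savedWatermark).isEmpty());       // true
        // Resumed WITHOUT it (watermark reset on restore): the record is accepted,
        // yielding the spurious 2000-01-01,A,12,12 row.
        System.out.println(!accepted(List.of(lateTs), Long.MIN_VALUE).isEmpty());      // true
    }
}
```

If the savepoint restored the operator state but reset the watermark to its initial value, this is exactly the behavior that would be observed.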
> Savepoints do not preserve watermarks
> -------------------------------------
>
>                 Key: FLINK-18053
>                 URL: https://issues.apache.org/jira/browse/FLINK-18053
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing, Table SQL / Runtime
>    Affects Versions: 1.10.1
>            Reporter: Sergii Mikhtoniuk
>            Priority: Major
>         Attachments: 1.csv, 2.csv, MyApp.scala
--
This message was sent by Atlassian Jira
(v8.3.4#803005)