I'm running Beam 2.2.0 on Flink 1.3 using KafkaIO. The pipeline reads from two topics, applies a fixed window, joins the two streams via a CoGroupByKey, and writes the output to another topic.
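In outline, the pipeline shape is roughly the following (a sketch only — topic names, broker address, and window size are placeholders, not my exact code; the gist linked below has the full reproduction):

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;
import org.joda.time.Duration;
import org.apache.kafka.common.serialization.StringDeserializer;

public class JoinSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();

    TupleTag<String> leftTag = new TupleTag<>();
    TupleTag<String> rightTag = new TupleTag<>();

    // Read topic A, window into fixed 1-minute windows (window size is a placeholder).
    PCollection<KV<String, String>> left = p
        .apply("ReadTopicA", KafkaIO.<String, String>read()
            .withBootstrapServers("kafka:9092")   // placeholder
            .withTopic("topic-a")                 // placeholder
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withoutMetadata())
        .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))));

    // Read topic B the same way.
    PCollection<KV<String, String>> right = p
        .apply("ReadTopicB", KafkaIO.<String, String>read()
            .withBootstrapServers("kafka:9092")   // placeholder
            .withTopic("topic-b")                 // placeholder
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withoutMetadata())
        .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))));

    // Join the two windowed streams by key.
    PCollection<KV<String, CoGbkResult>> joined =
        KeyedPCollectionTuple.of(leftTag, left)
            .and(rightTag, right)
            .apply(CoGroupByKey.create());

    // ... a ParDo to format the CoGbkResult, then KafkaIO.write() to the output topic ...

    p.run();
  }
}
```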
Example code that reproduces the issue can be seen here: https://gist.github.com/salbanese/c46df2718c09a897e04d498c3f59d9d7

When I cancel the job with a savepoint via `flink cancel -s /path/to/savepoint job-id`, then restart the job from that savepoint via `flink run -s /path/to/savepoint -c ...`, each subsequent savepoint grows in size by about 30 percent, and eventually Flink starts timing out and fails to cancel the job. Eliminating the CoGroupByKey seems to stop this behavior, and savepoints stay a consistent size from one run to the next.

I feel like I must be missing something. Any advice would be appreciated.

Thanks,
-seth