Hi, Timo is correct - partition discovery is supported by the consumer only starting from Flink 1.4.
Without partition discovery enabled, the expected behaviour is that the list of partitions picked up on the first execution of the job remains the list of subscribed partitions across all executions. When restoring from a savepoint / checkpoint, discovery of new partitions does not occur.

The reason new partitions were discovered after you changed the UID of the consumer operator is that the consumer is then treated as a completely new operator without any restored state, so it picks up the partition list afresh.

Since Flink 1.4, you can enable partition discovery by setting flink.partition-discovery.interval-millis. This can be turned on / off at the start of any execution attempt. For example, you can have it off initially, take a savepoint, and then change that configuration when restoring to enable discovery.

Cheers,
Gordon
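
P.S. In case it helps, here is a minimal sketch of how the consumer properties could be built with discovery switched on or off. The class and method names, the broker address and the group id are placeholders of mine, not anything from your job; only the flink.partition-discovery.interval-millis key comes from the description above. The sketch assumes that leaving the key unset keeps discovery disabled.

```java
import java.util.Properties;

// Hypothetical helper; the name and structure are for illustration only.
final class PartitionDiscoveryConfig {

    // Property key that enables partition discovery (Flink 1.4+).
    static final String DISCOVERY_INTERVAL_KEY = "flink.partition-discovery.interval-millis";

    // Builds Kafka consumer properties. A positive interval turns discovery on;
    // any other value leaves the key unset (assumed here to mean "off").
    static Properties consumerProperties(long discoveryIntervalMillis) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.setProperty("group.id", "my-group");                // placeholder group id
        if (discoveryIntervalMillis > 0) {
            props.setProperty(DISCOVERY_INTERVAL_KEY, Long.toString(discoveryIntervalMillis));
        }
        return props;
    }

    public static void main(String[] args) {
        // First run: discovery off. After restoring from the savepoint: discovery on.
        Properties initialRun = consumerProperties(-1L);
        Properties restoredRun = consumerProperties(10_000L);

        System.out.println("initial run:  " + initialRun.getProperty(DISCOVERY_INTERVAL_KEY));
        System.out.println("restored run: " + restoredRun.getProperty(DISCOVERY_INTERVAL_KEY));

        // The Properties object would then be passed to the Kafka consumer
        // constructor in the job, in place of the properties used today.
    }
}
```

The point is that the same job can be resubmitted from the savepoint with only this one property changed; the consumer UID stays as it is, so the restored state is kept.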