Hi all, it was a bit tricky to figure out what was going wrong here; hopefully someone can add the missing piece to the puzzle.
I have a Kafka source with a custom AssignerWithPeriodicWatermarks timestamp assigner. It's a copy of the AscendingTimestampExtractor with a log statement printing each timestamp and watermark produced (along with the hash of the assigner instance, so I know exactly how far each substream has progressed). Attached to that there is a JDBCSinkFunction. I have set the whole plan's parallelism to 1 and the max parallelism also to 1.

My first surprise was to see that 16 instances of my assigner are created, even though a single thread uses all 16. My second surprise was to see that only 4 of those assigner instances were extracting timestamps. This meant the whole job's watermark wasn't advancing (and while that's not important in this simplified example, it is in my real-life use case). If I replace my JDBC sink with a print sink, though, all 16 assigners get fully used (i.e. they all receive messages from which they have to extract a timestamp).

What is happening here? I don't want to ignore the unattended Kafka partitions or mark them as idle, because I know from using the print sink that they do have messages in them. I'm also surprised that there are 16 instances of the assigner (one per Kafka partition) even though the parallelism of the job is one. Is that a conscious decision, and if so, what's the reason? Finally, I'd also like to know why only 4 assigners are actually being used; I suspect it's a JDBC default I can override somehow.

Thanks for getting to the bottom of this!
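
For illustration, here is a plain-Java sketch of the symptom with no Flink dependency. It assumes that the source's overall watermark is the minimum of the watermarks reported by the per-partition assigner instances; that is an assumption about how the Kafka source combines them, not something established above. The class and method names (PartitionWatermarkSketch, PartitionAssigner, combinedWatermark) are hypothetical and only mimic the ascending-timestamp logic described in this post.

```java
public class PartitionWatermarkSketch {

    /** Hypothetical stand-in for one per-partition copy of the ascending-timestamp assigner. */
    static final class PartitionAssigner {
        private long currentTimestamp = Long.MIN_VALUE;

        /** Remembers the highest timestamp seen so far and returns the element's timestamp. */
        long extractTimestamp(long elementTimestamp) {
            if (elementTimestamp >= currentTimestamp) {
                currentTimestamp = elementTimestamp;
            }
            return elementTimestamp;
        }

        /** Watermark trails the highest timestamp by 1; stays at MIN_VALUE until a record arrives. */
        long currentWatermark() {
            return currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1;
        }
    }

    /** Assumption: the combined watermark is the minimum over all partition assigners. */
    static long combinedWatermark(PartitionAssigner[] assigners) {
        long min = Long.MAX_VALUE; // returned unchanged if the array is empty
        for (PartitionAssigner assigner : assigners) {
            min = Math.min(min, assigner.currentWatermark());
        }
        return min;
    }

    /** Creates one assigner per partition, as observed with the 16-partition topic. */
    static PartitionAssigner[] newAssigners(int partitions) {
        PartitionAssigner[] assigners = new PartitionAssigner[partitions];
        for (int i = 0; i < partitions; i++) {
            assigners[i] = new PartitionAssigner();
        }
        return assigners;
    }

    public static void main(String[] args) {
        PartitionAssigner[] assigners = newAssigners(16);

        // Only 4 of the 16 assigners ever see a record (the JDBC sink case).
        for (int p = 0; p < 4; p++) {
            assigners[p].extractTimestamp(1_000L + p);
        }
        System.out.println("4 of 16 active:  " + combinedWatermark(assigners));

        // All 16 assigners see records (the print sink case).
        for (int p = 4; p < 16; p++) {
            assigners[p].extractTimestamp(1_000L + p);
        }
        System.out.println("16 of 16 active: " + combinedWatermark(assigners));
    }
}
```

Under that assumption, the combined watermark stays at Long.MIN_VALUE for as long as any one of the 16 assigners has not seen a record, which would match the stalled watermark in the JDBC sink case. The sketch does not explain why only 4 partitions are being read.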