Hi all,

It was a bit tricky to figure out what was going wrong here; hopefully
someone can add the missing piece to the puzzle.

I have a Kafka source with a custom AssignerWithPeriodicWatermarks
timestamp assigner. It's a copy of the AscendingTimestampExtractor with a
log statement printing each timestamp and watermark produced (along with
the hash of the assigner instance so I know exactly how far each substream
has progressed). Attached to that there is a JDBCSinkFunction. I have set
the whole plan's parallelism to 1 and the max parallelism to 1 as well.

My first surprise was to see that 16 instances of my assigner were created,
despite there being only one thread using all 16.

My second surprise was to see there were only 4 assigner instances that
were extracting timestamps.

This meant the whole job's watermark wasn't advancing (and while that's not
important in this simplified example it is in my real life use case).
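
To spell out why I think the watermark stalls: as far as I understand it,
the watermark the source emits is the minimum over the per-partition
watermarks, so any partition whose assigner never sees a record pins it at
Long.MIN_VALUE. Below is a minimal plain-Java sketch of that assumption. It
is not Flink code, and the class and method names are made up purely for
illustration.

```java
import java.util.Arrays;

public class WatermarkMinSketch {

    // Assumption: the combined watermark is the minimum of all
    // per-partition watermarks (hypothetical helper, not a Flink API).
    public static long combinedWatermark(long[] perPartitionWatermarks) {
        return Arrays.stream(perPartitionWatermarks).min().orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        // 16 partitions, none of which has produced a watermark yet.
        long[] watermarks = new long[16];
        Arrays.fill(watermarks, Long.MIN_VALUE);

        // Only 4 assigners extract timestamps, as in my job with the JDBC sink.
        for (int p = 0; p < 4; p++) {
            watermarks[p] = 1_000L + p;
        }
        System.out.println("stalled: "
                + (combinedWatermark(watermarks) == Long.MIN_VALUE));

        // Once all 16 assigners have seen records, the minimum can advance.
        for (int p = 4; p < 16; p++) {
            watermarks[p] = 1_000L + p;
        }
        System.out.println("combined: " + combinedWatermark(watermarks));
    }
}
```

If that model is right, the 12 assigners that never receive a record are
what keeps the job's watermark from moving.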

If I replace my JDBC sink with a print sink, though, all 16 assigners get
fully used (i.e. they all receive messages from which they have to extract
a timestamp).

What is happening here? I don't want to ignore the unattended Kafka
partitions or mark them as idle - because I know from using the print sink
that they do have messages in them. I'm also surprised that there are 16
instances of the assigner (one per Kafka partition) even though the
parallelism of the job is one - is that a conscious decision and if so
what's the reason?

Finally, I'd also like to know why only 4 assigners are effectively being
used. I suspect it's a JDBC default I can override somehow.

Thanks for getting to the bottom of this!
