Hello Spark Community,

I have a Structured Streaming job that consumes a topic with the same name
from two different Kafka clusters and then unions the two streams. I've
written a custom query listener that commits the offsets back to the Kafka
clusters in its onQueryProgress method each time a batch completes.

Here's my problem: to
commit these offsets back to Kafka, I need to iterate over the
event.progress.sources list to get each individual source (two in my case)
and read its endOffset field to get the latest offsets that have been
consumed by the query. As the source object is currently structured, it does
not store the Kafka bootstrap server information; it only stores the topic
name. Since the topic name is the same in both streams, I can't determine
which source belongs to cluster1 and which belongs to cluster2. As a result,
offsets end up being committed to the wrong cluster.

Has anyone encountered this issue before, or does anyone have a solution to
this? I looked for a way to pass custom metadata to the source object, but
there doesn't seem to be one. If anyone has any input, please let me know.

Thanks in advance.
Megh
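
P.S. To illustrate the ambiguity, here is a minimal Python sketch of what the listener sees. It assumes each entry in event.progress.sources exposes a description string and an endOffset JSON string of the form {"topic": {"partition": offset}}. The topic name "events", the description text, and the offset values are made up for illustration, and plain dictionaries stand in for the real source objects.

```python
import json

# Hypothetical stand-ins for the two entries of event.progress.sources.
# The description text and offsets are illustrative, not captured output.
sources = [
    {"description": "KafkaV2[Subscribe[events]]",
     "endOffset": '{"events": {"0": 120, "1": 98}}'},
    {"description": "KafkaV2[Subscribe[events]]",
     "endOffset": '{"events": {"0": 45, "1": 77}}'},
]


def parse_end_offset(end_offset_json):
    """Turn an endOffset JSON string into {(topic, partition): offset}."""
    parsed = json.loads(end_offset_json)
    return {
        (topic, int(partition)): offset
        for topic, partitions in parsed.items()
        for partition, offset in partitions.items()
    }


# One offset map per source, in the order the sources were listed.
offsets_per_source = [parse_end_offset(s["endOffset"]) for s in sources]

# Both sources share one description, so nothing in these fields says
# which offset map belongs to cluster1 and which to cluster2.
descriptions = {s["description"] for s in sources}
ambiguous = len(descriptions) < len(sources)
```

The offsets themselves parse cleanly; the missing piece is any field that ties each offset map to a specific cluster.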