Hello community,

Any help here please?

Thanks,
Megh

On Mon, May 19, 2025, 18:48 megh vidani <vidanimeg...@gmail.com> wrote:

> I'm aware that Spark does not rely on the Kafka committed offsets. It is
> purely for monitoring purposes.
>
> Thanks,
> Megh
>
> On Mon, May 19, 2025, 18:46 megh vidani <vidanimeg...@gmail.com> wrote:
>
>> Hi Prashant,
>>
>> I would like to do it so that I can monitor the consumer group along with
>> my other consumer groups.
>>
>> Thanks,
>> Megh
>>
>> On Mon, May 19, 2025, 18:21 Prashant Sharma <scrapco...@gmail.com> wrote:
>>
>>> Spark does not rely on Kafka's commit; in fact, it tracks the stream
>>> progress itself and reads via offsets (e.g. from the last read points).
>>> Why do you want to commit?
>>>
>>> On Mon, May 19, 2025 at 5:58 PM megh vidani <vidanimeg...@gmail.com>
>>> wrote:
>>>
>>>> Hello Spark Dev Community,
>>>>
>>>> Reaching out for the below problem statement.
>>>>
>>>> Thanks,
>>>> Megh
>>>>
>>>> On Mon, May 19, 2025, 13:16 megh vidani <vidanimeg...@gmail.com> wrote:
>>>>
>>>>> Hello Spark Community,
>>>>>
>>>>> I have a Structured Streaming job in which I'm consuming a topic
>>>>> with the same name from two different Kafka clusters and then
>>>>> creating a union of these two streams. I've developed a custom
>>>>> query listener that commits the offsets back to the Kafka clusters
>>>>> in the onQueryProgress method, once every batch is completed.
>>>>>
>>>>> Now here's my problem: To commit these offsets back to Kafka, I
>>>>> need to iterate over the event.progress.sources list to fetch the
>>>>> individual sources (2 in my case) and read endOffset on each one to
>>>>> get the latest offsets that have been consumed by the query. As per
>>>>> the current structure of the source object, the Kafka bootstrap
>>>>> server info is not stored; it only stores the topic name. Since the
>>>>> topic name is the same in both streams, I can't determine which
>>>>> source belongs to cluster1 and which belongs to cluster2. This is
>>>>> leading to incorrect offsets getting committed to the wrong
>>>>> cluster.
>>>>>
>>>>> Has anyone encountered this issue before, and is there a solution
>>>>> to it? I tried to find a way to pass some custom metadata to the
>>>>> source object, but there doesn't seem to be any.
>>>>>
>>>>> If anyone has any inputs, please let me know. Thanks in advance.
>>>>>
>>>>> Megh
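
For illustration, the offset bookkeeping described in the original message could be sketched roughly as below. This is only a sketch under assumptions, not a confirmed fix: it assumes each source's end offset is a JSON string of the form {"topic": {"partition": offset}}, and it assumes (unverified) that progress.sources lists the sources in the same order in which the two streams were defined. The names CLUSTERS_IN_SOURCE_ORDER, parse_end_offset and offsets_by_cluster are hypothetical and not part of Spark. The sketch deliberately stops before the actual Kafka commit.

```python
import json
from typing import Dict, List, Tuple

# Hypothetical cluster labels, listed in the order the two streams were created.
# ASSUMPTION (unverified): progress.sources reports sources in that same order.
CLUSTERS_IN_SOURCE_ORDER = ["cluster1", "cluster2"]

TopicPartition = Tuple[str, int]


def parse_end_offset(end_offset_json: str) -> Dict[TopicPartition, int]:
    """Parse an end-offset JSON string such as '{"events": {"0": 42, "1": 17}}'
    into a {(topic, partition): offset} mapping."""
    parsed = json.loads(end_offset_json)
    return {
        (topic, int(partition)): int(offset)
        for topic, partitions in parsed.items()
        for partition, offset in partitions.items()
    }


def offsets_by_cluster(end_offsets: List[str]) -> Dict[str, Dict[TopicPartition, int]]:
    """Pair each source's end offsets with a cluster label by position.

    Raises ValueError if the number of sources differs from the number of
    expected clusters, rather than risk committing to the wrong cluster.
    """
    if len(end_offsets) != len(CLUSTERS_IN_SOURCE_ORDER):
        raise ValueError(
            f"expected {len(CLUSTERS_IN_SOURCE_ORDER)} sources, got {len(end_offsets)}"
        )
    return {
        cluster: parse_end_offset(raw)
        for cluster, raw in zip(CLUSTERS_IN_SOURCE_ORDER, end_offsets)
    }
```

Inside onQueryProgress, one would pass something like [s.endOffset for s in event.progress.sources] and hand each cluster's mapping to a consumer configured for that cluster. Because the positional mapping is only an assumption, it is worth checking it against known offsets on a test topic before relying on it for monitoring.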