Re: KafkaSink handling message size produce errors
Ticket created: https://issues.apache.org/jira/browse/FLINK-31121

On Fri, Feb 17, 2023 at 9:59 AM Hatem Mostafa wrote:
Re: KafkaSink handling message size produce errors
Thanks for adding this to your backlog, I think it's definitely a very useful feature.

Can you provide an example of how to extend KafkaSink to add this error handling? I have tried to do so but did not find it straightforward, because the errors are thrown in the deliveryCallback of KafkaWriter, and KafkaSink is not extendable: all its members are private and its constructor is package-private.

On Fri, Feb 17, 2023 at 8:17 AM Shammon FY wrote:
Re: KafkaSink handling message size produce errors
Hi Jing,

It sounds good to me; we can add an option for it.

Best,
Shammon

On Fri, Feb 17, 2023 at 3:13 PM Jing Ge wrote:
Re: KafkaSink handling message size produce errors
Hi,

When we put ourselves in users' shoes, it makes sense to offer a feature that catches and ignores such exceptions, with a config option to turn it on/off. WDYT? I will create a ticket if most of you consider it a good feature to help users.

Best regards,
Jing

On Fri, Feb 17, 2023 at 6:01 AM Shammon FY wrote:
Re: KafkaSink handling message size produce errors
Hi Hatem,

As mentioned above, you can extend the KafkaSink, or create a UDF and process the records before the sink.

Best,
Shammon

On Fri, Feb 17, 2023 at 9:54 AM yuxia wrote:
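For illustration, a minimal sketch of the "process the record before sink" suggestion in Java. The String element type and the 1 MB limit are assumptions to adapt to the actual job; and since compression happens later, inside the producer, filtering on the uncompressed serialized size is only a conservative approximation (records that would have compressed below the limit may still be dropped):

    import java.nio.charset.StandardCharsets;
    import org.apache.flink.api.common.functions.FilterFunction;

    // Drops window results whose uncompressed serialized size would exceed
    // the producer's max.request.size (1 MB by default).
    public class OversizeRecordFilter implements FilterFunction<String> {
        // Assumed limit; keep it in sync with the producer configuration.
        private static final int MAX_RECORD_BYTES = 1_000_000;

        @Override
        public boolean filter(String value) {
            return value.getBytes(StandardCharsets.UTF_8).length <= MAX_RECORD_BYTES;
        }
    }

Applied between the window and the sink, e.g. windowedResults.filter(new OversizeRecordFilter()).sinkTo(kafkaSink);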
Re: KafkaSink handling message size produce errors
Hi, Hatem.
I think there is no way to catch the exception and then ignore it in the current implementation of KafkaSink. You may need to extend the KafkaSink.

Best regards,
Yuxia

From: "Hatem Mostafa"
To: "User"
Sent: Thursday, February 16, 2023, 9:32:44 PM
Subject: KafkaSink handling message size produce errors

Hello,
I am writing a Flink job that reads from and writes to Kafka. It uses a window operator and eventually writes the result of each window into a Kafka topic. The accumulated data can exceed the maximum message size, after compression, at the producer level. I want to catch the exception coming from the producer and ignore that window's output. I could not find a way to do that in KafkaSink <https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#kafka-sink>; is there a way to do so?

I attached here an example of an error that I would like to handle gracefully.

This question is similar to one asked on Stack Overflow here <https://stackoverflow.com/questions/52308911/how-to-handle-exceptions-in-kafka-sink>, but the answer only applies to older versions of Flink.

Regards,
Hatem
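For reference, one stopgap for the scenario above avoids the error rather than handles it: raising the producer's request size limit on the KafkaSink. A minimal sketch, assuming a String stream; the broker address, topic name, and the 5 MB value are placeholders, and the broker-side message.max.bytes must allow the larger size as well:

    import java.util.Properties;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
    import org.apache.flink.connector.kafka.sink.KafkaSink;

    Properties producerProps = new Properties();
    // Example value: allow produce requests up to 5 MB instead of the 1 MB default.
    producerProps.setProperty("max.request.size", "5242880");

    KafkaSink<String> sink = KafkaSink.<String>builder()
            .setBootstrapServers("localhost:9092")        // placeholder
            .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                    .setTopic("output-topic")             // placeholder
                    .setValueSerializationSchema(new SimpleStringSchema())
                    .build())
            .setKafkaProducerConfig(producerProps)
            .build();

This does not address the original request to catch and skip an oversized window, but it can buy headroom while a proper error-handling hook is discussed in FLINK-31121.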