Re: KafkaSink handling message size produce errors

2023-02-17 Thread Jing Ge via user
ticket created: https://issues.apache.org/jira/browse/FLINK-31121


Re: KafkaSink handling message size produce errors

2023-02-17 Thread Hatem Mostafa
Thanks for adding this to your backlog; I think it's definitely a very
useful feature.

Can you provide an example of how to extend KafkaSink to add this error
handling? I have tried to do so but did not find it straightforward,
because the errors are thrown in the deliveryCallback of KafkaWriter, and
KafkaSink cannot be extended since all its members are private and its
constructor is package-private.
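
A sketch of where such a guard could sit instead of extending the sink (Flink-flavored pseudocode; `MyWindowAggregate`, `serialize`, and `fitsUnderMessageLimit` are hypothetical helpers, not Flink APIs):

```
// Drop window results that would exceed the broker's message limit before
// they ever reach the producer, so the delivery callback never fires:
windowedStream
    .aggregate(new MyWindowAggregate())
    .filter(result -> fitsUnderMessageLimit(serialize(result)))
    .sinkTo(kafkaSink);
```

An oversized window is simply discarded here; a ProcessFunction with a side output could divert it to a dead-letter topic instead.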


Re: KafkaSink handling message size produce errors

2023-02-16 Thread Shammon FY
Hi Jing,

That sounds good to me; we can add an option for it.

Best,
Shammon


Re: KafkaSink handling message size produce errors

2023-02-16 Thread Jing Ge via user
Hi,

It makes sense to offer this feature of catching and ignoring the
exception, with a config option to turn it on or off, when we put
ourselves in users' shoes. WDYT? I will create a ticket if most of you
consider it a good feature to help users.

Best regards,
Jing


Re: KafkaSink handling message size produce errors

2023-02-16 Thread Shammon FY
Hi Hatem

As mentioned above, you can extend the KafkaSink or create a UDF to
process the record before the sink.

Best,
Shammon
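
The "process the record before the sink" idea can be sketched in plain Java: estimate a record's post-compression size and skip records that would exceed the broker limit. This is an illustration only: gzip stands in for whatever compression codec the producer is configured with, the 1 MiB threshold roughly mirrors Kafka's default `message.max.bytes`, and the `SizeGuard` class is hypothetical, not a Flink or Kafka API.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

// Hypothetical pre-sink size guard: gzip approximates the producer's
// compression step, and maxBytes models the broker's message size limit.
final class SizeGuard {
    static final long DEFAULT_MAX_BYTES = 1024 * 1024; // ~1 MiB, roughly Kafka's default

    private final long maxBytes;

    SizeGuard(long maxBytes) {
        this.maxBytes = maxBytes;
    }

    // Estimate the record's on-wire size by compressing it with gzip.
    static byte[] gzip(byte[] payload) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write(payload);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    // True if the record's estimated compressed size fits under the limit.
    boolean fits(String record) {
        return gzip(record.getBytes(StandardCharsets.UTF_8)).length <= maxBytes;
    }
}
```

In a Flink job this check would run in a filter (or a ProcessFunction with a side output for oversized windows) placed ahead of the Kafka sink, so the producer never sees a record it cannot send.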


Re: KafkaSink handling message size produce errors

2023-02-16 Thread yuxia
Hi, Hatem.
I think there is no way to catch the exception and then ignore it in the
current implementation of KafkaSink. You may need to extend the KafkaSink.

Best regards, 
Yuxia 


From: "Hatem Mostafa"
To: "User"
Sent: Thursday, February 16, 2023, 9:32:44 PM
Subject: KafkaSink handling message size produce errors

Hello,
I am writing a Flink job that reads from and writes to Kafka. It uses a
window operator and eventually writes the result of the window into a
Kafka topic. The accumulated data can exceed the maximum message size
after compression at the producer level. I want to be able to catch the
exception coming from the producer and ignore this window. I could not
find a way to do that in KafkaSink
(https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#kafka-sink),
is there a way to do so?

I attached here an example of an error that I would like to handle gracefully. 

[image: image.png]

This question is similar to one that was asked on Stack Overflow
(https://stackoverflow.com/questions/52308911/how-to-handle-exceptions-in-kafka-sink),
but the answer is relevant only to older versions of Flink.

Regards, 
Hatem