Re: Status of a job when a kafka source dies

2020-08-14 Thread Becket Qin
Hey Nick and Piotr,

Sorry for the late reply. This email somehow failed to pass my mail filter.

The KafkaConsumer in Apache Kafka itself does not throw any exception if
the broker is down. There isn't any API in KafkaConsumer telling you that
the brokers are not reachable. Instead, the consumer just keeps retrying to
fetch the records. It is designed this way so that when there is a Kafka
failure in an organization, people won't have to restart all the downstream
applications after Kafka is up again.
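
To make that concrete, here is a minimal sketch using the plain Apache
Kafka client (broker address and topic name are made-up placeholders):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PollNeverThrowsDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092"); // placeholder
        props.setProperty("group.id", "demo");
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("demo-topic"));
            while (true) {
                // With all brokers down, this simply returns an empty batch
                // after the timeout; the NetworkClient logs WARNs and keeps
                // retrying internally, but no exception reaches this code.
                ConsumerRecords<String, String> records =
                        consumer.poll(Duration.ofSeconds(5));
                System.out.println("fetched " + records.count() + " records");
            }
        }
    }
}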

The Spring Kafka consumer is a community project that wraps the Java
KafkaConsumer from Apache Kafka. The Spring Kafka consumer emits a special
event if no message is received from the consumer.poll() call for some
time. As Nick mentioned, that does not necessarily mean the broker is
down. It simply means that no messages have been consumed from Kafka for
some time.
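
If one wanted to react to that event, something along these lines should
work in a standard Spring application (a sketch; the reaction itself is a
placeholder, and if I remember correctly the thresholds that trigger the
event are tuned via ContainerProperties' monitorInterval / noPollThreshold):

import org.springframework.context.ApplicationListener;
import org.springframework.kafka.event.NonResponsiveConsumerEvent;
import org.springframework.stereotype.Component;

// Reacts to the special event Spring Kafka publishes when consumer.poll()
// has not returned for a while. Remember: it signals "poll is not
// responding", not necessarily "the broker is down".
@Component
public class NonResponsiveConsumerAlert
        implements ApplicationListener<NonResponsiveConsumerEvent> {

    @Override
    public void onApplicationEvent(NonResponsiveConsumerEvent event) {
        // Placeholder reaction: log it; a real app might flip a health
        // check or alert an operator here.
        System.err.println("Kafka consumer unresponsive: " + event);
    }
}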

Nick, can you elaborate a little bit on why you would like to have an
exception thrown in your Flink app when Kafka is down, rather than let it
run until Kafka is up again?

Thanks,

Jiangjie (Becket) Qin

Re: Status of a job when a kafka source dies

2020-08-14 Thread Piotr Nowojski
Hey,

But do you know which API Kafka provides that Spring uses to implement
this feature?

Piotrek

Re: Status of a job when a kafka source dies

2020-08-13 Thread Nick Bendtner
Hi Piotr,
Sorry for the late reply. So the poll does not throw an exception when a
broker goes down. In Spring they solve it by generating an event [1]
whenever this happens, and you can intercept this event. consumer.timeout.ms
helps to some extent, but if the source topic does not receive any messages
within the specified interval, it still throws an exception.

Best,
Nick.


[1]
https://docs.spring.io/spring-kafka/api/org/springframework/kafka/event/NonResponsiveConsumerEvent.html


Re: Status of a job when a kafka source dies

2020-08-05 Thread Piotr Nowojski
Hi Nick,

Could you elaborate more, what event and how would you like Flink to
handle? Is there some kind of Kafka's API that can be used to listen to
such kind of events? Becket, do you maybe know something about this?

As a side note Nick, can you not configure some timeouts [1] in the
KafkaConsumer? Like `request.timeout.ms` or `consumer.timeout.ms`? But
as I wrote before, that would be more of a question for the Kafka guys.

Piotrek

[1] http://kafka.apache.org/20/documentation/
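
For what it's worth, a minimal sketch of what setting such timeouts would
look like (whether they actually make poll() surface a broker outage is
exactly the open question in this thread; as far as I know,
`consumer.timeout.ms` only applied to the old pre-0.9 high-level consumer):

import java.util.Properties;

public class TimeoutProps {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9093"); // placeholder
        props.setProperty("group.id", "my-group");             // placeholder
        // How long the client waits for a broker response before failing
        // the request internally and retrying.
        props.setProperty("request.timeout.ms", "30000");
        // Only honoured by the legacy (pre-0.9) high-level consumer.
        props.setProperty("consumer.timeout.ms", "60000");
        // These props would then be passed on to the (Flink)KafkaConsumer.
        System.out.println(props);
    }
}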


Re: Status of a job when a kafka source dies

2020-08-05 Thread Nick Bendtner
+user group.

On Wed, Aug 5, 2020 at 12:57 PM Nick Bendtner  wrote:

> Thanks Piotr, but shouldn't this event be handled by the FlinkKafkaConsumer,
> since the poll happens inside the FlinkKafkaConsumer? How can I catch this
> event in my code, since I don't have control over the poll?
>
> Best,
> Nick.

Re: Status of a job when a kafka source dies

2020-08-05 Thread Piotr Nowojski
Hi Nick,

What Aljoscha was trying to say is that Flink is not trying to do any
magic. If `KafkaConsumer` - which is being used under the hood of the
`FlinkKafkaConsumer` connector - throws an exception, this
exception bubbles up, causing the job to failover. If the failure is handled
by the `KafkaConsumer` silently, that's what's happening. As we can see in
the TM log that you attached, the latter seems to be happening - note that
the warning is logged by the "org.apache.kafka.clients.NetworkClient"
package, so that's not code we (Flink developers) control.

If you want to change this behaviour, unless someone here on this mailing
list just happens to know the answer, the better place to ask such a
question is the Kafka mailing list. Maybe there is some way to configure
this.

And sorry, I don't know much about either the KafkaConsumer or the
KafkaBroker configuration :(

Piotrek



Re: Status of a job when a kafka source dies

2020-08-04 Thread Nick Bendtner
Hi,
I don't observe this behaviour though; we use Flink 1.7.2. I stopped Kafka
and ZooKeeper on all broker nodes. On the Flink side, I see the messages in
the log (data is obfuscated). There are no error logs. The Kafka consumer
properties are (wired up in the sketch after the list):

1. "bootstrap.servers"

2. "zookeeper.connect

3. "auto.offset.reset"

4. "group.id"

5."security.protocol"


The Flink consumer starts consuming data as soon as Kafka comes back up. So
I want to know in what scenario/Kafka consumer config the job will go to a
failed state after a finite number of restart attempts from checkpoint.


TM log.
2020-08-04 19:50:55,539 WARN  org.apache.kafka.clients.NetworkClient
 - [Consumer clientId=consumer-5,
groupId=flink-AIP-QA-Audit-Consumer] Connection to node 1003 (
yyyrspapd036.xxx.com/ss.mm.120.124:9093) could not be established. Broker
may not be available.
2020-08-04 19:50:55,540 WARN  org.apache.kafka.clients.NetworkClient
 - [Consumer clientId=consumer-4,
groupId=flink-AIP-QA-Audit-Consumer] Connection to node 1002 (
yyyrspapd037.xxx.com/ss.mm.120.125:9093) could not be established. Broker
may not be available.
2020-08-04 19:50:55,791 WARN  org.apache.kafka.clients.NetworkClient
 - [Consumer clientId=consumer-4,
groupId=flink-AIP-QA-Audit-Consumer] Connection to node 1004 (
yyyrspapd035.xxx.com/ss.mm.120.123:9093) could not be established. Broker
may not be available.
2020-08-04 19:50:55,791 WARN  org.apache.kafka.clients.NetworkClient
 - [Consumer clientId=consumer-6,
groupId=flink-AIP-QA-Audit-Consumer] Connection to node 1003 (
yyyrspapd036.xxx.com/ss.mm.120.124:9093) could not be established. Broker
may not be available.

Best,
Nick


Re: Status of a job when a kafka source dies

2020-07-20 Thread Aljoscha Krettek

Hi,

Flink doesn't do any special failure-handling or retry logic, so it's up
to how the KafkaConsumer is configured via properties. In general Flink
doesn't try to be smart: when something fails, an exception will bubble
up that will fail this execution of the job. If checkpoints are enabled,
this will trigger a restore; this is controlled by the restart strategy.
If that eventually gives up, the job will go to "FAILED" and stop.


This is the relevant section of the docs: 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/task_failure_recovery.html
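
As an illustration of the restart-strategy part, something like this
(a sketch; the numbers are arbitrary) makes a job give up and go to FAILED
after a bounded number of restore attempts:

import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RestartStrategyDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpointing must be on for restore-on-failure to kick in.
        env.enableCheckpointing(60_000);
        // Try at most 3 restarts, 10 seconds apart; after that the job
        // transitions to FAILED and stops.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                3, Time.of(10, TimeUnit.SECONDS)));
        // ... build and execute the actual pipeline here ...
    }
}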


Best,
Aljoscha


Status of a job when a kafka source dies

2020-07-15 Thread Nick Bendtner
Hi guys,
I want to know what the default behavior of the Kafka source is when a
Kafka cluster goes down during streaming. Will the job status go to
FAILING, or is the exception caught with a back-off before the source
tries to poll for more events?


Best,
Nick.