Re: Flink kafka consumer stopped committing offsets

2018-06-19 Thread Juho Autio
Hi,

Thanks for your analysis.

We found that LeaderElectionRateAndTimeMs went to a non-zero value on Kafka
at around the same time this error was seen in the Flink job.

Kafka itself recovers from this, and so do all our other consumers. It
seems like a bug in the Kafka consumer library if this error causes it to
stop committing offsets. If you have any further insight into this, please
let me know.

Apart from that, leader election doesn't happen in normal situations, but
it can happen, for example, if there are connectivity problems between the
Kafka nodes.

On Mon, Jun 11, 2018 at 6:41 PM amit pal wrote:

> Probably your Kafka consumer is rebalancing. This can be caused by long
> message processing times, which make the Kafka broker mark your consumer
> as dead and trigger a rebalance. All of this happens before the consumer
> can commit the offsets.

Re: Flink kafka consumer stopped committing offsets

2018-06-11 Thread amit pal
Probably your Kafka consumer is rebalancing. This can be caused by long
message processing times, which make the Kafka broker mark your consumer
as dead and trigger a rebalance. All of this happens before the consumer
can commit the offsets.
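A back-of-the-envelope way to test this theory is to compare the worst-case time spent processing one `poll()` batch with `max.poll.interval.ms`, the limit after which a Kafka consumer (0.10.1 and later) is considered failed and its group rebalances. The sketch below assumes the Kafka client defaults `max.poll.records=500` and `max.poll.interval.ms=300000`. The `PollIntervalCheck` helper and the per-record timings are hypothetical. The check only applies to consumers that rely on Kafka group management via `subscribe()`, and whether that describes the Flink consumer is not established in this thread.

```java
import java.util.Properties;

// Hypothetical helper, not part of Kafka or Flink: a rough check of whether
// processing one full poll() batch could exceed max.poll.interval.ms.
class PollIntervalCheck {
    // Assumed Kafka consumer defaults (0.10.1+); verify for the client version in use.
    static final int DEFAULT_MAX_POLL_RECORDS = 500;
    static final long DEFAULT_MAX_POLL_INTERVAL_MS = 300_000L;

    /** Worst-case time in ms to process one full batch returned by poll(). */
    static long worstCaseBatchMillis(Properties props, long perRecordMillis) {
        int maxPollRecords = Integer.parseInt(props.getProperty(
                "max.poll.records", String.valueOf(DEFAULT_MAX_POLL_RECORDS)));
        return maxPollRecords * perRecordMillis;
    }

    /** True if a full batch would take longer than max.poll.interval.ms to process. */
    static boolean exceedsPollInterval(Properties props, long perRecordMillis) {
        long maxPollIntervalMs = Long.parseLong(props.getProperty(
                "max.poll.interval.ms", String.valueOf(DEFAULT_MAX_POLL_INTERVAL_MS)));
        return worstCaseBatchMillis(props, perRecordMillis) > maxPollIntervalMs;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        // 500 records * 1000 ms = 500 s, more than the 300 s default interval.
        System.out.println(exceedsPollInterval(props, 1_000)); // true
        // Fewer records per poll: 100 * 1000 ms = 100 s, within the interval.
        props.setProperty("max.poll.records", "100");
        System.out.println(exceedsPollInterval(props, 1_000)); // false
    }
}
```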

On Mon, Jun 11, 2018 at 7:37 PM Piotr Nowojski wrote:

> The more I look into it, the more it seems like a Kafka bug or some
> cluster failure from which your Kafka cluster did not recover.
>
> In your case, auto-committing should be enabled, and then KafkaConsumer
> should commit offsets every so often while it is polling messages, unless,
> for example, `coordinatorUnknown()` returns true in
> `org.apache.kafka.clients.consumer.internals.ConsumerCoordinator#maybeAutoCommitOffsetsAsync`
> (Kafka 0.10.2.1 code base):
>
> private void maybeAutoCommitOffsetsAsync(long now) {
>     if (autoCommitEnabled) {
>         if (coordinatorUnknown()) {
>             this.nextAutoCommitDeadline = now + retryBackoffMs;
>         } else if (now >= nextAutoCommitDeadline) {
>             this.nextAutoCommitDeadline = now + autoCommitIntervalMs;
>             doAutoCommitOffsetsAsync();
>         }
>     }
> }
>
> Have you checked the Kafka logs? This suggests that the real problem is
> hidden behind:
>
> > INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  -
> > Marking the coordinator
> > my-kafka-host-10-1-16-97.cloud-internal.mycompany.com:9092 (id: 2147483550
> > rack: null) dead for group
> > aggregate-all_server_measurements_combined-20180606-1000
>
> And maybe your Kafka cluster/consumer cannot recover from this situation.
>
> Another (simpler) thing to try is upgrading the Kafka cluster.
>
> Piotrek

Re: Flink kafka consumer stopped committing offsets

2018-06-11 Thread Piotr Nowojski
The more I look into it, the more it seems like a Kafka bug or some cluster 
failure from which your Kafka cluster did not recover.

In your case, auto-committing should be enabled, and then KafkaConsumer
should commit offsets every so often while it is polling messages, unless,
for example, `coordinatorUnknown()` returns true in
`org.apache.kafka.clients.consumer.internals.ConsumerCoordinator#maybeAutoCommitOffsetsAsync`
(Kafka 0.10.2.1 code base):

private void maybeAutoCommitOffsetsAsync(long now) {
    if (autoCommitEnabled) {
        if (coordinatorUnknown()) {
            this.nextAutoCommitDeadline = now + retryBackoffMs;
        } else if (now >= nextAutoCommitDeadline) {
            this.nextAutoCommitDeadline = now + autoCommitIntervalMs;
            doAutoCommitOffsetsAsync();
        }
    }
}
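To see why a coordinator that stays unknown stops all auto-commits, the branching quoted above can be replayed against a fake clock. This is a simplified, hypothetical model. The `AutoCommitModel` class and its `coordinatorKnown` flag are stand-ins, not Kafka classes. It keeps only the fields used in `maybeAutoCommitOffsetsAsync` and assumes a 5 s commit interval, a 100 ms retry backoff and one call per second.

```java
// Hypothetical, simplified model of the auto-commit scheduling quoted above.
// Field names mirror the Kafka 0.10.2.1 snippet; this is not the real ConsumerCoordinator.
class AutoCommitModel {
    final boolean autoCommitEnabled = true;
    final long autoCommitIntervalMs;
    final long retryBackoffMs;
    long nextAutoCommitDeadline;
    boolean coordinatorKnown = true;  // stand-in for the coordinator lookup state
    int commits = 0;                  // counts calls to doAutoCommitOffsetsAsync()

    AutoCommitModel(long autoCommitIntervalMs, long retryBackoffMs, long startMs) {
        this.autoCommitIntervalMs = autoCommitIntervalMs;
        this.retryBackoffMs = retryBackoffMs;
        this.nextAutoCommitDeadline = startMs + autoCommitIntervalMs;
    }

    boolean coordinatorUnknown() {
        return !coordinatorKnown;
    }

    void doAutoCommitOffsetsAsync() {
        commits++;  // the real method starts an asynchronous offset commit
    }

    // Same branching as the quoted maybeAutoCommitOffsetsAsync(now).
    void maybeAutoCommitOffsetsAsync(long now) {
        if (autoCommitEnabled) {
            if (coordinatorUnknown()) {
                // Only the deadline moves; no commit is attempted.
                this.nextAutoCommitDeadline = now + retryBackoffMs;
            } else if (now >= nextAutoCommitDeadline) {
                this.nextAutoCommitDeadline = now + autoCommitIntervalMs;
                doAutoCommitOffsetsAsync();
            }
        }
    }

    public static void main(String[] args) {
        // Assumed values: 5 s commit interval, 100 ms retry backoff, one call per second.
        AutoCommitModel m = new AutoCommitModel(5_000, 100, 0);
        for (long t = 1_000; t <= 30_000; t += 1_000) m.maybeAutoCommitOffsetsAsync(t);
        int whileKnown = m.commits;
        m.coordinatorKnown = false;  // coordinator marked dead and never rediscovered
        for (long t = 31_000; t <= 60_000; t += 1_000) m.maybeAutoCommitOffsetsAsync(t);
        System.out.println(whileKnown + " commits while known, "
                + (m.commits - whileKnown) + " while unknown");
        // prints "6 commits while known, 0 while unknown"
    }
}
```

While `coordinatorUnknown()` keeps returning true, each call only pushes `nextAutoCommitDeadline` forward by `retryBackoffMs`, so in this model commits resume only once a coordinator is known again.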

Have you checked the Kafka logs? This suggests that the real problem is hidden
behind:

>  INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  -
> Marking the coordinator
> my-kafka-host-10-1-16-97.cloud-internal.mycompany.com:9092 (id: 2147483550
> rack: null) dead for group
> aggregate-all_server_measurements_combined-20180606-1000

And maybe your Kafka cluster/consumer cannot recover from this situation.

Another (simpler) thing to try is upgrading the Kafka cluster.

Piotrek

> On 11 Jun 2018, at 11:44, Juho Autio  wrote:
> 
> Hi Piotr, thanks for your insights.
> 
> > What’s your KafkaConsumer configuration?
> 
> We only set these in the properties that are passed to FlinkKafkaConsumer010 
> constructor:
> 
> auto.offset.reset=latest
> bootstrap.servers=my-kafka-host:9092
> group.id=my_group
> flink.partition-discovery.interval-millis=3
> 
> > is checkpointing enabled?
> 
> No.
> 
> > enable.auto.commit (or auto.commit.enable for Kafka 0.8) / 
> > auto.commit.interval.ms 
> 
> We use whatever the default behaviour of the Flink Kafka consumer is. It
> seems to commit quite often, roughly every 5 seconds.
> 
> > did you set setCommitOffsetsOnCheckpoints() ?
> 
> No. But I checked with the debugger that enableCommitOnCheckpoints=true is
> apparently the default.
> 
> I also checked with the debugger that offsetCommitMode=KAFKA_PERIODIC.
> 
> So I guess you're right that this bug doesn't seem to be in Flink itself?
> I wonder if it's a known issue in the Kafka client lib.
> 
> I also took a thread dump on one of the task managers in this broken state,
> but I couldn't spot anything obvious when comparing the threads to a dump
> from a job where offsets are being committed. Anyway, I've saved the thread
> dump in case there's something specific to look for.
> 
> Sharing the full logs of the job & task managers would be a bit of a
> hassle, because I don't have an automatic way to obfuscate the logs so that
> I'm sure nothing sensitive is left. Anyway, there isn't really anything else
> to share. I wrote: "As you can see, it didn't log anything until
> ~2018-06-07 22:08. Also that's where the log ends".
> 
> Thanks once more.

Re: Flink kafka consumer stopped committing offsets

2018-06-11 Thread Juho Autio
Hi Piotr, thanks for your insights.

> What’s your KafkaConsumer configuration?

We only set these in the properties that are passed to
FlinkKafkaConsumer010 constructor:

auto.offset.reset=latest
bootstrap.servers=my-kafka-host:9092
group.id=my_group
flink.partition-discovery.interval-millis=3

> is checkpointing enabled?

No.

> enable.auto.commit (or auto.commit.enable for Kafka 0.8) /
> auto.commit.interval.ms

We use whatever the default behaviour of the Flink Kafka consumer is. It
seems to commit quite often, roughly every 5 seconds.
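The "every 5 seconds" estimate matches the Kafka consumer's own defaults when neither setting is given: `enable.auto.commit` defaults to `true` and `auto.commit.interval.ms` to `5000`. Below is a minimal sketch that resolves those two values from the properties listed above. It assumes no other layer overrides them, and the `AutoCommitDefaults` helper is hypothetical, not a Kafka or Flink API.

```java
import java.util.Properties;

// Hypothetical helper: resolve the auto-commit settings a Kafka consumer would use
// when they are not set explicitly (assumed Kafka defaults: true / 5000 ms).
class AutoCommitDefaults {
    static boolean autoCommitEnabled(Properties props) {
        return Boolean.parseBoolean(props.getProperty("enable.auto.commit", "true"));
    }

    static long autoCommitIntervalMs(Properties props) {
        return Long.parseLong(props.getProperty("auto.commit.interval.ms", "5000"));
    }

    public static void main(String[] args) {
        // The properties listed in this message; none of them touch auto-commit.
        Properties props = new Properties();
        props.setProperty("auto.offset.reset", "latest");
        props.setProperty("bootstrap.servers", "my-kafka-host:9092");
        props.setProperty("group.id", "my_group");
        System.out.println(autoCommitEnabled(props) + ", every "
                + autoCommitIntervalMs(props) + " ms"); // true, every 5000 ms
    }
}
```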

> did you set setCommitOffsetsOnCheckpoints() ?

No. But I checked with the debugger that enableCommitOnCheckpoints=true is
apparently the default.

I also checked with the debugger that offsetCommitMode=KAFKA_PERIODIC.
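With checkpointing disabled, the commit mode depends only on whether Kafka auto-commit is enabled, which fits the observed `KAFKA_PERIODIC`. The sketch below is assumed to mirror the decision in Flink's `OffsetCommitModes.fromConfiguration`. However, `CommitModeSketch` and its nested enum are hypothetical re-creations, so check them against the Flink version in use.

```java
// Hypothetical re-creation of how the Flink Kafka consumer is assumed to pick
// its offset commit mode; not the actual Flink class.
class CommitModeSketch {
    enum OffsetCommitMode { DISABLED, ON_CHECKPOINTS, KAFKA_PERIODIC }

    static OffsetCommitMode fromConfiguration(
            boolean enableAutoCommit, boolean enableCommitOnCheckpoint, boolean enableCheckpointing) {
        if (enableCheckpointing) {
            // With checkpointing, offsets are committed when a checkpoint completes,
            // unless commit-on-checkpoints was switched off.
            return enableCommitOnCheckpoint ? OffsetCommitMode.ON_CHECKPOINTS : OffsetCommitMode.DISABLED;
        }
        // Without checkpointing, committing is left to the Kafka client's auto-commit.
        return enableAutoCommit ? OffsetCommitMode.KAFKA_PERIODIC : OffsetCommitMode.DISABLED;
    }

    public static void main(String[] args) {
        // Settings reported in this thread: auto-commit on (default),
        // commit-on-checkpoints true (default), checkpointing disabled.
        System.out.println(fromConfiguration(true, true, false)); // KAFKA_PERIODIC
    }
}
```

In `KAFKA_PERIODIC` mode the commits come from the Kafka client's own auto-commit, so in this sketch `setCommitOffsetsOnCheckpoints()` has no effect while checkpointing is off.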

So I guess you're right that this bug doesn't seem to be in Flink itself? I
wonder if it's a known issue in the Kafka client lib.

I also took a thread dump on one of the task managers in this broken state,
but I couldn't spot anything obvious when comparing the threads to a dump
from a job where offsets are being committed. Anyway, I've saved the thread
dump in case there's something specific to look for.

Sharing the full logs of the job & task managers would be a bit of a hassle,
because I don't have an automatic way to obfuscate the logs so that I'm
sure nothing sensitive is left. Anyway, there isn't really anything else to
share. I wrote: "As you can see, it didn't log anything until ~2018-06-07
22:08. Also that's where the log ends".

Thanks once more.

On Mon, Jun 11, 2018 at 11:18 AM, Piotr Nowojski wrote:

> Hi,
>
> What’s your KafkaConsumer configuration? Especially values for:
> - is checkpointing enabled?
> - enable.auto.commit (or auto.commit.enable for Kafka 0.8) /
> auto.commit.interval.ms
> - did you set setCommitOffsetsOnCheckpoints() ?
>
> Please also refer to
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-consumers-offset-committing-behaviour-configuration
> and especially this part:
>
> > Note that the Flink Kafka Consumer does not rely on the committed
> > offsets for fault tolerance guarantees. The committed offsets are only a
> > means to expose the consumer’s progress for monitoring purposes.
>
> Can you post the full logs from all TaskManagers/JobManager, and can you
> say or estimate when the committing broke/stopped? Did you check the Kafka
> logs for any errors?
>
> To me it seems more like a Kafka issue/bug:
> https://community.cloudera.com/t5/Data-Ingestion-Integration/Cannot-auto-commit-offsets-for-group-console-consumer-79720/m-p/51188
> https://stackoverflow.com/questions/42362911/kafka-high-level-consumer-error-code-15/42416232#42416232
> Especially since in your case these offset commit failures are preceded by
> a Kafka coordinator failure.
>
> Piotrek

Re: Flink kafka consumer stopped committing offsets

2018-06-11 Thread Piotr Nowojski
Hi,

What’s your KafkaConsumer configuration? Especially values for:
- is checkpointing enabled?
- enable.auto.commit (or auto.commit.enable for Kafka 0.8) / 
auto.commit.interval.ms
- did you set setCommitOffsetsOnCheckpoints() ?

Please also refer to
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-consumers-offset-committing-behaviour-configuration
and especially this part:

> Note that the Flink Kafka Consumer does not rely on the committed offsets for 
> fault tolerance guarantees. The committed offsets are only a means to expose 
> the consumer’s progress for monitoring purposes.

Can you post the full logs from all TaskManagers/JobManager, and can you
say or estimate when the committing broke/stopped? Did you check the Kafka
logs for any errors?

To me it seems more like a Kafka issue/bug:
https://community.cloudera.com/t5/Data-Ingestion-Integration/Cannot-auto-commit-offsets-for-group-console-consumer-79720/m-p/51188
https://stackoverflow.com/questions/42362911/kafka-high-level-consumer-error-code-15/42416232#42416232
Especially since in your case these offset commit failures are preceded by
a Kafka coordinator failure.

Piotrek

> On 8 Jun 2018, at 10:05, Juho Autio  wrote:
> 
> Hi,
> 
> We have a Flink stream job that uses the Flink Kafka consumer. Normally it
> commits consumer offsets to Kafka.
> 
> However, this stream ended up in a state where it otherwise works just
> fine but no longer commits offsets to Kafka. The job keeps writing correct
> aggregation results to the sink, though. At the time of writing, the job
> has been running for 14 hours without committing offsets.
> 
> Below is an extract from taskmanager.log. As you can see, it didn't log
> anything until ~2018-06-07 22:08. Also, that's where the log ends; these
> are the last lines so far.
> 
> Could you help check whether this is a known bug, possibly already fixed,
> or something new?
> 
> I'm using a self-built Flink 1.5-SNAPSHOT package, Flink commit
> 8395508b0401353ed07375e22882e7581d46ac0e, which is not super old.
> 
> Cheers,
> Juho
> 
> 2018-06-06 10:01:33,498 INFO  org.apache.kafka.common.utils.AppInfoParser 
>   - Kafka version : 0.10.2.1
> 2018-06-06 10:01:33,498 INFO  org.apache.kafka.common.utils.AppInfoParser 
>   - Kafka commitId : e89bffd6b2eff799
> 2018-06-06 10:01:33,560 INFO
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Discovered
> coordinator my-kafka-host-10-1-16-97.cloud-internal.mycompany.com:9092
> (id: 2147483550 rack: null) for group
> aggregate-all_server_measurements_combined-20180606-1000.
> 2018-06-06 10:01:33,563 INFO
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Discovered
> coordinator my-kafka-host-10-1-16-97.cloud-internal.mycompany.com:9092
> (id: 2147483550 rack: null) for group
> aggregate-all_server_measurements_combined-20180606-1000.
> 2018-06-07 22:08:28,773 INFO
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Marking
> the coordinator my-kafka-host-10-1-16-97.cloud-internal.mycompany.com:9092
> (id: 2147483550 rack: null) dead for group
> aggregate-all_server_measurements_combined-20180606-1000
> 2018-06-07 22:08:28,776 WARN  
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - 
> Auto-commit of offsets {topic1-2=OffsetAndMetadata{offset=12300395550, 
> metadata=''}, topic1-18=OffsetAndMetadata{offset=12299210444, metadata=''}, 
> topic3-0=OffsetAndMetadata{offset=5064277287, metadata=''}, 
> topic4-6=OffsetAndMetadata{offset=5492398559, metadata=''}, 
> topic2-1=OffsetAndMetadata{offset=89817267, metadata=''}, 
> topic1-10=OffsetAndMetadata{offset=12299742352, metadata=''}} failed for 
> group aggregate-all_server_measurements_combined-20180606-1000: Offset commit 
> failed with a retriable exception. You should retry committing offsets.
> 2018-06-07 22:08:29,840 INFO
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Marking
> the coordinator my-kafka-host-10-1-16-97.cloud-internal.mycompany.com:9092
> (id: 2147483550 rack: null) dead for group
> aggregate-all_server_measurements_combined-20180606-1000
> 2018-06-07 22:08:29,841 WARN  
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - 
> Auto-commit of offsets 

Flink kafka consumer stopped committing offsets

2018-06-08 Thread Juho Autio
Hi,

We have a Flink stream job that uses the Flink Kafka consumer. Normally it
commits consumer offsets to Kafka.

However, this stream ended up in a state where it otherwise works just
fine but no longer commits offsets to Kafka. The job keeps writing correct
aggregation results to the sink, though. At the time of writing, the job
has been running for 14 hours without committing offsets.

Below is an extract from taskmanager.log. As you can see, it didn't log
anything until ~2018-06-07 22:08. Also, that's where the log ends; these
are the last lines so far.

Could you help check whether this is a known bug, possibly already fixed,
or something new?

I'm using a self-built Flink 1.5-SNAPSHOT package, Flink commit
8395508b0401353ed07375e22882e7581d46ac0e, which is not super old.

Cheers,
Juho

2018-06-06 10:01:33,498 INFO  org.apache.kafka.common.utils.AppInfoParser
 - Kafka version : 0.10.2.1
2018-06-06 10:01:33,498 INFO  org.apache.kafka.common.utils.AppInfoParser
 - Kafka commitId : e89bffd6b2eff799
2018-06-06 10:01:33,560 INFO
org.apache.kafka.clients.consumer.internals.AbstractCoordinator  -
Discovered coordinator
my-kafka-host-10-1-16-97.cloud-internal.mycompany.com:9092 (id: 2147483550
rack: null) for group
aggregate-all_server_measurements_combined-20180606-1000.
2018-06-06 10:01:33,563 INFO
org.apache.kafka.clients.consumer.internals.AbstractCoordinator  -
Discovered coordinator
my-kafka-host-10-1-16-97.cloud-internal.mycompany.com:9092 (id: 2147483550
rack: null) for group
aggregate-all_server_measurements_combined-20180606-1000.
2018-06-07 22:08:28,773 INFO
org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Marking
the coordinator my-kafka-host-10-1-16-97.cloud-internal.mycompany.com:9092
(id: 2147483550 rack: null) dead for group
aggregate-all_server_measurements_combined-20180606-1000
2018-06-07 22:08:28,776 WARN
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  -
Auto-commit of offsets {topic1-2=OffsetAndMetadata{offset=12300395550,
metadata=''}, topic1-18=OffsetAndMetadata{offset=12299210444, metadata=''},
topic3-0=OffsetAndMetadata{offset=5064277287, metadata=''},
topic4-6=OffsetAndMetadata{offset=5492398559, metadata=''},
topic2-1=OffsetAndMetadata{offset=89817267, metadata=''},
topic1-10=OffsetAndMetadata{offset=12299742352, metadata=''}} failed for
group aggregate-all_server_measurements_combined-20180606-1000: Offset
commit failed with a retriable exception. You should retry committing
offsets.
2018-06-07 22:08:29,840 INFO
org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Marking
the coordinator my-kafka-host-10-1-16-97.cloud-internal.mycompany.com:9092
(id: 2147483550 rack: null) dead for group
aggregate-all_server_measurements_combined-20180606-1000
2018-06-07 22:08:29,841 WARN
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  -
Auto-commit of offsets {topic1-6=OffsetAndMetadata{offset=12298347875,
metadata=''}, topic4-2=OffsetAndMetadata{offset=5492779112, metadata=''},
topic1-14=OffsetAndMetadata{offset=12299972108, metadata=''}} failed for
group aggregate-all_server_measurements_combined-20180606-1000: Offset
commit failed with a retriable exception. You should retry committing
offsets.