[jira] [Commented] (FLINK-27962) KafkaSourceReader fails to commit consumer offsets for checkpoints
[ https://issues.apache.org/jira/browse/FLINK-27962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17633844#comment-17633844 ] Tommy Schnabel commented on FLINK-27962: [~elanv], I've had luck setting metadata.max.age.ms (kafka producer config) to something less than whatever your kafka timeouts are set to. We've been able to get through momentary kafka outages seamlessly that way. > KafkaSourceReader fails to commit consumer offsets for checkpoints > -- > > Key: FLINK-27962 > URL: https://issues.apache.org/jira/browse/FLINK-27962 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.14.4, 1.15.0 >Reporter: Dmytro >Priority: Major > Fix For: 1.16.0 > > Attachments: Screen Shot 2022-09-20 at 5.18.04 PM.png > > > The KafkaSourceReader works well for many hours, then fails and re-connects > successfully, then continues to work for some time. After the first three > failures it hangs on "Offset commit failed" and never connects again. > Restarting the Flink job helps, and it works until the next three failures. > I am aware of [the > note|https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/kafka/#consumer-offset-committing] > that the Kafka source does NOT rely on committed offsets for fault tolerance; > committing offsets only exposes the progress of the consumer and consumer group > for monitoring. > That would be acceptable if the failures were only periodic, but I would argue > that permanent failure is unacceptable. > *Failed to commit consumer offsets for checkpoint:* > {code:java} > Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: > The coordinator is not available. > 2022-06-06 14:19:52,297 WARN > org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed > to commit consumer offsets for checkpoint 464521 > org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset > commit failed with a retriable exception. You should retry committing the > latest consumed offsets. > Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: > The coordinator is not available. > 2022-06-06 14:20:02,297 WARN > org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed > to commit consumer offsets for checkpoint 464522 > org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset > commit failed with a retriable exception. You should retry committing the > latest consumed offsets. > Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: > The coordinator is not available. > 2022-06-06 14:20:02,297 WARN > org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed > to commit consumer offsets for checkpoint 464523 > org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset > commit failed with a retriable exception. You should retry committing the > latest consumed offsets
> (fails permanently until the job is restarted) > {code} > *Consumer Config:* > {code:java} > allow.auto.create.topics = true > auto.commit.interval.ms = 5000 > auto.offset.reset = none > bootstrap.servers = [test.host.net:9093] > check.crcs = true > client.dns.lookup = use_all_dns_ips > client.id = test-client-id > client.rack = > connections.max.idle.ms = 18 > default.api.timeout.ms = 6 > enable.auto.commit = false > exclude.internal.topics = true > fetch.max.bytes = 52428800 > fetch.max.wait.ms = 500 > fetch.min.bytes = 1 > group.id = test-group-id > group.instance.id = null > heartbeat.interval.ms = 3000 > interceptor.classes = [] > internal.leave.group.on.close = true > internal.throw.on.fetch.stable.offset.unsupported = false > isolation.level = read_uncommitted > key.deserializer = class > org.apache.kafka.common.serialization.ByteArrayDeserializer > max.partition.fetch.bytes = 1048576 > max.poll.interval.ms = 30 > max.poll.records = 500 > metadata.max.age.ms = 18 > metric.reporters = [] > metrics.num.samples = 2 > metrics.recording.level = INFO > metrics.sample.window.ms = 3 > partition.assignment.strategy = [class > org.apache.kafka.clients.consumer.RangeAssignor] > receive.buffer.bytes = 65536 > reconnect.backoff.max.ms = 1000 > reconnect.backoff.ms = 50 > request.timeout.ms = 6 > retry.backoff.ms = 100 > sasl.client.callback.handler.class = null > sasl.jaas.config = [hidden] > sasl.kerberos.kinit.cmd = /usr/bin/kinit > sasl.kerberos.min.time.before.relogin = 6 > sasl.kerberos.service.name = null > sasl.kerberos.ticket.renew.jitter = 0.05 > sasl.kerberos.ticket.renew.window.factor = 0.8 > sasl.login.callback.handler.class = class > com.test.kafka.security.AzureAuthenticateCallbackHandler > sasl.login.class = null >
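As a concrete illustration of the metadata.max.age.ms suggestion above: the property is accepted by both producer and consumer clients, and for the KafkaSourceReader it is the consumer that matters. A minimal sketch of passing it through Flink's KafkaSource builder; the topic name and the 30-second value are illustrative, while the host and group id echo the config dump above:

{code:java}
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MetadataMaxAgeExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("test.host.net:9093")  // host from the config dump above
                .setTopics("input-topic")                   // hypothetical topic name
                .setGroupId("test-group-id")                // group id from the config dump above
                .setStartingOffsets(OffsetsInitializer.committedOffsets()) // the ticket runs with auto.offset.reset=none
                .setValueOnlyDeserializer(new SimpleStringSchema())
                // Arbitrary consumer properties pass straight through to the Kafka client.
                // Refreshing metadata more often than the broker-side timeouts is the
                // suggestion above; 30000 ms is an illustrative value, not from the ticket.
                .setProperty("metadata.max.age.ms", "30000")
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source").print();
        env.execute("metadata-max-age-example");
    }
}
{code}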
[jira] [Commented] (FLINK-27962) KafkaSourceReader fails to commit consumer offsets for checkpoints
[ https://issues.apache.org/jira/browse/FLINK-27962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17608280#comment-17608280 ] Tommy Schnabel commented on FLINK-27962: Hi [~martijnvisser], yes, I tested that successfully yesterday. We were upgrading to 1.15.2 to be able to use NO_CLAIM restore mode when we found this issue. I successfully cherry-picked bc9b401ed1f2e7257c7b44c9838e34ede9c52ed5 onto our fork's release branch and confirmed this fixes our issue. Thanks again for all your help, everyone!
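For anyone else who needs the fix on a fork before an official release, the cherry-pick workflow described above looks roughly like this. Only the commit hash comes from the comment; the remote layout, branch name, and build invocation are assumptions:

{code:bash}
# Assumed layout: origin = your fork, upstream = apache/flink.
git remote add upstream https://github.com/apache/flink.git
git fetch upstream
# Branch name assumed; pick the release branch your cluster actually runs.
git checkout -b release-1.15-patched upstream/release-1.15
# Commit hash cited in the comment above.
git cherry-pick bc9b401ed1f2e7257c7b44c9838e34ede9c52ed5
# Rebuild the Kafka connector and its dependents (module path as of 1.15).
mvn clean install -DskipTests -pl flink-connectors/flink-connector-kafka -am
{code}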
[jira] [Commented] (FLINK-27962) KafkaSourceReader fails to commit consumer offsets for checkpoints
[ https://issues.apache.org/jira/browse/FLINK-27962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17607752#comment-17607752 ] Tommy Schnabel commented on FLINK-27962: Hi [~renqs], no, we didn't have any broker failures around this time. We actually had another cluster running 1.14.4 reading from the same topics, which saw no issues during this time.
[jira] [Updated] (FLINK-27962) KafkaSourceReader fails to commit consumer offsets for checkpoints
[ https://issues.apache.org/jira/browse/FLINK-27962?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tommy Schnabel updated FLINK-27962: --- Attachment: Screen Shot 2022-09-20 at 5.18.04 PM.png
[jira] [Commented] (FLINK-27962) KafkaSourceReader fails to commit consumer offsets for checkpoints
[ https://issues.apache.org/jira/browse/FLINK-27962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17607408#comment-17607408 ] Tommy Schnabel commented on FLINK-27962: !Screen Shot 2022-09-20 at 5.18.04 PM.png!
[jira] [Comment Edited] (FLINK-27962) KafkaSourceReader fails to commit consumer offsets for checkpoints
[ https://issues.apache.org/jira/browse/FLINK-27962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17607407#comment-17607407 ] Tommy Schnabel edited comment on FLINK-27962 at 9/20/22 9:21 PM: - Hi there, we're also seeing this at Twilio Segment on versions 1.15.1 and 1.15.2, but _not_ on 1.14.4. I didn't test 1.15.0, but I imagine it's present there. Here's what we're seeing in one of our task managers' logs: {code} 2022-09-20 15:58:07,978 WARN org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to commit consumer offsets for checkpoint 363507 org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets. Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: The coordinator is not available. 2022-09-20 15:58:07,981 WARN org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to commit consumer offsets for checkpoint 363507 org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets. Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: The coordinator is not available. 2022-09-20 15:58:08,029 WARN org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to commit consumer offsets for checkpoint 363507 org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets. Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: The coordinator is not available. 2022-09-20 15:58:08,055 WARN org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to commit consumer offsets for checkpoint 363507 org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets. Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: The coordinator is not available. {code} Is there any ETA on getting this fixed? Updating to attach one of our graphs, which shows 2/3 of our partitions not committing offsets while the other 1/3 does go down. Inspecting further, I've discovered that we _are_ processing all those supposedly lagged messages; the offsets are just not being committed back to Kafka.
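One way to confirm the symptom described above — records being processed while committed offsets stall — is to read the group's committed offsets directly from the brokers and watch whether they advance. A minimal sketch with Kafka's AdminClient; the bootstrap server and group id are placeholders taken from the config dump in this ticket:

{code:java}
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class CommittedOffsetsProbe {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "test.host.net:9093"); // placeholder
        try (AdminClient admin = AdminClient.create(props)) {
            // Committed offsets as the broker sees them. If Flink checkpoints keep
            // completing and the source's numRecordsOut keeps growing while these
            // stop moving, that matches the behavior reported in this ticket.
            Map<TopicPartition, OffsetAndMetadata> committed =
                    admin.listConsumerGroupOffsets("test-group-id") // placeholder group id
                            .partitionsToOffsetAndMetadata()
                            .get();
            committed.forEach((tp, om) -> System.out.printf("%s -> %d%n", tp, om.offset()));
        }
    }
}
{code}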
[jira] [Commented] (FLINK-27962) KafkaSourceReader fails to commit consumer offsets for checkpoints
[ https://issues.apache.org/jira/browse/FLINK-27962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17607407#comment-17607407 ] Tommy Schnabel commented on FLINK-27962: Hi there, we're also seeing this at Twilio Segment on versions 1.15.1 and 1.15.2, but _not_ on 1.14.4. I didn't test 1.15.0, but I imagine it's present there. Here's what we're seeing in one of our task managers' logs: ``` 2022-09-20 15:58:07,978 WARN org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to commit consumer offsets for checkpoint 363507 org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets. Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: The coordinator is not available. 2022-09-20 15:58:07,981 WARN org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to commit consumer offsets for checkpoint 363507 org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets. Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: The coordinator is not available. 2022-09-20 15:58:08,029 WARN org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to commit consumer offsets for checkpoint 363507 org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets. Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: The coordinator is not available. 2022-09-20 15:58:08,055 WARN org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to commit consumer offsets for checkpoint 363507 org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets. Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: The coordinator is not available. ``` Is there any ETA on getting this fixed?
[jira] [Commented] (FLINK-23886) An exception is thrown when recovering job timers from a checkpoint file
[ https://issues.apache.org/jira/browse/FLINK-23886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17605390#comment-17605390 ] Tommy Schnabel commented on FLINK-23886: Hi [~kphan102] and [~yunta], we found that switching to heap timers did work, but it does not solve the issue of having corrupted state in our checkpoints, so switching back to RocksDB-based timers would cause us to run into the same exception. Thinking about this again, I wonder if switching to heap timers, taking a savepoint, and then restoring from that savepoint would cause the corrupted timer state to be gone, since perhaps the corrupted timers won't be included in the savepoint. Let me know if you have any luck with that! > An exception is thrown when recovering job timers from a checkpoint file > --- > > Key: FLINK-23886 > URL: https://issues.apache.org/jira/browse/FLINK-23886 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Affects Versions: 1.10.0, 1.11.3, 1.13.2, 1.13.5, 1.14.4 >Reporter: Jing Zhang >Assignee: Yuan Mei >Priority: Major > Attachments: image-2021-08-25-16-38-04-023.png, > image-2021-08-25-16-38-12-308.png, image-2021-08-25-17-06-29-806.png, > image-2021-08-25-17-07-38-327.png, segment-drop-corrupted-timer-state.diff > > > A user reported this bug on the [mailing > list|http://mail-archives.apache.org/mod_mbox/flink-user/202108.mbox/%3ccakmsf43j14nkjmgjuy4dh5qn2vbjtw4tfh4pmmuyvcvfhgf...@mail.gmail.com%3E]. I paste the content here. > Setup specifics: > Version: 1.6.2 > RocksDB map state > Timers stored in RocksDB > > When we have this job running for long periods of time, like > 30 days, if > for some reason the job restarts, we encounter "Error while deserializing the > element". Is this a known issue fixed in later versions? I see some changes > to the code for FLINK-10175, but we don't use any queryable state. > > Below is the stack trace: > > org.apache.flink.util.FlinkRuntimeException: Error while deserializing the > element.
> at > org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.deserializeElement(RocksDBCachingPriorityQueueSet.java:389) > at > org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.peek(RocksDBCachingPriorityQueueSet.java:146) > at > org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.peek(RocksDBCachingPriorityQueueSet.java:56) > at > org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue$InternalPriorityQueueComparator.comparePriority(KeyGroupPartitionedPriorityQueue.java:274) > at > org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue$InternalPriorityQueueComparator.comparePriority(KeyGroupPartitionedPriorityQueue.java:261) > at > org.apache.flink.runtime.state.heap.HeapPriorityQueue.isElementPriorityLessThen(HeapPriorityQueue.java:164) > at > org.apache.flink.runtime.state.heap.HeapPriorityQueue.siftUp(HeapPriorityQueue.java:121) > at > org.apache.flink.runtime.state.heap.HeapPriorityQueue.addInternal(HeapPriorityQueue.java:85) > at > org.apache.flink.runtime.state.heap.AbstractHeapPriorityQueue.add(AbstractHeapPriorityQueue.java:73) > at > org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.<init>(KeyGroupPartitionedPriorityQueue.java:89) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBPriorityQueueSetFactory.create(RocksDBKeyedStateBackend.java:2792) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.create(RocksDBKeyedStateBackend.java:450) > at > org.apache.flink.streaming.api.operators.InternalTimeServiceManager.createTimerPriorityQueue(InternalTimeServiceManager.java:121) > at > org.apache.flink.streaming.api.operators.InternalTimeServiceManager.registerOrGetTimerService(InternalTimeServiceManager.java:106) > at > org.apache.flink.streaming.api.operators.InternalTimeServiceManager.getInternalTimerService(InternalTimeServiceManager.java:87) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.getInternalTimerService(AbstractStreamOperator.java:764) > at > org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:61) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.io.EOFException > at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:290) > at org.apache.flink.types.StringValue.readString(StringValue.java:769) > at >
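For anyone trying the heap-timers workaround discussed above, the backend wiring is a one-liner; a minimal sketch follows. The savepoint/restore cycle is then done with the usual operational tooling, and whether it actually sheds the corrupted RocksDB entries is exactly the open question in the comment:

{code:java}
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class HeapTimersExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend();
        // Keep timers on the JVM heap instead of in RocksDB; equivalent to setting
        // state.backend.rocksdb.timer-service.factory: HEAP in flink-conf.yaml.
        backend.setPriorityQueueStateType(EmbeddedRocksDBStateBackend.PriorityQueueStateType.HEAP);
        env.setStateBackend(backend);

        // Placeholder pipeline; substitute the real job before taking a savepoint.
        env.fromElements(1, 2, 3).print();
        env.execute("heap-timers-example");
    }
}
{code}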
[jira] [Commented] (FLINK-23886) An exception is thrown when recovering job timers from a checkpoint file
[ https://issues.apache.org/jira/browse/FLINK-23886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17603137#comment-17603137 ] Tommy Schnabel commented on FLINK-23886: Hi there, reporting in from Twilio Segment that we've seen this in Flink 1.14.4: {code:java} Caused by: java.io.EOFException at org.apache.flink.core.memory.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:329) at org.apache.flink.types.StringValue.readString(StringValue.java:781) at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:73) at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:31) at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:126) at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:32) at org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:161) at org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:43) at org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.deserializeElement(RocksDBCachingPriorityQueueSet.java:386) ... 26 more {code} We remediated by forking and patching Flink to drop the corrupted state. We then ran data through our system to recreate any timers that _could_ have been lost. All in all, we dropped 84 corrupted timers. Our patch included: * Catching and rethrowing a new custom exception when we see an EOFException in TimerSerializer#deserialize * Modifying RocksDBCachingPriorityQueueSet to catch our new exception and skip to the next element * Some null handling, since we're explicitly returning null in cases where we detect corrupted state I've attached the diff from the commit in our fork in case it is helpful for anyone else. We're not planning to actively run that code, but instead keep it around as a remediation tool if we were to ever run into this again. We're happy to help and provide more details should they be useful! [^segment-drop-corrupted-timer-state.diff]
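The attached diff is the authoritative version of the patch. For readers who just want the shape of the catch-and-skip approach described above, a self-contained illustration follows; every class and method name here is hypothetical, not the actual Flink internals that were patched:

{code:java}
import java.io.EOFException;
import java.io.IOException;

/**
 * Illustrative only: the real change lives in Flink's TimerSerializer and
 * RocksDBCachingPriorityQueueSet; see the attached diff for the actual code.
 */
final class LenientTimerDeserializer<T> {

    /** Marker exception so callers can tell corruption apart from other I/O errors. */
    static final class CorruptTimerStateException extends IOException {
        CorruptTimerStateException(Throwable cause) {
            super("corrupted timer state", cause);
        }
    }

    /** Stand-in for the real serializer's deserialize method. */
    interface Deserializer<T> {
        T deserialize(byte[] bytes) throws IOException;
    }

    private final Deserializer<T> delegate;

    LenientTimerDeserializer(Deserializer<T> delegate) {
        this.delegate = delegate;
    }

    /**
     * Returns null for a corrupted element so the caller can skip it and advance
     * to the next RocksDB entry, mirroring the intent of the attached patch.
     */
    T deserializeOrSkip(byte[] bytes) throws IOException {
        try {
            return deserializeStrict(bytes);
        } catch (CorruptTimerStateException e) {
            // Skip: the caller moves the iterator past the bad entry and must
            // handle the null (the "null handling" bullet in the comment above).
            return null;
        }
    }

    private T deserializeStrict(byte[] bytes) throws IOException {
        try {
            return delegate.deserialize(bytes);
        } catch (EOFException e) {
            // Truncated bytes: rethrow as the marker type, as the patch does in
            // TimerSerializer#deserialize.
            throw new CorruptTimerStateException(e);
        }
    }
}
{code}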
[jira] [Updated] (FLINK-23886) An exception is thrown when recovering job timers from a checkpoint file
[ https://issues.apache.org/jira/browse/FLINK-23886?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tommy Schnabel updated FLINK-23886: --- Attachment: segment-drop-corrupted-timer-state.diff