[jira] [Commented] (FLINK-27962) KafkaSourceReader fails to commit consumer offsets for checkpoints

2022-11-14 Thread Tommy Schnabel (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17633844#comment-17633844
 ] 

Tommy Schnabel commented on FLINK-27962:


[~elanv], I've had luck setting metadata.max.age.ms (a Kafka client config) to 
something lower than whatever your Kafka timeouts are set to. We've been able to 
get through momentary Kafka outages seamlessly that way.
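For illustration, a minimal sketch of passing that property through the 
KafkaSource builder; the topic name and the 30s value are placeholder 
assumptions, and the property applies to both producer and consumer clients 
(shown here on the source, since this issue concerns the reader):
{code:java}
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

// Refresh cluster metadata every 30s, i.e. below the 60s request/API
// timeouts in the consumer config quoted below.
KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("test.host.net:9093")   // placeholder
        .setTopics("input-topic")                    // placeholder
        .setGroupId("test-group-id")                 // placeholder
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .setProperty("metadata.max.age.ms", "30000")
        .build();
{code}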

> KafkaSourceReader fails to commit consumer offsets for checkpoints
> --
>
> Key: FLINK-27962
> URL: https://issues.apache.org/jira/browse/FLINK-27962
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.14.4, 1.15.0
>Reporter: Dmytro
>Priority: Major
> Fix For: 1.16.0
>
> Attachments: Screen Shot 2022-09-20 at 5.18.04 PM.png
>
>
> The KafkaSourceReader works well for many hours, then fails, reconnects 
> successfully, and continues to work for some time. After the first three 
> failures it hangs on "Offset commit failed" and never connects again. 
> Restarting the Flink job does help, and it works until the next "3 times fail".
> I am aware of [the 
> note|https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/kafka/#consumer-offset-committing]
>  that the Kafka source does NOT rely on committed offsets for fault tolerance; 
> committing offsets only exposes the progress of the consumer and consumer 
> group for monitoring.
> That would be fine if the failures were only occasional, but I would argue 
> that failing permanently is unacceptable.
> *Failed to commit consumer offsets for checkpoint:*
> {code:java}
> Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: 
> The coordinator is not available.
> 2022-06-06 14:19:52,297 WARN  
> org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed 
> to commit consumer offsets for checkpoint 464521
> org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset 
> commit failed with a retriable exception. You should retry committing the 
> latest consumed offsets.
> Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: 
> The coordinator is not available.
> 2022-06-06 14:20:02,297 WARN  
> org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed 
> to commit consumer offsets for checkpoint 464522
> org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset 
> commit failed with a retriable exception. You should retry committing the 
> latest consumed offsets.
> Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: 
> The coordinator is not available.
> 2022-06-06 14:20:02,297 WARN  
> org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed 
> to commit consumer offsets for checkpoint 464523
> org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset 
> commit failed with a retriable exception. You should retry committing the 
> latest consumed offsets
> ... fails permanently until the job is restarted
>  {code}
> *Consumer Config:*
> {code:java}
> allow.auto.create.topics = true
> auto.commit.interval.ms = 5000
> auto.offset.reset = none
> bootstrap.servers = [test.host.net:9093]
> check.crcs = true
> client.dns.lookup = use_all_dns_ips
> client.id = test-client-id
> client.rack =
> connections.max.idle.ms = 180000
> default.api.timeout.ms = 60000
> enable.auto.commit = false
> exclude.internal.topics = true
> fetch.max.bytes = 52428800
> fetch.max.wait.ms = 500
> fetch.min.bytes = 1
> group.id = test-group-id
> group.instance.id = null
> heartbeat.interval.ms = 3000
> interceptor.classes = []
> internal.leave.group.on.close = true
> internal.throw.on.fetch.stable.offset.unsupported = false
> isolation.level = read_uncommitted
> key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
> max.partition.fetch.bytes = 1048576
> max.poll.interval.ms = 300000
> max.poll.records = 500
> metadata.max.age.ms = 180000
> metric.reporters = []
> metrics.num.samples = 2
> metrics.recording.level = INFO
> metrics.sample.window.ms = 30000
> partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
> receive.buffer.bytes = 65536
> reconnect.backoff.max.ms = 1000
> reconnect.backoff.ms = 50
> request.timeout.ms = 60000
> retry.backoff.ms = 100
> sasl.client.callback.handler.class = null
> sasl.jaas.config = [hidden]
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> sasl.kerberos.min.time.before.relogin = 60000
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> sasl.kerberos.ticket.renew.window.factor = 0.8
> sasl.login.callback.handler.class = class com.test.kafka.security.AzureAuthenticateCallbackHandler
> sasl.login.class = null
> sasl.login.refresh.buffer.seconds = 300
> sasl.login.refresh.min.period.seconds = 60
> sasl.login.refresh.window.factor = 0.8
> sasl.login.refresh.window.jitter = 0.05
> sasl.mechanism = OAUTHBEARER
> security.protocol = SASL_SSL
> security.providers = null
> ...
> {code}

[jira] [Commented] (FLINK-27962) KafkaSourceReader fails to commit consumer offsets for checkpoints

2022-09-22 Thread Tommy Schnabel (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17608280#comment-17608280
 ] 

Tommy Schnabel commented on FLINK-27962:


Hi [~martijnvisser], yes, I tested that successfully yesterday. We were upgrading 
to 1.15.2 to be able to use the NO_CLAIM restore mode when we found this issue. I 
successfully cherry-picked bc9b401ed1f2e7257c7b44c9838e34ede9c52ed5 onto our 
fork's release branch and confirmed that it fixes our issue. Thanks again for all 
your help, everyone!


[jira] [Commented] (FLINK-27962) KafkaSourceReader fails to commit consumer offsets for checkpoints

2022-09-21 Thread Tommy Schnabel (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17607752#comment-17607752
 ] 

Tommy Schnabel commented on FLINK-27962:


Hi [~renqs], no, we didn't have any broker failures around that time. We 
actually had another cluster running 1.14.4, reading from the same topics, 
which saw no issues during this period.


[jira] [Updated] (FLINK-27962) KafkaSourceReader fails to commit consumer offsets for checkpoints

2022-09-20 Thread Tommy Schnabel (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27962?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tommy Schnabel updated FLINK-27962:
---
Attachment: Screen Shot 2022-09-20 at 5.18.04 PM.png


[jira] [Commented] (FLINK-27962) KafkaSourceReader fails to commit consumer offsets for checkpoints

2022-09-20 Thread Tommy Schnabel (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17607408#comment-17607408
 ] 

Tommy Schnabel commented on FLINK-27962:


 !Screen Shot 2022-09-20 at 5.18.04 PM.png! 


[jira] [Comment Edited] (FLINK-27962) KafkaSourceReader fails to commit consumer offsets for checkpoints

2022-09-20 Thread Tommy Schnabel (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17607407#comment-17607407
 ] 

Tommy Schnabel edited comment on FLINK-27962 at 9/20/22 9:21 PM:
-


Hi there, we're also seeing this at Twilio Segment on versions 1.15.1 and 
1.15.2, but _not_ on 1.14.4. I didn't test 1.15.0, but I imagine it's present 
there too. Here's what we're seeing in one of our task managers' logs:
{code}
2022-09-20 15:58:07,978 WARN  
org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to 
commit consumer offsets for checkpoint 363507
org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit 
failed with a retriable exception. You should retry committing the latest 
consumed offsets.
Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: The 
coordinator is not available.
2022-09-20 15:58:07,981 WARN  
org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to 
commit consumer offsets for checkpoint 363507
org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit 
failed with a retriable exception. You should retry committing the latest 
consumed offsets.
Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: The 
coordinator is not available.
2022-09-20 15:58:08,029 WARN  
org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to 
commit consumer offsets for checkpoint 363507
org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit 
failed with a retriable exception. You should retry committing the latest 
consumed offsets.
Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: The 
coordinator is not available.
2022-09-20 15:58:08,055 WARN  
org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to 
commit consumer offsets for checkpoint 363507
org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit 
failed with a retriable exception. You should retry committing the latest 
consumed offsets.
Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: The 
coordinator is not available.
{code}

Is there any ETA on getting this fixed?

Update: I'm attaching one of our graphs to show that two thirds of our 
partitions are not committing offsets, while lag for the other third does go 
down. Inspecting further, I've discovered that we _are_ processing all of 
those supposedly lagged messages; the offsets are just not being committed 
back to Kafka.
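
For anyone who wants to verify this independently of Flink's metrics, a minimal 
sketch of reading the group's committed offsets straight from Kafka with the 
AdminClient; the bootstrap servers and group id are the placeholder values from 
the config quoted above:
{code:java}
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class CommittedOffsetsCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "test.host.net:9093"); // placeholder
        try (AdminClient admin = AdminClient.create(props)) {
            // Offsets the group last managed to commit: if these stay stale while
            // the job keeps processing, commits are failing silently.
            Map<TopicPartition, OffsetAndMetadata> committed =
                    admin.listConsumerGroupOffsets("test-group-id") // placeholder
                            .partitionsToOffsetAndMetadata()
                            .get();
            committed.forEach((tp, om) -> System.out.printf("%s -> %d%n", tp, om.offset()));
        }
    }
}
{code}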



[jira] [Commented] (FLINK-23886) An exception is thrown out when recover job timers from checkpoint file

2022-09-15 Thread Tommy Schnabel (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17605390#comment-17605390
 ] 

Tommy Schnabel commented on FLINK-23886:


Hi [~kphan102] and [~yunta], we found that switching to heap timers did work, 
but it does not remove the corrupted state from our checkpoints, so switching 
back to RocksDB-based timers would make us run into the same exception.

Thinking about this again, I wonder whether switching to heap timers, taking a 
savepoint, and then restoring from that savepoint would get rid of the 
corrupted timer state, since the RocksDB timers presumably won't be included 
in the savepoint. Let me know if you have any luck with that!
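
For reference, a minimal sketch of the heap-timer switch against the Flink 1.14 
API; it assumes the EmbeddedRocksDBStateBackend and is the programmatic 
equivalent of setting state.backend.rocksdb.timer-service.factory: HEAP in 
flink-conf.yaml:
{code:java}
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Keep keyed state in RocksDB, but store timers on the JVM heap so they go
// through the heap snapshot path instead of RocksDB.
EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend(true); // incremental checkpoints
backend.setPriorityQueueStateType(EmbeddedRocksDBStateBackend.PriorityQueueStateType.HEAP);
env.setStateBackend(backend);
{code}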

> An exception is thrown out when recover job timers from checkpoint file
> ---
>
> Key: FLINK-23886
> URL: https://issues.apache.org/jira/browse/FLINK-23886
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.10.0, 1.11.3, 1.13.2, 1.13.5, 1.14.4
>Reporter: Jing Zhang
>Assignee: Yuan Mei
>Priority: Major
> Attachments: image-2021-08-25-16-38-04-023.png, 
> image-2021-08-25-16-38-12-308.png, image-2021-08-25-17-06-29-806.png, 
> image-2021-08-25-17-07-38-327.png, segment-drop-corrupted-timer-state.diff
>
>
> A user reported the bug on the [mailing 
> list|http://mail-archives.apache.org/mod_mbox/flink-user/202108.mbox/%3ccakmsf43j14nkjmgjuy4dh5qn2vbjtw4tfh4pmmuyvcvfhgf...@mail.gmail.com%3E]. 
> I paste the content here.
> Setup Specifics:
>  Version: 1.6.2
>  RocksDB Map State
>  Timers stored in rocksdb
>   
>  When we have this job running for long periods of time like > 30 days, if 
> for some reason the job restarts, we encounter "Error while deserializing the 
> element". Is this a known issue fixed in later versions? I see some changes 
> to code for FLINK-10175, but we don't use any queryable state 
>   
>  Below is the stack trace
>   
>  org.apache.flink.util.FlinkRuntimeException: Error while deserializing the 
> element.
> at org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.deserializeElement(RocksDBCachingPriorityQueueSet.java:389)
> at org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.peek(RocksDBCachingPriorityQueueSet.java:146)
> at org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.peek(RocksDBCachingPriorityQueueSet.java:56)
> at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue$InternalPriorityQueueComparator.comparePriority(KeyGroupPartitionedPriorityQueue.java:274)
> at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue$InternalPriorityQueueComparator.comparePriority(KeyGroupPartitionedPriorityQueue.java:261)
> at org.apache.flink.runtime.state.heap.HeapPriorityQueue.isElementPriorityLessThen(HeapPriorityQueue.java:164)
> at org.apache.flink.runtime.state.heap.HeapPriorityQueue.siftUp(HeapPriorityQueue.java:121)
> at org.apache.flink.runtime.state.heap.HeapPriorityQueue.addInternal(HeapPriorityQueue.java:85)
> at org.apache.flink.runtime.state.heap.AbstractHeapPriorityQueue.add(AbstractHeapPriorityQueue.java:73)
> at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.<init>(KeyGroupPartitionedPriorityQueue.java:89)
> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBPriorityQueueSetFactory.create(RocksDBKeyedStateBackend.java:2792)
> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.create(RocksDBKeyedStateBackend.java:450)
> at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.createTimerPriorityQueue(InternalTimeServiceManager.java:121)
> at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.registerOrGetTimerService(InternalTimeServiceManager.java:106)
> at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.getInternalTimerService(InternalTimeServiceManager.java:87)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getInternalTimerService(AbstractStreamOperator.java:764)
> at org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:61)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.EOFException
> at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:290)
> at org.apache.flink.types.StringValue.readString(StringValue.java:769)
> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:179)
> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:46)
> at org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:168)
> at org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:45)
> ...

[jira] [Commented] (FLINK-23886) An exception is thrown out when recover job timers from checkpoint file

2022-09-12 Thread Tommy Schnabel (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17603137#comment-17603137
 ] 

Tommy Schnabel commented on FLINK-23886:


Hi there, reporting in from Twilio Segment: we've seen this on Flink 1.14.4:

{code:java}
Caused by: java.io.EOFException
        at org.apache.flink.core.memory.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:329)
        at org.apache.flink.types.StringValue.readString(StringValue.java:781)
        at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:73)
        at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:31)
        at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:126)
        at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:32)
        at org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:161)
        at org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:43)
        at org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.deserializeElement(RocksDBCachingPriorityQueueSet.java:386)
        ... 26 more {code}

We remediated by forking and patching Flink to drop the corrupted state. We 
then ran data through our system to recreate any timers that _could_ have been 
lost. All in all, we dropped 84 corrupted timers.

Our patch included (a rough sketch follows below):
 * Catching and rethrowing a new custom exception when we see an EOFException 
in TimerSerializer#deserialize
 * Modifying RocksDBCachingPriorityQueueSet to catch our new exception and skip 
to the next element
 * Some null handling, since we explicitly return null in the cases where we 
detect corrupted state
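
To make the shape of that concrete, here is a rough, hypothetical sketch of the 
first two bullets; it is not the attached patch, and CorruptedStateException is 
an invented name:
{code:java}
import java.io.EOFException;
import java.io.IOException;

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;

// Hypothetical marker exception -- the attached diff is the real change.
class CorruptedStateException extends IOException {
    CorruptedStateException(Throwable cause) {
        super("Corrupted element in RocksDB timer state", cause);
    }
}

class SkippingDeserialization {
    // Wraps a deserialize call so that a truncated element surfaces as a
    // recognizable exception instead of a bare EOFException; the reading loop
    // (RocksDBCachingPriorityQueueSet in our patch) can then catch it, log,
    // and skip to the next element.
    static <T> T deserializeOrFlag(TypeSerializer<T> serializer, DataInputView in)
            throws IOException {
        try {
            return serializer.deserialize(in);
        } catch (EOFException e) {
            throw new CorruptedStateException(e);
        }
    }
}
{code}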

I've attached the diff from the commit in our fork in case it is helpful for 
anyone else. We're not planning to actively run that code, but instead keep it 
around as a remediation tool if we were to ever run into this again.

We're happy to help and provide more details should they be useful!

[^segment-drop-corrupted-timer-state.diff]


[jira] [Updated] (FLINK-23886) An exception is thrown out when recover job timers from checkpoint file

2022-09-12 Thread Tommy Schnabel (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23886?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tommy Schnabel updated FLINK-23886:
---
Attachment: segment-drop-corrupted-timer-state.diff
