[jira] [Updated] (FLINK-34325) Inconsistent state with data loss after OutOfMemoryError

2024-02-02 Thread Alexis Sarda-Espinosa (Jira)


[ https://issues.apache.org/jira/browse/FLINK-34325?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Alexis Sarda-Espinosa updated FLINK-34325:
--
Description: 
I have a job that uses broadcast state to maintain a cache of required 
metadata. I am currently evaluating memory requirements of my specific use 
case, and I ran into a weird situation that seems worrisome.
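
For context, the relevant part of the job looks roughly like the sketch below. The class and method names, the key extraction, and the plain {{String}} payloads are placeholders for illustration, not the actual implementation:

{code:java}
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class MetadataCacheSketch {

    // Descriptor for the broadcast "cache" of metadata, keyed by an ID extracted from the record.
    static final MapStateDescriptor<String, String> CACHE_DESCRIPTOR =
            new MapStateDescriptor<>("metadata-cache", String.class, String.class);

    static DataStream<String> enrich(DataStream<String> events, DataStream<String> metadata) {
        // The metadata topic is broadcast to every parallel instance of the downstream operator.
        BroadcastStream<String> broadcastMetadata = metadata.broadcast(CACHE_DESCRIPTOR);

        return events
                .connect(broadcastMetadata)
                .process(new BroadcastProcessFunction<String, String, String>() {

                    @Override
                    public void processBroadcastElement(String value, Context ctx, Collector<String> out)
                            throws Exception {
                        // Every metadata message read from Kafka is added to the broadcast state,
                        // so a large backlog of messages makes this cache grow.
                        ctx.getBroadcastState(CACHE_DESCRIPTOR).put(keyOf(value), value);
                    }

                    @Override
                    public void processElement(String event, ReadOnlyContext ctx, Collector<String> out)
                            throws Exception {
                        // Regular events are enriched from the cached metadata.
                        out.collect(event + "|" + ctx.getBroadcastState(CACHE_DESCRIPTOR).get(keyOf(event)));
                    }
                });
    }

    // Placeholder key extraction; the real job's logic is not relevant to this issue.
    private static String keyOf(String value) {
        return value.split(",", 2)[0];
    }
}
{code}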

All sources in my job are Kafka sources. I wrote a large number of messages to 
Kafka to force the broadcast state's cache to grow. At some point, this caused 
a "{{java.lang.OutOfMemoryError: Java heap space}}" error in the -Job- Task 
Manager. I would have expected the whole Java process of the TM to crash, but 
the job was simply restarted. What's worrisome is that, after 2 checkpoint 
failures ^1^, the job restarted, resumed from the latest successful checkpoint, 
and completely ignored all the events I wrote to Kafka. I can verify this both 
through a custom metric that exposes the approximate size of this cache and 
through the fact that the job did not end up crashlooping by reading all the 
messages from Kafka over and over again.
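
In case it helps, the metric mentioned above is essentially a gauge along the lines of the sketch below; the names are again placeholders, and the count is only an approximation maintained next to the broadcast state:

{code:java}
import java.util.concurrent.atomic.AtomicLong;

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

// Variant of the broadcast function above that also exposes an approximate cache size as a gauge.
public class CacheSizeReportingFunction extends BroadcastProcessFunction<String, String, String> {

    static final MapStateDescriptor<String, String> CACHE_DESCRIPTOR =
            new MapStateDescriptor<>("metadata-cache", String.class, String.class);

    // Not checkpointed: just an approximation of how many entries this subtask has written.
    private transient AtomicLong approximateSize;

    @Override
    public void open(Configuration parameters) {
        approximateSize = new AtomicLong();
        getRuntimeContext()
                .getMetricGroup()
                .gauge("metadataCacheApproximateSize", (Gauge<Long>) () -> approximateSize.get());
    }

    @Override
    public void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception {
        ctx.getBroadcastState(CACHE_DESCRIPTOR).put(value, value);
        approximateSize.incrementAndGet();
    }

    @Override
    public void processElement(String event, ReadOnlyContext ctx, Collector<String> out) throws Exception {
        out.collect(event);
    }
}
{code}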

I'm attaching an excerpt of the Job Manager's logs. My main concerns are:

# It seems the memory error somehow prevented the Kafka offsets from being 
"rolled back", so the Kafka events that should have ended up in the broadcast 
state's cache were eventually ignored.
# -Is it normal that the state is somehow "materialized" in the JM and is thus 
affected by the size of the JM's heap? Is this something particular to the use 
of broadcast state? I found this very surprising.- See comments

^1^ Two failures are expected since 
{{execution.checkpointing.tolerable-failed-checkpoints=1}}
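
(The setting is applied through the configuration; a roughly equivalent programmatic sketch, with a made-up checkpoint interval, would be:)

{code:java}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointToleranceSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000L); // interval is a placeholder, not taken from the actual job
        // Equivalent of execution.checkpointing.tolerable-failed-checkpoints=1: one failed
        // checkpoint is tolerated, the second consecutive failure fails (and restarts) the job.
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1);
    }
}
{code}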

  was:
I have a job that uses broadcast state to maintain a cache of required 
metadata. I am currently evaluating memory requirements of my specific use 
case, and I ran into a weird situation that seems worrisome.

All sources in my job are Kafka sources. I wrote a large number of messages to 
Kafka to force the broadcast state's cache to grow. At some point, this caused 
a "{{java.lang.OutOfMemoryError: Java heap space}}" error in the -Job- Task 
Manager. I would have expected the whole Java process of the TM to crash, but 
the job was simply restarted. What's worrisome is that, after 2 checkpoint 
failures ^1^, the job restarted, resumed from the latest successful checkpoint, 
and completely ignored all the events I wrote to Kafka. I can verify this both 
through a custom metric that exposes the approximate size of this cache and 
through the fact that the job did not end up crashlooping by reading all the 
messages from Kafka over and over again.

I'm attaching an excerpt of the Job Manager's logs. My main concerns are:

# It seems the memory error from the JM didn't prevent the Kafka offsets from 
being "rolled back", so the Kafka events that should have ended up in the 
broadcast state's cache were eventually ignored.
# -Is it normal that the state is somehow "materialized" in the JM and is thus 
affected by the size of the JM's heap? Is this something particular to the use 
of broadcast state? I found this very surprising.- See comments

^1^ Two failures are expected since 
{{execution.checkpointing.tolerable-failed-checkpoints=1}}


> Inconsistent state with data loss after OutOfMemoryError
> 
>
> Key: FLINK-34325
> URL: https://issues.apache.org/jira/browse/FLINK-34325
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.17.1
> Environment: Flink on Kubernetes with HA, RocksDB with incremental 
> checkpoints on Azure
>Reporter: Alexis Sarda-Espinosa
>Priority: Major
> Attachments: jobmanager_log.txt
>
>
> I have a job that uses broadcast state to maintain a cache of required 
> metadata. I am currently evaluating memory requirements of my specific use 
> case, and I ran into a weird situation that seems worrisome.
> All sources in my job are Kafka sources. I wrote a large amount of messages 
> in Kafka to force the broadcast state's cache to grow. At some point, this 
> caused an "{{java.lang.OutOfMemoryError: Java heap space}}" error in the 
> -Job- Task Manager. I would have expected the whole java process of the TM to 
> crash, but the job was simply restarted. What's worrisome is that, after 2 
> checkpoint failures ^1^, the job restarted and subsequently resumed from the 
> latest successful checkpoint and completely ignored all the events I wrote to 
> Kafka, which I can verify because I have a custom metric that exposes the 
> approximate size of this cache, and the fact that the job didn't crashloop at 
> this point after reading all the messages from Kafka over and over again.
> I'm attaching an excerpt of the Job Manager's logs. My main concerns 

[jira] [Updated] (FLINK-34325) Inconsistent state with data loss after OutOfMemoryError

2024-02-02 Thread Alexis Sarda-Espinosa (Jira)


[ https://issues.apache.org/jira/browse/FLINK-34325?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Alexis Sarda-Espinosa updated FLINK-34325:
--
Description: 
I have a job that uses broadcast state to maintain a cache of required 
metadata. I am currently evaluating memory requirements of my specific use 
case, and I ran into a weird situation that seems worrisome.

All sources in my job are Kafka sources. I wrote a large number of messages to 
Kafka to force the broadcast state's cache to grow. At some point, this caused 
a "{{java.lang.OutOfMemoryError: Java heap space}}" error in the -Job- Task 
Manager. I would have expected the whole Java process of the TM to crash, but 
the job was simply restarted. What's worrisome is that, after 2 checkpoint 
failures ^1^, the job restarted, resumed from the latest successful checkpoint, 
and completely ignored all the events I wrote to Kafka. I can verify this both 
through a custom metric that exposes the approximate size of this cache and 
through the fact that the job did not end up crashlooping by reading all the 
messages from Kafka over and over again.

I'm attaching an excerpt of the Job Manager's logs. My main concerns are:

# It seems the memory error from the JM didn't prevent the Kafka offsets from 
being "rolled back", so the Kafka events that should have ended up in the 
broadcast state's cache were eventually ignored.
# -Is it normal that the state is somehow "materialized" in the JM and is thus 
affected by the size of the JM's heap? Is this something particular to the use 
of broadcast state? I found this very surprising.- See comments

^1^ Two failures are expected since 
{{execution.checkpointing.tolerable-failed-checkpoints=1}}

  was:
I have a job that uses broadcast state to maintain a cache of required 
metadata. I am currently evaluating memory requirements of my specific use 
case, and I ran into a weird situation that seems worrisome.

All sources in my job are Kafka sources. I wrote a large number of messages to 
Kafka to force the broadcast state's cache to grow. At some point, this caused 
a "{{java.lang.OutOfMemoryError: Java heap space}}" error in the Job Manager. 
I would have expected the whole Java process of the JM to crash, but the job 
was simply restarted. What's worrisome is that, after 2 checkpoint failures 
^1^, the job restarted, resumed from the latest successful checkpoint, and 
completely ignored all the events I wrote to Kafka. I can verify this both 
through a custom metric that exposes the approximate size of this cache and 
through the fact that the job did not end up crashlooping by reading all the 
messages from Kafka over and over again.

I'm attaching an excerpt of the Job Manager's logs. My main concerns are:

# It seems the memory error from the JM didn't prevent the Kafka offsets from 
being "rolled back", so the Kafka events that should have ended up in the 
broadcast state's cache were eventually ignored.
# -Is it normal that the state is somehow "materialized" in the JM and is thus 
affected by the size of the JM's heap? Is this something particular to the use 
of broadcast state? I found this very surprising.- See comments

^1^ Two failures are expected since 
{{execution.checkpointing.tolerable-failed-checkpoints=1}}


> Inconsistent state with data loss after OutOfMemoryError
> 
>
> Key: FLINK-34325
> URL: https://issues.apache.org/jira/browse/FLINK-34325
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.17.1
> Environment: Flink on Kubernetes with HA, RocksDB with incremental 
> checkpoints on Azure
>Reporter: Alexis Sarda-Espinosa
>Priority: Major
> Attachments: jobmanager_log.txt
>
>
> I have a job that uses broadcast state to maintain a cache of required 
> metadata. I am currently evaluating memory requirements of my specific use 
> case, and I ran into a weird situation that seems worrisome.
> All sources in my job are Kafka sources. I wrote a large amount of messages 
> in Kafka to force the broadcast state's cache to grow. At some point, this 
> caused an "{{java.lang.OutOfMemoryError: Java heap space}}" error in the 
> -Job- Task Manager. I would have expected the whole java process of the TM to 
> crash, but the job was simply restarted. What's worrisome is that, after 2 
> checkpoint failures ^1^, the job restarted and subsequently resumed from the 
> latest successful checkpoint and completely ignored all the events I wrote to 
> Kafka, which I can verify because I have a custom metric that exposes the 
> approximate size of this cache, and the fact that the job didn't crashloop at 
> this point after reading all the messages from Kafka over and over again.
> I'm attaching an excerpt of the Job Manager's logs. My main 

[jira] [Updated] (FLINK-34325) Inconsistent state with data loss after OutOfMemoryError

2024-02-01 Thread Alexis Sarda-Espinosa (Jira)


[ https://issues.apache.org/jira/browse/FLINK-34325?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Alexis Sarda-Espinosa updated FLINK-34325:
--
Description: 
I have a job that uses broadcast state to maintain a cache of required 
metadata. I am currently evaluating memory requirements of my specific use 
case, and I ran into a weird situation that seems worrisome.

All sources in my job are Kafka sources. I wrote a large number of messages to 
Kafka to force the broadcast state's cache to grow. At some point, this caused 
a "{{java.lang.OutOfMemoryError: Java heap space}}" error in the Job Manager. 
I would have expected the whole Java process of the JM to crash, but the job 
was simply restarted. What's worrisome is that, after 2 checkpoint failures 
^1^, the job restarted, resumed from the latest successful checkpoint, and 
completely ignored all the events I wrote to Kafka. I can verify this both 
through a custom metric that exposes the approximate size of this cache and 
through the fact that the job did not end up crashlooping by reading all the 
messages from Kafka over and over again.

I'm attaching an excerpt of the Job Manager's logs. My main concerns are:

# It seems the memory error from the JM didn't prevent the Kafka offsets from 
being "rolled back", so the Kafka events that should have ended up in the 
broadcast state's cache were eventually ignored.
# -Is it normal that the state is somehow "materialized" in the JM and is thus 
affected by the size of the JM's heap? Is this something particular to the use 
of broadcast state? I found this very surprising.- See comments

^1^ Two failures are expected since 
{{execution.checkpointing.tolerable-failed-checkpoints=1}}

  was:
I have a job that uses broadcast state to maintain a cache of required 
metadata. I am currently evaluating memory requirements of my specific use 
case, and I ran into a weird situation that seems worrisome.

All sources in my job are Kafka sources. I wrote a large number of messages to 
Kafka to force the broadcast state's cache to grow. At some point, this caused 
a "{{java.lang.OutOfMemoryError: Java heap space}}" error in the Job Manager. 
I would have expected the whole Java process of the JM to crash, but the job 
was simply restarted. What's worrisome is that, after 2 restarts, the job 
resumed from the latest successful checkpoint and completely ignored all the 
events I wrote to Kafka. I can verify this both through a custom metric that 
exposes the approximate size of this cache and through the fact that the job 
did not end up crashlooping by reading all the messages from Kafka over and 
over again.

I'm attaching an excerpt of the Job Manager's logs. My main concerns are:

# It seems the memory error from the JM didn't prevent the Kafka offsets from 
being "rolled back", so the Kafka events that should have ended up in the 
broadcast state's cache were eventually ignored.
# Is it normal that the state is somehow "materialized" in the JM and is thus 
affected by the size of the JM's heap? Is this something particular to the use 
of broadcast state? I found this very surprising.


> Inconsistent state with data loss after OutOfMemoryError
> 
>
> Key: FLINK-34325
> URL: https://issues.apache.org/jira/browse/FLINK-34325
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.17.1
> Environment: Flink on Kubernetes with HA, RocksDB with incremental 
> checkpoints on Azure
>Reporter: Alexis Sarda-Espinosa
>Priority: Major
> Attachments: jobmanager_log.txt
>
>
> I have a job that uses broadcast state to maintain a cache of required 
> metadata. I am currently evaluating memory requirements of my specific use 
> case, and I ran into a weird situation that seems worrisome.
> All sources in my job are Kafka sources. I wrote a large amount of messages 
> in Kafka to force the broadcast state's cache to grow. At some point, this 
> caused an "{{java.lang.OutOfMemoryError: Java heap space}}" error in the Job 
> Manager. I would have expected the whole java process of the JM to crash, but 
> the job was simply restarted. What's worrisome is that, after 2 checkpoint 
> failures ^1^, the job restarted and subsequently resumed from the latest 
> successful checkpoint and completely ignored all the events I wrote to Kafka, 
> which I can verify because I have a custom metric that exposes the 
> approximate size of this cache, and the fact that the job didn't crashloop at 
> this point after reading all the messages from Kafka over and over again.
> I'm attaching an excerpt of the Job Manager's logs. My main concerns are:
> # It seems the memory error from the JM didn't prevent the Kafka offsets from 
> being "rolled back", so eventually the Kafka events that should have ended in 

[jira] [Updated] (FLINK-34325) Inconsistent state with data loss after OutOfMemoryError

2024-01-31 Thread Alexis Sarda-Espinosa (Jira)


[ https://issues.apache.org/jira/browse/FLINK-34325?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Alexis Sarda-Espinosa updated FLINK-34325:
--
Summary: Inconsistent state with data loss after OutOfMemoryError  (was: 
Inconsistent state with data loss after OutOfMemoryError in Job Manager)

> Inconsistent state with data loss after OutOfMemoryError
> 
>
> Key: FLINK-34325
> URL: https://issues.apache.org/jira/browse/FLINK-34325
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.17.1
> Environment: Flink on Kubernetes with HA, RocksDB with incremental 
> checkpoints on Azure
>Reporter: Alexis Sarda-Espinosa
>Priority: Major
> Attachments: jobmanager_log.txt
>
>
> I have a job that uses broadcast state to maintain a cache of required 
> metadata. I am currently evaluating memory requirements of my specific use 
> case, and I ran into a weird situation that seems worrisome.
> All sources in my job are Kafka sources. I wrote a large amount of messages 
> in Kafka to force the broadcast state's cache to grow. At some point, this 
> caused an "{{java.lang.OutOfMemoryError: Java heap space}}" error in the Job 
> Manager. I would have expected the whole java process of the JM to crash, but 
> the job was simply restarted. What's worrisome is that, after 2 restarts, the 
> job resumed from the latest successful checkpoint and completely ignored all 
> the events I wrote to Kafka, which I can verify because I have a custom 
> metric that exposes the approximate size of this cache, and the fact that the 
> job didn't crashloop at this point after reading all the messages from Kafka 
> over and over again.
> I'm attaching an excerpt of the Job Manager's logs. My main concerns are:
> # It seems the memory error from the JM didn't prevent the Kafka offsets from 
> being "rolled back", so eventually the Kafka events that should have ended in 
> the broadcast state's cache were ignored.
> # Is it normal that the state is somehow "materialized" in the JM and is thus 
> affected by the size of the JM's heap? Is this something particular due to 
> the use of broadcast state? I found this very surprising.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)