[jira] [Commented] (FLINK-13940) S3RecoverableWriter causes job to get stuck in recovery

2019-09-04 Thread Jimmy Weibel Rasmussen (Jira)


[ https://issues.apache.org/jira/browse/FLINK-13940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16922250#comment-16922250 ]

Jimmy Weibel Rasmussen commented on FLINK-13940:


Thank you Kostas!

Thanks for explaining the problems with the state backend solution.

> S3RecoverableWriter causes job to get stuck in recovery
> ---
>
> Key: FLINK-13940
> URL: https://issues.apache.org/jira/browse/FLINK-13940
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Affects Versions: 1.8.0, 1.8.1, 1.9.0
>Reporter: Jimmy Weibel Rasmussen
>Assignee: Kostas Kloudas
>Priority: Major
> Fix For: 1.10.0, 1.9.1
>
>
>  
> The cleanup of tmp files in S3 introduced by this ticket/PR:
> https://issues.apache.org/jira/browse/FLINK-10963
> is preventing the Flink job from being able to recover under some
> circumstances.
>
> This is what seems to be happening:
> When the job tries to recover, it calls initializeState() on all operators,
> which results in the Bucket.restoreInProgressFile method being called.
> This downloads the part_tmp file referenced by the checkpoint we are
> restoring from, and finally calls fsWriter.cleanupRecoverableState, which
> deletes the part_tmp file in S3.
> Next, the open() method is called on all operators. If the open() call fails
> for one of the operators (which can happen if the issue that caused the job
> to fail and restart is still unresolved), the job fails again and tries to
> restart from the same checkpoint as before. This time, however, downloading
> the part_tmp file referenced by the checkpoint fails, because it was deleted
> during the previous recovery attempt.
> The bug is critical because it results in data loss.
>
> I discovered the bug because I have a Flink job with a RabbitMQ source and a
> StreamingFileSink that writes to S3 (and therefore uses the
> S3RecoverableWriter).
> Occasionally I have RabbitMQ connection issues that cause the job to fail
> and restart; sometimes the first few restart attempts fail because RabbitMQ
> is unreachable when Flink tries to reconnect.
>
> This is what I was seeing:
> RabbitMQ goes down.
> The job fails because of a RabbitMQ ConsumerCancelledException.
> The job attempts to restart but fails with a RabbitMQ connection exception
> (x number of times).
> RabbitMQ comes back up.
> The job attempts to restart but fails with a FileNotFoundException due to
> some _part_tmp file missing in S3.
>
> The job will be unable to restart, and the only option is to cancel and
> resubmit the job (and lose all state).
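(For readers skimming the thread, the sequence described in the quoted report can be sketched roughly as below. The types SimpleRecoverableWriter and BucketRestoreSketch are hypothetical stand-ins, not the actual Flink classes; only the method names restoreInProgressFile and cleanupRecoverableState come from the report above.)

{code:java}
// Rough, simplified sketch of the failure sequence described above.
// SimpleRecoverableWriter and BucketRestoreSketch are hypothetical stand-ins,
// not the real Flink classes.
import java.io.IOException;

interface SimpleRecoverableWriter {
    // Re-opens the in-progress part file referenced by the checkpointed state.
    void recover(String partTmpObject) throws IOException;

    // Deletes the referenced tmp object from S3 (the problematic step).
    void cleanupRecoverableState(String partTmpObject) throws IOException;
}

class BucketRestoreSketch {
    private final SimpleRecoverableWriter fsWriter;

    BucketRestoreSketch(SimpleRecoverableWriter fsWriter) {
        this.fsWriter = fsWriter;
    }

    // Called from initializeState() during recovery.
    void restoreInProgressFile(String partTmpObject) throws IOException {
        // 1) Download / re-open the part_tmp object referenced by the checkpoint.
        fsWriter.recover(partTmpObject);

        // 2) Immediately delete it from S3. If open() then fails on any operator
        //    and the job restores from the SAME checkpoint again, step 1 throws
        //    FileNotFoundException, because the tmp object no longer exists.
        fsWriter.cleanupRecoverableState(partTmpObject);
    }
}
{code}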





[jira] [Commented] (FLINK-13940) S3RecoverableWriter causes job to get stuck in recovery

2019-09-03 Thread Kostas Kloudas (Jira)


[ https://issues.apache.org/jira/browse/FLINK-13940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16921434#comment-16921434 ]

Kostas Kloudas commented on FLINK-13940:


So I merged the solution of simply not deleting upon recovery
([FLINK-13941|https://issues.apache.org/jira/browse/FLINK-13941]). This solves
the issue without leaving a lot of small files behind.



[jira] [Commented] (FLINK-13940) S3RecoverableWriter causes job to get stuck in recovery

2019-09-03 Thread Kostas Kloudas (Jira)


[ https://issues.apache.org/jira/browse/FLINK-13940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16921252#comment-16921252 ]

Kostas Kloudas commented on FLINK-13940:


I pushed a commit to the PR that keeps the cleanup of small files for
intermediate checkpoints but skips it when we are restoring.

Please have a look [~jweibel22] and let me know what you think.

 

Buffering in the state backend was a design alternative that we investigated
when implementing the streaming file sink, but we rejected it because, in the
end, these files would have to be copied out of the state backend and sent to
their final location over the network. This can lead to slow {{commit}} phases,
especially for big part files, which I would assume is also the most common
case for block-based storage systems like filesystems. In addition, it
unnecessarily stresses the state backend and could lead to long queues of
"pending commits".



[jira] [Commented] (FLINK-13940) S3RecoverableWriter causes job to get stuck in recovery

2019-09-02 Thread Jimmy Weibel Rasmussen (Jira)


[ https://issues.apache.org/jira/browse/FLINK-13940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16921044#comment-16921044 ]

Jimmy Weibel Rasmussen commented on FLINK-13940:


I do agree that the solution would have to work for externalized checkpoints
and savepoints as well, but in those cases wouldn't it be more natural to store
the tmp data together with the checkpoint/savepoint state, in the checkpoint
dir / RocksDB? That way the lifecycle would be managed automatically. (I'm new
to Flink, so I don't know how difficult this would be to achieve...)

I tried setting requiresCleanupOfRecoverableState to false myself and ended up
with a LOT of tmp files in S3, because I have a pretty short checkpoint
interval. It wastes a lot of space, and all of a sudden my sink is full of
duplicated data. So I have to either live with the duplicated data, explicitly
ignore tmp files when reading from the bucket, or create my own process for
cleaning up the tmp files. I guess it could work, but it's not ideal, and it's
definitely not the behaviour you would expect as a user when enabling
EXACTLY_ONCE checkpointing.
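
(If someone goes down the "own cleanup process" route mentioned above, a very rough sketch could look like the following. The bucket name, prefix, retention window, and the "_part_tmp" key pattern are all assumptions that would need to match the actual sink layout, and such a job must never delete tmp objects that a retained checkpoint or savepoint still references.)

{code:java}
// Hypothetical standalone cleanup job for stale tmp objects, using the AWS
// SDK for Java v1. Adjust bucket, prefix, pattern, and retention to your setup,
// and make sure no retained checkpoint/savepoint still references the objects.
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.ListObjectsV2Request;
import com.amazonaws.services.s3.model.ListObjectsV2Result;
import com.amazonaws.services.s3.model.S3ObjectSummary;

import java.time.Duration;
import java.time.Instant;

public class TmpFileCleanup {
    public static void main(String[] args) {
        String bucket = "my-bucket";       // hypothetical bucket name
        String prefix = "output/";         // hypothetical sink base path
        Duration retention = Duration.ofDays(7);
        Instant cutoff = Instant.now().minus(retention);

        AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient();
        ListObjectsV2Request req = new ListObjectsV2Request()
                .withBucketName(bucket)
                .withPrefix(prefix);

        ListObjectsV2Result result;
        do {
            result = s3.listObjectsV2(req);
            for (S3ObjectSummary obj : result.getObjectSummaries()) {
                // Assumed naming pattern of the leftover in-progress objects.
                boolean looksLikeTmp = obj.getKey().contains("_part_tmp");
                boolean oldEnough =
                        obj.getLastModified().toInstant().isBefore(cutoff);
                if (looksLikeTmp && oldEnough) {
                    s3.deleteObject(bucket, obj.getKey());
                }
            }
            req.setContinuationToken(result.getNextContinuationToken());
        } while (result.isTruncated());
    }
}
{code}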



[jira] [Commented] (FLINK-13940) S3RecoverableWriter causes job to get stuck in recovery

2019-09-02 Thread Kostas Kloudas (Jira)


[ https://issues.apache.org/jira/browse/FLINK-13940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16920892#comment-16920892 ]

Kostas Kloudas commented on FLINK-13940:


Actually, I think not deleting anything is the safest solution, to be sure that
we also play well with other features like externalized checkpoints,
savepoints, etc. So I think the solution will be that we simply do not clean up
anything. I already have a branch and will open a PR as soon as Travis gives
the green light.



[jira] [Commented] (FLINK-13940) S3RecoverableWriter causes job to get stuck in recovery

2019-09-02 Thread Kostas Kloudas (Jira)


[ https://issues.apache.org/jira/browse/FLINK-13940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16920822#comment-16920822 ]

Kostas Kloudas commented on FLINK-13940:


I am also leaning towards this solution. At least this can be a first solution. 

A nicer approach would be to delete these small files at the "next" successful 
checkpoint. This implies that Flink has to remember them. I think this is 
doable though.
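
(A rough sketch of that deferred-cleanup idea, with hypothetical, simplified types; the real version would presumably hook into the sink's notifyCheckpointComplete():)

{code:java}
// Hedged sketch of "delete at the next successful checkpoint", using
// hypothetical types (StateCleaner, DeferredCleanupSketch), not actual Flink code.
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

class DeferredCleanupSketch {
    interface StateCleaner {
        void cleanupRecoverableState(String tmpObject) throws IOException;
    }

    private final StateCleaner cleaner;
    // Tmp objects restored from the checkpoint; they must survive until a
    // newer checkpoint no longer needs them.
    private final List<String> pendingCleanup = new ArrayList<>();

    DeferredCleanupSketch(StateCleaner cleaner) {
        this.cleaner = cleaner;
    }

    // Restore path: remember the tmp object instead of deleting it right away.
    void onRestore(String restoredTmpObject) {
        pendingCleanup.add(restoredTmpObject);
    }

    // Called once a checkpoint taken AFTER the restore has completed. Only now
    // is it safe to drop the old tmp objects, because a future recovery would
    // restore from the newer checkpoint.
    void notifyCheckpointComplete() throws IOException {
        for (String tmpObject : pendingCleanup) {
            cleaner.cleanupRecoverableState(tmpObject);
        }
        pendingCleanup.clear();
    }
}
{code}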

 

For now I will create a subtask with the solution of not deleting anything, and
I will leave the main Jira issue open as a reminder that there may be a better
solution to implement.

 

What do you think, [~jweibel22] and [~aljoscha]?



[jira] [Commented] (FLINK-13940) S3RecoverableWriter causes job to get stuck in recovery

2019-09-02 Thread Aljoscha Krettek (Jira)


[ https://issues.apache.org/jira/browse/FLINK-13940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16920800#comment-16920800 ]

Aljoscha Krettek commented on FLINK-13940:
--

I think the solution is to never clean up anything in the recovery code path.
However, this would mean that we would be left with those lingering small files
after a failure->recovery cycle. That should still be preferable to data loss, I think.

Are there any other opinions?




--
This message was sent by Atlassian Jira
(v8.3.2#803003)