[jira] [Comment Edited] (FLINK-28984) FsCheckpointStateOutputStream is not being released normally

2022-10-13 Thread ChangjiGuo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17616973#comment-17616973
 ] 

ChangjiGuo edited comment on FLINK-28984 at 10/13/22 11:29 AM:
---

Hi [~Yanfei Lei], thanks for your reply! Yes, that's what I want to express! 
 


was (Author: changjiguo):
Hi [~Yanfei Lei], thans for your reply! Yes, that's what I want to express! 
 

> FsCheckpointStateOutputStream is not being released normally
> 
>
> Key: FLINK-28984
> URL: https://issues.apache.org/jira/browse/FLINK-28984
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.11.6, 1.15.1
>Reporter: ChangjiGuo
>Priority: Major
>  Labels: pull-request-available
> Attachments: log.png
>
>
> If the checkpoint is aborted, AsyncSnapshotCallable will close the 
> snapshotCloseableRegistry when it is canceled. There may be two situations 
> here:
>  # The FSDataOutputStream has been created and closed while closing 
> FsCheckpointStateOutputStream.
>  # The FSDataOutputStream has not been created yet, but closed flag has been 
> set to true. You can see this in log:
> {code:java}
> 2022-08-16 12:55:44,161 WARN  
> org.apache.flink.core.fs.SafetyNetCloseableRegistry   - Closing 
> unclosed resource via safety-net: 
> ClosingFSDataOutputStream(org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream@4ebe8e64)
>  : 
> x/flink/checkpoint/state/9214a2e302904b14baf2dc1aacbc7933/ae157c5a05a8922a46a179cdb4c86b10/shared/9d8a1e92-2f69-4ab0-8ce9-c1beb149229a
>  {code}
>         The output stream will be automatically closed by the 
> SafetyNetCloseableRegistry but the file will not be deleted.
> The second case usually occurs when the storage system has high latency in 
> creating files.
> How to reproduce?
> This is not easy to reproduce, but you can try to set a smaller checkpoint 
> timeout and increase the parallelism of the flink job.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28984) FsCheckpointStateOutputStream is not being released normally

2022-10-13 Thread ChangjiGuo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17616973#comment-17616973
 ] 

ChangjiGuo commented on FLINK-28984:


Hi [~Yanfei Lei], thans for your reply! Yes, that's what I want to express! 
 

> FsCheckpointStateOutputStream is not being released normally
> 
>
> Key: FLINK-28984
> URL: https://issues.apache.org/jira/browse/FLINK-28984
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.11.6, 1.15.1
>Reporter: ChangjiGuo
>Priority: Major
>  Labels: pull-request-available
> Attachments: log.png
>
>
> If the checkpoint is aborted, AsyncSnapshotCallable will close the 
> snapshotCloseableRegistry when it is canceled. There may be two situations 
> here:
>  # The FSDataOutputStream has been created and closed while closing 
> FsCheckpointStateOutputStream.
>  # The FSDataOutputStream has not been created yet, but closed flag has been 
> set to true. You can see this in log:
> {code:java}
> 2022-08-16 12:55:44,161 WARN  
> org.apache.flink.core.fs.SafetyNetCloseableRegistry   - Closing 
> unclosed resource via safety-net: 
> ClosingFSDataOutputStream(org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream@4ebe8e64)
>  : 
> x/flink/checkpoint/state/9214a2e302904b14baf2dc1aacbc7933/ae157c5a05a8922a46a179cdb4c86b10/shared/9d8a1e92-2f69-4ab0-8ce9-c1beb149229a
>  {code}
>         The output stream will be automatically closed by the 
> SafetyNetCloseableRegistry but the file will not be deleted.
> The second case usually occurs when the storage system has high latency in 
> creating files.
> How to reproduce?
> This is not easy to reproduce, but you can try to set a smaller checkpoint 
> timeout and increase the parallelism of the flink job.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28927) Can not clean up the uploaded shared files when the checkpoint fails

2022-10-13 Thread ChangjiGuo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17616965#comment-17616965
 ] 

ChangjiGuo commented on FLINK-28927:


Hi, [~yunta]! Can you help me review this idea in your free time? Thanks.

> Can not clean up the uploaded shared files when the checkpoint fails
> 
>
> Key: FLINK-28927
> URL: https://issues.apache.org/jira/browse/FLINK-28927
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.11.6, 1.15.1
> Environment: Flink-1.11
>Reporter: ChangjiGuo
>Assignee: ChangjiGuo
>Priority: Major
>
> If a checkpoint times out, the task will cancel all snapshot futures and do 
> some cleanup work, including the following:
>  * Cancel all AsyncSnapshotTasks.
>  * If the future has finished, it will clean up all state object.
>  * If the future has not completed, it will be interrupted(maybe).
>  * Close snapshotCloseableRegistry.
> In my case, the thread was interrupted while waiting for the file upload to 
> complete, but the file was not cleaned up.
> RocksDBStateUploader.java
> {code:java}
> FutureUtils.waitForAll(futures.values()).get();
> {code}
> It will wait for all files to be uploaded here. Although it has been 
> interrupted, the uploaded files will not be cleaned up. The remaining files 
> are mainly divided into:
>  * Files that have finished uploading before the thread is canceled.
>  * outputStream.closeAndGetHandle() is called, but snapshotCloseableRegistry 
> has not been closed.
> How to reproduce?
> Shorten the checkpoint timeout time, making the checkpoint fail. Then check 
> if there are any files in the shared directory.
> I'm testing on Flink-1.11, but I found the code from the latest branch may 
> have the same problem. I tried to fix it.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28927) Can not clean up the uploaded shared files when the checkpoint fails

2022-08-29 Thread ChangjiGuo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17597196#comment-17597196
 ] 

ChangjiGuo commented on FLINK-28927:


Hi [~yunta]! I verified the above idea on Flink-1.15.1 and tested the case where 
checkpoints time out continuously. It can clean up the files that the TMs have 
not yet reported to the JM; the remaining files are not cleaned up by the JM 
because of FLINK-24611.

> Can not clean up the uploaded shared files when the checkpoint fails
> 
>
> Key: FLINK-28927
> URL: https://issues.apache.org/jira/browse/FLINK-28927
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.11.6, 1.15.1
> Environment: Flink-1.11
>Reporter: ChangjiGuo
>Assignee: ChangjiGuo
>Priority: Major
>
> If a checkpoint times out, the task will cancel all snapshot futures and do 
> some cleanup work, including the following:
>  * Cancel all AsyncSnapshotTasks.
>  * If the future has finished, it will clean up all state object.
>  * If the future has not completed, it will be interrupted(maybe).
>  * Close snapshotCloseableRegistry.
> In my case, the thread was interrupted while waiting for the file upload to 
> complete, but the file was not cleaned up.
> RocksDBStateUploader.java
> {code:java}
> FutureUtils.waitForAll(futures.values()).get();
> {code}
> It will wait for all files to be uploaded here. Although it has been 
> interrupted, the uploaded files will not be cleaned up. The remaining files 
> are mainly divided into:
>  * Files that have finished uploading before the thread is canceled.
>  * outputStream.closeAndGetHandle() is called, but snapshotCloseableRegistry 
> has not been closed.
> How to reproduce?
> Shorten the checkpoint timeout time, making the checkpoint fail. Then check 
> if there are any files in the shared directory.
> I'm testing on Flink-1.11, but I found the code from the latest branch may 
> have the same problem. I tried to fix it.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-28927) Can not clean up the uploaded shared files when the checkpoint fails

2022-08-29 Thread ChangjiGuo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17580261#comment-17580261
 ] 

ChangjiGuo edited comment on FLINK-28927 at 8/29/22 1:06 PM:
-

[~yunta] Thanks for your reply! I have sorted out my thoughts; we can clean up 
the uploaded files in two ways:
 # If we are interrupted or get an unexpected exception while waiting for the 
futures to complete, we can catch the exception, set a callback on each future, 
and discard the stream state handle (as mentioned above; a rough sketch follows 
at the end of this comment).
 # For the uninterrupted case, that is, the files have already been uploaded but 
the AsyncSnapshotTask is still running when it is canceled: first, we need to 
collect all uploaded files, which can then be cleaned up depending on whether 
the AsyncSnapshotTask was canceled. But there is an ordering problem: we cannot 
guarantee the order between reading {_}isCancelled(){_} (if true, we delete 
these files) and calling {_}stateFuture.cancel(true){_}. That is to say, we 
cannot handle it in {_}FutureTask#run{_} and have to wait until the run method 
finishes. My idea is to override the _FutureTask#set_ method and check there 
whether the result can be discarded.
{code:java}
public class AsyncSnapshotTask<T> extends FutureTask<T> {

    @Override
    protected void set(T t) {
        super.set(t);
        if (isCancelled()) {
            if (t instanceof SnapshotResult) {
                try {
                    ((SnapshotResult<?>) t).discardState();
                } catch (Exception e) {
                    LOG.warn("Error while discarding the snapshot result.", e);
                }
            }
        } else {
            // super.set(t) has already updated the internal state, so cancel() will
            // return false and StateUtil#discardStateFuture will clean it up.
        }
    }
}
{code}

I pasted part of the code of the _RocksDBIncrementalSnapshotOperation#get_ as 
follows:
{code:java}
@Override
public SnapshotResult<KeyedStateHandle> get(CloseableRegistry snapshotCloseableRegistry)
        throws Exception {

    boolean completed = false;

    // ...

    final Map<StateHandleID, StreamStateHandle> sstFiles = new HashMap<>();
    // Handles to the misc files in the current snapshot will go here
    final Map<StateHandleID, StreamStateHandle> miscFiles = new HashMap<>();

    try {
        // ...

        uploadSstFiles(sstFiles, miscFiles, snapshotCloseableRegistry);

        // ...

        completed = true;

        return snapshotResult;
    } finally {
        if (!completed) {
            final List<StateObject> statesToDiscard =
                    new ArrayList<>(1 + miscFiles.size() + sstFiles.size());
            statesToDiscard.add(metaStateHandle);
            statesToDiscard.addAll(miscFiles.values());
            statesToDiscard.addAll(sstFiles.values());
            cleanupIncompleteSnapshot(statesToDiscard);
        }
    }
{code}
Here are some cases I can think of:
 # Interrupted while uploading sst files, with no misc files uploaded yet: the 
first way can clean up the already-uploaded sst files and delete the in-flight 
file while waiting for the unfinished futures to complete.
 # Interrupted while uploading misc files: _sstFiles_ already contains all 
uploaded sst files, and they will be cleaned up because completed is false; the 
first way can also clean up the uploaded misc files and delete the in-flight 
file while waiting for the unfinished futures to complete.
 # Both sst files and misc files have been uploaded, but the AsyncSnapshotTask 
is cancelled (it can no longer actually be interrupted); _FutureTask#run_ will 
execute normally, and the second way can clean up all state files.
 
I'm looking forward to your reply! Thx.
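
For the first way, a rough sketch of what I mean (this is not the actual 
RocksDBStateUploader code; LOG, futures, and the handle type are placeholders 
for the uploader's own fields):
{code:java}
try {
    FutureUtils.waitForAll(futures.values()).get();
} catch (Exception e) {
    // Interrupted or failed while waiting: register a callback on every upload
    // future that discards the uploaded stream state handle once that future
    // completes, so files finished before or after this point are both removed.
    for (CompletableFuture<StreamStateHandle> future : futures.values()) {
        future.whenComplete(
                (handle, throwable) -> {
                    if (handle != null) {
                        try {
                            handle.discardState();
                        } catch (Exception discardException) {
                            LOG.warn("Could not discard uploaded state handle.", discardException);
                        }
                    }
                });
    }
    throw e;
}
{code}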
 
 


was (Author: changjiguo):
[~yunta] Thanks for your reply! I sorted out my thoughts, we can clean up 
uploaded files in two ways:
 # For being interrupted or getting a unexpected exception while waiting for 
the future to complete, we can catch exception, and then set a callback for 
each future and discard the stream state handle(as mentioned above).
 # For uninterrupted case, that is, the files have been uploaded, but the 
AsyncSnapshotTask is still running when canceled. First, we need to collect all 
uploaded files, which can be cleaned up according to whether the 
AsyncSnapshotTask is canceled. But there is a uncertainty problem that we 
cannot ensure the order of getting value of {_}isCancelled(){_}(if true, we 
will delete these files) and calling _stateFuture.cancel(true)._ That is to 
say, we can't handle it in _FutureTask#run, and_ have to wait until the run 
method finishes. I have an idea that we can override the _FutureTask#set_ 
method and check if it can be discarded.
{code:java}
public class AsyncSnapshotTask extends FutureTask {

@Override
protected void set(T t) {
super.set(t);
if (isCancelled()) {
if (t instanceof SnapshotResult) {
try {
((SnapshotResult) t).discardState();
} catch (Exception e) {
LOG.warn("clean this occured error", 

[jira] [Comment Edited] (FLINK-28927) Can not clean up the uploaded shared files when the checkpoint fails

2022-08-29 Thread ChangjiGuo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17580261#comment-17580261
 ] 

ChangjiGuo edited comment on FLINK-28927 at 8/29/22 1:03 PM:
-

[~yunta] Thanks for your reply! I sorted out my thoughts, we can clean up 
uploaded files in two ways:
 # For being interrupted or getting a unexpected exception while waiting for 
the future to complete, we can catch exception, and then set a callback for 
each future and discard the stream state handle(as mentioned above).
 # For uninterrupted case, that is, the files have been uploaded, but the 
AsyncSnapshotTask is still running when canceled. First, we need to collect all 
uploaded files, which can be cleaned up according to whether the 
AsyncSnapshotTask is canceled. But there is a uncertainty problem that we 
cannot ensure the order of getting value of {_}isCancelled(){_}(if true, we 
will delete these files) and calling _stateFuture.cancel(true)._ That is to 
say, we can't handle it in _FutureTask#run, and_ have to wait until the run 
method finishes. I have an idea that we can override the _FutureTask#set_ 
method and check if it can be discarded.
{code:java}
public class AsyncSnapshotTask<T> extends FutureTask<T> {

    @Override
    protected void set(T t) {
        super.set(t);
        if (isCancelled()) {
            if (t instanceof SnapshotResult) {
                try {
                    ((SnapshotResult<?>) t).discardState();
                } catch (Exception e) {
                    LOG.warn("Error while discarding the snapshot result.", e);
                }
            }
        } else {
            // super.set(t) has already updated the internal state, so cancel() will
            // return false and StateUtil will clean it up.
        }
    }
}
{code}

I pasted part of the code of the _RocksDBIncrementalSnapshotOperation#get_ as 
follows:
{code:java}
@Override
public SnapshotResult<KeyedStateHandle> get(CloseableRegistry snapshotCloseableRegistry)
        throws Exception {

    boolean completed = false;

    // ...

    final Map<StateHandleID, StreamStateHandle> sstFiles = new HashMap<>();
    // Handles to the misc files in the current snapshot will go here
    final Map<StateHandleID, StreamStateHandle> miscFiles = new HashMap<>();

    try {
        // ...

        uploadSstFiles(sstFiles, miscFiles, snapshotCloseableRegistry);

        // ...

        completed = true;

        return snapshotResult;
    } finally {
        if (!completed) {
            final List<StateObject> statesToDiscard =
                    new ArrayList<>(1 + miscFiles.size() + sstFiles.size());
            statesToDiscard.add(metaStateHandle);
            statesToDiscard.addAll(miscFiles.values());
            statesToDiscard.addAll(sstFiles.values());
            cleanupIncompleteSnapshot(statesToDiscard);
        }
    }
{code}
Here are some cases I can think of:
 # Interrupted while uploading sst files, and no misc file uploaded yet, the 
first way can clean up uploaded sst files and delete the file while waiting for 
the unfinished future to complete.
 # Interrupted while uploading misc files, _sstFiles_ already contains all 
uploaded sst files and it will be cleaned because of completed is false. The 
first way also can clean up uploaded misc files and delete the file while 
waiting for the unfinished future to complete too.
 # Both sst files and misc files have been uploaded, but the AsyncSnapshotTasks 
is cancelled(can't actually be interrupted), the _FutureTask#run_ will execute 
normally. The second way can clean up all state files.

I'm Looking forward for your reply! Thx.
 
 


was (Author: changjiguo):
[~yunta] Thanks for your reply! I sorted out my thoughts, we can clean up 
uploaded files in two ways:
 # For being interrupted or getting a unexpected exception while waiting for 
the future to complete, we can catch exception, and then set a callback for 
each future and discard the stream state handle(as mentioned above).
 # For uninterrupted case, that is, the files have been uploaded, but the 
AsyncSnapshotTask is still running when canceled. First, we need to collect all 
uploaded files, which can be cleaned up according to whether the 
AsyncSnapshotTask is canceled. But there is a uncertainty problem that we 
cannot ensure the order of getting value of {_}isCancelled(){_}(if true, we 
will delete these files) and calling {_}stateFuture.cancel(true){_}, so we must 
clean up state after future is really done. The solution that comes to my mind 
is that we can discard state at _AsyncCheckpointRunnable#run_ in finally block 
according to whether _asyncCheckpointState_ is 
{_}AsyncCheckpointState.DISCARDED{_}.

I pasted part of the code of the _RocksDBIncrementalSnapshotOperation#get_ as 
follows:
{code:java}
@Override
public SnapshotResult get(CloseableRegistry 
snapshotCloseableRegistry)
throws Exception {

boolean completed = false;


final Map sstFiles = new HashMap<>();
// Handles to the misc files in the current 

[jira] [Comment Edited] (FLINK-28984) FsCheckpointStateOutputStream is not being released normally

2022-08-18 Thread ChangjiGuo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17581190#comment-17581190
 ] 

ChangjiGuo edited comment on FLINK-28984 at 8/18/22 7:48 AM:
-

Hi [~yunta], I have verified on Flink-1.15.1.

My fix is to check whether the _FsCheckpointStateOutputStream_ has already been 
closed after the output stream is created, and to clean up (close the stream and 
delete the file) if it has. It works well, and the warning above no longer 
appears.

Can you take a look if you have time? Thx.
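
Roughly, the idea looks like this (a simplified sketch only, not the exact 
patch; the real createStream() also handles creation retries, and fs, statePath, 
outStream and closed stand for the stream's own fields):
{code:java}
// After the file has been created, re-check the closed flag under the lock.
// If close() already ran while outStream was still null, release the new
// stream and delete the file here instead of leaking it to the safety net.
private FSDataOutputStream createStream() throws IOException {
    final FSDataOutputStream stream = fs.create(statePath, FileSystem.WriteMode.NO_OVERWRITE);
    synchronized (this) {
        if (!closed) {
            outStream = stream;
            return stream;
        }
    }
    // Closed concurrently: clean up the freshly created stream and its file.
    try {
        stream.close();
    } finally {
        fs.delete(statePath, false);
    }
    throw new IOException("The stream was already closed.");
}
{code}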


was (Author: changjiguo):
Hi [~yunta], I have verified on Flink-1.15.1.

My fix is to check if _FsCheckpointStateOutputStream_ has been closed after 
creating the output stream and clean up if closed. It works well and without 
the above logs.

Can you take a look if you have time? Thx.

> FsCheckpointStateOutputStream is not being released normally
> 
>
> Key: FLINK-28984
> URL: https://issues.apache.org/jira/browse/FLINK-28984
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.11.6, 1.15.1
>Reporter: ChangjiGuo
>Priority: Major
> Attachments: log.png
>
>
> If the checkpoint is aborted, AsyncSnapshotCallable will close the 
> snapshotCloseableRegistry when it is canceled. There may be two situations 
> here:
>  # The FSDataOutputStream has been created and closed while closing 
> FsCheckpointStateOutputStream.
>  # The FSDataOutputStream has not been created yet, but closed flag has been 
> set to true. You can see this in log:
> {code:java}
> 2022-08-16 12:55:44,161 WARN  
> org.apache.flink.core.fs.SafetyNetCloseableRegistry   - Closing 
> unclosed resource via safety-net: 
> ClosingFSDataOutputStream(org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream@4ebe8e64)
>  : 
> x/flink/checkpoint/state/9214a2e302904b14baf2dc1aacbc7933/ae157c5a05a8922a46a179cdb4c86b10/shared/9d8a1e92-2f69-4ab0-8ce9-c1beb149229a
>  {code}
>         The output stream will be automatically closed by the 
> SafetyNetCloseableRegistry but the file will not be deleted.
> The second case usually occurs when the storage system has high latency in 
> creating files.
> How to reproduce?
> This is not easy to reproduce, but you can try to set a smaller checkpoint 
> timeout and increase the parallelism of the flink job.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28984) FsCheckpointStateOutputStream is not being released normally

2022-08-18 Thread ChangjiGuo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17581190#comment-17581190
 ] 

ChangjiGuo commented on FLINK-28984:


Hi [~yunta], I have verified on Flink-1.15.1.

My fix is to check if _FsCheckpointStateOutputStream_ has been closed after 
creating the output stream and clean up if closed. It works well and without 
the above logs.

Can you take a look if you have time? Thx.

> FsCheckpointStateOutputStream is not being released normally
> 
>
> Key: FLINK-28984
> URL: https://issues.apache.org/jira/browse/FLINK-28984
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.11.6, 1.15.1
>Reporter: ChangjiGuo
>Priority: Major
> Attachments: log.png
>
>
> If the checkpoint is aborted, AsyncSnapshotCallable will close the 
> snapshotCloseableRegistry when it is canceled. There may be two situations 
> here:
>  # The FSDataOutputStream has been created and closed while closing 
> FsCheckpointStateOutputStream.
>  # The FSDataOutputStream has not been created yet, but closed flag has been 
> set to true. You can see this in log:
> {code:java}
> 2022-08-16 12:55:44,161 WARN  
> org.apache.flink.core.fs.SafetyNetCloseableRegistry   - Closing 
> unclosed resource via safety-net: 
> ClosingFSDataOutputStream(org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream@4ebe8e64)
>  : 
> x/flink/checkpoint/state/9214a2e302904b14baf2dc1aacbc7933/ae157c5a05a8922a46a179cdb4c86b10/shared/9d8a1e92-2f69-4ab0-8ce9-c1beb149229a
>  {code}
>         The output stream will be automatically closed by the 
> SafetyNetCloseableRegistry but the file will not be deleted.
> The second case usually occurs when the storage system has high latency in 
> creating files.
> How to reproduce?
> This is not easy to reproduce, but you can try to set a smaller checkpoint 
> timeout and increase the parallelism of the flink job.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-28927) Can not clean up the uploaded shared files when the checkpoint fails

2022-08-17 Thread ChangjiGuo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17580261#comment-17580261
 ] 

ChangjiGuo edited comment on FLINK-28927 at 8/18/22 3:46 AM:
-

[~yunta] Thanks for your reply! I sorted out my thoughts, we can clean up 
uploaded files in two ways:
 # For being interrupted or getting a unexpected exception while waiting for 
the future to complete, we can catch exception, and then set a callback for 
each future and discard the stream state handle(as mentioned above).
 # For uninterrupted case, that is, the files have been uploaded, but the 
AsyncSnapshotTask is still running when canceled. First, we need to collect all 
uploaded files, which can be cleaned up according to whether the 
AsyncSnapshotTask is canceled. But there is a uncertainty problem that we 
cannot ensure the order of getting value of {_}isCancelled(){_}(if true, we 
will delete these files) and calling {_}stateFuture.cancel(true){_}, so we must 
clean up state after future is really done. The solution that comes to my mind 
is that we can discard state at _AsyncCheckpointRunnable#run_ in finally block 
according to whether _asyncCheckpointState_ is 
{_}AsyncCheckpointState.DISCARDED{_}.

I pasted part of the code of the _RocksDBIncrementalSnapshotOperation#get_ as 
follows:
{code:java}
@Override
public SnapshotResult<KeyedStateHandle> get(CloseableRegistry snapshotCloseableRegistry)
        throws Exception {

    boolean completed = false;

    // ...

    final Map<StateHandleID, StreamStateHandle> sstFiles = new HashMap<>();
    // Handles to the misc files in the current snapshot will go here
    final Map<StateHandleID, StreamStateHandle> miscFiles = new HashMap<>();

    try {
        // ...

        uploadSstFiles(sstFiles, miscFiles, snapshotCloseableRegistry);

        // ...

        completed = true;

        return snapshotResult;
    } finally {
        if (!completed) {
            final List<StateObject> statesToDiscard =
                    new ArrayList<>(1 + miscFiles.size() + sstFiles.size());
            statesToDiscard.add(metaStateHandle);
            statesToDiscard.addAll(miscFiles.values());
            statesToDiscard.addAll(sstFiles.values());
            cleanupIncompleteSnapshot(statesToDiscard);
        }
    }
{code}
Here are some cases I can think of:
 # Interrupted while uploading sst files, and no misc file uploaded yet, the 
first way can clean up uploaded sst files and delete the file while waiting for 
the unfinished future to complete.
 # Interrupted while uploading misc files, _sstFiles_ already contains all 
uploaded sst files and it will be cleaned because of completed is false. The 
first way also can clean up uploaded misc files and delete the file while 
waiting for the unfinished future to complete too.
 # Both sst files and misc files have been uploaded, but the AsyncSnapshotTasks 
is cancelled(can't actually be interrupted), the future will return normally. 
The second way can clean up all state files.

I'm Looking forward for your reply! Thx.
 
 


was (Author: changjiguo):
[~yunta] Thanks for your reply! I sorted out my thoughts, we can clean up 
uploaded files in two ways:
 # For being interrupted or getting a unexpected exception while waiting for 
the future to complete, we can catch exception, and then set a callback for 
each future and discard the stream state handle(as mentioned above).
 # For uninterrupted case, that is, the files have been uploaded, but the 
AsyncSnapshotTask is still running when canceled. First, we need to collect all 
uploaded files, which can be cleaned up according to whether the 
AsyncSnapshotTask is canceled. But there is a key problem that we cannot ensure 
the order of getting value of _isCancelled()_ and calling 
{_}stateFuture.cancel(true){_}, so we must clean up state after future is done. 
In my opinion, we can discard state at _AsyncCheckpointRunnable#run_ in finally 
block according to whether _asyncCheckpointState_ is 
{_}AsyncCheckpointState.DISCARDED{_}.

I pasted part of the code of the _RocksDBIncrementalSnapshotOperation#get_ as 
follows:
{code:java}
@Override
public SnapshotResult get(CloseableRegistry 
snapshotCloseableRegistry)
throws Exception {

boolean completed = false;


final Map sstFiles = new HashMap<>();
// Handles to the misc files in the current snapshot will go here
final Map miscFiles = new HashMap<>();

try {



uploadSstFiles(sstFiles, miscFiles, snapshotCloseableRegistry);




completed = true;

return snapshotResult;
} finally {
if (!completed) {
final List statesToDiscard =
new ArrayList<>(1 + miscFiles.size() + sstFiles.size());
statesToDiscard.add(metaStateHandle);
statesToDiscard.addAll(miscFiles.values());
statesToDiscard.addAll(sstFiles.values());

[jira] [Comment Edited] (FLINK-28927) Can not clean up the uploaded shared files when the checkpoint fails

2022-08-17 Thread ChangjiGuo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17580261#comment-17580261
 ] 

ChangjiGuo edited comment on FLINK-28927 at 8/18/22 3:36 AM:
-

[~yunta] Thanks for your reply! I sorted out my thoughts, we can clean up 
uploaded files in two ways:
 # For being interrupted or getting a unexpected exception while waiting for 
the future to complete, we can catch exception, and then set a callback for 
each future and discard the stream state handle(as mentioned above).
 # For uninterrupted case, that is, the files have been uploaded, but the 
AsyncSnapshotTask is still running when canceled. First, we need to collect all 
uploaded files, which can be cleaned up according to whether the 
AsyncSnapshotTask is canceled. But there is a key problem that we cannot ensure 
the order of getting value of _isCancelled()_ and calling 
{_}stateFuture.cancel(true){_}, so we must clean up state after future is done. 
In my opinion, we can discard state at _AsyncCheckpointRunnable#run_ in finally 
block according to whether _asyncCheckpointState_ is 
{_}AsyncCheckpointState.DISCARDED{_}.

I pasted part of the code of the _RocksDBIncrementalSnapshotOperation#get_ as 
follows:
{code:java}
@Override
public SnapshotResult<KeyedStateHandle> get(CloseableRegistry snapshotCloseableRegistry)
        throws Exception {

    boolean completed = false;

    // ...

    final Map<StateHandleID, StreamStateHandle> sstFiles = new HashMap<>();
    // Handles to the misc files in the current snapshot will go here
    final Map<StateHandleID, StreamStateHandle> miscFiles = new HashMap<>();

    try {
        // ...

        uploadSstFiles(sstFiles, miscFiles, snapshotCloseableRegistry);

        // ...

        completed = true;

        return snapshotResult;
    } finally {
        if (!completed) {
            final List<StateObject> statesToDiscard =
                    new ArrayList<>(1 + miscFiles.size() + sstFiles.size());
            statesToDiscard.add(metaStateHandle);
            statesToDiscard.addAll(miscFiles.values());
            statesToDiscard.addAll(sstFiles.values());
            cleanupIncompleteSnapshot(statesToDiscard);
        }
    }
{code}
Here are some cases I can think of:
 # Interrupted while uploading sst files, and no misc file uploaded yet, the 
first way can clean up uploaded sst files and delete the file while waiting for 
the unfinished future to complete.
 # Interrupted while uploading misc files, _sstFiles_ already contains all 
uploaded sst files and it will be cleaned because of completed is false. The 
first way also can clean up uploaded misc files and delete the file while 
waiting for the unfinished future to complete too.
 # Both sst files and misc files have been uploaded, but the AsyncSnapshotTasks 
is cancelled(can't actually be interrupted), the future will return normally. 
The second way can clean up all state files.

I'm Looking forward for your reply! Thx.
 
 


was (Author: changjiguo):
[~yunta] Thanks for your reply! I sorted out my thoughts, we can clean up 
uploaded files in two ways:
 # For being interrupted or getting a unexpected exception while waiting for 
the future to complete, we can catch exception, and then set a callback for 
each future and discard the stream state handle(as mentioned above).
 # For uninterrupted case, that is, the files have been uploaded, but the 
AsyncSnapshotTask is still running when canceled. First, we need to collect all 
uploaded files, which can be cleaned up according to whether the 
AsyncSnapshotTask is canceled. But there is a key problem that we cannot ensure 
the order of getting value of _isCancelled()_ and calling 
{_}stateFuture.cancel(true){_}, so we must clean up state after future is done. 
In my opinion, we could discard state at _AsyncCheckpointRunnable#run_ in 
finally block according to whether _asyncCheckpointState_ is 
{_}AsyncCheckpointState.DISCARDED{_}.

I pasted part of the code of the _RocksDBIncrementalSnapshotOperation#get_ is 
as follows:
{code:java}
@Override
public SnapshotResult get(CloseableRegistry 
snapshotCloseableRegistry)
throws Exception {

boolean completed = false;


final Map sstFiles = new HashMap<>();
// Handles to the misc files in the current snapshot will go here
final Map miscFiles = new HashMap<>();

try {



uploadSstFiles(sstFiles, miscFiles, snapshotCloseableRegistry);




completed = true;

return snapshotResult;
} finally {
if (!completed) {
final List statesToDiscard =
new ArrayList<>(1 + miscFiles.size() + sstFiles.size());
statesToDiscard.add(metaStateHandle);
statesToDiscard.addAll(miscFiles.values());
statesToDiscard.addAll(sstFiles.values());
cleanupIncompleteSnapshot(statesToDiscard);
}
}{code}
Here are some cases I can 

[jira] [Comment Edited] (FLINK-28927) Can not clean up the uploaded shared files when the checkpoint fails

2022-08-17 Thread ChangjiGuo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17580261#comment-17580261
 ] 

ChangjiGuo edited comment on FLINK-28927 at 8/18/22 3:31 AM:
-

[~yunta] Thanks for your reply! I sorted out my thoughts, we can clean up 
uploaded files in two ways:
 # For being interrupted or getting a unexpected exception while waiting for 
the future to complete, we can catch exception, and then set a callback for 
each future and discard the stream state handle(as mentioned above).
 # For uninterrupted case, that is, the files have been uploaded, but the 
AsyncSnapshotTask is still running when canceled. First, we need to collect all 
uploaded files, which can be cleaned up according to whether the 
AsyncSnapshotTask is canceled. But there is a key problem that we cannot ensure 
the order of getting value of _isCancelled()_ and calling 
{_}stateFuture.cancel(true){_}, so we must clean up state after future is done. 
In my opinion, we could discard state at _AsyncCheckpointRunnable#run_ in 
finally block according to whether _asyncCheckpointState_ is 
{_}AsyncCheckpointState.DISCARDED{_}.

I pasted part of the code of the _RocksDBIncrementalSnapshotOperation#get_ is 
as follows:
{code:java}
@Override
public SnapshotResult<KeyedStateHandle> get(CloseableRegistry snapshotCloseableRegistry)
        throws Exception {

    boolean completed = false;

    // ...

    final Map<StateHandleID, StreamStateHandle> sstFiles = new HashMap<>();
    // Handles to the misc files in the current snapshot will go here
    final Map<StateHandleID, StreamStateHandle> miscFiles = new HashMap<>();

    try {
        // ...

        uploadSstFiles(sstFiles, miscFiles, snapshotCloseableRegistry);

        // ...

        completed = true;

        return snapshotResult;
    } finally {
        if (!completed) {
            final List<StateObject> statesToDiscard =
                    new ArrayList<>(1 + miscFiles.size() + sstFiles.size());
            statesToDiscard.add(metaStateHandle);
            statesToDiscard.addAll(miscFiles.values());
            statesToDiscard.addAll(sstFiles.values());
            cleanupIncompleteSnapshot(statesToDiscard);
        }
    }
{code}
Here are some cases I can think of:
 # Interrupted while uploading sst files, and no misc file uploaded yet, the 
first way can clean up uploaded sst files and delete the file while waiting for 
the unfinished future to complete.
 # Interrupted while uploading misc files, _sstFiles_ already contains all 
uploaded sst files and it will be cleaned because of completed is false. The 
first way also can clean up uploaded misc files and delete the file while 
waiting for the unfinished future to complete.
 # Both sst files and misc files have been uploaded, but the future is 
cancelled(can't actually be interrupted), the future will return normally. The 
second way can clean up all states.

Looking forward to your reply! Thx.
 


was (Author: changjiguo):
[~yunta] Thanks for your reply! I sorted out my thoughts, we can clean up 
uploaded files in two ways:
 # For being interrupted or getting a unexpected exception while waiting for 
the future to complete, we can catch exception, and then set a callback for 
each future and discard the stream state handle(as mentioned above).
 # For uninterrupted case, that is, the files have been uploaded, but the 
AsyncSnapshotTask is still running when canceled. First, we need to collect all 
uploaded files, which can be cleaned up according to whether the 
AsyncSnapshotTask is canceled. But there is a key problem that we cannot ensure 
the order of getting value of _isCancelled()_ and calling 
{_}stateFuture.cancel(true){_}, so we must clean up state after future is done. 
In my opinion, we could discard state at _AsyncCheckpointRunnable#run_ in 
finally block according to whether _asyncCheckpointState_ is 
{_}AsyncCheckpointState.DISCARDED{_}.

I pasted part of the code of the _RocksDBIncrementalSnapshotOperation#get_ is 
as follows:
{code:java}
@Override
public SnapshotResult get(CloseableRegistry 
snapshotCloseableRegistry)
throws Exception {

boolean completed = false;


final Map sstFiles = new HashMap<>();
// Handles to the misc files in the current snapshot will go here
final Map miscFiles = new HashMap<>();

try {



uploadSstFiles(sstFiles, miscFiles, snapshotCloseableRegistry);




completed = true;

return snapshotResult;
} finally {
if (!completed) {
final List statesToDiscard =
new ArrayList<>(1 + miscFiles.size() + sstFiles.size());
statesToDiscard.add(metaStateHandle);
statesToDiscard.addAll(miscFiles.values());
statesToDiscard.addAll(sstFiles.values());
cleanupIncompleteSnapshot(statesToDiscard);
}
}{code}
Here are some cases I can think of:
 # Interrupted 

[jira] [Updated] (FLINK-28927) Can not clean up the uploaded shared files when the checkpoint fails

2022-08-17 Thread ChangjiGuo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ChangjiGuo updated FLINK-28927:
---
Description: 
If a checkpoint times out, the task will cancel all snapshot futures and do 
some cleanup work, including the following:
 * Cancel all AsyncSnapshotTasks.
 * If the future has finished, it will clean up all state objects.
 * If the future has not completed, it will be interrupted (maybe).
 * Close snapshotCloseableRegistry.

In my case, the thread was interrupted while waiting for the file upload to 
complete, but the file was not cleaned up.

RocksDBStateUploader.java
{code:java}
FutureUtils.waitForAll(futures.values()).get();
{code}
It waits here for all files to be uploaded. Even if the wait is interrupted, the 
already uploaded files are not cleaned up. The remaining files mainly fall into 
two groups:
 * Files that finished uploading before the thread was canceled.
 * Files for which outputStream.closeAndGetHandle() was called but 
snapshotCloseableRegistry had not yet been closed.

How to reproduce?
Shorten the checkpoint timeout so that the checkpoint fails, then check whether 
any files remain in the shared directory.

I'm testing on Flink-1.11, but I found that the code on the latest branch may 
have the same problem. I tried to fix it.
 
 

  was:
If a checkpoint times out, the task will cancel the AsyncCheckpointRunnable 
thread and do some cleanup work, including the following:
 * Cancel all AsyncSnapshotTasks.
 * If the future has finished, it will clean up all state object.
 * If the future has not completed, it will be interrupted(maybe).
 * Close snapshotCloseableRegistry.

In my case, the thread was interrupted while waiting for the file upload to 
complete, but the file was not cleaned up.

RocksDBStateUploader.java
{code:java}
FutureUtils.waitForAll(futures.values()).get();
{code}
It will wait for all files to be uploaded here. Although it has been 
interrupted, the uploaded files will not be cleaned up. The remaining files are 
mainly divided into:
 * Files that have finished uploading before the thread is canceled.
 * outputStream.closeAndGetHandle() is called, but snapshotCloseableRegistry 
has not been closed.

How to reproduce?
Shorten the checkpoint timeout time, making the checkpoint fail. Then check if 
there are any files in the shared directory.

I'm testing on Flink-1.11, but I found the code from the latest branch may have 
the same problem. I tried to fix it.
 
 


> Can not clean up the uploaded shared files when the checkpoint fails
> 
>
> Key: FLINK-28927
> URL: https://issues.apache.org/jira/browse/FLINK-28927
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.11.6, 1.15.1
> Environment: Flink-1.11
>Reporter: ChangjiGuo
>Assignee: ChangjiGuo
>Priority: Major
>
> If a checkpoint times out, the task will cancel all snapshot futures and do 
> some cleanup work, including the following:
>  * Cancel all AsyncSnapshotTasks.
>  * If the future has finished, it will clean up all state object.
>  * If the future has not completed, it will be interrupted(maybe).
>  * Close snapshotCloseableRegistry.
> In my case, the thread was interrupted while waiting for the file upload to 
> complete, but the file was not cleaned up.
> RocksDBStateUploader.java
> {code:java}
> FutureUtils.waitForAll(futures.values()).get();
> {code}
> It will wait for all files to be uploaded here. Although it has been 
> interrupted, the uploaded files will not be cleaned up. The remaining files 
> are mainly divided into:
>  * Files that have finished uploading before the thread is canceled.
>  * outputStream.closeAndGetHandle() is called, but snapshotCloseableRegistry 
> has not been closed.
> How to reproduce?
> Shorten the checkpoint timeout time, making the checkpoint fail. Then check 
> if there are any files in the shared directory.
> I'm testing on Flink-1.11, but I found the code from the latest branch may 
> have the same problem. I tried to fix it.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-28984) FsCheckpointStateOutputStream is not being released normally

2022-08-16 Thread ChangjiGuo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17580099#comment-17580099
 ] 

ChangjiGuo edited comment on FLINK-28984 at 8/17/22 3:06 AM:
-

[~yunta]  I'm sorry, maybe I didn't express it clearly. The prerequisite here is 
that snapshotCloseableRegistry will be closed, and closing it calls close() on 
every registered closeable. _FsCheckpointStateOutputStream#createStream_ and 
_FsCheckpointStateOutputStream#close_ can therefore run at the same time, so it 
is possible that the FSDataOutputStream has not been created yet when close() is 
called (at that moment, outStream is still null).

To verify this conjecture, I added logging both where closed is set to true and 
where the stream is returned; see below:

!log.png|width=741,height=304!
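
To make the interleaving concrete, here is a simplified model of the two methods 
involved (field and method names only loosely follow the real 
_FsCheckpointStateOutputStream_; this is not the actual Flink code):
{code:java}
// Snapshot thread: flushToFile() -> creates the file (slow on a laggy storage system).
// Cancel thread:   close()       -> only sees outStream == null, so it cleans up nothing.
class SimplifiedStateOutputStream {
    private final FileSystem fs;
    private final Path statePath;
    private volatile boolean closed;
    private volatile FSDataOutputStream outStream; // null until the file is created

    SimplifiedStateOutputStream(FileSystem fs, Path statePath) {
        this.fs = fs;
        this.statePath = statePath;
    }

    void flushToFile() throws IOException {
        if (outStream == null) {
            // close() may run to completion in this window; the stream created
            // here is then only registered with the safety net.
            outStream = fs.create(statePath, FileSystem.WriteMode.NO_OVERWRITE);
        }
        // ... write buffered data ...
    }

    void close() throws IOException {
        if (!closed) {
            closed = true;
            if (outStream != null) { // false in the race: the file is never deleted
                try {
                    outStream.close();
                } finally {
                    fs.delete(statePath, false);
                }
            }
        }
    }
}
{code}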
 


was (Author: changjiguo):
[~yunta]  I'm sorry, maybe I didn't express it clearly. There is a prerequisite 
here is to close the snapshotCloseableRegistry, and the registered closabe will 
call the close method. Both _FsCheckpointStateOutputStream#createStream_ and 
_FsCheckpointStateOutputStream#close_ can be called at the same time. It is 
possible that the FSDataOutputStream has not been created when the close 
called(at this time, outStream is null).

In order to verify this conjecture, I printed the log at both setting closed = 
true and returning stream, as follows:

!log.png|width=741,height=304!

> FsCheckpointStateOutputStream is not being released normally
> 
>
> Key: FLINK-28984
> URL: https://issues.apache.org/jira/browse/FLINK-28984
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.11.6, 1.15.1
>Reporter: ChangjiGuo
>Priority: Major
> Attachments: log.png
>
>
> If the checkpoint is aborted, AsyncSnapshotCallable will close the 
> snapshotCloseableRegistry when it is canceled. There may be two situations 
> here:
>  # The FSDataOutputStream has been created and closed while closing 
> FsCheckpointStateOutputStream.
>  # The FSDataOutputStream has not been created yet, but closed flag has been 
> set to true. You can see this in log:
> {code:java}
> 2022-08-16 12:55:44,161 WARN  
> org.apache.flink.core.fs.SafetyNetCloseableRegistry   - Closing 
> unclosed resource via safety-net: 
> ClosingFSDataOutputStream(org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream@4ebe8e64)
>  : 
> x/flink/checkpoint/state/9214a2e302904b14baf2dc1aacbc7933/ae157c5a05a8922a46a179cdb4c86b10/shared/9d8a1e92-2f69-4ab0-8ce9-c1beb149229a
>  {code}
>         The output stream will be automatically closed by the 
> SafetyNetCloseableRegistry but the file will not be deleted.
> The second case usually occurs when the storage system has high latency in 
> creating files.
> How to reproduce?
> This is not easy to reproduce, but you can try to set a smaller checkpoint 
> timeout and increase the parallelism of the flink job.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-28927) Can not clean up the uploaded shared files when the checkpoint fails

2022-08-16 Thread ChangjiGuo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ChangjiGuo updated FLINK-28927:
---
Description: 
If a checkpoint times out, the task will cancel the AsyncCheckpointRunnable 
thread and do some cleanup work, including the following:
 * Cancel all AsyncSnapshotTasks.
 * If the future has finished, it will clean up all state object.
 * If the future has not completed, it will be interrupted(maybe).
 * Close snapshotCloseableRegistry.

In my case, the thread was interrupted while waiting for the file upload to 
complete, but the file was not cleaned up.

RocksDBStateUploader.java
{code:java}
FutureUtils.waitForAll(futures.values()).get();
{code}
It will wait for all files to be uploaded here. Although it has been 
interrupted, the uploaded files will not be cleaned up. The remaining files are 
mainly divided into:
 * Files that have finished uploading before the thread is canceled.
 * outputStream.closeAndGetHandle() is called, but snapshotCloseableRegistry 
has not been closed.

How to reproduce?
Shorten the checkpoint timeout time, making the checkpoint fail. Then check if 
there are any files in the shared directory.

I'm testing on Flink-1.11, but I found the code from the latest branch may have 
the same problem. I tried to fix it.
 
 

  was:
If a checkpoint times out, the task will cancel the AsyncCheckpointRunnable 
thread and do some cleanup work, including the following:
 * Cancel all AsyncSnapshotCallable thread.
 * If the thread has finished, it will clean up all state object.
 * If the thread has not completed, it will be interrupted(maybe).
 * Close snapshotCloseableRegistry.

In my case, the thread was interrupted while waiting for the file upload to 
complete, but the file was not cleaned up.

RocksDBStateUploader.java
{code:java}
FutureUtils.waitForAll(futures.values()).get();
{code}
It will wait for all files to be uploaded here. Although it has been 
interrupted, the uploaded files will not be cleaned up. The remaining files are 
mainly divided into:
 * Files that have finished uploading before the thread is canceled.
 * outputStream.closeAndGetHandle() is called, but snapshotCloseableRegistry 
has not been closed.

How to reproduce?
Shorten the checkpoint timeout time, making the checkpoint fail. Then check if 
there are any files in the shared directory.

I'm testing on Flink-1.11, but I found the code from the latest branch may have 
the same problem. I tried to fix it.
 


> Can not clean up the uploaded shared files when the checkpoint fails
> 
>
> Key: FLINK-28927
> URL: https://issues.apache.org/jira/browse/FLINK-28927
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.11.6, 1.15.1
> Environment: Flink-1.11
>Reporter: ChangjiGuo
>Assignee: ChangjiGuo
>Priority: Major
>
> If a checkpoint times out, the task will cancel the AsyncCheckpointRunnable 
> thread and do some cleanup work, including the following:
>  * Cancel all AsyncSnapshotTasks.
>  * If the future has finished, it will clean up all state object.
>  * If the future has not completed, it will be interrupted(maybe).
>  * Close snapshotCloseableRegistry.
> In my case, the thread was interrupted while waiting for the file upload to 
> complete, but the file was not cleaned up.
> RocksDBStateUploader.java
> {code:java}
> FutureUtils.waitForAll(futures.values()).get();
> {code}
> It will wait for all files to be uploaded here. Although it has been 
> interrupted, the uploaded files will not be cleaned up. The remaining files 
> are mainly divided into:
>  * Files that have finished uploading before the thread is canceled.
>  * outputStream.closeAndGetHandle() is called, but snapshotCloseableRegistry 
> has not been closed.
> How to reproduce?
> Shorten the checkpoint timeout time, making the checkpoint fail. Then check 
> if there are any files in the shared directory.
> I'm testing on Flink-1.11, but I found the code from the latest branch may 
> have the same problem. I tried to fix it.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28927) Can not clean up the uploaded shared files when the checkpoint fails

2022-08-16 Thread ChangjiGuo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17580261#comment-17580261
 ] 

ChangjiGuo commented on FLINK-28927:


[~yunta] Thanks for your reply! I sorted out my thoughts, we can clean up 
uploaded files in two ways:
 # For being interrupted or getting a unexpected exception while waiting for 
the future to complete, we can catch exception, and then set a callback for 
each future and discard the stream state handle(as mentioned above).
 # For uninterrupted case, that is, the files have been uploaded, but the 
AsyncSnapshotTask is still running when canceled. First, we need to collect all 
uploaded files, which can be cleaned up according to whether the 
AsyncSnapshotTask is canceled. But there is a key problem that we cannot ensure 
the order of getting value of _isCancelled()_ and calling 
{_}stateFuture.cancel(true){_}, so we must clean up state after future is done. 
In my opinion, we could discard state at _AsyncCheckpointRunnable#run_ in 
finally block according to whether _asyncCheckpointState_ is 
{_}AsyncCheckpointState.DISCARDED{_}.

I pasted part of the code of the _RocksDBIncrementalSnapshotOperation#get_ is 
as follows:
{code:java}
@Override
public SnapshotResult<KeyedStateHandle> get(CloseableRegistry snapshotCloseableRegistry)
        throws Exception {

    boolean completed = false;

    // ...

    final Map<StateHandleID, StreamStateHandle> sstFiles = new HashMap<>();
    // Handles to the misc files in the current snapshot will go here
    final Map<StateHandleID, StreamStateHandle> miscFiles = new HashMap<>();

    try {
        // ...

        uploadSstFiles(sstFiles, miscFiles, snapshotCloseableRegistry);

        // ...

        completed = true;

        return snapshotResult;
    } finally {
        if (!completed) {
            final List<StateObject> statesToDiscard =
                    new ArrayList<>(1 + miscFiles.size() + sstFiles.size());
            statesToDiscard.add(metaStateHandle);
            statesToDiscard.addAll(miscFiles.values());
            statesToDiscard.addAll(sstFiles.values());
            cleanupIncompleteSnapshot(statesToDiscard);
        }
    }
{code}
Here are some cases I can think of:
 # Interrupted while uploading sst files, and no misc file uploaded yet, the 
first way can clean up uploaded sst files.
 # Interrupted while uploading misc files, _sstFiles_ already contains all 
uploaded sst files and it will be cleaned because of completed is false. The 
first way also can clean up uploaded misc files.
 # Both sst files and misc files have been uploaded, but the future is 
cancelled(can't actually be interrupted), the future will return normally. The 
second way can clean up all states.

Looking forward to your reply! Thx.

> Can not clean up the uploaded shared files when the checkpoint fails
> 
>
> Key: FLINK-28927
> URL: https://issues.apache.org/jira/browse/FLINK-28927
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.11.6, 1.15.1
> Environment: Flink-1.11
>Reporter: ChangjiGuo
>Assignee: ChangjiGuo
>Priority: Major
>
> If a checkpoint times out, the task will cancel the AsyncCheckpointRunnable 
> thread and do some cleanup work, including the following:
>  * Cancel all AsyncSnapshotCallable thread.
>  * If the thread has finished, it will clean up all state object.
>  * If the thread has not completed, it will be interrupted(maybe).
>  * Close snapshotCloseableRegistry.
> In my case, the thread was interrupted while waiting for the file upload to 
> complete, but the file was not cleaned up.
> RocksDBStateUploader.java
> {code:java}
> FutureUtils.waitForAll(futures.values()).get();
> {code}
> It will wait for all files to be uploaded here. Although it has been 
> interrupted, the uploaded files will not be cleaned up. The remaining files 
> are mainly divided into:
>  * Files that have finished uploading before the thread is canceled.
>  * outputStream.closeAndGetHandle() is called, but snapshotCloseableRegistry 
> has not been closed.
> How to reproduce?
> Shorten the checkpoint timeout time, making the checkpoint fail. Then check 
> if there are any files in the shared directory.
> I'm testing on Flink-1.11, but I found the code from the latest branch may 
> have the same problem. I tried to fix it.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-28927) Can not clean up the uploaded shared files when the checkpoint fails

2022-08-16 Thread ChangjiGuo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ChangjiGuo updated FLINK-28927:
---
Description: 
If a checkpoint times out, the task will cancel the AsyncCheckpointRunnable 
thread and do some cleanup work, including the following:
 * Cancel all AsyncSnapshotCallable thread.
 * If the thread has finished, it will clean up all state object.
 * If the thread has not completed, it will be interrupted(maybe).
 * Close snapshotCloseableRegistry.

In my case, the thread was interrupted while waiting for the file upload to 
complete, but the file was not cleaned up.

RocksDBStateUploader.java
{code:java}
FutureUtils.waitForAll(futures.values()).get();
{code}
It will wait for all files to be uploaded here. Although it has been 
interrupted, the uploaded files will not be cleaned up. The remaining files are 
mainly divided into:
 * Files that have finished uploading before the thread is canceled.
 * outputStream.closeAndGetHandle() is called, but snapshotCloseableRegistry 
has not been closed.

How to reproduce?
Shorten the checkpoint timeout time, making the checkpoint fail. Then check if 
there are any files in the shared directory.

I'm testing on Flink-1.11, but I found the code from the latest branch may have 
the same problem. I tried to fix it.
 

  was:
If a checkpoint times out, the task will cancel the AsyncCheckpointRunnable 
thread and do some cleanup work, including the following:
 * Cancel all AsyncSnapshotCallable thread.
 * If the thread has finished, it will clean up all state object.
 * If the thread has not completed, it will be interrupted(maybe).
 * Close snapshotCloseableRegistry.

In my case, the thread was interrupted while waiting for the file upload to 
complete, but the file was not cleaned up.

RocksDBStateUploader.java
{code:java}
FutureUtils.waitForAll(futures.values()).get();
{code}
It will wait for all files to be uploaded here. Although it has been 
interrupted, the uploaded files will not be cleaned up. The remaining files are 
mainly divided into:
 * Files that have finished uploading before the thread is canceled.
 * outputStream.closeAndGetHandle() is called, but snapshotCloseableRegistry 
has not been closed.

How to reproduce?
Shorten the checkpoint timeout time, making the checkpoint fail. Then check if 
there are any files in the shared directory.

I'm testing on Flink-1.11, but I found the code from the latest branch may have 
the same problem. I tried to fix it and it works well so far.


> Can not clean up the uploaded shared files when the checkpoint fails
> 
>
> Key: FLINK-28927
> URL: https://issues.apache.org/jira/browse/FLINK-28927
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.11.6, 1.15.1
> Environment: Flink-1.11
>Reporter: ChangjiGuo
>Assignee: ChangjiGuo
>Priority: Major
>
> If a checkpoint times out, the task will cancel the AsyncCheckpointRunnable 
> thread and do some cleanup work, including the following:
>  * Cancel all AsyncSnapshotCallable thread.
>  * If the thread has finished, it will clean up all state object.
>  * If the thread has not completed, it will be interrupted(maybe).
>  * Close snapshotCloseableRegistry.
> In my case, the thread was interrupted while waiting for the file upload to 
> complete, but the file was not cleaned up.
> RocksDBStateUploader.java
> {code:java}
> FutureUtils.waitForAll(futures.values()).get();
> {code}
> It will wait for all files to be uploaded here. Although it has been 
> interrupted, the uploaded files will not be cleaned up. The remaining files 
> are mainly divided into:
>  * Files that have finished uploading before the thread is canceled.
>  * outputStream.closeAndGetHandle() is called, but snapshotCloseableRegistry 
> has not been closed.
> How to reproduce?
> Shorten the checkpoint timeout time, making the checkpoint fail. Then check 
> if there are any files in the shared directory.
> I'm testing on Flink-1.11, but I found the code from the latest branch may 
> have the same problem. I tried to fix it.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28984) FsCheckpointStateOutputStream is not being released normally

2022-08-16 Thread ChangjiGuo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17580099#comment-17580099
 ] 

ChangjiGuo commented on FLINK-28984:


[~yunta] I'm sorry, maybe I didn't express it clearly. The prerequisite here is 
that the snapshotCloseableRegistry gets closed, which in turn calls close() on 
every registered closeable. _FsCheckpointStateOutputStream#createStream_ and 
_FsCheckpointStateOutputStream#close_ can be invoked concurrently, so it is 
possible that the FSDataOutputStream has not been created yet when close() is 
called (at that point, outStream is still null).

In order to verify this conjecture, I added log statements both where closed is 
set to true and where the stream is returned, as follows:

!log.png|width=741,height=304!
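For readers following along, a minimal sketch of that race with simplified, hypothetical names (this is not the actual FsCheckpointStateOutputStream code):
{code:java}
// Simplified sketch of the closed/outStream race described above.
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;

class StateOutputStreamSketch {
    private final Path target;
    private volatile OutputStream outStream;  // created lazily, like createStream()
    private volatile boolean closed;

    StateOutputStreamSketch(Path target) {
        this.target = target;
    }

    void write(byte[] data) throws IOException {
        if (closed) {
            throw new IOException("stream is closed");
        }
        // A concurrent close() can run right after the check above: it sees
        // outStream == null, deletes nothing, and only sets closed = true.
        if (outStream == null) {
            outStream = Files.newOutputStream(target);  // the file is created anyway
        }
        outStream.write(data);
    }

    void close() throws IOException {
        closed = true;
        OutputStream stream = outStream;
        if (stream != null) {
            stream.close();
            Files.deleteIfExists(target);  // cleanup only covers an already-created stream
        }
        // If outStream was still null here, the file created later by write() is never
        // deleted; only the SafetyNetCloseableRegistry closes the stream, and the file leaks.
    }
}
{code}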

> FsCheckpointStateOutputStream is not being released normally
> 
>
> Key: FLINK-28984
> URL: https://issues.apache.org/jira/browse/FLINK-28984
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.11.6, 1.15.1
>Reporter: ChangjiGuo
>Priority: Major
> Attachments: log.png
>
>
> If the checkpoint is aborted, AsyncSnapshotCallable will close the 
> snapshotCloseableRegistry when it is canceled. There may be two situations 
> here:
>  # The FSDataOutputStream has been created and closed while closing 
> FsCheckpointStateOutputStream.
>  # The FSDataOutputStream has not been created yet, but closed flag has been 
> set to true. You can see this in log:
> {code:java}
> 2022-08-16 12:55:44,161 WARN  
> org.apache.flink.core.fs.SafetyNetCloseableRegistry   - Closing 
> unclosed resource via safety-net: 
> ClosingFSDataOutputStream(org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream@4ebe8e64)
>  : 
> x/flink/checkpoint/state/9214a2e302904b14baf2dc1aacbc7933/ae157c5a05a8922a46a179cdb4c86b10/shared/9d8a1e92-2f69-4ab0-8ce9-c1beb149229a
>  {code}
>         The output stream will be automatically closed by the 
> SafetyNetCloseableRegistry but the file will not be deleted.
> The second case usually occurs when the storage system has high latency in 
> creating files.
> How to reproduce?
> This is not easy to reproduce, but you can try to set a smaller checkpoint 
> timeout and increase the parallelism of the flink job.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-28984) FsCheckpointStateOutputStream is not being released normally

2022-08-16 Thread ChangjiGuo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28984?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ChangjiGuo updated FLINK-28984:
---
Attachment: log.png

> FsCheckpointStateOutputStream is not being released normally
> 
>
> Key: FLINK-28984
> URL: https://issues.apache.org/jira/browse/FLINK-28984
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.11.6, 1.15.1
>Reporter: ChangjiGuo
>Priority: Major
> Attachments: log.png
>
>
> If the checkpoint is aborted, AsyncSnapshotCallable will close the 
> snapshotCloseableRegistry when it is canceled. There may be two situations 
> here:
>  # The FSDataOutputStream has been created and closed while closing 
> FsCheckpointStateOutputStream.
>  # The FSDataOutputStream has not been created yet, but closed flag has been 
> set to true. You can see this in log:
> {code:java}
> 2022-08-16 12:55:44,161 WARN  
> org.apache.flink.core.fs.SafetyNetCloseableRegistry   - Closing 
> unclosed resource via safety-net: 
> ClosingFSDataOutputStream(org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream@4ebe8e64)
>  : 
> x/flink/checkpoint/state/9214a2e302904b14baf2dc1aacbc7933/ae157c5a05a8922a46a179cdb4c86b10/shared/9d8a1e92-2f69-4ab0-8ce9-c1beb149229a
>  {code}
>         The output stream will be automatically closed by the 
> SafetyNetCloseableRegistry but the file will not be deleted.
> The second case usually occurs when the storage system has high latency in 
> creating files.
> How to reproduce?
> This is not easy to reproduce, but you can try to set a smaller checkpoint 
> timeout and increase the parallelism of the flink job.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-28927) Can not clean up the uploaded shared files when the checkpoint fails

2022-08-15 Thread ChangjiGuo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17580044#comment-17580044
 ] 

ChangjiGuo edited comment on FLINK-28927 at 8/16/22 5:31 AM:
-

I also found another problem when debugging the code.

See [here|https://issues.apache.org/jira/browse/FLINK-28984]

[~yunta] Can you assign these two issues to me? Thanks!
 
 


was (Author: changjiguo):
I also found another problem when debugging the code.

See [https://issues.apache.org/jira/projects/FLINK/issues/FLINK-28984]

[~yunta] Can you assign these two issues to me? Thanks!
 

> Can not clean up the uploaded shared files when the checkpoint fails
> 
>
> Key: FLINK-28927
> URL: https://issues.apache.org/jira/browse/FLINK-28927
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.11.6, 1.15.1
> Environment: Flink-1.11
>Reporter: ChangjiGuo
>Priority: Major
>
> If a checkpoint times out, the task will cancel the AsyncCheckpointRunnable 
> thread and do some cleanup work, including the following:
>  * Cancel all AsyncSnapshotCallable thread.
>  * If the thread has finished, it will clean up all state object.
>  * If the thread has not completed, it will be interrupted(maybe).
>  * Close snapshotCloseableRegistry.
> In my case, the thread was interrupted while waiting for the file upload to 
> complete, but the file was not cleaned up.
> RocksDBStateUploader.java
> {code:java}
> FutureUtils.waitForAll(futures.values()).get();
> {code}
> It will wait for all files to be uploaded here. Although it has been 
> interrupted, the uploaded files will not be cleaned up. The remaining files 
> are mainly divided into:
>  * Files that have finished uploading before the thread is canceled.
>  * outputStream.closeAndGetHandle() is called, but snapshotCloseableRegistry 
> has not been closed.
> How to reproduce?
> Shorten the checkpoint timeout time, making the checkpoint fail. Then check 
> if there are any files in the shared directory.
> I'm testing on Flink-1.11, but I found the code from the latest branch may 
> have the same problem. I tried to fix it and it works well so far.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-28984) FsCheckpointStateOutputStream is not being released normally

2022-08-15 Thread ChangjiGuo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28984?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ChangjiGuo updated FLINK-28984:
---
Description: 
If the checkpoint is aborted, AsyncSnapshotCallable will close the 
snapshotCloseableRegistry when it is canceled. There may be two situations here:
 # The FSDataOutputStream has been created and closed while closing 
FsCheckpointStateOutputStream.
 # The FSDataOutputStream has not been created yet, but the closed flag has 
already been set to true. You can see this in the log:
{code:java}
2022-08-16 12:55:44,161 WARN  
org.apache.flink.core.fs.SafetyNetCloseableRegistry   - Closing 
unclosed resource via safety-net: 
ClosingFSDataOutputStream(org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream@4ebe8e64)
 : 
x/flink/checkpoint/state/9214a2e302904b14baf2dc1aacbc7933/ae157c5a05a8922a46a179cdb4c86b10/shared/9d8a1e92-2f69-4ab0-8ce9-c1beb149229a
 {code}

        The output stream will be automatically closed by the 
SafetyNetCloseableRegistry but the file will not be deleted.

The second case usually occurs when the storage system has high latency in 
creating files.

How to reproduce?

This is not easy to reproduce, but you can try setting a smaller checkpoint 
timeout and increasing the parallelism of the Flink job.
 

  was:
If the checkpoint is aborted, AsyncSnapshotCallable will close the 
snapshotCloseableRegistry when it is canceled. There may be two situations here:
 # The FSDataOutputStream has been created and closed while closing 
FsCheckpointStateOutputStream.
 # The FSDataOutputStream has not been created yet, but closed flag has been 
set to true. You can see this in log:
{code:java}
2022-08-16 12:55:44,161 WARN  
org.apache.flink.core.fs.SafetyNetCloseableRegistry   - Closing 
unclosed resource via safety-net: 
ClosingFSDataOutputStream(org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream@4ebe8e64)
 : 
x/flink/checkpoint/state/9214a2e302904b14baf2dc1aacbc7933/ae157c5a05a8922a46a179cdb4c86b10/shared/9d8a1e92-2f69-4ab0-8ce9-c1beb149229a
 {code}

        The output stream will be automatically closed by the 
SafetyNetCloseableRegistry but the file will not be deleted.

The second case usually occurs when the storage system has high latency in 
creating files.

 


> FsCheckpointStateOutputStream is not being released normally
> 
>
> Key: FLINK-28984
> URL: https://issues.apache.org/jira/browse/FLINK-28984
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.11.6, 1.15.1
>Reporter: ChangjiGuo
>Priority: Major
>
> If the checkpoint is aborted, AsyncSnapshotCallable will close the 
> snapshotCloseableRegistry when it is canceled. There may be two situations 
> here:
>  # The FSDataOutputStream has been created and closed while closing 
> FsCheckpointStateOutputStream.
>  # The FSDataOutputStream has not been created yet, but closed flag has been 
> set to true. You can see this in log:
> {code:java}
> 2022-08-16 12:55:44,161 WARN  
> org.apache.flink.core.fs.SafetyNetCloseableRegistry   - Closing 
> unclosed resource via safety-net: 
> ClosingFSDataOutputStream(org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream@4ebe8e64)
>  : 
> x/flink/checkpoint/state/9214a2e302904b14baf2dc1aacbc7933/ae157c5a05a8922a46a179cdb4c86b10/shared/9d8a1e92-2f69-4ab0-8ce9-c1beb149229a
>  {code}
>         The output stream will be automatically closed by the 
> SafetyNetCloseableRegistry but the file will not be deleted.
> The second case usually occurs when the storage system has high latency in 
> creating files.
> How to reproduce?
> This is not easy to reproduce, but you can try to set a smaller checkpoint 
> timeout and increase the parallelism of the flink job.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-28984) FsCheckpointStateOutputStream is not being released normally

2022-08-15 Thread ChangjiGuo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28984?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ChangjiGuo updated FLINK-28984:
---
External issue URL: https://issues.apache.org/jira/browse/FLINK-28927

> FsCheckpointStateOutputStream is not being released normally
> 
>
> Key: FLINK-28984
> URL: https://issues.apache.org/jira/browse/FLINK-28984
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.11.6, 1.15.1
>Reporter: ChangjiGuo
>Priority: Major
>
> If the checkpoint is aborted, AsyncSnapshotCallable will close the 
> snapshotCloseableRegistry when it is canceled. There may be two situations 
> here:
>  # The FSDataOutputStream has been created and closed while closing 
> FsCheckpointStateOutputStream.
>  # The FSDataOutputStream has not been created yet, but closed flag has been 
> set to true. You can see this in log:
> {code:java}
> 2022-08-16 12:55:44,161 WARN  
> org.apache.flink.core.fs.SafetyNetCloseableRegistry   - Closing 
> unclosed resource via safety-net: 
> ClosingFSDataOutputStream(org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream@4ebe8e64)
>  : 
> x/flink/checkpoint/state/9214a2e302904b14baf2dc1aacbc7933/ae157c5a05a8922a46a179cdb4c86b10/shared/9d8a1e92-2f69-4ab0-8ce9-c1beb149229a
>  {code}
>         The output stream will be automatically closed by the 
> SafetyNetCloseableRegistry but the file will not be deleted.
> The second case usually occurs when the storage system has high latency in 
> creating files.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-28984) FsCheckpointStateOutputStream is not being released normally

2022-08-15 Thread ChangjiGuo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28984?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ChangjiGuo updated FLINK-28984:
---
External issue URL:   (was: 
https://issues.apache.org/jira/browse/FLINK-28927)

> FsCheckpointStateOutputStream is not being released normally
> 
>
> Key: FLINK-28984
> URL: https://issues.apache.org/jira/browse/FLINK-28984
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.11.6, 1.15.1
>Reporter: ChangjiGuo
>Priority: Major
>
> If the checkpoint is aborted, AsyncSnapshotCallable will close the 
> snapshotCloseableRegistry when it is canceled. There may be two situations 
> here:
>  # The FSDataOutputStream has been created and closed while closing 
> FsCheckpointStateOutputStream.
>  # The FSDataOutputStream has not been created yet, but closed flag has been 
> set to true. You can see this in log:
> {code:java}
> 2022-08-16 12:55:44,161 WARN  
> org.apache.flink.core.fs.SafetyNetCloseableRegistry   - Closing 
> unclosed resource via safety-net: 
> ClosingFSDataOutputStream(org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream@4ebe8e64)
>  : 
> x/flink/checkpoint/state/9214a2e302904b14baf2dc1aacbc7933/ae157c5a05a8922a46a179cdb4c86b10/shared/9d8a1e92-2f69-4ab0-8ce9-c1beb149229a
>  {code}
>         The output stream will be automatically closed by the 
> SafetyNetCloseableRegistry but the file will not be deleted.
> The second case usually occurs when the storage system has high latency in 
> creating files.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-28927) Can not clean up the uploaded shared files when the checkpoint fails

2022-08-15 Thread ChangjiGuo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17580044#comment-17580044
 ] 

ChangjiGuo edited comment on FLINK-28927 at 8/16/22 5:15 AM:
-

I also found another problem when debugging the code.

See [https://issues.apache.org/jira/projects/FLINK/issues/FLINK-28984]

[~yunta] Can you assign these two issues to me? Thanks!
 


was (Author: changjiguo):
I also found another problem when debugging the code.

See [https://issues.apache.org/jira/projects/FLINK/issues/FLINK-28984]
 

> Can not clean up the uploaded shared files when the checkpoint fails
> 
>
> Key: FLINK-28927
> URL: https://issues.apache.org/jira/browse/FLINK-28927
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.11.6, 1.15.1
> Environment: Flink-1.11
>Reporter: ChangjiGuo
>Priority: Major
>
> If a checkpoint times out, the task will cancel the AsyncCheckpointRunnable 
> thread and do some cleanup work, including the following:
>  * Cancel all AsyncSnapshotCallable thread.
>  * If the thread has finished, it will clean up all state object.
>  * If the thread has not completed, it will be interrupted(maybe).
>  * Close snapshotCloseableRegistry.
> In my case, the thread was interrupted while waiting for the file upload to 
> complete, but the file was not cleaned up.
> RocksDBStateUploader.java
> {code:java}
> FutureUtils.waitForAll(futures.values()).get();
> {code}
> It will wait for all files to be uploaded here. Although it has been 
> interrupted, the uploaded files will not be cleaned up. The remaining files 
> are mainly divided into:
>  * Files that have finished uploading before the thread is canceled.
>  * outputStream.closeAndGetHandle() is called, but snapshotCloseableRegistry 
> has not been closed.
> How to reproduce?
> Shorten the checkpoint timeout time, making the checkpoint fail. Then check 
> if there are any files in the shared directory.
> I'm testing on Flink-1.11, but I found the code from the latest branch may 
> have the same problem. I tried to fix it and it works well so far.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28927) Can not clean up the uploaded shared files when the checkpoint fails

2022-08-15 Thread ChangjiGuo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17580044#comment-17580044
 ] 

ChangjiGuo commented on FLINK-28927:


I also found another problem when debugging the code.

See [https://issues.apache.org/jira/projects/FLINK/issues/FLINK-28984]
 

> Can not clean up the uploaded shared files when the checkpoint fails
> 
>
> Key: FLINK-28927
> URL: https://issues.apache.org/jira/browse/FLINK-28927
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.11.6, 1.15.1
> Environment: Flink-1.11
>Reporter: ChangjiGuo
>Priority: Major
>
> If a checkpoint times out, the task will cancel the AsyncCheckpointRunnable 
> thread and do some cleanup work, including the following:
>  * Cancel all AsyncSnapshotCallable thread.
>  * If the thread has finished, it will clean up all state object.
>  * If the thread has not completed, it will be interrupted(maybe).
>  * Close snapshotCloseableRegistry.
> In my case, the thread was interrupted while waiting for the file upload to 
> complete, but the file was not cleaned up.
> RocksDBStateUploader.java
> {code:java}
> FutureUtils.waitForAll(futures.values()).get();
> {code}
> It will wait for all files to be uploaded here. Although it has been 
> interrupted, the uploaded files will not be cleaned up. The remaining files 
> are mainly divided into:
>  * Files that have finished uploading before the thread is canceled.
>  * outputStream.closeAndGetHandle() is called, but snapshotCloseableRegistry 
> has not been closed.
> How to reproduce?
> Shorten the checkpoint timeout time, making the checkpoint fail. Then check 
> if there are any files in the shared directory.
> I'm testing on Flink-1.11, but I found the code from the latest branch may 
> have the same problem. I tried to fix it and it works well so far.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-28984) FsCheckpointStateOutputStream is not being released normally

2022-08-15 Thread ChangjiGuo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28984?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ChangjiGuo updated FLINK-28984:
---
Description: 
If the checkpoint is aborted, AsyncSnapshotCallable will close the 
snapshotCloseableRegistry when it is canceled. There may be two situations here:
 # The FSDataOutputStream has been created and closed while closing 
FsCheckpointStateOutputStream.
 # The FSDataOutputStream has not been created yet, but closed flag has been 
set to true. You can see this in log:
{code:java}
2022-08-16 12:55:44,161 WARN  
org.apache.flink.core.fs.SafetyNetCloseableRegistry   - Closing 
unclosed resource via safety-net: 
ClosingFSDataOutputStream(org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream@4ebe8e64)
 : 
x/flink/checkpoint/state/9214a2e302904b14baf2dc1aacbc7933/ae157c5a05a8922a46a179cdb4c86b10/shared/9d8a1e92-2f69-4ab0-8ce9-c1beb149229a
 {code}

        The output stream will be automatically closed by the 
SafetyNetCloseableRegistry but the file will not be deleted.

The second case usually occurs when the storage system has high latency in 
creating files.

 

  was:
If the checkpoint is aborted, AsyncSnapshotCallable will close the 
snapshotCloseableRegistry when it is canceled. There may be two situations here:
 # The FSDataOutputStream has been created and closed while closing 
FsCheckpointStateOutputStream.
 # The FSDataOutputStream has not been created yet, but closed flag has been 
set to true. You can see this in log:
{code:java}
2022-08-16 12:55:44,161 WARN  
org.apache.flink.core.fs.SafetyNetCloseableRegistry   - Closing 
unclosed resource via safety-net: 
ClosingFSDataOutputStream(org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream@4ebe8e64)
 : 
x/flink/checkpoint/state/9214a2e302904b14baf2dc1aacbc7933/ae157c5a05a8922a46a179cdb4c86b10/shared/9d8a1e92-2f69-4ab0-8ce9-c1beb149229a
 {code}

        The output stream will be automatically closed by the 
SafetyNetCloseableRegistry but the file will not be deleted.

The second case usually occurs when the storage system has high latency in 
creating files.

 


> FsCheckpointStateOutputStream is not being released normally
> 
>
> Key: FLINK-28984
> URL: https://issues.apache.org/jira/browse/FLINK-28984
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.11.6, 1.15.1
>Reporter: ChangjiGuo
>Priority: Major
>
> If the checkpoint is aborted, AsyncSnapshotCallable will close the 
> snapshotCloseableRegistry when it is canceled. There may be two situations 
> here:
>  # The FSDataOutputStream has been created and closed while closing 
> FsCheckpointStateOutputStream.
>  # The FSDataOutputStream has not been created yet, but closed flag has been 
> set to true. You can see this in log:
> {code:java}
> 2022-08-16 12:55:44,161 WARN  
> org.apache.flink.core.fs.SafetyNetCloseableRegistry   - Closing 
> unclosed resource via safety-net: 
> ClosingFSDataOutputStream(org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream@4ebe8e64)
>  : 
> x/flink/checkpoint/state/9214a2e302904b14baf2dc1aacbc7933/ae157c5a05a8922a46a179cdb4c86b10/shared/9d8a1e92-2f69-4ab0-8ce9-c1beb149229a
>  {code}
>         The output stream will be automatically closed by the 
> SafetyNetCloseableRegistry but the file will not be deleted.
> The second case usually occurs when the storage system has high latency in 
> creating files.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28984) FsCheckpointStateOutputStream is not being released normally

2022-08-15 Thread ChangjiGuo (Jira)
ChangjiGuo created FLINK-28984:
--

 Summary: FsCheckpointStateOutputStream is not being released 
normally
 Key: FLINK-28984
 URL: https://issues.apache.org/jira/browse/FLINK-28984
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Affects Versions: 1.15.1, 1.11.6
Reporter: ChangjiGuo


If the checkpoint is aborted, AsyncSnapshotCallable will close the 
snapshotCloseableRegistry when it is canceled. There may be two situations here:
 # The FSDataOutputStream has been created and closed while closing 
FsCheckpointStateOutputStream.
 # The FSDataOutputStream has not been created yet, but closed flag has been 
set to true. You can see this in log:
{code:java}
2022-08-16 12:55:44,161 WARN  
org.apache.flink.core.fs.SafetyNetCloseableRegistry   - Closing 
unclosed resource via safety-net: 
ClosingFSDataOutputStream(org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream@4ebe8e64)
 : 
x/flink/checkpoint/state/9214a2e302904b14baf2dc1aacbc7933/ae157c5a05a8922a46a179cdb4c86b10/shared/9d8a1e92-2f69-4ab0-8ce9-c1beb149229a
 {code}

        The output stream will be automatically closed by the 
SafetyNetCloseableRegistry but the file will not be deleted.

The second case usually occurs when the storage system has high latency in 
creating files.

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-28927) Can not clean up the uploaded shared files when the checkpoint fails

2022-08-15 Thread ChangjiGuo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17579515#comment-17579515
 ] 

ChangjiGuo edited comment on FLINK-28927 at 8/15/22 11:25 AM:
--

[~yunta] Thanks for your reply. In my first solution, I caught the exception 
while waiting for the uploads to complete and set a callback on each future. 
That deleted most of the remaining files. However, I found that some threads are 
not interrupted in time; their uploads finish completely, and in that case the 
solution above does not work well.

In this 
method ([https://github.com/apache/flink/blob/master/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java#L265]),
 the completed variable is set to true, and the uploaded files are also not 
cleaned up. I'm still testing this.

In short, interrupting a thread can have unexpected results. What do you think? 
Looking forward to your reply. [~yunta]
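As a rough illustration of the per-future callback mentioned above (simplified, hypothetical types; not the actual change):
{code:java}
// Sketch: register a cleanup callback on every upload future so that a file which
// finishes uploading after the snapshot was aborted is discarded right away.
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

class PerFutureCleanupSketch {

    private final AtomicBoolean snapshotAborted = new AtomicBoolean(false);

    void registerCleanup(Collection<CompletableFuture<String>> uploadFutures) {
        for (CompletableFuture<String> future : uploadFutures) {
            future.whenComplete((handle, error) -> {
                // Runs even when the uploading thread was never interrupted; if the abort
                // was signalled before this upload finished, the result is discarded here.
                if (handle != null && snapshotAborted.get()) {
                    discard(handle);
                }
            });
        }
    }

    void abortSnapshot() {
        snapshotAborted.set(true);
    }

    private static void discard(String handle) {
        // Stands in for deleting the uploaded file (e.g. StreamStateHandle#discardState()).
        System.out.println("discarding " + handle);
    }
}
{code}
As the comment notes, this alone did not fully solve the problem in practice.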
 


was (Author: changjiguo):
[~yunta] Thanks for your replay. In my first solution, I caught the exception 
while waiting for the upload to complete and set a callback for all futures. It 
could delete most of the remaining files. But I found that there are some 
threads that are not interrupted normally, the files will upload complete and 
the solution above doesn't work well. 

In this 
method(https://github.com/apache/flink/blob/master/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java#L265),
 the completed variable will be set true, and uploaded file is also not cleaned 
up. I'm still testing for this. 

In short, interrupting a thread has some unexpected result. What do you think, 
looking forward to your reply. [~yunta]




> Can not clean up the uploaded shared files when the checkpoint fails
> 
>
> Key: FLINK-28927
> URL: https://issues.apache.org/jira/browse/FLINK-28927
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.11.6, 1.15.1
> Environment: Flink-1.11
>Reporter: ChangjiGuo
>Priority: Major
>
> If a checkpoint times out, the task will cancel the AsyncCheckpointRunnable 
> thread and do some cleanup work, including the following:
>  * Cancel all AsyncSnapshotCallable thread.
>  * If the thread has finished, it will clean up all state object.
>  * If the thread has not completed, it will be interrupted(maybe).
>  * Close snapshotCloseableRegistry.
> In my case, the thread was interrupted while waiting for the file upload to 
> complete, but the file was not cleaned up.
> RocksDBStateUploader.java
> {code:java}
> FutureUtils.waitForAll(futures.values()).get();
> {code}
> It will wait for all files to be uploaded here. Although it has been 
> interrupted, the uploaded files will not be cleaned up. The remaining files 
> are mainly divided into:
>  * Files that have finished uploading before the thread is canceled.
>  * outputStream.closeAndGetHandle() is called, but snapshotCloseableRegistry 
> has not been closed.
> How to reproduce?
> Shorten the checkpoint timeout time, making the checkpoint fail. Then check 
> if there are any files in the shared directory.
> I'm testing on Flink-1.11, but I found the code from the latest branch may 
> have the same problem. I tried to fix it and it works well so far.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-28927) Can not clean up the uploaded shared files when the checkpoint fails

2022-08-15 Thread ChangjiGuo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ChangjiGuo updated FLINK-28927:
---
Summary: Can not clean up the uploaded shared files when the checkpoint 
fails  (was: Can not clean up the uploaded shared files when the checkpoint 
times out)

> Can not clean up the uploaded shared files when the checkpoint fails
> 
>
> Key: FLINK-28927
> URL: https://issues.apache.org/jira/browse/FLINK-28927
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.11.6, 1.15.1
> Environment: Flink-1.11
>Reporter: ChangjiGuo
>Priority: Major
>
> If a checkpoint times out, the task will cancel the AsyncCheckpointRunnable 
> thread and do some cleanup work, including the following:
>  * Cancel all AsyncSnapshotCallable thread.
>  * If the thread has finished, it will clean up all state object.
>  * If the thread has not completed, it will be interrupted(maybe).
>  * Close snapshotCloseableRegistry.
> In my case, the thread was interrupted while waiting for the file upload to 
> complete, but the file was not cleaned up.
> RocksDBStateUploader.java
> {code:java}
> FutureUtils.waitForAll(futures.values()).get();
> {code}
> It will wait for all files to be uploaded here. Although it has been 
> interrupted, the uploaded files will not be cleaned up. The remaining files 
> are mainly divided into:
>  * Files that have finished uploading before the thread is canceled.
>  * outputStream.closeAndGetHandle() is called, but snapshotCloseableRegistry 
> has not been closed.
> How to reproduce?
> Shorten the checkpoint timeout time, making the checkpoint fail. Then check 
> if there are any files in the shared directory.
> I'm testing on Flink-1.11, but I found the code from the latest branch may 
> have the same problem. I tried to fix it and it works well so far.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-28927) Can not clean up the uploaded shared files when the checkpoint times out

2022-08-14 Thread ChangjiGuo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ChangjiGuo updated FLINK-28927:
---
Description: 
If a checkpoint times out, the task will cancel the AsyncCheckpointRunnable 
thread and do some cleanup work, including the following:
 * Cancel all AsyncSnapshotCallable threads.
 * If a thread has finished, it will clean up all of its state objects.
 * If a thread has not completed, it will (maybe) be interrupted.
 * Close snapshotCloseableRegistry.

In my case, the thread was interrupted while waiting for the file upload to 
complete, but the file was not cleaned up.

RocksDBStateUploader.java
{code:java}
FutureUtils.waitForAll(futures.values()).get();
{code}
It waits here for all of the files to be uploaded. Although the wait gets 
interrupted, the files that were already uploaded are not cleaned up. The 
remaining files mainly fall into two groups:
 * Files whose upload finished before the thread was canceled.
 * Files for which outputStream.closeAndGetHandle() was called but 
snapshotCloseableRegistry had not been closed yet.

How to reproduce?
Shorten the checkpoint timeout so that the checkpoint fails, then check whether 
any files remain in the shared directory.

I'm testing on Flink-1.11, but the code on the latest branch may have the same 
problem. I have tried to fix it, and it has worked well so far.
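For illustration only, a rough sketch of the cleanup idea with simplified types (String stands in for a state handle; this is not the actual patch):
{code:java}
// Sketch: if the wait for the uploads is interrupted or fails, make a best-effort
// attempt to discard whatever already reached the checkpoint storage.
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

class UploadCleanupSketch {

    static void awaitUploads(Collection<CompletableFuture<String>> futures) throws Exception {
        try {
            // Corresponds to FutureUtils.waitForAll(futures.values()).get() above.
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
        } catch (InterruptedException | ExecutionException e) {
            for (CompletableFuture<String> future : futures) {
                if (future.isDone() && !future.isCompletedExceptionally()) {
                    discard(future.getNow(null));  // delete files that finished uploading
                }
            }
            throw e;
        }
    }

    private static void discard(String handle) {
        // Stands in for deleting the uploaded file (e.g. StreamStateHandle#discardState()).
        System.out.println("discarding " + handle);
    }
}
{code}
Note that this only covers futures that have already completed at the moment of interruption; uploads that finish later would still need the per-future callbacks discussed in the comments.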

  was:
If a checkpoint times out, the task will cancel the AsyncCheckpointRunnable 
thread and do some cleanup work, including the following:
* Cancel all AsyncSnapshotCallable thread.
* If the thread has finished, it will clean up all state object.
* If the thread has not completed, it will be interrupted.
* Close snapshotCloseableRegistry.

In my case, the thread was interrupted while waiting for the file upload to 
complete, but the file was not cleaned up.

RocksDBStateUploader.java
{code:java}
FutureUtils.waitForAll(futures.values()).get();
{code}

It will wait for all files to be uploaded here. Although it has been 
interrupted, the uploaded files will not be cleaned up. The remaining files are 
mainly divided into:
* Files that have finished uploading before the thread is canceled.
* outputStream.closeAndGetHandle() is called, but snapshotCloseableRegistry has 
not been closed.

How to reproduce?
Shorten the checkpoint timeout time, making the checkpoint fail. Then check if 
there are any files in the shared directory.

I'm testing on Flink-1.11, but I found the code from the latest branch may have 
the same problem. I tried to fix it and it works well so far.


> Can not clean up the uploaded shared files when the checkpoint times out
> 
>
> Key: FLINK-28927
> URL: https://issues.apache.org/jira/browse/FLINK-28927
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.11.6, 1.15.1
> Environment: Flink-1.11
>Reporter: ChangjiGuo
>Priority: Major
>
> If a checkpoint times out, the task will cancel the AsyncCheckpointRunnable 
> thread and do some cleanup work, including the following:
>  * Cancel all AsyncSnapshotCallable thread.
>  * If the thread has finished, it will clean up all state object.
>  * If the thread has not completed, it will be interrupted(maybe).
>  * Close snapshotCloseableRegistry.
> In my case, the thread was interrupted while waiting for the file upload to 
> complete, but the file was not cleaned up.
> RocksDBStateUploader.java
> {code:java}
> FutureUtils.waitForAll(futures.values()).get();
> {code}
> It will wait for all files to be uploaded here. Although it has been 
> interrupted, the uploaded files will not be cleaned up. The remaining files 
> are mainly divided into:
>  * Files that have finished uploading before the thread is canceled.
>  * outputStream.closeAndGetHandle() is called, but snapshotCloseableRegistry 
> has not been closed.
> How to reproduce?
> Shorten the checkpoint timeout time, making the checkpoint fail. Then check 
> if there are any files in the shared directory.
> I'm testing on Flink-1.11, but I found the code from the latest branch may 
> have the same problem. I tried to fix it and it works well so far.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28927) Can not clean up the uploaded shared files when the checkpoint times out

2022-08-14 Thread ChangjiGuo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17579515#comment-17579515
 ] 

ChangjiGuo commented on FLINK-28927:


[~yunta] Thanks for your reply. In my first solution, I caught the exception 
while waiting for the uploads to complete and set a callback on all the futures. 
That deleted most of the remaining files. However, I found that some threads are 
not interrupted in time; their uploads finish completely, and in that case the 
solution above does not work well.

In this 
method (https://github.com/apache/flink/blob/master/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java#L265),
 the completed variable is set to true, and the uploaded files are also not 
cleaned up. I'm still testing this.

In short, interrupting a thread can have unexpected results. What do you think? 
Looking forward to your reply. [~yunta]




> Can not clean up the uploaded shared files when the checkpoint times out
> 
>
> Key: FLINK-28927
> URL: https://issues.apache.org/jira/browse/FLINK-28927
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.11.6, 1.15.1
> Environment: Flink-1.11
>Reporter: ChangjiGuo
>Priority: Major
>
> If a checkpoint times out, the task will cancel the AsyncCheckpointRunnable 
> thread and do some cleanup work, including the following:
> * Cancel all AsyncSnapshotCallable thread.
> * If the thread has finished, it will clean up all state object.
> * If the thread has not completed, it will be interrupted.
> * Close snapshotCloseableRegistry.
> In my case, the thread was interrupted while waiting for the file upload to 
> complete, but the file was not cleaned up.
> RocksDBStateUploader.java
> {code:java}
> FutureUtils.waitForAll(futures.values()).get();
> {code}
> It will wait for all files to be uploaded here. Although it has been 
> interrupted, the uploaded files will not be cleaned up. The remaining files 
> are mainly divided into:
> * Files that have finished uploading before the thread is canceled.
> * outputStream.closeAndGetHandle() is called, but snapshotCloseableRegistry 
> has not been closed.
> How to reproduce?
> Shorten the checkpoint timeout time, making the checkpoint fail. Then check 
> if there are any files in the shared directory.
> I'm testing on Flink-1.11, but I found the code from the latest branch may 
> have the same problem. I tried to fix it and it works well so far.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-28927) Can not clean up the uploaded shared files when the checkpoint times out

2022-08-11 Thread ChangjiGuo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17578463#comment-17578463
 ] 

ChangjiGuo edited comment on FLINK-28927 at 8/11/22 1:08 PM:
-

Can anyone take a look? :)
 


was (Author: changjiguo):
Can anyone help me take a look? :)

> Can not clean up the uploaded shared files when the checkpoint times out
> 
>
> Key: FLINK-28927
> URL: https://issues.apache.org/jira/browse/FLINK-28927
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.11.6, 1.15.1
> Environment: Flink-1.11
>Reporter: ChangjiGuo
>Priority: Major
>
> If a checkpoint times out, the task will cancel the AsyncCheckpointRunnable 
> thread and do some cleanup work, including the following:
> * Cancel all AsyncSnapshotCallable thread.
> * If the thread has finished, it will clean up all state object.
> * If the thread has not completed, it will be interrupted.
> * Close snapshotCloseableRegistry.
> In my case, the thread was interrupted while waiting for the file upload to 
> complete, but the file was not cleaned up.
> RocksDBStateUploader.java
> {code:java}
> FutureUtils.waitForAll(futures.values()).get();
> {code}
> It will wait for all files to be uploaded here. Although it has been 
> interrupted, the uploaded files will not be cleaned up. The remaining files 
> are mainly divided into:
> * Files that have finished uploading before the thread is canceled.
> * outputStream.closeAndGetHandle() is called, but snapshotCloseableRegistry 
> has not been closed.
> How to reproduce?
> Shorten the checkpoint timeout time, making the checkpoint fail. Then check 
> if there are any files in the shared directory.
> I'm testing on Flink-1.11, but I found the code from the latest branch may 
> have the same problem. I tried to fix it and it works well so far.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-28927) Can not clean up the uploaded shared files when the checkpoint times out

2022-08-11 Thread ChangjiGuo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17578463#comment-17578463
 ] 

ChangjiGuo edited comment on FLINK-28927 at 8/11/22 12:50 PM:
--

Can anyone help me take a look? :)


was (Author: changjiguo):
Can anyone help me take a look? ^_^

> Can not clean up the uploaded shared files when the checkpoint times out
> 
>
> Key: FLINK-28927
> URL: https://issues.apache.org/jira/browse/FLINK-28927
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.11.6, 1.15.1
> Environment: Flink-1.11
>Reporter: ChangjiGuo
>Priority: Major
>
> If a checkpoint times out, the task will cancel the AsyncCheckpointRunnable 
> thread and do some cleanup work, including the following:
> * Cancel all AsyncSnapshotCallable thread.
> * If the thread has finished, it will clean up all state object.
> * If the thread has not completed, it will be interrupted.
> * Close snapshotCloseableRegistry.
> In my case, the thread was interrupted while waiting for the file upload to 
> complete, but the file was not cleaned up.
> RocksDBStateUploader.java
> {code:java}
> FutureUtils.waitForAll(futures.values()).get();
> {code}
> It will wait for all files to be uploaded here. Although it has been 
> interrupted, the uploaded files will not be cleaned up. The remaining files 
> are mainly divided into:
> * Files that have finished uploading before the thread is canceled.
> * outputStream.closeAndGetHandle() is called, but snapshotCloseableRegistry 
> has not been closed.
> How to reproduce?
> Shorten the checkpoint timeout time, making the checkpoint fail. Then check 
> if there are any files in the shared directory.
> I'm testing on Flink-1.11, but I found the code from the latest branch may 
> have the same problem. I tried to fix it and it works well so far.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28927) Can not clean up the uploaded shared files when the checkpoint times out

2022-08-11 Thread ChangjiGuo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17578463#comment-17578463
 ] 

ChangjiGuo commented on FLINK-28927:


Can anyone help me take a look? ^_^

> Can not clean up the uploaded shared files when the checkpoint times out
> 
>
> Key: FLINK-28927
> URL: https://issues.apache.org/jira/browse/FLINK-28927
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.11.6, 1.15.1
> Environment: Flink-1.11
>Reporter: ChangjiGuo
>Priority: Major
>
> If a checkpoint times out, the task will cancel the AsyncCheckpointRunnable 
> thread and do some cleanup work, including the following:
> * Cancel all AsyncSnapshotCallable thread.
> * If the thread has finished, it will clean up all state object.
> * If the thread has not completed, it will be interrupted.
> * Close snapshotCloseableRegistry.
> In my case, the thread was interrupted while waiting for the file upload to 
> complete, but the file was not cleaned up.
> RocksDBStateUploader.java
> {code:java}
> FutureUtils.waitForAll(futures.values()).get();
> {code}
> It will wait for all files to be uploaded here. Although it has been 
> interrupted, the uploaded files will not be cleaned up. The remaining files 
> are mainly divided into:
> * Files that have finished uploading before the thread is canceled.
> * outputStream.closeAndGetHandle() is called, but snapshotCloseableRegistry 
> has not been closed.
> How to reproduce?
> Shorten the checkpoint timeout time, making the checkpoint fail. Then check 
> if there are any files in the shared directory.
> I'm testing on Flink-1.11, but I found the code from the latest branch may 
> have the same problem. I tried to fix it and it works well so far.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-28927) Can not clean up the uploaded shared files when the checkpoint times out

2022-08-11 Thread ChangjiGuo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ChangjiGuo updated FLINK-28927:
---
Description: 
If a checkpoint times out, the task will cancel the AsyncCheckpointRunnable 
thread and do some cleanup work, including the following:
* Cancel all AsyncSnapshotCallable thread.
* If the thread has finished, it will clean up all state object.
* If the thread has not completed, it will be interrupted.
* Close snapshotCloseableRegistry.

In my case, the thread was interrupted while waiting for the file upload to 
complete, but the file was not cleaned up.

RocksDBStateUploader.java
{code:java}
FutureUtils.waitForAll(futures.values()).get();
{code}

It will wait for all files to be uploaded here. Although it has been 
interrupted, the uploaded files will not be cleaned up. The remaining files are 
mainly divided into:
* Files that have finished uploading before the thread is canceled.
* outputStream.closeAndGetHandle() is called, but snapshotCloseableRegistry has 
not been closed.

How to reproduce?
Shorten the checkpoint timeout time, making the checkpoint fail. Then check if 
there are any files in the shared directory.

I'm testing on Flink-1.11, but I found the code from the latest branch may have 
the same problem. I tried to fix it and it works well so far.

  was:
If a checkpoint times out, the task will cancel the AsyncCheckpointRunnable 
thread and do some cleanup work, including the following:
* Cancel all AsyncSnapshotCallable thread.
* If the thread has finished, it will clean up all state object.
* If the thread has not completed, it will be interrupted.
* Close snapshotCloseableRegistry.

In my case, the thread was interrupted while waiting for the file upload to 
complete, but the file was not cleaned up.

RocksDBStateUploader.java
{code:java}
FutureUtils.waitForAll(futures.values()).get();
{code}

It will wait for all files to be uploaded here. Although it has been 
interrupted, the uploaded files will not be cleaned up. The remaining files are 
mainly divided into:
* Files that have finished uploading before the thread is canceled.
* outputStream.closeAndGetHandle() is called, but snapshotCloseableRegistry has 
not been closed.

How to reproduce?
Shorten the checkpoint timeout time, making the checkpoint fail. Then check if 
there are any files in the shared directory.

I'm testing on Flink-1.11, but I look at the code from the latest branch and 
there may be the same problem here. I tried to fix it and it works well so far.


> Can not clean up the uploaded shared files when the checkpoint times out
> 
>
> Key: FLINK-28927
> URL: https://issues.apache.org/jira/browse/FLINK-28927
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.11.6, 1.15.1
> Environment: Flink-1.11
>Reporter: ChangjiGuo
>Priority: Major
>
> If a checkpoint times out, the task will cancel the AsyncCheckpointRunnable 
> thread and do some cleanup work, including the following:
> * Cancel all AsyncSnapshotCallable thread.
> * If the thread has finished, it will clean up all state object.
> * If the thread has not completed, it will be interrupted.
> * Close snapshotCloseableRegistry.
> In my case, the thread was interrupted while waiting for the file upload to 
> complete, but the file was not cleaned up.
> RocksDBStateUploader.java
> {code:java}
> FutureUtils.waitForAll(futures.values()).get();
> {code}
> It will wait for all files to be uploaded here. Although it has been 
> interrupted, the uploaded files will not be cleaned up. The remaining files 
> are mainly divided into:
> * Files that have finished uploading before the thread is canceled.
> * outputStream.closeAndGetHandle() is called, but snapshotCloseableRegistry 
> has not been closed.
> How to reproduce?
> Shorten the checkpoint timeout time, making the checkpoint fail. Then check 
> if there are any files in the shared directory.
> I'm testing on Flink-1.11, but I found the code from the latest branch may 
> have the same problem. I tried to fix it and it works well so far.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-28927) Can not clean up the uploaded shared files when the checkpoint times out

2022-08-11 Thread ChangjiGuo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ChangjiGuo updated FLINK-28927:
---
Summary: Can not clean up the uploaded shared files when the checkpoint 
times out  (was: When the checkpoint times out, the uploaded shared files are 
not cleaned up)

> Can not clean up the uploaded shared files when the checkpoint times out
> 
>
> Key: FLINK-28927
> URL: https://issues.apache.org/jira/browse/FLINK-28927
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.11.6, 1.15.1
> Environment: Flink-1.11
>Reporter: ChangjiGuo
>Priority: Major
>
> If a checkpoint times out, the task will cancel the AsyncCheckpointRunnable 
> thread and do some cleanup work, including the following:
> * Cancel all AsyncSnapshotCallable thread.
> * If the thread has finished, it will clean up all state object.
> * If the thread has not completed, it will be interrupted.
> * Close snapshotCloseableRegistry.
> In my case, the thread was interrupted while waiting for the file upload to 
> complete, but the file was not cleaned up.
> RocksDBStateUploader.java
> {code:java}
> FutureUtils.waitForAll(futures.values()).get();
> {code}
> It will wait for all files to be uploaded here. Although it has been 
> interrupted, the uploaded files will not be cleaned up. The remaining files 
> are mainly divided into:
> * Files that have finished uploading before the thread is canceled.
> * outputStream.closeAndGetHandle() is called, but snapshotCloseableRegistry 
> has not been closed.
> How to reproduce?
> Shorten the checkpoint timeout time, making the checkpoint fail. Then check 
> if there are any files in the shared directory.
> I'm testing on Flink-1.11, but I look at the code from the latest branch and 
> there may be the same problem here. I tried to fix it and it works well so 
> far.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-28927) When the checkpoint times out, the uploaded shared files are not cleaned up

2022-08-11 Thread ChangjiGuo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ChangjiGuo updated FLINK-28927:
---
Affects Version/s: 1.15.1
   1.11.6

> When the checkpoint times out, the uploaded shared files are not cleaned up
> ---
>
> Key: FLINK-28927
> URL: https://issues.apache.org/jira/browse/FLINK-28927
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.11.6, 1.15.1
> Environment: Flink-1.11
>Reporter: ChangjiGuo
>Priority: Major
>
> If a checkpoint times out, the task will cancel the AsyncCheckpointRunnable 
> thread and do some cleanup work, including the following:
> * Cancel all AsyncSnapshotCallable thread.
> * If the thread has finished, it will clean up all state object.
> * If the thread has not completed, it will be interrupted.
> * Close snapshotCloseableRegistry.
> In my case, the thread was interrupted while waiting for the file upload to 
> complete, but the file was not cleaned up.
> RocksDBStateUploader.java
> {code:java}
> FutureUtils.waitForAll(futures.values()).get();
> {code}
> It will wait for all files to be uploaded here. Although it has been 
> interrupted, the uploaded files will not be cleaned up. The remaining files 
> are mainly divided into:
> * Files that have finished uploading before the thread is canceled.
> * outputStream.closeAndGetHandle() is called, but snapshotCloseableRegistry 
> has not been closed.
> How to reproduce?
> Shorten the checkpoint timeout time, making the checkpoint fail. Then check 
> if there are any files in the shared directory.
> I'm testing on Flink-1.11, but I look at the code from the latest branch and 
> there may be the same problem here. I tried to fix it and it works well so 
> far.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28927) When the checkpoint times out, the uploaded shared files are not cleaned up

2022-08-11 Thread ChangjiGuo (Jira)
ChangjiGuo created FLINK-28927:
--

 Summary: When the checkpoint times out, the uploaded shared files 
are not cleaned up
 Key: FLINK-28927
 URL: https://issues.apache.org/jira/browse/FLINK-28927
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends
 Environment: Flink-1.11
Reporter: ChangjiGuo


If a checkpoint times out, the task will cancel the AsyncCheckpointRunnable 
thread and do some cleanup work, including the following:
* Cancel all AsyncSnapshotCallable thread.
* If the thread has finished, it will clean up all state object.
* If the thread has not completed, it will be interrupted.
* Close snapshotCloseableRegistry.

In my case, the thread was interrupted while waiting for the file upload to 
complete, but the file was not cleaned up.

RocksDBStateUploader.java
{code:java}
FutureUtils.waitForAll(futures.values()).get();
{code}

It will wait for all files to be uploaded here. Although it has been 
interrupted, the uploaded files will not be cleaned up. The remaining files are 
mainly divided into:
* Files that have finished uploading before the thread is canceled.
* outputStream.closeAndGetHandle() is called, but snapshotCloseableRegistry has 
not been closed.

How to reproduce?
Shorten the checkpoint timeout time, making the checkpoint fail. Then check if 
there are any files in the shared directory.

I'm testing on Flink-1.11, but I look at the code from the latest branch and 
there may be the same problem here. I tried to fix it and it works well so far.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-25475) When windowAgg and groupAgg are included at the same time, there is no assigner generated but MiniBatch optimization is still used.

2022-01-25 Thread ChangjiGuo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17482183#comment-17482183
 ] 

ChangjiGuo commented on FLINK-25475:


Hi [~xuyangzhong], thanks for your reply!
It is SQL similar to this:
{code:sql}
SELECT b, SUM(cnt)
 FROM (
   SELECT b,
 COUNT(a) as cnt,
 HOP_START(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND) as w_start,
 HOP_END(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND) as w_end
   FROM wmTable1
   GROUP BY b, HOP(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND)
 )
 GROUP BY b
{code}


> When windowAgg and groupAgg are included at the same time, there is no 
> assigner generated but MiniBatch optimization is still used.
> ---
>
> Key: FLINK-25475
> URL: https://issues.apache.org/jira/browse/FLINK-25475
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.14.2
>Reporter: ChangjiGuo
>Priority: Major
> Attachments: image-2021-12-29-16-04-50-211.png, 
> image-2021-12-29-16-05-15-519.png
>
>
> If the relNode has both windowAgg and groupAgg, MiniBatchIntervalInferRule 
> will not add StreamExecMiniBatchAssigner node, but MiniBatchGroupAggFunction 
> or MiniBatchLocalGroupAggFunction or MiniBatchGlobalGroupAggFunction will 
> still be generated when translated into transformation.
> It will only judge whether to enable minibatch.
> {code:java}
> val operator = if (isMiniBatchEnabled) {
>   val aggFunction = new MiniBatchGroupAggFunction(
> aggsHandler,
> recordEqualiser,
> accTypes,
> inputRowType,
> inputCountIndex,
> generateUpdateBefore)
>   new KeyedMapBundleOperator(
> aggFunction,
> AggregateUtil.createMiniBatchTrigger(tableConfig))
> } else {
>   val aggFunction = new GroupAggFunction(
> tableConfig.getMinIdleStateRetentionTime,
> tableConfig.getMaxIdleStateRetentionTime,
> aggsHandler,
> recordEqualiser,
> accTypes,
> inputCountIndex,
> generateUpdateBefore)
>   val operator = new KeyedProcessOperator[RowData, RowData, 
> RowData](aggFunction)
>   operator
> } {code}
> for example:
> before:
> !image-2021-12-29-16-04-50-211.png!
> after:
> !image-2021-12-29-16-05-15-519.png!
> The WatermarkAssigner will send watermark to downstream, and the finishBundle 
> method will be called frequently, which does not match the expected result.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-25475) When windowAgg and groupAgg are included at the same time, there is no assigner generated but MiniBatch optimization is still used.

2022-01-06 Thread ChangjiGuo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25475?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ChangjiGuo updated FLINK-25475:
---
Affects Version/s: 1.14.2

> When windowAgg and groupAgg are included at the same time, there is no 
> assigner generated but MiniBatch optimization is still used.
> ---
>
> Key: FLINK-25475
> URL: https://issues.apache.org/jira/browse/FLINK-25475
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.14.2
>Reporter: ChangjiGuo
>Priority: Major
> Attachments: image-2021-12-29-16-04-50-211.png, 
> image-2021-12-29-16-05-15-519.png
>
>
> If the relNode has both windowAgg and groupAgg, MiniBatchIntervalInferRule 
> will not add StreamExecMiniBatchAssigner node, but MiniBatchGroupAggFunction 
> or MiniBatchLocalGroupAggFunction or MiniBatchGlobalGroupAggFunction will 
> still be generated when translated into transformation.
> It will only judge whether to enable minibatch.
> {code:java}
> val operator = if (isMiniBatchEnabled) {
>   val aggFunction = new MiniBatchGroupAggFunction(
> aggsHandler,
> recordEqualiser,
> accTypes,
> inputRowType,
> inputCountIndex,
> generateUpdateBefore)
>   new KeyedMapBundleOperator(
> aggFunction,
> AggregateUtil.createMiniBatchTrigger(tableConfig))
> } else {
>   val aggFunction = new GroupAggFunction(
> tableConfig.getMinIdleStateRetentionTime,
> tableConfig.getMaxIdleStateRetentionTime,
> aggsHandler,
> recordEqualiser,
> accTypes,
> inputCountIndex,
> generateUpdateBefore)
>   val operator = new KeyedProcessOperator[RowData, RowData, 
> RowData](aggFunction)
>   operator
> } {code}
> for example:
> before:
> !image-2021-12-29-16-04-50-211.png!
> after:
> !image-2021-12-29-16-05-15-519.png!
> The WatermarkAssigner will send watermark to downstream, and the finishBundle 
> method will be called frequently, which does not match the expected result.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Comment Edited] (FLINK-25475) When windowAgg and groupAgg are included at the same time, there is no assigner generated but MiniBatch optimization is still used.

2021-12-30 Thread ChangjiGuo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17467081#comment-17467081
 ] 

ChangjiGuo edited comment on FLINK-25475 at 12/31/21, 5:55 AM:
---

Are there any side effects if the StreamExecMiniBatchAssigner node is added in 
front of the StreamExecGroupAggregate node? As far as I know, the current 
implementation adds it after the source node or the watermark node.


was (Author: changjiguo):
Are there any side effects if the StreamExecMiniBatchAssigner node is added to 
the front of StreamExecGroupAggregate node? As far as I know, the current 
implementation is added in front of the source node or watermark node.

> When windowAgg and groupAgg are included at the same time, there is no 
> assigner generated but MiniBatch optimization is still used.
> ---
>
> Key: FLINK-25475
> URL: https://issues.apache.org/jira/browse/FLINK-25475
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: ChangjiGuo
>Priority: Major
> Attachments: image-2021-12-29-16-04-50-211.png, 
> image-2021-12-29-16-05-15-519.png
>
>
> If the relNode has both a windowAgg and a groupAgg, MiniBatchIntervalInferRule 
> will not add a StreamExecMiniBatchAssigner node, but a MiniBatchGroupAggFunction, 
> MiniBatchLocalGroupAggFunction, or MiniBatchGlobalGroupAggFunction will still 
> be generated when the plan is translated into a transformation.
> The translation only checks whether minibatch is enabled:
> {code:java}
> val operator = if (isMiniBatchEnabled) {
>   val aggFunction = new MiniBatchGroupAggFunction(
> aggsHandler,
> recordEqualiser,
> accTypes,
> inputRowType,
> inputCountIndex,
> generateUpdateBefore)
>   new KeyedMapBundleOperator(
> aggFunction,
> AggregateUtil.createMiniBatchTrigger(tableConfig))
> } else {
>   val aggFunction = new GroupAggFunction(
> tableConfig.getMinIdleStateRetentionTime,
> tableConfig.getMaxIdleStateRetentionTime,
> aggsHandler,
> recordEqualiser,
> accTypes,
> inputCountIndex,
> generateUpdateBefore)
>   val operator = new KeyedProcessOperator[RowData, RowData, 
> RowData](aggFunction)
>   operator
> } {code}
> for example:
> before:
> !image-2021-12-29-16-04-50-211.png!
> after:
> !image-2021-12-29-16-05-15-519.png!
> The WatermarkAssigner will send watermarks downstream, and the finishBundle 
> method will be called frequently, which does not match the expected result.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25475) When windowAgg and groupAgg are included at the same time, there is no assigner generated but MiniBatch optimization is still used.

2021-12-30 Thread ChangjiGuo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17467081#comment-17467081
 ] 

ChangjiGuo commented on FLINK-25475:


Are there any side effects if the StreamExecMiniBatchAssigner node is added in 
front of the StreamExecGroupAggregate node? As far as I know, the current 
implementation adds it in front of the source node or the watermark node.

> When windowAgg and groupAgg are included at the same time, there is no 
> assigner generated but MiniBatch optimization is still used.
> ---
>
> Key: FLINK-25475
> URL: https://issues.apache.org/jira/browse/FLINK-25475
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: ChangjiGuo
>Priority: Major
> Attachments: image-2021-12-29-16-04-50-211.png, 
> image-2021-12-29-16-05-15-519.png
>
>
> If the relNode has both a windowAgg and a groupAgg, MiniBatchIntervalInferRule 
> will not add a StreamExecMiniBatchAssigner node, but a MiniBatchGroupAggFunction, 
> MiniBatchLocalGroupAggFunction, or MiniBatchGlobalGroupAggFunction will still 
> be generated when the plan is translated into a transformation.
> The translation only checks whether minibatch is enabled:
> {code:java}
> val operator = if (isMiniBatchEnabled) {
>   val aggFunction = new MiniBatchGroupAggFunction(
> aggsHandler,
> recordEqualiser,
> accTypes,
> inputRowType,
> inputCountIndex,
> generateUpdateBefore)
>   new KeyedMapBundleOperator(
> aggFunction,
> AggregateUtil.createMiniBatchTrigger(tableConfig))
> } else {
>   val aggFunction = new GroupAggFunction(
> tableConfig.getMinIdleStateRetentionTime,
> tableConfig.getMaxIdleStateRetentionTime,
> aggsHandler,
> recordEqualiser,
> accTypes,
> inputCountIndex,
> generateUpdateBefore)
>   val operator = new KeyedProcessOperator[RowData, RowData, 
> RowData](aggFunction)
>   operator
> } {code}
> for example:
> before:
> !image-2021-12-29-16-04-50-211.png!
> after:
> !image-2021-12-29-16-05-15-519.png!
> The WatermarkAssigner will send watermarks downstream, and the finishBundle 
> method will be called frequently, which does not match the expected result.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-25475) When windowAgg and groupAgg are included at the same time, there is no assigner generated but MiniBatch optimization is still used.

2021-12-29 Thread ChangjiGuo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25475?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ChangjiGuo updated FLINK-25475:
---
Affects Version/s: (was: 1.14.0)

> When windowAgg and groupAgg are included at the same time, there is no 
> assigner generated but MiniBatch optimization is still used.
> ---
>
> Key: FLINK-25475
> URL: https://issues.apache.org/jira/browse/FLINK-25475
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: ChangjiGuo
>Priority: Major
> Attachments: image-2021-12-29-16-04-50-211.png, 
> image-2021-12-29-16-05-15-519.png
>
>
> If the relNode has both a windowAgg and a groupAgg, MiniBatchIntervalInferRule 
> will not add a StreamExecMiniBatchAssigner node, but a MiniBatchGroupAggFunction, 
> MiniBatchLocalGroupAggFunction, or MiniBatchGlobalGroupAggFunction will still 
> be generated when the plan is translated into a transformation.
> The translation only checks whether minibatch is enabled:
> {code:java}
> val operator = if (isMiniBatchEnabled) {
>   val aggFunction = new MiniBatchGroupAggFunction(
> aggsHandler,
> recordEqualiser,
> accTypes,
> inputRowType,
> inputCountIndex,
> generateUpdateBefore)
>   new KeyedMapBundleOperator(
> aggFunction,
> AggregateUtil.createMiniBatchTrigger(tableConfig))
> } else {
>   val aggFunction = new GroupAggFunction(
> tableConfig.getMinIdleStateRetentionTime,
> tableConfig.getMaxIdleStateRetentionTime,
> aggsHandler,
> recordEqualiser,
> accTypes,
> inputCountIndex,
> generateUpdateBefore)
>   val operator = new KeyedProcessOperator[RowData, RowData, 
> RowData](aggFunction)
>   operator
> } {code}
> for example:
> before:
> !image-2021-12-29-16-04-50-211.png!
> after:
> !image-2021-12-29-16-05-15-519.png!
> The WatermarkAssigner will send watermarks downstream, and the finishBundle 
> method will be called frequently, which does not match the expected result.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25475) When windowAgg and groupAgg are included at the same time, there is no assigner generated but MiniBatch optimization is still used.

2021-12-29 Thread ChangjiGuo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17466345#comment-17466345
 ] 

ChangjiGuo commented on FLINK-25475:


Hello [~jark], can you help me review this issue?

> When windowAgg and groupAgg are included at the same time, there is no 
> assigner generated but MiniBatch optimization is still used.
> ---
>
> Key: FLINK-25475
> URL: https://issues.apache.org/jira/browse/FLINK-25475
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.14.0
>Reporter: ChangjiGuo
>Priority: Major
> Attachments: image-2021-12-29-16-04-50-211.png, 
> image-2021-12-29-16-05-15-519.png
>
>
> If the relNode has both a windowAgg and a groupAgg, MiniBatchIntervalInferRule 
> will not add a StreamExecMiniBatchAssigner node, but a MiniBatchGroupAggFunction, 
> MiniBatchLocalGroupAggFunction, or MiniBatchGlobalGroupAggFunction will still 
> be generated when the plan is translated into a transformation.
> The translation only checks whether minibatch is enabled:
> {code:java}
> val operator = if (isMiniBatchEnabled) {
>   val aggFunction = new MiniBatchGroupAggFunction(
> aggsHandler,
> recordEqualiser,
> accTypes,
> inputRowType,
> inputCountIndex,
> generateUpdateBefore)
>   new KeyedMapBundleOperator(
> aggFunction,
> AggregateUtil.createMiniBatchTrigger(tableConfig))
> } else {
>   val aggFunction = new GroupAggFunction(
> tableConfig.getMinIdleStateRetentionTime,
> tableConfig.getMaxIdleStateRetentionTime,
> aggsHandler,
> recordEqualiser,
> accTypes,
> inputCountIndex,
> generateUpdateBefore)
>   val operator = new KeyedProcessOperator[RowData, RowData, 
> RowData](aggFunction)
>   operator
> } {code}
> for example:
> before:
> !image-2021-12-29-16-04-50-211.png!
> after:
> !image-2021-12-29-16-05-15-519.png!
> The WatermarkAssigner will send watermarks downstream, and the finishBundle 
> method will be called frequently, which does not match the expected result.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25475) When windowAgg and groupAgg are included at the same time, there is no assigner generated but MiniBatch optimization is still used.

2021-12-29 Thread ChangjiGuo (Jira)
ChangjiGuo created FLINK-25475:
--

 Summary: When windowAgg and groupAgg are included at the same 
time, there is no assigner generated but MiniBatch optimization is still used.
 Key: FLINK-25475
 URL: https://issues.apache.org/jira/browse/FLINK-25475
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.14.0
Reporter: ChangjiGuo
 Attachments: image-2021-12-29-16-04-50-211.png, 
image-2021-12-29-16-05-15-519.png

If the relNode has both a windowAgg and a groupAgg, MiniBatchIntervalInferRule 
will not add a StreamExecMiniBatchAssigner node, but a MiniBatchGroupAggFunction, 
MiniBatchLocalGroupAggFunction, or MiniBatchGlobalGroupAggFunction will still be 
generated when the plan is translated into a transformation.

The translation only checks whether minibatch is enabled:
{code:java}
val operator = if (isMiniBatchEnabled) {
  val aggFunction = new MiniBatchGroupAggFunction(
aggsHandler,
recordEqualiser,
accTypes,
inputRowType,
inputCountIndex,
generateUpdateBefore)

  new KeyedMapBundleOperator(
aggFunction,
AggregateUtil.createMiniBatchTrigger(tableConfig))
} else {
  val aggFunction = new GroupAggFunction(
tableConfig.getMinIdleStateRetentionTime,
tableConfig.getMaxIdleStateRetentionTime,
aggsHandler,
recordEqualiser,
accTypes,
inputCountIndex,
generateUpdateBefore)

  val operator = new KeyedProcessOperator[RowData, RowData, 
RowData](aggFunction)
  operator
} {code}
for example:

before:

!image-2021-12-29-16-04-50-211.png!

after:
!image-2021-12-29-16-05-15-519.png!

The WatermarkAssigner will send watermarks downstream, and the finishBundle 
method will be called frequently, which does not match the expected result.
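
For reference, below is a minimal, self-contained sketch (not part of the original 
report) that sets up the windowAgg + groupAgg combination described above with the 
Flink 1.14 Table API and prints the optimized plan, so it is easy to check whether 
a MiniBatchAssigner node is present while mini-batch is enabled. The orders table, 
its schema, the datagen connector, and the interval values are illustrative 
assumptions only.
{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class MiniBatchAssignerCheck {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Enable mini-batch aggregation; these options drive isMiniBatchEnabled in the planner.
        tEnv.getConfig().getConfiguration().setString("table.exec.mini-batch.enabled", "true");
        tEnv.getConfig().getConfiguration().setString("table.exec.mini-batch.allow-latency", "5 s");
        tEnv.getConfig().getConfiguration().setString("table.exec.mini-batch.size", "5000");

        // Illustrative event-time source; the table name, schema, and connector are assumptions.
        tEnv.executeSql(
                "CREATE TABLE orders ("
                        + "  user_id BIGINT,"
                        + "  amount DOUBLE,"
                        + "  ts TIMESTAMP(3),"
                        + "  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND"
                        + ") WITH ('connector' = 'datagen')");

        // A window aggregation in the subquery plus an unbounded group aggregation on top,
        // i.e. the windowAgg + groupAgg combination described in this issue.
        String plan = tEnv.explainSql(
                "SELECT user_id, SUM(cnt) AS total_cnt "
                        + "FROM ("
                        + "  SELECT user_id, COUNT(*) AS cnt "
                        + "  FROM orders "
                        + "  GROUP BY user_id, TUMBLE(ts, INTERVAL '1' MINUTE)"
                        + ") GROUP BY user_id");

        // If the group aggregation runs in mini-batch mode, the plan should also contain a
        // MiniBatchAssigner node; in the scenario reported here it does not.
        System.out.println(plan);
        System.out.println(plan.contains("MiniBatchAssigner")
                ? "Plan contains a MiniBatchAssigner node."
                : "Plan contains NO MiniBatchAssigner node.");
    }
}
{code}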



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Closed] (FLINK-22497) When using DefaultRollingPolicy in StreamingFileSink, the file will be finished delayed

2021-07-23 Thread ChangjiGuo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22497?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ChangjiGuo closed FLINK-22497.
--
Resolution: Not A Bug

> When using DefaultRollingPolicy in StreamingFileSink, the file will be 
> finished delayed
> ---
>
> Key: FLINK-22497
> URL: https://issues.apache.org/jira/browse/FLINK-22497
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem
>Affects Versions: 1.11.2
> Environment: hadoop-2.8.4,Flink-1.11.2
>Reporter: ChangjiGuo
>Priority: Minor
>  Labels: auto-deprioritized-major
> Attachments: 1.png
>
>
> I had a question when testing StreamingFileSink:
> The default 60s rolling interval in DefaultRollingPolicy is checked by 
> procTimeService. If the rolling condition is not met at a check, rolling is 
> delayed until the next timer trigger point (60s later), so it is not real-time 
> and does not match the configured maximum duration. For example, if the 
> checkpoint period is set to 60s, the file should be converted to finished at 
> the second checkpoint, but it will be delayed to the third checkpoint.
> You can refer to the attached picture for details.
> If we add rollingPolicy.shouldRollOnProcessingTime to the if condition of the 
> Bucket.write method, the file will be set to finished, as expected, at the 
> second checkpoint.
> {code:java}
> void write(IN element, long currentTime) throws IOException {
> if (inProgressPart == null || 
> rollingPolicy.shouldRollOnEvent(inProgressPart, element) || 
> rollingPolicy.shouldRollOnProcessingTime(inProgressPart, currentTime)) {
> if (LOG.isDebugEnabled()) {
> LOG.info("Subtask {} closing in-progress part file for bucket 
> id={} due to element {}.", subtaskIndex, bucketId, element);
>   }
>   rollPartFile(currentTime);
> }
> inProgressPart.write(element, currentTime);
> }
> {code}
> Maybe we can replace periodic detection with this?
> Is my understanding correct? Or can we do this? 
>  Thanks! ^_^



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-22497) When using DefaultRollingPolicy in StreamingFileSink, the file will be finished delayed

2021-04-30 Thread ChangjiGuo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17334641#comment-17334641
 ] 

ChangjiGuo edited comment on FLINK-22497 at 4/30/21, 9:51 AM:
--

Can someone review this? Thanks!


was (Author: changjiguo):
Can someone review this? thanks!

> When using DefaultRollingPolicy in StreamingFileSink, the file will be 
> finished delayed
> ---
>
> Key: FLINK-22497
> URL: https://issues.apache.org/jira/browse/FLINK-22497
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem
>Affects Versions: 1.11.2
> Environment: hadoop-2.8.4,Flink-1.11.2
>Reporter: ChangjiGuo
>Priority: Major
> Attachments: 1.png
>
>
> I had a question when testing StreamingFileSink:
> The default 60s rolling interval in DefaultRollingPolicy is checked by 
> procTimeService. If the rolling condition is not met at a check, rolling is 
> delayed until the next timer trigger point (60s later), so it is not real-time 
> and does not match the configured maximum duration. For example, if the 
> checkpoint period is set to 60s, the file should be converted to finished at 
> the second checkpoint, but it will be delayed to the third checkpoint.
> You can refer to the attached picture for details.
> If we add rollingPolicy.shouldRollOnProcessingTime to the if condition of the 
> Bucket.write method, the file will be set to finished, as expected, at the 
> second checkpoint.
> {code:java}
> void write(IN element, long currentTime) throws IOException {
> if (inProgressPart == null || 
> rollingPolicy.shouldRollOnEvent(inProgressPart, element) || 
> rollingPolicy.shouldRollOnProcessingTime(inProgressPart, currentTime)) {
> if (LOG.isDebugEnabled()) {
> LOG.info("Subtask {} closing in-progress part file for bucket 
> id={} due to element {}.", subtaskIndex, bucketId, element);
>   }
>   rollPartFile(currentTime);
> }
> inProgressPart.write(element, currentTime);
> }
> {code}
> Maybe we can replace periodic detection with this?
> Is my understanding correct? Or can we do this? 
>  Thanks! ^_^



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-22497) When using DefaultRollingPolicy in StreamingFileSink, the file will be finished delayed

2021-04-30 Thread ChangjiGuo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17334641#comment-17334641
 ] 

ChangjiGuo edited comment on FLINK-22497 at 4/30/21, 9:51 AM:
--

Can someone review this? thanks!


was (Author: changjiguo):
can someone review this? thanks!

> When using DefaultRollingPolicy in StreamingFileSink, the file will be 
> finished delayed
> ---
>
> Key: FLINK-22497
> URL: https://issues.apache.org/jira/browse/FLINK-22497
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem
>Affects Versions: 1.11.2
> Environment: hadoop-2.8.4,Flink-1.11.2
>Reporter: ChangjiGuo
>Priority: Major
> Attachments: 1.png
>
>
> I had a question when testing StreamingFileSink:
> The default 60s rolling interval in DefaultRollingPolicy is checked by 
> procTimeService. If the rolling condition is not met at a check, rolling is 
> delayed until the next timer trigger point (60s later), so it is not real-time 
> and does not match the configured maximum duration. For example, if the 
> checkpoint period is set to 60s, the file should be converted to finished at 
> the second checkpoint, but it will be delayed to the third checkpoint.
> You can refer to the attached picture for details.
> If we add rollingPolicy.shouldRollOnProcessingTime to the if condition of the 
> Bucket.write method, the file will be set to finished, as expected, at the 
> second checkpoint.
> {code:java}
> void write(IN element, long currentTime) throws IOException {
> if (inProgressPart == null || 
> rollingPolicy.shouldRollOnEvent(inProgressPart, element) || 
> rollingPolicy.shouldRollOnProcessingTime(inProgressPart, currentTime)) {
> if (LOG.isDebugEnabled()) {
> LOG.info("Subtask {} closing in-progress part file for bucket 
> id={} due to element {}.", subtaskIndex, bucketId, element);
>   }
>   rollPartFile(currentTime);
> }
> inProgressPart.write(element, currentTime);
> }
> {code}
> Maybe we can replace periodic detection with this?
> Is my understanding correct? Or can we do this? 
>  Thanks! ^_^



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22497) When using DefaultRollingPolicy in StreamingFileSink, the file will be finished delayed

2021-04-30 Thread ChangjiGuo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22497?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ChangjiGuo updated FLINK-22497:
---
Description: 
I had a question when testing StreamingFileSink:

The default 60s rolling interval in DefaultRollingPolicy is checked by 
procTimeService. If the rolling condition is not met at a check, rolling is 
delayed until the next timer trigger point (60s later), so it is not real-time 
and does not match the configured maximum duration. For example, if the 
checkpoint period is set to 60s, the file should be converted to finished at 
the second checkpoint, but it will be delayed to the third checkpoint.

You can refer to the attached picture for details.

If we add rollingPolicy.shouldRollOnProcessingTime to the if condition of the 
Bucket.write method, the file will be set to finished, as expected, at the 
second checkpoint.
{code:java}
void write(IN element, long currentTime) throws IOException {
if (inProgressPart == null || 
rollingPolicy.shouldRollOnEvent(inProgressPart, element) || 
rollingPolicy.shouldRollOnProcessingTime(inProgressPart, currentTime)) {
if (LOG.isDebugEnabled()) {
LOG.info("Subtask {} closing in-progress part file for bucket id={} 
due to element {}.", subtaskIndex, bucketId, element);
}
rollPartFile(currentTime);
}
inProgressPart.write(element, currentTime);
}
{code}
Maybe we can replace periodic detection with this?

Is my understanding correct? Or can we do this? 
 Thanks! ^_^

  was:
I had a doubt when testing StreamingFileSink:

The default 60s rolling interval in DefaultRollingPolicy is detected by 
procTimeService. If the rolling interval is not met this time, it will be 
delayed to the next timer trigger point (after 60s), so this is not real-time 
and does not match the maximum duration. For example, if the checkpoint period 
is set to 60s, the file should be converted to finished at the second 
checkpoint, but it will be delayed to the third checkpoint.

You can refer to the attached picture for detail.

If we add rollingPolicy.shouldRollOnProcessingTime to the if condition of 
Bucket.write method, the file will be set to finished as we expect at the 
second checkpoint.
{code:java}
void write(IN element, long currentTime) throws IOException {
if (inProgressPart == null || 
rollingPolicy.shouldRollOnEvent(inProgressPart, element) || 
rollingPolicy.shouldRollOnProcessingTime(inProgressPart, currentTime)) {
if (LOG.isDebugEnabled()) {
LOG.info("Subtask {} closing in-progress part file for bucket id={} 
due to element {}.", subtaskIndex, bucketId, element);
}
rollPartFile(currentTime);
}
inProgressPart.write(element, currentTime);
}
{code}
 
 Is my understanding correct? Or can we do this? 
 Thanks! ^_^


> When using DefaultRollingPolicy in StreamingFileSink, the file will be 
> finished delayed
> ---
>
> Key: FLINK-22497
> URL: https://issues.apache.org/jira/browse/FLINK-22497
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem
>Affects Versions: 1.11.2
> Environment: hadoop-2.8.4,Flink-1.11.2
>Reporter: ChangjiGuo
>Priority: Major
> Attachments: 1.png
>
>
> I had a question when testing StreamingFileSink:
> The default 60s rolling interval in DefaultRollingPolicy is checked by 
> procTimeService. If the rolling condition is not met at a check, rolling is 
> delayed until the next timer trigger point (60s later), so it is not real-time 
> and does not match the configured maximum duration. For example, if the 
> checkpoint period is set to 60s, the file should be converted to finished at 
> the second checkpoint, but it will be delayed to the third checkpoint.
> You can refer to the attached picture for details.
> If we add rollingPolicy.shouldRollOnProcessingTime to the if condition of the 
> Bucket.write method, the file will be set to finished, as expected, at the 
> second checkpoint.
> {code:java}
> void write(IN element, long currentTime) throws IOException {
> if (inProgressPart == null || 
> rollingPolicy.shouldRollOnEvent(inProgressPart, element) || 
> rollingPolicy.shouldRollOnProcessingTime(inProgressPart, currentTime)) {
> if (LOG.isDebugEnabled()) {
> LOG.info("Subtask {} closing in-progress part file for bucket 
> id={} due to element {}.", subtaskIndex, bucketId, element);
>   }
>   rollPartFile(currentTime);
> }
> inProgressPart.write(element, currentTime);
> }
> {code}
> Maybe we can replace periodic detection with this?
> Is my understanding correct? Or can we do this? 
>  Thanks! ^_^



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22497) When using DefaultRollingPolicy in StreamingFileSink, the file will be finished delayed

2021-04-29 Thread ChangjiGuo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22497?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ChangjiGuo updated FLINK-22497:
---
Description: 
I had a question when testing StreamingFileSink:

The default 60s rolling interval in DefaultRollingPolicy is checked by 
procTimeService. If the rolling condition is not met at a check, rolling is 
delayed until the next timer trigger point (60s later), so it is not real-time 
and does not match the configured maximum duration. For example, if the 
checkpoint period is set to 60s, the file should be converted to finished at 
the second checkpoint, but it will be delayed to the third checkpoint.

You can refer to the attached picture for details.

If we add rollingPolicy.shouldRollOnProcessingTime to the if condition of the 
Bucket.write method, the file will be set to finished, as expected, at the 
second checkpoint.
{code:java}
void write(IN element, long currentTime) throws IOException {
if (inProgressPart == null || 
rollingPolicy.shouldRollOnEvent(inProgressPart, element) || 
rollingPolicy.shouldRollOnProcessingTime(inProgressPart, currentTime)) {
if (LOG.isDebugEnabled()) {
LOG.info("Subtask {} closing in-progress part file for bucket id={} 
due to element {}.", subtaskIndex, bucketId, element);
}
rollPartFile(currentTime);
}
inProgressPart.write(element, currentTime);
}
{code}
 
 Is my understanding correct? Or can we do this? 
 Thanks! ^_^

  was:
I had a doubt when testing StreamingFileSink:

The default 60s rolling interval in DefaultRollingPolicy is detected by 
procTimeService. If the rolling interval is not met this time, it will be 
delayed to the next timer trigger point (after 60s), so this is not real-time. 
For example, if the checkpoint period is set to 60s, the file should be 
converted to finished at the second checkpoint, but it will be delayed to the 
third checkpoint.

You can refer to the attached picture for detail.

If we add rollingPolicy.shouldRollOnProcessingTime to the if condition of 
Bucket.write method, the file will be set to finished as we expect at the 
second checkpoint.
{code:java}
void write(IN element, long currentTime) throws IOException {
if (inProgressPart == null || 
rollingPolicy.shouldRollOnEvent(inProgressPart, element) || 
rollingPolicy.shouldRollOnProcessingTime(inProgressPart, currentTime)) {
if (LOG.isDebugEnabled()) {
LOG.info("Subtask {} closing in-progress part file for bucket id={} 
due to element {}.", subtaskIndex, bucketId, element);
}
rollPartFile(currentTime);
}
inProgressPart.write(element, currentTime);
}
{code}
 
 Is my understanding correct? Or can we do this? 
 Thanks! ^_^


> When using DefaultRollingPolicy in StreamingFileSink, the file will be 
> finished delayed
> ---
>
> Key: FLINK-22497
> URL: https://issues.apache.org/jira/browse/FLINK-22497
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem
>Affects Versions: 1.11.2
> Environment: hadoop-2.8.4,Flink-1.11.2
>Reporter: ChangjiGuo
>Priority: Major
> Attachments: 1.png
>
>
> I had a question when testing StreamingFileSink:
> The default 60s rolling interval in DefaultRollingPolicy is checked by 
> procTimeService. If the rolling condition is not met at a check, rolling is 
> delayed until the next timer trigger point (60s later), so it is not real-time 
> and does not match the configured maximum duration. For example, if the 
> checkpoint period is set to 60s, the file should be converted to finished at 
> the second checkpoint, but it will be delayed to the third checkpoint.
> You can refer to the attached picture for details.
> If we add rollingPolicy.shouldRollOnProcessingTime to the if condition of the 
> Bucket.write method, the file will be set to finished, as expected, at the 
> second checkpoint.
> {code:java}
> void write(IN element, long currentTime) throws IOException {
> if (inProgressPart == null || 
> rollingPolicy.shouldRollOnEvent(inProgressPart, element) || 
> rollingPolicy.shouldRollOnProcessingTime(inProgressPart, currentTime)) {
> if (LOG.isDebugEnabled()) {
> LOG.info("Subtask {} closing in-progress part file for bucket 
> id={} due to element {}.", subtaskIndex, bucketId, element);
>   }
>   rollPartFile(currentTime);
> }
> inProgressPart.write(element, currentTime);
> }
> {code}
>  
>  Is my understanding correct? Or can we do this? 
>  Thanks! ^_^



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22497) When using DefaultRollingPolicy in StreamingFileSink, the file will be finished delayed

2021-04-28 Thread ChangjiGuo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22497?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ChangjiGuo updated FLINK-22497:
---
Environment: hadoop-2.8.4,Flink-1.11.2  (was: hadoop-2.8.4)

> When using DefaultRollingPolicy in StreamingFileSink, the file will be 
> finished delayed
> ---
>
> Key: FLINK-22497
> URL: https://issues.apache.org/jira/browse/FLINK-22497
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem
>Affects Versions: 1.11.2
> Environment: hadoop-2.8.4,Flink-1.11.2
>Reporter: ChangjiGuo
>Priority: Major
> Attachments: 1.png
>
>
> I had a question when testing StreamingFileSink:
> The default 60s rolling interval in DefaultRollingPolicy is checked by 
> procTimeService. If the rolling condition is not met at a check, rolling is 
> delayed until the next timer trigger point (60s later), so it is not 
> real-time. For example, if the checkpoint period is set to 60s, the file 
> should be converted to finished at the second checkpoint, but it will be 
> delayed to the third checkpoint.
> You can refer to the attached picture for details.
> If we add rollingPolicy.shouldRollOnProcessingTime to the if condition of the 
> Bucket.write method, the file will be set to finished, as expected, at the 
> second checkpoint.
> {code:java}
> void write(IN element, long currentTime) throws IOException {
> if (inProgressPart == null || 
> rollingPolicy.shouldRollOnEvent(inProgressPart, element) || 
> rollingPolicy.shouldRollOnProcessingTime(inProgressPart, currentTime)) {
> if (LOG.isDebugEnabled()) {
> LOG.info("Subtask {} closing in-progress part file for bucket 
> id={} due to element {}.", subtaskIndex, bucketId, element);
>   }
>   rollPartFile(currentTime);
> }
> inProgressPart.write(element, currentTime);
> }
> {code}
>  
>  Is my understanding correct? Or can we do this? 
>  Thanks! ^_^



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22497) When using DefaultRollingPolicy in StreamingFileSink, the file will be finished delayed

2021-04-28 Thread ChangjiGuo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17334641#comment-17334641
 ] 

ChangjiGuo commented on FLINK-22497:


can someone review this? thanks!

> When using DefaultRollingPolicy in StreamingFileSink, the file will be 
> finished delayed
> ---
>
> Key: FLINK-22497
> URL: https://issues.apache.org/jira/browse/FLINK-22497
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem
>Affects Versions: 1.11.2
> Environment: hadoop-2.8.4
>Reporter: ChangjiGuo
>Priority: Major
> Attachments: 1.png
>
>
> I had a question when testing StreamingFileSink:
> The default 60s rolling interval in DefaultRollingPolicy is checked by 
> procTimeService. If the rolling condition is not met at a check, rolling is 
> delayed until the next timer trigger point (60s later), so it is not 
> real-time. For example, if the checkpoint period is set to 60s, the file 
> should be converted to finished at the second checkpoint, but it will be 
> delayed to the third checkpoint.
> You can refer to the attached picture for details.
> If we add rollingPolicy.shouldRollOnProcessingTime to the if condition of the 
> Bucket.write method, the file will be set to finished, as expected, at the 
> second checkpoint.
> {code:java}
> void write(IN element, long currentTime) throws IOException {
> if (inProgressPart == null || 
> rollingPolicy.shouldRollOnEvent(inProgressPart, element) || 
> rollingPolicy.shouldRollOnProcessingTime(inProgressPart, currentTime)) {
> if (LOG.isDebugEnabled()) {
> LOG.info("Subtask {} closing in-progress part file for bucket 
> id={} due to element {}.", subtaskIndex, bucketId, element);
>   }
>   rollPartFile(currentTime);
> }
> inProgressPart.write(element, currentTime);
> }
> {code}
>  
>  Is my understanding correct? Or can we do this? 
>  Thanks! ^_^



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22497) When using DefaultRollingPolicy in StreamingFileSink, the file will be finished delayed

2021-04-28 Thread ChangjiGuo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22497?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ChangjiGuo updated FLINK-22497:
---
Description: 
I had a question when testing StreamingFileSink:

The default 60s rolling interval in DefaultRollingPolicy is checked by 
procTimeService. If the rolling condition is not met at a check, rolling is 
delayed until the next timer trigger point (60s later), so it is not real-time. 
For example, if the checkpoint period is set to 60s, the file should be 
converted to finished at the second checkpoint, but it will be delayed to the 
third checkpoint.

You can refer to the attached picture for details.

If we add rollingPolicy.shouldRollOnProcessingTime to the if condition of the 
Bucket.write method, the file will be set to finished, as expected, at the 
second checkpoint.
{code:java}
void write(IN element, long currentTime) throws IOException {
if (inProgressPart == null || 
rollingPolicy.shouldRollOnEvent(inProgressPart, element) || 
rollingPolicy.shouldRollOnProcessingTime(inProgressPart, currentTime)) {
if (LOG.isDebugEnabled()) {
LOG.info("Subtask {} closing in-progress part file for bucket id={} 
due to element {}.", subtaskIndex, bucketId, element);
}
rollPartFile(currentTime);
}
inProgressPart.write(element, currentTime);
}
{code}
 
 Is my understanding correct? Or can we do this? 
 Thanks! ^_^

  was:
I had a doubt when testing StreamingFileSink:

The default 60s rolling interval in DefaultRollingPolicy is detected by 
procTimeService. If the rolling interval is not met this time, it will be 
delayed to the next timer trigger point (after 60s), so this is not real-time. 
For example, if the checkpoint period is set to 60s, the file should be 
converted to finished at the second checkpoint, but it will be delayed to the 
third checkpoint.

You can refer to the attached picture for detail.

If we add rollingPolicy.shouldRollOnProcessingTime to the if condition of 
Bucket.write method, the file will be set to finished as we expect at the 
second checkpoint.
{code:java}
void write(IN element, long currentTime) throws IOException {
if (inProgressPart == null || 
rollingPolicy.shouldRollOnEvent(inProgressPart, element)
|| rollingPolicy.shouldRollOnProcessingTime(inProgressPart, 
currentTime)) {
if (LOG.isDebugEnabled()) {
LOG.info("Subtask {} closing in-progress part file for 
bucket id={} due to element {}.", subtaskIndex, bucketId, element);
}
rollPartFile(currentTime);
}
inProgressPart.write(element, currentTime);
}
{code}
 
 Is my understanding correct? Or can we do this? 
 Thanks! ^_^


> When using DefaultRollingPolicy in StreamingFileSink, the file will be 
> finished delayed
> ---
>
> Key: FLINK-22497
> URL: https://issues.apache.org/jira/browse/FLINK-22497
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem
>Affects Versions: 1.11.2
> Environment: hadoop-2.8.4
>Reporter: ChangjiGuo
>Priority: Major
> Attachments: 1.png
>
>
> I had a question when testing StreamingFileSink:
> The default 60s rolling interval in DefaultRollingPolicy is checked by 
> procTimeService. If the rolling condition is not met at a check, rolling is 
> delayed until the next timer trigger point (60s later), so it is not 
> real-time. For example, if the checkpoint period is set to 60s, the file 
> should be converted to finished at the second checkpoint, but it will be 
> delayed to the third checkpoint.
> You can refer to the attached picture for details.
> If we add rollingPolicy.shouldRollOnProcessingTime to the if condition of the 
> Bucket.write method, the file will be set to finished, as expected, at the 
> second checkpoint.
> {code:java}
> void write(IN element, long currentTime) throws IOException {
> if (inProgressPart == null || 
> rollingPolicy.shouldRollOnEvent(inProgressPart, element) || 
> rollingPolicy.shouldRollOnProcessingTime(inProgressPart, currentTime)) {
> if (LOG.isDebugEnabled()) {
> LOG.info("Subtask {} closing in-progress part file for bucket 
> id={} due to element {}.", subtaskIndex, bucketId, element);
>   }
>   rollPartFile(currentTime);
> }
> inProgressPart.write(element, currentTime);
> }
> {code}
>  
>  Is my understanding correct? Or can we do this? 
>  Thanks! ^_^



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22497) When using DefaultRollingPolicy in StreamingFileSink, the file will be finished delayed

2021-04-28 Thread ChangjiGuo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22497?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ChangjiGuo updated FLINK-22497:
---
Description: 
I had a question when testing StreamingFileSink:

The default 60s rolling interval in DefaultRollingPolicy is checked by 
procTimeService. If the rolling condition is not met at a check, rolling is 
delayed until the next timer trigger point (60s later), so it is not real-time. 
For example, if the checkpoint period is set to 60s, the file should be 
converted to finished at the second checkpoint, but it will be delayed to the 
third checkpoint.

You can refer to the attached picture for details.

If we add rollingPolicy.shouldRollOnProcessingTime to the if condition of the 
Bucket.write method, the file will be set to finished, as expected, at the 
second checkpoint.
{code:java}
void write(IN element, long currentTime) throws IOException {
if (inProgressPart == null || 
rollingPolicy.shouldRollOnEvent(inProgressPart, element)
|| rollingPolicy.shouldRollOnProcessingTime(inProgressPart, 
currentTime)) {
if (LOG.isDebugEnabled()) {
LOG.info("Subtask {} closing in-progress part file for 
bucket id={} due to element {}.", subtaskIndex, bucketId, element);
}
rollPartFile(currentTime);
}
inProgressPart.write(element, currentTime);
}
{code}
 
 Is my understanding correct? Or can we do this? 
 Thanks! ^_^

  was:
I had a doubt when testing StreamingFileSink:

The default 60s rolling interval in DefaultRollingPolicy is detected by 
procTimeService. If the rolling interval is not met this time, it will be 
delayed to the next timer trigger point (after 60s), so this is not real-time. 
For example, if the checkpoint period is set to 60s, the file should be 
converted to finished at the second checkpoint, but it will be delayed to the 
third checkpoint.

You can refer to the attached picture for detail.

If we add rollingPolicy.shouldRollOnProcessingTime to the if condition of 
Bucket.write method, the file will be set to finished in the second checkpoint.
{code:java}
void write(IN element, long currentTime) throws IOException {
if (inProgressPart == null || 
rollingPolicy.shouldRollOnEvent(inProgressPart, element)
||rollingPolicy.shouldRollOnProcessingTime(inProgressPart, 
currentTime)) {
if (LOG.isDebugEnabled()) {
LOG.info("Subtask {} closing in-progress part file for 
bucket id={} due to element {}.",
subtaskIndex, bucketId, element);
}
rollPartFile(currentTime);
}
inProgressPart.write(element, currentTime);
}
{code}
 
Is my understanding correct? 
Thanks! ^_^


> When using DefaultRollingPolicy in StreamingFileSink, the file will be 
> finished delayed
> ---
>
> Key: FLINK-22497
> URL: https://issues.apache.org/jira/browse/FLINK-22497
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem
>Affects Versions: 1.11.2
> Environment: hadoop-2.8.4
>Reporter: ChangjiGuo
>Priority: Major
> Attachments: 1.png
>
>
> I had a question when testing StreamingFileSink:
> The default 60s rolling interval in DefaultRollingPolicy is checked by 
> procTimeService. If the rolling condition is not met at a check, rolling is 
> delayed until the next timer trigger point (60s later), so it is not 
> real-time. For example, if the checkpoint period is set to 60s, the file 
> should be converted to finished at the second checkpoint, but it will be 
> delayed to the third checkpoint.
> You can refer to the attached picture for details.
> If we add rollingPolicy.shouldRollOnProcessingTime to the if condition of the 
> Bucket.write method, the file will be set to finished, as expected, at the 
> second checkpoint.
> {code:java}
> void write(IN element, long currentTime) throws IOException {
> if (inProgressPart == null || 
> rollingPolicy.shouldRollOnEvent(inProgressPart, element)
>   || rollingPolicy.shouldRollOnProcessingTime(inProgressPart, 
> currentTime)) {
>   if (LOG.isDebugEnabled()) {
>   LOG.info("Subtask {} closing in-progress part file for 
> bucket id={} due to element {}.", subtaskIndex, bucketId, element);
>   }
>   rollPartFile(currentTime);
>   }
>   inProgressPart.write(element, currentTime);
> }
> {code}
>  
>  Is my understanding correct? Or can we do this? 
>  Thanks! ^_^



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22497) When using DefaultRollingPolicy in StreamingFileSink, the file will be finished delayed

2021-04-27 Thread ChangjiGuo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22497?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ChangjiGuo updated FLINK-22497:
---
Attachment: (was: 企业微信截图_43e36204-0a2f-4acd-ae17-56aa4d7661e4.png)

> When using DefaultRollingPolicy in StreamingFileSink, the file will be 
> finished delayed
> ---
>
> Key: FLINK-22497
> URL: https://issues.apache.org/jira/browse/FLINK-22497
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem
>Affects Versions: 1.11.2
> Environment: hadoop-2.8.4
>Reporter: ChangjiGuo
>Priority: Major
> Attachments: 1.png
>
>
> I had a question when testing StreamingFileSink:
> The default 60s rolling interval in DefaultRollingPolicy is checked by 
> procTimeService. If the rolling condition is not met at a check, rolling is 
> delayed until the next timer trigger point (60s later), so it is not 
> real-time. For example, if the checkpoint period is set to 60s, the file 
> should be converted to finished at the second checkpoint, but it will be 
> delayed to the third checkpoint.
> You can refer to the attached picture for details.
> If we add rollingPolicy.shouldRollOnProcessingTime to the if condition of the 
> Bucket.write method, the file will be set to finished at the second 
> checkpoint.
> {code:java}
> void write(IN element, long currentTime) throws IOException {
> if (inProgressPart == null || 
> rollingPolicy.shouldRollOnEvent(inProgressPart, element)
>   ||rollingPolicy.shouldRollOnProcessingTime(inProgressPart, 
> currentTime)) {
>   if (LOG.isDebugEnabled()) {
>   LOG.info("Subtask {} closing in-progress part file for 
> bucket id={} due to element {}.",
>   subtaskIndex, bucketId, element);
>   }
>   rollPartFile(currentTime);
>   }
>   inProgressPart.write(element, currentTime);
> }
> {code}
>  
> Is my understanding correct? 
> Thanks! ^_^



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22497) When using DefaultRollingPolicy in StreamingFileSink, the file will be finished delayed

2021-04-27 Thread ChangjiGuo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22497?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ChangjiGuo updated FLINK-22497:
---
Attachment: 1.png

> When using DefaultRollingPolicy in StreamingFileSink, the file will be 
> finished delayed
> ---
>
> Key: FLINK-22497
> URL: https://issues.apache.org/jira/browse/FLINK-22497
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem
>Affects Versions: 1.11.2
> Environment: hadoop-2.8.4
>Reporter: ChangjiGuo
>Priority: Major
> Attachments: 1.png
>
>
> I had a question when testing StreamingFileSink:
> The default 60s rolling interval in DefaultRollingPolicy is checked by 
> procTimeService. If the rolling condition is not met at a check, rolling is 
> delayed until the next timer trigger point (60s later), so it is not 
> real-time. For example, if the checkpoint period is set to 60s, the file 
> should be converted to finished at the second checkpoint, but it will be 
> delayed to the third checkpoint.
> You can refer to the attached picture for details.
> If we add rollingPolicy.shouldRollOnProcessingTime to the if condition of the 
> Bucket.write method, the file will be set to finished at the second 
> checkpoint.
> {code:java}
> void write(IN element, long currentTime) throws IOException {
> if (inProgressPart == null || 
> rollingPolicy.shouldRollOnEvent(inProgressPart, element)
>   ||rollingPolicy.shouldRollOnProcessingTime(inProgressPart, 
> currentTime)) {
>   if (LOG.isDebugEnabled()) {
>   LOG.info("Subtask {} closing in-progress part file for 
> bucket id={} due to element {}.",
>   subtaskIndex, bucketId, element);
>   }
>   rollPartFile(currentTime);
>   }
>   inProgressPart.write(element, currentTime);
> }
> {code}
>  
> Is my understanding correct? 
> Thanks! ^_^



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22497) When using DefaultRollingPolicy in StreamingFileSink, the file will be finished delayed

2021-04-27 Thread ChangjiGuo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22497?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ChangjiGuo updated FLINK-22497:
---
Attachment: 企业微信截图_43e36204-0a2f-4acd-ae17-56aa4d7661e4.png

> When using DefaultRollingPolicy in StreamingFileSink, the file will be 
> finished delayed
> ---
>
> Key: FLINK-22497
> URL: https://issues.apache.org/jira/browse/FLINK-22497
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem
>Affects Versions: 1.11.2
> Environment: hadoop-2.8.4
>Reporter: ChangjiGuo
>Priority: Major
> Attachments: 1.png
>
>
> I had a question when testing StreamingFileSink:
> The default 60s rolling interval in DefaultRollingPolicy is checked by 
> procTimeService. If the rolling condition is not met at a check, rolling is 
> delayed until the next timer trigger point (60s later), so it is not 
> real-time. For example, if the checkpoint period is set to 60s, the file 
> should be converted to finished at the second checkpoint, but it will be 
> delayed to the third checkpoint.
> You can refer to the attached picture for details.
> If we add rollingPolicy.shouldRollOnProcessingTime to the if condition of the 
> Bucket.write method, the file will be set to finished at the second 
> checkpoint.
> {code:java}
> void write(IN element, long currentTime) throws IOException {
> if (inProgressPart == null || 
> rollingPolicy.shouldRollOnEvent(inProgressPart, element)
>   ||rollingPolicy.shouldRollOnProcessingTime(inProgressPart, 
> currentTime)) {
>   if (LOG.isDebugEnabled()) {
>   LOG.info("Subtask {} closing in-progress part file for 
> bucket id={} due to element {}.",
>   subtaskIndex, bucketId, element);
>   }
>   rollPartFile(currentTime);
>   }
>   inProgressPart.write(element, currentTime);
> }
> {code}
>  
> Is my understanding correct? 
> Thanks! ^_^



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22497) When using DefaultRollingPolicy in StreamingFileSink, the file will be finished delayed

2021-04-27 Thread ChangjiGuo (Jira)
ChangjiGuo created FLINK-22497:
--

 Summary: When using DefaultRollingPolicy in StreamingFileSink, the 
file will be finished delayed
 Key: FLINK-22497
 URL: https://issues.apache.org/jira/browse/FLINK-22497
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / FileSystem
Affects Versions: 1.11.2
 Environment: hadoop-2.8.4
Reporter: ChangjiGuo


I had a question when testing StreamingFileSink:

The default 60s rolling interval in DefaultRollingPolicy is checked by 
procTimeService. If the rolling condition is not met at a check, rolling is 
delayed until the next timer trigger point (60s later), so it is not real-time. 
For example, if the checkpoint period is set to 60s, the file should be 
converted to finished at the second checkpoint, but it will be delayed to the 
third checkpoint.

You can refer to the attached picture for details.

If we add rollingPolicy.shouldRollOnProcessingTime to the if condition of the 
Bucket.write method, the file will be set to finished at the second checkpoint.
{code:java}
void write(IN element, long currentTime) throws IOException {
if (inProgressPart == null || 
rollingPolicy.shouldRollOnEvent(inProgressPart, element)
||rollingPolicy.shouldRollOnProcessingTime(inProgressPart, 
currentTime)) {
if (LOG.isDebugEnabled()) {
LOG.info("Subtask {} closing in-progress part file for 
bucket id={} due to element {}.",
subtaskIndex, bucketId, element);
}
rollPartFile(currentTime);
}
inProgressPart.write(element, currentTime);
}
{code}
 
Is my understanding correct? 
Thanks! ^_^
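
As a side note that is not part of the original report, the delay can also be 
reduced purely through configuration, because both the rollover interval and the 
processing-time check interval are adjustable on the sink. Below is a minimal 
sketch against the Flink 1.11 StreamingFileSink row-format API; the path, source, 
encoder, and the concrete interval values are illustrative assumptions. The idea 
is to keep the bucket check interval smaller than the checkpoint period, so that 
shouldRollOnProcessingTime is evaluated shortly after the rollover interval 
elapses instead of one full timer period later.
{code:java}
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

public class RollingPolicyExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000L); // 60s checkpoint period, as in the example above

        // Illustrative source; any DataStream<String> would do.
        DataStream<String> input = env.socketTextStream("localhost", 9999);

        StreamingFileSink<String> sink = StreamingFileSink
                .forRowFormat(new Path("hdfs:///tmp/output"), new SimpleStringEncoder<String>("UTF-8"))
                // Check the rolling policy every 10s instead of the default 60s timer, so a
                // part file that has exceeded the rollover interval is closed shortly after,
                // not one full timer period later.
                .withBucketCheckInterval(10_000L)
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(60_000L)          // roll files open for >= 60s
                                .withInactivityInterval(30_000L)        // roll files idle for >= 30s
                                .withMaxPartSize(128L * 1024L * 1024L)  // roll files larger than 128 MB
                                .build())
                .build();

        input.addSink(sink);
        env.execute("streaming-file-sink-rolling-policy-example");
    }
}
{code}
With the default 60s check interval and a 60s rollover interval, a part file 
opened just after a timer firing can only be closed almost two timer periods 
later, which is consistent with the file being finished at the third checkpoint 
instead of the second, as shown in the attached picture.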



--
This message was sent by Atlassian Jira
(v8.3.4#803005)