Re: [PR] [FLINK-33981][runtime] Fix not closing DirectoryStream after listing local state files [flink]

2024-02-04 Thread via GitHub


1996fanrui commented on PR #24212:
URL: https://github.com/apache/flink/pull/24212#issuecomment-1926354179

   Thank you @fengjiajie for the great work. I have merged all PRs.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33981][runtime] Fix not closing DirectoryStream after listing local state files [flink]

2024-02-04 Thread via GitHub


1996fanrui merged PR #24262:
URL: https://github.com/apache/flink/pull/24262



Re: [PR] [FLINK-33981][runtime] Fix not closing DirectoryStream after listing local state files [flink]

2024-02-04 Thread via GitHub


1996fanrui merged PR #24261:
URL: https://github.com/apache/flink/pull/24261



Re: [PR] [FLINK-33981][runtime] Fix not closing DirectoryStream after listing local state files [flink]

2024-02-04 Thread via GitHub


fengjiajie commented on PR #24212:
URL: https://github.com/apache/flink/pull/24212#issuecomment-1926335570

   > Thanks @fengjiajie for the fix!
   > 
   > I see other callers of `Files.list` have already used `try-with-resources`, and the comment also mentions it. So LGTM.
   
   @1996fanrui Thanks for reviewing and merging the PRs. I've submitted PRs to the other two branches.



Re: [PR] [FLINK-33981][runtime] Fix not closing DirectoryStream after listing local state files [flink]

2024-02-04 Thread via GitHub


TimFruit commented on PR #24262:
URL: https://github.com/apache/flink/pull/24262#issuecomment-1926295937

   
![image](https://github.com/apache/flink/assets/23203575/9531b846-d2c3-4736-8d63-2625f309cb81)
   
   It could be solved by adding the class to the project.



Re: [PR] [FLINK-33981][runtime] Fix not closing DirectoryStream after listing local state files [flink]

2024-02-04 Thread via GitHub


TimFruit commented on PR #24262:
URL: https://github.com/apache/flink/pull/24262#issuecomment-1926173570

   When will this fix be released? This really bothers me when I try to use Flink 1.17.



Re: [PR] [FLINK-33981][runtime] Fix not closing DirectoryStream after listing local state files [flink]

2024-02-04 Thread via GitHub


fengjiajie commented on PR #24261:
URL: https://github.com/apache/flink/pull/24261#issuecomment-1926120194

   @flinkbot run azure



Re: [PR] [FLINK-33981][runtime] Fix not closing DirectoryStream after listing local state files [flink]

2024-02-04 Thread via GitHub


fengjiajie commented on PR #24261:
URL: https://github.com/apache/flink/pull/24261#issuecomment-1925732421

   @flinkbot run azure



Re: [PR] [FLINK-33981][runtime] Fix not closing DirectoryStream after listing local state files [flink]

2024-02-03 Thread via GitHub


flinkbot commented on PR #24262:
URL: https://github.com/apache/flink/pull/24262#issuecomment-1925611990

   
   ## CI report:
   
   * b2dd522ec1fa4600ce536b3f78ce74ca5d12fbd0 UNKNOWN
   
   
   Bot commands
   
   The @flinkbot bot supports the following commands:
   
   - `@flinkbot run azure`: re-run the last Azure build
   



Re: [PR] [FLINK-33981][runtime] Fix not closing DirectoryStream after listing local state files [flink]

2024-02-03 Thread via GitHub


flinkbot commented on PR #24261:
URL: https://github.com/apache/flink/pull/24261#issuecomment-1925610512

   
   ## CI report:
   
   * a51edbb7b93d958103a4b783fc5402f7ab1d UNKNOWN
   
   
   Bot commands
   
   The @flinkbot bot supports the following commands:
   
   - `@flinkbot run azure`: re-run the last Azure build
   



[PR] [FLINK-33981][runtime] Fix not closing DirectoryStream after listing local state files [flink]

2024-02-03 Thread via GitHub


fengjiajie opened a new pull request, #24261:
URL: https://github.com/apache/flink/pull/24261

   This closes https://github.com/apache/flink/pull/24212
   
   
   
   ## What is the purpose of the change
   
   When using MiniCluster mode, file descriptors for directories such as /tmp/minicluster_6e62ab17ab058e0d5df56b97e9ccef71/tm_0/localState are not released after a job completes. Running multiple jobs in the same JVM therefore leaves descriptors open, even after the directories themselves have been deleted, which may eventually exhaust the process's open-file limit.
   
   After running the reproduction code below until it reaches the final sleep, `lsof -p 18162` (18162 being the PID of the test JVM) shows:
   
   ```
   ...
   java    18162 sa_cluster   30r   DIR  253,1        0 1311962 /tmp/minicluster_79186ba3856bb18fd21c20d9adbaa3da/tm_0/localState (deleted)
   java    18162 sa_cluster   31r   DIR  253,1        0 1311962 /tmp/minicluster_79186ba3856bb18fd21c20d9adbaa3da/tm_0/localState (deleted)
   java    18162 sa_cluster   32r   DIR  253,1        0 1310787 /tmp/minicluster_f3ac056ffa1c47010698fc02c5e6d4cf/tm_0/localState (deleted)
   java    18162 sa_cluster   33r   DIR  253,1        0 1310787 /tmp/minicluster_f3ac056ffa1c47010698fc02c5e6d4cf/tm_0/localState (deleted)
   java    18162 sa_cluster   34r   DIR  253,1        0 1311960 /tmp/minicluster_ef6177c97a2c9096758d0a8f132f9f02/tm_0/localState (deleted)
   java    18162 sa_cluster   35r   DIR  253,1        0 1311960 /tmp/minicluster_ef6177c97a2c9096758d0a8f132f9f02/tm_0/localState (deleted)
   java    18162 sa_cluster   36r   DIR  253,1        0 1311974 /tmp/minicluster_661a2967ea5c377265fa9f41c768db21/tm_0/localState (deleted)
   java    18162 sa_cluster   37r   DIR  253,1        0 1311974 /tmp/minicluster_661a2967ea5c377265fa9f41c768db21/tm_0/localState (deleted)
   java    18162 sa_cluster   38r   DIR  253,1        0 1311979 /tmp/minicluster_8413f476b77245203ed1ef759eb0d2de/tm_0/localState (deleted)
   ...
   ```
   
   The code used for reproduction is as follows:
   
   ```java
   import org.apache.flink.api.common.JobExecutionResult;
   import org.apache.flink.configuration.Configuration;
   import org.apache.flink.core.execution.JobClient;
   import org.apache.flink.streaming.api.datastream.DataStreamSource;
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
   import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
   import org.apache.flink.streaming.api.graph.StreamGraph;
   
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.TimeoutException;
   
   /**
    * javac -cp 'lib/*' TestReleaseFd.java
    * java -Xmx600m -cp '.:lib/*' TestReleaseFd
    */
   public class TestReleaseFd {
   
       public static void main(String[] args) throws Exception {
           for (int i = 0; i < 10; ++i) {
               int round = i;
               // Each round runs one short job on a local MiniCluster in its own thread.
               Thread thread = new Thread(() -> {
                   try {
                       Configuration configuration = new Configuration();
                       final StreamExecutionEnvironment env =
                               StreamExecutionEnvironment.getExecutionEnvironment(configuration);
                       env.setParallelism(1);
   
                       DataStreamSource<Long> longDataStreamSource = env.fromSequence(1, 10);
                       longDataStreamSource.addSink(new DiscardingSink<>());
   
                       StreamGraph streamGraph = env.getStreamGraph();
                       streamGraph.setJobName("test-" + System.nanoTime());
                       JobClient jobClient = env.executeAsync(streamGraph);
   
                       // Poll until the job finishes, ignoring intermediate timeouts.
                       CompletableFuture<JobExecutionResult> jobExecutionResultCompletableFuture =
                               jobClient.getJobExecutionResult();
                       JobExecutionResult jobExecutionResult = null;
                       while (jobExecutionResult == null) {
                           try {
                               jobExecutionResult =
                                       jobExecutionResultCompletableFuture.get(20, TimeUnit.SECONDS);
                           } catch (TimeoutException timeoutException) {
                               // ignore and keep waiting
                           }
                       }
                       System.out.println("finished round: " + round);
                       env.close();
                   } catch (Exception e) {
                       throw new RuntimeException(e);
                   }
               });
   
               thread.setDaemon(true);
               thread.start();
               thread.join();
   
               System.out.println("done ... " + i);
           }
   
           // Keep the JVM alive so its open descriptors can be inspected with lsof -p <pid>
           Thread.sleep(500_000_000);
       }
   }
   ```
   
   ## Brief change log
   
   Close the `DirectoryStream` after use.
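   A minimal sketch of the pattern described in the change log, assuming the listing goes through `java.nio.file.Files.list` (the reviewer's comment refers to other `Files.list` callers). The stream returned by `Files.list` wraps an open `DirectoryStream`, and its Javadoc recommends try-with-resources so that closing the stream releases the directory handle. `LocalStateListing`, `listEntryNames`, and the directory names are hypothetical illustrations, not Flink's actual code.
   
   ```java
   import java.io.IOException;
   import java.nio.file.Files;
   import java.nio.file.Path;
   import java.util.List;
   import java.util.stream.Collectors;
   import java.util.stream.Stream;
   
   public class LocalStateListing {
   
       /**
        * Hypothetical helper: returns the sorted names of the entries in a local state directory.
        * Closing the Stream (here via try-with-resources) also closes the DirectoryStream that
        * Files.list opened, so no descriptor outlives the call, even if collecting throws.
        */
       static List<String> listEntryNames(Path localStateDir) throws IOException {
           try (Stream<Path> entries = Files.list(localStateDir)) {
               return entries
                       .map(p -> p.getFileName().toString())
                       .sorted()
                       .collect(Collectors.toList());
           }
       }
   
       public static void main(String[] args) throws IOException {
           Path dir = Files.createTempDirectory("localState");
           Files.createDirectory(dir.resolve("dir-b"));
           Files.createDirectory(dir.resolve("dir-a"));
           System.out.println(listEntryNames(dir)); // prints [dir-a, dir-b]
       }
   }
   ```
   
   Calling `close()` on the stream in a `finally` block would work equally well; try-with-resources is simply the shorter form and matches how, per the reviewer, other `Files.list` callers already handle it.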
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
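   Since the change has no automated test, one way to observe the leak from inside the JVM instead of with `lsof` is sketched below. It assumes a Linux host, where `/proc/self/fd` lists the process's open descriptors as symlinks to their targets. `FdLeakCheck`, `countOpenFdsPointingTo`, and the temporary directory name are hypothetical and not part of Flink. How many descriptors a single open directory stream holds is a JDK implementation detail, so the sketch only expects "more than zero" before the close and "zero" after it.
   
   ```java
   import java.io.IOException;
   import java.io.UncheckedIOException;
   import java.nio.file.DirectoryStream;
   import java.nio.file.Files;
   import java.nio.file.Path;
   import java.nio.file.Paths;
   import java.util.stream.Stream;
   
   public class FdLeakCheck {
   
       /**
        * Counts this JVM's open descriptors whose target path contains {@code needle}.
        * Assumes Linux: each entry in /proc/self/fd is a symlink to what the descriptor refers to.
        */
       static long countOpenFdsPointingTo(String needle) throws IOException {
           long count = 0;
           // Close our own directory stream so the check itself leaks nothing.
           try (DirectoryStream<Path> fds = Files.newDirectoryStream(Paths.get("/proc/self/fd"))) {
               for (Path fd : fds) {
                   try {
                       if (Files.readSymbolicLink(fd).toString().contains(needle)) {
                           count++;
                       }
                   } catch (IOException e) {
                       // The descriptor was closed between listing and reading it; skip it.
                   }
               }
           }
           return count;
       }
   
       /** Returns {descriptors after a leaky listing, after an explicit close, after try-with-resources}. */
       static long[] demo() {
           try {
               Path dir = Files.createTempDirectory("localState-demo");
               Files.createFile(dir.resolve("state-file"));
               String needle = dir.toRealPath().toString();
   
               // Leaky pattern: the Stream wraps an open DirectoryStream that nobody closes.
               Stream<Path> leaky = Files.list(dir);
               leaky.count();
               long afterLeak = countOpenFdsPointingTo(needle);
   
               // Closing the Stream closes the DirectoryStream and releases its descriptor(s).
               leaky.close();
               long afterClose = countOpenFdsPointingTo(needle);
   
               // Fixed pattern: try-with-resources closes the Stream when the block exits.
               try (Stream<Path> entries = Files.list(dir)) {
                   entries.count();
               }
               long afterTryWithResources = countOpenFdsPointingTo(needle);
   
               return new long[] {afterLeak, afterClose, afterTryWithResources};
           } catch (IOException e) {
               throw new UncheckedIOException(e);
           }
       }
   
       public static void main(String[] args) {
           long[] counts = demo();
           System.out.println("leaky: " + counts[0] + ", after close: " + counts[1]
                   + ", try-with-resources: " + counts[2]);
       }
   }
   ```
   
   Run it on Linux with `java FdLeakCheck.java` (Java 11+ single-file launch); the first number should be positive and the other two zero.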
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
   - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / 

Re: [PR] [FLINK-33981][runtime] Fix not closing DirectoryStream after listing local state files [flink]

2024-02-03 Thread via GitHub


1996fanrui merged PR #24212:
URL: https://github.com/apache/flink/pull/24212



Re: [PR] [FLINK-33981][runtime] Fix not closing DirectoryStream after listing local state files [flink]

2024-02-03 Thread via GitHub


fengjiajie commented on PR #24212:
URL: https://github.com/apache/flink/pull/24212#issuecomment-1925600132

   Hi @1996fanrui, could you take a look at this fix when you have a moment and see whether it can be merged, or should I reach out to someone else? Thanks.



Re: [PR] [FLINK-33981][runtime] Fix not closing DirectoryStream after listing local state files [flink]

2024-01-28 Thread via GitHub


flinkbot commented on PR #24212:
URL: https://github.com/apache/flink/pull/24212#issuecomment-1913877869

   
   ## CI report:
   
   * 3d66ebaffc112e0b3777fc3e3841b64f045357bb UNKNOWN
   
   
   Bot commands
   
   The @flinkbot bot supports the following commands:
   
   - `@flinkbot run azure`: re-run the last Azure build
   



[PR] [FLINK-33981][runtime] Fix not closing DirectoryStream after listing local state files [flink]

2024-01-28 Thread via GitHub


fengjiajie opened a new pull request, #24212:
URL: https://github.com/apache/flink/pull/24212

   
   
   ## What is the purpose of the change
   
   When using MiniCluster mode, file descriptors for directories such as /tmp/minicluster_6e62ab17ab058e0d5df56b97e9ccef71/tm_0/localState are not released after a job completes. Running multiple jobs in the same JVM therefore leaves descriptors open, even after the directories themselves have been deleted, which may eventually exhaust the process's open-file limit.
   
   After running the reproduction code below until it reaches the final sleep, `lsof -p 18162` (18162 being the PID of the test JVM) shows:
   
   ```
   ...
   java    18162 sa_cluster   30r   DIR  253,1        0 1311962 /tmp/minicluster_79186ba3856bb18fd21c20d9adbaa3da/tm_0/localState (deleted)
   java    18162 sa_cluster   31r   DIR  253,1        0 1311962 /tmp/minicluster_79186ba3856bb18fd21c20d9adbaa3da/tm_0/localState (deleted)
   java    18162 sa_cluster   32r   DIR  253,1        0 1310787 /tmp/minicluster_f3ac056ffa1c47010698fc02c5e6d4cf/tm_0/localState (deleted)
   java    18162 sa_cluster   33r   DIR  253,1        0 1310787 /tmp/minicluster_f3ac056ffa1c47010698fc02c5e6d4cf/tm_0/localState (deleted)
   java    18162 sa_cluster   34r   DIR  253,1        0 1311960 /tmp/minicluster_ef6177c97a2c9096758d0a8f132f9f02/tm_0/localState (deleted)
   java    18162 sa_cluster   35r   DIR  253,1        0 1311960 /tmp/minicluster_ef6177c97a2c9096758d0a8f132f9f02/tm_0/localState (deleted)
   java    18162 sa_cluster   36r   DIR  253,1        0 1311974 /tmp/minicluster_661a2967ea5c377265fa9f41c768db21/tm_0/localState (deleted)
   java    18162 sa_cluster   37r   DIR  253,1        0 1311974 /tmp/minicluster_661a2967ea5c377265fa9f41c768db21/tm_0/localState (deleted)
   java    18162 sa_cluster   38r   DIR  253,1        0 1311979 /tmp/minicluster_8413f476b77245203ed1ef759eb0d2de/tm_0/localState (deleted)
   ...
   ```
   
   The code used for reproduction is as follows:
   
   ```java
   import org.apache.flink.api.common.JobExecutionResult;
   import org.apache.flink.configuration.Configuration;
   import org.apache.flink.core.execution.JobClient;
   import org.apache.flink.streaming.api.datastream.DataStreamSource;
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
   import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
   import org.apache.flink.streaming.api.graph.StreamGraph;
   
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.TimeoutException;
   
   /**
    * javac -cp 'lib/*' TestReleaseFd.java
    * java -Xmx600m -cp '.:lib/*' TestReleaseFd
    */
   public class TestReleaseFd {
   
       public static void main(String[] args) throws Exception {
           for (int i = 0; i < 10; ++i) {
               int round = i;
               // Each round runs one short job on a local MiniCluster in its own thread.
               Thread thread = new Thread(() -> {
                   try {
                       Configuration configuration = new Configuration();
                       final StreamExecutionEnvironment env =
                               StreamExecutionEnvironment.getExecutionEnvironment(configuration);
                       env.setParallelism(1);
   
                       DataStreamSource<Long> longDataStreamSource = env.fromSequence(1, 10);
                       longDataStreamSource.addSink(new DiscardingSink<>());
   
                       StreamGraph streamGraph = env.getStreamGraph();
                       streamGraph.setJobName("test-" + System.nanoTime());
                       JobClient jobClient = env.executeAsync(streamGraph);
   
                       // Poll until the job finishes, ignoring intermediate timeouts.
                       CompletableFuture<JobExecutionResult> jobExecutionResultCompletableFuture =
                               jobClient.getJobExecutionResult();
                       JobExecutionResult jobExecutionResult = null;
                       while (jobExecutionResult == null) {
                           try {
                               jobExecutionResult =
                                       jobExecutionResultCompletableFuture.get(20, TimeUnit.SECONDS);
                           } catch (TimeoutException timeoutException) {
                               // ignore and keep waiting
                           }
                       }
                       System.out.println("finished round: " + round);
                       env.close();
                   } catch (Exception e) {
                       throw new RuntimeException(e);
                   }
               });
   
               thread.setDaemon(true);
               thread.start();
               thread.join();
   
               System.out.println("done ... " + i);
           }
   
           // Keep the JVM alive so its open descriptors can be inspected with lsof -p <pid>
           Thread.sleep(500_000_000);
       }
   }
   ```
   
   ## Brief change log
   
   Close the `DirectoryStream` after use.
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
   - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths