This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new c2956f5  [FLINK-14139][rest] Fix potential memory leak problem of rest 
server.
c2956f5 is described below

commit c2956f512f6c4c0f93e87a04a090ceaaf9fd64da
Author: kevin.cyj <kevin....@alibaba-inc.com>
AuthorDate: Thu Sep 19 20:48:31 2019 +0800

    [FLINK-14139][rest] Fix potential memory leak problem of rest server.
    
    This closes #9750.
---
 .../flink/runtime/rest/FileUploadHandler.java      | 11 +++++++
 .../flink/runtime/rest/FileUploadHandlerTest.java  | 34 ++++++++++++++++++++++
 2 files changed, 45 insertions(+)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
index 3cd9732..6f60830 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
@@ -38,6 +38,7 @@ import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseSt
 import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.LastHttpContent;
 import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.Attribute;
 import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.DefaultHttpDataFactory;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.DiskAttribute;
 import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.DiskFileUpload;
 import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpDataFactory;
 import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpPostRequestDecoder;
@@ -84,7 +85,17 @@ public class FileUploadHandler extends 
SimpleChannelInboundHandler<HttpObject> {
 
        public FileUploadHandler(final Path uploadDir) {
                super(true);
+
+               // the clean up of temp files when jvm exits is handled by 
org.apache.flink.util.ShutdownHookUtil; thus,
+               // it's no need to register those files (post chunks and upload 
file chunks) to java.io.DeleteOnExitHook
+               // which may lead to memory leak.
+               DiskAttribute.deleteOnExitTemporaryFile = false;
+               DiskFileUpload.deleteOnExitTemporaryFile = false;
+
                DiskFileUpload.baseDirectory = 
uploadDir.normalize().toAbsolutePath().toString();
+               // share the same directory with file upload for post chunks 
storage.
+               DiskAttribute.baseDirectory = DiskFileUpload.baseDirectory;
+
                this.uploadDir = requireNonNull(uploadDir);
        }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java
index 771fd8a..80fa4b9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java
@@ -38,8 +38,12 @@ import org.junit.Test;
 import java.io.File;
 import java.io.IOException;
 import java.io.StringWriter;
+import java.lang.reflect.Field;
+import java.util.LinkedHashSet;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 /**
  * Tests for the {@link FileUploadHandler}. Ensures that multipart http 
messages containing files and/or json are properly
@@ -136,6 +140,8 @@ public class FileUploadHandlerTest extends TestLogger {
                try (Response response = client.newCall(fileRequest).execute()) 
{
                        
assertEquals(fileHandler.getMessageHeaders().getResponseStatusCode().code(), 
response.code());
                }
+
+               verifyNoFileIsRegisteredToDeleteOnExitHook();
        }
 
        @Test
@@ -162,6 +168,8 @@ public class FileUploadHandlerTest extends TestLogger {
                        
assertEquals(mixedHandler.getMessageHeaders().getResponseStatusCode().code(), 
response.code());
                        assertEquals(json, mixedHandler.lastReceivedRequest);
                }
+
+               verifyNoFileIsRegisteredToDeleteOnExitHook();
        }
 
        @Test
@@ -188,6 +196,8 @@ public class FileUploadHandlerTest extends TestLogger {
                        // FileUploads are outright forbidden
                        assertEquals(HttpResponseStatus.BAD_REQUEST.code(), 
response.code());
                }
+
+               verifyNoFileIsRegisteredToDeleteOnExitHook();
        }
 
        @Test
@@ -212,6 +222,8 @@ public class FileUploadHandlerTest extends TestLogger {
                        // JSON payload did not match expected format
                        assertEquals(HttpResponseStatus.BAD_REQUEST.code(), 
response.code());
                }
+
+               verifyNoFileIsRegisteredToDeleteOnExitHook();
        }
 
        @Test
@@ -223,6 +235,8 @@ public class FileUploadHandlerTest extends TestLogger {
                        assertEquals(HttpResponseStatus.BAD_REQUEST.code(), 
response.code());
                }
                MULTIPART_UPLOAD_RESOURCE.assertUploadDirectoryIsEmpty();
+
+               verifyNoFileIsRegisteredToDeleteOnExitHook();
        }
 
        /**
@@ -238,5 +252,25 @@ public class FileUploadHandlerTest extends TestLogger {
                        
assertEquals(HttpResponseStatus.INTERNAL_SERVER_ERROR.code(), response.code());
                }
                MULTIPART_UPLOAD_RESOURCE.assertUploadDirectoryIsEmpty();
+
+               verifyNoFileIsRegisteredToDeleteOnExitHook();
+       }
+
+       /**
+        * DiskAttribute and DiskFileUpload class of netty store post chunks 
and file chunks as temp files on local disk.
+        * By default, netty will register these temp files to 
java.io.DeleteOnExitHook which may lead to memory leak.
+        * {@link FileUploadHandler} disables the shutdown hook registration so 
no file should be registered. Note that
+        * clean up of temp files is handed over to {@link 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint}.
+        */
+       private void verifyNoFileIsRegisteredToDeleteOnExitHook() {
+               try {
+                       Class<?> clazz = 
Class.forName("java.io.DeleteOnExitHook");
+                       Field field = clazz.getDeclaredField("files");
+                       field.setAccessible(true);
+                       LinkedHashSet files = (LinkedHashSet) field.get(null);
+                       assertTrue(files.isEmpty());
+               } catch (ClassNotFoundException | IllegalAccessException | 
NoSuchFieldException e) {
+                       fail("This should never happen.");
+               }
        }
 }

Reply via email to