This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch release-1.9 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.9 by this push: new c2956f5 [FLINK-14139][rest] Fix potential memory leak problem of rest server. c2956f5 is described below commit c2956f512f6c4c0f93e87a04a090ceaaf9fd64da Author: kevin.cyj <kevin....@alibaba-inc.com> AuthorDate: Thu Sep 19 20:48:31 2019 +0800 [FLINK-14139][rest] Fix potential memory leak problem of rest server. This closes #9750. --- .../flink/runtime/rest/FileUploadHandler.java | 11 +++++++ .../flink/runtime/rest/FileUploadHandlerTest.java | 34 ++++++++++++++++++++++ 2 files changed, 45 insertions(+) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java index 3cd9732..6f60830 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java @@ -38,6 +38,7 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseSt import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.LastHttpContent; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.Attribute; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.DefaultHttpDataFactory; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.DiskAttribute; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.DiskFileUpload; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpDataFactory; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpPostRequestDecoder; @@ -84,7 +85,17 @@ public class FileUploadHandler extends SimpleChannelInboundHandler<HttpObject> { public FileUploadHandler(final Path uploadDir) { super(true); + + // the clean up of temp files when jvm exits is handled by org.apache.flink.util.ShutdownHookUtil; thus, + // there is no 
need to register those files (post chunks and upload file chunks) with java.io.DeleteOnExitHook, + // which may lead to a memory leak. + DiskAttribute.deleteOnExitTemporaryFile = false; + DiskFileUpload.deleteOnExitTemporaryFile = false; + DiskFileUpload.baseDirectory = uploadDir.normalize().toAbsolutePath().toString(); + // share the same directory with file upload for post chunks storage. + DiskAttribute.baseDirectory = DiskFileUpload.baseDirectory; + this.uploadDir = requireNonNull(uploadDir); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java index 771fd8a..80fa4b9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java @@ -38,8 +38,12 @@ import org.junit.Test; import java.io.File; import java.io.IOException; import java.io.StringWriter; +import java.lang.reflect.Field; +import java.util.LinkedHashSet; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; /** * Tests for the {@link FileUploadHandler}. 
Ensures that multipart http messages containing files and/or json are properly @@ -136,6 +140,8 @@ public class FileUploadHandlerTest extends TestLogger { try (Response response = client.newCall(fileRequest).execute()) { assertEquals(fileHandler.getMessageHeaders().getResponseStatusCode().code(), response.code()); } + + verifyNoFileIsRegisteredToDeleteOnExitHook(); } @Test @@ -162,6 +168,8 @@ public class FileUploadHandlerTest extends TestLogger { assertEquals(mixedHandler.getMessageHeaders().getResponseStatusCode().code(), response.code()); assertEquals(json, mixedHandler.lastReceivedRequest); } + + verifyNoFileIsRegisteredToDeleteOnExitHook(); } @Test @@ -188,6 +196,8 @@ public class FileUploadHandlerTest extends TestLogger { // FileUploads are outright forbidden assertEquals(HttpResponseStatus.BAD_REQUEST.code(), response.code()); } + + verifyNoFileIsRegisteredToDeleteOnExitHook(); } @Test @@ -212,6 +222,8 @@ public class FileUploadHandlerTest extends TestLogger { // JSON payload did not match expected format assertEquals(HttpResponseStatus.BAD_REQUEST.code(), response.code()); } + + verifyNoFileIsRegisteredToDeleteOnExitHook(); } @Test @@ -223,6 +235,8 @@ public class FileUploadHandlerTest extends TestLogger { assertEquals(HttpResponseStatus.BAD_REQUEST.code(), response.code()); } MULTIPART_UPLOAD_RESOURCE.assertUploadDirectoryIsEmpty(); + + verifyNoFileIsRegisteredToDeleteOnExitHook(); } /** @@ -238,5 +252,25 @@ public class FileUploadHandlerTest extends TestLogger { assertEquals(HttpResponseStatus.INTERNAL_SERVER_ERROR.code(), response.code()); } MULTIPART_UPLOAD_RESOURCE.assertUploadDirectoryIsEmpty(); + + verifyNoFileIsRegisteredToDeleteOnExitHook(); + } + + /** + * The DiskAttribute and DiskFileUpload classes of netty store post chunks and file chunks as temp files on local disk. + * By default, netty will register these temp files with java.io.DeleteOnExitHook, which may lead to a memory leak. 
+ * {@link FileUploadHandler} disables the shutdown hook registration so no file should be registered. Note that + * clean up of temp files is handed over to {@link org.apache.flink.runtime.entrypoint.ClusterEntrypoint}. + */ + private void verifyNoFileIsRegisteredToDeleteOnExitHook() { + try { + Class<?> clazz = Class.forName("java.io.DeleteOnExitHook"); + Field field = clazz.getDeclaredField("files"); + field.setAccessible(true); + LinkedHashSet files = (LinkedHashSet) field.get(null); + assertTrue(files.isEmpty()); + } catch (ClassNotFoundException | IllegalAccessException | NoSuchFieldException e) { + fail("This should never happen."); + } } }