Re: [PR] [FLINK-32512] Don't register resource to user resource manager when creating temporary function [flink]

2023-10-18 Thread via GitHub


KarmaGYZ merged PR #22938:
URL: https://github.com/apache/flink/pull/22938


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32512] Don't register resource to user resource manager when creating temporary function [flink]

2023-10-18 Thread via GitHub


KarmaGYZ commented on PR #22938:
URL: https://github.com/apache/flink/pull/22938#issuecomment-1769817718

   @flinkbot run azure





Re: [PR] [FLINK-32512] Don't register resource to user resource manager when creating temporary function [flink]

2023-10-18 Thread via GitHub


KarmaGYZ commented on code in PR #22938:
URL: https://github.com/apache/flink/pull/22938#discussion_r1363506946


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java:
##
@@ -269,6 +279,7 @@ public boolean dropCatalogFunction(
 "Could not drop catalog function '%s'.", identifier.asSummaryString()),
 t);
 }
+

Review Comment:
   nit: redundant line.



##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java:
##
@@ -131,10 +132,17 @@ public boolean dropTemporarySystemFunction(String name, boolean ignoreIfNotExist
 "Could not drop temporary system function. A function named '%s' doesn't exist.",
 name));
 }
+unregisterFunctionJarResources(function);
 
 return function != null;
 }
 
+private void unregisterFunctionJarResources(CatalogFunction function) {

Review Comment:
   +1






Re: [PR] [FLINK-32512] Don't register resource to user resource manager when creating temporary function [flink]

2023-10-11 Thread via GitHub


FangYongs commented on PR #22938:
URL: https://github.com/apache/flink/pull/22938#issuecomment-1757174465

   @fsk119 @KarmaGYZ I have updated this PR. Please take another look, thanks.





Re: [PR] [FLINK-32512] Don't register resource to user resource manager when creating temporary function [flink]

2023-10-10 Thread via GitHub


FangYongs commented on code in PR #22938:
URL: https://github.com/apache/flink/pull/22938#discussion_r1353844833


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java:
##
@@ -65,9 +69,14 @@ public class ResourceManager implements Closeable {
 private static final String FILE_SCHEME = "file";
 
 private final Path localResourceDir;
+/** Resource infos for functions. */
+private final Map functionResourceInfos;
+
 protected final Map resourceInfos;
 protected final MutableURLClassLoader userClassLoader;
 
+private final List createdClassLoaderList;

Review Comment:
   DONE






Re: [PR] [FLINK-32512] Don't register resource to user resource manager when creating temporary function [flink]

2023-10-10 Thread via GitHub


FangYongs commented on code in PR #22938:
URL: https://github.com/apache/flink/pull/22938#discussion_r1353833344


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java:
##
@@ -124,15 +136,79 @@ public String registerFileResource(ResourceUri resourceUri) throws IOException {
 Collections.singletonList(resourceUri),
 ResourceType.FILE,
 false,
-url -> {});
+url -> {},
+false);
 registerResources(stagingResources, false);
 return resourceInfos.get(new ArrayList<>(stagingResources.keySet()).get(0)).getPath();
 }
 
+/**
+ * Register a resource for function and add it to the function resource infos. If the file is
+ * remote, it will be copied to a local file.
+ *
+ * @param resourceUris the resource uri for function.
+ */
+public void registerFunctionResources(Set resourceUris) throws IOException {
+prepareStagingResources(
+resourceUris,
+ResourceType.JAR,
+true,
+url -> {
+try {
+JarUtils.checkJarFile(url);
+} catch (IOException e) {
+throw new ValidationException(
+String.format("Failed to register jar resource [%s]", url), e);
+}
+},
+true);
+}
+
+/**
+ * Unregister the resource uri in function resources, when the reference count of the resource
+ * is 0, the resource will be removed from the function resources.
+ *
+ * @param resourceUris the uris to unregister in function resources.
+ */
+public void unregisterFunctionResources(List resourceUris) {
+if (!resourceUris.isEmpty()) {
+resourceUris.forEach(
+uri -> {
+ResourceCounter counter = functionResourceInfos.get(uri);
+if (counter != null && counter.decreaseCounter()) {
+functionResourceInfos.remove(uri);
+}
+});
+}
+}
+
 public URLClassLoader getUserClassLoader() {
 return userClassLoader;
 }
 
+public URLClassLoader createUserClassLoader(List resourceUris) {
+if (resourceUris.isEmpty()) {
+return userClassLoader;
+}
+MutableURLClassLoader classLoader = userClassLoader.copy();
+for (ResourceUri resourceUri : new HashSet<>(resourceUris)) {
+classLoader.addURL(checkNotNull(functionResourceInfos.get(resourceUri)).url);
+}
+createdClassLoaderList.add(classLoader);
+
+return classLoader;
+}
+
+public void closeUserClassLoader(URLClassLoader classLoader) {
+if (createdClassLoaderList.remove(classLoader)) {

Review Comment:
   Removed the created class loader list from ResourceManager.






Re: [PR] [FLINK-32512] Don't register resource to user resource manager when creating temporary function [flink]

2023-10-10 Thread via GitHub


FangYongs commented on code in PR #22938:
URL: https://github.com/apache/flink/pull/22938#discussion_r1353833112


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java:
##
@@ -131,10 +132,17 @@ public boolean dropTemporarySystemFunction(String name, boolean ignoreIfNotExist
 "Could not drop temporary system function. A function named '%s' doesn't exist.",
 name));
 }
+unregisterFunctionJarResources(function);
 
 return function != null;
 }
 
+private void unregisterFunctionJarResources(CatalogFunction function) {
+if (function != null && function.getFunctionLanguage() == FunctionLanguage.JAVA) {

Review Comment:
   Python UDFs are a separate matter. Let's fix Java UDFs first and consider Python UDFs in another PR.






Re: [PR] [FLINK-32512] Don't register resource to user resource manager when creating temporary function [flink]

2023-10-09 Thread via GitHub


fsk119 commented on code in PR #22938:
URL: https://github.com/apache/flink/pull/22938#discussion_r1350236277


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java:
##
@@ -65,9 +69,14 @@ public class ResourceManager implements Closeable {
 private static final String FILE_SCHEME = "file";
 
 private final Path localResourceDir;
+/** Resource infos for functions. */
+private final Map functionResourceInfos;
+
 protected final Map resourceInfos;
 protected final MutableURLClassLoader userClassLoader;
 
+private final List createdClassLoaderList;

Review Comment:
   Why do we need this list here? The only usage I can find is this:
   ```
   URLClassLoader classLoader =
           resourceManager.createUserClassLoader(function.getFunctionResources());
   try {
       UserDefinedFunctionHelper.validateClass(
               (Class) classLoader.loadClass(function.getClassName()));
   } finally {
       resourceManager.closeUserClassLoader(classLoader);
   }
   ```
   If we close the created class loader immediately, can we move the management outside?
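
A minimal sketch of what caller-owned management of a short-lived class loader could look like. It uses only the JDK's `URLClassLoader`, which implements `Closeable`, rather than Flink's `MutableURLClassLoader`. The class `ScopedClassLoaderSketch` and the method `canLoad` are hypothetical names for illustration and are not part of this PR.

```java
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;

/** Hypothetical helper, not part of Flink: the caller owns the short-lived loader. */
public class ScopedClassLoaderSketch {

    /** Returns true if className can be loaded from the given URLs or the parent loader. */
    public static boolean canLoad(URL[] urls, String className) throws IOException {
        ClassLoader parent = ScopedClassLoaderSketch.class.getClassLoader();
        // try-with-resources closes the loader even when loading fails,
        // so no registry of created loaders has to be kept elsewhere.
        try (URLClassLoader loader = new URLClassLoader(urls, parent)) {
            loader.loadClass(className);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }
}
```

With this shape the loader's lifetime is bounded by the block that created it, which is the property the question above is asking about.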



##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java:
##
@@ -124,15 +136,79 @@ public String registerFileResource(ResourceUri resourceUri) throws IOException {
 Collections.singletonList(resourceUri),
 ResourceType.FILE,
 false,
-url -> {});
+url -> {},
+false);
 registerResources(stagingResources, false);
 return resourceInfos.get(new ArrayList<>(stagingResources.keySet()).get(0)).getPath();
 }
 
+/**
+ * Register a resource for function and add it to the function resource infos. If the file is
+ * remote, it will be copied to a local file.
+ *
+ * @param resourceUris the resource uri for function.
+ */
+public void registerFunctionResources(Set resourceUris) throws IOException {
+prepareStagingResources(
+resourceUris,
+ResourceType.JAR,
+true,
+url -> {
+try {
+JarUtils.checkJarFile(url);
+} catch (IOException e) {
+throw new ValidationException(
+String.format("Failed to register jar resource [%s]", url), e);
+}
+},
+true);
+}
+
+/**
+ * Unregister the resource uri in function resources, when the reference count of the resource
+ * is 0, the resource will be removed from the function resources.
+ *
+ * @param resourceUris the uris to unregister in function resources.
+ */
+public void unregisterFunctionResources(List resourceUris) {
+if (!resourceUris.isEmpty()) {
+resourceUris.forEach(
+uri -> {
+ResourceCounter counter = functionResourceInfos.get(uri);
+if (counter != null && counter.decreaseCounter()) {
+functionResourceInfos.remove(uri);
+}

Review Comment:
   No. We always assume the table environment is used by only one thread at any moment.
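
To illustrate why the single-thread assumption matters for the unregister logic quoted above, here is a minimal reference-counting sketch. It is not the PR's `ResourceCounter`: the class `RefCountSketch`, its methods, and the use of plain strings as keys are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical single-threaded reference counter keyed by a resource URI string. */
public class RefCountSketch {
    private final Map<String, Integer> counts = new HashMap<>();

    /** Increments the count for uri and returns the new count. */
    public int register(String uri) {
        return counts.merge(uri, 1, Integer::sum);
    }

    /** Decrements the count; returns true if the entry was removed because it reached zero. */
    public boolean unregister(String uri) {
        Integer current = counts.get(uri);
        if (current == null) {
            return false; // unknown resource: nothing to do
        }
        if (current == 1) {
            // The get-then-remove sequence is not atomic; it is only safe
            // because a single thread is assumed to use this object.
            counts.remove(uri);
            return true;
        }
        counts.put(uri, current - 1);
        return false;
    }

    /** Returns true while at least one registration of uri is outstanding. */
    public boolean isRegistered(String uri) {
        return counts.containsKey(uri);
    }
}
```

If the object were shared between threads, the read and the removal would need to happen atomically, for example under a lock.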



##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java:
##
@@ -80,8 +89,10 @@ public ResourceManager(ReadableConfig config, MutableURLClassLoader userClassLoa
 new Path(
 config.get(TableConfigOptions.RESOURCES_DOWNLOAD_DIR),
 String.format("flink-table-%s", UUID.randomUUID()));
+this.functionResourceInfos = new HashMap<>();
 this.resourceInfos = new HashMap<>();
 this.userClassLoader = userClassLoader;
+this.createdClassLoaderList = new ArrayList<>();

Review Comment:
   Could we use a better name?



##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java:
##
@@ -44,7 +44,11 @@
 import org.apache.flink.table.resource.ResourceUri;
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nullable;

Review Comment:
   +1



##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java:
##
@@ -569,6 +581,8 @@ private Optional resolvePreciseFunctionReference(Object
 CatalogFunction potentialResult = tempCatalogFunctions.get(normalizedIdentifier);
 
 if (potentialResult != null) {
+registerFunctionJarResources(
+oi.asSummaryString(), potentialResult.getFunctionResources());

Review Comment:
   for temporary function, Function