Re: [PR] [FLINK-32512] Don't register resource to user resource manager when creating temporary function [flink]
KarmaGYZ merged PR #22938: URL: https://github.com/apache/flink/pull/22938 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
KarmaGYZ commented on PR #22938: URL: https://github.com/apache/flink/pull/22938#issuecomment-1769817718 @flinkbot run azure
KarmaGYZ commented on code in PR #22938: URL: https://github.com/apache/flink/pull/22938#discussion_r1363506946

## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java: ##
@@ -269,6 +279,7 @@ public boolean dropCatalogFunction(
                             "Could not drop catalog function '%s'.", identifier.asSummaryString()),
                     t);
         }
+

Review Comment: nit: redundant line.

## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java: ##
@@ -131,10 +132,17 @@ public boolean dropTemporarySystemFunction(String name, boolean ignoreIfNotExist
                             "Could not drop temporary system function. A function named '%s' doesn't exist.",
                             name));
         }
+        unregisterFunctionJarResources(function);
         return function != null;
     }
+    private void unregisterFunctionJarResources(CatalogFunction function) {

Review Comment: +1
FangYongs commented on PR #22938: URL: https://github.com/apache/flink/pull/22938#issuecomment-1757174465 @fsk119 @KarmaGYZ I have updated this PR, please have a look again, thanks
FangYongs commented on code in PR #22938: URL: https://github.com/apache/flink/pull/22938#discussion_r1353844833

## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java: ##
@@ -65,9 +69,14 @@ public class ResourceManager implements Closeable {
     private static final String FILE_SCHEME = "file";
     private final Path localResourceDir;
+    /** Resource infos for functions. */
+    private final Map functionResourceInfos;
+
     protected final Map resourceInfos;
     protected final MutableURLClassLoader userClassLoader;
+    private final List createdClassLoaderList;

Review Comment: DONE
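For readers following the `createdClassLoaderList` discussion: the question raised in review is whether a class loader that is closed right after validating a function class needs to be tracked by the resource manager at all. The sketch below is not Flink code; it only illustrates, under that assumption, how a caller could own a short-lived `URLClassLoader` with try-with-resources. `ScopedClassLoaderDemo` and `canLoad` are hypothetical names introduced for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.net.URLClassLoader;

/** Hypothetical helper, not part of Flink: the caller owns the loader's lifecycle. */
class ScopedClassLoaderDemo {

    /**
     * Tries to load className through a temporary loader that sees extraUrls in addition to
     * the parent loader, and closes that loader before returning.
     */
    static boolean canLoad(URL[] extraUrls, String className) {
        ClassLoader parent = ScopedClassLoaderDemo.class.getClassLoader();
        // URLClassLoader implements Closeable, so try-with-resources releases any opened jars.
        try (URLClassLoader loader = new URLClassLoader(extraUrls, parent)) {
            loader.loadClass(className);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to close the temporary class loader", e);
        }
    }
}
```

With this shape, no registry of created loaders is needed, because the loader never escapes the method that created it.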
FangYongs commented on code in PR #22938: URL: https://github.com/apache/flink/pull/22938#discussion_r1353833344

## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java: ##
@@ -124,15 +136,79 @@ public String registerFileResource(ResourceUri resourceUri) throws IOException {
                         Collections.singletonList(resourceUri),
                         ResourceType.FILE,
                         false,
-                        url -> {});
+                        url -> {},
+                        false);
         registerResources(stagingResources, false);
         return resourceInfos.get(new ArrayList<>(stagingResources.keySet()).get(0)).getPath();
     }
+    /**
+     * Register a resource for function and add it to the function resource infos. If the file is
+     * remote, it will be copied to a local file.
+     *
+     * @param resourceUris the resource uri for function.
+     */
+    public void registerFunctionResources(Set resourceUris) throws IOException {
+        prepareStagingResources(
+                resourceUris,
+                ResourceType.JAR,
+                true,
+                url -> {
+                    try {
+                        JarUtils.checkJarFile(url);
+                    } catch (IOException e) {
+                        throw new ValidationException(
+                                String.format("Failed to register jar resource [%s]", url), e);
+                    }
+                },
+                true);
+    }
+
+    /**
+     * Unregister the resource uri in function resources, when the reference count of the resource
+     * is 0, the resource will be removed from the function resources.
+     *
+     * @param resourceUris the uris to unregister in function resources.
+     */
+    public void unregisterFunctionResources(List resourceUris) {
+        if (!resourceUris.isEmpty()) {
+            resourceUris.forEach(
+                    uri -> {
+                        ResourceCounter counter = functionResourceInfos.get(uri);
+                        if (counter != null && counter.decreaseCounter()) {
+                            functionResourceInfos.remove(uri);
+                        }
+                    });
+        }
+    }
+
     public URLClassLoader getUserClassLoader() {
         return userClassLoader;
     }
+    public URLClassLoader createUserClassLoader(List resourceUris) {
+        if (resourceUris.isEmpty()) {
+            return userClassLoader;
+        }
+        MutableURLClassLoader classLoader = userClassLoader.copy();
+        for (ResourceUri resourceUri : new HashSet<>(resourceUris)) {
+            classLoader.addURL(checkNotNull(functionResourceInfos.get(resourceUri)).url);
+        }
+        createdClassLoaderList.add(classLoader);
+
+        return classLoader;
+    }
+
+    public void closeUserClassLoader(URLClassLoader classLoader) {
+        if (createdClassLoaderList.remove(classLoader)) {

Review Comment: Remove created class loader list in resourcemanager
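The `unregisterFunctionResources` hunk above relies on reference counting: a resource stays registered until the last function that uses it is dropped. As a rough illustration only, the sketch below shows that bookkeeping with plain JDK types. It does not reproduce Flink's `ResourceCounter`; the class `RefCountedResources`, its methods, and the use of `String` URIs are all assumptions made for this example.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for reference-counted function resources; not Flink's implementation. */
class RefCountedResources {
    // Maps a resource URI to the number of functions currently referencing it.
    private final Map<String, Integer> counts = new HashMap<>();

    /** Adds one reference to the given URI, creating the entry on first use. */
    void register(String uri) {
        counts.merge(uri, 1, Integer::sum);
    }

    /**
     * Drops one reference to the given URI.
     *
     * @return true if this call removed the entry because its count reached zero
     */
    boolean unregister(String uri) {
        Integer current = counts.get(uri);
        if (current == null) {
            return false; // unknown URI: nothing to release
        }
        if (current == 1) {
            counts.remove(uri);
            return true;
        }
        counts.put(uri, current - 1);
        return false;
    }

    /** Reports whether at least one function still references the URI. */
    boolean isRegistered(String uri) {
        return counts.containsKey(uri);
    }
}
```

Like the reviewed code, this sketch is not thread-safe; it assumes the single-threaded usage of the table environment that the reviewers mention in this thread.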
FangYongs commented on code in PR #22938: URL: https://github.com/apache/flink/pull/22938#discussion_r1353833112

## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java: ##
@@ -131,10 +132,17 @@ public boolean dropTemporarySystemFunction(String name, boolean ignoreIfNotExist
                             "Could not drop temporary system function. A function named '%s' doesn't exist.",
                             name));
         }
+        unregisterFunctionJarResources(function);
         return function != null;
     }
+    private void unregisterFunctionJarResources(CatalogFunction function) {
+        if (function != null && function.getFunctionLanguage() == FunctionLanguage.JAVA) {

Review Comment: Python UDFs are another story; we fix Java UDFs first and will consider Python UDFs in another PR.
fsk119 commented on code in PR #22938: URL: https://github.com/apache/flink/pull/22938#discussion_r1350236277

## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java: ##
@@ -65,9 +69,14 @@ public class ResourceManager implements Closeable {
     private static final String FILE_SCHEME = "file";
     private final Path localResourceDir;
+    /** Resource infos for functions. */
+    private final Map functionResourceInfos;
+
     protected final Map resourceInfos;
     protected final MutableURLClassLoader userClassLoader;
+    private final List createdClassLoaderList;

Review Comment: Why do we need this list here? The only usage I can find is this one:
```
URLClassLoader classLoader =
        resourceManager.createUserClassLoader(function.getFunctionResources());
try {
    UserDefinedFunctionHelper.validateClass(
            (Class) classLoader.loadClass(function.getClassName()));
} finally {
    resourceManager.closeUserClassLoader(classLoader);
}
```
If we close the created classloader immediately, can we move the management outside?

## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java: ##
@@ -124,15 +136,79 @@ public String registerFileResource(ResourceUri resourceUri) throws IOException {
                         Collections.singletonList(resourceUri),
                         ResourceType.FILE,
                         false,
-                        url -> {});
+                        url -> {},
+                        false);
         registerResources(stagingResources, false);
         return resourceInfos.get(new ArrayList<>(stagingResources.keySet()).get(0)).getPath();
     }
+    /**
+     * Register a resource for function and add it to the function resource infos. If the file is
+     * remote, it will be copied to a local file.
+     *
+     * @param resourceUris the resource uri for function.
+     */
+    public void registerFunctionResources(Set resourceUris) throws IOException {
+        prepareStagingResources(
+                resourceUris,
+                ResourceType.JAR,
+                true,
+                url -> {
+                    try {
+                        JarUtils.checkJarFile(url);
+                    } catch (IOException e) {
+                        throw new ValidationException(
+                                String.format("Failed to register jar resource [%s]", url), e);
+                    }
+                },
+                true);
+    }
+
+    /**
+     * Unregister the resource uri in function resources, when the reference count of the resource
+     * is 0, the resource will be removed from the function resources.
+     *
+     * @param resourceUris the uris to unregister in function resources.
+     */
+    public void unregisterFunctionResources(List resourceUris) {
+        if (!resourceUris.isEmpty()) {
+            resourceUris.forEach(
+                    uri -> {
+                        ResourceCounter counter = functionResourceInfos.get(uri);
+                        if (counter != null && counter.decreaseCounter()) {
+                            functionResourceInfos.remove(uri);
+                        }

Review Comment: No. We always assume the table env is used by one thread at any moment.

## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java: ##
@@ -80,8 +89,10 @@ public ResourceManager(ReadableConfig config, MutableURLClassLoader userClassLoa
                 new Path(
                         config.get(TableConfigOptions.RESOURCES_DOWNLOAD_DIR),
                         String.format("flink-table-%s", UUID.randomUUID()));
+        this.functionResourceInfos = new HashMap<>();
         this.resourceInfos = new HashMap<>();
         this.userClassLoader = userClassLoader;
+        this.createdClassLoaderList = new ArrayList<>();

Review Comment: Use a better name?

## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java: ##
@@ -44,7 +44,11 @@
 import org.apache.flink.table.resource.ResourceUri;
 import org.apache.flink.util.Preconditions;
+import javax.annotation.Nullable;

Review Comment: +1

## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java: ##
@@ -569,6 +581,8 @@ private Optional resolvePreciseFunctionReference(Object
         CatalogFunction potentialResult = tempCatalogFunctions.get(normalizedIdentifier);
         if (potentialResult != null) {
+            registerFunctionJarResources(
+                    oi.asSummaryString(), potentialResult.getFunctionResources());

Review Comment: for temporary function, Function