Do you mean this class? There is no import of
org.apache.flink.table.functions.ScalarFunction

Creating the function via Flink SQL did not seem to require importing ScalarFunction
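For what it's worth, the NameError in the log can be reproduced without Flink at all. The sketch below is an illustration, not PyFlink's actual code: it mimics what the gateway's getPythonFunction does per the traceback, `getattr(importlib.import_module(moduleName), objectName)`, so any name used at module level in myReplace.py must be imported inside that file itself (e.g. `from pyflink.table.udf import ScalarFunction`); the CREATE FUNCTION statement only tells Flink which module and class to load. The stand-in ScalarFunction class here exists only so the sketch runs without pyflink installed.

```python
# Minimal stdlib-only reproduction of how the gateway resolves
# 'myReplace.MyReplace' (per the traceback in the log below):
#   getattr(importlib.import_module("myReplace"), "MyReplace")
import importlib
import os
import sys
import tempfile

# The file as posted: ScalarFunction is never imported, so importing
# the module raises NameError before the class can even be defined.
BROKEN = "class MyReplace(ScalarFunction):\n    pass\n"

# The fixed shape: in a real job the stand-in class below would instead be
# `from pyflink.table.udf import ScalarFunction` at the top of myReplace.py.
FIXED = (
    "class ScalarFunction:  # stand-in for pyflink's ScalarFunction\n"
    "    pass\n"
    "class MyReplace(ScalarFunction):\n"
    "    def eval(self, s):\n"
    "        return s.replace('buy', 'sale')\n"
)

def load_udf(source):
    """Write source as myReplace.py and resolve MyReplace the way the gateway does."""
    with tempfile.TemporaryDirectory() as d:
        with open(os.path.join(d, "myReplace.py"), "w") as f:
            f.write(source)
        sys.path.insert(0, d)
        sys.modules.pop("myReplace", None)
        try:
            return getattr(importlib.import_module("myReplace"), "MyReplace")
        finally:
            sys.path.pop(0)
            sys.modules.pop("myReplace", None)

try:
    load_udf(BROKEN)
except NameError as e:
    print(e)  # name 'ScalarFunction' is not defined

print(load_udf(FIXED)().eval("buy it"))  # sale it
```

So the missing import in myReplace.py, not the SQL statement, is the likely cause of the failure.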

Below is the logic that uploads the .py file:

String originalFilename = file.getOriginalFilename();
destFile = new File(System.getProperty("user.dir") + uploadTempPath, originalFilename);
FileUtils.copyInputStreamToFile(file.getInputStream(), destFile);
String localPath = destFile.getPath();
copyToHdfsFromLocalFile(localPath, dst);
hadoopPath = dst + originalFilename;
String serviceName = configuration.get("fs.defaultFS");
StreamTableEnvironment tableEnv = getTableEnv();
String pyStr = tableEnv.getConfig().getConfiguration().getString("python.files", "");
log.info(pyStr);
String[] split = pyStr.split(",");
List<String> list = new ArrayList<>(Arrays.asList(split));
list.add(serviceName + hadoopPath);
tableEnv.getConfig().getConfiguration().setString("python.files", StrUtil.join(",", list));
tableEnv.executeSql("DROP FUNCTION IF EXISTS " + catalogName + "." + defaultDatabase + "." + functionName).print();
String createSql = "CREATE FUNCTION IF NOT EXISTS " + catalogName + "." + defaultDatabase + "." + functionName
        + " AS '" + className + "' LANGUAGE PYTHON";
tableEnv.executeSql(createSql).print();




799590...@qq.com
 
From: Dian Fu
Date: 2022-04-19 14:20
To: user-zh; 799590989
Subject: Re: Re: Python callback server start failed
NameError: name 'ScalarFunction' is not defined

Did you import ScalarFunction?

On Tue, Apr 19, 2022 at 2:08 PM 799590...@qq.com.INVALID 
<799590...@qq.com.invalid> wrote:

Below is the error log from just now. "Python callback server start failed" no longer appears in it. The cause was that the part highlighted in yellow originally picked up C:\Python310 from the Python PATH environment variable (Python installed earlier; python36 was installed afterwards and PATH was set to python36); after rebooting the machine that problem went away.

2022-04-19 13:58:11 |INFO  |http-nio-9532-exec-3 |HiveCatalog.java:257 
|org.apache.flink.table.catalog.hive.HiveCatalog |Setting hive conf dir as 
/home/tetris/conf
2022-04-19 13:58:11 |INFO  |http-nio-9532-exec-3 |HiveCatalog.java:219 
|org.apache.flink.table.catalog.hive.HiveCatalog |Created HiveCatalog 'myhive'
2022-04-19 13:58:11 |INFO  |http-nio-9532-exec-3 |HiveCatalog.java:299 
|org.apache.flink.table.catalog.hive.HiveCatalog |Connected to Hive metastore
2022-04-19 13:58:11 |INFO  |http-nio-9532-exec-3 |CatalogManager.java:262 
|org.apache.flink.table.catalog.CatalogManager |Set the current default catalog 
as [myhive] and the current default database as [tetris].
2022-04-19 13:58:11 |INFO  |http-nio-9532-exec-3 |SqlTaskRunService.java:599 
|com.chinaoly.tetris.flink.service.SqlTaskRunService |开始加载hdfs上的udf
2022-04-19 13:58:12 |INFO  |http-nio-9532-exec-3 |EnvUtil.java:31 
|com.chinaoly.tetris.flink.util.EnvUtil |registerFactory : 
org.apache.hadoop.fs.FsUrlStreamHandlerFactory
2022-04-19 13:58:12 |INFO  |http-nio-9532-exec-3 |EnvUtil.java:55 
|com.chinaoly.tetris.flink.util.EnvUtil 
|hdfs://chinaoly/tetris/file/function/flink/myReplace.py
2022-04-19 13:58:12 |INFO  |http-nio-9532-exec-3 |SqlTaskRunService.java:621 
|com.chinaoly.tetris.flink.service.SqlTaskRunService |完成加载hdfs上的udf
13:58:20,956 |-INFO in ReconfigureOnChangeTask(born:1650344954461) - Empty 
watch file list. Disabling 
2022-04-19 13:58:21 |INFO  |http-nio-9532-exec-3 |PythonEnvUtils.java:284 
|org.apache.flink.client.python.PythonEnvUtils |Starting Python process with 
environment variables: {USERDOMAIN_ROAMINGPROFILE=DESKTOP-LBP3EGP, 
HADOOP_CONF_DIR=/home/tetris/conf, PROCESSOR_LEVEL=6, SESSIONNAME=Console, 
ALLUSERSPROFILE=C:\ProgramData, PROCESSOR_ARCHITECTURE=AMD64, 
PSModulePath=C:\Program 
Files\WindowsPowerShell\Modules;C:\WINDOWS\system32\WindowsPowerShell\v1.0\Modules,
 SystemDrive=C:, MAVEN_HOME=E:\Program Files\apache-maven-3.8.2, 
USERNAME=Administrator, ProgramFiles(x86)=C:\Program Files (x86), 
FPS_BROWSER_USER_PROFILE_STRING=Default, 
PYTHONPATH=C:/Users/ADMINI~1/AppData/Local/Temp/pyflink/df5887da-8c4d-4807-b891-c313338f1c14/63150f6e-5ea0-4186-be56-ce25e69c4265,
 PATHEXT=.COM;.EXE;.BAT;.CMD;.VBS;.VBE;.JS;.JSE;.WSF;.WSH;.MSC, 
DriverData=C:\Windows\System32\Drivers\DriverData, PYTHON_HOME=E:\Program 
Files\python36, ProgramData=C:\ProgramData, ProgramW6432=C:\Program Files, 
HOMEPATH=\Users\Administrator, PROCESSOR_IDENTIFIER=Intel64 Family 6 Model 94 
Stepping 3, GenuineIntel, ProgramFiles=C:\Program Files, 
PUBLIC=C:\Users\Public, windir=C:\WINDOWS, =::=::\, ZES_ENABLE_SYSMAN=1, 
LOCALAPPDATA=C:\Users\Administrator\AppData\Local, 
ChocolateyLastPathUpdate=132792636909481380, IntelliJ IDEA=E:\Program 
Files\JetBrains\IntelliJ IDEA 2019.3.3\bin;, USERDOMAIN=DESKTOP-LBP3EGP, 
FPS_BROWSER_APP_PROFILE_STRING=Internet Explorer, 
LOGONSERVER=\\DESKTOP-LBP3EGP, JAVA_HOME=C:\Program Files\Java\jdk1.8.0_261, 
OneDrive=C:\Users\Administrator\OneDrive, 
APPDATA=C:\Users\Administrator\AppData\Roaming, 
ChocolateyInstall=C:\ProgramData\chocolatey, SCALA_HOME=C:\Program Files 
(x86)\scala, CommonProgramFiles=C:\Program Files\Common Files, Path=C:\Program 
Files (x86)\Common 
Files\Oracle\Java\javapath;C:\WINDOWS\system32;C:\WINDOWS;C:\WINDOWS\System32\Wbem;C:\WINDOWS\System32\WindowsPowerShell\v1.0\;C:\WINDOWS\System32\OpenSSH\;C:\Program
 Files\Git\cmd;C:\Program Files\Java\jdk1.8.0_261\bin;E:\Program 
Files\apache-maven-3.8.2\bin;E:\Program 
Files\TortoiseSVN\bin;%HADOOP_HOME%\bin;E:\Program 
Files\nodejs\;C:\ProgramData\chocolatey\bin;C:\Program Files 
(x86)\scala\bin;C:\Program Files (x86)\scala\bin;E:\Program 
Files\python36;E:\Program Files\python36\Scripts\;E:\Program 
Files\python36\;C:\Users\Administrator\AppData\Local\Microsoft\WindowsApps;;E:\Program
 Files\JetBrains\IntelliJ IDEA 
2019.3.3\bin;;C:\Users\Administrator\AppData\Roaming\npm, 
JETBRAINS_LICENSE_SERVER=http://fls.jetbrains-agent.com, OS=Windows_NT, 
COMPUTERNAME=DESKTOP-LBP3EGP, PROCESSOR_REVISION=5e03, 
CommonProgramW6432=C:\Program Files\Common Files, PYFLINK_GATEWAY_PORT=53944, 
ComSpec=C:\WINDOWS\system32\cmd.exe, SystemRoot=C:\WINDOWS, 
TEMP=C:\Users\ADMINI~1\AppData\Local\Temp, HOMEDRIVE=C:, 
USERPROFILE=C:\Users\Administrator, TMP=C:\Users\ADMINI~1\AppData\Local\Temp, 
CommonProgramFiles(x86)=C:\Program Files (x86)\Common Files, 
NUMBER_OF_PROCESSORS=4, IDEA_INITIAL_DIRECTORY=C:\Users\Administrator\Desktop}, 
command: python.exe -m pyflink.pyflink_callback_server
2022-04-19 13:58:22 |ERROR |http-nio-9532-exec-3 |ExceptionHandle.java:31 
|com.chinaoly.frm.core.aop.ExceptionHandle 
|org.apache.flink.table.api.ValidationException: SQL validation failed. Cannot 
instantiate user-defined function 'myhive.tetris.myReplace'.
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:156)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:107)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:205)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:603)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:272)
at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:724)
at 
com.chinaoly.tetris.flink.service.SqlTaskRunService.executeDefaultSql(SqlTaskRunService.java:117)
at 
com.chinaoly.tetris.flink.controller.SqlTaskRunController.executeDefaultSql(SqlTaskRunController.java:66)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:209)
at 
org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:136)
at 
org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:102)
at 
org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:893)
at 
org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:799)
at 
org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87)
at 
org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:991)
at 
org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:925)
at 
org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:981)
at 
org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:884)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:661)
at 
org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:858)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:742)
at 
org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231)
at 
org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:52)
at 
org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at 
org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at 
org.springframework.web.filter.CorsFilter.doFilterInternal(CorsFilter.java:96)
at 
org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at 
org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at 
org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at com.alibaba.druid.support.http.WebStatFilter.doFilter(WebStatFilter.java:124)
at 
org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at 
org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at 
org.springframework.web.filter.HttpPutFormContentFilter.doFilterInternal(HttpPutFormContentFilter.java:109)
at 
org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at 
org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at 
org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at 
org.springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal(HiddenHttpMethodFilter.java:93)
at 
org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at 
org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at 
org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at 
org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:200)
at 
org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at 
org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at 
org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at 
org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:199)
at 
org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:96)
at 
org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:493)
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:137)
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:81)
at 
org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:87)
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:343)
at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:798)
at 
org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66)
at 
org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:806)
at 
org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1498)
at 
org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at 
org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.table.api.ValidationException: Cannot instantiate 
user-defined function 'myhive.tetris.myReplace'.
at 
org.apache.flink.table.functions.UserDefinedFunctionHelper.instantiateFunction(UserDefinedFunctionHelper.java:219)
at 
org.apache.flink.table.catalog.FunctionCatalog.getFunctionDefinition(FunctionCatalog.java:659)
at 
org.apache.flink.table.catalog.FunctionCatalog.resolvePreciseFunctionReference(FunctionCatalog.java:582)
at 
org.apache.flink.table.catalog.FunctionCatalog.lambda$resolveAmbiguousFunctionReference$3(FunctionCatalog.java:624)
at java.util.Optional.orElseGet(Optional.java:267)
at 
org.apache.flink.table.catalog.FunctionCatalog.resolveAmbiguousFunctionReference(FunctionCatalog.java:624)
at 
org.apache.flink.table.catalog.FunctionCatalog.lookupFunction(FunctionCatalog.java:362)
at 
org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lookupOperatorOverloads(FunctionCatalogOperatorTable.java:97)
at 
org.apache.calcite.sql.util.ChainedSqlOperatorTable.lookupOperatorOverloads(ChainedSqlOperatorTable.java:67)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1182)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1168)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1199)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1168)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1168)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:944)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:703)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:151)
... 65 more
Caused by: java.lang.IllegalStateException: Instantiating python function 
'myReplace.MyReplace' failed.
at 
org.apache.flink.table.functions.python.utils.PythonFunctionUtils.getPythonFunction(PythonFunctionUtils.java:48)
at 
org.apache.flink.table.functions.UserDefinedFunctionHelper.instantiateFunction(UserDefinedFunctionHelper.java:206)
... 81 more
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.table.functions.python.utils.PythonFunctionUtils.getPythonFunction(PythonFunctionUtils.java:45)
... 82 more
Caused by: org.apache.flink.api.python.shaded.py4j.Py4JException: An exception 
was raised by the Python Proxy. Return Message: Traceback (most recent call 
last):
  File "E:\Program Files\python36\lib\site-packages\py4j\java_gateway.py", line 
2410, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
  File "E:\Program Files\python36\lib\site-packages\pyflink\java_gateway.py", 
line 169, in getPythonFunction
    udf_wrapper = getattr(importlib.import_module(moduleName), objectName)
  File "E:\Program Files\python36\lib\importlib\__init__.py", line 126, in 
import_module
    return _bootstrap._gcd_import(name[level:], package, level)
  File "<frozen importlib._bootstrap>", line 994, in _gcd_import
  File "<frozen importlib._bootstrap>", line 971, in _find_and_load
  File "<frozen importlib._bootstrap>", line 955, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 665, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 678, in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File 
"C:\Users\ADMINI~1\AppData\Local\Temp\pyflink\df5887da-8c4d-4807-b891-c313338f1c14\63150f6e-5ea0-4186-be56-ce25e69c4265\myReplace.py",
 line 3, in <module>
    class MyReplace(ScalarFunction):
NameError: name 'ScalarFunction' is not defined

at 
org.apache.flink.api.python.shaded.py4j.Protocol.getReturnValue(Protocol.java:476)
at 
org.apache.flink.api.python.shaded.py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:108)
at com.sun.proxy.$Proxy159.getPythonFunction(Unknown Source)
at 
org.apache.flink.client.python.PythonFunctionFactoryImpl.getPythonFunction(PythonFunctionFactoryImpl.java:47)
at 
org.apache.flink.client.python.PythonFunctionFactory.getPythonFunction(PythonFunctionFactory.java:131)
... 87 more

2022-04-19 13:58:22 |ERROR |http-nio-9532-exec-3 |DirectJDKLog.java:182 
|org.apache.catalina.core.ContainerBase.[Tomcat].[localhost].[/].[dispatcherServlet]
 |Servlet.service() for servlet [dispatcherServlet] in context with path [] 
threw exception [Request processing failed; nested exception is 
org.apache.flink.table.api.ValidationException: SQL validation failed. Cannot 
instantiate user-defined function 'myhive.tetris.myReplace'.] with root cause
org.apache.flink.api.python.shaded.py4j.Py4JException: An exception was raised 
by the Python Proxy. Return Message: Traceback (most recent call last):
  File "E:\Program Files\python36\lib\site-packages\py4j\java_gateway.py", line 
2410, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
  File "E:\Program Files\python36\lib\site-packages\pyflink\java_gateway.py", 
line 169, in getPythonFunction
    udf_wrapper = getattr(importlib.import_module(moduleName), objectName)
  File "E:\Program Files\python36\lib\importlib\__init__.py", line 126, in 
import_module
    return _bootstrap._gcd_import(name[level:], package, level)
  File "<frozen importlib._bootstrap>", line 994, in _gcd_import
  File "<frozen importlib._bootstrap>", line 971, in _find_and_load
  File "<frozen importlib._bootstrap>", line 955, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 665, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 678, in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File 
"C:\Users\ADMINI~1\AppData\Local\Temp\pyflink\df5887da-8c4d-4807-b891-c313338f1c14\63150f6e-5ea0-4186-be56-ce25e69c4265\myReplace.py",
 line 3, in <module>
    class MyReplace(ScalarFunction):
NameError: name 'ScalarFunction' is not defined
at 
org.apache.flink.api.python.shaded.py4j.Protocol.getReturnValue(Protocol.java:476)
at 
org.apache.flink.api.python.shaded.py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:108)
at com.sun.proxy.$Proxy159.getPythonFunction(Unknown Source)
at 
org.apache.flink.client.python.PythonFunctionFactoryImpl.getPythonFunction(PythonFunctionFactoryImpl.java:47)
at 
org.apache.flink.client.python.PythonFunctionFactory.getPythonFunction(PythonFunctionFactory.java:131)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.table.functions.python.utils.PythonFunctionUtils.getPythonFunction(PythonFunctionUtils.java:45)
at 
org.apache.flink.table.functions.UserDefinedFunctionHelper.instantiateFunction(UserDefinedFunctionHelper.java:206)
at 
org.apache.flink.table.catalog.FunctionCatalog.getFunctionDefinition(FunctionCatalog.java:659)
at 
org.apache.flink.table.catalog.FunctionCatalog.resolvePreciseFunctionReference(FunctionCatalog.java:582)
at 
org.apache.flink.table.catalog.FunctionCatalog.lambda$resolveAmbiguousFunctionReference$3(FunctionCatalog.java:624)
at java.util.Optional.orElseGet(Optional.java:267)
at 
org.apache.flink.table.catalog.FunctionCatalog.resolveAmbiguousFunctionReference(FunctionCatalog.java:624)
at 
org.apache.flink.table.catalog.FunctionCatalog.lookupFunction(FunctionCatalog.java:362)
at 
org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lookupOperatorOverloads(FunctionCatalogOperatorTable.java:97)
at 
org.apache.calcite.sql.util.ChainedSqlOperatorTable.lookupOperatorOverloads(ChainedSqlOperatorTable.java:67)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1182)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1168)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1199)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1168)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1168)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:944)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:703)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:151)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:107)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:205)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:603)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:272)
at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:724)
at 
com.chinaoly.tetris.flink.service.SqlTaskRunService.executeDefaultSql(SqlTaskRunService.java:117)
at 
com.chinaoly.tetris.flink.controller.SqlTaskRunController.executeDefaultSql(SqlTaskRunController.java:66)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:209)
at 
org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:136)
at 
org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:102)
at 
org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:893)
at 
org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:799)
at 
org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87)
at 
org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:991)
at 
org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:925)
at 
org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:981)
at 
org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:884)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:661)
at 
org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:858)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:742)
at 
org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231)
at 
org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:52)
at 
org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at 
org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at 
org.springframework.web.filter.CorsFilter.doFilterInternal(CorsFilter.java:96)
at 
org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at 
org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at 
org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at com.alibaba.druid.support.http.WebStatFilter.doFilter(WebStatFilter.java:124)
at 
org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at 
org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at 
org.springframework.web.filter.HttpPutFormContentFilter.doFilterInternal(HttpPutFormContentFilter.java:109)
at 
org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at 
org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at 
org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at 
org.springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal(HiddenHttpMethodFilter.java:93)
at 
org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at 
org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at 
org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at 
org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:200)
at 
org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at 
org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at 
org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at 
org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:199)
at 
org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:96)
at 
org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:493)
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:137)
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:81)
at 
org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:87)
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:343)
at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:798)
at 
org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66)
at 
org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:806)
at 
org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1498)
at 
org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at 
org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
at java.lang.Thread.run(Thread.java:748)



799590...@qq.com

From: Dian Fu
Date: 2022-04-19 13:49
To: user-zh; 799590989
Subject: Re: Python callback server start failed
Could you send the complete log file?

On Tue, Apr 19, 2022 at 10:53 AM 799590...@qq.com.INVALID
<799590...@qq.com.invalid> wrote:

> Software versions
>
> flink-1.13.6
> python3.6.8
> The local Win10 machine also has Python 3.6.8 installed, with the Python
> environment variable added, and successfully ran $ python -m pip install
> apache-flink==1.13.6
> Deployed in standalonesession mode with one JM and two TMs; all three cluster
> machines have Python 3.6.8 and pyflink-1.13.6 installed
>
> Questions:
>
> 1. Calling the Python UDF raises the following error:
> Servlet.service() for servlet [dispatcherServlet] in context with path []
> threw exception [Request processing failed; nested exception is
> org.apache.flink.table.api.ValidationException: SQL validation failed.
> Cannot instantiate user-defined function 'myhive.tetris.myreplace'.] with
> root cause
> java.lang.RuntimeException: Python callback server start failed!
>
> 2. Are there any rules for how to write the class path after AS in CREATE
> FUNCTION when the function is written in Python?
>
> Contents of the Python UDF file myReplace.py
>
> from pyflink.table.expressions import call
>
> class MyReplace(ScalarFunction):
>   def __init__(self):
>     self.factor = 12
>
>   def eval(self, s):
>     return s.replace('buy','sale')
>
> Getting the remote cluster environment, where catalogName=myhive and defaultDatabase=tetris
>
> StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(host, port);
> env.setStateBackend(new HashMapStateBackend());
> env.enableCheckpointing(1000);
> env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage());
> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> env.getCheckpointConfig().setCheckpointTimeout(60000);
> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
> env.getCheckpointConfig().setTolerableCheckpointFailureNumber(0);
> env.getCheckpointConfig().setExternalizedCheckpointCleanup(
>         CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
> EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, bsSettings);
> tableEnv.getConfig().getConfiguration().setBoolean("table.exec.hive.fallback-mapred-reader", true);
> tableEnv.getConfig().getConfiguration().setBoolean("table.dynamic-table-options.enabled", true);
> if (!catalogName.equals(tableEnv.getCurrentCatalog())) {
>     HiveCatalog hiveCatalog = new HiveCatalog(catalogName, defaultDatabase, hiveConfDir);
>     tableEnv.registerCatalog(catalogName, hiveCatalog);
> }
> tableEnv.useCatalog(catalogName);
> tableEnv.useDatabase(defaultDatabase);
> List<String> jars = new ArrayList<>();
> List<String> pys = new ArrayList<>();
> log.info("开始加载hdfs上的udf");
> String prefix = "/file/function/flink/";
> try {
>     EnvUtil.registerFactory(new FsUrlStreamHandlerFactory());
>     for (String hdf : EnvUtil.listHdfs(prefix, configuration)) {
>         if (hdf.endsWith(".jar")) {
>             jars.add(hdf);
>             EnvUtil.loadJar(URLUtil.url(hdf));
>         } else if (hdf.endsWith(".py")) {
>             pys.add(hdf);
>         }
>     }
>     tableEnv.getConfig().getConfiguration().set(PipelineOptions.CLASSPATHS, jars);
>     tableEnv.getConfig().getConfiguration().setString("python.files", StringUtils.join(pys, ","));
>     log.info("完成加载hdfs上的udf");
> } catch (Exception e) {
>     e.printStackTrace();
> }
>
> The Python file is stored under the specified path on HDFS
>
> After uploading the .py file, the following was executed via tableEnv.executeSql:
>
> CREATE FUNCTION IF NOT EXISTS myhive.tetris.myReplace AS
> 'myReplace.MyReplace' LANGUAGE PYTHON
>
> Thanks in advance to the Flink team for their hard work
>
>
>
> 799590...@qq.com
>
