这个文件:myReplace.py On Tue, Apr 19, 2022 at 2:38 PM 799590...@qq.com <799590...@qq.com> wrote:
> > 是下面这个类吗? 没有import > org.apache.flink.table.functions.ScalarFunction > > 用flinksql创建Function的时候没有要求import ScalarFunction > > 下面是上传py文件的逻辑代码 > > String originalFilename = file.getOriginalFilename(); > destFile = new File(System.getProperty("user.dir") + uploadTempPath, > originalFilename); > FileUtils.copyInputStreamToFile(file.getInputStream(), destFile); > String localPath = destFile.getPath(); > copyToHdfsFromLocalFile(localPath,dst); > hadoopPath = dst+originalFilename; > String serviceName = configuration.get("fs.defaultFS"); > StreamTableEnvironment tableEnv = getTableEnv(); > String pyStr = > tableEnv.getConfig().getConfiguration().getString("python.files", ""); > log.info(pyStr); > String[] split = pyStr.split(","); > List<String> list = new ArrayList<>(Arrays.asList(split)); > list.add(serviceName+hadoopPath); > > tableEnv.getConfig().getConfiguration().setString("python.files",StrUtil.join(",",list)); > tableEnv.executeSql("DROP FUNCTION IF EXISTS > "+catalogName+"."+defaultDatabase+"."+functionName).print(); > String createSql = "CREATE FUNCTION IF NOT EXISTS > "+catalogName+"."+defaultDatabase+"."+functionName+" AS '" +className+ "' > LANGUAGE PYTHON"; > tableEnv.executeSql(createSql).print(); > > > ------------------------------ > 799590...@qq.com > > > *From:* Dian Fu <dian0511...@gmail.com> > *Date:* 2022-04-19 14:20 > *To:* user-zh <user-zh@flink.apache.org>; 799590989 <799590...@qq.com> > *Subject:* Re: Re: Python callback server start failed > NameError: name 'ScalarFunction' is not defined > > 你 import ScalarFunction了吗? > > On Tue, Apr 19, 2022 at 2:08 PM 799590...@qq.com.INVALID > <799590...@qq.com.invalid> wrote: > >> >> 以下是刚刚的报错日志,现在里面没有Python callback server start >> failed了,是因为原来黄底部分获取的Python环境变量的路径为C:\Python310(早先安装的python,后来安装python36,设置环境PATH变量为python36),重启机器后好了 >> >> 2022-04-19 13:58:11 |INFO |http-nio-9532-exec-3 |HiveCatalog.java:257 >> |org.apache.flink.table.catalog.hive.HiveCatalog |Setting hive conf dir as >> /home/tetris/conf >> 2022-04-19 13:58:11 |INFO |http-nio-9532-exec-3 |HiveCatalog.java:219 >> |org.apache.flink.table.catalog.hive.HiveCatalog |Created HiveCatalog >> 'myhive' >> 2022-04-19 13:58:11 |INFO |http-nio-9532-exec-3 |HiveCatalog.java:299 >> |org.apache.flink.table.catalog.hive.HiveCatalog |Connected to Hive >> metastore >> 2022-04-19 13:58:11 |INFO |http-nio-9532-exec-3 |CatalogManager.java:262 >> |org.apache.flink.table.catalog.CatalogManager |Set the current default >> catalog as [myhive] and the current default database as [tetris]. >> 2022-04-19 13:58:11 |INFO |http-nio-9532-exec-3 >> |SqlTaskRunService.java:599 >> |com.chinaoly.tetris.flink.service.SqlTaskRunService |开始加载hdfs上的udf >> 2022-04-19 13:58:12 |INFO |http-nio-9532-exec-3 |EnvUtil.java:31 >> |com.chinaoly.tetris.flink.util.EnvUtil |registerFactory : >> org.apache.hadoop.fs.FsUrlStreamHandlerFactory >> 2022-04-19 13:58:12 |INFO |http-nio-9532-exec-3 |EnvUtil.java:55 >> |com.chinaoly.tetris.flink.util.EnvUtil >> |hdfs://chinaoly/tetris/file/function/flink/myReplace.py >> 2022-04-19 13:58:12 |INFO |http-nio-9532-exec-3 >> |SqlTaskRunService.java:621 >> |com.chinaoly.tetris.flink.service.SqlTaskRunService |完成加载hdfs上的udf >> 13:58:20,956 |-INFO in ReconfigureOnChangeTask(born:1650344954461) - >> Empty watch file list. Disabling >> 2022-04-19 13:58:21 |INFO |http-nio-9532-exec-3 |PythonEnvUtils.java:284 >> |org.apache.flink.client.python.PythonEnvUtils |Starting Python process >> with environment variables: {USERDOMAIN_ROAMINGPROFILE=DESKTOP-LBP3EGP, >> HADOOP_CONF_DIR=/home/tetris/conf, PROCESSOR_LEVEL=6, SESSIONNAME=Console, >> ALLUSERSPROFILE=C:\ProgramData, PROCESSOR_ARCHITECTURE=AMD64, >> PSModulePath=C:\Program >> Files\WindowsPowerShell\Modules;C:\WINDOWS\system32\WindowsPowerShell\v1.0\Modules, >> SystemDrive=C:, MAVEN_HOME=E:\Program Files\apache-maven-3.8.2, >> USERNAME=Administrator, ProgramFiles(x86)=C:\Program Files (x86), >> FPS_BROWSER_USER_PROFILE_STRING=Default, >> PYTHONPATH=C:/Users/ADMINI~1/AppData/Local/Temp/pyflink/df5887da-8c4d-4807-b891-c313338f1c14/63150f6e-5ea0-4186-be56-ce25e69c4265, >> PATHEXT=.COM;.EXE;.BAT;.CMD;.VBS;.VBE;.JS;.JSE;.WSF;.WSH;.MSC, >> DriverData=C:\Windows\System32\Drivers\DriverData, PYTHON_HOME=E:\Program >> Files\python36, ProgramData=C:\ProgramData, ProgramW6432=C:\Program Files, >> HOMEPATH=\Users\Administrator, PROCESSOR_IDENTIFIER=Intel64 Family 6 Model >> 94 Stepping 3, GenuineIntel, ProgramFiles=C:\Program Files, >> PUBLIC=C:\Users\Public, windir=C:\WINDOWS, =::=::\, ZES_ENABLE_SYSMAN=1, >> LOCALAPPDATA=C:\Users\Administrator\AppData\Local, >> ChocolateyLastPathUpdate=132792636909481380, IntelliJ IDEA=E:\Program >> Files\JetBrains\IntelliJ IDEA 2019.3.3\bin;, USERDOMAIN=DESKTOP-LBP3EGP, >> FPS_BROWSER_APP_PROFILE_STRING=Internet Explorer, >> LOGONSERVER=\\DESKTOP-LBP3EGP, JAVA_HOME=C:\Program >> Files\Java\jdk1.8.0_261, OneDrive=C:\Users\Administrator\OneDrive, >> APPDATA=C:\Users\Administrator\AppData\Roaming, >> ChocolateyInstall=C:\ProgramData\chocolatey, SCALA_HOME=C:\Program Files >> (x86)\scala, CommonProgramFiles=C:\Program Files\Common Files, >> Path=C:\Program Files (x86)\Common >> Files\Oracle\Java\javapath;C:\WINDOWS\system32;C:\WINDOWS;C:\WINDOWS\System32\Wbem;C:\WINDOWS\System32\WindowsPowerShell\v1.0\;C:\WINDOWS\System32\OpenSSH\;C:\Program >> Files\Git\cmd;C:\Program Files\Java\jdk1.8.0_261\bin;E:\Program >> Files\apache-maven-3.8.2\bin;E:\Program >> Files\TortoiseSVN\bin;%HADOOP_HOME%\bin;E:\Program >> Files\nodejs\;C:\ProgramData\chocolatey\bin;C:\Program Files >> (x86)\scala\bin;C:\Program Files (x86)\scala\bin;E:\Program >> Files\python36;E:\Program Files\python36\Scripts\;E:\Program >> Files\python36\;C:\Users\Administrator\AppData\Local\Microsoft\WindowsApps;;E:\Program >> Files\JetBrains\IntelliJ IDEA >> 2019.3.3\bin;;C:\Users\Administrator\AppData\Roaming\npm, >> JETBRAINS_LICENSE_SERVER=http://fls.jetbrains-agent.com, OS=Windows_NT, >> COMPUTERNAME=DESKTOP-LBP3EGP, PROCESSOR_REVISION=5e03, >> CommonProgramW6432=C:\Program Files\Common Files, >> PYFLINK_GATEWAY_PORT=53944, ComSpec=C:\WINDOWS\system32\cmd.exe, >> SystemRoot=C:\WINDOWS, TEMP=C:\Users\ADMINI~1\AppData\Local\Temp, >> HOMEDRIVE=C:, USERPROFILE=C:\Users\Administrator, >> TMP=C:\Users\ADMINI~1\AppData\Local\Temp, >> CommonProgramFiles(x86)=C:\Program Files (x86)\Common Files, >> NUMBER_OF_PROCESSORS=4, >> IDEA_INITIAL_DIRECTORY=C:\Users\Administrator\Desktop}, command: python.exe >> -m pyflink.pyflink_callback_server >> 2022-04-19 13:58:22 |ERROR |http-nio-9532-exec-3 |ExceptionHandle.java:31 >> |com.chinaoly.frm.core.aop.ExceptionHandle >> |org.apache.flink.table.api.ValidationException: SQL validation failed. >> Cannot instantiate user-defined function 'myhive.tetris.myReplace'. >> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org >> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:156) >> at >> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:107) >> at >> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:205) >> at >> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:603) >> at >> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:272) >> at >> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101) >> at >> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:724) >> at >> com.chinaoly.tetris.flink.service.SqlTaskRunService.executeDefaultSql(SqlTaskRunService.java:117) >> at >> com.chinaoly.tetris.flink.controller.SqlTaskRunController.executeDefaultSql(SqlTaskRunController.java:66) >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >> at >> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >> at >> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >> at java.lang.reflect.Method.invoke(Method.java:498) >> at >> org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:209) >> at >> org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:136) >> at >> org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:102) >> at >> org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:893) >> at >> org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:799) >> at >> org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87) >> at >> org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:991) >> at >> org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:925) >> at >> org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:981) >> at >> org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:884) >> at javax.servlet.http.HttpServlet.service(HttpServlet.java:661) >> at >> org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:858) >> at javax.servlet.http.HttpServlet.service(HttpServlet.java:742) >> at >> org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231) >> at >> org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) >> at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:52) >> at >> org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) >> at >> org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) >> at >> org.springframework.web.filter.CorsFilter.doFilterInternal(CorsFilter.java:96) >> at >> org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) >> at >> org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) >> at >> org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) >> at >> com.alibaba.druid.support.http.WebStatFilter.doFilter(WebStatFilter.java:124) >> at >> org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) >> at >> org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) >> at >> org.springframework.web.filter.HttpPutFormContentFilter.doFilterInternal(HttpPutFormContentFilter.java:109) >> at >> org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) >> at >> org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) >> at >> org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) >> at >> org.springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal(HiddenHttpMethodFilter.java:93) >> at >> org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) >> at >> org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) >> at >> org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) >> at >> org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:200) >> at >> org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) >> at >> org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) >> at >> org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) >> at >> org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:199) >> at >> org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:96) >> at >> org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:493) >> at >> org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:137) >> at >> org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:81) >> at >> org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:87) >> at >> org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:343) >> at >> org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:798) >> at >> org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66) >> at >> org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:806) >> at org.apache.tomcat.util.net >> .NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1498) >> at org.apache.tomcat.util.net >> .SocketProcessorBase.run(SocketProcessorBase.java:49) >> at >> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) >> at >> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) >> at >> org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) >> at java.lang.Thread.run(Thread.java:748) >> Caused by: org.apache.flink.table.api.ValidationException: Cannot >> instantiate user-defined function 'myhive.tetris.myReplace'. >> at >> org.apache.flink.table.functions.UserDefinedFunctionHelper.instantiateFunction(UserDefinedFunctionHelper.java:219) >> at >> org.apache.flink.table.catalog.FunctionCatalog.getFunctionDefinition(FunctionCatalog.java:659) >> at >> org.apache.flink.table.catalog.FunctionCatalog.resolvePreciseFunctionReference(FunctionCatalog.java:582) >> at >> org.apache.flink.table.catalog.FunctionCatalog.lambda$resolveAmbiguousFunctionReference$3(FunctionCatalog.java:624) >> at java.util.Optional.orElseGet(Optional.java:267) >> at >> org.apache.flink.table.catalog.FunctionCatalog.resolveAmbiguousFunctionReference(FunctionCatalog.java:624) >> at >> org.apache.flink.table.catalog.FunctionCatalog.lookupFunction(FunctionCatalog.java:362) >> at >> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lookupOperatorOverloads(FunctionCatalogOperatorTable.java:97) >> at >> org.apache.calcite.sql.util.ChainedSqlOperatorTable.lookupOperatorOverloads(ChainedSqlOperatorTable.java:67) >> at >> org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1182) >> at >> org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1168) >> at >> org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1199) >> at >> org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1168) >> at >> org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1168) >> at >> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:944) >> at >> org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:703) >> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org >> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:151) >> ... 65 more >> Caused by: java.lang.IllegalStateException: Instantiating python function >> 'myReplace.MyReplace' failed. >> at >> org.apache.flink.table.functions.python.utils.PythonFunctionUtils.getPythonFunction(PythonFunctionUtils.java:48) >> at >> org.apache.flink.table.functions.UserDefinedFunctionHelper.instantiateFunction(UserDefinedFunctionHelper.java:206) >> ... 81 more >> Caused by: java.lang.reflect.InvocationTargetException >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >> at >> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >> at >> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >> at java.lang.reflect.Method.invoke(Method.java:498) >> at >> org.apache.flink.table.functions.python.utils.PythonFunctionUtils.getPythonFunction(PythonFunctionUtils.java:45) >> ... 82 more >> Caused by: org.apache.flink.api.python.shaded.py4j.Py4JException: An >> exception was raised by the Python Proxy. Return Message: Traceback (most >> recent call last): >> File "E:\Program >> Files\python36\lib\site-packages\py4j\java_gateway.py", line 2410, in >> _call_proxy >> return_value = getattr(self.pool[obj_id], method)(*params) >> File "E:\Program >> Files\python36\lib\site-packages\pyflink\java_gateway.py", line 169, in >> getPythonFunction >> udf_wrapper = getattr(importlib.import_module(moduleName), objectName) >> File "E:\Program Files\python36\lib\importlib\__init__.py", line 126, >> in import_module >> return _bootstrap._gcd_import(name[level:], package, level) >> File "<frozen importlib._bootstrap>", line 994, in _gcd_import >> File "<frozen importlib._bootstrap>", line 971, in _find_and_load >> File "<frozen importlib._bootstrap>", line 955, in >> _find_and_load_unlocked >> File "<frozen importlib._bootstrap>", line 665, in _load_unlocked >> File "<frozen importlib._bootstrap_external>", line 678, in exec_module >> File "<frozen importlib._bootstrap>", line 219, in >> _call_with_frames_removed >> File >> "C:\Users\ADMINI~1\AppData\Local\Temp\pyflink\df5887da-8c4d-4807-b891-c313338f1c14\63150f6e-5ea0-4186-be56-ce25e69c4265\myReplace.py", >> line 3, in <module> >> class MyReplace(ScalarFunction): >> NameError: name 'ScalarFunction' is not defined >> >> at >> org.apache.flink.api.python.shaded.py4j.Protocol.getReturnValue(Protocol.java:476) >> at >> org.apache.flink.api.python.shaded.py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:108) >> at com.sun.proxy.$Proxy159.getPythonFunction(Unknown Source) >> at >> org.apache.flink.client.python.PythonFunctionFactoryImpl.getPythonFunction(PythonFunctionFactoryImpl.java:47) >> at >> org.apache.flink.client.python.PythonFunctionFactory.getPythonFunction(PythonFunctionFactory.java:131) >> ... 87 more >> >> 2022-04-19 13:58:22 |ERROR |http-nio-9532-exec-3 |DirectJDKLog.java:182 >> |org.apache.catalina.core.ContainerBase.[Tomcat].[localhost].[/].[dispatcherServlet] >> |Servlet.service() for servlet [dispatcherServlet] in context with path [] >> threw exception [Request processing failed; nested exception is >> org.apache.flink.table.api.ValidationException: SQL validation failed. >> Cannot instantiate user-defined function 'myhive.tetris.myReplace'.] with >> root cause >> org.apache.flink.api.python.shaded.py4j.Py4JException: An exception was >> raised by the Python Proxy. Return Message: Traceback (most recent call >> last): >> File "E:\Program >> Files\python36\lib\site-packages\py4j\java_gateway.py", line 2410, in >> _call_proxy >> return_value = getattr(self.pool[obj_id], method)(*params) >> File "E:\Program >> Files\python36\lib\site-packages\pyflink\java_gateway.py", line 169, in >> getPythonFunction >> udf_wrapper = getattr(importlib.import_module(moduleName), objectName) >> File "E:\Program Files\python36\lib\importlib\__init__.py", line 126, >> in import_module >> return _bootstrap._gcd_import(name[level:], package, level) >> File "<frozen importlib._bootstrap>", line 994, in _gcd_import >> File "<frozen importlib._bootstrap>", line 971, in _find_and_load >> File "<frozen importlib._bootstrap>", line 955, in >> _find_and_load_unlocked >> File "<frozen importlib._bootstrap>", line 665, in _load_unlocked >> File "<frozen importlib._bootstrap_external>", line 678, in exec_module >> File "<frozen importlib._bootstrap>", line 219, in >> _call_with_frames_removed >> File >> "C:\Users\ADMINI~1\AppData\Local\Temp\pyflink\df5887da-8c4d-4807-b891-c313338f1c14\63150f6e-5ea0-4186-be56-ce25e69c4265\myReplace.py", >> line 3, in <module> >> class MyReplace(ScalarFunction): >> NameError: name 'ScalarFunction' is not defined >> at >> org.apache.flink.api.python.shaded.py4j.Protocol.getReturnValue(Protocol.java:476) >> at >> org.apache.flink.api.python.shaded.py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:108) >> at com.sun.proxy.$Proxy159.getPythonFunction(Unknown Source) >> at >> org.apache.flink.client.python.PythonFunctionFactoryImpl.getPythonFunction(PythonFunctionFactoryImpl.java:47) >> at >> org.apache.flink.client.python.PythonFunctionFactory.getPythonFunction(PythonFunctionFactory.java:131) >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >> at >> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >> at >> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >> at java.lang.reflect.Method.invoke(Method.java:498) >> at >> org.apache.flink.table.functions.python.utils.PythonFunctionUtils.getPythonFunction(PythonFunctionUtils.java:45) >> at >> org.apache.flink.table.functions.UserDefinedFunctionHelper.instantiateFunction(UserDefinedFunctionHelper.java:206) >> at >> org.apache.flink.table.catalog.FunctionCatalog.getFunctionDefinition(FunctionCatalog.java:659) >> at >> org.apache.flink.table.catalog.FunctionCatalog.resolvePreciseFunctionReference(FunctionCatalog.java:582) >> at >> org.apache.flink.table.catalog.FunctionCatalog.lambda$resolveAmbiguousFunctionReference$3(FunctionCatalog.java:624) >> at java.util.Optional.orElseGet(Optional.java:267) >> at >> org.apache.flink.table.catalog.FunctionCatalog.resolveAmbiguousFunctionReference(FunctionCatalog.java:624) >> at >> org.apache.flink.table.catalog.FunctionCatalog.lookupFunction(FunctionCatalog.java:362) >> at >> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lookupOperatorOverloads(FunctionCatalogOperatorTable.java:97) >> at >> org.apache.calcite.sql.util.ChainedSqlOperatorTable.lookupOperatorOverloads(ChainedSqlOperatorTable.java:67) >> at >> org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1182) >> at >> org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1168) >> at >> org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1199) >> at >> org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1168) >> at >> org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1168) >> at >> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:944) >> at >> org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:703) >> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org >> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:151) >> at >> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:107) >> at >> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:205) >> at >> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:603) >> at >> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:272) >> at >> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101) >> at >> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:724) >> at >> com.chinaoly.tetris.flink.service.SqlTaskRunService.executeDefaultSql(SqlTaskRunService.java:117) >> at >> com.chinaoly.tetris.flink.controller.SqlTaskRunController.executeDefaultSql(SqlTaskRunController.java:66) >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >> at >> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >> at >> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >> at java.lang.reflect.Method.invoke(Method.java:498) >> at >> org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:209) >> at >> org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:136) >> at >> org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:102) >> at >> org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:893) >> at >> org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:799) >> at >> org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87) >> at >> org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:991) >> at >> org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:925) >> at >> org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:981) >> at >> org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:884) >> at javax.servlet.http.HttpServlet.service(HttpServlet.java:661) >> at >> org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:858) >> at javax.servlet.http.HttpServlet.service(HttpServlet.java:742) >> at >> org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231) >> at >> org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) >> at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:52) >> at >> org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) >> at >> org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) >> at >> org.springframework.web.filter.CorsFilter.doFilterInternal(CorsFilter.java:96) >> at >> org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) >> at >> org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) >> at >> org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) >> at >> com.alibaba.druid.support.http.WebStatFilter.doFilter(WebStatFilter.java:124) >> at >> org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) >> at >> org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) >> at >> org.springframework.web.filter.HttpPutFormContentFilter.doFilterInternal(HttpPutFormContentFilter.java:109) >> at >> org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) >> at >> org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) >> at >> org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) >> at >> org.springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal(HiddenHttpMethodFilter.java:93) >> at >> org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) >> at >> org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) >> at >> org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) >> at >> org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:200) >> at >> org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) >> at >> org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) >> at >> org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) >> at >> org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:199) >> at >> org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:96) >> at >> org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:493) >> at >> org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:137) >> at >> org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:81) >> at >> org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:87) >> at >> org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:343) >> at >> org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:798) >> at >> org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66) >> at >> org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:806) >> at org.apache.tomcat.util.net >> .NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1498) >> at org.apache.tomcat.util.net >> .SocketProcessorBase.run(SocketProcessorBase.java:49) >> at >> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) >> at >> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) >> at >> org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) >> at java.lang.Thread.run(Thread.java:748) >> >> >> >> 799590...@qq.com >> >> From: Dian Fu >> Date: 2022-04-19 13:49 >> To: user-zh; 799590989 >> Subject: Re: Python callback server start failed >> 发一下完整的日志文件? >> >> On Tue, Apr 19, 2022 at 10:53 AM 799590...@qq.com.INVALID >> <799590...@qq.com.invalid> wrote: >> >> > 软件版本 >> > >> > flink-1.13.6 >> > python3.6.8 >> > 本地win10也安装了python3.6.8且添加了python的环境变量和成功运行了$ python -m pip install >> > apache-flink==1.13.6 >> > standalonesession方式部署的,一个JM 两个TM,3台集群都安装了python3.6.8 且安装了pyflink-1.13.6 >> > >> > 问题: >> > >> > 1、调用python udf时会报如下错误 >> > Servlet.service() for servlet [dispatcherServlet] in context with path >> [] >> > threw exception [Request processing failed; nested exception is >> > org.apache.flink.table.api.ValidationException: SQL validation failed. >> > Cannot instantiate user-defined function 'myhive.tetris.myreplace'.] >> with >> > root cause >> > java.lang.RuntimeException: Python callback server start failed! >> > >> > 2、sql中CREATE FUNCTION 中的 AS 后面的类路径在python环境下的填写有什么规则吗? >> > >> > python udf文件myReplace.py的内容 >> > >> > from pyflink.table.expressions import call >> > >> > class MyReplace(ScalarFunction): >> > def __init__(self): >> > self.factor = 12 >> > >> > def eval(self, s): >> > return s.replace('buy','sale') >> > >> > 获取远程集群环境,其中的catalogName=myhive,defaultDatabase=tetris >> > >> > StreamExecutionEnvironment env = >> > StreamExecutionEnvironment.createRemoteEnvironment(host,port); >> > env.setStateBackend(new HashMapStateBackend()); >> > env.enableCheckpointing(1000); >> > env.getCheckpointConfig().setCheckpointStorage(new >> > JobManagerCheckpointStorage()); >> > >> > >> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); >> > env.getCheckpointConfig().setCheckpointTimeout(60000); >> > env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); >> > env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); >> > env.getCheckpointConfig().setTolerableCheckpointFailureNumber(0); >> > env.getCheckpointConfig().setExternalizedCheckpointCleanup( >> > CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION >> > ); >> > EnvironmentSettings bsSettings = >> > >> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); >> > StreamTableEnvironment tableEnv = >> > StreamTableEnvironment.create(env,bsSettings); >> > >> > >> tableEnv.getConfig().getConfiguration().setBoolean("table.exec.hive.fallback-mapred-reader",true); >> > >> > >> tableEnv.getConfig().getConfiguration().setBoolean("table.dynamic-table-options.enabled",true); >> > if (!catalogName.equals(tableEnv.getCurrentCatalog())) { >> > HiveCatalog hiveCatalog = new HiveCatalog(catalogName, >> > defaultDatabase, hiveConfDir); >> > tableEnv.registerCatalog(catalogName, hiveCatalog); >> > } >> > tableEnv.useCatalog(catalogName); >> > tableEnv.useDatabase(defaultDatabase); >> > List<String> jars = new ArrayList<>(); >> > List<String> pys = new ArrayList<>(); >> > log.info("开始加载hdfs上的udf"); >> > String prefix = "/file/function/flink/"; >> > try { >> > EnvUtil.registerFactory(new FsUrlStreamHandlerFactory()); >> > for (String hdf : EnvUtil.listHdfs(prefix,configuration)) { >> > if (hdf.endsWith(".jar")) { >> > jars.add(hdf); >> > EnvUtil.loadJar(URLUtil.url(hdf)); >> > } else if (hdf.endsWith(".py")){ >> > pys.add(hdf); >> > } >> > } >> > >> > >> tableEnv.getConfig().getConfiguration().set(PipelineOptions.CLASSPATHS,jars); >> > >> > >> tableEnv.getConfig().getConfiguration().setString("python.files",StringUtils.join(pys,",")); >> > log.info("完成加载hdfs上的udf"); >> > }catch (Exception e){ >> > e.printStackTrace(); >> > } >> > >> > python文件存放在hdfs指定的路劲下面 >> > >> > 上传py文件后通过tableEnv.executeSql 执行了 >> > >> > CREATE FUNCTION IF NOT EXISTS myhive.tetris.myReplaceAS >> > 'myReplace.MyReplace' LANGUAGE PYTHON >> > >> > 先行感谢flink官方同学的辛苦付出 >> > >> > >> > >> > 799590...@qq.com >> > >> >