刚注意到你用的YARN application模式,PyFlink 1.14.0才支持YARN application模式,主要是新增了命令行选项“
-pyclientexec” 和配置“python.client.executable”:
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/python/python_config/#python-client-executable

对于你这个作业来说,你需要通过使用1.14.0版本,同时添加命令行选项:-pyclientexec venv.zip/venv/bin/python


On Fri, Nov 19, 2021 at 10:48 AM Asahi Lee <[email protected]> wrote:

> 我通过source my_env/bin/activate和指定环境变量PYFLINK_CLIENT_EXECUTABLE均未提交成功,错误未变化;
> 我的错误中,jobmanager的日志中显示No module named pyflink,而jobmanager是运行在yarn集群上;
> 请问,还有还有什么未配置?
>
>
> &gt; LogType:jobmanager.out
> &gt; Log Upload Time:星期四 十一月 18 20:48:45 +0800 2021
> &gt; LogLength:37
> &gt; Log Contents:
> &gt; /bin/python: No module named pyflink
>
>
>
>
> ------------------&nbsp;原始邮件&nbsp;------------------
> 发件人:
>                                                   "user-zh"
>                                                                     <
> [email protected]&gt;;
> 发送时间:&nbsp;2021年11月19日(星期五) 上午9:38
> 收件人:&nbsp;"user-zh"<[email protected]&gt;;
>
> 主题:&nbsp;Re: 哪里可以下载到downloads/setup-pyflink-virtual-env.sh脚本
>
>
>
> -pyexec 指定的是集群端所用的Python环境,客户端需要编译Flink作业,也会依赖Python环境。可以看一下这个文档:
>
> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/python/dependency_management/#python-interpreter-of-client
>
> On Thu, Nov 18, 2021 at 9:00 PM Asahi Lee <[email protected]&gt;
> wrote:
>
> &gt; Hi !
> &gt; &amp;nbsp; &amp;nbsp;我在java Table api中使用python udf
> &gt; 函数,通过下面的命令提交应用,报无法启动python服务错误,请问我的提交方式对吗?jm日志为/bin/python: No module
> named
> &gt; pyflink。
> &gt;
> &gt;
> &gt; ./flink-1.13.2/bin/flink&amp;nbsp;
> &gt; run-application -t yarn-application&amp;nbsp;
> &gt;
> -Dyarn.provided.lib.dirs="hdfs://nameservice1/user/flink/flinklib"&amp;nbsp;
> &gt; -Dyarn.application.queue=d
> &gt; -p 1&amp;nbsp;
> &gt; -pyarch /opt/venv.zip
> &gt; -pyexec venv.zip/venv/bin/python&amp;nbsp;
> &gt; -pyfs /opt/test.py&amp;nbsp;
> &gt; -c test.PyUDFTest&amp;nbsp;
> &gt; /opt/flink-python-test-1.0-SNAPSHOT.jar
> &gt;
> &gt;
> &gt;
> &gt; 错误:
> &gt; Caused by: java.lang.RuntimeException: Python callback server start
> failed!
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at
> &gt;
> org.apache.flink.client.python.PythonFunctionFactory.createPythonFunctionFactory(PythonFunctionFactory.java:167)
> &gt; ~[flink-python_2.11-1.13.2.jar:1.13.2]
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at
> &gt;
> org.apache.flink.client.python.PythonFunctionFactory$1.load(PythonFunctionFactory.java:88)
> &gt; ~[flink-python_2.11-1.13.2.jar:1.13.2]
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at
> &gt;
> org.apache.flink.client.python.PythonFunctionFactory$1.load(PythonFunctionFactory.java:84)
> &gt; ~[flink-python_2.11-1.13.2.jar:1.13.2]
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at
> &gt;
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3527)
> &gt; ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at
> &gt;
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2319)
> &gt; ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at
> &gt;
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2282)
> &gt; ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at
> &gt;
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197)
> &gt; ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at
> &gt;
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937)
> &gt; ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at
> &gt;
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3941)
> &gt; ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at
> &gt;
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4824)
> &gt; ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at
> &gt;
> org.apache.flink.client.python.PythonFunctionFactory.getPythonFunction(PythonFunctionFactory.java:129)
> &gt; ~[flink-python_2.11-1.13.2.jar:1.13.2]
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at
> &gt; sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> ~[?:1.8.0_111]
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at
> &gt;
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> &gt; ~[?:1.8.0_111]
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at
> &gt;
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> &gt; ~[?:1.8.0_111]
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at
> &gt; java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_111]
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at
> &gt;
> org.apache.flink.table.functions.python.utils.PythonFunctionUtils.getPythonFunction(PythonFunctionUtils.java:45)
> &gt; ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at
> &gt;
> org.apache.flink.table.functions.UserDefinedFunctionHelper.instantiateFunction(UserDefinedFunctionHelper.java:206)
> &gt; ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at
> &gt;
> org.apache.flink.table.catalog.FunctionCatalog.getFunctionDefinition(FunctionCatalog.java:659)
> &gt; ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at
> &gt;
> org.apache.flink.table.catalog.FunctionCatalog.resolveAmbiguousFunctionReference(FunctionCatalog.java:606)
> &gt; ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at
> &gt;
> org.apache.flink.table.catalog.FunctionCatalog.lookupFunction(FunctionCatalog.java:362)
> &gt; ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at
> &gt;
> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lookupOperatorOverloads(FunctionCatalogOperatorTable.java:97)
> &gt; ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at
> &gt;
> org.apache.calcite.sql.util.ChainedSqlOperatorTable.lookupOperatorOverloads(ChainedSqlOperatorTable.java:67)
> &gt; ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at
> &gt;
> org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1183)
> &gt; ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at
> &gt;
> org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1169)
> &gt; ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at
> &gt;
> org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1200)
> &gt; ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at
> &gt;
> org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1169)
> &gt; ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at
> &gt;
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:945)
> &gt; ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at
> &gt;
> org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:704)
> &gt; ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at
> &gt; org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:152)
> &gt; ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at
> &gt;
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:110)
> &gt; ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at
> &gt;
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:201)
> &gt; ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at
> &gt;
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
> &gt; ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at
> &gt;
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:704)
> &gt; ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at
> test.PyUDFTest.main(PyUDFTest.java:22)
> &gt; ~[flink-python-test-1.0-SNAPSHOT.jar:?]
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at
> &gt; sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> ~[?:1.8.0_111]
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at
> &gt;
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> &gt; ~[?:1.8.0_111]
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at
> &gt;
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> &gt; ~[?:1.8.0_111]
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at
> &gt; java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_111]
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at
> &gt;
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
> &gt; ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at
> &gt;
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
> &gt; ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at
> &gt;
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
> &gt; ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at
> &gt;
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:242)
> &gt; ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; ... 10 more
> &gt; 2021-11-18 20:48:44,458 INFO&amp;nbsp;
> &gt; org.apache.flink.runtime.blob.BlobServer&amp;nbsp; &amp;nbsp;
> &amp;nbsp; &amp;nbsp; &amp;nbsp;
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;[] -
> Stopped BLOB server at
> &gt; 0.0.0.0:45070
> &gt; 2021-11-18 20:48:44,458 INFO&amp;nbsp;
> &gt; org.apache.flink.runtime.entrypoint.ClusterEntrypoint&amp;nbsp;
> &amp;nbsp; &amp;nbsp;
> &gt; &amp;nbsp; [] - Shutting YarnApplicationClusterEntryPoint down with
> application
> &gt; status UNKNOWN. Diagnostics Cluster entrypoint has been closed
> externally..
> &gt; 2021-11-18 20:48:44,458 INFO&amp;nbsp;
> &gt; org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] -
> Shutting
> &gt; down rest endpoint.
> &gt; 2021-11-18 20:48:44,474 INFO&amp;nbsp;
> &gt; org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] -
> Removing
> &gt; cache directory
> &gt; /tmp/flink-web-dd82c3c0-f457-492d-8e64-5ae74fe9abbd/flink-web-ui
> &gt; 2021-11-18 20:48:44,474 INFO&amp;nbsp;
> &gt; org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] -
> &gt; http://cdh5node3:40216 lost leadership
> &gt; 2021-11-18 20:48:44,474 INFO&amp;nbsp;
> &gt; org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] -
> Shut
> &gt; down complete.
> &gt; 2021-11-18 20:48:44,474 INFO&amp;nbsp;
> &gt;
> org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent
> &gt; [] - Closing components.
> &gt; 2021-11-18 20:48:44,475 INFO&amp;nbsp;
> &gt;
> org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess
> &gt; [] - Stopping SessionDispatcherLeaderProcess.
> &gt; 2021-11-18 20:48:44,475 INFO&amp;nbsp;
> &gt; org.apache.flink.runtime.dispatcher.StandaloneDispatcher&amp;nbsp;
> &amp;nbsp;
> &gt; &amp;nbsp;[] - Stopping dispatcher akka.tcp://flink@cdh5node3
> &gt; :34697/user/rpc/dispatcher_1.
> &gt; 2021-11-18 20:48:44,476 INFO&amp;nbsp;
> &gt; org.apache.flink.runtime.dispatcher.StandaloneDispatcher&amp;nbsp;
> &amp;nbsp;
> &gt; &amp;nbsp;[] - Stopping all currently running jobs of dispatcher
> &gt; akka.tcp://flink@cdh5node3:34697/user/rpc/dispatcher_1.
> &gt;
> &gt;
> &gt; LogType:jobmanager.out
> &gt; Log Upload Time:星期四 十一月 18 20:48:45 +0800 2021
> &gt; LogLength:37
> &gt; Log Contents:
> &gt; /bin/python: No module named pyflink
> &gt;
> &gt;
> &gt;
> &gt;
> &gt;
> &gt;
> &gt;
> &gt;
> &gt;
> &gt;
> &gt;
> &gt;
> &gt;
> &gt; ------------------&amp;nbsp;原始邮件&amp;nbsp;------------------
> &gt; 发件人:
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> "user-zh"
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> <
> &gt; [email protected]&amp;gt;;
> &gt; 发送时间:&amp;nbsp;2021年11月18日(星期四) 下午3:34
> &gt; 收件人:&amp;nbsp;"user-zh"<[email protected]&amp;gt;;
> &gt;
> &gt; 主题:&amp;nbsp;Re:哪里可以下载到downloads/setup-pyflink-virtual-env.sh脚本
> &gt;
> &gt;
> &gt;
> &gt; Hi!<br/&amp;gt;我在下面的旧版本中找到了,你试下能不能用:<br/&amp;gt;
> &gt;
> https://nightlies.apache.org/flink/flink-docs-release-1.12/downloads/setup-pyflink-virtual-env.sh
> &gt
> <https://nightlies.apache.org/flink/flink-docs-release-1.12/downloads/setup-pyflink-virtual-env.sh&gt>;
> 在 2021-11-18 15:05:03,"Asahi Lee" <[email protected]&amp;gt; 写道:
> &gt; &amp;gt;Hi!
> &gt; &amp;gt;&amp;amp;nbsp; &amp;amp;nbsp;
> &gt;
> 我在flink官方文档中看到通过&amp;amp;nbsp;&amp;amp;nbsp;setup-pyflink-virtual-env.sh
> &gt; 脚本制作python虚拟环境,请问再哪里可以下载到?文档地址如下:
> &gt; &amp;gt;
> &gt;
> https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/python/faq/

回复