Hi, 1. 之前那个报错在重新安装完pyflink之后有没有解决(本地python demo.py是否正常);之前那个报错是本地运行就报错,还是在远程提交才报的错。 2. 现在这个报错是作业提交时编译阶段就报错了,还没到作业运行。在作业提交的console界面是可以看到错误日志的,可否提供一下错误日志。
Best, Xingbo jing <[email protected]> 于2020年11月3日周二 下午5:36写道: > Hi, xingbo. > 在检查之后,并没有发现什么 beam 的其他版本,flink-python 版本是 2.11-1.11.1。不管是 pip install > apache-beam==2.19.0 还是没有,都是一样的问题。 > 用 udf 的示例代码也不通过。本地 python demo.py 是可以正常的。 > 只有是 flink run -m localhost:8081 -py demo.py 是一直在出现这种问题。 > pyflink-shell.sh remote localhost 8081 也试过了。一样的结果。 > > 示例代码如下: > > import logging > import sys > from pyflink.datastream import StreamExecutionEnvironment > from pyflink.table import StreamTableEnvironment, EnvironmentSettings, > DataTypes > from pyflink.table.udf import udf > > > def word_count(): > content = "line Licensed to the Apache Software Foundation ASF under > one > " \ > "line or more contributor license agreements See the NOTICE > file " \ > "line distributed with this work for additional information " > \ > "line regarding copyright ownership The ASF licenses this > file > " \ > "to you under the Apache License Version the " \ > "License you may not use this file except in compliance " \ > "with the License" > t_env = StreamTableEnvironment.create( > StreamExecutionEnvironment.get_execution_environment(), > > > environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build() > ) > > > t_env.get_config().get_configuration().set_boolean("python.fn-execution.memory.managed", > True) > sink_ddl = """ > create table Results(word VARCHAR,`count` BIGINT) with ( > 'connector' > = 'print') > """ > add = udf(lambda i: i + 1024, [DataTypes.BIGINT()], DataTypes.BIGINT()) > t_env.register_function("add_test", add) > t_env.sql_update(sink_ddl) > elements = [(word, 1) for word in content.split(" ")] > t_env.from_elements(elements, ["word", "count"]) \ > .group_by("word") \ > .select("word, add_test(count(1)) as count") \ > .insert_into("Results") > t_env.execute("word_count") > > > if __name__ == '__main__': > logging.basicConfig(stream=sys.stdout, level=logging.INFO, > format="%(message)s") > word_count() > > > 环境是基于官方的 docker 镜像 flink:1.11.1-scala_2.11,JobManager 和 TaskManager 都正常,在没有 > udf 的时候作业都是正常的,jar 包只装了 jdbc,kafka,es 的connector,还有 csv 的 jar 包。 > > 这个情况下需要装什么东西吗,还是需要改配置。 > > 日志上提示是: > > 2020-11-03 09:24:05,792 ERROR org.apache.flink.client.python.PythonDriver > > [] - Run python process failed > java.lang.RuntimeException: Python process exits with code: 1 > at > org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:88) > ~[flink-python_2.11-1.11.1.jar:1.11.1] > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > ~[?:1.8.0_265] > at > > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > ~[?:1.8.0_265] > at > > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > ~[?:1.8.0_265] > at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_265] > at > > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288) > ~[flink-dist_2.11-1.11.1.jar:1.11.1] > at > > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) > ~[flink-dist_2.11-1.11.1.jar:1.11.1] > at > org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) > ~[flink-dist_2.11-1.11.1.jar:1.11.1] > at > > org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699) > ~[flink-dist_2.11-1.11.1.jar:1.11.1] > at > org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232) > ~[flink-dist_2.11-1.11.1.jar:1.11.1] > at > > org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916) > ~[flink-dist_2.11-1.11.1.jar:1.11.1] > at > > org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992) > ~[flink-dist_2.11-1.11.1.jar:1.11.1] > at > > org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30) > [flink-dist_2.11-1.11.1.jar:1.11.1] > at > org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992) > [flink-dist_2.11-1.11.1.jar:1.11.1] > 2020-11-03 09:24:05,798 ERROR org.apache.flink.client.cli.CliFrontend > > [] - Fatal error while running command line interface. > org.apache.flink.client.program.ProgramAbortException: null > at > org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:95) > ~[flink-python_2.11-1.11.1.jar:1.11.1] > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > ~[?:1.8.0_265] > at > > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > ~[?:1.8.0_265] > at > > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > ~[?:1.8.0_265] > at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_265] > at > > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288) > ~[flink-dist_2.11-1.11.1.jar:1.11.1] > at > > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) > ~[flink-dist_2.11-1.11.1.jar:1.11.1] > at > org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) > ~[flink-dist_2.11-1.11.1.jar:1.11.1] > at > > org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699) > ~[flink-dist_2.11-1.11.1.jar:1.11.1] > at > org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232) > ~[flink-dist_2.11-1.11.1.jar:1.11.1] > at > > org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916) > ~[flink-dist_2.11-1.11.1.jar:1.11.1] > at > > org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992) > ~[flink-dist_2.11-1.11.1.jar:1.11.1] > at > > org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30) > ~[flink-dist_2.11-1.11.1.jar:1.11.1] > at > org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992) > [flink-dist_2.11-1.11.1.jar:1.11.1] > org.apache.flink.client.program.ProgramAbortException > at > org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:95) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288) > at > > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) > at > org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) > at > > org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699) > at > org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232) > at > > org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916) > at > > org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992) > at > > org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30) > at > org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992) > > > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ >
