Flink is started with the official docker-compose setup, and the Python env is packaged with conda (python 3.7.12 + apache-flink==1.16.0 + apache-beam==2.38.0).

SQL client startup parameters:

```bash
# sql_client/task_manager/job_manager all mount the corresponding directory;
# permissions and owner are flink:flink in each of them
bin/sql-client.sh \
  --pyExecutable /opt/flink_data/requirements/py_env/jm_env/bin/python3.7 \
  -pyfs file:///opt/flink_data/requirements/udfs/

# Same call, using the execution environment packaged with conda
bin/sql-client.sh \
  --pyArchives file:///opt/flink_data/requirements/py_env/pyflink_jm_1.16.0_env.zip \
  --pyExecutable pyflink_jm_1.16.0_env.zip/bin/python3.7 \
  -pyfs file:///opt/flink_data/requirements/udfs/ -j /opt/flink/lib/flink-python-1.16.0.jar
```

UDF:

```python
from pyflink.table import DataTypes
from pyflink.table.udf import udf


@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
def func1(line):
    return "udf_{}".format(line)
```

SQL test call:

```sql
CREATE TEMPORARY FUNCTION func1 AS 'to_fahr.to_fahr.func1' LANGUAGE PYTHON;
select func1('Chicago');
```

Console log:

```text
[ERROR] Could not execute SQL statement. Reason:
java.lang.NoClassDefFoundError: Could not initialize class org.apache.beam.sdk.options.PipelineOptionsFactory
```

Client log:

```text
org.apache.flink.table.client.gateway.SqlExecutionException: Error while retrieving result.
    at org.apache.flink.table.client.gateway.local.result.CollectResultBase$ResultRetrievalThread.run(CollectResultBase.java:79) ~[flink-sql-client-1.16.0.jar:1.16.0]
Caused by: java.lang.RuntimeException: Failed to fetch next result
    at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:222) ~[?:?]
    at org.apache.flink.table.client.gateway.local.result.CollectResultBase$ResultRetrievalThread.run(CollectResultBase.java:75) ~[flink-sql-client-1.16.0.jar:1.16.0]
Caused by: java.io.IOException: Failed to fetch job execution result
    at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:184) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:121) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:222) ~[?:?]
    at org.apache.flink.table.client.gateway.local.result.CollectResultBase$ResultRetrievalThread.run(CollectResultBase.java:75) ~[flink-sql-client-1.16.0.jar:1.16.0]
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: b3dcd20267356a2280d09df4bcd6f440)
    at java.util.concurrent.CompletableFuture.reportGet(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture.get(Unknown Source) ~[?:?]
    at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:182) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:121) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:222) ~[?:?]
    at org.apache.flink.table.client.gateway.local.result.CollectResultBase$ResultRetrievalThread.run(CollectResultBase.java:75) ~[flink-sql-client-1.16.0.jar:1.16.0]
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: b3dcd20267356a2280d09df4bcd6f440)
    at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:130) ~[flink-dist-1.16.0.jar:1.16.0]
    at java.util.concurrent.CompletableFuture$UniApply.tryFire(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture.postComplete(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture.complete(Unknown Source) ~[?:?]
    at org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$6(FutureUtils.java:301) ~[flink-dist-1.16.0.jar:1.16.0]
    at java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture.postComplete(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture.complete(Unknown Source) ~[?:?]
    at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$31(RestClusterClient.java:772) ~[flink-dist-1.16.0.jar:1.16.0]
    at java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture.postComplete(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture.complete(Unknown Source) ~[?:?]
    at org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$6(FutureUtils.java:301) ~[flink-dist-1.16.0.jar:1.16.0]
    at java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture.postComplete(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture.postFire(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture$Completion.run(Unknown Source) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
    at java.lang.Thread.run(Unknown Source) ~[?:?]
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:128) ~[flink-dist-1.16.0.jar:1.16.0]
    at java.util.concurrent.CompletableFuture$UniApply.tryFire(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture.postComplete(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture.complete(Unknown Source) ~[?:?]
    at org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$6(FutureUtils.java:301) ~[flink-dist-1.16.0.jar:1.16.0]
    at java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture.postComplete(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture.complete(Unknown Source) ~[?:?]
    at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$31(RestClusterClient.java:772) ~[flink-dist-1.16.0.jar:1.16.0]
    at java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture.postComplete(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture.complete(Unknown Source) ~[?:?]
    at org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$6(FutureUtils.java:301) ~[flink-dist-1.16.0.jar:1.16.0]
    at java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture.postComplete(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture.postFire(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture$Completion.run(Unknown Source) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
    at java.lang.Thread.run(Unknown Source) ~[?:?]
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:83) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:256) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:247) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:240) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:738) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:715) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:78) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:477) ~[flink-dist-1.16.0.jar:1.16.0]
    at jdk.internal.reflect.GeneratedMethodAccessor88.invoke(Unknown Source) ~[?:?]
    at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
    at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:309) ~[?:?]
    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) ~[?:?]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:307) ~[?:?]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:222) ~[?:?]
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84) ~[?:?]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168) ~[?:?]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) ~[?:?]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) ~[?:?]
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) ~[flink-scala_2.12-1.16.0.jar:1.16.0]
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) ~[flink-scala_2.12-1.16.0.jar:1.16.0]
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) ~[?:?]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-scala_2.12-1.16.0.jar:1.16.0]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) ~[flink-scala_2.12-1.16.0.jar:1.16.0]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) ~[flink-scala_2.12-1.16.0.jar:1.16.0]
    at akka.actor.Actor.aroundReceive(Actor.scala:537) ~[?:?]
    at akka.actor.Actor.aroundReceive$(Actor.scala:535) ~[?:?]
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) ~[?:?]
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) ~[?:?]
    at akka.actor.ActorCell.invoke(ActorCell.scala:548) ~[?:?]
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) ~[?:?]
    at akka.dispatch.Mailbox.run(Mailbox.scala:231) ~[?:?]
    at akka.dispatch.Mailbox.exec(Mailbox.scala:243) ~[?:?]
    at java.util.concurrent.ForkJoinTask.doExec(Unknown Source) ~[?:?]
    at java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source) ~[?:?]
    at java.util.concurrent.ForkJoinPool.scan(Unknown Source) ~[?:?]
    at java.util.concurrent.ForkJoinPool.runWorker(Unknown Source) ~[?:?]
    at java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source) ~[?:?]
Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.beam.sdk.options.PipelineOptionsFactory
    at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.open(BeamPythonFunctionRunner.java:238) ~[flink-python-1.16.0.jar:1.16.0]
    at org.apache.flink.streaming.api.operators.python.process.AbstractExternalPythonFunctionOperator.open(AbstractExternalPythonFunctionOperator.java:57) ~[flink-python-1.16.0.jar:1.16.0]
    at org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator.open(AbstractStatelessFunctionOperator.java:92) ~[flink-python-1.16.0.jar:1.16.0]
    at org.apache.flink.table.runtime.operators.python.scalar.AbstractPythonScalarFunctionOperator.open(AbstractPythonScalarFunctionOperator.java:101) ~[flink-python-1.16.0.jar:1.16.0]
    at org.apache.flink.table.runtime.operators.python.scalar.PythonScalarFunctionOperator.open(PythonScalarFunctionOperator.java:71) ~[flink-python-1.16.0.jar:1.16.0]
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:726) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:702) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:669) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) ~[flink-dist-1.16.0.jar:1.16.0]
    at java.lang.Thread.run(Unknown Source) ~[?:?]
```

In addition, with the corresponding env activated, calling the function through PyFlink returns normally:

```python
from pyflink.table import DataTypes, EnvironmentSettings, TableEnvironment
from pyflink.table.udf import udf


# Local definition; the SQL below registers the copy in the to_fahr module instead.
@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
def func1(line):
    return "udf_{}".format(line)


env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)

t = table_env.from_elements(
    [(1, 2, "Lee"), (3, 4, "Jay"), (5, 6, "Jay"), (7, 8, "Lee")]
).alias("value", "count", "name")

# register table
table_env.create_temporary_view("source", t)
table_env.execute_sql(
    "CREATE TEMPORARY FUNCTION func1 AS 'to_fahr.to_fahr.func1' LANGUAGE PYTHON;"
)

result = table_env.sql_query(
    """
    SELECT func1('test');
    """
).execute()
result.print()
```

So I would like to ask the experts here: how do you all use Python UDFs in Flink SQL mode?
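For anyone skimming the client log, the nested `Caused by:` entries can be condensed with a few lines of Python. This is only a rough sketch for reading the trace: `cause_chain` and `SAMPLE_LOG` are hypothetical names introduced here, not part of Flink or PyFlink, and the sample is an abridged excerpt of the log in this post.

```python
import re


def cause_chain(log_text):
    """Return (exception_class, message) for each 'Caused by:' entry, outermost first."""
    chain = []
    for part in log_text.split("Caused by: ")[1:]:
        # Keep only the header text; stack frames start with "at <method>(".
        header = re.split(r"\s+at\s+(?=[\w$.<>]+\()", part, maxsplit=1)[0].strip()
        exc_class, _, message = header.partition(": ")
        chain.append((exc_class, message))
    return chain


# Abridged excerpt of the client log from this post (one frame kept per cause).
SAMPLE_LOG = """\
Caused by: java.lang.RuntimeException: Failed to fetch next result
    at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109) ~[flink-dist-1.16.0.jar:1.16.0]
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139) ~[flink-dist-1.16.0.jar:1.16.0]
Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.beam.sdk.options.PipelineOptionsFactory
    at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.open(BeamPythonFunctionRunner.java:238) ~[flink-python-1.16.0.jar:1.16.0]
"""

# Print the chain; the last entry is the innermost cause.
for exc_class, message in cause_chain(SAMPLE_LOG):
    print(exc_class, "|", message)
```

In the full log, the innermost entry is the `NoClassDefFoundError` raised from `BeamPythonFunctionRunner.open`, whose frames run under `org.apache.flink.runtime.taskmanager.Task`; the outer entries are the SQL client failing to fetch the job result.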