Flink is started with the official docker-compose setup, and the Python
env is packaged with conda (python 3.7.12 + apache-flink==1.16.0 + apache-beam==2.38.0):
```bash
# SQL client startup arguments
# sql_client, task_manager and job_manager all mount the corresponding directory; permissions and owner are flink:flink
bin/sql-client.sh \
  --pyExecutable /opt/flink_data/requirements/py_env/jm_env/bin/python3.7 \
  -pyfs file:///opt/flink_data/requirements/udfs/

# Equivalent invocation using the conda-packed execution environment
bin/sql-client.sh \
  --pyArchives file:///opt/flink_data/requirements/py_env/pyflink_jm_1.16.0_env.zip \
  --pyExecutable pyflink_jm_1.16.0_env.zip/bin/python3.7 \
  -pyfs file:///opt/flink_data/requirements/udfs/ \
  -j /opt/flink/lib/flink-python-1.16.0.jar

# UDF definition
from pyflink.table import DataTypes
from pyflink.table.udf import udf


@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
def func1(line):
    return "udf_{}".format(line)


# SQL used to test the UDF
CREATE TEMPORARY FUNCTION func1 AS 'to_fahr.to_fahr.func1' LANGUAGE PYTHON;
select func1('Chicago');
# console log
[ERROR] Could not execute SQL statement. Reason:
java.lang.NoClassDefFoundError: Could not initialize class org.apache.beam.sdk.options.PipelineOptionsFactory

# client log
org.apache.flink.table.client.gateway.SqlExecutionException: Error while retrieving result.
        at org.apache.flink.table.client.gateway.local.result.CollectResultBase$ResultRetrievalThread.run(CollectResultBase.java:79) ~[flink-sql-client-1.16.0.jar:1.16.0]
Caused by: java.lang.RuntimeException: Failed to fetch next result
        at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109) ~[flink-dist-1.16.0.jar:1.16.0]
        at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80) ~[flink-dist-1.16.0.jar:1.16.0]
        at org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:222) ~[?:?]
        at org.apache.flink.table.client.gateway.local.result.CollectResultBase$ResultRetrievalThread.run(CollectResultBase.java:75) ~[flink-sql-client-1.16.0.jar:1.16.0]
Caused by: java.io.IOException: Failed to fetch job execution result
        at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:184) ~[flink-dist-1.16.0.jar:1.16.0]
        at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:121) ~[flink-dist-1.16.0.jar:1.16.0]
        at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106) ~[flink-dist-1.16.0.jar:1.16.0]
        at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80) ~[flink-dist-1.16.0.jar:1.16.0]
        at org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:222) ~[?:?]
        at org.apache.flink.table.client.gateway.local.result.CollectResultBase$ResultRetrievalThread.run(CollectResultBase.java:75) ~[flink-sql-client-1.16.0.jar:1.16.0]
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: b3dcd20267356a2280d09df4bcd6f440)
        at java.util.concurrent.CompletableFuture.reportGet(Unknown Source) ~[?:?]
        at java.util.concurrent.CompletableFuture.get(Unknown Source) ~[?:?]
        at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:182) ~[flink-dist-1.16.0.jar:1.16.0]
        at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:121) ~[flink-dist-1.16.0.jar:1.16.0]
        at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106) ~[flink-dist-1.16.0.jar:1.16.0]
        at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80) ~[flink-dist-1.16.0.jar:1.16.0]
        at org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:222) ~[?:?]
        at org.apache.flink.table.client.gateway.local.result.CollectResultBase$ResultRetrievalThread.run(CollectResultBase.java:75) ~[flink-sql-client-1.16.0.jar:1.16.0]
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: b3dcd20267356a2280d09df4bcd6f440)
        at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:130) ~[flink-dist-1.16.0.jar:1.16.0]
        at java.util.concurrent.CompletableFuture$UniApply.tryFire(Unknown Source) ~[?:?]
        at java.util.concurrent.CompletableFuture.postComplete(Unknown Source) ~[?:?]
        at java.util.concurrent.CompletableFuture.complete(Unknown Source) ~[?:?]
        at org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$6(FutureUtils.java:301) ~[flink-dist-1.16.0.jar:1.16.0]
        at java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source) ~[?:?]
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source) ~[?:?]
        at java.util.concurrent.CompletableFuture.postComplete(Unknown Source) ~[?:?]
        at java.util.concurrent.CompletableFuture.complete(Unknown Source) ~[?:?]
        at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$31(RestClusterClient.java:772) ~[flink-dist-1.16.0.jar:1.16.0]
        at java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source) ~[?:?]
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source) ~[?:?]
        at java.util.concurrent.CompletableFuture.postComplete(Unknown Source) ~[?:?]
        at java.util.concurrent.CompletableFuture.complete(Unknown Source) ~[?:?]
        at org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$6(FutureUtils.java:301) ~[flink-dist-1.16.0.jar:1.16.0]
        at java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source) ~[?:?]
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source) ~[?:?]
        at java.util.concurrent.CompletableFuture.postComplete(Unknown Source) ~[?:?]
        at java.util.concurrent.CompletableFuture.postFire(Unknown Source) ~[?:?]
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(Unknown Source) ~[?:?]
        at java.util.concurrent.CompletableFuture$Completion.run(Unknown Source) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
        at java.lang.Thread.run(Unknown Source) ~[?:?]
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144) ~[flink-dist-1.16.0.jar:1.16.0]
        at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:128) ~[flink-dist-1.16.0.jar:1.16.0]
        at java.util.concurrent.CompletableFuture$UniApply.tryFire(Unknown Source) ~[?:?]
        at java.util.concurrent.CompletableFuture.postComplete(Unknown Source) ~[?:?]
        at java.util.concurrent.CompletableFuture.complete(Unknown Source) ~[?:?]
        at org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$6(FutureUtils.java:301) ~[flink-dist-1.16.0.jar:1.16.0]
        at java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source) ~[?:?]
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source) ~[?:?]
        at java.util.concurrent.CompletableFuture.postComplete(Unknown Source) ~[?:?]
        at java.util.concurrent.CompletableFuture.complete(Unknown Source) ~[?:?]
        at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$31(RestClusterClient.java:772) ~[flink-dist-1.16.0.jar:1.16.0]
        at java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source) ~[?:?]
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source) ~[?:?]
        at java.util.concurrent.CompletableFuture.postComplete(Unknown Source) ~[?:?]
        at java.util.concurrent.CompletableFuture.complete(Unknown Source) ~[?:?]
        at org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$6(FutureUtils.java:301) ~[flink-dist-1.16.0.jar:1.16.0]
        at java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source) ~[?:?]
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source) ~[?:?]
        at java.util.concurrent.CompletableFuture.postComplete(Unknown Source) ~[?:?]
        at java.util.concurrent.CompletableFuture.postFire(Unknown Source) ~[?:?]
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(Unknown Source) ~[?:?]
        at java.util.concurrent.CompletableFuture$Completion.run(Unknown Source) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
        at java.lang.Thread.run(Unknown Source) ~[?:?]
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
        at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139) ~[flink-dist-1.16.0.jar:1.16.0]
        at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:83) ~[flink-dist-1.16.0.jar:1.16.0]
        at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:256) ~[flink-dist-1.16.0.jar:1.16.0]
        at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:247) ~[flink-dist-1.16.0.jar:1.16.0]
        at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:240) ~[flink-dist-1.16.0.jar:1.16.0]
        at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:738) ~[flink-dist-1.16.0.jar:1.16.0]
        at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:715) ~[flink-dist-1.16.0.jar:1.16.0]
        at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:78) ~[flink-dist-1.16.0.jar:1.16.0]
        at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:477) ~[flink-dist-1.16.0.jar:1.16.0]
        at jdk.internal.reflect.GeneratedMethodAccessor88.invoke(Unknown Source) ~[?:?]
        at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
        at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:309) ~[?:?]
        at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) ~[?:?]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:307) ~[?:?]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:222) ~[?:?]
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84) ~[?:?]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168) ~[?:?]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) ~[?:?]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) ~[?:?]
        at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) ~[flink-scala_2.12-1.16.0.jar:1.16.0]
        at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) ~[flink-scala_2.12-1.16.0.jar:1.16.0]
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) ~[?:?]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-scala_2.12-1.16.0.jar:1.16.0]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) ~[flink-scala_2.12-1.16.0.jar:1.16.0]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) ~[flink-scala_2.12-1.16.0.jar:1.16.0]
        at akka.actor.Actor.aroundReceive(Actor.scala:537) ~[?:?]
        at akka.actor.Actor.aroundReceive$(Actor.scala:535) ~[?:?]
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) ~[?:?]
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) ~[?:?]
        at akka.actor.ActorCell.invoke(ActorCell.scala:548) ~[?:?]
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) ~[?:?]
        at akka.dispatch.Mailbox.run(Mailbox.scala:231) ~[?:?]
        at akka.dispatch.Mailbox.exec(Mailbox.scala:243) ~[?:?]
        at java.util.concurrent.ForkJoinTask.doExec(Unknown Source) ~[?:?]
        at java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source) ~[?:?]
        at java.util.concurrent.ForkJoinPool.scan(Unknown Source) ~[?:?]
        at java.util.concurrent.ForkJoinPool.runWorker(Unknown Source) ~[?:?]
        at java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source) ~[?:?]
Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.beam.sdk.options.PipelineOptionsFactory
        at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.open(BeamPythonFunctionRunner.java:238) ~[flink-python-1.16.0.jar:1.16.0]
        at org.apache.flink.streaming.api.operators.python.process.AbstractExternalPythonFunctionOperator.open(AbstractExternalPythonFunctionOperator.java:57) ~[flink-python-1.16.0.jar:1.16.0]
        at org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator.open(AbstractStatelessFunctionOperator.java:92) ~[flink-python-1.16.0.jar:1.16.0]
        at org.apache.flink.table.runtime.operators.python.scalar.AbstractPythonScalarFunctionOperator.open(AbstractPythonScalarFunctionOperator.java:101) ~[flink-python-1.16.0.jar:1.16.0]
        at org.apache.flink.table.runtime.operators.python.scalar.PythonScalarFunctionOperator.open(PythonScalarFunctionOperator.java:71) ~[flink-python-1.16.0.jar:1.16.0]
        at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107) ~[flink-dist-1.16.0.jar:1.16.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:726) ~[flink-dist-1.16.0.jar:1.16.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100) ~[flink-dist-1.16.0.jar:1.16.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:702) ~[flink-dist-1.16.0.jar:1.16.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:669) ~[flink-dist-1.16.0.jar:1.16.0]
        at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935) ~[flink-dist-1.16.0.jar:1.16.0]
        at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904) ~[flink-dist-1.16.0.jar:1.16.0]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728) ~[flink-dist-1.16.0.jar:1.16.0]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) ~[flink-dist-1.16.0.jar:1.16.0]
        at java.lang.Thread.run(Unknown Source) ~[?:?]

# In addition, with the corresponding env activated, calling the function
# through PyFlink directly returns the expected result:

from pyflink.common import Row
from pyflink.table import (
    AggregateFunction,
    DataTypes,
    EnvironmentSettings,
    TableEnvironment,
)
from pyflink.table.expressions import call, col, lit
from pyflink.table.udf import udaf, udf
from pyflink.table.window import Tumble


@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
def func1(line):
    return "udf_{}".format(line)


env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)

t = table_env.from_elements([(1, 2, "Lee"),
                             (3, 4, "Jay"),
                             (5, 6, "Jay"),
                             (7, 8, "Lee")]).alias("value", "count", "name")

# register table
table_env.create_temporary_view("source", t)
# register the UDF by its module path, as in the SQL client test
table_env.execute_sql(
    "CREATE TEMPORARY FUNCTION func1 AS 'to_fahr.to_fahr.func1' LANGUAGE PYTHON;")
result = table_env.sql_query(
    """
    SELECT func1('test');
    """).execute()
result.print()
```
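
Since the innermost cause in the client log is a `NoClassDefFoundError` for `org.apache.beam.sdk.options.PipelineOptionsFactory`, one check that might help narrow things down is to list which jars in each container's lib directory actually contain that class. This is only a diagnostic sketch, not a confirmed explanation of the failure: `find_jars_with_class` is a hypothetical helper written for this post, and the default directory `/opt/flink/lib` is assumed from the `-j` path used above.

```python
import sys
import zipfile
from pathlib import Path


def find_jars_with_class(lib_dir, class_name):
    """Return the sorted file names of jars in lib_dir that contain class_name."""
    # A class a.b.C is stored in a jar as the entry a/b/C.class
    entry = class_name.replace(".", "/") + ".class"
    lib_path = Path(lib_dir)
    if not lib_path.is_dir():
        return []
    hits = []
    for jar in sorted(lib_path.glob("*.jar")):
        try:
            with zipfile.ZipFile(jar) as zf:
                if entry in zf.namelist():
                    hits.append(jar.name)
        except zipfile.BadZipFile:
            # Skip files that are not valid zip archives
            continue
    return hits


if __name__ == "__main__":
    # Assumed default location; pass another directory as the first argument
    lib_dir = sys.argv[1] if len(sys.argv) > 1 else "/opt/flink/lib"
    cls = "org.apache.beam.sdk.options.PipelineOptionsFactory"
    for name in find_jars_with_class(lib_dir, cls):
        print(name)
```

Running it in the sql_client, job_manager and task_manager containers and comparing the output would show whether the class is provided by no jar, one jar, or several jars on each side.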


So I'd like to ask: how is everyone using Python UDFs in Flink SQL mode?

