Flink 使用官方 docker-compose 启动,python env 用 conda 打包(python3.7.12 + apache-flink==1.16.0 + apache-beam==2.38.0);
```bash
# SQL client launch options.
# The sql_client, task_manager and job_manager containers all mount the
# corresponding directories; permissions and owner are flink:flink.
# NOTE: each "\" must be the LAST character on its line -- "\ " followed by a
# space escapes the space instead of continuing the command.
bin/sql-client.sh \
    --pyExecutable /opt/flink_data/requirements/py_env/jm_env/bin/python3.7 \
    -pyfs file:///opt/flink_data/requirements/udfs/

# Same, but using the conda-packed execution environment.
# NOTE(review): flink-python-1.16.0.jar is taken from /opt/flink/lib, so it is
# presumably already on the cluster classpath; passing it again via -j also
# ships it as a user jar -- check whether the duplicate copy matters.
bin/sql-client.sh \
    --pyArchives file:///opt/flink_data/requirements/py_env/pyflink_jm_1.16.0_env.zip \
    --pyExecutable pyflink_jm_1.16.0_env.zip/bin/python3.7 \
    -pyfs file:///opt/flink_data/requirements/udfs/ \
    -j /opt/flink/lib/flink-python-1.16.0.jar
# UDF definition.
# NOTE(review): the SQL DDL registers this as 'to_fahr.to_fahr.func1', so it
# presumably lives in to_fahr/to_fahr.py under the -pyfs directory -- confirm.
from pyflink.table import DataTypes
from pyflink.table.udf import udf


@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
def func1(line):
    """Scalar Python UDF (STRING -> STRING).

    Returns ``line`` formatted with the prefix ``"udf_"``,
    e.g. ``'Chicago'`` -> ``'udf_Chicago'``.
    """
    return "udf_{}".format(line)
-- How the UDF is invoked for testing (inside the SQL client).
-- NOTE(review): 'to_fahr.to_fahr.func1' is the dotted Python path of the
-- function (module to_fahr.to_fahr, attribute func1); presumably resolved
-- from the directory passed via -pyfs -- confirm.
CREATE TEMPORARY FUNCTION func1 AS 'to_fahr.to_fahr.func1' LANGUAGE PYTHON;
select func1('Chicago');
# console log
[ERROR] Could not execute SQL statement. Reason:
java.lang.NoClassDefFoundError: Could not initialize class
org.apache.beam.sdk.options.PipelineOptionsFactory
#client log
org.apache.flink.table.client.gateway.SqlExecutionException: Error while
retrieving result.
at
org.apache.flink.table.client.gateway.local.result.CollectResultBase$ResultRetrievalThread.run(CollectResultBase.java:79)
~[flink-sql-client-1.16.0.jar:1.16.0]
Caused by: java.lang.RuntimeException: Failed to fetch next result
at
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109)
~[flink-dist-1.16.0.jar:1.16.0]
at
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
~[flink-dist-1.16.0.jar:1.16.0]
at
org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:222)
~[?:?]
at
org.apache.flink.table.client.gateway.local.result.CollectResultBase$ResultRetrievalThread.run(CollectResultBase.java:75)
~[flink-sql-client-1.16.0.jar:1.16.0]
Caused by: java.io.IOException: Failed to fetch job execution result
at
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:184)
~[flink-dist-1.16.0.jar:1.16.0]
at
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:121)
~[flink-dist-1.16.0.jar:1.16.0]
at
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
~[flink-dist-1.16.0.jar:1.16.0]
at
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
~[flink-dist-1.16.0.jar:1.16.0]
at
org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:222)
~[?:?]
at
org.apache.flink.table.client.gateway.local.result.CollectResultBase$ResultRetrievalThread.run(CollectResultBase.java:75)
~[flink-sql-client-1.16.0.jar:1.16.0]
Caused by: java.util.concurrent.ExecutionException:
org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID:
b3dcd20267356a2280d09df4bcd6f440)
at java.util.concurrent.CompletableFuture.reportGet(Unknown Source)
~[?:?]
at java.util.concurrent.CompletableFuture.get(Unknown Source) ~[?:?]
at
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:182)
~[flink-dist-1.16.0.jar:1.16.0]
at
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:121)
~[flink-dist-1.16.0.jar:1.16.0]
at
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
~[flink-dist-1.16.0.jar:1.16.0]
at
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
~[flink-dist-1.16.0.jar:1.16.0]
at
org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:222)
~[?:?]
at
org.apache.flink.table.client.gateway.local.result.CollectResultBase$ResultRetrievalThread.run(CollectResultBase.java:75)
~[flink-sql-client-1.16.0.jar:1.16.0]
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job
failed (JobID: b3dcd20267356a2280d09df4bcd6f440)
at
org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:130)
~[flink-dist-1.16.0.jar:1.16.0]
at java.util.concurrent.CompletableFuture$UniApply.tryFire(Unknown
Source) ~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
~[?:?]
at java.util.concurrent.CompletableFuture.complete(Unknown Source)
~[?:?]
at
org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$6(FutureUtils.java:301)
~[flink-dist-1.16.0.jar:1.16.0]
at java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown
Source) ~[?:?]
at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source)
~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
~[?:?]
at java.util.concurrent.CompletableFuture.complete(Unknown Source)
~[?:?]
at
org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$31(RestClusterClient.java:772)
~[flink-dist-1.16.0.jar:1.16.0]
at java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown
Source) ~[?:?]
at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source)
~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
~[?:?]
at java.util.concurrent.CompletableFuture.complete(Unknown Source)
~[?:?]
at
org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$6(FutureUtils.java:301)
~[flink-dist-1.16.0.jar:1.16.0]
at java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown
Source) ~[?:?]
at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source)
~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
~[?:?]
at java.util.concurrent.CompletableFuture.postFire(Unknown Source)
~[?:?]
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(Unknown
Source) ~[?:?]
at java.util.concurrent.CompletableFuture$Completion.run(Unknown
Source) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
~[?:?]
at java.lang.Thread.run(Unknown Source) ~[?:?]
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution
failed.
at
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
~[flink-dist-1.16.0.jar:1.16.0]
at
org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:128)
~[flink-dist-1.16.0.jar:1.16.0]
at java.util.concurrent.CompletableFuture$UniApply.tryFire(Unknown
Source) ~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
~[?:?]
at java.util.concurrent.CompletableFuture.complete(Unknown Source)
~[?:?]
at
org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$6(FutureUtils.java:301)
~[flink-dist-1.16.0.jar:1.16.0]
at java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown
Source) ~[?:?]
at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source)
~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
~[?:?]
at java.util.concurrent.CompletableFuture.complete(Unknown Source)
~[?:?]
at
org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$31(RestClusterClient.java:772)
~[flink-dist-1.16.0.jar:1.16.0]
at java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown
Source) ~[?:?]
at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source)
~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
~[?:?]
at java.util.concurrent.CompletableFuture.complete(Unknown Source)
~[?:?]
at
org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$6(FutureUtils.java:301)
~[flink-dist-1.16.0.jar:1.16.0]
at java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown
Source) ~[?:?]
at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source)
~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
~[?:?]
at java.util.concurrent.CompletableFuture.postFire(Unknown Source)
~[?:?]
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(Unknown
Source) ~[?:?]
at java.util.concurrent.CompletableFuture$Completion.run(Unknown
Source) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
~[?:?]
at java.lang.Thread.run(Unknown Source) ~[?:?]
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by
NoRestartBackoffTimeStrategy
at
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139)
~[flink-dist-1.16.0.jar:1.16.0]
at
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:83)
~[flink-dist-1.16.0.jar:1.16.0]
at
org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:256)
~[flink-dist-1.16.0.jar:1.16.0]
at
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:247)
~[flink-dist-1.16.0.jar:1.16.0]
at
org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:240)
~[flink-dist-1.16.0.jar:1.16.0]
at
org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:738)
~[flink-dist-1.16.0.jar:1.16.0]
at
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:715)
~[flink-dist-1.16.0.jar:1.16.0]
at
org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:78)
~[flink-dist-1.16.0.jar:1.16.0]
at
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:477)
~[flink-dist-1.16.0.jar:1.16.0]
at jdk.internal.reflect.GeneratedMethodAccessor88.invoke(Unknown
Source) ~[?:?]
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown
Source) ~[?:?]
at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:309)
~[?:?]
at
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
~[?:?]
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:307)
~[?:?]
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:222)
~[?:?]
at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84)
~[?:?]
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168)
~[?:?]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) ~[?:?]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) ~[?:?]
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
~[flink-scala_2.12-1.16.0.jar:1.16.0]
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
~[flink-scala_2.12-1.16.0.jar:1.16.0]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
~[?:?]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
~[flink-scala_2.12-1.16.0.jar:1.16.0]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
~[flink-scala_2.12-1.16.0.jar:1.16.0]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
~[flink-scala_2.12-1.16.0.jar:1.16.0]
at akka.actor.Actor.aroundReceive(Actor.scala:537) ~[?:?]
at akka.actor.Actor.aroundReceive$(Actor.scala:535) ~[?:?]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
~[?:?]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) ~[?:?]
at akka.actor.ActorCell.invoke(ActorCell.scala:548) ~[?:?]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) ~[?:?]
at akka.dispatch.Mailbox.run(Mailbox.scala:231) ~[?:?]
at akka.dispatch.Mailbox.exec(Mailbox.scala:243) ~[?:?]
at java.util.concurrent.ForkJoinTask.doExec(Unknown Source) ~[?:?]
at java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown
Source) ~[?:?]
at java.util.concurrent.ForkJoinPool.scan(Unknown Source) ~[?:?]
at java.util.concurrent.ForkJoinPool.runWorker(Unknown Source) ~[?:?]
at java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source) ~[?:?]
Caused by: java.lang.NoClassDefFoundError: Could not initialize class
org.apache.beam.sdk.options.PipelineOptionsFactory
at
org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.open(BeamPythonFunctionRunner.java:238)
~[flink-python-1.16.0.jar:1.16.0]
at
org.apache.flink.streaming.api.operators.python.process.AbstractExternalPythonFunctionOperator.open(AbstractExternalPythonFunctionOperator.java:57)
~[flink-python-1.16.0.jar:1.16.0]
at
org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator.open(AbstractStatelessFunctionOperator.java:92)
~[flink-python-1.16.0.jar:1.16.0]
at
org.apache.flink.table.runtime.operators.python.scalar.AbstractPythonScalarFunctionOperator.open(AbstractPythonScalarFunctionOperator.java:101)
~[flink-python-1.16.0.jar:1.16.0]
at
org.apache.flink.table.runtime.operators.python.scalar.PythonScalarFunctionOperator.open(PythonScalarFunctionOperator.java:71)
~[flink-python-1.16.0.jar:1.16.0]
at
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
~[flink-dist-1.16.0.jar:1.16.0]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:726)
~[flink-dist-1.16.0.jar:1.16.0]
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
~[flink-dist-1.16.0.jar:1.16.0]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:702)
~[flink-dist-1.16.0.jar:1.16.0]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:669)
~[flink-dist-1.16.0.jar:1.16.0]
at
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
~[flink-dist-1.16.0.jar:1.16.0]
at
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
~[flink-dist-1.16.0.jar:1.16.0]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
~[flink-dist-1.16.0.jar:1.16.0]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
~[flink-dist-1.16.0.jar:1.16.0]
at java.lang.Thread.run(Unknown Source) ~[?:?]
此外,在 activate 了对应 env 的情况下,直接用 pyflink 调用该函数,是可以正常返回结果的:
# Standalone PyFlink script (run with the conda env activated) that registers
# the Python UDF through the same SQL DDL used in the SQL client and calls it.
from pyflink.common import Row
from pyflink.table import (AggregateFunction, DataTypes, TableEnvironment,
                           EnvironmentSettings)
from pyflink.table.expressions import call
from pyflink.table.udf import udaf, udf
from pyflink.table.expressions import col, lit
from pyflink.table.window import Tumble


@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
def func1(line):
    """Scalar Python UDF (STRING -> STRING) returning ``"udf_" + line``.

    NOTE(review): this local definition is not what the SQL below calls;
    the DDL loads ``func1`` from the importable module ``to_fahr.to_fahr``.
    """
    return "udf_{}".format(line)


env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)

t = table_env.from_elements([(1, 2, "Lee"),
                             (3, 4, "Jay"),
                             (5, 6, "Jay"),
                             (7, 8, "Lee")]).alias("value", "count", "name")
# Register the table as a temporary view.
# NOTE(review): the "source" view is not referenced by the query below.
table_env.create_temporary_view("source", t)

# Single-line literal: the original was wrapped mid-string (a SyntaxError).
table_env.execute_sql(
    "CREATE TEMPORARY FUNCTION func1 AS 'to_fahr.to_fahr.func1' LANGUAGE PYTHON;")
result = table_env.sql_query("""
SELECT func1('test');
""").execute()
result.print()
```
所以想请教下大佬们在flink sql模式下,都是怎么使用python udf的?