Unsubscribe
Re:Re: How can Flink SQL connector options support a Map data type?
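I have looked at it. It does not support an HTTP source table, and even its HTTP lookup table does not support the Map data type.

On 2022-12-19 14:51:42, "Weihua Hu" wrote:
>Hi, you could try the standalone open-source HTTP connector
>
>https://github.com/getindata/flink-http-connector
>
>Best,
>Weihua
>
>
>On Sat, Dec 17, 2022 at 10:21 AM casel.chen wrote:
>
>> I want to develop a Flink SQL HTTP connector and have run into the problem of letting users add custom request headers. How can connector options support a Map data type?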
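
One possible workaround for the question above, offered only as a sketch and not as something either poster used: the options in a `WITH (...)` clause are flat string key/value pairs, so a map-shaped setting can be spelled as a family of prefixed keys, much like the `properties.*` pass-through keys of the Kafka SQL connector. The class name `HeaderOptions`, the `headers.` prefix, and the sample option values are assumptions made up for this illustration; the code uses only the JDK and does not touch any Flink API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class HeaderOptions {
    // Hypothetical prefix: a table would declare e.g. 'headers.Authorization' = '...'
    public static final String HEADER_PREFIX = "headers.";

    /**
     * Collects every option whose key starts with HEADER_PREFIX into a map
     * keyed by the header name (the part of the key after the prefix).
     */
    public static Map<String, String> extractHeaders(Map<String, String> tableOptions) {
        Map<String, String> headers = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : tableOptions.entrySet()) {
            String key = entry.getKey();
            // Skip unrelated options and a bare "headers." key with no header name.
            if (key.startsWith(HEADER_PREFIX) && key.length() > HEADER_PREFIX.length()) {
                headers.put(key.substring(HEADER_PREFIX.length()), entry.getValue());
            }
        }
        return headers;
    }

    public static void main(String[] args) {
        // Stand-in for the flat options map a table factory would receive.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("connector", "http");
        options.put("url", "http://localhost:8080/api");
        options.put("headers.Authorization", "Bearer token");
        options.put("headers.Content-Type", "application/json");

        System.out.println(extractHeaders(options));
    }
}
```

In a real connector the flat map would come from the table's options inside the factory, and option validation would have to be told to skip keys under the chosen prefix; how that is wired up depends on the Flink version and is not shown here.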
RE: Re:Re: Question about the sql-client pyexec parameter taking effect
Both setting the -pyclientexec parameter and running SET 'python.client.executable'=''; in the SQL client terminal resolve the `java.lang.IllegalStateException: Instantiating python function xx failed` problem. However, when I then execute the UDF normally it still fails, and I have not been able to find the cause of `Could not initialize class org.apache.beam.sdk.options.PipelineOptionsFactory`.

```
select func1('Chicago');

# console log
[ERROR] Could not execute SQL statement. Reason:
java.lang.NoClassDefFoundError: Could not initialize class org.apache.beam.sdk.options.PipelineOptionsFactory

#client log
org.apache.flink.table.client.gateway.SqlExecutionException: Error while retrieving result.
    at org.apache.flink.table.client.gateway.local.result.CollectResultBase$ResultRetrievalThread.run(CollectResultBase.java:79) ~[flink-sql-client-1.16.0.jar:1.16.0]
Caused by: java.lang.RuntimeException: Failed to fetch next result
    at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:222) ~[?:?]
    at org.apache.flink.table.client.gateway.local.result.CollectResultBase$ResultRetrievalThread.run(CollectResultBase.java:75) ~[flink-sql-client-1.16.0.jar:1.16.0]
Caused by: java.io.IOException: Failed to fetch job execution result
    at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:184) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:121) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:222) ~[?:?]
    at org.apache.flink.table.client.gateway.local.result.CollectResultBase$ResultRetrievalThread.run(CollectResultBase.java:75) ~[flink-sql-client-1.16.0.jar:1.16.0]
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: b3dcd20267356a2280d09df4bcd6f440)
    at java.util.concurrent.CompletableFuture.reportGet(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture.get(Unknown Source) ~[?:?]
    at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:182) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:121) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:222) ~[?:?]
    at org.apache.flink.table.client.gateway.local.result.CollectResultBase$ResultRetrievalThread.run(CollectResultBase.java:75) ~[flink-sql-client-1.16.0.jar:1.16.0]
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: b3dcd20267356a2280d09df4bcd6f440)
    at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:130) ~[flink-dist-1.16.0.jar:1.16.0]
    at java.util.concurrent.CompletableFuture$UniApply.tryFire(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture.postComplete(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture.complete(Unknown Source) ~[?:?]
    at org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$6(FutureUtils.java:301) ~[flink-dist-1.16.0.jar:1.16.0]
    at java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture.postComplete(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture.complete(Unknown Source) ~[?:?]
    at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$31(RestClusterClient.java:772) ~[flink-dist-1.16.0.jar:1.16.0]
```
Question about a Python UDF call failing in Flink SQL
Flink was started with the official docker-compose, and the Python env was packaged with conda (python3.7.12 + apache-flink==1.16.0 + apache-beam=2.38.0);

```bash
# sql client startup parameters
# sql_client/task_manager/job_manager all mount the corresponding directory; permissions and owner are flink:flink
bin/sql-client.sh \
  --pyExecutable /opt/flink_data/requirements/py_env/jm_env/bin/python3.7 \
  -pyfs file:///opt/flink_data/requirements/udfs/

# the corresponding execution environment as packaged with conda
bin/sql-client.sh \
  --pyArchives file:///opt/flink_data/requirements/py_env/pyflink_jm_1.16.0_env.zip \
  --pyExecutable pyflink_jm_1.16.0_env.zip/bin/python3.7 \
  -pyfs file:///opt/flink_data/requirements/udfs/ -j /opt/flink/lib/flink-python-1.16.0.jar

# UDF function
from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(input_types=[DataTypes.STRING()],result_type=DataTypes.STRING())
def func1(line):
    return "udf_{}".format(line)

# how the function is called from SQL for the test
CREATE TEMPORARY FUNCTION func1 AS 'to_fahr.to_fahr.func1' LANGUAGE PYTHON;
select func1('Chicago');

# console log
[ERROR] Could not execute SQL statement. Reason:
java.lang.NoClassDefFoundError: Could not initialize class org.apache.beam.sdk.options.PipelineOptionsFactory

#client log
org.apache.flink.table.client.gateway.SqlExecutionException: Error while retrieving result.
    at org.apache.flink.table.client.gateway.local.result.CollectResultBase$ResultRetrievalThread.run(CollectResultBase.java:79) ~[flink-sql-client-1.16.0.jar:1.16.0]
Caused by: java.lang.RuntimeException: Failed to fetch next result
    at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:222) ~[?:?]
    at org.apache.flink.table.client.gateway.local.result.CollectResultBase$ResultRetrievalThread.run(CollectResultBase.java:75) ~[flink-sql-client-1.16.0.jar:1.16.0]
Caused by: java.io.IOException: Failed to fetch job execution result
    at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:184) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:121) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:222) ~[?:?]
    at org.apache.flink.table.client.gateway.local.result.CollectResultBase$ResultRetrievalThread.run(CollectResultBase.java:75) ~[flink-sql-client-1.16.0.jar:1.16.0]
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: b3dcd20267356a2280d09df4bcd6f440)
    at java.util.concurrent.CompletableFuture.reportGet(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture.get(Unknown Source) ~[?:?]
    at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:182) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:121) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:222) ~[?:?]
    at org.apache.flink.table.client.gateway.local.result.CollectResultBase$ResultRetrievalThread.run(CollectResultBase.java:75) ~[flink-sql-client-1.16.0.jar:1.16.0]
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: b3dcd20267356a2280d09df4bcd6f440)
    at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:130) ~[flink-dist-1.16.0.jar:1.16.0]
    at java.util.concurrent.CompletableFuture$UniApply.tryFire(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture.postComplete(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture.complete(Unknown Source) ~[?:?]
    at org.apache.flink.util.conc