Unsubscribe

2022-12-19 posted by 邱钺
Unsubscribe


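Re:Re: How can Flink SQL connector options support the Map data type?

2022-12-19 posted by casel.chen
I have looked at it. It does not support an HTTP source table, and even its HTTP lookup table does not support the Map data type.

At 2022-12-19 14:51:42, "Weihua Hu" wrote:
>Hi, you could try the standalone open-source HTTP connector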
>
>https://github.com/getindata/flink-http-connector
>
>Best,
>Weihua
>
>
>On Sat, Dec 17, 2022 at 10:21 AM casel.chen  wrote:
>
>> I want to develop a Flink SQL HTTP connector and have run into the problem of letting users add
>> custom request headers. How can connector options support the Map data type?
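
One common way to carry a map through flat, string-valued connector options is a key prefix, in the style of the Kafka connector's `properties.*` options. The sketch below only illustrates that idea in Python, the language used elsewhere in this digest; a real connector would do the equivalent in Java inside its table factory. The prefix `http.headers.`, the helper `extract_prefixed`, and the option values are hypothetical, not options of any existing connector.

```python
from typing import Dict

# Hypothetical prefix; a real connector would define and document its own.
HEADER_PREFIX = "http.headers."


def extract_prefixed(options: Dict[str, str], prefix: str) -> Dict[str, str]:
    """Collect every option whose key starts with `prefix` into a map,
    stripping the prefix from each key. A key equal to the bare prefix is skipped."""
    return {
        key[len(prefix):]: value
        for key, value in options.items()
        if key.startswith(prefix) and len(key) > len(prefix)
    }


# Flat options as they might appear in a WITH (...) clause (illustrative values).
table_options = {
    "connector": "http",
    "url": "http://localhost:8080/api",
    "http.headers.Content-Type": "application/json",
    "http.headers.X-Trace-Id": "abc123",
}

headers = extract_prefixed(table_options, HEADER_PREFIX)
print(headers)
```

With this layout a user would write one option per header, for example `'http.headers.X-Trace-Id' = 'abc123'`. Because such keys cannot be declared one by one, the factory would presumably also need to exclude the prefix from its option validation.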


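RE: Re:Re: Question about whether the sql-client pyexec parameter takes effect

2022-12-19 posted by kung harold
Both passing the -pyclientexec parameter and running SET 'python.client.executable'=''; in the SQL client terminal resolve the
`java.lang.IllegalStateException: Instantiating python function xx failed` problem. However, when I then run the UDF normally it still
fails, and I have not been able to find the cause of
`Could not initialize class org.apache.beam.sdk.options.PipelineOptionsFactory`.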
```
select func1('Chicago');
# console log
[ERROR] Could not execute SQL statement. Reason:
java.lang.NoClassDefFoundError: Could not initialize class org.apache.beam.sdk.options.PipelineOptionsFactory

# client log
org.apache.flink.table.client.gateway.SqlExecutionException: Error while retrieving result.
at org.apache.flink.table.client.gateway.local.result.CollectResultBase$ResultRetrievalThread.run(CollectResultBase.java:79) ~[flink-sql-client-1.16.0.jar:1.16.0]
Caused by: java.lang.RuntimeException: Failed to fetch next result
at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109) ~[flink-dist-1.16.0.jar:1.16.0]
at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80) ~[flink-dist-1.16.0.jar:1.16.0]
at org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:222) ~[?:?]
at org.apache.flink.table.client.gateway.local.result.CollectResultBase$ResultRetrievalThread.run(CollectResultBase.java:75) ~[flink-sql-client-1.16.0.jar:1.16.0]
Caused by: java.io.IOException: Failed to fetch job execution result
at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:184) ~[flink-dist-1.16.0.jar:1.16.0]
at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:121) ~[flink-dist-1.16.0.jar:1.16.0]
at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106) ~[flink-dist-1.16.0.jar:1.16.0]
at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80) ~[flink-dist-1.16.0.jar:1.16.0]
at org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:222) ~[?:?]
at org.apache.flink.table.client.gateway.local.result.CollectResultBase$ResultRetrievalThread.run(CollectResultBase.java:75) ~[flink-sql-client-1.16.0.jar:1.16.0]
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: b3dcd20267356a2280d09df4bcd6f440)
at java.util.concurrent.CompletableFuture.reportGet(Unknown Source) ~[?:?]
at java.util.concurrent.CompletableFuture.get(Unknown Source) ~[?:?]
at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:182) ~[flink-dist-1.16.0.jar:1.16.0]
at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:121) ~[flink-dist-1.16.0.jar:1.16.0]
at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106) ~[flink-dist-1.16.0.jar:1.16.0]
at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80) ~[flink-dist-1.16.0.jar:1.16.0]
at org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:222) ~[?:?]
at org.apache.flink.table.client.gateway.local.result.CollectResultBase$ResultRetrievalThread.run(CollectResultBase.java:75) ~[flink-sql-client-1.16.0.jar:1.16.0]
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: b3dcd20267356a2280d09df4bcd6f440)
at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:130) ~[flink-dist-1.16.0.jar:1.16.0]
at java.util.concurrent.CompletableFuture$UniApply.tryFire(Unknown Source) ~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(Unknown Source) ~[?:?]
at java.util.concurrent.CompletableFuture.complete(Unknown Source) ~[?:?]
at org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$6(FutureUtils.java:301) ~[flink-dist-1.16.0.jar:1.16.0]
at java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source) ~[?:?]
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source) ~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(Unknown Source) ~[?:?]
at java.util.concurrent.CompletableFuture.complete(Unknown Source) ~[?:?]
at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$31(RestClusterClient.java:772) ~[flink-dist-1.16.0.jar:1.16.0]
```

Question about a Python UDF call failing in Flink SQL

2022-12-19 posted by kung harold
Flink was started with the official docker-compose setup, and the Python env was packaged with conda
(python3.7.12+apache-flink==1.16.0+apache-beam=2.38.0);
```bash
# sql client startup parameters
# sql_client/task_manager/job_manager all mount the corresponding directories; permissions and owner are flink:flink
bin/sql-client.sh \
--pyExecutable /opt/flink_data/requirements/py_env/jm_env/bin/python3.7 \
-pyfs file:///opt/flink_data/requirements/udfs/

# Corresponding execution environment packaged with conda
bin/sql-client.sh \
--pyArchives file:///opt/flink_data/requirements/py_env/pyflink_jm_1.16.0_env.zip \
--pyExecutable pyflink_jm_1.16.0_env.zip/bin/python3.7 \
-pyfs file:///opt/flink_data/requirements/udfs/ \
-j /opt/flink/lib/flink-python-1.16.0.jar

# UDF function
from pyflink.table import DataTypes
from pyflink.table.udf import udf


@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
def func1(line):
    # Prefix the input string with "udf_"
    return "udf_{}".format(line)


# SQL test invocation
CREATE TEMPORARY FUNCTION func1 AS 'to_fahr.to_fahr.func1' LANGUAGE PYTHON;
select func1('Chicago');
# console log
[ERROR] Could not execute SQL statement. Reason:
java.lang.NoClassDefFoundError: Could not initialize class org.apache.beam.sdk.options.PipelineOptionsFactory

# client log
org.apache.flink.table.client.gateway.SqlExecutionException: Error while retrieving result.
at org.apache.flink.table.client.gateway.local.result.CollectResultBase$ResultRetrievalThread.run(CollectResultBase.java:79) ~[flink-sql-client-1.16.0.jar:1.16.0]
Caused by: java.lang.RuntimeException: Failed to fetch next result
at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109) ~[flink-dist-1.16.0.jar:1.16.0]
at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80) ~[flink-dist-1.16.0.jar:1.16.0]
at org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:222) ~[?:?]
at org.apache.flink.table.client.gateway.local.result.CollectResultBase$ResultRetrievalThread.run(CollectResultBase.java:75) ~[flink-sql-client-1.16.0.jar:1.16.0]
Caused by: java.io.IOException: Failed to fetch job execution result
at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:184) ~[flink-dist-1.16.0.jar:1.16.0]
at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:121) ~[flink-dist-1.16.0.jar:1.16.0]
at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106) ~[flink-dist-1.16.0.jar:1.16.0]
at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80) ~[flink-dist-1.16.0.jar:1.16.0]
at org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:222) ~[?:?]
at org.apache.flink.table.client.gateway.local.result.CollectResultBase$ResultRetrievalThread.run(CollectResultBase.java:75) ~[flink-sql-client-1.16.0.jar:1.16.0]
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: b3dcd20267356a2280d09df4bcd6f440)
at java.util.concurrent.CompletableFuture.reportGet(Unknown Source) ~[?:?]
at java.util.concurrent.CompletableFuture.get(Unknown Source) ~[?:?]
at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:182) ~[flink-dist-1.16.0.jar:1.16.0]
at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:121) ~[flink-dist-1.16.0.jar:1.16.0]
at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106) ~[flink-dist-1.16.0.jar:1.16.0]
at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80) ~[flink-dist-1.16.0.jar:1.16.0]
at org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:222) ~[?:?]
at org.apache.flink.table.client.gateway.local.result.CollectResultBase$ResultRetrievalThread.run(CollectResultBase.java:75) ~[flink-sql-client-1.16.0.jar:1.16.0]
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: b3dcd20267356a2280d09df4bcd6f440)
at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:130) ~[flink-dist-1.16.0.jar:1.16.0]
at java.util.concurrent.CompletableFuture$UniApply.tryFire(Unknown Source) ~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(Unknown Source) ~[?:?]
at java.util.concurrent.CompletableFuture.complete(Unknown Source) ~[?:?]
at 
org.apache.flink.util.conc