Example Flink SQL fromDataStream watermark code not showing *rowtime*

2022-11-25 Thread Dan Hill
Hi.  I copied the Flink code from this page.  My printSchema() does not
contain **ROWTIME** in the output.  I'm running Flink v1.14.4.

https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/data_stream_api/



public static class User {...}

DataStream<User> dataStream =
    env.fromElements(
        new User("Alice", 4, Instant.ofEpochMilli(1000)),
        new User("Bob", 6, Instant.ofEpochMilli(1001)),
        new User("Alice", 10, Instant.ofEpochMilli(1002)));

Table table =
    tableEnv.fromDataStream(
        dataStream,
        Schema.newBuilder()
            .columnByExpression("rowtime",
                "CAST(event_time AS TIMESTAMP_LTZ(3))")
            .watermark("rowtime", "rowtime - INTERVAL '10' SECOND")
            .build());
table.printSchema();


Actual: printSchema() prints the following:

(
`name` STRING,
`score` INT,
`event_time` TIMESTAMP_LTZ(9),
`rowtime` TIMESTAMP_LTZ(3) AS CAST(event_time AS TIMESTAMP_LTZ(3)),
WATERMARK FOR `rowtime`: TIMESTAMP_LTZ(3) AS rowtime - INTERVAL '10' SECOND
)

Expected: the Flink docs say there should be a **ROWTIME** in the schema
details for rowtime.


How can Flink SQL be extended to support delayed joins?

2022-11-25 Thread casel.chen
In a dual-stream join scenario, data arriving on stream A should not be joined with
stream B immediately, but only after a configured delay. How can this be implemented
with Flink SQL? And if it is not currently supported, how would one extend Flink SQL
to support it?
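Not from the thread, but one built-in construct that covers part of this need is an event-time interval join, which delays emission until the declared time bounds allow a match. A sketch with hypothetical table and column names:

```sql
-- Assumes streams A and B each declare an event-time attribute `rowtime`
-- (with a watermark) and share a join key `id`; all names are illustrative.
SELECT A.id, A.payload AS a_payload, B.payload AS b_payload
FROM A
JOIN B
  ON A.id = B.id
 AND B.rowtime BETWEEN A.rowtime AND A.rowtime + INTERVAL '10' MINUTE;
```

For fully custom delay semantics, dropping to the DataStream API (e.g. a KeyedCoProcessFunction that buffers stream A with timers) is the usual workaround.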

Discussion of Flink JDBC base-url format restrictions

2022-11-25 Thread melin li
JdbcCatalogUtils only does a simple validation of the url format. Databases such as
PostgreSQL and SQL Server have multiple catalogs per instance, and those catalogs
cannot access each other. Following the design used by Presto, each catalog of the
same PostgreSQL instance has to be configured separately, so the configured url
should specify the catalog explicitly. Oracle additionally requires a SID, which
also caused some confusion when writing a custom OracleCatalog. Spark JDBC imposes
no such restriction.

Second issue:
the quoteIdentifier method in the Oracle, PostgreSQL and SQL Server dialects does
not add double quotes, which breaks some scenarios. It is suggested to add double
quotes by default, as Spark already does.
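Not from the thread: a minimal sketch of the suggested default quoting, modeled on what Spark's JdbcDialect does. The class and method below are illustrative stand-ins, not Flink's actual dialect API:

```java
// Illustrative stand-in for a dialect class; not Flink's real API.
public class QuotingDialect {
    // Wrap the identifier in double quotes and escape embedded double
    // quotes by doubling them, per the SQL standard.
    public static String quoteIdentifier(String identifier) {
        return "\"" + identifier.replace("\"", "\"\"") + "\"";
    }

    public static void main(String[] args) {
        // Quoting keeps reserved words and mixed-case names usable.
        System.out.println(quoteIdentifier("order"));
    }
}
```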


[ANNOUNCE] Apache Flink 1.15.3 released

2022-11-25 Thread Fabian Paul
The Apache Flink community is very happy to announce the release of
Apache Flink 1.15.3, which is the third bugfix release for the Apache
Flink 1.15 series.

Apache Flink® is an open-source stream processing framework for
distributed, high-performing, always-available, and accurate data
streaming applications.

The release is available for download at:
https://flink.apache.org/downloads.html

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12352210

We would like to thank all contributors of the Apache Flink community
who made this release possible!

Feel free to reach out to the release managers (or respond to this
thread) with feedback on the release process. Our goal is to
constantly improve the release process. Feedback on what could be
improved or things that didn't go so well is appreciated.

Regards,
Release Manager




Several jobs in Kubernetes restart because the scheduler is being stopped.

2022-11-25 Thread Evgeniy Lyutikov
Hello!
In our k8s application cluster (served by flink-operator) several jobs restart 
at the same time with the same error.

What is the reason for this restart and how can it be prevented?



2022-11-25T07:50:47.253459360Z INFO  
org.apache.flink.runtime.resourcemanager.ResourceManagerServiceImpl [] - 
Resource manager service is revoked leadership with session id 
5e76a7e2-0a88-4cff-b371-0a36f2b4cebd.
2022-11-25T07:50:47.257093040Z INFO  
org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] 
- Closing the slot manager.
2022-11-25T07:50:47.257145141Z INFO  
org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] 
- Suspending the slot manager.
2022-11-25T07:50:47.258932353Z INFO  
org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - 
Stopping DefaultLeaderRetrievalService.
2022-11-25T07:50:47.258974224Z INFO  
org.apache.flink.kubernetes.highavailability.KubernetesLeaderRetrievalDriver [] 
- Stopping 
KubernetesLeaderRetrievalDriver{configMapName='job-name--jobmanager-leader'}.
2022-11-25T07:50:47.259457605Z INFO  
org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMapSharedInformer
 [] - Stopped to watch for 
job-name/job-name--jobmanager-leader, watching 
id:c310c788-946a-4afd-8aeb-debd99a9045d
2022-11-25T07:50:47.351610077Z INFO  
org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunner [] - 
DefaultDispatcherRunner was revoked the leadership with leader id 
dc208675-e994-4697-b94c-542fd52e2046. Stopping the DispatcherLeaderProcess.
2022-11-25T07:50:47.352119375Z INFO  
org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess [] - 
Stopping SessionDispatcherLeaderProcess.
2022-11-25T07:50:47.352310045Z INFO  
org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Stopping 
dispatcher akka.tcp://flink@10.156.130.89:6123/user/rpc/dispatcher_0.
2022-11-25T07:50:47.352349213Z INFO  
org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Stopping all 
currently running jobs of dispatcher 
akka.tcp://flink@10.156.130.89:6123/user/rpc/dispatcher_0.
2022-11-25T07:50:47.353150035Z INFO  
org.apache.flink.runtime.jobmaster.JobMaster [] - Stopping the 
JobMaster for job 'job-name' ().
2022-11-25T07:50:47.363572384Z INFO  
org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Job 
 reached terminal state SUSPENDED.
2022-11-25T07:50:47.364190310Z INFO  
org.apache.flink.runtime.jobmanager.DefaultJobGraphStore [] - Released job 
graph  from 
KubernetesStateHandleStore{configMapName='job-name-dispatcher-leader'}.
2022-11-25T07:50:47.366146897Z INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Job job-name 
() switched from state RUNNING to SUSPENDED.
2022-11-25T07:50:47.366172044Z org.apache.flink.util.FlinkException: Scheduler 
is being stopped.
2022-11-25T07:50:47.366178811Z  at 
org.apache.flink.runtime.scheduler.SchedulerBase.closeAsync(SchedulerBase.java:600)
 ~[flink-dist_2.12-1.14.4.jar:1.14.4]
2022-11-25T07:50:47.366184972Z  at 
org.apache.flink.runtime.jobmaster.JobMaster.stopScheduling(JobMaster.java:972) 
~[flink-dist_2.12-1.14.4.jar:1.14.4]
2022-11-25T07:50:47.366190500Z  at 
org.apache.flink.runtime.jobmaster.JobMaster.stopJobExecution(JobMaster.java:935)
 ~[flink-dist_2.12-1.14.4.jar:1.14.4]
2022-11-25T07:50:47.366196302Z  at 
org.apache.flink.runtime.jobmaster.JobMaster.onStop(JobMaster.java:407) 
~[flink-dist_2.12-1.14.4.jar:1.14.4]
2022-11-25T07:50:47.366201692Z  at 
org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStop(RpcEndpoint.java:214)
 ~[flink-dist_2.12-1.14.4.jar:1.14.4]
2022-11-25T07:50:47.366207681Z  at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState.lambda$terminate$0(AkkaRpcActor.java:580)
 ~[flink-rpc-akka_de9aa37f-fc7d-4780-8d43-5715ee860795.jar:1.14.4]
2022-11-25T07:50:47.366213755Z  at 
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
 ~[flink-rpc-akka_de9aa37f-fc7d-4780-8d43-5715ee860795.jar:1.14.4]
2022-11-25T07:50:47.366220099Z  at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState.terminate(AkkaRpcActor.java:579)
 ~[flink-rpc-akka_de9aa37f-fc7d-4780-8d43-5715ee860795.jar:1.14.4]
2022-11-25T07:50:47.366225868Z  at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:191)
 ~[flink-rpc-akka_de9aa37f-fc7d-4780-8d43-5715ee860795.jar:1.14.4]
2022-11-25T07:50:47.366231897Z  at 
akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) 
[flink-rpc-akka_de9aa37f-fc7d-4780-8d43-5715ee860795.jar:1.14.4]
2022-11-25T07:50:47.366237418Z  at 
akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) 
[flink-rpc-akka_de9aa37f-fc7d-4780-8d43-5715ee860795.jar:1.14.4]

Re: How can I deploy a flink cluster with 4 TaskManagers?

2022-11-25 Thread Evgeniy Lyutikov
Hello
TaskManager count = ceil(job parallelism / taskmanager.numberOfTaskSlots)
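As a sketch of how that arithmetic plays out in a FlinkDeployment running in application mode (values below are illustrative): a parallelism of 8 with 2 slots per TaskManager yields 4 TaskManagers.

```yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-application-example   # illustrative
spec:
  image: flink:1.15
  flinkVersion: v1_15
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
    parallelism: 8   # ceil(8 / 2) = 4 TaskManagers
```

Note this applies to application mode; in a session deployment, TaskManagers are created on demand as jobs are submitted.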



From: Mark Lee
Sent: 25 November 2022, 18:30:31
To: user@flink.apache.org
Subject: How can I deploy a flink cluster with 4 TaskManagers?

Hi all,
How can I deploy a flink cluster with 1 Job Manager and 4 Task Managers using 
FlinkDeployment CR?

Such sample in Flink Operator can only create 1 Task Manager.
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-session-deployment-only-example
spec:
  image: flink:1.15
  flinkVersion: v1_15
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1

Thank you.


"This message contains confidential information/commercial secret. If you are 
not the intended addressee of this message you may not copy, save, print or 
forward it to any third party and you are kindly requested to destroy this 
message and notify the sender thereof by email.
Данное сообщение содержит конфиденциальную информацию/информацию, являющуюся 
коммерческой тайной. Если Вы не являетесь надлежащим адресатом данного 
сообщения, Вы не вправе копировать, сохранять, печатать или пересылать его 
каким либо иным лицам. Просьба уничтожить данное сообщение и уведомить об этом 
отправителя электронным письмом."


How can I deploy a flink cluster with 4 TaskManagers?

2022-11-25 Thread Mark Lee
Hi all,

How can I deploy a flink cluster with 1 Job Manager and 4 Task Managers
using FlinkDeployment CR?

 

Such sample in Flink Operator can only create 1 Task Manager.

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-session-deployment-only-example
spec:
  image: flink:1.15
  flinkVersion: v1_15
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1

 

Thank you.



Re: Support for oracle, sqlserver, db2 jdbc catalogs

2022-11-25 Thread melin li
https://github.com/melin/flink-cdc-catalog is intended to fill out the jdbc
catalog and cdc catalog support.

melin li wrote on Thu, Nov 24, 2022 at 19:08:

> The flink jdbc catalog only supports MySQL and PostgreSQL. Are there plans to
> support the Oracle, SQL Server and DB2 databases?
>


Re: Safe way to clear old checkpoint data

2022-11-25 Thread Evgeniy Lyutikov
Thanks for the answer.
We can't update Flink to version 1.15 yet.
Regarding restoring from a checkpoint: in theory, are the sst files referenced in
_metadata (or somewhere else?) all that is needed? Can I just delete the files
that are not referenced in _metadata?



From: Martijn Visser
Sent: 25 November 2022, 16:15:45
To: Evgeniy Lyutikov
Cc: user
Subject: Re: Safe way to clear old checkpoint data

Hi,

I would recommend upgrading to Flink 1.15, given that the changes made in
1.15 make ownership more understandable. See
https://flink.apache.org/2022/05/06/restore-modes.html

Best regards,

Martijn

On Fri, Nov 25, 2022 at 9:33 AM Evgeniy Lyutikov wrote:

Hello
We use Flink 1.14.4 with the kubernetes operator (version 1.2.0); all checkpoint
data is stored in an s3 bucket.

If you parse the _metadata file of a checkpoint, it contains links to objects in
the shared directory, and their number is much smaller than the total number of
objects in the directory.

For example, the number of links in the _metadata file is 24000, and the total
number of objects in the shared directory is about 10. What is the safest way
to delete unused files and free up space?




Re: Safe way to clear old checkpoint data

2022-11-25 Thread Martijn Visser
Hi,

I would recommend upgrading to Flink 1.15, given that the changes made
in 1.15 make ownership more understandable. See
https://flink.apache.org/2022/05/06/restore-modes.html

Best regards,

Martijn

On Fri, Nov 25, 2022 at 9:33 AM Evgeniy Lyutikov 
wrote:

> Hello
> We use Flink 1.14.4 with the kubernetes operator (version 1.2.0); all
> checkpoint data is stored in an s3 bucket.
>
> If you parse the _metadata file of a checkpoint, it contains links to objects
> in the shared directory, and their number is much smaller than the total
> number of objects in the directory.
>
> For example, the number of links in the _metadata file is 24000, and the
> total number of objects in the shared directory is about 10. What is the
> safest way to delete unused files and free up space?
>
>


flink-kafka-connector cannot find the topic-partition when consuming

2022-11-25 Thread 朱文忠
The kafka connector has this option enabled: 'properties.allow.auto.create.topics' = 'true',
which is also mentioned in the documentation. However, when FlinkKafkaConsumer starts
consuming a new topic, it still fails with a topic-not-found error. Can anyone explain why?
The error is as follows: [error screenshot not included in the digest]
This is my configuration: [configuration screenshot not included in the digest]
The kafka broker also has automatic topic creation enabled.


Safe way to clear old checkpoint data

2022-11-25 Thread Evgeniy Lyutikov
Hello
We use Flink 1.14.4 with the kubernetes operator (version 1.2.0); all checkpoint
data is stored in an s3 bucket.

If you parse the _metadata file of a checkpoint, it contains links to objects in
the shared directory, and their number is much smaller than the total number of
objects in the directory.

For example, the number of links in the _metadata file is 24000, and the total
number of objects in the shared directory is about 10. What is the safest way
to delete unused files and free up space?




Re: Support for higher-than-millisecond resolution event-time timestamps

2022-11-25 Thread David Anderson
When it comes to event time processing and watermarks, I believe that if
you stick to the lower level APIs, then the milliseconds assumption is
indeed arbitrary, but at higher levels that assumption is baked in.

In other words, that rules out using Flink SQL, or things
like TumblingEventTimeWindows.of(Time.milliseconds(10)). It might not be
difficult to build something to work around those assumptions, but I
haven't given it much thought. But if you stick to KeyedProcessFunction, it
should be fine.

Best,
David
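A quick sanity check of the "fits into the long type" point from the quoted question, assuming a signed 64-bit value were reinterpreted as nanoseconds since the epoch (pure arithmetic; Flink itself does not support nanosecond timestamps):

```java
import java.time.Instant;
import java.time.ZoneOffset;

public class NanoRangeCheck {
    public static void main(String[] args) {
        // Largest timestamp representable as a signed 64-bit nanosecond count.
        long maxSeconds = Long.MAX_VALUE / 1_000_000_000L;
        int year = Instant.ofEpochSecond(maxSeconds).atZone(ZoneOffset.UTC).getYear();
        System.out.println(year); // prints 2262
    }
}
```

So the range only runs out around the year 2262; the practical obstacle is the milliseconds assumption baked into the higher-level APIs, not the width of the long.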

On Fri, Nov 25, 2022 at 5:32 AM Salva Alcántara 
wrote:

> As mentioned in the docs:
>
> > Attention: Both timestamps and watermarks are specified as milliseconds
> since the Java epoch of 1970-01-01T00:00:00Z.
>
> Are there any plans for supporting higher time resolutions?
>
> Also, internally, Flink uses the `long` type for the timestamps, so maybe
> the milliseconds assumption is arbitrary and things would actually work
> just fine for higher resolutions provided that they fit into the long type
> (???). I found this SO post:
>
>
> https://stackoverflow.com/questions/54402759/streaming-data-processing-and-nano-second-time-resolution
>
> which touches upon this but it's a bit old already and there seems to be
> no clear answer in the end. So maybe we could touch base on it.
>
> Regards,
>
> Salva
>


Re: Support for higher-than-millisecond resolution event-time timestamps

2022-11-25 Thread Martijn Visser
Hi Salva,

I'm unaware of any plans to support those. Contributions are always welcome
of course :)

Best regards,

Martijn

On Fri, Nov 25, 2022 at 5:32 AM Salva Alcántara 
wrote:

> As mentioned in the docs:
>
> > Attention: Both timestamps and watermarks are specified as milliseconds
> since the Java epoch of 1970-01-01T00:00:00Z.
>
> Are there any plans for supporting higher time resolutions?
>
> Also, internally, Flink uses the `long` type for the timestamps, so maybe
> the milliseconds assumption is arbitrary and things would actually work
> just fine for higher resolutions provided that they fit into the long type
> (???). I found this SO post:
>
>
> https://stackoverflow.com/questions/54402759/streaming-data-processing-and-nano-second-time-resolution
>
> which touches upon this but it's a bit old already and there seems to be
> no clear answer in the end. So maybe we could touch base on it.
>
> Regards,
>
> Salva
>


Re: Pyflink submission

2022-11-25 Thread Xingbo Huang
Hi,

Judging from the error message, importing pyflink fails when ./python3.6.8.zip/bin/python3
is executed. Check locally whether PyFlink was actually installed into that virtual environment.

Best,
Xingbo

程龙 <13162790...@163.com> wrote on Fri, Nov 25, 2022 at 16:02:

> When submitting a pyflink job (deployment mode: on YARN):
> 1. Without operators such as map, the following command submits and runs successfully:
> .flink run  -ynm pytest   -m yarn-cluster  -pyclientexec ***/python3
>  -pyexec ***/python3  -pyarch *** /python3.6.8.zip   -py  demo.py
>
>
> 2. With a map operator, submission succeeds, but the job fails at runtime with the log below:
> .flink run  -ynm pytest   -m yarn-cluster  -pyclientexec ***/python3
>  -pyexec ***/python3  -pyarch *** /python3.6.8.zip   -py  demo.py
> Error output:
> Caused by: java.io.IOException: Failed to execute the command:
> ./python3.6.8.zip/bin/python3 -c import pyflink;import
> os;print(os.path.join(os.path.abspath(os.path.dirname(pyflink.__file__)),
> 'bin'))
> output: Traceback (most recent call last):
>   File "", line 1, in 
> ModuleNotFoundError: No module named 'pyflink'
> at
> org.apache.flink.python.util.PythonEnvironmentManagerUtils.execute(PythonEnvironmentManagerUtils.java:211)
> at
> org.apache.flink.python.util.PythonEnvironmentManagerUtils.getPythonUdfRunnerScript(PythonEnvironmentManagerUtils.java:154)
> at
> org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager.createEnvironment(ProcessPythonEnvironmentManager.java:156)
> at
> org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.createPythonExecutionEnvironment(BeamPythonFunctionRunner.java:398)
> at
> org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.lambda$open$0(BeamPythonFunctionRunner.java:246)
> at
> org.apache.flink.runtime.memory.MemoryManager.lambda$getSharedMemoryResourceForManagedMemory$5(MemoryManager.java:539)
>
>
> How should the environment be set up so that this runs correctly?


Re: Pyflink submission

2022-11-25 Thread Dian Fu
PyFlink is not installed in the Python environment on the cluster side (the ***/python3 environment).



On Fri, Nov 25, 2022 at 4:02 PM 程龙 <13162790...@163.com> wrote:

> When submitting a pyflink job (deployment mode: on YARN):
> 1. Without operators such as map, the following command submits and runs successfully:
> .flink run  -ynm pytest   -m yarn-cluster  -pyclientexec ***/python3
>  -pyexec ***/python3  -pyarch *** /python3.6.8.zip   -py  demo.py
>
>
> 2. With a map operator, submission succeeds, but the job fails at runtime with the log below:
> .flink run  -ynm pytest   -m yarn-cluster  -pyclientexec ***/python3
>  -pyexec ***/python3  -pyarch *** /python3.6.8.zip   -py  demo.py
> Error output:
> Caused by: java.io.IOException: Failed to execute the command:
> ./python3.6.8.zip/bin/python3 -c import pyflink;import
> os;print(os.path.join(os.path.abspath(os.path.dirname(pyflink.__file__)),
> 'bin'))
> output: Traceback (most recent call last):
>   File "", line 1, in 
> ModuleNotFoundError: No module named 'pyflink'
> at
> org.apache.flink.python.util.PythonEnvironmentManagerUtils.execute(PythonEnvironmentManagerUtils.java:211)
> at
> org.apache.flink.python.util.PythonEnvironmentManagerUtils.getPythonUdfRunnerScript(PythonEnvironmentManagerUtils.java:154)
> at
> org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager.createEnvironment(ProcessPythonEnvironmentManager.java:156)
> at
> org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.createPythonExecutionEnvironment(BeamPythonFunctionRunner.java:398)
> at
> org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.lambda$open$0(BeamPythonFunctionRunner.java:246)
> at
> org.apache.flink.runtime.memory.MemoryManager.lambda$getSharedMemoryResourceForManagedMemory$5(MemoryManager.java:539)
>
>
> How should the environment be set up so that this runs correctly?


flink 1.15.3 has been released, but the 1.15.3 artifacts are not yet available in the Maven repository

2022-11-25 Thread highfei2011


Pyflink submission

2022-11-25 Thread 程龙
When submitting a pyflink job (deployment mode: on YARN):
1. Without operators such as map, the following command submits and runs successfully:
.flink run  -ynm pytest   -m yarn-cluster  -pyclientexec ***/python3   -pyexec
***/python3  -pyarch *** /python3.6.8.zip   -py  demo.py


2. With a map operator, submission succeeds, but the job fails at runtime with the log below:
.flink run  -ynm pytest   -m yarn-cluster  -pyclientexec ***/python3   -pyexec
***/python3  -pyarch *** /python3.6.8.zip   -py  demo.py
Error output:
Caused by: java.io.IOException: Failed to execute the command: 
./python3.6.8.zip/bin/python3 -c import pyflink;import 
os;print(os.path.join(os.path.abspath(os.path.dirname(pyflink.__file__)), 
'bin'))
output: Traceback (most recent call last):
  File "", line 1, in 
ModuleNotFoundError: No module named 'pyflink'
at 
org.apache.flink.python.util.PythonEnvironmentManagerUtils.execute(PythonEnvironmentManagerUtils.java:211)
at 
org.apache.flink.python.util.PythonEnvironmentManagerUtils.getPythonUdfRunnerScript(PythonEnvironmentManagerUtils.java:154)
at 
org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager.createEnvironment(ProcessPythonEnvironmentManager.java:156)
at 
org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.createPythonExecutionEnvironment(BeamPythonFunctionRunner.java:398)
at 
org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.lambda$open$0(BeamPythonFunctionRunner.java:246)
at 
org.apache.flink.runtime.memory.MemoryManager.lambda$getSharedMemoryResourceForManagedMemory$5(MemoryManager.java:539)


How should the environment be set up so that this runs correctly?