Re: Flink 1.11.2 starts throwing "ResourceManager leader changed to new address null" after running for a while

2021-03-02, posted by yidan zhao
Noting this. I have hit this issue several times as well. I saw a reply from Xintong Song explaining that it can be caused by network problems, ZooKeeper availability issues, and the like, but it usually recovers automatically.
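If the leader loss really is driven by transient network or ZooKeeper hiccups, one mitigation sometimes suggested is to loosen the ZooKeeper client timeouts in flink-conf.yaml. A minimal sketch, assuming a ZooKeeper-based HA setup; the values are illustrative, not recommendations:

```
# flink-conf.yaml
high-availability.zookeeper.client.session-timeout: 120000     # ms, default 60000
high-availability.zookeeper.client.connection-timeout: 30000   # ms, default 15000
```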

史 正超 wrote on Monday, December 7, 2020 at 10:13 PM:

> 8 slots, parallelism of 8, the JM has 2 GB and the TM is configured with 8 GB; the other job settings are
> ```
> SET 'execution.checkpointing.interval' = '5min';
> SET 'execution.checkpointing.min-pause' = '10s';
> SET 'min.idle.state.retention.time' = '1d';
> SET 'max.idle.state.retention.time' = '25h';
> SET 'checkpoint.with.rocksdb' = 'true';
> set 'table.exec.mini-batch.enabled' = 'true';
> set 'table.exec.mini-batch.allow-latency' = '5s';
> set 'table.exec.mini-batch.size' = '5000';
>
> ```
>
> 2020-12-07 19:35:01
> org.apache.flink.util.FlinkException: ResourceManager leader changed to
> new address null
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.notifyOfNewResourceManagerLeader(TaskExecutor.java:1093)
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.access$800(TaskExecutor.java:173)
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor$ResourceManagerLeaderListener.lambda$notifyLeaderAddress$0(TaskExecutor.java:1816)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at
> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>
>


After Flink starts, one slot on one TM does not work; it looks like there is no communication at all

2021-03-02, posted by yidan zhao
As the subject says. Logs:
2021-03-03 11:03:17,151 WARN org.apache.flink.runtime.util.HadoopUtils [] -
Could not find Hadoop configuration via any of the supported methods (Flink
configuration, environment variables).

2021-03-03 11:03:17,344 WARN org.apache.hadoop.util.NativeCodeLoader [] -
Unable to load native-hadoop library for your platform... using
builtin-java classes where applicable

2021-03-03 11:03:17,441 WARN org.apache.flink.runtime.util.HadoopUtils [] -
Could not find Hadoop configuration via any of the supported methods (Flink
configuration, environment variables).

2021-03-03 11:03:18,226 WARN
org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn [] -
SASL configuration failed: javax.security.auth.login.LoginException: No
JAAS configuration section named 'Client' was found in specified JAAS
configuration file:
'/home/work/antibotFlink/flink-1.12.0/tmp/jaas-1092430908919603833.conf'.
Will continue connection to Zookeeper server without SASL authentication,
if Zookeeper server allows it.

2021-03-03 11:03:18,227 ERROR
org.apache.flink.shaded.curator4.org.apache.curator.ConnectionState [] -
Authentication failed

2021-03-03 11:03:18,957 ERROR akka.remote.transport.netty.NettyTransport []
- failed to bind to /0.0.0.0:2027, shutting down Netty transport
2021-03-03 11:03:18,973 ERROR akka.remote.Remoting [] - Remoting system has
been terminated abrubtly. Attempting to shut down transports

As shown above, there is an error about a port; does anyone know the cause?
The direct symptom and impact: one of my source tasks produces no output at all (no records of any kind, and bytes
sent stays 0). Downstream operators therefore get no watermark, and backpressure stays permanently at 1, which leads to a situation I had already found strange: backpressured to the point of doing no work, with the CPU no longer utilized at all.
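The "failed to bind to /0.0.0.0:2027" error usually means something on the host already holds that port. A hedged way to check, and to let TaskManagers pick from a port range so colocated processes stop colliding (assumes Linux with ss available; the range is illustrative):

```
# See what currently owns the port (Linux, ss from iproute2)
ss -ltnp | grep 2027

# In flink-conf.yaml, a port range lets each TM pick a free port instead of a fixed one:
# taskmanager.rpc.port: 50100-50200
```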


How can the directory of the client log written by flink run be changed?

2021-03-02, posted by ??????
Hi all.
When a job is submitted with `flink run`, the Flink client writes its log under $FLINK_HOME/log, and I would like to change where that client log is written.

1. Setting env.log.dir in flink-conf.yaml, or passing it with -yD or -D,
doesn't seem to work.
2. flink version: 1.10


Thanks.
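For what it is worth, the client-side log path is resolved by bin/config.sh before any -D dynamic properties are parsed, so one hedged workaround on 1.10 is to export FLINK_LOG_DIR in the shell (the path below is a hypothetical example):

```
# config.sh honors a pre-set FLINK_LOG_DIR, and the client log lands there
export FLINK_LOG_DIR=/var/log/flink-client
./bin/flink run ./examples/streaming/WordCount.jar
```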

Flink Application mode on native Kubernetes: occasional errors in the logs when using OSS as the backend

2021-03-02, posted by 王 羽凡
Version: Flink 1.12.0
Environment: Native Kubernetes
Mode: Application Mode

Description:
When Flink runs on k8s in Native Kubernetes Application mode with the filesystem state backend on OSS, the requests to OSS occasionally log errors.
With `source.setStartFromEarliest();`, the job starts consuming from the beginning and runs fine, but once it catches up to the latest offset the errors below appear; they go away after a while or after restarting the job.
With `source.setStartFromLatest();`, the job starts consuming from the latest offset right away and the errors never appear.
Is there something wrong with my configuration or usage?

Command:

./bin/flink run-application \
  --target kubernetes-application \
  -Dkubernetes.cluster-id=demo \
  -Dkubernetes.container.image=xx/xx/xx:2.0.16 \
  -Dstate.backend=filesystem \
  -Dstate.checkpoints.dir=oss://bucket/folder \
  -Dfs.oss.endpoint=oss-cn-beijing-internal.aliyuncs.com \
  -Dfs.oss.accessKeyId=xx \
  -Dfs.oss.accessKeySecret=xx \
  local:///opt/flink/usrlib/my-flink-job.jar

Error log:

2021-03-03 02:53:46,133 INFO
org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader [] -
Committing offset 12701:1:-1:4 to topic
TopicRange{topic=persistent://public/xx/,
key-range=SerializableRange{range=[0, 65535]}}
2021-03-03 02:53:46,140 INFO
org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader [] -
Successfully committed offset 12701:1:-1:4 to topic
TopicRange{topic=persistent://public/xx/,
key-range=SerializableRange{range=[0, 65535]}}
2021-03-03 02:53:50,899 INFO
org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss  [] -
[Server]Unable to execute HTTP request: Not Found
[ErrorCode]: NoSuchKey
[RequestId]: xx
[HostId]: null
2021-03-03 02:53:50,904 INFO
org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss  [] -
[Server]Unable to execute HTTP request: Not Found
[ErrorCode]: NoSuchKey
[RequestId]: xx
[HostId]: null

Normal TaskManager log after killing the process so the pod restarts, or after some time passes:

2021-03-03 03:18:21,602 INFO
org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader [] -
Successfully committed offset 12716:7:-1:1 to topic
TopicRange{topic=persistent://public/xx/,
key-range=SerializableRange{range=[0, 65535]}}
2021-03-03 03:18:26,573 INFO
org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader [] -
Committing offset 12716:7:-1:1 to topic
TopicRange{topic=persistent://public/xx/,
key-range=SerializableRange{range=[0, 65535]}}
2021-03-03 03:18:26,582 INFO
org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader [] -
Successfully committed offset 12716:7:-1:1 to topic
TopicRange{topic=persistent://public/xx/,
key-range=SerializableRange{range=[0, 65535]}}
2021-03-03 03:18:31,571 INFO
org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader [] -
Committing offset 12716:7:-1:1 to topic
TopicRange{topic=persistent://public/xx/,
key-range=SerializableRange{range=[0, 65535]}}
2021-03-03 03:18:31,580 INFO
org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader [] -
Successfully committed offset 12716:7:-1:1 to topic
TopicRange{topic=persistent://public/xx/,
key-range=SerializableRange{range=[0, 65535]}}
2021-03-03 03:18:36,633 INFO
org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader [] -
Committing offset 12716:7:-1:1 to topic
TopicRange{topic=persistent://public/xx/,
key-range=SerializableRange{range=[0, 65535]}}
2021-03-03 03:18:36,642 INFO
org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader [] -
Successfully committed offset 12716:7:-1:1 to topic
TopicRange{topic=persistent://public/xx/,
key-range=SerializableRange{range=[0, 65535]}}

Files in the OSS bucket: (inline screenshot omitted)
Contents of the chk-10880 directory: (inline screenshot omitted)
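If the NoSuchKey entries turn out to be benign existence probes from the OSS Hadoop filesystem (they are logged at INFO, not ERROR), one hedged way to quiet them is a log4j override for the shaded client. The logger name below is taken from the log lines above, and the syntax assumes Flink 1.12's default log4j2 properties format:

```
# conf/log4j.properties
logger.oss.name = org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss
logger.oss.level = WARN
```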


Re: Repeated UDF invocation

2021-03-02, posted by Benchao Li
I could not find a related issue, so I created one [1].
This optimization has a fairly broad impact and needs careful design and trade-offs, so I cannot
promise how quickly it will move forward in the community; anyone interested can discuss it on the issue.

[1] https://issues.apache.org/jira/browse/FLINK-21573

Qishang wrote on Wednesday, March 3, 2021 at 11:03 AM:

> Hi Benchao.
>
> Our current scenario is a UDF that queries external storage. The data volume is small, but the repeated executions still run serially inside one operator, so the operator's latency becomes a multiple of the call count. That impact is fairly serious.
> Is this feature on the community roadmap?
>
>
> Benchao Li wrote on Wednesday, March 3, 2021 at 10:23 AM:
>
> > There is indeed no common-subexpression reuse yet, so expressions end up being executed repeatedly.
> > This is something to optimize in the future; we have just built this feature internally.
> >
> > The lack of reuse is not just "executed as many times as it appears in the SQL": during
> > planning the expressions are fully expanded. Take the following SQL:
> > ```SQL
> > SELECT my_map['key1'] as key1, my_map['key2'] as key2, my_map['key3'] as
> > key3
> > FROM (
> >   SELECT dump_json_to_map(col1) as my_map
> >   FROM T
> > )
> > ```
> > Written this way, the `dump_json_to_map` function is also executed 3 times.
> >
> > HunterXHunter <1356469...@qq.com> wrote on Wednesday, March 3, 2021 at 9:43 AM:
> >
> > > Why would four times be fine? Executing it only once seems optimal.
> > >
> > >
> > >
> > > --
> > > Sent from: http://apache-flink.147419.n8.nabble.com/
> >
> >
> >
> > --
> >
> > Best,
> > Benchao Li
> >
>


-- 

Best,
Benchao Li


Re: Re: flink 1.12.2-rc2 hijacked for crypto mining

2021-03-02, posted by Yang Wang
The root cause is that you exposed port 8081 to the public internet, and Flink's rest endpoint has no authentication by default,
so anyone can submit an arbitrary jar and run it.

You are presumably using session mode; application mode disables web job submission by default.
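A hedged hardening sketch along those lines, using standard flink-conf.yaml keys (the values are illustrative; firewalling 8081 at the network layer is still the primary fix):

```
# flink-conf.yaml
rest.bind-address: 127.0.0.1   # keep the rest endpoint off public interfaces
web.submit.enable: false       # disable jar upload/run through the web UI
```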


Best,
Yang

Michael Ran wrote on Wednesday, March 3, 2021 at 11:03 AM:

> At the network level this should not be directly reachable from the public internet; did you open something up?
> On 2021-03-02 13:04:41, "macdoor" wrote:
> >I am not a security expert and do not know how to confirm it is a Flink problem, but the symptoms look very
> >similar to what Flink 1.10 ran into before. It would help if you could add test cases for this and publish the results.
> >
> >
> >
> >--
> >Sent from: http://apache-flink.147419.n8.nabble.com/
>



Re: Re: Building flink-runtime-web for Flink 1.11 fails

2021-03-02, posted by Michael Ran

Build flink-test-utils-junit separately first, and then build whatever else turns out to be missing; a sketch follows below.

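A hedged sketch of that approach (module paths as they appear in the Flink 1.11 source tree; -am also builds the upstream modules the test utils need):

```
# From the root of the flink checkout, release-1.11 branch
mvn clean install -DskipTests \
  -pl flink-test-utils-parent/flink-test-utils-junit,flink-test-utils-parent/flink-test-utils -am

# Then retry the web module on its own
cd flink-runtime-web && mvn clean install -DskipTests
```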
On 2021-03-03 10:57:27, "Natasha" wrote:
>hi Michael,
>After pulling the flink 1.11 release branch, I can see that the version in flink-runtime-web is indeed 1.11-SNAPSHOT.
>
>
>
>
>
>------ Original message ------
>From: "user-zh"
>Date: Wednesday, March 3, 2021, 10:50 AM
>To: "user-zh"
>Subject: Re: Building flink-runtime-web for Flink 1.11 fails
>
>
>
>Why does it still depend on a -SNAPSHOT jar? Isn't it a release version?
>
>
>
>On 2021-03-03 10:34:23, "Natasha" wrote:
>hi, all
>I am building Flink 1.11, and since the build fails at flink-runtime-web every time, I cd into flink-runtime-web to build it alone, and find that
>Cannot resolve org.apache.flink:flink-test-utils-junit:1.11-SNAPSHOT,
>Cannot resolve org.apache.flink:flink-test-utils_2.11:1.11-SNAPSHOT
>the dependencies never download. Is there a good way to fix this?
> Thank you for taking the time to read my email!


Re: Repeated UDF invocation

2021-03-02, posted by Qishang
Hi Benchao.

Our current scenario is a UDF that queries external storage. The data volume is small, but the repeated executions still run serially inside one operator, so the operator's latency becomes a multiple of the call count. That impact is fairly serious.
Is this feature on the community roadmap?


Benchao Li wrote on Wednesday, March 3, 2021 at 10:23 AM:

> There is indeed no common-subexpression reuse yet, so expressions end up being executed repeatedly.
> This is something to optimize in the future; we have just built this feature internally.
>
> The lack of reuse is not just "executed as many times as it appears in the SQL": during
> planning the expressions are fully expanded. Take the following SQL:
> ```SQL
> SELECT my_map['key1'] as key1, my_map['key2'] as key2, my_map['key3'] as
> key3
> FROM (
>   SELECT dump_json_to_map(col1) as my_map
>   FROM T
> )
> ```
> Written this way, the `dump_json_to_map` function is also executed 3 times.
>
> HunterXHunter <1356469...@qq.com> wrote on Wednesday, March 3, 2021 at 9:43 AM:
>
> > Why would four times be fine? Executing it only once seems optimal.
> >
> >
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/
>
>
>
> --
>
> Best,
> Benchao Li
>


Re: Re: flink 1.12.2-rc2 hijacked for crypto mining

2021-03-02, posted by Michael Ran
At the network level this should not be directly reachable from the public internet; did you open something up?
On 2021-03-02 13:04:41, "macdoor" wrote:
>I am not a security expert and do not know how to confirm it is a Flink problem, but the symptoms look very
>similar to what Flink 1.10 ran into before. It would help if you could add test cases for this and publish the results.
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Re: Building flink-runtime-web for Flink 1.11 fails

2021-03-02, posted by Natasha
hi Michael,
After pulling the flink 1.11 release branch, I can see that the version in flink-runtime-web is indeed 1.11-SNAPSHOT.




------ Original message ------
From: "user-zh"

Re: Exception with no clear pattern: after stopping a job, resubmitting it causes the JobManager process to fail

2021-03-02, posted by yidan zhao
This issue is probably also related to another one I reported earlier. In practice it shows another pattern:
after a job is submitted, it stays in the INITIALIZING phase for quite a while, and the web UI becomes laggy, spinning and unable to show job status. After a while it recovers (during this period some JM process fails and is restarted automatically by our scripts). That is one manifestation. In another, after a long INITIALIZING phase and recovery, the web UI shows several identical jobs, all in INITIALIZING; they gradually drop back to one, and in the end a single job ends up successfully in RUNNING state.

yidan zhao wrote on Wednesday, March 3, 2021 at 10:52 AM:

> As the subject says: standalone, 1.12.
> It does not feel like a problem with stopping or starting the job itself; it looks more like these two operations put the JM under heavy load. The error is:
>
> 2021-03-03 10:14:51,298 ERROR
> org.apache.flink.runtime.util.FatalExitExceptionHandler [] - FATAL:
> Thread 'cluster-io-thread-3' produced an uncaught exception. Stopping the
> process...
>
> java.util.concurrent.RejectedExecutionException: Task
> java.util.concurrent.ScheduledThreadPoolExecutor
> $ScheduledFutureTask@422cfccb rejected from
> java.util.concurrent.ScheduledThreadPoolExecutor@6709b3f5[Terminated,
> pool size = 0, active threads = 0, queued tasks = 0, completed tasks =
> 2304]
> at
> java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
> ~[?:1.8.0_251]
> at
> java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
> ~[?:1.8.0_251]
> at
> java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:326)
> ~[?:1.8.0_251]
> at
> java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:533)
> ~[?:1.8.0_251]
> at
> java.util.concurrent.ScheduledThreadPoolExecutor.execute(ScheduledThreadPoolExecutor.java:622)
> ~[?:1.8.0_251]
> at
> java.util.concurrent.Executors$DelegatedExecutorService.execute(Executors.java:668)
> ~[?:1.8.0_251]
> at
> org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter.execute(ScheduledExecutorServiceAdapter.java:62)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.scheduleTriggerRequest(CheckpointCoordinator.java:1152)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at
> org.apache.flink.runtime.checkpoint.CheckpointsCleaner.lambda$cleanCheckpoint$0(CheckpointsCleaner.java:58)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> ~[?:1.8.0_251]
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> ~[?:1.8.0_251]
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_251]
>
>
>



Exception with no clear pattern: after stopping a job, resubmitting it causes the JobManager process to fail

2021-03-02, posted by yidan zhao
As the subject says: standalone, 1.12.
It does not feel like a problem with stopping or starting the job itself; it looks more like these two operations put the JM under heavy load. The error is:

2021-03-03 10:14:51,298 ERROR
org.apache.flink.runtime.util.FatalExitExceptionHandler [] - FATAL: Thread
'cluster-io-thread-3' produced an uncaught exception. Stopping the
process...

java.util.concurrent.RejectedExecutionException: Task
java.util.concurrent.ScheduledThreadPoolExecutor
$ScheduledFutureTask@422cfccb rejected from
java.util.concurrent.ScheduledThreadPoolExecutor@6709b3f5[Terminated, pool
size = 0, active threads = 0, queued tasks = 0, completed tasks = 2304]
at
java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
~[?:1.8.0_251]
at
java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
~[?:1.8.0_251]
at
java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:326)
~[?:1.8.0_251]
at
java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:533)
~[?:1.8.0_251]
at
java.util.concurrent.ScheduledThreadPoolExecutor.execute(ScheduledThreadPoolExecutor.java:622)
~[?:1.8.0_251]
at
java.util.concurrent.Executors$DelegatedExecutorService.execute(Executors.java:668)
~[?:1.8.0_251]
at
org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter.execute(ScheduledExecutorServiceAdapter.java:62)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.scheduleTriggerRequest(CheckpointCoordinator.java:1152)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.runtime.checkpoint.CheckpointsCleaner.lambda$cleanCheckpoint$0(CheckpointsCleaner.java:58)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
~[?:1.8.0_251]
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
~[?:1.8.0_251]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_251]


Re: Building flink-runtime-web for Flink 1.11 fails

2021-03-02, posted by Michael Ran
Why does it still depend on a -SNAPSHOT jar? Isn't it a release version?



On 2021-03-03 10:34:23, "Natasha" wrote:

hi, all
I am building Flink 1.11, and since the build fails at flink-runtime-web every time, I cd into
flink-runtime-web to build it alone, and find that
Cannot resolve org.apache.flink:flink-test-utils-junit:1.11-SNAPSHOT,
Cannot resolve org.apache.flink:flink-test-utils_2.11:1.11-SNAPSHOT

Building flink-runtime-web for Flink 1.11 fails

2021-03-02, posted by Natasha
hi, all
I am building Flink 1.11, and since the build fails at flink-runtime-web every time, I cd into flink-runtime-web to build it alone, and find that
Cannot resolve org.apache.flink:flink-test-utils-junit:1.11-SNAPSHOT,
Cannot resolve org.apache.flink:flink-test-utils_2.11:1.11-SNAPSHOT
The dependencies never download. Is there a good way to fix this?
Thank you for taking the time to read my email!

Re: Repeated UDF invocation

2021-03-02, posted by Benchao Li
There is indeed no common-subexpression reuse yet, so expressions end up being executed repeatedly.
This is something to optimize in the future; we have just built this feature internally.

The lack of reuse is not just "executed as many times as it appears in the SQL": during
planning the expressions are fully expanded. Take the following SQL:
```SQL
SELECT my_map['key1'] as key1, my_map['key2'] as key2, my_map['key3'] as
key3
FROM (
  SELECT dump_json_to_map(col1) as my_map
  FROM T
)
```
Written this way, the `dump_json_to_map` function is also executed 3 times.
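Until such reuse lands, one workaround sometimes applied (my own suggestion, not something from this thread, and it carries a trade-off: it also blocks other planner optimizations) is to declare the UDF non-deterministic, since the planner avoids duplicating non-deterministic calls when expanding expressions. A minimal sketch with a hypothetical UDF:

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.table.functions.ScalarFunction;

// Hypothetical UDF, used only for illustration.
public class DumpJsonToMap extends ScalarFunction {

    public Map<String, String> eval(String json) {
        // ... parse the JSON string into a map (details omitted) ...
        return new HashMap<>();
    }

    @Override
    public boolean isDeterministic() {
        // Declaring the function non-deterministic discourages the planner
        // from inlining and duplicating the call during plan expansion.
        return false;
    }
}
```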

HunterXHunter <1356469...@qq.com> wrote on Wednesday, March 3, 2021 at 9:43 AM:

> Why would four times be fine? Executing it only once seems optimal.
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



-- 

Best,
Benchao Li


Re: Repeated UDF invocation

2021-03-02, posted by HunterXHunter
Why would four times be fine? Executing it only once seems optimal.



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Reply: Re: Question about slow Flink checkpoints

2021-03-02, posted by smq
I have run into long checkpoint times before; the cause was high backpressure. You could check whether any backpressure is present.



Sent from my iPhone


------ Original message ------
From: Jacob <17691150...@163.com>
Date: 03/02/2021, 18:02
To: user-zh
http://apache-flink.147419.n8.nabble.com/

Re: Flink 1.12 Application Mode on Alibaba Cloud managed Kubernetes reports errors

2021-03-02, posted by 王 羽凡
Thanks. A LoadBalancer service was indeed created, and the error does come from it.
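For reference, the log4j override Yang Wang describes below might look like this (a sketch; the logger name comes from his message, and the property syntax assumes Flink 1.12's default log4j2 configuration in conf/log4j.properties):

```
logger.minidispatcher.name = org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint
logger.minidispatcher.level = WARN
```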

> On March 1, 2021 at 2:09 PM, Yang Wang wrote:
> 
> The actual cause is that the Alibaba Cloud LoadBalancer's health probe keeps sending RST to Flink's rest endpoint.
> There is a ticket tracking this [1], but it has not been fixed yet.
>
> In the short term you can avoid it by setting the log level of the
> org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint package to WARN in your log4j configuration.
> 
> [1]. https://issues.apache.org/jira/browse/FLINK-18129
> 
> 
> Best,
> Yang
> 
> 王 羽凡 wrote on Monday, March 1, 2021 at 1:01 PM:
> 
>> Running Flink 1.12 in Application Mode on Alibaba Cloud managed Kubernetes
>> (ACK), I see some errors that never appear on our self-hosted Kubernetes cluster.
>> The taskmanager containers do start normally, though, and subsequent jobs run fine. How should this error be handled? Is ACK somehow not compatible?
>>
>> Start command:
>> ./bin/flink run-application \
>>--target kubernetes-application \
>>-Dkubernetes.cluster-id=demo \
>>-Dkubernetes.container.image=xx.xx.xx/xx/xxx:2.0.12 \
>>local:///opt/flink/usrlib/my-flink-job.jar
>> 
>> Log:
>> 2021-03-01 04:52:06,518 INFO
>> org.apache.flink.client.deployment.application.executors.EmbeddedExecutor
>> [] - Job 6eb4027586e7137b20ecc8c3ce624417 is submitted.
>> 2021-03-01 04:52:06,518 INFO
>> org.apache.flink.client.deployment.application.executors.EmbeddedExecutor
>> [] - Submitting Job with JobId=6eb4027586e7137b20ecc8c3ce624417.
>> 2021-03-01 04:52:08,303 INFO
>> org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] - Recovered
>> 0 pods from previous attempts, current attempt id is 1.
>> 2021-03-01 04:52:08,303 INFO
>> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
>> Recovered 0 workers from previous attempt.
>> 2021-03-01 04:52:08,306 INFO
>> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
>> ResourceManager akka.tcp://flink@demo.default:6123/user/rpc/resourcemanager_0
>> was granted leadership with fencing token 
>> 2021-03-01 04:52:08,310 INFO
>> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl [] -
>> Starting the SlotManager.
>> 2021-03-01 04:52:08,596 WARN
>> org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] -
>> Unhandled exception
>> java.io.IOException: Connection reset by peer
>> at sun.nio.ch.FileDispatcherImpl.read0(Native Method) ~[?:1.8.0_275]
>> at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
>> ~[?:1.8.0_275]
>> at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) ~[?:1.8.0_275]
>> at sun.nio.ch.IOUtil.read(IOUtil.java:192) ~[?:1.8.0_275]
>> at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
>> ~[?:1.8.0_275]
>> at
>> org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:253)
>> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>> at
>> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1133)
>> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:350)
>> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:148)
>> [flink-dist_2.12-1.12.0.jar:1.12.0]
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
>> [flink-dist_2.12-1.12.0.jar:1.12.0]
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
>> [flink-dist_2.12-1.12.0.jar:1.12.0]
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
>> [flink-dist_2.12-1.12.0.jar:1.12.0]
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
>> [flink-dist_2.12-1.12.0.jar:1.12.0]
>> at
>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
>> [flink-dist_2.12-1.12.0.jar:1.12.0]
>> at
>> org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>> [flink-dist_2.12-1.12.0.jar:1.12.0]
>> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_275]
>> 2021-03-01 04:52:08,596 WARN
>> org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] -
>> Unhandled exception
>> java.io.IOException: Connection reset by peer
>> at sun.nio.ch.FileDispatcherImpl.read0(Native Method) ~[?:1.8.0_275]
>> at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
>> ~[?:1.8.0_275]
>> at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) ~[?:1.8.0_275]
>> at sun.nio.ch.IOUtil.read(IOUtil.java:192) ~[?:1.8.0_275]
>> at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
>> ~[?:1.8.0_275]
>> at
>> org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:253)
>> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>> at
>> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1133)
>> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>> at
>> 


Re: Repeated UDF invocation

2021-03-02, posted by Qishang
2. My mistake there, it is four times; that part is fine.


Qishang wrote on Tuesday, March 2, 2021 at 6:50 PM:

> Hi community.
> Version: Flink 1.12.1
> I call a custom UDF in Flink SQL. The UDF calls into an SDK, and for a field whose value is reused I found the SDK being invoked repeatedly.
> e.g.
> INSERT INTO table_a
> SELECT
> update_time,
> MD5(p_key) AS id,
> p_key
> FROM
> (
> SELECT
> LOCALTIMESTAMP AS update_time ,
> findkeyudf(p_name) AS p_key
> FROM table_b
> ) T
> WHERE COALESCE(p_key, '')<> ''
> ;
>
> == Physical Execution Plan ==
> Stage 1 : Data Source
> content : Source: TableSourceScan(table=[[default_catalog,
> default_database, table_b]], fields=[p_name, xxx, ...])
>
> Stage 2 : Operator
> content : Calc(select=[CAST(()) AS update_date,
> CAST(MD5(findkeyudf(p_name))) AS comp_name, findkeyudf(p_name) AS p_key],
> where=[(findkeyudf(p_name) IS NOT NULL CASE (CAST(findkeyudf(p_name)) <>
> _UTF-16LE'':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") CASE false)])
> ship_strategy : FORWARD
>
> Stage 3 : Data Sink
> content : Sink: Sink(table=[default_catalog.default_database.table_a],
> fields=[update_date, comp_name, p_key])
> ship_strategy : FORWARD
>
> Looking at the explain output, the UDF is called four times, but the logs show the same key executed 8 times.
>
> A few questions:
> 1. Will the UDF calls not be optimized into a single invocation with the result reused?
> 2. From the explain output it should be four; I do not understand the eight executions. Without the filter (WHERE COALESCE(p_key, '') <> '')
> it executes twice.
> 3. While I am at it, is there a roadmap for an async-join feature for JDBC dimension (lookup) tables?
>
>
>


Repeated UDF invocation

2021-03-02, posted by Qishang
Hi community.
Version: Flink 1.12.1
I call a custom UDF in Flink SQL. The UDF calls into an SDK, and for a field whose value is reused I found the SDK being invoked repeatedly.
e.g.
INSERT INTO table_a
SELECT
update_time,
MD5(p_key) AS id,
p_key
FROM
(
SELECT
LOCALTIMESTAMP AS update_time ,
findkeyudf(p_name) AS p_key
FROM table_b
) T
WHERE COALESCE(p_key, '')<> ''
;

== Physical Execution Plan ==
Stage 1 : Data Source
content : Source: TableSourceScan(table=[[default_catalog,
default_database, table_b]], fields=[p_name, xxx, ...])

Stage 2 : Operator
content : Calc(select=[CAST(()) AS update_date,
CAST(MD5(findkeyudf(p_name))) AS comp_name, findkeyudf(p_name) AS p_key],
where=[(findkeyudf(p_name) IS NOT NULL CASE (CAST(findkeyudf(p_name)) <>
_UTF-16LE'':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") CASE false)])
ship_strategy : FORWARD

Stage 3 : Data Sink
content : Sink: Sink(table=[default_catalog.default_database.table_a],
fields=[update_date, comp_name, p_key])
ship_strategy : FORWARD

Looking at the explain output, the UDF is called four times, but the logs show the same key executed 8 times.

A few questions:
1. Will the UDF calls not be optimized into a single invocation with the result reused?
2. From the explain output it should be four; I do not understand the eight executions. Without the filter (WHERE COALESCE(p_key, '') <> '')
it executes twice.
3. While I am at it, is there a roadmap for an async-join feature for JDBC dimension (lookup) tables?



Re: Re: Question about slow Flink checkpoints

2021-03-02, posted by Jacob
Thanks for the reply.

Having looked at several slow checkpoints, in most cases the time goes into the async phase. If that is the case, it should be the network being slow at those moments, right?
But I still think it has something to do with the disk.



-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/

flink on kubernetes

2021-03-02, posted by Robert.Zhang
Hello, all

I am running Flink on Kubernetes in standalone session mode.
For the standalone setup, is putting the Hadoop uber jar into lib the right way to provide the Hadoop
dependency ...

... hdfs ...

...

Best regards

Re: Re: Question about slow Flink checkpoints

2021-03-02, posted by tison
Hi Jacob,

Can you tell from the logs or metrics whether the checkpoint is slow in the sync phase of the snapshot, in the async phase, in the upload to HDFS,
or bottlenecked in some other stage?

For a state of a few tens of KB, being "slow" most likely means some step hit a fault and got stuck.
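One way to check (a sketch; the placeholders are hypothetical) is Flink's checkpoint statistics REST API, which reports per-subtask sync and async durations as well as alignment time:

```
# Summary of recent checkpoints for a job
curl http://<jobmanager>:8081/jobs/<jobid>/checkpoints

# Per-task breakdown for one checkpoint, including sync/async durations
curl http://<jobmanager>:8081/jobs/<jobid>/checkpoints/details/<checkpointid>
```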

Best,
tison.


yidan zhao wrote on Tuesday, March 2, 2021 at 3:58 PM:

> What puzzles me is that even the slowest disk should not be "slow" for a state of a few tens of KB.
>
> Jacob <17691150...@163.com> wrote on Tuesday, March 2, 2021 at 10:34 AM:
>
> > Thanks for the reply.
> >
> > I am using filesystem,
> > with the following configuration:
> >
> >
> > state.backend: filesystem
> > state.checkpoints.dir: hdfs://nameservice1/datafeed/prd/flink_checkpoint
> > state.savepoints.dir: hdfs://nameservice1/datafeed/prd/flink_checkpoint
> > state.backend.incremental: false
> > state.backend.fs.memory-threshold: 1024
> > state.checkpoints.num-retained: 3
> > restart-strategy: fixed-delay
> > restart-strategy.fixed-delay.attempts: 1000
> > restart-strategy.fixed-delay.delay: 30 s
> >
> >
> >
> > Later I commented out the settings above and set the checkpoint storage to memory in code, but it was still slow.
> >
> >
> >
> > -
> > Thanks!
> > Jacob
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/
> >
>
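For completeness, the in-code setup Jacob mentions might look roughly like this (a hedged sketch against the Flink 1.12 API; the checkpoint interval is illustrative):

```java
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Keep checkpoint state on the JobManager heap instead of a filesystem.
        env.setStateBackend(new MemoryStateBackend());
        // Trigger a checkpoint every 60 seconds.
        env.enableCheckpointing(60_000);
        // ... build and execute the job ...
    }
}
```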