Re: Flink sql从ck恢复,统计数据波动问题

2022-10-10 文章 Congxian Qiu
Hi
可以的话也同步下相关的计算逻辑,从 checkpoint 恢复后的统计结果可能会和计算逻辑有关
Best,
Congxian


Hangxiang Yu  于2022年10月10日周一 14:04写道:

> 是什么值下跌呢?哪个metric吗?
>
> On Mon, Oct 10, 2022 at 1:34 PM 天下五帝东  wrote:
>
> > Hi:
> > 各位大佬们,flink任务从状态恢复时,统计值有一个明显的下跌,等ck打完后,统计值才会上升,这种情况有什么好的解决方案吗?
>
>
>
> --
> Best,
> Hangxiang.
>


Re: flink作业生成保存点失败

2022-09-07 文章 Congxian Qiu
Hi

有 savepoint/checkpoint 失败时的具体 jobmanager log 以及失败 task 对应的 taskmanager log
的话可以发一下,大家帮助看一下

Best,
Congxian


Xuyang  于2022年8月30日周二 23:18写道:

>
> Hi,看起来这个报错是用于输出信息的文件找不到了,可以尝试加一下这个配置再试一下“taskmanager.log.path”,找一下导致tasks超时的根本原因。
> 还可以试一下用火焰图或jstack查看一下那几个tasks超时的时候是卡在哪个方法上。
>
>
>
>
>
>
>
>
>
>
> --
>
> Best!
> Xuyang
>
>
>
>
>
>
> Hi,看起来这个报错是用于输出信息的文件找不到了,可以尝试加一下这个配置再试一下“taskmanager.log.path”,找一下导致tasks超时的根本原因。还可以试一下用火焰图或jstack查看一下那几个tasks超时的时候是卡在哪个方法上。
> 在 2022-08-29 16:19:15,"casel.chen"  写道:
>
> >有一个线上flink作业在人为主动创建保存点时失败,作业有两个算子:从kafka读取数据和写到mongodb,都是48个并行度,出错后查看到写mongodb算子一共48个task,完成了45个,还有3个tasks超时(超时时长设为3分钟),正常情况下完成一次checkpoint要4秒,状态大小只有23.7kb。出错后,查看作业日志如下。在创建保存点失败后作业周期性的检查点生成也都失败了(每个算子各有3个tasks超时)。使用的是FileStateBackend,DFS用的是阿里云oss。请问出错会是因为什么原因造成的?
> >
> >
> >+5
> >[2022-08-29 15:38:32]
> >content:
> >2022-08-29 15:38:32,617 ERROR
> org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerStdoutFileHandler
> [] - Failed to transfer file from TaskExecutor
> sqrc-session-prod-taskmanager-1-30.
> >+6
> >[2022-08-29 15:38:32]
> >content:
> >java.util.concurrent.CompletionException:
> org.apache.flink.util.FlinkException: The file STDOUT does not exist on the
> TaskExecutor.
> >+7
> >[2022-08-29 15:38:32]
> >content:
> >at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$requestFileUploadByFilePath$24(TaskExecutor.java:2064)
> ~[flink-dist_2.12-1.13.2.jar:1.13.2]
> >+8
> >[2022-08-29 15:38:32]
> >content:
> >at
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
> ~[?:1.8.0_312]
> >+9
> >[2022-08-29 15:38:32]
> >content:
> >at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> ~[?:1.8.0_312]
> >+10
> >[2022-08-29 15:38:32]
> >content:
> >at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> ~[?:1.8.0_312]
> >+11
> >[2022-08-29 15:38:32]
> >content:
> >at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_312]
> >+12
> >[2022-08-29 15:38:32]
> >content:
> >Caused by: org.apache.flink.util.FlinkException: The file STDOUT does not
> exist on the TaskExecutor.
> >+13
> >[2022-08-29 15:38:32]
> >content:
> >... 5 more
> >+14
> >[2022-08-29 15:38:32]
> >content:
> >2022-08-29 15:38:32,617 ERROR
> org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerStdoutFileHandler
> [] - Unhandled exception.
> >+15
> >[2022-08-29 15:38:32]
> >content:
> >org.apache.flink.util.FlinkException: The file STDOUT does not exist on
> the TaskExecutor.
> >+16
> >[2022-08-29 15:38:32]
> >content:
> >at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$requestFileUploadByFilePath$24(TaskExecutor.java:2064)
> ~[flink-dist_2.12-1.13.2.jar:1.13.2]
> >+17
> >[2022-08-29 15:38:32]
> >content:
> >at
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
> ~[?:1.8.0_312]
> >+18
> >[2022-08-29 15:38:32]
> >content:
> >at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> ~[?:1.8.0_312]
> >+19
> >[2022-08-29 15:38:32]
> >content:
> >at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> ~[?:1.8.0_312]
> >+20
> >[2022-08-29 15:38:32]
> >content:
> >at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_312]
>


Re: Flink 1.12.0 隔几个小时Checkpoint就会失败

2021-03-21 文章 Congxian Qiu
从日志看 checkpoint 超时了,可以尝试看一下是哪个算子的哪个并发没有做完 checkpoint,可以看看这篇文章[1] 能否帮助你

[1] https://www.infoq.cn/article/g8ylv3i2akmmzgccz8ku
Best,
Congxian


Frost Wong  于2021年3月18日周四 下午12:28写道:

> 哦哦,我看到了有个
>
> setTolerableCheckpointFailureNumber
>
> 之前不知道有这个方法,倒是可以试一下,不过我就是不太理解为什么会失败,也没有任何报错
> 
> 发件人: yidan zhao 
> 发送时间: 2021年3月18日 3:47
> 收件人: user-zh 
> 主题: Re: Flink 1.12.0 隔几个小时Checkpoint就会失败
>
> 设置下检查点失败不影响任务呀,你这貌似还导致任务重启了?
>
> Frost Wong  于2021年3月18日周四 上午10:38写道:
>
> > Hi 大家好
> >
> > 我用的Flink on yarn模式运行的一个任务,每隔几个小时就会出现一次错误
> >
> > 2021-03-18 08:52:37,019 INFO
> > org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] -
> Completed
> > checkpoint 661818 for job 4fa72fc414f53e5ee062f9fbd5a2f4d5 (562357 bytes
> in
> > 4699 ms).
> > 2021-03-18 08:52:37,637 INFO
> > org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] -
> > Triggering checkpoint 661819 (type=CHECKPOINT) @ 1616028757520 for job
> > 4fa72fc414f53e5ee062f9fbd5a2f4d5.
> > 2021-03-18 08:52:42,956 INFO
> > org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] -
> Completed
> > checkpoint 661819 for job 4fa72fc414f53e5ee062f9fbd5a2f4d5 (2233389 bytes
> > in 4939 ms).
> > 2021-03-18 08:52:43,528 INFO
> > org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] -
> > Triggering checkpoint 661820 (type=CHECKPOINT) @ 1616028763457 for job
> > 4fa72fc414f53e5ee062f9fbd5a2f4d5.
> > 2021-03-18 09:12:43,528 INFO
> > org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] -
> > Checkpoint 661820 of job 4fa72fc414f53e5ee062f9fbd5a2f4d5 expired before
> > completing.
> > 2021-03-18 09:12:43,615 INFO
> > org.apache.flink.runtime.jobmaster.JobMaster [] - Trying
> to
> > recover from a global failure.
> > org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint
> tolerable
> > failure threshold.
> > at
> >
> org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleCheckpointException(CheckpointFailureManager.java:90)
> > ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> > at
> >
> org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:65)
> > ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> > at
> >
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1760)
> > ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> > at
> >
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1733)
> > ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> > at
> >
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.access$600(CheckpointCoordinator.java:93)
> > ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> > at
> >
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:1870)
> > ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> > at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> > ~[?:1.8.0_231]
> > at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> ~[?:1.8.0_231]
> > at
> >
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> > ~[?:1.8.0_231]
> > at
> >
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> > ~[?:1.8.0_231]
> > at
> >
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> > ~[?:1.8.0_231]
> > at
> >
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> > ~[?:1.8.0_231]
> > at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_231]
> > 2021-03-18 09:12:43,618 INFO
> > org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Job
> > csmonitor_comment_strategy (4fa72fc414f53e5ee062f9fbd5a2f4d5) switched
> from
> > state RUNNING to RESTARTING.
> > 2021-03-18 09:12:43,619 INFO
> > org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Flat
> Map
> > (43/256) (18dec1f23b95f741f5266594621971d5) switched from RUNNING to
> > CANCELING.
> > 2021-03-18 09:12:43,622 INFO
> > org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Flat
> Map
> > (44/256) (3f2ec60b2f3042ceea6e1d660c78d3d7) switched from RUNNING to
> > CANCELING.
> > 2021-03-18 09:12:43,622 INFO
> > org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Flat
> Map
> > (45/256) (66d411c2266ab025b69196dfec30d888) switched from RUNNING to
> > CANCELING.
> > 然后就自己恢复了。用的是Unaligned
> >
> Checkpoint,rocksdb存储后端,在这个错误前后也没有什么其他报错信息。从Checkpoint的metrics看,总是剩最后一个无法完成,调整过parallelism也无法解决问题。
> >
> > 谢谢大家!
> >
>


Re: flink1.12版本,使用yarn-application模式提交任务失败

2021-03-15 文章 Congxian Qiu
Hi
从你的日志看作业启动失败的原因是:
   Caused by: java.lang.IllegalArgumentException: Wrong FS:
   hdfs://xx/flink120/, expected: file:///
   看上去你设置的地址和 需要的 schema 不一样,你需要解决一下这个问题

Best,
Congxian


todd  于2021年3月15日周一 下午2:22写道:

> 通过脚本提交flink作业,提交命令:
> /bin/flink run-application -t yarn-application
> -Dyarn.provided.lib.dirs="hdfs://xx/flink120/" hdfs://xx/flink-example.jar
> --sqlFilePath   /xxx/kafka2print.sql
>
> flink使用的Lib及user jar已经上传到Hdfs路径,但是抛出以下错误:
> ---
>  The program finished with the following exception:
>
> org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't
> deploy Yarn Application Cluster
> at
>
> org.apache.flink.yarn.YarnClusterDescriptor.deployApplicationCluster(YarnClusterDescriptor.java:465)
> at
>
> org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer.run(ApplicationClusterDeployer.java:67)
> at
>
> org.apache.flink.client.cli.CliFrontend.runApplication(CliFrontend.java:213)
> at
> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1061)
> at
>
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1136)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
>
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
> at
>
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at
> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1136)
> Caused by: java.lang.IllegalArgumentException: Wrong FS:
> hdfs://xx/flink120/, expected: file:///
> at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:648)
> at
>
> org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:82)
> at
>
> org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:606)
> at
>
> org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:824)
> at
>
> org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:601)
> at
>
> org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:428)
> at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1425)
> at
>
> org.apache.flink.yarn.YarnApplicationFileUploader.lambda$getAllFilesInProvidedLibDirs$2(YarnApplicationFileUploader.java:469)
> at
>
> org.apache.flink.util.function.FunctionUtils.lambda$uncheckedConsumer$3(FunctionUtils.java:93)
> at java.util.ArrayList.forEach(ArrayList.java:1257)
> at
>
> org.apache.flink.yarn.YarnApplicationFileUploader.getAllFilesInProvidedLibDirs(YarnApplicationFileUploader.java:466)
> at
>
> org.apache.flink.yarn.YarnApplicationFileUploader.(YarnApplicationFileUploader.java:106)
> at
>
> org.apache.flink.yarn.YarnApplicationFileUploader.from(YarnApplicationFileUploader.java:381)
> at
>
> org.apache.flink.yarn.YarnClusterDescriptor.startAppMaster(YarnClusterDescriptor.java:789)
> at
>
> org.apache.flink.yarn.YarnClusterDescriptor.deployInternal(YarnClusterDescriptor.java:592)
> at
>
> org.apache.flink.yarn.YarnClusterDescriptor.deployApplicationCluster(YarnClusterDescriptor.java:458)
> ... 9 more
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: flink-savepoint问题

2021-03-03 文章 Congxian Qiu
对于 keyed state,需要保证同一个 key 在 同一个 keygroup 中,如果是某个 key 有热点,可以在 keyby 之前进行一次
map(在 key 后面拼接一些 后缀),然后 keyby,最后处理完成之后,将这些进行聚合
Best,
Congxian


guomuhua <663021...@qq.com> 于2021年3月4日周四 下午12:49写道:

> 我也遇到类似情况,为了打散数据,keyby加了随机数。请问怎么正确打散数据呢?
> nobleyd wrote
> > 是不是使用了随机key。
>
> > guaishushu1103@
>
> >  
>
> > guaishushu1103@
>
> >  于2021年3月3日周三 下午6:53写道:> checkpoint 可以成功保存,但是savepoint出现错误:>
> > java.lang.Exception: Could not materialize checkpoint 2404 for operator>
> > KeyedProcess (21/48).> at>
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1100)>
>
> > at>
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1042)>
>
> > at>
> >
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)>
>
> > at>
> >
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)>
>
> > at java.lang.Thread.run(Thread.java:745)> Caused by:
> > java.util.concurrent.ExecutionException:>
> > java.lang.IllegalArgumentException: Key group 0 is not in>
> > KeyGroupRange{startKeyGroup=54, endKeyGroup=55}.> at
> > java.util.concurrent.FutureTask.report(FutureTask.java:122)> at
> > java.util.concurrent.FutureTask.get(FutureTask.java:192)> at>
> >
> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)>
>
> > at> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.
>
> > (OperatorSnapshotFinalizer.java:47)> at>
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)>
>
> > ... 3 more> Caused by: java.lang.IllegalArgumentException: Key group 0 is
> > not in> KeyGroupRange{startKeyGroup=54, endKeyGroup=55}.> at>
> >
> org.apache.flink.runtime.state.KeyGroupRangeOffsets.computeKeyGroupIndex(KeyGroupRangeOffsets.java:142)>
>
> > at>
> >
> org.apache.flink.runtime.state.KeyGroupRangeOffsets.setKeyGroupOffset(KeyGroupRangeOffsets.java:104)>
>
> > at>
> >
> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeKVStateData(RocksFullSnapshotStrategy.java:314)>
>
> > at>
> >
> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeSnapshotToOutputStream(RocksFullSnapshotStrategy.java:256)>
>
> > at>
> >
> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:221)>
>
> > at>
> >
> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:174)>
>
> > at>
> >
> org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)>
>
> > at java.util.concurrent.FutureTask.run(FutureTask.java:266)> at>
> >
> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:447)>
>
> > ... 5 more>>>
>
> > guaishushu1103@
>
> >>
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink做checkpoint失败 Checkpoint Coordinator is suspending.

2021-02-02 文章 Congxian Qiu
Hi
 你 flink 是什么版本,以及你作业 checkpoint/state 相关的配置是什么呢?如果可以的话,把完整的 jm log 发一下
Best,
Congxian


chen310 <1...@163.com> 于2021年2月1日周一 下午5:41写道:

> 补充下,jobmanager日志异常:
>
> 2021-02-01 08:54:43,639 ERROR
> org.apache.flink.runtime.rest.handler.job.JobDetailsHandler  [] - Exception
> occurred in REST handler: Job 65892aaedb8064e5743f04b54b5380df not found
> 2021-02-01 08:54:44,642 ERROR
> org.apache.flink.runtime.rest.handler.job.JobDetailsHandler  [] - Exception
> occurred in REST handler: Job 65892aaedb8064e5743f04b54b5380df not found
> 2021-02-01 08:54:45,644 ERROR
> org.apache.flink.runtime.rest.handler.job.JobDetailsHandler  [] - Exception
> occurred in REST handler: Job 65892aaedb8064e5743f04b54b5380df not found
> 2021-02-01 08:54:46,647 ERROR
> org.apache.flink.runtime.rest.handler.job.JobDetailsHandler  [] - Exception
> occurred in REST handler: Job 65892aaedb8064e5743f04b54b5380df not found
> 2021-02-01 08:54:47,649 ERROR
> org.apache.flink.runtime.rest.handler.job.JobDetailsHandler  [] - Exception
> occurred in REST handler: Job 65892aaedb8064e5743f04b54b5380df not found
> 2021-02-01 08:54:48,652 ERROR
> org.apache.flink.runtime.rest.handler.job.JobDetailsHandler  [] - Exception
> occurred in REST handler: Job 65892aaedb8064e5743f04b54b5380df not found
> 2021-02-01 08:54:49,655 ERROR
> org.apache.flink.runtime.rest.handler.job.JobDetailsHandler  [] - Exception
> occurred in REST handler: Job 65892aaedb8064e5743f04b54b5380df not found
> 2021-02-01 08:54:50,658 ERROR
> org.apache.flink.runtime.rest.handler.job.JobDetailsHandler  [] - Exception
> occurred in REST handler: Job 65892aaedb8064e5743f04b54b5380df not found
> 2021-02-01 08:54:50,921 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] -
> Triggering
> checkpoint 8697 (type=CHECKPOINT) @ 1612169690917 for job
> 1299f2f27e56ec36a4e0ffd3472ad399.
> 2021-02-01 08:54:50,999 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Decline
> checkpoint 8697 by task 320d2c162f17265435777bb65e1a8934 of job
> 1299f2f27e56ec36a4e0ffd3472ad399 at
> container_e21_1596002540781_1159_01_000134 @
> ip-10-120-83-22.ap-northeast-1.compute.internal (dataPort=42984).
> 2021-02-01 08:54:51,661 ERROR
> org.apache.flink.runtime.rest.handler.job.JobDetailsHandler  [] - Exception
> occurred in REST handler: Job 65892aaedb8064e5743f04b54b5380df not found
> 2021-02-01 08:54:52,654 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph   [] -
> GroupWindowAggregate(window=[SlidingGroupWindow('w$, requestDateTime,
> 180, 60)], properties=[w$start, w$end, w$rowtime, w$proctime],
> select=[COUNT(DISTINCT $f1) AS totalCount, start('w$) AS w$start, end('w$)
> AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]) ->
> Calc(select=[(UNIX_TIMESTAMP((w$start DATE_FORMAT _UTF-16LE'-MM-dd
> HH:mm:ss')) * 1000) AS requestTime, totalCount]) (1/1)
> (6beee54a923323c369b046e199f572c4) switched from RUNNING to FAILED on
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@379a8f9c.
> java.io.IOException: Could not perform checkpoint 8697 for operator
> GroupWindowAggregate(window=[SlidingGroupWindow('w$, requestDateTime,
> 180, 60)], properties=[w$start, w$end, w$rowtime, w$proctime],
> select=[COUNT(DISTINCT $f1) AS totalCount, start('w$) AS w$start, end('w$)
> AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]) ->
> Calc(select=[(UNIX_TIMESTAMP((w$start DATE_FORMAT _UTF-16LE'-MM-dd
> HH:mm:ss')) * 1000) AS requestTime, totalCount]) (1/1).
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:897)
> ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
> at
> org.apache.flink.streaming.runtime.io
> .CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:113)
> ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
> at
> org.apache.flink.streaming.runtime.io
> .CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:137)
> ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
> at
> org.apache.flink.streaming.runtime.io
> .CheckpointedInputGate.pollNext(CheckpointedInputGate.java:93)
> ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
> at
> org.apache.flink.streaming.runtime.io
> .StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:158)
> ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
> at
> org.apache.flink.streaming.runtime.io
> .StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
> ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
> ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
> at
>
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
> ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
> at
>
> 

Re: Re: 请教个Flink checkpoint的问题

2021-01-17 文章 Congxian Qiu
Hi
你可以看一下是不是 作业的状态变成了 FINISH 了,现在 Flink 在 FINISH 的时候,会删除 checkpoint(就算设置了
retain on cancel 也会删除)
PS :FLINK-18263 正在讨论是否在 Job 变为 FINISH 状态后保留 checkpoint
[1] https://issues.apache.org/jira/browse/FLINK-18263
Best,
Congxian


yinghua...@163.com  于2021年1月15日周五 上午11:23写道:

> 感谢您的答复,今天我使用cancelWithSavepoint时也还是删除了,通过你的答复知道机制如此,我再使用另外的参数来观察下。
>
>
>
> yinghua...@163.com
>
> 发件人: Yun Tang
> 发送时间: 2021-01-15 11:02
> 收件人: user-zh
> 主题: Re: 回复: 请教个Flink checkpoint的问题
> Hi
>
> 这是因为在Flink-1.7.0 之后,savepoint也被当做是retained checkpoint了 [1],当你stop with
> savepoint 成功时,新的savepoint创建之后,旧的checkpoint因为默认retain
> checkpoint的数量为1而被subsume掉了,也就是被删掉了。
>
> 如果你还想保留之前的一个旧的checkpoint,可以将默认retain的checkpoint数目设置为2 [2]。
>
> 另外说一句,即使是已经deprecated的cancel with
> savepoint的用法,当新的savepoint创建成功后,旧的checkpoint在默认情况下也应该会被删除,除非增大retain的checkpoint数量。
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-10354
> [2]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/checkpointing.html#state-checkpoints-num-retained
>
> 祝好
> 唐云
> 
> From: yinghua...@163.com 
> Sent: Thursday, January 14, 2021 19:00
> To: user-zh 
> Subject: 回复: 回复: 请教个Flink checkpoint的问题
>
> 好的,感谢您的回复!
>
>
>
> yinghua...@163.com
>
> 发件人: Evan
> 发送时间: 2021-01-14 18:48
> 收件人: user-zh
> 主题: 回复: 回复: 请教个Flink checkpoint的问题
> 是的,应该是机制问题,链接[1]打开有这样一句解释:
>
> If you choose to retain externalized checkpoints on cancellation you have
> to handle checkpoint clean up manually when you cancel the job as well
> (terminating with job status JobStatus#CANCELED).
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/deployment/config.html#execution-checkpointing-externalized-checkpoint-retention
>
> 如回答有误,请指正。
>
>
>
>
>
> 发件人: yinghua...@163.com
> 发送时间: 2021-01-14 18:02
> 收件人: user-zh
> 主题: 回复: 回复: 请教个Flink checkpoint的问题
> 代码如下:
> streamEnv.enableCheckpointing(5 * 60 * 1000);
> CheckpointConfig checkPointConfig = streamEnv.getCheckpointConfig();
> checkPointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> checkPointConfig.setCheckpointTimeout(1 * 60 * 1000);
> checkPointConfig.setMinPauseBetweenCheckpoints((1 * 30 * 1000));
> checkPointConfig.setMaxConcurrentCheckpoints(1);
> checkPointConfig.setTolerableCheckpointFailureNumber(3);
> checkPointConfig
>
> .enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>
> String checkpointPath = this.execOptions.get(CHECKPOINT_PATH);
> try {
>   StateBackend rocksDBStateBackend = new
> RocksDBStateBackend(checkpointPath);
>   streamEnv.setStateBackend(rocksDBStateBackend);
>
>
>
> yinghua...@163.com
> 发件人: Evan
> 发送时间: 2021-01-14 17:55
> 收件人: user-zh
> 主题: 回复: 请教个Flink checkpoint的问题
> 代码图挂掉了,看不到代码
> 发件人: yinghua...@163.com
> 发送时间: 2021-01-14 17:26
> 收件人: user-zh
> 主题: 请教个Flink checkpoint的问题
>
> 我在yarn上提交任务时,设置flink的checkpoint是5分钟一次,checkpoint使用RocksDBStateBackend保存在HDFS上且任务取消后不删除checkpoint,代码如下
>
> 现在我发现,当我停止任务时使用stopWithSavepoint发现任务停止后把最近一次的checkpoint信息给删除了?目前机制是这样的吗还是我使用有问题?是不是调用cancelWithSavepoint停止任务时就不会删除最近一次的checkpoint信息?
> yinghua...@163.com
>


Re: flink1.9.1单任务配置rocksDB不生效

2020-12-15 文章 Congxian Qiu
Hi
   state.backend 应该是你在 flink-conf 中设置了这个值。具体到你这里的情况,最终的配置是
RocksDB(以代码为准,如果代码没有设置会使用 flink-conf 中的文件)。你可以看看 TM 日志,应该可以看到更详细的信息
Best,
Congxian


bradyMk  于2020年12月15日周二 下午5:05写道:

> Hi,想请教大家一个问题,我用单任务配置使用rocksDB状态后端,代码如下:
>
> val backend = new RocksDBStateBackend(path, true)
> backend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED)
> env.setStateBackend(backend.asInstanceOf[StateBackend])
>
> 但是运行代码后,去webui查看Job Manager --> Configuration
> 中查看,发现state.backend还是显示filesystem
>
> 这是说明我的配置没有生效嘛?如果没有生效,那么如何进行单任务配置rocksDB呢?
>
>
>
> -
> Best Wishes
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: the remote task manager was lost

2020-12-02 文章 Congxian Qiu
可以看一下 remote task 对应的 tm 日志,看看有没有啥异常

Best,
Congxian


赵一旦  于2020年12月2日周三 下午6:17写道:

> 我都是80G、100G这么分配资源的。。。
>
> guanxianchun  于2020年10月28日周三 下午5:02写道:
>
> > flink版本: flink-1.11
> > taskmanager memory: 8G
> > jobmanager memory: 2G
> > akka.ask.timeout:20s
> > akka.retry-gate-closed-for: 5000
> > client.timeout:600s
> >
> > 运行一段时间后报the remote task manager was lost ,错误信息如下:
> > 2020-10-28 00:25:30,608 INFO
> > org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] -
> Completed
> > checkpoint 411 for job 031e5f122711786fcc11ee6eb47291fa (2703770 bytes in
> > 336 ms).
> > 2020-10-28 00:27:30,273 INFO
> > org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] -
> > Triggering
> > checkpoint 412 (type=CHECKPOINT) @ 1603816050239 for job
> > 031e5f122711786fcc11ee6eb47291fa.
> > 2020-10-28 00:27:30,776 INFO
> > org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] -
> Completed
> > checkpoint 412 for job 031e5f122711786fcc11ee6eb47291fa (3466688 bytes in
> > 509 ms).
> > 2020-10-28 00:29:30,246 INFO
> > org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] -
> > Triggering
> > checkpoint 413 (type=CHECKPOINT) @ 1603816170239 for job
> > 031e5f122711786fcc11ee6eb47291fa.
> > 2020-10-28 00:29:30,597 INFO
> > org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] -
> Completed
> > checkpoint 413 for job 031e5f122711786fcc11ee6eb47291fa (2752681 bytes in
> > 334 ms).
> > 2020-10-28 00:29:47,353 WARN  akka.remote.ReliableDeliverySupervisor
> >
> > [] - Association with remote system
> > [akka.tcp://fl...@hadoop01.dev.test.cn:13912] has failed, address is now
> > gated for [5000] ms. Reason: [Disassociated]
> > 2020-10-28 00:29:47,353 WARN  akka.remote.ReliableDeliverySupervisor
> >
> > [] - Association with remote system
> > [akka.tcp://flink-metr...@hadoop01.dev.test.cn:31260] has failed,
> address
> > is
> > now gated for [5000] ms. Reason: [Disassociated]
> > 2020-10-28 00:29:47,377 INFO
> > org.apache.flink.runtime.executiongraph.ExecutionGraph   [] -
> > KeyedProcess -> async wait operator -> Map (1/3)
> > (f84731e57528b326ad15ddc17821d1b8) switched from RUNNING to FAILED on
> > org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@538198b8.
> > org.apache.flink.runtime.io
> > .network.netty.exception.RemoteTransportException:
> > Connection unexpectedly closed by remote task manager
> > 'hadoop01.dev.test.cn/192.168.1.21:7527'. This might indicate that the
> > remote task manager was lost.
> > at
> > org.apache.flink.runtime.io
> >
> .network.netty.CreditBasedPartitionRequestClientHandler.channelInactive(CreditBasedPartitionRequestClientHandler.java:144)
> > ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> > at
> >
> >
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:257)
> > ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> > at
> >
> >
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:243)
> > ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> > at
> >
> >
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:236)
> > ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> > at
> >
> >
> org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
> > ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> > at
> > org.apache.flink.runtime.io
> >
> .network.netty.NettyMessageClientDecoderDelegate.channelInactive(NettyMessageClientDecoderDelegate.java:97)
> > ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> > at
> >
> >
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:257)
> > ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> > at
> >
> >
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:243)
> > ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> > at
> >
> >
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:236)
> > ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> > at
> >
> >
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1416)
> > ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> > at
> >
> >
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:257)
> > ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> > at
> >
> >
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:243)
> > ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> > at
> >
> >
> 

Re: Re: Re:回复:带有状态的算子保存checkpoint失败

2020-11-30 文章 Congxian Qiu
checkpoint 失败了可以看看 是超时了,还是有 task snapshot 失败了,可以从 JM log
中来发现。超时的话,可以看下是数据量大需要时间久,还是 timeout 啥的设置太短;异常的话可以从对应的 tm log 看下为啥 snapshot
失败了

Best,
Congxian


王默  于2020年11月27日周五 下午11:43写道:

> checkpoint失败是在web页面上发现的,您看下截图https://imgchr.com/i/Dr3PNn
> 看taskmanager日志确实没有超时,也没有其他异常
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-11-27 21:39:50,"赵一旦"  写道:
> >失败原因也不写,怎么个不能保存。。。超时?还是啥。
> >
> >魏积乾  于2020年11月27日周五 下午7:08写道:
> >
> >> flink-csv-1.11.2.jar
> >> flink-dist_2.11-1.11.2.jar
> >> flink-json-1.11.2.jar
> >> flink-shaded-zookeeper-3.4.14.jar
> >> flink-table_2.11-1.11.2.jar
> >> flink-table-blink_2.11-1.11.2.jar
> >> log4j-1.2-api-2.12.1.jar
> >> log4j-api-2.12.1.jar
> >> log4j-core-2.12.1.jar
> >> log4j-slf4j-impl-2.12.1.jar
> >> flink-metrics-prometheus_2.12-1.11.2.jar
> >>
> >> 按时间排了个序,这是最新的包。
> >>
> >>
> >>
> >> 发自我的iPhone
> >>
> >>
> >> -- 原始邮件 --
> >> 发件人: 王默  >> 发送时间: 2020年11月27日 18:41
> >> 收件人: user-zh  harry...@foxmail.com
> >> 
> >> 主题: 回复:Re:回复:带有状态的算子保存checkpoint失败
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> 请问能具体告知是哪个包没升级吗?或者是否有什么包需要从opt拷贝到lib下
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> 在 2020-11-27 17:34:39,"魏积乾"  >>
> 我也刚遇到过,是从1.10升级上来的,一个任务可以保存checkpoint,一个任务老是保存失败,然后查看了lib下面的jar,发现有些jar没有升级上来,于是更改了,配置文件还改了state.checkpoints.dir。
> >> 希望对你有帮助发自我的iPhone  
> --
> >> 原始邮件 -- 发件人: 王默  >> 发送时间: 2020年11月27日 17:22 收件人: user-zh  gt;
> >> 主题: 回复:带有状态的算子保存checkpoint失败
> >>
> >>
> >>
> >> 
>


Re: 退订

2020-11-24 文章 Congxian Qiu
Hi
   退订请发邮件到 user-zh-unsubscr...@flink.apache.org,详情可以参考文档[1]

[1]
https://flink.apache.org/community.html#how-to-subscribe-to-a-mailing-list
Best,
Congxian


回响 <939833...@qq.com> 于2020年11月24日周二 下午8:42写道:

>


Re: flink savepoint 异常

2020-11-09 文章 Congxian Qiu
Hi
 异常信息中有 “Failed to trigger savepoint. Failure reason: An Exception
occurred while triggering the checkpoint.”  或许你可以看看 JM 的日志,找一下看看有没有什么详细日志
Best,
Congxian


张锴  于2020年11月7日周六 下午4:14写道:

> 本人用flink 1.10.1版本进行savepoint时遇到下列错误,暂时不清楚错误的原因,特来寻求帮助,麻烦大佬们看看
>
> 已经排除反压和重启的原因,checkpoint超时设置了十分钟,conf配置增加客户端连接master的时间,但还是出现异常。
>
> 命令
>
> flink savepoint -yid application_1604456903594_2381
> fb8131bcb78cbdf2bb9a705d8a4ceebc
> hdfs:///hadoopnamenodeHA/flink/flink-savepoints
>
> 异常
>
> The program finished with the following exception:
>
> org.apache.flink.util.FlinkException: Triggering a savepoint for the job
> fb8131bcb78cbdf2bb9a705d8a4ceebc failed.
>  at
>
> org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:631)
>  at
>
> org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:609)
>  at
>
> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:841)
>  at org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:606)
>  at
>
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:908)
>  at
>
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:966)
>  at java.security.AccessController.doPrivileged(Native Method)
>  at javax.security.auth.Subject.doAs(Subject.java:422)
>  at
>
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
>  at
>
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>  at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:966)
> Caused by: java.util.concurrent.CompletionException:
> java.util.concurrent.CompletionException:
> org.apache.flink.runtime.checkpoint.CheckpointException: Failed to trigger
> savepoint. Failure reason: An Exception occurred while triggering the
> checkpoint.
>  at
>
> org.apache.flink.runtime.scheduler.SchedulerBase.lambda$triggerSavepoint$3(SchedulerBase.java:756)
>  at
>
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
>  at
>
> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
>  at
>
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>  at
>
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
>  at
>
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
>  at
>
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>  at
>
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>  at akka.japi.pf.UnitCaseState
>


Re: 退订

2020-11-05 文章 Congxian Qiu
hi
退订请发邮件到  user-zh-unsubscr...@flink.apache.org,更多详情请参考[1]

[1] https://flink.apache.org/community.html#mailing-lists

Best,
Congxian


李郝 <13777597...@163.com> 于2020年11月5日周四 下午9:54写道:

> 退订


Re: flink savepoint

2020-11-05 文章 Congxian Qiu
Hi
 从 client 端日志,或者 JM 日志还能看到其他的异常么?
Best,
Congxian


张锴  于2020年11月6日周五 上午11:42写道:

> 重启和反压都正常
> 另外增加了从客户端到master的时间,还是有这个问题
>
> hailongwang <18868816...@163.com> 于 2020年11月6日周五 10:54写道:
>
> > Hi,
> >
> >
> > 这个报错只是在规定的时间内没有完成 Savepoint,导致客户端连接 Master 超时,
> > 具体的原因需要看下 Jobmaster 的日志。
> > PS:在任务一直重启、反压的情况下,一般 Savepoint 都会失败。
> >
> >
> > Best,
> > Hailong Wang
> >
> >
> >
> >
> > 在 2020-11-06 09:33:48,"张锴"  写道:
> > >本人在使用flink savepoint 保存快照的时候,遇到错误,目前不清楚是因为什么导致这个问题,路过的大佬帮忙看下。
> > >
> > >flink 版本1.10.1
> > >
> > >
> > >执行   flink savepoint a3a2e6c3a5a00bbe4c0c9e351dc58c47
> > >hdfs://hadoopnamenodeHA/flink/flink-savepoints
> > >
> > >
> > >出现错误信息
> > >
> > >
> > >org.apache.flink.util.FlinkException: Triggering a savepoint for the job
> > >a3a2e6c3a5a00bbe4c0c9e351dc58c47 failed.
> > >
> > > at
> >
> >
> >org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:631)
> > >
> > > at
> >
> >
> >org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:609)
> > >
> > > at
> >
> >
> >org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:841)
> > >
> > > at
> > org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:606)
> > >
> > > at
> >
> >
> >org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:908)
> > >
> > > at
> >
> >
> >org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:966)
> > >
> > > at java.security.AccessController.doPrivileged(Native Method)
> > >
> > > at javax.security.auth.Subject.doAs(Subject.java:422)
> > >
> > > at
> >
> >
> >org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
> > >
> > > at
> >
> >
> >org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> > >
> > > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:966)
> > >
> > >Caused by: java.util.concurrent.TimeoutException
> > >
> > > at
> >
> >
> >java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
> > >
> > > at
> > java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
> > >
> > > at
> >
> >
> >org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:625)
> >
>


Re: Re:Re: flink-1.10 checkpoint 偶尔报 NullPointerException

2020-11-03 文章 Congxian Qiu
Hi
这个问题看上去是特定 JDK 版本上,某些写法下对象被提前回收了,猜测和 gc 有关。之前看到一个可能相关的帖子[1]

[1] https://cloud.tencent.com/developer/news/564780

Best,
Congxian


蒋佳成(Jiacheng Jiang) <920334...@qq.com> 于2020年11月4日周三 上午11:33写道:

> hi,这个问题我也遇到了,这个问题的根本原因是啥呢?
>
>
>
> --原始邮件--
> 发件人: "chenkaibit" 发送时间: 2020年5月9日(星期六) 中午12:09
> 收件人: "user-zh" 主题: Re:Re:Re: flink-1.10 checkpoint 偶尔报 NullPointerException
>
>
>
> Hi:
> 加了一些日志后发现是checkpointMetaData为NULL了
> https://github.com/apache/flink/blob/release-1.10.0/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L1421
> 测试程序为读kafka,然后进行wordcount,结果写入kafka。checkpoint配置如下:
> |CheckpointingMode|ExactlyOnce|
> |Interval|5s|
> |Timeout|10m0s|
>
> |MinimumPauseBetweenCheckpoints|0ms|
> |MaximumConcurrentCheckpoints|1|
>
>
> 稳定在第5377个checkpoint抛出NPE
>
>
> 虽然原因还不清楚,但是修改了部分代码(见
> https://github.com/yuchuanchen/flink/commit/e5122d9787be1fee9bce141887e0d70c9b0a4f19
> )后不再出现NPE了。
>
>
> 在2020-04-2110:21:56,chenkaibit<
> chenkai...@163.com写道:
> 
> 
> 
> 这个不是稳定复现的,但是在最近1.10上测试的几个作业出现了,触发时也没有其他报错。我加了一些日志,再观察下
> 
> 
> 
> 
> 在2020-04-2101:12:48,YunTang<
> myas...@live.com写道:
> Hi
> 
> 这个NPE有点奇怪,从executeCheckpointing方法[1]里面其实比较难定位究竟是哪一个变量或者变量的取值是null。
>
> 一种排查思路是打开org.apache.flink.streaming.runtime.tasks的DEBUGlevel日志,通过debug日志缩小范围,判断哪个变量是null
> 
> 这个异常出现的时候,相关task上面的日志有什么异常么,触发这个NPE的条件是什么,稳定复现么?
> 
> [1]
> https://github.com/apache/flink/blob/aa4eb8f0c9ce74e6b92c3d9be5dc8e8cb536239d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L1349
> 
> 
> ;
> 祝好
> 唐云
> 
> 
> From:chenkaibit Sent:Monday,April20,202018:39
> To:user-zh@flink.apache.org 
>
> Subject:flink-1.10checkpoint偶尔报NullPointerException
> 
>
> 大家遇到过这个错误吗,CheckpointOperation.executeCheckpointing的时候报NullPointerException
>
> java.lang.Exception:Couldnotperformcheckpoint5505foroperatorSource:KafkaTableSource(xxx)-SourceConversion(table=[xxx,source:[KafkaTableSource(xxx)]],fields=[xxx])-Calc(select=[xxx)ASxxx])-SinkConversionToTuple2-Sink:Elasticsearch6UpsertTableSink(xxx)(1/1).
> 
>
> atorg.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:802)
> 
>
> atorg.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$3(StreamTask.java:777)
> 
>
> atorg.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$228/1024478318.call(UnknownSource)
> 
>
> atjava.util.concurrent.FutureTask.run(FutureTask.java:266)
> 
>
> atorg.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87)
> 
>
> atorg.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
> 
>
> atorg.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261)
> 
>
> atorg.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
> 
>
> atorg.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
> 
>
> atorg.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
> 
>
> atorg.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
> 
>
> atorg.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
> 
>
> atjava.lang.Thread.run(Thread.java:745)
> 
> Causedby:java.lang.NullPointerException
> 
>
> atorg.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1411)
> 
>
> atorg.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:991)
> 
>
> atorg.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$5(StreamTask.java:887)
> 
>
> atorg.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$229/1010499540.run(UnknownSource)
> 
>
> atorg.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
> 
>
> atorg.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:860)
> 
>
> atorg.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:793)
> 
> ...12more


Re: 社区贡献求助

2020-11-02 文章 Congxian Qiu
Hi
 你可以参考一下这个文档[1] 以及文档内的链接,这里面有些如何贡献相关的内容
[1] https://flink.apache.org/contributing/how-to-contribute.html

Best,
Congxian


Xingbo Huang  于2020年11月3日周二 上午9:29写道:

> Hi,
> 需要committer才有权限进行assign的,你可以在JIRA下面ping一下对应模块的committer
> Best,
> Xingbo
>
> zihaodeng <284616...@qq.com> 于2020年11月3日周二 上午2:31写道:
>
> > 我想做社区贡献,但是我新建的JIRA不能assign(没有assign按钮),如下图:
> >
> > 是不是因为我缺少什么权限?谁能帮帮忙,告诉我应该怎么做?
> > Username:pezynd
> > Full name:ZiHaoDeng
> > mail:284616...@qq.com
> >
>


Re: 使用BroadcastStream后checkpoint失效

2020-11-02 文章 Congxian Qiu
Hi
我理解你的 BroadcastStream 也会定期的再次读取,然后更新对应的状态,这样的话,你的 source 可以一直在读取数据(run
函数),不退出即可。如果只希望读取一次话,是不是维表也可以满足你的需求呢?
Best,
Congxian


restart  于2020年11月3日周二 上午10:11写道:

> 问题:job在接入广播流后,checkpint失效。
>
> 描述:广播流的数据来源两个地方,一个是从mongo,一个是从kafka,数据进行union,同时指定Watermark,返回Watermark.MAX_WATERMARK(用于与主数据源connect后,窗口聚合水印更新),job部署后,来源mongo的数据源状态会变为FINISHED。网上有查过,说subtask
> 状态finished会导致checkpoint不触发,那如何既能满足数据源自定义(更像是DataSet),同时保证checkpoint正常触发呢
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 退订

2020-11-02 文章 Congxian Qiu
Hi
 退订请发邮箱到 user-zh-unsubscr...@flink.apache.org  更详细的可以参考这个文档[1]

[1] https://flink.apache.org/community.html#mailing-lists

Best,
Congxian


196371551 <196371...@qq.com> 于2020年11月3日周二 下午2:08写道:

> 退订


Re: flink 1.11.2 keyby 更换partition

2020-11-01 文章 Congxian Qiu
Hi
自定义的 KeySelector[1] 能否满足呢?
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/state/state.html#keyed-datastream

Best,
Congxian


Peihui He  于2020年11月2日周一 下午2:56写道:

> Hi,
>
> 不好意思,我这边误导。
> 现在的情况是这样的
>
> 用这个方法测试
> KeyGroupRangeAssignment.assignKeyToParallelOperator(asset_id,
> KeyGroupRangeAssignment .computeDefaultMaxParallelism( parallelism ),
> parallelism)
> 发现不是很均匀( job暂时并发度是103,这样的话默认maxParallelism 就256 了
> ),下游key的数量能够相差100多。后来通过设置 maxParallelism和parallelism相等比按默认的
> KeyGroupRangeAssignment .computeDefaultMaxParallelism( parallelism
> ) 这种方式好多了。
>
>
> 请问下后续会扩展keyby接口不?keyby 可以根据自定义partition,然后返回keyedstream。
>
>
> Best Wishes.
>
>
>
> Congxian Qiu  于2020年11月2日周一 下午1:52写道:
>
> > Hi
> > 不太明白你这里问题,如果是说按照 Md5 进行 keyby 不均匀,直接用 hashcode keyby 会更均匀的话,是不是直接把计算
> > md5 的逻辑改成计算 hashcode 的逻辑就行了
> > Best,
> > Congxian
> >
> >
> > Peihui He  于2020年11月2日周一 上午10:01写道:
> >
> > > hi,
> > >
> > > 已经尝试过了,通过partitionCustom 返回的是datastream,如果后面接cep的化,并发度就是1了,性能更不上。
> > >
> > > Best Wishes.
> > >
> > > Zhang Yuxiao  于2020年10月31日周六 上午9:38写道:
> > >
> > > > 你好,
> > > >
> > > > 你看看 DataStream 类中的 partitionCustom 方法是否能够符合你的需求?
> > > > 
> > > > 发件人: Peihui He 
> > > > 发送时间: 2020年10月30日 下午 07:23
> > > > 收件人: user-zh@flink.apache.org 
> > > > 主题: flink 1.11.2 keyby 更换partition
> > > >
> > > > hi,all
> > > >
> > > > 请问可以更改keyby的hash partition 不? 现在发现这个hash partition不能均匀的分配key。
> > > >
> > > >
> > >
> >
> KeyGroupRangeAssignment.assignKeyToParallelOperator(stringToMd5(asset_id),
> > > > 128, parallesism)
> > > >
> > > > 用这个方法测试,即使个asset_id 转换为MD5都不能均匀分配。
> > > >
> > > > 相反,用最最简单的 Math.abs(asset_id.hashcode() % parallesism ) 就可以平均的分配key。
> > > >
> > > >
> > > > Best Regards.
> > > >
> > >
> >
>


Re: flink 1.11.2 keyby 更换partition

2020-11-01 文章 Congxian Qiu
Hi
不太明白你这里问题,如果是说按照 Md5 进行 keyby 不均匀,直接用 hashcode keyby 会更均匀的话,是不是直接把计算
md5 的逻辑改成计算 hashcode 的逻辑就行了
Best,
Congxian


Peihui He  于2020年11月2日周一 上午10:01写道:

> hi,
>
> 已经尝试过了,通过partitionCustom 返回的是datastream,如果后面接cep的化,并发度就是1了,性能更不上。
>
> Best Wishes.
>
> Zhang Yuxiao  于2020年10月31日周六 上午9:38写道:
>
> > 你好,
> >
> > 你看看 DataStream 类中的 partitionCustom 方法是否能够符合你的需求?
> > 
> > 发件人: Peihui He 
> > 发送时间: 2020年10月30日 下午 07:23
> > 收件人: user-zh@flink.apache.org 
> > 主题: flink 1.11.2 keyby 更换partition
> >
> > hi,all
> >
> > 请问可以更改keyby的hash partition 不? 现在发现这个hash partition不能均匀的分配key。
> >
> >
> KeyGroupRangeAssignment.assignKeyToParallelOperator(stringToMd5(asset_id),
> > 128, parallesism)
> >
> > 用这个方法测试,即使个asset_id 转换为MD5都不能均匀分配。
> >
> > 相反,用最最简单的 Math.abs(asset_id.hashcode() % parallesism ) 就可以平均的分配key。
> >
> >
> > Best Regards.
> >
>


Re: 【咨询JM磁盘磁盘打满问题】

2020-11-01 文章 Congxian Qiu
Hi

如果是空间满,可以看看都是什么文件(什么目录,文件名是否有某些格式等),这些文件的存在是否合理,如果你觉得不合理可以发到社区邮件列表讨论,或者创建
issue 进行跟进。如果是合理的那就只能想办法删除一些不需要的文件了
Best,
Congxian


赵一旦  于2020年10月30日周五 下午5:51写道:

> 磁盘满看是什么东西导致满,然后清理就是了。比如是flink日志满?那就清理flink日志。
>
> void <2030531...@qq.com> 于2020年10月29日周四 下午6:56写道:
>
> > hi all
> >flink跑批任务使用datasetapi
> > 每10分钟提交一次导致jm的磁盘打满,请问各位有碰到这种情况的么? 定期清理jm的磁盘,cli 和 rest提交的方法清理的策略也得不同么?
> > 目录不在一个位置
> > 多谢
>


Re: flink任务挂掉后自动重启

2020-10-29 文章 Congxian Qiu
Hi
1 Flink 的 RestartStrategy[1] 可以解决你的问题吗?
2 从 checkpoint 恢复 这个,可以尝试记录每个作业最新的 checkpoint 地址,也可以在启动的时候从 hdfs 获取一下

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/task_failure_recovery.html
Best,
Congxian


bradyMk  于2020年10月30日周五 上午11:51写道:

>
> flink任务一般都是7*24h在跑的,如果挂掉,有没有什么办法自动重启任务?之前都是任务挂掉然后手动再提交一次任务,但是不可能每次挂掉都可以手动重启;另外,如果对于没做checkpoints的任务,可以通过定时脚本监控yarn,如果任务不存在,则重新提交任务,但是,对于做了checkpoints的任务,我们提交的时候就需要指定ck的目录,这个目录都是在变的,那么又该如何让任务挂掉后能自动重启呢?希望能得到大佬们的指点~
>
>
>
> -
> Best Wishes
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink state.savepoints.dir 目录配置问题

2020-10-27 文章 Congxian Qiu
Hi
   这个你可以尝试把这个信息记录到哪里,或者在启动的时候从这个 jobId 的目录下去查找所有的 chk-xxx 然后选择一个合适的 目录进行恢复
Best,
Congxian


marble.zh...@coinflex.com.INVALID 
于2020年10月27日周二 下午4:54写道:

> 刚钉钉群里建议我把路径指到jobId/chk-xx目录,这样就可以恢复了。
>
> 但是如果这样,这个xx随着checkpoint的变化而变化,这样怎么做到自动提交job?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Re:无法从checkpoint中恢复state

2020-10-27 文章 Congxian Qiu
Hi
   从报错看,你知道的是一个目录,这个目录下面没有 _metadata 文件,这不是一个完整的 checkpoint/savepoint
因此不能用于恢复
Best,
Congxian


marble.zh...@coinflex.com.INVALID 
于2020年10月27日周二 下午4:06写道:

> /opt/flink/bin/flink run -d -s /opt/flink/savepoints -c
> com.xxx.flink.ohlc.kafka.OrderTickCandleView
> /home/service-ohlc-*-SNAPSHOT.jar
>
> 在启动job时,已经指定这个目录,但会报以下错,
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not
> instantiate JobManager.
> at
>
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:398)
> at
>
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
> ... 6 more
> Caused by: java.io.FileNotFoundException: Cannot find meta data file
> '_metadata' in directory '/opt/flink/savepoints'. Please try to load the
> checkpoint/savepoint directly from the metadata file instead of the
> directory.
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: 容忍checkpoint 失败次数和重启策略冲突吗

2020-10-26 文章 Congxian Qiu
Hi smq
   这两个东西不相互影响,理论上 checkpoint 允许失败次数这个只会导致 job fail,而重启策略则是在 job fail
的时候判断怎么继续,如果不符合预期,可以看一下 jm 的 log 或者分享一下 jm log 让大家帮忙看看
Best,
Congxian


smq <374060...@qq.com> 于2020年10月27日周二 上午11:25写道:

> 各位大佬好:
> 我现在设置容忍checkpoint失败次数是0,重启策略为固定延时重启,重启100次。
> 经过测试发现,checkpoint连续失败多次,程序还在运行,不知道是什么原因导致容忍checkpoint失败次数这个设置没有生效。


Re: flink state.savepoints.dir 目录配置问题

2020-10-26 文章 Congxian Qiu
Hi
从报错来看是无法在 "/opt/flink/savepoints" 这个路径下创建目录,这个错误下面应该应该一些异常信息,可以看下具体是啥原因。
另外,使用本地路径存储 checkpoint/savepoint 的话,那么恢复的时候,需要确保该文件能被新的 JM/TM 所访问到

Best,
Congxian


marble.zh...@coinflex.com.INVALID 
于2020年10月26日周一 下午3:32写道:

> 你好,
> 我在flink jobmanager里的flink-conf.yaml添加了以加三个个关的state配置参数,
> state.backend: filesystem
> state.checkpoints.dir: file:///opt/flink/savepoints
> state.savepoints.dir: file:///opt/flink/savepoints
>
>
> 但在做./flink savepoint 时还是报以下的错,
>
> Caused by: java.io.IOException: Failed to create savepoint directory at
> file:/opt/flink/savepoints
> at
>
> org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage.initializeLocationForSavepoint(AbstractFsCheckpointStorage.java:167)
> at
>
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$initializeCheckpoint$8(CheckpointCoordinator.java:633)
> ... 8 more
>
>
> 我需要做哪些调整,是只能支持hdfs吗?  多谢, 急。
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: 退订

2020-10-19 文章 Congxian Qiu
Hi
   退订请发邮件到  user-zh-unsubscr...@flink.apache.org  更多详细情况可以参考[1]
[1] https://flink.apache.org/community.html#mailing-lists
Best,
Congxian


费文杰 <15171440...@163.com> 于2020年10月20日周二 下午1:51写道:

> hi:
>  退订!


Re: flink1.10 stop with a savepoint失败

2020-10-19 文章 Congxian Qiu
Hi
你可以看下 JM log 中这个 savepoint 失败是什么原因导致的,如果是 savepoint 超时了,就要看哪个 task
完成的慢,(savepoint 可能比 checkpoint 要慢)
Best,
Congxian


Robin Zhang  于2020年10月19日周一 下午3:42写道:

> 普通的source -> map -> filter-> sink 测试应用。
>
> 触发savepoint的脚本 :
> ${FLINK_HOME} stop -p ${TARGET_DIR} -d ${JOB_ID}
> 具体报错信息:
>
> org.apache.flink.util.FlinkException: Could not stop with a savepoint job
> "81990282a4686ebda3d04041e3620776".
> at
> org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:462)
> at
>
> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
> at
> org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:454)
> at
>
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:907)
> at
>
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
>
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1962)
> at
>
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at
> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
> Caused by: java.util.concurrent.TimeoutException
> at
>
> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
> at
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:460)
> ... 9 more
>
>
>
> 查看报错,怀疑是权限问题,我是root用户启动的应用,savepoint目录所在的hdfs路径权限所属也是root,如果不停止应用,直接触发savepoint没问题,继续定位到是root用户去停止hadoop
> 应用遇到权限问题,但是不知道怎么解决,目前卡在这里。
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: 求助:如何处理数据不连续导致状态无法清理

2020-10-19 文章 Congxian Qiu
Hi
 或许你可以使用 timer 来进行兜底,注册一个未来某个时间的 timer,然后 timer 触发的时候把 state 清理掉
Best,
Congxian


x <35907...@qq.com> 于2020年10月19日周一 下午2:55写道:

> 版本为v1.10.1
> 使用AggregateFunction+ProcessWindowFunction的方式,进行实时统计,ProcessWindowFunction中涉及状态的累计运算,使用事件时间,按维度+日期分区,按分钟开窗,跨天需要将状态清除,避免状态越来越大。状态清除的逻辑,覆盖ProcessWindowFunction的clear方法,判断窗口开始时间是否为“23:59:00”,如下:override
> def clear(ctx: Context): Unit = {
>   val dt = new SimpleDateFormat("HH:mm:00").format(ctx.window.getStart)
>   if(dt.equals("23:59:00")){
>
> state.clear()遇到的一个问题是,开窗前,keyBy分区时,有的key对应的数据不连续,十分稀疏,可能会出现每天的最后一个窗口没有数据,导致无法触发状态清理逻辑,导致总状态数据越来越大的现象,请问各位老师,有什么好的办法,可以避免这种情况吗?


Re: How to use

2020-10-18 文章 Congxian Qiu
Hi
   To subscribe to the user mail list, you need to send a mail to
user-zh-subscr...@flink.apache.org, you can get more info here[1]
可以发送邮件到 user-zh-subscr...@flink.apache.org 订阅 user-zh 邮件列表

https://flink.apache.org/community.html#mailing-lists

Best,
Congxian


fangzhou ding  于2020年10月19日周一 下午12:10写道:

> How to receive email from here
>


Re: Re:Re: Flink 1.10.1 checkpoint失败问题

2020-10-18 文章 Congxian Qiu
FYI  分享一个可能相关的文章[1]

[1] https://cloud.tencent.com/developer/news/564780

Best,
Congxian


Storm☀️  于2020年10月15日周四 上午10:42写道:

> 非常感谢。
> 后续我关注下这个问题,有结论反馈给大家,供参考。
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink on yarn容器异常退出

2020-10-12 文章 Congxian Qiu
Hi
容易异常退出是指 container 退出吗?可以看下 JM/TM log 是否有相应信息,如果没有,可以尝试从 yarn 侧看下日志为什么
container 退出了
Best,
Congxian


caozhen  于2020年10月12日周一 下午6:08写道:

>
> 可以发下 "分配完applicationid后,容器经常异常退出"  产生的错误日志吗?
>
> 或者排查下flink客户端中的错误日志,以及yarn-historyserver里的日志。
>
> 
>
> Dream-底限 wrote
> > hi
> > 我正在使用flink1.11.1 on
> >
> yarn以分离模式运行任务,但在任务提交的时候,任务在分配完applicationid后,容器经常异常退出,先前以为是yarn环境问题,但是在两个集群测都有遇到这种情况,请问这是一个已知的bug吗
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink on K8s statebackend 配置

2020-10-12 文章 Congxian Qiu
Hi
  从错误日志看,应该是 filesystem 相关的配置(或者 jar 包)有问题,可以参考下这个邮件列表[1]看看能否解决你的问题

[1]
http://apache-flink.147419.n8.nabble.com/Flink-1-11-1-on-k8s-hadoop-td5779.html#a5834
Best,
Congxian


superainbower  于2020年9月30日周三 下午3:04写道:

> 补充一下,我的错误日志
> Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
> Hadoop is not in the classpath/dependencies.
> Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
> Could not find a file system implementation for scheme 'hdfs'. The scheme
> is not directly supported by Flink and no Hadoop file system to support
> this scheme could be loaded. For a full list of supported file systems,
> please seehttps://
> ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/.
>
>
> 应该是没有Hadoop的路径,这个在K8s下面 该怎么去配置呢
> | |
>
>
> superainbower
> |
> |
> superainbo...@163.com
> |
> 签名由网易邮箱大师定制
>
>
> 在2020年09月30日 14:33,superainbower 写道:
> Hi,all
> 请教下,哪个朋友知道Flink on K8s上做 statebackend 配置,除了将下列配置写到flink-conf.yml里,还需要作哪些工作?
> state.backend: rocksdb
> state.checkpoints.dir: hdfs://master:8020/flink/checkpoints
> state.savepoints.dir: hdfs://master:8020/flink/savepoints
> state.backend.incremental: true
>
>
> | |
> superainbower
> |
> |
> superainbo...@163.com
> |
> 签名由网易邮箱大师定制
>
>


Re: Flink 1.10.1 checkpoint失败问题

2020-10-12 文章 Congxian Qiu
Hi, @Storm 请问你用的是 flink 是哪个版本,然后栈是什么呢?可以把相关性信息回复到这里,可以一起看看是啥问题

Best,
Congxian


大森林  于2020年10月10日周六 下午1:05写道:

> 我这边是老版本的jdk8,和jdk261没啥关系的
>
>
>
>
> --原始邮件--
> 发件人:
>   "user-zh"
> <
> storm_h_2...@163.com;
> 发送时间:2020年10月10日(星期六) 上午9:03
> 收件人:"user-zh"
> 主题:Re: Flink 1.10.1 checkpoint失败问题
>
>
>
> 尝试了将jdk升级到了261,报错依然还有。
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: rocksdb增量ckeckpoint问题

2020-10-08 文章 Congxian Qiu
Hi
   增量 checkpoint 是指,每次只上传的 *必须的* sst 文件。因为 RocksDB 生成的 sst 文件是不可变的,所以之前上传过的
sst 文件直接引用即可,这样减少了很多 sst 文件的上传(也减少了 HDFS 的存储和删除等操作)

Best,
Congxian


宁吉浩  于2020年10月9日周五 上午10:20写道:

> 没看过源码,看过一些文档,结论还需验证(应该不用了)。
> 增量checkpoint指的是
> 把内存中的state写入hdfs的时不全量写入,而是写入和上次checkpoint不一致的地方,hdfs底层文件的话会有依赖关系。也就是说本次的依赖上一次,上一次的依赖上上一次。
>
> 底部还有一个逻辑是定期合并checkpoint,这个是操作hdfs文件的,checkpoint保留个数可以配置,实际上hdfs上也不会存储太多checkpoint,就是合并这些state。
> 如下是官网连接:
> https://flink.apache.org/features/2018/01/30/incremental-checkpointing.html
>
> 大胆猜测:
> 先写入增量state,然后等待时机和之前的state合并,由于只保留一个checkpoint,所以每次都是触发合并逻辑。
> checkpoint-state是增量
> 但每次都要把之前的state进行合并
>
>
> --
> 发件人:熊云昆 
> 发送时间:2020年10月6日(星期二) 16:53
> 收件人:user-zh@flink.apache.org 
> 主 题:rocksdb增量ckeckpoint问题
>
> Hi,
>
> 有个rocksdb增量checkpoint的问题不明白,如果state.checkpoints.num-retained默认设置为1,意味着checkpoint默认只保留1个,那么在增量checkpoint的时候,它是无法引用上一个checkpoint的备份的sst文件的,其实还是相当于全量备份了,对不对?
>
>
> | |
> 熊云昆
> |
> |
> 邮箱:xiongyun...@163.com
> |
>
> 签名由 网易邮箱大师 定制


Re: checkpoint rocksdb hdfs 如何协调,保证数据不丢失

2020-09-28 文章 Congxian Qiu
Hi
   RocksDB 里面存的是 State,Flink 在做 checkpoint 的时候会把 State 备份到 HDFS 上,如果失败的话从
Checkpoint 进行恢复,如果想了解更详细的内容,可以参考文档[1][2] 以及文档里面的链接
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/state/checkpointing.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/learn-flink/fault_tolerance.html

Best,
Congxian


Michael Ran  于2020年9月29日周二 上午11:06写道:

> dear all :
> 我们checkpoint 信息默认保存在rocksdb上,但是rocksdb
> 是一个单机系统,性能OK,要做到不丢失还是要备份到hdfs分布式文件系统上。
>
>
>问题是:
>1. 如果仅保存在rocksdb,那么单机物理损坏,数据是会丢失的。
>2. 如果仅保存hdfs,那么性能可能跟不上
>3.如果先保存到rocksdb,再定时备份到hdfs,那么是多久备份一次?中间出现物理损坏,还是会出现一端时间的丢失。
>4. 这块的详细设计,和具体流程、场景有合适的文档推荐吗?怎么再性能和数据完整性上做考虑的


Re: flunk - checkpoint

2020-09-27 文章 Congxian Qiu
Hi
   checkpoint 会按照用户设定的周期定期触发, 同时也会收到 minPauseBetweenCheckpoints 以及
maxConcurrentCheckpoints 等参数的控制,具体的可以看一下这个文档[1],
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/state/checkpointing.html#enabling-and-configuring-checkpointing
Best,
Congxian


郝文强 <18846086...@163.com> 于2020年9月28日周一 上午10:34写道:

>
>
> checkpoint 到底是在什么时间点触发的呢?
> 我看过官方关于checkpoint的文档,但还是说不清 checkpoint 是什么时候触发的求大佬指点
> | |
> 郝文强
> |
> |
> 18846086...@163.com
> |
> 签名由网易邮箱大师定制
>
>


Re: Checkpoint dir is not cleaned up after cancel the job with monitoring API

2020-09-27 文章 Congxian Qiu
Hi Eleanore

What the `CheckpointRetentionPolicy`[1] did you set for your job? if
`ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION` is set, then the
checkpoint will be kept when canceling a job.

PS the image did not show

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html#retained-checkpoints
Best,
Congxian


Eleanore Jin  于2020年9月27日周日 下午1:50写道:

> Hi experts,
>
> I am running flink 1.10.2 on kubernetes as per job cluster. Checkpoint is
> enabled, with interval 3s, minimumPause 1s, timeout 10s. I'm
> using FsStateBackend, snapshots are persisted to azure blob storage
> (Microsoft cloud storage service).
>
> Checkpointed state is just source kafka topic offsets, the flink job is
> stateless as it does filter/json transformation.
>
> The way I am trying to stop the flink job is via monitoring rest api
> mentioned in doc
> 
>
> e.g.
> curl -X PATCH \
>   'http://localhost:8081/jobs/3c00535c182a3a00258e2f57bc11fb1a?mode=cancel'
> \
>   -H 'Content-Type: application/json' \
>   -d '{}'
>
> This call returned successfully with statusCode 202, then I stopped the
> task manager pods and job manager pod.
>
> According to the doc, the checkpoint should be cleaned up after the job is
> stopped/cancelled.
> What I have observed is, the checkpoint dir is not cleaned up, can you
> please shield some lights on what I did wrong?
>
> Below shows the checkpoint dir for a cancelled flink job.
> [image: image.png]
>
> Thanks!
> Eleanore
>
>


Re: Flink 1.10.1 checkpoint失败问题

2020-09-27 文章 Congxian Qiu
Hi
这个问题是应该和 FLINK-17479 是一样的,是特定 JDK 上会遇到问题,可以考虑升级一下 flink 版本,或者替换一个 JDK 版本

Best,
Congxian


Storm☀️  于2020年9月27日周日 上午10:17写道:

> 各位好,checkpoint相关问题L
>
> flink版本1.10.1:,个别的checkpoint过程发生问题:
> java.lang.Exception: Could not perform checkpoint 1194 for operator Map
> (3/3).
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:816)
> at
> org.apache.flink.streaming.runtime.io
> .CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:86)
> at
> org.apache.flink.streaming.runtime.io
> .CheckpointBarrierTracker.processBarrier(CheckpointBarrierTracker.java:99)
> at
> org.apache.flink.streaming.runtime.io
> .CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155)
> at
> org.apache.flink.streaming.runtime.io
> .StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:133)
> at
> org.apache.flink.streaming.runtime.io
> .StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:310)
> at
>
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:485)
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:469)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1382)
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:974)
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$5(StreamTask.java:870)
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:843)
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:803)
> ... 12 mor
>
> 绝大部分是正常完成的,但是小部分比如上面的情况,就会失败,还会导致suspending-->restart.
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: 退订

2020-09-20 文章 Congxian Qiu
Hi
   退订请发邮件到 user-zh-unsubscr...@flink.apache.org
   详情可以参考文档[1]

[1] https://flink.apache.org/community.html#mailing-lists
Best,
Congxian


Han Xiao(联通集团联通支付有限公司总部)  于2020年9月18日周五 下午2:52写道:

>
> 退订
>
> 如果您错误接收了该邮件,请通过电子邮件立即通知我们。请回复邮件到 
> hqs-s...@chinaunicom.cn,即可以退订此邮件。我们将立即将您的信息从我们的发送目录中删除。
> If you have received this email in error please notify us immediately by
> e-mail. Please reply to hqs-s...@chinaunicom.cn ,you can unsubscribe from
> this mail. We will immediately remove your information from send catalogue
> of our.
>


Re: [ANNOUNCE] Apache Flink 1.11.2 released

2020-09-20 文章 Congxian Qiu
Thanks for being the release manager Zhu Zhu and everyone involved in!

Best,
Congxian


Weijie Guo 2  于2020年9月18日周五 下午11:42写道:

> Good job! Very thanks @ZhuZhu for driving this and thanks for all
> contributed
> to the release!
>
> best,
> Weijie
> Zhu Zhu-2 wrote
> > The Apache Flink community is very happy to announce the release of
> Apache
> > Flink 1.11.2, which is the second bugfix release for the Apache Flink
> 1.11
> > series.
> >
> > Apache Flink® is an open-source stream processing framework for
> > distributed, high-performing, always-available, and accurate data
> > streaming
> > applications.
> >
> > The release is available for download at:
> > https://flink.apache.org/downloads.html
> >
> > Please check out the release blog post for an overview of the
> improvements
> > for this bugfix release:
> > https://flink.apache.org/news/2020/09/17/release-1.11.2.html
> >
> > The full release notes are available in Jira:
> >
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12348575
> >
> > We would like to thank all contributors of the Apache Flink community who
> > made this release possible!
> >
> > Thanks,
> > Zhu
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: 含有多个kafka-source的job里如果用events_time作为时间的话,如何解决快流通过水印覆盖漫流的情况。

2020-09-15 文章 Congxian Qiu
Hi
没有太明白你这里为什么数据较少的 source 会覆盖数据更多的 source。能否详细描述下呢?
如果是因为某些 source 的数据把 watermark 给推高了,从而导致其他的 source
数据变成迟到的数据,那么我理解这个暂时没法直接解决的,因为 watermark 是 task 级别的。如果你想要在一个 Flink
作业中实现更细粒度的 watermark,或许你可以尝试下 processfuncton[1] 自己来定制一套类似的机制
另外你也可以看下这个文档[2] 看是否在你的场景中有帮助

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/process_function.html#the-keyedprocessfunction
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#watermark-strategies-and-the-kafka-connector
Best,
Congxian


hao kong  于2020年9月16日周三 上午10:24写道:

> hello,我有一份来自多个Kafka-source的工作。它们都包含某些历史数据。如果使用events-time
> window,它将导致数据较少的source通过water-mark覆盖数据更多的source。
>
> 目前我能想到的方案是用一个在source上的调度器,通过redis或者zookeeper缓存每一个source下两条数据,统一比较,时间小的先进入接下来的处理流程,实现起来比较复杂,各位大佬有什么好的办法吗?十分感谢。
>


Re: flink 填补窗口问题

2020-09-15 文章 Congxian Qiu
Hi
 这个需求可以考虑用 processfunction[1] 来实现,window 的 state 只能给单独的 window 使用,其他
window 不能操作之前的 window 的 state
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/process_function.html#the-keyedprocessfunction
Best,
Congxian


marble.zh...@coinflex.com.INVALID 
于2020年9月15日周二 下午4:53写道:

> 可以补吗? 比如我现在是1分钟的窗口,要是这一分种 没有message,那就以上一个窗口的数据作为这一窗口的数据。
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink检查点

2020-09-15 文章 Congxian Qiu
Hi
   如果你们删除过文件的话,那么你可以检查 chk-xxx 目录下是否有 _metadata 文件存在,存在基本就是可用的
checkpoint。如果你有删除过文件的话,则需要读取 _metadata 文件,然后看看是否所有文件都存在。
Best,
Congxian


Dream-底限  于2020年9月16日周三 上午10:27写道:

> hi、
> 我正在做基于检查点的任务自动恢复,请问有没有什么方法来检查检查点是否是一个完整有效的检查点,因为有的时候检查点会失败,所以有没有api层面的校验方式
>


Re: [sql-client][checkpoint] sql-client提交任务,checkpoint没有写入hdfs

2020-09-15 文章 Congxian Qiu
Hi
   你可以参考这里[1] 自己进行一些修改尝试,来分析 metadata 文件
[1]
https://github.com/apache/flink/blob/master/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java
Best,
Congxian


Harold.Miao  于2020年9月15日周二 下午1:58写道:

> 是同一个insert任务, 只是重启任务的时候 ,我加了这些代码,构造一个 SavepointRestoreSettings  来恢复cp
>
> 请教   我如何判断cp真正写入了hdfs呢,meta文件有什么工具可以解析吗
>
> 谢谢
>
> Jark Wu  于2020年9月15日周二 上午11:31写道:
>
> > 是不是你的 cp 恢复的代码,没有执行任何的 insert into 语句?
> >
> > On Mon, 14 Sep 2020 at 20:15, Harold.Miao  wrote:
> >
> > > 还有一点是 我们修改了sql-client代码, 让任务从cp恢复,修改如下
> > >
> > > private StreamExecutionEnvironment createStreamExecutionEnvironment() {
> > >final StreamExecutionEnvironment env =
> > > StreamExecutionEnvironment.getExecutionEnvironment();
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > > *   LOG.info("restore cp exist: {}",
> > > environment.getExecution().getRestoreSp().isPresent());   if
> > > (environment.getExecution().getRestoreSp().isPresent()) {
> > > LOG.info("restore cp path: {}",
> > > environment.getExecution().getRestoreSp().get());  if
> > > (!environment.getExecution().getRestoreSp().get().contains("none")) {
> > >SavepointRestoreSettings savepointRestoreSettings =
> > >
> > >
> >
> SavepointRestoreSettings.forPath(environment.getExecution().getRestoreSp().get(),
> > > true);
> > >
> >
> env.getStreamGraph().setSavepointRestoreSettings(savepointRestoreSettings);
> > >  }   }*
> > >// for TimeCharacteristic validation in StreamTableEnvironmentImpl
> > >
> > >
> >
> env.setStreamTimeCharacteristic(environment.getExecution().getTimeCharacteristic());
> > >if (env.getStreamTimeCharacteristic() ==
> > TimeCharacteristic.EventTime) {
> > >
> > >
> >
> env.getConfig().setAutoWatermarkInterval(environment.getExecution().getPeriodicWatermarksInterval());
> > >}
> > >return env;
> > > }
> > >
> > >
> > > 传入上面那个只有meta文件地址的时候报错如下:
> > >
> > > Exception in thread "main"
> > > org.apache.flink.table.client.SqlClientException: Unexpected
> > > exception. This is a bug. Please consider filing an issue.
> > > at
> > org.apache.flink.table.client.SqlClient.main(SqlClient.java:213)
> > > Caused by: org.apache.flink.table.client.gateway.SqlExecutionException:
> > > Could not create execution context.
> > > at
> > >
> >
> org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:879)
> > > at
> > >
> >
> org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:227)
> > > at
> > > org.apache.flink.table.client.SqlClient.start(SqlClient.java:108)
> > > at
> > org.apache.flink.table.client.SqlClient.main(SqlClient.java:201)
> > > Caused by: java.lang.IllegalStateException: No operators defined in
> > > streaming topology. Cannot execute.
> > > at
> > >
> >
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:1870)
> > > at
> > >
> >
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1861)
> > > at
> > >
> >
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1846)
> > > at
> > >
> >
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1834)
> > > at
> > >
> >
> org.apache.flink.table.client.gateway.local.ExecutionContext.createStreamExecutionEnvironment(ExecutionContext.java:691)
> > >     at
> > >
> >
> org.apache.flink.table.client.gateway.local.ExecutionContext.createTableEnvironment(ExecutionContext.java:593)
> > > at
> > >
> >
> org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:498)
> > > at
> > >
> >
> org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:184)
> > > at
> > >
> >
> org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:137)
> > 

Re: [sql-client][checkpoint] sql-client提交任务,checkpoint没有写入hdfs

2020-09-14 文章 Congxian Qiu
Hi
   如果你的 state 都非常小的话,可能就会保存在 meta 文件中了,这样的话就只有 _metadata
这一个文件的。具体逻辑可以看一下这里[1]

[1]
https://github.com/apache/flink/blob/9b0fb562898b809b860cf0065ded7a45c49300af/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java#L442
Best,
Congxian


Harold.Miao  于2020年9月14日周一 下午6:44写道:

> hi  all
>
> flink 版本: 1.11.1
>
> 我们利用sql-client提交任务, flink-conf.yaml配置如下
>
> state.backend: filesystem
> state.backend.fs.checkpointdir:
> hdfs:///ai/flink/checkpoint/dataclean/hl-redis0902/checkpoint-data/23252
> state.checkpoints.dir:
> hdfs:///ai/flink/checkpoint/dataclean/hl-redis0902/checkpoint-meta/23252
> state.savepoints.dir:
> hdfs:///ai/flink/checkpoint/dataclean/hl-redis0902/savepoint/23252
>
> execution.checkpointing.externalized-checkpoint-retention:
> RETAIN_ON_CANCELLATION
> execution.checkpointing.interval: 60s
> execution.checkpointing.mode: EXACTLY_ONCE
> jobmanager.execution.failover-strategy: full
> state.backend.incremental: true
>
>
> 任务运行后,在UI界面上看checkpoint都成功了。 但是hdfs上面却一直只有一个meta文件
>
> 类似下面:
>
> hdfs://
> 10.218.60.57:8020/ai/flink/checkpoint/dataclean/hl-redis0902/checkpoint-meta/23250/c72c1ee4362c3d0ba72db32698363fcf/chk-5/_metadata
>
> 除了这个文件,其他什么都没有。
>
> 我们的源是kafka,kafka肯定会保存state的。
>
>
> 请教大家这是什么原因导致的呢
>
>
> 谢谢
>
>
>
>
>
>
>
>
>
>
> --
>
> Best Regards,
> Harold Miao
>


Re: 请教一下Flink和主流数据湖集成的情况

2020-09-14 文章 Congxian Qiu
Hi
   据我所知,iceberg 有一个 flink 的 sink,可以看下这个 PR[1]
[1] https://github.com/apache/iceberg/pull/856
Best,
Congxian


dixingxing85  于2020年9月12日周六 下午4:54写道:

> Hi all:
> 想请教一个问题,现在一些公司已经开始应用数据湖技术了,目前flink和iceberg,hudi,delta
> lake这些的集成情况分别是怎样的?社区有主动跟进某一个数据湖技术的集成吗?预计什么时候能有相对完善的source,sink。谢谢
>
> Sent from my iPhone
>
>
> Sent from my iPhone


Re: Flink 使用 RocksDB CPU 打满

2020-09-13 文章 Congxian Qiu
Hi
   index 相关的,或许可以看下这个文档[1]
[1]
https://github.com/facebook/rocksdb/wiki/Block-Cache#caching-index-filter-and-compression-dictionary-blocks
Best,
Congxian


范瑞 <836961...@qq.com> 于2020年9月2日周三 下午2:05写道:

> 再补充一下正常 subtask 火焰图:
> https://drive.google.com/file/d/1uiH2vNi0kMGHuiHOW5Wq-m053ys4rFHu/view?usp=sharing
> 代码中多次对状态的访问,从火焰图来看: contains 操作仅占整个 CPU 的 5.92%,且代码中多次对状态的访问占的 CPU 都是均匀的。
> 有问题的 火焰图,一个 MapState 的 contains 操作就把 CPU 打到了 90%
>
>
>
>
> --原始邮件--
> 发件人:
>   "user-zh"
> <
> qcx978132...@gmail.com;
> 发送时间:2020年9月2日(星期三) 中午1:38
> 收件人:"user-zh"
> 主题:Re: Flink 使用 RocksDB CPU 打满
>
>
>
> Hi
>  从火焰图看,RocksDB#get 操作占用的时间较多,contains 会调用 RocksDB
> 的 get 函数
>  1. 你使用的是哪个版本的 Flink?
>  2. 不同 subtask 之间的数据是否均匀呢?这里主要想知道调用 RocksDB 的 get
> 函数调用频次是否符合预期
>  3. 如果我理解没错的话,有 snappy 的压缩,这个会有 IO 的操作(也就是从磁盘 load
> 数据),可能还需要看下为什么这个
> subtask 的数据大量落盘
> Best,
> Congxian
>
>
> fanrui <836961...@qq.com 于2020年9月1日周二 下午9:14写道:
>
>  备注一下:
>  Flink 任务并行度 1024,运行几分钟,就会有四五个 subtask 出现上述现象,其余 subtask 正常。
>  正常的 subtask 打出的火焰图是正常的:代码中每一步都占用了一部分 CPU,而不是 MapState 的 contains
> 操作占用了了大量
>  CPU。
> 
> 
> 
>  --
>  Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink任务运行一段时间checkpoint超时,任务挂掉

2020-09-13 文章 Congxian Qiu
Hi
   1 你的作业运行的是哪个版本
   2 你作业挂掉应该是 tolerable failure threshold 超了导致的,这个可以在 checkpoint config
中进行配置,这样 checkpoint 失败后不会导致作业失败
   3 如果可以的话,你可以上传一下 jm 和 tm log
Best,
Congxian


jordan95225  于2020年9月7日周一 上午11:05写道:

> Hi,
> 我现在有一个flink任务,运行一段时间后checkpoint会超时,INFO信息如下:
> checkpoint xxx of job xxx expired before completing.
> Trying to recover from a global failure.
> org.apache.flink.util.FlinkRuntimeException: Excedded checkpoint toerable
> failure threshold.
> 然后我查看了taskmanager日志,在报错之前的日志有一条WARN:
> WARN  akka.remote.Remoting [] -
> Association to [akka.tcp://flink@hadoop43:38839] with unknown UID is
> irrecoverably failed. Address cannot be quarantined without knowing the
> UID,
> gating instead for 50 ms.
> 这条WARN之后task就开始Attempting to cancel task Source,不知道是因为什么原因,期望收到各位的回复
> Best
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: Flink1.10.0 的checkpoint越来越大

2020-09-13 文章 Congxian Qiu
Hi
   对于 checkpoint size 持续变大的情况,可以考虑下:
   1)你使用啥 backend,是否使用 incremental 模式;checkpoint interval 是多少,tps
大概多少。这些数据用于评估 rocksdb incremental 下 checkpoint size 的大小
   2)看一下 hdfs 上的 checkpoint 路径占用大小是否有变化
   3)像 hk__lrzy 说的那样,state 是否没有清理(这里还需要看下 window 相关的)
Best,
Congxian


hk__lrzy  于2020年9月11日周五 下午2:44写道:

> 状态每次有做过清理么。还是在原始基础上进行add的呢,可以贴下代码
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink 1.5.0 savepoint 失败

2020-09-13 文章 Congxian Qiu
Hi
   从错误栈看是  Wrong FS:
file:///data/emr_flink_savepoint_share/savepoint-0705a3-09a2f171a080/e2f63448-eed9-4038-a64a-e874a1a99ba1,
expected: hdfs://flink-hdfs 这个导致的,你能把 savepoint 写到 hdfs://flink-hdfs 这个集群吗?
Best,
Congxian


hk__lrzy  于2020年9月11日周五 下午2:46写道:

> 代码是不是主动设置过stagebackend的地址呢
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: 无法从checkpoint中恢复state

2020-09-03 文章 Congxian Qiu
Hi
   从 retain checkpoint 恢复可以参考文档[1]

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/state/checkpoints.html#%E4%BB%8E%E4%BF%9D%E7%95%99%E7%9A%84-checkpoint-%E4%B8%AD%E6%81%A2%E5%A4%8D%E7%8A%B6%E6%80%81

Best,
Congxian


sun <1392427...@qq.com> 于2020年9月3日周四 下午4:14写道:

> 你好,我有2个问题
>
> 1:每次重启服务,checkpoint的目录中chk- 总是从chk-1开始,chk-2 ,没有从上次的编号开始
>
> 2:重启服务后,没有从checkpoint中恢复state的数据
>
> 下面是我的配置,我是在本地调试的,单机
>
>
>
> final StreamExecutionEnvironment streamExecutionEnvironment =
> StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
>
> //StateBackend stateBackend = new RocksDBStateBackend("hdfs://
> 10.100.51.101:9000/flink/checkpoints",true);
> StateBackend stateBackend = new
> FsStateBackend("file:///flink/checkpoints");
> //StateBackend stateBackend = new MemoryStateBackend();
> streamExecutionEnvironment.setStateBackend(stateBackend);
>
> streamExecutionEnvironment.enableCheckpointing(1000);
>
> streamExecutionEnvironment.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>
> streamExecutionEnvironment.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
>
> streamExecutionEnvironment.getCheckpointConfig().setCheckpointTimeout(6);
>
> streamExecutionEnvironment.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
> streamExecutionEnvironment.getCheckpointConfig()
>
> .enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);


Re: Flink 涉及到 state 恢复能从 RocketMQ 直接转到 Kafka 吗?

2020-09-03 文章 Congxian Qiu
从之前的 checkpoint/savepoint 恢复的话,加上 -n 或者 --allowNonRestoredState
是可以恢复的,不过需要注意如何保证从 *特定* 的 offset 进行恢复

Best,
Congxian


Paul Lam  于2020年9月3日周四 上午11:59写道:

> 可以,保证 RokcetMQ source 算子的 uid 和原本的 Kafka source 算子的 uid 不同就行。
> 另外启动要设置参数 -n 或 —allowNonRestoredState 。
>
> Best,
> Paul Lam
>
> > 2020年9月2日 17:21,wangl...@geekplus.com 写道:
> >
> >
> > 有一个 flink streaming 的程序,读 RocketMQ,中间有一些复杂度计算逻辑用 RocksDB state 存储.
> > 程序有小的更新直接  cancel -s 取消再 run -s 恢复
> >
> > 现在我们需要用 Kafka 替换掉 RocketMQ,消息内容都是一样的, flink 程序需要改一下改为读 Kafka
> > 我可以直接 cancel -s 后再 run -s 复用之前的 state 吗?
> >
> >
> >
> >
> >
> > wangl...@geekplus.com
>
>


Re: 从Savepoint/Checkpoint恢复时 keyedstate中部分数据无法被读取

2020-09-02 文章 Congxian Qiu
Hi
   按理说 checkpoint/savepoint 有的数据,正常恢复后是可以读取到的。
   1 正常从 checkpoint/savepoint 恢复了吗?
   2 获取 state 的时候,key 是同一个 key 吗?
Best,
Congxian


Liu Rising  于2020年9月3日周四 上午9:28写道:

> 版本: 1.9
>
> 问题:
> 当从savepoint或者checkpoint恢复flink job时,发现部分keyedState中的数据丢失。
> 这里我们使用的是ListState,里面存储的是ObjectNode(Jackson DataBinding)类型的对象。
>
> 查log发现部分key的 listState.get() 返回空的iterator。
> 然而使用State Process API确认State的内容时, 发现上述这些key对应的数据是存在于State中的。
>
> 求问各位大佬这种情况是怎么回事?应该如何排查
> 谢谢
>
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink 使用 RocksDB CPU 打满

2020-09-01 文章 Congxian Qiu
Hi
从火焰图看,RocksDB#get 操作占用的时间较多,contains  会调用 RocksDB 的 get 函数
1. 你使用的是哪个版本的 Flink?
2. 不同 subtask 之间的数据是否均匀呢?这里主要想知道调用 RocksDB 的 get 函数调用频次是否符合预期
3. 如果我理解没错的话,有 snappy 的压缩,这个会有 IO 的操作(也就是从磁盘 load 数据),可能还需要看下为什么这个
subtask 的数据大量落盘
Best,
Congxian


fanrui <836961...@qq.com> 于2020年9月1日周二 下午9:14写道:

> 备注一下:
> Flink 任务并行度 1024,运行几分钟,就会有四五个 subtask 出现上述现象,其余 subtask 正常。
> 正常的 subtask 打出的火焰图是正常的:代码中每一步都占用了一部分 CPU,而不是 MapState 的 contains 操作占用了了大量
> CPU。
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink cancel命令

2020-09-01 文章 Congxian Qiu
Hi
   可以看下 本地的 log 文件($FLINK_DIR/log 下)以及 jm log,看看有没有什么异常情况
Best,
Congxian


462329521 <462329...@qq.com> 于2020年9月1日周二 下午3:41写道:

>
> Hello,我在flink1.8使用flinkcancel命令后,任务处于canceled状态,但是yarn上仍在running,请问是什么情况


Re: flink checkpoint导致反压严重

2020-08-31 文章 Congxian Qiu
Hi
如果我理解没错的话,这种 单 key 热点的问题,需要算 中位数(无法像 sum/count
这样分步计算的),只能通过现在你写的这种方法,先分布聚合,然后最终再计算中位数。不过或许可以找找数学方法,看有没有近似的算法
Best,
Congxian


赵一旦  于2020年9月1日周二 上午10:15写道:

> (1)url理论上足够多,也足够随机。而并行度比如是30,url理论上是万、十万、百万、千万级别,理论上不会出现数据倾斜吧。
> (2)如果的确有倾斜,那么你那个方法我看不出有啥用,我看你好像是全缓存下来?这没啥用吧。
> (3)我的思路,考虑到你是要求1分钟窗口,每个url维度的,response的中位数。所以本质需要url+time维度的全部response数据排序。
>  由于url数量可能比较少(比如和并行度类似),导致了数据倾斜。所以key不能仅用url,需要分步。
>
>
>  
> 分步方法:如果url的访问量总体极大,则response的值应该有很大重复,比如url1对应response=2ms的有1000个,对应3ms的有500个等这种量级。这样的话直接url+response作为key作为第一级统计也可以降低很大压力,同时加了response后应该就够分散了。第2级别拿到的是url级别的不同response+出现次数的数据。根据这些是可以计算中位数的,同时第二步压力也低,因为url1可能有1w流量,但response的不同值可能是100个。
>  前提背景:每个url的流量 >> 该url的response不同值(即具有相同response+url的流量不少)。
>
> JasonLee <17610775...@163.com> 于2020年8月31日周一 下午7:31写道:
>
> > hi
> >
> > 我理解应该是数据倾斜的问题导致的 可以看下采用加随机数的方式key是否分布的均匀.
> >
> >
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: Flink Kafka作业异常重启,从Checkpoint恢复后重复推送数据部分数据如何处理?

2020-08-27 文章 Congxian Qiu
Hi
   checkpoint 只能保证 state 的 exactly once,但是单条数据可能重复处理多次,如果是 sink
输出多次的话,或许你可以看一下 TwoPhaseCommitSinkFunction 相关的,这篇文章有一个相关的描述[1]

[1]
https://www.ververica.com/blog/end-to-end-exactly-once-processing-apache-flink-apache-kafka
Best,
Congxian


Kevin Dai <154434...@qq.com> 于2020年8月28日周五 上午9:44写道:

> Flink ETL作业生成实时DWD宽表数据,写入Kafka中。
>
> 当ETL作业的TM出现异常,自动重启恢复后,作业虽然能从上一次Checkpoint状态恢复,但是会出现重复推送部分数据,导致下游DWS相关作业都要进行去重处理,增加下游作业成本。
> 想了下解决方案,扩展Kafka
> Sink,初始化的时候,先读取当前State中记录的位置后面的所有数据,然后写入的时候进行去重处理,恢复到正常位置后,清理掉这部分数据。
> 想问下大佬们,这种处理方式是否合理,或者有没其他更好的解决方案?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 求助:实时计算累计UV时,为什么使用MapState和BloomFilter,在checkpoint时的状态大小没有差异

2020-08-27 文章 Congxian Qiu
Hi
Checkpoint 的 size 取决于 state 的大小(如果是 RocksDBStateBackend + 增量
checkpoint,界面看到的 Checkpointed Data Size 是增量大小[1])。如果你把 BloomFilter 存到 State
中的话,那么需要看看最终在 State 中存储的内容大小。
如果有疑问的话,可以分析一下分别使用 bloomfilter 和 mapstate,在相同的数据更新后,state 中的数据分别有多少

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/state/state_backends.html#%E5%A2%9E%E9%87%8F%E5%BF%AB%E7%85%A7
Best,
Congxian


x <35907...@qq.com> 于2020年8月27日周四 下午1:48写道:

>
> 实时计算累计UV时,为什么使用MapState的方式和使用BloomFilter的方式,checkpoint时的状态大小没有差异,感觉bloom应该会比MapState小很多才对啊


Re: 回复: 流处理任务中checkpoint失败

2020-08-27 文章 Congxian Qiu
Hi
   从代码暂时没有看出问题,不确定 迭代 作业的 checkpoint 是否有特殊的地方,我抄送了一个对迭代这块更了解的人(Yun
Gao),或许他在这块有一些建议

Best,
Congxian


Yun Tang  于2020年8月27日周四 下午5:10写道:

> Hi Robert
>
> 你的两个source
> firstSource和secondSource是自己实现的么,一种怀疑是source在没有数据时持锁[1][2]导致checkpoint
> barrier并没有下发。
> 建议使用jstack查看在没有数据下发时,source相关task的java调用栈,观察是否存在等待锁释放
>
> [1]
> https://github.com/apache/flink/blob/162e3ead63e5766cf2116af09922a88537889e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L916
> [2]
> https://github.com/apache/flink/blob/162e3ead63e5766cf2116af09922a88537889e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskActionExecutor.java#L92
>
> 祝好
> 唐云
> 
> From: Robert.Zhang <173603...@qq.com>
> Sent: Wednesday, August 26, 2020 22:17
> To: user-zh 
> Subject: 回复: 流处理任务中checkpoint失败
>
> Hi
>
>
> 代码大致如下:
> DataStream broad=env.readFrom(...).broad;
> DataStream firstSource=env.readFrom(...);
> DataStream secondSource=env.readFrom(...);
>
>
> DataStream union=firstSource.union(secondSource);
> IterativeStream iterativeStream=union.keyby(...).process(...).iterate();
>
> DataStream result=iterativeStream.closeWith(
>  
>  
>  
>iterativeStream
>  
>  
>  
>.keyby(...)
>  
>  
>  
>.connect(broad)
>  
>  
>  
>.process(...));
> result.addSink(...);
>
>
> 是否是代码的书写上有问题呢?不胜感激,Thanks all
>
>
>
>
> --原始邮件--
> 发件人:
>   "user-zh"
> <
> qcx978132...@gmail.com;
> 发送时间:2020年8月26日(星期三) 晚上7:18
> 收件人:"user-zh"
> 主题:Re: 流处理任务中checkpoint失败
>
>
>
> Hi
>  按理说,数据和 barrier 没有依赖关系的,但从你的描述看,没有数据的时候,无法接受到
> barrier,或许你可以分享一下你的代码(可以把业务逻辑进行简化),或许大家可以帮你看看
> Best,
> Congxian
>
>
> Robert.Zhang <173603...@qq.com 于2020年8月26日周三 上午11:43写道:
>
>  Hi Congxian,
> 
>  开了更多日志观察了下,是由于iteration source的barrier无法接收到导致的。
>  该barrier无法接收到,导致下游也无法拿到由该operator传递的barrier。因此checkpoint一直无法成功。
>  比较奇怪的一点是,这是在我停止数据源发送的情况下出现的。我发送一条经过这个operator的数据,
>  那么从日志中看,立刻获得了barrier,checkpoint可以迅速完成。
> 
> 
> 
>  为何会出现这个情况呢?什么情况下流里面没有数据会导致无法checkpoint成功呢?
> 
> 
> 
> 
> 
>  ---原始邮件---
>  *发件人:* "Congxian Qiu"  *发送时间:* 2020年8月25日(周二) 下午5:33
>  *收件人:* "user-zh"  *主题:* Re: 流处理任务中checkpoint失败
> 
>  Hi
>  对于 checkpoint 超时失败的情况,需要看一下具体的原因,对于 source
> 没有完成的话,或许看一下相应并发(没完成 snapshot
>  的 source)的 CPU 占用情况,以及相应逻辑是否卡在哪里或许能看到一些线索。source 是收到 JM 的 rpc 后触发的
>  snapshot,所以这里相比其他的算子,不需要考虑 barrier 对齐的事情。
>  Best,
>  Congxian
> 
> 
>  Robert.Zhang <173603...@qq.com 于2020年8月25日周二 上午12:58写道:
> 
>   看了日志,是由于部分checkpoint 超时未完成,web界面上 iteration
> source的checkpoint始终无法完成。
>   官方文档对于在iterative
>  
> stream的checkpoint没有更详细的说明。对于loop中的数据丢失可以理解。但是checkpoint无法成功不是特别能理解。
>   按照我对于chandylamport算法的理解,上游operator的barrier应该是直接给到了下游
>   ,不应该存在无法拿到barrier的情况才对。不知道这是什么原因导致的
>  
>   ---原始邮件---
>   发件人: "Congxian Qiu"   发送时间: 2020年8月24日(周一) 晚上8:21
>   收件人: "user-zh"   主题: Re: 流处理任务中checkpoint失败
>  
>  
>   Hi
>   nbsp;nbsp; 从报错 ”Exceeded checkpoint tolerable failure
> threshold“ 看,你的
>   checkpoint
>   有持续报错,导致了作业失败,你需要找一下为什么 checkpoint 失败,或许这篇文章[1] 可以有一些帮助
>   nbsp;nbsp; 另外从配置看,你开启了 unalign
> checkpoint,这个是上述文章中暂时没有设计的地方。
>  
>   [1] https://zhuanlan.zhihu.com/p/87131964
>   Best,
>   Congxian
>  
>  
>   Robert.Zhang <173603...@qq.comgt; 于2020年8月21日周五 下午6:31写道:
>  
>   gt; Hello all,
>   gt; 目前遇到一个问题,在iterative stream job
>   gt; 使用checkpoint,按照文档进行了相应的配置,测试过程中checkpoint几乎无法成功
>   gt; 测试state
>  很小,只有几k,依然无法成功。会出现org.apache.flink.util.FlinkRuntimeException:
>   gt; Exceeded checkpoint tolerable failure threshold.的报错
>   gt;
>   gt;
>   gt; 配置如下:
>   gt; env.enableCheckpointing(1,
> CheckpointingMode.EXACTLY_ONCE,
>  true);
>   gt; CheckpointConfig checkpointConfig =
> env.getCheckpointConfig();
>   gt; checkpointConfig.setCheckpointTimeout(60);
>   gt; checkpointConfig.setMinPauseBetweenCheckpoints(6);
>   gt; checkpointConfig.setMaxConcurrentCheckpoints(4);
>   gt;
>   gt;
>  
> 
> checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>   gt; checkpointConfig.setPreferCheckpointForRecovery(true);
>   gt; checkpointConfig.setTolerableCheckpointFailureNumber(2);
>   gt; checkpointConfig.enableUnalignedCheckpoints();
>   gt;
>   gt;
>   gt; 任务只处理几条数据,未存在反压的情况。有遇到类似问题的老哥吗?
> 
>


Re: flink stream sink hive

2020-08-27 文章 Congxian Qiu
Hi
   从异常看,可能是类冲突了,或许是有两个版本的 `org.apache.orc.TypeDescription` 依赖,可以排除或者 shade
一下相关的 class
Best,
Congxian


liya...@huimin100.cn  于2020年8月27日周四 下午8:18写道:

> Exception in thread "main" java.lang.NoSuchMethodError:
> org.apache.orc.TypeDescription.fromString(Ljava/lang/String;)Lorg/apache/orc/TypeDescription;
>
> --
> liya...@huimin100.cn
>
>
> *发件人:* liya...@huimin100.cn
> *发送时间:* 2020-08-27 19:09
> *收件人:* user-zh 
> *主题:* flink stream sink hive
> flink1.11.1 往hive2.1.1 的orc表写数据报的异常,在网上查不到,只能来这里了,麻烦大佬们帮我看看
> --
> liya...@huimin100.cn
>
>


Re: [ANNOUNCE] New PMC member: Dian Fu

2020-08-27 文章 Congxian Qiu
Congratulations Dian
Best,
Congxian


Xintong Song  于2020年8月27日周四 下午7:50写道:

> Congratulations Dian~!
>
> Thank you~
>
> Xintong Song
>
>
>
> On Thu, Aug 27, 2020 at 7:42 PM Jark Wu  wrote:
>
> > Congratulations Dian!
> >
> > Best,
> > Jark
> >
> > On Thu, 27 Aug 2020 at 19:37, Leonard Xu  wrote:
> >
> > > Congrats, Dian!  Well deserved.
> > >
> > > Best
> > > Leonard
> > >
> > > > 在 2020年8月27日,19:34,Kurt Young  写道:
> > > >
> > > > Congratulations Dian!
> > > >
> > > > Best,
> > > > Kurt
> > > >
> > > >
> > > > On Thu, Aug 27, 2020 at 7:28 PM Rui Li 
> wrote:
> > > >
> > > >> Congratulations Dian!
> > > >>
> > > >> On Thu, Aug 27, 2020 at 5:39 PM Yuan Mei 
> > > wrote:
> > > >>
> > > >>> Congrats!
> > > >>>
> > > >>> On Thu, Aug 27, 2020 at 5:38 PM Xingbo Huang 
> > > wrote:
> > > >>>
> > >  Congratulations Dian!
> > > 
> > >  Best,
> > >  Xingbo
> > > 
> > >  jincheng sun  于2020年8月27日周四 下午5:24写道:
> > > 
> > > > Hi all,
> > > >
> > > > On behalf of the Flink PMC, I'm happy to announce that Dian Fu is
> > now
> > > > part of the Apache Flink Project Management Committee (PMC).
> > > >
> > > > Dian Fu has been very active on PyFlink component, working on
> > various
> > > > important features, such as the Python UDF and Pandas
> integration,
> > > and
> > > > keeps checking and voting for our releases, and also has
> > successfully
> > > > produced two releases(1.9.3&1.11.1) as RM, currently working as
> RM
> > > to push
> > > > forward the release of Flink 1.12.
> > > >
> > > > Please join me in congratulating Dian Fu for becoming a Flink PMC
> > > > Member!
> > > >
> > > > Best,
> > > > Jincheng(on behalf of the Flink PMC)
> > > >
> > > 
> > > >>
> > > >> --
> > > >> Best regards!
> > > >> Rui Li
> > > >>
> > >
> > >
> >
>


Re: flink checkpoint导致反压严重

2020-08-26 文章 Congxian Qiu
Hi
对于开启 Checkpoint
之后导致反压的情况,如果希望在现在的基础上进行优化的话,则需要找到反压的原因是什么,可以尝试从最后一个被反压的算子开始排查,到底什么原因导致的,排查过程中,或许
Arthas[1] 可以有一些帮助
另外比较好奇的是,为什么反压会导致你的作业挂掉呢?作业挂掉的原因是啥呢
[1] https://github.com/alibaba/arthas
Best,
Congxian


Yun Tang  于2020年8月26日周三 上午11:25写道:

> Hi
>
> 对于已经改为at least once的checkpoint,其在checkpoint时对于作业吞吐的影响只有task
> 同步阶段的snapshot,这个时间段的snapshot由于与task的主线程的数据访问持有同一把锁,会影响主线程的数据处理。但是就算这样,我也很怀疑checkpoint本身并不是导致早上10点高峰期无法运行的罪魁祸首。
> 使用异步的,支持增量的state backend (例如RocksDBStateBackend)会大大缓解该问题。
> 建议排查思路:
>
>   1.  检查使用的state backend类型
>   2.  检查是否存在sync阶段checkpoint用时久的问题(可以观察WEB UI上的checkpoint部分,看sync阶段的耗时)
>   3.  借助jstack等工具,检查执行checkpoint时,TM上的task执行逻辑,判断是哪里消耗CPU
>
> 祝好
> 唐云
> 
> From: LakeShen 
> Sent: Wednesday, August 26, 2020 10:00
> To: user-zh 
> Subject: Re: flink checkpoint导致反压严重
>
> Hi zhanglachun,
>
> 你们使用 checkpoint 的方式是什么?是否有 full gc 的情况呢
>
> Best,
> LakeShen
>
> 徐骁  于2020年8月26日周三 上午2:10写道:
>
> > input
> >   .keyBy()
> >   .timeWindow()
> >   .reduce(new MyReduceFunction(), new MyProcessWindowFunction());
> >
> > 可以看一下官网的 reduce 用法, 可以在数据进入窗口的时候进行处理(排序), 而没必要等到 window 结束后再进行 这样可以把时间分散到
> > window 里面
> >
>


Re: 流处理任务中checkpoint失败

2020-08-26 文章 Congxian Qiu
Hi
   按理说,数据和 barrier 没有依赖关系的,但从你的描述看,没有数据的时候,无法接受到
barrier,或许你可以分享一下你的代码(可以把业务逻辑进行简化),或许大家可以帮你看看
Best,
Congxian


Robert.Zhang <173603...@qq.com> 于2020年8月26日周三 上午11:43写道:

> Hi Congxian,
>
> 开了更多日志观察了下,是由于iteration source的barrier无法接收到导致的。
> 该barrier无法接收到,导致下游也无法拿到由该operator传递的barrier。因此checkpoint一直无法成功。
> 比较奇怪的一点是,这是在我停止数据源发送的情况下出现的。我发送一条经过这个operator的数据,
> 那么从日志中看,立刻获得了barrier,checkpoint可以迅速完成。
>
>
>
> 为何会出现这个情况呢?什么情况下流里面没有数据会导致无法checkpoint成功呢?
>
>
>
>
>
> ---原始邮件---
> *发件人:* "Congxian Qiu"
> *发送时间:* 2020年8月25日(周二) 下午5:33
> *收件人:* "user-zh";
> *主题:* Re: 流处理任务中checkpoint失败
>
> Hi
>对于 checkpoint 超时失败的情况,需要看一下具体的原因,对于 source 没有完成的话,或许看一下相应并发(没完成 snapshot
> 的 source)的 CPU 占用情况,以及相应逻辑是否卡在哪里或许能看到一些线索。source 是收到 JM 的 rpc 后触发的
> snapshot,所以这里相比其他的算子,不需要考虑 barrier 对齐的事情。
> Best,
> Congxian
>
>
> Robert.Zhang <173603...@qq.com> 于2020年8月25日周二 上午12:58写道:
>
> > 看了日志,是由于部分checkpoint 超时未完成,web界面上 iteration source的checkpoint始终无法完成。
> > 官方文档对于在iterative
> > stream的checkpoint没有更详细的说明。对于loop中的数据丢失可以理解。但是checkpoint无法成功不是特别能理解。
> > 按照我对于chandylamport算法的理解,上游operator的barrier应该是直接给到了下游
> > ,不应该存在无法拿到barrier的情况才对。不知道这是什么原因导致的
> >
> > ---原始邮件---
> > 发件人: "Congxian Qiu" > 发送时间: 2020年8月24日(周一) 晚上8:21
> > 收件人: "user-zh" > 主题: Re: 流处理任务中checkpoint失败
> >
> >
> > Hi
> >  从报错 ”Exceeded checkpoint tolerable failure threshold“ 看,你的
> > checkpoint
> > 有持续报错,导致了作业失败,你需要找一下为什么 checkpoint 失败,或许这篇文章[1] 可以有一些帮助
> >  另外从配置看,你开启了 unalign checkpoint,这个是上述文章中暂时没有设计的地方。
> >
> > [1] https://zhuanlan.zhihu.com/p/87131964
> > Best,
> > Congxian
> >
> >
> > Robert.Zhang <173603...@qq.com 于2020年8月21日周五 下午6:31写道:
> >
> >  Hello all,
> >  目前遇到一个问题,在iterative stream job
> >  使用checkpoint,按照文档进行了相应的配置,测试过程中checkpoint几乎无法成功
> >  测试state
> 很小,只有几k,依然无法成功。会出现org.apache.flink.util.FlinkRuntimeException:
> >  Exceeded checkpoint tolerable failure threshold.的报错
> > 
> > 
> >  配置如下:
> >  env.enableCheckpointing(1, CheckpointingMode.EXACTLY_ONCE,
> true);
> >  CheckpointConfig checkpointConfig = env.getCheckpointConfig();
> >  checkpointConfig.setCheckpointTimeout(60);
> >  checkpointConfig.setMinPauseBetweenCheckpoints(6);
> >  checkpointConfig.setMaxConcurrentCheckpoints(4);
> > 
> > 
> >
> checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> >  checkpointConfig.setPreferCheckpointForRecovery(true);
> >  checkpointConfig.setTolerableCheckpointFailureNumber(2);
> >  checkpointConfig.enableUnalignedCheckpoints();
> > 
> > 
> >  任务只处理几条数据,未存在反压的情况。有遇到类似问题的老哥吗?
>


Re: [ANNOUNCE] Apache Flink 1.10.2 released

2020-08-26 文章 Congxian Qiu
Thanks ZhuZhu for managing this release and everyone else who contributed
to this release!

Best,
Congxian


Xingbo Huang  于2020年8月26日周三 下午1:53写道:

> Thanks Zhu for the great work and everyone who contributed to this release!
>
> Best,
> Xingbo
>
> Guowei Ma  于2020年8月26日周三 下午12:43写道:
>
>> Hi,
>>
>> Thanks a lot for being the release manager Zhu Zhu!
>> Thanks everyone contributed to this!
>>
>> Best,
>> Guowei
>>
>>
>> On Wed, Aug 26, 2020 at 11:18 AM Yun Tang  wrote:
>>
>>> Thanks for Zhu's work to manage this release and everyone who
>>> contributed to this!
>>>
>>> Best,
>>> Yun Tang
>>> 
>>> From: Yangze Guo 
>>> Sent: Tuesday, August 25, 2020 14:47
>>> To: Dian Fu 
>>> Cc: Zhu Zhu ; dev ; user <
>>> u...@flink.apache.org>; user-zh 
>>> Subject: Re: [ANNOUNCE] Apache Flink 1.10.2 released
>>>
>>> Thanks a lot for being the release manager Zhu Zhu!
>>> Congrats to all others who have contributed to the release!
>>>
>>> Best,
>>> Yangze Guo
>>>
>>> On Tue, Aug 25, 2020 at 2:42 PM Dian Fu  wrote:
>>> >
>>> > Thanks ZhuZhu for managing this release and everyone else who
>>> contributed to this release!
>>> >
>>> > Regards,
>>> > Dian
>>> >
>>> > 在 2020年8月25日,下午2:22,Till Rohrmann  写道:
>>> >
>>> > Great news. Thanks a lot for being our release manager Zhu Zhu and to
>>> all others who have contributed to the release!
>>> >
>>> > Cheers,
>>> > Till
>>> >
>>> > On Tue, Aug 25, 2020 at 5:37 AM Zhu Zhu  wrote:
>>> >>
>>> >> The Apache Flink community is very happy to announce the release of
>>> Apache Flink 1.10.2, which is the first bugfix release for the Apache Flink
>>> 1.10 series.
>>> >>
>>> >> Apache Flink® is an open-source stream processing framework for
>>> distributed, high-performing, always-available, and accurate data streaming
>>> applications.
>>> >>
>>> >> The release is available for download at:
>>> >> https://flink.apache.org/downloads.html
>>> >>
>>> >> Please check out the release blog post for an overview of the
>>> improvements for this bugfix release:
>>> >> https://flink.apache.org/news/2020/08/25/release-1.10.2.html
>>> >>
>>> >> The full release notes are available in Jira:
>>> >>
>>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12347791
>>> >>
>>> >> We would like to thank all contributors of the Apache Flink community
>>> who made this release possible!
>>> >>
>>> >> Thanks,
>>> >> Zhu
>>> >
>>> >
>>>
>>


Re: Flink运行时可以转移数据吗?

2020-08-26 文章 Congxian Qiu
Hi
   据我所知,在作业启动之后,是无法改变数据的分法规则的,也就是说没办法做到这个要求。
Best,
Congxian


Sun_yijia  于2020年8月26日周三 下午2:17写道:

> 在做反压相关的代码,想请教各位大佬。
>
>
> 有一个分支节点,分支后面有两个节点A和B。假设A节点出现了反压,B节点负载空闲。
> 我想让B节点帮A节点做一些计算,这样B节点就能够缓解一部分A节点的压力。
>
>
> 有什么方法能让Flink在运行过程中,把接下来要发给A节点的数据发送给B节点吗?


Re: 流处理任务中checkpoint失败

2020-08-25 文章 Congxian Qiu
Hi
   对于 checkpoint 超时失败的情况,需要看一下具体的原因,对于 source 没有完成的话,或许看一下相应并发(没完成 snapshot
的 source)的 CPU 占用情况,以及相应逻辑是否卡在哪里或许能看到一些线索。source 是收到 JM 的 rpc 后触发的
snapshot,所以这里相比其他的算子,不需要考虑 barrier 对齐的事情。
Best,
Congxian


Robert.Zhang <173603...@qq.com> 于2020年8月25日周二 上午12:58写道:

> 看了日志,是由于部分checkpoint 超时未完成,web界面上 iteration source的checkpoint始终无法完成。
> 官方文档对于在iterative
> stream的checkpoint没有更详细的说明。对于loop中的数据丢失可以理解。但是checkpoint无法成功不是特别能理解。
> 按照我对于chandylamport算法的理解,上游operator的barrier应该是直接给到了下游
> ,不应该存在无法拿到barrier的情况才对。不知道这是什么原因导致的
>
> ---原始邮件---
> 发件人: "Congxian Qiu" 发送时间: 2020年8月24日(周一) 晚上8:21
> 收件人: "user-zh" 主题: Re: 流处理任务中checkpoint失败
>
>
> Hi
>  从报错 ”Exceeded checkpoint tolerable failure threshold“ 看,你的
> checkpoint
> 有持续报错,导致了作业失败,你需要找一下为什么 checkpoint 失败,或许这篇文章[1] 可以有一些帮助
>  另外从配置看,你开启了 unalign checkpoint,这个是上述文章中暂时没有设计的地方。
>
> [1] https://zhuanlan.zhihu.com/p/87131964
> Best,
> Congxian
>
>
> Robert.Zhang <173603...@qq.com 于2020年8月21日周五 下午6:31写道:
>
>  Hello all,
>  目前遇到一个问题,在iterative stream job
>  使用checkpoint,按照文档进行了相应的配置,测试过程中checkpoint几乎无法成功
>  测试state 很小,只有几k,依然无法成功。会出现org.apache.flink.util.FlinkRuntimeException:
>  Exceeded checkpoint tolerable failure threshold.的报错
> 
> 
>  配置如下:
>  env.enableCheckpointing(1, CheckpointingMode.EXACTLY_ONCE, true);
>  CheckpointConfig checkpointConfig = env.getCheckpointConfig();
>  checkpointConfig.setCheckpointTimeout(60);
>  checkpointConfig.setMinPauseBetweenCheckpoints(6);
>  checkpointConfig.setMaxConcurrentCheckpoints(4);
> 
> 
> checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>  checkpointConfig.setPreferCheckpointForRecovery(true);
>  checkpointConfig.setTolerableCheckpointFailureNumber(2);
>  checkpointConfig.enableUnalignedCheckpoints();
> 
> 
>  任务只处理几条数据,未存在反压的情况。有遇到类似问题的老哥吗?


Re: Flink任务写入kafka 开启了EOS,statebackend为rocksdb,任务反压source端日志量堆积从cp恢复失败

2020-08-24 文章 Congxian Qiu
Hi
   这个文件不存在的话,应该是这次 checkpoint 没有成功完成,这样从这次 checkpoint 恢复的时候是会失败的。现在社区暂时只支持
stop with savepoint,如果想从 checkpoint 恢复的话,只能够从之前生成的 checkpoint 恢复,如果
checkpoint 生成了有一段时间之后,重放的数据会有些多,之前社区有一个 issue FLINK-12619 尝试做 stop with
checkpoint(这样能够减少重放的数据),如果有需求的话,可以在 issue 上评论
Best,
Congxian


Yang Peng  于2020年8月19日周三 下午3:03写道:

>
> 感谢邱老师,这个我查看了一下没有这个文件的,跟现在运行的相同任务的正常执行的chk目录下的文件相比这个chk-167目录下的文件数少了很多,我们当时是看着cp执行完成之后cancel了任务然后
> 从hdfs上查到这个目录cp路径去重启的任务
>
> Congxian Qiu  于2020年8月19日周三 下午2:39写道:
>
> > Hi
> >1 图挂了
> > 2 你到 hdfs 上能找到 hdfs:*xx*/flink/checkpoints/
> > 7226f43179649162e6bae2573a952e60/chk-167/_metadata 这个文件吗?
> > Best,
> > Congxian
> >
> >
> > Yang Peng  于2020年8月17日周一 下午5:47写道:
> >
> > > 找到了 具体日志如下:2020-08-13 19:45:21,932 ERROR
> > > org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Fatal error
> > > occurred in the cluster entrypoint.
> > >
> > > org.apache.flink.runtime.dispatcher.DispatcherException: Failed to take
> > leadership with session id 98a2a688-266b-4929-9442-1f0b559ade43.
> > >   at
> >
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$null$30(Dispatcher.java:915)
> > >   at
> >
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
> > >   at
> >
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
> > >   at
> >
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> > >   at
> >
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
> > >   at
> >
> org.apache.flink.runtime.concurrent.FutureUtils$WaitingConjunctFuture.handleCompletedFuture(FutureUtils.java:691)
> > >   at
> >
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
> > >   at
> >
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
> > >   at
> >
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> > >   at
> >
> java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
> > >   at
> >
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:739)
> > >   at
> >
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
> > >   at
> >
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
> > >   at
> >
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
> > >   at
> >
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
> > >   at
> >
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
> > >   at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> > >   at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> > >   at
> > scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> > >   at akka.japi.pf
> > .UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> > >   at
> > scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
> > >   at
> > scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> > >   at
> > scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> > >   at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
> > >   at
> akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> > >   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> > >   at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> > >   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> > >   at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> > >   at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> > >   at
> > akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> > >   at
> >
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> > >   at
> > akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> > >   at
> >
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> > > Caused by: java.lang.RuntimeException:
> > org.apache.flink.runtime.client.JobExecutionException: Could not set up
>

Re: 答复: Flink on Yarn 启动任务,任务重启后貌似taskmanager中会丢失lib中的jar文件

2020-08-24 文章 Congxian Qiu
Hi
   理论上第一次能启动,后续的 failover 也应该是可以正常恢复的。你这边是稳定复现吗?如果能够稳定复现的话,有可能是 bug
Best,
Congxian


xiao cai  于2020年8月20日周四 下午2:27写道:

> Hi:
> 感谢答复,确实是个思路。
>
> 不过个人感觉能够在启动第一个container的时候,将本地的lib中的jar文件上传到hdfs中,然后后续的failover的container能够统一从hdfs中获取,这样应该就不会有这个问题了。貌似社区在1.11版本针对jar的拷贝做了优化,我还在看这方面的内容。有进展再交流。
>
>
> Best,
> xiao cai
>
>
>  原始邮件
> 发件人: 范超
> 收件人: user-zh@flink.apache.org
> 发送时间: 2020年8月20日(周四) 09:11
> 主题: 答复: Flink on Yarn 启动任务,任务重启后貌似taskmanager中会丢失lib中的jar文件
>
>
> 我之前开启job的failover
> restart,结果也是发现yarn直接申请新的container且旧的container并没有进一步进行处理,一直导致你这样子的报错,旧的container没有绑定的task
> executor No TaskExecutor registered under containe_.
> 我这边干脆写了个脚本通过savepoint的方式来reload应用了 希望对你有帮助 -邮件原件- 发件人: xiao cai
> [mailto:flin...@163.com] 发送时间: 2020年8月19日 星期三 12:50 收件人: user-zh <
> user-zh@flink.apache.org> 主题: Flink on Yarn
> 启动任务,任务重启后貌似taskmanager中会丢失lib中的jar文件 如题:link on Yarn
> 启动任务,任务重启后貌似taskmanager中会丢失lib中的jar文件 我的任务时kafka source -> hbase sink
> 任务申请到新的container后,启动任务时会丢失原本存在的class文件,怀疑是重新申请的container中没有获取到lib中的资源,是否应该将lib中资源放到hdfs?该如何配置。
> Best xiao cai 错误堆栈: 2020-08-19 11:23:08,099 INFO
> org.apache.flink.yarn.YarnResourceManager [] - Received 1 containers.
> 2020-08-19 11:23:08,100 INFO org.apache.flink.yarn.YarnResourceManager [] -
> Received 1 containers with resource , 1 pending
> container requests. 2020-08-19 11:23:08,100 INFO
> org.apache.flink.yarn.YarnResourceManager [] - TaskExecutor
> container_e07_1596440446172_0094_01_69 will be started on 10.3.15.22
> with TaskExecutorProcessSpec {cpuCores=4.0, frameworkHeapSize=128.000mb
> (134217728 bytes), frameworkOffHeapSize=128.000mb (134217728 bytes),
> taskHeapSize=384.000mb (402653174 bytes), taskOffHeapSize=0 bytes,
> networkMemSize=128.000mb (134217730 bytes), managedMemorySize=512.000mb
> (536870920 bytes), jvmMetaspaceSize=256.000mb (268435456 bytes),
> jvmOverheadSize=192.000mb (201326592 bytes)}. 2020-08-19 11:23:08,101 INFO
> org.apache.flink.yarn.YarnResourceManager [] - Creating container launch
> context for TaskManagers 2020-08-19 11:23:08,101 INFO
> org.apache.flink.yarn.YarnResourceManager [] - Starting TaskManagers
> 2020-08-19 11:23:08,102 INFO org.apache.flink.yarn.YarnResourceManager [] -
> Removing container request Capability[]Priority[1].
> 2020-08-19 11:23:08,102 INFO org.apache.flink.yarn.YarnResourceManager [] -
> Accepted 1 requested containers, returned 0 excess containers, 0 pending
> container requests of resource . 2020-08-19
> 11:23:08,102 INFO
> org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl [] -
> Processing Event EventType: START_CONTAINER for Container
> container_e07_1596440446172_0094_01_69 2020-08-19 11:23:10,851 ERROR
> org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerDetailsHandler
> [] - Unhandled exception.
> org.apache.flink.runtime.resourcemanager.exceptions.UnknownTaskExecutorException:
> No TaskExecutor registered under
> container_e07_1596440446172_0094_01_68. at
> org.apache.flink.runtime.resourcemanager.ResourceManager.requestTaskManagerInfo(ResourceManager.java:560)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0] at
> sun.reflect.GeneratedMethodAccessor118.invoke(Unknown Source) ~[?:?] at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> ~[?:1.8.0_191] at java.lang.reflect.Method.invoke(Method.java:498)
> ~[?:1.8.0_191] at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0] at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0] at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0] at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0] at 
> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0] at 
> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0] at
> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0] at 
> akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0] at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0] at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0] at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0] at
> akka.actor.Actor$class.aroundReceive(Actor.scala:517)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0] at
> akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0] at
> akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0] at
> 

Re: 增量che ckpoint

2020-08-24 文章 Congxian Qiu
Hi
  分享一篇讲解增量 checkpoint 的文章[1]

[1]
https://ververica.cn/developers/manage-large-state-incremental-checkpoint/
Best,
Congxian


Yun Tang  于2020年8月21日周五 上午12:09写道:

> Hi
>
> 增量checkpoint与web界面的信息其实没有直接联系,增量checkpoint的信息记录由CheckpointCoordinator中的SharedStateRegistry[1]
> 进行计数管理,而保留多少checkpoint则由 CheckpointStore管理 [2].
> 保留2个checkpoint的执行过程如下:
> chk-1 completed --> register chk-1 in state registry --> add to checkpoint
> store
> chk-2 completed --> register chk-2 in state registry --> add to checkpoint
> store
> chk-3 completed --> register chk-3 in state registry --> add to checkpoint
> store --> chk-1 subsumed --> unregister chk-1 in state registry --> discard
> state with reference=0
> chk-4 completed --> register chk-4 in state registry --> add to checkpoint
> store --> chk-2 subsumed --> unregister chk-2 in state registry --> discard
> state with reference=0
>
> 从上面可以看懂整个执行流程,所以当chk-3
> 仍然有部分数据依赖chk-1时,那些state数据在unregister时,其计数统计并不会降为0,也就不会删掉,也不需要copy到本次中。
>
>
> [1]
> https://github.com/apache/flink/blob/f8ce30a50b8dd803d4476ea5d83e7d48708d54fa/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L192
> [2]
> https://github.com/apache/flink/blob/f8ce30a50b8dd803d4476ea5d83e7d48708d54fa/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java#L41
>
> 祝好
> 唐云
>
>
> 
> From: 赵一旦 
> Sent: Thursday, August 20, 2020 10:50
> To: user-zh@flink.apache.org 
> Subject: Re: 增量che ckpoint
>
> 等其他人正解。下面是我的猜测:
> 保留2个检查点是web界面保留2个检查点,增量情况下,这2个检查点所有引用到的所有历史检查点肯定都不会被删除。
> 因此第3个检查点的时候,只有2,3检查点仍然引用了1,则1就不会被删除。
>
> superainbower  于2020年8月20日周四 上午10:46写道:
>
> > hi,请教大家一个问题,开启了增量checkpoint,同时checkpoint的个数设置为只保留2个,那么如果当前是第三次checkpoint
> > 仍然依赖第一次的checkpoint会出现什么情况,会把第一次的copy过来到本次中吗?如过第一次不删除,不是会不满足保留2个的限制吗
>


Re: 关于flink 读取 jdbc报错详情,序列化报错

2020-08-24 文章 Congxian Qiu
Hi
   从报错看 CountDownLatch 这个方法无法 serializable,这个 class 没有实现  Serializable
接口。你可以按照这里的方法[1] 尝试解决下

[1]
https://stackoverflow.com/questions/4551926/java-io-notserializableexception-while-writing-serializable-object-to-external-s/4552014
Best,
Congxian


引领  于2020年8月24日周一 下午3:34写道:

>
> 使用场景:FLink 1.11.1
> 读取mysql,一直报序列化错误,但感觉需要序列化的bean对象已经序列化,实在百思不得其解,前来求教各位大佬!!! 代码再附件!!!
>
> Exception in thread "main"
> org.apache.flink.api.common.InvalidProgramException:
> java.util.concurrent.CountDownLatch@45ca843[Count = 2] is not
> serializable. The object probably contains or references non serializable
> fields.
> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1901)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1614)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1571)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1553)
> at com.hsq.APP.main(APP.java:43)
> Caused by: java.io.NotSerializableException:
> java.util.concurrent.CountDownLatch
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> at
> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:586)
> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:133)
> ... 9 more
>
> 引领
> yrx73...@163.com
>
> 
> 签名由 网易邮箱大师  定制
>
>


Re: flink taskmanager 因为内存超了container限制 被yarn kill 问题定位

2020-08-24 文章 Congxian Qiu
Hi
   比较好奇你为什么在 Blink 分支做测试,而不是用最新的 1.11 做测试呢?
Best,
Congxian


柯四海 <2693711...@qq.com> 于2020年8月24日周一 下午5:58写道:

> Hi 大家好,
> 我用github上Blink分支(1.5)编译的flink来运行一些实时任务,发现Taskmanager
> 因为内存超了container限制被yarn kill.
> 有没有人有比较好的问题定位方案?
>
> 尝试过但是还没有解决问题的方法:
>   1. 尝试增加taskmanager内存
> 修改: 从8G 提高到 36G, state back  从fileSystem 改为RocksDB.
> 现象:taskmanager运行时间增加了好几个小时,但是还是因为内存超了被yarn kill.
>   2. dump taskmanager 堆栈,查看什么对象占用大量内存
>操作: jmap -dump 
>现象: 还没有dump结束,taskmanager就因为没有heartbeat 被主动kill.
> (尝试过修改heartbeat时间,还是无果)
>   3. 借用官网debug方式,如下,但是没有dump出文件.
>4. 设置containerized.heap-cutoff-ratio,希望触发 oom 从而产生dump文件,但是这个参数似乎不起作用.
>


Re: 流处理任务中checkpoint失败

2020-08-24 文章 Congxian Qiu
Hi
   从报错 ”Exceeded checkpoint tolerable failure threshold“ 看,你的 checkpoint
有持续报错,导致了作业失败,你需要找一下为什么 checkpoint 失败,或许这篇文章[1] 可以有一些帮助
   另外从配置看,你开启了 unalign checkpoint,这个是上述文章中暂时没有设计的地方。

[1] https://zhuanlan.zhihu.com/p/87131964
Best,
Congxian


Robert.Zhang <173603...@qq.com> 于2020年8月21日周五 下午6:31写道:

> Hello all,
> 目前遇到一个问题,在iterative stream job
> 使用checkpoint,按照文档进行了相应的配置,测试过程中checkpoint几乎无法成功
> 测试state 很小,只有几k,依然无法成功。会出现org.apache.flink.util.FlinkRuntimeException:
> Exceeded checkpoint tolerable failure threshold.的报错
>
>
> 配置如下:
> env.enableCheckpointing(1, CheckpointingMode.EXACTLY_ONCE, true);
> CheckpointConfig checkpointConfig = env.getCheckpointConfig();
> checkpointConfig.setCheckpointTimeout(60);
> checkpointConfig.setMinPauseBetweenCheckpoints(6);
> checkpointConfig.setMaxConcurrentCheckpoints(4);
>
> checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> checkpointConfig.setPreferCheckpointForRecovery(true);
> checkpointConfig.setTolerableCheckpointFailureNumber(2);
> checkpointConfig.enableUnalignedCheckpoints();
>
>
> 任务只处理几条数据,未存在反压的情况。有遇到类似问题的老哥吗?


Re: 有没有可能使用tikv作为flink 分布式的backend

2020-08-24 文章 Congxian Qiu
Hi
   StateBackend 可以理解为 一个 KV 存储加上一个 snapshot 过程,其中 snapshot 过程负责将当前 KV
存储的数据进行备份。理论上任何的 KV 存储都是有可能作为 StateBackend 的,不过增加一种 StateBackend 的话,需要实现相应的
snapshot/restore 逻辑。

   但是在多个 Flink 作业中实现共享的 state 这个在 Flink 中是不支持的。
Best,
Congxian


wxpcc  于2020年8月21日周五 下午6:33写道:

> 项目里有部分需要进行状态共享的需求,多个flink 任务之间
>
> 如题,tikv本身基于rocksdb 是否有可能扩展成为分布式 backend
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink任务写入kafka 开启了EOS,statebackend为rocksdb,任务反压source端日志量堆积从cp恢复失败

2020-08-19 文章 Congxian Qiu
  Please try to load the checkpoint/savepoint directly from the metadata file 
> instead of the directory.
>   at 
> org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage.resolveCheckpointPointer(AbstractFsCheckpointStorage.java:258)
>   at 
> org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage.resolveCheckpoint(AbstractFsCheckpointStorage.java:110)
>   at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1129)
>   at 
> org.apache.flink.runtime.scheduler.LegacyScheduler.tryRestoreExecutionGraphFromSavepoint(LegacyScheduler.java:237)
>   at 
> org.apache.flink.runtime.scheduler.LegacyScheduler.createAndRestoreExecutionGraph(LegacyScheduler.java:196)
>   at 
> org.apache.flink.runtime.scheduler.LegacyScheduler.(LegacyScheduler.java:176)
>   at 
> org.apache.flink.runtime.scheduler.LegacySchedulerFactory.createInstance(LegacySchedulerFactory.java:70)
>   at 
> org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:275)
>   at 
> org.apache.flink.runtime.jobmaster.JobMaster.(JobMaster.java:265)
>   at 
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
>   at 
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
>   at 
> org.apache.flink.runtime.jobmaster.JobManagerRunner.(JobManagerRunner.java:146)
>   ... 10 more
> 2020-08-13 19:45:21,941 INFO  org.apache.flink.runtime.blob.BlobServer
>   - Stopped BLOB server at 0.0.0.0:39267
>
>
> 上面日志提示hdfs上cp文件找不到但是我在hdfs目录上查找能够发现这个cp文件是存在的 而且里面有子文件
>
> [image: IMG20200817_174506.png]
>
>
> Congxian Qiu  于2020年8月17日周一 上午11:36写道:
>
>> Hi
>>JM/TM 日志如果是 OnYarn 模式,且开了了 log aggreagte 的话[1],应该是能够获取到这个日志的。
>>据我所知,暂时没有已知问题会导致增量 checkpoint 不能恢复,如果你遇到的问题确定会导致 增量 checkpoint
>> 恢复失败的话,可以考虑创建一个 Issue
>>
>> [1]
>>
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/yarn_setup.html#log-files
>> Best,
>> Congxian
>>
>>
>> Yang Peng  于2020年8月17日周一 上午11:22写道:
>>
>> >
>> 在我们自研的开发平台上提交任务用的detach模式,提交完之后就看不到其他日志了,这个问题当天出现了两次,是不是使用增量cp会存在这个恢复失败的情况
>> >
>> > Congxian Qiu  于2020年8月17日周一 上午10:39写道:
>> >
>> > > Hi
>> > >你还有失败作业的 JM 和 TM
>> > > 日志吗?如果有的话可以看一下这两个日志来确定为什么没有恢复成功。因为你说代码未作任何改变,然后恢复失败,这个还是比较奇怪的。
>> > > Best,
>> > > Congxian
>> > >
>> > >
>> > > Yang Peng  于2020年8月17日周一 上午10:25写道:
>> > >
>> > > > 好的 感谢
>> > > >
>> > > > JasonLee <17610775...@163.com> 于2020年8月14日周五 下午9:22写道:
>> > > >
>> > > > > hi
>> > > > >
>> > > > > 没有日志不太好定位失败的原因 但是没有设置uid的话 是有可能重启失败的 建议还是都设置uid最好
>> > > > >
>> > > > >
>> > > > >
>> > > > > --
>> > > > > Sent from: http://apache-flink.147419.n8.nabble.com/
>> > > >
>> > >
>> >
>>
>


Re: task传输数据时反序列化失败

2020-08-18 文章 Congxian Qiu
Hi
从栈看应该是 deserialize 的时候出错了,另外 kryo 可以,Pojo 不行,能否检查下,是否满足 POJO  的一些要求[1]呢?

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/state/schema_evolution.html#pojo-types
Best,
Congxian


shizk233  于2020年8月18日周二 下午4:09写道:

> Hi all,
>
> 请教一下反序列化的问题,我有一个KeyedCoProcessFunction,输入是log流和rule流。
> 数据流如下:
> logSource
> .connect(ruleSource)
> .keyby(...)
> .process(My KeyedCoProcessFunction<>)
> .keyby(...)
> .print()
>
> 其中CoProcess函数中有两个MapState分别做log缓存和rule缓存。
> 结构为Map> logState,Map> ruleState.
> T在实例化函数时确定,为MyLog类型。
>
> 运行时遇到了如下错误,看样子似乎是在下游算子反序列化数据时的异常,想请教一下这个错误产生的原因是什么?
>
> 补充说明:Rule使用POJO 序列化器会产生该异常,但使用Kyro序列化器时则不就产生。
>
> Caused by: java.lang.IllegalArgumentException: Can not set java.lang.String
> field org.example.vo.Rule.dimension to org.example.vo.MyLog
> at
>
> sun.reflect.UnsafeFieldAccessorImpl.throwSetIllegalArgumentException(UnsafeFieldAccessorImpl.java:167)
> at
>
> sun.reflect.UnsafeFieldAccessorImpl.throwSetIllegalArgumentException(UnsafeFieldAccessorImpl.java:171)
> at
>
> sun.reflect.UnsafeFieldAccessorImpl.ensureObj(UnsafeFieldAccessorImpl.java:58)
> at
>
> sun.reflect.UnsafeObjectFieldAccessorImpl.set(UnsafeObjectFieldAccessorImpl.java:75)
> at java.lang.reflect.Field.set(Field.java:764)
> at
>
> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.initializeFields(PojoSerializer.java:209)
> at
>
> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:390)
> at
>
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:151)
> at
>
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:37)
> at
>
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
> at
>
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
> at
>
> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
> at
> org.apache.flink.runtime.io
> .network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:106)
> at
> org.apache.flink.streaming.runtime.io
> .StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:121)
>


Re: flink1.9.1用采用-d(分离模式提交)作业报错,但是不加-d是可以正常跑的

2020-08-18 文章 Congxian Qiu
Hi
1.9 上是否加 -d 应该会使用不同的模式来启动作业 (perjob 还是
session),这两个模式下的行为应该是不完全一致的,具体的可以看下这里[1]

[1]
https://github.com/apache/flink/blob/5125b1123dfcfff73b5070401dfccb162959080c/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java#L211

Best,
Congxian


bradyMk  于2020年8月19日周三 上午10:54写道:

> 万分感谢!
> 问题已经解决,确实是包的问题,我很傻的以为不加-d可以运行,那就跟包没关系。
> 所以说加不加-d,应该是调用不同包的不同方法吧?
>
>
>
> -
> Best Wishes
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink sql 如何指定之前的checkpoint执行

2020-08-18 文章 Congxian Qiu
Hi
如果你的算子有改变的话,想从 checkpoint/savepoint 恢复,需要添加
`--allowNonRestoredState`,这样可以忽略掉那些不在新
job 中的算子(就算逻辑一样,uid 不一样也会被忽略掉的),具体的可以参考文档[1]

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/savepoints.html#what-happens-if-i-delete-an-operator-that-has-state-from-my-job

Best,
Congxian


JasonLee <17610775...@163.com> 于2020年8月19日周三 上午11:29写道:

> hi
> 可以参考这篇文章https://mp.weixin.qq.com/s/Vl6_GsGeG0dK84p9H2Ld0Q
> 在cancel的时候触发一个savepoint 修改完SQL从savepoint恢复任务
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: Flink on Yarn 启动任务,任务重启后貌似taskmanager中会丢失lib中的jar文件

2020-08-18 文章 Congxian Qiu
Hi
 你的 Flink 是哪个版本,期望的行为是什么样的?
 从你给的日志看,是没有  这个 class,这个  是在你放到 lib 下的某个 jar
包里面吗?另外你这个作业第一次运行的时候失败,还是中间中间 failover 之后恢复回来的时候失败呢?
Best,
Congxian


xiao cai  于2020年8月19日周三 下午12:50写道:

> 如题:link on Yarn 启动任务,任务重启后貌似taskmanager中会丢失lib中的jar文件
>
>
> 我的任务时kafka source -> hbase sink
>
>
>
> 任务申请到新的container后,启动任务时会丢失原本存在的class文件,怀疑是重新申请的container中没有获取到lib中的资源,是否应该将lib中资源放到hdfs?该如何配置。
>
>
> Best
> xiao cai
>
>
> 错误堆栈:
> 2020-08-19 11:23:08,099 INFO  org.apache.flink.yarn.YarnResourceManager
> [] - Received 1 containers.
> 2020-08-19 11:23:08,100 INFO  org.apache.flink.yarn.YarnResourceManager
> [] - Received 1 containers with resource  vCores:4>, 1 pending container requests.
> 2020-08-19 11:23:08,100 INFO  org.apache.flink.yarn.YarnResourceManager
> [] - TaskExecutor
> container_e07_1596440446172_0094_01_69 will be started on 10.3.15.22
> with TaskExecutorProcessSpec {cpuCores=4.0, frameworkHeapSize=128.000mb
> (134217728 bytes), frameworkOffHeapSize=128.000mb (134217728 bytes),
> taskHeapSize=384.000mb (402653174 bytes), taskOffHeapSize=0 bytes,
> networkMemSize=128.000mb (134217730 bytes), managedMemorySize=512.000mb
> (536870920 bytes), jvmMetaspaceSize=256.000mb (268435456 bytes),
> jvmOverheadSize=192.000mb (201326592 bytes)}.
> 2020-08-19 11:23:08,101 INFO  org.apache.flink.yarn.YarnResourceManager
> [] - Creating container launch context for TaskManagers
> 2020-08-19 11:23:08,101 INFO  org.apache.flink.yarn.YarnResourceManager
> [] - Starting TaskManagers
> 2020-08-19 11:23:08,102 INFO  org.apache.flink.yarn.YarnResourceManager
> [] - Removing container request Capability[ vCores:4>]Priority[1].
> 2020-08-19 11:23:08,102 INFO  org.apache.flink.yarn.YarnResourceManager
> [] - Accepted 1 requested containers, returned 0 excess
> containers, 0 pending container requests of resource  vCores:4>.
> 2020-08-19 11:23:08,102 INFO
> org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl [] -
> Processing Event EventType: START_CONTAINER for Container
> container_e07_1596440446172_0094_01_69
> 2020-08-19 11:23:10,851 ERROR
> org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerDetailsHandler
> [] - Unhandled exception.
> org.apache.flink.runtime.resourcemanager.exceptions.UnknownTaskExecutorException:
> No TaskExecutor registered under container_e07_1596440446172_0094_01_68.
> at
> org.apache.flink.runtime.resourcemanager.ResourceManager.requestTaskManagerInfo(ResourceManager.java:560)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> at sun.reflect.GeneratedMethodAccessor118.invoke(Unknown Source) ~[?:?]
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> ~[?:1.8.0_191]
> at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_191]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> [flink-dist_2.11-1.11.0.jar:1.11.0]
> at
> 

Re: flink1.9.1用采用-d(分离模式提交)作业报错,但是不加-d是可以正常跑的

2020-08-17 文章 Congxian Qiu
Hi
   像我之前说的那样,加 -d 和不加 -d 使用的是不同的模式启动作业的。从你的报错栈来看,应该是类冲突了。你可以看下这个文档[1] 看看能否帮助你
java.lang.NoSuchMethodError:
org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest.newInstance(IFLjava/util/List;Ljava/util/List;Ljava/util/List;Lorg/apache/hadoop/yarn/api/records/ResourceBlacklistRequest;)Lorg/apache/hadoop/yarn/api/protocolrecords/AllocateRequest;
at
org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.allocate(AMRMClientImpl.java:279)
at
org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$HeartbeatThread.run(AMRMClientAsyncImpl.java:273)


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/debugging_classloading.html
Best,
Congxian


bradyMk  于2020年8月17日周一 下午2:36写道:

> 您好:
>
> 我没有尝试过新版本,但是觉得好像不是版本的问题,因为我其他所有flink作业加上-d都能正常运行,就这个不行,并且如果我不用(-d)提交,这个也是可以运行的。我也很奇怪
>
>
>
> -
> Best Wishes
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 在sink 执行notifyCheckpointComplete 方法时能否收到上游 snapshot collect 发送的数据?

2020-08-17 文章 Congxian Qiu
Hi
notifyCheckpointComplete 是整个 checkpoint 完成后调用的(也就是所有算子都做完了 snapshot,而且
JM 也做完了一些其他的工作),你的需求看上去只是要在算子间做一些顺序操作,这个应该不需要依赖 notifyCheckpointComplete
的,你可以自己写一个逻辑,在 submit 收集到 N 个信号后再做相应的事情。
Best,
Congxian


key lou  于2020年8月17日周一 上午11:42写道:

> 谢谢 解答。也就是假如 A->B 这样一个 graph。在一次checkpoint 中 A 调用  snapshot 往下游发的数据,在B 执行
> notifyCheckpointComplete 与 Asnapshot 下发的数据到达B   这2者没有必然的先后顺序。
>
> 另外就是 如果没有先后顺序,有没有什么办法 或者是在 B执行 某某操作前 能确保 这次 checkpoint 中 A  snapshot  发出的数据
> 到达了B.
>
>  我的场景是 有3个核心算子  start->proccess->submit . 其中 start和 submit 并行度为1, proccess
> 并行度为N, start  会开启一个事务 编号proccess  用这个事务 编号
> 去做预处理(赞一批处理一次,并把这一次处理结果下发,给下游做事务提交),  submit  收到上游批处理的结果 用 同样的事务编号去提交
>
>
> Congxian Qiu  于2020年8月17日周一 上午10:42写道:
>
> > Hi
> > 上游 snapshot 的逻辑和下游收到之前的 notifyCheckpointComplete
> > 之间是没有必然联系的,所以这个从理论上是不保证先后顺序的。
> > Best,
> > Congxian
> >
> >
> > key lou  于2020年8月16日周日 下午9:27写道:
> >
> > > 各位大佬:
> > >在如下代码中: FCombine  执行snapshot  collect 发送数据之后如果不执行sleep 则  FSubmit
> > > 在执行 notifyCheckpointComplete 方法时,list 集合 ls 为空。
> > > 如果在  FCombine  执行snapshot  collect 发送数据之后如果执行sleep,
> > > 在执行 notifyCheckpointComplete 方法时 则就可以收到  snapshot  collect 发送的数据。
> > > 我之前的理解是每个算子在执行完checkpoint 之后 才会把 barrier 广播到下游算子。 所以觉得下游无论如何应该在执行
> > > notifyCheckpointComplete 之前就会收到 上游 snapshot  collect 发送数据(因为 snapshot
> > >  collect 在前,广播 barrier  在后,然后下游在收到了 barrier  才会执行 chekpoint
> > > 的相关方法,所以在执行相关方法前 上游 snapshot  collect 发出的数据就应该已经到达了下游)。
> > > 但是根据如下代码的测试来看,不是这样的。请大佬帮忙解答下原因。
> > >
> > > public class FlinkCheckpointTest {
> > > public static void main(String[] args) throws Exception {
> > > StreamExecutionEnvironment steamEnv =
> > > StreamExecutionEnvironment.getExecutionEnvironment();
> > > steamEnv.enableCheckpointing(1000L*2);
> > > steamEnv
> > > .addSource(new FSource()).setParallelism(4)
> > > .transform("开始事务", Types.STRING,new
> > FStart()).setParallelism(1)
> > > .process(new FCombine()).name("事务预处理").setParallelism(4)
> > > .addSink(new FSubmit()).name("提交事务").setParallelism(1)
> > > ;
> > > steamEnv.execute("test");
> > > }
> > >
> > >static class FSource extends RichParallelSourceFunction{
> > > @Override
> > > public void run(SourceContext sourceContext) throws
> > > Exception {
> > > int I =0;
> > > while (true){
> > > I = I + 1;
> > > sourceContext.collect("thread " +
> > > Thread.currentThread().getId() +"-" +I);
> > > Thread.sleep(1000);
> > > }
> > > }
> > > @Override
> > > public void cancel() {}
> > > }
> > >
> > > static class FStart extends AbstractStreamOperator
> > > implements OneInputStreamOperator{
> > >volatile Long ckid = 0L;
> > > @Override
> > > public void processElement(StreamRecord streamRecord)
> > > throws Exception {
> > > log("收到数据: " + streamRecord.getValue() + "..ckid:" + ckid);
> > > output.collect(streamRecord);
> > > }
> > > @Override
> > > public void prepareSnapshotPreBarrier(long checkpointId)
> > > throws Exception {
> > > log("开启事务: " + checkpointId);
> > > ckid = checkpointId;
> > > super.prepareSnapshotPreBarrier(checkpointId);
> > > }
> > > }
> > >
> > > static class FCombine extends ProcessFunction
> > > implements CheckpointedFunction {
> > > List ls = new ArrayList();
> > > Collector collector =null;
> > > volatile Long ckid = 0L;
> > >
> > > @Override
> > > public void snapshotState(FunctionSnapshotContext
> > > functionSnapshotContext) throws Exception {
> > > StringBuffer sb = new StringBuffer();
> > > ls.forEach(x->{sb.append(x).append(";");});
> > > log("批处理 " + functionSnapshotContext.getCheckpointId() +
> > > ": 时收到数据:" + sb.toString());
> > > Thread.sleep(5*1000);
> > > collector.collect(sb.toString());
> > > ls.clea

Re: Flink任务写入kafka 开启了EOS,statebackend为rocksdb,任务反压source端日志量堆积从cp恢复失败

2020-08-16 文章 Congxian Qiu
Hi
   JM/TM 日志如果是 OnYarn 模式,且开了了 log aggreagte 的话[1],应该是能够获取到这个日志的。
   据我所知,暂时没有已知问题会导致增量 checkpoint 不能恢复,如果你遇到的问题确定会导致 增量 checkpoint
恢复失败的话,可以考虑创建一个 Issue

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/yarn_setup.html#log-files
Best,
Congxian


Yang Peng  于2020年8月17日周一 上午11:22写道:

> 在我们自研的开发平台上提交任务用的detach模式,提交完之后就看不到其他日志了,这个问题当天出现了两次,是不是使用增量cp会存在这个恢复失败的情况
>
> Congxian Qiu  于2020年8月17日周一 上午10:39写道:
>
> > Hi
> >你还有失败作业的 JM 和 TM
> > 日志吗?如果有的话可以看一下这两个日志来确定为什么没有恢复成功。因为你说代码未作任何改变,然后恢复失败,这个还是比较奇怪的。
> > Best,
> > Congxian
> >
> >
> > Yang Peng  于2020年8月17日周一 上午10:25写道:
> >
> > > 好的 感谢
> > >
> > > JasonLee <17610775...@163.com> 于2020年8月14日周五 下午9:22写道:
> > >
> > > > hi
> > > >
> > > > 没有日志不太好定位失败的原因 但是没有设置uid的话 是有可能重启失败的 建议还是都设置uid最好
> > > >
> > > >
> > > >
> > > > --
> > > > Sent from: http://apache-flink.147419.n8.nabble.com/
> > >
> >
>


Re: flink1.9.1用采用-d(分离模式提交)作业报错,但是不加-d是可以正常跑的

2020-08-16 文章 Congxian Qiu
Hi
   如果我理解没错的话,是否添加 -d 会使用不同的模式启动作业(PerJob 和 Session
模式),从错误栈来看猜测是版本冲突了导致的,你有尝试过最新的 1.11 是否还有这个问题吗?
Best,
Congxian


bradyMk  于2020年8月14日周五 下午6:52写道:

> 请问大家:
> 我采用如下命令提交:
> flink run \
> -m yarn-cluster \
> -yn 3 \
> -ys 3 \
> -yjm 2048m \
> -ytm 2048m \
> -ynm flink_test \
> -d \
> -c net.realtime.app.FlinkTest ./hotmall-flink.jar
> 就会失败,报错信息如下:
> [AMRM Callback Handler Thread] ERROR
> org.apache.flink.yarn.YarnResourceManager - Fatal error occurred in
> ResourceManager.
> java.lang.NoSuchMethodError:
>
> org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest.newInstance(IFLjava/util/List;Ljava/util/List;Ljava/util/List;Lorg/apache/hadoop/yarn/api/records/ResourceBlacklistRequest;)Lorg/apache/hadoop/yarn/api/protocolrecords/AllocateRequest;
> at
>
> org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.allocate(AMRMClientImpl.java:279)
> at
>
> org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$HeartbeatThread.run(AMRMClientAsyncImpl.java:273)
> [AMRM Callback Handler Thread] ERROR
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Fatal error
> occurred
> in the cluster entrypoint.
> java.lang.NoSuchMethodError:
>
> org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest.newInstance(IFLjava/util/List;Ljava/util/List;Ljava/util/List;Lorg/apache/hadoop/yarn/api/records/ResourceBlacklistRequest;)Lorg/apache/hadoop/yarn/api/protocolrecords/AllocateRequest;
> at
>
> org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.allocate(AMRMClientImpl.java:279)
> at
>
> org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$HeartbeatThread.run(AMRMClientAsyncImpl.java:273)
> [flink-akka.actor.default-dispatcher-2] INFO
> org.apache.flink.yarn.YarnResourceManager - ResourceManager
> akka.tcp://flink@emr-worker-8.cluster-174460:33650/user/resourcemanager
> was
> granted leadership with fencing token 
> [BlobServer shutdown hook] INFO org.apache.flink.runtime.blob.BlobServer -
> Stopped BLOB server at 0.0.0.0:36247
> <
> http://apache-flink.147419.n8.nabble.com/file/t802/%E6%8D%95%E8%8E%B7.png>
>
> 但是我在提交命令时,不加-d,就可以正常提交运行;更奇怪的是,我运行另一个任务,加了-d参数,可以正常提交。
> 我这个提交失败的任务开始是用如下命令运行的:
> nohup flink run \
> -m yarn-cluster \
> -yn 3 \
> -ys 3 \
> -yjm 2048m \
> -ytm 2048m \
> -ynm flink_test \
> -c net.realtime.app.FlinkTest ./hotmall-flink.jar > /logs/flink.log 2>&1 &
>  > /logs/nohup.out 2>&1 &
>
> 在这个任务挂掉之后,再用-d的方式重启就会出现我开始说的问题,很奇怪,有大佬知道为什么么?
>
>
>
> -
> Best Wishes
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: 请教关于Flink算子FINISHED状态时无法保存Checkpoint的问题

2020-08-16 文章 Congxian Qiu
这个问题和下面这个问题[1] 重复了,在另外的邮件列表中已经有相关讨论

[1]
http://apache-flink.147419.n8.nabble.com/Flink-FINISHED-Checkpoint-td6008.html
Best,
Congxian


yulu yang  于2020年8月14日周五 下午1:05写道:

> 对了,我这个flink作业和和分组都是新创建,不存在抽取历史。
>
> 杨豫鲁  于2020年8月13日周四 下午3:33写道:
>
> > 请教大家一个我最近在配置Flink流的过程中遇到问题,
> >
> >
> flink作业中关联使用了物理表(字典表),在flinkjob启动后,会对字典表进行一次读取,然后该算子会变成FINISHED状态,导致该flinkjob无法保存checkpoint和savepoint。一般大家遇到这种问题都是怎么处理的,我这个作业在数据加工过程中必须用到字典表赋值。
> >
> >
> >
> >
> >
>


Re: 请教关于Flink算子FINISHED状态时无法保存Checkpoint的问题

2020-08-16 文章 Congxian Qiu
Hi  吴磊
请问你们有比较过使用 Redis 和 broadcast state 在你们场景下的区别吗?是什么原因让你们选择 Redis 而不是
BroadcastState 呢?

Best,
Congxian


吴磊  于2020年8月14日周五 下午3:39写道:

> 在我们的生产环境最常用的做法都是通过维表关联的方式进行赋值的;
> 或者可以先将字典数据写进redis,然后再在第一次使用的时候去访问redis,并加载到State中。
>
>
>
>
> --原始邮件--
> 发件人:
>   "user-zh"
> <
> yj5...@gmail.com;
> 发送时间:2020年8月13日(星期四) 中午1:49
> 收件人:"user-zh"
> 主题:请教关于Flink算子FINISHED状态时无法保存Checkpoint的问题
>
>
>
> 请教大佬一个我最近在配置Flink流的过程中遇到问题,
>
> flink作业中关联使用了物理表(字典表),在flinkjob启动后,会对字典表进行一次读取,然后该算子会变成FINISHED状态,导致该flinkjob无法保存checkpoint和savepoint。一般大家遇到这种问题都是怎么处理的,我这个作业在数据加工过程中必须用到字典表赋值。


Re: 在sink 执行notifyCheckpointComplete 方法时能否收到上游 snapshot collect 发送的数据?

2020-08-16 文章 Congxian Qiu
Hi
上游 snapshot 的逻辑和下游收到之前的 notifyCheckpointComplete
之间是没有必然联系的,所以这个从理论上是不保证先后顺序的。
Best,
Congxian


key lou  于2020年8月16日周日 下午9:27写道:

> 各位大佬:
>在如下代码中: FCombine  执行snapshot  collect 发送数据之后如果不执行sleep 则  FSubmit
> 在执行 notifyCheckpointComplete 方法时,list 集合 ls 为空。
> 如果在  FCombine  执行snapshot  collect 发送数据之后如果执行sleep,
> 在执行 notifyCheckpointComplete 方法时 则就可以收到  snapshot  collect 发送的数据。
> 我之前的理解是每个算子在执行完checkpoint 之后 才会把 barrier 广播到下游算子。 所以觉得下游无论如何应该在执行
> notifyCheckpointComplete 之前就会收到 上游 snapshot  collect 发送数据(因为 snapshot
>  collect 在前,广播 barrier  在后,然后下游在收到了 barrier  才会执行 chekpoint
> 的相关方法,所以在执行相关方法前 上游 snapshot  collect 发出的数据就应该已经到达了下游)。
> 但是根据如下代码的测试来看,不是这样的。请大佬帮忙解答下原因。
>
> public class FlinkCheckpointTest {
> public static void main(String[] args) throws Exception {
> StreamExecutionEnvironment steamEnv =
> StreamExecutionEnvironment.getExecutionEnvironment();
> steamEnv.enableCheckpointing(1000L*2);
> steamEnv
> .addSource(new FSource()).setParallelism(4)
> .transform("开始事务", Types.STRING,new FStart()).setParallelism(1)
> .process(new FCombine()).name("事务预处理").setParallelism(4)
> .addSink(new FSubmit()).name("提交事务").setParallelism(1)
> ;
> steamEnv.execute("test");
> }
>
>static class FSource extends RichParallelSourceFunction{
> @Override
> public void run(SourceContext sourceContext) throws
> Exception {
> int I =0;
> while (true){
> I = I + 1;
> sourceContext.collect("thread " +
> Thread.currentThread().getId() +"-" +I);
> Thread.sleep(1000);
> }
> }
> @Override
> public void cancel() {}
> }
>
> static class FStart extends AbstractStreamOperator
> implements OneInputStreamOperator{
>volatile Long ckid = 0L;
> @Override
> public void processElement(StreamRecord streamRecord)
> throws Exception {
> log("收到数据: " + streamRecord.getValue() + "..ckid:" + ckid);
> output.collect(streamRecord);
> }
> @Override
> public void prepareSnapshotPreBarrier(long checkpointId)
> throws Exception {
> log("开启事务: " + checkpointId);
> ckid = checkpointId;
> super.prepareSnapshotPreBarrier(checkpointId);
> }
> }
>
> static class FCombine extends ProcessFunction
> implements CheckpointedFunction {
> List ls = new ArrayList();
> Collector collector =null;
> volatile Long ckid = 0L;
>
> @Override
> public void snapshotState(FunctionSnapshotContext
> functionSnapshotContext) throws Exception {
> StringBuffer sb = new StringBuffer();
> ls.forEach(x->{sb.append(x).append(";");});
> log("批处理 " + functionSnapshotContext.getCheckpointId() +
> ": 时收到数据:" + sb.toString());
> Thread.sleep(5*1000);
> collector.collect(sb.toString());
> ls.clear();
> Thread.sleep(5*1000);
> //Thread.sleep(20*1000);
> }
> @Override
> public void initializeState(FunctionInitializationContext
> functionInitializationContext) throws Exception {}
> @Override
> public void processElement(String s, Context context,
> Collector out) throws Exception {
> if(StringUtils.isNotBlank(s)){
> ls.add(s);
> }
> log("收到数据 :" + s + "; 这批数据大小为:" + ls.size() + "..ckid:" +
> ckid);
> if(collector ==null){
> collector = out;
> }
> }
> }
>
> static class FSubmit extends RichSinkFunction implements
> /*  CheckpointedFunction,*/ CheckpointListener {
> List ls = new ArrayList();
> volatile Long ckid = 0L;
> @Override
> public void notifyCheckpointComplete(long l) throws Exception {
> ckid = l;
> StringBuffer sb = new StringBuffer();
> ls.forEach(x->{sb.append(x).append("||");});
> log("submit checkpoint " + l + " over data:list size" +
> ls.size()+ "; detail" + sb.toString());
> ls.clear();
> }
> @Override
> public void invoke(String value, Context context) throws Exception
> {
> if(StringUtils.isNotBlank(value)){
> ls.add(value);
> }
> log("收到数据 :" + value + " list zie:" + ls.size() + "..ckid:" +
> ckid);
> }
> }
> public static void log(String s){
> String name = Thread.currentThread().getName();
> System.out.println(new SimpleDateFormat("HH:mm:ss").format(new
> Date())+":"+name + ":" + s);
> }
> }
>


Re: Flink任务写入kafka 开启了EOS,statebackend为rocksdb,任务反压source端日志量堆积从cp恢复失败

2020-08-16 文章 Congxian Qiu
Hi
   你还有失败作业的 JM 和 TM
日志吗?如果有的话可以看一下这两个日志来确定为什么没有恢复成功。因为你说代码未作任何改变,然后恢复失败,这个还是比较奇怪的。
Best,
Congxian


Yang Peng  于2020年8月17日周一 上午10:25写道:

> 好的 感谢
>
> JasonLee <17610775...@163.com> 于2020年8月14日周五 下午9:22写道:
>
> > hi
> >
> > 没有日志不太好定位失败的原因 但是没有设置uid的话 是有可能重启失败的 建议还是都设置uid最好
> >
> >
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: flink1.11.1 flink on yarn 任务启动报错

2020-08-13 文章 Congxian Qiu
Hi

   这应该是个已知问题[1] 在 1.11.2 和 1.12 中已经修复

[1] https://issues.apache.org/jira/browse/FLINK-18710
Best,
Congxian


郭华威  于2020年8月13日周四 上午11:05写道:

> 你好,请教下:
> flink1.11.1 flink on yarn 任务启动报错:
>
>
> 启动命令:
> /opt/flink-1.11.1/bin/flink  run  -p 4 -ys 2 -m yarn-cluster -c
> yueworld.PVUV.MyPvUv  /mywork/flink/flink_1.11.1-1.0.jar
>
>
> 报错信息:
>
>
> 2020-08-13 10:53:08,160 ERROR
> org.apache.flink.runtime.rest.handler.taskmanager.TaskManagersHandler [] -
> Unhandled exception.
> org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException: Failed to
> serialize the result for RPC call : requestTaskManagerInfo.
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.serializeRemoteResultAndVerifySize(AkkaRpcActor.java:368)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$sendAsyncResponse$0(AkkaRpcActor.java:335)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
> ~[?:1.8.0_221]
> at
> java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:778)
> ~[?:1.8.0_221]
> at
> java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2140)
> ~[?:1.8.0_221]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.sendAsyncResponse(AkkaRpcActor.java:329)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:298)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
> Caused by: java.io.NotSerializableException:
> org.apache.flink.runtime.rest.messages.ResourceProfileInfo
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
> ~[?:1.8.0_221]
> at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> ~[?:1.8.0_221]
> at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> ~[?:1.8.0_221]
> at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> ~[?:1.8.0_221]
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> ~[?:1.8.0_221]
> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> ~[?:1.8.0_221]
> at java.util.ArrayList.writeObject(ArrayList.java:766) ~[?:1.8.0_221]
> at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source) ~[?:?]
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> ~[?:1.8.0_221]
> at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_221]
> at
> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1140)
> ~[?:1.8.0_221]
> at
> 

Re: 请教关于Flink算子FINISHED状态时无法保存Checkpoint的问题

2020-08-13 文章 Congxian Qiu
Hi
   不好意思,上一份邮件没有说完就发送出去了。
   如果你希望把从其他地方读入 字典表,然后在 flink 中使用,或许可以看看 broadcast state[1]

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/state/broadcast_state.html
Best,
Congxian


Congxian Qiu  于2020年8月13日周四 下午2:00写道:

> Hi
> 现在 checkpoint/savepoint 需要所有算子都处于 RUNNING 状态,不过社区也有一些 issue
> 希望能够优化这个问题[1][2]
>
> [1] https://issues.apache.org/jira/browse/FLINK-2491
> [2] https://issues.apache.org/jira/browse/FLINK-18263
> Best,
> Congxian
>
>
> yulu yang  于2020年8月13日周四 下午1:49写道:
>
>> 请教大佬一个我最近在配置Flink流的过程中遇到问题,
>>
>> flink作业中关联使用了物理表(字典表),在flinkjob启动后,会对字典表进行一次读取,然后该算子会变成FINISHED状态,导致该flinkjob无法保存checkpoint和savepoint。一般大家遇到这种问题都是怎么处理的,我这个作业在数据加工过程中必须用到字典表赋值。
>>
>


Re: 请教关于Flink算子FINISHED状态时无法保存Checkpoint的问题

2020-08-13 文章 Congxian Qiu
Hi
现在 checkpoint/savepoint 需要所有算子都处于 RUNNING 状态,不过社区也有一些 issue
希望能够优化这个问题[1][2]

[1] https://issues.apache.org/jira/browse/FLINK-2491
[2] https://issues.apache.org/jira/browse/FLINK-18263
Best,
Congxian


yulu yang  于2020年8月13日周四 下午1:49写道:

> 请教大佬一个我最近在配置Flink流的过程中遇到问题,
>
> flink作业中关联使用了物理表(字典表),在flinkjob启动后,会对字典表进行一次读取,然后该算子会变成FINISHED状态,导致该flinkjob无法保存checkpoint和savepoint。一般大家遇到这种问题都是怎么处理的,我这个作业在数据加工过程中必须用到字典表赋值。
>


Re: Flink 1.10 堆外内存一直在增加

2020-08-11 文章 Congxian Qiu
Hi
   你能拿到 memory 的 dump 吗?OOM 可能需要看一下 memory 的 dump 才能更好的确定是什么问题
Best,
Congxian


ReignsDYL <1945627...@qq.com> 于2020年8月11日周二 下午4:01写道:

> 各位好,
>   Flink 1.10,集群在运行过程中,堆外内存一直在不断增加,内存就被慢慢耗尽,导致任务挂掉,请问是什么原因啊?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 一系列关于基于状态重启任务的问题

2020-08-10 文章 Congxian Qiu
hi
   1 checkpoint/savepoint 可以理解为将 状态备份到远程存储,恢复的时候会通过  operator 的 uid 来恢复
state,如果你确定不希望某些 operator 的 state 不进行恢复的话,或者使用不同的 uid
可以达到你的需求,具体的可以看一下这个文档的内容[1]
   2 合并的时候如果想把 savepoint/checkpoint 用起来,还是需要修改 checkpoint/savepoint
的内容,或者你可以试试 state processor api[2]

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/savepoints.html#savepoints
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/libs/state_processor_api.html
Best,
Congxian


Zhao,Yi(SEC)  于2020年8月11日周二 上午11:55写道:

> 请教几个关于基于状态重启的问题。
> 问题1:基于检查点/保存点启动时候能否指定部分结点不使用状态。
> 为什么有这么个需求呢,下面说下背景。
> 任务A:5分钟粒度的统计PV,使用event time,每10s一次触发更新到数据库。
> 任务B:天级别任务,利用了状态。
>
> 如上任务A和B,我整合为一个大任务提交到flink执行。假设有某种场景下,某些数据错误等,我需要做修复等。并且修复方案需要能做到:从指定时间开始运行(这个是我基于kafkaSouce设置开始时间实现),同时配合一个时间范围过滤算子实现。但是flink如果基于状态重启,则kafkaSouce的offset会基于状态中的offset来做,而不是我配置的开始时间来做。但我又不能不基于状态重启,因为还有任务B是不可容忍丢失状态的。
>
>
> 这种情况怎么搞呢?当然通过flink提供的状态操作API去修改状态可能是一种方式,但感觉成本挺高。或者从保存下的保存点/检查点的路径来看,有没有可能从名字看出哪个状态文件是哪个结点的呢?我能否简单找到kafkaSouce结点的状态文件删除,并且配合flink提供的—allowNonRestoredState实现KafkaSouce不基于状态重启,而其他结点基于状态重启呢?当然也不清楚即使这可行,那么这种情况下KafkaSource是否会按照我设置的开始时间去消费。
>
> 问题2:任务合并或拆分问题。
> 拆分:
>
> 仍然假设有任务A和任务B,放在同一个JOB中。如果业务需要拆分开,这个相对容易实现。我只需要做个保存点。然后启动基于保存点任务A(配合—allowNonRestoredState,任务B的状态会被忽略)。再然后启动任务B(配合—allowNonRestoredState,任务A的状态会被忽略)。
> 合并:
>
> 问题来了,合并case怎么做。任务A和任务B如想合并怎么做呢?还是之前那个想法,状态文件结构是否可以直接合并到一起呢?比如任务的保存点文件夹和任务B的保存点文件夹合并后是否可以直接被用?
> 当然我并不清楚检查点和保存点的保存文件夹中文件命名的含义是否是结点uid啥的。是否可行呢?
>


Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

2020-08-09 文章 Congxian Qiu
lease-1.11/zh/ops/state/state_backends.html#%E5%A2%9E%E9%87%8F%E5%BF%AB%E7%85%A7
> 
> <https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/state/state_backends.html#%E5%A2%9E%E9%87%8F%E5%BF%AB%E7%85%A7>;
>  gt
>   <
> 
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/state/state_backends.html#%E5%A2%9E%E9%87%8F%E5%BF%AB%E7%85%A7gt
> 
> <https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/state/state_backends.html#%E5%A2%9E%E9%87%8F%E5%BF%AB%E7%85%A7gt>;
> ;
>   Best,
>   gt; Congxian
>   gt;
>   gt;
>   gt; op <520075...@qq.comamp;gt; 于2020年8月3日周一 下午2:18写道:
>   gt;
>   gt; amp;gt; amp;amp;nbsp; amp;amp;nbsp;
>   gt; amp;gt;
>   同问,我也遇到了状态越来越大的情况,使用的是1.11.0版本,用hdfs保存checkpoint,checkpoint间隔3分钟,
>   gt; amp;gt; 逻辑是按照 事件day 和 id 进行groupby
>   gt; amp;gt;
> 然后有十几个聚合指标,运行了7天左右,状态一直在增加,设置了失效时间,然后watermark看着也正常在走
>   gt; amp;gt;
> tConfig.setIdleStateRetentionTime(Time.minutes(1440),
>   gt; amp;gt; Time.minutes(1440+10))
>   gt; amp;gt;
>   gt; amp;gt;
>   gt; amp;gt;
>   gt; amp;gt;
>   gt; amp;gt;
>  
> --amp;amp;nbsp;原始邮件amp;amp;nbsp;--
>   gt; amp;gt; 发件人:
>   gt;
>  
> 
> amp;gt;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;
>   gt; amp;nbsp; "user-zh"
>   gt;
>  
> 
> amp;gt;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;
>   gt; amp;nbsp; <
>   gt; amp;gt; 384939...@qq.comamp;amp;gt;;
>   gt; amp;gt; 发送时间:amp;amp;nbsp;2020年8月3日(星期一)
> 中午1:50
>   gt; amp;gt; 收件人:amp;amp;nbsp;"user-zh"<
> user-zh@flink.apache.org
>   amp;amp;gt;;
>   gt; amp;gt;
>   gt; amp;gt; 主题:amp;amp;nbsp;Re:
> flink1.10.1/1.11.1 使用sql 进行group 和
>  时间窗口
>   操作后 状态越来越大
>   gt; amp;gt;
>   gt; amp;gt;
>   gt; amp;gt;
>   gt; amp;gt; hi,您好:
>   gt; amp;gt; 我改回增量模式重新收集了一些数据:
>   gt; amp;gt; 1、数据处理速度:3000条每秒,是测试环境的,压力比较稳定,几乎没有波动
>   gt; amp;gt; 2、checkpoint是interval设置的是5秒
>   gt; amp;gt; 3、目前这个作业是每分钟一个窗口
>   gt; amp;gt; 4、并行度设置的1,使用on-yarn模式
>   gt; amp;gt;
>   gt; amp;gt; 刚启动的时候,如下:
>   gt; amp;gt; <
>  
> http://apache-flink.147419.n8.nabble.com/file/t793/6.pngamp;amp;gt;
>   gt; amp;gt;
>   gt; amp;gt; 18分钟后,如下:
>   gt; amp;gt; <
>  
> http://apache-flink.147419.n8.nabble.com/file/t793/9.pngamp;amp;gt;
>   gt; amp;gt;
>   gt; amp;gt; checkpoints设置:
>   gt; amp;gt; <
>  
> http://apache-flink.147419.n8.nabble.com/file/t793/conf.pngamp;amp;gt
> ;
>   gt; amp;gt;
>   gt; amp;gt; hdfs上面大小:
>   gt; amp;gt; <
>  
> http://apache-flink.147419.n8.nabble.com/file/t793/hdfs.pngamp;amp;gt
> ;
>   gt; amp;gt;
>   gt; amp;gt; 页面上看到的大小:
>   gt; amp;gt; <
>   gt;
>  
> 
> http://apache-flink.147419.n8.nabble.com/file/t793/checkpoinsts1.pngamp;amp;gt
> 
> <http://apache-flink.147419.n8.nabble.com/file/t793/checkpoinsts1.pngamp;amp;gt>;
>  gt
>   <
> 
> http://apache-flink.147419.n8.nabble.com/file/t793/checkpoinsts1.pngamp;amp;gtgt
> 
> <http://apache-flink.147419.n8.nabble.com/file/t793/checkpoinsts1.pngamp;amp;gtgt>;
> ;
>   ;
>   gt; amp;gt;
>   gt; amp;gt;
>   gt; amp;gt; Congxian Qiu wrote
>   gt; amp;gt; amp;amp;gt;
> Hiamp;amp;nbsp;amp;amp;nbsp; 鱼子酱
>   gt; amp;gt;
>  
> amp;amp;gt;amp;amp;nbsp;amp;amp;nbsp;amp;amp;nbsp;amp;amp;nbsp;
>   能否把在使用增量 checkpoint
>   gt; 的模式下,截图看一下 checkpoint
>   gt; amp;gt; size 的走势呢?另外可以的话,也麻烦你在每次
>   gt; amp;gt; amp;amp;gt; checkpoint 做完之后,到 hdfs 上
> ls 一下 checkpoint
>  目录的大小。
>   gt; amp;gt;
>  
> amp;amp;gt;amp;amp;nbsp;amp;amp;nbsp;amp;amp;nbsp;amp;amp;nbsp;
>   gt; 另外有一个问题还需要回答一下,你的处理速度大概是多少,state 的更新频率能否评估一下呢?
>   gt; amp;gt; amp;amp;gt;
>   gt; amp;gt; amp;amp;gt; Best,
>   gt; amp;gt; amp;amp;gt; Congxian
>   gt; amp;gt; amp;amp;gt;
>   gt; amp;gt; amp;amp;gt;
>   gt; amp

Re: Flink 1.10 on Yarn

2020-08-09 文章 Congxian Qiu
Hi xuhaiLong
   请问你这个作业在这个版本是是必然出现 NPE 问题吗?另外 1.10 之前的版本有出现过这个问题吗?
Best,
Congxian


xuhaiLong  于2020年8月7日周五 下午3:14写道:

> 感谢回复!我这边的确是这个bug 引起的
>
>
> On 8/7/2020 13:43,chenkaibit wrote:
> hi xuhaiLong,看日志发生的 checkpoint nullpointer 是个已知的问题,具体可以查看下面两个jira。
> 你用的jdk版本是多少呢?目前发现使用 jdk8_40/jdk8_60 + flink-1.10 会出现 checkpoint
> nullpointer,可以把jdk升级下版本试一下
> https://issues.apache.org/jira/browse/FLINK-18196
> https://issues.apache.org/jira/browse/FLINK-17479
>
>
>
>
> 在 2020-08-07 12:50:23,"xuhaiLong"  写道:
>
> sorry,我添加错附件了
>
>
> 是的,taskmanager.memory.jvm-metaspace.size 为默认配置
> On 8/7/2020 11:43,Yangze Guo wrote:
> 日志没有贴成功,taskmanager.memory.jvm-metaspace.size目前是默认配置么?
>
> Best,
> Yangze Guo
>
> On Fri, Aug 7, 2020 at 11:38 AM xuhaiLong  wrote:
>
>
>
> Hi
>
>
> 场景:1 tm 三个slot,run了三个job
>
>
> 三个job 运行的时候 出现了 ck 过程中空指针异常,导致任务一致重启。最终导致`Metaspace` 空间占满,出现
> `java.lang.OutOfMemoryError: Metaspace. The metaspace out-of-memory error
> has occurred. This can mean two things: either the job requires a larger
> size of JVM metaspace to load classes or there is a class loading leak. In
> the first case 'taskmanager.memory.jvm-metaspace.size' configuration option
> should be increased. If the error persists (usually in cluster after
> several job (re-)submissions) then there is probably a class loading leak
> which has to be investigated and fixed. The task executor has to be
> shutdown...
> `
>
>
> 附件为部分异常信息
>
>
> 疑问:
> 1. 为什么会在 ck 时候出现空指针?(三个 job 为同一个 kafka topic,通过ck 恢复 job 可以正常运行,应该不是数据的问题)
> 2. 通过日志看,是可以重启的,为什么自动重启后还存在这个问题,导致一直重启?
>
>
> 感谢~~~
> 从网易邮箱大师发来的云附件
> 08-07error.txt(730.4KB,2020年8月22日 11:37 到期)
> 下载
>


Re: flink timerservice注册的timer定时器只有少部分触发

2020-08-06 文章 Congxian Qiu
Hi
   对于 event time 的处理来说,不建议注册 timer 的时候使用 System.currentTimeMillis()
这种系统时间,这两个时间可能会不一样,可以使用 TimerService 中的 currentWatermark 表示当前的 event time

Best,
Congxian


jsqf  于2020年8月6日周四 下午9:53写道:

> 试试 重写 onTimer 方法
> 可以参考
>
> https://github.com/JSQF/flink10_learn/blob/master/src/main/scala/com/yyb/flink10/DataStream/ProcessFunction/OperatorProcessFunctionDemo.java
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

2020-08-05 文章 Congxian Qiu
; >  gt;
> >  gt; 主题:amp;nbsp;Re: flink1.10.1/1.11.1 使用sql 进行group 和
> 时间窗口
> > 操作后 状态越来越大
> >  gt;
> >  gt;
> >  gt;
> >  gt; hi,您好:
> >  gt; 我改回增量模式重新收集了一些数据:
> >  gt; 1、数据处理速度:3000条每秒,是测试环境的,压力比较稳定,几乎没有波动
> >  gt; 2、checkpoint是interval设置的是5秒
> >  gt; 3、目前这个作业是每分钟一个窗口
> >  gt; 4、并行度设置的1,使用on-yarn模式
> >  gt;
> >  gt; 刚启动的时候,如下:
> >  gt; <
> > http://apache-flink.147419.n8.nabble.com/file/t793/6.pngamp;gt;
> >  gt;
> >  gt; 18分钟后,如下:
> >  gt; <
> > http://apache-flink.147419.n8.nabble.com/file/t793/9.pngamp;gt;
> >  gt;
> >  gt; checkpoints设置:
> >  gt; <
> > http://apache-flink.147419.n8.nabble.com/file/t793/conf.pngamp;gt;
> >  gt;
> >  gt; hdfs上面大小:
> >  gt; <
> > http://apache-flink.147419.n8.nabble.com/file/t793/hdfs.pngamp;gt;
> >  gt;
> >  gt; 页面上看到的大小:
> >  gt; <
> > 
> >
> http://apache-flink.147419.n8.nabble.com/file/t793/checkpoinsts1.pngamp;gt
> > 
> > <
> http://apache-flink.147419.n8.nabble.com/file/t793/checkpoinsts1.pngamp;gt
> >;
> > ;
> >  gt;
> >  gt;
> >  gt; Congxian Qiu wrote
> >  gt; amp;gt; Hiamp;nbsp;amp;nbsp; 鱼子酱
> >  gt;
> > amp;gt;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;
> > 能否把在使用增量 checkpoint
> >  的模式下,截图看一下 checkpoint
> >  gt; size 的走势呢?另外可以的话,也麻烦你在每次
> >  gt; amp;gt; checkpoint 做完之后,到 hdfs 上 ls 一下 checkpoint
> 目录的大小。
> >  gt;
> > amp;gt;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;
> >  另外有一个问题还需要回答一下,你的处理速度大概是多少,state 的更新频率能否评估一下呢?
> >  gt; amp;gt;
> >  gt; amp;gt; Best,
> >  gt; amp;gt; Congxian
> >  gt; amp;gt;
> >  gt; amp;gt;
> >  gt; amp;gt; 鱼子酱 <
> >  gt;
> >  gt; amp;gt; 384939718@
> >  gt;
> >  gt; amp;gt;amp;gt; 于2020年7月30日周四 上午10:43写道:
> >  gt; amp;gt;
> >  gt; amp;gt;amp;gt; 感谢!
> >  gt; amp;gt;amp;gt;
> >  gt; amp;gt;amp;gt;
> > flink1.11.1版本里面,我尝试了下面两种backend,目前运行了20多个小时,
> >  gt; amp;gt;amp;gt; 能够看到状态的大小在一个区间内波动,没有发现一直增长的情况了。
> >  gt; amp;gt;amp;gt; StateBackend backend =new
> >  gt; amp;gt;amp;gt;
> >  gt; amp;gt;amp;gt;
> >  gt;
> > 
> >
> RocksDBStateBackend("hdfs:///checkpoints-data/"+yamlReader.getValueByKey("jobName").toString()+"/",false);
> >  gt; amp;gt;amp;gt; StateBackend backend =new
> >  gt; amp;gt;amp;gt;
> >  gt; amp;gt;amp;gt;
> >  gt;
> > 
> >
> FsStateBackend("hdfs:///checkpoints-data/"+yamlReader.getValueByKey("jobName").toString()+"/",false);
> >  gt; amp;gt;amp;gt;
> >  gt; amp;gt;amp;gt;
> >  gt; amp;gt;amp;gt;
> > 这样看,有可能是RocksDBStateBackend增量模式这边可能存在一些问题。
> >  gt; amp;gt;amp;gt; RocksDBStateBackend:
> >  gt; amp;gt;amp;gt; amp;amp;lt;
> >  gt;
> >
> http://apache-flink.147419.n8.nabble.com/file/t793/444.pngamp;amp;gt
> > 
> > <
> http://apache-flink.147419.n8.nabble.com/file/t793/444.pngamp;amp;gt
> >;
> > ;
> >  gt; amp;gt;amp;gt; FsStateBackend:
> >  gt; amp;gt;amp;gt; amp;amp;lt;
> >  gt;
> >
> http://apache-flink.147419.n8.nabble.com/file/t793/555.pngamp;amp;gt
> > 
> > <
> http://apache-flink.147419.n8.nabble.com/file/t793/555.pngamp;amp;gt
> >;
> > ;
> >  gt; amp;gt;amp;gt;
> >  gt; amp;gt;amp;gt;
> >  gt; amp;gt;amp;gt;
> >  gt; amp;gt;amp;gt;
> >  gt; amp;gt;amp;gt; --
> >  gt; amp;gt;amp;gt; Sent from:
> > http://apache-flink.147419.n8.nabble.com/
> >  <http://apache-flink.147419.n8.nabble.com/>; gt <
> > http://apache-flink.147419.n8.nabble.com/gt;;
> > amp;gt;amp;gt;
> >  gt;
> >  gt;
> >  gt;
> >  gt;
> >  gt;
> >  gt; --
> >  gt; Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

2020-08-05 文章 Congxian Qiu
Hi
  RocksDB StateBackend 只需要在 flink-conf 中进行一下配置就行了[1].

  另外从你前面两份邮件看,我有些信息比较疑惑,你能否贴一下现在使用的 flink-conf,以及 checkpoint UI 的截图,以及 HDFS
上 checkpoint 目录的截图

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/state/state_backends.html#%E8%AE%BE%E7%BD%AE-state-backend

Best,
Congxian


op <520075...@qq.com> 于2020年8月5日周三 下午4:03写道:

> 你好,ttl配置是
> val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
> val tableEnv = StreamTableEnvironment.create(bsEnv, settings)
> val tConfig = tableEnv.getConfig
> tConfig.setIdleStateRetentionTime(Time.minutes(1440), Time.minutes(1450))
>
>
>   1)目前是有3个任务都是这种情况
>   2)目前集群没有RocksDB环境
> 谢谢
> --原始邮件--
> 发件人:
>   "user-zh"
> <
> qcx978132...@gmail.com;
> 发送时间:2020年8月5日(星期三) 下午3:30
> 收件人:"user-zh"
> 主题:Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大
>
>
>
> Hi op
>  这个情况比较奇怪。我想确认下:
>  1)你所有作业都遇到 checkpoint size 不断变大的情况,还是只有这个类型的作业遇到这个问题呢?
>  2)是否尝试过 RocksDBStateBackend 呢(全量和增量)?情况如何呢
>
>  另外,你 TTL 其他的配置是怎么设置的呢?
>
> 从原理上来说,checkpoint 就是 state 的一个快照,如果 checkpoint 越来越大,那么就是 state 越来越多。
> Best,
> Congxian
>
>
> op <520075...@qq.com 于2020年8月5日周三 下午2:46写道:
>
>  nbsp; nbsp;
> 
> 你好,我使用的是FsStateBackendnbsp;状态后端,调到5分钟也是一样,看了下checkpoint花费的时间都在300ms左右,我们的业务数据量每天基本一样,
>  nbsp;
> nbsp;设置空闲状态清理时间为1440minute,按道理运行一天以后状态大小会趋于平稳,但是目前运行了5天,
>  nbsp; nbsp;观察到的checkpoint shared 目录大小一直在增加,也确认过group
>  by的key只会在处理当天出现,就是说这天的状态当天过后就会处于空闲状态,
>  nbsp; nbsp;运行5天能满足清理条件
> 
> 
> 
> 
>  -- 原始邮件 --
>  发件人:
> 
> "user-zh"
> 
> <
>  qcx978132...@gmail.comgt;;
>  发送时间:nbsp;2020年8月3日(星期一) 下午5:50
>  收件人:nbsp;"user-zh" 
>  主题:nbsp;Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大
> 
> 
> 
>  Hi
>  nbsp;nbsp; 能否把 checkpoint 的 interval 调长一点再看看是否稳定呢?从 shared
>  目录的数据量看,有增长,后续基本持平。现在
>  Checkpointed Data Size 是增量的大小[1],而不是整个 checkpoint 的数据量的大小,如果
> checkpoint
>  之间,数据改动很多的话,这个值会变大
> 
>  [1]
> 
> 
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/state/state_backends.html#%E5%A2%9E%E9%87%8F%E5%BF%AB%E7%85%A7
> 
> <https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/state/state_backends.html#%E5%A2%9E%E9%87%8F%E5%BF%AB%E7%85%A7>;
> Best,
>  Congxian
> 
> 
>  op <520075...@qq.comgt; 于2020年8月3日周一 下午2:18写道:
> 
>  gt; amp;nbsp; amp;nbsp;
>  gt;
> 同问,我也遇到了状态越来越大的情况,使用的是1.11.0版本,用hdfs保存checkpoint,checkpoint间隔3分钟,
>  gt; 逻辑是按照 事件day 和 id 进行groupby
>  gt; 然后有十几个聚合指标,运行了7天左右,状态一直在增加,设置了失效时间,然后watermark看着也正常在走
>  gt; tConfig.setIdleStateRetentionTime(Time.minutes(1440),
>  gt; Time.minutes(1440+10))
>  gt;
>  gt;
>  gt;
>  gt;
>  gt;
> --amp;nbsp;原始邮件amp;nbsp;--
>  gt; 发件人:
> 
> gt;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;
>  nbsp; "user-zh"
> 
> gt;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;
>  nbsp; <
>  gt; 384939...@qq.comamp;gt;;
>  gt; 发送时间:amp;nbsp;2020年8月3日(星期一) 中午1:50
>  gt; 收件人:amp;nbsp;"user-zh" amp;gt;;
>  gt;
>  gt; 主题:amp;nbsp;Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口
> 操作后 状态越来越大
>  gt;
>  gt;
>  gt;
>  gt; hi,您好:
>  gt; 我改回增量模式重新收集了一些数据:
>  gt; 1、数据处理速度:3000条每秒,是测试环境的,压力比较稳定,几乎没有波动
>  gt; 2、checkpoint是interval设置的是5秒
>  gt; 3、目前这个作业是每分钟一个窗口
>  gt; 4、并行度设置的1,使用on-yarn模式
>  gt;
>  gt; 刚启动的时候,如下:
>  gt; <
> http://apache-flink.147419.n8.nabble.com/file/t793/6.pngamp;gt;
>  gt;
>  gt; 18分钟后,如下:
>  gt; <
> http://apache-flink.147419.n8.nabble.com/file/t793/9.pngamp;gt;
>  gt;
>  gt; checkpoints设置:
>  gt; <
> http://apache-flink.147419.n8.nabble.com/file/t793/conf.pngamp;gt;
>  gt;
>  gt; hdfs上面大小:
>  gt; <
> http://apache-flink.147419.n8.nabble.com/file/t793/hdfs.pngamp;gt;
>  gt;
>  gt; 页面上看到的大小:
>  gt; <
> 
> http://apache-flink.147419.n8.nabble.com/file/t793/checkpoinsts1.pngamp;gt
> 
> <http://apache-flink.147419.n8.nabble.com/file/t793/checkpoinsts1.pngamp;gt>;
> ;
>  gt;
>  gt;
>  gt; Congxian Qiu wrote
>  gt; amp;gt; H

Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

2020-08-05 文章 Congxian Qiu
Hi op
   这个情况比较奇怪。我想确认下:
   1)你所有作业都遇到 checkpoint size 不断变大的情况,还是只有这个类型的作业遇到这个问题呢?
   2)是否尝试过 RocksDBStateBackend 呢(全量和增量)?情况如何呢

   另外,你 TTL 其他的配置是怎么设置的呢?

从原理上来说,checkpoint 就是 state 的一个快照,如果 checkpoint 越来越大,那么就是 state 越来越多。
Best,
Congxian


op <520075...@qq.com> 于2020年8月5日周三 下午2:46写道:

>  
> 你好,我使用的是FsStateBackend状态后端,调到5分钟也是一样,看了下checkpoint花费的时间都在300ms左右,我们的业务数据量每天基本一样,
>  设置空闲状态清理时间为1440minute,按道理运行一天以后状态大小会趋于平稳,但是目前运行了5天,
>  观察到的checkpoint shared 目录大小一直在增加,也确认过group
> by的key只会在处理当天出现,就是说这天的状态当天过后就会处于空闲状态,
>  运行5天能满足清理条件
>
>
>
>
> -- 原始邮件 --
> 发件人:
>   "user-zh"
> <
> qcx978132...@gmail.com;
> 发送时间:2020年8月3日(星期一) 下午5:50
> 收件人:"user-zh"
> 主题:Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大
>
>
>
> Hi
>  能否把 checkpoint 的 interval 调长一点再看看是否稳定呢?从 shared
> 目录的数据量看,有增长,后续基本持平。现在
> Checkpointed Data Size 是增量的大小[1],而不是整个 checkpoint 的数据量的大小,如果 checkpoint
> 之间,数据改动很多的话,这个值会变大
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/state/state_backends.html#%E5%A2%9E%E9%87%8F%E5%BF%AB%E7%85%A7
> Best,
> Congxian
>
>
> op <520075...@qq.com 于2020年8月3日周一 下午2:18写道:
>
>  nbsp; nbsp;
>  同问,我也遇到了状态越来越大的情况,使用的是1.11.0版本,用hdfs保存checkpoint,checkpoint间隔3分钟,
>  逻辑是按照 事件day 和 id 进行groupby
>  然后有十几个聚合指标,运行了7天左右,状态一直在增加,设置了失效时间,然后watermark看着也正常在走
>  tConfig.setIdleStateRetentionTime(Time.minutes(1440),
>  Time.minutes(1440+10))
> 
> 
> 
> 
>  --nbsp;原始邮件nbsp;--
>  发件人:
> 
>  "user-zh"
> 
>  <
>  384939...@qq.comgt;;
>  发送时间:nbsp;2020年8月3日(星期一) 中午1:50
>  收件人:nbsp;"user-zh" 
>  主题:nbsp;Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大
> 
> 
> 
>  hi,您好:
>  我改回增量模式重新收集了一些数据:
>  1、数据处理速度:3000条每秒,是测试环境的,压力比较稳定,几乎没有波动
>  2、checkpoint是interval设置的是5秒
>  3、目前这个作业是每分钟一个窗口
>  4、并行度设置的1,使用on-yarn模式
> 
>  刚启动的时候,如下:
>  <http://apache-flink.147419.n8.nabble.com/file/t793/6.pnggt;
> 
>  18分钟后,如下:
>  <http://apache-flink.147419.n8.nabble.com/file/t793/9.pnggt;
> 
>  checkpoints设置:
>  <http://apache-flink.147419.n8.nabble.com/file/t793/conf.pnggt;
> 
>  hdfs上面大小:
>  <http://apache-flink.147419.n8.nabble.com/file/t793/hdfs.pnggt;
> 
>  页面上看到的大小:
>  <
> http://apache-flink.147419.n8.nabble.com/file/t793/checkpoinsts1.pnggt
> ;
> 
> 
>  Congxian Qiu wrote
>  gt; Hinbsp;nbsp; 鱼子酱
>  gt;nbsp;nbsp;nbsp;nbsp; 能否把在使用增量 checkpoint
> 的模式下,截图看一下 checkpoint
>  size 的走势呢?另外可以的话,也麻烦你在每次
>  gt; checkpoint 做完之后,到 hdfs 上 ls 一下 checkpoint 目录的大小。
>  gt;nbsp;nbsp;nbsp;nbsp;
> 另外有一个问题还需要回答一下,你的处理速度大概是多少,state 的更新频率能否评估一下呢?
>  gt;
>  gt; Best,
>  gt; Congxian
>  gt;
>  gt;
>  gt; 鱼子酱 <
> 
>  gt; 384939718@
> 
>  gt;gt; 于2020年7月30日周四 上午10:43写道:
>  gt;
>  gt;gt; 感谢!
>  gt;gt;
>  gt;gt; flink1.11.1版本里面,我尝试了下面两种backend,目前运行了20多个小时,
>  gt;gt; 能够看到状态的大小在一个区间内波动,没有发现一直增长的情况了。
>  gt;gt; StateBackend backend =new
>  gt;gt;
>  gt;gt;
> 
> RocksDBStateBackend("hdfs:///checkpoints-data/"+yamlReader.getValueByKey("jobName").toString()+"/",false);
>  gt;gt; StateBackend backend =new
>  gt;gt;
>  gt;gt;
> 
> FsStateBackend("hdfs:///checkpoints-data/"+yamlReader.getValueByKey("jobName").toString()+"/",false);
>  gt;gt;
>  gt;gt;
>  gt;gt; 这样看,有可能是RocksDBStateBackend增量模式这边可能存在一些问题。
>  gt;gt; RocksDBStateBackend:
>  gt;gt; amp;lt;
>  http://apache-flink.147419.n8.nabble.com/file/t793/444.pngamp;gt
> ;
>  gt;gt; FsStateBackend:
>  gt;gt; amp;lt;
>  http://apache-flink.147419.n8.nabble.com/file/t793/555.pngamp;gt
> ;
>  gt;gt;
>  gt;gt;
>  gt;gt;
>  gt;gt;
>  gt;gt; --
>  gt;gt; Sent from: http://apache-flink.147419.n8.nabble.com/
>  <http://apache-flink.147419.n8.nabble.com/>; gt;gt;
> 
> 
> 
> 
> 
>  --
>  Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 求助:Flink有没有类似Storm的ack机制呢?

2020-08-04 文章 Congxian Qiu
Hi 张洋
如果我理解没错的话,现在 Flink
无法严格保证消费了一条数据,等待上一条数据处理完成,然后再消费下一条数据的。如果想做到这个需求,需要用户做一些事情。
你说的第 2 点中是否处理完成,这个能否依赖第三方服务,在 sink(或者最后一个 operator) 处理完成之后做通知呢?
checkpoint 如果正常完成的话,那就是没有异常的,不过 checkpoint 没法完全保证一条数据一条数据的处理

Best,
Congxian


Bruce  于2020年8月5日周三 上午9:33写道:

> 1.我们这里有个需求,Flink从rabbitmq接收消息,需要完整消费处理完前一条,才可以继续消费,因为前一条的结果会影响后面一条的结果准确性。
>
>
> 2.目前我了解到的rabbitmq有个qos可以限流为1条,但是消息流入Flink处理,我并不知道什么时候处理完了,也没有标识可以知道处理完了。
>
>
>
> 3.通过checkpoint的通知,也不是很准确,我并不清楚checkpoint的备份周期内程序是否执行完成,只能知道checkpoint周期内没有异常
>
>
> 4.所以想求助下,Flink如何确认某一段task是否执行完毕?有没有这种机制呢?
>
>
>
>
>
>
> best wishes
> -
> 张洋
>
>
> 


Re: flink1.9.1任务已经fail掉了,但在yarn上这个application还是在running

2020-08-03 文章 Congxian Qiu
Hi
   或许你可以看一下 Flink 作业的 JM 是不是还在运行着?
Best,
Congxian


bradyMk  于2020年8月4日周二 上午11:38写道:

> 请教大家:
>
> flink1.9.1任务已经fail掉了,但在yarn上这个application还是在running,且yarn上分配的资源变成了1,程序中用的是固定延迟重启策略,请问有人知道任务挂掉但yarn上一直在running是什么原因么?
> <
> http://apache-flink.147419.n8.nabble.com/file/t802/Inked%E6%8D%95%E8%8E%B711_LI.jpg>
>
> 
>
>
>
>
> -
> Best Wishes
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


  1   2   3   >