Re: flink-savepoint issue
For keyed state, the same key must always fall into the same key group. If a particular key is hot, you can run a map before the keyBy (appending a suffix to the key), then keyBy on the suffixed key, and finally aggregate the partial results once processing is done.

Best,
Congxian

guomuhua <663021...@qq.com> wrote on Thu, Mar 4, 2021 at 12:49 PM:

> I ran into a similar situation: to spread the data out, I added a random number in the keyBy. What is the correct way to spread the data?
>
> nobleyd wrote
> > Are you using a random key?
> >
> > guaishushu1103@ wrote on Wed, Mar 3, 2021 at 6:53 PM:
> > > Checkpoints are saved successfully, but the savepoint fails with:
> > >
> > > java.lang.Exception: Could not materialize checkpoint 2404 for operator KeyedProcess (21/48).
> > >     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1100)
> > >     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1042)
> > >     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> > >     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> > >     at java.lang.Thread.run(Thread.java:745)
> > > Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException: Key group 0 is not in KeyGroupRange{startKeyGroup=54, endKeyGroup=55}.
> > >     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> > >     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
> > >     at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)
> > >     at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
> > >     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
> > >     ... 3 more
> > > Caused by: java.lang.IllegalArgumentException: Key group 0 is not in KeyGroupRange{startKeyGroup=54, endKeyGroup=55}.
> > >     at org.apache.flink.runtime.state.KeyGroupRangeOffsets.computeKeyGroupIndex(KeyGroupRangeOffsets.java:142)
> > >     at org.apache.flink.runtime.state.KeyGroupRangeOffsets.setKeyGroupOffset(KeyGroupRangeOffsets.java:104)
> > >     at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeKVStateData(RocksFullSnapshotStrategy.java:314)
> > >     at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeSnapshotToOutputStream(RocksFullSnapshotStrategy.java:256)
> > >     at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:221)
> > >     at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:174)
> > >     at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
> > >     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> > >     at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:447)
> > >     ... 5 more
> > >
> > > guaishushu1103@
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
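The suffix-then-aggregate approach from the reply above can be sketched in plain Java. This is only an illustration under stated assumptions, not Flink code: `SaltedKeySketch`, `Event`, `eventId`, `BUCKETS` and the `#` separator are hypothetical names, and the two maps stand in for the two keyed stages of a job. The point it tries to show is that the suffix is derived from a stable field of the record, so computing the key twice for the same record always gives the same result. A random suffix generated inside the key selector does not have this property, which is a likely reason the key group recorded for a piece of state falls outside the range owned by the subtask.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class SaltedKeySketch {
    // Hypothetical number of sub-keys each hot key is spread over.
    static final int BUCKETS = 4;

    /** One input record; eventId is assumed to be a stable field of the record. */
    static final class Event {
        final String key;
        final long eventId;
        final long value;

        Event(String key, long eventId, long value) {
            this.key = key;
            this.eventId = eventId;
            this.value = value;
        }
    }

    /** Deterministic salt: derived from the record itself, never from a random generator. */
    static String saltedKey(Event e) {
        long bucket = Math.floorMod(e.eventId, (long) BUCKETS);
        return e.key + "#" + bucket;
    }

    /** Stage 1: partial sums per salted key (stands in for keyBy on the suffixed key). */
    static Map<String, Long> partialSums(List<Event> events) {
        Map<String, Long> partial = new TreeMap<>();
        for (Event e : events) {
            partial.merge(saltedKey(e), e.value, Long::sum);
        }
        return partial;
    }

    /** Stage 2: strip the suffix and combine the partial sums per original key. */
    static Map<String, Long> finalSums(Map<String, Long> partial) {
        Map<String, Long> totals = new TreeMap<>();
        for (Map.Entry<String, Long> p : partial.entrySet()) {
            // Assumes original keys never contain '#'.
            String original = p.getKey().substring(0, p.getKey().lastIndexOf('#'));
            totals.merge(original, p.getValue(), Long::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        List<Event> events = new ArrayList<>();
        for (long id = 0; id < 8; id++) {
            events.add(new Event("hot", id, 1)); // one hot key, eight records
        }
        events.add(new Event("cold", 100, 5));

        Map<String, Long> partial = partialSums(events);
        System.out.println("partial sums: " + partial);
        System.out.println("final sums:   " + finalSums(partial));
    }
}
```

In an actual Flink job the same idea would presumably mean writing the suffixed key into the record in a map step, so that the key selector passed to keyBy only reads a stored field and stays deterministic.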
Re: flink-savepoint issue
I ran into a similar situation: to spread the data out, I added a random number in the keyBy. What is the correct way to spread the data?

nobleyd wrote
> Are you using a random key?
>
> guaishushu1103@ wrote on Wed, Mar 3, 2021 at 6:53 PM:
> > Checkpoints are saved successfully, but the savepoint fails with:
> >
> > java.lang.Exception: Could not materialize checkpoint 2404 for operator KeyedProcess (21/48).
> > [...]
> > Caused by: java.lang.IllegalArgumentException: Key group 0 is not in KeyGroupRange{startKeyGroup=54, endKeyGroup=55}.
> > [...]
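Re: flink-savepoint issue
I hit the same problem. To spread the data out, I appended a random number as a suffix in the keyBy. Without the random number, savepoints work normally; with it, savepoints fail. So if spreading the data really is required, how should it be handled?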
Re: flink-savepoint issue
Are you using a random key?

guaishushu1...@163.com wrote on Wed, Mar 3, 2021 at 6:53 PM:

> Checkpoints are saved successfully, but the savepoint fails with:
>
> java.lang.Exception: Could not materialize checkpoint 2404 for operator KeyedProcess (21/48).
> [...]
> Caused by: java.lang.IllegalArgumentException: Key group 0 is not in KeyGroupRange{startKeyGroup=54, endKeyGroup=55}.
> [...]
>
> guaishushu1...@163.com
Re: flink savepoint issue
Hi,

Regarding question 1: under backpressure, the savepoint may fail to complete and therefore time out. There is no fix for this at the moment; an issue [1] is working on unaligned checkpoints, which can address checkpointing under backpressure.

Regarding question 3: the checkpoint timed out. A timeout means that, within the configured time (5 minutes in your case), at least one task did not finish its snapshot. Increasing the timeout can ease the problem to some extent, but it is better to find out why the timeout happens and optimize for that cause specifically.

[1] https://issues.apache.org/jira/browse/FLINK-14551

Best,
Congxian

大数据开发面试_夏永权 wrote on Fri, Mar 27, 2020 at 4:19 PM:

> Hi, I ran into the following problems while using Flink and could not solve them myself, so I would appreciate your guidance. Thanks!
>
> 1. flink cancel -s $SAVEPOINT_DIR $job_id -yid $application_id
> cannot stop the job when it is under backpressure
>
> The program finished with the following exception:
> org.apache.flink.util.FlinkException: Could not cancel job 1f768e4ca9ad5792a4844a5d12163b73.
>     at org.apache.flink.client.cli.CliFrontend.lambda$cancel$6(CliFrontend.java:523)
>     at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
>     at org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:515)
>     at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:904)
>     at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:422)
>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
>     at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>     at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
> Caused by: java.util.concurrent.TimeoutException
>     at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>     at org.apache.flink.client.cli.CliFrontend.lambda$cancel$6(CliFrontend.java:521)
>     ... 9 more
> stop flink job failed!!!
>
> 2. When using Flink SQL DDL, after adding a field the job fails to start; it only starts once the savepoint is deleted (the logic after the DDL is a two-stream join, and one of the streams gained a field)
>
> The program finished with the following exception:
> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Number of retries has been exhausted.
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>     at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
>     at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
>     at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>     at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
>     at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:422)
>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
>     at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>     at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
> Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Number of retries has been exhausted.
>     at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>     at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:83)
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
>     at com.etl.DwdBaseCarlifeTIotBindGeoManage.main(DwdBaseCarlifeTIotBindGeoManage.java:265)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
>     ... 11 more
> Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Number of retries has been exhausted.
>     at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:284)
>     at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>     at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>     at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>     at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$1(RestClient.java:342)
>     at