Re: flink异常

2022-07-24 Thread Px New
樂

Zhanghao Chen  于2022年7月25日周一 13:39写道:

> 你好,可以检查下:
>
>   1.  tm 侧是否有异常,导致 tm 退出;
>   2.  tm 侧是否 gc 严重导致没有及时处理心跳;
>   3.  jm - tm 间是否网络有异常导致心跳信息无法传达。
>
> Best,
> Zhanghao Chen
> 
> From: 陈卓宇 <2572805...@qq.com.INVALID>
> Sent: Friday, July 22, 2022 11:30
> To: user-zh 
> Subject: flink异常
>
> 社区大佬您好,小弟请教一个问题:
> flink版本:1.14.5
> 异常内容如下:
> 2022-07-22 10:07:51
> java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id
> bdp-changlog-mid-relx-middle-promotion-dev-taskmanager-1-1 timed out.
> at
> org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1299)
> at
> org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:111)
> at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> at akka.japi.pf
> .UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
> at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at
> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> 我该如何解决,如何优化
>


Re: [ANNOUNCE] Apache Flink Kubernetes Operator 1.1.0 released

2022-07-24 Thread Px New


Yang Wang  于2022年7月25日周一 10:55写道:

> Congrats! Thanks Gyula for driving this release, and thanks to all
> contributors!
>
>
> Best,
> Yang
>
> Gyula Fóra  于2022年7月25日周一 10:44写道:
>
> > The Apache Flink community is very happy to announce the release of
> Apache
> > Flink Kubernetes Operator 1.1.0.
> >
> > The Flink Kubernetes Operator allows users to manage their Apache Flink
> > applications and their lifecycle through native k8s tooling like kubectl.
> >
> > Please check out the release blog post for an overview of the release:
> >
> >
> https://flink.apache.org/news/2022/07/25/release-kubernetes-operator-1.1.0.html
> >
> > The release is available for download at:
> > https://flink.apache.org/downloads.html
> >
> > Maven artifacts for Flink Kubernetes Operator can be found at:
> >
> >
> https://search.maven.org/artifact/org.apache.flink/flink-kubernetes-operator
> >
> > Official Docker image for the Flink Kubernetes Operator can be found at:
> > https://hub.docker.com/r/apache/flink-kubernetes-operator
> >
> > The full release notes are available in Jira:
> >
> >
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12351723
> >
> > We would like to thank all contributors of the Apache Flink community who
> > made this release possible!
> >
> > Regards,
> > Gyula Fora
> >
>


Re: 关于Flink对中标/银河麒麟和统信的适配

2022-07-23 Thread Px New
嗨。适配的话可以参考下 华为的鲲鹏适配。arm、kylin、龙芯 适配


On Fri, Jul 22, 2022 at 09:44 张 兴博  wrote:

> 尊敬的Flink贡献者您好:
>
>  
> 由于政策问题,很多国企和央企都将要采用银河麒麟或者中标麒麟或者统信,请问Flink/PyFlink在以上三个系统中有进行适配应用吗?有无问题?以及和在Centos部署的有何不同呢?
>
>期望得到回复,万分感谢!
>


Re: 退订

2021-07-06 Thread Px New
如果需要取消订阅 u...@flink.apache.org 
邮件组,请发送任意内容的邮件到 user-zh-unsubscr...@flink.apache.org  。

张保淇  于2021年7月6日周二 下午4:13写道:

> 退订


Re: Re: flink waterMark 相关问题

2021-01-12 Thread Px New
private long autoWatermarkInterval = 200;

/**
 * Interval in milliseconds for sending latency tracking marks from
the sources to the sinks.
 */


张锴  于2021年1月13日周三 上午10:26写道:

> ok,明白了
>
> anonnius  于2021年1月13日周三 上午10:20写道:
>
> > 在 StreamExecutionEnvironmennt的方法@PublicEvolving   public
> > void setStreamTimeCharacteristic(TimeCharacteristic characteristic)
> {
> >  this.timeCharacteristic =
> > Preconditions.checkNotNull(characteristic);  if
> > (characteristic == TimeCharacteristic.ProcessingTime) {
> >  getConfig().setAutoWatermarkInterval(0);   } else {
> >  getConfig().setAutoWatermarkInterval(200);
> >  }  }
> > 在 2021-01-13 10:09:54,"张锴"  写道:
> > >我从ExecutionConfig找到了,private long autoWatermarkInterval =
> > >0,并不是200毫秒,这个代表一个时间戳就代表一个watermark是吗
> > >
> > >Ball's Holy <873925...@qq.com> 于2021年1月13日周三 上午9:42写道:
> > >
> > >> hi张锴,
> > >> 如果我没记错的话,默认时间间隔是200毫秒,具体设置细节可以看flink.api.common.ExecutionConfig
> > >> 对应的属性autoWatermarkInterval
> > >>
> > >>
> > >>
> > >>
> > >> --原始邮件--
> > >> 发件人: "anonnius" > >> 发送时间: 2021年1月13日(星期三) 上午9:19
> > >> 收件人: "user-zh" > >> 主题: Re:flink waterMark 相关问题
> > >>
> > >>
> > >>
> > >> 可以看一下 ExecutionConfig这个类
> > >> 在 2021-01-12 17:55:47,"张锴"  > >> hi,我使用的flink版本是1.10,我想问一下watermark的默认时间间隔是多少,以及所对应的源码位置在哪里,我并没有找到。
> >
>


Re: flink如何传递全局变量

2020-06-09 Thread Px New
对
正如 -> 1048262223  所说的一样 , 目前我就是通过BroadCast 动态更细一些规则带到下游并在Process method 中
进行操作 | 

zjfpla...@hotmail.com  于2020年6月9日周二 下午8:14写道:

> hi,
> 请问flink如何传递全局变量,静态类好像服务器端运行不行。
> 场景是:一开始flink程序起来时,读取配置文件中的配置项,此类配置项可能会在sink,source等等其他地方用到,算是整个程序的全局配置
>
>
>
> zjfpla...@hotmail.com
>


Re: Flink sql 中 无法通过 TableEnvironment 调出 createTemporaryTable()方法 以及.TableException: findAndCreateTableSource failed 异常

2020-06-09 Thread Px New
Hi *Benchao Li*   Thanks ,你说的很对 我现在已经走在了sql的实践道路上(还好有你指出)

Benchao Li  于2020年6月9日周二 上午10:05写道:

> Hi,
> 我看你用的是1.9.1版本,但是createTemporaryTable应该是在1.10之后才引入的。不知道你参考的是哪一版的文档呢?
>
> Px New <15701181132mr@gmail.com> 于2020年6月8日周一 下午10:00写道:
>
> > Hi 社区:  关于flink sql 使用上的一个问题以及一个sql异常
> > 
> > 我通过官网给出的结构编写代码时发现注册临时表方法无法被调用?[图1, 图 2, 图 3]
> > 通过 tableEnvironment 调用createTemporaryTable 方法
> >
> > 我排查过。 但还是没能解决
> > 1:包倒入的是官网所声明的包。
> > 2:类倒入的是 flink.table.api.TableEnvironment/以及.java.StreamTableEnvironment
> 两个类
> >
> >
> >  图 1 (依赖导入):
> > https://imgkr.cn-bj.ufileos.com/941dbd86-34f4-4579-a53d-f7c04439d6f0.PNG
> >  图 2 (import *):
> > https://imgkr.cn-bj.ufileos.com/e19b90f7-ef60-42d8-a93e-d65c4269e053.png
> >  图 3 (无法调用?):
> > https://imgkr.cn-bj.ufileos.com/579e5336-503c-490f-83b9-ebb46bd1b568.png
> >  图 4 (官网格式):
> > https://imgkr.cn-bj.ufileos.com/fcc7365e-aecf-49a3-94e1-ae64ee67122e.png
> > 图5 (TableException: findAndCreateTableSource failed 异常)
> > https://imgkr.cn-bj.ufileos.com/386a783a-a42e-4466-97f8-540adb524be0.PNG
> > 以及:
> > https://imgkr.cn-bj.ufileos.com/9cac88cc-7601-4b03-ade1-f2432c84adac.PNG
> >
>
>
> --
>
> Best,
> Benchao Li
>


Flink sql 中 无法通过 TableEnvironment 调出 createTemporaryTable()方法 以及.TableException: findAndCreateTableSource failed 异常

2020-06-08 Thread Px New
Hi 社区:  关于flink sql 使用上的一个问题以及一个sql异常

我通过官网给出的结构编写代码时发现注册临时表方法无法被调用?[图1, 图 2, 图 3]
通过 tableEnvironment 调用createTemporaryTable 方法

我排查过。 但还是没能解决
1:包倒入的是官网所声明的包。
2:类倒入的是 flink.table.api.TableEnvironment/以及.java.StreamTableEnvironment 两个类


 图 1 (依赖导入):
https://imgkr.cn-bj.ufileos.com/941dbd86-34f4-4579-a53d-f7c04439d6f0.PNG
 图 2 (import *):
https://imgkr.cn-bj.ufileos.com/e19b90f7-ef60-42d8-a93e-d65c4269e053.png
 图 3 (无法调用?):
https://imgkr.cn-bj.ufileos.com/579e5336-503c-490f-83b9-ebb46bd1b568.png
 图 4 (官网格式):
https://imgkr.cn-bj.ufileos.com/fcc7365e-aecf-49a3-94e1-ae64ee67122e.png
图5 (TableException: findAndCreateTableSource failed 异常)
https://imgkr.cn-bj.ufileos.com/386a783a-a42e-4466-97f8-540adb524be0.PNG
以及:
https://imgkr.cn-bj.ufileos.com/9cac88cc-7601-4b03-ade1-f2432c84adac.PNG


Re: 关于flinksql 与维表mysql的关联问题

2020-06-07 Thread Px New
好的 我可以理解为是: 通过env.addsouce创建一个广播流。下游connect后 在process方法中操作?

1048262223 <1048262...@qq.com>于2020年6月7日 周日下午3:57写道:

> Hi
>
>
> 可以使用open + broadcast的方式解决~
>
>
> Best,
> Yichao Yang
>
>
>
>
>
> ------原始邮件--
> 发件人:"Px New"<15701181132mr@gmail.com;
> 发送时间:2020年6月6日(星期六) 上午9:50
> 收件人:"user-zh"
> 主题:Re: 关于flinksql 与维表mysql的关联问题
>
>
>
> Hi ,我有一个相关操作的一疑问.
> 疑问: 如果我放在open 中的规则 有可能发生改变,需要重新更新的话呢?
>
> Michael Ran 
>  放到open 方法里面可以吗?
>  在 2020-06-04 14:15:05,"小屁孩" <932460...@qq.com 写道:
>  dear:nbsp; nbsp; 我有个问题想请教下,关于flinksql与mysql维表关联
> 关于mysql更新的问题
> 
> 有没有好的方法?我现在使用的广播mysql但是有个问题,广播mysql时我的流数据已经到了但是mysql的数据还没有到导致我现在数据会再启动的时候丢失一部分数据
> 


Re: 关于flinksql 与维表mysql的关联问题

2020-06-05 Thread Px New
Hi ,我有一个相关操作的一疑问.
疑问: 如果我放在open 中的规则 有可能发生改变,需要重新更新的话呢?

Michael Ran  于2020年6月4日周四 下午5:22写道:

> 放到open 方法里面可以吗?
> 在 2020-06-04 14:15:05,"小屁孩" <932460...@qq.com> 写道:
> >dear:  我有个问题想请教下,关于flinksql与mysql维表关联 关于mysql更新的问题
> 有没有好的方法?我现在使用的广播mysql但是有个问题,广播mysql时我的流数据已经到了但是mysql的数据还没有到导致我现在数据会再启动的时候丢失一部分数据
>


Re: CheckPoint Dir 路径下引发的一些问题

2020-06-05 Thread Px New
感谢回复, 我明白了在状态恢复时具体细节,以及其他文件的产生及作用

Weihua Hu  于2020年6月5日周五 下午1:48写道:

> HI, Px New
>
> 1. Checkpoint 保存数量可以通过参数: state.checkpoints.num-retained 来控制,默认是 1
> 2. _metadata 只是一些元数据,保存了state 的句柄,其他文件是 state 数据,由各 Task 在触发 checkpoint
> 的时候上传。相反在恢复 checkpoint 的时候JM 读取_metadata 将相应句柄下发到 Task,Task 通过远端 HDFS 拉取对应的
> state。
>
>
> Best
> Weihua Hu
>
> > 2020年6月5日 13:36,Px New <15701181132mr@gmail.com> 写道:
> >
> > Hi everyOne 有一个关于CheckPoint相关的一个问题:
> > 1.我在项目中使用的状态后端为:Fsstatebackend
> > 2.我在jobManager的log输出找到相应的job ID后 去对应的HDFS 找到了对应的chk目录
> > 3.但我有两个疑问:
> > 3.1.没有设置 chk的存储数默认是多保留多少份呢(我这边看到保留了近20次的chk)?
> > 3.2 当我点进具体的chk-id 后 发现有很多文件[见2图] 我清楚的是当任务发生异常后tesk 会从hdfs 将_metadata
> 下载后进行任务恢复操作的,那其他的哪些文件是如何产生的?以及有什么作用呢?
> > 期待回复:
> >
> >
> >
>
>


Re: CheckPoint Dir 路径下引发的一些问题

2020-06-05 Thread Px New
哦 对此我很抱歉:
图1:  https://i.loli.net/2020/06/05/SAfpnkqlOUM9hD3.png
图2:
https://imgkr.cn-bj.ufileos.com/aed4cb64-dd24-4076-ba4c-a0e07bc356bf.png

zhiyezou <1530130...@qq.com> 于2020年6月5日周五 下午1:58写道:

> Hi
> 麻烦使用第三方图床,把图片链接过来,直接贴图片的话显示不出来
>
>
>
>
> --原始邮件--
> 发件人:"Weihua Hu" 发送时间:2020年6月5日(星期五) 中午1:48
> 收件人:"user-zh"
> 主题:Re: CheckPoint Dir 路径下引发的一些问题
>
>
>
> HI, Px New
>
> 1. Checkpoint 保存数量可以通过参数: state.checkpoints.num-retained 来控制,默认是 1
> 2. _metadata 只是一些元数据,保存了state 的句柄,其他文件是 state 数据,由各 Task 在触发 checkpoint
> 的时候上传。相反在恢复 checkpoint 的时候JM 读取_metadata 将相应句柄下发到 Task,Task 通过远端 HDFS 拉取对应的
> state。
>
>
> Best
> Weihua Hu
>
>  2020年6月5日 13:36,Px New <15701181132mr@gmail.com 写道:
> 
>  Hi everyOne 有一个关于CheckPoint相关的一个问题:
>  1.我在项目中使用的状态后端为:Fsstatebackend
>  2.我在jobManager的log输出找到相应的job ID后 去对应的HDFS 找到了对应的chk目录
>  3.但我有两个疑问:
>  3.1.没有设置 chk的存储数默认是多保留多少份呢(我这边看到保留了近20次的chk)?
>  3.2 当我点进具体的chk-id 后 发现有很多文件[见2图] 我清楚的是当任务发生异常后tesk 会从hdfs 将_metadata
> 下载后进行任务恢复操作的,那其他的哪些文件是如何产生的?以及有什么作用呢?
>  期待回复:
> 
> 
> 


CheckPoint Dir 路径下引发的一些问题

2020-06-04 Thread Px New
Hi everyOne 有一个关于CheckPoint相关的一个问题:
1.我在项目中使用的状态后端为:Fsstatebackend
2.我在jobManager的log输出找到相应的job ID后 去对应的HDFS 找到了对应的chk目录
3.但我有两个疑问:
3.1.没有设置 chk的存储数默认是多保留多少份呢(我这边看到保留了近20次的chk)?
3.2 当我点进具体的chk-id 后 发现有很多文件[见2图] 我清楚的是当任务发生异常后tesk 会从hdfs 将_metadata
下载后进行任务恢复操作的,那其他的哪些文件是如何产生的?以及有什么作用呢?
期待回复:

[image: image.png]
[image: image.png]


Re: Flink 1.6 版本,RocksDBStateBackend ,增量 Checkpoint 目录问题

2020-06-04 Thread Px New
对  我认为这也不是很合理, 不知道有什么更好的解释吗

LakeShen  于2020年1月16日周四 下午5:06写道:

> 原来是这样,中间的确有很多 Checkpoint 超时失败,对于这种由于 Checkpoint 失败创建的目录,Flink 本身是不会删除的,对吗?
> 也就是这部分状态文件,会一直存储在HDFS 上面,这种情况是否会造成 Flink Checkpoint 目录下 shared目录无限增大呢,
> 非常感谢你,lucas,期待你的回复。
>
> lucas.wu  于2020年1月16日周四 下午4:50写道:
>
> > 可能是失败的checkpoint目录,可以看看程序中间是不是有失败的checkpoint
> >
> >
> > 原始邮件
> > 发件人:lakeshenshenleifight...@gmail.com
> > 收件人:user-zhuser...@flink.apache.org
> > 发送时间:2020年1月16日(周四) 16:47
> > 主题:Flink 1.6 版本,RocksDBStateBackend ,增量 Checkpoint 目录问题
> >
> >
> > 我使用 Flink 1.6 版本,使用的增量 Checkpoint,我看官网说,默认的 Checkpoint
> > 保留目录是1,也就是会保留一个最新完成的 Checkpoint 目录,但是在我的任务 Checkpoint 路径下面,居然有很多个 chk-xxx
> > 目录,比如说 chk-86515,chk-37878,而且在这些目录下面,还有数据,这是什么原因呢。
> > 对这个地方有点困惑,既然默认保留的目录是1了,为什么还有这么多 chk 目录呢。 期待你的回答
>


Re: flink数据sink到mysql 是事务处理

2020-06-02 Thread Px New
Hi 我最近在处理幂等性写入Mysql 但相关文档太少并没有实质性的操作, 所有方便参观下你这边实务写入的code吗? 非常感谢你
也可发code到我的email 15701181132mr@gmail.com

1101300123  于2020年4月10日周五 上午11:42写道:

>
>
> 目前使用flink处理kafka数据后sink到mysql,处理完的数据是一个用户ID对应多条记录,都要插入到mysql,所以在sink中会循环list把数据组装成batch提交,所以mysql开启了事务保证一批数据的事务。现在发现程序运行几个小时候会出现mysql死锁!想了解一下大家是怎么处理sink端
> 多条记录的操作
> sink的invoke代码
> @Override
> public void invoke(Tuple5 List> value, Context context) throws Exception {
> connection.setAutoCommit(false);
> List f4 = value.f4;
> for (BroadBandReq rs: f4){
> statement.setString(1,rs.getUserId());
> statement.setString(2,rs.getPhoneNum());
> statement.setString(3,rs.getProvId());
> statement.addBatch();
> }
> try {
> statement.executeBatch();
> connection.commit();
> }catch (Exception e){
> LOG.info(" add data for rds ; operTag:{},
> userId:{},removeTag:{},stateCode:{}, phoneList:{}", value.f0, value.f1,
> value.f2, value.f3,f4);
> connection.rollback();
> e.printStackTrace();
> throw new Exception(e);
> }
> }
>
>
>
>
> java.lang.Exception: java.sql.BatchUpdateException: Deadlock found when
> trying to get lock; try restarting transaction
> at com.mycom.ogg.RdsOperaterSink.invoke(RdsOperaterSink.java:73)
> at com.mycom.ogg.RdsOperaterSink.invoke(RdsOperaterSink.java:18)
> at
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
> at
> org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:627)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:718)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:736)
> at
> org.apache.flink.streaming.api.operators.ProcessOperator$ContextImpl.output(ProcessOperator.java:102)
> at
> com.unicom.ogg.ExtractOggApplicationV2$6.processElement(ExtractOggApplicationV2.java:162)
> at
> com.unicom.ogg.ExtractOggApplicationV2$6.processElement(ExtractOggApplicationV2.java:157)
> at
> org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
> at
> org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
> at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
> at org.apache.flink.streaming.runtime.io
> .StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
> at org.apache.flink.streaming.runtime.io
> .StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
> at org.apache.flink.streaming.runtime.io
> .StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: 

Re: Re: flink savepoint checkpoint

2020-01-10 Thread Px New
Hello ,针对于你的问题 我发现一件有趣的事情
在我以 Yarn per-Job 方式 启动Job程序后  在yarn 的资源管理界面 可以看到我启动的任务 ->
它有属于自己的application-Id  然后当我 通过Yarn 的Tracking Ui 下的 Application
<http://node01:8088/proxy/application_1577499691717_0064/>Master
点击进入到Job的Web Ui 界面后(flink的web ui)通过在此界面点击canal 这个按钮 kill 掉程序后 在Yarn 的 管理界面
发现还是有个空壳子的。  当我通过在终端输入 yarn application -kill Id  后 这个程序才会被杀死。 所以我初步认为
他是Stop 程序。

祝好!
Px

amen...@163.com  于2020年1月10日周五 下午5:59写道:

> hi,了解到使用stop进行任务停止并触发savepoint,会在停止之前生成max_watermark,并注册event-time计时器,我想请问使用yarn
> kill方式直接停止任务,会属于cancel还是stop亦或是其他?
>
>
>
> amen...@163.com
>
> From: Congxian Qiu
> Date: 2020-01-10 17:16
> To: user-zh
> Subject: Re: flink savepoint checkpoint
> Hi
> 从 Flink 的角度看,Checkpoint 用户 Job 运行过程中发生 failover 进行恢复,savepoint 用于 Job
> 之间的状态复用。
> 另外,从 1.9 开始,可以尝试下 StopWithSavepoint[1],以及社区另外一个 issue 尝试做
> StopWithCheckpoint[2]
>
> [1] https://issues.apache.org/jira/browse/FLINK-11458
> [2] https://issues.apache.org/jira/browse/FLINK-12619
> Best,
> Congxian
>
>
> zhisheng  于2020年1月10日周五 上午11:39写道:
>
> > hi,这个参数我理解的作用应该是:作业取消的时候是否清除之前的 checkpoint,但这个 checkpoint
> > 的并不一定是作业最新的状态。而如果你取消命令的时候触发一次 savepoint,那么这次的状态就是最新且最全的。
> >
> > 祝好!
> > zhisheng
> >
> > Px New <15701181132mr@gmail.com> 于2020年1月10日周五 上午10:58写道:
> >
> > > Hello ,针对于你这个问题 在FLink 中 是有这个配置的. 当程序stop时,会额外保存检查点
> > > -->
> > >
> > >
> > >
> >
> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> > >
> > >
> > > lucas.wu  于2019年12月11日周三 上午11:56写道:
> > >
> > > > hi 各位:
> > > >
> > > >
> > >
> >
> 有个问题想和大家讨论一下,就是flink的savepoint为什么要设置成手动的?如果在stop程序的时候没有做savepoint的话,在我重启的时候就不能使用之前保存的一些状态信息。为什么不参考spark的方式,定时做checkpoint,然后启动的时候指定ck地址就可以从上次执行的地方继续执行。
> > >
> >
>


Re: flink savepoint checkpoint

2020-01-09 Thread Px New
Hello ,针对于你这个问题 在FLink 中 是有这个配置的. 当程序stop时,会额外保存检查点
-->

env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);


lucas.wu  于2019年12月11日周三 上午11:56写道:

> hi 各位:
>
> 有个问题想和大家讨论一下,就是flink的savepoint为什么要设置成手动的?如果在stop程序的时候没有做savepoint的话,在我重启的时候就不能使用之前保存的一些状态信息。为什么不参考spark的方式,定时做checkpoint,然后启动的时候指定ck地址就可以从上次执行的地方继续执行。


Re: checkpoint、state

2020-01-09 Thread Px New
Yes, CheckPoint 是一个封装后的Meta信息 而这个被封装的Meta信息是有所有Operator 的state 的组成了

hahaha sc  于2019年11月29日周五 下午4:12写道:

>
> flink的每条数据既然都做了checkpoint,做成全局分布式一致性快照,那还需要本地state干啥呢?是否可以理解成,本地state是一致性快照的一部分而已?
>   昨天看了 社区的直播回放,听PMC的介绍,好像不是一回事。
>


Re: 流处理任务失败该如何追回之前的数据

2020-01-09 Thread Px New
rollback 后
taskManager 会去获取持久化存储的snapshot , Source 也会回放到 做CheckPoint 时的那个点上
不论你使用的是是什么时间吧 -


Dian Fu  于2019年11月14日周四 下午1:14写道:

> 如果使用的event
> time,watermark是根据event计算出来的,和系统时间没有关系,所以从最后一次checkpoint恢复即可。为什么你会觉得有问题?
>
> > 在 2019年11月13日,下午8:29,柯桂强  写道:
> >
> >
> 我现在有一个流处理任务失败了,并且保留了checkpoint或者savepoint,我希望从最后一次checkpoint恢复,但是任务使用的是事件时间,超过窗口的数据就会被丢弃,我想到一个方法是,重启之前的数据通过批处理完成然后跑流处理,想问问大家这个方案是否可行,但是感觉如何限定批处理的范围并且和之后的流处理完美拼接是一个比较难的问题
>
>


Re: Flink DataStream KeyedStream 与 AggregateFunction

2019-11-09 Thread Px New
[image: image.png]建议深入解下 keyWindow,NoKeyWindow 与Assigner TimeWindow
And WindowsFunction 

Yuan,Youjun  于2019年11月9日周六 下午7:46写道:

> 1, 是
> 2,没有标准答案,是否可以本地先聚合?
> 3,AggFunc是指定做何种聚合,是sum, 还是avg, 还是count。不指定的话,Flink哪里指导你要计算啥?
>
> -邮件原件-
> 发件人: 王佩 
> 发送时间: Saturday, November 9, 2019 11:45 AM
> 收件人: user-zh 
> 主题: Flink DataStream KeyedStream 与 AggregateFunction
>
> 请教下:
>
> 1、DataStream 如按用户ID KeyBy后,同一个用户ID的数据最终会被分到一个Partition中吗?
>
> 2、假设1成立,这样就会有数据倾斜的问题。该如何解决?
>
> 3、假设1成立,如: DataStream
>.keyBy(userID)
>.timeWindow()
>.aggregate(new
> AggregateFunction(...)),这里的AggregateFunction
> 为啥还需要merge呢。因为同一个Key的数据只会在同一个Partition中被计算,觉得不需要merge啊。
>
> 这三个问题有点疑惑,大神们帮忙看下!
> 感谢!
>