Checkpoint dir is not cleaned up after cancel the job with monitoring API

2020-09-26 文章 Eleanore Jin
Hi experts,

I am running flink 1.10.2 on kubernetes as per job cluster. Checkpoint is
enabled, with interval 3s, minimumPause 1s, timeout 10s. I'm
using FsStateBackend, snapshots are persisted to azure blob storage
(Microsoft cloud storage service).

Checkpointed state is just source kafka topic offsets, the flink job is
stateless as it does filter/json transformation.

The way I am trying to stop the flink job is via monitoring rest api
mentioned in doc


e.g.
curl -X PATCH \
  'http://localhost:8081/jobs/3c00535c182a3a00258e2f57bc11fb1a?mode=cancel'
\
  -H 'Content-Type: application/json' \
  -d '{}'

This call returned successfully with statusCode 202, then I stopped the
task manager pods and job manager pod.

According to the doc, the checkpoint should be cleaned up after the job is
stopped/cancelled.
What I have observed is, the checkpoint dir is not cleaned up, can you
please shield some lights on what I did wrong?

Below shows the checkpoint dir for a cancelled flink job.
[image: image.png]

Thanks!
Eleanore


Re: Flink SQL撤回流问题

2020-09-26 文章 xiao cai
如果是先delete再insert,为何主键id一直都是1呢,如果delete在insert时指定了id值,那么auto_increment是不会变的。
这是我很困惑的地方。


 原始邮件 
发件人: lec ssmi
收件人: flink-user-cn
发送时间: 2020年9月27日(周日) 13:06
主题: Re: Flink SQL撤回流问题


是不是底层的sink在处理retract的时候,使用的是先delte再insert , 而不是直接update呢。 xiao cai 
 于2020年9月27日周日 下午12:08写道: > 场景如下: > source table: kafka > sink 
table: mysql schem(id, dt, cnt) > > > insert : > insert into sink > select 
dt,count(distinct id) from source group by dt; > > > 这时mysql对应sink表中有一条数据(1, 
2020-09-25, 100),随着事件消费,cnt字段会不断变化,id,dt保持不变 > show create table 
sink可以发现auto_increment在不断的变大。 > 当超过id的取值范围,就会报错了。 > > > 原始邮件 > 发件人: Michael 
Ran > 收件人: user-zh > 发送时间: 
2020年9月27日(周日) 11:51 > 主题: Re:Flink SQL撤回流问题 > > > 详细场景描述下呢 在 2020-09-27 
11:48:36,"xiao cai"  写道: >Hi: > >使用Flink 
SQL撤回流写入MySQL,表的auto_increment > 
越来越大是为什么,我的输出结果只有一条,mysql表里也只有一条数据,自增主键id的值一直为1,但是auto_increment却越来越大。求解答。

FlinkSQL是否支持设置窗口trigger实现continuious trigger呢

2020-09-26 文章 赵一旦
如题,不使用DatastreamAPI,使用FlinkSQL能否实现五分钟窗口,每10秒输出一次呢?


Re: Flink SQL撤回流问题

2020-09-26 文章 lec ssmi
是不是底层的sink在处理retract的时候,使用的是先delte再insert , 而不是直接update呢。

xiao cai  于2020年9月27日周日 下午12:08写道:

> 场景如下:
> source table:   kafka
> sink table:   mysql  schem(id, dt, cnt)
>
>
> insert :
> insert into sink
> select dt,count(distinct id) from source group by dt;
>
>
> 这时mysql对应sink表中有一条数据(1, 2020-09-25, 100),随着事件消费,cnt字段会不断变化,id,dt保持不变
> show create table sink可以发现auto_increment在不断的变大。
> 当超过id的取值范围,就会报错了。
>
>
>  原始邮件
> 发件人: Michael Ran
> 收件人: user-zh
> 发送时间: 2020年9月27日(周日) 11:51
> 主题: Re:Flink SQL撤回流问题
>
>
> 详细场景描述下呢 在 2020-09-27 11:48:36,"xiao cai"  写道: >Hi:
> >使用Flink SQL撤回流写入MySQL,表的auto_increment
> 越来越大是为什么,我的输出结果只有一条,mysql表里也只有一条数据,自增主键id的值一直为1,但是auto_increment却越来越大。求解答。


flink多流关联

2020-09-26 文章 Dream-底限
hi
我们这面想用flink在实时数仓上面做些事情,进行调研后发现数仓可能有多张表进行join、现flink有两种实现方案:第一种是流表lookup时态表,但时态表更新可能会延迟导致查询结果不准确,并且io可能过大;第二种是双流关联,但是如果说有五张表进行join的话,除了状态太大还有其他问题吗,或者说有多流相互等待的问题吗


Re:Flink SQL撤回流问题

2020-09-26 文章 xiao cai
场景如下:
source table:   kafka
sink table:   mysql  schem(id, dt, cnt)


insert :
insert into sink 
select dt,count(distinct id) from source group by dt;


这时mysql对应sink表中有一条数据(1, 2020-09-25, 100),随着事件消费,cnt字段会不断变化,id,dt保持不变
show create table sink可以发现auto_increment在不断的变大。
当超过id的取值范围,就会报错了。


 原始邮件 
发件人: Michael Ran
收件人: user-zh
发送时间: 2020年9月27日(周日) 11:51
主题: Re:Flink SQL撤回流问题


详细场景描述下呢 在 2020-09-27 11:48:36,"xiao cai"  写道: >Hi: >使用Flink 
SQL撤回流写入MySQL,表的auto_increment 
越来越大是为什么,我的输出结果只有一条,mysql表里也只有一条数据,自增主键id的值一直为1,但是auto_increment却越来越大。求解答。

Re:Flink SQL撤回流问题

2020-09-26 文章 Michael Ran
详细场景描述下呢
在 2020-09-27 11:48:36,"xiao cai"  写道:
>Hi:
>使用Flink SQL撤回流写入MySQL,表的auto_increment 
>越来越大是为什么,我的输出结果只有一条,mysql表里也只有一条数据,自增主键id的值一直为1,但是auto_increment却越来越大。求解答。


Flink SQL撤回流问题

2020-09-26 文章 xiao cai
Hi:
使用Flink SQL撤回流写入MySQL,表的auto_increment 
越来越大是为什么,我的输出结果只有一条,mysql表里也只有一条数据,自增主键id的值一直为1,但是auto_increment却越来越大。求解答。

Re:Flink 1.11.1 JDBC Sink 使用旧版配置 数据丢失

2020-09-26 文章 Michael Ran
update 怎么触发的 delete 哦?
在 2020-09-14 11:37:07,"LittleFall" <1578166...@qq.com> 写道:
>Flink 版本:
>flink:1.11.1-scala_2.12
>连接器
>mysql-connector-java-8.0.21
>flink-sql-connector-kafka_2.12-1.11.1
>flink-connector-jdbc_2.12-1.11.1
>
>Flink SQL:
>
>CREATE TABLE source_user_name (
>loan_no int,
>name varchar,
>PRIMARY KEY (loan_no) NOT ENFORCED
>) WITH (
>'connector' = 'kafka',   
>'topic' = 'test.username',
>'properties.bootstrap.servers' = 'kafka:9092',
>'properties.group.id' = 'test_flink_name_group',
>'format'='canal-json',
>'scan.startup.mode' = 'group-offsets'
>);
>
>CREATE TABLE test_flink_name_sink (
>loan_no int,
>name varchar,
>PRIMARY KEY (loan_no) NOT ENFORCED
>) WITH (
>'connector.type' = 'jdbc',
>'connector.url' =
>'jdbc:mysql://host.docker.internal:3306/test?=true',
>'connector.table' = 'username',
>'connector.driver' = 'com.mysql.cj.jdbc.Driver',
>'connector.username' = 'root',
>'connector.password' = '',
>'connector.write.flush.max-rows' = '5000',
>'connector.write.flush.interval' = '1s'
>);
>
>insert into test_flink_name_sink (loan_no,name)
>select loan_no,name from source_user_name;
>
>
>外部 sql:
>
>CREATE TABLE username (
>loan_no int PRIMARY KEY,
>name varchar(10)
>);
>
>insert into username values (1,'a');
>
>架构是 mysql-canal-kafka-flink-mysql
>
>同时执行(一次输入两行)
>
>UPDATE `username` SET `name` = 'b' WHERE `loan_no` = 1;
>UPDATE `username` SET `name` = 'a' WHERE `loan_no` = 1;
>
>发现目标数据库中结果丢失,结果稳定复现。
>
>分析原因:
>
>```
>上游一个update下游会落地两个sql
>1.insert into after value
>2.delete before value
>而且insert和delete是分开两个statement batch提交的。先insert batch再delete batch
>
>如果上游同时有两个update,update逻辑id:1,name:a更新为id:1,name:b再更新为id:1,name:a
>这个时候就会触发问题
>insert batch结束之后数据变成了id:1,name:a
>再执行delete batch的第一条before delete:delete id=1 name=a会直接把最终正确的数据删除了
>第二条的before delete:delete id=1 name=b删除不到数据。因为数据已经被删除了
>```
>
>换成新版 JDBC 配置之后没有这个问题。
>
>请问这是已经发现的问题吗?有没有 issue 号
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/


Re:Flink 1.11.1 JDBC Sink 使用旧版配置 数据丢失

2020-09-26 文章 Michael Ran
update 怎么触发的 delete 哦?
在 2020-09-14 11:37:07,"LittleFall" <1578166...@qq.com> 写道:
>Flink 版本:
>flink:1.11.1-scala_2.12
>连接器
>mysql-connector-java-8.0.21
>flink-sql-connector-kafka_2.12-1.11.1
>flink-connector-jdbc_2.12-1.11.1
>
>Flink SQL:
>
>CREATE TABLE source_user_name (
>loan_no int,
>name varchar,
>PRIMARY KEY (loan_no) NOT ENFORCED
>) WITH (
>'connector' = 'kafka',   
>'topic' = 'test.username',
>'properties.bootstrap.servers' = 'kafka:9092',
>'properties.group.id' = 'test_flink_name_group',
>'format'='canal-json',
>'scan.startup.mode' = 'group-offsets'
>);
>
>CREATE TABLE test_flink_name_sink (
>loan_no int,
>name varchar,
>PRIMARY KEY (loan_no) NOT ENFORCED
>) WITH (
>'connector.type' = 'jdbc',
>'connector.url' =
>'jdbc:mysql://host.docker.internal:3306/test?=true',
>'connector.table' = 'username',
>'connector.driver' = 'com.mysql.cj.jdbc.Driver',
>'connector.username' = 'root',
>'connector.password' = '',
>'connector.write.flush.max-rows' = '5000',
>'connector.write.flush.interval' = '1s'
>);
>
>insert into test_flink_name_sink (loan_no,name)
>select loan_no,name from source_user_name;
>
>
>外部 sql:
>
>CREATE TABLE username (
>loan_no int PRIMARY KEY,
>name varchar(10)
>);
>
>insert into username values (1,'a');
>
>架构是 mysql-canal-kafka-flink-mysql
>
>同时执行(一次输入两行)
>
>UPDATE `username` SET `name` = 'b' WHERE `loan_no` = 1;
>UPDATE `username` SET `name` = 'a' WHERE `loan_no` = 1;
>
>发现目标数据库中结果丢失,结果稳定复现。
>
>分析原因:
>
>```
>上游一个update下游会落地两个sql
>1.insert into after value
>2.delete before value
>而且insert和delete是分开两个statement batch提交的。先insert batch再delete batch
>
>如果上游同时有两个update,update逻辑id:1,name:a更新为id:1,name:b再更新为id:1,name:a
>这个时候就会触发问题
>insert batch结束之后数据变成了id:1,name:a
>再执行delete batch的第一条before delete:delete id=1 name=a会直接把最终正确的数据删除了
>第二条的before delete:delete id=1 name=b删除不到数据。因为数据已经被删除了
>```
>
>换成新版 JDBC 配置之后没有这个问题。
>
>请问这是已经发现的问题吗?有没有 issue 号
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/


Re:Re: flink canal-json 如何获取每条数据是updata insert delete

2020-09-26 文章 Michael Ran
我们也想用,能暴露一个出来不- -
在 2020-09-25 10:39:12,"Jark Wu"  写道:
>如果要抽取对应的 type,需要用 format = json, 把 json 的完整数据结构在 DDL 中声明出来(包括 type)。
>
>目前 canal-json 是自动将 changelog 转成了 Flink 的 insert/update/delete,而这个 change
>flag 目前是不对用户暴露的。
>
>Best,
>Jark
>
>On Fri, 25 Sep 2020 at 09:39, air23  wrote:
>
>> 你好
>> flink canal-json 如何获取每条数据是updata insert delete   ,我ddl插件kafka表 用对应的type取
>> 都是为null
>> 这个操作类型 有办法取到吗?谢谢
>>
>>


Re: 查询hbase sink结果表,有时查到数据,有时查不到

2020-09-26 文章 Leonard Xu
Hi
> 实际在HBaseSinkFunction中打log测试下来发现,都是UPDATE_AFTER类型的RowData数据写Hbase,没有你说的那种retract消息呢。如果是retract
>  应该是 会先发一条UPDATE_BEFORE 消息,再发一条UPDATE_AFTER消息吧。实际测下来 
> 都是UPDATE_AFTER,转成了hbase的Put操作,就好比每次都是upsert一样。 

HBase Sink 是upsert sink,一条retract消息里的 UPDATE_BEFORE 和 UPDATE_AFTER, 
UPDATE_BEFORE是会被框架推断后省略的,这是upsert sink里的一个小优化,但是upsert 
sink是能收到DELETE消息的,根据query的changlog推导,部分query会产生DELETE消息的,所以upsert sink会处理 
INSERT,UPDATE_AFTER, DELETE三种消息,如果有DELETE消息就会出现查询时刚好被删除的情况。

祝好
Leonard Xu

Re:Re:Re: flink获取latencymarker有什么好的方法

2020-09-26 文章 izual



Latency tracking[1] 开启后,metrics 里的 
[.[.]]..latency
 值只能作为一个参考值,更适合根据对比值判断哪个 channel、operator 延时变高了。




sql 
的话,也可能遇到之前我之前困惑的一点:http://apache-flink.147419.n8.nabble.com/flink-Latency-tracking-td1800.html
 




所以如果是用于获取数据流 end-to-end 延迟的话,目前觉得还是在 sql 数据流里自己增加一个时间字段比较合适。

另外 source 是 kafka 的话 group 的 lag 值,checkpoint 的 lastCheckpointDuration 
也都可以作为一个参考




1: 
https://ci.apache.org/projects/flink/flink-docs-master/monitoring/metrics.html#latency-tracking














在 2020-09-26 21:30:24,"郭士榕"  写道:
>感谢答复,是Sql的任务,想用LatencyMarker,本身提交的Sql任务可能没有用到EventTime
>
>
>
>
>
>在 2020-09-26 21:27:08,"shizk233"  写道:
>>如果是Data Stream API的话,可以考虑在目标算子上使用自定义metrics来展示数据延时情况
>>
>>郭士榕  于2020年9月26日周六 下午9:15写道:
>>
>>> Hi,All
>>>
>>>  
>>> 想问下大家如果要展示Flink任务的当前延时情况,有什么比较好的方法吗?用LatencyMarker是否可以,用API/JMX层面来获取的histogram能否汇总成一个数字?


Re: flink获取latencymarker有什么好的方法

2020-09-26 文章 JasonLee
hi

LatencyMarker 是一个全链路的延迟 不是非常的准确 不过也能大致反应端到端的延迟情况



-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Flink 1.10.1 checkpoint失败问题

2020-09-26 文章 Storm☀️
各位好,checkpoint相关问题L

flink版本1.10.1:,个别的checkpoint过程发生问题:
java.lang.Exception: Could not perform checkpoint 1194 for operator Map
(3/3).
at
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:816)
at
org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:86)
at
org.apache.flink.streaming.runtime.io.CheckpointBarrierTracker.processBarrier(CheckpointBarrierTracker.java:99)
at
org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155)
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:133)
at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:310)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:485)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:469)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
at
org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1382)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:974)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$5(StreamTask.java:870)
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:843)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:803)
... 12 mor

绝大部分是正常完成的,但是小部分比如上面的情况,就会失败,还会导致suspending-->restart.



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 关于sql中null值设置watermark

2020-09-26 文章 赵一旦
使用函数COALESCE将null情况转为一个默认值比如0.

Kyle Zhang  于2020年9月25日周五 下午7:24写道:

> Hi,
> 我在flink1.11 create ddl中使用 WATERMARK FOR wm AS wm - INTERVAL '5’ SECOND
> 设置watermark,当有脏数据的时候wm为null,就会报RowTime field should not be null, please
> convert it to a non-null long value.有没有好的方法直接忽略脏数据。
>
> Best
>


答复: FlinkKafkaConsumer on Yarn 模式下 设置并行度无法提高kafka的消费速度,但是提交两个应用却可以

2020-09-26 文章 范超
多谢一旦哥,我大概摸清楚了这几个关系,
目前使用kafka-consumer-perf-test.sh对单broker单分区的kafka压测在8核16G的情况下是30W/s每秒的消费能力。
SINK端的写入大概是20W/s
ON YARN的Per JOB模式下通过调整分区和并行度来进行扩容。
接下来的方向按照两位大佬所说应该是对应用的jvm调优来处理了。

感谢一旦哥和benchao哥


-邮件原件-
发件人: 赵一旦 [mailto:hinobl...@gmail.com] 
发送时间: 2020年9月25日 星期五 14:57
收件人: user-zh@flink.apache.org
主题: Re: FlinkKafkaConsumer on Yarn 模式下 设置并行度无法提高kafka的消费速度,但是提交两个应用却可以

单分区情况下30MB,这和flink已经没关系了貌似。能否更高的消费,首先还得确认你是否由足够快的速度写入压测数据。
而kafka不论是写入还是消费都和分区数量有关系。所以压测flink的最高能力,首先你得压测kafka拿到一个你预设的最高压力,否则kafka分区1个,如果qps最高达到2w。那么你基于这个单分区kafka去压测flink,flink也不可能超过1w的qps去消费,懂吧?

压测flink,那么kafka部分你就得尽可能把分区设置很大,避免是kafka的瓶颈。

范超  于2020年9月25日周五 下午2:28写道:

> 感谢benchao哥这么快就回复了。我这边再多观察测试一下。
> 再次感谢
>
> -邮件原件-
> 发件人: Benchao Li [mailto:libenc...@apache.org]
> 发送时间: 2020年9月24日 星期四 16:06
> 收件人: user-zh 
> 主题: Re: FlinkKafkaConsumer on Yarn 模式下 设置并行度无法提高kafka的消费速度,但是提交两个应用却可以
>
> 我们一般提升作业吞吐能力的步骤就是看作业的反压情况,
> - 如果作业完全没有反压,说明此时处理能力大于上游数据产生速度
> - 如果作业有反压,就具体看下反压的是哪个算子,存在什么瓶颈。比如网络IO、磁盘IO、CPU;
>   当然,有时候内存问题也会表现为CPU现象,比如GC比较严重
>
> 范超  于2020年9月24日周四 上午10:48写道:
>
> > 谢谢Benchao哥回复。
> >
> > 这几天一直忙着压测这个问题。
> > 经多轮压测(先灌满kafka数据),再去消费。
> > 发现确实是您说的问题中的第三个情况
> > 由于kafka的topic只开了一个partition
> >
> >
> 所以flinkkafkaconsumer按照一个taskmanger对应了一个kafka的parition的方式进行了处理。从而导致虽然作业并发度够大,但是由于只有一个partition,
> > 其他并发的taskmanager无法获取到更多的partition进行消费,从而导致并行度提升而作业消费能力却无法同比增大。
> >
> > 之后通过建立2个partition的topic,实现了消费能力的翻倍。
> >
> >
> > 想再请多问您一句,我如果想压出作业的极限吞吐量,请问该如何设置一些运行参数,目前我通过设置on yarn
> > 的tm的内存大小,kafka的partition数目,也无法将作业的吞吐量压上去。
> >
> >
> >
> > -邮件原件-
> > 发件人: Benchao Li [mailto:libenc...@apache.org]
> > 发送时间: 2020年9月18日 星期五 18:49
> > 收件人: user-zh 
> > 主题: Re: FlinkKafkaConsumer on Yarn 模式下 设置并行度无法提高kafka的消费速度,但是提交两个应用却可以
> >
> > 提交两个作业的话,两个作业是完全独立的,都会消费全量数据。
> >
> > 一个作业的消费能力不行,可以具体看下瓶颈在哪里,比如:
> > 1. 作业是否有lag,如果没有lag,那其实是没有问题的
> > 2. 如果作业有lag,而且lag还在上涨,说明当前消费能力不足,此时可以看下作业具体的瓶颈在哪里
> > 有可能是某个算子在反压导致整个作业的消费能力不足
> > 也有可能是作业的整体CPU资源不足导致的
> > 也有一种极端情况是,作业的并发度已经足够大,source subtask已经对应一个kafka
> > partition了,但是消费能力还是不足,这个时候其实是单个partition数据量太大,对应到Flink的source算子处理能力不足导致的
> > 3. 如果作业当前有lag,但是lag在下降,说明消费能力其实是够的,只是数据有些积压
> >
> > 范超  于2020年9月18日周五 下午4:07写道:
> >
> > > 各位好,我遇到了一个奇怪的问题
> > >
> > > 我是使用flink1.10和 flink-connector-kafka_2.11
> > >
> > > 使用Flink on yarn 模式运行,无论怎么调大并行度。Kafka节点(我使用的单节点)的网卡输出速度一直上不去。
> > >
> > > 但是提交两个同样的应用同样使用FLink on Yarm模式,Kafka节点的网卡输出速度是正常翻倍的。
> > >
> > > 我想达到的目的不是通过多向yarn集群提交多一个app,而是通过设置并行度来提高应用的吞吐量。。
> > >
> > > 求各位大佬指导
> > >
> >
> >
> > --
> >
> > Best,
> > Benchao Li
> >
>
>
> --
>
> Best,
> Benchao Li
>


Re: 请教二阶段提交问题

2020-09-26 文章 tison
> 可是再次提交没有意义啊,没有数据[捂脸哭]

这个事儿是这样的,你用 checkpoint 之后呢没有反过来确认的 commit 会留在 state 里,所以重启的时候重新加载 state
的时候就会再提交一遍。然后向 kafka 这一类存储 commit offset 是幂等的,发现已经 commit 过就跳过就 OK 了。

Best,
tison.


蒋佳成(Jiacheng Jiang) <920334...@qq.com> 于2020年9月26日周六 下午4:01写道:

>
> 两阶段提交的第一阶段提交中,事务参与者反馈ok后需要作出之后一定能提交事务的承诺,事务参与者需要做些事来兑现承诺比如将事务操作持久化。在FlinkKafkaProducer中,preCommit就是调用了KafkaProducer的flush将数据刷到kafka中,在整个checkpoint完成后再提交事务,如果提交失败,会在job重启时再次提交事务。因此,我们需要保证的是preCommit成功后commit一定要能成功,这个需要根据具体写入的存储提供的特性来完成。
>
>
>
>
> -- 原始邮件 --
> 发件人: "高亮" 发送时间: 2020年9月25日(星期五) 中午11:14
> 收件人: "user-zh" 主题: 请教二阶段提交问题
>
>
>
> 各位大佬,请教一下二阶段提交的问题,preCommit预提交失败回滚已经很清楚了,就是在commit阶段提交如果失败会怎么,比较迷惑。
>
>
>
> 我自己测试了一下,发现只要是commit失败会造成数据丢失,但是看了下方法注释,说是失败了后会重启flink恢复到最近的state,继续提交,可是我在程序里有专门打印source输入的流数据,发现没有按到任何数据进入,也就是说flink重启后就直接调用commit再次提交。
>
>
> 可是再次提交没有意义啊,没有数据[捂脸哭]
>
>
> 所以请教大佬,当commit出现异常后,flink内部是如何解决的,作为flink应用者,如何正确使用避免和解决这类问题!


Re: flinksql接收到字段值格式为2020-09-23T20:58:24+08:00,如何转成TIMESTAMP

2020-09-26 文章 Tianwang Li
FROM_UNIXTIME 使用的是本地时区, (可以看,org.apache.flink.table.runtime.functions.
SqlDateTimeUtils#fromUnixtime的实现)
可以指定时区,StreamTableEnvironment.getConfig().setLocalTimeZone()

Joker  于2020年9月24日周四 下午1:54写道:

> 不好意思,插入个问题。ts AS TO_TIMESTAMP(FROM_UNIXTIME(create_time / 1000,
> '-MM-dd HH:mm:ss')) ,我按此方式生成事件时间列,发现watermark一直比北京时间多8小时,比如create_time
> 为1600926591666,ts计算出来是2020/9/24
> 13:49:51没问题,但在WebUI上发现提取的watermark为2020/9/24 21:49:51
>
>
> | |
> Joker
> |
> |
> gaojintao...@163.com
> |
> 签名由网易邮箱大师定制
>
>
> 在2020年09月24日 13:40,Jark Wu 写道:
> Flink 的 TO_TIMESTAMP 函数用的是 Java SimpleDateFormat 来解析时间格式的,所以可以看下
> SimpleDateFormat 的 javadoc。
> 你可以试下 to_timestamp('2020-09-23T20:58:24+08:00',
> '-MM-dd''T''HH:mm:ssXXX') 来解析你的数据。
>
> Best,
> Jark
>
> On Wed, 23 Sep 2020 at 21:08, chenxuying  wrote:
>
> flinksql 版本是1.11.2
> source接收到字段是字符串类型的时间
> CREATE TABLE sourceTable (
> `time` STRING
> ) WITH(
> ...
> );
>
>
> sink如下
> CREATE TABLE sinktable (
> `time1` STRING,
> `time` TIMESTAMP(3)
> ) WITH (
> 'connector' = 'print'
> );
>
>
> insert语句,不知道怎么正确修改TO_TIMESTAMP默认的格式
> insert into sinktable select
> `time`,TO_TIMESTAMP(`time`,'-MM-ddTHH:mm:ss+08:00') from sourceTable
>
>
> 报错说是format错误
> Caused by: java.lang.IllegalArgumentException: Unknown pattern letter: T
> at
>
> java.time.format.DateTimeFormatterBuilder.parsePattern(DateTimeFormatterBuilder.java:1663)
> at
>
> java.time.format.DateTimeFormatterBuilder.appendPattern(DateTimeFormatterBuilder.java:1572)
> at java.time.format.DateTimeFormatter.ofPattern(DateTimeFormatter.java:534)
>


-- 
**
 tivanli
**


Re:Re: flink获取latencymarker有什么好的方法

2020-09-26 文章 郭士榕
感谢答复,是Sql的任务,想用LatencyMarker,本身提交的Sql任务可能没有用到EventTime





在 2020-09-26 21:27:08,"shizk233"  写道:
>如果是Data Stream API的话,可以考虑在目标算子上使用自定义metrics来展示数据延时情况
>
>郭士榕  于2020年9月26日周六 下午9:15写道:
>
>> Hi,All
>>
>>  
>> 想问下大家如果要展示Flink任务的当前延时情况,有什么比较好的方法吗?用LatencyMarker是否可以,用API/JMX层面来获取的histogram能否汇总成一个数字?


Re: flink获取latencymarker有什么好的方法

2020-09-26 文章 shizk233
如果是Data Stream API的话,可以考虑在目标算子上使用自定义metrics来展示数据延时情况

郭士榕  于2020年9月26日周六 下午9:15写道:

> Hi,All
>
>  
> 想问下大家如果要展示Flink任务的当前延时情况,有什么比较好的方法吗?用LatencyMarker是否可以,用API/JMX层面来获取的histogram能否汇总成一个数字?


flink获取latencymarker有什么好的方法

2020-09-26 文章 郭士榕
Hi,All
   
想问下大家如果要展示Flink任务的当前延时情况,有什么比较好的方法吗?用LatencyMarker是否可以,用API/JMX层面来获取的histogram能否汇总成一个数字?

Re: [flink-1.10.2] Blink SQL 超用内存严重

2020-09-26 文章 Tianwang Li
目前,观察到另外一个现象,
如果任务出现了异常,例如写Kafka失败,任务自动重启,这个时候就会突然飙升。
应该是任务失败之后,关闭重启,rocksdb占用到内存没有回收。
通过pmap查看,占用比较多内存多是很多个(128MB 和 64MB 内存块)。

另外,失败重启和如下多jira 描述重启任务多时候比较类似。
https://issues.apache.org/jira/browse/FLINK-7289


pmap图:
[image: image.png]

[image: image.png]

Tianwang Li  于2020年9月23日周三 下午9:11写道:

> 使用的是 `RocksDBStateBackend`, 是什么超用了内存, 配置了“taskmanager.memory.process.size:
> 4g”,
> 并且有预留 1G 用于jvm-overhead。
> 现在超了2.8G,是什么超用的,我想了解一下。
> 如果控制不了,很容易被资源系统(yarn、k8s等) kill 了。
>
>
> 有没有,其他人有这方面的经验。
>
>
>
> Benchao Li  于2020年9月23日周三 下午1:12写道:
>
>> 超yarn内存不合理。因为state如果用的是heap,那应该是堆内内存,不会超过配置的JVM的最大heap的内存的,
>> 只会jvm oom。超过yarn内存限制,那是因为还有jvm其他的overhead,加起来总量超了。
>>
>> 郑斌斌  于2020年9月23日周三 下午12:29写道:
>>
>> >  我这边也是遇到同样的问题,简单的双流 interval join SQL 跑4-5天就会有发生超用内存,然后container 被YARN
>> > KILL 。
>> > 单流跑的话,比较正常。
>> > JOB的内存是4G。版本1.11.1
>> > --
>> > 发件人:Benchao Li 
>> > 发送时间:2020年9月23日(星期三) 10:50
>> > 收件人:user-zh 
>> > 主 题:Re: [flink-1.10.2] Blink SQL 超用内存严重
>> >
>> > Hi Tianwang,
>> >
>> > 不知道你的DataStream是怎么实现的,只是从SQL的角度来看,有两个地方会导致状态的量会增加
>> >
>> > 1. time interval join会将watermark delay之后再发送,也就是说下游的hop窗口的状态会因为上游
>> > join算子延迟了watermark发送而状态比正常情况下大一些。目前watermark的延迟是
>> > `Math.max(leftRelativeSize, rightRelativeSize) +
>> > allowedLateness`,根据你的SQL,这个值应该是6h
>> > 2. time interval join中为了防止频繁的遍历state,曾经做过一个优化。但是这个优化会导致那些过期数据的
>> > 清理被延迟,也就是说状态清理的比正常预期要慢一点。如果用的是left outer join,甚至于会观察到
>> > 数据晚于watermark的情况,也就是下游window会有观察到丢数据的情况。这个时间目前是
>> > `minCleanUpInterval = (leftRelativeSize + rightRelativeSize) /
>> > 2;`,在你的SQL来讲,就是3h,也就是说
>> > 状态会*多保存*这么久。这个我曾经建过一个issue来优化这个点[1]
>> >
>> > 希望这个可以解答你的疑惑~
>> >
>> > [1] https://issues.apache.org/jira/browse/FLINK-18996
>> >
>> > Tianwang Li  于2020年9月22日周二 下午8:26写道:
>> >
>> > > 使用 Blink SQL 实现 interval join + hop window 超用内存比较严重。
>> > >
>> > >
>> > > 【join】
>> > >
>> > > > SELECT `b`.`rowtime`,
>> > > > `a`.`c_id`,
>> > > > `b`.`openid`
>> > > > FROM `test_table_a` AS `a`
>> > > > INNER JOIN `test_table_b` AS `b` ON `a`.`RoomID` = `b`.`RoomID`
>> > > > AND `a`.`openid` = `b`.`openid`
>> > > > AND `b`.`rowtime` BETWEEN ASYMMETRIC `a`.`rowtime` - INTERVAL '0'
>> > SECOND
>> > > > AND `a`.`rowtime` + INTERVAL '6' HOUR
>> > > >
>> > > >
>> > > 【window】
>> > >
>> > > > SELECT HOP_ROWTIME(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6'
>> HOUR)
>> > AS
>> > > > `rowtime`,
>> > > > HOP_START(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS
>> > > > `__windoow_start__`,
>> > > > HOP_END(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS
>> > > > `__window_end__`,
>> > > > `c_id`,
>> > > > COUNT(`openid`) AS `cnt`
>> > > > FROM `test_table_in_6h`
>> > > > GROUP BY HOP(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR),
>> > > > `c_id`
>> > > >
>> > >
>> > >
>> > > 我配置了Fink的内存是4G, 实际使用达到了6.8 G。
>> > > 同样的逻辑,我使用Stream API实现,使用的内存只有4.5G左右
>> > >
>> > > 【配置】
>> > >
>> > > > cat conf/flink-conf.yaml
>> > > > jobmanager.rpc.address: flink-jobmanager
>> > > > taskmanager.numberOfTaskSlots: 1
>> > > > blob.server.port: 6124
>> > > > jobmanager.rpc.port: 6123
>> > > > taskmanager.rpc.port: 6122
>> > > > jobmanager.heap.size: 6144m
>> > > > taskmanager.memory.process.size: 4g
>> > > > taskmanager.memory.jvm-overhead.min: 1024m
>> > > > taskmanager.memory.jvm-overhead.max: 2048m
>> > > > taskmanager.debug.memory.log-interval: 1
>> > > > env.java.opts: "-Xloggc:/opt/flink/log/gc.log
>> > > > -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCDetails
>> > > > -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation
>> > > -XX:NumberOfGCLogFiles=10
>> > > > -XX:GCLogFileSize=10M -XX:+PrintPromotionFailure -XX:+PrintGCCause"
>> > > >
>> > >
>> > >
>> > >
>> > > --
>> > > **
>> > >  tivanli
>> > > **
>> > >
>> >
>> >
>> > --
>> >
>> > Best,
>> > Benchao Li
>> >
>> >
>>
>> --
>>
>> Best,
>> Benchao Li
>>
>
>
> --
> **
>  tivanli
> **
>


-- 
**
 tivanli
**


????????????????????????

2020-09-26 文章 ??????(Jiacheng Jiang)
??ok??FlinkKafkaProducerpreCommit??KafkaProducer??flush??kafka??checkpointjob??preCommit??commit??




--  --
??: ""

????????????????????????

2020-09-26 文章 ??????(Jiacheng Jiang)
??ok??FlinkKafkaProducerpreCommit??KafkaProducer??flush??kafka??checkpointjob??preCommit??commit??




----
??: ""

Re: 请教二阶段提交问题

2020-09-26 文章 shizk233
需要搭配事务性存储机制来使用,能够保证预提交成功的数据能最终被commit成功。
详情可以参考孙金城老师关于这一部分的讲解和代码实现[1]

[1]https://www.bilibili.com/video/BV1yk4y1z7Lr?p=33

高亮  于2020年9月25日周五 上午11:14写道:

> 各位大佬,请教一下二阶段提交的问题,preCommit预提交失败回滚已经很清楚了,就是在commit阶段提交如果失败会怎么,比较迷惑。
>
>
>
> 我自己测试了一下,发现只要是commit失败会造成数据丢失,但是看了下方法注释,说是失败了后会重启flink恢复到最近的state,继续提交,可是我在程序里有专门打印source输入的流数据,发现没有按到任何数据进入,也就是说flink重启后就直接调用commit再次提交。
>
>
> 可是再次提交没有意义啊,没有数据[捂脸哭]
>
>
> 所以请教大佬,当commit出现异常后,flink内部是如何解决的,作为flink应用者,如何正确使用避免和解决这类问题!