Re: flink 1.11 cdc: 如何将DataStream 要如何转成flink sql cdc里的table?

2020-11-05 文章 jindy_liu
好的,谢谢jark!
数据是有删除的,所以看看要实现下souce方案。本来只想在上层用mapfuction进行一下合并来的,再做转换!
看来还是绕不过sql connector实现。源是kafka,看样子要想办法把kafka的流KafkaDynamicSource想办法改造下!!!



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink savepoint

2020-11-05 文章 admin
Hi,
你的任务时跑在yarn上的吗?如果是 需要指定 -yid

> 2020年11月6日 下午1:31,Congxian Qiu  写道:
> 
> Hi
> 从 client 端日志,或者 JM 日志还能看到其他的异常么?
> Best,
> Congxian
> 
> 
> 张锴  于2020年11月6日周五 上午11:42写道:
> 
>> 重启和反压都正常
>> 另外增加了从客户端到master的时间,还是有这个问题
>> 
>> hailongwang <18868816...@163.com> 于 2020年11月6日周五 10:54写道:
>> 
>>> Hi,
>>> 
>>> 
>>> 这个报错只是在规定的时间内没有完成 Savepoint,导致客户端连接 Master 超时,
>>> 具体的原因需要看下 Jobmaster 的日志。
>>> PS:在任务一直重启、反压的情况下,一般 Savepoint 都会失败。
>>> 
>>> 
>>> Best,
>>> Hailong Wang
>>> 
>>> 
>>> 
>>> 
>>> 在 2020-11-06 09:33:48,"张锴"  写道:
 本人在使用flink savepoint 保存快照的时候,遇到错误,目前不清楚是因为什么导致这个问题,路过的大佬帮忙看下。
 
 flink 版本1.10.1
 
 
 执行   flink savepoint a3a2e6c3a5a00bbe4c0c9e351dc58c47
 hdfs://hadoopnamenodeHA/flink/flink-savepoints
 
 
 出现错误信息
 
 
 org.apache.flink.util.FlinkException: Triggering a savepoint for the job
 a3a2e6c3a5a00bbe4c0c9e351dc58c47 failed.
 
 at
>>> 
>>> 
>>> org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:631)
 
 at
>>> 
>>> 
>>> org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:609)
 
 at
>>> 
>>> 
>>> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:841)
 
 at
>>> org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:606)
 
 at
>>> 
>>> 
>>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:908)
 
 at
>>> 
>>> 
>>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:966)
 
 at java.security.AccessController.doPrivileged(Native Method)
 
 at javax.security.auth.Subject.doAs(Subject.java:422)
 
 at
>>> 
>>> 
>>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
 
 at
>>> 
>>> 
>>> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
 
 at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:966)
 
 Caused by: java.util.concurrent.TimeoutException
 
 at
>>> 
>>> 
>>> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
 
 at
>>> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
 
 at
>>> 
>>> 
>>> org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:625)
>>> 
>> 



Re: 退订

2020-11-05 文章 Congxian Qiu
hi
退订请发邮件到  user-zh-unsubscr...@flink.apache.org,更多详情请参考[1]

[1] https://flink.apache.org/community.html#mailing-lists

Best,
Congxian


李郝 <13777597...@163.com> 于2020年11月5日周四 下午9:54写道:

> 退订


Re: flink savepoint

2020-11-05 文章 Congxian Qiu
Hi
 从 client 端日志,或者 JM 日志还能看到其他的异常么?
Best,
Congxian


张锴  于2020年11月6日周五 上午11:42写道:

> 重启和反压都正常
> 另外增加了从客户端到master的时间,还是有这个问题
>
> hailongwang <18868816...@163.com> 于 2020年11月6日周五 10:54写道:
>
> > Hi,
> >
> >
> > 这个报错只是在规定的时间内没有完成 Savepoint,导致客户端连接 Master 超时,
> > 具体的原因需要看下 Jobmaster 的日志。
> > PS:在任务一直重启、反压的情况下,一般 Savepoint 都会失败。
> >
> >
> > Best,
> > Hailong Wang
> >
> >
> >
> >
> > 在 2020-11-06 09:33:48,"张锴"  写道:
> > >本人在使用flink savepoint 保存快照的时候,遇到错误,目前不清楚是因为什么导致这个问题,路过的大佬帮忙看下。
> > >
> > >flink 版本1.10.1
> > >
> > >
> > >执行   flink savepoint a3a2e6c3a5a00bbe4c0c9e351dc58c47
> > >hdfs://hadoopnamenodeHA/flink/flink-savepoints
> > >
> > >
> > >出现错误信息
> > >
> > >
> > >org.apache.flink.util.FlinkException: Triggering a savepoint for the job
> > >a3a2e6c3a5a00bbe4c0c9e351dc58c47 failed.
> > >
> > > at
> >
> >
> >org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:631)
> > >
> > > at
> >
> >
> >org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:609)
> > >
> > > at
> >
> >
> >org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:841)
> > >
> > > at
> > org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:606)
> > >
> > > at
> >
> >
> >org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:908)
> > >
> > > at
> >
> >
> >org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:966)
> > >
> > > at java.security.AccessController.doPrivileged(Native Method)
> > >
> > > at javax.security.auth.Subject.doAs(Subject.java:422)
> > >
> > > at
> >
> >
> >org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
> > >
> > > at
> >
> >
> >org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> > >
> > > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:966)
> > >
> > >Caused by: java.util.concurrent.TimeoutException
> > >
> > > at
> >
> >
> >java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
> > >
> > > at
> > java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
> > >
> > > at
> >
> >
> >org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:625)
> >
>


Re: Re:Re: Flink StreamingFileSink滚动策略

2020-11-05 文章 bradyMk
Hi,guoliang_wang1335
请问StreamingFileSink用forBulkFormat方法时,可以自定义滚动策略么?你这边实现成功了么?



-
Best Wishes
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink savepoint

2020-11-05 文章 张锴
重启和反压都正常
另外增加了从客户端到master的时间,还是有这个问题

hailongwang <18868816...@163.com> 于 2020年11月6日周五 10:54写道:

> Hi,
>
>
> 这个报错只是在规定的时间内没有完成 Savepoint,导致客户端连接 Master 超时,
> 具体的原因需要看下 Jobmaster 的日志。
> PS:在任务一直重启、反压的情况下,一般 Savepoint 都会失败。
>
>
> Best,
> Hailong Wang
>
>
>
>
> 在 2020-11-06 09:33:48,"张锴"  写道:
> >本人在使用flink savepoint 保存快照的时候,遇到错误,目前不清楚是因为什么导致这个问题,路过的大佬帮忙看下。
> >
> >flink 版本1.10.1
> >
> >
> >执行   flink savepoint a3a2e6c3a5a00bbe4c0c9e351dc58c47
> >hdfs://hadoopnamenodeHA/flink/flink-savepoints
> >
> >
> >出现错误信息
> >
> >
> >org.apache.flink.util.FlinkException: Triggering a savepoint for the job
> >a3a2e6c3a5a00bbe4c0c9e351dc58c47 failed.
> >
> > at
>
> >org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:631)
> >
> > at
>
> >org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:609)
> >
> > at
>
> >org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:841)
> >
> > at
> org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:606)
> >
> > at
>
> >org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:908)
> >
> > at
>
> >org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:966)
> >
> > at java.security.AccessController.doPrivileged(Native Method)
> >
> > at javax.security.auth.Subject.doAs(Subject.java:422)
> >
> > at
>
> >org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
> >
> > at
>
> >org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> >
> > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:966)
> >
> >Caused by: java.util.concurrent.TimeoutException
> >
> > at
>
> >java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
> >
> > at
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
> >
> > at
>
> >org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:625)
>


Re: Re:请教一下目前flink submit能不能指定额外的依赖jar

2020-11-05 文章 silence
感谢回复,还是希望可以从submit上解决这个问题,不能添加依赖限制了很多应用场景,特别是针对平台来说



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re:请教一下目前flink submit能不能指定额外的依赖jar

2020-11-05 文章 hailongwang
Hi silence,


 目前有个 issue [1]在跟进创建 UDF 时候添加 jar 包。
PS:目前在我的内部版本,是扩展了 类似 ADD Dependency 语法,在 job 提交运行时候会把 jar 包等加载到所运行的 classpath 下,
这样就可以让用户在 SQL 中注册 UDF,自己定义 Connector等,但是是非标准 SQL。


[1] https://issues.apache.org/jira/browse/FLINK-14055


Best,
Hailong Wang

在 2020-11-06 09:34:27,"silence"  写道:
>大家好
>
>由于目前用了flink SQL封装了jar包,sql是作为参数动态传入的,
>因此需要动态的引入一下依赖jar,比如udf jar,connector的jar等,
>由于不同任务的依赖jar是不同的,不适合都放在flink lib目录下(可能会冲突)
>因此想请教一下有没有办法在submit时才指定任务依赖的jar包,类似spark submit的--jars
>没有的话有没有相关的issue可以跟进这个问题
>
>谢谢
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/


Re:flink savepoint

2020-11-05 文章 hailongwang
Hi,


这个报错只是在规定的时间内没有完成 Savepoint,导致客户端连接 Master 超时,
具体的原因需要看下 Jobmaster 的日志。
PS:在任务一直重启、反压的情况下,一般 Savepoint 都会失败。


Best,
Hailong Wang




在 2020-11-06 09:33:48,"张锴"  写道:
>本人在使用flink savepoint 保存快照的时候,遇到错误,目前不清楚是因为什么导致这个问题,路过的大佬帮忙看下。
>
>flink 版本1.10.1
>
>
>执行   flink savepoint a3a2e6c3a5a00bbe4c0c9e351dc58c47
>hdfs://hadoopnamenodeHA/flink/flink-savepoints
>
>
>出现错误信息
>
>
>org.apache.flink.util.FlinkException: Triggering a savepoint for the job
>a3a2e6c3a5a00bbe4c0c9e351dc58c47 failed.
>
> at
>org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:631)
>
> at
>org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:609)
>
> at
>org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:841)
>
> at org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:606)
>
> at
>org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:908)
>
> at
>org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:966)
>
> at java.security.AccessController.doPrivileged(Native Method)
>
> at javax.security.auth.Subject.doAs(Subject.java:422)
>
> at
>org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
>
> at
>org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:966)
>
>Caused by: java.util.concurrent.TimeoutException
>
> at
>java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
>
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>
> at
>org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:625)


请教一下目前flink submit能不能指定额外的依赖jar

2020-11-05 文章 silence
大家好

由于目前用了flink SQL封装了jar包,sql是作为参数动态传入的,
因此需要动态的引入一下依赖jar,比如udf jar,connector的jar等,
由于不同任务的依赖jar是不同的,不适合都放在flink lib目录下(可能会冲突)
因此想请教一下有没有办法在submit时才指定任务依赖的jar包,类似spark submit的--jars
没有的话有没有相关的issue可以跟进这个问题

谢谢



--
Sent from: http://apache-flink.147419.n8.nabble.com/

flink savepoint

2020-11-05 文章 张锴
本人在使用flink savepoint 保存快照的时候,遇到错误,目前不清楚是因为什么导致这个问题,路过的大佬帮忙看下。

flink 版本1.10.1


执行   flink savepoint a3a2e6c3a5a00bbe4c0c9e351dc58c47
hdfs://hadoopnamenodeHA/flink/flink-savepoints


出现错误信息


org.apache.flink.util.FlinkException: Triggering a savepoint for the job
a3a2e6c3a5a00bbe4c0c9e351dc58c47 failed.

 at
org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:631)

 at
org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:609)

 at
org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:841)

 at org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:606)

 at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:908)

 at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:966)

 at java.security.AccessController.doPrivileged(Native Method)

 at javax.security.auth.Subject.doAs(Subject.java:422)

 at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)

 at
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)

 at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:966)

Caused by: java.util.concurrent.TimeoutException

 at
java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)

 at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)

 at
org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:625)


回复: Flink1.11.0 sql org.apache.flink.connector.jdbc.dialect.MySQLDialect发现疑似bug

2020-11-05 文章 wind.fly....@outlook.com
嗯,应该是没问题的,我理解错了,谢谢指正。

发件人: 史 正超 
发送时间: 2020年11月5日 19:30
收件人: user-zh@flink.apache.org 
主题: 回复: Flink1.11.0 sql 
org.apache.flink.connector.jdbc.dialect.MySQLDialect发现疑似bug

应该是没有问题的,
首先你在flink sql中指定的primary key 应该要与mysql中的唯一索引或者主键对应。
其次那个方法里组装出来的语句 类似下面的语句:
INSERT INTO `tablename`(`key1`, `key2`, `f1`, `f2`) VALUES (?, ?, ?, ?) ON 
DUPLICATE KEY UPDATE `key1`=VALUES(`key1`), `key2`=VALUES(`key2`), 
`f1`=VALUES(`f1`), `f2`=VALUES(`f2`)

里面已经包含了定义的key, 当发生唯一键冲突时,会执行更新。所以无需指定uniqueKeyFields的

发件人: wind.fly@outlook.com 
发送时间: 2020年11月5日 10:58
收件人: user-zh@flink.apache.org 
主题: Flink1.11.0 sql org.apache.flink.connector.jdbc.dialect.MySQLDialect发现疑似bug

Hi,all:
   近日因为用到JdbcDynamicTableSink,发现往mysql插入数据时没有按我指定的primary 
key更新数据,无意间追踪到org.apache.flink.connector.jdbc.dialect.MySQLDialect类中getUpsertStatement方法:


/**
*
 Mysql upsert query use DUPLICATE KEY UPDATE.
*
*
 NOTE: It requires Mysql's primary key to be consistent with pkFields.
*
*
 We don't use REPLACE INTO, if there are other fields, we can keep their 
previous values.
*/
@Override
public Optional getUpsertStatement(String tableName, String[] 
fieldNames, String[] uniqueKeyFields)
 {
String
 updateClause = Arrays.stream(fieldNames)
.map(f ->
 quoteIdentifier(f) + "=VALUES(" + quoteIdentifier(f) + ")")
.collect(Collectors.joining(", "));
return Optional.of(getInsertIntoStatement(tableName,
 fieldNames) +
"
 ON DUPLICATE KEY UPDATE " + updateClause
);
}

该方法中有uniqueKeyFields参数但是没有用到,且我认为updateClause应该是用uniqueKeyFields生成, 代码改成:

String updateClause = Arrays.stream(uniqueKeyFields)
   .map(f -> quoteIdentifier(f) + "=VALUES(" + quoteIdentifier(f) + ")")
   .collect(Collectors.joining(", "));

麻烦各位大佬确认。


Re:关于flink任务挂掉报警的监控指标选择

2020-11-05 文章 RS
可以配置任务重启告警, flink任务挂掉之后会自动尝试重启
如果是固定任务数量的话, 还可以配置slot数量告警



在 2020-11-05 10:15:01,"bradyMk"  写道:
>请问各位大佬,我基于grafana+prometheus构建的Flink监控,现在想实现flink任务挂掉后,grafana就发出报警的功能,但是目前不知道该用什么指标去监控,我之前想监控flink_jobmanager_job_uptime这个指标,设置的监控规则是:max_over_time(flink_jobmanager_job_uptime[1m])
>-
>min_over_time(flink_jobmanager_job_uptime[1m])的差小于等于0就报警,但是任务刚启动,会有误报,想请教下有没有更好的办法
>
>
>
>-
>Best Wishes
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/


回复: flink1.11的cdc功能对消息顺序性的处理

2020-11-05 文章 史 正超
Canal可以配置分区策略:配置保证相同id的记录都发到同一个分区,比如 `db.table1:id`
这样就保证了数据的有序。

发送自 Windows 10 版邮件应用

发件人: Jark Wu
发送时间: 2020年11月5日 21:28
收件人: user-zh
主题: Re: flink1.11的cdc功能对消息顺序性的处理

我理解你说的是对 pk 的更新的场景。

比如一张 user 表,有[user_id, user_name] 2个字段,
假设有 "101, Tim" 记录 做了两次更新
update1:update test set id=102 where id=101;
update2: update test set id=103 wehre id=102;

针对这种场景 debezium 是会把这种针对 pk的更新拆成一条 delete 和一条 insert,而不是 update 消息。

所以 update1 语句产生了:
DELETE(101,Timo) 发到了p1
INSERT(102,Tim) 发到了 p2

update2 语句产生了:
DELETE(102, Tim) 发到了 p2
INSERT(103, Tim) 发到了 p3

所以 flink 去对接这个数据的时候,仍然能够最终数据是 (103, Tim), 因为 102 的两条数据,INSERT, DELETE
仍然是有序的。

所以如果 canal 对于 pk 更新也是同样的策略,那么也是一样的。 但我不确定 canal 是怎么处理 pk 更新的,这个需要调研下。

Best,
Jark

On Thu, 5 Nov 2020 at 21:05, hailongwang <18868816...@163.com> wrote:

> Hi,
>
> 可以看下 Jark 的 《基于 Flink SQL CDC 的实时数据同步方案》文章 [1]. 其中在最后的 Q 中描述了
> "首先需要 kafka 在分区中保证有序,同一个 key 的变更数据需要打入到同一个 kafka 的分区里面,这样 flink
> 读取的时候才能保证顺序。"
>
>
> 个人认为,需要 Update 的 key 可以更 canal 采集到 kakfa 的 hash key 一致,这样就保证了有序?
>
>
> [1] https://mp.weixin.qq.com/s/QNJlacBUlkMT7ksKKSNa5Q
>
>
> Best,
> Hailong Wang
>
>
>
>
>
> 在 2020-11-05 15:35:55,"18392099563" <18392099...@163.com> 写道:
> >hi everyone,
> >麻烦请教下各位大神,flink如何处理如下问题:
>
> >flink1.11引入cdc,可以解析canal和debezuim发送的CDC数据,其中canal一般是可以指定某些字段作为key进行hash分区发送到同一topic下的不同分区的。
> >如果源端短时间对pk值进行多次update,则有可能导致发往不同分区,从而无法保证顺序性。
> >假如
> >1.有源表和目标表:
> >create table test(
> >id int(10) primary key
> >)
> >2.源表的增量数据通过canal发往kafka,目标表接收kafka消息进行同步。
> >3.发往的topic下有三个partition:p0、p1、p2
> >4.源端和目标端都有一条记录id=1
> >
> >此时对源端进行两次update:
> >update1:update test set id=2 where id=1;
> >update2: update test set id=3 wehre id=2;
>
> >假如两条消息都在同一批message中发往kafka,其中update1发送到p1,pudate2发送到p2,这两条消息的顺序性是无法保证的,假如update2先到达,则目标端最终结果为id=2,与源端结果id=3不一致。
> >
> >
> >
> >
> >
> >--
> >Sent from: http://apache-flink.147419.n8.nabble.com/
>



关于cluster.evenly-spread-out-slots参数的底层原理

2020-11-05 文章 赵一旦
有没有人对cluster.evenly-spread-out-slots参数了解比较深入的给讲解下。

我主要想知道,设置这个参数为true之后。Flink是以一个什么样的规则去尽可能均衡分配的。
standalone集群模式下,每个机器性能相同,flink slot数量配置相同情况下。基于*这种分配规则*,有没有一种方法让Flink做到
*完全均衡*,而*不是尽可能均衡*?

此外,我说的“均衡”都特指算子级别的均衡。不要5机器一共5个slot,然后任务有5个算子,每个算子单并发并且通过不同的share
group各独占1个slot这种均衡。我指的是每个算子都均衡到机器(*假设并发设置合理*)。


退订

2020-11-05 文章 李郝
退订

Re: flink1.11的cdc功能对消息顺序性的处理

2020-11-05 文章 Jark Wu
我理解你说的是对 pk 的更新的场景。

比如一张 user 表,有[user_id, user_name] 2个字段,
假设有 "101, Tim" 记录 做了两次更新
update1:update test set id=102 where id=101;
update2: update test set id=103 wehre id=102;

针对这种场景 debezium 是会把这种针对 pk的更新拆成一条 delete 和一条 insert,而不是 update 消息。

所以 update1 语句产生了:
DELETE(101,Timo) 发到了p1
INSERT(102,Tim) 发到了 p2

update2 语句产生了:
DELETE(102, Tim) 发到了 p2
INSERT(103, Tim) 发到了 p3

所以 flink 去对接这个数据的时候,仍然能够最终数据是 (103, Tim), 因为 102 的两条数据,INSERT, DELETE
仍然是有序的。

所以如果 canal 对于 pk 更新也是同样的策略,那么也是一样的。 但我不确定 canal 是怎么处理 pk 更新的,这个需要调研下。

Best,
Jark

On Thu, 5 Nov 2020 at 21:05, hailongwang <18868816...@163.com> wrote:

> Hi,
>
> 可以看下 Jark 的 《基于 Flink SQL CDC 的实时数据同步方案》文章 [1]. 其中在最后的 Q 中描述了
> "首先需要 kafka 在分区中保证有序,同一个 key 的变更数据需要打入到同一个 kafka 的分区里面,这样 flink
> 读取的时候才能保证顺序。"
>
>
> 个人认为,需要 Update 的 key 可以更 canal 采集到 kakfa 的 hash key 一致,这样就保证了有序?
>
>
> [1] https://mp.weixin.qq.com/s/QNJlacBUlkMT7ksKKSNa5Q
>
>
> Best,
> Hailong Wang
>
>
>
>
>
> 在 2020-11-05 15:35:55,"18392099563" <18392099...@163.com> 写道:
> >hi everyone,
> >麻烦请教下各位大神,flink如何处理如下问题:
>
> >flink1.11引入cdc,可以解析canal和debezuim发送的CDC数据,其中canal一般是可以指定某些字段作为key进行hash分区发送到同一topic下的不同分区的。
> >如果源端短时间对pk值进行多次update,则有可能导致发往不同分区,从而无法保证顺序性。
> >假如
> >1.有源表和目标表:
> >create table test(
> >id int(10) primary key
> >)
> >2.源表的增量数据通过canal发往kafka,目标表接收kafka消息进行同步。
> >3.发往的topic下有三个partition:p0、p1、p2
> >4.源端和目标端都有一条记录id=1
> >
> >此时对源端进行两次update:
> >update1:update test set id=2 where id=1;
> >update2: update test set id=3 wehre id=2;
>
> >假如两条消息都在同一批message中发往kafka,其中update1发送到p1,pudate2发送到p2,这两条消息的顺序性是无法保证的,假如update2先到达,则目标端最终结果为id=2,与源端结果id=3不一致。
> >
> >
> >
> >
> >
> >--
> >Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: JobManager responsible for xxx lost the leadership.

2020-11-05 文章 赵一旦
standalone模式。

hailongwang <18868816...@163.com> 于2020年11月5日周四 下午8:55写道:

> Hi,
> 你是 on-yarn 的模式吗?
> JobManager 并不是 worker,只是控制 Checkpoint ,接收 TM 的心跳等,可以看下在这个之前的其它日志。
> 还可以看下 ZK 是否正常等。
> On-yarn 的话,也可以看下 NM 对这个AM处理 的日志。
>
>
> Best,
> Hailong Wang
>
> 在 2020-11-05 15:03:11,"赵一旦"  写道:
> >JobManager responsible for ff2118284beed21ac220ee7cc0a639c0 lost the
> >leadership.
> >
> >
> >
> >这种错误原因是什么,会导致任务重启,本身压力大,突然重启使用10分钟前的ckpt,压力更大了。
>


Re: keyBy的数据均衡性

2020-11-05 文章 赵一旦
如果uid=0的组合数规模大点,能够更加均衡的分到10个并发算子。那么相当于10个并发算子能公平的分到流量低(uid!=0)的组合,以及流量高(uid=0)的组合。所以本身就不会不均衡了。
此处应该是因为*sid+subid+browser*的组合数正好也不够大导致的。

感觉有道理不。

赵一旦  于2020年11月5日周四 下午9:08写道:

> 感觉好像有道理哈哈。
>
> 分析下:*sid+subid+browser+uid* 一共大约300w假设,*sid+subid+browser *则假设是300个。
>   那么uid=0的存在300种组合,即 *300w种组合 *中有 *300种组合(uid=0) *是相对大概率出现的。
>
>   那么这300种大概率出现的组合如果碰巧分布不够均衡,就会导致window算子部分不均衡。
>
> 之前我考虑了uid的问题,但想的是hash是一堆字段一起哈希,uid自身不均衡不会导致问题。但基于如上分析,貌似是有问题的。因为uid=0的组合数的
> *规模太小(300)*,如果这个规模稍微大点的话,uid的不均衡就不会导致这个问题了可能。
>
>
>
> 范瑞 <836961...@qq.com> 于2020年11月5日周四 下午8:29写道:
>
>> Hello
>>
>>
>> 是不是因为 uid = 0 的数据较多导致的倾斜呢?
>>
>>
>> BestWishes
>> fanrui
>>
>>
>>
>>
>> --原始邮件--
>> 发件人:
>>   "user-zh"
>> <
>> hinobl...@gmail.com;
>> 发送时间:2020年11月5日(星期四) 晚上8:13
>> 收件人:"user-zh">
>> 主题:keyBy的数据均衡性
>>
>>
>>
>> 我这边遇到一个情况比较奇怪。
>> (1)一整天数据的统计信息如下:
>> sid+subid+browser+ip: 13068577
>> sid+subid+browser+uid: 2962237
>> 如上,sid和subid是渠道和子渠道,browser是浏览器,ip和uid都是一个相对变化较大的维度。
>> *数字是distinct count信息。*
>> (2)任务逻辑
>>
>> 流A,分别基于sid+subid+browser+ip和sid+subid+browser+uid组合维护做统计。window算子并发都是10。结果是sid+subid+browser+ip的window算子收到数据基本均衡(1.09G~1.48G),此处是指运行一段时间后。但sid+subid+browser+uid算子收到数据却很不均衡(230MB~6.84G)。
>>
>> 我的疑问是,虽然keyBy不能完全均衡,这很好理解。但是差距也太奇葩了。230MB和6.84G。
>> 而且从统计信息看uid的确没有ip区分度大。但 sid+subid+browser+uid 的组合数达到 296w,并发才10,会这么不均衡的嘛?
>
>


Re: keyBy的数据均衡性

2020-11-05 文章 赵一旦
感觉好像有道理哈哈。

分析下:*sid+subid+browser+uid* 一共大约300w假设,*sid+subid+browser *则假设是300个。
  那么uid=0的存在300种组合,即 *300w种组合 *中有 *300种组合(uid=0) *是相对大概率出现的。

  那么这300种大概率出现的组合如果碰巧分布不够均衡,就会导致window算子部分不均衡。

之前我考虑了uid的问题,但想的是hash是一堆字段一起哈希,uid自身不均衡不会导致问题。但基于如上分析,貌似是有问题的。因为uid=0的组合数的
*规模太小(300)*,如果这个规模稍微大点的话,uid的不均衡就不会导致这个问题了可能。



范瑞 <836961...@qq.com> 于2020年11月5日周四 下午8:29写道:

> Hello
>
>
> 是不是因为 uid = 0 的数据较多导致的倾斜呢?
>
>
> BestWishes
> fanrui
>
>
>
>
> --原始邮件--
> 发件人:
>   "user-zh"
> <
> hinobl...@gmail.com;
> 发送时间:2020年11月5日(星期四) 晚上8:13
> 收件人:"user-zh"
> 主题:keyBy的数据均衡性
>
>
>
> 我这边遇到一个情况比较奇怪。
> (1)一整天数据的统计信息如下:
> sid+subid+browser+ip: 13068577
> sid+subid+browser+uid: 2962237
> 如上,sid和subid是渠道和子渠道,browser是浏览器,ip和uid都是一个相对变化较大的维度。
> *数字是distinct count信息。*
> (2)任务逻辑
>
> 流A,分别基于sid+subid+browser+ip和sid+subid+browser+uid组合维护做统计。window算子并发都是10。结果是sid+subid+browser+ip的window算子收到数据基本均衡(1.09G~1.48G),此处是指运行一段时间后。但sid+subid+browser+uid算子收到数据却很不均衡(230MB~6.84G)。
>
> 我的疑问是,虽然keyBy不能完全均衡,这很好理解。但是差距也太奇葩了。230MB和6.84G。
> 而且从统计信息看uid的确没有ip区分度大。但 sid+subid+browser+uid 的组合数达到 296w,并发才10,会这么不均衡的嘛?


Re:flink1.11的cdc功能对消息顺序性的处理

2020-11-05 文章 hailongwang
Hi,
 
可以看下 Jark 的 《基于 Flink SQL CDC 的实时数据同步方案》文章 [1]. 其中在最后的 Q 中描述了
"首先需要 kafka 在分区中保证有序,同一个 key 的变更数据需要打入到同一个 kafka 的分区里面,这样 flink 读取的时候才能保证顺序。"


个人认为,需要 Update 的 key 可以更 canal 采集到 kakfa 的 hash key 一致,这样就保证了有序?


[1] https://mp.weixin.qq.com/s/QNJlacBUlkMT7ksKKSNa5Q


Best,
Hailong Wang





在 2020-11-05 15:35:55,"18392099563" <18392099...@163.com> 写道:
>hi everyone,
>麻烦请教下各位大神,flink如何处理如下问题:
>flink1.11引入cdc,可以解析canal和debezuim发送的CDC数据,其中canal一般是可以指定某些字段作为key进行hash分区发送到同一topic下的不同分区的。
>如果源端短时间对pk值进行多次update,则有可能导致发往不同分区,从而无法保证顺序性。
>假如
>1.有源表和目标表:
>create table test(
>id int(10) primary key
>)
>2.源表的增量数据通过canal发往kafka,目标表接收kafka消息进行同步。
>3.发往的topic下有三个partition:p0、p1、p2
>4.源端和目标端都有一条记录id=1
>
>此时对源端进行两次update:
>update1:update test set id=2 where id=1;
>update2: update test set id=3 wehre id=2;
>假如两条消息都在同一批message中发往kafka,其中update1发送到p1,pudate2发送到p2,这两条消息的顺序性是无法保证的,假如update2先到达,则目标端最终结果为id=2,与源端结果id=3不一致。
>
>
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/


Re:Re:Re:union all 丢失部分数据

2020-11-05 文章 hailongwang


可以确认下 union all 之后的数据是不是根据 group by 的 key 相互覆盖的情况。

在 2020-11-05 13:24:20,"夜思流年梦"  写道:
>
>
>
>
>
>
>
>
>
>flink 版本是1.11的版本了
>
>
>
>
>
>
>
>
>在 2020-11-05 00:02:12,"hailongwang" <18868816...@163.com> 写道:
>>Hi liaobiao,
>>
>>
>>你的 flink 版本是什么呢?
>>根据你的 SQL,如果是版本是 <= 1.10 的话,会根据 MetaDataHander 识别出你的 group by 后面的 key 作为 
>>upsert key,这样就会产生覆盖的情况。
>>你看下结果是否是这种情况的?
>>
>>
>>Best,
>>Hailong Wang
>>
>>
>>
>>
>>在 2020-11-04 17:20:23,"夜思流年梦"  写道:
>>>开发者好:
>>>   目前有此场景:算不同部门的总收入和所有部门的总收入,打算把两部分SQL union all  ,但是实际情况发现 
>>> union all的时候会丢一部分数据,要么是各个部门的数据少了,要么是所有部门的总收入少了
>>>   如果把union  all 的两段SQL 分别独立出来,插入同一张表,那么数据就是正常的,不知道是否是bug还是使用方法不对
>>>
>>>
>>>
>>>
>>>原sql :
>>>
>>>
>>>insert into dws_
>>>
>>>
>>>select 
>>>0 as id
>>>,cast (DATE_FORMAT(LOCALTIMESTAMP, 'MMddHH') as bigint) as ftime
>>>,case 
>>>when dept_name like '%XX%' then 'X1'
>>>when dept_name = 'xXX' then 'X2'
>>>else 'X3' end as paytype
>>>,count(orderid) as paynum_h 
>>>,round(sum(amt)) as paymoney_h 
>>>from dwd_XXX
>>>where write_time >=DATE_FORMAT(LOCALTIMESTAMP, '-MM-dd')
>>>group by
>>>DATE_FORMAT(LOCALTIMESTAMP, 'MMddHH'), 
>>>case 
>>>when dept_name like '%XX%' then 'X1'
>>>when dept_name = 'xXX' then 'X2'
>>>else 'X3' end ;
>>>
>>>
>>>
>>>
>>>union all
>>>
>>>
>>>
>>>
>>>select 0 as id
>>>,cast (DATE_FORMAT(LOCALTIMESTAMP, 'MMddHH') as int) as ftime
>>>,'all' as paytype
>>>,count(orderid) as paynum_h  
>>>,round(sum(amt)) as paymoney_h  
>>>from dwd_XXX
>>>where write_time >=DATE_FORMAT(LOCALTIMESTAMP, '-MM-dd')
>>>group by DATE_FORMAT(LOCALTIMESTAMP, 'MMddHH') ;


Re:JobManager responsible for xxx lost the leadership.

2020-11-05 文章 hailongwang
Hi,
你是 on-yarn 的模式吗?
JobManager 并不是 worker,只是控制 Checkpoint ,接收 TM 的心跳等,可以看下在这个之前的其它日志。
还可以看下 ZK 是否正常等。
On-yarn 的话,也可以看下 NM 对这个AM处理 的日志。


Best,
Hailong Wang

在 2020-11-05 15:03:11,"赵一旦"  写道:
>JobManager responsible for ff2118284beed21ac220ee7cc0a639c0 lost the
>leadership.
>
>
>
>这种错误原因是什么,会导致任务重启,本身压力大,突然重启使用10分钟前的ckpt,压力更大了。


Re:flink 1.11.0 chk超时

2020-11-05 文章 hailongwang
Hi liangji,


CP 超时的原因一般是因任务而议的。从你提供的 2 张截图来看,卡在第二个 operator 的 subtask3 上。
上下两个 operator 之间的关系是 forworad 还是 reblance 呢?如果是 forward 的话,可以看下是不是数据倾斜,subtask3 
需要处理的数据量比较多。
如果是 reblance 的话,以为 subtask1 和 subtask2 都成功了,所以上游的 barrier 应该都往下发了,所以 
subtask3也收到了上游的 barrier,而 reblance 数据量都一样,所以可以看下是不是 sink 出去太慢导致。
查看任务一般可以看下任务的日志, GC,采堆栈,画火焰图等。


Best,
Hailong Wang


在 2020-11-05 16:26:06,"liangji"  写道:
>
> 
>
> 
>
>chk history如图,以下是TM中找到的INFO信息:
>2020-11-05 13:13:38,101 INFO 
>org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] -
>Subtask 2 checkpointing for checkpoint with id=16 (max part counter=6).
>2020-11-05 13:13:38,143 INFO 
>org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] -
>Subtask 1 checkpointing for checkpoint with id=16 (max part counter=0).
>2020-11-05 13:14:37,779 WARN  org.apache.kafka.clients.NetworkClient   
>   
>[] - Connection to node -3 could not be established. Broker may not be
>available.
>2020-11-05 13:14:37,786 WARN  org.apache.kafka.clients.NetworkClient   
>   
>[] - Connection to node -2 could not be established. Broker may not be
>available.
>2020-11-05 13:33:38,115 INFO  org.apache.flink.runtime.taskmanager.Task
>   
>[] - Attempting to cancel task Source: Custom Source -> Process -> (Sink:
>Hdfs sink, Sink: HistoryTopic hbase sink, Sink: HistoryTopicCopy hbase sink)
>(3/3) (68bfa6305a9aa5a7381b9ca4a8fef2fa).
>请路过的大佬们指点下chk超时怎么定位问题,多谢(目前日志级别切换不成debug)
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/


keyBy的数据均衡性

2020-11-05 文章 赵一旦
我这边遇到一个情况比较奇怪。
(1)一整天数据的统计信息如下:
sid+subid+browser+ip: 13068577
sid+subid+browser+uid: 2962237
如上,sid和subid是渠道和子渠道,browser是浏览器,ip和uid都是一个相对变化较大的维度。
*数字是distinct count信息。*
(2)任务逻辑
流A,分别基于sid+subid+browser+ip和sid+subid+browser+uid组合维护做统计。window算子并发都是10。结果是sid+subid+browser+ip的window算子收到数据基本均衡(1.09G~1.48G),此处是指运行一段时间后。但sid+subid+browser+uid算子收到数据却很不均衡(230MB~6.84G)。

我的疑问是,虽然keyBy不能完全均衡,这很好理解。但是差距也太奇葩了。230MB和6.84G。
而且从统计信息看uid的确没有ip区分度大。但 sid+subid+browser+uid 的组合数达到 296w,并发才10,会这么不均衡的嘛?


Re: flink-1.11 写 hive 报错

2020-11-05 文章 Rui Li
感觉像是依赖冲突,hive和Hadoop的版本是什么呢?

On Thu, Nov 5, 2020 at 3:50 PM nashcen <2415370...@qq.com> wrote:

>
>
> flink 读 kafka 写 hive,之前运行好好的。在IDEA也能正常运行,打成jar包,提交到 flink 集群,报错如下。请问是什么原因?
>
> 2020-11-05 15:34:36
> org.apache.flink.connectors.hive.FlinkHiveException:
> org.apache.flink.table.catalog.exceptions.CatalogException: Failed to
> create
> Hive RecordWriter
> at
>
> org.apache.flink.connectors.hive.write.HiveWriterFactory.createRecordWriter(HiveWriterFactory.java:159)
> at
>
> org.apache.flink.connectors.hive.write.HiveBulkWriterFactory.create(HiveBulkWriterFactory.java:47)
> at
>
> org.apache.flink.formats.hadoop.bulk.HadoopPathBasedPartFileWriter$HadoopPathBasedBucketWriter.openNewInProgressFile(HadoopPathBasedPartFileWriter.java:257)
> at
>
> org.apache.flink.formats.hadoop.bulk.HadoopPathBasedPartFileWriter$HadoopPathBasedBucketWriter.openNewInProgressFile(HadoopPathBasedPartFileWriter.java:230)
> at
>
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.rollPartFile(Bucket.java:209)
> at
>
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:200)
> at
>
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:282)
> at
>
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104)
> at
>
> org.apache.flink.table.filesystem.stream.StreamingFileWriter.processElement(StreamingFileWriter.java:118)
> at
>
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
> at
>
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
> at
>
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
> at
>
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
> at
>
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
> at StreamExecCalc$43.processElement(Unknown Source)
> at
>
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
> at
>
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
> at
>
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
> at
>
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
> at
>
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
> at
>
> org.apache.flink.table.runtime.operators.wmassigners.WatermarkAssignerOperator.processElement(WatermarkAssignerOperator.java:123)
> at
>
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
> at
>
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
> at
>
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
> at
>
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
> at
>
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
> at StreamExecCalc$19.processElement(Unknown Source)
> at
>
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
> at
>
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
> at
>
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
> at
>
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
> at
>
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
> at
>
> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
> at
>
> org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
> at
>
> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
> at
>
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185)
> at
>
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
> at
>
> 

回复: Flink1.11.0 sql org.apache.flink.connector.jdbc.dialect.MySQLDialect发现疑似bug

2020-11-05 文章 史 正超
应该是没有问题的,
首先你在flink sql中指定的primary key 应该要与mysql中的唯一索引或者主键对应。
其次那个方法里组装出来的语句 类似下面的语句:
INSERT INTO `tablename`(`key1`, `key2`, `f1`, `f2`) VALUES (?, ?, ?, ?) ON 
DUPLICATE KEY UPDATE `key1`=VALUES(`key1`), `key2`=VALUES(`key2`), 
`f1`=VALUES(`f1`), `f2`=VALUES(`f2`)

里面已经包含了定义的key, 当发生唯一键冲突时,会执行更新。所以无需指定uniqueKeyFields的

发件人: wind.fly@outlook.com 
发送时间: 2020年11月5日 10:58
收件人: user-zh@flink.apache.org 
主题: Flink1.11.0 sql org.apache.flink.connector.jdbc.dialect.MySQLDialect发现疑似bug

Hi,all:
   近日因为用到JdbcDynamicTableSink,发现往mysql插入数据时没有按我指定的primary 
key更新数据,无意间追踪到org.apache.flink.connector.jdbc.dialect.MySQLDialect类中getUpsertStatement方法:


/**
*
 Mysql upsert query use DUPLICATE KEY UPDATE.
*
*
 NOTE: It requires Mysql's primary key to be consistent with pkFields.
*
*
 We don't use REPLACE INTO, if there are other fields, we can keep their 
previous values.
*/
@Override
public Optional getUpsertStatement(String tableName, String[] 
fieldNames, String[] uniqueKeyFields)
 {
String
 updateClause = Arrays.stream(fieldNames)
.map(f ->
 quoteIdentifier(f) + "=VALUES(" + quoteIdentifier(f) + ")")
.collect(Collectors.joining(", "));
return Optional.of(getInsertIntoStatement(tableName,
 fieldNames) +
"
 ON DUPLICATE KEY UPDATE " + updateClause
);
}

该方法中有uniqueKeyFields参数但是没有用到,且我认为updateClause应该是用uniqueKeyFields生成, 代码改成:

String updateClause = Arrays.stream(uniqueKeyFields)
   .map(f -> quoteIdentifier(f) + "=VALUES(" + quoteIdentifier(f) + ")")
   .collect(Collectors.joining(", "));

麻烦各位大佬确认。


Flink1.11.0 sql org.apache.flink.connector.jdbc.dialect.MySQLDialect发现疑似bug

2020-11-05 文章 wind.fly....@outlook.com
Hi,all:
   近日因为用到JdbcDynamicTableSink,发现往mysql插入数据时没有按我指定的primary 
key更新数据,无意间追踪到org.apache.flink.connector.jdbc.dialect.MySQLDialect类中getUpsertStatement方法:


/**
*
 Mysql upsert query use DUPLICATE KEY UPDATE.
*
*
 NOTE: It requires Mysql's primary key to be consistent with pkFields.
*
*
 We don't use REPLACE INTO, if there are other fields, we can keep their 
previous values.
*/
@Override
public Optional getUpsertStatement(String tableName, String[] 
fieldNames, String[] uniqueKeyFields)
 {
String
 updateClause = Arrays.stream(fieldNames)
.map(f ->
 quoteIdentifier(f) + "=VALUES(" + quoteIdentifier(f) + ")")
.collect(Collectors.joining(", "));
return Optional.of(getInsertIntoStatement(tableName,
 fieldNames) +
"
 ON DUPLICATE KEY UPDATE " + updateClause
);
}

该方法中有uniqueKeyFields参数但是没有用到,且我认为updateClause应该是用uniqueKeyFields生成, 代码改成:

String updateClause = Arrays.stream(uniqueKeyFields)
   .map(f -> quoteIdentifier(f) + "=VALUES(" + quoteIdentifier(f) + ")")
   .collect(Collectors.joining(", "));

麻烦各位大佬确认。


Re: Re: sql-cli执行sql报错

2020-11-05 文章 zhisheng
这个问题同样在最新的 master 分支也有这个问题,我建了一个 Issue 描述了下整个流程
https://issues.apache.org/jira/browse/FLINK-19995

hl9...@126.com  于2020年9月28日周一 下午6:06写道:

> 按照您的方法重试了下,又报了另一个错误:
> Flink SQL> CREATE TABLE tx (
> > account_id  BIGINT,
> > amount  BIGINT,
> > transaction_time TIMESTAMP(3),
> > WATERMARK FOR transaction_time AS transaction_time -
> INTERVAL '5' SECOND
> > ) WITH (
> > 'connector.type' = 'kafka',
> > 'connector.version' = 'universal',
> > 'connector.topic' = 'heli01',
> > 'connector.properties.group.id' = 'heli-test',
> > 'connector.properties.bootstrap.servers' = '
> 10.100.51.56:9092',
> > 'connector.startup-mode' = 'earliest-offset',
> > 'format.type'= 'csv'
> > );
> [INFO] Table has been created.
>
> Flink SQL> show tables ;
> tx
>
> Flink SQL> select * from tx ;
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.kafka.shaded.org.apache.kafka.common.KafkaException:
> org.apache.kafka.common.serialization.ByteArrayDeserializer is not an
> instance of
> org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.Deserializer
>
> 附:lib包清单
> [test@rcx51101 lib]$ pwd
> /opt/flink-1.10.2/lib
>
> flink-csv-1.10.2.jar
> flink-dist_2.12-1.10.2.jar
> flink-jdbc_2.12-1.10.2.jar
> flink-json-1.10.2.jar
> flink-shaded-hadoop-2-uber-2.6.5-10.0.jar
> flink-sql-connector-kafka_2.11-1.10.2.jar
> flink-table_2.12-1.10.2.jar
> flink-table-blink_2.12-1.10.2.jar
> log4j-1.2.17.jar
> mysql-connector-java-5.1.48.jar
> slf4j-log4j12-1.7.15.jar
>
>
>
>
> hl9...@126.com
>
> 发件人: Leonard Xu
> 发送时间: 2020-09-28 16:36
> 收件人: user-zh
> 主题: Re: sql-cli执行sql报错
> Hi
> benchao的回复是的对的,
> 你用SQL client 时, 不需要datastream connector的jar包,直接用SQL connector 对应的jar包
> flink-*sql*-connector-kafka***.jar就行了,把你添加的其他jar包都删掉。
>
>
> > 相关lib包:
> > flink-connector-kafka_2.12-1.10.2.jar
> > kafka-clients-0.11.0.3.jar
>
> 祝好
> Leonard
>


flink 1.11.0 chk超时

2020-11-05 文章 liangji

 

 

chk history如图,以下是TM中找到的INFO信息:
2020-11-05 13:13:38,101 INFO 
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] -
Subtask 2 checkpointing for checkpoint with id=16 (max part counter=6).
2020-11-05 13:13:38,143 INFO 
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] -
Subtask 1 checkpointing for checkpoint with id=16 (max part counter=0).
2020-11-05 13:14:37,779 WARN  org.apache.kafka.clients.NetworkClient
  
[] - Connection to node -3 could not be established. Broker may not be
available.
2020-11-05 13:14:37,786 WARN  org.apache.kafka.clients.NetworkClient
  
[] - Connection to node -2 could not be established. Broker may not be
available.
2020-11-05 13:33:38,115 INFO  org.apache.flink.runtime.taskmanager.Task 
  
[] - Attempting to cancel task Source: Custom Source -> Process -> (Sink:
Hdfs sink, Sink: HistoryTopic hbase sink, Sink: HistoryTopicCopy hbase sink)
(3/3) (68bfa6305a9aa5a7381b9ca4a8fef2fa).
请路过的大佬们指点下chk超时怎么定位问题,多谢(目前日志级别切换不成debug)



--
Sent from: http://apache-flink.147419.n8.nabble.com/


flink 1.11.0 chk超时

2020-11-05 文章 liangji

 

 

chk history如图,以下是TM中找到的INFO信息:
2020-11-05 13:13:38,101 INFO 
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] -
Subtask 2 checkpointing for checkpoint with id=16 (max part counter=6).
2020-11-05 13:13:38,143 INFO 
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] -
Subtask 1 checkpointing for checkpoint with id=16 (max part counter=0).
2020-11-05 13:14:37,779 WARN  org.apache.kafka.clients.NetworkClient
  
[] - Connection to node -3 could not be established. Broker may not be
available.
2020-11-05 13:14:37,786 WARN  org.apache.kafka.clients.NetworkClient
  
[] - Connection to node -2 could not be established. Broker may not be
available.
2020-11-05 13:33:38,115 INFO  org.apache.flink.runtime.taskmanager.Task 
  
[] - Attempting to cancel task Source: Custom Source -> Process -> (Sink:
Hdfs sink, Sink: HistoryTopic hbase sink, Sink: HistoryTopicCopy hbase sink)
(3/3) (68bfa6305a9aa5a7381b9ca4a8fef2fa).
请路过的大佬们指点下chk超时怎么定位问题,多谢(目前日志级别切换不成debug)



--
Sent from: http://apache-flink.147419.n8.nabble.com/


flink1.11的cdc功能对消息顺序性的处理

2020-11-05 文章 18392099563
hi everyone,
麻烦请教下各位大神,flink如何处理如下问题:
flink1.11引入cdc,可以解析canal和debezuim发送的CDC数据,其中canal一般是可以指定某些字段作为key进行hash分区发送到同一topic下的不同分区的。
如果源端短时间对pk值进行多次update,则有可能导致发往不同分区,从而无法保证顺序性。
假如
1.有源表和目标表:
create table test(
id int(10) primary key
)
2.源表的增量数据通过canal发往kafka,目标表接收kafka消息进行同步。
3.发往的topic下有三个partition:p0、p1、p2
4.源端和目标端都有一条记录id=1

此时对源端进行两次update:
update1:update test set id=2 where id=1;
update2: update test set id=3 wehre id=2;
假如两条消息都在同一批message中发往kafka,其中update1发送到p1,pudate2发送到p2,这两条消息的顺序性是无法保证的,假如update2先到达,则目标端最终结果为id=2,与源端结果id=3不一致。





--
Sent from: http://apache-flink.147419.n8.nabble.com/

flink 1.11.0 chk超时

2020-11-05 文章 liangji

 

 
chk的历史如图,第三个subtask未能ack,同时在TM中只能找到如下信息:

2020-11-05 13:13:38,101 INFO 
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] -
Subtask 2 checkpointing for checkpoint with id=16 (max part counter=6).
2020-11-05 13:13:38,143 INFO 
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] -
Subtask 1 checkpointing for checkpoint with id=16 (max part counter=0).
2020-11-05 13:14:37,779 WARN  org.apache.kafka.clients.NetworkClient
  
[] - Connection to node -3 could not be established. Broker may not be
available.
2020-11-05 13:14:37,786 WARN  org.apache.kafka.clients.NetworkClient
  
[] - Connection to node -2 could not be established. Broker may not be
available.
2020-11-05 13:33:38,115 INFO  org.apache.flink.runtime.taskmanager.Task 
  
[] - Attempting to cancel task Source: Custom Source -> Process -> (Sink:
***, Sink: *** sink, Sink: ***) (3/3) (68bfa6305a9aa5a7381b9ca4a8fef2fa).

请路过的大佬们指点下排查方式,多谢(当前日志级别暂无法改成debug)



--
Sent from: http://apache-flink.147419.n8.nabble.com/


flink sql读写带kerberos认证的kafka问题请教

2020-11-05 文章 amen...@163.com
hi everyone,

想请问下社区及各位大神们,通过Flink Table 
API连接带有kerberos认证的kafka时,怎么做的kafka集群中topic和group权限认证的?

best,
amenhub





??????1.11.1 ??OutOfMemoryError: Metaspace. ????

2020-11-05 文章 Asahi Lee
??
   ??flink 
1.11.2MySQL
/*
  Copyright (c) 2013, 2015, Oracle and/or its affiliates. All rights reserved.

  The MySQL Connector/J is licensed under the terms of the GPLv2
  

JobManager responsible for xxx lost the leadership.

2020-11-05 文章 赵一旦
JobManager responsible for ff2118284beed21ac220ee7cc0a639c0 lost the
leadership.



这种错误原因是什么,会导致任务重启,本身压力大,突然重启使用10分钟前的ckpt,压力更大了。