Re:flink s3 checkpoint 一直IN_PROGRESS(100%)直到失败
图挂了 https://postimg.cc/Z9XdxwSk 在 2022-03-08 14:05:39,"Sun.Zhu" <17626017...@163.com> 写道: hi all, flink 1.13.2,将checkpoint 写到S3但是一直成功不了,一直显示IN_PROGRESS,直到超时失败,有大佬遇到过吗?
flink s3 checkpoint 一直IN_PROGRESS(100%)直到失败
hi all, flink 1.13.2,将checkpoint 写到S3但是一直成功不了,一直显示IN_PROGRESS,直到超时失败,有大佬遇到过吗?
回复: 一个source多个sink的同步问题
窗口的触发逻辑就是这样的,必须watermark达到了窗口结束时间才会触发,可能10-11点的窗口中的数据最大只有10:59呢 | | Sun.Zhu | | 17626017...@163.com | 签名由网易邮箱大师定制 在2020年07月7日 18:44,lgs<9925...@qq.com> 写道: 是1个小时才到来。10:00- 11:00的数据,11:01分到来。 但是现在的问题是这个数据来了,我的第一个sink马上就保存到数据库了, 11:02进数据库。但是第二个sink,因为有tumble window,所以10:00- 11:00的数据,需要到12:01,才会触发这个窗口。 -- Sent from: http://apache-flink.147419.n8.nabble.com/
回复: Flink sql 主动使数据延时一段时间有什么方案
感谢benchao和forideal的方案, 方法1.使用udf,查不到 sleep 等一下在查 --这个可以尝试 方法2.在 join operator处数据等一会再去查 —我们使用的是flink sql,不是streaming,所以该方案可能行不通 方法3.如果没有 join 上,就把数据发到source,循环join。 --我们这个维表join的场景类似filter的功能,如果关联上则主流数据就不处理了,所以不一定非要join上,只是想延迟一会提升准确率 方法4.如果你的source的mq支持延迟消息,这个应该就不需要Flink做什么了,直接用mq的延迟消息就可以了 —我们的source是kafka,好像不支持kafka的功能 方法5.扩展了下 Flink 的 Source。比如在 kafka connector 中加了一个 time.wait 的属性,当用户设置了这个属性,就让source 的数据等一会儿发到下游。起到等一会的效果。 --这个方案需要修改源码,也可以试一下 Best Sun.Zhu | | Sun.Zhu | | 17626017...@163.com | 签名由网易邮箱大师定制 在2020年07月3日 23:26,forideal 写道: Hi 刚刚本超说了四种方法, 方法1.使用udf,查不到 sleep 等一下在查 方法2.在 join operator处数据等一会再去查 方法3.如果没有 join 上,就把数据发到source,循环join。 方法4.如果你的source的mq支持延迟消息,这个应该就不需要Flink做什么了,直接用mq的延迟消息就可以了 上述方法应该都能实现相同的效果。 我们也实现了一种方法。这种方法是扩展了下 Flink 的 Source。比如在 kafka connector 中加了一个 time.wait 的属性,当用户设置了这个属性,就让source 的数据等一会儿发到下游。起到等一会的效果。 Best forideal 在 2020-07-03 23:05:06,"Benchao Li" 写道: 奥,对,还有一种思路。如果你的source的mq支持延迟消息,这个应该就不需要Flink做什么了,直接用mq的延迟消息就可以了。 admin <17626017...@163.com> 于2020年7月3日周五 下午5:54写道: Hi,all 我们有这样一个场景,双流join,一个快流,一个慢流,想让快流等一段时间,目的是能提高join的命中率。 FLink sql有什么方案实现吗? 感谢您的回复 -- Best, Benchao Li
回复:Flink sql 主动使数据延时一段时间有什么方案
窗口得用group by,字段会丢失 在2020年07月03日 19:11,kcz 写道: 设置一个窗口时间,如果有需要取最新的,可以再做一下处理。 -- 原始邮件 -- 发件人: admin <17626017...@163.com 发送时间: 2020年7月3日 18:01 收件人: user-zh
回复: 如何快速定位拖慢速度的 operator
虽然chain在一起,但是可以通过metrics中看出来各个算子的各项指标的 | | Sun.Zhu | | 17626017...@163.com | 签名由网易邮箱大师定制 在2020年06月25日 00:51,徐骁 写道: 两个方法确实可以, 但是要追踪起来很废时间, 对小白太不友好啊
?????? ??????savepoint????????????????????
hi??claylin ??uidDAG?? | | Sun.Zhu | | 17626017...@163.com | ?? ??2020??06??23?? 16:29??claylin<1012539...@qq.com> ?? ??savepoint?? flatmap??jobgraph ??flatmap??Rebalance?? ---- ??:"Congxian Qiu"https://issues.apache.org/jira/browse/FLINK-5601 <https://issues.apache.org/jira/browse/FLINK-5601?; Best, Congxian claylin <1012539...@qq.com ??2020??6??23?? 2:44?? ??watermark?? --nbsp;nbsp;-- ??:nbsp;"Congxian Qiu"
回复: sqlclient集成hiveCatalog查询kafka表问题
非常感谢,我去试试 | | Sun.Zhu | | 17626017...@163.com | 签名由网易邮箱大师定制 在2020年06月18日 18:13,Rui Li 写道: 需要启动一个独立的metastore server,然后hive.metastore.uris配置的是你metastore server的地址。 最简单的场景,在本地启动metastore server命令:hive --service metastore hive.metastore.uris设置成:thrift://localhost:9083 更详细的metastore使用方法可以参考hive文档: https://cwiki.apache.org/confluence/display/Hive/AdminManual+Metastore+Administration On Thu, Jun 18, 2020 at 5:21 PM Sun.Zhu <17626017...@163.com> wrote: 对应这种改动还是挺大的,有对应的说明文档吗? hive.metastore.uris 这个需要怎么配置,有样例吗? | | Sun.Zhu | | 17626017...@163.com | 签名由网易邮箱大师定制 在2020年06月18日 17:01,Rui Li 写道: 是的,embedded模式需要添加额外的jar包,容易导致依赖冲突。而且生产环境中embedded模式也比较少见,所以在1.11中HiveCatalog已经不允许embedded模式了。 On Thu, Jun 18, 2020 at 4:53 PM Leonard Xu wrote: Hi 在 2020年6月18日,16:45,Sun.Zhu <17626017...@163.com> 写道: Caused by: java.lang.IllegalArgumentException: Embedded metastore is not allowed. Make sure you have set a valid value for hive.metastore.uris 错误的原因应该是这个,flink 集成 hive 时 不支持embedded metastore的,你的 hive 需要起一个hive metastore 并在conf文件配置 hive.metastore.uris Best, Leonard Xu -- Best regards! Rui Li -- Best regards! Rui Li
回复: sqlclient集成hiveCatalog查询kafka表问题
对应这种改动还是挺大的,有对应的说明文档吗? hive.metastore.uris 这个需要怎么配置,有样例吗? | | Sun.Zhu | | 17626017...@163.com | 签名由网易邮箱大师定制 在2020年06月18日 17:01,Rui Li 写道: 是的,embedded模式需要添加额外的jar包,容易导致依赖冲突。而且生产环境中embedded模式也比较少见,所以在1.11中HiveCatalog已经不允许embedded模式了。 On Thu, Jun 18, 2020 at 4:53 PM Leonard Xu wrote: Hi 在 2020年6月18日,16:45,Sun.Zhu <17626017...@163.com> 写道: Caused by: java.lang.IllegalArgumentException: Embedded metastore is not allowed. Make sure you have set a valid value for hive.metastore.uris 错误的原因应该是这个,flink 集成 hive 时 不支持embedded metastore的,你的 hive 需要起一个hive metastore 并在conf文件配置 hive.metastore.uris Best, Leonard Xu -- Best regards! Rui Li
回复: sqlclient集成hiveCatalog查询kafka表问题
Hi,Rui Li 我把connector的包也替换成1.11的了,结果sql-cli启动报错 Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue. at org.apache.flink.table.client.SqlClient.main(SqlClient.java:213) Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could not create execution context. at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:818) at org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:230) at org.apache.flink.table.client.SqlClient.start(SqlClient.java:108) at org.apache.flink.table.client.SqlClient.main(SqlClient.java:201) Caused by: java.lang.IllegalArgumentException: Embedded metastore is not allowed. Make sure you have set a valid value for hive.metastore.uris at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:139) at org.apache.flink.table.catalog.hive.HiveCatalog.(HiveCatalog.java:171) at org.apache.flink.table.catalog.hive.HiveCatalog.(HiveCatalog.java:157) at org.apache.flink.table.catalog.hive.factories.HiveCatalogFactory.createCatalog(HiveCatalogFactory.java:84) at org.apache.flink.table.client.gateway.local.ExecutionContext.createCatalog(ExecutionContext.java:366) at org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$null$6(ExecutionContext.java:565) at java.util.HashMap.forEach(HashMap.java:1289) at org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$initializeCatalogs$7(ExecutionContext.java:564) at org.apache.flink.table.client.gateway.local.ExecutionContext.wrapClassLoader(ExecutionContext.java:252) at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeCatalogs(ExecutionContext.java:563) at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:512) at org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:171) at org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:124) at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:807) hive catalog的配置和1.10.1一样,如下: catalogs: #[] # empty list # A typical catalog definition looks like: - name: myhive type: hive hive-conf-dir: /Users/zhushang/Desktop/software/apache-hive-2.2.0-bin/conf hive-version: 2.2.0 | | Sun.Zhu | | 17626017...@163.com | 签名由网易邮箱大师定制 在2020年06月18日 15:46,Rui Li 写道: 第三方包指的是flink-connector-hive这种吗?这些包在build的时候也会打出来的,只不过没有加到flink-dist里。到对应的module里找一下,比如flink-connector-hive会在/flink-connectors/flink-connector-hive/target下面。 On Thu, Jun 18, 2020 at 12:22 PM Jark Wu wrote: 你可以拿 release-1.11 分支: https://github.com/apache/flink/tree/release-1.11/ 自己编译一下:mvn clean install -DskipTests 在 build-target 下就是打出来的 1.11 的分发包内容。 Best, Jark On Wed, 17 Jun 2020 at 15:30, Sun.Zhu <17626017...@163.com> wrote: 是的,除了编译出来1.11的包之外,第三方包都拿的1.10.1的版本,但是对应的1.11还没有release吧,从哪里获取呢 | | Sun.Zhu | | 17626017...@163.com | 签名由网易邮箱大师定制 在2020年06月17日 13:25,Rui Li 写道: 是说把1.10.1的hive connector跟1.11的flink一起用么?如果这样用是肯定有问题的。可以把版本都统一成1.11试试。 On Wed, Jun 17, 2020 at 12:18 PM Sun.Zhu <17626017...@163.com> wrote: Sqlcli上得报错就上面这点,没有更多得信息了,或者从哪些log里可以看到更多信息 在2020年06月17日 10:27,Benchao Li 写道: 目前这个缺少的依赖是在flink-table-runtime-blink module的,现在这些重构到了flink-table-common module了。 如果只是connector、format这些用老的版本,应该是没有问题的。 你可以把更详细的报错信息发一下吗?看一下具体是哪个模块还在依赖老版本的flink-table-runtime-blink Sun.Zhu <17626017...@163.com> 于2020年6月17日周三 上午12:49写道: 是的 除了1.11 编译出来的包之外依赖的包,比如connector的、hivecatalog需要依赖的包,由于1.11 还没有release所以就用的1.10.1版本的,上面两个问题在1.10.1版本下是没有的,升级了1.11报了不知道什么原因,缺少依赖吗? 在2020年06月16日 18:38,Benchao Li 写道: 1.11中对底层数据结构做了一些重构,所以你不可以直接把1.10的jar包拿到1.11里面使用的。 你可以直接使用1.11里面编译出来的jar包来跑应该是没有问题的。 Sun.Zhu <17626017...@163.com> 于2020年6月16日周二 下午6:11写道: 我编译了1.11包 在sql-cli下查询hive的表报如下错误: [ERROR] Could not execute SQL statement. Reason: java.lang.NoClassDefFoundError: org/apache/flink/table/dataformat/BaseRow 查注册的kafka表报: [ERROR] Could not execute SQL statement. Reason: java.lang.ClassNotFoundException: org.apache.flink.table.dataformat.BaseRow 依赖包是从1.10.1下面拷贝的 | | Sun.Zhu | | 17626017...@163.com | 签名由网易邮箱大师定制 在2020年06月13日 11:44,Sun.Zhu<17626017...@163.com> 写道: Got it! Thx,junbao | | Sun.Zhu | | 17626017...@163.com | 签名由网易邮箱大师定制 在2020年06月13日 09:32,zhangjunbao 写道: 1.10.x版本,在hivecatalog下,建表时用proctime as PROCTIME()应该是有bug的, https://issues.apache.org/jira/browse/FLINK-17189 < https://issues.apache.org/jira/browse/FLINK-17189> Best, Junbao Zhang 2020年6月13日 上午12:31,Sun.Zhu <17626017...@163.com> 写道: hi,all 在用sql client集成hiveCatalog时,在hiveCatalog中注册了一个kafka的table ddl如下: | CREATETABLE user_behavior ( user_id BIGINT, item_id BIGINT, category_id BIGINT, behavior STRING, ts TIMESTAMP(3), proctime as PROCTIME(), -- 通过计算列产生一个处理时间列 WATERMARK FOR ts as ts - INTERVAL'5'SECOND-- 在ts上定义watermark,ts成为事件时
回复:sqlclient集成hiveCatalog查询kafka表问题
Sqlcli上得报错就上面这点,没有更多得信息了,或者从哪些log里可以看到更多信息 在2020年06月17日 10:27,Benchao Li 写道: 目前这个缺少的依赖是在flink-table-runtime-blink module的,现在这些重构到了flink-table-common module了。 如果只是connector、format这些用老的版本,应该是没有问题的。 你可以把更详细的报错信息发一下吗?看一下具体是哪个模块还在依赖老版本的flink-table-runtime-blink Sun.Zhu <17626017...@163.com> 于2020年6月17日周三 上午12:49写道: > 是的 除了1.11 编译出来的包之外依赖的包,比如connector的、hivecatalog需要依赖的包,由于1.11 > 还没有release所以就用的1.10.1版本的,上面两个问题在1.10.1版本下是没有的,升级了1.11报了不知道什么原因,缺少依赖吗? > > > > > 在2020年06月16日 18:38,Benchao Li 写道: > 1.11中对底层数据结构做了一些重构,所以你不可以直接把1.10的jar包拿到1.11里面使用的。 > 你可以直接使用1.11里面编译出来的jar包来跑应该是没有问题的。 > > Sun.Zhu <17626017...@163.com> 于2020年6月16日周二 下午6:11写道: > > > 我编译了1.11包 > > 在sql-cli下查询hive的表报如下错误: > > [ERROR] Could not execute SQL statement. Reason: > > java.lang.NoClassDefFoundError: org/apache/flink/table/dataformat/BaseRow > > > > > > 查注册的kafka表报: > > [ERROR] Could not execute SQL statement. Reason: > > java.lang.ClassNotFoundException: > org.apache.flink.table.dataformat.BaseRow > > > > > > 依赖包是从1.10.1下面拷贝的 > > | | > > Sun.Zhu > > | > > | > > 17626017...@163.com > > | > > 签名由网易邮箱大师定制 > > > > > > 在2020年06月13日 11:44,Sun.Zhu<17626017...@163.com> 写道: > > Got it! > > Thx,junbao > > > > > > | | > > Sun.Zhu > > | > > | > > 17626017...@163.com > > | > > 签名由网易邮箱大师定制 > > > > > > 在2020年06月13日 09:32,zhangjunbao 写道: > > 1.10.x版本,在hivecatalog下,建表时用proctime as PROCTIME()应该是有bug的, > > https://issues.apache.org/jira/browse/FLINK-17189 < > > https://issues.apache.org/jira/browse/FLINK-17189> > > > > Best, > > Junbao Zhang > > > > 2020年6月13日 上午12:31,Sun.Zhu <17626017...@163.com> 写道: > > > > hi,all > > 在用sql client集成hiveCatalog时,在hiveCatalog中注册了一个kafka的table > > ddl如下: > > | > > CREATETABLE user_behavior ( > > user_id BIGINT, > > item_id BIGINT, > > category_id BIGINT, > > behavior STRING, > > ts TIMESTAMP(3), > > proctime as PROCTIME(), -- 通过计算列产生一个处理时间列 > > WATERMARK FOR ts as ts - INTERVAL'5'SECOND-- 在ts上定义watermark,ts成为事件时间列 > > ) WITH ( > > 'connector.type' = 'kafka', -- 使用 kafka connector > > 'connector.version' = 'universal', -- kafka 版本,universal 支持 0.11 以上的版本 > > 'connector.topic' = 'user_behavior', -- kafka topic > > 'connector.startup-mode' = 'earliest-offset', -- 从起始 offset 开始读取 > > 'connector.properties.zookeeper.connect' = 'localhost:2181', -- > zookeeper > > 地址 > > 'connector.properties.bootstrap.servers' = 'localhost:9092', -- kafka > > broker 地址 > > 'format.type' = 'json'-- 数据源格式为 json > > ); > > | > > 在查询时select * from user_behavior;报错如下: > > [ERROR] Could not execute SQL statement. Reason: > > java.lang.AssertionError: Conversion to relational algebra failed to > > preserve datatypes: > > validated type: > > RecordType(BIGINT user_id, BIGINT item_id, BIGINT category_id, > > VARCHAR(2147483647) CHARACTER SET "UTF-16LE" behavior, TIME > > ATTRIBUTE(ROWTIME) ts, TIMESTAMP(3) NOT NULL proctime) NOT NULL > > converted type: > > RecordType(BIGINT user_id, BIGINT item_id, BIGINT category_id, > > VARCHAR(2147483647) CHARACTER SET "UTF-16LE" behavior, TIME > > ATTRIBUTE(ROWTIME) ts, TIME ATTRIBUTE(PROCTIME) NOT NULL proctime) NOT > NULL > > rel: > > LogicalProject(user_id=[$0], item_id=[$1], category_id=[$2], > > behavior=[$3], ts=[$4], proctime=[$5]) > > LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($4, 5000:INTERVAL > > SECOND)]) > > LogicalProject(user_id=[$0], item_id=[$1], category_id=[$2], > > behavior=[$3], ts=[$4], proctime=[PROCTIME()]) > > LogicalTableScan(table=[[myhive, my_db, user_behavior, source: > > [KafkaTableSource(user_id, item_id, category_id, behavior, ts)]]]) > > > > > > flink版本:1.10.1 > > blink planner,streaming model > > > > > > Thx > > | | > > Sun.Zhu > > | > > | > > 17626017...@163.com > > | > > 签名由网易邮箱大师定制 > > > > > > >
回复:通过Kafka更新规则
为什么会重头消费规则呢?没有开启checkpoint吗?重启可以从checkpoint中的offset继续消费kafka中的规则吧 在2020年06月16日 11:57,Ruibin Xing 写道: 我们有一个Flink Job需要一些自定义的规则,希望能够动态添加、更新、删除。规则的数量在百到千条。目前设计的结构是RDB+ Kafka + Flink。 RDB存储规则的完整快照,以展示给Web应用作增删改查。改动通过Kafka发送消息至Flink,通过BroadcastState传播规则。 目前有一个问题没有解决:如何使用Kafka来传递状态。我想了一下,大概有几种方案: 1. 消息标记Add、Upadte、Delete类型,在Flink中写逻辑来处理状态以和RDB中状态保持一致。 目前的问题是,每次重启Job,都需要从头读Kafka,来回放状态的更新。Kafka中的状态消息也需要持久保存。担心长期会堆积很多消息。 2. 使用Kafka Compact Log来解决1的问题。这个方案主要是之前没有使用过Compact Log,不清楚会不会有坑。 3.使用方案1,但是启动时Flink从RDB拉取全量规则。 4. 规则更新后Kafka消息发送全量规则,启动时Flink只拉取最新一条消息。 各位大佬是否有经验可以分享,怎么处理是比较科学的?不胜感激!
回复:如何做Flink Stream的性能测试
Hi 1.11 版本内置了DataGen、print、Blackhole的connector用来辅助功能测试,性能测试,线上观察,欢迎试用 在2020年06月16日 09:26,aven.wu 写道: 各位好; 最近我想测试一下我的程序处理性能如何。请问有什么工具、或者应该通过什么方法来获得一个比较准确的测试结果。 我的场景包含从kafka读取,flink 处理(有查询es做维表关联),处理结果输出到ES 和 Kafka。 Best Aven
回复:sqlclient集成hiveCatalog查询kafka表问题
是的 除了1.11 编译出来的包之外依赖的包,比如connector的、hivecatalog需要依赖的包,由于1.11 还没有release所以就用的1.10.1版本的,上面两个问题在1.10.1版本下是没有的,升级了1.11报了不知道什么原因,缺少依赖吗? 在2020年06月16日 18:38,Benchao Li 写道: 1.11中对底层数据结构做了一些重构,所以你不可以直接把1.10的jar包拿到1.11里面使用的。 你可以直接使用1.11里面编译出来的jar包来跑应该是没有问题的。 Sun.Zhu <17626017...@163.com> 于2020年6月16日周二 下午6:11写道: > 我编译了1.11包 > 在sql-cli下查询hive的表报如下错误: > [ERROR] Could not execute SQL statement. Reason: > java.lang.NoClassDefFoundError: org/apache/flink/table/dataformat/BaseRow > > > 查注册的kafka表报: > [ERROR] Could not execute SQL statement. Reason: > java.lang.ClassNotFoundException: org.apache.flink.table.dataformat.BaseRow > > > 依赖包是从1.10.1下面拷贝的 > | | > Sun.Zhu > | > | > 17626017...@163.com > | > 签名由网易邮箱大师定制 > > > 在2020年06月13日 11:44,Sun.Zhu<17626017...@163.com> 写道: > Got it! > Thx,junbao > > > | | > Sun.Zhu > | > | > 17626017...@163.com > | > 签名由网易邮箱大师定制 > > > 在2020年06月13日 09:32,zhangjunbao 写道: > 1.10.x版本,在hivecatalog下,建表时用proctime as PROCTIME()应该是有bug的, > https://issues.apache.org/jira/browse/FLINK-17189 < > https://issues.apache.org/jira/browse/FLINK-17189> > > Best, > Junbao Zhang > > 2020年6月13日 上午12:31,Sun.Zhu <17626017...@163.com> 写道: > > hi,all > 在用sql client集成hiveCatalog时,在hiveCatalog中注册了一个kafka的table > ddl如下: > | > CREATETABLE user_behavior ( > user_id BIGINT, > item_id BIGINT, > category_id BIGINT, > behavior STRING, > ts TIMESTAMP(3), > proctime as PROCTIME(), -- 通过计算列产生一个处理时间列 > WATERMARK FOR ts as ts - INTERVAL'5'SECOND-- 在ts上定义watermark,ts成为事件时间列 > ) WITH ( > 'connector.type' = 'kafka', -- 使用 kafka connector > 'connector.version' = 'universal', -- kafka 版本,universal 支持 0.11 以上的版本 > 'connector.topic' = 'user_behavior', -- kafka topic > 'connector.startup-mode' = 'earliest-offset', -- 从起始 offset 开始读取 > 'connector.properties.zookeeper.connect' = 'localhost:2181', -- zookeeper > 地址 > 'connector.properties.bootstrap.servers' = 'localhost:9092', -- kafka > broker 地址 > 'format.type' = 'json'-- 数据源格式为 json > ); > | > 在查询时select * from user_behavior;报错如下: > [ERROR] Could not execute SQL statement. Reason: > java.lang.AssertionError: Conversion to relational algebra failed to > preserve datatypes: > validated type: > RecordType(BIGINT user_id, BIGINT item_id, BIGINT category_id, > VARCHAR(2147483647) CHARACTER SET "UTF-16LE" behavior, TIME > ATTRIBUTE(ROWTIME) ts, TIMESTAMP(3) NOT NULL proctime) NOT NULL > converted type: > RecordType(BIGINT user_id, BIGINT item_id, BIGINT category_id, > VARCHAR(2147483647) CHARACTER SET "UTF-16LE" behavior, TIME > ATTRIBUTE(ROWTIME) ts, TIME ATTRIBUTE(PROCTIME) NOT NULL proctime) NOT NULL > rel: > LogicalProject(user_id=[$0], item_id=[$1], category_id=[$2], > behavior=[$3], ts=[$4], proctime=[$5]) > LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($4, 5000:INTERVAL > SECOND)]) > LogicalProject(user_id=[$0], item_id=[$1], category_id=[$2], > behavior=[$3], ts=[$4], proctime=[PROCTIME()]) > LogicalTableScan(table=[[myhive, my_db, user_behavior, source: > [KafkaTableSource(user_id, item_id, category_id, behavior, ts)]]]) > > > flink版本:1.10.1 > blink planner,streaming model > > > Thx > | | > Sun.Zhu > | > | > 17626017...@163.com > | > 签名由网易邮箱大师定制 > > >
回复: sqlclient集成hiveCatalog查询kafka表问题
我编译了1.11包 在sql-cli下查询hive的表报如下错误: [ERROR] Could not execute SQL statement. Reason: java.lang.NoClassDefFoundError: org/apache/flink/table/dataformat/BaseRow 查注册的kafka表报: [ERROR] Could not execute SQL statement. Reason: java.lang.ClassNotFoundException: org.apache.flink.table.dataformat.BaseRow 依赖包是从1.10.1下面拷贝的 | | Sun.Zhu | | 17626017...@163.com | 签名由网易邮箱大师定制 在2020年06月13日 11:44,Sun.Zhu<17626017...@163.com> 写道: Got it! Thx,junbao | | Sun.Zhu | | 17626017...@163.com | 签名由网易邮箱大师定制 在2020年06月13日 09:32,zhangjunbao 写道: 1.10.x版本,在hivecatalog下,建表时用proctime as PROCTIME()应该是有bug的, https://issues.apache.org/jira/browse/FLINK-17189 <https://issues.apache.org/jira/browse/FLINK-17189> Best, Junbao Zhang 2020年6月13日 上午12:31,Sun.Zhu <17626017...@163.com> 写道: hi,all 在用sql client集成hiveCatalog时,在hiveCatalog中注册了一个kafka的table ddl如下: | CREATETABLE user_behavior ( user_id BIGINT, item_id BIGINT, category_id BIGINT, behavior STRING, ts TIMESTAMP(3), proctime as PROCTIME(), -- 通过计算列产生一个处理时间列 WATERMARK FOR ts as ts - INTERVAL'5'SECOND-- 在ts上定义watermark,ts成为事件时间列 ) WITH ( 'connector.type' = 'kafka', -- 使用 kafka connector 'connector.version' = 'universal', -- kafka 版本,universal 支持 0.11 以上的版本 'connector.topic' = 'user_behavior', -- kafka topic 'connector.startup-mode' = 'earliest-offset', -- 从起始 offset 开始读取 'connector.properties.zookeeper.connect' = 'localhost:2181', -- zookeeper 地址 'connector.properties.bootstrap.servers' = 'localhost:9092', -- kafka broker 地址 'format.type' = 'json'-- 数据源格式为 json ); | 在查询时select * from user_behavior;报错如下: [ERROR] Could not execute SQL statement. Reason: java.lang.AssertionError: Conversion to relational algebra failed to preserve datatypes: validated type: RecordType(BIGINT user_id, BIGINT item_id, BIGINT category_id, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" behavior, TIME ATTRIBUTE(ROWTIME) ts, TIMESTAMP(3) NOT NULL proctime) NOT NULL converted type: RecordType(BIGINT user_id, BIGINT item_id, BIGINT category_id, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" behavior, TIME ATTRIBUTE(ROWTIME) ts, TIME ATTRIBUTE(PROCTIME) NOT NULL proctime) NOT NULL rel: LogicalProject(user_id=[$0], item_id=[$1], category_id=[$2], behavior=[$3], ts=[$4], proctime=[$5]) LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($4, 5000:INTERVAL SECOND)]) LogicalProject(user_id=[$0], item_id=[$1], category_id=[$2], behavior=[$3], ts=[$4], proctime=[PROCTIME()]) LogicalTableScan(table=[[myhive, my_db, user_behavior, source: [KafkaTableSource(user_id, item_id, category_id, behavior, ts)]]]) flink版本:1.10.1 blink planner,streaming model Thx | | Sun.Zhu | | 17626017...@163.com | 签名由网易邮箱大师定制
Re: flink sql sink hbase failed
好像不需要改源码 'connector.version' = ‘1.4.3’ 也可以往2.x版本里写 | | Sun.Zhu | | 17626017...@163.com | 签名由网易邮箱大师定制 On 06/15/2020 19:22,Zhou Zach wrote: 改了源码,可以了 在 2020-06-15 16:17:46,"Leonard Xu" 写道: Hi 在 2020年6月15日,15:36,Zhou Zach 写道: 'connector.version' expects '1.4.3', but is '2.1.0' Hbase connector只支持1.4.3的版本,其他不支持,但之前看有社区用户用1.4.3的connector写入高版本的case,你可以试下。 祝好 Leonard Xu
??????Flink??????????????????
1: Flink?? --try catch??checkpoint?? 2??checkpoint??savepointsavepoint?? ?? 3?? ??1??flinksql??1.11??format.ignore-parse-errors | | Sun.Zhu | | 17626017...@163.com | ?? ??2020??06??9?? 13:49??Z-Z ?? Hi?? ?? ??Flink??(NullPointer??)checkpoint??savepoint?? 1: Flink?? 2??checkpoint??savepointsavepoint?? 3??
回复: Flink 1.11 什么时候正式发布呢
据说是6月下旬 | | Sun.Zhu | | 17626017...@163.com | 签名由网易邮箱大师定制 在2020年06月9日 11:13,zhipengchen 写道: +1 发送自 Windows 10 版邮件应用 发件人: a773807...@gmail.com 发送时间: 2020年6月9日 10:53 收件人: user-zh 主题: 回复: Flink 1.11 什么时候正式发布呢 +1 a773807...@gmail.com 发件人: hyangvv 发送时间: 2020-06-09 10:52 收件人: user-zh 主题: Flink 1.11 什么时候正式发布呢 hi,flink项目的大神们,能透漏下 Flink1.11大概什么时候正式发布呢。
回复: sqlclient集成hiveCatalog查询kafka表问题
Got it! Thx,junbao | | Sun.Zhu | | 17626017...@163.com | 签名由网易邮箱大师定制 在2020年06月13日 09:32,zhangjunbao 写道: 1.10.x版本,在hivecatalog下,建表时用proctime as PROCTIME()应该是有bug的, https://issues.apache.org/jira/browse/FLINK-17189 <https://issues.apache.org/jira/browse/FLINK-17189> Best, Junbao Zhang 2020年6月13日 上午12:31,Sun.Zhu <17626017...@163.com> 写道: hi,all 在用sql client集成hiveCatalog时,在hiveCatalog中注册了一个kafka的table ddl如下: | CREATETABLE user_behavior ( user_id BIGINT, item_id BIGINT, category_id BIGINT, behavior STRING, ts TIMESTAMP(3), proctime as PROCTIME(), -- 通过计算列产生一个处理时间列 WATERMARK FOR ts as ts - INTERVAL'5'SECOND-- 在ts上定义watermark,ts成为事件时间列 ) WITH ( 'connector.type' = 'kafka', -- 使用 kafka connector 'connector.version' = 'universal', -- kafka 版本,universal 支持 0.11 以上的版本 'connector.topic' = 'user_behavior', -- kafka topic 'connector.startup-mode' = 'earliest-offset', -- 从起始 offset 开始读取 'connector.properties.zookeeper.connect' = 'localhost:2181', -- zookeeper 地址 'connector.properties.bootstrap.servers' = 'localhost:9092', -- kafka broker 地址 'format.type' = 'json'-- 数据源格式为 json ); | 在查询时select * from user_behavior;报错如下: [ERROR] Could not execute SQL statement. Reason: java.lang.AssertionError: Conversion to relational algebra failed to preserve datatypes: validated type: RecordType(BIGINT user_id, BIGINT item_id, BIGINT category_id, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" behavior, TIME ATTRIBUTE(ROWTIME) ts, TIMESTAMP(3) NOT NULL proctime) NOT NULL converted type: RecordType(BIGINT user_id, BIGINT item_id, BIGINT category_id, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" behavior, TIME ATTRIBUTE(ROWTIME) ts, TIME ATTRIBUTE(PROCTIME) NOT NULL proctime) NOT NULL rel: LogicalProject(user_id=[$0], item_id=[$1], category_id=[$2], behavior=[$3], ts=[$4], proctime=[$5]) LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($4, 5000:INTERVAL SECOND)]) LogicalProject(user_id=[$0], item_id=[$1], category_id=[$2], behavior=[$3], ts=[$4], proctime=[PROCTIME()]) LogicalTableScan(table=[[myhive, my_db, user_behavior, source: [KafkaTableSource(user_id, item_id, category_id, behavior, ts)]]]) flink版本:1.10.1 blink planner,streaming model Thx | | Sun.Zhu | | 17626017...@163.com | 签名由网易邮箱大师定制
sqlclient集成hiveCatalog查询kafka表问题
hi,all 在用sql client集成hiveCatalog时,在hiveCatalog中注册了一个kafka的table ddl如下: | CREATETABLE user_behavior ( user_id BIGINT, item_id BIGINT, category_id BIGINT, behavior STRING, ts TIMESTAMP(3), proctime as PROCTIME(), -- 通过计算列产生一个处理时间列 WATERMARK FOR ts as ts - INTERVAL'5'SECOND-- 在ts上定义watermark,ts成为事件时间列 ) WITH ( 'connector.type' = 'kafka', -- 使用 kafka connector 'connector.version' = 'universal', -- kafka 版本,universal 支持 0.11 以上的版本 'connector.topic' = 'user_behavior', -- kafka topic 'connector.startup-mode' = 'earliest-offset', -- 从起始 offset 开始读取 'connector.properties.zookeeper.connect' = 'localhost:2181', -- zookeeper 地址 'connector.properties.bootstrap.servers' = 'localhost:9092', -- kafka broker 地址 'format.type' = 'json'-- 数据源格式为 json ); | 在查询时select * from user_behavior;报错如下: [ERROR] Could not execute SQL statement. Reason: java.lang.AssertionError: Conversion to relational algebra failed to preserve datatypes: validated type: RecordType(BIGINT user_id, BIGINT item_id, BIGINT category_id, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" behavior, TIME ATTRIBUTE(ROWTIME) ts, TIMESTAMP(3) NOT NULL proctime) NOT NULL converted type: RecordType(BIGINT user_id, BIGINT item_id, BIGINT category_id, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" behavior, TIME ATTRIBUTE(ROWTIME) ts, TIME ATTRIBUTE(PROCTIME) NOT NULL proctime) NOT NULL rel: LogicalProject(user_id=[$0], item_id=[$1], category_id=[$2], behavior=[$3], ts=[$4], proctime=[$5]) LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($4, 5000:INTERVAL SECOND)]) LogicalProject(user_id=[$0], item_id=[$1], category_id=[$2], behavior=[$3], ts=[$4], proctime=[PROCTIME()]) LogicalTableScan(table=[[myhive, my_db, user_behavior, source: [KafkaTableSource(user_id, item_id, category_id, behavior, ts)]]]) flink版本:1.10.1 blink planner,streaming model Thx | | Sun.Zhu | | 17626017...@163.com | 签名由网易邮箱大师定制
??????flink 1.9 ????????????????
Hi,star KafkaConnectorupsert[1] [1]https://mp.weixin.qq.com/s/MSs7HSaegyWWU3Fig2PYYA | | Sun.Zhu | | 17626017...@163.com | ?? ??2020??06??3?? 14:47??star<3149768...@qq.com> ?? ??toRetractStreamkafka?? ??kafka??flink ??RetractStream
回复:flink sql 窗口场景的问题
hi 你是要每条数据都计算当前5分钟内的聚合值吗?如果是这样的话可以考虑使用over window | | Sun.Zhu | | 17626017...@163.com | 签名由网易邮箱大师定制 在2020年06月3日 02:56,steven chen 写道: hi : 我现在遇到有这样一个场景,我们需要实时去统计5分和30分的粒度,flink sql 窗口使用了处理时间滑动窗口方式 但是都是只有5分结束的时候才能把聚合结果输出,这个不满足我们需求,有没有方式可以直接实时输出结果,比如18:02 的统计+1+1 都能直接落在18:00-18:05的窗口上,并每次+1都能实时输出,而不是等到窗口结束才sink 到mysql .30分钟我同样
回复:flink on yarn报错 怎么获取
Hi,air 可以通过日志采集来收集异常日志,然后统一展示并监控告警。 | | Sun.Zhu | | 17626017...@163.com | 签名由网易邮箱大师定制 在2020年06月2日 14:05,阿华田 写道: 这种情况需要对flink任务进行监控 获取flink的任务状态 | | 阿华田 | | a15733178...@163.com | 签名由网易邮箱大师定制 在2020年06月2日 14:03,air23 写道: 今天发现taskmanagers报json解析失败 他一起在重启 但是我们这边是监控yarn 任务级别的。像这种task 里面报错。yarn任务又不会挂掉。应该怎么去做监控。才能得知 程序后台有问题 谢谢
??????flink-1.10 ????hdfs????????????????????????
Hi,kcz inprogressfinished??[1]??RollingPolicy inprogressfinished [1]https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/streamfile_sink.html#part-file-lifecycle [2]https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/streamfile_sink.html#rolling-policy Best Sun.Zhu | | Sun.Zhu | | 17626017...@163.com | ?? ??2020??06??2?? 19:20??kcz<573693...@qq.com> ?? ?? String path = "hdfs://HACluster/user/flink/test-1/2020-05-29--15/"; StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); FileInputFormat fileInputFormat = new TextInputFormat(new Path(path)); fileInputFormat.setNestedFileEnumeration(true); env.readFile(fileInputFormat, path).print(); env.execute();hdfs??/user/flink/test-1/2020-05-29--15/.part-0-0.inprogress.6c12fe72-5602-4458-b29f-c8c8b4a7b73b(??)/user/flink/test-1/2020-05-29--15/.part-1-0.inprogress.34b1d5ff-cf0d-4209-b409-21920b12327dflink??
回复:toAppendStream 类型不匹配问题
好的,我试试,感谢 | | Sun.Zhu | | 邮箱:17626017...@163.com | Signature is customized by Netease Mail Master 在2020年05月04日 11:22,Jark Wu 写道: 看起来是一个已经修复的 bug (FLINK-16108)。 你可以用正在 RC 的 release-1.10.1 再试下吗? https://dist.apache.org/repos/dist/dev/flink/flink-1.10.1-rc2/ Best, Jark On Mon, 4 May 2020 at 01:01, 祝尚 <17626017...@163.com> wrote: > 参考jark老师博客里的demo,写了个table api/sql的程序,在table转appendStream时报错 > flink版本1.10 > 代码如下: > public static void main(String[] args) throws Exception { > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > //以后版本会将old planner移除 > EnvironmentSettings settings = > EnvironmentSettings.newInstance().useBlinkPlanner().build(); > StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, > settings); > tableEnv.sqlUpdate("CREATE TABLE user_behavior (\n" + > "user_id BIGINT,\n" + > "item_id BIGINT,\n" + > "category_id BIGINT,\n" + > "behavior STRING,\n" + > "ts TIMESTAMP(3),\n" + > "proctime as PROCTIME(), -- 通过计算列产生一个处理时间列\n" + > "WATERMARK FOR ts as ts - INTERVAL '5' SECOND -- > 在ts上定义watermark,ts成为事件时间列\n" + > ") WITH (\n" + > "'connector.type' = 'kafka', -- 使用 kafka connector\n" + > "'connector.version' = 'universal', -- kafka 版本,universal > 支持 0.11 以上的版本\n" + > "'connector.topic' = 'user_behavior', -- kafka topic\n" + > "'connector.startup-mode' = 'earliest-offset', -- 从起始 > offset 开始读取\n" + > "'connector.properties.zookeeper.connect' = > 'localhost:2181', -- zookeeper 地址\n" + > "'connector.properties.bootstrap.servers' = > 'localhost:9092', -- kafka broker 地址\n" + > "'format.type' = 'json' -- 数据源格式为 json\n" + > ")"); > Table table1 = tableEnv.sqlQuery("select > user_id,item_id,category_id,behavior,ts," + > "proctime from user_behavior where behavior='buy'"); > tableEnv.toAppendStream(table1, Behavior.class).print(); > env.execute(); > > } > > public class Behavior { > public Long user_id; > public Long item_id; > public Long category_id; > public String behavior; > public Timestamp ts; > public Timestamp proctime; > > > @Override > public String toString() { > return "Behavior{" + > "user_id=" + user_id + > ", item_id=" + item_id + > ", category_id=" + category_id + > ", behavior='" + behavior + '\'' + > ", ts=" + ts + > ", proctime=" + proctime + > '}'; > } > } > 报错如下: > Exception in thread "main" org.apache.flink.table.api.ValidationException: > Field types of query result and registered TableSink do not match. > Query schema: [user_id: BIGINT, item_id: BIGINT, category_id: BIGINT, > behavior: STRING, ts: TIMESTAMP(3) *ROWTIME*, proctime: TIMESTAMP(3) NOT > NULL *PROCTIME*] > Sink schema: [behavior: STRING, category_id: BIGINT, item_id: BIGINT, > proctime: TIMESTAMP(3), ts: TIMESTAMP(3), user_id: BIGINT] > at > org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:96) > at > org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:229) > at > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150) > at > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > at > scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at > scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150) > at > org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351) > at > org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:259) > at > org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:250) > at sql.KafkaSourceTable.main(KafkaSourceTable.java:35) > > pojo的类型定义是和source table字段类型是一致的, > 为什么还会校验 NOT NULL *PROCTIME* ,*ROWTIME*?
回复:问题请教-flinksql的kafkasource方面
嗯是的,都设置成小于等于partition数 | | Sun.Zhu | | 邮箱:17626017...@163.com | Signature is customized by Netease Mail Master 在2020年04月21日 00:28,Jark Wu 写道: Hi, 你可以将 kafka 并行度设置成等于 kafka partition 个数。这个方式肯定能 work,且不浪费 task 资源。 Best, Jark On Mon, 20 Apr 2020 at 22:33, Benchao Li wrote: > 我对checkpoint这块不是很了解。不过subtask标记为idle跟finish还是有区别的吧。 > > 祝尚 <17626017...@163.com> 于2020年4月20日周一 下午10:29写道: > > > 我们是1.8版本,但是这段源码应该是没变把 > > // check if all tasks that we need to trigger are running. > > // if not, abort the checkpoint > > Execution[] executions = new Execution[tasksToTrigger.length]; > > for (int i = 0; i < tasksToTrigger.length; i++) { > >Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt(); > >if (ee == null) { > > LOG.info("Checkpoint triggering task {} of job {} is not being > > executed at the moment. Aborting checkpoint.", > > tasksToTrigger[i].getTaskNameWithSubtaskIndex(), > > job); > > throw new > > > CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING); > >} else if (ee.getState() == ExecutionState.RUNNING) { > > executions[i] = ee; > >} else { > > LOG.info("Checkpoint triggering task {} of job {} is not in state > {} > > but {} instead. Aborting checkpoint.", > > tasksToTrigger[i].getTaskNameWithSubtaskIndex(), > > job, > > ExecutionState.RUNNING, > > ee.getState()); > > throw new > > > CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING); > >} > > } > > 还是我理解的不对 > > > > > 2020年4月20日 下午6:21,Benchao Li 写道: > > > > > > 你们用的是哪个版本?我们用的是1.9,还没有遇到过这个问题呢。 > > > > > > Sun.Zhu <17626017...@163.com> 于2020年4月20日周一 下午5:43写道: > > > > > >> > > > 我们生产上遇到过这个问题,分不到partition的source短暂时间后变成了finished状态,在做checkpoint时会检查所有executor的状态是不是running,否则将不做checkpoint > > >> ,源码CheckpointCoordinator#triggerCheckpoint也有说明 > > >> > > >> > > >> > > >> > > >> > > >> | | > > >> Sun.Zhu > > >> | > > >> | > > >> 邮箱:17626017...@163.com > > >> | > > >> > > >> Signature is customized by Netease Mail Master > > >> > > >> 在2020年04月20日 10:37,Benchao Li 写道: > > >> 应该是不会的。分配不到partition的source会标记为idle状态。 > > >> > > >> Sun.Zhu <17626017...@163.com> 于2020年4月20日周一 上午10:28写道: > > >> > > >>> Hi,benchao,source并发度大于partition数的话,会导致不做checkpoint的问题吧 > > >>> > > >>> > > >>> > > >>> > > >>> | | > > >>> Sun.Zhu > > >>> | > > >>> | > > >>> 邮箱:17626017...@163.com > > >>> | > > >>> > > >>> Signature is customized by Netease Mail Master > > >>> > > >>> 在2020年04月19日 22:43,人生若只如初见 写道: > > >>> 嗯嗯,十分感谢 > > >>> > > >>> > > >>> > > >>> > > >>> --原始邮件-- > > >>> 发件人:"Benchao Li" > >>> 发送时间:2020年4月19日(星期天) 晚上9:25 > > >>> 收件人:"user-zh" > >>> > > >>> 主题:Re: 问题请教-flinksql的kafkasource方面 > > >>> > > >>> > > >>> > > >>> 如果是这种情况,可以让你的source的并发度大于等于kafka partition的数量来避免一下。 > > >>> > > >>> Jark Wu > >>> > > >>> Hi, > > >>> > > >>> 根据你描述的现象,以及提供的代码。我觉得原因应该是数据乱序导致的。 > > >>> 根据你的 Java 代码,数据的 event time > > >>> 不是单调递增的,会有一定程度的乱序,这种乱序在作业正常运行时影响不大(watermark > > >>> 能容忍 5s 乱序). > > >>> 但是在追数据时,由于 flink 目前还没有做到event time 对齐,所以会导致追数据时某些 partition > 进度比某些 > > >>> partition > > >>> 进度快很多的现象, > > >>> > 导致乱序程度拉大(如原先迟到最久的数据时4s,现在可能是10s),所以会导致丢弃的数据更多,也就造成了追数据时,统计值偏低的现象。 > > >>> > > >>> 完美的解决方案还需要等 FLIP-27 的完成。 > > >>> 当前可以通过增加 watermark delay来增大迟到数据的容忍。 > > >>> > > >>> Best, > > >>> Jark > > >>> > > >>> > > >>> On Sat, 18 Apr 2020 at 22:53, 人生若只如初见 > >>> > > >>> 你好 > > >>> > > >>> > > >>> > > >>> > > >> > > > 感谢解答,第一个问题就是当我用插入语句时,如果
回复:问题请教-flinksql的kafkasource方面
我们生产上遇到过这个问题,分不到partition的source短暂时间后变成了finished状态,在做checkpoint时会检查所有executor的状态是不是running,否则将不做checkpoint ,源码CheckpointCoordinator#triggerCheckpoint也有说明 | | Sun.Zhu | | 邮箱:17626017...@163.com | Signature is customized by Netease Mail Master 在2020年04月20日 10:37,Benchao Li 写道: 应该是不会的。分配不到partition的source会标记为idle状态。 Sun.Zhu <17626017...@163.com> 于2020年4月20日周一 上午10:28写道: > Hi,benchao,source并发度大于partition数的话,会导致不做checkpoint的问题吧 > > > > > | | > Sun.Zhu > | > | > 邮箱:17626017...@163.com > | > > Signature is customized by Netease Mail Master > > 在2020年04月19日 22:43,人生若只如初见 写道: > 嗯嗯,十分感谢 > > > > > --原始邮件-- > 发件人:"Benchao Li" 发送时间:2020年4月19日(星期天) 晚上9:25 > 收件人:"user-zh" > 主题:Re: 问题请教-flinksql的kafkasource方面 > > > > 如果是这种情况,可以让你的source的并发度大于等于kafka partition的数量来避免一下。 > > Jark Wu > Hi, > > 根据你描述的现象,以及提供的代码。我觉得原因应该是数据乱序导致的。 > 根据你的 Java 代码,数据的 event time > 不是单调递增的,会有一定程度的乱序,这种乱序在作业正常运行时影响不大(watermark > 能容忍 5s 乱序). > 但是在追数据时,由于 flink 目前还没有做到event time 对齐,所以会导致追数据时某些 partition 进度比某些 > partition > 进度快很多的现象, > 导致乱序程度拉大(如原先迟到最久的数据时4s,现在可能是10s),所以会导致丢弃的数据更多,也就造成了追数据时,统计值偏低的现象。 > > 完美的解决方案还需要等 FLIP-27 的完成。 > 当前可以通过增加 watermark delay来增大迟到数据的容忍。 > > Best, > Jark > > > On Sat, 18 Apr 2020 at 22:53, 人生若只如初见 > 你好 > > > > 感谢解答,第一个问题就是当我用插入语句时,如果kafka里面已经发了3个多小时的用户数据,他会马上求出前三个小时的购买数,大概每小时只有140个,但统计的之后几个小时的购买数都在1100左右,两者差了好多 > > > > 附: > userbehavior建表语句 > CREATE TABLE user_behavior ( > nbsp; nbsp; user_id BIGINT, > nbsp; nbsp; item_id BIGINT, > nbsp; nbsp; category_id BIGINT, > nbsp; nbsp; behavior STRING, > nbsp; nbsp; ts TIMESTAMP(3), > nbsp; nbsp; proctime as PROCTIME(), nbsp; -- > 通过计算列产生一个处理时间列 > nbsp; nbsp; WATERMARK FOR ts as ts - INTERVAL '5' > SECOND nbsp;-- > 在ts上定义watermark,ts成为事件时间列 > ) WITH ( > nbsp; nbsp; 'connector.type' = 'kafka', nbsp;-- > 使用 kafka connector > nbsp; nbsp; 'connector.version' = 'universal', > nbsp;-- kafka > 版本,universal 支持 0.11 以上的版本 > nbsp; nbsp; 'connector.topic' = 'user_behavior', > nbsp;-- kafka topic > nbsp; nbsp; 'connector.startup-mode' = > 'earliest-offset', nbsp;-- 从起始 > offset 开始读取 > nbsp; nbsp; 'connector.properties.zookeeper.connect' = > ' > 192.168.0.150:2181', nbsp;-- zookeeper 地址 > nbsp; nbsp; 'connector.properties.bootstrap.servers' = > ' > 192.168.0.150:9092', nbsp;-- kafka broker 地址 > nbsp; nbsp; 'format.type' = 'json' nbsp;-- 数据源格式为 > json > ) > > 每小时购买数建表语句 > CREATE TABLE buy_cnt_per_hour (nbsp; > nbsp; nbsp; hour_of_day BIGINT, > nbsp; nbsp; buy_cnt BIGINT > ) WITH ( > nbsp; nbsp; 'connector.type' = 'elasticsearch', -- 使用 > elasticsearch > connector > nbsp; nbsp; 'connector.version' = '6', nbsp;-- > elasticsearch 版本,6 能支持 > es 6+ 以及 7+ 的版本 > nbsp; nbsp; 'connector.hosts' = ' > http://192.168.0.150:9200', nbsp;-- > elasticsearch 地址 > nbsp; nbsp; 'connector.index' = 'buy_cnt_per_hour', > nbsp;-- > elasticsearch 索引名,相当于数据库的表名 > nbsp; nbsp; 'connector.document-type' = > 'user_behavior', -- > elasticsearch 的 type,相当于数据库的库名 > nbsp; nbsp; 'connector.bulk-flush.max-actions' = '1', > nbsp;-- 每条数据都刷新 > nbsp; nbsp; 'format.type' = 'json', nbsp;-- > 输出数据格式 json > nbsp; nbsp; 'update-mode' = 'append' > ) > > 插入语句 > INSERT INTO buy_cnt_per_hournbsp; > SELECT HOUR(TUMBLE_START(ts,INTERVAL '1' > HOUR)),COUNT(*)nbsp; > FROM user_behavior > WHERE behavior = 'buy' > GROUP BY TUMBLE(ts,INTERVAL '1' HOUR) > > kafka数据发送代码 > > import com.alibaba.fastjson.JSONObject; > import org.apache.kafka.clients.producer.KafkaProducer; > import org.apache.kafka.clients.producer.ProducerRecord; > > import java.text.SimpleDateFormat; > import java.util.*; > > > public class UserBehaviorProducer { > public static final String brokerList = " > 192.168.0.150:9092"; > > // public static final > String topic="user_behavior"; > public static final String topic = > "user_behavior"; > > public static void main(String args[]) { > > //配置生产者客户端参数 > //将配置序列化 > Properties > properties = new Properties(); > > properties.put("key.serializer", > "org.apache.kafka.common.serialization.StringSerializer"); > > properties.put("value.serializer", > "org.apache.kafka.common.serialization.StringSerializer"); > > properties.put("bootstrap.servers", brokerList); > > //创建KafkaProducer 实例 > > KafkaProducer KafkaProducer<gt;(p
??????????????-flinksql??kafkasource????
Hi,benchao??source??partition??checkpoint | | Sun.Zhu | | ??17626017...@163.com | Signature is customized by Netease Mail Master ??2020??04??19?? 22:43 ?? ?? ---- ??:"Benchao Li"
回复:keyby的乱序处理
1.未keyby的话,user1 user2 user3的顺序取决于分区策略,比如forward他们还是会在一个subtask上,顺序还是有序的,如果被打散的话就不确定了 2.keyby的话,可以保证同一个key的后续数据保持有序,不同的key不能保证一定有序 | | Sun.Zhu | | 邮箱:17626017...@163.com | Signature is customized by Netease Mail Master 在2020年03月31日 15:39,tingli ke 写道: HI,再次补充一下我的场景,如下图所示: 1、kafka TopicA的Partiton1的数据包含3个user的数据 2、flink在对该分区生成了w1、w2、w3...的watermark 问题来了: 1、w1、w2、w3...的watermark只能保证user1、user2、user3的整体数据的有序处理对吗? 2、在对user1、user2、user3进行keyby后,w1、w2、w3...的watermark能保证user1或者user2或者user3的有序处理吗? 期待大神的回复! jun su 于2020年3月31日周二 下午1:10写道: hi, keyby后的watermark应该是上游多个线程中最小的watermark , 所以数据虽然可能乱序, 但是watermark并不会乱, 不会影响后续的窗口触发 tingli ke 于2020年3月31日周二 上午9:54写道: > 您好, > 针对您的回复,现在的场景是这样子的 > 1、kafka存在多个partition,针对多个Partiton,flink watermark assiger会对每个Partiton > 发射 watermark; > 2、在第一个前提下,水位已经设置好了,还可以在keyby之后在次设置watermark吗? > 3、是否存在可以不用经过第一个前提的方案,直接在keyby之后设置watermark? > > Jimmy Wong 于2020年3月30日周一 下午9:13写道: > > > Hi, > > watermark 可以在 keyBy 后分配,但是最好紧跟 SourceFunction。经过 KeyBy > > 或其他分配策略,可能导致数据更大的延迟(EventTime)。 > > > > > > “想做key化的乱序处理” 这句没太理解,麻烦解释下。 > > > > > > | | > > Jimmy Wong > > | > > | > > wangzmk...@163.com > > | > > 签名由网易邮箱大师定制 > > > > > > 在2020年03月30日 20:58,tingli ke 写道: > > 请教一个问题:kafka-per-partition 的watermark的分配,可以在keyby之后分配吗,想做key化的乱序处理能支持吗 > > > -- Best, Jun Su
Flink有类似storm主动fail的机制吗?
Hi, Flink有类似storm主动fail的机制吗? 没有的话,有什么好的实现方案吗?比如用状态存储失败的记录? 感谢您的回复 | | Sun.Zhu | | 邮箱:17626017...@163.com | Signature is customized by Netease Mail Master