Re:flink s3 checkpoint 一直IN_PROGRESS(100%)直到失败

2022-03-07 Thread Sun.Zhu
图挂了

https://postimg.cc/Z9XdxwSk













在 2022-03-08 14:05:39,"Sun.Zhu" <17626017...@163.com> 写道:

hi all,
flink 1.13.2,将checkpoint 写到S3但是一直成功不了,一直显示IN_PROGRESS,直到超时失败,有大佬遇到过吗?





 

flink s3 checkpoint 一直IN_PROGRESS(100%)直到失败

2022-03-07 Thread Sun.Zhu
hi all,
flink 1.13.2,将checkpoint 写到S3但是一直成功不了,一直显示IN_PROGRESS,直到超时失败,有大佬遇到过吗?

回复: 一个source多个sink的同步问题

2020-07-08 Thread Sun.Zhu


窗口的触发逻辑就是这样的,必须watermark达到了窗口结束时间才会触发,可能10-11点的窗口中的数据最大只有10:59呢
| |
Sun.Zhu
|
|
17626017...@163.com
|
签名由网易邮箱大师定制


在2020年07月7日 18:44,lgs<9925...@qq.com> 写道:
是1个小时才到来。10:00- 11:00的数据,11:01分到来。

但是现在的问题是这个数据来了,我的第一个sink马上就保存到数据库了, 11:02进数据库。但是第二个sink,因为有tumble
window,所以10:00- 11:00的数据,需要到12:01,才会触发这个窗口。



--
Sent from: http://apache-flink.147419.n8.nabble.com/

回复: Flink sql 主动使数据延时一段时间有什么方案

2020-07-03 Thread Sun.Zhu
感谢benchao和forideal的方案,
方法1.使用udf,查不到 sleep 等一下在查
--这个可以尝试
方法2.在 join operator处数据等一会再去查
—我们使用的是flink sql,不是streaming,所以该方案可能行不通
方法3.如果没有 join 上,就把数据发到source,循环join。
--我们这个维表join的场景类似filter的功能,如果关联上则主流数据就不处理了,所以不一定非要join上,只是想延迟一会提升准确率
方法4.如果你的source的mq支持延迟消息,这个应该就不需要Flink做什么了,直接用mq的延迟消息就可以了
—我们的source是kafka,好像不支持kafka的功能
方法5.扩展了下 Flink 的 Source。比如在 kafka connector 中加了一个 time.wait 
的属性,当用户设置了这个属性,就让source 的数据等一会儿发到下游。起到等一会的效果。
--这个方案需要修改源码,也可以试一下


Best
Sun.Zhu
| |
Sun.Zhu
|
|
17626017...@163.com
|
签名由网易邮箱大师定制


在2020年07月3日 23:26,forideal 写道:
Hi




刚刚本超说了四种方法,

方法1.使用udf,查不到 sleep 等一下在查

方法2.在 join operator处数据等一会再去查

方法3.如果没有 join 上,就把数据发到source,循环join。

方法4.如果你的source的mq支持延迟消息,这个应该就不需要Flink做什么了,直接用mq的延迟消息就可以了




上述方法应该都能实现相同的效果。




我们也实现了一种方法。这种方法是扩展了下 Flink 的 Source。比如在 kafka connector 中加了一个 time.wait 
的属性,当用户设置了这个属性,就让source 的数据等一会儿发到下游。起到等一会的效果。




Best forideal

















在 2020-07-03 23:05:06,"Benchao Li"  写道:
奥,对,还有一种思路。如果你的source的mq支持延迟消息,这个应该就不需要Flink做什么了,直接用mq的延迟消息就可以了。

admin <17626017...@163.com> 于2020年7月3日周五 下午5:54写道:

Hi,all
我们有这样一个场景,双流join,一个快流,一个慢流,想让快流等一段时间,目的是能提高join的命中率。
FLink sql有什么方案实现吗?

感谢您的回复



--

Best,
Benchao Li


回复:Flink sql 主动使数据延时一段时间有什么方案

2020-07-03 Thread Sun.Zhu


窗口得用group by,字段会丢失
在2020年07月03日 19:11,kcz 写道:
设置一个窗口时间,如果有需要取最新的,可以再做一下处理。





-- 原始邮件 --
发件人: admin <17626017...@163.com
发送时间: 2020年7月3日 18:01
收件人: user-zh 

回复: 如何快速定位拖慢速度的 operator

2020-06-25 Thread Sun.Zhu
虽然chain在一起,但是可以通过metrics中看出来各个算子的各项指标的
| |
Sun.Zhu
|
|
17626017...@163.com
|
签名由网易邮箱大师定制


在2020年06月25日 00:51,徐骁 写道:
两个方法确实可以, 但是要追踪起来很废时间, 对小白太不友好啊


?????? ??????savepoint????????????????????

2020-06-23 Thread Sun.Zhu
hi??claylin
??uidDAG??


| |
Sun.Zhu
|
|
17626017...@163.com
|
??


??2020??06??23?? 16:29??claylin<1012539...@qq.com> ??
??savepoint??
flatmap??jobgraph
??flatmap??Rebalance??




----
??:"Congxian Qiu"https://issues.apache.org/jira/browse/FLINK-5601
<https://issues.apache.org/jira/browse/FLINK-5601?;

Best,
Congxian


claylin <1012539...@qq.com ??2020??6??23?? 2:44??

 
??watermark??




 --nbsp;nbsp;--
 ??:nbsp;"Congxian Qiu"

回复: sqlclient集成hiveCatalog查询kafka表问题

2020-06-18 Thread Sun.Zhu
非常感谢,我去试试


| |
Sun.Zhu
|
|
17626017...@163.com
|
签名由网易邮箱大师定制


在2020年06月18日 18:13,Rui Li 写道:
需要启动一个独立的metastore server,然后hive.metastore.uris配置的是你metastore server的地址。
最简单的场景,在本地启动metastore server命令:hive --service metastore
hive.metastore.uris设置成:thrift://localhost:9083

更详细的metastore使用方法可以参考hive文档:
https://cwiki.apache.org/confluence/display/Hive/AdminManual+Metastore+Administration

On Thu, Jun 18, 2020 at 5:21 PM Sun.Zhu <17626017...@163.com> wrote:

对应这种改动还是挺大的,有对应的说明文档吗?
hive.metastore.uris 这个需要怎么配置,有样例吗?


| |
Sun.Zhu
|
|
17626017...@163.com
|
签名由网易邮箱大师定制


在2020年06月18日 17:01,Rui Li 写道:

是的,embedded模式需要添加额外的jar包,容易导致依赖冲突。而且生产环境中embedded模式也比较少见,所以在1.11中HiveCatalog已经不允许embedded模式了。

On Thu, Jun 18, 2020 at 4:53 PM Leonard Xu  wrote:


Hi

在 2020年6月18日,16:45,Sun.Zhu <17626017...@163.com> 写道:

Caused by: java.lang.IllegalArgumentException: Embedded metastore is not
allowed. Make sure you have set a valid value for hive.metastore.uris

错误的原因应该是这个,flink 集成 hive 时 不支持embedded metastore的,你的 hive 需要起一个hive
metastore 并在conf文件配置 hive.metastore.uris

Best,
Leonard Xu



--
Best regards!
Rui Li



--
Best regards!
Rui Li


回复: sqlclient集成hiveCatalog查询kafka表问题

2020-06-18 Thread Sun.Zhu
对应这种改动还是挺大的,有对应的说明文档吗?
hive.metastore.uris 这个需要怎么配置,有样例吗?


| |
Sun.Zhu
|
|
17626017...@163.com
|
签名由网易邮箱大师定制


在2020年06月18日 17:01,Rui Li 写道:
是的,embedded模式需要添加额外的jar包,容易导致依赖冲突。而且生产环境中embedded模式也比较少见,所以在1.11中HiveCatalog已经不允许embedded模式了。

On Thu, Jun 18, 2020 at 4:53 PM Leonard Xu  wrote:


Hi

在 2020年6月18日,16:45,Sun.Zhu <17626017...@163.com> 写道:

Caused by: java.lang.IllegalArgumentException: Embedded metastore is not
allowed. Make sure you have set a valid value for hive.metastore.uris

错误的原因应该是这个,flink 集成 hive 时 不支持embedded metastore的,你的 hive 需要起一个hive
metastore 并在conf文件配置 hive.metastore.uris

Best,
Leonard Xu



--
Best regards!
Rui Li


回复: sqlclient集成hiveCatalog查询kafka表问题

2020-06-18 Thread Sun.Zhu
Hi,Rui Li
我把connector的包也替换成1.11的了,结果sql-cli启动报错
Exception in thread "main" org.apache.flink.table.client.SqlClientException: 
Unexpected exception. This is a bug. Please consider filing an issue.
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:213)
Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could 
not create execution context.
at 
org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:818)
at 
org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:230)
at org.apache.flink.table.client.SqlClient.start(SqlClient.java:108)
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:201)
Caused by: java.lang.IllegalArgumentException: Embedded metastore is not 
allowed. Make sure you have set a valid value for hive.metastore.uris
at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:139)
at org.apache.flink.table.catalog.hive.HiveCatalog.(HiveCatalog.java:171)
at org.apache.flink.table.catalog.hive.HiveCatalog.(HiveCatalog.java:157)
at 
org.apache.flink.table.catalog.hive.factories.HiveCatalogFactory.createCatalog(HiveCatalogFactory.java:84)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.createCatalog(ExecutionContext.java:366)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$null$6(ExecutionContext.java:565)
at java.util.HashMap.forEach(HashMap.java:1289)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$initializeCatalogs$7(ExecutionContext.java:564)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.wrapClassLoader(ExecutionContext.java:252)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.initializeCatalogs(ExecutionContext.java:563)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:512)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:171)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:124)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:807)


hive catalog的配置和1.10.1一样,如下:
catalogs: #[] # empty list
# A typical catalog definition looks like:
  - name: myhive
type: hive
hive-conf-dir: /Users/zhushang/Desktop/software/apache-hive-2.2.0-bin/conf
hive-version: 2.2.0
| |
Sun.Zhu
|
|
17626017...@163.com
|
签名由网易邮箱大师定制


在2020年06月18日 15:46,Rui Li 写道:
第三方包指的是flink-connector-hive这种吗?这些包在build的时候也会打出来的,只不过没有加到flink-dist里。到对应的module里找一下,比如flink-connector-hive会在/flink-connectors/flink-connector-hive/target下面。

On Thu, Jun 18, 2020 at 12:22 PM Jark Wu  wrote:

你可以拿 release-1.11 分支: https://github.com/apache/flink/tree/release-1.11/
自己编译一下:mvn clean install -DskipTests
在 build-target 下就是打出来的 1.11 的分发包内容。

Best,
Jark



On Wed, 17 Jun 2020 at 15:30, Sun.Zhu <17626017...@163.com> wrote:



是的,除了编译出来1.11的包之外,第三方包都拿的1.10.1的版本,但是对应的1.11还没有release吧,从哪里获取呢


| |
Sun.Zhu
|
|
17626017...@163.com
|
签名由网易邮箱大师定制


在2020年06月17日 13:25,Rui Li 写道:
是说把1.10.1的hive connector跟1.11的flink一起用么?如果这样用是肯定有问题的。可以把版本都统一成1.11试试。

On Wed, Jun 17, 2020 at 12:18 PM Sun.Zhu <17626017...@163.com> wrote:

Sqlcli上得报错就上面这点,没有更多得信息了,或者从哪些log里可以看到更多信息




在2020年06月17日 10:27,Benchao Li 写道:
目前这个缺少的依赖是在flink-table-runtime-blink module的,现在这些重构到了flink-table-common
module了。
如果只是connector、format这些用老的版本,应该是没有问题的。
你可以把更详细的报错信息发一下吗?看一下具体是哪个模块还在依赖老版本的flink-table-runtime-blink

Sun.Zhu <17626017...@163.com> 于2020年6月17日周三 上午12:49写道:

是的 除了1.11 编译出来的包之外依赖的包,比如connector的、hivecatalog需要依赖的包,由于1.11
还没有release所以就用的1.10.1版本的,上面两个问题在1.10.1版本下是没有的,升级了1.11报了不知道什么原因,缺少依赖吗?




在2020年06月16日 18:38,Benchao Li 写道:
1.11中对底层数据结构做了一些重构,所以你不可以直接把1.10的jar包拿到1.11里面使用的。
你可以直接使用1.11里面编译出来的jar包来跑应该是没有问题的。

Sun.Zhu <17626017...@163.com> 于2020年6月16日周二 下午6:11写道:

我编译了1.11包
在sql-cli下查询hive的表报如下错误:
[ERROR] Could not execute SQL statement. Reason:
java.lang.NoClassDefFoundError:
org/apache/flink/table/dataformat/BaseRow


查注册的kafka表报:
[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException:
org.apache.flink.table.dataformat.BaseRow


依赖包是从1.10.1下面拷贝的
| |
Sun.Zhu
|
|
17626017...@163.com
|
签名由网易邮箱大师定制


在2020年06月13日 11:44,Sun.Zhu<17626017...@163.com> 写道:
Got it!
Thx,junbao


| |
Sun.Zhu
|
|
17626017...@163.com
|
签名由网易邮箱大师定制


在2020年06月13日 09:32,zhangjunbao 写道:
1.10.x版本,在hivecatalog下,建表时用proctime as PROCTIME()应该是有bug的,
https://issues.apache.org/jira/browse/FLINK-17189 <
https://issues.apache.org/jira/browse/FLINK-17189>

Best,
Junbao Zhang

2020年6月13日 上午12:31,Sun.Zhu <17626017...@163.com> 写道:

hi,all
在用sql client集成hiveCatalog时,在hiveCatalog中注册了一个kafka的table
ddl如下:
|
CREATETABLE user_behavior (
user_id BIGINT,
item_id BIGINT,
category_id BIGINT,
behavior STRING,
ts TIMESTAMP(3),
proctime as PROCTIME(),   -- 通过计算列产生一个处理时间列
WATERMARK FOR ts as ts - INTERVAL'5'SECOND-- 在ts上定义watermark,ts成为事件时

回复:sqlclient集成hiveCatalog查询kafka表问题

2020-06-16 Thread Sun.Zhu
Sqlcli上得报错就上面这点,没有更多得信息了,或者从哪些log里可以看到更多信息




在2020年06月17日 10:27,Benchao Li 写道:
目前这个缺少的依赖是在flink-table-runtime-blink module的,现在这些重构到了flink-table-common
module了。
如果只是connector、format这些用老的版本,应该是没有问题的。
你可以把更详细的报错信息发一下吗?看一下具体是哪个模块还在依赖老版本的flink-table-runtime-blink

Sun.Zhu <17626017...@163.com> 于2020年6月17日周三 上午12:49写道:

> 是的 除了1.11 编译出来的包之外依赖的包,比如connector的、hivecatalog需要依赖的包,由于1.11
> 还没有release所以就用的1.10.1版本的,上面两个问题在1.10.1版本下是没有的,升级了1.11报了不知道什么原因,缺少依赖吗?
>
>
>
>
> 在2020年06月16日 18:38,Benchao Li 写道:
> 1.11中对底层数据结构做了一些重构,所以你不可以直接把1.10的jar包拿到1.11里面使用的。
> 你可以直接使用1.11里面编译出来的jar包来跑应该是没有问题的。
>
> Sun.Zhu <17626017...@163.com> 于2020年6月16日周二 下午6:11写道:
>
> > 我编译了1.11包
> > 在sql-cli下查询hive的表报如下错误:
> > [ERROR] Could not execute SQL statement. Reason:
> > java.lang.NoClassDefFoundError: org/apache/flink/table/dataformat/BaseRow
> >
> >
> > 查注册的kafka表报:
> > [ERROR] Could not execute SQL statement. Reason:
> > java.lang.ClassNotFoundException:
> org.apache.flink.table.dataformat.BaseRow
> >
> >
> > 依赖包是从1.10.1下面拷贝的
> > | |
> > Sun.Zhu
> > |
> > |
> > 17626017...@163.com
> > |
> > 签名由网易邮箱大师定制
> >
> >
> > 在2020年06月13日 11:44,Sun.Zhu<17626017...@163.com> 写道:
> > Got it!
> > Thx,junbao
> >
> >
> > | |
> > Sun.Zhu
> > |
> > |
> > 17626017...@163.com
> > |
> > 签名由网易邮箱大师定制
> >
> >
> > 在2020年06月13日 09:32,zhangjunbao 写道:
> > 1.10.x版本,在hivecatalog下,建表时用proctime as PROCTIME()应该是有bug的,
> > https://issues.apache.org/jira/browse/FLINK-17189 <
> > https://issues.apache.org/jira/browse/FLINK-17189>
> >
> > Best,
> > Junbao Zhang
> >
> > 2020年6月13日 上午12:31,Sun.Zhu <17626017...@163.com> 写道:
> >
> > hi,all
> > 在用sql client集成hiveCatalog时,在hiveCatalog中注册了一个kafka的table
> > ddl如下:
> > |
> > CREATETABLE user_behavior (
> > user_id BIGINT,
> > item_id BIGINT,
> > category_id BIGINT,
> > behavior STRING,
> > ts TIMESTAMP(3),
> > proctime as PROCTIME(),   -- 通过计算列产生一个处理时间列
> > WATERMARK FOR ts as ts - INTERVAL'5'SECOND-- 在ts上定义watermark,ts成为事件时间列
> > ) WITH (
> > 'connector.type' = 'kafka',  -- 使用 kafka connector
> > 'connector.version' = 'universal',  -- kafka 版本,universal 支持 0.11 以上的版本
> > 'connector.topic' = 'user_behavior',  -- kafka topic
> > 'connector.startup-mode' = 'earliest-offset',  -- 从起始 offset 开始读取
> > 'connector.properties.zookeeper.connect' = 'localhost:2181',  --
> zookeeper
> > 地址
> > 'connector.properties.bootstrap.servers' = 'localhost:9092',  -- kafka
> > broker 地址
> > 'format.type' = 'json'-- 数据源格式为 json
> > );
> > |
> > 在查询时select * from user_behavior;报错如下:
> > [ERROR] Could not execute SQL statement. Reason:
> > java.lang.AssertionError: Conversion to relational algebra failed to
> > preserve datatypes:
> > validated type:
> > RecordType(BIGINT user_id, BIGINT item_id, BIGINT category_id,
> > VARCHAR(2147483647) CHARACTER SET "UTF-16LE" behavior, TIME
> > ATTRIBUTE(ROWTIME) ts, TIMESTAMP(3) NOT NULL proctime) NOT NULL
> > converted type:
> > RecordType(BIGINT user_id, BIGINT item_id, BIGINT category_id,
> > VARCHAR(2147483647) CHARACTER SET "UTF-16LE" behavior, TIME
> > ATTRIBUTE(ROWTIME) ts, TIME ATTRIBUTE(PROCTIME) NOT NULL proctime) NOT
> NULL
> > rel:
> > LogicalProject(user_id=[$0], item_id=[$1], category_id=[$2],
> > behavior=[$3], ts=[$4], proctime=[$5])
> > LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($4, 5000:INTERVAL
> > SECOND)])
> > LogicalProject(user_id=[$0], item_id=[$1], category_id=[$2],
> > behavior=[$3], ts=[$4], proctime=[PROCTIME()])
> > LogicalTableScan(table=[[myhive, my_db, user_behavior, source:
> > [KafkaTableSource(user_id, item_id, category_id, behavior, ts)]]])
> >
> >
> > flink版本:1.10.1
> > blink planner,streaming model
> >
> >
> > Thx
> > | |
> > Sun.Zhu
> > |
> > |
> > 17626017...@163.com
> > |
> > 签名由网易邮箱大师定制
> >
> >
> >
>


回复:通过Kafka更新规则

2020-06-16 Thread Sun.Zhu
为什么会重头消费规则呢?没有开启checkpoint吗?重启可以从checkpoint中的offset继续消费kafka中的规则吧




在2020年06月16日 11:57,Ruibin Xing 写道:
我们有一个Flink Job需要一些自定义的规则,希望能够动态添加、更新、删除。规则的数量在百到千条。目前设计的结构是RDB+ Kafka +
Flink。

RDB存储规则的完整快照,以展示给Web应用作增删改查。改动通过Kafka发送消息至Flink,通过BroadcastState传播规则。

目前有一个问题没有解决:如何使用Kafka来传递状态。我想了一下,大概有几种方案:

1. 消息标记Add、Upadte、Delete类型,在Flink中写逻辑来处理状态以和RDB中状态保持一致。
  目前的问题是,每次重启Job,都需要从头读Kafka,来回放状态的更新。Kafka中的状态消息也需要持久保存。担心长期会堆积很多消息。
2. 使用Kafka Compact Log来解决1的问题。这个方案主要是之前没有使用过Compact Log,不清楚会不会有坑。
3.使用方案1,但是启动时Flink从RDB拉取全量规则。
4. 规则更新后Kafka消息发送全量规则,启动时Flink只拉取最新一条消息。

  各位大佬是否有经验可以分享,怎么处理是比较科学的?不胜感激!


回复:如何做Flink Stream的性能测试

2020-06-16 Thread Sun.Zhu
Hi
1.11 版本内置了DataGen、print、Blackhole的connector用来辅助功能测试,性能测试,线上观察,欢迎试用




在2020年06月16日 09:26,aven.wu 写道:
各位好;
  最近我想测试一下我的程序处理性能如何。请问有什么工具、或者应该通过什么方法来获得一个比较准确的测试结果。
   我的场景包含从kafka读取,flink 处理(有查询es做维表关联),处理结果输出到ES 和 Kafka。
Best
Aven


回复:sqlclient集成hiveCatalog查询kafka表问题

2020-06-16 Thread Sun.Zhu
是的 除了1.11 编译出来的包之外依赖的包,比如connector的、hivecatalog需要依赖的包,由于1.11 
还没有release所以就用的1.10.1版本的,上面两个问题在1.10.1版本下是没有的,升级了1.11报了不知道什么原因,缺少依赖吗?




在2020年06月16日 18:38,Benchao Li 写道:
1.11中对底层数据结构做了一些重构,所以你不可以直接把1.10的jar包拿到1.11里面使用的。
你可以直接使用1.11里面编译出来的jar包来跑应该是没有问题的。

Sun.Zhu <17626017...@163.com> 于2020年6月16日周二 下午6:11写道:

> 我编译了1.11包
> 在sql-cli下查询hive的表报如下错误:
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.NoClassDefFoundError: org/apache/flink/table/dataformat/BaseRow
>
>
> 查注册的kafka表报:
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.ClassNotFoundException: org.apache.flink.table.dataformat.BaseRow
>
>
> 依赖包是从1.10.1下面拷贝的
> | |
> Sun.Zhu
> |
> |
> 17626017...@163.com
> |
> 签名由网易邮箱大师定制
>
>
> 在2020年06月13日 11:44,Sun.Zhu<17626017...@163.com> 写道:
> Got it!
> Thx,junbao
>
>
> | |
> Sun.Zhu
> |
> |
> 17626017...@163.com
> |
> 签名由网易邮箱大师定制
>
>
> 在2020年06月13日 09:32,zhangjunbao 写道:
> 1.10.x版本,在hivecatalog下,建表时用proctime as PROCTIME()应该是有bug的,
> https://issues.apache.org/jira/browse/FLINK-17189 <
> https://issues.apache.org/jira/browse/FLINK-17189>
>
> Best,
> Junbao Zhang
>
> 2020年6月13日 上午12:31,Sun.Zhu <17626017...@163.com> 写道:
>
> hi,all
> 在用sql client集成hiveCatalog时,在hiveCatalog中注册了一个kafka的table
> ddl如下:
> |
> CREATETABLE user_behavior (
> user_id BIGINT,
> item_id BIGINT,
> category_id BIGINT,
> behavior STRING,
> ts TIMESTAMP(3),
> proctime as PROCTIME(),   -- 通过计算列产生一个处理时间列
> WATERMARK FOR ts as ts - INTERVAL'5'SECOND-- 在ts上定义watermark,ts成为事件时间列
> ) WITH (
> 'connector.type' = 'kafka',  -- 使用 kafka connector
> 'connector.version' = 'universal',  -- kafka 版本,universal 支持 0.11 以上的版本
> 'connector.topic' = 'user_behavior',  -- kafka topic
> 'connector.startup-mode' = 'earliest-offset',  -- 从起始 offset 开始读取
> 'connector.properties.zookeeper.connect' = 'localhost:2181',  -- zookeeper
> 地址
> 'connector.properties.bootstrap.servers' = 'localhost:9092',  -- kafka
> broker 地址
> 'format.type' = 'json'-- 数据源格式为 json
> );
> |
> 在查询时select * from user_behavior;报错如下:
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.AssertionError: Conversion to relational algebra failed to
> preserve datatypes:
> validated type:
> RecordType(BIGINT user_id, BIGINT item_id, BIGINT category_id,
> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" behavior, TIME
> ATTRIBUTE(ROWTIME) ts, TIMESTAMP(3) NOT NULL proctime) NOT NULL
> converted type:
> RecordType(BIGINT user_id, BIGINT item_id, BIGINT category_id,
> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" behavior, TIME
> ATTRIBUTE(ROWTIME) ts, TIME ATTRIBUTE(PROCTIME) NOT NULL proctime) NOT NULL
> rel:
> LogicalProject(user_id=[$0], item_id=[$1], category_id=[$2],
> behavior=[$3], ts=[$4], proctime=[$5])
> LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($4, 5000:INTERVAL
> SECOND)])
> LogicalProject(user_id=[$0], item_id=[$1], category_id=[$2],
> behavior=[$3], ts=[$4], proctime=[PROCTIME()])
> LogicalTableScan(table=[[myhive, my_db, user_behavior, source:
> [KafkaTableSource(user_id, item_id, category_id, behavior, ts)]]])
>
>
> flink版本:1.10.1
> blink planner,streaming model
>
>
> Thx
> | |
> Sun.Zhu
> |
> |
> 17626017...@163.com
> |
> 签名由网易邮箱大师定制
>
>
>


回复: sqlclient集成hiveCatalog查询kafka表问题

2020-06-16 Thread Sun.Zhu
我编译了1.11包
在sql-cli下查询hive的表报如下错误:
[ERROR] Could not execute SQL statement. Reason:
java.lang.NoClassDefFoundError: org/apache/flink/table/dataformat/BaseRow


查注册的kafka表报:
[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: org.apache.flink.table.dataformat.BaseRow


依赖包是从1.10.1下面拷贝的
| |
Sun.Zhu
|
|
17626017...@163.com
|
签名由网易邮箱大师定制


在2020年06月13日 11:44,Sun.Zhu<17626017...@163.com> 写道:
Got it!
Thx,junbao


| |
Sun.Zhu
|
|
17626017...@163.com
|
签名由网易邮箱大师定制


在2020年06月13日 09:32,zhangjunbao 写道:
1.10.x版本,在hivecatalog下,建表时用proctime as PROCTIME()应该是有bug的,
https://issues.apache.org/jira/browse/FLINK-17189 
<https://issues.apache.org/jira/browse/FLINK-17189>

Best,
Junbao Zhang

2020年6月13日 上午12:31,Sun.Zhu <17626017...@163.com> 写道:

hi,all
在用sql client集成hiveCatalog时,在hiveCatalog中注册了一个kafka的table
ddl如下:
|
CREATETABLE user_behavior (
user_id BIGINT,
item_id BIGINT,
category_id BIGINT,
behavior STRING,
ts TIMESTAMP(3),
proctime as PROCTIME(),   -- 通过计算列产生一个处理时间列
WATERMARK FOR ts as ts - INTERVAL'5'SECOND-- 在ts上定义watermark,ts成为事件时间列
) WITH (
'connector.type' = 'kafka',  -- 使用 kafka connector
'connector.version' = 'universal',  -- kafka 版本,universal 支持 0.11 以上的版本
'connector.topic' = 'user_behavior',  -- kafka topic
'connector.startup-mode' = 'earliest-offset',  -- 从起始 offset 开始读取
'connector.properties.zookeeper.connect' = 'localhost:2181',  -- zookeeper 地址
'connector.properties.bootstrap.servers' = 'localhost:9092',  -- kafka broker 地址
'format.type' = 'json'-- 数据源格式为 json
);
|
在查询时select * from user_behavior;报错如下:
[ERROR] Could not execute SQL statement. Reason:
java.lang.AssertionError: Conversion to relational algebra failed to preserve 
datatypes:
validated type:
RecordType(BIGINT user_id, BIGINT item_id, BIGINT category_id, 
VARCHAR(2147483647) CHARACTER SET "UTF-16LE" behavior, TIME ATTRIBUTE(ROWTIME) 
ts, TIMESTAMP(3) NOT NULL proctime) NOT NULL
converted type:
RecordType(BIGINT user_id, BIGINT item_id, BIGINT category_id, 
VARCHAR(2147483647) CHARACTER SET "UTF-16LE" behavior, TIME ATTRIBUTE(ROWTIME) 
ts, TIME ATTRIBUTE(PROCTIME) NOT NULL proctime) NOT NULL
rel:
LogicalProject(user_id=[$0], item_id=[$1], category_id=[$2], behavior=[$3], 
ts=[$4], proctime=[$5])
LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($4, 5000:INTERVAL SECOND)])
LogicalProject(user_id=[$0], item_id=[$1], category_id=[$2], behavior=[$3], 
ts=[$4], proctime=[PROCTIME()])
LogicalTableScan(table=[[myhive, my_db, user_behavior, source: 
[KafkaTableSource(user_id, item_id, category_id, behavior, ts)]]])


flink版本:1.10.1
blink planner,streaming model


Thx
| |
Sun.Zhu
|
|
17626017...@163.com
|
签名由网易邮箱大师定制




Re: flink sql sink hbase failed

2020-06-15 Thread Sun.Zhu
好像不需要改源码
'connector.version' = ‘1.4.3’ 也可以往2.x版本里写
| |
Sun.Zhu
|
|
17626017...@163.com
|
签名由网易邮箱大师定制


On 06/15/2020 19:22,Zhou Zach wrote:
改了源码,可以了

















在 2020-06-15 16:17:46,"Leonard Xu"  写道:
Hi


在 2020年6月15日,15:36,Zhou Zach  写道:

'connector.version' expects '1.4.3', but is '2.1.0'

Hbase connector只支持1.4.3的版本,其他不支持,但之前看有社区用户用1.4.3的connector写入高版本的case,你可以试下。

祝好
Leonard Xu


??????Flink??????????????????

2020-06-13 Thread Sun.Zhu
1: Flink??
--try 
catch??checkpoint??
2??checkpoint??savepointsavepoint??
??
3??
??1??flinksql??1.11??format.ignore-parse-errors


| |
Sun.Zhu
|
|
17626017...@163.com
|
??


??2020??06??9?? 13:49??Z-Z ??
Hi?? ??
  
??Flink??(NullPointer??)checkpoint??savepoint??
  1: Flink??
  
2??checkpoint??savepointsavepoint??
  
3??

回复: Flink 1.11 什么时候正式发布呢

2020-06-13 Thread Sun.Zhu
据说是6月下旬


| |
Sun.Zhu
|
|
17626017...@163.com
|
签名由网易邮箱大师定制


在2020年06月9日 11:13,zhipengchen 写道:
+1

发送自 Windows 10 版邮件应用

发件人: a773807...@gmail.com
发送时间: 2020年6月9日 10:53
收件人: user-zh
主题: 回复: Flink 1.11 什么时候正式发布呢

+1



a773807...@gmail.com

发件人: hyangvv
发送时间: 2020-06-09 10:52
收件人: user-zh
主题: Flink 1.11 什么时候正式发布呢
hi,flink项目的大神们,能透漏下 Flink1.11大概什么时候正式发布呢。



回复: sqlclient集成hiveCatalog查询kafka表问题

2020-06-12 Thread Sun.Zhu
Got it!
Thx,junbao


| |
Sun.Zhu
|
|
17626017...@163.com
|
签名由网易邮箱大师定制


在2020年06月13日 09:32,zhangjunbao 写道:
1.10.x版本,在hivecatalog下,建表时用proctime as PROCTIME()应该是有bug的,
https://issues.apache.org/jira/browse/FLINK-17189 
<https://issues.apache.org/jira/browse/FLINK-17189>

Best,
Junbao Zhang

2020年6月13日 上午12:31,Sun.Zhu <17626017...@163.com> 写道:

hi,all
在用sql client集成hiveCatalog时,在hiveCatalog中注册了一个kafka的table
ddl如下:
|
CREATETABLE user_behavior (
user_id BIGINT,
item_id BIGINT,
category_id BIGINT,
behavior STRING,
ts TIMESTAMP(3),
proctime as PROCTIME(),   -- 通过计算列产生一个处理时间列
WATERMARK FOR ts as ts - INTERVAL'5'SECOND-- 在ts上定义watermark,ts成为事件时间列
) WITH (
'connector.type' = 'kafka',  -- 使用 kafka connector
'connector.version' = 'universal',  -- kafka 版本,universal 支持 0.11 以上的版本
'connector.topic' = 'user_behavior',  -- kafka topic
'connector.startup-mode' = 'earliest-offset',  -- 从起始 offset 开始读取
'connector.properties.zookeeper.connect' = 'localhost:2181',  -- zookeeper 地址
'connector.properties.bootstrap.servers' = 'localhost:9092',  -- kafka broker 地址
'format.type' = 'json'-- 数据源格式为 json
);
|
在查询时select * from user_behavior;报错如下:
[ERROR] Could not execute SQL statement. Reason:
java.lang.AssertionError: Conversion to relational algebra failed to preserve 
datatypes:
validated type:
RecordType(BIGINT user_id, BIGINT item_id, BIGINT category_id, 
VARCHAR(2147483647) CHARACTER SET "UTF-16LE" behavior, TIME ATTRIBUTE(ROWTIME) 
ts, TIMESTAMP(3) NOT NULL proctime) NOT NULL
converted type:
RecordType(BIGINT user_id, BIGINT item_id, BIGINT category_id, 
VARCHAR(2147483647) CHARACTER SET "UTF-16LE" behavior, TIME ATTRIBUTE(ROWTIME) 
ts, TIME ATTRIBUTE(PROCTIME) NOT NULL proctime) NOT NULL
rel:
LogicalProject(user_id=[$0], item_id=[$1], category_id=[$2], behavior=[$3], 
ts=[$4], proctime=[$5])
LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($4, 5000:INTERVAL SECOND)])
LogicalProject(user_id=[$0], item_id=[$1], category_id=[$2], behavior=[$3], 
ts=[$4], proctime=[PROCTIME()])
LogicalTableScan(table=[[myhive, my_db, user_behavior, source: 
[KafkaTableSource(user_id, item_id, category_id, behavior, ts)]]])


flink版本:1.10.1
blink planner,streaming model


Thx
| |
Sun.Zhu
|
|
17626017...@163.com
|
签名由网易邮箱大师定制




sqlclient集成hiveCatalog查询kafka表问题

2020-06-12 Thread Sun.Zhu
hi,all
在用sql client集成hiveCatalog时,在hiveCatalog中注册了一个kafka的table
ddl如下:
|
CREATETABLE user_behavior (
user_id BIGINT,
item_id BIGINT,
category_id BIGINT,
behavior STRING,
ts TIMESTAMP(3),
proctime as PROCTIME(),   -- 通过计算列产生一个处理时间列
WATERMARK FOR ts as ts - INTERVAL'5'SECOND-- 在ts上定义watermark,ts成为事件时间列
) WITH (
'connector.type' = 'kafka',  -- 使用 kafka connector
'connector.version' = 'universal',  -- kafka 版本,universal 支持 0.11 以上的版本
'connector.topic' = 'user_behavior',  -- kafka topic
'connector.startup-mode' = 'earliest-offset',  -- 从起始 offset 开始读取
'connector.properties.zookeeper.connect' = 'localhost:2181',  -- zookeeper 地址
'connector.properties.bootstrap.servers' = 'localhost:9092',  -- kafka broker 地址
'format.type' = 'json'-- 数据源格式为 json
);
|
在查询时select * from user_behavior;报错如下:
[ERROR] Could not execute SQL statement. Reason:
java.lang.AssertionError: Conversion to relational algebra failed to preserve 
datatypes:
validated type:
RecordType(BIGINT user_id, BIGINT item_id, BIGINT category_id, 
VARCHAR(2147483647) CHARACTER SET "UTF-16LE" behavior, TIME ATTRIBUTE(ROWTIME) 
ts, TIMESTAMP(3) NOT NULL proctime) NOT NULL
converted type:
RecordType(BIGINT user_id, BIGINT item_id, BIGINT category_id, 
VARCHAR(2147483647) CHARACTER SET "UTF-16LE" behavior, TIME ATTRIBUTE(ROWTIME) 
ts, TIME ATTRIBUTE(PROCTIME) NOT NULL proctime) NOT NULL
rel:
LogicalProject(user_id=[$0], item_id=[$1], category_id=[$2], behavior=[$3], 
ts=[$4], proctime=[$5])
  LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($4, 5000:INTERVAL 
SECOND)])
LogicalProject(user_id=[$0], item_id=[$1], category_id=[$2], behavior=[$3], 
ts=[$4], proctime=[PROCTIME()])
  LogicalTableScan(table=[[myhive, my_db, user_behavior, source: 
[KafkaTableSource(user_id, item_id, category_id, behavior, ts)]]])


flink版本:1.10.1
blink planner,streaming model


Thx
| |
Sun.Zhu
|
|
17626017...@163.com
|
签名由网易邮箱大师定制



??????flink 1.9 ????????????????

2020-06-06 Thread Sun.Zhu
Hi,star
KafkaConnectorupsert[1]


[1]https://mp.weixin.qq.com/s/MSs7HSaegyWWU3Fig2PYYA
| |
Sun.Zhu
|
|
17626017...@163.com
|
??


??2020??06??3?? 14:47??star<3149768...@qq.com> ??



??toRetractStreamkafka??
??kafka??flink 
??RetractStream






回复:flink sql 窗口场景的问题

2020-06-03 Thread Sun.Zhu
hi
你是要每条数据都计算当前5分钟内的聚合值吗?如果是这样的话可以考虑使用over window


| |
Sun.Zhu
|
|
17626017...@163.com
|
签名由网易邮箱大师定制


在2020年06月3日 02:56,steven chen 写道:
hi :
我现在遇到有这样一个场景,我们需要实时去统计5分和30分的粒度,flink sql 窗口使用了处理时间滑动窗口方式
但是都是只有5分结束的时候才能把聚合结果输出,这个不满足我们需求,有没有方式可以直接实时输出结果,比如18:02 的统计+1+1 
都能直接落在18:00-18:05的窗口上,并每次+1都能实时输出,而不是等到窗口结束才sink 到mysql .30分钟我同样

回复:flink on yarn报错 怎么获取

2020-06-02 Thread Sun.Zhu


Hi,air
可以通过日志采集来收集异常日志,然后统一展示并监控告警。
| |
Sun.Zhu
|
|
17626017...@163.com
|
签名由网易邮箱大师定制


在2020年06月2日 14:05,阿华田 写道:
这种情况需要对flink任务进行监控 获取flink的任务状态


| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制


在2020年06月2日 14:03,air23 写道:
今天发现taskmanagers报json解析失败 他一起在重启
但是我们这边是监控yarn 任务级别的。像这种task 里面报错。yarn任务又不会挂掉。应该怎么去做监控。才能得知 程序后台有问题
谢谢

??????flink-1.10 ????hdfs????????????????????????

2020-06-02 Thread Sun.Zhu
Hi,kcz
inprogressfinished??[1]??RollingPolicy
inprogressfinished
[1]https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/streamfile_sink.html#part-file-lifecycle

[2]https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/streamfile_sink.html#rolling-policy


Best
Sun.Zhu
| |
Sun.Zhu
|
|
17626017...@163.com
|
??


??2020??06??2?? 19:20??kcz<573693...@qq.com> ??
??
String path = "hdfs://HACluster/user/flink/test-1/2020-05-29--15/";
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
FileInputFormat fileInputFormat = new TextInputFormat(new Path(path));
fileInputFormat.setNestedFileEnumeration(true);
env.readFile(fileInputFormat, path).print();
env.execute();hdfs??/user/flink/test-1/2020-05-29--15/.part-0-0.inprogress.6c12fe72-5602-4458-b29f-c8c8b4a7b73b(??)/user/flink/test-1/2020-05-29--15/.part-1-0.inprogress.34b1d5ff-cf0d-4209-b409-21920b12327dflink??

回复:toAppendStream 类型不匹配问题

2020-05-03 Thread Sun.Zhu
好的,我试试,感谢




| |
Sun.Zhu
|
|
邮箱:17626017...@163.com
|

Signature is customized by Netease Mail Master

在2020年05月04日 11:22,Jark Wu 写道:
看起来是一个已经修复的 bug (FLINK-16108)。
你可以用正在 RC 的 release-1.10.1 再试下吗?
https://dist.apache.org/repos/dist/dev/flink/flink-1.10.1-rc2/

Best,
Jark

On Mon, 4 May 2020 at 01:01, 祝尚 <17626017...@163.com> wrote:

> 参考jark老师博客里的demo,写了个table api/sql的程序,在table转appendStream时报错
> flink版本1.10
> 代码如下:
> public static void main(String[] args) throws Exception {
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> //以后版本会将old planner移除
> EnvironmentSettings settings =
> EnvironmentSettings.newInstance().useBlinkPlanner().build();
> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
> settings);
> tableEnv.sqlUpdate("CREATE TABLE user_behavior (\n" +
> "user_id BIGINT,\n" +
> "item_id BIGINT,\n" +
> "category_id BIGINT,\n" +
> "behavior STRING,\n" +
> "ts TIMESTAMP(3),\n" +
> "proctime as PROCTIME(),   -- 通过计算列产生一个处理时间列\n" +
> "WATERMARK FOR ts as ts - INTERVAL '5' SECOND  --
> 在ts上定义watermark,ts成为事件时间列\n" +
> ") WITH (\n" +
> "'connector.type' = 'kafka',  -- 使用 kafka connector\n" +
> "'connector.version' = 'universal',  -- kafka 版本,universal
> 支持 0.11 以上的版本\n" +
> "'connector.topic' = 'user_behavior',  -- kafka topic\n" +
> "'connector.startup-mode' = 'earliest-offset',  -- 从起始
> offset 开始读取\n" +
> "'connector.properties.zookeeper.connect' =
> 'localhost:2181',  -- zookeeper 地址\n" +
> "'connector.properties.bootstrap.servers' =
> 'localhost:9092',  -- kafka broker 地址\n" +
> "'format.type' = 'json'  -- 数据源格式为 json\n" +
> ")");
> Table table1 = tableEnv.sqlQuery("select
> user_id,item_id,category_id,behavior,ts," +
> "proctime from user_behavior where behavior='buy'");
> tableEnv.toAppendStream(table1, Behavior.class).print();
> env.execute();
>
> }
>
> public class Behavior {
> public Long user_id;
> public Long item_id;
> public Long category_id;
> public String behavior;
> public Timestamp ts;
> public Timestamp proctime;
>
>
> @Override
> public String toString() {
> return "Behavior{" +
> "user_id=" + user_id +
> ", item_id=" + item_id +
> ", category_id=" + category_id +
> ", behavior='" + behavior + '\'' +
> ", ts=" + ts +
> ", proctime=" + proctime +
> '}';
> }
> }
> 报错如下:
> Exception in thread "main" org.apache.flink.table.api.ValidationException:
> Field types of query result and registered TableSink  do not match.
> Query schema: [user_id: BIGINT, item_id: BIGINT, category_id: BIGINT,
> behavior: STRING, ts: TIMESTAMP(3) *ROWTIME*, proctime: TIMESTAMP(3) NOT
> NULL *PROCTIME*]
> Sink schema: [behavior: STRING, category_id: BIGINT, item_id: BIGINT,
> proctime: TIMESTAMP(3), ts: TIMESTAMP(3), user_id: BIGINT]
> at
> org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:96)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:229)
> at
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
> at
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at
> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150)
> at
> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351)
> at
> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:259)
> at
> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:250)
> at sql.KafkaSourceTable.main(KafkaSourceTable.java:35)
>
> pojo的类型定义是和source table字段类型是一致的,
> 为什么还会校验 NOT NULL *PROCTIME* ,*ROWTIME*?


回复:问题请教-flinksql的kafkasource方面

2020-04-20 Thread Sun.Zhu
嗯是的,都设置成小于等于partition数




| |
Sun.Zhu
|
|
邮箱:17626017...@163.com
|

Signature is customized by Netease Mail Master

在2020年04月21日 00:28,Jark Wu 写道:
Hi,

你可以将 kafka 并行度设置成等于 kafka partition 个数。这个方式肯定能 work,且不浪费 task 资源。

Best,
Jark

On Mon, 20 Apr 2020 at 22:33, Benchao Li  wrote:

> 我对checkpoint这块不是很了解。不过subtask标记为idle跟finish还是有区别的吧。
>
> 祝尚 <17626017...@163.com> 于2020年4月20日周一 下午10:29写道:
>
> > 我们是1.8版本,但是这段源码应该是没变把
> > // check if all tasks that we need to trigger are running.
> > // if not, abort the checkpoint
> > Execution[] executions = new Execution[tasksToTrigger.length];
> > for (int i = 0; i < tasksToTrigger.length; i++) {
> >Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();
> >if (ee == null) {
> >   LOG.info("Checkpoint triggering task {} of job {} is not being
> > executed at the moment. Aborting checkpoint.",
> > tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
> > job);
> >   throw new
> >
> CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
> >} else if (ee.getState() == ExecutionState.RUNNING) {
> >   executions[i] = ee;
> >} else {
> >   LOG.info("Checkpoint triggering task {} of job {} is not in state
> {}
> > but {} instead. Aborting checkpoint.",
> > tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
> > job,
> > ExecutionState.RUNNING,
> > ee.getState());
> >   throw new
> >
> CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
> >}
> > }
> > 还是我理解的不对
> >
> > > 2020年4月20日 下午6:21,Benchao Li  写道:
> > >
> > > 你们用的是哪个版本?我们用的是1.9,还没有遇到过这个问题呢。
> > >
> > > Sun.Zhu <17626017...@163.com> 于2020年4月20日周一 下午5:43写道:
> > >
> > >>
> >
> 我们生产上遇到过这个问题,分不到partition的source短暂时间后变成了finished状态,在做checkpoint时会检查所有executor的状态是不是running,否则将不做checkpoint
> > >> ,源码CheckpointCoordinator#triggerCheckpoint也有说明
> > >>
> > >>
> > >>
> > >>
> > >>
> > >> | |
> > >> Sun.Zhu
> > >> |
> > >> |
> > >> 邮箱:17626017...@163.com
> > >> |
> > >>
> > >> Signature is customized by Netease Mail Master
> > >>
> > >> 在2020年04月20日 10:37,Benchao Li 写道:
> > >> 应该是不会的。分配不到partition的source会标记为idle状态。
> > >>
> > >> Sun.Zhu <17626017...@163.com> 于2020年4月20日周一 上午10:28写道:
> > >>
> > >>> Hi,benchao,source并发度大于partition数的话,会导致不做checkpoint的问题吧
> > >>>
> > >>>
> > >>>
> > >>>
> > >>> | |
> > >>> Sun.Zhu
> > >>> |
> > >>> |
> > >>> 邮箱:17626017...@163.com
> > >>> |
> > >>>
> > >>> Signature is customized by Netease Mail Master
> > >>>
> > >>> 在2020年04月19日 22:43,人生若只如初见 写道:
> > >>> 嗯嗯,十分感谢
> > >>>
> > >>>
> > >>>
> > >>>
> > >>> --原始邮件--
> > >>> 发件人:"Benchao Li" > >>> 发送时间:2020年4月19日(星期天) 晚上9:25
> > >>> 收件人:"user-zh" > >>>
> > >>> 主题:Re: 问题请教-flinksql的kafkasource方面
> > >>>
> > >>>
> > >>>
> > >>> 如果是这种情况,可以让你的source的并发度大于等于kafka partition的数量来避免一下。
> > >>>
> > >>> Jark Wu  > >>>
> > >>>  Hi,
> > >>> 
> > >>>  根据你描述的现象,以及提供的代码。我觉得原因应该是数据乱序导致的。
> > >>>  根据你的 Java 代码,数据的 event time
> > >>> 不是单调递增的,会有一定程度的乱序,这种乱序在作业正常运行时影响不大(watermark
> > >>>  能容忍 5s 乱序).
> > >>>  但是在追数据时,由于 flink 目前还没有做到event time 对齐,所以会导致追数据时某些 partition
> 进度比某些
> > >>> partition
> > >>>  进度快很多的现象,
> > >>> 
> 导致乱序程度拉大(如原先迟到最久的数据时4s,现在可能是10s),所以会导致丢弃的数据更多,也就造成了追数据时,统计值偏低的现象。
> > >>> 
> > >>>  完美的解决方案还需要等 FLIP-27 的完成。
> > >>>  当前可以通过增加 watermark delay来增大迟到数据的容忍。
> > >>> 
> > >>>  Best,
> > >>>  Jark
> > >>> 
> > >>> 
> > >>>  On Sat, 18 Apr 2020 at 22:53, 人生若只如初见  > >>> 
> > >>>   你好
> > >>>  
> > >>>  
> > >>> 
> > >>>
> > >>
> >
> 感谢解答,第一个问题就是当我用插入语句时,如果

回复:问题请教-flinksql的kafkasource方面

2020-04-20 Thread Sun.Zhu
我们生产上遇到过这个问题,分不到partition的source短暂时间后变成了finished状态,在做checkpoint时会检查所有executor的状态是不是running,否则将不做checkpoint
 ,源码CheckpointCoordinator#triggerCheckpoint也有说明





| |
Sun.Zhu
|
|
邮箱:17626017...@163.com
|

Signature is customized by Netease Mail Master

在2020年04月20日 10:37,Benchao Li 写道:
应该是不会的。分配不到partition的source会标记为idle状态。

Sun.Zhu <17626017...@163.com> 于2020年4月20日周一 上午10:28写道:

> Hi,benchao,source并发度大于partition数的话,会导致不做checkpoint的问题吧
>
>
>
>
> | |
> Sun.Zhu
> |
> |
> 邮箱:17626017...@163.com
> |
>
> Signature is customized by Netease Mail Master
>
> 在2020年04月19日 22:43,人生若只如初见 写道:
> 嗯嗯,十分感谢
>
>
>
>
> --原始邮件--
>  发件人:"Benchao Li" 发送时间:2020年4月19日(星期天) 晚上9:25
> 收件人:"user-zh"
> 主题:Re: 问题请教-flinksql的kafkasource方面
>
>
>
> 如果是这种情况,可以让你的source的并发度大于等于kafka partition的数量来避免一下。
>
> Jark Wu 
>  Hi,
> 
>  根据你描述的现象,以及提供的代码。我觉得原因应该是数据乱序导致的。
>  根据你的 Java 代码,数据的 event time
> 不是单调递增的,会有一定程度的乱序,这种乱序在作业正常运行时影响不大(watermark
>  能容忍 5s 乱序).
>  但是在追数据时,由于 flink 目前还没有做到event time 对齐,所以会导致追数据时某些 partition 进度比某些
> partition
>  进度快很多的现象,
>  导致乱序程度拉大(如原先迟到最久的数据时4s,现在可能是10s),所以会导致丢弃的数据更多,也就造成了追数据时,统计值偏低的现象。
> 
>  完美的解决方案还需要等 FLIP-27 的完成。
>  当前可以通过增加 watermark delay来增大迟到数据的容忍。
> 
>  Best,
>  Jark
> 
> 
>  On Sat, 18 Apr 2020 at 22:53, 人生若只如初见  
>   你好
>  
>  
> 
> 感谢解答,第一个问题就是当我用插入语句时,如果kafka里面已经发了3个多小时的用户数据,他会马上求出前三个小时的购买数,大概每小时只有140个,但统计的之后几个小时的购买数都在1100左右,两者差了好多
>  
>  
>  
>   附:
>   userbehavior建表语句
>   CREATE TABLE user_behavior (
>   nbsp; nbsp; user_id BIGINT,
>   nbsp; nbsp; item_id BIGINT,
>   nbsp; nbsp; category_id BIGINT,
>   nbsp; nbsp; behavior STRING,
>   nbsp; nbsp; ts TIMESTAMP(3),
>   nbsp; nbsp; proctime as PROCTIME(), nbsp; --
> 通过计算列产生一个处理时间列
>   nbsp; nbsp; WATERMARK FOR ts as ts - INTERVAL '5'
> SECOND nbsp;--
>   在ts上定义watermark,ts成为事件时间列
>   ) WITH (
>   nbsp; nbsp; 'connector.type' = 'kafka', nbsp;--
> 使用 kafka connector
>   nbsp; nbsp; 'connector.version' = 'universal',
> nbsp;-- kafka
>   版本,universal 支持 0.11 以上的版本
>   nbsp; nbsp; 'connector.topic' = 'user_behavior',
> nbsp;-- kafka topic
>   nbsp; nbsp; 'connector.startup-mode' =
> 'earliest-offset', nbsp;-- 从起始
>   offset 开始读取
>   nbsp; nbsp; 'connector.properties.zookeeper.connect' =
> '
>   192.168.0.150:2181', nbsp;-- zookeeper 地址
>   nbsp; nbsp; 'connector.properties.bootstrap.servers' =
> '
>   192.168.0.150:9092', nbsp;-- kafka broker 地址
>   nbsp; nbsp; 'format.type' = 'json' nbsp;-- 数据源格式为
> json
>   )
>  
>   每小时购买数建表语句
>   CREATE TABLE buy_cnt_per_hour (nbsp;
>   nbsp; nbsp; hour_of_day BIGINT,
>   nbsp; nbsp; buy_cnt BIGINT
>   ) WITH (
>   nbsp; nbsp; 'connector.type' = 'elasticsearch', -- 使用
> elasticsearch
>   connector
>   nbsp; nbsp; 'connector.version' = '6', nbsp;--
> elasticsearch 版本,6 能支持
>   es 6+ 以及 7+ 的版本
>   nbsp; nbsp; 'connector.hosts' = '
> http://192.168.0.150:9200', nbsp;--
>   elasticsearch 地址
>   nbsp; nbsp; 'connector.index' = 'buy_cnt_per_hour',
> nbsp;--
>   elasticsearch 索引名,相当于数据库的表名
>   nbsp; nbsp; 'connector.document-type' =
> 'user_behavior', --
>   elasticsearch 的 type,相当于数据库的库名
>   nbsp; nbsp; 'connector.bulk-flush.max-actions' = '1',
> nbsp;-- 每条数据都刷新
>   nbsp; nbsp; 'format.type' = 'json', nbsp;--
> 输出数据格式 json
>   nbsp; nbsp; 'update-mode' = 'append'
>   )
>  
>   插入语句
>   INSERT INTO buy_cnt_per_hournbsp;
>   SELECT HOUR(TUMBLE_START(ts,INTERVAL '1'
> HOUR)),COUNT(*)nbsp;
>   FROM user_behavior
>   WHERE behavior = 'buy'
>   GROUP BY TUMBLE(ts,INTERVAL '1' HOUR)
>  
>   kafka数据发送代码
>  
>   import com.alibaba.fastjson.JSONObject;
>   import org.apache.kafka.clients.producer.KafkaProducer;
>   import org.apache.kafka.clients.producer.ProducerRecord;
>  
>   import java.text.SimpleDateFormat;
>   import java.util.*;
>  
>  
>   public class UserBehaviorProducer {
>   public static final String brokerList = "
> 192.168.0.150:9092";
>  
>   // public static final
> String topic="user_behavior";
>   public static final String topic =
> "user_behavior";
>  
>   public static void main(String args[]) {
>  
>   //配置生产者客户端参数
>   //将配置序列化
>   Properties
> properties = new Properties();
>  
> properties.put("key.serializer",
>   "org.apache.kafka.common.serialization.StringSerializer");
>  
> properties.put("value.serializer",
>   "org.apache.kafka.common.serialization.StringSerializer");
>  
> properties.put("bootstrap.servers", brokerList);
>  
> //创建KafkaProducer 实例
>  
> KafkaProducer   KafkaProducer<gt;(p

??????????????-flinksql??kafkasource????

2020-04-19 Thread Sun.Zhu
Hi,benchao??source??partition??checkpoint




| |
Sun.Zhu
|
|
??17626017...@163.com
|

Signature is customized by Netease Mail Master

??2020??04??19?? 22:43 ??
??
 



----
 ??:"Benchao Li"

回复:keyby的乱序处理

2020-04-03 Thread Sun.Zhu
1.未keyby的话,user1 user2 
user3的顺序取决于分区策略,比如forward他们还是会在一个subtask上,顺序还是有序的,如果被打散的话就不确定了
2.keyby的话,可以保证同一个key的后续数据保持有序,不同的key不能保证一定有序




| |
Sun.Zhu
|
|
邮箱:17626017...@163.com
|

Signature is customized by Netease Mail Master

在2020年03月31日 15:39,tingli ke 写道:
HI,再次补充一下我的场景,如下图所示:
1、kafka TopicA的Partiton1的数据包含3个user的数据
2、flink在对该分区生成了w1、w2、w3...的watermark


问题来了:
1、w1、w2、w3...的watermark只能保证user1、user2、user3的整体数据的有序处理对吗?

2、在对user1、user2、user3进行keyby后,w1、w2、w3...的watermark能保证user1或者user2或者user3的有序处理吗?


期待大神的回复!




jun su  于2020年3月31日周二 下午1:10写道:

hi,
keyby后的watermark应该是上游多个线程中最小的watermark , 所以数据虽然可能乱序, 但是watermark并不会乱,
不会影响后续的窗口触发

tingli ke  于2020年3月31日周二 上午9:54写道:

> 您好,
> 针对您的回复,现在的场景是这样子的
> 1、kafka存在多个partition,针对多个Partiton,flink watermark assiger会对每个Partiton
> 发射 watermark;
> 2、在第一个前提下,水位已经设置好了,还可以在keyby之后在次设置watermark吗?
> 3、是否存在可以不用经过第一个前提的方案,直接在keyby之后设置watermark?
>
> Jimmy Wong  于2020年3月30日周一 下午9:13写道:
>
> > Hi,
> > watermark 可以在 keyBy 后分配,但是最好紧跟 SourceFunction。经过 KeyBy
> > 或其他分配策略,可能导致数据更大的延迟(EventTime)。
> >
> >
> > “想做key化的乱序处理” 这句没太理解,麻烦解释下。
> >
> >
> > | |
> > Jimmy Wong
> > |
> > |
> > wangzmk...@163.com
> > |
> > 签名由网易邮箱大师定制
> >
> >
> > 在2020年03月30日 20:58,tingli ke 写道:
> > 请教一个问题:kafka-per-partition 的watermark的分配,可以在keyby之后分配吗,想做key化的乱序处理能支持吗
> >
>


--
Best,
Jun Su


Flink有类似storm主动fail的机制吗?

2020-03-11 Thread Sun.Zhu
Hi,
Flink有类似storm主动fail的机制吗?
没有的话,有什么好的实现方案吗?比如用状态存储失败的记录?






感谢您的回复
| |
Sun.Zhu
|
|
邮箱:17626017...@163.com
|

Signature is customized by Netease Mail Master