Re: [ANNOUNCE] Flink Table Store Joins Apache Incubator as Apache Paimon(incubating)

2023-03-28 文章 Shuo Cheng
Cool ~ Congrats!

Best Regards,
Shuo

On Mon, Mar 27, 2023 at 5:24 PM Yu Li  wrote:

> Dear Flinkers,
>
>
> As you may have noticed, we are pleased to announce that Flink Table
> Store has joined the Apache Incubator as a separate project called
> Apache Paimon(incubating) [1] [2] [3]. The new project still aims at
> building a streaming data lake platform for high-speed data ingestion,
> change data tracking and efficient real-time analytics, with the
> vision of supporting a larger ecosystem and establishing a vibrant and
> neutral open source community.
>
>
> We would like to thank everyone for their great support and efforts
> for the Flink Table Store project, and warmly welcome everyone to join
> the development and activities of the new project. Apache Flink will
> continue to be one of the first-class citizens supported by Paimon,
> and we believe that the Flink and Paimon communities will maintain
> close cooperation.
>
>
> 亲爱的Flinkers,
>
>
> 正如您可能已经注意到的,我们很高兴地宣布,Flink Table Store 已经正式加入 Apache
> 孵化器独立孵化 [1] [2] [3]。新项目的名字是
> Apache
> Paimon(incubating),仍致力于打造一个支持高速数据摄入、流式数据订阅和高效实时分析的新一代流式湖仓平台。此外,新项目将支持更加丰富的生态,并建立一个充满活力和中立的开源社区。
>
>
> 在这里我们要感谢大家对 Flink Table Store 项目的大力支持和投入,并热烈欢迎大家加入新项目的开发和社区活动。Apache Flink
> 将继续作为 Paimon 支持的主力计算引擎之一,我们也相信 Flink 和 Paimon 社区将继续保持密切合作。
>
>
> Best Regards,
>
> Yu (on behalf of the Apache Flink PMC and Apache Paimon PPMC)
>
>
> 致礼,
>
> 李钰(谨代表 Apache Flink PMC 和 Apache Paimon PPMC)
>
>
> [1] https://paimon.apache.org/
>
> [2] https://github.com/apache/incubator-paimon
>
> [3] https://cwiki.apache.org/confluence/display/INCUBATOR/PaimonProposal
>


Re: 我上报的一个sql bug没人处理怎么办?

2023-03-21 文章 Shuo Cheng
Hi,

如果你知道问题出现在哪儿, 可以自己提个 PR 哦.

Sincerely,
Shuo

On Wed, Mar 22, 2023 at 11:23 AM Jeff  wrote:

> 复制执行我提供的两个sql就一定会复现!
> 不管哪个flink版本一定都会有这个问题,因为它们都是使用calcite 1.26.0。
> 这个问题是这个版本calcite引起的。
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2023-03-22 09:28:17,"Jeff"  写道:
> >bug地址:
> >https://issues.apache.org/jira/browse/FLINK-31375?filter=-2
> >
> >
> >bug详细内容:
> >the values of map are truncated by the CASE WHEN function.
> >// sql
> >create table test (a map) with ('connector'='print');
> >insert into test  select * from (values(case when true then
> map['test','123456789'] else map ['msg_code','0', 'msg_reason', 'abc']
> end));
> >
> >the result:
> >
> >+I[{test=123}]
> >
> >We hope the value of result is '123456789', but I get '123', the length
> is limited by 'abc'.
>


Re: 在计算Window Top-N时,Flink SQL 时间语义不生效

2023-02-24 文章 Shuo Cheng
更乱了哦...可以尝试加个附件或推到 github, 贴个链接

On Fri, Feb 24, 2023 at 4:59 PM wei_yuze  wrote:

>
> 刚才的邮件正文代码出现乱码,现在重新发送。-您好!我在运行Flink程序时遇到了一个问题,特来向各位大佬请教。程序目标:用FlinkSQL求窗口nbsp;Top-5,开一小时的窗口。数据源为Kafka,我分批向Kafka里传入数据。计算出的nbsp;Top-5结果,写入MySQL。问题:一小时窗口设置完全没生效,事件时间和处理时间两种时间语义都测试了。我每向Kafka里传入一批数据,MySQL都会看到五条新增的Top-5数据,可两批源数据之间的时间间隔并没有到一小时。问题代码初步定位:TUMBLE(TABLEwatermarkedTable,DESCRIPTOR(ts),INTERVAL1HOUR)完整源代码:nbsp;nbsp;nbsp;nbsp;finalStreamExecutionEnvironmentstreamExecutionEnvironment=StreamExecutionEnvironment.getExecutionEnvironment();nbsp;nbsp;nbsp;nbsp;//Createtableenvironmentnbsp;nbsp;nbsp;nbsp;StreamTableEnvironmentstreamTableEnvironment=StreamTableEnvironment.create(streamExecutionEnvironment);nbsp;nbsp;nbsp;nbsp;//接入Kafka数据源nbsp;nbsp;nbsp;nbsp;KafkaSourcestringkafkaSource=KafkaSourcenbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;.stringbuilder()nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;.setBootstrapServers(Config.KAFKA_BROKERS)nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;.setTopics(Config.KAFKA_TOPIC_EVENT)nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;.setGroupId(flink-consumer)nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;.setStartingOffsets(OffsetsInitializer.earliest())nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;.setValueOnlyDeserializer(newSimpleStringSchema())nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;.build();nbsp;nbsp;nbsp;nbsp;DataStreamSourcestringstringStream=streamExecutionEnvironmentnbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;.fromSource(nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;kafkaSource,nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;WatermarkStrategy.noWatermarks(),nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;Kafkastringsourcewithoutwatermarknbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;);nbsp;nbsp;nbsp;nbsp;//Deserializestringstreamnbsp;nbsp;nbsp;nbsp;SingleOutputStreamOperatoreventdeserializedStream=stringStreamnbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;.map(nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;newMapFunctionstring,event=(){nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;publicEventmap(StringjsonString)throwsException{nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;SimpleDateFormatsimpleDateFormat=newSimpleDateFormat(-MM-ddHH:mm:ss);nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;ObjectMapperobjectMapper=newObjectMapper();nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;objectMapper.setDateFormat(simpleDateFormat);nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,false);nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;objectMapper.enable(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT);nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;EventdeserializedObject=objectMapper.readValue(jsonString,Event.class);nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;returndeserializedObject;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;}nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;});nbsp;nbsp;nbsp;nbsp;SingleOutputStreamOperatoreventwatermarkedStream=deserializedStream.assignTimestampsAndWatermarks(nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;WatermarkStrategynbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;.eventforBoundedOutOfOrderness(Duration.ofSeconds(0L))//nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;.withTimestampAssigner((event,l)-gt;event.getTs().getTime())nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;.withTimestampAssigner(nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;newSerializableTimestampAssignerevent(){nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;@Override
> 

Re: Re: Re: [急] flink sql写数据到带主键表时能确保相同主键的记录都被同一个taskmanager处理吗?

2023-02-23 文章 Shuo Cheng
> 你说的这个参数我看了默认值不是auto吗?需要我显式地指定为force?

Sink upsert materialize would be applied in the following circumstances:
1. `TABLE_EXEC_SINK_UPSERT_MATERIALIZE` set to FORCE and sink's primary key
nonempty.
2. `TABLE_EXEC_SINK_UPSERT_MATERIALIZE` set to AUTO and sink's primary key
doesn't contain upsert keys of the input update stream.

Note: upsert materializing operator use state to resolve disorder problems
which may incur additional performance regression.

Best,
Shuo

On Fri, Feb 24, 2023 at 10:02 AM casel.chen  wrote:

> 你说的这个参数我看了默认值不是auto吗?需要我显式地指定为force?
>
>
> Because of the disorder of ChangeLog data caused by Shuffle in distributed
> system, the data received by Sink may not be the order of global upsert. So
> add upsert materialize operator before upsert sink. It receives the
> upstream changelog records and generate an upsert view for the downstream.
> By default, the materialize operator will be added when a distributed
> disorder occurs on unique keys. You can also choose no
> materialization(NONE) or force materialization(FORCE).
>
> Possible values:
> "NONE"
> "AUTO"
> "FORCE"
>
>
> public static final ConfigOption
> TABLE_EXEC_SINK_UPSERT_MATERIALIZE =
> key("table.exec.sink.upsert-materialize")
> .enumType(UpsertMaterialize.class)
> .defaultValue(UpsertMaterialize.AUTO)
> .withDescription(
> Description.builder()
> .text(
> "Because of the disorder of
> ChangeLog data caused by Shuffle in distributed system, "
> + "the data received
> by Sink may not be the order of global upsert. "
> + "So add upsert
> materialize operator before upsert sink. It receives the "
> + "upstream changelog
> records and generate an upsert view for the downstream.")
> .linebreak()
> .text(
> "By default, the materialize
> operator will be added when a distributed disorder "
> + "occurs on unique
> keys. You can also choose no materialization(NONE) "
> + "or force
> materialization(FORCE).")
> .build());
>
>
>
>
>
> 在 2023-02-22 15:34:27,"Shuo Cheng"  写道:
> >Hi,
> >
> >Re *"如何确保同一主键变更记录(上游接的是cdc数据源)的时间先后顺序性呢?",  *checking out
> >ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE for details
> about
> >solution of disordering problems in KeyBy shuffling.
> >
> >Best,
> >Shuo
> >
> >On Wed, Feb 22, 2023 at 10:23 AM casel.chen  wrote:
> >
> >>
> >>
> 如果像楼上所说[1]主动keyby将同一主键记录汇聚到同一个TM处理的话,Flink如何确保同一主键变更记录(上游接的是cdc数据源)的时间先后顺序性呢?
> >>
> >>
> >> 在 2023-02-20 09:50:50,"Shengkai Fang"  写道:
> >> >我理解如果sink 表上带有 pk,就会主动 keyby 的[1]。
> >> >
> >> >Best,
> >> >Shengkai
> >> >
> >> >[1]
> >> >
> >>
> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java#L188
> >> >
> >> >Shammon FY  于2023年2月20日周一 08:41写道:
> >> >
> >> >> Hi
> >> >>
> >> >> 如果join计算的关联字段里没有主键,在将join结果直接写入sink表时,像上面RS提到的,需要自己再增加一次shuffle操作
> >> >>
> >> >> Best,
> >> >> Shammon
> >> >>
> >> >>
> >> >> On Sun, Feb 19, 2023 at 1:43 PM RS  wrote:
> >> >>
> >> >> > Hi,
> >> >> > connector里面配置的主键控制是写入存储的,有些存储在写入数据的时候,能根据主键自动更新去重
> >> >> > 所以我感觉,你这里的应该是想在计算的时候shuffle(写入之前),你应该需要先执行一个 group by
> 主键,然后再执行insert
> >> into
> >> >> >
> >> >> >
> >> >> > Thanks
> >> >> >
> >> >> >
> >> >> >
> >> >> > 在 2023-02-17 15:56:51,"casel.chen"  写道:
> >> >> > >作业场景是kafka cdc数据源关联几张redis维表再和其他流表进行双流regular inner
> >> >> > join,最后将打宽表写入mongodb。使用的是flink 1.13.2 sql模式。开了debug日志。
> >> >> > >测试下来发现相同主键的记录在不同的taskmanager上打了日志(我是在Sink
> >> >> > Function的invoke方法打的日志),该行为导致最终结果表数据不正确。
> >> >> > >
> >> >> > >
> >> >> > >请问:
> >> >> > >flink sql写数据到带主键表时能确保相同主键的记录都被同一个taskmanager处理吗?
> >> >> > >是因为flink版本旧不支持吗?从flink哪个版本开始支持的呢?
> >> >> > >我理解flink
> >> >> >
> >> >>
> >>
> sql结果表上定义主键的目的就是期望按主键进行shuffle,确保相同主键的数据被同一个taskmanager处理以确保正确的变更顺序,不知道我理解得对不对。
> >> >> > >
> >> >> >
> >> >>
> >>
>


Re: Re: [急] flink sql写数据到带主键表时能确保相同主键的记录都被同一个taskmanager处理吗?

2023-02-21 文章 Shuo Cheng
Hi,

Re *"如何确保同一主键变更记录(上游接的是cdc数据源)的时间先后顺序性呢?",  *checking out
ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE for details about
solution of disordering problems in KeyBy shuffling.

Best,
Shuo

On Wed, Feb 22, 2023 at 10:23 AM casel.chen  wrote:

>
> 如果像楼上所说[1]主动keyby将同一主键记录汇聚到同一个TM处理的话,Flink如何确保同一主键变更记录(上游接的是cdc数据源)的时间先后顺序性呢?
>
>
> 在 2023-02-20 09:50:50,"Shengkai Fang"  写道:
> >我理解如果sink 表上带有 pk,就会主动 keyby 的[1]。
> >
> >Best,
> >Shengkai
> >
> >[1]
> >
> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java#L188
> >
> >Shammon FY  于2023年2月20日周一 08:41写道:
> >
> >> Hi
> >>
> >> 如果join计算的关联字段里没有主键,在将join结果直接写入sink表时,像上面RS提到的,需要自己再增加一次shuffle操作
> >>
> >> Best,
> >> Shammon
> >>
> >>
> >> On Sun, Feb 19, 2023 at 1:43 PM RS  wrote:
> >>
> >> > Hi,
> >> > connector里面配置的主键控制是写入存储的,有些存储在写入数据的时候,能根据主键自动更新去重
> >> > 所以我感觉,你这里的应该是想在计算的时候shuffle(写入之前),你应该需要先执行一个 group by 主键,然后再执行insert
> into
> >> >
> >> >
> >> > Thanks
> >> >
> >> >
> >> >
> >> > 在 2023-02-17 15:56:51,"casel.chen"  写道:
> >> > >作业场景是kafka cdc数据源关联几张redis维表再和其他流表进行双流regular inner
> >> > join,最后将打宽表写入mongodb。使用的是flink 1.13.2 sql模式。开了debug日志。
> >> > >测试下来发现相同主键的记录在不同的taskmanager上打了日志(我是在Sink
> >> > Function的invoke方法打的日志),该行为导致最终结果表数据不正确。
> >> > >
> >> > >
> >> > >请问:
> >> > >flink sql写数据到带主键表时能确保相同主键的记录都被同一个taskmanager处理吗?
> >> > >是因为flink版本旧不支持吗?从flink哪个版本开始支持的呢?
> >> > >我理解flink
> >> >
> >>
> sql结果表上定义主键的目的就是期望按主键进行shuffle,确保相同主键的数据被同一个taskmanager处理以确保正确的变更顺序,不知道我理解得对不对。
> >> > >
> >> >
> >>
>


Re: flink sql 类型转化遇到转化失败时候可以跳过这条数据嘛

2022-11-20 文章 Shuo Cheng
可以了解下 TRY_CAST 是不是能满足需求.

https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/systemfunctions/#type-conversion-functions

On Sat, Nov 19, 2022 at 4:31 PM kcz <573693...@qq.com.invalid> wrote:

> flink-1.16.0
> flink sql 类型转化遇到转化失败时候可以跳过这条数据嘛?
> 不想任务直接就挂了,数据可以丢掉。
> flink sql 类型转化遇到转化失败时候可以跳过这条数据嘛
>
> kcz
> 573693...@qq.com
>
>
>
> 


Re: Flink SQL 中同时写入多个 sink 时,是否能够保证先后次序

2022-10-12 文章 Shuo Cheng
Flink SQL 自身机制无法保证同一个作业多个 sink 的写入次序。 是否可以考虑从业务逻辑上动手脚,比如写入消息队列 sink 前加个 udf
filter, udf 查询 database,满足条件才写入消息队列,当然这种方式对性能可能有影响。

On Wed, Oct 12, 2022 at 2:41 PM Zhiwen Sun  wrote:

> hi all:
>
> 我们有个场景,需要 Flink SQL 同时写入消息和 database, 后续实时任务消费消息,再次读取 database, 如果消息先于
> database 写入,这就可能导致读取的数据不正确。
>
> 是否有办法保证 database 写入后,再发送消息?
>
> Zhiwen Sun
>


Re: flink-1.14.4 提示内置函数不存在

2022-09-04 文章 Shuo Cheng
CURRENT_DATE 属于 niladic function (无参函数), 不需要加 '()'

On Mon, Sep 5, 2022 at 11:51 AM kcz <573693...@qq.com.invalid> wrote:

> select concat('1','2'),CURRENT_DATE();
> No match found for function signature CURRENT_DATE()。
> 是因为我哪里操作错了吗???
> concat是可以运行的。


Re: 基于savepoint重启作业无法保证端到端一致性

2022-09-02 文章 Shuo Cheng
设计上是支持的. 建议贴上代码, 这样大家比较好判断问题所在.

On Fri, Aug 26, 2022 at 4:08 PM 杨扬  wrote:

> 各位好!
> 目前有一flink作业,source与sink均为kafka。
> 在换版时(未修改任何代码)基于官网文档命令,创建savepoint并停止作业;而后基于之前创建的savepoint启动作业。
> 现在发现如此操作无法实现启停前后数据无缝对接,会出现一定的数据重复。
>
> 想请教这个问题是savepoint设计时本身就无法保证启停前后端到端一致性,还是我们哪里操作不当呢?
>
>
>
>
>
>
>


Re: 撤回流如何进行窗口分组聚合

2021-09-28 文章 Shuo Cheng
这个在 1.14 已经支持了, 详见 FLINK-20487

On 9/28/21, Liu Join  wrote:
> 我将数据流进行去重后,无法进行窗口聚合操作,一直报错GroupWindowAggregate doesn't support consuming
> update and delete changes which is produced by node Deduplicate
>


Re: flink 1.13.2 使用avg函数对int字段求平均值,输出类型为int类型,而不是浮点型

2021-09-27 文章 Shuo Cheng
by-design 的行为, avg 就是 sum / count, flink 目前行为是根据入参类型来推断返回类型 (与 `sum` 以及 `/`
保持一致), 想要保持高精度,可以考虑把入参 cast 成 double.

On Mon, Sep 27, 2021 at 2:30 PM Asahi Lee <978466...@qq.com.invalid> wrote:

> hi!   我使用flink 1.13.2版本,在对 int 类型的字段通过avg函数求平均值时,其返回值类型为
> int ,而不是 double,decimal等浮点类型,导致计算值的精度丢失,请问这是bug吗?


Re: flink sql streaming情况如何解决数据倾斜问题

2021-09-07 文章 Shuo Cheng
可以参考下针对 AGG 的调优指南
https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/tuning/

On Wed, Sep 8, 2021 at 11:05 AM yidan zhao  wrote:

> 我们流量大概4w的qps,如何根据key1+key2进行pv统计(SQL任务比较简单)。
>
> 但是key2的分布比较极端,有些可能90%集中的。
>
> Shuo Cheng  于2021年9月7日周二 下午7:30写道:
>
> > 最好具体描述下什么场景的倾斜, sql 上也有一些解倾斜的手段
> >
> > On 9/7/21, yidan zhao  wrote:
> > > 如题,目前非sql情况本身实现灵活,有很多方案。
> > > 但是SQL情况下,倾斜严重,同时无解。有没有小伙伴解决过类似问题。
> > >
> > > 注意:sql,流任务,数据倾斜。
> > >
> >
>


Re: flink sql streaming情况如何解决数据倾斜问题

2021-09-07 文章 Shuo Cheng
最好具体描述下什么场景的倾斜, sql 上也有一些解倾斜的手段

On 9/7/21, yidan zhao  wrote:
> 如题,目前非sql情况本身实现灵活,有很多方案。
> 但是SQL情况下,倾斜严重,同时无解。有没有小伙伴解决过类似问题。
>
> 注意:sql,流任务,数据倾斜。
>


Re: 未生成水位线

2021-09-02 文章 Shuo Cheng
Hello, 注意到你设置了 `table.exec.source.idle-timeout`, 需要注意的是这个参数是 table 层的参数,
需要确认你的设置方式是否生效.

On Fri, Jan 29, 2021 at 5:41 PM 沉醉寒風 <1039601...@qq.com> wrote:

> 有的
>
>
>
> kafka 是3个分区, 但是只有一个分区有数据, flink是3个并行度
>
> -- 原始邮件 --
> *发件人:* "user-zh" ;
> *发送时间:* 2021年1月29日(星期五) 下午5:30
> *收件人:* "user-zh";
> *主题:* Re: 未生成水位线
>
> 看一下 WaterMarkAssigner节点 是否有 数据流入
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: 是否可以自定义trigger实现event time window的分散触发

2021-08-31 文章 Shuo Cheng
这样做是要达到设么目的呢? 目前的触发机制以及 early/late fire 满足不了需求么?

On 8/31/21, yidan zhao  wrote:
> 如题,我目前计划自定义event time trigger实现分散触发。
> 比如0-5的窗口分散到6-11分触发, 从6开始是因为本身有个1min的乱序处理。
> 同时配合将allowedlateness设置为5min,这样避免窗口状态在触发之前被clean。
>
> 不知道想法是否OK呢?
>


Re: 基于flink 1.12 如何通过Flink SQL 实现 多sink端 在同一个事务中

2021-08-29 文章 Shuo Cheng
你好, 你说的这种控制写入的方式在同一个 Flink SQL job 里是无法实现的. 控制数据是否写入某个
Sink,可以看看是否从逻辑上能在 Sink 前加一个 Filter,从而达到过滤目的;如果 kafka sink 跟 MySQL
表是一种类似级联的关系, 可以考虑先写入 MySQL, 然后另起一个 Job 用 CDC 方式读 MySQL changelog 再写入
Kafka sink.

On 8/26/21, jie han  wrote:
> HI:
> 可以尝试下使用flink cdc 的方式写入到第二个kafka里面呀
>
> 悟空  于2021年8月26日周四 下午1:54写道:
>
>> 我目前用的是flink-connector-kafka_2.11和flink-connector-jdbc_2.11,
>> 测试时,我把任务启动好之后,把mysql 中的目标表删除 或 删除必要字段,
>> 之后发送一条kafka数据,会报java.sql.BatchUpdateException 异常,然后重试3次。
>> 但是接着sink Kafka 是成功的,Kafka端 我开启了'sink.semantic' = 'exactly-once',
>> 同时下游consumer 使用--isolation-level read_committed
>> 读取,依旧能成功读取到数据,说明sink
>> db 失败,但是sink kafka成功,同时flink 本身任务不会挂掉。
>>
>>
>>
>>
>> --原始邮件--
>> 发件人:
>>   "user-zh"
>> <
>> tsreape...@gmail.com;
>> 发送时间:2021年8月26日(星期四) 中午1:25
>> 收件人:"user-zh">
>> 主题:Re: 基于flink 1.12 如何通过Flink SQL 实现 多sink端 在同一个事务中
>>
>>
>>
>> Hi!
>>
>> 如果 statement set 已经包含了多个 insert 语句,那么写入 kafka 表和写入 db 应该是在同一个作业里进行的。如果写入
>> db
>> 失败,那么产生的 exception 应该会让作业失败才对。这里 db 写入失败但 kafka 依旧写入是什么样的现象?
>>
>> 另外其实可以考虑分成两个作业,第一个作业将数据写入 db,第二个作业从 db 读出数据写入 kafka。关于捕获 db 数据的变化,可以看一下
>> Flink CDC connector[1]
>>
>> [1] https://github.com/ververica/flink-cdc-connectors
>>
>> 悟空 >
>>  能否详细说下呢,statement set[1] 是什么意思, 我是通过StatementSet.addInsertSql 将多个sql
>>  加入的,然后执行execute()方法
>> 
>> 
>> 
>> 
>>  --nbsp;原始邮件nbsp;--
>>  发件人:
>> 
>> "user-zh"
>> 
>> <
>>  fskm...@gmail.comgt;;
>>  发送时间:nbsp;2021年8月26日(星期四) 中午12:36
>>  收件人:nbsp;"user-zh"> 
>>  主题:nbsp;Re: 基于flink 1.12 如何通过Flink SQL 实现 多sink端 在同一个事务中
>> 
>> 
>> 
>>  说的是 statement set [1] 吗 ?
>> 
>>  [1]
>> 
>> 
>> https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sqlclient/#execute-a-set-of-sql-statements
>> 
>> 
>> ;
>>  悟空 > 
>>  gt; hi all:amp;nbsp;
>>  gt; amp;nbsp; amp;nbsp; 我目前基于flink 1.12 sql 来开发功能,
>> 目前遇到一个问题, 我现在想实现
>>  在一个事务里 先将kafka
>>  gt; 源的数据写入到一张msyql 表中, 写入成功后再写入一张kafka的表里。如果写入db失败,则不写入kafka 。
>>  gt; amp;nbsp; amp;nbsp;语句类似这种:
>>  gt; amp;nbsp; amp;nbsp;insert into
>> db_table_sinkamp;nbsp;select *
>>  fromamp;nbsp;
>>  gt; kafka_source_table;
>>  gt; amp;nbsp; amp;nbsp;insert into kafka_table_sink
>> select * from
>>  kafka_source_table;
>>  gt;
>>  gt;
>>  gt; amp;nbsp; 请问flink SQL 有实现方式吗?
>> 目前经过测试,发现如果db写入失败,但是kafka依旧写入,同时flink
>>  程序没有挂掉。
>


Re: 窗口函数使用的时间类型

2021-06-01 文章 Shuo Cheng
SQL 流作业 window 可定义在两种时间属性类型字段上:
1) event time: ddl 中需要给时间类型字段 (timestamp) 定义 watermark
2) process time: 使用 PROCTIME()

On 6/1/21, guoyb <861277...@qq.com> wrote:
> 是的。
>
>
> 大神能否指条明路解决这问题。
>
>
>
> ---原始邮件---
> 发件人: "MOBIN"<18814118...@163.com
> 发送时间: 2021年6月1日(周二) 晚上7:09
> 收件人: "user-zh@flink.apache.org" 主题: 回复:窗口函数使用的时间类型
>
>
> 是不是报的类似下面的错?
> Window aggregate can only be defined over a time attribute column, but
> TIMESTAMP(3) encountered
>
>
> | |
> MOBIN
> |
> 签名由网易邮箱大师定制
>
>
> 在2021年06月1日 19:00,guoyb<861277...@qq.com 写道:
> tumble() 开窗,需要的事件时间到底需要什么时间类型?一直报时间不对
> timestamp(3)
> datetime
> time
> 都试过了,没有一个对的。


Re: 流与流 left join

2021-05-30 文章 Shuo Cheng
state ttl 只能是全局算子维度, table.exec.state.ttl



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink 1.12.2 编译报错

2021-05-27 文章 Shuo Cheng
Hi, org.hamcrest 是 junit 的依赖

On Fri, May 28, 2021 at 10:28 AM Zhiwen Sun  wrote:

> 才编译到 Test utils : Junit 模块,就报错了
>
> maven 版本: 3.2.5
> jdk 版本:1.8.0_251
> flink 版本: flink 1.12.2
> 执行的命令:mvn clean install -DskipTests -Dfast
>
> 错误信息:
>
> [ERROR] COMPILATION ERROR :
> [INFO] -
> [ERROR]
>
> /data/flink-release-1.12.2/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java:[38,27]
> package org.hamcrest does not exist
> [ERROR]
>
> /data/flink-release-1.12.2/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java:[38,1]
> static import only from classes and interfaces
> [ERROR]
>
> /data/flink-release-1.12.2/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java:[39,27]
> package org.hamcrest does not exist
>
> [INFO] Reactor Summary:
> [INFO]
> [INFO] Flink : Tools : Force Shading .. SUCCESS [
>  1.042 s]
> [INFO] Flink :  SUCCESS [
>  1.404 s]
> [INFO] Flink : Annotations  SUCCESS [
>  0.735 s]
> [INFO] Flink : Test utils : ... SUCCESS [
>  0.042 s]
> [INFO] Flink : Test utils : Junit . FAILURE [
>  0.283 s]
>
>
> 看起来是缺少 org.hamcrest  相关依赖
> 我看 flink-test-utils-parent/pom.xml 和 flink-test-utils-junit/pom.xml
> 的确没加 org.hamcrest 相关依赖, 不知道这个是怎么工作的。
>
> 请问大家下,原因是什么呢?
>
>
> Zhiwen Sun
>


Re: 流与流 left join

2021-05-27 文章 Shuo Cheng
我理解双流 Join 就能满足需求吧, 缺点是数据全量放 state,只能靠 state ttl 来清理数据

On 5/27/21, chenchencc <1353637...@qq.com> wrote:
> 想问下cep sql批处理能使用吗?想流批一体的。
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


[ANNOUNCE] Dian Fu becomes a Flink committer

2020-01-16 文章 Shuo Cheng
Congratulations!  Dian Fu

> Xingbo Wei Zhong  于2020年1月16日周四 下午6:13写道:  jincheng sun  
> 于2020年1月16日周四 下午5:58写道:


Re: 回复: 回复: 回复: 回复:如何用SQL表达对设备离在线监控

2019-12-05 文章 Shuo Cheng
用 udagg 应该能比较完美的解决你的问题 ^.^

On 12/6/19, Djeng Lee  wrote:
> 存在查询
>
> 在 2019/12/5 下午4:06,“Yuan,Youjun” 写入:
>
> Count=0的窗口如何能得到呢?没有数据就没有产出。
> 然而可以同rows
> over窗口,将两个前后窗口的sum-当前的count,可以间接得到两个窗口的count是否相等。同时辅以前后窗口时间的差,来辅助判断。
>
> 最终在自定义函数last_value_str/first_value_str的帮助下,勉强得以实现(尚不完美,可能出现连续的ONLINE的输出)
> 下面是我的SQL,仅供参考:
>
> INSERT INTO mysink
> SELECT userid, lastts, case when preCnt <= 0 OR tsdiff > 10 THEN
> 'ONLINE' ELSE 'offline' END AS status
> FROM (
>   SELECT curCnt, preCnt, lastts, firstts, userid, yeardiff * 31536000 +
> monthdiff * 2678400 + daydiff * 86400 + hourdiff * 3600 + mindiff * 60 +
> seconddiff AS tsdiff
>   FROM (
>   SELECT curCnt, preCnt,
>   cast(substring(lastts, 1, 4) as bigint) - 
> cast(substring(firstts, 1,
> 4) as bigint) as yeardiff,
>   cast(substring(lastts, 6, 2) as bigint) - 
> cast(substring(firstts, 6,
> 2) as bigint) as monthdiff,
>   cast(substring(lastts, 9, 2) as bigint) - 
> cast(substring(firstts, 9,
> 2) as bigint) as daydiff,
>   cast(substring(lastts, 12, 2) as bigint) - 
> cast(substring(firstts,
> 12, 2) as bigint) as hourdiff,
>   cast(substring(lastts, 15, 2) as bigint) - 
> cast(substring(firstts,
> 15, 2) as bigint) as mindiff,
>   cast(substring(lastts, 18, 2) as bigint) - 
> cast(substring(firstts,
> 18, 2) as bigint) as seconddiff,
>   lastts, firstts, userid
>   FROM (
>   SELECT userid, cnt AS curCnt, sum(cnt) OVER w - cnt as 
> preCnt,
> last_value_str(ts0) OVER w as lastts, first_value_str(ts0) OVER w as firstts
>
>   FROM (
>   SELECT HOP_PROCTIME(rowtime, interval '5' 
> second, interval '10'
> second) AS rowtime, count(*) as cnt, userid, last_value_str(cast(rowtime AS
> varchar)) AS ts0
>   FROM mysrc
>   GROUP BY userid, hop(rowtime, interval '5' 
> second, interval '10'
> second)
>   )
>   WINDOW w as (PARTITION BY userid ORDER BY rowtime ROWS 
> BETWEEN 1
> PRECEDING AND CURRENT ROW)
>   )
>   )
>   WHERE (preCnt <= 0 OR yeardiff * 31536000 + monthdiff * 2678400 +
> daydiff * 86400 + hourdiff * 3600 + mindiff * 60 + seconddiff > 10) OR
> (curCnt = preCnt AND lastts = lastts)
> )
>
> -邮件原件-
> 发件人: 1193216154 <1193216...@qq.com>
> 发送时间: Thursday, December 5, 2019 2:43 PM
> 收件人: user-zh 
> 主题: 回复: 回复: 回复:如何用SQL表达对设备离在线监控
>
> 可以考虑用flink cep,应该可以解决你的问题。
>
>
> --原始邮件--
> 发件人:"Djeng Lee" 发送时间:2019年12月5日(星期四) 下午2:40
> 收件人:"user-zh@flink.apache.org"
> 主题:Re: 回复: 回复:如何用SQL表达对设备离在线监控
>
>
>
> 上线时间,前n窗口count == 0 , 后n窗口count  1。说明是上线。由此得出上线时间.
> 离线时间,前n 窗口count=1, 后n窗口count==0,说明下线,由此可得下线时间。
> 前n后n都1 作为心跳维持。
>
>
>
> 在 2019/12/5 下午2:06,“Yuan,Youjun”
>  谢谢你的回复。
> 
> 这种方案比较有意思,只是还不能区分设备第一次心跳产生的count=1的消息(上线),和设备最后一次心跳产生的count=1的消息(下线)
> 
>  -邮件原件-
>  发件人: 1193216154 <1193216...@qq.com
>  发送时间: Wednesday, December 4, 2019 9:39 PM
>  收件人: user-zh   主题: 回复:如何用SQL表达对设备离在线监控
> 
>  设定一个滑动窗口,窗口大小大于等于2n,滑动间隔大于等于n,若一次窗口结算,count
> 大于等于2,则在线,否则下线
> 
>  ---原始邮件---
>  发件人: "Yuan,Youjun"  发送时间: 2019年12月4日(周三) 晚上6:49
>  收件人:
> "user-zh@flink.apache.org"  主题: 如何用SQL表达对设备离在线监控
> 
> 
>  Hi all,
> 
> 
> 假设我们有很多设备,设备正常工作期间会定时发送心跳到服务器。如果某个设备在超过N分钟的时间内,没有发送任何心跳到服务器,服务器会认为设备已经离线。直到下一次心跳,才判定设备为在线。
> 
> 需求:在判定设备离线时,产出一条设备离线消息;在设备经过一次离线后,第一次心跳时,产出一条设备上线的消息;
>  假设设备上报的消息包含当前时间(ts)和设备id(deviceid):
>  1575456144,dev1
>  1575456146,dev2
>  1575456147,dev1
>  ….
> 
>  产出的离在线消息分别格式如下(第一列为设备离在线时间):
>  1575456158,dev1,offline 
> 1575456169,dev2,online  
> 能否用一条SQL来定义这个作业呢?
> 
>  谢谢!
>  袁尤军
> 
>
>


Re: 如何优化flink内存?

2019-09-04 文章 Shuo Cheng
如果是使用 datastream api 的话,滑动窗口对于每条数据都会在 state 中存 size / slide
份,像你这种大小设置,肯定会导致内存的大量消耗.

On Wed, Sep 4, 2019 at 8:07 PM Yifei Qi  wrote:

> 大家好:
>
>
>
> 不知道大家在使用flink时遇到过内存消耗过大的问题么?
>
>
>
> 我们最近在用flink算一些实时的统计数据, 但是内存消耗很大, 不知道有没有人知道如何优化?
>
>
>
> 具体情况是这样的:
>
> 准备的测试数据模拟一天时间内3万个用户的5万条数据. 原始数据一共是100M.
>
> 按照用户进行分组.
>
> 计算两个滑动窗口任务:一个是近1小时, 每5秒滑动一次的窗口. 一个是近24小时, 每1分钟滑动一次的窗口.
>
>
>
>
>
> flink运行在3个节点后, 内存合计就用了5G.
>
>
>
>
>
> flink如此消耗内存, 不知道是它本来就这么吃内存, 还是我使用的有问题.
>
>
>
>
>
> 顺祝商祺
>
>
> --
>
>
> Qi Yifei
> [image: https://]about.me/qyf404
> <
> https://about.me/qyf404?promo=email_sig_source=product_medium=email_sig_campaign=gmail_api
> >
>