Re: checkpoint失败导致watermark不更新问题

2020-10-09 文章 shizk233
hi,这种情况似乎像是反压造成的,数据流反压会导致算子来不及处理checkpoint事件,watermark消息也会因为反压无法发送到下游算子。 建议观察下反压的情况[1],如果是这样的话,再针对反压源头进行优化处理。 [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/monitoring/back_pressure.html restart 于2020年10月9日周五 上午11:48写道: > 大家好,请假一个问题: >

Re: flink 1.11.2 Table sql聚合后设置并行度为2及以上,数据无法输出

2020-09-28 文章 shizk233
flink sql似乎不能设置rebalance,在Data Stream API可以设。 一种思路是拆分sql逻辑,在Data Stream API上创建kafka source后再reblance成一张新表,再做后续dml sql。 另一种思路就是kafka topic增加一下分区 Asahi Lee <978466...@qq.com> 于2020年9月28日周一 下午1:56写道: > 你好! 使用flink > SQL,如何设置rebalance呢?--原始邮件-- >

Re: 当kafka有大量数据积压并且flink冷启动,flink端读取kafka有没有限流的参数?

2020-09-28 文章 shizk233
应该是没有的,可以自己改造下kafka source来实现。 话说flink自动反压流控不能满足场景需要吗 me 于2020年9月27日周日 下午5:45写道: > flink版本1.11 > flink连接kafka使用的是 flink addSource特性 > > > 原始邮件 > 发件人: me > 收件人: user-zh > 发送时间: 2020年9月27日(周日) 17:22 > 主题: 当kafka有大量数据积压并且flink冷启动,flink端读取kafka有没有限流的参数? > > >

Re: flink获取latencymarker有什么好的方法

2020-09-26 文章 shizk233
如果是Data Stream API的话,可以考虑在目标算子上使用自定义metrics来展示数据延时情况 郭士榕 于2020年9月26日周六 下午9:15写道: > Hi,All > > > 想问下大家如果要展示Flink任务的当前延时情况,有什么比较好的方法吗?用LatencyMarker是否可以,用API/JMX层面来获取的histogram能否汇总成一个数字?

Re: 请教二阶段提交问题

2020-09-26 文章 shizk233
需要搭配事务性存储机制来使用,能够保证预提交成功的数据能最终被commit成功。 详情可以参考孙金城老师关于这一部分的讲解和代码实现[1] [1]https://www.bilibili.com/video/BV1yk4y1z7Lr?p=33 高亮 于2020年9月25日周五 上午11:14写道: > 各位大佬,请教一下二阶段提交的问题,preCommit预提交失败回滚已经很清楚了,就是在commit阶段提交如果失败会怎么,比较迷惑。 > > > >

Re: 从Savepoint/Checkpoint恢复时 keyedstate中部分数据无法被读取

2020-09-10 文章 shizk233
flink用的自己的序列化机制。从chk恢复的时候,在open方法里会进行状态数据的注入。 按我的理解,该transient标记有没有都可以从chk恢复,但一般加入transient可以明确只有open方法中的数据注入这一种方法。 至于不加上transient是否可能产生其他影响,就不太清楚了。 范超 于2020年9月10日周四 上午9:35写道: > Transient 都不参与序列化了,怎么可能从checkopont里恢复? > > -邮件原件- > 发件人: Yun Tang [mailto:myas...@live.com] > 发送时间:

Re: 回复:flink1.9.1采用Prometheus Pushgateway监控,当任务被kill掉,但grafana还是可以监控到数据

2020-09-01 文章 shizk233
Hi Xiao, 我这边实践过程中发现,该参数只能删除jobmanager对应的metrics group,不能删除tm的。 我们开启了randomJobNameSuffix,该参数会让JM和TM的metrics信息分属不同metrics group。 感觉这可能是一个bug? xiao cai 于2020年9月1日周二 下午4:57写道: > Hi: > 可以试试在flink-conf.yaml中添加: > metrics.reporter.promgateway.deleteOnShutdown: true > > > Best, > Xiao > 原始邮件 > 发件人:

Re: TM SLOT资源共享

2020-09-01 文章 shizk233
你说的资源共享是指slot sharing吗,同一个job的task默认都在一个slot group中,都是可以共享slot的。 但同一个task的多个并发实例不能共享slot,多个实例需要分配在不同的slot中。 另外,关于“slot中存在资源共享时,1个slot会有多个thread”。如果是指数据处理的thread,那我理解应该是1个slot只有1个thread。 liangji 于2020年9月1日周二 下午4:45写道: > >

Re: 关于sink失败 不消费kafka消息的处理

2020-08-27 文章 shizk233
失败,而标记这个checkpoint的快照整体失败。 > > 从而重启消费会从source的1开始重新消费 > > > > > > -邮件原件- > > 发件人: Benchao Li [mailto:libenc...@apache.org] > > 发送时间: 2020年8月27日 星期四 10:06 > > 收件人: user-zh > > 主题: Re: 关于sink失败 不消费kafka消息的处理 > > > > Hi Eleanore,

Re: Source 定时执行sql,只执行一次就close了source

2020-08-27 文章 shizk233
不太清楚你定时读mysql是需要做什么,如果是维表join的话考虑temporal table join[1],通过设置ttl时间和数量来更新缓存[2] [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/streaming/temporal_tables.html [2]

Re: 关于sink失败 不消费kafka消息的处理

2020-08-26 文章 shizk233
Hi Eleanore,这个问题我可以提供一点理解作为参考 1.chk与at least once checkpoint机制的思想就是做全局一致性的快照,失败恢复时数据的消费位点会回滚到上一次chk n的进度, 然后进行数据重放,这样就保证了数据不会缺失,至少被消费一次。 2. sink2PC 在chk机制下,数据重放时一般sink端的数据不能回滚,就会有重复数据。如果是upsert sink那仍然是一致的, 否则需要通过2PC的预提交来将chk n+1成功前的数据写到临时存储,等chk n+1完成再真正写入的物理存储。如果 在chk

Re: ProcessWindowFunction为何在clear方法中无法清理状态-v1.10.1

2020-08-25 文章 shizk233
按我的理解,参考aggregate(AggregateFunction aggFunction, ProcessWindowFunction windowFunction)方法, 窗口中的状态数据是存在某个聚合函数里的,processWindowFunction只是处理窗口的结果,需要通过context获取对应的窗口state来做清理。 x <35907...@qq.com> 于2020年8月25日周二 下午6:25写道: > >

Re: flink on yarn默认GC的问题

2020-08-24 文章 shizk233
Song > > > > On Mon, Aug 24, 2020 at 5:26 PM shizk233 > wrote: > > > Hi all, > > > > 请教一下,flink自从1.10开始默认GC就是G1了,在taskmanager.sh脚本中也能看到。 > > 在*默认设置*下,能观察到本地flink使用的G1,但on > yarn运行时却发现使用的是PS,想请教下这是为什么?是yarn会对应用有一些默认设置吗? > > > > 我搜索了一些相关资料,但仍然没有

flink on yarn默认GC的问题

2020-08-24 文章 shizk233
Hi all, 请教一下,flink自从1.10开始默认GC就是G1了,在taskmanager.sh脚本中也能看到。 在*默认设置*下,能观察到本地flink使用的G1,但on yarn运行时却发现使用的是PS,想请教下这是为什么?是yarn会对应用有一些默认设置吗? 我搜索了一些相关资料,但仍然没有搞清楚这是怎么回事,希望有了解的朋友帮忙解答下。感谢! 备注:我可以通过在flink-conf.yaml中设置env.java.opts: -XX:+UseG1GC来使flink on yarn也使用G1。

Re: state序列化问题

2020-08-21 文章 shizk233
ob/master/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java > > > 祝好 > 唐云 > > ____ > From: shizk233 > Sent: Friday, August 21, 2020 10:51 > To: user-zh@flink.apache.org > Subject: Re: state序列化问题 > >

Re: state序列化问题

2020-08-20 文章 shizk233
ListState的格式应该是 > ListState, 而不是 > ListState>,后者表示有一个list,list中的每一个元素均是一个list > > ListState 本身并不属于java的collection,所以不存在ArrayList 与 LinkedList的区别。 > > 祝好 > 唐云 > ____ > From: shizk233 > Sent: Thursday, August 20, 2020 18:00 > To: user-zh@flink.apa

Re: KeyedCoProcessFunction中的状态数据是否存在资源竞争的可能

2020-08-20 文章 shizk233
理解了!感谢! Benchao Li 于2020年8月20日周四 下午6:00写道: > 不同的key应该用的是同一个state的实例,但是state下面会处理不同的key对应的state,也就是key对于用户来说是透明的。 > 比如你用一个MapState,那就是约等于每个key都有一个Map实例,不同key之间是独立的。 > > shizk233 于2020年8月20日周四 下午5:03写道: > > > 谢谢大佬解答。 > > 想再问一下,在并发1的情况下,一个算子里的MapState应该只有一个实例, > >

state序列化问题

2020-08-20 文章 shizk233
Hi all, 请教一下,State应该是通过StateDescriptor提取的类型信息来序列化/反序列化, 那么如果声明为接口类型,如ListState>,但实际存入的是ArrayList/LinkedList, 会对类型信息提取产生不良影响吗? 按我的理解,ArrayList和LinkedList在序列化时的bytes组成结构应该是不太一样的。 但是都可以作为List>来声明。 请求野生的大佬支援一下!

Re: KeyedCoProcessFunction中的状态数据是否存在资源竞争的可能

2020-08-20 文章 shizk233
"user-zh" > < > libenc...@apache.org; > 发送时间:2020年8月20日(星期四) 下午4:39 > 收件人:"user-zh" > 主题:Re: KeyedCoProcessFunction中的状态数据是否存在资源竞争的可能 > > > > Hi, > > 问题12 都不存在多线程的问题。Flink底层来保证这些方法都是在同一

Re: KeyedCoProcessFunction中的状态数据是否存在资源竞争的可能

2020-08-20 文章 shizk233
谢谢大佬解答。 想再问一下,在并发1的情况下,一个算子里的MapState应该只有一个实例, 那么数据流上的不同key(不是map里的hash key)是怎么保证只获取到对应的那部分map state数据呢? 按我的理解,按key分流后,每个子流应该是只有自己的state,但从算子实例考虑,好像只有1个state实例存在。 Benchao Li 于2020年8月20日周四 下午4:40写道: > Hi, > > 问题1&2 都不存在多线程的问题。Flink底层来保证这些方法都是在同一个线程串行执行的。 > > shizk233 于20

KeyedCoProcessFunction中的状态数据是否存在资源竞争的可能

2020-08-20 文章 shizk233
Hi all, 请教一下,KeyedCoProcessFunction比较特殊,有两个输入,对应两个ProcessElement方法。 问题1: 如果在这两个Process方法中都对同一个MapState进行修改,是否会存在资源竞争的关系? 还是这两个方法是顺序执行的? 问题2: 虽然有不同的key,但函数只有一个实例,其中的MapState应该也是一个实例,那么不同key下的 Process过程是并发执行的还是顺序执行的,会竞争MapState资源吗?

Re: task传输数据时反序列化失败

2020-08-19 文章 shizk233
Congxian Qiu 于2020年8月19日周三 下午1:58写道: > Hi > 从栈看应该是 deserialize 的时候出错了,另外 kryo 可以,Pojo 不行,能否检查下,是否满足 POJO > 的一些要求[1]呢? > > [1] > > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/state/schema_evolution.html#pojo-types > Best, > Congxian > > &

task传输数据时反序列化失败

2020-08-18 文章 shizk233
Hi all, 请教一下反序列化的问题,我有一个KeyedCoProcessFunction,输入是log流和rule流。 数据流如下: logSource .connect(ruleSource) .keyby(...) .process(My KeyedCoProcessFunction<>) .keyby(...) .print() 其中CoProcess函数中有两个MapState分别做log缓存和rule缓存。 结构为Map> logState,Map> ruleState. T在实例化函数时确定,为MyLog类型。

Re: flink sql 数据异常导致任务失败

2020-08-18 文章 shizk233
考虑修改一下json解析的逻辑来处理异常数据? 赵一旦 于2020年8月18日周二 上午11:59写道: > 有没有小伙伴生产中有类似问题呢,都怎么解决的呢? > 我指通过FlinkSQL方式,结合datastream api方式倒是好解决,只需要把容易出错的地方换成datastream > api,然后捕获所有异常即可。 > > 赵一旦 于2020年8月17日周一 下午7:15写道: > > > kafka source,一条异常数据就会导致任务失败,这种问题怎么解决呢? > > > >

Re: flink sql在实时数仓中,关联hbase维表频繁变化的问题

2020-08-17 文章 shizk233
有没有可能把维表数据也作为数据流从kafka输入呢 Jim Chen 于2020年8月17日周一 下午4:36写道: > 大家好: > 我们现在在用flink sql在做实时数仓,架构大概是kafka关联hbase维表,然后写入clickhouse。hbase维表是频繁变化的 > 现在遇到的几个比较棘手的问题: > 1、自己在实现AsyncTableFunction做异步io的时候,发现性能还是不够。后来就加入本地缓存,但是缓存一致性出现问题,不知道该如何解决 > 2、写入hbase的时候,是批量写的,无法保证有序,维表频繁变化的话,顺序不对,会造成结果有问题 >

Re: 如何在KeyedProcessFunction中获取processingTime

2020-08-17 文章 shizk233
ctx.timestamp()其实就是获取的StreamRecord的时间戳,也就是事件被提取出来的时间戳。 这个方法一般需要使用event time,并且在数据流上assign过timestamp和watermark。 ゞ野蠻遊戲χ 于2020年8月16日周日 下午7:57写道: > 大家好 > > > 当我在KeyedProcessFunction的processElement方法中获取processingTime,就像这样ctx.timestamp(),返回为null,我改如何在processElement中获取processingTime? > > > 谢谢! > 嘉治

Re: flink 1.11 日志不能正常打印问题

2020-08-13 文章 shizk233
flink框架里用的slf4j吧,log4j2只是一种具体实现,应该是可以直接替换掉的。 就是把flink发行包下log4j2相关的jar替换成log4j的jar,当然,相应的配置文件也要改成log4j支持的配置。 caozhen 于2020年8月13日周四 下午3:39写道: > flink1.11好像是用的log4j2,我的mainjar用到了log4j, 两者类有冲突,导致JM、TM日志为空。 > > SLF4J: Class path contains multiple SLF4J bindings. > SLF4J: Found binding in >

Re: 关于flink sql的内置函数实现与debug

2020-08-12 文章 shizk233
> 一种是通过类似于UDAF的方式来实现的,继承的是`AggregateFunction` > 他们都在`org.apache.flink.table.planner.functions.aggfunctions` > 包里面(flink-table-planner-blink模块) > > shizk233 于2020年8月12日周三 上午10:39写道: > > > hi all, > > > > 请教一下,flink sql内置的众多functions[1]有对应的Java实现类吗?我只在blink tab

关于flink sql的内置函数实现与debug

2020-08-11 文章 shizk233
hi all, 请教一下,flink sql内置的众多functions[1]有对应的Java实现类吗?我只在blink table planner模块下的functions package里找到了一部分,并且是基于Expresstion的。 问题来源:我试图在flink sql里去做debug,如果是自定义的udf可以打断点在实现上,但内置函数没找到相应的实现,似乎也没有相应的文档在这一块。 [1]

Re: 读取hive表的数据转化为流;按照jdbc两阶段提交方式写入oracle数据库,没有异常但是checkpoint失败

2020-08-10 文章 shizk233
Hi,这个日志全是有点头大。。。 我刚想到,除了task重启外,还有一种情况是task没有调度成功。 你能通过flink web ui观察到task的状态吗,都是RUNNING吗? 如果一直是schedule,那应该是缺少对应的资源进行调度,需要检查下task manager提供的slot资源以及任务所需的资源。 如果是running、failed、schedule的不断切换,那需要检查task manager的日志,应该有warn。 Bruce 于2020年8月10日周一 下午6:12写道: > 下面是附件的内容,请问是因为什么导致重启呢? > > >

Re: flink集群搭建

2020-08-10 文章 shizk233
这个应该根据业务特性来决定吧。 如果是一些大型的streaming任务,需要长期稳定运行并且有良好的隔离性,则可以考虑perjob模式。 如果需要经常性提交一些小任务(常见于batch任务)或者说有一批相关联的任务,彼此隔离性要求也不高的,可以考虑session模式。 感觉说到底还是业务隔离性与资源的权衡。 Dream-底限 于2020年8月10日周一 下午4:21写道: > hi、 > FlinkOnYarn集群部署是推荐使用yarn-session模式所有任务共用一个,还是推荐使用preJob模式每个任务起一个小集群 >

Re: Flink slot 可以跨 job 共享吗?

2020-08-10 文章 shizk233
“By default, Flink allows subtasks to share slots even if they are subtasks of different tasks, so long as they are from the same job.” 参考官网描述[1],应该只有相同job下的task可以共享slot。 [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/concepts/flink-architecture.html wangl...@geekplus.com

Re: flink krb5.conf配置

2020-08-10 文章 shizk233
flink官网有一个kerberos相关的说明文档[1],不知是否能帮助到你。 [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/security-kerberos.html zjfpla...@hotmail.com 于2020年8月10日周一 下午3:15写道: > Flink on yarn模式 > > > > zjfpla...@hotmail.com > > 发件人: zjfpla...@hotmail.com > 发送时间: 2020-08-10 15:09 > 收件人:

Re: flink timerservice注册的timer定时器只有少部分触发

2020-08-06 文章 shizk233
具体原因不太清楚,但建议使用context.timeService().currentEventTime()和currentProcessingTime()来获取当前的时间。 排查方法的话,不知道你有没有做算子的单元测试,如果还没有的话可以通过flink test util[1][2]做单元测试来debug排查, 可以比较明确的观察到timeService上的Timer状态。 [1] https://flink.apache.org/news/2020/02/07/a-guide-for-unit-testing-in-apache-flink.html [2]

Re: flink-1.11 模拟背压

2020-08-03 文章 shizk233
看到背压。 我不断产生数据100w以上了。 > > > > > > -- 原始邮件 ------ > 发件人: shizk233 发送时间: 2020年8月3日 23:03 > 收件人: user-zh@flink.apache.org 主题: 回复:flink-1.11 模拟背压 > > > > source没有看到数据消费具体是指什么,web ui上task的numOfRecordsIn吗 > > kcz <573693...@qq.com

Re: flink-1.11 模拟背压

2020-08-03 文章 shizk233
source没有看到数据消费具体是指什么,web ui上task的numOfRecordsIn吗 kcz <573693...@qq.com> 于2020年8月3日周一 下午7:29写道: > 我想看下背压的指标数据,我往kafka发送了100w数据,但是source我也没有看到数据被消费,是我哪里模拟错了吗 > public static void main(String[] args) throws Exception{ > > StreamExecutionEnvironment env = >

Re: flink run-application 怎样设置配置文件的环境变量

2020-08-03 文章 shizk233
在yarn上各节点起作用的环境变量应该是用-yD设置 Zhou Zach 于2020年8月3日周一 下午6:35写道: > Hi all, > > 通过如下方式设置HBASE_CONF_PATH变量,提交到yarn时,发现HBASE_CONF_PATH没有生效, > > > /opt/flink-1.11.1/bin/flink run-application -t yarn-application \ > -DHBASE_CONF_PATH='/etc/hbase/conf' \ > > > 请问flink提交job时,怎样设置环境变量?

Re: flink1.9.1 在WebUI中查看committedOffsets指标为负值

2020-07-30 文章 shizk233
如JasonLee所说,你可以在FlinkKafkaConsumerBase的notifyCheckpointComplete方法中看到提交offset的逻辑。 值得注意的是,此处的chk完成指的是整个链路上的chk完成,而不是kafka source的chk完成。 JasonLee <17610775...@163.com> 于2020年7月30日周四 下午9:59写道: > hi > 提交offset到Kafka是在ck成功之后 如果没有开启ck的话 需要设置自动提交提交offset > > > | | > JasonLee > | > | >

Re: flink1.9.1 在WebUI中查看committedOffsets指标为负值

2020-07-30 文章 shizk233
可以检查下在patition3上有没有成功提交过offsets。负值可能是没有提交过情况下的默认值(我猜这是个不变值)。 bradyMk 于2020年7月29日周三 下午6:36写道: > flink1.9.1 > > 在WebUI中查看Source__Custom_Source.KafkaConsumer.topic.geek-event-target.partition.3.committedOffsets指标为负值,查看官网释义:对于每个分区,最后一次成功提交到Kafka的偏移量。 > 但我这里为什么是负值呢? > < >

Re: Flink使用Kafka作为source时checkpoint成功提交offset的机制

2020-07-30 文章 shizk233
似乎楼主一开始说的checkpoint成功是指source 算子的checkpoint成功?但notifyCheckpointComplete函数要求的是整个链路的chk成功。 这个时候offset为100的消息必然已经被sink算子处理完成了,因为触发chk的屏障消息必然在offset100的消息之后到达sink算子。 hk__lrzy 于2020年7月29日周三 下午5:53写道: > 你是说emit之后的offset commit么?可以看下 > `Kafka09Fetcher`的runFetchLoop方法 > > > 在2020年07月29日 15:09,shuwen

Re: (无主题)

2020-07-23 文章 shizk233
恭喜! 罗显宴 <15927482...@163.com> 于2020年7月23日周四 上午1:14写道: > 感谢shizk233大佬,我这个问题终于得到解决,我主要是通过全窗口加mapstate实现的 > best > shizk233 > > > | | > 罗显宴 > | > | > 邮箱:15927482...@163.com > | > > 签名由 网易邮箱大师 定制 > > 在2020年07月21日 15:04,罗显宴 写道: > hi,我想到解决办法了,可以用全局

Re: (无主题)

2020-07-20 文章 shizk233
新增一条((A类型,13点),1)的记录,在13点得到最终结果(13点,2)。 而这个结果和我理解的你的需求不太一样,我理解的情况应该是12点输出(12点,1),13点输出(13点,1),因为目前只有A类型的数据。 期望的输出应该是无限时间上的去重类型统计,每隔1小时输出(几点,当前所有的类型总数)。 我觉得目前的设计可能和描述的需求不太一致?你看看是不是这么回事儿 Best, shizk233 罗显宴 <15927482...@163.com> 于2020年7月21日周二 上午11:53写道: > 好的, > 输入: > 心功能不全和心律失常用药,

Re: (无主题)

2020-07-20 文章 shizk233
Hi, 有点不太理解,能举个例子说明一下result那部分你预期的输入(几条sample 数据就行)和期望的输出结果吗 Best, shizk233 罗显宴 <15927482...@163.com> 于2020年7月21日周二 上午11:29写道: > hi, > > CountAgg是对一个窗口进行聚合,而一个窗口中的元素都是根据医药类别category分区而来的,都是一样的,所以我做累加就直接置为1了,你的意思是让我在CuontAgg上做累加吗 > > > | | > 罗显宴 > | > | > 邮箱:1

Re: (无主题)

2020-07-20 文章 shizk233
Hi, 我猜是因为设的1小时滚动窗口,WindowFunction里拿到的WindowEnd就是1小时的END, 而acc其实没有变化,也就是每隔20s触发拿到的结果是一样的,在MapState里也会忽略重复值。 你可以让acc做个累加,然后结果输出里把acc的值带上看看。 Best, shizk233 罗显宴 <15927482...@163.com> 于2020年7月20日周一 下午8:44写道: > > 大佬,不好意思,可能图片看不到,我把代码发一次,刚学习flink半个月,好多不懂,希望大佬莫嫌烦 > 罗显宴 > 邮箱:1

Re: flink job 跑一段时间 watermark 不推进的问题

2020-07-20 文章 shizk233
Hi, Flink metrics里有一项是task相关的指标currentWatermark,从中可以知道subtask_index,task_name,watermark三项信息,应该能帮助排查watermark的推进情况。 Best, shizk233 snack white 于2020年7月20日周一 下午3:51写道: > HI: > flink job 跑一段时间 watermark 不推进,任务没挂,source 是 kafka ,kafka 各个partition > 均有数据, flink job statue backend 为

Re: (无主题)

2020-07-20 文章 shizk233
Hi, 从你举的这个例子考虑,仍然可以使用ContinusEventTimeTrigger来持续触发结果更新的,只不过整个窗口可以根据结束条件考虑别的,比如Global窗口。 Best, shizk233 罗显宴 <15927482...@163.com> 于2020年7月20日周一 下午2:09写道: > > > 不是,是连续累计,比如我在某个医药网站上爬取有关药物,每小时统计爬取到的新增药物种类,然后一直这样进行下去,然后这个网站爬完了,可以换另一个网站, > | | > 罗显宴 > | > | > 邮箱:159274

Re: (无主题)

2020-07-19 文章 shizk233
Hi, 累计是仅在一天之内累计吗,这样的话可以开个一天的Tumbling Window,然后使用ContinusEventTimeTrigger每小时触发一下输出结果。 Best, shizk233 罗显宴 <15927482...@163.com> 于2020年7月20日周一 上午1:18写道: > > 大家好,怎么实现一个滚动窗口内的连续递增元素个数,比如每小时累计用户登录数,比如说凌晨1点登录了500人,到了两点就累计到1200,到3点累计到这样实现一个递增曲线,在网上看到云邪大佬写的sql但是不会写datastream > api,希望

Re: flink cep 如何处理超时事件?

2020-07-14 文章 shizk233
Hi drewfranklin, flink使用event time,然后类似下面这样可以吗? Pattern.begin("a").next("b").within(Time.minutes(1)); Best, shizk233 drewfranklin 于2020年7月14日周二 上午11:05写道: > Hello all. > 想请教下各位。 > > 我有个用户开户超时断点的场景。调研了一下,想通过flink cep 来实现。 > > 但是我定义pattern 后发现,我的这个

Re: flink1.9.1-消费kafka落pg库任务出错

2020-07-14 文章 shizk233
Hi nicygan, unable to create new native thread指的是无法创建checkpoint线程,并不是内存占用过大。 这种情况一般有3种可能的原因: 1.flink应用开启太多线程 2.机器上句柄设置太小 3.机器上的其他应用开启太多线程 建议排查一下机器上的ulimit设置(文件句柄会影响应用能开启的线程数),和flink metrics里监控到的线程数变化。 Best, shizk233 nicygan 于2020年7月14日周二 上午10:31写道: > dear all: > > 我有一个消费kafka数据写到p

Re: Flink DataStream 统计UV问题

2020-07-09 文章 shizk233
Hi Jiazhi, 1.如果数据流量不是很大的话,按每条数据触发也没问题。另外,基于事件时间的情况,提前触发可以选择ContinuousEventTimeTrigger,可以查看Trigger接口的实现找到你想要的trigger。 2.窗口结束后会自动释放。一般对于Global窗口需要手动设置TTL Best, shizk233 ゞ野蠻遊戲χ 于2020年7月7日周二 下午10:27写道: > 大家好! > > 想问下,现在在用DataStream的api来统计每天的UV,代码如下,有2个使用问题: > 1、在使用Tumbling窗口的时候

Re: Flink状态调试

2020-07-06 文章 shizk233
抱歉,我似乎回错邮件了,应该使用回复全部? Congxian Qiu 于2020年7月6日周一 下午7:22写道: > Hi > 想 debug checkpoint 文件的话,可以参考下这个 UT[1] > > [1] > > https://github.com/apache/flink/blob/master/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java > Best, > Congxian > > > Z-Z

Re: Flink状态调试

2020-07-06 文章 shizk233
Hi Z-Z, 如果你想查看的是程序中的state内容,建议触发一次savepoint并搭配state processor api来查询。 参考 https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/libs/state_processor_api.html Best, shizk233 Congxian Qiu 于2020年7月6日周一 下午7:22写道: > Hi > 想 debug checkpoint 文件的话,可以参考下这个 UT[1] > > [

Re: Flink sql 主动使数据延时一段时间有什么方案

2020-07-06 文章 shizk233
Hi Sun ZHu, 关于方法4,我记得kafka有时间轮功能,可以做到延迟消息的,可以了解一下。 Best, shizk233 Sun.Zhu <17626017...@163.com> 于2020年7月4日周六 上午12:23写道: > 感谢benchao和forideal的方案, > 方法1.使用udf,查不到 sleep 等一下在查 > --这个可以尝试 > 方法2.在 join operator处数据等一会再去查 > —我们使用的是flink sql,不是streaming,所以该方案可能行不通 > 方法3.如果没有 jo

Re: 做实时数仓,sql怎么保证分topic区有序

2020-07-02 文章 shizk233
Hi air23, sql似乎不支持相关的设置,可以通过env或配置文件设置所有蒜子的并行度。 你可以试试流转表,可以做到细粒度的控制。 Best, shizk233 air23 于2020年7月2日周四 下午6:40写道: > hi > 就是我用 >flink sql 通过ddl读取和写入kafka怎么设置并行度呢? >flink sql 通过ddl写入kafka怎么自定义分区呢? > > > 这样才能保证提高消费能力。和保证数据有序。 但是好像没有发现再table模式 或者sql 语句上设置 或者做自定义分区。 > > > > > >

Re: mysql sink connection timeout

2020-06-30 文章 shizk233
Hi LakeShen, 感谢!就是这个!我这就去试一下 Thanks, Xuhui Mao LakeShen 于2020年6月30日周二 下午2:06写道: > Hi shizk233, > > 可以看下这个 Jira , https://issues.apache.org/jira/browse/FLINK-16681。 > > 这个就是长时间没有数据,导致 connection 断开问题。 > > Best, > LakeShen > > shizk233 于2020年6月30日周二 下午1:34写道: >

Re: mysql sink connection timeout

2020-06-29 文章 shizk233
以参考这个jira > https://issues.apache.org/jira/browse/FLINK-12494 > 1. Throw execption and let flink runtime handle it; > 2. Handle it in OutputFormat; > > > | | > Zhonghan Tang > | > | > 13122260...@163.com > | > 签名由网易邮箱大师定制 > > > On 06/30/2020 11:53,shizk233 wrote

mysql sink connection timeout

2020-06-29 文章 shizk233
Hi All, 最近使用flink处理数据写入mysql sink,但由于业务场景在晚上没有数据流入,会触发mysql wait timeout限制(默认的8小时)导致连接失效。 即使在mysql url中添加了autoReconnect=true参数,仍会产生相应的异常,具体信息见下。 版本信息: flink 1.10.1 mysql server 5.6.47 mysql Connector/J 5.1.49 请问: 1.flink的jdbc connector是否可以采用连接池模型?如果只使用一个connection,是否可以添加某种心跳机制以保持active?