Re: Re: 关于flink table store的疑问
我理解,目前大的目标是为了流批一体,设计上实际上是对存储进行了抽象,从某种角度上来看可以理解为存储虚拟化,未来的想象空间要大很多。Iceberg,Hudi这些可以作为底层对接的一种具体实现。 On Fri, Sep 9, 2022 at 2:44 PM Xuyang wrote: > Hi,我理解Flink table store主要有以下几个优势: > 1、减少架构复杂性,不需要额外引入多余的组件 > 2、支持Flink计算中直接使用Flink table store的存储 > 3、毫秒级流式查询和olap能力 > > > > > -- > > Best! > Xuyang > > > > > > 在 2022-09-08 16:09:39,"r pp" 写道: > >应该是为了 流批一体 。不丢数据 > > > >Kyle Zhang 于2022年9月8日周四 08:37写道: > > > >> Hi all, > >> 看table > >> store的介绍也是关于数据湖存储以及用于实时流式读取的,那在定位上与iceberg、hudi等项目有什么不一样么,为什么要再开发一个项目? > >> > >> Best. > >> > > > > > >-- > >Best, > > pp >
Re:Re: 关于 UDAF 里面 ListView 的疑问
Hi,可以尝试下在createAccumulator中打个断点,然后一步步看为啥在getValue的时候acc变成null了。 我理解如果是“使用 ListView 时,无法正常获得 TypeInference”的话,应该报错,而不应该正确执行但是后面突然null了。 如果确定是某个地方发生了问题的话,可以在jira里贴一个issue[1]向社区反馈这个问题 ;) [1]https://issues.apache.org/jira/projects/FLINK/summary -- Best! Xuyang 在 2022-09-08 10:48:10,"Zhiwen Sun" 写道: >hi, > >感谢你的回复。 > >报错是在 getValue 的时候。 > > at GroupAggsHandler$439.getValue(Unknown Source) > at > org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:146) > > > >我的疑问是使用一个 Class 包装下 ListView 就能正常工作,而直接使用 ListView 是会报错。 > >比如使用 AggregateFunction 就正常,而使用 >AggregateFunctionListView> 就会 NPE。 > > >我怀疑使用 ListView 时,无法正常获得 TypeInference。 > > >Zhiwen Sun > > > >On Wed, Sep 7, 2022 at 11:46 PM Xuyang wrote: > >> Hi, >> 理论上来说,在你的case中,会先通过createAccumulator方法创建一个ListView作为acc,然后,每一个输入的row都会触发accumulate方法,将数据更新到刚才的acc中,最终通过getValue方法拿到当前acc的值。 >> >> >> >> >> 实际测试中的NPE发生在更新acc的时候还是getValue的时候呢?可以通过在这三个阶段设一下断点,分别看一下当前持有的acc是不是同一个对象 >> >> >> >> >> -- >> >> Best! >> Xuyang >> >> >> >> >> >> 在 2022-09-07 16:23:25,"Zhiwen Sun" 写道: >> >> Hi, >> 理论上来说,在你的case中,会先通过createAccumulator方法创建一个ListView作为acc,然后,每一个输入的row都会触发accumulate方法,将数据更新到刚才的acc中,最终通过getValue方法拿到当前acc的值。实际测试中的NPE发生在更新acc的时候还是getValue的时候呢?可以通过在这三个阶段设一下断点,分别看一下当前持有的acc是不是同一个对象
Re:Re: 关于flink table store的疑问
Hi,我理解Flink table store主要有以下几个优势: 1、减少架构复杂性,不需要额外引入多余的组件 2、支持Flink计算中直接使用Flink table store的存储 3、毫秒级流式查询和olap能力 -- Best! Xuyang 在 2022-09-08 16:09:39,"r pp" 写道: >应该是为了 流批一体 。不丢数据 > >Kyle Zhang 于2022年9月8日周四 08:37写道: > >> Hi all, >> 看table >> store的介绍也是关于数据湖存储以及用于实时流式读取的,那在定位上与iceberg、hudi等项目有什么不一样么,为什么要再开发一个项目? >> >> Best. >> > > >-- >Best, > pp
Re:这里为什么会报null指针错误,和源表数据有关系吗?
Hi,看上去是遇到了一条脏数据,问一下是在运行了一段时间之后突然报错的嘛? -- Best! Xuyang At 2022-09-09 11:46:47, "Asahi Lee" wrote: >2022-09-09 11:36:42,866 INFO >org.apache.flink.runtime.executiongraph.ExecutionGraph >[] - Source: HiveSource-ods_jt_hrs.ods_hrmis_HR_EMPL_Education (1/1) >(2a68412dab3602a1eeda5a750b308e23) switched from RUNNING to FAILED on >container_1658144991761_106260_01_02 @ hhny-cdh05 (dataPort=45015). >java.lang.RuntimeException: null > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:105) > ~[flink-dist_2.11-1.14.3.jar:1.14.3] > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89) > ~[flink-dist_2.11-1.14.3.jar:1.14.3] > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:43) > ~[flink-dist_2.11-1.14.3.jar:1.14.3] > at > org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:196) > ~[flink-dist_2.11-1.14.3.jar:1.14.3] > at > org.apache.flink.streaming.api.operators.source.NoOpTimestampsAndWatermarks$TimestampsOnlyOutput.collect(NoOpTimestampsAndWatermarks.java:97) > ~[flink-dist_2.11-1.14.3.jar:1.14.3] > at > org.apache.flink.streaming.api.operators.source.NoOpTimestampsAndWatermarks$TimestampsOnlyOutput.collect(NoOpTimestampsAndWatermarks.java:91) > ~[flink-dist_2.11-1.14.3.jar:1.14.3] > at > org.apache.flink.connector.file.src.impl.FileSourceRecordEmitter.emitRecord(FileSourceRecordEmitter.java:45) > ~[flink-table_2.11-1.14.3.jar:1.14.3] > at > org.apache.flink.connector.file.src.impl.FileSourceRecordEmitter.emitRecord(FileSourceRecordEmitter.java:35) > ~[flink-table_2.11-1.14.3.jar:1.14.3] > at > org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:143) > ~[flink-table_2.11-1.14.3.jar:1.14.3] > at > org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:350) > ~[flink-dist_2.11-1.14.3.jar:1.14.3] > at > org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) > ~[flink-dist_2.11-1.14.3.jar:1.14.3] > at > org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) > ~[flink-dist_2.11-1.14.3.jar:1.14.3] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496) > ~[flink-dist_2.11-1.14.3.jar:1.14.3] > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203) > ~[flink-dist_2.11-1.14.3.jar:1.14.3] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809) > ~[flink-dist_2.11-1.14.3.jar:1.14.3] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761) > ~[flink-dist_2.11-1.14.3.jar:1.14.3] > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) > ~[flink-dist_2.11-1.14.3.jar:1.14.3] > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) > ~[flink-dist_2.11-1.14.3.jar:1.14.3] > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) > ~[flink-dist_2.11-1.14.3.jar:1.14.3] > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) > ~[flink-dist_2.11-1.14.3.jar:1.14.3] > at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_181] >Caused by: java.lang.NullPointerException > at > org.apache.flink.table.data.writer.BinaryWriter.write(BinaryWriter.java:118) > ~[flink-table_2.11-1.14.3.jar:1.14.3] > at > org.apache.flink.table.runtime.typeutils.RowDataSerializer.toBinaryRow(RowDataSerializer.java:204) > ~[flink-table_2.11-1.14.3.jar:1.14.3] > at > org.apache.flink.table.runtime.typeutils.RowDataSerializer.serialize(RowDataSerializer.java:103) > ~[flink-table_2.11-1.14.3.jar:1.14.3] > at > org.apache.flink.table.runtime.typeutils.RowDataSerializer.serialize(RowDataSerializer.java:48) > ~[flink-table_2.11-1.14.3.jar:1.14.3] > at > org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:168) > ~[flink-dist_2.11-1.14.3.jar:1.14.3] > at > org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:46) > ~[flink-dist_2.11-1.14.3.jar:1.14.3] > at > org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54) > ~[flink-dist_2.11-1.14.3.jar:1.14.3] > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.serializeRecord(RecordWriter.java:132) > ~[flink-dist_2.11-1.14.3.jar:1.14.3] > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:106) > ~[flink-dist_2.11-1.14.3.jar:1.14.3] > at >
Re:flink 的数据传输,是上游算子推给下游, 还是下游算子拉取上游, 设计的考虑是啥?
Hi,主要是pull模型:下游主动拉取上游的数据。可以在下游的消费能力达到极限时,通过反压机制,让上游减少生产的数据。 -- Best! Xuyang 在 2022-09-09 19:04:27,"郑 致远" 写道: >各位大佬好 >请教下, >flink 的数据传输,是上游算子推给下游, 还是下游算子拉取上游, 这么设计的考虑是啥呢?
flink 的数据传输,是上游算子推给下游, 还是下游算子拉取上游, 设计的考虑是啥?
各位大佬好 请教下, flink 的数据传输,是上游算子推给下游, 还是下游算子拉取上游, 这么设计的考虑是啥呢?
回复: Re: Key group is not in KeyGroupRange
Integer[] rebalanceKeys = createRebalanceKeys(parallelism); int rebalanceKeyIndex = new Random().nextInt(parallelism); Integer key = rebalanceKeys[rebalanceKeyIndex]; /** * 构建均衡 KEY 数组 * * @param parallelism 并行度 * @return */ public static Integer[] createRebalanceKeys(int parallelism) { HashMap> groupRanges = new HashMap<>(); int maxParallelism = KeyGroupRangeAssignment.computeDefaultMaxParallelism(parallelism); // 构造多个 key 用于生成足够的 groupRanges int maxRandomKey = parallelism * 10; for (int randomKey = 0; randomKey < maxRandomKey; randomKey++) { int subtaskIndex = KeyGroupRangeAssignment.assignKeyToParallelOperator(randomKey, maxParallelism, parallelism); LinkedHashSet randomKeys = groupRanges.computeIfAbsent(subtaskIndex, k -> new LinkedHashSet<>()); randomKeys.add(randomKey); } Integer[] result = new Integer[parallelism]; for (int i = 0; i < parallelism; i++) { LinkedHashSet ranges = groupRanges.get(i); if (ranges == null || ranges.isEmpty()) { throw new RuntimeException("create rebalance keys error"); } result[i] = ranges.stream().findFirst().get(); } return result; } 发件人: junjie.m...@goupwith.com 发送时间: 2022-09-09 17:52 收件人: user-zh 主题: Re: Re: Key group is not in KeyGroupRange key selector中使用random.nextInt(parallelism) 有时会报错 From: yue ma Date: 2022-09-09 17:41 To: user-zh Subject: Re: Key group is not in KeyGroupRange 你好,可以看一下使用的 key selector 是否稳定,key 是否会变化。 junjie.m...@goupwith.com 于2022年9月9日周五 17:35写道: > hi: > 本人遇到了这个报错: > Key group 51 is not in KeyGroupRange{startKeyGroup=64, endKeyGroup=127}. > Unless you're directly using low level state access APIs, this is most > likely caused by non-deterministic shuffle key (hashCode and equals > implementation). > > 这个报错出现的莫名其妙,想了解这个错误是在什么样的情况下才会导致这个报错的? > 谢谢!! > >
Re: Re: Key group is not in KeyGroupRange
key selector中使用random.nextInt(parallelism) 有时会报错 缪俊杰 成就客户 | 团队合作 开放进取 | 务实 诚信 用心 上海致宇信息技术有限公司 Shanghai Hex Information Technology Co,. Ltd. 地址:上海市徐汇区桂平路418号漕河泾国际孵化中心A区2601室 电话:021-64958718 手机:17858939731 传真:021-64958710 邮编:200233 网站:www.goupwith.com From: yue ma Date: 2022-09-09 17:41 To: user-zh Subject: Re: Key group is not in KeyGroupRange 你好,可以看一下使用的 key selector 是否稳定,key 是否会变化。 junjie.m...@goupwith.com 于2022年9月9日周五 17:35写道: > hi: > 本人遇到了这个报错: > Key group 51 is not in KeyGroupRange{startKeyGroup=64, endKeyGroup=127}. > Unless you're directly using low level state access APIs, this is most > likely caused by non-deterministic shuffle key (hashCode and equals > implementation). > > 这个报错出现的莫名其妙,想了解这个错误是在什么样的情况下才会导致这个报错的? > 谢谢!! > >
Re: Key group is not in KeyGroupRange
你好,可以看一下使用的 key selector 是否稳定,key 是否会变化。 junjie.m...@goupwith.com 于2022年9月9日周五 17:35写道: > hi: > 本人遇到了这个报错: > Key group 51 is not in KeyGroupRange{startKeyGroup=64, endKeyGroup=127}. > Unless you're directly using low level state access APIs, this is most > likely caused by non-deterministic shuffle key (hashCode and equals > implementation). > > 这个报错出现的莫名其妙,想了解这个错误是在什么样的情况下才会导致这个报错的? > 谢谢!! > >
??????Key group is not in KeyGroupRange
?? piao289108...@vip.qq.com ---- ??: "user-zh"
Key group is not in KeyGroupRange
hi: 本人遇到了这个报错: Key group 51 is not in KeyGroupRange{startKeyGroup=64, endKeyGroup=127}. Unless you're directly using low level state access APIs, this is most likely caused by non-deterministic shuffle key (hashCode and equals implementation). 这个报错出现的莫名其妙,想了解这个错误是在什么样的情况下才会导致这个报错的? 谢谢!!
??????Re: ????????????????????????
?? https://www.cnblogs.com/zhikou/p/8537074.html#3954115 piao289108...@vip.qq.com ---- ??: "user-zh"
Re:回复:Re: 用什么工具展示实时报表?
有相关的文章博客可以参考一下吗 在 2022-09-02 09:46:17,"张洋" <289108...@qq.com.INVALID> 写道: >实时报表要体现随时间变化的数据,所以这种复杂多变的数据,建议入库到prometheus,然后用grafana做数据源引入,利用sql查询出各种样式的实时数据,您可以设置1s刷新一次页面,更新数据展示结构 > > > > >张洋 >piao289108...@vip.qq.com > > > > > > > > >--原始邮件-- >发件人: > "user-zh" > >发送时间:2022年9月2日(星期五) 上午9:28 >收件人:"user-zh" >主题:Re:Re: 用什么工具展示实时报表? > > > >实时报表展示也可以通过flink + doris,doris计算结果使用mysql jdbc将doris的计算结果返回给前端展示 > > > > > > > > > > > > > > >在 2022-08-17 10:22:05,"joey" - 目前使用较多开源BI工具:Superset、Redash、Metabase; >- Kafka数据我们使用了Elasticsearch + Kibana 的方式进行探索。 > > 2022年8月16日 14:37,Kyle Zhang > Hi all, > > >经过flink处理过的数据想要做成实时报表,现在业内都在用什么方案?是通过flink写入db,然后用永洪/帆软等定时刷新,还是flink写入kafka,有工具能读取kafka数据源展示? > > Best >