Re: Re: 关于flink table store的疑问

2022-09-09 文章 Jing Ge
我理解,目前大的目标是为了流批一体,设计上实际上是对存储进行了抽象,从某种角度上来看可以理解为存储虚拟化,未来的想象空间要大很多。Iceberg,Hudi这些可以作为底层对接的一种具体实现。

On Fri, Sep 9, 2022 at 2:44 PM Xuyang  wrote:

> Hi,我理解Flink table store主要有以下几个优势:
> 1、减少架构复杂性,不需要额外引入多余的组件
> 2、支持Flink计算中直接使用Flink table store的存储
> 3、毫秒级流式查询和olap能力
>
>
>
>
> --
>
> Best!
> Xuyang
>
>
>
>
>
> 在 2022-09-08 16:09:39,"r pp"  写道:
> >应该是为了 流批一体 。不丢数据
> >
> >Kyle Zhang  于2022年9月8日周四 08:37写道:
> >
> >> Hi all,
> >>   看table
> >> store的介绍也是关于数据湖存储以及用于实时流式读取的,那在定位上与iceberg、hudi等项目有什么不一样么,为什么要再开发一个项目?
> >>
> >> Best.
> >>
> >
> >
> >--
> >Best,
> >  pp
>


Re:Re: 关于 UDAF 里面 ListView 的疑问

2022-09-09 文章 Xuyang
Hi,可以尝试下在createAccumulator中打个断点,然后一步步看为啥在getValue的时候acc变成null了。


我理解如果是“使用 ListView 时,无法正常获得 TypeInference”的话,应该报错,而不应该正确执行但是后面突然null了。 
如果确定是某个地方发生了问题的话,可以在jira里贴一个issue[1]向社区反馈这个问题 ;)


[1]https://issues.apache.org/jira/projects/FLINK/summary




--

Best!
Xuyang





在 2022-09-08 10:48:10,"Zhiwen Sun"  写道:
>hi,
>
>感谢你的回复。
>
>报错是在 getValue 的时候。
>
>   at GroupAggsHandler$439.getValue(Unknown Source)
>   at 
> org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:146)
>
>
>
>我的疑问是使用一个 Class 包装下 ListView 就能正常工作,而直接使用 ListView 是会报错。
>
>比如使用  AggregateFunction  就正常,而使用
>AggregateFunctionListView>  就会 NPE。
>
>
>我怀疑使用 ListView 时,无法正常获得 TypeInference。
>
>
>Zhiwen Sun
>
>
>
>On Wed, Sep 7, 2022 at 11:46 PM Xuyang  wrote:
>
>> Hi,
>> 理论上来说,在你的case中,会先通过createAccumulator方法创建一个ListView作为acc,然后,每一个输入的row都会触发accumulate方法,将数据更新到刚才的acc中,最终通过getValue方法拿到当前acc的值。
>>
>>
>>
>>
>> 实际测试中的NPE发生在更新acc的时候还是getValue的时候呢?可以通过在这三个阶段设一下断点,分别看一下当前持有的acc是不是同一个对象
>>
>>
>>
>>
>> --
>>
>> Best!
>> Xuyang
>>
>>
>>
>>
>>
>> 在 2022-09-07 16:23:25,"Zhiwen Sun"  写道:
>>
>> Hi,
>> 理论上来说,在你的case中,会先通过createAccumulator方法创建一个ListView作为acc,然后,每一个输入的row都会触发accumulate方法,将数据更新到刚才的acc中,最终通过getValue方法拿到当前acc的值。实际测试中的NPE发生在更新acc的时候还是getValue的时候呢?可以通过在这三个阶段设一下断点,分别看一下当前持有的acc是不是同一个对象


Re:Re: 关于flink table store的疑问

2022-09-09 文章 Xuyang
Hi,我理解Flink table store主要有以下几个优势:
1、减少架构复杂性,不需要额外引入多余的组件
2、支持Flink计算中直接使用Flink table store的存储
3、毫秒级流式查询和olap能力




--

Best!
Xuyang





在 2022-09-08 16:09:39,"r pp"  写道:
>应该是为了 流批一体 。不丢数据
>
>Kyle Zhang  于2022年9月8日周四 08:37写道:
>
>> Hi all,
>>   看table
>> store的介绍也是关于数据湖存储以及用于实时流式读取的,那在定位上与iceberg、hudi等项目有什么不一样么,为什么要再开发一个项目?
>>
>> Best.
>>
>
>
>-- 
>Best,
>  pp


Re:这里为什么会报null指针错误,和源表数据有关系吗?

2022-09-09 文章 Xuyang
Hi,看上去是遇到了一条脏数据,问一下是在运行了一段时间之后突然报错的嘛?







--

Best!
Xuyang





At 2022-09-09 11:46:47, "Asahi Lee"  wrote:
>2022-09-09 11:36:42,866 INFO 
>org.apache.flink.runtime.executiongraph.ExecutionGraph   
>[] - Source: HiveSource-ods_jt_hrs.ods_hrmis_HR_EMPL_Education (1/1) 
>(2a68412dab3602a1eeda5a750b308e23) switched from RUNNING to FAILED on 
>container_1658144991761_106260_01_02 @ hhny-cdh05 (dataPort=45015).
>java.lang.RuntimeException: null
>   at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:105)
>  ~[flink-dist_2.11-1.14.3.jar:1.14.3]
>   at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
>  ~[flink-dist_2.11-1.14.3.jar:1.14.3]
>   at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:43)
>  ~[flink-dist_2.11-1.14.3.jar:1.14.3]
>   at 
> org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:196)
>  ~[flink-dist_2.11-1.14.3.jar:1.14.3]
>   at 
> org.apache.flink.streaming.api.operators.source.NoOpTimestampsAndWatermarks$TimestampsOnlyOutput.collect(NoOpTimestampsAndWatermarks.java:97)
>  ~[flink-dist_2.11-1.14.3.jar:1.14.3]
>   at 
> org.apache.flink.streaming.api.operators.source.NoOpTimestampsAndWatermarks$TimestampsOnlyOutput.collect(NoOpTimestampsAndWatermarks.java:91)
>  ~[flink-dist_2.11-1.14.3.jar:1.14.3]
>   at 
> org.apache.flink.connector.file.src.impl.FileSourceRecordEmitter.emitRecord(FileSourceRecordEmitter.java:45)
>  ~[flink-table_2.11-1.14.3.jar:1.14.3]
>   at 
> org.apache.flink.connector.file.src.impl.FileSourceRecordEmitter.emitRecord(FileSourceRecordEmitter.java:35)
>  ~[flink-table_2.11-1.14.3.jar:1.14.3]
>   at 
> org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:143)
>  ~[flink-table_2.11-1.14.3.jar:1.14.3]
>   at 
> org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:350)
>  ~[flink-dist_2.11-1.14.3.jar:1.14.3]
>   at 
> org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
>  ~[flink-dist_2.11-1.14.3.jar:1.14.3]
>   at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>  ~[flink-dist_2.11-1.14.3.jar:1.14.3]
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
>  ~[flink-dist_2.11-1.14.3.jar:1.14.3]
>   at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
>  ~[flink-dist_2.11-1.14.3.jar:1.14.3]
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
>  ~[flink-dist_2.11-1.14.3.jar:1.14.3]
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
>  ~[flink-dist_2.11-1.14.3.jar:1.14.3]
>   at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
>  ~[flink-dist_2.11-1.14.3.jar:1.14.3]
>   at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) 
> ~[flink-dist_2.11-1.14.3.jar:1.14.3]
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) 
> ~[flink-dist_2.11-1.14.3.jar:1.14.3]
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) 
> ~[flink-dist_2.11-1.14.3.jar:1.14.3]
>   at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_181]
>Caused by: java.lang.NullPointerException
>   at 
> org.apache.flink.table.data.writer.BinaryWriter.write(BinaryWriter.java:118) 
> ~[flink-table_2.11-1.14.3.jar:1.14.3]
>   at 
> org.apache.flink.table.runtime.typeutils.RowDataSerializer.toBinaryRow(RowDataSerializer.java:204)
>  ~[flink-table_2.11-1.14.3.jar:1.14.3]
>   at 
> org.apache.flink.table.runtime.typeutils.RowDataSerializer.serialize(RowDataSerializer.java:103)
>  ~[flink-table_2.11-1.14.3.jar:1.14.3]
>   at 
> org.apache.flink.table.runtime.typeutils.RowDataSerializer.serialize(RowDataSerializer.java:48)
>  ~[flink-table_2.11-1.14.3.jar:1.14.3]
>   at 
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:168)
>  ~[flink-dist_2.11-1.14.3.jar:1.14.3]
>   at 
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:46)
>  ~[flink-dist_2.11-1.14.3.jar:1.14.3]
>   at 
> org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
>  ~[flink-dist_2.11-1.14.3.jar:1.14.3]
>   at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.serializeRecord(RecordWriter.java:132)
>  ~[flink-dist_2.11-1.14.3.jar:1.14.3]
>   at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:106)
>  ~[flink-dist_2.11-1.14.3.jar:1.14.3]
>   at 
> 

Re:flink 的数据传输,是上游算子推给下游, 还是下游算子拉取上游, 设计的考虑是啥?

2022-09-09 文章 Xuyang
Hi,主要是pull模型:下游主动拉取上游的数据。可以在下游的消费能力达到极限时,通过反压机制,让上游减少生产的数据。







--

Best!
Xuyang





在 2022-09-09 19:04:27,"郑 致远"  写道:
>各位大佬好
>请教下,
>flink 的数据传输,是上游算子推给下游, 还是下游算子拉取上游,  这么设计的考虑是啥呢?


flink 的数据传输,是上游算子推给下游, 还是下游算子拉取上游, 设计的考虑是啥?

2022-09-09 文章 郑 致远
各位大佬好
请教下,
flink 的数据传输,是上游算子推给下游, 还是下游算子拉取上游,  这么设计的考虑是啥呢?


回复: Re: Key group is not in KeyGroupRange

2022-09-09 文章 junjie.m...@goupwith.com

Integer[] rebalanceKeys = createRebalanceKeys(parallelism);
int rebalanceKeyIndex = new Random().nextInt(parallelism);
Integer key = rebalanceKeys[rebalanceKeyIndex];

 /**
 * 构建均衡 KEY 数组
 *
 * @param parallelism 并行度
 * @return
 */
public static Integer[] createRebalanceKeys(int parallelism) {
HashMap> groupRanges = new HashMap<>();
int maxParallelism = 
KeyGroupRangeAssignment.computeDefaultMaxParallelism(parallelism);
// 构造多个 key 用于生成足够的 groupRanges
int maxRandomKey = parallelism * 10;
for (int randomKey = 0; randomKey < maxRandomKey; randomKey++) {
int subtaskIndex = 
KeyGroupRangeAssignment.assignKeyToParallelOperator(randomKey, maxParallelism, 
parallelism);
LinkedHashSet randomKeys = 
groupRanges.computeIfAbsent(subtaskIndex, k -> new LinkedHashSet<>());
randomKeys.add(randomKey);
}

Integer[] result = new Integer[parallelism];
for (int i = 0; i < parallelism; i++) {
LinkedHashSet ranges = groupRanges.get(i);
if (ranges == null || ranges.isEmpty()) {
throw new RuntimeException("create rebalance keys error");
}
result[i] = ranges.stream().findFirst().get();
}
return result;
}

 
发件人: junjie.m...@goupwith.com
发送时间: 2022-09-09 17:52
收件人: user-zh
主题: Re: Re: Key group is not in KeyGroupRange
key selector中使用random.nextInt(parallelism) 有时会报错
 
From: yue ma
Date: 2022-09-09 17:41
To: user-zh
Subject: Re: Key group is not in KeyGroupRange
你好,可以看一下使用的 key selector 是否稳定,key 是否会变化。
junjie.m...@goupwith.com  于2022年9月9日周五 17:35写道:
> hi:
> 本人遇到了这个报错:
> Key group 51 is not in KeyGroupRange{startKeyGroup=64, endKeyGroup=127}.
> Unless you're directly using low level state access APIs, this is most
> likely caused by non-deterministic shuffle key (hashCode and equals
> implementation).
>
> 这个报错出现的莫名其妙,想了解这个错误是在什么样的情况下才会导致这个报错的?
> 谢谢!!
>
>


Re: Re: Key group is not in KeyGroupRange

2022-09-09 文章 junjie.m...@goupwith.com
key selector中使用random.nextInt(parallelism) 有时会报错



缪俊杰
成就客户 | 团队合作 开放进取 | 务实 诚信 用心
上海致宇信息技术有限公司

Shanghai Hex Information Technology Co,. Ltd.
地址:上海市徐汇区桂平路418号漕河泾国际孵化中心A区2601室
电话:021-64958718
手机:17858939731
传真:021-64958710
邮编:200233
网站:www.goupwith.com
 
From: yue ma
Date: 2022-09-09 17:41
To: user-zh
Subject: Re: Key group is not in KeyGroupRange
你好,可以看一下使用的 key selector 是否稳定,key 是否会变化。
 
junjie.m...@goupwith.com  于2022年9月9日周五 17:35写道:
 
> hi:
> 本人遇到了这个报错:
> Key group 51 is not in KeyGroupRange{startKeyGroup=64, endKeyGroup=127}.
> Unless you're directly using low level state access APIs, this is most
> likely caused by non-deterministic shuffle key (hashCode and equals
> implementation).
>
> 这个报错出现的莫名其妙,想了解这个错误是在什么样的情况下才会导致这个报错的?
> 谢谢!!
>
>


Re: Key group is not in KeyGroupRange

2022-09-09 文章 yue ma
你好,可以看一下使用的 key selector 是否稳定,key 是否会变化。

junjie.m...@goupwith.com  于2022年9月9日周五 17:35写道:

> hi:
> 本人遇到了这个报错:
> Key group 51 is not in KeyGroupRange{startKeyGroup=64, endKeyGroup=127}.
> Unless you're directly using low level state access APIs, this is most
> likely caused by non-deterministic shuffle key (hashCode and equals
> implementation).
>
> 这个报错出现的莫名其妙,想了解这个错误是在什么样的情况下才会导致这个报错的?
> 谢谢!!
>
>


??????Key group is not in KeyGroupRange

2022-09-09 文章 ????
??





piao289108...@vip.qq.com








----
??: 
   "user-zh"



Key group is not in KeyGroupRange

2022-09-09 文章 junjie.m...@goupwith.com
hi:
本人遇到了这个报错:
Key group 51 is not in KeyGroupRange{startKeyGroup=64, endKeyGroup=127}. Unless 
you're directly using low level state access APIs, this is most likely caused 
by non-deterministic shuffle key (hashCode and equals implementation).

这个报错出现的莫名其妙,想了解这个错误是在什么样的情况下才会导致这个报错的?
谢谢!!



??????Re: ????????????????????????

2022-09-09 文章 ????
??
 
https://www.cnblogs.com/zhikou/p/8537074.html#3954115





piao289108...@vip.qq.com








----
??: 
   "user-zh"



Re:回复:Re: 用什么工具展示实时报表?

2022-09-09 文章 郑垚(ZhengYao)
有相关的文章博客可以参考一下吗

















在 2022-09-02 09:46:17,"张洋" <289108...@qq.com.INVALID> 写道:
>实时报表要体现随时间变化的数据,所以这种复杂多变的数据,建议入库到prometheus,然后用grafana做数据源引入,利用sql查询出各种样式的实时数据,您可以设置1s刷新一次页面,更新数据展示结构
>
>
>
>
>张洋
>piao289108...@vip.qq.com
>
>
>
>
>
>
>
>
>--原始邮件--
>发件人:   
> "user-zh" 
>   
>发送时间:2022年9月2日(星期五) 上午9:28
>收件人:"user-zh"
>主题:Re:Re: 用什么工具展示实时报表?
>
>
>
>实时报表展示也可以通过flink + doris,doris计算结果使用mysql jdbc将doris的计算结果返回给前端展示
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>在 2022-08-17 10:22:05,"joey" - 目前使用较多开源BI工具:Superset、Redash、Metabase;
>- Kafka数据我们使用了Elasticsearch + Kibana 的方式进行探索。 
>
> 2022年8月16日 14:37,Kyle Zhang  
> Hi all,
> 
> 
>经过flink处理过的数据想要做成实时报表,现在业内都在用什么方案?是通过flink写入db,然后用永洪/帆软等定时刷新,还是flink写入kafka,有工具能读取kafka数据源展示?
> 
> Best
>