Re: [ANNOUNCE] Apache Flink 1.10.3 released

2021-01-28 文章 Yu Li
Thanks Xintong for being our release manager and everyone else who made the
release possible!

Best Regards,
Yu


On Fri, 29 Jan 2021 at 15:05, Xintong Song  wrote:

> The Apache Flink community is very happy to announce the release of Apache
> Flink 1.10.3, which is the third bugfix release for the Apache Flink 1.10
> series.
>
> Apache Flink® is an open-source stream processing framework for
> distributed, high-performing, always-available, and accurate data streaming
> applications.
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> Please check out the release blog post for an overview of the improvements
> for this bugfix release:
> https://flink.apache.org/news/2021/01/29/release-1.10.3.html
>
> The full release notes are available in Jira:
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12348668
>
> We would like to thank all contributors of the Apache Flink community who
> made this release possible!
>
> Regards,
> Xintong Song
>


退订

2021-01-28 文章 追梦的废柴



[ANNOUNCE] Apache Flink 1.10.3 released

2021-01-28 文章 Xintong Song
The Apache Flink community is very happy to announce the release of Apache
Flink 1.10.3, which is the third bugfix release for the Apache Flink 1.10
series.

Apache Flink® is an open-source stream processing framework for
distributed, high-performing, always-available, and accurate data streaming
applications.

The release is available for download at:
https://flink.apache.org/downloads.html

Please check out the release blog post for an overview of the improvements
for this bugfix release:
https://flink.apache.org/news/2021/01/29/release-1.10.3.html

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12348668

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Regards,
Xintong Song


Re: 提交job的命令,./bin/flink run-application -t yarn-application ... 和 ./bin/flink run -m yarn-cluster ...

2021-01-28 文章 lp
应该说是否:1.11和1.12这里这两种提交方式 是不是一样的,只不过命令有了变化?

官网中的摘录如下:

flink-1.11:
Run a single Flink job on YARN

Example:
./bin/flink run -m yarn-cluster ./examples/batch/WordCount.jar

--
flink-1.12:
Per-Job Cluster Mode

Example:
./bin/flink run -t yarn-per-job --detached
./examples/streaming/TopSpeedWindowing.jar



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: reduce函数的trigger问题

2021-01-28 文章 yang nick
窗口没有结束,所有的数据都还在的

xiaolail...@163.com  于2021年1月29日周五 上午11:27写道:

> 您好!最近刚开始学习flink,问一个关于trigger的问题:
>
> 如下的reduce操作:
> env.socketTextStream("localhost", )
> .flatMap(new Splitter())
> .keyBy(value -> value.f0)
> .window(TumblingEventTimeWindows.of(Time.seconds(15)))
> .reduce(new ReduceFunction>() {
> @Override
> public Tuple2 reduce(Tuple2 Integer> value1, Tuple2 value2) throws Exception {
> return new Tuple2<>(value1.f0, value1.f1 +
> value2.f1);
> }
> });
>
> 使用的trigger是:
> @Override
> public Trigger
> getDefaultTrigger(StreamExecutionEnvironment env) {
> return EventTimeTrigger.create();
> }
>
>
> 然后的EventTimeTrigger实现是当watermark漫过当前window之后才触发的,我的疑问是reduce函数不是增量做计算的吗?如果等到watermark漫过当前window之后才触发计算,那岂不是要缓着所有的记录?
> 多谢指导!
>
>
>
>
>
> xiaolail...@163.com
>


回复:关于Flink作业的负载监控 task-load指标

2021-01-28 文章 13051111332
滴滴文章描述如下:
我们一直希望能精确衡量任务的负载状况,使用反压指标指标只能粗略的判断任务的资源够或者不够。
结合新版的 Mailbox 线程模型,所有互斥操作全部运行在 TaskThread 中,只需统计出线程的占用时间,就可以精确计算任务负载的百分比。
未来可以使用指标进行任务的资源推荐,让任务负载维持在一个比较健康的水平
在2021年01月29日 11:59,1305332<1305...@163.com> 写道:


Hi,everyone:
看到滴滴的一篇文章中指出task-load指标,这个线程的占用时间 具体该怎么统计呢?求大神指点

| |
1305332
|
|
1305...@163.com
|
签名由网易邮箱大师定制



关于Flink作业的负载监控 task-load指标

2021-01-28 文章 13051111332


Hi,everyone:
滴滴的一篇文档中提到:


 "我们一直希望能精确衡量任务的负载状况,使用反压指标指标只能粗略的判断任务的资源够或者不够。结合新版的 
Mailbox 线程模型,所有互斥操作全部运行在 TaskThread 中,只需统计出线程的占用时间,就可以精确计算任务负载的百分比。
未来可以使用指标进行任务的资源推荐,让任务负载维持在一个比较健康的水平”
  关于统计出线程的占用时间,这个具体该怎么做呢?




Re: kafka 所有分区无数据的情况下,导致watermark无法前进

2021-01-28 文章 赵一旦
所有分区无数据,为什么还期望watermark推进呢?目的是啥。貌似没啥需要计算的呀。

LakeShen  于2021年1月28日周四 下午7:42写道:

> 如果是窗口类聚合,可以尝试一下自定义窗口 Trigger
>
> Best,
> LakeShen
>
> 林影  于2021年1月28日周四 下午5:46写道:
>
> > Hi, Jessica.J.Wang
> > 开源flink看起来没这个功能哈,文档翻了一遍没找到
> >
> > Jessica.J.Wang  于2021年1月28日周四 下午5:25写道:
> >
> > > 你使用的是什么窗口呢,是 Tumble或者Hop吗,如果没数据 但是想提前输出结果可以用 emit
> > >
> > >
> >
> https://help.aliyun.com/document_detail/98951.html?spm=5176.11065259.1996646101.searchclickresult.513d1037M5VADa
> > >
> > >
> > >
> > > --
> > > Sent from: http://apache-flink.147419.n8.nabble.com/
> > >
> >
>


提交job的命令,./bin/flink run-application -t yarn-application ... 和 ./bin/flink run -m yarn-cluster ...

2021-01-28 文章 lp
如题,在 ./flink --help中看到提交job的命令有两个相似的,貌似都会将jobManager发送yarn
node上去之行,但不明白他们区别,官网也未找到他们的区别,请帮忙解释下他们之间的区别?



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: reduce函数的trigger问题

2021-01-28 文章 赵一旦
此处的trigger你可以理解成是计算结果的输出时机,你的reduce计算还是增量的哈。

xiaolail...@163.com  于2021年1月29日周五 上午11:27写道:

> 您好!最近刚开始学习flink,问一个关于trigger的问题:
>
> 如下的reduce操作:
> env.socketTextStream("localhost", )
> .flatMap(new Splitter())
> .keyBy(value -> value.f0)
> .window(TumblingEventTimeWindows.of(Time.seconds(15)))
> .reduce(new ReduceFunction>() {
> @Override
> public Tuple2 reduce(Tuple2 Integer> value1, Tuple2 value2) throws Exception {
> return new Tuple2<>(value1.f0, value1.f1 +
> value2.f1);
> }
> });
>
> 使用的trigger是:
> @Override
> public Trigger
> getDefaultTrigger(StreamExecutionEnvironment env) {
> return EventTimeTrigger.create();
> }
>
>
> 然后的EventTimeTrigger实现是当watermark漫过当前window之后才触发的,我的疑问是reduce函数不是增量做计算的吗?如果等到watermark漫过当前window之后才触发计算,那岂不是要缓着所有的记录?
> 多谢指导!
>
>
>
>
>
> xiaolail...@163.com
>


Re: 为什么window的reduceFunction不支持RichFunction呢

2021-01-28 文章 赵一旦
不是,flink是提供了richReduce,但不支持基于window的richReduce。
基于window的reduce只支持简单reduce,具体需要做自定义状态计算的,按照注释要求使用windowFunction和ProcessWindowFunction去做呀。

一直都是这样,1.12也是的哈。

Kezhu Wang  于2021年1月29日周五 上午11:40写道:

> reduceFunction 是和 ReducingStateDescriptor 配合的, 并不是
> “window的”。“WindowOperator” 的 function 是 “InternalWindowFunction”,这个可以是
> “RichFunction”。
>
> Flink 中的 state 除了可以绑定 key,也可以绑定 namespace,只是 namespace
> 并没有直接暴露给用户。如果需要的话,可以自己写个 WindowOperator,暴露个 WindowFunction。
>
> Interface WindowFunction {
> // You could do incremental aggregation here.
> void processElement(Context context, Window window, Element element);
>
> void fireWindow(Context context, Window window);
> }
>
> interface WindowedRuntimeContext {
>  State getWindowedState(StateDescriptor descriptor).
> }
>
> 把所有 window concepts 交给 WindowOperator,WindowFunction 作具体业务。
>
> On January 28, 2021 at 20:26:47, 赵一旦 (hinobl...@gmail.com) wrote:
>
> 问题如title,当然当前Flink的API还是可以实现RichReduceFunction的效果的,就是基于windowFunction或者processWindowFuntion。
>
> 但是,windowFunc和reduceFunc最大的区别在于windowFunc是窗口触发操作,而reduce是实时的增量操作。
> 如果说我的窗口计算对于每个record的到来都需要一个极复杂的操作,我更希望在reduce中完成,而不是windowFunc中完成,因为那样会导致整个窗口所有key的数据几乎在同一时刻触发,这回导致压力变高。
>
>
>


关于Flink作业的负载监控 task-load指标

2021-01-28 文章 13051111332


Hi,everyone:
看到滴滴的一篇文章中指出task-load指标,这个线程的占用时间 具体该怎么统计呢?求大神指点

| |
1305332
|
|
1305...@163.com
|
签名由网易邮箱大师定制



Hi??

2021-01-28 文章 Ayesha Johnson
Hi dear friend. how are you doing today and how is business moving? i
contact to buy from your company
kindly send me your latest catalog. also inform me about the
1)Minimum Order Quantity,
2)Delivery time or FOB,
3) payment terms warranty.
Please contact us via email: Your early reply is highly appreciated.
__

Mrs Ayesha Johnson

Sales & Marketing Manager.

National  Emirates Company Limited.

2nd Floor, Office # 205, Al Khabaisi Street, Deira, Dubai
Landmark: Behind Hyundai Showroom

P.O.Box: 97349,
Dubai
Tel: +971 4 7664822

Fax: +971 4 766524.


Re: 为什么window的reduceFunction不支持RichFunction呢

2021-01-28 文章 Kezhu Wang
reduceFunction 是和 ReducingStateDescriptor 配合的, 并不是
“window的”。“WindowOperator” 的 function 是 “InternalWindowFunction”,这个可以是
“RichFunction”。

Flink 中的 state 除了可以绑定 key,也可以绑定 namespace,只是 namespace
并没有直接暴露给用户。如果需要的话,可以自己写个 WindowOperator,暴露个 WindowFunction。

Interface WindowFunction {
// You could do incremental aggregation here.
void processElement(Context context, Window window, Element element);

void fireWindow(Context context, Window window);
}

interface WindowedRuntimeContext {
 State getWindowedState(StateDescriptor descriptor).
}

把所有 window concepts 交给 WindowOperator,WindowFunction 作具体业务。

On January 28, 2021 at 20:26:47, 赵一旦 (hinobl...@gmail.com) wrote:

问题如title,当然当前Flink的API还是可以实现RichReduceFunction的效果的,就是基于windowFunction或者processWindowFuntion。

但是,windowFunc和reduceFunc最大的区别在于windowFunc是窗口触发操作,而reduce是实时的增量操作。
如果说我的窗口计算对于每个record的到来都需要一个极复杂的操作,我更希望在reduce中完成,而不是windowFunc中完成,因为那样会导致整个窗口所有key的数据几乎在同一时刻触发,这回导致压力变高。


Re: 为什么window的reduceFunction不支持RichFunction呢

2021-01-28 文章 Smile
Hi, nobleyd,

请问你是在哪个版本发现 reduceFunction 不支持 RichFunction 呢?
我在1.12 版本试了一下是支持的呀,而且 JavaDoc 里也有 RichReduceFunction 类[1]

[1].
https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/java/org/apache/flink/api/common/functions/RichReduceFunction.html





--
Sent from: http://apache-flink.147419.n8.nabble.com/


reduce函数的trigger问题

2021-01-28 文章 xiaolail...@163.com
您好!最近刚开始学习flink,问一个关于trigger的问题:

如下的reduce操作:
env.socketTextStream("localhost", )
.flatMap(new Splitter())
.keyBy(value -> value.f0)
.window(TumblingEventTimeWindows.of(Time.seconds(15)))
.reduce(new ReduceFunction>() {
@Override
public Tuple2 reduce(Tuple2 value1, Tuple2 value2) throws Exception {
return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
}
});

使用的trigger是:
@Override
public Trigger getDefaultTrigger(StreamExecutionEnvironment 
env) {
return EventTimeTrigger.create();
}

然后的EventTimeTrigger实现是当watermark漫过当前window之后才触发的,我的疑问是reduce函数不是增量做计算的吗?如果等到watermark漫过当前window之后才触发计算,那岂不是要缓着所有的记录?
多谢指导!





xiaolail...@163.com


Re: 对于 Flink 可撤回流做聚合,当上游数据撤回时,最终聚合值变小了,怎么解决这种问题。

2021-01-28 文章 Jessica.J.Wang
要看一下具体的Sql 或者具体的算子

下游的 sink needRetract=false的情况,有些场景可以抑制上游算子的回撤,Retract 可以优化成 Upsert



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: 使用DataStream API + HBaseRowInputFormat读取HBase时表时,Row arity of from (2) does not match this serializers field length (1)

2021-01-28 文章 Jessica.J.Wang
可以参照一下 HBaseTableSource 里面的实现方法

HBaseTableSchema hbaseSchema = new HBaseTableSchema();
hbaseSchema.addColumn(xxx)
hbaseSchema.setRowKey(xxx);


execEnv.createInput(new HBaseRowInputFormat(conf, tableName, hbaseSchema),
getReturnType())
.name(explainSource());



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re:对于 Flink 可撤回流做聚合,当上游数据撤回时,最终聚合值变小了,怎么解决这种问题。

2021-01-28 文章 Michael Ran
以前做过,自定义sink,更新值小于 存储值的时候不更新
在 2021-01-25 16:00:28,"LakeShen"  写道:
>Hi 社区,
>
>之前遇到一个问题,在 Flink 里面,对于一个数据流(能够撤回的)进行聚合时,当这个数据流有大量撤回时,最终聚合值会变小。这种情况之前有看到说加一个
>mini batch 来减轻这种情况的影响,还有别的方法能够解决这个问题吗?
>
>Best,
>LakeShen


Re: 怎么理解 tolerableCheckpointFailureNumber

2021-01-28 文章 Yun Tang
Hi,

tolerableCheckpointFailureNumber 限制的是最大可容忍的连续失败checkpoint计数 
continuousFailureCounter [1],例如将tolerableCheckpointFailureNumber 
设置成3,连续失败3次,continuousFailureCounter 会累计到3,作业就会尝试重启。
如果中间有一个checkpoint成功了,continuousFailureCounter 就会重置为零 [2]。

checkpoint失败后,如果作业没有发生failover,下一次checkpoint还是周期性的触发,并受 
execution.checkpointing.min-pause [3] 等参数的影响。


[1] 
https://github.com/apache/flink/blob/4f5747fa0f7226c780742a4549408a38bc95d052/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java#L51
[2] 
https://github.com/apache/flink/blob/4f5747fa0f7226c780742a4549408a38bc95d052/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java#L161-L171
[3] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html#execution-checkpointing-min-pause

祝好
唐云


From: jiangjiguang719 
Sent: Friday, January 29, 2021 9:35
To: user-zh@flink.apache.org 
Subject: 怎么理解 tolerableCheckpointFailureNumber

tolerableCheckpointFailureNumber 是设置可容忍的checkpoint失败次数,具体怎么理解呢?比如 设置为3
1. 当checkpoint 失败时,该值+1,直到 大于 3,实时作业就发生失败或重启?
2. 当checkpoint 失败时,是立即进行下个checkpoint?还是根据周期设置自动触发?
3. 该值是累加值吗


怎么理解 tolerableCheckpointFailureNumber

2021-01-28 文章 jiangjiguang719
tolerableCheckpointFailureNumber 是设置可容忍的checkpoint失败次数,具体怎么理解呢?比如 设置为3
1. 当checkpoint 失败时,该值+1,直到 大于 3,实时作业就发生失败或重启?
2. 当checkpoint 失败时,是立即进行下个checkpoint?还是根据周期设置自动触发?
3. 该值是累加值吗

Re: key group from xx to yy does not contain zz异常

2021-01-28 文章 restart
感谢老师解答,keyBy的执行逻辑看来我理解的太肤浅了。随机数生成逻辑在keyBy前通过map赋值到具体字段,保证后续keyby时稳定,应该就对了。再次感谢老师指点迷津。



--
Sent from: http://apache-flink.147419.n8.nabble.com/

使用DataStream API + HBaseRowInputFormat读取HBase时表时,Row arity of from (2) does not match this serializers field length (1)

2021-01-28 文章 automths
Hi:
您好,我在使用DataStream API 
读取HBase表时,使用了HBaseRowInputFormat,并根据HBaseTableSchema了schema,代码如下:


val env = StreamExecutionEnvironment.getExecutionEnvironment
val hbaseTableSchema = TableSchema.builder()
  .add(TableColumn.of("id", DataTypes.STRING()))
  .add(TableColumn.of("f1", DataTypes.ROW(DataTypes.FIELD("value", 
DataTypes.STRING()
  .build()
val schema = HBaseTableSchema.fromTableSchema(hbaseTableSchema)


val ds: DataStream[Row] = env.createInput(new HBaseRowInputFormat(
  hbaseConfig(),
  tabelName,
  schema
))
ds.print()
env.execute(this.getClass.getSimpleName)
运行时报了如下错误:

 java.lang.RuntimeException: Row arity of from (2) does not match this 
serializers field length (1).
at 
org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:113)
at 
org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:58)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:715)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
at 
org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:93)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)




找到了原因是HBaseRowInputFormat源码中:
@Override
public TypeInformation getProducedType() {
// split the fieldNames
String[] famNames = schema.getFamilyNames();
TypeInformation[] typeInfos = new TypeInformation[famNames.length];
int i = 0;
for (String family : famNames) {
typeInfos[i] = new RowTypeInfo(
schema.getQualifierTypes(family),
schema.getQualifierNames(family));
i++;
}
return new RowTypeInfo(typeInfos, famNames);
}
此处在构建TypeInformation时,没有加入rowkey的类型


所以这是一个bug吗?




祝好!
automths





为什么window的reduceFunction不支持RichFunction呢

2021-01-28 文章 赵一旦
问题如title,当然当前Flink的API还是可以实现RichReduceFunction的效果的,就是基于windowFunction或者processWindowFuntion。
但是,windowFunc和reduceFunc最大的区别在于windowFunc是窗口触发操作,而reduce是实时的增量操作。
如果说我的窗口计算对于每个record的到来都需要一个极复杂的操作,我更希望在reduce中完成,而不是windowFunc中完成,因为那样会导致整个窗口所有key的数据几乎在同一时刻触发,这回导致压力变高。


Re: key group from xx to yy does not contain zz异常

2021-01-28 文章 Yun Tang
Hi,

原因是你的key selector引入了随机变量 (也就是下面的方法keyBy),导致其select出来的key不是固定的

public KeySelector keyBy(int parallelism) {
return value -> 
Joiner.on(SeparatorConstant.BAR).join(value.getMetricsId(), 
ThreadLocalRandom.current().nextInt(parallelism));
}

例如原先的key selector选出的key是 key-A,经过取模得到的key group是44,理应将该record发送给下游key 
group包含44的task,但是相关record进入到对应group的task之后,在加入到timer队列的时候,还会再次进行group的计算,由于你的key
 selector有随机性,导致这次选出的key可能是key-B,而针对key-B的取模运算得到的key group是4,也就有可能不在你的task (key 
group 44-45) 中了,导致了最终的异常。

祝好
唐云

From: restart 
Sent: Thursday, January 28, 2021 17:54
To: user-zh@flink.apache.org 
Subject: key group from xx to yy does not contain zz异常

线上部署flink项目时,启动爆如下错误,在测试环境可正常启动,job依赖的flink版本是1.10,flink
集群版本是1.12-SNAPSHOT,与线上一致。有点摸不着头脑,麻烦各位老师帮分析分析
堆栈信息:
java.lang.IllegalArgumentException: key group from 44 to 45 does not contain
4
at
org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:161)
at
org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.globalKeyGroupToLocalIndex(KeyGroupPartitionedPriorityQueue.java:187)
at
org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.computeKeyGroupIndex(KeyGroupPartitionedPriorityQueue.java:182)
at
org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.getKeyGroupSubHeapForElement(KeyGroupPartitionedPriorityQueue.java:176)
at
org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.add(KeyGroupPartitionedPriorityQueue.java:112)
at
org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.registerEventTimeTimer(InternalTimerServiceImpl.java:217)
at
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.registerEventTimeTimer(WindowOperator.java:884)
at
org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger.onElement(EventTimeTrigger.java:42)
at
org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger.onElement(EventTimeTrigger.java:30)
at
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.onElement(WindowOperator.java:898)
at
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:399)
at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161)
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:567)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.lang.Thread.run(Thread.java:748)

代码逻辑大致:
DataStream stream = dataStream
.keyBy(keyBy(globalParallelism))
.window(window(downsampling))
.reduce(reduce(trackerType), processWindow(trackerType),
TypeInformation.of(Metrics.class))
.keyBy(secondKeyBy())

.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
.reduce(reduce(trackerType),
processSecondWindow(trackerType), TypeInformation.of(Metrics.class))
.rebalance()
.addSink(sink())
.setParallelism(globalParallelism/2);

public KeySelector keyBy(int parallelism) {
return value ->
Joiner.on(SeparatorConstant.BAR).join(value.getMetricsId(),ThreadLocalRandom.current().nextInt(parallelism));
}

public KeySelector secondKeyBy() {
return value ->
Joiner.on(SeparatorConstant.BAR).join(value.getMetricsId(),
value.getWindowEnd());
}
备注:二次keyby的原因是为了解决数据倾斜问题,第一个keyby用来基于EventTime的翻滚窗口,第二个keyby使用了基于processTime的session窗口



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re:Re: flink-1.12 通过-t指定模式后无法指定yarn参数

2021-01-28 文章 Yapor
好的 感谢!
在 2021-01-28 15:52:36,"silence"  写道:
>flink1.12后所有的yarn相关的参数通过-D进行指定
>例:-D yarn.application.name=xxx 替代以前的-ynm xxx
>更多配置参考文档https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html#yarn
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/


Re: kafka 所有分区无数据的情况下,导致watermark无法前进

2021-01-28 文章 LakeShen
如果是窗口类聚合,可以尝试一下自定义窗口 Trigger

Best,
LakeShen

林影  于2021年1月28日周四 下午5:46写道:

> Hi, Jessica.J.Wang
> 开源flink看起来没这个功能哈,文档翻了一遍没找到
>
> Jessica.J.Wang  于2021年1月28日周四 下午5:25写道:
>
> > 你使用的是什么窗口呢,是 Tumble或者Hop吗,如果没数据 但是想提前输出结果可以用 emit
> >
> >
> https://help.aliyun.com/document_detail/98951.html?spm=5176.11065259.1996646101.searchclickresult.513d1037M5VADa
> >
> >
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/
> >
>


关于flink-shaded-xxx的问题

2021-01-28 文章 赵一旦
如题,我想知道flink shade了多个包,比如jackson,guava等。
其目的是(1)flink用到这些,为了避免冲突所以shade。还是(2)flink推荐用户直接使用flink shade好的这些包?

如上,我想知道是否“推荐”用户直接使用flink
shade的这些包。还是我们自己去依赖自己的包,比如我当前就用到了jackson,以及guava(我直接用了最新的30-jre的版本)。


Re: Flink1.12 批处理模式,分词统计时单词个数为1的单词不会被打印

2021-01-28 文章 Xintong Song
你用的应该是 1.12.0 版本吧。这是一个已知问题 [1],升级到 1.12.1 有修复。

Thank you~

Xintong Song


[1] https://issues.apache.org/jira/browse/FLINK-20764

On Thu, Jan 28, 2021 at 4:55 PM xhyan0427 <15527609...@163.com> wrote:

> 代码:
> val env = StreamExecutionEnvironment.getExecutionEnvironment
>
> env.setRuntimeMode(RuntimeExecutionMode.BATCH)  // 在DataStream
> API上以批处理方式执行
>
> // 本地测试文件
> val inputStream =
> env.readTextFile(getClass.getResource("/hello.txt").getPath)
>
> // 分词统计,问题:批处理模式的时候,sum 为 1 的单词不会被打印
> val resultStream = inputStream
>   .flatMap(_.split(","))
>   .filter(_.nonEmpty)
>   .map((_, 1))
>   .keyBy(_._1)
>   .sum(1)
> resultStream.print()
> env.execute("word count")
>
> 测试文件的数据内容:
> hello,flink
> hello,flink
> hello,hive
> hello,hive
> hello,hbase
> hello,hbase
> hello,scala
> hello,kafka
> hello,kafka
>
>
> 测试结果:hello/flink/hive/hbase/kafka的和大于1,会打印出来;但是 scala的个数为1,不会被打印出来
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


key group from xx to yy does not contain zz异常

2021-01-28 文章 restart
线上部署flink项目时,启动爆如下错误,在测试环境可正常启动,job依赖的flink版本是1.10,flink
集群版本是1.12-SNAPSHOT,与线上一致。有点摸不着头脑,麻烦各位老师帮分析分析
堆栈信息:
java.lang.IllegalArgumentException: key group from 44 to 45 does not contain
4
at
org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:161)
at
org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.globalKeyGroupToLocalIndex(KeyGroupPartitionedPriorityQueue.java:187)
at
org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.computeKeyGroupIndex(KeyGroupPartitionedPriorityQueue.java:182)
at
org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.getKeyGroupSubHeapForElement(KeyGroupPartitionedPriorityQueue.java:176)
at
org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.add(KeyGroupPartitionedPriorityQueue.java:112)
at
org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.registerEventTimeTimer(InternalTimerServiceImpl.java:217)
at
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.registerEventTimeTimer(WindowOperator.java:884)
at
org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger.onElement(EventTimeTrigger.java:42)
at
org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger.onElement(EventTimeTrigger.java:30)
at
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.onElement(WindowOperator.java:898)
at
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:399)
at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161)
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:567)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.lang.Thread.run(Thread.java:748)

代码逻辑大致:
DataStream stream = dataStream
.keyBy(keyBy(globalParallelism))
.window(window(downsampling))
.reduce(reduce(trackerType), processWindow(trackerType),
TypeInformation.of(Metrics.class))
.keyBy(secondKeyBy())
   
.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
.reduce(reduce(trackerType),
processSecondWindow(trackerType), TypeInformation.of(Metrics.class))
.rebalance()
.addSink(sink())
.setParallelism(globalParallelism/2);

public KeySelector keyBy(int parallelism) {
return value ->
Joiner.on(SeparatorConstant.BAR).join(value.getMetricsId(),ThreadLocalRandom.current().nextInt(parallelism));
}

public KeySelector secondKeyBy() {
return value ->
Joiner.on(SeparatorConstant.BAR).join(value.getMetricsId(),
value.getWindowEnd());
}
备注:二次keyby的原因是为了解决数据倾斜问题,第一个keyby用来基于EventTime的翻滚窗口,第二个keyby使用了基于processTime的session窗口



--
Sent from: http://apache-flink.147419.n8.nabble.com/


回复: 关于端到端的延迟监控

2021-01-28 文章 13051111332


谢谢大家,清楚了


| |
1305332
|
|
1305...@163.com
|
签名由网易邮箱大师定制


在2021年01月28日 17:56,Jessica.J.Wang 写道:
官方的LatencyMarker 表示的是数据的流通性, 他和数据是在同一个pipeline 中顺序处理的,如果你的算子都是同步的情况
是可以反应出数据的真实处理延迟,生产上是可以使用的,但是 延迟粒度 metrics.latency.granularity 最好调整成 single或者
operator ,防止latency上报太多 压垮服务


但当你的算子是个异步 用AsyncWaitOperator实现的话,因为latencyMarker并没有像watermark一样
addToWorkQueue,直接处理上报metrics,所以延迟信息就不准确了

所以自己做端到端延迟的话,可以flink sql source 层 抽取其event time时间往下游发送,insert into sink的时候
,写一个udf (currenttime-eventime) 计算其延迟时间,写到外部数据库中,sink最好是influxdb之类的,方便统计






--
Sent from: http://apache-flink.147419.n8.nabble.com/

检查点成功,但从检查点恢复失败。使用了guava的bloomFilter,有人帮忙分析下吗?

2021-01-28 文章 赵一旦
如下,我使用到了Guava的BloomFilter,貌似是基于kyro序列化的。检查点成功了,但是基于检查点恢复任务是失败的。
报错堆栈如下,关键错误是什么无法访问public修饰的成员?

java.lang.Exception: Exception while creating StreamOperatorStateContext.
at org.apache.flink.streaming.api.operators.
StreamTaskStateInitializerImpl.streamOperatorStateContext(
StreamTaskStateInitializerImpl.java:235)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator
.initializeState(AbstractStreamOperator.java:248)
at org.apache.flink.streaming.runtime.tasks.OperatorChain
.initializeStateAndOpenOperators(OperatorChain.java:400)
at org.apache.flink.streaming.runtime.tasks.StreamTask
.lambda$beforeInvoke$2(StreamTask.java:507)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1
.runThrowing(StreamTaskActionExecutor.java:47)
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(
StreamTask.java:501)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask
.java:531)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed
state backend for KeyedProcessOperator_efc74365c4197a3ac1978016263fc7e7_(5/
30) from any of the 1 provided restore options.
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure
.createAndRestore(BackendRestorerProcedure.java:135)
at org.apache.flink.streaming.api.operators.
StreamTaskStateInitializerImpl.keyedStatedBackend(
StreamTaskStateInitializerImpl.java:316)
at org.apache.flink.streaming.api.operators.
StreamTaskStateInitializerImpl.streamOperatorStateContext(
StreamTaskStateInitializerImpl.java:155)
... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed
when trying to restore heap backend
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder
.build(HeapKeyedStateBackendBuilder.java:116)
at org.apache.flink.runtime.state.filesystem.FsStateBackend
.createKeyedStateBackend(FsStateBackend.java:540)
at org.apache.flink.runtime.state.filesystem.FsStateBackend
.createKeyedStateBackend(FsStateBackend.java:100)
at org.apache.flink.runtime.state.StateBackend.createKeyedStateBackend(
StateBackend.java:178)
at org.apache.flink.streaming.api.operators.
StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(
StreamTaskStateInitializerImpl.java:299)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure
.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure
.createAndRestore(BackendRestorerProcedure.java:121)
... 11 more
Caused by: com.esotericsoftware.kryo.KryoException: Error constructing
instance of class: com.google.common.hash.LongAdder
Serialization trace:
bitCount (com.google.common.hash.BloomFilterStrategies$LockFreeBitArray)
bits (com.google.common.hash.BloomFilter)
at com.twitter.chill.Instantiators$$anon$1.newInstance(KryoBase.scala:
136)
at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1061)
at com.esotericsoftware.kryo.serializers.FieldSerializer.create(
FieldSerializer.java:547)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(
FieldSerializer.java:523)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField
.java:106)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(
FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:730)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField
.java:113)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(
FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at com.esotericsoftware.kryo.serializers.MapSerializer.read(
MapSerializer.java:143)
at com.esotericsoftware.kryo.serializers.MapSerializer.read(
MapSerializer.java:21)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
.deserialize(KryoSerializer.java:346)
at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders
.lambda$createV2PlusReader$0(StateTableByKeyGroupReaders.java:77)
at org.apache.flink.runtime.state.
KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(
KeyGroupPartitioner.java:297)
at org.apache.flink.runtime.state.heap.HeapRestoreOperation
.readKeyGroupStateData(HeapRestoreOperation.java:299)
at org.apache.flink.runtime.state.heap.HeapRestoreOperation
.readStateHandleStateData(HeapRestoreOperation.java:260)
at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(
HeapRestoreOperation.java:160)
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder

Re: 关于端到端的延迟监控

2021-01-28 文章 Jessica.J.Wang
官方的LatencyMarker 表示的是数据的流通性, 他和数据是在同一个pipeline 中顺序处理的,如果你的算子都是同步的情况
是可以反应出数据的真实处理延迟,生产上是可以使用的,但是 延迟粒度 metrics.latency.granularity 最好调整成 single或者
operator ,防止latency上报太多 压垮服务


但当你的算子是个异步 用AsyncWaitOperator实现的话,因为latencyMarker并没有像watermark一样
addToWorkQueue,直接处理上报metrics,所以延迟信息就不准确了

所以自己做端到端延迟的话,可以flink sql source 层 抽取其event time时间往下游发送,insert into sink的时候
,写一个udf (currenttime-eventime) 计算其延迟时间,写到外部数据库中,sink最好是influxdb之类的,方便统计






--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: kafka 所有分区无数据的情况下,导致watermark无法前进

2021-01-28 文章 Jessica.J.Wang
你使用的是什么窗口呢,是 Tumble或者Hop吗,如果没数据 但是想提前输出结果可以用 emit
https://help.aliyun.com/document_detail/98951.html?spm=5176.11065259.1996646101.searchclickresult.513d1037M5VADa



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Flink1.12 批处理模式,分词统计时单词个数为1的单词不会被打印

2021-01-28 文章 xhyan0427
代码:
val env = StreamExecutionEnvironment.getExecutionEnvironment
  
env.setRuntimeMode(RuntimeExecutionMode.BATCH)  // 在DataStream
API上以批处理方式执行

// 本地测试文件
val inputStream =
env.readTextFile(getClass.getResource("/hello.txt").getPath)

// 分词统计,问题:批处理模式的时候,sum 为 1 的单词不会被打印
val resultStream = inputStream
  .flatMap(_.split(","))
  .filter(_.nonEmpty)
  .map((_, 1))
  .keyBy(_._1)
  .sum(1)
resultStream.print()
env.execute("word count")

测试文件的数据内容:
hello,flink
hello,flink
hello,hive
hello,hive
hello,hbase
hello,hbase
hello,scala
hello,kafka
hello,kafka


测试结果:hello/flink/hive/hbase/kafka的和大于1,会打印出来;但是 scala的个数为1,不会被打印出来



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 关于端到端的延迟监控

2021-01-28 文章 zelin jin
每一条records处理过程中透传开始时间,在sink算子通过metrics上报opentsdb、Prometheus

等时间序列数据库,最后通过grafana等可视化工具展示。

wpp <1215303...@qq.com> 于2021年1月28日周四 下午2:53写道:

> 这个延迟,只是给一个参考意义吧,
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: kafka 所有分区无数据的情况下,导致watermark无法前进

2021-01-28 文章 林影
Hi,刘小红:
这个我查阅过,实验过,通过idle source 是无法解决的。
可以查看这个链接:http://apache-flink.147419.n8.nabble.com/Flink-SQL-td4535.html

在我的这个场景里面,上游已经配置了idle source,上游如果突然没有数据了,下游的flink 窗口还是无法关闭输出结果。
这个说明idle source 无法解决这个问题

刘小红 <18500348...@163.com> 于2021年1月28日周四 下午3:01写道:

> 可以调用WatermarkStrategy.withIdleness(Duration idleTimeout)
> 指定空闲超时时间,这样不会影响水印的进度,进而影响下游算子操作
>
>
> | |
> 刘小红
> |
> |
> 18500348...@163.com
> |
> 签名由网易邮箱大师定制
> 在2021年1月28日 14:42,wpp<1215303...@qq.com> 写道:
> 可以按照proceeTime来处理吧
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>