?????? ??????????????????????flink state

2021-01-22 文章 ??????
TTL??keybykey1state??




----
??: ""https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows
 
  news_...@163.com 

求一个简单的示例,flink写orc格式文件,对于Map复杂类型的写法。

2021-01-22 文章 赵一旦
目前通过自定义OrcBulkWriterFactory方式,拿到一个一个的ColumnVector,然后设置值。
对于简单类型,API看起来很清晰,但是Map类型没看懂怎么写。如下,对于serverTime是INT类型,直接设置vector[rowId]即可。那么对于MapColumnVector怎么设置呢,将多个key-value对写进去具体怎么写呢。

serverTimeColumnVector.vector[rowId] = ele.getTimestamp();

MapColumnVector dColumnVector = (MapColumnVector) batch.cols[2];


Re: Re: Re: Flink写hdfs提交任务就报错。Recoverable writers on Hadoop are only supported for HDFS

2021-01-22 文章 赵一旦
此外,写ORC格式文件,对于Map格式的有人知道怎么写的话给个示例吧。
如下,拿到MapColumnVector之后怎么写呢,目前非Map的简单字段都比较清晰,直接设置xxxColumnVector.vector[rowId]的值即可。但是MapColumnVector的API比较乱,没看懂怎么用。

MapColumnVector dColumnVector = (MapColumnVector) batch.cols[2];


赵一旦  于2021年1月23日周六 下午1:42写道:

> 已解决。覆盖了flink这部分源码去除了对非hdfs的schema限制。
>
> 张锴  于2021年1月21日周四 下午7:35写道:
>
>> @赵一旦
>> 另外,上次我还提了一个问题请教你,我试了你说的那个想法,但是好像有点问题,你可以看一下
>>
>> 张锴  于2021年1月21日周四 下午7:13写道:
>>
>> > 我用的flink 1.10版,FlieSink就是BucketingSink,我是用这个写hdfs的
>> >
>> > 赵一旦  于2021年1月21日周四 下午7:05写道:
>> >
>> >> @Michael Ran; 嗯嗯,没关系。
>> >>
>> >> @张锴 你说的是flink哪个版本的connector,stream or sql。我搜了下我的没有。我是1.12,stream。
>> >>
>> >>
>> 目前看文档有streamFileSink,还有FileSink,从文档内容来看使用方式差不多。我计划试一下FileSink,但不清楚FileSink和StreamFileSink啥区别,是否都能写hadoop类文件系统,因为涉及是否原子写,比较分布式文件系统不支持追加和编辑等。
>> >>
>> >> Michael Ran  于2021年1月21日周四 下午7:01写道:
>> >>
>> >> >
>> >> >
>> >>
>> 很抱歉,我已经很久没用过这个了。但是可以根据异常信息以及API源码执行进行分析,确定是否能直接写入。如果你要写入自定义的文件系统,那么只能实现自己的sink方式。或者你的文件系统的写入方式兼容hdfs的上层API可以参考各个sink端的写法
>> >> > 在 2021-01-21 18:45:06,"张锴"  写道:
>> >> > >import
>> >> org.apache.flink.streaming.connectors.fs.bucketing.{BucketingSink,
>> >> > >DateTimeBucketer}
>> >> > >
>> >> > >sink.setBucketer sink.setWriter用这种方式试试
>> >> > >
>> >> > >
>> >> > >
>> >> > >赵一旦  于2021年1月21日周四 下午6:37写道:
>> >> > >
>> >> > >> @Michael Ran
>> >> > >> 然后有什么解决方案吗,我这个是使用flink的streamFileSink方式写hdfs的时候出现的异常。
>> >> > >>
>> >> > >> Michael Ran  于2021年1月21日周四 下午5:23写道:
>> >> > >>
>> >> > >> > 这里应该是用了hdfs 的特定API吧,文件系统没兼容public
>> >> > >> > HadoopRecoverableWriter(org.apache.hadoop.fs.FileSystem fs)
>> {...}
>> >> > >> > 在 2021-01-21 17:18:23,"赵一旦"  写道:
>> >> > >> > >具体报错信息如下:
>> >> > >> > >
>> >> > >> > >java.lang.UnsupportedOperationException: Recoverable writers on
>> >> > Hadoop
>> >> > >> are
>> >> > >> > >only supported for HDFS
>> >> > >> > >at
>> >> > org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.(
>> >> > >> > >HadoopRecoverableWriter.java:61)
>> >> > >> > >at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem
>> >> > >> > >.createRecoverableWriter(HadoopFileSystem.java:210)
>> >> > >> > >at org.apache.flink.core.fs.SafetyNetWrapperFileSystem
>> >> > >> > >.createRecoverableWriter(SafetyNetWrapperFileSystem.java:69)
>> >> > >> > >at
>> org.apache.flink.streaming.api.functions.sink.filesystem.
>> >> > >> >
>> >> >
>> >StreamingFileSink$RowFormatBuilder.createBucketWriter(StreamingFileSink
>> >> > >> > >.java:260)
>> >> > >> > >at
>> org.apache.flink.streaming.api.functions.sink.filesystem.
>> >> > >> >
>> >> > >> >
>> >> > >>
>> >> >
>> >>
>> >StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:270)
>> >> > >> > >at
>> org.apache.flink.streaming.api.functions.sink.filesystem.
>> >> > >> > >StreamingFileSink.initializeState(StreamingFileSink.java:412)
>> >> > >> > >at
>> >> > org.apache.flink.streaming.util.functions.StreamingFunctionUtils
>> >> > >> > >.tryRestoreFunction(StreamingFunctionUtils.java:185)
>> >> > >> > >at
>> >> > org.apache.flink.streaming.util.functions.StreamingFunctionUtils
>> >> > >> > >.restoreFunctionState(StreamingFunctionUtils.java:167)
>> >> > >> > >at
>> >> > >> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator
>> >> > >> > >.initializeState(AbstractUdfStreamOperator.java:96)
>> >> > >> > >at
>> >> > >>
>> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler
>> >> > >> > >.initializeOperatorState(StreamOperatorStateHandler.java:107)
>> >> > >> > >at
>> >> > org.apache.flink.streaming.api.operators.AbstractStreamOperator
>> >> > >> > >.initializeState(AbstractStreamOperator.java:264)
>> >> > >> > >at org.apache.flink.streaming.runtime.tasks.OperatorChain
>> >> > >> > >.initializeStateAndOpenOperators(OperatorChain.java:400)
>> >> > >> > >at org.apache.flink.streaming.runtime.tasks.StreamTask
>> >> > >> > >.lambda$beforeInvoke$2(StreamTask.java:507)
>> >> > >> > >at
>> >> > >>
>> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1
>> >> > >> > >.runThrowing(StreamTaskActionExecutor.java:47)
>> >> > >> > >at
>> >> > org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(
>> >> > >> > >StreamTask.java:501)
>> >> > >> > >at
>> >> > >> >
>> >> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask
>> >> > >> > >.java:531)
>> >> > >> > >at
>> >> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>> >> > >> > >at
>> >> org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>> >> > >> > >at java.lang.Thread.run(Thread.java:748)
>> >> > >> > >
>> >> > >> > >
>> >> > >> > >赵一旦  于2021年1月21日周四 下午5:17写道:
>> >> > >> > >
>> >> > >> > >> Recoverable writers on Hadoop are only supported for HDFS
>> >> > >> > >>
>> >> > >> > >> 如上,我们用的hadoop协议的,但底层不是hdfs,是公司自研的分布式文件系统。
>> >> > >> > >>
>> >> > >> > >> 使用spark写,spark-sql读等都没问题。但是flink写和读当前都没尝试成功。
>> >> > >> > >>
>> >> > >> > >>
>> >> > >> > >>
>> >> > >> >
>> >> > >>
>> >> >
>> >>
>> >
>>
>


Re: Re: Re: Flink写hdfs提交任务就报错。Recoverable writers on Hadoop are only supported for HDFS

2021-01-22 文章 赵一旦
已解决。覆盖了flink这部分源码去除了对非hdfs的schema限制。

张锴  于2021年1月21日周四 下午7:35写道:

> @赵一旦
> 另外,上次我还提了一个问题请教你,我试了你说的那个想法,但是好像有点问题,你可以看一下
>
> 张锴  于2021年1月21日周四 下午7:13写道:
>
> > 我用的flink 1.10版,FlieSink就是BucketingSink,我是用这个写hdfs的
> >
> > 赵一旦  于2021年1月21日周四 下午7:05写道:
> >
> >> @Michael Ran; 嗯嗯,没关系。
> >>
> >> @张锴 你说的是flink哪个版本的connector,stream or sql。我搜了下我的没有。我是1.12,stream。
> >>
> >>
> 目前看文档有streamFileSink,还有FileSink,从文档内容来看使用方式差不多。我计划试一下FileSink,但不清楚FileSink和StreamFileSink啥区别,是否都能写hadoop类文件系统,因为涉及是否原子写,比较分布式文件系统不支持追加和编辑等。
> >>
> >> Michael Ran  于2021年1月21日周四 下午7:01写道:
> >>
> >> >
> >> >
> >>
> 很抱歉,我已经很久没用过这个了。但是可以根据异常信息以及API源码执行进行分析,确定是否能直接写入。如果你要写入自定义的文件系统,那么只能实现自己的sink方式。或者你的文件系统的写入方式兼容hdfs的上层API可以参考各个sink端的写法
> >> > 在 2021-01-21 18:45:06,"张锴"  写道:
> >> > >import
> >> org.apache.flink.streaming.connectors.fs.bucketing.{BucketingSink,
> >> > >DateTimeBucketer}
> >> > >
> >> > >sink.setBucketer sink.setWriter用这种方式试试
> >> > >
> >> > >
> >> > >
> >> > >赵一旦  于2021年1月21日周四 下午6:37写道:
> >> > >
> >> > >> @Michael Ran
> >> > >> 然后有什么解决方案吗,我这个是使用flink的streamFileSink方式写hdfs的时候出现的异常。
> >> > >>
> >> > >> Michael Ran  于2021年1月21日周四 下午5:23写道:
> >> > >>
> >> > >> > 这里应该是用了hdfs 的特定API吧,文件系统没兼容public
> >> > >> > HadoopRecoverableWriter(org.apache.hadoop.fs.FileSystem fs) {...}
> >> > >> > 在 2021-01-21 17:18:23,"赵一旦"  写道:
> >> > >> > >具体报错信息如下:
> >> > >> > >
> >> > >> > >java.lang.UnsupportedOperationException: Recoverable writers on
> >> > Hadoop
> >> > >> are
> >> > >> > >only supported for HDFS
> >> > >> > >at
> >> > org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.(
> >> > >> > >HadoopRecoverableWriter.java:61)
> >> > >> > >at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem
> >> > >> > >.createRecoverableWriter(HadoopFileSystem.java:210)
> >> > >> > >at org.apache.flink.core.fs.SafetyNetWrapperFileSystem
> >> > >> > >.createRecoverableWriter(SafetyNetWrapperFileSystem.java:69)
> >> > >> > >at org.apache.flink.streaming.api.functions.sink.filesystem.
> >> > >> >
> >> >
> >StreamingFileSink$RowFormatBuilder.createBucketWriter(StreamingFileSink
> >> > >> > >.java:260)
> >> > >> > >at org.apache.flink.streaming.api.functions.sink.filesystem.
> >> > >> >
> >> > >> >
> >> > >>
> >> >
> >>
> >StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:270)
> >> > >> > >at org.apache.flink.streaming.api.functions.sink.filesystem.
> >> > >> > >StreamingFileSink.initializeState(StreamingFileSink.java:412)
> >> > >> > >at
> >> > org.apache.flink.streaming.util.functions.StreamingFunctionUtils
> >> > >> > >.tryRestoreFunction(StreamingFunctionUtils.java:185)
> >> > >> > >at
> >> > org.apache.flink.streaming.util.functions.StreamingFunctionUtils
> >> > >> > >.restoreFunctionState(StreamingFunctionUtils.java:167)
> >> > >> > >at
> >> > >> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator
> >> > >> > >.initializeState(AbstractUdfStreamOperator.java:96)
> >> > >> > >at
> >> > >> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler
> >> > >> > >.initializeOperatorState(StreamOperatorStateHandler.java:107)
> >> > >> > >at
> >> > org.apache.flink.streaming.api.operators.AbstractStreamOperator
> >> > >> > >.initializeState(AbstractStreamOperator.java:264)
> >> > >> > >at org.apache.flink.streaming.runtime.tasks.OperatorChain
> >> > >> > >.initializeStateAndOpenOperators(OperatorChain.java:400)
> >> > >> > >at org.apache.flink.streaming.runtime.tasks.StreamTask
> >> > >> > >.lambda$beforeInvoke$2(StreamTask.java:507)
> >> > >> > >at
> >> > >> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1
> >> > >> > >.runThrowing(StreamTaskActionExecutor.java:47)
> >> > >> > >at
> >> > org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(
> >> > >> > >StreamTask.java:501)
> >> > >> > >at
> >> > >> >
> >> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask
> >> > >> > >.java:531)
> >> > >> > >at
> >> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
> >> > >> > >at
> >> org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
> >> > >> > >at java.lang.Thread.run(Thread.java:748)
> >> > >> > >
> >> > >> > >
> >> > >> > >赵一旦  于2021年1月21日周四 下午5:17写道:
> >> > >> > >
> >> > >> > >> Recoverable writers on Hadoop are only supported for HDFS
> >> > >> > >>
> >> > >> > >> 如上,我们用的hadoop协议的,但底层不是hdfs,是公司自研的分布式文件系统。
> >> > >> > >>
> >> > >> > >> 使用spark写,spark-sql读等都没问题。但是flink写和读当前都没尝试成功。
> >> > >> > >>
> >> > >> > >>
> >> > >> > >>
> >> > >> >
> >> > >>
> >> >
> >>
> >
>


Re: Flink的StreamFileSink和1.12提供的FileSink中,BucketsBuilder的createBucketWriter中仅支持recoverableWriter。

2021-01-22 文章 赵一旦
已解决。重改写了flink源码覆盖了这部分限制就可以了。

赵一旦  于2021年1月22日周五 上午10:17写道:

> 如题,为什么仅支持recoverableWriter,如果我使用的文件系统不支持怎么办呢,必须自定义sink吗?
>
>
> 我这边用的是一个公司自研的大型分布式文件系统,支持hadoop协议(但不清楚是否所有特性都支持),目前使用streamFileSink和FileSink貌似都无法正常写。
>
> 不清楚是否有其他问题,至少当前是卡住在这个recoverable上了。
>
> 报错是只有hdfs才支持recoverableWriter。
>
> 有人知道如何解吗?
>


?????? ????????????Orders????????????????join????????????????????

2021-01-22 文章 ??????
=_= ??thank you??



----
??: "yang nick"

Re: 公司强行拆分Orders主表,遇到事实表join请各位大神们支个招。

2021-01-22 文章 yang nick
我感觉你这个用实时很难做,涉及到状态更新的无限流,需要配置 state ttl

徐州州 <25977...@qq.com> 于2021年1月23日周六 上午11:23写道:

>
> 我遇到的难题是,拒收订单想拿到payAment字段必须扫描全量的order_money表。order_money是下单时候才会产生,我拒收订单根本不知道它的下单时间根本不知道怎么拿,而且order_money没有任何标记,我全量扫描money表程序OOM。我的数据是通过Canal监控过来的,我需要写flink-sql来进行join。
>
>
>
>
> --原始邮件--
> 发件人: "yang nick" 发送时间: 2021年1月23日(星期六) 中午11:16
> 收件人: "user-zh" 主题: Re: 公司强行拆分Orders主表,遇到事实表join请各位大神们支个招。
>
>
>
> 我觉的既然是做数仓,这里没啥好纠结的,数据已经在数仓里面了,做成宽表就行了
>
> 徐州州 <25977...@qq.com 于2021年1月23日周六 上午11:11写道:
>
> 
> 
> 我起初也有这个想法,但是他们业务告诉我,order_reject表是单独存在的。拒收单子只会在order_reject出现,而order_money只在它下单的时候写入,拒收订单数据过来order_money不会有任何变化,所以我即使拿到每天的拒收订单去join也是要读取全量order_money。
> 
> 
> 
> 
>  --nbsp;原始邮件nbsp;--
>  发件人: "yang nick"  发送时间: 2021年1月23日(星期六) 中午11:04
>  收件人: "user-zh"  主题: Re: 公司强行拆分Orders主表,遇到事实表join请各位大神们支个招。
> 
> 
> 
>  把拆开的表在join起来嘛
> 
>  徐州州 <25977...@qq.comgt; 于2021年1月23日周六 上午10:48写道:
> 
>  gt;
> 
> 请教各位大佬一个业务问题,(当天日报业务)版本1:我可以通过拒收时间直接拿到订单主表当天的拒收订单和payAmont金额,版本2:现在公司订单主表重做,拒收订单和payAmont被拆分出两张表,Order_reject和Order_money,现在我的问题是就算我可以从(拒收订单表)中通过时间拿到当天拒收订单,但是PayAmont无法通过时间在Order_money表拿到,PayAmont是在下单的时候才会生成,这样我当天拒收业务都要扫描全量的order_money表拿到对应的PayAmont金额字段,有什么好的处理方法吗
>  gt;
> 
> ?我目前的处理方法是强行读取Order_money的createTime-30day来,尽可能的让拒收订单找到对应的payAmont字段,不只是只拆分出一张order_money还有order_extend还有好几张表,我是做数仓的,我就没见过这么干的。强行把度量值字段拆分出来。请各位大神支个招。


Re: flink-sql-gateway支持远程吗

2021-01-22 文章 yang nick
可以试试zeppelin

罗显宴 <15927482...@163.com> 于2021年1月23日周六 上午11:19写道:

>
> 大佬们,我试过flinksql-client和flink-sql-gateway发现都是把代码提交到本地集群运行,好像不能提交到指定远程集群上执行任务,请问大佬们,怎么提交代码到远程集群运行
>
>
> | |
> 15927482803
> |
> |
> 邮箱:15927482...@163.com
> |
>
> 签名由 网易邮箱大师 定制


Re: 公司强行拆分Orders主表,遇到事实表join请各位大神们支个招。

2021-01-22 文章 yang nick
我觉的既然是做数仓,这里没啥好纠结的,数据已经在数仓里面了,做成宽表就行了

徐州州 <25977...@qq.com> 于2021年1月23日周六 上午11:11写道:

>
> 我起初也有这个想法,但是他们业务告诉我,order_reject表是单独存在的。拒收单子只会在order_reject出现,而order_money只在它下单的时候写入,拒收订单数据过来order_money不会有任何变化,所以我即使拿到每天的拒收订单去join也是要读取全量order_money。
>
>
>
>
> --原始邮件--
> 发件人: "yang nick" 发送时间: 2021年1月23日(星期六) 中午11:04
> 收件人: "user-zh" 主题: Re: 公司强行拆分Orders主表,遇到事实表join请各位大神们支个招。
>
>
>
> 把拆开的表在join起来嘛
>
> 徐州州 <25977...@qq.com 于2021年1月23日周六 上午10:48写道:
>
> 
> 请教各位大佬一个业务问题,(当天日报业务)版本1:我可以通过拒收时间直接拿到订单主表当天的拒收订单和payAmont金额,版本2:现在公司订单主表重做,拒收订单和payAmont被拆分出两张表,Order_reject和Order_money,现在我的问题是就算我可以从(拒收订单表)中通过时间拿到当天拒收订单,但是PayAmont无法通过时间在Order_money表拿到,PayAmont是在下单的时候才会生成,这样我当天拒收业务都要扫描全量的order_money表拿到对应的PayAmont金额字段,有什么好的处理方法吗
> 
> ?我目前的处理方法是强行读取Order_money的createTime-30day来,尽可能的让拒收订单找到对应的payAmont字段,不只是只拆分出一张order_money还有order_extend还有好几张表,我是做数仓的,我就没见过这么干的。强行把度量值字段拆分出来。请各位大神支个招。


?????? ????????????Orders????????????????join????????????????????

2021-01-22 文章 ??????
order_reject??order_rejectorder_money??order_money??join??order_money??




----
??: "yang nick"

回复:请教关于Flink yarnship的使用

2021-01-22 文章 叶贤勋
URL url = this.getClass().getClassLoader().getResource("conf”);

String dir = url.getFile();
dir目录下应该会包含ship的配置文件,你可以试下。
在2021年01月22日 15:38,Yan Tang 写道:
我把配置和jar包分开了,用-yt option将配置文件Ship到yarn cluster中,但是在获取配置文件的时候,老是拿不到,有人有相关经验么?
我的提交命令:
-yt /path/to/conf

code:
this.getClass().getResourceAsStream("conf/cmp_online.cfg")
但一直返回null.







--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 请教关于Flink yarnship的使用

2021-01-22 文章 silence
 你可以尝试同时指定-C "file:///path/to/conf/cmp_online.cfg" 以及 -yt /path/to/conf 来进行测试
然后代码里这么获取this.getClass().getResourceAsStream("cmp_online.cfg")



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 请教关于Flink yarnship的使用

2021-01-22 文章 Yan Tang
如果-yt 不适用我这种场景,真不知道这个option是做什么的了。在spark中我用的就是--files,可以达到我想要的效果。



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: 根据业务需求选择合适的flink state

2021-01-22 文章 张锴
@赵一旦
可以添加一下微信好友吗,具体的实践上还有点问题,我是在window后直接reduce(new myReduceFunc(),new
AssignWindowProcessFunc())自定义了这两个方法,但是效果还是有点问题,不知道我的写法是不是有问题

赵一旦  于2021年1月22日周五 上午10:10写道:

> 我理解你要的最终mysql结果表是:
> 直播间ID;用户ID;上线时间;下线时间;durationn=(下线时间 - 上线时间);
>
> 如果user1在直播间1,一天内出现10次,就出现10个记录,分别记录了每次的duration。
>
>
> 如上按照我的方案就可以实现哈。
>
> xuhaiLong  于2021年1月22日周五 上午10:03写道:
>
> > 可以试试这样,mysql 中 设置唯一键为窗口的 startTime 和
> > userId,然后对用户的每个窗口做停留时间的计算,最终会同一个用户在一天会产生多条记录,不过窗口的 startTime 不同,取值的时候sum
> 试试?
> >
> >
> > 在2021年1月21日 18:24,张锴 写道:
> > 你好,之前我用了你上诉的方法出现一个问题,我并没有用min/max,我是在procss方法里用的context.window.getStart和
> >
> >
> context.window.getEnd作为开始和结束时间的,感觉这样也能获得最大和最小值,但是出来的数据最长停留了4分多钟,我跑的离线任务停留的时长有几个小时的都有,感觉出来的效果有问题。
> > 下面是我的部分代码逻辑:
> >
> > val ds = dataStream
> > .filter(_.liveType == 1)
> > .keyBy(1, 2)
> > .window(EventTimeSessionWindows.withGap(Time.minutes(1)))
> > .process(new myProcessWindow()).uid("process-id")
> >
> > class myProcessWindow() extends
> > ProcessWindowFunction[CloudLiveLogOnLine, CloudliveWatcher, Tuple,
> > TimeWindow] {
> >
> > override def process(key: Tuple, context: Context, elements:
> > Iterable[CloudLiveLogOnLine], out: Collector[CloudliveWatcher]): Unit
> > = {
> > var startTime = context.window.getStart //定义第一个元素进入窗口的开始时间
> > var endTime = context.window.getEnd //定义最后一个元素进入窗口的时间
> >
> > val currentDate = DateUtil.currentDate
> > val created_time = currentDate
> > val modified_time = currentDate
> > 。。。
> >
> > val join_time: String =
> > DateUtil.convertTimeStamp2DateStr(startTime,
> > DateUtil.SECOND_DATE_FORMAT)
> > val leave_time:String = DateUtil.convertTimeStamp2DateStr(endTime,
> > DateUtil.SECOND_DATE_FORMAT)
> > val duration = (endTime - startTime) / 1000  //停留多少秒
> > val duration_time = DateUtil.secondsToFormat(duration)  //停留时分秒
> > out.collect(CloudliveWatcher(id, partnerId, courseId, customerId,
> > courseNumber, nickName, ip, device_type, net_opretor, net_type, area,
> > join_time, leave_time, created_time, modified_time
> > , liveType, plat_form, duration, duration_time,
> > network_operator, role, useragent, uid, eventTime))
> >
> > CloudliveWatcher(id, partnerId, courseId, customerId,
> > courseNumber, nickName, ip, device_type, net_opretor, net_type, area,
> > join_time, leave_time, created_time, modified_time
> > , liveType, plat_form, duration, duration_time,
> > network_operator, role, useragent, uid, eventTime)
> >
> > }
> >
> >
> > 这样写是否合适,如果要用min/max应该如何代入上诉逻辑当中?
> >
> >
> >
> >
> > 赵一旦  于2020年12月28日周一 下午7:12写道:
> >
> > 按直播间ID和用户ID分组,使用session window,使用1min作为gap,统计key+window内的count即可,即sum(1)。
> >
> > 或者感觉你打点实际不一定肯定是1min、2min这种整时间点,可以统计key+window内的min/max,然后输出的时候做个减法。
> >
> > session window的作用就是根据连续2个元素的gap决定是否将2个元素放入同一个window中。
> >
> >
> > 张锴  于2020年12月28日周一 下午5:35写道:
> >
> > 能描述一下用session window的考虑吗
> >
> > Akisaya  于2020年12月28日周一 下午5:00写道:
> >
> > 这个可以用 session window 吧
> >
> >
> >
> >
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows
> >
> > news_...@163.com  于2020年12月28日周一 下午2:15写道:
> >
> > 这么做的前提是每条记录是顺序进入KAFKA的才行,但真实场景不能保证这条,有可能较早的打点记录却较晚进入kafka队列。
> >
> >
> >
> > news_...@163.com
> >
> > 发件人: 张锴
> > 发送时间: 2020-12-28 13:35
> > 收件人: user-zh
> > 主题: 根据业务需求选择合适的flink state
> > 各位大佬帮我分析下如下需求应该怎么写
> >
> > 需求说明:
> >
> > 公司是做直播业务方面的,现在需要实时统计用户的在线时长,来源kafka,每分钟产生直播打点记录,数据中包含eventTime字段。举个例子,用户A
> >
> >
> >
> >
> >
> >
> 在1,2,3分钟一直产生打点记录,那么他的停留时长就是3分钟,第5,6分钟又来了,那他的停留时长就是2分钟,只要是连续的打点记录就合成一条记录,每个直播间每个用户同理。
> >
> > 我的想法:
> > 我现在的想法是按直播间ID和用户ID分组,然后process,想通过state方式来做,通过截取每条记录的event Time中的分钟数
> > 减去上一条的分钟数,如果他们差值等于1,说明是连续关系,则继续。如果不为1,说明断开了,直接输出这条记录,同时情况当前key的状态。
> >
> > 不知道各位大佬对我的这个想法有什么建议,或者说有更好的处理方式。
> >
> > flink 版本1.10.1
> >
> >
> >
> >
> >
>


退订

2021-01-22 文章 Natasha


退订

Re: 请教关于Flink yarnship的使用

2021-01-22 文章 yang nick
这个方法应该是读取本地的文件,但是你放到yarn中执行,就会找不到这个文件。所以建议可以把配置上传到hdfs中试试看

Yan Tang  于2021年1月22日周五 下午4:53写道:

> 我把配置和jar包分开了,用-yt option将配置文件Ship到yarn
> cluster中,但是在获取配置文件的时候,老是拿不到,有人有相关经验么?
> 我的提交命令:
> -yt /path/to/conf
>
> code:
> this.getClass().getResourceAsStream("conf/cmp_online.cfg")
> 但一直返回null.
>
>
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: Flink 并行度问题

2021-01-22 文章 gimlee
并行度和CPU的核数没啥关系。
设置slot数量也不代表使用多少个CPU。



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink 并行度问题

2021-01-22 文章 yang nick
如果是 standalone的模式部署在一台机器上,那么据我了解,只会有一个TM,一个TM可以有多个slot

Jacob <17691150...@163.com> 于2021年1月22日周五 下午4:18写道:

> 使用Flink以来,一直有一个问题困扰着。
>
>
> Flink 设置n个并行度后,是指占用n个CPU,而不是n个CPU核数。
>
> 比如Flink消费kafka
>
> topic时,并行度数量往往都建议设置topic分区的个数,意在让每个并行度消费一个分区,达到性能最优。那也就是说一个并行度代表一个消费线程,同时也表示一个slot,又由于在Flink中一个并行度表示一个CPU,那么是不是可以理解为一个CPU就是一个线程。
>
>
> 如果FLink 以standalone的模式部署在一台机器上,这台机器有4个CPU,每个CPU有6个核,那么该集群的最大并行度是不是就是 4 ?
>
> 在我的认识中,一个核表示一个线程,但在上面的理解中,好像是一个CPU就是一个线程,这是否矛盾呢?
>
>
>
>
> -
> Thanks!
> Jacob
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


请教关于Flink yarnship的使用

2021-01-22 文章 Yan Tang
我把配置和jar包分开了,用-yt option将配置文件Ship到yarn cluster中,但是在获取配置文件的时候,老是拿不到,有人有相关经验么?
我的提交命令:
-yt /path/to/conf

code:
this.getClass().getResourceAsStream("conf/cmp_online.cfg")
但一直返回null.







--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re:Flink 并行度问题

2021-01-22 文章 Ye Chen
@jacob
hi, TaskManager 的一个 Slot 代表一个可用线程,该线程具有固定的内存,并且 Slot 只对内存隔离,没有对 CPU 隔离。
而slot 和并行度的关系是:Slot 是指 TaskManager 最大能并发执行的能力,parallelism 是指 TaskManager 
实际使用的并发能力。
个人见解,并行度的设置一般无需考虑CPU。







在 2021-01-22 16:18:32,"Jacob" <17691150...@163.com> 写道:
>使用Flink以来,一直有一个问题困扰着。
>
>
>Flink 设置n个并行度后,是指占用n个CPU,而不是n个CPU核数。
>
>比如Flink消费kafka
>topic时,并行度数量往往都建议设置topic分区的个数,意在让每个并行度消费一个分区,达到性能最优。那也就是说一个并行度代表一个消费线程,同时也表示一个slot,又由于在Flink中一个并行度表示一个CPU,那么是不是可以理解为一个CPU就是一个线程。
>
>
>如果FLink 以standalone的模式部署在一台机器上,这台机器有4个CPU,每个CPU有6个核,那么该集群的最大并行度是不是就是 4 ?
>
>在我的认识中,一个核表示一个线程,但在上面的理解中,好像是一个CPU就是一个线程,这是否矛盾呢?
>
>
>
>
>-
>Thanks!
>Jacob
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink 并行度问题

2021-01-22 文章 赵一旦
不清楚你为啥需要想这些,集群的并行度你随意设置就好,考虑CPU核数等的地方都只是考虑理想情况的并发。
比如你CPU最高10个核,来20个线程也没办法“并行”,但是可以“并发”。如果你的线程事情很少,10个并发是无法占满10个CPU核的,所以没任何理由因为CPU核的数量去限制你的并发度。

Jacob <17691150...@163.com> 于2021年1月22日周五 下午4:18写道:

> 使用Flink以来,一直有一个问题困扰着。
>
>
> Flink 设置n个并行度后,是指占用n个CPU,而不是n个CPU核数。
>
> 比如Flink消费kafka
>
> topic时,并行度数量往往都建议设置topic分区的个数,意在让每个并行度消费一个分区,达到性能最优。那也就是说一个并行度代表一个消费线程,同时也表示一个slot,又由于在Flink中一个并行度表示一个CPU,那么是不是可以理解为一个CPU就是一个线程。
>
>
> 如果FLink 以standalone的模式部署在一台机器上,这台机器有4个CPU,每个CPU有6个核,那么该集群的最大并行度是不是就是 4 ?
>
> 在我的认识中,一个核表示一个线程,但在上面的理解中,好像是一个CPU就是一个线程,这是否矛盾呢?
>
>
>
>
> -
> Thanks!
> Jacob
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Pyflink JVM Metaspace 内存泄漏定位

2021-01-22 文章 YueKun
关闭问题,已经解决,解决方法是不通过 pipeline.jars 的方式跟随python任务动态提交jar包,改为放在 FLINK_HOME/lib 下



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Exception in thread "main" java.lang.RuntimeException: Unknown call expression: avg(amount)

2021-01-22 文章 Appleyuchi




我在驗證
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html
中的
"Distinct aggregation on over window"(請在上述鏈接內,Ctrl+f搜索該雙引號內的整個字符串)




測試代碼:
distinctaggregation3.java
https://paste.ubuntu.com/p/7HJ9W3hVVN/
測試用的POJO:
OrderStream.java
https://paste.ubuntu.com/p/f8msWgtzft/




異常棧是:
Exception in thread "main" java.lang.RuntimeException: Unknown call expression: 
avg(amount)
at 
org.apache.flink.table.planner.expressions.converter.ExpressionConverter.visit(ExpressionConverter.java:102)
at 
org.apache.flink.table.planner.expressions.converter.ExpressionConverter.visit(ExpressionConverter.java:72)
at 
org.apache.flink.table.expressions.CallExpression.accept(CallExpression.java:122)
at 
org.apache.flink.table.planner.expressions.converter.ExpressionConverter$1.toRexNode(ExpressionConverter.java:226)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at 
java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
at 
org.apache.flink.table.planner.expressions.converter.OverConvertRule.convert(OverConvertRule.java:81)
at 
org.apache.flink.table.planner.expressions.converter.ExpressionConverter.visit(ExpressionConverter.java:97)
at 
org.apache.flink.table.planner.expressions.converter.ExpressionConverter.visit(ExpressionConverter.java:72)
at 
org.apache.flink.table.expressions.CallExpression.accept(CallExpression.java:122)
at 
org.apache.flink.table.planner.expressions.converter.ExpressionConverter$1.toRexNode(ExpressionConverter.java:226)
at 
org.apache.flink.table.planner.expressions.converter.CustomizedConvertRule.convertAs(CustomizedConvertRule.java:275)
at 
org.apache.flink.table.planner.expressions.converter.CustomizedConvertRule.lambda$convert$0(CustomizedConvertRule.java:95)
at java.util.Optional.map(Optional.java:215)
at 
org.apache.flink.table.planner.expressions.converter.CustomizedConvertRule.convert(CustomizedConvertRule.java:95)
at 
org.apache.flink.table.planner.expressions.converter.ExpressionConverter.visit(ExpressionConverter.java:97)
at 
org.apache.flink.table.planner.expressions.converter.ExpressionConverter.visit(ExpressionConverter.java:72)
at 
org.apache.flink.table.expressions.CallExpression.accept(CallExpression.java:122)
at 
org.apache.flink.table.planner.plan.QueryOperationConverter.convertExprToRexNode(QueryOperationConverter.java:741)
at 
org.apache.flink.table.planner.plan.QueryOperationConverter.access$800(QueryOperationConverter.java:132)
at 
org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.lambda$convertToRexNodes$6(QueryOperationConverter.java:547)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at 
java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
at 
org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.convertToRexNodes(QueryOperationConverter.java:548)
at 
org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:156)
at 
org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:152)
at 
org.apache.flink.table.operations.ProjectQueryOperation.accept(ProjectQueryOperation.java:75)
at 
org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:149)
at 
org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:131)
at 
org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:47)
at 
org.apache.flink.table.operations.ProjectQueryOperation.accept(ProjectQueryOperation.java:75)
at 
org.apache.flink.table.planner.calcite.FlinkRelBuilder.queryOperation(FlinkRelBuilder.scala:165)
at 
org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:247)
at 
org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:163)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
at scala.collection.Iterator.foreach(Iterator.scala:937)
at scala.collection.Iterator.foreach$(Iterator.scala:937)
at 

Flink 并行度问题

2021-01-22 文章 Jacob
使用Flink以来,一直有一个问题困扰着。


Flink 设置n个并行度后,是指占用n个CPU,而不是n个CPU核数。

比如Flink消费kafka
topic时,并行度数量往往都建议设置topic分区的个数,意在让每个并行度消费一个分区,达到性能最优。那也就是说一个并行度代表一个消费线程,同时也表示一个slot,又由于在Flink中一个并行度表示一个CPU,那么是不是可以理解为一个CPU就是一个线程。


如果FLink 以standalone的模式部署在一台机器上,这台机器有4个CPU,每个CPU有6个核,那么该集群的最大并行度是不是就是 4 ?

在我的认识中,一个核表示一个线程,但在上面的理解中,好像是一个CPU就是一个线程,这是否矛盾呢?




-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/

flink-Kafka 报错:ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer

2021-01-22 文章 lp
测试代码如下:
--
public class Sink_KafkaSink_1{
public static void main(String[] args) throws Exception {
final ParameterTool params =
ParameterTool.fromPropertiesFile(Sink_KafkaSink_1.class.getResourceAsStream("/pro.properties"));
String host = params.get("host");
int kafkaPort = Integer.parseInt(params.get("kafkaPort"));
produceTestdata2kafka(new
StringJoiner(":").add(host).add(String.valueOf(kafkaPort)).toString());
}

private static void produceTestdata2kafka(String kafkaAddr) throws
Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000);
   
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

DataStreamSource text = env.addSource(new
CustomsourceFuncation()).setParallelism(1);

Properties properties = new Properties();
properties.setProperty("bootstrap.servers",kafkaAddr);

FlinkKafkaProducer producer = new
FlinkKafkaProducer("flinktest",//topic
new SimpleStringSchema(), //消息序列化
properties
);
//写入 Kafka 时附加记录的事件时间戳
producer.setWriteTimestampToKafka(true);
text.addSink(producer);
env.execute("[kafkaSink with custom source]");
}
}

class CustomsourceFuncation implements SourceFunction {
//private long count = 1L;
private boolean isRunning = true;

@Override
public void run(SourceContext ctx) throws Exception {
while(isRunning){
//图书的排行榜
List books = new ArrayList<>();
books.add("msg1");
books.add("msg2");
books.add("msg3");
books.add("msg4");
books.add("msg5");
int i = new Random().nextInt(5);
ctx.collect(books.get(i));
//每2秒产生一条数据
Thread.sleep(2000);
}
}

//取消一个cancel的时候会调用的方法
@Override
public void cancel() {
isRunning = false;
}
}
--

本地测试无异常,maven打包后提交yarn集群运行,application Mode模式,jobmanager循环一直报错如下:
--
2021-01-22 07:54:31,929 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Job
[kafkaSink with custom source] (c256bf309be7e543182c5e1d9af659ef) switched
from state RUNNING to RESTARTING.
2021-01-22 07:54:32,930 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Job
[kafkaSink with custom source] (c256bf309be7e543182c5e1d9af659ef) switched
from state RESTARTING to RUNNING.
2021-01-22 07:54:32,931 INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - No
checkpoint found during restore.
2021-01-22 07:54:32,931 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Source:
Custom Source -> Sink: Unnamed (1/1) (ca057bcbb78c0a81fc471d81db89ec28)
switched from CREATED to SCHEDULED.
2021-01-22 07:54:32,932 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Source:
Custom Source -> Sink: Unnamed (1/1) (ca057bcbb78c0a81fc471d81db89ec28)
switched from SCHEDULED to DEPLOYING.
2021-01-22 07:54:32,932 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Deploying
Source: Custom Source -> Sink: Unnamed (1/1) (attempt #2) with attempt id
ca057bcbb78c0a81fc471d81db89ec28 to container_1611044725922_0027_01_02 @
slave02 (dataPort=37913) with allocation id 3f0f1dc64e898272d68989ca9a8feff2
2021-01-22 07:54:32,950 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Source:
Custom Source -> Sink: Unnamed (1/1) (ca057bcbb78c0a81fc471d81db89ec28)
switched from DEPLOYING to RUNNING.
2021-01-22 07:54:32,969 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Source:
Custom Source -> Sink: Unnamed (1/1) (ca057bcbb78c0a81fc471d81db89ec28)
switched from RUNNING to FAILED on container_1611044725922_0027_01_02 @
slave02 (dataPort=37913).
org.apache.kafka.common.KafkaException: Failed to construct kafka producer
at
org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:432)
~[quickstart-0.1.jar:?]
at
org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:298)
~[quickstart-0.1.jar:?]
at
org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.(FlinkKafkaInternalProducer.java:77)
~[quickstart-0.1.jar:?]
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.createProducer(FlinkKafkaProducer.java:1230)
~[quickstart-0.1.jar:?]
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initProducer(FlinkKafkaProducer.java:1346)
~[quickstart-0.1.jar:?]
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initNonTransactionalProducer(FlinkKafkaProducer.java:1342)
~[quickstart-0.1.jar:?]
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:990)

Re: flink sql 执行limit 很少的语句依然会暴增

2021-01-22 文章 zhang hao
嗯嗯 好的 谢谢大家 ,应该就是这个问题了,merge到分支验证下

On Fri, Jan 22, 2021 at 11:35 AM Shengkai Fang  wrote:

> hi, LIMIT PUSH DOWN 近期已经merge进了master分支了。
>
> [1] https://github.com/apache/flink/pull/13800
>
> Land  于2021年1月22日周五 上午11:28写道:
>
> > 可能是没有下推到MySQL执行。
> > 问题和我遇到的类似:
> > http://apache-flink.147419.n8.nabble.com/Flink-MySQL-td10374.html
> >
> >
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/
> >
>