?????? ??????????????????????flink state
TTL??keybykey1state?? ---- ??: ""https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows news_...@163.com
求一个简单的示例,flink写orc格式文件,对于Map复杂类型的写法。
目前通过自定义OrcBulkWriterFactory方式,拿到一个一个的ColumnVector,然后设置值。 对于简单类型,API看起来很清晰,但是Map类型没看懂怎么写。如下,对于serverTime是INT类型,直接设置vector[rowId]即可。那么对于MapColumnVector怎么设置呢,将多个key-value对写进去具体怎么写呢。 serverTimeColumnVector.vector[rowId] = ele.getTimestamp(); MapColumnVector dColumnVector = (MapColumnVector) batch.cols[2];
Re: Re: Re: Flink写hdfs提交任务就报错。Recoverable writers on Hadoop are only supported for HDFS
此外,写ORC格式文件,对于Map格式的有人知道怎么写的话给个示例吧。 如下,拿到MapColumnVector之后怎么写呢,目前非Map的简单字段都比较清晰,直接设置xxxColumnVector.vector[rowId]的值即可。但是MapColumnVector的API比较乱,没看懂怎么用。 MapColumnVector dColumnVector = (MapColumnVector) batch.cols[2]; 赵一旦 于2021年1月23日周六 下午1:42写道: > 已解决。覆盖了flink这部分源码去除了对非hdfs的schema限制。 > > 张锴 于2021年1月21日周四 下午7:35写道: > >> @赵一旦 >> 另外,上次我还提了一个问题请教你,我试了你说的那个想法,但是好像有点问题,你可以看一下 >> >> 张锴 于2021年1月21日周四 下午7:13写道: >> >> > 我用的flink 1.10版,FlieSink就是BucketingSink,我是用这个写hdfs的 >> > >> > 赵一旦 于2021年1月21日周四 下午7:05写道: >> > >> >> @Michael Ran; 嗯嗯,没关系。 >> >> >> >> @张锴 你说的是flink哪个版本的connector,stream or sql。我搜了下我的没有。我是1.12,stream。 >> >> >> >> >> 目前看文档有streamFileSink,还有FileSink,从文档内容来看使用方式差不多。我计划试一下FileSink,但不清楚FileSink和StreamFileSink啥区别,是否都能写hadoop类文件系统,因为涉及是否原子写,比较分布式文件系统不支持追加和编辑等。 >> >> >> >> Michael Ran 于2021年1月21日周四 下午7:01写道: >> >> >> >> > >> >> > >> >> >> 很抱歉,我已经很久没用过这个了。但是可以根据异常信息以及API源码执行进行分析,确定是否能直接写入。如果你要写入自定义的文件系统,那么只能实现自己的sink方式。或者你的文件系统的写入方式兼容hdfs的上层API可以参考各个sink端的写法 >> >> > 在 2021-01-21 18:45:06,"张锴" 写道: >> >> > >import >> >> org.apache.flink.streaming.connectors.fs.bucketing.{BucketingSink, >> >> > >DateTimeBucketer} >> >> > > >> >> > >sink.setBucketer sink.setWriter用这种方式试试 >> >> > > >> >> > > >> >> > > >> >> > >赵一旦 于2021年1月21日周四 下午6:37写道: >> >> > > >> >> > >> @Michael Ran >> >> > >> 然后有什么解决方案吗,我这个是使用flink的streamFileSink方式写hdfs的时候出现的异常。 >> >> > >> >> >> > >> Michael Ran 于2021年1月21日周四 下午5:23写道: >> >> > >> >> >> > >> > 这里应该是用了hdfs 的特定API吧,文件系统没兼容public >> >> > >> > HadoopRecoverableWriter(org.apache.hadoop.fs.FileSystem fs) >> {...} >> >> > >> > 在 2021-01-21 17:18:23,"赵一旦" 写道: >> >> > >> > >具体报错信息如下: >> >> > >> > > >> >> > >> > >java.lang.UnsupportedOperationException: Recoverable writers on >> >> > Hadoop >> >> > >> are >> >> > >> > >only supported for HDFS >> >> > >> > >at >> >> > org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.( >> >> > >> > >HadoopRecoverableWriter.java:61) >> >> > >> > >at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem >> >> > >> > >.createRecoverableWriter(HadoopFileSystem.java:210) >> >> > >> > >at org.apache.flink.core.fs.SafetyNetWrapperFileSystem >> >> > >> > >.createRecoverableWriter(SafetyNetWrapperFileSystem.java:69) >> >> > >> > >at >> org.apache.flink.streaming.api.functions.sink.filesystem. >> >> > >> > >> >> > >> >StreamingFileSink$RowFormatBuilder.createBucketWriter(StreamingFileSink >> >> > >> > >.java:260) >> >> > >> > >at >> org.apache.flink.streaming.api.functions.sink.filesystem. >> >> > >> > >> >> > >> > >> >> > >> >> >> > >> >> >> >StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:270) >> >> > >> > >at >> org.apache.flink.streaming.api.functions.sink.filesystem. >> >> > >> > >StreamingFileSink.initializeState(StreamingFileSink.java:412) >> >> > >> > >at >> >> > org.apache.flink.streaming.util.functions.StreamingFunctionUtils >> >> > >> > >.tryRestoreFunction(StreamingFunctionUtils.java:185) >> >> > >> > >at >> >> > org.apache.flink.streaming.util.functions.StreamingFunctionUtils >> >> > >> > >.restoreFunctionState(StreamingFunctionUtils.java:167) >> >> > >> > >at >> >> > >> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator >> >> > >> > >.initializeState(AbstractUdfStreamOperator.java:96) >> >> > >> > >at >> >> > >> >> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler >> >> > >> > >.initializeOperatorState(StreamOperatorStateHandler.java:107) >> >> > >> > >at >> >> > org.apache.flink.streaming.api.operators.AbstractStreamOperator >> >> > >> > >.initializeState(AbstractStreamOperator.java:264) >> >> > >> > >at org.apache.flink.streaming.runtime.tasks.OperatorChain >> >> > >> > >.initializeStateAndOpenOperators(OperatorChain.java:400) >> >> > >> > >at org.apache.flink.streaming.runtime.tasks.StreamTask >> >> > >> > >.lambda$beforeInvoke$2(StreamTask.java:507) >> >> > >> > >at >> >> > >> >> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1 >> >> > >> > >.runThrowing(StreamTaskActionExecutor.java:47) >> >> > >> > >at >> >> > org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke( >> >> > >> > >StreamTask.java:501) >> >> > >> > >at >> >> > >> > >> >> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask >> >> > >> > >.java:531) >> >> > >> > >at >> >> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) >> >> > >> > >at >> >> org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) >> >> > >> > >at java.lang.Thread.run(Thread.java:748) >> >> > >> > > >> >> > >> > > >> >> > >> > >赵一旦 于2021年1月21日周四 下午5:17写道: >> >> > >> > > >> >> > >> > >> Recoverable writers on Hadoop are only supported for HDFS >> >> > >> > >> >> >> > >> > >> 如上,我们用的hadoop协议的,但底层不是hdfs,是公司自研的分布式文件系统。 >> >> > >> > >> >> >> > >> > >> 使用spark写,spark-sql读等都没问题。但是flink写和读当前都没尝试成功。 >> >> > >> > >> >> >> > >> > >> >> >> > >> > >> >> >> > >> > >> >> > >> >> >> > >> >> >> > >> >
Re: Re: Re: Flink写hdfs提交任务就报错。Recoverable writers on Hadoop are only supported for HDFS
已解决。覆盖了flink这部分源码去除了对非hdfs的schema限制。 张锴 于2021年1月21日周四 下午7:35写道: > @赵一旦 > 另外,上次我还提了一个问题请教你,我试了你说的那个想法,但是好像有点问题,你可以看一下 > > 张锴 于2021年1月21日周四 下午7:13写道: > > > 我用的flink 1.10版,FlieSink就是BucketingSink,我是用这个写hdfs的 > > > > 赵一旦 于2021年1月21日周四 下午7:05写道: > > > >> @Michael Ran; 嗯嗯,没关系。 > >> > >> @张锴 你说的是flink哪个版本的connector,stream or sql。我搜了下我的没有。我是1.12,stream。 > >> > >> > 目前看文档有streamFileSink,还有FileSink,从文档内容来看使用方式差不多。我计划试一下FileSink,但不清楚FileSink和StreamFileSink啥区别,是否都能写hadoop类文件系统,因为涉及是否原子写,比较分布式文件系统不支持追加和编辑等。 > >> > >> Michael Ran 于2021年1月21日周四 下午7:01写道: > >> > >> > > >> > > >> > 很抱歉,我已经很久没用过这个了。但是可以根据异常信息以及API源码执行进行分析,确定是否能直接写入。如果你要写入自定义的文件系统,那么只能实现自己的sink方式。或者你的文件系统的写入方式兼容hdfs的上层API可以参考各个sink端的写法 > >> > 在 2021-01-21 18:45:06,"张锴" 写道: > >> > >import > >> org.apache.flink.streaming.connectors.fs.bucketing.{BucketingSink, > >> > >DateTimeBucketer} > >> > > > >> > >sink.setBucketer sink.setWriter用这种方式试试 > >> > > > >> > > > >> > > > >> > >赵一旦 于2021年1月21日周四 下午6:37写道: > >> > > > >> > >> @Michael Ran > >> > >> 然后有什么解决方案吗,我这个是使用flink的streamFileSink方式写hdfs的时候出现的异常。 > >> > >> > >> > >> Michael Ran 于2021年1月21日周四 下午5:23写道: > >> > >> > >> > >> > 这里应该是用了hdfs 的特定API吧,文件系统没兼容public > >> > >> > HadoopRecoverableWriter(org.apache.hadoop.fs.FileSystem fs) {...} > >> > >> > 在 2021-01-21 17:18:23,"赵一旦" 写道: > >> > >> > >具体报错信息如下: > >> > >> > > > >> > >> > >java.lang.UnsupportedOperationException: Recoverable writers on > >> > Hadoop > >> > >> are > >> > >> > >only supported for HDFS > >> > >> > >at > >> > org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.( > >> > >> > >HadoopRecoverableWriter.java:61) > >> > >> > >at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem > >> > >> > >.createRecoverableWriter(HadoopFileSystem.java:210) > >> > >> > >at org.apache.flink.core.fs.SafetyNetWrapperFileSystem > >> > >> > >.createRecoverableWriter(SafetyNetWrapperFileSystem.java:69) > >> > >> > >at org.apache.flink.streaming.api.functions.sink.filesystem. > >> > >> > > >> > > >StreamingFileSink$RowFormatBuilder.createBucketWriter(StreamingFileSink > >> > >> > >.java:260) > >> > >> > >at org.apache.flink.streaming.api.functions.sink.filesystem. > >> > >> > > >> > >> > > >> > >> > >> > > >> > >StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:270) > >> > >> > >at org.apache.flink.streaming.api.functions.sink.filesystem. > >> > >> > >StreamingFileSink.initializeState(StreamingFileSink.java:412) > >> > >> > >at > >> > org.apache.flink.streaming.util.functions.StreamingFunctionUtils > >> > >> > >.tryRestoreFunction(StreamingFunctionUtils.java:185) > >> > >> > >at > >> > org.apache.flink.streaming.util.functions.StreamingFunctionUtils > >> > >> > >.restoreFunctionState(StreamingFunctionUtils.java:167) > >> > >> > >at > >> > >> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator > >> > >> > >.initializeState(AbstractUdfStreamOperator.java:96) > >> > >> > >at > >> > >> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler > >> > >> > >.initializeOperatorState(StreamOperatorStateHandler.java:107) > >> > >> > >at > >> > org.apache.flink.streaming.api.operators.AbstractStreamOperator > >> > >> > >.initializeState(AbstractStreamOperator.java:264) > >> > >> > >at org.apache.flink.streaming.runtime.tasks.OperatorChain > >> > >> > >.initializeStateAndOpenOperators(OperatorChain.java:400) > >> > >> > >at org.apache.flink.streaming.runtime.tasks.StreamTask > >> > >> > >.lambda$beforeInvoke$2(StreamTask.java:507) > >> > >> > >at > >> > >> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1 > >> > >> > >.runThrowing(StreamTaskActionExecutor.java:47) > >> > >> > >at > >> > org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke( > >> > >> > >StreamTask.java:501) > >> > >> > >at > >> > >> > > >> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask > >> > >> > >.java:531) > >> > >> > >at > >> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) > >> > >> > >at > >> org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) > >> > >> > >at java.lang.Thread.run(Thread.java:748) > >> > >> > > > >> > >> > > > >> > >> > >赵一旦 于2021年1月21日周四 下午5:17写道: > >> > >> > > > >> > >> > >> Recoverable writers on Hadoop are only supported for HDFS > >> > >> > >> > >> > >> > >> 如上,我们用的hadoop协议的,但底层不是hdfs,是公司自研的分布式文件系统。 > >> > >> > >> > >> > >> > >> 使用spark写,spark-sql读等都没问题。但是flink写和读当前都没尝试成功。 > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > > >> > >> > >> > > >> > > >
Re: Flink的StreamFileSink和1.12提供的FileSink中,BucketsBuilder的createBucketWriter中仅支持recoverableWriter。
已解决。重改写了flink源码覆盖了这部分限制就可以了。 赵一旦 于2021年1月22日周五 上午10:17写道: > 如题,为什么仅支持recoverableWriter,如果我使用的文件系统不支持怎么办呢,必须自定义sink吗? > > > 我这边用的是一个公司自研的大型分布式文件系统,支持hadoop协议(但不清楚是否所有特性都支持),目前使用streamFileSink和FileSink貌似都无法正常写。 > > 不清楚是否有其他问题,至少当前是卡住在这个recoverable上了。 > > 报错是只有hdfs才支持recoverableWriter。 > > 有人知道如何解吗? >
?????? ????????????Orders????????????????join????????????????????
=_= ??thank you?? ---- ??: "yang nick"
Re: 公司强行拆分Orders主表,遇到事实表join请各位大神们支个招。
我感觉你这个用实时很难做,涉及到状态更新的无限流,需要配置 state ttl 徐州州 <25977...@qq.com> 于2021年1月23日周六 上午11:23写道: > > 我遇到的难题是,拒收订单想拿到payAment字段必须扫描全量的order_money表。order_money是下单时候才会产生,我拒收订单根本不知道它的下单时间根本不知道怎么拿,而且order_money没有任何标记,我全量扫描money表程序OOM。我的数据是通过Canal监控过来的,我需要写flink-sql来进行join。 > > > > > --原始邮件-- > 发件人: "yang nick" 发送时间: 2021年1月23日(星期六) 中午11:16 > 收件人: "user-zh" 主题: Re: 公司强行拆分Orders主表,遇到事实表join请各位大神们支个招。 > > > > 我觉的既然是做数仓,这里没啥好纠结的,数据已经在数仓里面了,做成宽表就行了 > > 徐州州 <25977...@qq.com 于2021年1月23日周六 上午11:11写道: > > > > 我起初也有这个想法,但是他们业务告诉我,order_reject表是单独存在的。拒收单子只会在order_reject出现,而order_money只在它下单的时候写入,拒收订单数据过来order_money不会有任何变化,所以我即使拿到每天的拒收订单去join也是要读取全量order_money。 > > > > > --nbsp;原始邮件nbsp;-- > 发件人: "yang nick" 发送时间: 2021年1月23日(星期六) 中午11:04 > 收件人: "user-zh" 主题: Re: 公司强行拆分Orders主表,遇到事实表join请各位大神们支个招。 > > > > 把拆开的表在join起来嘛 > > 徐州州 <25977...@qq.comgt; 于2021年1月23日周六 上午10:48写道: > > gt; > > 请教各位大佬一个业务问题,(当天日报业务)版本1:我可以通过拒收时间直接拿到订单主表当天的拒收订单和payAmont金额,版本2:现在公司订单主表重做,拒收订单和payAmont被拆分出两张表,Order_reject和Order_money,现在我的问题是就算我可以从(拒收订单表)中通过时间拿到当天拒收订单,但是PayAmont无法通过时间在Order_money表拿到,PayAmont是在下单的时候才会生成,这样我当天拒收业务都要扫描全量的order_money表拿到对应的PayAmont金额字段,有什么好的处理方法吗 > gt; > > ?我目前的处理方法是强行读取Order_money的createTime-30day来,尽可能的让拒收订单找到对应的payAmont字段,不只是只拆分出一张order_money还有order_extend还有好几张表,我是做数仓的,我就没见过这么干的。强行把度量值字段拆分出来。请各位大神支个招。
Re: flink-sql-gateway支持远程吗
可以试试zeppelin 罗显宴 <15927482...@163.com> 于2021年1月23日周六 上午11:19写道: > > 大佬们,我试过flinksql-client和flink-sql-gateway发现都是把代码提交到本地集群运行,好像不能提交到指定远程集群上执行任务,请问大佬们,怎么提交代码到远程集群运行 > > > | | > 15927482803 > | > | > 邮箱:15927482...@163.com > | > > 签名由 网易邮箱大师 定制
Re: 公司强行拆分Orders主表,遇到事实表join请各位大神们支个招。
我觉的既然是做数仓,这里没啥好纠结的,数据已经在数仓里面了,做成宽表就行了 徐州州 <25977...@qq.com> 于2021年1月23日周六 上午11:11写道: > > 我起初也有这个想法,但是他们业务告诉我,order_reject表是单独存在的。拒收单子只会在order_reject出现,而order_money只在它下单的时候写入,拒收订单数据过来order_money不会有任何变化,所以我即使拿到每天的拒收订单去join也是要读取全量order_money。 > > > > > --原始邮件-- > 发件人: "yang nick" 发送时间: 2021年1月23日(星期六) 中午11:04 > 收件人: "user-zh" 主题: Re: 公司强行拆分Orders主表,遇到事实表join请各位大神们支个招。 > > > > 把拆开的表在join起来嘛 > > 徐州州 <25977...@qq.com 于2021年1月23日周六 上午10:48写道: > > > 请教各位大佬一个业务问题,(当天日报业务)版本1:我可以通过拒收时间直接拿到订单主表当天的拒收订单和payAmont金额,版本2:现在公司订单主表重做,拒收订单和payAmont被拆分出两张表,Order_reject和Order_money,现在我的问题是就算我可以从(拒收订单表)中通过时间拿到当天拒收订单,但是PayAmont无法通过时间在Order_money表拿到,PayAmont是在下单的时候才会生成,这样我当天拒收业务都要扫描全量的order_money表拿到对应的PayAmont金额字段,有什么好的处理方法吗 > > ?我目前的处理方法是强行读取Order_money的createTime-30day来,尽可能的让拒收订单找到对应的payAmont字段,不只是只拆分出一张order_money还有order_extend还有好几张表,我是做数仓的,我就没见过这么干的。强行把度量值字段拆分出来。请各位大神支个招。
?????? ????????????Orders????????????????join????????????????????
order_reject??order_rejectorder_money??order_money??join??order_money?? ---- ??: "yang nick"
回复:请教关于Flink yarnship的使用
URL url = this.getClass().getClassLoader().getResource("conf”); String dir = url.getFile(); dir目录下应该会包含ship的配置文件,你可以试下。 在2021年01月22日 15:38,Yan Tang 写道: 我把配置和jar包分开了,用-yt option将配置文件Ship到yarn cluster中,但是在获取配置文件的时候,老是拿不到,有人有相关经验么? 我的提交命令: -yt /path/to/conf code: this.getClass().getResourceAsStream("conf/cmp_online.cfg") 但一直返回null. -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: 请教关于Flink yarnship的使用
你可以尝试同时指定-C "file:///path/to/conf/cmp_online.cfg" 以及 -yt /path/to/conf 来进行测试 然后代码里这么获取this.getClass().getResourceAsStream("cmp_online.cfg") -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: 请教关于Flink yarnship的使用
如果-yt 不适用我这种场景,真不知道这个option是做什么的了。在spark中我用的就是--files,可以达到我想要的效果。 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: 根据业务需求选择合适的flink state
@赵一旦 可以添加一下微信好友吗,具体的实践上还有点问题,我是在window后直接reduce(new myReduceFunc(),new AssignWindowProcessFunc())自定义了这两个方法,但是效果还是有点问题,不知道我的写法是不是有问题 赵一旦 于2021年1月22日周五 上午10:10写道: > 我理解你要的最终mysql结果表是: > 直播间ID;用户ID;上线时间;下线时间;durationn=(下线时间 - 上线时间); > > 如果user1在直播间1,一天内出现10次,就出现10个记录,分别记录了每次的duration。 > > > 如上按照我的方案就可以实现哈。 > > xuhaiLong 于2021年1月22日周五 上午10:03写道: > > > 可以试试这样,mysql 中 设置唯一键为窗口的 startTime 和 > > userId,然后对用户的每个窗口做停留时间的计算,最终会同一个用户在一天会产生多条记录,不过窗口的 startTime 不同,取值的时候sum > 试试? > > > > > > 在2021年1月21日 18:24,张锴 写道: > > 你好,之前我用了你上诉的方法出现一个问题,我并没有用min/max,我是在procss方法里用的context.window.getStart和 > > > > > context.window.getEnd作为开始和结束时间的,感觉这样也能获得最大和最小值,但是出来的数据最长停留了4分多钟,我跑的离线任务停留的时长有几个小时的都有,感觉出来的效果有问题。 > > 下面是我的部分代码逻辑: > > > > val ds = dataStream > > .filter(_.liveType == 1) > > .keyBy(1, 2) > > .window(EventTimeSessionWindows.withGap(Time.minutes(1))) > > .process(new myProcessWindow()).uid("process-id") > > > > class myProcessWindow() extends > > ProcessWindowFunction[CloudLiveLogOnLine, CloudliveWatcher, Tuple, > > TimeWindow] { > > > > override def process(key: Tuple, context: Context, elements: > > Iterable[CloudLiveLogOnLine], out: Collector[CloudliveWatcher]): Unit > > = { > > var startTime = context.window.getStart //定义第一个元素进入窗口的开始时间 > > var endTime = context.window.getEnd //定义最后一个元素进入窗口的时间 > > > > val currentDate = DateUtil.currentDate > > val created_time = currentDate > > val modified_time = currentDate > > 。。。 > > > > val join_time: String = > > DateUtil.convertTimeStamp2DateStr(startTime, > > DateUtil.SECOND_DATE_FORMAT) > > val leave_time:String = DateUtil.convertTimeStamp2DateStr(endTime, > > DateUtil.SECOND_DATE_FORMAT) > > val duration = (endTime - startTime) / 1000 //停留多少秒 > > val duration_time = DateUtil.secondsToFormat(duration) //停留时分秒 > > out.collect(CloudliveWatcher(id, partnerId, courseId, customerId, > > courseNumber, nickName, ip, device_type, net_opretor, net_type, area, > > join_time, leave_time, created_time, modified_time > > , liveType, plat_form, duration, duration_time, > > network_operator, role, useragent, uid, eventTime)) > > > > CloudliveWatcher(id, partnerId, courseId, customerId, > > courseNumber, nickName, ip, device_type, net_opretor, net_type, area, > > join_time, leave_time, created_time, modified_time > > , liveType, plat_form, duration, duration_time, > > network_operator, role, useragent, uid, eventTime) > > > > } > > > > > > 这样写是否合适,如果要用min/max应该如何代入上诉逻辑当中? > > > > > > > > > > 赵一旦 于2020年12月28日周一 下午7:12写道: > > > > 按直播间ID和用户ID分组,使用session window,使用1min作为gap,统计key+window内的count即可,即sum(1)。 > > > > 或者感觉你打点实际不一定肯定是1min、2min这种整时间点,可以统计key+window内的min/max,然后输出的时候做个减法。 > > > > session window的作用就是根据连续2个元素的gap决定是否将2个元素放入同一个window中。 > > > > > > 张锴 于2020年12月28日周一 下午5:35写道: > > > > 能描述一下用session window的考虑吗 > > > > Akisaya 于2020年12月28日周一 下午5:00写道: > > > > 这个可以用 session window 吧 > > > > > > > > > > > https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows > > > > news_...@163.com 于2020年12月28日周一 下午2:15写道: > > > > 这么做的前提是每条记录是顺序进入KAFKA的才行,但真实场景不能保证这条,有可能较早的打点记录却较晚进入kafka队列。 > > > > > > > > news_...@163.com > > > > 发件人: 张锴 > > 发送时间: 2020-12-28 13:35 > > 收件人: user-zh > > 主题: 根据业务需求选择合适的flink state > > 各位大佬帮我分析下如下需求应该怎么写 > > > > 需求说明: > > > > 公司是做直播业务方面的,现在需要实时统计用户的在线时长,来源kafka,每分钟产生直播打点记录,数据中包含eventTime字段。举个例子,用户A > > > > > > > > > > > > > 在1,2,3分钟一直产生打点记录,那么他的停留时长就是3分钟,第5,6分钟又来了,那他的停留时长就是2分钟,只要是连续的打点记录就合成一条记录,每个直播间每个用户同理。 > > > > 我的想法: > > 我现在的想法是按直播间ID和用户ID分组,然后process,想通过state方式来做,通过截取每条记录的event Time中的分钟数 > > 减去上一条的分钟数,如果他们差值等于1,说明是连续关系,则继续。如果不为1,说明断开了,直接输出这条记录,同时情况当前key的状态。 > > > > 不知道各位大佬对我的这个想法有什么建议,或者说有更好的处理方式。 > > > > flink 版本1.10.1 > > > > > > > > > > >
退订
退订
Re: 请教关于Flink yarnship的使用
这个方法应该是读取本地的文件,但是你放到yarn中执行,就会找不到这个文件。所以建议可以把配置上传到hdfs中试试看 Yan Tang 于2021年1月22日周五 下午4:53写道: > 我把配置和jar包分开了,用-yt option将配置文件Ship到yarn > cluster中,但是在获取配置文件的时候,老是拿不到,有人有相关经验么? > 我的提交命令: > -yt /path/to/conf > > code: > this.getClass().getResourceAsStream("conf/cmp_online.cfg") > 但一直返回null. > > > > > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ >
Re: Flink 并行度问题
并行度和CPU的核数没啥关系。 设置slot数量也不代表使用多少个CPU。 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Flink 并行度问题
如果是 standalone的模式部署在一台机器上,那么据我了解,只会有一个TM,一个TM可以有多个slot Jacob <17691150...@163.com> 于2021年1月22日周五 下午4:18写道: > 使用Flink以来,一直有一个问题困扰着。 > > > Flink 设置n个并行度后,是指占用n个CPU,而不是n个CPU核数。 > > 比如Flink消费kafka > > topic时,并行度数量往往都建议设置topic分区的个数,意在让每个并行度消费一个分区,达到性能最优。那也就是说一个并行度代表一个消费线程,同时也表示一个slot,又由于在Flink中一个并行度表示一个CPU,那么是不是可以理解为一个CPU就是一个线程。 > > > 如果FLink 以standalone的模式部署在一台机器上,这台机器有4个CPU,每个CPU有6个核,那么该集群的最大并行度是不是就是 4 ? > > 在我的认识中,一个核表示一个线程,但在上面的理解中,好像是一个CPU就是一个线程,这是否矛盾呢? > > > > > - > Thanks! > Jacob > -- > Sent from: http://apache-flink.147419.n8.nabble.com/
请教关于Flink yarnship的使用
我把配置和jar包分开了,用-yt option将配置文件Ship到yarn cluster中,但是在获取配置文件的时候,老是拿不到,有人有相关经验么? 我的提交命令: -yt /path/to/conf code: this.getClass().getResourceAsStream("conf/cmp_online.cfg") 但一直返回null. -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re:Flink 并行度问题
@jacob hi, TaskManager 的一个 Slot 代表一个可用线程,该线程具有固定的内存,并且 Slot 只对内存隔离,没有对 CPU 隔离。 而slot 和并行度的关系是:Slot 是指 TaskManager 最大能并发执行的能力,parallelism 是指 TaskManager 实际使用的并发能力。 个人见解,并行度的设置一般无需考虑CPU。 在 2021-01-22 16:18:32,"Jacob" <17691150...@163.com> 写道: >使用Flink以来,一直有一个问题困扰着。 > > >Flink 设置n个并行度后,是指占用n个CPU,而不是n个CPU核数。 > >比如Flink消费kafka >topic时,并行度数量往往都建议设置topic分区的个数,意在让每个并行度消费一个分区,达到性能最优。那也就是说一个并行度代表一个消费线程,同时也表示一个slot,又由于在Flink中一个并行度表示一个CPU,那么是不是可以理解为一个CPU就是一个线程。 > > >如果FLink 以standalone的模式部署在一台机器上,这台机器有4个CPU,每个CPU有6个核,那么该集群的最大并行度是不是就是 4 ? > >在我的认识中,一个核表示一个线程,但在上面的理解中,好像是一个CPU就是一个线程,这是否矛盾呢? > > > > >- >Thanks! >Jacob >-- >Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Flink 并行度问题
不清楚你为啥需要想这些,集群的并行度你随意设置就好,考虑CPU核数等的地方都只是考虑理想情况的并发。 比如你CPU最高10个核,来20个线程也没办法“并行”,但是可以“并发”。如果你的线程事情很少,10个并发是无法占满10个CPU核的,所以没任何理由因为CPU核的数量去限制你的并发度。 Jacob <17691150...@163.com> 于2021年1月22日周五 下午4:18写道: > 使用Flink以来,一直有一个问题困扰着。 > > > Flink 设置n个并行度后,是指占用n个CPU,而不是n个CPU核数。 > > 比如Flink消费kafka > > topic时,并行度数量往往都建议设置topic分区的个数,意在让每个并行度消费一个分区,达到性能最优。那也就是说一个并行度代表一个消费线程,同时也表示一个slot,又由于在Flink中一个并行度表示一个CPU,那么是不是可以理解为一个CPU就是一个线程。 > > > 如果FLink 以standalone的模式部署在一台机器上,这台机器有4个CPU,每个CPU有6个核,那么该集群的最大并行度是不是就是 4 ? > > 在我的认识中,一个核表示一个线程,但在上面的理解中,好像是一个CPU就是一个线程,这是否矛盾呢? > > > > > - > Thanks! > Jacob > -- > Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Pyflink JVM Metaspace 内存泄漏定位
关闭问题,已经解决,解决方法是不通过 pipeline.jars 的方式跟随python任务动态提交jar包,改为放在 FLINK_HOME/lib 下 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Exception in thread "main" java.lang.RuntimeException: Unknown call expression: avg(amount)
我在驗證 https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html 中的 "Distinct aggregation on over window"(請在上述鏈接內,Ctrl+f搜索該雙引號內的整個字符串) 測試代碼: distinctaggregation3.java https://paste.ubuntu.com/p/7HJ9W3hVVN/ 測試用的POJO: OrderStream.java https://paste.ubuntu.com/p/f8msWgtzft/ 異常棧是: Exception in thread "main" java.lang.RuntimeException: Unknown call expression: avg(amount) at org.apache.flink.table.planner.expressions.converter.ExpressionConverter.visit(ExpressionConverter.java:102) at org.apache.flink.table.planner.expressions.converter.ExpressionConverter.visit(ExpressionConverter.java:72) at org.apache.flink.table.expressions.CallExpression.accept(CallExpression.java:122) at org.apache.flink.table.planner.expressions.converter.ExpressionConverter$1.toRexNode(ExpressionConverter.java:226) at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374) at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471) at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499) at org.apache.flink.table.planner.expressions.converter.OverConvertRule.convert(OverConvertRule.java:81) at org.apache.flink.table.planner.expressions.converter.ExpressionConverter.visit(ExpressionConverter.java:97) at org.apache.flink.table.planner.expressions.converter.ExpressionConverter.visit(ExpressionConverter.java:72) at org.apache.flink.table.expressions.CallExpression.accept(CallExpression.java:122) at org.apache.flink.table.planner.expressions.converter.ExpressionConverter$1.toRexNode(ExpressionConverter.java:226) at org.apache.flink.table.planner.expressions.converter.CustomizedConvertRule.convertAs(CustomizedConvertRule.java:275) at org.apache.flink.table.planner.expressions.converter.CustomizedConvertRule.lambda$convert$0(CustomizedConvertRule.java:95) at java.util.Optional.map(Optional.java:215) at org.apache.flink.table.planner.expressions.converter.CustomizedConvertRule.convert(CustomizedConvertRule.java:95) at org.apache.flink.table.planner.expressions.converter.ExpressionConverter.visit(ExpressionConverter.java:97) at org.apache.flink.table.planner.expressions.converter.ExpressionConverter.visit(ExpressionConverter.java:72) at org.apache.flink.table.expressions.CallExpression.accept(CallExpression.java:122) at org.apache.flink.table.planner.plan.QueryOperationConverter.convertExprToRexNode(QueryOperationConverter.java:741) at org.apache.flink.table.planner.plan.QueryOperationConverter.access$800(QueryOperationConverter.java:132) at org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.lambda$convertToRexNodes$6(QueryOperationConverter.java:547) at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374) at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471) at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499) at org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.convertToRexNodes(QueryOperationConverter.java:548) at org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:156) at org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:152) at org.apache.flink.table.operations.ProjectQueryOperation.accept(ProjectQueryOperation.java:75) at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:149) at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:131) at org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:47) at org.apache.flink.table.operations.ProjectQueryOperation.accept(ProjectQueryOperation.java:75) at org.apache.flink.table.planner.calcite.FlinkRelBuilder.queryOperation(FlinkRelBuilder.scala:165) at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:247) at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:163) at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) at scala.collection.Iterator.foreach(Iterator.scala:937) at scala.collection.Iterator.foreach$(Iterator.scala:937) at
Flink 并行度问题
使用Flink以来,一直有一个问题困扰着。 Flink 设置n个并行度后,是指占用n个CPU,而不是n个CPU核数。 比如Flink消费kafka topic时,并行度数量往往都建议设置topic分区的个数,意在让每个并行度消费一个分区,达到性能最优。那也就是说一个并行度代表一个消费线程,同时也表示一个slot,又由于在Flink中一个并行度表示一个CPU,那么是不是可以理解为一个CPU就是一个线程。 如果FLink 以standalone的模式部署在一台机器上,这台机器有4个CPU,每个CPU有6个核,那么该集群的最大并行度是不是就是 4 ? 在我的认识中,一个核表示一个线程,但在上面的理解中,好像是一个CPU就是一个线程,这是否矛盾呢? - Thanks! Jacob -- Sent from: http://apache-flink.147419.n8.nabble.com/
flink-Kafka 报错:ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer
测试代码如下: -- public class Sink_KafkaSink_1{ public static void main(String[] args) throws Exception { final ParameterTool params = ParameterTool.fromPropertiesFile(Sink_KafkaSink_1.class.getResourceAsStream("/pro.properties")); String host = params.get("host"); int kafkaPort = Integer.parseInt(params.get("kafkaPort")); produceTestdata2kafka(new StringJoiner(":").add(host).add(String.valueOf(kafkaPort)).toString()); } private static void produceTestdata2kafka(String kafkaAddr) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(5000); env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); DataStreamSource text = env.addSource(new CustomsourceFuncation()).setParallelism(1); Properties properties = new Properties(); properties.setProperty("bootstrap.servers",kafkaAddr); FlinkKafkaProducer producer = new FlinkKafkaProducer("flinktest",//topic new SimpleStringSchema(), //消息序列化 properties ); //写入 Kafka 时附加记录的事件时间戳 producer.setWriteTimestampToKafka(true); text.addSink(producer); env.execute("[kafkaSink with custom source]"); } } class CustomsourceFuncation implements SourceFunction { //private long count = 1L; private boolean isRunning = true; @Override public void run(SourceContext ctx) throws Exception { while(isRunning){ //图书的排行榜 List books = new ArrayList<>(); books.add("msg1"); books.add("msg2"); books.add("msg3"); books.add("msg4"); books.add("msg5"); int i = new Random().nextInt(5); ctx.collect(books.get(i)); //每2秒产生一条数据 Thread.sleep(2000); } } //取消一个cancel的时候会调用的方法 @Override public void cancel() { isRunning = false; } } -- 本地测试无异常,maven打包后提交yarn集群运行,application Mode模式,jobmanager循环一直报错如下: -- 2021-01-22 07:54:31,929 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job [kafkaSink with custom source] (c256bf309be7e543182c5e1d9af659ef) switched from state RUNNING to RESTARTING. 2021-01-22 07:54:32,930 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job [kafkaSink with custom source] (c256bf309be7e543182c5e1d9af659ef) switched from state RESTARTING to RUNNING. 2021-01-22 07:54:32,931 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - No checkpoint found during restore. 2021-01-22 07:54:32,931 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom Source -> Sink: Unnamed (1/1) (ca057bcbb78c0a81fc471d81db89ec28) switched from CREATED to SCHEDULED. 2021-01-22 07:54:32,932 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom Source -> Sink: Unnamed (1/1) (ca057bcbb78c0a81fc471d81db89ec28) switched from SCHEDULED to DEPLOYING. 2021-01-22 07:54:32,932 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying Source: Custom Source -> Sink: Unnamed (1/1) (attempt #2) with attempt id ca057bcbb78c0a81fc471d81db89ec28 to container_1611044725922_0027_01_02 @ slave02 (dataPort=37913) with allocation id 3f0f1dc64e898272d68989ca9a8feff2 2021-01-22 07:54:32,950 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom Source -> Sink: Unnamed (1/1) (ca057bcbb78c0a81fc471d81db89ec28) switched from DEPLOYING to RUNNING. 2021-01-22 07:54:32,969 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom Source -> Sink: Unnamed (1/1) (ca057bcbb78c0a81fc471d81db89ec28) switched from RUNNING to FAILED on container_1611044725922_0027_01_02 @ slave02 (dataPort=37913). org.apache.kafka.common.KafkaException: Failed to construct kafka producer at org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:432) ~[quickstart-0.1.jar:?] at org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:298) ~[quickstart-0.1.jar:?] at org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.(FlinkKafkaInternalProducer.java:77) ~[quickstart-0.1.jar:?] at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.createProducer(FlinkKafkaProducer.java:1230) ~[quickstart-0.1.jar:?] at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initProducer(FlinkKafkaProducer.java:1346) ~[quickstart-0.1.jar:?] at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initNonTransactionalProducer(FlinkKafkaProducer.java:1342) ~[quickstart-0.1.jar:?] at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:990)
Re: flink sql 执行limit 很少的语句依然会暴增
嗯嗯 好的 谢谢大家 ,应该就是这个问题了,merge到分支验证下 On Fri, Jan 22, 2021 at 11:35 AM Shengkai Fang wrote: > hi, LIMIT PUSH DOWN 近期已经merge进了master分支了。 > > [1] https://github.com/apache/flink/pull/13800 > > Land 于2021年1月22日周五 上午11:28写道: > > > 可能是没有下推到MySQL执行。 > > 问题和我遇到的类似: > > http://apache-flink.147419.n8.nabble.com/Flink-MySQL-td10374.html > > > > > > > > -- > > Sent from: http://apache-flink.147419.n8.nabble.com/ > > >