接收Http请求与flink如何建立联系

2022-07-27 Thread
flink版本1.13.2
想通过http请求的方式将参数传给flink,这个怎么实现?


flink中文邮件列表不显示其他用户提问的flink问题。

2022-07-13 Thread
我重新订阅了flink中文邮件,但是列表里没有显示其他用户提问或者解答有关flink相关的问题和答案,这是什么原因?


订阅了flink中文邮件列表找不到用户邮件提问

2022-07-13 Thread
我最近重新订阅了flink中文邮件列表,但是我在收件箱中并没有显示大家提问及解答关于flink相关的问题。这是什么情况?


Re: flink是否支持 http请求并返回json数据

2022-02-09 Thread
我觉得这种方式是可行的,请问一下我应该如何去做,有没有一些资料参考一下呢

Caizhi Weng  于2022年2月9日周三 16:15写道:

> Hi!
>
> Flink 目前没有 http server source / sink。这是一个 OLAP
> 的需求吗?从描述的需求来看,一种更加合理的方式应该是有一个专门的 http server 接受请求,调用 Flink API 运行一个 Flink
> 作业(Flink SQL 可以运行 select 语句),再将结果返回给调用方。
>
> 张锴  于2022年2月9日周三 14:28写道:
>
> >
> >
> 业务需求:通过http请求方式将参数传给flink,将参数带入flink程序再把结果以json的形式返回。请问一下以这种方式实时计算,flink是否支持?
> >
> > flink版本:1.12.1
> >
>


flink是否支持 http请求并返回json数据

2022-02-08 Thread
业务需求:通过http请求方式将参数传给flink,将参数带入flink程序再把结果以json的形式返回。请问一下以这种方式实时计算,flink是否支持?

flink版本:1.12.1


Re: 如何按照数据量,可用内存,核为flink job分配合适的资源

2021-08-23 Thread
不是,你应该认错了

yidan zhao  于2021年8月24日周二 下午12:50写道:

> 你是zhangkai30吗~
>
> 张锴  于2021年8月24日周二 上午11:16写道:
>
> > 我们数据量大概一天6个T,只做一个简单的处理,不涉及计算,在资源够用的情况下,用命令行的方式启动flink job
> > 需要给多少比较合适,比如jobmanager内存 ,taskmanager内存, slot ,   taskmanager数量 ,并行度p,
> > 具体是怎么考虑的呢?
> >
> > 对于大规模的数据量经验还比较浅,有大佬给指明一下吗
> >
>


如何按照数据量,可用内存,核为flink job分配合适的资源

2021-08-23 Thread
我们数据量大概一天6个T,只做一个简单的处理,不涉及计算,在资源够用的情况下,用命令行的方式启动flink job
需要给多少比较合适,比如jobmanager内存 ,taskmanager内存, slot ,   taskmanager数量 ,并行度p,
具体是怎么考虑的呢?

对于大规模的数据量经验还比较浅,有大佬给指明一下吗


广告业务中 如何用flink替换spark一些逻辑处理,是否需要用到processfunction

2021-08-17 Thread
需求描述:
需要将目前的spark程序替换成flink去做,在梳理逻辑的时候有一块不知道用flink咋实现,spark是按每三分钟一个批次来跑的。
描述如下:
广告日志按照ask日志->bid->show->click顺序流程,要求是要将不同的日志都与bid日志merge,来保证bid数据的完整性,key按sessionid+Adid做唯一
逻辑:spark读取多个日志topic
含xxtopic,格式化,joinAll之后得到(string,pair)日志类型pair.logType如果是'bid'直接写到bidtopic,如果是其他类型,需要从之前HBASE缓存中拿bid表匹配,匹配到(可能是show
or click ..)合并输出到bidtopic,
没有匹配到,会有pair.n来记录次数,并写到xxtopic,n>10次(循环来回30分钟)都没有匹配到bid数据直接写到bidtopic,n<=10次内匹配不到bid
n+1,并写到xxtopic进入下个批次。
10次是业务方提的,也就是30分钟的缓存,如果没有10次限定,会有很多数据都写到xxtopic,这里不涉及计算,只是合并,也不去重,假如根据key
找到了3条同样的数据,也要合并三条。

这个用flink怎么实现?


Re: 非对齐检查点还能保证exactly once语义吗

2021-08-02 Thread
谢谢你,受教了

Caizhi Weng  于 2021年8月2日周一 19:28写道:

> Hi!
>
> shimin huang 说的可能是原本的 at least once 的 checkpoint 机制,这种 checkpoint 原本就是不对齐的。
>
> Flink 1.13 完善了 exactly once 条件下的不对齐 checkpoint 机制,因此这是能保证 exactly once
> 的。实现原理简单来说就是把还没处理的数据一起写到 state 里,下次恢复的时候把这些还没处理的数据也恢复出来接着处理。
>
> 张锴  于2021年8月2日周一 下午7:20写道:
>
> > 这个原理能说明一下吗,咋做到的
> >
> > 东东  于2021年8月2日周一 下午7:16写道:
> >
> > > 对于每一个subtask,边界仍然是清晰的,所以精确一次可以保证,只不过ck会变大。
> > >
> > > 在 2021-08-02 18:53:11,"张锴"  写道:
> > > >flink最新特性中有非对齐检查点的特性,可以用来解决一些反压下的任务,但如果用了这个,还能保证精确一次吗?对齐的检查点有清晰的快照N~N
> +
> > > >1之间的边界,这个会将数据混在一起,如何在恢复的时候保证精确一次?
> > >
> >
>


Re: 非对齐检查点还能保证exactly once语义吗

2021-08-02 Thread
这个原理能说明一下吗,咋做到的

东东  于2021年8月2日周一 下午7:16写道:

> 对于每一个subtask,边界仍然是清晰的,所以精确一次可以保证,只不过ck会变大。
>
> 在 2021-08-02 18:53:11,"张锴"  写道:
> >flink最新特性中有非对齐检查点的特性,可以用来解决一些反压下的任务,但如果用了这个,还能保证精确一次吗?对齐的检查点有清晰的快照N~N +
> >1之间的边界,这个会将数据混在一起,如何在恢复的时候保证精确一次?
>


非对齐检查点还能保证exactly once语义吗

2021-08-02 Thread
flink最新特性中有非对齐检查点的特性,可以用来解决一些反压下的任务,但如果用了这个,还能保证精确一次吗?对齐的检查点有清晰的快照N~N +
1之间的边界,这个会将数据混在一起,如何在恢复的时候保证精确一次?


Re: Flink写clickhouse怎么实现精准一次性

2021-05-07 Thread
clickhouse不支持事务及幂等写入,无法保证end to end 精准一次。

李一飞  于2021年5月7日周五 下午10:27写道:

> 请问   Flink写clickhouse怎么实现精准一次性,有啥好办法呀


Re: flink job task在taskmanager上分布不均衡

2021-05-07 Thread
给l另一个job设置个组别名,不同的组不会slot共享,会跑到别的slot上,slot可以灵活的运行在不同的TM上。

allanqinjy  于2021年5月7日周五 下午7:38写道:

>
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html
> flink的配置中是有flink taskmanager配置的,一个tm对应几个slots
> 。taskmanager.numberOfTaskSlots默认是1.并行度是对应了slots数据,一般我们的slots与并行度最大的一样。你可以看一下这个参数设置。然后对照官网说明。
>
>
> | |
> allanqinjy
> |
> |
> allanqi...@163.com
> |
> 签名由网易邮箱大师定制
>
>
> 在2021年05月7日 16:42,wenyuan138 写道:
> flink集群(flink 1.10.1),taskmanager有4个,每个有10个slot。 然后我有2个job,
> 每个并行度是4,预期是会分布到不同taskmanager的slot上(也就是4个taskmanager平均分配2个slot,
> 这样能更好的利用cpu资源). 结果发现这2个job的8个task全部分配到同一个taskmanager上了。 为什么?
> 有什么配置可以改变这种行为吗?
> 我们想要的是task能分到不同的taskmanager上。 谢谢!
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Fwd: flink1.12.2 CLI连接hive出现异常

2021-05-06 Thread
  启动yarn-session之后好了,另外这里为什么需要yarn-session呢?

Rui Li  于2021年5月7日周五 上午10:40写道:

> 感觉是提交job时遇到了问题。可以看看本地SQL client的日志有没有更详细的信息。另外可以试试用yarn session模式提交。
>
> On Fri, Apr 30, 2021 at 5:15 PM 张锴  wrote:
>
> > 我没有指定,就是执行了一条查询语句,自己生成的,但是好像没彻底成功,所以日志没说哪里的问题
> >
> > Rui Li  于2021年4月30日周五 下午4:51写道:
> >
> > > 你好,
> > >
> > > 看错误信息是找不到application_1605840182730_29292,这个是你提交任务的时候指定的么?
> > >
> > > On Thu, Apr 29, 2021 at 1:35 PM 张锴  wrote:
> > >
> > > > 我这里生产的hive没有配置Kerberos认证
> > > >
> > > > 张锴  于2021年4月29日周四 上午10:05写道:
> > > >
> > > > > 官网有说吗,你在哪里找到的呢
> > > > >
> > > > > guoyb <861277...@qq.com> 于2021年4月28日周三 上午10:56写道:
> > > > >
> > > > >> 我的也有这种问题,没解决,kerberos认证的hive导致的。
> > > > >>
> > > > >>
> > > > >>
> > > > >> ---原始邮件---
> > > > >> 发件人: "张锴" > > > >> 发送时间: 2021年4月28日(周三) 上午10:41
> > > > >> 收件人: "user-zh" > > > >> 主题: Fwd: flink1.12.2 CLI连接hive出现异常
> > > > >>
> > > > >>
> > > > >> -- Forwarded message -
> > > > >> 发件人: 张锴  > > > >> Date: 2021年4月27日周二 下午1:59
> > > > >> Subject: flink1.12.2 CLI连接hive出现异常
> > > > >> To:  > > > >>
> > > > >>
> > > > >> *使用flink1.12.2
> > > CLI连接hive时,可以查看到catalog,也能看到指定的catalog下的tables,但是执行select
> > > > >> 语句时就出现异常。*
> > > > >> [ERROR] Could not execute SQL statement. Reason:
> > > > >> org.apache.hadoop.ipc.RemoteException: Application with id
> > > > >> 'application_1605840182730_29292' doesn't exist in RM. Please
> check
> > > that
> > > > >> the job submission was suc
> > > > >> at
> > > > >>
> > > > >>
> > > >
> > >
> >
> org.apache.hadoop.yarn.server.resourcemanager.ClientRMService.getApplicationReport(ClientRMService.java:382)
> > > > >> at
> > > > >>
> > > > >>
> > > >
> > >
> >
> org.apache.hadoop.yarn.api.impl.pb.service.ApplicationClientProtocolPBServiceImpl.getApplicationReport(ApplicationClientProtocolPBServiceImpl.java:
> > > > >> at
> > > > >>
> > > > >>
> > > >
> > >
> >
> org.apache.hadoop.yarn.proto.ApplicationClientProtocol$ApplicationClientProtocolService$2.callBlockingMethod(ApplicationClientProtocol.java:561)
> > > > >> at
> > > > >>
> > > > >>
> > > >
> > >
> >
> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:524)
> > > > >> at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1025)
> > > > >> at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:876)
> > > > >> at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:822)
> > > > >> at java.security.AccessController.doPrivileged(Native Method)
> > > > >> at javax.security.auth.Subject.doAs(Subject.java:422)
> > > > >> at
> > > > >>
> > > > >>
> > > >
> > >
> >
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
> > > > >> at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2682)
> > > > >>
> > > > >> *使用yarn logs -applicationId application_1605840182730_29292
> > > > >> 查看日志时,并没有给出具体的错误,以下是打出的日志。日志几乎看不出啥问题。*
> > > > >> INFO client.RMProxy: Connecting to ResourceManager at
> > > > >> hadoop01.xxx.xxx.xxx/xx.xx.x.xx:8050
> > > > >> Unable to get ApplicationState. Attempting to fetch logs directly
> > from
> > > > the
> > > > >> filesystem.
> > > > >> Can not find the appOwner. Please specify the correct appOwner
> > > > >> Could not locate application logs for
> > application_1605840182730_29292
> > > > >>
> > > > >> 这个如何排查呢,有遇到类似的问题的小伙伴吗
> > > > >
> > > > >
> > > >
> > >
> > >
> > > --
> > > Best regards!
> > > Rui Li
> > >
> >
>
>
> --
> Best regards!
> Rui Li
>


Re: Fwd: flink1.12.2 CLI连接hive出现异常

2021-05-06 Thread
启动yarn-session之后好了,另外这里为什么需要yarn-session呢?其他的不行吗?

guoyb <861277...@qq.com> 于2021年5月7日周五 上午10:59写道:

> 看看yarn session是不是被kill掉了。
>
>
>
> ---原始邮件---
> 发件人: "Rui Li" 发送时间: 2021年5月7日(周五) 上午10:39
> 收件人: "user-zh" 主题: Re: Fwd: flink1.12.2 CLI连接hive出现异常
>
>
> 感觉是提交job时遇到了问题。可以看看本地SQL client的日志有没有更详细的信息。另外可以试试用yarn session模式提交。
>
> On Fri, Apr 30, 2021 at 5:15 PM 张锴 
>  我没有指定,就是执行了一条查询语句,自己生成的,但是好像没彻底成功,所以日志没说哪里的问题
> 
>  Rui Li  
>   你好,
>  
>   看错误信息是找不到application_1605840182730_29292,这个是你提交任务的时候指定的么?
>  
>   On Thu, Apr 29, 2021 at 1:35 PM 张锴  wrote:
>  
>我这里生产的hive没有配置Kerberos认证
>   
>张锴
> 官网有说吗,你在哪里找到的呢
>
> guoyb <861277...@qq.com 于2021年4月28日周三 上午10:56写道:
>
> 我的也有这种问题,没解决,kerberos认证的hive导致的。
>
>
>
> ---原始邮件---
> 发件人: "张锴" 发送时间: 2021年4月28日(周三) 上午10:41
> 收件人: "user-zh" 主题: Fwd: flink1.12.2 CLI连接hive出现异常
>
>
> -- Forwarded message -
> 发件人: 张锴  Date: 2021年4月27日周二 下午1:59
> Subject: flink1.12.2 CLI连接hive出现异常
> To: 
>
> *使用flink1.12.2
>   CLI连接hive时,可以查看到catalog,也能看到指定的catalog下的tables,但是执行select
> 语句时就出现异常。*
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.hadoop.ipc.RemoteException: Application
> with id
> 'application_1605840182730_29292' doesn't exist in
> RM. Please check
>   that
> the job submission was suc
> at
>
>
>   
>  
> 
> org.apache.hadoop.yarn.server.resourcemanager.ClientRMService.getApplicationReport(ClientRMService.java:382)
> at
>
>
>   
>  
> 
> org.apache.hadoop.yarn.api.impl.pb.service.ApplicationClientProtocolPBServiceImpl.getApplicationReport(ApplicationClientProtocolPBServiceImpl.java:
> at
>
>
>   
>  
> 
> org.apache.hadoop.yarn.proto.ApplicationClientProtocol$ApplicationClientProtocolService$2.callBlockingMethod(ApplicationClientProtocol.java:561)
> at
>
>
>   
>  
> 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:524)
> at
> org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1025)
> at
> org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:876)
> at
> org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:822)
> at
> java.security.AccessController.doPrivileged(Native Method)
> at
> javax.security.auth.Subject.doAs(Subject.java:422)
> at
>
>
>   
>  
> 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
> at
> org.apache.hadoop.ipc.Server$Handler.run(Server.java:2682)
>
> *使用yarn logs -applicationIdnbsp;
> application_1605840182730_29292
> 查看日志时,并没有给出具体的错误,以下是打出的日志。日志几乎看不出啥问题。*
> INFO client.RMProxy: Connecting to ResourceManager
> at
> hadoop01.xxx.xxx.xxx/xx.xx.x.xx:8050
> Unable to get ApplicationState. Attempting to
> fetch logs directly
>  from
>the
> filesystem.
> Can not find the appOwner. Please specify the
> correct appOwner
> Could not locate application logs for
>  application_1605840182730_29292
>
> 这个如何排查呢,有遇到类似的问题的小伙伴吗
>
>
>   
>  
>  
>   --
>   Best regards!
>   Rui Li
>  
> 
>
>
> --
> Best regards!
> Rui Li


Re: flink 侧输出流类型转换问题

2021-04-30 Thread
Date是case class ,我后面加了tostring()就好了

lp <973182...@qq.com> 于2021年4月30日周五 下午5:28写道:

> val outputTagDate = OutputTag[String]("Date-side-output")
> 你的outputtag定义的泛型是string
>
> ctx.output(outputTagDate,Date(first_retain,noob_endtime,noob_first_endtime))
> 这个Date是个什么函数,返回的是string类型么
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


flink watermark 合并策略机制

2021-04-30 Thread
flink 版本1.12.2,在看官方文档介绍watermark时,看到一句话
*标题是:单调递增时间戳分配器*
注意:在 Flink 应用程序中,如果是并行数据源,则只要求并行数据源中的每个*单分区数据源任务*时间戳递增。例如,设置每一个并行数据源实例都只读取一个
Kafka 分区,则时间戳只需在每个 Kafka 分区内递增即可。Flink 的 watermark
合并机制会在并行数据流进行分发(shuffle)、联合(union)、连接(connect)或合并(merge)时生成正确的 watermark。

请问一下报红的部分在官方文档哪里描述了,我没有找到,有人能给指一下吗?


Re: Fwd: flink1.12.2 CLI连接hive出现异常

2021-04-30 Thread
我没有指定,就是执行了一条查询语句,自己生成的,但是好像没彻底成功,所以日志没说哪里的问题

Rui Li  于2021年4月30日周五 下午4:51写道:

> 你好,
>
> 看错误信息是找不到application_1605840182730_29292,这个是你提交任务的时候指定的么?
>
> On Thu, Apr 29, 2021 at 1:35 PM 张锴  wrote:
>
> > 我这里生产的hive没有配置Kerberos认证
> >
> > 张锴  于2021年4月29日周四 上午10:05写道:
> >
> > > 官网有说吗,你在哪里找到的呢
> > >
> > > guoyb <861277...@qq.com> 于2021年4月28日周三 上午10:56写道:
> > >
> > >> 我的也有这种问题,没解决,kerberos认证的hive导致的。
> > >>
> > >>
> > >>
> > >> ---原始邮件---
> > >> 发件人: "张锴" > >> 发送时间: 2021年4月28日(周三) 上午10:41
> > >> 收件人: "user-zh" > >> 主题: Fwd: flink1.12.2 CLI连接hive出现异常
> > >>
> > >>
> > >> -- Forwarded message -
> > >> 发件人: 张锴  > >> Date: 2021年4月27日周二 下午1:59
> > >> Subject: flink1.12.2 CLI连接hive出现异常
> > >> To:  > >>
> > >>
> > >> *使用flink1.12.2
> CLI连接hive时,可以查看到catalog,也能看到指定的catalog下的tables,但是执行select
> > >> 语句时就出现异常。*
> > >> [ERROR] Could not execute SQL statement. Reason:
> > >> org.apache.hadoop.ipc.RemoteException: Application with id
> > >> 'application_1605840182730_29292' doesn't exist in RM. Please check
> that
> > >> the job submission was suc
> > >> at
> > >>
> > >>
> >
> org.apache.hadoop.yarn.server.resourcemanager.ClientRMService.getApplicationReport(ClientRMService.java:382)
> > >> at
> > >>
> > >>
> >
> org.apache.hadoop.yarn.api.impl.pb.service.ApplicationClientProtocolPBServiceImpl.getApplicationReport(ApplicationClientProtocolPBServiceImpl.java:
> > >> at
> > >>
> > >>
> >
> org.apache.hadoop.yarn.proto.ApplicationClientProtocol$ApplicationClientProtocolService$2.callBlockingMethod(ApplicationClientProtocol.java:561)
> > >> at
> > >>
> > >>
> >
> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:524)
> > >> at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1025)
> > >> at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:876)
> > >> at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:822)
> > >> at java.security.AccessController.doPrivileged(Native Method)
> > >> at javax.security.auth.Subject.doAs(Subject.java:422)
> > >> at
> > >>
> > >>
> >
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
> > >> at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2682)
> > >>
> > >> *使用yarn logs -applicationId application_1605840182730_29292
> > >> 查看日志时,并没有给出具体的错误,以下是打出的日志。日志几乎看不出啥问题。*
> > >> INFO client.RMProxy: Connecting to ResourceManager at
> > >> hadoop01.xxx.xxx.xxx/xx.xx.x.xx:8050
> > >> Unable to get ApplicationState. Attempting to fetch logs directly from
> > the
> > >> filesystem.
> > >> Can not find the appOwner. Please specify the correct appOwner
> > >> Could not locate application logs for application_1605840182730_29292
> > >>
> > >> 这个如何排查呢,有遇到类似的问题的小伙伴吗
> > >
> > >
> >
>
>
> --
> Best regards!
> Rui Li
>


Re: Fwd: flink1.12.2 CLI连接hive出现异常

2021-04-28 Thread
我这里生产的hive没有配置Kerberos认证

张锴  于2021年4月29日周四 上午10:05写道:

> 官网有说吗,你在哪里找到的呢
>
> guoyb <861277...@qq.com> 于2021年4月28日周三 上午10:56写道:
>
>> 我的也有这种问题,没解决,kerberos认证的hive导致的。
>>
>>
>>
>> ---原始邮件---
>> 发件人: "张锴"> 发送时间: 2021年4月28日(周三) 上午10:41
>> 收件人: "user-zh"> 主题: Fwd: flink1.12.2 CLI连接hive出现异常
>>
>>
>> -- Forwarded message -
>> 发件人: 张锴 > Date: 2021年4月27日周二 下午1:59
>> Subject: flink1.12.2 CLI连接hive出现异常
>> To: >
>>
>> *使用flink1.12.2 CLI连接hive时,可以查看到catalog,也能看到指定的catalog下的tables,但是执行select
>> 语句时就出现异常。*
>> [ERROR] Could not execute SQL statement. Reason:
>> org.apache.hadoop.ipc.RemoteException: Application with id
>> 'application_1605840182730_29292' doesn't exist in RM. Please check that
>> the job submission was suc
>> at
>>
>> org.apache.hadoop.yarn.server.resourcemanager.ClientRMService.getApplicationReport(ClientRMService.java:382)
>> at
>>
>> org.apache.hadoop.yarn.api.impl.pb.service.ApplicationClientProtocolPBServiceImpl.getApplicationReport(ApplicationClientProtocolPBServiceImpl.java:
>> at
>>
>> org.apache.hadoop.yarn.proto.ApplicationClientProtocol$ApplicationClientProtocolService$2.callBlockingMethod(ApplicationClientProtocol.java:561)
>> at
>>
>> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:524)
>> at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1025)
>> at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:876)
>> at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:822)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at javax.security.auth.Subject.doAs(Subject.java:422)
>> at
>>
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
>> at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2682)
>>
>> *使用yarn logs -applicationId application_1605840182730_29292
>> 查看日志时,并没有给出具体的错误,以下是打出的日志。日志几乎看不出啥问题。*
>> INFO client.RMProxy: Connecting to ResourceManager at
>> hadoop01.xxx.xxx.xxx/xx.xx.x.xx:8050
>> Unable to get ApplicationState. Attempting to fetch logs directly from the
>> filesystem.
>> Can not find the appOwner. Please specify the correct appOwner
>> Could not locate application logs for application_1605840182730_29292
>>
>> 这个如何排查呢,有遇到类似的问题的小伙伴吗
>
>


Re: Fwd: flink1.12.2 CLI连接hive出现异常

2021-04-28 Thread
官网有说吗,你在哪里找到的呢

guoyb <861277...@qq.com> 于2021年4月28日周三 上午10:56写道:

> 我的也有这种问题,没解决,kerberos认证的hive导致的。
>
>
>
> ---原始邮件---
> 发件人: "张锴" 发送时间: 2021年4月28日(周三) 上午10:41
> 收件人: "user-zh" 主题: Fwd: flink1.12.2 CLI连接hive出现异常
>
>
> -- Forwarded message -
> 发件人: 张锴  Date: 2021年4月27日周二 下午1:59
> Subject: flink1.12.2 CLI连接hive出现异常
> To: 
>
> *使用flink1.12.2 CLI连接hive时,可以查看到catalog,也能看到指定的catalog下的tables,但是执行select
> 语句时就出现异常。*
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.hadoop.ipc.RemoteException: Application with id
> 'application_1605840182730_29292' doesn't exist in RM. Please check that
> the job submission was suc
> at
>
> org.apache.hadoop.yarn.server.resourcemanager.ClientRMService.getApplicationReport(ClientRMService.java:382)
> at
>
> org.apache.hadoop.yarn.api.impl.pb.service.ApplicationClientProtocolPBServiceImpl.getApplicationReport(ApplicationClientProtocolPBServiceImpl.java:
> at
>
> org.apache.hadoop.yarn.proto.ApplicationClientProtocol$ApplicationClientProtocolService$2.callBlockingMethod(ApplicationClientProtocol.java:561)
> at
>
> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:524)
> at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1025)
> at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:876)
> at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:822)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
>
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
> at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2682)
>
> *使用yarn logs -applicationId application_1605840182730_29292
> 查看日志时,并没有给出具体的错误,以下是打出的日志。日志几乎看不出啥问题。*
> INFO client.RMProxy: Connecting to ResourceManager at
> hadoop01.xxx.xxx.xxx/xx.xx.x.xx:8050
> Unable to get ApplicationState. Attempting to fetch logs directly from the
> filesystem.
> Can not find the appOwner. Please specify the correct appOwner
> Could not locate application logs for application_1605840182730_29292
>
> 这个如何排查呢,有遇到类似的问题的小伙伴吗


Fwd: flink1.12.2 CLI连接hive出现异常

2021-04-27 Thread
-- Forwarded message -
发件人: 张锴 
Date: 2021年4月27日周二 下午1:59
Subject: flink1.12.2 CLI连接hive出现异常
To: 


*使用flink1.12.2 CLI连接hive时,可以查看到catalog,也能看到指定的catalog下的tables,但是执行select
语句时就出现异常。*
[ERROR] Could not execute SQL statement. Reason:
org.apache.hadoop.ipc.RemoteException: Application with id
'application_1605840182730_29292' doesn't exist in RM. Please check that
the job submission was suc
at
org.apache.hadoop.yarn.server.resourcemanager.ClientRMService.getApplicationReport(ClientRMService.java:382)
at
org.apache.hadoop.yarn.api.impl.pb.service.ApplicationClientProtocolPBServiceImpl.getApplicationReport(ApplicationClientProtocolPBServiceImpl.java:
at
org.apache.hadoop.yarn.proto.ApplicationClientProtocol$ApplicationClientProtocolService$2.callBlockingMethod(ApplicationClientProtocol.java:561)
at
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:524)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1025)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:876)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:822)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2682)

*使用yarn logs -applicationId  application_1605840182730_29292
查看日志时,并没有给出具体的错误,以下是打出的日志。日志几乎看不出啥问题。*
INFO client.RMProxy: Connecting to ResourceManager at
hadoop01.xxx.xxx.xxx/xx.xx.x.xx:8050
Unable to get ApplicationState. Attempting to fetch logs directly from the
filesystem.
Can not find the appOwner. Please specify the correct appOwner
Could not locate application logs for application_1605840182730_29292

这个如何排查呢,有遇到类似的问题的小伙伴吗


flink1.12.2 CLI连接hive出现异常

2021-04-26 Thread
*使用flink1.12.2 CLI连接hive时,可以查看到catalog,也能看到指定的catalog下的tables,但是执行select
语句时就出现异常。*
[ERROR] Could not execute SQL statement. Reason:
org.apache.hadoop.ipc.RemoteException: Application with id
'application_1605840182730_29292' doesn't exist in RM. Please check that
the job submission was suc
at
org.apache.hadoop.yarn.server.resourcemanager.ClientRMService.getApplicationReport(ClientRMService.java:382)
at
org.apache.hadoop.yarn.api.impl.pb.service.ApplicationClientProtocolPBServiceImpl.getApplicationReport(ApplicationClientProtocolPBServiceImpl.java:
at
org.apache.hadoop.yarn.proto.ApplicationClientProtocol$ApplicationClientProtocolService$2.callBlockingMethod(ApplicationClientProtocol.java:561)
at
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:524)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1025)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:876)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:822)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2682)

*使用yarn logs -applicationId  application_1605840182730_29292
查看日志时,并没有给出具体的错误,以下是打出的日志。日志几乎看不出啥问题。*
INFO client.RMProxy: Connecting to ResourceManager at
hadoop01.xxx.xxx.xxx/xx.xx.x.xx:8050
Unable to get ApplicationState. Attempting to fetch logs directly from the
filesystem.
Can not find the appOwner. Please specify the correct appOwner
Could not locate application logs for application_1605840182730_29292

这个如何排查呢,有遇到类似的问题的小伙伴吗


flink 侧输出流类型转换问题

2021-04-26 Thread
flink版本使用1.12.2,业务需要用到了侧输出流,首先先创建了一个侧输出标签
val outputTagDate = OutputTag[String]("Date-side-output")。程序写完编译的时候报
type mismatch :
 found   : org.apache.flink.streaming.api.scala.OutputTag[String]
 required: org.apache.flink.util.OutputTag[java.io.Serializable]
Note: String <: java.io.Serializable, but Java-defined class OutputTag is
invariant in type T.
You may wish to investigate a wildcard type such as `_ <:
java.io.Serializable`. (SLS 3.2.10)
ctx.output(outputTagDate,
Date(first_retain,noob_endtime,noob_first_endtime))
之后我修改了包名 import org.apache.flink.util.{OutputTag =>CommOutputTag}
然后重新创建  val outputTagDate = new CommOutputTag [String]("Date-side-output")
但是后面从测流中取时 val dateDS: DataStream[String] = ds.getSideOutput(outputTagDate)
报错。还是报type.,mismatch,
required: org.apache.flink.streamiing.api.scala.OutputTag[NotInferedX]
found:  org.apache.flink.util.OutputTag   [String]
  outputTagDate这个变量怎么一会儿需要   org.apache.flink.streaming.api.scala.OutputTag
一会儿又要 org.apache.flink.util.OutputTag呢。
为啥会这样呢,这块应该怎么书写呢?


Re: 多个复杂算子保证精准一次性

2021-04-26 Thread
中间的状态也不能丢,两个都需要

hk__lrzy  于2021年4月25日周日 下午8:25写道:

> 所有算子都需要维护。
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


DataStreamAPI 与flink sql疑问

2021-04-26 Thread
flink版本使用的是1.12.2.。请问如果在Dstream 上用一些Operater,比如map
,flatmap,process等,可以在其重写的方法中使用tableEnv.sqlQuery("xxx")
tableEnv.createTemporaryView(),这种sql吗,能这样结合吗?


Re: flink如何从流中取出自定义的数据结构并赋值给变量

2021-04-25 Thread
用侧输出流的方式能单独把值取出来吗?这个要怎么取值呢

JasonLee <17610775...@163.com> 于2021年4月25日周日 下午5:58写道:

> hi
>
> 你可以用 filter 过滤出多个流或者用测流输出的方式分流处理
>
>
>
> -
> Best Wishes
> JasonLee
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


flink如何从流中取出自定义的数据结构并赋值给变量

2021-04-25 Thread
flink版本使用1.12.2。有一个需求就是想要从stream中拿出自定义的数据结构,暂且叫a并赋值给后面变量,基于这个a取他的属性作一些判断操作。
比如:
   val ds: DataStream[b] = stream.filter(_.nonEmpty).map(new
MapFunction[String, b] {

  override def map(value: String) = {
  val recallKafka = JSON.parseObject(value, classOf[a])

  b(recallKafka.group_id, value, recallKafka.eventTime)

  }
})

val kafkaCommonData: a =recallKafka
判断条件
 if (kafkaCommonData.data.date != null) {x}
if (kafkaCommonData.data.userinfo != null) {}
.
请问一下,我通过什么方法能单独把流中的某个数据结构给取出来呢?如果有方式的话应该要怎么写呢?大佬们帮忙看一下啊,卡了好几天 了,难受。。




Re: Flink 1.12.0 隔几个小时Checkpoint就会失败

2021-04-22 Thread
你好,我也遇到了这个问题,你的checkpoint是怎么配置的,可以参考一下吗

Haihang Jing  于2021年3月23日周二 下午8:04写道:

> 你好,问题定位到了吗?
> 我也遇到了相同的问题,感觉和checkpoint interval有关
> 我有两个相同的作业(checkpoint interval
> 设置的是3分钟),一个运行在flink1.9,一个运行在flink1.12,1.9的作业稳定运行,1.12的运行5小时就会checkpoint
> 制作失败,抛异常 org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint
> tolerable failure threshold.
> 当我把checkpoint interval调大到10分钟后,1.12的作业也可以稳定运行,所以我怀疑和制作间隔有关。
>
> 看到过一个issuse,了解到flink1.10后对于checkpoint机制进行调整,接收端在barrier对齐时不会缓存单个barrier到达后的数据,意味着发送方必须在barrier对齐后等待credit
> feedback来传输数据,因此发送方会产生一定的冷启动,影响到延迟和网络吞吐量。但是不确定是不是一定和这个相关,以及如何定位影响。
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


如何将flink sql 查询语句的count值取出

2021-04-20 Thread
我使用的flink版本1.12.2。
有个问题请教一下,如何在flink sql 查询语句中将count值取出。
先举个例子:
val total: Int = hiveContext.sql("select count(*) from
a").collect()(0)(0).toString.toInt
可以把count值拿出来,那如果用flink sql去做的话 怎样取出查询后的结果呢。
1、是否flink sql可以这么做?
2、如果可以应该怎么写呢?


如何将flink sql 查询语句的count值取出

2021-04-20 Thread
flink版本1.12.2。
有个问题请教一下,如何在flink sql 查询语句中将count值取出
例如:tableEnv.sqlQuery("select count(*) from a")
将这个count值取出并返回。


Re: flink1.12.2 StreamingFileSink 问题

2021-04-17 Thread
请问是用flink CLI的方式写hive,还是用code方式呢,暂时没用过flink sql 不知道标准开发是怎样

LiangbinZhang  于2021年4月16日周五 下午6:57写道:

> Hi,张锴
> Flink1.12支持sql直接写hive表,可以做到分钟级的数据查询,不知道符不符合你的业务需求。
>
>
>
> 张锴 wrote
> >
> flink用的1.12.2,要sink到hdfs,选用了StreamingFileSink,导入依赖的时候maven仓库并没有1.12.2的flink-connector-filesystem的jar包,我应该选用哪个版本合适
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink1.12.2 StreamingFileSink 问题

2021-04-15 Thread
maven 仓库flink-connector-filesystem 最高1.11.3,也能用吗?

guoyb <861277...@qq.com> 于2021年4月15日周四 下午10:01写道:

> 1.12.0的也可以,大版本一样就行了
>
>
>
> ---原始邮件---
> 发件人: "张锴" 发送时间: 2021年4月15日(周四) 下午5:16
> 收件人: "user-zh" 主题: flink1.12.2 StreamingFileSink 问题
>
>
>
> flink用的1.12.2,要sink到hdfs,选用了StreamingFileSink,导入依赖的时候maven仓库并没有1.12.2的flink-connector-filesystem的jar包,我应该选用哪个版本合适


flink1.12.2 StreamingFileSink 问题

2021-04-15 Thread
flink用的1.12.2,要sink到hdfs,选用了StreamingFileSink,导入依赖的时候maven仓库并没有1.12.2的flink-connector-filesystem的jar包,我应该选用哪个版本合适


提交flink-sql 出现无法部署到yarn集群

2021-04-14 Thread
在用flink-sql的方式连接hive时 ,出现以下错误:
 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error: Could not deploy Yarn job cluster.
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:366)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
at
org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1688)
at
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: org.apache.flink.client.deployment.ClusterDeploymentException:
Could not deploy Yarn job cluster.
at
org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:481)
at
org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:81)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1905)
at
org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:135)
at
org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1782)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1765)
at com.erwan.flinksql.FlinkConnectHive.main(FlinkConnectHive.java:49)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)
... 11 more
Caused by:
org.apache.flink.yarn.YarnClusterDescriptor$YarnDeploymentException: The
YARN application unexpectedly switched to state FAILED during deployment.
Diagnostics from YARN: Application application_1618298202025_0017 failed 1
times (global limit =2; local limit is =1) due to AM Container for
appattempt_1618298202025_0017_01 exited with  exitCode: 2
Failing this attempt.Diagnostics: [2021-04-14 19:04:02.506]Exception from
container-launch.
Container id: container_e13_1618298202025_0017_01_01
Exit code: 2。

由于错误原因不明显,不好排查,也不确定是到底是哪里的问题,请问有什么办法能够定位问题。


flink sql 客户端连接hive 3.1.0出现connection refused异常

2021-04-13 Thread
flink版本1.12.2 ,在交互式界面执行flink sql 连接hive操作时,出现连接拒绝异常,内容如下:
Flink SQL> use catalog myhive;

Flink SQL> show tables;
dim_kcl_customer_source_1h_all
mytest

Flink SQL> select * from dim_kcl_customer_source_1h_all limit 10;
2021-04-14 10:22:24,451 WARN  org.apache.hadoop.hive.conf.HiveConf
[] - HiveConf of name hive.hook.proto.base-directory does
not exist
2021-04-14 10:22:24,452 WARN  org.apache.hadoop.hive.conf.HiveConf
[] - HiveConf of name hive.strict.managed.tables does not
exist
2021-04-14 10:22:24,452 WARN  org.apache.hadoop.hive.conf.HiveConf
[] - HiveConf of name hive.stats.fetch.partition.stats does
not exist
2021-04-14 10:22:24,452 WARN  org.apache.hadoop.hive.conf.HiveConf
[] - HiveConf of name hive.heapsize does not exist
2021-04-14 10:22:24,466 INFO
 org.apache.hadoop.hive.metastore.HiveMetaStoreClient [] - Trying
to connect to metastore with URI thrift://test-hadoop002:9083
2021-04-14 10:22:24,467 INFO
 org.apache.hadoop.hive.metastore.HiveMetaStoreClient [] - Opened a
connection to metastore, current connections: 2
2021-04-14 10:22:24,468 INFO
 org.apache.hadoop.hive.metastore.HiveMetaStoreClient [] -
Connected to metastore.
2021-04-14 10:22:24,468 INFO
 org.apache.hadoop.hive.metastore.RetryingMetaStoreClient [] -
RetryingMetaStoreClient proxy=class
org.apache.hadoop.hive.metastore.HiveMetaStoreClient ugi=hadoop
(auth:SIMPLE) retries=24 delay=5 lifetime=0
2021-04-14 10:22:24,609 INFO
 org.apache.hadoop.hive.metastore.HiveMetaStoreClient [] - Closed a
connection to metastore, current connections: 1
2021-04-14 10:22:25,057 WARN
 org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory  [] - The
short-circuit local reads feature cannot be used because libhadoop cannot
be loaded.
2021-04-14 10:22:25,235 INFO  org.apache.hadoop.mapred.FileInputFormat
[] - Total input files to process : 1
[ERROR]
*Could not execute SQL statement. Reason:java.net.ConnectException:
Connection refused*

*表能查出来,执行sql语句时遇到拒绝连接,在hive里面执行同样的sql能查到数据,请问一下这块出现的问题是什么原因导致的呢。该如何排查*


"flink 1.12.2升级之后恢复任务出现异常"

2021-04-12 Thread
在client 提交命令出现以下错误,没遇到过,不清楚如何配置。

Exception in thread "Thread-5" java.lang.IllegalStateException: Trying to
access closed classloader. Please check if you store classloaders directly
or indirectly in static fields. If the stacktrace suggests that the leak
occurs in a third party library and cannot be fixed immediately, you can
disable this check with the configuration
'classloader.check-leaked-classloader'.
at
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.ensureInner(FlinkUserCodeClassLoaders.java:164)
at
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.getResource(FlinkUserCodeClassLoaders.java:183)
at org.apache.hadoop.conf.Configuration.getResource(Configuration.java:2738)
at
org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2952)
at
org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:2926)
at org.apache.hadoop.conf.Configuration.getProps(Configuration.java:2806)
at org.apache.hadoop.conf.Configuration.get(Configuration.java:1200)
at
org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1788)
at
org.apache.hadoop.util.ShutdownHookManager.getShutdownTimeout(ShutdownHookManager.java:183)
at
org.apache.hadoop.util.ShutdownHookManager.shutdownExecutor(ShutdownHookManager.java:145)
at
org.apache.hadoop.util.ShutdownHookManager.access$300(ShutdownHookManager.java:65)
at
org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:102)

hadoop 版本使用的3.x的,哪位大佬有遇到过这个错误,如何解决。


Re: flink 从mysql读取数据异常

2021-03-30 Thread
报错 信息明确说了只支持insert

air23  于2021年3月30日周二 上午10:32写道:

> 你好 参考官网
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/connectors/jdbc.html
> 这边读取mysql jdbc数据报错Exception in thread "main"
> org.apache.flink.table.api.TableException: Only insert statement is
> supported now.
>
>
> String  a = "-- register a MySQL table 'users' in Flink SQL\n" +
> "CREATE TABLE MyUserTable (\n" +
> "  id BIGINT\n" +
> ") WITH (\n" +
> "   'connector' = 'jdbc',\n" +
> "   'url' = 'jdbc:mysql://***:3306/monitor',\n" +
> "   'table-name' = 't1',\n" +
> "   'username' = 'root',\n" +
> "   'password' = '***'\n" +
> ") ";
>
> String b ="-- scan data from the JDBC table\n" +
> "SELECT id FROM MyUserTable\n";
>
> tEnv.executeSql(a);
>
>
>
> 请问是不可以从mysql读取数据吗?
>
>
>
>
>


flink 与 hive版本选择

2021-03-08 Thread
请教一下各位大佬,flink哪个版本与hive3.x以上的版本兼容性更好呢,目前在flink版本上做选择,后续暂不会升级,希望大佬们给点建议。


Re: 根据业务需求选择合适的flink state

2021-01-23 Thread
[MinMaxTemp] = elements.iterator
if (iterator.hasNext) {
  val value: MinMaxTemp = iterator.next()
  id = value.id
  courseId= value.courseId
  partnerId = value.partnerId
  ip = value.ip
  customerId = value.customerId
  courseNumber = value.courseNumber
  nickName = value.nickName
  liveType = value.liveType
  uid = value.uid
  minTsState.update(value.mineventTime) //更新最小时间戳
  maxTsState.update(value.maxeventTime)  //更新最大时间戳
}
join_time = DateUtil.convertTimeStamp2DateStr(minTs,
DateUtil.SECOND_DATE_FORMAT)
leave_time = DateUtil.convertTimeStamp2DateStr(maxTs,
DateUtil.SECOND_DATE_FORMAT)
duration = (maxTs - minTs) / 1000  //停留多少秒
duration_time = DateUtil.secondsToFormat(duration)  //停留时分秒
minTsState.clear()
maxTsState.clear()

out.collect(CloudliveWatcher(id, partnerId, courseId, customerId,
courseNumber, nickName, ip, device_type, net_opretor, net_type, area,
join_time, leave_time, created_time, modified_time
  , liveType, plat_form, duration, duration_time,
network_operator, role, useragent, uid, eventTime))

CloudliveWatcher(id, partnerId, courseId, customerId,
courseNumber, nickName, ip, device_type, net_opretor, net_type, area,
join_time, leave_time, created_time, modified_time
  , liveType, plat_form, duration, duration_time,
network_operator, role, useragent, uid, eventTime)

  }
}





赵一旦  于2021年1月21日周四 下午8:38写道:

> 我表达的方法是按照session
> window将数据分窗口,实际就是把连续1min没数据的那部分数据给你分割好,然后这部分数据中的最大时间戳和最小时间戳的差就是你要的结果理论上。
>
> 实现的话就是用2个状态,分别保存最大最小时间戳,没进来一个数据,对比更新最大最小时间戳即可。
>
>
> 然后在窗口被触发的时候将结果输出。使用reduce+procesWindowFunction配合。reduce的时候只计算最大最小,触发的时候才基于2个状态计算duration输出结果。
>
> 赵一旦  于2021年1月21日周四 下午8:28写道:
>
> > 我其实没看懂你逻辑。这个和窗口的最大最小时间戳啥关系。
> >
> > 张锴  于2021年1月21日周四 下午6:25写道:
> >
> >> 你好,之前我用了你上诉的方法出现一个问题,我并没有用min/max,我是在procss方法里用的context.window.getStart和
> >>
> >>
> context.window.getEnd作为开始和结束时间的,感觉这样也能获得最大和最小值,但是出来的数据最长停留了4分多钟,我跑的离线任务停留的时长有几个小时的都有,感觉出来的效果有问题。
> >> 下面是我的部分代码逻辑:
> >>
> >> val ds = dataStream
> >>   .filter(_.liveType == 1)
> >>   .keyBy(1, 2)
> >>   .window(EventTimeSessionWindows.withGap(Time.minutes(1)))
> >>   .process(new myProcessWindow()).uid("process-id")
> >>
> >> class myProcessWindow() extends
> >> ProcessWindowFunction[CloudLiveLogOnLine, CloudliveWatcher, Tuple,
> >> TimeWindow] {
> >>
> >>   override def process(key: Tuple, context: Context, elements:
> >> Iterable[CloudLiveLogOnLine], out: Collector[CloudliveWatcher]): Unit
> >> = {
> >> var startTime = context.window.getStart //定义第一个元素进入窗口的开始时间
> >> var endTime = context.window.getEnd //定义最后一个元素进入窗口的时间
> >>
> >> val currentDate = DateUtil.currentDate
> >> val created_time = currentDate
> >> val modified_time = currentDate
> >>  。。。
> >>
> >> val join_time: String =
> >> DateUtil.convertTimeStamp2DateStr(startTime,
> >> DateUtil.SECOND_DATE_FORMAT)
> >> val leave_time:String = DateUtil.convertTimeStamp2DateStr(endTime,
> >> DateUtil.SECOND_DATE_FORMAT)
> >> val duration = (endTime - startTime) / 1000  //停留多少秒
> >> val duration_time = DateUtil.secondsToFormat(duration)  //停留时分秒
> >> out.collect(CloudliveWatcher(id, partnerId, courseId, customerId,
> >> courseNumber, nickName, ip, device_type, net_opretor, net_type, area,
> >> join_time, leave_time, created_time, modified_time
> >>   , liveType, plat_form, duration, duration_time,
> >> network_operator, role, useragent, uid, eventTime))
> >>
> >> CloudliveWatcher(id, partnerId, courseId, customerId,
> >> courseNumber, nickName, ip, device_type, net_opretor, net_type, area,
> >> join_time, leave_time, created_time, modified_time
> >>   , liveType, plat_form, duration, duration_time,
> >> network_operator, role, useragent, uid, eventTime)
> >>
> >> }
> >>
> >>
> >> 这样写是否合适,如果要用min/max应该如何代入上诉逻辑当中?
> >>
> >>
> >>
> >>
> >> 赵一旦  于2020年12月28日周一 下午7:12写道:
> >>
> >> > 按直播间ID和用户ID分组,使用session
> >> window,使用1min作为gap,统计key+window内的count即可,即sum(1)。
> >> >
> >> > 或者感觉你打点实际不一定肯定是1min、2min这种整时间点,可以统计key+window内的min/max,然后输出的时候做个减法。
> >> >
> >> > session window的作用就是根据连续2个元素的gap决定是否将2个元素放入同一个window中。
> >> >
> >> >
> >> > 张锴  于2020年12月28日周一 下午5:35写道:
> >> >
> >> > > 能描述一下用session window的考虑吗
> >> > >
> >> > > Akisaya  于2020年12月28日周一 下午

Re: 根据业务需求选择合适的flink state

2021-01-22 Thread
@赵一旦
可以添加一下微信好友吗,具体的实践上还有点问题,我是在window后直接reduce(new myReduceFunc(),new
AssignWindowProcessFunc())自定义了这两个方法,但是效果还是有点问题,不知道我的写法是不是有问题

赵一旦  于2021年1月22日周五 上午10:10写道:

> 我理解你要的最终mysql结果表是:
> 直播间ID;用户ID;上线时间;下线时间;durationn=(下线时间 - 上线时间);
>
> 如果user1在直播间1,一天内出现10次,就出现10个记录,分别记录了每次的duration。
>
>
> 如上按照我的方案就可以实现哈。
>
> xuhaiLong  于2021年1月22日周五 上午10:03写道:
>
> > 可以试试这样,mysql 中 设置唯一键为窗口的 startTime 和
> > userId,然后对用户的每个窗口做停留时间的计算,最终会同一个用户在一天会产生多条记录,不过窗口的 startTime 不同,取值的时候sum
> 试试?
> >
> >
> > 在2021年1月21日 18:24,张锴 写道:
> > 你好,之前我用了你上诉的方法出现一个问题,我并没有用min/max,我是在procss方法里用的context.window.getStart和
> >
> >
> context.window.getEnd作为开始和结束时间的,感觉这样也能获得最大和最小值,但是出来的数据最长停留了4分多钟,我跑的离线任务停留的时长有几个小时的都有,感觉出来的效果有问题。
> > 下面是我的部分代码逻辑:
> >
> > val ds = dataStream
> > .filter(_.liveType == 1)
> > .keyBy(1, 2)
> > .window(EventTimeSessionWindows.withGap(Time.minutes(1)))
> > .process(new myProcessWindow()).uid("process-id")
> >
> > class myProcessWindow() extends
> > ProcessWindowFunction[CloudLiveLogOnLine, CloudliveWatcher, Tuple,
> > TimeWindow] {
> >
> > override def process(key: Tuple, context: Context, elements:
> > Iterable[CloudLiveLogOnLine], out: Collector[CloudliveWatcher]): Unit
> > = {
> > var startTime = context.window.getStart //定义第一个元素进入窗口的开始时间
> > var endTime = context.window.getEnd //定义最后一个元素进入窗口的时间
> >
> > val currentDate = DateUtil.currentDate
> > val created_time = currentDate
> > val modified_time = currentDate
> > 。。。
> >
> > val join_time: String =
> > DateUtil.convertTimeStamp2DateStr(startTime,
> > DateUtil.SECOND_DATE_FORMAT)
> > val leave_time:String = DateUtil.convertTimeStamp2DateStr(endTime,
> > DateUtil.SECOND_DATE_FORMAT)
> > val duration = (endTime - startTime) / 1000  //停留多少秒
> > val duration_time = DateUtil.secondsToFormat(duration)  //停留时分秒
> > out.collect(CloudliveWatcher(id, partnerId, courseId, customerId,
> > courseNumber, nickName, ip, device_type, net_opretor, net_type, area,
> > join_time, leave_time, created_time, modified_time
> > , liveType, plat_form, duration, duration_time,
> > network_operator, role, useragent, uid, eventTime))
> >
> > CloudliveWatcher(id, partnerId, courseId, customerId,
> > courseNumber, nickName, ip, device_type, net_opretor, net_type, area,
> > join_time, leave_time, created_time, modified_time
> > , liveType, plat_form, duration, duration_time,
> > network_operator, role, useragent, uid, eventTime)
> >
> > }
> >
> >
> > 这样写是否合适,如果要用min/max应该如何代入上诉逻辑当中?
> >
> >
> >
> >
> > 赵一旦  于2020年12月28日周一 下午7:12写道:
> >
> > 按直播间ID和用户ID分组,使用session window,使用1min作为gap,统计key+window内的count即可,即sum(1)。
> >
> > 或者感觉你打点实际不一定肯定是1min、2min这种整时间点,可以统计key+window内的min/max,然后输出的时候做个减法。
> >
> > session window的作用就是根据连续2个元素的gap决定是否将2个元素放入同一个window中。
> >
> >
> > 张锴  于2020年12月28日周一 下午5:35写道:
> >
> > 能描述一下用session window的考虑吗
> >
> > Akisaya  于2020年12月28日周一 下午5:00写道:
> >
> > 这个可以用 session window 吧
> >
> >
> >
> >
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows
> >
> > news_...@163.com  于2020年12月28日周一 下午2:15写道:
> >
> > 这么做的前提是每条记录是顺序进入KAFKA的才行,但真实场景不能保证这条,有可能较早的打点记录却较晚进入kafka队列。
> >
> >
> >
> > news_...@163.com
> >
> > 发件人: 张锴
> > 发送时间: 2020-12-28 13:35
> > 收件人: user-zh
> > 主题: 根据业务需求选择合适的flink state
> > 各位大佬帮我分析下如下需求应该怎么写
> >
> > 需求说明:
> >
> > 公司是做直播业务方面的,现在需要实时统计用户的在线时长,来源kafka,每分钟产生直播打点记录,数据中包含eventTime字段。举个例子,用户A
> >
> >
> >
> >
> >
> >
> 在1,2,3分钟一直产生打点记录,那么他的停留时长就是3分钟,第5,6分钟又来了,那他的停留时长就是2分钟,只要是连续的打点记录就合成一条记录,每个直播间每个用户同理。
> >
> > 我的想法:
> > 我现在的想法是按直播间ID和用户ID分组,然后process,想通过state方式来做,通过截取每条记录的event Time中的分钟数
> > 减去上一条的分钟数,如果他们差值等于1,说明是连续关系,则继续。如果不为1,说明断开了,直接输出这条记录,同时情况当前key的状态。
> >
> > 不知道各位大佬对我的这个想法有什么建议,或者说有更好的处理方式。
> >
> > flink 版本1.10.1
> >
> >
> >
> >
> >
>


Re: 根据业务需求选择合适的flink state

2021-01-21 Thread
@赵一旦
我今天调整一下逻辑再试试

赵一旦  于2021年1月22日周五 上午10:10写道:

> 我理解你要的最终mysql结果表是:
> 直播间ID;用户ID;上线时间;下线时间;durationn=(下线时间 - 上线时间);
>
> 如果user1在直播间1,一天内出现10次,就出现10个记录,分别记录了每次的duration。
>
>
> 如上按照我的方案就可以实现哈。
>
> xuhaiLong  于2021年1月22日周五 上午10:03写道:
>
> > 可以试试这样,mysql 中 设置唯一键为窗口的 startTime 和
> > userId,然后对用户的每个窗口做停留时间的计算,最终会同一个用户在一天会产生多条记录,不过窗口的 startTime 不同,取值的时候sum
> 试试?
> >
> >
> > 在2021年1月21日 18:24,张锴 写道:
> > 你好,之前我用了你上诉的方法出现一个问题,我并没有用min/max,我是在procss方法里用的context.window.getStart和
> >
> >
> context.window.getEnd作为开始和结束时间的,感觉这样也能获得最大和最小值,但是出来的数据最长停留了4分多钟,我跑的离线任务停留的时长有几个小时的都有,感觉出来的效果有问题。
> > 下面是我的部分代码逻辑:
> >
> > val ds = dataStream
> > .filter(_.liveType == 1)
> > .keyBy(1, 2)
> > .window(EventTimeSessionWindows.withGap(Time.minutes(1)))
> > .process(new myProcessWindow()).uid("process-id")
> >
> > class myProcessWindow() extends
> > ProcessWindowFunction[CloudLiveLogOnLine, CloudliveWatcher, Tuple,
> > TimeWindow] {
> >
> > override def process(key: Tuple, context: Context, elements:
> > Iterable[CloudLiveLogOnLine], out: Collector[CloudliveWatcher]): Unit
> > = {
> > var startTime = context.window.getStart //定义第一个元素进入窗口的开始时间
> > var endTime = context.window.getEnd //定义最后一个元素进入窗口的时间
> >
> > val currentDate = DateUtil.currentDate
> > val created_time = currentDate
> > val modified_time = currentDate
> > 。。。
> >
> > val join_time: String =
> > DateUtil.convertTimeStamp2DateStr(startTime,
> > DateUtil.SECOND_DATE_FORMAT)
> > val leave_time:String = DateUtil.convertTimeStamp2DateStr(endTime,
> > DateUtil.SECOND_DATE_FORMAT)
> > val duration = (endTime - startTime) / 1000  //停留多少秒
> > val duration_time = DateUtil.secondsToFormat(duration)  //停留时分秒
> > out.collect(CloudliveWatcher(id, partnerId, courseId, customerId,
> > courseNumber, nickName, ip, device_type, net_opretor, net_type, area,
> > join_time, leave_time, created_time, modified_time
> > , liveType, plat_form, duration, duration_time,
> > network_operator, role, useragent, uid, eventTime))
> >
> > CloudliveWatcher(id, partnerId, courseId, customerId,
> > courseNumber, nickName, ip, device_type, net_opretor, net_type, area,
> > join_time, leave_time, created_time, modified_time
> > , liveType, plat_form, duration, duration_time,
> > network_operator, role, useragent, uid, eventTime)
> >
> > }
> >
> >
> > 这样写是否合适,如果要用min/max应该如何代入上诉逻辑当中?
> >
> >
> >
> >
> > 赵一旦  于2020年12月28日周一 下午7:12写道:
> >
> > 按直播间ID和用户ID分组,使用session window,使用1min作为gap,统计key+window内的count即可,即sum(1)。
> >
> > 或者感觉你打点实际不一定肯定是1min、2min这种整时间点,可以统计key+window内的min/max,然后输出的时候做个减法。
> >
> > session window的作用就是根据连续2个元素的gap决定是否将2个元素放入同一个window中。
> >
> >
> > 张锴  于2020年12月28日周一 下午5:35写道:
> >
> > 能描述一下用session window的考虑吗
> >
> > Akisaya  于2020年12月28日周一 下午5:00写道:
> >
> > 这个可以用 session window 吧
> >
> >
> >
> >
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows
> >
> > news_...@163.com  于2020年12月28日周一 下午2:15写道:
> >
> > 这么做的前提是每条记录是顺序进入KAFKA的才行,但真实场景不能保证这条,有可能较早的打点记录却较晚进入kafka队列。
> >
> >
> >
> > news_...@163.com
> >
> > 发件人: 张锴
> > 发送时间: 2020-12-28 13:35
> > 收件人: user-zh
> > 主题: 根据业务需求选择合适的flink state
> > 各位大佬帮我分析下如下需求应该怎么写
> >
> > 需求说明:
> >
> > 公司是做直播业务方面的,现在需要实时统计用户的在线时长,来源kafka,每分钟产生直播打点记录,数据中包含eventTime字段。举个例子,用户A
> >
> >
> >
> >
> >
> >
> 在1,2,3分钟一直产生打点记录,那么他的停留时长就是3分钟,第5,6分钟又来了,那他的停留时长就是2分钟,只要是连续的打点记录就合成一条记录,每个直播间每个用户同理。
> >
> > 我的想法:
> > 我现在的想法是按直播间ID和用户ID分组,然后process,想通过state方式来做,通过截取每条记录的event Time中的分钟数
> > 减去上一条的分钟数,如果他们差值等于1,说明是连续关系,则继续。如果不为1,说明断开了,直接输出这条记录,同时情况当前key的状态。
> >
> > 不知道各位大佬对我的这个想法有什么建议,或者说有更好的处理方式。
> >
> > flink 版本1.10.1
> >
> >
> >
> >
> >
>


Re: Re: Re: Flink写hdfs提交任务就报错。Recoverable writers on Hadoop are only supported for HDFS

2021-01-21 Thread
@赵一旦
另外,上次我还提了一个问题请教你,我试了你说的那个想法,但是好像有点问题,你可以看一下

张锴  于2021年1月21日周四 下午7:13写道:

> 我用的flink 1.10版,FlieSink就是BucketingSink,我是用这个写hdfs的
>
> 赵一旦  于2021年1月21日周四 下午7:05写道:
>
>> @Michael Ran; 嗯嗯,没关系。
>>
>> @张锴 你说的是flink哪个版本的connector,stream or sql。我搜了下我的没有。我是1.12,stream。
>>
>> 目前看文档有streamFileSink,还有FileSink,从文档内容来看使用方式差不多。我计划试一下FileSink,但不清楚FileSink和StreamFileSink啥区别,是否都能写hadoop类文件系统,因为涉及是否原子写,比较分布式文件系统不支持追加和编辑等。
>>
>> Michael Ran  于2021年1月21日周四 下午7:01写道:
>>
>> >
>> >
>> 很抱歉,我已经很久没用过这个了。但是可以根据异常信息以及API源码执行进行分析,确定是否能直接写入。如果你要写入自定义的文件系统,那么只能实现自己的sink方式。或者你的文件系统的写入方式兼容hdfs的上层API可以参考各个sink端的写法
>> > 在 2021-01-21 18:45:06,"张锴"  写道:
>> > >import
>> org.apache.flink.streaming.connectors.fs.bucketing.{BucketingSink,
>> > >DateTimeBucketer}
>> > >
>> > >sink.setBucketer sink.setWriter用这种方式试试
>> > >
>> > >
>> > >
>> > >赵一旦  于2021年1月21日周四 下午6:37写道:
>> > >
>> > >> @Michael Ran
>> > >> 然后有什么解决方案吗,我这个是使用flink的streamFileSink方式写hdfs的时候出现的异常。
>> > >>
>> > >> Michael Ran  于2021年1月21日周四 下午5:23写道:
>> > >>
>> > >> > 这里应该是用了hdfs 的特定API吧,文件系统没兼容public
>> > >> > HadoopRecoverableWriter(org.apache.hadoop.fs.FileSystem fs) {...}
>> > >> > 在 2021-01-21 17:18:23,"赵一旦"  写道:
>> > >> > >具体报错信息如下:
>> > >> > >
>> > >> > >java.lang.UnsupportedOperationException: Recoverable writers on
>> > Hadoop
>> > >> are
>> > >> > >only supported for HDFS
>> > >> > >at
>> > org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.(
>> > >> > >HadoopRecoverableWriter.java:61)
>> > >> > >at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem
>> > >> > >.createRecoverableWriter(HadoopFileSystem.java:210)
>> > >> > >at org.apache.flink.core.fs.SafetyNetWrapperFileSystem
>> > >> > >.createRecoverableWriter(SafetyNetWrapperFileSystem.java:69)
>> > >> > >at org.apache.flink.streaming.api.functions.sink.filesystem.
>> > >> >
>> > >StreamingFileSink$RowFormatBuilder.createBucketWriter(StreamingFileSink
>> > >> > >.java:260)
>> > >> > >at org.apache.flink.streaming.api.functions.sink.filesystem.
>> > >> >
>> > >> >
>> > >>
>> >
>> >StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:270)
>> > >> > >at org.apache.flink.streaming.api.functions.sink.filesystem.
>> > >> > >StreamingFileSink.initializeState(StreamingFileSink.java:412)
>> > >> > >at
>> > org.apache.flink.streaming.util.functions.StreamingFunctionUtils
>> > >> > >.tryRestoreFunction(StreamingFunctionUtils.java:185)
>> > >> > >at
>> > org.apache.flink.streaming.util.functions.StreamingFunctionUtils
>> > >> > >.restoreFunctionState(StreamingFunctionUtils.java:167)
>> > >> > >at
>> > >> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator
>> > >> > >.initializeState(AbstractUdfStreamOperator.java:96)
>> > >> > >at
>> > >> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler
>> > >> > >.initializeOperatorState(StreamOperatorStateHandler.java:107)
>> > >> > >at
>> > org.apache.flink.streaming.api.operators.AbstractStreamOperator
>> > >> > >.initializeState(AbstractStreamOperator.java:264)
>> > >> > >at org.apache.flink.streaming.runtime.tasks.OperatorChain
>> > >> > >.initializeStateAndOpenOperators(OperatorChain.java:400)
>> > >> > >at org.apache.flink.streaming.runtime.tasks.StreamTask
>> > >> > >.lambda$beforeInvoke$2(StreamTask.java:507)
>> > >> > >at
>> > >> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1
>> > >> > >.runThrowing(StreamTaskActionExecutor.java:47)
>> > >> > >at
>> > org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(
>> > >> > >StreamTask.java:501)
>> > >> > >at
>> > >> >
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask
>> > >> > >.java:531)
>> > >> > >at
>> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>> > >> > >at
>> org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>> > >> > >at java.lang.Thread.run(Thread.java:748)
>> > >> > >
>> > >> > >
>> > >> > >赵一旦  于2021年1月21日周四 下午5:17写道:
>> > >> > >
>> > >> > >> Recoverable writers on Hadoop are only supported for HDFS
>> > >> > >>
>> > >> > >> 如上,我们用的hadoop协议的,但底层不是hdfs,是公司自研的分布式文件系统。
>> > >> > >>
>> > >> > >> 使用spark写,spark-sql读等都没问题。但是flink写和读当前都没尝试成功。
>> > >> > >>
>> > >> > >>
>> > >> > >>
>> > >> >
>> > >>
>> >
>>
>


Re: Re: Re: Flink写hdfs提交任务就报错。Recoverable writers on Hadoop are only supported for HDFS

2021-01-21 Thread
我用的flink 1.10版,FlieSink就是BucketingSink,我是用这个写hdfs的

赵一旦  于2021年1月21日周四 下午7:05写道:

> @Michael Ran; 嗯嗯,没关系。
>
> @张锴 你说的是flink哪个版本的connector,stream or sql。我搜了下我的没有。我是1.12,stream。
>
> 目前看文档有streamFileSink,还有FileSink,从文档内容来看使用方式差不多。我计划试一下FileSink,但不清楚FileSink和StreamFileSink啥区别,是否都能写hadoop类文件系统,因为涉及是否原子写,比较分布式文件系统不支持追加和编辑等。
>
> Michael Ran  于2021年1月21日周四 下午7:01写道:
>
> >
> >
> 很抱歉,我已经很久没用过这个了。但是可以根据异常信息以及API源码执行进行分析,确定是否能直接写入。如果你要写入自定义的文件系统,那么只能实现自己的sink方式。或者你的文件系统的写入方式兼容hdfs的上层API可以参考各个sink端的写法
> > 在 2021-01-21 18:45:06,"张锴"  写道:
> > >import
> org.apache.flink.streaming.connectors.fs.bucketing.{BucketingSink,
> > >DateTimeBucketer}
> > >
> > >sink.setBucketer sink.setWriter用这种方式试试
> > >
> > >
> > >
> > >赵一旦  于2021年1月21日周四 下午6:37写道:
> > >
> > >> @Michael Ran
> > >> 然后有什么解决方案吗,我这个是使用flink的streamFileSink方式写hdfs的时候出现的异常。
> > >>
> > >> Michael Ran  于2021年1月21日周四 下午5:23写道:
> > >>
> > >> > 这里应该是用了hdfs 的特定API吧,文件系统没兼容public
> > >> > HadoopRecoverableWriter(org.apache.hadoop.fs.FileSystem fs) {...}
> > >> > 在 2021-01-21 17:18:23,"赵一旦"  写道:
> > >> > >具体报错信息如下:
> > >> > >
> > >> > >java.lang.UnsupportedOperationException: Recoverable writers on
> > Hadoop
> > >> are
> > >> > >only supported for HDFS
> > >> > >at
> > org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.(
> > >> > >HadoopRecoverableWriter.java:61)
> > >> > >at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem
> > >> > >.createRecoverableWriter(HadoopFileSystem.java:210)
> > >> > >at org.apache.flink.core.fs.SafetyNetWrapperFileSystem
> > >> > >.createRecoverableWriter(SafetyNetWrapperFileSystem.java:69)
> > >> > >at org.apache.flink.streaming.api.functions.sink.filesystem.
> > >> >
> > >StreamingFileSink$RowFormatBuilder.createBucketWriter(StreamingFileSink
> > >> > >.java:260)
> > >> > >at org.apache.flink.streaming.api.functions.sink.filesystem.
> > >> >
> > >> >
> > >>
> >
> >StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:270)
> > >> > >at org.apache.flink.streaming.api.functions.sink.filesystem.
> > >> > >StreamingFileSink.initializeState(StreamingFileSink.java:412)
> > >> > >at
> > org.apache.flink.streaming.util.functions.StreamingFunctionUtils
> > >> > >.tryRestoreFunction(StreamingFunctionUtils.java:185)
> > >> > >at
> > org.apache.flink.streaming.util.functions.StreamingFunctionUtils
> > >> > >.restoreFunctionState(StreamingFunctionUtils.java:167)
> > >> > >at
> > >> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator
> > >> > >.initializeState(AbstractUdfStreamOperator.java:96)
> > >> > >at
> > >> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler
> > >> > >.initializeOperatorState(StreamOperatorStateHandler.java:107)
> > >> > >at
> > org.apache.flink.streaming.api.operators.AbstractStreamOperator
> > >> > >.initializeState(AbstractStreamOperator.java:264)
> > >> > >at org.apache.flink.streaming.runtime.tasks.OperatorChain
> > >> > >.initializeStateAndOpenOperators(OperatorChain.java:400)
> > >> > >at org.apache.flink.streaming.runtime.tasks.StreamTask
> > >> > >.lambda$beforeInvoke$2(StreamTask.java:507)
> > >> > >at
> > >> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1
> > >> > >.runThrowing(StreamTaskActionExecutor.java:47)
> > >> > >at
> > org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(
> > >> > >StreamTask.java:501)
> > >> > >at
> > >> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask
> > >> > >.java:531)
> > >> > >at
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
> > >> > >at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
> > >> > >at java.lang.Thread.run(Thread.java:748)
> > >> > >
> > >> > >
> > >> > >赵一旦  于2021年1月21日周四 下午5:17写道:
> > >> > >
> > >> > >> Recoverable writers on Hadoop are only supported for HDFS
> > >> > >>
> > >> > >> 如上,我们用的hadoop协议的,但底层不是hdfs,是公司自研的分布式文件系统。
> > >> > >>
> > >> > >> 使用spark写,spark-sql读等都没问题。但是flink写和读当前都没尝试成功。
> > >> > >>
> > >> > >>
> > >> > >>
> > >> >
> > >>
> >
>


Re: Re: Flink写hdfs提交任务就报错。Recoverable writers on Hadoop are only supported for HDFS

2021-01-21 Thread
import org.apache.flink.streaming.connectors.fs.bucketing.{BucketingSink,
DateTimeBucketer}

sink.setBucketer sink.setWriter用这种方式试试



赵一旦  于2021年1月21日周四 下午6:37写道:

> @Michael Ran
> 然后有什么解决方案吗,我这个是使用flink的streamFileSink方式写hdfs的时候出现的异常。
>
> Michael Ran  于2021年1月21日周四 下午5:23写道:
>
> > 这里应该是用了hdfs 的特定API吧,文件系统没兼容public
> > HadoopRecoverableWriter(org.apache.hadoop.fs.FileSystem fs) {...}
> > 在 2021-01-21 17:18:23,"赵一旦"  写道:
> > >具体报错信息如下:
> > >
> > >java.lang.UnsupportedOperationException: Recoverable writers on Hadoop
> are
> > >only supported for HDFS
> > >at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.(
> > >HadoopRecoverableWriter.java:61)
> > >at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem
> > >.createRecoverableWriter(HadoopFileSystem.java:210)
> > >at org.apache.flink.core.fs.SafetyNetWrapperFileSystem
> > >.createRecoverableWriter(SafetyNetWrapperFileSystem.java:69)
> > >at org.apache.flink.streaming.api.functions.sink.filesystem.
> > >StreamingFileSink$RowFormatBuilder.createBucketWriter(StreamingFileSink
> > >.java:260)
> > >at org.apache.flink.streaming.api.functions.sink.filesystem.
> >
> >
> >StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:270)
> > >at org.apache.flink.streaming.api.functions.sink.filesystem.
> > >StreamingFileSink.initializeState(StreamingFileSink.java:412)
> > >at org.apache.flink.streaming.util.functions.StreamingFunctionUtils
> > >.tryRestoreFunction(StreamingFunctionUtils.java:185)
> > >at org.apache.flink.streaming.util.functions.StreamingFunctionUtils
> > >.restoreFunctionState(StreamingFunctionUtils.java:167)
> > >at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator
> > >.initializeState(AbstractUdfStreamOperator.java:96)
> > >at
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler
> > >.initializeOperatorState(StreamOperatorStateHandler.java:107)
> > >at org.apache.flink.streaming.api.operators.AbstractStreamOperator
> > >.initializeState(AbstractStreamOperator.java:264)
> > >at org.apache.flink.streaming.runtime.tasks.OperatorChain
> > >.initializeStateAndOpenOperators(OperatorChain.java:400)
> > >at org.apache.flink.streaming.runtime.tasks.StreamTask
> > >.lambda$beforeInvoke$2(StreamTask.java:507)
> > >at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1
> > >.runThrowing(StreamTaskActionExecutor.java:47)
> > >at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(
> > >StreamTask.java:501)
> > >at
> > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask
> > >.java:531)
> > >at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
> > >at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
> > >at java.lang.Thread.run(Thread.java:748)
> > >
> > >
> > >赵一旦  于2021年1月21日周四 下午5:17写道:
> > >
> > >> Recoverable writers on Hadoop are only supported for HDFS
> > >>
> > >> 如上,我们用的hadoop协议的,但底层不是hdfs,是公司自研的分布式文件系统。
> > >>
> > >> 使用spark写,spark-sql读等都没问题。但是flink写和读当前都没尝试成功。
> > >>
> > >>
> > >>
> >
>


Re: 根据业务需求选择合适的flink state

2021-01-21 Thread
你好,之前我用了你上诉的方法出现一个问题,我并没有用min/max,我是在procss方法里用的context.window.getStart和
context.window.getEnd作为开始和结束时间的,感觉这样也能获得最大和最小值,但是出来的数据最长停留了4分多钟,我跑的离线任务停留的时长有几个小时的都有,感觉出来的效果有问题。
下面是我的部分代码逻辑:

val ds = dataStream
  .filter(_.liveType == 1)
  .keyBy(1, 2)
  .window(EventTimeSessionWindows.withGap(Time.minutes(1)))
  .process(new myProcessWindow()).uid("process-id")

class myProcessWindow() extends
ProcessWindowFunction[CloudLiveLogOnLine, CloudliveWatcher, Tuple,
TimeWindow] {

  override def process(key: Tuple, context: Context, elements:
Iterable[CloudLiveLogOnLine], out: Collector[CloudliveWatcher]): Unit
= {
var startTime = context.window.getStart //定义第一个元素进入窗口的开始时间
var endTime = context.window.getEnd //定义最后一个元素进入窗口的时间

val currentDate = DateUtil.currentDate
val created_time = currentDate
val modified_time = currentDate
 。。。

val join_time: String =
DateUtil.convertTimeStamp2DateStr(startTime,
DateUtil.SECOND_DATE_FORMAT)
val leave_time:String = DateUtil.convertTimeStamp2DateStr(endTime,
DateUtil.SECOND_DATE_FORMAT)
val duration = (endTime - startTime) / 1000  //停留多少秒
val duration_time = DateUtil.secondsToFormat(duration)  //停留时分秒
out.collect(CloudliveWatcher(id, partnerId, courseId, customerId,
courseNumber, nickName, ip, device_type, net_opretor, net_type, area,
join_time, leave_time, created_time, modified_time
  , liveType, plat_form, duration, duration_time,
network_operator, role, useragent, uid, eventTime))

CloudliveWatcher(id, partnerId, courseId, customerId,
courseNumber, nickName, ip, device_type, net_opretor, net_type, area,
join_time, leave_time, created_time, modified_time
  , liveType, plat_form, duration, duration_time,
network_operator, role, useragent, uid, eventTime)

}


这样写是否合适,如果要用min/max应该如何代入上诉逻辑当中?




赵一旦  于2020年12月28日周一 下午7:12写道:

> 按直播间ID和用户ID分组,使用session window,使用1min作为gap,统计key+window内的count即可,即sum(1)。
>
> 或者感觉你打点实际不一定肯定是1min、2min这种整时间点,可以统计key+window内的min/max,然后输出的时候做个减法。
>
> session window的作用就是根据连续2个元素的gap决定是否将2个元素放入同一个window中。
>
>
> 张锴  于2020年12月28日周一 下午5:35写道:
>
> > 能描述一下用session window的考虑吗
> >
> > Akisaya  于2020年12月28日周一 下午5:00写道:
> >
> > > 这个可以用 session window 吧
> > >
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows
> > >
> > > news_...@163.com  于2020年12月28日周一 下午2:15写道:
> > >
> > > > 这么做的前提是每条记录是顺序进入KAFKA的才行,但真实场景不能保证这条,有可能较早的打点记录却较晚进入kafka队列。
> > > >
> > > >
> > > >
> > > > news_...@163.com
> > > >
> > > > 发件人: 张锴
> > > > 发送时间: 2020-12-28 13:35
> > > > 收件人: user-zh
> > > > 主题: 根据业务需求选择合适的flink state
> > > > 各位大佬帮我分析下如下需求应该怎么写
> > > >
> > > > 需求说明:
> > > >
> > 公司是做直播业务方面的,现在需要实时统计用户的在线时长,来源kafka,每分钟产生直播打点记录,数据中包含eventTime字段。举个例子,用户A
> > > >
> > > >
> > >
> >
> 在1,2,3分钟一直产生打点记录,那么他的停留时长就是3分钟,第5,6分钟又来了,那他的停留时长就是2分钟,只要是连续的打点记录就合成一条记录,每个直播间每个用户同理。
> > > >
> > > > 我的想法:
> > > > 我现在的想法是按直播间ID和用户ID分组,然后process,想通过state方式来做,通过截取每条记录的event Time中的分钟数
> > > > 减去上一条的分钟数,如果他们差值等于1,说明是连续关系,则继续。如果不为1,说明断开了,直接输出这条记录,同时情况当前key的状态。
> > > >
> > > > 不知道各位大佬对我的这个想法有什么建议,或者说有更好的处理方式。
> > > >
> > > > flink 版本1.10.1
> > > >
> > >
> >
>


flink watermark 为负数的异常

2021-01-12 Thread
hi,我通过webUI查看了当前的程序,watermark变为-2,一直没变过,出现这种情况是什么原因,一头雾水。截图在附件里,帮忙分析一下。


Re: Re: flink waterMark 相关问题

2021-01-12 Thread
ok,明白了

anonnius  于2021年1月13日周三 上午10:20写道:

> 在 StreamExecutionEnvironmennt的方法@PublicEvolving   public
> void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {
>  this.timeCharacteristic =
> Preconditions.checkNotNull(characteristic);  if
> (characteristic == TimeCharacteristic.ProcessingTime) {
>  getConfig().setAutoWatermarkInterval(0);   } else {
>  getConfig().setAutoWatermarkInterval(200);
>  }  }
> 在 2021-01-13 10:09:54,"张锴"  写道:
> >我从ExecutionConfig找到了,private long autoWatermarkInterval =
> >0,并不是200毫秒,这个代表一个时间戳就代表一个watermark是吗
> >
> >Ball's Holy <873925...@qq.com> 于2021年1月13日周三 上午9:42写道:
> >
> >> hi张锴,
> >> 如果我没记错的话,默认时间间隔是200毫秒,具体设置细节可以看flink.api.common.ExecutionConfig
> >> 对应的属性autoWatermarkInterval
> >>
> >>
> >>
> >>
> >> --原始邮件--
> >> 发件人: "anonnius" >> 发送时间: 2021年1月13日(星期三) 上午9:19
> >> 收件人: "user-zh" >> 主题: Re:flink waterMark 相关问题
> >>
> >>
> >>
> >> 可以看一下 ExecutionConfig这个类
> >> 在 2021-01-12 17:55:47,"张锴"  >> hi,我使用的flink版本是1.10,我想问一下watermark的默认时间间隔是多少,以及所对应的源码位置在哪里,我并没有找到。
>


Re: flink waterMark 相关问题

2021-01-12 Thread
我从ExecutionConfig找到了,private long autoWatermarkInterval =
0,并不是200毫秒,这个代表一个时间戳就代表一个watermark是吗

Ball's Holy <873925...@qq.com> 于2021年1月13日周三 上午9:42写道:

> hi张锴,
> 如果我没记错的话,默认时间间隔是200毫秒,具体设置细节可以看flink.api.common.ExecutionConfig
> 对应的属性autoWatermarkInterval
>
>
>
>
> --原始邮件--
> 发件人: "anonnius" 发送时间: 2021年1月13日(星期三) 上午9:19
> 收件人: "user-zh" 主题: Re:flink waterMark 相关问题
>
>
>
> 可以看一下 ExecutionConfig这个类
> 在 2021-01-12 17:55:47,"张锴"  hi,我使用的flink版本是1.10,我想问一下watermark的默认时间间隔是多少,以及所对应的源码位置在哪里,我并没有找到。


Re: flink waterMark 相关问题

2021-01-12 Thread
谢谢你

anonnius  于2021年1月13日周三 上午9:19写道:

> 可以看一下 ExecutionConfig这个类
> 在 2021-01-12 17:55:47,"张锴"  写道:
> >hi,我使用的flink版本是1.10,我想问一下watermark的默认时间间隔是多少,以及所对应的源码位置在哪里,我并没有找到。
>


flink waterMark 相关问题

2021-01-12 Thread
hi,我使用的flink版本是1.10,我想问一下watermark的默认时间间隔是多少,以及所对应的源码位置在哪里,我并没有找到。


Re: flink的算子没有类似于spark的cache操作吗?

2021-01-07 Thread
保存中间变量可以用状态存

李继  于2021年1月7日周四 下午5:42写道:

> HI , 请问当一个算子会被多次使用时,怎么把他缓存住,类似于spark的cache操作
>
> val env = getBatchEnv
> val ds = env.fromElements("a","b","c")
>
> val ds2 = ds.map(x=>{
>   println("map op")
>   x.charAt(0).toInt+1
> })
>
> //此操作会打印三遍map op
> ds2.print()
>
> //此操作又会打印三遍map op
> ds2.filter(_>100).print()
>


Re: 根据业务需求选择合适的flink state

2020-12-28 Thread
感谢你,稍后我会按这种思路试试

赵一旦  于2020年12月28日周一 下午7:12写道:

> 按直播间ID和用户ID分组,使用session window,使用1min作为gap,统计key+window内的count即可,即sum(1)。
>
> 或者感觉你打点实际不一定肯定是1min、2min这种整时间点,可以统计key+window内的min/max,然后输出的时候做个减法。
>
> session window的作用就是根据连续2个元素的gap决定是否将2个元素放入同一个window中。
>
>
> 张锴  于2020年12月28日周一 下午5:35写道:
>
> > 能描述一下用session window的考虑吗
> >
> > Akisaya  于2020年12月28日周一 下午5:00写道:
> >
> > > 这个可以用 session window 吧
> > >
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows
> > >
> > > news_...@163.com  于2020年12月28日周一 下午2:15写道:
> > >
> > > > 这么做的前提是每条记录是顺序进入KAFKA的才行,但真实场景不能保证这条,有可能较早的打点记录却较晚进入kafka队列。
> > > >
> > > >
> > > >
> > > > news_...@163.com
> > > >
> > > > 发件人: 张锴
> > > > 发送时间: 2020-12-28 13:35
> > > > 收件人: user-zh
> > > > 主题: 根据业务需求选择合适的flink state
> > > > 各位大佬帮我分析下如下需求应该怎么写
> > > >
> > > > 需求说明:
> > > >
> > 公司是做直播业务方面的,现在需要实时统计用户的在线时长,来源kafka,每分钟产生直播打点记录,数据中包含eventTime字段。举个例子,用户A
> > > >
> > > >
> > >
> >
> 在1,2,3分钟一直产生打点记录,那么他的停留时长就是3分钟,第5,6分钟又来了,那他的停留时长就是2分钟,只要是连续的打点记录就合成一条记录,每个直播间每个用户同理。
> > > >
> > > > 我的想法:
> > > > 我现在的想法是按直播间ID和用户ID分组,然后process,想通过state方式来做,通过截取每条记录的event Time中的分钟数
> > > > 减去上一条的分钟数,如果他们差值等于1,说明是连续关系,则继续。如果不为1,说明断开了,直接输出这条记录,同时情况当前key的状态。
> > > >
> > > > 不知道各位大佬对我的这个想法有什么建议,或者说有更好的处理方式。
> > > >
> > > > flink 版本1.10.1
> > > >
> > >
> >
>


Re: 根据业务需求选择合适的flink state

2020-12-28 Thread
能描述一下用session window的考虑吗

Akisaya  于2020年12月28日周一 下午5:00写道:

> 这个可以用 session window 吧
>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows
>
> news_...@163.com  于2020年12月28日周一 下午2:15写道:
>
> > 这么做的前提是每条记录是顺序进入KAFKA的才行,但真实场景不能保证这条,有可能较早的打点记录却较晚进入kafka队列。
> >
> >
> >
> > news_...@163.com
> >
> > 发件人: 张锴
> > 发送时间: 2020-12-28 13:35
> > 收件人: user-zh
> > 主题: 根据业务需求选择合适的flink state
> > 各位大佬帮我分析下如下需求应该怎么写
> >
> > 需求说明:
> > 公司是做直播业务方面的,现在需要实时统计用户的在线时长,来源kafka,每分钟产生直播打点记录,数据中包含eventTime字段。举个例子,用户A
> >
> >
> 在1,2,3分钟一直产生打点记录,那么他的停留时长就是3分钟,第5,6分钟又来了,那他的停留时长就是2分钟,只要是连续的打点记录就合成一条记录,每个直播间每个用户同理。
> >
> > 我的想法:
> > 我现在的想法是按直播间ID和用户ID分组,然后process,想通过state方式来做,通过截取每条记录的event Time中的分钟数
> > 减去上一条的分钟数,如果他们差值等于1,说明是连续关系,则继续。如果不为1,说明断开了,直接输出这条记录,同时情况当前key的状态。
> >
> > 不知道各位大佬对我的这个想法有什么建议,或者说有更好的处理方式。
> >
> > flink 版本1.10.1
> >
>


Re: 根据业务需求选择合适的flink state

2020-12-28 Thread
能描述一下用session window的考虑吗

Akisaya  于2020年12月28日周一 下午5:00写道:

> 这个可以用 session window 吧
>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows
>
> news_...@163.com  于2020年12月28日周一 下午2:15写道:
>
> > 这么做的前提是每条记录是顺序进入KAFKA的才行,但真实场景不能保证这条,有可能较早的打点记录却较晚进入kafka队列。
> >
> >
> >
> > news_...@163.com
> >
> > 发件人: 张锴
> > 发送时间: 2020-12-28 13:35
> > 收件人: user-zh
> > 主题: 根据业务需求选择合适的flink state
> > 各位大佬帮我分析下如下需求应该怎么写
> >
> > 需求说明:
> > 公司是做直播业务方面的,现在需要实时统计用户的在线时长,来源kafka,每分钟产生直播打点记录,数据中包含eventTime字段。举个例子,用户A
> >
> >
> 在1,2,3分钟一直产生打点记录,那么他的停留时长就是3分钟,第5,6分钟又来了,那他的停留时长就是2分钟,只要是连续的打点记录就合成一条记录,每个直播间每个用户同理。
> >
> > 我的想法:
> > 我现在的想法是按直播间ID和用户ID分组,然后process,想通过state方式来做,通过截取每条记录的event Time中的分钟数
> > 减去上一条的分钟数,如果他们差值等于1,说明是连续关系,则继续。如果不为1,说明断开了,直接输出这条记录,同时情况当前key的状态。
> >
> > 不知道各位大佬对我的这个想法有什么建议,或者说有更好的处理方式。
> >
> > flink 版本1.10.1
> >
>


Re: flink-connector-clickhouse写入ClickHouse 问题

2020-12-27 Thread
换个第三方工具看看 https://github.com/blynkkk/clickhouse4j


cc.blynk.clickhouse
clickhouse4j
1.4.4



DanielGu <610493...@qq.com> 于2020年12月28日周一 上午12:22写道:

> 使用了阿里的包,写入clickhouse
> 阿里云flink-connector-clickhouse写入ClickHouse
> <
> https://help.aliyun.com/document_detail/185696.html?spm=5176.12901015.0.i12901015.2b41525cECNyYW=1ac9126237284ef9b0a25f666c3030dfxaso>
>
>
> 测试写入clickhouse ,返回如下,无报错,但并未成功写入,不知从何下手排查,请教各位大佬
> +-+
> | default_catalog.default_database.sink_table |
> +-+
> |  -1 |
> +-+
>
>
> 代码如下
> package com.daniel
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.table.sources._
> import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
> import org.apache.flink.table.api._
> import org.apache.flink.types.Row
> import org.apache.flink.table.api.{
>   TableEnvironment,
>   TableSchema,
>   Types,
>   ValidationException
> }
>
> object StreamingJob {
>   def main(args: Array[String]) {
> val SourceCsvPath =
>   "/Users/flink-sql-demo/flink-sql-source.csv"
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
>
> env.getConfig.disableClosureCleaner
>
> val tEnv = StreamTableEnvironment.create(env)
>
> val csvTableSource = CsvTableSource
>   .builder()
>   .path(SourceCsvPath)
>   .ignoreFirstLine()
>   .fieldDelimiter(",")
>   .field("name", DataTypes.STRING())
>   .field("age", DataTypes.BIGINT())
> //  .field("sex", DataTypes.STRING())
> //  .field("grade", DataTypes.INT())
>   .field("rate", DataTypes.FLOAT())
>   .build()
>
> tEnv.registerTableSource("source", csvTableSource)
>
> val create_sql =
>   s"""
>  | CREATE TABLE sink_table (
>  |name VARCHAR
>  |) WITH (
>  |'connector' = 'clickhouse',
>  |'url' = 'clickhouse://*:8080',
>  |'username' = '',
>  |'password' = '',
>  |'database-name' = '***',
>  |'table-name' = 'live.d_sink_table',
>  |'sink.batch-size' = '1',
>  |'sink.partition-strategy' = 'hash',
>  |'sink.partition-key' = 'name'
>  |)
>  |""".stripMargin
>
>
>
> tEnv.executeSql(create_sql);
>
> val result = tEnv.executeSql(
>   "INSERT INTO sink_table SELECT name FROM source"
> )
> result.print()
>   }
>
> }
>
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


根据业务需求选择合适的flink state

2020-12-27 Thread
各位大佬帮我分析下如下需求应该怎么写

需求说明:
公司是做直播业务方面的,现在需要实时统计用户的在线时长,来源kafka,每分钟产生直播打点记录,数据中包含eventTime字段。举个例子,用户A
在1,2,3分钟一直产生打点记录,那么他的停留时长就是3分钟,第5,6分钟又来了,那他的停留时长就是2分钟,只要是连续的打点记录就合成一条记录,每个直播间每个用户同理。

我的想法:
我现在的想法是按直播间ID和用户ID分组,然后process,想通过state方式来做,通过截取每条记录的event Time中的分钟数
减去上一条的分钟数,如果他们差值等于1,说明是连续关系,则继续。如果不为1,说明断开了,直接输出这条记录,同时情况当前key的状态。

不知道各位大佬对我的这个想法有什么建议,或者说有更好的处理方式。

flink 版本1.10.1


Re: 对于kafka partition 设置时间戳及watermark

2020-12-20 Thread
我现在用的flink 版本1.10.1 ,我点开 setAutoWatermarkInterval 看到private long
autoWatermarkInterval = 0;
是否代表它默认的执行频率是0,我理解的意思抽取的时间戳同时生成watermark,它们是一一对应的,不知道我的理解是否正确

赵一旦  于2020年12月20日周日 下午11:15写道:

> setAutoWatermarkInterval这个只是设置interval。决定你那个抽取ts的函数的执行频率的。
>
> r pp  于2020年12月20日周日 上午10:49写道:
>
> > 是的
> >
> > 张锴  于2020年12月19日周六 下午5:45写道:
> >
> > > 我按官网操作,重写了序列化方式
> > >
> > > val kafkaSource = new FlinkKafkaConsumer09[MyType]("myTopic", schema,
> > > props)kafkaSource.assignTimestampsAndWatermarks(new
> > > AscendingTimestampExtractor[MyType] {
> > > def extractAscendingTimestamp(element: MyType): Long =
> > > element.eventTimestamp})
> > > val stream: DataStream[MyType] = env.addSource(kafkaSource)
> > >
> > > *有个疑问,这样写完之后是不是不用设置*setAutoWatermarkInterval 呢?
> > >
> >
>


对于kafka partition 设置时间戳及watermark

2020-12-19 Thread
我按官网操作,重写了序列化方式

val kafkaSource = new FlinkKafkaConsumer09[MyType]("myTopic", schema,
props)kafkaSource.assignTimestampsAndWatermarks(new
AscendingTimestampExtractor[MyType] {
def extractAscendingTimestamp(element: MyType): Long =
element.eventTimestamp})
val stream: DataStream[MyType] = env.addSource(kafkaSource)

*有个疑问,这样写完之后是不是不用设置*setAutoWatermarkInterval 呢?


Re: kafka的多分区watermark

2020-12-13 Thread
谢谢你,我想明白了

Shuai Xia  于2020年12月14日周一 下午2:08写道:

>
> Hi,没有太理解你的意思,这个MyType只是说你可以把Kafka的数据反序列化后使用,像SimpleStringSchema默认是String,你可以对他进行解析
>
>
> ------
> 发件人:张锴 
> 发送时间:2020年12月14日(星期一) 13:51
> 收件人:user-zh 
> 主 题:kafka的多分区watermark
>
> 在官网看到对于Kafka分区的时间戳定义描述,给出了示例,如下图:
>
> FlinkKafkaConsumer09 kafkaSource = new
> FlinkKafkaConsumer09<>("myTopic", schema,
> props);kafkaSource.assignTimestampsAndWatermarks(new
> AscendingTimestampExtractor() {
>
> @Override
> public long extractAscendingTimestamp(MyType element) {
> return element.eventTimestamp();
> }});
> DataStream stream = env.addSource(kafkaSource);
>
> *不太理解这个里面泛型传的是用户定义的case class,还是传*ConsumerRecord,从他里面提取时间戳
>


kafka的多分区watermark

2020-12-13 Thread
在官网看到对于Kafka分区的时间戳定义描述,给出了示例,如下图:

FlinkKafkaConsumer09 kafkaSource = new
FlinkKafkaConsumer09<>("myTopic", schema,
props);kafkaSource.assignTimestampsAndWatermarks(new
AscendingTimestampExtractor() {

@Override
public long extractAscendingTimestamp(MyType element) {
return element.eventTimestamp();
}});
DataStream stream = env.addSource(kafkaSource);

*不太理解这个里面泛型传的是用户定义的case class,还是传*ConsumerRecord,从他里面提取时间戳


Re: flink savepoint

2020-11-08 Thread
看到了,通过JM看到是写的权限没有,改了之后就好了

Congxian Qiu  于2020年11月6日周五 下午1:31写道:

> Hi
>  从 client 端日志,或者 JM 日志还能看到其他的异常么?
> Best,
> Congxian
>
>
> 张锴  于2020年11月6日周五 上午11:42写道:
>
> > 重启和反压都正常
> > 另外增加了从客户端到master的时间,还是有这个问题
> >
> > hailongwang <18868816...@163.com> 于 2020年11月6日周五 10:54写道:
> >
> > > Hi,
> > >
> > >
> > > 这个报错只是在规定的时间内没有完成 Savepoint,导致客户端连接 Master 超时,
> > > 具体的原因需要看下 Jobmaster 的日志。
> > > PS:在任务一直重启、反压的情况下,一般 Savepoint 都会失败。
> > >
> > >
> > > Best,
> > > Hailong Wang
> > >
> > >
> > >
> > >
> > > 在 2020-11-06 09:33:48,"张锴"  写道:
> > > >本人在使用flink savepoint 保存快照的时候,遇到错误,目前不清楚是因为什么导致这个问题,路过的大佬帮忙看下。
> > > >
> > > >flink 版本1.10.1
> > > >
> > > >
> > > >执行   flink savepoint a3a2e6c3a5a00bbe4c0c9e351dc58c47
> > > >hdfs://hadoopnamenodeHA/flink/flink-savepoints
> > > >
> > > >
> > > >出现错误信息
> > > >
> > > >
> > > >org.apache.flink.util.FlinkException: Triggering a savepoint for the
> job
> > > >a3a2e6c3a5a00bbe4c0c9e351dc58c47 failed.
> > > >
> > > > at
> > >
> > >
> >
> >org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:631)
> > > >
> > > > at
> > >
> > >
> >
> >org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:609)
> > > >
> > > > at
> > >
> > >
> >
> >org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:841)
> > > >
> > > > at
> > > org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:606)
> > > >
> > > > at
> > >
> > >
> >
> >org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:908)
> > > >
> > > > at
> > >
> > >
> >
> >org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:966)
> > > >
> > > > at java.security.AccessController.doPrivileged(Native Method)
> > > >
> > > > at javax.security.auth.Subject.doAs(Subject.java:422)
> > > >
> > > > at
> > >
> > >
> >
> >org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
> > > >
> > > > at
> > >
> > >
> >
> >org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> > > >
> > > > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:966)
> > > >
> > > >Caused by: java.util.concurrent.TimeoutException
> > > >
> > > > at
> > >
> > >
> >
> >java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
> > > >
> > > > at
> > > java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
> > > >
> > > > at
> > >
> > >
> >
> >org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:625)
> > >
> >
>


Re: flink savepoint

2020-11-08 Thread
已经指定了

admin <17626017...@163.com> 于2020年11月6日周五 下午3:17写道:

> Hi,
> 你的任务时跑在yarn上的吗?如果是 需要指定 -yid
>
> > 2020年11月6日 下午1:31,Congxian Qiu  写道:
> >
> > Hi
> > 从 client 端日志,或者 JM 日志还能看到其他的异常么?
> > Best,
> > Congxian
> >
> >
> > 张锴  于2020年11月6日周五 上午11:42写道:
> >
> >> 重启和反压都正常
> >> 另外增加了从客户端到master的时间,还是有这个问题
> >>
> >> hailongwang <18868816...@163.com> 于 2020年11月6日周五 10:54写道:
> >>
> >>> Hi,
> >>>
> >>>
> >>> 这个报错只是在规定的时间内没有完成 Savepoint,导致客户端连接 Master 超时,
> >>> 具体的原因需要看下 Jobmaster 的日志。
> >>> PS:在任务一直重启、反压的情况下,一般 Savepoint 都会失败。
> >>>
> >>>
> >>> Best,
> >>> Hailong Wang
> >>>
> >>>
> >>>
> >>>
> >>> 在 2020-11-06 09:33:48,"张锴"  写道:
> >>>> 本人在使用flink savepoint 保存快照的时候,遇到错误,目前不清楚是因为什么导致这个问题,路过的大佬帮忙看下。
> >>>>
> >>>> flink 版本1.10.1
> >>>>
> >>>>
> >>>> 执行   flink savepoint a3a2e6c3a5a00bbe4c0c9e351dc58c47
> >>>> hdfs://hadoopnamenodeHA/flink/flink-savepoints
> >>>>
> >>>>
> >>>> 出现错误信息
> >>>>
> >>>>
> >>>> org.apache.flink.util.FlinkException: Triggering a savepoint for the
> job
> >>>> a3a2e6c3a5a00bbe4c0c9e351dc58c47 failed.
> >>>>
> >>>> at
> >>>
> >>>
> >>>
> org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:631)
> >>>>
> >>>> at
> >>>
> >>>
> >>>
> org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:609)
> >>>>
> >>>> at
> >>>
> >>>
> >>>
> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:841)
> >>>>
> >>>> at
> >>> org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:606)
> >>>>
> >>>> at
> >>>
> >>>
> >>>
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:908)
> >>>>
> >>>> at
> >>>
> >>>
> >>>
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:966)
> >>>>
> >>>> at java.security.AccessController.doPrivileged(Native Method)
> >>>>
> >>>> at javax.security.auth.Subject.doAs(Subject.java:422)
> >>>>
> >>>> at
> >>>
> >>>
> >>>
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
> >>>>
> >>>> at
> >>>
> >>>
> >>>
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> >>>>
> >>>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:966)
> >>>>
> >>>> Caused by: java.util.concurrent.TimeoutException
> >>>>
> >>>> at
> >>>
> >>>
> >>>
> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
> >>>>
> >>>> at
> >>> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
> >>>>
> >>>> at
> >>>
> >>>
> >>>
> org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:625)
> >>>
> >>
>
>


flink savepoint 异常

2020-11-07 Thread
本人用flink 1.10.1版本进行savepoint时遇到下列错误,暂时不清楚错误的原因,特来寻求帮助,麻烦大佬们看看

已经排除反压和重启的原因,checkpoint超时设置了十分钟,conf配置增加客户端连接master的时间,但还是出现异常。

命令

flink savepoint -yid application_1604456903594_2381
fb8131bcb78cbdf2bb9a705d8a4ceebc
hdfs:///hadoopnamenodeHA/flink/flink-savepoints

异常

The program finished with the following exception:

org.apache.flink.util.FlinkException: Triggering a savepoint for the job
fb8131bcb78cbdf2bb9a705d8a4ceebc failed.
 at
org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:631)
 at
org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:609)
 at
org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:841)
 at org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:606)
 at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:908)
 at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:966)
 at java.security.AccessController.doPrivileged(Native Method)
 at javax.security.auth.Subject.doAs(Subject.java:422)
 at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
 at
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
 at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:966)
Caused by: java.util.concurrent.CompletionException:
java.util.concurrent.CompletionException:
org.apache.flink.runtime.checkpoint.CheckpointException: Failed to trigger
savepoint. Failure reason: An Exception occurred while triggering the
checkpoint.
 at
org.apache.flink.runtime.scheduler.SchedulerBase.lambda$triggerSavepoint$3(SchedulerBase.java:756)
 at
java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
 at
java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
 at
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
 at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
 at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
 at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
 at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
 at akka.japi.pf.UnitCaseState


Re: flink savepoint

2020-11-05 Thread
重启和反压都正常
另外增加了从客户端到master的时间,还是有这个问题

hailongwang <18868816...@163.com> 于 2020年11月6日周五 10:54写道:

> Hi,
>
>
> 这个报错只是在规定的时间内没有完成 Savepoint,导致客户端连接 Master 超时,
> 具体的原因需要看下 Jobmaster 的日志。
> PS:在任务一直重启、反压的情况下,一般 Savepoint 都会失败。
>
>
> Best,
> Hailong Wang
>
>
>
>
> 在 2020-11-06 09:33:48,"张锴"  写道:
> >本人在使用flink savepoint 保存快照的时候,遇到错误,目前不清楚是因为什么导致这个问题,路过的大佬帮忙看下。
> >
> >flink 版本1.10.1
> >
> >
> >执行   flink savepoint a3a2e6c3a5a00bbe4c0c9e351dc58c47
> >hdfs://hadoopnamenodeHA/flink/flink-savepoints
> >
> >
> >出现错误信息
> >
> >
> >org.apache.flink.util.FlinkException: Triggering a savepoint for the job
> >a3a2e6c3a5a00bbe4c0c9e351dc58c47 failed.
> >
> > at
>
> >org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:631)
> >
> > at
>
> >org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:609)
> >
> > at
>
> >org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:841)
> >
> > at
> org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:606)
> >
> > at
>
> >org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:908)
> >
> > at
>
> >org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:966)
> >
> > at java.security.AccessController.doPrivileged(Native Method)
> >
> > at javax.security.auth.Subject.doAs(Subject.java:422)
> >
> > at
>
> >org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
> >
> > at
>
> >org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> >
> > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:966)
> >
> >Caused by: java.util.concurrent.TimeoutException
> >
> > at
>
> >java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
> >
> > at
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
> >
> > at
>
> >org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:625)
>


flink savepoint

2020-11-05 Thread
本人在使用flink savepoint 保存快照的时候,遇到错误,目前不清楚是因为什么导致这个问题,路过的大佬帮忙看下。

flink 版本1.10.1


执行   flink savepoint a3a2e6c3a5a00bbe4c0c9e351dc58c47
hdfs://hadoopnamenodeHA/flink/flink-savepoints


出现错误信息


org.apache.flink.util.FlinkException: Triggering a savepoint for the job
a3a2e6c3a5a00bbe4c0c9e351dc58c47 failed.

 at
org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:631)

 at
org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:609)

 at
org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:841)

 at org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:606)

 at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:908)

 at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:966)

 at java.security.AccessController.doPrivileged(Native Method)

 at javax.security.auth.Subject.doAs(Subject.java:422)

 at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)

 at
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)

 at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:966)

Caused by: java.util.concurrent.TimeoutException

 at
java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)

 at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)

 at
org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:625)


flink on yarn 读取 hbase数据时 ,Task失败,具体描述如下

2020-08-03 Thread
问题一:flink获取Hbase数据并计算 在本地测试没问题,提交到Yarn上出现Task任务失败,Job日志如下:




17:45:11.443 TKD [main-EventThread] ERROR
o.a.f.s.c.o.a.c.ConnectionState - Authentication failed
17:45:20.998 TKD [flink-rest-server-netty-worker-thread-16] ERROR
o.a.f.r.r.h.t.TaskManagerDetailsHandler - Unhandled exception.
org.apache.flink.runtime.resourcemanager.exceptions.UnknownTaskExecutorException:
No TaskExecutor registered under
container_e16_1593566817793_0659_01_02.
at 
org.apache.flink.runtime.resourcemanager.ResourceManager.requestTaskManagerInfo(ResourceManager.java:532)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at akka.actor.Actor.aroundReceive(Actor.scala:517)
at akka.actor.Actor.aroundReceive$(Actor.scala:515)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
17:46:48.520 TKD [flink-akka.actor.default-dispatcher-20] ERROR
o.a.f.r.r.h.t.TaskManagerDetailsHandler - Unhandled exception.
不清楚出现此错误的原因,请教各位大佬。

问题二:flink on yarn
提交Job的时候我配置了JobManager和TaskManager之外,一般还需要配置什么参数?感觉自己配置的少了些什么。


flink on yarn 读取 hbase数据时 ,Task失败,具体描述如下

2020-07-29 Thread
flink获取Hbase数据并计算
在本地测试没问题,提交到Yarn上出现Task任务失败,无相关日志输出,task任务一直重启。任务失败的地方在数据计算部分。
语言:Scala,无堆栈信息输出


Re: 按照官网进行flink-shell操作,出现无法解决的错误:Only BatchTableSource and InputFormatTableSource are supported in BatchTableEnvironment.

2020-05-21 Thread
我去看看

Jeff Zhang  于2020年5月21日周四 下午4:54写道:

> 可以在zeppelin里写scala代码,是支持hive的,参考这个视频,
> https://www.bilibili.com/video/BV1Te411W73b?p=10
>
> 也可以到这个钉钉群讨论: 30022475
>
> Jingsong Li  于2020年5月21日周四 下午4:43写道:
>
> > Hi,
> >
> > 不好意思,现在版本hive connector已经不支持old planner了,
> > 但是scala shell还是默认old planner。
> >
> > Best,
> > Jingsong Lee
> >
> > On Thu, May 21, 2020 at 3:24 PM 张锴  wrote:
> >
> > > 具体操作及错误信息我贴到下面,各位大佬帮忙看下如何解决,不知道是不是BUG。
> > >
> > > scala> import org.apache.flink.table.catalog.hive.HiveCatalog
> > > import org.apache.flink.table.catalog.hive.HiveCatalog
> > >
> > > scala> val hiveCatalog = new HiveCatalog("hive", "mydatabase",
> > > "/opt/hive2.3.3/conf", "2.3.3");
> > > hiveCatalog: org.apache.flink.table.catalog.hive.HiveCatalog =
> > > org.apache.flink.table.catalog.hive.HiveCatalog@193f3306
> > >
> > > scala> btenv.registerCatalog("hive", hiveCatalog)
> > > Thu May 21 15:10:05 CST 2020 WARN: Establishing SSL connection without
> > > server's identity verification is not recommended. According to MySQL
> > > 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be
> > established
> > > by default if explicit option isn't set. For compliance with existing
> > > applications not using SSL the verifyServerCertificate property is set
> to
> > > 'false'. You need either to explicitly disable SSL by setting
> > useSSL=false,
> > > or set useSSL=true and provide truststore for server certificate
> > > verification.
> > > Thu May 21 15:10:05 CST 2020 WARN: Establishing SSL connection without
> > > server's identity verification is not recommended. According to MySQL
> > > 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be
> > established
> > > by default if explicit option isn't set. For compliance with existing
> > > applications not using SSL the verifyServerCertificate property is set
> to
> > > 'false'. You need either to explicitly disable SSL by setting
> > useSSL=false,
> > > or set useSSL=true and provide truststore for server certificate
> > > verification.
> > >
> > > scala> btenv.useCatalog("hive")
> > >
> > > scala> btenv.listTables
> > > res2: Array[String] = Array(mytable)
> > >
> > > scala> btenv.sqlQuery("select * from mytable").toDataSet[Row].print()
> > > org.apache.flink.table.api.TableException: Only BatchTableSource and
> > > InputFormatTableSource are supported in BatchTableEnvironment.
> > >   at
> > >
> > >
> >
> org.apache.flink.table.plan.nodes.dataset.BatchTableSourceScan.translateToPlan(BatchTableSourceScan.scala:118)
> > >   at
> > >
> > >
> >
> org.apache.flink.table.api.internal.BatchTableEnvImpl.translate(BatchTableEnvImpl.scala:306)
> > >   at
> > >
> > >
> >
> org.apache.flink.table.api.internal.BatchTableEnvImpl.translate(BatchTableEnvImpl.scala:281)
> > >   at
> > >
> > >
> >
> org.apache.flink.table.api.scala.internal.BatchTableEnvironmentImpl.toDataSet(BatchTableEnvironmentImpl.scala:69)
> > >   at
> > >
> > >
> >
> org.apache.flink.table.api.scala.TableConversions.toDataSet(TableConversions.scala:53)
> > >   ... 30 elided
> > >
> >
> >
> > --
> > Best, Jingsong Lee
> >
>
>
> --
> Best Regards
>
> Jeff Zhang
>


Re: 按照官网进行flink-shell操作,出现无法解决的错误:Only BatchTableSource and InputFormatTableSource are supported in BatchTableEnvironment.

2020-05-21 Thread
我想请问,在blink planner中,通过连接hive的方式将数据转成DataSet[Row]呢


Jingsong Li  于2020年5月21日周四 下午4:43写道:

> Hi,
>
> 不好意思,现在版本hive connector已经不支持old planner了,
> 但是scala shell还是默认old planner。
>
> Best,
> Jingsong Lee
>
> On Thu, May 21, 2020 at 3:24 PM 张锴  wrote:
>
> > 具体操作及错误信息我贴到下面,各位大佬帮忙看下如何解决,不知道是不是BUG。
> >
> > scala> import org.apache.flink.table.catalog.hive.HiveCatalog
> > import org.apache.flink.table.catalog.hive.HiveCatalog
> >
> > scala> val hiveCatalog = new HiveCatalog("hive", "mydatabase",
> > "/opt/hive2.3.3/conf", "2.3.3");
> > hiveCatalog: org.apache.flink.table.catalog.hive.HiveCatalog =
> > org.apache.flink.table.catalog.hive.HiveCatalog@193f3306
> >
> > scala> btenv.registerCatalog("hive", hiveCatalog)
> > Thu May 21 15:10:05 CST 2020 WARN: Establishing SSL connection without
> > server's identity verification is not recommended. According to MySQL
> > 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be
> established
> > by default if explicit option isn't set. For compliance with existing
> > applications not using SSL the verifyServerCertificate property is set to
> > 'false'. You need either to explicitly disable SSL by setting
> useSSL=false,
> > or set useSSL=true and provide truststore for server certificate
> > verification.
> > Thu May 21 15:10:05 CST 2020 WARN: Establishing SSL connection without
> > server's identity verification is not recommended. According to MySQL
> > 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be
> established
> > by default if explicit option isn't set. For compliance with existing
> > applications not using SSL the verifyServerCertificate property is set to
> > 'false'. You need either to explicitly disable SSL by setting
> useSSL=false,
> > or set useSSL=true and provide truststore for server certificate
> > verification.
> >
> > scala> btenv.useCatalog("hive")
> >
> > scala> btenv.listTables
> > res2: Array[String] = Array(mytable)
> >
> > scala> btenv.sqlQuery("select * from mytable").toDataSet[Row].print()
> > org.apache.flink.table.api.TableException: Only BatchTableSource and
> > InputFormatTableSource are supported in BatchTableEnvironment.
> >   at
> >
> >
> org.apache.flink.table.plan.nodes.dataset.BatchTableSourceScan.translateToPlan(BatchTableSourceScan.scala:118)
> >   at
> >
> >
> org.apache.flink.table.api.internal.BatchTableEnvImpl.translate(BatchTableEnvImpl.scala:306)
> >   at
> >
> >
> org.apache.flink.table.api.internal.BatchTableEnvImpl.translate(BatchTableEnvImpl.scala:281)
> >   at
> >
> >
> org.apache.flink.table.api.scala.internal.BatchTableEnvironmentImpl.toDataSet(BatchTableEnvironmentImpl.scala:69)
> >   at
> >
> >
> org.apache.flink.table.api.scala.TableConversions.toDataSet(TableConversions.scala:53)
> >   ... 30 elided
> >
>
>
> --
> Best, Jingsong Lee
>


按照官网进行flink-shell操作,出现无法解决的错误:Only BatchTableSource and InputFormatTableSource are supported in BatchTableEnvironment.

2020-05-21 Thread
具体操作及错误信息我贴到下面,各位大佬帮忙看下如何解决,不知道是不是BUG。

scala> import org.apache.flink.table.catalog.hive.HiveCatalog
import org.apache.flink.table.catalog.hive.HiveCatalog

scala> val hiveCatalog = new HiveCatalog("hive", "mydatabase",
"/opt/hive2.3.3/conf", "2.3.3");
hiveCatalog: org.apache.flink.table.catalog.hive.HiveCatalog =
org.apache.flink.table.catalog.hive.HiveCatalog@193f3306

scala> btenv.registerCatalog("hive", hiveCatalog)
Thu May 21 15:10:05 CST 2020 WARN: Establishing SSL connection without
server's identity verification is not recommended. According to MySQL
5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established
by default if explicit option isn't set. For compliance with existing
applications not using SSL the verifyServerCertificate property is set to
'false'. You need either to explicitly disable SSL by setting useSSL=false,
or set useSSL=true and provide truststore for server certificate
verification.
Thu May 21 15:10:05 CST 2020 WARN: Establishing SSL connection without
server's identity verification is not recommended. According to MySQL
5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established
by default if explicit option isn't set. For compliance with existing
applications not using SSL the verifyServerCertificate property is set to
'false'. You need either to explicitly disable SSL by setting useSSL=false,
or set useSSL=true and provide truststore for server certificate
verification.

scala> btenv.useCatalog("hive")

scala> btenv.listTables
res2: Array[String] = Array(mytable)

scala> btenv.sqlQuery("select * from mytable").toDataSet[Row].print()
org.apache.flink.table.api.TableException: Only BatchTableSource and
InputFormatTableSource are supported in BatchTableEnvironment.
  at
org.apache.flink.table.plan.nodes.dataset.BatchTableSourceScan.translateToPlan(BatchTableSourceScan.scala:118)
  at
org.apache.flink.table.api.internal.BatchTableEnvImpl.translate(BatchTableEnvImpl.scala:306)
  at
org.apache.flink.table.api.internal.BatchTableEnvImpl.translate(BatchTableEnvImpl.scala:281)
  at
org.apache.flink.table.api.scala.internal.BatchTableEnvironmentImpl.toDataSet(BatchTableEnvironmentImpl.scala:69)
  at
org.apache.flink.table.api.scala.TableConversions.toDataSet(TableConversions.scala:53)
  ... 30 elided


Re: Flink convert Table to DataSet[Row]

2020-05-20 Thread
我用的是flink1.10的,那意思只能用flink planner的方式了吗

Jingsong Li  于2020年5月20日周三 下午2:55写道:

> blink planner是不支持和Dataset的转换的。
>
> Best,
> Jingsong Lee
>
> On Wed, May 20, 2020 at 2:49 PM 张锴  wrote:
>
> >   def main(args: Array[String]): Unit = {
> > val tableEnvSettings = EnvironmentSettings.newInstance()
> >   .useBlinkPlanner()
> >   .inBatchMode()
> >   .build()
> >
> > val tableEnv: TableEnvironment =
> > TableEnvironment.create(tableEnvSettings)
> >
> > val catalog = new HiveCatalog(
> >   "myhive", // catalog name
> >   "mydatabase", // default database
> >   "D:\\data\\conf", // Hive config (hive-site.xml) directory
> >   "3.1.2" // Hive version
> > )
> >
> > tableEnv.registerCatalog("myhive", catalog)
> > tableEnv.useCatalog("myhive")
> > tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
> > tableEnv.listTables().foreach(println)
> >
> > import org.apache.flink.table.api.scala._
> > import org.apache.flink.api.scala._
> >
> >
> > val mytable = tableEnv.from("mytable")
> > val result = mytable
> >   .groupBy("pfid")
> >   .select("nv_mv", "pfid")
> >   .toDataSet[Row] // conversion to DataSet
> >   .print()
> >
> >   }
> >
> >   Exception in thread "main"
> > org.apache.flink.table.api.ValidationException: Only tables that
> originate
> > from Scala DataSets can be converted to Scala DataSets.
> > at
> >
> >
> org.apache.flink.table.api.scala.TableConversions.toDataSet(TableConversions.scala:56)
> > at HiveService$.main(HiveService.scala:40)
> > at HiveService.main(HiveService.scala)
> >
> > Jingsong Li  于2020年5月20日周三 下午2:06写道:
> >
> > > 不好意思,
> > >
> > > 还是看不到你的图,可以考虑copy异常栈。
> > >
> > > 方便问一下后续的指标计算用Table/SQL搞不定吗?
> > >
> > > Best,
> > > Jingsong Lee
> > >
> > > On Wed, May 20, 2020 at 1:52 PM 张锴  wrote:
> > >
> > > > [image: 微信图片_20200520132244.png]
> > > > [image: 微信图片_20200520132343.png]
> > > >
> > > > Jingsong Li  于2020年5月20日周三 下午1:30写道:
> > > >
> > > >> Hi,
> > > >>
> > > >> 没看见有附件,请问为啥需要转车DateSet,Table里有啥搞不定呢?
> > > >>
> > > >> Best,
> > > >> Jingsong Lee
> > > >>
> > > >> On Wed, May 20, 2020 at 1:26 PM 张锴  wrote:
> > > >>
> > > >> > 我在测试将hive查询出的数据转换成DataSet[Row]时,出现一些无法解决的问题,代码和异常
> 在附件中,麻烦各位小伙伴给看一下。
> > > >> >
> > > >>
> > > >>
> > > >> --
> > > >> Best, Jingsong Lee
> > > >>
> > > >
> > >
> > > --
> > > Best, Jingsong Lee
> > >
> >
>
>
> --
> Best, Jingsong Lee
>


Re: Flink convert Table to DataSet[Row]

2020-05-20 Thread
  def main(args: Array[String]): Unit = {
val tableEnvSettings = EnvironmentSettings.newInstance()
  .useBlinkPlanner()
  .inBatchMode()
  .build()

val tableEnv: TableEnvironment =
TableEnvironment.create(tableEnvSettings)

val catalog = new HiveCatalog(
  "myhive", // catalog name
  "mydatabase", // default database
  "D:\\data\\conf", // Hive config (hive-site.xml) directory
  "3.1.2" // Hive version
)

tableEnv.registerCatalog("myhive", catalog)
tableEnv.useCatalog("myhive")
tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
tableEnv.listTables().foreach(println)

import org.apache.flink.table.api.scala._
import org.apache.flink.api.scala._


val mytable = tableEnv.from("mytable")
val result = mytable
  .groupBy("pfid")
  .select("nv_mv", "pfid")
  .toDataSet[Row] // conversion to DataSet
  .print()

  }

  Exception in thread "main"
org.apache.flink.table.api.ValidationException: Only tables that originate
from Scala DataSets can be converted to Scala DataSets.
at
org.apache.flink.table.api.scala.TableConversions.toDataSet(TableConversions.scala:56)
at HiveService$.main(HiveService.scala:40)
at HiveService.main(HiveService.scala)

Jingsong Li  于2020年5月20日周三 下午2:06写道:

> 不好意思,
>
> 还是看不到你的图,可以考虑copy异常栈。
>
> 方便问一下后续的指标计算用Table/SQL搞不定吗?
>
> Best,
> Jingsong Lee
>
> On Wed, May 20, 2020 at 1:52 PM 张锴  wrote:
>
> > [image: 微信图片_20200520132244.png]
> > [image: 微信图片_20200520132343.png]
> >
> > Jingsong Li  于2020年5月20日周三 下午1:30写道:
> >
> >> Hi,
> >>
> >> 没看见有附件,请问为啥需要转车DateSet,Table里有啥搞不定呢?
> >>
> >> Best,
> >> Jingsong Lee
> >>
> >> On Wed, May 20, 2020 at 1:26 PM 张锴  wrote:
> >>
> >> > 我在测试将hive查询出的数据转换成DataSet[Row]时,出现一些无法解决的问题,代码和异常 在附件中,麻烦各位小伙伴给看一下。
> >> >
> >>
> >>
> >> --
> >> Best, Jingsong Lee
> >>
> >
>
> --
> Best, Jingsong Lee
>


Re: Flink convert Table to DataSet[Row]

2020-05-19 Thread
[image: 微信图片_20200520132244.png]
[image: 微信图片_20200520132343.png]

Jingsong Li  于2020年5月20日周三 下午1:30写道:

> Hi,
>
> 没看见有附件,请问为啥需要转车DateSet,Table里有啥搞不定呢?
>
> Best,
> Jingsong Lee
>
> On Wed, May 20, 2020 at 1:26 PM 张锴  wrote:
>
> > 我在测试将hive查询出的数据转换成DataSet[Row]时,出现一些无法解决的问题,代码和异常 在附件中,麻烦各位小伙伴给看一下。
> >
>
>
> --
> Best, Jingsong Lee
>


Re: Flink convert Table to DataSet[Row]

2020-05-19 Thread
业务需要转成这种数据格式,以便后续指标计算。我直接插入图片吧


Jingsong Li  于2020年5月20日周三 下午1:30写道:

> Hi,
>
> 没看见有附件,请问为啥需要转车DateSet,Table里有啥搞不定呢?
>
> Best,
> Jingsong Lee
>
> On Wed, May 20, 2020 at 1:26 PM 张锴  wrote:
>
> > 我在测试将hive查询出的数据转换成DataSet[Row]时,出现一些无法解决的问题,代码和异常 在附件中,麻烦各位小伙伴给看一下。
> >
>
>
> --
> Best, Jingsong Lee
>


Flink convert Table to DataSet[Row]

2020-05-19 Thread
我在测试将hive查询出的数据转换成DataSet[Row]时,出现一些无法解决的问题,代码和异常 在附件中,麻烦各位小伙伴给看一下。