Re:回复: flink日志级别问题

2019-08-27 文章 高飞龙

我也遇到同样的问题,目的是希望在web上只看到error的日志,我们不用es




--

高飞龙
手机 +86 18710107193
gaofeilong198...@163.com



在 2019-08-27 19:51:35,"王金海"  写道:
>可以日志同步到ES,然后检索error类型的
>至于是否可以代码自定义,自己也没试过
>
>
>csbl...@163.com
>Have a nice day !
>
>
>在2019年08月27日 19:46,王金海 写道:
>日志同步到ES,然后检索error类型的
>至于是否可以代码自定义,就不太清楚了
>
>
>csbl...@163.com
>Have a nice day !
>
>
>在2019年08月27日 19:29,Zili Chen 写道:
>另一种思路如果可以 download 日志的话可以直接文本处理挑出 ERROR 日志(x
>
>Best,
>tison.
>
>
>陈思 <58683...@qq.com> 于2019年8月27日周二 下午6:56写道:
>
>目的:调整flink作业日志级别为ERROR
>
>
>背景:公司提交flink作业只能通过公司的可视化平台,因此不能修改集群上的log4j.properties文件,现在的日志级别是INFO,日志太多不方便排错
>
>目前情况:打算在代码中设置日志级别,使用LogManager.getRootLogger().setLevel(Level.ERROR);在算子的open方法中填入上述代码。但依然会输出INFO日志,请问大神们有什么解决方案吗?


Re: flink1.9 Blink planner create view 问题

2019-08-27 文章 Jark Wu
1.9 还不支持 create view 语法。如果要注册一个 view,可以通过下面的办法:

Table table = tEnv.sqlQuery(“select * from T”)
tEnv.registerTable(“v1”, table);

然后你就可以在之后的sql 中直接查询 v1了

Best,
Jark


> 在 2019年8月28日,11:39,hb <343122...@163.com> 写道:
> 
> 



Re: kafka流与hive表join问题

2019-08-27 文章 Jark Wu
Hi like,

> udtf加载完后是一种什么情况,是每一个task都会维护一份hive数据吗?
是的

> 还有一个问题就是1.9中的维表join是不是跟现在一样呢?有什么特别的地方吗?
1.9 中支持的维表 join,只支持 lookup 方式查询维表,其实现方式和 UDTF 是类似的。


Best,
Jark

> 在 2019年8月28日,10:57,like  写道:
> 
> Hi Jark
> 
>
> 非常感谢你提供的方案,我不了解udtf加载完后是一种什么情况,是每一个task都会维护一份hive数据吗?这应该会带来很大的开销。还有一个问题就是1.9中的维表join是不是跟现在一样呢?有什么特别的地方吗?
> 
> 
> 在2019年8月28日 10:10,Jark Wu 写道:
> Hi,
> 
> 看了你的问题,主要有两个问题。
> 1. join hive 维表,没加载完就有 join 输出了。
> 2. hive 加载完后,就不再做 checkpoint 了。
> 
> 第一个问题,目前flink 还没有内置支持hive 维表的支持。你可以自己实现一个 udtf 去拉取 hive 数据到内存,udtf 的 eval 
> 方法在加载完 hive 数据之前不返回,这样可以避免没有加载完就有输出的问题。
> 第二个问题,目前 streaming job 中如果存在 finish vertex,是无法做 checkpoint 的。
> 
> 
> Best,
> Jark
> 
> 
> 
> 在 2019年8月27日,17:41,like  写道:
> 
> 我通过hive union 
> kafka数据来解决hive数据不更新的问题,我现在碰到的问题是hive没有加载完就开始有join数据输出,比如我取最大日期,hive读取到26号就输出了26,过一会读取到27号,又会输出27,有没有办法让hive全部加载完再join输出,这样就只会有一个值输出了?
> 
> 
> 在2019年8月27日 17:33,苏 欣 写道:
> 我之前试过两种方式,但都有各自的问题:
> 1.注册hiveCatalog然后关联kafka,这种方式会存在hive表数据读取一次后不再更新的问题。
> 2.实现LookupableTableSource方法,通过jdbc方式读取hive维表来关联kafka,但是这种方式可以做到实时加载hive数据,但是效率比较低,尤其是读取数据量较大的hive表时。
> 
> 
> sean...@live.com
> 
> 发件人: like
> 发送时间: 2019-08-27 17:15
> 收件人: user-zh@flink.apache.org
> 主题: kafka流与hive表join问题
> 请问一下各位大佬,如何保证先加载完hive表,然后再与流join,我发现在hive还没有加载完就已经有join的结果出来,这样刚开始出来的结果是不准确的,还有一个问题是hive表加载完之后不会再做checkpoint?我目前使用的是1.7.1版本,看了1.9的维表join,blink文档说(必须加上FOR
>  SYSTEM_TIME AS OF 
> PROCTIME(),表示JOIN维表当前时刻所看到的每条数据),这也就意味着跟我现在一样,没有加载完hive就会join输出了?
> 
> 
> 
> 



flink1.9 Blink planner create view 问题

2019-08-27 文章 hb
注册了T表后,创建view报错
 tEnv.sqlUpdate(s"create view v1 as select * from T")
Exception in thread "main" org.apache.flink.table.api.TableException: 
Unsupported node type SqlCreateView


是用错方法了,还是不支持



回复: kafka流与hive表join问题

2019-08-27 文章 like
Hi Jark


非常感谢你提供的方案,我不了解udtf加载完后是一种什么情况,是每一个task都会维护一份hive数据吗?这应该会带来很大的开销。还有一个问题就是1.9中的维表join是不是跟现在一样呢?有什么特别的地方吗?


在2019年8月28日 10:10,Jark Wu 写道:
Hi,

看了你的问题,主要有两个问题。
1. join hive 维表,没加载完就有 join 输出了。
2. hive 加载完后,就不再做 checkpoint 了。

第一个问题,目前flink 还没有内置支持hive 维表的支持。你可以自己实现一个 udtf 去拉取 hive 数据到内存,udtf 的 eval 
方法在加载完 hive 数据之前不返回,这样可以避免没有加载完就有输出的问题。
第二个问题,目前 streaming job 中如果存在 finish vertex,是无法做 checkpoint 的。


Best,
Jark



在 2019年8月27日,17:41,like  写道:

我通过hive union 
kafka数据来解决hive数据不更新的问题,我现在碰到的问题是hive没有加载完就开始有join数据输出,比如我取最大日期,hive读取到26号就输出了26,过一会读取到27号,又会输出27,有没有办法让hive全部加载完再join输出,这样就只会有一个值输出了?


在2019年8月27日 17:33,苏 欣 写道:
我之前试过两种方式,但都有各自的问题:
1.注册hiveCatalog然后关联kafka,这种方式会存在hive表数据读取一次后不再更新的问题。
2.实现LookupableTableSource方法,通过jdbc方式读取hive维表来关联kafka,但是这种方式可以做到实时加载hive数据,但是效率比较低,尤其是读取数据量较大的hive表时。


sean...@live.com

发件人: like
发送时间: 2019-08-27 17:15
收件人: user-zh@flink.apache.org
主题: kafka流与hive表join问题
请问一下各位大佬,如何保证先加载完hive表,然后再与流join,我发现在hive还没有加载完就已经有join的结果出来,这样刚开始出来的结果是不准确的,还有一个问题是hive表加载完之后不会再做checkpoint?我目前使用的是1.7.1版本,看了1.9的维表join,blink文档说(必须加上FOR
 SYSTEM_TIME AS OF 
PROCTIME(),表示JOIN维表当前时刻所看到的每条数据),这也就意味着跟我现在一样,没有加载完hive就会join输出了?






Re: 在子查询上使用row_number over返回的rn都是1

2019-08-27 文章 ddwcg
因为sink到hbase,使用一个column存了top5的list,sink前我要组合一下这个list

> 在 2019年8月28日,10:12,Jark Wu  写道:
> 
> 为什么还需要后面接 process operator 呢? Flink TopN 已经帮你维护好了 state,直接输出到一个 update sink 
> 中就可以了。
> 
> 
> Best,
> Jark
> 
>> 在 2019年8月28日,10:08,ddwcg <3149768...@qq.com> 写道:
>> 
>> process
> 



Re: 在子查询上使用row_number over返回的rn都是1

2019-08-27 文章 Jark Wu
为什么还需要后面接 process operator 呢? Flink TopN 已经帮你维护好了 state,直接输出到一个 update sink 
中就可以了。


Best,
Jark

> 在 2019年8月28日,10:08,ddwcg <3149768...@qq.com> 写道:
> 
> process



Re: kafka流与hive表join问题

2019-08-27 文章 Jark Wu
Hi,

看了你的问题,主要有两个问题。
1. join hive 维表,没加载完就有 join 输出了。
2. hive 加载完后,就不再做 checkpoint 了。

第一个问题,目前flink 还没有内置支持hive 维表的支持。你可以自己实现一个 udtf 去拉取 hive 数据到内存,udtf 的 eval 
方法在加载完 hive 数据之前不返回,这样可以避免没有加载完就有输出的问题。
第二个问题,目前 streaming job 中如果存在 finish vertex,是无法做 checkpoint 的。


Best,
Jark



> 在 2019年8月27日,17:41,like  写道:
> 
> 我通过hive union 
> kafka数据来解决hive数据不更新的问题,我现在碰到的问题是hive没有加载完就开始有join数据输出,比如我取最大日期,hive读取到26号就输出了26,过一会读取到27号,又会输出27,有没有办法让hive全部加载完再join输出,这样就只会有一个值输出了?
> 
> 
> 在2019年8月27日 17:33,苏 欣 写道:
> 我之前试过两种方式,但都有各自的问题:
> 1.注册hiveCatalog然后关联kafka,这种方式会存在hive表数据读取一次后不再更新的问题。
> 2.实现LookupableTableSource方法,通过jdbc方式读取hive维表来关联kafka,但是这种方式可以做到实时加载hive数据,但是效率比较低,尤其是读取数据量较大的hive表时。
> 
> 
> sean...@live.com
> 
> 发件人: like
> 发送时间: 2019-08-27 17:15
> 收件人: user-zh@flink.apache.org
> 主题: kafka流与hive表join问题
> 请问一下各位大佬,如何保证先加载完hive表,然后再与流join,我发现在hive还没有加载完就已经有join的结果出来,这样刚开始出来的结果是不准确的,还有一个问题是hive表加载完之后不会再做checkpoint?我目前使用的是1.7.1版本,看了1.9的维表join,blink文档说(必须加上FOR
>  SYSTEM_TIME AS OF 
> PROCTIME(),表示JOIN维表当前时刻所看到的每条数据),这也就意味着跟我现在一样,没有加载完hive就会join输出了?
> 
> 
> 



Re: 在子查询上使用row_number over返回的rn都是1

2019-08-27 文章 Jark Wu
Hi,

你使用的是 flink 1.9 blink planner 吧?

首先你的 topn query 没有问题。结果也没有问题。
因为你是根据 province 分组求 top5,也就是每个省份排名前5的 id。但是现在你的数据中,每个省份只有一个 id,所以大家的排名都是1。
如果你想求全局前5名的省份,那么row_number 那里不需要定义 partition by province。

Best,
Jark


> 在 2019年8月27日,15:16,ddwcg <3149768...@qq.com> 写道:
> 
> 6> (true,id001,上海,647.55,1)



Fwd: Checkpoint使用

2019-08-27 文章 Andrew Lin
1,你说的应该是savepoint吧,checkpoint是运行中failover自动恢复的,savepoint如果设置了uid,改变并行度是可以从savepoint启动的



> 下面是被转发的邮件:
> 
> 发件人: yanggang_it_job 
> 主题: Checkpoint使用
> 日期: 2019年8月27日 GMT+8 下午6:08:24
> 收件人: user-zh@flink.apache.org
> 回复-收件人: user-zh@flink.apache.org
> 
> 关于flink从checkpoint的问题:
>   1、如果我的并行度发生了改变,怎么从checkpoint启动?
>   2、是否可以动态设置checkpoint触发时间?



flink1.9 hadoop3 on yarn "StoppableFunction not found"

2019-08-27 文章 Michael Ran
deal ALL :
目前在CDH6.2.0   hadoop3 上,编译了 flink 1.9
 提交的时候 异常:
CONSOLE#org.apache.flink.client.program.ProgramInvocationException: The program 
caused an error:

CONSOLE# at 
org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:93)

CONSOLE# at 
org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)

CONSOLE# at 
org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:122)

CONSOLE# at 
org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:227)

CONSOLE# at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)

CONSOLE# at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)

CONSOLE# at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)

CONSOLE# at java.security.AccessController.doPrivileged(Native Method)

CONSOLE# at javax.security.auth.Subject.doAs(Subject.java:422)

CONSOLE# at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)

CONSOLE# at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)

CONSOLE# at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)

CONSOLE#Caused by: java.lang.NoClassDefFoundError: 
org/apache/flink/api/common/functions/StoppableFunction

CONSOLE# at java.lang.ClassLoader.defineClass1(Native Method)

CONSOLE# at java.lang.ClassLoader.defineClass(ClassLoader.java:763)

CONSOLE# at 
java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)

CONSOLE# at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)

CONSOLE# at java.net.URLClassLoader.access$100(URLClassLoader.java:74)

CONSOLE# at java.net.URLClassLoader$1.run(URLClassLoader.java:369)

CONSOLE# at java.net.URLClassLoader$1.run(URLClassLoader.java:363)

CONSOLE# at java.security.AccessController.doPrivileged(Native Method)

CONSOLE# at java.net.URLClassLoader.findClass(URLClassLoader.java:362)

CONSOLE# at java.lang.ClassLoader.loadClass(ClassLoader.java:424)

CONSOLE# at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)

CONSOLE# at java.lang.ClassLoader.loadClass(ClassLoader.java:357)

CONSOLE# at com.dlb.RocketApp.buildSource(RocketApp.java:89)

CONSOLE# at com.dlb.RocketApp.main(RocketApp.java:55)

CONSOLE# at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

CONSOLE# at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

CONSOLE# at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

CONSOLE# at java.lang.reflect.Method.invoke(Method.java:498)

CONSOLE# at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)

CONSOLE# at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)

CONSOLE# at 
org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)

CONSOLE# ... 11 more

CONSOLE#Caused by: java.lang.ClassNotFoundException: 
org.apache.flink.api.common.functions.StoppableFunction

CONSOLE# at java.net.URLClassLoader.findClass(URLClassLoader.java:382)

CONSOLE# at java.lang.ClassLoader.loadClass(ClassLoader.java:424)

CONSOLE# at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)

CONSOLE# at java.lang.ClassLoader.loadClass(ClassLoader.java:357)




本地没问题,on yarn 就会报这个。StoppableFunction   已经被我干掉了,没任何引用,也搜索不到这个类了。
不知道社区有人遇到这个问题没?





回复: flink日志级别问题

2019-08-27 文章 王金海
日志同步到ES,然后检索error类型的
至于是否可以代码自定义,就不太清楚了


csbl...@163.com
Have a nice day !


在2019年08月27日 19:29,Zili Chen 写道:
另一种思路如果可以 download 日志的话可以直接文本处理挑出 ERROR 日志(x

Best,
tison.


陈思 <58683...@qq.com> 于2019年8月27日周二 下午6:56写道:

目的:调整flink作业日志级别为ERROR


背景:公司提交flink作业只能通过公司的可视化平台,因此不能修改集群上的log4j.properties文件,现在的日志级别是INFO,日志太多不方便排错

目前情况:打算在代码中设置日志级别,使用LogManager.getRootLogger().setLevel(Level.ERROR);在算子的open方法中填入上述代码。但依然会输出INFO日志,请问大神们有什么解决方案吗?


Re: flink日志级别问题

2019-08-27 文章 Zili Chen
另一种思路如果可以 download 日志的话可以直接文本处理挑出 ERROR 日志(x

Best,
tison.


陈思 <58683...@qq.com> 于2019年8月27日周二 下午6:56写道:

> 目的:调整flink作业日志级别为ERROR
>
>
> 背景:公司提交flink作业只能通过公司的可视化平台,因此不能修改集群上的log4j.properties文件,现在的日志级别是INFO,日志太多不方便排错
>
> 目前情况:打算在代码中设置日志级别,使用LogManager.getRootLogger().setLevel(Level.ERROR);在算子的open方法中填入上述代码。但依然会输出INFO日志,请问大神们有什么解决方案吗?


Re: flink1.9 blink planner table ddl 使用问题

2019-08-27 文章 徐骁
珞感谢

Jark Wu  于2019年8月27日周二 下午6:49写道:

>
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/connect.html#kafka-connector
> <
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/connect.html#kafka-connector
> >
>
> > 在 2019年8月27日,17:59,徐骁  写道:
> >
> > 这部分有文档吗,看了好几圈没看到
> >
> > hb <343122...@163.com> 于2019年8月26日周一 下午3:34写道:
> >
> >> 感谢,解决了, 指定 'connector.version' = '0.11' 就可以了.
> >> Blink SQL这方面的官方资料和文档好少啊,开发容易遇到问题.
> >>
> >> 在 2019-08-26 14:26:15,"hb" <343122...@163.com> 写道:
> >>> kafka版本是 kafka_2.11-1.1.0,
> >>> 支持的kafka版本有哪些
> >>> 在 2019-08-26 14:23:19,"pengcheng...@bonc.com.cn" <
> >> pengcheng...@bonc.com.cn> 写道:
>  检查一下代码的kafka版本,可能是这方面的错误
> 
> 
> 
>  pengcheng...@bonc.com.cn
> 
>  发件人: hb
>  发送时间: 2019-08-26 15:14
>  收件人: user-zh
>  主题: Re:Re: flink1.9 blink planner table ddl 使用问题
>  之前少了 flink-connector-kafka_2.11 依赖,
>  现在错误变成  Caused by: java.lang.NoSuchMethodError:
> >>
> org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.(Lorg/slf4j/Logger;Lorg/apache/flink/streaming/connectors/kafka/internal/Handover;Ljava/util/Properties;Lorg/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue;Ljava/lang/String;JZLorg/apache/flink/metrics/MetricGroup;Lorg/apache/flink/metrics/MetricGroup;)V
>  了
> 
> 
>  pom依赖:
>  ```
>    
> 
> 
>    
>    org.apache.flink
>    flink-core
>    ${flink.version}
> 
> 
>    
> 
> 
>    
>    org.apache.flink
>    flink-clients_2.11
>    ${flink.version}
> 
> 
>    
> 
> 
>    
>    org.apache.flink
>    flink-scala_2.11
>    ${flink.version}
> 
> 
>    
> 
> 
>    
>    org.apache.flink
>    flink-streaming-scala_2.11
>    ${flink.version}
> 
> 
>    
> 
> 
>    
>    org.apache.flink
>    flink-table
>    1.9.0
>    pom
>    provided
>    
> 
> 
>    
>    org.apache.flink
>    flink-table-common
>    ${flink.version}
>    
> 
> 
>    
>    org.apache.flink
>    flink-cep-scala_2.11
>    ${flink.version}
>    
> 
> 
>    
>    org.apache.flink
>    flink-connector-filesystem_2.11
>    ${flink.version}
>    
> 
> 
> 
> 
>    
>    org.apache.flink
>    flink-table-api-scala-bridge_2.11
>    ${flink.version}
>  
>    
> 
> 
>    
>    org.apache.flink
>    flink-table-api-java-bridge_2.11
>    ${flink.version}
>    
>    
> 
> 
>    
>    org.apache.flink
>    flink-table-planner_2.11
>    ${flink.version}
>    
>    
> 
> 
>    
>    org.apache.flink
>    flink-table-runtime-blink_2.11
>    ${flink.version}
>    
> 
> 
> 
> 
>    
>    org.apache.flink
>    flink-table-planner-blink_2.11
>    ${flink.version}
>    
>    
> 
> 
>    
>    org.apache.flink
>    flink-connector-elasticsearch6_2.11
>    ${flink.version}
>    
> 
> 
>    
>    org.apache.flink
>    flink-connector-kafka-0.11_2.11
>    ${flink.version}
>    
> 
> 
>    
>    org.apache.flink
>    flink-connector-kafka_2.11
>    ${flink.version}
>    
>    
>    org.apache.flink
>    flink-json
>    ${flink.version}
>    
>    
>    org.apache.flink
>    flink-runtime-web_2.11
>    ${flink.version}
>    
>    
> 
> 
>  ```
> 
> 
> 
> 
> 
> 
> 
> 
>  在 2019-08-26 13:37:51,"Jark Wu"  写道:
> > Maven 需要同时依赖 flink-json 和 flink-connector-kafka_2.11
> >
> > Best,
> > Jark
> >
> >> 在 2019年8月26日,13:57,hb <343122...@163.com> 写道:
> >>
> >> 使用了你的ddl语句,还是报一样的错误.
> >> 我是在idea里面执行的,maven 配置的依赖.
> >>
> >> 在 2019-08-26 11:22:20,"Jark Wu"  写道:
> >>> Hi,
> >>>
> >>> 初步看下来你的 DDL 中有这几部分定义的有问题。
> >>>
> >>> 1. 缺少format properties
> >>> 2. 缺少 connector.version
> >>> 3. bootstrap.severs 的配置方式写的不对...
> >>>
> >>>
> >>> 你可以参考下面这个作为example:
> >>>
> >>>
> >>> CREATE TABLE kafka_json_source (
> 

maven配置错误百出

2019-08-27 文章 like
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/hive/



Re: flink1.9 blink planner table ddl 使用问题

2019-08-27 文章 Jark Wu
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/connect.html#kafka-connector
 


> 在 2019年8月27日,17:59,徐骁  写道:
> 
> 这部分有文档吗,看了好几圈没看到
> 
> hb <343122...@163.com> 于2019年8月26日周一 下午3:34写道:
> 
>> 感谢,解决了, 指定 'connector.version' = '0.11' 就可以了.
>> Blink SQL这方面的官方资料和文档好少啊,开发容易遇到问题.
>> 
>> 在 2019-08-26 14:26:15,"hb" <343122...@163.com> 写道:
>>> kafka版本是 kafka_2.11-1.1.0,
>>> 支持的kafka版本有哪些
>>> 在 2019-08-26 14:23:19,"pengcheng...@bonc.com.cn" <
>> pengcheng...@bonc.com.cn> 写道:
 检查一下代码的kafka版本,可能是这方面的错误
 
 
 
 pengcheng...@bonc.com.cn
 
 发件人: hb
 发送时间: 2019-08-26 15:14
 收件人: user-zh
 主题: Re:Re: flink1.9 blink planner table ddl 使用问题
 之前少了 flink-connector-kafka_2.11 依赖,
 现在错误变成  Caused by: java.lang.NoSuchMethodError:
>> org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.(Lorg/slf4j/Logger;Lorg/apache/flink/streaming/connectors/kafka/internal/Handover;Ljava/util/Properties;Lorg/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue;Ljava/lang/String;JZLorg/apache/flink/metrics/MetricGroup;Lorg/apache/flink/metrics/MetricGroup;)V
 了
 
 
 pom依赖:
 ```
   
 
 
   
   org.apache.flink
   flink-core
   ${flink.version}
 
 
   
 
 
   
   org.apache.flink
   flink-clients_2.11
   ${flink.version}
 
 
   
 
 
   
   org.apache.flink
   flink-scala_2.11
   ${flink.version}
 
 
   
 
 
   
   org.apache.flink
   flink-streaming-scala_2.11
   ${flink.version}
 
 
   
 
 
   
   org.apache.flink
   flink-table
   1.9.0
   pom
   provided
   
 
 
   
   org.apache.flink
   flink-table-common
   ${flink.version}
   
 
 
   
   org.apache.flink
   flink-cep-scala_2.11
   ${flink.version}
   
 
 
   
   org.apache.flink
   flink-connector-filesystem_2.11
   ${flink.version}
   
 
 
 
 
   
   org.apache.flink
   flink-table-api-scala-bridge_2.11
   ${flink.version}
 
   
 
 
   
   org.apache.flink
   flink-table-api-java-bridge_2.11
   ${flink.version}
   
   
 
 
   
   org.apache.flink
   flink-table-planner_2.11
   ${flink.version}
   
   
 
 
   
   org.apache.flink
   flink-table-runtime-blink_2.11
   ${flink.version}
   
 
 
 
 
   
   org.apache.flink
   flink-table-planner-blink_2.11
   ${flink.version}
   
   
 
 
   
   org.apache.flink
   flink-connector-elasticsearch6_2.11
   ${flink.version}
   
 
 
   
   org.apache.flink
   flink-connector-kafka-0.11_2.11
   ${flink.version}
   
 
 
   
   org.apache.flink
   flink-connector-kafka_2.11
   ${flink.version}
   
   
   org.apache.flink
   flink-json
   ${flink.version}
   
   
   org.apache.flink
   flink-runtime-web_2.11
   ${flink.version}
   
   
 
 
 ```
 
 
 
 
 
 
 
 
 在 2019-08-26 13:37:51,"Jark Wu"  写道:
> Maven 需要同时依赖 flink-json 和 flink-connector-kafka_2.11
> 
> Best,
> Jark
> 
>> 在 2019年8月26日,13:57,hb <343122...@163.com> 写道:
>> 
>> 使用了你的ddl语句,还是报一样的错误.
>> 我是在idea里面执行的,maven 配置的依赖.
>> 
>> 在 2019-08-26 11:22:20,"Jark Wu"  写道:
>>> Hi,
>>> 
>>> 初步看下来你的 DDL 中有这几部分定义的有问题。
>>> 
>>> 1. 缺少format properties
>>> 2. 缺少 connector.version
>>> 3. bootstrap.severs 的配置方式写的不对...
>>> 
>>> 
>>> 你可以参考下面这个作为example:
>>> 
>>> 
>>> CREATE TABLE kafka_json_source (
>>>  rowtime TIMESTAMP,
>>>  user_name VARCHAR,
>>>  event ROW
>>> ) WITH (
>>>  'connector.type' = 'kafka',
>>>  'connector.version' = 'universal',
>>>  'connector.topic' = 'test-json',
>>>  'connector.startup-mode' = 'earliest-offset',
>>>  'connector.properties.0.key' = 'zookeeper.connect',
>>>  'connector.properties.0.value' = 'localhost:2181',
>>>  

Re: flink异常恢复

2019-08-27 文章 Jeff Zhang
上个checkpoint

王金海  于2019年8月27日周二 下午6:14写道:

> 讨论下flink异常重启问题
>
>
> 从kafka消费数据,checkpoint周期在5分钟,如果在第6分钟,因为异常导致flink任务重启,flink是从上个checkpoint恢复呢?还是从异常时的offset恢复呢?
>
>
>
> csbl...@163.com
> Have a nice day !
>
>

-- 
Best Regards

Jeff Zhang


flink异常恢复

2019-08-27 文章 王金海
讨论下flink异常重启问题

从kafka消费数据,checkpoint周期在5分钟,如果在第6分钟,因为异常导致flink任务重启,flink是从上个checkpoint恢复呢?还是从异常时的offset恢复呢?



csbl...@163.com
Have a nice day !



Re: Checkpoint使用

2019-08-27 文章 Congxian Qiu
Hi
1. 你可以按照文档[1]从一个 retained checkpoint 进行恢复,并发度可以改变,但是你需要保证最大并发是一样的
2. 不能动态设置 checkpoint 触发时间

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/checkpoints.html#resuming-from-a-retained-checkpoint
Best,
Congxian


yanggang_it_job  于2019年8月27日周二 下午6:08写道:

> 关于flink从checkpoint的问题:
>1、如果我的并行度发生了改变,怎么从checkpoint启动?
>2、是否可以动态设置checkpoint触发时间?


Checkpoint使用

2019-08-27 文章 yanggang_it_job
关于flink从checkpoint的问题:
   1、如果我的并行度发生了改变,怎么从checkpoint启动?
   2、是否可以动态设置checkpoint触发时间?

Re: flink基于yarn的方式提交,log在web上看太卡了。能不能直接看log文件?

2019-08-27 文章 Jeff Zhang
你是通过flink UI看log还是yarn ui 看log ?

陈帅  于2019年8月27日周二 下午5:55写道:

> flink基于yarn的方式提交,log在web上看太卡了。能不能直接看log文件?
>


-- 
Best Regards

Jeff Zhang


Re: Re:回复: Re: flink1.9 blink planner table ddl 使用问题

2019-08-27 文章 徐骁
这部分有文档吗,看了好几圈没看到

hb <343122...@163.com> 于2019年8月26日周一 下午3:34写道:

> 感谢,解决了, 指定 'connector.version' = '0.11' 就可以了.
> Blink SQL这方面的官方资料和文档好少啊,开发容易遇到问题.
>
> 在 2019-08-26 14:26:15,"hb" <343122...@163.com> 写道:
> >kafka版本是 kafka_2.11-1.1.0,
> >支持的kafka版本有哪些
> >在 2019-08-26 14:23:19,"pengcheng...@bonc.com.cn" <
> pengcheng...@bonc.com.cn> 写道:
> >>检查一下代码的kafka版本,可能是这方面的错误
> >>
> >>
> >>
> >>pengcheng...@bonc.com.cn
> >>
> >>发件人: hb
> >>发送时间: 2019-08-26 15:14
> >>收件人: user-zh
> >>主题: Re:Re: flink1.9 blink planner table ddl 使用问题
> >>之前少了 flink-connector-kafka_2.11 依赖,
> >>现在错误变成  Caused by: java.lang.NoSuchMethodError:
> org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.(Lorg/slf4j/Logger;Lorg/apache/flink/streaming/connectors/kafka/internal/Handover;Ljava/util/Properties;Lorg/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue;Ljava/lang/String;JZLorg/apache/flink/metrics/MetricGroup;Lorg/apache/flink/metrics/MetricGroup;)V
> >>了
> >>
> >>
> >>pom依赖:
> >>```
> >>
> >>
> >>
> >>
> >>org.apache.flink
> >>flink-core
> >>${flink.version}
> >>
> >>
> >>
> >>
> >>
> >>
> >>org.apache.flink
> >>flink-clients_2.11
> >>${flink.version}
> >>
> >>
> >>
> >>
> >>
> >>
> >>org.apache.flink
> >>flink-scala_2.11
> >>${flink.version}
> >>
> >>
> >>
> >>
> >>
> >>
> >>org.apache.flink
> >>flink-streaming-scala_2.11
> >>${flink.version}
> >>
> >>
> >>
> >>
> >>
> >>
> >>org.apache.flink
> >>flink-table
> >>1.9.0
> >>pom
> >>provided
> >>
> >>
> >>
> >>
> >>org.apache.flink
> >>flink-table-common
> >>${flink.version}
> >>
> >>
> >>
> >>
> >>org.apache.flink
> >>flink-cep-scala_2.11
> >>${flink.version}
> >>
> >>
> >>
> >>
> >>org.apache.flink
> >>flink-connector-filesystem_2.11
> >>${flink.version}
> >>
> >>
> >>
> >>
> >>
> >>
> >>org.apache.flink
> >>flink-table-api-scala-bridge_2.11
> >>${flink.version}
> >>
> >>
> >>
> >>
> >>
> >>org.apache.flink
> >>flink-table-api-java-bridge_2.11
> >>${flink.version}
> >>
> >>
> >>
> >>
> >>
> >>org.apache.flink
> >>flink-table-planner_2.11
> >>${flink.version}
> >>
> >>
> >>
> >>
> >>
> >>org.apache.flink
> >>flink-table-runtime-blink_2.11
> >>${flink.version}
> >>
> >>
> >>
> >>
> >>
> >>
> >>org.apache.flink
> >>flink-table-planner-blink_2.11
> >>${flink.version}
> >>
> >>
> >>
> >>
> >>
> >>org.apache.flink
> >>flink-connector-elasticsearch6_2.11
> >>${flink.version}
> >>
> >>
> >>
> >>
> >>org.apache.flink
> >>flink-connector-kafka-0.11_2.11
> >>${flink.version}
> >>
> >>
> >>
> >>
> >>org.apache.flink
> >>flink-connector-kafka_2.11
> >>${flink.version}
> >>
> >>
> >>org.apache.flink
> >>flink-json
> >>${flink.version}
> >>
> >>
> >>org.apache.flink
> >>flink-runtime-web_2.11
> >>${flink.version}
> >>
> >>
> >>
> >>
> >>```
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>在 2019-08-26 13:37:51,"Jark Wu"  写道:
> >>>Maven 需要同时依赖 flink-json 和 flink-connector-kafka_2.11
> >>>
> >>>Best,
> >>>Jark
> >>>
>  在 2019年8月26日,13:57,hb <343122...@163.com> 写道:
> 
>  使用了你的ddl语句,还是报一样的错误.
>  我是在idea里面执行的,maven 配置的依赖.
> 
>  在 2019-08-26 11:22:20,"Jark Wu"  写道:
> > Hi,
> >
> > 初步看下来你的 DDL 中有这几部分定义的有问题。
> >
> > 1. 缺少format properties
> > 2. 缺少 connector.version
> > 3. bootstrap.severs 的配置方式写的不对...
> >
> >
> > 你可以参考下面这个作为example:
> >
> >
> > CREATE TABLE kafka_json_source (
> >   rowtime TIMESTAMP,
> >   user_name VARCHAR,
> >   event ROW
> > ) WITH (
> >   'connector.type' = 'kafka',
> >   'connector.version' = 'universal',
> >   'connector.topic' = 'test-json',
> >   'connector.startup-mode' = 'earliest-offset',
> >   'connector.properties.0.key' = 'zookeeper.connect',
> >   'connector.properties.0.value' = 'localhost:2181',
> >   'connector.properties.1.key' = 'bootstrap.servers',
> >   'connector.properties.1.value' = 'localhost:9092',
> >   'update-mode' = 'append',
> >   'format.type' = 'json',
> >   'format.derive-schema' = 'true'
> > );
> 

flink 1.8 sql rowtime window ????

2019-08-27 文章 1142632215
1.mysql 
binlog??200-3002.??250??4001200-300??
3.source??




2 
??id??RetractStream??timestamp??KafkaTableSource??rowtime
 select order_id ,last_value(timestamp) timestamp,last_value(order_status) 
order_status from order group by order_id 





??over window ??
over(partition by df(timestamp,'-MM-dd 00:00:00') order by update_time 
range BETWEEN INTERVAL '24' hour preceding and current row

Exception in thread "main" org.apache.flink.table.api.TableException: 
Retraction on Over window aggregation is not supported yet. Note: Over window 
aggregation should not follow a non-windowed GroupBy aggregation.


??
1. ??sql
2.last_value(rowtime) as rowtime  group by order_id ?? 
(rowtime time attribute)rowtime?? watermark?? 
rowtime??watermark??stream api 
??operatewatermark

flink基于yarn提交,需要依赖很多第三方的包,有没有办法添加classpath之类的,本地测试总是报错

2019-08-27 文章 陈帅
现在都是把代码打成一个胖包,每次这样,传输太麻烦了


flink基于yarn的方式提交,log在web上看太卡了。能不能直接看log文件?

2019-08-27 文章 陈帅
flink基于yarn的方式提交,log在web上看太卡了。能不能直接看log文件?


回复: kafka流与hive表join问题

2019-08-27 文章 like
我通过hive union 
kafka数据来解决hive数据不更新的问题,我现在碰到的问题是hive没有加载完就开始有join数据输出,比如我取最大日期,hive读取到26号就输出了26,过一会读取到27号,又会输出27,有没有办法让hive全部加载完再join输出,这样就只会有一个值输出了?


在2019年8月27日 17:33,苏 欣 写道:
我之前试过两种方式,但都有各自的问题:
1.注册hiveCatalog然后关联kafka,这种方式会存在hive表数据读取一次后不再更新的问题。
2.实现LookupableTableSource方法,通过jdbc方式读取hive维表来关联kafka,但是这种方式可以做到实时加载hive数据,但是效率比较低,尤其是读取数据量较大的hive表时。


sean...@live.com

发件人: like
发送时间: 2019-08-27 17:15
收件人: user-zh@flink.apache.org
主题: kafka流与hive表join问题
请问一下各位大佬,如何保证先加载完hive表,然后再与流join,我发现在hive还没有加载完就已经有join的结果出来,这样刚开始出来的结果是不准确的,还有一个问题是hive表加载完之后不会再做checkpoint?我目前使用的是1.7.1版本,看了1.9的维表join,blink文档说(必须加上FOR
 SYSTEM_TIME AS OF 
PROCTIME(),表示JOIN维表当前时刻所看到的每条数据),这也就意味着跟我现在一样,没有加载完hive就会join输出了?





回复: kafka流与hive表join问题

2019-08-27 文章 like
我通过HCatInputFormat读取了hive的数据注册了一张表,然后读取kafka的数据也注册了一张表,join就是通过sql写的,没有什么代码逻辑呢。


| |
like
|
|
likeg...@163.com
|
签名由网易邮箱大师定制
在2019年8月27日 17:17,Jeff Zhang 写道:
你是怎么join hive表的,能share你的代码吗?

like  于2019年8月27日周二 下午5:15写道:

请问一下各位大佬,如何保证先加载完hive表,然后再与流join,我发现在hive还没有加载完就已经有join的结果出来,这样刚开始出来的结果是不准确的,还有一个问题是hive表加载完之后不会再做checkpoint?我目前使用的是1.7.1版本,看了1.9的维表join,blink文档说(必须加上FOR
SYSTEM_TIME AS OF
PROCTIME(),表示JOIN维表当前时刻所看到的每条数据),这也就意味着跟我现在一样,没有加载完hive就会join输出了?





--
Best Regards

Jeff Zhang


Re: flink1.7.2如何进行hdfs的kerberos认证

2019-08-27 文章 Jeff Zhang
See
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/config.html#kerberos-based-security


杨文生-java开发  于2019年8月27日周二 下午3:30写道:

>
>
>public static void main(String[] args) throws Exception {
> final ParameterTool parameterTool = ParameterTool
>
> .fromPropertiesFile(BizlogStreamWithEventTimeCleaner.class.getResourceAsStream(PROPERTIES_FILE_NAME))
>
> .mergeWith(ParameterTool.fromArgs(args));
> String resultTable = parameterTool.get(BIZLOG_RESULT_TABLENAME);
> //设置窗口大小
> Time windowSize =
> Time.milliseconds(parameterTool.getLong(BIZLOG_WINDOW_SIZE));
> //设置数据最大乱序时间
> Time maxOutOfOrder =
> Time.milliseconds(parameterTool.getLong(BIZLOG_WINDOW_MAXOUTOFORDE));
> hdfsAuthenticate(parameterTool);
> ...
> }
> private static void hdfsAuthenticate(ParameterTool parameterTool)
> throws IOException {
> String kerberosConfFile =
> BizlogStreamWithEventTimeCleaner.class.getClassLoader().getResource("krb5.conf").getPath();
>
> System.setProperty("java.security.krb5.conf", kerberosConfFile);
> Configuration conf = new Configuration();
> conf.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos");
> //加载hadoop配置文件
> String principal = parameterTool.get("kerberos.princpal");
> String keytabName = parameterTool.get("keytab.name");
> String keytabPath =
> BizlogStreamWithEventTimeCleaner.class.getClassLoader().getResource(keytabName).getPath();
>
> UserGroupInformation.setConfiguration(conf);
> UserGroupInformation.loginUserFromKeytab(principal, keytabPath);
> }
>
>
> 代码如上,在idea中直接运行可以认证通过,但是打成jar包提交到集群后报错如下:
> Caused by: java.io.IOException: Login failure for biuri/
> bj142.-in.dom...@btest.com from keytab
> file:/data/realtime-flink.jar!/kerberos.keytab:
> javax.security.auth.login.LoginException: Unable to obtain password from
> user
> 这个是什么原因?或者应该如何进行正确的集群认证?
>
>
>
>

-- 
Best Regards

Jeff Zhang


Re: 任务内存增长

2019-08-27 文章 Xintong Song
这个邮件列表看不到图片附件的,文本内容可以直接贴出来,图片的话需要放外部链接

Thank you~

Xintong Song



On Tue, Aug 27, 2019 at 5:17 PM 张坤  wrote:

>
> 感谢您的回复,checkpoint使用的rocksDB,现在查看GC日志得到以下信息,堆内存使用正常,线程数使用在500左右,线程回收,但是线程占用的内存好像并没有回收掉。
>
> 在 2019/8/27 下午5:02,“Xintong Song” 写入:
>
> 你用的是heap state backend吗?可以看下checkpoint
>
> size是否持续在增大,如果是的话很可能就是state增大导致的。作业运行后,随着处理的数据越来越多,state的key数量也会越来越多,大小随之增大。解决方案要么是改用RocksDB,要么是把tm内存配大为state增大留出富裕。
>
> 另外,如果checkpoint size持续增长没有趋于平缓的趋势,那么也可能state的使用有问题。
>
> 如果观察到不是state的问题,那么可能需要dump下tm的内存,看看是否哪里有内存泄露的情况。
>
> Thank you~
>
> Xintong Song
>
>
>
> On Mon, Aug 26, 2019 at 10:46 AM 张坤  wrote:
>
> > Hi:
> >
> >最近在使用Flink(1.7.2)提交任务到yarn(per
> >
> job),任务在yarn上运行几个小时就会被kill掉,观察到任务的内存一直在增长,任务提交时有内存参数设置,任务逻辑为kafka数据简单处理后,注册成table,使用窗口聚合,
> >
> > 大家有没有遇到类似的问题,原因是什么?怎么解决或者优化?谢谢!
> >
> >
>
>
>
>
>
>


Re: kafka流与hive表join问题

2019-08-27 文章 Jeff Zhang
你是怎么join hive表的,能share你的代码吗?

like  于2019年8月27日周二 下午5:15写道:

> 请问一下各位大佬,如何保证先加载完hive表,然后再与流join,我发现在hive还没有加载完就已经有join的结果出来,这样刚开始出来的结果是不准确的,还有一个问题是hive表加载完之后不会再做checkpoint?我目前使用的是1.7.1版本,看了1.9的维表join,blink文档说(必须加上FOR
> SYSTEM_TIME AS OF
> PROCTIME(),表示JOIN维表当前时刻所看到的每条数据),这也就意味着跟我现在一样,没有加载完hive就会join输出了?
>
>
>
>

-- 
Best Regards

Jeff Zhang


Re: 任务内存增长

2019-08-27 文章 张坤
感谢您的回复,checkpoint使用的rocksDB,现在查看GC日志得到以下信息,堆内存使用正常,线程数使用在500左右,线程回收,但是线程占用的内存好像并没有回收掉。

在 2019/8/27 下午5:02,“Xintong Song” 写入:

你用的是heap state backend吗?可以看下checkpoint

size是否持续在增大,如果是的话很可能就是state增大导致的。作业运行后,随着处理的数据越来越多,state的key数量也会越来越多,大小随之增大。解决方案要么是改用RocksDB,要么是把tm内存配大为state增大留出富裕。

另外,如果checkpoint size持续增长没有趋于平缓的趋势,那么也可能state的使用有问题。

如果观察到不是state的问题,那么可能需要dump下tm的内存,看看是否哪里有内存泄露的情况。

Thank you~

Xintong Song



On Mon, Aug 26, 2019 at 10:46 AM 张坤  wrote:

> Hi:
>
>最近在使用Flink(1.7.2)提交任务到yarn(per
> 
job),任务在yarn上运行几个小时就会被kill掉,观察到任务的内存一直在增长,任务提交时有内存参数设置,任务逻辑为kafka数据简单处理后,注册成table,使用窗口聚合,
>
> 大家有没有遇到类似的问题,原因是什么?怎么解决或者优化?谢谢!
>
>







kafka流与hive表join问题

2019-08-27 文章 like
请问一下各位大佬,如何保证先加载完hive表,然后再与流join,我发现在hive还没有加载完就已经有join的结果出来,这样刚开始出来的结果是不准确的,还有一个问题是hive表加载完之后不会再做checkpoint?我目前使用的是1.7.1版本,看了1.9的维表join,blink文档说(必须加上FOR
 SYSTEM_TIME AS OF 
PROCTIME(),表示JOIN维表当前时刻所看到的每条数据),这也就意味着跟我现在一样,没有加载完hive就会join输出了?





kafka流与hive表join问题

2019-08-27 文章 like
请问一下,如何保证先加载完hive表,然后再与流join,我发现在hive还没有加载完就已经有join的结果出来,这样刚开始出来的结果是不准确的,还有一个问题是hive表加载完之后不会再做checkpoint?我目前使用的是1.7.1版本,看了1.9的维表join,blink文档说(必须加上FOR
 SYSTEM_TIME AS OF 
PROCTIME(),表示JOIN维表当前时刻所看到的每条数据),这也就意味着跟我现在一样,没有加载完hive就会join输出了?


| |
like
|
|
likeg...@163.com
|
签名由网易邮箱大师定制

Re: 任务内存增长

2019-08-27 文章 Xintong Song
你用的是heap state backend吗?可以看下checkpoint
size是否持续在增大,如果是的话很可能就是state增大导致的。作业运行后,随着处理的数据越来越多,state的key数量也会越来越多,大小随之增大。解决方案要么是改用RocksDB,要么是把tm内存配大为state增大留出富裕。

另外,如果checkpoint size持续增长没有趋于平缓的趋势,那么也可能state的使用有问题。

如果观察到不是state的问题,那么可能需要dump下tm的内存,看看是否哪里有内存泄露的情况。

Thank you~

Xintong Song



On Mon, Aug 26, 2019 at 10:46 AM 张坤  wrote:

> Hi:
>
>最近在使用Flink(1.7.2)提交任务到yarn(per
> job),任务在yarn上运行几个小时就会被kill掉,观察到任务的内存一直在增长,任务提交时有内存参数设置,任务逻辑为kafka数据简单处理后,注册成table,使用窗口聚合,
>
> 大家有没有遇到类似的问题,原因是什么?怎么解决或者优化?谢谢!
>
>


flink1.7.2如何进行hdfs的kerberos认证

2019-08-27 文章 杨文生-java开发


   public static void main(String[] args) throws Exception { 
final ParameterTool parameterTool = ParameterTool 

.fromPropertiesFile(BizlogStreamWithEventTimeCleaner.class.getResourceAsStream(PROPERTIES_FILE_NAME))
 
.mergeWith(ParameterTool.fromArgs(args)); 
String resultTable = parameterTool.get(BIZLOG_RESULT_TABLENAME); 
//设置窗口大小 
Time windowSize = 
Time.milliseconds(parameterTool.getLong(BIZLOG_WINDOW_SIZE)); 
//设置数据最大乱序时间 
Time maxOutOfOrder = 
Time.milliseconds(parameterTool.getLong(BIZLOG_WINDOW_MAXOUTOFORDE)); 
hdfsAuthenticate(parameterTool); 
... 
} 
private static void hdfsAuthenticate(ParameterTool parameterTool) throws 
IOException { 
String kerberosConfFile = 
BizlogStreamWithEventTimeCleaner.class.getClassLoader().getResource("krb5.conf").getPath();
 
System.setProperty("java.security.krb5.conf", kerberosConfFile); 
Configuration conf = new Configuration(); 
conf.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos"); 
//加载hadoop配置文件 
String principal = parameterTool.get("kerberos.princpal"); 
String keytabName = parameterTool.get("keytab.name"); 
String keytabPath = 
BizlogStreamWithEventTimeCleaner.class.getClassLoader().getResource(keytabName).getPath();
 
UserGroupInformation.setConfiguration(conf); 
UserGroupInformation.loginUserFromKeytab(principal, keytabPath); 
} 


代码如上,在idea中直接运行可以认证通过,但是打成jar包提交到集群后报错如下: 
Caused by: java.io.IOException: Login failure for 
biuri/bj142.-in.dom...@btest.com from keytab 
file:/data/realtime-flink.jar!/kerberos.keytab: 
javax.security.auth.login.LoginException: Unable to obtain password from user 
这个是什么原因?或者应该如何进行正确的集群认证? 





在子查询上使用row_number over返回的rn都是1

2019-08-27 文章 ddwcg
如果直接查询表是没问题,但是业务需求是按汇总后的amount排序,所以有一个from子查询,请问有没有什么方法汇总后求topN

select id,province,amount,rn from(
select
id,province,amount,
row_number() over(partition by province order by amount desc ) as rn
from (
select id,province,sum(amount) amount from mytable group by id,province
)m
)a where rn<=5
;

返回结果:

1> (true,id001,浙江,1505.66,1)
2> (true,id001,其他,3384.91,1)
7> (true,id001,北京,365.87,1)
3> (true,id001,天津,310.38,1)
7> (false,id001,北京,365.87,1)
7> (true,id001,北京,676.25,1)
7> (false,id001,北京,676.25,1)
7> (true,id001,北京,978.14,1)
7> (true,id001,广东,329.25,1)
7> (false,id001,广东,329.25,1)


如果直接查询表是没问题:
select id,province,amount,rn from(
select
id,province,amount,
row_number() over(partition by province order by amount desc ) as rn
from mytable
)a where rn<=5
;

7> (true,id001,北京,310.38,2)
6> (true,id001,湖北,344.34,1)
8> (true,id001,山东,348.11,1)
3> (true,id001,四川,7283.02,2)
7> (true,id001,北京,301.89,3)
3> (false,id001,四川,1128.3,2)
8> (true,id001,重庆,310.38,3)
3> (true,id001,四川,1128.3,3)
6> (true,id001,上海,647.55,1)
3> (false,id001,四川,310.38,3)
6> (false,id001,上海,310.38,1)
7> (true,id001,广东,329.25,1)
3> (true,id001,四川,310.38,4)
8> (true,id001,重庆,1618.87,1)
6> (true,id001,上海,310.38,2)

????flink????????????????????????????

2019-08-27 文章 1900
flink??flinkTwoPhaseCommitSinkFunction??,



?C beginTransaction 

?C preCommit 

?C commit 

?C abort 



sink??MYSQL
 preCommit??
commit??checkpoint??checkpoint
checkpoint??checkpoint
checkpoint??