邮件退订

2021-06-16 Thread wangweigu...@stevegame.cn

邮箱变更,退订!





回复: Re: FlinkSQL 是否支持类似临时中间表的概念

2020-06-24 Thread wangweigu...@stevegame.cn

根据你的需求描述,用Flink Table API和SQL是可以解决的!



 
发件人: Weixubin
发送时间: 2020-06-24 12:09
收件人: user-zh
主题: Re:Re: FlinkSQL 是否支持类似临时中间表的概念
 
 
 
感谢你提供了子查询的思路,不过经过试验后有点可惜,这似乎还是满足不了我们的需求。
 
 
我们的场景是从阿里云SLS读取消息。每条消息有一个字段是request_uri。
第一个数据处理过程就是将 request_uri 解析为多个属性(多列),存成一行,作为一条记录。
第二个数据处理过程就是将每条记录里的 声明heart_time为事件时间属性并使用5秒延迟水印策略,进行开窗聚合处理。
 
 
//如果应用到子查询的话,Flink是不支持这样做的。WATERMARK FOR 水印声明只能在DDL里应用。如下:
select 
..ts as TO_TIMESTAMP(heart_time,'-MM-ddHH:mm:ss') ,
WATERMARK FOR ts AS ts - INTERVAL '1' SECOND 
from sourceTable , LATERAL TABLE(ParseUriRow(request_uri)) as T( )….・
 
 
//如果应用到source,则一开始并不知道heart_time 的值
CREATE TABLE sourceTable (
  request_uri STRING
..ts as TO_TIMESTAMP(heart_time,'-MM-ddHH:mm:ss') ,
WATERMARK FOR ts AS ts - INTERVAL '1' SECOND 
) WITH ( ... );
 
 
只能等待Flink 1.11 尝试是否可以用View作为中间临时表,并对View进行 WATERMARK水印声明
Thanks
Bin
 
在 2020-06-23 15:28:50,"Leonard Xu"  写道:
>Hi
>我的意思是你如果中间结果表如果要输出,那你就一个sink写到中间结果表(什么表根据你的需要),一个sink写到你的最终结果表,在这两个sink之前的`select
> * from sourceTable , LATERAL TABLE(ParseUriRow(request_uri)) as T( )….` 
>的这段sql是可以复用的,就和 VIEW的作用类似。
>
>如果你不需要中间结果表,只是要最终的结果表,那你写个嵌套的sql就行了,里层是`select * from sourceTable , LATERAL 
>TABLE(ParseUriRow(request_uri)) as T( )….・,外层是 group by, 插入最终的结果表就能满足需求了吧。
>
>祝好,
>Leonard Xu
>
>
>> 在 2020年6月23日,15:21,Weixubin <18925434...@163.com> 写道:
>> 
>> 
>> 
>> 
>> Hi,
>> 关于这句 “把 ` select * from sourceTable , LATERAL 
>> TABLE(ParseUriRow(request_uri)) as T( )….` 这段 sql insert 到中间结果表 和 
>> group后再写入最终结果表就可以了”
>> 我不太明白所说的中间结果表是什么意思, 我所理解为在数据库创建多一张middleSinkTable表,作为中间结果表。请问这样理解是否有误? 
>> 可否简单举个例子。
>> Thanks,
>> Bin
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 在 2020-06-23 11:57:28,"Leonard Xu"  写道:
>>> Hi,
>>> 是的,这个是在 FLINK 1.11 的版本里支持的,当前 FLINK 1.11代码已经冻结,正在测试中,最近应该快发布了,所以master 
>>> 分支上的版本号为1.12-SNAPSHOT
>>> ,等1.11版本发布了就可以看到对应的文档。
>>> 
>>> 回到这个问题,flink sql(blink planner) 支持 multiple-sinks 的, 在 1.10 版本也可以一个作业搞定。 把 
>>> `  select * from sourceTable , LATERAL TABLE(ParseUriRow(request_uri)) as 
>>> T( )….`  这段 sql insert 到中间结果表 和 group后再写入最终结果表就可以了。效果和使用 VIEW 应该类似,因为 
>>> planner 会做分段优化。
>>> 另外建议使用1.10.1版本,1.10.1在1.10的基础上修复了不少bug,也可以等1.11发布了试用下1.11。
>>> 
>>> 
>>> 祝好,
>>> Leonard Xu
>


回复: 关于拓展 Tuple元组的问题

2020-06-19 Thread wangweigu...@stevegame.cn

   多个值组合在一起,当一个复合值使用!



 
发件人: 魏旭斌
发送时间: 2020-06-19 15:01
收件人: user-zh
主题: 关于拓展 Tuple元组的问题
目前Flink 提供了Tuple1 ~ Tuple25,在实际开发中不能满足我们的需求,我们希望能有更大的元组,比如构建一个Tuple50。 
请问有什么解决的方案? 谢谢 


Re: Re: 对于维表频繁更新,状态越来越大的场景如何保证数据的准确性

2020-06-18 Thread wangweigu...@stevegame.cn

  可以通过异步的方式(RichAsyncFunction)进行维表关联操作,异步多线程方式进行维表读取!



 
发件人: Jim Chen
发送时间: 2020-06-19 10:34
收件人: user-zh
主题: Re: 对于维表频繁更新,状态越来越大的场景如何保证数据的准确性
请问下,在flink sql1.10中, localcache+异步IO,这个方案,是直接写sql关联维表就行了吗?flink
sql会自动在底层做优化工作吗?如果要自己手动实现的话,有没有什么demo呢?谢谢
 
Jark Wu  于2020年6月17日周三 上午12:11写道:
 
> 如果更新非常频繁,又要保证关联的准确性,又要保证吞吐,那么最佳的解决方案我觉得只能是关联 changelog 了,
> 只是 Flink 目前还没有原生支持维表关联一个 changelog,会在Flink SQL 1.12中去支持。
>
> 当前版本下的话,可以尝试 keyby+localcache+异步IO。
>
> Best,
> Jark
>
> On Tue, 16 Jun 2020 at 22:35, 李奇 <359502...@qq.com> wrote:
>
> > 或者采用redis做维表存储介质。
> >
> > > 在 2020年6月16日,下午10:10,wangxiangyan  写道:
> > >
> > > hi,大家
> > > 维表被频繁更新,数据量1g左右,需要频繁同步,使用什么方案去关联比较好呢?
> >
>


回复: sink mysql 失败

2020-06-10 Thread wangweigu...@stevegame.cn

Caused by: java.sql.SQLException: Access denied for user ''@'localhost' (using 
password: NO)
得指定下有操作mysql这个表的权限账号了!


 
发件人: Zhou Zach
发送时间: 2020-06-10 16:32
收件人: Flink user-zh mailing list
主题: sink mysql 失败
SLF4J: Class path contains multiple SLF4J bindings.
 
SLF4J: Found binding in 
[jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
 
SLF4J: Found binding in 
[jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
 
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
 
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
 
ERROR StatusLogger No log4j2 configuration file found. Using default 
configuration: logging only errors to the console.
 
Wed Jun 10 16:27:09 CST 2020 WARN: Establishing SSL connection without server's 
identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ 
and 5.7.6+ requirements SSL connection must be established by default if 
explicit option isn't set. For compliance with existing applications not using 
SSL the verifyServerCertificate property is set to 'false'. You need either to 
explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide 
truststore for server certificate verification.
 
Wed Jun 10 16:27:09 CST 2020 WARN: Establishing SSL connection without server's 
identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ 
and 5.7.6+ requirements SSL connection must be established by default if 
explicit option isn't set. For compliance with existing applications not using 
SSL the verifyServerCertificate property is set to 'false'. You need either to 
explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide 
truststore for server certificate verification.
 
Wed Jun 10 16:27:09 CST 2020 WARN: Establishing SSL connection without server's 
identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ 
and 5.7.6+ requirements SSL connection must be established by default if 
explicit option isn't set. For compliance with existing applications not using 
SSL the verifyServerCertificate property is set to 'false'. You need either to 
explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide 
truststore for server certificate verification.
 
Wed Jun 10 16:27:09 CST 2020 WARN: Establishing SSL connection without server's 
identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ 
and 5.7.6+ requirements SSL connection must be established by default if 
explicit option isn't set. For compliance with existing applications not using 
SSL the verifyServerCertificate property is set to 'false'. You need either to 
explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide 
truststore for server certificate verification.
 
Wed Jun 10 16:27:09 CST 2020 WARN: Establishing SSL connection without server's 
identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ 
and 5.7.6+ requirements SSL connection must be established by default if 
explicit option isn't set. For compliance with existing applications not using 
SSL the verifyServerCertificate property is set to 'false'. You need either to 
explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide 
truststore for server certificate verification.
 
Wed Jun 10 16:27:09 CST 2020 WARN: Establishing SSL connection without server's 
identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ 
and 5.7.6+ requirements SSL connection must be established by default if 
explicit option isn't set. For compliance with existing applications not using 
SSL the verifyServerCertificate property is set to 'false'. You need either to 
explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide 
truststore for server certificate verification.
 
Wed Jun 10 16:27:09 CST 2020 WARN: Establishing SSL connection without server's 
identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ 
and 5.7.6+ requirements SSL connection must be established by default if 
explicit option isn't set. For compliance with existing applications not using 
SSL the verifyServerCertificate property is set to 'false'. You need either to 
explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide 
truststore for server certificate verification.
 
Wed Jun 10 16:27:09 CST 2020 WARN: Establishing SSL connection without server's 
identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ 
and 5.7.6+ requirements SSL connection must be established by default if 
explicit option isn't set. For compliance with existing applications not using 
SSL the verifyServerCertificate property is set to 'false'. You need either to 
explicitly disable SSL by setting 

????: ?????? ????flinksql between????

2020-06-10 Thread wangweigu...@stevegame.cn

  ??valuemysqlinst


 
 ??
?? 2020-06-10 15:25
 user-zh
?? ?? flinksql between
flink1.10.0 ??  

tnv.registerDataStream("sqlStream",mysqlinst,'start_ip,'end_ip,'area_id,'unit_id,'device_id)
  
tnv.registerDataStream("OMstream",value,'original_network_id,'asset_id,'types,'d_ip,'d_port,'s_ip,'s_port,'devip,'url,'common_des,'operation_des,'raw_log,'severity,'happen_time,'create_time,'s_ip_num,
 
'd_ip_num,'method,'asset_area_id,'device_id,'s_mac,'d_mac,'scope,'dcope,'s_asset_id,'d_asset_id,'asset_unit_id,'area_id,'unit_id,'enterprise_id)
  tnv.registerFunction("ip_to_num",IPtoNum)
 
?? 
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more 
info.
Exception in thread "main" org.apache.flink.table.api.ValidationException: Too 
many fields referenced from an atomic type.
at 
org.apache.flink.table.typeutils.FieldInfoUtils.extractFieldInfoFromAtomicType(FieldInfoUtils.java:388)
at 
org.apache.flink.table.typeutils.FieldInfoUtils.extractFieldInformation(FieldInfoUtils.java:259)
at 
org.apache.flink.table.typeutils.FieldInfoUtils.getFieldsInfo(FieldInfoUtils.java:227)
at 
org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$$anonfun$1.apply(StreamTableEnvironmentImpl.scala:237)
at 
org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$$anonfun$1.apply(StreamTableEnvironmentImpl.scala:236)
at scala.Option.map(Option.scala:146)
at 
org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.asQueryOperation(StreamTableEnvironmentImpl.scala:236)
at 
org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.fromDataStream(StreamTableEnvironmentImpl.scala:81)
at 
org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.registerDataStream(StreamTableEnvironmentImpl.scala:94)
at 
com.jwell56.networksyslog.jobs.jobsource.MysqlLogToES$.main(MysqlLogToES.scala:77)
at 
com.jwell56.networksyslog.jobs.jobsource.MysqlLogToES.main(MysqlLogToES.scala)
 
 
 
 
 
----
??:"Leonard Xu"https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/sql/queries.html#joins
 
<https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/sql/queries.html#joins;
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/connect.html#jdbc-connector
 
<https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/connect.html#jdbc-connector;
 
 ?? 2020??6??1010:43 <932460...@qq.com ??
 
 hi, ?? ??joinmysql 
sourcemysql
 
 
 
 
 ------nbsp;????nbsp;--
 
??:nbsp;"wangweigu...@stevegame.cn"

????: ?????? ????flinksql between????

2020-06-09 Thread wangweigu...@stevegame.cn

  1.10 useBlinkPlanneruseOldPlanner
  
??
  Exception in thread "main" org.apache.flink.table.api.TableException: Cannot 
generate a valid execution plan for the given query: 

LogicalProject(num=[$0])
  LogicalJoin(condition=[AND(>($0, $1), <($0, $2))], joinType=[inner])
FlinkLogicalDataStreamScan(id=[1], fields=[num])
FlinkLogicalDataStreamScan(id=[2], fields=[startNum, endNum])

This exception indicates that the query uses an unsupported SQL feature.




 
 ??
?? 2020-06-09 17:41
 user-zh
?? ?? flinksql between
hi
1 flink1.9.0
2 oldplanner


Re: Re:延迟事件处理

2020-06-09 Thread wangweigu...@stevegame.cn

觉得对于下线产生的历史数据,用批处理应该更好一点,可以避免数据量过大造成的问题!


 
发件人: wangxiangyan
发送时间: 2020-06-09 17:26
收件人: user-zh
主题: Re:延迟事件处理
这是一个需要实时展示统计指标的系统,数据来源于检测器,检测器安装在客户那边,可能有下线的状态,或者数据延迟到达,不确定下线的时间,某个检测器下线之后在第二天上线会有一批昨天的数据,会发生延迟的数据处理


--Original--
From: "1048262223"<1048262...@qq.com;
Date: Tue, Jun 9, 2020 05:14 PM
To: "user-zh"

Re: Re: flink1.10 on yarn 问题

2020-06-09 Thread wangweigu...@stevegame.cn
a:373)
> ... 7 more
> Caused by: java.net.ConnectException: Connection refused
> at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
> at
> org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206)
> at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:530)
> at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:494)
> at org.apache.hadoop.ipc.Client$Connection.setupConnection(Client.java:609)
> at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:707)
> at org.apache.hadoop.ipc.Client$Connection.access$2800(Client.java:370)
> at org.apache.hadoop.ipc.Client.getConnection(Client.java:1523)
> at org.apache.hadoop.ipc.Client.call(Client.java:1440)
> ... 35 more
>
>
>
> -- 原始邮件 --
> *发件人:* "wangweigu...@stevegame.cn";
> *发送时间:* 2020年5月29日(星期五) 下午2:49
> *收件人:* "user-zh";
> *主题:* 回复: Re: flink1.10 on yarn 问题
>
>
> 这个报错:>>> Caused by: org.apache.flink.runtime.JobException: Recovery is
> suppressed
> >>> by NoRestartBackoffTimeStrategy
>
> 应该是没有读取到flink conf下面的flink-conf.yaml配置文件,里面有个task失败重启配置参数!
>
>
>
>
> 发件人: air23
> 发送时间: 2020-05-29 14:34
> 收件人: user-zh
> 主题: Re:Re: flink1.10 on yarn 问题
> 代码就是flink自带的例子。
>
> public class WordCountStreamingByJava {
> public static void main(String[] args) throws Exception {
>
> // 创建执行环境
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> // 设置socket数据源
> DataStreamSource source = env.socketTextStream("zongteng75", 9001,
> "\n");
>
> // 转化处理数据
> DataStream dataStream = source.flatMap(new
> FlatMapFunction() {
> @Override
> public void flatMap(String line, Collector collector)
> throws Exception {
>
> System.out.println(line);
> for (String word : line.split(" ")) {
> collector.collect(new WordWithCount(word, 1));
> }
> }
> }).keyBy("word")//以key分组统计
> .timeWindow(Time.seconds(2),Time.seconds(2))//设置一个窗口函数,模拟数据流动
> .sum("count");//计算时间窗口内的词语个数
>
> // 输出数据到目的端
> dataStream.print();
>
> // 执行任务操作
> env.execute("Flink Streaming Word Count By Java");
>
> }
>
>
>
>
> 我现在加了flink环境变量 这个例子 可以过了。就很奇怪
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-05-29 14:22:39,"tison"  写道:
> >然后你 execute 前后的代码片段甚至整个 main 如果可以的话通过 gist 贴一下(x)
> >
> >Best,
> >tison.
> >
> >
> >tison  于2020年5月29日周五 下午2:21写道:
> >
> >> 这个问题好诡异啊,一般来说编译会在 env.execute
> >> 的时候拦截,不应该真的调度起来才对。你能详细描述一下你提交作业的方法还有这个错误报在哪里吗(client?cluster?)?
> >>
> >> Best,
> >> tison.
> >>
> >>
> >> air23  于2020年5月29日周五 下午1:38写道:
> >>
> >>> cdh运行flink1.10 on cdh yarn 报错如下。 用1.7.2版本就没有问题
> >>> flink-shaded-hadoop-2-uber-2.6.5-10.0.jar 也加了
> >>> hadoop环境变量 export HADOOP_CONF_DIR=/etc/hadoop/conf
> >>> 求解答
> >>>
> >>>
> >>>
> >>>
> >>>
> >>>
> >>>
> >>> org.apache.flink.client.program.ProgramInvocationException: The main
> >>> method caused an error:
> >>> org.apache.flink.client.program.ProgramInvocationException: Job failed
> >>> (JobID: e358699c1be6be1472078771e1fd027f)
> >>>
> >>> at
> >>>
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
> >>>
> >>> at
> >>>
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
> >>>
> >>> at
> >>>
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
> >>>
> >>> at
> >>>
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:662)
> >>>
> >>> at
> >>> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
> >>>
> >>> at
> >>>
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:893)
> >>>
> >>> at
> >>>
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:966)
> >>>
> >>> at java.security.AccessController.doPrivileged(Native Method)
> >>>
> >>> at javax.security.auth.Subject.doAs(Subject.j

????: flink-1.10 ????hdfs????????????????????????

2020-06-02 Thread wangweigu...@stevegame.cn

hdfs-site.xml ?? 
core-site.xml??IDEA??resourcesconfnamespace


 
 kcz
?? 2020-06-02 19:20
 user-zh
?? flink-1.10 hdfs
??
String path = "hdfs://HACluster/user/flink/test-1/2020-05-29--15/";
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
FileInputFormat fileInputFormat = new TextInputFormat(new Path(path));
fileInputFormat.setNestedFileEnumeration(true);
env.readFile(fileInputFormat, path).print();
env.execute();hdfs??/user/flink/test-1/2020-05-29--15/.part-0-0.inprogress.6c12fe72-5602-4458-b29f-c8c8b4a7b73b(??)/user/flink/test-1/2020-05-29--15/.part-1-0.inprogress.34b1d5ff-cf0d-4209-b409-21920b12327dflink??


回复: Re:提交flink代码 无法创建taskmanager flink web界面一致停留在created状态 ,日志报错如下

2020-05-31 Thread wangweigu...@stevegame.cn

这个邮件好像图片都看不到啊,你们能看到不?


 
发件人: 程龙
发送时间: 2020-05-30 19:20
收件人: user-zh
主题: Re:re:提交flink代码 无法创建taskmanager flink web界面一致停留在created状态 ,日志报错如下


是用代码提交的jobmanager 是可以加载的 就是启动taskmanager 这个目录就没有创建, 界面如下 ,错误日志就是我下面贴出来的那个




在 2020-05-30 19:16:57,"462329521" <462329...@qq.com> 写道:
>你的提交命令是什么呢看样子是加载不到配置文件
>
>
>-- 原始邮件 --
>发件人: "程龙"<13162790...@163.com;
>发件时间: 2020-05-30 19:13
>收件人: "user-zh"主题: 提交flink代码 无法创建taskmanager flink web界面一致停留在created状态 ,日志报错如下
>
>
>
>2020-05-30 19:07:31,418 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner 
> - 
>
> 2020-05-30 19:07:31,418 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner
>  - Registered UNIX signal handlers for [TERM, HUP, INT] 
>2020-05-30 19:07:31,420 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner 
> - Current working Directory: 
>/tmp/hadoop-bjhl/nm-local-dir/usercache/bjhl/appcache/application_1590820026922_0020/container_1590820026922_0020_01_08
> 2020-05-30 19:07:31,427 ERROR org.apache.flink.yarn.YarnTaskExecutorRunner
>  - YARN TaskManager initialization failed. 
>org.apache.flink.configuration.IllegalConfigurationException: The Flink config 
>file 
>'/tmp/hadoop-bjhl/nm-local-dir/usercache/bjhl/appcache/application_1590820026922_0020/container_1590820026922_0020_01_08/flink-conf.yaml'
> 
>(/tmp/hadoop-bjhl/nm-local-dir/usercache/bjhl/appcache/application_1590820026922_0020/container_1590820026922_0020_01_08)
> does not exist. at 
>org.apache.flink.configuration.GlobalConfiguration.loadConfiguration(GlobalConfiguration.java:112)
> at 
>org.apache.flink.yarn.YarnTaskExecutorRunner.run(YarnTaskExecutorRunner.java:114)
> at 
>org.apache.flink.yarn.YarnTaskExecutorRunner.main(YarnTaskExecutorRunner.java:82)




 


????: flink ????hadoop????????

2020-05-29 Thread wangweigu...@stevegame.cn
   cdh??hdfs8020?? 
hdfs://TestHACluster/user/flink/test   
hdfs://TestHACluster:8020/user/flink/test
   
flinkTestHAClusterNamespace??hdfsHA??hive-site.xml??hdfs-site.xml


 
 ??
?? 2020-05-29 15:06
 user-zh
?? flink hadoop
 ??
hadoopTestHACluster??apipath 
hdfs://TestHACluster/user/flink/test
??TestHACluster:8020?? 
??hiveTestHACluster:8020
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
DataStream

Re: Re: flink-sql watermark问题

2020-05-29 Thread wangweigu...@stevegame.cn

你的代码:w_ts as TO_TIMESTAMP(FROM_UNIXTIME(rowtime/1000),'-MM-dd HH:mm:ss')
这里里面的 FROM_UNIXTIME 函数就是接受BIGINT参数,返回 VARCHAR类型的日期值,默认日期格式:-MM-dd 
HH:mm:ss
然后通过TO_TIMESTAMP函数转换成TIMESTAMP类型,是timestamp(3)类型,这个只能用blink planner

参考阿里云网址:https://help.aliyun.com/knowledge_list_page/62717/1.html?spm=a2c4g.11186631.2.4.41933356drYMGX
 
里面有flink 日期函数接受!



 
发件人: Benchao Li
发送时间: 2020-05-29 10:35
收件人: user-zh
主题: Re: Re: flink-sql watermark问题
Flink支持把Timestamp(3)这种类型声明声明为事件时间列,并且为它生成watermark。
你上面提到的"-MM-dd'T'HH:mm:ss.SSS'Z'",并不是一种数据类型,它只是Timestamp的一种string表达形式,这个主要是在json
format里面把一个字符串解析为timestamp类型的时候需要的一种格式。
 
所以如果你有其他类型的字段,比如varchar、long、int等等,都可以通过内置函数或者udf将其转成timestamp(3)的类型,再在此基础上做watermark生成。
 
guaishushu1...@163.com  于2020年5月29日周五 上午10:25写道:
 
> 而且 flink不是只支持这种"-MM-dd'T'HH:mm:ss.SSS'Z'" 类型解析为watermark吗,就对这样有点疑惑
>
>
>
> guaishushu1...@163.com
>
> 发件人: guaishushu1...@163.com
> 发送时间: 2020-05-29 10:20
> 收件人: Benchao Li
> 抄送: user-zh
> 主题: Re: Re: flink-sql watermark问题
>
> 就是我是long类型的时间戳,但是用TO_TIMESTAMP转换成'-MM-dd HH:mm:ss' 之后依然可以生成watermark。
>
>
> guaishushu1...@163.com
>
> 发件人: Benchao Li
> 发送时间: 2020-05-28 17:00
> 收件人: user-zh
> 主题: Re: flink-sql watermark问题
> Hi,
>
> 没太看明白你的问题是什么。目前的确是只支持Timestmap(3)作为事件时间列。
> 之所以还不支持long作为事件时间列,主要考虑的是时区问题。但是这个社区也在考虑,可以参考[1]
>
> [1] https://issues.apache.org/jira/browse/FLINK-16938
>
> guaishushu1...@163.com  于2020年5月28日周四 下午4:22写道:
>
> > flink-1.10 sql只支持 timestamp(3) 类型字段生成watermark
> > 但是long这样转换后也可以生成watermark很奇怪?
> > CREATE TABLE user_log (
> > response_size int,
> > rowtime BIGINT,
> > w_ts as TO_TIMESTAMP(FROM_UNIXTIME(rowtime/1000),'-MM-dd HH:mm:ss'),
> > WATERMARK FOR w_ts AS w_ts - INTERVAL '5' SECOND --5秒的延迟
> > )
> >
> >
> >
> > guaishushu1...@163.com
> >
>
>
> --
>
> Best,
> Benchao Li
>
 
 
-- 
 
Best,
Benchao Li


回复: Re: flink1.10 on yarn 问题

2020-05-29 Thread wangweigu...@stevegame.cn

这个报错:>>> Caused by: org.apache.flink.runtime.JobException: Recovery is 
suppressed
>>> by NoRestartBackoffTimeStrategy

应该是没有读取到flink conf下面的flink-conf.yaml配置文件,里面有个task失败重启配置参数!



 
发件人: air23
发送时间: 2020-05-29 14:34
收件人: user-zh
主题: Re:Re: flink1.10 on yarn 问题
代码就是flink自带的例子。
 
public class WordCountStreamingByJava {
public static void main(String[] args) throws Exception {
 
// 创建执行环境
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
// 设置socket数据源
DataStreamSource source = env.socketTextStream("zongteng75", 9001, 
"\n");
 
// 转化处理数据
DataStream dataStream = source.flatMap(new 
FlatMapFunction() {
@Override
public void flatMap(String line, Collector collector) throws 
Exception {
 
System.out.println(line);
for (String word : line.split(" ")) {
collector.collect(new WordWithCount(word, 1));
}
}
}).keyBy("word")//以key分组统计
.timeWindow(Time.seconds(2),Time.seconds(2))//设置一个窗口函数,模拟数据流动
.sum("count");//计算时间窗口内的词语个数
 
// 输出数据到目的端
dataStream.print();
 
// 执行任务操作
env.execute("Flink Streaming Word Count By Java");
 
}
 
 
 
 
我现在加了flink环境变量 这个例子 可以过了。就很奇怪 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
在 2020-05-29 14:22:39,"tison"  写道:
>然后你 execute 前后的代码片段甚至整个 main 如果可以的话通过 gist 贴一下(x)
>
>Best,
>tison.
>
>
>tison  于2020年5月29日周五 下午2:21写道:
>
>> 这个问题好诡异啊,一般来说编译会在 env.execute
>> 的时候拦截,不应该真的调度起来才对。你能详细描述一下你提交作业的方法还有这个错误报在哪里吗(client?cluster?)?
>>
>> Best,
>> tison.
>>
>>
>> air23  于2020年5月29日周五 下午1:38写道:
>>
>>> cdh运行flink1.10 on cdh yarn 报错如下。 用1.7.2版本就没有问题
>>> flink-shaded-hadoop-2-uber-2.6.5-10.0.jar 也加了
>>> hadoop环境变量 export HADOOP_CONF_DIR=/etc/hadoop/conf
>>> 求解答
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> org.apache.flink.client.program.ProgramInvocationException: The main
>>> method caused an error:
>>> org.apache.flink.client.program.ProgramInvocationException: Job failed
>>> (JobID: e358699c1be6be1472078771e1fd027f)
>>>
>>> at
>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
>>>
>>> at
>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>>>
>>> at
>>> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
>>>
>>> at
>>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:662)
>>>
>>> at
>>> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
>>>
>>> at
>>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:893)
>>>
>>> at
>>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:966)
>>>
>>> at java.security.AccessController.doPrivileged(Native Method)
>>>
>>> at javax.security.auth.Subject.doAs(Subject.java:422)
>>>
>>> at
>>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
>>>
>>> at
>>> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>>>
>>> at
>>> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:966)
>>>
>>> Caused by: java.util.concurrent.ExecutionException:
>>> org.apache.flink.client.program.ProgramInvocationException: Job failed
>>> (JobID: e358699c1be6be1472078771e1fd027f)
>>>
>>> at
>>> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>>>
>>> at
>>> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>>>
>>> at
>>> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:83)
>>>
>>> at
>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
>>>
>>> at
>>> tt.WordCountStreamingByJava.main(WordCountStreamingByJava.java:36)
>>>
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>
>>> at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>
>>> at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>
>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>
>>> at
>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
>>>
>>> ... 11 more
>>>
>>> Caused by: org.apache.flink.client.program.ProgramInvocationException:
>>> Job failed (JobID: e358699c1be6be1472078771e1fd027f)
>>>
>>> at
>>> org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112)
>>>
>>> at
>>> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
>>>
>>> at
>>> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
>>>
>>> at
>>> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>>>
>>> at
>>> 

Re: 回复:flink 1.9 1.10 on yarn在cdh上怎么搭建一个客户端

2020-05-27 Thread wangweigu...@stevegame.cn

确实,你只要配置好CDH的HADOOP_CONF环境变量,同时下载开源的Hadoop版本(和CDH版本相同)放到flink lib下,就可以访问CDH 
yarn,提交作业!

目前我这边是CDH 5.16.1,Flink 1.10,提交Flink on yarn是没问题,任务运行也没问题,还可以使用Flink on hive!

flink-shaded-hadoop-2-uber-2.6.5-10.0.jar



 
发件人: 111
发送时间: 2020-05-28 09:13
收件人: user-zh@flink.apache.org
主题: 回复:flink 1.9 1.10 on yarn在cdh上怎么搭建一个客户端
Hi,
一般只要你有yarn环境,在任意一台机器上下载flink安装包,配一下HADOOP_CONF环境变量就可以使用。
如果是session模式:可以使用Yarn-session.sh启动yarn session集群,然后通过flink run xxx 提交程序。
如果是per job模式:直接使用flink run即可。
best,
Xinghalo


Re: Re: JDBCLookupFunction被缓存导致数据的不及时性

2020-04-14 Thread wangweigu...@stevegame.cn

参考下这篇文章,里面有好多维度关联场景案例讲解!
https://ververica.cn/developers/flink-datastream-associated-dimension-table-practice/
 




 
发件人: tingli ke
发送时间: 2020-04-15 11:22
收件人: user-zh
主题: Re: JDBCLookupFunction被缓存导致数据的不及时性
是否有其他的方式来对mysql维表数据进行实时join
 
 
13122260...@163.com <13122260...@163.com> 于2020年4月15日周三 上午11:08写道:
 
> 有个setCacheMaxSize(1000),可以改成 -1 表示不使用cache
> org.apache.flink.api.java.io.jdbc.JDBCLookupFunction 这个方法有解释
> The cacheMaxSize is -1 means not use cache
>
>
>
> 13122260...@163.com
>
> 发件人: tingli ke
> 发送时间: 2020-04-15 10:55
> 收件人: user-zh
> 主题: JDBCLookupFunction被缓存导致数据的不及时性
> Hi,
>
> 流表通过JDBCLookupFunction来对mysql的维表数据进行实时join,但是JDBCLookupFunction会对数据进行缓存,导致mysql的维表数据被更新,但是flink还是老的数据,(考虑性能需要被缓存)
> 是否有其他的方式来对mysql维表数据进行实时join
>


Re: Re: Flink 1.10读取kafka数据,打包jar,提交任务到集群运行测试,遇到java.lang.NoClassDefFoundError: org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase问题,求救大神帮忙解答,谢谢!

2020-04-14 Thread wangweigu...@stevegame.cn

 昨天测试了下,除了需要添加 flink-connector-kafka_2.11-1.10.0.jar 
这个外,还需要flink-connector-kafka-base_2.11-1.10.0.jar,感觉Flink在添加依赖jar做的不是很好,添加也不够灵活!

发件人: zhisheng
发送时间: 2020-04-14 15:24
收件人: user-zh
主题: Re: Re: Flink 
1.10读取kafka数据,打包jar,提交任务到集群运行测试,遇到java.lang.NoClassDefFoundError: 
org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase问题,求救大神帮忙解答,谢谢!
应该加了 flink-connector-kafka_2.11-1.10.0.jar 这个就行
 
wangweigu...@stevegame.cn  于2020年4月13日周一
下午3:09写道:
 
>
> 感谢flink道友解答,谢谢!
>
>
> 目前是通过maven来开发flink程序,只是编译打包到集群运行的时候缺少kafka依赖包,flink-connector-kafka_2.11-1.10.0.jar,flink-connector-kafka-base_2.11-1.10.0.jar,kafka-clients-1.0.1-kafka-3.1.1.jar
> 这些添加到lib后,程序运行成功!
>
> 发件人: 刘宇宝
> 发送时间: 2020-04-13 14:59
> 收件人: user-zh@flink.apache.org
> 主题: Re: Flink
> 1.10读取kafka数据,打包jar,提交任务到集群运行测试,遇到java.lang.NoClassDefFoundError:
> org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase问题,求救大神帮忙解答,谢谢!
> 用官方项目模板起步,
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/projectsetup/java_api_quickstart.html
>
> 不要往 flink 里头加 jar 包,在你项目的 pom.xml 里加:
>
>   
> org.apache.flink
>
> flink-connector-kafka_${scala.binary.version}
> ${flink.version}
> 
>
> From: "wangweigu...@stevegame.cn" 
> Reply-To: "user-zh@flink.apache.org" 
> Date: Monday, April 13, 2020 at 2:32 PM
> To: user-zh 
> Subject: Flink
> 1.10读取kafka数据,打包jar,提交任务到集群运行测试,遇到java.lang.NoClassDefFoundError:
> org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase问题,求救大神帮忙解答,谢谢!
>
>
> 你好:
>
> 我在用Flink 1.10读取kafka数据,本地IDEA环境执行没有问题,将代码编译打包(不是fat
> jar)到集群上运行测试,执行提示:java.lang.NoClassDefFoundError:
> org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase问题。
> 我在Flink 1.10集群的每个节点下的
> /lib下都添加了kafka依赖包:flink-connector-kafka_2.11-1.10.0.jar
> 我启动的命令:
> 我先启动了一个Yarn session:
> yarn-session.sh -yd -jm 2048m -tm 2048m -s 10
> 然后在session提交任务测试
> flink run -d -p 2 -m yarn-cluster -c
> com.sdf.flink.streaming.BroadcastOrderJoinGoodsName -yid
> application_1585277813790_0006 ./flink-project_1.10.0-1.0.jar
> 启动就报如下错误:
>[cid:_Foxmail.1@bf61ef0c-2f52-034d-bba5-a41cbf6b4faf]
>
> /lib下的依赖包:
> [cid:_Foxmail.1@0be9c7f1-1b24-8e3e-ea4f-d47b95d9ffaf]
>
> 代码片段:
> [cid:_Foxmail.1@76174c8c-512d-b948-71c9-359c474bf11e]
>
> 就是简单的读取数据,输出测试!
>
> ____
> [
> https://exmail.qq.com/cgi-bin/viewfile?type=signature=ZX1328-4PdHqpEhbWjLSGE47md0b7k=688208663
> ]
>
>
>
>
>
>
> 史蒂夫软件(深圳)有限公司
> 技术部   王卫光
> wangweigu...@stevegame.cn<mailto:wangweigu...@stevegame.cn>
> 地址/Add:深圳南山科区科技园高新南十二道康佳研发大厦A座
> 手机/Mob:13128970998
> http://www.stevengame.com/<http://www.vcanbio.com/>
>


Re: Re: 关于flink 提交job参数不生效的问题

2020-04-14 Thread wangweigu...@stevegame.cn

  应该是你设置的 -ytm 和 -yjm内存大小比yarn container最小容器内存都小吧!
 yarn最小容器内存的参数: yarn.scheduler.minimum-allocation-mb
容器内存增量: yarn.scheduler.increment-allocation-mb

发件人: guanyq
发送时间: 2020-04-14 14:05
收件人: user-zh
主题: Re:Re: 关于flink 提交job参数不生效的问题
./bin/flink run -m yarn-cluster \-ynm TestDataProcess \-ytm 666 
\-yjm 666 \-c 
com.data.processing.unconditionalacceptance.TestDataProcess 
\./tasks/UnconditionalAcceptanceDataProcess.jar \--group.id Test001 
\--checkpoint.interval 5000
在 2020-04-14 14:00:59,"Xintong Song"  写道:
>你邮件里的图片没有显示出来。
>建议把完整的启动命令贴一下。
>
>Thank you~
>
>Xintong Song
>
>
>
>On Tue, Apr 14, 2020 at 1:11 PM guanyq  wrote:
>
>> flink 提交jar包是 指定-ytm不起作用。想知道什么原因?
>>
>>
>>
>>


Re: Re: Flink 1.10读取kafka数据,打包jar,提交任务到集群运行测试,遇到java.lang.NoClassDefFoundError: org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase问题,求救大神帮忙解答,谢谢!

2020-04-13 Thread wangweigu...@stevegame.cn

感谢flink道友解答,谢谢!

目前是通过maven来开发flink程序,只是编译打包到集群运行的时候缺少kafka依赖包,flink-connector-kafka_2.11-1.10.0.jar,flink-connector-kafka-base_2.11-1.10.0.jar,kafka-clients-1.0.1-kafka-3.1.1.jar
这些添加到lib后,程序运行成功!

发件人: 刘宇宝
发送时间: 2020-04-13 14:59
收件人: user-zh@flink.apache.org
主题: Re: Flink 1.10读取kafka数据,打包jar,提交任务到集群运行测试,遇到java.lang.NoClassDefFoundError: 
org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase问题,求救大神帮忙解答,谢谢!
用官方项目模板起步,https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/projectsetup/java_api_quickstart.html
 
不要往 flink 里头加 jar 包,在你项目的 pom.xml 里加:
 
  
org.apache.flink

flink-connector-kafka_${scala.binary.version}
${flink.version}

 
From: "wangweigu...@stevegame.cn" 
Reply-To: "user-zh@flink.apache.org" 
Date: Monday, April 13, 2020 at 2:32 PM
To: user-zh 
Subject: Flink 
1.10读取kafka数据,打包jar,提交任务到集群运行测试,遇到java.lang.NoClassDefFoundError: 
org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase问题,求救大神帮忙解答,谢谢!
 
 
你好:
 
我在用Flink 1.10读取kafka数据,本地IDEA环境执行没有问题,将代码编译打包(不是fat 
jar)到集群上运行测试,执行提示:java.lang.NoClassDefFoundError: 
org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase问题。
我在Flink 1.10集群的每个节点下的 
/lib下都添加了kafka依赖包:flink-connector-kafka_2.11-1.10.0.jar
我启动的命令:
我先启动了一个Yarn session:
yarn-session.sh -yd -jm 2048m -tm 2048m -s 10
然后在session提交任务测试
flink run -d -p 2 -m yarn-cluster -c 
com.sdf.flink.streaming.BroadcastOrderJoinGoodsName -yid 
application_1585277813790_0006 ./flink-project_1.10.0-1.0.jar
启动就报如下错误:
   [cid:_Foxmail.1@bf61ef0c-2f52-034d-bba5-a41cbf6b4faf]
 
/lib下的依赖包:
[cid:_Foxmail.1@0be9c7f1-1b24-8e3e-ea4f-d47b95d9ffaf]
 
代码片段:
[cid:_Foxmail.1@76174c8c-512d-b948-71c9-359c474bf11e]
 
就是简单的读取数据,输出测试!
 

[https://exmail.qq.com/cgi-bin/viewfile?type=signature=ZX1328-4PdHqpEhbWjLSGE47md0b7k=688208663]
 
 
 
 
 
 
史蒂夫软件(深圳)有限公司
技术部   王卫光
wangweigu...@stevegame.cn<mailto:wangweigu...@stevegame.cn>
地址/Add:深圳南山科区科技园高新南十二道康佳研发大厦A座
手机/Mob:13128970998
http://www.stevengame.com/<http://www.vcanbio.com/>


回复: 回复:Flink 1.10读取kafka数据,打包jar,提交任务到集群运行测试,遇到java.lang.NoClassDefFoundError: org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase问题,求救大神帮忙解答,谢谢!

2020-04-13 Thread wangweigu...@stevegame.cn

感谢各位flink道友帮忙解答

  检查了下,/lib 
中缺少了:flink-connector-kafka_2.11-1.10.0.jar,flink-connector-kafka-base_2.11-1.10.0.jar,kafka-clients-1.0.1-kafka-3.1.1.jar
 (根据自己的集群kafka版本来选择)这些jar,添加后程序运行成功!

发件人: 1035262083
发送时间: 2020-04-13 14:33
收件人: user-zh; user-zh
主题: 回复:Flink 1.10读取kafka数据,打包jar,提交任务到集群运行测试,遇到java.lang.NoClassDefFoundError: 
org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase问题,求救大神帮忙解答,谢谢!
你应该还缺少flink-connector-elasticsearch-base_2.11-1.10.0.jar
 
 
 
--原始邮件--
发件人: "wangweigu...@stevegame.cn"http://www.stevengame.com/


Flink 1.10读取kafka数据,打包jar,提交任务到集群运行测试,遇到java.lang.NoClassDefFoundError: org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase问题,求救大神帮忙解答,谢谢!

2020-04-13 Thread wangweigu...@stevegame.cn

你好:

我在用Flink 1.10读取kafka数据,本地IDEA环境执行没有问题,将代码编译打包(不是fat 
jar)到集群上运行测试,执行提示:java.lang.NoClassDefFoundError: 
org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase问题。
我在Flink 1.10集群的每个节点下的 
/lib下都添加了kafka依赖包:flink-connector-kafka_2.11-1.10.0.jar
我启动的命令:
我先启动了一个Yarn session:
yarn-session.sh -yd -jm 2048m -tm 2048m -s 10
然后在session提交任务测试
flink run -d -p 2 -m yarn-cluster -c 
com.sdf.flink.streaming.BroadcastOrderJoinGoodsName -yid 
application_1585277813790_0006 ./flink-project_1.10.0-1.0.jar
启动就报如下错误:
   

/lib下的依赖包:

代码片段:

就是简单的读取数据,输出测试!




  






史蒂夫软件(深圳)有限公司
技术部   王卫光
wangweigu...@stevegame.cn 
地址/Add:深圳南山科区科技园高新南十二道康佳研发大厦A座
手机/Mob:13128970998
http://www.stevengame.com/