Re: Re: jobGroph如何以pre-job的方式提交到yarn运行

2020-03-06 文章 tison
OK 你先说一下你的 Flink 版本是啥,我感觉这个是很久以前的版本,差不多 1.7
的样子,然后没有保密问题的话你的代码文件发一下,然后完整报错堆栈也发一下。

这个可能是你初始化 YarnClusterDescriptor 的时候有问题,你提供的都是很残缺的片段,没法猜测到底是啥原因。

一般来说 Flink 现在的 Client 实现并不是很好,直接使用 CLI 是不会有太多问题的,如果直接依赖 ClusterDescriptor
这些抽象里面有一些潜规则,你自己看的话可能得对着 Flink 的源码使用点逐步调试排查。

Best,
tison.


nicygan  于2020年3月7日周六 下午3:16写道:

> tison,你好运行到这里时,报空指针
> Caused by: java.lang.NullPointerException
> at
> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.getNodeReports(YarnClientImpl.java:506)
>
> getNodeReports方法中:
> GetClusterNodesResponse response = rmClient.getClusterNodes(request);
> 这句的rmClient为null值。
>
>
>
> 我看YarnClientImpl中,有个start()可为rmClient赋值,但是,我加上此方法执行时又会报以下错误:
> Exception in thread "main"
> org.apache.hadoop.service.ServiceStateException:
> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl cannot enter state
> STARTED from state NOTINITED
> at
> org.apache.hadoop.service.ServiceStateModel.checkStateTransition(ServiceStateModel.java:129)
> at
> org.apache.hadoop.service.ServiceStateModel.enterState(ServiceStateModel.java:111)
> at
> org.apache.hadoop.service.AbstractService.start(AbstractService.java:190)
>
>
>
>
>
>
>
>
> 在 2020-03-07 11:15:10,"tison"  写道:
> >不成功的报错是啥?
> >
> >Best,
> >tison.
> >
> >
> >nicygan  于2020年3月7日周六 上午11:14写道:
> >
> >> dear all:
> >>
> >>
> 我用的flink版本1.9,现在我把flinksql语句生成了jobGroph,现在我不知道如何提交到yarn上,以pre-job方式运行。我尝试通过生成
> >> yarnClusterDescriptor提交代码,但不成功,代码大致如下,各位有没有成功示例求分享。
> >>
> >> ..
> >> JobGraph jobGraph = env.getStreamGraph("StreamSql").getJobGraph();
> >> ..
> >> ..
> >> yarnClusterDescriptor.deployJobCluster(
> >> clusterSpecification,
> >>   jobGraph, true);
> >>
> >>
>


flink elasticsearch sink ????????????????

2020-03-06 文章 ??????
 
elasticsearch flink elasticsearch sink 
exactly-onceflinkelasticsearch
 sink??
??uuId





Re:Re: jobGroph如何以pre-job的方式提交到yarn运行

2020-03-06 文章 nicygan
tison,你好运行到这里时,报空指针
Caused by: java.lang.NullPointerException
at 
org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.getNodeReports(YarnClientImpl.java:506)

getNodeReports方法中:
GetClusterNodesResponse response = rmClient.getClusterNodes(request);
这句的rmClient为null值。



我看YarnClientImpl中,有个start()可为rmClient赋值,但是,我加上此方法执行时又会报以下错误:
Exception in thread "main" org.apache.hadoop.service.ServiceStateException: 
org.apache.hadoop.yarn.client.api.impl.YarnClientImpl cannot enter state 
STARTED from state NOTINITED
at 
org.apache.hadoop.service.ServiceStateModel.checkStateTransition(ServiceStateModel.java:129)
at 
org.apache.hadoop.service.ServiceStateModel.enterState(ServiceStateModel.java:111)
at org.apache.hadoop.service.AbstractService.start(AbstractService.java:190)








在 2020-03-07 11:15:10,"tison"  写道:
>不成功的报错是啥?
>
>Best,
>tison.
>
>
>nicygan  于2020年3月7日周六 上午11:14写道:
>
>> dear all:
>>
>> 我用的flink版本1.9,现在我把flinksql语句生成了jobGroph,现在我不知道如何提交到yarn上,以pre-job方式运行。我尝试通过生成
>> yarnClusterDescriptor提交代码,但不成功,代码大致如下,各位有没有成功示例求分享。
>>
>> ..
>> JobGraph jobGraph = env.getStreamGraph("StreamSql").getJobGraph();
>> ..
>> ..
>> yarnClusterDescriptor.deployJobCluster(
>> clusterSpecification,
>>   jobGraph, true);
>>
>>


Re: jobGroph如何以pre-job的方式提交到yarn运行

2020-03-06 文章 tison
不成功的报错是啥?

Best,
tison.


nicygan  于2020年3月7日周六 上午11:14写道:

> dear all:
>
> 我用的flink版本1.9,现在我把flinksql语句生成了jobGroph,现在我不知道如何提交到yarn上,以pre-job方式运行。我尝试通过生成
> yarnClusterDescriptor提交代码,但不成功,代码大致如下,各位有没有成功示例求分享。
>
> ..
> JobGraph jobGraph = env.getStreamGraph("StreamSql").getJobGraph();
> ..
> ..
> yarnClusterDescriptor.deployJobCluster(
> clusterSpecification,
>   jobGraph, true);
>
>


jobGroph如何以pre-job的方式提交到yarn运行

2020-03-06 文章 nicygan
dear all:

我用的flink版本1.9,现在我把flinksql语句生成了jobGroph,现在我不知道如何提交到yarn上,以pre-job方式运行。我尝试通过生成
yarnClusterDescriptor提交代码,但不成功,代码大致如下,各位有没有成功示例求分享。

..
JobGraph jobGraph = env.getStreamGraph("StreamSql").getJobGraph();
..
..
yarnClusterDescriptor.deployJobCluster(
clusterSpecification,
  jobGraph, true);



回复: 如何通过Flink SQL注册Hbase源表

2020-03-06 文章 psyche19830...@163.com
已经解决了,感谢!



psyche19830...@163.com
 
发件人: psyche19830...@163.com
发送时间: 2020-03-06 17:44
收件人: user-zh
主题: 如何通过Flink SQL注册Hbase源表
各位好,
最近公司想用flink来做流式计算,在研究Flink SQL读写HBase的时候遇到一些问题,希望能从您们那里得到帮忙。
我在Hbase的默认命名空间里,创建了一个resume表,表结构如下:

我的Flink测试代码如下:
@Test
public void testReadFromHBase() throws Exception {
StreamExecutionEnvironment 
env=StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, 
settings);
//HBaseTableSource resume = new HBaseTableSource();
tableEnv.sqlUpdate("create table resume(\n" +
" binfo ROW<>,\n" + 
" edu ROW<>,  \n" +
" work ROW<>  \n" +
") with (" +
" 'connector.type' = 'hbase',  " +
" 'connector.version' = '1.4.3', " +
" 'connector.table-name' = 'resume'," +
" 'connector.zookeeper.quorum' = 
'flink01.flink.net:2181,flink02.flink:2181,flink03.flink:2181'," +
" 'connector.zookeeper.znode.parent' = '/hbase'" +
")");
Table table = tableEnv.sqlQuery("select * from resume");
DataStream> out = tableEnv.toRetractStream(table, 
Row.class);
out.print();
env.execute();
}
运行报下面的错误:
org.apache.flink.table.api.ValidationException: Could not map binfo column to 
the underlying physical type root
. No such field.

at 
org.apache.flink.table.utils.TypeMappingUtils.lambda$null$7(TypeMappingUtils.java:223)
at java.util.OptionalInt.orElseThrow(OptionalInt.java:189)






psyche19830...@163.com


如何通过Flink SQL注册Hbase源表

2020-03-06 文章 psyche19830...@163.com
各位好,
最近公司想用flink来做流式计算,在研究Flink SQL读写HBase的时候遇到一些问题,希望能从您们那里得到帮忙。
我在Hbase的默认命名空间里,创建了一个resume表,表结构如下:

我的Flink测试代码如下:
@Test
public void testReadFromHBase() throws Exception {
StreamExecutionEnvironment 
env=StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, 
settings);
//HBaseTableSource resume = new HBaseTableSource();
tableEnv.sqlUpdate("create table resume(\n" +
" binfo ROW<>,\n" + 
" edu ROW<>,  \n" +
" work ROW<>  \n" +
") with (" +
" 'connector.type' = 'hbase',  " +
" 'connector.version' = '1.4.3', " +
" 'connector.table-name' = 'resume'," +
" 'connector.zookeeper.quorum' = 
'flink01.flink.net:2181,flink02.flink:2181,flink03.flink:2181'," +
" 'connector.zookeeper.znode.parent' = '/hbase'" +
")");
Table table = tableEnv.sqlQuery("select * from resume");
DataStream> out = tableEnv.toRetractStream(table, 
Row.class);
out.print();
env.execute();
}
运行报下面的错误:
org.apache.flink.table.api.ValidationException: Could not map binfo column to 
the underlying physical type root
. No such field.

at 
org.apache.flink.table.utils.TypeMappingUtils.lambda$null$7(TypeMappingUtils.java:223)
at java.util.OptionalInt.orElseThrow(OptionalInt.java:189)






psyche19830...@163.com


回复:keyby后滚动窗口,watermark如何只触发所在的组,而不触发所有的组

2020-03-06 文章 17626017841
Hi,
Keyby是把相同的key分配到同一个窗口处理,哪些key分配到哪个窗口,跟你设置得的窗口并行度有关?如果想把某个key分到单独的一个窗口实例我觉得需要自定义partition.


Best,
Sun.Zhu




| |
17626017841
|
|
邮箱:17626017...@163.com
|

Signature is customized by Netease Mail Master

在2020年03月06日 17:20,Utopia 写道:
Hi,

Flink 的水印在同一个算子上都是一样的,所以每个 keyed stream 共享的是一个水印,不能分别触发器。

Best  regards
Utopia
在 2020年3月6日 +0800 17:10,小旋锋 ,写道:
> 大家好:
> 数据流经过 keyby 分组后,在分别进入滚动窗口:
> sourceDataStream .keyBy(id) 
> .window(TumblingEventTimeWindows.of(Time.seconds(10L))).reduce() 
> .print()
>
> 测试数据:id 从1~1500,每个id由两条数据,每条数据的 
> eventtime是一样的,最后一条是id为1,eventtime大了20秒用来触发窗口计算的数据
> 实验结果:最后一条用来触发的数据发出后,所有的窗口都计算输出了
>   
> 请问如何配置 最后一条id为1的数据只触发 id=1所在的组的窗口呢?
>
>
> 感谢


Re: keyby后滚动窗口,watermark如何只触发所在的组,而不触发所有的组

2020-03-06 文章 Utopia
Hi,

Flink 的水印在同一个算子上都是一样的,所以每个 keyed stream 共享的是一个水印,不能分别触发器。

Best  regards
Utopia
在 2020年3月6日 +0800 17:10,小旋锋 ,写道:
> 大家好:
> 数据流经过 keyby 分组后,在分别进入滚动窗口:
> sourceDataStream .keyBy(id) 
> .window(TumblingEventTimeWindows.of(Time.seconds(10L))).reduce() 
> .print()
>
> 测试数据:id 从1~1500,每个id由两条数据,每条数据的 
> eventtime是一样的,最后一条是id为1,eventtime大了20秒用来触发窗口计算的数据
> 实验结果:最后一条用来触发的数据发出后,所有的窗口都计算输出了
>   
> 请问如何配置 最后一条id为1的数据只触发 id=1所在的组的窗口呢?
>
>
> 感谢


keyby????????????watermark????????????????????????????????????

2020-03-06 文章 ??????

?? keyby 
sourceDataStream .keyBy(id) 
.window(TumblingEventTimeWindows.of(Time.seconds(10L))).reduce() .print()
   
??id ??1~1500??id?? 
eventtimeid??1??eventtime20

??
  
 id??1 
id=1??