Re: Re: jobGroph如何以pre-job的方式提交到yarn运行
OK 你先说一下你的 Flink 版本是啥,我感觉这个是很久以前的版本,差不多 1.7 的样子,然后没有保密问题的话你的代码文件发一下,然后完整报错堆栈也发一下。 这个可能是你初始化 YarnClusterDescriptor 的时候有问题,你提供的都是很残缺的片段,没法猜测到底是啥原因。 一般来说 Flink 现在的 Client 实现并不是很好,直接使用 CLI 是不会有太多问题的,如果直接依赖 ClusterDescriptor 这些抽象里面有一些潜规则,你自己看的话可能得对着 Flink 的源码使用点逐步调试排查。 Best, tison. nicygan 于2020年3月7日周六 下午3:16写道: > tison,你好运行到这里时,报空指针 > Caused by: java.lang.NullPointerException > at > org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.getNodeReports(YarnClientImpl.java:506) > > getNodeReports方法中: > GetClusterNodesResponse response = rmClient.getClusterNodes(request); > 这句的rmClient为null值。 > > > > 我看YarnClientImpl中,有个start()可为rmClient赋值,但是,我加上此方法执行时又会报以下错误: > Exception in thread "main" > org.apache.hadoop.service.ServiceStateException: > org.apache.hadoop.yarn.client.api.impl.YarnClientImpl cannot enter state > STARTED from state NOTINITED > at > org.apache.hadoop.service.ServiceStateModel.checkStateTransition(ServiceStateModel.java:129) > at > org.apache.hadoop.service.ServiceStateModel.enterState(ServiceStateModel.java:111) > at > org.apache.hadoop.service.AbstractService.start(AbstractService.java:190) > > > > > > > > > 在 2020-03-07 11:15:10,"tison" 写道: > >不成功的报错是啥? > > > >Best, > >tison. > > > > > >nicygan 于2020年3月7日周六 上午11:14写道: > > > >> dear all: > >> > >> > 我用的flink版本1.9,现在我把flinksql语句生成了jobGroph,现在我不知道如何提交到yarn上,以pre-job方式运行。我尝试通过生成 > >> yarnClusterDescriptor提交代码,但不成功,代码大致如下,各位有没有成功示例求分享。 > >> > >> .. > >> JobGraph jobGraph = env.getStreamGraph("StreamSql").getJobGraph(); > >> .. > >> .. > >> yarnClusterDescriptor.deployJobCluster( > >> clusterSpecification, > >> jobGraph, true); > >> > >> >
flink elasticsearch sink ????????????????
elasticsearch flink elasticsearch sink exactly-onceflinkelasticsearch sink?? ??uuId
Re:Re: jobGroph如何以pre-job的方式提交到yarn运行
tison,你好运行到这里时,报空指针 Caused by: java.lang.NullPointerException at org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.getNodeReports(YarnClientImpl.java:506) getNodeReports方法中: GetClusterNodesResponse response = rmClient.getClusterNodes(request); 这句的rmClient为null值。 我看YarnClientImpl中,有个start()可为rmClient赋值,但是,我加上此方法执行时又会报以下错误: Exception in thread "main" org.apache.hadoop.service.ServiceStateException: org.apache.hadoop.yarn.client.api.impl.YarnClientImpl cannot enter state STARTED from state NOTINITED at org.apache.hadoop.service.ServiceStateModel.checkStateTransition(ServiceStateModel.java:129) at org.apache.hadoop.service.ServiceStateModel.enterState(ServiceStateModel.java:111) at org.apache.hadoop.service.AbstractService.start(AbstractService.java:190) 在 2020-03-07 11:15:10,"tison" 写道: >不成功的报错是啥? > >Best, >tison. > > >nicygan 于2020年3月7日周六 上午11:14写道: > >> dear all: >> >> 我用的flink版本1.9,现在我把flinksql语句生成了jobGroph,现在我不知道如何提交到yarn上,以pre-job方式运行。我尝试通过生成 >> yarnClusterDescriptor提交代码,但不成功,代码大致如下,各位有没有成功示例求分享。 >> >> .. >> JobGraph jobGraph = env.getStreamGraph("StreamSql").getJobGraph(); >> .. >> .. >> yarnClusterDescriptor.deployJobCluster( >> clusterSpecification, >> jobGraph, true); >> >>
Re: jobGroph如何以pre-job的方式提交到yarn运行
不成功的报错是啥? Best, tison. nicygan 于2020年3月7日周六 上午11:14写道: > dear all: > > 我用的flink版本1.9,现在我把flinksql语句生成了jobGroph,现在我不知道如何提交到yarn上,以pre-job方式运行。我尝试通过生成 > yarnClusterDescriptor提交代码,但不成功,代码大致如下,各位有没有成功示例求分享。 > > .. > JobGraph jobGraph = env.getStreamGraph("StreamSql").getJobGraph(); > .. > .. > yarnClusterDescriptor.deployJobCluster( > clusterSpecification, > jobGraph, true); > >
jobGroph如何以pre-job的方式提交到yarn运行
dear all: 我用的flink版本1.9,现在我把flinksql语句生成了jobGroph,现在我不知道如何提交到yarn上,以pre-job方式运行。我尝试通过生成 yarnClusterDescriptor提交代码,但不成功,代码大致如下,各位有没有成功示例求分享。 .. JobGraph jobGraph = env.getStreamGraph("StreamSql").getJobGraph(); .. .. yarnClusterDescriptor.deployJobCluster( clusterSpecification, jobGraph, true);
回复: 如何通过Flink SQL注册Hbase源表
已经解决了,感谢! psyche19830...@163.com 发件人: psyche19830...@163.com 发送时间: 2020-03-06 17:44 收件人: user-zh 主题: 如何通过Flink SQL注册Hbase源表 各位好, 最近公司想用flink来做流式计算,在研究Flink SQL读写HBase的时候遇到一些问题,希望能从您们那里得到帮忙。 我在Hbase的默认命名空间里,创建了一个resume表,表结构如下: 我的Flink测试代码如下: @Test public void testReadFromHBase() throws Exception { StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings); //HBaseTableSource resume = new HBaseTableSource(); tableEnv.sqlUpdate("create table resume(\n" + " binfo ROW<>,\n" + " edu ROW<>, \n" + " work ROW<> \n" + ") with (" + " 'connector.type' = 'hbase', " + " 'connector.version' = '1.4.3', " + " 'connector.table-name' = 'resume'," + " 'connector.zookeeper.quorum' = 'flink01.flink.net:2181,flink02.flink:2181,flink03.flink:2181'," + " 'connector.zookeeper.znode.parent' = '/hbase'" + ")"); Table table = tableEnv.sqlQuery("select * from resume"); DataStream> out = tableEnv.toRetractStream(table, Row.class); out.print(); env.execute(); } 运行报下面的错误: org.apache.flink.table.api.ValidationException: Could not map binfo column to the underlying physical type root . No such field. at org.apache.flink.table.utils.TypeMappingUtils.lambda$null$7(TypeMappingUtils.java:223) at java.util.OptionalInt.orElseThrow(OptionalInt.java:189) psyche19830...@163.com
如何通过Flink SQL注册Hbase源表
各位好, 最近公司想用flink来做流式计算,在研究Flink SQL读写HBase的时候遇到一些问题,希望能从您们那里得到帮忙。 我在Hbase的默认命名空间里,创建了一个resume表,表结构如下: 我的Flink测试代码如下: @Test public void testReadFromHBase() throws Exception { StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings); //HBaseTableSource resume = new HBaseTableSource(); tableEnv.sqlUpdate("create table resume(\n" + " binfo ROW<>,\n" + " edu ROW<>, \n" + " work ROW<> \n" + ") with (" + " 'connector.type' = 'hbase', " + " 'connector.version' = '1.4.3', " + " 'connector.table-name' = 'resume'," + " 'connector.zookeeper.quorum' = 'flink01.flink.net:2181,flink02.flink:2181,flink03.flink:2181'," + " 'connector.zookeeper.znode.parent' = '/hbase'" + ")"); Table table = tableEnv.sqlQuery("select * from resume"); DataStream> out = tableEnv.toRetractStream(table, Row.class); out.print(); env.execute(); } 运行报下面的错误: org.apache.flink.table.api.ValidationException: Could not map binfo column to the underlying physical type root . No such field. at org.apache.flink.table.utils.TypeMappingUtils.lambda$null$7(TypeMappingUtils.java:223) at java.util.OptionalInt.orElseThrow(OptionalInt.java:189) psyche19830...@163.com
回复:keyby后滚动窗口,watermark如何只触发所在的组,而不触发所有的组
Hi, Keyby是把相同的key分配到同一个窗口处理,哪些key分配到哪个窗口,跟你设置得的窗口并行度有关?如果想把某个key分到单独的一个窗口实例我觉得需要自定义partition. Best, Sun.Zhu | | 17626017841 | | 邮箱:17626017...@163.com | Signature is customized by Netease Mail Master 在2020年03月06日 17:20,Utopia 写道: Hi, Flink 的水印在同一个算子上都是一样的,所以每个 keyed stream 共享的是一个水印,不能分别触发器。 Best regards Utopia 在 2020年3月6日 +0800 17:10,小旋锋 ,写道: > 大家好: > 数据流经过 keyby 分组后,在分别进入滚动窗口: > sourceDataStream .keyBy(id) > .window(TumblingEventTimeWindows.of(Time.seconds(10L))).reduce() > .print() > > 测试数据:id 从1~1500,每个id由两条数据,每条数据的 > eventtime是一样的,最后一条是id为1,eventtime大了20秒用来触发窗口计算的数据 > 实验结果:最后一条用来触发的数据发出后,所有的窗口都计算输出了 > > 请问如何配置 最后一条id为1的数据只触发 id=1所在的组的窗口呢? > > > 感谢
Re: keyby后滚动窗口,watermark如何只触发所在的组,而不触发所有的组
Hi, Flink 的水印在同一个算子上都是一样的,所以每个 keyed stream 共享的是一个水印,不能分别触发器。 Best regards Utopia 在 2020年3月6日 +0800 17:10,小旋锋 ,写道: > 大家好: > 数据流经过 keyby 分组后,在分别进入滚动窗口: > sourceDataStream .keyBy(id) > .window(TumblingEventTimeWindows.of(Time.seconds(10L))).reduce() > .print() > > 测试数据:id 从1~1500,每个id由两条数据,每条数据的 > eventtime是一样的,最后一条是id为1,eventtime大了20秒用来触发窗口计算的数据 > 实验结果:最后一条用来触发的数据发出后,所有的窗口都计算输出了 > > 请问如何配置 最后一条id为1的数据只触发 id=1所在的组的窗口呢? > > > 感谢
keyby????????????watermark????????????????????????????????????
?? keyby sourceDataStream .keyBy(id) .window(TumblingEventTimeWindows.of(Time.seconds(10L))).reduce() .print() ??id ??1~1500??id?? eventtimeid??1??eventtime20 ?? id??1 id=1??