Re: Flink SQL 1.10.0 window results cannot be written to sink

2020-06-19 Post by 王超
Apart from the pravega connector, is there anything else that could be the problem?

On Fri, Jun 19, 2020 at 18:43 Leonard Xu  wrote:

> Hi,
> The pravega connector [1] is not provided by the community, and I haven't
> read this connector's code before. From your description, check whether
> some parameters need to be set when writing.
>
>
> Best,
> Leonard Xu
> [1] https://github.com/pravega/flink-connectors <
> https://github.com/pravega/flink-connectors>
>
>
> > On Jun 19, 2020 at 13:31, 王超 <1984chaow...@gmail.com> wrote:
> >
> > Could the experts please help take a look?
> >
> > Flink version: 1.10.0
> > Planner: blink
> >
> > I ran into a problem when using Flink SQL; could you help take a look? I
> > tried to find a solution, but nothing worked. For example, I found a
> > similar issue described in
> > https://www.mail-archive.com/user-zh@flink.apache.org/msg03916.html
> > and set the timezone as suggested in that mail, but the problem was not
> > solved.
> >
> >
> > Flink Table Env configuration:
> > StreamExecutionEnvironment env =
> > StreamExecutionEnvironment.getExecutionEnvironment();
> > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> > env.setParallelism(1);
> > EnvironmentSettings envSetting =
> > EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> > StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
> > envSetting);
> > tableEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));
> >
> >
> > This job defines two tables: the source table "sqlDdlAnaTable"
> >
> > String sqlDdlAnaTable = "CREATE TABLE ana_Source(type INT, datatime
> > BIGINT, list ARRAY , ts AS
> > TO_TIMESTAMP(FROM_UNIXTIME(datatime)), WATERMARK FOR ts AS ts - INTERVAL
> > '5' SECOND)" +
> > " WITH (" +
> > "'connector.type' = 'pravega'," +
> > "'connector.version' = '1'," +
> > "'connector.connection-config.controller-uri'=
> > 'tcp://192.168.188.130:9090 '," +
> > "'connector.connection-config.default-scope' = 'Demo'," +
> > "'connector.reader.stream-info.0.stream' = 'test'," +
> > "'format.type' = 'json'," +
> > "'format.fail-on-missing-field' = 'false', " +
> > "'update-mode' = 'append')";
> >
> > and the sink table "sqlDdlSinkTable".
> >
> > String sqlDdlSinkTable = "CREATE TABLE tb_sink" +
> > "(id STRING, " +
> > "wStart TIMESTAMP(3) , " +
> > "v FLOAT)" +
> > " WITH (" +
> > "'connector.type' = 'pravega'," +
> > "'connector.version' = '1'," +
> > "'connector.connection-config.controller-uri'=
> > 'tcp://192.168.188.130:9090 '," +
> > "'connector.connection-config.default-scope' = 'Demo'," +
> > "'connector.writer.stream' = 'result'," +
> > "'connector.writer.routingkey-field-name' = 'id'," +
> > "'connector.writer.mode' = 'atleast_once'," +
> > "'format.type' = 'json'," +
> > "'update-mode' = 'append')";
> >
> > The processing logic is simple: compute the average of value over a 10s
> > tumble window. When I printed the results directly, I could clearly see
> > one result emitted every 10 seconds, and the watermark advanced normally.
> > String sqlAna = "SELECT ts, id, v " +
> > "FROM tb_JsonRecord " +
> > "WHERE q=1 AND type=1";
> > Table tableAnaRecord = tableEnv.sqlQuery(sqlAna);
> > tableEnv.registerTable("tb_AnaRecord", tableAnaRecord);
> >
> > tableEnv.toAppendStream(tableAnaRecord, Row.class).print()
> >
> >
> >
> > But when I tried to INSERT the results into the sink table, nothing was
> > written at all.
> > String sqlAnaAvg = "INSERT INTO tb_sink(id, wStart, v) " +
> > "SELECT id, " +
> > "TUMBLE_START(ts, INTERVAL '10' SECOND) as wStart, "  +
> > "AVG(v) FROM tb_AnaRecord " +
> > "GROUP BY TUMBLE(ts, INTERVAL '10' SECOND), id";
> > tableEnv.sqlUpdate(sqlAnaAvg);
> >
> >
> > Thanks in advance!
> >
> > BR//Chao
>
> --
Sent from Gmail mobile
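One way to tell whether the window aggregation or the pravega sink is at fault is to print the windowed query itself, the same way the pre-window query was printed above. A minimal sketch reusing the thread's tableEnv/env; the job name is arbitrary:

// Run the same windowed aggregation as a plain query and print it, bypassing the sink.
String sqlAnaAvgSelect =
    "SELECT id, TUMBLE_START(ts, INTERVAL '10' SECOND) AS wStart, AVG(v) AS v " +
    "FROM tb_AnaRecord " +
    "GROUP BY TUMBLE(ts, INTERVAL '10' SECOND), id";
Table windowed = tableEnv.sqlQuery(sqlAnaAvgSelect);
tableEnv.toAppendStream(windowed, Row.class).print();
env.execute("debug-window-print");

If rows show up here but not in pravega, the problem is on the connector write path (e.g. missing writer parameters) rather than in the SQL.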


Re: Re: Flink multi-sink data consistency guarantee

2020-06-19 Post by xueaohui_...@163.com
I took a look at TwoPhaseCommitSinkFunction. Since the sink I tested was one I wrote myself, not one extending this sink, the problem was on my side. Thanks.
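For reference, a minimal sketch of what extending that base class looks like (Flink 1.10 API; the Txn type and the in-memory staging are hypothetical stand-ins for a real external transaction):

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;

import java.util.ArrayList;
import java.util.List;

public class BufferingTwoPhaseSink
        extends TwoPhaseCommitSinkFunction<String, BufferingTwoPhaseSink.Txn, Void> {

    public static class Txn {
        public final List<String> pending = new ArrayList<>();
    }

    public BufferingTwoPhaseSink() {
        super(new KryoSerializer<>(Txn.class, new ExecutionConfig()),
              VoidSerializer.INSTANCE);
    }

    @Override
    protected Txn beginTransaction() {
        return new Txn();              // open a fresh staging area per checkpoint
    }

    @Override
    protected void invoke(Txn txn, String value, Context context) {
        txn.pending.add(value);        // stage; nothing is visible downstream yet
    }

    @Override
    protected void preCommit(Txn txn) {
        // flush staged data so that commit() can only fail for transient reasons;
        // called when a checkpoint barrier arrives
    }

    @Override
    protected void commit(Txn txn) {
        // make the staged writes visible; called once the checkpoint completes
    }

    @Override
    protected void abort(Txn txn) {
        txn.pending.clear();           // discard staged writes on failure/failover
    }
}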



xueaohui_...@163.com
 
From: xueaohui_...@163.com
Sent: 2020-06-20 09:54
To: user-zh
Cc: yungao.gy >
Subject: Re: Re: Flink multi-sink data consistency guarantee
Hello: 
 I tried it manually: an exception thrown in one sink does not affect the other sink's writes.
 Quote: "Under the two-phase-commit logic, the Kafka-side transaction will also be aborted in this case, and the data will not actually be written."
 Does this need to be enabled manually?



xueaohui_...@163.com
 
From: Yun Gao <yungao...@aliyun.com>
Sent: 2020-06-19 15:21
To: xueaohui_...@163.com; user-zh
Subject: Re: Flink multi-sink data consistency guarantee
Hello,


My understanding is that with multiple sinks, the data is still only guaranteed written when the checkpoint following the write completes. If a failed HBase write triggers a failover, then under the two-phase-commit logic the Kafka-side transaction will also be aborted, and the data will not actually be written.
 
 
--Original Mail --
Sender:xueaohui_...@163.com 
Send Date:Thu Jun 18 19:39:46 2020
Recipients:user-zh 
Subject: Flink multi-sink data consistency guarantee
 
 
 
As shown in the figure above, the job currently has multiple sinks. In this scenario, when the HBase write fails, the Kafka write is not affected. The expectation is that when the HBase write fails, Kafka does not send either.
How can the HBase and Kafka writes be made atomic?
I don't know whether Flink has a two-phase-commit scheme across multiple sinks.
 
 
 
xueaohui_...@163.com


Re: Re: Re: Re: Flink multi-sink data consistency guarantee

2020-06-19 Post by Yun Gao
The below is a separate question. What I meant is: logically, when a failed HBase write causes a failover, Kafka should abort the current transaction; but the other way around, when the Kafka write goes wrong, HBase has no way to not write, at least under the current default implementation.


--
Sender:Yun Gao
Date:2020/06/20 10:42:23
Recipient:xueaohui_...@163.com; 
user-zh
Subject: Re: Re: Re: Flink multi-sink data consistency guarantee

hello~,
"An exception thrown in one sink does not affect the other sink's writes":
Does this mean the HBase exception does not affect the Kafka write? Did the HBase exception cause a failover? Logically, if the whole job fails over, the Kafka sink also goes through failover, aborts its current transaction, and the whole job falls back to the last checkpoint and resumes from there.

   One more question though: how is the HBase sink implemented at the moment? Because by default the HBase sink function itself should not be based on two-phase commit.


Regards,



 --Original Mail --
Sender:xueaohui_...@163.com 
Send Date:Sat Jun 20 09:54:59 2020
Recipients:user-zh 
CC:yungao.gy 
Subject: Re: Re: Flink multi-sink data consistency guarantee

Hello: 
 I tried it manually: an exception thrown in one sink does not affect the other sink's writes.
Quote: "Under the two-phase-commit logic, the Kafka-side transaction will also be aborted in this case, and the data will not actually be written."
 Does this need to be enabled manually?



xueaohui_...@163.com
From: Yun Gao <yungao...@aliyun.com>
Sent: 2020-06-19 15:21
To: xueaohui_...@163.com; user-zh
Subject: Re: Flink multi-sink data consistency guarantee
Hello,

My understanding is that with multiple sinks, the data is still only guaranteed written when the checkpoint following the write completes. If a failed HBase write triggers a failover, then under the two-phase-commit logic the Kafka-side transaction will also be aborted, and the data will not actually be written.
 --Original Mail --
Sender:xueaohui_...@163.com 
Send Date:Thu Jun 18 19:39:46 2020
Recipients:user-zh 
Subject: Flink multi-sink data consistency guarantee
As shown in the figure above, the job currently has multiple sinks. In this scenario, when the HBase write fails, the Kafka write is not affected. The expectation is that when the HBase write fails, Kafka does not send either.
How can the HBase and Kafka writes be made atomic?
I don't know whether Flink has a two-phase-commit scheme across multiple sinks.
xueaohui_...@163.com


Re: Re: Re: Flink multi-sink data consistency guarantee

2020-06-19 Post by Yun Gao
hello~,
"An exception thrown in one sink does not affect the other sink's writes":
Does this mean the HBase exception does not affect the Kafka write? Did the HBase exception cause a failover? Logically, if the whole job fails over, the Kafka sink also goes through failover, aborts its current transaction, and the whole job falls back to the last checkpoint and resumes from there.

   One more question though: how is the HBase sink implemented at the moment? Because by default the HBase sink function itself should not be based on two-phase commit.


Regards,



 --Original Mail --
Sender:xueaohui_...@163.com 
Send Date:Sat Jun 20 09:54:59 2020
Recipients:user-zh 
CC:yungao.gy 
Subject: Re: Re: Flink multi-sink data consistency guarantee

Hello: 
 I tried it manually: an exception thrown in one sink does not affect the other sink's writes.
Quote: "Under the two-phase-commit logic, the Kafka-side transaction will also be aborted in this case, and the data will not actually be written."
 Does this need to be enabled manually?



xueaohui_...@163.com
From: Yun Gao <yungao...@aliyun.com>
Sent: 2020-06-19 15:21
To: xueaohui_...@163.com; user-zh
Subject: Re: Flink multi-sink data consistency guarantee
Hello,

My understanding is that with multiple sinks, the data is still only guaranteed written when the checkpoint following the write completes. If a failed HBase write triggers a failover, then under the two-phase-commit logic the Kafka-side transaction will also be aborted, and the data will not actually be written.
 --Original Mail --
Sender:xueaohui_...@163.com 
Send Date:Thu Jun 18 19:39:46 2020
Recipients:user-zh 
Subject: Flink multi-sink data consistency guarantee
As shown in the figure above, the job currently has multiple sinks. In this scenario, when the HBase write fails, the Kafka write is not affected. The expectation is that when the HBase write fails, Kafka does not send either.
How can the HBase and Kafka writes be made atomic?
I don't know whether Flink has a two-phase-commit scheme across multiple sinks.
xueaohui_...@163.com

Reply: Re: Flink multi-sink data consistency guarantee

2020-06-19 Post by (sender name garbled in the archive)
(The message body was garbled by a mail-client encoding error; the legible fragments mention the kafka sink and the hbase sink.)
------Original Mail------
From: xueaohui_...@163.com

Reply: Flink multi-sink data consistency guarantee

2020-06-19 Post by Yichao Yang
Hi


Could you describe the scenario in which you actually need a multi-sink consistency guarantee?
Could you flip the approach: write the data only to kafka, and then sync the kafka data to hbase in real time.


Best,
Yichao Yang



Sent from my iPhone


-- Original Mail --
From: xueaohui_...@163.com 

Reply: flink1.9 on yarn: error after running for more than two months

2020-06-19 Post by xueaohui_...@163.com
Are there detailed logs on the YARN side?

Is there an HDFS permission problem?
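If the containers are already gone, the aggregated YARN logs can usually still be fetched with the YARN CLI, e.g. yarn logs -applicationId application_1567067657620_0251 (that application id is the one appearing in the attached TaskExecutor log; substitute the real one).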



xueaohui_...@163.com
 
From: guanyq
Sent: 2020-06-20 08:48
To: user-zh
Subject: flink1.9 on yarn: error after running for more than two months
The error log is attached. Could someone help analyze it?


 


Re: Re: Flink multi-sink data consistency guarantee

2020-06-19 Post by xueaohui_...@163.com
Hello: 
 I tried it manually: an exception thrown in one sink does not affect the other sink's writes.
 Quote: "Under the two-phase-commit logic, the Kafka-side transaction will also be aborted in this case, and the data will not actually be written."
 Does this need to be enabled manually?



xueaohui_...@163.com
 
From: Yun Gao <yungao...@aliyun.com>
Sent: 2020-06-19 15:21
To: xueaohui_...@163.com; user-zh
Subject: Re: Flink multi-sink data consistency guarantee
Hello,
 

My understanding is that with multiple sinks, the data is still only guaranteed written when the checkpoint following the write completes. If a failed HBase write triggers a failover, then under the two-phase-commit logic the Kafka-side transaction will also be aborted, and the data will not actually be written.
 
 
--Original Mail --
Sender:xueaohui_...@163.com 
Send Date:Thu Jun 18 19:39:46 2020
Recipients:user-zh 
Subject: Flink multi-sink data consistency guarantee
 
 
 
As shown in the figure above, the job currently has multiple sinks. In this scenario, when the HBase write fails, the Kafka write is not affected. The expectation is that when the HBase write fails, Kafka does not send either.
How can the HBase and Kafka writes be made atomic?
I don't know whether Flink has a two-phase-commit scheme across multiple sinks.
 
 
 
xueaohui_...@163.com


flink1.9 on yarn: error after running for more than two months

2020-06-19 Post by guanyq
The error log is attached. Could someone help analyze it?
2020-06-20 08:39:47,829 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner  
- 

2020-06-20 08:39:47,830 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner  
-  Starting YARN TaskExecutor runner (Version: 1.9.2, Rev:c9d2c90, 
Date:24.01.2020 @ 08:44:30 CST)
2020-06-20 08:39:47,830 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner  
-  OS current user: ocdc
2020-06-20 08:39:48,235 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner  
-  Current Hadoop/Kerberos user: ocdp
2020-06-20 08:39:48,235 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner  
-  JVM: Java HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 
1.8/25.152-b16
2020-06-20 08:39:48,235 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner  
-  Maximum heap size: 5300 MiBytes
2020-06-20 08:39:48,235 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner  
-  JAVA_HOME: /usr/local/jdk1.8.0_152
2020-06-20 08:39:48,236 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner  
-  Hadoop version: 2.7.3.2.6.0.3-8
2020-06-20 08:39:48,236 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner  
-  JVM Options:
2020-06-20 08:39:48,236 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner  
- -Xms5529m
2020-06-20 08:39:48,236 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner  
- -Xmx5529m
2020-06-20 08:39:48,236 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner  
- -XX:MaxDirectMemorySize=2663m
2020-06-20 08:39:48,236 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner  
- -Dfile.encoding=UTF-8
2020-06-20 08:39:48,236 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner  
- 
-Dlog.file=/data09/hadoop/yarn/log/application_1567067657620_0251/container_e07_1567067657620_0251_01_05/taskmanager.log
2020-06-20 08:39:48,236 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner  
- -Dlogback.configurationFile=file:./logback.xml
2020-06-20 08:39:48,237 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner  
- -Dlog4j.configuration=file:./log4j.properties
2020-06-20 08:39:48,237 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner  
-  Program Arguments:
2020-06-20 08:39:48,237 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner  
- --configDir
2020-06-20 08:39:48,237 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner  
- .
2020-06-20 08:39:48,237 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner  
- -Dweb.port=0
2020-06-20 08:39:48,237 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner  
- -Djobmanager.rpc.address=audit-dp04
2020-06-20 08:39:48,237 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner  
- -Dtaskmanager.memory.size=4058744064b
2020-06-20 08:39:48,237 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner  
- 
-Dweb.tmpdir=/tmp/flink-web-3ead1dd7-b12c-4a61-9a5d-793743c58302
2020-06-20 08:39:48,237 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner  
- -Djobmanager.rpc.port=57053
2020-06-20 08:39:48,237 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner  
- -Drest.address=audit-dp04
2020-06-20 08:39:48,237 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner  
-  Classpath: 

Re: Re: Re: flink run from checkpoint failed

2020-06-19 Post by Zhou Zach
I am using per-job mode, not yarn session mode.

















At 2020-06-19 20:06:47, "Rui Li"  wrote:
>Then you would have to restart the yarn session and submit the job again
>
>On Fri, Jun 19, 2020 at 6:22 PM Zhou Zach  wrote:
>
>>
>>
>>
>>
>>
>>
>> I killed the job's YARN application with yarn application -kill; after that, YARN did not restart the flink job
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> At 2020-06-19 17:54:45, "Rui Li"  wrote:
>> >By "yarn application kill flink job", do you mean the YARN application was killed? Did it get restarted afterwards?
>> >
>> >On Fri, Jun 19, 2020 at 4:09 PM Zhou Zach  wrote:
>> >
>> >>
>> >>
>> >> I added the two timeout parameters below in flink-1.10.0/conf/flink-conf.yaml, but they had no effect
>> >> akka.client.timeout: 6
>> >> akka.ask.timeout: 600
>> >>
>> >> Does anyone know the reason?
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> At 2020-06-19 14:57:05, "Zhou Zach"  wrote:
>> >> >
>> >> >
>> >> >
>> >> >
>> >> >After killing the flink job's YARN application with yarn application -kill,
>> >> >I ran /opt/flink-1.10.0/bin/flink run -s
>> >>
>> /user/flink10/checkpoints/69e450574d8520ac5961e20a6fc4798a/chk-18/_metadata
>> >> -d -c dataflow.sql.FromKafkaSinkJdbcForCountPerSecond
>> >> /data/warehouse/streaming/data-flow-1.0.jar
>> >> >
>> >> >
>> >> >
>> >> >
>> >> >
>> >> >
>> >> >
>> >> >
>> >> >2020-06-19 14:39:54,563 INFO
>> >>
>> org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager
>> >> - State change: CONNECTED
>> >> >2020-06-19 14:39:54,664 INFO
>> >>
>> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  -
>> >> Starting ZooKeeperLeaderRetrievalService /leader/rest_server_lock.
>> >> >2020-06-19 14:40:24,728 INFO
>> >>
>> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  -
>> >> Stopping ZooKeeperLeaderRetrievalService /leader/rest_server_lock.
>> >> >2020-06-19 14:40:24,729 INFO
>> >>
>> org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl
>> >> - backgroundOperationsLoop exiting
>> >> >2020-06-19 14:40:24,733 INFO
>> >> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  -
>> >> Session: 0x272b776faca2414 closed
>> >> >2020-06-19 14:40:24,733 INFO
>> >> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  -
>> >> EventThread shut down for session: 0x272b776faca2414
>> >> >2020-06-19 14:40:24,734 ERROR org.apache.flink.client.cli.CliFrontend
>> >>- Error while running the command.
>> >> >org.apache.flink.client.program.ProgramInvocationException: The main
>> >> method caused an error: java.util.concurrent.ExecutionException:
>> >> org.apache.flink.runtime.client.JobSubmissionException: Failed to submit
>> >> JobGraph.
>> >> >at
>> >>
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
>> >> >at
>> >>
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>> >> >at
>> >> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
>> >> >at
>> >>
>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
>> >> >at
>> >> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>> >> >at
>> >>
>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
>> >> >at
>> >>
>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>> >> >at java.security.AccessController.doPrivileged(Native Method)
>> >> >at javax.security.auth.Subject.doAs(Subject.java:422)
>> >> >at
>> >>
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
>> >> >at
>> >>
>> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>> >> >at
>> >> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
>> >> >Caused by: java.lang.RuntimeException:
>> >> java.util.concurrent.ExecutionException:
>> >> org.apache.flink.runtime.client.JobSubmissionException: Failed to submit
>> >> JobGraph.
>> >> >at
>> >> org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:199)
>> >> >at
>> >>
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1741)
>> >> >at
>> >>
>> org.apache.flink.streaming.api.environment.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:94)
>> >> >at
>> >>
>> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:63)
>> >> >at
>> >>
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
>> >> >at
>> >>
>> org.apache.flink.table.planner.delegation.StreamExecutor.execute(StreamExecutor.java:42)
>> >> >at
>> >>
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:643)
>> >> >at
>> >>
>> 

Re: Re: flink run from checkpoint failed

2020-06-19 Post by Rui Li
Then you would have to restart the yarn session and submit the job again

On Fri, Jun 19, 2020 at 6:22 PM Zhou Zach  wrote:

>
>
>
>
>
>
> I killed the job's YARN application with yarn application -kill; after that, YARN did not restart the flink job
>
>
>
>
>
>
>
>
>
>
>
> At 2020-06-19 17:54:45, "Rui Li"  wrote:
> >By "yarn application kill flink job", do you mean the YARN application was killed? Did it get restarted afterwards?
> >
> >On Fri, Jun 19, 2020 at 4:09 PM Zhou Zach  wrote:
> >
> >>
> >>
> >> I added the two timeout parameters below in flink-1.10.0/conf/flink-conf.yaml, but they had no effect
> >> akka.client.timeout: 6
> >> akka.ask.timeout: 600
> >>
> >> Does anyone know the reason?
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> At 2020-06-19 14:57:05, "Zhou Zach"  wrote:
> >> >
> >> >
> >> >
> >> >
> >> >After killing the flink job's YARN application with yarn application -kill,
> >> >I ran /opt/flink-1.10.0/bin/flink run -s
> >>
> /user/flink10/checkpoints/69e450574d8520ac5961e20a6fc4798a/chk-18/_metadata
> >> -d -c dataflow.sql.FromKafkaSinkJdbcForCountPerSecond
> >> /data/warehouse/streaming/data-flow-1.0.jar
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >2020-06-19 14:39:54,563 INFO
> >>
> org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager
> >> - State change: CONNECTED
> >> >2020-06-19 14:39:54,664 INFO
> >>
> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  -
> >> Starting ZooKeeperLeaderRetrievalService /leader/rest_server_lock.
> >> >2020-06-19 14:40:24,728 INFO
> >>
> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  -
> >> Stopping ZooKeeperLeaderRetrievalService /leader/rest_server_lock.
> >> >2020-06-19 14:40:24,729 INFO
> >>
> org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl
> >> - backgroundOperationsLoop exiting
> >> >2020-06-19 14:40:24,733 INFO
> >> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  -
> >> Session: 0x272b776faca2414 closed
> >> >2020-06-19 14:40:24,733 INFO
> >> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  -
> >> EventThread shut down for session: 0x272b776faca2414
> >> >2020-06-19 14:40:24,734 ERROR org.apache.flink.client.cli.CliFrontend
> >>- Error while running the command.
> >> >org.apache.flink.client.program.ProgramInvocationException: The main
> >> method caused an error: java.util.concurrent.ExecutionException:
> >> org.apache.flink.runtime.client.JobSubmissionException: Failed to submit
> >> JobGraph.
> >> >at
> >>
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
> >> >at
> >>
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
> >> >at
> >> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
> >> >at
> >>
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
> >> >at
> >> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
> >> >at
> >>
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
> >> >at
> >>
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
> >> >at java.security.AccessController.doPrivileged(Native Method)
> >> >at javax.security.auth.Subject.doAs(Subject.java:422)
> >> >at
> >>
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
> >> >at
> >>
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> >> >at
> >> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
> >> >Caused by: java.lang.RuntimeException:
> >> java.util.concurrent.ExecutionException:
> >> org.apache.flink.runtime.client.JobSubmissionException: Failed to submit
> >> JobGraph.
> >> >at
> >> org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:199)
> >> >at
> >>
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1741)
> >> >at
> >>
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:94)
> >> >at
> >>
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:63)
> >> >at
> >>
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
> >> >at
> >>
> org.apache.flink.table.planner.delegation.StreamExecutor.execute(StreamExecutor.java:42)
> >> >at
> >>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:643)
> >> >at
> >>
> cn.ibobei.qile.dataflow.sql.FromKafkaSinkJdbcForCountPerSecond$.main(FromKafkaSinkJdbcForCountPerSecond.scala:120)
> >> >at
> >>
> cn.ibobei.qile.dataflow.sql.FromKafkaSinkJdbcForCountPerSecond.main(FromKafkaSinkJdbcForCountPerSecond.scala)
> >> >at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 

Re: Flink SQL 1.10.0 window results cannot be written to sink

2020-06-19 Post by Leonard Xu
Hi,
The pravega connector [1] is not provided by the community, and I haven't read this connector's code before. From your description, check whether some parameters need to be set when writing.


Best,
Leonard Xu
[1] https://github.com/pravega/flink-connectors 



> On Jun 19, 2020 at 13:31, 王超 <1984chaow...@gmail.com> wrote:
> 
> Could the experts please help take a look?
> 
> Flink version: 1.10.0
> Planner: blink
> 
> I ran into a problem when using Flink SQL; could you help take a look? I tried to find a solution, but nothing worked.
> For example, I found a similar issue described in
> https://www.mail-archive.com/user-zh@flink.apache.org/msg03916.html
> and set the timezone as suggested in that mail, but the problem was not solved.
> 
> 
> Flink Table Env configuration:
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> env.setParallelism(1);
> EnvironmentSettings envSetting =
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
> envSetting);
> tableEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));
> 
> 
> This job defines two tables: the source table "sqlDdlAnaTable"
> 
> String sqlDdlAnaTable = "CREATE TABLE ana_Source(type INT, datatime
> BIGINT, list ARRAY , ts AS
> TO_TIMESTAMP(FROM_UNIXTIME(datatime)), WATERMARK FOR ts AS ts - INTERVAL
> '5' SECOND)" +
> " WITH (" +
> "'connector.type' = 'pravega'," +
> "'connector.version' = '1'," +
> "'connector.connection-config.controller-uri'=
> 'tcp://192.168.188.130:9090 '," +
> "'connector.connection-config.default-scope' = 'Demo'," +
> "'connector.reader.stream-info.0.stream' = 'test'," +
> "'format.type' = 'json'," +
> "'format.fail-on-missing-field' = 'false', " +
> "'update-mode' = 'append')";
> 
> and the sink table "sqlDdlSinkTable".
> 
> String sqlDdlSinkTable = "CREATE TABLE tb_sink" +
> "(id STRING, " +
> "wStart TIMESTAMP(3) , " +
> "v FLOAT)" +
> " WITH (" +
> "'connector.type' = 'pravega'," +
> "'connector.version' = '1'," +
> "'connector.connection-config.controller-uri'=
> 'tcp://192.168.188.130:9090 '," +
> "'connector.connection-config.default-scope' = 'Demo'," +
> "'connector.writer.stream' = 'result'," +
> "'connector.writer.routingkey-field-name' = 'id'," +
> "'connector.writer.mode' = 'atleast_once'," +
> "'format.type' = 'json'," +
> "'update-mode' = 'append')";
> 
> The processing logic is simple: compute the average of value over a 10s tumble window.
> When I printed the results directly, I could clearly see one result emitted every 10 seconds, and the watermark advanced normally.
> String sqlAna = "SELECT ts, id, v " +
> "FROM tb_JsonRecord " +
> "WHERE q=1 AND type=1";
> Table tableAnaRecord = tableEnv.sqlQuery(sqlAna);
> tableEnv.registerTable("tb_AnaRecord", tableAnaRecord);
> 
> tableEnv.toAppendStream(tableAnaRecord, Row.class).print()
> 
> 
> 
> But when I tried to INSERT the results into the sink table, nothing was written at all.
> String sqlAnaAvg = "INSERT INTO tb_sink(id, wStart, v) " +
> "SELECT id, " +
> "TUMBLE_START(ts, INTERVAL '10' SECOND) as wStart, "  +
> "AVG(v) FROM tb_AnaRecord " +
> "GROUP BY TUMBLE(ts, INTERVAL '10' SECOND), id";
> tableEnv.sqlUpdate(sqlAnaAvg);
> 
> 
> Thanks in advance!
> 
> BR//Chao



Re: flink run from checkpoint failed

2020-06-19 Post by Rui Li
By "yarn application kill flink job", do you mean the YARN application was killed? Did it get restarted afterwards?

On Fri, Jun 19, 2020 at 4:09 PM Zhou Zach  wrote:

>
>
> I added the two timeout parameters below in flink-1.10.0/conf/flink-conf.yaml, but they had no effect
> akka.client.timeout: 6
> akka.ask.timeout: 600
>
> Does anyone know the reason?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> At 2020-06-19 14:57:05, "Zhou Zach"  wrote:
> >
> >
> >
> >
> >After killing the flink job's YARN application with yarn application -kill,
> >I ran /opt/flink-1.10.0/bin/flink run -s
> /user/flink10/checkpoints/69e450574d8520ac5961e20a6fc4798a/chk-18/_metadata
> -d -c dataflow.sql.FromKafkaSinkJdbcForCountPerSecond
> /data/warehouse/streaming/data-flow-1.0.jar
> >
> >
> >
> >
> >
> >
> >
> >
> >2020-06-19 14:39:54,563 INFO
> org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager
> - State change: CONNECTED
> >2020-06-19 14:39:54,664 INFO
> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  -
> Starting ZooKeeperLeaderRetrievalService /leader/rest_server_lock.
> >2020-06-19 14:40:24,728 INFO
> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  -
> Stopping ZooKeeperLeaderRetrievalService /leader/rest_server_lock.
> >2020-06-19 14:40:24,729 INFO
> org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl
> - backgroundOperationsLoop exiting
> >2020-06-19 14:40:24,733 INFO
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  -
> Session: 0x272b776faca2414 closed
> >2020-06-19 14:40:24,733 INFO
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  -
> EventThread shut down for session: 0x272b776faca2414
> >2020-06-19 14:40:24,734 ERROR org.apache.flink.client.cli.CliFrontend
>- Error while running the command.
> >org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error: java.util.concurrent.ExecutionException:
> org.apache.flink.runtime.client.JobSubmissionException: Failed to submit
> JobGraph.
> >at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
> >at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
> >at
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
> >at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
> >at
> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
> >at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
> >at
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
> >at java.security.AccessController.doPrivileged(Native Method)
> >at javax.security.auth.Subject.doAs(Subject.java:422)
> >at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
> >at
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> >at
> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
> >Caused by: java.lang.RuntimeException:
> java.util.concurrent.ExecutionException:
> org.apache.flink.runtime.client.JobSubmissionException: Failed to submit
> JobGraph.
> >at
> org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:199)
> >at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1741)
> >at
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:94)
> >at
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:63)
> >at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
> >at
> org.apache.flink.table.planner.delegation.StreamExecutor.execute(StreamExecutor.java:42)
> >at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:643)
> >at
> cn.ibobei.qile.dataflow.sql.FromKafkaSinkJdbcForCountPerSecond$.main(FromKafkaSinkJdbcForCountPerSecond.scala:120)
> >at
> cn.ibobei.qile.dataflow.sql.FromKafkaSinkJdbcForCountPerSecond.main(FromKafkaSinkJdbcForCountPerSecond.scala)
> >at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> >at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >at java.lang.reflect.Method.invoke(Method.java:498)
> >at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
> >... 11 more
> >Caused by: java.util.concurrent.ExecutionException:
> org.apache.flink.runtime.client.JobSubmissionException: Failed to submit
> JobGraph.
> >at
> 

Re: Re: Reply: Question about extending Tuples

2020-06-19 Post by 刘大龙
Works great!


> -----Original Mail-----
> From: "Jark Wu" 
> Sent: 2020-06-19 15:46:42 (Friday)
> To: user-zh 
> Cc: 
> Subject: Re: Reply: Question about extending Tuples
> 
> Use Row: it supports Int.MAX fields and can also hold null values. Isn't that nice?
> 
> 
> On Fri, 19 Jun 2020 at 15:42, Weixubin <18925434...@163.com> wrote:
> 
> > Thanks for your answer. Could you give a reference example?
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > At 2020-06-19 15:31:53, "wangweigu...@stevegame.cn" <
> > wangweigu...@stevegame.cn> wrote:
> > >
> > >   Combine multiple values together and use them as one composite value!
> > >
> > >
> > >
> > >From: 魏旭斌
> > >Sent: 2020-06-19 15:01
> > >To: user-zh
> > >Subject: Question about extending Tuples
> > >Flink currently provides Tuple1 ~ Tuple25, which does not meet our needs in real development; we would like larger tuples, e.g. a Tuple50.
> > Is there any solution? Thanks
> >




Re: Re: Reply: Question about extending Tuples

2020-06-19 Post by Weixubin
Thanks. I ran a quick test here and the problem is solved. Works great!




//Demo
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

import java.util.Arrays;

public class ParseUriRow extends TableFunction<Row> {

    private static final int FIELD_COUNT = 26;

    public void eval(String data) {
        // parsing logic; emit one row per call, e.g.:
        // Row row = new Row(FIELD_COUNT);
        // ... fill the 26 fields from `data` ...
        // collect(row);
    }

    @Override
    public TypeInformation<Row> getResultType() {
        // 26 STRING fields; Arrays.fill avoids spelling out Types.STRING 26 times
        TypeInformation<?>[] fieldTypes = new TypeInformation<?>[FIELD_COUNT];
        Arrays.fill(fieldTypes, Types.STRING);
        return Types.ROW(fieldTypes);
    }
}
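To call it from SQL the function still needs to be registered; a minimal sketch (the "logs" table and "uri" column are placeholders, and the 26 result columns default to f0 … f25 unless aliased):

tableEnv.registerFunction("parseUriRow", new ParseUriRow());
Table parsed = tableEnv.sqlQuery(
    "SELECT T.* FROM logs, LATERAL TABLE(parseUriRow(uri)) AS T");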
 


















At 2020-06-19 15:46:42, "Jark Wu"  wrote:
>Use Row: it supports Int.MAX fields and can also hold null values. Isn't that nice?
>
>
>On Fri, 19 Jun 2020 at 15:42, Weixubin <18925434...@163.com> wrote:
>
>> Thanks for your answer. Could you give a reference example?
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> At 2020-06-19 15:31:53, "wangweigu...@stevegame.cn" <
>> wangweigu...@stevegame.cn> wrote:
>> >
>> >   Combine multiple values together and use them as one composite value!
>> >
>> >
>> >
>> >From: 魏旭斌
>> >Sent: 2020-06-19 15:01
>> >To: user-zh
>> >Subject: Question about extending Tuples
>> >Flink currently provides Tuple1 ~ Tuple25, which does not meet our needs in real development; we would like larger tuples, e.g. a Tuple50.
>> Is there any solution? Thanks
>>


Re: flink run from checkpoint failed

2020-06-19 Post by Zhou Zach

I added the two timeout parameters below in flink-1.10.0/conf/flink-conf.yaml, but they had no effect:
akka.client.timeout: 6
akka.ask.timeout: 600

Does anyone know the reason?
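One guess worth checking (an assumption on my side, not verified against your setup): these akka options are duration strings whose defaults carry a unit (e.g. "60 s"), so the bare numbers above may not be parsed as the seconds you intended. For example:

akka.client.timeout: 600 s
akka.ask.timeout: 600 s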














At 2020-06-19 14:57:05, "Zhou Zach"  wrote:
>
>
>
>
>After killing the flink job's YARN application with yarn application -kill,
>I ran /opt/flink-1.10.0/bin/flink run -s 
>/user/flink10/checkpoints/69e450574d8520ac5961e20a6fc4798a/chk-18/_metadata -d 
>-c dataflow.sql.FromKafkaSinkJdbcForCountPerSecond  
>/data/warehouse/streaming/data-flow-1.0.jar
>
>
>
>
>
>
>
>
>2020-06-19 14:39:54,563 INFO  
>org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager
>  - State change: CONNECTED
>2020-06-19 14:39:54,664 INFO  
>org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - 
>Starting ZooKeeperLeaderRetrievalService /leader/rest_server_lock.
>2020-06-19 14:40:24,728 INFO  
>org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - 
>Stopping ZooKeeperLeaderRetrievalService /leader/rest_server_lock.
>2020-06-19 14:40:24,729 INFO  
>org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl
>  - backgroundOperationsLoop exiting
>2020-06-19 14:40:24,733 INFO  
>org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Session: 
>0x272b776faca2414 closed
>2020-06-19 14:40:24,733 INFO  
>org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - 
>EventThread shut down for session: 0x272b776faca2414
>2020-06-19 14:40:24,734 ERROR org.apache.flink.client.cli.CliFrontend  
> - Error while running the command.
>org.apache.flink.client.program.ProgramInvocationException: The main method 
>caused an error: java.util.concurrent.ExecutionException: 
>org.apache.flink.runtime.client.JobSubmissionException: Failed to submit 
>JobGraph.
>at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
>at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>at 
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
>at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
>at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
>at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>at java.security.AccessController.doPrivileged(Native Method)
>at javax.security.auth.Subject.doAs(Subject.java:422)
>at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
>at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
>Caused by: java.lang.RuntimeException: 
>java.util.concurrent.ExecutionException: 
>org.apache.flink.runtime.client.JobSubmissionException: Failed to submit 
>JobGraph.
>at 
> org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:199)
>at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1741)
>at 
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:94)
>at 
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:63)
>at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
>at 
> org.apache.flink.table.planner.delegation.StreamExecutor.execute(StreamExecutor.java:42)
>at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:643)
>at 
> cn.ibobei.qile.dataflow.sql.FromKafkaSinkJdbcForCountPerSecond$.main(FromKafkaSinkJdbcForCountPerSecond.scala:120)
>at 
> cn.ibobei.qile.dataflow.sql.FromKafkaSinkJdbcForCountPerSecond.main(FromKafkaSinkJdbcForCountPerSecond.scala)
>at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>at java.lang.reflect.Method.invoke(Method.java:498)
>at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
>... 11 more
>Caused by: java.util.concurrent.ExecutionException: 
>org.apache.flink.runtime.client.JobSubmissionException: Failed to submit 
>JobGraph.
>at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>at 
> 

Re: Reply: Question about extending Tuples

2020-06-19 Post by Jark Wu
Use Row: it supports Int.MAX fields and can also hold null values. Isn't that nice?
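A small illustration of what that looks like (a sketch; the field values are arbitrary):

import org.apache.flink.types.Row;

// A Row has a fixed arity chosen at construction time, and its fields may be null.
Row row = new Row(50);            // a "Tuple50"-sized record
row.setField(0, "id-001");
row.setField(1, 42);
row.setField(2, null);            // null is allowed, unlike in Tuple fields
Object f1 = row.getField(1);      // read back by position

Row.of("a", 1, null) is a shorthand for building small rows in one call.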


On Fri, 19 Jun 2020 at 15:42, Weixubin <18925434...@163.com> wrote:

> Thanks for your answer. Could you give a reference example?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> At 2020-06-19 15:31:53, "wangweigu...@stevegame.cn" <
> wangweigu...@stevegame.cn> wrote:
> >
> >   Combine multiple values together and use them as one composite value!
> >
> >
> >
> >From: 魏旭斌
> >Sent: 2020-06-19 15:01
> >To: user-zh
> >Subject: Question about extending Tuples
> >Flink currently provides Tuple1 ~ Tuple25, which does not meet our needs in real development; we would like larger tuples, e.g. a Tuple50.
> Is there any solution? Thanks
>


Re: Re: How to guarantee data correctness when the dimension table is updated frequently and state keeps growing

2020-06-19 Post by Jark Wu
Flink SQL has no built-in support for a local cache (or keyBy) on lookups; that would require changing the source code.
Also, the built-in JDBC and HBase dimension tables do not support async lookup; users would have to develop async JDBC/HBase
lookup themselves.

Alternatively, users can implement all of these features through the DataStream API.

keyby + localcache + async IO is just one line of thinking for this problem. (The sketch below shows the DataStream route.)
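A minimal sketch of that DataStream-API route, as an illustration only: the DimClient stub stands in for a real async JDBC/HBase/Redis client, and the types and names are placeholders.

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class AsyncDimLookup extends RichAsyncFunction<String, String> {

    // Hypothetical stand-in for an asynchronous dimension-table client.
    static class DimClient {
        String lookup(String key) { return "dim-value-for-" + key; }
    }

    private transient DimClient client;

    @Override
    public void open(Configuration parameters) {
        client = new DimClient();   // a per-key local cache could be added here
    }

    @Override
    public void asyncInvoke(String key, ResultFuture<String> resultFuture) {
        // Run the lookup off the task thread; complete() hands the result back to Flink.
        CompletableFuture
                .supplyAsync(() -> client.lookup(key))
                .thenAccept(value ->
                        resultFuture.complete(Collections.singleton(key + "," + value)));
    }

    // Wiring: at most 100 in-flight lookups, 1 s timeout per request.
    public static DataStream<String> join(DataStream<String> keyedInput) {
        return AsyncDataStream.unorderedWait(
                keyedInput, new AsyncDimLookup(), 1000, TimeUnit.MILLISECONDS, 100);
    }
}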

Best,
Jark

On Fri, 19 Jun 2020 at 11:11, wangweigu...@stevegame.cn <
wangweigu...@stevegame.cn> wrote:

>
>   You can do the dimension-table join asynchronously (RichAsyncFunction), reading the dimension table with multiple async threads!
>
>
>
>
> From: Jim Chen
> Sent: 2020-06-19 10:34
> To: user-zh
> Subject: Re: How to guarantee data correctness when the dimension table is updated frequently and state keeps growing
> A question: in flink sql 1.10, for the localcache + async IO approach, is it enough to just write the SQL join against the dimension table? Will flink
> sql automatically optimize this underneath? If it has to be implemented by hand, is there a demo anywhere? Thanks
>
> Jark Wu  wrote on Wed, Jun 17, 2020 at 00:11:
>
> > If updates are very frequent and you need to guarantee both join correctness and throughput, I think the only best solution is to join a changelog;
> > it's just that Flink does not yet natively support joining a dimension table against a changelog; that will be supported in Flink SQL 1.12.
> >
> > In the current version, you can try keyby + localcache + async IO.
> >
> > Best,
> > Jark
> >
> > On Tue, 16 Jun 2020 at 22:35, 李奇 <359502...@qq.com> wrote:
> >
> > > Or use Redis as the storage medium for the dimension table.
> > >
> > > > On Jun 16, 2020 at 22:10, wangxiangyan  wrote:
> > > >
> > > > hi, everyone
> > > > The dimension table is updated frequently, about 1 GB of data, and needs frequent syncing; what approach works best for the join?
> > >
> >
>


Re: Reply: Question about extending Tuples

2020-06-19 Post by Weixubin
Thanks for your answer. Could you give a reference example?

















At 2020-06-19 15:31:53, "wangweigu...@stevegame.cn"  
wrote:
>
>   Combine multiple values together and use them as one composite value!
>
>
>
> 
>From: 魏旭斌
>Sent: 2020-06-19 15:01
>To: user-zh
>Subject: Question about extending Tuples
>Flink currently provides Tuple1 ~ Tuple25, which does not meet our needs in real development; we would like larger tuples, e.g. a Tuple50.
>Is there any solution? Thanks


Reply: Question about extending Tuples

2020-06-19 Post by wangweigu...@stevegame.cn

   Combine multiple values together and use them as one composite value!



 
From: 魏旭斌
Sent: 2020-06-19 15:01
To: user-zh
Subject: Question about extending Tuples
Flink currently provides Tuple1 ~ Tuple25, which does not meet our needs in real development; we would like larger tuples, e.g. a Tuple50.
Is there any solution? Thanks


flink job: automatic checkpoints succeed, manually triggered checkpoint fails

2020-06-19 Post by Zhou Zach




2020-06-19 15:11:18,361 INFO  org.apache.flink.client.cli.CliFrontend   
- Triggering savepoint for job e229c76e6a1b43142cb4272523102ed1.
2020-06-19 15:11:18,378 INFO  org.apache.flink.client.cli.CliFrontend   
- Waiting for response...
2020-06-19 15:11:48,381 INFO  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - 
Stopping ZooKeeperLeaderRetrievalService /leader/rest_server_lock.
2020-06-19 15:11:48,382 INFO  
org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl
  - backgroundOperationsLoop exiting
2020-06-19 15:11:48,385 INFO  
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Session: 
0x172b776fac82479 closed
2020-06-19 15:11:48,385 INFO  
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - 
EventThread shut down for session: 0x172b776fac82479
2020-06-19 15:11:48,385 ERROR org.apache.flink.client.cli.CliFrontend   
- Error while running the command.
org.apache.flink.util.FlinkException: Triggering a savepoint for the job 
e229c76e6a1b43142cb4272523102ed1 failed.
at 
org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:633)
at 
org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:611)
at 
org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
at 
org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:608)
at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:910)
at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
Caused by: java.util.concurrent.TimeoutException
at 
org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:999)
at 
org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211)
at 
org.apache.flink.runtime.concurrent.FutureUtils.lambda$orTimeout$14(FutureUtils.java:427)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Re: Flink multi-sink data consistency guarantee

2020-06-19 Post by Yun Gao
Hello,


My understanding is that with multiple sinks, the data is still only guaranteed written when the checkpoint following the write completes. If a failed HBase write triggers a failover, then under the two-phase-commit logic the Kafka-side transaction will also be aborted, and the data will not actually be written.
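For the Kafka side specifically, transactional writes are not the default: the universal FlinkKafkaProducer only takes part in two-phase commit when constructed with Semantic.EXACTLY_ONCE, which bears on the "does this need to be enabled manually?" question in the replies above. A minimal sketch; the topic name and broker address are placeholders:

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
// must not exceed the broker's transaction.max.timeout.ms
props.setProperty("transaction.timeout.ms", "900000");

// EXACTLY_ONCE stages records in a Kafka transaction that is pre-committed
// on checkpoint and aborted on failover.
FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
        "demo-topic",
        new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
        props,
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE);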


 --Original Mail --
Sender:xueaohui_...@163.com 
Send Date:Thu Jun 18 19:39:46 2020
Recipients:user-zh 
Subject: Flink multi-sink data consistency guarantee



As shown in the figure above, the job currently has multiple sinks. In this scenario, when the HBase write fails, the Kafka write is not affected. The expectation is that when the HBase write fails, Kafka does not send either.
How can the HBase and Kafka writes be made atomic?
I don't know whether Flink has a two-phase-commit scheme across multiple sinks.



xueaohui_...@163.com

Question about extending Tuples

2020-06-19 Post by 魏旭斌
Flink currently provides Tuple1 ~ Tuple25, which does not meet our needs in real development; we would like larger tuples, e.g. a Tuple50.
Is there any solution? Thanks

flink run from checkpoint failed

2020-06-19 Post by Zhou Zach




After killing the flink job's YARN application with yarn application -kill,
I ran /opt/flink-1.10.0/bin/flink run -s 
/user/flink10/checkpoints/69e450574d8520ac5961e20a6fc4798a/chk-18/_metadata -d 
-c dataflow.sql.FromKafkaSinkJdbcForCountPerSecond  
/data/warehouse/streaming/data-flow-1.0.jar








2020-06-19 14:39:54,563 INFO  
org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager
  - State change: CONNECTED
2020-06-19 14:39:54,664 INFO  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - 
Starting ZooKeeperLeaderRetrievalService /leader/rest_server_lock.
2020-06-19 14:40:24,728 INFO  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - 
Stopping ZooKeeperLeaderRetrievalService /leader/rest_server_lock.
2020-06-19 14:40:24,729 INFO  
org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl
  - backgroundOperationsLoop exiting
2020-06-19 14:40:24,733 INFO  
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Session: 
0x272b776faca2414 closed
2020-06-19 14:40:24,733 INFO  
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - 
EventThread shut down for session: 0x272b776faca2414
2020-06-19 14:40:24,734 ERROR org.apache.flink.client.cli.CliFrontend   
- Error while running the command.
org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: java.util.concurrent.ExecutionException: 
org.apache.flink.runtime.client.JobSubmissionException: Failed to submit 
JobGraph.
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
at 
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
Caused by: java.lang.RuntimeException: java.util.concurrent.ExecutionException: 
org.apache.flink.runtime.client.JobSubmissionException: Failed to submit 
JobGraph.
at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:199)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1741)
at 
org.apache.flink.streaming.api.environment.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:94)
at 
org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:63)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
at 
org.apache.flink.table.planner.delegation.StreamExecutor.execute(StreamExecutor.java:42)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:643)
at 
cn.ibobei.qile.dataflow.sql.FromKafkaSinkJdbcForCountPerSecond$.main(FromKafkaSinkJdbcForCountPerSecond.scala:120)
at 
cn.ibobei.qile.dataflow.sql.FromKafkaSinkJdbcForCountPerSecond.main(FromKafkaSinkJdbcForCountPerSecond.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
... 11 more
Caused by: java.util.concurrent.ExecutionException: 
org.apache.flink.runtime.client.JobSubmissionException: Failed to submit 
JobGraph.
at 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1736)
... 23 more
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to 
submit JobGraph.
at 
org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$7(RestClusterClient.java:359)
at