JobManager restart: cannot set up a JobManager

2020-06-22 Posted by 绘梦飘雪
When the JobManager restarts it fails with org.apache.flink.runtime.client.JobExecutionException: could not 
set up jobmanager,
cannot set up the user code libraries: file does not exist 
/flink/recovery/appid/blob/job***
Checking HDFS confirms that the file is indeed missing. What could cause this?

Re: Does FlinkSQL support something like a temporary intermediate table?

2020-06-22 Posted by Leonard Xu
Hi,
Yes, this is supported in Flink 1.11. The Flink 1.11 code is currently frozen and under testing, and the release
should come soon, which is why the version number on the master branch is 1.12-SNAPSHOT;
once 1.11 is released you will be able to see the corresponding documentation.

Back to the question: Flink SQL (Blink planner) supports multiple sinks, so this can be done in a single job even on 1.10. Insert the result of `  
select * from sourceTable , LATERAL TABLE(ParseUriRow(request_uri)) as T( )….`  
into the intermediate result table, and then write the grouped result into the final result table. The effect should be similar to using a VIEW, because the planner performs segmented optimization.
I also recommend 1.10.1, which fixes quite a few bugs on top of 1.10; or try 1.11 once it is released.


Best,
Leonard Xu
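The layout Leonard describes can be sketched as two INSERT statements over the same intermediate table. This is only a sketch: the original elides the column list of `T( )`, so all column names below are placeholders.

```sql
-- Step 1: split each source row into columns via the table function
INSERT INTO tempTable
SELECT T.col1, T.unit, T.rowtime
FROM sourceTable,
LATERAL TABLE(ParseUriRow(request_uri)) AS T(col1, unit, rowtime);

-- Step 2: windowed aggregation into the final result table
INSERT INTO sinkTable
SELECT col1, SUM(unit) AS unitSum
FROM tempTable
GROUP BY TUMBLE(rowtime, INTERVAL '30' SECOND), col1;
```

When both statements are submitted together under the Blink planner, the planner can apply the segmented optimization Leonard mentions, keeping everything in one job with two sinks.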

Cleanup timing of Flink JOB_MANAGER_LEADER_PATH znodes

2020-06-22 Posted by 林恬
Hi all,
I am currently on Flink 1.9.2. I have noticed that the /leader/${job_id} 
znode in ZooKeeper is not cleaned up even after the job is cancelled, so after running for a while there are many empty job_id znodes under /leader/. When is this path supposed to be cleaned up?




Re: Re: Reading from HBase with Flink SQL fails

2020-06-22 Posted by Roc Marshal
MuChen,
1. The errors "org.apache.flink.shaded.curator.org.apache.curator.ConnectionState
 - Authentication failed" printed before "JobManager Web Interface: 
http://uhadoop-op3raf-core24:42976" point to a ZooKeeper authentication problem when starting the session.
2. The error "Caused by: 
org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 
'Source: HBaseTableSource[schema=[key, cf1], projectFields=[0]] -> 
SourceConversion(table=[default_catalog.default_database.hbase_video_pic_title_q70,
 source: [HBaseTableSource[schema=[key, cf1], projectFields=[0, 
fields=[key]) -> SinkConversionToTuple2 -> Sink: SQL Client Stream Collect Sink': 
Configuring the input format (null) failed: Cannot create connection to 
HBase." means the job could not open a connection to HBase; please check the HBase side.
Best,
Roc Marshal.
On 2020-06-23 11:05:43, "MuChen" <9329...@qq.com> wrote:
>Hi, Roc Marshal:
>
>
>
>Best,
>MuChen.
>
>
>
>
>------ Original ------
>From: "Roc Marshal", 2020-06-23 (Tue) 10:27
>To: "user-zh"
>Subject: Re: Reading from HBase with Flink SQL fails
>
>
>
>MuChen,
>please check whether the Source can reach ZooKeeper. Best, Roc Marshal.
>On 2020-06-23 10:17:35, "MuChen" <9329...@qq.com> wrote:
>Hi, All:
>
>
>Reading from HBase with Flink SQL fails. Details below.
>
>
>
>
>
>
>I started a Flink session on the Hadoop master node.
>
>Start yarn-session:
>bin/yarn-session.sh -jm 1g -tm 4g -s 4 -qu root.flink -nm fsql-cli 
>> /dev/null 2>&1 &  # errors printed after the prompt:
>[admin@uhadoop-op3raf-master2 flink10]$ 2020-06-23 09:30:56,402 ERROR 
>org.apache.flink.shaded.curator.org.apache.curator.ConnectionState - 
>Authentication failed 2020-06-23 09:30:56,515 ERROR 
>org.apache.flink.shaded.curator.org.apache.curator.ConnectionState - 
>Authentication failed JobManager Web Interface: 
>http://uhadoop-op3raf-core24:42976 
>Start sql-client:
>bin/sql-client.sh embedded 
>Create an HBase table in Flink SQL:
># CREATE TABLE hbase_video_pic_title_q70 ( key 
>string, cf1 ROW<...> ) WITH ( 'connector.type' = 'hbase', 'connector.version' = 
>'1.4.3', 'connector.table-name' = 
>'hbase_video_pic_title_q70', 'connector.zookeeper.quorum' = 
>'uhadoop-op3raf-master1:2181,uhadoop-op3raf-master2:2181,uhadoop-op3raf-core1:2181',
> 'connector.zookeeper.znode.parent' = '/hbase', 
>'connector.write.buffer-flush.max-size' = '10mb', 
>'connector.write.buffer-flush.max-rows' = '1000', 
>'connector.write.buffer-flush.interval' = '2s' ); 
>Run a query:
>select key from hbase_video_pic_title_q70; 
>The query fails with an HBase connection error:
>[ERROR] Could not execute SQL statement. Reason: 
>org.apache.flink.runtime.rest.util.RestClientException: [Internal server 
>error., org.apache.flink.runtime.client.JobSubmissionException: Failed to submit 
>job. at 
>org.apache.flink.runtime.dispatcher.Dispatcher.lambda$internalSubmitJob$3(Dispatcher.java:336)
> at 
>java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
> at 
>java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
> at 
>java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
> at 
>akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> at 
>akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
> at 
>akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at 
>akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at 
>akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at 
>akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
>Caused by: java.lang.RuntimeException: 
>org.apache.flink.runtime.client.JobExecutionException: Could not set up 
>JobManager at 
>org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
> at 
>java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
> ... 6 more Caused by: org.apache.flink.runtime.client.JobExecutionException: 
>Could not set up JobManager at 
>org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl. at 
>org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:84)
> at 
>org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:379)
> at 
>org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
> ... 7 more Caused by: org.apache.flink.runtime.client.JobExecutionException: 
>Cannot initialize task 'Source: HBaseTableSource[schema=[key, cf1], 
>projectFields=[0]] -> 
>SourceConversion(table=[default_catalog.default_database.hbase_video_pic_title_q70,
> source: [HBaseTableSource[schema=[key, cf1], projectFields=[0, 
>fields=[key]) -> 

Re: Re: Does FlinkSQL support something like a temporary intermediate table?

2020-06-22 Posted by Weixubin



Thanks. After checking the documentation I found that the CREATE VIEW syntax is mentioned in the Flink 1.12 (master) docs but not in the 1.10 docs, and 1.12 has not been released yet, 
while the version I am currently using is 1.10.
Oddly, I could not find the 1.11 documentation at all.


On 2020-06-23 10:58:25, "Leonard Xu" wrote:
>Hi,
>
>> On 2020-06-23 10:49, Weixubin <18925434...@163.com> wrote:
>> 
>> //At this point I would like to create a temporary intermediate table tempTable to hold the result of splitting the source rows into multiple columns, something like the following (which Flink 
>> does not support)
>
>
>From the description, it sounds like one source row is split into multiple rows. Wouldn't a VIEW cover this [1]? Flink SQL does support the VIEW syntax [1].
>
>Best,
>Leonard Xu 
>
>[1] 
>https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/sql/create.html#create-view
> 
>


Re: Reading from HBase with Flink SQL fails

2020-06-22 Posted by MuChen
Hi, Roc Marshal:



Best,
MuChen.




------ Original ------
From: "Roc Marshal" ... http://uhadoop-op3raf-core24:42976 
Start sql-client:
bin/sql-client.sh embedded 
Create an HBase table in Flink SQL:
# CREATE TABLE hbase_video_pic_title_q70 ( key 
string, cf1 ROW

Re: Does FlinkSQL support something like a temporary intermediate table?

2020-06-22 Posted by Leonard Xu
Hi,

> On 2020-06-23 10:49, Weixubin <18925434...@163.com> wrote:
> 
> //At this point I would like to create a temporary intermediate table tempTable to hold the result of splitting the source rows into multiple columns, something like the following (which Flink 
> does not support)


From the description, it sounds like one source row is split into multiple rows. Wouldn't a VIEW cover this [1]? Flink SQL does support the VIEW syntax [1].

Best,
Leonard Xu 

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/sql/create.html#create-view
 


Does FlinkSQL support something like a temporary intermediate table?

2020-06-22 Posted by Weixubin
Hi,
I would like to complete a two-step operation in a single job using Flink SQL, but it does not seem possible. The situation is roughly as follows:


e.g. an ETL pipeline that needs to read from a source, split each record into one multi-column record, apply a windowed aggregation over the split data, and write to a sink.
//Read from the source
CREATE TABLE sourceTable (
  request_uri STRING
) WITH (
   ..
);


//At this point I would like to create a temporary intermediate table tempTable to hold the result of splitting the source rows into multiple columns, something like this (which Flink does not support):
CREATE TABLE tempTable (
  row1 STRING,
  row2 STRING
);
INSERT INTO tempTable SELECT * FROM sourceTable, LATERAL 
TABLE(ParseUriRow(request_uri)) AS T( );


//Finally read from tempTable and perform the windowed aggregation
CREATE TABLE sinkTable (
  row1 STRING
);
INSERT INTO sinkTable SELECT .., SUM(unit) AS unitSum FROM tempTable GROUP BY 
TUMBLE(rowtime, INTERVAL '30' SECOND), ...


That is the rough picture. I would like to do all of this in one job rather than splitting it into two. Is there a good way to achieve this?
Thank you!
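For reference, the intermediate step can also be phrased as a view rather than a physical table. This is a sketch under the assumption that CREATE VIEW is available in your Flink version (it is documented only in later releases, as noted elsewhere in this thread), with placeholder column names since the original elides `T( )`:

```sql
-- A view over the split rows replaces the tempTable staging step
CREATE VIEW tempView AS
SELECT T.row1, T.row2, T.unit, T.rowtime
FROM sourceTable,
LATERAL TABLE(ParseUriRow(request_uri)) AS T(row1, row2, unit, rowtime);

-- The windowed aggregation reads from the view; everything stays in one job
INSERT INTO sinkTable
SELECT row1, SUM(unit) AS unitSum
FROM tempView
GROUP BY TUMBLE(rowtime, INTERVAL '30' SECOND), row1;
```

On versions without CREATE VIEW DDL, the same shape can be expressed by inlining the LATERAL TABLE expression into the aggregating INSERT, or by registering the view through the Table API.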



Re: Reading from HBase with Flink SQL fails

2020-06-22 Posted by Roc Marshal
MuChen, check whether the HBase ZooKeeper meta information is reachable and whether Flink can connect to HBase;
also check whether the Source can authenticate against ZooKeeper. Best, Roc
 Marshal.
On 2020-06-23 10:17:35, "MuChen" <9329...@qq.com> wrote:
>Hi, All:
>
>
>Reading from HBase with Flink SQL fails. Details below.
>
>
>
>
>
>
>I started a Flink session on the Hadoop master node.
>
>Start yarn-session:
>bin/yarn-session.sh -jm 1g -tm 4g -s 4 -qu root.flink -nm fsql-cli > /dev/null 2>&1 
>&  # errors printed after the prompt: [admin@uhadoop-op3raf-master2 
>flink10]$ 2020-06-23 09:30:56,402 ERROR 
>org.apache.flink.shaded.curator.org.apache.curator.ConnectionState  - 
>Authentication failed 2020-06-23 09:30:56,515 ERROR 
>org.apache.flink.shaded.curator.org.apache.curator.ConnectionState  - 
>Authentication failed JobManager Web Interface: 
>http://uhadoop-op3raf-core24:42976 
>Start sql-client:
>bin/sql-client.sh embedded 
>Create an HBase table in Flink SQL:
>#  CREATE TABLE hbase_video_pic_title_q70 (   key string,   cf1 ROW<...string, q70 string> ) WITH (   'connector.type' = 'hbase',   
>'connector.version' = '1.4.3',   'connector.table-name' = 
>'hbase_video_pic_title_q70',   'connector.zookeeper.quorum' = 
>'uhadoop-op3raf-master1:2181,uhadoop-op3raf-master2:2181,uhadoop-op3raf-core1:2181',
>   'connector.zookeeper.znode.parent' = '/hbase',   
>'connector.write.buffer-flush.max-size' = '10mb',   
>'connector.write.buffer-flush.max-rows' = '1000',
>'connector.write.buffer-flush.interval' = '2s' ); 
>Run a query:
>select key from hbase_video_pic_title_q70; 
>The query fails with an HBase connection error:
>[ERROR] Could not execute SQL statement. Reason: 
>org.apache.flink.runtime.rest.util.RestClientException: [Internal server 
>error., org.apache.flink.runtime.client.JobSubmissionException: Failed to submit job.  
>   at 
>org.apache.flink.runtime.dispatcher.Dispatcher.lambda$internalSubmitJob$3(Dispatcher.java:336)
> at 
>java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)   
>  at 
>java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
> at 
>java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)  
>   at 
>akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)  
>   at 
>akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)  
>   at 
>akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
>at 
>akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
>Caused by: java.lang.RuntimeException: 
>org.apache.flink.runtime.client.JobExecutionException: Could not set up 
>JobManager at 
>org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
> at 
>java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
> ... 6 more Caused by: 
>org.apache.flink.runtime.client.JobExecutionException: Could not set up 
>JobManager at 
>org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl. at 
>org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:84)
> at 
>org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:379)
> at 
>org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
> ... 7 more Caused by: 
>org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 
>'Source: HBaseTableSource[schema=[key, cf1], projectFields=[0]] -> 
>SourceConversion(table=[default_catalog.default_database.hbase_video_pic_title_q70,
> source: [HBaseTableSource[schema=[key, cf1], projectFields=[0, 
>fields=[key]) -> SinkConversionToTuple2 -> Sink: SQL Client Stream 
>Collect Sink': Configuring the input format (null) failed: Cannot create 
>connection to HBase. at 
>org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:216)
> at 
>org.apache.flink.runtime.scheduler.SchedulerBase.createExecutionGraph(SchedulerBase.java:255)
> at 
>org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:227)
> at 
>org.apache.flink.runtime.scheduler.SchedulerBase. at 
>org.apache.flink.runtime.scheduler.DefaultScheduler. at 
>org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:105)
> at 
>org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:278)
> at 

Reading from HBase with Flink SQL fails

2020-06-22 Posted by MuChen
Hi, All:


Reading from HBase with Flink SQL fails. Details below.






I started a Flink session on the Hadoop master node.

Start yarn-session:
bin/yarn-session.sh -jm 1g -tm 4g -s 4 -qu root.flink -nm fsql-cli > /dev/null 2>&1 
&  # errors printed after the prompt: [admin@uhadoop-op3raf-master2 
flink10]$ 2020-06-23 09:30:56,402 ERROR 
org.apache.flink.shaded.curator.org.apache.curator.ConnectionState  - 
Authentication failed 2020-06-23 09:30:56,515 ERROR 
org.apache.flink.shaded.curator.org.apache.curator.ConnectionState  - 
Authentication failed JobManager Web Interface: 
http://uhadoop-op3raf-core24:42976 
Start sql-client:
bin/sql-client.sh embedded 
Create an HBase table in Flink SQL:
#  CREATE TABLE hbase_video_pic_title_q70 (   key string,   cf1 ROW

Re: Passing https-xxx.xxx.jar via --classpath runs fine on Flink 1.8 but on Flink 1.10 the passed jar cannot be found

2020-06-22 Posted by Weixubin
This should not be related to the version. In a multi-node deployment, the URL passed to -C must be reachable from every node. Can you confirm that the URL is accessible from all nodes?
 Best,
 Bin


At 2020-06-22 11:43:11, "程龙" <13162790...@163.com> wrote:
>2020-06-22 10:16:34,379 INFO  
>org.apache.flink.runtime.executiongraph.ExecutionGraph- Sink: Unnamed 
>(6/6) (5dd98ec92e0d5e53597cb7520643c7f5) switched from SCHEDULED to DEPLOYING.
>2020-06-22 10:16:34,379 INFO  
>org.apache.flink.runtime.executiongraph.ExecutionGraph- Deploying 
>Sink: Unnamed (6/6) (attempt #0) to container_1590655249980_0081_01_02 @ 
>al-bj-bigdata-inf-research-flink04 (dataPort=34781)
>2020-06-22 10:16:34,456 INFO  
>org.apache.flink.runtime.executiongraph.ExecutionGraph- Source: Custom 
>Source (5/6) (aca298438b9eb6fcf295cb8af6eebcd8) switched from DEPLOYING to 
>RUNNING.
>2020-06-22 10:16:34,481 INFO  
>org.apache.flink.runtime.executiongraph.ExecutionGraph- Filter -> Map 
>(3/8) (0daed15d107c3031891f0c9e84093068) switched from DEPLOYING to RUNNING.
>2020-06-22 10:16:34,492 INFO  
>org.apache.flink.runtime.executiongraph.ExecutionGraph- Sink: Unnamed 
>(1/6) (44ca248aba351026452ba4abdb5f33a6) switched from DEPLOYING to RUNNING.
>2020-06-22 10:16:34,497 INFO  
>org.apache.flink.runtime.executiongraph.ExecutionGraph- Filter -> Map 
>(1/8) (70612735eb755269fe9590e8ab51d3e2) switched from DEPLOYING to RUNNING.
>2020-06-22 10:16:34,512 INFO  
>org.apache.flink.runtime.executiongraph.ExecutionGraph- Filter -> Map 
>(2/8) (baba331cd6bcde1f5a6024eac0b953b4) switched from DEPLOYING to RUNNING.
>2020-06-22 10:16:34,524 INFO  
>org.apache.flink.runtime.executiongraph.ExecutionGraph- Source: Custom 
>Source (4/6) (a34992362c2cf3634a29bd5a9c188754) switched from DEPLOYING to 
>RUNNING.
>2020-06-22 10:16:34,531 INFO  
>org.apache.flink.runtime.executiongraph.ExecutionGraph- Source: Custom 
>Source (2/6) (7683c15e3ebb3e718c2439c6e32f0d7d) switched from DEPLOYING to 
>RUNNING.
>2020-06-22 10:16:34,564 INFO  
>org.apache.flink.runtime.executiongraph.ExecutionGraph- Source: Custom 
>Source (6/6) (6ab1aaa5e1811c79c702197f984e9bb6) switched from DEPLOYING to 
>RUNNING.
>2020-06-22 10:16:34,609 INFO  
>org.apache.flink.runtime.executiongraph.ExecutionGraph- Source: Custom 
>Source (1/6) (6245b508b8e0494f06ef71c6ad4954b6) switched from DEPLOYING to 
>RUNNING.
>2020-06-22 10:16:34,616 INFO  
>org.apache.flink.runtime.executiongraph.ExecutionGraph- Filter -> Map 
>(4/8) (2ea049476f0b48fcd85dcd9084091e9f) switched from DEPLOYING to RUNNING.
>2020-06-22 10:16:34,650 INFO  
>org.apache.flink.runtime.executiongraph.ExecutionGraph- Sink: Unnamed 
>(2/6) (e785e44e4212dcc5279bcde761b28292) switched from DEPLOYING to RUNNING.
>2020-06-22 10:16:34,656 INFO  
>org.apache.flink.runtime.executiongraph.ExecutionGraph- Sink: Unnamed 
>(5/6) (97a5e72f731d14aa97d93253b71b6aeb) switched from DEPLOYING to RUNNING.
>2020-06-22 10:16:34,662 INFO  
>org.apache.flink.runtime.executiongraph.ExecutionGraph- Filter -> Map 
>(7/8) (59babdbc2d7bea362a2794a966fe59ef) switched from DEPLOYING to RUNNING.
>2020-06-22 10:16:34,664 INFO  
>org.apache.flink.runtime.executiongraph.ExecutionGraph- Source: Custom 
>Source (3/6) (aed33b78cc847561908ea43164e0311a) switched from DEPLOYING to 
>RUNNING.
>2020-06-22 10:16:34,669 INFO  
>org.apache.flink.runtime.executiongraph.ExecutionGraph- Filter -> Map 
>(6/8) (fd87d0111b10e9f9027068a72e9ce209) switched from DEPLOYING to RUNNING.
>2020-06-22 10:16:34,726 INFO  
>org.apache.flink.runtime.executiongraph.ExecutionGraph- Sink: Unnamed 
>(6/6) (5dd98ec92e0d5e53597cb7520643c7f5) switched from DEPLOYING to RUNNING.
>2020-06-22 10:16:34,729 INFO  
>org.apache.flink.runtime.executiongraph.ExecutionGraph- Filter -> Map 
>(8/8) (22b73fa7f7435a405e1102a3480c09c1) switched from DEPLOYING to RUNNING.
>2020-06-22 10:16:34,760 INFO  
>org.apache.flink.runtime.executiongraph.ExecutionGraph- Sink: Unnamed 
>(4/6) (4c677579ef44cf394618af38a75497da) switched from DEPLOYING to RUNNING.
>2020-06-22 10:16:37,081 INFO  
>org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
>checkpoint 1 @ 1592792197072 for job 1797e2f64b7b1caeb6356608290263cc.
>2020-06-22 10:16:45,065 INFO  
>org.apache.flink.runtime.executiongraph.ExecutionGraph- Sink: Unnamed 
>(3/6) (d9a9c913dcbf782bd933b0adae157b38) switched from DEPLOYING to RUNNING.
>2020-06-22 10:16:45,066 INFO  
>org.apache.flink.runtime.executiongraph.ExecutionGraph- Filter -> Map 
>(5/8) (39dbdd04e2066d1d93be1641c0ab7add) switched from DEPLOYING to RUNNING.
>2020-06-22 10:16:48,512 INFO  
>org.apache.flink.runtime.executiongraph.ExecutionGraph- Filter -> Map 
>(3/8) (0daed15d107c3031891f0c9e84093068) switched from RUNNING to FAILED on 
>org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@25d7d2c5.
>java.lang.NoClassDefFoundError: core
>
>
>
>From the logs, the error occurs only in some of the tasks


Re: Job restored from savepoint has inconsistent state

2020-06-22 Posted by claylin
1. I compared the job's output after restoring from the savepoint with its output before the savepoint was taken.
2. Yes, there is a window operator: a tumble event time window.
3. The event time is taken directly from the record.




------ Original ------
From: "Congxian Qiu"

Re: Job restored from savepoint has inconsistent state

2020-06-22 Posted by Congxian Qiu
Hi
1. How did you reach the conclusion that the results are inconsistent?
2. Is there a window operator? Currently checkpoints/savepoints do not record watermarks.
3. Is your event time taken directly from the record, or computed? If it is computed, could the restart change the event time and
watermark that the computation produces?

Best,
Congxian


claylin <1012539...@qq.com> wrote on Mon, Jun 22, 2020 at 22:49:

> hi all, I have a deduplication job here; after starting it from a savepoint I found that its results differ from before. The job uses event
> time. What other factors could leave the state different? Any advice from the experts appreciated.


Job restored from savepoint has inconsistent state

2020-06-22 Posted by claylin
hi 
all, I have a deduplication job here; after starting it from a savepoint I found that its results differ from before. The job uses event
 time. What other factors could leave the state different? Any advice appreciated.

Re: Application name and queue specified via yarn-session.sh are not taking effect

2020-06-22 Posted by Yang Wang
This is indeed how the current Flink implementation behaves: it stops at the first option it does not recognize,
which is why it works after you removed -n.

Best,
Yang

MuChen <9329...@qq.com> wrote on Mon, Jun 22, 2020 at 21:48:

> hi, Yang Wang:
>
>
> The *flink-conf.yaml* in the HDFS staging directory ($HOME/.flink/application_id) did not contain the -qu and -nm values from my submit command, and -jm and -tm were at their defaults rather than the values I passed.
>
>
> After removing the "-n" option and resubmitting, all the specified parameters took effect; the unsupported "-n" option must have caused every option after it to be ignored.
>
>
> Thanks for the help!
>
>
> Best,
> MuChen
>
>
> ------ Original ------
> From: "Yang Wang", sent Mon, Jun 22, 2020, 20:37
> To: "user-zh"
> Subject: Re: Application name and queue specified via yarn-session.sh are not taking effect
>
>
>
>
> You can check the client-side *flink-conf.yaml* in the HDFS staging directory ($HOME/.flink/application_id)
> and see whether "yarn.application.queue" and "yarn.application.name" hold the expected values.
>
>
> Also, the -n option has not been supported for a long time; setting it has no effect, since TMs are allocated dynamically now.
>
>
> Best,
> Yang
>
> MuChen <9329...@qq.com> wrote on Mon, Jun 22, 2020 at 19:32:
>
> hi, all:
> 
> 
> I have a Hadoop cluster and a machine A.
> 
> 
> On machine A I started a yarn-session with bin/yarn-session.sh -n 5 -jm 1g -tm 4g -s 4 -qu root.flink
> -nm
> fsql-cli > /dev/null 2>&1
> &,
> specifying the queue root.flink and the application name fsql-cli.
> 
> 
> However, in the YARN console the application name and queue usually do not match what I specified (occasionally they do, across multiple submissions), see:
> https://imgchr.com/i/NJIn4x
> 
> 
> Application name: Flink session cluster
> Queue name: root.default
> 
> 
> Why are the queue name and application name I specified not taking effect?
> 
> 
> flink version: 1.10.0
> flink-conf.yaml:
> [fsql@10-42-63-116 conf]$ grep -v ^# flink-conf.yaml |grep -v ^$
> jobmanager.rpc.address: localhost
> jobmanager.rpc.port: 6123
> jobmanager.heap.size: 1024m
> taskmanager.memory.process.size: 2048m
> taskmanager.numberOfTaskSlots: 10
> parallelism.default: 1
> jobmanager.execution.failover-strategy: region


Re: Application name and queue specified via yarn-session.sh are not taking effect

2020-06-22 Posted by MuChen
hi, Yang Wang:


The *flink-conf.yaml* in the HDFS staging directory ($HOME/.flink/application_id) did not contain the -qu and -nm values from my submit command, and -jm and -tm were at their defaults too.


After removing "-n" and resubmitting, all parameters took effect; the unsupported "-n" option made every option after it be ignored.


Thanks for the help!


Best,
MuChen


------ Original ------
From: "Yang Wang" ... https://imgchr.com/i/NJIn4x


 Application name: Flink session cluster
 Queue name: root.default


 Why are the queue name and application name I specified not taking effect?


 flink version: 1.10.0
 flink-conf.yaml:
 [fsql@10-42-63-116 conf]$ grep -v ^# flink-conf.yaml |grep -v ^$
 jobmanager.rpc.address: localhost
 jobmanager.rpc.port: 6123
 jobmanager.heap.size: 1024m
 taskmanager.memory.process.size: 2048m
 taskmanager.numberOfTaskSlots: 10
 parallelism.default: 1
 jobmanager.execution.failover-strategy: region

Re: Application name and queue specified via yarn-session.sh are not taking effect

2020-06-22 Posted by Yang Wang
You can check the client-side *flink-conf.yaml* in the HDFS staging directory ($HOME/.flink/application_id)
and see whether "yarn.application.queue" and "yarn.application.name" hold the expected values.


Also, the -n option has not been supported for a long time; setting it has no effect, since TMs are allocated dynamically now.


Best,
Yang

MuChen <9329...@qq.com> wrote on Mon, Jun 22, 2020 at 19:32:

> hi, all:
>
>
> I have a Hadoop cluster and a machine A.
>
>
> On machine A I started a yarn-session with bin/yarn-session.sh -n 5 -jm 1g -tm 4g -s 4 -qu root.flink -nm
> fsql-cli > /dev/null 2>&1 &,
> specifying the queue root.flink and the application name fsql-cli.
>
>
> However, in the YARN console the application name and queue usually do not match what I specified (occasionally they do, across multiple submissions), see:
> https://imgchr.com/i/NJIn4x
>
>
> Application name: Flink session cluster
> Queue name: root.default
>
>
> Why are the queue name and application name I specified not taking effect?
>
>
> flink version: 1.10.0
> flink-conf.yaml:
> [fsql@10-42-63-116 conf]$ grep -v ^# flink-conf.yaml |grep -v ^$
> jobmanager.rpc.address: localhost
> jobmanager.rpc.port: 6123
> jobmanager.heap.size: 1024m
> taskmanager.memory.process.size: 2048m
> taskmanager.numberOfTaskSlots: 10
> parallelism.default: 1
> jobmanager.execution.failover-strategy: region
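The check suggested in this thread (inspect the staged flink-conf.yaml for yarn.application.queue and yarn.application.name) can be scripted once the file has been fetched locally, e.g. via `hdfs dfs -cat $HOME/.flink/<application_id>/flink-conf.yaml`. A minimal sketch; the parsing helper below is ours, not part of Flink:

```python
def read_flink_conf(text):
    """Parse the simple 'key: value' lines of a flink-conf.yaml snippet."""
    conf = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blanks and comments, as `grep -v ^#` does
        key, sep, value = line.partition(":")
        if sep:
            conf[key.strip()] = value.strip()
    return conf

# Stand-in for the staged config fetched from HDFS
sample = """
# client-side values written at submit time
yarn.application.queue: root.flink
yarn.application.name: fsql-cli
jobmanager.heap.size: 1024m
"""

conf = read_flink_conf(sample)
print(conf["yarn.application.queue"])  # root.flink
print(conf["yarn.application.name"])   # fsql-cli
```

If the staged file shows the defaults instead of the submitted values, the client dropped the options, as happened here with everything after -n.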


Re: When a Flink job fails and restarts, the last checkpoint restore fails but the job still restarts, leaving state inconsistent across the restart

2020-06-22 Posted by Congxian Qiu
hi

What does inconsistent state mean here? Checkpoint recovery guarantees that the global state is reset to some previously successful checkpoint.

Best,
Congxian


莫失莫忘 wrote on Mon, Jun 22, 2020 at 20:09:

> As the title says: can Flink be required to restore from a checkpoint when restarting after a failure, and fail the restart otherwise?


When a Flink job fails and restarts, the last checkpoint restore fails but the job still restarts, leaving state inconsistent across the restart

2020-06-22 Posted by 莫失莫忘
As the title says: can Flink be required to restore from a checkpoint when restarting
after a failure, and fail the restart otherwise?

Application name and queue specified via yarn-session.sh are not taking effect

2020-06-22 Posted by MuChen
hi, all:


I have a Hadoop cluster and a machine A.


On machine A I started a yarn-session with bin/yarn-session.sh -n 5 -jm 1g -tm 4g -s 4 -qu root.flink -nm 
fsql-cli > /dev/null 2>&1 &,
specifying the queue root.flink and the application name fsql-cli.


However, in the YARN console the application name and queue usually do not match what I specified (occasionally they do, across multiple submissions), see:
https://imgchr.com/i/NJIn4x


Application name: Flink session cluster
Queue name: root.default


Why are the queue name and application name I specified not taking effect?


flink version: 1.10.0
flink-conf.yaml:
[fsql@10-42-63-116 conf]$ grep -v ^# flink-conf.yaml |grep -v ^$
jobmanager.rpc.address: localhost
jobmanager.rpc.port: 6123
jobmanager.heap.size: 1024m
taskmanager.memory.process.size: 2048m
taskmanager.numberOfTaskSlots: 10
parallelism.default: 1
jobmanager.execution.failover-strategy: region

Re: Passing https-xxx.xxx.jar via --classpath runs fine on Flink 1.8 but on Flink 1.10 the passed jar cannot be found

2020-06-22 Posted by Yang Wang
 -C,--classpath <url>           Adds a URL to each user code
                                classloader on all nodes in the
                                cluster. The paths must specify a
                                protocol (e.g. file://) and be
                                accessible on all nodes (e.g. by means
                                of a NFS share). You can use this
                                option multiple times for specifying
                                more than one URL. The protocol must
                                be supported by the {@link
                                java.net.URLClassLoader}.


--classpath requires the jar to be deployed on each node in advance (or shared via NFS); have you confirmed that the jar exists on all of them?


Best,
Yang

程龙 <13162790...@163.com> wrote on Mon, Jun 22, 2020 at 11:43:

> 2020-06-22 10:16:34,379 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Sink:
> Unnamed (6/6) (5dd98ec92e0d5e53597cb7520643c7f5) switched from SCHEDULED to
> DEPLOYING.
> [snip: the rest of the DEPLOYING/RUNNING log is identical to the copy quoted earlier in this digest]
> 2020-06-22 10:16:48,512 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Filter ->
> Map (3/8) (0daed15d107c3031891f0c9e84093068) switched from RUNNING to FAILED on
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@25d7d2c5.
> java.lang.NoClassDefFoundError: core
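Per the -C help text quoted above, each classpath entry must carry an explicit protocol and be resolvable on every node. A minimal client-side sanity check for the first half of that requirement; the helper name is ours, not a Flink API:

```python
from urllib.parse import urlparse

def has_explicit_protocol(url):
    """Return True if the URL specifies a protocol, as -C/--classpath requires."""
    return bool(urlparse(url).scheme)

# A bare filesystem path would be rejected by -C; a file:// URL is fine.
print(has_explicit_protocol("file:///opt/libs/https-xxx.xxx.jar"))  # True
print(has_explicit_protocol("/opt/libs/https-xxx.xxx.jar"))         # False
```

Reachability from every node (the second half of the requirement) still has to be verified on the nodes themselves, e.g. against the shared NFS path.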

Re: Re: Can the PyFlink Table API route inserts to different sink tables based on a field value?

2020-06-22 Posted by jack
Hello Jincheng, I have verified the split-and-handle-separately approach you suggested; it solves my problem. Thank you very much for the explanation.




Best,
Jack



On 2020-06-22 14:28:04, "jincheng sun" wrote:

Hello jack:


With the Table API you do not need if/else; logic along these lines is enough:


val t1 = table.filter('x  > 2).groupBy(..)
val t2 = table.filter('x <= 2).groupBy(..)
t1.insert_into("sink1")
t2.insert_into("sink2")




Best,
Jincheng
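In plain Python terms (this is an analogy, not the PyFlink API), the two filters above act like two independent comprehensions over the same records: each branch sees the full input, and rows are routed rather than consumed.

```python
# Each "filter" sees the full input; neither branch consumes rows from the other.
records = [
    {"logType": "syslog", "message": "sla;flkdsjf"},
    {"logType": "alarm",  "message": "sla;flkdsjf"},
]

sink1 = [r for r in records if r["logType"] == "syslog"]  # like .filter(...).insert_into("sink1")
sink2 = [r for r in records if r["logType"] == "alarm"]   # like .filter(...).insert_into("sink2")

print(len(sink1), len(sink2))  # 1 1
```

This is why wrapping the filters in Python if/elif is unnecessary: both insert_into pipelines are declared up front and the planner executes them side by side.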






jack wrote on Fri, Jun 19, 2020 at 10:35:





I tested the following structure:
table = t_env.from_path("source")


if table.filter("logType=syslog"):
    table.filter("logType=syslog").insert_into("sink1")
elif table.filter("logType=alarm"):
    table.filter("logType=alarm").insert_into("sink2")




In my test only table.filter("logType=syslog").insert_into("sink1") seems to take effect and the elif branch does not. Is the reason that table.filter("logType=syslog") (or table.where) already filters the data while evaluating the condition, so execution never reaches the second branch?








On 2020-06-19 10:08:25, "jack" wrote:
>Can the PyFlink Table API route inserts to different sink tables based on a field value?
>
>
>Scenario: with PyFlink I filter on a condition and insert the result into a sink.
>For example, given the two messages below with different logType values, I can filter on logType and insert into a sink table:
>{
>"logType":"syslog",
>"message":"sla;flkdsjf"
>}
>{
>"logType":"alarm",
>"message":"sla;flkdsjf"
>}
>  t_env.from_path("source")\
>  .filter("logType=syslog")\
>  .insert_into("sink1")
>Is there a way to implement if/else-like logic on the logType field directly in this code:
>if logType=="syslog":
>   insert_into(sink1)
>elif logType=="alarm":
>   insert_into(sink2)
>
>
>If insert_into returned something exposing .filter, .select, etc., this would be easy; I could keep chaining filters, like this:
>
>
>  t_env.from_path("source")\
>  .filter("logType=syslog")\
>  .insert_into("sink1")\
>  .filter("logType=alarm")\
>  .insert_into("sink2")
>Any pointers appreciated, thanks.
>
>
>
>
>
>
>
>
>
>


Flink (1.8.3) UI exception is overwritten, and the cause of the failure is not displayed

2020-06-22 Posted by Andrew
version: 1.8.3
graph: source -> map -> sink


Scenario:
A source subtask failure causes the graph to restart, but the exception 
displayed on the Flink UI is not the cause of the task failure.


What is displayed:
JM log:
JM log:
2020-06-22 14:29:01.087 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - Job 
baseInfoAdapter_20601 (20601159280210484110080369520601) switched from state 
RUNNING to FAILING.
java.lang.Exception: Could not perform checkpoint 87 for operator Sink: 
adapterOutput (19/30).
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:597)
at 
org.apache.flink.streaming.runtime.io.BarrierTracker.notifyCheckpoint(BarrierTracker.java:270)
at 
org.apache.flink.streaming.runtime.io.BarrierTracker.processBarrier(BarrierTracker.java:186)
at 
org.apache.flink.streaming.runtime.io.BarrierTracker.getNextNonBlocked(BarrierTracker.java:105)
at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:209)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:302)
at 
org.apache.flink.runtime.taskmanager.Task.run(Task.java:769)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not complete snapshot 87 for operator 
Sink: adapterOutput (19/30).
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:422)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1115)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1057)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:731)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:643)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:588)
... 8 common frames omitted
Caused by: java.lang.Exception: Failed to send data to Kafka: The server 
disconnected before a response was received.
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:375)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.snapshotState(FlinkKafkaProducerBase.java:363)
at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:395)
... 13 common frames omitted





TM log (switching from RUNNING to CANCELING):
2020-06-22 15:39:19.816 INFO com.xxx.client.consumer.GroupConsumer 
- consumer xxx to jmq1230:xxx,READ,xxx,NONE is stopped.
2020-06-22 15:39:19.816 INFO org.apache.flink.runtime.taskmanager.Task 
- Source: baseInfo (79/90) (4e62a84f251d9c68a54e464cff51171e) switched 
from RUNNING to CANCELING.





Is this a known issue?

Re: Re: Re: Re: Re: Re: Re: flink run from checkpoint failed

2020-06-22 Posted by Zhou Zach
https://issues.apache.org/jira/browse/FLINK-10636
That issue says the problem is specific to Kafka 0.8. I am using kafka 2.2.1+cdh6.3.2; does this Kafka version have the same problem?


On 2020-06-22 15:16:14, "Congxian Qiu" wrote:
>1. First, the parameter after -s can be either a savepoint or a checkpoint
>path; resuming from a retained checkpoint is started exactly this way [1].
>2. The logs you posted contain an authentication-related problem: `2020-06-22 13:00:59,368 ERROR
>org.apache.flink.shaded.curator.org.apache.curator.ConnectionState  -
>Authentication failed`. You could try resolving that first.
>
>[1]
>https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/state/checkpoints.html#resuming-from-a-retained-checkpoint
>Best,
>Congxian
>
>
Zhou Zach wrote on Mon, Jun 22, 2020, 3:03 PM:

> Does the parameter after flink run -s have to be a savepointPath? Can't it be the path of a checkpoint the Flink job took automatically?
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2020-06-22 14:32:02, "Zhou Zach" wrote:
>> >I restarted the CDH6 cluster and still get the same error. Flink failure recovery doesn't work, so I don't dare go to production. Could someone take a look?
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >On 2020-06-22 13:21:01, "Zhou Zach" wrote:
>> >
>> >After killing the application with yarn application -kill,
>> >running /opt/flink-1.10.0/bin/flink run -s
>> hdfs://nameservice1:8020/user/flink10/checkpoints/f1b6f5392cd5053db155e709ffe9f871/chk-15/_metadata
>> dataflow.sql.FromKafkaSinkJdbcForCountPerSecond
>> /data/warehouse/streaming/data-flow-1.0.jar fails to start. The /opt/flink-1.10.0/log output is attached.
>> >
>> >
>> >Running /opt/flink-1.10.0/bin/flink run -c
>> dataflow.sql.FromKafkaSinkJdbcForCountPerSecond -m yarn-cluster -yjm 1024m
>> -ytm 8192m -p 2 -ys 4 -ynm UV -d data-flow-1.0.jar starts fine; it only fails when the -s parameter is added.
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >On 2020-06-21 09:16:45, "Congxian Qiu" wrote:
>> >>Hi
>> >>
>> >>Did the application for this job start? If it did, check the JM
>> >>log; if not, look on the submitting client for more detailed submission logs. The log directory defaults to `/opt/flink-1.10.0/log`.
>> >>
>> >>Best,
>> >>Congxian
>> >>
>> >>
>> >>Zhou Zach wrote on Fri, Jun 19, 2020, 8:15 PM:
>> >>
>> >>> I'm using per-job mode, not YARN session mode.
>> >>>
>> >>>
>> >>>
>> >>>
>> >>>
>> >>>
>> >>>
>> >>>
>> >>>
>> >>>
>> >>>
>> >>>
>> >>>
>> >>>
>> >>>
>> >>>
>> >>>
>> >>> At 2020-06-19 20:06:47, "Rui Li"  wrote:
>> >>> >Then you'd have to restart the YARN session and resubmit the job.
>> >>> >
>> >>> >On Fri, Jun 19, 2020 at 6:22 PM Zhou Zach  wrote:
>> >>> >
>> >>> >>
>> >>> >>
>> >>> >>
>> >>> >>
>> >>> >>
>> >>> >>
>> >>> >> I killed the YARN application with yarn application -kill; afterwards
>> >>> >> YARN did not restart the Flink job.
>> >>> >>
>> >>> >>
>> >>> >>
>> >>> >>
>> >>> >>
>> >>> >>
>> >>> >>
>> >>> >>
>> >>> >>
>> >>> >>
>> >>> >>
>> >>> >> On 2020-06-19 17:54:45, "Rui Li" wrote:
>> >>> >> >By "yarn application kill flink job" do you mean you killed the YARN application? Did it restart afterwards?
>> >>> >> >
>> >>> >> >On Fri, Jun 19, 2020 at 4:09 PM Zhou Zach 
>> wrote:
>> >>> >> >
>> >>> >> >>
>> >>> >> >>
>> >>> >> >> I added the two timeout parameters below to flink-1.10.0/conf/flink-conf.yaml, but they had no effect:
>> >>> >> >> akka.client.timeout: 6
>> >>> >> >> akka.ask.timeout: 600
>> >>> >> >>
>> >>> >> >> Does anyone know the cause?
>> >>> >> >>
>> >>> >> >>
>> >>> >> >>
>> >>> >> >>
>> >>> >> >>
>> >>> >> >>
>> >>> >> >>
>> >>> >> >>
>> >>> >> >>
>> >>> >> >>
>> >>> >> >>
>> >>> >> >>
>> >>> >> >>
>> >>> >> >>
>> >>> >> >> On 2020-06-19 14:57:05, "Zhou Zach" wrote:
>> >>> >> >> >
>> >>> >> >> >
>> >>> >> >> >
>> >>> >> >> >
>> >>> >> >> >After killing the application with yarn application -kill, I ran
>> >>> >> >> >/opt/flink-1.10.0/bin/flink run -s
>> /user/flink10/checkpoints/69e450574d8520ac5961e20a6fc4798a/chk-18/_metadata
>> >>> >> >> -d -c dataflow.sql.FromKafkaSinkJdbcForCountPerSecond
>> >>> >> >> /data/warehouse/streaming/data-flow-1.0.jar
>> >>> >> >> >
>> >>> >> >> >
>> >>> >> >> >
>> >>> >> >> >
>> >>> >> >> >
>> >>> >> >> >
>> >>> >> >> >
>> >>> >> >> >
>> >>> >> >> >2020-06-19 14:39:54,563 INFO
>> >>> >> >>
>> >>> >>
>> >>>
>> org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager
>> >>> >> >> - State change: CONNECTED
>> >>> >> >> >2020-06-19 14:39:54,664 INFO
>> >>> >> >>
>> >>> >>
>> >>>
>> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  -
>> >>> >> >> Starting ZooKeeperLeaderRetrievalService
>> /leader/rest_server_lock.
>> >>> >> >> >2020-06-19 14:40:24,728 INFO
>> >>> >> >>
>> >>> >>
>> >>>
>> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  -
>> >>> >> >> Stopping ZooKeeperLeaderRetrievalService
>> /leader/rest_server_lock.
>> >>> >> >> >2020-06-19 14:40:24,729 INFO
>> >>> >> >>
>> >>> >>
>> >>>
>> org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl
>> >>> >> >> - backgroundOperationsLoop exiting
>> >>> >> >> >2020-06-19 14:40:24,733 INFO
>> >>> >> >>
>> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  -
>> >>> >> >> Session: 0x272b776faca2414 closed
>> >>> >> >> >2020-06-19 14:40:24,733 INFO
>> >>> >> >>
>> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  -
>> >>> >> >> EventThread shut down for session: 0x272b776faca2414
>> >>> >> >> >2020-06-19 14:40:24,734 ERROR
>> >>> org.apache.flink.client.cli.CliFrontend
>> >>> >> >>- 

Re: flinksql

2020-06-22, post by Leonard Xu
Hi,
This error usually means a connector-related jar is missing, or the connector's WITH parameters are filled in incorrectly.
> tables created in the flink sql-client
What kind of table is this? Could you post the CREATE TABLE SQL?

Best,
Leonard Xu

flinksql

2020-06-22, post by arhuawu
Hello:
   I'm experimenting with sql-client and Hive integration on flink 1.10.1 and have run into the following problem:
   Standalone deployment, 3 machines; Hive is the CDH5 version, hive 1.1.0.
   I set up the integration per the official docs. Tables created in Hive can be queried from the flink sql-client, but the reverse fails: tables created in the flink
sql-client cannot be queried (from either sql-client or Hive).
   Could not find a suitable table factory for
'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.
   Reason: Required context properties mismatch.
   The following properties are requested:
   schema.0.data-type=INT
   schema.0.name=id
   The following factories have been considered:
   org.apache.flink.table.sources.CsvBatchTableSourceFactory
   org.apache.flink.table.sources.CsvAppendTableSourceFactory
   It looks like some jar is missing. I checked, and flink-table-common-1.10.1.jar seems to contain this class, but adding it did not help. I'm not sure what the problem is (see attachment).
   The jars the integration requires are all in lib:
   flink-connector-hive_2.11-1.10.0.jar
   flink-shaded-hadoop-2-uber-2.6.5-8.0.jar
   hive-metastore-1.1.0.jar
   hive-exec-1.1.0.jar
   libfb303-0.9.2.jar
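A quick sanity check for the "missing connector jar" diagnosis above is to compare the sql-client's lib directory against the jars the poster listed. The helper below is a hypothetical illustration (the prefix list is copied from this thread; `missing_jars` is our own name, not a Flink API):

```python
import os

# Jar name prefixes this thread says the Hive integration needs in flink/lib.
REQUIRED_PREFIXES = [
    "flink-connector-hive",
    "flink-shaded-hadoop-2-uber",
    "hive-metastore",
    "hive-exec",
    "libfb303",
]

def missing_jars(lib_dir):
    """Return the required prefixes that have no matching jar in lib_dir."""
    jars = [f for f in os.listdir(lib_dir) if f.endswith(".jar")]
    return [p for p in REQUIRED_PREFIXES
            if not any(j.startswith(p) for j in jars)]
```

Note also that the "requested properties" in the error contain only `schema.*` keys and no `connector.*` keys, which usually means the `CREATE TABLE` statement had no `WITH (...)` clause naming a connector — which matches the advice above to check the WITH parameters.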

Re: Re:Re:Re: Re: Re: flink run from checkpoint failed

2020-06-22, post by Congxian Qiu
1. First, the parameter after -s can be either a savepoint or a checkpoint
path; resuming from a retained checkpoint is started exactly this way [1].
2. The logs you posted contain an authentication-related problem: `2020-06-22 13:00:59,368 ERROR
org.apache.flink.shaded.curator.org.apache.curator.ConnectionState  -
Authentication failed`. You could try resolving that first.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/state/checkpoints.html#resuming-from-a-retained-checkpoint
Best,
Congxian


Zhou Zach wrote on Mon, Jun 22, 2020, 3:03 PM:

> Does the parameter after flink run -s have to be a savepointPath? Can't it be the path of a checkpoint the Flink job took automatically?
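As a sketch of point 1 above: a retained checkpoint directory contains numbered `chk-N` subdirectories, and `-s` can point at the `_metadata` file inside the newest one. The helper below is a hypothetical local-filesystem illustration (`find_latest_checkpoint` and the paths are our own names, not a Flink API):

```python
import os
import re

def find_latest_checkpoint(job_checkpoint_dir):
    """Return the _metadata path of the highest-numbered chk-* directory."""
    best_n, best_path = -1, None
    for name in os.listdir(job_checkpoint_dir):
        m = re.fullmatch(r"chk-(\d+)", name)
        if not m:
            continue  # skip non-checkpoint entries such as shared/ or taskowned/
        meta = os.path.join(job_checkpoint_dir, name, "_metadata")
        if os.path.exists(meta) and int(m.group(1)) > best_n:
            best_n, best_path = int(m.group(1)), meta
    return best_path

if __name__ == "__main__":
    # Hypothetical local mirror of the HDFS checkpoint directory.
    ckpt_dir = "/tmp/checkpoints/f1b6f5392cd5053db155e709ffe9f871"
    if os.path.isdir(ckpt_dir):
        meta = find_latest_checkpoint(ckpt_dir)
        if meta:
            print("flink run -s %s -d -c MainClass job.jar" % meta)
```

On a real cluster the checkpoint directory lives on HDFS, so you would list it with an HDFS client instead of `os.listdir`, but the `chk-N/_metadata` layout is the same.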

Re: flink high-availability question

2020-06-22, post by tison
Check your checkpoint interval. It looks like checkpointing kicks in before the job has been scheduled, so it fails. Likely cause: insufficient resources, so scheduling fails or is slow, and on top of that your checkpoint interval is small.

If checkpoints keep failing this way, look at the scheduling logs to see why the job takes so long to get scheduled.

Best,
tison.


Yichao Yang <1048262...@qq.com> wrote on Mon, Jun 22, 2020, 10:57 AM:

> Hi
>
>
> From the log this should just be INFO, not an error. Is your job actually unable to complete any checkpoint?
>
>
> Best,
> Yichao Yang
>
>
>
>
> -- Original message --
> From: "Tony"  Sent: Monday, Jun 22, 2020, 10:54 AM
> To: "user-zh"
> Subject: flink high-availability question
>
>
>
> Hello,
>
>
> I configured Flink high availability (flink-conf.yaml) per the official docs as follows:
> high-availability:zookeeper
> high-availability.zookeeper.quorum:master:2181 ,slave1:2181,slave2:2181
> high-availability.zookeeper.path.root:/flink
> high-availability.cluster-id:/cluster_one
> highavailability.storageDir:hdfs://master:9000/flink/ha
>
>
> My Flink and ZooKeeper both run in K8s containers.
> The job fails at startup as follows; please take a look, thanks.
> 2020-06-22 02:47:43,884 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Checkpoint triggering task Source: Kafka-Consumer -> (Sink: Print to
> Std. Out, Filter -> Query Map -> Unwind -> Custom Map -> filter
> -> Data Transformation -> Filter) (1/1) of job
>  is not in state RUNNING but SCHEDULED
> instead. Aborting checkpoint.
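The INFO line reflects a precondition in the CheckpointCoordinator: a checkpoint is only triggered once every task to trigger is in state RUNNING. A simplified plain-Python sketch of that check (function and state names are ours, not Flink's):

```python
def can_trigger_checkpoint(task_states):
    """Mimic the coordinator's precondition: all tasks must be RUNNING."""
    not_running = [s for s in task_states if s != "RUNNING"]
    if not_running:
        # Flink logs "... is not in state RUNNING but SCHEDULED instead.
        # Aborting checkpoint." and skips this checkpoint attempt.
        return False
    return True
```

So with a short checkpoint interval and slow scheduling (for example, while waiting for slots), every early attempt aborts with exactly this message until the tasks reach RUNNING, which matches the diagnosis above.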


Re:Re:Re:Re: Re: Re: flink run from checkpoint failed

2020-06-22, post by Zhou Zach
Does the parameter after flink run -s have to be a savepointPath? Can't it be the path of a checkpoint the Flink job took automatically?















On 2020-06-22 14:32:02, "Zhou Zach" wrote:
>I restarted the CDH6 cluster and still get the same error. Flink failure recovery doesn't work, so I don't dare go to production. Could someone take a look?

Re:Re:Re: Re: Re: flink run from checkpoint failed

2020-06-22, post by Zhou Zach
I restarted the CDH6 cluster and still get the same error. Flink failure recovery doesn't work, so I don't dare go to production. Could someone take a look?
















On 2020-06-22 13:21:01, "Zhou Zach" wrote:

After killing the application with yarn application -kill,
running /opt/flink-1.10.0/bin/flink run -s
hdfs://nameservice1:8020/user/flink10/checkpoints/f1b6f5392cd5053db155e709ffe9f871/chk-15/_metadata
dataflow.sql.FromKafkaSinkJdbcForCountPerSecond
/data/warehouse/streaming/data-flow-1.0.jar fails to start. The /opt/flink-1.10.0/log output is attached.


Running /opt/flink-1.10.0/bin/flink run -c
dataflow.sql.FromKafkaSinkJdbcForCountPerSecond -m yarn-cluster -yjm 1024m -ytm
8192m -p 2 -ys 4 -ynm UV -d data-flow-1.0.jar starts fine; it only fails when the -s parameter is added.




















Re: Can the pyflink Table API route rows into different sink tables based on a field's value?

2020-06-22, post by jincheng sun
Hi jack,

With the Table API you don't need if/else; logic along these lines is enough:

val t1 = table.filter('x  > 2).groupBy(..)
val t2 = table.filter('x <= 2).groupBy(..)
t1.insert_into("sink1")
t2.insert_into("sink2")


Best,
Jincheng
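The same pattern applies in PyFlink: each filter derives a new table from the original source table, so the two branches are independent pipelines in one job and no if/else is needed. Below is a plain-Python simulation of that semantics (dict records and list "sinks" stand in for tables and connectors; all names are ours, not PyFlink API):

```python
def route(records, sinks):
    """Each (predicate, sink) pair scans the full input, like two
    independent table.filter(...).insert_into(...) pipelines."""
    for predicate, sink in sinks:
        for rec in records:
            if predicate(rec):
                sink.append(rec)

records = [
    {"logType": "syslog", "message": "a"},
    {"logType": "alarm", "message": "b"},
]
sink1, sink2 = [], []
route(records, [
    (lambda r: r["logType"] == "syslog", sink1),  # like table.filter("logType = 'syslog'")
    (lambda r: r["logType"] == "alarm", sink2),   # like table.filter("logType = 'alarm'")
])
```

In actual PyFlink code this is simply two statements on the same source table, for example `t.filter("logType = 'syslog'").insert_into("sink1")` and `t.filter("logType = 'alarm'").insert_into("sink2")`, submitted together as one job.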



jack wrote on Fri, Jun 19, 2020, 10:35 AM:

>
> I tested with the following structure:
> table= t_env.from_path("source")
>
> if table.filter("logType=syslog"):
> table.filter("logType=syslog").insert_into("sink1")
> elif table.filter("logType=alarm"):
> table.filter("logType=alarm").insert_into("sink2")
>
>
> From my test, it seems table
> .filter("logType=syslog").insert_into("sink1") takes effect but the elif below does not. Is the reason that
> table.filter("logType=syslog") (or table.where) already filters the data while evaluating the condition, so execution never reaches the branch below??
>
>
>
>
> On 2020-06-19 10:08:25, "jack" wrote:
> >Can the pyflink Table API route rows into different sink tables based on a field's value?
> >
> >
> >Scenario: in pyflink, filter rows with a condition and insert them into a sink.
> >For example, the two messages below differ in logType; the filter API can split them before inserting into sink tables:
> >{
> >"logType":"syslog",
> >"message":"sla;flkdsjf"
> >}
> >{
> >"logType":"alarm",
> >"message":"sla;flkdsjf"
> >}
> >  t_env.from_path("source")\
> >  .filter("logType=syslog")\
> >  .insert_into("sink1")
> >Is there a way, directly in the code above, to implement if/else-like logic based on the logType field:
> >if logType=="syslog":
> >   insert_into(sink1)
> >elif logType=="alarm":
> >   insert_into(sink2)
> >
> >
> >If insert_into returned something exposing .filter, .select, etc., this would be easy; I could keep chaining filter conditions, like the following:
> >
> >
> >  t_env.from_path("source")\
> >  .filter("logType=syslog")\
> >  .insert_into("sink1")\
> >  .filter("logType=alarm")\
> >  .insert_into("sink2")
> >Any guidance would be appreciated, thanks.
> >
> >
> >
> >
> >
>
>