flink 1.11: error when defining a Kafka source with DDL

2020-08-06 Posted by 阿华田
The code is as follows:





Re: Case of database column names in the Flink Table API

2020-08-06 Posted by lgs
The schema is public.
The problem is here: ERROR: column "recordid" of relation "alarm_history_data" does not
exist

The column in the database table is "recordId", but in the error message it has become "recordid".




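For context, PostgreSQL folds unquoted identifiers to lower case, so a column created with a quoted mixed-case name such as "recordId" can only be addressed with a quoted name afterwards. A small, hypothetical JDBC sketch illustrating the folding (connection parameters are placeholders):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class IdentifierFoldingDemo {
    public static void main(String[] args) throws Exception {
        // Placeholder connection parameters.
        try (Connection conn = DriverManager.getConnection(
                "jdbc:postgresql://127.0.0.1:5432/postgres", "postgres", "my_password");
             Statement stmt = conn.createStatement()) {
            // Quoted identifier: the column is stored case-sensitively as "recordId".
            stmt.execute("CREATE TABLE t_demo (\"recordId\" VARCHAR)");
            // Unquoted identifier: PostgreSQL folds it to "recordid" -> column not found.
            stmt.execute("INSERT INTO t_demo (recordId) VALUES ('x')");
        }
    }
}

The INSERT generated for the sink uses unquoted column names (visible in the full statement quoted later in this digest), which is why the database complains about "recordid"; creating the Postgres columns in lower case sidesteps the mismatch.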


Re: In the future,the community plans to extend its functionality by providing a REST-based SQL

2020-08-06 Posted by Sebastian Liu
The community's flink-sql-client module indeed only supports embedded
mode at the moment: one client corresponds to one session and one executor that submits and manages SQL jobs;
the session-related code lives under the module's gateway directory. The direction is to grow the gateway into a standalone RESTful web
service (a job gateway), essentially a session manager that serves SQL statement requests from multiple clients.
As I understand it, in the client's gateway mode the built-in gateway service is started first and the client then submits its requests to the gateway.
Reference:
Sql gateway: https://github.com/ververica/flink-sql-gateway
jdbc driver: https://github.com/ververica/flink-jdbc-driver  (jdbc <-> rest
http request)

air23 wrote on Thu, Aug 6, 2020 at 3:19 PM:

> Limitations & Future
> The current SQL Client only supports embedded mode. In the future, the
> community plans to extend its functionality by providing a REST-based SQL
> Client Gateway, see more in FLIP-24 and FLIP-91.
>
>
>
>
> Hi, I saw this in the official docs. Is it still planned? Is it some kind of REST-based SQL web client?



-- 

With kind regards

Sebastian Liu 刘洋
Institute of Computing Technology, Chinese Academy of Science
Mobile\WeChat: +86—15201613655
E-mail: liuyang0...@gmail.com 
QQ: 3239559


Re: [PyFlink] Error when writing data to Kafka in Csv() format, and the Python UDF cannot be started

2020-08-06 Posted by lgs
Hi Jincheng,

I'm now hitting the same problem. When the UDF runs it prints a log like this:
2020-08-07 03:06:45,920 INFO 
org.apache.beam.runners.fnexecution.environment.ProcessEnvironmentFactory []
- Still waiting for startup of environment
'/usr/local/lib64/python3.6/site-packages/pyflink/bin/pyflink-udf-runner.sh'
for worker id 1-1
and after a while PyFlink exits.

I'm running in local mode. My Beam version is 2.19; installing the latest 2.23 gives the same problem.
The apache-flink version is 1.11.0.

Is there a way to troubleshoot this error?





Re:Re: Flink 1.10 on Yarn

2020-08-06 Posted by chenkaibit
Hi xuhaiLong, the checkpoint NullPointerException in your log is a known issue; see the two JIRA tickets below.
Which JDK version are you using? So far we have seen jdk8_40/jdk8_60 + flink-1.10 hit the checkpoint 
NullPointerException; you could try upgrading the JDK.
https://issues.apache.org/jira/browse/FLINK-18196
https://issues.apache.org/jira/browse/FLINK-17479




On 2020-08-07 12:50:23, "xuhaiLong" wrote:

Sorry, I attached the wrong file.


Yes, taskmanager.memory.jvm-metaspace.size is at the default.
On 8/7/2020 11:43,Yangze Guo wrote:
The log didn't come through. Is taskmanager.memory.jvm-metaspace.size currently at its default value?

Best,
Yangze Guo

On Fri, Aug 7, 2020 at 11:38 AM xuhaiLong  wrote:



Hi


Scenario: one TM with three slots, running three jobs.


While the three jobs were running, a NullPointerException occurred during checkpointing, which kept the jobs restarting. Eventually the `Metaspace` filled up, with 
`java.lang.OutOfMemoryError: Metaspace. The metaspace out-of-memory error has 
occurred. This can mean two things: either the job requires a larger size of 
JVM metaspace to load classes or there is a class loading leak. In the first 
case 'taskmanager.memory.jvm-metaspace.size' configuration option should be 
increased. If the error persists (usually in cluster after several job 
(re-)submissions) then there is probably a class loading leak which has to be 
investigated and fixed. The task executor has to be shutdown...
`


The attachment contains part of the exception.


Questions:
1. Why does the NullPointerException occur during checkpointing? (The three jobs consume the same Kafka topic, and a job restored from a checkpoint runs fine, so it should not be a data problem.)
2. The logs show the job can restart, so why does the problem persist after the automatic restarts, causing endless restarting?


Thanks!
Cloud attachment sent via Netease Mail Master
08-07error.txt (730.4 KB, expires 2020-08-22 11:37)
Download


Re: Flink 1.11.1 on k8s: how to configure Hadoop

2020-08-06 Posted by caozhen
For reference, the Hadoop integration wiki for Flink 1.11.1:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/hadoop.html

According to the docs, flink-shaded-hadoop-2-uber is no longer provided, and two approaches are suggested instead:

1. Recommended: load the Hadoop dependencies via HADOOP_CLASSPATH
2. Or put the Hadoop dependencies into the Flink client's lib directory

When running Flink 1.11.1 on YARN I used the second approach: download the hadoop src package and copy the commonly used
dependencies into the lib directory. (This may cause class conflicts with your main jar and needs some debugging.)

I don't think this is a good approach, only a temporary workaround. There should still be a flink-shaded-hadoop package; I'm trying to build one and haven't fully solved all the issues yet.




Flink SQL reading Kafka where the data has a field named time

2020-08-06 Posted by leiyanrui
I'm using Flink SQL to connect to Kafka, and the Kafka data contains a field named time. In the CREATE
TABLE statement I wrapped the time column in backticks, but it still fails with an error. Is there another way around this?





Flink 1.11.1 on k8s: how to configure Hadoop

2020-08-06 Posted by RS
Hi,
I want to run Flink 1.11.1 on K8S with the flink:1.11.1-scala_2.12 image. Following the docs I deployed a session 
cluster; the jobmanager and taskmanager both started successfully.
Submitting a job then fails with:
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: 
Hadoop is not in the classpath/dependencies.
at 
org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:58)
at 
org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:487)


It says the Hadoop dependency cannot be found. According to the docs, starting with 1.11 flink-shaded-hadoop-2-uber is discouraged 
and HADOOP_CLASSPATH is needed instead.
But it's still not clear how to combine that with a K8S deployment; the docs are rather brief. Has anyone done this successfully and can share?


Thx



Re: Flink 1.10 on Yarn

2020-08-06 Posted by Yangze Guo
The log didn't come through. Is taskmanager.memory.jvm-metaspace.size currently at its default value?

Best,
Yangze Guo

On Fri, Aug 7, 2020 at 11:38 AM xuhaiLong  wrote:
>
>
>
> Hi
>
>
> Scenario: one TM with three slots, running three jobs.
>
>
> While the three jobs were running, a NullPointerException occurred during checkpointing, which kept the jobs restarting. Eventually the `Metaspace` filled up, with 
> `java.lang.OutOfMemoryError: Metaspace. The metaspace out-of-memory error has 
> occurred. This can mean two things: either the job requires a larger size of 
> JVM metaspace to load classes or there is a class loading leak. In the first 
> case 'taskmanager.memory.jvm-metaspace.size' configuration option should be 
> increased. If the error persists (usually in cluster after several job 
> (re-)submissions) then there is probably a class loading leak which has to be 
> investigated and fixed. The task executor has to be shutdown...
> `
>
>
> The attachment contains part of the exception.
>
>
> Questions:
> 1. Why does the NullPointerException occur during checkpointing? (The three jobs consume the same Kafka topic, and a job restored from a checkpoint runs fine, so it should not be a data problem.)
> 2. The logs show the job can restart, so why does the problem persist after the automatic restarts, causing endless restarting?
>
>
> Thanks!
> Cloud attachment sent via Netease Mail Master
> 08-07error.txt (730.4 KB, expires 2020-08-22 11:37)
> Download


Re: Only a small fraction of the timers registered with the Flink TimerService ever fire

2020-08-06 Posted by Congxian Qiu
Hi
   For event-time processing, it is not recommended to use System.currentTimeMillis()
when registering timers; that system time and the event time can differ. You can use currentWatermark from the TimerService as the current event time.

Best,
Congxian
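A minimal sketch of what Congxian describes, registering the timer relative to the element's event time / current watermark instead of System.currentTimeMillis() (the 15-minute delay mirrors the original question; the rest of the logic is illustrative only):

import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class EventTimeTimerFunc extends KeyedProcessFunction<String, String, String> {
    private static final long DELAY_MS = 15 * 60 * 1000L;

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) {
        // Base the timer on the element's event time (or the current watermark),
        // not on System.currentTimeMillis(), so it fires as the watermark advances.
        long baseTs = ctx.timestamp() != null
                ? ctx.timestamp()
                : ctx.timerService().currentWatermark();
        ctx.timerService().registerEventTimeTimer(baseTs + DELAY_MS);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
        out.collect("timer fired at " + timestamp);
    }
}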


jsqf wrote on Thu, Aug 6, 2020 at 9:53 PM:

> Try overriding the onTimer method.
> You can refer to
>
> https://github.com/JSQF/flink10_learn/blob/master/src/main/scala/com/yyb/flink10/DataStream/ProcessFunction/OperatorProcessFunctionDemo.java
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Flink 1.10 on Yarn

2020-08-06 Posted by xuhaiLong


Hi


Scenario: one TM with three slots, running three jobs.


While the three jobs were running, a NullPointerException occurred during checkpointing, which kept the jobs restarting. Eventually the `Metaspace` filled up, with 
`java.lang.OutOfMemoryError: Metaspace. The metaspace out-of-memory error has 
occurred. This can mean two things: either the job requires a larger size of 
JVM metaspace to load classes or there is a class loading leak. In the first 
case 'taskmanager.memory.jvm-metaspace.size' configuration option should be 
increased. If the error persists (usually in cluster after several job 
(re-)submissions) then there is probably a class loading leak which has to be 
investigated and fixed. The task executor has to be shutdown...
`


The attachment contains part of the exception.


Questions:
1. Why does the NullPointerException occur during checkpointing? (The three jobs consume the same Kafka topic, and a job restored from a checkpoint runs fine, so it should not be a data problem.)
2. The logs show the job can restart, so why does the problem persist after the automatic restarts, causing endless restarting?


Thanks!
Cloud attachment sent via Netease Mail Master
08-07error.txt (730.4 KB, expires 2020-08-22 11:37)
Download

Re: Kerberos authentication with dynamic parameters

2020-08-06 Posted by caozhen



My solution was to modify the Flink source code:

pass the args from CliFrontend to the FlinkYarnSessionCli class, copy those key/value pairs into the configuration,

and then rebuild the dist package.





Re: Flink SQL JDBC async dimension (lookup) table

2020-08-06 Posted by Leonard Xu
Hi

There is already an issue in the community tracking this; you can follow it [1].

Best
Leonard

[1] https://issues.apache.org/jira/browse/FLINK-14902 


> On Aug 7, 2020, at 11:03, todd wrote:
> 
> JdbcDynamicTableSource loads JDBC data synchronously by default. Are there plans to provide an asynchronous interface so that users can plug in a third-party async framework of their choice?
> 
> 
> 
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



Flink SQL JDBC async dimension (lookup) table

2020-08-06 Posted by todd
JdbcDynamicTableSource loads JDBC data synchronously by default. Are there plans to provide an asynchronous interface so that users can plug in a third-party async framework of their choice?




Exception when registering a UD(T)F with StreamTableEnvironment.createTemporarySystemFunction

2020-08-06 Posted by zz zhang
Running the code below throws an exception; switching to the old method StreamTableEnvironment.registerFunction works fine.
Flink version: 1.11.1

package com.test;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;


public class TestUTDFOk {
public static class UDTF extends TableFunction<Row> {

public void eval(String input) {
Row row = new Row(3);
row.setField(0, input);
row.setField(1, input.length());
row.setField(2, input +  2);
collect(row);
}
}

public static  class UDF extends ScalarFunction {
public String eval(Row row, Integer index) {
try {
return String.valueOf(row.getField(index));
} catch (Exception e) {
throw e;
}
}
}

public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(
env,
EnvironmentSettings.newInstance().useBlinkPlanner().build());
//tEnv.registerFunction("udtf", new UDTF());
//tEnv.registerFunction("udf", new UDF());
tEnv.createTemporarySystemFunction("udtf", new UDTF());
tEnv.createTemporarySystemFunction("udf", new UDF());

tEnv.createTemporaryView("source", tEnv.fromValues("a", "b",
"c").as("f0"));
String sinkDDL = "create table sinkTable ("
+ "f0 String"
+ ", x String"
+ ", y String"
+ ", z String"
+ ") with ("
+ "'connector.type' = 'filesystem',"
+ "'format.type' = 'csv',"
+ "'connector.path' =
'F:\\workspace\\douyu-git\\bd-flink\\core\\logs\\a.csv'"
+ ")";
String udtfCall = "insert into sinkTable SELECT S.f0"
+ ", udf(f1, 0) as x"
+ ", udf(f1, 1) as y"
+ ", udf(f1, 2) as z"
+ " FROM source as S, LATERAL TABLE(udtf(f0)) as T(f1)";

tEnv.executeSql(sinkDDL);
tEnv.executeSql(udtfCall);
}
}

异常如下:
Exception in thread "main"
org.apache.flink.table.api.ValidationException: SQL validation failed.
An error occurred in the type inference logic of function 'udf'.
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:185)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:525)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:202)
at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)
at com.test.TestUTDFOk.main(TestUTDFOk.java:64)
Caused by: org.apache.flink.table.api.ValidationException: An error
occurred in the type inference logic of function 'udf'.
at 
org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:165)
at 
org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToSqlFunction(FunctionCatalogOperatorTable.java:148)
at 
org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lambda$lookupOperatorOverloads$0(FunctionCatalogOperatorTable.java:100)
at java.util.Optional.flatMap(Optional.java:241)
at 
org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lookupOperatorOverloads(FunctionCatalogOperatorTable.java:99)
at 
org.apache.calcite.sql.util.ChainedSqlOperatorTable.lookupOperatorOverloads(ChainedSqlOperatorTable.java:73)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1303)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1288)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1318)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1288)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1052)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:766)
at 
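Possibly relevant here: createTemporarySystemFunction goes through the new type inference, which cannot derive the type of a raw Row parameter or output by reflection alone, while the old registerFunction path could. A hedged sketch of adding @FunctionHint/@DataTypeHint annotations; the concrete field names and types below are assumptions based on the eval bodies above:

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

public class HintedFunctions {

    // Declares the produced row type explicitly for the new type inference.
    @FunctionHint(output = @DataTypeHint("ROW<s STRING, len INT, s2 STRING>"))
    public static class HintedUDTF extends TableFunction<Row> {
        public void eval(String input) {
            collect(Row.of(input, input.length(), input + 2));
        }
    }

    public static class HintedUDF extends ScalarFunction {
        // Declares the expected type of the Row argument explicitly.
        public String eval(@DataTypeHint("ROW<s STRING, len INT, s2 STRING>") Row row,
                           Integer index) {
            return String.valueOf(row.getField(index));
        }
    }
}

These would be registered the same way as above, e.g. tEnv.createTemporarySystemFunction("udtf", new HintedFunctions.HintedUDTF()).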

Re: stream sink to Hive under HDFS HA: java.net.UnknownHostException: nameservices1

2020-08-06 Posted by chengyanan1...@foxmail.com
When registering the HiveCatalog,
just put the hdfs-site.xml file into the hiveConfDir.



chengyanan1...@foxmail.com

From: Yang Wang
Sent: 2020-08-07 10:15
To: user-zh
Subject: Re: stream sink to Hive under HDFS HA
I'm not that familiar with Hive, but a java.net.UnknownHostException: nameservices1 error
usually means the HDFS configuration (core-site.xml and hdfs-site.xml) was not loaded,
because nameservices1 is not really a host; it is defined in hdfs-site.xml and is used to resolve the nn1/nn2 addresses.
 
 
Best,
Yang
 
wrote on Fri, Aug 7, 2020 at 8:34 AM:
 
> You can add env.hadoop.conf.dir: to flink-conf.yaml, pointing to the Hadoop configuration directory.
>
> Sent from my iPhone
>
> > On Aug 7, 2020, at 00:16, Jark Wu wrote:
> >
> > Could you provide the full exception stack, plus the version number and the code you are using?
> >
> >> On Mon, 3 Aug 2020 at 13:32, air23  wrote:
> >>
> >> Hi,
> >>  Our cluster is CDH, configured with HDFS HA.
> >> When sinking from Kafka to Hive, the nameservice cannot be found:
> >> java.lang.IllegalArgumentException: java.net.UnknownHostException:
> >> nameservices1
> >>
> >>
> >> How should this be configured in HA mode?
>
>
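A minimal sketch of chengyanan's suggestion: register a HiveCatalog whose hiveConfDir also contains the cluster's hdfs-site.xml and core-site.xml, so that the nameservices1 logical name can be resolved (the catalog name and path are placeholders):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class HiveCatalogHaDemo {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().build());
        // "/etc/hive/conf" is a placeholder; it should contain hive-site.xml
        // plus the cluster's hdfs-site.xml and core-site.xml so that the
        // "nameservices1" logical name can be mapped to the nn1/nn2 addresses.
        HiveCatalog hive = new HiveCatalog("myhive", "default", "/etc/hive/conf");
        tEnv.registerCatalog("myhive", hive);
        tEnv.useCatalog("myhive");
    }
}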


Re: flink 1.10.1/1.11.1: state keeps growing after SQL group-by and time-window operations

2020-08-06 Posted by chengyanan1...@foxmail.com
Hi:
Is there a reasonable explanation for this issue? Still following it...
 
From: 鱼子酱
Sent: 2020-08-03 13:50
To: user-zh
Subject: Re: flink 1.10.1/1.11.1: state keeps growing after SQL group-by and time-window operations
Hi:
I switched back to incremental mode and collected some more data:
1. Processing rate: 3000 records per second, in a test environment with a stable load and almost no fluctuation
2. The checkpoint interval is set to 5 seconds
3. The job currently uses one-minute windows
4. Parallelism is 1, running in on-yarn mode
 
Right after startup: (screenshot omitted)
 
18 minutes later: (screenshot omitted)
 
Checkpoint settings: (screenshot omitted)
 
Size on HDFS: (screenshot omitted)
 
Size shown in the web UI: (screenshot omitted)
 
 
Congxian Qiu wrote
> Hi 鱼子酱
> With incremental checkpoints enabled, could you post a screenshot of how the checkpoint size evolves over time? Also, if possible,
> after each checkpoint completes, please ls the checkpoint directory on HDFS and note its size.
> One more question: roughly how fast are you processing, and can you estimate how often the state is updated?
> 
> Best,
> Congxian
> 
> 
> 鱼子酱 <
 
> 384939718@
 
>> wrote on Thu, Jul 30, 2020 at 10:43 AM:
> 
>> Thanks!
>>
>> On flink 1.11.1 I tried the two backends below. After more than 20 hours of running,
>> the state size now fluctuates within a range and no longer keeps growing.
>> StateBackend backend =new
>>
>> RocksDBStateBackend("hdfs:///checkpoints-data/"+yamlReader.getValueByKey("jobName").toString()+"/",false);
>> StateBackend backend =new
>>
>> FsStateBackend("hdfs:///checkpoints-data/"+yamlReader.getValueByKey("jobName").toString()+"/",false);
>>
>>
>> So it looks like the incremental mode of RocksDBStateBackend may have some issue here.
>> RocksDBStateBackend:
>> http://apache-flink.147419.n8.nabble.com/file/t793/444.png;
>> FsStateBackend:
>> http://apache-flink.147419.n8.nabble.com/file/t793/555.png;
>>
>>
>>
>>
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/
>>
 
 
 
 
 
 


Re: Coexistence of flink-sql-connector-elasticsearch6_2.11_1.10.0 and flink-connector-elasticsearch6_2.11_1.10.0

2020-08-06 Posted by Leonard Xu
Hi

For your scenario the sql jar alone is enough; the sql jar already bundles the es connector dependencies. The two indeed cannot coexist, because the sql jar 
shades its es dependencies while the es connector jar does not.

Also, your stack trace looks like a known lambda-expression serialization issue [1] that was fixed in 1.11.0; could you try upgrading to 1.11.1?

Best
Leonard

[1] https://issues.apache.org/jira/browse/FLINK-18006 


> On Aug 7, 2020, at 09:26, 费文杰 wrote:
> 
> 
> Hi:
>
> I'm on Flink 1.10. Because we use sql-client, I placed flink-sql-connector-elasticsearch6_2.11_1.10.0.jar under flink/lib. A colleague writes his sink to Elasticsearch directly in code, so he added flink-connector-elasticsearch6_2.11_1.10.0.jar,
> and that makes his job fail. To reproduce the problem, I added both dependencies on my machine:
> <dependency>
>   <groupId>org.apache.httpcomponents</groupId>
>   <artifactId>httpclient</artifactId>
>   <version>4.5.2</version>
> </dependency>
>
> <dependency>
>   <groupId>org.elasticsearch.client</groupId>
>   <artifactId>elasticsearch-rest-high-level-client</artifactId>
>   <version>${elasticsearch.version}</version>
> </dependency>
>
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
>   <version>${flink.version}</version>
> </dependency>
>
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-sql-connector-elasticsearch6_2.11</artifactId>
>   <version>${flink.version}</version>
>   <scope>provided</scope>
> </dependency>
> Running locally, I get the following error:
> Caused by: org.apache.flink.client.program.ProgramInvocationException: Job 
> failed (JobID: bb3bf961144b75bfc71c9dd3efaf59f9)
> at 
> org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112)
> at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
> at 
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
> at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at 
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
> at 
> org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:874)
> at akka.dispatch.OnComplete.internal(Future.scala:264)
> at akka.dispatch.OnComplete.internal(Future.scala:261)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
> at scala.concurrent.impl.CallbackRunnable.run$$$capture(Promise.scala:36)
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala)
> at 
> org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
> at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
> at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
> at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
> at 
> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
> at 
> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
> at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
> at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
> at scala.concurrent.impl.CallbackRunnable.run$$$capture(Promise.scala:36)
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala)
> at 
> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
> at 
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
> at 
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
> at 
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
> at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
> at 
> akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at 
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at 
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
> at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
> at 
> org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:110)
> ... 33 more
> Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by 
> NoRestartBackoffTimeStrategy
> at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
> at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
> at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
> at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
> at 
> 

Fwd: Checkpoint question for the SQL Client

2020-08-06 Posted by king


The checkpoint only produced the shared and taskowned directories, no chk-* directory. Hoping for an answer, thanks.

----- Forwarded message -----

From: king
Date: 2020-08-07 09:05
To: user-zh
Subject: Fwd: Checkpoint question for the SQL Client


Sorry, I meant flink-conf.yaml, not flink-site.yaml.

----- Forwarded message -----

From: king
Date: 2020-08-07 08:23
To: user-zh
Subject: Checkpoint question for the SQL Client


Hello, on flink 1.11.0:
1. How do I set the checkpoint interval for the SQL Client? When doing file
streaming, the HDFS files stay in the In-progress state and are never Finalized.
2. I previously configured checkpointing (filesystem type) in flink-site.yaml; only the other two directories were created and no chk-* directory appeared. Hoping for an answer, thanks.


With the programmatic API the above works fine.

Re: stream sink to Hive under HDFS HA

2020-08-06 Posted by Yang Wang
I'm not that familiar with Hive, but a java.net.UnknownHostException: nameservices1 error
usually means the HDFS configuration (core-site.xml and hdfs-site.xml) was not loaded,
because nameservices1 is not really a host; it is defined in hdfs-site.xml and is used to resolve the nn1/nn2 addresses.


Best,
Yang

wrote on Fri, Aug 7, 2020 at 8:34 AM:

> You can add env.hadoop.conf.dir: to flink-conf.yaml, pointing to the Hadoop configuration directory.
>
> Sent from my iPhone
>
> > On Aug 7, 2020, at 00:16, Jark Wu wrote:
> >
> > Could you provide the full exception stack, plus the version number and the code you are using?
> >
> >> On Mon, 3 Aug 2020 at 13:32, air23  wrote:
> >>
> >> Hi,
> >>  Our cluster is CDH, configured with HDFS HA.
> >> When sinking from Kafka to Hive, the nameservice cannot be found:
> >> java.lang.IllegalArgumentException: java.net.UnknownHostException:
> >> nameservices1
> >>
> >>
> >> How should this be configured in HA mode?
>
>


Flink SQL writing out to Kafka

2020-08-06 Posted by leiyanrui
With flink 1.10 I connect to Kafka via SQL, process the data, and write the result back to Kafka. The job contains roughly 10 pipelines of the form insert into sink
select
..., each sinking separately. After running for a few minutes it fails with org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.TimeoutException:
Topic boss.nlp.hbase.feature-sync-60 not present in metadata after 6 ms.
But if I UNION the 10 select statements and then sink to Kafka once, the problem doesn't occur. Is there a reasonable explanation for this?



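Not an explanation of the timeout itself, but related to the single-sink versus many-sinks difference: from Flink 1.11 on, multiple INSERT statements can be grouped into a single job with a StatementSet, which submits them together rather than as independent pipelines, much like the UNION variant. A minimal, self-contained sketch using the datagen/blackhole test connectors (table names are placeholders):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableEnvironment;

public class MultiInsertDemo {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
        // Placeholder tables; in practice these would be the Kafka source and sinks.
        tEnv.executeSql("CREATE TABLE source (f0 STRING) WITH ('connector' = 'datagen')");
        tEnv.executeSql("CREATE TABLE sink_1 (f0 STRING) WITH ('connector' = 'blackhole')");
        tEnv.executeSql("CREATE TABLE sink_2 (f0 STRING) WITH ('connector' = 'blackhole')");

        StatementSet set = tEnv.createStatementSet();
        set.addInsertSql("INSERT INTO sink_1 SELECT f0 FROM source");
        set.addInsertSql("INSERT INTO sink_2 SELECT f0 FROM source");
        set.execute(); // all inserts are submitted as one job
    }
}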


Re: Coexistence of flink-sql-connector-elasticsearch6_2.11_1.10.0 and flink-connector-elasticsearch6_2.11_1.10.0

2020-08-06 Posted by Yangze Guo
Hi,

flink-sql-connector-elasticsearch6_2.11 already bundles the relevant ES dependencies, so depending on it directly (not as provided) should be fine. When you say it reports the same error, is the stack trace exactly identical? Are you running directly in the IDE or submitting the job?

Best,
Yangze Guo

On Fri, Aug 7, 2020 at 9:32 AM JasonLee <17610775...@163.com> wrote:
>
> Hi
>
>
> Just include the -sql jar; the code should be able to use that jar directly as well.
>
>
> | |
> JasonLee
> |
> |
> 邮箱:17610775...@163.com
> |
>
> Signature is customized by Netease Mail Master
>
> On Aug 7, 2020 at 09:26, 费文杰 wrote:
>
> Hi:
>
> I'm on Flink 1.10. Because we use sql-client, I placed flink-sql-connector-elasticsearch6_2.11_1.10.0.jar under flink/lib. A colleague writes his sink to Elasticsearch directly in code, so he added flink-connector-elasticsearch6_2.11_1.10.0.jar,
> and that makes his job fail. To reproduce the problem, I added both dependencies on my machine:
> <dependency>
>   <groupId>org.apache.httpcomponents</groupId>
>   <artifactId>httpclient</artifactId>
>   <version>4.5.2</version>
> </dependency>
>
> <dependency>
>   <groupId>org.elasticsearch.client</groupId>
>   <artifactId>elasticsearch-rest-high-level-client</artifactId>
>   <version>${elasticsearch.version}</version>
> </dependency>
>
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
>   <version>${flink.version}</version>
> </dependency>
>
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-sql-connector-elasticsearch6_2.11</artifactId>
>   <version>${flink.version}</version>
>   <scope>provided</scope>
> </dependency>
>
> Running locally, I get the following error:
> Caused by: org.apache.flink.client.program.ProgramInvocationException: Job 
> failed (JobID: bb3bf961144b75bfc71c9dd3efaf59f9)
> at 
> org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112)
> at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
> at 
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
> at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at 
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
> at 
> org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:874)
> at akka.dispatch.OnComplete.internal(Future.scala:264)
> at akka.dispatch.OnComplete.internal(Future.scala:261)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
> at scala.concurrent.impl.CallbackRunnable.run$$$capture(Promise.scala:36)
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala)
> at 
> org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
> at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
> at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
> at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
> at 
> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
> at 
> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
> at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
> at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
> at scala.concurrent.impl.CallbackRunnable.run$$$capture(Promise.scala:36)
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala)
> at 
> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
> at 
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
> at 
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
> at 
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
> at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
> at 
> akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at 
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at 
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
> at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
> at 
> org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:110)
> ... 33 more
> Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by 
> NoRestartBackoffTimeStrategy
> at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
> at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
> at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
> at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
> at 
> 

Re: flink1.11 es connector

2020-08-06 Posted by Dream-底限
Hi,
Yes. Do you think using ES as a dimension table is reliable? That way we wouldn't need to maintain HBase secondary indexes (or we could keep the data in HBase and store the secondary index in ES, but then ES still acts as one dimension source).

Jark Wu wrote on Fri, Aug 7, 2020 at 12:34 AM:

> There is currently an issue in the community tracking an ES source; you can follow it:
> https://issues.apache.org/jira/browse/FLINK-16713
> For the temporal table query you want, do you mean using it as a dimension table (lookup)?
>
> Best,
> Jark
>
> On Thu, 6 Aug 2020 at 11:20, Dream-底限  wrote:
>
> > hi
> >
> >
> We would like to use ES for temporal table queries, but Flink doesn't expose an ES source connector, so we would have to implement one ourselves. Do you think using ES as a temporal table is reliable? (We are avoiding HBase because designing the rowkey and possibly maintaining secondary indexes is cumbersome, though HBase is also an option we are evaluating.)
> >
>


Re: Coexistence of flink-sql-connector-elasticsearch6_2.11_1.10.0 and flink-connector-elasticsearch6_2.11_1.10.0

2020-08-06 Posted by JasonLee
Hi


Just include the -sql jar; the code should be able to use that jar directly as well.



On Aug 7, 2020 at 09:26, 费文杰 wrote:

Hi:

I'm on Flink 1.10. Because we use sql-client, I placed flink-sql-connector-elasticsearch6_2.11_1.10.0.jar under flink/lib. A colleague writes his sink to Elasticsearch directly in code, so he added flink-connector-elasticsearch6_2.11_1.10.0.jar,
and that makes his job fail. To reproduce the problem, I added both dependencies on my machine:

<dependency>
  <groupId>org.apache.httpcomponents</groupId>
  <artifactId>httpclient</artifactId>
  <version>4.5.2</version>
</dependency>

<dependency>
  <groupId>org.elasticsearch.client</groupId>
  <artifactId>elasticsearch-rest-high-level-client</artifactId>
  <version>${elasticsearch.version}</version>
</dependency>

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
  <version>${flink.version}</version>
</dependency>

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-sql-connector-elasticsearch6_2.11</artifactId>
  <version>${flink.version}</version>
  <scope>provided</scope>
</dependency>

Running locally, I get the following error:
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job 
failed (JobID: bb3bf961144b75bfc71c9dd3efaf59f9)
at 
org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
at 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
at 
org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:874)
at akka.dispatch.OnComplete.internal(Future.scala:264)
at akka.dispatch.OnComplete.internal(Future.scala:261)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
at scala.concurrent.impl.CallbackRunnable.run$$$capture(Promise.scala:36)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala)
at 
org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
at 
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
at 
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
at scala.concurrent.impl.CallbackRunnable.run$$$capture(Promise.scala:36)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala)
at 
akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
at 
akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
at 
akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
at 
akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution 
failed.
at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
at 
org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:110)
... 33 more
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by 
NoRestartBackoffTimeStrategy
at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
at 
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:484)
at 
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 

Coexistence of flink-sql-connector-elasticsearch6_2.11_1.10.0 and flink-connector-elasticsearch6_2.11_1.10.0

2020-08-06 Posted by 费文杰

Hi:

I'm on Flink 1.10. Because we use sql-client, I placed flink-sql-connector-elasticsearch6_2.11_1.10.0.jar under flink/lib. A colleague writes his sink to Elasticsearch directly in code, so he added flink-connector-elasticsearch6_2.11_1.10.0.jar,
and that makes his job fail. To reproduce the problem, I added both dependencies on my machine:

<dependency>
  <groupId>org.apache.httpcomponents</groupId>
  <artifactId>httpclient</artifactId>
  <version>4.5.2</version>
</dependency>

<dependency>
  <groupId>org.elasticsearch.client</groupId>
  <artifactId>elasticsearch-rest-high-level-client</artifactId>
  <version>${elasticsearch.version}</version>
</dependency>

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
  <version>${flink.version}</version>
</dependency>

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-sql-connector-elasticsearch6_2.11</artifactId>
  <version>${flink.version}</version>
  <scope>provided</scope>
</dependency>


Running locally, I get the following error:
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job 
failed (JobID: bb3bf961144b75bfc71c9dd3efaf59f9)
 at 
org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112)
 at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
 at 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
 at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
 at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
 at 
org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:874)
 at akka.dispatch.OnComplete.internal(Future.scala:264)
 at akka.dispatch.OnComplete.internal(Future.scala:261)
 at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
 at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
 at scala.concurrent.impl.CallbackRunnable.run$$$capture(Promise.scala:36)
 at scala.concurrent.impl.CallbackRunnable.run(Promise.scala)
 at 
org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
 at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
 at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
 at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
 at 
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
 at 
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
 at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
 at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
 at scala.concurrent.impl.CallbackRunnable.run$$$capture(Promise.scala:36)
 at scala.concurrent.impl.CallbackRunnable.run(Promise.scala)
 at 
akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
 at 
akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
 at 
akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
 at 
akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
 at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
 at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
 at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
 at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
 at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution 
failed.
 at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
 at 
org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:110)
 ... 33 more
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by 
NoRestartBackoffTimeStrategy
 at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
 at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
 at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
 at 
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
 at 
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
 at 
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:484)
 at 
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at 

Fwd: Checkpoint question for the SQL Client

2020-08-06 Posted by king


Sorry, I meant flink-conf.yaml, not flink-site.yaml.

----- Forwarded message -----

From: king
Date: 2020-08-07 08:23
To: user-zh
Subject: Checkpoint question for the SQL Client


Hello, on flink 1.11.0:
1. How do I set the checkpoint interval for the SQL Client? When doing file
streaming, the HDFS files stay in the In-progress state and are never Finalized.
2. I previously configured checkpointing (filesystem type) in flink-site.yaml; only the other two directories were created and no chk-* directory appeared. Hoping for an answer, thanks.


With the programmatic API the above works fine.

Re: stream sink to Hive under HDFS HA

2020-08-06 Posted by abc15606
You can add env.hadoop.conf.dir: to flink-conf.yaml, pointing to the Hadoop configuration directory.

Sent from my iPhone

> On Aug 7, 2020, at 00:16, Jark Wu wrote:
> 
> Could you provide the full exception stack, plus the version number and the code you are using?
> 
>> On Mon, 3 Aug 2020 at 13:32, air23  wrote:
>> 
>> Hi,
>>  Our cluster is CDH, configured with HDFS HA.
>> When sinking from Kafka to Hive, the nameservice cannot be found:
>> java.lang.IllegalArgumentException: java.net.UnknownHostException:
>> nameservices1
>> 
>> 
>> How should this be configured in HA mode?



Checkpoint question for the SQL Client

2020-08-06 Posted by king


Hello, on flink 1.11.0:
1. How do I set the checkpoint interval for the SQL Client? When doing file
streaming, the HDFS files stay in the In-progress state and are never Finalized.
2. I previously configured checkpointing (filesystem type) in flink-site.yaml; only the other two directories were created and no chk-* directory appeared. Hoping for an answer, thanks.


With the programmatic API the above works fine.
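For reference, a hedged sketch of the two usual ways to get a checkpoint interval in place: programmatically via env.enableCheckpointing, or via the execution.checkpointing.interval key (Flink 1.11+), which can also be put into flink-conf.yaml so that the SQL Client picks it up. The snippet only shows the Java equivalents; the SQL Client behaviour itself is not verified here:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class CheckpointIntervalSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Programmatic equivalent: checkpoint every 10 seconds.
        env.enableCheckpointing(10_000L);

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(
                env, EnvironmentSettings.newInstance().useBlinkPlanner().build());
        // Same setting expressed as a configuration key; for the SQL Client the line
        // "execution.checkpointing.interval: 10 s" in flink-conf.yaml plays this role.
        tEnv.getConfig().getConfiguration().setString("execution.checkpointing.interval", "10 s");
    }
}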

Re: Adding a globally unique id column in Flink SQL

2020-08-06 Posted by Jark Wu
You can rely on an external globally-unique auto-increment id service and wrap it as a UDF for use in Flink SQL.
Within Flink SQL itself there doesn't seem to be such a capability yet.

Best,
Jark
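A minimal sketch of the wrapping pattern Jark describes. The IdServiceClient below is only a process-local stand-in (NOT globally unique); a real implementation would call whatever external id service is used:

import java.util.concurrent.atomic.AtomicLong;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;

public class NextGlobalId extends ScalarFunction {

    // Stand-in for the external id service; replace with a client for a
    // snowflake-style service or a database sequence. The AtomicLong here is
    // only local to one process and is NOT globally unique across TaskManagers.
    public static class IdServiceClient {
        private final AtomicLong local = new AtomicLong();
        public long nextId() { return local.incrementAndGet(); }
    }

    private transient IdServiceClient client;

    @Override
    public void open(FunctionContext context) {
        client = new IdServiceClient();
    }

    public long eval() {
        return client.nextId();
    }

    @Override
    public boolean isDeterministic() {
        return false; // each call must return a new value
    }
}

It could then be registered with createTemporarySystemFunction("next_id", new NextGlobalId()) and used as next_id() in a query.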

On Thu, 6 Aug 2020 at 18:27, drewfranklin  wrote:

> Hello all. A quick question: is there a good way in Flink SQL to add a globally unique, auto-incrementing value?
>
>


Re: flink1.11 es connector

2020-08-06 Posted by Jark Wu
There is currently an issue in the community tracking an ES source; you can follow it:
https://issues.apache.org/jira/browse/FLINK-16713
For the temporal table query you want, do you mean using it as a dimension table (lookup)?

Best,
Jark

On Thu, 6 Aug 2020 at 11:20, Dream-底限  wrote:

> hi
>
> We would like to use ES for temporal table queries, but Flink doesn't expose an ES source connector, so we would have to implement one ourselves. Do you think using ES as a temporal table is reliable? (We are avoiding HBase because designing the rowkey and possibly maintaining secondary indexes is cumbersome, though HBase is also an option we are evaluating.)
>
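Since there is no built-in ES lookup source yet, a rough, hedged sketch of the dimension-table direction discussed here: a TableFunction that queries ES per key with the REST high-level client. The index, field names and host are placeholders, and caching and error handling are omitted:

import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
import org.apache.http.HttpHost;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;

public class EsLookupFunction extends TableFunction<Row> {

    private transient RestHighLevelClient client;

    @Override
    public void open(FunctionContext context) {
        client = new RestHighLevelClient(
                RestClient.builder(new HttpHost("localhost", 9200, "http")));
    }

    // Called once per probe-side key; "dim_index" and "id"/"value" are placeholder names.
    public void eval(String key) throws Exception {
        SearchRequest request = new SearchRequest("dim_index")
                .source(new SearchSourceBuilder().query(QueryBuilders.termQuery("id", key)));
        SearchResponse response = client.search(request, RequestOptions.DEFAULT);
        for (SearchHit hit : response.getHits().getHits()) {
            collect(Row.of(key, hit.getSourceAsMap().get("value")));
        }
    }

    @Override
    public void close() throws Exception {
        if (client != null) {
            client.close();
        }
    }
}

This is only a joinable UDTF sketch, not a proper LookupTableSource; the FLINK-16713 work linked above is the place to watch for a real connector.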


Re: Behavior for flink job running on K8S failed after restart strategy exhausted

2020-08-06 Posted by Eleanore Jin
Hi Yang,

Thanks a lot for the information!

Eleanore

On Thu, Aug 6, 2020 at 4:20 AM Yang Wang  wrote:

> Hi Eleanore,
>
> From my experience, collecting the Flink metrics to prometheus via metrics
> collector is a more ideal way. It is
> also easier to configure the alert.
> Maybe you could use "fullRestarts" or "numRestarts" to monitor the job
> restarting. More metrics could be find
> here[2].
>
> [1].
> https://ci.apache.org/projects/flink/flink-docs-master/monitoring/metrics.html#prometheus-orgapacheflinkmetricsprometheusprometheusreporter
> [2].
> https://ci.apache.org/projects/flink/flink-docs-master/monitoring/metrics.html#availability
>
> Best,
> Yang
>
Eleanore Jin wrote on Wed, Aug 5, 2020 at 11:52 PM:
>
>> Hi Yang and Till,
>>
>> Thanks a lot for the help! I have the similar question as Till mentioned,
>> if we do not fail Flink pods when the restart strategy is exhausted, it
>> might be hard to monitor such failures. Today I get alerts if the k8s pods
>> are restarted or in crash loop, but if this will no longer be the case, how
>> can we deal with the monitoring? In production, I have hundreds of small
>> flink jobs running (2-8 TM pods) doing stateless processing, it is really
>> hard for us to expose ingress for each JM rest endpoint to periodically
>> query the job status for each flink job.
>>
>> Thanks a lot!
>> Eleanore
>>
>> On Wed, Aug 5, 2020 at 4:56 AM Till Rohrmann 
>> wrote:
>>
>>> You are right Yang Wang.
>>>
>>> Thanks for creating this issue.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Wed, Aug 5, 2020 at 1:33 PM Yang Wang  wrote:
>>>
 Actually, the application status shows in YARN web UI is not determined
 by the jobmanager process exit code.
 Instead, we use "resourceManagerClient.unregisterApplicationMaster" to
 control the final status of YARN application.
 So although jobmanager exit with zero code, it still could show failed
 status in YARN web UI.

 I have created a ticket to track this improvement[1].

 [1]. https://issues.apache.org/jira/browse/FLINK-18828


 Best,
 Yang


Till Rohrmann wrote on Wed, Aug 5, 2020 at 3:56 PM:

> Yes for the other deployments it is not a problem. A reason why people
> preferred non-zero exit codes in case of FAILED jobs is that this is 
> easier
> to monitor than having to take a look at the actual job result. Moreover,
> in the YARN web UI the application shows as failed if I am not mistaken.
> However, from a framework's perspective, a FAILED job does not mean that
> Flink has failed and, hence, the return code could still be 0 in my 
> opinion.
>
> Cheers,
> Till
>
> On Wed, Aug 5, 2020 at 9:30 AM Yang Wang 
> wrote:
>
>> Hi Eleanore,
>>
>> Yes, I suggest to use Job to replace Deployment. It could be used
>> to run jobmanager one time and finish after a successful/failed 
>> completion.
>>
>> However, using Job still could not solve your problem completely.
>> Just as Till said, When a job exhausts the restart strategy, the 
>> jobmanager
>> pod will terminate with non-zero exit code. It will cause the K8s
>> restarting it again. Even though we could set the resartPolicy and
>> backoffLimit,
>> this is not a clean and correct way to go. We should terminate the
>> jobmanager process with zero exit code in such situation.
>>
>> @Till Rohrmann  I just have one concern. Is it
>> a special case for K8s deployment? For standalone/Yarn/Mesos, it seems 
>> that
>> terminating with
>> non-zero exit code is harmless.
>>
>>
>> Best,
>> Yang
>>
Eleanore Jin wrote on Tue, Aug 4, 2020 at 11:54 PM:
>>
>>> Hi Yang & Till,
>>>
>>> Thanks for your prompt reply!
>>>
>>> Yang, regarding your question, I am actually not using k8s job, as I
>>> put my app.jar and its dependencies under flink's lib directory. I have 
>>> 1
>>> k8s deployment for job manager, and 1 k8s deployment for task manager, 
>>> and
>>> 1 k8s service for job manager.
>>>
>>> As you mentioned above, if flink job is marked as failed, it will
>>> cause the job manager pod to be restarted. Which is not the ideal
>>> behavior.
>>>
>>> Do you suggest that I should change the deployment strategy from
>>> using k8s deployment to k8s job? In case the flink program exit with
>>> non-zero code (e.g. exhausted number of configured restart), pod can be
>>> marked as complete hence not restarting the job again?
>>>
>>> Thanks a lot!
>>> Eleanore
>>>
>>> On Tue, Aug 4, 2020 at 2:49 AM Yang Wang 
>>> wrote:
>>>
 @Till Rohrmann  In native mode, when a Flink
 application terminates with FAILED state, all the resources will be 
 cleaned
 up.

 However, in standalone mode, I agree with you that we need to
 rethink the exit code of Flink. When a job 

Re: Case of database column names in the Flink Table API

2020-08-06 Posted by Jark Wu
Hi, is the Postgres schema of your alarm_history_data table public?
If not, you need to declare the schema name explicitly as part of the table name; for example, if the schema is sch1, the Flink SQL definition needs to be

CREATE TABLE `sch1.alarm_history_data` (
...
) with (...);

select * from `sch1.alarm_history_data`;

Best,
Jark


On Tue, 4 Aug 2020 at 14:58, lgs <9925...@qq.com> wrote:

> Hi,
> The postgres column names contain mixed case.
>postgres_sink = """
> CREATE TABLE alarm_history_data (
> `recordId` STRING,
> `rowtime`  TIMESTAMP(3),
> `action`   STRING,
> `originalState`STRING,
> `newState` STRING,
> `originalCause`STRING,
> `newCause` STRING,
> `ser_name` STRING,
> `enb`  STRING,
> `eventTime`STRING,
> `ceasedTime`   STRING,
> `duration` STRING,
> `acked`STRING,
> `pmdId`STRING,
> `pmdTime`  STRING,
>  PRIMARY KEY (`recordId`) NOT ENFORCED
> ) WITH (
> 'connector.type' = 'jdbc',
> 'connector.url' = 'jdbc:postgresql://127.0.0.1:5432/postgres',
> 'connector.table' = 'alarm_history_data',
> 'connector.driver' = 'org.postgresql.Driver',
> 'connector.username' = 'postgres',
> 'connector.password' = 'my_password',
> 'connector.write.flush.max-rows' = '1'
> )
> """
>
> st_env.scan("source").group_by("recordId").select(
> "recordId,"
> "last_tvalue(actionTime) as rowtime, last_value(action),"
> "last_value(originalState) as originalState, last_value(newState),"
> "last_value(originalCause), last_value(newCause),"
> "last_value(ser_name), last_value(enb), last_value(eventTime),"
> "last_value(ceasedTime), last_value(duration), last_value(acked),"
> "last_value(pmdId), last_value(pmdTime)"
> ).insert_into("alarm_history_data")
>
> sink出错,报错是:
> Caused by: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO
> alarm_history_data(recordId, rowtime, action, originalState, newState,
> originalCause, newCause, ser_name, enb, eventTime, ceasedTime, duration,
> acked, pmdId, pmdTime) VALUES ('47357607', '2020-06-03 17:37:44+08',
> 'Insert', '', 'cleared', '', 'crash', 'Oyama_ENM_MS',
> '789198-houshakuzi-RBS6302', '2020-06-03T17:24:57', '2020-06-03T17:29:50',
> '293.0', 'No', '0x8002', '2020-06-03T17:22:46') ON CONFLICT (recordId)
> DO UPDATE SET recordId=EXCLUDED.recordId, rowtime=EXCLUDED.rowtime,
> action=EXCLUDED.action, originalState=EXCLUDED.originalState,
> newState=EXCLUDED.newState, originalCause=EXCLUDED.originalCause,
> newCause=EXCLUDED.newCause, ser_name=EXCLUDED.ser_name, enb=EXCLUDED.enb,
> eventTime=EXCLUDED.eventTime, ceasedTime=EXCLUDED.ceasedTime,
> duration=EXCLUDED.duration, acked=EXCLUDED.acked, pmdId=EXCLUDED.pmdId,
> pmdTime=EXCLUDED.pmdTime was aborted: ERROR: column "recordid" of relation
> "alarm_history_data" does not exist
>
> How can this be solved? How can I make the generated SQL statement wrap the column names in quotes?
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: stream sink to Hive under HDFS HA

2020-08-06 Posted by Jark Wu
Could you provide the full exception stack, plus the version number and the code you are using?

On Mon, 3 Aug 2020 at 13:32, air23  wrote:

> Hi,
>   Our cluster is CDH, configured with HDFS HA.
>  When sinking from Kafka to Hive, the nameservice cannot be found:
> java.lang.IllegalArgumentException: java.net.UnknownHostException:
> nameservices1
>
>
> How should this be configured in HA mode?


Re: Question: propagating the time attribute column, is there a way to solve this?

2020-08-06 Posted by Benchao Li
You can refer to the documentation here [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#selecting-group-window-start-and-end-timestamps

kcz <573693...@qq.com> wrote on Thu, Aug 6, 2020 at 11:04 PM:

> Could you explain a bit more? I still don't quite see where it goes wrong, and why that change solves the problem.
>
>
>
>
>
> -- Original message --
> From: Tianwang Li  Sent: Aug 6, 2020, 21:40
> To: user-zh  Subject: Re: Question: propagating the time attribute column, is there a way to solve this?
>
>
>
> Got it: using TUMBLE_ROWTIME / HOP_ROWTIME / SESSION_ROWTIME
> the attribute can be propagated.
>
>
> Tianwang Li 
> 
>  I want to do two rounds of window aggregation:
>  1. First a 5-minute window.
>  2. Then a 10-minute window on top of the result of the first.
> 
> 
>  I tried it and got the exception: Window aggregate can only be defined over a time attribute
>  column, but TIMESTAMP(3) encountered.
> 
>  Is there any way to solve this?
>  I'd like to be able to chain one window aggregation after another.
> 
> 
>  First aggregation:
> 
>  CREATE VIEW tmp_5min AS
>  SELECT
> 
>  max(rowtime) as rowtime,
>  TUMBLE_START(`rowtime`, INTERVAL '5' minute) AS
> window_start,
> 
>  user_group,
>  COUNT(*) AS cnt
>  FROM user_behavior
> 
>  GROUP BY
> 
>  TUMBLE(`rowtime`, INTERVAL '5' minute),
> 
>  user_group
> 
>  Second aggregation:
> 
>  CREATE VIEW tmp_10min AS
>  SELECT
> 
>  max(rowtime) as rowtime,
>  TUMBLE_START(`rowtime`, INTERVAL '10' minute) AS
> window_start,
> 
>  user_group,
>  SUM(cnt) AS sum_cnt
>  FROM tmp_5min
> 
>  GROUP BY
> 
>  TUMBLE(`rowtime`, INTERVAL '10' minute),
> 
>  user_group
> 
> 
> 
>  --
>  **
>  tivan
>  **
> 
>
>
> --
> **
> tivanli
> **



-- 

Best,
Benchao Li
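A hedged sketch of the fix referenced above: selecting TUMBLE_ROWTIME in the inner query keeps rowtime a time attribute, so the outer 10-minute window can still be defined on it. tEnv is assumed to be a StreamTableEnvironment with user_behavior registered and rowtime declared as an event-time attribute, matching the quoted queries:

// Inner 5-minute aggregation: TUMBLE_ROWTIME (not max(rowtime)) keeps the time attribute.
tEnv.createTemporaryView("tmp_5min", tEnv.sqlQuery(
      "SELECT "
    + "  TUMBLE_ROWTIME(`rowtime`, INTERVAL '5' MINUTE) AS rowtime, "
    + "  TUMBLE_START(`rowtime`, INTERVAL '5' MINUTE) AS window_start, "
    + "  user_group, "
    + "  COUNT(*) AS cnt "
    + "FROM user_behavior "
    + "GROUP BY TUMBLE(`rowtime`, INTERVAL '5' MINUTE), user_group"));

// Outer 10-minute aggregation can now window on the propagated rowtime.
tEnv.createTemporaryView("tmp_10min", tEnv.sqlQuery(
      "SELECT "
    + "  TUMBLE_ROWTIME(`rowtime`, INTERVAL '10' MINUTE) AS rowtime, "
    + "  TUMBLE_START(`rowtime`, INTERVAL '10' MINUTE) AS window_start, "
    + "  user_group, "
    + "  SUM(cnt) AS sum_cnt "
    + "FROM tmp_5min "
    + "GROUP BY TUMBLE(`rowtime`, INTERVAL '10' MINUTE), user_group"));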


Re: Question: propagating the time attribute column, is there a way to solve this?

2020-08-06 Posted by kcz
Could you explain a bit more? I still don't quite see where it goes wrong, and why that change solves the problem.





-- Original message --
From: Tianwang Li 

Re: Only a small fraction of the timers registered with the Flink TimerService ever fire

2020-08-06 Posted by jsqf
Try overriding the onTimer method. 
You can refer to
https://github.com/JSQF/flink10_learn/blob/master/src/main/scala/com/yyb/flink10/DataStream/ProcessFunction/OperatorProcessFunctionDemo.java
 






Re: Question: propagating the time attribute column, is there a way to solve this?

2020-08-06 Posted by Tianwang Li
Got it: using TUMBLE_ROWTIME / HOP_ROWTIME / SESSION_ROWTIME
 the attribute can be propagated.


Tianwang Li wrote on Thu, Aug 6, 2020 at 9:12 PM:

>
> I want to do two rounds of window aggregation:
> 1. First a 5-minute window.
> 2. Then a 10-minute window on top of the result of the first.
>
>
> 我尝试了发生了异常, Window aggregate can only be defined over a time attribute
> column, but TIMESTAMP(3) encountered.
>
> Is there any way to solve this?
> I'd like to be able to chain one window aggregation after another.
>
>
> First aggregation:
>
> CREATE VIEW tmp_5min AS
> SELECT
>
>   max(rowtime) as rowtime,
>   TUMBLE_START(`rowtime`, INTERVAL '5' minute) AS window_start,
>
>   user_group,
>   COUNT(*) AS cnt
> FROM user_behavior
>
> GROUP BY
>
> TUMBLE(`rowtime`, INTERVAL '5' minute),
>
> user_group
>
> Second aggregation:
>
> CREATE VIEW tmp_10min AS
> SELECT
>
>   max(rowtime) as rowtime,
>   TUMBLE_START(`rowtime`, INTERVAL '10' minute) AS window_start,
>
>   user_group,
>   SUM(cnt) AS sum_cnt
> FROM tmp_5min
>
> GROUP BY
>
> TUMBLE(`rowtime`, INTERVAL '10' minute),
>
> user_group
>
>
>
> --
> **
> tivan
> **
>


-- 
**
 tivanli
**


Question: propagating the time attribute column, is there a way to solve this?

2020-08-06 Posted by Tianwang Li
I want to do two rounds of window aggregation:
1. First a 5-minute window.
2. Then a 10-minute window on top of the result of the first.


I tried it and got the exception: Window aggregate can only be defined over a time attribute
column, but TIMESTAMP(3) encountered.

Is there any way to solve this?
I'd like to be able to chain one window aggregation after another.


First aggregation:

CREATE VIEW tmp_5min AS
SELECT

  max(rowtime) as rowtime,
  TUMBLE_START(`rowtime`, INTERVAL '5' minute) AS window_start,

  user_group,
  COUNT(*) AS cnt
FROM user_behavior

GROUP BY

TUMBLE(`rowtime`, INTERVAL '5' minute),

user_group

Second aggregation:

CREATE VIEW tmp_10min AS
SELECT

  max(rowtime) as rowtime,
  TUMBLE_START(`rowtime`, INTERVAL '10' minute) AS window_start,

  user_group,
  SUM(cnt) AS sum_cnt
FROM tmp_5min

GROUP BY

TUMBLE(`rowtime`, INTERVAL '10' minute),

user_group



-- 
**
tivan
**


Re: Behavior for flink job running on K8S failed after restart strategy exhausted

2020-08-06 Posted by Yang Wang
Hi Eleanore,

From my experience, collecting the Flink metrics to prometheus via metrics
collector is a more ideal way. It is
also easier to configure the alert.
Maybe you could use "fullRestarts" or "numRestarts" to monitor the job
restarting. More metrics could be find
here[2].

[1].
https://ci.apache.org/projects/flink/flink-docs-master/monitoring/metrics.html#prometheus-orgapacheflinkmetricsprometheusprometheusreporter
[2].
https://ci.apache.org/projects/flink/flink-docs-master/monitoring/metrics.html#availability

Best,
Yang

Eleanore Jin wrote on Wed, Aug 5, 2020 at 11:52 PM:

> Hi Yang and Till,
>
> Thanks a lot for the help! I have the similar question as Till mentioned,
> if we do not fail Flink pods when the restart strategy is exhausted, it
> might be hard to monitor such failures. Today I get alerts if the k8s pods
> are restarted or in crash loop, but if this will no longer be the case, how
> can we deal with the monitoring? In production, I have hundreds of small
> flink jobs running (2-8 TM pods) doing stateless processing, it is really
> hard for us to expose ingress for each JM rest endpoint to periodically
> query the job status for each flink job.
>
> Thanks a lot!
> Eleanore
>
> On Wed, Aug 5, 2020 at 4:56 AM Till Rohrmann  wrote:
>
>> You are right Yang Wang.
>>
>> Thanks for creating this issue.
>>
>> Cheers,
>> Till
>>
>> On Wed, Aug 5, 2020 at 1:33 PM Yang Wang  wrote:
>>
>>> Actually, the application status shows in YARN web UI is not determined
>>> by the jobmanager process exit code.
>>> Instead, we use "resourceManagerClient.unregisterApplicationMaster" to
>>> control the final status of YARN application.
>>> So although jobmanager exit with zero code, it still could show failed
>>> status in YARN web UI.
>>>
>>> I have created a ticket to track this improvement[1].
>>>
>>> [1]. https://issues.apache.org/jira/browse/FLINK-18828
>>>
>>>
>>> Best,
>>> Yang
>>>
>>>
Till Rohrmann wrote on Wed, Aug 5, 2020 at 3:56 PM:
>>>
 Yes for the other deployments it is not a problem. A reason why people
 preferred non-zero exit codes in case of FAILED jobs is that this is easier
 to monitor than having to take a look at the actual job result. Moreover,
 in the YARN web UI the application shows as failed if I am not mistaken.
 However, from a framework's perspective, a FAILED job does not mean that
 Flink has failed and, hence, the return code could still be 0 in my 
 opinion.

 Cheers,
 Till

 On Wed, Aug 5, 2020 at 9:30 AM Yang Wang  wrote:

> Hi Eleanore,
>
> Yes, I suggest to use Job to replace Deployment. It could be used
> to run jobmanager one time and finish after a successful/failed 
> completion.
>
> However, using Job still could not solve your problem completely. Just
> as Till said, When a job exhausts the restart strategy, the jobmanager
> pod will terminate with non-zero exit code. It will cause the K8s
> restarting it again. Even though we could set the resartPolicy and
> backoffLimit,
> this is not a clean and correct way to go. We should terminate the
> jobmanager process with zero exit code in such situation.
>
> @Till Rohrmann  I just have one concern. Is it
> a special case for K8s deployment? For standalone/Yarn/Mesos, it seems 
> that
> terminating with
> non-zero exit code is harmless.
>
>
> Best,
> Yang
>
Eleanore Jin wrote on Tue, Aug 4, 2020 at 11:54 PM:
>
>> Hi Yang & Till,
>>
>> Thanks for your prompt reply!
>>
>> Yang, regarding your question, I am actually not using k8s job, as I
>> put my app.jar and its dependencies under flink's lib directory. I have 1
>> k8s deployment for job manager, and 1 k8s deployment for task manager, 
>> and
>> 1 k8s service for job manager.
>>
>> As you mentioned above, if flink job is marked as failed, it will
>> cause the job manager pod to be restarted. Which is not the ideal
>> behavior.
>>
>> Do you suggest that I should change the deployment strategy from
>> using k8s deployment to k8s job? In case the flink program exit with
>> non-zero code (e.g. exhausted number of configured restart), pod can be
>> marked as complete hence not restarting the job again?
>>
>> Thanks a lot!
>> Eleanore
>>
>> On Tue, Aug 4, 2020 at 2:49 AM Yang Wang 
>> wrote:
>>
>>> @Till Rohrmann  In native mode, when a Flink
>>> application terminates with FAILED state, all the resources will be 
>>> cleaned
>>> up.
>>>
>>> However, in standalone mode, I agree with you that we need to
>>> rethink the exit code of Flink. When a job exhausts the restart
>>> strategy, we should terminate the pod and do not restart again.
>>> After googling, it seems that we could not specify the restartPolicy
>>> based on exit code[1]. So maybe we need to return a zero exit code
>>> to avoid restarting by K8s.
>>>

Adding a globally unique id column in Flink SQL

2020-08-06 Posted by drewfranklin
Hello all. A quick question: is there a good way in Flink SQL to add a globally unique, auto-incrementing value?



Re: Only a small fraction of the timers registered with the Flink TimerService ever fire

2020-08-06 Posted by shizk233
I'm not sure of the exact cause, but I'd suggest obtaining the current time from context.timerService() (currentWatermark() / currentProcessingTime()).

To troubleshoot, do you have unit tests for the operator? If not, you can write one with the Flink test utils [1][2] and debug there;
that lets you observe the state of the timers on the TimerService quite clearly.

[1]
https://flink.apache.org/news/2020/02/07/a-guide-for-unit-testing-in-apache-flink.html
[2]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html
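A hedged sketch of the harness-based debugging approach mentioned above, using the test utilities from the flink-streaming-java test-jar; the process function is a simplified stand-in for the one in the quoted code, and exact harness method signatures may differ slightly between versions:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.util.Collector;

public class TimerHarnessSketch {

    // Simplified stand-in for HbaseTimerProcessFunc: register an event-time timer
    // a fixed delay after the element's timestamp.
    public static class DelayTimerFunc extends KeyedProcessFunction<String, String, String> {
        @Override
        public void processElement(String value, Context ctx, Collector<String> out) {
            ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 15 * 60 * 1000L);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
            out.collect("timer fired at " + timestamp);
        }
    }

    public static void main(String[] args) throws Exception {
        KeyedOneInputStreamOperatorTestHarness<String, String, String> harness =
                new KeyedOneInputStreamOperatorTestHarness<>(
                        new KeyedProcessOperator<>(new DelayTimerFunc()),
                        value -> value,   // key selector
                        Types.STRING);
        harness.open();
        harness.processElement("a", 1_000L);                          // event timestamp 1000
        harness.processWatermark(new Watermark(1_000L + 15 * 60 * 1000L)); // advance past the timer
        System.out.println(harness.getOutput());                      // the timer output should show up here
        harness.close();
    }
}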

钟昊(zhonghao01)-商业智能部 wrote on Thu, Aug 6, 2020 at 5:31 PM:

> Hi everyone:
>
>
> Due to a business requirement, the records of one stream have to be processed in 4 steps: after step 1 completes, step 2 runs 15 minutes later; after step 2, step 3 runs 30 minutes later; and step 4 runs 60 minutes after that. I implemented this by registering timers with the TimerService, but at runtime only a very small fraction of the 15-minute timers for step 2 ever reach the onTimer logic. Part of the implementation:
>
>
> DataStream dataStream = env.addSource(…);
> final OutputTag> missPingback = …;
> final OutputTag> level1Tag = …;
> final OutputTag> level2Tag = …;
> final OutputTag> level3Tag = …;
>
> SingleOutputStreamOperator missDs = dataStream
> .map(…)
> .filter(…)
> .assignTimestampsAndWatermarks(new AssignWaterMark())
> .keyBy(0)
> .timeWindow(Time.seconds(winSize))
> .process(new BatchMergeProcessFunction(missPingback));
>
> SingleOutputStreamOperator level1MissedDs =
>  missDs
> .getSideOutput(missPingback)
> .keyBy(0)
> // **
> In this step, only a very small fraction (less than 10%) of the 15-minute timers in the process function ever fire. 
> .process(new HbaseTimerProcessFunc(level1Tag, 15));
>
> SingleOutputStreamOperator level2MissedDs =
>  level1MissedDs
> .keyBy(0)
> .process(new HbaseTimerProcessFunc(level2Tag, 30));
>
> SingleOutputStreamOperator level3MissedDs =
>  level2MissedDs
> .keyBy(0)
> .process(new HbaseTimerProcessFunc(level3Tag, 60));
> DataStream l1Ds =
> level1MissedDs.getSideOutput(level1Tag);
> DataStream l2Ds =
> level2MissedDs.getSideOutput(level2Tag);
> DataStream l3Ds =
> level3MissedDs.getSideOutput(level3Tag);
>
> missDs.union(l1Ds).union(l2Ds).union(l3Ds).addSink(…);
>
>
> ———
>  —> HbaseTimerProcessFunc.class 中定时器设置方法
> @Override
> public void processElement(SimplifiedPingbackMsg value, Context context,
> Collector out) throws Exception {
> long ts = System.currentTimeMillis() + 60 * 1000 * timeout;
> pbState.put(value.getEventId() + ts, value);
> context.timerService().registerEventTimeTimer(ts);
> MetricModifyUtils.modifyPerDay(timerTotalCounter, 1);
> }
> ———
>
> …
>
> // 水印提取类,类似于直接用了ProcessTime来提取的
> class AssignWaterMark implements
> AssignerWithPeriodicWatermarks> {
>
>  private long maxOutOfOrderness = 60 * 1000 * 5;
>  private long currentMaxTimestamp = 0L;
>
>  @Nullable
>  @Override
>  public Watermark getCurrentWatermark() {
>  return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
>  }
>
>  @Override
>  public long extractTimestamp(Tuple2
> element, long previousElementTimestamp) {
>  long timestamp = System.currentTimeMillis();
>  currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
>  return timestamp;
>  }
> }
>
>
>
> 所以为啥这里的实现会出现此类情况呢,这种情况该如何去排查为啥会有绝大多数timer不触发呢,我看了webui中水印生成是正常的。
>
>


flink timerservice: only a small fraction of registered timers fire

2020-08-06, posted by 钟昊(zhonghao01)-商业智能部
hi, everyone:

Because of a business requirement, each record in a stream has to go through 4 processing steps: after step 1 finishes, step 2 runs 15 minutes later; after step 2 finishes, step 3 runs 30 minutes later; and step 4 runs 60 minutes after that. I implemented this by registering timers with the TimerService, but while the job runs only a very small fraction of the 15-minute timers for step 2 ever reach their time and trigger the onTimer logic. Part of the implementation is below:


DataStream dataStream = env.addSource(…);
final OutputTag> missPingback = …;
final OutputTag> level1Tag = …;
final OutputTag> level2Tag = …;
final OutputTag> level3Tag = …;

SingleOutputStreamOperator missDs = dataStream
.map(…)
.filter(…)
.assignTimestampsAndWatermarks(new AssignWaterMark())
.keyBy(0)
.timeWindow(Time.seconds(winSize))
.process(new BatchMergeProcessFunction(missPingback));

SingleOutputStreamOperator level1MissedDs =
 missDs
.getSideOutput(missPingback)
.keyBy(0)
// ** In this step, fewer than 10% of the 15-minute timers registered in the process function ever fire.

.process(new HbaseTimerProcessFunc(level1Tag, 15));

SingleOutputStreamOperator level2MissedDs =
 level1MissedDs
.keyBy(0)
.process(new HbaseTimerProcessFunc(level2Tag, 30));

SingleOutputStreamOperator level3MissedDs =
 level2MissedDs
.keyBy(0)
.process(new HbaseTimerProcessFunc(level3Tag, 60));
DataStream l1Ds = level1MissedDs.getSideOutput(level1Tag);
DataStream l2Ds = level2MissedDs.getSideOutput(level2Tag);
DataStream l3Ds = level3MissedDs.getSideOutput(level3Tag);

missDs.union(l1Ds).union(l2Ds).union(l3Ds).addSink(…);


———
 —> timer registration logic in HbaseTimerProcessFunc.class
@Override
public void processElement(SimplifiedPingbackMsg value, Context context, 
Collector out) throws Exception {
long ts = System.currentTimeMillis() + 60 * 1000 * timeout;
pbState.put(value.getEventId() + ts, value);
context.timerService().registerEventTimeTimer(ts);
MetricModifyUtils.modifyPerDay(timerTotalCounter, 1);
}
———

…

// Timestamp/watermark assigner; effectively uses processing time (the wall clock) as the timestamp
class AssignWaterMark implements AssignerWithPeriodicWatermarks> {

 private long maxOutOfOrderness = 60 * 1000 * 5;
 private long currentMaxTimestamp = 0L;

 @Nullable
 @Override
 public Watermark getCurrentWatermark() {
 return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
 }

 @Override
 public long extractTimestamp(Tuple2 
element, long previousElementTimestamp) {
 long timestamp = System.currentTimeMillis();
 currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
 return timestamp;
 }
}



So why does this implementation behave this way, and how should I go about investigating why the vast majority of the timers never fire? The watermark generation shown in the web UI looks normal.



flink-OOME_Java heap space

2020-08-06, posted by Roc Marshal
Hi, all.
I would like to ask about the following problem.
Scenario: Oracle JDK 1.8, Flink release 1.10.0,
Flink-on-YARN session mode. The job reads data from Kafka and runs SQL computations.
JVM Heap Size: 638 MB
Flink Managed Memory: 635 MB; the exception below occurs.
  The state backend is filesystem -> Hadoop.
  The task goes straight from DEPLOYING to FAILED.
  Other reference information was in the attached images (not reproduced in this archive).
  Could you give some advice?
  Thanks.





Re: flink 1.10.1/1.11.1: state keeps growing after SQL group by and time-window aggregation

2020-08-06, posted by Yu Li
I see that production is still running version 1.8.2. May I ask how the same job behaves on 1.8.2?

Best Regards,
Yu


On Thu, 6 Aug 2020 at 16:29, op <520075...@qq.com> wrote:

> Thanks for the reply.
> I had the same problem with 1.10 as well.

Re: flink 1.10.1/1.11.1: state keeps growing after SQL group by and time-window aggregation

2020-08-06, posted by op

Thanks for the reply. I had the same problem with 1.10 as well.

Re: flink 1.10.1/1.11.1: state keeps growing after SQL group by and time-window aggregation

2020-08-06, posted by Yu Li
@鱼子酱
With the same job using RocksDB incremental checkpoints in both cases, is the behavior consistent between 1.8.2 and 1.11.1, or does the incremental size grow monotonically only under 1.11.1?

@op Similar question: with the FsStateBackend, have you tested the job under different Flink versions? Is the behavior consistent?

The point of the questions above is to confirm whether the new version behaves the same as the old one; if not, this may be a bug introduced in the new version. Thanks.

Best Regards,
Yu


On Thu, 6 Aug 2020 at 13:52, Congxian Qiu  wrote:

> Hi
> I don't see the attachments on my side. Not sure whether it is a mail client issue or something else; could you double-check how the attachments were sent?
>
> Best,
> Congxian
>
>
> op <520075...@qq.com> wrote on Thu, Aug 6, 2020, 10:36 AM:
>
> > Thanks. The screenshots and the configuration are in the attachments.
> > I will try configuring the RocksDB StateBackend.
> >
> > -- Original message --
> > From: "user-zh" <qcx978132...@gmail.com>, Wednesday, Aug 5, 2020, 5:43 PM
> > Subject: Re: flink 1.10.1/1.11.1: state keeps growing after SQL group by and time-window aggregation
> >
> > Hi
> >   The RocksDB StateBackend only needs a few entries in flink-conf [1].
> >
> >   Also, some information from your previous two mails is unclear to me. Could you paste the flink-conf you are using now, a screenshot of the checkpoint UI, and a screenshot of the checkpoint directory on HDFS?
> >
> > [1]
> > https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/state/state_backends.html#%E8%AE%BE%E7%BD%AE-state-backend
> >
> > Best,
> > Congxian
> >
> >
> > op <520075...@qq.com> wrote on Wed, Aug 5, 2020, 4:03 PM:
> >
> > > Hi, the TTL configuration is
> > > val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
> > > val tableEnv = StreamTableEnvironment.create(bsEnv, settings)
> > > val tConfig = tableEnv.getConfig
> > > tConfig.setIdleStateRetentionTime(Time.minutes(1440), Time.minutes(1450))
> > >
> > >   1) Currently 3 jobs all show this behavior.
> > >   2) The cluster has no RocksDB environment at the moment.
> > > Thanks
> > >
> > > -- Original message --
> > > From: "user-zh" <qcx978132...@gmail.com>, Wednesday, Aug 5, 2020, 3:30 PM
> > > Subject: Re: flink 1.10.1/1.11.1: state keeps growing after SQL group by and time-window aggregation
> > >
> > > Hi op
> > >   This situation is rather strange. I would like to confirm:
> > >   1) Do all of your jobs see the checkpoint size growing continuously, or only this type of job?
> > >   2) Have you tried the RocksDBStateBackend (full and incremental)? How does it behave?
> > >
> > >   Also, how are the other TTL options configured?
> > >
> > > In principle a checkpoint is just a snapshot of the state, so if the checkpoints keep growing, the state keeps growing.
> > > Best,
> > > Congxian
> > >
> > >
> > > op <520075...@qq.com> wrote on Wed, Aug 5, 2020, 2:46 PM:
> > >
> > > > Hi, I am using the FsStateBackend. Raising the interval to 5 minutes makes no difference; each checkpoint takes around 300 ms and our business data volume is basically the same every day.
> > > > The idle state retention time is set to 1440 minutes, so in theory the state size should level off after one day of running, but it has now been running for 5 days and the size of the checkpoint shared directory keeps increasing. I have also confirmed that the group-by keys only occur on the day being processed, i.e. a day's state becomes idle once that day is over, so 5 days of running should satisfy the cleanup condition.
> > > >
> > > > -- Original message --
> > > > From: "user-zh" <qcx978132...@gmail.com>, Monday, Aug 3, 2020, 5:50 PM
> > > > Subject: Re: flink 1.10.1/1.11.1: state keeps growing after SQL group by and time-window aggregation
> > > >
> > > > Hi
> > > >   Could you make the checkpoint interval longer and see whether it stabilizes? Judging from the size of the shared directory, it grows and then stays roughly flat. Note that the current "Checkpointed Data Size" is the incremental size [1], not the size of the whole checkpoint; if a lot of data changes between checkpoints, this value becomes larger.
> > > >
> > > > [1]
> > > > https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/state/state_backends.html#%E5%A2%9E%E9%87%8F%E5%BF%AB%E7%85%A7
> > > > Best,
> > > > Congxian
> > > >
> > > >
> > > > op <520075...@qq.com> wrote on Mon, Aug 3, 2020, 2:18 PM:
> > > >
> > > > > Same question here. I also see the state growing continuously, on version 1.11.0, with checkpoints stored on HDFS and a checkpoint interval of 3 minutes.
> > > > > The logic groups by event day and id and computes a dozen or so aggregates. After about 7 days of running the state is still increasing, even though the idle state retention is set and the watermark appears to advance normally:
> > > > > tConfig.setIdleStateRetentionTime(Time.minutes(1440), Time.minutes(1440+10))
> > > > >
> > > > > -- Original message --
> > > > > From: "user-zh" <384939...@qq.com>, Monday, Aug 3, 2020, 1:50 PM
> > > > > Subject: Re: flink 1.10.1/1.11.1: state keeps growing after SQL group by and time-window aggregation
> > > > >
> > > > > hi,
> > > > > I switched back to incremental mode and collected some new data:
> > > > > 1. Processing rate: 3000 records per second, in a test environment; the load is stable with almost no fluctuation.
> > > > > 2. The checkpoint interval is set to 5 seconds.
>
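
(A hedged sketch, not from the thread, that puts the settings discussed above in one place: an incremental RocksDB state backend plus idle-state retention, configured in code rather than in flink-conf.yaml. The checkpoint path and the intervals are placeholder assumptions; it needs the flink-statebackend-rocksdb dependency and targets the 1.11 APIs.)

import org.apache.flink.api.common.time.Time;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class StateBackendExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Incremental checkpoints: only new/changed RocksDB SST files are uploaded,
        // so "Checkpointed Data Size" reflects the delta, not the full state size.
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));
        env.enableCheckpointing(3 * 60_000L);   // e.g. every 3 minutes

        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

        // State idle for more than ~24 hours becomes eligible for cleanup,
        // mirroring the setIdleStateRetentionTime(1440, 1450) call quoted above.
        tableEnv.getConfig().setIdleStateRetentionTime(Time.minutes(1440), Time.minutes(1450));
    }
}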

In the future, the community plans to extend its functionality by providing a REST-based SQL

2020-08-06, posted by air23
Limitations & Future
The current SQL Client only supports embedded mode. In the future, the 
community plans to extend its functionality by providing a REST-based SQL 
Client Gateway, see more in FLIP-24 and FLIP-91.




Hi, I saw this in the official documentation. Is this still planned? Will it be a kind of REST-based SQL web client?