Re: [sql-client][checkpoint] Checkpoints are not written to HDFS for jobs submitted via sql-client

2020-09-14, posted by Harold.Miao
It is the same insert job; I only added that code when restarting the job, constructing a SavepointRestoreSettings to restore from the cp.

May I ask how I can tell whether the cp was really written to HDFS? Is there any tool that can parse the meta file?

Thanks
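
A quick way to check is to list the chk-N directory on HDFS and compare the file sizes. Below is only an illustrative Java sketch (the Hadoop client on the classpath and the way the path is passed in are assumptions, not something from this thread). Note also that state smaller than state.backend.fs.memory-threshold (1024 bytes by default) is inlined into _metadata, so seeing only _metadata does not by itself mean the state is missing.

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CheckpointDirInspector {
    public static void main(String[] args) throws Exception {
        // Pass the chk-N directory shown in the JobManager UI as the first argument.
        String chkDir = args[0];
        try (FileSystem fs = FileSystem.get(URI.create(chkDir), new Configuration())) {
            // Prints _metadata plus any separate state files together with their sizes.
            for (FileStatus status : fs.listStatus(new Path(chkDir))) {
                System.out.printf("%s %d bytes%n", status.getPath(), status.getLen());
            }
        }
    }
}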

Jark Wu wrote on Tue, Sep 15, 2020 at 11:31 AM:

> Could it be that your cp-restore code does not execute any INSERT INTO statement?
>
> On Mon, 14 Sep 2020 at 20:15, Harold.Miao  wrote:
>
> > One more thing: we modified the sql-client code so that jobs can be restored from a cp. The change is as follows:
> >
> > private StreamExecutionEnvironment createStreamExecutionEnvironment() {
> >     final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> >
> >     LOG.info("restore cp exist: {}", environment.getExecution().getRestoreSp().isPresent());
> >     if (environment.getExecution().getRestoreSp().isPresent()) {
> >         LOG.info("restore cp path: {}", environment.getExecution().getRestoreSp().get());
> >         if (!environment.getExecution().getRestoreSp().get().contains("none")) {
> >             SavepointRestoreSettings savepointRestoreSettings =
> >                     SavepointRestoreSettings.forPath(environment.getExecution().getRestoreSp().get(), true);
> >             env.getStreamGraph().setSavepointRestoreSettings(savepointRestoreSettings);
> >         }
> >     }
> >     // for TimeCharacteristic validation in StreamTableEnvironmentImpl
> >     env.setStreamTimeCharacteristic(environment.getExecution().getTimeCharacteristic());
> >     if (env.getStreamTimeCharacteristic() == TimeCharacteristic.EventTime) {
> >         env.getConfig().setAutoWatermarkInterval(environment.getExecution().getPeriodicWatermarksInterval());
> >     }
> >     return env;
> > }
> >
> >
> > When the path above, which contains only the meta file, is passed in, it fails with the following error:
> >
> > Exception in thread "main"
> > org.apache.flink.table.client.SqlClientException: Unexpected
> > exception. This is a bug. Please consider filing an issue.
> > at
> org.apache.flink.table.client.SqlClient.main(SqlClient.java:213)
> > Caused by: org.apache.flink.table.client.gateway.SqlExecutionException:
> > Could not create execution context.
> > at
> >
> org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:879)
> > at
> >
> org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:227)
> > at
> > org.apache.flink.table.client.SqlClient.start(SqlClient.java:108)
> > at
> org.apache.flink.table.client.SqlClient.main(SqlClient.java:201)
> > Caused by: java.lang.IllegalStateException: No operators defined in
> > streaming topology. Cannot execute.
> > at
> >
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:1870)
> > at
> >
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1861)
> > at
> >
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1846)
> > at
> >
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1834)
> > at
> >
> org.apache.flink.table.client.gateway.local.ExecutionContext.createStreamExecutionEnvironment(ExecutionContext.java:691)
> > at
> >
> org.apache.flink.table.client.gateway.local.ExecutionContext.createTableEnvironment(ExecutionContext.java:593)
> > at
> >
> org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:498)
> > at
> >
> org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:184)
> > at
> >
> org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:137)
> > at
> >
> org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:868)
> > ... 3 more
> >
> >
> > The error clearly shows that there is no operator state.
> >
> >
> >
> >
> >
> >
> >
> >
Congxian Qiu wrote on Mon, Sep 14, 2020 at 7:53 PM:
> >
> > > Hi
> > > If your state is all very small, it may be stored inside the meta file, in which case _metadata
> > > is the only file. The exact logic is here [1]
> > >
> > > [1]
> > >
> > >
> >
> https://github.com/apache/flink/blob/9b0fb562898b809b860cf0065ded7a45c49300af/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java#L442
> > > Best,
> > > Congxian
> > >
> > >
> > > Harold.Miao wrote on Mon, Sep 14, 2020 at 6:44 PM:
> > >
> > > > hi  all
> > > >
> > > > Flink version: 1.11.1
> > > >
> > > > We submit jobs via sql-client; flink-conf.yaml is configured as follows:
> > > >
> > > > state.backend: filesystem
> > > > state.backend.fs.checkpointdir:
> > > >
> > hdfs:///ai/flink/checkpoint/dataclean/hl-redis0902/checkpoint-data/23252
> > > > state.checkpoints.dir:
> > > >
> > hdfs:///ai/flink/checkpoint/dataclean/hl-redis0902/checkpoint-meta/23252
> > > > state.savepoints.dir:
> > > > hdfs:///ai/flink/checkpoint/dataclean/hl-redis0902/savepoint/23252
> > > >
> > > > execution.checkpointing.externalized-checkpoint-retention:
> > > > RETAIN_ON_CANCELLATION
> > > > execution.checkpointing.interval: 60s
> > > > execution.checkpointing.mode: EXACTLY_ONCE
> > 

Re: flink sql 1.11.1: how to insert data into only one HBase column family

2020-09-14, posted by Jark Wu
Hi,

This is not supported yet; it will be supported through partial insert [1] in the future.
For now you can only define two tables, each containing a single column family.


Best,
Jark

[1]: https://issues.apache.org/jira/browse/FLINK-18726
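
A rough sketch of that two-table workaround is shown below (illustrative only: the TableEnvironment setup is assumed, the table and column names come from the thread, and it relies on the HBase sink writing only the column families declared in the schema):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class OneFamilyPerTableSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // One Flink table per HBase column family, both backed by the same HBase table.
        tEnv.executeSql(
                "CREATE TABLE t_sems_second_analog (" +
                "  rowkey STRING," +
                "  analog ROW<val VARCHAR, dqf VARCHAR>" +
                ") WITH (" +
                "  'connector' = 'hbase-1.4'," +
                "  'table-name' = 't_sems_second'," +
                "  'zookeeper.quorum' = 'dev-hadoop-node-c:2181,dev-hadoop-node-d:2181,dev-hadoop-node-e:2181'" +
                ")");
        tEnv.executeSql(
                "CREATE TABLE t_sems_second_status (" +
                "  rowkey STRING," +
                "  status ROW<val VARCHAR, dqf VARCHAR>" +
                ") WITH (" +
                "  'connector' = 'hbase-1.4'," +
                "  'table-name' = 't_sems_second'," +
                "  'zookeeper.quorum' = 'dev-hadoop-node-c:2181,dev-hadoop-node-d:2181,dev-hadoop-node-e:2181'" +
                ")");

        // Writing through the analog-only table touches only that column family.
        tEnv.executeSql("INSERT INTO t_sems_second_analog VALUES ('row1', ROW('a', '100'))");
    }
}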

On Wed, 9 Sep 2020 at 16:22, 大罗  wrote:

> Hi, I ran into a problem. I defined an HBase connector table in the hive catalog as follows:
>
> CREATE TABLE t_sems_second (
>   rowkey string,
>   status row (val VARCHAR, dqf VARCHAR),
>   analog row (val VARCHAR, dqf VARCHAR)
> ) WITH (
>  'connector' = 'hbase-1.4',
>  'table-name' = 't_sems_second',
>  'zookeeper.quorum' =
> 'dev-hadoop-node-c:2181,dev-hadoop-node-d:2181,dev-hadoop-node-e:2181'
> );
>
>
> Then I ran SQL in flink-sql to insert data, which worked fine:
> insert into t_sems_second (rowkey, analog, status) values( 'row2', row('a',
> '100'), row('b', '200') );
>
> If I try to set the second column family's content to empty strings, that also works:
> insert into t_sems_second (rowkey, analog, status) values( 'row3', row('c',
> '300'), row('', '') );
>
> But the HBase query then shows empty strings like this:
> hbase(main):019:0> scan 't_sems_second'
> ROW COLUMN+CELL
>
>
>  row2   column=analog:dqf,
> timestamp=1599639282193, value=200
>
>  row2   column=analog:val,
> timestamp=1599639282193, value=b
>
>  row2   column=status:dqf,
> timestamp=1599639282193, value=100
>
>  row2   column=status:val,
> timestamp=1599639282193, value=a
>
> * row3   column=analog:dqf,
> timestamp=1599639292413, value=
>
>  row3   column=analog:val,
> timestamp=1599639292413, value= *
>
>  row3   column=status:dqf,
> timestamp=1599639292413, value=300
>
>  row3   column=status:val,
> timestamp=1599639292413, value=c
>
> 2 row(s)
> Took 0.1184 seconds
>
>
> hbase(main):020:0>
>
> Ultimately, my question is how to insert data only into the column family analog, with a statement like:
> insert into t_sems_second (rowkey, analog) select 'row1', row('a', '100')
> ;
> or:
> insert into t_sems_second (rowkey, analog, status) values( 'row2', row('a',
> '100'), row(null, NULL) );
>
> But it turns out this does not work.
>
> So, does the flink sql 1.11 hbase
> connector support inserting data into only one of the column families, or is the only way to meet this requirement to define two tables, each containing a single column family?
>
>
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: flink sql 1.11.1 insert data to hive from kafka split into two jobs

2020-09-14, posted by Jark Wu
Hi,

Currently DataStream and StatementSet cannot be submitted within a single job. The community is aware of this, see FLINK-18840 [1],
and it will be supported as part of FLIP-136 [2].

Best,
Jark

[1]: https://issues.apache.org/jira/browse/FLINK-18840
[2]:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-136%3A++Improve+interoperability+between+DataStream+and+Table+API?src=contextnavpagetreemode
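
The effect can be reproduced with a self-contained toy sketch (the datagen/blackhole tables and names below are made up for illustration): on 1.11, StatementSet.execute() and env.execute() are two independent submissions, hence two jobs in the web UI.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class TwoSubmissionsSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(
                env, EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        tEnv.executeSql("CREATE TABLE src (x INT) WITH ('connector' = 'datagen')");
        tEnv.executeSql("CREATE TABLE snk (x INT) WITH ('connector' = 'blackhole')");

        // First submission: the SQL pipeline buffered in the StatementSet.
        StatementSet ss = tEnv.createStatementSet();
        ss.addInsertSql("INSERT INTO snk SELECT x FROM src");
        ss.execute();

        // Second submission: whatever was built directly on the DataStream API
        // (the Redis sink in the original question).
        env.fromElements(1, 2, 3).print();
        env.execute("datastream-part");
    }
}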

On Thu, 10 Sep 2020 at 11:03, Qishang  wrote:

> Hi 大罗,
> try this method: org.apache.flink.table.api.StatementSet#execute
> ss.execute();
>
> 大罗 wrote on Wed, Sep 9, 2020 at 3:13 PM:
>
> > Hi, I ran into a problem. In code I use flink sql 1.11 to insert data from Kafka into Hive tables, as follows:
> >
> > First, read JSON string-array data from Kafka, e.g. [{"pid":"a", "val":1, "data_type": 1, "app_type" :2},
> > {"pid":"a", "val":1, "data_type": 1, "app_type" :2}]
> >
> > Then, use flatMap to turn it into single objects, runDataStream: {"pid":"a", "val":1, "data_type": 1, "app_type" :2}
> >
> > Output runDataStream to Redis: runDataStream.addSink(new CustomRedisSink())
> >
> > Then create a temporary view, e.g.:
> > tableEnv.createTemporaryView("kafkaT1", runDataSingleOutputStreamOperator,
> >     $("pid"), $("val"), $("app_type"), $("data_type"));
> >
> > Next, define the different SQL statements, e.g.:
> > String sql1 = "insert into ods_data_10 select pid, val where data_type = 1 and app_type = 0"
> > String sql2 = "insert into ods_data_11 select pid, val where data_type = 1 and app_type = 1"
> > String sql3 = "insert into ods_data_01 select pid, val where data_type = 0 and app_type = 1"
> > String sql4 = "insert into ods_data_00 select pid, val where data_type = 0 and app_type = 0"
> >
> > Run them with a StatementSet:
> > StatementSet ss = tableEnv.createStatementSet();
> > ss.addInsertSql(sql1);
> > ss.addInsertSql(sql2);
> > ss.addInsertSql(sql3);
> > ss.addInsertSql(sql4);
> >
> > Finally execute the job:
> > env.execute(jobName);
> >
> > Everything runs fine with no errors, but the web UI shows that two jobs were submitted, as in the screenshot:
> >
> > <
> >
> http://apache-flink.147419.n8.nabble.com/file/t909/QQ20200909-150820%402x.png
> >
> >
> >
> > 作业"EconStreamingToHiveHbaseRedisJob"对应的应该是写入redis的操作(假设作业ID为jobA),
> >
> > 作业"insert-into_myhive.dw.ods_analog_sems
> > ***"对应的应该是写入4个表的操作(假设作业ID为jobB),如图:
> >
> > <
> >
> http://apache-flink.147419.n8.nabble.com/file/t909/QQ20200909-150928%402x.png
> >
> >
> >
> > The topmost operator is defined as follows:
> > Source: Custom Source -> Map -> Flat Map -> Filter ->
> > SourceConversion(table=[myhive.dw.kafkaT1], fields=[pid, dqf, val, et,
> > run_data_type]) ->
> > (Calc(select=[pid, val, et, dqf, ((et / 1000) FROM_UNIXTIME
> > _UTF-16LE'-MM-dd') AS EXPR$4, ((et / 1000) FROM_UNIXTIME
> _UTF-16LE'HH')
> > AS EXPR$5], where=[((run_data_type = 0) AND NOT((pid LIKE
> > _UTF-16LE'BP.%')))]) -> StreamingFileWriter,
> > Calc(select=[pid, val, et, dqf, ((et / 1000) FROM_UNIXTIME
> > _UTF-16LE'-MM-dd') AS EXPR$4, ((et / 1000) FROM_UNIXTIME
> _UTF-16LE'HH')
> > AS EXPR$5], where=[((run_data_type = 1) AND NOT((pid LIKE
> > _UTF-16LE'BP.%')))]) -> StreamingFileWriter,
> > Calc(select=[pid, val, et, dqf, ((et / 1000) FROM_UNIXTIME
> > _UTF-16LE'-MM-dd') AS EXPR$4, ((et / 1000) FROM_UNIXTIME
> _UTF-16LE'HH')
> > AS EXPR$5], where=[((run_data_type = 0) AND (pid LIKE _UTF-16LE'BP.%'))])
> > ->
> > StreamingFileWriter,
> > Calc(select=[pid, val, et, dqf, ((et / 1000) FROM_UNIXTIME
> > _UTF-16LE'-MM-dd') AS EXPR$4, ((et / 1000) FROM_UNIXTIME
> _UTF-16LE'HH')
> > AS EXPR$5], where=[((run_data_type = 1) AND (pid LIKE _UTF-16LE'BP.%'))])
> > ->
> > StreamingFileWriter)
> >
> > My concern is that when I want to stop these jobs, e.g. "./bin/flink stop -m :8081 jobA",
> > a savepoint is generated, e.g. "Suspending job "395c1f468e65b6e29abb58c27cb80bdc" with a
> > savepoint."
> > Stopping jobB likewise generates such a savepoint.
> >
> > My question is: is there a required order for stopping jobA and jobB, and which savepoint should I use to restart the job smoothly?
> >
> >
> >
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/
> >
>


Re: A few puzzling problems encountered while testing flink cdc; asking for help

2020-09-14, posted by Jark Wu
Hi,

A couple of questions:
1. Do you have the full exception stack? How did you restore from the ck, and with what command?
2. Yes. The source can only run with parallelism 1. Writing to Kafka first and then syncing from Kafka works.

Best,
Jark

On Fri, 11 Sep 2020 at 17:56, 引领  wrote:

>
>
> 1. After a checkpoint, restoring from the ck fails with:
> org.apache.kafka.connect.errors.ConnectException:
> com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializationException:
> Failed to deserialize data ofEventHeaderV4{timestamp=1599815908000,
> eventType=EXT_UPDATE_ROWS, serverId=501, headerLength=19, dataLength=25879,
> nextPosition=721073164, flags=0}
> 2. flink cdc reads the data, performs a join (loading a dimension table), and writes to MySQL; the parallelism cannot be raised and stays at 1.
> I have already made the corresponding settings in the config files, including for sql-client:
> taskmanager.numberOfTaskSlots: 5 # The parallelism used for
> programs that did not specify and other parallelism.
>  parallelism.default: 5
>
>
> My SQL is:
>
>
> Insert into orders Select * from order o join sku s FOR SYSTEM_TIME as
> of o.proc_time s  on o.sku_id = s.id
>
>
> Thanks in advance for any replies.
>
>
>
>
>
>


Re: Exception when using flink mysql-cdc through the sql client

2020-09-14, posted by Jark Wu
1. Please check whether any of the jars are corrupted.
2. Keep only these two jars: flink-sql-connector-mysql-cdc-1.1.0.jar and
flink-format-changelog-json-1.1.0.jar;
flink-connector-debezium-1.1.0.jar and
flink-connector-mysql-cdc-1.1.0.jar are not needed.

Best,
Jark

On Sun, 13 Sep 2020 at 11:10, 陈帅  wrote:

> The Flink version is 1.11.1. I downloaded
> flink-connector-debezium-1.1.0.jar,
> flink-connector-mysql-cdc-1.1.0.jar,
> flink-sql-connector-kafka_2.12-1.11.1.jar,
> flink-sql-connector-mysql-cdc-1.1.0.jar,
> flink-format-changelog-json-1.1.0.jar
> and added them to the $FLINK_HOME/lib directory, then started the flink sql
> client in embedded mode. I also created a table in MySQL and then created the corresponding table in the flink sql client with this statement:
>
> Flink SQL>  CREATE TABLE order_info(
> id BIGINT,
> user_id BIGINT,
> create_time TIMESTAMP(0),
> operate_time TIMESTAMP(0),
> province_id INT,
> order_status STRING,
> total_amount DECIMAL(10, 5)
>   ) WITH (
> 'connector' = 'mysql-cdc',
> 'hostname' = 'localhost',
> 'port' = '3306',
> 'username' = 'root',
> 'password' = 'x,
> 'database-name' = 'test',
> 'table-name' = 'order_info'
> );
>
> Finally I ran a query in the flink sql client:
> Flink SQL>  select * from order_info;
>
> *[ERROR] Could not execute SQL statement. Reason:*
>
> *java.lang.ClassNotFoundException:
> com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction*
>
>
>
> It threw the exception above. I checked, and this class belongs to flink-connector-debezium-1.1.0.jar, which I already put under $FLINK_HOME/lib; unpacking it I can see the full path of the reportedly missing class. Why does this happen, and how can I fix it?
>
> The sql client log is as follows:
>
> ClassLoader info: URL ClassLoader:
> file:
>
> '/var/folders/7n/pfzv54s94w9d9jl578txzx20gn/T/blobStore-ae8ce496-872b-4934-a212-405e34ecfd6f/job_a2634d29a69aa47bfdb0e65b522ff2e8/blob_p-949ecf57aaab045c3136da62fe2e2ab39f502c30-723503d095b849a5d7f3375ef6ddc85f'
> (valid JAR)
> Class not resolvable through given classloader.
> at
>
> org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:288)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
>
> org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:126)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:453)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_231]
> Caused by: java.lang.ClassNotFoundException:
> com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction
> at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
> ~[?:1.8.0_231]
> at java.lang.ClassLoader.loadClass(ClassLoader.java:418) ~[?:1.8.0_231]
> at
>
> org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
>
> org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
>
> org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ~[?:1.8.0_231]
> at java.lang.Class.forName0(Native Method) ~[?:1.8.0_231]
> at java.lang.Class.forName(Class.java:348) ~[?:1.8.0_231]
> at
>
> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1868)
> ~[?:1.8.0_231]
> at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
> ~[?:1.8.0_231]
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042)
> ~[?:1.8.0_231]
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> ~[?:1.8.0_231]
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
> ~[?:1.8.0_231]
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
> ~[?:1.8.0_231]
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
> ~[?:1.8.0_231]
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> ~[?:1.8.0_231]
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
> ~[?:1.8.0_231]
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
> ~[?:1.8.0_231]
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
> ~[?:1.8.0_231]
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> ~[?:1.8.0_231]
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
> ~[?:1.8.0_231]
> at
>
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
>
> 

Re: sql-client checkpoint sql-client

2020-09-14, posted by Jark Wu
1. If the program crashes, it automatically recovers from the last checkpoint; you only need to configure a restart strategy.
2. If you want to reuse the previous state after changing the SQL, the sql client currently does not support restoring from a specified savepoint/checkpoint;
you need to write your own Table API program and run it via flink run.
(Note that even then the state may not be reusable; it depends on whether the topology and the state layout have changed.)


Best,
Jark
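
A rough sketch of such a Table API program is shown below (illustrative only: the toy datagen/blackhole tables stand in for the real Kafka/Hive DDL and the class name is made up). Packaged as a jar, it would be submitted with something like flink run -s <savepointOrCheckpointPath> job.jar so that the restore path is attached at submission time.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class SqlJobForFlinkRun {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000);

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(
                env, EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // Toy source/sink so the sketch is self-contained; in practice these would be
        // the same Kafka / Hive / JDBC DDL statements used in the sql-client session.
        tEnv.executeSql("CREATE TABLE src (x BIGINT) WITH ('connector' = 'datagen')");
        tEnv.executeSql("CREATE TABLE dst (cnt BIGINT) WITH ('connector' = 'blackhole')");

        // executeSql on an INSERT submits the job; the SQL text is what you would
        // otherwise type into the sql-client.
        tEnv.executeSql("INSERT INTO dst SELECT COUNT(*) FROM src");
    }
}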

On Mon, 14 Sep 2020 at 09:53, Harold.Miao  wrote:

> Restoring from a checkpoint is not officially supported yet; at my company we modified sql-client to support this.
>
> 引领 wrote on Fri, Sep 4, 2020 at 6:13 PM:
>
> >
> >
> > I want to try using flink-cdc to clean data, but I am stuck on a few points:
> > 1. With sql-client and checkpointing enabled, if the program crashes, how do I continue from the checkpoint and resume the program? In particular, what should I do when running group
> > by, count and similar operations?
> > 2. If that is not possible, can I write code instead and specify a checkpoint on restart, while still consuming via flink-cdc?
> > 引领
> > yrx73...@163.com
> >
> >
>
> --
>
> Best Regards,
> Harold Miao
>


Re: [sql-client][checkpoint] Checkpoints are not written to HDFS for jobs submitted via sql-client

2020-09-14, posted by Jark Wu
Could it be that your cp-restore code does not execute any INSERT INTO statement?

On Mon, 14 Sep 2020 at 20:15, Harold.Miao  wrote:

> One more thing: we modified the sql-client code so that jobs can be restored from a cp. The change is as follows:
>
> private StreamExecutionEnvironment createStreamExecutionEnvironment() {
>     final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>
>     LOG.info("restore cp exist: {}", environment.getExecution().getRestoreSp().isPresent());
>     if (environment.getExecution().getRestoreSp().isPresent()) {
>         LOG.info("restore cp path: {}", environment.getExecution().getRestoreSp().get());
>         if (!environment.getExecution().getRestoreSp().get().contains("none")) {
>             SavepointRestoreSettings savepointRestoreSettings =
>                     SavepointRestoreSettings.forPath(environment.getExecution().getRestoreSp().get(), true);
>             env.getStreamGraph().setSavepointRestoreSettings(savepointRestoreSettings);
>         }
>     }
>     // for TimeCharacteristic validation in StreamTableEnvironmentImpl
>     env.setStreamTimeCharacteristic(environment.getExecution().getTimeCharacteristic());
>     if (env.getStreamTimeCharacteristic() == TimeCharacteristic.EventTime) {
>         env.getConfig().setAutoWatermarkInterval(environment.getExecution().getPeriodicWatermarksInterval());
>     }
>     return env;
> }
>
>
> When the path above, which contains only the meta file, is passed in, it fails with the following error:
>
> Exception in thread "main"
> org.apache.flink.table.client.SqlClientException: Unexpected
> exception. This is a bug. Please consider filing an issue.
> at org.apache.flink.table.client.SqlClient.main(SqlClient.java:213)
> Caused by: org.apache.flink.table.client.gateway.SqlExecutionException:
> Could not create execution context.
> at
> org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:879)
> at
> org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:227)
> at
> org.apache.flink.table.client.SqlClient.start(SqlClient.java:108)
> at org.apache.flink.table.client.SqlClient.main(SqlClient.java:201)
> Caused by: java.lang.IllegalStateException: No operators defined in
> streaming topology. Cannot execute.
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:1870)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1861)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1846)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1834)
> at
> org.apache.flink.table.client.gateway.local.ExecutionContext.createStreamExecutionEnvironment(ExecutionContext.java:691)
> at
> org.apache.flink.table.client.gateway.local.ExecutionContext.createTableEnvironment(ExecutionContext.java:593)
> at
> org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:498)
> at
> org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:184)
> at
> org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:137)
> at
> org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:868)
> ... 3 more
>
>
> The error clearly shows that there is no operator state.
>
>
>
>
>
>
>
>
> Congxian Qiu wrote on Mon, Sep 14, 2020 at 7:53 PM:
>
> > Hi
> > If your state is all very small, it may be stored inside the meta file, in which case _metadata
> > is the only file. The exact logic is here [1]
> >
> > [1]
> >
> >
> https://github.com/apache/flink/blob/9b0fb562898b809b860cf0065ded7a45c49300af/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java#L442
> > Best,
> > Congxian
> >
> >
> > Harold.Miao wrote on Mon, Sep 14, 2020 at 6:44 PM:
> >
> > > hi  all
> > >
> > > Flink version: 1.11.1
> > >
> > > We submit jobs via sql-client; flink-conf.yaml is configured as follows:
> > >
> > > state.backend: filesystem
> > > state.backend.fs.checkpointdir:
> > >
> hdfs:///ai/flink/checkpoint/dataclean/hl-redis0902/checkpoint-data/23252
> > > state.checkpoints.dir:
> > >
> hdfs:///ai/flink/checkpoint/dataclean/hl-redis0902/checkpoint-meta/23252
> > > state.savepoints.dir:
> > > hdfs:///ai/flink/checkpoint/dataclean/hl-redis0902/savepoint/23252
> > >
> > > execution.checkpointing.externalized-checkpoint-retention:
> > > RETAIN_ON_CANCELLATION
> > > execution.checkpointing.interval: 60s
> > > execution.checkpointing.mode: EXACTLY_ONCE
> > > jobmanager.execution.failover-strategy: full
> > > state.backend.incremental: true
> > >
> > >
> > > After the job runs, the web UI shows that all checkpoints succeed, but on HDFS there is only ever a single meta file,
> > >
> > > similar to the following:
> > >
> > > hdfs://
> > >
> >
> 10.218.60.57:8020/ai/flink/checkpoint/dataclean/hl-redis0902/checkpoint-meta/23250/c72c1ee4362c3d0ba72db32698363fcf/chk-5/_metadata
> > >
> > > Apart from this file there is nothing else.
> > >
> > > Our source is Kafka, and the Kafka source definitely keeps state.
> > >
> > >
> > > 

Re: flink-cdc sink to mysql issue

2020-09-14, posted by Leonard Xu
What 陈韬 said is correct: the cdc connector is source-only. You are writing to a MySQL table, so you need to add the jdbc connector dependency.

Best,
Leonard
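
For reference, here is a sketch of what the JDBC sink side could look like once flink-connector-jdbc (plus the MySQL driver) is on the classpath; the schema and connection details below are placeholders, and only the target table name is taken from the thread.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class MySqlJdbcSinkSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // Requires flink-connector-jdbc_2.11 and mysql-connector-java in $FLINK_HOME/lib.
        tEnv.executeSql(
                "CREATE TABLE pvuv_sink (" +
                "  dt STRING," +
                "  pv BIGINT," +
                "  uv BIGINT" +
                ") WITH (" +
                "  'connector' = 'jdbc'," +
                "  'url' = 'jdbc:mysql://localhost:3306/test'," +
                "  'table-name' = 'pvuv_test_back'," +
                "  'username' = 'root'," +
                "  'password' = '***'" +
                ")");
    }
}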
> 
>> On Sep 10, 2020 at 3:54 PM, 杨帅统 wrote:
>> 
>> pvuv_test_back
> 



flink RichFilterFunction filters the same record repeatedly

2020-09-14, posted by 明启 孙
Scenario:

Flink consumes from Kafka, filters out a certain type of record, and logs a warn message for it.

During testing I wrote a single record to Kafka that should be filtered out. Occasionally exactly one warn is printed as expected, but more often that record is printed repeatedly, as if it were being consumed over and over.

When I add a print statement after the warn, filtering behaves correctly: one warn is printed per filtered record and nothing is logged repeatedly. Since the repeated logging keeps my subsequent normal records from being consumed, I don't know what the problem is.
Code:
@Override
public boolean filter(GenericRecord record) throws Exception {
    String opType = record.get("op_type") != null ? record.get("op_type").toString() : "-";
    if ("D".equals(opType)) {
        logger.warn(record.toString());
        // System.out.println("filtered out");
        return false;
    }
    return true;
}
smq



flink RichFilterFunction filters the same record repeatedly

2020-09-14, posted by 13346846277


Re: [sql-client][checkpoint] Checkpoints are not written to HDFS for jobs submitted via sql-client

2020-09-14, posted by Harold.Miao
One more thing: we modified the sql-client code so that jobs can be restored from a cp. The change is as follows:

private StreamExecutionEnvironment createStreamExecutionEnvironment() {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    LOG.info("restore cp exist: {}", environment.getExecution().getRestoreSp().isPresent());
    if (environment.getExecution().getRestoreSp().isPresent()) {
        LOG.info("restore cp path: {}", environment.getExecution().getRestoreSp().get());
        if (!environment.getExecution().getRestoreSp().get().contains("none")) {
            SavepointRestoreSettings savepointRestoreSettings =
                    SavepointRestoreSettings.forPath(environment.getExecution().getRestoreSp().get(), true);
            env.getStreamGraph().setSavepointRestoreSettings(savepointRestoreSettings);
        }
    }
    // for TimeCharacteristic validation in StreamTableEnvironmentImpl
    env.setStreamTimeCharacteristic(environment.getExecution().getTimeCharacteristic());
    if (env.getStreamTimeCharacteristic() == TimeCharacteristic.EventTime) {
        env.getConfig().setAutoWatermarkInterval(environment.getExecution().getPeriodicWatermarksInterval());
    }
    return env;
}


When the path above, which contains only the meta file, is passed in, it fails with the following error:

Exception in thread "main"
org.apache.flink.table.client.SqlClientException: Unexpected
exception. This is a bug. Please consider filing an issue.
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:213)
Caused by: org.apache.flink.table.client.gateway.SqlExecutionException:
Could not create execution context.
at 
org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:879)
at 
org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:227)
at org.apache.flink.table.client.SqlClient.start(SqlClient.java:108)
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:201)
Caused by: java.lang.IllegalStateException: No operators defined in
streaming topology. Cannot execute.
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:1870)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1861)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1846)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1834)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.createStreamExecutionEnvironment(ExecutionContext.java:691)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.createTableEnvironment(ExecutionContext.java:593)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:498)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:184)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:137)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:868)
... 3 more


The error clearly shows that there is no operator state.
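
For what it is worth, the IllegalStateException itself comes from calling env.getStreamGraph() while the environment has no operators yet (the INSERT statements have not been translated at that point). One possible shape for such a patch is sketched below, under the assumption that the settings can be kept around until the graph for the INSERT statements is actually built; the field and the call site are hypothetical, not part of the original change.

// Remember the restore settings instead of building a StreamGraph eagerly.
private SavepointRestoreSettings restoreSettings = SavepointRestoreSettings.none();

private StreamExecutionEnvironment createStreamExecutionEnvironment() {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    if (environment.getExecution().getRestoreSp().isPresent()
            && !environment.getExecution().getRestoreSp().get().contains("none")) {
        // Do not call env.getStreamGraph() here: no operators exist yet, which is
        // exactly what "No operators defined in streaming topology" complains about.
        restoreSettings = SavepointRestoreSettings.forPath(
                environment.getExecution().getRestoreSp().get(), true);
    }
    env.setStreamTimeCharacteristic(environment.getExecution().getTimeCharacteristic());
    return env;
}

// Later, at the (hypothetical) place in the patched client where the graph for the
// INSERT statements is built for submission:
// StreamGraph streamGraph = env.getStreamGraph(jobName);
// streamGraph.setSavepointRestoreSettings(restoreSettings);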








Congxian Qiu wrote on Mon, Sep 14, 2020 at 7:53 PM:

> Hi
> If your state is all very small, it may be stored inside the meta file, in which case _metadata
> is the only file. The exact logic is here [1]
>
> [1]
>
> https://github.com/apache/flink/blob/9b0fb562898b809b860cf0065ded7a45c49300af/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java#L442
> Best,
> Congxian
>
>
> Harold.Miao wrote on Mon, Sep 14, 2020 at 6:44 PM:
>
> > hi  all
> >
> > Flink version: 1.11.1
> >
> > We submit jobs via sql-client; flink-conf.yaml is configured as follows:
> >
> > state.backend: filesystem
> > state.backend.fs.checkpointdir:
> > hdfs:///ai/flink/checkpoint/dataclean/hl-redis0902/checkpoint-data/23252
> > state.checkpoints.dir:
> > hdfs:///ai/flink/checkpoint/dataclean/hl-redis0902/checkpoint-meta/23252
> > state.savepoints.dir:
> > hdfs:///ai/flink/checkpoint/dataclean/hl-redis0902/savepoint/23252
> >
> > execution.checkpointing.externalized-checkpoint-retention:
> > RETAIN_ON_CANCELLATION
> > execution.checkpointing.interval: 60s
> > execution.checkpointing.mode: EXACTLY_ONCE
> > jobmanager.execution.failover-strategy: full
> > state.backend.incremental: true
> >
> >
> > After the job runs, the web UI shows that all checkpoints succeed, but on HDFS there is only ever a single meta file,
> >
> > similar to the following:
> >
> > hdfs://
> >
> 10.218.60.57:8020/ai/flink/checkpoint/dataclean/hl-redis0902/checkpoint-meta/23250/c72c1ee4362c3d0ba72db32698363fcf/chk-5/_metadata
> >
> > Apart from this file there is nothing else.
> >
> > Our source is Kafka, and the Kafka source definitely keeps state.
> >
> >
> > Could anyone advise what might cause this?
> >
> >
> > Thanks
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > --
> >
> > Best Regards,
> > Harold Miao
> >
>


-- 

Best Regards,
Harold Miao


Re: [sql-client][checkpoint] Checkpoints are not written to HDFS for jobs submitted via sql-client

2020-09-14, posted by Congxian Qiu
Hi
   If your state is all very small, it may be stored inside the meta file, in which case _metadata
is the only file. The exact logic is here [1]

[1]
https://github.com/apache/flink/blob/9b0fb562898b809b860cf0065ded7a45c49300af/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java#L442
Best,
Congxian


Harold.Miao wrote on Mon, Sep 14, 2020 at 6:44 PM:

> hi  all
>
> Flink version: 1.11.1
>
> We submit jobs via sql-client; flink-conf.yaml is configured as follows:
>
> state.backend: filesystem
> state.backend.fs.checkpointdir:
> hdfs:///ai/flink/checkpoint/dataclean/hl-redis0902/checkpoint-data/23252
> state.checkpoints.dir:
> hdfs:///ai/flink/checkpoint/dataclean/hl-redis0902/checkpoint-meta/23252
> state.savepoints.dir:
> hdfs:///ai/flink/checkpoint/dataclean/hl-redis0902/savepoint/23252
>
> execution.checkpointing.externalized-checkpoint-retention:
> RETAIN_ON_CANCELLATION
> execution.checkpointing.interval: 60s
> execution.checkpointing.mode: EXACTLY_ONCE
> jobmanager.execution.failover-strategy: full
> state.backend.incremental: true
>
>
> After the job runs, the web UI shows that all checkpoints succeed, but on HDFS there is only ever a single meta file,
>
> similar to the following:
>
> hdfs://
> 10.218.60.57:8020/ai/flink/checkpoint/dataclean/hl-redis0902/checkpoint-meta/23250/c72c1ee4362c3d0ba72db32698363fcf/chk-5/_metadata
>
> Apart from this file there is nothing else.
>
> Our source is Kafka, and the Kafka source definitely keeps state.
>
>
> Could anyone advise what might cause this?
>
>
> Thanks
>
>
>
>
>
>
>
>
>
>
> --
>
> Best Regards,
> Harold Miao
>


[sql-client][checkpoint] Checkpoints are not written to HDFS for jobs submitted via sql-client

2020-09-14, posted by Harold.Miao
hi  all

Flink version: 1.11.1

We submit jobs via sql-client; flink-conf.yaml is configured as follows:

state.backend: filesystem
state.backend.fs.checkpointdir:
hdfs:///ai/flink/checkpoint/dataclean/hl-redis0902/checkpoint-data/23252
state.checkpoints.dir:
hdfs:///ai/flink/checkpoint/dataclean/hl-redis0902/checkpoint-meta/23252
state.savepoints.dir:
hdfs:///ai/flink/checkpoint/dataclean/hl-redis0902/savepoint/23252

execution.checkpointing.externalized-checkpoint-retention:
RETAIN_ON_CANCELLATION
execution.checkpointing.interval: 60s
execution.checkpointing.mode: EXACTLY_ONCE
jobmanager.execution.failover-strategy: full
state.backend.incremental: true


After the job runs, the web UI shows that all checkpoints succeed, but on HDFS there is only ever a single meta file,

similar to the following:

hdfs://10.218.60.57:8020/ai/flink/checkpoint/dataclean/hl-redis0902/checkpoint-meta/23250/c72c1ee4362c3d0ba72db32698363fcf/chk-5/_metadata

Apart from this file there is nothing else.

Our source is Kafka, and the Kafka source definitely keeps state.


Could anyone advise what might cause this?


Thanks










-- 

Best Regards,
Harold Miao


Re: flinksql sliding window receives no data

2020-09-14, posted by Benchao Li
Then it does look like a watermark issue. You can check in the Flink web UI whether the watermark of the corresponding operator matches your expectation.

One small tip: the watermark is advanced by the data itself. For example, if you only have a single record, the watermark can only be derived from
that record and will not advance further on its own.
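
For comparison, a typical bounded-out-of-orderness setup looks roughly like the sketch below (the event class, the getter and the 5-second values are assumptions based on the code quoted further down, not a verified fix):

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;

public class WatermarkSetupSketch {

    // Assumed event type exposing the epoch-millis getter used in the thread.
    public static class Order {
        private long gmtPaidLong;
        public long getGmtPaidLong() { return gmtPaidLong; }
    }

    public static DataStream<Order> withWatermarks(DataStream<Order> simpleResults) {
        return simpleResults.assignTimestampsAndWatermarks(
                WatermarkStrategy
                        // tolerate events arriving up to 5 seconds late
                        .<Order>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        .withTimestampAssigner((event, ts) -> event.getGmtPaidLong())
                        // keep the watermark advancing when some partitions are idle
                        .withIdleness(Duration.ofSeconds(5)));
    }
}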

李杨烨 <438106...@qq.com> wrote on Mon, Sep 14, 2020 at 5:27 PM:

> There is data before it enters the stream, but after the hop computation no data flows out.
>
>
> The watermark setup code is as follows:
> simpleResults.assignTimestampsAndWatermarks(WatermarkStrategy
>     . .withTimestampAssigner((event, timestamp) -> event.getGmtPaidLong())
>     .withIdleness(Duration.ofSeconds(5)));
> ---
> Also, I just tried using processTime for the sliding window and that works, but processTime is not business-friendly, so doing it based on rowTime would be best.
>
>
> -- Original message --
> From: "user-zh" <libenc...@apache.org>
> Sent: Monday, Sep 14, 2020, 5:19 PM
> To: "user-zh"
> Subject: Re: flinksql sliding window receives no data
>
>
>
> Could you describe the problem in more detail? When you say no data enters the sliding window, do you mean the window does not fire, or that the data never reaches the window?
>
> If it is only that the window does not fire, and you are using row time, check whether the watermark is being generated properly.
>
> 李杨烨 <438106...@qq.com> wrote on Mon, Sep 14, 2020 at 1:32 PM:
>
>  The image in my previous mail was broken; I uploaded it again here:
> http://chuantu.xyz/t6/741/1600061331x-1224481926.jpg
>  The sliding window uses rowTime
>
>
>
> --
>
> Best,
> Benchao Li



-- 

Best,
Benchao Li


Re: flinksql sliding window receives no data

2020-09-14, posted by 李杨烨
There is data before it enters the stream, but after the hop computation no data flows out.

The watermark setup code is as follows:
simpleResults.assignTimestampsAndWatermarks(WatermarkStrategy
    . .withTimestampAssigner((event, timestamp) -> event.getGmtPaidLong())
    .withIdleness(Duration.ofSeconds(5)));

Also, I just tried using processTime for the sliding window and that works, but processTime is not business-friendly, so doing it based on rowTime would be best.
(image: http://chuantu.xyz/t6/741/1600061331x-1224481926.jpg)

Re: flinksql sliding window receives no data

2020-09-14, posted by Benchao Li
Could you describe the problem in more detail? When you say no data enters the sliding window, do you mean the window does not fire, or that the data never reaches the window?

If it is only that the window does not fire, and you are using row time, check whether the watermark is being generated properly.

李杨烨 <438106...@qq.com> wrote on Mon, Sep 14, 2020 at 1:32 PM:

> The image in my previous mail was broken; I uploaded it again here: http://chuantu.xyz/t6/741/1600061331x-1224481926.jpg
> The sliding window uses rowTime



-- 

Best,
Benchao Li


flink hive batch job throws FileNotFoundException

2020-09-14, posted by 李佳宸
Hi all, when I run a batch table job that writes to Hive, a FileNotFoundException occurs: the .staging file cannot be found.
The version is 1.11.1.
Caused by: java.io.FileNotFoundException: File
hdfs://gykjcluster/user/hive/warehouse/etl_et_flink_sink.db/ods_et_es_financialestimate/.staging_1600070419144
does not exist.
at
org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:1053)
~[hadoop-client-api-3.1.3.jar:?]
at
org.apache.hadoop.hdfs.DistributedFileSystem.access$1000(DistributedFileSystem.java:131)
~[hadoop-client-api-3.1.3.jar:?]
at
org.apache.hadoop.hdfs.DistributedFileSystem$24.doCall(DistributedFileSystem.java:1113)
~[hadoop-client-api-3.1.3.jar:?]
at
org.apache.hadoop.hdfs.DistributedFileSystem$24.doCall(DistributedFileSystem.java:1110)
~[hadoop-client-api-3.1.3.jar:?]
at
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
~[hadoop-client-api-3.1.3.jar:?]
at
org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:1120)
~[hadoop-client-api-3.1.3.jar:?]
at
org.apache.flink.hive.shaded.fs.hdfs.HadoopFileSystem.listStatus(HadoopFileSystem.java:157)
~[flink-sql-connector-hive-3.1.2_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.table.filesystem.PartitionTempFileManager.headCheckpoints(PartitionTempFileManager.java:140)
~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
at
org.apache.flink.table.filesystem.FileSystemCommitter.commitUpToCheckpoint(FileSystemCommitter.java:98)
~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
at
org.apache.flink.table.filesystem.FileSystemOutputFormat.finalizeGlobal(FileSystemOutputFormat.java:95)
~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
at
org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.finalizeOnMaster(InputOutputFormatVertex.java:132)
~[flink-dist_2.11-1.11.1.jar:1.11.1]
at
org.apache.flink.runtime.executiongraph.ExecutionGraph.vertexFinished(ExecutionGraph.java:1286)
~[flink-dist_2.11-1.11.1.jar:1.11.1]

This problem does not occur in standalone mode; in per-job mode on YARN some jobs hit it.


Re: Status of Flink integration with the mainstream data lakes

2020-09-14, posted by dixingxing85
Thanks, yes, there is an Iceberg sink. I see 之信 is working on Flink reading from Iceberg; I mainly want to know the progress, and which data lake framework the community will focus on supporting going forward.

Sent from my iPhone

> On Sep 14, 2020, at 14:00, Congxian Qiu  wrote:
> 
> Hi
>   As far as I know, Iceberg has a Flink sink; you can take a look at this PR [1]
> [1] https://github.com/apache/iceberg/pull/856
> Best,
> Congxian
> 
> 
> dixingxing85 wrote on Sat, Sep 12, 2020 at 4:54 PM:
> 
>> Hi all:
>> I'd like to ask a question. Some companies have already started adopting data lake technology. What is the current status of Flink's integration with Iceberg, Hudi, and Delta
>> Lake? Is the community actively driving the integration with any particular data lake technology? When can we expect a reasonably complete source and sink? Thanks.
>> 
>> Sent from my iPhone
>> 
>> 
>> Sent from my iPhone


Re: Problem using a UDAF in an OVER window

2020-09-14, posted by chen310
Thanks, it was indeed caused by the retract method not being implemented. Solved.
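
For readers hitting the same issue: an aggregate function used in a bounded OVER window needs a retract method in addition to accumulate, otherwise the planner reports the missing method. A minimal sketch (class and field names are illustrative):

import org.apache.flink.table.functions.AggregateFunction;

public class MySum extends AggregateFunction<Long, MySum.SumAcc> {

    // Accumulator holding the running sum.
    public static class SumAcc {
        public long sum = 0L;
    }

    @Override
    public SumAcc createAccumulator() {
        return new SumAcc();
    }

    @Override
    public Long getValue(SumAcc acc) {
        return acc.sum;
    }

    // Called for rows entering the OVER window.
    public void accumulate(SumAcc acc, Long value) {
        if (value != null) {
            acc.sum += value;
        }
    }

    // Called for rows leaving the bounded OVER window.
    public void retract(SumAcc acc, Long value) {
        if (value != null) {
            acc.sum -= value;
        }
    }
}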



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Any recommendations for a good flink sql job submission tool

2020-09-14, posted by admin
https://github.com/wuchong/flink-sql-submit 

Built by an expert, so it's bound to be solid; we based our own customizations on it.

> On Sep 11, 2020 at 6:04 PM, xuzh wrote:
> 
> Dear all:
> So far I have found two SQL job submission tools:
> https://github.com/wuchong/flink-sql-submit
> https://github.com/springMoon/sqlSubmit
> Has anyone used them? Any recommendations?



Re: Status of Flink integration with the mainstream data lakes

2020-09-14, posted by Congxian Qiu
Hi
   As far as I know, Iceberg has a Flink sink; you can take a look at this PR [1]
[1] https://github.com/apache/iceberg/pull/856
Best,
Congxian


dixingxing85 wrote on Sat, Sep 12, 2020 at 4:54 PM:

> Hi all:
> I'd like to ask a question. Some companies have already started adopting data lake technology. What is the current status of Flink's integration with Iceberg, Hudi, and Delta
> Lake? Is the community actively driving the integration with any particular data lake technology? When can we expect a reasonably complete source and sink? Thanks.
>
> Sent from my iPhone
>
>
> Sent from my iPhone