How to start a Flink standalone session on Windows?
There is no start-cluster.bat or flink.bat in the bin directory. So how can I start Flink on Windows?

Thanks,
Lei
wangl...@geekplus.com.cn
How to start flink-1.11 on Windows
The bin directory of the flink-1.11 release package no longer contains the .bat files needed to start on Windows. How can I start Flink on Windows, then?

Thanks,
Wang Lei
wangl...@geekplus.com.cn
flink-1.11: how to send KafkaDynamicTableSource groupBy results to Kafka
INSERT INTO kafka_dws_artemis_out_order
SELECT warehouse_id, count(*) FROM kafka_ods_artemis_out_order GROUP BY warehouse_id;

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: Table sink 'myhive.wanglei.kafka_dws_artemis_out_order' doesn't support consuming update changes which is produced by node GroupAggregate(groupBy=[warehouse_id], select=[warehouse_id, COUNT(*) AS EXPR$1])

In Flink-1.10 this could be done by changing KafkaTableSinkBase to implement RetractStream. I see Flink-1.11 now uses KafkaDynamicSource and KafkaDynamicSink; what changes are needed so that the GROUP BY results can also be sent to Kafka?

Thanks,
Wang Lei
wangl...@geekplus.com.cn
Re: Re: By what mechanism does FlinkSQL find the Java code to execute?
Thanks, I understand now.

wangl...@geekplus.com.cn

Sender: Harold.Miao
Send Time: 2020-07-16 19:33
Receiver: user-zh
Subject: Re: Re: By what mechanism does FlinkSQL find the Java code to execute?

My understanding: the classes are loaded through the SPI mechanism, and then the required class is filtered out by the properties, similar to the code below (generics restored for readability):

    private static <T extends TableFactory> T findSingleInternal(
            Class<T> factoryClass,
            Map<String, String> properties,
            Optional<ClassLoader> classLoader) {
        List<TableFactory> tableFactories = discoverFactories(classLoader);
        List<T> filtered = filter(tableFactories, factoryClass, properties);
        if (filtered.size() > 1) {
            throw new AmbiguousTableFactoryException(
                filtered, factoryClass, tableFactories, properties);
        } else {
            return filtered.get(0);
        }
    }

    private static List<TableFactory> discoverFactories(Optional<ClassLoader> classLoader) {
        try {
            List<TableFactory> result = new LinkedList<>();
            ClassLoader cl = classLoader.orElse(Thread.currentThread().getContextClassLoader());
            ServiceLoader
                .load(TableFactory.class, cl)
                .iterator()
                .forEachRemaining(result::add);
            return result;
        } catch (ServiceConfigurationError e) {
            LOG.error("Could not load service provider for table factories.", e);
            throw new TableException("Could not load service provider for table factories.", e);
        }
    }

-- Best Regards, Harold Miao
Re: Re: By what mechanism does FlinkSQL find the Java code to execute?
I found the SPI configuration in flink-formats/flink-json/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory:

    org.apache.flink.formats.json.JsonFileSystemFormatFactory
    org.apache.flink.formats.json.JsonFormatFactory
    org.apache.flink.formats.json.debezium.DebeziumJsonFormatFactory
    org.apache.flink.formats.json.canal.CanalJsonFormatFactory

But I still don't see how specifying 'format'='debezium-json' maps to DebeziumJsonFormatFactory. My understanding is that there must be some place declaring that debezium-json corresponds to DebeziumJsonFormatFactory, but grepping the code I found no such mapping configuration.

Thanks,
Wang Lei

wangl...@geekplus.com.cn

Sender: godfrey he
Send Time: 2020-07-16 16:38
Receiver: user-zh
Subject: Re: By what mechanism does FlinkSQL find the Java code to execute?

The corresponding format is found through Java's SPI mechanism; see [1].

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/#how-to-use-connectors

Best,
Godfrey
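The missing link is that the identifier lives in code, not in a mapping file: each Factory implementation has a factoryIdentifier() method, and DebeziumJsonFormatFactory's returns the string "debezium-json". Discovery loads every factory listed in the META-INF/services files via ServiceLoader and then filters by that identifier. A minimal JDK-only sketch of the lookup (the factory classes here are simplified stand-ins, not Flink's real ones):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class FactoryDiscovery {

    // Simplified stand-in for org.apache.flink.table.factories.Factory
    interface Factory {
        String factoryIdentifier();
    }

    // Stand-ins for the classes listed in the META-INF/services file
    static class JsonFormatFactory implements Factory {
        public String factoryIdentifier() { return "json"; }
    }

    static class DebeziumJsonFormatFactory implements Factory {
        public String factoryIdentifier() { return "debezium-json"; }
    }

    // In Flink the discovered list comes from ServiceLoader.load(Factory.class, cl);
    // here it is passed in directly to keep the example self-contained.
    static Factory discoverFactory(List<Factory> discovered, String identifier) {
        List<Factory> matching = new ArrayList<>();
        for (Factory f : discovered) {
            if (f.factoryIdentifier().equals(identifier)) {
                matching.add(f);
            }
        }
        if (matching.isEmpty()) {
            throw new IllegalStateException("No factory for identifier '" + identifier + "'");
        }
        if (matching.size() > 1) {
            throw new IllegalStateException("Multiple factories for identifier '" + identifier + "'");
        }
        return matching.get(0);
    }

    public static void main(String[] args) {
        List<Factory> discovered =
            Arrays.asList(new JsonFormatFactory(), new DebeziumJsonFormatFactory());
        Factory format = discoverFactory(discovered, "debezium-json");
        System.out.println(format.getClass().getSimpleName()); // prints DebeziumJsonFormatFactory
    }
}
```

So no grep-able mapping exists; the string-to-class relation is only visible as the return value of each factory's factoryIdentifier() method.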
Re: [sql-client] How to set checkpointing.interval when submitting SQL via the sql-client
Just add the configuration in the flink-conf.yaml file:

    execution.checkpointing.interval: 6

wangl...@geekplus.com.cn

Sender: Harold.Miao
Send Time: 2020-07-16 13:27
Receiver: user-zh
Subject: [sql-client] How to set checkpointing.interval when submitting SQL via the sql-client

hi flink users

How do I set checkpointing.interval when submitting SQL through the sql-client? I looked at the execution section of sql-client-defaults.yaml and did not find this parameter. Could anyone advise? Thanks.

-- Best Regards, Harold Miao
By what mechanism does FlinkSQL find the Java code to execute?
For example:

    CREATE TABLE my_table (
      id BIGINT,
      first_name STRING,
      last_name STRING,
      email STRING
    ) WITH (
      'connector'='kafka',
      'topic'='user_topic',
      'properties.bootstrap.servers'='localhost:9092',
      'scan.startup.mode'='earliest-offset',
      'format'='debezium-json'
    );

The code that finally parses debezium-json should be the code under flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium. But by what mechanism does FlinkSQL find the Java code to execute?

Thanks,
Wang Lei

wangl...@geekplus.com.cn
Re: Chinese characters garbled after FlinkSQL writes to MySQL
It was a character-set problem on the server hosting MySQL_tableB. Adding the configuration below fixed it:

    [mysqld]
    character-set-server=utf8
    [client]
    default-character-set=utf8
    [mysql]
    default-character-set=utf8

wangl...@geekplus.com.cn

From: wangl...@geekplus.com.cn
Sent: 2020-07-15 16:34
To: user-zh
Subject: Chinese characters garbled after FlinkSQL writes to MySQL

KafkaTable: Kafka messages.
MySQL_tableA: dimension table; its values are Chinese characters.
MySQL_tableB: result table of the join; not on the same server as MySQL_tableA.

A direct SELECT in the Flink SQL client displays correctly, but after INSERT INTO MySQL_tableB SELECT ..., the Chinese characters are garbled when I query MySQL_tableB. Any suggestions?

Thanks,
Wang Lei

wangl...@geekplus.com.cn
Re: Re: Cannot read in real time the data written to Hive in real time
Thanks — the root cause was that the flink sql-client does not enable checkpointing by default.

wangl...@geekplus.com.cn

Sender: Rui Li
Send Time: 2020-07-14 18:29
Receiver: user-zh
cc: Leonard Xu; 夏帅
Subject: Re: Re: Cannot read in real time the data written to Hive in real time

When streaming data into Hive, whether the table is partitioned or not, commits are triggered by checkpoints. With the SQL client you can set execution.checkpointing.interval in flink-conf.yaml to enable checkpointing.

-- Best regards! Rui Li
Re: Re: Cannot read in real time the data written to Hive in real time
Let me simplify the problem. I create the Hive table without any parameters:

    CREATE TABLE hive_ods_wms_pick_order (
      order_no STRING,
      status INT,
      dispatch_time TIMESTAMP
    ) STORED AS parquet;

    INSERT INTO TABLE hive_ods_wms_pick_order
    SELECT order_no, status, dispatch_time FROM kafka_ods_wms_pick_order;

I am using the sql-client. 15 minutes have passed — why does the HDFS directory of the Hive table still contain only a single zero-size .part file? Running SELECT order_no, status, dispatch_time FROM kafka_ods_wms_pick_order in the Flink client does return data.

In the Flink web UI, the job's Checkpoint Counts is 0. Does the job need to checkpoint before data is written to HDFS? And how do I set the checkpoint interval with the Flink sql-client?

Thanks,
Wang Lei

wangl...@geekplus.com.cn

From: Leonard Xu
Sent: 2020-07-14 17:20
To: user-zh; 夏帅
cc: wangl...@geekplus.com.cn
Subject: Re: Cannot read in real time the data written to Hive in real time

Hi, wanglei

The option 'sink.partition-commit.delay'='1 h' means the partition-completed information for Hive (via metastore or a success file) is committed only after a checkpoint completes plus the 1 h delay you configured. Please see the mail 夏帅 linked and check your checkpoint and partition-commit settings.

Best,
Leonard Xu
Re: Re: Cannot read in real time the data written to Hive in real time
I probably did not understand what partition-commit means; the documentation is here:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#partition-commit

    CREATE TABLE kafka_ods_wms_pick_order (
      order_no STRING,
      status INT,
      dispatch_time TIMESTAMP(3)
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'ods_wms_pick_order',
      'properties.bootstrap.servers' = ':9092',
      'properties.group.id' = 'testGroup',
      'format' = 'json',
      'scan.startup.mode' = 'latest-offset'
    );

    CREATE TABLE hive_ods_wms_pick_order (
      order_no STRING,
      status INT,
      dispatch_time TIMESTAMP
    ) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
      'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
      'sink.partition-commit.trigger'='partition-time',
      'sink.partition-commit.delay'='1 h',
      'sink.partition-commit.policy.kind'='metastore,success-file'
    );

    INSERT INTO TABLE hive_ods_wms_pick_order
    SELECT order_no, status, dispatch_time,
           DATE_FORMAT(dispatch_time, 'yyyy-MM-dd'), DATE_FORMAT(dispatch_time, 'HH')
    FROM kafka_ods_wms_pick_order;

    SELECT * FROM hive_ods_wms_pick_order /*+
      OPTIONS('streaming-source.enable'='true',
              'streaming-source.consume-start-offset'='2020-07-24') */;

wangl...@geekplus.com.cn

Sender: 夏帅
Send Time: 2020-07-14 16:43
Receiver: user-zh; xbjtdcq
Subject: Re: Re: Cannot read in real time the data written to Hive in real time

Hi, that means the Hive files being written were never rolled up. Could you paste your SQL?
Re: Re: Cannot read in real time the data written to Hive in real time
I added the table hint. The job was submitted, but the client still shows no results. I looked at the Hive table's directory on the Hadoop cluster: all files are in-progress files starting with .part.

Thanks,
Wang Lei

wangl...@geekplus.com.cn

From: Leonard Xu
Sent: 2020-07-14 16:17
To: user-zh
Subject: Re: Cannot read in real time the data written to Hive in real time

Hi, wanglei

Did you enable streaming-source.enable? This option decides whether the table is read in batch or in streaming mode; to read in real time you should set it to true. You can set it conveniently with table hints:

    SELECT * FROM hive_table /*+ OPTIONS('streaming-source.enable'='true',
        'streaming-source.consume-start-offset'='2020-05-20') */;

The very page you were reading should document how to read Hive data.

Best,
Leonard Xu
Cannot read in real time the data written to Hive in real time
I tried out the Flink-1.11 Hive streaming feature:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_streaming.html

I created a Kafka table and wrote it to Hive in real time via SQL. But when I then run select * from hive_table in the flink sql-client, the client returns nothing, and the Flink web UI shows that this select * from hive_table job has already finished.

Thanks,
Wang Lei

wangl...@geekplus.com.cn
Re: Re: How to dynamically initialize Flink metrics in the invoke method and then reuse them?
Seems there's no direct solution. Perhaps I can implement this by initializing a HashMap with all the possible values of tableName in the `open` method, and then getting the corresponding Meter by tableName in the `invoke` method.

Thanks,
Lei

wangl...@geekplus.com.cn
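The workaround described above — registering one meter per possible tableName once, then only looking it up per record — can be sketched without Flink dependencies as follows. A LongAdder stands in for Flink's Meter, and the method names mirror the RichSinkFunction lifecycle; in the real operator the map values would be the Meters returned by getRuntimeContext().getMetricGroup().meter(...) inside open().

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LongAdder;

public class PerTableMeterSink {

    // One counter per table, created once; stand-in for Flink's Meter.
    private final Map<String, LongAdder> meters = new HashMap<>();

    // Corresponds to RichSinkFunction#open: register a metric per known table name.
    public void open(List<String> knownTableNames) {
        for (String table : knownTableNames) {
            meters.put(table, new LongAdder());
        }
    }

    // Corresponds to SinkFunction#invoke: look the metric up instead of
    // re-creating it, so the count keeps accumulating across records.
    public void invoke(String tableName) {
        LongAdder meter = meters.get(tableName);
        if (meter != null) {
            meter.increment();
        }
    }

    public long count(String tableName) {
        LongAdder meter = meters.get(tableName);
        return meter == null ? 0L : meter.sum();
    }

    public static void main(String[] args) {
        PerTableMeterSink sink = new PerTableMeterSink();
        sink.open(List.of("t_pick_order", "t_out_order"));
        sink.invoke("t_pick_order");
        sink.invoke("t_pick_order");
        sink.invoke("t_out_order");
        System.out.println(sink.count("t_pick_order")); // prints 2
    }
}
```

The key point the thread converges on is that the metric objects must outlive a single invoke call; only the lookup, not the registration, happens per record.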
Re: Re: How to dynamically initialize Flink metrics in the invoke method and then reuse them?
Hi Xintong,

Yes, initializing the metric in the `open` method works, but it doesn't solve my problem. I want to initialize the metric with a name that is extracted from the record content, and only in the `invoke` method can I do that.

Actually my scenario is as follows: the records are MySQL binlog entries, and I want to monitor QPS per tableName. The tableName is different for every record.

Thanks,
Lei

wangl...@geekplus.com.cn

Sender: Xintong Song
Send Time: 2020-07-03 13:14
Receiver: wangl...@geekplus.com.cn
cc: user
Subject: Re: How to dynamically initialize Flink metrics in the invoke method and then reuse them?

Hi Lei,

I think you should initialize the metric in the `open` method. Then you can save the initialized metric as a class field and update it in the `invoke` method for each record.

Thank you~
Xintong Song
How to dynamically initialize Flink metrics in the invoke method and then reuse them?
In one Flink operator, I want to initialize multiple Flink metrics according to the message content, as in the code below:

    public void invoke(ObjectNode node, Context context) throws Exception {
        String tableName = node.get("metadata").get("topic").asText();
        Meter meter = getRuntimeContext().getMetricGroup().meter(tableName, new MeterView(10));
        meter.markEvent();
        log.info("### counter: " + meter.toString() + "\t" + meter.getCount());
    }

But this way, every invoke call initializes a new metric and the count starts from zero again. How can I reuse the metric initialized before?

Thanks,
Lei

wangl...@geekplus.com.cn
Re: Re: How to dynamically generate multiple monitoring metrics from message content in a single Flink operator?
    public void invoke(ObjectNode node, Context context) throws Exception {
        String tableName = node.get("metadata").get("topic").asText();
        Meter meter = getRuntimeContext().getMetricGroup().meter(tableName, new MeterView(10));
        meter.markEvent();
        log.info("### counter: " + meter.toString() + "\t" + meter.getCount());
    }

As the code above shows, I parse out tableName in the invoke method and use it as the metric name. But written this way, every incoming message effectively redefines the metric, and counting starts from 0 again.

Thanks,
Wang Lei

wangl...@geekplus.com.cn

Sender: kcz
Send Time: 2020-07-03 09:13
Receiver: wanglei2
Subject: Re: How to dynamically generate multiple monitoring metrics from message content in a single Flink operator?

As you describe it, you are just missing the tableName; once you have parsed the log and obtained the tableName, you can create the metric from it, right?

-- Original message --
From: 王磊2
Sent: 2020-07-02 21:46
To: user-zh, 17610775726 <17610775...@163.com>
Subject: Re: How to dynamically generate multiple monitoring metrics from message content in a single Flink operator?

I don't follow the implementation you describe. What I want to end up with are metrics like myCounter_table1, myCounter_table2, ..., myCounter_tableX. But in the code I have seen, metrics are always initialized in the open method, where I have no way of knowing the tableName.

Thanks,
Wang Lei

From: JasonLee <17610775...@163.com>
Sent: 2020-07-02 21:12
To: user-zh
Subject: Re: How to dynamically generate multiple monitoring metrics from message content in a single Flink operator?

Can't you just pass the tableName into the metric below?

JasonLee
邮箱:17610775...@163.com
Re: Re: How to dynamically generate multiple monitoring metrics from message content in a single Flink operator?
They are all metrics of the same type. For example, the messages are parsed MySQL binlog results; I want to extract the tableName from the message content and generate a differently named metric per tableName (all of meter type).

Thanks,
Wang Lei

wangl...@geekplus.com.cn

From: JasonLee
Sent: 2020-07-02 16:16
To: user-zh
Subject: Re: How to dynamically generate multiple monitoring metrics from message content in a single Flink operator?

Do you want to generate metrics of different types, e.g. counter and meter?

JasonLee
邮箱:17610775...@163.com
How to dynamically generate multiple monitoring metrics from message content in a single Flink operator?
The example from the official site:

    public class MyMapper extends RichMapFunction<String, String> {
        private transient Counter counter;

        @Override
        public void open(Configuration config) {
            this.counter = getRuntimeContext()
                .getMetricGroup()
                .counter("myCounter");
        }

        @Override
        public String map(String value) throws Exception {
            this.counter.inc();
            return value;
        }
    }

I want to generate different monitoring metrics based on the argument passed to the map method. How can this be done?

Thanks,
Wang Lei

wangl...@geekplus.com.cn
Re: Re: Flip-105: can debezium/canal SQL sink to a database directly?
    CREATE TABLE t_pick_order (
      order_no VARCHAR,
      status INT
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'example',
      'scan.startup.mode' = 'latest-offset',
      'properties.bootstrap.servers' = '172.19.78.32:9092',
      'format' = 'canal-json'
    );

    CREATE TABLE order_status (
      order_no VARCHAR,
      status INT,
      PRIMARY KEY (order_no) NOT ENFORCED
    ) WITH (
      'connector' = 'jdbc',
      'url' = 'jdbc:mysql://xxx:3306/flink_test',
      'table-name' = 'order_status',
      'username' = 'dev',
      'password' = ''
    );

But when I execute

    INSERT INTO order_status SELECT order_no, status FROM t_pick_order;

there's an error:

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: Provided trait [BEFORE_AND_AFTER] can't satisfy required trait [ONLY_UPDATE_AFTER]. This is a bug in planner, please file an issue. Current node is TableSourceScan(table=[[default_catalog, default_database, t_pick_order]], fields=[order_no, status])

wangl...@geekplus.com.cn

From: Danny Chan
Date: 2020-06-30 20:25
To: wangl...@geekplus.com.cn
Subject: Re: Re: Flip-105: can debezium/canal SQL sink to a database directly?

Hi, wanglei2 ~

For the primary key syntax you can reference [1], the "PRIMARY KEY" part; notice that currently we only support the NOT ENFORCED mode. Here is the reason:

> SQL standard specifies that a constraint can either be ENFORCED or NOT ENFORCED. This controls if the constraint checks are performed on the incoming/outgoing data. Flink does not own the data therefore the only mode we want to support is the NOT ENFORCED mode. It is up to the user to ensure that the query enforces key integrity.

For the DDL to create a JDBC table, you can reference [2].

[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/create.html#create-table
[2] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/jdbc.html#how-to-create-a-jdbc-table

Best,
Danny Chan

On Jun 30, 2020, 10:25 AM +0800, wangl...@geekplus.com.cn wrote:

Thanks Jingsong. Is there any document or example for this? I will build the flink-1.11 package and have a try.

Thanks,
Lei

wangl...@geekplus.com.cn
Re: Re: Flip-105: can debezium/canal SQL sink to a database directly?
Thanks Jingsong. Is there any document or example for this? I will build the flink-1.11 package and have a try.

Thanks,
Lei

wangl...@geekplus.com.cn

From: Jingsong Li
Date: 2020-06-30 10:08
To: wangl...@geekplus.com.cn
CC: user
Subject: Re: Flip-105: can debezium/canal SQL sink to a database directly?

Hi Lei,

    INSERT INTO jdbc_table SELECT * FROM changelog_table;

For the Flink 1.11 new connectors, you need to define the primary key for jdbc_table (and your MySQL table needs to have the corresponding primary key), because changelog_table contains "update" and "delete" records. The JDBC sink will then:
- insert or update using "INSERT INTO ... ON DUPLICATE KEY UPDATE ..." to handle "insert" and "update" messages;
- delete to handle "delete" messages.

So generally speaking, with the primary key, this MySQL table will stay identical to your source database table (the table generating the changelog).

Best, Jingsong

-- Best, Jingsong Lee
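Jingsong's description can be illustrated without Flink: applying a changelog of insert/update/delete events against a table keyed on the primary key leaves the target table in the same state as the source. The event kinds below mirror Flink's RowKind, but the classes are simplified stand-ins, and the in-memory map stands in for the JDBC table:

```java
import java.util.HashMap;
import java.util.Map;

public class ChangelogApply {

    enum Kind { INSERT, UPDATE_AFTER, DELETE }

    static class Change {
        final Kind kind;
        final String key;    // primary key value, e.g. order_no
        final Integer value; // e.g. status
        Change(Kind kind, String key, Integer value) {
            this.kind = kind; this.key = key; this.value = value;
        }
    }

    // What the JDBC sink effectively does per message:
    // INSERT/UPDATE_AFTER -> "INSERT ... ON DUPLICATE KEY UPDATE", DELETE -> "DELETE".
    static void apply(Map<String, Integer> table, Change c) {
        switch (c.kind) {
            case INSERT:
            case UPDATE_AFTER:
                table.put(c.key, c.value); // upsert on the primary key
                break;
            case DELETE:
                table.remove(c.key);
                break;
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> orderStatus = new HashMap<>();
        apply(orderStatus, new Change(Kind.INSERT, "order-1", 0));
        apply(orderStatus, new Change(Kind.UPDATE_AFTER, "order-1", 2));
        apply(orderStatus, new Change(Kind.INSERT, "order-2", 0));
        apply(orderStatus, new Change(Kind.DELETE, "order-2", null));
        System.out.println(orderStatus); // prints {order-1=2}
    }
}
```

This is why the primary key is mandatory on the sink table: without a key there is no way to decide which existing row an update or delete message refers to.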
Flip-105: can debezium/canal SQL sink to a database directly?
    CREATE TABLE my_table (
      id BIGINT,
      first_name STRING,
      last_name STRING,
      email STRING
    ) WITH (
      'connector'='kafka',
      'topic'='user_topic',
      'properties.bootstrap.servers'='localhost:9092',
      'scan.startup.mode'='earliest-offset',
      'format'='debezium-json'
    );

    INSERT INTO mysql_sink_table SELECT id, first_name, last_name FROM my_table;

What will happen after I execute the INSERT statement? For the update/delete messages from Kafka, will the corresponding records be updated or deleted in mysql_sink_table?

Thanks,
Lei

wangl...@geekplus.com.cn
Re: Re: After adding a field to a class used as Flink state, can the state still be recognized?
OrderState has get/set methods, so Flink should serialize OrderState as a POJO.

wangl...@geekplus.com.cn

Sender: 1048262223
Send Time: 2020-06-09 18:11
Receiver: user-zh
Subject: Re: After adding a field to a class used as Flink state, can the state still be recognized?

Hi

If Flink serializes OrderState as a POJO type, it can be restored from the savepoint after the field is added.

Best,
Yichao Yang
After adding a field to a class used as Flink state, can the state still be recognized?
I wrote a simple class that is used in Flink state:

    public class OrderState {
        private Integer warehouseId;
        private String orderNo;
        private String ownerCode;
        private Long inputDate;
        private int orderType;
        private int amount = 0;
        private int status = 0;
        ...
    }

Now the program needs to be upgraded and this class gets a new field. Can the state be restored normally? That is, after flink run -s savepointdir, will the state saved by the old code still be recognized?

Thanks,
Wang Lei

wangl...@geekplus.com.cn
Re: Re: java.lang.AbstractMethodError when implementing KafkaSerializationSchema
It was caused by a jar conflict, and I have fixed it. I put flink-connector-kafka_2.11-1.10.0.jar in the Flink lib directory, and my project pom file also has the flink-connector-kafka dependency and is built as a fat jar.

Thanks,
Lei

wangl...@geekplus.com.cn

From: Leonard Xu
Sent: 2020-05-26 15:47
To: Aljoscha Krettek
CC: user
Subject: Re: java.lang.AbstractMethodError when implementing KafkaSerializationSchema

Hi, wanglei

I think Aljoscha is right. Could you post your dependency list? The dependency flink-connector-kafka is used in DataStream applications, which is what you should use; the dependency flink-sql-connector-kafka is used in Table API & SQL applications. We should only add one of them because the two dependencies will conflict.

Best,
Leonard Xu

On May 26, 2020, at 15:02, Aljoscha Krettek wrote:

I think what might be happening is that you're mixing dependencies from the flink-sql-connector-kafka and the proper flink-connector-kafka that should be used with the DataStream API. Could that be the case?

Best,
Aljoscha

On 25.05.20 19:18, Piotr Nowojski wrote:

Hi,

It would be helpful if you could provide the full stack trace, and which Flink version and Kafka connector version you are using. It sounds like either a dependency convergence error (mixing Kafka dependencies/various versions of flink-connector-kafka inside a single job/jar) or some shading issue. Can you check your project for such issues (`mvn dependency:tree` command [1])?

Also, what's a bit suspicious for me is the return type: Lorg/apache/flink/kafka/shaded/org/apache/kafka/clients/producer/ProducerRecord; I'm not sure, but I was not aware that we are shading the Kafka dependency in our connectors. Are you manually shading something?
Piotrek

[1] https://maven.apache.org/plugins/maven-dependency-plugin/examples/resolving-conflicts-using-the-dependency-tree.html

On 22 May 2020, at 15:34, wangl...@geekplus.com.cn wrote:

public class MyKafkaSerializationSchema implements KafkaSerializationSchema<Tuple2<String, String>> {
    @Override
    public ProducerRecord<byte[], byte[]> serialize(Tuple2<String, String> o, @Nullable Long aLong) {
        ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(o.f0, o.f1.getBytes(StandardCharsets.UTF_8));
        return record;
    }
}

FlinkKafkaProducer<Tuple2<String, String>> producer = new FlinkKafkaProducer<>(
    "default", new MyKafkaSerializationSchema(), prop2, Semantic.EXACTLY_ONCE);

But there's an error when running:

java.lang.AbstractMethodError: com.geekplus.flinketl.schema.MyKafkaSerializationSchema.serialize(Ljava/lang/Object;Ljava/lang/Long;)Lorg/apache/flink/kafka/shaded/org/apache/kafka/clients/producer/ProducerRecord;

Any suggestion on this?

Thanks,
Lei

wangl...@geekplus.com.cn
java.lang.AbstractMethodError when implementing KafkaSerializationSchema
public class MyKafkaSerializationSchema implements KafkaSerializationSchema<Tuple2<String, String>> {
    @Override
    public ProducerRecord<byte[], byte[]> serialize(Tuple2<String, String> o, @Nullable Long aLong) {
        ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(o.f0, o.f1.getBytes(StandardCharsets.UTF_8));
        return record;
    }
}

FlinkKafkaProducer<Tuple2<String, String>> producer = new FlinkKafkaProducer<>(
    "default", new MyKafkaSerializationSchema(), prop2, Semantic.EXACTLY_ONCE);

But there's an error when running:

java.lang.AbstractMethodError: com.geekplus.flinketl.schema.MyKafkaSerializationSchema.serialize(Ljava/lang/Object;Ljava/lang/Long;)Lorg/apache/flink/kafka/shaded/org/apache/kafka/clients/producer/ProducerRecord;

Any suggestion on this?

Thanks,
Lei

wangl...@geekplus.com.cn
Flink: how to access remote HDFS using the NameNode nameservice
According to https://ci.apache.org/projects/flink/flink-docs-stable/ops/jobmanager_high_availability.html I am deploying a standalone cluster with JobManager HA and need an HDFS address:

high-availability.storageDir: hdfs:///flink/recovery

My Hadoop is a remote cluster. I can write it as hdfs://active-namenode-ip:8020, but this way loses NameNode HA. Is there any way to configure it as hdfs://name-service:8020?

Thanks,
Lei

wangl...@geekplus.com.cn
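A common way to resolve an HA nameservice from machines outside the cluster (this is standard Hadoop HA client configuration, an assumption on my part rather than something stated in this thread) is to copy the cluster's core-site.xml/hdfs-site.xml to the Flink hosts and export HADOOP_CONF_DIR pointing at them. A minimal hdfs-site.xml sketch — the nameservice and host names below are hypothetical:

```xml
<!-- Hypothetical HA client config: lets hdfs://name-service/... resolve
     to whichever NameNode is currently active. -->
<configuration>
  <property>
    <name>dfs.nameservices</name>
    <value>name-service</value>
  </property>
  <property>
    <name>dfs.ha.namenodes.name-service</name>
    <value>nn1,nn2</value>
  </property>
  <property>
    <name>dfs.namenode.rpc-address.name-service.nn1</name>
    <value>namenode1-host:8020</value>
  </property>
  <property>
    <name>dfs.namenode.rpc-address.name-service.nn2</name>
    <value>namenode2-host:8020</value>
  </property>
  <property>
    <name>dfs.client.failover.proxy.provider.name-service</name>
    <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
  </property>
</configuration>
```

With this configuration visible to Flink, high-availability.storageDir can then be written as hdfs://name-service/flink/recovery.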
Re: Reply: Setting up a standalone-mode HA Flink cluster outside an existing Hadoop cluster
I tried it and it works, but now there is a problem accessing HDFS. The Hadoop I use is managed by Alibaba Cloud EMR. On machines managed by EMR, HDFS can be accessed as hdfs://emr-cluster:8020/. But the Flink I deployed is not managed by EMR and cannot resolve that address; I can only write it in the form hdfs://active-namenode-ip:8020/, so the NameNode loses its HA capability. Is there a way to solve this?

Thanks,
王磊

wangl...@geekplus.com.cn

Sender: Andrew
Send Time: 2020-05-07 12:31
Receiver: user-zh
Subject: Reply: Setting up a standalone-mode HA Flink cluster outside an existing Hadoop cluster

https://ci.apache.org/projects/flink/flink-docs-stable/ops/jobmanager_high_availability.html

---Original message---
From: "wangl...@geekplus.com.cn"
Reply: Setting up a standalone-mode HA Flink cluster outside an existing Hadoop cluster
This document looks right; I will try it first:
https://ci.apache.org/projects/flink/flink-docs-stable/ops/jobmanager_high_availability.html

wangl...@geekplus.com.cn

From: wangl...@geekplus.com.cn
Sent: 2020-05-07 12:23
To: user-zh
Subject: Setting up a standalone-mode HA Flink cluster outside an existing Hadoop cluster

There is already a Hadoop cluster. I want to deploy a standalone-mode Flink cluster outside it (different machines, network reachable), configure JobManager HA, and access and use the existing Hadoop's HDFS and ZooKeeper. Is there a reference document for how to do this?

Thanks,
王磊

wangl...@geekplus.com.cn
Re: Re: How does FlinkSQL retraction work?
Thanks Jingsong Lee. I am using MySQL, and the sink table had no primary key or unique key. With a primary key or unique key on the sink table, it indeed keeps only the two expected records.

I experimented in the flink sql-client with SET execution.result-mode=changelog; on the left I marked which Kafka message caused each row:

+/- tms_company order_cnt
1 + zhongtong 1
2 - zhongtong 1
2 + yuantong 1
3 - yuantong 1
3 + yuantong 2
4 - yuantong 2
4 + yuantong 1
4 + zhongtong 1

Message 1: one INSERT.
Message 2: one DELETE and one INSERT.
Message 3: one INSERT ... ON DUPLICATE KEY UPDATE.
Message 4: two INSERT ... ON DUPLICATE KEY UPDATE.

My conclusion: if a retraction brings the result to 0, a DELETE is executed; otherwise an INSERT ... ON DUPLICATE KEY UPDATE. Is this understanding correct?

Thanks,
王磊

wangl...@geekplus.com.cn

Sender: Jingsong Li
Send Time: 2020-05-06 11:35
Receiver: user-zh
Subject: Re: Re: How does FlinkSQL retraction work?

Hi,

Question 1: deleting data is not only a capability of the retract stream. An upsert stream is a special optimization for downstream systems that can overwrite by key; besides overwriting by key, it also needs to delete data when the upstream retracts — that is, an upsert stream also has retract input data. The JDBC sink consumes an upsert stream.

Question 2: the correct data should be:

1 {"order_id":1,"tms_company":"zhongtong"} 1 row in the database: zhongtong 1
2 {"order_id":1,"tms_company":"yuantong"} 1 row: yuantong 1 (zhongtong 1 deleted)
3 {"order_id":2,"tms_company":"yuantong"} 1 row: yuantong 2 (yuantong 1 deleted)
4 {"order_id":2,"tms_company":"zhongtong"} 2 rows: yuantong 1, zhongtong 1 (yuantong 2 deleted)

Which dialect did you use — MySQL? Flink JDBC's MySQL dialect uses the DUPLICATE KEY UPDATE syntax to update data. It looks like this syntax may not overwrite old data when the RDS table has no primary key or unique key. Try creating a primary key or unique key?

Best,
Jingsong Lee

On Wed, May 6, 2020 at 10:36 AM wangl...@geekplus.com.cn wrote:

> The update key is tms_company, but retraction is implemented through the two-level group by. There are only two order_ids, and the tms_company of each order_id changes. I don't quite understand how this retraction works — why some rows are deleted and some are not.
>
> wangl...@geekplus.com.cn
>
> From: Michael Ran
> Sent: 2020-04-30 17:23
> To: user-zh
> Subject: Re: How does FlinkSQL retraction work?
>
> Is the specified update key tms_company?
> The result is:
> yuantong: 2
> zhongtong: 2
>
> On 2020-04-30 17:08:22, "wangl...@geekplus.com.cn" wrote:
> > I implemented the Cainiao logistics order-counting example from https://yq.aliyun.com/articles/457392/ myself, reading Kafka and writing to RDS. The RDS table has no primary key and no unique key.
> >
> > INSERT INTO table_out select tms_company, count(distinct order_id) as order_cnt from
> > (select order_id, LAST_VALUE(tms_company) AS tms_company from dwd_table group by order_id)
> > group by tms_company;
> >
> > I sent 4 messages in total, in this order:
> >
> > 1 {"order_id":1,"tms_company":"zhongtong"} 1 row in the database: zhongtong 1
> > 2 {"order_id":1,"tms_company":"yuantong"} 1 row: yuantong 1 (the previous row was deleted)
> > 3 {"order_id":2,"tms_company":"yuantong"} 2 rows: yuantong 1, yuantong 2 (a row was added, nothing deleted)
> > 4 {"order_id":2,"tms_company":"zhongtong"} 4 rows: yuantong 1, yuantong 2, yuantong 1, zhongtong 1 (two rows added, nothing deleted)
> >
> > Question 1: after message 2, the previous row in the database was deleted. I understand this should be the RetractStream behavior, but when I read the source at https://github.com/apache/flink/tree/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc there is no RetractStream implementation — which code deleted it?
> >
> > Question 2: after message 3, a row yuantong 2 was added directly — why wasn't yuantong 1 deleted? After message 4, two more rows were added — why weren't the earlier rows deleted either?
> >
> > Thanks,
> > 王磊
> >
> > wangl...@geekplus.com.cn
--
Best, Jingsong Lee
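The fix Jingsong suggests can be sketched as MySQL DDL. The table and column names come from the thread; the column types are assumptions. With tms_company as the primary key, the connector's upsert statement overwrites the old count instead of appending a new row:

```sql
-- Hypothetical sketch: a primary key on the sink table makes the
-- MySQL dialect's upsert overwrite instead of append.
CREATE TABLE table_out (
  tms_company VARCHAR(64) NOT NULL,
  order_cnt   BIGINT,
  PRIMARY KEY (tms_company)
);

-- What the Flink JDBC MySQL dialect then executes per upsert message
-- (sketch; the exact generated text may differ):
INSERT INTO table_out (tms_company, order_cnt) VALUES ('yuantong', 2)
ON DUPLICATE KEY UPDATE order_cnt = VALUES(order_cnt);
```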
How does FlinkSQL retraction work?
I implemented the Cainiao logistics order-counting example from https://yq.aliyun.com/articles/457392/ myself, reading Kafka and writing to RDS. The RDS table has no primary key and no unique key.

INSERT INTO table_out select tms_company, count(distinct order_id) as order_cnt from
(select order_id, LAST_VALUE(tms_company) AS tms_company from dwd_table group by order_id)
group by tms_company;

I sent 4 messages in total, in this order:

1 {"order_id":1,"tms_company":"zhongtong"} 1 row in the database: zhongtong 1
2 {"order_id":1,"tms_company":"yuantong"} 1 row: yuantong 1 (the previous row was deleted)
3 {"order_id":2,"tms_company":"yuantong"} 2 rows: yuantong 1, yuantong 2 (a row was added, nothing deleted)
4 {"order_id":2,"tms_company":"zhongtong"} 4 rows: yuantong 1, yuantong 2, yuantong 1, zhongtong 1 (two rows added, nothing deleted)

Question 1: after message 2, the previous row in the database was deleted. I understand this should be the RetractStream behavior, but when I read the source at https://github.com/apache/flink/tree/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc there is no RetractStream implementation — which code deleted it?

Question 2: after message 3, a row yuantong 2 was added directly — why wasn't yuantong 1 deleted? After message 4, two more rows were added — why weren't the earlier rows deleted either?

Thanks,
王磊

wangl...@geekplus.com.cn
Re: Re: 1.11 snapshot: Name or service not knownname localhost and taskMgr not started
localhost is available on my CentOS machine. I can start the cluster (including the taskMgr) with start-cluster.sh when using the flink-1.10 release tgz package.

Changing localhost to 127.0.0.1 in the slaves file resolves the problem, but then it asks for the password of the host. Without changing the slaves file, I can first run start-cluster.sh (only the JobManager starts) and then "taskmanager.sh start" to start the taskMgr. No password needed.

Thanks,
Lei

wangl...@geekplus.com.cn

Sender: Xintong Song
Send Time: 2020-04-30 12:13
Receiver: wangl...@geekplus.com.cn
CC: user
Subject: Re: 1.11 snapshot: Name or service not knownname localhost and taskMgr not started

Hi Lei,

Could you check whether the hostname 'localhost' is available on your CentOS machine? This is usually defined in "/etc/hosts". You can also try to modify the slaves file, replacing 'localhost' with '127.0.0.1'. The path is: /conf/slaves

Thank you~
Xintong Song

On Thu, Apr 30, 2020 at 11:38 AM wangl...@geekplus.com.cn wrote:

1. Clone the 1.11 snapshot repository
2. Build it on Windows
3. Scp the flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT dir to a CentOS server
4. ./bin/start-cluster.sh on the CentOS server

There's a message:

Starting cluster. Starting standalonesession daemon on host test_3.6. : Name or service not knownname localhost

Only the jobMgr is started; the taskMgr does not start. Any insight on this?

Thanks,
Lei

wangl...@geekplus.com.cn
1.11 snapshot: Name or service not knownname localhost and taskMgr not started
1. Clone the 1.11 snapshot repository
2. Build it on Windows
3. Scp the flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT dir to a CentOS server
4. ./bin/start-cluster.sh on the CentOS server

There's a message:

Starting cluster. Starting standalonesession daemon on host test_3.6. : Name or service not knownname localhost

Only the jobMgr is started; the taskMgr does not start. Any insight on this?

Thanks,
Lei

wangl...@geekplus.com.cn
Re: Re: FlinkSQL Upsert/Retraction writing to MySQL
Thanks Leonard. When JDBCUpsertTableSink processes data in upsert mode, is the actual SQL statement executed an INSERT INTO ... ON DUPLICATE KEY UPDATE? Where is this in the source code?

Thanks,
王磊

wangl...@geekplus.com.cn

From: Leonard Xu
Sent: 2020-04-27 12:58
To: user-zh
Subject: Re: FlinkSQL Upsert/Retraction writing to MySQL

Hi, wanglei

> INSERT INTO mysql_sink SELECT f1, count(*) FROM kafka_src GROUP BY f1
> Every new record from Kafka produces two records (Tuple2): the old one is deleted and the new one is added.

This query produces a retract stream. You can simply think of it as each Kafka record producing two records, but what is finally written downstream depends on what the downstream system supports and which sink is implemented (there are three kinds of sinks: AppendStreamSink, UpsertStreamSink, RetractStreamSink).

> I looked at https://github.com/apache/flink/tree/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc and there is no retract implementation.
> Is JDBCUpsertTableSink.java actually used to write to MySQL?

Among the existing sinks, Kafka implements an append sink, so it only supports insert records, not retract. The MySQL table you declared with DDL corresponds to the JDBC sink JDBCUpsertTableSink, so it is handled with upsert logic; it does not support retract either.

> Without group by, directly:
> INSERT INTO mysql_sink SELECT f1, f2 FROM kafka_src
> a primary-key conflict makes the MySQL write fail — how can it be overwritten directly in upsert mode?

Without group by, the unique key of the query cannot be derived, so updates by unique key cannot be done. Just keep the query's key (here, the group-by fields) consistent with the primary key in the database.

Best,
Leonard Xu
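To the first question above: in Flink 1.10, to the best of my recollection, the MySQL upsert statement is generated by the MySQL dialect in flink-jdbc (the JDBCDialects class — worth verifying in the source). A sketch of what it produces for the query above; the count column name cnt is an assumption:

```sql
-- Sketch of the upsert statement the MySQL dialect generates for
-- "INSERT INTO mysql_sink SELECT f1, count(*) FROM kafka_src GROUP BY f1";
-- the exact generated text may differ.
INSERT INTO mysql_sink (f1, cnt) VALUES (?, ?)
ON DUPLICATE KEY UPDATE cnt = VALUES(cnt);
```

This only behaves as an upsert when the MySQL table has a primary key or unique key matching the query's group-by key; without one, the DUPLICATE KEY branch never fires and every write appends.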
FlinkSQL Upsert/Retraction writing to MySQL
INSERT INTO mysql_sink SELECT f1, count(*) FROM kafka_src GROUP BY f1

Every new record from Kafka produces two records (Tuple2): the old one is deleted and the new one is added. But does this sink use an UpsertStream or a RetractStream, and how do I tell which one it is?

I looked at https://github.com/apache/flink/tree/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc and there is no retract implementation. Is JDBCUpsertTableSink.java actually used to write to MySQL?

Without group by, directly:

INSERT INTO mysql_sink SELECT f1, f2 FROM kafka_src

a primary-key conflict makes the MySQL write fail. How can the record be overwritten directly in upsert mode?

wangl...@geekplus.com.cn
Re: Re: FlinkSQL query error when specifying json-schema.
Thanks, I have tried it. 'format.derive-schema' = 'true' works. But if I insist on using format.json-schema, the CREATE TABLE must be written as:

`id` DECIMAL(38,18),
`timestamp` DECIMAL(38,18)

wangl...@geekplus.com.cn

From: Benchao Li
Date: 2020-04-16 16:56
To: wangl...@geekplus.com.cn
CC: user
Subject: Re: FlinkSQL query error when specifying json-schema.

Hi wanglei,

You don't need to specify 'format.json-schema'; the format can derive the schema from the DDL. Your exception above means the schema in 'format.json-schema' and the DDL do not match.

wangl...@geekplus.com.cn wrote on 2020-04-16:

CREATE TABLE user_log(
  `id` INT,
  `timestamp` BIGINT
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'wanglei_jsontest',
  'connector.startup-mode' = 'latest-offset',
  'connector.properties.0.key' = 'zookeeper.connect',
  'connector.properties.0.value' = '172.19.78.32:2181',
  'connector.properties.1.key' = 'bootstrap.servers',
  'connector.properties.1.value' = '172.19.78.32:9092',
  'update-mode' = 'append',
  'format.type' = 'json',
  'format.json-schema' = '{ "type": "object", "properties": { "id": {"type": "integer"}, "timestamp": {"type": "number"} } }'
);

Then select * from user_log;

org.apache.flink.table.api.ValidationException: Type INT of table field 'id' does not match with the physical type LEGACY('DECIMAL', 'DECIMAL') of the 'id' field of the TableSource return type.

It seems the specified types "integer" and "number" cannot be mapped to INT and BIGINT. How can I solve this problem?

Thanks,
Lei

wangl...@geekplus.com.cn

--
Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel: +86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn
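The working variant Benchao suggests can be sketched as follows; all connector properties are copied from the thread, only the last format line changes:

```sql
CREATE TABLE user_log (
  `id` INT,
  `timestamp` BIGINT
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'wanglei_jsontest',
  'connector.startup-mode' = 'latest-offset',
  'connector.properties.0.key' = 'zookeeper.connect',
  'connector.properties.0.value' = '172.19.78.32:2181',
  'connector.properties.1.key' = 'bootstrap.servers',
  'connector.properties.1.value' = '172.19.78.32:9092',
  'update-mode' = 'append',
  'format.type' = 'json',
  -- Derive the JSON schema from the DDL column types
  -- instead of an explicit 'format.json-schema':
  'format.derive-schema' = 'true'
);
```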
FlinkSQL query error when specifying json-schema.
CREATE TABLE user_log(
  `id` INT,
  `timestamp` BIGINT
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'wanglei_jsontest',
  'connector.startup-mode' = 'latest-offset',
  'connector.properties.0.key' = 'zookeeper.connect',
  'connector.properties.0.value' = '172.19.78.32:2181',
  'connector.properties.1.key' = 'bootstrap.servers',
  'connector.properties.1.value' = '172.19.78.32:9092',
  'update-mode' = 'append',
  'format.type' = 'json',
  'format.json-schema' = '{ "type": "object", "properties": { "id": {"type": "integer"}, "timestamp": {"type": "number"} } }'
);

Then select * from user_log;

org.apache.flink.table.api.ValidationException: Type INT of table field 'id' does not match with the physical type LEGACY('DECIMAL', 'DECIMAL') of the 'id' field of the TableSource return type.

It seems the specified types "integer" and "number" cannot be mapped to INT and BIGINT. How can I solve this problem?

Thanks,
Lei

wangl...@geekplus.com.cn
Re: Re: flink sql client not able to read parquet format table
https://issues.apache.org/jira/browse/FLINK-17086

It is my first time creating a Flink jira issue. Please point it out and correct it if I wrote something wrong.

Thanks,
Lei

wangl...@geekplus.com.cn

From: Jingsong Li
Date: 2020-04-10 11:03
To: wangl...@geekplus.com.cn
CC: Jark Wu; lirui; user
Subject: Re: Re: flink sql client not able to read parquet format table

Hi lei,

I think the reason is that our `HiveMapredSplitReader` does not support name-mapped reading for the parquet format. Can you create a JIRA for tracking this?

Best,
Jingsong Lee

On Fri, Apr 10, 2020 at 9:42 AM wangl...@geekplus.com.cn wrote:

I am using Hive 3.1.1. The table has many fields; each field corresponds to a field in the RobotUploadData0101 class.

CREATE TABLE `robotparquet`(
  `robotid` int, `framecount` int, `robottime` bigint, `robotpathmode` int,
  `movingmode` int, `submovingmode` int, `xlocation` int, `ylocation` int,
  `robotradangle` int, `velocity` int, `acceleration` int, `angularvelocity` int,
  `angularacceleration` int, `literangle` int, `shelfangle` int, `onloadshelfid` int,
  `rcvdinstr` int, `sensordist` int, `pathstate` int, `powerpresent` int,
  `neednewpath` int, `pathelenum` int, `taskstate` int, `receivedtaskid` int,
  `receivedcommcount` int, `receiveddispatchinstr` int, `receiveddispatchcount` int,
  `subtaskmode` int, `versiontype` int, `version` int, `liftheight` int,
  `codecheckstatus` int, `cameraworkmode` int, `backrimstate` int, `frontrimstate` int,
  `pathselectstate` int, `codemisscount` int, `groundcameraresult` int,
  `shelfcameraresult` int, `softwarerespondframe` int, `paramstate` int,
  `pilotlamp` int, `codecount` int, `dist2waitpoint` int, `targetdistance` int,
  `obstaclecount` int, `obstacleframe` int, `cellcodex` int, `cellcodey` int,
  `cellangle` int, `shelfqrcode` int, `shelfqrangle` int, `shelfqrx` int,
  `shelfqry` int, `trackthetaerror` int, `tracksideerror` int, `trackfuseerror` int,
  `lifterangleerror` int, `lifterheighterror` int, `linearcmdspeed` int,
  `angluarcmdspeed` int, `liftercmdspeed` int, `rotatorcmdspeed` int)
PARTITIONED BY (`hour` string) STORED AS parquet;

Thanks,
Lei

wangl...@geekplus.com.cn

From: Jingsong Li
Date: 2020-04-09 21:45
To: wangl...@geekplus.com.cn
CC: Jark Wu; lirui; user
Subject: Re: Re: flink sql client not able to read parquet format table

Hi lei,

Which hive version did you use? Can you share the complete hive DDL?

Best,
Jingsong Lee

On Thu, Apr 9, 2020 at 7:15 PM wangl...@geekplus.com.cn wrote:

I am using the newest 1.10 blink planner. Perhaps it is because of the method I used to write the parquet file: receive a Kafka message, transform each message to a Java class object, write the object to HDFS using StreamingFileSink, and add the HDFS path as a partition of the hive table.

No matter what the order of the field descriptions in the hive DDL statement, the hive client will work, as long as the field names are the same as the Java object field names. But the flink sql client will not work.

DataStream<RobotUploadData0101> sourceRobot = source.map(x -> transform(x));
final StreamingFileSink<RobotUploadData0101> sink;
sink = StreamingFileSink
    .forBulkFormat(new Path("hdfs://172.19.78.38:8020/user/root/wanglei/robotdata/parquet"),
        ParquetAvroWriters.forReflectRecord(RobotUploadData0101.class))

For example, RobotUploadData0101 has two fields: robotId int, robotTime long.

CREATE TABLE `robotparquet`(`robotid` int, `robottime` bigint) and
CREATE TABLE `robotparquet`(`robottime` bigint, `robotid` int)
are the same for the hive client, but different for the flink-sql client.

Is this expected behavior?

Thanks,
Lei

wangl...@geekplus.com.cn

From: Jark Wu
Date: 2020-04-09 14:48
To: wangl...@geekplus.com.cn; Jingsong Li; lirui
CC: user
Subject: Re: flink sql client not able to read parquet format table

Hi Lei,

Are you using the newest 1.10 blink planner? I'm not familiar with Hive and parquet, but I know @Jingsong Li and @li...@apache.org are experts on this. Maybe they can help on this question.

Best,
Jark

On Tue, 7 Apr 2020 at 16:17, wangl...@geekplus.com.cn wrote:

Hive table stored as parquet. Under the hive client:

hive> select robotid from robotparquet limit 2;
OK
1291097
1291044

But under the flink sql-client the result is 0:

Flink SQL> select robotid from robotparquet limit 2;
robotid
0
0

Any insight on this?

Thanks,
Lei

wangl...@geekplus.com.cn

--
Best, Jingsong Lee

--
Best, Jingsong Lee
Re: Re: flink sql client not able to read parquet format table
I am using Hive 3.1.1. The table has many fields; each field corresponds to a field in the RobotUploadData0101 class.

CREATE TABLE `robotparquet`(
  `robotid` int, `framecount` int, `robottime` bigint, `robotpathmode` int,
  `movingmode` int, `submovingmode` int, `xlocation` int, `ylocation` int,
  `robotradangle` int, `velocity` int, `acceleration` int, `angularvelocity` int,
  `angularacceleration` int, `literangle` int, `shelfangle` int, `onloadshelfid` int,
  `rcvdinstr` int, `sensordist` int, `pathstate` int, `powerpresent` int,
  `neednewpath` int, `pathelenum` int, `taskstate` int, `receivedtaskid` int,
  `receivedcommcount` int, `receiveddispatchinstr` int, `receiveddispatchcount` int,
  `subtaskmode` int, `versiontype` int, `version` int, `liftheight` int,
  `codecheckstatus` int, `cameraworkmode` int, `backrimstate` int, `frontrimstate` int,
  `pathselectstate` int, `codemisscount` int, `groundcameraresult` int,
  `shelfcameraresult` int, `softwarerespondframe` int, `paramstate` int,
  `pilotlamp` int, `codecount` int, `dist2waitpoint` int, `targetdistance` int,
  `obstaclecount` int, `obstacleframe` int, `cellcodex` int, `cellcodey` int,
  `cellangle` int, `shelfqrcode` int, `shelfqrangle` int, `shelfqrx` int,
  `shelfqry` int, `trackthetaerror` int, `tracksideerror` int, `trackfuseerror` int,
  `lifterangleerror` int, `lifterheighterror` int, `linearcmdspeed` int,
  `angluarcmdspeed` int, `liftercmdspeed` int, `rotatorcmdspeed` int)
PARTITIONED BY (`hour` string) STORED AS parquet;

Thanks,
Lei

wangl...@geekplus.com.cn

From: Jingsong Li
Date: 2020-04-09 21:45
To: wangl...@geekplus.com.cn
CC: Jark Wu; lirui; user
Subject: Re: Re: flink sql client not able to read parquet format table

Hi lei,

Which hive version did you use? Can you share the complete hive DDL?

Best,
Jingsong Lee

On Thu, Apr 9, 2020 at 7:15 PM wangl...@geekplus.com.cn wrote:

I am using the newest 1.10 blink planner. Perhaps it is because of the method I used to write the parquet file: receive a Kafka message, transform each message to a Java class object, write the object to HDFS using StreamingFileSink, and add the HDFS path as a partition of the hive table.

No matter what the order of the field descriptions in the hive DDL statement, the hive client will work, as long as the field names are the same as the Java object field names. But the flink sql client will not work.

DataStream<RobotUploadData0101> sourceRobot = source.map(x -> transform(x));
final StreamingFileSink<RobotUploadData0101> sink;
sink = StreamingFileSink
    .forBulkFormat(new Path("hdfs://172.19.78.38:8020/user/root/wanglei/robotdata/parquet"),
        ParquetAvroWriters.forReflectRecord(RobotUploadData0101.class))

For example, RobotUploadData0101 has two fields: robotId int, robotTime long.

CREATE TABLE `robotparquet`(`robotid` int, `robottime` bigint) and
CREATE TABLE `robotparquet`(`robottime` bigint, `robotid` int)
are the same for the hive client, but different for the flink-sql client.

Is this expected behavior?

Thanks,
Lei

wangl...@geekplus.com.cn

From: Jark Wu
Date: 2020-04-09 14:48
To: wangl...@geekplus.com.cn; Jingsong Li; lirui
CC: user
Subject: Re: flink sql client not able to read parquet format table

Hi Lei,

Are you using the newest 1.10 blink planner? I'm not familiar with Hive and parquet, but I know @Jingsong Li and @li...@apache.org are experts on this. Maybe they can help on this question.

Best,
Jark

On Tue, 7 Apr 2020 at 16:17, wangl...@geekplus.com.cn wrote:

Hive table stored as parquet. Under the hive client:

hive> select robotid from robotparquet limit 2;
OK
1291097
1291044

But under the flink sql-client the result is 0:

Flink SQL> select robotid from robotparquet limit 2;
robotid
0
0

Any insight on this?

Thanks,
Lei

wangl...@geekplus.com.cn

--
Best, Jingsong Lee
Re: Re: flink sql client not able to read parquet format table
I am using the newest 1.10 blink planner. Perhaps it is because of the method I used to write the parquet file: receive a Kafka message, transform each message to a Java class object, write the object to HDFS using StreamingFileSink, and add the HDFS path as a partition of the hive table.

No matter what the order of the field descriptions in the hive DDL statement, the hive client will work, as long as the field names are the same as the Java object field names. But the flink sql client will not work.

DataStream<RobotUploadData0101> sourceRobot = source.map(x -> transform(x));
final StreamingFileSink<RobotUploadData0101> sink;
sink = StreamingFileSink
    .forBulkFormat(new Path("hdfs://172.19.78.38:8020/user/root/wanglei/robotdata/parquet"),
        ParquetAvroWriters.forReflectRecord(RobotUploadData0101.class))

For example, RobotUploadData0101 has two fields: robotId int, robotTime long.

CREATE TABLE `robotparquet`(`robotid` int, `robottime` bigint) and
CREATE TABLE `robotparquet`(`robottime` bigint, `robotid` int)
are the same for the hive client, but different for the flink-sql client.

Is this expected behavior?

Thanks,
Lei

wangl...@geekplus.com.cn

From: Jark Wu
Date: 2020-04-09 14:48
To: wangl...@geekplus.com.cn; Jingsong Li; lirui
CC: user
Subject: Re: flink sql client not able to read parquet format table

Hi Lei,

Are you using the newest 1.10 blink planner? I'm not familiar with Hive and parquet, but I know @Jingsong Li and @li...@apache.org are experts on this. Maybe they can help on this question.

Best,
Jark

On Tue, 7 Apr 2020 at 16:17, wangl...@geekplus.com.cn wrote:

Hive table stored as parquet. Under the hive client:

hive> select robotid from robotparquet limit 2;
OK
1291097
1291044

But under the flink sql-client the result is 0:

Flink SQL> select robotid from robotparquet limit 2;
robotid
0
0

Any insight on this?

Thanks,
Lei

wangl...@geekplus.com.cn
flink sql client not able to read parquet format table
Hive table stored as parquet. Under the hive client:

hive> select robotid from robotparquet limit 2;
OK
1291097
1291044

But under the flink sql-client the result is 0:

Flink SQL> select robotid from robotparquet limit 2;
robotid
0
0

Any insight on this?

Thanks,
Lei

wangl...@geekplus.com.cn
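Consistent with the observations in this thread (FLINK-17086 tracks the missing name mapping), a workaround — an assumption on my part, not confirmed by the participants — is to declare the Hive columns in exactly the same order as the Java class fields, since the reader appears to map parquet columns by position rather than by name:

```sql
-- Hypothetical two-field illustration: column order matches the field
-- order of RobotUploadData0101 (robotId first, robotTime second),
-- so positional mapping lines up with the parquet file layout.
CREATE TABLE `robotparquet` (
  `robotid`   int,
  `robottime` bigint
) STORED AS parquet;
```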
Re: Re: Implementing a KafkaUpsertTableSink
I kept only KafkaRetractTableSourceSinkFactory. KafkaRetractTableSinkBase implements the RetractStreamTableSink interface, and consumeDataStream emits only the True records. It finally works.

@Override
public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
    DataStream<Row> dtNeed = dataStream.filter(x -> x.f0 == Boolean.TRUE).map(x -> x.f1);

INSERT INTO table1 SELECT field, count(*) from table2 group by field

This is a retract stream; the result contains True/False flags, so filtering on them works.

INSERT INTO table1 SELECT field, 1 from table2

As I understand it this is not a retract stream, so the dataStream.filter(x -> x.f0 == Boolean.TRUE) code above should fail — but in fact it does not. I don't fully understand this yet; I'll keep looking into it.

Thanks,
王磊

wangl...@geekplus.com.cn

Sender: Benchao Li
Send Time: 2020-03-31 12:02
Receiver: user-zh
Subject: Re: Re: Implementing a KafkaUpsertTableSink

I think you can make the parameters of KafkaRetractTableSourceSinkFactory differ from those of KafkaTableSourceSinkFactory and use that to distinguish the two factories — for example, add a parameter indicating whether the sink is retract or append.

wangl...@geekplus.com.cn wrote on 2020-03-31 11:17:

> This probably ends up discovering two tableFactories. I copied the whole KafkaTableSourceSinkFactory hierarchy in parallel as KafkaRetractTableSourceSinkFactory.
> How should this be changed properly?
>
> 137 private static <T extends TableFactory> T findSingleInternal(
> 138         Class<T> factoryClass,
> 139         Map<String, String> properties,
> 140         Optional<ClassLoader> classLoader) {
> 141
> 142     List<TableFactory> tableFactories = discoverFactories(classLoader);
> 143     List<T> filtered = filter(tableFactories, factoryClass, properties);
> 144
> 145     if (filtered.size() > 1) {
> 146         throw new AmbiguousTableFactoryException(
> 147             filtered,
> 148             factoryClass,
> 149             tableFactories,
> 150             properties);
> 151     } else {
> 152         return filtered.get(0);
> 153     }
> 154 }
>
> Thanks,
> 王磊
>
> wangl...@geekplus.com.cn
>
> Sender: wangl...@geekplus.com.cn
> Send Time: 2020-03-31 10:50
> Receiver: user-zh
> Subject: Re: RE: Implementing a KafkaUpsertTableSink
>
> I implemented a KafkaRetractTableSink in the same way, packaged it as a jar under the lib directory, and started the sql-client:
>
> org.apache.flink.table.planner.delegation.BlinkExecutorFactory
> at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:146)
> at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:113)
> at org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:377)
> at org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$initializeCatalogs$6(ExecutionContext.java:559)
> at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684)
> at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeCatalogs(ExecutionContext.java:557)
> at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:494)
> at org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:159)
> at org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:118)
> at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:742)
> ... 3 more
>
> How can this be solved?
>
> Thanks,
> 王磊
>
> wangl...@geekplus.com.cn
>
> Sender: wxchunj...@163.com
> Send Time: 2020-03-29 10:32
> Receiver: user-zh@flink.apache.org
> Subject: RE: Implementing a KafkaUpsertTableSink
>
> Benchao, thanks a lot. I didn't know the Factory needed to be added to the services file; after adding it, it runs normally.
>
> -----Original Message-----
> From: user-zh-return-2640-wxchunjhyy=163@flink.apache.org On Behalf Of Benchao Li
> Sent: Saturday, March 28, 2020 6:28 PM
> To: user-zh
> Subject: Re: Implementing a KafkaUpsertTableSink
>
> Hi,
> You need to add your new Factory to the META-INF/services/org.apache.flink.table.factories.TableFactory file under resources. Have you already done this step?
>
> On 2020-03-28 17:38, someone wrote:
>
> > Hi all:
> > Since the current KafkaTableSink does not support SQL with GROUP BY, I implemented a KafkaUpsertTableSink following the logic of KafkaTableSink and HbaseUpsertTableSink:
> > KafkaUpsertTableSink
> > KafkaUpsertTableSinkBase
> > KafkaUpsertTableSourceSinkFactory
> > KafkaUpsertTableSourceSinkFactoryBase
> > MyKafkaValidator
> > But the TableFactoryService.discoverFactories method cannot load my KafkaUpsertTableSourceSinkFactory. Do I need to register it somewhere?
Re: Re: Implementing a KafkaUpsertTableSink
这个应该是得到两个 tableFactories 了。 我直接平行的把 KafkaTableSourceSinkFactory 那一套变成 KafkaRetractTableSourceSinkFactory 写了一遍 但这个应该怎样改才合适呢? 137 private static T findSingleInternal( 138 Class factoryClass, 139 Map properties, 140 Optional classLoader) { 141 142 List tableFactories = discoverFactories(classLoader); 143 List filtered = filter(tableFactories, factoryClass, properties); 144 145 if (filtered.size() > 1) { 146 throw new AmbiguousTableFactoryException( 147 filtered, 148 factoryClass, 149 tableFactories, 150 properties); 151 } else { 152 return filtered.get(0); 153 } 154 } 谢谢, 王磊 wangl...@geekplus.com.cn Sender: wangl...@geekplus.com.cn Send Time: 2020-03-31 10:50 Receiver: user-zh Subject: Re: RE: 实现 KafkaUpsertTableSink 我以相同的方式 实现了一个 KafkaRetractTableSink, 然后打成 jar 包放在 lib 目录下启动 sql-client: org.apache.flink.table.planner.delegation.BlinkExecutorFactory at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:146) at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:113) at org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:377) at org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$initializeCatalogs$6(ExecutionContext.java:559) at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684) at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeCatalogs(ExecutionContext.java:557) at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:494) at org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:159) at org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:118) at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:742) ... 3 more 这个改怎样解决呢? 
Thanks,
Wang Lei
wangl...@geekplus.com.cn

Sender: wxchunj...@163.com
Send Time: 2020-03-29 10:32
Receiver: user-zh@flink.apache.org
Subject: RE: Implementing KafkaUpsertTableSink

Benchao, thanks a lot. I didn't know the Factory had to be added to the services
file; after adding it, it runs correctly.

-Original Message-
From: user-zh-return-2640-wxchunjhyy=163@flink.apache.org On Behalf Of Benchao Li
Sent: Saturday, March 28, 2020 6:28 PM
To: user-zh
Subject: Re: Implementing KafkaUpsertTableSink

Hi,
You need to add your new Factory to the
META-INF/services/org.apache.flink.table.factories.TableFactory file under
resources. Have you already done that step?

On Sat, Mar 28, 2020 at 5:38 PM, the original poster wrote:
> Hi all,
>
> Since the current KafkaTableSink does not support GROUP BY in SQL, I followed
> the logic of KafkaTableSink and HbaseUpsertTableSink and implemented my own
> set of classes:
>
> KafkaUpsertTableSink
> KafkaUpsertTableSinkBase
> KafkaUpsertTableSourceSinkFactory
> KafkaUpsertTableSourceSinkFactoryBase
> MyKafkaValidator
>
> However, TableFactoryService.discoverFactories cannot load my
> KafkaUpsertTableSourceSinkFactory. Do I need to register it somewhere?
>
> /**
>  * Searches for factories using Java service providers.
>  *
>  * @return all factories in the classpath
>  */
> private static List<TableFactory> discoverFactories(Optional<ClassLoader> classLoader) {
>    try {
>       List<TableFactory> result = new LinkedList<>();
>       ClassLoader cl = classLoader.orElse(Thread.currentThread().getContextClassLoader());
>       ServiceLoader
>          .load(TableFactory.class, cl)
>          .iterator()
>          .forEachRemaining(result::add);
>       // todo add
>       result.add(new KafkaUpsertTableSourceSinkFactory());
>       return result;
>    } catch (ServiceConfigurationError e) {
>       LOG.error("Could not load service provider for table factories.", e);
>       throw new TableException("Could not load service provider for table factories.", e);
>    }
> }
>
> Directly adding KafkaUpsertTableSourceSinkFactory to the returned result
> works.
>
> Thanks very much
>
> --
> Thanks
> venn

--
Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel: +86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn
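The ambiguity reported above can be reproduced outside Flink. The sketch below is hypothetical (the class and method names are illustrative stand-ins, not Flink's actual API); it mimics the discover-then-filter logic of findSingleInternal: once two registered factories declare the same matching context, the filter step returns two candidates and no single factory can be chosen, which is exactly the point where the real code throws AmbiguousTableFactoryException.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical sketch, not Flink's real API: why registering a second factory
// with the same required context makes factory lookup ambiguous.
public class FactoryDiscovery {

    // Stand-in for org.apache.flink.table.factories.TableFactory
    interface TableFactory {
        Map<String, String> requiredContext();
    }

    static final TableFactory KAFKA = () -> Map.of("connector.type", "kafka");
    static final TableFactory KAFKA_RETRACT = () -> Map.of("connector.type", "kafka");
    static final TableFactory JDBC = () -> Map.of("connector.type", "jdbc");

    // Mirrors the filter step: keep factories whose required context is
    // contained in the table's declared properties.
    static List<TableFactory> filter(List<TableFactory> factories,
                                     Map<String, String> properties) {
        return factories.stream()
                .filter(f -> f.requiredContext().entrySet().stream()
                        .allMatch(e -> e.getValue().equals(properties.get(e.getKey()))))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<TableFactory> discovered = List.of(KAFKA, KAFKA_RETRACT, JDBC);
        List<TableFactory> matched = filter(discovered, Map.of("connector.type", "kafka"));
        // Two factories match the same context -> the real code would throw
        // AmbiguousTableFactoryException here instead of returning one factory.
        System.out.println(matched.size()); // prints 2
    }
}
```

Giving the new factory a distinct required property (or a distinct connector type value) would make the lookup unambiguous again.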
Re: RE: Implementing KafkaUpsertTableSink
I implemented a KafkaRetractTableSink in the same way, packaged it as a jar, put it
under the lib directory and started sql-client:

org.apache.flink.table.planner.delegation.BlinkExecutorFactory
at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:146)
at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:113)
at org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:377)
at org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$initializeCatalogs$6(ExecutionContext.java:559)
at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684)
at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeCatalogs(ExecutionContext.java:557)
at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:494)
at org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:159)
at org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:118)
at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:742)
... 3 more

How can this be resolved?

Thanks,
Wang Lei
wangl...@geekplus.com.cn

Sender: wxchunj...@163.com
Send Time: 2020-03-29 10:32
Receiver: user-zh@flink.apache.org
Subject: RE: Implementing KafkaUpsertTableSink

Benchao, thanks a lot. I didn't know the Factory had to be added to the services
file; after adding it, it runs correctly.

-Original Message-
From: user-zh-return-2640-wxchunjhyy=163@flink.apache.org On Behalf Of Benchao Li
Sent: Saturday, March 28, 2020 6:28 PM
To: user-zh
Subject: Re: Implementing KafkaUpsertTableSink

Hi,
You need to add your new Factory to the
META-INF/services/org.apache.flink.table.factories.TableFactory file under
resources. Have you already done that step?
On Sat, Mar 28, 2020 at 5:38 PM, the original poster wrote:
> Hi all,
>
> Since the current KafkaTableSink does not support GROUP BY in SQL, I followed
> the logic of KafkaTableSink and HbaseUpsertTableSink and implemented my own
> set of classes:
>
> KafkaUpsertTableSink
> KafkaUpsertTableSinkBase
> KafkaUpsertTableSourceSinkFactory
> KafkaUpsertTableSourceSinkFactoryBase
> MyKafkaValidator
>
> However, TableFactoryService.discoverFactories cannot load my
> KafkaUpsertTableSourceSinkFactory. Do I need to register it somewhere?
>
> /**
>  * Searches for factories using Java service providers.
>  *
>  * @return all factories in the classpath
>  */
> private static List<TableFactory> discoverFactories(Optional<ClassLoader> classLoader) {
>    try {
>       List<TableFactory> result = new LinkedList<>();
>       ClassLoader cl = classLoader.orElse(Thread.currentThread().getContextClassLoader());
>       ServiceLoader
>          .load(TableFactory.class, cl)
>          .iterator()
>          .forEachRemaining(result::add);
>       // todo add
>       result.add(new KafkaUpsertTableSourceSinkFactory());
>       return result;
>    } catch (ServiceConfigurationError e) {
>       LOG.error("Could not load service provider for table factories.", e);
>       throw new TableException("Could not load service provider for table factories.", e);
>    }
> }
>
> Directly adding KafkaUpsertTableSourceSinkFactory to the returned result
> works.
>
> Thanks very much
>
> --
> Thanks
> venn

--
Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel: +86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn
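Benchao's fix above amounts to a one-line SPI registration. A sketch of the service file is below; the com.example package prefix is illustrative, so substitute your factory's real fully-qualified class name:

```
# src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
com.example.KafkaUpsertTableSourceSinkFactory
```

At runtime, ServiceLoader.load(TableFactory.class) reads every file with this name on the classpath, which is why the custom factory stays invisible to discoverFactories until the file exists in the jar.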
Re: Re: How are the jars in the Flink distribution built?
Under flink-table-uber-blink:
mvn clean install -DskipTests -Dscala-2.12

I'm not sure how -Dscala-2.12 takes effect, but the jar built this way can
directly replace the one on the server and works fine.

Thanks,
Wang Lei
wangl...@geekplus.com.cn

Sender: Kurt Young
Send Time: 2020-03-26 18:15
Receiver: user-zh
cc: jihongchao
Subject: Re: How are the jars in the Flink distribution built?

flink-table-uber-blink should be the module; it builds the fat jar (uber jar) of
the blink planner.

Best,
Kurt

On Thu, Mar 26, 2020 at 5:54 PM wangl...@geekplus.com.cn <wangl...@geekplus.com.cn> wrote:
> For the standalone version, I downloaded and extracted the tgz; there are
> several jars under the lib directory, e.g. flink-table-blink_2.12-1.10.0.jar.
> Which module is this jar built from?
>
> I cloned the source code from GitHub and ran mvn clean package. I assumed the
> flink-table-planner-blink_2.11-1.10.0.jar built under
> flink-table/flink-table-planner-blink corresponded to
> flink-table-blink_2.12-1.10.0.jar in the release tgz. I replaced it directly
> in the installation directory; start-cluster.sh still worked, but
> ./bin/sql-client.sh embedded failed.
>
> Thanks,
> Wang Lei
>
> wangl...@geekplus.com.cn
How are the jars in the Flink distribution built?
For the standalone version, I downloaded and extracted the tgz; there are several
jars under the lib directory, e.g. flink-table-blink_2.12-1.10.0.jar. Which module
is this jar built from?

I cloned the source code from GitHub and ran mvn clean package. I assumed the
flink-table-planner-blink_2.11-1.10.0.jar built under
flink-table/flink-table-planner-blink corresponded to
flink-table-blink_2.12-1.10.0.jar in the release tgz. I replaced it directly in
the installation directory; start-cluster.sh still worked, but
./bin/sql-client.sh embedded failed.

Thanks,
Wang Lei
wangl...@geekplus.com.cn
Re: Re: Where can I find the MySQL retract stream table sink Java source code?
Thanks Jingsong. So JDBCTableSink now supports append and upsert modes, and
retract mode is not available yet. Is that right?

Thanks,
Lei
wangl...@geekplus.com.cn

Sender: Jingsong Li
Send Time: 2020-03-25 11:39
Receiver: wangl...@geekplus.com.cn
cc: user
Subject: Re: Where can I find the MySQL retract stream table sink Java source code?

Hi,
Maybe you have some misunderstanding of the upsert sink. You can take a look at
[1]; it can deal with "delete" records.
[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/dynamic_tables.html#table-to-stream-conversion
Best,
Jingsong Lee

On Wed, Mar 25, 2020 at 11:37 AM Jingsong Li wrote:
Hi,
This can be an upsert stream [1], and JDBC has an upsert sink now [2].
[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/dynamic_tables.html#table-to-stream-conversion
[2] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#jdbc-connector
Best,
Jingsong Lee

On Wed, Mar 25, 2020 at 11:14 AM Jingsong Li wrote:
Hi,
This can be an upsert stream [1]
Best,
Jingsong Lee

On Wed, Mar 25, 2020 at 11:12 AM wangl...@geekplus.com.cn wrote:
I create one table from Kafka and another table for MySQL using Flink SQL, and
write a SQL query that reads from Kafka and writes to MySQL:

INSERT INTO mysqlTable
SELECT status, COUNT(order_no) AS num
FROM (SELECT order_no, LAST_VALUE(status) AS status FROM kafkaTable GROUP BY order_no)
GROUP BY status

I think this is a retract stream. But where can I find the Java source code for
the MySQL retract table sink?

Thanks,
Lei
wangl...@geekplus.com.cn

--
Best, Jingsong Lee
Re: Re: Where can I find the MySQL retract stream table sink Java source code?
Seems it is here:
https://github.com/apache/flink/tree/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc

There's no JDBCRetractTableSink, only append and upsert. I am confused about why
the MySQL record can be deleted.

Thanks,
Lei
wangl...@geekplus.com.cn

Sender: wangl...@geekplus.com.cn
Send Time: 2020-03-25 11:25
Receiver: Jingsong Li
cc: user
Subject: Re: Re: Where can I find the MySQL retract stream table sink Java source code?

Thanks Jingsong. When executing this SQL, the MySQL table record can be deleted,
so I guess it is a retract stream. I want to know exactly which Java code is
generated and have a look at it.

Thanks,
Lei
wangl...@geekplus.com.cn

Sender: Jingsong Li
Send Time: 2020-03-25 11:14
Receiver: wangl...@geekplus.com.cn
cc: user
Subject: Re: Where can I find the MySQL retract stream table sink Java source code?

Hi,
This can be an upsert stream [1]
Best,
Jingsong Lee

On Wed, Mar 25, 2020 at 11:12 AM wangl...@geekplus.com.cn wrote:
I create one table from Kafka and another table for MySQL using Flink SQL, and
write a SQL query that reads from Kafka and writes to MySQL:

INSERT INTO mysqlTable
SELECT status, COUNT(order_no) AS num
FROM (SELECT order_no, LAST_VALUE(status) AS status FROM kafkaTable GROUP BY order_no)
GROUP BY status

I think this is a retract stream. But where can I find the Java source code for
the MySQL retract table sink?

Thanks,
Lei
wangl...@geekplus.com.cn

--
Best, Jingsong Lee
Re: Re: Where can I find the MySQL retract stream table sink Java source code?
Thanks Jingsong. When executing this SQL, the MySQL table record can be deleted,
so I guess it is a retract stream. I want to know exactly which Java code is
generated and have a look at it.

Thanks,
Lei
wangl...@geekplus.com.cn

Sender: Jingsong Li
Send Time: 2020-03-25 11:14
Receiver: wangl...@geekplus.com.cn
cc: user
Subject: Re: Where can I find the MySQL retract stream table sink Java source code?

Hi,
This can be an upsert stream [1]
Best,
Jingsong Lee

On Wed, Mar 25, 2020 at 11:12 AM wangl...@geekplus.com.cn wrote:
I create one table from Kafka and another table for MySQL using Flink SQL, and
write a SQL query that reads from Kafka and writes to MySQL:

INSERT INTO mysqlTable
SELECT status, COUNT(order_no) AS num
FROM (SELECT order_no, LAST_VALUE(status) AS status FROM kafkaTable GROUP BY order_no)
GROUP BY status

I think this is a retract stream. But where can I find the Java source code for
the MySQL retract table sink?

Thanks,
Lei
wangl...@geekplus.com.cn

--
Best, Jingsong Lee
Where can I find the MySQL retract stream table sink Java source code?
I create one table from Kafka and another table for MySQL using Flink SQL, and
write a SQL query that reads from Kafka and writes to MySQL:

INSERT INTO mysqlTable
SELECT status, COUNT(order_no) AS num
FROM (SELECT order_no, LAST_VALUE(status) AS status FROM kafkaTable GROUP BY order_no)
GROUP BY status

I think this is a retract stream. But where can I find the Java source code for
the MySQL retract table sink?

Thanks,
Lei
wangl...@geekplus.com.cn
Re: Does the Flink JDBC Driver support creating streaming tables?
See this document:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector

The following syntax should not be supported:
"'format.type' = 'csv',\n" +
"'format.field-delimiter' = '|'\n"

Below is code that works for me; the data in the Kafka topic must look like:
{"order_no":"abcdefg","status":90}

tEnv.sqlUpdate("CREATE TABLE pick_order (\n" +
    "order_no VARCHAR,\n" +
    "status INT\n" +
    ") WITH (\n" +
    "'connector.type' = 'kafka',\n" +
    "'connector.version' = 'universal',\n" +
    "'connector.topic' = 'wanglei_test',\n" +
    "'connector.startup-mode' = 'latest-offset',\n" +
    "'connector.properties.0.key' = 'zookeeper.connect',\n" +
    "'connector.properties.0.value' = 'xxx:2181',\n" +
    "'connector.properties.1.key' = 'bootstrap.servers',\n" +
    "'connector.properties.1.value' = 'xxx:9092',\n" +
    "'update-mode' = 'append',\n" +
    "'format.type' = 'json',\n" +
    "'format.derive-schema' = 'true'\n" +
    ")");

Wang Lei
wangl...@geekplus.com.cn

From: 赵峰
Sent: 2020-03-24 21:28
To: user-zh
Subject: Does the Flink JDBC Driver support creating streaming tables?

Hi,

Creating a Kafka table through the Flink JDBC Driver fails. Is my DDL code
incorrect? The code is as follows:

Connection connection = DriverManager.getConnection("jdbc:flink://localhost:8083?planner=blink");
Statement statement = connection.createStatement();
statement.executeUpdate(
    "CREATE TABLE table_kafka (\n" +
    "user_id BIGINT,\n" +
    "item_id BIGINT,\n" +
    "category_id BIGINT,\n" +
    "behavior STRING,\n" +
    "ts TIMESTAMP(3),\n" +
    "proctime as PROCTIME(),\n" +
    "WATERMARK FOR ts as ts - INTERVAL '5' SECOND\n" +
    ") WITH (\n" +
    "'connector.type' = 'kafka', \n" +
    "'connector.version' = 'universal', \n" +
    "'connector.topic' = 'flink_im02', \n" +
    "'connector.properties.group.id' = 'flink_im02_new',\n" +
    "'connector.startup-mode' = 'earliest-offset', \n" +
    "'connector.properties.zookeeper.connect' = '*.*.*.*:2181', \n" +
    "'connector.properties.bootstrap.servers' = '*.*.*.*:9092', \n" +
    "'format.type' = 'csv',\n" +
    "'format.field-delimiter' = '|'\n" +
    ")");
ResultSet rs1 = statement.executeQuery("SELECT * FROM table_kafka");
while (rs1.next()) {
    System.out.println(rs1.getInt(1) + ", " + rs1.getInt(2));
}
statement.close();
connection.close();

The error:

Reason: Required context properties mismatch.
The matching candidates:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
Mismatched properties:
'connector.type' expects 'filesystem', but is 'kafka'

赵峰
Streaming Kafka data sink to Hive
We have many app logs on our app server and want to parse the logs into a
structured table format and then sink them to Hive.

It seems a good fit for batch mode: the app logs are compressed hourly, so
partitioning is convenient. But we want to use streaming mode: tail the app logs
into Kafka, then use Flink to read the Kafka topic and sink it to Hive. I have
several questions:

1. Is there any flink-hive connector I can use to write to Hive in a streaming way?
2. Since HDFS is not friendly to frequent appends and Hive's data is stored on
HDFS, is it OK if the throughput is high?

Thanks,
Lei
wangl...@geekplus.com.cn
Re: Re: flink sql-client read hive orc table no result
Seems the root cause is "transactional"='true'. After removing this, the table can
be queried from the Flink sql-client, even if I add "clustered by (robot_id) into
3 buckets" again.

Thanks,
Lei
wangl...@geekplus.com.cn

From: Kurt Young
Date: 2020-03-18 18:04
To: wangl...@geekplus.com.cn
CC: lirui; user
Subject: Re: Re: flink sql-client read hive orc table no result

Also try to remove "transactional"='true'?
Best,
Kurt

On Wed, Mar 18, 2020 at 5:54 PM wangl...@geekplus.com.cn wrote:
Tried again. Even if I remove the "clustered by (robot_id) into 3 buckets"
statement, there is no result from the Flink sql-client.

Thanks,
Lei
wangl...@geekplus.com.cn

From: Kurt Young
Date: 2020-03-18 17:41
To: wangl...@geekplus.com.cn; lirui
CC: user
Subject: Re: flink sql-client read hive orc table no result

My guess is we haven't supported Hive bucket tables yet. cc Rui Li for
confirmation.
Best,
Kurt

On Wed, Mar 18, 2020 at 5:19 PM wangl...@geekplus.com.cn wrote:
The Hive table is stored in ORC format:

CREATE TABLE `robot_tr`(`robot_id` int, `robot_time` bigint, `linear_velocity` double, `track_side_error` int)
partitioned by (warehouseid STRING)
clustered by (robot_id) into 3 buckets
stored as orc tblproperties("transactional"='true');

Under the Hive client, after inserting one record, a select prints the result to
the console. But under the Flink sql-client, select * from robot_tr returns no
result. Any insight on this?

Thanks,
Lei
wangl...@geekplus.com.cn
Re: Re: flink sql-client read hive orc table no result
Tried again. Even if I remove the "clustered by (robot_id) into 3 buckets"
statement, there is no result from the Flink sql-client.

Thanks,
Lei
wangl...@geekplus.com.cn

From: Kurt Young
Date: 2020-03-18 17:41
To: wangl...@geekplus.com.cn; lirui
CC: user
Subject: Re: flink sql-client read hive orc table no result

My guess is we haven't supported Hive bucket tables yet. cc Rui Li for
confirmation.
Best,
Kurt

On Wed, Mar 18, 2020 at 5:19 PM wangl...@geekplus.com.cn wrote:
The Hive table is stored in ORC format:

CREATE TABLE `robot_tr`(`robot_id` int, `robot_time` bigint, `linear_velocity` double, `track_side_error` int)
partitioned by (warehouseid STRING)
clustered by (robot_id) into 3 buckets
stored as orc tblproperties("transactional"='true');

Under the Hive client, after inserting one record, a select prints the result to
the console. But under the Flink sql-client, select * from robot_tr returns no
result. Any insight on this?

Thanks,
Lei
wangl...@geekplus.com.cn
flink sql-client read hive orc table no result
The Hive table is stored in ORC format:

CREATE TABLE `robot_tr`(`robot_id` int, `robot_time` bigint, `linear_velocity` double, `track_side_error` int)
partitioned by (warehouseid STRING)
clustered by (robot_id) into 3 buckets
stored as orc tblproperties("transactional"='true');

Under the Hive client, after inserting one record, a select prints the result to
the console. But under the Flink sql-client, select * from robot_tr returns no
result. Any insight on this?

Thanks,
Lei
wangl...@geekplus.com.cn
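As the replies above conclude, the blocker was the "transactional"='true' property rather than the bucketing. A hedged sketch of the adjusted DDL that the thread found queryable from the Flink sql-client:

```sql
-- Sketch: the same table without the transactional property, which the
-- thread identified as what blocked reads from the Flink sql-client.
CREATE TABLE `robot_tr` (
  `robot_id` int,
  `robot_time` bigint,
  `linear_velocity` double,
  `track_side_error` int)
PARTITIONED BY (warehouseid STRING)
CLUSTERED BY (robot_id) INTO 3 BUCKETS
STORED AS ORC;
```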
Re: Re: dimension table join not working under sql-client flink-1.10.0
Thanks, works now. Seems the open-source version is different from Alibaba Cloud:
https://www.alibabacloud.com/help/doc-detail/62506.htm

wangl...@geekplus.com.cn

From: Zhenghua Gao
Date: 2020-03-13 12:12
To: wangl...@geekplus.com.cn
CC: user
Subject: Re: dimension table join not working under sql-client flink-1.10.0

We don't support 'PROCTIME()' in a temporal table join. Please use the left
table's proctime field. [1]
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/joins.html#usage-1
Best Regards,
Zhenghua Gao

On Fri, Mar 13, 2020 at 11:57 AM wangl...@geekplus.com.cn wrote:
Kafka source table:

CREATE TABLE out_order (
  out_order_code VARCHAR,
  intput_date BIGINT,
  owner_code VARCHAR
) WITH (
  'connector.type' = 'kafka',

MySQL dimension table:

CREATE TABLE dim_owner (
  owner_code VARCHAR,
  owner_name VARCHAR
) WITH (
  'connector.type' = 'jdbc',

When I submit the SQL:

SELECT o.out_order_code, o.input_date, o.owner_code, d.owner_name
FROM out_order as o
JOIN dim_owner FOR SYSTEM_TIME AS OF PROCTIME() as d
ON o.owner_code = d.owner_code;

there's an error:

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime field, doesn't support 'PROCTIME()'

Thanks,
Lei
wangl...@geekplus.com.cn
dimension table join not working under sql-client flink-1.10.0
Kafka source table:

CREATE TABLE out_order (
  out_order_code VARCHAR,
  intput_date BIGINT,
  owner_code VARCHAR
) WITH (
  'connector.type' = 'kafka',

MySQL dimension table:

CREATE TABLE dim_owner (
  owner_code VARCHAR,
  owner_name VARCHAR
) WITH (
  'connector.type' = 'jdbc',

When I submit the SQL:

SELECT o.out_order_code, o.input_date, o.owner_code, d.owner_name
FROM out_order as o
JOIN dim_owner FOR SYSTEM_TIME AS OF PROCTIME() as d
ON o.owner_code = d.owner_code;

there's an error:

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime field, doesn't support 'PROCTIME()'

Thanks,
Lei
wangl...@geekplus.com.cn
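The error message itself points at the fix: declare a processing-time attribute on the left (Kafka) table and reference that column in the join, instead of calling PROCTIME() inline. A hedged sketch follows; the proc_time column name is illustrative and the connector properties are elided as in the original DDL:

```sql
CREATE TABLE out_order (
  out_order_code VARCHAR,
  input_date BIGINT,
  owner_code VARCHAR,
  proc_time AS PROCTIME()   -- processing-time attribute declared on the left table
) WITH (
  'connector.type' = 'kafka'
  -- remaining connector properties as in the original DDL
);

SELECT o.out_order_code, o.input_date, o.owner_code, d.owner_name
FROM out_order AS o
JOIN dim_owner FOR SYSTEM_TIME AS OF o.proc_time AS d
ON o.owner_code = d.owner_code;
```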
Re: Re: Where is the history of flinkSQL joined tables stored, and for how long?
Thanks, it works.

wangl...@geekplus.com.cn

Sender: sunfulin
Send Time: 2020-03-12 14:19
Receiver: user-zh; wanglei2
cc: jinhai.me
Subject: Re: Re: Re: Where is the history of flinkSQL joined tables stored, and for how long?

Use it like this:
StreamTableEnvironment.getConfig().setIdleStateRetentionTime(min, max);

On 2020-03-12 14:11:31, "wangl...@geekplus.com.cn" wrote:
>
> Is this document up to date? I wrote these three lines directly in IDEA:
> StreamQueryConfig is deprecated, and tableEnv has no queryConfig() method.
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
> StreamQueryConfig qConfig = tableEnv.queryConfig();
>
> wangl...@geekplus.com.cn
>
> Sender: jinhai wang
> Send Time: 2020-03-12 13:44
> Receiver: user-zh@flink.apache.org
> Subject: Re: Where is the history of flinkSQL joined tables stored, and for how long?
> It should be the time configured by the withIdleStateRetentionTime parameter. See:
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html#idle-state-retention-time
>
> On 2020/3/12 12:37, "wangl...@geekplus.com.cn" wrote:
>
> Two tables created from Kafka:
>
> tableA: key valueA
> tableB: key valueB
>
> I submitted a job with Flink SQL:
> select tableA.key, tableA.valueA, tableB.valueB from tableA join tableB on tableA.key = tableB.key;
> Where is the historical data of these two tables stored in Flink, and for how long?
>
> For example, if key1 appears in tableA first and key1 only appears in tableB a
> long time (a month) later, can they still be joined?
>
> Thanks,
> Wang Lei
>
> wangl...@geekplus.com.cn
Re: Re: Where is the history of flinkSQL joined tables stored, and for how long?
Is this document up to date? I wrote these three lines directly in IDEA:
StreamQueryConfig is deprecated, and tableEnv has no queryConfig() method.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
StreamQueryConfig qConfig = tableEnv.queryConfig();

wangl...@geekplus.com.cn

Sender: jinhai wang
Send Time: 2020-03-12 13:44
Receiver: user-zh@flink.apache.org
Subject: Re: Where is the history of flinkSQL joined tables stored, and for how long?

It should be the time configured by the withIdleStateRetentionTime parameter. See:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html#idle-state-retention-time

On 2020/3/12 12:37, "wangl...@geekplus.com.cn" wrote:

Two tables created from Kafka:

tableA: key valueA
tableB: key valueB

I submitted a job with Flink SQL:
select tableA.key, tableA.valueA, tableB.valueB from tableA join tableB on tableA.key = tableB.key;
Where is the historical data of these two tables stored in Flink, and for how long?

For example, if key1 appears in tableA first and key1 only appears in tableB a
long time (a month) later, can they still be joined?

Thanks,
Wang Lei

wangl...@geekplus.com.cn
Where is the history of flinkSQL joined tables stored, and for how long?
Two tables created from Kafka:

tableA: key valueA
tableB: key valueB

I submit a job with Flink SQL:
select tableA.key, tableA.valueA, tableB.valueB from tableA join tableB on tableA.key = tableB.key;
Where is the historical data of these two tables stored in Flink, and for how long?

For example, if key1 appears in tableA first and key1 only appears in tableB a
long time (a month) later, can they still be joined?

Thanks,
Wang Lei

wangl...@geekplus.com.cn
Re: Re: Can a flink sql join store state and recover from state?
I tried it, and it works.

Thanks
wangl...@geekplus.com.cn

Sender: Kurt Young
Send Time: 2020-03-11 19:59
Receiver: wangl...@geekplus.com.cn
cc: user-zh
Subject: Re: Re: Can a flink sql join store state and recover from state?

Then it probably works; you can try it and see.
Best,
Kurt

On Wed, Mar 11, 2020 at 6:57 PM wangl...@geekplus.com.cn wrote:
Hi Kurt,
If I don't use the sql-client but instead write the table registration and SQL
join operations in Java code and package it as a jar, can it recover from state?
The code has no state declarations or TTL definitions at all. Will the table
data that already existed before the job was cancelled with -s be stored in
state, and can it be accessed and used directly when the job is submitted again?
Thanks,
Wang Lei
wangl...@geekplus.com.cn

Sender: Kurt Young
Send Time: 2020-03-11 12:54
Receiver: wangl...@geekplus.com.cn
cc: user-zh
Subject: Re: Re: Can a flink sql join store state and recover from state?

The sql client does not support this feature yet.
Best,
Kurt

On Wed, Mar 11, 2020 at 11:35 AM wangl...@geekplus.com.cn wrote:
Hi Kurt,
Indeed, flink cancel -s saves the state. But I submitted the job by writing SQL
directly in flink-sql-client. When submitting it again, how can I specify the
state directory so the job recovers from that state?
Thanks,
Wang Lei

Sender: Kurt Young
Send Time: 2020-03-11 10:38
Receiver: user-zh
Subject: Re: Can a flink sql join store state and recover from state?

In theory, once a Flink SQL job is compiled into a JobGraph and submitted to the
cluster, it is essentially no different from a DataStream job. It should also
support flink cancel -s; try it first and see if you run into any problems.
Best,
Kurt

On Wed, Mar 11, 2020 at 10:24 AM wangl...@geekplus.com.cn <wangl...@geekplus.com.cn> wrote:
> There are two tables:
> tableA: key valueA
> tableB: key valueB
>
> I previously stored tableA in Flink state; when a tableB message arrived I
> queried that state to get valueA.
> Writing Flink SQL directly can also implement this, but there is a time gap
> between the two tables, and after the job is stopped and resubmitted part of
> the join results are lost.
> Does flinkSQL have something like flink cancel -s to save state?
>
> Thanks,
> Wang Lei
Re: Re: How to set stateBackEnd in flink sql program?
Hi Jingsong,

So I can write the code as follows?

EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
TableEnvironment tEnv = TableEnvironment.create(settings);
tEnv.getConfig().getConfiguration().setString("state.backend", "rocksdb");
tEnv.sqlUpdate(..)

Thanks,
Lei
wangl...@geekplus.com.cn

From: Jingsong Li
Date: 2020-03-12 11:32
To: wangl...@geekplus.com.cn
CC: user
Subject: Re: How to set stateBackEnd in flink sql program?

Hi wanglei,

If you are using Flink 1.10, you can set "state.backend=rocksdb" on
"TableConfig.getConfiguration". You can find the related config options here [1].
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html

Jingsong Lee

On Thu, Mar 12, 2020 at 11:15 AM wangl...@geekplus.com.cn wrote:
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
TableEnvironment tEnv = TableEnvironment.create(settings);
tEnv.sqlUpdate()...

Is there a way I can set the state backend like in a normal streaming program,
as follows?

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new RocksDBStateBackend(args[0], true));

wangl...@geekplus.com.cn

--
Best, Jingsong Lee
How to set stateBackEnd in flink sql program?
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
TableEnvironment tEnv = TableEnvironment.create(settings);
tEnv.sqlUpdate()...

Is there a way I can set the state backend like in a normal streaming program,
as follows?

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new RocksDBStateBackend(args[0], true));

wangl...@geekplus.com.cn
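Besides a per-job setting in code, the state backend can also be configured cluster-wide. A sketch of the relevant flink-conf.yaml entries, assuming Flink 1.10; the checkpoint path is illustrative:

```yaml
# flink-conf.yaml -- applies to every job submitted to the cluster
state.backend: rocksdb
state.checkpoints.dir: hdfs:///flink/checkpoints   # illustrative path
```

A per-job setting in code would override the cluster default, so the yaml route is mainly useful when all jobs should share the same backend.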
Re: Re: How should a TIMESTAMP be written in JSON so that Flink SQL recognizes it?
Thanks Jark. No words can express my embarrassment.

wangl...@geekplus.com.cn

Sender: Jark Wu
Send Time: 2020-03-11 18:32
Receiver: wangl...@geekplus.com.cn
cc: user; user-zh
Subject: Re: How should a TIMESTAMP be written in JSON so that Flink SQL recognizes it?

Hi Lei,

The "2020-03-11T13:00:00.123Z" format is correct, but you defined the wrong field
name in the DDL. It should be "input_date", not "intput_date".

Best,
Jark

On Wed, 11 Mar 2020 at 17:52, wangl...@geekplus.com.cn wrote:
Sorry, I sent a Chinese-written email to user@. Let me translate it into English.

I create a table using sql-client from a Kafka topic:

CREATE TABLE order_status (
  out_order_code VARCHAR,
  intput_date TIMESTAMP(3),
  owner_code VARCHAR,
  status INT
) WITH (
  'connector.type' = 'kafka',
  .
  'format.type' = 'json',
  'format.derive-schema' = 'true'
)

Then I send a message to the topic:
{"out_order_code":"MAMICK2020031048","input_date":"2020-03-11T13:00:00.123Z","owner_code":"WDTLEN04","status":90}

But the input_date is not recognized on the sql-client and is null, even though I
tried
1583828700240
"2020-03-11 13:00:00"
"2020-03-11 13:00:00.000"

How should the timestamp(3) look in the JSON message?

Thanks,
Lei
wangl...@geekplus.com.cn

From: wangl...@geekplus.com.cn
Sent: 2020-03-11 17:41
To: user
Subject: How should a TIMESTAMP be written in JSON so that Flink SQL recognizes it?

I created a Kafka table with sql-client:

CREATE TABLE order_status (
  out_order_code VARCHAR,
  intput_date TIMESTAMP(3),
  owner_code VARCHAR,
  status INT
) WITH (
  'connector.type' = 'kafka',
  .
  'format.type' = 'json',
  'format.derive-schema' = 'true'
)

Then I sent a message to this topic:
{"out_order_code":"MAMICK2020031048","input_date":"2020-03-11T13:00:00.123Z","owner_code":"WDTLEN04","status":90}

input_date is always NULL on the sql-client side. I also changed the sent
input_date to 1583828700240, "2020-03-11 13:00:00" and "2020-03-11 13:00:00.000";
none of them worked. What should a TIMESTAMP(3) look like in the JSON?

Thanks,
Wang Lei
wangl...@geekplus.com.cn
Re: Re: Can a flink sql join store state and recover from state?
Hi Kurt,

If I don't use the sql-client but instead write the table registration and SQL
join operations in Java code and package it as a jar, can it recover from state?
The code has no state declarations or TTL definitions at all. Will the table
data that already existed before the job was cancelled with -s be stored in
state, and can it be accessed and used directly when the job is submitted again?

Thanks,
Wang Lei
wangl...@geekplus.com.cn

Sender: Kurt Young
Send Time: 2020-03-11 12:54
Receiver: wangl...@geekplus.com.cn
cc: user-zh
Subject: Re: Re: Can a flink sql join store state and recover from state?

The sql client does not support this feature yet.
Best,
Kurt

On Wed, Mar 11, 2020 at 11:35 AM wangl...@geekplus.com.cn wrote:
Hi Kurt,
Indeed, flink cancel -s saves the state. But I submitted the job by writing SQL
directly in flink-sql-client. When submitting it again, how can I specify the
state directory so the job recovers from that state?
Thanks,
Wang Lei

Sender: Kurt Young
Send Time: 2020-03-11 10:38
Receiver: user-zh
Subject: Re: Can a flink sql join store state and recover from state?

In theory, once a Flink SQL job is compiled into a JobGraph and submitted to the
cluster, it is essentially no different from a DataStream job. It should also
support flink cancel -s; try it first and see if you run into any problems.
Best,
Kurt

On Wed, Mar 11, 2020 at 10:24 AM wangl...@geekplus.com.cn <wangl...@geekplus.com.cn> wrote:
> There are two tables:
> tableA: key valueA
> tableB: key valueB
>
> I previously stored tableA in Flink state; when a tableB message arrived I
> queried that state to get valueA.
> Writing Flink SQL directly can also implement this, but there is a time gap
> between the two tables, and after the job is stopped and resubmitted part of
> the join results are lost.
> Does flinkSQL have something like flink cancel -s to save state?
>
> Thanks,
> Wang Lei
Re: How should a TIMESTAMP be written in JSON so that Flink SQL recognizes it?
Sorry, I sent a Chinese-written email to user@. Let me translate it into English.

I create a table using sql-client from a Kafka topic:

CREATE TABLE order_status (
  out_order_code VARCHAR,
  intput_date TIMESTAMP(3),
  owner_code VARCHAR,
  status INT
) WITH (
  'connector.type' = 'kafka',
  .
  'format.type' = 'json',
  'format.derive-schema' = 'true'
)

Then I send a message to the topic:
{"out_order_code":"MAMICK2020031048","input_date":"2020-03-11T13:00:00.123Z","owner_code":"WDTLEN04","status":90}

But the input_date is not recognized on the sql-client and is null, even though I
tried
1583828700240
"2020-03-11 13:00:00"
"2020-03-11 13:00:00.000"

How should the timestamp(3) look in the JSON message?

Thanks,
Lei
wangl...@geekplus.com.cn

From: wangl...@geekplus.com.cn
Sent: 2020-03-11 17:41
To: user
Subject: How should a TIMESTAMP be written in JSON so that Flink SQL recognizes it?

I created a Kafka table with sql-client:

CREATE TABLE order_status (
  out_order_code VARCHAR,
  intput_date TIMESTAMP(3),
  owner_code VARCHAR,
  status INT
) WITH (
  'connector.type' = 'kafka',
  .
  'format.type' = 'json',
  'format.derive-schema' = 'true'
)

Then I sent a message to this topic:
{"out_order_code":"MAMICK2020031048","input_date":"2020-03-11T13:00:00.123Z","owner_code":"WDTLEN04","status":90}

input_date is always NULL on the sql-client side. I also changed the sent
input_date to 1583828700240, "2020-03-11 13:00:00" and "2020-03-11 13:00:00.000";
none of them worked. What should a TIMESTAMP(3) look like in the JSON?

Thanks,
Wang Lei
wangl...@geekplus.com.cn
How should a TIMESTAMP value be written in JSON so that Flink SQL recognizes it?
I created a Kafka table with sql-client:

CREATE TABLE order_status (
  out_order_code VARCHAR,
  intput_date TIMESTAMP(3),
  owner_code VARCHAR,
  status INT
) WITH (
  'connector.type' = 'kafka',
  'format.type' = 'json',
  'format.derive-schema' = 'true'
)

Then I sent a message to the Kafka topic:

{"out_order_code":"MAMICK2020031048","input_date":"2020-03-11T13:00:00.123Z","owner_code":"WDTLEN04","status":90}

input_date is always NULL on the sql-client side. I also tried changing the input_date I send to 1583828700240, "2020-03-11 13:00:00", and "2020-03-11 13:00:00.000"; none of them work. What should this TIMESTAMP(3) look like in the JSON? Thanks, Lei wangl...@geekplus.com.cn
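One detail worth checking first (an observation from the messages above, not a confirmed diagnosis): the DDL declares the column as intput_date, while the JSON payload carries input_date, so the field can never match by name and will come out NULL regardless of the timestamp format. A hedged sketch of a corrected DDL (connector properties such as the topic are elided, as in the original message):

```sql
-- Sketch only: the column name now matches the JSON key "input_date".
CREATE TABLE order_status (
  out_order_code VARCHAR,
  input_date TIMESTAMP(3),  -- was spelled "intput_date" in the original DDL
  owner_code VARCHAR,
  status INT
) WITH (
  'connector.type' = 'kafka',
  'format.type' = 'json',
  'format.derive-schema' = 'true'
);
```

With the names aligned, an ISO-8601 value such as "2020-03-11T13:00:00.123Z" is the shape the legacy JSON format is generally documented to expect for TIMESTAMP(3), but that detail should be verified against the exact Flink version in use.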
Re: Re: Does flink-1.10 sql-client support a Kafka sink?
I am using flink-1.10, but I had put flink-json-1.9.1.jar and flink-sql-connector-kafka_2.11-1.9.1.jar into the lib directory. After changing them to flink-json-1.10.0.jar and flink-sql-connector-kafka_2.12-1.10.0.jar, it works. But I have no idea why the YAML way worked when I used flink-json-1.9.1.jar and flink-sql-connector-kafka_2.11-1.9.1.jar in the flink-1.10 environment. Thanks, Lei wangl...@geekplus.com.cn From: wangl...@geekplus.com.cn Date: 2020-03-11 14:51 To: Jark Wu CC: Arvid Heise; user Subject: Re: Re: Does flink-1.10 sql-client support a Kafka sink? Hi Jark, I have tried to use CREATE TABLE DDL. First ./bin/sql-client.sh embedded, then I created a table from a Kafka topic and it told me the table had been created. But when I query it with select * from tableName, there is an error: [ERROR] Could not execute SQL statement. Reason: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath. Perhaps I need to add some jar to the lib directory. But if I write the table configuration in the sql-client-defaults.yaml file, I can select the result correctly. Thanks, Lei From: Jark Wu Date: 2020-03-11 11:13 To: wangl...@geekplus.com.cn CC: Arvid Heise; user Subject: Re: Re: Does flink-1.10 sql-client support a Kafka sink? Hi Lei, CREATE TABLE DDL [1][2] is the recommended way to register a table since 1.9, and the YAML way might be deprecated in the future. By using DDL, a registered table can be used both as a source and as a sink. Best, Jark [1]: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html#create-table [2]: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector On Tue, 10 Mar 2020 at 21:52, wangl...@geekplus.com.cn wrote: Thanks, it works now. It seems it was because I had added the schema "ROW(out_order_code STRING, input_date BIGINT, owner_code STRING, status INT)" under the format label.
From: Arvid Heise Date: 2020-03-10 20:51 To: wangl...@geekplus.com.cn CC: user Subject: Re: Does flink-1.10 sql-client support a Kafka sink? Hi Lei, yes, Kafka as a sink is supported, albeit only for appends (no deletions/updates yet) [1]. An example is a bit hidden in the documentation [2]:

tables:
  - name: MyTableSink
    type: sink-table
    update-mode: append
    connector:
      property-version: 1
      type: kafka
      version: "0.11"
      topic: OutputTopic
      properties:
        zookeeper.connect: localhost:2181
        bootstrap.servers: localhost:9092
        group.id: testGroup
    format:
      property-version: 1
      type: json
      derive-schema: true
    schema:
      - name: rideId
        data-type: BIGINT
      - name: lon
        data-type: FLOAT
      - name: lat
        data-type: FLOAT
      - name: rideTime
        data-type: TIMESTAMP(3)

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector [2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sqlClient.html#detached-sql-queries On Tue, Mar 10, 2020 at 10:51 AM wangl...@geekplus.com.cn wrote: I have configured a source table successfully using the following configuration:

- name: out_order
  type: source
  update-mode: append
  schema:
    - name: out_order_code
      type: STRING
    - name: input_date
      type: BIGINT
    - name: owner_code
      type: STRING
  connector:
    property-version: 1
    type: kafka
    version: universal
    topic: out_order
    startup-mode: latest-offset
    properties:
      - key: zookeeper.connect
        value: 172.19.78.32:2181
      - key: bootstrap.servers
        value: 172.19.78.32:9092
      - key: group.id
  format:
    property-version: 1
    type: json
    schema: "ROW(out_order_code STRING, owner_code STRING, input_date BIGINT)"

How can I configure a sink table? I haven't found any useful docs for this. Thanks, Lei
Re: Re: Does flink-1.10 sql-client support a Kafka sink?
Hi Jark, I have tried to use CREATE TABLE DDL. First ./bin/sql-client.sh embedded, then I created a table from a Kafka topic and it told me the table had been created. But when I query it with select * from tableName, there is an error: [ERROR] Could not execute SQL statement. Reason: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath. Perhaps I need to add some jar to the lib directory. But if I write the table configuration in the sql-client-defaults.yaml file, I can select the result correctly. Thanks, Lei From: Jark Wu Date: 2020-03-11 11:13 To: wangl...@geekplus.com.cn CC: Arvid Heise; user Subject: Re: Re: Does flink-1.10 sql-client support a Kafka sink? Hi Lei, CREATE TABLE DDL [1][2] is the recommended way to register a table since 1.9, and the YAML way might be deprecated in the future. By using DDL, a registered table can be used both as a source and as a sink. Best, Jark [1]: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html#create-table [2]: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector On Tue, 10 Mar 2020 at 21:52, wangl...@geekplus.com.cn wrote: Thanks, it works now. It seems it was because I had added the schema "ROW(out_order_code STRING, input_date BIGINT, owner_code STRING, status INT)" under the format label. From: Arvid Heise Date: 2020-03-10 20:51 To: wangl...@geekplus.com.cn CC: user Subject: Re: Does flink-1.10 sql-client support a Kafka sink? Hi Lei, yes, Kafka as a sink is supported, albeit only for appends (no deletions/updates yet) [1].
An example is a bit hidden in the documentation [2]:

tables:
  - name: MyTableSink
    type: sink-table
    update-mode: append
    connector:
      property-version: 1
      type: kafka
      version: "0.11"
      topic: OutputTopic
      properties:
        zookeeper.connect: localhost:2181
        bootstrap.servers: localhost:9092
        group.id: testGroup
    format:
      property-version: 1
      type: json
      derive-schema: true
    schema:
      - name: rideId
        data-type: BIGINT
      - name: lon
        data-type: FLOAT
      - name: lat
        data-type: FLOAT
      - name: rideTime
        data-type: TIMESTAMP(3)

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector [2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sqlClient.html#detached-sql-queries On Tue, Mar 10, 2020 at 10:51 AM wangl...@geekplus.com.cn wrote: I have configured a source table successfully using the following configuration:

- name: out_order
  type: source
  update-mode: append
  schema:
    - name: out_order_code
      type: STRING
    - name: input_date
      type: BIGINT
    - name: owner_code
      type: STRING
  connector:
    property-version: 1
    type: kafka
    version: universal
    topic: out_order
    startup-mode: latest-offset
    properties:
      - key: zookeeper.connect
        value: 172.19.78.32:2181
      - key: bootstrap.servers
        value: 172.19.78.32:9092
      - key: group.id
  format:
    property-version: 1
    type: json
    schema: "ROW(out_order_code STRING, owner_code STRING, input_date BIGINT)"

How can I configure a sink table? I haven't found any useful docs for this. Thanks, Lei
Re: Re: Can a Flink SQL join keep its state and restore from it?
Hi Kurt, Indeed flink cancel -s can save the state. But I submit the job by writing SQL directly in flink-sql-client; when re-submitting, how can I specify the state directory so the job restores from that state? Thanks, Lei Sender: Kurt Young Send Time: 2020-03-11 10:38 Receiver: user-zh Subject: Re: Can a Flink SQL join keep its state and restore from it? In theory, once a Flink SQL job has been compiled into a JobGraph and submitted to the cluster, it is essentially no different from a DataStream job. flink cancel -s should also work; try it first and follow up if you run into any problems. Best, Kurt On Wed, Mar 11, 2020 at 10:24 AM wangl...@geekplus.com.cn < wangl...@geekplus.com.cn> wrote: > There are two tables: > tableA: key, valueA > tableB: key, valueB > > Previously I stored tableA in Flink state; when a tableB message arrived, I queried that state to get valueA. > Plain Flink SQL can also implement this, but the two tables have a time gap, so stopping the job and re-submitting loses part of the join results. > Does Flink SQL have something like flink cancel -s to save state? > > Thanks, > Lei >
Can a Flink SQL join keep its state and restore from it?
There are two tables: tableA (key, valueA) and tableB (key, valueB). Previously I stored tableA in Flink state; when a tableB message arrived, I queried that state to get valueA. Plain Flink SQL can also implement this, but the two tables have a time gap, so stopping the job and re-submitting loses part of the join results. Does Flink SQL have something like flink cancel -s to save state? Thanks, Lei
Re: Re: Does flink-1.10 sql-client support a Kafka sink?
Thanks, it works now. It seems it was because I had added the schema "ROW(out_order_code STRING, input_date BIGINT, owner_code STRING, status INT)" under the format label. From: Arvid Heise Date: 2020-03-10 20:51 To: wangl...@geekplus.com.cn CC: user Subject: Re: Does flink-1.10 sql-client support a Kafka sink? Hi Lei, yes, Kafka as a sink is supported, albeit only for appends (no deletions/updates yet) [1]. An example is a bit hidden in the documentation [2]:

tables:
  - name: MyTableSink
    type: sink-table
    update-mode: append
    connector:
      property-version: 1
      type: kafka
      version: "0.11"
      topic: OutputTopic
      properties:
        zookeeper.connect: localhost:2181
        bootstrap.servers: localhost:9092
        group.id: testGroup
    format:
      property-version: 1
      type: json
      derive-schema: true
    schema:
      - name: rideId
        data-type: BIGINT
      - name: lon
        data-type: FLOAT
      - name: lat
        data-type: FLOAT
      - name: rideTime
        data-type: TIMESTAMP(3)

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector [2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sqlClient.html#detached-sql-queries On Tue, Mar 10, 2020 at 10:51 AM wangl...@geekplus.com.cn wrote: I have configured a source table successfully using the following configuration:

- name: out_order
  type: source
  update-mode: append
  schema:
    - name: out_order_code
      type: STRING
    - name: input_date
      type: BIGINT
    - name: owner_code
      type: STRING
  connector:
    property-version: 1
    type: kafka
    version: universal
    topic: out_order
    startup-mode: latest-offset
    properties:
      - key: zookeeper.connect
        value: 172.19.78.32:2181
      - key: bootstrap.servers
        value: 172.19.78.32:9092
      - key: group.id
  format:
    property-version: 1
    type: json
    schema: "ROW(out_order_code STRING, owner_code STRING, input_date BIGINT)"

How can I configure a sink table? I haven't found any useful docs for this. Thanks, Lei
Re: Re: Does the Kafka sink only support append mode?
Hi Jark, Thanks for the explanation. The GROUP BY statement produces a non-append stream. I have just tried a join statement and wanted to send the result to Kafka; it also fails with the error: AppendStreamTableSink requires that Table has only insert changes. Why is the join result not appendable? It confuses me. Thanks, Lei From: Jark Wu Date: 2020-03-09 19:25 To: wangl...@geekplus.com.cn CC: user Subject: Re: Does the Kafka sink only support append mode? Hi Lei, Yes. Currently, the Kafka sink only supports append mode. Other update modes (e.g. upsert mode / retract mode) are on the agenda. For now, you can customize a KafkaTableSink by implementing the UpsertStreamTableSink interface, where you will get Tuple2<Boolean, Row> records, and the Boolean represents an insert or delete operation. Then you can encode the insert/delete operation into the Kafka storage or just ignore the operations. Best, Jark On Mon, 9 Mar 2020 at 19:14, wangl...@geekplus.com.cn wrote: I wrote a simple program that reads from Kafka using SQL and sinks to Kafka. But only 'update-mode' = 'append' is supported for the sink table, and the query SQL must have no GROUP BY statement. Is only append mode supported for the Kafka sink? Thanks, Lei
Does flink-1.10 sql-client support a Kafka sink?
I have configured a source table successfully using the following configuration:

- name: out_order
  type: source
  update-mode: append
  schema:
    - name: out_order_code
      type: STRING
    - name: input_date
      type: BIGINT
    - name: owner_code
      type: STRING
  connector:
    property-version: 1
    type: kafka
    version: universal
    topic: out_order
    startup-mode: latest-offset
    properties:
      - key: zookeeper.connect
        value: 172.19.78.32:2181
      - key: bootstrap.servers
        value: 172.19.78.32:9092
      - key: group.id
  format:
    property-version: 1
    type: json
    schema: "ROW(out_order_code STRING, owner_code STRING, input_date BIGINT)"

How can I configure a sink table? I haven't found any useful docs for this. Thanks, Lei
How to deduplicate a dual-stream join whose join keys repeat, before sending the result to Kafka
There are two Kafka-backed source tables: order_info (order_no, info) and order_status (order_no, status). order_no repeats in both tables, so each incoming record from one table matches multiple records in the other. How can I take only the latest record for the join key from the other table and send the join result to Kafka? The Kafka sink only supports append mode, so grouping the tables first and then joining does not work. Thanks, Lei
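One pattern that may help here is Flink SQL's Top-N/deduplication query (supported by the blink planner since roughly 1.9; verify against the version in use): keep only the latest row per order_no on each side before joining. A hedged sketch, assuming both source tables expose a processing-time attribute named proctime (an assumption, not shown in the original table definitions):

```sql
-- Sketch only: deduplicate each side to the latest record per order_no,
-- then join the deduplicated views.
SELECT i.order_no, i.info, s.status
FROM (
  SELECT order_no, info
  FROM (
    SELECT *, ROW_NUMBER() OVER (PARTITION BY order_no ORDER BY proctime DESC) AS rn
    FROM order_info
  ) WHERE rn = 1
) i
JOIN (
  SELECT order_no, status
  FROM (
    SELECT *, ROW_NUMBER() OVER (PARTITION BY order_no ORDER BY proctime DESC) AS rn
    FROM order_status
  ) WHERE rn = 1
) s ON i.order_no = s.order_no;
```

Note that the result is still an updating stream, so an append-only Kafka sink on these versions will still reject it; the deduplication only ensures the join sees one row per key at a time.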
Does the Kafka sink only support append mode?
I wrote a simple program that reads from Kafka using SQL and sinks to Kafka. But only 'update-mode' = 'append' is supported for the sink table, and the query SQL must have no GROUP BY statement. Is only append mode supported for the Kafka sink? Thanks, Lei
How to use a custom JSON format when creating a table from a Kafka stream?
I want to register a table from a MySQL binlog like this:

tEnv.sqlUpdate("CREATE TABLE order(\n" +
    "order_id BIGINT,\n" +
    "order_no VARCHAR\n" +
    ") WITH (\n" +
    "'connector.type' = 'kafka',\n"
    ...
    + "'update-mode' = 'append',\n" +
    "'format.type' = 'json',\n" +
    "'format.derive-schema' = 'true'\n" +
    ")");

using the following log format:

{
  "type" : "update",
  "timestamp" : 1583373066000,
  "binlog_filename" : "mysql-bin.000453",
  "binlog_position" : 923020943,
  "database" : "wms",
  "table_name" : "t_pick_order",
  "table_id" : 131936,
  "columns" : [ {
    "id" : 1,
    "name" : "order_id",
    "column_type" : -5,
    "last_value" : 4606458,
    "value" : 4606458
  }, {
    "id" : 2,
    "name" : "order_no",
    "column_type" : 12,
    "last_value" : "EDBMFSJ1S2003050006628",
    "value" : "EDBMFSJ1S2003050006628"
  }]
}

Surely 'format.type' = 'json' will not parse the result as I expect. Is there any method to implement this? For example, using a custom format class. Thanks, Lei wangl...@geekplus.com.cn
Submitting a jar compiled against a higher Flink version to a lower-version Flink cluster?
The Flink cluster version is 1.8.2. The application source code needs some features only supported in 1.9.1, so it is compiled with the flink-1.9.1 dependency and built into a fat jar with all the Flink dependencies. What will happen if I submit the jar built against the higher version to the lower-version Flink cluster? Thanks, Lei
Re: Re: Flink state TTL expiration cleanup issue
Hi Yun, My cluster has already been upgraded to 1.8.2, and I tried both cleanupFullSnapshot and cleanupInRocksdbCompactFilter. But after stopping with cancel -s, the generated savepoint directory still does not shrink. The process was:
1. cancel -s; the savepoint directory is about 100 MB.
2. Change the code: replace the original setUpdateType call with cleanupFullSnapshot.
3. Restore the new code from the savepoint of step 1.
4. Run the new code for about one day, then cancel -s again; the new savepoint directory has grown.
Could it be that every flink run -s resets the update time of all entries restored from the savepoint to the current time? Thanks, Lei wangl...@geekplus.com.cn Sender: Yun Tang Send Time: 2019-11-01 01:38 Receiver: user-zh@flink.apache.org Subject: Re: Flink state TTL expiration cleanup issue Hi Lei, Judging from your configuration and your use of Flink 1.7, you have not enabled active cleanup of expired state [1]. I suggest configuring cleanupFullSnapshot on the StateTtlConfig, so that expired entries are not included when a full snapshot (i.e., a savepoint) is taken. If you do not enable active cleanup, state is only cleaned up when the original entry is read, which probably explains why your savepoints keep growing. I also suggest upgrading to Flink 1.8+, which has better support for the state TTL feature; see the (Chinese) article [2]. [1] https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/state/state.html#cleanup-of-expired-state [2] https://www.infoq.cn/article/L_66ga2G9oLWypvOLtIv Best, Yun Tang On 10/31/19, 10:25 AM, "wangl...@geekplus.com.cn" wrote: On flink-1.7.2 I use the following cleanup strategy:

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.days(3))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();
ValueStateDescriptor descriptor =
    new ValueStateDescriptor("OrderState", TypeInformation.of(OrderState.class));
descriptor.enableTimeToLive(ttlConfig);

The program is upgraded via savepoints: flink cancel -s saves to a savepoint directory, then the job is restored from it. The job has been running for far longer than 3 days, and the savepoint directory generated by each flink cancel -s keeps growing. Is the expiration cleanup not taking effect? Thanks, Lei wangl...@geekplus.com.cn
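For reference, the change Yun Tang suggests amounts to one extra call on the TTL builder. A configuration fragment only, not a runnable program (Flink 1.8 API; OrderState is the application class from the quoted code):

```java
// Sketch: enable cleanup of expired entries whenever a full snapshot
// (savepoint / full checkpoint) is taken, per the advice in this thread.
StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.days(3))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .cleanupFullSnapshot()   // exclude expired entries from full snapshots
    .build();

ValueStateDescriptor<OrderState> descriptor =
    new ValueStateDescriptor<>("OrderState", TypeInformation.of(OrderState.class));
descriptor.enableTimeToLive(ttlConfig);
```

Note that cleanupFullSnapshot() only trims what goes into savepoints and full snapshots; it does not shrink live RocksDB state, for which cleanupInRocksdbCompactFilter (1.8+) is the relevant option.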
Re: Re: IOException when using Prometheus Monitor
After downgrading the pushgateway to pushgateway-0.8.0.linux-amd64.tar.gz, the exception no longer appears. Thanks very much. wangl...@geekplus.com.cn From: wangl...@geekplus.com.cn Date: 2019-11-20 18:19 To: Chesnay Schepler; user Subject: Re: Re: IOException when using Prometheus Monitor Hi Chesnay, Although there is an exception, the metrics are actually pushed to the pushgateway successfully. The Prometheus versions I used: prometheus-2.8.1.linux-amd64.tar.gz, pushgateway-1.0.0.linux-amd64.tar.gz, flink-metrics-prometheus_2.12-1.8.2.jar. I just downloaded the tar.gz files to a CentOS node, ran tar -xzvf, and then started the processes. Thanks, Lei wangl...@geekplus.com.cn From: Chesnay Schepler Date: 2019-11-20 17:46 To: wangl...@geekplus.com.cn; user Subject: Re: IOException when using Prometheus Monitor From what I have found so far, this appears to be an incompatibility between the pushgateway and client versions. So you can either a) use an older version of the pushgateway, or b) bump the version of the prometheus reporter. Unfortunately I cannot tell you which version you would need. On 20/11/2019 10:24, wangl...@geekplus.com.cn wrote: Standalone session Flink, version 1.8.2. Prometheus and pushgateway are installed on the same host. After start-cluster.sh, the following error is logged every 10 seconds: 2019-11-20 17:23:12,849 WARN org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter - Failed to push metrics to PushGateway with jobName flink58c234884dcfb60a860d7a040aab6a4a.
java.io.IOException: Response code from http://10.44.51.23:9091/metrics/job/flink58c234884dcfb60a860d7a040aab6a4a was 200
    at org.apache.flink.shaded.io.prometheus.client.exporter.PushGateway.doRequest(PushGateway.java:297)
    at org.apache.flink.shaded.io.prometheus.client.exporter.PushGateway.push(PushGateway.java:105)
    at org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter.report(PrometheusPushGatewayReporter.java:76)
    at org.apache.flink.runtime.metrics.MetricRegistryImpl$ReporterTask.run(MetricRegistryImpl.java:430)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Any insight on this? Thanks, Lei wangl...@geekplus.com.cn
Re: Re: IOException when using Prometheus Monitor
Hi Chesnay, Although there is an exception, the metrics are actually pushed to the pushgateway successfully. The Prometheus versions I used: prometheus-2.8.1.linux-amd64.tar.gz, pushgateway-1.0.0.linux-amd64.tar.gz, flink-metrics-prometheus_2.12-1.8.2.jar. I just downloaded the tar.gz files to a CentOS node, ran tar -xzvf, and then started the processes. Thanks, Lei wangl...@geekplus.com.cn From: Chesnay Schepler Date: 2019-11-20 17:46 To: wangl...@geekplus.com.cn; user Subject: Re: IOException when using Prometheus Monitor From what I have found so far, this appears to be an incompatibility between the pushgateway and client versions. So you can either a) use an older version of the pushgateway, or b) bump the version of the prometheus reporter. Unfortunately I cannot tell you which version you would need. On 20/11/2019 10:24, wangl...@geekplus.com.cn wrote: Standalone session Flink, version 1.8.2. Prometheus and pushgateway are installed on the same host. After start-cluster.sh, the following error is logged every 10 seconds: 2019-11-20 17:23:12,849 WARN org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter - Failed to push metrics to PushGateway with jobName flink58c234884dcfb60a860d7a040aab6a4a.
java.io.IOException: Response code from http://10.44.51.23:9091/metrics/job/flink58c234884dcfb60a860d7a040aab6a4a was 200
    at org.apache.flink.shaded.io.prometheus.client.exporter.PushGateway.doRequest(PushGateway.java:297)
    at org.apache.flink.shaded.io.prometheus.client.exporter.PushGateway.push(PushGateway.java:105)
    at org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter.report(PrometheusPushGatewayReporter.java:76)
    at org.apache.flink.runtime.metrics.MetricRegistryImpl$ReporterTask.run(MetricRegistryImpl.java:430)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Any insight on this? Thanks, Lei wangl...@geekplus.com.cn
IOException when using Prometheus Monitor
Standalone session Flink, version 1.8.2. Prometheus and pushgateway are installed on the same host. After start-cluster.sh, the following error is logged every 10 seconds:

2019-11-20 17:23:12,849 WARN org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter - Failed to push metrics to PushGateway with jobName flink58c234884dcfb60a860d7a040aab6a4a.
java.io.IOException: Response code from http://10.44.51.23:9091/metrics/job/flink58c234884dcfb60a860d7a040aab6a4a was 200
    at org.apache.flink.shaded.io.prometheus.client.exporter.PushGateway.doRequest(PushGateway.java:297)
    at org.apache.flink.shaded.io.prometheus.client.exporter.PushGateway.push(PushGateway.java:105)
    at org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter.report(PrometheusPushGatewayReporter.java:76)
    at org.apache.flink.runtime.metrics.MetricRegistryImpl$ReporterTask.run(MetricRegistryImpl.java:430)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Any insight on this? Thanks, Lei wangl...@geekplus.com.cn
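For context, the PushGateway reporter in Flink 1.8 is configured through flink-conf.yaml roughly as follows. This is a sketch with placeholder host/port/jobName values; as Chesnay points out in the replies, the failure itself comes from a pushgateway/client version mismatch, not from this configuration:

```yaml
# Sketch of a Flink 1.8 PushGateway reporter configuration (values are placeholders).
metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
metrics.reporter.promgateway.host: 10.44.51.23
metrics.reporter.promgateway.port: 9091
metrics.reporter.promgateway.jobName: flink
metrics.reporter.promgateway.randomJobNameSuffix: true
metrics.reporter.promgateway.deleteOnShutdown: false
```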
Changing the state TTL
A program uses state with its TTL set to 3 days. After running for a while, I stop it with cancel -s, change the expiration time to 7 days, and resume from the state files. For all the keys in the snapshot produced when stopping with cancel -s, does the TTL become 7 days, or does it stay 3 days? Thanks, Lei wangl...@geekplus.com.cn
Does changing the number of YARN containers affect restoring from state?
The job consumes and processes data from RocketMQ. The maximum parallelism in the code is 8; submitting with -ys 4 automatically allocates 2 containers as TaskManagers. After running for a while, the job is stopped with a savepoint. When restoring from the savepoint with -ys 2, 4 containers are allocated as TaskManagers, but after submission the program no longer consumes data from RocketMQ; the consume TPS stays at 0. What could cause this? Thanks, Lei wangl...@geekplus.com.cn
Re: Re: How to periodically write state to external storage
Hi Congxian, Writing it out as a sink still has to land somewhere before it can be queried. In our case it is written to MySQL. wangl...@geekplus.com.cn Sender: Congxian Qiu Send Time: 2019-11-01 10:10 Receiver: user-zh Subject: Re: How to periodically write state to external storage Curious why you want to write state to external storage periodically. Does an external system need to use the state? If so, why not emit the state as a sink? Best, Congxian Jun Zhang <825875...@qq.com> wrote on Thu, Oct 31, 2019 at 10:36: > Could you register a timer? > > See whether this article helps you: > > https://mp.weixin.qq.com/s/VUEvvoHoupZMpxWQsEeHEA > On 2019-10-31 10:16, wangl...@geekplus.com.cn wrote: > > The job is message-driven with a very high QPS; every incoming message updates the state value, and if every message triggered a write to external storage, the downstream could not keep up. > Is there a way to periodically read the state and write it to external storage? > I am currently on Flink 1.7.2. > > wangl...@geekplus.com.cn
Re: Re: Flink state TTL expiration cleanup issue
Thanks, I understand now. Lei wangl...@geekplus.com.cn Sender: Yun Tang Send Time: 2019-11-01 01:38 Receiver: user-zh@flink.apache.org Subject: Re: Flink state TTL expiration cleanup issue Hi Lei, Judging from your configuration and your use of Flink 1.7, you have not enabled active cleanup of expired state [1]. I suggest configuring cleanupFullSnapshot on the StateTtlConfig, so that expired entries are not included when a full snapshot (i.e., a savepoint) is taken. If you do not enable active cleanup, state is only cleaned up when the original entry is read, which probably explains why your savepoints keep growing. I also suggest upgrading to Flink 1.8+, which has better support for the state TTL feature; see the (Chinese) article [2]. [1] https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/state/state.html#cleanup-of-expired-state [2] https://www.infoq.cn/article/L_66ga2G9oLWypvOLtIv Best, Yun Tang On 10/31/19, 10:25 AM, "wangl...@geekplus.com.cn" wrote: On flink-1.7.2 I use the following cleanup strategy:

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.days(3))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();
ValueStateDescriptor descriptor =
    new ValueStateDescriptor("OrderState", TypeInformation.of(OrderState.class));
descriptor.enableTimeToLive(ttlConfig);

The program is upgraded via savepoints: flink cancel -s saves to a savepoint directory, then the job is restored from it. The job has been running for far longer than 3 days, and the savepoint directory generated by each flink cancel -s keeps growing. Is the expiration cleanup not taking effect? Thanks, Lei wangl...@geekplus.com.cn
Flink state TTL expiration cleanup issue
On flink-1.7.2 I use the following cleanup strategy:

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.days(3))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();
ValueStateDescriptor descriptor =
    new ValueStateDescriptor("OrderState", TypeInformation.of(OrderState.class));
descriptor.enableTimeToLive(ttlConfig);

The program is upgraded via savepoints: flink cancel -s saves to a savepoint directory, then the job is restored from it. The job has been running for far longer than 3 days, and the savepoint directory generated by each flink cancel -s keeps growing. Is the expiration cleanup not taking effect? Thanks, Lei wangl...@geekplus.com.cn
How to periodically write state to external storage
The job is message-driven with a very high QPS; every incoming message updates the state value, and if every message triggered a write to external storage, the downstream could not keep up. Is there a way to periodically read the state and write it to external storage? I am currently on Flink 1.7.2. wangl...@geekplus.com.cn
Re: Re: No data written into the CsvTableSink directory
Sorry, it was my mistake. The data is actually written. I was testing on Windows; refreshing the directory always showed a file size of 0, and only after opening the file in an editor did the displayed size change. wangl...@geekplus.com.cn Sender: Alec Chen Send Time: 2019-08-09 10:17 Receiver: user-zh Subject: Re: Re: No data written into the CsvTableSink directory No data is written because nothing triggered execution; see the sample code from the doc ( https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/common.html ):

// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// create a TableSink
TableSink sink = new CsvTableSink("/path/to/file", fieldDelim = "|");

// register the TableSink with a specific schema
String[] fieldNames = {"a", "b", "c"};
TypeInformation[] fieldTypes = {Types.INT, Types.STRING, Types.LONG};
tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, sink);

// compute a result Table using Table API operators and/or SQL queries
Table result = ...
// emit the result Table to the registered TableSink
result.insertInto("CsvSinkTable");

// execute the program

Add tableEnv.execute(); wangl...@geekplus.com.cn wrote on Fri, Aug 9, 2019 at 09:42:
> I hooked up a RocketMQ stream as input.
>
> DataStream> ds = env.addSource(new RocketMQSource(
>     System.out.println(res);
>     return res;
>     }
> });
>
> tableEnv.registerDataStream("t_pick_task", ds, "pick_task_id, pick_list_no, sku_code");
>
> TableSink csvSink = new CsvTableSink("D:\\data\\flink", ",");
> String[] fieldNames = {"num"};
> TypeInformation[] fieldTypes = {Types.INT};
> tableEnv.registerTableSink("RubberOrders", fieldNames, fieldTypes, csvSink);
> tableEnv.sqlUpdate("INSERT INTO RubberOrders SELECT pick_task_id FROM t_pick_task");
>
> wangl...@geekplus.com.cn
>
> Sender: Alec Chen
> Send Time: 2019-08-08 21:01
> Receiver: user-zh
> Subject: Re: No data written into the CsvTableSink directory
> Please post the complete code.
>
> wangl...@geekplus.com.cn wrote on Thu, Aug 8, 2019 at 19:37:
> > I wrote the code following the example at
> > https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sql.html#specifying-a-query
> > but after running it, the directory specified for the CsvTableSink only contains empty files with no content. Why is that?
> >
> > wangl...@geekplus.com.cn
> >
Re: Re: No data written into the CsvTableSink directory
I hooked up a RocketMQ stream as input.

DataStream> ds = env.addSource(new RocketMQSource(
    System.out.println(res);
    return res;
    }
});

tableEnv.registerDataStream("t_pick_task", ds, "pick_task_id, pick_list_no, sku_code");
TableSink csvSink = new CsvTableSink("D:\\data\\flink", ",");
String[] fieldNames = {"num"};
TypeInformation[] fieldTypes = {Types.INT};
tableEnv.registerTableSink("RubberOrders", fieldNames, fieldTypes, csvSink);
tableEnv.sqlUpdate("INSERT INTO RubberOrders SELECT pick_task_id FROM t_pick_task");

wangl...@geekplus.com.cn Sender: Alec Chen Send Time: 2019-08-08 21:01 Receiver: user-zh Subject: Re: No data written into the CsvTableSink directory Please post the complete code. wangl...@geekplus.com.cn wrote on Thu, Aug 8, 2019 at 19:37: > I wrote the code following the example at > https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sql.html#specifying-a-query > but after running it, the directory specified for the CsvTableSink only contains empty files with no content. Why is that? > > wangl...@geekplus.com.cn >
No data written into the CsvTableSink directory
I wrote the code following the example at https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sql.html#specifying-a-query but after running it, the directory specified for the CsvTableSink only contains empty files with no content. Why is that? wangl...@geekplus.com.cn
Does RocksDBStateBackend need a separate RocksDB service?
In my code, I just set the state backend with an HDFS directory: env.setStateBackend(new RocksDBStateBackend("hdfs://user/test/job")); Is there an embedded RocksDB instance inside the Flink task? wangl...@geekplus.com.cn
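To the question itself: RocksDBStateBackend runs RocksDB embedded inside each TaskManager process, so no separate RocksDB service needs to be deployed; the HDFS path only tells Flink where to persist checkpoint data. The same backend can also be selected via flink-conf.yaml instead of code (a sketch; the checkpoint path is a placeholder):

```yaml
# Sketch: select the embedded RocksDB state backend cluster-wide.
state.backend: rocksdb
state.checkpoints.dir: hdfs:///user/test/checkpoints
```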
Re: Re: Unable to restore state value after job failed using RocksDBStateBackend
I start and cancel it just in my IntelliJ IDEA development environment: first I click the run button, then the red stop button, and then the run button again. Let me google the savepoint feature. Thanks, Lei Wang wangl...@geekplus.com.cn From: Stephan Ewen Date: 2019-06-25 20:36 To: user Subject: Re: Unable to restore state value after job failed using RocksDBStateBackend If you manually cancel and restart the job, state is only carried forward if you use a savepoint. Can you check if that is what you are doing? On Tue, Jun 25, 2019 at 2:21 PM Simon Su wrote: Hi wanglei, Can you post how you restart the job? Thanks, Simon On 06/25/2019 20:11, wangl...@geekplus.com.cn wrote:

public class StateProcessTest extends KeyedProcessFunction<String, Tuple2<Long, Long>, String> {

    private transient ValueState<Tuple2<Long, Long>> state;

    public void processElement(Tuple2<Long, Long> value, Context ctx, Collector<String> out) throws Exception {
        Tuple2<Long, Long> stateValue = state.value();
        if (stateValue == null) {
            log.info("## initialize");
            stateValue = new Tuple2<>(34L, 56L);
        }
        state.update(stateValue);
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor = new ValueStateDescriptor<>("avg",
                TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));
        state = getRuntimeContext().getState(descriptor);
    }
}

Every time I restarted the job, the stateValue was still null. wangl...@geekplus.com.cn
Unable to restore state value after job failed using RocksDBStateBackend
public class StateProcessTest extends KeyedProcessFunction<String, Tuple2<Long, Long>, String> {

    private transient ValueState<Tuple2<Long, Long>> state;

    public void processElement(Tuple2<Long, Long> value, Context ctx, Collector<String> out) throws Exception {
        Tuple2<Long, Long> stateValue = state.value();
        if (stateValue == null) {
            log.info("## initialize");
            stateValue = new Tuple2<>(34L, 56L);
        }
        state.update(stateValue);
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor = new ValueStateDescriptor<>("avg",
                TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));
        state = getRuntimeContext().getState(descriptor);
    }
}

Every time I restarted the job, the stateValue was still null. wangl...@geekplus.com.cn
How to trigger the window function even when there is no message input in the window?
windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10))).process(new MyProcessWindowFunction()); How can I trigger MyProcessWindowFunction even when there is no input during the window's time span? wangl...@geekplus.com.cn
Re: Re: How can I just implement a crontab function using Flink?
I tried, but MyProcessWindowFunction is still not triggered when there is no event in the window. Any insight on this?

source.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks() {
    @Override
    public Watermark getCurrentWatermark() {
        return new Watermark(System.currentTimeMillis() - 1);
    }

    @Override
    public long extractTimestamp(Map map, long l) {
        return System.currentTimeMillis();
    }
}).windowAll(TumblingEventTimeWindows.of(Time.seconds(10))).process(new MyProcessWindowFunction());

wangl...@geekplus.com.cn From: Puneet Kinra Date: 2019-05-24 17:02 To: wangl...@geekplus.com.cn CC: user Subject: Re: How can I just implement a crontab function using Flink? There is a concept of a periodic watermark; you can use that if you are working on event time. On Fri, May 24, 2019 at 1:51 PM wangl...@geekplus.com.cn wrote: I want to do something every minute. Using a tumbling window, the function will not be triggered if there is no message received during that minute, but I still need to execute the function. How can I implement it? wangl...@geekplus.com.cn -- Cheers, Puneet Kinra Mobile: +918800167808 | Skype: puneet.ki...@customercentria.com e-mail: puneet.ki...@customercentria.com
Re: Re: How can I add a config file to the classpath of the TaskManager nodes when submitting a Flink job?
Thanks. Let me have a try. wangl...@geekplus.com.cn From: Yang Wang Date: 2019-05-28 09:47 To: wangl...@geekplus.com.cn CC: user Subject: Re: How can I add a config file to the classpath of the TaskManager nodes when submitting a Flink job? Hi wanglei, You could use the Flink distributed cache to register some config files and then access them in your task.

1. Register a cached file:
StreamExecutionEnvironment.registerCachedFile(inputFile.toString(), "test_data", false);

2. Access the file in your task:
final Path testFile = getRuntimeContext().getDistributedCache().getFile("test_data").toPath();

wangl...@geekplus.com.cn wrote on Sun, May 26, 2019 at 00:06: When starting a single-node Java application, I can add a config file to it. How can I implement this when submitting a Flink job? The config file needs to be read on the TaskManager node and used to initialize some classes. wangl...@geekplus.com.cn