How to start a Flink standalone session on Windows?

2020-07-23 Thread wangl...@geekplus.com.cn

There are no start-cluster.bat and flink.bat files in the bin directory.

So how can I start Flink on Windows?

Thanks,
Lei 


wangl...@geekplus.com.cn 


How to start flink-1.11 on Windows

2020-07-23 Thread wangl...@geekplus.com.cn

I see that the flink-1.11 release package no longer contains the .bat files needed to start on Windows in its bin directory.
So how do I start it on Windows?

Thanks,
Wang Lei




wangl...@geekplus.com.cn 


flink-1.11 KafkaDynamicTableSource: how to send groupBy results to Kafka

2020-07-16 Thread wangl...@geekplus.com.cn

 INSERT INTO kafka_dws_artemis_out_order select warehouse_id, count(*) from 
kafka_ods_artemis_out_order group by warehouse_id;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: Table sink 
'myhive.wanglei.kafka_dws_artemis_out_order' doesn't support consuming update 
changes which is produced by node GroupAggregate(groupBy=[warehouse_id], 
select=[warehouse_id, COUNT(*) AS EXPR$1])

In Flink-1.10 this could be done by changing KafkaTableSinkBase to implement RetractStream.
 
I see that Flink-1.11 now uses KafkaDynamicSource and KafkaDynamicSink. What changes are needed so that the GroupBy
results can also be sent to Kafka?

Thanks,
Wang Lei


wangl...@geekplus.com.cn 



Re: Re: By what mechanism does FlinkSQL find the Java code to execute?

2020-07-16 Thread wangl...@geekplus.com.cn

Thanks, I understand now.



wangl...@geekplus.com.cn 

Sender: Harold.Miao
Send Time: 2020-07-16 19:33
Receiver: user-zh
Subject: Re: Re: By what mechanism does FlinkSQL find the Java code to execute?
My understanding: the classes are loaded through the SPI mechanism and then filtered by the given properties to pick out the required class, similar to the code below
 
private static <T extends TableFactory> T findSingleInternal(
      Class<T> factoryClass,
      Map<String, String> properties,
      Optional<ClassLoader> classLoader) {
 
   List<TableFactory> tableFactories = discoverFactories(classLoader);
   List<T> filtered = filter(tableFactories, factoryClass, properties);
 
   if (filtered.size() > 1) {
      throw new AmbiguousTableFactoryException(
         filtered,
         factoryClass,
         tableFactories,
         properties);
   } else {
      return filtered.get(0);
   }
}
 
private static List<TableFactory> discoverFactories(Optional<ClassLoader> classLoader) {
   try {
      List<TableFactory> result = new LinkedList<>();
      ClassLoader cl =
         classLoader.orElse(Thread.currentThread().getContextClassLoader());
      ServiceLoader
         .load(TableFactory.class, cl)
         .iterator()
         .forEachRemaining(result::add);
      return result;
   } catch (ServiceConfigurationError e) {
      LOG.error("Could not load service provider for table factories.", e);
      throw new TableException("Could not load service provider for table factories.", e);
   }
}
 
 
wangl...@geekplus.com.cn  wrote on Thu, Jul 16, 2020 at 7:04 PM:
 
>
> I found the SPI configuration in
> flink-formats/flink-json/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory:
>
> org.apache.flink.formats.json.JsonFileSystemFormatFactory
> org.apache.flink.formats.json.JsonFormatFactory
> org.apache.flink.formats.json.debezium.DebeziumJsonFormatFactory
> org.apache.flink.formats.json.canal.CanalJsonFormatFactory
>
> I still haven't figured out how specifying 'format'='debezium-json' gets mapped to DebeziumJsonFormatFactory.
> My understanding is that there must be somewhere that maps debezium-json to DebeziumJsonFormatFactory, but I grepped
> the code and could not find such a mapping configuration.
>
>
> Thanks,
> Wang Lei
>
>
>
> wangl...@geekplus.com.cn
>
>
> Sender: godfrey he
> Send Time: 2020-07-16 16:38
> Receiver: user-zh
> Subject: Re: By what mechanism does FlinkSQL find the Java code to execute?
> The corresponding format is found through Java's SPI mechanism; see [1]
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/#how-to-use-connectors
>
> Best,
> Godfrey
>
> wangl...@geekplus.com.cn  wrote on Thu, Jul 16, 2020
> at 4:02 PM:
>
> > For example:
> >
> > CREATE TABLE my_table (
> >   id BIGINT,
> >  first_name STRING,
> >  last_name STRING,
> >  email STRING
> > ) WITH (
> >  'connector'='kafka',
> >  'topic'='user_topic',
> >  'properties.bootstrap.servers'='localhost:9092',
> >  'scan.startup.mode'='earliest-offset',
> >  'format'='debezium-json'
> > );
> >
> > The code that ultimately parses debezium-json should be under
> > flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium.
> > But by what mechanism does FlinkSQL find the Java code to execute?
> >
> > Thanks,
> > Wang Lei
> >
> >
> > wangl...@geekplus.com.cn
> >
> >
>
 
 
-- 
 
Best Regards,
Harold Miao


Re: Re: By what mechanism does FlinkSQL find the Java code to execute?

2020-07-16 Thread wangl...@geekplus.com.cn

I found the SPI configuration in
flink-formats/flink-json/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory:

org.apache.flink.formats.json.JsonFileSystemFormatFactory
org.apache.flink.formats.json.JsonFormatFactory
org.apache.flink.formats.json.debezium.DebeziumJsonFormatFactory
org.apache.flink.formats.json.canal.CanalJsonFormatFactory

I still haven't figured out how specifying 'format'='debezium-json' gets mapped to DebeziumJsonFormatFactory.
My understanding is that there must be somewhere that maps debezium-json to DebeziumJsonFormatFactory, but I grepped
the code and could not find such a mapping configuration.


Thanks,
Wang Lei



wangl...@geekplus.com.cn 

 
Sender: godfrey he
Send Time: 2020-07-16 16:38
Receiver: user-zh
Subject: Re: By what mechanism does FlinkSQL find the Java code to execute?
The corresponding format is found through Java's SPI mechanism; see [1]
 
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/#how-to-use-connectors
 
Best,
Godfrey
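
To make the SPI lookup concrete, here is a simplified sketch (illustrative code, not the actual FactoryUtil implementation) of how 'format'='debezium-json' can be matched to DebeziumJsonFormatFactory in Flink 1.11: every factory listed in the META-INF/services file reports its own name through factoryIdentifier() -- DebeziumJsonFormatFactory returns "debezium-json" from that method -- so no separate name-to-class mapping file is needed.

import java.util.ServiceLoader;

import org.apache.flink.table.factories.Factory;

public class FormatFactoryLookup {

    // Find the factory whose self-reported identifier matches the 'format' option value.
    public static Factory find(String identifier, ClassLoader cl) {
        for (Factory factory : ServiceLoader.load(Factory.class, cl)) {
            if (factory.factoryIdentifier().equals(identifier)) {
                return factory;
            }
        }
        throw new IllegalArgumentException("No factory found for identifier: " + identifier);
    }
}

Calling FormatFactoryLookup.find("debezium-json", classLoader) would therefore end up at DebeziumJsonFormatFactory without any explicit mapping configuration.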
 
wangl...@geekplus.com.cn  wrote on Thu, Jul 16, 2020 at 4:02 PM:
 
> For example:
>
> CREATE TABLE my_table (
>   id BIGINT,
>  first_name STRING,
>  last_name STRING,
>  email STRING
> ) WITH (
>  'connector'='kafka',
>  'topic'='user_topic',
>  'properties.bootstrap.servers'='localhost:9092',
>  'scan.startup.mode'='earliest-offset',
>  'format'='debezium-json'
> );
>
> The code that ultimately parses debezium-json should be under
> flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium.
> But by what mechanism does FlinkSQL find the Java code to execute?
>
> Thanks,
> Wang Lei
>
>
> wangl...@geekplus.com.cn
>
>


Re: [sql-client] How to set checkpointing.interval when submitting SQL through the sql-client

2020-07-16 Thread wangl...@geekplus.com.cn

Add the configuration directly in the flink-conf.yaml file:
execution.checkpointing.interval: 6
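
For a job submitted as a jar (rather than through the sql-client) the same thing can be done in code; a minimal sketch, assuming the DataStream API:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EnableCheckpointingExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // equivalent to setting execution.checkpointing.interval in flink-conf.yaml
        env.enableCheckpointing(60_000L); // checkpoint every 60 seconds
        // ... define sources, transformations and sinks here ...
        env.execute("job-with-checkpointing");
    }
}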




wangl...@geekplus.com.cn 

 
Sender: Harold.Miao
Send Time: 2020-07-16 13:27
Receiver: user-zh
Subject: [sql-client] How to set checkpointing.interval when submitting SQL through the sql-client
hi flink users
 
How do I set checkpointing.interval when submitting SQL through the sql-client?
I looked at the execution section in sql-client-defaults.yaml and did not find this parameter. Any advice is appreciated.
Thanks
 
 
 
-- 
 
Best Regards,
Harold Miao


By what mechanism does FlinkSQL find the Java code to execute?

2020-07-16 Thread wangl...@geekplus.com.cn
For example:

CREATE TABLE my_table (
  id BIGINT,
 first_name STRING,
 last_name STRING,
 email STRING
) WITH (
 'connector'='kafka',
 'topic'='user_topic',
 'properties.bootstrap.servers'='localhost:9092',
 'scan.startup.mode'='earliest-offset',
 'format'='debezium-json'
);

The code that ultimately parses debezium-json should be under
flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium.
But by what mechanism does FlinkSQL find the Java code to execute?

Thanks,
Wang Lei


wangl...@geekplus.com.cn 



Re: Chinese characters garbled after FlinkSQL writes to MySQL

2020-07-15 Thread wangl...@geekplus.com.cn

It was a character-set problem on the server where MySQL_tableB lives.
Adding the configuration below fixed it.


[mysqld]
character-set-server=utf8

[client]
default-character-set=utf8

[mysql]
default-character-set=utf8




wangl...@geekplus.com.cn 

From: wangl...@geekplus.com.cn
Sent: 2020-07-15 16:34
To: user-zh
Subject: Chinese characters garbled after FlinkSQL writes to MySQL
 
KafkaTable: the Kafka messages
MySQL_tableA: a dimension table whose values are Chinese characters
MySQL_tableB: the result table of the join; it is not on the same server as MySQL_tableA.
 
A SELECT directly in the flink sql client displays the characters correctly, but after INSERT INTO MySQL_tableB SELECT ...,
the Chinese characters are garbled when I query MySQL_tableB.
Does anyone have suggestions?
 
Thanks,
Wang Lei
 
 
 
wangl...@geekplus.com.cn 


Re: Re: Cannot read in real time the data that is written to Hive in real time

2020-07-14 Thread wangl...@geekplus.com.cn

Thanks. The root cause was that the flink sql-client does not enable checkpointing by default.



wangl...@geekplus.com.cn 

 
Sender: Rui Li
Send Time: 2020-07-14 18:29
Receiver: user-zh
cc: Leonard Xu; 夏帅
Subject: Re: Re: Cannot read in real time the data that is written to Hive in real time
When streaming data into Hive, whether the table is partitioned or not, commits are triggered by checkpoints. With the SQL
client you can enable checkpointing by setting execution.checkpointing.interval in flink-conf.yaml.
 
On Tue, Jul 14, 2020 at 5:49 PM wangl...@geekplus.com.cn <
wangl...@geekplus.com.cn> wrote:
 
> Let me simplify the problem: I create the Hive table without any parameters
>
> CREATE TABLE hive_ods_wms_pick_order (
>   order_no STRING,
>   status INT,
>   dispatch_time TIMESTAMP
> )  STORED AS parquet
>
> INSERT INTO TABLE hive_ods_wms_pick_order SELECT order_no, status,
> dispatch_time FROM kafka_ods_wms_pick_order;
>
> I am using the sql-client. After 15 minutes, why does the HDFS directory of the Hive table still contain only a single .part file of size 0?
> In the flink client, SELECT order_no, status, dispatch_time FROM
> kafka_ods_wms_pick_order does return data.
>
> In the flink web UI the Checkpoint Count of this job is 0.
> Does the job need to checkpoint before data is written to HDFS?
> How do I set the checkpointing interval with the Flink sql-client?
>
> Thanks,
> Wang Lei
>
>
>
>
> wangl...@geekplus.com.cn
>
>
> From: Leonard Xu
> Sent: 2020-07-14 17:20
> To: user-zh; 夏帅
> Cc: wangl...@geekplus.com.cn
> Subject: Re: Cannot read in real time the data that is written to Hive in real time
>
> Hi, wanglei
>
> With 'sink.partition-commit.delay'='1 h', the partition-completed information for Hive (via the metastore or a success
> file) is committed only after the checkpoint completes plus the 1 h delay you configured.
>
> Take a look at the mail 夏帅 posted and check your checkpoint and partition-commit settings.
>
> Best,
> Leonard Xu
>
>
> On Jul 14, 2020, at 16:59, 夏帅  wrote:
>
> Hi,
> You can refer to the solution of this issue:
>
> http://apache-flink.147419.n8.nabble.com/Table-options-do-not-contain-an-option-key-connector-for-discovering-a-connector-td4767.html
>
>
> --
> From: wangl...@geekplus.com.cn 
> Sent: Tuesday, Jul 14, 2020 16:50
> To: user-zh ; 夏帅 ;
> Leonard Xu 
> Subject: Re: Re: Cannot read in real time the data that is written to Hive in real time
>
>
> It seems I had not understood what partition-commit means; there is documentation here:
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#partition-commit
>
>
> CREATE TABLE kafka_ods_wms_pick_order (
> order_no STRING,
> status INT,
> dispatch_time TIMESTAMP(3)
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'ods_wms_pick_order',
> 'properties.bootstrap.servers' = ':9092',
> 'properties.group.id' = 'testGroup',
> 'format' = 'json',
> 'scan.startup.mode' = 'latest-offset'
> )
>
>
> CREATE TABLE hive_ods_wms_pick_order (
>  order_no STRING,
>  status INT,
>  dispatch_time TIMESTAMP
> ) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
>  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
>  'sink.partition-commit.trigger'='partition-time',
>  'sink.partition-commit.delay'='1 h',
>  'sink.partition-commit.policy.kind'='metastore,success-file'
> );
>
> INSERT INTO TABLE hive_ods_wms_pick_order SELECT order_no, status,
> dispatch_time, DATE_FORMAT(dispatch_time, '-MM-dd'),
> DATE_FORMAT(dispatch_time, 'HH') FROM kafka_ods_wms_pick_order;
> SELECT * FROM hive_ods_wms_pick_order /*+
> OPTIONS('streaming-source.enable'='true',
> 'streaming-source.consume-start-offset'='2020-07-24') */;
>
>
>
>
> wangl...@geekplus.com.cn
>
>
> Sender: 夏帅
> Send Time: 2020-07-14 16:43
> Receiver: user-zh; xbjtdcq
> Subject: Re: Re: Cannot read in real time the data that is written to Hive in real time
> Hi,
> This means the Hive files being written have not been rolled up yet. Could you paste your SQL?
> --
> From: wangl...@geekplus.com.cn 
> Sent: Tuesday, Jul 14, 2020 16:40
> To: user-zh ; xbjtdcq 
> Subject: Re: Re: Cannot read in real time the data that is written to Hive in real time
> I added the table hint.
> The job is submitted, but the client still shows no results.
> I checked the directory of the Hive table on the hadoop cluster; all the files are in-progress files starting with .part.
> Thanks,
> Wang Lei
> wangl...@geekplus.com.cn
> From: Leonard Xu
> Sent: 2020-07-14 16:17
> To: user-zh
> Subject: Re: Cannot read in real time the data that is written to Hive in real time
> HI, wanglei
> Have you enabled streaming-source.enable?
> This parameter specifies whether the table is read in batch mode or in streaming mode. For real-time reading it should
> be set to true; table hints are a convenient way to set the parameter.
> SELECT * FROM hive_table /*+ OPTIONS('streaming-source.enable'='true',
> 'streaming-source.consume-start-offset'='2020-05-20') */;
> The page you are looking at should have documentation on how to read Hive data.
> Best,
> Leonard Xu
> On Jul 14, 2020, at 15:47, wangl...@geekplus.com.cn wrote:
>
>
> I tried out the Flink-1.11 hive streaming feature
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_streaming.html
>
> I create a kafka table and write it into Hive in real time through SQL.
>
> But when I then run select * from hive_table in the flink sql-client, the client returns nothing, and on the flink
> webUI page I can see that this select * from hive_table job has already finished.
>
> Thanks,
> Wang Lei
>
>
>
> wangl...@geekplus.com.cn
>
>
>
 
-- 
Best regards!
Rui Li


Re: Re: Cannot read in real time the data that is written to Hive in real time

2020-07-14 Thread wangl...@geekplus.com.cn
Let me simplify the problem: I create the Hive table without any parameters

CREATE TABLE hive_ods_wms_pick_order (
  order_no STRING,
  status INT,
  dispatch_time TIMESTAMP
)  STORED AS parquet

INSERT INTO TABLE hive_ods_wms_pick_order SELECT order_no, status, 
dispatch_time FROM kafka_ods_wms_pick_order;

I am using the sql-client. After 15 minutes, why does the HDFS directory of the Hive table still contain only a single .part file of size 0?
In the flink client, SELECT order_no, status, dispatch_time FROM
kafka_ods_wms_pick_order does return data.

In the flink web UI the Checkpoint Count of this job is 0.
Does the job need to checkpoint before data is written to HDFS?
How do I set the checkpointing interval with the Flink sql-client?

Thanks,
Wang Lei




wangl...@geekplus.com.cn 

 
From: Leonard Xu
Sent: 2020-07-14 17:20
To: user-zh; 夏帅
Cc: wangl...@geekplus.com.cn
Subject: Re: Cannot read in real time the data that is written to Hive in real time

Hi, wanglei

With 'sink.partition-commit.delay'='1 h', the partition-completed information for Hive (via the metastore or a success
file) is committed only after the checkpoint completes plus the 1 h delay you configured.

Take a look at the mail 夏帅 posted and check your checkpoint and partition-commit settings.

Best,
Leonard Xu


On Jul 14, 2020, at 16:59, 夏帅  wrote:

Hi,
You can refer to the solution of this issue:
http://apache-flink.147419.n8.nabble.com/Table-options-do-not-contain-an-option-key-connector-for-discovering-a-connector-td4767.html


--
From: wangl...@geekplus.com.cn 
Sent: Tuesday, Jul 14, 2020 16:50
To: user-zh ; 夏帅 ; Leonard Xu 

Subject: Re: Re: Cannot read in real time the data that is written to Hive in real time


It seems I had not understood what partition-commit means; there is documentation here:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#partition-commit


CREATE TABLE kafka_ods_wms_pick_order (
order_no STRING,
status INT,
dispatch_time TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'topic' = 'ods_wms_pick_order',
'properties.bootstrap.servers' = ':9092',
'properties.group.id' = 'testGroup',
'format' = 'json',
'scan.startup.mode' = 'latest-offset'
)


CREATE TABLE hive_ods_wms_pick_order (
 order_no STRING,
 status INT,
 dispatch_time TIMESTAMP
) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
 'sink.partition-commit.trigger'='partition-time',
 'sink.partition-commit.delay'='1 h',
 'sink.partition-commit.policy.kind'='metastore,success-file'
);

INSERT INTO TABLE hive_ods_wms_pick_order SELECT order_no, status, 
dispatch_time, DATE_FORMAT(dispatch_time, '-MM-dd'), 
DATE_FORMAT(dispatch_time, 'HH') FROM kafka_ods_wms_pick_order;
SELECT * FROM hive_ods_wms_pick_order /*+ 
OPTIONS('streaming-source.enable'='true', 
'streaming-source.consume-start-offset'='2020-07-24') */;




wangl...@geekplus.com.cn


Sender: 夏帅
Send Time: 2020-07-14 16:43
Receiver: user-zh; xbjtdcq
Subject: Re: Re: Cannot read in real time the data that is written to Hive in real time
Hi,
This means the Hive files being written have not been rolled up yet. Could you paste your SQL?
--
From: wangl...@geekplus.com.cn 
Sent: Tuesday, Jul 14, 2020 16:40
To: user-zh ; xbjtdcq 
Subject: Re: Re: Cannot read in real time the data that is written to Hive in real time
I added the table hint.
The job is submitted, but the client still shows no results.
I checked the directory of the Hive table on the hadoop cluster; all the files are in-progress files starting with .part.
Thanks,
Wang Lei
wangl...@geekplus.com.cn 
From: Leonard Xu
Sent: 2020-07-14 16:17
To: user-zh
Subject: Re: Cannot read in real time the data that is written to Hive in real time
HI, wanglei
Have you enabled streaming-source.enable?
This parameter specifies whether the table is read in batch mode or in streaming mode. For real-time reading it should be set to true; table hints are a convenient way to set the parameter.
SELECT * FROM hive_table /*+ OPTIONS('streaming-source.enable'='true', 
'streaming-source.consume-start-offset'='2020-05-20') */;
The page you are looking at should have documentation on how to read Hive data.
Best,
Leonard Xu
On Jul 14, 2020, at 15:47, wangl...@geekplus.com.cn wrote:


I tried out the Flink-1.11 hive streaming feature
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_streaming.html

I create a kafka table and write it into Hive in real time through SQL.

But when I then run select * from hive_table in the flink sql-client, the client returns nothing, and on the flink webUI
page I can see that this select * from hive_table job has already finished.

Thanks,
Wang Lei



wangl...@geekplus.com.cn 




Re: Re: Cannot read in real time the data that is written to Hive in real time

2020-07-14 Thread wangl...@geekplus.com.cn
It seems I had not understood what partition-commit means; there is documentation here:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#partition-commit


CREATE TABLE kafka_ods_wms_pick_order (
 order_no STRING,
 status INT,
 dispatch_time TIMESTAMP(3)
) WITH (
 'connector' = 'kafka',
 'topic' = 'ods_wms_pick_order',
 'properties.bootstrap.servers' = ':9092',
 'properties.group.id' = 'testGroup',
 'format' = 'json',
 'scan.startup.mode' = 'latest-offset'
)


CREATE TABLE hive_ods_wms_pick_order (
  order_no STRING,
  status INT,
  dispatch_time TIMESTAMP
) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
  'sink.partition-commit.trigger'='partition-time',
  'sink.partition-commit.delay'='1 h',
  'sink.partition-commit.policy.kind'='metastore,success-file'
);

INSERT INTO TABLE hive_ods_wms_pick_order SELECT order_no, status, 
dispatch_time, DATE_FORMAT(dispatch_time, '-MM-dd'), 
DATE_FORMAT(dispatch_time, 'HH') FROM kafka_ods_wms_pick_order;
SELECT * FROM hive_ods_wms_pick_order /*+ 
OPTIONS('streaming-source.enable'='true', 
'streaming-source.consume-start-offset'='2020-07-24') */;




wangl...@geekplus.com.cn 

 
Sender: 夏帅
Send Time: 2020-07-14 16:43
Receiver: user-zh; xbjtdcq
Subject: Re: Re: Cannot read in real time the data that is written to Hive in real time
Hi,
This means the Hive files being written have not been rolled up yet. Could you paste your SQL?
 
 
--
From: wangl...@geekplus.com.cn 
Sent: Tuesday, Jul 14, 2020 16:40
To: user-zh ; xbjtdcq 
Subject: Re: Re: Cannot read in real time the data that is written to Hive in real time
 
 
I added the table hint.
The job is submitted, but the client still shows no results.
I checked the directory of the Hive table on the hadoop cluster; all the files are in-progress files starting with .part.
 
Thanks,
Wang Lei
 
 
 
wangl...@geekplus.com.cn 
 
From: Leonard Xu
Sent: 2020-07-14 16:17
To: user-zh
Subject: Re: Cannot read in real time the data that is written to Hive in real time
HI, wanglei
 
Have you enabled streaming-source.enable?
This parameter specifies whether the table is read in batch mode or in streaming mode. For real-time reading it should be set to true; table hints are a convenient way to set the parameter.
 
SELECT * FROM hive_table /*+ OPTIONS('streaming-source.enable'='true', 
'streaming-source.consume-start-offset'='2020-05-20') */;
 
The page you are looking at should have documentation on how to read Hive data.
 
Best,
Leonard Xu
 
> On Jul 14, 2020, at 15:47, wangl...@geekplus.com.cn wrote:
> 
> 
> I tried out the Flink-1.11 hive streaming feature
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_streaming.html
> 
> I create a kafka table and write it into Hive in real time through SQL.
> 
> But when I then run select * from hive_table in the flink sql-client, the client returns nothing, and on the flink webUI 
> page I can see that this select * from hive_table job has already finished.
> 
> Thanks,
> Wang Lei
> 
> 
> 
> wangl...@geekplus.com.cn 
> 
 


Re: Re: Cannot read in real time the data that is written to Hive in real time

2020-07-14 Thread wangl...@geekplus.com.cn

I added the table hint.
The job is submitted, but the client still shows no results.
I checked the directory of the Hive table on the hadoop cluster; all the files are in-progress files starting with .part.

Thanks,
Wang Lei



wangl...@geekplus.com.cn 

From: Leonard Xu
Sent: 2020-07-14 16:17
To: user-zh
Subject: Re: Cannot read in real time the data that is written to Hive in real time
HI, wanglei
 
Have you enabled streaming-source.enable?
This parameter specifies whether the table is read in batch mode or in streaming mode. For real-time reading it should be set to true; table hints are a convenient way to set the parameter.
 
SELECT * FROM hive_table /*+ OPTIONS('streaming-source.enable'='true', 
'streaming-source.consume-start-offset'='2020-05-20') */;
 
The page you are looking at should have documentation on how to read Hive data.
 
Best,
Leonard Xu
 
> On Jul 14, 2020, at 15:47, wangl...@geekplus.com.cn wrote:
> 
> 
> I tried out the Flink-1.11 hive streaming feature
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_streaming.html
> 
> I create a kafka table and write it into Hive in real time through SQL.
> 
> But when I then run select * from hive_table in the flink sql-client, the client returns nothing, and on the flink webUI 
> page I can see that this select * from hive_table job has already finished.
> 
> Thanks,
> Wang Lei
> 
> 
> 
> wangl...@geekplus.com.cn 
> 
 


Cannot read in real time the data that is written to Hive in real time

2020-07-14 Thread wangl...@geekplus.com.cn

I tried out the Flink-1.11 hive streaming feature
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_streaming.html

I create a kafka table and write it into Hive in real time through SQL.

But when I then run select * from hive_table in the flink sql-client, the client returns nothing, and on the flink webUI
page I can see that this select * from hive_table job has already finished.

Thanks,
Wang Lei



wangl...@geekplus.com.cn 



Re: Re: How to dynamically initialize flink metrics in invoke method and then reuse it?

2020-07-03 Thread wangl...@geekplus.com.cn

It seems there is no direct solution.
Perhaps I can implement this by initializing a HashMap with all
the possible values of tableName in the `open` method and getting the corresponding
Meter according to tableName in the `invoke` method.
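
A minimal sketch of that idea, assuming a RichSinkFunction and that the full list of table names is known up front (the class and table names below are made up for illustration; the ObjectNode may be the flink-shaded variant depending on the deserialization schema used):

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class BinlogMeterSink extends RichSinkFunction<ObjectNode> {

    // hypothetical table list; in practice it could come from configuration
    private static final List<String> TABLE_NAMES = Arrays.asList("t_order", "t_stock");

    private transient Map<String, Meter> meters;

    @Override
    public void open(Configuration config) {
        meters = new HashMap<>();
        for (String table : TABLE_NAMES) {
            // register one Meter per table exactly once, so counts are not reset per record
            meters.put(table, getRuntimeContext().getMetricGroup().meter(table, new MeterView(10)));
        }
    }

    @Override
    public void invoke(ObjectNode node, Context context) {
        String tableName = node.get("metadata").get("topic").asText();
        Meter meter = meters.get(tableName);
        if (meter != null) {
            meter.markEvent();
        }
    }
}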


Thanks,
Lei 


wangl...@geekplus.com.cn 
 
Sender: wangl...@geekplus.com.cn
Send Time: 2020-07-03 14:27
Receiver: Xintong Song
cc: user
Subject: Re: Re: How to dynamically initialize flink metrics in invoke method 
and then reuse it?
Hi Xintong, 

Yes, initializing the metric in the `open` method works, but it doesn't solve my problem.
I want to initialize the metric with a name that is extracted from the record content, and only in the `invoke` method
can I do that.

Actually my scenario is as follows.
The records are MySQL binlog entries, and I want to monitor the QPS per tableName. The tableName differs from record
to record.

Thanks,
Lei




wangl...@geekplus.com.cn 

Sender: Xintong Song
Send Time: 2020-07-03 13:14
Receiver: wangl...@geekplus.com.cn
cc: user
Subject: Re: How to dynamically initialize flink metrics in invoke method and 
then reuse it?
Hi Lei,

I think you should initialize the metric in the `open` method. Then you can 
save the initialized metric as a class field, and update it in the `invoke` 
method for each record.
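
A minimal sketch of that pattern (the metric is created once in open() and reused in invoke(); class and metric names are illustrative, in contrast to the per-table map discussed earlier in this thread):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class SingleMeterSink extends RichSinkFunction<String> {

    private transient Meter meter;

    @Override
    public void open(Configuration config) {
        // created once per task, not once per record
        meter = getRuntimeContext().getMetricGroup().meter("recordsPerSecond", new MeterView(10));
    }

    @Override
    public void invoke(String value, Context context) {
        meter.markEvent(); // the same Meter keeps accumulating across invocations
    }
}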

Thank you~
Xintong Song


On Fri, Jul 3, 2020 at 11:50 AM wangl...@geekplus.com.cn 
 wrote:

In one Flink operator, I want to initialize multiple Flink metrics according to the
message content,
as in the code below.

public void invoke(ObjectNode node, Context context) throws Exception {

String tableName = node.get("metadata").get("topic").asText();
Meter meter = getRuntimeContext().getMetricGroup().meter(tableName, new 
MeterView(10));
meter.markEvent();
log.info("### counter: " + meter.toString() + "\t" +  meter.getCount());


But this way, every invoke call initializes a new metric and the count starts from zero again.
How can I reuse the metric that was initialized before?

Thanks,
Lei


wangl...@geekplus.com.cn 



Re: Re: How to dynamically initialize flink metrics in invoke method and then reuse it?

2020-07-03 Thread wangl...@geekplus.com.cn
Hi Xintong,

Yes, initializing the metric in the `open` method works, but it doesn't solve my problem.
I want to initialize the metric with a name that is extracted from the record content, and only in the `invoke` method
can I do that.

Actually my scenario is as follows.
The records are MySQL binlog entries, and I want to monitor the QPS per tableName. The tableName differs from record
to record.

Thanks,
Lei




wangl...@geekplus.com.cn 

Sender: Xintong Song
Send Time: 2020-07-03 13:14
Receiver: wangl...@geekplus.com.cn
cc: user
Subject: Re: How to dynamically initialize flink metrics in invoke method and 
then reuse it?
Hi Lei,

I think you should initialize the metric in the `open` method. Then you can 
save the initialized metric as a class field, and update it in the `invoke` 
method for each record.

Thank you~
Xintong Song


On Fri, Jul 3, 2020 at 11:50 AM wangl...@geekplus.com.cn 
 wrote:

In one Flink operator, I want to initialize multiple Flink metrics according to the
message content,
as in the code below.

public void invoke(ObjectNode node, Context context) throws Exception {

String tableName = node.get("metadata").get("topic").asText();
Meter meter = getRuntimeContext().getMetricGroup().meter(tableName, new 
MeterView(10));
meter.markEvent();
log.info("### counter: " + meter.toString() + "\t" +  meter.getCount());


But this way, every invoke call initializes a new metric and the count starts from zero again.
How can I reuse the metric that was initialized before?

Thanks,
Lei


wangl...@geekplus.com.cn 



How to dynamically initialize flink metrics in invoke method and then reuse it?

2020-07-02 Thread wangl...@geekplus.com.cn

In one Flink operator, I want to initialize multiple Flink metrics according to the
message content,
as in the code below.

public void invoke(ObjectNode node, Context context) throws Exception {

String tableName = node.get("metadata").get("topic").asText();
Meter meter = getRuntimeContext().getMetricGroup().meter(tableName, new 
MeterView(10));
meter.markEvent();
log.info("### counter: " + meter.toString() + "\t" +  meter.getCount());


But this way, every invoke call initializes a new metric and the count starts from zero again.
How can I reuse the metric that was initialized before?

Thanks,
Lei


wangl...@geekplus.com.cn 



Re: Re: How to dynamically generate multiple monitoring metrics in one flink operator according to the message content?

2020-07-02 Thread wangl...@geekplus.com.cn
public void invoke(ObjectNode node, Context context) throws Exception {

String tableName = node.get("metadata").get("topic").asText();
Meter meter = getRuntimeContext().getMetricGroup().meter(tableName, new 
MeterView(10));
meter.markEvent();
log.info("### counter: " + meter.toString() + "\t" +  meter.getCount());

As the code above shows, the tableName is parsed in the invoke method and used as the metric name.
But written this way, every incoming message effectively re-defines the metric, and it starts counting from 0 again.

Thanks,
Wang Lei


wangl...@geekplus.com.cn 

 
Sender: kcz
Send Time: 2020-07-03 09:13
Receiver: wanglei2
Subject: Re: How to dynamically generate multiple monitoring metrics in one flink operator according to the message content?
From your description you are just missing the tablename; once you parse the log and get the tablename, just create the metric with it.



-- Original message --
From: 王磊2 
Sent: Jul 2, 2020 21:46
To: user-zh , 17610775726 <17610775...@163.com>
Subject: Re: How to dynamically generate multiple monitoring metrics in one flink operator according to the message content?


I did not understand the approach you described.

What I finally want are metrics like: myCounter_table1, myCounter_table2, ..., myCounter_tableX.
But in the code, metrics are initialized in the open method, and in that method I cannot know what the tableName is.

Thanks,
Wang Lei



--
From: JasonLee <17610775...@163.com>
Sent: Thursday, Jul 2, 2020 21:12
To: user-zh 
Subject: Re: How to dynamically generate multiple monitoring metrics in one flink operator according to the message content?

Can't you just pass the tablename down into the metric?


| |
JasonLee
|
|
Email: 17610775...@163.com
|

Signature is customized by Netease Mail Master

On Jul 2, 2020 at 16:39, wangl...@geekplus.com.cn wrote:

They are all metrics of the same type.
For example, the messages are parsed MySQL binlog records; I want to get the tableName from the message content and
create differently named metrics per tableName (all of meter type).

Thanks,
Wang Lei




wangl...@geekplus.com.cn


From: JasonLee
Sent: 2020-07-02 16:16
To: user-zh
Subject: Re: How to dynamically generate multiple monitoring metrics in one flink operator according to the message content?
Do you want to create metrics of different types, such as counter and meter?


| |
JasonLee
|
|
Email: 17610775...@163.com
|

Signature is customized by Netease Mail Master

On Jul 2, 2020 at 15:34, wangl...@geekplus.com.cn wrote:

The example from the official docs:

public class MyMapper extends RichMapFunction<String, String> {
private transient Counter counter;
@Override
public void open(Configuration config) {
  this.counter = getRuntimeContext()
.getMetricGroup()
.counter("myCounter");
}
@Override
public String map(String value) throws Exception {
  this.counter.inc();
  return value;
}
}

I want to create different monitoring metrics based on the argument passed into the map method. How can I do that?

Thanks,
Wang Lei



wangl...@geekplus.com.cn




Re: Re: How to dynamically generate multiple monitoring metrics in one flink operator according to the message content?

2020-07-02 Thread wangl...@geekplus.com.cn

They are all metrics of the same type.
For example, the messages are parsed MySQL binlog records; I want to get the tableName from the message content and
create differently named metrics per tableName (all of meter type).

Thanks,
Wang Lei




wangl...@geekplus.com.cn 


From: JasonLee
Sent: 2020-07-02 16:16
To: user-zh
Subject: Re: How to dynamically generate multiple monitoring metrics in one flink operator according to the message content?
Do you want to create metrics of different types, such as counter and meter?
 
 
| |
JasonLee
|
|
Email: 17610775...@163.com
|
 
Signature is customized by Netease Mail Master
 
On Jul 2, 2020 at 15:34, wangl...@geekplus.com.cn wrote:
 
The example from the official docs:
 
public class MyMapper extends RichMapFunction<String, String> {
private transient Counter counter;
@Override
public void open(Configuration config) {
   this.counter = getRuntimeContext()
 .getMetricGroup()
 .counter("myCounter");
}
@Override
public String map(String value) throws Exception {
   this.counter.inc();
   return value;
}
}
 
I want to create different monitoring metrics based on the argument passed into the map method. How can I do that?
 
Thanks,
Wang Lei
 
 
 
wangl...@geekplus.com.cn
 


How to dynamically generate multiple monitoring metrics in one flink operator according to the message content?

2020-07-02 Thread wangl...@geekplus.com.cn

The example from the official docs:

public class MyMapper extends RichMapFunction<String, String> {
  private transient Counter counter;
  @Override
  public void open(Configuration config) {
    this.counter = getRuntimeContext()
      .getMetricGroup()
      .counter("myCounter");
  }
  @Override
  public String map(String value) throws Exception {
    this.counter.inc();
    return value;
  }
}

I want to create different monitoring metrics based on the argument passed into the map method. How can I do that?

Thanks,
Wang Lei



wangl...@geekplus.com.cn



Re: Re: Flip-105 can the debezium/canal SQL sink to database directly?

2020-07-01 Thread wangl...@geekplus.com.cn
CREATE TABLE t_pick_order (
  order_no VARCHAR,
  status INT
  ) WITH (
  'connector' = 'kafka',
  'topic' = 'example',
  'scan.startup.mode' = 'latest-offset',
  'properties.bootstrap.servers' = '172.19.78.32:9092',
  'format' = 'canal-json'
   )
CREATE TABLE order_status (
  order_no VARCHAR,
  status INT, PRIMARY KEY (order_no) NOT ENFORCED
  ) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://xxx:3306/flink_test',
  'table-name' = 'order_status',
  'username' = 'dev',
  'password' = ''
   )



But when I execute INSERT INTO order_status SELECT order_no, status FROM
t_pick_order,
there's an error:

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: Provided trait [BEFORE_AND_AFTER] 
can't satisfy required trait [ONLY_UPDATE_AFTER]. This is a bug in planner, 
please file an issue. 
Current node is TableSourceScan(table=[[default_catalog, default_database, 
t_pick_order]], fields=[order_no, status])




wangl...@geekplus.com.cn 
From: Danny Chan
Date: 2020-06-30 20:25
To: wangl...@geekplus.com.cn
Subject: Re: Re: Flip-105 can the debezium/canal SQL sink to database directly?
Hi, wanglei2 ~ 

For primary key syntax you can reference [1] for the “PRIMARY KEY” part, notice 
that currently we only support the NOT ENFORCED mode. Here is the reason:

>SQL standard specifies that a constraint can either be ENFORCED or NOT 
>ENFORCED. This controls if the constraint checks are performed on the 
>incoming/outgoing data. Flink does not own the data therefore the only mode we 
>want to support is the NOT ENFORCED mode. It is up to the user to ensure that 
>the query enforces key integrity.

For DDL to create JDBC table, you can reference [2]

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/create.html#create-table
[2] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/jdbc.html#how-to-create-a-jdbc-table

Best, 
Danny Chan
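
A minimal sketch of declaring the JDBC sink with a primary key in the NOT ENFORCED mode described above, using Flink 1.11's TableEnvironment (the connection options and names are made up for illustration; the MySQL table should have the same key):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class JdbcSinkWithPrimaryKey {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
        tEnv.executeSql(
                "CREATE TABLE mysql_sink_table (" +
                "  id BIGINT," +
                "  first_name STRING," +
                "  last_name STRING," +
                "  PRIMARY KEY (id) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'jdbc'," +
                "  'url' = 'jdbc:mysql://localhost:3306/flink_test'," +
                "  'table-name' = 'mysql_sink_table'," +
                "  'username' = 'dev'," +
                "  'password' = '***'" +
                ")");
    }
}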
在 2020年6月30日 +0800 AM10:25,wangl...@geekplus.com.cn 
,写道:
Thanks Jingsong,

Is there any document or example to this?
I will build the flink-1.11 package and have a try.

Thanks,
Lei  



wangl...@geekplus.com.cn 
 
From: Jingsong Li
Date: 2020-06-30 10:08
To: wangl...@geekplus.com.cn
CC: user
Subject: Re: Flip-105 can the debezium/canal SQL sink to database directly?
Hi Lei, 

INSERT INTO  jdbc_table SELECT * FROM changelog_table;

For Flink 1.11 new connectors, you need to define the primary key for 
jdbc_table (and also your mysql table needs to have the corresponding primary 
key) because changelog_table has the "update", "delete" records.

And then, jdbc sink will:
- insert or update using "INSERT INTO ... ON DUPLICATE KEY UPDATE ." to deal 
with "insert" and "update" messages.
- delete to deal with "delete" messages.

So generally speaking, with the primary key, this mysql table will be the same 
to your source database table. (table for generating changelog)

Best,
Jingsong

On Tue, Jun 30, 2020 at 9:58 AM wangl...@geekplus.com.cn 
 wrote:

CREATE TABLE my_table (
  id BIGINT,
  first_name STRING,
  last_name STRING,
  email STRING
) WITH (
 'connector'='kafka',
 'topic'='user_topic',
 'properties.bootstrap.servers'='localhost:9092',
 'scan.startup.mode'='earliest-offset',
 'format'='debezium-json'
);
INSERT INTO  mysql_sink_table SELECT id, first_name, last_name FROM  my_table;

What will happen after I execute the insert SQL statement? For the
update/delete messages from kafka, will the corresponding records be updated or
deleted in the mysql_sink_table?
INSERT INTO  mysql_sink_table SELECT id, first_name, last_name FROM  my_table;

Thanks,
Lei



wangl...@geekplus.com.cn 



--
Best, Jingsong Lee


Re: Re: Flip-105 can the debezium/canal SQL sink to database directly?

2020-06-29 Thread wangl...@geekplus.com.cn
Thanks Jingsong,

Is there any document or example to this?
I will build the flink-1.11 package and have a try.

Thanks,
Lei  



wangl...@geekplus.com.cn 
 
From: Jingsong Li
Date: 2020-06-30 10:08
To: wangl...@geekplus.com.cn
CC: user
Subject: Re: Flip-105 can the debezium/canal SQL sink to database directly?
Hi Lei,

INSERT INTO  jdbc_table SELECT * FROM changelog_table;

For Flink 1.11 new connectors, you need to define the primary key for 
jdbc_table (and also your mysql table needs to have the corresponding primary 
key) because changelog_table has the "update", "delete" records.

And then, jdbc sink will:
- insert or update using "INSERT INTO ... ON DUPLICATE KEY UPDATE ." to deal 
with "insert" and "update" messages.
- delete to deal with "delete" messages.

So generally speaking, with the primary key, this mysql table will be the same 
to your source database table. (table for generating changelog)

Best,
Jingsong

On Tue, Jun 30, 2020 at 9:58 AM wangl...@geekplus.com.cn 
 wrote:

CREATE TABLE my_table (
  id BIGINT,
  first_name STRING,
  last_name STRING,
  email STRING
) WITH (
 'connector'='kafka',
 'topic'='user_topic',
 'properties.bootstrap.servers'='localhost:9092',
 'scan.startup.mode'='earliest-offset',
 'format'='debezium-json'
);
INSERT INTO  mysql_sink_table SELECT id, first_name, last_name FROM  my_table;

What will happen after I execute the insert SQL statement? For the
update/delete messages from kafka, will the corresponding records be updated or
deleted in the mysql_sink_table?
INSERT INTO  mysql_sink_table SELECT id, first_name, last_name FROM  my_table;

Thanks,
Lei



wangl...@geekplus.com.cn 



-- 
Best, Jingsong Lee


Flip-105 can the debezium/canal SQL sink to database directly?

2020-06-29 Thread wangl...@geekplus.com.cn

CREATE TABLE my_table (
  id BIGINT,
  first_name STRING,
  last_name STRING,
  email STRING
) WITH (
 'connector'='kafka',
 'topic'='user_topic',
 'properties.bootstrap.servers'='localhost:9092',
 'scan.startup.mode'='earliest-offset',
 'format'='debezium-json'
);
INSERT INTO  mysql_sink_table SELECT id, first_name, last_name FROM  my_table;

What will happen after I execute the insert SQL statement? For the
update/delete messages from kafka, will the corresponding records be updated or
deleted in the mysql_sink_table?
INSERT INTO  mysql_sink_table SELECT id, first_name, last_name FROM  my_table;

Thanks,
Lei



wangl...@geekplus.com.cn 



Re: Re: Will Flink State still be recognized after a field is added?

2020-06-09 Thread wangl...@geekplus.com.cn

The OrderState class has get and set methods for its fields.

So will Flink treat OrderState as a POJO and still recognize the state?



wang...@geekplus.com.cn

Sender: 1048262223
Send Time: 2020-06-09 18:11
Receiver: user-zh
Subject: Re: Will Flink State still be recognized after a field is added?
Hi
 
 
If OrderState is a class that Flink recognizes as a POJO, state schema evolution supports adding a field, so the state saved by the old code can still be restored from the savepoint.
 
Best,
Yichao Yang
 
 
 
 
------------------ Original message ------------------
From: "wangl...@geekplus.com.cn"

Will Flink State still be recognized after a field is added?

2020-06-09 Thread wangl...@geekplus.com.cn

I wrote a simple class that is used in Flink state:

public class OrderState {
private Integer warehouseId;
private String orderNo;
private String ownerCode;
private Long inputDate;
private int orderType;
private int amount = 0;
private int status = 0;
.
}


Now the program needs to be upgraded and this class gets one more field. Can the state still be restored normally?
That is, after flink run -s   savepointdir,   will the state saved by the old code still be recognized?
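
A minimal sketch of the upgraded class, assuming OrderState is serialized with Flink's POJO serializer (public no-argument constructor plus public fields or getters/setters); under POJO schema evolution a newly added field is initialized to its Java default for entries restored from the old savepoint:

public class OrderState {
    private Integer warehouseId;
    private String orderNo;
    private String ownerCode;
    private Long inputDate;
    private int orderType;
    private int amount = 0;
    private int status = 0;

    // newly added field: null for state entries restored from the old savepoint
    private String extraInfo;

    public OrderState() {}

    // getters and setters omitted for brevity
}

Restoring with flink run -s savepointdir should then still work; if the class is instead serialized with Kryo, schema evolution is not supported.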

Thanks,
Wang Lei



wangl...@geekplus.com.cn


Re: Re: java.lang.AbstractMethodError when implementing KafkaSerializationSchema

2020-05-30 Thread wangl...@geekplus.com.cn

It was caused by a jar conflict and I have fixed it.

I had put flink-connector-kafka_2.11-1.10.0.jar in the Flink lib directory, and
my project pom file also had the flink-connector-kafka dependency and was
built into a fat jar.

Thanks,
Lei



wangl...@geekplus.com.cn

 
From: Leonard Xu
Sent: 2020-05-26 15:47
To: Aljoscha Krettek
Cc: user
Subject: Re: java.lang.AbstractMethodError when implementing KafkaSerializationSchema
Hi,wanglei

I think Aljoscha is wright. Could you post your denpendency list?
Dependency flink-connector-kafka is used in dataStream Application which you 
should use, dependency flink-sql-connector-kafka is used in Table API & SQL 
Application. We should only add one of them because the two dependency will 
conflict.   

Best,
Leonard Xu

On May 26, 2020, at 15:02, Aljoscha Krettek  wrote:

I think what might be happening is that you're mixing dependencies from the 
flink-sql-connector-kafka and the proper flink-connector-kafka that should be 
used with the DataStream API. Could that be the case?

Best,
Aljoscha

On 25.05.20 19:18, Piotr Nowojski wrote:
Hi,
It would be helpful if you could provide full stack trace, what Flink version 
and which Kafka connector version are you using?
It sounds like either a dependency convergence error (mixing Kafka 
dependencies/various versions of flink-connector-kafka inside a single job/jar) 
or some shading issue. Can you check your project for such issues (`mvn 
dependency:tree` command [1]).
Also what’s a bit suspicious for me is the return type:
Lorg/apache/flink/kafka/shaded/org/apache/kafka/clients/producer/ProducerRecord;
I’m not sure, but I was not aware that we are shading Kafka dependency in our 
connectors? Are you manually shading something?
Piotrek
[1] 
https://maven.apache.org/plugins/maven-dependency-plugin/examples/resolving-conflicts-using-the-dependency-tree.html
 
<https://maven.apache.org/plugins/maven-dependency-plugin/examples/resolving-conflicts-using-the-dependency-tree.html>
On 22 May 2020, at 15:34, wangl...@geekplus.com.cn wrote:


public class MyKafkaSerializationSchema implements
KafkaSerializationSchema<Tuple2<String, String>> {
@Override
public ProducerRecord<byte[], byte[]> serialize(Tuple2<String, String> o,
@Nullable Long aLong) {
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(o.f0,
o.f1.getBytes(StandardCharsets.UTF_8));
return record;
}
}
FlinkKafkaProducer<Tuple2<String, String>> producer = new
FlinkKafkaProducer<>(
"default", new MyKafkaSerializationSchema(),
prop2, Semantic.EXACTLY_ONCE);

But there's an error when running:

java.lang.AbstractMethodError: 
com.geekplus.flinketl.schema.MyKafkaSerializationSchema.serialize(Ljava/lang/Object;Ljava/lang/Long;)Lorg/apache/flink/kafka/shaded/org/apache/kafka/clients/producer/ProducerRecord;

Any suggestion on this?

Thanks,
Lei
wangl...@geekplus.com.cn <mailto:wangl...@geekplus.com.cn>




java.lang.AbstractMethodError when implementing KafkaSerializationSchema

2020-05-22 Thread wangl...@geekplus.com.cn

public class MyKafkaSerializationSchema implements
KafkaSerializationSchema<Tuple2<String, String>> {
@Override
public ProducerRecord<byte[], byte[]> serialize(Tuple2<String, String> o,
@Nullable Long aLong) {
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(o.f0,
o.f1.getBytes(StandardCharsets.UTF_8));
return record;
}
}
FlinkKafkaProducer<Tuple2<String, String>> producer = new
FlinkKafkaProducer<>(
"default", new MyKafkaSerializationSchema(),
prop2, Semantic.EXACTLY_ONCE);

But there's an error when running:

java.lang.AbstractMethodError: 
com.geekplus.flinketl.schema.MyKafkaSerializationSchema.serialize(Ljava/lang/Object;Ljava/lang/Long;)Lorg/apache/flink/kafka/shaded/org/apache/kafka/clients/producer/ProducerRecord;

Any suggestion on this?

Thanks,
Lei


wangl...@geekplus.com.cn 



flink how to access remote hdfs using namenode nameservice

2020-05-07 Thread wangl...@geekplus.com.cn

According to 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/jobmanager_high_availability.html
 

I am deploying a standalone cluster with jobmanager HA and need the HDFS
address:

high-availability.storageDir: hdfs:///flink/recovery

My hadoop cluster is remote. I can write the address as
hdfs://active-namenode-ip:8020, but this way the namenode HA is lost.

Is there any way to configure it as hdfs://name-service:8020?

Thanks,
Lei




wangl...@geekplus.com.cn 



Re: Re: Setting up a standalone-mode HA flink cluster outside an existing Hadoop cluster

2020-05-07 Thread wangl...@geekplus.com.cn

I tried it and it works, but now there is a problem accessing HDFS.

The hadoop cluster I use is managed by Alibaba Cloud EMR; on the machines managed by EMR, HDFS can be accessed as hdfs://emr-cluster:8020/

But the Flink I deployed is not managed by EMR, and that address cannot be resolved there, so I can only write it in the form hdfs://active-namenode-ip:8020/,
which loses the NameNode HA.

Is there a way to solve this?

Thanks,
Wang Lei



wangl...@geekplus.com.cn

Sender: Andrew
Send Time: 2020-05-07 12:31
Receiver: user-zh
Subject: Re: Setting up a standalone-mode HA flink cluster outside an existing Hadoop cluster
https://ci.apache.org/projects/flink/flink-docs-stable/ops/jobmanager_high_availability.html
 
 
 
 
--- Original message ---
From: "wangl...@geekplus.com.cn"

Re: Setting up a standalone-mode HA flink cluster outside an existing Hadoop cluster

2020-05-06 Thread wangl...@geekplus.com.cn

This document looks like what I need; I will try it first:

https://ci.apache.org/projects/flink/flink-docs-stable/ops/jobmanager_high_availability.html



wangl...@geekplus.com.cn

From: wangl...@geekplus.com.cn
Sent: 2020-05-07 12:23
To: user-zh
Subject: Setting up a standalone-mode HA flink cluster outside an existing Hadoop cluster
 
There is already an existing Hadoop cluster.
Outside this cluster (on different machines, with network connectivity), I want to deploy a standalone-mode flink cluster, configure jobmanager
HA, access and use the existing Hadoop's HDFS, and use the existing Hadoop's zookeeper.
 
Is there a reference document for how to do this?
 
Thanks,
Wang Lei
 
 
 
wangl...@geekplus.com.cn 
 


Re: Re: Question about how FlinkSQL retraction works

2020-05-05 Thread wangl...@geekplus.com.cn

Thanks  Jingsong Lee.

I am using MySQL, and the sink table has no primary key or unique key.
If the sink table has a primary key or unique key, it does indeed keep only two records.

I experimented with SET execution.result-mode=changelog in the flink sql-client; the left column marks which
kafka message caused each row:

msg  +/-  tms_company  order_cnt
1    +    zhongtong    1
2    -    zhongtong    1
2    +    yuantong     1
3    -    yuantong     1
3    +    yuantong     2
4    -    yuantong     2
4    +    yuantong     1
4    +    zhongtong    1

Message 1: executed one INSERT
Message 2: executed one DELETE and one INSERT
Message 3: executed one INSERT ... ON DUPLICATE KEY UPDATE
Message 4: executed two INSERT ... ON DUPLICATE KEY UPDATE


My conclusion is that if the retraction brings the result down to 0, a DELETE is executed; otherwise it is an INSERT ... ON DUPLICATE KEY UPDATE.

I am not sure whether this understanding is correct.

Thanks,
Wang Lei




wangl...@geekplus.com.cn

Sender: Jingsong Li
Send Time: 2020-05-06 11:35
Receiver: user-zh
Subject: Re: Re: Question about how FlinkSQL retraction works
Hi,
 
Question 1: deleting data is not only a feature of the retract stream. An upsert
stream is a special optimization used when the downstream can overwrite by key; besides overwriting by key, it also needs to delete data when the upstream retracts, which means an upsert
stream also has retracted input data. The JDBC sink implements consumption of an upsert stream.

Question 2: the correct data should be:
1  {"order_id":1,"tms_company":"zhongtong"}   1 record in the database: zhongtong 1
2  {"order_id":1,"tms_company":"yuantong"}    1 record in the database: yuantong 1  (zhongtong 1
deleted)
3  {"order_id":2,"tms_company":"yuantong"}    1 record in the database: yuantong 2  (yuantong 1
deleted)
4  {"order_id":2,"tms_company":"zhongtong"}   2 records in the database: yuantong 1,
zhongtong 1  (yuantong 2 deleted)

Which dialect did you use? Is it mysql?
Flink JDBC's MySQL dialect uses the DUPLICATE KEY UPDATE syntax to update data.
It looks like this syntax may not overwrite old data when the RDS table has no primary key or unique key. Try creating a primary key or unique key?
 
Best,
Jingsong Lee
 
On Wed, May 6, 2020 at 10:36 AM wangl...@geekplus.com.cn <
wangl...@geekplus.com.cn> wrote:
 
>
> The update key is tms_company, but the retraction is implemented through the two-level group by; there are only two order_ids, and the
> tms_company of each order_id changes.
> I don't quite understand the concrete mechanism of this retraction: why are some records deleted while others are not?
>
>
>
>
> wangl...@geekplus.com.cn
>
> From: Michael Ran
> Sent: 2020-04-30 17:23
> To: user-zh
> Subject: Re: Question about how FlinkSQL retraction works
>
>
>
> Is the specified update key tms_company?
>
>
> The result is:
> yuantong:2
> zhongtong:2
>
>
>
>
>
>
>
>
>
>
>
>
> On 2020-04-30 17:08:22, "wangl...@geekplus.com.cn" 
> wrote:
> >
>I implemented the Cainiao logistics order-statistics example from https://yq.aliyun.com/articles/457392/ myself, reading from kafka and writing to
> RDS; the RDS table has no primary key and no unique key.
> >
> >INSERT INTO table_out select  tms_company,  count(distinct order_id) as
> order_cnt from
> >(select order_id, LAST_VALUE(tms_company) AS tms_company from
> dwd_table group by order_id)
> > group by tms_company;
> >
> >
>I sent 4 messages in total, in this order:
>
>1  {"order_id":1,"tms_company":"zhongtong"}   1 record in the database: zhongtong 1
>
>2  {"order_id":1,"tms_company":"yuantong"}    1 record in the database: yuantong 1
> (the previous record was deleted)
>
>3  {"order_id":2,"tms_company":"yuantong"}    2 records in the database: yuantong 1,
> yuantong 2  (a record was added, nothing deleted)
>
>4  {"order_id":2,"tms_company":"zhongtong"}   4 records in the database: yuantong 1,
> yuantong 2, yuantong 1, zhongtong 1  (two records were added, nothing deleted)
> >
> >
>Question 1:
>After the 2nd message was sent, the previous record in the database was deleted. My understanding is that this should be the RetractStream behavior. But when I look at the source code at
> https://github.com/apache/flink/tree/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc
> there is no RetractStream implementation there either, so which code deletes it?
>
>Question 2:
>   After the 3rd record arrived, a yuantong 2 row was simply added to the database; why wasn't yuantong 1 deleted?
>   After the 4th record arrived, two more rows were added to the database; why weren't the earlier ones deleted either?
> >
>Thanks,
>Wang Lei
> >
> >
> >
> >wangl...@geekplus.com.cn
>
 
 
-- 
Best, Jingsong Lee


Question about how FlinkSQL retraction works

2020-04-30 Thread wangl...@geekplus.com.cn

I implemented the Cainiao logistics order-statistics example from https://yq.aliyun.com/articles/457392/ myself, reading from kafka and writing to RDS; the RDS
table has no primary key and no unique key.

INSERT INTO table_out select  tms_company,  count(distinct order_id) as 
order_cnt from 
(select order_id, LAST_VALUE(tms_company) AS tms_company from dwd_table 
group by order_id) 
 group by tms_company;


I sent 4 messages in total, in this order:

1  {"order_id":1,"tms_company":"zhongtong"}   1 record in the database: zhongtong 1

2  {"order_id":1,"tms_company":"yuantong"}    1 record in the database: yuantong 1  (the previous record was deleted)

3  {"order_id":2,"tms_company":"yuantong"}    2 records in the database: yuantong 1, yuantong 2
(a record was added, nothing deleted)

4  {"order_id":2,"tms_company":"zhongtong"}   4 records in the database: yuantong 1, yuantong 2,
yuantong 1, zhongtong 1  (two records were added, nothing deleted)


Question 1:
After the 2nd message was sent, the previous record in the database was deleted. My understanding is that this should be the RetractStream behavior. But when I look at the source code at
https://github.com/apache/flink/tree/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc
there is no RetractStream implementation there either, so which code deletes it?

Question 2:
   After the 3rd record arrived, a yuantong 2 row was simply added to the database; why wasn't yuantong 1 deleted?
   After the 4th record arrived, two more rows were added to the database; why weren't the earlier ones deleted either?

Thanks,
Wang Lei



wangl...@geekplus.com.cn 


Re: Re: 1.11 snapshot: Name or service not knownname localhost and taskMgr not started

2020-04-29 Thread wangl...@geekplus.com.cn

localhost is available on my CentOS machine. I can start the cluster (including the taskMgr) with start-cluster.sh when using the flink-1.10 release tgz package.
Changing localhost to 127.0.0.1 in the slaves file resolves the problem, but then it asks for the host's password.
Without changing the slaves file, I can first run start-cluster.sh (only the job manager starts)
and then run "taskmanager.sh start" to start the taskMgr. No password is needed.

Thanks,
Lei



wangl...@geekplus.com.cn 

 
Sender: Xintong Song
Send Time: 2020-04-30 12:13
Receiver: wangl...@geekplus.com.cn
cc: user
Subject: Re: 1.11 snapshot: Name or service not knownname localhost and taskMgr 
not started
Hi Lei,

Could you check whether the hostname 'localhost' is available on your CentOS 
machine? This is usually defined in "/etc/hosts".
You can also try to modify the slaves file, replacing 'localhost' with 
'127.0.0.1'. The path is: /conf/slaves

Thank you~
Xintong Song


On Thu, Apr 30, 2020 at 11:38 AM wangl...@geekplus.com.cn 
 wrote:

1 Clone the 1.11 snapshot repository
2 Build it on Windows
3 Scp the flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT dir to
a CentOS server
4 ./bin/start-cluster.sh on the CentOS server


There's a message:
Starting cluster.
Starting standalonesession daemon on host test_3.6.
: Name or service not knownname localhost

Only the jobMgr is started. The taskMgr does not start.

Any insight on this?

Thanks
Lei



wangl...@geekplus.com.cn 


1.11 snapshot: Name or service not knownname localhost and taskMgr not started

2020-04-29 Thread wangl...@geekplus.com.cn

1 Clone the 1.11 snapshot repository
2 Build it on Windows
3 Scp the flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT dir to
a CentOS server
4 ./bin/start-cluster.sh on the CentOS server


There's a message:
Starting cluster.
Starting standalonesession daemon on host test_3.6.
: Name or service not knownname localhost

Only the jobMgr is started. The taskMgr does not start.

Any insight on this?

Thanks
Lei



wangl...@geekplus.com.cn 


Re: Re: Question about FlinkSQL Upsert/Retraction when writing to MySQL

2020-04-27 Thread wangl...@geekplus.com.cn

Thanks Leonard,

JDBCUpsertTableSink handles the data in upsert mode; is the actually executed SQL statement INSERT INTO ... ON DUPLICATE KEY
UPDATE?
Where is that in the source code?

Thanks,
Wang Lei



wangl...@geekplus.com.cn 

 
From: Leonard Xu
Sent: 2020-04-27 12:58
To: user-zh
Subject: Re: Question about FlinkSQL Upsert/Retraction when writing to MySQL
Hi,wanglei
 
> INSERT INTO  mysql_sink SELECT  f1, count(*) FROM kafka_src GROUP BY f1
> Every time a new record comes from kafka, two records (a Tuple2) are produced: the old one is deleted and the new one is added.
This query produces a retract stream; you can simply think of it as every kafka record producing two records. But what is finally written to the downstream system
depends on what the downstream system supports and which sink is implemented (there are currently three kinds of sink: AppendStreamSink, UpsertStreamSink,
RetractStreamSink).

> I looked at
> https://github.com/apache/flink/tree/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc
> and there is no Retract mode.
> Is JDBCUpsertTableSink.java actually the code used to write to MySQL?
Among the existing sinks, kafka implements an AppendStreamSink, so it only supports inserted records and does not support retract.
The mysql table you declared with DDL corresponds to the jdbc sink JDBCUpsertTableSink, so it is handled with the upsert logic; it does not support retract either.

> If there is no group by, just:
> INSERT INTO  mysql_sink SELECT  f1,  f2 FROM kafka_src
> writing to mysql fails on primary-key conflicts; how can the upsert mode be used to overwrite directly?

Without a group by, the unique key of the query cannot be derived, so updates by unique key are not possible.
You only need to keep the query's key (here, the group by fields) consistent with the primary key in the db.
 
Best,
 
Leonard Xu
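
To make the upsert path concrete, this is roughly the shape of the statements the MySQL upsert mode issues, based on the DUPLICATE KEY UPDATE mechanism mentioned elsewhere in this list; this is an illustrative sketch, not code copied from the Flink source, and the column names are made up:

public class UpsertStatementSketch {
    public static void main(String[] args) {
        // upsert for accumulate messages: relies on a primary/unique key on f1 in the MySQL table
        String upsert =
            "INSERT INTO mysql_sink (f1, cnt) VALUES (?, ?) " +
            "ON DUPLICATE KEY UPDATE cnt = VALUES(cnt)";

        // delete for retractions whose aggregated result disappears
        String delete = "DELETE FROM mysql_sink WHERE f1 = ?";

        System.out.println(upsert);
        System.out.println(delete);
    }
}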


Question about FlinkSQL Upsert/Retraction when writing to MySQL

2020-04-26 Thread wangl...@geekplus.com.cn


INSERT INTO  mysql_sink SELECT  f1, count(*) FROM kafka_src GROUP BY f1

Every time a new record comes from kafka, two records (a Tuple2) are produced: the old one is deleted and the new one is added.
But does this sink use an UpsertStream or a RetractStream, and how do I tell whether it is an UpsertStream or a
RetractStream?

I looked at
https://github.com/apache/flink/tree/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc
and there is no Retract mode.
Is JDBCUpsertTableSink.java actually the code used to write to MySQL?



If there is no group by, just:
INSERT INTO  mysql_sink SELECT  f1,  f2 FROM kafka_src
writing to mysql fails on primary-key conflicts; how can the upsert mode be used to overwrite directly?




wangl...@geekplus.com.cn 



Question about FlinkSQL Upsert/Retraction when writing to MySQL

2020-04-26 Thread wangl...@geekplus.com.cn


INSERT INTO  mysql_sink SELECT  f1, count(*) FROM kafka_src GROUP BY f1

Every time a new record comes from kafka, two records (a Tuple2) are produced: the old one is deleted and the new one is added.
But does this sink use an UpsertStream or a RetractStream, and how do I tell whether it is an UpsertStream or a
RetractStream?

I looked at
https://github.com/apache/flink/tree/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc
and there is no Retract mode.
Is JDBCUpsertTableSink.java actually the code used to write to MySQL?



If there is no group by, just:
INSERT INTO  mysql_sink SELECT  f1,  f2 FROM kafka_src
writing to mysql fails on primary-key conflicts; how can the upsert mode be used to overwrite directly?





wangl...@geekplus.com.cn



Re: Re: FlinkSQL query error when specify json-schema.

2020-04-16 Thread wangl...@geekplus.com.cn

Thanks, I have tried it.

'format.derive-schema' = 'true' works.

But if I insist on using format.json-schema, the CREATE TABLE must be written
as:

`id` DECIMAL(38,18),
`timestamp` DECIMAL(38,18)



wangl...@geekplus.com.cn 

 
From: Benchao Li
Date: 2020-04-16 16:56
To: wangl...@geekplus.com.cn
CC: user
Subject: Re: FlinkSQL query error when specify json-schema.
Hi wanglei,

You don't need to specify 'format.json-schema', the format can derive schema 
from the DDL.
Your exception above means the schema in 'format.json-schema' and DDL are not 
match.

wangl...@geekplus.com.cn  于2020年4月16日周四 下午4:21写道:

CREATE TABLE user_log(
`id` INT,
`timestamp`  BIGINT
) WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.topic' = 'wanglei_jsontest',
'connector.startup-mode' = 'latest-offset',
'connector.properties.0.key' = 'zookeeper.connect',
'connector.properties.0.value' = '172.19.78.32:2181',
'connector.properties.1.key' = 'bootstrap.servers',
'connector.properties.1.value' = '172.19.78.32:9092',
'update-mode' = 'append',
'format.type' = 'json',
'format.json-schema' = '{
"type": "object",
"properties": {
   "id": {"type": "integer"},
   "timestamp": {"type": "number"}
}
}'
);

Then select * from user_log;

org.apache.flink.table.api.ValidationException: Type INT of table field 'id' 
does not match with the physical type LEGACY('DECIMAL', 'DECIMAL') of the 'id' 
field of the TableSource return type.

It seems the specified types "integer" and "number" cannot be mapped to INT and BIGINT.

How can I solve this problem?

Thanks,
Lei



wangl...@geekplus.com.cn 



-- 
Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


FlinkSQL query error when specify json-schema.

2020-04-16 Thread wangl...@geekplus.com.cn

CREATE TABLE user_log(
`id` INT,
`timestamp`  BIGINT
) WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.topic' = 'wanglei_jsontest',
'connector.startup-mode' = 'latest-offset',
'connector.properties.0.key' = 'zookeeper.connect',
'connector.properties.0.value' = '172.19.78.32:2181',
'connector.properties.1.key' = 'bootstrap.servers',
'connector.properties.1.value' = '172.19.78.32:9092',
'update-mode' = 'append',
'format.type' = 'json',
'format.json-schema' = '{
"type": "object",
"properties": {
   "id": {"type": "integer"},
   "timestamp": {"type": "number"}
}
}'
);

Then select * from user_log;

org.apache.flink.table.api.ValidationException: Type INT of table field 'id' 
does not match with the physical type LEGACY('DECIMAL', 'DECIMAL') of the 'id' 
field of the TableSource return type.

It seems the specified types "integer" and "number" cannot be mapped to INT and BIGINT.

How can I solve this problem?

Thanks,
Lei



wangl...@geekplus.com.cn 



Re: Re: fink sql client not able to read parquet format table

2020-04-10 Thread wangl...@geekplus.com.cn

https://issues.apache.org/jira/browse/FLINK-17086

It is my first time creating a Flink JIRA issue.
Please just point out and correct anything I got wrong.

Thanks,
Lei



wangl...@geekplus.com.cn 

 
From: Jingsong Li
Date: 2020-04-10 11:03
To: wangl...@geekplus.com.cn
CC: Jark Wu; lirui; user
Subject: Re: Re: fink sql client not able to read parquet format table
Hi lei,

I think the reason is that our `HiveMapredSplitReader` does not support name-mapped
reading for the parquet format.
Can you create a JIRA for tracking this?

Best,
Jingsong Lee

On Fri, Apr 10, 2020 at 9:42 AM wangl...@geekplus.com.cn 
 wrote:

I am using Hive 3.1.1.
The table has many fields; each field corresponds to a field in the
RobotUploadData0101 class.

CREATE TABLE `robotparquet`(`robotid` int,   `framecount` int,   `robottime` 
bigint,   `robotpathmode` int,   `movingmode` int,   `submovingmode` int,   
`xlocation` int,   `ylocation` int,   `robotradangle` int,   `velocity` int,   
`acceleration` int,   `angularvelocity` int,   `angularacceleration` int,   
`literangle` int,   `shelfangle` int,   `onloadshelfid` int,   `rcvdinstr` int, 
  `sensordist` int,   `pathstate` int,   `powerpresent` int,   `neednewpath` 
int,   `pathelenum` int,   `taskstate` int,   `receivedtaskid` int,   
`receivedcommcount` int,   `receiveddispatchinstr` int,   
`receiveddispatchcount` int,   `subtaskmode` int,   `versiontype` int,   
`version` int,   `liftheight` int,   `codecheckstatus` int,   `cameraworkmode` 
int,   `backrimstate` int,   `frontrimstate` int,   `pathselectstate` int,   
`codemisscount` int,   `groundcameraresult` int,   `shelfcameraresult` int,   
`softwarerespondframe` int,   `paramstate` int,   `pilotlamp` int,   
`codecount` int,   `dist2waitpoint` int,   `targetdistance` int,   
`obstaclecount` int,   `obstacleframe` int,   `cellcodex` int,   `cellcodey` 
int,   `cellangle` int,   `shelfqrcode` int,   `shelfqrangle` int,   `shelfqrx` 
int,   `shelfqry` int,   `trackthetaerror` int,   `tracksideerror` int,   
`trackfuseerror` int,   `lifterangleerror` int,   `lifterheighterror` int,   
`linearcmdspeed` int,   `angluarcmdspeed` int,   `liftercmdspeed` int,   
`rotatorcmdspeed` int) PARTITIONED BY (`hour` string) STORED AS parquet; 


Thanks,
Lei


wangl...@geekplus.com.cn 

 
From: Jingsong Li
Date: 2020-04-09 21:45
To: wangl...@geekplus.com.cn
CC: Jark Wu; lirui; user
Subject: Re: Re: fink sql client not able to read parquet format table
Hi lei,

Which hive version did you use?
Can you share the complete hive DDL?

Best,
Jingsong Lee

On Thu, Apr 9, 2020 at 7:15 PM wangl...@geekplus.com.cn 
 wrote:

I am using the newest 1.10 blink planner.

Perhaps it is because of the method I used to write the parquet files:
receive a kafka message, transform each message into a Java class object, write the
object to HDFS using StreamingFileSink, and add the HDFS path as a partition of
the hive table.

No matter what the order of the field descriptions in the hive DDL statement is, the
hive client works, as long as the field names are the same as the Java object
field names.
But the flink sql client does not work.

DataStream<RobotUploadData0101> sourceRobot = source.map(x -> transform(x));
final StreamingFileSink<RobotUploadData0101> sink;
sink = StreamingFileSink
    .forBulkFormat(new Path("hdfs://172.19.78.38:8020/user/root/wanglei/robotdata/parquet"),
        ParquetAvroWriters.forReflectRecord(RobotUploadData0101.class))
    .build();
For example 
RobotUploadData0101 has two fields:  robotId int, robotTime long

CREATE TABLE `robotparquet`(  `robotid` int,  `robottime` bigint ) and 
CREATE TABLE `robotparquet`(  `robottime` bigint,   `robotid` int)
is the same for hive client, but is different for flink-sql client

It is an expected behavior?

Thanks,
Lei



wangl...@geekplus.com.cn 

 
From: Jark Wu
Date: 2020-04-09 14:48
To: wangl...@geekplus.com.cn; Jingsong Li; lirui
CC: user
Subject: Re: fink sql client not able to read parquet format table
Hi Lei,

Are you using the newest 1.10 blink planner? 

I'm not familiar with Hive and parquet, but I know @Jingsong Li and 
@li...@apache.org are experts on this. Maybe they can help on this question. 

Best,
Jark

On Tue, 7 Apr 2020 at 16:17, wangl...@geekplus.com.cn 
 wrote:

Hive table stored as parquet.

Under hive client: 
hive> select robotid from robotparquet limit 2;
OK
1291097
1291044


But under flink sql-client the result is 0
Flink SQL> select robotid  from robotparquet limit 2;
  robotid
 0
 0

Any insight on this?

Thanks,
Lei





wangl...@geekplus.com.cn 



-- 
Best, Jingsong Lee


-- 
Best, Jingsong Lee


Re: Re: fink sql client not able to read parquet format table

2020-04-09 Thread wangl...@geekplus.com.cn

I am using Hive 3.1.1.
The table has many fields; each field corresponds to a field in the
RobotUploadData0101 class.

CREATE TABLE `robotparquet`(`robotid` int,   `framecount` int,   `robottime` 
bigint,   `robotpathmode` int,   `movingmode` int,   `submovingmode` int,   
`xlocation` int,   `ylocation` int,   `robotradangle` int,   `velocity` int,   
`acceleration` int,   `angularvelocity` int,   `angularacceleration` int,   
`literangle` int,   `shelfangle` int,   `onloadshelfid` int,   `rcvdinstr` int, 
  `sensordist` int,   `pathstate` int,   `powerpresent` int,   `neednewpath` 
int,   `pathelenum` int,   `taskstate` int,   `receivedtaskid` int,   
`receivedcommcount` int,   `receiveddispatchinstr` int,   
`receiveddispatchcount` int,   `subtaskmode` int,   `versiontype` int,   
`version` int,   `liftheight` int,   `codecheckstatus` int,   `cameraworkmode` 
int,   `backrimstate` int,   `frontrimstate` int,   `pathselectstate` int,   
`codemisscount` int,   `groundcameraresult` int,   `shelfcameraresult` int,   
`softwarerespondframe` int,   `paramstate` int,   `pilotlamp` int,   
`codecount` int,   `dist2waitpoint` int,   `targetdistance` int,   
`obstaclecount` int,   `obstacleframe` int,   `cellcodex` int,   `cellcodey` 
int,   `cellangle` int,   `shelfqrcode` int,   `shelfqrangle` int,   `shelfqrx` 
int,   `shelfqry` int,   `trackthetaerror` int,   `tracksideerror` int,   
`trackfuseerror` int,   `lifterangleerror` int,   `lifterheighterror` int,   
`linearcmdspeed` int,   `angluarcmdspeed` int,   `liftercmdspeed` int,   
`rotatorcmdspeed` int) PARTITIONED BY (`hour` string) STORED AS parquet; 


Thanks,
Lei


wangl...@geekplus.com.cn 

 
From: Jingsong Li
Date: 2020-04-09 21:45
To: wangl...@geekplus.com.cn
CC: Jark Wu; lirui; user
Subject: Re: Re: fink sql client not able to read parquet format table
Hi lei,

Which hive version did you use?
Can you share the complete hive DDL?

Best,
Jingsong Lee

On Thu, Apr 9, 2020 at 7:15 PM wangl...@geekplus.com.cn 
 wrote:

I am using the newest 1.10 blink planner.

Perhaps it is because of the method i used to write the parquet file.

Receive kafka message, transform each message to a Java class Object, write the 
Object to HDFS using StreamingFileSink, add  the HDFS path as a partition of 
the hive table

No matter what the order of the field description in  hive ddl statement, the 
hive client will work, as long as  the field name is the same with Java Object 
field name.
But flink sql client will not work.

DataStream<RobotUploadData0101> sourceRobot = source.map(x -> transform(x));
final StreamingFileSink<RobotUploadData0101> sink = StreamingFileSink
    .forBulkFormat(
        new Path("hdfs://172.19.78.38:8020/user/root/wanglei/robotdata/parquet"),
        ParquetAvroWriters.forReflectRecord(RobotUploadData0101.class))
    .build();
For example 
RobotUploadData0101 has two fields:  robotId int, robotTime long

CREATE TABLE `robotparquet`(  `robotid` int,  `robottime` bigint ) and 
CREATE TABLE `robotparquet`(  `robottime` bigint,   `robotid` int)
is the same for hive client, but is different for flink-sql client

It is an expected behavior?

Thanks,
Lei



wangl...@geekplus.com.cn 

 
From: Jark Wu
Date: 2020-04-09 14:48
To: wangl...@geekplus.com.cn; Jingsong Li; lirui
CC: user
Subject: Re: fink sql client not able to read parquet format table
Hi Lei,

Are you using the newest 1.10 blink planner? 

I'm not familiar with Hive and parquet, but I know @Jingsong Li and 
@li...@apache.org are experts on this. Maybe they can help on this question. 

Best,
Jark

On Tue, 7 Apr 2020 at 16:17, wangl...@geekplus.com.cn 
 wrote:

Hive table stored as parquet.

Under hive client: 
hive> select robotid from robotparquet limit 2;
OK
1291097
1291044


But under flink sql-client the result is 0
Flink SQL> select robotid  from robotparquet limit 2;
  robotid
 0
 0

Any insight on this?

Thanks,
Lei





wangl...@geekplus.com.cn 



-- 
Best, Jingsong Lee


Re: Re: fink sql client not able to read parquet format table

2020-04-09 Thread wangl...@geekplus.com.cn

I am using the newest 1.10 blink planner.

Perhaps it is because of the method i used to write the parquet file.

Receive kafka message, transform each message to a Java class Object, write the 
Object to HDFS using StreamingFileSink, add  the HDFS path as a partition of 
the hive table

No matter what the order of the field description in  hive ddl statement, the 
hive client will work, as long as  the field name is the same with Java Object 
field name.
But flink sql client will not work.

DataStream<RobotUploadData0101> sourceRobot = source.map(x -> transform(x));
final StreamingFileSink<RobotUploadData0101> sink = StreamingFileSink
    .forBulkFormat(
        new Path("hdfs://172.19.78.38:8020/user/root/wanglei/robotdata/parquet"),
        ParquetAvroWriters.forReflectRecord(RobotUploadData0101.class))
    .build();
For example 
RobotUploadData0101 has two fields:  robotId int, robotTime long

CREATE TABLE `robotparquet`(  `robotid` int,  `robottime` bigint ) and 
CREATE TABLE `robotparquet`(  `robottime` bigint,   `robotid` int)
is the same for hive client, but is different for flink-sql client

It is an expected behavior?

Thanks,
Lei



wangl...@geekplus.com.cn 

 
From: Jark Wu
Date: 2020-04-09 14:48
To: wangl...@geekplus.com.cn; Jingsong Li; lirui
CC: user
Subject: Re: fink sql client not able to read parquet format table
Hi Lei,

Are you using the newest 1.10 blink planner? 

I'm not familiar with Hive and parquet, but I know @Jingsong Li and 
@li...@apache.org are experts on this. Maybe they can help on this question. 

Best,
Jark

On Tue, 7 Apr 2020 at 16:17, wangl...@geekplus.com.cn 
 wrote:

Hive table stored as parquet.

Under hive client: 
hive> select robotid from robotparquet limit 2;
OK
1291097
1291044


But under flink sql-client the result is 0
Flink SQL> select robotid  from robotparquet limit 2;
  robotid
 0
 0

Any insight on this?

Thanks,
Lei





wangl...@geekplus.com.cn 



fink sql client not able to read parquet format table

2020-04-07 Thread wangl...@geekplus.com.cn

Hive table stored as parquet.

Under hive client: 
hive> select robotid from robotparquet limit 2;
OK
1291097
1291044


But under flink sql-client the result is 0
Flink SQL> select robotid  from robotparquet limit 2;
  robotid
 0
 0

Any insight on this?

Thanks,
Lei





wangl...@geekplus.com.cn 



Re: Re: Implementing KafkaUpsertTableSink

2020-03-30 Thread wangl...@geekplus.com.cn

I kept only the single KafkaRetractTableSourceSinkFactory. KafkaRetractTableSinkBase implements the RetractStreamTableSink interface, and in consumeDataStream only records flagged True are emitted. It finally works.

@Override
public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
    DataStream<Row> dtNeed = dataStream.filter(x -> x.f0 == Boolean.TRUE).map(x -> x.f1);
    ...
}

INSERT INTO table1 SELECT field, count(*) FROM table2 GROUP BY field
This is a retract stream; the result records carry True/False flags, so filtering on the flag works.

INSERT INTO table1 SELECT field, 1 FROM table2
As I understand it this is not a retract stream, so the dataStream.filter(x -> x.f0 == Boolean.TRUE) code above should fail, but in practice it does not.

I don't fully understand this yet; I will look into it further.

Thanks,
王磊


wangl...@geekplus.com.cn 
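As a self-contained sketch of the filtering approach described above (the helper name and the explicit TypeInformation parameter are illustrative, not the author's actual code):

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;

public class RetractFilterSketch {

    // Keep only accumulate (f0 == true) records of a retract stream before handing
    // them to an append-only sink; retraction messages are simply dropped.
    public static DataStream<Row> keepAccumulateOnly(
            DataStream<Tuple2<Boolean, Row>> retractStream, TypeInformation<Row> rowType) {
        return retractStream
                .filter(t -> t.f0 == Boolean.TRUE)
                .map(t -> t.f1)
                .returns(rowType);
    }
}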

Sender: Benchao Li
Send Time: 2020-03-31 12:02
Receiver: user-zh
Subject: Re: Re: Implementing KafkaUpsertTableSink
As I understand it, you can make the properties of KafkaRetractTableSourceSinkFactory differ from those of KafkaTableSourceSinkFactory and use that difference to tell the two factories apart, for example by adding a property that marks the sink as retract or append.
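A minimal sketch of that suggestion, assuming the new factory advertises an extra distinguishing property in its required context (the 'update-mode' = 'retract' property name is an illustrative assumption, not an existing convention):

// Inside the hypothetical KafkaRetractTableSourceSinkFactory: differ from the append Kafka
// factory by the update-mode value, so TableFactoryService matches exactly one factory.
@Override
public Map<String, String> requiredContext() {
    Map<String, String> context = new HashMap<>();
    context.put("connector.type", "kafka");
    context.put("connector.property-version", "1");
    context.put("update-mode", "retract");   // assumed distinguishing property
    return context;
}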
 
wangl...@geekplus.com.cn  于2020年3月31日周二 上午11:17写道:
 
> 这个应该是得到两个 tableFactories 了。 我直接平行的把 KafkaTableSourceSinkFactory 那一套变成
> KafkaRetractTableSourceSinkFactory 写了一遍
> 但这个应该怎样改才合适呢?
>
> 137 private static  T
> findSingleInternal(
> 138 Class factoryClass,
> 139 Map properties,
> 140 Optional classLoader) {
> 141
> 142 List tableFactories =
> discoverFactories(classLoader);
> 143 List filtered = filter(tableFactories,
> factoryClass, properties);
> 144
> 145 if (filtered.size() > 1) {
> 146 throw new AmbiguousTableFactoryException(
> 147 filtered,
> 148 factoryClass,
> 149 tableFactories,
> 150 properties);
> 151 } else {
> 152 return filtered.get(0);
> 153 }
> 154 }
>
>
> 谢谢,
> 王磊
>
>
> wangl...@geekplus.com.cn
>
>
> Sender: wangl...@geekplus.com.cn
> Send Time: 2020-03-31 10:50
> Receiver: user-zh
> Subject: Re: RE: 实现 KafkaUpsertTableSink
>
> 我以相同的方式 实现了一个  KafkaRetractTableSink, 然后打成 jar 包放在 lib 目录下启动 sql-client:
>
> org.apache.flink.table.planner.delegation.BlinkExecutorFactory
> at
> org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:146)
> at
> org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:113)
> at
> org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:377)
> at
> org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$initializeCatalogs$6(ExecutionContext.java:559)
> at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684)
> at
> org.apache.flink.table.client.gateway.local.ExecutionContext.initializeCatalogs(ExecutionContext.java:557)
> at
> org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:494)
> at
> org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:159)
> at
> org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:118)
> at
> org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:742)
> ... 3 more
>
> 这个改怎样解决呢?
>
> 谢谢,
> 王磊
>
>
>
> wangl...@geekplus.com.cn
>
> Sender: wxchunj...@163.com
> Send Time: 2020-03-29 10:32
> Receiver: user-zh@flink.apache.org
> Subject: RE: 实现 KafkaUpsertTableSink
> Benchao,非常感谢大佬,之前不知道需要添加 Factory 到文件中,添加之后,可以正常运行了。
> -Original Message-
> From: user-zh-return-2640-wxchunjhyy=163@flink.apache.org
>  On Behalf Of
> Benchao Li
> Sent: Saturday, March 28, 2020 6:28 PM
> To: user-zh 
> Subject: Re: 实现 KafkaUpsertTableSink
> Hi,
> 你需要把你新增的Factory添加到 resources下的
>
> META-INF/services/org.apache.flink.table.factories.TableFactory文件中,不知道这一步你是否已经做过了呢?
>  于2020年3月28日周六 下午5:38写道:
> > 各位大佬:
> >
> > 由于现在的 KafkaTableSink 不支持 sql 中有group ,参照
> > KafkaTableSink 和 HbaseUpsertTableSink  的一套逻辑实现了一套
> > KafkaUpsertTableSink:
> >
> > KafkaUpsertTableSink
> >
> > KafkaUpsertTableSinkBase
> >
> > KafkaUpsertTableSourceSinkFactory
> >
> > KafkaUpsertTableSourceSinkFactoryBase
> >
> > MyKafkaValidator
> >
> > 但是在 TableFactoryService. discoverFactories 的方法中不能加载我定义的
> > KafkaUpsertTableSourceSinkFactory ,请问各位大佬,是否是需要在什么地方注册
> > 呢?
> >

Re: Re: Implementing KafkaUpsertTableSink

2020-03-30 Thread wangl...@geekplus.com.cn
This probably means that two matching tableFactories are found. I copied the whole KafkaTableSourceSinkFactory set in parallel and rewrote it as KafkaRetractTableSourceSinkFactory.
How should this be changed properly?

137 private static <T extends TableFactory> T findSingleInternal(
138         Class<T> factoryClass,
139         Map<String, String> properties,
140         Optional<ClassLoader> classLoader) {
141
142     List<TableFactory> tableFactories = discoverFactories(classLoader);
143     List<T> filtered = filter(tableFactories, factoryClass, properties);
144
145     if (filtered.size() > 1) {
146         throw new AmbiguousTableFactoryException(
147                 filtered,
148                 factoryClass,
149                 tableFactories,
150                 properties);
151     } else {
152         return filtered.get(0);
153     }
154 }


Thanks,
王磊


wangl...@geekplus.com.cn 

 
Sender: wangl...@geekplus.com.cn
Send Time: 2020-03-31 10:50
Receiver: user-zh
Subject: Re: RE: 实现 KafkaUpsertTableSink
 
我以相同的方式 实现了一个  KafkaRetractTableSink, 然后打成 jar 包放在 lib 目录下启动 sql-client:
 
org.apache.flink.table.planner.delegation.BlinkExecutorFactory
at 
org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:146)
at 
org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:113)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:377)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$initializeCatalogs$6(ExecutionContext.java:559)
at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.initializeCatalogs(ExecutionContext.java:557)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:494)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:159)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:118)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:742)
... 3 more
 
这个改怎样解决呢?
 
谢谢,
王磊
 
 
 
wangl...@geekplus.com.cn 
 
Sender: wxchunj...@163.com
Send Time: 2020-03-29 10:32
Receiver: user-zh@flink.apache.org
Subject: RE: 实现 KafkaUpsertTableSink
Benchao,非常感谢大佬,之前不知道需要添加 Factory 到文件中,添加之后,可以正常运行了。
-Original Message-
From: user-zh-return-2640-wxchunjhyy=163@flink.apache.org 
 On Behalf Of Benchao 
Li
Sent: Saturday, March 28, 2020 6:28 PM
To: user-zh 
Subject: Re: 实现 KafkaUpsertTableSink
Hi,
你需要把你新增的Factory添加到 resources下的
META-INF/services/org.apache.flink.table.factories.TableFactory文件中,不知道这一步你是否已经做过了呢?
 于2020年3月28日周六 下午5:38写道:
> 各位大佬:
>
> 由于现在的 KafkaTableSink 不支持 sql 中有group ,参照
> KafkaTableSink 和 HbaseUpsertTableSink  的一套逻辑实现了一套
> KafkaUpsertTableSink:
>
> KafkaUpsertTableSink
>
> KafkaUpsertTableSinkBase
>
> KafkaUpsertTableSourceSinkFactory
>
> KafkaUpsertTableSourceSinkFactoryBase
>
> MyKafkaValidator
>
> 但是在 TableFactoryService. discoverFactories 的方法中不能加载我定义的
> KafkaUpsertTableSourceSinkFactory ,请问各位大佬,是否是需要在什么地方注册
> 呢?
>
>
>
>
> /**
> * Searches for factories using Java service providers.
> *
> * @return all factories in the classpath */ private static 
> List discoverFactories(Optional
> classLoader) {
>try {
>   List result = new LinkedList<>();
>   ClassLoader cl =
> classLoader.orElse(Thread.currentThread().getContextClassLoader());
>   ServiceLoader
>  .load(TableFactory.class, cl)
>  .iterator()
>  .forEachRemaining(result::add);
>   //todo add
>   result.add(new KafkaUpsertTableSourceSinkFactory());
>   return result;
>} catch (ServiceConfigurationError e) {
>   LOG.error("Could not load service provider for table factories.", e);
>   throw new TableException("Could not load service provider for 
> table factories.", e);
>}
>
> }
>
>
>
>
>
> 直接在返回的 result 里面添加对应的  KafkaUpsertTableSourceSinkFactory  是可
> 以成功运行的。
>
> 非常感谢
>
>
>
>
>
> --
>
> Thanks
>
> venn
>
>
>
>
-- 
Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


Re: RE: Implementing KafkaUpsertTableSink

2020-03-30 Thread wangl...@geekplus.com.cn

I implemented a KafkaRetractTableSink in the same way, packaged it into a jar, put it into the lib directory and started the sql-client:

org.apache.flink.table.planner.delegation.BlinkExecutorFactory
at 
org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:146)
at 
org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:113)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:377)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$initializeCatalogs$6(ExecutionContext.java:559)
at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.initializeCatalogs(ExecutionContext.java:557)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:494)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:159)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:118)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:742)
... 3 more

How can this be resolved?

Thanks,
王磊



wangl...@geekplus.com.cn 

 
Sender: wxchunj...@163.com
Send Time: 2020-03-29 10:32
Receiver: user-zh@flink.apache.org
Subject: RE: Implementing KafkaUpsertTableSink
Benchao, many thanks. I did not know the Factory had to be added to that file; after adding it, everything runs fine.
 
 
-Original Message-
From: user-zh-return-2640-wxchunjhyy=163@flink.apache.org 
 On Behalf Of Benchao 
Li
Sent: Saturday, March 28, 2020 6:28 PM
To: user-zh 
Subject: Re: Implementing KafkaUpsertTableSink

Hi,

You need to add your new Factory to the META-INF/services/org.apache.flink.table.factories.TableFactory file under resources. Have you already done this step?
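For illustration, that SPI registration file contains one fully qualified factory class per line; the package name below is a hypothetical placeholder:

# src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
com.example.connectors.kafka.KafkaUpsertTableSourceSinkFactory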
 
 wrote on Sat, Mar 28, 2020 at 17:38:

> Hi all:
>
> Since the current KafkaTableSink does not support GROUP BY in SQL, I implemented a
> KafkaUpsertTableSink following the same logic as KafkaTableSink and HbaseUpsertTableSink:
>
> KafkaUpsertTableSink
>
> KafkaUpsertTableSinkBase
>
> KafkaUpsertTableSourceSinkFactory
>
> KafkaUpsertTableSourceSinkFactoryBase
>
> MyKafkaValidator
>
> But the TableFactoryService.discoverFactories method cannot load the
> KafkaUpsertTableSourceSinkFactory I defined. Do I need to register it somewhere?
>
>
>
>
> /**
> * Searches for factories using Java service providers.
> *
> * @return all factories in the classpath */ private static 
> List discoverFactories(Optional
> classLoader) {
>try {
>   List result = new LinkedList<>();
>   ClassLoader cl =
> classLoader.orElse(Thread.currentThread().getContextClassLoader());
>   ServiceLoader
>  .load(TableFactory.class, cl)
>  .iterator()
>  .forEachRemaining(result::add);
>   //todo add
>   result.add(new KafkaUpsertTableSourceSinkFactory());
>   return result;
>} catch (ServiceConfigurationError e) {
>   LOG.error("Could not load service provider for table factories.", e);
>   throw new TableException("Could not load service provider for 
> table factories.", e);
>}
>
> }
>
>
>
>
>
> Adding the corresponding KafkaUpsertTableSourceSinkFactory directly to the returned result
> list does run successfully.
>
> Many thanks
>
>
>
>
>
> --
>
> Thanks
>
> venn
>
>
>
>
 
-- 
 
Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


Re: Re: How are the jars in the Flink distribution built

2020-03-26 Thread wangl...@geekplus.com.cn

Under flink-table-uber-blink:
 mvn clean install -DskipTests -Dscala-2.12

I am not sure how -Dscala-2.12 takes effect, but the built jar can directly replace the one on the server and works normally.

Thanks,
王磊


wangl...@geekplus.com.cn 
 
Sender: Kurt Young
Send Time: 2020-03-26 18:15
Receiver: user-zh
cc: jihongchao
Subject: Re: flink 安装包的几个 jar 是怎么 build 出来的
flink-table-uber-blink should be that module; it is responsible for producing the fat jar (uber jar) of the blink planner.
 
Best,
Kurt
 
 
On Thu, Mar 26, 2020 at 5:54 PM wangl...@geekplus.com.cn <
wangl...@geekplus.com.cn> wrote:
 
>
> 单机版 下载 tgz 解压,lib 目录下有好几个 jar, 比如 flink-table-blink_2.12-1.10.0.jar
> 这个 jar 是从哪里 build 出来的呢?
>
> 我 clone github 上的源代码,mvn clean package
> 我以为 flink-table/flink-table-planner-blink 目录下build 出的
> flink-table-planner-blink_2.11-1.10.0.jar 跟发布 tgz 里的
>  flink-table-blink_2.12-1.10.0.jar  是对应的
> 我直接替换到安装目录下,start-cluster.sh 还可以,但 ./bin/sql-client.sh embedded 就出错了。
>
> 谢谢,
> 王磊
>
>
>
> wangl...@geekplus.com.cn
>
>


How are the jars in the Flink distribution built

2020-03-26 Thread wangl...@geekplus.com.cn

For the standalone version I downloaded and extracted the tgz. There are several jars under the lib directory, for example flink-table-blink_2.12-1.10.0.jar.
Where is this jar built from?

I cloned the source code from github and ran mvn clean package.
I assumed that the flink-table-planner-blink_2.11-1.10.0.jar built under flink-table/flink-table-planner-blink corresponds to the flink-table-blink_2.12-1.10.0.jar in the released tgz.
I replaced it directly in the installation directory; start-cluster.sh still works, but ./bin/sql-client.sh embedded fails.

Thanks,
王磊



wangl...@geekplus.com.cn 



Re: Re: Where can i find MySQL retract stream table sink java soure code?

2020-03-24 Thread wangl...@geekplus.com.cn
Thanks Jingsong.

So JDBCTableSink now suport append and upsert mode.  Retract mode not  
available yet. It is right?

Thanks,
Lei 




wangl...@geekplus.com.cn 

 
Sender: Jingsong Li
Send Time: 2020-03-25 11:39
Receiver: wangl...@geekplus.com.cn
cc: user
Subject: Re: Where can i find MySQL retract stream table sink java soure code?
Hi,

Maybe you have some misunderstanding to upsert sink. You can take a look to 
[1], it can deal with "delete" records.

[1]https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/dynamic_tables.html#table-to-stream-conversion

Best,
Jingsong Lee

On Wed, Mar 25, 2020 at 11:37 AM Jingsong Li  wrote:
Hi,

This can be a upsert stream [1], and JDBC has upsert sink now [2].

[1]https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/dynamic_tables.html#table-to-stream-conversion
[2]https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#jdbc-connector

Best,
Jingsong Lee

On Wed, Mar 25, 2020 at 11:14 AM Jingsong Li  wrote:
Hi,

This can be a upsert stream [1]

Best,
Jingsong Lee

On Wed, Mar 25, 2020 at 11:12 AM wangl...@geekplus.com.cn 
 wrote:

Create one table with kafka,  another table with MySQL  using flinksql.
Write a sql to read from kafka and write to MySQL.

INSERT INTO mysqlTable
SELECT status, COUNT(order_no) AS num
FROM (SELECT order_no, LAST_VALUE(status) AS status FROM kafkaTable GROUP BY order_no)
GROUP BY status;

I think this is a retract stream. But where can i find the java source code about the MySQL retract table sink?

Thanks,
Lei



wangl...@geekplus.com.cn 



-- 
Best, Jingsong Lee


-- 
Best, Jingsong Lee


-- 
Best, Jingsong Lee


Re: Re: Where can i find MySQL retract stream table sink java soure code?

2020-03-24 Thread wangl...@geekplus.com.cn

Seems it is here:  
https://github.com/apache/flink/tree/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc
There's no JDBCRetractTableSink, only  append and upsert.
I am confused why the MySQL record can be deleted. 

Thanks,
Lei 


wangl...@geekplus.com.cn 

 
Sender: wangl...@geekplus.com.cn
Send Time: 2020-03-25 11:25
Receiver: Jingsong Li
cc: user
Subject: Re: Re: Where can i find MySQL retract stream table sink java soure 
code?

Thanks Jingsong.

When executing this sql, the mysql table record can be deleted. So i guess it 
is a retract stream.
I want to know the exactly java code it is generated and have a look at it.  

Thanks,
Lei




wangl...@geekplus.com.cn
 
Sender: Jingsong Li
Send Time: 2020-03-25 11:14
Receiver: wangl...@geekplus.com.cn
cc: user
Subject: Re: Where can i find MySQL retract stream table sink java soure code?
Hi,

This can be a upsert stream [1]

Best,
Jingsong Lee

On Wed, Mar 25, 2020 at 11:12 AM wangl...@geekplus.com.cn 
 wrote:

Create one table with kafka,  another table with MySQL  using flinksql.
Write a sql to read from kafka and write to MySQL.

INSERT INTO mysqlTable
SELECT status, COUNT(order_no) AS num
FROM (SELECT order_no, LAST_VALUE(status) AS status FROM kafkaTable GROUP BY order_no)
GROUP BY status;

I think this is a retract stream. But where can i find the java source code about the MySQL retract table sink?

Thanks,
Lei



wangl...@geekplus.com.cn 



-- 
Best, Jingsong Lee


Re: Re: Where can i find MySQL retract stream table sink java soure code?

2020-03-24 Thread wangl...@geekplus.com.cn

Thanks Jingsong.

When executing this sql, the mysql table record can be deleted. So i guess it 
is a retract stream.
I want to know the exactly java code it is generated and have a look at it.  

Thanks,
Lei




wangl...@geekplus.com.cn
 
Sender: Jingsong Li
Send Time: 2020-03-25 11:14
Receiver: wangl...@geekplus.com.cn
cc: user
Subject: Re: Where can i find MySQL retract stream table sink java soure code?
Hi,

This can be a upsert stream [1]

Best,
Jingsong Lee

On Wed, Mar 25, 2020 at 11:12 AM wangl...@geekplus.com.cn 
 wrote:

Create one table with kafka,  another table with MySQL  using flinksql.
Write a sql to read from kafka and write to MySQL.

INSERT INTO mysqlTable
SELECT status, COUNT(order_no) AS num
FROM (SELECT order_no, LAST_VALUE(status) AS status FROM kafkaTable GROUP BY order_no)
GROUP BY status;

I think this is a retract stream. But where can i find the java source code about the MySQL retract table sink?

Thanks,
Lei



wangl...@geekplus.com.cn 



-- 
Best, Jingsong Lee


Where can i find MySQL retract stream table sink java soure code?

2020-03-24 Thread wangl...@geekplus.com.cn

Create one table with kafka,  another table with MySQL  using flinksql.
Write a sql to read from kafka and write to MySQL.

INSERT INTO mysqlTable
SELECT status, COUNT(order_no) AS num
FROM (SELECT order_no, LAST_VALUE(status) AS status FROM kafkaTable GROUP BY order_no)
GROUP BY status;

I think this is a retract stream. But where can i find the java source code about the MySQL retract table sink?

Thanks,
Lei



wangl...@geekplus.com.cn 



Reply: Does the Flink JDBC Driver support creating streaming tables

2020-03-24 Thread wangl...@geekplus.com.cn

Please refer to this document:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector

The following syntax should not be supported:
  'format.type' = 'csv',\n" +
  "'format.field-delimiter' = '|'\n"

Below is code that works for me; the data in kafka needs to be in this format: {"order_no":"abcdefg","status":90}
tEnv.sqlUpdate("CREATE TABLE pick_order (\n"
+ "order_no VARCHAR,\n"
+ "status INT\n"
+ ") WITH (\n"
+ "'connector.type' = 'kafka',\n"
+ "'connector.version' = 'universal',\n"
+ "'connector.topic' = 'wanglei_test',\n"
+ "'connector.startup-mode' = 'latest-offset',\n"
+ "'connector.properties.0.key' = 'zookeeper.connect',\n"
+ "'connector.properties.0.value' = 'xxx:2181',\n"
+ "'connector.properties.1.key' = 'bootstrap.servers',\n"
+ "'connector.properties.1.value' = 'xxx:9092',\n"
    + "'update-mode' = 'append',\n"
+ "'format.type' = 'json',\n"
+ "'format.derive-schema' = 'true'\n"
+ ")");

王磊


wangl...@geekplus.com.cn
 
From: 赵峰
Sent: 2020-03-24 21:28
To: user-zh
Subject: Does the Flink JDBC Driver support creating streaming tables
hi

Creating a kafka table through the Flink JDBC Driver reports an error. Is my table-creation code incorrect? The code is as follows:
Connection connection = 
DriverManager.getConnection("jdbc:flink://localhost:8083?planner=blink");
Statement statement = connection.createStatement();
 
statement.executeUpdate(
"CREATE TABLE table_kafka (\n" +
"user_id BIGINT,\n" +
"item_id BIGINT,\n" +
"category_id BIGINT,\n" +
"behavior STRING,\n" +
"ts TIMESTAMP(3),\n" +
"proctime as PROCTIME(),\n" +
"WATERMARK FOR ts as ts - INTERVAL '5' SECOND\n" +
") WITH (\n" +
"'connector.type' = 'kafka', \n" +
"'connector.version' = 'universal', \n" +
"'connector.topic' = 'flink_im02', \n" +
"'connector.properties.group.id' = 'flink_im02_new',\n" +
"'connector.startup-mode' = 'earliest-offset', \n" +
"'connector.properties.zookeeper.connect' = '*.*.*.*:2181', \n" +
"'connector.properties.bootstrap.servers' = '*.*.*.*:9092', \n" +
"'format.type' = 'csv',\n" +
"'format.field-delimiter' = '|'\n" +
")");
ResultSet rs1 = statement.executeQuery("SELECT * FROM table_kafka");
while (rs1.next()) {
System.out.println(rs1.getInt(1) + ", " + rs1.getInt(2));
} 
 
statement.close();
connection.close();
 
报错:
Reason: Required context properties mismatch.
 
The matching candidates:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
Mismatched properties:
'connector.type' expects 'filesystem', but is 'kafka'
 
 
 
 
赵峰


Streaming kafka data sink to hive

2020-03-19 Thread wangl...@geekplus.com.cn

We have many app logs on our app server  and want to parse the logs to structed 
table format and then sink to hive.
Seems it is good to use batch mode. The app log is hourly compressed and it is 
convenience to do partitioning.

We want to use streaming mode. Tail the app logs to Kafka,  then use flink to 
read kafka topic  and then sink to Hive.
I have several questions.

1  Is there any flink-hive-connector that i can use to write to hive 
streamingly?
2  Since HDFS is not friendly to frequently append and hive's data is stored to 
hdfs,  is it  OK if the throughput is high? 

Thanks,
Lei



wangl...@geekplus.com.cn 


Re: Re: flink sql-client read hive orc table no result

2020-03-18 Thread wangl...@geekplus.com.cn

Seems the root cause is "transactional"='true'.
After removing this, the table can be queried from the flink sql-client, even if i add "clustered by (robot_id) into 3 buckets" again.

Thanks,
Lei


wangl...@geekplus.com.cn
 
From: Kurt Young
Date: 2020-03-18 18:04
To: wangl...@geekplus.com.cn
CC: lirui; user
Subject: Re: Re: flink sql-client read hive orc table no result
also try to remove "transactional"='true'?

Best,
Kurt


On Wed, Mar 18, 2020 at 5:54 PM wangl...@geekplus.com.cn 
 wrote:

Tried again. Even i  remove the "clustered by (robot_id) into 3 buckets"  
statement, no result from flink sql-client

Thanks,
Lei 



wangl...@geekplus.com.cn
From: Kurt Young
Date: 2020-03-18 17:41
To: wangl...@geekplus.com.cn; lirui
CC: user
Subject: Re: flink sql-client read hive orc table no result
My guess is we haven't support hive bucket table yet.
cc Rui Li for confirmation.

Best,
Kurt


On Wed, Mar 18, 2020 at 5:19 PM wangl...@geekplus.com.cn 
 wrote:
Hive table store as orc format: 
 CREATE  TABLE `robot_tr`(`robot_id` int,  `robot_time` bigint, 
`linear_velocity` double, `track_side_error` int) 
 partitioned by (warehouseid STRING) clustered by (robot_id) into 3 buckets 
 stored as orc tblproperties("transactional"='true');

Under hive client,  insert into one record and then select there will be the 
result to the console.

But under flink sql-client, when select * from  robot_tr, there's no result? 

Any insight on this?

Thanks,
Lei 



wangl...@geekplus.com.cn 



Re: Re: flink sql-client read hive orc table no result

2020-03-18 Thread wangl...@geekplus.com.cn

Tried again. Even i  remove the "clustered by (robot_id) into 3 buckets"  
statement, no result from flink sql-client

Thanks,
Lei 



wangl...@geekplus.com.cn
From: Kurt Young
Date: 2020-03-18 17:41
To: wangl...@geekplus.com.cn; lirui
CC: user
Subject: Re: flink sql-client read hive orc table no result
My guess is we haven't support hive bucket table yet.
cc Rui Li for confirmation.

Best,
Kurt


On Wed, Mar 18, 2020 at 5:19 PM wangl...@geekplus.com.cn 
 wrote:
Hive table store as orc format: 
 CREATE  TABLE `robot_tr`(`robot_id` int,  `robot_time` bigint, 
`linear_velocity` double, `track_side_error` int) 
 partitioned by (warehouseid STRING) clustered by (robot_id) into 3 buckets 
 stored as orc tblproperties("transactional"='true');

Under hive client,  insert into one record and then select there will be the 
result to the console.

But under flink sql-client, when select * from  robot_tr, there's no result? 

Any insight on this?

Thanks,
Lei 



wangl...@geekplus.com.cn 



flink sql-client read hive orc table no result

2020-03-18 Thread wangl...@geekplus.com.cn
Hive table store as orc format: 
 CREATE  TABLE `robot_tr`(`robot_id` int,  `robot_time` bigint, 
`linear_velocity` double, `track_side_error` int) 
 partitioned by (warehouseid STRING) clustered by (robot_id) into 3 buckets 
 stored as orc tblproperties("transactional"='true');

Under hive client,  insert into one record and then select there will be the 
result to the console.

But under flink sql-client, when select * from  robot_tr, there's no result? 

Any insight on this?

Thanks,
Lei 



wangl...@geekplus.com.cn 



Re: Re: dimention table join not work under sql-client fink-1.10.0

2020-03-12 Thread wangl...@geekplus.com.cn
Thanks, works now. 

Seems the open source version is  different from alibaba cloud: 
https://www.alibabacloud.com/help/doc-detail/62506.htm




wangl...@geekplus.com.cn 

From: Zhenghua Gao
Date: 2020-03-13 12:12
To: wangl...@geekplus.com.cn
CC: user
Subject: Re: dimention table join not work under sql-client fink-1.10.0
We don't support 'PROCTIME()' in a temporal table join. Please use a left 
table's proctime field. [1]

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/joins.html#usage-1

Best Regards,
Zhenghua Gao
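A minimal sketch of the suggested usage: declare a processing-time attribute on the left (Kafka) table and reference it in the join. Connector options are abbreviated and the columns are taken from the tables quoted below:

CREATE TABLE out_order (
  out_order_code VARCHAR,
  intput_date BIGINT,
  owner_code VARCHAR,
  proctime AS PROCTIME()          -- processing-time attribute on the left table
) WITH (
  'connector.type' = 'kafka', ...
);

SELECT o.out_order_code, o.owner_code, d.owner_name
FROM out_order AS o
JOIN dim_owner FOR SYSTEM_TIME AS OF o.proctime AS d
ON o.owner_code = d.owner_code;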


On Fri, Mar 13, 2020 at 11:57 AM wangl...@geekplus.com.cn 
 wrote:

Kafka source table:

CREATE TABLE out_order (
  out_order_code VARCHAR,
  intput_date BIGINT,
  owner_code VARCHAR
) WITH (
  'connector.type' = 'kafka', ...

MySQL dimension table:

CREATE TABLE dim_owner (
  owner_code VARCHAR,
  owner_name VARCHAR
) WITH (
  'connector.type' = 'jdbc', ...

When i submit the sql:

SELECT o.out_order_code, o.input_date, o.owner_code, d.owner_name
FROM out_order AS o
JOIN dim_owner FOR SYSTEM_TIME AS OF PROCTIME() AS d
ON o.owner_code = d.owner_code;

There's an error:
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: Temporal table join currently only 
supports 'FOR SYSTEM_TIME AS OF' left table's proctime field, doesn't support 
'PROCTIME()'

Thanks,
Lei



wangl...@geekplus.com.cn 



dimention table join not work under sql-client fink-1.10.0

2020-03-12 Thread wangl...@geekplus.com.cn

Kafka source table:

CREATE TABLE out_order (
  out_order_code VARCHAR,
  intput_date BIGINT,
  owner_code VARCHAR
) WITH (
  'connector.type' = 'kafka', ...

MySQL dimension table:

CREATE TABLE dim_owner (
  owner_code VARCHAR,
  owner_name VARCHAR
) WITH (
  'connector.type' = 'jdbc', ...

When i submit the sql:

SELECT o.out_order_code, o.input_date, o.owner_code, d.owner_name
FROM out_order AS o
JOIN dim_owner FOR SYSTEM_TIME AS OF PROCTIME() AS d
ON o.owner_code = d.owner_code;

There's an error:
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: Temporal table join currently only 
supports 'FOR SYSTEM_TIME AS OF' left table's proctime field, doesn't support 
'PROCTIME()'

Thanks,
Lei



wangl...@geekplus.com.cn 



Re: Re: Where and for how long is the history of flinkSQL joined tables stored

2020-03-12 Thread wangl...@geekplus.com.cn

Thanks, it works.


wangl...@geekplus.com.cn
 
Sender: sunfulin
Send Time: 2020-03-12 14:19
Receiver: user-zh; wanglei2
cc: jinhai.me
Subject: Re: Re: Re: Where and for how long is the history of flinkSQL joined tables stored

Use it like this:
StreamTableEnvironment.getConfig().setIdleStateRetentionTime(min, max);
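A small self-contained sketch of that call; the retention values are arbitrary examples:

import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class IdleStateRetentionExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Keep join state for at least 1 day and clean it up after at most 2 days of inactivity.
        tableEnv.getConfig().setIdleStateRetentionTime(Time.days(1), Time.days(2));

        // register tables and submit the join query afterwards ...
    }
}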




在 2020-03-12 14:11:31,"wangl...@geekplus.com.cn"  写道:
>
>这个文档是最新的吗,我直接在 IDEA 里面写这三行代码。
>StreamQueryConfig Deprecated, tableEnv 没有 queryConfig() 方法
>StreamExecutionEnvironment env = 
>StreamExecutionEnvironment.getExecutionEnvironment();
>StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>StreamQueryConfig qConfig = tableEnv.queryConfig();
>
>
>
>wangl...@geekplus.com.cn 
>
> 
>Sender: jinhai wang
>Send Time: 2020-03-12 13:44
>Receiver: user-zh@flink.apache.org
>Subject: Re: flinkSQL join表的历史信息保存在哪里保存多久
>应该是withIdleStateRetentionTime参数的配置时间。具体文档:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html#idle-state-retention-time
> 
> 
>在 2020/3/12 下午12:37,“wangl...@geekplus.com.cn” 写入:
> 
>
>两个从 kafka 创建的表:
>
>tableA: key  valueA
>tableB: key  valueB 
>
>用 flink sql 提交job 运行: select  tableA.key, tableA.valueA,tableB.valueB from 
> tableA join tableB on tableA.key = tableB.key;
>这两个表的历史数据在 flink 中存在哪里?存多久呢?
>
>比如我 tableA key1 先出现,很长时间以后(一个月)tableB key1 才出现,这时候还能 join 上吗?
> 
>谢谢,
>王磊
>
>
>
>wangl...@geekplus.com.cn 
>
> 



 


Re: Re: Where and for how long is the history of flinkSQL joined tables stored

2020-03-12 Thread wangl...@geekplus.com.cn

Is this document up to date? I wrote these three lines of code directly in IDEA.
StreamQueryConfig is deprecated, and tableEnv has no queryConfig() method.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
StreamQueryConfig qConfig = tableEnv.queryConfig();



wangl...@geekplus.com.cn 

 
Sender: jinhai wang
Send Time: 2020-03-12 13:44
Receiver: user-zh@flink.apache.org
Subject: Re: Where and for how long is the history of flinkSQL joined tables stored
It should be the time configured via the withIdleStateRetentionTime parameter. Documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html#idle-state-retention-time
 
 
在 2020/3/12 下午12:37,“wangl...@geekplus.com.cn” 写入:
 

两个从 kafka 创建的表:

tableA: key  valueA
tableB: key  valueB 

用 flink sql 提交job 运行: select  tableA.key, tableA.valueA,tableB.valueB from 
tableA join tableB on tableA.key = tableB.key;
这两个表的历史数据在 flink 中存在哪里?存多久呢?

比如我 tableA key1 先出现,很长时间以后(一个月)tableB key1 才出现,这时候还能 join 上吗?
 
谢谢,
王磊



wangl...@geekplus.com.cn 

 


Where and for how long is the history of flinkSQL joined tables stored

2020-03-11 Thread wangl...@geekplus.com.cn

Two tables created from kafka:

tableA: key  valueA
tableB: key  valueB

A job is submitted with flink sql: select tableA.key, tableA.valueA, tableB.valueB from tableA join tableB on tableA.key = tableB.key;
Where is the history data of these two tables stored in flink, and for how long?

For example, if a key1 record arrives on tableA first and the matching key1 record on tableB only arrives much later (a month later), can they still be joined at that point?

Thanks,
王磊



wangl...@geekplus.com.cn 


Re: Re: Can a flink sql join store state and recover data from state?

2020-03-11 Thread wangl...@geekplus.com.cn
I tried it, and it works.

Thanks
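For reference, the stop-with-savepoint and resume flow tried here looks roughly like this on the command line; the job id, savepoint directory, main class and jar name are placeholders:

# Cancel the running SQL job and take a savepoint.
bin/flink cancel -s hdfs:///flink/savepoints <jobId>

# Resubmit the packaged SQL job, restoring from the savepoint that was just created.
bin/flink run -s hdfs:///flink/savepoints/savepoint-xxxxxx -c com.example.MySqlJoinJob my-sql-job.jar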


wangl...@geekplus.com.cn
 
Sender: Kurt Young
Send Time: 2020-03-11 19:59
Receiver: wangl...@geekplus.com.cn
cc: user-zh
Subject: Re: Re: flink sql join 可以有 state 存储并从 state 恢复数据吗?
Then it might work; you can try it and see.

Best,
Kurt


On Wed, Mar 11, 2020 at 6:57 PM wangl...@geekplus.com.cn 
 wrote:
Hi Kurt,

如果我不用 sql-client, 直接把表注册、sql join 之类的操作写在 java 代码里打成 jar 包就能实现从 state 中恢复的功能吗?
代码里没有任何跟 state 声明、TTL 定义之类的操作。任务 cancel -s 之前已经存在的表信息会在 state 
存储并且再次提交任务可以被访问到直接用吗?

谢谢,
王磊



wangl...@geekplus.com.cn
 
Sender: Kurt Young
Send Time: 2020-03-11 12:54
Receiver: wangl...@geekplus.com.cn
cc: user-zh
Subject: Re: Re: flink sql join 可以有 state 存储并从 state 恢复数据吗?
sql client 目前还不支持这个功能。

Best,
Kurt


On Wed, Mar 11, 2020 at 11:35 AM wangl...@geekplus.com.cn 
 wrote:
Hi Kurt, 
确实是可以 直接 flink  cancel -s 保存状态。
但我是用 flink-sql-client 直接写 sql 提交的 job,再提交的时候怎样可以指定状态目录让这个任务从状态恢复呢?

谢谢,
王磊
 
Sender: Kurt Young
Send Time: 2020-03-11 10:38
Receiver: user-zh
Subject: Re: flink sql join 可以有 state 存储并从 state 恢复数据吗?
理论上来说,flink SQL的作业在编译完生成JobGraph并提交到集群上后,和Datastream的作业就没有什么本质的不同了。
应该也可以支持flink cancel -s 的功能,你可以先试下,如果碰到什么问题再看看。
 
Best,
Kurt
 
 
On Wed, Mar 11, 2020 at 10:24 AM wangl...@geekplus.com.cn <
wangl...@geekplus.com.cn> wrote:
 
> 有两个表:
> tableA: key  valueA
> tableB: key  valueB
>
> 我之前用 flink state 的方式存储 tableA,tableB 的消息过来以后去 query 这个 state 得到 valueA
> 直接 写 flinkSQL 也可以实现这种功能,但这两个表有时间差,任务停止后重新提交会丢失部分 join 的结果。
> flinkSQL 有没有类似 flink cancel -s 保存 state 的功能呢?
>
> 谢谢,
> 王磊
>


Re: Re: How to set stateBackEnd in flink sql program?

2020-03-11 Thread wangl...@geekplus.com.cn
Hi Jingsong, 

So i can write the code as following?

EnvironmentSettings settings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
TableEnvironment tEnv = TableEnvironment.create(settings);
tEnv.getConfig().getConfiguration().setString("state.backend", "rocksdb");
tEnv.sqlUpdate(...);
Thanks,
Lei


wangl...@geekplus.com.cn
 
From: Jingsong Li
Date: 2020-03-12 11:32
To: wangl...@geekplus.com.cn
CC: user
Subject: Re: How to set stateBackEnd in flink sql program?
Hi wanglei,

If you are using Flink 1.10, you can set "state.backend=rocksdb" to 
"TableConfig.getConfiguration".
And you can find related config options here[1].

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html

Jingsong Lee

On Thu, Mar 12, 2020 at 11:15 AM wangl...@geekplus.com.cn 
 wrote:

EnvironmentSettings settings =
    EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
TableEnvironment tEnv = TableEnvironment.create(settings);
tEnv.sqlUpdate(...);

Is there a way i can set the state backend like in a normal streaming program, as following:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new RocksDBStateBackend(args[0], true));



wangl...@geekplus.com.cn


-- 
Best, Jingsong Lee


How to set stateBackEnd in flink sql program?

2020-03-11 Thread wangl...@geekplus.com.cn

EnvironmentSettings settings =
    EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
TableEnvironment tEnv = TableEnvironment.create(settings);
tEnv.sqlUpdate(...);

Is there a way i can set the state backend like in a normal streaming program, as following:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new RocksDBStateBackend(args[0], true));



wangl...@geekplus.com.cn


Re: Re: How should a TIMESTAMP be written in JSON so that flink sql recognizes it

2020-03-11 Thread wangl...@geekplus.com.cn

Thanks Jark,

No word to express my '囧'.


wangl...@geekplus.com.cn 

Sender: Jark Wu
Send Time: 2020-03-11 18:32
Receiver: wangl...@geekplus.com.cn
cc: user; user-zh
Subject: Re: json 中 timestamp 类型在json中怎样写才能被 flink sql 识别
Hi Lei, 

The "2020-03-11T13:00:00.123Z" format is correct, but you defined the wrong 
field name in the DDL. 
It should be "input_date", not "intput_date".

Best,
Jark
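To make the fix concrete, a sketch of the corrected column name together with a matching JSON message; the connector properties are abbreviated, mirroring the DDL quoted below:

CREATE TABLE order_status (
  out_order_code VARCHAR,
  input_date TIMESTAMP(3),     -- was mistyped as intput_date
  owner_code VARCHAR,
  status INT
) WITH (
  'connector.type' = 'kafka', ...
  'format.type' = 'json',
  'format.derive-schema' = 'true'
);

-- Example Kafka message that the json format can parse into the row above:
-- {"out_order_code":"MAMICK2020031048","input_date":"2020-03-11T13:00:00.123Z","owner_code":"WDTLEN04","status":90}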

On Wed, 11 Mar 2020 at 17:52, wangl...@geekplus.com.cn 
 wrote:

Sorry i sent the Chinese written email to user@ 
Let me translate it to English.

I  create a table using sql-client from kafka topic:
CREATE TABLE order_status (
  out_order_code VARCHAR,
  intput_date TIMESTAMP(3),
  owner_code VARCHAR,
  status INT
  ) WITH (
  'connector.type' = 'kafka',.
  'format.type' = 'json',  'format.derive-schema' = 'true'   )
Then I send message to the topic: 
{"out_order_code":"MAMICK2020031048","input_date":"2020-03-11T13:00:00.123Z","owner_code":"WDTLEN04","status":90}
But the input_date is not recognized on the sql-client and is null, even i  
tried   1583828700240  "2020-03-11 13:00:00"  "2020-03-11 13:00:00.000" 

How should the timestamp(3) look like in the json message?

Thanks,
Lei
 




wangl...@geekplus.com.cn
 
发件人: wangl...@geekplus.com.cn
发送时间: 2020-03-11 17:41
收件人: user
主题: json 中 timestamp 类型在json中怎样写才能被 flink sql 识别

用 sql-client create 了一个 kafka table:
CREATE TABLE order_status (
  out_order_code VARCHAR,
  intput_date TIMESTAMP(3),
  owner_code VARCHAR,
  status INT
  ) WITH (
  'connector.type' = 'kafka',.
  'format.type' = 'json',  'format.derive-schema' = 'true'   )
然后往 kafka 这个 topic 
发送消息:{"out_order_code":"MAMICK2020031048","input_date":"2020-03-11T13:00:00.123Z","owner_code":"WDTLEN04","status":90}
input_date 在 sql-clinet 端始终是 NULL. 
我把 发送的 input_date 改成 1583828700240  "2020-03-11 13:00:00"  "2020-03-11 
13:00:00.000" 也都不行。 
这个 TIMESTAMP(3)在JSON 中应该写成什么样子呢?

谢谢,
王磊



wangl...@geekplus.com.cn


Re: Re: How should a TIMESTAMP be written in JSON so that flink sql recognizes it

2020-03-11 Thread wangl...@geekplus.com.cn

Thanks Jark,

No word to express my '囧'.


wangl...@geekplus.com.cn 

Sender: Jark Wu
Send Time: 2020-03-11 18:32
Receiver: wangl...@geekplus.com.cn
cc: user; user-zh
Subject: Re: json 中 timestamp 类型在json中怎样写才能被 flink sql 识别
Hi Lei, 

The "2020-03-11T13:00:00.123Z" format is correct, but you defined the wrong 
field name in the DDL. 
It should be "input_date", not "intput_date".

Best,
Jark

On Wed, 11 Mar 2020 at 17:52, wangl...@geekplus.com.cn 
 wrote:

Sorry i sent the Chinese written email to user@ 
Let me translate it to English.

I  create a table using sql-client from kafka topic:
CREATE TABLE order_status (
  out_order_code VARCHAR,
  intput_date TIMESTAMP(3),
  owner_code VARCHAR,
  status INT
  ) WITH (
  'connector.type' = 'kafka',.
  'format.type' = 'json',  'format.derive-schema' = 'true'   )
Then I send message to the topic: 
{"out_order_code":"MAMICK2020031048","input_date":"2020-03-11T13:00:00.123Z","owner_code":"WDTLEN04","status":90}
But the input_date is not recognized on the sql-client and is null, even i  
tried   1583828700240  "2020-03-11 13:00:00"  "2020-03-11 13:00:00.000" 

How should the timestamp(3) look like in the json message?

Thanks,
Lei
 




wangl...@geekplus.com.cn
 
发件人: wangl...@geekplus.com.cn
发送时间: 2020-03-11 17:41
收件人: user
主题: json 中 timestamp 类型在json中怎样写才能被 flink sql 识别

用 sql-client create 了一个 kafka table:
CREATE TABLE order_status (
  out_order_code VARCHAR,
  intput_date TIMESTAMP(3),
  owner_code VARCHAR,
  status INT
  ) WITH (
  'connector.type' = 'kafka',.
  'format.type' = 'json',  'format.derive-schema' = 'true'   )
然后往 kafka 这个 topic 
发送消息:{"out_order_code":"MAMICK2020031048","input_date":"2020-03-11T13:00:00.123Z","owner_code":"WDTLEN04","status":90}
input_date 在 sql-clinet 端始终是 NULL. 
我把 发送的 input_date 改成 1583828700240  "2020-03-11 13:00:00"  "2020-03-11 
13:00:00.000" 也都不行。 
这个 TIMESTAMP(3)在JSON 中应该写成什么样子呢?

谢谢,
王磊



wangl...@geekplus.com.cn


Re: Re: Can a flink sql join store state and recover data from state?

2020-03-11 Thread wangl...@geekplus.com.cn
Hi Kurt,

If I don't use the sql-client, but instead write the table registration, sql join and so on in java code and package it as a jar, can I get recovery from state that way?
The code has no explicit state declarations or TTL definitions. Will the table data that already existed before cancel -s be kept in state, and will it be accessible and directly usable after the job is submitted again?

Thanks,
王磊



wangl...@geekplus.com.cn
 
Sender: Kurt Young
Send Time: 2020-03-11 12:54
Receiver: wangl...@geekplus.com.cn
cc: user-zh
Subject: Re: Re: Can a flink sql join store state and recover data from state?
The sql client does not support this feature yet.

Best,
Kurt


On Wed, Mar 11, 2020 at 11:35 AM wangl...@geekplus.com.cn 
 wrote:
Hi Kurt, 
确实是可以 直接 flink  cancel -s 保存状态。
但我是用 flink-sql-client 直接写 sql 提交的 job,再提交的时候怎样可以指定状态目录让这个任务从状态恢复呢?

谢谢,
王磊
 
Sender: Kurt Young
Send Time: 2020-03-11 10:38
Receiver: user-zh
Subject: Re: flink sql join 可以有 state 存储并从 state 恢复数据吗?
理论上来说,flink SQL的作业在编译完生成JobGraph并提交到集群上后,和Datastream的作业就没有什么本质的不同了。
应该也可以支持flink cancel -s 的功能,你可以先试下,如果碰到什么问题再看看。
 
Best,
Kurt
 
 
On Wed, Mar 11, 2020 at 10:24 AM wangl...@geekplus.com.cn <
wangl...@geekplus.com.cn> wrote:
 
> 有两个表:
> tableA: key  valueA
> tableB: key  valueB
>
> 我之前用 flink state 的方式存储 tableA,tableB 的消息过来以后去 query 这个 state 得到 valueA
> 直接 写 flinkSQL 也可以实现这种功能,但这两个表有时间差,任务停止后重新提交会丢失部分 join 的结果。
> flinkSQL 有没有类似 flink cancel -s 保存 state 的功能呢?
>
> 谢谢,
> 王磊
>


Reply: How should a TIMESTAMP be written in JSON so that flink sql recognizes it

2020-03-11 Thread wangl...@geekplus.com.cn

Sorry i sent the Chinese written email to user@ 
Let me translate it to English.

I  create a table using sql-client from kafka topic:
CREATE TABLE order_status (
  out_order_code VARCHAR,
  intput_date TIMESTAMP(3),
  owner_code VARCHAR,
  status INT
  ) WITH (
  'connector.type' = 'kafka',.
  'format.type' = 'json',  'format.derive-schema' = 'true'   )
Then I send message to the topic: 
{"out_order_code":"MAMICK2020031048","input_date":"2020-03-11T13:00:00.123Z","owner_code":"WDTLEN04","status":90}
But the input_date is not recognized on the sql-client and is null, even i  
tried   1583828700240  "2020-03-11 13:00:00"  "2020-03-11 13:00:00.000" 

How should the timestamp(3) look like in the json message?

Thanks,
Lei
 




wangl...@geekplus.com.cn
 
发件人: wangl...@geekplus.com.cn
发送时间: 2020-03-11 17:41
收件人: user
主题: json 中 timestamp 类型在json中怎样写才能被 flink sql 识别

用 sql-client create 了一个 kafka table:
CREATE TABLE order_status (
  out_order_code VARCHAR,
  intput_date TIMESTAMP(3),
  owner_code VARCHAR,
  status INT
  ) WITH (
  'connector.type' = 'kafka',.
  'format.type' = 'json',  'format.derive-schema' = 'true'   )
然后往 kafka 这个 topic 
发送消息:{"out_order_code":"MAMICK2020031048","input_date":"2020-03-11T13:00:00.123Z","owner_code":"WDTLEN04","status":90}
input_date 在 sql-clinet 端始终是 NULL. 
我把 发送的 input_date 改成 1583828700240  "2020-03-11 13:00:00"  "2020-03-11 
13:00:00.000" 也都不行。 
这个 TIMESTAMP(3)在JSON 中应该写成什么样子呢?

谢谢,
王磊



wangl...@geekplus.com.cn


Reply: How should a TIMESTAMP be written in JSON so that flink sql recognizes it

2020-03-11 Thread wangl...@geekplus.com.cn

Sorry i sent the Chinese written email to user@ 
Let me translate it to English.

I  create a table using sql-client from kafka topic:
CREATE TABLE order_status (
  out_order_code VARCHAR,
  intput_date TIMESTAMP(3),
  owner_code VARCHAR,
  status INT
  ) WITH (
  'connector.type' = 'kafka',.
  'format.type' = 'json',  'format.derive-schema' = 'true'   )
Then I send message to the topic: 
{"out_order_code":"MAMICK2020031048","input_date":"2020-03-11T13:00:00.123Z","owner_code":"WDTLEN04","status":90}
But the input_date is not recognized on the sql-client and is null, even i  
tried   1583828700240  "2020-03-11 13:00:00"  "2020-03-11 13:00:00.000" 

How should the timestamp(3) look like in the json message?

Thanks,
Lei
 




wangl...@geekplus.com.cn
 
发件人: wangl...@geekplus.com.cn
发送时间: 2020-03-11 17:41
收件人: user
主题: json 中 timestamp 类型在json中怎样写才能被 flink sql 识别

用 sql-client create 了一个 kafka table:
CREATE TABLE order_status (
  out_order_code VARCHAR,
  intput_date TIMESTAMP(3),
  owner_code VARCHAR,
  status INT
  ) WITH (
  'connector.type' = 'kafka',.
  'format.type' = 'json',  'format.derive-schema' = 'true'   )
然后往 kafka 这个 topic 
发送消息:{"out_order_code":"MAMICK2020031048","input_date":"2020-03-11T13:00:00.123Z","owner_code":"WDTLEN04","status":90}
input_date 在 sql-clinet 端始终是 NULL. 
我把 发送的 input_date 改成 1583828700240  "2020-03-11 13:00:00"  "2020-03-11 
13:00:00.000" 也都不行。 
这个 TIMESTAMP(3)在JSON 中应该写成什么样子呢?

谢谢,
王磊



wangl...@geekplus.com.cn


How should a TIMESTAMP be written in JSON so that flink sql recognizes it

2020-03-11 Thread wangl...@geekplus.com.cn

I created a kafka table with the sql-client:
CREATE TABLE order_status (
  out_order_code VARCHAR,
  intput_date TIMESTAMP(3),
  owner_code VARCHAR,
  status INT
  ) WITH (
  'connector.type' = 'kafka',.
  'format.type' = 'json',  'format.derive-schema' = 'true'   )
Then I sent a message to this kafka topic: {"out_order_code":"MAMICK2020031048","input_date":"2020-03-11T13:00:00.123Z","owner_code":"WDTLEN04","status":90}
On the sql-client side input_date is always NULL.
I also changed the input_date being sent to 1583828700240, "2020-03-11 13:00:00" and "2020-03-11 13:00:00.000", and none of them work.
What should this TIMESTAMP(3) look like in the JSON message?

Thanks,
王磊



wangl...@geekplus.com.cn


Re: Re: Dose flink-1.10 sql-client support kafka sink?

2020-03-11 Thread wangl...@geekplus.com.cn

I am using flink-1.10.  But I add flink-json-1.9.1.jar and 
flink-sql-connector-kafka_2.11-1.9.1.jar to lib directory.
After change to flink-json-1.10.0.jar, 
flink-sql-connector-kafka_2.12-1.10.0.jar, it works.

But I have no idea why the yaml way works when i use  flink-json-1.9.1.jar and 
flink-sql-connector-kafka_2.11-1.9.1.jar in  flink-1.10 environment.

Thanks,
Lei



wangl...@geekplus.com.cn

 
From: wangl...@geekplus.com.cn
Date: 2020-03-11 14:51
To: Jark Wu
CC: Arvid Heise; user
Subject: Re: Re: Dose flink-1.10 sql-client support kafka sink?
Hi Jark, 

I  have tried to use CREATE table DDL
First  ./bin/sql-client.sh embedded. Then create a table from kafka topic and 
it tell me table has been created.
But when I query with select * from tableName. There's error:

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a 
suitable table factory for 
'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.

Perhaps i need some jar to the lib directory.
But If i write the table configuration in the sql-client-defaults.yaml file,i 
can select the result correctly

Thanks,
Lei
 



 
From: Jark Wu
Date: 2020-03-11 11:13
To: wangl...@geekplus.com.cn
CC: Arvid Heise; user
Subject: Re: Re: Dose flink-1.10 sql-client support kafka sink?
Hi Lei,

CREATE TABLE DDL [1][2] is the recommended way to register a table since 1.9. 
And the yaml way might be deprecated in the future. 
By using DDL, a registered table can both be used as source and sink. 

Best,
Jark

[1]: 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html#create-table
[2]: 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector

On Tue, 10 Mar 2020 at 21:52, wangl...@geekplus.com.cn 
 wrote:
Thanks, works now. 

Seems it is because i added the  
   schema: "ROW(out_order_code STRING,input_date BIGINT, owner_code STRING, 
status INT)"

under format label.

From: Arvid Heise
Date: 2020-03-10 20:51
To: wangl...@geekplus.com.cn
CC: user
Subject: Re: Dose flink-1.10 sql-client support kafka sink?
Hi Lei,

yes Kafka as a sink is supported albeit only for appends (no deletions/updates 
yet) [1].

An example is a bit hidden in the documentation [2]:
tables:
  - name: MyTableSink
type: sink-table
update-mode: append
connector:
  property-version: 1
  type: kafka
  version: "0.11"
  topic: OutputTopic
  properties:
zookeeper.connect: localhost:2181
bootstrap.servers: localhost:9092
group.id: testGroup
format:
  property-version: 1
  type: json
  derive-schema: true
schema:
  - name: rideId
data-type: BIGINT
  - name: lon
data-type: FLOAT
  - name: lat
data-type: FLOAT
  - name: rideTime
data-type: TIMESTAMP(3)
[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sqlClient.html#detached-sql-queries

On Tue, Mar 10, 2020 at 10:51 AM wangl...@geekplus.com.cn 
 wrote:

I have configured  source table successfully using the following configuration:

- name: out_order
type: source
update-mode: append
schema:
- name: out_order_code
  type: STRING
- name: input_date
  type: BIGINT
- name: owner_code
  type: STRING
connector:
  property-version: 1
  type: kafka
  version: universal
  topic: out_order
  startup-mode: latest-offset
  properties:
  - key: zookeeper.connect
value: 172.19.78.32:2181
  - key: bootstrap.servers
value: 172.19.78.32:9092
  - key: group.id
  property-version: 1
  type: json
  schema: "ROW(out_order_code STRING,owner_code STRING,input_date BIGINT)"

How can i configure a sink table? I haven't found any useful docs for this.

Thanks,
Lei


Re: Re: Dose flink-1.10 sql-client support kafka sink?

2020-03-11 Thread wangl...@geekplus.com.cn
Hi Jark, 

I  have tried to use CREATE table DDL
First  ./bin/sql-client.sh embedded. Then create a table from kafka topic and 
it tell me table has been created.
But when I query with select * from tableName. There's error:

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a 
suitable table factory for 
'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.

Perhaps i need some jar to the lib directory.
But If i write the table configuration in the sql-client-defaults.yaml file,i 
can select the result correctly

Thanks,
Lei
 



 
From: Jark Wu
Date: 2020-03-11 11:13
To: wangl...@geekplus.com.cn
CC: Arvid Heise; user
Subject: Re: Re: Dose flink-1.10 sql-client support kafka sink?
Hi Lei,

CREATE TABLE DDL [1][2] is the recommended way to register a table since 1.9. 
And the yaml way might be deprecated in the future. 
By using DDL, a registered table can both be used as source and sink. 

Best,
Jark

[1]: 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html#create-table
[2]: 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector

On Tue, 10 Mar 2020 at 21:52, wangl...@geekplus.com.cn 
 wrote:
Thanks, works now. 

Seems it is because i added the  
   schema: "ROW(out_order_code STRING,input_date BIGINT, owner_code STRING, 
status INT)"

under format label.

From: Arvid Heise
Date: 2020-03-10 20:51
To: wangl...@geekplus.com.cn
CC: user
Subject: Re: Dose flink-1.10 sql-client support kafka sink?
Hi Lei,

yes Kafka as a sink is supported albeit only for appends (no deletions/updates 
yet) [1].

An example is a bit hidden in the documentation [2]:
tables:
  - name: MyTableSink
type: sink-table
update-mode: append
connector:
  property-version: 1
  type: kafka
  version: "0.11"
  topic: OutputTopic
  properties:
zookeeper.connect: localhost:2181
bootstrap.servers: localhost:9092
group.id: testGroup
format:
  property-version: 1
  type: json
  derive-schema: true
schema:
  - name: rideId
data-type: BIGINT
  - name: lon
data-type: FLOAT
  - name: lat
data-type: FLOAT
  - name: rideTime
data-type: TIMESTAMP(3)
[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sqlClient.html#detached-sql-queries

On Tue, Mar 10, 2020 at 10:51 AM wangl...@geekplus.com.cn 
 wrote:

I have configured  source table successfully using the following configuration:

- name: out_order
type: source
update-mode: append
schema:
- name: out_order_code
  type: STRING
- name: input_date
  type: BIGINT
- name: owner_code
  type: STRING
connector:
  property-version: 1
  type: kafka
  version: universal
  topic: out_order
  startup-mode: latest-offset
  properties:
  - key: zookeeper.connect
value: 172.19.78.32:2181
  - key: bootstrap.servers
value: 172.19.78.32:9092
  - key: group.id
  property-version: 1
  type: json
  schema: "ROW(out_order_code STRING,owner_code STRING,input_date BIGINT)"

How can i configure a sink table? I haven't found any useful docs for this.

Thanks,
Lei


Re: Re: Can a flink sql join store state and recover data from state?

2020-03-10 Thread wangl...@geekplus.com.cn
Hi Kurt, 
It does indeed work to save the state directly with flink cancel -s.
But my job is submitted by writing sql directly in the flink-sql-client. When submitting it again, how can I specify the state directory so that the job is restored from that state?

Thanks,
王磊
 
Sender: Kurt Young
Send Time: 2020-03-11 10:38
Receiver: user-zh
Subject: Re: Can a flink sql join store state and recover data from state?
In theory, once a flink SQL job has been compiled into a JobGraph and submitted to the cluster, it is no longer fundamentally different from a Datastream job.
It should also support the flink cancel -s functionality; try it first, and follow up if you run into any problems.
 
Best,
Kurt
 
 
On Wed, Mar 11, 2020 at 10:24 AM wangl...@geekplus.com.cn <
wangl...@geekplus.com.cn> wrote:
 
> 有两个表:
> tableA: key  valueA
> tableB: key  valueB
>
> 我之前用 flink state 的方式存储 tableA,tableB 的消息过来以后去 query 这个 state 得到 valueA
> 直接 写 flinkSQL 也可以实现这种功能,但这两个表有时间差,任务停止后重新提交会丢失部分 join 的结果。
> flinkSQL 有没有类似 flink cancel -s 保存 state 的功能呢?
>
> 谢谢,
> 王磊
>


Can a flink sql join store state and recover data from state?

2020-03-10 Thread wangl...@geekplus.com.cn
There are two tables:
tableA: key  valueA
tableB: key  valueB

Previously I stored tableA with flink state, and when a tableB message arrived I queried that state to get valueA.
Writing flinkSQL directly can achieve the same thing, but there is a time gap between the two tables, so stopping and resubmitting the job would lose part of the join results.
Does flinkSQL have something like flink cancel -s to save the state?

Thanks,
王磊


Re: Re: Dose flink-1.10 sql-client support kafka sink?

2020-03-10 Thread wangl...@geekplus.com.cn
Thanks, works now. 

Seems it is because i added the  
   schema: "ROW(out_order_code STRING,input_date BIGINT, owner_code STRING, 
status INT)"

under format label.

From: Arvid Heise
Date: 2020-03-10 20:51
To: wangl...@geekplus.com.cn
CC: user
Subject: Re: Dose flink-1.10 sql-client support kafka sink?
Hi Lei,

yes Kafka as a sink is supported albeit only for appends (no deletions/updates 
yet) [1].

An example is a bit hidden in the documentation [2]:
tables:
  - name: MyTableSink
    type: sink-table
    update-mode: append
    connector:
      property-version: 1
      type: kafka
      version: "0.11"
      topic: OutputTopic
      properties:
        zookeeper.connect: localhost:2181
        bootstrap.servers: localhost:9092
        group.id: testGroup
    format:
      property-version: 1
      type: json
      derive-schema: true
    schema:
      - name: rideId
        data-type: BIGINT
      - name: lon
        data-type: FLOAT
      - name: lat
        data-type: FLOAT
      - name: rideTime
        data-type: TIMESTAMP(3)
[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sqlClient.html#detached-sql-queries

On Tue, Mar 10, 2020 at 10:51 AM wangl...@geekplus.com.cn 
 wrote:

I have configured a source table successfully using the following configuration:

- name: out_order
  type: source
  update-mode: append
  schema:
    - name: out_order_code
      type: STRING
    - name: input_date
      type: BIGINT
    - name: owner_code
      type: STRING
  connector:
    property-version: 1
    type: kafka
    version: universal
    topic: out_order
    startup-mode: latest-offset
    properties:
      - key: zookeeper.connect
        value: 172.19.78.32:2181
      - key: bootstrap.servers
        value: 172.19.78.32:9092
      - key: group.id
  format:
    property-version: 1
    type: json
    schema: "ROW(out_order_code STRING,owner_code STRING,input_date BIGINT)"

How can I configure a sink table? I haven't found any useful docs for this.

Thanks,
Lei
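If the sink is defined programmatically instead of in the sql-client YAML, the Flink 1.10 Table API descriptors express roughly the same thing. This is only a sketch: the topic name is made up, the schema follows the out_order table above, and the descriptor methods may differ slightly between minor versions:

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;

// assumes a StreamTableEnvironment named tableEnv already exists
tableEnv.connect(
        new Kafka()
                .version("universal")
                .topic("out_order_sink")                              // hypothetical output topic
                .property("bootstrap.servers", "172.19.78.32:9092")
                .property("zookeeper.connect", "172.19.78.32:2181"))
        .withFormat(new Json())
        .withSchema(
                new Schema()
                        .field("out_order_code", DataTypes.STRING())
                        .field("owner_code", DataTypes.STRING())
                        .field("input_date", DataTypes.BIGINT()))
        .inAppendMode()
        .createTemporaryTable("kafka_sink_table");

tableEnv.sqlUpdate("INSERT INTO kafka_sink_table SELECT out_order_code, owner_code, input_date FROM out_order");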


Re: Re: Kafka sink only support append mode?

2020-03-10 Thread wangl...@geekplus.com.cn
Hi Jark,
Thanks for the explanation.
A GROUP BY statement results in a non-append stream.
I have just tried a join statement and wanted to send the result to Kafka; it also raised the error:
 AppendStreamTableSink requires that Table has only insert changes
Why is the join result not appendable? It confuses me.

Thanks,
Lei
 
From: Jark Wu
Date: 2020-03-09 19:25
To: wangl...@geekplus.com.cn
CC: user
Subject: Re: Kafka sink only support append mode?
Hi Lei,

Yes. Currently, the Kafka sink only supports append mode. Other update modes (e.g. upsert mode / retract mode) are on the agenda.
For now, you can customize a KafkaTableSink by implementing the UpsertStreamTableSink interface, where you will get Tuple2<Boolean, Row> records, and the Boolean represents an insert or delete operation. Then you can encode the insert/delete operation into the Kafka storage or just ignore those operations.

Best,
Jark

On Mon, 9 Mar 2020 at 19:14, wangl...@geekplus.com.cn 
 wrote:
I wrote a simple program reading from Kafka using SQL and sinking to Kafka.
But only 'update-mode' = 'append' is supported for the sink table, and the query SQL must have no GROUP BY statement.
Is only append mode supported for the Kafka sink?

Thanks,
Lei
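Not from this thread, but a lighter-weight workaround than implementing UpsertStreamTableSink is to leave pure SQL for the last step: convert the updating Table back into a retract stream, keep only the insert (true) records, and hand them to a regular Kafka producer. A rough sketch only, inside main(String[]) throws Exception; the table and field names reuse the out_order example elsewhere in this archive, and the string serialization is purely illustrative:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// ... register the Kafka source table "out_order" here ...

Table result = tableEnv.sqlQuery(
        "SELECT owner_code, COUNT(*) AS cnt FROM out_order GROUP BY owner_code");

// toRetractStream emits Tuple2<flag, Row>: true = add/update, false = retract
DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(result, Row.class);

Properties props = new Properties();
props.setProperty("bootstrap.servers", "172.19.78.32:9092");

retractStream
        .filter(t -> t.f0)                 // drop retraction messages
        .map(t -> t.f1.toString())         // naive serialization, for illustration only
        .addSink(new FlinkKafkaProducer<>("result_topic", new SimpleStringSchema(), props));

env.execute("group by result to kafka");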




Does flink-1.10 sql-client support kafka sink?

2020-03-10 Thread wangl...@geekplus.com.cn

I have configured a source table successfully using the following configuration:

- name: out_order
  type: source
  update-mode: append
  schema:
    - name: out_order_code
      type: STRING
    - name: input_date
      type: BIGINT
    - name: owner_code
      type: STRING
  connector:
    property-version: 1
    type: kafka
    version: universal
    topic: out_order
    startup-mode: latest-offset
    properties:
      - key: zookeeper.connect
        value: 172.19.78.32:2181
      - key: bootstrap.servers
        value: 172.19.78.32:9092
      - key: group.id
  format:
    property-version: 1
    type: json
    schema: "ROW(out_order_code STRING,owner_code STRING,input_date BIGINT)"

How can I configure a sink table? I haven't found any useful docs for this.

Thanks,
Lei


How to deduplicate a dual-stream join whose join key has duplicates before sending the result to kafka

2020-03-10 Thread wangl...@geekplus.com.cn
There are two tables with Kafka as the data source:
order_info:
  order_no   info
order_status:
  order_no  status

order_no is duplicated in both tables, so when a record arrives from one table, multiple matching records are found in the other table.
How can I take only the latest record for that join key from the other table and send the joined result to Kafka?

The Kafka sink only supports append mode, so grouping the tables first and then joining does not work.

Thanks,
Lei Wang




Kafka sink only support append mode?

2020-03-09 Thread wangl...@geekplus.com.cn
I wrote a simple program reading from Kafka using SQL and sinking to Kafka.
But only 'update-mode' = 'append' is supported for the sink table, and the query SQL must have no GROUP BY statement.
Is only append mode supported for the Kafka sink?

Thanks,
Lei




How to use a self-defined json format when creating a table from a kafka stream?

2020-03-04 Thread wangl...@geekplus.com.cn

I want to register a table from a MySQL binlog like this:

tEnv.sqlUpdate("CREATE TABLE order(\n"
+ "order_id BIGINT,\n"
+ "order_no VARCHAR,\n"
+ ") WITH (\n"
+ "'connector.type' = 'kafka',\n"
...
+ "'update-mode' = 'append',\n"
+ "'format.type' = 'json',\n"
+ "'format.derive-schema' = 'true'\n"
+ ")");

using the following log format:
{
  "type" : "update",
  "timestamp" : 1583373066000,
  "binlog_filename" : "mysql-bin.000453",
  "binlog_position" : 923020943,
  "database" : "wms",
  "table_name" : "t_pick_order",
  "table_id" : 131936,
  "columns" : [ {
"id" : 1,
"name" : "order_id",
"column_type" : -5,
"last_value" : 4606458,
"value" : 4606458
  }, {
"id" : 2,
"name" : "order_no",
"column_type" : 12,
"last_value" : "EDBMFSJ1S2003050006628",
"value" : "EDBMFSJ1S2003050006628"
  }]
}

Surely 'format.type' = 'json' will not parse this into the result I expect.
Is there any way I can implement this? For example, using a self-defined format class.

Thanks,
Lei



wangl...@geekplus.com.cn
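No answer is recorded for this thread; for what it is worth, one workaround (a sketch, not an official recipe) is to skip the SQL DDL for the source and parse the binlog JSON yourself with a custom DeserializationSchema on a FlinkKafkaConsumer, then register the resulting stream as a table. Class and field names are illustrative, and the plain Jackson dependency is an assumption:

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

// Hypothetical parser for the binlog message shown above:
// it walks the "columns" array and extracts order_id and order_no.
public class BinlogDeserializationSchema implements DeserializationSchema<Tuple2<Long, String>> {

    private transient ObjectMapper mapper;

    @Override
    public Tuple2<Long, String> deserialize(byte[] message) throws IOException {
        if (mapper == null) {
            mapper = new ObjectMapper();
        }
        JsonNode root = mapper.readTree(new String(message, StandardCharsets.UTF_8));
        long orderId = 0L;
        String orderNo = null;
        for (JsonNode col : root.get("columns")) {
            String name = col.get("name").asText();
            if ("order_id".equals(name)) {
                orderId = col.get("value").asLong();
            } else if ("order_no".equals(name)) {
                orderNo = col.get("value").asText();
            }
        }
        return Tuple2.of(orderId, orderNo);
    }

    @Override
    public boolean isEndOfStream(Tuple2<Long, String> nextElement) {
        return false;
    }

    @Override
    public TypeInformation<Tuple2<Long, String>> getProducedType() {
        return Types.TUPLE(Types.LONG, Types.STRING);
    }
}

The stream from new FlinkKafkaConsumer<>(topic, new BinlogDeserializationSchema(), props) can then be registered with tableEnv.registerDataStream(...), as in the CsvTableSink thread further down.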



Submit high version compiled code jar to low version flink cluster?

2019-12-29 Thread wangl...@geekplus.com.cn

The flink cluster version is 1.8.2.
The application source code needs some features only supported in 1.9.1, so it is compiled with the flink-1.9.1 dependency and built into a fat jar with all the flink dependencies.
What will happen if I submit the jar built against the higher version to the lower-version flink cluster?

Thanks,
Lei






Re: Re: Flink state TTL expiration cleanup issue

2019-12-09 Thread wangl...@geekplus.com.cn
Hi Yun,

My cluster has been upgraded to 1.8.2, and I tried both cleanupFullSnapshot and cleanupInRocksdbCompactFilter.
But after stopping with cancel -s, the generated savepoint directory still did not get smaller. The process was:

1. Stop with cancel -s; the savepoint directory is about 100 MB.
2. Change the code: add cleanupFullSnapshot to the original setUpdateType configuration.
3. Restore the new code from the savepoint directory of step 1.
4. Run the new code for about one day, then cancel -s again; the new savepoint directory has grown.

Could it be that every flink run -s resets the updateTime of all data restored from the existing savepoint directory to the current time?

Thanks,
Lei Wang



wangl...@geekplus.com.cn
 
Sender: Yun Tang
Send Time: 2019-11-01 01:38
Receiver: user-zh@flink.apache.org
Subject: Re: Flink state TTL expiration cleanup issue
Hi Lei,
 
Judging from your configuration and from the fact that you are on Flink 1.7, active cleanup of expired state is not enabled [1]. I suggest configuring cleanupFullSnapshot on the StateTtlConfig; then, when a full snapshot (i.e. a savepoint) is taken, expired data is not written into the savepoint. If you do not enable active cleanup, an entry is only cleaned up when it is read again, which is probably why your savepoint keeps growing.
 
Also, I suggest upgrading to Flink 1.8+, which has better support for state TTL; see the (Chinese) article [2].
 
[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/state/state.html#cleanup-of-expired-state
[2] https://www.infoq.cn/article/L_66ga2G9oLWypvOLtIv
 
Best,
Yun Tang
 
 
On 10/31/19, 10:25 AM, "wangl...@geekplus.com.cn"  
wrote:
 
    flink-1.7.2, using the following cleanup strategy:

    StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.days(3))
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
        .build();
    ValueStateDescriptor<OrderState> descriptor =
        new ValueStateDescriptor<>("OrderState", TypeInformation.of(OrderState.class));
    descriptor.enableTimeToLive(ttlConfig);

    When upgrading the program I use a savepoint: flink cancel -s saves to a savepoint directory, and then I restore from that directory.
    The program has been running for much longer than 3 days, yet the savepoint directory generated by each flink cancel -s keeps growing. Is the expiration cleanup strategy not taking effect?

    Thanks,
    Lei Wang

    

wangl...@geekplus.com.cn
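For reference, a sketch of the configuration Yun Tang recommends on Flink 1.8+; the three-day TTL and the OrderState descriptor follow the snippet quoted above, and this would go where the state descriptor is created (e.g. in open()):

StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.days(3))
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
        .cleanupFullSnapshot()   // drop expired entries when a full snapshot / savepoint is taken
        .build();

ValueStateDescriptor<OrderState> descriptor =
        new ValueStateDescriptor<>("OrderState", TypeInformation.of(OrderState.class));
descriptor.enableTimeToLive(ttlConfig);

With the RocksDB backend there is also cleanupInRocksdbCompactFilter, but its signature and the flink-conf switch it needs differ between 1.8 and later releases, so check the docs for the exact version in use.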

 


Re: Re: IOException when using Prometheus Monitor

2019-11-20 Thread wangl...@geekplus.com.cn
After downgrading the pushgateway to pushgateway-0.8.0.linux-amd64.tar.gz, this exception no longer appears.

Thanks very much.



wangl...@geekplus.com.cn
 
From: wangl...@geekplus.com.cn
Date: 2019-11-20 18:19
To: Chesnay Schepler; user
Subject: Re: Re: IOException when using Prometheus Monitor
Hi Chesnay,

Although there is an exception, the metrics have actually been pushed to the pushgateway successfully.

Prometheus versions I used:

prometheus-2.8.1.linux-amd64.tar.gz
pushgateway-1.0.0.linux-amd64.tar.gz
flink-metrics-prometheus_2.12-1.8.2.jar

I just downloaded the tar.gz files to a CentOS node, ran tar -xzvf, and then started the processes.

Thanks,
Lei





wangl...@geekplus.com.cn
 
From: Chesnay Schepler
Date: 2019-11-20 17:46
To: wangl...@geekplus.com.cn; user
Subject: Re: IOException when using Prometheus Monitor
From what I found so far this appears to be an incompatibility between the 
pushgateway and client version.

So you can either
a) use an older version of the pushgateway
b) bump the version of the prometheus reporter.

Unfortunately I cannot tell you which version you would need.

On 20/11/2019 10:24, wangl...@geekplus.com.cn wrote:
Standalone session Flink, version 1.8.2.
Prometheus and the pushgateway are installed on the same host.

After start-cluster.sh, the following error is logged every 10 seconds:
2019-11-20 17:23:12,849 WARN  
org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter  - Failed to 
push metrics to PushGateway with jobName flink58c234884dcfb60a860d7a040aab6a4a.
java.io.IOException: Response code from 
http://10.44.51.23:9091/metrics/job/flink58c234884dcfb60a860d7a040aab6a4a was 
200
at 
org.apache.flink.shaded.io.prometheus.client.exporter.PushGateway.doRequest(PushGateway.java:297)
at 
org.apache.flink.shaded.io.prometheus.client.exporter.PushGateway.push(PushGateway.java:105)
at 
org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter.report(PrometheusPushGatewayReporter.java:76)
at 
org.apache.flink.runtime.metrics.MetricRegistryImpl$ReporterTask.run(MetricRegistryImpl.java:430)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)


Any insight on this?

Thanks,
Lei




wangl...@geekplus.com.cn



Re: Re: IOException when using Prometheus Monitor

2019-11-20 Thread wangl...@geekplus.com.cn
Hi Chesnay,

Although there is an exception, the metrics have actually been pushed to the pushgateway successfully.

Prometheus versions I used:

prometheus-2.8.1.linux-amd64.tar.gz
pushgateway-1.0.0.linux-amd64.tar.gz
flink-metrics-prometheus_2.12-1.8.2.jar

I just downloaded the tar.gz files to a CentOS node, ran tar -xzvf, and then started the processes.

Thanks,
Lei





wangl...@geekplus.com.cn
 
From: Chesnay Schepler
Date: 2019-11-20 17:46
To: wangl...@geekplus.com.cn; user
Subject: Re: IOException when using Prometheus Monitor
From what I found so far this appears to be an incompatibility between the 
pushgateway and client version.

So you can either
a) use an older version of the pushgateway
b) bump the version of the prometheus reporter.

Unfortunately I cannot tell you which version you would need.

On 20/11/2019 10:24, wangl...@geekplus.com.cn wrote:
Standalone session Flink, version 1.8.2.
Prometheus and the pushgateway are installed on the same host.

After start-cluster.sh, the following error is logged every 10 seconds:
2019-11-20 17:23:12,849 WARN  
org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter  - Failed to 
push metrics to PushGateway with jobName flink58c234884dcfb60a860d7a040aab6a4a.
java.io.IOException: Response code from 
http://10.44.51.23:9091/metrics/job/flink58c234884dcfb60a860d7a040aab6a4a was 
200
at 
org.apache.flink.shaded.io.prometheus.client.exporter.PushGateway.doRequest(PushGateway.java:297)
at 
org.apache.flink.shaded.io.prometheus.client.exporter.PushGateway.push(PushGateway.java:105)
at 
org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter.report(PrometheusPushGatewayReporter.java:76)
at 
org.apache.flink.runtime.metrics.MetricRegistryImpl$ReporterTask.run(MetricRegistryImpl.java:430)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)


Any insight on this?

Thanks,
Lei




wangl...@geekplus.com.cn



IOException when using Prometheus Monitor

2019-11-20 Thread wangl...@geekplus.com.cn
Standalone session Flink, version 1.8.2.
Prometheus and the pushgateway are installed on the same host.

After start-cluster.sh, the following error is logged every 10 seconds:
2019-11-20 17:23:12,849 WARN  
org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter  - Failed to 
push metrics to PushGateway with jobName flink58c234884dcfb60a860d7a040aab6a4a.
java.io.IOException: Response code from 
http://10.44.51.23:9091/metrics/job/flink58c234884dcfb60a860d7a040aab6a4a was 
200
at 
org.apache.flink.shaded.io.prometheus.client.exporter.PushGateway.doRequest(PushGateway.java:297)
at 
org.apache.flink.shaded.io.prometheus.client.exporter.PushGateway.push(PushGateway.java:105)
at 
org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter.report(PrometheusPushGatewayReporter.java:76)
at 
org.apache.flink.runtime.metrics.MetricRegistryImpl$ReporterTask.run(MetricRegistryImpl.java:430)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)


Any insight on this?

Thanks,
Lei




wangl...@geekplus.com.cn


Question about changing the state TTL

2019-11-15 Thread wangl...@geekplus.com.cn

I have a program that uses state, with the TTL set to 3 days.
After it has run for a while I stop it with cancel -s, change the expiration time to 7 days, and resume from the state file.

Will the TTL of all keys in the file generated by cancel -s become 7 days, or will it still be 3 days?

Thanks,
Lei Wang





wangl...@geekplus.com.cn


Does changing the number of yarn containers affect restoring data from state?

2019-11-04 Thread wangl...@geekplus.com.cn

The job consumes data from RocketMQ and processes it.
The maximum parallelism in the code is 8. When the job is submitted with -ys 4, 2 containers are automatically allocated as TaskManagers.
After running for a while I stopped it with a savepoint.
Then I restored from the savepoint with -ys 2; 4 containers were allocated as TaskManagers, but after the job was submitted it no longer consumed data from RocketMQ and the consumption TPS stayed at 0. What could be the reason?


Thanks,
Lei Wang




wangl...@geekplus.com.cn


Re: Re: How to periodically write state to external storage

2019-10-31 Thread wangl...@geekplus.com.cn
Hi Congxian,

Even if it is written out as a sink, it eventually has to land somewhere so that it can be queried.
In our case it is written to MySQL.



wangl...@geekplus.com.cn
 
Sender: Congxian Qiu
Send Time: 2019-11-01 10:10
Receiver: user-zh
Subject: Re: How to periodically write state to external storage
I am curious why you want to write the state to external storage periodically. Does an external system need to use this state? Then why not write the state out as a sink?
 
Best,
Congxian
 
 
Jun Zhang <825875...@qq.com> wrote on Thursday, 2019-10-31 at 10:36:
 
> Could you register a timer?
>
>
> Take a look at this article and see whether it helps:
>
>
> https://mp.weixin.qq.com/s/VUEvvoHoupZMpxWQsEeHEA
>  On 2019-10-31 10:16, wangl...@geekplus.com.cn wrote:
>
>
> The job is message driven with a very high QPS; every incoming message updates the state value. If we wrote to external storage on every message, the downstream system could not keep up.
> Is there a way to periodically read the state and write it to external storage?
> I am currently on Flink 1.7.2.
>
>
>
>
>
> wangl...@geekplus.com.cn
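A sketch of the timer idea mentioned above: keep the aggregate in keyed state and register a recurring processing-time timer that emits it once per interval, so the downstream MySQL sink sees one write per key per interval instead of one per message. The class name, the Long input type and the one-minute interval are made up for illustration:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Emits the current state value at most once per minute per key.
public class PeriodicFlushFunction extends KeyedProcessFunction<String, Long, String> {

    private static final long INTERVAL_MS = 60_000L;

    private transient ValueState<Long> counterState;

    @Override
    public void open(Configuration parameters) {
        counterState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("counter", Types.LONG));
    }

    @Override
    public void processElement(Long value, Context ctx, Collector<String> out) throws Exception {
        Long current = counterState.value();
        if (current == null) {
            current = 0L;
            // first message for this key: schedule the first flush
            ctx.timerService().registerProcessingTimeTimer(
                    ctx.timerService().currentProcessingTime() + INTERVAL_MS);
        }
        counterState.update(current + value);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // flush the current value downstream (e.g. to a JDBC/MySQL sink) and schedule the next flush
        out.collect(ctx.getCurrentKey() + "=" + counterState.value());
        ctx.timerService().registerProcessingTimeTimer(timestamp + INTERVAL_MS);
    }
}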


Re: Re: Flink state TTL expiration cleanup issue

2019-10-31 Thread wangl...@geekplus.com.cn
Thanks, understood.

Lei Wang



wangl...@geekplus.com.cn
 
Sender: Yun Tang
Send Time: 2019-11-01 01:38
Receiver: user-zh@flink.apache.org
Subject: Re: Flink state TTL expiration cleanup issue
Hi Lei,
 
Judging from your configuration and from the fact that you are on Flink 1.7, active cleanup of expired state is not enabled [1]. I suggest configuring cleanupFullSnapshot on the StateTtlConfig; then, when a full snapshot (i.e. a savepoint) is taken, expired data is not written into the savepoint. If you do not enable active cleanup, an entry is only cleaned up when it is read again, which is probably why your savepoint keeps growing.
 
Also, I suggest upgrading to Flink 1.8+, which has better support for state TTL; see the (Chinese) article [2].
 
[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/state/state.html#cleanup-of-expired-state
[2] https://www.infoq.cn/article/L_66ga2G9oLWypvOLtIv
 
Best,
Yun Tang
 
 
On 10/31/19, 10:25 AM, "wangl...@geekplus.com.cn"  
wrote:
 
    flink-1.7.2, using the following cleanup strategy:

    StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.days(3))
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
        .build();
    ValueStateDescriptor<OrderState> descriptor =
        new ValueStateDescriptor<>("OrderState", TypeInformation.of(OrderState.class));
    descriptor.enableTimeToLive(ttlConfig);

    When upgrading the program I use a savepoint: flink cancel -s saves to a savepoint directory, and then I restore from that directory.
    The program has been running for much longer than 3 days, yet the savepoint directory generated by each flink cancel -s keeps growing. Is the expiration cleanup strategy not taking effect?

    Thanks,
    Lei Wang

    

wangl...@geekplus.com.cn

 


Flink state TTL expiration cleanup issue

2019-10-30 Thread wangl...@geekplus.com.cn
flink-1.7.2, using the following cleanup strategy:

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.days(3))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();
ValueStateDescriptor<OrderState> descriptor =
    new ValueStateDescriptor<>("OrderState", TypeInformation.of(OrderState.class));
descriptor.enableTimeToLive(ttlConfig);

When upgrading the program I use a savepoint: flink cancel -s saves to a savepoint directory, and then I restore from that directory.
The program has been running for much longer than 3 days, yet the savepoint directory generated by each flink cancel -s keeps growing. Is the expiration cleanup strategy not taking effect?

Thanks,
Lei Wang



wangl...@geekplus.com.cn


How to periodically write state to external storage

2019-10-30 Thread wangl...@geekplus.com.cn

The job is message driven with a very high QPS; every incoming message updates the state value. If we wrote to external storage on every message, the downstream system could not keep up.
Is there a way to periodically read the state and write it to external storage?
I am currently on Flink 1.7.2.





wangl...@geekplus.com.cn


Re: Re: No data is written to the CsvTableSink directory

2019-08-08 Thread wangl...@geekplus.com.cn

Sorry, my mistake.
The data is actually written. I was testing on Windows; the file size shown on refresh stayed 0, and the displayed size only changed after I opened the file in an editor.



wangl...@geekplus.com.cn
 
Sender: Alec Chen
Send Time: 2019-08-09 10:17
Receiver: user-zh
Subject: Re: Re: No data is written to the CsvTableSink directory
There is no data because execution was never triggered. See the sample code from the doc (
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/common.html
):
 
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// create a TableSink
TableSink sink = new CsvTableSink("/path/to/file", fieldDelim = "|");

// register the TableSink with a specific schema
String[] fieldNames = {"a", "b", "c"};
TypeInformation[] fieldTypes = {Types.INT, Types.STRING, Types.LONG};
tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, sink);

// compute a result Table using Table API operators and/or SQL queries
Table result = ...
// emit the result Table to the registered TableSink
result.insertInto("CsvSinkTable");

// execute the program
 
Add tableEnv.execute();
 
 
wangl...@geekplus.com.cn  wrote on Friday, 2019-08-09 at 9:42 AM:
 
>
> I hooked up a RocketMQ stream as the input.
>
>
>  DataStream> ds = env.addSource(new
> RocketMQSource(
> 
> System.out.println(res);
> return res;
> }
> });
>
>
> tableEnv.registerDataStream("t_pick_task", ds, "pick_task_id,
> pick_list_no, sku_code");
>
> TableSink csvSink = new CsvTableSink("D:\\data\\flink",",");
> String[] fieldNames = {"num"};
> TypeInformation[] fieldTypes = {Types.INT};
> tableEnv.registerTableSink("RubberOrders", fieldNames, fieldTypes,
> csvSink);
> tableEnv.sqlUpdate(
> "INSERT INTO RubberOrders SELECT pick_task_id FROM
> t_pick_task");
>
>
>
> wangl...@geekplus.com.cn
>
> Sender: Alec Chen
> Send Time: 2019-08-08 21:01
> Receiver: user-zh
> Subject: Re: No data is written to the CsvTableSink directory
> Please post the complete code.
>
> wangl...@geekplus.com.cn  wrote on Thursday, 2019-08-08 at 7:37 PM:
>
> >
> > I wrote the code following the example at
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sql.html#specifying-a-query
> > on the official site,
> > but after running it, the directory specified by CsvTableSink only contains empty files with no content. Why is that?
> >
> >
> >
> > wangl...@geekplus.com.cn
> >
>
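For completeness, a minimal end-to-end sketch of the pattern discussed in this thread, using the 1.8/1.9-era Table API from the quoted doc snippet; an in-memory source stands in for RocketMQ, and the output path is made up. The final execute() call is what actually triggers the write:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.sinks.CsvTableSink;
import org.apache.flink.table.sinks.TableSink;

public class CsvSinkExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // stand-in for the RocketMQ source in the thread
        DataStream<Tuple2<Integer, String>> ds =
                env.fromElements(Tuple2.of(1, "list-1"), Tuple2.of(2, "list-2"));
        tableEnv.registerDataStream("t_pick_task", ds, "pick_task_id, pick_list_no");

        TableSink<?> csvSink = new CsvTableSink("/tmp/flink-csv-out", ",");
        String[] fieldNames = {"num"};
        TypeInformation<?>[] fieldTypes = {Types.INT};
        tableEnv.registerTableSink("RubberOrders", fieldNames, fieldTypes, csvSink);

        tableEnv.sqlUpdate("INSERT INTO RubberOrders SELECT pick_task_id FROM t_pick_task");

        // without this call nothing runs and the CSV file stays empty
        env.execute("csv sink example");
    }
}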


Re: Re: No data is written to the CsvTableSink directory

2019-08-08 Thread wangl...@geekplus.com.cn

I hooked up a RocketMQ stream as the input.


 DataStream> ds = env.addSource(new 
RocketMQSource(

System.out.println(res);
return res;
}
});


tableEnv.registerDataStream("t_pick_task", ds, "pick_task_id, 
pick_list_no, sku_code");

TableSink csvSink = new CsvTableSink("D:\\data\\flink",",");
String[] fieldNames = {"num"};
TypeInformation[] fieldTypes = {Types.INT};
tableEnv.registerTableSink("RubberOrders", fieldNames, fieldTypes, 
csvSink);
tableEnv.sqlUpdate(
"INSERT INTO RubberOrders SELECT pick_task_id FROM 
t_pick_task");



wangl...@geekplus.com.cn
 
Sender: Alec Chen
Send Time: 2019-08-08 21:01
Receiver: user-zh
Subject: Re: No data is written to the CsvTableSink directory
Please post the complete code.
 
wangl...@geekplus.com.cn  wrote on Thursday, 2019-08-08 at 7:37 PM:
 
>
> I wrote the code following the example at
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sql.html#specifying-a-query
> on the official site,
> but after running it, the directory specified by CsvTableSink only contains empty files with no content. Why is that?
>
>
>
> wangl...@geekplus.com.cn
>


No data is written to the CsvTableSink directory

2019-08-08 Thread wangl...@geekplus.com.cn

I wrote the code following the example at
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sql.html#specifying-a-query
on the official site,
but after running it, the directory specified by CsvTableSink only contains empty files with no content. Why is that?



wangl...@geekplus.com.cn


Does RocksDBStateBackend need a separate RocksDB service?

2019-08-07 Thread wangl...@geekplus.com.cn

In my code, I just set the state backend with an HDFS directory:
   env.setStateBackend(new RocksDBStateBackend("hdfs://user/test/job"));

Is there an embedded RocksDB service inside the flink task?



wangl...@geekplus.com.cn


Re: Re: Unable to restore state value after job failed using RocksDBStateBackend

2019-06-25 Thread wangl...@geekplus.com.cn

I start and cancel it just in my IntelliJ IDEA development environment.

First I click the run button, then the red stop button, and then the run button again.

Let me google the savepoint.

Thanks,
Lei Wang




wangl...@geekplus.com.cn
 
From: Stephan Ewen
Date: 2019-06-25 20:36
To: user
Subject: Re: Unable to restore state value after job failed using 
RocksDBStateBackend
If you manually cancel and restart the job, state is only carried forward if 
you use a savepoint.
Can you check if that is what you are doing?

On Tue, Jun 25, 2019 at 2:21 PM Simon Su  wrote:

Hi wanglei

 Can you post how you restart the job ? 

Thanks,
Simon
On 06/25/2019 20:11,wangl...@geekplus.com.cn wrote: 
public class StateProcessTest extends KeyedProcessFunction<String, Tuple2<String, Long>, String> {

    private transient ValueState<Tuple2<Long, Long>> state;

    public void processElement(Tuple2<String, Long> value, Context ctx,
            Collector<String> out) throws Exception {

        Tuple2<Long, Long> stateValue = state.value();

        if (stateValue == null) {
            log.info("##  initialize");
            stateValue = new Tuple2<>(34L, 56L);
        }
        state.update(stateValue);

    }

    @Override
    public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor = new ValueStateDescriptor<>("avg",
                TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));
        state = getRuntimeContext().getState(descriptor);
    }
}



Every time I restarted the job, the stateValue was still null.




wangl...@geekplus.com.cn
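Related to Stephan's point, and purely as a sketch (paths and intervals are made up, inside main(String[]) throws Exception): for automatic recovery after failures you enable checkpointing; for a manual stop/restart you take a savepoint and pass it back when resubmitting.

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// checkpoint every minute so the framework can restore state after a failure
env.enableCheckpointing(60_000);
// keep the last checkpoint even when the job is cancelled
env.getCheckpointConfig().enableExternalizedCheckpoints(
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// hypothetical checkpoint location
env.setStateBackend(new RocksDBStateBackend("hdfs:///user/test/checkpoints"));

A run started directly from the IDE stop/run buttons goes through neither path, which matches what Stephan describes.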
 


Unable to restore state value after job failed using RocksDBStateBackend

2019-06-25 Thread wangl...@geekplus.com.cn
public class StateProcessTest extends KeyedProcessFunction<String, Tuple2<String, Long>, String> {

    private transient ValueState<Tuple2<Long, Long>> state;

    public void processElement(Tuple2<String, Long> value, Context ctx,
            Collector<String> out) throws Exception {

        Tuple2<Long, Long> stateValue = state.value();

        if (stateValue == null) {
            log.info("##  initialize");
            stateValue = new Tuple2<>(34L, 56L);
        }
        state.update(stateValue);

    }

    @Override
    public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor = new ValueStateDescriptor<>("avg",
                TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));
        state = getRuntimeContext().getState(descriptor);
    }
}



Every time I restarted the job, the stateValue was still null.




wangl...@geekplus.com.cn
 


How to trigger the window function even when there is no message input in this window?

2019-06-14 Thread wangl...@geekplus.com.cn

windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10))).process(new MyProcessWindowFunction());

How can I trigger the MyProcessWindowFunction even when there is no input during this window?



wangl...@geekplus.com.cn



Re: Re: How can I just implement a crontab function using flink?

2019-06-14 Thread wangl...@geekplus.com.cn

I tried, but the MyProcessWindowFunction is still not triggered when there is no event in the window.

Any insight on this?


source.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks() {
    @Override
    public Watermark getCurrentWatermark() {
        return new Watermark(System.currentTimeMillis() - 1);
    }

    @Override
    public long extractTimestamp(Map map, long l) {
        return System.currentTimeMillis();
    }
}).windowAll(TumblingEventTimeWindows.of(Time.seconds(10))).process(new MyProcessWindowFunction());



wangl...@geekplus.com.cn
 
From: Puneet Kinra
Date: 2019-05-24 17:02
To: wangl...@geekplus.com.cn
CC: user
Subject: Re: How can I just implement a crontab function using flink?
There is a concept of a periodic watermark; you can use that
if you are working on event time.

On Fri, May 24, 2019 at 1:51 PM wangl...@geekplus.com.cn 
 wrote:

I want to do something every minute.

Using a TumblingWindow, the function will not be triggered if there is no message received during this minute. But I still need to execute the function.

How can I implement it?



wangl...@geekplus.com.cn
 


-- 
Cheers 

Puneet Kinra

Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com
e-mail :puneet.ki...@customercentria.com
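No resolution is recorded in this thread; one way to get a crontab-like trigger that does not depend on incoming data (a sketch, not from the thread) is a dedicated tick source whose emissions drive the periodic work downstream:

import org.apache.flink.streaming.api.functions.source.SourceFunction;

// Emits one tick per minute regardless of any other input; downstream operators
// (or a connect/broadcast with the real data stream) react to the tick.
public class MinuteTickSource implements SourceFunction<Long> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (running) {
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(System.currentTimeMillis());
            }
            Thread.sleep(60_000);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

It would be used as env.addSource(new MinuteTickSource()) and, for example, connected with the data stream so the periodic function runs even when no business event arrives.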



Re: Re: How can I add a config file to the classpath of the taskmgr node when submitting a flink job?

2019-05-28 Thread wangl...@geekplus.com.cn

Thanks. Let me have a try


wangl...@geekplus.com.cn
 
From: Yang Wang
Date: 2019-05-28 09:47
To: wangl...@geekplus.com.cn
CC: user
Subject: Re: How can I add a config file to the classpath of the taskmgr node when submitting a flink job?
Hi, wanglei

You could use the Flink distributed cache to register some config files and then access them in your task.

1. Register a cached file:
   StreamExecutionEnvironment.registerCachedFile(inputFile.toString(), "test_data", false);

2. Access the file in your task:
   final Path testFile = getRuntimeContext().getDistributedCache().getFile("test_data").toPath();

wangl...@geekplus.com.cn  wrote on Sunday, 2019-05-26 at 12:06 AM:

When starting a single-node Java application, I can add some config files to it.

How can I implement this when submitting a flink job? The config file needs to be read on the taskMgr node and used to initialize some classes.





wangl...@geekplus.com.cn
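Expanding Yang Wang's two steps into a runnable sketch; the file path, the property name and the map logic are made up for illustration:

import java.io.File;
import java.io.FileInputStream;
import java.util.Properties;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DistributedCacheExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // ship the local (or HDFS) file with the job; each TaskManager downloads it once
        env.registerCachedFile("/path/to/app.properties", "app_config");

        env.fromElements("a", "b", "c")
           .map(new RichMapFunction<String, String>() {
               private Properties props;

               @Override
               public void open(Configuration parameters) throws Exception {
                   // runs on the TaskManager: load the shipped file before processing
                   File configFile = getRuntimeContext()
                           .getDistributedCache()
                           .getFile("app_config");
                   props = new Properties();
                   try (FileInputStream in = new FileInputStream(configFile)) {
                       props.load(in);
                   }
               }

               @Override
               public String map(String value) {
                   return value + "-" + props.getProperty("suffix", "default");
               }
           })
           .print();

        env.execute("distributed cache example");
    }
}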

