Re: 关于flink sql 1.10 source并行度自动推断的疑问

2020-03-25 文章 Jingsong Li
Hi,

就像Zhenghua所说,各个tasks是去抢split的,而不是平均分配,所以一旦后面的tasks没有调度起来,前面的tasks会把split抢光的。

但是少slots多并发的场景并不少见,前面tasks读取太多数据可能会让性能/容错都不友好。所以我们也需要引入平均分配的策略。创建了个JIRA
[1], FYI.

[1]https://issues.apache.org/jira/browse/FLINK-16787

Best,
Jingsong Lee

On Wed, Mar 25, 2020 at 6:25 PM Chief  wrote:

> hi Zhenghua Gao
> 好的,谢谢,我拉日志看看。
>
>
> --原始邮件--
> 发件人:"Zhenghua Gao" 发送时间:2020年3月25日(星期三) 晚上6:09
> 收件人:"user-zh"
> 主题:Re: 关于flink sql 1.10 source并行度自动推断的疑问
>
>
>
> Hi Chief,
>
> 目前Hive connector读取数据是通过 InputFormatSourceFunction 来实现的。
> InputFormatSourceFunction 的工作模式不是预分配的模式,而是每个source task向master请求split。
> 如果某些source task提前调度起来且读完了所有的split,后调度起来的source task就没有数据可读了。
> 你可以看看JM/TM日志,确认下是不是前十个调度起来的source task读完了所有的数据。
>
> *Best Regards,*
> *Zhenghua Gao*
>
>
> On Wed, Mar 25, 2020 at 3:31 PM Chief 
>  hinbsp;Jun Zhang
>  您说的我明白,就是不太理解,为什么根据文件数量自动推断任务并行后,不是每个并行任务读取一个文件?
> 
> 
> 
> 
> 
>  --nbsp;原始邮件nbsp;--
>  发件人:nbsp;"Jun Zhang"  发送时间:nbsp;2020年3月25日(星期三) 上午9:08
>  收件人:nbsp;"user-zh" 
>  主题:nbsp;Re: 关于flink sql 1.10 source并行度自动推断的疑问
> 
> 
> 
>  hi,Chief:
> 
> 
> 
> 目前flink读取hive的时候,如果开启了自动推断,系统会根据所读取的文件数来推断并发,如果没有超过最大并发数(默认1000),source的并行度就等于你文件的个数,
>  你可以通过table.exec.hive.infer-source-parallelism.max来设置source的最大并发度。
> 
>  Kurt Young  
>  gt; 你的数据量有多大?有一个可能的原因是source的其他并发调度起来的时候,数据已经被先调度起来的并发读完了。
>  gt;
>  gt; Best,
>  gt; Kurt
>  gt;
>  gt;
>  gt; On Tue, Mar 24, 2020 at 10:39 PM Chief  gt;
>  wrote:
>  gt;
>  gt; gt; hi all:
>  gt; gt; 之前用flink sql查询hive的数据,hive的数据文件是150个,sql
>  gt; gt; client配置文件设置的并行度是10,source通过自动推断生成了150并发,但是通过看web
>  gt; gt; ui发现只有前十个子任务是读到数据了,其他的任务显示没有读到数据,请问是我设置有问题吗?
>  gt;



-- 
Best, Jingsong Lee


Re: 关于 SQL DATE_FORMAT 的时区设置的构想

2020-03-25 文章 Jark Wu
顺便说一下,目前 localtimestamp 的实现看起来是没有问题的。@Dong 你可以先用 localtimestamp 。

在标准里面,以及一些常见数据库中(如 postgres[1], oracle[2]),localtimestamp 是 without time
zone 的实现,
其值是 session zone 看到的值,等于 cast(current_timestamp as timestamp without time
zone)。
所以目前 localtimestamp 的实现应该是没有问题的。

举个例子,理论上,这两个函数的行为应该如下:

> SET time-zone=+08:00

> SELECT CURRENT_TIMESTAMP, LOCALTIMESTAMP;

   EXPR$0 |   EXPR$1
   2020-03-26 09:51:42.299 +08:00  |  2020-03-26 09:51:42.299

目前 Flink 中的 LOCALTIMESTAMP 是符合预期的。

Best,
Jark

[1]:
https://www.postgresql.org/docs/9.5/functions-datetime.html#FUNCTIONS-DATETIME-CURRENT
[2]:
https://docs.oracle.com/cd/B28359_01/server.111/b28286/functions084.htm#SQLRF00660



On Wed, 25 Mar 2020 at 21:46, Kurt Young  wrote:

> 我们先改成 timestamp with local zone,如果这个字段的类型在整个query里都没变过,那个 with time
> zone的效果也差不多了。
>
> Best,
> Kurt
>
>
> On Wed, Mar 25, 2020 at 8:43 PM Zhenghua Gao  wrote:
>
> > Hi Jark,
> >
> > 这里的确是有问题的。
> > 目前的问题是Calcite本身并不支持TIMESTAMP WITH TIME ZONE.
> >
> > *Best Regards,*
> > *Zhenghua Gao*
> >
> >
> > On Tue, Mar 24, 2020 at 11:00 PM Jark Wu  wrote:
> >
> > > Thanks for reporting this Weike.
> > >
> > > 首先,我觉得目前 Flink 返回 TIMESTAMP WITHOUT TIME ZONE 应该是有问题的。
> > > 因为 SQL 标准(SQL:2011 Part 2 Section 6.32)定义了返回类型是 WITH TIME ZONE。
> > > 另外 Calcite 文档中 [1] 也说返回的是 TIMESTAMP WITH TIME ZONE (虽然好像和实现不一致)
> > > 其他的一些数据库也都差不多:mysql [2], oracle[3]
> > >
> > > Best,
> > > Jark
> > >
> > > [1]: https://calcite.apache.org/docs/reference.html#datetime-functions
> > > [2]:
> > >
> > >
> >
> https://dev.mysql.com/doc/refman/8.0/en/date-and-time-functions.html#function_current-timestamp
> > > [3]:
> > >
> > >
> >
> https://docs.oracle.com/cd/B28359_01/server.111/b28286/functions038.htm#SQLRF00629
> > >
> > >
> > >
> > > On Tue, 24 Mar 2020 at 17:00, DONG, Weike 
> > wrote:
> > >
> > > > Hi Zhenghua,
> > > >
> > > > 感谢您的回复。感觉既然 Flink 已经提供了时区的设定,这方面也许可以进一步增强一些。CONVERT_TZ
> > > > 用户很容易忘记或者漏掉,这里还是有不少完善的空间。
> > > >
> > > > Best,
> > > > Weike
> > > >
> > > > On Tue, Mar 24, 2020 at 4:20 PM Zhenghua Gao 
> wrote:
> > > >
> > > > > CURRENT_TIMESTAMP 返回值类型是 TIMESTAMP (WITHOUT TIME ZONE),
> > > > > 其语义可参考 java.time.LocalDateTime。
> > > > > 其字符形式的表示并不随着时区变化而变化(如你所见,和UTC+0 一致)。
> > > > >
> > > > > 你的需求可以通过 CONVERT_TZ(timestamp_string, time_zone_from_string,
> > > > > time_zone_to_string)
> > > > >
> > > > > *Best Regards,*
> > > > > *Zhenghua Gao*
> > > > >
> > > > >
> > > > > On Mon, Mar 23, 2020 at 10:12 PM DONG, Weike <
> > kyled...@connect.hku.hk>
> > > > > wrote:
> > > > >
> > > > > > Hi,
> > > > > >
> > > > > > 近期发现 Flink 的 Blink Planner 在 DATE_FORMAT 对 CURRENT_TIMESTAMP
> > > > > > 做时间格式化为字符串时,默认以 UTC+0 为准。
> > > > > >
> > > > > > 长期以来,TableConfig 类里面有一个 setLocalTimeZone
> > 方法;将其设置为东八区以后,发现格式化后的字符串仍然是
> > > > > UTC+0
> > > > > > 的。而深入来看,Flink 的时间格式化时的代码生成逻辑(time.scala)并未考虑时区的设置。
> > > > > >
> > > > > > 由于大多数用户的时区均不是 UTC+0(GMT、UTC),如果时间格式化、显示等都可以考虑到 TableConfig
> > 中的时区设置,那么
> > > > > Flink
> > > > > > 是否会更用户友好一些呢?当然这个会涉及到不兼容的变更,需要谨慎一些。
> > > > > >
> > > > > > 也许提供一个 DATE_FORMAT_WITH_TIMEZONE 的内置函数,社区是否会更容易接受一些呢?
> > > > > >
> > > > > > 仅仅是个人一点想法,感谢 :)
> > > > > >
> > > > >
> > > >
> > >
> >
>


flink动态分区写入hive如何处理数据倾斜的问题

2020-03-25 文章 Jun Zhang
大家好:
有一个类似的sql 拿官网的这个做示例:INSERT OVERWRITE myparttable SELECT 'Tom', 25, 'type_1', 
'2019-08-08’;
如果实际上第三个type字段,某一种type数据量特别大,导致了数据倾斜,这种情况一般怎么处理呢?
谢谢。

Re: 关于 SQL DATE_FORMAT 的时区设置的构想

2020-03-25 文章 Kurt Young
我们先改成 timestamp with local zone,如果这个字段的类型在整个query里都没变过,那个 with time
zone的效果也差不多了。

Best,
Kurt


On Wed, Mar 25, 2020 at 8:43 PM Zhenghua Gao  wrote:

> Hi Jark,
>
> 这里的确是有问题的。
> 目前的问题是Calcite本身并不支持TIMESTAMP WITH TIME ZONE.
>
> *Best Regards,*
> *Zhenghua Gao*
>
>
> On Tue, Mar 24, 2020 at 11:00 PM Jark Wu  wrote:
>
> > Thanks for reporting this Weike.
> >
> > 首先,我觉得目前 Flink 返回 TIMESTAMP WITHOUT TIME ZONE 应该是有问题的。
> > 因为 SQL 标准(SQL:2011 Part 2 Section 6.32)定义了返回类型是 WITH TIME ZONE。
> > 另外 Calcite 文档中 [1] 也说返回的是 TIMESTAMP WITH TIME ZONE (虽然好像和实现不一致)
> > 其他的一些数据库也都差不多:mysql [2], oracle[3]
> >
> > Best,
> > Jark
> >
> > [1]: https://calcite.apache.org/docs/reference.html#datetime-functions
> > [2]:
> >
> >
> https://dev.mysql.com/doc/refman/8.0/en/date-and-time-functions.html#function_current-timestamp
> > [3]:
> >
> >
> https://docs.oracle.com/cd/B28359_01/server.111/b28286/functions038.htm#SQLRF00629
> >
> >
> >
> > On Tue, 24 Mar 2020 at 17:00, DONG, Weike 
> wrote:
> >
> > > Hi Zhenghua,
> > >
> > > 感谢您的回复。感觉既然 Flink 已经提供了时区的设定,这方面也许可以进一步增强一些。CONVERT_TZ
> > > 用户很容易忘记或者漏掉,这里还是有不少完善的空间。
> > >
> > > Best,
> > > Weike
> > >
> > > On Tue, Mar 24, 2020 at 4:20 PM Zhenghua Gao  wrote:
> > >
> > > > CURRENT_TIMESTAMP 返回值类型是 TIMESTAMP (WITHOUT TIME ZONE),
> > > > 其语义可参考 java.time.LocalDateTime。
> > > > 其字符形式的表示并不随着时区变化而变化(如你所见,和UTC+0 一致)。
> > > >
> > > > 你的需求可以通过 CONVERT_TZ(timestamp_string, time_zone_from_string,
> > > > time_zone_to_string)
> > > >
> > > > *Best Regards,*
> > > > *Zhenghua Gao*
> > > >
> > > >
> > > > On Mon, Mar 23, 2020 at 10:12 PM DONG, Weike <
> kyled...@connect.hku.hk>
> > > > wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > 近期发现 Flink 的 Blink Planner 在 DATE_FORMAT 对 CURRENT_TIMESTAMP
> > > > > 做时间格式化为字符串时,默认以 UTC+0 为准。
> > > > >
> > > > > 长期以来,TableConfig 类里面有一个 setLocalTimeZone
> 方法;将其设置为东八区以后,发现格式化后的字符串仍然是
> > > > UTC+0
> > > > > 的。而深入来看,Flink 的时间格式化时的代码生成逻辑(time.scala)并未考虑时区的设置。
> > > > >
> > > > > 由于大多数用户的时区均不是 UTC+0(GMT、UTC),如果时间格式化、显示等都可以考虑到 TableConfig
> 中的时区设置,那么
> > > > Flink
> > > > > 是否会更用户友好一些呢?当然这个会涉及到不兼容的变更,需要谨慎一些。
> > > > >
> > > > > 也许提供一个 DATE_FORMAT_WITH_TIMEZONE 的内置函数,社区是否会更容易接受一些呢?
> > > > >
> > > > > 仅仅是个人一点想法,感谢 :)
> > > > >
> > > >
> > >
> >
>


Re: 关于 SQL DATE_FORMAT 的时区设置的构想

2020-03-25 文章 Zhenghua Gao
Hi Jark,

这里的确是有问题的。
目前的问题是Calcite本身并不支持TIMESTAMP WITH TIME ZONE.

*Best Regards,*
*Zhenghua Gao*


On Tue, Mar 24, 2020 at 11:00 PM Jark Wu  wrote:

> Thanks for reporting this Weike.
>
> 首先,我觉得目前 Flink 返回 TIMESTAMP WITHOUT TIME ZONE 应该是有问题的。
> 因为 SQL 标准(SQL:2011 Part 2 Section 6.32)定义了返回类型是 WITH TIME ZONE。
> 另外 Calcite 文档中 [1] 也说返回的是 TIMESTAMP WITH TIME ZONE (虽然好像和实现不一致)
> 其他的一些数据库也都差不多:mysql [2], oracle[3]
>
> Best,
> Jark
>
> [1]: https://calcite.apache.org/docs/reference.html#datetime-functions
> [2]:
>
> https://dev.mysql.com/doc/refman/8.0/en/date-and-time-functions.html#function_current-timestamp
> [3]:
>
> https://docs.oracle.com/cd/B28359_01/server.111/b28286/functions038.htm#SQLRF00629
>
>
>
> On Tue, 24 Mar 2020 at 17:00, DONG, Weike  wrote:
>
> > Hi Zhenghua,
> >
> > 感谢您的回复。感觉既然 Flink 已经提供了时区的设定,这方面也许可以进一步增强一些。CONVERT_TZ
> > 用户很容易忘记或者漏掉,这里还是有不少完善的空间。
> >
> > Best,
> > Weike
> >
> > On Tue, Mar 24, 2020 at 4:20 PM Zhenghua Gao  wrote:
> >
> > > CURRENT_TIMESTAMP 返回值类型是 TIMESTAMP (WITHOUT TIME ZONE),
> > > 其语义可参考 java.time.LocalDateTime。
> > > 其字符形式的表示并不随着时区变化而变化(如你所见,和UTC+0 一致)。
> > >
> > > 你的需求可以通过 CONVERT_TZ(timestamp_string, time_zone_from_string,
> > > time_zone_to_string)
> > >
> > > *Best Regards,*
> > > *Zhenghua Gao*
> > >
> > >
> > > On Mon, Mar 23, 2020 at 10:12 PM DONG, Weike 
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > > 近期发现 Flink 的 Blink Planner 在 DATE_FORMAT 对 CURRENT_TIMESTAMP
> > > > 做时间格式化为字符串时,默认以 UTC+0 为准。
> > > >
> > > > 长期以来,TableConfig 类里面有一个 setLocalTimeZone 方法;将其设置为东八区以后,发现格式化后的字符串仍然是
> > > UTC+0
> > > > 的。而深入来看,Flink 的时间格式化时的代码生成逻辑(time.scala)并未考虑时区的设置。
> > > >
> > > > 由于大多数用户的时区均不是 UTC+0(GMT、UTC),如果时间格式化、显示等都可以考虑到 TableConfig 中的时区设置,那么
> > > Flink
> > > > 是否会更用户友好一些呢?当然这个会涉及到不兼容的变更,需要谨慎一些。
> > > >
> > > > 也许提供一个 DATE_FORMAT_WITH_TIMEZONE 的内置函数,社区是否会更容易接受一些呢?
> > > >
> > > > 仅仅是个人一点想法,感谢 :)
> > > >
> > >
> >
>


Re: 回复: Flink JDBC Driver是否支持创建流数据表

2020-03-25 文章 godfrey he
hi 赵峰,

你出现的这个问题,是在classpath中找不到Kafka相关TableFactory,按照zhenghua说的方式可以解决。但是现在Flink
JDBC Driver只支持Batch模式,而Kafka table source目前只支持stream模式。

Best,
Godfrey

Zhenghua Gao  于2020年3月25日周三 下午4:26写道:

> 请确认一下 kafka connector 的jar包是否在 flink/lib 下。
> 目前的报错看起来是找不到kafka connector的jar包。
>
> *Best Regards,*
> *Zhenghua Gao*
>
>
> On Wed, Mar 25, 2020 at 4:18 PM 赵峰  wrote:
>
> > 不是语法问题,我建表也没有问题,是查询报错。你有没有试查询数据或者数据写人文件表中
> >
> >
> > 
> >
> > 参考下这个文档:
> >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
> > 下面的语法应该是不支持的:
> >   'format.type' = 'csv',\n" +
> > "'format.field-delimiter' = '|'\n"
> >
> > 下面是我可以跑通的代码, kafka 里的数据需要是这种格式:{"order_no":"abcdefg","status":90}
> > tEnv.sqlUpdate("CREATE TABLE pick_order (\n"
> > + "order_no VARCHAR,\n"
> > + "status INT\n"
> > + ") WITH (\n"
> > + "'connector.type' = 'kafka',\n"
> > + "'connector.version' = 'universal',\n"
> > + "'connector.topic' = 'wanglei_test',\n"
> > + "'connector.startup-mode' = 'latest-offset',\n"
> > + "'connector.properties.0.key' = 'zookeeper.connect',\n"
> > + "'connector.properties.0.value' = 'xxx:2181',\n"
> > + "'connector.properties.1.key' = 'bootstrap.servers',\n"
> > + "'connector.properties.1.value' = 'xxx:9092',\n"
> > + "'update-mode' = 'append',\n"
> > + "'format.type' = 'json',\n"
> > + "'format.derive-schema' = 'true'\n"
> > + ")");
> >
> > 王磊
> >
> >
> > wangl...@geekplus.com.cn
> > 发件人: 赵峰
> > 发送时间: 2020-03-24 21:28
> > 收件人: user-zh
> > 主题: Flink JDBC Driver是否支持创建流数据表
> > hi
> >
> > Flink JDBC Driver创建kafka表报错,是我的建表代码不正确?代码如下:
> > Connection connection =
> > DriverManager.getConnection("jdbc:flink://localhost:8083?planner=blink");
> > Statement statement = connection.createStatement();
> > statement.executeUpdate(
> > "CREATE TABLE table_kafka (\n" +
> > "user_id BIGINT,\n" +
> > "item_id BIGINT,\n" +
> > "category_id BIGINT,\n" +
> > "behavior STRING,\n" +
> > "ts TIMESTAMP(3),\n" +
> > "proctime as PROCTIME(),\n" +
> > "WATERMARK FOR ts as ts - INTERVAL '5' SECOND\n" +
> > ") WITH (\n" +
> > "'connector.type' = 'kafka', \n" +
> > "'connector.version' = 'universal', \n" +
> > "'connector.topic' = 'flink_im02', \n" +
> > "'connector.properties.group.id' = 'flink_im02_new',\n" +
> > "'connector.startup-mode' = 'earliest-offset', \n" +
> > "'connector.properties.zookeeper.connect' = '*.*.*.*:2181', \n" +
> > "'connector.properties.bootstrap.servers' = '*.*.*.*:9092', \n" +
> > "'format.type' = 'csv',\n" +
> > "'format.field-delimiter' = '|'\n" +
> > ")");
> > ResultSet rs1 = statement.executeQuery("SELECT * FROM table_kafka");
> > while (rs1.next()) {
> > System.out.println(rs1.getInt(1) + ", " + rs1.getInt(2));
> > }
> > statement.close();
> > connection.close();
> > 报错:
> > Reason: Required context properties mismatch.
> > The matching candidates:
> > org.apache.flink.table.sources.CsvBatchTableSourceFactory
> > Mismatched properties:
> > 'connector.type' expects 'filesystem', but is 'kafka'
> > 赵峰
> >
> > 
> > Quoted from:
> >
> http://apache-flink.147419.n8.nabble.com/Flink-JDBC-Driver-tp2103p2104.html
> >
> >
> >
> >
> > 赵峰
>


?????? ????flink sql 1.10 source????????????????????

2020-03-25 文章 Chief
hi Zhenghua Gao
??


----
??:"Zhenghua Gao"

Re: 关于flink sql 1.10 source并行度自动推断的疑问

2020-03-25 文章 Zhenghua Gao
Hi Chief,

目前Hive connector读取数据是通过 InputFormatSourceFunction 来实现的。
InputFormatSourceFunction 的工作模式不是预分配的模式,而是每个source task向master请求split。
如果某些source task提前调度起来且读完了所有的split,后调度起来的source task就没有数据可读了。
你可以看看JM/TM日志,确认下是不是前十个调度起来的source task读完了所有的数据。

*Best Regards,*
*Zhenghua Gao*


On Wed, Mar 25, 2020 at 3:31 PM Chief  wrote:

> hiJun Zhang
> 您说的我明白,就是不太理解,为什么根据文件数量自动推断任务并行后,不是每个并行任务读取一个文件?
>
>
>
>
>
> --原始邮件--
> 发件人:"Jun Zhang" 发送时间:2020年3月25日(星期三) 上午9:08
> 收件人:"user-zh"
> 主题:Re: 关于flink sql 1.10 source并行度自动推断的疑问
>
>
>
> hi,Chief:
>
>
> 目前flink读取hive的时候,如果开启了自动推断,系统会根据所读取的文件数来推断并发,如果没有超过最大并发数(默认1000),source的并行度就等于你文件的个数,
> 你可以通过table.exec.hive.infer-source-parallelism.max来设置source的最大并发度。
>
> Kurt Young 
>  你的数据量有多大?有一个可能的原因是source的其他并发调度起来的时候,数据已经被先调度起来的并发读完了。
> 
>  Best,
>  Kurt
> 
> 
>  On Tue, Mar 24, 2020 at 10:39 PM Chief  wrote:
> 
>   hi all:
>   之前用flink sql查询hive的数据,hive的数据文件是150个,sql
>   client配置文件设置的并行度是10,source通过自动推断生成了150并发,但是通过看web
>   ui发现只有前十个子任务是读到数据了,其他的任务显示没有读到数据,请问是我设置有问题吗?
> 


Flink1.10版本消费Kafka0.11版本,页面监控received都是0

2020-03-25 文章 Jim Chen
请教一个问题:我使用的是Flink是1.10版本消费Kafka0.11版本,直接打印出来。Flink集群是standalong模式,页面监控上的received都是0,不知道怎么回事?


Re: 回复: Flink JDBC Driver是否支持创建流数据表

2020-03-25 文章 Zhenghua Gao
请确认一下 kafka connector 的jar包是否在 flink/lib 下。
目前的报错看起来是找不到kafka connector的jar包。

*Best Regards,*
*Zhenghua Gao*


On Wed, Mar 25, 2020 at 4:18 PM 赵峰  wrote:

> 不是语法问题,我建表也没有问题,是查询报错。你有没有试查询数据或者数据写人文件表中
>
>
> 
>
> 参考下这个文档:
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
> 下面的语法应该是不支持的:
>   'format.type' = 'csv',\n" +
> "'format.field-delimiter' = '|'\n"
>
> 下面是我可以跑通的代码, kafka 里的数据需要是这种格式:{"order_no":"abcdefg","status":90}
> tEnv.sqlUpdate("CREATE TABLE pick_order (\n"
> + "order_no VARCHAR,\n"
> + "status INT\n"
> + ") WITH (\n"
> + "'connector.type' = 'kafka',\n"
> + "'connector.version' = 'universal',\n"
> + "'connector.topic' = 'wanglei_test',\n"
> + "'connector.startup-mode' = 'latest-offset',\n"
> + "'connector.properties.0.key' = 'zookeeper.connect',\n"
> + "'connector.properties.0.value' = 'xxx:2181',\n"
> + "'connector.properties.1.key' = 'bootstrap.servers',\n"
> + "'connector.properties.1.value' = 'xxx:9092',\n"
> + "'update-mode' = 'append',\n"
> + "'format.type' = 'json',\n"
> + "'format.derive-schema' = 'true'\n"
> + ")");
>
> 王磊
>
>
> wangl...@geekplus.com.cn
> 发件人: 赵峰
> 发送时间: 2020-03-24 21:28
> 收件人: user-zh
> 主题: Flink JDBC Driver是否支持创建流数据表
> hi
>
> Flink JDBC Driver创建kafka表报错,是我的建表代码不正确?代码如下:
> Connection connection =
> DriverManager.getConnection("jdbc:flink://localhost:8083?planner=blink");
> Statement statement = connection.createStatement();
> statement.executeUpdate(
> "CREATE TABLE table_kafka (\n" +
> "user_id BIGINT,\n" +
> "item_id BIGINT,\n" +
> "category_id BIGINT,\n" +
> "behavior STRING,\n" +
> "ts TIMESTAMP(3),\n" +
> "proctime as PROCTIME(),\n" +
> "WATERMARK FOR ts as ts - INTERVAL '5' SECOND\n" +
> ") WITH (\n" +
> "'connector.type' = 'kafka', \n" +
> "'connector.version' = 'universal', \n" +
> "'connector.topic' = 'flink_im02', \n" +
> "'connector.properties.group.id' = 'flink_im02_new',\n" +
> "'connector.startup-mode' = 'earliest-offset', \n" +
> "'connector.properties.zookeeper.connect' = '*.*.*.*:2181', \n" +
> "'connector.properties.bootstrap.servers' = '*.*.*.*:9092', \n" +
> "'format.type' = 'csv',\n" +
> "'format.field-delimiter' = '|'\n" +
> ")");
> ResultSet rs1 = statement.executeQuery("SELECT * FROM table_kafka");
> while (rs1.next()) {
> System.out.println(rs1.getInt(1) + ", " + rs1.getInt(2));
> }
> statement.close();
> connection.close();
> 报错:
> Reason: Required context properties mismatch.
> The matching candidates:
> org.apache.flink.table.sources.CsvBatchTableSourceFactory
> Mismatched properties:
> 'connector.type' expects 'filesystem', but is 'kafka'
> 赵峰
>
> 
> Quoted from:
> http://apache-flink.147419.n8.nabble.com/Flink-JDBC-Driver-tp2103p2104.html
>
>
>
>
> 赵峰


Re: 回复: Flink JDBC Driver是否支持创建流数据表

2020-03-25 文章 赵峰
不是语法问题,我建表也没有问题,是查询报错。你有没有试查询数据或者数据写人文件表中




参考下这个文档:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
 
下面的语法应该是不支持的:
  'format.type' = 'csv',\n" +
"'format.field-delimiter' = '|'\n"

下面是我可以跑通的代码, kafka 里的数据需要是这种格式:{"order_no":"abcdefg","status":90}
tEnv.sqlUpdate("CREATE TABLE pick_order (\n"
+ "order_no VARCHAR,\n"
+ "status INT\n"
+ ") WITH (\n"
+ "'connector.type' = 'kafka',\n"
+ "'connector.version' = 'universal',\n"
+ "'connector.topic' = 'wanglei_test',\n"
+ "'connector.startup-mode' = 'latest-offset',\n"
+ "'connector.properties.0.key' = 'zookeeper.connect',\n"
+ "'connector.properties.0.value' = 'xxx:2181',\n"
+ "'connector.properties.1.key' = 'bootstrap.servers',\n"
+ "'connector.properties.1.value' = 'xxx:9092',\n"
+ "'update-mode' = 'append',\n"
+ "'format.type' = 'json',\n"
+ "'format.derive-schema' = 'true'\n"
+ ")");

王磊


wangl...@geekplus.com.cn
发件人: 赵峰
发送时间: 2020-03-24 21:28
收件人: user-zh
主题: Flink JDBC Driver是否支持创建流数据表
hi
   
Flink JDBC Driver创建kafka表报错,是我的建表代码不正确?代码如下:
Connection connection =
DriverManager.getConnection("jdbc:flink://localhost:8083?planner=blink");
Statement statement = connection.createStatement();
statement.executeUpdate(
"CREATE TABLE table_kafka (\n" +
"user_id BIGINT,\n" +
"item_id BIGINT,\n" +
"category_id BIGINT,\n" +
"behavior STRING,\n" +
"ts TIMESTAMP(3),\n" +
"proctime as PROCTIME(),\n" +
"WATERMARK FOR ts as ts - INTERVAL '5' SECOND\n" +
") WITH (\n" +
"'connector.type' = 'kafka', \n" +
"'connector.version' = 'universal', \n" +
"'connector.topic' = 'flink_im02', \n" +
"'connector.properties.group.id' = 'flink_im02_new',\n" +
"'connector.startup-mode' = 'earliest-offset', \n" +
"'connector.properties.zookeeper.connect' = '*.*.*.*:2181', \n" +
"'connector.properties.bootstrap.servers' = '*.*.*.*:9092', \n" +
"'format.type' = 'csv',\n" +
"'format.field-delimiter' = '|'\n" +
")");
ResultSet rs1 = statement.executeQuery("SELECT * FROM table_kafka");
while (rs1.next()) {
System.out.println(rs1.getInt(1) + ", " + rs1.getInt(2));
} 
statement.close();
connection.close();
报错:
Reason: Required context properties mismatch.
The matching candidates:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
Mismatched properties:
'connector.type' expects 'filesystem', but is 'kafka'
赵峰


Quoted from: 
http://apache-flink.147419.n8.nabble.com/Flink-JDBC-Driver-tp2103p2104.html




赵峰

?????? ????flink sql 1.10 source????????????????????

2020-03-25 文章 Chief
hiJun Zhang






----
??:"Jun Zhang"

?????? ????flink sql 1.10 source????????????????????

2020-03-25 文章 Chief
hi Kurt Young
hive??13??web ui 
source??150source




----
??:"Kurt Young"

Re: ddl es 报错

2020-03-25 文章 Leonard Xu
, zhisheng
我觉得支持ES鉴权在生产中是蛮有用的功能,nice to have, 
如jinhai所说,可以先提个improvement的issue,在社区里讨论下(具体参数名,这些参数应该是可选的),讨论一致后开PR就可以了。

Best,
Leonard
  


> 在 2020年3月25日,13:51,jinhai wang  写道:
> 
> 优秀!可以提个improve issue
> 
> 
> Best Regards
> 
> jinhai...@gmail.com
> 
>> 2020年3月25日 下午1:40,zhisheng  写道:
>> 
>> hi,Leonar Xu
>> 
>> 官方 ES DDL 现在不支持填写 ES 集群的用户名和密码,我现在在公司已经做了扩展,加了这个功能,请问社区是否需要这个功能?我该怎么贡献呢?
>> 
>> 效果如图:http://zhisheng-blog.oss-cn-hangzhou.aliyuncs.com/2020-03-25-053948.png
>> 
>> Best Wishes!
>> 
>> zhisheng
>> 
>> Leonard Xu  于2020年3月24日周二 下午5:53写道:
>> 
>>> Hi, 出发
>>> 看起来是你缺少了依赖es connector的依赖[1],所以只能找到内置的filesystem connector,目前内置的filesystem
>>> connector只支持csv format,所以会有这个错误。
>>> 在项目中加上缺失的依赖即可,如果使用SQL CLI,也需要将依赖的jar包放到 flink的lib目录下。
>>> 
>>> 
>>>   org.apache.flink
>>>   flink-sql-connector-elasticsearch6_2.11
>>>   ${flink.version}
>>> 
>>> 
>>>   org.apache.flink
>>>   flink-json
>>>   ${flink.version}
>>> 
>>> 
>>> Best,
>>> Leonard
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#elasticsearch-connector
>>> <
>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#elasticsearch-connector
 
>>> 
>>> 
 在 2020年3月23日,23:30,出发 <573693...@qq.com> 写道:
 
 
 源码如下:
 CREATE TABLE buy_cnt_per_hour (
   hour_of_day BIGINT,
   buy_cnt BIGINT
 ) WITH (
   'connector.type' = 'elasticsearch',
   'connector.version' = '6',
   'connector.hosts' = 'http://localhost:9200',
   'connector.index' = 'buy_cnt_per_hour',
   'connector.document-type' = 'user_behavior',
   'connector.bulk-flush.max-actions' = '1',
   'format.type' = 'json',
   'update-mode' = 'append'
 )
 import
>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.java.StreamTableEnvironment;
 import org.apache.flink.types.Row;
 
 public class ESTest {
 
   public static void main(String[] args) throws Exception {
 
   //2、设置运行环境
   StreamExecutionEnvironment streamEnv =
>>> StreamExecutionEnvironment.getExecutionEnvironment();
   EnvironmentSettings settings =
>>> EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
   StreamTableEnvironment tableEnv =
>>> StreamTableEnvironment.create(streamEnv, settings);
   streamEnv.setParallelism(1);
   String sinkDDL = " CREATE TABLE test_es ( hour_of_day BIGINT,
>>> buy_cnt BIGINT "
   + ") WITH ( 'connector.type' = 'elasticsearch',
>>> 'connector.version' = '6',"
   + "'connector.hosts' = 'http://localhost:9200',
>>> 'connector.index' = 'buy_cnt_per_hour',"
   + "'connector.document-type' = 'user_behavior',"
   + "'connector.bulk-flush.max-actions' = '1',\n" + "
>>> 'format.type' = 'json',"
   + "'update-mode' = 'append' )";
   tableEnv.sqlUpdate(sinkDDL);
   Table table = tableEnv.sqlQuery("select * from test_es ");
   tableEnv.toRetractStream(table, Row.class).print();
   streamEnv.execute("");
   }
 
 }
 具体error
 The matching candidates:
 org.apache.flink.table.sources.CsvAppendTableSourceFactory
 Mismatched properties:
 'connector.type' expects 'filesystem', but is 'elasticsearch'
 'format.type' expects 'csv', but is 'json'
 
 The following properties are requested:
 connector.bulk-flush.max-actions=1
 connector.document-type=user_behavior
 connector.hosts=http://localhost:9200
 connector.index=buy_cnt_per_hour
 connector.type=elasticsearch
 connector.version=6
 format.type=json
 schema.0.data-type=BIGINT
 schema.0.name=hour_of_day
 schema.1.data-type=BIGINT
 schema.1.name=buy_cnt
 update-mode=append
>>> 
>>> 
> 



flink 1.9 状态后端为FsStateBackend,修改checkpoint时出现警告

2020-03-25 文章 guanyq

package com.guanyq.study.libraries.stateProcessorApi.FsStateBackend;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collectors;

public class ReorganizeListState {
public static void main(String[] args) throws Exception {
// set up the batch execution environment
final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
FlatMapOperator dataSet = 
Savepoint.load(env, 
"hdfs://MacBookPro:9000/flink/savepoints/8b900fc76a5a99dc5d0201e29cd7bef1/savepoint-8b900f-1e3d035ee046",
 new FsStateBackend("hdfs://MacBookPro:9000/flink/local/checkpoints"))
.readKeyedState("map", new ReaderFunction())
.flatMap(new FlatMapFunction() {
@Override
public void flatMap(KeyedListState keyedListState, 
Collector collector) {
KeyedListState newState = new KeyedListState();
newState.value = keyedListState.value.stream()
.map(x -> x + 
1).collect(Collectors.toList());
newState.key = keyedListState.key;
collector.collect(newState);
}
});
BootstrapTransformation transformation = 
OperatorTransformation
.bootstrapWith(dataSet)
.keyBy(acc -> acc.key)
.transform(new KeyedListStateBootstrapper());

String local = String.valueOf(System.currentTimeMillis());
System.out.println(local);

Savepoint.create(new 
FsStateBackend("hdfs://MacBookPro:9000/flink/local/checkpoints"),128)
.withOperator("map",transformation)

.write("hdfs://MacBookPro:9000/flink/local/reorganizeListState/"+local);

dataSet.writeAsText("./readSavepoint_"+System.currentTimeMillis());
env.execute("ReorganizeListState");
}

static class KeyedListState {
Integer key;
List value;
}

static class ReaderFunction extends KeyedStateReaderFunction {
private transient ListState listState;

@Override
public void open(Configuration parameters) {
ListStateDescriptor lsd =
new ListStateDescriptor<>("list", 
TypeInformation.of(Integer.class));
listState = getRuntimeContext().getListState(lsd);
}

@Override
public void readKey(
Integer key,
Context ctx,
Collector out) throws Exception {
List li = new ArrayList<>();
listState.get().forEach(new Consumer() {
@Override
public void accept(Integer integer) {
li.add(integer);
}
});

KeyedListState kl = new KeyedListState();
kl.key = key;
kl.value = li;

out.collect(kl);
}
}

static class KeyedListStateBootstrapper extends 
KeyedStateBootstrapFunction {
private transient ListState listState;

@Override
public void open(Configuration parameters) {
ListStateDescriptor lsd =
new ListStateDescriptor<>("list", 
TypeInformation.of(Integer.class));
listState = getRuntimeContext().getListState(lsd);
}

@Override
public void processElement(KeyedListState value, Context ctx) throws 
Exception {
listState.addAll(value.value);
}
}
}


Re: Re: 向您请教pyflink在windows上运行的问题,我第一次接触flink。

2020-03-25 文章 jincheng sun
上面视频中对应的word_count示例的源码应该是这个:
https://github.com/sunjincheng121/enjoyment.code/blob/master/myPyFlink/enjoyment/word_count.py运行完成之后计算结果应该是写到sink_file
= 'sink.csv'文件里面去了。你可以将这个文件的路径打印出来,查看这个文件内容。

另外如果您只是为了学习入门的话,建议你查阅[1][2], 我让想整理了解PyFlink最新的状况,可以查看[3]。

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/python/installation.html
[2]
https://enjoyment.cool/2020/01/22/Three-Min-Series-How-PyFlink-does-ETL/#more
[3]
https://www.bilibili.com/video/BV1W7411o7Tj?from=search=14518199503613218690

Best,
Jincheng
-
Twitter: https://twitter.com/sunjincheng121
-


xu1990xaut  于2020年3月25日周三 下午2:23写道:

> 孙老师您好,我之前在网上看的是这个视频《【Apache Flink 进阶教程】14课. Apache Flink Python API
> 的现状及未来规划》。 今天我也在虚拟机下试了,还是无法运行。
> 我用的是flink1.10,python3.6。  麻烦老师指点指点。
>
>
>
>
>
>
> 在 2020-03-25 11:32:29,"jincheng sun"  写道:
>
> 很高兴收到您的邮件,我不太确定您具体看的是哪一个视频,所以很难确定您遇到的问题原因。您可以参考官方文档[1],
> 同时我个人博客里有一些3分钟的视频和文档入门教程[2],您可以先查阅一下。如又问题,可以保持邮件沟通,可以在我的博客留言!
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/tutorials/python_table_api.html
> [2] https://enjoyment.cool/
>
> Best,
> Jincheng
>
>
>
> xu1990xaut  于2020年3月24日周二 下午11:36写道:
>
>> 您好,之前在哔哩哔哩上看过您讲的视频。  也跟着视频动手做了。
>> 我用的flink1.10,在pip的时候是直接pip install apache-flink,结果默认就是1.10版本。
>> 然后我在pycharm中运行word-count这个脚本时,一直不出结果,也不报错。   请问这是什么原因。
>> 我也装了jdk,另外页面访问flink8081那个端口也可以出来界面。  我是第一次接触flink,在网上也搜过这个问题,
>> 可是一直没有得到答案。  麻烦您,给小弟指点指点,   谢谢您了。
>>
>>
>>
>>
>
>
>
>