Re: 关于flink sql 1.10 source并行度自动推断的疑问
Hi, 就像Zhenghua所说,各个tasks是去抢split的,而不是平均分配,所以一旦后面的tasks没有调度起来,前面的tasks会把split抢光的。 但是少slots多并发的场景并不少见,前面tasks读取太多数据可能会让性能/容错都不友好。所以我们也需要引入平均分配的策略。创建了个JIRA [1], FYI. [1]https://issues.apache.org/jira/browse/FLINK-16787 Best, Jingsong Lee On Wed, Mar 25, 2020 at 6:25 PM Chief wrote: > hi Zhenghua Gao > 好的,谢谢,我拉日志看看。 > > > --原始邮件-- > 发件人:"Zhenghua Gao" 发送时间:2020年3月25日(星期三) 晚上6:09 > 收件人:"user-zh" > 主题:Re: 关于flink sql 1.10 source并行度自动推断的疑问 > > > > Hi Chief, > > 目前Hive connector读取数据是通过 InputFormatSourceFunction 来实现的。 > InputFormatSourceFunction 的工作模式不是预分配的模式,而是每个source task向master请求split。 > 如果某些source task提前调度起来且读完了所有的split,后调度起来的source task就没有数据可读了。 > 你可以看看JM/TM日志,确认下是不是前十个调度起来的source task读完了所有的数据。 > > *Best Regards,* > *Zhenghua Gao* > > > On Wed, Mar 25, 2020 at 3:31 PM Chief > hinbsp;Jun Zhang > 您说的我明白,就是不太理解,为什么根据文件数量自动推断任务并行后,不是每个并行任务读取一个文件? > > > > > > --nbsp;原始邮件nbsp;-- > 发件人:nbsp;"Jun Zhang" 发送时间:nbsp;2020年3月25日(星期三) 上午9:08 > 收件人:nbsp;"user-zh" > 主题:nbsp;Re: 关于flink sql 1.10 source并行度自动推断的疑问 > > > > hi,Chief: > > > > 目前flink读取hive的时候,如果开启了自动推断,系统会根据所读取的文件数来推断并发,如果没有超过最大并发数(默认1000),source的并行度就等于你文件的个数, > 你可以通过table.exec.hive.infer-source-parallelism.max来设置source的最大并发度。 > > Kurt Young > gt; 你的数据量有多大?有一个可能的原因是source的其他并发调度起来的时候,数据已经被先调度起来的并发读完了。 > gt; > gt; Best, > gt; Kurt > gt; > gt; > gt; On Tue, Mar 24, 2020 at 10:39 PM Chief gt; > wrote: > gt; > gt; gt; hi all: > gt; gt; 之前用flink sql查询hive的数据,hive的数据文件是150个,sql > gt; gt; client配置文件设置的并行度是10,source通过自动推断生成了150并发,但是通过看web > gt; gt; ui发现只有前十个子任务是读到数据了,其他的任务显示没有读到数据,请问是我设置有问题吗? > gt; -- Best, Jingsong Lee
Re: 关于 SQL DATE_FORMAT 的时区设置的构想
顺便说一下,目前 localtimestamp 的实现看起来是没有问题的。@Dong 你可以先用 localtimestamp 。 在标准里面,以及一些常见数据库中(如 postgres[1], oracle[2]),localtimestamp 是 without time zone 的实现, 其值是 session zone 看到的值,等于 cast(current_timestamp as timestamp without time zone)。 所以目前 localtimestamp 的实现应该是没有问题的。 举个例子,理论上,这两个函数的行为应该如下: > SET time-zone=+08:00 > SELECT CURRENT_TIMESTAMP, LOCALTIMESTAMP; EXPR$0 | EXPR$1 2020-03-26 09:51:42.299 +08:00 | 2020-03-26 09:51:42.299 目前 Flink 中的 LOCALTIMESTAMP 是符合预期的。 Best, Jark [1]: https://www.postgresql.org/docs/9.5/functions-datetime.html#FUNCTIONS-DATETIME-CURRENT [2]: https://docs.oracle.com/cd/B28359_01/server.111/b28286/functions084.htm#SQLRF00660 On Wed, 25 Mar 2020 at 21:46, Kurt Young wrote: > 我们先改成 timestamp with local zone,如果这个字段的类型在整个query里都没变过,那个 with time > zone的效果也差不多了。 > > Best, > Kurt > > > On Wed, Mar 25, 2020 at 8:43 PM Zhenghua Gao wrote: > > > Hi Jark, > > > > 这里的确是有问题的。 > > 目前的问题是Calcite本身并不支持TIMESTAMP WITH TIME ZONE. > > > > *Best Regards,* > > *Zhenghua Gao* > > > > > > On Tue, Mar 24, 2020 at 11:00 PM Jark Wu wrote: > > > > > Thanks for reporting this Weike. > > > > > > 首先,我觉得目前 Flink 返回 TIMESTAMP WITHOUT TIME ZONE 应该是有问题的。 > > > 因为 SQL 标准(SQL:2011 Part 2 Section 6.32)定义了返回类型是 WITH TIME ZONE。 > > > 另外 Calcite 文档中 [1] 也说返回的是 TIMESTAMP WITH TIME ZONE (虽然好像和实现不一致) > > > 其他的一些数据库也都差不多:mysql [2], oracle[3] > > > > > > Best, > > > Jark > > > > > > [1]: https://calcite.apache.org/docs/reference.html#datetime-functions > > > [2]: > > > > > > > > > https://dev.mysql.com/doc/refman/8.0/en/date-and-time-functions.html#function_current-timestamp > > > [3]: > > > > > > > > > https://docs.oracle.com/cd/B28359_01/server.111/b28286/functions038.htm#SQLRF00629 > > > > > > > > > > > > On Tue, 24 Mar 2020 at 17:00, DONG, Weike > > wrote: > > > > > > > Hi Zhenghua, > > > > > > > > 感谢您的回复。感觉既然 Flink 已经提供了时区的设定,这方面也许可以进一步增强一些。CONVERT_TZ > > > > 用户很容易忘记或者漏掉,这里还是有不少完善的空间。 > > > > > > > > Best, > > > > Weike > > > > > > > > On Tue, Mar 24, 2020 at 4:20 PM Zhenghua Gao > wrote: > > > > > > > > > CURRENT_TIMESTAMP 返回值类型是 TIMESTAMP (WITHOUT TIME ZONE), > > > > > 其语义可参考 java.time.LocalDateTime。 > > > > > 其字符形式的表示并不随着时区变化而变化(如你所见,和UTC+0 一致)。 > > > > > > > > > > 你的需求可以通过 CONVERT_TZ(timestamp_string, time_zone_from_string, > > > > > time_zone_to_string) > > > > > > > > > > *Best Regards,* > > > > > *Zhenghua Gao* > > > > > > > > > > > > > > > On Mon, Mar 23, 2020 at 10:12 PM DONG, Weike < > > kyled...@connect.hku.hk> > > > > > wrote: > > > > > > > > > > > Hi, > > > > > > > > > > > > 近期发现 Flink 的 Blink Planner 在 DATE_FORMAT 对 CURRENT_TIMESTAMP > > > > > > 做时间格式化为字符串时,默认以 UTC+0 为准。 > > > > > > > > > > > > 长期以来,TableConfig 类里面有一个 setLocalTimeZone > > 方法;将其设置为东八区以后,发现格式化后的字符串仍然是 > > > > > UTC+0 > > > > > > 的。而深入来看,Flink 的时间格式化时的代码生成逻辑(time.scala)并未考虑时区的设置。 > > > > > > > > > > > > 由于大多数用户的时区均不是 UTC+0(GMT、UTC),如果时间格式化、显示等都可以考虑到 TableConfig > > 中的时区设置,那么 > > > > > Flink > > > > > > 是否会更用户友好一些呢?当然这个会涉及到不兼容的变更,需要谨慎一些。 > > > > > > > > > > > > 也许提供一个 DATE_FORMAT_WITH_TIMEZONE 的内置函数,社区是否会更容易接受一些呢? > > > > > > > > > > > > 仅仅是个人一点想法,感谢 :) > > > > > > > > > > > > > > > > > > > > >
flink动态分区写入hive如何处理数据倾斜的问题
大家好: 有一个类似的sql 拿官网的这个做示例:INSERT OVERWRITE myparttable SELECT 'Tom', 25, 'type_1', '2019-08-08’; 如果实际上第三个type字段,某一种type数据量特别大,导致了数据倾斜,这种情况一般怎么处理呢? 谢谢。
Re: 关于 SQL DATE_FORMAT 的时区设置的构想
我们先改成 timestamp with local zone,如果这个字段的类型在整个query里都没变过,那个 with time zone的效果也差不多了。 Best, Kurt On Wed, Mar 25, 2020 at 8:43 PM Zhenghua Gao wrote: > Hi Jark, > > 这里的确是有问题的。 > 目前的问题是Calcite本身并不支持TIMESTAMP WITH TIME ZONE. > > *Best Regards,* > *Zhenghua Gao* > > > On Tue, Mar 24, 2020 at 11:00 PM Jark Wu wrote: > > > Thanks for reporting this Weike. > > > > 首先,我觉得目前 Flink 返回 TIMESTAMP WITHOUT TIME ZONE 应该是有问题的。 > > 因为 SQL 标准(SQL:2011 Part 2 Section 6.32)定义了返回类型是 WITH TIME ZONE。 > > 另外 Calcite 文档中 [1] 也说返回的是 TIMESTAMP WITH TIME ZONE (虽然好像和实现不一致) > > 其他的一些数据库也都差不多:mysql [2], oracle[3] > > > > Best, > > Jark > > > > [1]: https://calcite.apache.org/docs/reference.html#datetime-functions > > [2]: > > > > > https://dev.mysql.com/doc/refman/8.0/en/date-and-time-functions.html#function_current-timestamp > > [3]: > > > > > https://docs.oracle.com/cd/B28359_01/server.111/b28286/functions038.htm#SQLRF00629 > > > > > > > > On Tue, 24 Mar 2020 at 17:00, DONG, Weike > wrote: > > > > > Hi Zhenghua, > > > > > > 感谢您的回复。感觉既然 Flink 已经提供了时区的设定,这方面也许可以进一步增强一些。CONVERT_TZ > > > 用户很容易忘记或者漏掉,这里还是有不少完善的空间。 > > > > > > Best, > > > Weike > > > > > > On Tue, Mar 24, 2020 at 4:20 PM Zhenghua Gao wrote: > > > > > > > CURRENT_TIMESTAMP 返回值类型是 TIMESTAMP (WITHOUT TIME ZONE), > > > > 其语义可参考 java.time.LocalDateTime。 > > > > 其字符形式的表示并不随着时区变化而变化(如你所见,和UTC+0 一致)。 > > > > > > > > 你的需求可以通过 CONVERT_TZ(timestamp_string, time_zone_from_string, > > > > time_zone_to_string) > > > > > > > > *Best Regards,* > > > > *Zhenghua Gao* > > > > > > > > > > > > On Mon, Mar 23, 2020 at 10:12 PM DONG, Weike < > kyled...@connect.hku.hk> > > > > wrote: > > > > > > > > > Hi, > > > > > > > > > > 近期发现 Flink 的 Blink Planner 在 DATE_FORMAT 对 CURRENT_TIMESTAMP > > > > > 做时间格式化为字符串时,默认以 UTC+0 为准。 > > > > > > > > > > 长期以来,TableConfig 类里面有一个 setLocalTimeZone > 方法;将其设置为东八区以后,发现格式化后的字符串仍然是 > > > > UTC+0 > > > > > 的。而深入来看,Flink 的时间格式化时的代码生成逻辑(time.scala)并未考虑时区的设置。 > > > > > > > > > > 由于大多数用户的时区均不是 UTC+0(GMT、UTC),如果时间格式化、显示等都可以考虑到 TableConfig > 中的时区设置,那么 > > > > Flink > > > > > 是否会更用户友好一些呢?当然这个会涉及到不兼容的变更,需要谨慎一些。 > > > > > > > > > > 也许提供一个 DATE_FORMAT_WITH_TIMEZONE 的内置函数,社区是否会更容易接受一些呢? > > > > > > > > > > 仅仅是个人一点想法,感谢 :) > > > > > > > > > > > > > > >
Re: 关于 SQL DATE_FORMAT 的时区设置的构想
Hi Jark, 这里的确是有问题的。 目前的问题是Calcite本身并不支持TIMESTAMP WITH TIME ZONE. *Best Regards,* *Zhenghua Gao* On Tue, Mar 24, 2020 at 11:00 PM Jark Wu wrote: > Thanks for reporting this Weike. > > 首先,我觉得目前 Flink 返回 TIMESTAMP WITHOUT TIME ZONE 应该是有问题的。 > 因为 SQL 标准(SQL:2011 Part 2 Section 6.32)定义了返回类型是 WITH TIME ZONE。 > 另外 Calcite 文档中 [1] 也说返回的是 TIMESTAMP WITH TIME ZONE (虽然好像和实现不一致) > 其他的一些数据库也都差不多:mysql [2], oracle[3] > > Best, > Jark > > [1]: https://calcite.apache.org/docs/reference.html#datetime-functions > [2]: > > https://dev.mysql.com/doc/refman/8.0/en/date-and-time-functions.html#function_current-timestamp > [3]: > > https://docs.oracle.com/cd/B28359_01/server.111/b28286/functions038.htm#SQLRF00629 > > > > On Tue, 24 Mar 2020 at 17:00, DONG, Weike wrote: > > > Hi Zhenghua, > > > > 感谢您的回复。感觉既然 Flink 已经提供了时区的设定,这方面也许可以进一步增强一些。CONVERT_TZ > > 用户很容易忘记或者漏掉,这里还是有不少完善的空间。 > > > > Best, > > Weike > > > > On Tue, Mar 24, 2020 at 4:20 PM Zhenghua Gao wrote: > > > > > CURRENT_TIMESTAMP 返回值类型是 TIMESTAMP (WITHOUT TIME ZONE), > > > 其语义可参考 java.time.LocalDateTime。 > > > 其字符形式的表示并不随着时区变化而变化(如你所见,和UTC+0 一致)。 > > > > > > 你的需求可以通过 CONVERT_TZ(timestamp_string, time_zone_from_string, > > > time_zone_to_string) > > > > > > *Best Regards,* > > > *Zhenghua Gao* > > > > > > > > > On Mon, Mar 23, 2020 at 10:12 PM DONG, Weike > > > wrote: > > > > > > > Hi, > > > > > > > > 近期发现 Flink 的 Blink Planner 在 DATE_FORMAT 对 CURRENT_TIMESTAMP > > > > 做时间格式化为字符串时,默认以 UTC+0 为准。 > > > > > > > > 长期以来,TableConfig 类里面有一个 setLocalTimeZone 方法;将其设置为东八区以后,发现格式化后的字符串仍然是 > > > UTC+0 > > > > 的。而深入来看,Flink 的时间格式化时的代码生成逻辑(time.scala)并未考虑时区的设置。 > > > > > > > > 由于大多数用户的时区均不是 UTC+0(GMT、UTC),如果时间格式化、显示等都可以考虑到 TableConfig 中的时区设置,那么 > > > Flink > > > > 是否会更用户友好一些呢?当然这个会涉及到不兼容的变更,需要谨慎一些。 > > > > > > > > 也许提供一个 DATE_FORMAT_WITH_TIMEZONE 的内置函数,社区是否会更容易接受一些呢? > > > > > > > > 仅仅是个人一点想法,感谢 :) > > > > > > > > > >
Re: 回复: Flink JDBC Driver是否支持创建流数据表
hi 赵峰, 你出现的这个问题,是在classpath中找不到Kafka相关TableFactory,按照zhenghua说的方式可以解决。但是现在Flink JDBC Driver只支持Batch模式,而Kafka table source目前只支持stream模式。 Best, Godfrey Zhenghua Gao 于2020年3月25日周三 下午4:26写道: > 请确认一下 kafka connector 的jar包是否在 flink/lib 下。 > 目前的报错看起来是找不到kafka connector的jar包。 > > *Best Regards,* > *Zhenghua Gao* > > > On Wed, Mar 25, 2020 at 4:18 PM 赵峰 wrote: > > > 不是语法问题,我建表也没有问题,是查询报错。你有没有试查询数据或者数据写人文件表中 > > > > > > > > > > 参考下这个文档: > > > > > https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector > > 下面的语法应该是不支持的: > > 'format.type' = 'csv',\n" + > > "'format.field-delimiter' = '|'\n" > > > > 下面是我可以跑通的代码, kafka 里的数据需要是这种格式:{"order_no":"abcdefg","status":90} > > tEnv.sqlUpdate("CREATE TABLE pick_order (\n" > > + "order_no VARCHAR,\n" > > + "status INT\n" > > + ") WITH (\n" > > + "'connector.type' = 'kafka',\n" > > + "'connector.version' = 'universal',\n" > > + "'connector.topic' = 'wanglei_test',\n" > > + "'connector.startup-mode' = 'latest-offset',\n" > > + "'connector.properties.0.key' = 'zookeeper.connect',\n" > > + "'connector.properties.0.value' = 'xxx:2181',\n" > > + "'connector.properties.1.key' = 'bootstrap.servers',\n" > > + "'connector.properties.1.value' = 'xxx:9092',\n" > > + "'update-mode' = 'append',\n" > > + "'format.type' = 'json',\n" > > + "'format.derive-schema' = 'true'\n" > > + ")"); > > > > 王磊 > > > > > > wangl...@geekplus.com.cn > > 发件人: 赵峰 > > 发送时间: 2020-03-24 21:28 > > 收件人: user-zh > > 主题: Flink JDBC Driver是否支持创建流数据表 > > hi > > > > Flink JDBC Driver创建kafka表报错,是我的建表代码不正确?代码如下: > > Connection connection = > > DriverManager.getConnection("jdbc:flink://localhost:8083?planner=blink"); > > Statement statement = connection.createStatement(); > > statement.executeUpdate( > > "CREATE TABLE table_kafka (\n" + > > "user_id BIGINT,\n" + > > "item_id BIGINT,\n" + > > "category_id BIGINT,\n" + > > "behavior STRING,\n" + > > "ts TIMESTAMP(3),\n" + > > "proctime as PROCTIME(),\n" + > > "WATERMARK FOR ts as ts - INTERVAL '5' SECOND\n" + > > ") WITH (\n" + > > "'connector.type' = 'kafka', \n" + > > "'connector.version' = 'universal', \n" + > > "'connector.topic' = 'flink_im02', \n" + > > "'connector.properties.group.id' = 'flink_im02_new',\n" + > > "'connector.startup-mode' = 'earliest-offset', \n" + > > "'connector.properties.zookeeper.connect' = '*.*.*.*:2181', \n" + > > "'connector.properties.bootstrap.servers' = '*.*.*.*:9092', \n" + > > "'format.type' = 'csv',\n" + > > "'format.field-delimiter' = '|'\n" + > > ")"); > > ResultSet rs1 = statement.executeQuery("SELECT * FROM table_kafka"); > > while (rs1.next()) { > > System.out.println(rs1.getInt(1) + ", " + rs1.getInt(2)); > > } > > statement.close(); > > connection.close(); > > 报错: > > Reason: Required context properties mismatch. > > The matching candidates: > > org.apache.flink.table.sources.CsvBatchTableSourceFactory > > Mismatched properties: > > 'connector.type' expects 'filesystem', but is 'kafka' > > 赵峰 > > > > > > Quoted from: > > > http://apache-flink.147419.n8.nabble.com/Flink-JDBC-Driver-tp2103p2104.html > > > > > > > > > > 赵峰 >
?????? ????flink sql 1.10 source????????????????????
hi Zhenghua Gao ?? ---- ??:"Zhenghua Gao"
Re: 关于flink sql 1.10 source并行度自动推断的疑问
Hi Chief, 目前Hive connector读取数据是通过 InputFormatSourceFunction 来实现的。 InputFormatSourceFunction 的工作模式不是预分配的模式,而是每个source task向master请求split。 如果某些source task提前调度起来且读完了所有的split,后调度起来的source task就没有数据可读了。 你可以看看JM/TM日志,确认下是不是前十个调度起来的source task读完了所有的数据。 *Best Regards,* *Zhenghua Gao* On Wed, Mar 25, 2020 at 3:31 PM Chief wrote: > hiJun Zhang > 您说的我明白,就是不太理解,为什么根据文件数量自动推断任务并行后,不是每个并行任务读取一个文件? > > > > > > --原始邮件-- > 发件人:"Jun Zhang" 发送时间:2020年3月25日(星期三) 上午9:08 > 收件人:"user-zh" > 主题:Re: 关于flink sql 1.10 source并行度自动推断的疑问 > > > > hi,Chief: > > > 目前flink读取hive的时候,如果开启了自动推断,系统会根据所读取的文件数来推断并发,如果没有超过最大并发数(默认1000),source的并行度就等于你文件的个数, > 你可以通过table.exec.hive.infer-source-parallelism.max来设置source的最大并发度。 > > Kurt Young > 你的数据量有多大?有一个可能的原因是source的其他并发调度起来的时候,数据已经被先调度起来的并发读完了。 > > Best, > Kurt > > > On Tue, Mar 24, 2020 at 10:39 PM Chief wrote: > > hi all: > 之前用flink sql查询hive的数据,hive的数据文件是150个,sql > client配置文件设置的并行度是10,source通过自动推断生成了150并发,但是通过看web > ui发现只有前十个子任务是读到数据了,其他的任务显示没有读到数据,请问是我设置有问题吗? >
Flink1.10版本消费Kafka0.11版本,页面监控received都是0
请教一个问题:我使用的是Flink是1.10版本消费Kafka0.11版本,直接打印出来。Flink集群是standalong模式,页面监控上的received都是0,不知道怎么回事?
Re: 回复: Flink JDBC Driver是否支持创建流数据表
请确认一下 kafka connector 的jar包是否在 flink/lib 下。 目前的报错看起来是找不到kafka connector的jar包。 *Best Regards,* *Zhenghua Gao* On Wed, Mar 25, 2020 at 4:18 PM 赵峰 wrote: > 不是语法问题,我建表也没有问题,是查询报错。你有没有试查询数据或者数据写人文件表中 > > > > > 参考下这个文档: > > https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector > 下面的语法应该是不支持的: > 'format.type' = 'csv',\n" + > "'format.field-delimiter' = '|'\n" > > 下面是我可以跑通的代码, kafka 里的数据需要是这种格式:{"order_no":"abcdefg","status":90} > tEnv.sqlUpdate("CREATE TABLE pick_order (\n" > + "order_no VARCHAR,\n" > + "status INT\n" > + ") WITH (\n" > + "'connector.type' = 'kafka',\n" > + "'connector.version' = 'universal',\n" > + "'connector.topic' = 'wanglei_test',\n" > + "'connector.startup-mode' = 'latest-offset',\n" > + "'connector.properties.0.key' = 'zookeeper.connect',\n" > + "'connector.properties.0.value' = 'xxx:2181',\n" > + "'connector.properties.1.key' = 'bootstrap.servers',\n" > + "'connector.properties.1.value' = 'xxx:9092',\n" > + "'update-mode' = 'append',\n" > + "'format.type' = 'json',\n" > + "'format.derive-schema' = 'true'\n" > + ")"); > > 王磊 > > > wangl...@geekplus.com.cn > 发件人: 赵峰 > 发送时间: 2020-03-24 21:28 > 收件人: user-zh > 主题: Flink JDBC Driver是否支持创建流数据表 > hi > > Flink JDBC Driver创建kafka表报错,是我的建表代码不正确?代码如下: > Connection connection = > DriverManager.getConnection("jdbc:flink://localhost:8083?planner=blink"); > Statement statement = connection.createStatement(); > statement.executeUpdate( > "CREATE TABLE table_kafka (\n" + > "user_id BIGINT,\n" + > "item_id BIGINT,\n" + > "category_id BIGINT,\n" + > "behavior STRING,\n" + > "ts TIMESTAMP(3),\n" + > "proctime as PROCTIME(),\n" + > "WATERMARK FOR ts as ts - INTERVAL '5' SECOND\n" + > ") WITH (\n" + > "'connector.type' = 'kafka', \n" + > "'connector.version' = 'universal', \n" + > "'connector.topic' = 'flink_im02', \n" + > "'connector.properties.group.id' = 'flink_im02_new',\n" + > "'connector.startup-mode' = 'earliest-offset', \n" + > "'connector.properties.zookeeper.connect' = '*.*.*.*:2181', \n" + > "'connector.properties.bootstrap.servers' = '*.*.*.*:9092', \n" + > "'format.type' = 'csv',\n" + > "'format.field-delimiter' = '|'\n" + > ")"); > ResultSet rs1 = statement.executeQuery("SELECT * FROM table_kafka"); > while (rs1.next()) { > System.out.println(rs1.getInt(1) + ", " + rs1.getInt(2)); > } > statement.close(); > connection.close(); > 报错: > Reason: Required context properties mismatch. > The matching candidates: > org.apache.flink.table.sources.CsvBatchTableSourceFactory > Mismatched properties: > 'connector.type' expects 'filesystem', but is 'kafka' > 赵峰 > > > Quoted from: > http://apache-flink.147419.n8.nabble.com/Flink-JDBC-Driver-tp2103p2104.html > > > > > 赵峰
Re: 回复: Flink JDBC Driver是否支持创建流数据表
不是语法问题,我建表也没有问题,是查询报错。你有没有试查询数据或者数据写人文件表中 参考下这个文档: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector 下面的语法应该是不支持的: 'format.type' = 'csv',\n" + "'format.field-delimiter' = '|'\n" 下面是我可以跑通的代码, kafka 里的数据需要是这种格式:{"order_no":"abcdefg","status":90} tEnv.sqlUpdate("CREATE TABLE pick_order (\n" + "order_no VARCHAR,\n" + "status INT\n" + ") WITH (\n" + "'connector.type' = 'kafka',\n" + "'connector.version' = 'universal',\n" + "'connector.topic' = 'wanglei_test',\n" + "'connector.startup-mode' = 'latest-offset',\n" + "'connector.properties.0.key' = 'zookeeper.connect',\n" + "'connector.properties.0.value' = 'xxx:2181',\n" + "'connector.properties.1.key' = 'bootstrap.servers',\n" + "'connector.properties.1.value' = 'xxx:9092',\n" + "'update-mode' = 'append',\n" + "'format.type' = 'json',\n" + "'format.derive-schema' = 'true'\n" + ")"); 王磊 wangl...@geekplus.com.cn 发件人: 赵峰 发送时间: 2020-03-24 21:28 收件人: user-zh 主题: Flink JDBC Driver是否支持创建流数据表 hi Flink JDBC Driver创建kafka表报错,是我的建表代码不正确?代码如下: Connection connection = DriverManager.getConnection("jdbc:flink://localhost:8083?planner=blink"); Statement statement = connection.createStatement(); statement.executeUpdate( "CREATE TABLE table_kafka (\n" + "user_id BIGINT,\n" + "item_id BIGINT,\n" + "category_id BIGINT,\n" + "behavior STRING,\n" + "ts TIMESTAMP(3),\n" + "proctime as PROCTIME(),\n" + "WATERMARK FOR ts as ts - INTERVAL '5' SECOND\n" + ") WITH (\n" + "'connector.type' = 'kafka', \n" + "'connector.version' = 'universal', \n" + "'connector.topic' = 'flink_im02', \n" + "'connector.properties.group.id' = 'flink_im02_new',\n" + "'connector.startup-mode' = 'earliest-offset', \n" + "'connector.properties.zookeeper.connect' = '*.*.*.*:2181', \n" + "'connector.properties.bootstrap.servers' = '*.*.*.*:9092', \n" + "'format.type' = 'csv',\n" + "'format.field-delimiter' = '|'\n" + ")"); ResultSet rs1 = statement.executeQuery("SELECT * FROM table_kafka"); while (rs1.next()) { System.out.println(rs1.getInt(1) + ", " + rs1.getInt(2)); } statement.close(); connection.close(); 报错: Reason: Required context properties mismatch. The matching candidates: org.apache.flink.table.sources.CsvBatchTableSourceFactory Mismatched properties: 'connector.type' expects 'filesystem', but is 'kafka' 赵峰 Quoted from: http://apache-flink.147419.n8.nabble.com/Flink-JDBC-Driver-tp2103p2104.html 赵峰
?????? ????flink sql 1.10 source????????????????????
hiJun Zhang ---- ??:"Jun Zhang"
?????? ????flink sql 1.10 source????????????????????
hi Kurt Young hive??13??web ui source??150source ---- ??:"Kurt Young"
Re: ddl es 报错
, zhisheng 我觉得支持ES鉴权在生产中是蛮有用的功能,nice to have, 如jinhai所说,可以先提个improvement的issue,在社区里讨论下(具体参数名,这些参数应该是可选的),讨论一致后开PR就可以了。 Best, Leonard > 在 2020年3月25日,13:51,jinhai wang 写道: > > 优秀!可以提个improve issue > > > Best Regards > > jinhai...@gmail.com > >> 2020年3月25日 下午1:40,zhisheng 写道: >> >> hi,Leonar Xu >> >> 官方 ES DDL 现在不支持填写 ES 集群的用户名和密码,我现在在公司已经做了扩展,加了这个功能,请问社区是否需要这个功能?我该怎么贡献呢? >> >> 效果如图:http://zhisheng-blog.oss-cn-hangzhou.aliyuncs.com/2020-03-25-053948.png >> >> Best Wishes! >> >> zhisheng >> >> Leonard Xu 于2020年3月24日周二 下午5:53写道: >> >>> Hi, 出发 >>> 看起来是你缺少了依赖es connector的依赖[1],所以只能找到内置的filesystem connector,目前内置的filesystem >>> connector只支持csv format,所以会有这个错误。 >>> 在项目中加上缺失的依赖即可,如果使用SQL CLI,也需要将依赖的jar包放到 flink的lib目录下。 >>> >>> >>> org.apache.flink >>> flink-sql-connector-elasticsearch6_2.11 >>> ${flink.version} >>> >>> >>> org.apache.flink >>> flink-json >>> ${flink.version} >>> >>> >>> Best, >>> Leonard >>> [1] >>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#elasticsearch-connector >>> < >>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#elasticsearch-connector >>> >>> 在 2020年3月23日,23:30,出发 <573693...@qq.com> 写道: 源码如下: CREATE TABLE buy_cnt_per_hour ( hour_of_day BIGINT, buy_cnt BIGINT ) WITH ( 'connector.type' = 'elasticsearch', 'connector.version' = '6', 'connector.hosts' = 'http://localhost:9200', 'connector.index' = 'buy_cnt_per_hour', 'connector.document-type' = 'user_behavior', 'connector.bulk-flush.max-actions' = '1', 'format.type' = 'json', 'update-mode' = 'append' ) import >>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.java.StreamTableEnvironment; import org.apache.flink.types.Row; public class ESTest { public static void main(String[] args) throws Exception { //2、设置运行环境 StreamExecutionEnvironment streamEnv = >>> StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings settings = >>> EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build(); StreamTableEnvironment tableEnv = >>> StreamTableEnvironment.create(streamEnv, settings); streamEnv.setParallelism(1); String sinkDDL = " CREATE TABLE test_es ( hour_of_day BIGINT, >>> buy_cnt BIGINT " + ") WITH ( 'connector.type' = 'elasticsearch', >>> 'connector.version' = '6'," + "'connector.hosts' = 'http://localhost:9200', >>> 'connector.index' = 'buy_cnt_per_hour'," + "'connector.document-type' = 'user_behavior'," + "'connector.bulk-flush.max-actions' = '1',\n" + " >>> 'format.type' = 'json'," + "'update-mode' = 'append' )"; tableEnv.sqlUpdate(sinkDDL); Table table = tableEnv.sqlQuery("select * from test_es "); tableEnv.toRetractStream(table, Row.class).print(); streamEnv.execute(""); } } 具体error The matching candidates: org.apache.flink.table.sources.CsvAppendTableSourceFactory Mismatched properties: 'connector.type' expects 'filesystem', but is 'elasticsearch' 'format.type' expects 'csv', but is 'json' The following properties are requested: connector.bulk-flush.max-actions=1 connector.document-type=user_behavior connector.hosts=http://localhost:9200 connector.index=buy_cnt_per_hour connector.type=elasticsearch connector.version=6 format.type=json schema.0.data-type=BIGINT schema.0.name=hour_of_day schema.1.data-type=BIGINT schema.1.name=buy_cnt update-mode=append >>> >>> >
flink 1.9 状态后端为FsStateBackend,修改checkpoint时出现警告
package com.guanyq.study.libraries.stateProcessorApi.FsStateBackend; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.FlatMapOperator; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.state.api.BootstrapTransformation; import org.apache.flink.state.api.OperatorTransformation; import org.apache.flink.state.api.Savepoint; import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction; import org.apache.flink.state.api.functions.KeyedStateReaderFunction; import org.apache.flink.util.Collector; import java.util.ArrayList; import java.util.List; import java.util.function.Consumer; import java.util.stream.Collectors; public class ReorganizeListState { public static void main(String[] args) throws Exception { // set up the batch execution environment final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); FlatMapOperator dataSet = Savepoint.load(env, "hdfs://MacBookPro:9000/flink/savepoints/8b900fc76a5a99dc5d0201e29cd7bef1/savepoint-8b900f-1e3d035ee046", new FsStateBackend("hdfs://MacBookPro:9000/flink/local/checkpoints")) .readKeyedState("map", new ReaderFunction()) .flatMap(new FlatMapFunction() { @Override public void flatMap(KeyedListState keyedListState, Collector collector) { KeyedListState newState = new KeyedListState(); newState.value = keyedListState.value.stream() .map(x -> x + 1).collect(Collectors.toList()); newState.key = keyedListState.key; collector.collect(newState); } }); BootstrapTransformation transformation = OperatorTransformation .bootstrapWith(dataSet) .keyBy(acc -> acc.key) .transform(new KeyedListStateBootstrapper()); String local = String.valueOf(System.currentTimeMillis()); System.out.println(local); Savepoint.create(new FsStateBackend("hdfs://MacBookPro:9000/flink/local/checkpoints"),128) .withOperator("map",transformation) .write("hdfs://MacBookPro:9000/flink/local/reorganizeListState/"+local); dataSet.writeAsText("./readSavepoint_"+System.currentTimeMillis()); env.execute("ReorganizeListState"); } static class KeyedListState { Integer key; List value; } static class ReaderFunction extends KeyedStateReaderFunction { private transient ListState listState; @Override public void open(Configuration parameters) { ListStateDescriptor lsd = new ListStateDescriptor<>("list", TypeInformation.of(Integer.class)); listState = getRuntimeContext().getListState(lsd); } @Override public void readKey( Integer key, Context ctx, Collector out) throws Exception { List li = new ArrayList<>(); listState.get().forEach(new Consumer() { @Override public void accept(Integer integer) { li.add(integer); } }); KeyedListState kl = new KeyedListState(); kl.key = key; kl.value = li; out.collect(kl); } } static class KeyedListStateBootstrapper extends KeyedStateBootstrapFunction { private transient ListState listState; @Override public void open(Configuration parameters) { ListStateDescriptor lsd = new ListStateDescriptor<>("list", TypeInformation.of(Integer.class)); listState = getRuntimeContext().getListState(lsd); } @Override public void processElement(KeyedListState value, Context ctx) throws Exception { listState.addAll(value.value); } } }
Re: Re: 向您请教pyflink在windows上运行的问题,我第一次接触flink。
上面视频中对应的word_count示例的源码应该是这个: https://github.com/sunjincheng121/enjoyment.code/blob/master/myPyFlink/enjoyment/word_count.py运行完成之后计算结果应该是写到sink_file = 'sink.csv'文件里面去了。你可以将这个文件的路径打印出来,查看这个文件内容。 另外如果您只是为了学习入门的话,建议你查阅[1][2], 我让想整理了解PyFlink最新的状况,可以查看[3]。 [1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/python/installation.html [2] https://enjoyment.cool/2020/01/22/Three-Min-Series-How-PyFlink-does-ETL/#more [3] https://www.bilibili.com/video/BV1W7411o7Tj?from=search=14518199503613218690 Best, Jincheng - Twitter: https://twitter.com/sunjincheng121 - xu1990xaut 于2020年3月25日周三 下午2:23写道: > 孙老师您好,我之前在网上看的是这个视频《【Apache Flink 进阶教程】14课. Apache Flink Python API > 的现状及未来规划》。 今天我也在虚拟机下试了,还是无法运行。 > 我用的是flink1.10,python3.6。 麻烦老师指点指点。 > > > > > > > 在 2020-03-25 11:32:29,"jincheng sun" 写道: > > 很高兴收到您的邮件,我不太确定您具体看的是哪一个视频,所以很难确定您遇到的问题原因。您可以参考官方文档[1], > 同时我个人博客里有一些3分钟的视频和文档入门教程[2],您可以先查阅一下。如又问题,可以保持邮件沟通,可以在我的博客留言! > > [1] > https://ci.apache.org/projects/flink/flink-docs-release-1.10/tutorials/python_table_api.html > [2] https://enjoyment.cool/ > > Best, > Jincheng > > > > xu1990xaut 于2020年3月24日周二 下午11:36写道: > >> 您好,之前在哔哩哔哩上看过您讲的视频。 也跟着视频动手做了。 >> 我用的flink1.10,在pip的时候是直接pip install apache-flink,结果默认就是1.10版本。 >> 然后我在pycharm中运行word-count这个脚本时,一直不出结果,也不报错。 请问这是什么原因。 >> 我也装了jdk,另外页面访问flink8081那个端口也可以出来界面。 我是第一次接触flink,在网上也搜过这个问题, >> 可是一直没有得到答案。 麻烦您,给小弟指点指点, 谢谢您了。 >> >> >> >> > > > >