Re: Local test of Flink 1.10 HBase SQL CREATE TABLE: the conf of the object obtained after byte deserialization is null, so the HBase cluster cannot be connected

2020-05-22  Posted by shao.hongxiao
No. After testing: writing to HBase works; this problem only occurs when reading.




邵红晓
Email: 17611022...@163.com

On 2020-05-23 at 08:20, Jacky Lau wrote:
Did you explicitly specify the HBase conf directory in the configs.sh script?


Sent from my iPhone



-- Original message --
From: Leonard Xu
Sent: 2020-05-23 00:19
To: shao.hongxiao <17611022...@163.com>
Cc: user-zh
Subject: Re: Local test of Flink 1.10 HBase SQL CREATE TABLE: the conf of the object obtained after byte deserialization is null, so the HBase cluster cannot be connected

Hi, hongxiao

I gave it a try against my local HBase cluster and it worked fine [1]; I could not reproduce your problem. What does your HBase cluster environment look like?

Best,
Leonard Xu
[1] 
https://github.com/leonardBang/flink-sql-etl/blob/master/etl-job/src/main/java/kafka2hbase/KafkaJoinHbaseJoinMysql2Hbase.java#L33
 


> On 2020-05-22 at 11:48, shao.hongxiao <17611022...@163.com> wrote:
>
> Below is my program.
> SQL:
> val hbaseTable =
>   """
> |CREATE TABLE pvuv_sink(
> |user_id varchar,
> |f ROW
> |) WITH (
> |'connector.type' = 'hbase',
> |'connector.version' = '1.4.3',
> |'connector.table-name' = 'test_shx',
> |'connector.zookeeper.quorum' = 'docker-hbase:2181',
> |'connector.zookeeper.znode.parent' = '/hbase'
> |)
>   """.stripMargin.toString
>
>  val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
> bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
>  bsEnv.setParallelism(1)
> val bsSettings = 
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
> val bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings)
> bsTableEnv.sqlUpdate(hbaseTable)
> bsTableEnv.execute("SQL Job")
>
> Error:
> During the job graph phase:
> HBaseRowInputFormat.java
> this.conf = {Configuration@4841} "Configuration: core-default.xml, 
> core-site.xml, hbase-default.xml, hbase-site.xml"
> quietmode = true
> allowNullValueProperties = false
> resources = {ArrayList@4859}  size = 2
> finalParameters = {Collections$SetFromMap@4860}  size = 0
> loadDefaults = true
> updatingResource = {ConcurrentHashMap@4861}  size = 343
> properties = {Properties@4862}  size = 343
> overlay = {Properties@4863}  size = 2
> classLoader = {Launcher$AppClassLoader@4864}
>
> During the executor phase, in InstantiationUtil.java readObjectFromConfig:
> userCodeObject = {HBaseRowInputFormat@13658}
> tableName = "test_shx"
> schema = {HBaseTableSchema@13660}
> conf = null
> readHelper = null
> endReached = false
> table = null
> scan = null
> resultScanner = null
> currentRow = null
> scannedRows = 0
> runtimeContext = null
>
>
> Any help would be greatly appreciated.
>
>
> 邵红晓
> Email: 17611022...@163.com




Re: Local test of Flink 1.10 HBase SQL CREATE TABLE: the conf of the object obtained after byte deserialization is null, so the HBase cluster cannot be connected

2020-05-22  Posted by shao.hongxiao
Thanks. After testing I found that writes to HBase succeed but reads do not. Looking at the source,
roughly in the HBaseRowInputFormat class, there is this line:
private transient Configuration conf;

Because the field is transient, conf is never serialized, which is why it comes back as null after deserialization. You should be able to reproduce it with this in mind.
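To make the mechanism above concrete, here is a minimal, self-contained Java sketch (not the Flink source; java.util.Properties merely stands in for the Hadoop/HBase Configuration, and the class name and open() hook are illustrative assumptions). It shows that a transient field comes back as null after a Java serialization round-trip, and the usual remedy: rebuilding the object in an open()/configure()-style hook on the deserialized copy.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Properties;

public class TransientConfDemo implements Serializable {
    private static final long serialVersionUID = 1L;

    // transient fields are skipped by Java serialization, just like the conf field above
    private transient Properties conf = new Properties();

    // typical fix: re-create the non-serializable object after deserialization,
    // e.g. in an open()/configure() hook that runs on the task manager side
    void open() {
        if (conf == null) {
            conf = new Properties(); // in the real connector this would be something like HBaseConfiguration.create()
        }
    }

    public static void main(String[] args) throws Exception {
        TransientConfDemo original = new TransientConfDemo();

        // serialize and deserialize, roughly what happens when an InputFormat is shipped to the workers
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(original);
        }
        TransientConfDemo copy = (TransientConfDemo) new ObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray())).readObject();

        System.out.println(copy.conf); // prints null: the transient field did not survive
        copy.open();
        System.out.println(copy.conf); // rebuilt on the receiving side, usable again
    }
}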


邵红晓
Email: 17611022...@163.com

On 2020-05-23 at 00:19, Leonard Xu wrote:
Hi, hongxiao

I gave it a try against my local HBase cluster and it worked fine [1]; I could not reproduce your problem. What does your HBase cluster environment look like?

Best,
Leonard Xu
[1] 
https://github.com/leonardBang/flink-sql-etl/blob/master/etl-job/src/main/java/kafka2hbase/KafkaJoinHbaseJoinMysql2Hbase.java#L33
 


> On 2020-05-22 at 11:48, shao.hongxiao <17611022...@163.com> wrote:
> [shao.hongxiao's original message, quoted here in full, is omitted; see the copy earlier in this thread]



Re: Local test of Flink 1.10 HBase SQL CREATE TABLE: the conf of the object obtained after byte deserialization is null, so the HBase cluster cannot be connected

2020-05-22  Posted by Leonard Xu
Hi, hongxiao

I gave it a try against my local HBase cluster and it worked fine [1]; I could not reproduce your problem. What does your HBase cluster environment look like?

Best,
Leonard Xu
[1] 
https://github.com/leonardBang/flink-sql-etl/blob/master/etl-job/src/main/java/kafka2hbase/KafkaJoinHbaseJoinMysql2Hbase.java#L33
 


> On 2020-05-22 at 11:48, shao.hongxiao <17611022...@163.com> wrote:
> [shao.hongxiao's original message, quoted here in full, is omitted; see the copy earlier in this thread]



Re: Updating only some fields of a sink table

2020-05-22  Posted by Leonard Xu
Hi, naturalfree

In Flink SQL the Elasticsearch sink supports both append mode and upsert mode [1]; in upsert mode it supports updating by key, so take a look at that.


Best,
Leonard Xu
[1] 
https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/connect.html#elasticsearch-connector
 


> On 2020-05-20 at 16:07, naturalfree wrote:
> 
> I have an Elasticsearch index and want to update only some of its fields by primary key through Flink SQL. Is there a feasible way to do this?
> 
> 
> naturalfree
> Email: naturalf...@126.com



Re: Reading files under an HDFS directory with a regex in Flink

2020-05-22  Posted by Jingsong Li
Hi,

In 1.11 we already have fairly complete filesystem support, including partitions (csv, json, avro, parquet, orc).

For more flexible reads we are aiming for 1.12. There is already an issue for it:
https://issues.apache.org/jira/browse/FLINK-17398

Best,
Jingsong Lee

On Fri, May 22, 2020 at 10:47 AM 阿华田  wrote:

> input_data = "hdfs://localhost:9002/tmp/match_bak/%s*[0-9]" %
> ('2018-07-16')
> result = sc.textFile(input_data)
> Can Flink read files under an HDFS directory with a regex the way Spark does? My tests so far suggest it cannot. If it is not supported yet, what is the earliest version that will support it?
>
>
> 王志华
> a15733178...@163.com
>
>

-- 
Best, Jingsong Lee


Flink + Prometheus integration problem

2020-05-22  Posted by guaishushu1...@163.com
Custom Flink metrics can be retrieved through the Flink REST API, but Prometheus does not pick them up. Has anyone run into this problem?



guaishushu1...@163.com


Re: Defining an @timestamp field in DDL raises org.apache.flink.table.api.SqlParserException

2020-05-22  Posted by oliver
Thanks, Benchao.
Yes, it is 1.10.0; I will try upgrading to 1.10.1.

> On 2020-05-22 at 18:48, Benchao Li wrote:
> 
> Hi,
> 
> This should be a known bug [1]. You are probably on 1.10.0? It has been fixed in 1.10.1.
> 
> [1] https://issues.apache.org/jira/browse/FLINK-16068
> 
> oliver wrote on Friday, 2020-05-22 at 17:38:
> 
>> [oliver's original message, quoted here in full, is omitted; see the complete message later in this digest]
> 
> -- 
> 
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
> 



Re: Defining an @timestamp field in DDL raises org.apache.flink.table.api.SqlParserException

2020-05-22  Posted by Benchao Li
Hi,

This should be a known bug [1]. You are probably on 1.10.0? It has been fixed in 1.10.1.

[1] https://issues.apache.org/jira/browse/FLINK-16068

oliver wrote on Friday, 2020-05-22 at 17:38:

> [oliver's original message, quoted here in full, is omitted; see the complete message later in this digest]

-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


How to control the balance when using FlinkKafkaConsumer.setStartFromTimestamp(timestamp)

2020-05-22  Posted by Jary Zhen
Hello everyone,

   First, a brief pipeline introduction:
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  consume multiple Kafka topics
  -> union them
  -> assignTimestampsAndWatermarks
  -> keyBy
  -> window()  and so on …
It's a very common way to use Flink to process data like this in a production
environment.
But if I want to test the pipeline above, I need to use the
FlinkKafkaConsumer.setStartFromTimestamp(timestamp) API to consume 'past' data
(a small usage sketch follows below).
So my question is how to control the 'step' balance when one topic produces
3 records per second while another topic produces 3 per second.

I'm not sure I have described this clearly, so please let me know if anything needs clarifying.

Thanks
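Below is a small usage sketch of the API mentioned above (assuming the Flink 1.10 universal Kafka connector; the topics, group id, timestamp and the way event time is parsed from the record are placeholders). Assigning the timestamp/watermark extractor directly on the consumer makes watermarks per Kafka partition, so downstream operators advance event time only as fast as the slowest partition, which is one way to keep a fast topic from racing ahead during replay:

import java.util.Arrays;
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class ReplayFromTimestamp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("group.id", "replay-test");             // placeholder

        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
                Arrays.asList("topicA", "topicB"), new SimpleStringSchema(), props);

        // replay 'past' data: start from a point in time instead of the committed offsets
        consumer.setStartFromTimestamp(1589900000000L); // placeholder epoch millis

        // extractor assigned on the consumer itself => watermarks are tracked per partition
        consumer.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(5)) {
                    @Override
                    public long extractTimestamp(String record) {
                        return Long.parseLong(record.split(",")[0]); // hypothetical event-time field
                    }
                });

        DataStream<String> stream = env.addSource(consumer);
        stream.print();
        env.execute("replay-from-timestamp");
    }
}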


Defining an @timestamp field in DDL raises org.apache.flink.table.api.SqlParserException

2020-05-22  Posted by oliver
hello,
Flink version: 1.10
Some of the business data in Kafka contains a field named `@timestamp`.
The DDL is as follows:
CREATE TABLE kafkaSrc(
`@timestamp` TIMESTAMP(3)
,domain VARCHAR
,proctime AS proctime()
 )WITH(
'connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.topic' = 'topic',
'connector.startup-mode' = 'latest-offset',
'connector.properties.group.id' = 'id',
'connector.properties.zookeeper.connect' = 'xxx',
'connector.properties.bootstrap.servers' = 'xxx',
'format.type' = 'json',
'format.derive-schema' = 'true'
 );

If the DML is:
insert into MyResult
select
,`@timestamp`
,domain
,proctime
from
kafkaSrc
;

then SQL parsing fails with the following exception:
Caused by: org.apache.flink.table.api.SqlParserException: SQL parse failed. 
Lexical error at line 1, column 8.  Encountered: "@" (64), after : ""
at 
org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:50)
at 
org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl.convertToRexNodes(SqlExprToRexConverterImpl.java:79)
at 
org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:111)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:522)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:436)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:154)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142)
at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)

Debugging shows:
https://github.com/apache/flink/blob/release-1.10/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/calcite/CalciteParser.java#L45-L52

The value of the sql argument passed to public SqlNode parse(String sql) is: SELECT
@timestamp,domain,PROCTIME() FROM __temp_table__

Screenshot: http://chuantu.xyz/t6/735/1590139759x2728303292.png

Also, if I remove proctime AS proctime(), the results come out normally; not sure whether that helps.

Best,
Oliver 云长



Re: Question: different Flink SQL jobs consuming the same Kafka table (with a groupId specified) output the same data?

2020-05-22  Posted by Benchao Li
Hi,

Flink's Kafka connector is implemented with the Kafka low-level API: it fetches the current partition metadata itself and decides on its own which subtask reads which partition.
So two jobs are completely independent and never become aware of each other. (With one exception: if you configure the same group id, their committed offsets may overwrite each other.)
The mode you are describing is the Kafka high-level API. A small sketch contrasting the two client styles follows below.
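To illustrate the difference, here is a minimal sketch with the plain Kafka client (not Flink code; the servers, topic and partition numbers are placeholders). subscribe() lets the broker-side group coordinator split partitions among the members of a group, while assign(), which is the style the Flink connector uses internally, picks partitions explicitly on the client, so two independent Flink jobs each read every partition regardless of group.id:

import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class AssignVsSubscribe {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("group.id", "testGroup");
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        // high-level style: the group coordinator rebalances partitions across the group,
        // so each partition is read by at most one consumer of testGroup
        KafkaConsumer<byte[], byte[]> highLevel = new KafkaConsumer<>(props);
        highLevel.subscribe(Collections.singletonList("x-log-yanfa_log"));

        // low-level style (what the Flink connector does internally): partitions are assigned
        // explicitly on the client, the coordinator is bypassed, and group.id only matters
        // for offset commits
        KafkaConsumer<byte[], byte[]> lowLevel = new KafkaConsumer<>(props);
        lowLevel.assign(Arrays.asList(
                new TopicPartition("x-log-yanfa_log", 0),
                new TopicPartition("x-log-yanfa_log", 1)));

        highLevel.close();
        lowLevel.close();
    }
}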

wind.fly@outlook.com wrote on Friday, 2020-05-22 at 16:21:

> [the original message, quoted here in full, is omitted; see the complete thread below]


-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


Question: different Flink SQL jobs consuming the same Kafka table (with a groupId specified) output the same data?

2020-05-22  Posted by wind.fly....@outlook.com
Hi, all
Using Flink 1.10.0, I created a table mapped to Kafka under a Hive catalog:
CREATE TABLE x.log.yanfa_log (
dt TIMESTAMP(3),
conn_id STRING,
sequence STRING,
trace_id STRING,
span_info STRING,
service_id STRING,
msg_id STRING,
servicename STRING,
ret_code STRING,
duration STRING,
req_body MAP,
res_body MAP,
extra_info MAP,
WATERMARK FOR dt AS dt - INTERVAL '60' SECOND
) WITH (
'connector.type' = 'kafka',
'connector.version' = '0.11',
'connector.topic' = 'x-log-yanfa_log',
'connector.properties.bootstrap.servers' = '**:9092',
'connector.properties.zookeeper.connect' = '**:2181',
'connector.properties.group.id' = 'testGroup',
'connector.startup-mode' = 'group-offsets',
'update-mode' = 'append',
'format.type' = 'json',
'format.fail-on-missing-field' = 'true'
);
The program consuming the table x.log.yanfa_log is as follows:
Catalog myCatalog = new HiveCatalog("x", "default",
"D:\\conf", "1.1.0");
tEnv.registerCatalog("x", myCatalog);
Table rs = tEnv.sqlQuery("select * from x.log.yanfa_log");
tEnv.toAppendStream(rs, Row.class).print();

Then I started two jobs from this same program, and both output exactly the same results. My question is: isn't each partition of a Kafka topic supposed to be consumed by at most one consumer in a group? Why do the two jobs output the same results?


Error when Flink writes to Kafka with exactly-once (EOS) enabled

2020-05-22  Posted by ????????????????
Hi All,

When Flink writes to Kafka with exactly-once (EOS) semantics enabled, the job fails with the error below:


2020-05-21 16:52:15,057 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- Source: Custom 
Source (1/1) (f65b2869d898a050238c53f9fbc9573b) switched from DEPLOYING to 
RUNNING. 2020-05-21 16:52:15,062 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- 
Co-Process-Broadcast - Map - Sink: Unnamed (1/1) 
(d0739aa81367223f83a63a86307fffb3) switched from DEPLOYING to RUNNING. 
2020-05-21 16:52:15,276 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- 
Co-Process-Broadcast - Map - Sink: Unnamed (1/1) 
(d0739aa81367223f83a63a86307fffb3) switched from RUNNING to FAILED. 
org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: 
Could not find a coordinator with type TRANSACTION with key 
Co-Process-Broadcast - Map - Sink: 
Unnamed-c4ffe334eee7821772b24597621064ce-32 due tounexpected error: The server 
experienced an unexpected error when processing the request.   at 
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
 at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
 at java.lang.reflect.Constructor.newInstance(Constructor.java:423) 
 at 
java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:593)  
 at 
java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:677) at 
java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:735)  at 
java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:160)  at 
java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:174)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)   
at 
java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:583)  at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.abortTransactions(FlinkKafkaProducer.java:1101)
   at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1037)
 at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
 at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
   at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:284)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:989)
  at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453)
   at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460) 
 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)   at 
org.apache.flink.runtime.taskmanager.Task.run(Task.java:533) at 
java.lang.Thread.run(Thread.java:748) Caused by: 
org.apache.kafka.common.KafkaException: Could not find a coordinator with type 
TRANSACTION with key Co-Process-Broadcast - Map - Sink: 
Unnamed-c4ffe334eee7821772b24597621064ce-32 due tounexpected error: The server 
experienced an unexpected error when processing the request.  at 
org.apache.kafka.clients.producer.internals.TransactionManager$FindCoordinatorHandler.handleResponse(TransactionManager.java:1142)
   at 
org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:909)
 at 
org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)  at 
org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:557)
 at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549)  at 
org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:288)  at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:235)  
... 1 more

About side-output support for late data in the flink-streaming coGroup operator's window

2020-05-22  Posted by zhaiao...@58.com
In Flink 1.10.0 the streaming API's coGroup operator does not yet support side outputs for handling late messages. Does the community plan to support this?


zhaiao...@58ganji.com


Re: Password problem when defining an Elasticsearch index in the SQL client

2020-05-22  Posted by naturalfree
OK, thank you very much.


naturalfree
Email: naturalf...@126.com

On 2020-05-22 at 11:15, Yangze Guo wrote:
1.11 is already feature-frozen, so this can be supported in 1.12 at the earliest. If you are in a hurry, take a look at the DataStream
API's ElasticsearchSink, which supports security authentication; you could also implement your own TableSink. (A sketch of the DataStream approach follows at the end of this message.)

Best,
Yangze Guo

On Fri, May 22, 2020 at 9:59 AM Rui Li  wrote:
>
> Hi, this is not supported yet, but there is a PR working on this feature: https://github.com/apache/flink/pull/11822
>
> On Wed, May 20, 2020 at 4:10 PM naturalfree  wrote:
>
> > I defined an index pointing to ES in the Flink SQL client configuration file and found there are no properties for setting a username and password. Does the current ES connector support security authentication?
> >
> > naturalfree
> > Email: naturalf...@126.com
>
>
>
> --
> Best regards!
> Rui Li
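Following up on the ElasticsearchSink suggestion above, here is a hedged sketch (assuming the flink-connector-elasticsearch7 DataStream API; the host, index name and credentials are placeholders) of wiring basic authentication through a custom RestClientFactory:

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.client.Requests;

public class SecuredEsSink {

    public static void addEsSink(DataStream<String> stream) {
        List<HttpHost> hosts = Collections.singletonList(new HttpHost("es-host", 9200, "http"));

        ElasticsearchSink.Builder<String> builder = new ElasticsearchSink.Builder<>(
                hosts,
                new ElasticsearchSinkFunction<String>() {
                    @Override
                    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
                        Map<String, String> json = new HashMap<>();
                        json.put("data", element);
                        indexer.add(Requests.indexRequest().index("my-index").source(json));
                    }
                });

        // basic authentication is configured through a custom RestClientFactory
        builder.setRestClientFactory(restClientBuilder -> {
            CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
            credentialsProvider.setCredentials(
                    AuthScope.ANY, new UsernamePasswordCredentials("user", "password"));
            restClientBuilder.setHttpClientConfigCallback(
                    httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
        });

        stream.addSink(builder.build());
    }
}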


Very high latency when session windows use event time

2020-05-22  Posted by 李佳宸
Hi everyone,

I have run into a problem whose cause I cannot figure out, and would like to ask for your advice.

My code keys by userId, then uses a session window to aggregate per userId and runs a top-N method on each window.

The code is roughly as follows:
// top-N aggregation
DataStream itemList = resultDataStream
        .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<PredictResult>(Time.milliseconds(100)) {
                    @Override
                    public long extractTimestamp(PredictResult predictResult) {
                        return predictResult.getDate_timestamp();
                    }
                })
        .keyBy("userId")
        .window(EventTimeSessionWindows.withGap(Time.milliseconds(100)))
        .process(new TopNService(11));
itemList.print("IRS_RESULT: ");

The job latency is extremely high, up to 30 seconds, which is completely unacceptable. At first I thought my top-N method was the problem, but after switching to a processing-time session window the latency dropped to under one second.
The drawback of processing time is that the gap is hard to choose: too large and it hurts job latency, too small and the expected aggregation does not complete, which causes errors (and unstable runs).
I do not really understand why this happens; any ideas or pointers would be much appreciated.

Thanks

// the top-N method

public static class TopNService extends ProcessWindowFunction {

    private final int topSize;

    public TopNService(int topSize) {
        this.topSize = topSize;
    }

    @Override
    public void process(Tuple tuple, Context context,
                        Iterable<PredictResult> iterable, Collector collector) throws Exception {
        List<PredictResult> allItems = new ArrayList<>();
        for (PredictResult predictResult : iterable) {
            allItems.add(predictResult);
        }
        allItems.sort(new Comparator<PredictResult>() {
            @Override
            public int compare(PredictResult o1, PredictResult o2) {
                return o2.probability.compareTo(o1.probability);
            }
        });
        int userId = allItems.get(0).userId;
        String logonType = allItems.get(0).getLogonType();
        StringBuilder result = new StringBuilder();
        for (int i=0;i

Flink 1.10 local mode: how is DefaultActionSuspension.resume() invoked? (I want to understand this call path)

2020-05-22  Posted by
In Flink 1.10 local mode, the method org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.queueChannel
calls toNotify.complete(null), which in turn directly invokes
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.DefaultActionSuspension.resume().
How does resume() end up being called? I would like to understand this call path.

Examples of using a flink-table sink together with the SQL CLI

2020-05-22  Posted by guaishushu1...@163.com
Hi all, are there any examples of implementing a flink-table sink and using it together with the SQL CLI?



guaishushu1...@163.com
 
From: user-zh-digest-help
Date: 2020-05-21 21:45
To: user-zh
Subject: user-zh Digest 21 May 2020 13:45:23 - Issue 703
 
user-zh Digest 21 May 2020 13:45:23 - Issue 703
 
Topics (messages 3698 through 3702)
 
Re: How can Flink read files under HDFS with a regex
3698 by: jimandlice
 
flink proctime error
3699 by: 了不起的盖茨比
3700 by: Benchao Li
3701 by: Jingsong Li
3702 by: 了不起的盖茨比
 
Administrivia:
 
-
To post to the list, e-mail: user-zh@flink.apache.org
To unsubscribe, e-mail: user-zh-digest-unsubscr...@flink.apache.org
For additional commands, e-mail: user-zh-digest-h...@flink.apache.org
 
--