Re: ddl es 报错

2020-03-24 文章 jinhai wang
优秀!可以提个improve issue


Best Regards

jinhai...@gmail.com

> 2020年3月25日 下午1:40,zhisheng  写道:
> 
> hi,Leonar Xu
> 
> 官方 ES DDL 现在不支持填写 ES 集群的用户名和密码,我现在在公司已经做了扩展,加了这个功能,请问社区是否需要这个功能?我该怎么贡献呢?
> 
> 效果如图:http://zhisheng-blog.oss-cn-hangzhou.aliyuncs.com/2020-03-25-053948.png
> 
> Best Wishes!
> 
> zhisheng
> 
> Leonard Xu  于2020年3月24日周二 下午5:53写道:
> 
>> Hi, 出发
>> 看起来是你缺少了依赖es connector的依赖[1],所以只能找到内置的filesystem connector,目前内置的filesystem
>> connector只支持csv format,所以会有这个错误。
>> 在项目中加上缺失的依赖即可,如果使用SQL CLI,也需要将依赖的jar包放到 flink的lib目录下。
>> 
>> 
>>org.apache.flink
>>flink-sql-connector-elasticsearch6_2.11
>>${flink.version}
>> 
>> 
>>org.apache.flink
>>flink-json
>>${flink.version}
>> 
>> 
>> Best,
>> Leonard
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#elasticsearch-connector
>> <
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#elasticsearch-connector
>>> 
>> 
>> 
>>> 在 2020年3月23日,23:30,出发 <573693...@qq.com> 写道:
>>> 
>>> 
>>> 源码如下:
>>> CREATE TABLE buy_cnt_per_hour (
>>>hour_of_day BIGINT,
>>>buy_cnt BIGINT
>>> ) WITH (
>>>'connector.type' = 'elasticsearch',
>>>'connector.version' = '6',
>>>'connector.hosts' = 'http://localhost:9200',
>>>'connector.index' = 'buy_cnt_per_hour',
>>>'connector.document-type' = 'user_behavior',
>>>'connector.bulk-flush.max-actions' = '1',
>>>'format.type' = 'json',
>>>'update-mode' = 'append'
>>> )
>>> import
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>> import org.apache.flink.table.api.EnvironmentSettings;
>>> import org.apache.flink.table.api.Table;
>>> import org.apache.flink.table.api.java.StreamTableEnvironment;
>>> import org.apache.flink.types.Row;
>>> 
>>> public class ESTest {
>>> 
>>>public static void main(String[] args) throws Exception {
>>> 
>>>//2、设置运行环境
>>>StreamExecutionEnvironment streamEnv =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>EnvironmentSettings settings =
>> EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
>>>StreamTableEnvironment tableEnv =
>> StreamTableEnvironment.create(streamEnv, settings);
>>>streamEnv.setParallelism(1);
>>>String sinkDDL = " CREATE TABLE test_es ( hour_of_day BIGINT,
>> buy_cnt BIGINT "
>>>+ ") WITH ( 'connector.type' = 'elasticsearch',
>> 'connector.version' = '6',"
>>>+ "'connector.hosts' = 'http://localhost:9200',
>> 'connector.index' = 'buy_cnt_per_hour',"
>>>+ "'connector.document-type' = 'user_behavior',"
>>>+ "'connector.bulk-flush.max-actions' = '1',\n" + "
>>  'format.type' = 'json',"
>>>+ "'update-mode' = 'append' )";
>>>tableEnv.sqlUpdate(sinkDDL);
>>>Table table = tableEnv.sqlQuery("select * from test_es ");
>>>tableEnv.toRetractStream(table, Row.class).print();
>>>streamEnv.execute("");
>>>}
>>> 
>>> }
>>> 具体error
>>> The matching candidates:
>>> org.apache.flink.table.sources.CsvAppendTableSourceFactory
>>> Mismatched properties:
>>> 'connector.type' expects 'filesystem', but is 'elasticsearch'
>>> 'format.type' expects 'csv', but is 'json'
>>> 
>>> The following properties are requested:
>>> connector.bulk-flush.max-actions=1
>>> connector.document-type=user_behavior
>>> connector.hosts=http://localhost:9200
>>> connector.index=buy_cnt_per_hour
>>> connector.type=elasticsearch
>>> connector.version=6
>>> format.type=json
>>> schema.0.data-type=BIGINT
>>> schema.0.name=hour_of_day
>>> schema.1.data-type=BIGINT
>>> schema.1.name=buy_cnt
>>> update-mode=append
>> 
>> 



Re: ddl es 报错

2020-03-24 文章 zhisheng
hi,Leonar Xu

官方 ES DDL 现在不支持填写 ES 集群的用户名和密码,我现在在公司已经做了扩展,加了这个功能,请问社区是否需要这个功能?我该怎么贡献呢?

效果如图:http://zhisheng-blog.oss-cn-hangzhou.aliyuncs.com/2020-03-25-053948.png

Best Wishes!

zhisheng

Leonard Xu  于2020年3月24日周二 下午5:53写道:

> Hi, 出发
> 看起来是你缺少了依赖es connector的依赖[1],所以只能找到内置的filesystem connector,目前内置的filesystem
> connector只支持csv format,所以会有这个错误。
> 在项目中加上缺失的依赖即可,如果使用SQL CLI,也需要将依赖的jar包放到 flink的lib目录下。
>
> 
> org.apache.flink
> flink-sql-connector-elasticsearch6_2.11
> ${flink.version}
> 
> 
> org.apache.flink
> flink-json
> ${flink.version}
> 
>
> Best,
> Leonard
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#elasticsearch-connector
> <
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#elasticsearch-connector
> >
>
>
> > 在 2020年3月23日,23:30,出发 <573693...@qq.com> 写道:
> >
> >
> > 源码如下:
> > CREATE TABLE buy_cnt_per_hour (
> > hour_of_day BIGINT,
> > buy_cnt BIGINT
> > ) WITH (
> > 'connector.type' = 'elasticsearch',
> > 'connector.version' = '6',
> > 'connector.hosts' = 'http://localhost:9200',
> > 'connector.index' = 'buy_cnt_per_hour',
> > 'connector.document-type' = 'user_behavior',
> > 'connector.bulk-flush.max-actions' = '1',
> > 'format.type' = 'json',
> > 'update-mode' = 'append'
> > )
> > import
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> > import org.apache.flink.table.api.EnvironmentSettings;
> > import org.apache.flink.table.api.Table;
> > import org.apache.flink.table.api.java.StreamTableEnvironment;
> > import org.apache.flink.types.Row;
> >
> > public class ESTest {
> >
> > public static void main(String[] args) throws Exception {
> >
> > //2、设置运行环境
> > StreamExecutionEnvironment streamEnv =
> StreamExecutionEnvironment.getExecutionEnvironment();
> > EnvironmentSettings settings =
> EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
> > StreamTableEnvironment tableEnv =
> StreamTableEnvironment.create(streamEnv, settings);
> > streamEnv.setParallelism(1);
> > String sinkDDL = " CREATE TABLE test_es ( hour_of_day BIGINT,
> buy_cnt BIGINT "
> > + ") WITH ( 'connector.type' = 'elasticsearch',
> 'connector.version' = '6',"
> > + "'connector.hosts' = 'http://localhost:9200',
> 'connector.index' = 'buy_cnt_per_hour',"
> > + "'connector.document-type' = 'user_behavior',"
> > + "'connector.bulk-flush.max-actions' = '1',\n" + "
>   'format.type' = 'json',"
> > + "'update-mode' = 'append' )";
> > tableEnv.sqlUpdate(sinkDDL);
> > Table table = tableEnv.sqlQuery("select * from test_es ");
> > tableEnv.toRetractStream(table, Row.class).print();
> > streamEnv.execute("");
> > }
> >
> > }
> > 具体error
> > The matching candidates:
> > org.apache.flink.table.sources.CsvAppendTableSourceFactory
> > Mismatched properties:
> > 'connector.type' expects 'filesystem', but is 'elasticsearch'
> > 'format.type' expects 'csv', but is 'json'
> >
> > The following properties are requested:
> > connector.bulk-flush.max-actions=1
> > connector.document-type=user_behavior
> > connector.hosts=http://localhost:9200
> > connector.index=buy_cnt_per_hour
> > connector.type=elasticsearch
> > connector.version=6
> > format.type=json
> > schema.0.data-type=BIGINT
> > schema.0.name=hour_of_day
> > schema.1.data-type=BIGINT
> > schema.1.name=buy_cnt
> > update-mode=append
>
>


回复: 关于sql-gateway insert into 异常捕获的问题

2020-03-24 文章 111
Hi godfrey,
关于exceptions 这个rest api 的建议,试验了下,目前可以满足需求。非常感谢 !
Best,
Xinghalo


在2020年03月25日 11:37,111 写道:
Hi,
好的,我研究下哈。现在taskmanager的原理还不太熟,有问题再沟通
Best,
Xinghalo


在2020年03月25日 11:01,godfrey he 写道:
Hi,你可以尝试在yarn上去拿历史作业的日志

Best,
Godfrey

111  于2020年3月25日周三 上午10:53写道:

Hi,

目前确实通过sql-gateway拿到了jobId,并且获取到了状态。不过由于使用的是yarn-session,导致失败后taskmanager回收,taskmanager上的日志也就丢失了。
如果连接到Jobmanager上,是拿不到taskmanager曾经的日志吧?


Best,
xinghalo
在2020年03月25日 10:47,godfrey he 写道:
hi, sql gateway目前支持获取作业状态(要求jm还能查询到该作业)。如果要获取作业异常可以通过jm提供的REST API [1]

[1]

https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jobs-jobid-exceptions

Best,
Godfrey

111  于2020年3月25日周三 上午10:38写道:

Hi,
确实是执行报错….那如果是执行报错,flink本身是否有提供获取exception的机制呢?


| |
xinghalo
|
|
xingh...@163.com
|
签名由网易邮箱大师定制


在2020年03月25日 10:32,godfrey he 写道:
目前sql gateway只负责提交作业,不负责跟踪作业的状态。如果作业没提交成功,sql gateway会返回相关的错误;如果是执行时报错,sql
gateway不会返回错误。你看看flink web ui作业是否提交成功

Best,
Godfrey

111  于2020年3月25日周三 上午10:29写道:



Hi,
我试过了,insert into是不行的…sql-gateway的后台日志也没有任何报错。
怀疑是不是我使用的jdbc sink connector,内部是流的方式。流不会把异常抛给sql-gateway?


在2020年03月25日 10:26,godfrey he 写道:
hi, sql gateway当前会把服务端的完整异常栈返回给用户,
例如:
Caused by: java.util.concurrent.ExecutionException:
org.apache.flink.runtime.rest.util.RestClientException: [Internal server
error.,  于2020年3月25日周三 上午8:47写道:

Hi,
最近在使用sql-gateway,当使用
StatementExecuteResponseBody body = getInstance().sendRequest(
host,port,StatementExecuteHeaders.getInstance(),
new SessionMessageParameters(sessionId),
new StatementExecuteRequestBody(stmt, timeout)).get();
提交insert语句时,当任务失败,无法返回对应的异常信息;是目前版本暂时不支持这种特性吗?





回复: 关于sql-gateway insert into 异常捕获的问题

2020-03-24 文章 111
Hi,
好的,我研究下哈。现在taskmanager的原理还不太熟,有问题再沟通
Best,
Xinghalo


在2020年03月25日 11:01,godfrey he 写道:
Hi,你可以尝试在yarn上去拿历史作业的日志

Best,
Godfrey

111  于2020年3月25日周三 上午10:53写道:

Hi,

目前确实通过sql-gateway拿到了jobId,并且获取到了状态。不过由于使用的是yarn-session,导致失败后taskmanager回收,taskmanager上的日志也就丢失了。
如果连接到Jobmanager上,是拿不到taskmanager曾经的日志吧?


Best,
xinghalo
在2020年03月25日 10:47,godfrey he 写道:
hi, sql gateway目前支持获取作业状态(要求jm还能查询到该作业)。如果要获取作业异常可以通过jm提供的REST API [1]

[1]

https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jobs-jobid-exceptions

Best,
Godfrey

111  于2020年3月25日周三 上午10:38写道:

Hi,
确实是执行报错….那如果是执行报错,flink本身是否有提供获取exception的机制呢?


| |
xinghalo
|
|
xingh...@163.com
|
签名由网易邮箱大师定制


在2020年03月25日 10:32,godfrey he 写道:
目前sql gateway只负责提交作业,不负责跟踪作业的状态。如果作业没提交成功,sql gateway会返回相关的错误;如果是执行时报错,sql
gateway不会返回错误。你看看flink web ui作业是否提交成功

Best,
Godfrey

111  于2020年3月25日周三 上午10:29写道:



Hi,
我试过了,insert into是不行的…sql-gateway的后台日志也没有任何报错。
怀疑是不是我使用的jdbc sink connector,内部是流的方式。流不会把异常抛给sql-gateway?


在2020年03月25日 10:26,godfrey he 写道:
hi, sql gateway当前会把服务端的完整异常栈返回给用户,
例如:
Caused by: java.util.concurrent.ExecutionException:
org.apache.flink.runtime.rest.util.RestClientException: [Internal server
error.,  于2020年3月25日周三 上午8:47写道:

Hi,
最近在使用sql-gateway,当使用
StatementExecuteResponseBody body = getInstance().sendRequest(
host,port,StatementExecuteHeaders.getInstance(),
new SessionMessageParameters(sessionId),
new StatementExecuteRequestBody(stmt, timeout)).get();
提交insert语句时,当任务失败,无法返回对应的异常信息;是目前版本暂时不支持这种特性吗?





Re: 向您请教pyflink在windows上运行的问题,我第一次接触flink。

2020-03-24 文章 jincheng sun
很高兴收到您的邮件,我不太确定您具体看的是哪一个视频,所以很难确定您遇到的问题原因。您可以参考官方文档[1],
同时我个人博客里有一些3分钟的视频和文档入门教程[2],您可以先查阅一下。如又问题,可以保持邮件沟通,可以在我的博客留言!

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/tutorials/python_table_api.html
[2] https://enjoyment.cool/

Best,
Jincheng



xu1990xaut  于2020年3月24日周二 下午11:36写道:

> 您好,之前在哔哩哔哩上看过您讲的视频。  也跟着视频动手做了。
> 我用的flink1.10,在pip的时候是直接pip install apache-flink,结果默认就是1.10版本。
> 然后我在pycharm中运行word-count这个脚本时,一直不出结果,也不报错。   请问这是什么原因。
> 我也装了jdk,另外页面访问flink8081那个端口也可以出来界面。  我是第一次接触flink,在网上也搜过这个问题,
> 可是一直没有得到答案。  麻烦您,给小弟指点指点,   谢谢您了。
>
>
>
>


Re: 【PyFlink】对于数据以Csv()格式写入kafka报错,以及使用python udf时无法启动udf

2020-03-24 文章 jincheng sun
Hi Zhefu,

谢谢您分享解决问题的细节,这对社区有很大的贡献!

1. 关于订阅问题

我想确认一下,你是否参考了[1],同时以订阅中文用户列表(user-zh@flink.apache.org)为例,您需要发送邮件到(
user-zh-subscr...@flink.apache.org),就是在原有邮件的地址上添加subscribe。同时收到一封“confirm
subscribe to *user-zh*@flink.apache.org”的确认邮件,需要进行确认回复。

2. 关于JAR包冲突问题

flink-python
JAR会携带flink-python对beam依赖的核心JAR包,我这里想了解一些,为啥你集群上面存在这beam相关的包?另外我认为您提供的case很好,让我想到了可以对PyFlink对Beam的依赖进行一些优化,比如将beam进行relocation.
我已经创建了社区改进JIRA[2].

3. 关于打包问题

上传给PyFlink的Python环境包需要是在机器间可移植的,所以的确不能包含软链接。如果是用virtualenv创建的环境的话,需要加上--always-copy选项。此外,如果集群机器上已经有准备好的python3.5+的环境,可以不用上传环境包,直接使用add_python_executable("python3")为集群指定要使用的Python
Interpreter。
除了virtualenv,conda/miniconda
也可用于创建虚拟环境,但是大小要大很多,在virtualenv处于某些原因不work的时候(比如源python解释器依赖的so文件在集群上不存在),可以考虑使用。

再次感谢您分享问题的解决细节和问题的反馈!

Best,
Jincheng

[1]
https://flink.apache.org/community.html#how-to-subscribe-to-a-mailing-list
[2] https://issues.apache.org/jira/browse/FLINK-16762


Zhefu PENG  于2020年3月24日周二 下午9:33写道:

> Hi Jincheng,
>
> 在中文邮件用户列表里我无法回复自己的问题(我已经十分确认subscribe了mailing list, but i dont know why
> and how),所以在这里回复一下。 经过同事的帮忙和共同努力,我们初步解决了之前的疑问。反馈如下:
>
> 1. 首先是包冲突的问题,我们发现flink-python这个包下也有beam-runners-core-java-2.15.0.jar,
> beam-runners-direct-java-2.15.0.jar,  beam-runners-flink_2.11-2.15.0.jar,
> beam-sdk-java-core-2.15.0.jar这四个jar包的代码,若是运行的集群环境下本身也有这四个包的话,则会产生冲突,运行udf功能时找不到依赖。
>
> 2.
> 因为一些原因,集群的python默认环境无法改成python3,因此我们在代码中添加env.add_python_archive的功能,并且使用set_python_executor来帮助指定解释器。
> 用到的压缩包,在打包时一定要去掉软链接的使用,(可能在解压后不支持软链接查找。),我之前打包方式有错,所以出现了问题。
>
> 以上两点是我们排查出的原因,也终于能在cluster
> mode下成功运行udf。希望我们的反馈也能给你们的发展以及对用户的指导增加一点贡献。再次感谢帮助:)
>
> Best,
> Zhefu
>
> Zhefu PENG  于2020年3月20日周五 下午11:19写道:
>
>>
>> 感谢回复!beam就是直接用pip install方法安装的,因为在用pip install
>> apache-flink的时候发现有很多依赖,而且在安装时候要求安装beam的2.15.0的版本,我就安上了2.15.0。flink版本是1.10,没有从源码编译。因此我们也很困扰现在,希望能够求得帮助。
>>
>> 还有个小问题,就是我看到似乎你没有把日志一起转到邮件列表中,我想附带上去,那么我在订阅后,直接回复可以把日志贴上去吗?
>>
>> 顺祝,周末愉快:)非常感谢回复!
>>
>> 彭哲夫
>>
>> On Fri, Mar 20, 2020 at 22:34 jincheng sun 
>> wrote:
>>
>>> 彭哲夫,你好:
>>>
>>> 你是如何安装的beam,安装的版本是多少?如果是1.10 需要apache-beam 2.15,如果是 master需要apache-beam
>>> 2.19.
>>>
>>> BTW,  为了共享你的问题,我将你的问题发到了中文用户列表里面,我们大家一起讨论。
>>>
>>> Best,
>>> Jincheng
>>>
>>>
>>> Zhefu PENG  于2020年3月20日周五 下午5:12写道:
>>>
 Hi Jincheng,

 针对昨天提到的第二点,我做了两个思路:1. 将我的pyflink.py以本地模式运行,2.对集群节点进行环境配置

 1.
 在本地模式中,我们发现在装apache-beam包的时候出于某些原因没有装全,少了_bz2模块,再补充上之后,pyflink.py的脚本可以正常运行,其中的udf功能也能正常使用。

 2.
 在本地模式运行成功的基础上,我们根据你的建议,对所有的worker节点进行了环境的更新,都更新到了python3.6以及安装了apache_beam和apache-flink.
 但是以集群模式运行带有udf功能的脚本仍然报错,尝试谷歌搜索以后也没有搜到相关解答,在附件附上错误日志,希望能得到帮助(因为本地模式已经成功所以就不附带代码了),非常感谢!

 期待您的回复
 彭哲夫


 Zhefu PENG  于2020年3月19日周四 下午11:14写道:

> Hi Jincheng:
>
> 非常感谢你如此迅速而细致的回复!~
>
> 关于第一点:根据你的回复,我在flink的lib目录下增加flink-csv-1.10.0-sql-jar.jar包之后,运行成功。而第一个包我在之前浏览你博客中关于kafka的使用的demo(based
> on flink 1.9)中有看到并下载,因此这里有个提议,或许你未来可以对于后续使用者补充
> flink-csv-1.10.0-sql-jar.jar包的使用的必要性 :),但也有可能是我在查询学习时看漏了,但不管怎么说感谢你的帮助解决;
>
> 关于第二点:因为部门相关组织安排问题,我现在没有权限去worker节点上查询,但是针对这一点我有个好奇的地方:我目前只在启动脚本的主机上安装了python3.5+,
> 并且除了udf功能外,我都能正常使用(比如sql本身就有的concat之类,或者add_columns()这种简单功能)。所以是不是我理解为,如果要使用pyflink的全部功能,应该是集群的环境都要是python3.5+?
> 但是简单的功能,只要启动脚本的主机环境符合就够了?
> 还是关于第二点,我刚刚又重新跑了一下脚本,本来是希望能获得和之前一样的错误日志发给我的mentor,但是发现这次报了新的问题:
> java.lang.NoClassDefFoundError: Could not initialize class
> org.apache.beam.sdk.options.PipelineOptionsFactory
> at
> org.apache.flink.python.AbstractPythonFunctionRunner.open(AbstractPythonFunctionRunner.java:173)
> at
> org.apache.flink.table.runtime.operators.python.AbstractPythonScalarFunctionOperator$ProjectUdfInputPythonScalarFunctionRunner.open(AbstractPythonScalarFunctionOperator.java:193)
> at
> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.open(AbstractPythonFunctionOperator.java:139)
> at
> org.apache.flink.table.runtime.operators.python.AbstractPythonScalarFunctionOperator.open(AbstractPythonScalarFunctionOperator.java:143)
> at
> org.apache.flink.table.runtime.operators.python.PythonScalarFunctionOperator.open(PythonScalarFunctionOperator.java:73)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1007)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
>
>
> 我猜测原因正如你提到的worker的环境不符标准,我会在明天上班后请同事帮忙check后,根据你的建议进行修改尝试。也希望能解答一下疑问,因为刚毕业参加工作,可能提的问题会显得比较低级,请见谅!
>
> 再次感谢你的回复,我会根据建议尽快进行错误修复
> 彭哲夫
>
> jincheng sun  于2020年3月19日周四 下午9:08写道:
>
>> 彭哲夫,你好:
>>
>> 你上面问题可能原因是:
>>
>> 1. pyflink默认不包含kafka
>> connector的jar包和csv的格式JIR包,需要把这些jar包加到pyflink的lib目录下:
>>
>> $ PYFLINK_LIB=`python -c "import pyflink;import
>> 

Re:Re: flinksql创建源表添加水位线失败

2020-03-24 文章 flink小猪
感谢您的回复,这是我lib目录下的jar包

flink-dist_2.11-1.10.0.jar  flink-sql-connector-kafka_2.11-1.10.0.jar  
flink-table-blink_2.11-1.10.0.jar  slf4j-log4j12-1.7.15.jar

flink-json-1.10.0.jar   flink-table_2.11-1.10.0.jar
log4j-1.2.17.jar
以下是提交任务的异常信息
org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: From line 9, column 25 to line 9, column 26: Unknown 
identifier 'ts'
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
at 
org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
Caused by: org.apache.calcite.runtime.CalciteContextException: From line 9, 
column 25 to line 9, column 26: Unknown identifier 'ts'
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:834)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:819)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4841)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5667)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5587)
at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:317)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1691)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1676)
at org.apache.calcite.sql.SqlOperator.deriveType(SqlOperator.java:501)
at 
org.apache.calcite.sql.SqlBinaryOperator.deriveType(SqlBinaryOperator.java:144)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5600)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5587)
at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1691)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1676)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:947)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateParameterizedExpression(SqlValidatorImpl.java:930)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.lambda$createTableSchema$8(SqlToOperationConverter.java:509)
at java.util.Optional.ifPresent(Optional.java:159)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.createTableSchema(SqlToOperationConverter.java:505)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertCreateTable(SqlToOperationConverter.java:177)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:130)
at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
at WindowUv$.main(WindowUv.scala:49)
at WindowUv.main(WindowUv.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
... 8 more
Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Unknown 
identifier 'ts'
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
at 

Re: 关于sql-gateway insert into 异常捕获的问题

2020-03-24 文章 godfrey he
Hi,你可以尝试在yarn上去拿历史作业的日志

Best,
Godfrey

111  于2020年3月25日周三 上午10:53写道:

> Hi,
>
> 目前确实通过sql-gateway拿到了jobId,并且获取到了状态。不过由于使用的是yarn-session,导致失败后taskmanager回收,taskmanager上的日志也就丢失了。
> 如果连接到Jobmanager上,是拿不到taskmanager曾经的日志吧?
>
>
> Best,
>  xinghalo
> 在2020年03月25日 10:47,godfrey he 写道:
> hi, sql gateway目前支持获取作业状态(要求jm还能查询到该作业)。如果要获取作业异常可以通过jm提供的REST API [1]
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jobs-jobid-exceptions
>
> Best,
> Godfrey
>
> 111  于2020年3月25日周三 上午10:38写道:
>
> Hi,
> 确实是执行报错….那如果是执行报错,flink本身是否有提供获取exception的机制呢?
>
>
> | |
> xinghalo
> |
> |
> xingh...@163.com
> |
> 签名由网易邮箱大师定制
>
>
> 在2020年03月25日 10:32,godfrey he 写道:
> 目前sql gateway只负责提交作业,不负责跟踪作业的状态。如果作业没提交成功,sql gateway会返回相关的错误;如果是执行时报错,sql
> gateway不会返回错误。你看看flink web ui作业是否提交成功
>
> Best,
> Godfrey
>
> 111  于2020年3月25日周三 上午10:29写道:
>
>
>
> Hi,
> 我试过了,insert into是不行的…sql-gateway的后台日志也没有任何报错。
> 怀疑是不是我使用的jdbc sink connector,内部是流的方式。流不会把异常抛给sql-gateway?
>
>
> 在2020年03月25日 10:26,godfrey he 写道:
> hi, sql gateway当前会把服务端的完整异常栈返回给用户,
> 例如:
> Caused by: java.util.concurrent.ExecutionException:
> org.apache.flink.runtime.rest.util.RestClientException: [Internal server
> error.,  com.ververica.flink.table.gateway.SqlExecutionException: xxx
>
> 你可以检查一下sql gateway的日志,看看是否相关的错误
>
> Best,
> Godfrey
>
> 111  于2020年3月25日周三 上午8:47写道:
>
> Hi,
> 最近在使用sql-gateway,当使用
> StatementExecuteResponseBody body = getInstance().sendRequest(
> host,port,StatementExecuteHeaders.getInstance(),
> new SessionMessageParameters(sessionId),
> new StatementExecuteRequestBody(stmt, timeout)).get();
> 提交insert语句时,当任务失败,无法返回对应的异常信息;是目前版本暂时不支持这种特性吗?
>
>
>


回复: 关于sql-gateway insert into 异常捕获的问题

2020-03-24 文章 111
Hi,
目前确实通过sql-gateway拿到了jobId,并且获取到了状态。不过由于使用的是yarn-session,导致失败后taskmanager回收,taskmanager上的日志也就丢失了。
如果连接到Jobmanager上,是拿不到taskmanager曾经的日志吧?


Best,
 xinghalo
在2020年03月25日 10:47,godfrey he 写道:
hi, sql gateway目前支持获取作业状态(要求jm还能查询到该作业)。如果要获取作业异常可以通过jm提供的REST API [1]

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jobs-jobid-exceptions

Best,
Godfrey

111  于2020年3月25日周三 上午10:38写道:

Hi,
确实是执行报错….那如果是执行报错,flink本身是否有提供获取exception的机制呢?


| |
xinghalo
|
|
xingh...@163.com
|
签名由网易邮箱大师定制


在2020年03月25日 10:32,godfrey he 写道:
目前sql gateway只负责提交作业,不负责跟踪作业的状态。如果作业没提交成功,sql gateway会返回相关的错误;如果是执行时报错,sql
gateway不会返回错误。你看看flink web ui作业是否提交成功

Best,
Godfrey

111  于2020年3月25日周三 上午10:29写道:



Hi,
我试过了,insert into是不行的…sql-gateway的后台日志也没有任何报错。
怀疑是不是我使用的jdbc sink connector,内部是流的方式。流不会把异常抛给sql-gateway?


在2020年03月25日 10:26,godfrey he 写道:
hi, sql gateway当前会把服务端的完整异常栈返回给用户,
例如:
Caused by: java.util.concurrent.ExecutionException:
org.apache.flink.runtime.rest.util.RestClientException: [Internal server
error.,  于2020年3月25日周三 上午8:47写道:

Hi,
最近在使用sql-gateway,当使用
StatementExecuteResponseBody body = getInstance().sendRequest(
host,port,StatementExecuteHeaders.getInstance(),
new SessionMessageParameters(sessionId),
new StatementExecuteRequestBody(stmt, timeout)).get();
提交insert语句时,当任务失败,无法返回对应的异常信息;是目前版本暂时不支持这种特性吗?




Re: 关于sql-gateway insert into 异常捕获的问题

2020-03-24 文章 godfrey he
hi, sql gateway目前支持获取作业状态(要求jm还能查询到该作业)。如果要获取作业异常可以通过jm提供的REST API [1]

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jobs-jobid-exceptions

Best,
Godfrey

111  于2020年3月25日周三 上午10:38写道:

> Hi,
> 确实是执行报错….那如果是执行报错,flink本身是否有提供获取exception的机制呢?
>
>
> | |
> xinghalo
> |
> |
> xingh...@163.com
> |
> 签名由网易邮箱大师定制
>
>
> 在2020年03月25日 10:32,godfrey he 写道:
> 目前sql gateway只负责提交作业,不负责跟踪作业的状态。如果作业没提交成功,sql gateway会返回相关的错误;如果是执行时报错,sql
> gateway不会返回错误。你看看flink web ui作业是否提交成功
>
> Best,
> Godfrey
>
> 111  于2020年3月25日周三 上午10:29写道:
>
>
>
> Hi,
> 我试过了,insert into是不行的…sql-gateway的后台日志也没有任何报错。
> 怀疑是不是我使用的jdbc sink connector,内部是流的方式。流不会把异常抛给sql-gateway?
>
>
> 在2020年03月25日 10:26,godfrey he 写道:
> hi, sql gateway当前会把服务端的完整异常栈返回给用户,
> 例如:
> Caused by: java.util.concurrent.ExecutionException:
> org.apache.flink.runtime.rest.util.RestClientException: [Internal server
> error.,  com.ververica.flink.table.gateway.SqlExecutionException: xxx
>
> 你可以检查一下sql gateway的日志,看看是否相关的错误
>
> Best,
> Godfrey
>
> 111  于2020年3月25日周三 上午8:47写道:
>
> Hi,
> 最近在使用sql-gateway,当使用
> StatementExecuteResponseBody body = getInstance().sendRequest(
> host,port,StatementExecuteHeaders.getInstance(),
> new SessionMessageParameters(sessionId),
> new StatementExecuteRequestBody(stmt, timeout)).get();
> 提交insert语句时,当任务失败,无法返回对应的异常信息;是目前版本暂时不支持这种特性吗?
>
>


回复: 关于sql-gateway insert into 异常捕获的问题

2020-03-24 文章 111
Hi,
确实是执行报错….那如果是执行报错,flink本身是否有提供获取exception的机制呢?


| |
xinghalo
|
|
xingh...@163.com
|
签名由网易邮箱大师定制


在2020年03月25日 10:32,godfrey he 写道:
目前sql gateway只负责提交作业,不负责跟踪作业的状态。如果作业没提交成功,sql gateway会返回相关的错误;如果是执行时报错,sql
gateway不会返回错误。你看看flink web ui作业是否提交成功

Best,
Godfrey

111  于2020年3月25日周三 上午10:29写道:



Hi,
我试过了,insert into是不行的…sql-gateway的后台日志也没有任何报错。
怀疑是不是我使用的jdbc sink connector,内部是流的方式。流不会把异常抛给sql-gateway?


在2020年03月25日 10:26,godfrey he 写道:
hi, sql gateway当前会把服务端的完整异常栈返回给用户,
例如:
Caused by: java.util.concurrent.ExecutionException:
org.apache.flink.runtime.rest.util.RestClientException: [Internal server
error.,  于2020年3月25日周三 上午8:47写道:

Hi,
最近在使用sql-gateway,当使用
StatementExecuteResponseBody body = getInstance().sendRequest(
host,port,StatementExecuteHeaders.getInstance(),
new SessionMessageParameters(sessionId),
new StatementExecuteRequestBody(stmt, timeout)).get();
提交insert语句时,当任务失败,无法返回对应的异常信息;是目前版本暂时不支持这种特性吗?



Re: 关于sql-gateway insert into 异常捕获的问题

2020-03-24 文章 godfrey he
目前sql gateway只负责提交作业,不负责跟踪作业的状态。如果作业没提交成功,sql gateway会返回相关的错误;如果是执行时报错,sql
gateway不会返回错误。你看看flink web ui作业是否提交成功

Best,
Godfrey

111  于2020年3月25日周三 上午10:29写道:

>
>
> Hi,
> 我试过了,insert into是不行的…sql-gateway的后台日志也没有任何报错。
> 怀疑是不是我使用的jdbc sink connector,内部是流的方式。流不会把异常抛给sql-gateway?
>
>
> 在2020年03月25日 10:26,godfrey he 写道:
> hi, sql gateway当前会把服务端的完整异常栈返回给用户,
> 例如:
> Caused by: java.util.concurrent.ExecutionException:
> org.apache.flink.runtime.rest.util.RestClientException: [Internal server
> error.,  com.ververica.flink.table.gateway.SqlExecutionException: xxx
>
> 你可以检查一下sql gateway的日志,看看是否相关的错误
>
> Best,
> Godfrey
>
> 111  于2020年3月25日周三 上午8:47写道:
>
> Hi,
> 最近在使用sql-gateway,当使用
> StatementExecuteResponseBody body = getInstance().sendRequest(
> host,port,StatementExecuteHeaders.getInstance(),
> new SessionMessageParameters(sessionId),
> new StatementExecuteRequestBody(stmt, timeout)).get();
> 提交insert语句时,当任务失败,无法返回对应的异常信息;是目前版本暂时不支持这种特性吗?
>


回复: 关于sql-gateway insert into 异常捕获的问题

2020-03-24 文章 111


Hi,
我试过了,insert into是不行的…sql-gateway的后台日志也没有任何报错。
怀疑是不是我使用的jdbc sink connector,内部是流的方式。流不会把异常抛给sql-gateway?


在2020年03月25日 10:26,godfrey he 写道:
hi, sql gateway当前会把服务端的完整异常栈返回给用户,
例如:
Caused by: java.util.concurrent.ExecutionException:
org.apache.flink.runtime.rest.util.RestClientException: [Internal server
error.,  于2020年3月25日周三 上午8:47写道:

Hi,
最近在使用sql-gateway,当使用
StatementExecuteResponseBody body = getInstance().sendRequest(
host,port,StatementExecuteHeaders.getInstance(),
new SessionMessageParameters(sessionId),
new StatementExecuteRequestBody(stmt, timeout)).get();
提交insert语句时,当任务失败,无法返回对应的异常信息;是目前版本暂时不支持这种特性吗?


Re: 关于sql-gateway insert into 异常捕获的问题

2020-03-24 文章 godfrey he
hi, sql gateway当前会把服务端的完整异常栈返回给用户,
例如:
Caused by: java.util.concurrent.ExecutionException:
org.apache.flink.runtime.rest.util.RestClientException: [Internal server
error.,  于2020年3月25日周三 上午8:47写道:

> Hi,
> 最近在使用sql-gateway,当使用
> StatementExecuteResponseBody body = getInstance().sendRequest(
> host,port,StatementExecuteHeaders.getInstance(),
> new SessionMessageParameters(sessionId),
> new StatementExecuteRequestBody(stmt, timeout)).get();
> 提交insert语句时,当任务失败,无法返回对应的异常信息;是目前版本暂时不支持这种特性吗?


Re: 如何提升任务cpu使用率

2020-03-24 文章 Xintong Song
你的 Flink 版本是什么?运行环境是 Yarn?

降低 slot 数并不能提高 cpu 的使用率。默认情况下 yarn container 申请 vcore 数等于 slot 数,降低 slot
数相当于等比例地降低了每个 container 的 cpu 资源和计算需求。如果想提高 cpu 的使用率,可以考虑让 container 的
vcore 数少于 slot 数。通过 ‘yarn.containers.vcores’ 可以设置 container 的 vcore 数不用默认的
slot 数。

Thank you~

Xintong Song



On Tue, Mar 24, 2020 at 8:09 PM yanggang_it_job 
wrote:

> hi:
>背景介绍,现在集群的剩余核数不多,就去梳理了一些大任务。
>
>  
> 通过PromSQL:max(flink_taskmanager_Status_JVM_CPU_Load{job_name={job_name}})获取指定任务的cpu使用率,
>发现任务的cpu使用率普遍较低,一个slot为10的container,使用率大多小于6%。
>
>然后我测试中我降低container里面的slot数,发现cpu使用率并没有线性增加,同理我增大slot数也没有线性减少。
>
>我是不是测试的有问题呢?或者有什么相关思路吗?
>
>
>
>
>
>


Re: flinksql创建源表添加水位线失败

2020-03-24 文章 Jark Wu
Emm... 这个好奇怪,理论上 IDEA 中运行的时候可能会有问题 (Calcite bug 导致的问题),SQL CLI 中不应该有问题。
你的集群/作业中有其他的依赖吗? 比如自己依赖了 Calcite?

Best,
Jark

On Tue, 24 Mar 2020 at 23:37, flink小猪 <18579099...@163.com> wrote:

> 当我尝试通过sql client创建kafka源表时(这里借鉴了云邪大佬的《Demo:基于 Flink SQL 构建流式应用》文章),
> CREATE TABLE user_behavior (
> user_id BIGINT,
> item_id BIGINT,
> category_id BIGINT,
> behavior STRING,
> ts TIMESTAMP(3),
> proctime as PROCTIME(),
> WATERMARK FOR ts as ts - INTERVAL '5' SECOND
> ) WITH (
> 'connector.type' = 'kafka',
> 'connector.version' = 'universal',
> 'connector.topic' = 'user_behavior',
> 'connector.startup-mode' = 'earliest-offset',
> 'connector.properties.zookeeper.connect' = '192.168.1.214:2181',
> 'connector.properties.bootstrap.servers' = '192.168.1.214:9092',
> 'format.type' = 'json'
> )
> 出现错误
> org.apache.calcite.sql.validate.SqlValidatorException: Unknown identifier
> 'ts'
> 我以为是我的建表语句有问题,但是当我通过代码(在idea上运行)创建这个表时,没有任何问题,数据也是可以正常查询到。当我提交到集群上运行时又出现org.apache.calcite.sql.validate.SqlValidatorException:
> Unknown identifier 'ts'
> ,我猜测是jar不全的问题,但是我按照云邪大佬的文章导入了
> flink-sql-connector-kafka_2.11-1.10.0.jar
> flink-json-1.10.0.jar
> 在sql-client上执行还是错误,是我缺少什么jar包吗?
>
>
>
>
>
>


回复:关于flink sql 1.10 source并行度自动推断的疑问

2020-03-24 文章 Jun Zhang
hi,Chief:
  
目前flink读取hive的时候,如果开启了自动推断,系统会根据所读取的文件数来推断并发,如果没有超过最大并发数(默认1000),source的并行度就等于你文件的个数,
你可以通过table.exec.hive.infer-source-parallelism.max来设置source的最大并发度。



BestJun


-- 原始邮件 --
发件人: Kurt Young 

Re: 关于flink sql 1.10 source并行度自动推断的疑问

2020-03-24 文章 Jun Zhang
hi,Chief:

目前flink读取hive的时候,如果开启了自动推断,系统会根据所读取的文件数来推断并发,如果没有超过最大并发数(默认1000),source的并行度就等于你文件的个数,
你可以通过table.exec.hive.infer-source-parallelism.max来设置source的最大并发度。

Kurt Young  于2020年3月25日周三 上午8:53写道:

> 你的数据量有多大?有一个可能的原因是source的其他并发调度起来的时候,数据已经被先调度起来的并发读完了。
>
> Best,
> Kurt
>
>
> On Tue, Mar 24, 2020 at 10:39 PM Chief  wrote:
>
> > hi all:
> > 之前用flink sql查询hive的数据,hive的数据文件是150个,sql
> > client配置文件设置的并行度是10,source通过自动推断生成了150并发,但是通过看web
> > ui发现只有前十个子任务是读到数据了,其他的任务显示没有读到数据,请问是我设置有问题吗?
>


Re: 关于flink sql 1.10 source并行度自动推断的疑问

2020-03-24 文章 Kurt Young
你的数据量有多大?有一个可能的原因是source的其他并发调度起来的时候,数据已经被先调度起来的并发读完了。

Best,
Kurt


On Tue, Mar 24, 2020 at 10:39 PM Chief  wrote:

> hi all:
> 之前用flink sql查询hive的数据,hive的数据文件是150个,sql
> client配置文件设置的并行度是10,source通过自动推断生成了150并发,但是通过看web
> ui发现只有前十个子任务是读到数据了,其他的任务显示没有读到数据,请问是我设置有问题吗?


关于sql-gateway insert into 异常捕获的问题

2020-03-24 文章 111
Hi,
最近在使用sql-gateway,当使用
StatementExecuteResponseBody body = getInstance().sendRequest(
host,port,StatementExecuteHeaders.getInstance(),
new SessionMessageParameters(sessionId),
new StatementExecuteRequestBody(stmt, timeout)).get();
提交insert语句时,当任务失败,无法返回对应的异常信息;是目前版本暂时不支持这种特性吗?

flinksql创建源表添加水位线失败

2020-03-24 文章 flink小猪
当我尝试通过sql client创建kafka源表时(这里借鉴了云邪大佬的《Demo:基于 Flink SQL 构建流式应用》文章),
CREATE TABLE user_behavior (
user_id BIGINT,
item_id BIGINT,
category_id BIGINT,
behavior STRING,
ts TIMESTAMP(3),
proctime as PROCTIME(),
WATERMARK FOR ts as ts - INTERVAL '5' SECOND
) WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.topic' = 'user_behavior',
'connector.startup-mode' = 'earliest-offset',
'connector.properties.zookeeper.connect' = '192.168.1.214:2181',
'connector.properties.bootstrap.servers' = '192.168.1.214:9092',
'format.type' = 'json'
)
出现错误
org.apache.calcite.sql.validate.SqlValidatorException: Unknown identifier 'ts'
我以为是我的建表语句有问题,但是当我通过代码(在idea上运行)创建这个表时,没有任何问题,数据也是可以正常查询到。当我提交到集群上运行时又出现org.apache.calcite.sql.validate.SqlValidatorException:
 Unknown identifier 'ts'
,我猜测是jar不全的问题,但是我按照云邪大佬的文章导入了
flink-sql-connector-kafka_2.11-1.10.0.jar
flink-json-1.10.0.jar
在sql-client上执行还是错误,是我缺少什么jar包吗?







Re: 关于 SQL DATE_FORMAT 的时区设置的构想

2020-03-24 文章 Jark Wu
Thanks for reporting this Weike.

首先,我觉得目前 Flink 返回 TIMESTAMP WITHOUT TIME ZONE 应该是有问题的。
因为 SQL 标准(SQL:2011 Part 2 Section 6.32)定义了返回类型是 WITH TIME ZONE。
另外 Calcite 文档中 [1] 也说返回的是 TIMESTAMP WITH TIME ZONE (虽然好像和实现不一致)
其他的一些数据库也都差不多:mysql [2], oracle[3]

Best,
Jark

[1]: https://calcite.apache.org/docs/reference.html#datetime-functions
[2]:
https://dev.mysql.com/doc/refman/8.0/en/date-and-time-functions.html#function_current-timestamp
[3]:
https://docs.oracle.com/cd/B28359_01/server.111/b28286/functions038.htm#SQLRF00629



On Tue, 24 Mar 2020 at 17:00, DONG, Weike  wrote:

> Hi Zhenghua,
>
> 感谢您的回复。感觉既然 Flink 已经提供了时区的设定,这方面也许可以进一步增强一些。CONVERT_TZ
> 用户很容易忘记或者漏掉,这里还是有不少完善的空间。
>
> Best,
> Weike
>
> On Tue, Mar 24, 2020 at 4:20 PM Zhenghua Gao  wrote:
>
> > CURRENT_TIMESTAMP 返回值类型是 TIMESTAMP (WITHOUT TIME ZONE),
> > 其语义可参考 java.time.LocalDateTime。
> > 其字符形式的表示并不随着时区变化而变化(如你所见,和UTC+0 一致)。
> >
> > 你的需求可以通过 CONVERT_TZ(timestamp_string, time_zone_from_string,
> > time_zone_to_string)
> >
> > *Best Regards,*
> > *Zhenghua Gao*
> >
> >
> > On Mon, Mar 23, 2020 at 10:12 PM DONG, Weike 
> > wrote:
> >
> > > Hi,
> > >
> > > 近期发现 Flink 的 Blink Planner 在 DATE_FORMAT 对 CURRENT_TIMESTAMP
> > > 做时间格式化为字符串时,默认以 UTC+0 为准。
> > >
> > > 长期以来,TableConfig 类里面有一个 setLocalTimeZone 方法;将其设置为东八区以后,发现格式化后的字符串仍然是
> > UTC+0
> > > 的。而深入来看,Flink 的时间格式化时的代码生成逻辑(time.scala)并未考虑时区的设置。
> > >
> > > 由于大多数用户的时区均不是 UTC+0(GMT、UTC),如果时间格式化、显示等都可以考虑到 TableConfig 中的时区设置,那么
> > Flink
> > > 是否会更用户友好一些呢?当然这个会涉及到不兼容的变更,需要谨慎一些。
> > >
> > > 也许提供一个 DATE_FORMAT_WITH_TIMEZONE 的内置函数,社区是否会更容易接受一些呢?
> > >
> > > 仅仅是个人一点想法,感谢 :)
> > >
> >
>


????flink sql 1.10 source????????????????????

2020-03-24 文章 Chief
hi all??
??flink sqlhivehive150sql 
client??10??source??150web 
ui

Re: Flink on YARN 使用Kerboros认证失败

2020-03-24 文章 nie...@163.com
对于Flink on YARN,最简单的情况是直接在终端 kinit,就能提交任务。flink本身不用配置。
Can't get Kerberos realm一般是是krb5.conf对应realm的配置的问题。

flink/hado...@example.com   
hadoop0不知道是不是主机,这看起来像是个服务的principal 。 这里应该是user的principal 就行了。






> 在 2020年3月24日,下午9:03,巫旭阳  写道:
> 
> 之前在使用hadoop client时设置了一个系统变量, 当这个变量没设置的时候就会报之前的错误
> System.setProperty("java.security.krb5.conf", 
> "C:\\Users\\86177\\Desktop\\tmp\\5\\krb5.conf" );
> 但flink on yarn 没有提供这个参数的设置。
> 
> 
> 
> 
> 
> 
> 
> 在 2020-03-24 20:52:44,"aven.wu"  写道:
> 
> Flink 提交作业到有kerboros认证的集群报以下异常
> 
> 
> 
> java.lang.Exception: unable to establish the security context
> at 
> org.apache.flink.runtime.security.SecurityUtils.install(SecurityUtils.java:73)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1124)
> Caused by: java.lang.IllegalArgumentException: Can't get Kerberos realm
> at 
> org.apache.hadoop.security.HadoopKerberosName.setConfiguration(HadoopKerberosName.java:65)
> at 
> org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:276)
> at 
> org.apache.hadoop.security.UserGroupInformation.setConfiguration(UserGroupInformation.java:312)
> at 
> org.apache.flink.runtime.security.modules.HadoopModule.install(HadoopModule.java:70)
> at 
> org.apache.flink.runtime.security.SecurityUtils.install(SecurityUtils.java:67)
> ... 1 more
> Caused by: java.lang.reflect.InvocationTargetException
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.apache.hadoop.security.authentication.util.KerberosUtil.getDefaultRealm(KerberosUtil.java:84)
> at 
> org.apache.hadoop.security.HadoopKerberosName.setConfiguration(HadoopKerberosName.java:63)
> ... 5 more
> Caused by: KrbException: Cannot locate default realm
> at sun.security.krb5.Config.getDefaultRealm(Config.java:1029)
> ... 11 more
> 
> 
> 
> 使用了官网提供的四个参数,配置在了flink-conf.yaml里
> 
> 
> 
> security.kerberos.login.use-ticket-cache: false
> security.kerberos.login.keytab: /home/flink-1.8.0/conf/flink.keytab
> security.kerberos.login.principal: flink/hado...@example.com
> security.kerberos.login.realm: EXAMPLE.COM
> security.kerberos.login.contexts: KafkaClient
> 
> 
> 
> /home/flink-1.8.0/conf/flink.keytab 文件已放好,
> 
> 
> 
> 
> 
> Best
> 
> Aven
> 



回复: Flink JDBC Driver是否支持创建流数据表

2020-03-24 文章 wangl...@geekplus.com.cn

参考下这个文档: 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
 
下面的语法应该是不支持的:
  'format.type' = 'csv',\n" +
"'format.field-delimiter' = '|'\n"

下面是我可以跑通的代码, kafka 里的数据需要是这种格式:{"order_no":"abcdefg","status":90}
tEnv.sqlUpdate("CREATE TABLE pick_order (\n"
+ "order_no VARCHAR,\n"
+ "status INT\n"
+ ") WITH (\n"
+ "'connector.type' = 'kafka',\n"
+ "'connector.version' = 'universal',\n"
+ "'connector.topic' = 'wanglei_test',\n"
+ "'connector.startup-mode' = 'latest-offset',\n"
+ "'connector.properties.0.key' = 'zookeeper.connect',\n"
+ "'connector.properties.0.value' = 'xxx:2181',\n"
+ "'connector.properties.1.key' = 'bootstrap.servers',\n"
+ "'connector.properties.1.value' = 'xxx:9092',\n"
+ "'update-mode' = 'append',\n"
+ "'format.type' = 'json',\n"
+ "'format.derive-schema' = 'true'\n"
+ ")");

王磊


wangl...@geekplus.com.cn
 
发件人: 赵峰
发送时间: 2020-03-24 21:28
收件人: user-zh
主题: Flink JDBC Driver是否支持创建流数据表
hi
   
Flink JDBC Driver创建kafka表报错,是我的建表代码不正确?代码如下:
Connection connection = 
DriverManager.getConnection("jdbc:flink://localhost:8083?planner=blink");
Statement statement = connection.createStatement();
 
statement.executeUpdate(
"CREATE TABLE table_kafka (\n" +
"user_id BIGINT,\n" +
"item_id BIGINT,\n" +
"category_id BIGINT,\n" +
"behavior STRING,\n" +
"ts TIMESTAMP(3),\n" +
"proctime as PROCTIME(),\n" +
"WATERMARK FOR ts as ts - INTERVAL '5' SECOND\n" +
") WITH (\n" +
"'connector.type' = 'kafka', \n" +
"'connector.version' = 'universal', \n" +
"'connector.topic' = 'flink_im02', \n" +
"'connector.properties.group.id' = 'flink_im02_new',\n" +
"'connector.startup-mode' = 'earliest-offset', \n" +
"'connector.properties.zookeeper.connect' = '*.*.*.*:2181', \n" +
"'connector.properties.bootstrap.servers' = '*.*.*.*:9092', \n" +
"'format.type' = 'csv',\n" +
"'format.field-delimiter' = '|'\n" +
")");
ResultSet rs1 = statement.executeQuery("SELECT * FROM table_kafka");
while (rs1.next()) {
System.out.println(rs1.getInt(1) + ", " + rs1.getInt(2));
} 
 
statement.close();
connection.close();
 
报错:
Reason: Required context properties mismatch.
 
The matching candidates:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
Mismatched properties:
'connector.type' expects 'filesystem', but is 'kafka'
 
 
 
 
赵峰


Flink JDBC Driver是否支持创建流数据表

2020-03-24 文章 赵峰
hi
   
Flink JDBC Driver创建kafka表报错,是我的建表代码不正确?代码如下:
Connection connection = 
DriverManager.getConnection("jdbc:flink://localhost:8083?planner=blink");
Statement statement = connection.createStatement();

statement.executeUpdate(
"CREATE TABLE table_kafka (\n" +
"user_id BIGINT,\n" +
"item_id BIGINT,\n" +
"category_id BIGINT,\n" +
"behavior STRING,\n" +
"ts TIMESTAMP(3),\n" +
"proctime as PROCTIME(),\n" +
"WATERMARK FOR ts as ts - INTERVAL '5' SECOND\n" +
") WITH (\n" +
"'connector.type' = 'kafka', \n" +
"'connector.version' = 'universal', \n" +
"'connector.topic' = 'flink_im02', \n" +
"'connector.properties.group.id' = 'flink_im02_new',\n" +
"'connector.startup-mode' = 'earliest-offset', \n" +
"'connector.properties.zookeeper.connect' = '*.*.*.*:2181', \n" +
"'connector.properties.bootstrap.servers' = '*.*.*.*:9092', \n" +
"'format.type' = 'csv',\n" +
"'format.field-delimiter' = '|'\n" +
")");
ResultSet rs1 = statement.executeQuery("SELECT * FROM table_kafka");
while (rs1.next()) {
System.out.println(rs1.getInt(1) + ", " + rs1.getInt(2));
} 

statement.close();
connection.close();

报错:
Reason: Required context properties mismatch.

The matching candidates:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
Mismatched properties:
'connector.type' expects 'filesystem', but is 'kafka'




赵峰

Flink JDBC Driver可以创建kafka表吗?

2020-03-24 文章 赵峰
hi
   Flink JDBC Driver创建kafka表报错,是我的建表代码不正确?代码如下:
Connection connection = 
DriverManager.getConnection("jdbc:flink://localhost:8083?planner=blink");
Statement statement = connection.createStatement();

statement.executeUpdate(
"CREATE TABLE table_kafka (\n" +
"user_id BIGINT,\n" +
"item_id BIGINT,\n" +
"category_id BIGINT,\n" +
"behavior STRING,\n" +
"ts TIMESTAMP(3),\n" +
"proctime as PROCTIME(),\n" +
"WATERMARK FOR ts as ts - INTERVAL '5' SECOND\n" +
") WITH (\n" +
"'connector.type' = 'kafka', \n" +
"'connector.version' = 'universal', \n" +
"'connector.topic' = 'flink_im02', \n" +
"'connector.properties.group.id' = 'flink_im02_new',\n" +
"'connector.startup-mode' = 'earliest-offset', \n" +
"'connector.properties.zookeeper.connect' = '*.*.*.*:2181', \n" +
"'connector.properties.bootstrap.servers' = '*.*.*.*:9092', \n" +
"'format.type' = 'csv',\n" +
"'format.field-delimiter' = '|'\n" +
")");
ResultSet rs1 = statement.executeQuery("SELECT * FROM table_kafka");
while (rs1.next()) {
System.out.println(rs1.getInt(1) + ", " + rs1.getInt(2));
} 

statement.close();
connection.close();

报错:
Reason: Required context properties mismatch.

The matching candidates:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
Mismatched properties:
'connector.type' expects 'filesystem', but is 'kafka'




赵峰

Re:Flink on YARN 使用Kerboros认证失败

2020-03-24 文章 巫旭阳
之前在使用hadoop client时设置了一个系统变量, 当这个变量没设置的时候就会报之前的错误
System.setProperty("java.security.krb5.conf", 
"C:\\Users\\86177\\Desktop\\tmp\\5\\krb5.conf" );
但flink on yarn 没有提供这个参数的设置。







在 2020-03-24 20:52:44,"aven.wu"  写道:

Flink 提交作业到有kerboros认证的集群报以下异常

 

java.lang.Exception: unable to establish the security context
at 
org.apache.flink.runtime.security.SecurityUtils.install(SecurityUtils.java:73)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1124)
Caused by: java.lang.IllegalArgumentException: Can't get Kerberos realm
at 
org.apache.hadoop.security.HadoopKerberosName.setConfiguration(HadoopKerberosName.java:65)
at 
org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:276)
at 
org.apache.hadoop.security.UserGroupInformation.setConfiguration(UserGroupInformation.java:312)
at 
org.apache.flink.runtime.security.modules.HadoopModule.install(HadoopModule.java:70)
at 
org.apache.flink.runtime.security.SecurityUtils.install(SecurityUtils.java:67)
... 1 more
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.hadoop.security.authentication.util.KerberosUtil.getDefaultRealm(KerberosUtil.java:84)
at 
org.apache.hadoop.security.HadoopKerberosName.setConfiguration(HadoopKerberosName.java:63)
... 5 more
Caused by: KrbException: Cannot locate default realm
at sun.security.krb5.Config.getDefaultRealm(Config.java:1029)
... 11 more

 

使用了官网提供的四个参数,配置在了flink-conf.yaml里

 

security.kerberos.login.use-ticket-cache: false
security.kerberos.login.keytab: /home/flink-1.8.0/conf/flink.keytab
security.kerberos.login.principal: flink/hado...@example.com
security.kerberos.login.realm: EXAMPLE.COM
security.kerberos.login.contexts: KafkaClient

 

/home/flink-1.8.0/conf/flink.keytab 文件已放好,

 

 

Best

Aven

 

Flink on YARN 使用Kerboros认证失败

2020-03-24 文章 aven . wu
Flink 提交作业到有kerboros认证的集群报以下异常

java.lang.Exception: unable to establish the security context
at 
org.apache.flink.runtime.security.SecurityUtils.install(SecurityUtils.java:73)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1124)
Caused by: java.lang.IllegalArgumentException: Can't get Kerberos realm
at 
org.apache.hadoop.security.HadoopKerberosName.setConfiguration(HadoopKerberosName.java:65)
at 
org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:276)
at 
org.apache.hadoop.security.UserGroupInformation.setConfiguration(UserGroupInformation.java:312)
at 
org.apache.flink.runtime.security.modules.HadoopModule.install(HadoopModule.java:70)
at 
org.apache.flink.runtime.security.SecurityUtils.install(SecurityUtils.java:67)
... 1 more
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.hadoop.security.authentication.util.KerberosUtil.getDefaultRealm(KerberosUtil.java:84)
at 
org.apache.hadoop.security.HadoopKerberosName.setConfiguration(HadoopKerberosName.java:63)
... 5 more
Caused by: KrbException: Cannot locate default realm
at sun.security.krb5.Config.getDefaultRealm(Config.java:1029)
... 11 more

使用了官网提供的四个参数,配置在了flink-conf.yaml里

security.kerberos.login.use-ticket-cache: false
security.kerberos.login.keytab: /home/flink-1.8.0/conf/flink.keytab
security.kerberos.login.principal: flink/hado...@example.com
security.kerberos.login.realm: EXAMPLE.COM
security.kerberos.login.contexts: KafkaClient

/home/flink-1.8.0/conf/flink.keytab 文件已放好,


Best
Aven



如何提升任务cpu使用率

2020-03-24 文章 yanggang_it_job
hi:
   背景介绍,现在集群的剩余核数不多,就去梳理了一些大任务。
   
通过PromSQL:max(flink_taskmanager_Status_JVM_CPU_Load{job_name={job_name}})获取指定任务的cpu使用率,
   发现任务的cpu使用率普遍较低,一个slot为10的container,使用率大多小于6%。
   
   然后我测试中我降低container里面的slot数,发现cpu使用率并没有线性增加,同理我增大slot数也没有线性减少。
   
   我是不是测试的有问题呢?或者有什么相关思路吗?




  
 

Re: JDBC Sink参数connector.write.max-retries 在Oracle中的bug

2020-03-24 文章 Leonard Xu
Hi, xinghalo
这是jdbc sink 的AppenOnlyWriter的一个已知bug,在1.10.1里已经修复[1],社区近期在准备1.10.1的发布,
建议等1.10.1发布后升级即可。

Best,
Leonard

[1]https://issues.apache.org/jira/browse/FLINK-16281 


> 在 2020年3月24日,18:32,111  写道:
> 
> Hi,
> 在使用jdbc sink时,底层使用oracle驱动会出现bug。
> 出现的现象:当max-retries参数设置为1时,任务能正常报错;当max-retries参数大于1时,虽然程序内部报错,但是任务总是正常结束。
> 
> 
> 在JDBCUpsertOutputFormat.java中的flush()方法中,设计了重试机制:
> public synchronized void flush() throws Exception {
>   checkFlushException();
> 
>   for (int i = 1; i <= maxRetryTimes; i++) {
> try {
> jdbcWriter.executeBatch();
> batchCount = 0;
> break;
> } catch (SQLException e) {
> LOG.error("JDBC executeBatch error, retry times = {}", i, e);
> if (i >= maxRetryTimes) {
> //throw e;
> System.exit(-1);
> }
> Thread.sleep(1000 * i);
> }
>   }
> }
> 但是当executeBatch出现异常时,会进入异常捕获,并清空rank信息(ojdbc14的5339行):
> } catch (SQLException var17) {
> this.clearBatch();
>this.needToParse = true;
>if (this.sqlKind != 1 && this.sqlKind != 4) {
> for(var3 = 0; var3 < var4.length; ++var3) {
>var4[var3] = -3;
> }
>}
> 
>DatabaseError.throwBatchUpdateException(var17, this.sqlKind != 1 && 
> this.sqlKind != 4 ? var4.length : var3, var4);
> } finally {
> 下一次执行executeBatch时,由于rank为0,会直接跳过插入操作,返回成功。
> public int[] executeBatch() throws SQLException {
> synchronized(this.connection) {
> int[] var1;
>synchronized(this) {
> int var3 = 0;
>this.setJdbcBatchStyle();
>int[] var4 = new int[this.currentRank];
>if (this.currentRank > 0) {
> this.ensureOpen();
> 从而导致第二次重试的时候直接跳过插入操作,成功返回。
> 
> 
> 
> 
> 



JDBC Sink参数connector.write.max-retries 在Oracle中的bug

2020-03-24 文章 111
Hi,
在使用jdbc sink时,底层使用oracle驱动会出现bug。
出现的现象:当max-retries参数设置为1时,任务能正常报错;当max-retries参数大于1时,虽然程序内部报错,但是任务总是正常结束。


在JDBCUpsertOutputFormat.java中的flush()方法中,设计了重试机制:
public synchronized void flush() throws Exception {
   checkFlushException();

   for (int i = 1; i <= maxRetryTimes; i++) {
try {
jdbcWriter.executeBatch();
batchCount = 0;
 break;
} catch (SQLException e) {
LOG.error("JDBC executeBatch error, retry times = {}", i, e);
 if (i >= maxRetryTimes) {
//throw e;
System.exit(-1);
}
 Thread.sleep(1000 * i);
}
   }
}
但是当executeBatch出现异常时,会进入异常捕获,并清空rank信息(ojdbc14的5339行):
} catch (SQLException var17) {
this.clearBatch();
this.needToParse = true;
if (this.sqlKind != 1 && this.sqlKind != 4) {
for(var3 = 0; var3 < var4.length; ++var3) {
var4[var3] = -3;
}
}

DatabaseError.throwBatchUpdateException(var17, this.sqlKind != 1 && 
this.sqlKind != 4 ? var4.length : var3, var4);
} finally {
下一次执行executeBatch时,由于rank为0,会直接跳过插入操作,返回成功。
public int[] executeBatch() throws SQLException {
synchronized(this.connection) {
int[] var1;
synchronized(this) {
int var3 = 0;
this.setJdbcBatchStyle();
int[] var4 = new int[this.currentRank];
if (this.currentRank > 0) {
this.ensureOpen();
从而导致第二次重试的时候直接跳过插入操作,成功返回。







Re: FLINK SQL中时间戳怎么处理处理

2020-03-24 文章 Leonard Xu
Hi,吴志勇
你的SQL表定义应该没问题,出问题的地方 现在flink的 json format 遵循 
RFC3399标准[1],其识别的timestamp的格式是:'-MM-dd'T'HH:mm:ss.SSS’Z', 暂不支持long解析为 
timestamp,你可以在输出到kafka时将timestamp转换成该格式:
DateFormat dateFormat =  new SimpleDateFormat("-MM-dd'T'HH:mm:ss.SSS'Z'");
Date date = new Date(System.currentTimeMillis());
String jsonSchemaDate = dateFormat.format(date);
//{"id":5,"price":40,"timestamp”:”2020-03-24T18:23:38.123Z","type":"math"}
这个问题社区已经有issue [2]在跟进了,该issue会支持long转timestamp。

另外,如果是中文提问请发user-zh邮件列表,user邮件列表请用英语哈^_^

Best,
Leonard

[1]https://json-schema.org/understanding-json-schema/reference/string.html#dates-and-times
 

[2]https://issues.apache.org/jira/browse/FLINK-16725 


> 在 2020年3月23日,15:35,吴志勇 <1154365...@qq.com> 写道:
> 
> 如题:
> 我向kafka中输出了json格式的数据
> {"id":5,"price":40,"timestamp":1584942626828,"type":"math"}
> {"id":2,"price":70,"timestamp":1584942629638,"type":"math"}
> {"id":2,"price":70,"timestamp":1584942634951,"type":"math"}
> 
> 其中timestamp字段是13位时间戳,对应的SQL表中应该怎么处理成时间格式呢?
>   - name: bookpojo
> type: source-table
> connector:
>   property-version: 1
>   type: kafka
>   version: "universal"
>   topic: pojosource
>   startup-mode: earliest-offset
>   properties:
> zookeeper.connect: localhost:2181
> bootstrap.servers: localhost:9092
> group.id: testGroup
> format:
>   property-version: 1
>   type: json
>   schema: "ROW"
> schema:
>   - name: id
> data-type: INT
>   - name: type
> data-type: STRING
>   - name: price
> data-type: INT
>   - name: timestamp
> data-type: TIMESTAMP(3)
> 
> 上述配置,好像有问题。
> 
> 我在官网中找到这样一句说明:
> 字符串和时间类型:时间类型必须根据Java SQL时间格式进行格式化,并以毫秒为单位。例如: 
> 2018-01-01日期,20:43:59时间和2018-01-01 20:43:59.999时间戳。
> 时间一定得是字符串类型且带毫秒吗?
> 
> 谢谢。
> 



Re: ddl es 报错

2020-03-24 文章 Leonard Xu
Hi, 出发
看起来是你缺少了依赖es connector的依赖[1],所以只能找到内置的filesystem connector,目前内置的filesystem 
connector只支持csv format,所以会有这个错误。
在项目中加上缺失的依赖即可,如果使用SQL CLI,也需要将依赖的jar包放到 flink的lib目录下。


org.apache.flink
flink-sql-connector-elasticsearch6_2.11
${flink.version}


org.apache.flink
flink-json
${flink.version}


Best,
Leonard
[1] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#elasticsearch-connector
 



> 在 2020年3月23日,23:30,出发 <573693...@qq.com> 写道:
> 
> 
> 源码如下:
> CREATE TABLE buy_cnt_per_hour ( 
> hour_of_day BIGINT,
> buy_cnt BIGINT
> ) WITH (
> 'connector.type' = 'elasticsearch',
> 'connector.version' = '6',
> 'connector.hosts' = 'http://localhost:9200',
> 'connector.index' = 'buy_cnt_per_hour',
> 'connector.document-type' = 'user_behavior',
> 'connector.bulk-flush.max-actions' = '1',
> 'format.type' = 'json',
> 'update-mode' = 'append'
> )
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.java.StreamTableEnvironment;
> import org.apache.flink.types.Row;
> 
> public class ESTest {
> 
> public static void main(String[] args) throws Exception {
> 
> //2、设置运行环境
> StreamExecutionEnvironment streamEnv = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> EnvironmentSettings settings = 
> EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
> StreamTableEnvironment tableEnv = 
> StreamTableEnvironment.create(streamEnv, settings);
> streamEnv.setParallelism(1);
> String sinkDDL = " CREATE TABLE test_es ( hour_of_day BIGINT,  
> buy_cnt BIGINT "
> + ") WITH ( 'connector.type' = 'elasticsearch',  
> 'connector.version' = '6',"
> + "'connector.hosts' = 'http://localhost:9200',  
> 'connector.index' = 'buy_cnt_per_hour',"
> + "'connector.document-type' = 'user_behavior',"
> + "'connector.bulk-flush.max-actions' = '1',\n" + "
> 'format.type' = 'json',"
> + "'update-mode' = 'append' )";
> tableEnv.sqlUpdate(sinkDDL);
> Table table = tableEnv.sqlQuery("select * from test_es ");
> tableEnv.toRetractStream(table, Row.class).print();
> streamEnv.execute("");
> }
> 
> }
> 具体error
> The matching candidates:
> org.apache.flink.table.sources.CsvAppendTableSourceFactory
> Mismatched properties:
> 'connector.type' expects 'filesystem', but is 'elasticsearch'
> 'format.type' expects 'csv', but is 'json'
> 
> The following properties are requested:
> connector.bulk-flush.max-actions=1
> connector.document-type=user_behavior
> connector.hosts=http://localhost:9200
> connector.index=buy_cnt_per_hour
> connector.type=elasticsearch
> connector.version=6
> format.type=json
> schema.0.data-type=BIGINT
> schema.0.name=hour_of_day
> schema.1.data-type=BIGINT
> schema.1.name=buy_cnt
> update-mode=append



ddl es ????

2020-03-24 文章 ????
:
CREATE TABLE buy_cnt_per_hour (
  hour_of_day BIGINT,
  buy_cnt BIGINT
) WITH (
  'connector.type' = 'elasticsearch',
  'connector.version' = '6',
  'connector.hosts' = 'http://localhost:9200',
  'connector.index' = 'buy_cnt_per_hour',
  'connector.document-type' = 'user_behavior',
  'connector.bulk-flush.max-actions' = '1',
  'format.type' = 'json',
  'update-mode' = 'append'
)

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class ESTest {

public static void main(String[] args) throws Exception {

//2??
StreamExecutionEnvironment streamEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = 
EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
StreamTableEnvironment tableEnv = 
StreamTableEnvironment.create(streamEnv, settings);
streamEnv.setParallelism(1);
String sinkDDL = " CREATE TABLE test_es ( hour_of_day BIGINT,  buy_cnt 
BIGINT "
+ ") WITH ( 'connector.type' = 'elasticsearch',  
'connector.version' = '6',"
+ "'connector.hosts' = 'http://localhost:9200',  
'connector.index' = 'buy_cnt_per_hour',"
+ "'connector.document-type' = 'user_behavior',"
+ "'connector.bulk-flush.max-actions' = '1',\n" + "
'format.type' = 'json',"
+ "'update-mode' = 'append' )";
tableEnv.sqlUpdate(sinkDDL);
Table table = tableEnv.sqlQuery("select * from test_es ");
tableEnv.toRetractStream(table, Row.class).print();
streamEnv.execute("");
}

}
errorThe matching candidates: 
org.apache.flink.table.sources.CsvAppendTableSourceFactory Mismatched 
properties: 'connector.type' expects 'filesystem', but is 'elasticsearch' 
'format.type' expects 'csv', but is 'json' The following properties are 
requested: connector.bulk-flush.max-actions=1 
connector.document-type=user_behavior connector.hosts=http://localhost:9200 
connector.index=buy_cnt_per_hour connector.type=elasticsearch 
connector.version=6 format.type=json schema.0.data-type=BIGINT 
schema.0.name=hour_of_day schema.1.data-type=BIGINT schema.1.name=buy_cnt 
update-mode=append

ddl es 报错

2020-03-24 文章 出发
图片是我用到的属性

Re: 关于 SQL DATE_FORMAT 的时区设置的构想

2020-03-24 文章 DONG, Weike
Hi Zhenghua,

感谢您的回复。感觉既然 Flink 已经提供了时区的设定,这方面也许可以进一步增强一些。CONVERT_TZ
用户很容易忘记或者漏掉,这里还是有不少完善的空间。

Best,
Weike

On Tue, Mar 24, 2020 at 4:20 PM Zhenghua Gao  wrote:

> CURRENT_TIMESTAMP 返回值类型是 TIMESTAMP (WITHOUT TIME ZONE),
> 其语义可参考 java.time.LocalDateTime。
> 其字符形式的表示并不随着时区变化而变化(如你所见,和UTC+0 一致)。
>
> 你的需求可以通过 CONVERT_TZ(timestamp_string, time_zone_from_string,
> time_zone_to_string)
>
> *Best Regards,*
> *Zhenghua Gao*
>
>
> On Mon, Mar 23, 2020 at 10:12 PM DONG, Weike 
> wrote:
>
> > Hi,
> >
> > 近期发现 Flink 的 Blink Planner 在 DATE_FORMAT 对 CURRENT_TIMESTAMP
> > 做时间格式化为字符串时,默认以 UTC+0 为准。
> >
> > 长期以来,TableConfig 类里面有一个 setLocalTimeZone 方法;将其设置为东八区以后,发现格式化后的字符串仍然是
> UTC+0
> > 的。而深入来看,Flink 的时间格式化时的代码生成逻辑(time.scala)并未考虑时区的设置。
> >
> > 由于大多数用户的时区均不是 UTC+0(GMT、UTC),如果时间格式化、显示等都可以考虑到 TableConfig 中的时区设置,那么
> Flink
> > 是否会更用户友好一些呢?当然这个会涉及到不兼容的变更,需要谨慎一些。
> >
> > 也许提供一个 DATE_FORMAT_WITH_TIMEZONE 的内置函数,社区是否会更容易接受一些呢?
> >
> > 仅仅是个人一点想法,感谢 :)
> >
>


Re: Flink 1.10 的 JDBCUpsertOutputFormat flush方法的重试机制无法生效

2020-03-24 文章 Leonard Xu
Hi, shangwen

这应该是AppendOnlyWriter的一个bug[1], 在1.10.1/1.11-SNAPSHOT(master)中已经修复.
用1.10.1或master分支就好了,目前1.10.1还未发布,我了解到的1.10.1社区正在准备发布中。
如果急需修复,你可以参考1.10.1分支的代码。

Best,
Leonard

[1]https://issues.apache.org/jira/browse/FLINK-16281 

[2]https://github.com/apache/flink/tree/release-1.10 


> 在 2020年3月23日,11:05,shangwen <583767...@qq.com> 写道:
> 
> 我们在测试环境测试JDBC写入postgresql的场景,用tcpkill模拟链接被关闭的情况,测试对异常的兼容性,我们发现一直打印类似的日志
> 
> 
> 2020-03-20 21:16:21.247 [jdbc-upsert-output-format-thread-1] ERROR 
> org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat - JDBC 
> executeBatch error, retry times = 1
> org.postgresql.util.PSQLException: This connection has been closed.
>   at org.postgresql.jdbc.PgConnection.checkClosed(PgConnection.java:857)
>   at org.postgresql.jdbc.PgConnection.getAutoCommit(PgConnection.java:817)
>   at 
> org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:813)
>   at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:873)
>   at 
> org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1569)
>   at 
> org.apache.flink.api.java.io.jdbc.writer.AppendOnlyWriter.executeBatch(AppendOnlyWriter.java:62)
>   at 
> org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.flush(JDBCUpsertOutputFormat.java:159)
>   at 
> org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.lambda$open$0(JDBCUpsertOutputFormat.java:124)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>   at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>   at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> 
> 
> 
> 从日志上看来,链接被关闭,只会导致重试一次,这里是Flink的重试代码
> //JDBCUpsertOutputFormat.javapublic synchronized void flush() throws 
> Exception {
>   checkFlushException();
> 
>   for (int i = 1; i <= maxRetryTimes; i++) {
>  try {
> jdbcWriter.executeBatch();
> batchCount = 0;
> break;
>  } catch (SQLException e) {
> LOG.error("JDBC executeBatch error, retry times = {}", i, e);
> if (i = maxRetryTimes) {
>throw e;
> }
> Thread.sleep(1000 * i);
>  }
>   }
> }
> 
> 
> 通过远程debug分析,在第一次执行
> JDBCUpsertOutputFormat.flush
>  - AppendOnlyWriter.executeBatch
>   ...
>   - PgConnection.getAutoCommit
> 抛出PSQLException: This connection has been closed时,batchStatements在这之前已经被清空
> // PgStatement.java private BatchResultHandler internalExecuteBatch() throws 
> SQLException {   // Construct query/parameter arrays.   
> transformQueriesAndParameters();   // Empty arrays should be passed to 
> toArray   // see http://shipilev.net/blog/2016/arrays-wisdom-ancients/   
> Query[] queries = batchStatements.toArray(new Query[0]);   ParameterList[] 
> parameterLists = batchParameters.toArray(new ParameterList[0]);   
> batchStatements.clear(); // 这里已经被清空   batchParameters.clear();   ...   if 
> (connection.getAutoCommit()) { // 抛出异常   flags |= 
> QueryExecutor.QUERY_SUPPRESS_BEGIN;   }   ... }
> 
> 
> 所以在Flink第二次重试执行这段代码的时候jdbcWriter.executeBatch的时候,batchStatements已经为Empty并直接返回了,导致Flink认为是成功的,这样会导致其实语句没有被执行?
> // PgStatement.java public int[] executeBatch() throws SQLException {   
> checkClosed();   closeForNextExecution();   if (batchStatements == null || 
> batchStatements.isEmpty()) { //这里就直接返回了 return new int[0];   }   return 
> internalExecuteBatch().getUpdateCount(); }
> 
> 
> 目前的想法是,当我出现异常捕获的时候,在重试之前,判断这个链接是否已经被关闭,如果已经被关闭,则重新open,不知道大家有啥看法,这是我提的issue
> https://issues.apache.org/jira/browse/FLINK-16708



Re: 关于 SQL DATE_FORMAT 的时区设置的构想

2020-03-24 文章 Zhenghua Gao
CURRENT_TIMESTAMP 返回值类型是 TIMESTAMP (WITHOUT TIME ZONE),
其语义可参考 java.time.LocalDateTime。
其字符形式的表示并不随着时区变化而变化(如你所见,和UTC+0 一致)。

你的需求可以通过 CONVERT_TZ(timestamp_string, time_zone_from_string,
time_zone_to_string)

*Best Regards,*
*Zhenghua Gao*


On Mon, Mar 23, 2020 at 10:12 PM DONG, Weike 
wrote:

> Hi,
>
> 近期发现 Flink 的 Blink Planner 在 DATE_FORMAT 对 CURRENT_TIMESTAMP
> 做时间格式化为字符串时,默认以 UTC+0 为准。
>
> 长期以来,TableConfig 类里面有一个 setLocalTimeZone 方法;将其设置为东八区以后,发现格式化后的字符串仍然是 UTC+0
> 的。而深入来看,Flink 的时间格式化时的代码生成逻辑(time.scala)并未考虑时区的设置。
>
> 由于大多数用户的时区均不是 UTC+0(GMT、UTC),如果时间格式化、显示等都可以考虑到 TableConfig 中的时区设置,那么 Flink
> 是否会更用户友好一些呢?当然这个会涉及到不兼容的变更,需要谨慎一些。
>
> 也许提供一个 DATE_FORMAT_WITH_TIMEZONE 的内置函数,社区是否会更容易接受一些呢?
>
> 仅仅是个人一点想法,感谢 :)
>