Re: ddl es error

2020-03-24 Thread jinhai wang
Excellent! You could open an improvement issue for this.


Best Regards

jinhai...@gmail.com

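> On Mar 25, 2020, at 1:40 PM, zhisheng wrote:
>
> Hi, Leonard Xu
>
> The official ES DDL currently does not support specifying a username and password for the ES cluster. I have extended it at my company to add this feature. Would the community want this feature, and how should I go about contributing it?
>
> Here is the result: http://zhisheng-blog.oss-cn-hangzhou.aliyuncs.com/2020-03-25-053948.png
>
> Best Wishes!
>
> zhisheng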
> 
> Leonard Xu wrote on Tue, Mar 24, 2020 at 5:53 PM:
> 
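>> Hi, 出发
>> It looks like you are missing the ES connector dependency [1], so only the built-in filesystem connector can be found. The built-in filesystem
>> connector currently supports only the csv format, which is why you get this error.
>> Adding the missing dependencies to your project should fix it. If you use the SQL CLI, you also need to put the dependency jars into Flink's lib directory.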
>> 
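>> <dependency>
>>   <groupId>org.apache.flink</groupId>
>>   <artifactId>flink-sql-connector-elasticsearch6_2.11</artifactId>
>>   <version>${flink.version}</version>
>> </dependency>
>> <dependency>
>>   <groupId>org.apache.flink</groupId>
>>   <artifactId>flink-json</artifactId>
>>   <version>${flink.version}</version>
>> </dependency>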
>> 
>> Best,
>> Leonard
>> [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#elasticsearch-connector
>> 
>> 
>>> On Mar 23, 2020, at 23:30, 出发 <573693...@qq.com> wrote:
>>> 
>>> 
>>> The source code is as follows:
>>> CREATE TABLE buy_cnt_per_hour (
>>>     hour_of_day BIGINT,
>>>     buy_cnt BIGINT
>>> ) WITH (
>>>     'connector.type' = 'elasticsearch',
>>>     'connector.version' = '6',
>>>     'connector.hosts' = 'http://localhost:9200',
>>>     'connector.index' = 'buy_cnt_per_hour',
>>>     'connector.document-type' = 'user_behavior',
>>>     'connector.bulk-flush.max-actions' = '1',
>>>     'format.type' = 'json',
>>>     'update-mode' = 'append'
>>> )
>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>> import org.apache.flink.table.api.EnvironmentSettings;
>>> import org.apache.flink.table.api.Table;
>>> import org.apache.flink.table.api.java.StreamTableEnvironment;
>>> import org.apache.flink.types.Row;
>>>
>>> public class ESTest {
>>>
>>>     public static void main(String[] args) throws Exception {
>>>
>>>         // 2. Set up the execution environment
>>>         StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
>>>         EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
>>>         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);
>>>         streamEnv.setParallelism(1);
>>>         String sinkDDL = " CREATE TABLE test_es ( hour_of_day BIGINT, buy_cnt BIGINT "
>>>                 + ") WITH ( 'connector.type' = 'elasticsearch', 'connector.version' = '6',"
>>>                 + "'connector.hosts' = 'http://localhost:9200', 'connector.index' = 'buy_cnt_per_hour',"
>>>                 + "'connector.document-type' = 'user_behavior',"
>>>                 + "'connector.bulk-flush.max-actions' = '1',\n" + "  'format.type' = 'json',"
>>>                 + "'update-mode' = 'append' )";
>>>         tableEnv.sqlUpdate(sinkDDL);
>>>         Table table = tableEnv.sqlQuery("select * from test_es ");
>>>         tableEnv.toRetractStream(table, Row.class).print();
>>>         streamEnv.execute("");
>>>     }
>>>
>>> }
>>> The exact error:
>>> The matching candidates:
>>> org.apache.flink.table.sources.CsvAppendTableSourceFactory
>>> Mismatched properties:
>>> 'connector.type' expects 'filesystem', but is 'elasticsearch'
>>> 'format.type' expects 'csv', but is 'json'
>>> 
>>> The following properties are requested:
>>> connector.bulk-flush.max-actions=1
>>> connector.document-type=user_behavior
>>> connector.hosts=http://localhost:9200
>>> connector.index=buy_cnt_per_hour
>>> connector.type=elasticsearch
>>> connector.version=6
>>> format.type=json
>>> schema.0.data-type=BIGINT
>>> schema.0.name=hour_of_day
>>> schema.1.data-type=BIGINT
>>> schema.1.name=buy_cnt
>>> update-mode=append
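
The "Mismatched properties" lines above can be read as a comparison between what the only discovered factory requires and what the DDL requested. The following is a minimal Java sketch of that comparison under stated assumptions: it is not Flink's actual discovery code, and the class name `FactoryMatchSketch` and the method `mismatches` are hypothetical names chosen for illustration. The property values are taken from the error output itself.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class FactoryMatchSketch {

    // Hypothetical helper: compares a factory's required context with the
    // requested properties and returns one message per differing property.
    // A property that was not requested at all is reported as 'null'.
    public static List<String> mismatches(Map<String, String> requiredContext,
                                          Map<String, String> requested) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, String> required : requiredContext.entrySet()) {
            String actual = requested.get(required.getKey());
            if (!required.getValue().equals(actual)) {
                result.add("'" + required.getKey() + "' expects '" + required.getValue()
                        + "', but is '" + actual + "'");
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Required context of the only candidate, as reported in the error output.
        Map<String, String> csvFactory = new LinkedHashMap<>();
        csvFactory.put("connector.type", "filesystem");
        csvFactory.put("format.type", "csv");

        // The relevant properties requested by the DDL.
        Map<String, String> requested = new LinkedHashMap<>();
        requested.put("connector.type", "elasticsearch");
        requested.put("format.type", "json");

        for (String line : mismatches(csvFactory, requested)) {
            System.out.println(line);
        }
    }
}
```

Once the Elasticsearch connector and JSON format jars are on the classpath, a factory whose required context matches the requested properties becomes a candidate, so this kind of mismatch would no longer be reported.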
>> 
>> 



Re: ddl es error

2020-03-24 Thread zhisheng
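Hi, Leonard Xu

The official ES DDL currently does not support specifying a username and password for the ES cluster. I have extended it at my company to add this feature. Would the community want this feature, and how should I go about contributing it?

Here is the result: http://zhisheng-blog.oss-cn-hangzhou.aliyuncs.com/2020-03-25-053948.png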

Best Wishes!

zhisheng

Leonard Xu wrote on Tue, Mar 24, 2020 at 5:53 PM:

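> Hi, 出发
> It looks like you are missing the ES connector dependency [1], so only the built-in filesystem connector can be found. The built-in filesystem
> connector currently supports only the csv format, which is why you get this error.
> Adding the missing dependencies to your project should fix it. If you use the SQL CLI, you also need to put the dependency jars into Flink's lib directory.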
>
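> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-sql-connector-elasticsearch6_2.11</artifactId>
>   <version>${flink.version}</version>
> </dependency>
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-json</artifactId>
>   <version>${flink.version}</version>
> </dependency>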
>
> Best,
> Leonard
> [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#elasticsearch-connector
>
>
> > On Mar 23, 2020, at 23:30, 出发 <573693...@qq.com> wrote:
> >
> >
> > The source code is as follows:
> > CREATE TABLE buy_cnt_per_hour (
> >     hour_of_day BIGINT,
> >     buy_cnt BIGINT
> > ) WITH (
> >     'connector.type' = 'elasticsearch',
> >     'connector.version' = '6',
> >     'connector.hosts' = 'http://localhost:9200',
> >     'connector.index' = 'buy_cnt_per_hour',
> >     'connector.document-type' = 'user_behavior',
> >     'connector.bulk-flush.max-actions' = '1',
> >     'format.type' = 'json',
> >     'update-mode' = 'append'
> > )
> > import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> > import org.apache.flink.table.api.EnvironmentSettings;
> > import org.apache.flink.table.api.Table;
> > import org.apache.flink.table.api.java.StreamTableEnvironment;
> > import org.apache.flink.types.Row;
> >
> > public class ESTest {
> >
> >     public static void main(String[] args) throws Exception {
> >
> >         // 2. Set up the execution environment
> >         StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
> >         EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
> >         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);
> >         streamEnv.setParallelism(1);
> >         String sinkDDL = " CREATE TABLE test_es ( hour_of_day BIGINT, buy_cnt BIGINT "
> >                 + ") WITH ( 'connector.type' = 'elasticsearch', 'connector.version' = '6',"
> >                 + "'connector.hosts' = 'http://localhost:9200', 'connector.index' = 'buy_cnt_per_hour',"
> >                 + "'connector.document-type' = 'user_behavior',"
> >                 + "'connector.bulk-flush.max-actions' = '1',\n" + "  'format.type' = 'json',"
> >                 + "'update-mode' = 'append' )";
> >         tableEnv.sqlUpdate(sinkDDL);
> >         Table table = tableEnv.sqlQuery("select * from test_es ");
> >         tableEnv.toRetractStream(table, Row.class).print();
> >         streamEnv.execute("");
> >     }
> >
> > }
> > The exact error:
> > The matching candidates:
> > org.apache.flink.table.sources.CsvAppendTableSourceFactory
> > Mismatched properties:
> > 'connector.type' expects 'filesystem', but is 'elasticsearch'
> > 'format.type' expects 'csv', but is 'json'
> >
> > The following properties are requested:
> > connector.bulk-flush.max-actions=1
> > connector.document-type=user_behavior
> > connector.hosts=http://localhost:9200
> > connector.index=buy_cnt_per_hour
> > connector.type=elasticsearch
> > connector.version=6
> > format.type=json
> > schema.0.data-type=BIGINT
> > schema.0.name=hour_of_day
> > schema.1.data-type=BIGINT
> > schema.1.name=buy_cnt
> > update-mode=append
>
>


Re: Question about catching exceptions for sql-gateway insert into

2020-03-24 Thread 111
Hi godfrey,
I tried your suggestion of the exceptions REST API, and it meets my needs for now. Thank you very much!
Best,
Xinghalo


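On Mar 25, 2020 at 11:37, 111 wrote:
Hi,
OK, I will look into it. I am not yet familiar with how the taskmanager works, so I will follow up if I run into questions.
Best,
Xinghalo


On Mar 25, 2020 at 11:01, godfrey he wrote:
Hi, you could try fetching the logs of the finished job from YARN.

Best,
Godfrey

111 wrote on Wed, Mar 25, 2020 at 10:53 AM:

Hi,

I have indeed obtained the jobId through sql-gateway and retrieved the job status. However, because we use yarn-session, the taskmanager is released after the failure, so the logs on that taskmanager are lost as well.
If I connect to the JobManager, I presumably cannot get the taskmanager's earlier logs, can I?


Best,
xinghalo
On Mar 25, 2020 at 10:47, godfrey he wrote:
hi, sql gateway currently supports fetching the job status (provided the JM can still look up the job). To get the job's exceptions, you can use the REST API provided by the JM [1]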

[1]

https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jobs-jobid-exceptions
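
As a rough illustration of the REST call referenced in [1], the sketch below builds the `/jobs/<jobid>/exceptions` URL and fetches the response body with the JDK's `HttpURLConnection`. It is only a sketch under assumptions: the host `localhost` and port `8081` are assumed defaults for the JobManager REST endpoint, the class name `JobExceptionsSketch` is hypothetical, and the job id has to be the one returned by sql-gateway.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URI;
import java.nio.charset.StandardCharsets;

public class JobExceptionsSketch {

    // Builds the URI of the JobManager endpoint that reports a job's exceptions.
    public static URI exceptionsUri(String host, int port, String jobId) {
        return URI.create("http://" + host + ":" + port + "/jobs/" + jobId + "/exceptions");
    }

    // Performs a plain GET and returns the response body (JSON) as a string.
    public static String fetch(URI uri) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) uri.toURL().openConnection();
        conn.setRequestMethod("GET");
        conn.setConnectTimeout(5000);
        conn.setReadTimeout(5000);
        try (InputStream in = conn.getInputStream()) {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            byte[] chunk = new byte[4096];
            int n;
            while ((n = in.read(chunk)) != -1) {
                buffer.write(chunk, 0, n);
            }
            return new String(buffer.toByteArray(), StandardCharsets.UTF_8);
        } finally {
            conn.disconnect();
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length == 0) {
            System.err.println("usage: JobExceptionsSketch <jobId>");
            return;
        }
        // Assumption: the JobManager REST endpoint listens on localhost:8081.
        System.out.println(fetch(exceptionsUri("localhost", 8081, args[0])));
    }
}
```

This only works while the JobManager can still look up the job, which matches the caveat in the message above.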

Best,
Godfrey

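111 wrote on Wed, Mar 25, 2020 at 10:38 AM:

Hi,
It is indeed a runtime error... In that case, does Flink itself provide a mechanism for retrieving the exception?


xinghalo
xingh...@163.com


On Mar 25, 2020 at 10:32, godfrey he wrote:
Currently sql gateway is only responsible for submitting jobs and does not track their status. If a job fails to submit, sql gateway returns the corresponding error; if the error occurs at runtime, sql
gateway does not return it. Please check in the Flink web UI whether the job was submitted successfully.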

Best,
Godfrey

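111 wrote on Wed, Mar 25, 2020 at 10:29 AM:



Hi,
I tried it, and it does not work for insert into... The sql-gateway backend log shows no errors either.
I suspect it is because the jdbc sink connector I use works as a stream internally. Would the stream not propagate the exception to sql-gateway?


On Mar 25, 2020 at 10:26, godfrey he wrote:
hi, sql gateway currently returns the full server-side exception stack to the user,
for example:
Caused by: java.util.concurrent.ExecutionException:
org.apache.flink.runtime.rest.util.RestClientException: [Internal server
error.,  com.ververica.flink.table.gateway.SqlExecutionException: xxx

Please check the sql gateway log to see whether it contains any related errors.

Best,
Godfrey

111 wrote on Wed, Mar 25, 2020 at 8:47 AM:

Hi,
I have recently been using sql-gateway. When I use
StatementExecuteResponseBody body = getInstance().sendRequest(
host,port,StatementExecuteHeaders.getInstance(),
new SessionMessageParameters(sessionId),
new StatementExecuteRequestBody(stmt, timeout)).get();
to submit an insert statement and the job fails, the corresponding exception information is not returned. Is this feature simply not supported in the current version?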





Re: Re: Where can I find the MySQL retract stream table sink Java source code?

2020-03-24 Thread Jingsong Li
That is not quite what I meant.

There are two kinds of queries: append queries and update queries.

An update query can be handled in two ways: retract or upsert.
A sink chooses one of these modes to handle an update query; supporting
either one is enough.
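
To make the difference between the two modes concrete, here is a small, simplified Java sketch that replays the query from this thread (latest status per order, then a count per status) and prints the changes a sink would receive in each mode. It is an illustration only, not Flink's planner or sink code: the class name `UpsertVsRetractSketch`, the message labels, and the sample statuses `CREATED` and `PAID` are all made up for the example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class UpsertVsRetractSketch {

    // Encodes one count change for a status key in the chosen mode.
    private static void emit(List<String> out, boolean upsert, String status,
                             long oldCount, long newCount) {
        if (upsert) {
            // Upsert mode: one message per key; a key whose count drops to 0 is deleted.
            out.add(newCount > 0
                    ? "UPSERT(" + status + "," + newCount + ")"
                    : "DELETE(" + status + ")");
        } else {
            // Retract mode: withdraw the previous row, then add the new one.
            if (oldCount > 0) {
                out.add("RETRACT(" + status + "," + oldCount + ")");
            }
            if (newCount > 0) {
                out.add("ADD(" + status + "," + newCount + ")");
            }
        }
    }

    // events: pairs of {order_no, status} in arrival order.
    public static List<String> changelog(String[][] events, boolean upsert) {
        Map<String, String> lastStatus = new HashMap<>(); // LAST_VALUE(status) per order_no
        Map<String, Long> counts = new HashMap<>();       // COUNT(order_no) per status
        List<String> out = new ArrayList<>();
        for (String[] event : events) {
            String orderNo = event[0];
            String status = event[1];
            String previous = lastStatus.put(orderNo, status);
            if (status.equals(previous)) {
                continue; // the order's status did not change
            }
            if (previous != null) {
                long before = counts.get(previous);
                counts.put(previous, before - 1);
                emit(out, upsert, previous, before, before - 1);
            }
            long current = counts.getOrDefault(status, 0L);
            counts.put(status, current + 1);
            emit(out, upsert, status, current, current + 1);
        }
        return out;
    }

    public static void main(String[] args) {
        String[][] events = {{"o1", "CREATED"}, {"o1", "PAID"}};
        System.out.println("upsert:  " + changelog(events, true));
        System.out.println("retract: " + changelog(events, false));
    }
}
```

In upsert mode the second event produces a delete for the `CREATED` key, which is how an upsert sink can remove a row from the MySQL table without any retract sink being involved.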

You could read more in [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/dynamic_tables.html

Best,
Jingsong Lee

On Wed, Mar 25, 2020 at 11:57 AM wangl...@geekplus.com.cn <
wangl...@geekplus.com.cn> wrote:

> Thanks Jingsong.
>
> So JDBCTableSink now supports append and upsert mode. Retract mode is not
> available yet. Is that right?
>
> Thanks,
> Lei
>
>
> --
> wangl...@geekplus.com.cn
>
>
> *Sender:* Jingsong Li 
> *Send Time:* 2020-03-25 11:39
> *Receiver:* wangl...@geekplus.com.cn
> *cc:* user 
> *Subject:* Re: Where can I find the MySQL retract stream table sink Java
> source code?
> Hi,
>
> You may be misunderstanding the upsert sink. Take a look at [1]; it can
> handle "delete" records.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/dynamic_tables.html#table-to-stream-conversion
>
> Best,
> Jingsong Lee
>
> On Wed, Mar 25, 2020 at 11:37 AM Jingsong Li 
> wrote:
>
>> Hi,
>>
>> This can be a upsert stream [1], and JDBC has upsert sink now [2].
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/dynamic_tables.html#table-to-stream-conversion
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#jdbc-connector
>>
>> Best,
>> Jingsong Lee
>>
>> On Wed, Mar 25, 2020 at 11:14 AM Jingsong Li 
>> wrote:
>>
>>> Hi,
>>>
>>> This can be a upsert stream [1]
>>>
>>> Best,
>>> Jingsong Lee
>>>
>>> On Wed, Mar 25, 2020 at 11:12 AM wangl...@geekplus.com.cn <
>>> wangl...@geekplus.com.cn> wrote:
>>>

>>>> Create one table with Kafka and another table with MySQL using Flink SQL.
>>>> Write a SQL statement to read from Kafka and write to MySQL.
>>>>
>>>> INSERT INTO mysqlTable  SELECT status, COUNT(order_no) AS num FROM
>>>> (SELECT order_no, LAST_VALUE(status) AS status
>>>> FROM kafkaTable GROUP BY order_no)
>>>> GROUP BY status
>>>>
>>>> I think this is a retract stream.
>>>>
>>>> But where can I find the Java source code for the MySQL retract table sink?
>>>>
>>>>
>>>> Thanks,
>>>>
>>>> Lei
>>>>
>>>>
>>>> --
>>>> wangl...@geekplus.com.cn


>>>
>>> --
>>> Best, Jingsong Lee
>>>
>>
>>
>> --
>> Best, Jingsong Lee
>>
>
>
> --
> Best, Jingsong Lee
>
>

-- 
Best, Jingsong Lee


Re: Re: Where can I find the MySQL retract stream table sink Java source code?

2020-03-24 Thread wangl...@geekplus.com.cn
Thanks Jingsong.

So JDBCTableSink now supports append and upsert mode. Retract mode is not
available yet. Is that right?

Thanks,
Lei 




wangl...@geekplus.com.cn 

 
Sender: Jingsong Li
Send Time: 2020-03-25 11:39
Receiver: wangl...@geekplus.com.cn
cc: user
Subject: Re: Where can I find the MySQL retract stream table sink Java source code?
Hi,

You may be misunderstanding the upsert sink. Take a look at [1]; it can
handle "delete" records.

[1]https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/dynamic_tables.html#table-to-stream-conversion

Best,
Jingsong Lee

On Wed, Mar 25, 2020 at 11:37 AM Jingsong Li  wrote:
Hi,

This can be an upsert stream [1], and JDBC now has an upsert sink [2].

[1]https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/dynamic_tables.html#table-to-stream-conversion
[2]https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#jdbc-connector

Best,
Jingsong Lee

On Wed, Mar 25, 2020 at 11:14 AM Jingsong Li  wrote:
Hi,

This can be an upsert stream [1]

Best,
Jingsong Lee

On Wed, Mar 25, 2020 at 11:12 AM wangl...@geekplus.com.cn 
 wrote:

Create one table with Kafka and another table with MySQL using Flink SQL.
Write a SQL statement to read from Kafka and write to MySQL.

INSERT INTO mysqlTable  SELECT status, COUNT(order_no) AS num FROM
(SELECT order_no, LAST_VALUE(status) AS status FROM
kafkaTable GROUP BY order_no)
GROUP BY status

I think this is a retract stream. But where can I find the Java source code for the MySQL retract table sink?

Thanks,
Lei



wangl...@geekplus.com.cn 



-- 
Best, Jingsong Lee


-- 
Best, Jingsong Lee


-- 
Best, Jingsong Lee


Re: Where can I find the MySQL retract stream table sink Java source code?

2020-03-24 Thread Jingsong Li
Hi,

You may be misunderstanding the upsert sink. Take a look at [1]; it can
handle "delete" records.

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/dynamic_tables.html#table-to-stream-conversion

Best,
Jingsong Lee

On Wed, Mar 25, 2020 at 11:37 AM Jingsong Li  wrote:

> Hi,
>
> This can be a upsert stream [1], and JDBC has upsert sink now [2].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/dynamic_tables.html#table-to-stream-conversion
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#jdbc-connector
>
> Best,
> Jingsong Lee
>
> On Wed, Mar 25, 2020 at 11:14 AM Jingsong Li 
> wrote:
>
>> Hi,
>>
>> This can be a upsert stream [1]
>>
>> Best,
>> Jingsong Lee
>>
>> On Wed, Mar 25, 2020 at 11:12 AM wangl...@geekplus.com.cn <
>> wangl...@geekplus.com.cn> wrote:
>>
>>>
>>> Create one table with kafka,  another table with MySQL  using flinksql.
>>> Write a sql to read from kafka and write to MySQL.
>>>
>>> INSERT INTO mysqlTable  SELECT status, COUNT(order_no) AS num FROM
>>> (SELECT order_no, LAST_VALUE(status) AS status FROM 
>>> kafkaTable GROUP BY order_no)
>>> GROUP BY status
>>>
>>> I think this is a retract stream.
>>>
>>> But where can i find the java source code  about MySQL retract table sink?
>>>
>>>
>>> Thanks,
>>>
>>> Lei
>>>
>>>
>>> --
>>> wangl...@geekplus.com.cn
>>>
>>>
>>
>> --
>> Best, Jingsong Lee
>>
>
>
> --
> Best, Jingsong Lee
>


-- 
Best, Jingsong Lee


Re: Where can I find the MySQL retract stream table sink Java source code?

2020-03-24 Thread Jingsong Li
Hi,

This can be an upsert stream [1], and JDBC now has an upsert sink [2].

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/dynamic_tables.html#table-to-stream-conversion
[2]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#jdbc-connector

Best,
Jingsong Lee

On Wed, Mar 25, 2020 at 11:14 AM Jingsong Li  wrote:

> Hi,
>
> This can be a upsert stream [1]
>
> Best,
> Jingsong Lee
>
> On Wed, Mar 25, 2020 at 11:12 AM wangl...@geekplus.com.cn <
> wangl...@geekplus.com.cn> wrote:
>
>>
>> Create one table with kafka,  another table with MySQL  using flinksql.
>> Write a sql to read from kafka and write to MySQL.
>>
>> INSERT INTO mysqlTable  SELECT status, COUNT(order_no) AS num FROM
>> (SELECT order_no, LAST_VALUE(status) AS status FROM 
>> kafkaTable GROUP BY order_no)
>> GROUP BY status
>>
>> I think this is a retract stream.
>>
>> But where can i find the java source code  about MySQL retract table sink?
>>
>>
>> Thanks,
>>
>> Lei
>>
>>
>> --
>> wangl...@geekplus.com.cn
>>
>>
>
> --
> Best, Jingsong Lee
>


-- 
Best, Jingsong Lee


Re: Question about catching exceptions for sql-gateway insert into

2020-03-24 Thread 111
Hi,
OK, I will look into it. I am not yet familiar with how the taskmanager works, so I will follow up if I run into questions.
Best,
Xinghalo


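On Mar 25, 2020 at 11:01, godfrey he wrote:
Hi, you could try fetching the logs of the finished job from YARN.

Best,
Godfrey

111 wrote on Wed, Mar 25, 2020 at 10:53 AM:

Hi,

I have indeed obtained the jobId through sql-gateway and retrieved the job status. However, because we use yarn-session, the taskmanager is released after the failure, so the logs on that taskmanager are lost as well.
If I connect to the JobManager, I presumably cannot get the taskmanager's earlier logs, can I?


Best,
xinghalo
On Mar 25, 2020 at 10:47, godfrey he wrote:
hi, sql gateway currently supports fetching the job status (provided the JM can still look up the job). To get the job's exceptions, you can use the REST API provided by the JM [1]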

[1]

https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jobs-jobid-exceptions

Best,
Godfrey

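111 wrote on Wed, Mar 25, 2020 at 10:38 AM:

Hi,
It is indeed a runtime error... In that case, does Flink itself provide a mechanism for retrieving the exception?


xinghalo
xingh...@163.com


On Mar 25, 2020 at 10:32, godfrey he wrote:
Currently sql gateway is only responsible for submitting jobs and does not track their status. If a job fails to submit, sql gateway returns the corresponding error; if the error occurs at runtime, sql
gateway does not return it. Please check in the Flink web UI whether the job was submitted successfully.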

Best,
Godfrey

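111 wrote on Wed, Mar 25, 2020 at 10:29 AM:



Hi,
I tried it, and it does not work for insert into... The sql-gateway backend log shows no errors either.
I suspect it is because the jdbc sink connector I use works as a stream internally. Would the stream not propagate the exception to sql-gateway?


On Mar 25, 2020 at 10:26, godfrey he wrote:
hi, sql gateway currently returns the full server-side exception stack to the user,
for example:
Caused by: java.util.concurrent.ExecutionException:
org.apache.flink.runtime.rest.util.RestClientException: [Internal server
error.,  com.ververica.flink.table.gateway.SqlExecutionException: xxx

Please check the sql gateway log to see whether it contains any related errors.

Best,
Godfrey

111 wrote on Wed, Mar 25, 2020 at 8:47 AM:

Hi,
I have recently been using sql-gateway. When I use
StatementExecuteResponseBody body = getInstance().sendRequest(
host,port,StatementExecuteHeaders.getInstance(),
new SessionMessageParameters(sessionId),
new StatementExecuteRequestBody(stmt, timeout)).get();
to submit an insert statement and the job fails, the corresponding exception information is not returned. Is this feature simply not supported in the current version?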





Re: A question about running PyFlink on Windows; this is my first time using Flink.

2020-03-24 Thread jincheng sun
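Glad to receive your email. I am not sure exactly which video you watched, so it is hard to pinpoint the cause of your problem. You can refer to the official documentation [1].
My personal blog also has some 3-minute videos and getting-started tutorials [2] that you can look through first. If you have further questions, we can keep in touch by email, or you can leave a comment on my blog!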

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/tutorials/python_table_api.html
[2] https://enjoyment.cool/

Best,
Jincheng



xu1990xaut wrote on Tue, Mar 24, 2020 at 11:36 PM:

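> Hello, I watched your talk on Bilibili earlier and followed along hands-on.
> I am using Flink 1.10. I installed it directly with pip install apache-flink, which gave me version 1.10 by default.
> When I run the word-count script in PyCharm, it never produces a result and does not report an error either. What could be the reason?
> I have also installed the JDK, and the Flink web UI on port 8081 loads fine. This is my first time using Flink. I have searched online for this problem,
> but never found an answer. I would appreciate any pointers. Thank you.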
>
>
>
>


Re: Re: Where can I find the MySQL retract stream table sink Java source code?

2020-03-24 Thread wangl...@geekplus.com.cn

It seems to be here:
https://github.com/apache/flink/tree/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc
There is no JDBCRetractTableSink, only append and upsert sinks.
I am confused about why the MySQL record can be deleted.

Thanks,
Lei 


wangl...@geekplus.com.cn 

 
Sender: wangl...@geekplus.com.cn
Send Time: 2020-03-25 11:25
Receiver: Jingsong Li
cc: user
Subject: Re: Re: Where can I find the MySQL retract stream table sink Java source
code?

Thanks Jingsong.

When this SQL is executed, records in the MySQL table can be deleted, so I guess it
is a retract stream.
I would like to find the exact Java code that is generated and take a look at it.

Thanks,
Lei




wangl...@geekplus.com.cn
 
Sender: Jingsong Li
Send Time: 2020-03-25 11:14
Receiver: wangl...@geekplus.com.cn
cc: user
Subject: Re: Where can I find the MySQL retract stream table sink Java source code?
Hi,

This can be an upsert stream [1]

Best,
Jingsong Lee

On Wed, Mar 25, 2020 at 11:12 AM wangl...@geekplus.com.cn 
 wrote:

Create one table with Kafka and another table with MySQL using Flink SQL.
Write a SQL statement to read from Kafka and write to MySQL.

INSERT INTO mysqlTable  SELECT status, COUNT(order_no) AS num FROM
(SELECT order_no, LAST_VALUE(status) AS status FROM
kafkaTable GROUP BY order_no)
GROUP BY status

I think this is a retract stream. But where can I find the Java source code for the MySQL retract table sink?

Thanks,
Lei



wangl...@geekplus.com.cn 



-- 
Best, Jingsong Lee


Re: Re: Where can I find the MySQL retract stream table sink Java source code?

2020-03-24 Thread wangl...@geekplus.com.cn

Thanks Jingsong.

When this SQL is executed, records in the MySQL table can be deleted, so I guess it
is a retract stream.
I would like to find the exact Java code that is generated and take a look at it.

Thanks,
Lei




wangl...@geekplus.com.cn
 
Sender: Jingsong Li
Send Time: 2020-03-25 11:14
Receiver: wangl...@geekplus.com.cn
cc: user
Subject: Re: Where can I find the MySQL retract stream table sink Java source code?
Hi,

This can be an upsert stream [1]

Best,
Jingsong Lee

On Wed, Mar 25, 2020 at 11:12 AM wangl...@geekplus.com.cn 
 wrote:

Create one table with Kafka and another table with MySQL using Flink SQL.
Write a SQL statement to read from Kafka and write to MySQL.

INSERT INTO mysqlTable  SELECT status, COUNT(order_no) AS num FROM
(SELECT order_no, LAST_VALUE(status) AS status FROM
kafkaTable GROUP BY order_no)
GROUP BY status

I think this is a retract stream. But where can I find the Java source code for the MySQL retract table sink?

Thanks,
Lei



wangl...@geekplus.com.cn 



-- 
Best, Jingsong Lee


Re: [PyFlink] Error when writing data to Kafka in Csv() format, and the UDF fails to start when using a Python UDF

2020-03-24 Thread jincheng sun
Hi Zhefu,

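Thank you for sharing the details of how you solved the problems; this is a great contribution to the community!

1. About the subscription issue

I would like to confirm whether you followed [1]. Taking the Chinese user list (user-zh@flink.apache.org) as an example, you need to send an email to (
user-zh-subscr...@flink.apache.org), that is, the list address with "subscribe" added. You will then receive a confirmation email titled "confirm
subscribe to *user-zh*@flink.apache.org", which you need to reply to in order to confirm.

2. About the JAR conflict

The flink-python
JAR bundles the core Beam JARs that flink-python depends on. I would like to understand why Beam-related JARs exist on your cluster in the first place. I also think the case you provided is a good one: it made me realize that PyFlink's dependency on Beam could be improved, for example by relocating Beam.
I have created a community improvement JIRA [2].

3. About packaging

The Python environment archive uploaded to PyFlink has to be portable across machines, so it indeed must not contain symlinks. If the environment is created with virtualenv, you need to add the --always-copy option. In addition, if the cluster machines already have a Python 3.5+ environment available, you do not need to upload an environment archive; you can use set_python_executable("python3") directly to specify the Python
interpreter for the cluster.
Besides virtualenv, conda/miniconda
can also be used to create the virtual environment, but the result is much larger. It is worth considering when virtualenv does not work for some reason (for example, when .so files that the source Python interpreter depends on do not exist on the cluster).

Thanks again for sharing the details of your solution and for the feedback!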

Best,
Jincheng

[1]
https://flink.apache.org/community.html#how-to-subscribe-to-a-mailing-list
[2] https://issues.apache.org/jira/browse/FLINK-16762


Zhefu PENG wrote on Tue, Mar 24, 2020 at 9:33 PM:

> Hi Jincheng,
>
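> I am unable to reply to my own question on the Chinese user mailing list (I am quite sure I have subscribed to the mailing list, but I do not know why
> or how this happens), so I am replying here. With my colleagues' help and our joint effort, we have largely resolved the earlier issues. Our findings are as follows:
>
> 1. First, the JAR conflict. We found that the flink-python package also contains the code of four JARs: beam-runners-core-java-2.15.0.jar,
> beam-runners-direct-java-2.15.0.jar, beam-runners-flink_2.11-2.15.0.jar, and
> beam-sdk-java-core-2.15.0.jar. If the cluster environment itself also has these four JARs, they conflict, and the dependencies cannot be found when running UDFs.
>
> 2.
> For various reasons, the cluster's default Python environment cannot be changed to Python 3, so we added env.add_python_archive in our code and used set_python_executable to specify the interpreter.
> The archive must be packaged without symlinks (symlink lookup may not be supported after extraction). My earlier packaging was wrong, which caused the problem.
>
> These two points are the causes we identified, and we can finally run UDFs successfully in cluster
> mode. We hope our feedback contributes a little to your development work and to guidance for users. Thanks again for the help :)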
>
> Best,
> Zhefu
>
> Zhefu PENG wrote on Fri, Mar 20, 2020 at 11:19 PM:
>
>>
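>> Thanks for the reply! Beam was installed directly with pip install. When running pip install
>> apache-flink I found that it has many dependencies and that the installation required Beam version 2.15.0, so I installed 2.15.0. The Flink version is 1.10 and was not built from source. So we are quite stuck at the moment and hope to get some help.
>>
>> One more small question: it seems you did not forward the logs to the mailing list along with the message. I would like to attach them. After subscribing, can I simply reply to post the logs?
>>
>> Have a nice weekend :) Thank you very much for the reply!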
>>
>> 彭哲夫
>>
>> On Fri, Mar 20, 2020 at 22:34 jincheng sun 
>> wrote:
>>
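>>> Hi 彭哲夫,
>>>
>>> How did you install Beam, and which version did you install? Flink 1.10 requires apache-beam 2.15, and master requires apache-beam
>>> 2.19.
>>>
>>> BTW, to share your question, I have forwarded it to the Chinese user mailing list so that we can all discuss it together.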
>>>
>>> Best,
>>> Jincheng
>>>
>>>
>>> Zhefu PENG wrote on Fri, Mar 20, 2020 at 5:12 PM:
>>>
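>>>> Hi Jincheng,
>>>>
>>>> Regarding the second point raised yesterday, I tried two approaches: 1. running my pyflink.py in local mode; 2. configuring the environment on the cluster nodes.
>>>>
>>>> 1.
>>>> In local mode, we found that the apache-beam package had not been installed completely for some reason and the _bz2 module was missing. After adding it, the pyflink.py script runs normally and the UDF works as well.
>>>>
>>>> 2.
>>>> Building on the successful local run, we followed your advice and updated the environment on all worker nodes: they now all have python3.6 with apache_beam and apache-flink installed.
>>>> However, running the script with UDFs in cluster mode still fails, and searching Google did not turn up any relevant answers. I am attaching the error log in the hope of getting help (since local mode already works, I am not attaching the code). Thank you very much!
>>>>
>>>> Looking forward to your reply
>>>> 彭哲夫
>>>>
>>>>
>>>> Zhefu PENG wrote on Thu, Mar 19, 2020 at 11:14 PM: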

> Hi Jincheng:
>
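> Thank you very much for such a quick and detailed reply!
>
> On the first point: following your reply, I added flink-csv-1.10.0-sql-jar.jar to Flink's lib directory and the job ran successfully. I had already seen and downloaded the first JAR from the Kafka demo on your blog (based
> on flink 1.9), so here is a suggestion: perhaps in the future you could mention, for later users, that
> flink-csv-1.10.0-sql-jar.jar is required :). It is also possible that I missed it while reading, but either way, thank you for helping to resolve it.
>
> On the second point: because of how our department is organized, I currently have no permission to check the worker nodes. But I am curious about one thing: I have only installed python3.5+ on the host that launches the script,
> and apart from UDFs everything works fine (for example, built-in SQL functions such as concat, or simple operations such as add_columns()). So should I understand it this way: to use all of PyFlink's features, the whole cluster must be on python3.5+,
> but for simple features it is enough that the host launching the script meets the requirement?
> Still on the second point: I just re-ran the script, hoping to get the same error log as before to send to my mentor, but this time a new problem was reported: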
> java.lang.NoClassDefFoundError: Could not initialize class
> org.apache.beam.sdk.options.PipelineOptionsFactory
> at
> org.apache.flink.python.AbstractPythonFunctionRunner.open(AbstractPythonFunctionRunner.java:173)
> at
> org.apache.flink.table.runtime.operators.python.AbstractPythonScalarFunctionOperator$ProjectUdfInputPythonScalarFunctionRunner.open(AbstractPythonScalarFunctionOperator.java:193)
> at
> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.open(AbstractPythonFunctionOperator.java:139)
> at
> org.apache.flink.table.runtime.operators.python.AbstractPythonScalarFunctionOperator.open(AbstractPythonScalarFunctionOperator.java:143)
> at
> org.apache.flink.table.runtime.operators.python.PythonScalarFunctionOperator.open(PythonScalarFunctionOperator.java:73)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1007)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
>
>
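> I suspect the cause is, as you mentioned, that the worker environment does not meet the requirements. After work starts tomorrow I will ask a colleague to help check, and then try the changes you suggested. I would also appreciate answers to the questions above. I have just graduated and started working, so my questions may seem rather basic; please bear with me!
>
> Thanks again for your reply. I will fix the errors as soon as possible based on your suggestions.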
> 彭哲夫
>
> jincheng sun wrote on Thu, Mar 19, 2020 at 9:08 PM:
>
>> Hi 彭哲夫,
>>
>> Possible causes of the problems above:
>>
>> 1. PyFlink does not include the Kafka
>> connector JAR or the CSV format JAR by default; you need to add these JARs to PyFlink's lib directory:
>>
>> $ PYFLINK_LIB=`python -c "import pyflink;import
>> 

Re: Where can I find the MySQL retract stream table sink Java source code?

2020-03-24 Thread Jingsong Li
Hi,

This can be an upsert stream [1]

Best,
Jingsong Lee

On Wed, Mar 25, 2020 at 11:12 AM wangl...@geekplus.com.cn <
wangl...@geekplus.com.cn> wrote:

>
> Create one table with kafka,  another table with MySQL  using flinksql.
> Write a sql to read from kafka and write to MySQL.
>
> INSERT INTO mysqlTable  SELECT status, COUNT(order_no) AS num FROM
> (SELECT order_no, LAST_VALUE(status) AS status FROM 
> kafkaTable GROUP BY order_no)
> GROUP BY status
>
> I think this is a retract stream.
>
> But where can i find the java source code  about MySQL retract table sink?
>
>
> Thanks,
>
> Lei
>
>
> --
> wangl...@geekplus.com.cn
>
>

-- 
Best, Jingsong Lee


Where can I find the MySQL retract stream table sink Java source code?

2020-03-24 Thread wangl...@geekplus.com.cn

Create one table with Kafka and another table with MySQL using Flink SQL.
Write a SQL statement to read from Kafka and write to MySQL.

INSERT INTO mysqlTable  SELECT status, COUNT(order_no) AS num FROM
(SELECT order_no, LAST_VALUE(status) AS status FROM
kafkaTable GROUP BY order_no)
GROUP BY status

I think this is a retract stream. But where can I find the Java source code for the MySQL retract table sink?

Thanks,
Lei



wangl...@geekplus.com.cn 



Re:Re: Flink SQL: adding a watermark fails when creating a source table

2020-03-24 Thread flink小猪
Thanks for your reply. These are the JARs in my lib directory:

flink-dist_2.11-1.10.0.jar  flink-sql-connector-kafka_2.11-1.10.0.jar  
flink-table-blink_2.11-1.10.0.jar  slf4j-log4j12-1.7.15.jar

flink-json-1.10.0.jar   flink-table_2.11-1.10.0.jar
log4j-1.2.17.jar
Below is the exception reported when submitting the job:
org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: From line 9, column 25 to line 9, column 26: Unknown 
identifier 'ts'
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
at 
org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
Caused by: org.apache.calcite.runtime.CalciteContextException: From line 9, 
column 25 to line 9, column 26: Unknown identifier 'ts'
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:834)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:819)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4841)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5667)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5587)
at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:317)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1691)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1676)
at org.apache.calcite.sql.SqlOperator.deriveType(SqlOperator.java:501)
at 
org.apache.calcite.sql.SqlBinaryOperator.deriveType(SqlBinaryOperator.java:144)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5600)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5587)
at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1691)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1676)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:947)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateParameterizedExpression(SqlValidatorImpl.java:930)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.lambda$createTableSchema$8(SqlToOperationConverter.java:509)
at java.util.Optional.ifPresent(Optional.java:159)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.createTableSchema(SqlToOperationConverter.java:505)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertCreateTable(SqlToOperationConverter.java:177)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:130)
at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
at WindowUv$.main(WindowUv.scala:49)
at WindowUv.main(WindowUv.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
... 8 more
Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Unknown 
identifier 'ts'
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
at 

Re: Question about catching exceptions for sql-gateway insert into

2020-03-24 Thread godfrey he
Hi, you could try fetching the logs of the finished job from YARN.

Best,
Godfrey

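111 wrote on Wed, Mar 25, 2020 at 10:53 AM:

> Hi,
>
> I have indeed obtained the jobId through sql-gateway and retrieved the job status. However, because we use yarn-session, the taskmanager is released after the failure, so the logs on that taskmanager are lost as well.
> If I connect to the JobManager, I presumably cannot get the taskmanager's earlier logs, can I?
>
>
> Best,
>  xinghalo
> On Mar 25, 2020 at 10:47, godfrey he wrote:
> hi, sql gateway currently supports fetching the job status (provided the JM can still look up the job). To get the job's exceptions, you can use the REST API provided by the JM [1]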
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jobs-jobid-exceptions
>
> Best,
> Godfrey
>
> 111  于2020年3月25日周三 上午10:38写道:
>
> Hi,
> 确实是执行报错….那如果是执行报错,flink本身是否有提供获取exception的机制呢?
>
>
> | |
> xinghalo
> |
> |
> xingh...@163.com
> |
> 签名由网易邮箱大师定制
>
>
> 在2020年03月25日 10:32,godfrey he 写道:
> 目前sql gateway只负责提交作业,不负责跟踪作业的状态。如果作业没提交成功,sql gateway会返回相关的错误;如果是执行时报错,sql
> gateway不会返回错误。你看看flink web ui作业是否提交成功
>
> Best,
> Godfrey
>
> 111  于2020年3月25日周三 上午10:29写道:
>
>
>
> Hi,
> 我试过了,insert into是不行的…sql-gateway的后台日志也没有任何报错。
> 怀疑是不是我使用的jdbc sink connector,内部是流的方式。流不会把异常抛给sql-gateway?
>
>
> 在2020年03月25日 10:26,godfrey he 写道:
> hi, sql gateway当前会把服务端的完整异常栈返回给用户,
> 例如:
> Caused by: java.util.concurrent.ExecutionException:
> org.apache.flink.runtime.rest.util.RestClientException: [Internal server
> error.,  com.ververica.flink.table.gateway.SqlExecutionException: xxx
>
> 你可以检查一下sql gateway的日志,看看是否相关的错误
>
> Best,
> Godfrey
>
> 111  于2020年3月25日周三 上午8:47写道:
>
> Hi,
> 最近在使用sql-gateway,当使用
> StatementExecuteResponseBody body = getInstance().sendRequest(
> host,port,StatementExecuteHeaders.getInstance(),
> new SessionMessageParameters(sessionId),
> new StatementExecuteRequestBody(stmt, timeout)).get();
> 提交insert语句时,当任务失败,无法返回对应的异常信息;是目前版本暂时不支持这种特性吗?
>
>
>


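Re: Question about catching exceptions from INSERT INTO in sql-gateway

2020-03-24 Thread 111
Hi,
I can indeed get the jobId through sql-gateway now and query the job status. However, because I am
using yarn-session, the TaskManager is released after the failure, so the logs on that TaskManager are lost.
If I connect to the JobManager, I won't be able to get the old TaskManager logs, right?


Best,
 xinghalo
On 2020-03-25 10:47, godfrey he wrote:
Hi, sql gateway currently supports querying the job status (provided the JM can still find the job).
To get the job's exceptions, you can use the REST API [1] exposed by the JM.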

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jobs-jobid-exceptions

Best,
Godfrey

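111 wrote on Wed, Mar 25, 2020 at 10:38 AM:

Hi,
It is indeed a runtime error. In that case, does Flink itself provide a mechanism for retrieving the exception?


xinghalo
xingh...@163.com


On 2020-03-25 10:32, godfrey he wrote:
Currently sql gateway is only responsible for submitting the job, not for tracking its status. If
submission fails, sql gateway returns the corresponding error; if the job fails at runtime, sql
gateway does not return an error. Please check in the Flink web UI whether the job was submitted successfully.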

Best,
Godfrey

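111 wrote on Wed, Mar 25, 2020 at 10:29 AM:



Hi,
I tried it; it does not work for INSERT INTO, and the sql-gateway server log shows no error either.
I suspect it is because I am using the JDBC sink connector, which runs as a streaming job internally.
Does a streaming job not propagate its exceptions to sql-gateway?


On 2020-03-25 10:26, godfrey he wrote:
Hi, sql gateway currently returns the full server-side exception stack to the user,
for example:
Caused by: java.util.concurrent.ExecutionException:
org.apache.flink.runtime.rest.util.RestClientException: [Internal server
error.,  com.ververica.flink.table.gateway.SqlExecutionException: xxx

You can check the sql gateway log to see whether there is a related error.

Best,
Godfrey

111 wrote on Wed, Mar 25, 2020 at 8:47 AM:

Hi,
I have recently been using sql-gateway. When I submit an INSERT statement with
StatementExecuteResponseBody body = getInstance().sendRequest(
host,port,StatementExecuteHeaders.getInstance(),
new SessionMessageParameters(sessionId),
new StatementExecuteRequestBody(stmt, timeout)).get();
and the job fails, no exception information is returned. Is this feature not supported in the current version?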




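Re: Question about catching exceptions from INSERT INTO in sql-gateway

2020-03-24 Thread godfrey he
Hi, sql gateway currently supports querying the job status (provided the JM can still find the job).
To get the job's exceptions, you can use the REST API [1] exposed by the JM.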

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jobs-jobid-exceptions
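
As a rough illustration of the suggestion above, the following sketch builds the URL of the `/jobs/<jobid>/exceptions` endpoint from [1] and fetches the raw JSON response with the JDK's HTTP client classes. The class and method names are hypothetical, and the JobManager address you pass in (for example http://localhost:8081) is an assumption about your setup. It only works while the JobManager still knows the job.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;

/** Hypothetical helper: reads a job's exceptions from the JobManager REST API. */
class JobExceptionsClientSketch {

    /** Builds the endpoint URL; jobManagerAddress is an assumed base such as "http://localhost:8081". */
    static String exceptionsUrl(String jobManagerAddress, String jobId) {
        String base = jobManagerAddress.endsWith("/")
                ? jobManagerAddress.substring(0, jobManagerAddress.length() - 1)
                : jobManagerAddress;
        return base + "/jobs/" + jobId + "/exceptions";
    }

    /** Issues a GET request and returns the response body (JSON) as a string. */
    static String fetchExceptions(String jobManagerAddress, String jobId) throws IOException {
        URL url = new URL(exceptionsUrl(jobManagerAddress, jobId));
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("GET");
        conn.setConnectTimeout(5000);
        conn.setReadTimeout(5000);
        try (InputStream in = conn.getInputStream();
             Scanner scanner = new Scanner(in, StandardCharsets.UTF_8.name()).useDelimiter("\\A")) {
            // "\\A" makes the scanner return the whole stream as one token.
            return scanner.hasNext() ? scanner.next() : "";
        } finally {
            conn.disconnect();
        }
    }

    public static void main(String[] args) throws IOException {
        // Usage: java JobExceptionsClientSketch <jobManagerAddress> <jobId>
        System.out.println(fetchExceptions(args[0], args[1]));
    }
}
```

The jobId is the one returned by sql-gateway when the INSERT statement was submitted.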

Best,
Godfrey

111  于2020年3月25日周三 上午10:38写道:

> Hi,
> 确实是执行报错….那如果是执行报错,flink本身是否有提供获取exception的机制呢?
>
>
> | |
> xinghalo
> |
> |
> xingh...@163.com
> |
> 签名由网易邮箱大师定制
>
>
> 在2020年03月25日 10:32,godfrey he 写道:
> 目前sql gateway只负责提交作业,不负责跟踪作业的状态。如果作业没提交成功,sql gateway会返回相关的错误;如果是执行时报错,sql
> gateway不会返回错误。你看看flink web ui作业是否提交成功
>
> Best,
> Godfrey
>
> 111  于2020年3月25日周三 上午10:29写道:
>
>
>
> Hi,
> 我试过了,insert into是不行的…sql-gateway的后台日志也没有任何报错。
> 怀疑是不是我使用的jdbc sink connector,内部是流的方式。流不会把异常抛给sql-gateway?
>
>
> 在2020年03月25日 10:26,godfrey he 写道:
> hi, sql gateway当前会把服务端的完整异常栈返回给用户,
> 例如:
> Caused by: java.util.concurrent.ExecutionException:
> org.apache.flink.runtime.rest.util.RestClientException: [Internal server
> error.,  com.ververica.flink.table.gateway.SqlExecutionException: xxx
>
> 你可以检查一下sql gateway的日志,看看是否相关的错误
>
> Best,
> Godfrey
>
> 111  于2020年3月25日周三 上午8:47写道:
>
> Hi,
> 最近在使用sql-gateway,当使用
> StatementExecuteResponseBody body = getInstance().sendRequest(
> host,port,StatementExecuteHeaders.getInstance(),
> new SessionMessageParameters(sessionId),
> new StatementExecuteRequestBody(stmt, timeout)).get();
> 提交insert语句时,当任务失败,无法返回对应的异常信息;是目前版本暂时不支持这种特性吗?
>
>


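Re: Question about catching exceptions from INSERT INTO in sql-gateway

2020-03-24 Thread 111
Hi,
It is indeed a runtime error. In that case, does Flink itself provide a mechanism for retrieving the exception?


xinghalo
xingh...@163.com


On 2020-03-25 10:32, godfrey he wrote:
Currently sql gateway is only responsible for submitting the job, not for tracking its status. If
submission fails, sql gateway returns the corresponding error; if the job fails at runtime, sql
gateway does not return an error. Please check in the Flink web UI whether the job was submitted successfully.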

Best,
Godfrey

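111 wrote on Wed, Mar 25, 2020 at 10:29 AM:



Hi,
I tried it; it does not work for INSERT INTO, and the sql-gateway server log shows no error either.
I suspect it is because I am using the JDBC sink connector, which runs as a streaming job internally.
Does a streaming job not propagate its exceptions to sql-gateway?


On 2020-03-25 10:26, godfrey he wrote:
Hi, sql gateway currently returns the full server-side exception stack to the user,
for example:
Caused by: java.util.concurrent.ExecutionException:
org.apache.flink.runtime.rest.util.RestClientException: [Internal server
error.,  com.ververica.flink.table.gateway.SqlExecutionException: xxx

You can check the sql gateway log to see whether there is a related error.

Best,
Godfrey

111 wrote on Wed, Mar 25, 2020 at 8:47 AM:

Hi,
I have recently been using sql-gateway. When I submit an INSERT statement with
StatementExecuteResponseBody body = getInstance().sendRequest(
host,port,StatementExecuteHeaders.getInstance(),
new SessionMessageParameters(sessionId),
new StatementExecuteRequestBody(stmt, timeout)).get();
and the job fails, no exception information is returned. Is this feature not supported in the current version?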



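Re: Question about catching exceptions from INSERT INTO in sql-gateway

2020-03-24 Thread godfrey he
Currently sql gateway is only responsible for submitting the job, not for tracking its status. If
submission fails, sql gateway returns the corresponding error; if the job fails at runtime, sql
gateway does not return an error. Please check in the Flink web UI whether the job was submitted successfully.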

Best,
Godfrey

111  于2020年3月25日周三 上午10:29写道:

>
>
> Hi,
> 我试过了,insert into是不行的…sql-gateway的后台日志也没有任何报错。
> 怀疑是不是我使用的jdbc sink connector,内部是流的方式。流不会把异常抛给sql-gateway?
>
>
> 在2020年03月25日 10:26,godfrey he 写道:
> hi, sql gateway当前会把服务端的完整异常栈返回给用户,
> 例如:
> Caused by: java.util.concurrent.ExecutionException:
> org.apache.flink.runtime.rest.util.RestClientException: [Internal server
> error.,  com.ververica.flink.table.gateway.SqlExecutionException: xxx
>
> 你可以检查一下sql gateway的日志,看看是否相关的错误
>
> Best,
> Godfrey
>
> 111  于2020年3月25日周三 上午8:47写道:
>
> Hi,
> 最近在使用sql-gateway,当使用
> StatementExecuteResponseBody body = getInstance().sendRequest(
> host,port,StatementExecuteHeaders.getInstance(),
> new SessionMessageParameters(sessionId),
> new StatementExecuteRequestBody(stmt, timeout)).get();
> 提交insert语句时,当任务失败,无法返回对应的异常信息;是目前版本暂时不支持这种特性吗?
>


Re: Deploying native Kubernetes via yaml files?

2020-03-24 Thread Yang Wang
Hi Niels,

I have created a ticket [1] to track yaml file submission for the native K8s
integration.
Feel free to share your thoughts on this approach.


[1]. https://issues.apache.org/jira/browse/FLINK-16760


Best,
Yang

Niels Basjes  于2020年3月24日周二 下午11:10写道:

> Thanks.
> I'll have a look at this.
>
> Can you post the ticket number for this feature (after you created it)
> please?
>
> Niels
>
> On Tue, Mar 24, 2020 at 3:54 PM Yang Wang  wrote:
>
>> Hi Niels,
>>
>> Currently, the native integration Flink cluster could not be created via
>> yaml file. The reason
>> why we introduce the native integration is for the users who are not
>> familiar with K8s and
>> kubectl. So we want to make it easier for our Flink users to deploy Flink
>> cluster on K8s.
>>
>> However, i think some other users, especially K8s experts, they really
>> like the yaml way to
>> create Flink cluster. I will create a ticket for this feature. If you
>> want to do this manually now,
>> i think you need to at least do the following changes.
>> 1. Update the `flink-console.sh` and add a new script(mostly like
>> jobmanager.sh) to set the
>> entrypoint to
>> "org.apache.flink.kubernetes.entrypoint.KubernetesSessionClusterEntrypoint".
>> 2. Create the configmap, service like the standalone on K8s[1].
>> 3. Create a jobmanager.yaml with service account(enough permission to
>> create pod in K8s
>> cluster) correctly set
>>
>>
>> [1].
>> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/kubernetes.html#appendix
>>
>>
>> Best,
>> Yang
>>
>> Ufuk Celebi  于2020年3月24日周二 下午9:51写道:
>>
>>> PS: See also
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/kubernetes.html#flink-job-cluster-on-kubernetes
>>>
>>> On Tue, Mar 24, 2020 at 2:49 PM Ufuk Celebi  wrote:
>>>
 Hey Niels,

 you can check out the README with example configuration files here:
 https://github.com/apache/flink/tree/master/flink-container/kubernetes

 Is that what you were looking for?

 Best,

 Ufuk

 On Tue, Mar 24, 2020 at 2:42 PM Niels Basjes  wrote:

> Hi,
>
> As clearly documented here
> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/native_kubernetes.html
>  the
> current way of deploying Flink natively on Kubernetes is by running the 
> ./bin/kubernetes-session.sh
> script that runs some Java code that does "magic" to deploy in on the
> cluster.
> This works.
>
> I was wondering: Is it with the current code base already possible to
> craft a set of Yaml files (perhaps even with a special Docker image) so
> that I can deploy it using the 'normal' Kubernetes way of doing a kubectl
> apply -f foo.yaml ?
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes
>

>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes
>


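Re: Question about catching exceptions from INSERT INTO in sql-gateway

2020-03-24 Thread 111


Hi,
I tried it; it does not work for INSERT INTO, and the sql-gateway server log shows no error either.
I suspect it is because I am using the JDBC sink connector, which runs as a streaming job internally.
Does a streaming job not propagate its exceptions to sql-gateway?


On 2020-03-25 10:26, godfrey he wrote:
Hi, sql gateway currently returns the full server-side exception stack to the user,
for example:
Caused by: java.util.concurrent.ExecutionException:
org.apache.flink.runtime.rest.util.RestClientException: [Internal server
error.,  com.ververica.flink.table.gateway.SqlExecutionException: xxx

You can check the sql gateway log to see whether there is a related error.

Best,
Godfrey

111 wrote on Wed, Mar 25, 2020 at 8:47 AM:

Hi,
I have recently been using sql-gateway. When I submit an INSERT statement with
StatementExecuteResponseBody body = getInstance().sendRequest(
host,port,StatementExecuteHeaders.getInstance(),
new SessionMessageParameters(sessionId),
new StatementExecuteRequestBody(stmt, timeout)).get();
and the job fails, no exception information is returned. Is this feature not supported in the current version?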


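Re: Question about catching exceptions from INSERT INTO in sql-gateway

2020-03-24 Thread godfrey he
Hi, sql gateway currently returns the full server-side exception stack to the user,
for example:
Caused by: java.util.concurrent.ExecutionException:
org.apache.flink.runtime.rest.util.RestClientException: [Internal server
error.,  com.ververica.flink.table.gateway.SqlExecutionException: xxx

You can check the sql gateway log to see whether there is a related error.

Best,
Godfrey

111 wrote on Wed, Mar 25, 2020 at 8:47 AM:

> Hi,
> I have recently been using sql-gateway. When I submit an INSERT statement with
> StatementExecuteResponseBody body = getInstance().sendRequest(
> host,port,StatementExecuteHeaders.getInstance(),
> new SessionMessageParameters(sessionId),
> new StatementExecuteRequestBody(stmt, timeout)).get();
> and the job fails, no exception information is returned. Is this feature not supported in the current version?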


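Re: How to increase a job's CPU utilization

2020-03-24 Thread Xintong Song
Which Flink version are you using? Are you running on YARN?

Reducing the number of slots does not increase CPU utilization. By default, the number of vcores
requested for a YARN container equals the number of slots, so reducing the slot count
proportionally reduces both the CPU resources and the compute demand of each container. To raise
CPU utilization, consider giving the container fewer vcores than slots. You can use
'yarn.containers.vcores' to set the container's vcore count instead of defaulting to the slot count.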

Thank you~

Xintong Song



On Tue, Mar 24, 2020 at 8:09 PM yanggang_it_job 
wrote:

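> Hi,
> Background: our cluster is running low on free cores, so I reviewed some of the large jobs.
>
> Using the PromQL query max(flink_taskmanager_Status_JVM_CPU_Load{job_name={job_name}}) to get the
> CPU utilization of a given job, I found that utilization is generally low: for a container with
> 10 slots, it is mostly below 6%.
>
> In my tests I then reduced the number of slots per container, but CPU utilization did not
> increase linearly; likewise, increasing the slot count did not decrease it linearly.
>
> Is something wrong with my test? Or do you have any suggestions?
>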


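Re: Flink SQL: adding a watermark when creating a source table fails

2020-03-24 Thread Jark Wu
Hmm... this is strange. In theory, running inside IDEA might hit a problem (caused by a Calcite bug),
but the SQL CLI should not.
Does your cluster or job have other dependencies? For example, do you depend on Calcite yourself?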

Best,
Jark

On Tue, 24 Mar 2020 at 23:37, flink小猪 <18579099...@163.com> wrote:

> 当我尝试通过sql client创建kafka源表时(这里借鉴了云邪大佬的《Demo:基于 Flink SQL 构建流式应用》文章),
> CREATE TABLE user_behavior (
> user_id BIGINT,
> item_id BIGINT,
> category_id BIGINT,
> behavior STRING,
> ts TIMESTAMP(3),
> proctime as PROCTIME(),
> WATERMARK FOR ts as ts - INTERVAL '5' SECOND
> ) WITH (
> 'connector.type' = 'kafka',
> 'connector.version' = 'universal',
> 'connector.topic' = 'user_behavior',
> 'connector.startup-mode' = 'earliest-offset',
> 'connector.properties.zookeeper.connect' = '192.168.1.214:2181',
> 'connector.properties.bootstrap.servers' = '192.168.1.214:9092',
> 'format.type' = 'json'
> )
> 出现错误
> org.apache.calcite.sql.validate.SqlValidatorException: Unknown identifier
> 'ts'
> 我以为是我的建表语句有问题,但是当我通过代码(在idea上运行)创建这个表时,没有任何问题,数据也是可以正常查询到。当我提交到集群上运行时又出现org.apache.calcite.sql.validate.SqlValidatorException:
> Unknown identifier 'ts'
> ,我猜测是jar不全的问题,但是我按照云邪大佬的文章导入了
> flink-sql-connector-kafka_2.11-1.10.0.jar
> flink-json-1.10.0.jar
> 在sql-client上执行还是错误,是我缺少什么jar包吗?
>
>
>
>
>
>


Re: How to calculate one alarm strategy for each device or one alarm strategy for each type of IOT device

2020-03-24 Thread yang xu
Hi Dawid,
I use Flink to calculate IoT device alarms. In my scenario each device
has an independent alarm strategy. For example, I check whether the
temperature in 10 consecutive events from a device is higher than 10
degrees.
I use:
sourceStream.keyBy("deviceNo")
.flatMap(new StateAlarmCalculationFunction())
StateAlarmCalculationFunction extends RichFlatMapFunction and uses a
ListState named deviceListState to store the device state. After each
device event comes in, I load deviceListState for the calculation, but I
don't think this is smart enough. Is there a better approach?
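
One possible simplification, sketched here in plain Java so it runs without a Flink dependency: for a "N consecutive readings above a threshold" rule, a single counter per device is enough, so the full event list does not need to be stored. The class name and the HashMap are illustrative assumptions; inside a keyed Flink function the map would be replaced by per-key state (for example a ValueState holding an Integer), which is not shown here.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java model of a per-device "N consecutive readings above a threshold" rule. */
class ConsecutiveAlarmSketch {
    private final int requiredCount;
    private final double threshold;
    // Stand-in for keyed state: one running counter per device number.
    private final Map<String, Integer> consecutiveByDevice = new HashMap<>();

    ConsecutiveAlarmSketch(int requiredCount, double threshold) {
        this.requiredCount = requiredCount;
        this.threshold = threshold;
    }

    /** Returns true when this reading completes a run of requiredCount readings above the threshold. */
    boolean onReading(String deviceNo, double temperature) {
        if (temperature <= threshold) {
            // The run is broken, so forget the counter for this device.
            consecutiveByDevice.remove(deviceNo);
            return false;
        }
        int run = consecutiveByDevice.merge(deviceNo, 1, Integer::sum);
        if (run >= requiredCount) {
            // Reset so that the next alarm needs a fresh run.
            consecutiveByDevice.remove(deviceNo);
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        ConsecutiveAlarmSketch rule = new ConsecutiveAlarmSketch(10, 10.0);
        for (int i = 1; i <= 10; i++) {
            System.out.println("event " + i + " alarm=" + rule.onReading("device-1", 25.0));
        }
    }
}
```

Whether to reset the counter after an alarm or keep firing on every further reading is a design choice; the sketch resets.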



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: java.lang.AbstractMethodError when implementing KafkaSerializationSchema

2020-03-24 Thread Steve Whelan
Hi Arvid,

Interestingly, my job runs successfully in a Docker container (image
*flink:1.9.0-scala_2.11*) but fails with the
*java.lang.AbstractMethodError* on AWS EMR (non-Docker). I am compiling
with OpenJDK 1.8.0_242, which is the same version my EMR cluster is
running. Since it runs successfully locally in a Docker container, this
points to an issue in our AWS environment setup. Oddly, we have been
running Flink on EMR for 2+ years and have never come across this until
now.

Results of javap are:

public class
com.jwplayer.flink.serialization.json.JsonRowKeyedSerializationSchema
implements
org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema
{
  public static
org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema
create(com.jwplayer.flink.config.serde.SerDeConfig);
  public byte[] serializeKey(org.apache.flink.types.Row);
  public byte[] serializeValue(org.apache.flink.types.Row);
  public org.apache.kafka.clients.producer.ProducerRecord
serialize(org.apache.flink.types.Row, java.lang.Long);
  public org.apache.kafka.clients.producer.ProducerRecord
serialize(java.lang.Object, java.lang.Long);
}
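
For readers unfamiliar with the bridge method discussed in the quoted reply below, here is a small self-contained sketch that shows one via reflection. The interface and class are hypothetical stand-ins, not the Flink or Kafka types. When a class implements a generic interface for a concrete type, javac emits a second, synthetic method with the erased signature, which is what the two `serialize` entries in the javap output above correspond to.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-in for a generic schema interface. */
interface GenericSchema<T> {
    String serialize(T element, Long timestamp);
}

/** Implements the interface for String; javac adds a bridge method serialize(Object, Long). */
class StringSchema implements GenericSchema<String> {
    @Override
    public String serialize(String element, Long timestamp) {
        return element + "@" + timestamp;
    }
}

class BridgeMethodCheck {
    /** Lists each declared "serialize" method with its first parameter type and bridge flag, sorted. */
    static List<String> describeSerializeMethods(Class<?> type) {
        List<String> out = new ArrayList<>();
        for (Method m : type.getDeclaredMethods()) {
            if (m.getName().equals("serialize")) {
                out.add(m.getParameterTypes()[0].getSimpleName() + " bridge=" + m.isBridge());
            }
        }
        Collections.sort(out);
        return out;
    }

    public static void main(String[] args) {
        describeSerializeMethods(StringSchema.class).forEach(System.out::println);
    }
}
```

Running the same check against the compiled schema class on both the build machine and the cluster classpath is one way to compare what each environment actually loads.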


On Mon, Mar 23, 2020 at 9:55 AM Arvid Heise  wrote:

> Hi Steve,
>
> for some reason, it seems as if the Java compiler is not generating the
> bridge method [1].
>
> Could you double-check that the Java version of your build process and
> your cluster match?
>
> Could you run javap on your generated class file and report back?
>
> [1]
> https://docs.oracle.com/javase/tutorial/java/generics/bridgeMethods.html
>
> On Thu, Mar 19, 2020 at 5:13 PM Steve Whelan  wrote:
>
>> Hi,
>>
>> I am attempting to create a Key/Value serializer for the Kafka table
>> connector. I forked `KafkaTableSourceSinkFactoryBase`[1] and other relevant
>> classes, updating the serializer.
>>
>> First, I created `JsonRowKeyedSerializationSchema` which implements
>> `KeyedSerializationSchema`[2], which is deprecated. The way it works is you
>> provide a list of indices in your Row output that are the Key. This works
>> successfully.
>>
>> When I tried migrating my `JsonRowKeyedSerializationSchema` to implement
>> `KafkaSerializationSchema`[3], I get a `java.lang.AbstractMethodError`
>> exception. Normally, this would mean I'm using an old interface; however, all
>> my Flink dependencies are version 1.9. I can not find this abstract
>> `serialize()` function in the Flink codebase. Has anyone come across this
>> before?
>>
>> When I print the method of my `JsonRowKeyedSerializationSchema` class, I
>> do see the below which seems to be getting called by the FlinkKafkaProducer
>> but I do not see it in `KafkaSerializationSchema`:
>>
>> public abstract
>> org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.ProducerRecord
>> org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema.serialize(java.lang.Object,java.lang.Long)
>> java.lang.Object
>> java.lang.Long
>>
>>
>> *`JsonRowKeyedSerializationSchema` class*
>>
>> import
>> org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
>> import org.apache.flink.types.Row;
>> import org.apache.kafka.clients.producer.ProducerRecord;
>>
>> public class JsonRowKeyedSerializationSchema implements
>> KafkaSerializationSchema {
>>
>>   // constructors and helpers
>>
>>   @Override
>>   public ProducerRecord serialize(Row row, @Nullable Long
>> aLong) {
>> return new ProducerRecord<>("some_topic", serializeKey(row),
>> serializeValue(row));
>>   }
>> }
>>
>>
>> *Stacktrace:*
>>
>> Caused by: java.lang.AbstractMethodError: Method
>> com/mypackage/flink/serialization/json/JsonRowKeyedSerializationSchema.serialize(Ljava/lang/Object;Ljava/lang/Long;)Lorg/apache/flink/kafka/shaded/org/apache/kafka/clients/producer/ProducerRecord;
>> is abstract
>> at
>> com.mypackage.flink.serialization.json.JsonRowKeyedSerializationSchema.serialize(JsonRowKeyedSerializationSchema.java)
>> at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:816)
>> at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:98)
>> at
>> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:228)
>> at
>> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:546)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:523)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:483)
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>> at
>> 

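Re: Question about automatic source parallelism inference in Flink SQL 1.10

2020-03-24 Thread Jun Zhang
Hi Chief,

Currently, when Flink reads from Hive with automatic inference enabled, the system infers the
parallelism from the number of files being read. If that does not exceed the maximum parallelism
(1000 by default), the source parallelism equals the number of files.
You can set the maximum source parallelism with table.exec.hive.infer-source-parallelism.max.



Best,
Jun


-- Original Message --
From: Kurt Young 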

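Re: Question about automatic source parallelism inference in Flink SQL 1.10

2020-03-24 Thread Jun Zhang
Hi Chief,

Currently, when Flink reads from Hive with automatic inference enabled, the system infers the
parallelism from the number of files being read. If that does not exceed the maximum parallelism
(1000 by default), the source parallelism equals the number of files.
You can set the maximum source parallelism with table.exec.hive.infer-source-parallelism.max.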
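
The rule as described in this message can be modeled in a few lines. This is only a sketch of the description above, not Flink's actual implementation; the class and method names are hypothetical, and the lower bound of 1 is an added assumption so that an empty table still gets one subtask.

```java
/** Models the inference rule described above: parallelism = file count, capped by a maximum. */
class HiveSourceParallelismSketch {
    /** Default upper bound quoted in the message (table.exec.hive.infer-source-parallelism.max). */
    static final int DEFAULT_MAX = 1000;

    static int inferParallelism(int fileCount, int maxParallelism) {
        if (maxParallelism < 1) {
            throw new IllegalArgumentException("maxParallelism must be at least 1");
        }
        // Cap by the configured maximum, and never go below 1 (assumption).
        return Math.max(1, Math.min(fileCount, maxParallelism));
    }

    public static void main(String[] args) {
        // 150 files, as in the original question.
        System.out.println(inferParallelism(150, DEFAULT_MAX));
    }
}
```

Under this model the 150 files from the original question give a source parallelism of 150 regardless of the parallelism of 10 set in the SQL client configuration, which matches what was observed.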

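Kurt Young wrote on Wed, Mar 25, 2020 at 8:53 AM:

> How large is your data? One possible reason is that by the time the other source subtasks were
> scheduled, the data had already been read by the subtasks scheduled first.
>
> Best,
> Kurt
>
>
> On Tue, Mar 24, 2020 at 10:39 PM Chief  wrote:
>
> > Hi all,
> > I used Flink SQL to query Hive data. The Hive table has 150 data files and the parallelism
> > set in the SQL client configuration file is 10. The source got a parallelism of 150 through
> > automatic inference, but in the web UI only the first ten subtasks read any data; the other
> > subtasks show no data read. Is something wrong with my settings?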
>


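Re: Question about automatic source parallelism inference in Flink SQL 1.10

2020-03-24 Thread Kurt Young
How large is your data? One possible reason is that by the time the other source subtasks were
scheduled, the data had already been read by the subtasks scheduled first.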

Best,
Kurt


On Tue, Mar 24, 2020 at 10:39 PM Chief  wrote:

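> Hi all,
> I used Flink SQL to query Hive data. The Hive table has 150 data files and the parallelism
> set in the SQL client configuration file is 10. The source got a parallelism of 150 through
> automatic inference, but in the web UI only the first ten subtasks read any data; the other
> subtasks show no data read. Is something wrong with my settings?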


Question about catching exceptions from INSERT INTO in sql-gateway

2020-03-24 Thread 111
Hi,
I have recently been using sql-gateway. When I submit an INSERT statement with
StatementExecuteResponseBody body = getInstance().sendRequest(
host,port,StatementExecuteHeaders.getInstance(),
new SessionMessageParameters(sessionId),
new StatementExecuteRequestBody(stmt, timeout)).get();
and the job fails, no exception information is returned. Is this feature not supported in the current version?

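Flink SQL: adding a watermark when creating a source table fails

2020-03-24 Thread flink小猪
When I try to create a Kafka source table through the SQL client (following 云邪's article
"Demo: Building Streaming Applications with Flink SQL"),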
CREATE TABLE user_behavior (
user_id BIGINT,
item_id BIGINT,
category_id BIGINT,
behavior STRING,
ts TIMESTAMP(3),
proctime as PROCTIME(),
WATERMARK FOR ts as ts - INTERVAL '5' SECOND
) WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.topic' = 'user_behavior',
'connector.startup-mode' = 'earliest-offset',
'connector.properties.zookeeper.connect' = '192.168.1.214:2181',
'connector.properties.bootstrap.servers' = '192.168.1.214:9092',
'format.type' = 'json'
)
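I get the error
org.apache.calcite.sql.validate.SqlValidatorException: Unknown identifier 'ts'
I assumed my CREATE TABLE statement was wrong, but when I create the table from code (running in
IDEA) there is no problem and the data can be queried normally. When I submit to the cluster, I get
org.apache.calcite.sql.validate.SqlValidatorException: Unknown identifier 'ts'
again. I guessed some JARs were missing, but I added the following as described in 云邪's article:
flink-sql-connector-kafka_2.11-1.10.0.jar
flink-json-1.10.0.jar
Running in sql-client still fails. Am I missing a JAR?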







Re: Kerberos authentication for SQL CLI

2020-03-24 Thread Dawid Wysakowicz
I think the reason why it's different from the CliFrontend is that the
SQL client is much younger and, as far as I know, has never reached
"production" readiness (as per the docs [1], it's still marked as Beta;
see also the first "Attention" note ;) ).

I think it certainly makes sense to have a proper security story. That
is also where my question came from about which parts of the SQL client
you would like to see guarded with Kerberos. I think well-put
requirements in that area could make a really great feature request.
Catalogs are certainly a good point, as the client accesses catalogs to
query the schema even before submitting the job.

Best,

Dawid


On 24/03/2020 13:50, Gyula Fóra wrote:
> Thanks Dawid,
>
> I think you are right that most of the things should work like this
> just fine. Maybe some catalogs will need this at some point but not
> right now,
> I was just wondering why is this different from how the CliFrontend
> works which also installs the security context on the Client side.
>
> I guess same arguments should apply for the SQL CLI, whatever they
> might be :)
>
> Gyula
>
> On Tue, Mar 24, 2020 at 12:30 PM Dawid Wysakowicz wrote:
>
> Hi Gyula,
>
> As far as I can tell SQL cli does not support Kerberos natively.
> SQL CLI
> submits all the queries to a running Flink cluster. Therefore if you
> kerberize the cluster the queries will use that configuration.
>
> On a different note. Out of curiosity. What would you expect the
> SQL CLI
> to use the Kerberos authentication for?
>
> Best,
>
> Dawid
>
> On 24/03/2020 11:11, Gyula Fóra wrote:
> > Hi!
> >
> > Does the SQL CLI support Kerberos Authentication?
> >
> > I am struggling to find any use of the SecurityContext in the
> SQL CLI
> > logic but maybe I am looking in the wrong place.
> >
> > Thank you!
> > Gyula
>




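Re: An idea about time zone settings for SQL DATE_FORMAT

2020-03-24 Thread Jark Wu
Thanks for reporting this, Weike.

First, I think it is a problem that Flink currently returns TIMESTAMP WITHOUT TIME ZONE.
The SQL standard (SQL:2011 Part 2 Section 6.32) defines the return type as WITH TIME ZONE.
In addition, the Calcite documentation [1] also says it returns TIMESTAMP WITH TIME ZONE (although
that seems inconsistent with the implementation).
Several other databases behave similarly: MySQL [2], Oracle [3].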

Best,
Jark

[1]: https://calcite.apache.org/docs/reference.html#datetime-functions
[2]:
https://dev.mysql.com/doc/refman/8.0/en/date-and-time-functions.html#function_current-timestamp
[3]:
https://docs.oracle.com/cd/B28359_01/server.111/b28286/functions038.htm#SQLRF00629
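
The difference discussed in this thread can be reproduced outside Flink with java.time. The sketch below is only an illustration of the semantics, not Flink's code generation logic; the class and method names are hypothetical. It formats the same instant once as a zone-less wall-clock time in UTC+0 (the LocalDateTime semantics mentioned in the quoted reply) and once in an explicitly chosen zone such as Asia/Shanghai.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

class TimeZoneFormatSketch {
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** Formats the instant as a zone-less wall-clock time taken in UTC+0. */
    static String formatAsUtc(Instant instant) {
        return LocalDateTime.ofInstant(instant, ZoneOffset.UTC).format(FORMAT);
    }

    /** Formats the same instant as wall-clock time in the given zone, e.g. "Asia/Shanghai". */
    static String formatInZone(Instant instant, String zoneId) {
        return instant.atZone(ZoneId.of(zoneId)).format(FORMAT);
    }

    public static void main(String[] args) {
        Instant now = Instant.now();
        System.out.println("UTC+0:         " + formatAsUtc(now));
        System.out.println("Asia/Shanghai: " + formatInZone(now, "Asia/Shanghai"));
    }
}
```

For a user in UTC+8 the two strings differ by eight hours, which is the gap reported in the original message.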



On Tue, 24 Mar 2020 at 17:00, DONG, Weike  wrote:

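> Hi Zhenghua,
>
> Thank you for your reply. Since Flink already provides a time zone setting, I feel this area
> could perhaps be enhanced further. Users can easily forget or overlook CONVERT_TZ, so there is
> still quite a bit of room for improvement here.
>
> Best,
> Weike
>
> On Tue, Mar 24, 2020 at 4:20 PM Zhenghua Gao  wrote:
>
> > The return type of CURRENT_TIMESTAMP is TIMESTAMP (WITHOUT TIME ZONE);
> > its semantics are those of java.time.LocalDateTime.
> > Its string representation does not change with the time zone (as you saw, it matches UTC+0).
> >
> > Your requirement can be met with CONVERT_TZ(timestamp_string, time_zone_from_string,
> > time_zone_to_string)
> >
> > *Best Regards,*
> > *Zhenghua Gao*
> >
> >
> > On Mon, Mar 23, 2020 at 10:12 PM DONG, Weike 
> > wrote:
> >
> > > Hi,
> > >
> > > I recently noticed that when Flink's Blink planner formats CURRENT_TIMESTAMP as a string
> > > with DATE_FORMAT, it uses UTC+0 by default.
> > >
> > > The TableConfig class has long had a setLocalTimeZone method; after setting it to UTC+8,
> > > the formatted string is still in UTC+0. Looking deeper, the code generation logic for time
> > > formatting in Flink (time.scala) does not take the time zone setting into account.
> > >
> > > Since most users are not in UTC+0 (GMT, UTC), wouldn't Flink be more user-friendly if time
> > > formatting, display, and so on honored the time zone setting in TableConfig? Of course this
> > > would be an incompatible change and needs some care.
> > >
> > > Perhaps the community would more readily accept a built-in DATE_FORMAT_WITH_TIMEZONE
> > > function?
> > >
> > > Just a personal thought, thanks :)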
> > >
> >
>


Re: Deploying native Kubernetes via yaml files?

2020-03-24 Thread Yang Wang
Hi Niels,

Currently, a Flink cluster using the native integration cannot be created
via a yaml file. We introduced the native integration for users who are
not familiar with K8s and kubectl, to make it easier for Flink users to
deploy a Flink cluster on K8s.

However, I think some other users, especially K8s experts, really like
the yaml way of creating a Flink cluster. I will create a ticket for this
feature. If you want to do this manually now, I think you need to make at
least the following changes.
1. Update `flink-console.sh` and add a new script (much like
jobmanager.sh) to set the entrypoint to
"org.apache.flink.kubernetes.entrypoint.KubernetesSessionClusterEntrypoint".
2. Create the ConfigMap and Service as for standalone on K8s [1].
3. Create a jobmanager.yaml with the service account (with enough
permission to create pods in the K8s cluster) correctly set.


[1].
https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/kubernetes.html#appendix


Best,
Yang

Ufuk Celebi  于2020年3月24日周二 下午9:51写道:

> PS: See also
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/kubernetes.html#flink-job-cluster-on-kubernetes
>
> On Tue, Mar 24, 2020 at 2:49 PM Ufuk Celebi  wrote:
>
>> Hey Niels,
>>
>> you can check out the README with example configuration files here:
>> https://github.com/apache/flink/tree/master/flink-container/kubernetes
>>
>> Is that what you were looking for?
>>
>> Best,
>>
>> Ufuk
>>
>> On Tue, Mar 24, 2020 at 2:42 PM Niels Basjes  wrote:
>>
>>> Hi,
>>>
>>> As clearly documented here
>>> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/native_kubernetes.html
>>>  the
>>> current way of deploying Flink natively on Kubernetes is by running the 
>>> ./bin/kubernetes-session.sh
>>> script that runs some Java code that does "magic" to deploy in on the
>>> cluster.
>>> This works.
>>>
>>> I was wondering: Is it with the current code base already possible to
>>> craft a set of Yaml files (perhaps even with a special Docker image) so
>>> that I can deploy it using the 'normal' Kubernetes way of doing a kubectl
>>> apply -f foo.yaml ?
>>>
>>> --
>>> Best regards / Met vriendelijke groeten,
>>>
>>> Niels Basjes
>>>
>>


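Question about automatic source parallelism inference in Flink SQL 1.10

2020-03-24 Thread Chief
Hi all,
I used Flink SQL to query Hive data. The Hive table has 150 data files and the parallelism
set in the SQL client configuration file is 10. The source got a parallelism of 150 through
automatic inference, but in the web UI only the first ten subtasks read any data; the other
subtasks show no data read. Is something wrong with my settings?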

Re: Lack of KeyedBroadcastStateBootstrapFunction

2020-03-24 Thread Dawid Wysakowicz
Hi,

I am not very familiar with the State Processor API, but from a brief
look at it, I think you are right. I think the State Processor API does
not support mixing different kinds of state in a single operator for
now, at least not in a nice way. You could probably implement
KeyedBroadcastStateBootstrapFunction yourself and use it with
KeyedOperatorTransformation#transform(org.apache.flink.state.api.SavepointWriterOperatorFactory).
I understand this is probably not the easiest task.

I am not aware of any plans to support that out of the box, but I
cc'ed Gordon and Seth who, if I remember correctly, worked on that API. I
hope they can give you some more insights.

Best,

Dawid

 On 23/03/2020 17:36, Mark Niehe wrote:
> Hey all,
>
> I have another question about the State Processor API. I can't seem to
> find a way to create a KeyedBroadcastStateBootstrapFunction operator.
> The two options currently available to bootstrap a savepoint with
> state are KeyedStateBootstrapFunction and
> BroadcastStateBootstrapFunction. Because these are the only two
> options, it's not possible to bootstrap both keyed and broadcast state
> for the same operator. Are there any plans to add that functionality
> or did I miss it entirely when going through the API docs?
>
> Thanks,
> -- 
> 
> Mark Niehe ·  Software Engineer
> Integrations




Re: Deploying native Kubernetes via yaml files?

2020-03-24 Thread Ufuk Celebi
PS: See also
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/kubernetes.html#flink-job-cluster-on-kubernetes

On Tue, Mar 24, 2020 at 2:49 PM Ufuk Celebi  wrote:

> Hey Niels,
>
> you can check out the README with example configuration files here:
> https://github.com/apache/flink/tree/master/flink-container/kubernetes
>
> Is that what you were looking for?
>
> Best,
>
> Ufuk
>
> On Tue, Mar 24, 2020 at 2:42 PM Niels Basjes  wrote:
>
>> Hi,
>>
>> As clearly documented here
>> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/native_kubernetes.html
>>  the
>> current way of deploying Flink natively on Kubernetes is by running the 
>> ./bin/kubernetes-session.sh
>> script that runs some Java code that does "magic" to deploy in on the
>> cluster.
>> This works.
>>
>> I was wondering: Is it with the current code base already possible to
>> craft a set of Yaml files (perhaps even with a special Docker image) so
>> that I can deploy it using the 'normal' Kubernetes way of doing a kubectl
>> apply -f foo.yaml ?
>>
>> --
>> Best regards / Met vriendelijke groeten,
>>
>> Niels Basjes
>>
>


Re: Deploying native Kubernetes via yaml files?

2020-03-24 Thread Ufuk Celebi
Hey Niels,

you can check out the README with example configuration files here:
https://github.com/apache/flink/tree/master/flink-container/kubernetes

Is that what you were looking for?

Best,

Ufuk

On Tue, Mar 24, 2020 at 2:42 PM Niels Basjes  wrote:

> Hi,
>
> As clearly documented here
> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/native_kubernetes.html
>  the
> current way of deploying Flink natively on Kubernetes is by running the 
> ./bin/kubernetes-session.sh
> script that runs some Java code that does "magic" to deploy in on the
> cluster.
> This works.
>
> I was wondering: Is it with the current code base already possible to
> craft a set of Yaml files (perhaps even with a special Docker image) so
> that I can deploy it using the 'normal' Kubernetes way of doing a kubectl
> apply -f foo.yaml ?
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes
>


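Re: Flink on YARN: Kerberos authentication fails

2020-03-24 Thread nie...@163.com
For Flink on YARN, the simplest case is to run kinit in the terminal and then submit the job; Flink
itself needs no configuration.
"Can't get Kerberos realm" usually indicates a problem with the realm configuration in krb5.conf.

flink/hado...@example.com   
I'm not sure whether hadoop0 is a host name; this looks like a service principal. A user principal
should be enough here.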






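> On Mar 24, 2020, at 9:03 PM, 巫旭阳 wrote:
> 
> Previously, when using the Hadoop client, I set a system property; when this property is not
> set, the earlier error is reported:
> System.setProperty("java.security.krb5.conf", 
> "C:\\Users\\86177\\Desktop\\tmp\\5\\krb5.conf" );
> But Flink on YARN does not provide a way to set this parameter.
> 
> On 2020-03-24 20:52:44, "aven.wu" wrote:
> 
> Submitting a Flink job to a cluster with Kerberos authentication fails with the following exception:
> 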
> java.lang.Exception: unable to establish the security context
> at 
> org.apache.flink.runtime.security.SecurityUtils.install(SecurityUtils.java:73)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1124)
> Caused by: java.lang.IllegalArgumentException: Can't get Kerberos realm
> at 
> org.apache.hadoop.security.HadoopKerberosName.setConfiguration(HadoopKerberosName.java:65)
> at 
> org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:276)
> at 
> org.apache.hadoop.security.UserGroupInformation.setConfiguration(UserGroupInformation.java:312)
> at 
> org.apache.flink.runtime.security.modules.HadoopModule.install(HadoopModule.java:70)
> at 
> org.apache.flink.runtime.security.SecurityUtils.install(SecurityUtils.java:67)
> ... 1 more
> Caused by: java.lang.reflect.InvocationTargetException
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.apache.hadoop.security.authentication.util.KerberosUtil.getDefaultRealm(KerberosUtil.java:84)
> at 
> org.apache.hadoop.security.HadoopKerberosName.setConfiguration(HadoopKerberosName.java:63)
> ... 5 more
> Caused by: KrbException: Cannot locate default realm
> at sun.security.krb5.Config.getDefaultRealm(Config.java:1029)
> ... 11 more
> 
> 
> 
> I set the four parameters from the official documentation in flink-conf.yaml:
> 
> 
> 
> security.kerberos.login.use-ticket-cache: false
> security.kerberos.login.keytab: /home/flink-1.8.0/conf/flink.keytab
> security.kerberos.login.principal: flink/hado...@example.com
> security.kerberos.login.realm: EXAMPLE.COM
> security.kerberos.login.contexts: KafkaClient
> 
> 
> 
> The file /home/flink-1.8.0/conf/flink.keytab is in place.
> 
> 
> 
> 
> 
> Best
> 
> Aven
> 



Re: Does the Flink JDBC Driver support creating streaming tables?

2020-03-24 Thread wangl...@geekplus.com.cn

Please have a look at this document:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector

The following syntax is probably not supported:
  'format.type' = 'csv',\n" +
"'format.field-delimiter' = '|'\n"

Below is code that works for me. The data in Kafka needs to be in this format: {"order_no":"abcdefg","status":90}
tEnv.sqlUpdate("CREATE TABLE pick_order (\n"
+ "order_no VARCHAR,\n"
+ "status INT\n"
+ ") WITH (\n"
+ "'connector.type' = 'kafka',\n"
+ "'connector.version' = 'universal',\n"
+ "'connector.topic' = 'wanglei_test',\n"
+ "'connector.startup-mode' = 'latest-offset',\n"
+ "'connector.properties.0.key' = 'zookeeper.connect',\n"
+ "'connector.properties.0.value' = 'xxx:2181',\n"
+ "'connector.properties.1.key' = 'bootstrap.servers',\n"
+ "'connector.properties.1.value' = 'xxx:9092',\n"
+ "'update-mode' = 'append',\n"
+ "'format.type' = 'json',\n"
+ "'format.derive-schema' = 'true'\n"
+ ")");

王磊


wangl...@geekplus.com.cn
 
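From: 赵峰
Sent: 2020-03-24 21:28
To: user-zh
Subject: Does the Flink JDBC Driver support creating streaming tables?
hi

Creating a Kafka table through the Flink JDBC Driver fails. Is my CREATE TABLE code incorrect? The code is as follows: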
Connection connection = 
DriverManager.getConnection("jdbc:flink://localhost:8083?planner=blink");
Statement statement = connection.createStatement();
 
statement.executeUpdate(
"CREATE TABLE table_kafka (\n" +
"user_id BIGINT,\n" +
"item_id BIGINT,\n" +
"category_id BIGINT,\n" +
"behavior STRING,\n" +
"ts TIMESTAMP(3),\n" +
"proctime as PROCTIME(),\n" +
"WATERMARK FOR ts as ts - INTERVAL '5' SECOND\n" +
") WITH (\n" +
"'connector.type' = 'kafka', \n" +
"'connector.version' = 'universal', \n" +
"'connector.topic' = 'flink_im02', \n" +
"'connector.properties.group.id' = 'flink_im02_new',\n" +
"'connector.startup-mode' = 'earliest-offset', \n" +
"'connector.properties.zookeeper.connect' = '*.*.*.*:2181', \n" +
"'connector.properties.bootstrap.servers' = '*.*.*.*:9092', \n" +
"'format.type' = 'csv',\n" +
"'format.field-delimiter' = '|'\n" +
")");
ResultSet rs1 = statement.executeQuery("SELECT * FROM table_kafka");
while (rs1.next()) {
System.out.println(rs1.getInt(1) + ", " + rs1.getInt(2));
} 
 
statement.close();
connection.close();
 
Error:
Reason: Required context properties mismatch.
 
The matching candidates:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
Mismatched properties:
'connector.type' expects 'filesystem', but is 'kafka'
 
 
 
 
赵峰


Deploying native Kubernetes via yaml files?

2020-03-24 Thread Niels Basjes
Hi,

As clearly documented here
https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/native_kubernetes.html
the
current way of deploying Flink natively on Kubernetes is by running
the ./bin/kubernetes-session.sh
script that runs some Java code that does "magic" to deploy it on the
cluster.
This works.

I was wondering: is it already possible with the current code base to craft
a set of YAML files (perhaps even with a special Docker image) so that I
can deploy it the 'normal' Kubernetes way, with kubectl apply -f
foo.yaml?

-- 
Best regards / Met vriendelijke groeten,

Niels Basjes


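Does the Flink JDBC Driver support creating streaming tables?

2020-03-24 Thread 赵峰
hi

Creating a Kafka table through the Flink JDBC Driver fails. Is my CREATE TABLE code incorrect? The code is as follows: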
Connection connection = 
DriverManager.getConnection("jdbc:flink://localhost:8083?planner=blink");
Statement statement = connection.createStatement();

statement.executeUpdate(
"CREATE TABLE table_kafka (\n" +
"user_id BIGINT,\n" +
"item_id BIGINT,\n" +
"category_id BIGINT,\n" +
"behavior STRING,\n" +
"ts TIMESTAMP(3),\n" +
"proctime as PROCTIME(),\n" +
"WATERMARK FOR ts as ts - INTERVAL '5' SECOND\n" +
") WITH (\n" +
"'connector.type' = 'kafka', \n" +
"'connector.version' = 'universal', \n" +
"'connector.topic' = 'flink_im02', \n" +
"'connector.properties.group.id' = 'flink_im02_new',\n" +
"'connector.startup-mode' = 'earliest-offset', \n" +
"'connector.properties.zookeeper.connect' = '*.*.*.*:2181', \n" +
"'connector.properties.bootstrap.servers' = '*.*.*.*:9092', \n" +
"'format.type' = 'csv',\n" +
"'format.field-delimiter' = '|'\n" +
")");
ResultSet rs1 = statement.executeQuery("SELECT * FROM table_kafka");
while (rs1.next()) {
System.out.println(rs1.getInt(1) + ", " + rs1.getInt(2));
} 

statement.close();
connection.close();

Error:
Reason: Required context properties mismatch.

The matching candidates:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
Mismatched properties:
'connector.type' expects 'filesystem', but is 'kafka'




赵峰

Can the Flink JDBC Driver create Kafka tables?

2020-03-24 Thread 赵峰
hi
Creating a Kafka table through the Flink JDBC Driver fails. Is my CREATE TABLE code incorrect? The code is as follows:
Connection connection = 
DriverManager.getConnection("jdbc:flink://localhost:8083?planner=blink");
Statement statement = connection.createStatement();

statement.executeUpdate(
"CREATE TABLE table_kafka (\n" +
"user_id BIGINT,\n" +
"item_id BIGINT,\n" +
"category_id BIGINT,\n" +
"behavior STRING,\n" +
"ts TIMESTAMP(3),\n" +
"proctime as PROCTIME(),\n" +
"WATERMARK FOR ts as ts - INTERVAL '5' SECOND\n" +
") WITH (\n" +
"'connector.type' = 'kafka', \n" +
"'connector.version' = 'universal', \n" +
"'connector.topic' = 'flink_im02', \n" +
"'connector.properties.group.id' = 'flink_im02_new',\n" +
"'connector.startup-mode' = 'earliest-offset', \n" +
"'connector.properties.zookeeper.connect' = '*.*.*.*:2181', \n" +
"'connector.properties.bootstrap.servers' = '*.*.*.*:9092', \n" +
"'format.type' = 'csv',\n" +
"'format.field-delimiter' = '|'\n" +
")");
ResultSet rs1 = statement.executeQuery("SELECT * FROM table_kafka");
while (rs1.next()) {
System.out.println(rs1.getInt(1) + ", " + rs1.getInt(2));
} 

statement.close();
connection.close();

Error:
Reason: Required context properties mismatch.

The matching candidates:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
Mismatched properties:
'connector.type' expects 'filesystem', but is 'kafka'




赵峰

Re: Issue with single job yarn flink cluster HA

2020-03-24 Thread Andrey Zagrebin
Hi Dinesh,

If the current leader crashes (e.g. due to network failures), then getting
these messages during the leader re-election does not look like a problem.
They look to me like mere warnings related to the failover.

Do you observe any problem with your application? Does the failover not
work, e.g. no leader is elected or a job is not restarted after the current
leader failure?

Best,
Andrey

On Sun, Mar 22, 2020 at 11:14 AM Dinesh J  wrote:

> Attaching the job manager log for reference.
>
> 2020-03-22 11:39:02,693 WARN
>  org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever  -
> Error while retrieving the leader gateway. Retrying to connect to
> akka.tcp://flink@host1:28681/user/dispatcher.
> 2020-03-22 11:39:02,724 WARN  akka.remote.transport.netty.NettyTransport
>  - Remote connection to [null] failed with
> java.net.ConnectException: Connection refused: host1/ipaddress1:28681
> 2020-03-22 11:39:02,724 WARN  akka.remote.ReliableDeliverySupervisor
>  - Association with remote system 
> [akka.tcp://flink@host1:28681]
> has failed, address is now gated for [50] ms. Reason: [Association failed
> with [akka.tcp://flink@host1:28681]] Caused by: [Connection refused:
> host1/ipaddress1:28681]
> 2020-03-22 11:39:02,791 WARN  akka.remote.transport.netty.NettyTransport
>  - Remote connection to [null] failed with
> java.net.ConnectException: Connection refused: host1/ipaddress1:28681
> 2020-03-22 11:39:02,792 WARN  akka.remote.ReliableDeliverySupervisor
>  - Association with remote system 
> [akka.tcp://flink@host1:28681]
> has failed, address is now gated for [50] ms. Reason: [Association failed
> with [akka.tcp://flink@host1:28681]] Caused by: [Connection refused:
> host1/ipaddress1:28681]
> 2020-03-22 11:39:02,861 WARN  akka.remote.transport.netty.NettyTransport
>  - Remote connection to [null] failed with
> java.net.ConnectException: Connection refused: host1/ipaddress1:28681
> 2020-03-22 11:39:02,861 WARN  akka.remote.ReliableDeliverySupervisor
>  - Association with remote system 
> [akka.tcp://flink@host1:28681]
> has failed, address is now gated for [50] ms. Reason: [Association failed
> with [akka.tcp://flink@host1:28681]] Caused by: [Connection refused:
> host1/ipaddress1:28681]
> 2020-03-22 11:39:02,931 WARN  akka.remote.transport.netty.NettyTransport
>  - Remote connection to [null] failed with
> java.net.ConnectException: Connection refused: host1/ipaddress1:28681
> 2020-03-22 11:39:02,931 WARN  akka.remote.ReliableDeliverySupervisor
>  - Association with remote system 
> [akka.tcp://flink@host1:28681]
> has failed, address is now gated for [50] ms. Reason: [Association failed
> with [akka.tcp://flink@host1:28681]] Caused by: [Connection refused:
> host1/ipaddress1:28681]
> 2020-03-22 11:39:03,001 WARN  akka.remote.transport.netty.NettyTransport
>  - Remote connection to [null] failed with
> java.net.ConnectException: Connection refused: host1/ipaddress1:28681
> 2020-03-22 11:39:03,002 WARN  akka.remote.ReliableDeliverySupervisor
>  - Association with remote system 
> [akka.tcp://flink@host1:28681]
> has failed, address is now gated for [50] ms. Reason: [Association failed
> with [akka.tcp://flink@host1:28681]] Caused by: [Connection refused:
> host1/ipaddress1:28681]
> 2020-03-22 11:39:03,071 WARN  akka.remote.transport.netty.NettyTransport
>  - Remote connection to [null] failed with
> java.net.ConnectException: Connection refused: host1/ipaddress1:28681
> 2020-03-22 11:39:03,071 WARN  akka.remote.ReliableDeliverySupervisor
>  - Association with remote system 
> [akka.tcp://flink@host1:28681]
> has failed, address is now gated for [50] ms. Reason: [Association failed
> with [akka.tcp://flink@host1:28681]] Caused by: [Connection refused:
> host1/ipaddress1:28681]
> 2020-03-22 11:39:03,141 WARN  akka.remote.transport.netty.NettyTransport
>  - Remote connection to [null] failed with
> java.net.ConnectException: Connection refused: host1/ipaddress1:28681
> 2020-03-22 11:39:03,141 WARN  akka.remote.ReliableDeliverySupervisor
>  - Association with remote system 
> [akka.tcp://flink@host1:28681]
> has failed, address is now gated for [50] ms. Reason: [Association failed
> with [akka.tcp://flink@host1:28681]] Caused by: [Connection refused:
> host1/ipaddress1:28681]
> 2020-03-22 11:39:03,211 WARN  akka.remote.transport.netty.NettyTransport
>  - Remote connection to [null] failed with
> java.net.ConnectException: Connection refused: host1/ipaddress1:28681
> 2020-03-22 11:39:03,211 WARN  akka.remote.ReliableDeliverySupervisor
>  - Association with remote system 
> [akka.tcp://flink@host1:28681]
> has failed, address is now gated for [50] ms. Reason: [Association failed
> with [akka.tcp://flink@host1:28681]] Caused by: [Connection refused:
> 

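Re: Flink on YARN: Kerberos authentication fails

2020-03-24 Thread 巫旭阳
When I used the Hadoop client earlier, I set a system property; the error reported earlier appears whenever that property is not set:
System.setProperty("java.security.krb5.conf", 
"C:\\Users\\86177\\Desktop\\tmp\\5\\krb5.conf" );
However, Flink on YARN does not provide a setting for this parameter.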
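
Since "Cannot locate default realm" comes down to what the JVM finds in krb5.conf, a quick check of the file can help narrow things down. The following is only a minimal sketch, not the JDK's own parser: it assumes a simple krb5.conf layout and looks for default_realm in the [libdefaults] section. The class name and the sample content (including kdc.example.com) are made up for illustration.

```java
import java.util.Optional;

/** Simplified sketch: finds default_realm in the [libdefaults] section of krb5.conf text. */
final class Krb5DefaultRealm {

    static Optional<String> findDefaultRealm(String krb5ConfText) {
        boolean inLibDefaults = false;
        for (String rawLine : krb5ConfText.split("\\R")) {
            String line = rawLine.trim();
            if (line.isEmpty() || line.startsWith("#") || line.startsWith(";")) {
                continue; // skip blank lines and comments
            }
            if (line.startsWith("[")) {
                // A new section starts; remember whether it is [libdefaults].
                inLibDefaults = line.equalsIgnoreCase("[libdefaults]");
                continue;
            }
            if (inLibDefaults) {
                int eq = line.indexOf('=');
                if (eq > 0 && line.substring(0, eq).trim().equals("default_realm")) {
                    String value = line.substring(eq + 1).trim();
                    return value.isEmpty() ? Optional.empty() : Optional.of(value);
                }
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Hypothetical sample content; a real check would read the file that
        // the java.security.krb5.conf system property points to.
        String sample = String.join("\n",
            "[libdefaults]",
            "  default_realm = EXAMPLE.COM",
            "[realms]",
            "  EXAMPLE.COM = { kdc = kdc.example.com }");
        System.out.println(findDefaultRealm(sample).orElse("<no default realm>"));
    }
}
```

If the result is empty for the file the client JVM actually reads, the exception in this thread would be the expected outcome.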







On 2020-03-24 20:52:44, "aven.wu" wrote:

Submitting a Flink job to a Kerberos-secured cluster fails with the following exception:

 

java.lang.Exception: unable to establish the security context
at 
org.apache.flink.runtime.security.SecurityUtils.install(SecurityUtils.java:73)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1124)
Caused by: java.lang.IllegalArgumentException: Can't get Kerberos realm
at 
org.apache.hadoop.security.HadoopKerberosName.setConfiguration(HadoopKerberosName.java:65)
at 
org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:276)
at 
org.apache.hadoop.security.UserGroupInformation.setConfiguration(UserGroupInformation.java:312)
at 
org.apache.flink.runtime.security.modules.HadoopModule.install(HadoopModule.java:70)
at 
org.apache.flink.runtime.security.SecurityUtils.install(SecurityUtils.java:67)
... 1 more
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.hadoop.security.authentication.util.KerberosUtil.getDefaultRealm(KerberosUtil.java:84)
at 
org.apache.hadoop.security.HadoopKerberosName.setConfiguration(HadoopKerberosName.java:63)
... 5 more
Caused by: KrbException: Cannot locate default realm
at sun.security.krb5.Config.getDefaultRealm(Config.java:1029)
... 11 more

 

I set the four parameters from the official documentation in flink-conf.yaml:

 

security.kerberos.login.use-ticket-cache: false
security.kerberos.login.keytab: /home/flink-1.8.0/conf/flink.keytab
security.kerberos.login.principal: flink/hado...@example.com
security.kerberos.login.realm: EXAMPLE.COM
security.kerberos.login.contexts: KafkaClient

 

The file /home/flink-1.8.0/conf/flink.keytab is in place.

 

 

Best

Aven

 

Flink on YARN: Kerberos authentication fails

2020-03-24 Thread aven . wu
Submitting a Flink job to a Kerberos-secured cluster fails with the following exception:

java.lang.Exception: unable to establish the security context
at 
org.apache.flink.runtime.security.SecurityUtils.install(SecurityUtils.java:73)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1124)
Caused by: java.lang.IllegalArgumentException: Can't get Kerberos realm
at 
org.apache.hadoop.security.HadoopKerberosName.setConfiguration(HadoopKerberosName.java:65)
at 
org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:276)
at 
org.apache.hadoop.security.UserGroupInformation.setConfiguration(UserGroupInformation.java:312)
at 
org.apache.flink.runtime.security.modules.HadoopModule.install(HadoopModule.java:70)
at 
org.apache.flink.runtime.security.SecurityUtils.install(SecurityUtils.java:67)
... 1 more
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.hadoop.security.authentication.util.KerberosUtil.getDefaultRealm(KerberosUtil.java:84)
at 
org.apache.hadoop.security.HadoopKerberosName.setConfiguration(HadoopKerberosName.java:63)
... 5 more
Caused by: KrbException: Cannot locate default realm
at sun.security.krb5.Config.getDefaultRealm(Config.java:1029)
... 11 more

I set the four parameters from the official documentation in flink-conf.yaml:

security.kerberos.login.use-ticket-cache: false
security.kerberos.login.keytab: /home/flink-1.8.0/conf/flink.keytab
security.kerberos.login.principal: flink/hado...@example.com
security.kerberos.login.realm: EXAMPLE.COM
security.kerberos.login.contexts: KafkaClient

The file /home/flink-1.8.0/conf/flink.keytab is in place.


Best
Aven



Re: Kerberos authentication for SQL CLI

2020-03-24 Thread Gyula Fóra
Thanks Dawid,

I think you are right that most things should work just fine like this.
Maybe some catalogs will need this at some point, but not right now.
I was just wondering why this is different from how the CliFrontend works,
which also installs the security context on the client side.

I guess the same arguments should apply to the SQL CLI, whatever they might be
:)

Gyula

On Tue, Mar 24, 2020 at 12:30 PM Dawid Wysakowicz 
wrote:

> Hi Gyula,
>
> As far as I can tell SQL cli does not support Kerberos natively. SQL CLI
> submits all the queries to a running Flink cluster. Therefore if you
> kerberize the cluster the queries will use that configuration.
>
> On a different note. Out of curiosity. What would you expect the SQL CLI
> to use the Kerberos authentication for?
>
> Best,
>
> Dawid
>
> On 24/03/2020 11:11, Gyula Fóra wrote:
> > Hi!
> >
> > Does the SQL CLI support Kerberos Authentication?
> >
> > I am struggling to find any use of the SecurityContext in the SQL CLI
> > logic but maybe I am looking in the wrong place.
> >
> > Thank you!
> > Gyula
>
>


Re: How to calculate one alarm strategy for each device or one alarm strategy for each type of IOT device

2020-03-24 Thread Dawid Wysakowicz
Hi,

Could you elaborate a bit more on what you want to achieve? What have
you tried so far? Could you share some code with us? What problems are
you facing?

From the vague description you provided, you should be able to design it
with e.g. a KeyedProcessFunction [1].

Best,

Dawid

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/process_function.html#the-keyedprocessfunction
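
To illustrate the per-key idea behind that suggestion, here is a plain-Java stand-in, not Flink code. It assumes a hypothetical rule (alarm after N consecutive readings above a threshold) that is not from this thread, and it uses a HashMap where a Flink job would use keyBy on the device id plus keyed state inside a KeyedProcessFunction.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java stand-in for a per-device alarm rule; the rule itself is a made-up example. */
final class PerDeviceAlarm {
    private final double threshold;
    private final int requiredConsecutive;
    // Stand-in for keyed state: one counter per device id.
    private final Map<String, Integer> consecutiveByDevice = new HashMap<>();

    PerDeviceAlarm(double threshold, int requiredConsecutive) {
        this.threshold = threshold;
        this.requiredConsecutive = requiredConsecutive;
    }

    /** Returns true when this reading triggers the alarm for the given device. */
    boolean onReading(String deviceId, double value) {
        if (value <= threshold) {
            consecutiveByDevice.remove(deviceId); // reset the streak for this device only
            return false;
        }
        int count = consecutiveByDevice.merge(deviceId, 1, Integer::sum);
        return count >= requiredConsecutive;
    }

    public static void main(String[] args) {
        PerDeviceAlarm alarm = new PerDeviceAlarm(50.0, 2);
        System.out.println(alarm.onReading("device-1", 60.0)); // false: first reading above threshold
        System.out.println(alarm.onReading("device-2", 70.0)); // false: separate counter per device
        System.out.println(alarm.onReading("device-1", 61.0)); // true: second consecutive reading
    }
}
```

A rule per device type instead of per device would only change the key (type instead of id); the state handling stays the same.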

On 24/03/2020 03:57, yang xu wrote:
> Hi,
> How to calculate one alarm strategy for each device or one alarm strategy
> for each type of IoT device?
> My way is:
> 1. Use ListState to store device state data for calculation
>
>
>
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



signature.asc
Description: OpenPGP digital signature


Re: Issues with Watermark generation after join

2020-03-24 Thread Timo Walther
Or better: "But for sources, you need to emit a watermark from all 
sources in order to have progress in event-time."



On 24.03.20 13:09, Timo Walther wrote:

Hi,

1) yes with "partition" I meant "parallel instance".

If the watermarking is correct in the DataStream API, the Table API and 
SQL will take care that it remains correct. E.g. you can only perform a 
TUMBLE window if the timestamp column has not lost its time attribute 
property. A regular JOIN (not time-versioned) does not work with 
watermarks; thus, the result will not have time attributes anymore. A 
subsequent TUMBLE window usage will fail with an exception.


2) You don't need output. Most operators deal with watermarking logic. 
But for sources, you need output from all sources in order to have 
progress in event-time.


Regards,
Timo


On 24.03.20 12:21, Dominik Wosiński wrote:

Hey Timo,
Thanks a lot for this answer! I was mostly using the DataStream API, 
so that's good to know the difference.

I have followup questions then, I will be glad for clarification:

1) So, for the SQL Join operator, is the /partition /the parallel 
instance of operator or is it the table partitioning as defined by 
/partitionBy ??/
2) Assuming that this is instance of parallel operator, does this mean 
that we need output from ALL operators so that the watermark 
progresses and the output is generated?


Best Regards,
Dom.

On Tue, 24 Mar 2020 at 10:01, Timo Walther wrote:


    Hi Dominik,

    the big conceptual difference between DataStream and Table API is that
    record timestamps are part of the schema in Table API whereas they are
    attached internally to each record in DataStream API. When you call
    `y.rowtime` during a stream to table conversion, the runtime will
    extract the internal timestamp and will copy it into the field `y`.

    Even if the timestamp is not internally anymore, Flink makes sure that
    the watermarking (which still happens internally) remains valid.
    However, this means that timestamps and watermarks must already be
    correct when entering the Table API. If they were not correct before,
    they will also not trigger time-based operations correctly.

    If there is no output for a parallelism > 1, usually this means that one
    source partition has not emitted a watermark to have progress globally
    for the job:

    watermark of operator = min(previous operator partition 1, previous
    operator partition 2, ...)

    I hope this helps.

    Regards,
    Timo


    On 19.03.20 16:38, Dominik Wosiński wrote:
     > I have created a simple minimal reproducible example that shows what I
     > am talking about:
     > https://github.com/DomWos/FlinkTTF/tree/sql-ttf
     >
     > It contains a test that shows that even if the output is in order which
     > is enforced by multiple sleeps, then for parallelism > 1 there is no
     > output and for parallelism == 1, the output is produced normally.
 >
 > Best Regards,
 > Dom.





Re: Issues with Watermark generation after join

2020-03-24 Thread Timo Walther

Hi,

1) yes with "partition" I meant "parallel instance".

If the watermarking is correct in the DataStream API, the Table API and 
SQL will take care that it remains correct. E.g. you can only perform a 
TUMBLE window if the timestamp column has not lost its time attribute 
property. A regular JOIN (not time-versioned) does not work with 
watermarks; thus, the result will not have time attributes anymore. A 
subsequent TUMBLE window usage will fail with an exception.


2) You don't need output. Most operators deal with watermarking logic. 
But for sources, you need output from all sources in order to have 
progress in event-time.
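
To make the min() rule from the quoted explanation concrete, here is a tiny plain-Java sketch, not Flink code. It assumes that a parallel source instance which has not emitted any watermark yet is represented by Long.MIN_VALUE; the class and method names are invented for this example.

```java
import java.util.Arrays;

/** Sketch of the rule "watermark of operator = min over all input partitions". */
final class MinWatermark {

    /** The watermark an operator can forward is the minimum of its inputs' watermarks. */
    static long combine(long... inputWatermarks) {
        return Arrays.stream(inputWatermarks).min().orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        // Assumption of this sketch: "no watermark emitted yet" is Long.MIN_VALUE.
        long silentSource = Long.MIN_VALUE;

        // Both inputs have emitted watermarks: event time advances to the smaller one.
        System.out.println(combine(1_000L, 2_000L)); // prints 1000

        // One input never emits a watermark: event time cannot advance at all.
        System.out.println(combine(1_000L, silentSource) == Long.MIN_VALUE); // prints true
    }
}
```

This is why a single silent parallel source instance is enough to keep time-based operators from producing output at parallelism > 1.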


Regards,
Timo


On 24.03.20 12:21, Dominik Wosiński wrote:

Hey Timo,
Thanks a lot for this answer! I was mostly using the DataStream API, so 
that's good to know the difference.

I have followup questions then, I will be glad for clarification:

1) So, for the SQL Join operator, is the /partition /the parallel 
instance of operator or is it the table partitioning as defined by 
/partitionBy ??/
2) Assuming that this is instance of parallel operator, does this mean 
that we need output from ALL operators so that the watermark progresses 
and the output is generated?


Best Regards,
Dom.

On Tue, 24 Mar 2020 at 10:01, Timo Walther wrote:


Hi Dominik,

the big conceptual difference between DataStream and Table API is that
record timestamps are part of the schema in Table API whereas they are
attached internally to each record in DataStream API. When you call
`y.rowtime` during a stream to table conversion, the runtime will
extract the internal timestamp and will copy it into the field `y`.

Even if the timestamp is not internally anymore, Flink makes sure that
the watermarking (which still happens internally) remains valid.
However, this means that timestamps and watermarks must already be
correct when entering the Table API. If they were not correct before,
they will also not trigger time-based operations correctly.

If there is no output for a parallelism > 1, usually this means that one
source partition has not emitted a watermark to have progress globally
for the job:

watermark of operator = min(previous operator partition 1, previous
operator partition 2, ...)

I hope this helps.

Regards,
Timo


On 19.03.20 16:38, Dominik Wosiński wrote:
 > I have created a simple minimal reproducible example that shows what I
 > am talking about:
 > https://github.com/DomWos/FlinkTTF/tree/sql-ttf
 >
 > It contains a test that shows that even if the output is in order which
 > is enforced by multiple sleeps, then for parallelism > 1 there is no
 > output and for parallelism == 1, the output is produced normally.
 >
 > Best Regards,
 > Dom.





How to increase a job's CPU utilization

2020-03-24 Thread yanggang_it_job
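hi:
   Background: the cluster is running low on free cores, so I reviewed some of the large jobs.

   I used PromQL, max(flink_taskmanager_Status_JVM_CPU_Load{job_name={job_name}}), to get the CPU usage of a given job,
   and found that CPU usage is generally low: for a container with 10 slots, it is mostly below 6%.

   In my tests I then reduced the number of slots per container, but CPU usage did not rise linearly; likewise, increasing the number of slots did not lower it linearly.

   Is there something wrong with my test? Or do you have any suggestions on how to approach this?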




  
 

Re: Importance of calling mapState.clear() when no entries in mapState

2020-03-24 Thread Dawid Wysakowicz
I think there should be no reason to do that.

Best,

Dawid

On 24/03/2020 09:29, Ilya Karpov wrote:
> Hi,
>
> given: 
> - flink 1.6.1
> - stateful function with MapState mapState = //init logic;
>
> Is there any reason I should call mapState.clear() if I know beforehand that 
> there are no entries in mapState (e.g. mapState.iterator().hasNext() returns 
> false)?
>
> Thanks in advance!





Re: Bug in the JDBC sink parameter connector.write.max-retries with Oracle

2020-03-24 Thread Leonard Xu
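Hi, xinghalo
This is a known bug in the JDBC sink's AppendOnlyWriter. It has been fixed in 1.10.1 [1], and the community is currently preparing the 1.10.1 release,
so I suggest upgrading once 1.10.1 is out.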

Best,
Leonard

[1]https://issues.apache.org/jira/browse/FLINK-16281 


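> On March 24, 2020, at 18:32, 111 wrote:
> 
> Hi,
> When using the JDBC sink with the Oracle driver underneath, I ran into a bug.
> Symptom: with max-retries set to 1, the job fails with an error as expected; with max-retries greater than 1, the job always finishes normally even though an error occurs inside the program.
> 
> 
> The flush() method in JDBCUpsertOutputFormat.java implements a retry mechanism: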
> public synchronized void flush() throws Exception {
>   checkFlushException();
> 
>   for (int i = 1; i <= maxRetryTimes; i++) {
> try {
> jdbcWriter.executeBatch();
> batchCount = 0;
> break;
> } catch (SQLException e) {
> LOG.error("JDBC executeBatch error, retry times = {}", i, e);
> if (i >= maxRetryTimes) {
> //throw e;
> System.exit(-1);
> }
> Thread.sleep(1000 * i);
> }
>   }
> }
> However, when executeBatch throws, the driver enters its exception handler and clears the rank information (line 5339 of ojdbc14):
> } catch (SQLException var17) {
> this.clearBatch();
>this.needToParse = true;
>if (this.sqlKind != 1 && this.sqlKind != 4) {
> for(var3 = 0; var3 < var4.length; ++var3) {
>var4[var3] = -3;
> }
>}
> 
>DatabaseError.throwBatchUpdateException(var17, this.sqlKind != 1 && 
> this.sqlKind != 4 ? var4.length : var3, var4);
> } finally {
> On the next call to executeBatch, the rank is 0, so the insert is skipped and the call returns successfully.
> public int[] executeBatch() throws SQLException {
> synchronized(this.connection) {
> int[] var1;
>synchronized(this) {
> int var3 = 0;
>this.setJdbcBatchStyle();
>int[] var4 = new int[this.currentRank];
>if (this.currentRank > 0) {
> this.ensureOpen();
> As a result, the second attempt skips the insert and returns successfully.
> 
> 
> 
> 
> 



Re: Kerberos authentication for SQL CLI

2020-03-24 Thread Dawid Wysakowicz
Hi Gyula,

As far as I can tell, the SQL CLI does not support Kerberos natively. The SQL CLI
submits all queries to a running Flink cluster. Therefore, if you
kerberize the cluster, the queries will use that configuration.

On a different note, out of curiosity: what would you expect the SQL CLI
to use Kerberos authentication for?

Best,

Dawid

On 24/03/2020 11:11, Gyula Fóra wrote:
> Hi!
>
> Does the SQL CLI support Kerberos Authentication?
>
> I am struggling to find any use of the SecurityContext in the SQL CLI
> logic but maybe I am looking in the wrong place.
>
> Thank you!
> Gyula





Re: Issues with Watermark generation after join

2020-03-24 Thread Dominik Wosiński
Hey Timo,
Thanks a lot for this answer! I was mostly using the DataStream API, so
that's good to know the difference.
I have followup questions then, I will be glad for clarification:

1) So, for the SQL Join operator, is the *partition *the parallel instance
of operator or is it the table partitioning as defined by *partitionBy ??*
2) Assuming that this is instance of parallel operator, does this mean that
we need output from ALL operators so that the watermark progresses and the
output is generated?

Best Regards,
Dom.

On Tue, 24 Mar 2020 at 10:01, Timo Walther wrote:

> Hi Dominik,
>
> the big conceptual difference between DataStream and Table API is that
> record timestamps are part of the schema in Table API whereas they are
> attached internally to each record in DataStream API. When you call
> `y.rowtime` during a stream to table conversion, the runtime will
> extract the internal timestamp and will copy it into the field `y`.
>
> Even if the timestamp is not internally anymore, Flink makes sure that
> the watermarking (which still happens internally) remains valid.
> However, this means that timestamps and watermarks must already be
> correct when entering the Table API. If they were not correct before,
> they will also not trigger time-based operations correctly.
>
> If there is no output for a parallelism > 1, usually this means that one
> source partition has not emitted a watermark to have progress globally
> for the job:
>
> watermark of operator = min(previous operator partition 1, previous
> operator partition 2, ...)
>
> I hope this helps.
>
> Regards,
> Timo
>
>
> On 19.03.20 16:38, Dominik Wosiński wrote:
> > I have created a simple minimal reproducible example that shows what I
> > am talking about:
> > https://github.com/DomWos/FlinkTTF/tree/sql-ttf
> >
> > It contains a test that shows that even if the output is in order which
> > is enforced by multiple sleeps, then for parallelism > 1 there is no
> > output and for parallelism == 1, the output is produced normally.
> >
> > Best Regards,
> > Dom.
>
>


Re: Object has non serializable fields

2020-03-24 Thread Kostas Kloudas
Hi Eyal,

This is a known issue which is fixed now (see [1]) and will be part of
the next releases.

Cheers,
Kostas

[1] https://issues.apache.org/jira/browse/FLINK-16371

On Tue, Mar 24, 2020 at 11:10 AM Eyal Pe'er  wrote:
>
> Hi all,
>
> I am trying to write a sink function that retrieves string and creates 
> compressed files in time buckets.
>
> The code is pretty straight forward and based on CompressWriterFactoryTest
>
>
>
> import org.apache.flink.core.fs.Path;
>
> import org.apache.flink.formats.compress.CompressWriterFactory;
>
> import org.apache.flink.formats.compress.extractor.DefaultExtractor;
>
> import org.apache.flink.streaming.api.functions.sink.SinkFunction;
>
> import 
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
>
> import 
> org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
>
> import org.apache.flink.formats.compress.CompressWriters;
>
> import org.apache.hadoop.conf.Configuration;
>
>
>
> CompressWriterFactory writer = CompressWriters.forExtractor(new 
> DefaultExtractor())
>
> .withHadoopCompression("Gzip", new Configuration());
>
>
>
> StreamingFileSink.forBulkFormat(new Path(getTargetPath()), writer)
>
> .withBucketAssigner(new 
> DateTimeBucketAssigner<>(getDataTimeBucketFormat(getDataTimeBucket(.build();
>
>
>
>
>
>
>
> When I tried to add it as a sink (dataStream.addSink) the app crashed due to:
>
>
>
> org.apache.hadoop.io.compress.GzipCodec@55e3d6c3 is not serializable. The 
> object probably contains or references non serializable fields.
>
>
>
> Well, I guess I used something wrong, but I am not sure what ?
>
> Or maybe I should convert the SinkFunction to serializable one, but how can I 
> do it?
>
> Best regards
>
> Eyal Peer
>
>


Bug in the JDBC sink parameter connector.write.max-retries with Oracle

2020-03-24 Thread 111
Hi,
在使用jdbc sink时,底层使用oracle驱动会出现bug。
出现的现象:当max-retries参数设置为1时,任务能正常报错;当max-retries参数大于1时,虽然程序内部报错,但是任务总是正常结束。


在JDBCUpsertOutputFormat.java中的flush()方法中,设计了重试机制:
public synchronized void flush() throws Exception {
   checkFlushException();

   for (int i = 1; i <= maxRetryTimes; i++) {
try {
jdbcWriter.executeBatch();
batchCount = 0;
 break;
} catch (SQLException e) {
LOG.error("JDBC executeBatch error, retry times = {}", i, e);
 if (i >= maxRetryTimes) {
//throw e;
System.exit(-1);
}
 Thread.sleep(1000 * i);
}
   }
}
但是当executeBatch出现异常时,会进入异常捕获,并清空rank信息(ojdbc14的5339行):
} catch (SQLException var17) {
this.clearBatch();
this.needToParse = true;
if (this.sqlKind != 1 && this.sqlKind != 4) {
for(var3 = 0; var3 < var4.length; ++var3) {
var4[var3] = -3;
}
}

DatabaseError.throwBatchUpdateException(var17, this.sqlKind != 1 && 
this.sqlKind != 4 ? var4.length : var3, var4);
} finally {
下一次执行executeBatch时,由于rank为0,会直接跳过插入操作,返回成功。
public int[] executeBatch() throws SQLException {
synchronized(this.connection) {
int[] var1;
synchronized(this) {
int var3 = 0;
this.setJdbcBatchStyle();
int[] var4 = new int[this.currentRank];
if (this.currentRank > 0) {
this.ensureOpen();
As a result, the second retry skips the insert and returns successfully.
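The failure mode described above can be reproduced without a database. The following is only a sketch under stated assumptions: `ClearingBatch` is a hypothetical stand-in for a driver statement that empties its batch when `executeBatch` fails, and `bufferedFlush` shows one possible mitigation (keep the rows on the caller side and re-add them before every attempt). It is not the Flink or Oracle code, and not necessarily how the issue was fixed upstream.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class RetryBatchSketch {

    /** Hypothetical statement whose driver clears the batch when executeBatch fails. */
    static class ClearingBatch {
        private final List<String> pending = new ArrayList<>();
        final List<String> written = new ArrayList<>();
        private int failuresLeft;

        ClearingBatch(int failuresLeft) {
            this.failuresLeft = failuresLeft;
        }

        void addBatch(String row) {
            pending.add(row);
        }

        void clearBatch() {
            pending.clear();
        }

        /** Returns the number of rows written; an empty batch "succeeds" with 0. */
        int executeBatch() {
            if (pending.isEmpty()) {
                return 0;
            }
            if (failuresLeft > 0) {
                failuresLeft--;
                pending.clear(); // mimics the driver clearing its batch on error
                throw new IllegalStateException("simulated batch failure");
            }
            int n = pending.size();
            written.addAll(pending);
            pending.clear();
            return n;
        }
    }

    /** Naive retry: only re-invokes executeBatch, so a cleared batch looks like success. */
    static int naiveFlush(ClearingBatch batch, List<String> rows, int maxRetries) {
        for (String row : rows) {
            batch.addBatch(row);
        }
        for (int i = 1; i <= maxRetries; i++) {
            try {
                return batch.executeBatch();
            } catch (IllegalStateException e) {
                if (i >= maxRetries) {
                    throw e;
                }
            }
        }
        return 0;
    }

    /** Mitigation sketch: rebuild the batch from caller-side rows before each attempt. */
    static int bufferedFlush(ClearingBatch batch, List<String> rows, int maxRetries) {
        for (int i = 1; i <= maxRetries; i++) {
            try {
                batch.clearBatch(); // avoid duplicates if the driver kept the batch
                for (String row : rows) {
                    batch.addBatch(row);
                }
                return batch.executeBatch();
            } catch (IllegalStateException e) {
                if (i >= maxRetries) {
                    throw e;
                }
            }
        }
        return 0;
    }

    public static void main(String[] args) {
        List<String> rows = Arrays.asList("r1", "r2");
        System.out.println("naive rows written: " + naiveFlush(new ClearingBatch(1), rows, 3));
        System.out.println("buffered rows written: " + bufferedFlush(new ClearingBatch(1), rows, 3));
    }
}
```

With one simulated failure, the naive loop reports success while writing nothing, whereas the buffered variant writes both rows on the second attempt.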







Re: How to handle timestamps in Flink SQL

2020-03-24 Thread Leonard Xu
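Hi, 吴志勇
Your SQL table definition looks fine. The problem is that Flink's JSON format currently follows the
RFC 3339 standard [1]; the timestamp format it recognizes is "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", and parsing
a long as a timestamp is not supported yet. You can convert the timestamp to that format when writing to Kafka:
DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
dateFormat.setTimeZone(TimeZone.getTimeZone("UTC")); // the literal 'Z' denotes UTC
Date date = new Date(System.currentTimeMillis());
String jsonSchemaDate = dateFormat.format(date);
// {"id":5,"price":40,"timestamp":"2020-03-24T18:23:38.123Z","type":"math"}
The community already has an issue [2] tracking this; that issue will add support for converting long to timestamp.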
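For reference, the same conversion can also be sketched with java.time, which avoids the thread-safety pitfalls of SimpleDateFormat. This is only an illustration: the class and method names are hypothetical, and the input is the 13-digit epoch-millisecond value from the original question.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

class EpochMillisToRfc3339 {

    // Formats in UTC so that the literal 'Z' suffix is accurate.
    private static final DateTimeFormatter FORMATTER =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'").withZone(ZoneOffset.UTC);

    /** Converts epoch milliseconds to a string such as 2020-03-23T05:50:26.828Z. */
    static String format(long epochMillis) {
        return FORMATTER.format(Instant.ofEpochMilli(epochMillis));
    }

    public static void main(String[] args) {
        System.out.println(format(1584942626828L));
    }
}
```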

Also, please send questions written in Chinese to the user-zh mailing list, and use English on the user mailing list ^_^

Best,
Leonard

[1]https://json-schema.org/understanding-json-schema/reference/string.html#dates-and-times
 

[2]https://issues.apache.org/jira/browse/FLINK-16725 


> On Mar 23, 2020, at 15:35, 吴志勇 <1154365...@qq.com> wrote:
> 
> As the subject says:
> I wrote JSON data to Kafka:
> {"id":5,"price":40,"timestamp":1584942626828,"type":"math"}
> {"id":2,"price":70,"timestamp":1584942629638,"type":"math"}
> {"id":2,"price":70,"timestamp":1584942634951,"type":"math"}
> 
> The timestamp field is a 13-digit timestamp. How should the corresponding SQL table turn it into a time type?
>   - name: bookpojo
>     type: source-table
>     connector:
>       property-version: 1
>       type: kafka
>       version: "universal"
>       topic: pojosource
>       startup-mode: earliest-offset
>       properties:
>         zookeeper.connect: localhost:2181
>         bootstrap.servers: localhost:9092
>         group.id: testGroup
>     format:
>       property-version: 1
>       type: json
>       schema: "ROW"
>     schema:
>       - name: id
>         data-type: INT
>       - name: type
>         data-type: STRING
>       - name: price
>         data-type: INT
>       - name: timestamp
>         data-type: TIMESTAMP(3)
> 
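> The configuration above seems to be wrong.
> 
> I found this statement on the official website:
> String and time types: time types must be formatted according to the Java SQL time format, with
> millisecond precision. For example: 2018-01-01 for a date, 20:43:59 for a time, and 2018-01-01 20:43:59.999 for a timestamp.
> Does the time have to be a string, and does it have to include milliseconds?
> 
> Thanks.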
> 



RE: Need help on timestamp type conversion for Table API on Pravega Connector

2020-03-24 Thread B.Zhou
Hi,

Thanks for the information. I replied in the comment of this issue: 
https://issues.apache.org/jira/browse/FLINK-16693?focusedCommentId=17065486&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17065486
 

Best Regards,
Brian

-Original Message-
From: Timo Walther  
Sent: Tuesday, March 24, 2020 16:40
To: Zhou, Brian; imj...@gmail.com
Cc: user@flink.apache.org
Subject: Re: Need help on timestamp type conversion for Table API on Pravega 
Connector


[EXTERNAL EMAIL] 

This issue is tracked under:

https://issues.apache.org/jira/browse/FLINK-16693

Could you provide us a little reproducible example in the issue? I think that 
could help us in resolving this issue quickly in the next minor release.

Thanks,
Timo


On 20.03.20 03:28, b.z...@dell.com wrote:
> Hi,
> 
> Thanks for the reference, Jark. In Pravega connector, user will define 
> Schema first and then create the table with the descriptor using the 
> schema, see [1] and error also came from this test case. We also tried 
> the recommended `bridgedTo(Timestamp.class)` method in the schema 
> construction, it came with the same error stack trace.
> 
> We are also considering switching to Blink planner implementation, do 
> you think we can get this issue fixed with the change?
> 
> Here is the full stacktrace:
> 
> ```
> 
> org.apache.flink.table.codegen.CodeGenException: Unsupported cast from 'LocalDateTime' to 'Long'.
>     at org.apache.flink.table.codegen.calls.ScalarOperators$.generateCast(ScalarOperators.scala:815)
>     at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:941)
>     at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:66)
>     at org.apache.calcite.rex.RexCall.accept(RexCall.java:191)
>     at org.apache.flink.table.codegen.CodeGenerator.$anonfun$visitCall$1(CodeGenerator.scala:752)
>     at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
>     at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58)
>     at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>     at scala.collection.TraversableLike.map(TraversableLike.scala:233)
>     at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
>     at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>     at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:742)
>     at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:66)
>     at org.apache.calcite.rex.RexCall.accept(RexCall.java:191)
>     at org.apache.flink.table.codegen.CodeGenerator.generateExpression(CodeGenerator.scala:247)
>     at org.apache.flink.table.codegen.CodeGenerator.$anonfun$generateConverterResultExpression$1(CodeGenerator.scala:273)
>     at org.apache.flink.table.codegen.CodeGenerator.$anonfun$generateConverterResultExpression$1$adapted(CodeGenerator.scala:269)
>     at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
>     at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:32)
>     at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:29)
>     at scala.collection.mutable.ArrayOps$ofInt.foreach(ArrayOps.scala:242)
>     at scala.collection.TraversableLike.map(TraversableLike.scala:233)
>     at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
>     at scala.collection.mutable.ArrayOps$ofInt.map(ArrayOps.scala:242)
>     at org.apache.flink.table.codegen.CodeGenerator.generateConverterResultExpression(CodeGenerator.scala:269)
>     at org.apache.flink.table.plan.nodes.dataset.BatchScan.generateConversionMapper(BatchScan.scala:95)
>     at org.apache.flink.table.plan.nodes.dataset.BatchScan.convertToInternalRow(BatchScan.scala:59)
>     at org.apache.flink.table.plan.nodes.dataset.BatchScan.convertToInternalRow$(BatchScan.scala:35)
>     at org.apache.flink.table.plan.nodes.dataset.BatchTableSourceScan.convertToInternalRow(BatchTableSourceScan.scala:45)
>     at org.apache.flink.table.plan.nodes.dataset.BatchTableSourceScan.translateToPlan(BatchTableSourceScan.scala:165)
>     at org.apache.flink.table.plan.nodes.dataset.DataSetWindowAggregate.translateToPlan(DataSetWindowAggregate.scala:114)
> 
>     

Kerberos authentication for SQL CLI

2020-03-24 Thread Gyula Fóra
Hi!

Does the SQL CLI support Kerberos Authentication?

I am struggling to find any use of the SecurityContext in the SQL CLI logic
but maybe I am looking in the wrong place.

Thank you!
Gyula


Object has non serializable fields

2020-03-24 Thread Eyal Pe'er
Hi all,
I am trying to write a sink function that receives strings and writes 
compressed files into time buckets.
The code is pretty straightforward and based on CompressWriterFactoryTest:


import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.compress.CompressWriterFactory;
import org.apache.flink.formats.compress.extractor.DefaultExtractor;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import 
org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.formats.compress.CompressWriters;
import org.apache.hadoop.conf.Configuration;

CompressWriterFactory writer = CompressWriters.forExtractor(new DefaultExtractor())
    .withHadoopCompression("Gzip", new Configuration());

StreamingFileSink.forBulkFormat(new Path(getTargetPath()), writer)
    .withBucketAssigner(new DateTimeBucketAssigner<>(getDataTimeBucketFormat(getDataTimeBucket())))
    .build();



When I tried to add it as a sink (dataStream.addSink) the app crashed due to:

org.apache.hadoop.io.compress.GzipCodec@55e3d6c3 is not serializable. The 
object probably contains or references non serializable fields.

I guess I am using something incorrectly, but I am not sure what.
Or maybe I should convert the SinkFunction to a serializable one, but how can I 
do that?
Best regards
Eyal Peer



Re: ddl es error

2020-03-24 Thread Leonard Xu
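Hi, 出发
It looks like you are missing the dependency on the ES connector [1], so only the built-in filesystem
connector can be found. The built-in filesystem connector currently supports only the csv format, hence this error.
Adding the missing dependencies to your project should fix it. If you use the SQL CLI, you also need to put the dependency jars into Flink's lib directory.

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-sql-connector-elasticsearch6_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>${flink.version}</version>
</dependency>

Best,
Leonard
[1] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#elasticsearch-connector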
 



> On Mar 23, 2020, at 23:30, 出发 <573693...@qq.com> wrote:
> 
> 
> Source code as follows:
> CREATE TABLE buy_cnt_per_hour ( 
> hour_of_day BIGINT,
> buy_cnt BIGINT
> ) WITH (
> 'connector.type' = 'elasticsearch',
> 'connector.version' = '6',
> 'connector.hosts' = 'http://localhost:9200',
> 'connector.index' = 'buy_cnt_per_hour',
> 'connector.document-type' = 'user_behavior',
> 'connector.bulk-flush.max-actions' = '1',
> 'format.type' = 'json',
> 'update-mode' = 'append'
> )
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.java.StreamTableEnvironment;
> import org.apache.flink.types.Row;
> 
> public class ESTest {
> 
> public static void main(String[] args) throws Exception {
> 
> //2. Set up the execution environment
> StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
> EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);
> streamEnv.setParallelism(1);
> String sinkDDL = " CREATE TABLE test_es ( hour_of_day BIGINT,  buy_cnt BIGINT "
> + ") WITH ( 'connector.type' = 'elasticsearch',  'connector.version' = '6',"
> + "'connector.hosts' = 'http://localhost:9200',  'connector.index' = 'buy_cnt_per_hour',"
> + "'connector.document-type' = 'user_behavior',"
> + "'connector.bulk-flush.max-actions' = '1',\n" + "'format.type' = 'json',"
> + "'update-mode' = 'append' )";
> tableEnv.sqlUpdate(sinkDDL);
> Table table = tableEnv.sqlQuery("select * from test_es ");
> tableEnv.toRetractStream(table, Row.class).print();
> streamEnv.execute("");
> }
> 
> }
> Detailed error:
> The matching candidates:
> org.apache.flink.table.sources.CsvAppendTableSourceFactory
> Mismatched properties:
> 'connector.type' expects 'filesystem', but is 'elasticsearch'
> 'format.type' expects 'csv', but is 'json'
> 
> The following properties are requested:
> connector.bulk-flush.max-actions=1
> connector.document-type=user_behavior
> connector.hosts=http://localhost:9200
> connector.index=buy_cnt_per_hour
> connector.type=elasticsearch
> connector.version=6
> format.type=json
> schema.0.data-type=BIGINT
> schema.0.name=hour_of_day
> schema.1.data-type=BIGINT
> schema.1.name=buy_cnt
> update-mode=append



ddl es error

2020-03-24 Thread 出发
Source code as follows:
CREATE TABLE buy_cnt_per_hour (
  hour_of_day BIGINT,
  buy_cnt BIGINT
) WITH (
  'connector.type' = 'elasticsearch',
  'connector.version' = '6',
  'connector.hosts' = 'http://localhost:9200',
  'connector.index' = 'buy_cnt_per_hour',
  'connector.document-type' = 'user_behavior',
  'connector.bulk-flush.max-actions' = '1',
  'format.type' = 'json',
  'update-mode' = 'append'
)

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class ESTest {

public static void main(String[] args) throws Exception {

//2. Set up the execution environment
StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);
streamEnv.setParallelism(1);
String sinkDDL = " CREATE TABLE test_es ( hour_of_day BIGINT,  buy_cnt BIGINT "
+ ") WITH ( 'connector.type' = 'elasticsearch',  'connector.version' = '6',"
+ "'connector.hosts' = 'http://localhost:9200',  'connector.index' = 'buy_cnt_per_hour',"
+ "'connector.document-type' = 'user_behavior',"
+ "'connector.bulk-flush.max-actions' = '1',\n" + "'format.type' = 'json',"
+ "'update-mode' = 'append' )";
tableEnv.sqlUpdate(sinkDDL);
Table table = tableEnv.sqlQuery("select * from test_es ");
tableEnv.toRetractStream(table, Row.class).print();
streamEnv.execute("");
}

}
Detailed error:
The matching candidates:
org.apache.flink.table.sources.CsvAppendTableSourceFactory
Mismatched properties:
'connector.type' expects 'filesystem', but is 'elasticsearch'
'format.type' expects 'csv', but is 'json'

The following properties are requested:
connector.bulk-flush.max-actions=1
connector.document-type=user_behavior
connector.hosts=http://localhost:9200
connector.index=buy_cnt_per_hour
connector.type=elasticsearch
connector.version=6
format.type=json
schema.0.data-type=BIGINT
schema.0.name=hour_of_day
schema.1.data-type=BIGINT
schema.1.name=buy_cnt
update-mode=append

ddl es error

2020-03-24 Thread 出发
The image shows the properties I used.

Re: Issues with Watermark generation after join

2020-03-24 Thread Timo Walther

Hi Dominik,

the big conceptual difference between DataStream and Table API is that 
record timestamps are part of the schema in Table API whereas they are 
attached internally to each record in DataStream API. When you call 
`y.rowtime` during a stream to table conversion, the runtime will 
extract the internal timestamp and will copy it into the field `y`.


Even if the timestamp is no longer internal, Flink makes sure that 
the watermarking (which still happens internally) remains valid. 
However, this means that timestamps and watermarks must already be 
correct when entering the Table API. If they were not correct before, 
they will also not trigger time-based operations correctly.


If there is no output for a parallelism > 1, this usually means that one 
source partition has not emitted a watermark, so the job cannot make 
progress globally:


watermark of operator = min(previous operator partition 1, previous 
operator partition 2, ...)
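The min rule above can be illustrated with a few lines of plain Java. This is only a sketch with hypothetical names, not Flink's internal implementation; it assumes that a partition which has not emitted any watermark yet is represented by Long.MIN_VALUE.

```java
class WatermarkMinSketch {

    /** Operator watermark = minimum of the latest watermarks of all input partitions. */
    static long operatorWatermark(long[] inputWatermarks) {
        long min = Long.MAX_VALUE;
        for (long watermark : inputWatermarks) {
            min = Math.min(min, watermark);
        }
        return min;
    }

    public static void main(String[] args) {
        // Both partitions have advanced: the operator follows the slower one.
        System.out.println(operatorWatermark(new long[] {1000L, 2000L}));
        // One partition never emitted a watermark: the operator cannot advance.
        System.out.println(operatorWatermark(new long[] {1000L, Long.MIN_VALUE}));
    }
}
```

With a single silent partition the result stays at Long.MIN_VALUE, so time-based operations downstream never fire, which matches the "no output for parallelism > 1" symptom.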


I hope this helps.

Regards,
Timo


On 19.03.20 16:38, Dominik Wosiński wrote:
I have created a simple minimal reproducible example that shows what I 
am talking about:

https://github.com/DomWos/FlinkTTF/tree/sql-ttf

It contains a test that shows that even if the output is in order which 
is enforced by multiple sleeps, then for parallelism > 1 there is no 
output and for parallelism == 1, the output is produced normally.


Best Regards,
Dom.




Re: A proposal on time zone settings for SQL DATE_FORMAT

2020-03-24 Thread DONG, Weike
Hi Zhenghua,

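Thanks for your reply. Since Flink already provides a time zone setting, I feel this area could be
enhanced further. Users can easily forget or miss CONVERT_TZ, so there is still a lot of room for improvement here.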

Best,
Weike

On Tue, Mar 24, 2020 at 4:20 PM Zhenghua Gao  wrote:

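> The return type of CURRENT_TIMESTAMP is TIMESTAMP (WITHOUT TIME ZONE),
> whose semantics are those of java.time.LocalDateTime.
> Its string representation does not change with the time zone (as you observed, it matches UTC+0).
>
> Your requirement can be met with CONVERT_TZ(timestamp_string, time_zone_from_string,
> time_zone_to_string).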
>
> *Best Regards,*
> *Zhenghua Gao*
>
>
> On Mon, Mar 23, 2020 at 10:12 PM DONG, Weike 
> wrote:
>
> > Hi,
> >
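> > I recently noticed that when Flink's Blink planner formats CURRENT_TIMESTAMP
> > into a string with DATE_FORMAT, it uses UTC+0 by default.
> >
> > The TableConfig class has long had a setLocalTimeZone method; after setting it to UTC+8, the
> > formatted string is still in UTC+0. Looking deeper, Flink's code generation logic for time
> > formatting (time.scala) does not take the time zone setting into account.
> >
> > Since most users are not in the UTC+0 time zone (GMT, UTC), wouldn't Flink be more user-friendly
> > if time formatting, display, and so on honored the time zone setting in TableConfig? Of course,
> > this would be an incompatible change and needs some care.
> >
> > Perhaps providing a built-in function such as DATE_FORMAT_WITH_TIMEZONE would be easier for the community to accept?
> >
> > Just a personal thought, thanks :)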
> >
>


Re: The retry mechanism of the flush method in Flink 1.10's JDBCUpsertOutputFormat does not take effect

2020-03-24 Thread Leonard Xu
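Hi, shangwen

This looks like a bug in AppendOnlyWriter [1], which has already been fixed in 1.10.1/1.11-SNAPSHOT (master).
Using 1.10.1 or the master branch should resolve it. 1.10.1 has not been released yet; as far as I know, the community is preparing the release.
If you need the fix urgently, you can refer to the code on the 1.10.1 branch [2].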

Best,
Leonard

[1]https://issues.apache.org/jira/browse/FLINK-16281 

[2]https://github.com/apache/flink/tree/release-1.10 


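> On Mar 23, 2020, at 11:05, shangwen <583767...@qq.com> wrote:
> 
> We are testing JDBC writes to PostgreSQL in our test environment, using tcpkill to simulate the connection being closed in order to check how exceptions are handled. We found that logs like the following are printed continuously: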
> 
> 
> 2020-03-20 21:16:21.247 [jdbc-upsert-output-format-thread-1] ERROR 
> org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat - JDBC 
> executeBatch error, retry times = 1
> org.postgresql.util.PSQLException: This connection has been closed.
>   at org.postgresql.jdbc.PgConnection.checkClosed(PgConnection.java:857)
>   at org.postgresql.jdbc.PgConnection.getAutoCommit(PgConnection.java:817)
>   at 
> org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:813)
>   at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:873)
>   at 
> org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1569)
>   at 
> org.apache.flink.api.java.io.jdbc.writer.AppendOnlyWriter.executeBatch(AppendOnlyWriter.java:62)
>   at 
> org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.flush(JDBCUpsertOutputFormat.java:159)
>   at 
> org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.lambda$open$0(JDBCUpsertOutputFormat.java:124)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>   at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>   at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> 
> 
> 
> Judging from the log, the closed connection leads to only one retry. Here is Flink's retry code:
> // JDBCUpsertOutputFormat.java
> public synchronized void flush() throws Exception {
>   checkFlushException();
> 
>   for (int i = 1; i <= maxRetryTimes; i++) {
>      try {
>         jdbcWriter.executeBatch();
>         batchCount = 0;
>         break;
>      } catch (SQLException e) {
>         LOG.error("JDBC executeBatch error, retry times = {}", i, e);
>         if (i >= maxRetryTimes) {
>            throw e;
>         }
>         Thread.sleep(1000 * i);
>      }
>   }
> }
> 
> 
> Remote debugging shows that on the first execution of
> JDBCUpsertOutputFormat.flush
>  - AppendOnlyWriter.executeBatch
>   ...
>   - PgConnection.getAutoCommit
> when PSQLException: This connection has been closed is thrown, batchStatements has already been cleared:
> // PgStatement.java
> private BatchResultHandler internalExecuteBatch() throws SQLException {
>   // Construct query/parameter arrays.
>   transformQueriesAndParameters();
>   // Empty arrays should be passed to toArray
>   // see http://shipilev.net/blog/2016/arrays-wisdom-ancients/
>   Query[] queries = batchStatements.toArray(new Query[0]);
>   ParameterList[] parameterLists = batchParameters.toArray(new ParameterList[0]);
>   batchStatements.clear(); // already cleared here
>   batchParameters.clear();
>   ...
>   if (connection.getAutoCommit()) { // throws the exception
>     flags |= QueryExecutor.QUERY_SUPPRESS_BEGIN;
>   }
>   ...
> }
> 
> 
> So when Flink retries and calls jdbcWriter.executeBatch a second time, batchStatements is already empty and the call returns immediately, so Flink treats it as a success. Doesn't this mean the statements were never actually executed?
> // PgStatement.java
> public int[] executeBatch() throws SQLException {
>   checkClosed();
>   closeForNextExecution();
>   if (batchStatements == null || batchStatements.isEmpty()) { // returns right here
>     return new int[0];
>   }
>   return internalExecuteBatch().getUpdateCount();
> }
> 
> 
> My current idea is that when the exception is caught, before retrying, we check whether the connection has been closed and, if so, re-open it. I'd like to hear what everyone thinks. This is the issue I filed:
> https://issues.apache.org/jira/browse/FLINK-16708



Re: Need help on timestamp type conversion for Table API on Pravega Connector

2020-03-24 Thread Timo Walther

This issue is tracked under:

https://issues.apache.org/jira/browse/FLINK-16693

Could you provide us a little reproducible example in the issue? I think 
that could help us in resolving this issue quickly in the next minor 
release.


Thanks,
Timo


On 20.03.20 03:28, b.z...@dell.com wrote:

Hi,

Thanks for the reference, Jark. In Pravega connector, user will define 
Schema first and then create the table with the descriptor using the 
schema, see [1] and error also came from this test case. We also tried 
the recommended `bridgedTo(Timestamp.class)` method in the schema 
construction, it came with the same error stack trace.


We are also considering switching to Blink planner implementation, do 
you think we can get this issue fixed with the change?


Here is the full stacktrace:

```

org.apache.flink.table.codegen.CodeGenException: Unsupported cast from 'LocalDateTime' to 'Long'.
    at org.apache.flink.table.codegen.calls.ScalarOperators$.generateCast(ScalarOperators.scala:815)
    at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:941)
    at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:66)
    at org.apache.calcite.rex.RexCall.accept(RexCall.java:191)
    at org.apache.flink.table.codegen.CodeGenerator.$anonfun$visitCall$1(CodeGenerator.scala:752)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at scala.collection.TraversableLike.map(TraversableLike.scala:233)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:742)
    at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:66)
    at org.apache.calcite.rex.RexCall.accept(RexCall.java:191)
    at org.apache.flink.table.codegen.CodeGenerator.generateExpression(CodeGenerator.scala:247)
    at org.apache.flink.table.codegen.CodeGenerator.$anonfun$generateConverterResultExpression$1(CodeGenerator.scala:273)
    at org.apache.flink.table.codegen.CodeGenerator.$anonfun$generateConverterResultExpression$1$adapted(CodeGenerator.scala:269)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
    at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:32)
    at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:29)
    at scala.collection.mutable.ArrayOps$ofInt.foreach(ArrayOps.scala:242)
    at scala.collection.TraversableLike.map(TraversableLike.scala:233)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
    at scala.collection.mutable.ArrayOps$ofInt.map(ArrayOps.scala:242)
    at org.apache.flink.table.codegen.CodeGenerator.generateConverterResultExpression(CodeGenerator.scala:269)
    at org.apache.flink.table.plan.nodes.dataset.BatchScan.generateConversionMapper(BatchScan.scala:95)
    at org.apache.flink.table.plan.nodes.dataset.BatchScan.convertToInternalRow(BatchScan.scala:59)
    at org.apache.flink.table.plan.nodes.dataset.BatchScan.convertToInternalRow$(BatchScan.scala:35)
    at org.apache.flink.table.plan.nodes.dataset.BatchTableSourceScan.convertToInternalRow(BatchTableSourceScan.scala:45)
    at org.apache.flink.table.plan.nodes.dataset.BatchTableSourceScan.translateToPlan(BatchTableSourceScan.scala:165)
    at org.apache.flink.table.plan.nodes.dataset.DataSetWindowAggregate.translateToPlan(DataSetWindowAggregate.scala:114)
    at org.apache.flink.table.plan.nodes.dataset.DataSetCalc.translateToPlan(DataSetCalc.scala:92)
    at org.apache.flink.table.api.internal.BatchTableEnvImpl.translate(BatchTableEnvImpl.scala:306)
    at org.apache.flink.table.api.internal.BatchTableEnvImpl.translate(BatchTableEnvImpl.scala:281)
    at org.apache.flink.table.api.java.internal.BatchTableEnvironmentImpl.toDataSet(BatchTableEnvironmentImpl.scala:87)
    at io.pravega.connectors.flink.FlinkPravegaTableITCase.testTableSourceBatchDescriptor(FlinkPravegaTableITCase.java:349)
    at io.pravega.connectors.flink.FlinkPravegaTableITCase.testTableSourceUsingDescriptor(FlinkPravegaTableITCase.java:246)



Re: Dynamic Flink SQL

2020-03-24 Thread Arvid Heise
Hi Krzysztof,

From my past experience as a data engineer, I can safely say that users often
underestimate the optimization potential and techniques of the systems they
use. I implemented a similar thing in the past, where I parsed up to
500 rules reading from up to 10 data sources.
The basic idea was to simply generate one big SQL query and let the SQL
optimizer figure out what to do. And as you would hope, the optimizer
ultimately figured out that it only needs to read each of the 10 sources once
and apply 50 aggregations on average to each of the datasets.

With that said, I'd start simple first:
* You want to primarily use the Table API, as that allows you to programmatically
introduce structural variance (changing rules).
* You start by registering the source as a temporary table.
* Then you add your rules as SQL through `TableEnvironment#sqlQuery`.
* Lastly, you unionAll the results.
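The "one big query" idea can be sketched without any Flink dependency. The helper below is hypothetical and simply joins per-rule SELECT statements with UNION ALL; it assumes all rules produce the same schema and read from an already registered table (here called `src`, a made-up name). In a real job, each rule could instead go through `TableEnvironment#sqlQuery` and the resulting tables be combined with the Table API's unionAll, as described in the steps above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class RuleQueryBuilder {

    /** Combines per-rule SELECT statements into a single UNION ALL query. */
    static String unionAll(List<String> ruleQueries) {
        return ruleQueries.stream().collect(Collectors.joining("\nUNION ALL\n"));
    }

    public static void main(String[] args) {
        // Hypothetical rules over a registered table named "src".
        List<String> rules = Arrays.asList(
                "SELECT 'rule1' AS rule_id, user_id FROM src WHERE amount > 100",
                "SELECT 'rule2' AS rule_id, user_id FROM src WHERE country = 'PL'");
        System.out.println(unionAll(rules));
    }
}
```

Because the rules end up in a single statement over the same table, the optimizer has a chance to share the scan of `src` across them; whether it actually does so is something to verify in the query plan.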

Then I'd run some experiments to check whether the optimizer indeed figures
out that it needs to read the source only once. The resulting code would be
minimal and easy to maintain. If the performance is not satisfying, you can
always make it more complicated.

Best,

Arvid


On Mon, Mar 23, 2020 at 7:02 PM Krzysztof Zarzycki 
wrote:

> Dear Flink community!
>
> In our company we have implemented a system that realizes the dynamic
> business rules pattern. We spoke about it during Flink Forward 2019
> https://www.youtube.com/watch?v=CyrQ5B0exqU.
> The system is a great success and we would like to improve it. Let me
> shortly mention what the system does:
> * We have a Flink job with the engine that applies business rules on
> multiple data streams. These rules find patterns in data, produce complex
> events on these patterns.
> * The engine is built on top of CoProcessFunction, the rules are
> preimplemented using state and timers.
> * The engine accepts control messages, that deliver configuration of the
> rules, and start the instances of the rules. There might be many rule
> instances with different configurations running in parallel.
> * Data streams are routed to those rules, to all instances.
>
> The *advantages* of this design are:
>   * *The performance is superb. *The key to it is that we read data from
> the Kafka topic once, deserialize once, shuffle it once (thankfully we have
> one partitioning key) and then apply over 100 rule instances needing the
> same data.
> * We are able to deploy multiple rule instances dynamically without
> starting/stopping the job.
>
> The performance is especially crucial: we have up to 500K events/s
> processed by 100 rules on fewer than 100 cores. I can't imagine having
> 100 Flink SQL queries each consuming these streams from Kafka on such a
> cluster.
>
> The main *pain points* of the design are:
> * to deploy new business rule kind, we need to predevelop the rule
> template with use of our SDK. *We can't use* *great Flink CEP*, *Flink
> SQL libraries.* Which are getting stronger every day. Flink SQL with
> MATCH_RECOGNIZE would fit perfectly for our cases.
> * The isolation of the rules is weak. There are many rules running per
> job. One fails, the whole job fails.
> * There is one set of Kafka offsets, one watermark, one checkpoint for all
> the rules.
> * We have just one distribution key. Although that can be overcome.
>
> I would like to focus on solving the *first point*. We can live with the
> rest.
>
> *Question to the community*: Do you have ideas how to make it possible to
> develop with use of Flink SQL with MATCH_RECOGNIZE?
>
> My current ideas are:
> 1. *A possibility to dynamically modify the job topology. *
> Then I imagine dynamically attaching Flink SQL jobs to the same Kafka
> sources.
>
> 2. *A possibility to save data streams internally to Flink,
> predistributed*. Then Flink SQL queries should be able to read these
> streams.
>
> The ideal imaginary solution would look that simple in use:
> CREATE TABLE my_stream(...) with (,
> cached = 'true')
> PARTITIONED BY my_partition_key
>
> (the cached table can also be a result of CREATE TABLE and INSERT INTO
> my_stream_cached SELECT ... FROM my_stream).
>
> then I can run multiple parallel Flink SQL queries reading from that
> cached table in Flink.
> These
>
> Technical implementation: Ideally, I imagine saving events in Flink state
> before they are consumed. Then implement a Flink source, that can read the
> Flink state of the state-filling job. It's a different job, I know! Of
> course it needs to run on the same Flink cluster.
> A lot of options are possible: building on top of Flink, modifying Flink
> (even keeping own fork for the time being), using an external component.
>
> In my opinion the key to the maximized performance are:
> * avoid pulling data through network from Kafka
> * avoid deserialization of messages for each of queries/ processors.
>
> Comments, ideas - Any feedback is welcome!
> Thank you!
> Krzysztof
>
> P.S.   I'm writing to both dev and users groups because I suspect I would
> need to modify Flink 

Importance of calling mapState.clear() when no entries in mapState

2020-03-24 Thread Ilya Karpov
Hi,

given: 
- flink 1.6.1
- stateful function with MapState mapState = //init logic;

Is there any reason I should call mapState.clear() if I know beforehand that 
there are no entries in mapState (e.g. mapState.iterator().hasNext() returns 
false)?

Thanks in advance!

Re: A proposal on time zone settings for SQL DATE_FORMAT

2020-03-24 Thread Zhenghua Gao
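The return type of CURRENT_TIMESTAMP is TIMESTAMP (WITHOUT TIME ZONE),
whose semantics are those of java.time.LocalDateTime.
Its string representation does not change with the time zone (as you observed, it matches UTC+0).

Your requirement can be met with CONVERT_TZ(timestamp_string, time_zone_from_string,
time_zone_to_string).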
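To make the semantics concrete, here is a small java.time sketch of what such a conversion amounts to: a zone-less timestamp string is interpreted in one zone and re-rendered in another. The class and method are hypothetical and only mirror the three-argument shape of CONVERT_TZ mentioned above; this is not Flink's implementation.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

class ConvertTzSketch {

    private static final DateTimeFormatter FORMATTER =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** Interprets a zone-less timestamp string in fromZone and renders it in toZone. */
    static String convertTz(String timestamp, String fromZone, String toZone) {
        LocalDateTime local = LocalDateTime.parse(timestamp, FORMATTER);
        return local.atZone(ZoneId.of(fromZone))
                .withZoneSameInstant(ZoneId.of(toZone))
                .format(FORMATTER);
    }

    public static void main(String[] args) {
        System.out.println(convertTz("2020-03-24 08:20:00", "UTC", "Asia/Shanghai"));
    }
}
```

The LocalDateTime itself carries no zone, which is why its string form stays the same regardless of the session time zone until a conversion like this is applied explicitly.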

*Best Regards,*
*Zhenghua Gao*


On Mon, Mar 23, 2020 at 10:12 PM DONG, Weike 
wrote:

> Hi,
>
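> I recently noticed that when Flink's Blink planner formats CURRENT_TIMESTAMP
> into a string with DATE_FORMAT, it uses UTC+0 by default.
>
> The TableConfig class has long had a setLocalTimeZone method; after setting it to UTC+8, the
> formatted string is still in UTC+0. Looking deeper, Flink's code generation logic for time
> formatting (time.scala) does not take the time zone setting into account.
>
> Since most users are not in the UTC+0 time zone (GMT, UTC), wouldn't Flink be more user-friendly
> if time formatting, display, and so on honored the time zone setting in TableConfig? Of course,
> this would be an incompatible change and needs some care.
>
> Perhaps providing a built-in function such as DATE_FORMAT_WITH_TIMEZONE would be easier for the community to accept?
>
> Just a personal thought, thanks :)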
>