Re: -yD Kerberos authentication issue

2019-12-30 Thread Terry Wang
Hi ~
This issue has been fixed in the latest code, and it should not exist in Flink 1.9 either; you can give it a try.
Best,
Terry Wang



> On Dec 31, 2019, at 14:18,  
> wrote:
> 
> Hi all,
> 
> We have a requirement to pass Kerberos authentication parameters dynamically via -yD.
> 
> I'd like to ask why this JIRA was marked as won't fix. Thanks.
> 
> https://issues.apache.org/jira/browse/FLINK-12130
> 



-yD Kerberos authentication issue

2019-12-30 Thread sllence
Hi all,

We have a requirement to pass Kerberos authentication parameters dynamically via -yD.

I'd like to ask why this JIRA was marked as won't fix. Thanks.

https://issues.apache.org/jira/browse/FLINK-12130



Re: StreamTableEnvironment.registerDatastream(): expose user-defined schema description and DeserializationSchema

2019-12-30 Thread aven.wu
Hello!
"Defining the JSONObject type as an Object type" works when the fields and types are known in advance and can be coded into the program.
If this capability were opened up, a DataStream could be registered as a streaming table without extra coding (adding new POJOs).

Best wishes,
Sent from Mail for Windows 10

From: Terry Wang
Sent: December 30, 2019 12:37
To: user-zh@flink.apache.org
Subject: Re: StreamTableEnvironment.registerDatastream(): expose user-defined
schema description and DeserializationSchema

One way to approach your requirement is to define the JSONObject type as an Object type,
register it as a table, and then use a UDTF to convert the JSONObject into a specific schema.
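
A minimal sketch of this approach (not from the original thread): it assumes fastjson's
JSONObject, a DataStream<JSONObject> named jsonStream, and a target schema of
(id STRING, amount DOUBLE); all names are illustrative.

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.functions.TableFunction;

// UDTF that flattens the generic JSONObject column into typed fields.
public class ParseJson extends TableFunction<Tuple2<String, Double>> {
    public void eval(JSONObject obj) {
        if (obj != null) {
            collect(Tuple2.of(obj.getString("id"), obj.getDoubleValue("amount")));
        }
    }
}

// Register the DataStream with a single generic column, then flatten it with the UDTF.
tEnv.registerDataStream("RawJson", jsonStream, "payload");
tEnv.registerFunction("parseJson", new ParseJson());
Table typed = tEnv.sqlQuery(
    "SELECT T.f0 AS id, T.f1 AS amount " +
    "FROM RawJson, LATERAL TABLE(parseJson(payload)) AS T(f0, f1)");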

Best,
Terry Wang



> On Dec 27, 2019, at 19:56, aven.wu wrote:
> 
> StreamTableEnvironment.registerDatastream() currently only supports POJOs or
> objects with public fields, registering a table based on the default or user-specified fields. However, some scenarios have no fixed format; for example, a DataStream of JSONObject cannot be registered as a table this way. Could a lower-level API be provided to make table registration more flexible, with the user passing in a schema description and a custom DeserializationSchema parser?
> 
> 
> Sent from Mail for Windows 10
> 




Re: Connect RocksDB which created by Flink checkpoint

2019-12-30 Thread Congxian Qiu
If you have specified the LOCAL_DIRECTORIES [1], then the LOG file will go into
the LOCAL_DIRECTORIES.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/config.html#state-backend-rocksdb-localdir
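
For reference, a small sketch (not from this thread) of setting the RocksDB local
directory programmatically, so the 'LOG' file and SST data are easy to locate on the
task managers. The paths are assumptions, and the fragment would need to be placed in
a method that declares the constructor's checked IOException.

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Checkpoints go to HDFS; the embedded RocksDB instances work under the given local path,
// which is where the 'LOG' file and SST files can then be found on each task manager.
RocksDBStateBackend backend = new RocksDBStateBackend("hdfs:///flink/checkpoints", true);
backend.setDbStoragePath("/data/flink/rocksdb"); // same effect as state.backend.rocksdb.localdir
env.setStateBackend(backend);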

Best,
Congxian


On Mon, Dec 30, 2019 at 7:03 PM, Yun Tang wrote:

> Hi Alex
>
> First of all, RocksDB is not created by the Flink checkpoint mechanism.
> RocksDB is launched once you have configured it and use keyed state, no
> matter whether you have ever enabled checkpointing.
>
> If you want to check the configuration and data in RocksDB, please log in to the
> task manager node. The RocksDB folder lies in the Flink temporary dir [1], which
> looks like flink-io-, and the configuration is located in the file named
> 'LOG' within the RocksDB directory.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#io-tmp-dirs
>
> Best
> Yun Tang
>
> --
> *From:* qq <471237...@qq.com>
> *Sent:* Monday, December 30, 2019 18:09
> *To:* user@flink.apache.org 
> *Subject:* Connect RocksDB which created by Flink checkpoint
>
> Hi all.
>
>
> How can I connect to the RocksDB instance created by Flink checkpointing? I aim to
> check the RocksDB configuration and data. Thanks very much.
>
>
>AlexFu
>
>
>
>


Re: Duplicate tasks for the same query

2019-12-30 Thread Kurt Young
BTW, you could also have a more efficient way of deduplicating the
user table by using the Top-N feature [1].

Best,
Kurt

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/sql.html#top-n
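
A sketch of what that deduplication could look like for the Users table from this
thread (blink planner required; the column names follow the earlier messages, and the
exact query is an assumption, not tested against the poster's schema):

// Keep only the latest row per userId using ROW_NUMBER(); row_num itself is not selected.
Table uniqueUsers = tEnv.sqlQuery(
    "SELECT userId, userTimestamp " +   // list any other needed columns here as well
    "FROM ( " +
    "  SELECT *, " +
    "    ROW_NUMBER() OVER (PARTITION BY userId ORDER BY userTimestamp DESC) AS row_num " +
    "  FROM Users " +
    ") " +
    "WHERE row_num = 1");
tEnv.registerTable("uniqueUsersTbl", uniqueUsers);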


On Tue, Dec 31, 2019 at 9:24 AM Jingsong Li  wrote:

> Hi RKandoji,
>
> In theory, you don't need to do something.
> First, the optimizer will optimize by doing duplicate nodes.
> Second, after SQL optimization, if the optimized plan still has duplicate
> nodes, the planner will automatically reuse them.
> There are config options to control whether we should reuse plan, their
> default value is true. So you don't need modify them.
> - table.optimizer.reuse-sub-plan-enabled
> - table.optimizer.reuse-source-enabled
>
> Best,
> Jingsong Lee
>
> On Tue, Dec 31, 2019 at 6:29 AM RKandoji  wrote:
>
>> Thanks Terry and Jingsong,
>>
>> Currently I'm on version 1.8 using the Flink planner for stream processing;
>> I'll switch to version 1.9 to try out the blink planner.
>>
>> Could you please point me to any examples (Java preferred) using
>> SubplanReuser?
>>
>> Thanks,
>> RK
>>
>> On Sun, Dec 29, 2019 at 11:32 PM Jingsong Li 
>> wrote:
>>
>>> Hi RKandoji,
>>>
>>> FYI: Blink-planner subplan reusing: [1] 1.9 available.
>>>
>>>        Join                     Join
>>>       /    \                   /    \
>>>  Filter1  Filter2   =>    Filter1  Filter2
>>>     |        |                 \    /
>>>  Project1 Project2            Project1
>>>     |        |                   |
>>>   Scan1    Scan2              Scan1
>>>
>>>
>>> [1]
>>> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/reuse/SubplanReuser.scala
>>>
>>> Best,
>>> Jingsong Lee
>>>
>>> On Mon, Dec 30, 2019 at 12:28 PM Terry Wang  wrote:
>>>
 Hi RKandoji~

 Could you provide more info about your poc environment?
 Stream or batch? Flink planner or blink planner?
 AFAIK, the blink planner has done some optimization to deal with such duplicate
 tasks for the same query. You can have a try with the blink planner:
 https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#create-a-tableenvironment

 Best,
 Terry Wang



 On Dec 30, 2019, at 03:07, RKandoji wrote:

 Hi Team,

 I'm doing a POC with flink to understand if it's a good fit for my use
 case.

 As part of the process, I need to filter duplicate items and created
 below query to get only the latest records based on timestamp. For
 instance, I have "Users" table which may contain multiple messages for the
 same "userId". So I wrote below query to get only the latest message for a
 given "userId"

 Table uniqueUsers = tEnv.sqlQuery("SELECT * FROM Users WHERE (userId,
 userTimestamp) IN (SELECT userId, MAX(userTimestamp) FROM Users GROUP BY
 userId)");

 The above query works as expected and contains only the latest users
 based on timestamp.

 The issue is when I use "uniqueUsers" table multiple times in a JOIN
 operation, I see multiple tasks in the flink dashboard for the same query
 that creates the "uniqueUsers" table. It simply creates as many tasks
 as the number of times I use the table.

 Below is the JOIN query.
 tEnv.registerTable("uniqueUsersTbl", uniqueUsers);
 Table joinOut = tEnv.sqlQuery("SELECT * FROM Car c
LEFT JOIN uniqueUsersTbl aa ON
 c.userId = aa.userId
LEFT JOIN uniqueUsersTbl ab
 ON c.ownerId = ab.userId
LEFT JOIN uniqueUsersTbl ac ON
 c.sellerId = ac.userId
LEFT JOIN uniqueUsersTbl ad
 ON c.buyerId = ad.userId");

 Could someone please help me understand how I can avoid these duplicate
 tasks?


 Thanks,
 R Kandoji



>>>
>>> --
>>> Best, Jingsong Lee
>>>
>>
>
> --
> Best, Jingsong Lee
>


Re: Flink SQL + savepoint

2019-12-30 Thread Kurt Young
 I created an issue to track this feature:
https://issues.apache.org/jira/browse/FLINK-15440

Best,
Kurt

On Tue, Dec 31, 2019 at 8:00 AM Fanbin Bu  wrote:

> Kurt,
>
> Is there any update on this or roadmap that supports savepoints with Flink
> SQL?
>
> On Sun, Nov 3, 2019 at 11:25 PM Kurt Young  wrote:
>
>> It's not possible for SQL and Table API jobs to work with savepoints yet,
>> but I
>> think this is a popular requirement and we should definitely discuss the
>> solutions
>> in the following versions.
>>
>> Best,
>> Kurt
>>
>> On Sat, Nov 2, 2019 at 7:24 AM Fanbin Bu  wrote:
>>
>>> Kurt,
>>>
>>> What do you recommend for Flink SQL to use savepoints?
>>>
>>>
>>>
>>> On Thu, Oct 31, 2019 at 12:03 AM Yun Tang  wrote:
>>>
 Hi Fanbin



 If you do not change the parallelism or add or remove operators, you
 could still use a savepoint to resume your Flink SQL jobs.



 However, as far as I know, Flink SQL might not configure the uid
 currently, and I'm pretty sure the blink branch contains this part of setting
 the uid on stream nodes. [1]



 Already CC Kurt as he could provide more detail information of this.



 [1]
 https://github.com/apache/flink/blob/blink/flink-libraries/flink-table/src/main/java/org/apache/flink/table/util/resource/StreamNodeUtil.java#L44



 Best

 Yun Tang





 *From: *Fanbin Bu 
 *Date: *Thursday, October 31, 2019 at 1:17 PM
 *To: *user 
 *Subject: *Flink SQL + savepoint



 Hi,



 it is highly recommended that we assign the uid to the operator for the
 sake of savepoint. How do we do this for Flink SQL? According to
 https://stackoverflow.com/questions/55464955/how-to-add-uid-to-operator-in-flink-table-api,
 it is not possible.



 Does that mean, I can't use savepoint to restart my program if I use
 Flink SQL?



 Thanks,



 Fanbin

>>>


Re: Duplicate tasks for the same query

2019-12-30 Thread Jingsong Li
Hi RKandoji,

In theory, you don't need to do anything.
First, the optimizer will deduplicate nodes while optimizing.
Second, after SQL optimization, if the optimized plan still has duplicate
nodes, the planner will automatically reuse them.
There are config options to control whether the plan should be reused; their
default value is true, so you don't need to modify them (a sketch follows below):
- table.optimizer.reuse-sub-plan-enabled
- table.optimizer.reuse-source-enabled
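
A small sketch (an assumption, not from the thread) of creating a blink-planner
TableEnvironment in Java and setting these options explicitly via Flink 1.9's
TableConfig#getConfiguration; the keys are the ones listed above and already default to true:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance()
    .useBlinkPlanner()
    .inStreamingMode()
    .build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

// Both options default to true; shown here only to make the behavior explicit.
tEnv.getConfig().getConfiguration().setBoolean("table.optimizer.reuse-sub-plan-enabled", true);
tEnv.getConfig().getConfiguration().setBoolean("table.optimizer.reuse-source-enabled", true);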

Best,
Jingsong Lee

On Tue, Dec 31, 2019 at 6:29 AM RKandoji  wrote:

> Thanks Terry and Jingsong,
>
> Currently I'm on version 1.8 using the Flink planner for stream processing;
> I'll switch to version 1.9 to try out the blink planner.
>
> Could you please point me to any examples (Java preferred) using
> SubplanReuser?
>
> Thanks,
> RK
>
> On Sun, Dec 29, 2019 at 11:32 PM Jingsong Li 
> wrote:
>
>> Hi RKandoji,
>>
>> FYI: Blink-planner subplan reusing: [1] 1.9 available.
>>
>>        Join                     Join
>>       /    \                   /    \
>>  Filter1  Filter2   =>    Filter1  Filter2
>>     |        |                 \    /
>>  Project1 Project2            Project1
>>     |        |                   |
>>   Scan1    Scan2              Scan1
>>
>>
>> [1]
>> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/reuse/SubplanReuser.scala
>>
>> Best,
>> Jingsong Lee
>>
>> On Mon, Dec 30, 2019 at 12:28 PM Terry Wang  wrote:
>>
>>> Hi RKandoji~
>>>
>>> Could you provide more info about your poc environment?
>>> Stream or batch? Flink planner or blink planner?
>>> AFAIK, the blink planner has done some optimization to deal with such duplicate
>>> tasks for the same query. You can have a try with the blink planner:
>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#create-a-tableenvironment
>>>
>>> Best,
>>> Terry Wang
>>>
>>>
>>>
>>> On Dec 30, 2019, at 03:07, RKandoji wrote:
>>>
>>> Hi Team,
>>>
>>> I'm doing a POC with flink to understand if it's a good fit for my use
>>> case.
>>>
>>> As part of the process, I need to filter duplicate items and created
>>> below query to get only the latest records based on timestamp. For
>>> instance, I have "Users" table which may contain multiple messages for the
>>> same "userId". So I wrote below query to get only the latest message for a
>>> given "userId"
>>>
>>> Table uniqueUsers = tEnv.sqlQuery("SELECT * FROM Users WHERE (userId,
>>> userTimestamp) IN (SELECT userId, MAX(userTimestamp) FROM Users GROUP BY
>>> userId)");
>>>
>>> The above query works as expected and contains only the latest users
>>> based on timestamp.
>>>
>>> The issue is when I use "uniqueUsers" table multiple times in a JOIN
>>> operation, I see multiple tasks in the flink dashboard for the same query
>>> that creates the "uniqueUsers" table. It simply creates as many tasks
>>> as the number of times I use the table.
>>>
>>> Below is the JOIN query.
>>> tEnv.registerTable("uniqueUsersTbl", uniqueUsers);
>>> Table joinOut = tEnv.sqlQuery("SELECT * FROM Car c
>>>LEFT JOIN uniqueUsersTbl aa ON
>>> c.userId = aa.userId
>>>LEFT JOIN uniqueUsersTbl ab
>>> ON c.ownerId = ab.userId
>>>LEFT JOIN uniqueUsersTbl ac ON
>>> c.sellerId = ac.userId
>>>LEFT JOIN uniqueUsersTbl ad
>>> ON c.buyerId = ad.userId");
>>>
>>> Could someone please help me understand how I can avoid these duplicate
>>> tasks?
>>>
>>>
>>> Thanks,
>>> R Kandoji
>>>
>>>
>>>
>>
>> --
>> Best, Jingsong Lee
>>
>

-- 
Best, Jingsong Lee


Flink Weekly | Weekly Community Update - 2019/12/31

2019-12-30 Thread zhisheng
Hi everyone,

Happy to share this week's community digest, which covers the discussion on supporting
JSON functions in Flink SQL, new activities and blog posts from the Flink community in
China, and a roundup of questions raised on the Chinese mailing list.



Flink Development

=

* [SQL] Forward Xu started a discussion on supporting JSON functions in Flink SQL; the
results and ideas of the discussion were recorded in FLIP-90 [1]

* [SQL] Jark Wu started a discussion proposing to rename the Time-windowed Join in the
Table API and SQL to Interval Join before the 1.10 release [2]

* [core] Stephan Ewen started a discussion earlier about the new TaskManager memory
configuration, FLIP-49 [3]


[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Support-JSON-functions-in-Flink-SQL-td32674.html

[2]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Correct-the-terminology-of-quot-Time-windowed-Join-quot-to-quot-Interval-Join-quot-in-Table-L-td36202.html

[3]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Some-feedback-after-trying-out-the-new-FLIP-49-memory-configurations-td36129.html



Known Issues

=

[FLINK-15421] [1.9.1, 1.10.0] GroupAggsHandler throws java.time.LocalDateTime cannot be
cast to java.sql.Timestamp [4]

[FLINK-15420] [1.10.0] Cast string to timestamp will lose precision [5]

[FLINK-15418] [1.9.1, 1.10.0] StreamExecMatchRule not set FlinkRelDistribution [6]

[FLINK-15411] [1.10.0] Planner can't prune partition on DATE/TIMESTAMP columns [7]

[FLINK-15406] [1.9.1] The savepoint written by the "State Processor API" can't be
restored by map or flatmap [8]

[FLINK-15381] [1.10.0] INSERT INTO VALUES statement fails if a cast project is applied [9]

[FLINK-15370] [1.10.0, 1.11.0] Configured write buffer manager actually does not take
effect in RocksDB's DBOptions [10]

[FLINK-15361] [1.9.1] ParquetTableSource should pass predicate in projectFields [11]

[4] https://issues.apache.org/jira/browse/FLINK-15421

[5] https://issues.apache.org/jira/browse/FLINK-15420

[6] https://issues.apache.org/jira/browse/FLINK-15418

[7] https://issues.apache.org/jira/browse/FLINK-15411

[8] https://issues.apache.org/jira/browse/FLINK-15406

[9] https://issues.apache.org/jira/browse/FLINK-15381

[10] https://issues.apache.org/jira/browse/FLINK-15370

[11] https://issues.apache.org/jira/browse/FLINK-15361



Events / Blog Posts / Misc

=

* Community DingTalk group live stream — "A Monitoring and Alerting System Based on
Apache Flink" by zhisheng

Slides download [12], video replay [13]

* Year in review | Apache Flink in 2019
  by Wu Chong (Yunxie) [14]


[12]
https://files.alicdn.com/tpsservice/61774ae8080cd6b28145c32c99ad026a.pdf

[13] https://www.bilibili.com/video/av80497878/

[14] https://mp.weixin.qq.com/s/0FsRGoBZaAnRYZ1BuzJ_CQ



Chinese Mailing List Q&A Roundup

=

* StreamTableEnvironment.registerDatastream(): exposing user-defined schema descriptions
and DeserializationSchema [15]

* In 1.8 yarn session mode each TaskManager used one CPU; after switching to 1.9.1 with
the same launch command, each slot uses one CPU [16]

* Impact of -yn and -ys on Flink 1.9 batch jobs [17]

* How to create a dynamic-index Elasticsearch table in Flink SQL 1.9.0 [18]

* Different source parallelisms causing a job to produce no output data [19]

* Flink 1.9 SQL Kafka Connector, JSON format, how to deal with non-JSON messages [20]

* Problems implementing end-to-end exactly-once from Kafka to MySQL with Flink [21]

* Rewind offset to a previous position and ensure certainty [22]

* Flink history server has no completed jobs [23]

* Dimension table joins: how to update previously joined data after the dimension table
is updated [24]

* The assigned slot was suddenly removed, causing the job to restart [25]

* Configuring HDFS for Flink checkpoints, and how to configure high availability [26]

* Exceptions encountered implementing a two-phase-commit ETL from Kafka to MySQL [27]

* Using a thin jar to replace the fat jar in yarn cluster mode [28]

* Data loss with Window ProcessFunction [29]

* Whether Flink windows are closed correctly [30]

* CEP matching of out-of-order data [31]


[15]
http://apache-flink.147419.n8.nabble.com/StreamTableEnvironment-registerDatastream-schemaDescriptionh-DeserializationSchema-tp1347.html

[16] http://apache-flink.147419.n8.nabble.com/slot-td1345.html

[17] http://apache-flink.147419.n8.nabble.com/Flink1-9-yn-ys-tp1313.html

[18]
http://apache-flink.147419.n8.nabble.com/flink-sql-1-9-0-elasticsearch-tp1342.html

[19] http://apache-flink.147419.n8.nabble.com/source-tp1336.html

[20]
http://apache-flink.147419.n8.nabble.com/Flink-1-9-SQL-Kafka-Connector-Json-format-how-to-deal-with-not-json-message-tp1335.html

[21]
http://apache-flink.147419.n8.nabble.com/Flink-Kafka-Mysql-End-To-End-Exactly-Once-tp1321.html

[22]
http://apache-flink.147419.n8.nabble.com/Rewind-offset-to-a-previous-position-and-ensure-certainty-tp1324.html

[23]
http://apache-flink.147419.n8.nabble.com/FLink-historyserver-completed-jobs-tp1320.html

[24] 

Re: Flink SQL + savepoint

2019-12-30 Thread Fanbin Bu
Kurt,

Is there any update on this or roadmap that supports savepoints with Flink
SQL?

On Sun, Nov 3, 2019 at 11:25 PM Kurt Young  wrote:

> It's not possible for SQL and Table API jobs to work with savepoints yet,
> but I
> think this is a popular requirement and we should definitely discuss the
> solutions
> in the following versions.
>
> Best,
> Kurt
>
> On Sat, Nov 2, 2019 at 7:24 AM Fanbin Bu  wrote:
>
>> Kurt,
>>
>> What do you recommend for Flink SQL to use savepoints?
>>
>>
>>
>> On Thu, Oct 31, 2019 at 12:03 AM Yun Tang  wrote:
>>
>>> Hi Fanbin
>>>
>>>
>>>
>>> If you do not change the parallelism or add or remove operators, you
>>> could still use a savepoint to resume your Flink SQL jobs.
>>>
>>>
>>>
>>> However, as far as I know, Flink SQL might not configure the uid
>>> currently, and I'm pretty sure the blink branch contains this part of setting
>>> the uid on stream nodes. [1]
>>>
>>>
>>>
>>> Already CC Kurt as he could provide more detail information of this.
>>>
>>>
>>>
>>> [1]
>>> https://github.com/apache/flink/blob/blink/flink-libraries/flink-table/src/main/java/org/apache/flink/table/util/resource/StreamNodeUtil.java#L44
>>>
>>>
>>>
>>> Best
>>>
>>> Yun Tang
>>>
>>>
>>>
>>>
>>>
>>> *From: *Fanbin Bu 
>>> *Date: *Thursday, October 31, 2019 at 1:17 PM
>>> *To: *user 
>>> *Subject: *Flink SQL + savepoint
>>>
>>>
>>>
>>> Hi,
>>>
>>>
>>>
>>> it is highly recommended that we assign the uid to the operator for the
>>> sake of savepoint. How do we do this for Flink SQL? According to
>>> https://stackoverflow.com/questions/55464955/how-to-add-uid-to-operator-in-flink-table-api,
>>> it is not possible.
>>>
>>>
>>>
>>> Does that mean, I can't use savepoint to restart my program if I use
>>> Flink SQL?
>>>
>>>
>>>
>>> Thanks,
>>>
>>>
>>>
>>> Fanbin
>>>
>>


Re: Duplicate tasks for the same query

2019-12-30 Thread RKandoji
Thanks Terry and Jingsong,

Currently I'm on version 1.8 using the Flink planner for stream processing; I'll
switch to version 1.9 to try out the blink planner.

Could you please point me to any examples (Java preferred) using
SubplanReuser?

Thanks,
RK

On Sun, Dec 29, 2019 at 11:32 PM Jingsong Li  wrote:

> Hi RKandoji,
>
> FYI: Blink-planner subplan reusing: [1] 1.9 available.
>
>        Join                     Join
>       /    \                   /    \
>  Filter1  Filter2   =>    Filter1  Filter2
>     |        |                 \    /
>  Project1 Project2            Project1
>     |        |                   |
>   Scan1    Scan2              Scan1
>
>
> [1]
> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/reuse/SubplanReuser.scala
>
> Best,
> Jingsong Lee
>
> On Mon, Dec 30, 2019 at 12:28 PM Terry Wang  wrote:
>
>> Hi RKandoji~
>>
>> Could you provide more info about your poc environment?
>> Stream or batch? Flink planner or blink planner?
>> AFAIK, the blink planner has done some optimization to deal with such duplicate
>> tasks for the same query. You can have a try with the blink planner:
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#create-a-tableenvironment
>>
>> Best,
>> Terry Wang
>>
>>
>>
>> On Dec 30, 2019, at 03:07, RKandoji wrote:
>>
>> Hi Team,
>>
>> I'm doing a POC with flink to understand if it's a good fit for my use
>> case.
>>
>> As part of the process, I need to filter duplicate items and created
>> below query to get only the latest records based on timestamp. For
>> instance, I have "Users" table which may contain multiple messages for the
>> same "userId". So I wrote below query to get only the latest message for a
>> given "userId"
>>
>> Table uniqueUsers = tEnv.sqlQuery("SELECT * FROM Users WHERE (userId,
>> userTimestamp) IN (SELECT userId, MAX(userTimestamp) FROM Users GROUP BY
>> userId)");
>>
>> The above query works as expected and contains only the latest users
>> based on timestamp.
>>
>> The issue is when I use "uniqueUsers" table multiple times in a JOIN
>> operation, I see multiple tasks in the flink dashboard for the same query
>> that creates the "uniqueUsers" table. It simply creates as many tasks
>> as the number of times I use the table.
>>
>> Below is the JOIN query.
>> tEnv.registerTable("uniqueUsersTbl", uniqueUsers);
>> Table joinOut = tEnv.sqlQuery("SELECT * FROM Car c
>>LEFT JOIN uniqueUsersTbl aa ON
>> c.userId = aa.userId
>>LEFT JOIN uniqueUsersTbl ab
>> ON c.ownerId = ab.userId
>>LEFT JOIN uniqueUsersTbl ac ON
>> c.sellerId = ac.userId
>>LEFT JOIN uniqueUsersTbl ad
>> ON c.buyerId = ad.userId");
>>
>> Could someone please help me understand how I can avoid these duplicate
>> tasks?
>>
>>
>> Thanks,
>> R Kandoji
>>
>>
>>
>
> --
> Best, Jingsong Lee
>


Re: Stateful function metrics

2019-12-30 Thread Dan Pettersson
Hi Igal, thanks for your quick response, and yes, you got my second
question right.

I'm building a small PoC around fraudulent trades and, in short, I've made the
functions fine-grained, down to the level of participantId + "::" + instrumentId
(i.e. "BankA::AMAZON").

In this flow of stock exchange messages, there are messages that tell the
market whether the instrument has opened, is being halted, or is being closed for the day.

These messages come at the instrumentId level, so I have to route each message
to all functions whose key is participantId + "::" + (the actual) instrumentId.
I had hoped to be able to get a copy of all functions from the repository,
to loop through them and dispatch, but I can't find a way to get hold of them.
Is there any way I can get them?

I haven't studied the core functionality enough, but could it be an option
to open up the repository and return a copy of the
ObjectOpenHashMap that holds all the functions? I guess it's not a common
requirement, so keeping them hidden is probably the best option.

As a workaround, I've created "Function listeners" where functions can
subscribe to a certain type of message.

For example, FnIsClosingNotifier (keyed by instrumentId) holds a
PersistedValue with all the function addresses
that subscribe to an instrument-closing message. The subscription is done
from other functions in the configuration by simply sending
a Protobuf Empty message, and when the closing message arrives, the
dispatch to the listeners is done in FnIsClosingNotifier.

Is there a better way you can think of to implement this kind of
requirement, where one message should be sent to several subscribing
functions that are not known beforehand?

Below is some code that hopefully describes my current implementation to
subscribe to a certain type of message.

The function that wants to be notified when the closing message arrives.
This function has the id participantId::InstrumentId

[image: image.png — screenshot of the subscribing function code]

And the notifier that holds all subscribers' Addresses in the
PersistedValue "listeners":
[image: image.png — screenshot of the notifier function code]
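
Since the screenshots may not come through in the archive, here is a rough sketch of
what the notifier side of this pattern could look like. It is not the original PoC
code: the class name FnIsClosingNotifier follows the description above, but the
packages (Stateful Functions Java SDK, org.apache.flink.statefun.sdk.*), the closing
message handling and the state layout are assumptions.

import java.util.HashSet;

import org.apache.flink.statefun.sdk.Address;
import org.apache.flink.statefun.sdk.Context;
import org.apache.flink.statefun.sdk.StatefulFunction;
import org.apache.flink.statefun.sdk.annotations.Persisted;
import org.apache.flink.statefun.sdk.state.PersistedValue;

public class FnIsClosingNotifier implements StatefulFunction {

    // Addresses of all functions that subscribed to this instrument's closing message.
    @Persisted
    private final PersistedValue<HashSet> listeners =
        PersistedValue.of("listeners", HashSet.class);

    @Override
    @SuppressWarnings("unchecked")
    public void invoke(Context context, Object input) {
        HashSet<Address> subscribers = listeners.getOrDefault(new HashSet<>());
        if (input instanceof com.google.protobuf.Empty) {
            // Subscription: remember the caller's address.
            subscribers.add(context.caller());
            listeners.set(subscribers);
        } else {
            // Closing message (hypothetical Protobuf type): fan it out to every subscriber.
            for (Address subscriber : subscribers) {
                context.send(subscriber, input);
            }
        }
    }
}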
Regards
Dan

On Mon, Dec 30, 2019 at 00:50, Igal Shilman wrote:

> Hi Dan,
>
> You can learn more about Flink’s metrics system at [1]
> You would be able to either set up a reporter that would export the metrics
> to an external system, or query the metrics via the REST API, or simply use
> Flink's web UI to obtain them.
>
> If I understand the second part of your question correctly - you have a
> persisted value in a base class, but a few different function types that
> derive from that base class, and you are wondering what the scope of
> that persisted value is?
> If that is the question, then the scope is bound to the function
> address (type + id) and not to the Java instance.
> So it is safe.
>
> [1] -
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/metrics.html
>
> Happy hacking,
> Igal
>
>
> On Sunday, December 29, 2019, Dan Pettersson 
> wrote:
>
>> Hi all
>>
>> I'm trying to get hold of some metrics from the functions that I have
>> created but can't find a way to get them. These are the metrics mentioned here
>> that I'm interested in:
>> https://statefun.io/deployment_operations/metrics.html
>> Any suggestions are appreciated.
>>
>> I also have a question regarding "best practice" when dealing with
>> functions that share business logic. Is it safe, storage-wise, to extend an
>> abstract class that holds the persisted values?
>>
>> Thanks in advance and Happy coding during the holidays :-)
>>
>> Regards
>> Dan
>>
>


Re: Submit high version compiled code jar to low version flink cluster?

2019-12-30 Thread Yun Tang
Hi Lei

It's better to use the SAME version when submitting jobs from the client side. Even if
the major version of Flink is the same, compatibility has not been declared to be
supported. There is a known issue due to some classes missing
'serialVersionUID'. [1]

[1] https://issues.apache.org/jira/browse/FLINK-13910

Best
Yun Tang

From: tison 
Sent: Monday, December 30, 2019 15:44
To: wangl...@geekplus.com.cn 
Cc: user 
Subject: Re: Submit high version compiled code jar to low version flink cluster?

It may fail due to incompatibility. Flink doesn't promise such
compatibility, but it MIGHT work.

Best,
tison.


wangl...@geekplus.com.cn wrote on Monday, December 30, 2019 at 3:17 PM:

The Flink cluster version is 1.8.2.
The application source code needs some features only supported in 1.9.1, so it is
compiled with the flink-1.9.1 dependency and built into a fat jar with all the
Flink dependencies.
What will happen if I submit the jar built against the higher version to the
lower-version Flink cluster?

Thanks,
Lei






Re: Connect RocksDB which created by Flink checkpoint

2019-12-30 Thread Yun Tang
Hi Alex

First of all, RocksDB is not created by the Flink checkpoint mechanism. RocksDB
is launched once you have configured it and use keyed state, no matter whether
you have ever enabled checkpointing.

If you want to check the configuration and data in RocksDB, please log in to the task
manager node. The RocksDB folder lies in the Flink temporary dir [1], which looks
like flink-io-, and the configuration is located in the file named 'LOG'
within the RocksDB directory.

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#io-tmp-dirs

Best
Yun Tang


From: qq <471237...@qq.com>
Sent: Monday, December 30, 2019 18:09
To: user@flink.apache.org 
Subject: Connect RocksDB which created by Flink checkpoint

Hi all.


How can I connect to the RocksDB instance created by Flink checkpointing? I aim to check
the RocksDB configuration and data. Thanks very much.


   AlexFu





Connect RocksDB which created by Flink checkpoint

2019-12-30 Thread qq
Hi all.


How can I connect to the RocksDB instance created by Flink checkpointing? I aim to check
the RocksDB configuration and data. Thanks very much.


   AlexFu





clean package maven dependency

2019-12-30 Thread 陈赋赟
Hi all,
I want to package the project, but when mvn clean package is
executed, it throws an exception.
Which repository should I use?
>>
[ERROR] Failed to execute goal on project flink-dist_2.11: Could not resolve 
dependencies for project org.apache.flink:flink-dist_2.11:jar:1.8-SNAPSHOT: The 
following artifacts could not be resolved: 
org.apache.flink:flink-examples-streaming-state-machine_2.11:jar:1.8-SNAPSHOT, 
org.apache.flink:flink-examples-streaming-twitter_2.11:jar:1.8-SNAPSHOT, 
org.apache.flink:flink-ml-uber_2.11:jar:1.8-SNAPSHOT: Failure to find 
org.apache.flink:flink-examples-streaming-state-machine_2.11:jar:1.8-SNAPSHOT 
in http://10.139.32.180:8080/nexus/content/groups/public was cached in the 
local repository, resolution will not be reattempted until the update interval 
of nexus has elapsed or updates are forced -> [Help 1]





Re: Exactly-once ambiguities

2019-12-30 Thread Alessandro Solimando
> Regarding event-time processing and watermarking, I have gathered that if
> an event is received late, after the allowed lateness time, it will be
> dropped, even though I think this is the antithesis of exactly-once semantics.
>
> Yes, allowed lateness is a compromise between exactly-once semantics and an
> acceptable delay for the streaming application. Flink cannot ensure that all data
> sources generate data without any lateness, and that is not in the scope of what a
> streaming system should do. If you really need exactly-once in
> event-time processing in this scenario, I suggest running a batch job later
> to consume all data sources and using that result as the credible one. For
> processing-time data, using Flink to generate a credible result is enough.
>

The default behavior is to drop late events, but you can tolerate as much
lateness as you need via `allowedLateness()` (a window parameter) and
re-trigger the window computation taking late events into account as well. Of
course, memory consumption increases as the allowed
lateness grows, and in streaming scenarios you usually go for a sensible
trade-off, as Yun Tang was mentioning. To selectively store late events for
further processing, you can use a custom `ProcessFunction` which sends late
events to a side output, and store them somewhere (e.g., HDFS).
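
For windowed aggregations there is also the built-in side-output hook for late data; a
self-contained sketch (element types, window sizes, and the print sinks are just
placeholders) could look like this:

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

public class LateDataSideOutputExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // (key, eventTimeMillis) pairs; a real job would read from Kafka or another source.
        DataStream<Tuple2<String, Long>> events = env
            .fromElements(Tuple2.of("a", 1000L), Tuple2.of("a", 2000L), Tuple2.of("b", 1500L))
            .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Long>>(Time.seconds(5)) {
                    @Override
                    public long extractTimestamp(Tuple2<String, Long> element) {
                        return element.f1;
                    }
                });

        // Events arriving after watermark + allowed lateness go to this side output
        // instead of being silently dropped.
        final OutputTag<Tuple2<String, Long>> lateTag =
            new OutputTag<Tuple2<String, Long>>("late-events") {};

        SingleOutputStreamOperator<Tuple2<String, Long>> sums = events
            .keyBy(0)
            .window(TumblingEventTimeWindows.of(Time.minutes(1)))
            .allowedLateness(Time.minutes(5))   // re-trigger windows for events up to 5 min late
            .sideOutputLateData(lateTag)        // capture anything later than that
            .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                @Override
                public Tuple2<String, Long> reduce(Tuple2<String, Long> a, Tuple2<String, Long> b) {
                    return Tuple2.of(a.f0, a.f1 + b.f1);
                }
            });

        // Store the too-late events somewhere (printed here; HDFS via StreamingFileSink in practice).
        sums.getSideOutput(lateTag).print("LATE");
        sums.print("RESULT");

        env.execute("late-data-side-output");
    }
}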

Best regards,
Alessandro


Re: An issue with low-throughput on Flink 1.8.3 running Yahoo streaming benchmarks

2019-12-30 Thread vino yang
Hi Shinhyung,

Can you compare the performance of the different Flink versions in the same
environment (or at least with the same node and framework configuration)?

I see that there are some differences in the cluster and framework configurations.
It would be better to compare in the same environment so
that we can figure out why there is a more than 4x performance difference.

WDYT?

Best,
Vino

On Mon, Dec 30, 2019 at 1:45 PM Shinhyung Yang wrote:

> Dear Flink Users,
>
> I'm running the Yahoo streaming benchmarks (the original version) [1]
> on Flink 1.8.3 and got 60K tuples per second. Because I got 282K
> tuples per second with Flink 1.1.3, I would like to ask your opinions
> on where I should look.
>
> I have been using one node for the JobManager and 10 nodes, each
> running one TaskManager.
>
> Below is my current setting for the benchmark and Flink 1.8.3:
>
> * 16 vCPUs and 24 GiB for the JobManager node
> * 32 vCPUs and 32 GiB for each TaskManager node
>
> # localConf.yaml
> kafka.partitions: 5
> process.hosts: 1
> process.cores: 32
>
> # flink-conf.yaml
> jobmanager.heap.size: 5120m
> taskmanager.heap.size: 20480m
> taskmanager.numberOfTaskSlots: 16
> parallelism.default: 1
>
> And the following is the previous settings for the benchmark and Flink
> 1.1.3:
>
> * 16 vCPUs and 24 GiB for the JobManager node and 10 TaskManager nodes
>
> #localConf.yaml
> kafka.partitions: 5
> process.hosts: 1
> process.cores: 16
>
> # flink-conf.yaml
> jobmanager.heap.mb: 1024
> taskmanager.heap.mb: 15360
> taskmanager.numberOfTaskSlots: 16
> taskmanager.memory.preallocate: false
> parallelism.default: 1
> taskmanager.network.numberOfBuffers: 6432
>
>
> Thank you and with best regards,
> Shinhyung Yang
>
> [1]: https://github.com/yahoo/streaming-benchmarks
>