Re: Lateral join not finding correlate variable

2020-11-19 Thread godfrey he
Hi Dylan,

I have reproduced your issue based on your code.
Currently Flink does not support this kind of nested correlate query pattern.
I have created an issue to track this [1].
Thanks for reporting it and for your help.

[1] https://issues.apache.org/jira/browse/FLINK-20255

Best,
Godfrey

On Thu, Nov 19, 2020 at 12:10 PM Dylan Forciea wrote:

> Godfrey,
>
>
>
> I confirmed that in Flink 1.11.2 and in 1.12-SNAPSHOT I get the stack
> trace running exactly this code:
>
>
>
> import org.apache.flink.api.scala._
> import org.apache.flink.core.fs.FileSystem.WriteMode
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import org.apache.flink.table.api._
> import org.apache.flink.table.api.bridge.scala._
> import org.apache.flink.types.Row
> import org.apache.flink.table.annotation.FunctionHint
> import org.apache.flink.table.annotation.DataTypeHint
> import org.apache.flink.table.functions.TableFunction
>
> @FunctionHint(output = new DataTypeHint("ROW"))
> class SplitStringToRows extends TableFunction[Row] {
>   def eval(str: String, separator: String = ";"): Unit = {
>     if (str != null) {
>       str.split(separator).foreach(s => collect(Row.of(s.trim())))
>     }
>   }
> }
>
> object Job {
>
>   def main(args: Array[String]): Unit = {
>     val settings = EnvironmentSettings
>       .newInstance().useBlinkPlanner().inStreamingMode().build()
>     val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
>     val streamTableEnv = StreamTableEnvironment.create(streamEnv, settings)
>
>     streamTableEnv.createTemporarySystemFunction(
>       "SplitStringToRows",
>       classOf[SplitStringToRows]
>     ) // Class defined in previous email
>
>     streamTableEnv.executeSql(
>       """
>       CREATE TABLE table1 (
>         id_source BIGINT PRIMARY KEY,
>         attr1_source STRING,
>         attr2 STRING
>       ) WITH (
>         'connector' = 'jdbc',
>         'url' = 'jdbc:postgresql://host.domain.com/db1?ssl=true',
>         'table-name' = '',
>         'username' = '',
>         'password' = '',
>         'scan.fetch-size' = '500',
>         'scan.auto-commit' = 'false')
>       """)
>
>     streamTableEnv.executeSql(
>       """
>       CREATE TABLE table2 (
>         attr1_source STRING,
>         attr2 STRING,
>         attr3 DECIMAL,
>         attr4 DATE
>       ) WITH (
>         'connector' = 'jdbc',
>         'url' = 'jdbc:postgresql://host.domain.com/db1?ssl=true',
>         'table-name' = '',
>         'username' = '',
>         'password' = '',
>         'scan.fetch-size' = '500',
>         'scan.auto-commit' = 'false')
>       """)
>
>     val q1 = streamTableEnv.sqlQuery("""
>       SELECT
>         id_source AS id,
>         attr1_source AS attr1,
>         attr2
>       FROM table1
>       """)
>     streamTableEnv.createTemporaryView("view1", q1)
>
>     val q2 = streamTableEnv.sqlQuery(
>       """
>       SELECT
>         a.attr1 AS attr1,
>         attr2,
>         attr3,
>         attr4
>       FROM table2 p, LATERAL TABLE(SplitStringToRows(p.attr1_source, ';')) AS a(attr1)
>       """)
>     streamTableEnv.createTemporaryView("view2", q2)
>
>     val q3 = streamTableEnv.sqlQuery("""
>       SELECT
>         w.attr1,
>         p.attr3
>       FROM view1 w
>       LEFT JOIN LATERAL (
>         SELECT
>           attr1,
>           attr3
>         FROM (
>           SELECT
>             attr1,
>             attr3,
>             ROW_NUMBER() OVER (
>               PARTITION BY attr1
>               ORDER BY
>                 attr4 DESC NULLS LAST,
>                 w.attr2 = attr2 DESC NULLS LAST
>             ) AS row_num
>           FROM view2)
>         WHERE row_num = 1) p
>       ON (w.attr1 = p.attr1)
>       """)
>     streamTableEnv.createTemporaryView("view3", q3)
>
>     val view3 = streamTableEnv.from("view3")
>
>     view3
>       .toRetractStream[Row]
>       .writeAsCsv("./view3.csv", WriteMode.OVERWRITE)
>       .setParallelism(1)
>
>     streamEnv.execute()
>   }
> }
>
>
>
> Thanks,
>
> Dylan Forciea
>
>
>
> *From: *godfrey he 
> *Date: *Wednesday, November 18, 2020 at 8:29 PM
> *To: *Dylan Forciea 
> *Cc: *"user@flink.apache.org" 
> *Subject: *Re: Lateral join not finding correlate variable
>
>
>
> Dylan,
>
>
>
> Thanks for your feedback. If the planner throws the
> "unexpected correlate variable $cor2 in the plan" exception,
> there is a high probability that FlinkDecorrelateProgram has a bug
> or that the query pattern is not supported yet. I tried using the JDBC
> connector for the input tables, but I still could not reproduce the
> exception. Could you provide your full code, including DDL, query, etc.?
>
> Thanks so much.
>
>
>
> Best,
>
> Godfrey
>
>
>
>
>
>
>
> On Wed, Nov 18, 2020, Dylan Forciea wrote:

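Some thoughts on the CatalogPartitionSpec class

2020-11-19 Thread Jun Zhang
Hi all,
I ran into a problem while using the Catalog method for listing partitions,
Catalog#listPartitions(ObjectPath, CatalogPartitionSpec).
I noticed that the partitionSpec field, which stores the partition information
in CatalogPartitionSpec, is of type Map. Would a Map type be more reasonable
and more general?

Thanks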


Re: Is possible that make two operators always locate in same taskmanager?

2020-11-19 Thread Arvid Heise
I still haven't fully understood. Do you mean you can't infer the timestamp
in source A because it depends on some internal field of source B?

How is that actually working in a parallel setting? Which timestamp is used
in the different instances of a source?

Say, we have task A1 which is the first subtask of source A and task B2 as
the second subtask of source B. How would you like them to be located? How
does that correlate to the third subtask of the join (let's call it J3).

Remember that through the shuffling before the join there is no clear
correlation between any subtask of A or B to J...
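
To make the point about the shuffle concrete, here is a small sketch in plain
Java. It is a simplified model under stated assumptions, not Flink's actual
partitioner: Flink assigns keys to key groups with its own hashing, whereas the
hypothetical joinSubtaskFor below simply takes hashCode modulo the parallelism.
The conclusion should hold either way: the join subtask is decided by the
record's key, not by the source subtask that read the record.

```java
// Simplified model of a keyed shuffle; class and method names are hypothetical.
final class ShuffleModel {
    // Index of the join subtask that receives a record with the given key.
    // Assumption: hashCode modulo parallelism stands in for Flink's key-group hashing.
    static int joinSubtaskFor(String key, int joinParallelism) {
        return Math.floorMod(key.hashCode(), joinParallelism);
    }

    public static void main(String[] args) {
        // Four records read by the same source subtask (say A1) ...
        String[] keysReadByA1 = {"a", "b", "c", "d"};
        for (String key : keysReadByA1) {
            // ... are routed to join subtasks by key only.
            System.out.println("key " + key + " -> join subtask index " + joinSubtaskFor(key, 4));
        }
    }
}
```

In this model the four keys read by one source subtask end up on four different
join subtasks, which is why co-locating A1 with one particular J does not give
the source a complete view of what the join has processed.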

On Fri, Nov 20, 2020 at 3:58 AM Si-li Liu  wrote:

> Thanks for your help!
>
> The timestamps already travel with the items in the stream. My streaming
> pipeline looks like this:
>
> source -> parser --shuffle--> join -> sink
>
> Stream A and stream B both go through this pipeline. I keep the logs of
> stream A in an in-memory cache (a LinkedHashMap) in the join operator, and
> all logs in stream B look up the cache and perform the actual join.
>
> I try to use the timestamp of the latest expired item in memory as a safe
> rollback timestamp: if I restart the job, the source should use this
> timestamp as its start offset. The safe rollback timestamp is calculated in
> the join operator, but I want to use it in the source. So the simplest way
> to pass this information from the join operator to the source is a static
> variable, which requires the source operator and the join operator to
> always be located in the same TM process.
>
> On Fri, Nov 20, 2020 at 3:33 AM Arvid Heise wrote:
>
>> Hi Si-li,
>>
>> couldn't you also add the timestamp as a state to the source? So the time
>> would store the timestamp of the last emitted record.
>> It's nearly identical to your solution but would fit the recovery model
>> of Flink much better.
>> If you want to go further back to account for the records that have been
>> actually processed in the join, you could also replay the data from > timestamp> - .
>>
>> On Mon, Nov 16, 2020 at 8:39 AM Si-li Liu  wrote:
>>
>>> Thanks, I'll try it.
>>>
>>> On Sat, Nov 14, 2020 at 12:53 AM Matthias Pohl wrote:
>>>
 Hi Si-li,
 trying to answer your initial question: Theoretically, you could try
 using the co-location constraints to achieve this. But keep in mind that
 this might lead to multiple Join operators running in the same JVM reducing
 the amount of memory each operator can utilize.

 Best,
 Matthias

 On Mon, Nov 9, 2020 at 4:23 AM Si-li Liu  wrote:

> Thanks for your reply.
>
> It's a streaming job. The join operator does the join work. The join state
> is too large, so I don't want to keep it using the state mechanism that
> Flink provides, and I also don't need a very precise join.
> So I prefer to let the join operator calculate a backward timestamp as
> state; if the cluster restarts, the consumer can use setStartFromTimestamp
> to start from that timestamp.
>
> My problem now is that the consumer can't read the state that the join
> operator has written, so I need a way to send a small message (a 64-bit
> long) from downstream to upstream. Redis may be a solution, but adding an
> external dependency is a secondary option if I can pass this message
> through memory.
>
>
> On Fri, Nov 6, 2020 at 7:06 AM Chesnay Schepler wrote:
>
>> It would be good if you could elaborate a bit more on your use-case.
>> Are you using batch or streaming? What kind of "message" are we
>> talking about? Why are you thinking of using a static variable, instead 
>> of
>> just treating this message as part of the data(set/stream)?
>>
>> On 11/5/2020 12:55 PM, Si-li Liu wrote:
>>
>> Currently I use Flink 1.9.1. What I actually want to do is send some
>> messages from downstream operators to upstream operators, and I am
>> considering using a static variable for that.
>>
>> But that means I have to make sure these two operators always run in the
>> same taskmanager process. Can I use CoLocationGroup to solve this
>> problem? Or can anyone give me an example that demonstrates the usage
>> of CoLocationGroup?
>>
>> Thanks!
>> --
>> Best regards
>>
>> Sili Liu
>>
>>
>>
>
> --
> Best regards
>
> Sili Liu
>

>>>
>>> --
>>> Best regards
>>>
>>> Sili Liu
>>>
>>
>>
>> --
>>
>> Arvid Heise | Senior Java Developer
>>
>> 
>>
>> Follow us @VervericaData
>>
>> --
>>
>> Join Flink Forward  - The Apache Flink
>> Conference
>>
>> Stream Processing | Event Driven | Real Time
>>
>> --
>>
>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>
>> --
>> Ververica GmbH
>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>> (Toni) Cheng
>>
>
>
> --
> Best regards
>
> Sili Liu
>


-- 

Arvid Heise | Senior Java Developer


Re: Jdbc input format and system properties

2020-11-19 Thread Arvid Heise
Hi Flavio,

if it arrives in the java process then you are doing everything right
already (or almost).

Are you shading the mysql connector? I suspect that the property name then
also gets shaded. You could decompile your jar to be sure. Have you verified
that this works as intended without Flink?
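
One way to run that check outside Flink is a tiny standalone program that reads
the property the same way as the snippet quoted further down in this thread,
via Boolean.getBoolean. This is only a sketch: the class name is made up, and
the default property name is simply the one written in this thread, so it is
worth comparing it with the value of
PropertyDefinitions.SYSP_disableAbandonedConnectionCleanup in the driver
version in use, because Boolean.getBoolean silently returns false for a name
that is not set.

```java
// Standalone check, independent of Flink and of the MySQL driver; names are hypothetical.
final class SystemPropertyCheck {
    // Mirrors the Boolean.getBoolean(...) call in the snippet quoted in the thread:
    // true only if the JVM system property exists and equals "true" (ignoring case).
    static boolean isEnabled(String propertyName) {
        return Boolean.getBoolean(propertyName);
    }

    public static void main(String[] args) {
        // Property name as written in the thread; pass another name as the first argument to test it.
        String name = args.length > 0 ? args[0] : "com.mysql.disableAbandonedConnectionCleanup";
        System.out.println(name + " raw value: " + System.getProperty(name));
        System.out.println(name + " as boolean: " + isEnabled(name));
    }
}
```

Running it with the same -D option that is passed to the task manager shows
whether the JVM sees the flag under that exact name. Since the quoted driver
code reads the flag into a static field, the value has to be present before
that class is loaded.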

On Thu, Nov 19, 2020 at 9:19 PM Flavio Pompermaier 
wrote:

> the properties arrive at the task manager because I can see them in the
> java process (using ps aux)... or do you mean some special line of code?
>
> On Thu, Nov 19, 2020, 20:53 Arvid Heise wrote:
>
>> Hi Flavio,
>>
>> you are right, all looks good.
>>
>> Can you please verify if the properties arrived at the task manager in
>> the remote debugger session? For example, you could check the JVisualVM
>> Overview tab.
>>
>> On Thu, Nov 19, 2020 at 8:38 PM Flavio Pompermaier 
>> wrote:
>>
>>> At the moment I use a standalone cluster, isn't using env.java.opts the
>>> right way to do it?
>>>
>>> On Thu, Nov 19, 2020, 20:11 Arvid Heise wrote:
>>>
 Hi Flavio,

 -D afaik passes only the system property to the entry point (client or
 jobmanager depending on setup), while you probably want to have it on the
 task managers.

 The specific options to pass it to the task managers depend on the way
 you deploy. -yD for yarn for example. For docker or k8s, you would use env.

 On Wed, Nov 18, 2020 at 10:20 PM Flavio Pompermaier <
 pomperma...@okkam.it> wrote:

> Hi to all,
> while trying to solve a leak with dynamic class loading I found out
> that mysql connector creates an AbandonedConnectionCleanupThread that
> is retained in the ChildFirstClassLoader..from version 8.0.22 there's
> the possibility to inhibit this thread passing the system property
> com.mysql.disableAbandonedConnectionCleanup=true [1] (I put the mysql
> jar in the lib folder).
>
> I tried to set the following in flink-conf.yaml:
> env.java.opts: "-Dcom.mysql.disableAbandonedConnectionCleanup=true"
>
> but the property does not produce the desired effect in the static
> section of such a thread [2] (I verified that attaching the remote
> debugger to the task manager).
>
> How can I fix this problem?
>
> [1]
> https://dev.mysql.com/doc/relnotes/connector-j/8.0/en/news-8-0-22.html
> [2]
> public class AbandonedConnectionCleanupThread implements Runnable {
> private static boolean abandonedConnectionCleanupDisabled =
>
> Boolean.getBoolean(PropertyDefinitions.SYSP_disableAbandonedConnectionCleanup);
>
> static {
> if (abandonedConnectionCleanupDisabled) {
> cleanupThreadExecutorService = null;
> } else {
> cleanupThreadExecutorService =
>Executors.newSingleThreadExecutor(r -> {}
>}
>   }
>


>>>
>>
>>
>

-- 

Arvid Heise | Senior Java Developer





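Re:Re: Re: Re: flink sql cdc sum result is NULL

2020-11-19 Thread kandy.wang
1. "Having no initial full snapshot may cause problems."
How should I understand this? By default we consume from the Kafka group offsets, so how can we guarantee the full data is there?
Our binlog sync is purely incremental. We never do an initial full sync.
2. update_before is sent first, then update_after.


3. The data in Kafka is hashed by the MySQL primary key id, so it is ordered. However, all rows of one group key are not
guaranteed to be in the same partition, because partitioning is by the hash of the primary key id.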








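At 2020-11-20 13:25:53, "Jark Wu" wrote:
>1. Having no initial full snapshot may cause problems.
>
>3. When your format parses an update, does it emit the before or the after first?
>4. Is your data ordered in Kafka? That is, are all records for the same key in one partition?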
>
>On Fri, 20 Nov 2020 at 12:46, kandy.wang  wrote:
>
>>
>>
>>
>>
>>
>>
>> 1. Yes. When the job starts it has no state, and then the state builds up gradually.
>>
>> 2. Not enabled.
>>
>>
>>
>>
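>> At 2020-11-20 11:49:44, "Jark Wu" wrote:
>> >The implementation should be fine.
>> >
>> >1. Does your CDC data lack the full snapshot, i.e. are you reading directly from the incremental changes starting at some day?
>> >2. Is mini-batch enabled?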
>> >
>> >Best,
>> >Jark
>> >
>> >On Fri, 20 Nov 2020 at 11:44, kandy.wang  wrote:
>> >
>> >> hi Jark:
>> >>
>> >>
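>> >> I logged the fields involved in the aggregation (v_spu_id, leaving_num, price)
>> >> and none of them is ever NULL; I checked in the sql-client as well and saw no NULLs.
>> >>
>> >> The custom format logic is similar to canal's: insert, update, delete, where an
>> >> update message has to be emitted as two records, update_before and update_after.
>> >> That is how the format logic should be written, right?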
>> >>
>> >>
>> >>
>> >>
>> >> At 2020-11-19 23:13:19, "Jark Wu" wrote:
>> >> >You can first run select * from hive.database.table; directly and check whether every field
>> >> >value is correct and whether there are any nulls, to verify that your custom format is fine.
>> >> >
>> >> >Best,
>> >> >Jark
>> >> >
>> >> >On Thu, 19 Nov 2020 at 22:41, kandy.wang  wrote:
>> >> >
>> >> >> -- MySQL table
>> >> >> CREATE TABLE IF NOT EXISTS
>> `mysql_realtime_leaving_price_spu_index_agg`(
>> >> >>`id` INT UNSIGNED AUTO_INCREMENT,
>> >> >>`spu_id` BIGINT NOT NULL,
>> >> >>`leaving_price`  DECIMAL(10, 5)
>> >> >>PRIMARY KEY ( `id` ),
>> >> >>unique key idx_spu_id (spu_id)
>> >> >> )ENGINE=InnoDB DEFAULT CHARSET=utf8
>> >> >>
>> >> >>
>> >> >> -- Flink table
>> >> >> CREATE TABLE
>> hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg
>> >> (
>> >> >>`spu_id` BIGINT ,
>> >> >>`leaving_price`  DECIMAL(10, 5),
>> >> >> PRIMARY KEY ( `spu_id`) NOT ENFORCED
>> >> >> ) WITH (
>> >> >>   'connector' = 'jdbc',
>> >> >>'url' = 'jdbc:mysql://...',
>> >> >>'table-name' = 'mysql_realtime_leaving_price_spu_index_agg',
>> >> >>'username' = '...',
>> >> >>'password' = '..'
>> >> >> );
>> >> >>
>> >> >>
>> >> >> --binlog 2mysql
>> >> >>
>> >> >> insert into
>> hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg
>> >> >>
>> >> >> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price
>> >> >>
>> >> >> FROM hive.database.table
>> >> >>
>> >> >> group by v_spu_id;
>> >> >>
>> >> >>
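>> >> >> hive.database.table points to our company's own Kafka and implements the four message kinds, similar to canal CDC.
>> >> >>
>> >> >>
>> >> >> The problem is that leaving_price in the MySQL result table is NULL in many rows, although the
>> >> >> leaving_num and price fields in the data can never be NULL. I don't understand why the result table can contain NULL.
>> >> >> Any good ideas for troubleshooting?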
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >>
>>


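Re: Re: Re: flink sql cdc sum result is NULL

2020-11-19 Thread Jark Wu
1. Having no initial full snapshot may cause problems.

3. When your format parses an update, does it emit the before or the after first?
4. Is your data ordered in Kafka? That is, are all records for the same key in one partition?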
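
Why a missing initial snapshot could matter can be illustrated with a toy model
of a retractable SUM. This is only a sketch under a stated assumption, not
Flink's actual code: the hypothetical RetractSumModel keeps a running sum and a
row count, and reports NULL whenever the count is zero. If the first change the
job ever sees for an existing row is an update_before, the model retracts a
value it never accumulated.

```java
// Toy model of a retractable SUM aggregate; class and method names are hypothetical.
final class RetractSumModel {
    private long sum;
    private long count;

    // An insert or update_after message adds a value.
    void accumulate(long value) {
        sum += value;
        count++;
    }

    // A delete or update_before message removes a value.
    void retract(long value) {
        sum -= value;
        count--;
    }

    // Assumption: the aggregate reports NULL when it believes no rows remain.
    Long value() {
        return count == 0 ? null : sum;
    }

    public static void main(String[] args) {
        // No snapshot: the row with value 10 was never accumulated before it is updated to 12.
        RetractSumModel withoutSnapshot = new RetractSumModel();
        withoutSnapshot.retract(10);    // update_before
        withoutSnapshot.accumulate(12); // update_after
        System.out.println("without snapshot: " + withoutSnapshot.value());

        // With snapshot: the row is accumulated first, then updated.
        RetractSumModel withSnapshot = new RetractSumModel();
        withSnapshot.accumulate(10);
        withSnapshot.retract(10);
        withSnapshot.accumulate(12);
        System.out.println("with snapshot: " + withSnapshot.value());
    }
}
```

In this model the run without a snapshot ends with a count of zero and
therefore reports NULL, even though none of the input values was NULL. Whether
the real job behaves the same way depends on the planner's aggregate
implementation and should be verified against it.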

On Fri, 20 Nov 2020 at 12:46, kandy.wang  wrote:

>
>
>
>
>
>
> 1. Yes. When the job starts it has no state, and then the state builds up gradually.
>
> 2. Not enabled.
>
>
>
>
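> At 2020-11-20 11:49:44, "Jark Wu" wrote:
> >The implementation should be fine.
> >
> >1. Does your CDC data lack the full snapshot, i.e. are you reading directly from the incremental changes starting at some day?
> >2. Is mini-batch enabled?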
> >
> >Best,
> >Jark
> >
> >On Fri, 20 Nov 2020 at 11:44, kandy.wang  wrote:
> >
> >> hi Jark:
> >>
> >>
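> >> I logged the fields involved in the aggregation (v_spu_id, leaving_num, price)
> >> and none of them is ever NULL; I checked in the sql-client as well and saw no NULLs.
> >>
> >> The custom format logic is similar to canal's: insert, update, delete, where an
> >> update message has to be emitted as two records, update_before and update_after.
> >> That is how the format logic should be written, right?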
> >>
> >>
> >>
> >>
> >> At 2020-11-19 23:13:19, "Jark Wu" wrote:
> >> >You can first run select * from hive.database.table; directly and check whether every field
> >> >value is correct and whether there are any nulls, to verify that your custom format is fine.
> >> >
> >> >Best,
> >> >Jark
> >> >
> >> >On Thu, 19 Nov 2020 at 22:41, kandy.wang  wrote:
> >> >
> >> >> -- MySQL table
> >> >> CREATE TABLE IF NOT EXISTS
> `mysql_realtime_leaving_price_spu_index_agg`(
> >> >>`id` INT UNSIGNED AUTO_INCREMENT,
> >> >>`spu_id` BIGINT NOT NULL,
> >> >>`leaving_price`  DECIMAL(10, 5)
> >> >>PRIMARY KEY ( `id` ),
> >> >>unique key idx_spu_id (spu_id)
> >> >> )ENGINE=InnoDB DEFAULT CHARSET=utf8
> >> >>
> >> >>
> >> >> -- Flink table
> >> >> CREATE TABLE
> hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg
> >> (
> >> >>`spu_id` BIGINT ,
> >> >>`leaving_price`  DECIMAL(10, 5),
> >> >> PRIMARY KEY ( `spu_id`) NOT ENFORCED
> >> >> ) WITH (
> >> >>   'connector' = 'jdbc',
> >> >>'url' = 'jdbc:mysql://...',
> >> >>'table-name' = 'mysql_realtime_leaving_price_spu_index_agg',
> >> >>'username' = '...',
> >> >>'password' = '..'
> >> >> );
> >> >>
> >> >>
> >> >> --binlog 2mysql
> >> >>
> >> >> insert into
> hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg
> >> >>
> >> >> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price
> >> >>
> >> >> FROM hive.database.table
> >> >>
> >> >> group by v_spu_id;
> >> >>
> >> >>
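> >> >> hive.database.table points to our company's own Kafka and implements the four message kinds, similar to canal CDC.
> >> >>
> >> >>
> >> >> The problem is that leaving_price in the MySQL result table is NULL in many rows, although the
> >> >> leaving_num and price fields in the data can never be NULL. I don't understand why the result table can contain NULL.
> >> >> Any good ideas for troubleshooting?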
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >>
>


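Re: About global window

2020-11-19 Thread hailongwang
Hi sparklelj,

With a global window, all elements with the same key end up in one window. It has no window end, so you need to
implement a custom trigger yourself to fire the window computation [1].
It is a keyed window, so it is not limited to a single window instance (only windowAll has a single window instance).
So please check whether it is being used incorrectly: which interface does your 'StreamToBatchWindow' class extend?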


[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/windows.html#global-windows
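
The behaviour described above can be modelled in a few lines of plain Java. The
sketch below is not the Flink API: KeyedCountWindowModel is a made-up class, and
the count-based trigger with purge is an assumption chosen for illustration. It
only shows the two properties under discussion: each key gets its own buffer,
and nothing is emitted until a trigger decides to fire.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Toy model of a keyed global window with a count trigger; names are hypothetical.
final class KeyedCountWindowModel {
    private final int fireAt;
    // One buffer per key: this plays the role of the per-key window state.
    private final Map<String, List<Integer>> buffers = new HashMap<>();

    KeyedCountWindowModel(int fireAt) {
        this.fireAt = fireAt;
    }

    // Adds an element; returns the fired batch once the key has collected fireAt elements.
    Optional<List<Integer>> add(String key, int value) {
        List<Integer> buffer = buffers.computeIfAbsent(key, k -> new ArrayList<>());
        buffer.add(value);
        if (buffer.size() < fireAt) {
            return Optional.empty(); // the trigger does not fire yet
        }
        buffers.remove(key); // fire and purge: the state for this key is dropped
        return Optional.of(buffer);
    }

    // Number of keys that currently hold state.
    int liveKeys() {
        return buffers.size();
    }

    public static void main(String[] args) {
        KeyedCountWindowModel window = new KeyedCountWindowModel(2);
        System.out.println(window.add("a", 1)); // nothing fires
        System.out.println(window.add("b", 5)); // key b has its own buffer
        System.out.println(window.add("a", 2)); // key a fires with both of its elements
        System.out.println("keys with state: " + window.liveKeys());
    }
}
```

In this model, state for a key disappears only when its trigger fires and
purges, which is one way to think about the question of ever-growing keys
raised in the original mail.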


Best,
Hailong Wang
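At 2020-11-20 01:19:09, "j l" wrote:
>Hello, I have some questions about the global window. First, the global
>window is globally unique, meaning all elements flow into this single window
>instance. If so, what is the point of a preceding keyBy? Everything flows into
>one window anyway, and my test results seem to confirm this: I defined a custom
>global window and set the parallelism of process, but there was indeed only
>one window.
>Example:
>
>dataUnion.keyBy(0).window(new  StreamToBatchWindow()).process(new
>StreamToBatchProcess()).setParallelism(20).print();
>
>If that is the case, doesn't this window become a bottleneck? I am not sure
>whether my understanding is correct; I would like more windows so that
>different key streams each get their own global window processing.
>Another point: the global window maintains state for every key, so if the
>number of keys keeps growing, won't it blow up? How can I clear the state of
>keys that will never appear again?
>
>Thanks!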


Re:Re: Re: flink sql cdc sum result is NULL

2020-11-19 Thread kandy.wang






1. Yes. When the job starts it has no state, and then the state builds up gradually.

2. Not enabled.




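At 2020-11-20 11:49:44, "Jark Wu" wrote:
>The implementation should be fine.
>
>1. Does your CDC data lack the full snapshot, i.e. are you reading directly from the incremental changes starting at some day?
>2. Is mini-batch enabled?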
>
>Best,
>Jark
>
>On Fri, 20 Nov 2020 at 11:44, kandy.wang  wrote:
>
>> hi Jark:
>>
>>
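>> I logged the fields involved in the aggregation (v_spu_id, leaving_num, price)
>> and none of them is ever NULL; I checked in the sql-client as well and saw no NULLs.
>>
>> The custom format logic is similar to canal's: insert, update, delete, where an update message has to be
>> emitted as two records, update_before and update_after. That is how the format logic should be written, right?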
>>
>>
>>
>>
>> At 2020-11-19 23:13:19, "Jark Wu" wrote:
>> >You can first run select * from hive.database.table; directly and check whether every field
>> >value is correct and whether there are any nulls, to verify that your custom format is fine.
>> >
>> >Best,
>> >Jark
>> >
>> >On Thu, 19 Nov 2020 at 22:41, kandy.wang  wrote:
>> >
>> >> -- MySQL table
>> >> CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`(
>> >>`id` INT UNSIGNED AUTO_INCREMENT,
>> >>`spu_id` BIGINT NOT NULL,
>> >>`leaving_price`  DECIMAL(10, 5)
>> >>PRIMARY KEY ( `id` ),
>> >>unique key idx_spu_id (spu_id)
>> >> )ENGINE=InnoDB DEFAULT CHARSET=utf8
>> >>
>> >>
>> >> -- Flink table
>> >> CREATE TABLE hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg
>> (
>> >>`spu_id` BIGINT ,
>> >>`leaving_price`  DECIMAL(10, 5),
>> >> PRIMARY KEY ( `spu_id`) NOT ENFORCED
>> >> ) WITH (
>> >>   'connector' = 'jdbc',
>> >>'url' = 'jdbc:mysql://...',
>> >>'table-name' = 'mysql_realtime_leaving_price_spu_index_agg',
>> >>'username' = '...',
>> >>'password' = '..'
>> >> );
>> >>
>> >>
>> >> --binlog 2mysql
>> >>
>> >> insert into hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg
>> >>
>> >> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price
>> >>
>> >> FROM hive.database.table
>> >>
>> >> group by v_spu_id;
>> >>
>> >>
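>> >> hive.database.table points to our company's own Kafka and implements the four message kinds, similar to canal CDC.
>> >>
>> >>
>> >> The problem is that leaving_price in the MySQL result table is NULL in many rows, although the
>> >> leaving_num and price fields in the data can never be NULL. I don't understand why the result table can contain NULL.
>> >> Any good ideas for troubleshooting?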
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>>


Re:Re: flink 1.11.1 job restart occasionally throws org.apache.kafka.common.KafkaException: Failed to construct kafka consumer

2020-11-19 Thread hailongwang
You could grep to see which jar files contain these 2 classes.
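
If grep is not convenient, the same check can be sketched in plain Java with
the standard java.util.jar API. The class name JarClassFinder and the directory
argument are made up for illustration; the two class names in main are the ones
from the exception in this thread. Point it at whichever directories hold the
cluster's and the job's jars.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.jar.JarFile;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Lists the jar files under a directory that contain a given class; names are hypothetical.
final class JarClassFinder {
    static List<Path> jarsContaining(Path dir, String className) throws IOException {
        // A class a.b.C is stored in a jar as the entry a/b/C.class.
        String entryName = className.replace('.', '/') + ".class";
        List<Path> jars;
        try (Stream<Path> files = Files.walk(dir)) {
            jars = files.filter(Files::isRegularFile)
                        .filter(p -> p.toString().endsWith(".jar"))
                        .sorted()
                        .collect(Collectors.toList());
        }
        List<Path> hits = new ArrayList<>();
        for (Path jar : jars) {
            try (JarFile jarFile = new JarFile(jar.toFile())) {
                if (jarFile.getEntry(entryName) != null) {
                    hits.add(jar);
                }
            }
        }
        return hits;
    }

    public static void main(String[] args) throws IOException {
        // Directory to scan; defaults to the current directory.
        Path dir = Paths.get(args.length > 0 ? args[0] : ".");
        String[] classNames = {
            "org.apache.kafka.common.serialization.ByteArrayDeserializer",
            "org.apache.kafka.common.serialization.Deserializer"
        };
        for (String className : classNames) {
            System.out.println(className + " -> " + jarsContaining(dir, className));
        }
    }
}
```

If either class shows up in more than one jar, for example once in the
cluster's lib directory and once in the job jar, that would be consistent with
the class loading explanation quoted below.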




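At 2020-11-20 08:51:59, "m13162790856" wrote:
>Hi,
>   It is intermittent, but it has happened fairly often in the last few runs. I suspected class loading before, but my
> jar is kept separate and does not bundle any dependencies, so the set of packages is guaranteed to be the same on
> every start. This situation is very strange.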
>
>On Nov 19, 2020 at 17:14, hailongwang<18868816...@163.com> wrote:
>
>
>Hi,
>If this only happens occasionally, it is probably related to class loading. If
>`org.apache.kafka.common.serialization.ByteArrayDeserializer` is loaded by the child
>classloader while `org.apache.kafka.common.serialization.Deserializer` is loaded by the parent
>classloader, there will be a problem. Does your jar bundle the kafka-connector? If the cluster
>already has it, try marking it as provided.
>Hope this helps.
>
>Best,
>Hailong Wang
>
>At 2020-11-19 14:33:25, "m13162790856" wrote:
>>The detailed log is as follows:
>>
>>org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
>>    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:789)
>>    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:643)
>>    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:623)
>>    at org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.initializeConnections(KafkaPartitionDiscoverer.java:58)
>>    at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94)
>>    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:550)
>>    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>>    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>>    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
>>    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
>>    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
>>    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
>>    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
>>    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>>    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>>    at java.lang.Thread.run(Thread.java:745)
>>Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.serialization.ByteArrayDeserializer is not an instance of org.apache.kafka.common.serialization.Deserializer
>>    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:263)
>>    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:688)
>>    ... 15 more
>>2020-11-19 15:17:32,0
>>
>>Has anyone run into this?


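Re: Re: flink sql cdc sum result is NULL

2020-11-19 Thread Jark Wu
The implementation should be fine.

1. Does your CDC data lack the full snapshot, i.e. are you reading directly from the incremental changes starting at some day?
2. Is mini-batch enabled?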

Best,
Jark

On Fri, 20 Nov 2020 at 11:44, kandy.wang  wrote:

> hi Jark:
>
>
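> I logged the fields involved in the aggregation (v_spu_id, leaving_num, price)
> and none of them is ever NULL; I checked in the sql-client as well and saw no NULLs.
>
> The custom format logic is similar to canal's: insert, update, delete, where an update message has to be
> emitted as two records, update_before and update_after. That is how the format logic should be written, right?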
>
>
>
>
> At 2020-11-19 23:13:19, "Jark Wu" wrote:
> >You can first run select * from hive.database.table; directly and check whether every field
> >value is correct and whether there are any nulls, to verify that your custom format is fine.
> >
> >Best,
> >Jark
> >
> >On Thu, 19 Nov 2020 at 22:41, kandy.wang  wrote:
> >
> >> -- MySQL table
> >> CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`(
> >>`id` INT UNSIGNED AUTO_INCREMENT,
> >>`spu_id` BIGINT NOT NULL,
> >>`leaving_price`  DECIMAL(10, 5)
> >>PRIMARY KEY ( `id` ),
> >>unique key idx_spu_id (spu_id)
> >> )ENGINE=InnoDB DEFAULT CHARSET=utf8
> >>
> >>
> >> -- Flink table
> >> CREATE TABLE hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg
> (
> >>`spu_id` BIGINT ,
> >>`leaving_price`  DECIMAL(10, 5),
> >> PRIMARY KEY ( `spu_id`) NOT ENFORCED
> >> ) WITH (
> >>   'connector' = 'jdbc',
> >>'url' = 'jdbc:mysql://...',
> >>'table-name' = 'mysql_realtime_leaving_price_spu_index_agg',
> >>'username' = '...',
> >>'password' = '..'
> >> );
> >>
> >>
> >> --binlog 2mysql
> >>
> >> insert into hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg
> >>
> >> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price
> >>
> >> FROM hive.database.table
> >>
> >> group by v_spu_id;
> >>
> >>
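> >> hive.database.table points to our company's own Kafka and implements the four message kinds, similar to canal CDC.
> >>
> >>
> >> The problem is that leaving_price in the MySQL result table is NULL in many rows, although the
> >> leaving_num and price fields in the data can never be NULL. I don't understand why the result table can contain NULL.
> >> Any good ideas for troubleshooting?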
> >>
> >>
> >>
> >>
> >>
> >>
>


Re: Flink on block storage in k8s

2020-11-19 Thread Yang Wang
Hi George,

If your PVCs can be mounted as ReadWriteMany [1], then I think Flink can be
deployed on these PVs.
However, with high availability enabled, you still need a distributed
coordination system (ZooKeeper, or the newly introduced Kubernetes HA [2])
for leader election/retrieval and metadata storage. The actual
data will be stored in the PV.

[1].
https://kubernetes.io/docs/concepts/storage/persistent-volumes/#access-modes
[2].
https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink

Best,
Yang

On Fri, Nov 20, 2020 at 3:55 AM George Costea wrote:

> Hi there,
>
> Can flink be deployed to PVCs backed by block storage?  It seems the
> only option is blob storage today.
>
> Thanks,
> George
>


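Re:Re: flink sql cdc sum result contains NULL

2020-11-19 Thread kandy.wang
hi Jark:


I added some logging and checked the fields involved in the aggregation
(v_spu_id, leaving_num, price). None of them is ever NULL, and I did not see
any NULL values in sql-client either.

The custom format's logic is similar to canal's: insert, update and delete,
where an update message has to be emitted as two records, update_before and
update_after. That is how the format logic should be written, right?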
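
To make the role of the update_before / update_after pair concrete, here is a minimal sketch of a retractable SUM over changelog records, written in plain Java so it runs without Flink. It is not Flink's implementation: the class name, the `Kind` enum and the rule that the result is NULL once every row of a key has been retracted are assumptions of this sketch.

```java
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;

// Illustrative model only: a per-key SUM that understands the four changelog kinds.
final class ChangelogSumSketch {

    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    private final Map<Long, BigDecimal> sums = new HashMap<>();
    private final Map<Long, Integer> counts = new HashMap<>();

    // INSERT and UPDATE_AFTER add the value; UPDATE_BEFORE and DELETE retract it.
    void apply(Kind kind, long spuId, BigDecimal value) {
        boolean accumulate = kind == Kind.INSERT || kind == Kind.UPDATE_AFTER;
        sums.merge(spuId, accumulate ? value : value.negate(), BigDecimal::add);
        counts.merge(spuId, accumulate ? 1 : -1, Integer::sum);
    }

    // Assumption of this sketch: no remaining rows for the key means the result is null.
    BigDecimal result(long spuId) {
        Integer count = counts.get(spuId);
        return (count == null || count == 0) ? null : sums.get(spuId);
    }

    public static void main(String[] args) {
        ChangelogSumSketch agg = new ChangelogSumSketch();
        agg.apply(Kind.INSERT, 1L, new BigDecimal("10"));
        agg.apply(Kind.UPDATE_BEFORE, 1L, new BigDecimal("10"));
        agg.apply(Kind.UPDATE_AFTER, 1L, new BigDecimal("12"));
        System.out.println(agg.result(1L)); // 12
        agg.apply(Kind.DELETE, 1L, new BigDecimal("12"));
        System.out.println(agg.result(1L)); // null
    }
}
```

In this model a format that drops the update_before record would double count (10 + 12 = 22), and a key whose rows have all been retracted reports null even though no input field was ever null. Whether either effect explains the NULLs in this thread is not established here.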




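At 2020-11-19 23:13:19, "Jark Wu" wrote:
>You can first run select * from hive.database.table; directly to check whether
>the value of every field is correct and whether there are any null values, to
>verify that your custom format has no problems.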
>
>Best,
>Jark
>
>On Thu, 19 Nov 2020 at 22:41, kandy.wang  wrote:
>
>> -- MySQL table
>> CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`(
>>`id` INT UNSIGNED AUTO_INCREMENT,
>>`spu_id` BIGINT NOT NULL,
>>`leaving_price`  DECIMAL(10, 5),
>>PRIMARY KEY ( `id` ),
>>unique key idx_spu_id (spu_id)
>> )ENGINE=InnoDB DEFAULT CHARSET=utf8
>>
>>
>> -- Flink table
>> CREATE TABLE hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg (
>>`spu_id` BIGINT ,
>>`leaving_price`  DECIMAL(10, 5),
>> PRIMARY KEY ( `spu_id`) NOT ENFORCED
>> ) WITH (
>>   'connector' = 'jdbc',
>>'url' = 'jdbc:mysql://...',
>>'table-name' = 'mysql_realtime_leaving_price_spu_index_agg',
>>'username' = '...',
>>'password' = '..'
>> );
>>
>>
>> --binlog 2mysql
>>
>> insert into hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg
>>
>> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price
>>
>> FROM hive.database.table
>>
>> group by v_spu_id;
>>
>>
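>> hive.database.table points to our company's own Kafka, which implements the
>> four message types in the same way as canal CDC.
>>
>>
>> The problem is that leaving_price in the MySQL result table is NULL for many
>> rows. I checked the data, and neither leaving_num nor price can ever be NULL,
>> so I don't understand why the result table ends up with NULL.
>> Is there a good way to troubleshoot this?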
>>
>>
>>
>>
>>
>>


Re: Filter Null in Array in SQL Connector

2020-11-19 Thread Rex Fenley
Thanks!

Update: We've confirmed with a test copy of our data now that if we remove
all the null values from arrays everything works smoothly and as expected.
So this definitely appears to be the culprit.

On Thu, Nov 19, 2020 at 6:41 PM Jark Wu  wrote:

> Thanks Rex! This is very helpful. Will check it out later.
>
>
> On Fri, 20 Nov 2020 at 03:02, Rex Fenley  wrote:
>
>> Below is a highly redacted set of data that should represent the problem.
>> As you can see, the "roles" field has "[null]" in it, a null value within
>> the array. We also see in our DB corresponding rows like the following.
>>     id    | roles
>> ----------+--------
>>  16867433 | {NULL}
>>
>> We have confirmed that by not selecting "roles" all data passes through
>> without failure on a single operator, but selecting "roles" will eventually
>> always fail with java.lang.NullPointerException repeatedly. What is odd
>> about this is there is 0 additional stack trace, just the exception, in our
>> logs and in Flink UI. We only have INFO logging on, however, other
>> exceptions we've encountered in our development have always revealed a
>> stack trace.
>>
>> {
>>   "schema": {
>> "type": "struct",
>> "fields": [
>>   {
>> "type": "struct",
>> "fields": [
>>   { "type": "int32", "optional": false, "field": "id" },
>>   {
>> "type": "array",
>> "items": { "type": "string", "optional": true },
>> "optional": false,
>> "field": "roles"
>>   },
>> ],
>> "optional": true,
>> "name": "db.public.data.Value",
>> "field": "before"
>>   },
>>   {
>> "type": "struct",
>> "fields": [
>>   { "type": "int32", "optional": false, "field": "id" },
>>   {
>> "type": "array",
>> "items": { "type": "string", "optional": true },
>> "optional": false,
>> "field": "roles"
>>   },
>> ],
>> "optional": true,
>> "name": "db.public.data.Value",
>> "field": "after"
>>   },
>>   {
>> "type": "struct",
>> "fields": [
>>   { "type": "string", "optional": false, "field": "version" },
>>   { "type": "string", "optional": false, "field": "connector" },
>>   { "type": "string", "optional": false, "field": "name" },
>>   { "type": "int64", "optional": false, "field": "ts_ms" },
>>   {
>> "type": "string",
>> "optional": true,
>> "name": "io.debezium.data.Enum",
>> "version": 1,
>> "parameters": { "allowed": "true,last,false" },
>> "default": "false",
>> "field": "snapshot"
>>   },
>>   { "type": "string", "optional": false, "field": "db" },
>>   { "type": "string", "optional": false, "field": "schema" },
>>   { "type": "string", "optional": false, "field": "table" },
>>   { "type": "int64", "optional": true, "field": "txId" },
>>   { "type": "int64", "optional": true, "field": "lsn" },
>>   { "type": "int64", "optional": true, "field": "xmin" }
>> ],
>> "optional": false,
>> "name": "io.debezium.connector.postgresql.Source",
>> "field": "source"
>>   },
>>   { "type": "string", "optional": false, "field": "op" },
>>   { "type": "int64", "optional": true, "field": "ts_ms" },
>>   {
>> "type": "struct",
>> "fields": [
>>   { "type": "string", "optional": false, "field": "id" },
>>   { "type": "int64", "optional": false, "field": "total_order" },
>>   {
>> "type": "int64",
>> "optional": false,
>> "field": "data_collection_order"
>>   }
>> ],
>> "optional": true,
>> "field": "transaction"
>>   }
>> ],
>> "optional": false,
>> "name": "db.public.data.Envelope"
>>   },
>>   "payload": {
>> "before": null,
>> "after": {
>>   "id": 76704,
>>   "roles": [null],
>> },
>> "source": {
>>   "version": "1.3.0.Final",
>>   "connector": "postgresql",
>>   "name": "db",
>>   "ts_ms": 1605739197360,
>>   "snapshot": "true",
>>   "db": "db",
>>   "schema": "public",
>>   "table": "data",
>>   "txId": 1784,
>>   "lsn": 1305806608,
>>   "xmin": null
>> },
>> "op": "r",
>> "ts_ms": 1605739197373,
>> "transaction": null
>>   }
>> }
>>
>> cc Brad
>>
>> On Thu, Nov 19, 2020 at 7:29 AM Dylan Forciea  wrote:
>>
>>> Ah yes, missed the kafka part and just saw the array part. FLINK-19771
>>> definitely was solely in the postgres-specific code.
>>>
>>>
>>>
>>> Dylan
>>>
>>>
>>>
>>> *From: *Jark Wu 
>>> *Date: *Thursday, November 19, 2020 at 9:12 AM
>>> *To: *Dylan Forciea 
>>> *Cc: *Danny Chan , Rex Fenley ,
>>> Flink ML 
>>> *Subject: *Re: Filter Null in Array in SQL Connector
>>>
>>>
>>>

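Re: Question about aggregating on multiple fields with the keyBy operator in Flink 1.10 and Flink 1.11

2020-11-19 Thread sherlock zw
I looked at the keyBy code in the current Flink 1.11. It takes a KeySelector
key, but the selector can only return one field at a time and does not support
returning multiple fields, which would mean you can only group by one field at
a time (PS: test.keyBy(t -> t.f0)). If I want to group by multiple fields, how
should I do that?

-----Original Message-----
From: guanxianchun
Sent: November 19, 2020 20:53
To: user-zh@flink.apache.org
Subject: Re: Question about aggregating on multiple fields with the keyBy operator in Flink 1.10 and Flink 1.11

Flink 1.11 uses KeySelector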



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Filter Null in Array in SQL Connector

2020-11-19 Thread Danny Chan
I checked with the following json:

{
   "schema":{
  "type":"struct",
  "fields":[
 {
"type":"struct",
"fields":[
   {
  "type":"int32",
  "optional":false,
  "field":"id"
   },
   {
  "type":"array",
  "items":{
 "type":"string",
 "optional":true
  },
  "optional":false,
  "field":"roles"
   }
],
"optional":true,
"name":"db.public.data.Value",
"field":"before"
 },
 {
"type":"struct",
"fields":[
   {
  "type":"int32",
  "optional":false,
  "field":"id"
   },
   {
  "type":"array",
  "items":{
 "type":"string",
 "optional":true
  },
  "optional":false,
  "field":"roles"
   }
],
"optional":true,
"name":"db.public.data.Value",
"field":"after"
 },
 {
"type":"struct",
"fields":[
   {
  "type":"string",
  "optional":false,
  "field":"version"
   },
   {
  "type":"string",
  "optional":false,
  "field":"connector"
   },
   {
  "type":"string",
  "optional":false,
  "field":"name"
   },
   {
  "type":"int64",
  "optional":false,
  "field":"ts_ms"
   },
   {
  "type":"string",
  "optional":true,
  "name":"io.debezium.data.Enum",
  "version":1,
  "parameters":{
 "allowed":"true,last,false"
  },
  "default":"false",
  "field":"snapshot"
   },
   {
  "type":"string",
  "optional":false,
  "field":"db"
   },
   {
  "type":"string",
  "optional":false,
  "field":"schema"
   },
   {
  "type":"string",
  "optional":false,
  "field":"table"
   },
   {
  "type":"int64",
  "optional":true,
  "field":"txId"
   },
   {
  "type":"int64",
  "optional":true,
  "field":"lsn"
   },
   {
  "type":"int64",
  "optional":true,
  "field":"xmin"
   }
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
 },
 {
"type":"string",
"optional":false,
"field":"op"
 },
 {
"type":"int64",
"optional":true,
"field":"ts_ms"
 },
 {
"type":"struct",
"fields":[
   {
  "type":"string",
  "optional":false,
  "field":"id"
   },
   {
  "type":"int64",
  "optional":false,
  "field":"total_order"
   },
   {
  "type":"int64",
  "optional":false,
  "field":"data_collection_order"
   }
],
"optional":true,
"field":"transaction"
 }
  ],
  "optional":false,
  "name":"db.public.data.Envelope"
   },
   "payload":{
  "before":null,
  "after":{
 "id":76704,
 "roles":[
null
 ]
  },
  "source":{
 "version":"1.3.0.Final",
 "connector":"postgresql",
 "name":"db",
 "ts_ms":1605739197360,
 "snapshot":"true",
 "db":"db",
 "schema":"public",
 "table":"data",
 "txId":1784,
 "lsn":1305806608,
 "xmin":null
  },
  "op":"r",
  "ts_ms":1605739197373,
  "transaction":null
   }
}

This works correctly. I reformatted it because the original was not valid
JSON.

On Fri, Nov 20, 2020 at 3:02 AM Rex Fenley wrote:

> Below is a highly redacted set of data that should represent the problem.
> As you can see, the "roles" field has "[null]" in it, a null value within
> the array. We also see in our DB corresponding rows like the following.
> id

Re: Is possible that make two operators always locate in same taskmanager?

2020-11-19 Thread Si-li Liu
Thanks for your help!

The timestamps already travel with the items in the stream. My streaming
pipeline looks like this:

source -> parser --shuffle--> join -> sink

Stream A and stream B both go through this pipeline. I keep the logs from
stream A in an in-memory cache (a LinkedHashMap) in the join operator, and
every log from stream B looks up the cache to perform the actual join.

I want to use the timestamp of the most recently expired item in memory as a
safe rollback timestamp: if I restart the job, the source should use this
timestamp as its start offset. The safe rollback timestamp is calculated in
the join operator, but I want to use it in the source. The simplest way to
pass this information from the join operator to the source is a static
variable, which requires the source operator and the join operator to always
be located in the same TM process.
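
A minimal sketch of the cache described above, in plain Java. The class names are hypothetical, and expiring the oldest entry once a fixed size is exceeded is an assumption of this sketch (the thread does not say how items expire). It only shows how the timestamp of the most recently expired item could be tracked, not how it reaches the source.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class JoinCacheSketch {

    // Insertion-ordered cache of stream A logs: log key -> event timestamp in ms.
    static final class ExpiringCache extends LinkedHashMap<String, Long> {
        private static final long serialVersionUID = 1L;

        private final int maxEntries;
        // Timestamp of the most recently expired entry; MIN_VALUE until something expires.
        private long safeRollbackTimestamp = Long.MIN_VALUE;

        ExpiringCache(int maxEntries) {
            super(16, 0.75f, false); // false = keep insertion order
            this.maxEntries = maxEntries;
        }

        // Called by LinkedHashMap after each insertion; returning true evicts the eldest entry.
        @Override
        protected boolean removeEldestEntry(Map.Entry<String, Long> eldest) {
            if (size() > maxEntries) {
                safeRollbackTimestamp = eldest.getValue();
                return true;
            }
            return false;
        }

        long safeRollbackTimestamp() {
            return safeRollbackTimestamp;
        }
    }

    public static void main(String[] args) {
        ExpiringCache cache = new ExpiringCache(2);
        cache.put("log-1", 100L);
        cache.put("log-2", 200L);
        cache.put("log-3", 300L); // evicts log-1
        System.out.println(cache.safeRollbackTimestamp()); // 100
    }
}
```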

On Fri, Nov 20, 2020 at 3:33 AM Arvid Heise wrote:

> Hi Si-li,
>
> couldn't you also add the timestamp as a state to the source? So the time
> would store the timestamp of the last emitted record.
> It's nearly identical to your solution but would fit the recovery model of
> Flink much better.
> If you want to go further back to account for the records that have been
> actually processed in the join, you could also replay the data from  timestamp> - .
>
> On Mon, Nov 16, 2020 at 8:39 AM Si-li Liu  wrote:
>
>> Thanks, I'll try it.
>>
>> On Sat, Nov 14, 2020 at 12:53 AM Matthias Pohl wrote:
>>
>>> Hi Si-li,
>>> trying to answer your initial question: Theoretically, you could try
>>> using the co-location constraints to achieve this. But keep in mind that
>>> this might lead to multiple Join operators running in the same JVM reducing
>>> the amount of memory each operator can utilize.
>>>
>>> Best,
>>> Matthias
>>>
>>> On Mon, Nov 9, 2020 at 4:23 AM Si-li Liu  wrote:
>>>
 Thanks for your reply.

 It's a streaming job. The join operator is doing join work, such as
 join. The join state is too large so I don't want to keep the state using
 the mechanism that Flink provided, and also I don't need very precise join.
 So I prefer to let the join operator to calculate a backward timestamp as
 state, if the cluster restarts, the consumer can use setStartFromTimestamp
 to start from that timestamp.

 Now my problem is, consumer can't read the state that join operator
 written, so I need a way to need small message (64bit long) from downstream
 to upstream. Redis may be a solution, but add external  dependency is a
 secondary option if I can pass this message through memory.


 On Fri, Nov 6, 2020 at 7:06 AM Chesnay Schepler wrote:

> It would be good if you could elaborate a bit more on your use-case.
> Are you using batch or streaming? What kind of "message" are we
> talking about? Why are you thinking of using a static variable, instead of
> just treating this message as part of the data(set/stream)?
>
> On 11/5/2020 12:55 PM, Si-li Liu wrote:
>
> Currently I use Flink 1.9.1. The actual thing I want to do is send
> some messages from downstream operators to upstream operators, which I
> consider use static variable.
>
> But it makes me have to make sure in one taskmanager process it always
> has these two operators, can I use CoLocationGroup to solve this problem?
> Or can anyone give me an example to demonstrate the usage of
> CoLocationGroup?
>
> Thanks!
> --
> Best regards
>
> Sili Liu
>
>
>

 --
 Best regards

 Sili Liu

>>>
>>
>> --
>> Best regards
>>
>> Sili Liu
>>
>
>
> --
>
> Arvid Heise | Senior Java Developer
>
> 
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward  - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng
>


-- 
Best regards

Sili Liu


Re: Force Join Unique Key

2020-11-19 Thread Rex Fenley
I'm reading your response as rocksdb having to seek across the whole
dataset for the whole table, which we hope to avoid.

What are the rules for the unique key and unique join key inference? Maybe
we can reorganize our plan to allow it to infer unique keys more correctly.

Thanks

On Wed, Nov 18, 2020 at 9:50 PM Jark Wu  wrote:

> Yes, exactly. The rocksdb has to "seek" data sets because it doesn't know
> how many entries are under the join key.
>
> On Thu, 19 Nov 2020 at 13:38, Rex Fenley  wrote:
>
>> Ok, but if there is only 1 row per Join key on either side of the join,
>> then wouldn't "iterate all the values in the MapState under the current
>> key" effectively be "iterate 1 value in MapState under the current key"
>> which would be O(1)? Or are you saying that it must seek across the entire
>> dataset for the whole table even for that 1 row on either side of the join?
>>
>> Thanks for the help so far!
>>
>> On Wed, Nov 18, 2020 at 6:30 PM Jark Wu  wrote:
>>
>>> Actually, if there is no unique key, it's not O(1), because there maybe
>>> multiple rows are joined by the join key, i.e. iterate all the values in
>>> the MapState under the current key, this is a "seek" operation on rocksdb
>>> which is not efficient.
>>>
>>> Are you asking where the join key is set? The join key is set by the
>>> framework via `AbstractStreamOperator#setKeyContextElement1`.
>>>
>>> Best,
>>> Jark
>>>
>>> On Thu, 19 Nov 2020 at 03:18, Rex Fenley  wrote:
>>>
 Thanks for the info.

 So even if there is no unique key inferred for a Row, the set of rows
 to join on each Join key should effectively still be an O(1) lookup if the
 join key is unique right?

 Also, I've been digging around the code to find where the lookup of
 rows for a join key happens and haven't come across anything. Mind pointing
 me in the right direction?

 Thanks!

 cc Brad

 On Wed, Nov 18, 2020 at 7:39 AM Jark Wu  wrote:

> Hi Rex,
>
> Currently, the join operator may use 3 kinds of state structure
> depending on the input key and join key information.
>
> 1) input doesn't have a unique key => MapState,
> where the map key is the input row and the map value is the number
> of equal rows.
>
> 2) input has unique key, but the unique key is not a subset of join
> key => MapState
> this is better than the above one, because it has a shorter map key
> and
> is more efficient when retracting records.
>
> 3) input has a unique key, and the unique key is a subset of join key
> => ValueState
> this is the best performance, because it only performs a "get"
> operation rather than "seek" on rocksdb
>  for each record of the other input side.
>
> Note: the join key is the key of the keyed states.
>
> You can see the implementation differences
> in 
> org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateViews.
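
The three cases above can be pictured with ordinary Java collections. This is only an illustrative model and not the code in JoinRecordStateViews: the class names are hypothetical, rows are plain Strings, and each object stands for the state kept under one join key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class JoinStateSketch {

    // Case 1: input has no unique key. Map key = the row itself, value = number of equal rows.
    static final class NoUniqueKeyState {
        private final Map<String, Integer> rowCounts = new HashMap<>();

        void add(String row) {
            rowCounts.merge(row, 1, Integer::sum);
        }

        void retract(String row) {
            // Decrement the count and drop the entry once it reaches zero.
            rowCounts.computeIfPresent(row, (r, c) -> c > 1 ? c - 1 : null);
        }

        // Matching has to iterate over every entry stored under the join key.
        List<String> records() {
            List<String> out = new ArrayList<>();
            rowCounts.forEach((row, count) -> {
                for (int i = 0; i < count; i++) {
                    out.add(row);
                }
            });
            return out;
        }
    }

    // Case 2: input has a unique key that is not contained in the join key.
    // The map key is the (shorter) unique key, so a retraction is a single remove.
    static final class UniqueKeyState {
        private final Map<String, String> rowsByUniqueKey = new HashMap<>();

        void add(String uniqueKey, String row) { rowsByUniqueKey.put(uniqueKey, row); }

        void retract(String uniqueKey) { rowsByUniqueKey.remove(uniqueKey); }

        List<String> records() { return new ArrayList<>(rowsByUniqueKey.values()); }
    }

    // Case 3: the unique key is contained in the join key, so at most one row exists
    // per join key and a single value is enough (a plain "get", no iteration).
    static final class JoinKeyContainsUniqueKeyState {
        private String row;

        void add(String newRow) { row = newRow; }

        void retract() { row = null; }

        List<String> records() {
            List<String> out = new ArrayList<>();
            if (row != null) {
                out.add(row);
            }
            return out;
        }
    }

    public static void main(String[] args) {
        NoUniqueKeyState s = new NoUniqueKeyState();
        s.add("row-a");
        s.add("row-a");
        s.retract("row-a");
        System.out.println(s.records()); // [row-a]
    }
}
```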
>
> Best,
> Jark
>
> On Wed, 18 Nov 2020 at 02:30, Rex Fenley  wrote:
>
>> Ok, what are the performance consequences then of having a join with
>> NoUniqueKey if the left side's key actually is unique in practice?
>>
>> Thanks!
>>
>>
>> On Tue, Nov 17, 2020 at 7:35 AM Jark Wu  wrote:
>>
>>> Hi Rex,
>>>
>>> Currently, the unique key is inferred by the optimizer. However, the
>>> inference is not perfect.
>>> There are known issues that the unique key is not derived correctly,
>>> e.g. FLINK-20036 (is this opened by you?). If you think you have the 
>>> same
>>> case, please open an issue.
>>>
>>> Query hint is a nice way for this, but it is not supported yet.
>>> We have an issue to track supporting query hint, see FLINK-17173.
>>>
>>> Best,
>>> Jark
>>>
>>>
>>> On Tue, 17 Nov 2020 at 15:23, Rex Fenley  wrote:
>>>
 Hello,

 I have quite a few joins in my plan that have

 leftInputSpec=[NoUniqueKey]

 in Flink UI. I know this can't truly be the case that there is no
 unique key, at least for some of these joins that I've evaluated.

 Is there a way to hint to the join what the unique key is for a
 table?

 Thanks!

 --

 Rex Fenley  |  Software Engineer - Mobile and Backend


 Remind.com  |  BLOG
   |  FOLLOW US
   |  LIKE US
 

>>>
>>
>> --
>>
>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>
>>
>> Remind.com  |  BLOG
>>   |  FOLLOW US
>>   |  LIKE US
>> 
>>
>

 --

Re: Filter Null in Array in SQL Connector

2020-11-19 Thread Jark Wu
Thanks Rex! This is very helpful. Will check it out later.


On Fri, 20 Nov 2020 at 03:02, Rex Fenley  wrote:

> Below is a highly redacted set of data that should represent the problem.
> As you can see, the "roles" field has "[null]" in it, a null value within
> the array. We also see in our DB corresponding rows like the following.
>     id    | roles
> ----------+--------
>  16867433 | {NULL}
>
> We have confirmed that by not selecting "roles" all data passes through
> without failure on a single operator, but selecting "roles" will eventually
> always fail with java.lang.NullPointerException repeatedly. What is odd
> about this is there is 0 additional stack trace, just the exception, in our
> logs and in Flink UI. We only have INFO logging on, however, other
> exceptions we've encountered in our development have always revealed a
> stack trace.
>
> {
>   "schema": {
> "type": "struct",
> "fields": [
>   {
> "type": "struct",
> "fields": [
>   { "type": "int32", "optional": false, "field": "id" },
>   {
> "type": "array",
> "items": { "type": "string", "optional": true },
> "optional": false,
> "field": "roles"
>   },
> ],
> "optional": true,
> "name": "db.public.data.Value",
> "field": "before"
>   },
>   {
> "type": "struct",
> "fields": [
>   { "type": "int32", "optional": false, "field": "id" },
>   {
> "type": "array",
> "items": { "type": "string", "optional": true },
> "optional": false,
> "field": "roles"
>   },
> ],
> "optional": true,
> "name": "db.public.data.Value",
> "field": "after"
>   },
>   {
> "type": "struct",
> "fields": [
>   { "type": "string", "optional": false, "field": "version" },
>   { "type": "string", "optional": false, "field": "connector" },
>   { "type": "string", "optional": false, "field": "name" },
>   { "type": "int64", "optional": false, "field": "ts_ms" },
>   {
> "type": "string",
> "optional": true,
> "name": "io.debezium.data.Enum",
> "version": 1,
> "parameters": { "allowed": "true,last,false" },
> "default": "false",
> "field": "snapshot"
>   },
>   { "type": "string", "optional": false, "field": "db" },
>   { "type": "string", "optional": false, "field": "schema" },
>   { "type": "string", "optional": false, "field": "table" },
>   { "type": "int64", "optional": true, "field": "txId" },
>   { "type": "int64", "optional": true, "field": "lsn" },
>   { "type": "int64", "optional": true, "field": "xmin" }
> ],
> "optional": false,
> "name": "io.debezium.connector.postgresql.Source",
> "field": "source"
>   },
>   { "type": "string", "optional": false, "field": "op" },
>   { "type": "int64", "optional": true, "field": "ts_ms" },
>   {
> "type": "struct",
> "fields": [
>   { "type": "string", "optional": false, "field": "id" },
>   { "type": "int64", "optional": false, "field": "total_order" },
>   {
> "type": "int64",
> "optional": false,
> "field": "data_collection_order"
>   }
> ],
> "optional": true,
> "field": "transaction"
>   }
> ],
> "optional": false,
> "name": "db.public.data.Envelope"
>   },
>   "payload": {
> "before": null,
> "after": {
>   "id": 76704,
>   "roles": [null],
> },
> "source": {
>   "version": "1.3.0.Final",
>   "connector": "postgresql",
>   "name": "db",
>   "ts_ms": 1605739197360,
>   "snapshot": "true",
>   "db": "db",
>   "schema": "public",
>   "table": "data",
>   "txId": 1784,
>   "lsn": 1305806608,
>   "xmin": null
> },
> "op": "r",
> "ts_ms": 1605739197373,
> "transaction": null
>   }
> }
>
> cc Brad
>
> On Thu, Nov 19, 2020 at 7:29 AM Dylan Forciea  wrote:
>
>> Ah yes, missed the kafka part and just saw the array part. FLINK-19771
>> definitely was solely in the postgres-specific code.
>>
>>
>>
>> Dylan
>>
>>
>>
>> *From: *Jark Wu 
>> *Date: *Thursday, November 19, 2020 at 9:12 AM
>> *To: *Dylan Forciea 
>> *Cc: *Danny Chan , Rex Fenley ,
>> Flink ML 
>> *Subject: *Re: Filter Null in Array in SQL Connector
>>
>>
>>
>> Hi Dylan,
>>
>>
>>
>> I think Rex encountered another issue, because he is using Kafka with
>> Debezium format.
>>
>>
>>
>> Hi Rex,
>>
>>
>>
>> If you can share the json data and the exception stack, that would be
>> helpful!
>>
>>
>>
>> Besides, you can try to enable 'debezium-json.ignore-parse-errors' option
>> [1] to skip the dirty data.
>>
>>
>>
>> Best,
>>
>> Jark
>>
>>
>>
>> [1]:
>> 

Re: State of Machine Learning with Flink and especially FLIP-39

2020-11-19 Thread Becket Qin
Hi Niklas,

We dropped the Flink ML lib in 1.9 and plan to replace it with a new
machine learning library for traditional machine learning algorithms. That
library will be based on FLIP-39. The plan was pushed back a little because
we plan to deprecate the DataSet API but do not yet have batch iteration
support in the DataStream API. So at this point we don't have an ML lib
implementation in Flink.

That being said, we are working with the community to add some ML related
features to Flink. At this point, we have the following two projects
available from Alibaba that will likely be contributed to Flink. You may
also take a look at them.

Alink -  A machine learning library.
https://github.com/alibaba/alink

Flink-AI-Extended - A project that helps run TF / PyTorch on top of Flink.
https://github.com/alibaba/flink-ai-extended

Thanks,

Jiangjie (Becket) Qin

On Fri, Nov 20, 2020 at 3:43 AM Arvid Heise  wrote:

> Hi Niklas,
>
> indeed some efforts on the machine learning libraries are pushed back in
> favor of getting proper PyTorch and Tensorflow support through PyFlink.
>
> Native implementations in Flink have been done so far in the DataSet API,
> which is going to be deprecated in the next few releases in favor of the
> unified DataStream API with bounded streams. I expect efforts for native
> implementations to be picked up once DataSet is fully replaced to avoid
> doubling the work. One of the most important features that is lacking is
> proper iteration support in DataStream.
>
> On Thu, Nov 19, 2020 at 1:34 PM Niklas Wilcke 
> wrote:
>
>> Hi Flink-Community,
>>
>> I'm digging through the history of FlinkML and FLIP-39 [0]. What I
>> understood so far is that FlinkML has been removed in 1.9, because it got
>> unmaintained.
>> I'm not really able to find out whether FLIP-39 and providing a
>> replacement for FlinkML is currently worked on. The Umbrella Jira Ticket
>> FLINK-12470 [1] looks stale to me.
>> Was there maybe a change of strategy in the meantime? Is the focus
>> currently on PyFlink to provide ML-Solutions (FLIP-96 [2])?
>> It would be really interesting to get some insights about the future and
>> roadmap of ML in the Flink ecosystem. Thank you very much!
>>
>> Kind Regards,
>> Niklas
>>
>> [0]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-39+Flink+ML+pipeline+and+ML+libs
>> [1] https://issues.apache.org/jira/browse/FLINK-12470
>> [2]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-96%3A+Support+Python+ML+Pipeline+API
>
>
>


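About global window

2020-11-19 Thread j l
Hello, I have some questions from reading about the global window. First, the
global window is globally unique, i.e. all elements flow into this one window
instance. If so, what is the point of adding keyBy in front of it? Everything
ends up in one window anyway. My test results seem to confirm this: I defined
a custom global window and set the parallelism of process, but there really
is only one window.
Example:

dataUnion.keyBy(0).window(new  StreamToBatchWindow()).process(new
StreamToBatchProcess()).setParallelism(20).print();

If that is the case, doesn't this window become a bottleneck? I am not sure
whether my understanding is correct; what I want is more windows, so that the
stream of each key gets its own global window processing.
The other question is that the global window maintains state for every key,
so if the number of keys keeps growing, won't it blow up? How can I clear the
state of keys that will never appear again?

Thanks!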


Re: Question about aggregating on multiple fields with the keyBy operator in Flink 1.10 and Flink 1.11

2020-11-19 Thread guanxianchun
Flink 1.11 uses KeySelector
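
A follow-up in this thread asks how to group by several fields when a key selector returns a single value. One common approach is to let the selector return one composite value built from all the fields. The sketch below shows only that idea with plain Java collections so that it runs without Flink; the `Event` class, its fields and the helper names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

final class CompositeKeyExample {

    // Hypothetical record type; f0 and f1 mirror the tuple field names used in the thread.
    static final class Event {
        final String f0;
        final int f1;
        final long amount;

        Event(String f0, int f1, long amount) {
            this.f0 = f0;
            this.f1 = f1;
            this.amount = amount;
        }
    }

    // The selector still returns ONE value, but that value combines both fields.
    // List has value-based equals/hashCode, so it is safe to use as a grouping key.
    static final Function<Event, List<Object>> KEY_SELECTOR =
            e -> Arrays.<Object>asList(e.f0, e.f1);

    // Groups events by the composite key and sums the amounts per group.
    static Map<List<Object>, Long> sumByCompositeKey(List<Event> events) {
        return events.stream()
                .collect(Collectors.groupingBy(KEY_SELECTOR, Collectors.summingLong(e -> e.amount)));
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event("a", 1, 10L),
                new Event("a", 1, 5L),
                new Event("a", 2, 7L));
        // Two groups: (a, 1) with sum 15 and (a, 2) with sum 7.
        System.out.println(sumByCompositeKey(events));
    }
}
```

In Flink itself the analogous selector would presumably return a tuple such as `Tuple2.of(t.f0, t.f1)`; with a lambda, the key type may have to be declared explicitly because of type erasure.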



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink1.11.1 job restart occasionally throws org.apache.kafka.common.KafkaException: Failed to construct kafka consumer

2020-11-19 Thread m13162790856
Hi:
   It is intermittent, but it has happened fairly often recently. I previously
suspected class loading. My jar is built separately and does not bundle any
dependencies, so the packages are guaranteed to be the same on every start,
which makes this situation very strange.


On November 19, 2020 at 17:14, hailongwang<18868816...@163.com> wrote:


Hi, if this only happens occasionally, it is probably related to class loading.
If `org.apache.kafka.common.serialization.ByteArrayDeserializer` is loaded by
the child classloader while `org.apache.kafka.common.serialization.Deserializer`
is loaded by the parent classloader, there will be a problem. Does your jar
bundle the kafka-connector package? If the cluster already provides it, you
can try marking it as provided. Hope this helps.

Best,
Hailong Wang

At 2020-11-19 14:33:25, "m13162790856" wrote:
>The detailed log information is as follows:
>
>
> org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
>    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:789)
>    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:643)
>    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:623)
>    at org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.initializeConnections(KafkaPartitionDiscoverer.java:58)
>    at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94)
>    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:550)
>    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
>    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
>    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
>    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
>    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
>    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>    at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.serialization.ByteArrayDeserializer is not an instance of org.apache.kafka.common.serialization.Deserializer
>    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:263)
>    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:688)
>    ... 15 more
> 2020-11-19 15:17:32,0
>
>
>Has anyone run into this?

Re: Job Manager is taking very long time to finalize the Checkpointing.

2020-11-19 Thread Slim Bouguerra
@Arvid thanks, I will try that. The NFS server I am using should be able to
deliver the throughput (TP). From what I observe, serde is taking most of the
CPU.

@Yun Tang
Please find the logs below. Also, what are your thoughts on whether the DataGen
source task is causing this, i.e. pushing the checkpoint to the JM instead of
the filesystem?
The TM stacktrace
https://gist.github.com/b-slim/971a069dd0754eb770d0e319a12657fb
The JM stacktrace
https://gist.github.com/b-slim/24808478c3e857be563e513a3d65e223

On Thu, Nov 19, 2020 at 11:20 AM Arvid Heise  wrote:

> Hi Slim,
>
> for your initial question concerning the size of _metadata. When Flink
> writes the checkpoint, it assumes some kind of DFS. Pretty much all known
> DFS implementations behave poorly for many small files. If you run a job
> with 5 tasks and parallelism of 120, then you'd get 600 small checkpoint
> files (or more depending on the configuration).
>
> To solve it, Flink combines very small files into the _metadata according
> to some threshold [1]. These small files can quickly add up though. You can
> disable that behavior by setting the threshold to 0.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#advanced-state-backends-options
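
To put numbers on the explanation above, here is a small arithmetic sketch in plain Java. It is a simplified model, not Flink code: the class and method names are hypothetical, and treating every state handle at or below the threshold as inlined into _metadata is an assumption of this sketch.

```java
final class CheckpointFileEstimate {

    // One state file per subtask in the simplest case: tasks * parallelism.
    static long stateFileCount(int tasks, int parallelism) {
        return (long) tasks * parallelism;
    }

    // Simplified model (assumption): a state handle whose size is at or below the
    // threshold is inlined into _metadata instead of becoming its own file.
    static boolean isInlined(long stateBytes, long thresholdBytes) {
        return stateBytes <= thresholdBytes;
    }

    // Total bytes that would be inlined for the given per-subtask state sizes.
    static long inlinedBytes(long[] stateSizes, long thresholdBytes) {
        long total = 0;
        for (long size : stateSizes) {
            if (isInlined(size, thresholdBytes)) {
                total += size;
            }
        }
        return total;
    }

    public static void main(String[] args) {
        // The example from the thread: 5 tasks at parallelism 120.
        System.out.println(stateFileCount(5, 120)); // 600

        long[] sizes = {512, 900, 4096}; // hypothetical per-subtask state sizes in bytes
        System.out.println(inlinedBytes(sizes, 1024)); // 1412
        System.out.println(inlinedBytes(sizes, 0));    // 0
    }
}
```

With a threshold of 0 nothing non-empty is inlined in this model, which matches the suggestion of setting the threshold to 0 to disable the behavior.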
>
> On Thu, Nov 19, 2020 at 12:57 AM Slim Bouguerra 
> wrote:
>
>> Hi Yun,
>> Thanks for the help after applying your recommendation, I am getting the
>> same issue aka very long checkpoints and then timeout
>> Now My guess is maybe the datagen source is pushing the checkpoint via
>> the network to JM is there a way to double check?
>> IF that is the case is there a way to exclude the source operators from
>> the checkpoints ?
>> Thanks
>> Please find the attached logs:
>> 1 I checked the shared folder and it has the shared operator state.
>> 2 I did set the value of fs-memory-threshold to 1kb
>>
>> This the source of the SQL testing job
>>
>> CREATE TABLE datagen (
>>   f_sequence INT,
>>   f_random INT,
>>   f_random_str STRING,
>>   f_random_str_4 STRING,
>>   f_random_str_3 STRING,
>>   f_random_str_2 STRING,
>>   f_random_str_1 STRING,
>>   ts AS localtimestamp,
>>   WATERMARK FOR ts AS ts
>> ) WITH (
>>   'connector' = 'datagen',
>>   -- optional options --
>>   'rows-per-second'='50',
>>   'fields.f_sequence.kind'='sequence',
>>   'fields.f_sequence.start'='1',
>>   'fields.f_sequence.end'='2',
>>   'fields.f_random.min'='1',
>>   'fields.f_random.max'='100',
>>   'fields.f_random_str.length'='10',
>>   'fields.f_random_str_4.length'='10',
>>   'fields.f_random_str_3.length'='10',
>>   'fields.f_random_str_2.length'='10',
>>   'fields.f_random_str_1.length'='10'
>>   );
>>
>> ---
>> With more debugging I see this exception stack on the job manager
>> java.io.IOException: The rpc invocation size 199965215 exceeds the
>> maximum akka framesize.
>>
>>  at
>> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.createRpcInvocationMessage(AkkaInvocationHandler.java:276)
>> [flink-dist_2.11-1.11.1.jar:1.11.1]
>>  at
>> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:205)
>> [flink-dist_2.11-1.11.1.jar:1.11.1]
>>  at
>> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:134)
>> [flink-dist_2.11-1.11.1.jar:1.11.1]
>>  at
>> org.apache.flink.runtime.rpc.akka.FencedAkkaInvocationHandler.invoke(FencedAkkaInvocationHandler.java:79)
>> [flink-dist_2.11-1.11.1.jar:1.11.1]
>>  at com.sun.proxy.$Proxy25.acknowledgeCheckpoint(Unknown Source)
>> [?:?]
>>
>>  at
>> org.apache.flink.runtime.taskexecutor.rpc.RpcCheckpointResponder.acknowledgeCheckpoint(RpcCheckpointResponder.java:46)
>> [flink-dist_2.11-1.11.1.jar:1.11.1]
>>  at
>> org.apache.flink.runtime.state.TaskStateManagerImpl.reportTaskStateSnapshots(TaskStateManagerImpl.java:117)
>> [flink-dist_2.11-1.11.1.jar:1.11.1]
>>  at
>> org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.reportCompletedSnapshotStates(AsyncCheckpointRunnable.java:160)
>> [flink-dist_2.11-1.11.1.jar:1.11.1]
>>  at
>> org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:121)
>> [flink-dist_2.11-1.11.1.jar:1.11.1]
>>  at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> [?:1.8.0_172]
>>  at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> [?:1.8.0_172]
>>
>> --
>> And sometimes the JM dies with this OOM:
>>  java.lang.OutOfMemoryError: Java heap space
>>   at java.util.Arrays.copyOf(Arrays.java:3236) ~[?:1.8.0_172]
>>   at 

Re: Dynamic ad hoc query deployment strategy

2020-11-19 Thread Kostas Kloudas
Hi,

Thanks for reaching out!

First of all, I would like to point out that an interesting
alternative to the per-job cluster could be running your jobs in
application mode [1].

Given that you want to run arbitrary SQL queries, I do not think you
can "share" across queries the part of the job graph that reads a
topic. In general, Flink (not only in SQL) creates the graph of a job
before the job is executed. And especially in SQL you do not even have
control over the graph, as the translation logic from query to
physical operators is opaque and not exposed to the user.

That said, you may want to have a look at [2]. It is pretty old but it
describes a potentially similar usecase. Unfortunately, it does not
support SQL.

Cheers,
Kostas

[1] https://flink.apache.org/news/2020/07/14/application-mode.html
[2] https://www.ververica.com/blog/rbea-scalable-real-time-analytics-at-king

On Sun, Nov 15, 2020 at 10:11 AM lalala  wrote:
>
> Hi all,
>
> I would like to consult with you regarding deployment strategies.
>
> We have 250+ Kafka topics, and we want users of the platform to be able to
> submit SQL queries that run indefinitely. We have a query parser that
> extracts topic names from user queries, and the application locally creates
> Kafka tables and executes the query. The result can be written to multiple
> sinks such as databases, files, and cloud services.
>
> We want to have the best isolation between queries, so in case of failures,
> the other jobs will not be affected. We have a huge YARN cluster to handle
> 1 PB a day from Kafka. I believe a cluster-per-job deployment makes
> sense for the sake of isolation. However, that creates some scalability
> problems. There might be several SQL queries running on the same Kafka topic,
> and we do not want to read the topic again for each query in different
> sessions. The ideal case is that we read the topic once and execute multiple
> queries on this data to avoid rereading the same topic. That breaks the goal
> of a fully isolated system, but it improves network and Kafka performance and
> still provides isolation on the topic level, as we just read the topic once
> and execute multiple SQL queries on it.
>
> We are quite new to Flink, but we have experience with Spark. In Spark, we
> can submit an application that, on the master, listens to a query queue and
> submits jobs to the cluster dynamically from different threads. However, in
> Flink, it looks like main() has to produce the job graph in advance.
>
> We use an EMR cluster; what would you recommend for our use case?
>
> Thank you.
>
>
>
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Jdbc input format and system properties

2020-11-19 Thread Flavio Pompermaier
The properties arrive at the task manager, because I can see them in the
Java process (using ps aux). Or do you mean some special line of code?

On Thu, Nov 19, 2020, 20:53 Arvid Heise wrote:

> Hi Flavio,
>
> you are right, all looks good.
>
> Can you please verify if the properties arrived at the task manager in the
> remote debugger session? For example, you could check the JVisualVM
> Overview tab.
>
> On Thu, Nov 19, 2020 at 8:38 PM Flavio Pompermaier 
> wrote:
>
>> At the moment I use a standalone cluster, isn't using env.java.opts the
>> right way to do it?
>>
>> On Thu, Nov 19, 2020, 20:11 Arvid Heise wrote:
>>
>>> Hi Flavio,
>>>
>>> -D afaik passes only the system property to the entry point (client or
>>> jobmanager depending on setup), while you probably want to have it on the
>>> task managers.
>>>
>>> The specific options to pass it to the task managers depend on the way
>>> you deploy. -yD for yarn for example. For docker or k8s, you would use env.
>>>
>>> On Wed, Nov 18, 2020 at 10:20 PM Flavio Pompermaier <
>>> pomperma...@okkam.it> wrote:
>>>
>>>> Hi to all,
>>>> while trying to solve a leak with dynamic class loading I found out
>>>> that mysql connector creates an AbandonedConnectionCleanupThread that
>>>> is retained in the ChildFirstClassLoader..from version 8.0.22 there's
>>>> the possibility to inhibit this thread passing the system property
>>>> com.mysql.disableAbandonedConnectionCleanup=true [1] (I put the mysql
>>>> jar in the lib folder).
>>>>
>>>> I tried to set in the flink-conf.yml
>>>> env.java.opts: "-Dcom.mysql.disableAbandonedConnectionCleanup=true"
>>>>
>>>> but the property does not produce the desired effect in the static
>>>> section of such a thread [2] (I verified that attaching the remote
>>>> debugger to the task manager).
>>>>
>>>> How can I fix this problem?
>>>>
>>>> [1]
>>>> https://dev.mysql.com/doc/relnotes/connector-j/8.0/en/news-8-0-22.html
>>>> [2]
>>>> public class AbandonedConnectionCleanupThread implements Runnable {
>>>> private static boolean abandonedConnectionCleanupDisabled =
>>>>
>>>> Boolean.getBoolean(PropertyDefinitions.SYSP_disableAbandonedConnectionCleanup);
>>>>
>>>> static {
>>>> if (abandonedConnectionCleanupDisabled) {
>>>> cleanupThreadExecutorService = null;
>>>> } else {
>>>> cleanupThreadExecutorService =
>>>>Executors.newSingleThreadExecutor(r -> {}
>>>>}
>>>>   }
>>>>
>>>
>>>
>>> --
>>>
>>> Arvid Heise | Senior Java Developer
>>>
>>> 
>>>
>>> Follow us @VervericaData
>>>
>>> --
>>>
>>> Join Flink Forward  - The Apache Flink
>>> Conference
>>>
>>> Stream Processing | Event Driven | Real Time
>>>
>>> --
>>>
>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>>
>>> --
>>> Ververica GmbH
>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>>> (Toni) Cheng
>>>
>>
>


Re: Flink on block storage in k8s

2020-11-19 Thread George Costea
Hi there,

Can flink be deployed to PVCs backed by block storage?  It seems the
only option is blob storage today.

Thanks,
George


Re: Jdbc input format and system properties

2020-11-19 Thread Arvid Heise
Hi Flavio,

you are right, all looks good.

Can you please verify if the properties arrived at the task manager in the
remote debugger session? For example, you could check the JVisualVM
Overview tab.
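
If attaching a debugger is inconvenient, the same check can be sketched in
plain Java. This is only an illustration, assuming it is called from code that
runs inside the task manager JVM (for example from open() of a rich function).
The class name SystemPropertyProbe is made up for this example; only the
property name comes from this thread.

```java
// Hypothetical helper, not part of Flink or MySQL Connector/J.
final class SystemPropertyProbe {
    // Property name as quoted in this thread.
    static final String KEY = "com.mysql.disableAbandonedConnectionCleanup";

    // Raw value as seen by the current JVM; null if no -D flag reached it.
    static String rawValue(String key) {
        return System.getProperty(key);
    }

    // Same lookup the connector snippet uses: true only if the property
    // exists and equals "true" (ignoring case).
    static boolean asFlag(String key) {
        return Boolean.getBoolean(key);
    }

    // One line suitable for logging from inside a task.
    static String describe(String key) {
        return key + " raw=" + rawValue(key) + " flag=" + asFlag(key);
    }

    public static void main(String[] args) {
        System.out.println(describe(KEY));
    }
}
```

Keep in mind that a static field initialized via Boolean.getBoolean is
evaluated once, when the class is first loaded in that JVM, so the property
has to be present before that point.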

On Thu, Nov 19, 2020 at 8:38 PM Flavio Pompermaier 
wrote:

> At the moment I use a standalone cluster, isn't using env.java.opts the
> right way to do it?
>
> On Thu, Nov 19, 2020, 20:11 Arvid Heise wrote:
>
>> Hi Flavio,
>>
>> -D afaik passes only the system property to the entry point (client or
>> jobmanager depending on setup), while you probably want to have it on the
>> task managers.
>>
>> The specific options to pass it to the task managers depend on the way
>> you deploy. -yD for yarn for example. For docker or k8s, you would use env.
>>
>> On Wed, Nov 18, 2020 at 10:20 PM Flavio Pompermaier 
>> wrote:
>>
>>> Hi to all,
>>> while trying to solve a leak with dynamic class loading I found out
>>> that mysql connector creates an AbandonedConnectionCleanupThread that
>>> is retained in the ChildFirstClassLoader..from version 8.0.22 there's
>>> the possibility to inhibit this thread passing the system property
>>> com.mysql.disableAbandonedConnectionCleanup=true [1] (I put the mysql
>>> jar in the lib folder).
>>>
>>> I tried to set in the flink-conf.yml
>>> env.java.opts: "-Dcom.mysql.disableAbandonedConnectionCleanup=true"
>>>
>>> but the property does not produce the desired effect in the static
>>> section of such a thread [2] (I verified that attaching the remote
>>> debugger to the task manager).
>>>
>>> How can I fix this problem?
>>>
>>> [1]
>>> https://dev.mysql.com/doc/relnotes/connector-j/8.0/en/news-8-0-22.html
>>> [2]
>>> public class AbandonedConnectionCleanupThread implements Runnable {
>>> private static boolean abandonedConnectionCleanupDisabled =
>>>
>>> Boolean.getBoolean(PropertyDefinitions.SYSP_disableAbandonedConnectionCleanup);
>>>
>>> static {
>>> if (abandonedConnectionCleanupDisabled) {
>>> cleanupThreadExecutorService = null;
>>> } else {
>>> cleanupThreadExecutorService =
>>>Executors.newSingleThreadExecutor(r -> {}
>>>}
>>>   }
>>>
>>
>>
>



Re: State of Machine Learning with Flink and especially FLIP-39

2020-11-19 Thread Arvid Heise
Hi Niklas,

indeed some efforts on the machine learning libraries are pushed back in
favor of getting proper PyTorch and Tensorflow support through PyFlink.

Native implementations in Flink have been done so far in the DataSet API,
which is going to be deprecated in the next few releases in favor of the
unified DataStream API with bounded streams. I expect efforts for native
implementations to be picked up once DataSet is fully replaced to avoid
doubling the work. One of the most important features that is lacking is
proper iteration support in DataStream.

On Thu, Nov 19, 2020 at 1:34 PM Niklas Wilcke 
wrote:

> Hi Flink-Community,
>
> I'm digging through the history of FlinkML and FLIP-39 [0]. What I
> understood so far is that FlinkML has been removed in 1.9, because it got
> unmaintained.
> I'm not really able to find out whether FLIP-39 and providing a
> replacement for FlinkML is currently worked on. The Umbrella Jira Ticket
> FLINK-12470 [1] looks stale to me.
> Was there maybe a change of strategy in the meantime? Is the focus
> currently on PyFlink to provide ML-Solutions (FLIP-96 [2])?
> It would be really interesting to get some insights about the future and
> roadmap of ML in the Flink ecosystem. Thank you very much!
>
> Kind Regards,
> Niklas
>
> [0]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-39+Flink+ML+pipeline+and+ML+libs
> [1] https://issues.apache.org/jira/browse/FLINK-12470
> [2]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-96%3A+Support+Python+ML+Pipeline+API





Re: Jdbc input format and system properties

2020-11-19 Thread Flavio Pompermaier
At the moment I use a standalone cluster, isn't using env.java.opts the
right way to do it?

On Thu, Nov 19, 2020, 20:11 Arvid Heise wrote:

> Hi Flavio,
>
> -D afaik passes only the system property to the entry point (client or
> jobmanager depending on setup), while you probably want to have it on the
> task managers.
>
> The specific options to pass it to the task managers depend on the way you
> deploy. -yD for yarn for example. For docker or k8s, you would use env.
>
> On Wed, Nov 18, 2020 at 10:20 PM Flavio Pompermaier 
> wrote:
>
>> Hi to all,
>> while trying to solve a leak with dynamic class loading I found out
>> that mysql connector creates an AbandonedConnectionCleanupThread that
>> is retained in the ChildFirstClassLoader..from version 8.0.22 there's
>> the possibility to inhibit this thread passing the system property
>> com.mysql.disableAbandonedConnectionCleanup=true [1] (I put the mysql
>> jar in the lib folder).
>>
>> I tried to set in the flink-conf.yml
>> env.java.opts: "-Dcom.mysql.disableAbandonedConnectionCleanup=true"
>>
>> but the property does not produce the desired effect in the static
>> section of such a thread [2] (I verified that attaching the remote
>> debugger to the task manager).
>>
>> How can I fix this problem?
>>
>> [1]
>> https://dev.mysql.com/doc/relnotes/connector-j/8.0/en/news-8-0-22.html
>> [2]
>> public class AbandonedConnectionCleanupThread implements Runnable {
>> private static boolean abandonedConnectionCleanupDisabled =
>>
>> Boolean.getBoolean(PropertyDefinitions.SYSP_disableAbandonedConnectionCleanup);
>>
>> static {
>> if (abandonedConnectionCleanupDisabled) {
>> cleanupThreadExecutorService = null;
>> } else {
>> cleanupThreadExecutorService =
>>Executors.newSingleThreadExecutor(r -> {}
>>}
>>   }
>>
>
>


Re: How to achieve co-location constraints in Flink 1.9.1

2020-11-19 Thread Arvid Heise
Hi Si-li,

slot sharing is indeed the way that Flink performs co-location. It's
actually enabled by default. It should work as expected if upstream and
downstream operators have the same parallelism.

In certain cases, two operators can be even chained into one task where no
serialization/network traffic happens.

Btw, I also responded to your initial mail to point out an alternative
solution.

On Thu, Nov 19, 2020 at 12:52 PM Si-li Liu  wrote:

> Hi
>
> Flink only has the slotSharingGroup API on the DataStream class; I can't find
> any public API to achieve co-location constraints. Could anyone provide me an
> example?
>
> Another question: if I use a slot sharing group, Flink may schedule
> two subtasks to the same slot. I think such scheduling will always
> succeed, because running two tasks in one slot just decreases resource usage.
> Could anyone provide an example where scheduling with a slot sharing group
> fails?
>
> Thanks
>
> --
> Best regards
>
> Sili Liu
>




Re: Is possible that make two operators always locate in same taskmanager?

2020-11-19 Thread Arvid Heise
Hi Si-li,

couldn't you also add the timestamp as state to the source? The state
would then store the timestamp of the last emitted record.
It's nearly identical to your solution but would fit the recovery model of
Flink much better.
If you want to go further back to account for the records that have been
actually processed in the join, you could also replay the data from  - .

On Mon, Nov 16, 2020 at 8:39 AM Si-li Liu  wrote:

> Thanks, I'll try it.
>
> On Sat, Nov 14, 2020 at 12:53 AM Matthias Pohl wrote:
>
>> Hi Si-li,
>> trying to answer your initial question: Theoretically, you could try
>> using the co-location constraints to achieve this. But keep in mind that
>> this might lead to multiple Join operators running in the same JVM reducing
>> the amount of memory each operator can utilize.
>>
>> Best,
>> Matthias
>>
>> On Mon, Nov 9, 2020 at 4:23 AM Si-li Liu  wrote:
>>
>>> Thanks for your reply.
>>>
>>> It's a streaming job. The join operator is doing join work, such as
>>> join. The join state is too large, so I don't want to keep the state using
>>> the mechanism that Flink provides, and I also don't need a very precise
>>> join. So I prefer to let the join operator calculate a backward timestamp
>>> as state; if the cluster restarts, the consumer can use
>>> setStartFromTimestamp to start from that timestamp.
>>>
>>> Now my problem is that the consumer can't read the state that the join
>>> operator has written, so I need a way to send a small message (a 64-bit
>>> long) from downstream to upstream. Redis may be a solution, but adding an
>>> external dependency is a secondary option if I can pass this message
>>> through memory.
>>>
>>>
>>> On Fri, Nov 6, 2020 at 7:06 AM Chesnay Schepler wrote:
>>>
>>>> It would be good if you could elaborate a bit more on your use-case.
>>>> Are you using batch or streaming? What kind of "message" are we talking
>>>> about? Why are you thinking of using a static variable, instead of just
>>>> treating this message as part of the data(set/stream)?
>>>>
>>>> On 11/5/2020 12:55 PM, Si-li Liu wrote:
>>>>
>>>> Currently I use Flink 1.9.1. What I actually want to do is send some
>>>> messages from downstream operators to upstream operators, for which I am
>>>> considering using a static variable.
>>>>
>>>> But that means I have to make sure that one taskmanager process always
>>>> contains both operators. Can I use CoLocationGroup to solve this problem?
>>>> Or can anyone give me an example to demonstrate the usage of
>>>> CoLocationGroup?
>>>>
>>>> Thanks!
>>>> --
>>>> Best regards
>>>>
>>>> Sili Liu


>>>
>>> --
>>> Best regards
>>>
>>> Sili Liu
>>>
>>
>
> --
> Best regards
>
> Sili Liu
>




Re: How to set keystore.jks location on EMR when reading Kafka topics via SSL

2020-11-19 Thread Arvid Heise
Glad to hear that you worked it out.

Indeed, the path has to be accessible by the worker nodes. A common
solution is also to put it on some DFS like HDFS and reference that. Then
you only need to update one file if the key changes.
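
To confirm that a given host can actually see and open the keystore, a quick
check along these lines could be run on a worker node. It is a sketch using
only JDK classes; the class name KeystoreCheck and its method names are
hypothetical, and the path and password are whatever your job configures.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.GeneralSecurityException;
import java.security.KeyStore;

// Hypothetical helper for checking a keystore on the local host.
final class KeystoreCheck {
    // True if the path exists on THIS host's local file system and is readable.
    static boolean isReadableOnThisHost(String location) {
        Path path = Paths.get(location);
        return Files.isRegularFile(path) && Files.isReadable(path);
    }

    // Loads the file as a JKS keystore and returns the number of entries.
    // Throws if the file is missing, corrupt, or the password is wrong.
    static int countEntries(String location, char[] password)
            throws IOException, GeneralSecurityException {
        KeyStore keyStore = KeyStore.getInstance("JKS");
        try (InputStream in = Files.newInputStream(Paths.get(location))) {
            keyStore.load(in, password);
        }
        return keyStore.size();
    }

    public static void main(String[] args) throws Exception {
        // Usage: java KeystoreCheck <path> <password>
        System.out.println("readable: " + isReadableOnThisHost(args[0]));
        System.out.println("entries:  " + countEntries(args[0], args[1].toCharArray()));
    }
}
```

Running it on the master and on a core node should show quickly whether the
file is only present on one of them.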

On Thu, Nov 19, 2020 at 2:14 AM Fanbin Bu  wrote:

> I had to put the keystore file on the nodes.
>
> On Wed, Nov 18, 2020 at 4:29 PM Fanbin Bu  wrote:
>
>> Hi,
>>
>> This is a repost with modified subject per Sri Tummala's suggestion.
>>
>> I'm running Flink 1.11 on EMR and would like to read Kafka via SSL. I
>> tried to put keystore.jks location under /usr/lib/flink/... like:
>>
>> export
>> SSL_KEYSTORE_LOCATION=/usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks
>>
>> Notice that this is on EMR master(master) node. Both JM and TMs are on
>> EMR core(slave) nodes.
>>
>> Here is the code snippet:
>>
>> val stmt = s"""
>>   |create table ${table.name} (${schema}, ${watermark})
>>   |with(
>>   |'connector' = 'kafka',
>>   |'topic' = '${table.topic}',
>>   |'scan.startup.mode'= '${table.scanStartUpMode}',
>>   |'properties.zookeeper.connect'='xxx',
>>   |'properties.bootstrap.servers'='xxx',
>>   |'properties.ssl.keystore.location'='/usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks',
>>   |'properties.ssl.keystore.password'='xxx',
>>   |'properties.ssl.key.password'='xxx',
>>   |'properties.security.protocol'='SSL',
>>   |'properties.ssl.keystore.type'='JKS',
>>   |'properties.ssl.truststore.type'='JKS',
>>   |'properties.ssl.enabled.protocols'='TLSv1.2,TLSv1.1,TLSv1',
>>   |'properties.group.id' = '${table.name}_group_id',
>>   |'format' = 'json',
>>   |'json.ignore-parse-errors' = 'true'
>>   |)
>> """.stripMargin
>>
>> tEnv.executeSql(stmt)
>>
>>
>> However, I got the exception
>> Caused by: java.nio.file.NoSuchFileException: /usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks
>> even though the file is there:
>> [hadoop@ip-10-200-41-39 flink]$ ll
>> /usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks
>> -rw-r--r-- 1 root root 5565 Nov 17 22:24
>> /usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks
>>
>> Things i tried:
>> 1. the keystore.jks file itself works since I can use console-consumer to
>> read kafka topics on EMR master.
>> 2. set the location to be s3://my-bucket/keystore.jks, not working
>>
>> What value should I set the keystore location to?
>> Thanks!
>> Fanbin
>>
>> Also attached the full exception log:
>>
>> 2020-11-17 09:35:49
>> org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
>> at
>> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:820)
>> at
>> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:666)
>> at
>> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:646)
>> at
>> org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.initializeConnections(KafkaPartitionDiscoverer.java:58)
>> at
>> org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94)
>> at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:550)
>> at
>> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>> at
>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.kafka.common.KafkaException:
>> org.apache.kafka.common.KafkaException:
>> org.apache.kafka.common.KafkaException: Failed to load SSL keystore
>> /usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks
>> of type JKS
>> at
>> org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:71)
>> at
>> org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:146)
>> at
>> org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:67)
>> at
>> org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:99)
>> at
>> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:741)
>> ... 15 more
>> Caused by: 

Re: Job Manager is taking very long time to finalize the Checkpointing.

2020-11-19 Thread Arvid Heise
Hi Slim,

regarding your initial question concerning the size of _metadata: when Flink
writes the checkpoint, it assumes some kind of DFS. Pretty much all known
DFS implementations behave poorly for many small files. If you run a job
with 5 tasks and parallelism of 120, then you'd get 600 small checkpoint
files (or more depending on the configuration).

To solve it, Flink combines very small files into the _metadata according
to some threshold [1]. These small files can quickly add up though. You can
disable that behavior by setting the threshold to 0.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#advanced-state-backends-options
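
To make the numbers concrete, here is a rough back-of-the-envelope sketch in
plain Java. It is not Flink code: the class and method names are made up, and
it assumes one state handle per subtask and uses the 1kb threshold mentioned
earlier in this thread. Real _metadata files also carry some bookkeeping
overhead that is ignored here.

```java
// Hypothetical estimate, not Flink's actual accounting.
final class MetadataSizeEstimate {
    // Number of state files if every subtask of every task writes one file.
    static long stateFiles(int tasks, int parallelism) {
        return (long) tasks * parallelism;
    }

    // Rough upper bound on the bytes inlined into _metadata when every state
    // handle is small enough to be inlined. A threshold of 0 inlines nothing.
    static long maxInlinedBytes(long handles, long thresholdBytes) {
        return handles * thresholdBytes;
    }

    public static void main(String[] args) {
        long files = stateFiles(5, 120);
        System.out.println(files);                        // 600
        System.out.println(maxInlinedBytes(files, 1024)); // 614400
        System.out.println(maxInlinedBytes(files, 0));    // 0
    }
}
```

So with many operators and a high parallelism, even a small threshold can add
up to a noticeable _metadata file, while a threshold of 0 trades that for many
small files on the DFS.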

On Thu, Nov 19, 2020 at 12:57 AM Slim Bouguerra 
wrote:

> Hi Yun,
> Thanks for the help. After applying your recommendation, I am getting the
> same issue, i.e. very long checkpoints and then a timeout.
> Now my guess is that the datagen source may be pushing the checkpoint via the
> network to the JM. Is there a way to double check?
> If that is the case, is there a way to exclude the source operators from
> the checkpoints?
> Thanks
> Please find the attached logs:
> 1. I checked the shared folder and it has the shared operator state.
> 2. I did set the value of fs-memory-threshold to 1kb.
>
> This is the source of the SQL testing job:
>
> CREATE TABLE datagen (
>   f_sequence INT,
>   f_random INT,
>   f_random_str STRING,
>   f_random_str_4 STRING,
>   f_random_str_3 STRING,
>   f_random_str_2 STRING,
>   f_random_str_1 STRING,
>   ts AS localtimestamp,
>   WATERMARK FOR ts AS ts
> ) WITH (
>   'connector' = 'datagen',
>   -- optional options --
>   'rows-per-second'='50',
>   'fields.f_sequence.kind'='sequence',
>   'fields.f_sequence.start'='1',
>   'fields.f_sequence.end'='2',
>   'fields.f_random.min'='1',
>   'fields.f_random.max'='100',
>   'fields.f_random_str.length'='10',
>   'fields.f_random_str_4.length'='10',
>   'fields.f_random_str_3.length'='10',
>   'fields.f_random_str_2.length'='10',
>   'fields.f_random_str_1.length'='10'
>   );
>
> ---
> With more debugging I see this exception stack on the job manager
> java.io.IOException: The rpc invocation size 199965215 exceeds the maximum
> akka framesize.
>
>  at
> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.createRpcInvocationMessage(AkkaInvocationHandler.java:276)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
>  at
> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:205)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
>  at
> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:134)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
>  at
> org.apache.flink.runtime.rpc.akka.FencedAkkaInvocationHandler.invoke(FencedAkkaInvocationHandler.java:79)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
>  at com.sun.proxy.$Proxy25.acknowledgeCheckpoint(Unknown Source) [?:?]
>
>
>  at
> org.apache.flink.runtime.taskexecutor.rpc.RpcCheckpointResponder.acknowledgeCheckpoint(RpcCheckpointResponder.java:46)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
>  at
> org.apache.flink.runtime.state.TaskStateManagerImpl.reportTaskStateSnapshots(TaskStateManagerImpl.java:117)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
>  at
> org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.reportCompletedSnapshotStates(AsyncCheckpointRunnable.java:160)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
>  at
> org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:121)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
>  at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> [?:1.8.0_172]
>  at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> [?:1.8.0_172]
>
> --
> And sometimes the JM dies with this OOM:
>  java.lang.OutOfMemoryError: Java heap space
>   at java.util.Arrays.copyOf(Arrays.java:3236) ~[?:1.8.0_172]
>   at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
> ~[?:1.8.0_172]
>   at
> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
> ~[?:1.8.0_172]
>   at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
> ~[?:1.8.0_172]
>   at
> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
> ~[?:1.8.0_172]
>   at
> java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
> ~[?:1.8.0_172]
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
> ~[?:1.8.0_172]
>   at 

Re: Jdbc input format and system properties

2020-11-19 Thread Arvid Heise
Hi Flavio,

-D afaik passes only the system property to the entry point (client or
jobmanager depending on setup), while you probably want to have it on the
task managers.

The specific options to pass it to the task managers depend on the way you
deploy. -yD for yarn for example. For docker or k8s, you would use env.

On Wed, Nov 18, 2020 at 10:20 PM Flavio Pompermaier 
wrote:

> Hi to all,
> while trying to solve a leak with dynamic class loading I found out
> that mysql connector creates an AbandonedConnectionCleanupThread that
> is retained in the ChildFirstClassLoader..from version 8.0.22 there's
> the possibility to inhibit this thread passing the system property
> com.mysql.disableAbandonedConnectionCleanup=true [1] (I put the mysql
> jar in the lib folder).
>
> I tried to set in the flink-conf.yml
> env.java.opts: "-Dcom.mysql.disableAbandonedConnectionCleanup=true"
>
> but the property does not produce the desired effect in the static
> section of such a thread [2] (I verified that attaching the remote
> debugger to the task manager).
>
> How can I fix this problem?
>
> [1] https://dev.mysql.com/doc/relnotes/connector-j/8.0/en/news-8-0-22.html
> [2]
> public class AbandonedConnectionCleanupThread implements Runnable {
> private static boolean abandonedConnectionCleanupDisabled =
>
> Boolean.getBoolean(PropertyDefinitions.SYSP_disableAbandonedConnectionCleanup);
>
> static {
> if (abandonedConnectionCleanupDisabled) {
> cleanupThreadExecutorService = null;
> } else {
> cleanupThreadExecutorService =
>Executors.newSingleThreadExecutor(r -> {}
>}
>   }
>




Re: How do i load mysql data into task

2020-11-19 Thread Arvid Heise
Hi Jiazhi,

You can use a rich function and query all static data in open() [1], as you
would in plain Java, if you want to load the data into main memory. If you
want to query the database dynamically (to enrich each record), you should
use Async I/O instead [2].

Alternatively, you can also use the data source directly in the Table API [3].

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/user_defined_functions.html#rich-functions
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/asyncio.html
[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#how-to-create-a-jdbc-table
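
The first option (load everything once in open(), then look it up per record)
can be sketched roughly as follows. This is plain Java so that it runs
without Flink on the classpath: StaticDataEnricher is a hypothetical class
that only imitates the lifecycle of a rich function, and the Supplier stands
in for whatever JDBC query you would run against MySQL. In a real job the
class would extend one of Flink's rich functions and the loading code would
live in its open() method.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Plain-Java imitation of a rich function's lifecycle (an assumption for
// illustration, not Flink API): open() runs once before any record is
// processed, map() runs once per record.
final class StaticDataEnricher {
    private final Supplier<Map<Integer, String>> loader; // hypothetical "query"
    private Map<Integer, String> namesById;              // filled once in open()

    StaticDataEnricher(Supplier<Map<Integer, String>> loader) {
        this.loader = loader;
    }

    // Load all static rows into main memory before processing starts.
    void open() {
        namesById = new HashMap<>(loader.get());
    }

    // Enrich one record from the in-memory copy; no database round trip.
    String map(int id) {
        return id + ":" + namesById.getOrDefault(id, "unknown");
    }

    public static void main(String[] args) {
        Map<Integer, String> rows = new HashMap<>();
        rows.put(1, "alice");
        StaticDataEnricher fn = new StaticDataEnricher(() -> rows);
        fn.open();
        System.out.println(fn.map(1));
        System.out.println(fn.map(2));
    }
}
```

The trade-off of this pattern is that the data is read only once per parallel
instance, so later changes in the database are not seen; that is the case
where Async I/O [2] is the better fit.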

On Wed, Nov 18, 2020 at 5:00 PM ゞ野蠻遊戲χ  wrote:

> Hi all
>
>  How to use DataStream to load mysql data into the memory of flink
> task when the task is initialized? Please give me a demo.
>
> Thanks,
> Jiazhi
>
>
>



Re: Filter Null in Array in SQL Connector

2020-11-19 Thread Rex Fenley
Below is a highly redacted set of data that should represent the problem.
As you can see, the "roles" field has "[null]" in it, a null value within
the array. We also see in our DB corresponding rows like the following.
    id    | roles
----------+--------
 16867433 | {NULL}

We have confirmed that all data passes through without failure on a single
operator when "roles" is not selected, but selecting "roles" eventually
always fails with a repeated java.lang.NullPointerException. What is odd
is that there is no stack trace at all, just the exception, both in our
logs and in the Flink UI. We only have INFO logging on; however, other
exceptions we've encountered during development have always included a
stack trace.

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          { "type": "int32", "optional": false, "field": "id" },
          {
            "type": "array",
            "items": { "type": "string", "optional": true },
            "optional": false,
            "field": "roles"
          }
        ],
        "optional": true,
        "name": "db.public.data.Value",
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          { "type": "int32", "optional": false, "field": "id" },
          {
            "type": "array",
            "items": { "type": "string", "optional": true },
            "optional": false,
            "field": "roles"
          }
        ],
        "optional": true,
        "name": "db.public.data.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          { "type": "string", "optional": false, "field": "version" },
          { "type": "string", "optional": false, "field": "connector" },
          { "type": "string", "optional": false, "field": "name" },
          { "type": "int64", "optional": false, "field": "ts_ms" },
          {
            "type": "string",
            "optional": true,
            "name": "io.debezium.data.Enum",
            "version": 1,
            "parameters": { "allowed": "true,last,false" },
            "default": "false",
            "field": "snapshot"
          },
          { "type": "string", "optional": false, "field": "db" },
          { "type": "string", "optional": false, "field": "schema" },
          { "type": "string", "optional": false, "field": "table" },
          { "type": "int64", "optional": true, "field": "txId" },
          { "type": "int64", "optional": true, "field": "lsn" },
          { "type": "int64", "optional": true, "field": "xmin" }
        ],
        "optional": false,
        "name": "io.debezium.connector.postgresql.Source",
        "field": "source"
      },
      { "type": "string", "optional": false, "field": "op" },
      { "type": "int64", "optional": true, "field": "ts_ms" },
      {
        "type": "struct",
        "fields": [
          { "type": "string", "optional": false, "field": "id" },
          { "type": "int64", "optional": false, "field": "total_order" },
          {
            "type": "int64",
            "optional": false,
            "field": "data_collection_order"
          }
        ],
        "optional": true,
        "field": "transaction"
      }
    ],
    "optional": false,
    "name": "db.public.data.Envelope"
  },
  "payload": {
    "before": null,
    "after": {
      "id": 76704,
      "roles": [null]
    },
    "source": {
      "version": "1.3.0.Final",
      "connector": "postgresql",
      "name": "db",
      "ts_ms": 1605739197360,
      "snapshot": "true",
      "db": "db",
      "schema": "public",
      "table": "data",
      "txId": 1784,
      "lsn": 1305806608,
      "xmin": null
    },
    "op": "r",
    "ts_ms": 1605739197373,
    "transaction": null
  }
}

cc Brad

On Thu, Nov 19, 2020 at 7:29 AM Dylan Forciea  wrote:

> Ah yes, missed the kafka part and just saw the array part. FLINK-19771
> definitely was solely in the postgres-specific code.
>
> Dylan
>
> From: Jark Wu
> Date: Thursday, November 19, 2020 at 9:12 AM
> To: Dylan Forciea
> Cc: Danny Chan, Rex Fenley, Flink ML
> Subject: Re: Filter Null in Array in SQL Connector
>
> Hi Dylan,
>
> I think Rex encountered another issue, because he is using Kafka with
> Debezium format.
>
> Hi Rex,
>
> If you can share the json data and the exception stack, that would be
> helpful!
>
> Besides, you can try to enable 'debezium-json.ignore-parse-errors' option
> [1] to skip the dirty data.
>
> Best,
> Jark
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html#debezium-json-ignore-parse-errors
>
> On Thu, 19 Nov 2020 at 21:13, Dylan Forciea  wrote:
>
> Do you mean that the array contains values that are null, or that the
> entire array itself is null? If it’s the latter, I have an issue written,
> along with a PR to fix it that has been pending review [1].

Re: SSL setup for YARN deployment when hostnames are unknown.

2020-11-19 Thread Jiahui Jiang
Yeah, there is no wildcard hostname it could use.

I went ahead and started implementing the startup wrapper, but just realized
that after generating the key-cert pair in the JM wrapper, we will need to
send the cert back to the client.

Another question: we are currently using the Flink YARN client. Is there
anywhere I can configure it to use a separate SSLContext rather than the
current JVM's truststore? For every cluster we submit, the trust context
needs to be configured dynamically.

Thank you!


From: Robert Metzger
Sent: Thursday, November 12, 2020 2:08 AM
To: Jiahui Jiang
Cc: matth...@ververica.com; user@flink.apache.org; aljos...@apache.org
Subject: Re: SSL setup for YARN deployment when hostnames are unknown.

Hi Jiahui,

using the yarn.container-start-command-template is indeed a good idea.

I was also wondering whether the Flink YARN client that submits the Flink 
cluster to YARN has knowledge of the host where the ApplicationMaster gets 
deployed to. But that doesn't seem to be the case.

On Wed, Nov 11, 2020 at 7:57 PM Jiahui Jiang <qzhzm173...@hotmail.com> wrote:
The issue right now is that we can't dynamically generate a keystore after
the YARN application launches but before the JobManager process starts. Do
you think the best short-term solution is to hack around
`yarn.container-start-command-template` and have it execute a custom script
that generates the keystore and then starts the JM process? Would that be
allowed given the current Flink architecture?

Thanks!

From: Jiahui Jiang <qzhzm173...@hotmail.com>
Sent: Wednesday, November 11, 2020 9:09 AM
To: matth...@ververica.com
Cc: user@flink.apache.org; aljos...@apache.org
Subject: Re: SSL setup for YARN deployment when hostnames are unknown.

Hello Matthias,

Thank you for the links! I did see the documentation and went through the
source code. Unfortunately, it looks like only a prebuilt keystore is
supported for YARN right now.

In terms of dynamically loading security modules, the link you sent seems to
be mainly about ZooKeeper's security? I checked the part of the code that sets
up SSL for the REST server [1], and it doesn't look like the SslContext
creation path is pluggable.


[1]
 
https://github.com/apache/flink/blob/be419e2560ef89683b7795c75eb08ae2337fefee/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java#L160

From: Matthias Pohl <matth...@ververica.com>
Sent: Wednesday, November 11, 2020 3:58 AM
To: Jiahui Jiang <qzhzm173...@hotmail.com>
Cc: user@flink.apache.org; aljos...@apache.org
Subject: Re: SSL setup for YARN deployment when hostnames are unknown.

Hi Jiahui,
thanks for reaching out to the mailing list. This is not something I have 
expertise in. But have you checked out the Flink SSL Setup documentation [1]? 
Maybe, you'd find some help there.

Additionally, I did go through the code a bit: A SecurityContext is loaded 
during ClusterEntrypoint startup [2]. It supports dynamic loading of security 
modules. You might have to implement 
org.apache.flink.runtime.security.contexts.SecurityContextFactory and configure 
it in your flink-conf.yaml. Is this something that might help you? I'm adding 
Aljoscha to this thread as he worked on dynamically loading these modules 
recently.

Best,
Matthias

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/security-ssl.html
[2] 
https://github.com/apache/flink/blob/2c8631a4eb7a247ce8fb4205f838e8c0f8019367/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java#L170

On Wed, Nov 11, 2020 at 6:17 AM Jiahui Jiang <qzhzm173...@hotmail.com> wrote:
Ping on this. Is there any way I can run a script or implement some interface
that runs before the Dispatcher service starts up, to dynamically generate
the keystore?

Thank you!

From: Jiahui Jiang <qzhzm173...@hotmail.com>
Sent: Monday, November 9, 2020 3:19 PM
To: user@flink.apache.org
Subject: SSL setup for YARN deployment when hostnames are unknown.

Hello Flink!

We are working on turning on REST SSL for YARN deployments. We built a generic 
orchestration server that can submit Flink clusters to any YARN clusters given 
the relevant Hadoop configs. But this means we may not know the hostname the 
Job Managers can 

Re: Filter Null in Array in SQL Connector

2020-11-19 Thread Dylan Forciea
Ah yes, missed the kafka part and just saw the array part. FLINK-19771 
definitely was solely in the postgres-specific code.

Dylan

From: Jark Wu 
Date: Thursday, November 19, 2020 at 9:12 AM
To: Dylan Forciea 
Cc: Danny Chan , Rex Fenley , Flink 
ML 
Subject: Re: Filter Null in Array in SQL Connector

Hi Dylan,

I think Rex encountered another issue, because he is using Kafka with Debezium 
format.

Hi Rex,

If you can share the json data and the exception stack, that would be helpful!

Besides, you can try to enable 'debezium-json.ignore-parse-errors' option [1] 
to skip the dirty data.

Best,
Jark

[1]: 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html#debezium-json-ignore-parse-errors

On Thu, 19 Nov 2020 at 21:13, Dylan Forciea <dy...@oseberg.io> wrote:
Do you mean that the array contains values that are null, or that the entire 
array itself is null? If it’s the latter, I have an issue written, along with a 
PR to fix it that has been pending review [1].

Regards,
Dylan Forciea

[1] https://issues.apache.org/jira/browse/FLINK-19771

From: Danny Chan <danny0...@apache.org>
Date: Thursday, November 19, 2020 at 2:24 AM
To: Rex Fenley <r...@remind101.com>
Cc: Flink ML <user@flink.apache.org>
Subject: Re: Filter Null in Array in SQL Connector

Hi, Fenley ~

You are right, parsing nulls in an ARRAY field is not supported yet. I have
logged an issue [1] and will fix it soon ~

[1] https://issues.apache.org/jira/browse/FLINK-20234

Rex Fenley <r...@remind101.com> wrote on Thu, Nov 19, 2020 at 2:51 PM:
Hi,

I recently discovered that some of our data has NULL values arriving in an
ARRAY column. This column is being consumed by Flink via the Kafka
connector with the Debezium format. We seem to be getting
NullPointerExceptions when these NULL values in the arrays arrive, which
restarts the source operator in a loop.

Is there any way to not throw, or possibly to filter out NULLs in an array of
Strings in Flink?

We're somewhat stuck on how to solve this problem; we'd like to be defensive
about this on Flink's side.

Thanks!

(P.S. The exception was not that informative, there may be room for improvement 
in terms of a richer error message when this happens.)

--

Rex Fenley  |  Software Engineer - Mobile and Backend



Remind.com |  BLOG  |  FOLLOW 
US  |  LIKE US


Re: Logs of JobExecutionListener

2020-11-19 Thread Flavio Pompermaier
You're right. I removed my Flink dir, re-extracted it, and now it
works. Unfortunately I didn't keep the old version to understand what
the difference was, but the error was probably caused by the fact that
I had a previous version of WordCount.jar (without the listener)
in the Flink lib dir (in another dev session I was experimenting with
running the job with the user jar in the lib dir). Sorry for the
confusion.
Just one last question: is the listener executed on the client or on
the job server? This is not entirely clear to me.

Best,
Flavio

On Thu, Nov 19, 2020 at 1:53 PM Andrey Zagrebin  wrote:
>
> I also tried 1.11.0 and 1.11.2, both work for me.
>
> On Thu, Nov 19, 2020 at 3:39 PM Aljoscha Krettek  wrote:
>>
>> Hmm, there was this issue:
>> https://issues.apache.org/jira/browse/FLINK-17744 But it should be fixed
>> in your version.
>>
>> On 19.11.20 12:58, Flavio Pompermaier wrote:
>> > Which version are you using?
>> > I used the exact same commands on Flink 1.11.0 and I didn't get the job
>> > listener output..
>> >
>> > Il gio 19 nov 2020, 12:53 Andrey Zagrebin  ha 
>> > scritto:
>> >
>> >> Hi Flavio and Aljoscha,
>> >>
>> >> Sorry for the late heads up. I could not actually reproduce the reported
>> >> problem with 'flink run' and local standalone cluster on master.
>> >> I get the expected output with the suggested modification of WordCount
>> >> program:
>> >>
>> >> $ bin/start-cluster.sh
>> >>
>> >> $ rm -rf out; bin/flink run
>> >> flink/flink-examples/flink-examples-batch/target/WordCount.jar --output
>> >> flink/build-target/out
>> >>
>> >> Executing WordCount example with default input data set.
>> >> Use --input to specify file input.
>> >>  SUBMITTED
>> >> Job has been submitted with JobID c454a894d0524ccb69943b95838eea07
>> >> Program execution finished
>> >> Job with JobID c454a894d0524ccb69943b95838eea07 has finished.
>> >> Job Runtime: 139 ms
>> >>
>> >>  EXECUTED
>> >>
>> >> Best,
>> >> Andrey
>> >>
>> >> On Thu, Nov 19, 2020 at 2:40 PM Aljoscha Krettek 
>> >> wrote:
>> >>
>> >>> JobListener.onJobExecuted() is only invoked in
>> >>> ExecutionEnvironment.execute() and ContextEnvironment.execute(). If none
>> >>> of these is still in the call chain with that setup then the listener
>> >>> will not be invoked.
>> >>>
>> >>> Also, this would only happen on the client, not on the broker (in your
>> >>> case) or the server (JobManager).
>> >>>
>> >>> Does that help to debug the problem?
>> >>>
>> >>> Aljoscha
>> >>>
>> >>> On 19.11.20 09:49, Flavio Pompermaier wrote:
>>  I have a spring boot job server that act as a broker towards our
>>  application and a Flink session cluster. To submit a job I use the
>>  FlinkRestClient (that is also the one used in the CLI client when I use
>> >>> the
>>  run action it if I'm not wrong). However both methods don't trigger the
>> >>> job
>>  listener.
>> 
>>  Il gio 19 nov 2020, 09:39 Aljoscha Krettek  ha
>> >>> scritto:
>> 
>> > @Flavio, when you're saying you're using the RestClusterClient, you are
>> > not actually using that manually, right? You're just submitting your
>> >>> job
>> > via "bin/flink run ...", right?
>> >
>> > What's the exact invocation of "bin/flink run" that you're using?
>> >
>> > On 19.11.20 09:29, Andrey Zagrebin wrote:
>> >> Hi Flavio,
>> >>
>> >> I think I can reproduce what you are reporting (assuming you also pass
>> >> '--output' to 'flink run').
>> >> I am not sure why it behaves like this. I would suggest filing a Jira
>> >> ticket for this.
>> >>
>> >> Best,
>> >> Andrey
>> >>
>> >> On Wed, Nov 18, 2020 at 9:45 AM Flavio Pompermaier <
>> >>> pomperma...@okkam.it
>> >>
>> >> wrote:
>> >>
>> >>> is this a bug or is it a documentation problem...?
>> >>>
>> >>> Il sab 14 nov 2020, 18:44 Flavio Pompermaier 
>> >>> ha
>> >>> scritto:
>> >>>
>>  I've also verified that the problem persist also using a modified
>> > version
>>  of the WordCount class.
>>  If you add the code pasted at the end of this email at the end of
>> >>> its
>>  main method you can verify that the listener is called if you run
>> >>> the
>>  program from the IDE, but it's not called if you submit the job
>> >>> using
>> > the
>>  CLI client using the command
>> 
>>    - bin/flink run
>> 
>> >
>> >>>   
>> >>> /home/okkam/git/flink/flink-examples/flink-examples-batch/target/WordCount.jar
>> 
>>  Maybe this is an expected result but I didn't find any
>> >>> documentation of
>>  this behaviour (neither in the Javadoc or in the flink web site,
>> >>> where
>> > I
>>  can't find any documentation about JobListener at all).
>> 
>>  [Code to add to main()]
>> // emit result
>> if (params.has("output")) {
>> 

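Re: flink sql cdc sum result contains NULL

2020-11-19 Thread Jark Wu
You can first run select * from hive.database.table directly and check
whether the value of every field is correct and whether any of them are
null, to verify that your custom format has no problems.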

Best,
Jark

On Thu, 19 Nov 2020 at 22:41, kandy.wang  wrote:

> -- MySQL table
> CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`(
>    `id` INT UNSIGNED AUTO_INCREMENT,
>    `spu_id` BIGINT NOT NULL,
>    `leaving_price`  DECIMAL(10, 5),
>    PRIMARY KEY ( `id` ),
>    unique key idx_spu_id (spu_id)
> )ENGINE=InnoDB DEFAULT CHARSET=utf8
>
>
> -- Flink table
> CREATE TABLE hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg (
>    `spu_id` BIGINT ,
>    `leaving_price`  DECIMAL(10, 5),
> PRIMARY KEY ( `spu_id`) NOT ENFORCED
> ) WITH (
>   'connector' = 'jdbc',
>    'url' = 'jdbc:mysql://...',
>    'table-name' = 'mysql_realtime_leaving_price_spu_index_agg',
>    'username' = '...',
>    'password' = '..'
> );
>
>
> -- binlog to MySQL
>
> insert into hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg
>
> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price
>
> FROM hive.database.table
>
> group by v_spu_id;
>
>
> hive.database.table points to our company's own Kafka, which implements
> the four kinds of message formats, similar to Canal CDC.
>
>
> The problem is that leaving_price in the MySQL result table is NULL in
> many cases. I looked at the data, and neither the leaving_num nor the price
> field can be NULL, so it is unclear why the result table can contain NULL.
> Is there a good way to troubleshoot this?
>
>
>
>
>


Re: How to use EventTimeSessionWindows.withDynamicGap()

2020-11-19 Thread Simone Cavallarin
Many thanks for the Help!!

Simone


From: Aljoscha Krettek 
Sent: 19 November 2020 11:46
To: user@flink.apache.org 
Subject: Re: How to use EventTimeSessionWindows.withDynamicGap()

On 17.11.20 17:37, Simone Cavallarin wrote:
> Hi,
>
> I have been working on the suggestion that you gave me, thanks! The first
> part is to add the gap to the message: 1) I receive the event, 2) I take
> that event and map it using StatefulSessionCalculator, which is where I put
> together the message and a long that is my gap in milliseconds.
>
> DataStream<MyMessageType> source =
>
> Operation in front of the window that keeps track of session gaps:
>
> DataStream<Tuple2<MyMessageType, Long>> enriched = source
>     .keyBy()
>     .map(new StatefulSessionCalculator()); // or process()
>
> This is my StatefulSessionCalculator():
>
> Tuple2<MyMessageType, Long> map(MyMessageType input) {
>     ValueState<MyState> valueState = getState(myModelStateDescriptor);
>     MyState state = valueState.value();
>     state.update(input);
>     long suggestedGap = state.getSuggestedGap();
>     valueState.update(state);
>     return Tuple2.of(input, suggestedGap);
> }
>
> If the calculated gap is 1234, the result would be
> <[Tom, 1.70, 50, 1605612588995], [1234]>?

That looks correct, yes.

> The second step is to use the gap calculated through  
> DynamicWindowGapExtractor().
>
> DataStream<...> result = enriched
> .keyBy(new MyKeySelector())
> .window(EventTimeSessionWindows.withDynamicGap(new 
> DynamicWindowGapExtractor()))
>
>
> The DynamicWindowGapExtractor() extract the gap from the message and feed it 
> back to Flink.
> Could you please give me an example also for this one?

This would just be a class that implements
SessionWindowTimeGapExtractor<Tuple2<MyMessageType, Long>> and returns the gap
from the extract() method.

> One thing that I don't understand is that after enriching the message my 
> event that contain a POJO is nested inside tuple. How can I access it?

You would just read the first field of the tuple, i.e. tuple.f0.
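
To make the two answers above a bit more concrete, here is a minimal sketch
of such an extractor. Note the assumptions: Tuple2 and
SessionWindowTimeGapExtractor below are small local stand-ins that mirror the
shape of the Flink types of the same name, so that the example compiles
without Flink on the classpath; in a real job you would import the Flink
types instead (as far as I know the real interface is also Serializable).
MyMessageType with a single name field is hypothetical.

```java
// Local stand-in for org.apache.flink.api.java.tuple.Tuple2 (assumption:
// only the public fields f0/f1 and the static factory are mirrored here).
final class Tuple2<A, B> {
    final A f0;
    final B f1;

    Tuple2(A f0, B f1) {
        this.f0 = f0;
        this.f1 = f1;
    }

    static <A, B> Tuple2<A, B> of(A f0, B f1) {
        return new Tuple2<>(f0, f1);
    }
}

// Local stand-in mirroring Flink's SessionWindowTimeGapExtractor<T>.
interface SessionWindowTimeGapExtractor<T> {
    long extract(T element);
}

// Hypothetical event type; the real POJO has more fields.
final class MyMessageType {
    final String name;

    MyMessageType(String name) {
        this.name = name;
    }
}

// The gap was attached as the second tuple field by the upstream operator,
// so the extractor only has to hand it back.
final class DynamicWindowGapExtractor
        implements SessionWindowTimeGapExtractor<Tuple2<MyMessageType, Long>> {
    @Override
    public long extract(Tuple2<MyMessageType, Long> element) {
        return element.f1; // gap in milliseconds
    }

    public static void main(String[] args) {
        Tuple2<MyMessageType, Long> enriched =
            Tuple2.of(new MyMessageType("Tom"), 1234L);
        // The original event is the first field of the tuple.
        System.out.println(enriched.f0.name);
        System.out.println(new DynamicWindowGapExtractor().extract(enriched));
    }
}
```

Downstream of the window, the original POJO is reached the same way, through
the f0 field of each tuple.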


> The last point when you said: "I think, however, that it might be easier at 
> this point to just use a stateful ProcessFunction", you meant a completely 
> different approach, would be better?

That's what I meant, yes. It seems too complicated to split the
logic into one part that determines the dynamic gap and another
part that does the computation per session. It seems easier to just roll
that into one operator that does everything. With state and timers
you should have enough flexibility.

Best,
Aljoscha



Re: Filter Null in Array in SQL Connector

2020-11-19 Thread Jark Wu
Hi Dylan,

I think Rex encountered another issue, because he is using Kafka with
Debezium format.

Hi Rex,

If you can share the json data and the exception stack, that would be
helpful!

Besides, you can try to enable 'debezium-json.ignore-parse-errors' option
[1] to skip the dirty data.

Best,
Jark

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html#debezium-json-ignore-parse-errors

On Thu, 19 Nov 2020 at 21:13, Dylan Forciea  wrote:

> Do you mean that the array contains values that are null, or that the
> entire array itself is null? If it’s the latter, I have an issue written,
> along with a PR to fix it that has been pending review [1].
>
> Regards,
> Dylan Forciea
>
> [1] https://issues.apache.org/jira/browse/FLINK-19771
>
> From: Danny Chan
> Date: Thursday, November 19, 2020 at 2:24 AM
> To: Rex Fenley
> Cc: Flink ML
> Subject: Re: Filter Null in Array in SQL Connector
>
> Hi, Fenley ~
>
> You are right, parsing nulls in an ARRAY field is not supported yet. I have
> logged an issue [1] and will fix it soon ~
>
> [1] https://issues.apache.org/jira/browse/FLINK-20234
>
> Rex Fenley wrote on Thu, Nov 19, 2020 at 2:51 PM:
>
> Hi,
>
> I recently discovered that some of our data has NULL values arriving in an
> ARRAY column. This column is being consumed by Flink via the Kafka
> connector with the Debezium format. We seem to be getting
> NullPointerExceptions when these NULL values in the arrays arrive, which
> restarts the source operator in a loop.
>
> Is there any way to not throw, or possibly to filter out NULLs in an array
> of Strings in Flink?
>
> We're somewhat stuck on how to solve this problem; we'd like to be
> defensive about this on Flink's side.
>
> Thanks!
>
> (P.S. The exception was not that informative, there may be room for
> improvement in terms of a richer error message when this happens.)


flink sql cdc sum result contains NULL

2020-11-19 Thread kandy.wang
-- MySQL table
CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`(
   `id` INT UNSIGNED AUTO_INCREMENT,
   `spu_id` BIGINT NOT NULL,
   `leaving_price`  DECIMAL(10, 5),
   PRIMARY KEY ( `id` ),
   unique key idx_spu_id (spu_id)
)ENGINE=InnoDB DEFAULT CHARSET=utf8


-- Flink table
CREATE TABLE hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg (
   `spu_id` BIGINT ,
   `leaving_price`  DECIMAL(10, 5),
PRIMARY KEY ( `spu_id`) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
   'url' = 'jdbc:mysql://...',
   'table-name' = 'mysql_realtime_leaving_price_spu_index_agg',
   'username' = '...',
   'password' = '..'
);


-- binlog to MySQL

insert into hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg

SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price

FROM hive.database.table

group by v_spu_id;


hive.database.table points to our company's own Kafka, which implements
the four kinds of message formats, similar to Canal CDC.


The problem is that leaving_price in the MySQL result table is NULL in
many cases. I looked at the data, and neither the leaving_num nor the price
field can be NULL, so it is unclear why the result table can contain NULL.
Is there a good way to troubleshoot this?







Re: Filter Null in Array in SQL Connector

2020-11-19 Thread Dylan Forciea
Do you mean that the array contains values that are null, or that the entire 
array itself is null? If it’s the latter, I have an issue written, along with a 
PR to fix it that has been pending review [1].

Regards,
Dylan Forciea

[1] https://issues.apache.org/jira/browse/FLINK-19771

From: Danny Chan 
Date: Thursday, November 19, 2020 at 2:24 AM
To: Rex Fenley 
Cc: Flink ML 
Subject: Re: Filter Null in Array in SQL Connector

Hi, Fenley ~

You are right, parsing nulls in an ARRAY field is not supported yet. I have
logged an issue [1] and will fix it soon ~

[1] https://issues.apache.org/jira/browse/FLINK-20234

Rex Fenley <r...@remind101.com> wrote on Thu, Nov 19, 2020 at 2:51 PM:
Hi,

I recently discovered that some of our data has NULL values arriving in an
ARRAY column. This column is being consumed by Flink via the Kafka
connector with the Debezium format. We seem to be getting
NullPointerExceptions when these NULL values in the arrays arrive, which
restarts the source operator in a loop.

Is there any way to not throw, or possibly to filter out NULLs in an array of
Strings in Flink?

We're somewhat stuck on how to solve this problem; we'd like to be defensive
about this on Flink's side.

Thanks!

(P.S. The exception was not that informative, there may be room for improvement 
in terms of a richer error message when this happens.)



Re: Caching Mechanism in Flink

2020-11-19 Thread Andrey Zagrebin
Hi Iacovos,

As Matthias mentioned, the tasks' off-heap memory has nothing to do with the
memory segments. This memory component is reserved only for the user code.

The memory segments are managed by Flink and used for batch workloads, such
as in-memory joins.
They are part of managed memory (taskmanager.memory.managed.size),
which is also off-heap but is neither the tasks' off-heap
(taskmanager.memory.task.off-heap.size) nor JVM direct memory.

The memory segments are also used to wrap network buffers. Those are JVM
direct memory (which is also off-heap), but again this is not the tasks'
off-heap.

Maybe the confusion comes from the fact that 'off-heap' generally refers
to everything that is not JVM heap: direct or native memory.
The tasks' off-heap is the part of the general 'off-heap' (of the direct
memory limit, to be precise) that is reserved only for the user code and is
not intended to be used by Flink.

Best,
Andrey

On Wed, Nov 11, 2020 at 3:06 PM Jack Kolokasis 
wrote:

> Hi Matthias,
>
> Yes, I am referring to the tasks' off-heap configuration value.
>
> Best,
> Iacovos
> On 11/11/20 1:37 μ.μ., Matthias Pohl wrote:
>
> When talking about the "off-heap" in your most recent message, are you
> still referring to the task's off-heap configuration value?
>
> AFAIK, the HybridMemorySegment shouldn't be directly related to the
> off-heap parameter.
>
> The HybridMemorySegment can be used as a wrapper around any kind of
> memory, i.e. byte[]. It can be either used for heap memory but also
> DirectByteBuffers (located in JVM's direct memory pool which is not part of
> the JVM's heap) or memory allocated through Unsafe's allocation methods
> (so-called native memory which is also not part of the JVM's heap).
> The HybridMemorySegments are utilized within the MemoryManager class. The
> MemoryManager instances are responsible for maintaining the managed memory
> used in each of the TaskSlots. Managed Memory is used in different settings
> (e.g. for the RocksDB state backend in streaming applications). It can be
> configured using taskmanager.memory.managed.size (or the corresponding
> *.fraction parameter) [1]. See more details on that in [2].
>
> I'm going to pull in Andrey as he has worked on that topic recently.
>
> Best,
> Matthias
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#taskmanager-memory-managed-size
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/memory/mem_setup_tm.html#managed-memory
>
> On Wed, Nov 11, 2020 at 12:00 PM Jack Kolokasis 
> wrote:
>
>> Hi Matthias,
>>
>> Thank you for your reply and the useful information. I found that off-heap
>> memory is used when Flink uses HybridMemorySegments. How does Flink know
>> when to use these HybridMemorySegments, and in which operations does this
>> happen?
>>
>> Best,
>> Iacovos
>> On 11/11/20 11:41 π.μ., Matthias Pohl wrote:
>>
>> Hi Iacovos,
>> The task's off-heap configuration value is used when spinning up
>> TaskManager containers in a clustered environment. It will contribute to
>> the overall memory reserved for a TaskManager container during deployment.
>> This parameter can be used to influence the amount of memory allocated if
>> the user code relies on DirectByteBuffers and/or native memory allocation.
>> There is no active memory pool management beyond that from Flink's side.
>> The configuration parameter is ignored if you run a Flink cluster locally.
>>
>> Besides this, Flink itself also uses the JVM's direct memory through
>> DirectByteBuffers (for network buffers) and native memory (through Flink's
>> internally used managed memory).
>>
>> You can find a more detailed description of Flink's memory model in [1].
>> I hope that helps.
>>
>> Best,
>> Matthias
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/memory/mem_setup_tm.html#detailed-memory-model
>>
>> On Tue, Nov 10, 2020 at 3:57 AM Jack Kolokasis 
>> wrote:
>>
>>> Thank you Xuannan for the reply.
>>>
>>> I also want to ask how Flink uses off-heap memory. If I set
>>> taskmanager.memory.task.off-heap.size, which data does Flink allocate
>>> off-heap? Is this handled by the programmer?
>>>
>>> Best,
>>> Iacovos
>>> On 10/11/20 4:42 π.μ., Xuannan Su wrote:
>>>
>>> Hi Jack,
>>>
>>> At the moment, Flink doesn't support caching the intermediate result.
>>> However, there is some ongoing effort to support caching in Flink.
>>> FLIP-36 [1] proposes adding a caching mechanism to the Table API, and it
>>> is planned for 1.13.
>>>
>>> Best,
>>> Xuannan
>>>
>>> On Nov 10, 2020, 4:29 AM +0800, Jack Kolokasis ,
>>> wrote:
>>>
>>> Hello all,
>>>
>>> I am new to Flink and I want to ask if the Flink supports a caching
>>> mechanism to store intermediate results in memory for machine learning
>>> workloads.
>>>
>>> If yes, how can I enable it and how can I use it?
>>>
>>> Thank you,
>>> Iacovos
>>>
>>>


Re: Logs of JobExecutionListener

2020-11-19 Thread Andrey Zagrebin
I also tried 1.11.0 and 1.11.2, both work for me.

On Thu, Nov 19, 2020 at 3:39 PM Aljoscha Krettek 
wrote:

> Hmm, there was this issue:
> https://issues.apache.org/jira/browse/FLINK-17744 But it should be fixed
> in your version.
>
> On 19.11.20 12:58, Flavio Pompermaier wrote:
> > Which version are you using?
> > I used the exact same commands on Flink 1.11.0 and I didn't get the job
> > listener output..
> >
> > Il gio 19 nov 2020, 12:53 Andrey Zagrebin  ha
> scritto:
> >
> >> Hi Flavio and Aljoscha,
> >>
> >> Sorry for the late heads up. I could not actually reproduce the reported
> >> problem with 'flink run' and local standalone cluster on master.
> >> I get the expected output with the suggested modification of WordCount
> >> program:
> >>
> >> $ bin/start-cluster.sh
> >>
> >> $ rm -rf out; bin/flink run
> >> flink/flink-examples/flink-examples-batch/target/WordCount.jar --output
> >> flink/build-target/out
> >>
> >> Executing WordCount example with default input data set.
> >> Use --input to specify file input.
> >>  SUBMITTED
> >> Job has been submitted with JobID c454a894d0524ccb69943b95838eea07
> >> Program execution finished
> >> Job with JobID c454a894d0524ccb69943b95838eea07 has finished.
> >> Job Runtime: 139 ms
> >>
> >>  EXECUTED
> >>
> >> Best,
> >> Andrey
> >>
> >> On Thu, Nov 19, 2020 at 2:40 PM Aljoscha Krettek 
> >> wrote:
> >>
> >>> JobListener.onJobExecuted() is only invoked in
> >>> ExecutionEnvironment.execute() and ContextEnvironment.execute(). If
> >>> none of these is still in the call chain with that setup then the
> >>> listener will not be invoked.
> >>>
> >>> Also, this would only happen on the client, not on the broker (in your
> >>> case) or the server (JobManager).
> >>>
> >>> Does that help to debug the problem?
> >>>
> >>> Aljoscha
> >>>
> >>> On 19.11.20 09:49, Flavio Pompermaier wrote:
>  I have a Spring Boot job server that acts as a broker between our
>  application and a Flink session cluster. To submit a job I use the
>  FlinkRestClient (which is also the one used by the CLI client for the
>  run action, if I'm not wrong). However, neither method triggers the
>  job listener.
>
>  On Thu, Nov 19, 2020, 09:39 Aljoscha Krettek wrote:
> 
> > @Flavio, when you're saying you're using the RestClusterClient, you
> > are not actually using that manually, right? You're just submitting
> > your job via "bin/flink run ...", right?
> >
> > What's the exact invocation of "bin/flink run" that you're using?
> >
> > On 19.11.20 09:29, Andrey Zagrebin wrote:
> >> Hi Flavio,
> >>
> >> I think I can reproduce what you are reporting (assuming you also
> >> pass '--output' to 'flink run').
> >> I am not sure why it behaves like this. I would suggest filing a
> >> Jira ticket for this.
> >>
> >> Best,
> >> Andrey
> >>
> >> On Wed, Nov 18, 2020 at 9:45 AM Flavio Pompermaier <pomperma...@okkam.it>
> >> wrote:
> >>
> >>> is this a bug or is it a documentation problem...?
> >>>
> >>> On Sat, Nov 14, 2020, 18:44 Flavio Pompermaier wrote:
> >>>
>  I've also verified that the problem persists when using a modified
>  version of the WordCount class.
>  If you add the code pasted at the end of this email at the end of its
>  main method, you can verify that the listener is called if you run the
>  program from the IDE, but it's not called if you submit the job using
>  the CLI client with the command
>
>    - bin/flink run
>  /home/okkam/git/flink/flink-examples/flink-examples-batch/target/WordCount.jar
>
>  Maybe this is the expected result, but I didn't find any documentation
>  of this behaviour (neither in the Javadoc nor on the Flink web site,
>  where I can't find any documentation about JobListener at all).
> 
>  [Code to add to main()]
>      // emit result
>      if (params.has("output")) {
>        counts.writeAsCsv(params.get("output"), "\n", " ");
>        // execute program
>        env.registerJobListener(new JobListener() {
>
>          @Override
>          public void onJobSubmitted(JobClient arg0, Throwable arg1) {
>            System.out.println(" SUBMITTED");
>          }
>
>          @Override
>          public void onJobExecuted(JobExecutionResult arg0, Throwable arg1) {
>            System.out.println(" EXECUTED");
>          }
>        });
>        env.execute("WordCount Example");
>      }

Re: Logs of JobExecutionListener

2020-11-19 Thread Aljoscha Krettek
Hmm, there was this issue: 
https://issues.apache.org/jira/browse/FLINK-17744 But it should be fixed 
in your version.


On 19.11.20 12:58, Flavio Pompermaier wrote:

Which version are you using?
I used the exact same commands on Flink 1.11.0 and I didn't get the job
listener output..

On Thu, Nov 19, 2020, 12:53 Andrey Zagrebin wrote:


Hi Flavio and Aljoscha,

Sorry for the late heads up. I could not actually reproduce the reported
problem with 'flink run' and local standalone cluster on master.
I get the expected output with the suggested modification of WordCount
program:

$ bin/start-cluster.sh

$ rm -rf out; bin/flink run
flink/flink-examples/flink-examples-batch/target/WordCount.jar --output
flink/build-target/out

Executing WordCount example with default input data set.
Use --input to specify file input.
 SUBMITTED
Job has been submitted with JobID c454a894d0524ccb69943b95838eea07
Program execution finished
Job with JobID c454a894d0524ccb69943b95838eea07 has finished.
Job Runtime: 139 ms

 EXECUTED

Best,
Andrey

On Thu, Nov 19, 2020 at 2:40 PM Aljoscha Krettek 
wrote:


JobListener.onJobExecuted() is only invoked in
ExecutionEnvironment.execute() and ContextEnvironment.execute(). If none
of these is still in the call chain with that setup then the listener
will not be invoked.

Also, this would only happen on the client, not on the broker (in your
case) or the server (JobManager).

Does that help to debug the problem?

Aljoscha
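
The point above (listeners fire only inside execute(), and only in the client JVM) can be illustrated with a toy model. This is only a sketch: ToyJobListener, ToyCluster and ToyEnvironment are hypothetical stand-ins written for this illustration and are not Flink classes. The assumption being illustrated is that a job graph handed directly to the cluster carries no listeners with it.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy listener, loosely modeled on the two JobListener callbacks (hypothetical). */
interface ToyJobListener {
    void onJobSubmitted(String jobId);
    void onJobExecuted(String jobId);
}

/** Toy cluster: it only ever receives a job graph, never the listeners. */
class ToyCluster {
    private int submitted = 0;

    /** Pretends to run the job graph and returns a generated job ID. */
    String submit(String jobGraph) {
        submitted++;
        return "job-" + submitted;
    }
}

/** Toy environment: registered listeners live only in this client-side object. */
class ToyEnvironment {
    private final List<ToyJobListener> listeners = new ArrayList<>();

    void registerJobListener(ToyJobListener listener) {
        listeners.add(listener);
    }

    /** Builds a job description without submitting it. */
    String buildJobGraph(String jobName) {
        return "graph:" + jobName;
    }

    /** Submits through the environment, so the callbacks fire in the calling JVM. */
    String execute(String jobName, ToyCluster cluster) {
        String jobId = cluster.submit(buildJobGraph(jobName));
        for (ToyJobListener l : listeners) {
            l.onJobSubmitted(jobId);
        }
        for (ToyJobListener l : listeners) {
            l.onJobExecuted(jobId);
        }
        return jobId;
    }
}
```

In this model, calling env.execute(...) notifies the listener twice, while calling cluster.submit(env.buildJobGraph(...)) directly notifies nobody, which mirrors the case where a job graph is extracted from the main method and submitted through a separate client.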

On 19.11.20 09:49, Flavio Pompermaier wrote:

I have a Spring Boot job server that acts as a broker between our
application and a Flink session cluster. To submit a job I use the
FlinkRestClient (which is also the one used by the CLI client for the
run action, if I'm not wrong). However, neither method triggers the
job listener.

On Thu, Nov 19, 2020, 09:39 Aljoscha Krettek wrote:



@Flavio, when you're saying you're using the RestClusterClient, you are
not actually using that manually, right? You're just submitting your
job via "bin/flink run ...", right?

What's the exact invocation of "bin/flink run" that you're using?

On 19.11.20 09:29, Andrey Zagrebin wrote:

Hi Flavio,

I think I can reproduce what you are reporting (assuming you also pass
'--output' to 'flink run').
I am not sure why it behaves like this. I would suggest filing a Jira
ticket for this.

Best,
Andrey

On Wed, Nov 18, 2020 at 9:45 AM Flavio Pompermaier <pomperma...@okkam.it> wrote:


is this a bug or is it a documentation problem...?

On Sat, Nov 14, 2020, 18:44 Flavio Pompermaier wrote:


I've also verified that the problem persists when using a modified
version of the WordCount class.
If you add the code pasted at the end of this email at the end of its
main method, you can verify that the listener is called if you run the
program from the IDE, but it's not called if you submit the job using
the CLI client with the command

  - bin/flink run
  /home/okkam/git/flink/flink-examples/flink-examples-batch/target/WordCount.jar

Maybe this is the expected result, but I didn't find any documentation
of this behaviour (neither in the Javadoc nor on the Flink web site,
where I can't find any documentation about JobListener at all).

[Code to add to main()]
   // emit result
   if (params.has("output")) {
     counts.writeAsCsv(params.get("output"), "\n", " ");
     // execute program
     env.registerJobListener(new JobListener() {

       @Override
       public void onJobSubmitted(JobClient arg0, Throwable arg1) {
         System.out.println(" SUBMITTED");
       }

       @Override
       public void onJobExecuted(JobExecutionResult arg0, Throwable arg1) {
         System.out.println(" EXECUTED");
       }
     });
     env.execute("WordCount Example");
   } else {
     System.out.println("Printing result to stdout. Use --output to specify output path.");
     counts.print();
   }

On Fri, Nov 13, 2020 at 4:25 PM Flavio Pompermaier <pomperma...@okkam.it> wrote:


see inline

On Fri, Nov 13, 2020, 14:31 Matthias Pohl wrote:


Hi Flavio,
thanks for sharing this with the Flink community. Could you answer the
following questions, please:
- What's the code of your Job's main method?



it's actually very simple... the main class creates a batch execution
env using ExecutionEnvironment.getExecutionEnvironment(), I register a
job listener on the env and I do some stuff before calling
env.execute(). The listener is executed correctly, but if I use the
RestClusterClient to submit the jobGraph extracted from that main
contained in a jar, the program is executed as usual but the job
listener is not called.

- What cluster backend and application do you use to execute the job?




I use a standalone session cluster for the moment

- Is there anything suspicious you can find in the logs that might be

Re: Strange behaviour when using RMQSource in Flink 1.11.2

2020-11-19 Thread Thomas Eckestad
Hi Andrey,

Thank you for your response. I created 
https://issues.apache.org/jira/browse/FLINK-20244.

Best Regards,
Thomas

From: Andrey Zagrebin 
Sent: Thursday, November 19, 2020 8:41
To: Thomas Eckestad 
Cc: user@flink.apache.org 
Subject: Re: Strange behaviour when using RMQSource in Flink 1.11.2

Hi Thomas,

I am not an expert on the RMQSource connector, but your concerns look valid.
Could you file a Jira issue in the Flink issue tracker? [1]

I cannot immediately refer you to a committer who could help with this, but
let's hope that the issue gets attention.
If you want to contribute an improvement for this in Flink, you can write your
suggestion there as well, and once there is positive feedback from a committer,
a GitHub PR can be opened.

Best,
Andrey

[1] 
https://issues.apache.org/jira/projects/FLINK


On Wed, Nov 18, 2020 at 3:49 PM Thomas Eckestad <thomas.eckes...@verisure.com> wrote:
Hi,

we are using the RabbitMQ source connector with exactly-once guarantees. For 
this to work, according to the official Flink documentation, we are supplying 
correlation IDs with each published message and we use a parallelism of one 
with the Flink job being the single/only consumer of the queue in question (and 
we have enabled checkpointing).

The following behavior by the RMQSource seems strange to us. When a job is 
restarted from a checkpoint and there are unacked messages on the RabbitMQ 
queue for messages processed in the previous checkpoint interval, those 
messages will stay unacked until the job either finishes or is restarted again. 
When the connection to RabbitMQ is later closed (the job finished or is 
restarted), the unacked messages will be requeued for resend and sent when the 
next connection is established.

When looking at the source code, messages are ACKed by the RMQSource after a
checkpoint is complete
(MessageAcknowledgingSourceBase::notifyCheckpointComplete).

Also, when looking at the source code in RMQSource::setMessageIdentifier() (on
the master branch; the ACK semantics do not seem to have changed since
1.11.2), it is clear that if an RMQ message carries a correlation ID which has
already been handled, that message is skipped and not processed further. It is
also clear that skipped messages are not added to the sessionIds list of
messages that are targeted for ACK to RMQ. I believe all successfully consumed
RMQ messages should be ACKed; it is irrelevant whether the message is ignored or
processed by Flink. RMQ needs to know that the consumer considers the message
handled OK.

The following code is from RMQSource::setMessageIdentifier(). Note the return
before sessionIds.add():

    ...
      if (!addId(correlationId)) {
        // we have already processed this message
        return false;
      }
    }
    sessionIds.add(deliveryTag);
    ...
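
To make the effect of that early return concrete, here is a minimal, self-contained sketch of the bookkeeping. It is a toy model, not the actual RMQSource code: the class AckBookkeeping, its ackDuplicates flag and tagsToAckOnCheckpoint() are hypothetical names introduced for this illustration. With ackDuplicates set to false it mimics the behaviour described above; with true it mimics the suggested behaviour in which skipped duplicates are still queued for ACK.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy model of per-checkpoint ACK bookkeeping (hypothetical, not Flink code). */
class AckBookkeeping {
    private final Set<String> seenCorrelationIds = new HashSet<>();
    private final List<Long> sessionIds = new ArrayList<>();
    private final boolean ackDuplicates;

    AckBookkeeping(boolean ackDuplicates) {
        this.ackDuplicates = ackDuplicates;
    }

    /** Returns true if the message should be processed downstream. */
    boolean setMessageIdentifier(String correlationId, long deliveryTag) {
        boolean firstTime = seenCorrelationIds.add(correlationId);
        if (!firstTime && !ackDuplicates) {
            // described behaviour: the duplicate is skipped and never queued for ACK
            return false;
        }
        // queue the delivery tag so it is ACKed when the checkpoint completes
        sessionIds.add(deliveryTag);
        return firstTime;
    }

    /** Returns the delivery tags to ACK for the completed checkpoint and resets the list. */
    List<Long> tagsToAckOnCheckpoint() {
        List<Long> tags = new ArrayList<>(sessionIds);
        sessionIds.clear();
        return tags;
    }
}
```

Feeding the same correlation ID twice with delivery tags 1 and 2 yields only tag 1 for ACK in the first mode, and tags 1 and 2 in the second, while the duplicate is reported as "do not process" in both.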

Directly related to the above I also notice that RMQ connections are leaked at 
internal job restart. From the Flink log (this stack trace is from 1.11.2):

2020-11-18 10:08:25,118 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask  [] - Error during disposal of stream operator.
com.rabbitmq.client.AlreadyClosedException: connection is already closed due to connection error; protocol method: #method(reply-code=320, reply-text=CONNECTION_FORCED - Closed via management plugin, class-id=0, method-id=0)
    at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:228) ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
    at com.rabbitmq.client.impl.AMQChannel.rpc(AMQChannel.java:303) ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
    at com.rabbitmq.client.impl.ChannelN.basicCancel(ChannelN.java:1294) ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
    at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicCancel(AutorecoveringChannel.java:482) ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
    at org.apache.flink.streaming.connectors.rabbitmq.RMQSource.close(RMQSource.java:192) ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]

AlreadyClosedException is not handled by RMQSource::close(). This results
in an RMQ connection thread being left behind somewhere. I triggered three
restarts of the job in a row and noticed one new connection added to the pile
of connections for each restart. I triggered the restarts by killing the active
connection to RMQ using the RMQ admin GUI (management plugin, see the exception
details above).
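
One possible way to make such cleanup robust is to attempt every cleanup step even when an earlier one throws. The sketch below assumes (this is not confirmed here) that the leak comes from the remaining cleanup being skipped once basicCancel() fails. ToyChannel, ToyConnection and ToySourceCloser are hypothetical stand-ins for this illustration, not RabbitMQ or Flink types.

```java
/** Hypothetical stand-in for a channel with a cancel and a close step. */
interface ToyChannel {
    void basicCancel() throws Exception;
    void close() throws Exception;
}

/** Hypothetical stand-in for the underlying connection. */
interface ToyConnection {
    void close() throws Exception;
}

class ToySourceCloser {
    /**
     * Runs all three cleanup steps, remembers the first failure,
     * attaches later failures as suppressed, and rethrows at the end.
     */
    static void closeAll(ToyChannel channel, ToyConnection connection) throws Exception {
        Exception first = null;
        try {
            channel.basicCancel();
        } catch (Exception e) {
            first = e;
        }
        try {
            channel.close();
        } catch (Exception e) {
            if (first == null) {
                first = e;
            } else {
                first.addSuppressed(e);
            }
        }
        try {
            connection.close();
        } catch (Exception e) {
            if (first == null) {
                first = e;
            } else {
                first.addSuppressed(e);
            }
        }
        if (first != null) {
            throw first;
        }
    }
}
```

The caller still sees the original failure, but the connection's close step is reached regardless of what the channel does.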

I also tried to kill one of the leaked connections. But a new one is instantly 
created when doing so. The traceback when doing this (1.11.2):

2020-11-18 10:27:51,715 ERROR 
com.rabbitmq.client.impl.ForgivingExceptionHandler  

State of Machine Learning with Flink and especially FLIP-39

2020-11-19 Thread Niklas Wilcke
Hi Flink-Community,

I'm digging through the history of FlinkML and FLIP-39 [0]. What I have understood
so far is that FlinkML was removed in 1.9 because it had become unmaintained.
I'm not really able to find out whether FLIP-39, and with it a replacement for
FlinkML, is currently being worked on. The umbrella Jira ticket FLINK-12470 [1] looks
stale to me.
Was there maybe a change of strategy in the meantime? Is the focus currently on
PyFlink to provide ML solutions (FLIP-96 [2])?
It would be really interesting to get some insights into the future and
roadmap of ML in the Flink ecosystem. Thank you very much!

Kind Regards,
Niklas

[0] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-39+Flink+ML+pipeline+and+ML+libs
[1] https://issues.apache.org/jira/browse/FLINK-12470
[2] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-96%3A+Support+Python+ML+Pipeline+API



Re: Logs of JobExecutionListener

2020-11-19 Thread Flavio Pompermaier
Which version are you using?
I used the exact same commands on Flink 1.11.0 and I didn't get the job
listener output..

On Thu, Nov 19, 2020, 12:53 Andrey Zagrebin wrote:

> Hi Flavio and Aljoscha,
>
> Sorry for the late heads up. I could not actually reproduce the reported
> problem with 'flink run' and local standalone cluster on master.
> I get the expected output with the suggested modification of WordCount
> program:
>
> $ bin/start-cluster.sh
>
> $ rm -rf out; bin/flink run
> flink/flink-examples/flink-examples-batch/target/WordCount.jar --output
> flink/build-target/out
>
> Executing WordCount example with default input data set.
> Use --input to specify file input.
>  SUBMITTED
> Job has been submitted with JobID c454a894d0524ccb69943b95838eea07
> Program execution finished
> Job with JobID c454a894d0524ccb69943b95838eea07 has finished.
> Job Runtime: 139 ms
>
>  EXECUTED
>
> Best,
> Andrey
>
> On Thu, Nov 19, 2020 at 2:40 PM Aljoscha Krettek 
> wrote:
>
>> JobListener.onJobExecuted() is only invoked in
>> ExecutionEnvironment.execute() and ContextEnvironment.execute(). If none
>> of these is still in the call chain with that setup then the listener
>> will not be invoked.
>>
>> Also, this would only happen on the client, not on the broker (in your
>> case) or the server (JobManager).
>>
>> Does that help to debug the problem?
>>
>> Aljoscha
>>
>> On 19.11.20 09:49, Flavio Pompermaier wrote:
>> > I have a Spring Boot job server that acts as a broker between our
>> > application and a Flink session cluster. To submit a job I use the
>> > FlinkRestClient (which is also the one used by the CLI client for the
>> > run action, if I'm not wrong). However, neither method triggers the
>> > job listener.
>> >
>> > On Thu, Nov 19, 2020, 09:39 Aljoscha Krettek wrote:
>> >
>> >> @Flavio, when you're saying you're using the RestClusterClient, you are
>> >> not actually using that manually, right? You're just submitting your
>> >> job via "bin/flink run ...", right?
>> >>
>> >> What's the exact invocation of "bin/flink run" that you're using?
>> >>
>> >> On 19.11.20 09:29, Andrey Zagrebin wrote:
>> >>> Hi Flavio,
>> >>>
>> >>> I think I can reproduce what you are reporting (assuming you also pass
>> >>> '--output' to 'flink run').
>> >>> I am not sure why it behaves like this. I would suggest filing a Jira
>> >>> ticket for this.
>> >>>
>> >>> Best,
>> >>> Andrey
>> >>>
>> >>> On Wed, Nov 18, 2020 at 9:45 AM Flavio Pompermaier <pomperma...@okkam.it>
>> >>> wrote:
>> >>>
>>  is this a bug or is it a documentation problem...?
>> 
>>  On Sat, Nov 14, 2020, 18:44 Flavio Pompermaier wrote:
>> 
>> > I've also verified that the problem persists when using a modified
>> > version of the WordCount class.
>> > If you add the code pasted at the end of this email at the end of its
>> > main method, you can verify that the listener is called if you run the
>> > program from the IDE, but it's not called if you submit the job using
>> > the CLI client with the command
>> >
>> >  - bin/flink run
>> >  /home/okkam/git/flink/flink-examples/flink-examples-batch/target/WordCount.jar
>> >
>> > Maybe this is the expected result, but I didn't find any documentation
>> > of this behaviour (neither in the Javadoc nor on the Flink web site,
>> > where I can't find any documentation about JobListener at all).
>> >
>> > [Code to add to main()]
>> >   // emit result
>> >   if (params.has("output")) {
>> >     counts.writeAsCsv(params.get("output"), "\n", " ");
>> >     // execute program
>> >     env.registerJobListener(new JobListener() {
>> >
>> >       @Override
>> >       public void onJobSubmitted(JobClient arg0, Throwable arg1) {
>> >         System.out.println(" SUBMITTED");
>> >       }
>> >
>> >       @Override
>> >       public void onJobExecuted(JobExecutionResult arg0, Throwable arg1) {
>> >         System.out.println(" EXECUTED");
>> >       }
>> >     });
>> >     env.execute("WordCount Example");
>> >   } else {
>> >     System.out.println("Printing result to stdout. Use --output to specify output path.");
>> >     counts.print();
>> >   }
>> >
>> > On Fri, Nov 13, 2020 at 4:25 PM Flavio Pompermaier <pomperma...@okkam.it>
>> > wrote:
>> >
>> >> see inline
>> >>
>> >> On Fri, Nov 13, 2020, 14:31 Matthias Pohl wrote:
>> >>
>> >>> Hi Flavio,
>> >>> thanks for sharing this with the Flink community. Could you answer
>> >>> the following questions, please:
>> >>> - What's the code of your Job's main method?
>> >>>
>> >>
>> 

Re: Logs of JobExecutionListener

2020-11-19 Thread Flavio Pompermaier
Hi Aljoscha, in my main class, within the jar, I create the env and I call
env.execute(). The listener is not called if the job is run by the CLI
client or FlinkRestClient, and I don't see anything on the job manager or task
manager. To me this is a bug, and you can verify it by attaching a listener to
the WordCount example and launching the job using the CLI client. If this
is the expected behaviour, it is not documented anywhere.

On Thu, Nov 19, 2020, 12:40 Aljoscha Krettek wrote:

> JobListener.onJobExecuted() is only invoked in
> ExecutionEnvironment.execute() and ContextEnvironment.execute(). If none
> of these is still in the call chain with that setup then the listener
> will not be invoked.
>
> Also, this would only happen on the client, not on the broker (in your
> case) or the server (JobManager).
>
> Does that help to debug the problem?
>
> Aljoscha
>
> On 19.11.20 09:49, Flavio Pompermaier wrote:
> > I have a Spring Boot job server that acts as a broker between our
> > application and a Flink session cluster. To submit a job I use the
> > FlinkRestClient (which is also the one used by the CLI client for the
> > run action, if I'm not wrong). However, neither method triggers the
> > job listener.
> >
> > On Thu, Nov 19, 2020, 09:39 Aljoscha Krettek wrote:
> >
> >> @Flavio, when you're saying you're using the RestClusterClient, you are
> >> not actually using that manually, right? You're just submitting your job
> >> via "bin/flink run ...", right?
> >>
> >> What's the exact invocation of "bin/flink run" that you're using?
> >>
> >> On 19.11.20 09:29, Andrey Zagrebin wrote:
> >>> Hi Flavio,
> >>>
> >>> I think I can reproduce what you are reporting (assuming you also pass
> >>> '--output' to 'flink run').
> >>> I am not sure why it behaves like this. I would suggest filing a Jira
> >>> ticket for this.
> >>>
> >>> Best,
> >>> Andrey
> >>>
> >>> On Wed, Nov 18, 2020 at 9:45 AM Flavio Pompermaier <pomperma...@okkam.it>
> >>> wrote:
> >>>
>  is this a bug or is it a documentation problem...?
> 
>  On Sat, Nov 14, 2020, 18:44 Flavio Pompermaier wrote:
> 
> > I've also verified that the problem persists when using a modified
> > version of the WordCount class.
> > If you add the code pasted at the end of this email at the end of its
> > main method, you can verify that the listener is called if you run the
> > program from the IDE, but it's not called if you submit the job using
> > the CLI client with the command
> >
> >  - bin/flink run
> >  /home/okkam/git/flink/flink-examples/flink-examples-batch/target/WordCount.jar
> >
> > Maybe this is the expected result, but I didn't find any documentation
> > of this behaviour (neither in the Javadoc nor on the Flink web site,
> > where I can't find any documentation about JobListener at all).
> >
> > [Code to add to main()]
> >   // emit result
> >   if (params.has("output")) {
> >     counts.writeAsCsv(params.get("output"), "\n", " ");
> >     // execute program
> >     env.registerJobListener(new JobListener() {
> >
> >       @Override
> >       public void onJobSubmitted(JobClient arg0, Throwable arg1) {
> >         System.out.println(" SUBMITTED");
> >       }
> >
> >       @Override
> >       public void onJobExecuted(JobExecutionResult arg0, Throwable arg1) {
> >         System.out.println(" EXECUTED");
> >       }
> >     });
> >     env.execute("WordCount Example");
> >   } else {
> >     System.out.println("Printing result to stdout. Use --output to specify output path.");
> >     counts.print();
> >   }
> >
> > On Fri, Nov 13, 2020 at 4:25 PM Flavio Pompermaier <pomperma...@okkam.it>
> > wrote:
> >
> >> see inline
> >>
> >> On Fri, Nov 13, 2020, 14:31 Matthias Pohl wrote:
> >>
> >>> Hi Flavio,
> >>> thanks for sharing this with the Flink community. Could you answer
> >>> the following questions, please:
> >>> - What's the code of your Job's main method?
> >>>
> >>
> >> it's actually very simple... the main class creates a batch execution
> >> env using ExecutionEnvironment.getExecutionEnvironment(), I register a
> >> job listener on the env and I do some stuff before calling
> >> env.execute(). The listener is executed correctly, but if I use the
> >> RestClusterClient to submit the jobGraph extracted from that main
> >> contained in a jar, the program is executed as usual but the job
> >> listener is not called.
> >>
> >> - What cluster backend and application do you use to execute the job?
> >>>
> >>
> >> I use a standalone session cluster for the moment
> >>
> 

Re: Logs of JobExecutionListener

2020-11-19 Thread Andrey Zagrebin
Hi Flavio and Aljoscha,

Sorry for the late heads up. I could not actually reproduce the reported
problem with 'flink run' and local standalone cluster on master.
I get the expected output with the suggested modification of WordCount
program:

$ bin/start-cluster.sh

$ rm -rf out; bin/flink run
flink/flink-examples/flink-examples-batch/target/WordCount.jar --output
flink/build-target/out

Executing WordCount example with default input data set.
Use --input to specify file input.
 SUBMITTED
Job has been submitted with JobID c454a894d0524ccb69943b95838eea07
Program execution finished
Job with JobID c454a894d0524ccb69943b95838eea07 has finished.
Job Runtime: 139 ms

 EXECUTED

Best,
Andrey

On Thu, Nov 19, 2020 at 2:40 PM Aljoscha Krettek 
wrote:

> JobListener.onJobExecuted() is only invoked in
> ExecutionEnvironment.execute() and ContextEnvironment.execute(). If none
> of these is still in the call chain with that setup then the listener
> will not be invoked.
>
> Also, this would only happen on the client, not on the broker (in your
> case) or the server (JobManager).
>
> Does that help to debug the problem?
>
> Aljoscha
>
> On 19.11.20 09:49, Flavio Pompermaier wrote:
> > I have a Spring Boot job server that acts as a broker between our
> > application and a Flink session cluster. To submit a job I use the
> > FlinkRestClient (which is also the one used by the CLI client for the
> > run action, if I'm not wrong). However, neither method triggers the
> > job listener.
> >
> > On Thu, Nov 19, 2020, 09:39 Aljoscha Krettek wrote:
> >
> >> @Flavio, when you're saying you're using the RestClusterClient, you are
> >> not actually using that manually, right? You're just submitting your job
> >> via "bin/flink run ...", right?
> >>
> >> What's the exact invocation of "bin/flink run" that you're using?
> >>
> >> On 19.11.20 09:29, Andrey Zagrebin wrote:
> >>> Hi Flavio,
> >>>
> >>> I think I can reproduce what you are reporting (assuming you also pass
> >>> '--output' to 'flink run').
> >>> I am not sure why it behaves like this. I would suggest filing a Jira
> >>> ticket for this.
> >>>
> >>> Best,
> >>> Andrey
> >>>
> >>> On Wed, Nov 18, 2020 at 9:45 AM Flavio Pompermaier <pomperma...@okkam.it>
> >>> wrote:
> >>>
>  is this a bug or is it a documentation problem...?
> 
>  On Sat, Nov 14, 2020, 18:44 Flavio Pompermaier wrote:
> 
> > I've also verified that the problem persists when using a modified
> > version of the WordCount class.
> > If you add the code pasted at the end of this email at the end of its
> > main method, you can verify that the listener is called if you run the
> > program from the IDE, but it's not called if you submit the job using
> > the CLI client with the command
> >
> >  - bin/flink run
> >  /home/okkam/git/flink/flink-examples/flink-examples-batch/target/WordCount.jar
> >
> > Maybe this is the expected result, but I didn't find any documentation
> > of this behaviour (neither in the Javadoc nor on the Flink web site,
> > where I can't find any documentation about JobListener at all).
> >
> > [Code to add to main()]
> >   // emit result
> >   if (params.has("output")) {
> >     counts.writeAsCsv(params.get("output"), "\n", " ");
> >     // execute program
> >     env.registerJobListener(new JobListener() {
> >
> >       @Override
> >       public void onJobSubmitted(JobClient arg0, Throwable arg1) {
> >         System.out.println(" SUBMITTED");
> >       }
> >
> >       @Override
> >       public void onJobExecuted(JobExecutionResult arg0, Throwable arg1) {
> >         System.out.println(" EXECUTED");
> >       }
> >     });
> >     env.execute("WordCount Example");
> >   } else {
> >     System.out.println("Printing result to stdout. Use --output to specify output path.");
> >     counts.print();
> >   }
> >
> > On Fri, Nov 13, 2020 at 4:25 PM Flavio Pompermaier <pomperma...@okkam.it>
> > wrote:
> >
> >> see inline
> >>
> >> On Fri, Nov 13, 2020, 14:31 Matthias Pohl wrote:
> >>
> >>> Hi Flavio,
> >>> thanks for sharing this with the Flink community. Could you answer
> >>> the following questions, please:
> >>> - What's the code of your Job's main method?
> >>>
> >>
> >> it's actually very simple... the main class creates a batch execution
> >> env using ExecutionEnvironment.getExecutionEnvironment(), I register a
> >> job listener on the env and I do some stuff before calling
> >> env.execute(). The listener is executed correctly, but if I use the
> >> RestClusterClient to submit the

How to achieve co-location constraints in Flink 1.9.1

2020-11-19 Thread Si-li Liu
Hi

Flink only has the slotSharingGroup API on the DataStream class; I can't find any
public API to achieve co-location constraints. Could anyone provide me with an
example?

Another question: if I use a slot sharing group, it is possible that Flink will
schedule two subtasks to the same slot. I think such scheduling will always
succeed, because running two tasks in one slot just decreases the resource usage.
Could anyone provide me with an example of when scheduling with a slot sharing
group fails?

Thanks

-- 
Best regards

Sili Liu


Re: How to use EventTimeSessionWindows.withDynamicGap()

2020-11-19 Thread Aljoscha Krettek

On 17.11.20 17:37, Simone Cavallarin wrote:

Hi,

I have been working on the suggestion that you gave me, thanks! The first part is to add the
gap to the message: 1) I receive the event, 2) I take that event and map it using
StatefulSessionCalculator, which is where I put together "the message" and a
"long" that is my gap in millis.

DataStream<MyMessageType> source = 

Operation in front of the window that keeps track of session gaps

DataStream<Tuple2<MyMessageType, Long>> enriched = source
    .keyBy()
    .map(new StatefulSessionCalculator()); // or process()

This is my StatefulSessionCalculator():

Tuple2<MyMessageType, Long> map(MyMessageType input) {
    ValueState<MyState> valueState = getState(myModelStateDescriptor);
    MyState state = valueState.value();
    state.update(input);
    long suggestedGap = state.getSuggestedGap();
    valueState.update(state);
    return Tuple2.of(input, suggestedGap);
}

If the calculated "gap" is "1234", would the result be:
<[Tom, 1.70, 50, 1605612588995], [1234]>?


That looks correct, yes.


The second step is to use the gap calculated through
DynamicWindowGapExtractor().

DataStream<...> result = enriched
    .keyBy(new MyKeySelector())
    .window(EventTimeSessionWindows.withDynamicGap(new DynamicWindowGapExtractor()))


The DynamicWindowGapExtractor() extracts the gap from the message and feeds it
back to Flink.
Could you please give me an example for this one as well?


This would just be a class that implements
SessionWindowTimeGapExtractor<Tuple2<MyMessageType, Long>> and returns the gap
from the extract() method.
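
A minimal sketch of such an extractor is shown below. To keep it runnable without Flink on the classpath, it uses two stand-ins that are assumptions of this sketch: GapExtractor mirrors the shape of Flink's SessionWindowTimeGapExtractor (a single extract() method returning a long), and Pair mirrors Tuple2 with its public f0 and f1 fields. The name DynamicWindowGapExtractor is taken from the question above.

```java
import java.io.Serializable;

/** Stand-in with the same shape as Flink's SessionWindowTimeGapExtractor<T>. */
interface GapExtractor<T> extends Serializable {
    long extract(T element);
}

/** Stand-in for Flink's Tuple2, exposing public fields f0 and f1. */
class Pair<A, B> {
    public final A f0;
    public final B f1;

    Pair(A f0, B f1) {
        this.f0 = f0;
        this.f1 = f1;
    }
}

/** Reads the session gap (in milliseconds) that the upstream operator stored in f1. */
class DynamicWindowGapExtractor<E> implements GapExtractor<Pair<E, Long>> {
    @Override
    public long extract(Pair<E, Long> element) {
        // f0 holds the original event, f1 holds the suggested gap
        return element.f1;
    }
}
```

With Flink on the classpath, the same class would instead implement SessionWindowTimeGapExtractor<Tuple2<MyMessageType, Long>> and could be passed to EventTimeSessionWindows.withDynamicGap(...).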



One thing that I don't understand is that after enriching the message, my event,
which contains a POJO, is nested inside a tuple. How can I access it?


You would just read the first field of the tuple, i.e. tuple.f0.



On the last point, when you said "I think, however, that it might be easier at this
point to just use a stateful ProcessFunction", did you mean that a completely
different approach would be better?


That's what I meant, yes, because it seems too complicated to split the
logic into the part that determines the dynamic gap and then another
part that does the computation per session. It seems easier to just roll
that into one operator that does everything. And with state and timers
you should have enough flexibility.


Best,
Aljoscha



Re: execution.runtime-mode=BATCH when reading from Hive

2020-11-19 Thread Aljoscha Krettek

Thanks! It's good to see that it is helpful to you.

Best,
Aljoscha

On 18.11.20 18:11, Dongwon Kim wrote:

Hi Aljoscha,

Unfortunately, it's not that easy right now because normal Sinks that
rely on checkpointing to write out data, such as Kafka, don't work in
BATCH execution mode because we don't have checkpoints there. It will
work, however, if you use a sink that doesn't rely on checkpointing.
The FlinkKafkaProducer with Semantic.NONE should work, for example.


As the output produced to Kafka is eventually stored on Cassandra, I might
use a different sink to write output directly to Cassandra for BATCH
execution.
In such a case, I have to replace both (A) source and (E) sink.

There is HiveSource, which is built on the new Source API that will work
well with both BATCH and STREAMING. It's quite new and it was only added
to be used by a Table/SQL connector but you might have some success with
that one.


Oh, this is a new one which will be introduced in the upcoming 1.12
release.
I can't believe I missed it.
I'm going to give it a try :-)

BTW, thanks a lot for the input and the nice presentation - it's very
helpful and insightful.

Best,

Dongwon

On Wed, Nov 18, 2020 at 9:44 PM Aljoscha Krettek 
wrote:


Hi Dongwon,

Unfortunately, it's not that easy right now because normal Sinks that
rely on checkpointing to write out data, such as Kafka, don't work in
BATCH execution mode because we don't have checkpoints there. It will
work, however, if you use a sink that doesn't rely on checkpointing.
The FlinkKafkaProducer with Semantic.NONE should work, for example.

There is HiveSource, which is built on the new Source API that will work
well with both BATCH and STREAMING. It's quite new and it was only added
to be used by a Table/SQL connector but you might have some success with
that one.

Best,
Aljoscha

On 18.11.20 07:03, Dongwon Kim wrote:

Hi,

Recently I've been working on a real-time data stream processing pipeline
with DataStream API while preparing for a new service to launch.
Now it's time to develop a back-fill job to produce the same result by
reading data stored on Hive which we use for long-term storage.

Meanwhile, I watched Aljoscha's talk [1] and just wondered if I could reuse
major components of the pipeline written in DataStream API.
The pipeline conceptually looks as follows:
(A) reads input from Kafka
(B) performs AsyncIO to Redis in order to enrich the input data
(C) appends timestamps and emits watermarks before time-based window
(D) keyBy followed by a session window with a custom trigger for early
firing
(E) writes output to Kafka

I have simple (maybe stupid) questions on reusing components of the
pipeline written in DataStream API.
(1) By replacing (A) with a bounded source, can I execute the pipeline with
a new BATCH execution mode without modifying (B)~(E)?
(2) Is there a bounded source for Hive available for DataStream API?

Best,

Dongwon

[1] https://www.youtube.com/watch?v=z9ye4jzp4DQ


Re: Logs of JobExecutionListener

2020-11-19 Thread Aljoscha Krettek
JobListener.onJobExecuted() is only invoked in 
ExecutionEnvironment.execute() and ContextEnvironment.execute(). If none 
of these is still in the call chain with that setup then the listener 
will not be invoked.


Also, this would only happen on the client, not on the broker (in your 
case) or the server (JobManager).


Does that help to debug the problem?

Aljoscha

On 19.11.20 09:49, Flavio Pompermaier wrote:

I have a Spring Boot job server that acts as a broker between our
application and a Flink session cluster. To submit a job I use the
FlinkRestClient (which, if I'm not wrong, is also the one used by the CLI
client when I use the run action). However, neither method triggers the job
listener.

On Thu, Nov 19, 2020, 09:39 Aljoscha Krettek wrote:


@Flavio, when you're saying you're using the RestClusterClient, you are
not actually using that manually, right? You're just submitting your job
via "bin/flink run ...", right?

What's the exact invocation of "bin/flink run" that you're using?

On 19.11.20 09:29, Andrey Zagrebin wrote:

Hi Flavio,

I think I can reproduce what you are reporting (assuming you also pass
'--output' to 'flink run').
I am not sure why it behaves like this. I would suggest filing a Jira
ticket for this.

Best,
Andrey

On Wed, Nov 18, 2020 at 9:45 AM Flavio Pompermaier wrote:

Is this a bug or is it a documentation problem?

On Sat, Nov 14, 2020, 18:44 Flavio Pompermaier wrote:


I've also verified that the problem persists when using a modified version
of the WordCount class.
If you add the code pasted at the end of this email at the end of its
main method, you can verify that the listener is called if you run the
program from the IDE, but it's not called if you submit the job using the
CLI client with the command

 - bin/flink run /home/okkam/git/flink/flink-examples/flink-examples-batch/target/WordCount.jar

Maybe this is an expected result, but I didn't find any documentation of
this behaviour (neither in the Javadoc nor on the Flink web site, where I
can't find any documentation about JobListener at all).

[Code to add to main()]
    // emit result
    if (params.has("output")) {
        counts.writeAsCsv(params.get("output"), "\n", " ");
        // register the listener, then execute the program
        env.registerJobListener(new JobListener() {

            @Override
            public void onJobSubmitted(JobClient arg0, Throwable arg1) {
                System.out.println(" SUBMITTED");
            }

            @Override
            public void onJobExecuted(JobExecutionResult arg0, Throwable arg1) {
                System.out.println(" EXECUTED");
            }
        });
        env.execute("WordCount Example");
    } else {
        System.out.println("Printing result to stdout. Use --output to specify output path.");
        counts.print();
    }

On Fri, Nov 13, 2020 at 4:25 PM Flavio Pompermaier <pomperma...@okkam.it> wrote:


see inline

On Fri, Nov 13, 2020, 14:31 Matthias Pohl wrote:


Hi Flavio,
thanks for sharing this with the Flink community. Could you answer the
following questions, please:
- What's the code of your Job's main method?


It's actually very simple: the main class creates a batch execution env
using ExecutionEnvironment.getExecutionEnvironment(), I register a job
listener to the env and I do some stuff before calling env.execute().
The listener is executed correctly, but if I use the RestClusterClient to
submit the jobGraph extracted from that main contained in a jar, the
program is executed as usual but the job listener is not called.

- What cluster backend and application do you use to execute the job?


I use a standalone session cluster for the moment.

- Is there anything suspicious you can find in the logs that might be
related?


No, unfortunately.



Best,
Matthias

On Thu, Nov 12, 2020 at 7:48 PM Flavio Pompermaier <
pomperma...@okkam.it> wrote:


Actually what I'm experiencing is that the JobListener is executed
successfully if I run my main class from the IDE, while the job listener is
not fired at all if I submit the JobGraph of the application to a cluster
using the RestClusterClient.
Am I doing something wrong?

My main class ends with env.execute(), and I call
env.registerJobListener() when I create the execution env
via ExecutionEnvironment.getExecutionEnvironment().

Thanks in advance for any help,
Flavio

On Thu, Nov 12, 2020 at 2:13 PM Flavio Pompermaier <
pomperma...@okkam.it> wrote:


Hello everybody,
I'm trying to use the JobListener to track when a job finishes (with
Flink 1.11.0).
It works great, but I have the problem that logs inside
onJobExecuted are not logged anywhere. Is that normal?

Best,
Flavio


Question about keying by multiple fields with the keyBy operator in Flink 1.10 and Flink 1.11

2020-11-19 Thread sherlock zw
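Hi all,
   In Flink 1.10.x the keyBy operator could group by several fields at once, for example map.keyBy(0,1), but in 1.11.x
this form has been deprecated. From a look at the source code, it seems that grouping by multiple fields is no longer supported? Or is there some other way to do it?
   What should I do if I want to group by multiple fields?
   Any pointers would be appreciated!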


Re: Filter Null in Array in SQL Connector

2020-11-19 Thread Danny Chan
Can you also share the problematic JSON string here, so that we can
determine the specific cause of the error?

On Thu, Nov 19, 2020 at 2:51 PM, Rex Fenley wrote:

> Hi,
>
> I recently discovered some of our data has NULL values arriving in an
> ARRAY column. This column is being consumed by Flink via the Kafka
> connector with the Debezium format. We seem to be receiving
> NullPointerExceptions when these NULL values in the arrays arrive, which
> restarts the source operator in a loop.
>
> Is there any way to not throw or to possibly filter out NULLs in an Array
> of Strings in Flink?
>
> We're somewhat stuck on how to solve this problem, we'd like to be
> defensive about this on Flink's side.
>
> Thanks!
>
> (P.S. The exception was not that informative, there may be room for
> improvement in terms of a richer error message when this happens.)
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend


Re: Flink 1.11.1 job restart occasionally fails with org.apache.kafka.common.KafkaException: Failed to construct kafka consumer

2020-11-19 Thread hailongwang
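Hi,
   If this only happens occasionally, it is probably related to class loading.
   If `org.apache.kafka.common.serialization.ByteArrayDeserializer` is loaded by the child
class loader while `org.apache.kafka.common.serialization.Deserializer` is loaded by the parent
class loader, there will be a problem.
Did you bundle the kafka-connector into your jar? If it is already available on the cluster, you could try marking it as provided.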
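
The class-loading mismatch described above can be reproduced outside Flink and Kafka. The following is a minimal sketch that is not taken from this thread: it compiles a throwaway class (the name Payload is hypothetical) into a temporary directory and loads it through two independent URLClassLoaders. The two loaded classes share a name but are different types to the JVM, so an instance created through one loader is not an instance of the class seen by the other, which is the same kind of failure as "ByteArrayDeserializer is not an instance of Deserializer". It assumes a JDK (not just a JRE) so that javax.tools.JavaCompiler is available, and it simplifies the parent/child setup to two sibling loaders.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import javax.tools.JavaCompiler;
import javax.tools.ToolProvider;

class ClassLoaderMismatchDemo {

    /**
     * Loads the same class file through two unrelated class loaders and reports
     * whether an instance from the first loader is an instance of the second
     * loader's class.
     */
    static boolean instanceVisibleAcrossLoaders() {
        try {
            // Compile a trivial class into a temporary directory.
            Path dir = Files.createTempDirectory("classloader-demo");
            Path source = dir.resolve("Payload.java");
            Files.write(source, "public class Payload {}".getBytes(StandardCharsets.UTF_8));

            JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
            if (compiler == null) {
                throw new IllegalStateException("A JDK is required to run this sketch");
            }
            int exitCode = compiler.run(null, null, null, "-d", dir.toString(), source.toString());
            if (exitCode != 0) {
                throw new IllegalStateException("Compilation failed with exit code " + exitCode);
            }

            // Two loaders without a common application parent: each defines its own Payload.
            URL[] classpath = {dir.toUri().toURL()};
            try (URLClassLoader first = new URLClassLoader(classpath, null);
                 URLClassLoader second = new URLClassLoader(classpath, null)) {
                Class<?> fromFirst = first.loadClass("Payload");
                Class<?> fromSecond = second.loadClass("Payload");
                Object instance = fromFirst.getDeclaredConstructor().newInstance();
                return fromSecond.isInstance(instance);
            }
        } catch (Exception e) {
            throw new IllegalStateException("Demo setup failed", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("instance visible across loaders: " + instanceVisibleAcrossLoaders());
    }
}
```

Keeping a single copy of the connector classes on the classpath, for example by not bundling them when the cluster already provides them, avoids ending up with two such unrelated copies.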
Hope this helps.


Best,
Hailong Wang

At 2020-11-19 14:33:25, "m13162790856" wrote:
>The detailed log is as follows:
>
>
>org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
>    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:789)
>    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:643)
>    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:623)
>    at org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.initializeConnections(KafkaPartitionDiscoverer.java:58)
>    at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94)
>    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:550)
>    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
>    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
>    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
>    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
>    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
>    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>    at java.lang.Thread.run(Thread.java:745)
>Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.serialization.ByteArrayDeserializer is not an instance of org.apache.kafka.common.serialization.Deserializer
>    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:263)
>    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:688)
>    ... 15 more
>2020-11-19 15:17:32,0
>
>
>Has anyone run into this?


Re: Logs of JobExecutionListener

2020-11-19 Thread Flavio Pompermaier
I have a Spring Boot job server that acts as a broker between our
application and a Flink session cluster. To submit a job I use the
FlinkRestClient (which, if I'm not wrong, is also the one used by the CLI
client when I use the run action). However, neither method triggers the job
listener.

On Thu, Nov 19, 2020, 09:39 Aljoscha Krettek wrote:

> @Flavio, when you're saying you're using the RestClusterClient, you are
> not actually using that manually, right? You're just submitting your job
> via "bin/flink run ...", right?
>
> What's the exact invocation of "bin/flink run" that you're using?

Re: Logs of JobExecutionListener

2020-11-19 Thread Aljoscha Krettek
@Flavio, when you're saying you're using the RestClusterClient, you are 
not actually using that manually, right? You're just submitting your job 
via "bin/flink run ...", right?


What's the exact invocation of "bin/flink run" that you're using?


Re: Logs of JobExecutionListener

2020-11-19 Thread Andrey Zagrebin
Hi Flavio,

I think I can reproduce what you are reporting (assuming you also pass
'--output' to 'flink run').
I am not sure why it behaves like this. I would suggest filing a Jira
ticket for this.

Best,
Andrey



Re: Problems with FlinkKafkaProducer - closing after timeoutMillis = 9223372036854775807 ms.

2020-11-19 Thread Tzu-Li (Gordon) Tai
Hi,

One thing to clarify first:
I think the "Closing the Kafka producer with timeoutMillis =
9223372036854775807 ms" log doesn't necessarily mean that a producer was
closed due to timeout (Long.MAX_VALUE).
I guess that is just a Kafka log message that is logged when a Kafka
producer is closed without specifying a timeout (i.e., infinite timeout, or
Long.MAX_VALUE in Kafka's case).
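
As a quick sanity check of that reading, the value in the log line is exactly Java's Long.MAX_VALUE. The small sketch below only compares the number copied from the log with that constant; the class and method names are hypothetical and nothing here calls into Kafka.

```java
import java.util.concurrent.TimeUnit;

class KafkaCloseTimeoutCheck {

    /** The timeoutMillis value copied from the log line quoted in this thread. */
    static final long LOGGED_TIMEOUT_MILLIS = 9223372036854775807L;

    /** True if the value is simply "no timeout", i.e. Long.MAX_VALUE. */
    static boolean isEffectivelyInfinite(long timeoutMillis) {
        return timeoutMillis == Long.MAX_VALUE;
    }

    public static void main(String[] args) {
        System.out.println("equals Long.MAX_VALUE: " + isEffectivelyInfinite(LOGGED_TIMEOUT_MILLIS));
        // Converted to days, to show that this is not a timeout anyone configured on purpose.
        System.out.println("in days: " + TimeUnit.MILLISECONDS.toDays(LOGGED_TIMEOUT_MILLIS));
    }
}
```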

With that in mind, when using exactly-once semantics for the
FlinkKafkaProducer, there is a fixed-size pool of short-lived Kafka
producers that are created for each concurrent checkpoint.
When a checkpoint begins, the FlinkKafkaProducer creates a new producer for
that checkpoint. Once that checkpoint completes, Flink attempts to close
and recycle the producer for that checkpoint.
So, it is normal to see logs of Kafka producers being closed if you're
using an exactly-once transactional FlinkKafkaProducer.

Best,
Gordon

On Mon, Nov 16, 2020 at 9:11 PM Tim Josefsson 
wrote:

> To add to this, setting FlinkKafkaProducer.Semantic.AT_LEAST_ONCE instead
> of EXACTLY_ONCE makes the problem go away so I imagine there is something
> wrong with my setup.
> I'm using Kafka 2.2 and I have the following things set on the cluster:
>
> transaction.max.timeout.ms=360
> transaction.state.log.replication.factor=1
> transaction.state.log.min.isr=1
>
>
> On Mon, 16 Nov 2020 at 14:05, Tim Josefsson 
> wrote:
>
>> Hello!
>>
>> I'm having some problems with my KafkaProducer that I've been unable to
>> find a solution to.
>>
>> I've set up a simple Flink Job that reads from one kafka topic, using
>>*kafkaProps.setProperty("isolation.level", "read_committed") *
>> since I want to support exactly once data in my application.
>>
>> After doing some enriching of the data I read from kafka I have the
>> following producer set up
>>
>> FlinkKafkaProducer kafkaSinkProducer = new FlinkKafkaProducer<>(
>>         "enrichedPlayerSessionsTest",
>>         new KafkaStringSerializationSchema("enrichedPlayerSessionsTest"),
>>         producerProps,
>>         FlinkKafkaProducer.Semantic.EXACTLY_ONCE
>> );
>>
>> The producer above is then added as a sink at the end of my Flink job.
>>
>> Now when I run this application I get the following message,
>>
>> 13:44:40,758 INFO  
>> org.apache.kafka.clients.producer.internals.TransactionManager  - [Producer 
>> clientId=producer-6, transactionalId=Source: playerSession and 
>> playserSessionStarted from Kafka -> Filter out playerSessionStarted -> 
>> Extract PlayerSession -> Set MDC for event -> Map -> (Wrap 
>> playerSessionEnriched into EntityEventBatch, Sink: Post sessionEnriched to 
>> Kafka)-ac09b6abdcafcb1e86b87abd220f2a9d-2] ProducerId set to 21280 with 
>> epoch 4
>> 13:44:40,759 INFO  org.apache.kafka.clients.producer.KafkaProducer   
>> - [Producer clientId=producer-6, transactionalId=Source: 
>> playerSession and playserSessionStarted from Kafka -> Filter out 
>> playerSessionStarted -> Extract PlayerSession -> Set MDC for event -> Map -> 
>> (Wrap playerSessionEnriched into EntityEventBatch, Sink: Post 
>> sessionEnriched to Kafka)-ac09b6abdcafcb1e86b87abd220f2a9d-2] Closing the 
>> Kafka producer with timeoutMillis = 9223372036854775807 ms.
>>
>> Sometimes I also see the following:
>>
>> 13:44:43,740 INFO  
>> org.apache.kafka.clients.producer.internals.TransactionManager  - [Producer 
>> clientId=producer-26, transactionalId=Source: playerSession and 
>> playserSessionStarted from Kafka -> Filter out playerSessionStarted -> 
>> Extract PlayerSession -> Set MDC for event -> Map -> (Wrap 
>> playerSessionEnriched into EntityEventBatch, Sink: Post sessionEnriched to 
>> Kafka)-ac09b6abdcafcb1e86b87abd220f2a9d-0] ProducerId set to -1 with epoch -1
>> 13:44:44,136 INFO  
>> org.apache.kafka.clients.producer.internals.TransactionManager  - [Producer 
>> clientId=producer-26, transactionalId=Source: playerSession and 
>> playserSessionStarted from Kafka -> Filter out playerSessionStarted -> 
>> Extract PlayerSession -> Set MDC for event -> Map -> (Wrap 
>> playerSessionEnriched into EntityEventBatch, Sink: Post sessionEnriched to 
>> Kafka)-ac09b6abdcafcb1e86b87abd220f2a9d-0] ProducerId set to 21297 with 
>> epoch 11
>> 13:44:44,147 INFO  
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - 
>> Consumer subtask 0 has no restore state.
>>
>> Now, since this isn't an error, the job doesn't crash while running and data
>> does get written to Kafka even with this message. However, it does seem wrong
>> to me and I'm wondering if anyone has any insight into why this is happening.
>>
>> I'm attaching a GIST with the complete log from the application. I ran the
>> job with *env.setParallelism(1)* but I still get around 26 producers created,
>> which still seems odd to me. Running without any parallelism set creates
>> about 300-400 producers (based on the clientIds reported).
>>
>> Thankful for any insight into this!

Re: Filter Null in Array in SQL Connector

2020-11-19 Thread Danny Chan
Hi Fenley,

You are right, parsing NULL elements in an ARRAY field is not supported at
the moment. I have logged an issue [1] and will fix it soon.

[1] https://issues.apache.org/jira/browse/FLINK-20234

On Thu, Nov 19, 2020 at 2:51 PM, Rex Fenley wrote:

> Hi,
>
> I recently discovered some of our data has NULL values arriving in an
> ARRAY column. This column is being consumed by Flink via the Kafka
> connector with the Debezium format. We seem to be receiving
> NullPointerExceptions when these NULL values in the arrays arrive, which
> restarts the source operator in a loop.
>
> Is there any way to not throw or to possibly filter out NULLs in an Array
> of Strings in Flink?
>
> We're somewhat stuck on how to solve this problem, we'd like to be
> defensive about this on Flink's side.
>
> Thanks!
>
> (P.S. The exception was not that informative, there may be room for
> improvement in terms of a richer error message when this happens.)
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend