Re: How to make two SQLs use the same KafkaTableSource?

2019-08-08 Thread Zhenghua Gao
The Blink planner supports lazy translation of multiple SQL statements, and the common
nodes will be reused within a single job.
The only thing to note here is that the unified TableEnvironmentImpl does not
support conversions between Table(s) and Stream(s).
You must use the pure SQL API (DDL/DML via sqlUpdate, DQL via sqlQuery).
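
For readers who want a concrete picture, here is a minimal sketch of that pure-SQL usage
(the table names and the DDL are made-up placeholders, not taken from this thread, and the
connector properties are omitted):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class TwoQueriesOneKafkaTable {

    public static void main(String[] args) throws Exception {
        // unified TableEnvironment with the Blink planner (Flink 1.9)
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        // hypothetical DDL; connector properties omitted for brevity
        tEnv.sqlUpdate("CREATE TABLE kafka_src (id BIGINT, amount DOUBLE) WITH (...)");
        tEnv.sqlUpdate("CREATE TABLE sink_a (id BIGINT) WITH (...)");
        tEnv.sqlUpdate("CREATE TABLE sink_b (total DOUBLE) WITH (...)");

        // two statements over the same source table; common nodes can be reused
        tEnv.sqlUpdate("INSERT INTO sink_a SELECT id FROM kafka_src");
        tEnv.sqlUpdate("INSERT INTO sink_b SELECT SUM(amount) FROM kafka_src");

        // a query (DQL) would go through sqlQuery instead
        Table preview = tEnv.sqlQuery("SELECT id, amount FROM kafka_src");

        // translation happens when the job is submitted
        tEnv.execute("two-sqls-one-kafka-source");
    }
}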

*Best Regards,*
*Zhenghua Gao*


On Fri, Aug 9, 2019 at 12:38 PM Tony Wei  wrote:

> forgot to send to user mailing list.
>
> Tony Wei  於 2019年8月9日 週五 下午12:36寫道:
>
>> Hi Zhenghua,
>>
>> I didn't get your point. It seems that `isEagerOperationTranslation` always
>> returns false. Does that mean that even if I use the Blink planner, the SQL
>> translation is still done in a lazy manner?
>>
>> Or do you mean the Blink planner will recognize and link two SQL statements
>> to the same Kafka source if they both use the same Kafka table, even if the
>> translation is lazy?
>>
>> I'm not familiar with the details of the translation process, but I guess
>> translating eagerly is not the only solution. If the translation of the
>> second SQL statement can reuse the operators from the first one, then it is
>> possible to link them to the same Kafka source operator.
>>
>> Best,
>> Tony Wei
>>
>> Zhenghua Gao  於 2019年8月9日 週五 上午11:57寫道:
>>
>>> This needs EagerOperationTranslation [1] support. You can try the Blink
>>> planner in 1.9.0.
>>>
>>> [1]
>>> https://github.com/apache/flink/blob/master/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java#L413
>>>
>>> *Best Regards,*
>>> *Zhenghua Gao*
>>>
>>>
>>> On Fri, Aug 9, 2019 at 10:37 AM Tony Wei  wrote:
>>>
 Hi,

 I used `flinkTableEnv.connect(new Kafka()...).registerTableSource(...)`
 to register my kafka table.
 However, I found that because SQL is a lazy operation, it will convert
 to DataStream under some
 criteria. For example, `Table#toRetractStream`.

 So, when I used two SQLs in one application job, the same kafka table
 will be constructed twice. It
 is not a problem from flink side, because two operators held their own
 state for offsets. But from
 kafka side, they will have the same group_id.

 I want to make sure that only one kafka source will commit group_id's
 offsets back to kafka. A
 workaround might be registering the same kafka topic twice with
 different name, group_id for
 two SQLs. But I would still like to know if there is any way to make
 two SQLs just read from the
 same KafkaTableSource? Thanks in advance.

 Best,
 Tony Wei

>>>


Re: How to make two SQLs use the same KafkaTableSource?

2019-08-08 Thread Tony Wei
forgot to send to user mailing list.

Tony Wei  於 2019年8月9日 週五 下午12:36寫道:

> Hi Zhenghua,
>
> I didn't get your point. It seems that `isEagerOperationTranslation` always
> returns false. Does that mean that even if I use the Blink planner, the SQL
> translation is still done in a lazy manner?
>
> Or do you mean the Blink planner will recognize and link two SQL statements
> to the same Kafka source if they both use the same Kafka table, even if the
> translation is lazy?
>
> I'm not familiar with the details of the translation process, but I guess
> translating eagerly is not the only solution. If the translation of the
> second SQL statement can reuse the operators from the first one, then it is
> possible to link them to the same Kafka source operator.
>
> Best,
> Tony Wei
>
> Zhenghua Gao  於 2019年8月9日 週五 上午11:57寫道:
>
>> This needs EagerOperationTranslation [1] support. You can try the Blink
>> planner in 1.9.0.
>>
>> [1]
>> https://github.com/apache/flink/blob/master/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java#L413
>>
>> *Best Regards,*
>> *Zhenghua Gao*
>>
>>
>> On Fri, Aug 9, 2019 at 10:37 AM Tony Wei  wrote:
>>
>>> Hi,
>>>
>>> I used `flinkTableEnv.connect(new Kafka()...).registerTableSource(...)`
>>> to register my kafka table.
>>> However, I found that because SQL is a lazy operation, it will convert
>>> to DataStream under some
>>> criteria. For example, `Table#toRetractStream`.
>>>
>>> So, when I used two SQLs in one application job, the same kafka table
>>> will be constructed twice. It
>>> is not a problem from flink side, because two operators held their own
>>> state for offsets. But from
>>> kafka side, they will have the same group_id.
>>>
>>> I want to make sure that only one kafka source will commit group_id's
>>> offsets back to kafka. A
>>> workaround might be registering the same kafka topic twice with
>>> different name, group_id for
>>> two SQLs. But I would still like to know if there is any way to make two
>>> SQLs just read from the
>>> same KafkaTableSource? Thanks in advance.
>>>
>>> Best,
>>> Tony Wei
>>>
>>


Re: 如何获取Flink table api/sql code gen 代码

2019-08-08 Thread Zhenghua Gao
Currently Flink does not provide a direct way to get the generated code, but
there are indirect ways to try.

1) Debug in the IDE
Flink uses Janino to compile all generated code, and there is a single entry
point: [1] for the Blink planner and [2] for the old planner. You can set a
breakpoint there and inspect the code.

2) Enable debug logging
The Blink planner logs the generated code in CompileUtils, and the old planner
logs it in the subclasses of Compiler.

3) Use the Janino options
Janino caches the generated code in a tmp directory, and you can enable these
options [3].
Note: org.codehaus.janino.source_debugging.keep is not supported in the current
Janino version, which means this method can only be used for debugging in the
IDE (a breakpoint is needed to keep the source code).

[1]
https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/CompileUtils.java
[2]
https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/codegen/Compiler.scala
[3]
https://github.com/janino-compiler/janino/blob/master/janino/src/main/java/org/codehaus/janino/Scanner.java#L71
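
As a small illustration of option 3, the Janino system properties referenced in [3] can be
set at the start of the program; the dump directory below is just an assumed example path:

public class JaninoSourceDumpExample {

    public static void main(String[] args) {
        // ask Janino to keep the generated sources so a debugger can step into them
        System.setProperty("org.codehaus.janino.source_debugging.enable", "true");
        // hypothetical directory where the generated .java files will be written
        System.setProperty("org.codehaus.janino.source_debugging.dir", "/tmp/flink-codegen");

        // ... then build and execute the Table API / SQL program as usual
    }
}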

*Best Regards,*
*Zhenghua Gao*


On Wed, Aug 7, 2019 at 12:02 AM Vincent Cai  wrote:

> Hi all,
> In Spark, the code generated by Catalyst can be obtained by calling
> queryExecution.debug.codegen() on a Dataset.
> Is there a similar way in Flink to get the generated code?
>
> Reference link:
> https://medium.com/virtuslab/spark-sql-under-the-hood-part-i-26077f85ebf0
>
>
> Regards
> Vincent  Cai


Re: Re: CsvTableSink 目录没有写入具体的数据

2019-08-08 Thread wangl...@geekplus.com.cn

Sorry, that was my mistake.
The data is actually being written. I was testing on Windows: refreshing only ever showed a file size of 0, and the displayed size only changed once I opened the file in an editor.



wangl...@geekplus.com.cn
 
Sender: Alec Chen
Send Time: 2019-08-09 10:17
Receiver: user-zh
Subject: Re: Re: CsvTableSink 目录没有写入具体的数据
There is no data because the execution is never triggered. See the sample code from the doc (
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/common.html
)
 
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// create a TableSink
TableSink sink = new CsvTableSink("/path/to/file", fieldDelim = "|");

// register the TableSink with a specific schema
String[] fieldNames = {"a", "b", "c"};
TypeInformation[] fieldTypes = {Types.INT, Types.STRING, Types.LONG};
tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, sink);

// compute a result Table using Table API operators and/or SQL queries
Table result = ...

// emit the result Table to the registered TableSink
result.insertInto("CsvSinkTable");

// execute the program
 
Add tableEnv.execute();
 
 
wangl...@geekplus.com.cn  于2019年8月9日周五 上午9:42写道:
 
>
> I used a RocketMQ stream as the input.
>
>
>  DataStream> ds = env.addSource(new
> RocketMQSource(
> 
> System.out.println(res);
> return res;
> }
> });
>
>
> tableEnv.registerDataStream("t_pick_task", ds, "pick_task_id,
> pick_list_no, sku_code");
>
> TableSink csvSink = new CsvTableSink("D:\\data\\flink",",");
> String[] fieldNames = {"num"};
> TypeInformation[] fieldTypes = {Types.INT};
> tableEnv.registerTableSink("RubberOrders", fieldNames, fieldTypes,
> csvSink);
> tableEnv.sqlUpdate(
> "INSERT INTO RubberOrders SELECT pick_task_id FROM
> t_pick_task");
>
>
>
> wangl...@geekplus.com.cn
>
> Sender: Alec Chen
> Send Time: 2019-08-08 21:01
> Receiver: user-zh
> Subject: Re: CsvTableSink 目录没有写入具体的数据
> Please post the complete code.
>
> wangl...@geekplus.com.cn  于2019年8月8日周四 下午7:37写道:
>
> >
> > I wrote my code following the example at
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sql.html#specifying-a-query
> > on the official website,
> > but after running it the directory specified by CsvTableSink only contains empty files with no actual content. Why is that?
> >
> >
> >
> > wangl...@geekplus.com.cn
> >
>


Re: Flink sql join问题

2019-08-08 Thread Zhenghua Gao
You can try the firstRow/lastRow optimization [1] of the latest Flink 1.9 Blink
planner and see whether it meets your needs. The current limitation is that deduplication can only be based on proctime.

* e.g.
* 1. {{{
* SELECT a, b, c FROM (
*   SELECT a, b, c, proctime,
*  ROW_NUMBER() OVER (PARTITION BY a ORDER BY proctime ASC) as row_num
*   FROM MyTable
* ) WHERE row_num <= 1
* }}} will be converted to StreamExecDeduplicate which keeps first row.
* 2. {{{
* SELECT a, b, c FROM (
*   SELECT a, b, c, proctime,
*  ROW_NUMBER() OVER (PARTITION BY a ORDER BY proctime DESC) as row_num
*   FROM MyTable
* ) WHERE row_num <= 1
* }}} will be converted to StreamExecDeduplicate which keeps last row.


[1]
https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecDeduplicateRule.scala
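
For illustration, the first-row variant can be issued through the Table API roughly as follows
(table and column names are assumptions; MyTable needs a proctime attribute):

// tEnv: a TableEnvironment using the Blink planner, with MyTable already registered
Table firstRowPerKey = tEnv.sqlQuery(
    "SELECT a, b, c FROM (" +
    "  SELECT a, b, c," +
    "         ROW_NUMBER() OVER (PARTITION BY a ORDER BY proctime ASC) AS row_num" +
    "  FROM MyTable" +
    ") WHERE row_num <= 1");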

*Best Regards,*
*Zhenghua Gao*


On Tue, Aug 6, 2019 at 2:28 PM huang  wrote:

> Hi all,
>
>
> We are using Flink SQL to join two streams. We would like each stream to keep only
> the latest record per key, so that each join only ever emits the latest record. Is this scenario supported by SQL?
>
>
> thanks


如何讓兩個 SQL 使用相同的 KafkaTableSource

2019-08-08 Thread Tony Wei
Hi

In my flink job I registered a kafka table via `flinkTableEnv.connect(new
Kafka()...).registerTableSource(...)`. Only from the documentation did I learn that SQL
is really converted to a DataStream only under certain conditions, for example
when `Table#toRetractStream` is called.

Because of this, I found that when I use different SQL statements in the same flink job,
they each produce their own kafka source operator. From flink's point of view this may not be
a big problem: the independent operators each manage their own offset state and do not
affect each other. From kafka's point of view, however, both sides use the same
group_id, so there is a conflict on the kafka end when the offsets are committed back to kafka.

I want to make sure that each group_id has only one operator responsible for committing the
offsets. The simplest approach is probably to deliberately register the same kafka topic as two
tables with different names and group_ids, one for each SQL statement. But I would like to know
whether there is a better way that lets the two SQL statements really read from the same kafka
operator, so that two kafka operators doing exactly the same work do not have to exist at the
same time. Thanks in advance for your help.

Best,
Tony Wei


How to make two SQLs use the same KafkaTableSource?

2019-08-08 Thread Tony Wei
Hi,

I used `flinkTableEnv.connect(new Kafka()...).registerTableSource(...)` to
register my kafka table.
However, I found that because SQL translation is lazy, the table is only converted
to a DataStream under certain conditions, for example when `Table#toRetractStream`
is called.

So, when I use two SQL statements in one application job, the same kafka table is
constructed twice. That is not a problem on the flink side, because the two
operators hold their own offset state. But on the kafka side, they will have the
same group_id.

I want to make sure that only one kafka source commits the group_id's offsets back
to kafka. A workaround might be to register the same kafka topic twice with
different names and group_ids, one for each SQL statement. But I would still like
to know whether there is any way to make the two SQL statements read from the
same KafkaTableSource. Thanks in advance.

Best,
Tony Wei


Re: Re: [ANNOUNCE] Hequn becomes a Flink committer

2019-08-08 Thread Yun Gao
Congratulations Hequn!

Best,
Yun


--
From:Congxian Qiu 
Send Time:2019 Aug. 8 (Thu.) 21:34
To:Yu Li 
Cc:Haibo Sun ; dev ; Rong Rong 
; user 
Subject:Re: Re: [ANNOUNCE] Hequn becomes a Flink committer

Congratulations Hequn!

Best,
Congxian


Yu Li  于2019年8月8日周四 下午2:02写道:

> Congratulations Hequn! Well deserved!
>
> Best Regards,
> Yu
>
>
> On Thu, 8 Aug 2019 at 03:53, Haibo Sun  wrote:
>
>> Congratulations!
>>
>> Best,
>> Haibo
>>
>> At 2019-08-08 02:08:21, "Yun Tang"  wrote:
>> >Congratulations Hequn.
>> >
>> >Best
>> >Yun Tang
>> >
>> >From: Rong Rong 
>> >Sent: Thursday, August 8, 2019 0:41
>> >Cc: dev ; user 
>> >Subject: Re: [ANNOUNCE] Hequn becomes a Flink committer
>> >
>> >Congratulations Hequn, well deserved!
>> >
>> >--
>> >Rong
>> >
>> >On Wed, Aug 7, 2019 at 8:30 AM 
>> >mailto:xingc...@gmail.com>> wrote:
>> >
>> >Congratulations, Hequn!
>> >
>> >
>> >
>> >From: Xintong Song mailto:tonysong...@gmail.com>>
>> >Sent: Wednesday, August 07, 2019 10:41 AM
>> >To: d...@flink.apache.org
>> >Cc: user mailto:user@flink.apache.org>>
>> >Subject: Re: [ANNOUNCE] Hequn becomes a Flink committer
>> >
>> >
>> >
>> >Congratulations~!
>> >
>> >
>> >Thank you~
>> >
>> >Xintong Song
>> >
>> >
>> >
>> >
>> >
>> >On Wed, Aug 7, 2019 at 4:00 PM vino yang 
>> >mailto:yanghua1...@gmail.com>> wrote:
>> >
>> >Congratulations!
>> >
>> >highfei2...@126.com 
>> >mailto:highfei2...@126.com>> 于2019年8月7日周三 下午7:09写道:
>> >
>> >> Congrats Hequn!
>> >>
>> >> Best,
>> >> Jeff Yang
>> >>
>> >>
>> >>  Original Message 
>> >> Subject: Re: [ANNOUNCE] Hequn becomes a Flink committer
>> >> From: Piotr Nowojski
>> >> To: JingsongLee
>> >> CC: Biao Liu ,Zhu Zhu ,Zili Chen ,Jeff Zhang ,Paul Lam ,jincheng sun ,dev 
>> >> ,user
>> >>
>> >>
>> >> Congratulations :)
>> >>
>> >> On 7 Aug 2019, at 12:09, JingsongLee 
>> >> mailto:lzljs3620...@aliyun.com>> wrote:
>> >>
>> >> Congrats Hequn!
>> >>
>> >> Best,
>> >> Jingsong Lee
>> >>
>> >> --
>> >> From:Biao Liu mailto:mmyy1...@gmail.com>>
>> >> Send Time:2019年8月7日(星期三) 12:05
>> >> To:Zhu Zhu mailto:reed...@gmail.com>>
>> >> Cc:Zili Chen mailto:wander4...@gmail.com>>; Jeff 
>> >> Zhang mailto:zjf...@gmail.com>>; Paul
>> >> Lam mailto:paullin3...@gmail.com>>; jincheng sun 
>> >> mailto:sunjincheng...@gmail.com>>; dev
>> >> mailto:d...@flink.apache.org>>; user 
>> >> mailto:user@flink.apache.org>>
>> >> Subject:Re: [ANNOUNCE] Hequn becomes a Flink committer
>> >>
>> >> Congrats Hequn!
>> >>
>> >> Thanks,
>> >> Biao /'bɪ.aʊ/
>> >>
>> >>
>> >>
>> >> On Wed, Aug 7, 2019 at 6:00 PM Zhu Zhu 
>> >> mailto:reed...@gmail.com>> wrote:
>> >> Congratulations to Hequn!
>> >>
>> >> Thanks,
>> >> Zhu Zhu
>> >>
>> >> Zili Chen mailto:wander4...@gmail.com>> 
>> >> 于2019年8月7日周三 下午5:16写道:
>> >> Congrats Hequn!
>> >>
>> >> Best,
>> >> tison.
>> >>
>> >>
>> >> Jeff Zhang mailto:zjf...@gmail.com>> 于2019年8月7日周三 
>> >> 下午5:14写道:
>> >> Congrats Hequn!
>> >>
>> >> Paul Lam mailto:paullin3...@gmail.com>> 
>> >> 于2019年8月7日周三 下午5:08写道:
>> >> Congrats Hequn! Well deserved!
>> >>
>> >> Best,
>> >> Paul Lam
>> >>
>> >> 在 2019年8月7日,16:28,jincheng sun 
>> >> mailto:sunjincheng...@gmail.com>> 写道:
>> >>
>> >> Hi everyone,
>> >>
>> >> I'm very happy to announce that Hequn accepted the offer of the Flink PMC
>> >> to become a committer of the Flink project.
>> >>
>> >> Hequn has been contributing to Flink for many years, mainly working on
>> >> SQL/Table API features. He's also frequently helping out on the user
>> >> mailing lists and helping check/vote the release.
>> >>
>> >> Congratulations Hequn!
>> >>
>> >> Best, Jincheng
>> >> (on behalf of the Flink PMC)
>> >>
>> >>
>> >>
>> >> --
>> >> Best Regards
>> >>
>> >> Jeff Zhang
>> >>
>> >>
>> >>
>>
>>


Re: NoClassDefFoundError in failing-restarting job that uses url classloader

2019-08-08 Thread Zhu Zhu
Hi Subramanyam,

Could you share more information, including:
1. the URL pattern
2. the detailed exception and the log around it
3. the cluster the job is running on, e.g. standalone, YARN, K8s
4. whether it's session mode or per-job mode

This information would be helpful to identify the failure cause.

Thanks,
Zhu Zhu











Subramanyam Ramanathan  于2019年8月9日周五
上午1:45写道:

>
>
> Hello,
>
>
>
> I'm currently using flink 1.7.2.
>
>
>
> I'm trying to run a job that's submitted programmatically using the
> ClusterClient API.
>
>public JobSubmissionResult run(PackagedProgram prog, int
> parallelism)
>
>
>
>
>
> The job makes use of some jars which I add to the packaged program through
> the Packaged constructor, along with the Jar file.
>
>public PackagedProgram(File jarFile, List classpaths, String...
> args)
>
> Normally, This works perfectly and the job runs fine.
>
>
>
> However, if there's an error in the job, and the job goes into failing
> state and when it's continously  trying to restart the job for an hour or
> so, I notice a NoClassDefFoundError for some classes in the jars that I
> load using the URL class loader and the job never recovers after that, even
> if the root cause of the issue was fixed (I had a kafka source/sink in my
> job, and kafka was down temporarily, and was brought up after that).
>
> The jar is still available at the path referenced by the url classloader
> and is not tampered with.
>
>
>
> Could anyone please give me some pointers with regard to the reason why
> this could happen/what I could be missing here/how can I debug further ?
>
>
>
> thanks
>
> Subbu
>
>
>
>
>


Re: Re: CsvTableSink 目录没有写入具体的数据

2019-08-08 Thread Alec Chen
There is no data because the execution is never triggered. See the sample code from the doc (
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/common.html
)

// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// create a TableSink
TableSink sink = new CsvTableSink("/path/to/file", fieldDelim = "|");

// register the TableSink with a specific schema
String[] fieldNames = {"a", "b", "c"};
TypeInformation[] fieldTypes = {Types.INT, Types.STRING, Types.LONG};
tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, sink);

// compute a result Table using Table API operators and/or SQL queries
Table result = ...

// emit the result Table to the registered TableSink
result.insertInto("CsvSinkTable");

// execute the program

Add tableEnv.execute();


wangl...@geekplus.com.cn  于2019年8月9日周五 上午9:42写道:

>
> I used a RocketMQ stream as the input.
>
>
>  DataStream> ds = env.addSource(new
> RocketMQSource(
> 
> System.out.println(res);
> return res;
> }
> });
>
>
> tableEnv.registerDataStream("t_pick_task", ds, "pick_task_id,
> pick_list_no, sku_code");
>
> TableSink csvSink = new CsvTableSink("D:\\data\\flink",",");
> String[] fieldNames = {"num"};
> TypeInformation[] fieldTypes = {Types.INT};
> tableEnv.registerTableSink("RubberOrders", fieldNames, fieldTypes,
> csvSink);
> tableEnv.sqlUpdate(
> "INSERT INTO RubberOrders SELECT pick_task_id FROM
> t_pick_task");
>
>
>
> wangl...@geekplus.com.cn
>
> Sender: Alec Chen
> Send Time: 2019-08-08 21:01
> Receiver: user-zh
> Subject: Re: CsvTableSink 目录没有写入具体的数据
> Please post the complete code.
>
> wangl...@geekplus.com.cn  于2019年8月8日周四 下午7:37写道:
>
> >
> > 我按官网上的
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sql.html#specifying-a-query
> > 例子写的代码
> > 但运行后 CsvTableSink 指定的目录只生成了空文件,没有具体的内容,这是为什么呢?
> >
> >
> >
> > wangl...@geekplus.com.cn
> >
>


Flink fs s3 shaded hadoop: KerberosAuthException when using StreamingFileSink to S3

2019-08-08 Thread Achyuth Narayan Samudrala
Hi,

We are trying to use StreamingFileSink to write to an S3 bucket. It's a
simple job which reads from Kafka and sinks to S3. The credentials for S3
are configured in the Flink cluster. We are using Flink 1.7.2 without the
pre-bundled Hadoop. As suggested in the documentation, we have added the
flink-s3-fs-hadoop jar to the lib directory of the Flink cluster. When we
run the job, we get the auth exception below. What are we
doing wrong? Is there any configuration/jar that we are missing? There is
no HADOOP_HOME etc. set, since we are using the shaded jar.

Looking forward to hearing from you.
Best Wishes,
Achyuth


org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.KerberosAuthException:
failure to login: javax.security.auth.login.LoginException:
java.lang.NullPointerException: invalid null input: name
at com.sun.security.auth.UnixPrincipal.(UnixPrincipal.java:71)
at com.sun.security.auth.module.UnixLoginModule.login(UnixLoginModule.java:133)
at sun.reflect.GeneratedMethodAccessor26.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at javax.security.auth.login.LoginContext.invoke(LoginContext.java:755)
at javax.security.auth.login.LoginContext.access$000(LoginContext.java:195)
at javax.security.auth.login.LoginContext$4.run(LoginContext.java:682)
at javax.security.auth.login.LoginContext$4.run(LoginContext.java:680)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:680)
at javax.security.auth.login.LoginContext.login(LoginContext.java:587)
at 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation$HadoopLoginContext.login(UserGroupInformation.java:1877)
at 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.doSubjectLogin(UserGroupInformation.java:1789)
at 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.createLoginUser(UserGroupInformation.java:704)
at 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:654)
at 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:565)
at 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:247)
at 
org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.create(AbstractS3FileSystemFactory.java:125)
at 
org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:395)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:318)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.(Buckets.java:112)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$BulkFormatBuilder.createBuckets(StreamingFileSink.java:317)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:327)
at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
at java.lang.Thread.run(Thread.java:748)

at 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.doSubjectLogin(UserGroupInformation.java:1799)
at 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.createLoginUser(UserGroupInformation.java:704)
at 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:654)
at 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:565)
at 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:247)
at 
org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.create(AbstractS3FileSystemFactory.java:125)
at 
org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:395)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:318)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.(Buckets.java:112)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$BulkFormatBuilder.createBuckets(StreamingFileSink.java:317)
at 

Re: Capping RocksDb memory usage

2019-08-08 Thread Congxian Qiu
Hi
Maybe FLIP-49[1] "Unified Memory Configuration for TaskExecutors" can give
some information here

[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-49-Unified-Memory-Configuration-for-TaskExecutors-td31436.html
Best,
Congxian


Cam Mach  于2019年8月9日周五 上午4:59写道:

> Hi Biao, Yun and Ning.
>
> Thanks for your response and pointers. Those are very helpful!
>
> So far, we have tried with some of those parameters (WriterBufferManager,
> write_buffer_size, write_buffer_count, ...), but still continuously having
> issues with memory.
> Here are our cluster configurations:
>
>- 1 Job Controller (32 GB RAM and 8 cores)
>- 10 Task Managers: (32 GB RAM, 8 cores CPU, and 300GB SSD configured
>for RocksDB, and we set 10GB heap for each)
>- Running under Kuberntes
>
> We have a pipeline that read/transfer 500 million records (around 1kb
> each), and write to our sink. Our total data is around 1.2 Terabytes. Our
> pipeline configurations are as follows:
>
>- 13 operators - some of them (around 6) are stateful
>- Parallelism: 60
>- Task slots: 6
>
> We have run several tests and observed that memory just keep growing while
> our TM's CPU stay around 10 - 15% usage. We are now just focusing limiting
> memory usage from Flink and RocksDB, so Kubernetes won't kill it.
>
> Any recommendations or advices are greatly appreciated!
>
> Thanks,
>
>
>
>
> On Thu, Aug 8, 2019 at 6:57 AM Yun Tang  wrote:
>
>> Hi Cam
>>
>> I think FLINK-7289 [1] might offer you some insights to control RocksDB
>> memory, especially the idea using write buffer manager [2] to control the
>> total write buffer memory. If you do not have too many sst files, write
>> buffer memory usage would consume much more space than index and filter
>> usage. Since Flink would use per state per column family, and the write
>> buffer number increase when more column families created.
>>
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-7289
>> [2] https://github.com/dataArtisans/frocksdb/pull/4
>>
>> Best
>> Yun Tang
>>
>>
>> --
>> *From:* Cam Mach 
>> *Sent:* Thursday, August 8, 2019 21:39
>> *To:* Biao Liu 
>> *Cc:* miki haiat ; user 
>> *Subject:* Re: Capping RocksDb memory usage
>>
>> Thanks for your response, Biao.
>>
>>
>>
>> On Wed, Aug 7, 2019 at 11:41 PM Biao Liu  wrote:
>>
>> Hi Cam,
>>
>> AFAIK, that's not an easy thing. Actually it's more like a Rocksdb issue.
>> There is a document explaining the memory usage of Rocksdb [1]. It might be
>> helpful.
>>
>> You could define your own option to tune Rocksdb through
>> "state.backend.rocksdb.options-factory" [2]. However I would suggest not to
>> do this unless you are fully experienced of Rocksdb. IMO it's quite
>> complicated.
>>
>> Meanwhile I can share a bit experience of this. We have tried to put the
>> cache and filter into block cache before. It's useful to control the memory
>> usage. But the performance might be affected at the same time. Anyway you
>> could try and tune it. Good luck!
>>
>> 1. https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB
>> 2.
>> https://ci.apache.org/projects/flink/flink-docs-master/ops/state/large_state_tuning.html#tuning-rocksdb
>>
>> Thanks,
>> Biao /'bɪ.aʊ/
>>
>>
>>
>> On Thu, Aug 8, 2019 at 11:44 AM Cam Mach  wrote:
>>
>> Yes, that is correct.
>> Cam Mach
>> Software Engineer
>> E-mail: cammac...@gmail.com
>> Tel: 206 972 2768
>>
>>
>>
>> On Wed, Aug 7, 2019 at 8:33 PM Biao Liu  wrote:
>>
>> Hi Cam,
>>
>> Do you mean you want to limit the memory usage of RocksDB state backend?
>>
>> Thanks,
>> Biao /'bɪ.aʊ/
>>
>>
>>
>> On Thu, Aug 8, 2019 at 2:23 AM miki haiat  wrote:
>>
>> I think using metrics exporter is the easiest way
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#rocksdb
>>
>>
>> On Wed, Aug 7, 2019, 20:28 Cam Mach  wrote:
>>
>> Hello everyone,
>>
>> What is the most easy and efficiently way to cap RocksDb's memory usage?
>>
>> Thanks,
>> Cam
>>
>>


Re: flink-1.8.1 yarn per job模式使用

2019-08-08 Thread Zili Chen
I just found that user-zh does have an archive [1]; the question similar to yours that was mentioned before is this thread [2].

Best,
tison.

[1] https://lists.apache.org/list.html?user-zh@flink.apache.org
[2]
https://lists.apache.org/thread.html/061d8e48b091b27e797975880c193838f2c37894c2a90aa6a6e83d36@%3Cuser-zh.flink.apache.org%3E

Yuhuan Li  于2019年8月7日周三 下午7:57写道:

> Thanks a lot, tison, that perfectly solved my problem. I will keep a closer eye on community questions from now on.
>
> For my specific Hadoop version, it was enough to build the Flink project and put the jar from
> flink-1.8.1/flink-shaded-hadoop/flink-shaded-hadoop2-uber/target
> into the lib directory.
>
> Zili Chen  于2019年8月7日周三 下午7:33写道:
>
> > Someone asked this question on the mailing list before... but user-zh has no archive at the moment, so it is hard to reference.
> >
> > Check whether the lib directory is missing a file like
> flink-shaded-hadoop-2-uber--7.0.jar.
> >
> > Since 1.8.1, Flink releases the Hadoop (YARN) libraries separately. You have to set your own HADOOP_CLASSPATH
> > or download the pre-bundled Hadoop from the Flink website.
> >
> > See the first paragraph of this page (https://flink.apache.org/downloads.html) for the details.
> >
> > Best,
> > tison.
> >
> >
> > 李玉环  于2019年8月7日周三 下午7:15写道:
> >
> > > Hi everyone,
> > >
> > > While using Flink, I ran the command given on the official website
> > >
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/yarn_setup.html#run-a-single-flink-job-on-yarn
> > > and got the following error:
> > >
> > > ➜  flink-1.8.1 ./bin/flink run -m yarn-cluster
> > > ./examples/batch/WordCount.jar
> > > 
> > >  The program finished with the following exception:
> > >
> > > java.lang.RuntimeException: Could not identify hostname and port in
> > > 'yarn-cluster'.
> > > at
> > >
> > >
> >
> org.apache.flink.client.ClientUtils.parseHostPortAddress(ClientUtils.java:47)
> > > at
> > >
> > >
> >
> org.apache.flink.client.cli.AbstractCustomCommandLine.applyCommandLineOptionsToConfiguration(AbstractCustomCommandLine.java:83)
> > > at
> > >
> > >
> >
> org.apache.flink.client.cli.DefaultCLI.createClusterDescriptor(DefaultCLI.java:60)
> > > at
> > >
> > >
> >
> org.apache.flink.client.cli.DefaultCLI.createClusterDescriptor(DefaultCLI.java:35)
> > > at
> > org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:224)
> > > at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
> > > at
> > >
> > >
> >
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
> > > at
> > >
> > >
> >
> org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
> > > at
> > >
> > >
> >
> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
> > > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
> > >
> > >
> > > Questions:
> > > 1. Why is yarn-cluster parsed as a hostname?
> > > 2. What is the correct way to run a single Flink job on YARN?
> > >
> > > Best,
> > > Yuhuan
> > >
> >
>


Re: Re: CsvTableSink 目录没有写入具体的数据

2019-08-08 Thread wangl...@geekplus.com.cn
   
I used a RocketMQ stream as the input.


 DataStream> ds = env.addSource(new 
RocketMQSource(

System.out.println(res);
return res;
}
});


tableEnv.registerDataStream("t_pick_task", ds, "pick_task_id, 
pick_list_no, sku_code");

TableSink csvSink = new CsvTableSink("D:\\data\\flink",",");
String[] fieldNames = {"num"};
TypeInformation[] fieldTypes = {Types.INT};
tableEnv.registerTableSink("RubberOrders", fieldNames, fieldTypes, 
csvSink);
tableEnv.sqlUpdate(
"INSERT INTO RubberOrders SELECT pick_task_id FROM 
t_pick_task");



wangl...@geekplus.com.cn
 
Sender: Alec Chen
Send Time: 2019-08-08 21:01
Receiver: user-zh
Subject: Re: CsvTableSink 目录没有写入具体的数据
Please post the complete code.
 
wangl...@geekplus.com.cn  于2019年8月8日周四 下午7:37写道:
 
>
> I wrote my code following the example at
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sql.html#specifying-a-query
> on the official website,
> but after running it the directory specified by CsvTableSink only contains empty files with no actual content. Why is that?
>
>
>
> wangl...@geekplus.com.cn
>


Re: Capping RocksDb memory usage

2019-08-08 Thread Cam Mach
Hi Biao, Yun and Ning.

Thanks for your response and pointers. Those are very helpful!

So far, we have tried some of those parameters (WriteBufferManager,
write_buffer_size, write_buffer_count, ...), but we are still continuously having
issues with memory.
Here are our cluster configurations:

   - 1 Job Controller (32 GB RAM and 8 cores)
   - 10 Task Managers: (32 GB RAM, 8 cores CPU, and 300GB SSD configured
   for RocksDB, and we set 10GB heap for each)
   - Running under Kuberntes

We have a pipeline that read/transfer 500 million records (around 1kb
each), and write to our sink. Our total data is around 1.2 Terabytes. Our
pipeline configurations are as follows:

   - 13 operators - some of them (around 6) are stateful
   - Parallelism: 60
   - Task slots: 6

We have run several tests and observed that memory just keeps growing while
our TMs' CPU stays around 10-15% usage. We are now just focusing on limiting
memory usage from Flink and RocksDB, so that Kubernetes won't kill the pods.

Any recommendations or advice would be greatly appreciated!

Thanks,




On Thu, Aug 8, 2019 at 6:57 AM Yun Tang  wrote:

> Hi Cam
>
> I think FLINK-7289 [1] might offer you some insights to control RocksDB
> memory, especially the idea using write buffer manager [2] to control the
> total write buffer memory. If you do not have too many sst files, write
> buffer memory usage would consume much more space than index and filter
> usage. Since Flink would use per state per column family, and the write
> buffer number increase when more column families created.
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-7289
> [2] https://github.com/dataArtisans/frocksdb/pull/4
>
> Best
> Yun Tang
>
>
> --
> *From:* Cam Mach 
> *Sent:* Thursday, August 8, 2019 21:39
> *To:* Biao Liu 
> *Cc:* miki haiat ; user 
> *Subject:* Re: Capping RocksDb memory usage
>
> Thanks for your response, Biao.
>
>
>
> On Wed, Aug 7, 2019 at 11:41 PM Biao Liu  wrote:
>
> Hi Cam,
>
> AFAIK, that's not an easy thing. Actually it's more like a Rocksdb issue.
> There is a document explaining the memory usage of Rocksdb [1]. It might be
> helpful.
>
> You could define your own option to tune Rocksdb through
> "state.backend.rocksdb.options-factory" [2]. However I would suggest not to
> do this unless you are fully experienced of Rocksdb. IMO it's quite
> complicated.
>
> Meanwhile I can share a bit experience of this. We have tried to put the
> cache and filter into block cache before. It's useful to control the memory
> usage. But the performance might be affected at the same time. Anyway you
> could try and tune it. Good luck!
>
> 1. https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB
> 2.
> https://ci.apache.org/projects/flink/flink-docs-master/ops/state/large_state_tuning.html#tuning-rocksdb
>
> Thanks,
> Biao /'bɪ.aʊ/
>
>
>
> On Thu, Aug 8, 2019 at 11:44 AM Cam Mach  wrote:
>
> Yes, that is correct.
> Cam Mach
> Software Engineer
> E-mail: cammac...@gmail.com
> Tel: 206 972 2768
>
>
>
> On Wed, Aug 7, 2019 at 8:33 PM Biao Liu  wrote:
>
> Hi Cam,
>
> Do you mean you want to limit the memory usage of RocksDB state backend?
>
> Thanks,
> Biao /'bɪ.aʊ/
>
>
>
> On Thu, Aug 8, 2019 at 2:23 AM miki haiat  wrote:
>
> I think using metrics exporter is the easiest way
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#rocksdb
>
>
> On Wed, Aug 7, 2019, 20:28 Cam Mach  wrote:
>
> Hello everyone,
>
> What is the most easy and efficiently way to cap RocksDb's memory usage?
>
> Thanks,
> Cam
>
>


NoClassDefFoundError in failing-restarting job that uses url classloader

2019-08-08 Thread Subramanyam Ramanathan

Hello,

I'm currently using flink 1.7.2.

I'm trying to run a job that's submitted programmatically using the 
ClusterClient API.
   public JobSubmissionResult run(PackagedProgram prog, int 
parallelism)


The job makes use of some jars which I add to the packaged program through the
PackagedProgram constructor, along with the jar file.
   public PackagedProgram(File jarFile, List classpaths, String... args)
Normally, this works perfectly and the job runs fine.
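
For reference, a minimal sketch of this submission path (the file paths, the extra
class-path entry, the program arguments and the parallelism are made-up examples):

import java.io.File;
import java.net.URL;
import java.util.Collections;
import java.util.List;

import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.PackagedProgram;

public class ProgrammaticSubmit {

    public static JobSubmissionResult submit(ClusterClient<?> client) throws Exception {
        // the job jar plus an extra jar loaded through the user-code URL classloader
        File jobJar = new File("/opt/jobs/my-job.jar");                    // hypothetical path
        List<URL> classpaths = Collections.singletonList(
                new URL("file:///opt/jobs/lib/extra-dependency.jar"));     // hypothetical path

        PackagedProgram program = new PackagedProgram(jobJar, classpaths, "--input", "kafka");
        return client.run(program, 4);                                     // parallelism = 4
    }
}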

However, if there's an error in the job and the job goes into the failing state,
continuously trying to restart for an hour or so, I notice a NoClassDefFoundError for
some classes in the jars that I load using the URL classloader, and the job never
recovers after that, even if the root cause of the issue was fixed (I had a Kafka
source/sink in my job, and Kafka was down temporarily and was brought back up after that).
The jar is still available at the path referenced by the URL classloader and has not been
tampered with.

Could anyone please give me some pointers with regard to the reason why this 
could happen/what I could be missing here/how can I debug further ?

thanks
Subbu




Re: Can Flink help us solve the following use case

2019-08-08 Thread Yoandy Rodríguez
Hello Biao,

There's a legacy component that expects these "time slices" and tags to be
set on our operational data store.

Right now I would just like to have the tags set properly on each record.
After some reading I came up with the idea of using multiple sliding windows,
but there's still an issue with the overlapping "time slices": some elements
belong to more than one tag, and in that case the one representing the
shortest time span should be used.
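
To make that idea concrete, here is a rough sketch of the multiple-sliding-windows approach
(the Event type, the 1-minute slide and the union of the three tagged streams are illustrative
assumptions; it does not yet implement the "shortest slice wins" rule):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class TimeSliceTagging {

    /** Emits every event of the window together with the tag of that slice. */
    static DataStream<Tuple2<String, Event>> tagged(
            DataStream<Event> events, Time size, final String tag) {
        return events
                .windowAll(SlidingProcessingTimeWindows.of(size, Time.minutes(1)))
                .process(new ProcessAllWindowFunction<Event, Tuple2<String, Event>, TimeWindow>() {
                    @Override
                    public void process(Context ctx, Iterable<Event> elements,
                                        Collector<Tuple2<String, Event>> out) {
                        for (Event e : elements) {
                            out.collect(Tuple2.of(tag, e));
                        }
                    }
                });
    }

    static DataStream<Tuple2<String, Event>> tagAll(DataStream<Event> events) {
        // three overlapping slices over the same stream
        return tagged(events, Time.hours(1), "Last Hour")
                .union(tagged(events, Time.minutes(30), "Last 1/2 Hour"),
                       tagged(events, Time.minutes(10), "Last 10 minutes"));
    }

    /** Placeholder for the real event type. */
    public static class Event { }
}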

On 07/08/2019 23:02, Biao Liu wrote:
> Hi Yoandy,
>
> Could you explain more of your requirements?
> Why do you want to split data into "time slices"? Do you want to do
> some aggregations or just give each record a tag or tags?
>
> Thanks,
> Biao /'bɪ.aʊ/
>
>
>
> On Thu, Aug 8, 2019 at 4:52 AM Sameer Wadkar  > wrote:
>
> You could do this using custom triggers and evictors in Flink.
> That way you can control when the windows fire and what elements
> are fired with it. And lastly the custom evictors know when to
> remove elements from the window.
>
> Yes Flink can support it.
>
> Sent from my iPhone
>
> > On Aug 7, 2019, at 4:19 PM, Yoandy Rodríguez
> mailto:mr.domi...@gmail.com>> wrote:
> >
> > Hello everybody,
> >
> > We have the following situation:
> >
> > 1) A data stream which collects all system events (near 1/2 a
> mil per day).
> >
> > 2) A database storing some aggregation of the data.
> >
> > We want to split the data into different "time slices" and be
> able to
> > "tag it" accordingly.
> >
> > Example:
> >
> > the events in the first hour will be tagged as such:
> >
> > Time of arrival (slice)        Tag
> >
> > 0:00:00 - 0:59:59               Last Hour
> >
> > 0:30:00 - 0:59:59              Last 1/2 Hour
> >
> > 0:50:00 - 0:59:59              Last 10 minutes
> >
> > Now, when we reach 1:09:59 the "last ten minutes" tags, moves
> to  that
> > slice, and so do the other ones.
> >
> > My initial idea was to have multiple windows operating over the same
> > stream, but in that case I would have
> >
> > to keep a longer window just to remove the tag for events after
> the 1
> > hour period.  Is there any way to avoid this?
> >
> >
> > PD.
> >
> > This is part of my first Flink project so alternative
> > solutions/literature are very much welcome
> >
> >
>


Re: Capping RocksDb memory usage

2019-08-08 Thread Yun Tang
Hi Cam

I think FLINK-7289 [1] might offer you some insights into controlling RocksDB memory,
especially the idea of using the write buffer manager [2] to control the total write
buffer memory. If you do not have too many SST files, write buffer memory usage will
consume much more space than index and filter usage. Flink uses one column family per
state, and the number of write buffers increases as more column families are created.


[1] https://issues.apache.org/jira/browse/FLINK-7289
[2] https://github.com/dataArtisans/frocksdb/pull/4

Best
Yun Tang



From: Cam Mach 
Sent: Thursday, August 8, 2019 21:39
To: Biao Liu 
Cc: miki haiat ; user 
Subject: Re: Capping RocksDb memory usage

Thanks for your response, Biao.



On Wed, Aug 7, 2019 at 11:41 PM Biao Liu 
mailto:mmyy1...@gmail.com>> wrote:
Hi Cam,

AFAIK, that's not an easy thing. Actually it's more like a Rocksdb issue. There 
is a document explaining the memory usage of Rocksdb [1]. It might be helpful.

You could define your own options to tune RocksDB through
"state.backend.rocksdb.options-factory" [2]. However, I would suggest not doing this
unless you are fully experienced with RocksDB; IMO it's quite complicated.
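
To make the options-factory route more concrete, here is a rough sketch. All sizes are
arbitrary examples, not recommendations; the limits apply per column family (i.e. per state)
and therefore multiply with the number of states and slots:

import org.apache.flink.contrib.streaming.state.OptionsFactory;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

public class BoundedMemoryOptionsFactory implements OptionsFactory {

    @Override
    public DBOptions createDBOptions(DBOptions currentOptions) {
        return currentOptions;
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
        BlockBasedTableConfig tableConfig = new BlockBasedTableConfig()
                .setBlockCacheSize(64 * 1024 * 1024)      // 64 MB block cache (example value)
                .setCacheIndexAndFilterBlocks(true);      // index/filter blocks count against the cache
        return currentOptions
                .setWriteBufferSize(32 * 1024 * 1024)     // 32 MB per memtable (example value)
                .setMaxWriteBufferNumber(2)               // at most 2 memtables per column family
                .setTableFormatConfig(tableConfig);
    }
}

It can be registered either via RocksDBStateBackend#setOptions(...) or via the
state.backend.rocksdb.options-factory key mentioned in [2].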

Meanwhile I can share a bit experience of this. We have tried to put the cache 
and filter into block cache before. It's useful to control the memory usage. 
But the performance might be affected at the same time. Anyway you could try 
and tune it. Good luck!

1. https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB
2. 
https://ci.apache.org/projects/flink/flink-docs-master/ops/state/large_state_tuning.html#tuning-rocksdb

Thanks,
Biao /'bɪ.aʊ/



On Thu, Aug 8, 2019 at 11:44 AM Cam Mach 
mailto:cammac...@gmail.com>> wrote:
Yes, that is correct.
Cam Mach
Software Engineer
E-mail: cammac...@gmail.com
Tel: 206 972 2768



On Wed, Aug 7, 2019 at 8:33 PM Biao Liu 
mailto:mmyy1...@gmail.com>> wrote:
Hi Cam,

Do you mean you want to limit the memory usage of RocksDB state backend?

Thanks,
Biao /'bɪ.aʊ/



On Thu, Aug 8, 2019 at 2:23 AM miki haiat 
mailto:miko5...@gmail.com>> wrote:
I think using metrics exporter is the easiest way

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#rocksdb


On Wed, Aug 7, 2019, 20:28 Cam Mach 
mailto:cammac...@gmail.com>> wrote:
Hello everyone,

What is the most easy and efficiently way to cap RocksDb's memory usage?

Thanks,
Cam



Re: Capping RocksDb memory usage

2019-08-08 Thread Ning Shi
Hi Cam,

This blog post has some pointers in tuning RocksDB memory usage that
might be of help.

https://klaviyo.tech/flinkperf-c7bd28acc67

Ning

On Wed, Aug 7, 2019 at 1:28 PM Cam Mach  wrote:
>
> Hello everyone,
>
> What is the most easy and efficiently way to cap RocksDb's memory usage?
>
> Thanks,
> Cam
>


Re: Capping RocksDb memory usage

2019-08-08 Thread Cam Mach
Thanks for your response, Biao.



On Wed, Aug 7, 2019 at 11:41 PM Biao Liu  wrote:

> Hi Cam,
>
> AFAIK, that's not an easy thing. Actually it's more like a Rocksdb issue.
> There is a document explaining the memory usage of Rocksdb [1]. It might be
> helpful.
>
> You could define your own option to tune Rocksdb through
> "state.backend.rocksdb.options-factory" [2]. However I would suggest not to
> do this unless you are fully experienced of Rocksdb. IMO it's quite
> complicated.
>
> Meanwhile I can share a bit experience of this. We have tried to put the
> cache and filter into block cache before. It's useful to control the memory
> usage. But the performance might be affected at the same time. Anyway you
> could try and tune it. Good luck!
>
> 1. https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB
> 2.
> https://ci.apache.org/projects/flink/flink-docs-master/ops/state/large_state_tuning.html#tuning-rocksdb
>
> Thanks,
> Biao /'bɪ.aʊ/
>
>
>
> On Thu, Aug 8, 2019 at 11:44 AM Cam Mach  wrote:
>
>> Yes, that is correct.
>> Cam Mach
>> Software Engineer
>> E-mail: cammac...@gmail.com
>> Tel: 206 972 2768
>>
>>
>>
>> On Wed, Aug 7, 2019 at 8:33 PM Biao Liu  wrote:
>>
>>> Hi Cam,
>>>
>>> Do you mean you want to limit the memory usage of RocksDB state backend?
>>>
>>> Thanks,
>>> Biao /'bɪ.aʊ/
>>>
>>>
>>>
>>> On Thu, Aug 8, 2019 at 2:23 AM miki haiat  wrote:
>>>
 I think using metrics exporter is the easiest way

 [1]
 https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#rocksdb


 On Wed, Aug 7, 2019, 20:28 Cam Mach  wrote:

> Hello everyone,
>
> What is the most easy and efficiently way to cap RocksDb's memory
> usage?
>
> Thanks,
> Cam
>
>


Re: Re: [ANNOUNCE] Hequn becomes a Flink committer

2019-08-08 Thread Congxian Qiu
Congratulations Hequn!

Best,
Congxian


Yu Li  于2019年8月8日周四 下午2:02写道:

> Congratulations Hequn! Well deserved!
>
> Best Regards,
> Yu
>
>
> On Thu, 8 Aug 2019 at 03:53, Haibo Sun  wrote:
>
>> Congratulations!
>>
>> Best,
>> Haibo
>>
>> At 2019-08-08 02:08:21, "Yun Tang"  wrote:
>> >Congratulations Hequn.
>> >
>> >Best
>> >Yun Tang
>> >
>> >From: Rong Rong 
>> >Sent: Thursday, August 8, 2019 0:41
>> >Cc: dev ; user 
>> >Subject: Re: [ANNOUNCE] Hequn becomes a Flink committer
>> >
>> >Congratulations Hequn, well deserved!
>> >
>> >--
>> >Rong
>> >
>> >On Wed, Aug 7, 2019 at 8:30 AM 
>> >mailto:xingc...@gmail.com>> wrote:
>> >
>> >Congratulations, Hequn!
>> >
>> >
>> >
>> >From: Xintong Song mailto:tonysong...@gmail.com>>
>> >Sent: Wednesday, August 07, 2019 10:41 AM
>> >To: d...@flink.apache.org
>> >Cc: user mailto:user@flink.apache.org>>
>> >Subject: Re: [ANNOUNCE] Hequn becomes a Flink committer
>> >
>> >
>> >
>> >Congratulations~!
>> >
>> >
>> >Thank you~
>> >
>> >Xintong Song
>> >
>> >
>> >
>> >
>> >
>> >On Wed, Aug 7, 2019 at 4:00 PM vino yang 
>> >mailto:yanghua1...@gmail.com>> wrote:
>> >
>> >Congratulations!
>> >
>> >highfei2...@126.com 
>> >mailto:highfei2...@126.com>> 于2019年8月7日周三 下午7:09写道:
>> >
>> >> Congrats Hequn!
>> >>
>> >> Best,
>> >> Jeff Yang
>> >>
>> >>
>> >>  Original Message 
>> >> Subject: Re: [ANNOUNCE] Hequn becomes a Flink committer
>> >> From: Piotr Nowojski
>> >> To: JingsongLee
>> >> CC: Biao Liu ,Zhu Zhu ,Zili Chen ,Jeff Zhang ,Paul Lam ,jincheng sun ,dev 
>> >> ,user
>> >>
>> >>
>> >> Congratulations :)
>> >>
>> >> On 7 Aug 2019, at 12:09, JingsongLee 
>> >> mailto:lzljs3620...@aliyun.com>> wrote:
>> >>
>> >> Congrats Hequn!
>> >>
>> >> Best,
>> >> Jingsong Lee
>> >>
>> >> --
>> >> From:Biao Liu mailto:mmyy1...@gmail.com>>
>> >> Send Time:2019年8月7日(星期三) 12:05
>> >> To:Zhu Zhu mailto:reed...@gmail.com>>
>> >> Cc:Zili Chen mailto:wander4...@gmail.com>>; Jeff 
>> >> Zhang mailto:zjf...@gmail.com>>; Paul
>> >> Lam mailto:paullin3...@gmail.com>>; jincheng sun 
>> >> mailto:sunjincheng...@gmail.com>>; dev
>> >> mailto:d...@flink.apache.org>>; user 
>> >> mailto:user@flink.apache.org>>
>> >> Subject:Re: [ANNOUNCE] Hequn becomes a Flink committer
>> >>
>> >> Congrats Hequn!
>> >>
>> >> Thanks,
>> >> Biao /'bɪ.aʊ/
>> >>
>> >>
>> >>
>> >> On Wed, Aug 7, 2019 at 6:00 PM Zhu Zhu 
>> >> mailto:reed...@gmail.com>> wrote:
>> >> Congratulations to Hequn!
>> >>
>> >> Thanks,
>> >> Zhu Zhu
>> >>
>> >> Zili Chen mailto:wander4...@gmail.com>> 
>> >> 于2019年8月7日周三 下午5:16写道:
>> >> Congrats Hequn!
>> >>
>> >> Best,
>> >> tison.
>> >>
>> >>
>> >> Jeff Zhang mailto:zjf...@gmail.com>> 于2019年8月7日周三 
>> >> 下午5:14写道:
>> >> Congrats Hequn!
>> >>
>> >> Paul Lam mailto:paullin3...@gmail.com>> 
>> >> 于2019年8月7日周三 下午5:08写道:
>> >> Congrats Hequn! Well deserved!
>> >>
>> >> Best,
>> >> Paul Lam
>> >>
>> >> 在 2019年8月7日,16:28,jincheng sun 
>> >> mailto:sunjincheng...@gmail.com>> 写道:
>> >>
>> >> Hi everyone,
>> >>
>> >> I'm very happy to announce that Hequn accepted the offer of the Flink PMC
>> >> to become a committer of the Flink project.
>> >>
>> >> Hequn has been contributing to Flink for many years, mainly working on
>> >> SQL/Table API features. He's also frequently helping out on the user
>> >> mailing lists and helping check/vote the release.
>> >>
>> >> Congratulations Hequn!
>> >>
>> >> Best, Jincheng
>> >> (on behalf of the Flink PMC)
>> >>
>> >>
>> >>
>> >> --
>> >> Best Regards
>> >>
>> >> Jeff Zhang
>> >>
>> >>
>> >>
>>
>>


Re: CsvTableSink 目录没有写入具体的数据

2019-08-08 Thread Alec Chen
Please post the complete code.

wangl...@geekplus.com.cn  于2019年8月8日周四 下午7:37写道:

>
> I wrote my code following the example at
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sql.html#specifying-a-query
> on the official website,
> but after running it the directory specified by CsvTableSink only contains empty files with no actual content. Why is that?
>
>
>
> wangl...@geekplus.com.cn
>


Re: need help

2019-08-08 Thread Biao Liu
Hi,

The exception shows an AskTimeoutException. You can adjust the two parameters
akka.ask.timeout and web.timeout and try again. The defaults are as follows:

akka.ask.timeout: 10 s
web.timeout: 1

PS: searching for "AskTimeoutException Flink" turns up many related answers.

Thanks,
Biao /'bɪ.aʊ/



On Thu, Aug 8, 2019 at 7:33 PM 陈某  wrote:

>
>
> -- Forwarded message -
> 发件人: 陈某 
> Date: 2019年8月8日周四 下午7:25
> Subject: need help
> To: 
>
>
> Hi, I am a beginner who has just started with Flink. After setting up a Flink on
> YARN cluster, I start ZooKeeper, Hadoop, YARN and the Flink cluster in turn, but I run into a problem when submitting a job to YARN. I searched for the problem online but have not found a solution yet; I hope to get some help, thanks.
>
> The command used is:
> [root@flink01 logs]# flink run -m  yarn-cluster
> ./examples/streaming/WordCount.jar
> After checking the log, the error message is as follows (the complete log file is attached):
> org.apache.flink.client.program.ProgramInvocationException: Could not
> retrieve the execution result. (JobID: 91e82fd8626bde4c901bf0b1639e12e7)
> at
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:261)
> at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:483)
> at
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
> at
> org.apache.flink.streaming.examples.wordcount.WordCount.main(WordCount.java:89)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
> at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:423)
> at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
> at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
> at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
> at
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed
> to submit JobGraph.
> at
> org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:388)
> at
> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
> at
> java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
> at
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:208)
> at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
> at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at
> java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
> at
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
> at
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.runtime.rest.util.RestClientException:
> [Internal server error.,  akka.pattern.AskTimeoutException: Ask timed out on
> [Actor[akka://flink/user/dispatcher#2035575525]] after [1 ms].
> Sender[null] sent message of type
> "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
> at
> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)
> at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
> at
> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
> at
> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
> at
> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
> at
> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
> at
> akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
> at
> 

CsvTableSink 目录没有写入具体的数据

2019-08-08 Thread wangl...@geekplus.com.cn

I wrote my code following the example at
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sql.html#specifying-a-query
on the official website.
But after running it, the directory specified by CsvTableSink only contains empty files with no actual content. Why is that?



wangl...@geekplus.com.cn


Fwd: need help

2019-08-08 Thread 陈某
-- Forwarded message -
发件人: 陈某 
Date: 2019年8月8日周四 下午7:25
Subject: need help
To: 


Hi, I am a beginner who has just started with Flink. After setting up a Flink on
YARN cluster, I start ZooKeeper, Hadoop, YARN and the Flink cluster in turn, but I run into a problem when submitting a job to YARN. I searched for the problem online but have not found a solution yet; I hope to get some help, thanks.

The command used is:
[root@flink01 logs]# flink run -m  yarn-cluster
./examples/streaming/WordCount.jar
After checking the log, the error message is as follows (the complete log file is attached):
org.apache.flink.client.program.ProgramInvocationException: Could not
retrieve the execution result. (JobID: 91e82fd8626bde4c901bf0b1639e12e7)
at
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:261)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:483)
at
org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
at
org.apache.flink.streaming.examples.wordcount.WordCount.main(WordCount.java:89)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:423)
at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
at
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed
to submit JobGraph.
at
org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:388)
at
java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
at
java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at
org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:208)
at
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at
java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
at
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
at
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.rest.util.RestClientException:
[Internal server error., 

Re: Operator state

2019-08-08 Thread Yun Tang
Hi

When talking about sharing state, broadcast state [1][2] might be a choice.

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html#provided-apis
[2] https://flink.apache.org/2019/06/26/broadcast-state.html
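
A minimal, self-contained sketch of that pattern (the stream contents and the "key=value"
update format are illustrative assumptions):

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class BroadcastStateSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> events = env.fromElements("a", "b", "a");     // main data
        DataStream<String> updates = env.fromElements("a=1", "b=2");     // state to be shared

        final MapStateDescriptor<String, String> desc =
                new MapStateDescriptor<>("shared", Types.STRING, Types.STRING);

        BroadcastStream<String> broadcastUpdates = updates.broadcast(desc);

        events.keyBy(new KeySelector<String, String>() {
                  @Override
                  public String getKey(String value) {
                      return value;
                  }
              })
              .connect(broadcastUpdates)
              .process(new KeyedBroadcastProcessFunction<String, String, String, String>() {
                  @Override
                  public void processElement(String event, ReadOnlyContext ctx,
                                             Collector<String> out) throws Exception {
                      // every parallel instance sees the same broadcast state
                      out.collect(event + " -> " + ctx.getBroadcastState(desc).get(event));
                  }

                  @Override
                  public void processBroadcastElement(String update, Context ctx,
                                                      Collector<String> out) throws Exception {
                      String[] kv = update.split("=", 2);
                      ctx.getBroadcastState(desc).put(kv[0], kv[1]);
                  }
              })
              .print();

        env.execute("broadcast-state-sketch");
    }
}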

Best
Yun Tang



From: Navneeth Krishnan 
Sent: Thursday, August 8, 2019 13:50
To: user 
Subject: Operator state

Hi All,

Is there a way to share operator state among operators? For example, I have an 
operator which has union state and the same state data is required in a 
downstream operator. If not, is there a recommended way to share the state?

Thanks


Re: flink 结合canal统计订单gmv

2019-08-08 Thread Alec Chen
Hi,

The screenshot cannot be displayed. I am not sure whether you implemented this with Flink SQL
or with the DataStream API. For the former you can look at UDTFs; for the latter, at FlatMap: "Takes
one element and produces zero, one, or more elements. A flatmap function
that splits sentences to words"
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/
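
A rough sketch of the FlatMap route on the DataStream API side (OrderListEvent, Order and
the getOrders() accessor are assumed placeholders for the parsed binlog record):

// one binlog event carrying a list of orders is split into one element per order
DataStream<Order> orders = binlogEvents.flatMap(new FlatMapFunction<OrderListEvent, Order>() {
    @Override
    public void flatMap(OrderListEvent event, Collector<Order> out) {
        for (Order order : event.getOrders()) {
            out.collect(order);          // each order keeps its own product id
        }
    }
});

// afterwards the per-order stream can be keyed by product id and aggregated into GMV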

王飞  于2019年8月8日周四 下午4:53写道:

> Hi,
> I need to use Flink to parse MySQL binlogs and compute per-product GMV over the order table.
> However, a single insert binlog can contain several orders bought at the same time, i.e. a collection of orders, while I want to compute per-product GMV inside the orders. As shown in the image below,
> what is returned is a list of orders, but I want to take the product id inside each order and aggregate along that dimension. Is there a Flink operator that can turn one stream element holding a list
> into multiple stream records?
> Thanks
>
>
>
>


Re: On the definition of event time and where the timestamp is produced

2019-08-08 Thread Alec Chen
Hi,
Q: When is the event-time timestamp attached to the data?
A: Event time literally means the time at which the event occurred. If the
device that produces the data provides a field recording that time, and the
business logic also needs to use it, that field can be used as the event time.
For more on the difference between event time and processing time, see
https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html
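
To make that concrete, here is a rough, self-contained sketch of assigning the
event-time timestamp from a field carried in the data itself, using the
timestamp/watermark assigner API available in Flink 1.8/1.9. The SensorEvent
type and its fields are made up for the example.

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

public class EventTimeSketch {

    // Hypothetical event type: the device that produced the event recorded its own time.
    public static class SensorEvent {
        public String deviceId;
        public long deviceTimestampMillis;
        public SensorEvent() {}
        public SensorEvent(String deviceId, long ts) { this.deviceId = deviceId; this.deviceTimestampMillis = ts; }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStream<SensorEvent> events = env.fromElements(
                new SensorEvent("phone-1", 1565000000000L),
                new SensorEvent("phone-2", 1565000003000L));

        // The event-time timestamp is attached here, by the user, from a field in the
        // record itself rather than from the moment Kafka or Flink received the data.
        DataStream<SensorEvent> withEventTime = events.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<SensorEvent>(Time.seconds(10)) {
                    @Override
                    public long extractTimestamp(SensorEvent e) {
                        return e.deviceTimestampMillis;
                    }
                });

        withEventTime.print();
        env.execute("event-time-sketch");
    }
}

The point is that the timestamp is not something Flink or Kafka stamps for you:
with event time you extract it from the record, so out-of-order arrival does not
change the timestamps, only how late the records show up relative to the
watermark.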

zhaoheng.zhaoh...@qq.com  wrote on Thursday, August 8, 2019 at 4:36 PM:

>
> Hi all,
>   When is the event-time timestamp attached to the data? From the API it
> looks like it is assigned after the Flink source has received the data,
> rather than being carried over from the real data source (e.g. a mobile
> device)? With a Kafka source, according to the documentation, the timestamp
> carried by Kafka is also just the time at which Kafka received the data.
>
> So here is the question: taking Kafka as an example, data is stamped "in
> order" as it arrives in the queue. If data arrives out of order but is still
> given monotonically increasing timestamps, and all subsequent event-time
> processing is based on those timestamps, doesn't that lose the real-world
> meaning of event time?
>   Please point out where my understanding is wrong!
>   Best~
>


Re: Consuming data from dynamoDB streams to flink

2019-08-08 Thread Vinay Patil
Hello,

For anyone looking to set up alerts for a Flink application, here is a
good blog post by Flink itself:
https://www.ververica.com/blog/monitoring-apache-flink-applications-101
So, for DynamoDB Streams we can set the alert on millisBehindLatest.

Regards,
Vinay Patil


On Wed, Aug 7, 2019 at 2:24 PM Vinay Patil  wrote:

> Hi Andrey,
>
> Thank you for your reply. I understand that the checkpoints are gone when
> the job is cancelled or killed; maybe configuring externalized checkpoints
> will help here so that we can resume from there.
>
> My point was that if the job is terminated and the stream position is set to
> TRIM_HORIZON, the consumer will start processing from the start. I was
> curious to know whether there is a configuration like Kafka's group_id that
> we can set for DynamoDB Streams.
>
> Also, can you please let me know which metrics I should generate an alert
> on in the case of DynamoDB Streams (I am sending the metrics to
> Prometheus)? I see these metrics:
> https://github.com/apache/flink/blob/50d076ab6ad325907690a2c115ee2cb1c45775c9/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/ShardMetricsReporter.java
>
>
> For example: in the case of Kafka we generate an alert when the consumer
> is lagging behind.
>
>
> Regards,
> Vinay Patil
>
>
> On Fri, Jul 19, 2019 at 10:40 PM Andrey Zagrebin 
> wrote:
>
>> Hi Vinay,
>>
>> 1. I would assume it works similarly to the Kinesis connector (people who
>> actually developed it, please correct me if I'm wrong).
>> 2. If you have activated just checkpointing, the checkpoints are gone if
>> you externally kill the job. You might be interested in savepoints [1].
>> 3. See the paragraph in [2] about Kinesis consumer parallelism.
>>
>> Best,
>> Andrey
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/state/savepoints.html
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/connectors/kinesis.html#kinesis-consumer
>>
>> On Fri, Jul 19, 2019 at 1:45 PM Vinay Patil 
>> wrote:
>>
>>> Hi,
>>>
>>> I am using this consumer for processing records from DynamoDB Streams; a
>>> few questions on this:
>>>
>>> 1. How does checkpointing work with DynamoDB streams? Since this class
>>> extends FlinkKinesisConsumer, I am assuming it will start from the last
>>> successful checkpoint in case of failure, right?
>>> 2. Currently, when I kill the pipeline and start it again, it reads all the
>>> data from the start of the stream. Is there any configuration to avoid this
>>> (apart from ConsumerConfigConstants.InitialPosition.LATEST), similar to
>>> group-id in Kafka?
>>> 3. As these DynamoDB Streams are separated into shards, what is the
>>> recommended parallelism for the source? Should it be a one-to-one mapping,
>>> e.g. if there are 3 shards, then parallelism should be 3?
>>>
>>>
>>> Regards,
>>> Vinay Patil
>>>
>>>
>>> On Wed, Aug 1, 2018 at 3:42 PM Ying Xu [via Apache Flink Mailing List
>>> archive.]  wrote:
>>>
 Thank you so much Fabian!

 Will update status in the JIRA.

 -
 Ying

 On Tue, Jul 31, 2018 at 1:37 AM, Fabian Hueske <[hidden email]
 > wrote:

 > Done!
 >
 > Thank you :-)
 >
 > 2018-07-31 6:41 GMT+02:00 Ying Xu <[hidden email]
 >:
 >
 > > Thanks Fabian and Thomas.
 > >
 > > Please assign FLINK-4582 to the following username:
 > > *yxu-apache
 > > <
 https://issues.apache.org/jira/secure/ViewProfile.jspa?name=yxu-apache
 > >*
 > >
 > > If needed I can get a ICLA or CCLA whichever is proper.
 > >
 > > *Ying Xu*
 > > Software Engineer
 > > 510.368.1252 <+15103681252>
 > > [image: Lyft] 
 > >
 > > On Mon, Jul 30, 2018 at 8:31 PM, Thomas Weise <[hidden email]
 > wrote:
 > >
 > > > The user is yxu-lyft, Ying had commented on that JIRA as well.
 > > >
 > > > https://issues.apache.org/jira/browse/FLINK-4582
 > > >
 > > >
 > > > On Mon, Jul 30, 2018 at 1:25 AM Fabian Hueske <[hidden email]
 >
 > wrote:
 > > >
 > > > > Hi Ying,
 > > > >
 > > > > Thanks for considering to contribute the connector!
 > > > >
 > > > > In general, you don't need special permissions to contribute to
 > Flink.
 > > > > Anybody can open Jiras and PRs.
 > > > > You only need to be assigned to the Contributor role in Jira to
 be
 > able
 > > > to
 > > > > assign an issue to you.
 > > > > I can give you these permissions if you tell me your Jira user.
 > > > >
 > > > > It would also be good if you could submit a CLA [1] if you plan
 to
 > > > > contribute a larger feature.
 > > > >
 > > > > Thanks, Fabian
 > > > >
 > > > > [1] 

Using Flink with canal to compute order GMV

2019-08-08 Thread 王飞
Hi,
I need to use Flink to parse MySQL binlogs and compute the GMV of the orders
table per product dimension. The problem is that a single insert binlog can
contain several orders bought at the same time, so it comes back as a list of
orders (as in the picture below), while I want the GMV per product inside those
orders. I'd like to take the product id of each order and aggregate along that
dimension. Is there a Flink operator that can turn one list element of the
stream into multiple stream records?
Thanks

Re: Capping RocksDb memory usage

2019-08-08 Thread Biao Liu
Hi Cam,

AFAIK, that's not an easy thing. Actually it's more of a RocksDB issue.
There is a document explaining the memory usage of RocksDB [1]. It might be
helpful.

You could define your own options to tune RocksDB through
"state.backend.rocksdb.options-factory" [2]. However, I would suggest not
doing this unless you are very experienced with RocksDB. IMO it's quite
complicated.

Meanwhile, I can share a bit of experience with this. We have tried putting the
index and filter blocks into the block cache before. It's useful for
controlling the memory usage, but performance might be affected at the same
time. Anyway, you could try it and tune it. Good luck!

1. https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB
2.
https://ci.apache.org/projects/flink/flink-docs-master/ops/state/large_state_tuning.html#tuning-rocksdb
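
To make that concrete, here is a rough sketch of such an options factory,
registered via state.backend.rocksdb.options-factory in flink-conf.yaml. The
class name and all sizes are made up; please verify the option methods against
the RocksDB and Flink versions you actually run.

import org.apache.flink.contrib.streaming.state.OptionsFactory;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

// Illustrative only: bounds the block cache and memtables per column family.
public class BoundedMemoryOptionsFactory implements OptionsFactory {

    @Override
    public DBOptions createDBOptions(DBOptions currentOptions) {
        // No DB-level changes in this sketch.
        return currentOptions;
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
        BlockBasedTableConfig tableConfig = new BlockBasedTableConfig()
                .setBlockCacheSize(64 * 1024 * 1024)     // cap the block cache at 64 MB
                .setCacheIndexAndFilterBlocks(true);     // charge index/filter blocks to that cache
        return currentOptions
                .setTableFormatConfig(tableConfig)
                .setWriteBufferSize(32 * 1024 * 1024)    // cap each memtable at 32 MB
                .setMaxWriteBufferNumber(2);             // at most 2 memtables per column family
    }
}

Note that these limits apply per column family (each registered state gets its
own), so the total memory still scales with the number of states and operators
running on a TaskManager.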

Thanks,
Biao /'bɪ.aʊ/



On Thu, Aug 8, 2019 at 11:44 AM Cam Mach  wrote:

> Yes, that is correct.
> Cam Mach
> Software Engineer
> E-mail: cammac...@gmail.com
> Tel: 206 972 2768
>
>
>
> On Wed, Aug 7, 2019 at 8:33 PM Biao Liu  wrote:
>
>> Hi Cam,
>>
>> Do you mean you want to limit the memory usage of RocksDB state backend?
>>
>> Thanks,
>> Biao /'bɪ.aʊ/
>>
>>
>>
>> On Thu, Aug 8, 2019 at 2:23 AM miki haiat  wrote:
>>
>>> I think using the metrics exporter is the easiest way
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#rocksdb
>>>
>>>
>>> On Wed, Aug 7, 2019, 20:28 Cam Mach  wrote:
>>>
 Hello everyone,

 What is the easiest and most efficient way to cap RocksDB's memory usage?

 Thanks,
 Cam




Reply: Re: Re: submit jobGraph error on server side

2019-08-08 Thread 王智
Thanks a lot!

It was my configuration with too few resources that caused slow responses,
which in turn led to the Akka timeouts.

I have now switched to another k8s cluster with more resources and no longer
run into the exception described in this thread.


Original message

From: "Zili Chen" < wander4...@gmail.com >
Date: 2019/8/7 15:32
To: "王智" < ben.wa...@foxmail.com >
Cc: "user-zh" < user-zh@flink.apache.org >
Subject: Re: Re: submit jobGraph error on server side


Judging from the error stack trace, your request should have already reached
the jobmanager, so it is not a case of the port not being found. However, some
action on the jobmanager timed out while handling the job submission. Is this
problem reliably reproducible once you split out the gateway? It could also
just be an occasional Akka timeout.


Best,
tison.










王智 

Re: Re: [ANNOUNCE] Hequn becomes a Flink committer

2019-08-08 Thread Yu Li
Congratulations Hequn! Well deserved!

Best Regards,
Yu


On Thu, 8 Aug 2019 at 03:53, Haibo Sun  wrote:

> Congratulations!
>
> Best,
> Haibo
>
> At 2019-08-08 02:08:21, "Yun Tang"  wrote:
> >Congratulations Hequn.
> >
> >Best
> >Yun Tang
> >
> >From: Rong Rong 
> >Sent: Thursday, August 8, 2019 0:41
> >Cc: dev ; user 
> >Subject: Re: [ANNOUNCE] Hequn becomes a Flink committer
> >
> >Congratulations Hequn, well deserved!
> >
> >--
> >Rong
> >
> >On Wed, Aug 7, 2019 at 8:30 AM 
> >mailto:xingc...@gmail.com>> wrote:
> >
> >Congratulations, Hequn!
> >
> >
> >
> >From: Xintong Song mailto:tonysong...@gmail.com>>
> >Sent: Wednesday, August 07, 2019 10:41 AM
> >To: d...@flink.apache.org
> >Cc: user mailto:user@flink.apache.org>>
> >Subject: Re: [ANNOUNCE] Hequn becomes a Flink committer
> >
> >
> >
> >Congratulations~!
> >
> >
> >Thank you~
> >
> >Xintong Song
> >
> >
> >
> >
> >
> >On Wed, Aug 7, 2019 at 4:00 PM vino yang 
> >mailto:yanghua1...@gmail.com>> wrote:
> >
> >Congratulations!
> >
> >highfei2...@126.com 
> >mailto:highfei2...@126.com>> 于2019年8月7日周三 下午7:09写道:
> >
> >> Congrats Hequn!
> >>
> >> Best,
> >> Jeff Yang
> >>
> >>
> >>  Original Message 
> >> Subject: Re: [ANNOUNCE] Hequn becomes a Flink committer
> >> From: Piotr Nowojski
> >> To: JingsongLee
> >> CC: Biao Liu ,Zhu Zhu ,Zili Chen ,Jeff Zhang ,Paul Lam ,jincheng sun ,dev 
> >> ,user
> >>
> >>
> >> Congratulations :)
> >>
> >> On 7 Aug 2019, at 12:09, JingsongLee 
> >> mailto:lzljs3620...@aliyun.com>> wrote:
> >>
> >> Congrats Hequn!
> >>
> >> Best,
> >> Jingsong Lee
> >>
> >> --
> >> From:Biao Liu mailto:mmyy1...@gmail.com>>
> >> Send Time:2019年8月7日(星期三) 12:05
> >> To:Zhu Zhu mailto:reed...@gmail.com>>
> >> Cc:Zili Chen mailto:wander4...@gmail.com>>; Jeff 
> >> Zhang mailto:zjf...@gmail.com>>; Paul
> >> Lam mailto:paullin3...@gmail.com>>; jincheng sun 
> >> mailto:sunjincheng...@gmail.com>>; dev
> >> mailto:d...@flink.apache.org>>; user 
> >> mailto:user@flink.apache.org>>
> >> Subject:Re: [ANNOUNCE] Hequn becomes a Flink committer
> >>
> >> Congrats Hequn!
> >>
> >> Thanks,
> >> Biao /'bɪ.aʊ/
> >>
> >>
> >>
> >> On Wed, Aug 7, 2019 at 6:00 PM Zhu Zhu 
> >> mailto:reed...@gmail.com>> wrote:
> >> Congratulations to Hequn!
> >>
> >> Thanks,
> >> Zhu Zhu
> >>
> >> Zili Chen mailto:wander4...@gmail.com>> 于2019年8月7日周三 
> >> 下午5:16写道:
> >> Congrats Hequn!
> >>
> >> Best,
> >> tison.
> >>
> >>
> >> Jeff Zhang mailto:zjf...@gmail.com>> 于2019年8月7日周三 
> >> 下午5:14写道:
> >> Congrats Hequn!
> >>
> >> Paul Lam mailto:paullin3...@gmail.com>> 
> >> 于2019年8月7日周三 下午5:08写道:
> >> Congrats Hequn! Well deserved!
> >>
> >> Best,
> >> Paul Lam
> >>
> >> 在 2019年8月7日,16:28,jincheng sun 
> >> mailto:sunjincheng...@gmail.com>> 写道:
> >>
> >> Hi everyone,
> >>
> >> I'm very happy to announce that Hequn accepted the offer of the Flink PMC
> >> to become a committer of the Flink project.
> >>
> >> Hequn has been contributing to Flink for many years, mainly working on
> >> SQL/Table API features. He's also frequently helping out on the user
> >> mailing lists and helping check/vote the release.
> >>
> >> Congratulations Hequn!
> >>
> >> Best, Jincheng
> >> (on behalf of the Flink PMC)
> >>
> >>
> >>
> >> --
> >> Best Regards
> >>
> >> Jeff Zhang
> >>
> >>
> >>
>
>