Re: batch job OOM

2020-01-22 Thread Fanbin Bu
I got the following error when running another job. Any suggestions?

Caused by: java.lang.IndexOutOfBoundsException
at org.apache.flink.core.memory.MemorySegment.getInt(MemorySegment.java:701)
at org.apache.flink.table.dataformat.BinaryRow.getInt(BinaryRow.java:264)
at HashWinAggWithKeys$538.endInput(Unknown Source)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.endInput(OperatorChain.java:276)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.checkFinished(StreamOneInputProcessor.java:151)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:138)
at org.apache.flink.streaming.runtime.tasks.StreamTask.performDefaultAction(StreamTask.java:276)
at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)

On Wed, Jan 22, 2020 at 8:57 PM Fanbin Bu  wrote:

> Jingsong,
>
> I set the config value to be too large. After I changed it to a smaller
> number it works now!
> thanks you for the help. really appreciate it!
>
> Fanbin
>
> On Wed, Jan 22, 2020 at 8:50 PM Jingsong Li 
> wrote:
>
>> Fanbin,
>>
>> Looks like your config is wrong, can you show your config code?
>>
>> Best,
>> Jingsong Lee
>>
>> On Thu, Jan 23, 2020 at 12:41 PM Fanbin Bu 
>> wrote:
>>
>>> Jingsong,
>>>
>>> Great, now i got a different error:
>>>
>>> java.lang.NullPointerException: Initial Segment may not be null
>>> at 
>>> org.apache.flink.runtime.memory.AbstractPagedOutputView.(AbstractPagedOutputView.java:65)
>>> at 
>>> org.apache.flink.runtime.io.disk.SimpleCollectingOutputView.(SimpleCollectingOutputView.java:49)
>>> at 
>>> org.apache.flink.table.runtime.operators.aggregate.BytesHashMap$RecordArea.(BytesHashMap.java:522)
>>> at 
>>> org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.(BytesHashMap.java:190)
>>> at 
>>> org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.(BytesHashMap.java:149)
>>> at LocalHashWinAggWithKeys$292.open(Unknown Source)
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:529)
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393)
>>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>> at java.lang.Thread.run(Thread.java:748)
>>>
>>>
>>> is there any other config i should add?
>>>
>>> thanks,
>>>
>>> Fanbin
>>>
>>>
>>> On Wed, Jan 22, 2020 at 8:07 PM Fanbin Bu 
>>> wrote:
>>>
 you beat me to it.
 let's me try that.

 On Wed, Jan 22, 2020 at 7:57 PM Jingsong Li 
 wrote:

> Fanbin,
>
> Document is here:
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/config.html
> NOTE: you need configure this into TableConfig.
>
> Best,
> Jingsong Lee
>
> On Thu, Jan 23, 2020 at 11:54 AM Fanbin Bu 
> wrote:
>
>> Jingsong,
>>
>> Thank you for the response.
>> Since I'm using flink on EMR and the latest version is 1.9 now. the
>> second option is ruled out. but will keep that in mind for future 
>> upgrade.
>>
>> I'm going to try the first option. It's probably a good idea to add
>> that in the doc for example:
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html
>>
>> Thanks,
>> Fanbin
>>
>> On Wed, Jan 22, 2020 at 7:08 PM Jingsong Li 
>> wrote:
>>
>>> Hi Fanbin,
>>>
>>> Thanks for using blink batch mode.
>>>
>>> The OOM is caused by the manage memory not enough in Hash
>>> aggregation.
>>>
>>> There are three options you can choose from:
>>>
>>> 1.Is your version Flink 1.9? 1.9 still use fix memory configuration.
>>> So you need increase hash memory:
>>> - table.exec.resource.hash-agg.memory: 1024 mb
>>>
>>> 2.In 1.10, we use slot manage memory to dynamic config real operator
>>> memory, so operator can use more manage memory, so you don't need 
>>> configure
>>> hash agg memory anymore. You can try 1.10 RC0 [1]
>>>
>>> 3.We can use sort aggregation to avoid OOM too, but there is no
>>> config option now, I created JIRA to track it. [2]
>>>
>>> [1]
>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-Apache-Flink-1-10-0-release-candidate-0-td36770.html
>>> [2] https://issues.apache.org/jira/browse/FLINK-15732
>>>
>>> Best,
>>> Jingsong Lee
>>>
>>> On Thu, Jan 23, 2020 at 7:40 AM Fanbin Bu 
>>> wrote:
>>>

 tried to increase memory:
 flink run  -m yarn-cluster -p 16 -ys 1 -ytm 20 -yjm 8096 

Re: where does flink store the intermediate results of a join and what is the key?

2020-01-22 Thread kant kodali
Is it a common practice to have a custom state backend? If so, what would
be a popular custom backend?

Can I use Elasticsearch as a state backend?

Thanks!

On Wed, Jan 22, 2020 at 1:42 AM Jark Wu  wrote:

> Hi Kant,
>
> 1) A list of rows would also be sufficient in this case. Using a MapState
> makes it faster to retract a row and saves storage.
>
> 2) The State Processor API is usually used to process savepoints. I'm afraid
> the performance is not good enough to use it for querying.
> On the other side, AFAIK, the State Processor API requires the uid of the
> operator. However, uids of operators are not set in the Table API & SQL,
> so I'm not sure whether it works or not.
>
> 3) You can have a custom state backend by implementing the
> org.apache.flink.runtime.state.StateBackend interface and using it
> via `env.setStateBackend(…)`.
>
> Best,
> Jark
>
> On Wed, 22 Jan 2020 at 14:16, kant kodali  wrote:
>
>> Hi Jark,
>>
>> 1) Shouldn't it be col1 to a list of rows? Multiple rows can have the same
>> joining key, right?
>>
>> 2) Can I use state processor API
>> 
>> from an external application to query the intermediate results in near
>> real-time? I thought querying rocksdb state is a widely requested feature.
>> It would be really great to consider this feature for 1.11
>>
>> 3) Is there any interface where I can implement my own state backend?
>>
>> Thanks!
>>
>>
>> On Tue, Jan 21, 2020 at 6:08 PM Jark Wu  wrote:
>>
>>> Hi Kant,
>>>
>>> 1) Yes, it will be stored in the rocksdb state backend.
>>> 2) In the old planner, the left state has the same structure as the right
>>> state: a 2-level map structure where `col1`, the join key, is the
>>> first-level key of the state. The key of the MapState is the input row,
>>> and the `count` is the number of occurrences of this row; the expiredTime
>>> indicates when to clean up this row (to avoid infinite state size). You
>>> can find the source code here [1].
>>> In the blink planner, the state structure is more complex and is
>>> determined by the meta-information of the upstream. You can see the source
>>> code of the blink planner here [2].
>>> 3) Currently, the intermediate state is not exposed to users. Usually,
>>> users should write the query result to an external system (like MySQL) and
>>> query the external system.
>>> Querying the intermediate state is on the roadmap, but I guess it is
>>> not in the 1.11 plan.
>>>
>>> Best,
>>> Jark
>>>
>>> [1]:
>>> http://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/NonWindowJoin.scala#L61
>>> [2]:
>>> https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/stream/state/JoinRecordStateViews.java#L45
>>>
>>>
>>> On Jan 21, 2020, at 18:01, kant kodali  wrote:
>>>
>>> Hi All,
>>>
>>> If I run a query like this
>>>
>>> StreamTableEnvironment.sqlQuery("select * from table1 join table2 on
>>> table1.col1 = table2.col1")
>>>
>>> 1) Where will flink store the intermediate result? Imagine
>>> flink-conf.yaml says state.backend = 'rocksdb'
>>>
>>> 2) If the intermediate results are stored in rocksdb, then what are the key
>>> and value in this case (given the query above)?
>>>
>>> 3) What is the best way to query these intermediate results from an
>>> external application? while the job is running and while the job is not
>>> running?
>>>
>>> Thanks!
>>>
>>>
>>>


Blocking KeyedCoProcessFunction.processElement1

2020-01-22 Thread Alexey Trenikhun

Hello,
If KeyedCoProcessFunction.processElement1 blocks for a significant amount of
time, will it prevent checkpointing?

Thanks,
Alexey


Re: batch job OOM

2020-01-22 Thread Fanbin Bu
Jingsong,

I had set the config value too large. After I changed it to a smaller
number, it works now!
Thank you for the help, really appreciate it!

Fanbin

On Wed, Jan 22, 2020 at 8:50 PM Jingsong Li  wrote:

> Fanbin,
>
> Looks like your config is wrong, can you show your config code?
>
> Best,
> Jingsong Lee
>
> On Thu, Jan 23, 2020 at 12:41 PM Fanbin Bu  wrote:
>
>> Jingsong,
>>
>> Great, now i got a different error:
>>
>> java.lang.NullPointerException: Initial Segment may not be null
>>  at 
>> org.apache.flink.runtime.memory.AbstractPagedOutputView.(AbstractPagedOutputView.java:65)
>>  at 
>> org.apache.flink.runtime.io.disk.SimpleCollectingOutputView.(SimpleCollectingOutputView.java:49)
>>  at 
>> org.apache.flink.table.runtime.operators.aggregate.BytesHashMap$RecordArea.(BytesHashMap.java:522)
>>  at 
>> org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.(BytesHashMap.java:190)
>>  at 
>> org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.(BytesHashMap.java:149)
>>  at LocalHashWinAggWithKeys$292.open(Unknown Source)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:529)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393)
>>  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>  at java.lang.Thread.run(Thread.java:748)
>>
>>
>> is there any other config i should add?
>>
>> thanks,
>>
>> Fanbin
>>
>>
>> On Wed, Jan 22, 2020 at 8:07 PM Fanbin Bu  wrote:
>>
>>> you beat me to it.
>>> let's me try that.
>>>
>>> On Wed, Jan 22, 2020 at 7:57 PM Jingsong Li 
>>> wrote:
>>>
 Fanbin,

 Document is here:
 https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/config.html
 NOTE: you need configure this into TableConfig.

 Best,
 Jingsong Lee

 On Thu, Jan 23, 2020 at 11:54 AM Fanbin Bu 
 wrote:

> Jingsong,
>
> Thank you for the response.
> Since I'm using flink on EMR and the latest version is 1.9 now. the
> second option is ruled out. but will keep that in mind for future upgrade.
>
> I'm going to try the first option. It's probably a good idea to add
> that in the doc for example:
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html
>
> Thanks,
> Fanbin
>
> On Wed, Jan 22, 2020 at 7:08 PM Jingsong Li 
> wrote:
>
>> Hi Fanbin,
>>
>> Thanks for using blink batch mode.
>>
>> The OOM is caused by the manage memory not enough in Hash aggregation.
>>
>> There are three options you can choose from:
>>
>> 1.Is your version Flink 1.9? 1.9 still use fix memory configuration.
>> So you need increase hash memory:
>> - table.exec.resource.hash-agg.memory: 1024 mb
>>
>> 2.In 1.10, we use slot manage memory to dynamic config real operator
>> memory, so operator can use more manage memory, so you don't need 
>> configure
>> hash agg memory anymore. You can try 1.10 RC0 [1]
>>
>> 3.We can use sort aggregation to avoid OOM too, but there is no
>> config option now, I created JIRA to track it. [2]
>>
>> [1]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-Apache-Flink-1-10-0-release-candidate-0-td36770.html
>> [2] https://issues.apache.org/jira/browse/FLINK-15732
>>
>> Best,
>> Jingsong Lee
>>
>> On Thu, Jan 23, 2020 at 7:40 AM Fanbin Bu 
>> wrote:
>>
>>>
>>> tried to increase memory:
>>> flink run  -m yarn-cluster -p 16 -ys 1 -ytm 20 -yjm 8096 myjar
>>>
>>> and still got the same OOM exception.
>>>
>>> my sql is like:
>>>
>>> select id, hop_end(created_at, interval '30' second, interval '1' 
>>> minute), sum(field)... #20 of these sums
>>>
>>> from table group by id, hop(created_at, interval '30' second, interval 
>>> '1' minute)
>>>
>>>
>>>
>>> On Wed, Jan 22, 2020 at 3:19 PM Fanbin Bu 
>>> wrote:
>>>
 Hi,

 I have a batch job using blink planner. and got the following
 error. I was able to successfully run the same job with flink 1.8 on 
 yarn.

 I set conf as:
 taskmanager.heap.size: 5m

 and flink UI gives me
 Last Heartbeat:20-01-22
 14:56:25ID:container_1579720108062_0018_01_20Data Port:41029Free 
 Slots
 / All Slots:1 / 0CPU Cores:16Physical Memory:62.9 GBJVM Heap Size:10.3
 GBFlink Managed Memory:24.9 GB

 any suggestions on how to move forward?
 Thanks,
 Fanbin

 Caused by: org.apache.flink.runtime.client.JobExecutionException:
 Job execution failed.
 at
 

Re: batch job OOM

2020-01-22 Thread Jingsong Li
Fanbin,

Looks like your config is wrong, can you show your config code?

Best,
Jingsong Lee

On Thu, Jan 23, 2020 at 12:41 PM Fanbin Bu  wrote:

> Jingsong,
>
> Great, now i got a different error:
>
> java.lang.NullPointerException: Initial Segment may not be null
>   at 
> org.apache.flink.runtime.memory.AbstractPagedOutputView.(AbstractPagedOutputView.java:65)
>   at 
> org.apache.flink.runtime.io.disk.SimpleCollectingOutputView.(SimpleCollectingOutputView.java:49)
>   at 
> org.apache.flink.table.runtime.operators.aggregate.BytesHashMap$RecordArea.(BytesHashMap.java:522)
>   at 
> org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.(BytesHashMap.java:190)
>   at 
> org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.(BytesHashMap.java:149)
>   at LocalHashWinAggWithKeys$292.open(Unknown Source)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:529)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>   at java.lang.Thread.run(Thread.java:748)
>
>
> is there any other config i should add?
>
> thanks,
>
> Fanbin
>
>
> On Wed, Jan 22, 2020 at 8:07 PM Fanbin Bu  wrote:
>
>> you beat me to it.
>> let's me try that.
>>
>> On Wed, Jan 22, 2020 at 7:57 PM Jingsong Li 
>> wrote:
>>
>>> Fanbin,
>>>
>>> Document is here:
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/config.html
>>> NOTE: you need configure this into TableConfig.
>>>
>>> Best,
>>> Jingsong Lee
>>>
>>> On Thu, Jan 23, 2020 at 11:54 AM Fanbin Bu 
>>> wrote:
>>>
 Jingsong,

 Thank you for the response.
 Since I'm using flink on EMR and the latest version is 1.9 now. the
 second option is ruled out. but will keep that in mind for future upgrade.

 I'm going to try the first option. It's probably a good idea to add
 that in the doc for example:
 https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html

 Thanks,
 Fanbin

 On Wed, Jan 22, 2020 at 7:08 PM Jingsong Li 
 wrote:

> Hi Fanbin,
>
> Thanks for using blink batch mode.
>
> The OOM is caused by the manage memory not enough in Hash aggregation.
>
> There are three options you can choose from:
>
> 1.Is your version Flink 1.9? 1.9 still use fix memory configuration.
> So you need increase hash memory:
> - table.exec.resource.hash-agg.memory: 1024 mb
>
> 2.In 1.10, we use slot manage memory to dynamic config real operator
> memory, so operator can use more manage memory, so you don't need 
> configure
> hash agg memory anymore. You can try 1.10 RC0 [1]
>
> 3.We can use sort aggregation to avoid OOM too, but there is no config
> option now, I created JIRA to track it. [2]
>
> [1]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-Apache-Flink-1-10-0-release-candidate-0-td36770.html
> [2] https://issues.apache.org/jira/browse/FLINK-15732
>
> Best,
> Jingsong Lee
>
> On Thu, Jan 23, 2020 at 7:40 AM Fanbin Bu 
> wrote:
>
>>
>> tried to increase memory:
>> flink run  -m yarn-cluster -p 16 -ys 1 -ytm 20 -yjm 8096 myjar
>>
>> and still got the same OOM exception.
>>
>> my sql is like:
>>
>> select id, hop_end(created_at, interval '30' second, interval '1' 
>> minute), sum(field)... #20 of these sums
>>
>> from table group by id, hop(created_at, interval '30' second, interval 
>> '1' minute)
>>
>>
>>
>> On Wed, Jan 22, 2020 at 3:19 PM Fanbin Bu 
>> wrote:
>>
>>> Hi,
>>>
>>> I have a batch job using blink planner. and got the following error.
>>> I was able to successfully run the same job with flink 1.8 on yarn.
>>>
>>> I set conf as:
>>> taskmanager.heap.size: 5m
>>>
>>> and flink UI gives me
>>> Last Heartbeat:20-01-22
>>> 14:56:25ID:container_1579720108062_0018_01_20Data Port:41029Free 
>>> Slots
>>> / All Slots:1 / 0CPU Cores:16Physical Memory:62.9 GBJVM Heap Size:10.3
>>> GBFlink Managed Memory:24.9 GB
>>>
>>> any suggestions on how to move forward?
>>> Thanks,
>>> Fanbin
>>>
>>> Caused by: org.apache.flink.runtime.client.JobExecutionException:
>>> Job execution failed.
>>> at
>>> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>>> at
>>> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
>>> ... 25 more
>>>
>>> *Caused by: java.io.IOException: Hash window aggregate map OOM.* at
>>> HashWinAggWithKeys$534.processElement(Unknown Source)
>>> at
>>> 

Re: batch job OOM

2020-01-22 Thread Fanbin Bu
Jingsong,

Great, now I got a different error:

java.lang.NullPointerException: Initial Segment may not be null
at 
org.apache.flink.runtime.memory.AbstractPagedOutputView.(AbstractPagedOutputView.java:65)
at 
org.apache.flink.runtime.io.disk.SimpleCollectingOutputView.(SimpleCollectingOutputView.java:49)
at 
org.apache.flink.table.runtime.operators.aggregate.BytesHashMap$RecordArea.(BytesHashMap.java:522)
at 
org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.(BytesHashMap.java:190)
at 
org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.(BytesHashMap.java:149)
at LocalHashWinAggWithKeys$292.open(Unknown Source)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:529)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)


Is there any other config I should add?

thanks,

Fanbin


On Wed, Jan 22, 2020 at 8:07 PM Fanbin Bu  wrote:

> you beat me to it.
> let's me try that.
>
> On Wed, Jan 22, 2020 at 7:57 PM Jingsong Li 
> wrote:
>
>> Fanbin,
>>
>> Document is here:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/config.html
>> NOTE: you need configure this into TableConfig.
>>
>> Best,
>> Jingsong Lee
>>
>> On Thu, Jan 23, 2020 at 11:54 AM Fanbin Bu 
>> wrote:
>>
>>> Jingsong,
>>>
>>> Thank you for the response.
>>> Since I'm using flink on EMR and the latest version is 1.9 now. the
>>> second option is ruled out. but will keep that in mind for future upgrade.
>>>
>>> I'm going to try the first option. It's probably a good idea to add that
>>> in the doc for example:
>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html
>>>
>>> Thanks,
>>> Fanbin
>>>
>>> On Wed, Jan 22, 2020 at 7:08 PM Jingsong Li 
>>> wrote:
>>>
 Hi Fanbin,

 Thanks for using blink batch mode.

 The OOM is caused by the manage memory not enough in Hash aggregation.

 There are three options you can choose from:

 1.Is your version Flink 1.9? 1.9 still use fix memory configuration. So
 you need increase hash memory:
 - table.exec.resource.hash-agg.memory: 1024 mb

 2.In 1.10, we use slot manage memory to dynamic config real operator
 memory, so operator can use more manage memory, so you don't need configure
 hash agg memory anymore. You can try 1.10 RC0 [1]

 3.We can use sort aggregation to avoid OOM too, but there is no config
 option now, I created JIRA to track it. [2]

 [1]
 http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-Apache-Flink-1-10-0-release-candidate-0-td36770.html
 [2] https://issues.apache.org/jira/browse/FLINK-15732

 Best,
 Jingsong Lee

 On Thu, Jan 23, 2020 at 7:40 AM Fanbin Bu 
 wrote:

>
> tried to increase memory:
> flink run  -m yarn-cluster -p 16 -ys 1 -ytm 20 -yjm 8096 myjar
>
> and still got the same OOM exception.
>
> my sql is like:
>
> select id, hop_end(created_at, interval '30' second, interval '1' 
> minute), sum(field)... #20 of these sums
>
> from table group by id, hop(created_at, interval '30' second, interval 
> '1' minute)
>
>
>
> On Wed, Jan 22, 2020 at 3:19 PM Fanbin Bu 
> wrote:
>
>> Hi,
>>
>> I have a batch job using blink planner. and got the following error.
>> I was able to successfully run the same job with flink 1.8 on yarn.
>>
>> I set conf as:
>> taskmanager.heap.size: 5m
>>
>> and flink UI gives me
>> Last Heartbeat:20-01-22
>> 14:56:25ID:container_1579720108062_0018_01_20Data Port:41029Free 
>> Slots
>> / All Slots:1 / 0CPU Cores:16Physical Memory:62.9 GBJVM Heap Size:10.3
>> GBFlink Managed Memory:24.9 GB
>>
>> any suggestions on how to move forward?
>> Thanks,
>> Fanbin
>>
>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
>> execution failed.
>> at
>> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>> at
>> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
>> ... 25 more
>>
>> *Caused by: java.io.IOException: Hash window aggregate map OOM.* at
>> HashWinAggWithKeys$534.processElement(Unknown Source)
>> at
>> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:164)
>> at
>> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
>> at
>> 

Re: batch job OOM

2020-01-22 Thread Fanbin Bu
You beat me to it.
Let me try that.

On Wed, Jan 22, 2020 at 7:57 PM Jingsong Li  wrote:

> Fanbin,
>
> Document is here:
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/config.html
> NOTE: you need configure this into TableConfig.
>
> Best,
> Jingsong Lee
>
> On Thu, Jan 23, 2020 at 11:54 AM Fanbin Bu  wrote:
>
>> Jingsong,
>>
>> Thank you for the response.
>> Since I'm using flink on EMR and the latest version is 1.9 now. the
>> second option is ruled out. but will keep that in mind for future upgrade.
>>
>> I'm going to try the first option. It's probably a good idea to add that
>> in the doc for example:
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html
>>
>> Thanks,
>> Fanbin
>>
>> On Wed, Jan 22, 2020 at 7:08 PM Jingsong Li 
>> wrote:
>>
>>> Hi Fanbin,
>>>
>>> Thanks for using blink batch mode.
>>>
>>> The OOM is caused by the manage memory not enough in Hash aggregation.
>>>
>>> There are three options you can choose from:
>>>
>>> 1.Is your version Flink 1.9? 1.9 still use fix memory configuration. So
>>> you need increase hash memory:
>>> - table.exec.resource.hash-agg.memory: 1024 mb
>>>
>>> 2.In 1.10, we use slot manage memory to dynamic config real operator
>>> memory, so operator can use more manage memory, so you don't need configure
>>> hash agg memory anymore. You can try 1.10 RC0 [1]
>>>
>>> 3.We can use sort aggregation to avoid OOM too, but there is no config
>>> option now, I created JIRA to track it. [2]
>>>
>>> [1]
>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-Apache-Flink-1-10-0-release-candidate-0-td36770.html
>>> [2] https://issues.apache.org/jira/browse/FLINK-15732
>>>
>>> Best,
>>> Jingsong Lee
>>>
>>> On Thu, Jan 23, 2020 at 7:40 AM Fanbin Bu 
>>> wrote:
>>>

 tried to increase memory:
 flink run  -m yarn-cluster -p 16 -ys 1 -ytm 20 -yjm 8096 myjar

 and still got the same OOM exception.

 my sql is like:

 select id, hop_end(created_at, interval '30' second, interval '1' minute), 
 sum(field)... #20 of these sums

 from table group by id, hop(created_at, interval '30' second, interval '1' 
 minute)



 On Wed, Jan 22, 2020 at 3:19 PM Fanbin Bu 
 wrote:

> Hi,
>
> I have a batch job using blink planner. and got the following error. I
> was able to successfully run the same job with flink 1.8 on yarn.
>
> I set conf as:
> taskmanager.heap.size: 5m
>
> and flink UI gives me
> Last Heartbeat:20-01-22
> 14:56:25ID:container_1579720108062_0018_01_20Data Port:41029Free Slots
> / All Slots:1 / 0CPU Cores:16Physical Memory:62.9 GBJVM Heap Size:10.3
> GBFlink Managed Memory:24.9 GB
>
> any suggestions on how to move forward?
> Thanks,
> Fanbin
>
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
> execution failed.
> at
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
> at
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
> ... 25 more
>
> *Caused by: java.io.IOException: Hash window aggregate map OOM.* at
> HashWinAggWithKeys$534.processElement(Unknown Source)
> at
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:164)
> at
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.performDefaultAction(StreamTask.java:276)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> at java.lang.Thread.run(Thread.java:748)
>

>>>
>>> --
>>> Best, Jingsong Lee
>>>
>>
>
> --
> Best, Jingsong Lee
>


Re: batch job OOM

2020-01-22 Thread Fanbin Bu
I saw the doc at
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/config.html.
Do I have to set that in the code, or can I do it through flink-conf.yaml?

On Wed, Jan 22, 2020 at 7:54 PM Fanbin Bu  wrote:

> Jingsong,
>
> Thank you for the response.
> Since I'm using flink on EMR and the latest version is 1.9 now. the second
> option is ruled out. but will keep that in mind for future upgrade.
>
> I'm going to try the first option. It's probably a good idea to add that
> in the doc for example:
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html
>
> Thanks,
> Fanbin
>
> On Wed, Jan 22, 2020 at 7:08 PM Jingsong Li 
> wrote:
>
>> Hi Fanbin,
>>
>> Thanks for using blink batch mode.
>>
>> The OOM is caused by the manage memory not enough in Hash aggregation.
>>
>> There are three options you can choose from:
>>
>> 1.Is your version Flink 1.9? 1.9 still use fix memory configuration. So
>> you need increase hash memory:
>> - table.exec.resource.hash-agg.memory: 1024 mb
>>
>> 2.In 1.10, we use slot manage memory to dynamic config real operator
>> memory, so operator can use more manage memory, so you don't need configure
>> hash agg memory anymore. You can try 1.10 RC0 [1]
>>
>> 3.We can use sort aggregation to avoid OOM too, but there is no config
>> option now, I created JIRA to track it. [2]
>>
>> [1]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-Apache-Flink-1-10-0-release-candidate-0-td36770.html
>> [2] https://issues.apache.org/jira/browse/FLINK-15732
>>
>> Best,
>> Jingsong Lee
>>
>> On Thu, Jan 23, 2020 at 7:40 AM Fanbin Bu  wrote:
>>
>>>
>>> tried to increase memory:
>>> flink run  -m yarn-cluster -p 16 -ys 1 -ytm 20 -yjm 8096 myjar
>>>
>>> and still got the same OOM exception.
>>>
>>> my sql is like:
>>>
>>> select id, hop_end(created_at, interval '30' second, interval '1' minute), 
>>> sum(field)... #20 of these sums
>>>
>>> from table group by id, hop(created_at, interval '30' second, interval '1' 
>>> minute)
>>>
>>>
>>>
>>> On Wed, Jan 22, 2020 at 3:19 PM Fanbin Bu 
>>> wrote:
>>>
 Hi,

 I have a batch job using blink planner. and got the following error. I
 was able to successfully run the same job with flink 1.8 on yarn.

 I set conf as:
 taskmanager.heap.size: 5m

 and flink UI gives me
 Last Heartbeat:20-01-22
 14:56:25ID:container_1579720108062_0018_01_20Data Port:41029Free Slots
 / All Slots:1 / 0CPU Cores:16Physical Memory:62.9 GBJVM Heap Size:10.3
 GBFlink Managed Memory:24.9 GB

 any suggestions on how to move forward?
 Thanks,
 Fanbin

 Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
 execution failed.
 at
 org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
 at
 org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
 ... 25 more

 *Caused by: java.io.IOException: Hash window aggregate map OOM.* at
 HashWinAggWithKeys$534.processElement(Unknown Source)
 at
 org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:164)
 at
 org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
 at
 org.apache.flink.streaming.runtime.tasks.StreamTask.performDefaultAction(StreamTask.java:276)
 at
 org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
 at
 org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
 at java.lang.Thread.run(Thread.java:748)

>>>
>>
>> --
>> Best, Jingsong Lee
>>
>


Re: batch job OOM

2020-01-22 Thread Jingsong Li
Fanbin,

Document is here:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/config.html
NOTE: you need to configure this in the TableConfig.
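
For reference, a minimal sketch of what setting that option through TableConfig can look like on Flink 1.9 with the blink batch planner; the option key and value are the ones discussed in this thread, the rest of the setup is illustrative:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inBatchMode()
        .build();
TableEnvironment tEnv = TableEnvironment.create(settings);

// table.exec.* options are read from the TableConfig, so set them on the
// table environment's configuration rather than in flink-conf.yaml:
tEnv.getConfig().getConfiguration()
        .setString("table.exec.resource.hash-agg.memory", "1024 mb");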

Best,
Jingsong Lee

On Thu, Jan 23, 2020 at 11:54 AM Fanbin Bu  wrote:

> Jingsong,
>
> Thank you for the response.
> Since I'm using flink on EMR and the latest version is 1.9 now. the second
> option is ruled out. but will keep that in mind for future upgrade.
>
> I'm going to try the first option. It's probably a good idea to add that
> in the doc for example:
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html
>
> Thanks,
> Fanbin
>
> On Wed, Jan 22, 2020 at 7:08 PM Jingsong Li 
> wrote:
>
>> Hi Fanbin,
>>
>> Thanks for using blink batch mode.
>>
>> The OOM is caused by the manage memory not enough in Hash aggregation.
>>
>> There are three options you can choose from:
>>
>> 1.Is your version Flink 1.9? 1.9 still use fix memory configuration. So
>> you need increase hash memory:
>> - table.exec.resource.hash-agg.memory: 1024 mb
>>
>> 2.In 1.10, we use slot manage memory to dynamic config real operator
>> memory, so operator can use more manage memory, so you don't need configure
>> hash agg memory anymore. You can try 1.10 RC0 [1]
>>
>> 3.We can use sort aggregation to avoid OOM too, but there is no config
>> option now, I created JIRA to track it. [2]
>>
>> [1]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-Apache-Flink-1-10-0-release-candidate-0-td36770.html
>> [2] https://issues.apache.org/jira/browse/FLINK-15732
>>
>> Best,
>> Jingsong Lee
>>
>> On Thu, Jan 23, 2020 at 7:40 AM Fanbin Bu  wrote:
>>
>>>
>>> tried to increase memory:
>>> flink run  -m yarn-cluster -p 16 -ys 1 -ytm 20 -yjm 8096 myjar
>>>
>>> and still got the same OOM exception.
>>>
>>> my sql is like:
>>>
>>> select id, hop_end(created_at, interval '30' second, interval '1' minute), 
>>> sum(field)... #20 of these sums
>>>
>>> from table group by id, hop(created_at, interval '30' second, interval '1' 
>>> minute)
>>>
>>>
>>>
>>> On Wed, Jan 22, 2020 at 3:19 PM Fanbin Bu 
>>> wrote:
>>>
 Hi,

 I have a batch job using blink planner. and got the following error. I
 was able to successfully run the same job with flink 1.8 on yarn.

 I set conf as:
 taskmanager.heap.size: 5m

 and flink UI gives me
 Last Heartbeat:20-01-22
 14:56:25ID:container_1579720108062_0018_01_20Data Port:41029Free Slots
 / All Slots:1 / 0CPU Cores:16Physical Memory:62.9 GBJVM Heap Size:10.3
 GBFlink Managed Memory:24.9 GB

 any suggestions on how to move forward?
 Thanks,
 Fanbin

 Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
 execution failed.
 at
 org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
 at
 org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
 ... 25 more

 *Caused by: java.io.IOException: Hash window aggregate map OOM.* at
 HashWinAggWithKeys$534.processElement(Unknown Source)
 at
 org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:164)
 at
 org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
 at
 org.apache.flink.streaming.runtime.tasks.StreamTask.performDefaultAction(StreamTask.java:276)
 at
 org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
 at
 org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
 at java.lang.Thread.run(Thread.java:748)

>>>
>>
>> --
>> Best, Jingsong Lee
>>
>

-- 
Best, Jingsong Lee


Re: batch job OOM

2020-01-22 Thread Fanbin Bu
Jingsong,

Thank you for the response.
Since I'm using Flink on EMR and the latest version there is 1.9 right now,
the second option is ruled out, but I will keep that in mind for a future
upgrade.

I'm going to try the first option. It's probably a good idea to add that to
the docs, for example in:
https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html

Thanks,
Fanbin

On Wed, Jan 22, 2020 at 7:08 PM Jingsong Li  wrote:

> Hi Fanbin,
>
> Thanks for using blink batch mode.
>
> The OOM is caused by the manage memory not enough in Hash aggregation.
>
> There are three options you can choose from:
>
> 1.Is your version Flink 1.9? 1.9 still use fix memory configuration. So
> you need increase hash memory:
> - table.exec.resource.hash-agg.memory: 1024 mb
>
> 2.In 1.10, we use slot manage memory to dynamic config real operator
> memory, so operator can use more manage memory, so you don't need configure
> hash agg memory anymore. You can try 1.10 RC0 [1]
>
> 3.We can use sort aggregation to avoid OOM too, but there is no config
> option now, I created JIRA to track it. [2]
>
> [1]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-Apache-Flink-1-10-0-release-candidate-0-td36770.html
> [2] https://issues.apache.org/jira/browse/FLINK-15732
>
> Best,
> Jingsong Lee
>
> On Thu, Jan 23, 2020 at 7:40 AM Fanbin Bu  wrote:
>
>>
>> tried to increase memory:
>> flink run  -m yarn-cluster -p 16 -ys 1 -ytm 20 -yjm 8096 myjar
>>
>> and still got the same OOM exception.
>>
>> my sql is like:
>>
>> select id, hop_end(created_at, interval '30' second, interval '1' minute), 
>> sum(field)... #20 of these sums
>>
>> from table group by id, hop(created_at, interval '30' second, interval '1' 
>> minute)
>>
>>
>>
>> On Wed, Jan 22, 2020 at 3:19 PM Fanbin Bu  wrote:
>>
>>> Hi,
>>>
>>> I have a batch job using blink planner. and got the following error. I
>>> was able to successfully run the same job with flink 1.8 on yarn.
>>>
>>> I set conf as:
>>> taskmanager.heap.size: 5m
>>>
>>> and flink UI gives me
>>> Last Heartbeat:20-01-22
>>> 14:56:25ID:container_1579720108062_0018_01_20Data Port:41029Free Slots
>>> / All Slots:1 / 0CPU Cores:16Physical Memory:62.9 GBJVM Heap Size:10.3
>>> GBFlink Managed Memory:24.9 GB
>>>
>>> any suggestions on how to move forward?
>>> Thanks,
>>> Fanbin
>>>
>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
>>> execution failed.
>>> at
>>> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>>> at
>>> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
>>> ... 25 more
>>>
>>> *Caused by: java.io.IOException: Hash window aggregate map OOM.* at
>>> HashWinAggWithKeys$534.processElement(Unknown Source)
>>> at
>>> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:164)
>>> at
>>> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.performDefaultAction(StreamTask.java:276)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
>>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>> at java.lang.Thread.run(Thread.java:748)
>>>
>>
>
> --
> Best, Jingsong Lee
>


Re: java.lang.StackOverflowError

2020-01-22 Thread 刘建刚
Thanks, I have already found the issue that addresses this: https://issues.apache.org/jira/browse/FLINK-10367


> On Jan 22, 2020, at 4:48 PM, zhisheng  wrote:
>
> 1. Please don't send the same question to three mailing lists at once.
> 2. Look for any more obvious exception logs.
>
> 刘建刚  wrote on Wed, Jan 22, 2020 at 10:25 AM:
> 
>> I am using flink 1.6.2 on yarn. State backend is rocksdb.
>> 
>>> On Jan 22, 2020, at 10:15 AM, 刘建刚  wrote:
>>> 
>>>  I have a Flink job which fails occasionally, and I am eager to avoid
>> this problem. Can anyone help me? The error stacktrace is as follows:
>>> java.io.IOException: java.lang.StackOverflowError
>>>  at org.apache.flink.runtime.io
>> .network.partition.consumer.InputChannel.checkError(InputChannel.java:191)
>>>  at org.apache.flink.runtime.io
>> .network.partition.consumer.RemoteInputChannel.getNextBuffer(RemoteInputChannel.java:194)
>>>  at org.apache.flink.runtime.io
>> .network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:589)
>>>  at org.apache.flink.runtime.io
>> .network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:546)
>>>  at
>> org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:175)
>>>  at org.apache.flink.streaming.runtime.io
>> .StreamInputProcessor.processInput(StreamInputProcessor.java:236)
>>>  at
>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>>>  at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:335)
>>>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:754)
>>>  at java.lang.Thread.run(Thread.java:745)
>>> Caused by: java.lang.StackOverflowError
>>>  at org.apache.flink.runtime.io
>> .network.partition.consumer.SingleInputGate.notifyChannelNonEmpty(SingleInputGate.java:656)
>>>  at org.apache.flink.runtime.io
>> .network.partition.consumer.InputChannel.notifyChannelNonEmpty(InputChannel.java:125)
>>>  at org.apache.flink.runtime.io
>> .network.partition.consumer.InputChannel.setError(InputChannel.java:203)
>>>  at org.apache.flink.runtime.io
>> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:403)
>>>  at org.apache.flink.runtime.io
>> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
>>>  at org.apache.flink.runtime.io
>> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
>>>  at
>> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
>>>  at
>> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
>>>  at org.apache.flink.runtime.io
>> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
>>>  at org.apache.flink.runtime.io
>> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
>>>  at org.apache.flink.runtime.io
>> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
>>>  at org.apache.flink.runtime.io
>> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
>>>  at
>> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
>>>  at
>> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
>>>  at org.apache.flink.runtime.io
>> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
>>>  at org.apache.flink.runtime.io
>> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
>>>  at org.apache.flink.runtime.io
>> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
>>>  at org.apache.flink.runtime.io
>> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
>>>  at
>> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
>>>  at
>> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
>>>  at org.apache.flink.runtime.io
>> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
>>>  at org.apache.flink.runtime.io
>> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
>>>  at org.apache.flink.runtime.io
>> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
>>>  at org.apache.flink.runtime.io
>> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
>>>  at
>> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
>>>  at
>> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
>>>  at org.apache.flink.runtime.io
>> 

Re: batch job OOM

2020-01-22 Thread Jingsong Li
Hi Fanbin,

Thanks for using blink batch mode.

The OOM is caused by insufficient managed memory in the hash aggregation.

There are three options you can choose from:

1. Is your version Flink 1.9? 1.9 still uses a fixed memory configuration, so
you need to increase the hash memory:
- table.exec.resource.hash-agg.memory: 1024 mb

2. In 1.10, we use the slot's managed memory to dynamically configure the real
operator memory, so the operator can use more managed memory and you don't
need to configure the hash-agg memory anymore. You can try 1.10 RC0 [1].

3. We could use sort aggregation to avoid the OOM too, but there is no config
option for it yet; I created a JIRA issue to track it [2].

[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-Apache-Flink-1-10-0-release-candidate-0-td36770.html
[2] https://issues.apache.org/jira/browse/FLINK-15732

Best,
Jingsong Lee

On Thu, Jan 23, 2020 at 7:40 AM Fanbin Bu  wrote:

>
> tried to increase memory:
> flink run  -m yarn-cluster -p 16 -ys 1 -ytm 20 -yjm 8096 myjar
>
> and still got the same OOM exception.
>
> my sql is like:
>
> select id, hop_end(created_at, interval '30' second, interval '1' minute), 
> sum(field)... #20 of these sums
>
> from table group by id, hop(created_at, interval '30' second, interval '1' 
> minute)
>
>
>
> On Wed, Jan 22, 2020 at 3:19 PM Fanbin Bu  wrote:
>
>> Hi,
>>
>> I have a batch job using blink planner. and got the following error. I
>> was able to successfully run the same job with flink 1.8 on yarn.
>>
>> I set conf as:
>> taskmanager.heap.size: 5m
>>
>> and flink UI gives me
>> Last Heartbeat:20-01-22
>> 14:56:25ID:container_1579720108062_0018_01_20Data Port:41029Free Slots
>> / All Slots:1 / 0CPU Cores:16Physical Memory:62.9 GBJVM Heap Size:10.3
>> GBFlink Managed Memory:24.9 GB
>>
>> any suggestions on how to move forward?
>> Thanks,
>> Fanbin
>>
>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
>> execution failed.
>> at
>> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>> at
>> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
>> ... 25 more
>>
>> *Caused by: java.io.IOException: Hash window aggregate map OOM.* at
>> HashWinAggWithKeys$534.processElement(Unknown Source)
>> at
>> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:164)
>> at
>> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.performDefaultAction(StreamTask.java:276)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>> at java.lang.Thread.run(Thread.java:748)
>>
>

-- 
Best, Jingsong Lee


Re: Usage of KafkaDeserializationSchema and KafkaSerializationSchema

2020-01-22 Thread Jason Kania
Thanks for responding.
I am aware of where the topic is used. What I do not see is how to set the
topic within the class that implements the KafkaSerializationSchema.serialize(T
classObject, Long timestamp) method.
The method must create and return a value of type ProducerRecord<byte[], byte[]>,
but all the constructors for ProducerRecord expect "String topic" as the first
argument. This is not passed to the method, so the question is: where is the
implementation of the class supposed to get the topic?
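
A sketch of one commonly used pattern: the topic (and, if needed, the partition) is handed to the schema implementation itself, for example through its constructor, so serialize() has it available as a field. The class and type names below are illustrative, not from this thread:

import java.nio.charset.StandardCharsets;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TopicAwareSerializationSchema implements KafkaSerializationSchema<String> {

    // supplied by the application when the schema is constructed, not by Flink
    private final String topic;

    public TopicAwareSerializationSchema(String topic) {
        this.topic = topic;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(String element, Long timestamp) {
        return new ProducerRecord<>(topic, element.getBytes(StandardCharsets.UTF_8));
    }
}

The producer would then be created with something like new FlinkKafkaProducer<>(topic, new TopicAwareSerializationSchema(topic), properties, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE).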
On Wednesday, January 22, 2020, 08:29:49 p.m. EST, David Magalhães 
 wrote:  
 
 Hi Jason,
The topic is used in FlinkKafkaConsumer, following the 
KafkaDeserializationSchema and then Properties.

https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.html

new FlinkKafkaConsumer(kafkaTopic, new MessageDeserializer, kafkaProperties)
...
class MessageDeserializer extends KafkaDeserializationSchema[GenericRecord] {




On Thu, Jan 23, 2020 at 1:20 AM Jason Kania  wrote:

Hello,
I was looking for documentation in 1.9.1 on how to create implementations of 
the KafkaSerializationSchema and KafkaDeserializationSchema interfaces. I have 
created implementations in the past for the SerializationSchema and 
DeserializationSchema interface. Unfortunately, I can find no examples and the 
code contains no documentation for this purpose but some information appears 
missing.
Can someone please answer the following:
1) When creating a ProducerRecord with the KafkaSerializationSchema.serialize() 
method, how is the topic String supposed to be obtained by the implementing 
class? All of the constructors require that the topic be specified, but the 
topic is not passed in. Is there another interface that should be implemented 
to get the topic or get a callback? Or is expected that the topic has to be 
fixed in the interface's implementation class? Some of the constructors also 
ask for a partition. Again, where is this information expected to come from?
2) The interfaces specify that ConsumerRecord is received and 
ProducerRecord is to be generated. What are the 2 byte arrays 
referencing in the type definitions?
Thanks,
Jason
  

Re: Usage of KafkaDeserializationSchema and KafkaSerializationSchema

2020-01-22 Thread David Magalhães
Hi Jason,

The topic is used in *FlinkKafkaConsumer*, following the
*KafkaDeserializationSchema* and then *Properties*.

https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.html

new FlinkKafkaConsumer(kafkaTopic, new MessageDeserializer, kafkaProperties)
...
class MessageDeserializer extends KafkaDeserializationSchema[GenericRecord]
{


On Thu, Jan 23, 2020 at 1:20 AM Jason Kania  wrote:

> Hello,
>
> I was looking for documentation in 1.9.1 on how to create implementations
> of the KafkaSerializationSchema and KafkaDeserializationSchema
> interfaces. I have created implementations in the past for the
> SerializationSchema and DeserializationSchema interface. Unfortunately, I
> can find no examples and the code contains no documentation for this
> purpose but some information appears missing.
>
> Can someone please answer the following:
>
> 1) When creating a ProducerRecord with the 
> KafkaSerializationSchema.serialize()
> method, how is the topic String supposed to be obtained by the implementing
> class? All of the constructors require that the topic be specified, but the
> topic is not passed in. Is there another interface that should be
> implemented to get the topic or get a callback? Or is expected that the
> topic has to be fixed in the interface's implementation class? Some of the
> constructors also ask for a partition. Again, where is this information
> expected to come from?
>
> 2) The interfaces specify that ConsumerRecord is received
> and ProducerRecord is to be generated. What are the 2
> byte arrays referencing in the type definitions?
>
> Thanks,
>
> Jason
>


Re: Flink ParquetAvroWriters Sink

2020-01-22 Thread aj
 Hi Arvid,

I want to keep generic records only, and I do not want to keep the schema
definition on the consumer side; it should be resolved from the schema
registry only. I am following the post below:

https://stackoverflow.com/questions/58849635/is-it-possible-to-deserialize-avro-messageconsuming-message-from-kafka-without/59865360#59865360

So please help me figure out what is wrong with my code.



On Thu, Jan 23, 2020, 00:38 Arvid Heise  wrote:

> Hi Anuj,
>
> I recommend using the ConfluentRegistryAvroDeserializationSchema [1] with
> a specific record that has been generated with the Avro Maven Plugin [2] or
> Avro Gradle Plugin [3]. That should result into almost no code and maximal
> maintainability.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/kafka.html#the-deserializationschema
> [2] https://avro.apache.org/docs/1.8.2/gettingstartedjava.html
> [3] https://github.com/davidmc24/gradle-avro-plugin
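
A rough sketch of what that recommendation can look like on the consumer side, assuming MyRecord is a class generated by the Avro plugin and that the topic, broker, and registry URL below are placeholders:

import java.util.Properties;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

Properties kafkaProperties = new Properties();
kafkaProperties.setProperty("bootstrap.servers", "broker:9092");
kafkaProperties.setProperty("group.id", "my-group");

// MyRecord is an Avro specific-record class generated from the .avsc schema
FlinkKafkaConsumer010<MyRecord> consumer = new FlinkKafkaConsumer010<>(
        "my-topic",
        ConfluentRegistryAvroDeserializationSchema.forSpecific(MyRecord.class,
                "http://schema-registry:8081"),
        kafkaProperties);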
>
> On Wed, Jan 22, 2020 at 6:43 PM aj  wrote:
>
>> Hi Arvid,
>>
>> I have implemented the code with envelope schema as you suggested but now
>> I am facing issues with the consumer . I have written code like this:
>>
>> FlinkKafkaConsumer010<GenericRecord> kafkaConsumer010 =
>>         new FlinkKafkaConsumer010<>(KAFKA_TOPICS,
>>                 new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>                 properties);
>>
>> And the Deserialization class looks like this :
>>
>> public class KafkaGenericAvroDeserializationSchema implements
>> KeyedDeserializationSchema<GenericRecord> {
>>
>> private final String registryUrl;
>> private transient KafkaAvroDeserializer inner;
>>
>> public KafkaGenericAvroDeserializationSchema(String registryUrl) {
>> this.registryUrl = registryUrl;
>> }
>>
>> @Override
>> public GenericRecord deserialize(byte[] messageKey, byte[] message,
>> String topic, int partition, long offset) {
>> checkInitialized();
>> return (GenericRecord) inner.deserialize(topic, message);
>> }
>>
>> @Override
>> public boolean isEndOfStream(GenericRecord nextElement) {
>> return false;
>> }
>>
>> @Override
>> public TypeInformation<GenericRecord> getProducedType() {
>> return TypeExtractor.getForClass(GenericRecord.class);
>> }
>>
>> private void checkInitialized() {
>> if (inner == null) {
>> Map<String, Object> props = new HashMap<>();
>>
>> props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
>> registryUrl);
>>
>> props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
>> SchemaRegistryClient client =
>> new CachedSchemaRegistryClient(
>> registryUrl,
>> AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
>> inner = new KafkaAvroDeserializer(client, props);
>> }
>> }
>> }
>>
>>
>> It's working locally on my machine but when I deployed it on yarn cluster
>> I am getting below exception:
>>
>>
>> java.lang.Exception: org.apache.flink.streaming.runtime.tasks.
>> ExceptionInChainedOperatorException: Could not forward element to next
>> operator
>> at org.apache.flink.streaming.runtime.tasks.
>> SourceStreamTask$LegacySourceFunctionThread
>> .checkThrowSourceExecutionException(SourceStreamTask.java:212)
>> at org.apache.flink.streaming.runtime.tasks.SourceStreamTask
>> .performDefaultAction(SourceStreamTask.java:132)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask
>> .java:298)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
>> StreamTask.java:403)
>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.flink.streaming.runtime.tasks.
>> ExceptionInChainedOperatorException: Could not forward element to next
>> operator
>> at org.apache.flink.streaming.runtime.tasks.
>> OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654
>> )
>> at org.apache.flink.streaming.runtime.tasks.
>> OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>> at org.apache.flink.streaming.runtime.tasks.
>> OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>> at org.apache.flink.streaming.api.operators.
>> AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator
>> .java:727)
>> at org.apache.flink.streaming.api.operators.
>> AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator
>> .java:705)
>> at org.apache.flink.streaming.api.operators.
>> StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts
>> .java:104)
>> at org.apache.flink.streaming.api.operators.
>> StreamSourceContexts$NonTimestampContext.collectWithTimestamp(
>> StreamSourceContexts.java:111)
>> at org.apache.flink.streaming.connectors.kafka.internals.
>> 

Usage of KafkaDeserializationSchema and KafkaSerializationSchema

2020-01-22 Thread Jason Kania
Hello,
I was looking for documentation in 1.9.1 on how to create implementations of 
the KafkaSerializationSchema and KafkaDeserializationSchema interfaces. I have 
created implementations in the past for the SerializationSchema and 
DeserializationSchema interface. Unfortunately, I can find no examples and the 
code contains no documentation for this purpose but some information appears 
missing.
Can someone please answer the following:
1) When creating a ProducerRecord with the KafkaSerializationSchema.serialize()
method, how is the topic String supposed to be obtained by the implementing
class? All of the constructors require that the topic be specified, but the
topic is not passed in. Is there another interface that should be implemented
to get the topic, or a callback? Or is it expected that the topic has to be
fixed in the interface's implementation class? Some of the constructors also
ask for a partition. Again, where is this information expected to come from?
2) The interfaces specify that a ConsumerRecord<byte[], byte[]> is received and
a ProducerRecord<byte[], byte[]> is to be generated. What are the two byte
arrays referencing in the type definitions?
Thanks,
Jason

Re: Custom label for Prometheus Exporter

2020-01-22 Thread Austin Cawley-Edwards
Following up, we deploy to K8s with one service per job manager and task
manager for metrics, and we add job-identifying labels to those. We also
use the Prometheus Operator which makes it easy to add those labels as
dimensions when scraping.

Best,
Austin

On Wed, Jan 22, 2020 at 7:21 PM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:

> Hey Anaray,
>
> Have you checked out the “scope” configuration?[1]
>
> Best,
> Austin
>
>
> [1]:
>
> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#scope
>
> On Wed, Jan 22, 2020 at 4:09 PM anaray  wrote:
>
>> Hi flink team,
>>
>> Is there a way to add a custom label to flink metrics when using
>> Prometheus
>> Exporter ? I need to add a label= for the JobManager metrics. As
>> of
>> now I see only host label
>>
>> for example
>>
>> *flink_jobmanager_Status_JVM_Memory_Direct_Count{host="localhost",} 18.0*
>>
>> This is not of big help if I deploy the service in k8s or swarm. I would
>> like associate a jobmanager with a jobname atleast in a JOB mode
>> deployment.
>>
>> Please let me know if there is any way to add a custom label by
>> cinfiguration?
>>
>> Thanks,
>> anaray
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>


Re: Custom label for Prometheus Exporter

2020-01-22 Thread Austin Cawley-Edwards
Hey Anaray,

Have you checked out the “scope” configuration?[1]

Best,
Austin


[1]:
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#scope
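
As a rough sketch of where that lives: the reporter and the scope formats both go in flink-conf.yaml. The key names below are from the linked docs; the values shown are the defaults, and which variables (<host>, <job_name>, ...) exist depends on the metric group:

metrics.reporters: prom
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.port: 9249

# scope formats control how a metric's identifier is built from scope variables
metrics.scope.jm: <host>.jobmanager
metrics.scope.jm.job: <host>.jobmanager.<job_name>

The Prometheus reporter generally exposes the scope variables of a metric's group as labels, so job-scoped metrics carry a job_name label; plain jobmanager process metrics like the one quoted below only have <host>.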

On Wed, Jan 22, 2020 at 4:09 PM anaray  wrote:

> Hi flink team,
>
> Is there a way to add a custom label to Flink metrics when using the
> Prometheus Exporter? I need to add a label for the JobManager metrics; as
> of now I see only the host label,
>
> for example
>
> *flink_jobmanager_Status_JVM_Memory_Direct_Count{host="localhost",} 18.0*
>
> This is not of much help if I deploy the service in k8s or swarm. I would
> like to associate a jobmanager with a job name, at least in a JOB mode
> deployment.
>
> Please let me know if there is any way to add a custom label through
> configuration.
>
> Thanks,
> anaray
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


[State Processor API] how to convert savepoint back to broadcast state

2020-01-22 Thread Jin Yi
Hi there,

I would like to read the savepoints (for broadcast state) back into the
broadcast state, how should I do it?

// load the existingSavepoint;
ExistingSavepoint existingSavepoint = Savepoint.load(environment,
"file:///tmp/new_savepoints", new MemoryStateBackend());

// read state from existing savepoint
dataSet = existingSavepoint.readBroadcastState(OPERATOR_UID,
"largeKeySetStateDescription", BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO);

// TODO in BroadcastProcessFunction, how can I put the savepoint
dataset back into BroadcastState?

Thanks!

Eleanore
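
A minimal sketch of one way to do this with the State Processor API (assuming Flink 1.9's
org.apache.flink.state.api classes; the descriptor name, OPERATOR_UID and backend reuse the
example above, while the max parallelism and output path are made-up values): the read
DataSet<Tuple2<String, String>> is written back under the MapStateDescriptor via a
BroadcastStateBootstrapFunction, and the resulting savepoint is then used to start the
streaming job that contains the BroadcastProcessFunction.

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.BroadcastStateBootstrapFunction;

MapStateDescriptor<String, String> descriptor = new MapStateDescriptor<>(
        "largeKeySetStateDescription",
        BasicTypeInfo.STRING_TYPE_INFO,
        BasicTypeInfo.STRING_TYPE_INFO);

// dataSet is the DataSet<Tuple2<String, String>> returned by readBroadcastState(...)
BootstrapTransformation<Tuple2<String, String>> transformation =
        OperatorTransformation
                .bootstrapWith(dataSet)
                .transform(new BroadcastStateBootstrapFunction<Tuple2<String, String>>() {
                    @Override
                    public void processElement(Tuple2<String, String> value, Context ctx) throws Exception {
                        // write each entry back into the broadcast map state
                        ctx.getBroadcastState(descriptor).put(value.f0, value.f1);
                    }
                });

// write a new savepoint containing the bootstrapped broadcast state
Savepoint
        .create(new MemoryStateBackend(), 128)   // 128 = max parallelism (assumed)
        .withOperator(OPERATOR_UID, transformation)
        .write("file:///tmp/bootstrapped_savepoint");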


Re: batch job OOM

2020-01-22 Thread Fanbin Bu
tried to increase memory:
flink run  -m yarn-cluster -p 16 -ys 1 -ytm 20 -yjm 8096 myjar

and still got the same OOM exception.

my sql is like:

select id, hop_end(created_at, interval '30' second, interval '1'
minute), sum(field)... #20 of these sums

from table group by id, hop(created_at, interval '30' second, interval
'1' minute)



On Wed, Jan 22, 2020 at 3:19 PM Fanbin Bu  wrote:

> Hi,
>
> I have a batch job using the blink planner and got the following error. I was
> able to successfully run the same job with flink 1.8 on yarn.
>
> I set conf as:
> taskmanager.heap.size: 5m
>
> and flink UI gives me
> Last Heartbeat:20-01-22
> 14:56:25ID:container_1579720108062_0018_01_20Data Port:41029Free Slots
> / All Slots:1 / 0CPU Cores:16Physical Memory:62.9 GBJVM Heap Size:10.3
> GBFlink Managed Memory:24.9 GB
>
> any suggestions on how to move forward?
> Thanks,
> Fanbin
>
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
> execution failed.
> at
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
> at
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
> ... 25 more
>
> *Caused by: java.io.IOException: Hash window aggregate map OOM.* at
> HashWinAggWithKeys$534.processElement(Unknown Source)
> at
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:164)
> at
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.performDefaultAction(StreamTask.java:276)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> at java.lang.Thread.run(Thread.java:748)
>


batch job OOM

2020-01-22 Thread Fanbin Bu
Hi,

I have a batch job using the blink planner and got the following error. I was
able to successfully run the same job with flink 1.8 on yarn.

I set conf as:
taskmanager.heap.size: 5m

and flink UI gives me
Last Heartbeat:20-01-22
14:56:25ID:container_1579720108062_0018_01_20Data Port:41029Free Slots
/ All Slots:1 / 0CPU Cores:16Physical Memory:62.9 GBJVM Heap Size:10.3
GBFlink Managed Memory:24.9 GB

any suggestions on how to move forward?
Thanks,
Fanbin

Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
execution failed.
at
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
... 25 more

*Caused by: java.io.IOException: Hash window aggregate map OOM.* at
HashWinAggWithKeys$534.processElement(Unknown Source)
at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:164)
at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.performDefaultAction(StreamTask.java:276)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)


TableSource being duplicated

2020-01-22 Thread Benoît Paris
Hello all!

I'm having a problem with a TableSource's DataStream being duplicated when it
is pulled on by 2 sinks.

I understand that sometimes the best plan might just be to duplicate and
read both times a TableSource/SourceFunction; but in my case I can't quite
reproduce the data as say Kafka would. I just need the SourceFunction and
DataStream provided by the TableSource to not be duplicated.

As a workaround to this issue, I introduce some sort of materialization
barrier that makes the planner pull only on one instance of the
TableSource/SourceFunction:
Instead of:

tEnv.registerTableSource("foo_table", new FooTableSource());

I convert it to an Append Stream, and back again to a Table:

tEnv.registerTableSource("foo_table_source", new FooTableSource());
Table sourceTable = tEnv.sqlQuery("SELECT * FROM foo_table_source");
Table appendingSourceTable = tEnv.fromDataStream(
    tEnv.toAppendStream(
        sourceTable,
        Types.ROW(new String[]{"field_1"}, new TypeInformation[]{Types.LONG()})));
tEnv.registerTable("foo_table", appendingSourceTable);

And the conversion to an Append Stream somewhat makes the planner behave
and there is only one DataSource in the execution plan.

But I'm feeling like I'm just missing a simple option (on the
SourceFunction, or on the TableSource?) to invoke and declare the Source as
being non duplicateable.

I have tried a lot of options (uid(), operation chaining restrictions,
twiddling the transformation, forceNonParallel(), etc.), but can't quite find
how to do that! My SourceFunction is a RichSourceFunction.

At this point I'm wondering if this is a bug, or if it is a feature that
would have to be implemented.

Cheers,
Ben


Custom label for Prometheus Exporter

2020-01-22 Thread anaray
Hi flink team,

Is there a way to add a custom label to flink metrics when using Prometheus
Exporter ? I need to add a label= for the JobManager metrics. As of
now I see only host label 

for example

*flink_jobmanager_Status_JVM_Memory_Direct_Count{host="localhost",} 18.0*

This is not of much help if I deploy the service in k8s or swarm. I would
like to associate a jobmanager with a jobname at least in a JOB mode deployment.

Please let me know if there is any way to add a custom label by
configuration?

Thanks,
anaray



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: How to get Task metrics with StatsD metric reporter?

2020-01-22 Thread John Smith
Hi,

1- Yes. I have spaces in the job name and task. How do you configure the
metric scope for a particular job?
2- I opted for the second solution, I forked my own StatsD reporter and
squashed all spaces. Here:
https://github.com/javadevmtl/flink/blob/statsd-spaces/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
3- Maybe filter characters, or add a function that can take in a RegEx config
and remove any special chars matching that pattern?
4- Another idea I also explored but didn't get around to was to have
configurable drop event by RegEx or keep event by RegEx. Not tied to the
above but a good option to have as a feature.




On Wed, 22 Jan 2020 at 03:55, Chesnay Schepler  wrote:

> I presume your job/task names contain a space, which is included in the
> metrics scope?
>
> You can either configure the metric scope such that the job/task ID is
> included instead, or create a modified version of the StatsDReporter that
> filters out additional characters(i.e., override #filterCharacters).
>
> When it comes to automatically filtering characters the StatsDReporter is
> in a bit of a pickle; different backends have different rules for what
> characters are allowed which also differ with StatsD.
> I'm not sure yet what the best solution for this is.
>
> On 21/01/2020 17:18, John Smith wrote:
>
> I think I figured it out. I used netcat to debug. I think the Telegraf
> StatsD server doesn't support spaces in the stats names.
>
> On Mon, 20 Jan 2020 at 12:19, John Smith  wrote:
>
>> Hi, running Flink 1.8
>>
>> I'm declaring my metric as such.
>>
>> invalidList = getRuntimeContext()
>>   .getMetricGroup()
>>   .addGroup("MyMetrics")
>>   .meter("invalidList", new DropwizardMeterWrapper(new 
>> com.codahale.metrics.Meter()));
>>
>> Then in my code I call.
>>
>> invalidList.markEvent();
>>
>>
>> On the task nodes I enabled the Influx Telegraf StatsD server. And I
>> enabled the task node with.
>>
>> metrics.reporter.stsd.class:
>> org.apache.flink.metrics.statsd.StatsDReporter
>> metrics.reporter.stsd.host: localhost
>> metrics.reporter.stsd.port: 8125
>>
>> The metrics are being pushed to Elasticsearch. So far I only see the
>> Status_JVM_* metrics.
>>
>> Do the task specific metrics come from the Job nodes? I have not enabled
>> reporting on the Job nodes yet.
>>
>>
>>
>>
>>
>>
>>
>>
>


Re: Flink ParquetAvroWriters Sink

2020-01-22 Thread Arvid Heise
Hi Anuj,

I recommend using the ConfluentRegistryAvroDeserializationSchema [1] with a
specific record that has been generated with the Avro Maven Plugin [2] or
Avro Gradle Plugin [3]. That should result in almost no code and maximal
maintainability.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/kafka.html#the-deserializationschema
[2] https://avro.apache.org/docs/1.8.2/gettingstartedjava.html
[3] https://github.com/davidmc24/gradle-avro-plugin
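
A minimal sketch of that recommendation (UserEvent is a hypothetical Avro specific record
generated by one of the plugins above; KAFKA_TOPICS, schemaRegistryUrl and properties are
assumed to exist as in the code below):

import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

FlinkKafkaConsumer010<UserEvent> consumer = new FlinkKafkaConsumer010<>(
        KAFKA_TOPICS,
        ConfluentRegistryAvroDeserializationSchema.forSpecific(UserEvent.class, schemaRegistryUrl),
        properties);

This avoids the hand-written KafkaAvroDeserializer wrapper below and should give Flink a proper
Avro type, so records are not pushed through Kryo.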

On Wed, Jan 22, 2020 at 6:43 PM aj  wrote:

> Hi Arvid,
>
> I have implemented the code with envelope schema as you suggested but now
> I am facing issues with the consumer . I have written code like this:
>
> FlinkKafkaConsumer010 kafkaConsumer010 = new
> FlinkKafkaConsumer010(KAFKA_TOPICS,
> new
> KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
> properties);
>
> And the Deserialization class looks like this :
>
> public class KafkaGenericAvroDeserializationSchema implements
> KeyedDeserializationSchema<GenericRecord> {
>
> private final String registryUrl;
> private transient KafkaAvroDeserializer inner;
>
> public KafkaGenericAvroDeserializationSchema(String registryUrl) {
> this.registryUrl = registryUrl;
> }
>
> @Override
> public GenericRecord deserialize(byte[] messageKey, byte[] message,
> String topic, int partition, long offset) {
> checkInitialized();
> return (GenericRecord) inner.deserialize(topic, message);
> }
>
> @Override
> public boolean isEndOfStream(GenericRecord nextElement) {
> return false;
> }
>
> @Override
> public TypeInformation<GenericRecord> getProducedType() {
> return TypeExtractor.getForClass(GenericRecord.class);
> }
>
> private void checkInitialized() {
> if (inner == null) {
> Map props = new HashMap<>();
>
> props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
> registryUrl);
>
> props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
> SchemaRegistryClient client =
> new CachedSchemaRegistryClient(
> registryUrl,
> AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
> inner = new KafkaAvroDeserializer(client, props);
> }
> }
> }
>
>
> It's working locally on my machine but when I deployed it on yarn cluster
> I am getting below exception:
>
>
> java.lang.Exception: org.apache.flink.streaming.runtime.tasks.
> ExceptionInChainedOperatorException: Could not forward element to next
> operator
> at org.apache.flink.streaming.runtime.tasks.
> SourceStreamTask$LegacySourceFunctionThread
> .checkThrowSourceExecutionException(SourceStreamTask.java:212)
> at org.apache.flink.streaming.runtime.tasks.SourceStreamTask
> .performDefaultAction(SourceStreamTask.java:132)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask
> .java:298)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
> StreamTask.java:403)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.streaming.runtime.tasks.
> ExceptionInChainedOperatorException: Could not forward element to next
> operator
> at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
> at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
> at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
> at org.apache.flink.streaming.api.operators.
> AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:
> 727)
> at org.apache.flink.streaming.api.operators.
> AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:
> 705)
> at org.apache.flink.streaming.api.operators.
> StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts
> .java:104)
> at org.apache.flink.streaming.api.operators.
> StreamSourceContexts$NonTimestampContext.collectWithTimestamp(
> StreamSourceContexts.java:111)
> at org.apache.flink.streaming.connectors.kafka.internals.
> AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
> at org.apache.flink.streaming.connectors.kafka.internal.
> Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:91)
> at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher
> .runFetchLoop(Kafka09Fetcher.java:156)
> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase
> .run(FlinkKafkaConsumerBase.java:715)
> at org.apache.flink.streaming.api.operators.StreamSource.run(
> StreamSource.java:100)
> at org.apache.flink.streaming.api.operators.StreamSource.run(
> StreamSource.java:63)
> at 

Re: Flink ParquetAvroWriters Sink

2020-01-22 Thread aj
Hi Arvid,

I have implemented the code with envelope schema as you suggested but now I
am facing issues with the consumer . I have written code like this:

FlinkKafkaConsumer010 kafkaConsumer010 = new
FlinkKafkaConsumer010(KAFKA_TOPICS,
new
KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
properties);

And the Deserialization class looks like this :

public class KafkaGenericAvroDeserializationSchema implements
KeyedDeserializationSchema<GenericRecord> {

private final String registryUrl;
private transient KafkaAvroDeserializer inner;

public KafkaGenericAvroDeserializationSchema(String registryUrl) {
this.registryUrl = registryUrl;
}

@Override
public GenericRecord deserialize(byte[] messageKey, byte[] message,
String topic, int partition, long offset) {
checkInitialized();
return (GenericRecord) inner.deserialize(topic, message);
}

@Override
public boolean isEndOfStream(GenericRecord nextElement) {
return false;
}

@Override
public TypeInformation<GenericRecord> getProducedType() {
return TypeExtractor.getForClass(GenericRecord.class);
}

private void checkInitialized() {
if (inner == null) {
Map props = new HashMap<>();

props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
registryUrl);

props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
SchemaRegistryClient client =
new CachedSchemaRegistryClient(
registryUrl,
AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
inner = new KafkaAvroDeserializer(client, props);
}
}
}


It's working locally on my machine but when I deployed it on yarn cluster I
am getting below exception:


java.lang.Exception: org.apache.flink.streaming.runtime.tasks.
ExceptionInChainedOperatorException: Could not forward element to next
operator
at org.apache.flink.streaming.runtime.tasks.
SourceStreamTask$LegacySourceFunctionThread
.checkThrowSourceExecutionException(SourceStreamTask.java:212)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask
.performDefaultAction(SourceStreamTask.java:132)
at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask
.java:298)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask
.java:403)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.streaming.runtime.tasks.
ExceptionInChainedOperatorException: Could not forward element to next
operator
at org.apache.flink.streaming.runtime.tasks.
OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
at org.apache.flink.streaming.runtime.tasks.
OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
at org.apache.flink.streaming.runtime.tasks.
OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
at org.apache.flink.streaming.api.operators.
AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:
727)
at org.apache.flink.streaming.api.operators.
AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:
705)
at org.apache.flink.streaming.api.operators.
StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:
104)
at org.apache.flink.streaming.api.operators.
StreamSourceContexts$NonTimestampContext.collectWithTimestamp(
StreamSourceContexts.java:111)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher
.emitRecordWithTimestamp(AbstractFetcher.java:398)
at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher
.emitRecord(Kafka010Fetcher.java:91)
at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher
.runFetchLoop(Kafka09Fetcher.java:156)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase
.run(FlinkKafkaConsumerBase.java:715)
at org.apache.flink.streaming.api.operators.StreamSource.run(
StreamSource.java:100)
at org.apache.flink.streaming.api.operators.StreamSource.run(
StreamSource.java:63)
at org.apache.flink.streaming.runtime.tasks.
SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:202)
Caused by: com.esotericsoftware.kryo.KryoException: Error constructing
instance of class: org.apache.avro.Schema$LockableArrayList
Serialization trace:
types (org.apache.avro.Schema$UnionSchema)
schema (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)
at com.twitter.chill.Instantiators$$anon$1.newInstance(KryoBase.scala:
136)
at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1061)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.create(
CollectionSerializer.java:89)
at 

Re: [DISCUSS] decentralized scheduling strategy is needed

2020-01-22 Thread Till Rohrmann
Thanks for reporting the issue HuWeihua. Choosing the right scheduling
strategy when using Yarn with potentially infinite resources can be quite
hard because you don't know over how many TaskExecutors one should
distribute the tasks. It becomes easier if one can configure the minimum
number of TaskExecutors a cluster should always have. This is currently
being discussed and I hope that we can complete this feature for the next
release.

Cheers,
Till

On Wed, Jan 15, 2020 at 11:29 AM HuWeihua  wrote:

> Hi, Andrey
>
> Thanks for your response.
>
> I have checked this Jira ticket and I think it can work in standalone mode
> which TaskManager has been started before scheduling tasks.
> But we are currently running flink on yarn in per-job cluster mode.
>
> I noticed that this issue has already been raised. I will keep watching
> this ticket.
>
> Thanks again.
>
> Best
> Weihua Hu
>
> On Jan 15, 2020, at 17:53, Andrey Zagrebin  wrote:
>
> HI HuWeihua,
>
> I think your issue should resolve with 1.9.2 and 1.10 (not released but in
> progress).
> You can check the related Jira ticket [1].
>
> Best,
> Andrey
>
> [1] https://jira.apache.org/jira/browse/FLINK-12122
>
> On Wed, Jan 15, 2020 at 10:08 AM HuWeihua  wrote:
>
>> Hi, All
>> We encountered some problems during the upgrade from Flink 1.5 to Flink
>> 1.9. Flink's scheduling strategy has changed. Flink 1.9 prefers centralized
>> scheduling, while Flink 1.5 prefers decentralized scheduling. This change
>> has caused resources imbalance and blocked our upgrade plan. We have
>> thousands of jobs that need to be upgraded.
>>
>> For example,
>> There is a job with 10 sources and 100 sinks. Each source need 1 core and
>> each sink need 0.1 core.
>> Try to run this job on Yarn, configure the numberOfTaskSlots is 10,
>> yarn.containers.vcores is 2.
>>
>> When using Flink-1.5:
>> Each TaskManager will run 1 source and 9 sinks, which need 1.9 cores in
>> total. So the job with this configuration works very well. The scheduling
>> results are shown in Figure 1.
>> When using Flink-1.9:
>> The 10 sources will be scheduled to one TaskManager and the 100 sinks
>> will be scheduled to the other 10 TaskManagers. The scheduling results are shown
>> in Figure 2.
>> In this scenario, the TaskManager which run sources need 10 cores, other
>> TaskManagers need 1 cores. But TaskManager must be configured the same, So
>> we need 11 TaskManager with 10 cores.
>> This situation wastes (10-2)*11 = 88 cores more than Flink 1.5.
>>
>> In addition to the waste of resources, we also encountered other problems
>> caused by centralized scheduling strategy.
>>
>>    1. Network bandwidth. Tasks of the same type are scheduled to the one
>>    TaskManager, causing too much network traffic on the machine.
>>
>>    2. Some jobs need to sink to the local agent. After centralized
>>    scheduling, the insufficient processing capacity of the single machine
>>    causes a backlog of consumption.
>>
>>
>> In summary, we think a decentralized scheduling strategy is necessary.
>>
>>
>> Figure 1. Flink 1.5 scheduling results
>> <pasted image 3.tiff>
>>
>> Figure 2. Flink 1.9 scheduling results
>> <pasted image 4.tiff>
>>
>>
>>
>> Best
>> Weihua Hu
>>
>>
>


Re: Flink Metrics - PrometheusReporter

2020-01-22 Thread Sidney Feiner
OK, I configured the PrometheusReporter's port to be a range and now every 
TaskManager has its own port where I can see its metrics. Thank you very much!
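
For reference, that setup can be expressed in flink-conf.yaml roughly like this (a minimal
sketch; the reporter name "prom" and the range 9250-9260 are just example values):

metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.port: 9250-9260

Each JobManager/TaskManager process then binds the first free port in the range, and Prometheus
is configured to scrape every process on its own port.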


Sidney Feiner / Data Platform Developer
M: +972.528197720 / Skype: sidney.feiner.startapp

[emailsignature]



From: Chesnay Schepler 
Sent: Wednesday, January 22, 2020 6:07 PM
To: Sidney Feiner ; flink-u...@apache.org 

Subject: Re: Flink Metrics - PrometheusReporter

Metrics are exposed via reporters by each process separately, whereas the WebUI 
aggregates metrics.

As such you have to configure Prometheus to also scrape the TaskExecutors.

On 22/01/2020 16:58, Sidney Feiner wrote:
Hey,
I've been trying to use the PrometheusReporter and when I used it locally on my 
computer, I would access the port I configured and see all the metrics I've 
created.
In production, we use High Availability mode and when I try to access the 
JobManager's metrics in the port I've configured on the PrometheusReporter, I 
see some very basic metrics - default Flink metrics, but I can't see any of my 
custom metrics.

Weird thing is I can see those metrics through Flink's UI in the Metrics tab:
[screenshot of the Flink UI Metrics tab omitted]

Does anybody have a clue why my custom metrics are configured but not being 
reported in high availability but are reported when I run the job locally 
though IntelliJ?

Thanks 



Sidney Feiner / Data Platform Developer
M: +972.528197720 / Skype: sidney.feiner.startapp

[emailsignature]




Re: Flink configuration on Docker deployment

2020-01-22 Thread Soheil Pourbafrani
Thanks a lot!

On Wed, Jan 22, 2020 at 3:58 AM Yang Wang  wrote:

> Hi Soheil,
>
> Since you are not using any container orchestration framework(e.g.
> docker-compose, Kubernetes,
> mesos), so you need to manually update the flink-conf.yaml in your docker
> images. Usually, it is
> located in the path "/opt/flink/conf".
> Docker volume also could be used to override the flink configuration when
> you start the jobmanager
> and taskmanager containers[1].
>
> Best,
> Yang
>
> [1]. https://docs.docker.com/storage/volumes/
>
> Soheil Pourbafrani  于2020年1月21日周二 下午7:46写道:
>
>> Hi,
>>
>> I need to set up a Flink cluster using the docker(and not using the
>> docker-compose). I successfully could strat the jobmanager and taskmanager
>> but the problem is I have no idea how to change the default configuration
>> for them. For example in the case of giving 8 slots to the taskmanager or
>> change the memory size of both jobmanager and taskmanager.
>> It will be appreciated if somebody tells me how to change the Flink
>> parameters on docker
>>
>> Thanks
>>
>
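
For reference, a minimal sketch of the volume-based override described above (paths and values
are only examples; the official flink image reads /opt/flink/conf/flink-conf.yaml):

# flink-conf.yaml on the host, mounted with: docker run -v /path/on/host/conf:/opt/flink/conf ...
taskmanager.numberOfTaskSlots: 8
taskmanager.heap.size: 4096m
jobmanager.heap.size: 2048m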


Re: Flink Metrics - PrometheusReporter

2020-01-22 Thread Chesnay Schepler
Metrics are exposed via reporters by each process separately, whereas 
the WebUI aggregates metrics.


As such you have to configure Prometheus to also scrape the TaskExecutors.

On 22/01/2020 16:58, Sidney Feiner wrote:

Hey,
I've been trying to use the PrometheusReporter and when I used it 
locally on my computer, I would access the port I configured and see 
all the metrics I've created.
In production, we use High Availability mode and when I try to access 
the JobManager's metrics in the port I've configured on the 
PrometheusReporter, I see some very basic metrics - default Flink 
metrics, but I can't see any of my custom metrics.


Weird thing is I can see those metrics through Flink's UI in the 
Metrics tab:



Does anybody have a clue why my custom metrics are configured but not 
being reported in high availability but are reported when I run the 
job locally though IntelliJ?


Thanks 



*Sidney Feiner**/*Data Platform Developer
M: +972.528197720 */*Skype: sidney.feiner.startapp
emailsignature





Flink Metrics - PrometheusReporter

2020-01-22 Thread Sidney Feiner
Hey,
I've been trying to use the PrometheusReporter and when I used it locally on my 
computer, I would access the port I configured and see all the metrics I've 
created.
In production, we use High Availability mode and when I try to access the 
JobManager's metrics in the port I've configured on the PrometheusReporter, I 
see some very basic metrics - default Flink metrics, but I can't see any of my 
custom metrics.

Weird thing is I can see those metrics through Flink's UI in the Metrics tab:
[screenshot of the Flink UI Metrics tab omitted]

Does anybody have a clue why my custom metrics are configured but not being 
reported in high availability but are reported when I run the job locally 
though IntelliJ?

Thanks 




Sidney Feiner / Data Platform Developer
M: +972.528197720 / Skype: sidney.feiner.startapp

[emailsignature]



Re: Custom Metrics outside RichFunctions

2020-01-22 Thread David Magalhães
Thanks for the feedback. I will use elastalert to generate an alarm from
the logs.

On Wed, Jan 22, 2020, 15:03 Chesnay Schepler  wrote:

> It is not possible to access metrics from within a schema.
>
> I can't think of a non-hacky workaround (the hacky one being to create a
> custom kafka consumer that checks the schema class, casts it to your
> specific class, and then calls a method on your schema that accepts a
> metric group).
>
> On 22/01/2020 14:33, David Magalhães wrote:
>
> Hi Yun, I'm trying to use it inside a custom *DeserializationSchema*. Here
> is the constructor of *FlinkKafkaConsumer*. Inside *DeserializationSchema*
> I can't use *getRuntimeContext()*.
>
> FlinkKafkaConsumer(List<String> topics, DeserializationSchema<T> deserializer, Properties props)
>
> On Wed, Jan 22, 2020 at 3:21 AM Yun Tang  wrote:
>
>> Hi David
>>
>> FlinkKafkaConsumer in itself is RichParallelSourceFunction, and you could
>> call function below to register your metrics group:
>>
>> getRuntimeContext().getMetricGroup().addGroup("MyMetrics").counter("myCounter")
>>
>>
>>
>>
>> Best
>> Yun Tang
>> --
>> *From:* David Magalhães 
>> *Sent:* Tuesday, January 21, 2020 3:45
>> *To:* user 
>> *Subject:* Custom Metrics outside RichFunctions
>>
>> Hi, I want to create a custom metric that shows the number of message
>> that couldn't be deserialized using a custom deserializer inside
>> FlinkKafkaConsumer.
>>
>> Looking into Metrics page (
>> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html
>>  )
>> that doesn't seem to be possible, because it isn't a RichFunction.
>>
>> Anyone know another way to achieve this ?
>>
>> Thanks,
>> David
>>
>
>


Re: Custom Metrics outside RichFunctions

2020-01-22 Thread Chesnay Schepler

It is not possible to access metrics from within a schema.

I can't think of a non-hacky workaround (the hacky one being to create a 
custom kafka consumer that checks the schema class, casts it to your 
specific class, and then calls a method on your schema that accepts a 
metric group).


On 22/01/2020 14:33, David Magalhães wrote:
Hi Yun, I'm trying to use it inside a custom *DeserializationSchema*. 
Here is the constructor of *FlinkKafkaConsumer*. Inside 
*DeserializationSchema* I can't use *getRuntimeContext()*.


FlinkKafkaConsumer(List<String> topics, DeserializationSchema<T> deserializer, Properties props)


On Wed, Jan 22, 2020 at 3:21 AM Yun Tang > wrote:


Hi David

FlinkKafkaConsumer in itself is RichParallelSourceFunction, and
you could call function below to register your metrics group:


getRuntimeContext().getMetricGroup().addGroup("MyMetrics").counter("myCounter")




Best
Yun Tang

*From:* David Magalhães mailto:speeddra...@gmail.com>>
*Sent:* Tuesday, January 21, 2020 3:45
*To:* user mailto:user@flink.apache.org>>
*Subject:* Custom Metrics outside RichFunctions
Hi, I want to create a custom metric that shows the number of
message that couldn't be deserialized using a custom deserializer
inside FlinkKafkaConsumer.

Looking into Metrics page (

https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html )
that doesn't seem to be possible, because it isn't a RichFunction.

Anyone know another way to achieve this ?

Thanks,
David





Re: request for a flink sink

2020-01-22 Thread zhisheng
Hi, Flink doesn't have a Facebook Faiss connector at the moment; you can write a custom sink
(implement SinkFunction).
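
A minimal skeleton of such a custom sink (a sketch only; RichSinkFunction is used so a client
can be created in open(), and the Faiss client itself is hypothetical and not shown):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class FaissSink extends RichSinkFunction<float[]> {

    @Override
    public void open(Configuration parameters) throws Exception {
        // create / connect the (hypothetical) Faiss index client here
    }

    @Override
    public void invoke(float[] vector, Context context) throws Exception {
        // add the incoming vector to the Faiss index here
    }

    @Override
    public void close() throws Exception {
        // flush and release the client here
    }
}

It would then be attached with stream.addSink(new FaissSink()).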

容祖儿  wrote on Wed, Jan 22, 2020 at 7:55 PM:

> Hi members,
>
> Do you know if there is a sink that writes data to facebook faiss[1]?
> I am looking for a sink class like this one [2].
>
> [1] https://github.com/facebookresearch/faiss
> [2]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/zh/dev/connectors/rabbitmq.html
>
> Thank you.
>


Re: Custom Metrics outside RichFunctions

2020-01-22 Thread David Magalhães
Hi Yun, I'm trying to use it inside a custom *DeserializationSchema*. Here is
the constructor of *FlinkKafkaConsumer*. Inside *DeserializationSchema* I
can't use *getRuntimeContext()*.

FlinkKafkaConsumer(List<String> topics, DeserializationSchema<T> deserializer, Properties props)

On Wed, Jan 22, 2020 at 3:21 AM Yun Tang  wrote:

> Hi David
>
> FlinkKafkaConsumer in itself is RichParallelSourceFunction, and you could
> call function below to register your metrics group:
>
> getRuntimeContext().getMetricGroup().addGroup("MyMetrics").counter("myCounter")
>
>
>
>
> Best
> Yun Tang
> --
> *From:* David Magalhães 
> *Sent:* Tuesday, January 21, 2020 3:45
> *To:* user 
> *Subject:* Custom Metrics outside RichFunctions
>
> Hi, I want to create a custom metric that shows the number of message that
> couldn't be deserialized using a custom deserializer inside
> FlinkKafkaConsumer.
>
> Looking into Metrics page (
> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html
>  )
> that doesn't seem to be possible, because it isn't a RichFunction.
>
> Anyone know another way to achieve this ?
>
> Thanks,
> David
>


Re: GC overhead limit exceeded, memory full of DeleteOnExit hooks for S3a files

2020-01-22 Thread Piotr Nowojski
Hi,

This is probably a known issue of Hadoop [1]. Unfortunately it was only fixed 
in 3.3.0.

Piotrek

[1] https://issues.apache.org/jira/browse/HADOOP-15658 


> On 22 Jan 2020, at 13:56, Till Rohrmann  wrote:
> 
> Thanks for reporting this issue Mark. I'm pulling Klou into this conversation 
> who knows more about the StreamingFileSink. @Klou does the StreamingFileSink 
> relies on DeleteOnExitHooks to clean up files?
> 
> Cheers,
> Till
> 
> On Tue, Jan 21, 2020 at 3:38 PM Mark Harris  > wrote:
> Hi,
> 
> We're using flink 1.7.2 on an EMR cluster v emr-5.22.0, which runs hadoop v 
> "Amazon 2.8.5". We've recently noticed that some TaskManagers fail (causing 
> all the jobs running on them to fail) with an "java.lang.OutOfMemoryError: GC 
> overhead limit exceeded”. The taskmanager (and jobs that should be running on 
> it) remain down until manually restarted.
> 
> I managed to take and analyze a memory dump from one of the afflicted 
> taskmanagers. 
> 
> It showed that 85% of the heap was made up of the 
> java.io.DeleteOnExitHook.files hashset. The majority of the strings in that 
> hashset (9041060 out of ~9041100) pointed to files that began 
> /tmp/hadoop-yarn/s3a/s3ablock
> 
> The problem seems to affect jobs that make use of the StreamingFileSink - all 
> of the taskmanager crashes have been on the taskmaster running at least one 
> job using this sink, and a cluster running only a single taskmanager / job 
> that uses the StreamingFileSink crashed with the GC overhead limit exceeded 
> error.
> 
> I've had a look for advice on handling this error more broadly without luck.
> 
> Any suggestions or advice gratefully received.
> 
> Best regards,
> 
> Mark Harris
> 
> 
> 
> The information contained in or attached to this email is intended only for 
> the use of the individual or entity to which it is addressed. If you are not 
> the intended recipient, or a person responsible for delivering it to the 
> intended recipient, you are not authorised to and must not disclose, copy, 
> distribute, or retain this message or any part of it. It may contain 
> information which is confidential and/or covered by legal professional or 
> other privilege under applicable law. 
> 
> The views expressed in this email are not necessarily the views of Centrica 
> plc or its subsidiaries, and the company, its directors, officers or 
> employees make no representation or accept any liability for its accuracy or 
> completeness unless expressly stated to the contrary. 
> 
> Additional regulatory disclosures may be found here: 
> https://www.centrica.com/privacy-cookies-and-legal-disclaimer#email 
>  
> 
> PH Jones is a trading name of British Gas Social Housing Limited. British Gas 
> Social Housing Limited (company no: 01026007), British Gas Trading Limited 
> (company no: 03078711), British Gas Services Limited (company no: 3141243), 
> British Gas Insurance Limited (company no: 06608316), British Gas New Heating 
> Limited (company no: 06723244), British Gas Services (Commercial) Limited 
> (company no: 07385984) and Centrica Energy (Trading) Limited (company no: 
> 02877397) are all wholly owned subsidiaries of Centrica plc (company no: 
> 3033654). Each company is registered in England and Wales with a registered 
> office at Millstream, Maidenhead Road, Windsor, Berkshire SL4 5GD. 
> 
> British Gas Insurance Limited is authorised by the Prudential Regulation 
> Authority and regulated by the Financial Conduct Authority and the Prudential 
> Regulation Authority. British Gas Services Limited and Centrica Energy 
> (Trading) Limited are authorised and regulated by the Financial Conduct 
> Authority. British Gas Trading Limited is an appointed representative of 
> British Gas Services Limited which is authorised and regulated by the 
> Financial Conduct Authority.



Re: GC overhead limit exceeded, memory full of DeleteOnExit hooks for S3a files

2020-01-22 Thread Till Rohrmann
Thanks for reporting this issue Mark. I'm pulling Klou into this
conversation who knows more about the StreamingFileSink. @Klou does the
StreamingFileSink relies on DeleteOnExitHooks to clean up files?

Cheers,
Till

On Tue, Jan 21, 2020 at 3:38 PM Mark Harris 
wrote:

> Hi,
>
> We're using flink 1.7.2 on an EMR cluster v emr-5.22.0, which runs hadoop
> v "Amazon 2.8.5". We've recently noticed that some TaskManagers fail
> (causing all the jobs running on them to fail) with an
> "java.lang.OutOfMemoryError: GC overhead limit exceeded”. The taskmanager
> (and jobs that should be running on it) remain down until manually
> restarted.
>
> I managed to take and analyze a memory dump from one of the afflicted
> taskmanagers.
>
> It showed that 85% of the heap was made up of
> the java.io.DeleteOnExitHook.files hashset. The majority of the strings in
> that hashset (9041060 out of ~9041100) pointed to files that began
> /tmp/hadoop-yarn/s3a/s3ablock
>
> The problem seems to affect jobs that make use of the StreamingFileSink -
> all of the taskmanager crashes have been on the taskmaster running at least
> one job using this sink, and a cluster running only a single taskmanager /
> job that uses the StreamingFileSink crashed with the GC overhead limit
> exceeded error.
>
> I've had a look for advice on handling this error more broadly without
> luck.
>
> Any suggestions or advice gratefully received.
>
> Best regards,
>
> Mark Harris
>
>
>
> The information contained in or attached to this email is intended only
> for the use of the individual or entity to which it is addressed. If you
> are not the intended recipient, or a person responsible for delivering it
> to the intended recipient, you are not authorised to and must not disclose,
> copy, distribute, or retain this message or any part of it. It may contain
> information which is confidential and/or covered by legal professional or
> other privilege under applicable law.
>
> The views expressed in this email are not necessarily the views of
> Centrica plc or its subsidiaries, and the company, its directors, officers
> or employees make no representation or accept any liability for its
> accuracy or completeness unless expressly stated to the contrary.
>
> Additional regulatory disclosures may be found here:
> https://www.centrica.com/privacy-cookies-and-legal-disclaimer#email
>
> PH Jones is a trading name of British Gas Social Housing Limited. British
> Gas Social Housing Limited (company no: 01026007), British Gas Trading
> Limited (company no: 03078711), British Gas Services Limited (company no:
> 3141243), British Gas Insurance Limited (company no: 06608316), British Gas
> New Heating Limited (company no: 06723244), British Gas Services
> (Commercial) Limited (company no: 07385984) and Centrica Energy (Trading)
> Limited (company no: 02877397) are all wholly owned subsidiaries of
> Centrica plc (company no: 3033654). Each company is registered in England
> and Wales with a registered office at Millstream, Maidenhead Road, Windsor,
> Berkshire SL4 5GD.
>
> British Gas Insurance Limited is authorised by the Prudential Regulation
> Authority and regulated by the Financial Conduct Authority and the
> Prudential Regulation Authority. British Gas Services Limited and Centrica
> Energy (Trading) Limited are authorised and regulated by the Financial
> Conduct Authority. British Gas Trading Limited is an appointed
> representative of British Gas Services Limited which is authorised and
> regulated by the Financial Conduct Authority.
>


request for a flink sink

2020-01-22 Thread 容祖儿
Hi members,

Do you know if there is a sink that writes data to facebook faiss[1]?
I am looking for a sink class like this one [2].

[1] https://github.com/facebookresearch/faiss
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/zh/dev/connectors/rabbitmq.html

Thank you.


Re: where does flink store the intermediate results of a join and what is the key?

2020-01-22 Thread Jark Wu
Hi Kant,

1) A list of rows would also be sufficient in this case. A MapState is used in
order to retract a row faster and to save storage.

2) The State Processor API is usually used to process savepoints. I'm afraid the
performance is not good if you use it for querying.
Besides, AFAIK, the State Processor API requires the uid of the
operator. However, uids of operators are not set in Table API & SQL,
so I'm not sure whether it works or not.

3) You can have a custom state backend by implementing the
org.apache.flink.runtime.state.StateBackend interface, and use it
via `env.setStateBackend(…)`.
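
For example (a minimal sketch; the built-in RocksDB backend is shown, but a custom
implementation of the StateBackend interface would be plugged in the same way, and the
checkpoint URI is just an example):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// the String constructor may throw IOException
env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));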

Best,
Jark

On Wed, 22 Jan 2020 at 14:16, kant kodali  wrote:

> Hi Jark,
>
> 1) shouldn't it be a col1 to List of row? multiple rows can have the same
> joining key right?
>
> 2) Can I use state processor API
> 
> from an external application to query the intermediate results in near
> real-time? I thought querying rocksdb state is a widely requested feature.
> It would be really great to consider this feature for 1.11
>
> 3) Is there any interface where I can implement my own state backend?
>
> Thanks!
>
>
> On Tue, Jan 21, 2020 at 6:08 PM Jark Wu  wrote:
>
>> Hi Kant,
>>
>> 1) Yes, it will be stored in rocksdb statebackend.
>> 2) In old planner, the left state is the same with right state which are
>> both `>>`.
>> It is a 2-level map structure, where the `col1` is the join key, it
>> is the first-level key of the state. The key of the MapState is the input
>> row,
>> and the `count` is the number of this row, the expiredTime indicates
>> when to cleanup this row (avoid infinite state size). You can find the
>> source code here[1].
>> In blink planner, the state structure will be more complex which is
>> determined by the meta-information of upstream. You can see the source code
>> of blink planner here [2].
>> 3) Currently, the intermediate state is not exposed to users. Usually,
>> users should write the query result to an external system (like Mysql) and
>> query the external system.
>> Query on the intermediate state is on the roadmap, but I guess it is
>> not in 1.11 plan.
>>
>> Best,
>> Jark
>>
>> [1]:
>> http://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/NonWindowJoin.scala#L61
>> [2]:
>> https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/stream/state/JoinRecordStateViews.java#L45
>>
>>
>> 2020年1月21日 18:01,kant kodali  写道:
>>
>> Hi All,
>>
>> If I run a query like this
>>
>> StreamTableEnvironment.sqlQuery("select * from table1 join table2 on
>> table1.col1 = table2.col1")
>>
>> 1) Where will flink store the intermediate result? Imagine
>> flink-conf.yaml says state.backend = 'rocksdb'
>>
>> 2) If the intermediate results are stored in rockdb then what is the key
>> and value in this case(given the query above)?
>>
>> 3) What is the best way to query these intermediate results from an
>> external application? while the job is running and while the job is not
>> running?
>>
>> Thanks!
>>
>>
>>


Re: Influxdb reporter not honouring the metrics scope

2020-01-22 Thread David Anderson
Gaurav,

I haven't used it for a couple of years, so I don't know if it still works,
but https://github.com/jgrier/flink-stuff/tree/master/flink-influx-reporter is
an influxdb reporter (wrapped around
https://github.com/davidB/metrics-influxdb/tree/master/src/main/java/metrics_influxdb)
that uses scope formats.

Best,
David

On Mon, Jan 20, 2020 at 6:04 AM Gaurav Singhania  wrote:

> Hi,
> We are using influxdb reporter for flink 1.9 to capture our metrics. We
> want to override the scope of task metrics, however even after providing
> the config in yaml file the metrics continues to have the tags we don't
> want.
>
> The metric scope we want to change is :
> *metrics.scope.task *with a default configuration of "
> .taskmanager"
> We tried following configuration and none of them worked
> ".taskmanager..."
> ".taskmanager...constant_value."
>
> None of them worked and task_name continues to be part of the tags of the
> measurement sent by influxdb reporter.
>
> Thanks,
> Gaurav Singhania
>


Re: How to get Task metrics with StatsD metric reporter?

2020-01-22 Thread Chesnay Schepler
I presume your job/task names contain a space, which is included in the 
metrics scope?


You can either configure the metric scope such that the job/task ID is 
included instead, or create a modified version of the StatsDReporter 
that filters out additional characters(i.e., override #filterCharacters).


When it comes to automatically filtering characters the StatsDReporter 
is in a bit of a pickle; different backends have different rules for 
what characters are allowed which also differ with StatsD.

I'm not sure yet what the best solution for this is.
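
A minimal sketch of the second option (assuming the reporter is simply subclassed and only the
character filtering is changed; the replacement rule and class name are just examples):

import org.apache.flink.metrics.statsd.StatsDReporter;

public class SpaceSquashingStatsDReporter extends StatsDReporter {

    @Override
    public String filterCharacters(String input) {
        // apply the default StatsD filtering first, then squash spaces
        return super.filterCharacters(input).replace(' ', '_');
    }
}

metrics.reporter.stsd.class in flink-conf.yaml would then point at this class instead of the
stock StatsDReporter.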

On 21/01/2020 17:18, John Smith wrote:
I think I figured it out. I used netcat to debug. I think the Telegraf 
StatsD server doesn't support spaces in the stats names.


On Mon, 20 Jan 2020 at 12:19, John Smith > wrote:


Hi, running Flink 1.8

I'm declaring my metric as such.

invalidList = getRuntimeContext()
   .getMetricGroup()
   .addGroup("MyMetrics")
   .meter("invalidList", new DropwizardMeterWrapper(new 
com.codahale.metrics.Meter()));

Then in my code I call.

invalidList.markEvent();


On the task nodes I enabled the Influx Telegraf StatsD server. And
I enabled the task node with.

metrics.reporter.stsd.class:
org.apache.flink.metrics.statsd.StatsDReporter
metrics.reporter.stsd.host: localhost
metrics.reporter.stsd.port: 8125

The metrics are being pushed to Elasticsearch. So far I only see
the Status_JVM_* metrics.

Do the task specific metrics come from the Job nodes? I have not
enabled reporting on the Job nodes yet.











Re: java.lang.StackOverflowError

2020-01-22 Thread zhisheng
1. Please don't send the same question to three mailing lists at once.
2. Check whether there are any more telling exception logs.

刘建刚  wrote on Wed, Jan 22, 2020 at 10:25 AM:

> I am using flink 1.6.2 on yarn. State backend is rocksdb.
>
> > On Jan 22, 2020, at 10:15 AM, 刘建刚  wrote:
> >
> >   I have a flink job which fails occasionally. I am eager to avoid
> this problem. Can anyone help me? The error stacktrace is as following:
> > java.io.IOException: java.lang.StackOverflowError
> >   at org.apache.flink.runtime.io
> .network.partition.consumer.InputChannel.checkError(InputChannel.java:191)
> >   at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.getNextBuffer(RemoteInputChannel.java:194)
> >   at org.apache.flink.runtime.io
> .network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:589)
> >   at org.apache.flink.runtime.io
> .network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:546)
> >   at
> org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:175)
> >   at org.apache.flink.streaming.runtime.io
> .StreamInputProcessor.processInput(StreamInputProcessor.java:236)
> >   at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
> >   at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:335)
> >   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:754)
> >   at java.lang.Thread.run(Thread.java:745)
> > Caused by: java.lang.StackOverflowError
> >   at org.apache.flink.runtime.io
> .network.partition.consumer.SingleInputGate.notifyChannelNonEmpty(SingleInputGate.java:656)
> >   at org.apache.flink.runtime.io
> .network.partition.consumer.InputChannel.notifyChannelNonEmpty(InputChannel.java:125)
> >   at org.apache.flink.runtime.io
> .network.partition.consumer.InputChannel.setError(InputChannel.java:203)
> >   at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:403)
> >   at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >   at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >   at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >   at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >   at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >   at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >   at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >   at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >   at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >   at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >   at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >   at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >   at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >   at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >   at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >   at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >   at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >   at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >   at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >   at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >   at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >   at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >   at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >   at org.apache.flink.runtime.io
> 

Re: Re: Taskmanager fails to connect to Jobmanager [Could not find any IPv4 address that is not loopback or link-local. Using localhost address.]

2020-01-22 Thread Kumar Bolar, Harshith
Thank you, Yangze and Yang. 

Turns out the high-availability.cluster-id parameter was different on the TM and JM. 
After updating it, the issue went away.

On 17/01/20, 3:14 PM, "Yangze Guo"  wrote:

Hi, Harshith

As a supplementary note to Yang, the issue seems to be that something
went wrong when trying to connect to the ResourceManager.
There may be two possibilities, the leader of ResourceManager does not
write the znode or the TaskExecutor fails to connect to it. If you
turn on the DEBUG log, it will help a lot. Also, you could watch the
content znode "/leader/resource_manager_lock" of ZooKeeper.

Best,
Yangze Guo

On Fri, Jan 17, 2020 at 5:11 PM Yang Wang  wrote:
>
> Hi Kumar Bolar, Harshith,
>
> Could you please check the jobmanager log to find out what address the 
akka is listening?
> Also the address could be used to connected to the jobmanager on the 
taskmanger machine.
>
> BTW, if you could share the debug level logs of jobmanger and taskmanger. 
It will help a lot to find
> the root cause.
>
>
> Best,
> Yang
>
> Kumar Bolar, Harshith  于2020年1月16日周四 下午7:10写道:
>>
>> Hi all,
>>
>>
>>
>> We were previously using RHEL for our Flink machines. I'm currently 
working on moving them over to Ubuntu. When I start the task manager, it fails 
to connect to the job manager with the following message -
>>
>>
>>
>> 2020-01-16 10:54:42,777 INFO  
org.apache.flink.runtime.util.LeaderRetrievalUtils- Trying to 
select the network interface and address to use by connecting to the leading 
JobManager.
>>
>> 2020-01-16 10:54:42,778 INFO  
org.apache.flink.runtime.util.LeaderRetrievalUtils- TaskManager 
will try to connect for 1 milliseconds before falling back to heuristics
>>
>> 2020-01-16 10:54:52,780 WARN  
org.apache.flink.runtime.net.ConnectionUtils  - Could not find 
any IPv4 address that is not loopback or link-local. Using localhost address.
>>
>>
>>
>> The network interface on the machine looks like this -
>>
>>
>>
>>
>>
>> ens5: flags=4163  mtu 9001
>>
>> inet 10.16.75.30  netmask 255.255.255.128  broadcast 10.16.75.127
>>
>> ether 02:f1:8b:34:75:51  txqueuelen 1000  (Ethernet)
>>
>> RX packets 69370  bytes 80369110 (80.3 MB)
>>
>> RX errors 0  dropped 0  overruns 0  frame 0
>>
>> TX packets 28787  bytes 2898540 (2.8 MB)
>>
>> TX errors 0  dropped 0 overruns 0  carrier 0  collisions 0
>>
>>
>>
>> lo: flags=73  mtu 65536
>>
>> inet 127.0.0.1  netmask 255.0.0.0
>>
>> loop  txqueuelen 1000  (Local Loopback)
>>
>> RX packets 9562  bytes 1596138 (1.5 MB)
>>
>> RX errors 0  dropped 0  overruns 0  frame 0
>>
>> TX packets 9562  bytes 1596138 (1.5 MB)
>>
>> TX errors 0  dropped 0 overruns 0  carrier 0  collisions 0
>>
>>
>>
>>
>>
>> Note: On RHEL, the primary network interface was eth0. Could this be the 
issue?
>>
>>
>>
>> Here's the full task manager log - https://paste.ubuntu.com/p/vgh96FHzRq/
>>
>>
>>
>> Thanks
>>
>> Harshith




Re: Flink on YARN job startup error: The assigned slot container_e10_1579661300080_0005_01_000002_0 was removed.

2020-01-22 Thread zhisheng
It's probably because your job failed earlier.

郑 洁锋  wrote on Wed, Jan 22, 2020 at 11:16 AM:

> Hi everyone,
> When starting a Flink on YARN job, we hit the error: The assigned slot
> container_e10_1579661300080_0005_01_02_0 was removed.
> Environment: flink 1.8.1, cdh 5.14.2, kafka 0.10, jdk 1.8.0_241
>
> The Flink version is 1.8.1. The logs on YARN:
>
> 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:
> 
> 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:  Starting
> YarnJobClusterEntrypoint (Version: , Rev:7297bac, Date:24.06.2019
> @ 23:04:28 CST)
> 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:  OS current user:
> cloudera-scm
> 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:  Current
> Hadoop/Kerberos user: root
> 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:  JVM: Java
> HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 1.8/25.241-b07
> 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:  Maximum heap size:
> 406 MiBytes
> 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:  JAVA_HOME:
> /usr/java/default
> 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:  Hadoop version: 2.6.5
> 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:  JVM Options:
> 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint: -Xms424m
> 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint: -Xmx424m
> 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:  Program Arguments:
> (none)
> 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:  Classpath:
>