Re: RichAsyncFunction for Scala?

2019-05-17 Thread Rong Rong
Hi Shannon,

I think the RichAsyncFunction[1] extends the normal AsyncFunction, so from
an API perspective you should be able to use it.

The problem, I think, is with Scala anonymous functions, which go through a
different code path when wrapped into the Scala RichAsyncFunction [2].
Is your problem specifically with the rich anonymous async function, or do
you also have a problem with a regular function extending RichAsyncFunction?

--
Rong

[1]
https://github.com/apache/flink/blob/release-1.8/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/RichAsyncFunction.scala
[2]
https://github.com/apache/flink/blob/release-1.8/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AsyncDataStream.scala#L289
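
For what it's worth, a minimal sketch of a named (non-anonymous) rich async
function with the 1.8 Scala API might look like the following (the class name,
thread pool size and function body are made up for illustration, not taken
from your code):

import java.util.concurrent.{Executors, TimeUnit}
import scala.concurrent.{ExecutionContext, Future}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.async.{ResultFuture, RichAsyncFunction}

// Hypothetical enrichment function: rich, so open()/getRuntimeContext() are available.
class EnrichFunction extends RichAsyncFunction[String, (String, Int)] {
  @transient private var ec: ExecutionContext = _

  override def open(parameters: Configuration): Unit = {
    ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(4))
  }

  override def asyncInvoke(input: String, resultFuture: ResultFuture[(String, Int)]): Unit = {
    // Stand-in for a real asynchronous lookup.
    Future { resultFuture.complete(Seq((input, input.length))) }(ec)
  }
}

// Wiring it up:
// val enriched = AsyncDataStream.unorderedWait(stream, new EnrichFunction, 5, TimeUnit.SECONDS, 100)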

On Thu, May 16, 2019 at 12:26 AM Fabian Hueske  wrote:

> Hi Shannon,
>
> That's a good observation. To be honest, I don't know why the Scala
> AsyncFunction does not implement RichFunction.
> Maybe this was not intentional and just overlooked when porting the
> functionality to Scala.
>
> Would you mind creating a Jira ticket for this?
>
> Thank you,
> Fabian
>
> On Tue, May 14, 2019 at 23:29, Shannon Carey <
> sca...@expedia.com> wrote:
>
>> I have some awkward code in a few Flink jobs which is converting a Scala
>> stream into a Java stream in order to pass it to
>> AsyncDataStream.unorderedWait(), and using a Java RichAsyncFunction, due to
>> old versions of Flink not having the ability to do async stuff with a Scala
>> stream.
>>
>>
>>
>> In newer versions of Flink, I see that
>> org.apache.flink.streaming.api.scala.AsyncDataStream is available. However,
>> it accepts only org.apache.flink.streaming.api.scala.async.AsyncFunction,
>> and there does not appear to be an AbstractRichFunction subclass of that
>> trait as I expected. Is there a way to use the Scala interfaces but provide
>> a rich AsyncFunction to AsyncDataStream.unorderedWait()? If not, I will
>> leave the old code as-is.
>>
>>
>>
>> Thanks,
>>
>> Shannon
>>
>


Propagating delta from window upon trigger

2019-05-17 Thread Nikhil Goyal
Hi guys,

Is there a way in Flink to only propagate the changes that happened in the
window's state, rather than dumping the contents of the window again and
again upon trigger?

Thanks
Nikhil
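
P.S. One approach I am considering (a rough sketch under the assumption of a
non-purging trigger, so the window keeps its contents across firings; all
names below are made up): remember in per-window state how many elements were
already emitted, and forward only the new ones on each firing.

import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

class EmitOnlyNewElements extends ProcessWindowFunction[Long, Long, String, TimeWindow] {
  // Per-key, per-window counter of elements emitted by earlier firings.
  private val emittedDesc = new ValueStateDescriptor[Integer]("emitted-count", classOf[Integer])

  override def process(key: String, context: Context,
                       elements: Iterable[Long], out: Collector[Long]): Unit = {
    val emitted = context.windowState.getState(emittedDesc)
    val already = Option(emitted.value()).map(_.intValue).getOrElse(0)
    // Assumes the window contents iterate in stable insertion order.
    val fresh = elements.drop(already)
    fresh.foreach(out.collect)
    emitted.update(already + fresh.size)
  }
}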


Re: Received fatal alert: certificate_unknown

2019-05-17 Thread Andrey Zagrebin
Hi Pedro,

thanks for letting know!

Best,
Andrey

On Fri, May 17, 2019 at 4:29 PM PedroMrChaves 
wrote:

> We found the issue.
>
> It was using the DNSName for the certificate validation and we were
> accessing via localhost.
>
>
>
> -
> Best Regards,
> Pedro Chaves
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Flink Forward Europe 2019 - Call for Presentations open until 17th May

2019-05-17 Thread Oytun Tez
Thanks for the update, Robert!

I am planning to prepare use-case content on how we use Flink at
MotaWord, focusing more on Flink as an "application framework" rather than
confining our mindset to Flink as a "stream processor", at non-Uber,
non-Alibaba scales. Hopefully over the weekend I should be ready to submit
to the CFP.




---
Oytun Tez

*M O T A W O R D*
The World's Fastest Human Translation Platform.
oy...@motaword.com — www.motaword.com


On Fri, May 17, 2019 at 11:02 AM Robert Metzger  wrote:

> Hey all,
>
> Short update on the Flink Forward Call For Presentations: We've extended
> the submission deadline till May 31 ... so there's more time to finish the
> talk abstracts.
>
> Also, the organizers are now able to cover travel costs for speakers in
> cases where an employer can not cover them.
>
>
>
> On Fri, May 10, 2019 at 9:38 AM Fabian Hueske  wrote:
>
>> Hi Tim,
>>
>> Thanks for submitting a talk!
>> This sounds like a good and interesting use case to me.
>> Machine learning on streaming data is definitely a relevant and
>> interesting
>> topic for Flink Forward!
>>
>> Best,
>> Fabian
>>
>>
>> On Mon, May 6, 2019 at 19:52, Tim Frey  wrote:
>>
>> > Hi All,
>> >
>> > Sounds interesting. I submitted a talk about using Flink for machine
>> > learning.
>> > However, I would also be happy to get some community feedback on whether
>> > the topic is of interest to the community.
>> >
>> > In short, we use Flink to train machine learning models and then use
>> > the same models for prediction. Our goal was to determine if it is
>> > possible to predict crypto currency exchange rates by utilizing social
>> data
>> > from Twitter.
>> > I would talk about our experiences and describe how we leveraged online
>> > learning in conjunction with social data to determine if we are able to
>> > predict future currency exchange rates. I’ll point out the general
>> > architecture and describe the most interesting findings.
>> >
>> > Best
>> > Tim
>> >
>> > -Original Message-
>> > From: Robert Metzger 
>> > Sent: Monday, May 6, 2019 09:44
>> > To: Fabian Hueske 
>> > Cc: user ; dev ;
>> > commun...@flink.apache.org
>> > Subject: Re: Flink Forward Europe 2019 - Call for Presentations open
>> > until 17th May
>> >
>> > Thanks for announcing the Call for Presentations here!
>> >
>> > Since the deadline is approaching, I wanted to bump up this thread to
>> > remind everybody to submit talks!
>> > Please reach out to me or Fabian directly if you have any questions or
>> if
>> > you need any support!
>> >
>> >
>> >
>> > On Thu, Apr 11, 2019 at 3:47 PM Fabian Hueske 
>> wrote:
>> >
>> > > Hi all,
>> > >
>> > > Flink Forward Europe returns to Berlin on October 7-9th, 2019.
>> > > We are happy to announce that the Call for Presentations is open!
>> > >
>> > > Please submit a proposal if you'd like to present your Apache Flink
>> > > experience, best practices, new features, or use cases in front of an
>> > > international audience of highly skilled and enthusiastic Flink users
>> > > and committers.
>> > >
>> > > Flink Forward will run tracks for the following topics:
>> > > * Use Case
>> > > * Operations
>> > > * Technology Deep Dive
>> > > * Ecosystem
>> > > * Research
>> > >
>> > > For the first time, we'll also have a Community track.
>> > >
>> > > Please find the submission form at
>> > > https://berlin-2019.flink-forward.org/call-for-presentations
>> > >
>> > > The deadline for submissions is May 17th, 11:59pm (CEST).
>> > >
>> > > Best regards,
>> > > Fabian
>> > > (PC Chair for Flink Forward Berlin 2019)
>> > >
>> >
>> >
>>
>


FlinkKinesisConsumer not getting data from Kinesis at a constant speed -lag of about 30-55 secs

2019-05-17 Thread Vijay Balakrishnan
Hi,
In using FlinkKinesisConsumer, I am seeing a lag of about 30-55 secs in
fetching data from Kinesis after it has done 1 or 2 fetches, even though
data is getting put into the Kinesis data stream at a high clip.
I used a ConsumerConfigConstants.SHARD_GETRECORDS_MAX of 1 (tried with
5000, 200 etc.) and a ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS
of 200ms (the default is good here because of the 5-calls-per-second limit
per shard from AWS). I have also tried reducing the interval, but then I run
into read throughput exceptions. How can I reduce this lag to make it pretty
much real-time? I am also using Flink processing time. I have gone from 1 to
3 shards for the Kinesis data stream. Is there some other tuning param I need
to add for FlinkKinesisConsumer, or is it just that it doesn't have any data
to pull from Kinesis?
I do 5 sec tumbling time windows and use the window end timestamp to put
into my InfluxDB timestamp column. I see that there is a constant 35-55 sec
lag in the timestamps, and that corresponds to the time lag I see in the
logs where FlinkKinesisConsumer is waiting to fetch data from Kinesis.
I am seeing the log statements below and am not sure what to make of them to
reduce the time lag of fetching data from Kinesis.
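
For reference, this is roughly how I am applying those settings (a sketch;
the region and stream name are placeholders, and the values are just the
ones I have been experimenting with):

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants

val props = new Properties()
props.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1") // placeholder
// (credential-related properties omitted)
// max records per GetRecords call
props.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "5000")
// poll each shard every 200 ms (AWS allows 5 GetRecords calls/sec per shard)
props.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "200")

val consumer = new FlinkKinesisConsumer[String]("my-stream", new SimpleStringSchema(), props)
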
Logs:

23:23:40,286 [shardConsumers-Source: Custom Source -> (Map -> Sink:
Unnamed, Filter) (2/8)-thread-0] DEBUG
org.apache.flink.kinesis.shaded.com.amazonaws.requestId  [] -
x-amzn-RequestId: f06409aa-d996-fb3f-a53c-5c066d509c9b
23:23:40,335 [Source: Custom Source -> (Map -> Sink: Unnamed, Filter)
(2/8)] DEBUG
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher
[] - Subtask 1 is trying to discover new shards that were created due to
resharding ...


TIA,


Re: Flink Forward Europe 2019 - Call for Presentations open until 17th May

2019-05-17 Thread Robert Metzger
Hey all,

Short update on the Flink Forward Call For Presentations: We've extended
the submission deadline till May 31 ... so there's more time to finish the
talk abstracts.

Also, the organizers are now able to cover travel costs for speakers in
cases where an employer can not cover them.



On Fri, May 10, 2019 at 9:38 AM Fabian Hueske  wrote:

> Hi Tim,
>
> Thanks for submitting a talk!
> This sounds like a good and interesting use case to me.
> Machine learning on streaming data is definitely a relevant and interesting
> topic for Flink Forward!
>
> Best,
> Fabian
>
>
> On Mon, May 6, 2019 at 19:52, Tim Frey  wrote:
>
> > Hi All,
> >
> > Sounds interesting. I submitted a talk about using Flink for machine
> > learning.
> > However, I would also be happy to get some community feedback on whether
> > the topic is of interest to the community.
> >
> > In short, we use Flink to train machine learning models and then use
> > the same models for prediction. Our goal was to determine if it is
> > possible to predict crypto currency exchange rates by utilizing social
> data
> > from Twitter.
> > I would talk about our experiences and describe how we leveraged online
> > learning in conjunction with social data to determine if we are able to
> > predict future currency exchange rates. I’ll point out the general
> > architecture and describe the most interesting findings.
> >
> > Best
> > Tim
> >
> > -Original Message-
> > From: Robert Metzger 
> > Sent: Monday, May 6, 2019 09:44
> > To: Fabian Hueske 
> > Cc: user ; dev ;
> > commun...@flink.apache.org
> > Subject: Re: Flink Forward Europe 2019 - Call for Presentations open
> > until 17th May
> >
> > Thanks for announcing the Call for Presentations here!
> >
> > Since the deadline is approaching, I wanted to bump up this thread to
> > remind everybody to submit talks!
> > Please reach out to me or Fabian directly if you have any questions or if
> > you need any support!
> >
> >
> >
> > On Thu, Apr 11, 2019 at 3:47 PM Fabian Hueske  wrote:
> >
> > > Hi all,
> > >
> > > Flink Forward Europe returns to Berlin on October 7-9th, 2019.
> > > We are happy to announce that the Call for Presentations is open!
> > >
> > > Please submit a proposal if you'd like to present your Apache Flink
> > > experience, best practices, new features, or use cases in front of an
> > > international audience of highly skilled and enthusiastic Flink users
> > > and committers.
> > >
> > > Flink Forward will run tracks for the following topics:
> > > * Use Case
> > > * Operations
> > > * Technology Deep Dive
> > > * Ecosystem
> > > * Research
> > >
> > > For the first time, we'll also have a Community track.
> > >
> > > Please find the submission form at
> > > https://berlin-2019.flink-forward.org/call-for-presentations
> > >
> > > The deadline for submissions is May 17th, 11:59pm (CEST).
> > >
> > > Best regards,
> > > Fabian
> > > (PC Chair for Flink Forward Berlin 2019)
> > >
> >
> >
>


Re: Received fatal alert: certificate_unknown

2019-05-17 Thread PedroMrChaves
We found the issue.

It was using the DNSName for the certificate validation and we were
accessing via localhost.
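
For anyone who hits the same error: one way to avoid it is to issue the
certificate with a SAN that covers every name used to reach the endpoint,
including localhost. A sketch with keytool (alias, keystore and host names
are placeholders):

keytool -genkeypair -alias flink -keyalg RSA -keystore flink.keystore \
  -ext "SAN=dns:flink-host.example.com,dns:localhost,ip:127.0.0.1"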



-
Best Regards,
Pedro Chaves
--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Issue with job crashing due to KinesisProducer

2019-05-17 Thread Steven Nelson
Hello!

We are running a simple job on a Flink 1.7.2 cluster that reads from one
kinesis stream, de-duplicates some values and writes to another stream. We
made some changes to use IngestionTime and added a custom AutoWatermarker
to emit watermarks in case nothing comes in on the stream after a period of
time.

Originally we were using EventTime and getting the event time from the
stream arrival time, but we decided that IngestionTime fit better with the
data we were working with. I'm not sure if that makes a difference.
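
In case it is relevant, the AutoWatermarker is along these lines (a
simplified sketch of the idea, not the exact code; the class name and the
maxDelayMs parameter are illustrative):

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

class AutoWatermarker[T](maxDelayMs: Long) extends AssignerWithPeriodicWatermarks[T] {
  // Stamp each element with its arrival (ingestion-style) time.
  override def extractTimestamp(element: T, previousElementTimestamp: Long): Long =
    System.currentTimeMillis()

  // Derive the watermark from wall-clock time so it keeps advancing
  // even when nothing arrives on the stream for a while.
  override def getCurrentWatermark: Watermark =
    new Watermark(System.currentTimeMillis() - maxDelayMs)
}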

Here is the exception we are getting.

java.lang.RuntimeException: Exception occurred while processing valve output watermark:
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:596)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
    at org.apache.flink.streaming.api.functions.windowing.PassThroughWindowFunction.apply(PassThroughWindowFunction.java:36)
    at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction.process(InternalSingleValueWindowFunction.java:46)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:546)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:454)
    at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:251)
    at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:775)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
    ... 7 more
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:596)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
    ... 20 more
Caused by: java.lang.RuntimeException: Kinesis producer has been closed
    at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer.invoke(FlinkKinesisProducer.java:256)
    at 
    at 
    at org.apache.flink.streaming.api.scala.DataStream$$anon$8.invoke(DataStream.scala:1133)
    at org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
    ... 26 more


Flink 1.8.0: Akka starting actor system with IP address instead of host name

2019-05-17 Thread Kumar Bolar, Harshith
Hi all,

We have recently upgraded from 1.7.2 to 1.8.0. In 1.7.2, when I start a task
manager in standalone mode, the actor system was started with a host name.
But after upgrading to 1.8.0, it is started with the IP address. As a result,
the task managers on the dashboard show IP addresses instead of their
hostnames. Is there some change that needs to be done when starting task
managers after switching from 1.7.2 to 1.8.0?

This is how I’m starting the task manager - ${FLINK_HOME}/bin/taskmanager.sh start
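
One workaround I am considering (an untested sketch) is to pin the announced
address explicitly in flink-conf.yaml instead of relying on auto-detection,
e.g.:

taskmanager.host: flink1-0.high.ue1.pre.aws.cloud.abc.com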

Including logs of both versions –

Flink 1.7.2:
2019-05-17 08:36:59,421 WARN  org.apache.flink.configuration.Configuration - Config uses deprecated configuration key 'jobmanager.rpc.address' instead of proper key 'rest.address'
2019-05-17 08:36:59,425 INFO  org.apache.flink.runtime.util.LeaderRetrievalUtils - Trying to select the network interface and address to use by connecting to the leading JobManager.
2019-05-17 08:36:59,425 INFO  org.apache.flink.runtime.util.LeaderRetrievalUtils - TaskManager will try to connect for 1 milliseconds before falling back to heuristics
2019-05-17 08:36:59,428 INFO  org.apache.flink.runtime.net.ConnectionUtils - Retrieved new target address flink0-0.high.ue1.pre.aws.cloud.abc.com/10.86.94.11:6123.
2019-05-17 08:36:59,430 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner - TaskManager will use hostname/address 'flink1-0.high.ue1.pre.aws.cloud.abc.com' (10.86.94.11) for communication.
2019-05-17 08:36:59,432 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils - Trying to start actor system at flink1-0.high.ue1.pre.aws.cloud.abc.com:0
2019-05-17 08:36:59,802 INFO  akka.event.slf4j.Slf4jLogger - Slf4jLogger started
2019-05-17 08:36:59,856 INFO  akka.remote.Remoting - Starting remoting
2019-05-17 08:36:59,964 INFO  akka.remote.Remoting - Remoting started; listening on addresses :[akka.tcp://fl...@flink1-0.high.ue1.pre.aws.cloud.abc.com:20183]
2019-05-17 08:36:59,966 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils - Actor system started at akka.tcp://fl...@flink1-0.high.ue1.pre.aws.cloud.abc.com:20183
2019-05-17 08:36:59,979 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner - Trying to start actor system at flink1-0.high.ue1.pre.aws.cloud.abc.com:0
2019-05-17 08:37:00,017 INFO  akka.event.slf4j.Slf4jLogger - Slf4jLogger started
2019-05-17 08:37:00,025 INFO  akka.remote.Remoting - Starting remoting

Flink 1.8.0:
2019-05-17 08:48:59,819 INFO  org.apache.flink.configuration.Configuration - Config uses fallback configuration key 'jobmanager.rpc.address' instead of key 'rest.address'
2019-05-17 08:48:59,824 INFO  org.apache.flink.runtime.util.LeaderRetrievalUtils - Trying to select the network interface and address to use by connecting to the leading JobManager.
2019-05-17 08:48:59,824 INFO  org.apache.flink.runtime.util.LeaderRetrievalUtils - TaskManager will try to connect for 1 milliseconds before falling back to heuristics
2019-05-17 08:48:59,827 INFO  org.apache.flink.runtime.net.ConnectionUtils - Retrieved new target address flink0-1.high.ue1.pre.aws.cloud.abc.com/10.16.74.117:6123.
2019-05-17 08:48:59,830 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner - TaskManager will use hostname/address 'flink1-1.high.ue1.pre.aws.cloud.abc.com' (10.86.94.11) for communication.
2019-05-17 08:48:59,832 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils - Trying to start actor system at 10.86.94.11:0
2019-05-17 08:49:00,212 INFO  akka.event.slf4j.Slf4jLogger - Slf4jLogger started
2019-05-17 08:49:00,277 INFO  akka.remote.Remoting - Starting remoting
2019-05-17 08:49:00,422 INFO  akka.remote.Remoting - Remoting started; listening on addresses :[akka.tcp://flink@10.86.94.11:12923]
2019-05-17 08:49:00,429 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils - Actor system started at akka.tcp://flink@10.86.94.11:12923
2019-05-17 08:49:00,438 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner - Trying to start actor system at 10.86.94.11:0
2019-05-17 08:49:00,462 INFO  akka.event.slf4j.Slf4jLogger - Slf4jLogger started
2019-05-17 08:49:00,467 INFO  akka.remote.Remoting - Starting remoting

Re: Re: Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

2019-05-17 Thread zhijiang
I already created the jira [1] for it, so you can monitor its progress there.

In addition, the SpillableSubpartition will be abandoned from Flink 1.9 on, and
Stephan has already implemented a new BoundedBlockingSubpartition to replace it.
Of course, we will still provide fixes for existing bugs in previous Flink
versions.

[1] https://issues.apache.org/jira/browse/FLINK-12544
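
Until that fix lands, the single-slot / bigger-network-buffer workaround
mentioned in this thread maps to flink-conf.yaml settings roughly like the
following (values are illustrative; the network-memory keys are the ones from
the Flink 1.6 line):

taskmanager.numberOfTaskSlots: 1
# or, alternatively, enlarge the network buffer pool:
taskmanager.network.memory.fraction: 0.2
taskmanager.network.memory.max: 4gb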

Best,
Zhijiang


--
From:Narayanaswamy, Krishna 
Send Time: Friday, May 17, 2019 19:00
To:zhijiang ; Aljoscha Krettek 
; Piotr Nowojski 
Cc:Nico Kruber ; user@flink.apache.org 
; "Chan, Regina" ; "Erai, Rahul" 

Subject: RE: Re: Memory Allocate/Deallocate related Thread Deadlock encountered
when running a large job > 10k tasks


Thanks Zhijiang. 

We will try these deadlock use cases with a single-slot approach to see how
they go. We will await the fix before starting to use more slots on the single TM.

Thanks,
Krishna.

From: zhijiang  
Sent: Friday, May 17, 2019 4:05 PM
To: Aljoscha Krettek ; Piotr Nowojski 
; Narayanaswamy, Krishna [Tech] 

Cc: Nico Kruber ; user@flink.apache.org; Chan, Regina 
[Tech] ; Erai, Rahul [Tech] 

Subject: Re: Re: Memory Allocate/Deallocate related Thread Deadlock encountered
when running a large job > 10k tasks
I have already analyzed this deadlock case based on the code. FLINK-10491
already solved one place that could cause a deadlock in SpillableSubpartition,
but this issue is caused in a different place.
When the source task is trying to release subpartition memory, and meanwhile
another CoGroup task is submitted and triggers the source task to release its
memory, it might cause a deadlock.
I will create a jira ticket for this issue and think about how to solve it
soon. Currently, if you still want to use the blocking type, the simple way to
avoid this is to configure only one slot per TM; then one task can never
trigger another task to release memory in the same TM. Or you could increase
the network buffer setting to work around it, but I am not sure this would
work for your case, because it depends on the total data size the source
produces.
Best,
Zhijiang
--
From:Narayanaswamy, Krishna 
Send Time: Friday, May 17, 2019 17:37
To:Zhijiang(wangzhijiang999) ; Aljoscha Krettek 
; Piotr Nowojski 
Cc:Nico Kruber ; user@flink.apache.org 
; "Chan, Regina" ; "Erai, Rahul" 

Subject: RE: Re: Memory Allocate/Deallocate related Thread Deadlock encountered
when running a large job > 10k tasks
We see this JIRA issue (FLINK-10491) as fixed, and the fix is present in Flink
v1.6.4, which we are using now, but the problem now seems to come up for
relatively simple scenarios as well. Deadlock dump below -

Java stack information for the threads listed above:
===
"CoGroup (2/2)":
    at org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:213)
    - waiting to lock <0x00062bf859b8> (a java.lang.Object)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:614)
    at java.lang.Thread.run(Thread.java:745)
"CoGroup (1/2)":
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:277)
    - waiting to lock <0x00063fdf4888> (a java.util.ArrayDeque)
    at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
    at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
    at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
    at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
    at org.apache.flink.runtime.io.network.buffer.BufferConsumer.close(BufferConsumer.java:121)
    at org.apache.flink.runtime.io.network.partition.SpillableSubpartition.spillFinishedBufferConsumers(SpillableSubpartition.java:274)
    at org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:239)
    - locked <0x00063fdf4ac8> (a java.util.ArrayDeque)
    at org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:371)
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.setNumBuffers(LocalBufferPool.java:375)
    at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.redistributeBuffers(NetworkBufferPool.java:408)
    at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:297)
    - locked <0x00063c785350> (a java.lang.Object)
    at org.apache.flink.runtime.io.

RE: Re: Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

2019-05-17 Thread Narayanaswamy, Krishna
Thanks Zhijiang.

We will try these deadlock use cases with a single-slot approach to see how
they go. We will await the fix before starting to use more slots on the single TM.

Thanks,
Krishna.

From: zhijiang 
Sent: Friday, May 17, 2019 4:05 PM
To: Aljoscha Krettek ; Piotr Nowojski 
; Narayanaswamy, Krishna [Tech] 

Cc: Nico Kruber ; user@flink.apache.org; Chan, Regina 
[Tech] ; Erai, Rahul [Tech] 

Subject: Re: Re: Memory Allocate/Deallocate related Thread Deadlock encountered
when running a large job > 10k tasks

I have already analyzed this deadlock case based on the code. FLINK-10491
already solved one place that could cause a deadlock in SpillableSubpartition,
but this issue is caused in a different place.

When the source task is trying to release subpartition memory, and meanwhile
another CoGroup task is submitted and triggers the source task to release its
memory, it might cause a deadlock.

I will create a jira ticket for this issue and think about how to solve it
soon. Currently, if you still want to use the blocking type, the simple way to
avoid this is to configure only one slot per TM; then one task can never
trigger another task to release memory in the same TM. Or you could increase
the network buffer setting to work around it, but I am not sure this would
work for your case, because it depends on the total data size the source
produces.

Best,
Zhijiang
--
From: Narayanaswamy, Krishna <krishna.narayanasw...@gs.com>
Send Time: Friday, May 17, 2019 17:37
To: Zhijiang(wangzhijiang999) <wangzhijiang...@aliyun.com>; Aljoscha Krettek <aljos...@apache.org>; Piotr Nowojski <pi...@data-artisans.com>
Cc: Nico Kruber <n...@data-artisans.com>; user@flink.apache.org; "Chan, Regina" <regina.c...@gs.com>; "Erai, Rahul" <rahul.e...@gs.com>
Subject: RE: Re: Memory Allocate/Deallocate related Thread Deadlock encountered
when running a large job > 10k tasks

We see this JIRA issue (FLINK-10491) as fixed, and the fix is present in Flink
v1.6.4, which we are using now, but the problem now seems to come up for
relatively simple scenarios as well. Deadlock dump below -

Java stack information for the threads listed above:
===
"CoGroup (2/2)":
    at org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:213)
    - waiting to lock <0x00062bf859b8> (a java.lang.Object)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:614)
    at java.lang.Thread.run(Thread.java:745)
"CoGroup (1/2)":
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:277)
    - waiting to lock <0x00063fdf4888> (a java.util.ArrayDeque)
    at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
    at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
    at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
    at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
    at org.apache.flink.runtime.io.network.buffer.BufferConsumer.close(BufferConsumer.java:121)
    at org.apache.flink.runtime.io.network.partition.SpillableSubpartition.spillFinishedBufferConsumers(SpillableSubpartition.java:274)
    at org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:239)
    - locked <0x00063fdf4ac8> (a java.util.ArrayDeque)
    at org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:371)
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.setNumBuffers(LocalBufferPool.java:375)
    at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.redistributeBuffers(NetworkBufferPool.java:408)
    at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:297)
    - locked <0x00063c785350> (a java.lang.Object)
    at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:259)
    at org.apache.flink.runtime.io.network.NetworkEnvironment.setupInputGate(NetworkEnvironment.java:272)
    at org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:224)
    - locked <0x00062bf859b8> (a java.lang.Object)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:614)
    at java.lang.T

Re: Re: Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

2019-05-17 Thread zhijiang
I have already analyzed this deadlock case based on the code. FLINK-10491
already solved one place that could cause a deadlock in SpillableSubpartition,
but this issue is caused in a different place.

When the source task is trying to release subpartition memory, and meanwhile
another CoGroup task is submitted and triggers the source task to release its
memory, it might cause a deadlock.

I will create a jira ticket for this issue and think about how to solve it
soon. Currently, if you still want to use the blocking type, the simple way to
avoid this is to configure only one slot per TM; then one task can never
trigger another task to release memory in the same TM. Or you could increase
the network buffer setting to work around it, but I am not sure this would
work for your case, because it depends on the total data size the source
produces.

Best,
Zhijiang
--
From:Narayanaswamy, Krishna 
Send Time: Friday, May 17, 2019 17:37
To:Zhijiang(wangzhijiang999) ; Aljoscha Krettek 
; Piotr Nowojski 
Cc:Nico Kruber ; user@flink.apache.org 
; "Chan, Regina" ; "Erai, Rahul" 

Subject: RE: Re: Memory Allocate/Deallocate related Thread Deadlock encountered
when running a large job > 10k tasks


We see this JIRA issue (FLINK-10491) as fixed, and the fix is present in Flink
v1.6.4, which we are using now, but the problem now seems to come up for
relatively simple scenarios as well. Deadlock dump below -

Java stack information for the threads listed above:
===
"CoGroup (2/2)":
    at org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:213)
    - waiting to lock <0x00062bf859b8> (a java.lang.Object)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:614)
    at java.lang.Thread.run(Thread.java:745)
"CoGroup (1/2)":
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:277)
    - waiting to lock <0x00063fdf4888> (a java.util.ArrayDeque)
    at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
    at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
    at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
    at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
    at org.apache.flink.runtime.io.network.buffer.BufferConsumer.close(BufferConsumer.java:121)
    at org.apache.flink.runtime.io.network.partition.SpillableSubpartition.spillFinishedBufferConsumers(SpillableSubpartition.java:274)
    at org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:239)
    - locked <0x00063fdf4ac8> (a java.util.ArrayDeque)
    at org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:371)
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.setNumBuffers(LocalBufferPool.java:375)
    at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.redistributeBuffers(NetworkBufferPool.java:408)
    at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:297)
    - locked <0x00063c785350> (a java.lang.Object)
    at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:259)
    at org.apache.flink.runtime.io.network.NetworkEnvironment.setupInputGate(NetworkEnvironment.java:272)
    at org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:224)
    - locked <0x00062bf859b8> (a java.lang.Object)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:614)
    at java.lang.Thread.run(Thread.java:745)
"DataSource  (1/1)":
    at org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:227)
    - waiting to lock <0x00063fdf4ac8> (a java.util.ArrayDeque)
    at org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:371)
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:257)
    - locked <0x00063fdf4888> (a java.util.ArrayDeque)
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:218)
    at org.apache.flink.runtime.io.network.api.writer.RecordWrit

RE: Re: Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

2019-05-17 Thread Narayanaswamy, Krishna
We see this JIRA issue (FLINK-10491) as fixed, and the fix is present in Flink
v1.6.4, which we are using now, but the problem now seems to come up for
relatively simple scenarios as well. Deadlock dump below -

Java stack information for the threads listed above:
===
"CoGroup (2/2)":
    at org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:213)
    - waiting to lock <0x00062bf859b8> (a java.lang.Object)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:614)
    at java.lang.Thread.run(Thread.java:745)
"CoGroup (1/2)":
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:277)
    - waiting to lock <0x00063fdf4888> (a java.util.ArrayDeque)
    at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
    at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
    at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
    at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
    at org.apache.flink.runtime.io.network.buffer.BufferConsumer.close(BufferConsumer.java:121)
    at org.apache.flink.runtime.io.network.partition.SpillableSubpartition.spillFinishedBufferConsumers(SpillableSubpartition.java:274)
    at org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:239)
    - locked <0x00063fdf4ac8> (a java.util.ArrayDeque)
    at org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:371)
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.setNumBuffers(LocalBufferPool.java:375)
    at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.redistributeBuffers(NetworkBufferPool.java:408)
    at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:297)
    - locked <0x00063c785350> (a java.lang.Object)
    at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:259)
    at org.apache.flink.runtime.io.network.NetworkEnvironment.setupInputGate(NetworkEnvironment.java:272)
    at org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:224)
    - locked <0x00062bf859b8> (a java.lang.Object)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:614)
    at java.lang.Thread.run(Thread.java:745)
"DataSource  (1/1)":
    at org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:227)
    - waiting to lock <0x00063fdf4ac8> (a java.util.ArrayDeque)
    at org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:371)
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:257)
    - locked <0x00063fdf4888> (a java.util.ArrayDeque)
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:218)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:213)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:144)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107)
    at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
    at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:193)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:745)

Found 1 deadlock.

We are not setting any slot sharing parameters, since this is batch-based
processing, so it uses the default (and there don't seem to be any options
available to manipulate slot sharing for non-streaming).
If we disable slot sharing (assuming it would be through some config across
the job), wouldn't the job become relatively slower?

Thanks,
Krishna.

From: Zhijiang(wangzhijiang999) 
Sent: Monday, October 08, 2018 1:39 PM
To: Aljoscha Krettek ; Piotr Nowojski 

Cc: Naraya