Re: Serious stability issues when running on YARN (Flink 1.7.0)

2018-12-20 Thread Gyula Fóra
In my case the problem seems to happen when a streaming job is recovering
with large state. I don't really understand how it could be caused by what
you described, as that seems to affect mostly batch jobs.

But I can easily be wrong; maybe there are other implications of the above
issue.

Gyula

qi luo wrote (on Fri, Dec 21, 2018, 3:35):

> Hi Gyula,
>
> Your issue is possibly related to [1], where slots are prematurely released.
> I’ve raised a PR which is still pending review.
>
> [1] https://issues.apache.org/jira/browse/FLINK-10941


Getting "ProducerFenced" exception while using flink kafka producer

2018-12-20 Thread Gaurav Luthra
Hi,

I have two Flink jobs; both use the Flink Kafka Producer and Flink Kafka
Consumer running in exactly-once mode.
The parallelism of both jobs is one.
Both jobs have the same number and types of operators.

When we start one job, it runs fine. But as soon as we start the second
job, both jobs start failing at runtime with a "ProducerFenced" exception.

Our understanding is that both jobs get the same transactional ID
(assigned by Flink), which is required to run a job in exactly-once mode.

I think Flink derives the transactional ID by concatenating the operator
names and the subtask ID. In our case both jobs have the same operators,
and both jobs run with parallelism of one, so both jobs end up with the
same transactional ID.

Question:
Kindly provide a solution for this exception, shed some light on whether
our understanding of its cause is correct, and tell us how we can get
different transactional IDs for the two jobs.
Thanks & Regards
Gaurav Luthra
Mob:- +91-9901945206
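
If the transactional IDs are indeed derived from operator names and subtask
IDs, one commonly suggested workaround is to give the Kafka sink a distinct
operator name/uid in each job so that the derived IDs no longer collide.
A minimal sketch, assuming Flink 1.7's FlinkKafkaProducer011 and a
hypothetical topic name:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

public class DistinctSinkNames {

    // Attach the exactly-once producer with a per-job operator name/uid,
    // e.g. jobTag = "jobA" in one job and "jobB" in the other.
    public static void attachSink(DataStream<String> stream,
                                  Properties kafkaProps,
                                  String jobTag) {
        FlinkKafkaProducer011<String> producer = new FlinkKafkaProducer011<>(
                "output-topic",                       // hypothetical topic
                new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
                kafkaProps,
                FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);

        stream.addSink(producer)
              .name("kafka-sink-" + jobTag)
              .uid("kafka-sink-" + jobTag);
    }
}

Whether this changes the derived transactional IDs on your exact Flink
version is worth verifying against the FlinkKafkaProducer011 source.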


buffer pool is destroyed

2018-12-20 Thread Chan, Shuang
Hi Flink community,

I have a custom source that emits a user-defined data type, BaseEvent. The
following code works fine when BaseEvent is not a POJO.
But when I changed it to a POJO by adding a default constructor, I get a
"Buffer pool is destroyed" runtime exception on the collect() call.

DataStream<BaseEvent> eventStream = see.addSource(new
        AgoraSource(configFile, instance));

// the generic type parameters below were stripped in the archive;
// reconstructed assuming SecurityID is a String
DataStream<Tuple4<String, Long, Double, String>> result_order =
        eventStream
                .filter(e -> e instanceof OrderEvent)
                .map(e -> (OrderEvent) e)
                .map(e -> new Tuple3<>(e.SecurityID, Long.valueOf(1),
                        Double.valueOf(e.OriginalQuantity))).returns(info_tuple3)
                .keyBy(e -> e.f0)
                .timeWindow(Time.seconds(5))
                .reduce((a, b) -> new Tuple3<>(a.f0, a.f1 + b.f1, a.f2 + b.f2))
                .map(e -> new Tuple4<>(e.f0, e.f1, e.f2, "Order")).returns(info_tuple4);

Any idea?

Shuang



Re: Serious stability issues when running on YARN (Flink 1.7.0)

2018-12-20 Thread qi luo
Hi Gyula,

Your issue is possibly related to [1], where slots are prematurely released.
I’ve raised a PR which is still pending review.

[1] https://issues.apache.org/jira/browse/FLINK-10941




RE: EXT :Re: Custom S3 endpoint

2018-12-20 Thread Martin, Nick
Yeah, I figured that part out. I’ve tried to make it work with 2.7 and 2.8, and 
it looks like the prebuilt jars have actually moved to Hadoop 3.

From: Paul Lam [mailto:paullin3...@gmail.com]
Sent: Tuesday, December 18, 2018 7:08 PM
To: Martin, Nick 
Cc: user@flink.apache.org
Subject: EXT :Re: Custom S3 endpoint

Hi Nick,

What version of Hadoop are you using? AFAIK, you must use Hadoop 2.7+ to 
support a custom S3 endpoint, or the `fs.s3a.endpoint` property in core-site.xml 
will be ignored.
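
For reference, a minimal core-site.xml sketch of that property (the endpoint
value is a placeholder, and path-style access is often also required by
non-AWS S3 implementations):

<configuration>
  <property>
    <name>fs.s3a.endpoint</name>
    <value>https://storage.example.com</value>
  </property>
  <property>
    <name>fs.s3a.path.style.access</name>
    <value>true</value>
  </property>
</configuration>

Note that Flink's self-contained (shaded) S3 filesystems do not read
core-site.xml; if I remember the 1.7 docs correctly, they instead mirror
flink-conf.yaml keys with the s3. prefix (e.g. s3.endpoint) into the
corresponding fs.s3a.* options.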

Best,
Paul Lam


On Dec 19, 2018, at 06:40, Martin, Nick <nick.mar...@ngc.com> wrote:

I’m working on Flink 1.7.0 and I’m trying to use the built-in S3 libraries like 
readFile(‘s3://bucket/object’) or 
StreamingFileSink. My storage provider is not AWS, but they implement the same 
API, so I need to point the S3 client to a different address. The Hadoop 
documentation shows that there are options in core-site to set that up. The 
problem is, I can’t seem to get the right dependencies together to use the S3 
filesystem. As far as I can tell, the pre-built Hadoop/Presto jars don’t use 
core-site.xml, and the instructions for manual setup given here 
(https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/deployment/aws.html#hadoop-provided-s3-file-systems---manual-setup)
 list a set of dependencies that seems to be completely wrong.

How can I use the S3 sources/sinks with a custom HTTP endpoint?



Nick Martin









[ANNOUNCE] Weekly community update #51

2018-12-20 Thread Till Rohrmann
Dear community,

this is the weekly community update thread #51. Please post any news and
updates you want to share with the community to this thread.

# Flink Forward China is happening

This week the Flink community meets in Beijing for the first Flink Forward
China which takes place from the 20th to the 21st of December. Find out
more about the given talks and training sessions on the website [0].

# Release voting for 1.5.6, 1.6.3 and 1.7.1

The community is currently voting on three bug fix releases Flink 1.5.6,
1.6.3 and 1.7.1 [1, 2, 3]. Please help the community by trying out the
release candidates.

# Flink SQL DDL design

The Flink SQL DDL design discussion is currently converging and the
community is trying to define the scope of the MVP [4]. Join the discussion
to learn where the journey is headed.

# Improving Flink's Kerberos integration

Rong started a discussion on how Flink's Kerberos integration could be
improved to better meet enterprise requirements [5]. If you want to share
your experiences and voice your opinion on what Flink should support with
respect to Kerberos, please join this discussion.

# Python and non-JVM language support in Flink

Xianda started a discussion on Flink's support of non-JVM languages in
particular Python [6]. The discussion revolves around three different
strategies: (1) Language portability via Apache Beam; (2) Implement own
Python API; (3) Implement own portability layer. Please chime in if you
want to make your opinion heard.

[0] https://china-2018.flink-forward.org/
[1]
https://lists.apache.org/thread.html/dfcb64406bc70d85b6d9aed34638e9099524c1c0de54c3a0e7590aa3@%3Cdev.flink.apache.org%3E
[2]
https://lists.apache.org/thread.html/4c2b101abc5b443cc16a83e918095d116250ee008ae0299d459943ca@%3Cdev.flink.apache.org%3E
[3]
https://lists.apache.org/thread.html/e7cc411d0654131d1cacd19448363ceece87f459be364e66f70982a1@%3Cdev.flink.apache.org%3E
[4]
https://lists.apache.org/thread.html/9e8eaa74b391ac21ca7268475e1179e7965ecd6389b8f5bbf9e7d6e2@%3Cdev.flink.apache.org%3E
[5]
https://lists.apache.org/thread.html/7ce36d5ecf0e5d0bfc106affc21dec11da514aac16b2f0971f53f60a@%3Cdev.flink.apache.org%3E
[6]
https://lists.apache.org/thread.html/f6f8116b4b38b0b2d70ed45b990d6bb1bcb33611fde6fdf32ec0e840@%3Cdev.flink.apache.org%3E

Cheers,
Till


Re: How to test window

2018-12-20 Thread Chesnay Schepler
Since you define a 15-second window, you have to ensure that your source 
generates at least 15 seconds' worth of data; otherwise the window will 
never fire.
Since you do not use event time, your source has to actually run for at 
least 15 seconds; collection sources will simply not work for this case. 
You need a custom SourceFunction that emits your data over a period of 
15 + x seconds.
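
A minimal sketch of such a source, reusing the CameraEvent constructor from
your test (the emission rate and duration are illustrative):

public class SlowCameraSource implements SourceFunction<CameraEvent> {
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<CameraEvent> ctx) throws Exception {
        // emit one event per second for ~20 seconds, i.e. longer than the
        // window size, so at least one processing-time window can fire
        for (int i = 0; i < 20 && running; i++) {
            ctx.collect(new CameraEvent("123-123-12", 1, new java.util.Date(), "OUT", "CAR"));
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

It would be used as env.addSource(new SlowCameraSource()) in place of
env.fromCollection(events).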







How to test window

2018-12-20 Thread עדן שרקון
Hey guys,
I ran into a situation and I need your help.

I'm trying to use a unit test in order to check my results. My timeWindow
is set to 15 sec, but the assertEquals doesn't wait for the window to
produce the answer,

so everything is telling me "index 0 out of bounds" (because it didn't get
to place my object in the list yet).

Thank you all!

import org.apache.flink.annotation.Public;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.test.util.AbstractTestBase;
import org.com.CameraEvent;
import org.com.StreamingJob;
import org.junit.Test;

import java.util.ArrayList;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;

import static org.junit.Assert.assertEquals;

// note: the generic type parameters below were stripped in the archive and
// have been reconstructed as CameraEvent, matching how the collections are used
public class IntergrationTest extends AbstractTestBase {

    @Test
    public void test() throws Exception {

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // configure your test environment
        env.setParallelism(1);

        // values are collected in a static variable
        CollectSink.values.clear();
        LinkedList<CameraEvent> events = GenerateEvents();
        env.fromCollection(events)
                .keyBy(new StreamingJob.GetKey())
                .timeWindow(Time.seconds(10))
                .minBy("dateTime")
                .addSink(new CollectSink());

        env.execute("lala");
        assertEquals(events.get(1), CollectSink.values.get(0));
    }

    private static LinkedList<CameraEvent> GenerateEvents() {
        LinkedList<CameraEvent> linkedList = new LinkedList<>();
        CameraEvent cameraEvent;
        for (int i = 0; i < 2; i++) {

            cameraEvent = new CameraEvent("123-123-12", 1, new Date(),
                    "OUT", "CAR");

            linkedList.add(cameraEvent);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        return linkedList;
    }

    private static class CollectSink implements SinkFunction<CameraEvent> {

        // must be static
        public static final List<CameraEvent> values = new ArrayList<>();

        @Override
        public synchronized void invoke(CameraEvent value) throws Exception {
            values.add(value);
        }
    }
}


Serious stability issues when running on YARN (Flink 1.7.0)

2018-12-20 Thread Gyula Fóra
Hi!

Since we have moved to the new execution mode with Flink 1.7.0, we have
observed some pretty bad stability issues with the YARN execution.

It's pretty hard to understand what's going on, so sorry for the vague
description, but here is what seems to happen:

In some cases, when a bigger job fails (let's say 30 taskmanagers with 10
slots each) and the job tries to recover, we can observe taskmanagers
starting to fail.

The errors usually look like this:

20181220T141057.132+0100  INFO The heartbeat of TaskManager with id
container_e15_1542798679751_0033_01_21 timed out.
[org.apache.flink.runtime.resourcemanager.ResourceManager$TaskManagerHeartbeatListener$1.run()
@ 1137]
20181220T141057.133+0100  INFO Closing TaskExecutor connection
container_e15_1542798679751_0033_01_21 because: The heartbeat of
TaskManager with id container_e15_1542798679751_0033_01_21  timed
out.  
[org.apache.flink.runtime.resourcemanager.ResourceManager.closeTaskManagerConnection()
@ 822]
20181220T141057.135+0100  INFO Execute processors -> (Filter config
stream -> (Filter Failures, Flat Map), Filter BEA) (168/180)
(3e9c164e4c0594f75c758624815265f1) switched from RUNNING to FAILED.
org.apache.flink.util.FlinkException: The assigned slot
container_e15_1542798679751_0033_01_21_0 was removed.
at 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlot(SlotManager.java:893)
at 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlots(SlotManager.java:863)
at 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.internalUnregisterTaskManager(SlotManager.java:1058)
at 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.unregisterTaskManager(SlotManager.java:385)
at 
org.apache.flink.runtime.resourcemanager.ResourceManager.closeTaskManagerConnection(ResourceManager.java:825)
at 
org.apache.flink.runtime.resourcemanager.ResourceManager$TaskManagerHeartbeatListener$1.run(ResourceManager.java:1139)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
at 
akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
 [org.apache.flink.runtime.executiongraph.Execution.transitionState() @ 1342]


The job then goes into a restart loop where taskmanagers come and go; the UI
sometimes displays more than 30 taskmanagers and some extra slots. I have
in some instances seen "GC overhead limit exceeded" during the recovery,
which is very strange.

I suspect there might be something strange happening, maybe some broken
logic in the slot allocation or some memory leak.

Has anyone observed anything similar so far?
It seems to only affect some of our larger jobs. This wasn't a problem in
previous Flink releases, where we always used the "legacy" execution
mode.

Thank you!
Gyula
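
One configuration knob that relates directly to the logged symptom (though
not necessarily to the root cause): the ResourceManager declares
TaskManagers lost via heartbeat timeouts, which are configurable in
flink-conf.yaml. A sketch with illustrative values (the Flink 1.7 defaults
are a 10s interval and a 50s timeout):

heartbeat.interval: 10000    # ms between heartbeat requests
heartbeat.timeout: 120000    # ms without a response before a TaskManager is declared lost

Raising the timeout can keep slow-but-alive TaskManagers registered while a
large state restore keeps them busy.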


Use s3 on Flink on kubernetes

2018-12-20 Thread William Saar

What is the easiest way to use S3 from a Flink job deployed in a session
cluster on Kubernetes?

I've tried including the flink-s3-fs-hadoop dependency in the sbt file
for my job. Can I programmatically set the properties to point to it?
Is there a ready-made Docker image for Flink with the S3 dependencies
configured, to deploy in the jobmanager/taskmanager deployments?
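
For Flink 1.7, the shaded S3 filesystem jar is meant to go into the lib/
folder of the Flink distribution rather than being bundled into the job
jar, so a common approach is to bake it into the image. A minimal
Dockerfile sketch, assuming the official image tag and the jar location
inside the distribution:

FROM flink:1.7.0-hadoop28-scala_2.11
# the shaded S3 filesystem ships in opt/ of the distribution;
# copying it into lib/ puts it on the cluster classpath at startup
RUN cp /opt/flink/opt/flink-s3-fs-hadoop-1.7.0.jar /opt/flink/lib/

The endpoint and credentials can then be supplied as s3.* keys in
flink-conf.yaml rather than programmatically from the job.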