Re: Serious stability issues when running on YARN (Flink 1.7.0)
In my case the problem seems to happen when a streaming job is recovering with large state. I don't really understand how it could be caused by what you described, as that seems to affect mostly batch jobs. But I can easily be wrong; maybe there are other implications of the above issue.

Gyula

qi luo wrote (on Fri, 21 Dec 2018, 3:35):

> Hi Gyula,
>
> Your issue is possibly related to [1], where slots are prematurely released.
> I’ve raised a PR which is still pending review.
>
> [1] https://issues.apache.org/jira/browse/FLINK-10941
>
>
> On Dec 20, 2018, at 9:33 PM, Gyula Fóra wrote:
>
> Hi!
>
> Since we have moved to the new execution mode with Flink 1.7.0 we have
> observed some pretty bad stability issues with the Yarn execution.
>
> It's pretty hard to understand what's going on so sorry for the vague
> description but here is what seems to happen:
>
> In some cases when a bigger job fails (lets say 30 taskmanagers, 10 slots
> each) and the job tries to recover we can observe taskmanagers start to
> fail.
>
> The errors usually look like this:
>
> 20181220T141057.132+0100 INFO The heartbeat of TaskManager with id
> container_e15_1542798679751_0033_01_21 timed out.
> [org.apache.flink.runtime.resourcemanager.ResourceManager$TaskManagerHeartbeatListener$1.run() @ 1137]
> 20181220T141057.133+0100 INFO Closing TaskExecutor connection
> container_e15_1542798679751_0033_01_21 because: The heartbeat of
> TaskManager with id container_e15_1542798679751_0033_01_21 timed out.
> [org.apache.flink.runtime.resourcemanager.ResourceManager.closeTaskManagerConnection() @ 822]
> 20181220T141057.135+0100 INFO Execute processors -> (Filter config stream ->
> (Filter Failures, Flat Map), Filter BEA) (168/180)
> (3e9c164e4c0594f75c758624815265f1) switched from RUNNING to FAILED.
> org.apache.flink.util.FlinkException: The assigned slot
> container_e15_1542798679751_0033_01_21_0 was removed.
> at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlot(SlotManager.java:893)
> at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlots(SlotManager.java:863)
> at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.internalUnregisterTaskManager(SlotManager.java:1058)
> at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.unregisterTaskManager(SlotManager.java:385)
> at org.apache.flink.runtime.resourcemanager.ResourceManager.closeTaskManagerConnection(ResourceManager.java:825)
> at org.apache.flink.runtime.resourcemanager.ResourceManager$TaskManagerHeartbeatListener$1.run(ResourceManager.java:1139)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
> at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
> at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
> at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> [org.apache.flink.runtime.executiongraph.Execution.transitionState() @ 1342]
>
> The job then goes in a restart loop, where taskmanagers come and go, the
> UI sometimes displays more than 30 taskmanagers and some extra slots. I
> have in some instances seen "GC overhead limit exceeded" during the
> recovery which is very strange.
>
> I suspect there might be something strange happening, maybe some broken
> logic in the slot allocations or some memory leak.
>
> Has anyone observed anything similar so far?
> Seems to only affect some of our larger jobs. This hasn't been a problem
> in the previous Flink releases where we always used the "legacy" execution
> mode.
>
> Thank you!
> Gyula
Getting "ProducerFenced" exception while using flink kafka producer
Hi,

I have two Flink jobs; both use the Flink Kafka Producer and Flink Kafka Consumer running in exactly-once mode. The parallelism of both jobs is one, and both jobs have the same number and types of operators. When we start one job, it runs fine. But as soon as we start the second job, both jobs start failing at runtime with a "ProducerFenced" exception.

Our understanding is that both jobs get the same value for the transactional ID (assigned by Flink) that is required to run in exactly-once mode. I think Flink computes the transactional ID by concatenating the operator names and the subtask ID. In our case both jobs have the same operators, and both run with a parallelism of one, so both jobs get the same subtask IDs.

Questions: Kindly provide a solution for this exception, shed some light on whether our understanding of its cause is correct, and tell us how we can get different transactional IDs for the two jobs.

Thanks & Regards
Gaurav Luthra
Mob: +91-9901945206
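For what it's worth, the suspected collision can be sketched in plain Java. This is a simplified model, not Flink's actual code: assume the transactional ID prefix is derived from the operator/task name plus the subtask index (the exact derivation varies by Flink version). Two jobs with identical graphs and parallelism 1 then produce identical IDs, while giving each job's sink a distinct operator name (e.g. via `.name(...)`/`.uid(...)` on the sink) would make them differ:

```java
public class TransactionalIdSketch {
    // Hypothetical approximation of how a transactional ID prefix could be
    // derived from an operator's task name and subtask index.
    static String transactionalId(String operatorName, int subtaskIndex) {
        return operatorName + "-" + subtaskIndex;
    }

    public static void main(String[] args) {
        // Two jobs with identical operator names and parallelism 1 derive the
        // same ID; the second producer then fences the first (ProducerFenced).
        String job1 = transactionalId("Sink: kafka-sink", 0);
        String job2 = transactionalId("Sink: kafka-sink", 0);
        System.out.println(job1.equals(job2));

        // A distinct sink name per job yields a distinct transactional ID.
        String job2Fixed = transactionalId("Sink: kafka-sink-job2", 0);
        System.out.println(job1.equals(job2Fixed));
    }
}
```

Under this model, renaming the sink operator in one of the two jobs is enough to separate their transactional ID spaces.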
buffer pool is destroyed
Hi Flink community,

I have a custom source that emits a user-defined data type, BaseEvent. The following code works fine when BaseEvent is not a POJO. But when I changed it to a POJO by adding a default constructor, I'm getting a "Buffer pool is destroyed" runtime exception in the collect method.

DataStream<BaseEvent> eventStream = see.addSource(new AgoraSource(configFile, instance));
DataStream<Tuple4<String, Long, Double, String>> result_order = eventStream
    .filter(e -> e instanceof OrderEvent)
    .map(e -> (OrderEvent) e)
    .map(e -> new Tuple3<>(e.SecurityID, Long.valueOf(1), Double.valueOf(e.OriginalQuantity))).returns(info_tuple3)
    .keyBy(e -> e.f0)
    .timeWindow(Time.seconds(5))
    .reduce((a, b) -> new Tuple3<>(a.f0, a.f1 + b.f1, a.f2 + b.f2))
    .map(e -> new Tuple4<>(e.f0, e.f1, e.f2, "Order")).returns(info_tuple4);

Any idea?

Shuang

=== Please access the attached hyperlink for an important electronic communications disclaimer: http://www.credit-suisse.com/legal/en/disclaimer_email_ib.html ===
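As a side note, "Buffer pool is destroyed" is often a secondary symptom: the task was cancelled because something else failed first, frequently serialization of the changed type, so the root-cause exception is usually earlier in the logs. Since the change here was making BaseEvent a POJO, it may help to verify it actually satisfies Flink's POJO rules (public class, public no-arg constructor, fields public or accessible via getters/setters). A rough, simplified reflection check — not Flink's actual TypeExtractor logic; it ignores the getter/setter alternative and uses hypothetical event classes:

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

public class PojoCheck {
    // Approximation of Flink's POJO rules: public class, public no-arg
    // constructor, and (simplified here) public non-static, non-transient
    // fields. Flink also accepts private fields with getters/setters.
    static boolean looksLikeFlinkPojo(Class<?> clazz) {
        if (!Modifier.isPublic(clazz.getModifiers())) return false;
        try {
            clazz.getConstructor(); // public no-arg constructor must exist
        } catch (NoSuchMethodException e) {
            return false;
        }
        for (Field f : clazz.getDeclaredFields()) {
            int m = f.getModifiers();
            if (Modifier.isStatic(m) || Modifier.isTransient(m)) continue;
            if (!Modifier.isPublic(m)) return false; // simplification
        }
        return true;
    }

    // Hypothetical event classes for illustration only.
    public static class GoodEvent {
        public String securityId;
        public double originalQuantity;
        public GoodEvent() {}
    }

    public static class NoCtorEvent {
        public String securityId;
        public NoCtorEvent(String s) { this.securityId = s; }
    }

    public static void main(String[] args) {
        System.out.println(looksLikeFlinkPojo(PojoCheck.GoodEvent.class));
        System.out.println(looksLikeFlinkPojo(PojoCheck.NoCtorEvent.class));
    }
}
```

If the class fails such a check, Flink silently falls back to Kryo serialization, which is also worth ruling out when the behavior changes after a type modification.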
Re: Serious stability issues when running on YARN (Flink 1.7.0)
Hi Gyula,

Your issue is possibly related to [1], where slots are prematurely released. I’ve raised a PR which is still pending review.

[1] https://issues.apache.org/jira/browse/FLINK-10941

> On Dec 20, 2018, at 9:33 PM, Gyula Fóra wrote:
>
> Hi!
>
> Since we have moved to the new execution mode with Flink 1.7.0 we have
> observed some pretty bad stability issues with the Yarn execution.
>
> It's pretty hard to understand what's going on so sorry for the vague
> description but here is what seems to happen:
>
> In some cases when a bigger job fails (lets say 30 taskmanagers, 10 slots
> each) and the job tries to recover we can observe taskmanagers start to fail.
>
> The errors usually look like this:
>
> 20181220T141057.132+0100 INFO The heartbeat of TaskManager with id
> container_e15_1542798679751_0033_01_21 timed out.
> [org.apache.flink.runtime.resourcemanager.ResourceManager$TaskManagerHeartbeatListener$1.run() @ 1137]
> 20181220T141057.133+0100 INFO Closing TaskExecutor connection
> container_e15_1542798679751_0033_01_21 because: The heartbeat of
> TaskManager with id container_e15_1542798679751_0033_01_21 timed out.
> [org.apache.flink.runtime.resourcemanager.ResourceManager.closeTaskManagerConnection() @ 822]
> 20181220T141057.135+0100 INFO Execute processors -> (Filter config stream ->
> (Filter Failures, Flat Map), Filter BEA) (168/180)
> (3e9c164e4c0594f75c758624815265f1) switched from RUNNING to FAILED.
> org.apache.flink.util.FlinkException: The assigned slot
> container_e15_1542798679751_0033_01_21_0 was removed.
> at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlot(SlotManager.java:893)
> at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlots(SlotManager.java:863)
> at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.internalUnregisterTaskManager(SlotManager.java:1058)
> at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.unregisterTaskManager(SlotManager.java:385)
> at org.apache.flink.runtime.resourcemanager.ResourceManager.closeTaskManagerConnection(ResourceManager.java:825)
> at org.apache.flink.runtime.resourcemanager.ResourceManager$TaskManagerHeartbeatListener$1.run(ResourceManager.java:1139)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
> at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
> at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
> at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> [org.apache.flink.runtime.executiongraph.Execution.transitionState() @ 1342]
>
> The job then goes in a restart loop, where taskmanagers come and go, the UI
> sometimes displays more than 30 taskmanagers and some extra slots. I have in
> some instances seen "GC overhead limit exceeded" during the recovery which is
> very strange.
>
> I suspect there might be something strange happening, maybe some broken logic
> in the slot allocations or some memory leak.
>
> Has anyone observed anything similar so far?
> Seems to only affect some of our larger jobs. This hasn't been a problem in
> the previous Flink releases where we always used the "legacy" execution mode.
>
> Thank you!
> Gyula
RE: EXT :Re: Custom S3 endpoint
Yeah, I figured that part out. I’ve tried to make it work with 2.7 and 2.8, and it looks like the prebuilt jars have actually moved to Hadoop 3.

From: Paul Lam [mailto:paullin3...@gmail.com]
Sent: Tuesday, December 18, 2018 7:08 PM
To: Martin, Nick
Cc: user@flink.apache.org
Subject: EXT :Re: Custom S3 endpoint

Hi Nick,

What version of Hadoop are you using? AFAIK, you must use Hadoop 2.7+ to support a custom S3 endpoint, or the `fs.s3a.endpoint` property in core-site.xml will be ignored.

Best,
Paul Lam

On Dec 19, 2018, at 06:40, Martin, Nick <nick.mar...@ngc.com> wrote:

I’m working on Flink 1.7.0 and I’m trying to use the built-in S3 libraries like readFile(‘s3://bucket/object’) or StreamingFileSink. My storage provider is not AWS, but they implement the same API, so I need to point the S3 client to a different address. The Hadoop documentation shows that there are options in core-site.xml to set that up. The problem is, I can’t seem to get the right dependencies together to use the S3 filesystem. As far as I can tell, the pre-built Hadoop/Presto jars don’t use core-site.xml, and the instructions for manual setup given here (https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/deployment/aws.html#hadoop-provided-s3-file-systems---manual-setup) list a set of dependencies that seems to be completely wrong. How can I use the S3 sources/sinks with a custom HTTP endpoint?

Nick Martin

Notice: This e-mail is intended solely for use of the individual or entity to which it is addressed and may contain information that is proprietary, privileged and/or exempt from disclosure under applicable law. If the reader is not the intended recipient or agent responsible for delivering the message to the intended recipient, you are hereby notified that any dissemination, distribution or copying of this communication is strictly prohibited. This communication may also contain data subject to U.S. export laws.
If so, data subject to the International Traffic in Arms Regulation cannot be disseminated, distributed, transferred, or copied, whether incorporated or in its original form, to foreign nationals residing in the U.S. or abroad, absent the express prior approval of the U.S. Department of State. Data subject to the Export Administration Act may not be disseminated, distributed, transferred or copied contrary to U. S. Department of Commerce regulations. If you have received this communication in error, please notify the sender by reply e-mail and destroy the e-mail message and any physical copies made of the communication. Thank you.
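For reference, when using Flink's shaded flink-s3-fs-hadoop filesystem, the `fs.s3a.*` options can be mirrored in flink-conf.yaml under the `s3.` prefix, which avoids core-site.xml entirely. A sketch with a hypothetical endpoint and placeholder credentials, assuming Flink 1.7's key names:

```yaml
# flink-conf.yaml — endpoint and credentials are illustrative placeholders.
# Key names mirror the Hadoop fs.s3a.* options with an s3. prefix.
s3.endpoint: https://storage.example.com
s3.path.style.access: true    # many non-AWS providers need path-style requests
s3.access-key: <access-key>
s3.secret-key: <secret-key>
```

The flink-s3-fs-hadoop jar itself still has to be placed in the cluster's lib/ directory (it ships in the opt/ directory of the Flink distribution).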
[ANNOUNCE] Weekly community update #51
Dear community,

this is the weekly community update thread #51. Please post any news and updates you want to share with the community to this thread.

# Flink Forward China is happening

This week the Flink community meets in Beijing for the first Flink Forward China, which takes place from the 20th to the 21st of December. Find out more about the talks and training sessions on the website [0].

# Release voting for 1.5.6, 1.6.3 and 1.7.1

The community is currently voting on three bug fix releases: Flink 1.5.6, 1.6.3 and 1.7.1 [1, 2, 3]. Please help the community by trying out the release candidates.

# Flink SQL DDL design

The Flink SQL DDL design discussion is currently converging, and the community is trying to define the scope of the MVP [4]. Join the discussion to learn where the journey is headed.

# Improving Flink's Kerberos integration

Rong started a discussion on how Flink's Kerberos integration could be improved to better meet enterprise requirements [5]. If you want to share your experiences and voice your opinion on what Flink should support with regard to Kerberos, please join this discussion.

# Python and non-JVM language support in Flink

Xianda started a discussion on Flink's support for non-JVM languages, in particular Python [6]. The discussion revolves around three different strategies: (1) language portability via Apache Beam; (2) implementing Flink's own Python API; (3) implementing Flink's own portability layer. Please chime in if you want to make your opinion heard.
[0] https://china-2018.flink-forward.org/
[1] https://lists.apache.org/thread.html/dfcb64406bc70d85b6d9aed34638e9099524c1c0de54c3a0e7590aa3@%3Cdev.flink.apache.org%3E
[2] https://lists.apache.org/thread.html/4c2b101abc5b443cc16a83e918095d116250ee008ae0299d459943ca@%3Cdev.flink.apache.org%3E
[3] https://lists.apache.org/thread.html/e7cc411d0654131d1cacd19448363ceece87f459be364e66f70982a1@%3Cdev.flink.apache.org%3E
[4] https://lists.apache.org/thread.html/9e8eaa74b391ac21ca7268475e1179e7965ecd6389b8f5bbf9e7d6e2@%3Cdev.flink.apache.org%3E
[5] https://lists.apache.org/thread.html/7ce36d5ecf0e5d0bfc106affc21dec11da514aac16b2f0971f53f60a@%3Cdev.flink.apache.org%3E
[6] https://lists.apache.org/thread.html/f6f8116b4b38b0b2d70ed45b990d6bb1bcb33611fde6fdf32ec0e840@%3Cdev.flink.apache.org%3E

Cheers,
Till
Re: How to test window
Since you define a 15 second window you have to ensure that your source generates at least 15 seconds' worth of data; otherwise the window will never fire. Since you do not use event time, your source has to actually run for at least 15 seconds; for this case collection sources simply will not work. You need a custom SourceFunction that emits your data over a period of 15 + x seconds.

On 20.12.2018 15:12, עדן שרקון wrote:

Hey guys, I ran into a situation and I need your help. I'm trying to use a unit test to check my results. My timeWindow is set to 15 seconds, but the assertEquals does not wait for the window to produce the answer, so I get "index 0 out of bounds" (because it didn't get to place my object in the list yet). Thank you all!

import org.apache.flink.annotation.Public;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.test.util.AbstractTestBase;
import org.com.CameraEvent;
import org.com.StreamingJob;
import org.junit.Test;

import java.util.ArrayList;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;

import static org.junit.Assert.assertEquals;

public class IntergrationTest extends AbstractTestBase {

    @Test
    public void test() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // configure your test environment
        env.setParallelism(1);
        // values are collected in a static variable
        CollectSink.values.clear();
        LinkedList<CameraEvent> events = GenerateEvents();
        env.fromCollection(events)
                .keyBy(new StreamingJob.GetKey())
                .timeWindow(Time.seconds(10))
                .minBy("dateTime")
                .addSink(new CollectSink());
        env.execute("lala");
        assertEquals(events.get(1), CollectSink.values.get(0));
    }

    private static LinkedList<CameraEvent> GenerateEvents() {
        LinkedList<CameraEvent> linkedList = new LinkedList<>();
        CameraEvent cameraEvent;
        for (int i = 0; i < 2; i++) {
            cameraEvent = new CameraEvent("123-123-12", 1, new Date(), "OUT", "CAR");
            linkedList.add(cameraEvent);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        return linkedList;
    }

    private static class CollectSink implements SinkFunction<CameraEvent> {
        // must be static
        public static final List<CameraEvent> values = new ArrayList<>();

        @Override
        public synchronized void invoke(CameraEvent value) throws Exception {
            values.add(value);
        }
    }
}
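The answer above comes down to tumbling-window arithmetic. A simplified sketch of how an element's timestamp maps to a window (modeled loosely on Flink's TimeWindow.getWindowStartWithOffset with a zero offset, not the actual implementation):

```java
public class WindowAssignSketch {
    // Start of the tumbling window an element with the given timestamp falls
    // into (simplified: zero window offset, non-negative timestamps).
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    public static void main(String[] args) {
        long size = 15_000; // 15-second tumbling window

        // Two events one second apart land in the same window [0, 15000)...
        System.out.println(windowStart(2_000, size) == windowStart(3_000, size));

        // ...and that window only fires once processing time passes its end,
        // windowStart + size = 15000 ms. A collection source finishes within
        // milliseconds, so with processing time the boundary is never reached.
        System.out.println(windowStart(2_000, size) + size);
    }
}
```

This is why the assertion sees an empty sink: the job terminates before processing time ever crosses the window boundary, so the window never fires.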
How to test window
Hey guys, I ran into a situation and I need your help. I'm trying to use a unit test to check my results. My timeWindow is set to 15 seconds, but the assertEquals does not wait for the window to produce the answer, so I get "index 0 out of bounds" (because it didn't get to place my object in the list yet). Thank you all!

import org.apache.flink.annotation.Public;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.test.util.AbstractTestBase;
import org.com.CameraEvent;
import org.com.StreamingJob;
import org.junit.Test;

import java.util.ArrayList;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;

import static org.junit.Assert.assertEquals;

public class IntergrationTest extends AbstractTestBase {

    @Test
    public void test() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // configure your test environment
        env.setParallelism(1);
        // values are collected in a static variable
        CollectSink.values.clear();
        LinkedList<CameraEvent> events = GenerateEvents();
        env.fromCollection(events)
                .keyBy(new StreamingJob.GetKey())
                .timeWindow(Time.seconds(10))
                .minBy("dateTime")
                .addSink(new CollectSink());
        env.execute("lala");
        assertEquals(events.get(1), CollectSink.values.get(0));
    }

    private static LinkedList<CameraEvent> GenerateEvents() {
        LinkedList<CameraEvent> linkedList = new LinkedList<>();
        CameraEvent cameraEvent;
        for (int i = 0; i < 2; i++) {
            cameraEvent = new CameraEvent("123-123-12", 1, new Date(), "OUT", "CAR");
            linkedList.add(cameraEvent);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        return linkedList;
    }

    private static class CollectSink implements SinkFunction<CameraEvent> {
        // must be static
        public static final List<CameraEvent> values = new ArrayList<>();

        @Override
        public synchronized void invoke(CameraEvent value) throws Exception {
            values.add(value);
        }
    }
}
Serious stability issues when running on YARN (Flink 1.7.0)
Hi!

Since we have moved to the new execution mode with Flink 1.7.0 we have observed some pretty bad stability issues with the YARN execution.

It's pretty hard to understand what's going on, so sorry for the vague description, but here is what seems to happen:

In some cases when a bigger job fails (let's say 30 taskmanagers, 10 slots each) and the job tries to recover, we can observe taskmanagers start to fail.

The errors usually look like this:

20181220T141057.132+0100 INFO The heartbeat of TaskManager with id container_e15_1542798679751_0033_01_21 timed out. [org.apache.flink.runtime.resourcemanager.ResourceManager$TaskManagerHeartbeatListener$1.run() @ 1137]
20181220T141057.133+0100 INFO Closing TaskExecutor connection container_e15_1542798679751_0033_01_21 because: The heartbeat of TaskManager with id container_e15_1542798679751_0033_01_21 timed out. [org.apache.flink.runtime.resourcemanager.ResourceManager.closeTaskManagerConnection() @ 822]
20181220T141057.135+0100 INFO Execute processors -> (Filter config stream -> (Filter Failures, Flat Map), Filter BEA) (168/180) (3e9c164e4c0594f75c758624815265f1) switched from RUNNING to FAILED.
org.apache.flink.util.FlinkException: The assigned slot container_e15_1542798679751_0033_01_21_0 was removed.
	at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlot(SlotManager.java:893)
	at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlots(SlotManager.java:863)
	at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.internalUnregisterTaskManager(SlotManager.java:1058)
	at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.unregisterTaskManager(SlotManager.java:385)
	at org.apache.flink.runtime.resourcemanager.ResourceManager.closeTaskManagerConnection(ResourceManager.java:825)
	at org.apache.flink.runtime.resourcemanager.ResourceManager$TaskManagerHeartbeatListener$1.run(ResourceManager.java:1139)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
	at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
	at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
	at akka.actor.ActorCell.invoke(ActorCell.scala:495)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
	at akka.dispatch.Mailbox.run(Mailbox.scala:224)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[org.apache.flink.runtime.executiongraph.Execution.transitionState() @ 1342]

The job
then goes into a restart loop, where taskmanagers come and go; the UI sometimes displays more than 30 taskmanagers and some extra slots. I have in some instances seen "GC overhead limit exceeded" during the recovery, which is very strange.

I suspect there might be something strange happening, maybe some broken logic in the slot allocations or some memory leak.

Has anyone observed anything similar so far? It seems to only affect some of our larger jobs. This hasn't been a problem in the previous Flink releases, where we always used the "legacy" execution mode.

Thank you!
Gyula
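One mitigation worth considering while the root cause is investigated: the heartbeat interval and timeout are configurable, and long GC pauses while recovering large state can push heartbeats past the default timeout, triggering exactly the "heartbeat timed out" / "assigned slot was removed" cascade above. A sketch for flink-conf.yaml (the values are illustrative and the defaults are from memory; verify them against the 1.7 configuration docs):

```yaml
# flink-conf.yaml — heartbeat tuning, values in milliseconds.
heartbeat.interval: 10000    # how often heartbeats are requested (default)
heartbeat.timeout: 120000    # raised from the default 50000 to ride out long
                             # GC pauses while restoring large state
```

This does not fix a genuine slot-accounting bug, but it can tell the two apart: if the restart loop disappears with a larger timeout, the failures were GC-pause-driven heartbeat losses rather than broken slot allocation logic.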
Use s3 on Flink on kubernetes
What is the easiest way to use S3 from a Flink job deployed in a session cluster on Kubernetes? I've tried including the flink-s3-fs-hadoop dependency in the sbt file for my job. Can I programmatically set the properties to point to it? Is there a ready-made Docker image for Flink with the S3 dependencies configured, to deploy in the jobmanager/taskmanager deployments?
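One approach that may work (a sketch; the image tag and paths are assumptions based on the official Flink image layout): extend the Flink Docker image and promote the bundled S3 filesystem jar from opt/ to lib/, so the cluster itself provides the filesystem instead of it being packaged into the job jar. Credentials and endpoint settings then go into flink-conf.yaml rather than program code:

```dockerfile
# Hypothetical base image tag; the flink-s3-fs-hadoop jar ships in the
# distribution's opt/ directory and becomes active when placed in lib/.
FROM flink:1.7.0-scala_2.12

RUN cp /opt/flink/opt/flink-s3-fs-hadoop-*.jar /opt/flink/lib/
```

Use the resulting image for both the jobmanager and taskmanager deployments; the job itself can then reference s3:// paths directly without declaring the filesystem in its sbt build.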