Re: Status.JVM.Memory.Heap.Used metric shows only a few megabytes but profiling shows gigabytes (as expected)
Thanks, next time I'll check JIRA more carefully.

On Tue, Mar 19, 2019 at 6:27 PM Chesnay Schepler wrote:
> Known issue, fixed in 1.7.3/1.8.0:
> https://issues.apache.org/jira/browse/FLINK-11183
>
> On 19.03.2019 15:03, gerardg wrote:
> > Hi,
> >
> > Before Flink 1.7.0 we were getting correct values in the
> > Status.JVM.Memory.Heap.Used metric. Since updating, we just see a constant
> > small value (just a few megabytes). Did something change?
> >
> > Gerard
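To cross-check what Status.JVM.Memory.Heap.Used should report, the JVM exposes heap usage through `MemoryMXBean`. The sketch below is an illustration only, not Flink's metric code: it contrasts a supplier that queries the bean on every read with one that captures a single `MemoryUsage` snapshot. The second pattern is one way a gauge can end up stuck at a small constant value; see FLINK-11183 for what actually changed in Flink. The class and method names are hypothetical.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryUsage;
import java.util.function.LongSupplier;

class HeapUsedCheck {
    private static final MemoryMXBean MEMORY = ManagementFactory.getMemoryMXBean();

    // Queries the MXBean on every read, so each sample reflects the current heap usage.
    static LongSupplier liveHeapUsed() {
        return () -> MEMORY.getHeapMemoryUsage().getUsed();
    }

    // Pitfall: MemoryUsage is an immutable snapshot, so a gauge built on one
    // captured instance keeps reporting the value from the moment it was created.
    static LongSupplier frozenHeapUsed() {
        MemoryUsage snapshot = MEMORY.getHeapMemoryUsage();
        return snapshot::getUsed;
    }

    public static void main(String[] args) {
        LongSupplier live = liveHeapUsed();
        LongSupplier frozen = frozenHeapUsed();
        // Keep ~64 MiB reachable so the live reading has something to show.
        byte[][] hold = new byte[64][];
        for (int i = 0; i < hold.length; i++) {
            hold[i] = new byte[1 << 20];
        }
        System.out.printf("live: %d MiB, frozen: %d MiB (holding %d MiB)%n",
                live.getAsLong() >> 20, frozen.getAsLong() >> 20, hold.length);
    }
}
```

Comparing the live reading with what the metrics reporter shows is a quick way to tell a reporting problem from a real memory pattern.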
Re: Timestamp synchronized message consumption across kafka partitions
I'll answer myself. I guess the most viable option for now is to wait for the work in http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Sharing-state-between-subtasks-td24489.html

On Thu, Mar 7, 2019, 3:24 PM gerardg wrote:
> I'm wondering if there is a way to avoid consuming too fast from partitions
> that don't have as much data as the other ones in the same topic, by keeping
> them more or less synchronized by their ingestion timestamps, similar to what
> Kafka Streams does:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-353%3A+Improve+Kafka+Streams+Timestamp+Synchronization
>
> We are having an issue where partitions with less data are consumed very
> fast, which creates a lot of windows that can't be triggered until the
> partitions with more data are consumed and the watermark advances. It
> seems that this issue should be quite common, but we can't find any
> standard solution to it. Maybe it's just that our partitions are too
> unbalanced, but still, without a way to bound the skew between
> partitions (for example when processing accumulated data), it seems like a
> potential source of problems.
>
> Does anyone have an idea or suggestion to deal with this issue?
>
> Thanks,
>
> Gerard
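The core idea of KIP-353-style timestamp synchronization is to always take the next record from the partition whose head has the smallest timestamp, so a sparse partition cannot race ahead. A minimal sketch, assuming each partition's buffered ingestion timestamps are already available in ascending order (a hypothetical model, not the Kafka or Flink API). It ignores the hard part, deciding how long to wait when one partition has nothing buffered; KIP-353 bounds that wait with a configurable idle time.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Deque;
import java.util.List;
import java.util.PriorityQueue;

class TimestampSync {
    // Emits buffered records across partitions in timestamp order by always taking from
    // the partition whose next record has the smallest timestamp. Each deque holds one
    // partition's timestamps in ascending order; the deques are drained.
    // Returns {partition, timestamp} pairs in emission order.
    static List<long[]> merge(List<Deque<Long>> partitions) {
        PriorityQueue<Integer> byHead = new PriorityQueue<>(
                Comparator.comparingLong((Integer p) -> partitions.get(p).peekFirst()));
        for (int p = 0; p < partitions.size(); p++) {
            if (!partitions.get(p).isEmpty()) {
                byHead.add(p);
            }
        }
        List<long[]> emitted = new ArrayList<>();
        while (!byHead.isEmpty()) {
            int p = byHead.poll();                 // partition with the oldest pending record
            long ts = partitions.get(p).pollFirst();
            emitted.add(new long[] {p, ts});
            if (!partitions.get(p).isEmpty()) {
                byHead.add(p);                     // re-queue with its new head timestamp
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Deque<Long>> partitions = new ArrayList<>();
        partitions.add(new ArrayDeque<>(List.of(1L, 5L, 9L)));  // sparse partition
        partitions.add(new ArrayDeque<>(List.of(2L, 3L, 4L)));  // denser partition
        for (long[] r : merge(partitions)) {
            System.out.println("partition " + r[0] + " ts " + r[1]);
        }
    }
}
```

A partition is removed from the heap before its head changes and re-added afterwards, which keeps the heap ordering valid.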
Re: Unbalanced Kafka consumer consumption
We finally figured it out. We had a large value for the Kafka consumer option 'max.partition.fetch.bytes', which made the KafkaConsumer consume from the partitions at an unbalanced rate.

Gerard
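The thread doesn't say which value they ended up using. As a hedged sketch, assuming you simply go back to Kafka's default of 1048576 bytes (1 MiB) per partition, the consumer properties could be built like this. The broker address and group id are placeholders, and the class name is hypothetical.

```java
import java.util.Properties;

class KafkaFetchProps {
    // Kafka's default for max.partition.fetch.bytes (1 MiB).
    static final String DEFAULT_MAX_PARTITION_FETCH_BYTES = "1048576";

    // Builds consumer properties; bootstrapServers and groupId are placeholders.
    static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // Keep the per-partition fetch size moderate instead of a very large value.
        props.setProperty("max.partition.fetch.bytes", DEFAULT_MAX_PARTITION_FETCH_BYTES);
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProps("kafka-1:9092", "my-flink-job");
        // These properties would then be passed to the Flink Kafka consumer's constructor.
        props.list(System.out);
    }
}
```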
Re: Unbalanced Kafka consumer consumption
Hi Till, sorry for the late reply, I was on holiday and couldn't follow up on the issue.

1. Flink 1.6.1, Kafka 1.1.0.
2. The topic has 64 partitions. We don't have that many slots available, but we could try this.
3. Yes, they are running on different nodes.
4. I meant that the rate does not increase until the operator whose lag (the number of messages still to consume) is drawn with the blue line (in the background) finishes consuming all its pending messages. Yes, the problem appears when there is pending (persisted) data, either because the job can't keep up with the incoming rate or because the app has been stopped.
5. Yes. I guess you mean numRecordsOutPerSecond. We will monitor this so we have data the next time it happens.

I'll try to reduce the number of partitions so they can be assigned one per source task, and see how it behaves.

Thanks,

Gerard

On Wed, Nov 7, 2018 at 5:38 PM Till Rohrmann wrote:
> Hi Gerard,
>
> the behaviour you are describing sounds odd to me. I have a couple of
> questions:
>
> 1. Which Flink and Kafka version are you using?
> 2. How many partitions do you have? --> Try to set the parallelism of your
> job to the number of partitions. That way, you will have one partition per
> source task.
> 3. How are the source operators distributed? Are they running on different
> nodes?
> 4. What do you mean by "until it (the blue one) was finished consuming
> the partition"? I assume that you don't ingest into the Kafka topic live
> but want to read persisted data.
> 5. Are you using Flink's metrics to monitor the different source tasks?
> Check what the source operator's output rate is (should be visible from the
> web UI).
>
> Cheers,
> Till
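Regarding Till's point 2: as far as I know, Flink's Kafka source spreads partitions over source subtasks roughly round-robin. The simplified model below assigns partition p to subtask (start + p) % parallelism, where `start` stands in for Flink's topic-dependent offset. It shows why 64 partitions don't divide evenly over fewer subtasks. The parallelism of 24 is hypothetical.

```java
import java.util.Arrays;

class PartitionSpread {
    // Simplified round-robin model: partition p goes to subtask (start + p) % parallelism.
    // Returns how many partitions each subtask reads.
    static int[] partitionsPerSubtask(int numPartitions, int parallelism, int start) {
        int[] counts = new int[parallelism];
        for (int p = 0; p < numPartitions; p++) {
            counts[(start + p) % parallelism]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // One partition per subtask when parallelism == number of partitions.
        System.out.println(Arrays.toString(partitionsPerSubtask(64, 64, 0)));
        // With fewer subtasks, some read 3 partitions and others only 2.
        System.out.println(Arrays.toString(partitionsPerSubtask(64, 24, 0)));
    }
}
```

With 24 subtasks, 16 of them read three partitions and 8 read two, so per-subtask load is uneven even before any data skew.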
Re: Unbalanced Kafka consumer consumption
I think my problem is not the same. Yours is that you want to consume faster from the partitions with more data, to avoid first consuming the one with fewer elements, which could advance the event time too fast. Mine is that Kafka only consumes from some partitions even though it seems to have the resources to read and process from all of them at the same time.

Gerard

On Tue, Oct 30, 2018 at 9:36 AM bupt_ljy wrote:
> Hi,
>
> If I understand your problem correctly, there is a similar JIRA
> issue, FLINK-10348, reported by me. Maybe you can take a look at it.
>
> Best,
> Jiayi Liao
>
> Original Message
> *Sender:* Gerard Garcia
> *Recipient:* fearsome.lucidity
> *Cc:* user
> *Date:* Monday, Oct 29, 2018 17:50
> *Subject:* Re: Unbalanced Kafka consumer consumption
>
> The stream is partitioned by key after ingestion at the finest granularity
> that we can (which is finer than how the stream is partitioned when produced
> to Kafka). It is not perfectly balanced, but it is still not unbalanced enough
> to show this behavior (it is more balanced than what the lag images show).
>
> Anyway, let's assume that the problem is that the stream is so unbalanced
> that one operator subtask can't handle the ingestion rate. Is it then expected
> that all the other operators reduce their ingestion rate even if they
> have resources to spare? The task is configured with processing time and
> there are no windows. If that is the case, is there a way to let operator
> subtasks process freely even if one of them is causing back pressure
> upstream?
>
> The attached images show how Kafka lag increases while the throughput is
> stable until some operator subtasks finish.
>
> Thanks,
>
> Gerard
>
> On Fri, Oct 26, 2018 at 8:09 PM Elias Levy
> wrote:
>
>> You can always shuffle the stream generated by the Kafka source
>> (dataStream.shuffle()) to evenly distribute records downstream.
>>
>> On Fri, Oct 26, 2018 at 2:08 AM gerardg wrote:
>>
>>> Hi,
>>>
>>> We are experiencing issues scaling our Flink application, and we have
>>> observed that it may be because Kafka message consumption is not balanced
>>> across partitions. The attached image (lag per partition) shows how only one
>>> partition consumes messages (the blue one in the back), and it wasn't until
>>> it finished that the other ones started to consume at a good rate (actually
>>> the total throughput multiplied by 4 when these started). Also, when those
>>> started to consume, one partition just stopped and accumulated messages
>>> again until they finished.
>>>
>>> We don't see any resource (CPU, network, disk...) struggling in our cluster,
>>> so we are not sure what could be causing this behavior. I can only assume
>>> that somehow Flink or the Kafka consumer is artificially slowing down the
>>> other partitions. Maybe due to how back pressure is handled?
>>>
>>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1007/consumer_max_lag.png>
>>>
>>> Gerard
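On the question of whether one overloaded subtask should slow everyone down, a back-of-the-envelope model helps. Assume (a simplification, not a Flink guarantee) that subtask i receives a fraction shares[i] of all records and can process at most rates[i] records/s, and that a full output buffer towards any subtask blocks the shared source. In steady state, the total source rate is then capped at min over i of rates[i] / shares[i]. The numbers below are hypothetical.

```java
class BackpressureBound {
    // Upper bound on total source throughput (records/s) in a simplified steady-state
    // model: subtask i gets shares[i] of all records and processes at most rates[i]
    // records/s; when any subtask falls behind, back pressure blocks the shared source.
    static double maxSourceRate(double[] rates, double[] shares) {
        double bound = Double.POSITIVE_INFINITY;
        for (int i = 0; i < rates.length; i++) {
            if (shares[i] > 0) {
                bound = Math.min(bound, rates[i] / shares[i]);
            }
        }
        return bound;
    }

    public static void main(String[] args) {
        double[] rates = {1000, 1000, 1000, 1000};
        double[] even = {0.25, 0.25, 0.25, 0.25};
        double[] skewed = {0.5, 0.5 / 3, 0.5 / 3, 0.5 / 3};
        System.out.println(maxSourceRate(rates, even));   // 4000.0
        System.out.println(maxSourceRate(rates, skewed)); // 2000.0
    }
}
```

Under this model, yes: every subtask slows to the pace set by the most overloaded one, even if the others have spare capacity. Evening out the shares raises the cap, which is what Elias's `shuffle()` suggestion aims at for steps that don't need keyed partitioning.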
Re: When a jobmanager fails, it doesn't restart because it tries to restart non existing tasks
Hi Till,

Sorry for the late reply. I was waiting to update to Flink 1.6.0 to see if the problem got fixed, but I still experience the first issue (the jobgraph is not deleted from ZooKeeper when the job is cancelled).

The second issue (taskmanagers unable to register with the newly elected jobmanager) was actually a configuration error: all jobmanagers had the "jobmanager.rpc.address" option pointing to the same jobmanager, so I guess the new one was not registering its URL correctly in ZooKeeper.

As for the first issue, it seems to be a known bug:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Old-job-resurrected-during-HA-failover-td22000.html
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/JobGraphs-not-cleaned-up-in-HA-mode-td22600.html
https://issues.apache.org/jira/browse/FLINK-10011
so I guess I'll have to wait until there is a fix, as I have not seen any workaround (other than removing the jobgraph from ZooKeeper after cancelling the job).

Thanks,

Gerard

On Tue, Jul 24, 2018 at 1:46 PM Till Rohrmann wrote:
> Hi Gerard,
>
> the first log snippet from the client does not show anything suspicious.
> The warning just says that you cannot use the Yarn CLI because it lacks the
> Hadoop dependencies in the classpath.
>
> The second snippet is indeed more interesting. If the TaskExecutors are
> not notified about the changed leader, then this might indicate a problem
> with the ZooKeeper connection or the ZooKeeper cluster itself. This might
> also explain why the job deletion from ZooKeeper does not succeed.
>
> One thing you could check is whether the leader ZNode under
> `/flink/default/leader/dispatcher_lock` (if you are using the defaults)
> actually contains the address of the newly elected leader. The leader path
> should also be logged in the cluster entrypoint logs. You can use the
> ZooKeeper CLI to access the ZNodes.
>
> Cheers,
> Till
>
> On Mon, Jul 23, 2018 at 4:07 PM Gerard Garcia wrote:
>> We have just started experiencing a different problem that could be
>> related; maybe it helps to diagnose the issue.
>>
>> In the last 24h the jobmanager lost its connection to ZooKeeper a couple of
>> times. Each time, a new jobmanager (on a different node) was correctly
>> elected leader, but the taskmanagers kept trying to connect to the old
>> jobmanager. These are the last log messages before the taskmanager shut
>> itself down.
>>
>> 12:06:41.747 [flink-akka.actor.default-dispatcher-5] WARN akka.remote.transport.netty.NettyTransport New I/O boss #3 - Remote connection to [null] failed with java.net.ConnectException: Connection refused: (...)1/192.168.1.9:35605
>> 12:06:41.748 [flink-akka.actor.default-dispatcher-2] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Could not resolve ResourceManager address akka.tcp://flink@(...)1:35605/user/resourcemanager, retrying in 1 ms: Could not connect to rpc endpoint under address akka.tcp://flink@(...)1:35605/user/resourcemanager..
>> 12:06:41.748 [flink-akka.actor.default-dispatcher-5] WARN akka.remote.ReliableDeliverySupervisor flink-akka.remote.default-remote-dispatcher-15 - Association with remote system [akka.tcp://flink@(...)1:35605] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink@(...)1:35605]] Caused by: [Connection refused: (...)1/192.168.1.9:35605]
>> 12:06:51.766 [flink-akka.actor.default-dispatcher-5] WARN akka.remote.transport.netty.NettyTransport New I/O boss #3 - Remote connection to [null] failed with java.net.ConnectException: Connection refused: (...)1/192.168.1.9:35605
>> 12:06:51.767 [flink-akka.actor.default-dispatcher-2] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Could not resolve ResourceManager address akka.tcp://flink@(...)1:35605/user/resourcemanager, retrying in 1 ms: Could not connect to rpc endpoint under address akka.tcp://flink@(...)1:35605/user/resourcemanager..
>> 12:06:51.767 [flink-akka.actor.default-dispatcher-5] WARN akka.remote.ReliableDeliverySupervisor flink-akka.remote.default-remote-dispatcher-7 - Association with remote system [akka.tcp://flink@(...)1:35605] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink@(...)1:35605]] Caused by: [Connection refused: (...)1/192.168.1.9:35605]
>> 12:07:01.123 [flink-akka.actor.default-dispatcher-5] ERROR org.apache.flink.runtime.taskexecutor.TaskExecutor - Fatal error occurred in TaskExecutor akka.tcp://flink@(...)2:33455/user/taskmanager_0.
>> org.apache.flink.r
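Gerard's second issue turned out to be that every jobmanager had `jobmanager.rpc.address` pointing at the same host. The sketch below is a quick consistency check over the per-host configs. It assumes, following his diagnosis, that each jobmanager's flink-conf.yaml should name its own host. The hostnames are hypothetical, and the parser only understands simple `key: value` lines, not full YAML.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class RpcAddressCheck {
    // Returns the value for key from flink-conf.yaml-style "key: value" text, or null.
    static String valueOf(String confText, String key) {
        for (String line : confText.split("\n")) {
            String t = line.trim();
            if (t.startsWith("#") || !t.contains(":")) {
                continue;                          // skip comments and non key-value lines
            }
            int idx = t.indexOf(':');              // first colon separates key from value
            if (t.substring(0, idx).trim().equals(key)) {
                return t.substring(idx + 1).trim();
            }
        }
        return null;
    }

    // Returns hosts whose jobmanager.rpc.address is missing or points at another host.
    static List<String> misconfiguredHosts(Map<String, String> confByHost) {
        List<String> bad = new ArrayList<>();
        for (Map.Entry<String, String> e : confByHost.entrySet()) {
            String addr = valueOf(e.getValue(), "jobmanager.rpc.address");
            if (addr == null || !addr.equals(e.getKey())) {
                bad.add(e.getKey());
            }
        }
        return bad;
    }
}
```

For example, feeding it the configs of three jobmanagers where two copy the first host's address would flag those two hosts.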
Re: Override CaseClassSerializer with custom serializer
Hi Timo,

I see. Yes, we are already using the "Object Reuse" option. It was a nice performance improvement when we first enabled it!

I guess another option we can try is to somehow make things "easier" for Flink so it can chain operators together. Most of them are not chained; I think it's because they have a control stream as a source together with the main stream. I'll need to check that and see if we can re-architect them.

Thanks,

Gerard

On Fri, Aug 17, 2018 at 11:21 PM Timo Walther wrote:
> Hi Gerard,
>
> you are correct, Kryo serializers are only used when no built-in Flink
> serializer is available.
>
> Actually, the tuple and case class serializers are among the most
> performant serializers in Flink (due to their fixed length and lack of null
> support). If you really want to reduce the serialization overhead, you
> could look into the object reuse mode. We had this topic on the mailing
> list recently, so I will just copy it here:
>
> If you want to improve the performance of a collect() between operators,
> you could also enable object reuse. You can read more about this here
> [1] (section "Issue 2: Object Reuse"), but make sure your implementation
> is correct, because an operator could modify the objects of following
> operators.
>
> I hope this helps.
>
> Regards,
> Timo
>
> [1]
> https://data-artisans.com/blog/curious-case-broken-benchmark-revisiting-apache-flink-vs-databricks-runtime
>
> On 17.08.18 at 17:29, gerardg wrote:
> > Hello,
> >
> > I can't seem to override the CaseClassSerializer with my custom
> > serializer. I'm using env.getConfig.addDefaultKryoSerializer() to add the
> > custom serializer, but I don't see it being used. I guess it is because it
> > only uses Kryo-based serializers if it can't find a Flink serializer?
> >
> > Is it then worth replacing the CaseClassSerializer with a custom
> > serializer? (When I profile, the CaseClassSerializer.(de)serialize method
> > appears as the most used, so I wanted to give it a try.) If so, how can I
> > do it?
> >
> > Thanks,
> >
> > Gerard
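Timo's warning about object reuse (enabled in Flink with `env.getConfig().enableObjectReuse()`) is easiest to see outside Flink. The plain-Java sketch below uses hypothetical names. An upstream step reuses one mutable instance for every record it emits. A downstream step that keeps references then sees every buffered entry overwritten, while one that copies what it needs is unaffected.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class ObjectReuseHazard {
    // Mutable record standing in for a reused event object (hypothetical type).
    static final class Event {
        long value;
    }

    // Upstream step that reuses a single Event instance for every emitted record.
    static void emitReusing(long[] values, Consumer<Event> downstream) {
        Event reused = new Event();
        for (long v : values) {
            reused.value = v;
            downstream.accept(reused);
        }
    }

    // Unsafe downstream: buffers references, so all entries alias the same object.
    static List<Long> bufferReferences(long[] values) {
        List<Event> buffer = new ArrayList<>();
        emitReusing(values, buffer::add);
        List<Long> seen = new ArrayList<>();
        for (Event e : buffer) {
            seen.add(e.value);
        }
        return seen;
    }

    // Safe downstream: copies the field it needs before upstream overwrites it.
    static List<Long> bufferCopies(long[] values) {
        List<Long> seen = new ArrayList<>();
        emitReusing(values, e -> seen.add(e.value));
        return seen;
    }
}
```

With input {1, 2, 3}, `bufferReferences` yields [3, 3, 3] and `bufferCopies` yields [1, 2, 3]. This is why operators that hold on to input objects need extra care once reuse is enabled.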
Re: When a jobmanager fails, it doesn't restart because it tries to restart non existing tasks
)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
    at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
    at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
12:07:01.128 [flink-akka.actor.default-dispatcher-5] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Stopping TaskExecutor akka.tcp://flink@(...)2:33455/user/taskmanager_0.
12:07:01.128 [flink-akka.actor.default-dispatcher-5] INFO o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService - Stopping ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
12:07:01.128 [flink-akka.actor.default-dispatcher-5] INFO o.a.flink.runtime.state.TaskExecutorLocalStateStoresManager - Shutting down TaskExecutorLocalStateStoresManager.
12:07:01.130 [flink-akka.actor.default-dispatcher-5] INFO org.apache.flink.runtime.io.disk.iomanager.IOManager - I/O manager removed spill file directory /home/tmp/flink-io-cf83b38e-53f1-4802-a097-e6db95b46084
12:07:01.130 [flink-akka.actor.default-dispatcher-5] INFO org.apache.flink.runtime.io.network.NetworkEnvironment - Shutting down the network environment and its components.
12:07:01.131 [flink-akka.actor.default-dispatcher-5] INFO org.apache.flink.runtime.io.network.netty.NettyClient - Successful shutdown (took 1 ms).
12:07:01.132 [flink-akka.actor.default-dispatcher-5] INFO org.apache.flink.runtime.io.network.netty.NettyServer - Successful shutdown (took 1 ms).
12:07:01.141 [flink-akka.actor.default-dispatcher-5] INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Stop job leader service.
12:07:01.142 [flink-akka.actor.default-dispatcher-5] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Stopped TaskExecutor akka.tcp://flink@(...)2:33455/user/taskmanager_0.
12:07:01.143 [flink-akka.actor.default-dispatcher-5] INFO org.apache.flink.runtime.blob.PermanentBlobCache - Shutting down BLOB cache
12:07:01.143 [flink-akka.actor.default-dispatcher-5] INFO org.apache.flink.runtime.blob.TransientBlobCache - Shutting down BLOB cache
12:07:01.148 [Curator-Framework-0] INFO o.a.f.s.c.o.a.curator.framework.imps.CuratorFrameworkImpl - backgroundOperationsLoop exiting
12:07:01.173 [flink-akka.actor.default-dispatcher-5] INFO o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper - Session: 0x3011955028f00f4 closed
12:07:01.173 [main-EventThread] INFO o.a.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - EventThread shut down for session: 0x3011955028f00f4
12:07:01.173 [flink-akka.actor.default-dispatcher-5] INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService - Stopping Akka RPC service.
12:07:01.178 [flink-akka.actor.default-dispatcher-3] INFO akka.remote.RemoteActorRefProvider$RemotingTerminator flink-akka.remote.default-remote-dispatcher-16 - Shutting down remote daemon.
12:07:01.178 [flink-akka.actor.default-dispatcher-3] INFO akka.remote.RemoteActorRefProvider$RemotingTerminator flink-akka.remote.default-remote-dispatcher-16 - Remote daemon shut down; proceeding with flushing remote transports.
12:07:01.190 [flink-akka.actor.default-dispatcher-5] INFO akka.remote.RemoteActorRefProvider$RemotingTerminator flink-akka.remote.default-remote-dispatcher-7 - Remoting shut down.

It seems like there are problems updating ZooKeeper. I have also noticed these messages in the ZooKeeper log:

WARN [SyncThread:2:FileTxnLog@378] - fsync-ing the write ahead log in SyncThread:2 took 1259ms which will adversely effect operation latency. See the ZooKeeper troubleshooting guide

Maybe Flink hits some timeout?

Gerard

On Mon, Jul 23, 2018 at 11:57 AM Gerard Garcia wrote:
> Hi Till,
>
> I can't post the full log (as there is internal info in it), but I've
> found this. Is that what you are looking for?
>
> 11:29:17.351 [main] INFO org.apache.flink.client.cli.CliFrontend -
> 11:29:17.372 [main] INFO org.apache.flink.client.cli.CliFrontend - Starting Command Line Client (Version: 1.5-SNAPSHOT, Rev:a4fc4c6, Date:05.06.2018 @ 10:22:30 CEST)
> 11:29:17.372 [main] INFO org.apache.flink.client.cli.CliFrontend - OS current user: (...)
> 11:29:17.372 [main] INFO org.apache.fl
Re: Flink job hangs/deadlocks (possibly related to out of memory)
Thanks Zhijiang,

Yes, I guess our best option right now is to just slim down the structure of the output record and see if that solves the problem.

Gerard

On Tue, Jul 17, 2018 at 4:56 PM Zhijiang(wangzhijiang999) <wangzhijiang...@aliyun.com> wrote:
> Hi Gerard,
>
> From the jstack you provided, the task is serializing the output record,
> and during this process it will not process any more input data.
> This stack does not indicate an out-of-memory issue. And if the output
> buffers were exhausted, the task would be blocked in requestBufferBlocking.
>
> I think the key point is that your output record is too large and has a
> complicated structure: every field and collection in this complicated class
> is traversed to serialize it, which costs a lot of time and CPU.
> Furthermore, the checkpoint cannot complete because it is waiting for a lock
> that is also held by the task's output process.
>
> As you mentioned, it makes sense to check the data structure of the output
> record and reduce its size or make it more lightweight to handle.
>
> Best,
>
> Zhijiang
>
> ------
> From: Gerard Garcia
> Sent: Tuesday, July 17, 2018 21:53
> To: piotr
> Cc: fhueske ; wangzhijiang999 <wangzhijiang...@aliyun.com>; user ; nico <n...@data-artisans.com>
> Subject: Re: Flink job hangs/deadlocks (possibly related to out of memory)
>
> Yes, I'm using Flink 1.5.0, and what I'm serializing is a really big record
> (probably too big; we have already started working to reduce its size)
> which consists of several case classes that have (among others) fields of
> type String.
>
> I attach a CPU profile of the thread stuck serializing. I also attach the
> memory and GC telemetry that the profiler shows (which may be more
> informative than the one recorded from the JVM metrics). Only one node was
> actually "doing something"; all the others had CPU usage near zero.
>
> The task is at the same time trying to perform a checkpoint, but it keeps
> failing. Would it make sense that the problem is that there is not enough
> memory available to perform the checkpoint, so all operators are stuck
> waiting for it to finish, and at the same time the operator stuck
> serializing is holding all the memory, so neither it nor the checkpoint can
> advance?
>
> I realized that I don't have a minimum pause between checkpoints, so it is
> continuously trying. Maybe I can reduce the checkpoint timeout from the 10m
> default and introduce a minimum pause (e.g. 5m timeout and 5m minimum
> pause), and this way I could break the deadlock.
>
> Gerard
>
>
> On Tue, Jul 17, 2018 at 9:00 AM Piotr Nowojski wrote:
> Hi,
>
> Thanks for the additional data. Just to make sure, are you using Flink
> 1.5.0?
>
> There are a couple of threads that seem to be looping in serialisation,
> while others are blocked and either waiting for new data or waiting for
> someone to consume some data. Could you debug or CPU profile the code,
> particularly focusing on threads with a stack trace like the one below [1]?
> Aren't you trying to serialise some gigantic String?
>
> Piotrek
>
> [1]:
>
> "(...) (19/26)" #2737 prio=5 os_prio=0 tid=0x7f52584d2800 nid=0x6819 runnable [0x7f451a843000]
>    java.lang.Thread.State: RUNNABLE
>     at org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:133)
>     at org.apache.flink.types.StringValue.writeString(StringValue.java:812)
>     at org.apache.flink.api.common.typeutils.base.StringSerializer.serialize(StringSerializer.java:64)
>     at org.apache.flink.api.common.typeutils.base.StringSerializer.serialize(StringSerializer.java:28)
>     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
>     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
>     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
>     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
>     at org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:98)
>     at org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:93)
>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>     at org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:93)
>     at org.apache.flink.api.scala.typeutils.TraversableSerializer.se
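Gerard's idea of a shorter timeout plus a minimum pause maps to Flink's `CheckpointConfig#setCheckpointTimeout` and `#setMinPauseBetweenCheckpoints` (the default timeout is 10 minutes). The simplified model below is not Flink's CheckpointCoordinator. It assumes one checkpoint at a time, a checkpoint that never completes, a hypothetical 1-minute interval, and that the pause also applies after a timed-out attempt (which may not hold in every Flink version). Under those assumptions it shows the effect: with no pause, attempts run back to back; with 5m/5m, every other 5-minute window is free of checkpoint pressure.

```java
class CheckpointSchedule {
    // Simplified model: one checkpoint at a time; each attempt ends when it completes
    // or times out; the next attempt starts no earlier than intervalMs after the
    // previous start and minPauseMs after the previous end.
    // Returns {startMs, endMs} for each attempt.
    static long[][] attempts(long intervalMs, long timeoutMs, long minPauseMs,
                             long durationMs, int count) {
        long[][] result = new long[count][];
        long start = 0;
        for (int i = 0; i < count; i++) {
            long end = start + Math.min(durationMs, timeoutMs);
            result[i] = new long[] {start, end};
            start = Math.max(start + intervalMs, end + minPauseMs);
        }
        return result;
    }

    public static void main(String[] args) {
        long min = 60_000;
        long stuck = Long.MAX_VALUE;  // a checkpoint that never finishes on its own
        long[][] current = attempts(1 * min, 10 * min, 0, stuck, 3);
        long[][] proposed = attempts(1 * min, 5 * min, 5 * min, stuck, 3);
        for (int i = 0; i < 3; i++) {
            System.out.printf("attempt %d: current %d-%d min, proposed %d-%d min%n", i,
                    current[i][0] / min, current[i][1] / min,
                    proposed[i][0] / min, proposed[i][1] / min);
        }
    }
}
```

Whether the free windows actually let the stuck serializing task make progress depends on the lock contention Zhijiang describes, so this only bounds checkpoint pressure; it does not guarantee the job recovers.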
Re: When a jobmanager fails, it doesn't restart because it tries to restart non existing tasks
o.a.f.s.c.o.a.curator.framework.state.ConnectionStateManager - State change: CONNECTED
11:29:17.764 [main] INFO org.apache.flink.runtime.rest.RestClient - Rest client endpoint started.
11:29:17.766 [main] INFO o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService - Starting ZooKeeperLeaderRetrievalService /leader/rest_server_lock.
11:29:17.812 [main] INFO o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService - Starting ZooKeeperLeaderRetrievalService /leader/dispatcher_lock.
11:29:18.007 [main] INFO org.apache.flink.runtime.rest.RestClient - Shutting down rest endpoint.
11:29:18.008 [main] INFO org.apache.flink.runtime.rest.RestClient - Rest endpoint shutdown complete.
11:29:18.008 [main] INFO o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService - Stopping ZooKeeperLeaderRetrievalService /leader/rest_server_lock.
11:29:18.009 [main] INFO o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService - Stopping ZooKeeperLeaderRetrievalService /leader/dispatcher_lock.
11:29:18.010 [Curator-Framework-0] INFO o.a.f.s.c.o.a.curator.framework.imps.CuratorFrameworkImpl - backgroundOperationsLoop exiting
11:29:18.030 [main] INFO o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper - Session: 0x100571bda1903c3 closed
11:29:18.030 [main-EventThread] INFO o.a.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - EventThread shut down for session: 0x100571bda1903c3
11:29:18.030 [main] INFO org.apache.flink.client.cli.CliFrontend - Cancelled job e403893e5208ca47ace886a77e405291.

Gerard

On Fri, Jul 20, 2018 at 5:14 AM vino yang wrote:
> Hi Till,
>
> You are right, we also saw the problem you describe. Curator removes the
> specific job graph path asynchronously. But it's the only gist when
> recovering, right? Is there any plan to improve this?
>
> Thanks, vino.
>
> 2018-07-19 21:58 GMT+08:00 Till Rohrmann :
>
>> Hi Gerard,
>>
>> the logging statement `Removed job graph ... from ZooKeeper` is actually
>> not 100% accurate. The actual deletion is executed as an asynchronous
>> background task, and the log statement is not printed in the callback
>> (where it should be). Therefore, the deletion could still have failed. In
>> order to see this, the full jobmanager/cluster entry point logs would be
>> tremendously helpful.
>>
>> Cheers,
>> Till
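Till's point is that the "Removed job graph ... from ZooKeeper" line is logged when the delete is scheduled, not when it finishes. The minimal stdlib sketch below shows the difference. It uses a hypothetical `deleteAsync` in place of the real background ZooKeeper removal; it is not Flink's `ZooKeeperSubmittedJobGraphStore` code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class AsyncDeleteLogging {
    // Hypothetical asynchronous delete standing in for a background ZooKeeper removal.
    static CompletableFuture<Void> deleteAsync(String path, boolean succeed) {
        return CompletableFuture.runAsync(() -> {
            if (!succeed) {
                throw new IllegalStateException("delete failed: " + path);
            }
        });
    }

    // Misleading: logs success as soon as the delete has been scheduled.
    static List<String> logEagerly(String path, boolean succeed) {
        List<String> log = new ArrayList<>();
        CompletableFuture<Void> pending = deleteAsync(path, succeed);
        log.add("Removed " + path);
        pending.exceptionally(t -> null).join();  // wait only to keep the demo deterministic
        return log;
    }

    // Accurate: logs from the completion callback, so failures are reported too.
    static List<String> logInCallback(String path, boolean succeed) {
        List<String> log = new ArrayList<>();
        deleteAsync(path, succeed)
                .whenComplete((ok, err) -> log.add(
                        err == null ? "Removed " + path : "Failed to remove " + path))
                .exceptionally(t -> null)         // swallow the failure after logging it
                .join();                          // join makes the callback's write visible
        return log;
    }
}
```

For a delete that fails, `logEagerly` still reports "Removed ...", which matches the situation where the log claims removal but the jobgraph node is still present in ZooKeeper.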
Re: When a jobmanager fails, it doesn't restart because it tries to restart non existing tasks
Thanks Andrey, That is the log from the jobmanager just after it has finished cancelling the task: 11:29:18.716 [flink-akka.actor.default-dispatcher-15695] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Stopping checkpoint coordinator for job e403893e5208ca47ace886a77e405291. 11:29:18.716 [flink-akka.actor.default-dispatcher-15695] INFO o.a.f.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Shutting down 11:29:18.738 [flink-akka.actor.default-dispatcher-15695] INFO o.a.f.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Removing /flink-eur/default/checkpoints/e403893e5208ca47ace886a77e405291 from ZooKeeper 11:29:18.780 [flink-akka.actor.default-dispatcher-15695] INFO o.a.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter - Shutting down. 11:29:18.780 [flink-akka.actor.default-dispatcher-15695] INFO o.a.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter - Removing /checkpoint-counter/e403893e5208ca47ace886a77e405291 from ZooKeeper 11:29:18.827 [flink-akka.actor.default-dispatcher-15695] INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Job e403893e5208ca47ace886a77e405291 reached globally terminal state CANCELED. 11:29:18.846 [flink-akka.actor.default-dispatcher-15675] INFO org.apache.flink.runtime.jobmaster.JobMaster - Stopping the JobMaster for job (...)(e403893e5208ca47ace886a77e405291). 11:29:18.848 [flink-akka.actor.default-dispatcher-15675] INFO o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService - Stopping ZooKeeperLeaderRetrievalService /leader/resource_manager_lock. 11:29:18.864 [flink-akka.actor.default-dispatcher-15675] INFO org.apache.flink.runtime.jobmaster.JobMaster - Close ResourceManager connection d5fbc30a895066054e29fb2fd60fb0f1: JobManager is shutting down.. 11:29:18.864 [flink-akka.actor.default-dispatcher-15695] INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Suspending SlotPool. 
11:29:18.864 [flink-akka.actor.default-dispatcher-15695] INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Stopping SlotPool. 11:29:18.864 [flink-akka.actor.default-dispatcher-15688] INFO o.a.flink.runtime.resourcemanager.StandaloneResourceManager - Disconnect job manager 9cf221e2340597629fb932c03aa14...@akka.tcp://flink@(...):33827/user/jobmanager_9 for job e403893e5208ca47ace886a77e405291 from the resource manager. 11:29:18.864 [flink-akka.actor.default-dispatcher-15675] INFO o.a.f.runtime.leaderelection.ZooKeeperLeaderElectionService - Stopping ZooKeeperLeaderElectionService ZooKeeperLeaderElectionService{leaderPath='/leader/e403893e5208ca47ace886a77e405291/job_manager_lock'}. 11:29:18.980 [flink-akka.actor.default-dispatcher-15695] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 31154 for job 5d8c376b10d358b9c9470b3e70113626 (132520 bytes in 411 ms). 11:29:19.025 [flink-akka.actor.default-dispatcher-15683] INFO o.a.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore - Removed job graph e403893e5208ca47ace886a77e405291 from ZooKeeper. At the end it says it removed job graph e403893e5208ca47ace886a77e405291 from ZooKeeper, but I can still see it at /flink/default/jobgraphs: [zk: localhost:2181(CONNECTED) 14] ls /flink/default/jobgraphs/e403893e5208ca47ace886a77e405291 [3fe9c3c8-5bec-404e-a720-75f9b188124f, 36208299-0f6d-462c-bae4-2e3d53f50e8c] Gerard On Wed, Jul 18, 2018 at 4:24 PM Andrey Zagrebin wrote: > Hi Gerard, > > There is an issue recently fixed for 1.5.2 and 1.6.0: > https://issues.apache.org/jira/browse/FLINK-9575 > It might have caused your problem. > > Can you please provide the log from the JobManager/entry point for further > investigation? > > Cheers, > Andrey > > On 18 Jul 2018, at 10:16, Gerard Garcia wrote: > > Hi vino, > > It seems that job IDs stay in /jobgraphs when we cancel them manually. 
For > example, after cancelling the job with id 75e16686cb4fe0d33ead8e29af131d09 > the entry is still in zookeeper's path /flink/default/jobgraphs, but the > job disappeared from /home/nas/flink/ha/default/blob/. > > That is the client log: > > 09:20:58.492 [main] INFO org.apache.flink.client.cli.CliFrontend - > Cancelling job 75e16686cb4fe0d33ead8e29af131d09. > 09:20:58.503 [main] INFO > org.apache.flink.runtime.blob.FileSystemBlobStore - Creating highly > available BLOB storage directory at > file:///home/nas/flink/ha//default/blob > 09:20:58.505 [main] INFO org.apache.flink.runtime.util.ZooKeeperUtils - > Enforcing default ACL for ZK connections > 09:20:58.505 [main] INFO org.apache.flink.runtime.util.ZooKeeperUtils - > Using '/flink-eur/default' as Zookeeper namespace. > 09:20:58.539 [main] INFO > o.a.f.s.c.o.a.curator.framework.imps.CuratorFrameworkImpl - Starting > 09:20:58.543 [main] INFO > o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper - Client > environment:zookeeper.version= > 3.4.10-39d3a4f269333c922ed3db283be479f9deacaa0f, b
Re: When a jobmanager fails, it doesn't restart because it tries to restart non existing tasks
Hi vino, It seems that job IDs stay in /jobgraphs when we cancel them manually. For example, after cancelling the job with id 75e16686cb4fe0d33ead8e29af131d09 the entry is still in ZooKeeper's path /flink/default/jobgraphs, but the job disappeared from /home/nas/flink/ha/default/blob/. This is the client log: 09:20:58.492 [main] INFO org.apache.flink.client.cli.CliFrontend - Cancelling job 75e16686cb4fe0d33ead8e29af131d09. 09:20:58.503 [main] INFO org.apache.flink.runtime.blob.FileSystemBlobStore - Creating highly available BLOB storage directory at file:///home/nas/flink/ha//default/blob 09:20:58.505 [main] INFO org.apache.flink.runtime.util.ZooKeeperUtils - Enforcing default ACL for ZK connections 09:20:58.505 [main] INFO org.apache.flink.runtime.util.ZooKeeperUtils - Using '/flink-eur/default' as Zookeeper namespace. 09:20:58.539 [main] INFO o.a.f.s.c.o.a.curator.framework.imps.CuratorFrameworkImpl - Starting 09:20:58.543 [main] INFO o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper - Client environment:zookeeper.version= 3.4.10-39d3a4f269333c922ed3db283be479f9deacaa0f, built on 03/23/2017 10:13 GMT 09:20:58.543 [main] INFO o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper - Client environment:host.name=flink-eur-production1 09:20:58.543 [main] INFO o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper - Client environment:java.version=1.8.0_131 09:20:58.544 [main] INFO o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper - Client environment:java.vendor=Oracle Corporation 09:20:58.546 [main] INFO o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper - Client environment:java.home=/opt/jdk/jdk1.8.0_131/jre 09:20:58.546 [main] INFO o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper - Client 
environment:java.class.path=/opt/flink/flink-1.5.0/lib/commons-httpclient-3.1.jar:/opt/flink/flink-1.5.0/lib/flink-metrics-statsd-1.5.0.jar:/opt/flink/flink-1.5.0/lib/flink-python_2.11-1.5.0.jar:/opt/flink/flink-1.5.0/lib/fluency-1.8.0.jar:/opt/flink/flink-1.5.0/lib/gcs-connector-latest-hadoop2.jar:/opt/flink/flink-1.5.0/lib/hadoop-openstack-2.7.1.jar:/opt/flink/flink-1.5.0/lib/jackson-annotations-2.8.0.jar:/opt/flink/flink-1.5.0/lib/jackson-core-2.8.10.jar:/opt/flink/flink-1.5.0/lib/jackson-databind-2.8.11.1.jar:/opt/flink/flink-1.5.0/lib/jackson-dataformat-msgpack-0.8.15.jar:/opt/flink/flink-1.5.0/lib/log4j-1.2.17.jar:/opt/flink/flink-1.5.0/lib/log4j-over-slf4j-1.7.25.jar:/opt/flink/flink-1.5.0/lib/logback-classic-1.2.3.jar:/opt/flink/flink-1.5.0/lib/logback-core-1.2.3.jar:/opt/flink/flink-1.5.0/lib/logback-more-appenders-1.4.2.jar:/opt/flink/flink-1.5.0/lib/msgpack-0.6.12.jar:/opt/flink/flink-1.5.0/lib/msgpack-core-0.8.15.jar:/opt/flink/flink-1.5.0/lib/phi-accural-failure-detector-0.0.4.jar:/opt/flink/flink-1.5.0/lib/slf4j-log4j12-1.7.7.jar:/opt/flink/flink-1.5.0/lib/flink-dist_2.11-1.5.0.jar::: 09:20:58.546 [main] INFO o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper - Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib 09:20:58.546 [main] INFO o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper - Client environment:java.io.tmpdir=/tmp 09:20:58.546 [main] INFO o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper - Client environment:java.compiler= 09:20:58.547 [main] INFO o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper - Client environment:os.name=Linux 09:20:58.547 [main] INFO o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper - Client environment:os.arch=amd64 09:20:58.547 [main] INFO o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper - Client environment:os.version=4.9.87--std-ipv6-64 09:20:58.547 [main] INFO o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper - 
Client environment:user.name=root 09:20:58.547 [main] INFO o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper - Client environment:user.home=/root 09:20:58.547 [main] INFO o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper - Client environment:user.dir=/opt/flink/flink-1.5.0/bin 09:20:58.548 [main] INFO o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper - Initiating client connection, connectString=10.1.1.5:2181,10.1.1.6:2181,10.1.1.7:2181 sessionTimeout=6 watcher=org.apache.flink.shaded.curator.org.apache.curator.ConnectionState@4a003cbe 09:20:58.555 [main-SendThread(10.1.1.5:2181)] WARN o.a.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/tmp/jaas-9143038863636945274.conf'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it. 09:20:58.556 [main-SendThread(10.1.1.5:2181)] INFO o.a.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Opening socket connection to server 10.1.1.5/10.1.1.5:2181 09:20:58.556 [main-EventThread] ERROR
Re: Flink job hangs/deadlocks (possibly related to out of memory)
Hi Zhijiang, The problem is that no other task failed first. We have a task that sometimes just stops processing data, and when we cancel it, we see log messages saying: " Task (...) did not react to cancelling signal for 30 seconds, but is stuck in method: org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:305) org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:133) org.apache.flink.types.StringValue.writeString(StringValue.java:802) (...)" That is why we suspect that it hangs forever at that point, which would explain why it stops processing data. I don't see any increase in heap memory usage (I guess because these buffers are managed by Flink), so I'm not sure if that is really the problem. Gerard On Tue, Jul 3, 2018 at 6:15 AM Zhijiang(wangzhijiang999) < wangzhijiang...@aliyun.com> wrote: > Hi Gerard, > > I think you can check the job manager log to find which task failed > first, and then trace the task manager log containing the failed task to > find the initial reason. > The failed task will trigger canceling all the other tasks, and during the > canceling process, the blocked task that is waiting for an output buffer cannot > be interrupted by the > canceler thread, as shown in your description. So I think the cancel > process is not the key point and is expected. Maybe it did not cause > OOM at all. > If the task cannot be interrupted during canceling, the task manager process will eventually exit, which triggers restarting the job. > > Zhijiang > > ---------- > From: Gerard Garcia > Sent: Monday, July 2, 2018 18:29 > To: wangzhijiang999 > Cc: user > Subject: Re: Flink job hangs/deadlocks (possibly related to out of memory) > > Thanks Zhijiang, > > We haven't found any other relevant log messages anywhere. These traces > belong to the unresponsive task; that is why we suspect that at some point > it did not have enough memory to serialize the message and it blocked. 
I've > also found that when it hung, several output buffers were full (see > attached image buffers.outPoolUsage.png), so I guess the traces just reflect > that. > > Probably the task hung for some other reason, and that is what filled the > output buffers upstream of the blocked operator. I'll have to continue > investigating to find the real cause. > > Gerard > > > > > On Mon, Jul 2, 2018 at 9:50 AM Zhijiang(wangzhijiang999) < > wangzhijiang...@aliyun.com> wrote: > Hi Gerard, > > The stack below only indicates that the task is being canceled, which may > have been triggered by the job manager because of another task's failure. If the task > cannot be interrupted within the configured timeout, the task manager process will > exit. Do you see any OutOfMemory messages in the task manager log? > Normally the output serialization buffer is managed by the task manager > framework and will not cause OOM, while on the input deserialization side > there is a temporary byte array on each channel for holding partial > records, which is not managed by the framework. I think you can confirm whether > and where the OOM occurred. Maybe check the task failure logs. > > Zhijiang > > -- > From: gerardg > Sent: Saturday, June 30, 2018 00:12 > To: user > Subject: Re: Flink job hangs/deadlocks (possibly related to out of memory) > > (fixed formatting) > > Hello, > > We have experienced some problems where a task just hangs without showing > any kind of log error while other tasks running in the same task manager > continue without problems. 
When these tasks are restarted the task manager > gets killed and shows several errors similar to these ones: > > [Canceler/Interrupts for (...)' did not react to cancelling signal for 30 > seconds, but is stuck in method: > java.nio.ByteBuffer.wrap(ByteBuffer.java:373) > java.nio.ByteBuffer.wrap(ByteBuffer.java:396) > org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:330) > org.apache.flink.core.memory.DataOutputSerializer.writeInt(DataOutputSerializer.java:212) > org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:63) > org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:27) > org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113) > org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32) > org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:98) > org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerialize
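The traces in this thread stop inside `DataOutputSerializer.resize`. To make the grow-and-copy behaviour of such a buffer concrete, here is a simplified, hypothetical growable buffer (not Flink's code; the class `GrowableOutputBuffer` and its methods are made up). It assumes, and this should be checked against the source of your Flink version, that a record is serialized into a plain heap `byte[]` that doubles (or grows to exactly the needed size) when a write does not fit, copying everything written so far on each resize.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/**
 * Hypothetical, simplified growable output buffer, loosely modeled on the idea
 * behind Flink's DataOutputSerializer (this is NOT Flink's code).
 */
final class GrowableOutputBuffer {
    // Largest array size we allow ourselves to request (some JVMs reserve header words).
    private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

    private byte[] buffer;
    private int position;

    GrowableOutputBuffer(int initialSize) {
        if (initialSize < 1) {
            throw new IllegalArgumentException("initialSize must be positive");
        }
        this.buffer = new byte[initialSize];
    }

    /** Grows the backing array to max(2 * capacity, needed) when `extra` bytes do not fit. */
    private void ensureCapacity(int extra) {
        long needed = (long) position + extra;
        if (needed <= buffer.length) {
            return;
        }
        if (needed > MAX_ARRAY_SIZE) {
            throw new IllegalStateException("record does not fit into a single byte[]");
        }
        // Doubling keeps the number of resizes logarithmic, but every resize allocates
        // a new heap array and copies all bytes written so far.
        long newLength = Math.min(Math.max(2L * buffer.length, needed), MAX_ARRAY_SIZE);
        buffer = Arrays.copyOf(buffer, (int) newLength);
    }

    void write(byte[] bytes) {
        ensureCapacity(bytes.length);
        System.arraycopy(bytes, 0, buffer, position, bytes.length);
        position += bytes.length;
    }

    /** Writes an int in big-endian order, like DataOutput.writeInt. */
    void writeInt(int v) {
        ensureCapacity(4);
        buffer[position++] = (byte) (v >>> 24);
        buffer[position++] = (byte) (v >>> 16);
        buffer[position++] = (byte) (v >>> 8);
        buffer[position++] = (byte) v;
    }

    int length() { return position; }

    int capacity() { return buffer.length; }

    byte[] toByteArray() { return Arrays.copyOf(buffer, position); }

    public static void main(String[] args) {
        GrowableOutputBuffer out = new GrowableOutputBuffer(4);
        out.writeInt(42);                                    // fits: capacity stays 4
        out.write("hello".getBytes(StandardCharsets.UTF_8)); // needs 9 > 2 * 4, grows to 9
        out.writeInt(7);                                     // needs 13 < 2 * 9, grows to 18
        // prints "13 bytes written, capacity 18"
        System.out.println(out.length() + " bytes written, capacity " + out.capacity());
    }
}
```

With this policy a single very large record (for example a huge string or collection inside a case class) translates into a few very large heap allocations plus copies, which is one plausible way a serializer can stall under memory pressure; it does not by itself prove that this is what happened here.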
Re: Get which key groups are assigned to an operator
You are right that the best solution would probably be to use different state backends for different operators; I hope it gets implemented at some point. Meanwhile, I'll take a look at the methods in org.apache.flink.runtime.state.KeyGroupRangeAssignment; maybe I can find a workaround that is good enough for me. Thanks, Gerard On Tue, Feb 20, 2018 at 3:56 PM, Stefan Richter <s.rich...@data-artisans.com > wrote: > Hi, > > ok, now I understand your goal a bit better. I would still like to point > out that it may take more work than it seems. Just to name one > example, you probably also want to support asynchronous snapshots, which is > most likely difficult when using just a hashmap. I think the proper > solution for you (and also something that we are considering supporting in > the future) is that different backends could be supported for different > operators in a job. But that is currently not possible. I still want to > answer your other question: you could currently compute everything about > key groups and their assignment to operators by using the methods > from org.apache.flink.runtime.state.KeyGroupRangeAssignment. > > Best, > Stefan > > > On 20.02.2018 at 14:52, Gerard Garcia <ger...@talaia.io> wrote: > > Hi Stefan, thanks > > Yes, we are also using keyed state in other operators. The problem is that > serialization is quite expensive, and in some of them we would prefer to > avoid it by storing the state in memory (for our use case, one specific > operator with in-memory state gives at least a 30% throughput improvement). > When we are not operating on a keyed stream it is easy: basically all the > operators have the same in-memory state. What we would like to do is the > same, but when we are operating on a keyed stream. Does it make more sense > now? 
> > We are using RocksDB as the state backend and, as far as I know, elements always > get serialized when stored in the state, and I'm not sure whether there is > even some disk access (maybe not synchronous) that could hurt performance. > > Gerard > > On Tue, Feb 20, 2018 at 2:42 PM, Stefan Richter < > s.rich...@data-artisans.com> wrote: > >> Hi, >> >> from what I read, I get the impression that you are attempting to implement your >> own "keyed state" with a hashmap? Why not use the keyed state that is >> already provided by Flink and gives you efficient rescaling etc. out of the >> box? Please see [1] for the details. >> >> [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/state/state.html#using-managed-keyed-state >> >> On 20.02.2018 at 13:44, gerardg <ger...@talaia.io> wrote: >> >> Hello, >> >> To improve performance we have "keyed state" in the operator's memory: >> basically we keep a Map that contains the state for each of the keys. The >> problem comes when we want to restore the state after a failure or after >> rescaling the operator. What we are doing is sending the concatenation of >> all the state to every operator using a union redistribution, and then we >> restore the "in-memory state" every time we see a new key. Then, after a >> while, we just clear the redistributed state. This is somewhat complex and >> error-prone, so we would like to find an alternative way of doing this. >> >> As far as I know, Flink knows which keys belong to each operator >> (distributing key groups), so I guess it would be possible to somehow >> calculate the key id from each of the stored keys and restore the in-memory >> state at once if we could access the key group mapping. Is that >> possible? We could patch Flink if necessary to access that information. >> >> Thanks, >> >> Gerard >> >> >> >> -- >> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ >> >> >> > >
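Stefan points to `org.apache.flink.runtime.state.KeyGroupRangeAssignment`. As far as we understand it, the mapping has two steps: a key is hashed into one of `maxParallelism` key groups (a murmur-style mix of `key.hashCode()`, modulo `maxParallelism`), and each of the `parallelism` operator instances owns one contiguous range of key groups. The standalone Java below re-implements those formulas for illustration only. The hash is our reconstruction of Flink's `MathUtils.murmurHash`, so in real code prefer calling `KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, parallelism)` and `KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(maxParallelism, parallelism, operatorIndex)` from your Flink version. The class name `KeyGroups` and the example settings are hypothetical.

```java
/**
 * Standalone re-implementation (illustration only) of the key-group formulas
 * we believe Flink's KeyGroupRangeAssignment uses. Verify against your version.
 */
final class KeyGroups {

    /** Murmur-style scramble of a hash code; our reconstruction of MathUtils.murmurHash. */
    static int murmurHash(int code) {
        code *= 0xcc9e2d51;
        code = Integer.rotateLeft(code, 15);
        code *= 0x1b873593;
        code = Integer.rotateLeft(code, 13);
        code = code * 5 + 0xe6546b64;
        code ^= 4;
        // Final avalanche (fmix32-style bit mixing).
        code ^= code >>> 16;
        code *= 0x85ebca6b;
        code ^= code >>> 13;
        code *= 0xc2b2ae35;
        code ^= code >>> 16;
        // Map to a non-negative value.
        if (code >= 0) {
            return code;
        }
        return code != Integer.MIN_VALUE ? -code : 0;
    }

    /** Key group of a key: scrambled key.hashCode() modulo maxParallelism. */
    static int keyGroupFor(Object key, int maxParallelism) {
        return murmurHash(key.hashCode()) % maxParallelism;
    }

    /** Index of the parallel operator instance that owns a key group. */
    static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    /** Inclusive {start, end} range of key groups owned by one operator instance. */
    static int[] keyGroupRange(int maxParallelism, int parallelism, int operatorIndex) {
        int start = (operatorIndex * maxParallelism + parallelism - 1) / parallelism;
        int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // hypothetical job settings
        int parallelism = 3;
        for (int i = 0; i < parallelism; i++) {
            int[] r = keyGroupRange(maxParallelism, parallelism, i);
            // prints 0..42, 43..85 and 86..127 for operators 0, 1 and 2
            System.out.println("operator " + i + " owns key groups " + r[0] + ".." + r[1]);
        }
        String key = "some-key"; // hypothetical key
        int kg = keyGroupFor(key, maxParallelism);
        System.out.println(key + " -> key group " + kg
                + " -> operator " + operatorIndexFor(kg, maxParallelism, parallelism));
    }
}
```

Because the ranges are contiguous and computed only from `maxParallelism`, `parallelism` and the operator index, a subtask can decide locally whether a restored key belongs to it, which is the kind of information asked for above.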
Re: Get which key groups are assigned to an operator
Hi Stefan, thanks. Yes, we are also using keyed state in other operators. The problem is that serialization is quite expensive, and in some of them we would prefer to avoid it by storing the state in memory (for our use case, one specific operator with in-memory state gives at least a 30% throughput improvement). When we are not operating on a keyed stream it is easy: basically all the operators have the same in-memory state. What we would like to do is the same, but when we are operating on a keyed stream. Does it make more sense now? We are using RocksDB as the state backend and, as far as I know, elements always get serialized when stored in the state, and I'm not sure whether there is even some disk access (maybe not synchronous) that could hurt performance. Gerard On Tue, Feb 20, 2018 at 2:42 PM, Stefan Richter wrote: > Hi, > > from what I read, I get the impression that you are attempting to implement your > own "keyed state" with a hashmap? Why not use the keyed state that is > already provided by Flink and gives you efficient rescaling etc. out of the > box? Please see [1] for the details. > > [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/state/state.html#using-managed-keyed-state > > On 20.02.2018 at 13:44, gerardg wrote: > > Hello, > > To improve performance we have "keyed state" in the operator's memory: > basically we keep a Map that contains the state for each of the keys. The > problem comes when we want to restore the state after a failure or after > rescaling the operator. What we are doing is sending the concatenation of > all the state to every operator using a union redistribution, and then we > restore the "in-memory state" every time we see a new key. Then, after a > while, we just clear the redistributed state. This is somewhat complex and > error-prone, so we would like to find an alternative way of doing this.
> > As far as I know, Flink knows which keys belong to each operator > (distributing key groups), so I guess it would be possible to somehow > calculate the key id from each of the stored keys and restore the in-memory > state at once if we could access the key group mapping. Is that > possible? We could patch Flink if necessary to access that information. > > Thanks, > > Gerard > > > > -- > Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ > > >
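The restore procedure described in this message (every subtask receives the union of all entries and lazily picks up the keys it sees) could instead be done eagerly, once ownership can be computed per key. A minimal framework-free sketch under that assumption follows; `InMemoryStateRestore`, `restoreOwned` and the `ownsKey` predicate are hypothetical names. In a Flink job the entries would presumably come from union list state, and the predicate would compare the key's target subtask (derived from its key group, the operator's maxParallelism and its parallelism) with the index of the current subtask. That only works if the value being hashed is exactly the key the stream is keyed by.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/** Hypothetical helper: eagerly rebuild a subtask's in-memory map from union-redistributed entries. */
final class InMemoryStateRestore {

    /**
     * Keeps only the entries whose key this subtask owns. If the same key appears
     * more than once, the entry that comes last in the list wins.
     */
    static <K, V> Map<K, V> restoreOwned(List<Map.Entry<K, V>> unionEntries, Predicate<K> ownsKey) {
        Map<K, V> state = new HashMap<>();
        for (Map.Entry<K, V> entry : unionEntries) {
            if (ownsKey.test(entry.getKey())) {
                state.put(entry.getKey(), entry.getValue());
            }
        }
        return state;
    }

    public static void main(String[] args) {
        // Hypothetical union state: the concatenation of every previous subtask's map.
        List<Map.Entry<String, Long>> union = Arrays.asList(
                new SimpleEntry<String, Long>("a", 1L),
                new SimpleEntry<String, Long>("b", 2L),
                new SimpleEntry<String, Long>("c", 3L));
        // Hypothetical ownership rule; in a job this would compare the key's target
        // subtask (derived from its key group) with this subtask's index.
        Predicate<String> ownsKey = key -> !key.equals("b");
        Map<String, Long> restored = restoreOwned(union, ownsKey);
        System.out.println(restored.size() + " entries restored"); // prints "2 entries restored"
    }
}
```

Doing the filtering once at restore time means the redistributed copy can be dropped immediately instead of being kept around "for a while", which removes the lazy bookkeeping the message describes as complex and error-prone.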
Re: Kafka topic partition skewness causes watermark not being emitted
Thanks Gordon. Don't worry, I'll be careful not to have empty partitions until the next release. Also, I'll keep an eye on FLINK-5479, and if at some point I see that there is a fix and the issue bothers us too much, I'll try to apply the patch myself to the latest stable release. Gerard On Wed, Dec 13, 2017 at 10:31 AM, Tzu-Li (Gordon) Tai wrote: > Hi, > > I've just elevated FLINK-5479 to BLOCKER for 1.5. > > Unfortunately, AFAIK there is no easy workaround for this issue > in the releases so far. > The min watermark logic that controls per-partition watermark emission is > hidden inside the consumer, making it hard to work around. > > One possible solution I can imagine, though perhaps not that trivial, is to > inject some special marker event into all partitions periodically. > The watermark assigner should be able to recognize this special marker and > try to provide some watermark for it. > Another option is that I can provide a patch you can apply for a custom > build of the Kafka connector that handles partition idleness properly. > However, given that we're aiming for a faster release cycle for Flink 1.5 > (proposed release date is Feb. 2018), it might not be worth the extra > effort of maintaining a custom build on your side. > > Best, > Gordon > > > On Tue, Dec 12, 2017 at 9:28 PM, gerardg wrote: > >> I'm also affected by this behavior. There are no updates in FLINK-5479, but >> did you manage to find a way to work around this? >> >> Thanks, >> >> Gerard >> >> >> >> -- >> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ >> > >
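Gordon explains that the consumer's watermark logic takes the minimum over per-partition watermarks, which is why an empty partition blocks progress, and suggests injecting periodic marker events. The framework-free model below illustrates both points under our own assumptions: the class and method names are hypothetical, each partition's watermark is assumed to be "highest timestamp seen minus a fixed out-of-orderness bound", and in a real job the marker handling would live in the timestamp/watermark assigner rather than in a class like this.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of "emit the minimum per-partition watermark" (not the Kafka connector's code). */
final class PartitionWatermarks {
    private final Map<Integer, Long> perPartition = new HashMap<>();

    PartitionWatermarks(int numPartitions) {
        // A partition that has not produced anything yet pins the minimum at Long.MIN_VALUE.
        for (int p = 0; p < numPartitions; p++) {
            perPartition.put(p, Long.MIN_VALUE);
        }
    }

    /** Regular record: bounded-out-of-orderness watermark that never moves backwards. */
    void onRecord(int partition, long timestamp, long maxOutOfOrderness) {
        perPartition.merge(partition, timestamp - maxOutOfOrderness, Long::max);
    }

    /** Hypothetical periodic marker event: lets an otherwise idle partition advance. */
    void onMarker(int partition, long markerTimestamp) {
        perPartition.merge(partition, markerTimestamp, Long::max);
    }

    /** What the source would emit downstream: the minimum over all partitions. */
    long currentWatermark() {
        long min = Long.MAX_VALUE;
        for (long wm : perPartition.values()) {
            min = Math.min(min, wm);
        }
        return min;
    }

    public static void main(String[] args) {
        PartitionWatermarks wms = new PartitionWatermarks(2);
        wms.onRecord(0, 10_000L, 1_000L);
        // Partition 1 is empty, so the combined watermark cannot advance and no window fires.
        System.out.println(wms.currentWatermark() == Long.MIN_VALUE); // prints "true"
        wms.onMarker(1, 9_500L);
        System.out.println(wms.currentWatermark()); // prints "9000"
    }
}
```

The marker only helps if its timestamp is trustworthy (for example, close to wall-clock time when the partition is genuinely idle); a marker that runs ahead of real data in that partition would make later records late.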
Re: Missing checkpoint when restarting failed job
I've been monitoring the task and checkpoint 1 never gets deleted. Right now we have: chk-1 chk-1222 chk-326 chk-329 chk-357 chk-358 chk-8945 chk-8999 chk-9525 chk-9788 chk-9789 chk-9790 chk-9791 I made the task fail and it recovered without problems, so for now I would say that the problem was with the distributed file system, or that the chk-1 folder was somehow deleted by something external to Flink. If I see the problem again, I will try to get more information. Thanks, Gerard On Tue, Nov 21, 2017 at 4:27 PM, Stefan Richter wrote: > Ok, thanks for trying to reproduce this. If possible, could you also > activate trace-level logging for class > org.apache.flink.runtime.state.SharedStateRegistry? > In case the problem occurs, this would greatly help to understand what was > going on. > > > On 21.11.2017 at 15:16, gerardg wrote: > > > >> Where exactly did you read many times that incremental checkpoints > cannot > > reference files from previous > >> checkpoints? We would have to correct that information. In fact, > >> this is how incremental checkpoints work. > > > > My fault: I read it in some other posts on the mailing list, but now that > I > > read them carefully, they meant savepoints, not checkpoints. > > > >> Now for this case, I would consider it extremely unlikely that a > >> checkpoint 1620 would still reference a checkpoint 1, > >> in particular if the files for that checkpoint are already deleted, > which > >> should only happen if it is no longer > >> referenced. Which version of Flink are you using and what is your > >> distributed filesystem? Is there any way to > >> reproduce the problem? > > > > We are using Flink version 1.3.2 and GlusterFS. There are usually a few > > checkpoints around at the same time, for example right now: > > > > chk-1 chk-26 chk-27 chk-28 chk-29 chk-30 chk-31 > > > > I'm not sure how to reproduce the problem, but I'll monitor the folder to > see > > when chk-1 gets deleted and try to make the task fail when that happens.
> > > > Gerard > > > > > > > > > > -- > > Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ > >
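Stefan's point that incremental checkpoints may reference files written by earlier checkpoints, and that such files are deleted only once nothing references them any more, is essentially reference counting. The toy registry below illustrates that idea; it is not Flink's `SharedStateRegistry`, and the class name, method names and file names are hypothetical. It shows how a file created by checkpoint 1 can outlive checkpoint 1 itself while a later incremental checkpoint still uses it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Toy reference-counting registry for files shared by incremental checkpoints (not Flink's code). */
final class SharedFileRegistry {
    private final Map<String, Integer> refCounts = new HashMap<>();
    private final Map<Long, Set<String>> filesByCheckpoint = new HashMap<>();
    private final List<String> deleted = new ArrayList<>();

    static Set<String> files(String... names) {
        return new HashSet<>(Arrays.asList(names));
    }

    /** Registers a completed checkpoint and bumps the count of every file it references. */
    void registerCheckpoint(long checkpointId, Set<String> referencedFiles) {
        filesByCheckpoint.put(checkpointId, referencedFiles);
        for (String file : referencedFiles) {
            refCounts.merge(file, 1, Integer::sum);
        }
    }

    /** Discards a checkpoint; a file is deleted only when no remaining checkpoint references it. */
    void discardCheckpoint(long checkpointId) {
        Set<String> referencedFiles = filesByCheckpoint.remove(checkpointId);
        if (referencedFiles == null) {
            return; // unknown or already discarded
        }
        for (String file : referencedFiles) {
            int remaining = refCounts.merge(file, -1, Integer::sum);
            if (remaining == 0) {
                refCounts.remove(file);
                deleted.add(file); // a real system would delete the file from storage here
            }
        }
    }

    List<String> deletedFiles() {
        return deleted;
    }

    public static void main(String[] args) {
        SharedFileRegistry registry = new SharedFileRegistry();
        registry.registerCheckpoint(1L, files("a.sst", "b.sst"));
        // Checkpoint 2 is incremental: it re-uses a.sst from checkpoint 1 and adds c.sst.
        registry.registerCheckpoint(2L, files("a.sst", "c.sst"));
        registry.discardCheckpoint(1L);
        System.out.println(registry.deletedFiles()); // prints "[b.sst]"
    }
}
```

In this model a file disappears while still referenced only if the counts are wrong or something outside the registry deletes it, which is why trace-level logs of the registry's register/unregister decisions are useful when a referenced file goes missing.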