why does BlobServer use ServerSocket instead of Netty's ServerBootstrap?
After checking the code, I found that BlobServer uses ServerSocket instead of Netty's ServerBootstrap. I wonder why, and is it OK to migrate to ServerBootstrap? -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
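For comparison, here is a minimal sketch of the blocking, thread-per-connection pattern that a plain ServerSocket leads to. It is a generic echo server written for illustration (the class name and the echo protocol are made up), not BlobServer's code, although as far as I can tell BlobServer has the same overall shape: an accept loop that hands each connection to its own thread.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;

/** Minimal blocking, thread-per-connection echo server (illustration only, not BlobServer). */
public class BlockingEchoServer implements Runnable, Closeable {
    private final ServerSocket serverSocket;
    private volatile boolean running = true;

    public BlockingEchoServer(int port) throws IOException {
        this.serverSocket = new ServerSocket(port); // port 0 lets the OS pick a free port
    }

    public int getPort() {
        return serverSocket.getLocalPort();
    }

    @Override
    public void run() {
        while (running) {
            try {
                Socket client = serverSocket.accept(); // blocks until a client connects
                Thread handler = new Thread(() -> handle(client), "connection-handler");
                handler.setDaemon(true);
                handler.start(); // one dedicated thread per connection
            } catch (IOException e) {
                // close() makes accept() throw; only report errors that happen while still running
                if (running) {
                    e.printStackTrace();
                }
            }
        }
    }

    private static void handle(Socket client) {
        try (Socket s = client;
             InputStream in = s.getInputStream();
             OutputStream out = s.getOutputStream()) {
            byte[] buffer = new byte[8192];
            int n;
            while ((n = in.read(buffer)) != -1) { // blocking read on this connection's own thread
                out.write(buffer, 0, n);
            }
        } catch (IOException ignored) {
            // client went away; nothing else to clean up in this sketch
        }
    }

    @Override
    public void close() throws IOException {
        running = false;
        serverSocket.close(); // unblocks accept()
    }
}
```

One plausible trade-off, offered as a guess rather than the project's stated reason: for a small number of long bulk transfers (such as jar files), a blocking thread per connection is simple and fast, while Netty's ServerBootstrap mainly pays off with many concurrent connections or when event loops are shared with other components.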
Re: is there a config to make the TaskManager keep retrying to connect to the JobManager after it is disassociated?
Can anybody share any thoughts or insights about this issue?
Re: some default config questions
Thanks.
Re: why does the Flink release package prefer an uber jar over small jars?
Thanks, Rong Rong, but it seems irrelevant.
why does the Flink release package prefer an uber jar over small jars?
What benefit does an uber jar have that small jars don't?
Re: is there a config to make the TaskManager keep retrying to connect to the JobManager after it is disassociated?
So is there a way or a config option to make the TaskManager keep trying to connect to the JobManager?
Re: is there a config to make the TaskManager keep retrying to connect to the JobManager after it is disassociated?
When I debug the JobManager, below is the error log in the TaskManager:
2018-06-04 17:16:33,295 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - The heartbeat of ResourceManager with id 35df0455efc2fb6fa3f2467f7f5d2ba1 timed out.
2018-06-04 17:16:33,297 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor - Close ResourceManager connection 35df0455efc2fb6fa3f2467f7f5d2ba1.
java.util.concurrent.TimeoutException: The heartbeat of ResourceManager with id 35df0455efc2fb6fa3f2467f7f5d2ba1 timed out.
    at org.apache.flink.runtime.taskexecutor.TaskExecutor$ResourceManagerHeartbeatListener.lambda$notifyHeartbeatTimeout$0(TaskExecutor.java:1553)
    at org.apache.flink.runtime.taskexecutor.TaskExecutor$ResourceManagerHeartbeatListener$$Lambda$26/1975100911.run(Unknown Source)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:295)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:150)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$$Lambda$12/1732386307.apply(Unknown Source)
    at akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
    at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
is there a config to make the TaskManager keep retrying to connect to the JobManager after it is disassociated?
I deployed a standalone cluster with Flink 1.5, and when I tried to restart the only JobManager, the TaskManager printed the log below:
2018-06-04 12:06:35,882 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@ipaddress:6123] has failed, address is now gated for [50] ms. Reason: [Disassociated]
2018-06-04 12:07:17,580 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - The heartbeat of ResourceManager with id 6af9bbb514a6ddaeca95d6e52db6dbd5 timed out.
2018-06-04 12:07:17,580 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Close ResourceManager connection 6af9bbb514a6ddaeca95d6e52db6dbd5.
2018-06-04 12:07:17,611 WARN akka.remote.transport.netty.NettyTransport - Remote connection to [null] failed with java.net.ConnectException: Connection refused: /ipaddress:6123
2018-06-04 12:07:17,611 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@ipaddress:6123] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink@ipaddress:6123]] Caused by: [Connection refused: /ipaddress:6123]
So I'd like to know whether there is a config option to make the TaskManager keep retrying to connect to the JobManager (since I am restarting the JobManager, it will come back later).
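Independent of which option Flink exposes, the behaviour being asked for is "retry with capped exponential backoff until a deadline". A minimal sketch with hypothetical names (BackoffRetry, delays, retry), where the connection attempt is abstracted as a BooleanSupplier:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;

/** Hypothetical helper: retry an action with capped exponential backoff. */
public final class BackoffRetry {

    /** Delays (ms) before each retry: initial, 2 * initial, ... capped at max. */
    public static List<Long> delays(long initialMs, long maxMs, int attempts) {
        List<Long> out = new ArrayList<>();
        long d = Math.min(initialMs, maxMs);
        for (int i = 0; i < attempts; i++) {
            out.add(d);
            d = Math.min(d * 2, maxMs);
        }
        return out;
    }

    /**
     * Calls {@code connect} until it returns true and returns the number of attempts.
     * Gives up once the next wait would pass the deadline; a negative timeout means retry forever.
     */
    public static int retry(BooleanSupplier connect, long initialMs, long maxMs, long timeoutMs)
            throws InterruptedException {
        long deadline = timeoutMs < 0 ? Long.MAX_VALUE : System.currentTimeMillis() + timeoutMs;
        long delay = Math.min(initialMs, maxMs);
        int attempt = 0;
        while (true) {
            attempt++;
            if (connect.getAsBoolean()) {
                return attempt;
            }
            if (System.currentTimeMillis() + delay > deadline) {
                throw new IllegalStateException("gave up after " + attempt + " attempts");
            }
            Thread.sleep(delay);
            delay = Math.min(delay * 2, maxMs); // double the wait, but never beyond maxMs
        }
    }
}
```

As far as I know, the TaskManager's registration already follows a similar backoff scheme, with options such as taskmanager.registration.initial-backoff, taskmanager.registration.max-backoff and taskmanager.registration.timeout; please verify the names and defaults against the configuration page of your Flink version.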
Re: is it OK to open a PR just to fix errors in code comments?
Got it. Thanks, guys.
some default config questions
Why is the default value of jobmanager.execution.failover-strategy "full" rather than "individual"? Why is the default value of taskmanager.jvm-exit-on-oom false rather than true? The code is Flink 1.5.
is it OK to open a PR just to fix errors in code comments?
I am not sure whether it is OK to open a PR just to fix an error in a code comment; I found one while reading the code.
Re: Task did not exit gracefully and lost TaskManager
I met the same problem in 1.4: when I cancel a job, one of the TaskManagers keeps logging the exception.
Re: latency critical job
Rong Rong: my Flink version is 1.4.2. Since we are running in a Docker environment that shares disk I/O, we have observed that disk I/O spikes caused by other processes on the same physical machine can make operator processing take a long time.
Re: latency critical job
Timo: thanks for your suggestion.
does Flink call a full GC to reclaim direct memory mainly occupied by RocksDB?
Each time a job is cancelled, does Flink call a full GC to reclaim the direct memory mainly occupied by RocksDB? If so, where does it do this? If not, why not?
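To check whether direct memory is actually released (for example, before and after cancelling a job), the JVM's "direct" buffer pool can be read through the standard BufferPoolMXBean. A minimal sketch; the class name DirectMemoryProbe is made up:

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;

/** Reads the JVM's "direct" buffer pool statistics (class name is made up). */
public final class DirectMemoryProbe {

    /** Returns the platform bean for the "direct" pool, or null if this JVM does not expose one. */
    public static BufferPoolMXBean directPool() {
        for (BufferPoolMXBean pool : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            if ("direct".equals(pool.getName())) {
                return pool;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        BufferPoolMXBean pool = directPool();
        if (pool == null) {
            System.out.println("no direct buffer pool bean available");
            return;
        }
        long before = pool.getTotalCapacity();
        ByteBuffer buffer = ByteBuffer.allocateDirect(16 * 1024 * 1024); // 16 MiB off-heap
        System.out.printf("direct capacity: %d -> %d bytes (%d buffers)%n",
                before, pool.getTotalCapacity(), pool.getCount());
        buffer = null; // the off-heap memory is freed only after the buffer object is garbage-collected
        System.gc();   // only a hint; ignored entirely with -XX:+DisableExplicitGC
        System.out.printf("after System.gc(): %d bytes%n", pool.getTotalCapacity());
    }
}
```

Keep in mind that this pool only tracks memory allocated through ByteBuffer.allocateDirect; as far as I know, memory that RocksDB allocates through its own native allocator is not counted here.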
latency critical job
Some jobs are latency-critical, which means they cannot tolerate latency above a certain threshold. So will Flink provide a timeout operator in the near future? That is, when an operator times out, the JobManager would schedule a new operator instance that starts from the operator's previous state, keeps processing new events, and discards the events that were being processed.
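The behaviour described above can be illustrated in miniature inside a single JVM. This is a hypothetical sketch (TimeoutGuardedOperator, process and the string-keyed counter state are made up), not a Flink API: each event runs on a worker thread against a copy of the last committed state; if it misses the deadline, the copy is thrown away, the event is discarded, and processing continues on a fresh worker, which plays the role of the "new operator" restarted from the previous state.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;

/** Hypothetical sketch: per-event deadline; on timeout, drop the event and continue from the last good state. */
public final class TimeoutGuardedOperator implements AutoCloseable {
    private final long timeoutMillis;
    private final BiConsumer<Map<String, Long>, String> logic; // user function: mutates state for one event
    private ExecutorService worker = newWorker();
    private Map<String, Long> state = new HashMap<>();
    private int discarded = 0;

    public TimeoutGuardedOperator(long timeoutMillis, BiConsumer<Map<String, Long>, String> logic) {
        this.timeoutMillis = timeoutMillis;
        this.logic = logic;
    }

    private static ExecutorService newWorker() {
        return Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "operator-worker");
            t.setDaemon(true); // an abandoned, stuck worker must not keep the JVM alive
            return t;
        });
    }

    /** Returns true if the event was applied, false if it timed out and was discarded. */
    public boolean process(String event) {
        Map<String, Long> working = new HashMap<>(state); // work on a copy of the last good state
        Future<?> result = worker.submit(() -> logic.accept(working, event));
        try {
            result.get(timeoutMillis, TimeUnit.MILLISECONDS);
            state = working; // commit
            return true;
        } catch (TimeoutException e) {
            result.cancel(true);  // interrupt the slow call
            worker.shutdownNow(); // abandon the stuck worker ...
            worker = newWorker(); // ... and continue on a fresh one, from the old state
            discarded++;
            return false;
        } catch (ExecutionException e) {
            throw new IllegalStateException("operator failed", e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    public Map<String, Long> state() { return Collections.unmodifiableMap(state); }

    public int discarded() { return discarded; }

    @Override
    public void close() { worker.shutdownNow(); }
}
```

One limitation worth noting: Java cannot forcibly stop a thread, so cancel(true) only interrupts it; user code that ignores interrupts keeps consuming CPU on the abandoned worker even though its result is never used.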
chained operator with different parallelism question
someStream.filter(...).map(...).map(...); these operators are supposed to be chained. But what if they are given different parallelisms, like below: someStream.filter(...).setParallelism(X).map(...).setParallelism(Y).map(...).setParallelism(Z); where X != Y != Z. What will happen?
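As far as I know, Flink only chains two operators when, among other conditions, they are connected by a forward edge and have the same parallelism (see StreamingJobGraphGenerator.isChainable); when the parallelism differs and no partitioning is set explicitly, the edge becomes a rebalance. So with X != Y != Z one would expect three separate tasks. The sketch below is a simplified model (ChainingModel and Op are made-up names) that checks only these two conditions and ignores the others, such as slot sharing groups and chaining strategies:

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of operator chaining in a linear pipeline (not Flink's actual implementation). */
public final class ChainingModel {

    /** One operator in the pipeline with its configured parallelism. */
    public static final class Op {
        final String name;
        final int parallelism;

        public Op(String name, int parallelism) {
            this.name = name;
            this.parallelism = parallelism;
        }
    }

    /**
     * Groups consecutive operators into chains. In this model, an edge between operators with equal
     * parallelism is a forward edge (chainable); with different parallelism it is a redistributing
     * edge (e.g. rebalance), which always starts a new chain.
     */
    public static List<List<String>> chains(List<Op> pipeline) {
        List<List<String>> result = new ArrayList<>();
        List<String> current = new ArrayList<>();
        Op previous = null;
        for (Op op : pipeline) {
            boolean forwardEdge = previous != null && previous.parallelism == op.parallelism;
            if (!forwardEdge && !current.isEmpty()) {
                result.add(current); // close the previous chain
                current = new ArrayList<>();
            }
            current.add(op.name);
            previous = op;
        }
        if (!current.isEmpty()) {
            result.add(current);
        }
        return result;
    }
}
```

With parallelisms (4, 4, 4) the model yields one chain, with (2, 3, 4) three chains, and with (2, 2, 3) the filter and first map share a chain while the second map runs separately.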
Re: Fwd: Decrease initial source read speed
Andrei Shumanski: which source are you using?
are there any ways to test the performance of the RocksDB state backend?
I'd like to integrate a newer version of RocksDB with Flink. Are there existing tools or ways to benchmark the performance of the RocksDB state backend, to see whether performance improves or drops? MaKeyang TIG.JD.COM
in order to use epoll (taskmanager.network.netty.transport: epoll), is it required that the Linux version is 4.0.16 or newer?
My Flink version is 1.4.2, my JDK version is 1.8.0.20, and my Linux version is 3.10.0.
I tried to use epoll with the setting taskmanager.network.netty.transport: epoll, but it throws an exception, which led me here: https://github.com/apache/flink-shaded/issues/30
I followed the instructions there, and the exception is still thrown. Then I found the official Netty documentation on native transports: http://netty.io/wiki/native-transports.html
It says: "Netty provides the following platform specific JNI transports: Linux (since 4.0.16)".
So my question is: in order to use epoll, is it required that the Linux version is 4.0.16 or newer?
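One note on the quoted line: on Netty's native-transports page, "Linux (since 4.0.16)" most likely refers to the Netty release that first shipped the native epoll transport rather than to a Linux kernel version, so it is worth checking both the bundled Netty version and the kernel version. A small, hypothetical helper (VersionCheck is a made-up name) for comparing dotted version strings such as os.version values ("3.10.0-514.el7.x86_64") or Netty-style versions ("4.0.27.Final"):

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper for comparing dotted version strings. */
public final class VersionCheck {

    /** Parses leading numeric components, e.g. "3.10.0-514.el7.x86_64" becomes [3, 10, 0]. */
    static List<Integer> parse(String version) {
        List<Integer> out = new ArrayList<>();
        for (String part : version.split("\\.")) {
            int i = 0;
            while (i < part.length() && Character.isDigit(part.charAt(i))) {
                i++;
            }
            if (i == 0) {
                break; // e.g. "Final" or "el7": no more numeric components
            }
            out.add(Integer.parseInt(part.substring(0, i)));
            if (i < part.length()) {
                break; // e.g. "0-514": keep 0, ignore the suffix and everything after it
            }
        }
        return out;
    }

    /** Negative, zero or positive like Comparator#compare; missing components count as 0. */
    public static int compare(String a, String b) {
        List<Integer> x = parse(a);
        List<Integer> y = parse(b);
        for (int i = 0; i < Math.max(x.size(), y.size()); i++) {
            int xi = i < x.size() ? x.get(i) : 0;
            int yi = i < y.size() ? y.get(i) : 0;
            if (xi != yi) {
                return Integer.compare(xi, yi);
            }
        }
        return 0;
    }

    public static boolean atLeast(String actual, String required) {
        return compare(actual, required) >= 0;
    }

    public static void main(String[] args) {
        // Prints the OS name and kernel version as reported by the JVM.
        System.out.println(System.getProperty("os.name") + " " + System.getProperty("os.version"));
    }
}
```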
gonna need more logs when task manager is shutting down
One of my TaskManagers dropped out of the cluster. I checked its log and found the following:
2018-04-19 22:34:47,441 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally Process (115/120) (19d0b0ce1ef3b8023b37bdfda643ef44).
2018-04-19 22:34:47,441 INFO org.apache.flink.runtime.taskmanager.Task - Process (115/120) (19d0b0ce1ef3b8023b37bdfda643ef44) switched from RUNNING to FAILED.
java.lang.Exception: TaskManager is shutting down.
    at org.apache.flink.runtime.taskmanager.TaskManager.postStop(TaskManager.scala:220)
    at akka.actor.Actor$class.aroundPostStop(Actor.scala:515)
    at org.apache.flink.runtime.taskmanager.TaskManager.aroundPostStop(TaskManager.scala:121)
    at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
    at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
    at akka.actor.ActorCell.terminate(ActorCell.scala:374)
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:467)
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483)
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:260)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Well, "Attempting to fail task externally Process" due to what? The TaskManager is shutting down, but due to what? This important information is not in the log, and it would actually be very useful.
Re: Slow flink checkpoint
Since Flink Forward SF is over, could you spare a few minutes to look at this issue and share some thoughts on it? Could you help review or comment on my design, or give us a design so that I can help implement it? Thanks a lot.
heartbeat.timeout in the 1.4 documentation
In the Flink 1.4 code: HeartbeatManagerOptions HEARTBEAT_TIMEOUT = key("heartbeat.timeout").defaultValue(5L); but this config option is not listed in https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/config.html
Re: Slow flink checkpoint
The test is very promising. The time spent in the synchronous part dropped from a couple of seconds to a couple of milliseconds, a 1000x reduction (the overall time is not saved, since the work just moves from the synchronous part to the asynchronous part). Are you interested in this change?
Re: Slow flink checkpoint
I have put a lot of effort into this issue and tried to resolve it:
1. Let me first describe the current snapshot path for timers:
   a) For each key group, InternalTimeServiceManager.snapshotStateForKeyGroup is invoked.
   b) InternalTimeServiceManager creates an InternalTimerServiceSerializationProxy to write the snapshot.
   c) InternalTimerServiceSerializationProxy iterates over the tuples, writes the service name and snapshotTimersForKeyGroup, then gets an InternalTimersSnapshotWriter to call writeTimersSnapshot.
   d) In writeTimersSnapshot of InternalTimersSnapshotWriter, the key serializer and namespace serializer are written first; then the eventTimers and processingTimers of InternalTimersSnapshot, which are Sets of InternalTimer, are serialized.
2. My first try was to shallow-copy the tuples, then shallow-copy eventTimers and processingTimers, and use another thread to snapshot them without blocking the event-processing thread. But it turned out that shallow-copying eventTimers and processingTimers is itself time-consuming, so this solution failed.
3. Then I tried to borrow the idea of the CopyOnWriteStateTable data structure and manage timers with it. But after digging further, I found an easier way to snapshot timers asynchronously, based on one fact: InternalTimer is immutable. Based on this fact:
   a) Maintain a stateTableVersion and snapshotVersions in InternalTimeServiceManager, exactly as CopyOnWriteStateTable does. One more thing: a read-write lock, which protects snapshotVersions and stateTableVersion.
   b) Give each InternalTimer 2 more properties, a create version and a delete version, besides the 3 existing ones: timestamp, key and namespace. Each time a timer is registered in the timer service, it is created with stateTableVersion as its create version and -1 as its delete version. Each time a timer is deleted in the timer service, it is marked deleted by setting its delete version to stateTableVersion, without physically deleting it from the timer service.
   c) Each time timers are snapshotted, InternalTimeServiceManager increases its stateTableVersion and adds this stateTableVersion to snapshotVersions. These 2 operations are protected by the write lock of InternalTimeServiceManager. The current stateTableVersion is taken as the snapshot version of this snapshot.
   d) Shallow-copy the tuples.
   e) Then use another thread to asynchronously snapshot everything: key serializer, namespace serializer and timers. A timer that is not deleted (delete version is -1) and whose create version is less than the snapshot version is serialized. A timer whose delete version is not -1 and is greater than or equal to the snapshot version is also serialized. Otherwise, it is not serialized by this snapshot.
   f) When everything is serialized, the snapshot version is removed from snapshotVersions, still in the other thread; this action is guarded by the write lock.
   g) Last thing: physical deletion of timers. There are 2 places where timers are physically deleted. First, each time a timer is deleted in the timer service, it is marked deleted by giving it a delete version equal to stateTableVersion, without physically deleting it. After this, with the read lock held, check whether the timer's delete version is less than the minimum of snapshotVersions (which means no active timer snapshot needs it); if so, physically delete it. The other place is the snapshot's iteration over timers: when a timer's delete version is less than the minimum of snapshotVersions, the timer is deleted and no running snapshot needs to keep it.
   h) Some more changes: processingTimeTimers and eventTimeTimers for each key group used to be HashSets; now they are ConcurrentHashMaps with key + namespace + timestamp as the hash key.
The code is done and testing is still running.
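The versioning scheme in 3. can be sketched in plain Java. This is a hypothetical, simplified model, not Flink's HeapInternalTimerService or the patch described above: timers are keyed by strings, serialization is replaced by returning the visible timers, and the names (VersionedTimerStore, startSnapshot, collect, finishSnapshot) are made up. Two deliberate deviations: logically deleted timers that a running snapshot may still need are parked in a side queue rather than left in the main map, so re-registering the same key/namespace/timestamp cannot overwrite them; and cleanup happens when a snapshot finishes instead of during the next snapshot's iteration. The visibility check also requires the create version to be below the snapshot version for deleted timers; otherwise a timer both created and deleted after the snapshot started would be included.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical sketch of version-based, copy-free timer snapshots (not Flink code). */
public class VersionedTimerStore {
    /** Immutable timer identity plus create/delete versions. */
    public static final class Timer {
        public final long timestamp, createVersion;
        public final String key, namespace;
        volatile long deleteVersion = -1; // -1 = not deleted

        Timer(long timestamp, String key, String namespace, long createVersion) {
            this.timestamp = timestamp;
            this.key = key;
            this.namespace = namespace;
            this.createVersion = createVersion;
        }
    }

    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    // Live timers, keyed by key + namespace + timestamp as in step h).
    private final ConcurrentHashMap<String, Timer> active = new ConcurrentHashMap<>();
    // Deviation: logically deleted timers still needed by a running snapshot are parked here.
    private final ConcurrentLinkedQueue<Timer> retained = new ConcurrentLinkedQueue<>();
    private final TreeSet<Long> snapshotVersions = new TreeSet<>(); // guarded by lock
    private long stateTableVersion = 0;                             // guarded by lock

    private static String id(long ts, String key, String ns) {
        return key + "|" + ns + "|" + ts;
    }

    public void register(long ts, String key, String ns) {
        lock.readLock().lock();
        try {
            long version = stateTableVersion;
            active.computeIfAbsent(id(ts, key, ns), k -> new Timer(ts, key, ns, version));
        } finally {
            lock.readLock().unlock();
        }
    }

    public void delete(long ts, String key, String ns) {
        lock.readLock().lock();
        try {
            String id = id(ts, key, ns);
            Timer t = active.get(id);
            if (t == null) return;
            t.deleteVersion = stateTableVersion;
            // Park it before unlinking it, so a concurrent snapshot sees it in at least one place.
            if (!snapshotVersions.isEmpty() && t.deleteVersion >= snapshotVersions.first()) retained.add(t);
            active.remove(id, t);
        } finally {
            lock.readLock().unlock();
        }
    }

    /** Synchronous part (step c): bump the version under the write lock; it becomes the snapshot version. */
    public long startSnapshot() {
        lock.writeLock().lock();
        try {
            snapshotVersions.add(++stateTableVersion);
            return stateTableVersion;
        } finally {
            lock.writeLock().unlock();
        }
    }

    /** Asynchronous part (step e): may run on another thread while timers keep changing. */
    public List<Timer> collect(long s) {
        Set<Timer> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        List<Timer> out = new ArrayList<>();
        for (Timer t : active.values()) addIfVisible(t, s, seen, out); // active first, then retained
        for (Timer t : retained) addIfVisible(t, s, seen, out);
        return out;
    }

    private static void addIfVisible(Timer t, long s, Set<Timer> seen, List<Timer> out) {
        long d = t.deleteVersion;
        // Created before the snapshot, and either still alive or deleted after it started.
        if (t.createVersion < s && (d == -1 || d >= s) && seen.add(t)) out.add(t);
    }

    /** Step f) plus cleanup: drop the snapshot version and physically delete timers nobody needs. */
    public void finishSnapshot(long s) {
        lock.writeLock().lock();
        try {
            snapshotVersions.remove(s);
            long min = snapshotVersions.isEmpty() ? Long.MAX_VALUE : snapshotVersions.first();
            retained.removeIf(t -> t.deleteVersion < min);
        } finally {
            lock.writeLock().unlock();
        }
    }

    public int retainedCount() { return retained.size(); }
}
```

In a typical run, a timer deleted after startSnapshot() still appears in collect() for that snapshot, a timer registered after it does not, and finishSnapshot() releases the parked entry.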
I posted this not only to hear your opinions, but also to get answers to some more questions about the current timer snapshot code path. My questions are below:
1. The method onProcessingTime of HeapInternalTimerService is invoked by another thread, the one of ProcessingTimeService, and in this thread it removes timers from HeapInternalTimerService. But in the current timer snapshot path, I haven't found any shallow copy of processingTimeTimers and eventTimeTimers. How does this not cause a ConcurrentModificationException?
2. Since onProcessingTime is triggered in another thread, what if, while timers are being snapshotted in the working thread, a timer fires and triggerTarget is processed, which could change state, and then the asynchronous keyed state snapshot is triggered? Won't this make the state inconsistent? Let's imagine this case: all keyed state is changed only by timers. So add timer1, timer2, timer3, timer4 and timer5, and since no timer is
Re: Restart hook and checkpoint
Currently there is only a time-based way to trigger a checkpoint. Based on this discussion, I think Flink needs to introduce an event-based way to trigger checkpoints; for example, restarting a TaskManager should count as such an event.
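The proposal above can be sketched as a trigger that fires both on a timer and on selected events. This is a hypothetical illustration, not an existing Flink API: HybridCheckpointTrigger, onEvent and the triggerCheckpoint callback are made up, and the minimum pause between checkpoints (similar in spirit to CheckpointConfig#setMinPauseBetweenCheckpoints) is an added assumption that keeps a burst of events, such as several TaskManagers restarting at once, from causing a checkpoint storm.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical sketch: trigger checkpoints periodically and on selected events, with a minimum pause. */
public final class HybridCheckpointTrigger implements AutoCloseable {
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "checkpoint-timer");
        t.setDaemon(true);
        return t;
    });
    private final long minPauseMillis;
    private final Consumer<String> triggerCheckpoint; // stand-in for a call into a checkpoint coordinator
    private boolean hasTriggered = false;             // guarded by this
    private long lastTriggerNanos;                    // guarded by this

    public HybridCheckpointTrigger(long minPauseMillis, Consumer<String> triggerCheckpoint) {
        this.minPauseMillis = minPauseMillis;
        this.triggerCheckpoint = triggerCheckpoint;
    }

    /** Time-based trigger, as today; intervalMillis must be positive. */
    public void start(long intervalMillis) {
        timer.scheduleAtFixedRate(() -> tryTrigger("periodic"), intervalMillis, intervalMillis,
                TimeUnit.MILLISECONDS);
    }

    /** Event-based trigger, e.g. onEvent("TaskManager restarted"). Returns true if a checkpoint was triggered. */
    public boolean onEvent(String event) {
        return tryTrigger("event: " + event);
    }

    private synchronized boolean tryTrigger(String reason) {
        long now = System.nanoTime();
        if (hasTriggered && now - lastTriggerNanos < TimeUnit.MILLISECONDS.toNanos(minPauseMillis)) {
            return false; // too soon after the previous checkpoint
        }
        hasTriggered = true;
        lastTriggerNanos = now;
        triggerCheckpoint.accept(reason);
        return true;
    }

    @Override
    public void close() {
        timer.shutdownNow();
    }
}
```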