[jira] [Commented] (FLINK-15156) Warn user if System.exit() is called in user code
[ https://issues.apache.org/jira/browse/FLINK-15156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17267987#comment-17267987 ]

Hwanju Kim commented on FLINK-15156:
------------------------------------

[~rmetzger] - No problem at all, and thanks a lot for such a thorough review. Good that we have a unified security manager.

> Warn user if System.exit() is called in user code
> -------------------------------------------------
>
>                 Key: FLINK-15156
>                 URL: https://issues.apache.org/jira/browse/FLINK-15156
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Coordination
>            Reporter: Robert Metzger
>            Assignee: Hwanju Kim
>            Priority: Minor
>              Labels: pull-request-available, starter
>             Fix For: 1.13.0
>
> It would make debugging Flink errors easier if we would intercept and log
> calls to System.exit() through the SecurityManager.
> A user recently had an error where the JobManager was shutting down because
> of a System.exit() in the user code:
> https://lists.apache.org/thread.html/b28dabcf3068d489f38399c456c80d48569fcdf74b15f8bb95d532d0%40%3Cuser.flink.apache.org%3E
> If I remember correctly, we had such issues before.
> I put this ticket into the "Runtime / Coordination" component, as it is
> mostly about improving the usability / debuggability in that area.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[jira] [Commented] (FLINK-15156) Warn user if System.exit() is called in user code
[ https://issues.apache.org/jira/browse/FLINK-15156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17216875#comment-17216875 ]

Hwanju Kim commented on FLINK-15156:
------------------------------------

For 3, it is definitely a small effort. I was just curious what use case you have seen that raised the concern. I agree, and I think we should add this option as well. I will rebase the current branch onto the mainline to prepare for the review. For 2, I would leave the current instrumentation points as they are, expecting that additions/removals, which should be easy, can be done during the review.
[jira] [Commented] (FLINK-15156) Warn user if System.exit() is called in user code
[ https://issues.apache.org/jira/browse/FLINK-15156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17215561#comment-17215561 ]

Hwanju Kim commented on FLINK-15156:
------------------------------------

Robert, thanks for the comments:

# Performance penalties: Good question. Although we haven't microbenchmarked its impact, we haven't heard of any performance regression. All of the security checks are on the control path (thread control, socket accept/connect, exit, etc.) rather than the data path, so I did not think it would matter. More importantly, if there is no added check, it's just a single function call with a null check/return (and the JDK by default also does one getSecurityManager call plus a null check). Considering that the additional code is very small and on the control path, I think the overhead should be negligible. Nonetheless, for a platform provider, such a security manager for sandboxing is inevitable going forward, and not only for exit, which is just the first use case, even if it may cause a small penalty. That's one reason to have a configuration option that defaults to false for general users (not platform providers).
# More check points: Right, thanks for the input. I initially tried (ambitiously) to find all the places, but it was not very practical at that point (and the additional code would make the patch larger), so I decided to instrument the most frequently exercised paths. Indeed, main() is where exit() is (most likely erroneously) put blindly, following the common practice for general Java applications. I don't have data for the case where users put exit() in a UDF, though it may be possible if some badly written library calls it behind the scenes; I haven't seen that yet. We can discuss further which additional points are interesting.
# In the cases I have seen, the exit usually already comes with a log message (e.g., "this argument is not provided, exiting"), and we wanted the guard to avoid terminating the JVM while adding the message "you are not allowed to terminate the JVM". I thought throwing an exception with a message provides the user with a warning/debugging message without terminating the JVM. Do you think some users still want their exit to be performed intentionally? I assumed that a UDF or main() terminating the JVM is always unintended in Flink, but if exit may still be wanted behavior in some cases, I think it makes sense to provide that option as well.
[jira] [Commented] (FLINK-15156) Warn user if System.exit() is called in user code
[ https://issues.apache.org/jira/browse/FLINK-15156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17211949#comment-17211949 ]

Hwanju Kim commented on FLINK-15156:
------------------------------------

FWIW, the following is what we have done:
* A Flink user security manager is added for general user-code sandbox checking, where currently only exit is checked (other checks can be added here later).
* The added manager forwards all checks except the overridden ones to the previous security manager, if any (like a decorator).
* The security manager is set when the JM and TM start (if configured, as described in the last bullet point).
* The exit check has an enable/disable point via a method so that it affects only user code, as the Flink runtime itself needs to exit in some cases (e.g., fatal error).
** Once enabled, any thread spawned from the main thread inherits the enable flag.
* What's enclosed by this enabled exit check is currently best-effort, not covering all the places where user code is involved. The main places are:
** main() in the JM (currently for invokeInteractiveModeForExecution).
** StreamTask.invoke, triggerCheckpoint, and cancel.
* A new exception, UserSystemExitException, is defined to be thrown when user code attempts to exit the JVM. It has a default message to warn the user.
** In main(), it's wrapped into ProgramInvocationException.
** In a UDF, it fails the exiting task, thereby shipping the exception to the JM and triggering fail-over.
* This security manager is only installed if the corresponding configuration (under the security section) in flink-conf.yaml is enabled (disabled by default). The configuration is per check case (but currently only disallow-system-exit is available).

Please let me know if anyone wants to review the patch, or to discuss if anything does not make sense.
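As a rough illustration of the bullets above, a decorating SecurityManager with a thread-inherited enable flag might look like the following sketch. The class and method names here are hypothetical, not the actual Flink implementation.

```java
// Hypothetical sketch of the approach described above; names are
// illustrative, not the actual Flink classes.
public class UserExitGuard {

    /** Thrown instead of letting user code terminate the JVM. */
    public static class UserSystemExitException extends SecurityException {
        public UserSystemExitException() {
            super("User code attempted to exit the JVM; throwing this "
                    + "exception instead of terminating.");
        }
    }

    public static class GuardingSecurityManager extends SecurityManager {

        private final SecurityManager previous; // decorated manager, may be null

        // Inherited by any thread spawned from a thread that enabled the check.
        private final InheritableThreadLocal<Boolean> exitCheckEnabled =
                new InheritableThreadLocal<Boolean>() {
                    @Override
                    protected Boolean initialValue() {
                        return Boolean.FALSE;
                    }
                };

        public GuardingSecurityManager(SecurityManager previous) {
            this.previous = previous;
        }

        public void enableExitCheck() {
            exitCheckEnabled.set(Boolean.TRUE);
        }

        public void disableExitCheck() {
            exitCheckEnabled.set(Boolean.FALSE);
        }

        @Override
        public void checkExit(int status) {
            if (exitCheckEnabled.get()) {
                throw new UserSystemExitException(); // fail the task, keep the JVM
            }
            if (previous != null) {
                previous.checkExit(status); // forward like a decorator
            }
        }

        // Everything else is permitted unless the decorated manager objects.
        @Override
        public void checkPermission(java.security.Permission perm) {
            if (previous != null) {
                previous.checkPermission(perm);
            }
        }
    }
}
```

In this sketch, the runtime would install the manager once at JM/TM startup (when the configuration enables it) and call enableExitCheck() around user-code entry points such as main() invocation and StreamTask.invoke().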
[jira] [Commented] (FLINK-14938) Flink elasticsearch failure handler re-add indexrequest causes ConcurrentModificationException
[ https://issues.apache.org/jira/browse/FLINK-14938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17093058#comment-17093058 ]

Hwanju Kim commented on FLINK-14938:
------------------------------------

[~ysn2233] - I have a question on your proposed solution. I wonder if you've done any performance measurement, which would've led you to that hybrid solution. IMO, since this Elasticsearch sink path involves I/O to external services, any CPU penalty incurred by a concurrency-aware data structure may be dwarfed by the much higher I/O latency, so using ConcurrentLinkedQueue seems just fine to me (in terms of latency, that is; there could be some additional CPU cost, whose significance I am also unsure about quantitatively). We are also looking into a resolution of this problem, but wanted to check if you have a performance test confirming whether such cost matters.

> Flink elasticsearch failure handler re-add indexrequest causes
> ConcurrentModificationException
> ----------------------------------------------------------------
>
>                 Key: FLINK-14938
>                 URL: https://issues.apache.org/jira/browse/FLINK-14938
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / ElasticSearch
>    Affects Versions: 1.8.1
>            Reporter: Shengnan YU
>            Assignee: Shengnan YU
>            Priority: Major
>
> When using the Elasticsearch connector failure handler (from the official
> example) to re-add documents, Flink encountered a
> ConcurrentModificationException.
> {code:java}
> input.addSink(new ElasticsearchSink<>(
>     config, transportAddresses,
>     new ElasticsearchSinkFunction() {...},
>     new ActionRequestFailureHandler() {
>         @Override
>         public void onFailure(ActionRequest action,
>                 Throwable failure,
>                 int restStatusCode,
>                 RequestIndexer indexer) throws Throwable {
>             if (ExceptionUtils.findThrowable(failure,
>                     EsRejectedExecutionException.class).isPresent()) {
>                 // full queue; re-add document for indexing
>                 indexer.add(action);
>             }
>         }
>     }));
> {code}
> I found that the method BufferingNoOpRequestIndexer#processBufferedRequests
> iterates over a list of ActionRequest. However, the failure handler keeps
> re-adding requests to that list during the bulk retry, which causes a
> ConcurrentModificationException.
> {code:java}
> void processBufferedRequests(RequestIndexer actualIndexer) {
>     for (ActionRequest request : bufferedRequests) {
>         if (request instanceof IndexRequest) {
>             actualIndexer.add((IndexRequest) request);
>         } else if (request instanceof DeleteRequest) {
>             actualIndexer.add((DeleteRequest) request);
>         } else if (request instanceof UpdateRequest) {
>             actualIndexer.add((UpdateRequest) request);
>         }
>     }
>     bufferedRequests.clear();
> }{code}
> I think this is a multi-threading bug; is it OK to use a concurrent queue
> to maintain the failed requests?
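The iterate-while-re-adding failure mode, and the ConcurrentLinkedQueue approach discussed above, can be sketched independently of the Elasticsearch classes. The names below are illustrative stand-ins, not Flink's actual types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class ReAddDemo {

    // Iterating an ArrayList while the loop body appends to it throws
    // ConcurrentModificationException: the fail-fast iterator detects the
    // modification. This simulates indexer.add(action) inside onFailure
    // while processBufferedRequests is iterating the buffer.
    static boolean failsOnReAdd() {
        List<String> buffered = new ArrayList<>();
        buffered.add("req-1");
        try {
            for (String r : buffered) {
                buffered.add(r + "-retried"); // re-add during iteration
            }
            return false;
        } catch (java.util.ConcurrentModificationException e) {
            return true;
        }
    }

    // Draining a ConcurrentLinkedQueue with poll() tolerates concurrent
    // adds; here each request is re-added at most once so the loop ends.
    static int drainWithQueue() {
        Queue<String> buffered = new ConcurrentLinkedQueue<>();
        buffered.add("req-1");
        int processed = 0;
        String r;
        while ((r = buffered.poll()) != null) {
            processed++;
            if (!r.endsWith("-retried")) {
                buffered.add(r + "-retried"); // safe: no fail-fast iterator
            }
        }
        return processed;
    }
}
```

The queue variant also avoids the explicit clear(), since poll() removes entries as they are processed.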
[jira] [Comment Edited] (FLINK-8417) Support STSAssumeRoleSessionCredentialsProvider in FlinkKinesisConsumer
[ https://issues.apache.org/jira/browse/FLINK-8417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17055658#comment-17055658 ]

Hwanju Kim edited comment on FLINK-8417 at 3/10/20, 7:24 AM:
-------------------------------------------------------------

I have a basic question on this. I wonder how this is different from {{AWSConfigConstants.CredentialsProvider.ASSUME_ROLE}} (added by FLINK-9686 - although that issue says it's for the producer, it should be available for the consumer as well, since it's configured via properties). AFAIK, with ASSUME_ROLE, if a correct role ARN with a proper policy/trust relationship is set, cross-account stream access should be feasible. I may be missing some context here about what's currently not supported (consumer support, a credentials expiration issue, or something else?). The thread right above seems to point to ASSUME_ROLE, but it says it's not working in the TM while working in the JM, which is a little confusing to me.

> Support STSAssumeRoleSessionCredentialsProvider in FlinkKinesisConsumer
> -----------------------------------------------------------------------
>
>                 Key: FLINK-8417
>                 URL: https://issues.apache.org/jira/browse/FLINK-8417
>             Project: Flink
>          Issue Type: New Feature
>          Components: Connectors / Kinesis
>            Reporter: Tzu-Li (Gordon) Tai
>            Priority: Major
>              Labels: usability
>
> As discussed in ML:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kinesis-Connectors-With-Temporary-Credentials-td17734.html.
> Users need the functionality to access cross-account AWS Kinesis streams,
> using AWS Temporary Credentials [1].
> We should add support for
> {{AWSConfigConstants.CredentialsProvider.STSAssumeRole}}, which internally
> would use the {{STSAssumeRoleSessionCredentialsProvider}} [2] in
> {{AWSUtil#getCredentialsProvider(Properties)}}.
> [1] https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp.html
> [2]
> https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/STSAssumeRoleSessionCredentialsProvider.html
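For reference, ASSUME_ROLE is configured purely through consumer properties. The sketch below writes the property keys as raw strings for illustration; in real code they should come from the AWSConfigConstants constants of the Flink version in use, and the role ARN and session name are placeholder assumptions.

```java
import java.util.Properties;

public class AssumeRoleConfigSketch {

    // Builds Kinesis consumer properties selecting the ASSUME_ROLE
    // credentials provider. Keys mirror AWSConfigConstants; verify them
    // against the Flink version in use rather than trusting these strings.
    static Properties assumeRoleProperties() {
        Properties props = new Properties();
        props.setProperty("aws.region", "us-west-2");
        props.setProperty("aws.credentials.provider", "ASSUME_ROLE");
        // Placeholder ARN of the cross-account role to assume:
        props.setProperty("aws.credentials.provider.role.arn",
                "arn:aws:iam::123456789012:role/cross-account-kinesis-read");
        props.setProperty("aws.credentials.provider.role.sessionName",
                "flink-consumer");
        // How to obtain the base credentials used for the STS call:
        props.setProperty("aws.credentials.provider.role.provider", "AUTO");
        return props;
    }
}
```

With such properties, constructing the FlinkKinesisConsumer would pick up the provider via {{AWSUtil#getCredentialsProvider(Properties)}}, as mentioned in the issue description above.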
[jira] [Commented] (FLINK-14949) Task cancellation can be stuck against out-of-thread error
[ https://issues.apache.org/jira/browse/FLINK-14949?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16982305#comment-16982305 ]

Hwanju Kim commented on FLINK-14949:
------------------------------------

[~azagrebin], thanks for the quick answer, and sure, I can work on this.

> Task cancellation can be stuck against out-of-thread error
> ----------------------------------------------------------
>
>                 Key: FLINK-14949
>                 URL: https://issues.apache.org/jira/browse/FLINK-14949
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Task
>    Affects Versions: 1.8.2
>            Reporter: Hwanju Kim
>            Priority: Major
[jira] [Created] (FLINK-14949) Task cancellation can be stuck against out-of-thread error
Hwanju Kim created FLINK-14949:
----------------------------------

             Summary: Task cancellation can be stuck against out-of-thread error
                 Key: FLINK-14949
                 URL: https://issues.apache.org/jira/browse/FLINK-14949
             Project: Flink
          Issue Type: Bug
          Components: Runtime / Coordination
    Affects Versions: 1.8.2
            Reporter: Hwanju Kim

Task cancellation ([_cancelOrFailAndCancelInvokable_|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java#L991]) relies on multiple separate threads: _TaskCanceler_, _TaskInterrupter_, and _TaskCancelerWatchdog_. While TaskCanceler performs the cancellation itself, TaskInterrupter periodically interrupts a non-reacting task, and TaskCancelerWatchdog kills the JVM if cancellation has not finished within a certain amount of time (by default 3 min). Together they ensure that cancellation is either completed or aborted by transitioning to a terminal state in finite time (FLINK-4715).

However, if any asynchronous thread creation fails, for example due to running out of threads (_java.lang.OutOfMemoryError: unable to create new native thread_), the code transitions to CANCELING, but nothing is actually performed for cancellation and nothing is watched by the watchdog. Currently, the jobmanager does [retry cancellation|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java#L1121] against any error returned, but the next retry [returns success once it sees CANCELING|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java#L997], assuming that cancellation is in progress. This leaves the task completely stuck in CANCELING, which is non-terminal, so the state machine is stuck after that.

One solution would be: if a task has transitioned to CANCELING but gets a fatal error or OOM (i.e., _isJvmFatalOrOutOfMemoryError_ is true), indicating that it could not even reach the point of spawning TaskCancelerWatchdog, it could immediately treat that as a fatal error (not safely cancellable) by calling _notifyFatalError_, just as TaskCancelerWatchdog does, but eagerly and synchronously. That way, it can at least transition out of the non-terminal state and, furthermore, clear potentially leaked threads/memory by restarting the JVM. The same method is also invoked by _failExternally_, but transitioning to FAILED seems less critical as it's already a terminal state.

Reproducing this is straightforward: run an application that keeps creating threads in a loop, each of which never finishes, and that has multiple tasks, so that one task triggers a failure and the others are then cancelled by full fail-over. In the web UI dashboard, tasks from a task manager where any of the cancellation-related threads failed to spawn are stuck in CANCELING for good.
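The proposed eager escalation can be sketched as follows (illustrative names, not the actual Task code): if spawning a cancellation-related thread itself fails with a JVM-fatal error or OOM, notify the fatal error synchronously rather than remaining in CANCELING.

```java
import java.util.concurrent.ThreadFactory;

public class CancellationSketch {

    interface FatalErrorNotifier {
        void notifyFatalError(String message, Throwable cause);
    }

    // Mirrors the spirit of ExceptionUtils.isJvmFatalOrOutOfMemoryError:
    // errors after which the JVM is not considered safe to keep running.
    static boolean isJvmFatalOrOutOfMemoryError(Throwable t) {
        return t instanceof InternalError
                || t instanceof UnknownError
                || t instanceof OutOfMemoryError;
    }

    // Tries to start the canceler/watchdog thread; on a fatal spawn failure
    // it escalates eagerly and synchronously, because without the watchdog
    // nothing would ever move the task out of CANCELING.
    static boolean tryStartCanceler(Runnable canceler, ThreadFactory factory,
                                    FatalErrorNotifier notifier) {
        try {
            factory.newThread(canceler).start();
            return true;
        } catch (Throwable t) {
            if (isJvmFatalOrOutOfMemoryError(t)) {
                notifier.notifyFatalError("Failed to spawn task canceler", t);
            }
            return false;
        }
    }
}
```

Under this sketch, the task would transition to FAILED (or the process restarts) instead of being stuck in a non-terminal state.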
[jira] [Commented] (FLINK-14589) Redundant slot requests with the same AllocationID leads to inconsistent slot table
[ https://issues.apache.org/jira/browse/FLINK-14589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16966989#comment-16966989 ]

Hwanju Kim commented on FLINK-14589:
------------------------------------

[~trohrmann], sure, I will ping you once the PR is ready.

> Redundant slot requests with the same AllocationID leads to inconsistent slot
> table
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-14589
>                 URL: https://issues.apache.org/jira/browse/FLINK-14589
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.6.3
>            Reporter: Hwanju Kim
>            Assignee: Hwanju Kim
>            Priority: Major
>             Fix For: 1.10.0, 1.8.3, 1.9.2
[jira] [Created] (FLINK-14589) Redundant slot requests with the same AllocationID leads to inconsistent slot table
Hwanju Kim created FLINK-14589:
----------------------------------

             Summary: Redundant slot requests with the same AllocationID leads to inconsistent slot table
                 Key: FLINK-14589
                 URL: https://issues.apache.org/jira/browse/FLINK-14589
             Project: Flink
          Issue Type: Bug
          Components: Runtime / Coordination
    Affects Versions: 1.6.3
            Reporter: Hwanju Kim

_NOTE: We found this issue in 1.6.2, but I checked that the relevant code is still in the mainline. What I am not sure about, however, is whether other slot-related fixes after 1.6.2 (such as FLINK-11059, FLINK-12863, etc.) would prevent the initial cause of this issue from happening. So far, I have not found an existing fix for the issue I am describing here, hence opening this one. Please feel free to deduplicate this if another issue already covers it. Please note that we have already picked FLINK-9912, which turned out to be a major fix to a slot allocation failure issue. I will note the ramification to that issue just in case others experience the same problem._

h2. Summary

When *requestSlot* is called from the ResourceManager (RM) to the TaskManager (TM), the TM first reserves the requested slot, marking it as ALLOCATED, offers the slot to the JM, and marks the slot as ACTIVE once it gets an acknowledgement from the JM. This three-way communication for slot allocation is identified by an AllocationID, which is initially generated by the JM. The TM reserves a slot by calling *TaskSlotTable.allocateSlot* if the requested slot number (i.e., slot index) is free to use. The major data structure is *TaskSlot*, indexed by slot index. Once the slot is marked as ALLOCATED with a given AllocationID, it tries to update other maps such as *allocationIDTaskSlotMap*, keyed by AllocationID, and *slotsPerJob*, keyed by JobID. When updating *allocationIDTaskSlotMap*, it directly uses *allocationIDTaskSlotMap.put(allocationId, taskSlot)*, which may overwrite an existing entry if one is already there with the same AllocationID. This renders *TaskSlot* and *allocationIDTaskSlotMap* inconsistent: the former says two slots are allocated by the same AllocationID, while the latter says the AllocationID only has the latest task slot. In this state, once the slot is freed, *freeSlot* is driven by AllocationID, so it fetches the slot index (i.e., the one that arrived later) from *allocationIDTaskSlotMap*, marks the slot free, and removes it from *allocationIDTaskSlotMap*. But the old task slot is still marked as allocated. This old task slot becomes a zombie and can never be freed. This can cause permanent slot allocation failure if TM slots are statically and tightly provisioned and the resource manager does not actively spawn new TMs when none are available (e.g., Kubernetes without active-mode integration, which is not yet available).

h2. Scenario

From my observation, redundant slot requests with the same AllocationID and different slot indices should be rare but can happen with a race condition, especially when repeated fail-over and heartbeat timeouts (primarily caused by transient resource overload, not permanent network partition/node outage) are taking place. The following is a detailed scenario which can lead to this issue (AID is AllocationID):

# AID1 is requested from the JM and put in the pending request queue in the RM.
# The RM picks up slot number 1 (Slot1) from freeSlots and performs requestSlot with Slot1 and AID1. This slot request is now in flight.
# In the meantime, Slot1 is occupied by AID2 in the TM for a delayed slot request, and the TM sends a slot report via heartbeat to the RM saying Slot1 is already allocated with AID2.
# The RM's heartbeat handler identifies that Slot1 is occupied with a different AID (AID2), so it should reject the pending request sent in step 2.
# handleFailedSlotRequest puts the rejected AID1 into the pending requests again by retrying the slot request. Now it picks up another available slot, say Slot2. So the retried slot request with Slot2 and AID1 is in flight.
# In the meantime, Slot1 occupied by AID2 is freed (by any disconnection from the JM, or by releasing all the tasks in the slot on cancellation/failure - the latter was observed).
# The in-flight slot request (Slot1, AID1) from step 2 arrives at the TM and succeeds, as Slot1 is free to allocate. The TM offers Slot1 to the JM, which acknowledges it, so the TM marks Slot1 ACTIVE with AID1. At this point, allocationIDTaskSlotMap[AID1] = Slot1 in the TM, and the JM's allocatedSlots[AID1] = Slot1.
# The next in-flight slot request (Slot2, AID1) from step 5 arrives at the TM. As Slot2 is still free, the TM marks it ALLOCATED, offers Slot2 to the JM, and *"overwrites allocationIDTaskSlotMap[AID1] to Slot2"*.
# From step 7, the JM has allocatedSlots[AID1] = Slot1, which leads the JM to reject the offer, as the same AID is already occupied by another slot.
# The TM gets the rejected offer for (Slot2, AID1) and frees
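The overwrite hazard in step 8 can be illustrated with a tiny sketch. The types here are hypothetical stand-ins (the real code maps AllocationID to TaskSlot); the point is that putIfAbsent makes a redundant request with a duplicate AllocationID observable, instead of silently replacing the first binding and leaving a zombie slot.

```java
import java.util.HashMap;
import java.util.Map;

public class SlotTableSketch {

    // Simplified stand-in for allocationIDTaskSlotMap: AllocationID -> slot index.
    final Map<String, Integer> allocationIdToSlot = new HashMap<>();

    // put() would silently overwrite an existing entry for the same
    // AllocationID; putIfAbsent() keeps the first binding and lets the
    // caller reject the redundant request instead.
    boolean allocate(String allocationId, int slotIndex) {
        Integer existing = allocationIdToSlot.putIfAbsent(allocationId, slotIndex);
        return existing == null || existing == slotIndex;
    }
}
```

With this guard, the second request (Slot2, AID1) in the scenario would be rejected at the TM before any inconsistent state is created.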
[jira] [Commented] (FLINK-11127) Make metrics query service establish connection to JobManager
[ https://issues.apache.org/jira/browse/FLINK-11127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16964210#comment-16964210 ] Hwanju Kim commented on FLINK-11127: [~tsubasa2oo2], in addition to what [~trohrmann] said (and it's interesting to me as well why a cancel led to the metric connection issue), I also wonder what workaround fix you used before 1.8 that had worked fine in the same scenario without the error you mentioned above. The problematic error related to DNS would show an akka error with "Name or service not known". We've tested TM-to-RM registration but have not necessarily seen the metrics connection error that you showed. > Make metrics query service establish connection to JobManager > - > > Key: FLINK-11127 > URL: https://issues.apache.org/jira/browse/FLINK-11127 > Project: Flink > Issue Type: Improvement > Components: Deployment / Kubernetes, Runtime / Coordination, Runtime > / Metrics >Affects Versions: 1.7.0 >Reporter: Ufuk Celebi >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > As part of FLINK-10247, the internal metrics query service has been separated > into its own actor system. Before this change, the JobManager (JM) queried > TaskManager (TM) metrics via the TM actor. Now, the JM needs to establish a > separate connection to the TM metrics query service actor. > In the context of Kubernetes, this is problematic as the JM will typically > *not* be able to resolve the TMs by name, resulting in warnings as follows: > {code} > 2018-12-11 08:32:33,962 WARN akka.remote.ReliableDeliverySupervisor > - Association with remote system > [akka.tcp://flink-metrics@flink-task-manager-64b868487c-x9l4b:39183] has > failed, address is now gated for [50] ms. 
Reason: [Association failed with > [akka.tcp://flink-metrics@flink-task-manager-64b868487c-x9l4b:39183]] Caused > by: [flink-task-manager-64b868487c-x9l4b: Name does not resolve] > {code} > In order to expose the TMs by name in Kubernetes, users would require a service > *for each* TM instance, which is not practical. > This currently results in the web UI not being able to display some basic metrics > about the number of sent records. You can reproduce this by following the READMEs > in {{flink-container/kubernetes}}. > This worked before, because the JM is typically exposed via a service with a > known name and the TMs establish the connection to it, which the metrics query > service piggybacked on. > A potential solution to this might be to let the query service connect to the > JM, similar to how the TMs register. > I tagged this ticket as an improvement, but in the context of Kubernetes I > would consider this to be a bug. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-11127) Make metrics query service establish connection to JobManager
[ https://issues.apache.org/jira/browse/FLINK-11127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16950234#comment-16950234 ] Hwanju Kim commented on FLINK-11127: [~vicTTim], although I have not retested with 1.8 (we will shortly, though), I think that problem has been solved by FLINK-11632 if you use "ip" as the bind policy. What we have applied is a subset of FLINK-11632. -- This message was sent by Atlassian Jira (v8.3.4#803005)
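As a concrete illustration of the "ip" bind policy mentioned above: FLINK-11632 introduced a bind-policy option, and a flink-conf.yaml fragment could look like the following. The exact configuration key is from my reading of the Flink docs and should be double-checked against your Flink version.

```yaml
# flink-conf.yaml fragment (assumes Flink >= 1.8, where FLINK-11632 landed).
# With the "ip" policy, the TaskManager binds and advertises its IP address
# instead of its (possibly unresolvable) pod hostname, avoiding the need for
# a per-TM Kubernetes service described in the issue.
taskmanager.network.bind-policy: "ip"
```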
[jira] [Commented] (FLINK-12260) Slot allocation failure by taskmanager registration timeout and race
[ https://issues.apache.org/jira/browse/FLINK-12260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16832313#comment-16832313 ] Hwanju Kim commented on FLINK-12260: Sure. I will ping you once I have tested the fix. > Slot allocation failure by taskmanager registration timeout and race > > > Key: FLINK-12260 > URL: https://issues.apache.org/jira/browse/FLINK-12260 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.6.3 >Reporter: Hwanju Kim >Priority: Critical > Attachments: FLINK-12260-repro.diff > > > > In 1.6.2, we have seen slot allocation failure keep happening for a long time. > Having looked at the log, I see the following behavior: > # TM sends a registration request R1 to resource manager. > # R1 times out after 100ms, which is the initial timeout. > # TM retries a registration request R2 to resource manager (with timeout > 200ms). > # R2 arrives first at resource manager and is registered, and then TM gets a > successful response, moving onto step 5 below. > # On successful registration, R2's instance is put to > taskManagerRegistrations > # Then R1 arrives at resource manager and realizes the same TM resource ID > is already registered, which then unregisters R2's instance ID from > taskManagerRegistrations. A new instance ID for R1 is registered to > workerRegistration. > # R1's response is not handled though since it already timed out (see akka > temp actor resolve failure below), hence no registration to > taskManagerRegistrations. > # TM keeps heartbeating to the resource manager with slot status. > # Resource manager ignores this slot status, since taskManagerRegistrations > contains R2, not R1, which replaced R2 in workerRegistration at step 6. > # Slot request can never be fulfilled, timing out. 
> The following are the debug logs for the above steps: > > {code:java} > JM log: > 2019-04-11 22:39:40.000,Registering TaskManager with ResourceID > 46c8e0d0fcf2c306f11954a1040d5677 > (akka.ssl.tcp://flink@flink-taskmanager:6122/user/taskmanager_0) at > ResourceManager > 2019-04-11 22:39:40.000,Registering TaskManager > 46c8e0d0fcf2c306f11954a1040d5677 under deade132e2c41c52019cdc27977266cf at > the SlotManager. > 2019-04-11 22:39:40.000,Replacing old registration of TaskExecutor > 46c8e0d0fcf2c306f11954a1040d5677. > 2019-04-11 22:39:40.000,Unregister TaskManager > deade132e2c41c52019cdc27977266cf from the SlotManager. > 2019-04-11 22:39:40.000,Registering TaskManager with ResourceID > 46c8e0d0fcf2c306f11954a1040d5677 > (akka.ssl.tcp://flink@flink-taskmanager:6122/user/taskmanager_0) at > ResourceManager > TM log: > 2019-04-11 22:39:40.000,Registration at ResourceManager attempt 1 > (timeout=100ms) > 2019-04-11 22:39:40.000,Registration at ResourceManager > (akka.ssl.tcp://flink@flink-jobmanager:6123/user/resourcemanager) attempt 1 > timed out after 100 ms > 2019-04-11 22:39:40.000,Registration at ResourceManager attempt 2 > (timeout=200ms) > 2019-04-11 22:39:40.000,Successful registration at resource manager > akka.ssl.tcp://flink@flink-jobmanager:6123/user/resourcemanager under > registration id deade132e2c41c52019cdc27977266cf. > 2019-04-11 22:39:41.000,resolve of path sequence [/temp/$c] failed{code} > > As RPC calls seem to use akka ask, which creates a temporary source actor, I > think multiple RPC calls could've arrived out of order by different actor > pairs and the symptom above seems to be due to that. If so, it could have an > attempt count in the call argument to prevent unexpected unregistration? At > this point, what I have done is only log analysis, so I could do further > analysis, but before that wanted to check if it's a known issue. I also > searched with some relevant terms and log pieces, but couldn't find the > duplicate. 
Please deduplicate if any. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
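The registration race in the issue description above can be sketched with two plain maps. This is an illustration only; the names are made up and do not correspond to Flink's actual ResourceManager/SlotManager code. The retry R2 arrives first and its response is handled in time, but the stale original R1 arrives later, replaces R2's registration, and its own response is never processed, leaving the SlotManager with no registration at all.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class RegistrationRaceSketch {
    // ResourceID -> instance id (stand-in for workerRegistration).
    static final Map<String, String> workerRegistration = new HashMap<>();
    // Instance ids the SlotManager knows (stand-in for taskManagerRegistrations).
    static final Set<String> slotManagerRegistrations = new HashSet<>();

    // Registers an instance; if the ResourceID is already registered, the old
    // instance is unregistered from the SlotManager ("Replacing old registration").
    static String registerTaskExecutor(String resourceId, String instanceId) {
        String old = workerRegistration.put(resourceId, instanceId);
        if (old != null) {
            slotManagerRegistrations.remove(old);
        }
        return instanceId;
    }

    public static void main(String[] args) {
        // R2 (the retry) arrives first; its response is handled, so the
        // SlotManager learns about its instance id.
        String r2 = registerTaskExecutor("tm-1", "instance-R2");
        slotManagerRegistrations.add(r2);
        // R1 (the original, already timed out on the TM side) arrives second;
        // its response is dropped, so its instance id is never added.
        registerTaskExecutor("tm-1", "instance-R1");
        // The SlotManager now has no registration at all, so every subsequent
        // heartbeat slot report is ignored and slot requests time out.
        System.out.println(slotManagerRegistrations.isEmpty());
        System.out.println(workerRegistration.get("tm-1"));
    }
}
```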
[jira] [Commented] (FLINK-12260) Slot allocation failure by taskmanager registration timeout and race
[ https://issues.apache.org/jira/browse/FLINK-12260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16830862#comment-16830862 ] Hwanju Kim commented on FLINK-12260: Thanks for the clarification. I thought you meant it without introducing any additional map, but now it seems clear. I had been trying a conservative approach, as I couldn't 100% rule out the possibility of a sender-side race. As we may have a potentially simpler solution, I looked at the code again a little further. Initially, what led me to suspect any possibility of a race is this part:
{code:scala}
val a = PromiseActorRef(ref.provider, timeout, targetName = actorRef, message.getClass.getName, sender)
actorRef.tell(message, a)
a.result.future
{code}
This is internalAsk from invokeRpc, and PromiseActorRef internally does scheduler.scheduleOnce(timeout.duration) for the timer. The tell on actorRef sends a message to RM through RemoteActorRef and EndpointManager, where the message is passed to a Dispatcher, which enqueues the message to the mbox and executes the mbox via an executor thread. My impression was that, as tell is asynchronous via an executor service, the timer of PromiseActorRef set up before could fire before the message has even left the sender. Although that would be possible, the message at least seems to be enqueued to the mbox for the RM endpoint, and thus ordering is preserved with respect to the next attempt after timeout. So, the ordering seems fine. In addition, I was also concerned about the case where two different ask calls might happen to use two different TCP connections, leading to possible out-of-order delivery. Although I have not 100% exercised the relevant code, it seems to use a single connection associated with the akka endpoints, and I confirmed that with a packet capture. So, based on the code inspection and no successful repro on the sender side, we can currently conclude that the race is likely happening in the task executor connection/handshake on the receiver side (as the repro does). 
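The timing concern above, that the ask timeout is armed before the message is handed to the transport, can be illustrated outside akka. This is a hedged sketch using java.util.concurrent, not akka's PromiseActorRef: the caller times out, yet the reply still arrives and completes afterwards.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class AskTimeoutSketch {
    public static void main(String[] args) throws Exception {
        CompletableFuture<String> response = new CompletableFuture<>();
        // 1. Arm the timeout first, as PromiseActorRef does with scheduleOnce.
        //    (copy() so the timeout does not poison the underlying reply future.)
        CompletableFuture<String> asked =
                response.copy().orTimeout(100, TimeUnit.MILLISECONDS);
        // 2. "Send" the message; the receiver replies only after the timeout.
        CompletableFuture.delayedExecutor(300, TimeUnit.MILLISECONDS)
                .execute(() -> response.complete("registered"));
        String outcome;
        try {
            outcome = asked.get();
        } catch (Exception e) {
            outcome = "timed out"; // the ask fails even though delivery succeeds
        }
        // The late reply still arrives, just like the late R1/R2 registrations.
        System.out.println(outcome + " / reply=" + response.get());
    }
}
```

This mirrors why a timed-out registration attempt can still land at the ResourceManager and race with its own retry.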
I will test it out with Till's proposal. On our side, once this fix is applied, we will keep an eye on our test apps, which intermittently hit this issue, to see if there is any other race issue. 
[jira] [Commented] (FLINK-12260) Slot allocation failure by taskmanager registration timeout and race
[ https://issues.apache.org/jira/browse/FLINK-12260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16830100#comment-16830100 ] Hwanju Kim commented on FLINK-12260: Thanks Till. I had tested with the attempt count addition to RPC before your comment. It worked as expected. At any rate, I may need some clarification on your idea about reference equality, regarding how it could address this issue. Although the repro uses a delay in the task executor connection, as mentioned, the race may also happen on the sender side. Even for dealing with the race in the task executor connection, I am curious how a reference check alone would work. If a receiver-only approach can work, it would be better than the attempt count one. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
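The attempt-count idea floated in the original report can be sketched as follows. The names and signatures are illustrative only, not a proposal for Flink's actual RPC interfaces: each registration attempt carries a monotonically increasing number, and the receiver discards any attempt older than the newest one it has accepted, so a stale retry arriving late cannot clobber a newer registration.

```java
import java.util.HashMap;
import java.util.Map;

public class AttemptCountSketch {
    // Newest accepted attempt number per ResourceID.
    static final Map<String, Integer> latestAttempt = new HashMap<>();

    static boolean register(String resourceId, int attempt) {
        int newest = latestAttempt.getOrDefault(resourceId, 0);
        if (attempt <= newest) {
            return false; // stale retry arriving late: ignore, do not replace
        }
        latestAttempt.put(resourceId, attempt);
        return true;
    }

    public static void main(String[] args) {
        System.out.println(register("tm-1", 2)); // retry R2 arrives first: accepted
        System.out.println(register("tm-1", 1)); // original R1 arrives late: ignored
    }
}
```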
[jira] [Comment Edited] (FLINK-12260) Slot allocation failure by taskmanager registration timeout and race
[ https://issues.apache.org/jira/browse/FLINK-12260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16827369#comment-16827369 ] Hwanju Kim edited comment on FLINK-12260 at 4/26/19 11:30 PM: -- I got a repro, but in a somewhat tricky way, since it's definitely a rarely happening race. But as mentioned, once it falls into this state, it can't get out of it (assuming that we're not using an active resource manager). In the repro, I injected an artificial delay into the RM->TM connection on task executor registration, which can time out the first registration request, resulting in a 2nd try. Since the RM->TM connection is carried out in a separate thread via an akka ask call, delaying here doesn't block the resource manager endpoint mailbox processing, so any further request can be processed during the delay. I initially added the delay in handling registerTaskExecutorInternal, but as it uses the RPC's executor, the delay blocks all the further retries, hence not reproducing the race. With the delay in the TM connection, the 2nd task registration attempt can overtake the 1st one, going ahead with TM registration, and then the resumed 1st request unregisters the TM registration. Although I mimicked the race on the RM side, I think the sender side can still also see a delay (e.g., from the network) during the tell part of the akka ask, causing a timeout and leading to the 2nd try racing the 1st one. The latter was trickier to mimic, so I tried the first approach. The following are the JM/TM logs.
JM log:
{code:java}
2019-04-26 17:14:44,921 DEBUG org.apache.flink.runtime.rpc.akka.AkkaRpcService - Try to connect to remote RPC endpoint with address akka.ssl.tcp://flink@192.168.69.15:6122/user/taskmanager_0. Returning a org.apache.flink.runtime.taskexecutor.TaskExecutorGateway gateway.
2019-04-26 17:14:44,924 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService - [REPRO] thread 19 attempt 1
2019-04-26 17:14:44,996 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService - [REPRO] thread 22 sleep... 
2019-04-26 17:14:45,021 DEBUG org.apache.flink.runtime.rpc.akka.AkkaRpcService - Try to connect to remote RPC endpoint with address akka.ssl.tcp://flink@192.168.69.15:6122/user/taskmanager_0. Returning a org.apache.flink.runtime.taskexecutor.TaskExecutorGateway gateway.
2019-04-26 17:14:45,022 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService - [REPRO] thread 19 attempt 2
2019-04-26 17:14:45,038 INFO org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - Registering TaskManager with ResourceID d0a410ee9060e62e0c7ef9e46f6418da (akka.ssl.tcp://flink@192.168.69.15:6122/user/taskmanager_0) at ResourceManager under instance id fa4408b5412bb8c18a6a7e58fdc8ff18
2019-04-26 17:14:45,093 DEBUG org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager - Registering TaskManager d0a410ee9060e62e0c7ef9e46f6418da under fa4408b5412bb8c18a6a7e58fdc8ff18 at the SlotManager.
2019-04-26 17:14:45,997 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService - [REPRO] thread 22 done
2019-04-26 17:14:45,998 DEBUG org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - Replacing old registration of TaskExecutor d0a410ee9060e62e0c7ef9e46f6418da.
2019-04-26 17:14:45,998 DEBUG org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager - Unregister TaskManager fa4408b5412bb8c18a6a7e58fdc8ff18 from the SlotManager.
2019-04-26 17:14:46,000 INFO org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - Registering TaskManager with ResourceID d0a410ee9060e62e0c7ef9e46f6418da (akka.ssl.tcp://flink@192.168.69.15:6122/user/taskmanager_0) at ResourceManager under instance id ebad00b418637d2774b8f131d49cc79e
2019-04-26 17:14:46,000 DEBUG org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - The target with resource ID d0a410ee9060e62e0c7ef9e46f6418da is already been monitored.
2019-04-26 17:14:47,387 DEBUG org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager - Received slot report from instance ebad00b418637d2774b8f131d49cc79e.
2019-04-26 17:14:47,387 DEBUG org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager - Received slot report for unknown task manager with instance id ebad00b418637d2774b8f131d49cc79e. Ignoring this report.
2019-04-26 17:19:48,045 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job (ed0fbfff272391d1f2a98de45fda6453) switched from state RUNNING to FAILING.
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not allocate all requires slots within timeout of 30 ms. Slots required: 1, slots allocated: 0
...
{code}
TM log:
{code:java}
2019-04-26 17:14:44,897 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Resolved ResourceManager address, beginning registration
2019-04-26 17:14:44,897 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor
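The delay-injection technique described in the repro above boils down to the following scheduling pattern. This is a standalone sketch with made-up names, not the actual FLINK-12260-repro.diff: the first attempt is parked on a side thread, the retry completes first, and then the resumed first attempt clobbers the retry's registration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class DelayInjectionSketch {
    public static void main(String[] args) throws Exception {
        AtomicReference<String> registeredBy = new AtomicReference<>();
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CountDownLatch retryDone = new CountDownLatch(1);
        pool.execute(() -> {                 // attempt 1: artificially delayed,
            try {                            // like the injected RM->TM delay
                retryDone.await();
            } catch (InterruptedException ignored) {
            }
            registeredBy.set("attempt-1");   // resumes last and overwrites
        });
        pool.execute(() -> {                 // attempt 2 (the retry) finishes first
            registeredBy.set("attempt-2");
            retryDone.countDown();
        });
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println(registeredBy.get()); // the stale attempt wins
    }
}
```

Because the delay runs on a separate thread, the "mailbox" (here, the second pool thread) stays free to process the retry, which is exactly why delaying inside registerTaskExecutorInternal did not reproduce the race.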
[jira] [Updated] (FLINK-12260) Slot allocation failure by taskmanager registration timeout and race
[ https://issues.apache.org/jira/browse/FLINK-12260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hwanju Kim updated FLINK-12260: --- Attachment: FLINK-12260-repro.diff > Slot allocation failure by taskmanager registration timeout and race > > > Key: FLINK-12260 > URL: https://issues.apache.org/jira/browse/FLINK-12260 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.6.3 >Reporter: Hwanju Kim >Priority: Critical > Attachments: FLINK-12260-repro.diff > > > > In 1.6.2., we have seen slot allocation failure keep happening for long time. > Having looked at the log, I see the following behavior: > # TM sends a registration request R1 to resource manager. > # R1 times out after 100ms, which is initial timeout. > # TM retries a registration request R2 to resource manager (with timeout > 200ms). > # R2 arrives first at resource manager and registered, and then TM gets > successful response moving onto step 5 below. > # On successful registration, R2's instance is put to > taskManagerRegistrations > # Then R1 arrives at resource manager and realizes the same TM resource ID > is already registered, which then unregisters R2's instance ID from > taskManagerRegistrations. A new instance ID for R1 is registered to > workerRegistration. > # R1's response is not handled though since it already timed out (see akka > temp actor resolve failure below), hence no registration to > taskManagerRegistrations. > # TM keeps heartbeating to the resource manager with slot status. > # Resource manager ignores this slot status, since taskManagerRegistrations > contains R2, not R1, which replaced R2 in workerRegistration at step 6. > # Slot request can never be fulfilled, timing out. 
> The following is the debug logs for the above steps: > > {code:java} > JM log: > 2019-04-11 22:39:40.000,Registering TaskManager with ResourceID > 46c8e0d0fcf2c306f11954a1040d5677 > (akka.ssl.tcp://flink@flink-taskmanager:6122/user/taskmanager_0) at > ResourceManager > 2019-04-11 22:39:40.000,Registering TaskManager > 46c8e0d0fcf2c306f11954a1040d5677 under deade132e2c41c52019cdc27977266cf at > the SlotManager. > 2019-04-11 22:39:40.000,Replacing old registration of TaskExecutor > 46c8e0d0fcf2c306f11954a1040d5677. > 2019-04-11 22:39:40.000,Unregister TaskManager > deade132e2c41c52019cdc27977266cf from the SlotManager. > 2019-04-11 22:39:40.000,Registering TaskManager with ResourceID > 46c8e0d0fcf2c306f11954a1040d5677 > (akka.ssl.tcp://flink@flink-taskmanager:6122/user/taskmanager_0) at > ResourceManager > TM log: > 2019-04-11 22:39:40.000,Registration at ResourceManager attempt 1 > (timeout=100ms) > 2019-04-11 22:39:40.000,Registration at ResourceManager > (akka.ssl.tcp://flink@flink-jobmanager:6123/user/resourcemanager) attempt 1 > timed out after 100 ms > 2019-04-11 22:39:40.000,Registration at ResourceManager attempt 2 > (timeout=200ms) > 2019-04-11 22:39:40.000,Successful registration at resource manager > akka.ssl.tcp://flink@flink-jobmanager:6123/user/resourcemanager under > registration id deade132e2c41c52019cdc27977266cf. > 2019-04-11 22:39:41.000,resolve of path sequence [/temp/$c] failed{code} > > As RPC calls seem to use akka ask, which creates temporary source actor, I > think multiple RPC calls could've arrived out or order by different actor > pairs and the symptom above seems to be due to that. If so, it could have > attempt account in the call argument to prevent unexpected unregistration? At > this point, what I have done is only log analysis, so I could do further > analysis, but before that wanted to check if it's a known issue. I also > searched with some relevant terms and log pieces, but couldn't find the > duplicate. 
Please mark this as a duplicate if one exists. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
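The fix suggested in the report above (carrying an attempt count in the registration call) can be sketched as follows. This is an illustrative model with hypothetical names, not Flink's actual ResourceManager API: the resource manager records the highest attempt id seen per resource ID and ignores a stale, out-of-order attempt instead of letting it replace the newer registration.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: a resource manager that tags registrations with an attempt id so a
// stale retry (R1 arriving after R2) cannot unregister the newer registration.
// All names here are hypothetical stand-ins, not Flink's actual classes.
public class RegistrationGuard {
    // resourceId -> highest attempt id accepted so far
    private final Map<String, Integer> registered = new HashMap<>();

    /** Returns true if the attempt was accepted, false if it was stale. */
    public synchronized boolean register(String resourceId, int attemptId) {
        Integer current = registered.get(resourceId);
        if (current != null && current >= attemptId) {
            // Out-of-order or duplicate attempt: ignore instead of replacing.
            return false;
        }
        registered.put(resourceId, attemptId);
        return true;
    }

    public synchronized boolean isRegistered(String resourceId) {
        return registered.containsKey(resourceId);
    }

    public static void main(String[] args) {
        RegistrationGuard rm = new RegistrationGuard();
        // R2 (attempt 2) arrives first, then the timed-out R1 (attempt 1) arrives late.
        boolean r2 = rm.register("46c8e0d0", 2);
        boolean r1 = rm.register("46c8e0d0", 1);
        System.out.println(r2 + " " + r1); // prints "true false": R1 is ignored, R2 stays
    }
}
```

With this guard, step 6 of the scenario above becomes a no-op: the late-arriving R1 is recognized as older than the already-accepted R2 and cannot disturb taskManagerRegistrations.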
[jira] [Commented] (FLINK-12260) Slot allocation failure by taskmanager registration timeout and race
[ https://issues.apache.org/jira/browse/FLINK-12260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16824340#comment-16824340 ] Hwanju Kim commented on FLINK-12260: Thanks Till for the comment. I haven't attempted to reproduce the issue locally; I will try reproducing it with better logging and will update this thread once I have more information. Regarding the odd timestamps of the logs, this is a logging issue on our side that we plan to fix: millisecond granularity is lost when Flink logs are shipped to our long-term log archive. As this was a postmortem analysis, I couldn't gather the live Flink logs, which is why the milliseconds are zeroed out.
> Slot allocation failure by taskmanager registration timeout and race
> Key: FLINK-12260
> URL: https://issues.apache.org/jira/browse/FLINK-12260
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Coordination
> Affects Versions: 1.6.3
> Reporter: Hwanju Kim
> Priority: Major
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-11127) Make metrics query service establish connection to JobManager
[ https://issues.apache.org/jira/browse/FLINK-11127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16821683#comment-16821683 ] Hwanju Kim edited comment on FLINK-11127 at 4/22/19 7:06 AM: - We experienced an issue similar to this one, but affecting, more seriously, the communication between the resource manager and the task manager. In a normal situation it works fine, since only TMs actively connect to the JM, whose name is resolvable (i.e., there is no outbound association from the JM actor, only inbound). However, if a TM hits a fatal error, such as a task not responding to a cancel request, it does a graceful cleanup, part of which is closing the akka system by sending a poison pill to the JM, and then shuts itself down. Once the JM receives this poison pill, its actor (as part of the fail-over restart) starts an outbound association to the destination TM host name that was provided during the initial handshake. This outbound association cannot succeed if the TM is not reachable by host name, as in a typical Kubernetes setting. From this point on, the TM can talk to the JM for TM registration, but the JM cannot respond to the registration request, since the outbound association can never be established. This failure of the outbound association from the JM's akka endpoint leaves task scheduling stuck indefinitely, because TM registration keeps failing with this error:
{code:java}
2019-02-28 21:58:15,867 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor - Could not resolve ResourceManager address akka.ssl.tcp://flink@flink-jobmanager:6123/user/resourcemanager, retrying in 1 ms.
{code}
In response to the constant failures above, the JM also fails slot allocation indefinitely:
{code:java}
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not allocate all requires slots within timeout of 30 ms
{code}
We know there are multiple workarounds suggested in this thread, such as a stateful set, an init container, and passing a JVM argument, but we did not want to add artifacts and complexity to our production deployment just to fix this issue (I tried the last one, taskmanager.host, as it is the least invasive to the deployment, but it did not work for our case). Therefore, we went ahead and added a "_taskmanager.rpc.use-host-address_" configuration to Flink. It is false by default, but when set to true the TM simply uses _taskManagerAddress.getHostAddress()_ instead of _taskManagerAddress.getHostName()_, only in the RPC setting (the actual patch is a few lines, as you would expect). It was minimal enough for us, and it has been solving the problem so far. We went this way because it could be a helpful option for an environment like a typical Kubernetes setting without a TM stateful set or other tweaks. -I am not sure if you guys are interested in this approach, but I am sharing it for thought or interest.- _*Since I wrote this, I found that FLINK-11632 had already done what I described, and it has been applied from 1.7 onward.*_
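For illustration, the idea behind the workaround described above (advertising the TM's textual IP via getHostAddress() instead of a possibly unresolvable host name via getHostName()) can be sketched like this. The class and flag handling are hypothetical; only the java.net.InetAddress methods used are real API:

```java
import java.net.InetAddress;

// Sketch of the idea behind the "taskmanager.rpc.use-host-address" workaround
// described in the comment above (the flag name comes from that comment; this
// is not Flink's actual implementation): advertise the TM's IP address for RPC
// instead of a host name that peers may not be able to resolve, e.g. on
// Kubernetes without a per-TM service.
public class RpcAddressChooser {
    /** Picks the address a TM should advertise to the JM. */
    public static String advertisedAddress(InetAddress addr, boolean useHostAddress) {
        // getHostAddress() returns the textual IP (always usable for connecting back);
        // getHostName() returns a name that must be resolvable by the peer.
        return useHostAddress ? addr.getHostAddress() : addr.getHostName();
    }

    /** Convenience wrapper over the loopback address (no checked exception). */
    public static String advertisedLoopback(boolean useHostAddress) {
        return advertisedAddress(InetAddress.getLoopbackAddress(), useHostAddress);
    }

    public static void main(String[] args) {
        System.out.println(advertisedLoopback(true));  // typically 127.0.0.1
        System.out.println(advertisedLoopback(false)); // e.g. "localhost", depending on the system
    }
}
```

The trade-off is that an IP address is stable only as long as the pod lives, which is acceptable here because a restarted TM re-registers with its new address anyway.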
[jira] [Commented] (FLINK-11127) Make metrics query service establish connection to JobManager
[ https://issues.apache.org/jira/browse/FLINK-11127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16821683#comment-16821683 ] Hwanju Kim commented on FLINK-11127: We experienced an issue similar to this one, but affecting, more seriously, the communication between the resource manager and the task manager. In a normal situation it works fine, since only TMs actively connect to the JM, whose name is resolvable (i.e., there is no outbound association from the JM actor, only inbound). However, if a TM hits a fatal error, such as a task not responding to a cancel request, it does a graceful cleanup, part of which is closing the akka system by sending a poison pill to the JM, and then shuts itself down. Once the JM receives this poison pill, its actor (as part of the fail-over restart) starts an outbound association to the destination TM host name that was provided during the initial handshake. This outbound association cannot succeed if the TM is not reachable by host name, as in a typical Kubernetes setting. From this point on, the TM can talk to the JM for TM registration, but the JM cannot respond to the registration request, since the outbound association can never be established. This failure of the outbound association from the JM's akka endpoint leaves task scheduling stuck indefinitely, because TM registration keeps failing with this error: {code:java} 2019-02-28 21:58:15,867 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor - Could not resolve ResourceManager address akka.ssl.tcp://flink@flink-jobmanager:6123/user/resourcemanager, retrying in 1 ms.
{code}
In response to the constant failures above, the JM also fails slot allocation indefinitely:
{code:java}
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not allocate all requires slots within timeout of 30 ms
{code}
We know there are multiple workarounds suggested in this thread, such as a stateful set, an init container, and passing a JVM argument, but we did not want to add artifacts and complexity to our production deployment just to fix this issue (I tried the last one, taskmanager.host, as it is the least invasive to the deployment, but it did not work for our case). Therefore, we went ahead and added a "_taskmanager.rpc.use-host-address_" configuration to Flink. It is false by default, but when set to true the TM simply uses _taskManagerAddress.getHostAddress()_ instead of _taskManagerAddress.getHostName()_, only in the RPC setting (the actual patch is a few lines, as you would expect). It was minimal enough for us, and it has been solving the problem so far. We went this way because it could be a helpful option for an environment like a typical Kubernetes setting without a TM stateful set or other tweaks. I am not sure if you guys are interested in this approach, but I am sharing it for thought or interest.
> Make metrics query service establish connection to JobManager
> Key: FLINK-11127
> URL: https://issues.apache.org/jira/browse/FLINK-11127
> Project: Flink
> Issue Type: Improvement
> Components: Deployment / Kubernetes, Runtime / Coordination, Runtime / Metrics
> Affects Versions: 1.7.0
> Reporter: Ufuk Celebi
> Priority: Major
> Labels: pull-request-available
> Time Spent: 10m
> Remaining Estimate: 0h
>
> As part of FLINK-10247, the internal metrics query service has been separated into its own actor system. Before this change, the JobManager (JM) queried TaskManager (TM) metrics via the TM actor. Now, the JM needs to establish a separate connection to the TM metrics query service actor.
> In the context of Kubernetes, this is problematic, as the JM will typically *not* be able to resolve the TMs by name, resulting in warnings as follows:
> {code}
> 2018-12-11 08:32:33,962 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink-metrics@flink-task-manager-64b868487c-x9l4b:39183] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink-metrics@flink-task-manager-64b868487c-x9l4b:39183]] Caused by: [flink-task-manager-64b868487c-x9l4b: Name does not resolve]
> {code}
> In order to expose the TMs by name in Kubernetes, users would require a service *for each* TM instance, which is not practical.
> This currently results in the web UI not being able to display some basic metrics about the number of sent records. You can reproduce this by following the READMEs in {{flink-container/kubernetes}}.
> This worked before because the JM is typically exposed via a service with a known name, and the TMs establish the connection to it, which the metrics query service piggybacked on.
> A potential solution to this might be to let the query service connect to the JM similar to how the
[jira] [Created] (FLINK-12260) Slot allocation failure by taskmanager registration timeout and race
Hwanju Kim created FLINK-12260: -- Summary: Slot allocation failure by taskmanager registration timeout and race Key: FLINK-12260 URL: https://issues.apache.org/jira/browse/FLINK-12260 Project: Flink Issue Type: Bug Components: Runtime / Coordination Affects Versions: 1.6.3 Reporter: Hwanju Kim
In 1.6.2, we have seen slot allocation failures keep happening for a long time. Having looked at the logs, I see the following behavior:
# The TM sends a registration request R1 to the resource manager.
# R1 times out after 100ms, which is the initial timeout.
# The TM retries with a registration request R2 to the resource manager (with timeout 200ms).
# R2 arrives first at the resource manager and is registered; the TM then gets a successful response, moving on to step 5 below.
# On successful registration, R2's instance is put into taskManagerRegistrations.
# Then R1 arrives at the resource manager, which realizes that the same TM resource ID is already registered and therefore unregisters R2's instance ID from taskManagerRegistrations. A new instance ID for R1 is registered to workerRegistration.
# R1's response is not handled, though, since it already timed out (see the akka temp actor resolve failure below), hence no registration to taskManagerRegistrations.
# The TM keeps heartbeating to the resource manager with slot status.
# The resource manager ignores this slot status, since taskManagerRegistrations contains R2, not R1, which replaced R2 in workerRegistration at step 6.
# The slot request can never be fulfilled, timing out.
The following are the debug logs for the above steps:
{code:java}
JM log:
2019-04-11 22:39:40.000,Registering TaskManager with ResourceID 46c8e0d0fcf2c306f11954a1040d5677 (akka.ssl.tcp://flink@flink-taskmanager:6122/user/taskmanager_0) at ResourceManager
2019-04-11 22:39:40.000,Registering TaskManager 46c8e0d0fcf2c306f11954a1040d5677 under deade132e2c41c52019cdc27977266cf at the SlotManager.
2019-04-11 22:39:40.000,Replacing old registration of TaskExecutor 46c8e0d0fcf2c306f11954a1040d5677.
2019-04-11 22:39:40.000,Unregister TaskManager deade132e2c41c52019cdc27977266cf from the SlotManager.
2019-04-11 22:39:40.000,Registering TaskManager with ResourceID 46c8e0d0fcf2c306f11954a1040d5677 (akka.ssl.tcp://flink@flink-taskmanager:6122/user/taskmanager_0) at ResourceManager
TM log:
2019-04-11 22:39:40.000,Registration at ResourceManager attempt 1 (timeout=100ms)
2019-04-11 22:39:40.000,Registration at ResourceManager (akka.ssl.tcp://flink@flink-jobmanager:6123/user/resourcemanager) attempt 1 timed out after 100 ms
2019-04-11 22:39:40.000,Registration at ResourceManager attempt 2 (timeout=200ms)
2019-04-11 22:39:40.000,Successful registration at resource manager akka.ssl.tcp://flink@flink-jobmanager:6123/user/resourcemanager under registration id deade132e2c41c52019cdc27977266cf.
2019-04-11 22:39:41.000,resolve of path sequence [/temp/$c] failed{code}
As RPC calls seem to use akka ask, which creates a temporary source actor, I think multiple RPC calls could have arrived out of order via different actor pairs, and the symptom above seems to be caused by that. If so, could the call carry an attempt count in its argument to prevent the unexpected unregistration? At this point, all I have done is log analysis, so I could do further analysis, but before that I wanted to check whether this is a known issue. I also searched with some relevant terms and log pieces, but couldn't find a duplicate. Please mark this as a duplicate if one exists. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
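The TM log above shows the registration retry pattern: attempt 1 with a 100 ms timeout, attempt 2 with 200 ms. A sketch of that doubling-timeout schedule follows; the constant names and the cap are illustrative, not Flink's actual code:

```java
// Sketch of the TM registration retry pattern visible in the logs above:
// each attempt's timeout doubles, starting from an initial timeout of 100 ms.
// Names and the max-timeout cap are illustrative assumptions.
public class RegistrationBackoff {
    static final long INITIAL_TIMEOUT_MS = 100;
    static final long MAX_TIMEOUT_MS = 30_000;

    /** Timeout for the given 1-based attempt number, capped at MAX_TIMEOUT_MS. */
    public static long timeoutForAttempt(int attempt) {
        long timeout = INITIAL_TIMEOUT_MS << (attempt - 1); // doubles each attempt
        return Math.min(timeout, MAX_TIMEOUT_MS);
    }

    public static void main(String[] args) {
        for (int attempt = 1; attempt <= 4; attempt++) {
            System.out.println("attempt " + attempt + ": timeout=" + timeoutForAttempt(attempt) + "ms");
        }
        // attempt 1: timeout=100ms, attempt 2: timeout=200ms, and so on.
    }
}
```

Note that this doubling is exactly why R1 and R2 can be in flight concurrently: R1's response may still arrive (late) after R2 has already succeeded, which is the race described in the report.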
[jira] [Commented] (FLINK-12245) Transient slot allocation failure on job recovery
[ https://issues.apache.org/jira/browse/FLINK-12245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16821407#comment-16821407 ] Hwanju Kim commented on FLINK-12245: Thanks Till! So FLINK-9635 indeed extracted the subset relevant to this issue from the scheduler refactoring in the group-aware scheduling work. That's nice and exactly what I wanted. I just cherry-picked the patch onto 1.6.2 and the retest worked fine.
> Transient slot allocation failure on job recovery
> Key: FLINK-12245
> URL: https://issues.apache.org/jira/browse/FLINK-12245
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Coordination
> Affects Versions: 1.6.3
> Environment: Flink 1.6.2 with Kubernetes
> Reporter: Hwanju Kim
> Priority: Major
>
> In 1.6.2, we have experienced slot allocation transiently failing on job recovery, especially when a task manager (TM) is unavailable, leading to heartbeat failure. By transient, I mean that it fails once with a slot allocation timeout (by default 5 min) and the next recovery restart then succeeds.
> I found that each _Execution_ remembers its previous allocations and, for the sake of local state recovery, tries to prefer the last previous allocation among the resolved slot candidates. If the previous allocation belongs to an unavailable TM, the candidates do not contain it, forcing the slot provider to request a new slot from the resource manager, which then finds a new TM and its available slots. So far this is expected and fine, but any subsequent execution that also belonged to the unavailable TM and has the first task as a predecessor fails with the unavailable previous allocation as well. It, too, requests another new slot, since it never finds the gone previous allocation among the candidates. However, this behavior may generate more slot requests than there are slots available.
> For example, if two pipelined tasks shared one slot in one TM, which then crashed and was replaced with a new TM, two new slot requests are generated by the tasks. Since two slot requests cannot be fulfilled by a one-slot TM, the job hits the slot allocation timeout and restarts.
> {code:java}
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not allocate all requires slots within timeout of 30 ms. Slots required: 2, slots allocated: 1 {code}
> At the next round of recovery, since the second execution failed to allocate a new slot, its last previous allocation is _null_, so it falls back to the locality-based allocation strategy, which can find the slot allocated for the first task, and thus succeeds. Although it eventually succeeds, this increases downtime by the slot allocation timeout.
> The reason for this behavior is that _PreviousAllocationSchedulingStrategy.findMatchWithLocality()_ immediately returns _null_ if the previous allocation is not empty and is not contained in the candidate list. I thought that if the previous allocation is not among the candidates, it could fall back to _LocationPreferenceSchedulingStrategy.findMatchWithLocality()_ rather than returning _null_. By doing so, it can avoid requesting more slots than are available. Requesting more slots could be fine in an environment where the resource manager can reactively spawn more TMs (like Yarn/Mesos), although it could spawn more than needed, but a StandaloneResourceManager with statically provisioned resources cannot help but fail to allocate the requested slots.
> Having looked at the mainline branch and 1.8.0 (although I have not attempted to reproduce this issue with mainline), the related code has changed to what I would have expected (falling back to the locality-based strategy if the previous allocation is not among the candidates): PreviousAllocationSlotSelectionStrategy.selectBestSlotForProfile().
> Those led me to reading the group-aware scheduling work ([https://docs.google.com/document/d/1q7NOqt05HIN-PlKEEPB36JiuU1Iu9fnxxVGJzylhsxU/edit#heading=h.k15nfgsa5bnk]).
> In addition, I checked that in 1.6.2 the _matchPreviousLocationNotAvailable_ test expects the problematic behavior I described. So I started wondering whether the behavior of the previous-allocation strategy in non-mainline versions is by design or not. I have a fix similar to the mainline and have verified that it resolves the problem, but I am bringing up the issue to get context around the behavior and to discuss what the side effects of the fix would be. I understand that the current vertex-by-vertex scheduling could become inefficient by letting an execution that belonged to an unavailable slot steal another task's previous slot, but a slot allocation failure seems worse to me.
> I searched existing issues with slot-allocation-failure terms and couldn't find the same
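The fallback described in the issue (prefer the slot of the previous allocation, but if that allocation is no longer among the candidates, degrade to locality-based matching rather than returning null and triggering an extra slot request) can be sketched with simplified stand-in types. These are not Flink's actual scheduling classes:

```java
import java.util.List;
import java.util.Optional;

// Sketch of the fix discussed above: if the previous allocation is missing from
// the candidates (its TM is gone), fall back to locality-based selection instead
// of returning nothing. Types are simplified stand-ins, not Flink's actual API.
public class PreviousAllocationFallback {
    public record Slot(String allocationId, String host) {}

    /** Locality-based fallback: any slot on the preferred host, else the first candidate. */
    public static Optional<Slot> byLocality(List<Slot> candidates, String preferredHost) {
        return candidates.stream()
                .filter(s -> s.host().equals(preferredHost))
                .findFirst()
                .or(() -> candidates.stream().findFirst());
    }

    public static Optional<Slot> select(List<Slot> candidates,
                                        String previousAllocationId,
                                        String preferredHost) {
        Optional<Slot> previous = candidates.stream()
                .filter(s -> s.allocationId().equals(previousAllocationId))
                .findFirst();
        // Instead of returning empty when the previous allocation is missing
        // (the 1.6.2 behavior), fall back to locality-based matching.
        return previous.or(() -> byLocality(candidates, preferredHost));
    }

    public static void main(String[] args) {
        List<Slot> candidates = List.of(new Slot("alloc-B", "tm-2"));
        // Previous allocation "alloc-A" belonged to the crashed TM and is absent,
        // so the execution reuses an existing candidate instead of requesting a new slot.
        System.out.println(select(candidates, "alloc-A", "tm-2").get().allocationId()); // alloc-B
    }
}
```

With this behavior, the two-tasks/one-slot example above no longer produces two slot requests against a one-slot TM, avoiding the allocation timeout.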
[jira] [Created] (FLINK-12245) Transient slot allocation failure on job recovery
Hwanju Kim created FLINK-12245: -- Summary: Transient slot allocation failure on job recovery Key: FLINK-12245 URL: https://issues.apache.org/jira/browse/FLINK-12245 Project: Flink Issue Type: Bug Components: Runtime / Coordination Affects Versions: 1.6.3 Environment: Flink 1.6.2 with Kubernetes Reporter: Hwanju Kim
In 1.6.2, we have experienced slot allocation transiently failing on job recovery, especially when a task manager (TM) is unavailable, leading to heartbeat failure. By transient, I mean that it fails once with a slot allocation timeout (by default 5 min) and the next recovery restart then succeeds.
I found that each _Execution_ remembers its previous allocations and, for the sake of local state recovery, tries to prefer the last previous allocation among the resolved slot candidates. If the previous allocation belongs to an unavailable TM, the candidates do not contain it, forcing the slot provider to request a new slot from the resource manager, which then finds a new TM and its available slots. So far this is expected and fine, but any subsequent execution that also belonged to the unavailable TM and has the first task as a predecessor fails with the unavailable previous allocation as well. It, too, requests another new slot, since it never finds the gone previous allocation among the candidates. However, this behavior may generate more slot requests than there are slots available. For example, if two pipelined tasks shared one slot in one TM, which then crashed and was replaced with a new TM, two new slot requests are generated by the tasks. Since two slot requests cannot be fulfilled by a one-slot TM, the job hits the slot allocation timeout and restarts.
{code:java}
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not allocate all requires slots within timeout of 30 ms.
Slots required: 2, slots allocated: 1 {code}
At the next round of recovery, since the second execution failed to allocate a new slot, its last previous allocation is _null_, so it falls back to the locality-based allocation strategy, which can find the slot allocated for the first task, and thus succeeds. Although it eventually succeeds, this increases downtime by the slot allocation timeout.
The reason for this behavior is that _PreviousAllocationSchedulingStrategy.findMatchWithLocality()_ immediately returns _null_ if the previous allocation is not empty and is not contained in the candidate list. I thought that if the previous allocation is not among the candidates, it could fall back to _LocationPreferenceSchedulingStrategy.findMatchWithLocality()_ rather than returning _null_. By doing so, it can avoid requesting more slots than are available. Requesting more slots could be fine in an environment where the resource manager can reactively spawn more TMs (like Yarn/Mesos), although it could spawn more than needed, but a StandaloneResourceManager with statically provisioned resources cannot help but fail to allocate the requested slots.
Having looked at the mainline branch and 1.8.0 (although I have not attempted to reproduce this issue with mainline), the related code has changed to what I would have expected (falling back to the locality-based strategy if the previous allocation is not among the candidates): PreviousAllocationSlotSelectionStrategy.selectBestSlotForProfile(). Those led me to reading the group-aware scheduling work ([https://docs.google.com/document/d/1q7NOqt05HIN-PlKEEPB36JiuU1Iu9fnxxVGJzylhsxU/edit#heading=h.k15nfgsa5bnk]). In addition, I checked that in 1.6.2 the _matchPreviousLocationNotAvailable_ test expects the problematic behavior I described. So I started wondering whether the behavior of the previous-allocation strategy in non-mainline versions is by design or not.
I have a fix similar to the mainline and have verified that it resolves the problem, but I am bringing up the issue to get context around the behavior and to discuss what the side effects of the fix would be. I understand that the current vertex-by-vertex scheduling could become inefficient by letting an execution that belonged to an unavailable slot steal another task's previous slot, but a slot allocation failure seems worse to me. I searched existing issues with slot-allocation-failure terms and couldn't find the same issue, hence this one. Please feel free to mark it as a duplicate if one exists. -- This message was sent by Atlassian JIRA (v7.6.3#76005)