[jira] [Commented] (FLINK-15156) Warn user if System.exit() is called in user code

2021-01-19 Thread Hwanju Kim (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17267987#comment-17267987
 ] 

Hwanju Kim commented on FLINK-15156:


[~rmetzger] - No problem at all, and thanks a lot for such a thorough review. It is good that we now have a unified security manager.

> Warn user if System.exit() is called in user code
> -
>
> Key: FLINK-15156
> URL: https://issues.apache.org/jira/browse/FLINK-15156
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Reporter: Robert Metzger
>Assignee: Hwanju Kim
>Priority: Minor
>  Labels: pull-request-available, starter
> Fix For: 1.13.0
>
>
> It would make debugging Flink errors easier if we would intercept and log 
> calls to System.exit() through the SecurityManager.
> A user recently had an error where the JobManager was shutting down because 
> of a System.exit() in the user code: 
> https://lists.apache.org/thread.html/b28dabcf3068d489f38399c456c80d48569fcdf74b15f8bb95d532d0%40%3Cuser.flink.apache.org%3E
> If I remember correctly, we had such issues before.
> I put this ticket into the "Runtime / Coordination" component, as it is 
> mostly about improving the usability / debuggability in that area.





[jira] [Commented] (FLINK-15156) Warn user if System.exit() is called in user code

2020-10-19 Thread Hwanju Kim (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17216875#comment-17216875
 ] 

Hwanju Kim commented on FLINK-15156:


For 3, it is definitely a small effort. I was just curious about what use case you have seen that raised the concern. I agree, and I think we should add this option as well.

I will rebase the current change onto the mainline to prepare for the review. For 2, I think I would leave the current instrumentation as is, expecting that additions/removals, which should be easy, can be done during the review.

> Warn user if System.exit() is called in user code
> -
>
> Key: FLINK-15156
> URL: https://issues.apache.org/jira/browse/FLINK-15156
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Reporter: Robert Metzger
>Priority: Minor
>  Labels: starter
>
> It would make debugging Flink errors easier if we would intercept and log 
> calls to System.exit() through the SecurityManager.
> A user recently had an error where the JobManager was shutting down because 
> of a System.exit() in the user code: 
> https://lists.apache.org/thread.html/b28dabcf3068d489f38399c456c80d48569fcdf74b15f8bb95d532d0%40%3Cuser.flink.apache.org%3E
> If I remember correctly, we had such issues before.
> I put this ticket into the "Runtime / Coordination" component, as it is 
> mostly about improving the usability / debuggability in that area.





[jira] [Commented] (FLINK-15156) Warn user if System.exit() is called in user code

2020-10-16 Thread Hwanju Kim (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17215561#comment-17215561
 ] 

Hwanju Kim commented on FLINK-15156:


Robert, thanks for the comments:
 # Performance penalties: Good question. Although we haven't microbenchmarked its impact, we haven't heard of any performance regression. All of the security checks are on the control path (thread control, socket accept/connect, exit, etc.) rather than the data path, so I did not think it would matter. More importantly, if no extra check is added, it is just a single function call with a null check/return (and the JDK by default also does one getSecurityManager call plus a null check). So, considering that the additional code on the control path is very small, the overhead should be negligible. Nonetheless, for a platform provider, such a security manager for sandboxing is inevitable going forward, not only for exit (which is just the first use case), even if it causes a slight penalty. That is one reason to have a configuration that defaults to false for general users (as opposed to platform providers).
 # More check points: Right, thanks for the input. I also (ambitiously) tried to find all the places in the first place, but that was not very practical at that point (and the added code would make the patch larger), so I decided to instrument the most frequently hit paths. Indeed, main() is where exit() is (most likely erroneously) put, blindly following the common practice for general Java applications. I don't have data for the case where users put exit() in a UDF, though it may be possible if some badly written library calls it behind the scenes; I haven't seen that yet. We can further discuss which additional points are interesting.
 # In the cases I have seen, the exit usually comes with a log message already (e.g., "this argument is not provided, exiting"), and we wanted to add a guard that does not terminate the JVM while additionally telling the user "you are not allowed to terminate the JVM". I thought throwing an exception with a message provides the user with a warning/debugging message without terminating the JVM. Do you think some users still want their exit to be performed intentionally? I assumed that a UDF or main() terminating the JVM is always unintended in Flink, but if exit may still be wanted behavior in some cases, it makes sense to provide that option as well. A small sketch of the exception-based behavior is below.
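To make point 3 concrete, here is a minimal, hedged sketch of the exception-based behavior; the class name and message are illustrative only, and the actual patch defines its own exception type (UserSystemExitException), as described in the 2020-10-11 comment further below:

{code:java}
import java.security.Permission;

// Illustrative sketch only: instead of letting System.exit() kill the JVM,
// the security manager turns the call into an exception carrying a
// descriptive warning, so the user still sees a message but the JVM survives.
public class NoExitSecurityManagerSketch extends SecurityManager {

    @Override
    public void checkExit(int status) {
        throw new SecurityException(
            "System.exit(" + status + ") was called from user code; "
                + "terminating the JVM is not allowed here.");
    }

    @Override
    public void checkPermission(Permission perm) {
        // Allow everything else; only exit is intercepted in this sketch.
    }
}
{code}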

> Warn user if System.exit() is called in user code
> -
>
> Key: FLINK-15156
> URL: https://issues.apache.org/jira/browse/FLINK-15156
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Reporter: Robert Metzger
>Priority: Minor
>  Labels: starter
>
> It would make debugging Flink errors easier if we would intercept and log 
> calls to System.exit() through the SecurityManager.
> A user recently had an error where the JobManager was shutting down because 
> of a System.exit() in the user code: 
> https://lists.apache.org/thread.html/b28dabcf3068d489f38399c456c80d48569fcdf74b15f8bb95d532d0%40%3Cuser.flink.apache.org%3E
> If I remember correctly, we had such issues before.
> I put this ticket into the "Runtime / Coordination" component, as it is 
> mostly about improving the usability / debuggability in that area.





[jira] [Commented] (FLINK-15156) Warn user if System.exit() is called in user code

2020-10-11 Thread Hwanju Kim (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17211949#comment-17211949
 ] 

Hwanju Kim commented on FLINK-15156:


FWIW, the following is what we have done:
 * A Flink user security manager is added for general user-code sandbox checking, where currently only exit is checked (other checks can be added here later).
 * The added manager forwards all checks except its overridden ones to the previously installed security manager, if any (like a decorator).
 * The security manager is set when the JM and TM start (if configured, as described in the last bullet point).
 * The exit check is enabled/disabled via a method so that it only affects user code, as the Flink runtime itself needs to exit in some cases (e.g., on a fatal error).
 ** Once enabled, any thread spawned from the main thread inherits the enable flag.
 * What is enclosed by this enabled exit check is currently best-effort and does not cover all the places where user code is involved. The main places are:
 ** main() in the JM (currently for invokeInteractiveModeForExecution)
 ** StreamTask.invoke, triggerCheckpoint, and cancel.
 * A new exception, UserSystemExitException, is defined to be thrown when user code attempts to exit the JVM. It has a default message to warn the user.
 ** In main(), it is wrapped into a ProgramInvocationException.
 ** In a UDF, it fails the exiting task, thereby shipping the exception to the JM and triggering fail-over.
 * This security manager is only installed if the corresponding configuration (under the security section) in flink-conf.yaml is enabled (it is disabled by default). The configuration is per check case (currently only disallow-system-exit is available).

Please let me know if anyone wants to review the patch, or just start a discussion if anything does not make sense. A rough sketch of the approach is included below.
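As referenced above, here is a minimal, hedged sketch of the decorator-style security manager with a thread-inherited enable flag; class and method names are illustrative and do not reflect the actual patch:

{code:java}
import java.security.Permission;

// Illustrative sketch of the decorator-style approach described above.
public class UserCodeSecurityManagerSketch extends SecurityManager {

    private final SecurityManager previous; // previously installed manager, may be null

    // Inherited by threads spawned from the thread that enabled the check.
    private final InheritableThreadLocal<Boolean> exitCheckEnabled =
        new InheritableThreadLocal<Boolean>() {
            @Override
            protected Boolean initialValue() {
                return Boolean.FALSE;
            }
        };

    public UserCodeSecurityManagerSketch(SecurityManager previous) {
        this.previous = previous;
    }

    public void enableExitCheck() {
        exitCheckEnabled.set(Boolean.TRUE);
    }

    public void disableExitCheck() {
        exitCheckEnabled.set(Boolean.FALSE);
    }

    @Override
    public void checkExit(int status) {
        if (exitCheckEnabled.get()) {
            // User code (e.g., main() or a UDF) is not allowed to kill the JVM.
            throw new SecurityException(
                "System.exit(" + status + ") is not allowed in user code.");
        }
        if (previous != null) {
            previous.checkExit(status); // Flink runtime exit paths still work.
        }
    }

    @Override
    public void checkPermission(Permission perm) {
        // Forward everything else to the previously installed manager, if any.
        if (previous != null) {
            previous.checkPermission(perm);
        }
    }

    @Override
    public void checkPermission(Permission perm, Object context) {
        if (previous != null) {
            previous.checkPermission(perm, context);
        }
    }
}
{code}

Around user code, the runtime would call enableExitCheck() before invoking main()/invoke() and disableExitCheck() afterwards, and the manager itself would only be installed via System.setSecurityManager(...) when the corresponding flink-conf.yaml option is enabled.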

> Warn user if System.exit() is called in user code
> -
>
> Key: FLINK-15156
> URL: https://issues.apache.org/jira/browse/FLINK-15156
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Reporter: Robert Metzger
>Priority: Minor
>  Labels: starter
>
> It would make debugging Flink errors easier if we would intercept and log 
> calls to System.exit() through the SecurityManager.
> A user recently had an error where the JobManager was shutting down because 
> of a System.exit() in the user code: 
> https://lists.apache.org/thread.html/b28dabcf3068d489f38399c456c80d48569fcdf74b15f8bb95d532d0%40%3Cuser.flink.apache.org%3E
> If I remember correctly, we had such issues before.
> I put this ticket into the "Runtime / Coordination" component, as it is 
> mostly about improving the usability / debuggability in that area.





[jira] [Commented] (FLINK-14938) Flink elasticsearch failure handler re-add indexrequest causes ConcurrentModificationException

2020-04-27 Thread Hwanju Kim (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17093058#comment-17093058
 ] 

Hwanju Kim commented on FLINK-14938:


[~ysn2233] - I have a question about your proposed solution. I wonder if you have done any performance measurement that led you to that hybrid solution. IMO, since this Elasticsearch sink path involves I/O to external services, any CPU penalty incurred by a concurrency-aware data structure is likely dwarfed by the much higher I/O latency, so using ConcurrentLinkedQueue seems just fine to me (in terms of latency, that is; there could be some additional CPU cost, whose significance I am also not sure about quantitatively). We are also looking into resolving this problem, but I wanted to check whether you have a performance test confirming that such a cost matters.
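For reference, a minimal sketch of the queue-based approach discussed above (illustrative only, not the actual connector code; it assumes the Elasticsearch ActionRequest type is on the classpath): it drains a ConcurrentLinkedQueue with poll(), so a failure handler can safely re-add requests while the buffer is being processed.

{code:java}
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

import org.elasticsearch.action.ActionRequest;

// Hypothetical sketch: buffered requests kept in a ConcurrentLinkedQueue so
// that a failure handler can re-add requests concurrently without triggering
// ConcurrentModificationException during processing.
class BufferingIndexerSketch {

    private final Queue<ActionRequest> bufferedRequests = new ConcurrentLinkedQueue<>();

    void add(ActionRequest request) {
        bufferedRequests.add(request);
    }

    void processBufferedRequests(RequestIndexerLike actualIndexer) {
        // poll() instead of iterating, so concurrent add() calls are safe;
        // requests re-added during the drain are picked up by the same loop.
        ActionRequest request;
        while ((request = bufferedRequests.poll()) != null) {
            actualIndexer.add(request);
        }
    }

    // Stand-in for the connector's RequestIndexer interface.
    interface RequestIndexerLike {
        void add(ActionRequest request);
    }
}
{code}

The trade-off is exactly the CPU-cost question above: the queue avoids the ConcurrentModificationException without coarse locking, at the price of CAS-based operations that should be cheap relative to the Elasticsearch round trip.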

> Flink elasticsearch failure handler re-add indexrequest causes 
> ConcurrentModificationException
> --
>
> Key: FLINK-14938
> URL: https://issues.apache.org/jira/browse/FLINK-14938
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / ElasticSearch
>Affects Versions: 1.8.1
>Reporter: Shengnan YU
>Assignee: Shengnan YU
>Priority: Major
>
>  
> When use Elasticsearch connector failure handler (from official example) to 
> re-add documents, Flink encountered ConcurrentModificationException.
> {code:java}
> input.addSink(new ElasticsearchSink<>(
> config, transportAddresses,
> new ElasticsearchSinkFunction() {...},
> new ActionRequestFailureHandler() {
> @Override
> public void onFailure(ActionRequest action,
> Throwable failure,
> int restStatusCode,
> RequestIndexer indexer) throws Throwable {
> if (ExceptionUtils.findThrowable(failure, 
> EsRejectedExecutionException.class).isPresent()) {
> // full queue; re-add document for indexing
> indexer.add(action);
> }
> }
> }));
> {code}
> I found that the method BufferingNoOpRequestIndexer#processBufferedRequests 
> iterates over a list of ActionRequest. However, the failure handler keeps 
> re-adding requests to that list after the bulk request, which causes 
> ConcurrentModificationException.
> {code:java}
> void processBufferedRequests(RequestIndexer actualIndexer) {
>for (ActionRequest request : bufferedRequests) {
>   if (request instanceof IndexRequest) {
>  actualIndexer.add((IndexRequest) request);
>   } else if (request instanceof DeleteRequest) {
>  actualIndexer.add((DeleteRequest) request);
>   } else if (request instanceof UpdateRequest) {
>  actualIndexer.add((UpdateRequest) request);
>   }
>}
>bufferedRequests.clear();
> }{code}
> I think it is a multi-threading bug; is it OK to use a concurrent queue 
> to maintain the failed requests?
>  





[jira] [Comment Edited] (FLINK-8417) Support STSAssumeRoleSessionCredentialsProvider in FlinkKinesisConsumer

2020-03-10 Thread Hwanju Kim (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-8417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17055658#comment-17055658
 ] 

Hwanju Kim edited comment on FLINK-8417 at 3/10/20, 7:24 AM:
-

I have a basic question on this. I wonder how this is different from {{AWSConfigConstants.CredentialsProvider.ASSUME_ROLE}} (added by FLINK-9686 - although it says it is for the producer, it should be available to the consumer as well, since it is configured via properties). AFAIK, with ASSUME_ROLE, if a correct role ARN with the proper policy/trust relationship is set, cross-account stream access should be feasible. I may be missing some context here about what is currently not supported (consumer support, a credentials expiration issue, or something else?). The thread right above seems to point to ASSUME_ROLE, but it says it is not working in the TM while working in the JM, which is a little confusing to me.


was (Author: hwanju):
I have a basic question on this. I wonder how this is different from 
{{AWSConfigConstants.CredentialsProvider.}}ASSUME_ROLE (by FLINK-9686 - 
although it says it's for producer, it should be available for consumer as it's 
with properties). AFAIK, with ASSUME_ROLE, if correct role ARN with proper 
policy/trust relationship is set, cross-account stream access could be 
feasible. I may miss some context here about what's currently not supported 
(consumer support, or creds expiration issue, or something else?). From the 
thread right above, it seems to point to ASSUME_ROLE, but it says it's not 
working in TM but in JM, which is little confusing to me.

> Support STSAssumeRoleSessionCredentialsProvider in FlinkKinesisConsumer
> ---
>
> Key: FLINK-8417
> URL: https://issues.apache.org/jira/browse/FLINK-8417
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Kinesis
>Reporter: Tzu-Li (Gordon) Tai
>Priority: Major
>  Labels: usability
>
> As discussed in ML: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kinesis-Connectors-With-Temporary-Credentials-td17734.html.
> Users need the functionality to access cross-account AWS Kinesis streams, 
> using AWS Temporary Credentials [1].
> We should add support for 
> {{AWSConfigConstants.CredentialsProvider.STSAssumeRole}}, which internally 
> would use the {{STSAssumeRoleSessionCredentialsProvider}} [2] in 
> {{AWSUtil#getCredentialsProvider(Properties)}}.
> [1] https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp.html
> [2] 
> https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/STSAssumeRoleSessionCredentialsProvider.html





[jira] [Commented] (FLINK-8417) Support STSAssumeRoleSessionCredentialsProvider in FlinkKinesisConsumer

2020-03-10 Thread Hwanju Kim (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-8417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17055658#comment-17055658
 ] 

Hwanju Kim commented on FLINK-8417:
---

I have a basic question on this. I wonder how this is different from {{AWSConfigConstants.CredentialsProvider.ASSUME_ROLE}} (added by FLINK-9686 - although it says it is for the producer, it should be available to the consumer as well, since it is configured via properties). AFAIK, with ASSUME_ROLE, if a correct role ARN with the proper policy/trust relationship is set, cross-account stream access should be feasible. I may be missing some context here about what is currently not supported (consumer support, a credentials expiration issue, or something else?). The thread right above seems to point to ASSUME_ROLE, but it says it is not working in the TM while working in the JM, which is a little confusing to me.
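To make my question concrete, this is roughly how I understood the ASSUME_ROLE path from FLINK-9686 to be configured (a hedged sketch; the constant names are as I recall them from the connector and may differ by version, and the region, role ARN, and stream name are placeholders):

{code:java}
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;

// Hedged sketch of the ASSUME_ROLE configuration as I understand it from
// FLINK-9686; all values below are placeholders.
public class AssumeRoleConsumerSketch {

    public static FlinkKinesisConsumer<String> create() {
        Properties props = new Properties();
        props.put(AWSConfigConstants.AWS_REGION, "us-west-2");
        props.put(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "ASSUME_ROLE");
        props.put(AWSConfigConstants.AWS_ROLE_ARN,
            "arn:aws:iam::999999999999:role/cross-account-kinesis-read");
        props.put(AWSConfigConstants.AWS_ROLE_SESSION_NAME, "flink-kinesis-session");
        // Base credentials used to call STS for assuming the role above.
        props.put(AWSConfigConstants.AWS_ROLE_CREDENTIALS_PROVIDER, "AUTO");

        return new FlinkKinesisConsumer<>(
            "cross-account-stream", new SimpleStringSchema(), props);
    }
}
{code}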

> Support STSAssumeRoleSessionCredentialsProvider in FlinkKinesisConsumer
> ---
>
> Key: FLINK-8417
> URL: https://issues.apache.org/jira/browse/FLINK-8417
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Kinesis
>Reporter: Tzu-Li (Gordon) Tai
>Priority: Major
>  Labels: usability
>
> As discussed in ML: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kinesis-Connectors-With-Temporary-Credentials-td17734.html.
> Users need the functionality to access cross-account AWS Kinesis streams, 
> using AWS Temporary Credentials [1].
> We should add support for 
> {{AWSConfigConstants.CredentialsProvider.STSAssumeRole}}, which internally 
> would use the {{STSAssumeRoleSessionCredentialsProvider}} [2] in 
> {{AWSUtil#getCredentialsProvider(Properties)}}.
> [1] https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp.html
> [2] 
> https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/STSAssumeRoleSessionCredentialsProvider.html





[jira] [Commented] (FLINK-14949) Task cancellation can be stuck against out-of-thread error

2019-11-26 Thread Hwanju Kim (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14949?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16982305#comment-16982305
 ] 

Hwanju Kim commented on FLINK-14949:


[~azagrebin], thanks for the quick answer and sure, I can work on this.

> Task cancellation can be stuck against out-of-thread error
> --
>
> Key: FLINK-14949
> URL: https://issues.apache.org/jira/browse/FLINK-14949
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Task
>Affects Versions: 1.8.2
>Reporter: Hwanju Kim
>Priority: Major
>
> Task cancellation 
> ([_cancelOrFailAndCancelInvokable_|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java#L991])
>  relies on multiple separate threads, which are _TaskCanceler_, 
> _TaskInterrupter_, and _TaskCancelerWatchdog_. While TaskCanceler performs 
> cancellation itself, TaskInterrupter periodically interrupts a non-reacting 
> task and TaskCancelerWatchdog kills JVM if cancellation has never been 
> finished within a certain amount of time (by default 3 min). Those all ensure 
> that cancellation can be done or either aborted transitioning to a terminal 
> state in finite time (FLINK-4715).
> However, if any asynchronous thread creation is failed such as by 
> out-of-thread (_java.lang.OutOfMemoryError: unable to create new native 
> thread_), the code transitions to CANCELING, but nothing could be performed 
> for cancellation or watched by watchdog. Currently, jobmanager does [retry 
> cancellation|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java#L1121]
>  against any error returned, but a next retry [returns success once it sees 
> CANCELING|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java#L997],
>  assuming that it is in progress. This leads to complete stuck in CANCELING, 
> which is non-terminal, so state machine is stuck after that.
> One solution would be that if a task has transitioned to CANCELLING but it 
> gets fatal error or OOM (i.e., _isJvmFatalOrOutOfMemoryError_ is true) 
> indicating that it could not reach spawning TaskCancelerWatchdog, it could 
> immediately consider that as fatal error (not safely cancellable) calling 
> _notifyFatalError_, just as TaskCancelerWatchdog does but eagerly and 
> synchronously. That way, it can at least transition out of the non-terminal 
> state and furthermore clear potentially leaked thread/memory by restarting 
> JVM. The same method is also invoked by _failExternally_, but transitioning 
> to FAILED seems less critical as it's already terminal state.
> How to reproduce is straightforward by running an application that keeps 
> creating threads, each of which never finishes in a loop, and has multiple 
> tasks so that one task triggers failure and then the others are attempted to 
> be cancelled by full fail-over. In web UI dashboard, some tasks from a task 
> manager where any of cancellation-related threads failed to be spawned are 
> stuck in CANCELLING for good.





[jira] [Created] (FLINK-14949) Task cancellation can be stuck against out-of-thread error

2019-11-25 Thread Hwanju Kim (Jira)
Hwanju Kim created FLINK-14949:
--

 Summary: Task cancellation can be stuck against out-of-thread error
 Key: FLINK-14949
 URL: https://issues.apache.org/jira/browse/FLINK-14949
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.8.2
Reporter: Hwanju Kim


Task cancellation 
([_cancelOrFailAndCancelInvokable_|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java#L991]) relies on multiple separate threads: _TaskCanceler_, _TaskInterrupter_, and _TaskCancelerWatchdog_. While TaskCanceler performs the cancellation itself, TaskInterrupter periodically interrupts a non-reacting task, and TaskCancelerWatchdog kills the JVM if cancellation has not finished within a certain amount of time (by default 3 min). Together these ensure that cancellation either completes or is aborted by transitioning to a terminal state in finite time (FLINK-4715).

However, if any of these asynchronous thread creations fails, for example due to thread exhaustion (_java.lang.OutOfMemoryError: unable to create new native thread_), the code transitions to CANCELING, but nothing actually performs the cancellation or is watched by the watchdog. Currently, the jobmanager does [retry cancellation|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java#L1121] against any error returned, but the next retry [returns success once it sees CANCELING|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java#L997], assuming that cancellation is in progress. This leaves the task completely stuck in CANCELING, which is non-terminal, so the state machine is stuck from then on.

One solution would be: if a task has transitioned to CANCELING but gets a fatal error or OOM (i.e., _isJvmFatalOrOutOfMemoryError_ is true), indicating that it could not even reach the point of spawning TaskCancelerWatchdog, it could immediately treat that as a fatal error (not safely cancellable) and call _notifyFatalError_, just as TaskCancelerWatchdog does but eagerly and synchronously. That way, it can at least transition out of the non-terminal state and, furthermore, clear potentially leaked threads/memory by restarting the JVM. The same method is also invoked by _failExternally_, but transitioning to FAILED seems less critical as FAILED is already a terminal state. A hedged sketch of this handling is shown below.

Reproducing this is straightforward: run an application that keeps creating threads in a loop, each of which never finishes, and that has multiple tasks, so that one task triggers a failure and the others are then attempted to be cancelled by full fail-over. In the web UI dashboard, some tasks from any task manager where a cancellation-related thread failed to be spawned are stuck in CANCELING for good.
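For illustration, a minimal, hedged sketch of the proposed eager escalation; the class and method names are illustrative stand-ins for the actual Task internals, and only ExceptionUtils is the real Flink utility:

{code:java}
import org.apache.flink.util.ExceptionUtils;

// Hypothetical sketch of the proposed handling; notifyFatalError and the
// canceler-spawning runnable are stand-ins for the actual Task internals.
final class EagerFatalOnCancelSketch {

    void cancelWithWatchdogGuard(Runnable spawnCancelerAndWatchdog) {
        try {
            // Spawning TaskCanceler / TaskInterrupter / TaskCancelerWatchdog
            // may itself fail with "unable to create new native thread".
            spawnCancelerAndWatchdog.run();
        } catch (Throwable t) {
            if (ExceptionUtils.isJvmFatalOrOutOfMemoryError(t)) {
                // The watchdog could not be created, so nothing guarantees that
                // CANCELING ever finishes; escalate eagerly instead of leaving
                // the task stuck in a non-terminal state.
                notifyFatalError("Could not spawn task cancellation threads", t);
            } else {
                ExceptionUtils.rethrow(t);
            }
        }
    }

    private void notifyFatalError(String message, Throwable cause) {
        // In the real Task this would go through the task manager actions and
        // eventually terminate/restart the process.
        throw new Error(message, cause);
    }
}
{code}

The key point is that the escalation happens synchronously in the caller, so the task does not depend on a watchdog thread that may itself have failed to start.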





[jira] [Commented] (FLINK-14589) Redundant slot requests with the same AllocationID leads to inconsistent slot table

2019-11-04 Thread Hwanju Kim (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16966989#comment-16966989
 ] 

Hwanju Kim commented on FLINK-14589:


[~trohrmann], sure, I will ping you once the PR is ready.

> Redundant slot requests with the same AllocationID leads to inconsistent slot 
> table
> ---
>
> Key: FLINK-14589
> URL: https://issues.apache.org/jira/browse/FLINK-14589
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.6.3
>Reporter: Hwanju Kim
>Assignee: Hwanju Kim
>Priority: Major
> Fix For: 1.10.0, 1.8.3, 1.9.2
>
>
> _NOTE: We found this issue in 1.6.2, but I checked the relevant code is still 
> in the mainline. What I am not sure, however, is what other slot-related 
> fixes after 1.6.2 (such as FLINK-11059 and FLINK-12863, etc) would prevent 
> the initial cause of this issue from happening. So far, I have not found the 
> related fix to the issue I am describing here, so opening this issue. Please 
> feel free to deduplicate this if another one already covers it. Please note 
> that we have already picked FLINK-9912, which turned out to be a major fix to 
> slot allocation failure issue. I will note the ramification to that issue 
> just in case others experience the same problem)._
> h2. Summary
> When *requestSlot* is called from ResourceManager (RM) to TaskManager (TM), 
> TM firstly reserves the requested slot marking it as ALLOCATED, offers the 
> slot to JM, and marks the slot as ACTIVE once getting acknowledgement from 
> JM. This three-way communication for slot allocation is identified by 
> AllocationID, which is generated by JM initially. The way TM reserves a slot 
> is by calling *TaskSlotTable.allocateSlot* if the requested slot number 
> (i.e., slot index) is free to use. The major data structure is *TaskSlot* 
> indexed by slot index. Once the slot is marked as ALLOCATED with a given 
> AllocationID, it tries to update other maps such as *allocationIDTaskSlotMap* 
> keyed by AllocationID and *slotsPerJob* keyed by JobID. Here when updating 
> *allocationIDTaskSlotMap*, it's directly using 
> *allocationIDTaskSlotMap.put(allocationId, taskSlot)*, which may overwrite 
> existing entry, if one is already there with the same AllocationID. This 
> would render inconsistency between *TaskSlot* and *allocationIDTaskSlotMap*, 
> where the former says two slots are allocated by the same AllocationID and 
> the latter says the AllocationID only has the latest task slot. With this 
> state, once the slot is freed, *freeSlot* is driven by AllocationID, so it 
> fetches slot index (i.e., the latter one that has arrived later) from 
> *allocationIDTaskSlotMap*, marks the slot free, and removes it from 
> *allocationIDTaskSlotMap*. But still the old task slot is marked as 
> allocated. This old task slot becomes zombie and can never be freed. This can 
> cause permanent slot allocation failure if TM slots are statically and 
> tightly provisioned and resource manager is not actively spawning new TMs 
> where unavailable (e.g., Kubernetes without active mode integration, which is 
> not yet available).
> h2. Scenario
> From my observation, the redundant slot requests with the same AllocationID 
> and different slot indices should be rare but can happen with race condition 
> especially when repeated fail-over and heartbeat timeout (primarily caused by 
> transient resource overload, not permanent network partition/node outage) are 
> taking place. The following is a detailed scenario, which could lead to this 
> issue (AID is AllocationID):
>  # AID1 is requested from JM and put in the pending request queue in RM.
>  # RM picks up slot number 1 (Slot1) from freeSlots and performs requestSlot 
> with Slot1 and AID1. Here this slot request is on the fly.
>  # In the meantime, Slot1 is occupied by AID2 in TM for a delayed slot 
> request and TM sends slot report via heartbeat to RM saying Slot1 is already 
> allocated with AID2.
>  # RM's heartbeat handler identifies that Slot1 is occupied with a different 
> AID (AID2) so that it should reject the pending request sent from step 2.
>  # handleFailedSlotRequest puts the rejected AID1 to pending request again by 
> retrying the slot request. Now it picks up another available slot, say Slot2. 
> So, the retried slot request with Slot 2 and AID1 is on the fly.
>  # In the meantime, Slot1 occupied by AID2 is freed (by any disconnection 
> with JM, or releasing all the tasks in the slot on cancellation/failure - the 
> latter was observed).
>  # The in-flight slot request (Slot1, AID1) from step 2 arrives at TM, and 
> it's succeeded as Slot1 is free to allocate. TM offers the Slot1 to JM, which 
> acknowledges it so that TM marks 

[jira] [Created] (FLINK-14589) Redundant slot requests with the same AllocationID leads to inconsistent slot table

2019-10-31 Thread Hwanju Kim (Jira)
Hwanju Kim created FLINK-14589:
--

 Summary: Redundant slot requests with the same AllocationID leads 
to inconsistent slot table
 Key: FLINK-14589
 URL: https://issues.apache.org/jira/browse/FLINK-14589
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.6.3
Reporter: Hwanju Kim


_NOTE: We found this issue in 1.6.2, but I checked that the relevant code is still in the mainline. What I am not sure about, however, is whether other slot-related fixes after 1.6.2 (such as FLINK-11059 and FLINK-12863) would prevent the initial cause of this issue from happening. So far, I have not found a fix related to the issue I am describing here, so I am opening this one. Please feel free to deduplicate it if another issue already covers it. Please note that we have already picked up FLINK-9912, which turned out to be a major fix for a slot allocation failure issue; I note the relationship to that issue just in case others experience the same problem._
h2. Summary

When *requestSlot* is called from ResourceManager (RM) to TaskManager (TM), TM 
firstly reserves the requested slot marking it as ALLOCATED, offers the slot to 
JM, and marks the slot as ACTIVE once getting acknowledgement from JM. This 
three-way communication for slot allocation is identified by AllocationID, 
which is generated by JM initially. The way TM reserves a slot is by calling 
*TaskSlotTable.allocateSlot* if the requested slot number (i.e., slot index) is 
free to use. The major data structure is *TaskSlot* indexed by slot index. Once 
the slot is marked as ALLOCATED with a given AllocationID, it tries to update 
other maps such as *allocationIDTaskSlotMap* keyed by AllocationID and 
*slotsPerJob* keyed by JobID. When updating *allocationIDTaskSlotMap*, it directly uses *allocationIDTaskSlotMap.put(allocationId, taskSlot)*, which may overwrite an existing entry if one is already there with the same AllocationID. This renders *TaskSlot* and *allocationIDTaskSlotMap* inconsistent: the former says two slots are allocated with the same AllocationID, while the latter says the AllocationID only maps to the latest task slot. In this state, once the slot is freed, *freeSlot* is driven by AllocationID, so it fetches the slot index (i.e., the one that arrived later) from *allocationIDTaskSlotMap*, marks that slot free, and removes it from *allocationIDTaskSlotMap*. But the old task slot is still marked as allocated. This old task slot becomes a zombie and can never be freed, which can cause permanent slot allocation failure if TM slots are statically and tightly provisioned and the resource manager does not actively spawn new TMs when none are available (e.g., Kubernetes without active mode integration, which is not yet available).
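For illustration only, a hedged sketch of the kind of guard discussed above; the types and names are simplified stand-ins, not the actual TaskSlotTable code:

{code:java}
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of guarding the AllocationID -> TaskSlot mapping so that
// a redundant request with the same AllocationID but a different slot is
// rejected instead of silently overwriting the existing entry.
final class AllocationGuardSketch<A, S> {

    private final Map<A, S> allocationIdToSlot = new HashMap<>();

    /** Returns true if the slot was recorded, false if the AllocationID is already bound. */
    boolean tryRecordAllocation(A allocationId, S taskSlot) {
        S existing = allocationIdToSlot.putIfAbsent(allocationId, taskSlot);
        if (existing != null && !existing.equals(taskSlot)) {
            // The same AllocationID is already bound to another slot: reject the
            // redundant request so TaskSlot and this map stay consistent.
            return false;
        }
        return true;
    }
}
{code}

In the real code the conflict would presumably be surfaced as a failed slot allocation so that the RM/JM can react, rather than just returning false.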
h2. Scenario

From my observation, the redundant slot requests with the same AllocationID and different slot indices should be rare but can happen with a race condition, especially when repeated fail-overs and heartbeat timeouts (primarily caused by transient resource overload, not permanent network partition/node outage) are taking place. The following is a detailed scenario, which could lead to this issue (AID is AllocationID):
 # AID1 is requested from JM and put in the pending request queue in RM.
 # RM picks up slot number 1 (Slot1) from freeSlots and performs requestSlot 
with Slot1 and AID1. Here this slot request is on the fly.
 # In the meantime, Slot1 is occupied by AID2 in TM for a delayed slot request 
and TM sends slot report via heartbeat to RM saying Slot1 is already allocated 
with AID2.
 # RM's heartbeat handler identifies that Slot1 is occupied with a different 
AID (AID2) so that it should reject the pending request sent from step 2.
 # handleFailedSlotRequest puts the rejected AID1 to pending request again by 
retrying the slot request. Now it picks up another available slot, say Slot2. 
So, the retried slot request with Slot 2 and AID1 is on the fly.
 # In the meantime, Slot1 occupied by AID2 is freed (by any disconnection with 
JM, or releasing all the tasks in the slot on cancellation/failure - the latter 
was observed).
 # The in-flight slot request (Slot1, AID1) from step 2 arrives at TM, and it succeeds as Slot1 is free to allocate. TM offers Slot1 to JM, which acknowledges it, so TM marks Slot1 ACTIVE with AID1. At this point, allocationIDTaskSlotMap[AID1] = Slot1 in TM and JM's allocatedSlots[AID1] = Slot1.
 # The next in-flight slot request (Slot2, AID1) from step 5 arrives at TM. As Slot2 is still free, TM marks it ALLOCATED, offers Slot2 to JM, and *"overwrites allocationIDTaskSlotMap[AID1] to Slot2"*.
 # From step 7, JM has allocatedSlots[AID1] = Slot1, which leads JM to reject the offer, as the same AID is already occupied by another slot.
 # TM gets the rejected offer for (Slot2, AID1) and frees 

[jira] [Commented] (FLINK-11127) Make metrics query service establish connection to JobManager

2019-10-31 Thread Hwanju Kim (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16964210#comment-16964210
 ] 

Hwanju Kim commented on FLINK-11127:


[~tsubasa2oo2], in addition to what [~trohrmann] said (and it is interesting to me as well why cancel led to the metric connection issue), I also wonder what workaround fix you used before 1.8 that worked fine in the same scenario without the error you mentioned above. The problematic DNS-related error would show an akka error with "Name or service not known". We have tested TM-to-RM registration but have not necessarily seen the metrics connection error that you showed.

> Make metrics query service establish connection to JobManager
> -
>
> Key: FLINK-11127
> URL: https://issues.apache.org/jira/browse/FLINK-11127
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / Kubernetes, Runtime / Coordination, Runtime 
> / Metrics
>Affects Versions: 1.7.0
>Reporter: Ufuk Celebi
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> As part of FLINK-10247, the internal metrics query service has been separated 
> into its own actor system. Before this change, the JobManager (JM) queried 
> TaskManager (TM) metrics via the TM actor. Now, the JM needs to establish a 
> separate connection to the TM metrics query service actor.
> In the context of Kubernetes, this is problematic as the JM will typically 
> *not* be able to resolve the TMs by name, resulting in warnings as follows:
> {code}
> 2018-12-11 08:32:33,962 WARN  akka.remote.ReliableDeliverySupervisor  
>   - Association with remote system 
> [akka.tcp://flink-metrics@flink-task-manager-64b868487c-x9l4b:39183] has 
> failed, address is now gated for [50] ms. Reason: [Association failed with 
> [akka.tcp://flink-metrics@flink-task-manager-64b868487c-x9l4b:39183]] Caused 
> by: [flink-task-manager-64b868487c-x9l4b: Name does not resolve]
> {code}
> In order to expose the TMs by name in Kubernetes, users require a service 
> *for each* TM instance which is not practical.
> This currently results in the web UI not being able to display some basic metrics 
> about number of sent records. You can reproduce this by following the READMEs 
> in {{flink-container/kubernetes}}.
> This worked before, because the JM is typically exposed via a service with a 
> known name and the TMs establish the connection to it which the metrics query 
> service piggybacked on.
> A potential solution to this might be to let the query service connect to the 
> JM similar to how the TMs register.
> I tagged this ticket as an improvement, but in the context of Kubernetes I 
> would consider this to be a bug.





[jira] [Commented] (FLINK-11127) Make metrics query service establish connection to JobManager

2019-10-13 Thread Hwanju Kim (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16950234#comment-16950234
 ] 

Hwanju Kim commented on FLINK-11127:


[~vicTTim], although I have not retested with 1.8 (we will shortly, though), I think that problem has been solved by FLINK-11632 if you use "ip" as the bind policy. What we have applied is a subset of FLINK-11632.

> Make metrics query service establish connection to JobManager
> -
>
> Key: FLINK-11127
> URL: https://issues.apache.org/jira/browse/FLINK-11127
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / Kubernetes, Runtime / Coordination, Runtime 
> / Metrics
>Affects Versions: 1.7.0
>Reporter: Ufuk Celebi
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> As part of FLINK-10247, the internal metrics query service has been separated 
> into its own actor system. Before this change, the JobManager (JM) queried 
> TaskManager (TM) metrics via the TM actor. Now, the JM needs to establish a 
> separate connection to the TM metrics query service actor.
> In the context of Kubernetes, this is problematic as the JM will typically 
> *not* be able to resolve the TMs by name, resulting in warnings as follows:
> {code}
> 2018-12-11 08:32:33,962 WARN  akka.remote.ReliableDeliverySupervisor  
>   - Association with remote system 
> [akka.tcp://flink-metrics@flink-task-manager-64b868487c-x9l4b:39183] has 
> failed, address is now gated for [50] ms. Reason: [Association failed with 
> [akka.tcp://flink-metrics@flink-task-manager-64b868487c-x9l4b:39183]] Caused 
> by: [flink-task-manager-64b868487c-x9l4b: Name does not resolve]
> {code}
> In order to expose the TMs by name in Kubernetes, users require a service 
> *for each* TM instance which is not practical.
> This currently results in the web UI not being able to display some basic metrics 
> about number of sent records. You can reproduce this by following the READMEs 
> in {{flink-container/kubernetes}}.
> This worked before, because the JM is typically exposed via a service with a 
> known name and the TMs establish the connection to it which the metrics query 
> service piggybacked on.
> A potential solution to this might be to let the query service connect to the 
> JM similar to how the TMs register.
> I tagged this ticket as an improvement, but in the context of Kubernetes I 
> would consider this to be a bug.





[jira] [Commented] (FLINK-12260) Slot allocation failure by taskmanager registration timeout and race

2019-05-03 Thread Hwanju Kim (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16832313#comment-16832313
 ] 

Hwanju Kim commented on FLINK-12260:


Sure. I will ping you once I have tested the fix.

> Slot allocation failure by taskmanager registration timeout and race
> 
>
> Key: FLINK-12260
> URL: https://issues.apache.org/jira/browse/FLINK-12260
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.6.3
>Reporter: Hwanju Kim
>Priority: Critical
> Attachments: FLINK-12260-repro.diff
>
>
>  
> In 1.6.2., we have seen slot allocation failure keep happening for long time. 
> Having looked at the log, I see the following behavior:
>  # TM sends a registration request R1 to resource manager.
>  # R1 times out after 100ms, which is initial timeout.
>  # TM retries a registration request R2 to resource manager (with timeout 
> 200ms).
>  # R2 arrives first at resource manager and registered, and then TM gets 
> successful response moving onto step 5 below.
>  # On successful registration, R2's instance is put to 
> taskManagerRegistrations
>  # Then R1 arrives at resource manager and realizes the same TM resource ID 
> is already registered, which then unregisters R2's instance ID from 
> taskManagerRegistrations. A new instance ID for R1 is registered to 
> workerRegistration.
>  # R1's response is not handled though since it already timed out (see akka 
> temp actor resolve failure below), hence no registration to 
> taskManagerRegistrations.
>  # TM keeps heartbeating to the resource manager with slot status.
>  # Resource manager ignores this slot status, since taskManagerRegistrations 
> contains R2, not R1, which replaced R2 in workerRegistration at step 6.
>  # Slot request can never be fulfilled, timing out.
> The following is the debug logs for the above steps:
>  
> {code:java}
> JM log:
> 2019-04-11 22:39:40.000,Registering TaskManager with ResourceID 
> 46c8e0d0fcf2c306f11954a1040d5677 
> (akka.ssl.tcp://flink@flink-taskmanager:6122/user/taskmanager_0) at 
> ResourceManager
> 2019-04-11 22:39:40.000,Registering TaskManager 
> 46c8e0d0fcf2c306f11954a1040d5677 under deade132e2c41c52019cdc27977266cf at 
> the SlotManager.
> 2019-04-11 22:39:40.000,Replacing old registration of TaskExecutor 
> 46c8e0d0fcf2c306f11954a1040d5677.
> 2019-04-11 22:39:40.000,Unregister TaskManager 
> deade132e2c41c52019cdc27977266cf from the SlotManager.
> 2019-04-11 22:39:40.000,Registering TaskManager with ResourceID 
> 46c8e0d0fcf2c306f11954a1040d5677 
> (akka.ssl.tcp://flink@flink-taskmanager:6122/user/taskmanager_0) at 
> ResourceManager
> TM log:
> 2019-04-11 22:39:40.000,Registration at ResourceManager attempt 1 
> (timeout=100ms)
> 2019-04-11 22:39:40.000,Registration at ResourceManager 
> (akka.ssl.tcp://flink@flink-jobmanager:6123/user/resourcemanager) attempt 1 
> timed out after 100 ms
> 2019-04-11 22:39:40.000,Registration at ResourceManager attempt 2 
> (timeout=200ms)
> 2019-04-11 22:39:40.000,Successful registration at resource manager 
> akka.ssl.tcp://flink@flink-jobmanager:6123/user/resourcemanager under 
> registration id deade132e2c41c52019cdc27977266cf.
> 2019-04-11 22:39:41.000,resolve of path sequence [/temp/$c] failed{code}
>  
> As RPC calls seem to use akka ask, which creates a temporary source actor, I 
> think multiple RPC calls could've arrived out of order via different actor 
> pairs, and the symptom above seems to be due to that. If so, could it have an 
> attempt count in the call argument to prevent unexpected unregistration? At 
> this point, what I have done is only log analysis, so I could do further 
> analysis, but before that wanted to check if it's a known issue. I also 
> searched with some relevant terms and log pieces, but couldn't find the 
> duplicate. Please deduplicate if any.





[jira] [Commented] (FLINK-12260) Slot allocation failure by taskmanager registration timeout and race

2019-04-30 Thread Hwanju Kim (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16830862#comment-16830862
 ] 

Hwanju Kim commented on FLINK-12260:


Thanks for the clarification. I thought you meant it without introducing any 
additional map, but now it seems clear.

I had tried to think of a conservative approach, as I couldn't 100% rule out the possibility of a sender-side race. Since we may have a potentially simpler solution, I looked at the code again a little further. Initially, what led me to suspect any possibility of a race is this part:

 
{code:java}
val a = PromiseActorRef(ref.provider, timeout, targetName = actorRef, 
message.getClass.getName, sender)
actorRef.tell(message, a)
a.result.future
{code}
This is internalAsk from invokeRpc, and PromiseActorRef internally calls scheduler.scheduleOnce(timeout.duration) for the timer. The tell on actorRef sends a message to the RM through RemoteActorRef and EndpointManager, where the message is passed to the Dispatcher, which enqueues it to the mailbox and executes the mailbox via an executor thread. My impression was that, since tell is asynchronous via the executor service, the timer of PromiseActorRef set up beforehand could fire before the message even leaves the sender. Although that would be possible, the message at least seems to be enqueued to the mailbox for the RM endpoint, and thus the ordering is preserved with respect to the next attempt after the timeout. So, the ordering seems fine. In addition, I was also concerned about the case where two different ask calls might happen to use two different TCP connections, leading to possible out-of-order delivery. Although I did not exercise 100% of the relevant code, it seems to use a single connection associated with the akka endpoint pair, and I confirmed that with a packet capture.

So, based on the code inspection and no successful repro on the sender side, we can currently conclude that the race is likely happening in the task executor connection/handshake on the receiver side (as the repro does). I will test it out with Till's proposal. On our side, once this fix ends up being applied, we can keep an eye on our test apps, which intermittently hit this issue, to see if there is any other race.

 

 

> Slot allocation failure by taskmanager registration timeout and race
> 
>
> Key: FLINK-12260
> URL: https://issues.apache.org/jira/browse/FLINK-12260
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.6.3
>Reporter: Hwanju Kim
>Priority: Critical
> Attachments: FLINK-12260-repro.diff
>
>
>  
> In 1.6.2., we have seen slot allocation failure keep happening for long time. 
> Having looked at the log, I see the following behavior:
>  # TM sends a registration request R1 to resource manager.
>  # R1 times out after 100ms, which is initial timeout.
>  # TM retries a registration request R2 to resource manager (with timeout 
> 200ms).
>  # R2 arrives first at resource manager and registered, and then TM gets 
> successful response moving onto step 5 below.
>  # On successful registration, R2's instance is put to 
> taskManagerRegistrations
>  # Then R1 arrives at resource manager and realizes the same TM resource ID 
> is already registered, which then unregisters R2's instance ID from 
> taskManagerRegistrations. A new instance ID for R1 is registered to 
> workerRegistration.
>  # R1's response is not handled though since it already timed out (see akka 
> temp actor resolve failure below), hence no registration to 
> taskManagerRegistrations.
>  # TM keeps heartbeating to the resource manager with slot status.
>  # Resource manager ignores this slot status, since taskManagerRegistrations 
> contains R2, not R1, which replaced R2 in workerRegistration at step 6.
>  # Slot request can never be fulfilled, timing out.
> The following is the debug logs for the above steps:
>  
> {code:java}
> JM log:
> 2019-04-11 22:39:40.000,Registering TaskManager with ResourceID 
> 46c8e0d0fcf2c306f11954a1040d5677 
> (akka.ssl.tcp://flink@flink-taskmanager:6122/user/taskmanager_0) at 
> ResourceManager
> 2019-04-11 22:39:40.000,Registering TaskManager 
> 46c8e0d0fcf2c306f11954a1040d5677 under deade132e2c41c52019cdc27977266cf at 
> the SlotManager.
> 2019-04-11 22:39:40.000,Replacing old registration of TaskExecutor 
> 46c8e0d0fcf2c306f11954a1040d5677.
> 2019-04-11 22:39:40.000,Unregister TaskManager 
> deade132e2c41c52019cdc27977266cf from the SlotManager.
> 2019-04-11 22:39:40.000,Registering TaskManager with ResourceID 
> 46c8e0d0fcf2c306f11954a1040d5677 
> (akka.ssl.tcp://flink@flink-taskmanager:6122/user/taskmanager_0) at 
> ResourceManager
> TM log:
> 2019-04-11 22:39:40.000,Registration at ResourceManager attempt 1 
> (timeout=100ms)
> 2019-04-11 22:39:40.000,Registration at ResourceManager 
> 

[jira] [Commented] (FLINK-12260) Slot allocation failure by taskmanager registration timeout and race

2019-04-30 Thread Hwanju Kim (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16830100#comment-16830100
 ] 

Hwanju Kim commented on FLINK-12260:


Thanks Till. I had tested with an attempt-count addition to the RPC before your comment, and it worked as expected.

At any rate, I may need some clarification on your idea about reference equality, regarding how it could address this issue. Although the repro uses a delay in the task executor connection, as mentioned, the race may also happen on the sender side. Even for dealing with the race in the task executor connection, I am curious how a reference check alone would work. If a receiver-only approach can work, it would be better than the attempt-count one.
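For reference, the receiver-side bookkeeping of the attempt-count variant I tested looks roughly like this (a hedged sketch with illustrative names, not the actual ResourceManager/TaskExecutor code):

{code:java}
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the attempt-count idea: the TM sends a monotonically
// increasing attempt number with each registration, and the RM ignores any
// registration whose attempt is older than the one it has already accepted.
final class RegistrationAttemptTrackerSketch {

    private final Map<String, Long> latestAttemptPerResourceId = new HashMap<>();

    /** Returns true if this registration should be applied, false if it is stale. */
    synchronized boolean acceptRegistration(String resourceId, long attempt) {
        long latest = latestAttemptPerResourceId.getOrDefault(resourceId, -1L);
        if (attempt <= latest) {
            // A newer attempt (e.g., the retried R2) was already registered;
            // the late-arriving R1 must not unregister it.
            return false;
        }
        latestAttemptPerResourceId.put(resourceId, attempt);
        return true;
    }
}
{code}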

> Slot allocation failure by taskmanager registration timeout and race
> 
>
> Key: FLINK-12260
> URL: https://issues.apache.org/jira/browse/FLINK-12260
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.6.3
>Reporter: Hwanju Kim
>Priority: Critical
> Attachments: FLINK-12260-repro.diff
>
>
>  
> In 1.6.2., we have seen slot allocation failure keep happening for long time. 
> Having looked at the log, I see the following behavior:
>  # TM sends a registration request R1 to resource manager.
>  # R1 times out after 100ms, which is initial timeout.
>  # TM retries a registration request R2 to resource manager (with timeout 
> 200ms).
>  # R2 arrives first at resource manager and registered, and then TM gets 
> successful response moving onto step 5 below.
>  # On successful registration, R2's instance is put to 
> taskManagerRegistrations
>  # Then R1 arrives at resource manager and realizes the same TM resource ID 
> is already registered, which then unregisters R2's instance ID from 
> taskManagerRegistrations. A new instance ID for R1 is registered to 
> workerRegistration.
>  # R1's response is not handled though since it already timed out (see akka 
> temp actor resolve failure below), hence no registration to 
> taskManagerRegistrations.
>  # TM keeps heartbeating to the resource manager with slot status.
>  # Resource manager ignores this slot status, since taskManagerRegistrations 
> contains R2, not R1, which replaced R2 in workerRegistration at step 6.
>  # Slot request can never be fulfilled, timing out.
> The following is the debug logs for the above steps:
>  
> {code:java}
> JM log:
> 2019-04-11 22:39:40.000,Registering TaskManager with ResourceID 
> 46c8e0d0fcf2c306f11954a1040d5677 
> (akka.ssl.tcp://flink@flink-taskmanager:6122/user/taskmanager_0) at 
> ResourceManager
> 2019-04-11 22:39:40.000,Registering TaskManager 
> 46c8e0d0fcf2c306f11954a1040d5677 under deade132e2c41c52019cdc27977266cf at 
> the SlotManager.
> 2019-04-11 22:39:40.000,Replacing old registration of TaskExecutor 
> 46c8e0d0fcf2c306f11954a1040d5677.
> 2019-04-11 22:39:40.000,Unregister TaskManager 
> deade132e2c41c52019cdc27977266cf from the SlotManager.
> 2019-04-11 22:39:40.000,Registering TaskManager with ResourceID 
> 46c8e0d0fcf2c306f11954a1040d5677 
> (akka.ssl.tcp://flink@flink-taskmanager:6122/user/taskmanager_0) at 
> ResourceManager
> TM log:
> 2019-04-11 22:39:40.000,Registration at ResourceManager attempt 1 
> (timeout=100ms)
> 2019-04-11 22:39:40.000,Registration at ResourceManager 
> (akka.ssl.tcp://flink@flink-jobmanager:6123/user/resourcemanager) attempt 1 
> timed out after 100 ms
> 2019-04-11 22:39:40.000,Registration at ResourceManager attempt 2 
> (timeout=200ms)
> 2019-04-11 22:39:40.000,Successful registration at resource manager 
> akka.ssl.tcp://flink@flink-jobmanager:6123/user/resourcemanager under 
> registration id deade132e2c41c52019cdc27977266cf.
> 2019-04-11 22:39:41.000,resolve of path sequence [/temp/$c] failed{code}
>  
> As RPC calls seem to use akka ask, which creates a temporary source actor, I 
> think multiple RPC calls could've arrived out of order via different actor 
> pairs, and the symptom above seems to be due to that. If so, could it have an 
> attempt count in the call argument to prevent unexpected unregistration? At 
> this point, what I have done is only log analysis, so I could do further 
> analysis, but before that wanted to check if it's a known issue. I also 
> searched with some relevant terms and log pieces, but couldn't find the 
> duplicate. Please deduplicate if any.





[jira] [Comment Edited] (FLINK-12260) Slot allocation failure by taskmanager registration timeout and race

2019-04-26 Thread Hwanju Kim (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16827369#comment-16827369
 ] 

Hwanju Kim edited comment on FLINK-12260 at 4/26/19 11:30 PM:
--

I got a repro, but in a somewhat tricky way, since it is definitely a rarely occurring race. But as mentioned, once it falls into this state, it cannot get out of it (assuming that we are not using an active resource manager).

In the repro, I injected an artificial delay into the RM->TM connection on task executor registration, which can time out the first registration request, resulting in a 2nd try. Since the RM->TM connection is carried out in a separate thread via an akka ask call, delaying here does not block the resource manager endpoint's mailbox processing, so any further request can be processed during the delay. I initially added the delay in the handling of registerTaskExecutorInternal, but as it uses the RPC's executor, the delay blocks all further retries, hence not reproducing the race. With the delay in the TM connection, the 2nd registration attempt can overtake the 1st one, going ahead with the TM registration, and then the resumed 1st request unregisters that TM registration. Although I mimicked the race on the RM side, I think the sender side can also have a potential delay (e.g., from the network) during the tell part of the akka ask, causing a timeout and leading to the 2nd try racing the 1st one. The latter was trickier to mimic, so I tried the first approach.

The following is the JM/TM logs.

JM log:
{code:java}
2019-04-26 17:14:44,921 DEBUG org.apache.flink.runtime.rpc.akka.AkkaRpcService 
- Try to connect to remote RPC endpoint with address 
akka.ssl.tcp://flink@192.168.69.15:6122/user/taskmanager_0. Returning a 
org.apache.flink.runtime.taskexecutor.TaskExecutorGateway gateway. 
2019-04-26 17:14:44,924 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService - 
[REPRO] thread 19 attempt 1
2019-04-26 17:14:44,996 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService  
            - [REPRO] thread 22 sleep...
2019-04-26 17:14:45,021 DEBUG org.apache.flink.runtime.rpc.akka.AkkaRpcService 
- Try to connect to remote RPC endpoint with address 
akka.ssl.tcp://flink@192.168.69.15:6122/user/taskmanager_0. Returning a 
org.apache.flink.runtime.taskexecutor.TaskExecutorGateway gateway.
2019-04-26 17:14:45,022 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService - 
[REPRO] thread 19 attempt 2
2019-04-26 17:14:45,038 INFO  
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  - 
Registering TaskManager with ResourceID d0a410ee9060e62e0c7ef9e46f6418da 
(akka.ssl.tcp://flink@192.168.69.15:6122/user/taskmanager_0) at ResourceManager 
under instance id fa4408b5412bb8c18a6a7e58fdc8ff18
2019-04-26 17:14:45,093 DEBUG 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Registering 
TaskManager d0a410ee9060e62e0c7ef9e46f6418da under 
fa4408b5412bb8c18a6a7e58fdc8ff18 at the SlotManager.
2019-04-26 17:14:45,997 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService  
            - [REPRO] thread 22 done
2019-04-26 17:14:45,998 DEBUG 
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  - Replacing 
old registration of TaskExecutor d0a410ee9060e62e0c7ef9e46f6418da.
2019-04-26 17:14:45,998 DEBUG 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Unregister 
TaskManager fa4408b5412bb8c18a6a7e58fdc8ff18 from the SlotManager.
2019-04-26 17:14:46,000 INFO  
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  - 
Registering TaskManager with ResourceID d0a410ee9060e62e0c7ef9e46f6418da 
(akka.ssl.tcp://flink@192.168.69.15:6122/user/taskmanager_0) at ResourceManager 
under instance id ebad00b418637d2774b8f131d49cc79e
2019-04-26 17:14:46,000 DEBUG 
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  - The 
target with resource ID d0a410ee9060e62e0c7ef9e46f6418da is already been 
monitored.
2019-04-26 17:14:47,387 DEBUG 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Received 
slot report from instance ebad00b418637d2774b8f131d49cc79e.
2019-04-26 17:14:47,387 DEBUG 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Received 
slot report for unknown task manager with instance id 
ebad00b418637d2774b8f131d49cc79e. Ignoring this report.
2019-04-26 17:19:48,045 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job  
(ed0fbfff272391d1f2a98de45fda6453) switched from state RUNNING to FAILING.
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: 
Could not allocate all requires slots within timeout of 30 ms. Slots 
required: 1, slots allocated: 0
...
{code}
TM log:

 
{code:java}
2019-04-26 17:14:44,897 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor            - Resolved 
ResourceManager address, beginning registration
2019-04-26 17:14:44,897 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor        

[jira] [Commented] (FLINK-12260) Slot allocation failure by taskmanager registration timeout and race

2019-04-26 Thread Hwanju Kim (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16827369#comment-16827369
 ] 

Hwanju Kim commented on FLINK-12260:


I got a repro, but in a somewhat tricky way, since it is definitely a rarely happening 
race. As mentioned, though, once it falls into this state, it can't get out of it 
(assuming that we're not using an active resource manager).

In the repro, I injected an artificial delay into the RM->TM connection on task executor 
registration, which can time out the first registration request and result in a 2nd 
try. Since the RM->TM connection is carried out in a separate thread via an akka ask 
call, delaying there can't block the resource manager endpoint's mailbox 
processing, so any further request can be processed during the delay. I 
initially added the delay in handling registerTaskExecutorInternal, but as that 
uses the RPC's executor, the delay blocked all the further retries as well, hence not 
reproducing the race. With the delay in the TM connection, the 2nd registration 
attempt can overtake the 1st one and go ahead with TM registration, and then the 
resumed 1st request unregisters that TM registration. Although I mimicked the 
race on the RM side, I think the sender side can also see a delay (e.g. from the 
network) during the tell part of the akka ask, causing a timeout and leading to the 
2nd try racing with the 1st one. The latter was trickier to mimic, so I took the first 
approach.
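
To make the interleaving concrete, here is a small self-contained toy sketch 
(hypothetical names, not Flink code and not the attached FLINK-12260-repro.diff) of the 
effect of the injected delay: the delayed 1st attempt lands after the 2nd and replaces 
the registration that the TM actually knows about.
{code:java}
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.*;

// Toy model of the RM-side bookkeeping race; names do not match Flink internals.
public class RegistrationRaceSketch {

    // resourceId -> instanceId known to the "RM".
    private final Map<String, String> registrations = new ConcurrentHashMap<>();

    /** One registration attempt; the artificial delay stands in for the slow RM->TM connection. */
    private String register(String resourceId, long delayMillis) throws InterruptedException {
        Thread.sleep(delayMillis);                      // injected delay (the repro trick)
        String instanceId = UUID.randomUUID().toString();
        String old = registrations.put(resourceId, instanceId);
        if (old != null) {
            System.out.println("Replacing old registration " + old + " with " + instanceId);
        }
        return instanceId;
    }

    public static void main(String[] args) throws Exception {
        RegistrationRaceSketch rm = new RegistrationRaceSketch();
        ExecutorService pool = Executors.newFixedThreadPool(2);
        String resourceId = "tm-1";

        // Attempt 1 is delayed (and times out on the TM side); attempt 2 goes through quickly.
        Future<String> attempt1 = pool.submit(() -> rm.register(resourceId, 1000));
        Future<String> attempt2 = pool.submit(() -> rm.register(resourceId, 0));

        // The TM only ever learns attempt 2's instance id (attempt 1's response timed out).
        String knownByTm = attempt2.get();
        attempt1.get();                                 // stale attempt lands later and replaces it

        // Heartbeat: the instance id the TM reports no longer matches what the RM holds.
        System.out.println("TM heartbeats with " + knownByTm
                + ", RM now holds " + rm.registrations.get(resourceId)
                + " -> slot report ignored");
        pool.shutdown();
    }
}
{code}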

The following are the JM/TM logs.

JM log:
{code:java}
2019-04-26 17:14:44,921 DEBUG org.apache.flink.runtime.rpc.akka.AkkaRpcService 
- Try to connect to remote RPC endpoint with address 
akka.ssl.tcp://flink@192.168.69.15:6122/user/taskmanager_0. Returning a 
org.apache.flink.runtime.taskexecutor.TaskExecutorGateway gateway. 
2019-04-26 17:14:44,924 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService - 
[REPRO] thread 19 attempt 1
2019-04-26 17:14:44,996 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService  
            - [REPRO] thread 22 sleep...
2019-04-26 17:14:45,021 DEBUG org.apache.flink.runtime.rpc.akka.AkkaRpcService 
- Try to connect to remote RPC endpoint with address 
akka.ssl.tcp://flink@192.168.69.15:6122/user/taskmanager_0. Returning a 
org.apache.flink.runtime.taskexecutor.TaskExecutorGateway gateway.
2019-04-26 17:14:45,022 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService - 
[REPRO] thread 19 attempt 2
2019-04-26 17:14:45,038 INFO  
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  - 
Registering TaskManager with ResourceID d0a410ee9060e62e0c7ef9e46f6418da 
(akka.ssl.tcp://flink@192.168.69.15:6122/user/taskmanager_0) at ResourceManager 
under instance id fa4408b5412bb8c18a6a7e58fdc8ff18
2019-04-26 17:14:45,093 DEBUG 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Registering 
TaskManager d0a410ee9060e62e0c7ef9e46f6418da under 
fa4408b5412bb8c18a6a7e58fdc8ff18 at the SlotManager.
2019-04-26 17:14:45,997 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService  
            - [REPRO] thread 22 done
2019-04-26 17:14:45,998 DEBUG 
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  - Replacing 
old registration of TaskExecutor d0a410ee9060e62e0c7ef9e46f6418da.
2019-04-26 17:14:45,998 DEBUG 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Unregister 
TaskManager fa4408b5412bb8c18a6a7e58fdc8ff18 from the SlotManager.
2019-04-26 17:14:46,000 INFO  
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  - 
Registering TaskManager with ResourceID d0a410ee9060e62e0c7ef9e46f6418da 
(akka.ssl.tcp://flink@192.168.69.15:6122/user/taskmanager_0) at ResourceManager 
under instance id ebad00b418637d2774b8f131d49cc79e
2019-04-26 17:14:46,000 DEBUG 
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  - The 
target with resource ID d0a410ee9060e62e0c7ef9e46f6418da is already been 
monitored.
2019-04-26 17:14:47,387 DEBUG 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Received 
slot report from instance ebad00b418637d2774b8f131d49cc79e.
2019-04-26 17:14:47,387 DEBUG 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Received 
slot report for unknown task manager with instance id 
ebad00b418637d2774b8f131d49cc79e. Ignoring this report.
2019-04-26 17:19:48,045 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job  
(ed0fbfff272391d1f2a98de45fda6453) switched from state RUNNING to FAILING.
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: 
Could not allocate all requires slots within timeout of 30 ms. Slots 
required: 1, slots allocated: 0
...
{code}
TM log:

 
{code:java}
2019-04-26 17:14:44,897 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor            - Resolved 
ResourceManager address, beginning registration
2019-04-26 17:14:44,897 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor            - Registration at 
ResourceManager attempt 1 

[jira] [Updated] (FLINK-12260) Slot allocation failure by taskmanager registration timeout and race

2019-04-26 Thread Hwanju Kim (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hwanju Kim updated FLINK-12260:
---
Attachment: FLINK-12260-repro.diff

> Slot allocation failure by taskmanager registration timeout and race
> 
>
> Key: FLINK-12260
> URL: https://issues.apache.org/jira/browse/FLINK-12260
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.6.3
>Reporter: Hwanju Kim
>Priority: Critical
> Attachments: FLINK-12260-repro.diff
>
>
>  
> In 1.6.2., we have seen slot allocation failure keep happening for long time. 
> Having looked at the log, I see the following behavior:
>  # TM sends a registration request R1 to resource manager.
>  # R1 times out after 100ms, which is initial timeout.
>  # TM retries a registration request R2 to resource manager (with timeout 
> 200ms).
>  # R2 arrives first at resource manager and registered, and then TM gets 
> successful response moving onto step 5 below.
>  # On successful registration, R2's instance is put to 
> taskManagerRegistrations
>  # Then R1 arrives at resource manager and realizes the same TM resource ID 
> is already registered, which then unregisters R2's instance ID from 
> taskManagerRegistrations. A new instance ID for R1 is registered to 
> workerRegistration.
>  # R1's response is not handled though since it already timed out (see akka 
> temp actor resolve failure below), hence no registration to 
> taskManagerRegistrations.
>  # TM keeps heartbeating to the resource manager with slot status.
>  # Resource manager ignores this slot status, since taskManagerRegistrations 
> contains R2, not R1, which replaced R2 in workerRegistration at step 6.
>  # Slot request can never be fulfilled, timing out.
> The following is the debug logs for the above steps:
>  
> {code:java}
> JM log:
> 2019-04-11 22:39:40.000,Registering TaskManager with ResourceID 
> 46c8e0d0fcf2c306f11954a1040d5677 
> (akka.ssl.tcp://flink@flink-taskmanager:6122/user/taskmanager_0) at 
> ResourceManager
> 2019-04-11 22:39:40.000,Registering TaskManager 
> 46c8e0d0fcf2c306f11954a1040d5677 under deade132e2c41c52019cdc27977266cf at 
> the SlotManager.
> 2019-04-11 22:39:40.000,Replacing old registration of TaskExecutor 
> 46c8e0d0fcf2c306f11954a1040d5677.
> 2019-04-11 22:39:40.000,Unregister TaskManager 
> deade132e2c41c52019cdc27977266cf from the SlotManager.
> 2019-04-11 22:39:40.000,Registering TaskManager with ResourceID 
> 46c8e0d0fcf2c306f11954a1040d5677 
> (akka.ssl.tcp://flink@flink-taskmanager:6122/user/taskmanager_0) at 
> ResourceManager
> TM log:
> 2019-04-11 22:39:40.000,Registration at ResourceManager attempt 1 
> (timeout=100ms)
> 2019-04-11 22:39:40.000,Registration at ResourceManager 
> (akka.ssl.tcp://flink@flink-jobmanager:6123/user/resourcemanager) attempt 1 
> timed out after 100 ms
> 2019-04-11 22:39:40.000,Registration at ResourceManager attempt 2 
> (timeout=200ms)
> 2019-04-11 22:39:40.000,Successful registration at resource manager 
> akka.ssl.tcp://flink@flink-jobmanager:6123/user/resourcemanager under 
> registration id deade132e2c41c52019cdc27977266cf.
> 2019-04-11 22:39:41.000,resolve of path sequence [/temp/$c] failed{code}
>  
> As RPC calls seem to use akka ask, which creates temporary source actor, I 
> think multiple RPC calls could've arrived out or order by different actor 
> pairs and the symptom above seems to be due to that. If so, it could have 
> attempt account in the call argument to prevent unexpected unregistration? At 
> this point, what I have done is only log analysis, so I could do further 
> analysis, but before that wanted to check if it's a known issue. I also 
> searched with some relevant terms and log pieces, but couldn't find the 
> duplicate. Please deduplicate if any.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12260) Slot allocation failure by taskmanager registration timeout and race

2019-04-23 Thread Hwanju Kim (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16824340#comment-16824340
 ] 

Hwanju Kim commented on FLINK-12260:


Thanks Till for the comment. I haven't attempted to reproduce the issue 
locally yet; I will try reproducing it with better logging and will 
update this thread once I have more info.

Regarding the odd timestamps in the logs, this is an issue in our logging pipeline that 
we plan to fix: it loses millisecond granularity when shipping Flink logs to a long-term 
log archive. Since this was a postmortem analysis, I couldn't gather the live Flink 
logs, which is why the milliseconds in the logs are zeroed out.

> Slot allocation failure by taskmanager registration timeout and race
> 
>
> Key: FLINK-12260
> URL: https://issues.apache.org/jira/browse/FLINK-12260
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.6.3
>Reporter: Hwanju Kim
>Priority: Major
>
>  
> In 1.6.2., we have seen slot allocation failure keep happening for long time. 
> Having looked at the log, I see the following behavior:
>  # TM sends a registration request R1 to resource manager.
>  # R1 times out after 100ms, which is initial timeout.
>  # TM retries a registration request R2 to resource manager (with timeout 
> 200ms).
>  # R2 arrives first at resource manager and registered, and then TM gets 
> successful response moving onto step 5 below.
>  # On successful registration, R2's instance is put to 
> taskManagerRegistrations
>  # Then R1 arrives at resource manager and realizes the same TM resource ID 
> is already registered, which then unregisters R2's instance ID from 
> taskManagerRegistrations. A new instance ID for R1 is registered to 
> workerRegistration.
>  # R1's response is not handled though since it already timed out (see akka 
> temp actor resolve failure below), hence no registration to 
> taskManagerRegistrations.
>  # TM keeps heartbeating to the resource manager with slot status.
>  # Resource manager ignores this slot status, since taskManagerRegistrations 
> contains R2, not R1, which replaced R2 in workerRegistration at step 6.
>  # Slot request can never be fulfilled, timing out.
> The following is the debug logs for the above steps:
>  
> {code:java}
> JM log:
> 2019-04-11 22:39:40.000,Registering TaskManager with ResourceID 
> 46c8e0d0fcf2c306f11954a1040d5677 
> (akka.ssl.tcp://flink@flink-taskmanager:6122/user/taskmanager_0) at 
> ResourceManager
> 2019-04-11 22:39:40.000,Registering TaskManager 
> 46c8e0d0fcf2c306f11954a1040d5677 under deade132e2c41c52019cdc27977266cf at 
> the SlotManager.
> 2019-04-11 22:39:40.000,Replacing old registration of TaskExecutor 
> 46c8e0d0fcf2c306f11954a1040d5677.
> 2019-04-11 22:39:40.000,Unregister TaskManager 
> deade132e2c41c52019cdc27977266cf from the SlotManager.
> 2019-04-11 22:39:40.000,Registering TaskManager with ResourceID 
> 46c8e0d0fcf2c306f11954a1040d5677 
> (akka.ssl.tcp://flink@flink-taskmanager:6122/user/taskmanager_0) at 
> ResourceManager
> TM log:
> 2019-04-11 22:39:40.000,Registration at ResourceManager attempt 1 
> (timeout=100ms)
> 2019-04-11 22:39:40.000,Registration at ResourceManager 
> (akka.ssl.tcp://flink@flink-jobmanager:6123/user/resourcemanager) attempt 1 
> timed out after 100 ms
> 2019-04-11 22:39:40.000,Registration at ResourceManager attempt 2 
> (timeout=200ms)
> 2019-04-11 22:39:40.000,Successful registration at resource manager 
> akka.ssl.tcp://flink@flink-jobmanager:6123/user/resourcemanager under 
> registration id deade132e2c41c52019cdc27977266cf.
> 2019-04-11 22:39:41.000,resolve of path sequence [/temp/$c] failed{code}
>  
> As RPC calls seem to use akka ask, which creates temporary source actor, I 
> think multiple RPC calls could've arrived out or order by different actor 
> pairs and the symptom above seems to be due to that. If so, it could have 
> attempt account in the call argument to prevent unexpected unregistration? At 
> this point, what I have done is only log analysis, so I could do further 
> analysis, but before that wanted to check if it's a known issue. I also 
> searched with some relevant terms and log pieces, but couldn't find the 
> duplicate. Please deduplicate if any.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-11127) Make metrics query service establish connection to JobManager

2019-04-22 Thread Hwanju Kim (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16821683#comment-16821683
 ] 

Hwanju Kim edited comment on FLINK-11127 at 4/22/19 7:06 AM:
-

We experienced a similar issue to this, but more seriously with the 
communication between the resource manager and the task manager. In a normal situation 
it works fine, since only TMs actively connect to the JM, whose name is resolvable 
(i.e., there is no outbound association from the JM actor, only inbound). However, 
if a TM hits a fatal error, such as a task not responding to a cancel request, 
it does a graceful cleanup, part of which is closing its akka system by sending a 
poison pill to the JM, and then shuts itself down. Once the JM receives this poison 
pill, its actor (as part of the fail-over restart) starts an outbound association 
to the destination TM host name that was provided during the initial handshake. This 
outbound association cannot succeed if the TM is not reachable via host 
name, as in a typical Kubernetes setting. From this point on, the TM can talk to the JM 
for TM registration, but the JM can't respond to this registration request, since 
the outbound association can never be made. This failure of outbound association 
from the JM's akka endpoint leaves task scheduling stuck indefinitely due to the 
failure of TM registration, with this error:
{code:java}
2019-02-28 21:58:15,867 DEBUG 
org.apache.flink.runtime.taskexecutor.TaskExecutor            - Could not 
resolve ResourceManager address 
akka.ssl.tcp://flink@flink-jobmanager:6123/user/resourcemanager, retrying in 
1 ms.
{code}
In response to the constant failure above, the JM also fails slot allocation 
indefinitely:
{code:java}
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: 
Could not allocate all requires slots within timeout of 30 ms
{code}
We know there are multiple workarounds suggested in this thread, like a 
stateful set, an init container, and passing a JVM argument, but we did not want 
to add artifacts and complexity to our production deployment just to fix this 
issue (I tried the last one, taskmanager.host, as it's the least invasive to the 
deployment, but it did not work for our case). Therefore, we went ahead and added a 
"_taskmanager.rpc.use-host-address_" configuration to Flink. It is false by 
default, but when set to true, the TM, only in the RPC setting, simply uses 
_taskManagerAddress.getHostAddress()_ instead of 
_taskManagerAddress.getHostName()_ (the actual patch is a few lines, as you could 
expect). It was minimal enough for us and has been solving the problem so 
far. We went this way because it could be a helpful option for an 
environment like the usual Kubernetes setting without a TM stateful set or other 
tweaks. -I am not sure if you guys are interested in this approach, but sharing it 
for thought or interest.-

_*Since I wrote this, I found that FLINK-11632 had done what I described and 
it's been applied to 1.7 onward.*_ 

 


was (Author: hwanju):
We experienced a similar issue to this, but more seriously with the 
communication between resource manager and task manager. In a normal situation, 
it works fine since only TMs actively connect to JM, whose name is resolvable 
(i.e., there's no outbound association from JM actor, only inbound). However, 
if a TM has a fatal error such as a task not responding to canceling request, 
it does graceful cleanup, a part of which is closing akka system sending a 
poison pill to JM, and then shutdown itself. Once this poison pill is gotten in 
JM, (as part of fail-over restart) its actor starts doing outbound association 
to destination TM host name that was provided during initial handshake. This 
outbound association here can't be succeeded if TM is not accessible via host 
name like in general Kubernetes setting. From this point on, TM can talk to JM 
for TM registration, but JM can't respond to this registration request, since 
outbound association can never be made. This failure of outbound association 
from JM's akka endpoint causes indefinite stuck in task scheduling due to the 
failure of TM registration with this error:
{code:java}
2019-02-28 21:58:15,867 DEBUG 
org.apache.flink.runtime.taskexecutor.TaskExecutor            - Could not 
resolve ResourceManager address 
akka.ssl.tcp://flink@flink-jobmanager:6123/user/resourcemanager, retrying in 
1 ms.
{code}
In response to constant failure like above, JM has slot allocation failure 
indefinitely as well:
{code:java}
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: 
Could not allocate all requires slots within timeout of 30 ms
{code}
We know there's multiple workarounds suggested here in this thread like 
stateful set, init container, and the passing JVM argument, but we did not want 
to add artifacts and complexity to deployment in production just to fix this 
issue (I tried the last taskmanager.host one 

[jira] [Commented] (FLINK-11127) Make metrics query service establish connection to JobManager

2019-04-19 Thread Hwanju Kim (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16821683#comment-16821683
 ] 

Hwanju Kim commented on FLINK-11127:


We experienced a similar issue to this, but more seriously with the 
communication between the resource manager and the task manager. In a normal situation 
it works fine, since only TMs actively connect to the JM, whose name is resolvable 
(i.e., there is no outbound association from the JM actor, only inbound). However, 
if a TM hits a fatal error, such as a task not responding to a cancel request, 
it does a graceful cleanup, part of which is closing its akka system by sending a 
poison pill to the JM, and then shuts itself down. Once the JM receives this poison 
pill, its actor (as part of the fail-over restart) starts an outbound association 
to the destination TM host name that was provided during the initial handshake. This 
outbound association cannot succeed if the TM is not reachable via host 
name, as in a typical Kubernetes setting. From this point on, the TM can talk to the JM 
for TM registration, but the JM can't respond to this registration request, since 
the outbound association can never be made. This failure of outbound association 
from the JM's akka endpoint leaves task scheduling stuck indefinitely due to the 
failure of TM registration, with this error:
{code:java}
2019-02-28 21:58:15,867 DEBUG 
org.apache.flink.runtime.taskexecutor.TaskExecutor            - Could not 
resolve ResourceManager address 
akka.ssl.tcp://flink@flink-jobmanager:6123/user/resourcemanager, retrying in 
1 ms.
{code}
In response to the constant failure above, the JM also fails slot allocation 
indefinitely:
{code:java}
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: 
Could not allocate all requires slots within timeout of 30 ms
{code}
We know there are multiple workarounds suggested in this thread, like a 
stateful set, an init container, and passing a JVM argument, but we did not want 
to add artifacts and complexity to our production deployment just to fix this 
issue (I tried the last one, taskmanager.host, as it's the least invasive to the 
deployment, but it did not work for our case). Therefore, we went ahead and added a 
"_taskmanager.rpc.use-host-address_" configuration to Flink. It is false by 
default, but when set to true, the TM, only in the RPC setting, simply uses 
_taskManagerAddress.getHostAddress()_ instead of 
_taskManagerAddress.getHostName()_ (the actual patch is a few lines, as you could 
expect). It was minimal enough for us and has been solving the problem so 
far. We went this way because it could be a helpful option for an 
environment like the usual Kubernetes setting without a TM stateful set or other 
tweaks. I am not sure if you guys are interested in this approach, but sharing it 
for thought or interest.
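
For illustration, the essence of the option is just which string the TM advertises. A 
minimal sketch follows, assuming taskManagerAddress is an InetAddress-like value and 
reusing the config key as a plain system property only for this example; it is not the 
actual patch or Flink wiring.
{code:java}
import java.net.InetAddress;

// Minimal sketch of the host-name vs. IP-address choice described above.
public class RpcAddressSketch {
    public static void main(String[] args) throws Exception {
        InetAddress taskManagerAddress = InetAddress.getLocalHost();
        // Hypothetical toggle, read here as a system property just for the sketch.
        boolean useHostAddress = Boolean.getBoolean("taskmanager.rpc.use-host-address");

        // The host name must be resolvable by the JM (often not the case for bare pods),
        // whereas the raw IP address is routable within the cluster network without DNS.
        String advertised = useHostAddress
                ? taskManagerAddress.getHostAddress()   // e.g. 192.168.69.15
                : taskManagerAddress.getHostName();     // e.g. flink-task-manager-64b868487c-x9l4b

        System.out.println("TM advertises RPC address: " + advertised);
    }
}
{code}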

 

> Make metrics query service establish connection to JobManager
> -
>
> Key: FLINK-11127
> URL: https://issues.apache.org/jira/browse/FLINK-11127
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / Kubernetes, Runtime / Coordination, Runtime 
> / Metrics
>Affects Versions: 1.7.0
>Reporter: Ufuk Celebi
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> As part of FLINK-10247, the internal metrics query service has been separated 
> into its own actor system. Before this change, the JobManager (JM) queried 
> TaskManager (TM) metrics via the TM actor. Now, the JM needs to establish a 
> separate connection to the TM metrics query service actor.
> In the context of Kubernetes, this is problematic as the JM will typically 
> *not* be able to resolve the TMs by name, resulting in warnings as follows:
> {code}
> 2018-12-11 08:32:33,962 WARN  akka.remote.ReliableDeliverySupervisor  
>   - Association with remote system 
> [akka.tcp://flink-metrics@flink-task-manager-64b868487c-x9l4b:39183] has 
> failed, address is now gated for [50] ms. Reason: [Association failed with 
> [akka.tcp://flink-metrics@flink-task-manager-64b868487c-x9l4b:39183]] Caused 
> by: [flink-task-manager-64b868487c-x9l4b: Name does not resolve]
> {code}
> In order to expose the TMs by name in Kubernetes, users require a service 
> *for each* TM instance which is not practical.
> This currently results in the web UI not being to display some basic metrics 
> about number of sent records. You can reproduce this by following the READMEs 
> in {{flink-container/kubernetes}}.
> This worked before, because the JM is typically exposed via a service with a 
> known name and the TMs establish the connection to it which the metrics query 
> service piggybacked on.
> A potential solution to this might be to let the query service connect to the 
> JM similar to how the 

[jira] [Created] (FLINK-12260) Slot allocation failure by taskmanager registration timeout and race

2019-04-18 Thread Hwanju Kim (JIRA)
Hwanju Kim created FLINK-12260:
--

 Summary: Slot allocation failure by taskmanager registration 
timeout and race
 Key: FLINK-12260
 URL: https://issues.apache.org/jira/browse/FLINK-12260
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.6.3
Reporter: Hwanju Kim


 

In 1.6.2, we have seen slot allocation failure keep happening for a long time. 
Having looked at the log, I see the following behavior:
 # TM sends a registration request R1 to the resource manager.
 # R1 times out after 100ms, which is the initial timeout.
 # TM retries with a registration request R2 to the resource manager (with timeout 200ms).
 # R2 arrives first at the resource manager and is registered, and then TM gets the successful response, moving on to step 5 below.
 # On successful registration, R2's instance is put into taskManagerRegistrations.
 # Then R1 arrives at the resource manager, which realizes the same TM resource ID is already registered and therefore unregisters R2's instance ID from taskManagerRegistrations. A new instance ID for R1 is registered in workerRegistration.
 # R1's response is not handled, though, since it already timed out (see the akka temp actor resolve failure below), hence no registration in taskManagerRegistrations.
 # TM keeps heartbeating to the resource manager with its slot status.
 # The resource manager ignores this slot status, since taskManagerRegistrations contains R2, not R1, which replaced R2 in workerRegistration at step 6.
 # The slot request can never be fulfilled, timing out.

The following are the debug logs for the above steps:
 
{code:java}
JM log:

2019-04-11 22:39:40.000,Registering TaskManager with ResourceID 
46c8e0d0fcf2c306f11954a1040d5677 
(akka.ssl.tcp://flink@flink-taskmanager:6122/user/taskmanager_0) at 
ResourceManager

2019-04-11 22:39:40.000,Registering TaskManager 
46c8e0d0fcf2c306f11954a1040d5677 under deade132e2c41c52019cdc27977266cf at the 
SlotManager.

2019-04-11 22:39:40.000,Replacing old registration of TaskExecutor 
46c8e0d0fcf2c306f11954a1040d5677.

2019-04-11 22:39:40.000,Unregister TaskManager deade132e2c41c52019cdc27977266cf 
from the SlotManager.

2019-04-11 22:39:40.000,Registering TaskManager with ResourceID 
46c8e0d0fcf2c306f11954a1040d5677 
(akka.ssl.tcp://flink@flink-taskmanager:6122/user/taskmanager_0) at 
ResourceManager

TM log:

2019-04-11 22:39:40.000,Registration at ResourceManager attempt 1 
(timeout=100ms)

2019-04-11 22:39:40.000,Registration at ResourceManager 
(akka.ssl.tcp://flink@flink-jobmanager:6123/user/resourcemanager) attempt 1 
timed out after 100 ms

2019-04-11 22:39:40.000,Registration at ResourceManager attempt 2 
(timeout=200ms)

2019-04-11 22:39:40.000,Successful registration at resource manager 
akka.ssl.tcp://flink@flink-jobmanager:6123/user/resourcemanager under 
registration id deade132e2c41c52019cdc27977266cf.

2019-04-11 22:39:41.000,resolve of path sequence [/temp/$c] failed{code}
 

As RPC calls seem to use akka ask, which creates a temporary source actor, I 
think multiple RPC calls could have arrived out of order via different actor pairs, 
and the symptom above seems to be due to that. If so, could the call carry an attempt 
count in its arguments to prevent the unexpected unregistration? At this 
point what I have done is only log analysis, so I could do further analysis, 
but before that I wanted to check whether this is a known issue. I also searched with 
some relevant terms and log pieces, but couldn't find a duplicate. Please 
deduplicate if there is one.
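
To make the attempt-count idea above concrete, here is a rough sketch of how the RM 
side could drop a stale registration attempt instead of letting it unregister a newer 
one. All names are hypothetical; this is not Flink code and not a proposed patch.
{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: ignore registration requests whose attempt number is older
// than the one already recorded for the same resource ID.
public class AttemptAwareRegistry {

    private static final class Registration {
        final long attempt;
        final String instanceId;
        Registration(long attempt, String instanceId) {
            this.attempt = attempt;
            this.instanceId = instanceId;
        }
    }

    private final Map<String, Registration> registrations = new ConcurrentHashMap<>();

    /** Returns the registered instance id, or null if the request was stale and dropped. */
    public synchronized String register(String resourceId, long attempt, String instanceId) {
        Registration existing = registrations.get(resourceId);
        if (existing != null && existing.attempt >= attempt) {
            // A delayed earlier attempt (e.g. R1 arriving after R2) is simply ignored,
            // instead of unregistering the newer, already-acknowledged registration.
            return null;
        }
        registrations.put(resourceId, new Registration(attempt, instanceId));
        return instanceId;
    }
}
{code}
With something like this, the late-arriving R1 in step 6 above would be dropped rather 
than replacing R2's registration.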



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12245) Transient slot allocation failure on job recovery

2019-04-18 Thread Hwanju Kim (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16821407#comment-16821407
 ] 

Hwanju Kim commented on FLINK-12245:


Thanks Till! So FLINK-9635 indeed extracted the relevant subset for this issue 
from the scheduler refactoring in the group-aware scheduler work. That's nice and 
exactly what I wanted. I just cherry-picked the patch onto 1.6.2 and the retest worked 
fine. 

> Transient slot allocation failure on job recovery
> -
>
> Key: FLINK-12245
> URL: https://issues.apache.org/jira/browse/FLINK-12245
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.6.3
> Environment: Flink 1.6.2 with Kubernetes
>Reporter: Hwanju Kim
>Priority: Major
>
> In 1.6.2, We have experienced that slot allocation is transiently failed on 
> job recovery especially when task manager (TM) is unavailable leading to 
> heartbeat failure. By transient, it means it fails once with slot allocation 
> timeout (by default 5min) and then next recovering restart is succeeded.
>  
> I found that each _Execution_ remembers previous allocations and tries to 
> prefer the last previous allocation for the sake of local state recovery from 
> the resolved slot candidates. If the previous allocation belongs to 
> unavailable TM, the candidates do not have this previous allocation, thereby 
> forcing slot provider to request a new slot to resource manager, which then 
> finds a new TM and its available slots. So far it is expected and fine, but 
> any next execution that also belonged to the unavailable TM and has the first 
> task as predecessor fails with the unavailable previous allocation as well. 
> Here it also requests another new slot since it never finds the gone previous 
> allocation from candidates. However, this behavior may make more slot 
> requests than available. For example, if two pipelined tasks shared one slot 
> in one TM, which is then crashed being replaced with a new TM, two new slot 
> requests are generated from the tasks. Since two slot requests cannot be 
> fulfilled by one slot TM, it hits slot allocation timeout and restarts the 
> job. 
>  
> {code:java}
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: 
> Could not allocate all requires slots within timeout of 30 ms. Slots 
> required: 2, slots allocated: 1 {code}
>  
> At the next round of recovery, since the second execution failed to allocate 
> a new slot, its last previous allocation is _null_, then it falls back to 
> locality-based allocation strategy, which can find the slot allocated for the 
> first task, and thus succeeded. Although it is eventually succeeded, it 
> increases downtime by slot allocation timeout.
>  
> The reason of this behavior is 
> _PreviousAllocationSchedulingStrategy.findMatchWithLocality()_ immediately 
> returns _null_ if previous allocation is not empty and is not contained in 
> candidate list. I thought that if previous allocation is not in the 
> candidates, it can fall back to 
> _LocationPreferenceSchedulingStrategy.findMatchWithLocality()_ rather than 
> returning _null_. By doing so, it can avoid requesting more than available. 
> Requesting more slots could be fine in an environment where resource managers 
> can reactively spawn up more TMs (like Yarn/Mesos) although it could spawn 
> more than needed, but StandaloneResourceManager with statically provisioned 
> resource cannot help but failing to allocate requested slots.
>  
> Having looked at the mainline branch and 1.8.0, although I have not attempted 
> to reproduce this issue with mainline, the related code is changed to what I 
> have expected (falling back to locality-based strategy if previous allocation 
> is not in candidates): 
> PreviousAllocationSlotSelectionStrategy.selectBestSlotForProfile(). Those led 
> me to reading group-aware scheduling work 
> ([https://docs.google.com/document/d/1q7NOqt05HIN-PlKEEPB36JiuU1Iu9fnxxVGJzylhsxU/edit#heading=h.k15nfgsa5bnk]).
>   In addition, I checked in 1.6.2 _matchPreviousLocationNotAvailable_ test 
> expects the problematic behavior I described. So, I started wondering whether 
> the behavior of previous allocation strategy in non-mainline is by design or 
> not. I have a fix similar to the mainline and verified that the problem is 
> resolved, but I am bringing up the issue to have context around the behavior 
> and to discuss what would be the side-effect of the fix. I understand the 
> current vertex-by-vertex scheduling would be inefficient by letting an 
> execution that belonged to unavailable slot steal another task's previous 
> slot, but having slot allocation failure seems worse to me.
>  
> I searched with slot allocation failure term in existing issues, and couldn't 
> find the same 

[jira] [Created] (FLINK-12245) Transient slot allocation failure on job recovery

2019-04-17 Thread Hwanju Kim (JIRA)
Hwanju Kim created FLINK-12245:
--

 Summary: Transient slot allocation failure on job recovery
 Key: FLINK-12245
 URL: https://issues.apache.org/jira/browse/FLINK-12245
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.6.3
 Environment: Flink 1.6.2 with Kubernetes
Reporter: Hwanju Kim


In 1.6.2, we have experienced slot allocation transiently failing on job 
recovery, especially when a task manager (TM) is unavailable, leading to heartbeat 
failure. By transient, I mean it fails once with a slot allocation timeout (by 
default 5 min) and then the next recovering restart succeeds.

 

I found that each _Execution_ remembers its previous allocations and, for the sake of 
local state recovery, tries to prefer the last previous allocation among the resolved 
slot candidates. If the previous allocation belongs to an unavailable TM, the 
candidates do not contain it, thereby forcing the slot provider to request a new slot 
from the resource manager, which then finds a new TM and its available slots. So far 
this is expected and fine, but any subsequent execution that also belonged to the 
unavailable TM and has the first task as its predecessor misses its (now unavailable) 
previous allocation as well. It also requests another new slot, since it never finds 
the gone previous allocation among the candidates. However, this behavior may generate 
more slot requests than there are available slots. For example, if two pipelined tasks 
shared one slot in one TM, and that TM crashes and is replaced with a new TM, two new 
slot requests are generated for the tasks. Since two slot requests cannot be fulfilled 
by a one-slot TM, it hits the slot allocation timeout and restarts the job. 

 
{code:java}
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: 
Could not allocate all requires slots within timeout of 30 ms. Slots 
required: 2, slots allocated: 1 {code}
 

At the next round of recovery, since the second execution failed to allocate a 
new slot, its last previous allocation is _null_, so it falls back to the 
locality-based allocation strategy, which can find the slot allocated for the 
first task, and thus succeeds. Although it eventually succeeds, this 
increases downtime by the slot allocation timeout.

 

The reason for this behavior is that 
_PreviousAllocationSchedulingStrategy.findMatchWithLocality()_ immediately 
returns _null_ if the previous allocation is not empty and is not contained in the 
candidate list. I thought that if the previous allocation is not among the candidates, 
it could fall back to 
_LocationPreferenceSchedulingStrategy.findMatchWithLocality()_ rather than 
returning _null_. By doing so, it can avoid requesting more slots than are available. 
Requesting more slots could be fine in an environment where the resource manager 
can reactively spawn more TMs (like Yarn/Mesos), although it could spawn more 
than needed, but a StandaloneResourceManager with statically provisioned resources 
cannot help but fail to allocate the requested slots.
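
For illustration only, here is a simplified sketch of the fallback I have in mind; the 
SlotCandidate type, the method signature, and the selection logic below are 
hypothetical stand-ins, not the actual Flink strategy interfaces.
{code:java}
import java.util.Collection;
import java.util.Optional;

// Simplified sketch of the proposed behavior: if the previous allocation is gone,
// fall back to locality-based matching instead of returning "no match".
public class PreviousAllocationFallbackSketch {

    static final class SlotCandidate {
        final String allocationId;
        final String host;
        SlotCandidate(String allocationId, String host) {
            this.allocationId = allocationId;
            this.host = host;
        }
    }

    static Optional<SlotCandidate> selectSlot(
            Collection<SlotCandidate> candidates,
            String previousAllocationId,
            String preferredHost) {

        if (previousAllocationId != null) {
            Optional<SlotCandidate> previous = candidates.stream()
                    .filter(c -> c.allocationId.equals(previousAllocationId))
                    .findFirst();
            if (previous.isPresent()) {
                return previous;          // exact previous allocation still available
            }
            // Previous allocation is gone (its TM died): fall through to locality
            // matching instead of returning empty, which would trigger an extra slot request.
        }
        return candidates.stream()
                .filter(c -> c.host.equals(preferredHost))
                .findFirst()
                .or(() -> candidates.stream().findFirst());
    }
}
{code}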

 

Having looked at the mainline branch and 1.8.0 (although I have not attempted 
to reproduce this issue with mainline), the related code has changed to what I 
expected (falling back to the locality-based strategy if the previous allocation 
is not among the candidates): 
PreviousAllocationSlotSelectionStrategy.selectBestSlotForProfile(). That led 
me to reading the group-aware scheduling work 
([https://docs.google.com/document/d/1q7NOqt05HIN-PlKEEPB36JiuU1Iu9fnxxVGJzylhsxU/edit#heading=h.k15nfgsa5bnk]).
 In addition, I checked that the _matchPreviousLocationNotAvailable_ test in 1.6.2 
expects the problematic behavior I described. So I started wondering whether the 
behavior of the previous-allocation strategy outside mainline is by design or 
not. I have a fix similar to the mainline one and verified that the problem is 
resolved, but I am bringing up the issue to get context around the behavior 
and to discuss what the side effects of the fix would be. I understand the 
current vertex-by-vertex scheduling could become inefficient by letting an 
execution that belonged to the unavailable slot steal another task's previous slot, 
but having a slot allocation failure seems worse to me.

 

I searched existing issues with slot-allocation-failure related terms and couldn't 
find the same issue, hence this one. Please feel free to deduplicate it if there is an 
existing one.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)