buptljy commented on a change in pull request #11048: [FLINK-15966][runtime]
Capture callstacks for RPC ask() calls to improve exceptions.
URL: https://github.com/apache/flink/pull/11048#discussion_r379743783
##
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
##
@@ -370,4 +375,33 @@ public String getHostname() {
public CompletableFuture getTerminationFuture() {
return terminationFuture;
}
+
+ static Object deserializeValueIfNeeded(Object o, Method method) {
+ if (o instanceof SerializedValue) {
+ try {
+ return ((SerializedValue)
o).deserializeValue(AkkaInvocationHandler.class.getClassLoader());
+ } catch (IOException | ClassNotFoundException e) {
+ throw new CompletionException(
+ new RpcException(
+ "Could not deserialize the
serialized payload of RPC method : " + method.getName(), e));
+ }
+ } else {
+ return o;
+ }
+ }
+
+ static Throwable resolveTimeoutException(Throwable exception, @Nullable
Throwable callStackCapture, Method method) {
+ if (callStackCapture == null || (!(exception instanceof
akka.pattern.AskTimeoutException))) {
+ return exception;
+ }
+
+ final TimeoutException newException = new
TimeoutException("Invocation of " + method + " timed out.");
+ newException.initCause(exception);
+
+ // remove the stack frames coming from the proxy interface
invocation
+ final StackTraceElement[] stackTrace =
callStackCapture.getStackTrace();
+ newException.setStackTrace(Arrays.copyOfRange(stackTrace, 3,
stackTrace.length));
Review comment:
Can we filter the stack trace containing o.a.f.runtime.rpc.akka instead of
using 3? Because others may change call stack of this function unintentionally
in the future.
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
With regards,
Apache Git Services