This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
     new 27a1c41  [FLINK-12822] Add explicit transformer from SerializableOptional to Optional
27a1c41 is described below

commit 27a1c415d39d07790c322fe12d7c4e8075477cea
Author: tison <wander4...@gmail.com>
AuthorDate: Thu Jun 13 10:39:05 2019 +0800

    [FLINK-12822] Add explicit transformer from SerializableOptional to Optional

    This closes #8724.
---
 .../main/java/org/apache/flink/types/SerializableOptional.java | 10 +++++++---
 .../apache/flink/runtime/resourcemanager/ResourceManager.java  |  2 +-
 2 files changed, 8 insertions(+), 4 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/types/SerializableOptional.java b/flink-core/src/main/java/org/apache/flink/types/SerializableOptional.java
index 89dcea4..d620e97 100644
--- a/flink-core/src/main/java/org/apache/flink/types/SerializableOptional.java
+++ b/flink-core/src/main/java/org/apache/flink/types/SerializableOptional.java
@@ -59,14 +59,18 @@ public final class SerializableOptional<T extends Serializable> implements Seria
 		}
 	}
 
-	public <R> Optional<R> map(Function<? super T, ? extends R> mapper) {
+	public <R extends Serializable> SerializableOptional<R> map(Function<? super T, ? extends R> mapper) {
 		if (value == null) {
-			return Optional.empty();
+			return empty();
 		} else {
-			return Optional.ofNullable(mapper.apply(value));
+			return ofNullable(mapper.apply(value));
 		}
 	}
 
+	public Optional<T> toOptional() {
+		return Optional.ofNullable(value);
+	}
+
 	public static <T extends Serializable> SerializableOptional<T> of(@Nonnull T value) {
 		return new SerializableOptional<>(value);
 	}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 03e1d87..441cbbd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -588,7 +588,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 
 			final CompletableFuture<Optional<Tuple2<ResourceID, String>>> metricQueryServiceAddressFuture = taskExecutorGateway
 				.requestMetricQueryServiceAddress(timeout)
-				.thenApply(optional -> optional.map(path -> Tuple2.of(tmResourceId, path)));
+				.thenApply(o -> o.toOptional().map(address -> Tuple2.of(tmResourceId, address)));
 
 			metricQueryServiceAddressFutures.add(metricQueryServiceAddressFuture);
 		}