[FLINK-3978] [core] Add hasBroadcastVariable method to RuntimeContext. New method: RuntimeContext.hasBroadcastVariable(String).
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5b028797 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5b028797 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5b028797 Branch: refs/heads/master Commit: 5b0287971fa2beda360105d96e7bfbc7a110fae7 Parents: 6db9e6a Author: Greg Hogan <c...@greghogan.com> Authored: Thu May 26 14:45:00 2016 -0400 Committer: Stephan Ewen <se...@apache.org> Committed: Tue May 31 16:39:56 2016 +0200 ---------------------------------------------------------------------- .../api/common/functions/RuntimeContext.java | 10 ++++++++++ .../common/functions/util/RuntimeUDFContext.java | 5 +++++ .../functions/util/RuntimeUDFContextTest.java | 9 +++++++-- .../util/DistributedRuntimeUDFContext.java | 18 +++++++++--------- .../api/operators/StreamingRuntimeContext.java | 5 +++++ 5 files changed, 36 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/5b028797/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java index 72e1a4d..e409c11 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java @@ -164,6 +164,16 @@ public interface RuntimeContext { // -------------------------------------------------------------------------------------------- /** + * Tests for the existence of the broadcast variable identified by the + * given {@code name}. 
+ * + * @param name The name under which the broadcast variable is registered; + * @return Whether a broadcast variable exists for the given name. + */ + @PublicEvolving + boolean hasBroadcastVariable(String name); + + /** * Returns the result bound to the broadcast variable identified by the * given {@code name}. * <p> http://git-wip-us.apache.org/repos/asf/flink/blob/5b028797/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java index 6571d0d..ba3f85e 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java @@ -49,6 +49,11 @@ public class RuntimeUDFContext extends AbstractRuntimeUDFContext { } @Override + public boolean hasBroadcastVariable(String name) { + return this.initializedBroadcastVars.containsKey(name) || this.uninitializedBroadcastVars.containsKey(name); + } + + @Override @SuppressWarnings("unchecked") public <RT> List<RT> getBroadcastVariable(String name) { http://git-wip-us.apache.org/repos/asf/flink/blob/5b028797/flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java b/flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java index 8f00cd5..4cd2a64 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java @@ 
-48,7 +48,9 @@ public class RuntimeUDFContextTest { new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>(), new UnregisteredMetricsGroup()); - + + assertFalse(ctx.hasBroadcastVariable("some name")); + try { ctx.getBroadcastVariable("some name"); fail("should throw an exception"); @@ -85,7 +87,10 @@ public class RuntimeUDFContextTest { ctx.setBroadcastVariable("name1", Arrays.asList(1, 2, 3, 4)); ctx.setBroadcastVariable("name2", Arrays.asList(1.0, 2.0, 3.0, 4.0)); - + + assertTrue(ctx.hasBroadcastVariable("name1")); + assertTrue(ctx.hasBroadcastVariable("name2")); + List<Integer> list1 = ctx.getBroadcastVariable("name1"); List<Double> list2 = ctx.getBroadcastVariable("name2"); http://git-wip-us.apache.org/repos/asf/flink/blob/5b028797/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/DistributedRuntimeUDFContext.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/DistributedRuntimeUDFContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/DistributedRuntimeUDFContext.java index 293d34f..6c7f5f3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/DistributedRuntimeUDFContext.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/DistributedRuntimeUDFContext.java @@ -47,11 +47,15 @@ public class DistributedRuntimeUDFContext extends AbstractRuntimeUDFContext { Map<String, Future<Path>> cpTasks, Map<String, Accumulator<?,?>> accumulators, MetricGroup metrics) { super(taskInfo, userCodeClassLoader, executionConfig, accumulators, cpTasks, metrics); } - + + @Override + public boolean hasBroadcastVariable(String name) { + return this.broadcastVars.containsKey(name); + } @Override public <T> List<T> getBroadcastVariable(String name) { - Preconditions.checkNotNull(name); + Preconditions.checkNotNull(name, "The broadcast variable name must not be 
null."); // check if we have an initialized version @SuppressWarnings("unchecked") @@ -71,13 +75,9 @@ public class DistributedRuntimeUDFContext extends AbstractRuntimeUDFContext { @Override public <T, C> C getBroadcastVariableWithInitializer(String name, BroadcastVariableInitializer<T, C> initializer) { - if (name == null) { - throw new NullPointerException("Thw broadcast variable name must not be null."); - } - if (initializer == null) { - throw new NullPointerException("Thw broadcast variable initializer must not be null."); - } - + Preconditions.checkNotNull(name, "The broadcast variable name must not be null."); + Preconditions.checkNotNull(initializer, "The broadcast variable initializer must not be null."); + // check if we have an initialized version @SuppressWarnings("unchecked") BroadcastVariableMaterialization<T, C> variable = (BroadcastVariableMaterialization<T, C>) this.broadcastVars.get(name); http://git-wip-us.apache.org/repos/asf/flink/blob/5b028797/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java index 5fef3c7..a858b4c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java @@ -99,6 +99,11 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext { // ------------------------------------------------------------------------ @Override + public boolean hasBroadcastVariable(String name) { + throw new UnsupportedOperationException("Broadcast variables can only be used in DataSet programs"); + } + + @Override public <RT> 
List<RT> getBroadcastVariable(String name) { throw new UnsupportedOperationException("Broadcast variables can only be used in DataSet programs"); }