[FLINK-3978] [core] Add hasBroadcastVariable method to RuntimeContext

New method RuntimeContext.hasBroadcastVariable(String).


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5b028797
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5b028797
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5b028797

Branch: refs/heads/master
Commit: 5b0287971fa2beda360105d96e7bfbc7a110fae7
Parents: 6db9e6a
Author: Greg Hogan <c...@greghogan.com>
Authored: Thu May 26 14:45:00 2016 -0400
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue May 31 16:39:56 2016 +0200

----------------------------------------------------------------------
 .../api/common/functions/RuntimeContext.java      | 10 ++++++++++
 .../common/functions/util/RuntimeUDFContext.java  |  5 +++++
 .../functions/util/RuntimeUDFContextTest.java     |  9 +++++++--
 .../util/DistributedRuntimeUDFContext.java        | 18 +++++++++---------
 .../api/operators/StreamingRuntimeContext.java    |  5 +++++
 5 files changed, 36 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5b028797/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
index 72e1a4d..e409c11 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
@@ -164,6 +164,16 @@ public interface RuntimeContext {
        // 
--------------------------------------------------------------------------------------------
 
        /**
+        * Tests for the existence of the broadcast variable identified by the
+        * given {@code name}.
+        *
+        * @param name The name under which the broadcast variable is 
registered;
+        * @return Whether a broadcast variable exists for the given name.
+        */
+       @PublicEvolving
+       boolean hasBroadcastVariable(String name);
+
+       /**
         * Returns the result bound to the broadcast variable identified by the 
         * given {@code name}.
         * <p>

http://git-wip-us.apache.org/repos/asf/flink/blob/5b028797/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java
index 6571d0d..ba3f85e 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java
@@ -49,6 +49,11 @@ public class RuntimeUDFContext extends 
AbstractRuntimeUDFContext {
        }
 
        @Override
+       public boolean hasBroadcastVariable(String name) {
+               return this.initializedBroadcastVars.containsKey(name) || 
this.uninitializedBroadcastVars.containsKey(name);
+       }
+
+       @Override
        @SuppressWarnings("unchecked")
        public <RT> List<RT> getBroadcastVariable(String name) {
                

http://git-wip-us.apache.org/repos/asf/flink/blob/5b028797/flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java
index 8f00cd5..4cd2a64 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java
@@ -48,7 +48,9 @@ public class RuntimeUDFContextTest {
                                        new HashMap<String, Future<Path>>(),
                                        new HashMap<String, Accumulator<?, 
?>>(),
                                        new UnregisteredMetricsGroup());
-                       
+
+                       assertFalse(ctx.hasBroadcastVariable("some name"));
+
                        try {
                                ctx.getBroadcastVariable("some name");
                                fail("should throw an exception");
@@ -85,7 +87,10 @@ public class RuntimeUDFContextTest {
                        
                        ctx.setBroadcastVariable("name1", Arrays.asList(1, 2, 
3, 4));
                        ctx.setBroadcastVariable("name2", Arrays.asList(1.0, 
2.0, 3.0, 4.0));
-                       
+
+                       assertTrue(ctx.hasBroadcastVariable("name1"));
+                       assertTrue(ctx.hasBroadcastVariable("name2"));
+
                        List<Integer> list1 = ctx.getBroadcastVariable("name1");
                        List<Double> list2 = ctx.getBroadcastVariable("name2");
                        

http://git-wip-us.apache.org/repos/asf/flink/blob/5b028797/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/DistributedRuntimeUDFContext.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/DistributedRuntimeUDFContext.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/DistributedRuntimeUDFContext.java
index 293d34f..6c7f5f3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/DistributedRuntimeUDFContext.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/DistributedRuntimeUDFContext.java
@@ -47,11 +47,15 @@ public class DistributedRuntimeUDFContext extends 
AbstractRuntimeUDFContext {
                                                                                
        Map<String, Future<Path>> cpTasks, Map<String, Accumulator<?,?>> 
accumulators, MetricGroup metrics) {
                super(taskInfo, userCodeClassLoader, executionConfig, 
accumulators, cpTasks, metrics);
        }
-       
+
+       @Override
+       public boolean hasBroadcastVariable(String name) {
+               return this.broadcastVars.containsKey(name);
+       }
 
        @Override
        public <T> List<T> getBroadcastVariable(String name) {
-               Preconditions.checkNotNull(name);
+               Preconditions.checkNotNull(name, "The broadcast variable name 
must not be null.");
                
                // check if we have an initialized version
                @SuppressWarnings("unchecked")
@@ -71,13 +75,9 @@ public class DistributedRuntimeUDFContext extends 
AbstractRuntimeUDFContext {
        
        @Override
        public <T, C> C getBroadcastVariableWithInitializer(String name, 
BroadcastVariableInitializer<T, C> initializer) {
-               if (name == null) {
-                       throw new NullPointerException("Thw broadcast variable 
name must not be null.");
-               }
-               if (initializer == null) {
-                       throw new NullPointerException("Thw broadcast variable 
initializer must not be null.");
-               }
-               
+               Preconditions.checkNotNull(name, "The broadcast variable name 
must not be null.");
+               Preconditions.checkNotNull(initializer, "The broadcast variable 
initializer must not be null.");
+
                // check if we have an initialized version
                @SuppressWarnings("unchecked")
                BroadcastVariableMaterialization<T, C> variable = 
(BroadcastVariableMaterialization<T, C>) this.broadcastVars.get(name);

http://git-wip-us.apache.org/repos/asf/flink/blob/5b028797/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
index 5fef3c7..a858b4c 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
@@ -99,6 +99,11 @@ public class StreamingRuntimeContext extends 
AbstractRuntimeUDFContext {
        // 
------------------------------------------------------------------------
 
        @Override
+       public boolean hasBroadcastVariable(String name) {
+               throw new UnsupportedOperationException("Broadcast variables 
can only be used in DataSet programs");
+       }
+
+       @Override
        public <RT> List<RT> getBroadcastVariable(String name) {
                throw new UnsupportedOperationException("Broadcast variables 
can only be used in DataSet programs");
        }

Reply via email to