zentol closed pull request #3615: [FLINK-2720] support flink-storm metrics
URL: https://github.com/apache/flink/pull/3615
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/.gitignore b/.gitignore
index 8fc9fce6fb2..b1806ad1d07 100644
--- a/.gitignore
+++ b/.gitignore
@@ -34,3 +34,4 @@ out/
 *.iws
 tools/flink
 tools/flink-*
+.cache-main
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index ac779a7faa2..8ce8776668c 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -24,6 +24,7 @@
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.optimizer.CompilerException;
@@ -184,7 +185,10 @@ public ClusterClient(Configuration flinkConfig, HighAvailabilityServices highAva
                        flinkConfig,
                        log);
 
-               this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityServices);
+
+         this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityServices);
+         this.detachedJobSubmission = flinkConfig.getBoolean(ConfigConstants.FLINK_DETACHED_JOB_SUBMISSION,
+                 ConfigConstants.DEFAULT_FLINK_DETACHED_JOB_SUBMISSION);
                this.sharedHaServices = sharedHaServices;
        }
 
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/metric/CounterMetricAdapter.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/metric/CounterMetricAdapter.java
new file mode 100644
index 00000000000..67f144248cf
--- /dev/null
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/metric/CounterMetricAdapter.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.metric;
+
+import org.apache.flink.metrics.Counter;
+
+import org.apache.storm.metric.api.CountMetric;
+
+/**
+ * An adapter that exposes a Flink {@link Counter} through Storm's
+ * {@link CountMetric} API.
+ * @since Mar 5, 2017
+ */
+public class CounterMetricAdapter extends CountMetric {
+
+       private Counter delegate;
+
+       public CounterMetricAdapter(final Counter counter) {
+               this.delegate = counter;
+       }
+
+       public void incr() {
+               delegate.inc();
+       }
+
+       public void incrBy(final long incrementBy) {
+               delegate.inc(incrementBy);
+       }
+
+       public Long getValueAndReset() {
+               long count = delegate.getCount();
+               delegate.dec(delegate.getCount());
+               return count;
+       }
+
+}
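
For context, a minimal sketch of the adapter's semantics (illustrative only, not
part of the diff): incr() and incrBy() delegate to the wrapped Flink Counter,
and getValueAndReset() reports the current count and then decrements the Flink
counter by that amount, matching Storm's drain-and-reset contract.

    import org.apache.flink.metrics.Counter;
    import org.apache.flink.metrics.SimpleCounter;
    import org.apache.flink.storm.metric.CounterMetricAdapter;

    // Illustrative only; the class name is hypothetical.
    public class CounterMetricAdapterSketch {
        public static void main(String[] args) {
            Counter flinkCounter = new SimpleCounter();
            CounterMetricAdapter adapter = new CounterMetricAdapter(flinkCounter);

            adapter.incr();     // Flink counter -> 1
            adapter.incrBy(4);  // Flink counter -> 5

            Long drained = adapter.getValueAndReset();  // returns 5
            long remaining = flinkCounter.getCount();   // 0 after the reset
            System.out.println(drained + " / " + remaining);
        }
    }
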
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/metric/MultiCountMetricAdapter.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/metric/MultiCountMetricAdapter.java
new file mode 100644
index 00000000000..cfb7b77dc8a
--- /dev/null
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/metric/MultiCountMetricAdapter.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.metric;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+
+import org.apache.storm.metric.api.MultiCountMetric;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * An adapter that exposes Storm's {@link MultiCountMetric} API on top of
+ * per-key Flink {@link Counter}s registered with the task's metric group.
+ * @since Mar 5, 2017
+ */
+public class MultiCountMetricAdapter extends MultiCountMetric {
+
+       private Map<String, CounterMetricAdapter> counterPerKey = new HashMap<>();
+       private StreamingRuntimeContext context;
+
+       public MultiCountMetricAdapter(final StreamingRuntimeContext context) {
+               this.context = context;
+       }
+
+       public CounterMetricAdapter scope(final String key) {
+               CounterMetricAdapter val = counterPerKey.get(key);
+               if (val == null) {
+                       Counter flinkCounter = context.getMetricGroup().counter(key);
+                       val = new CounterMetricAdapter(flinkCounter);
+                       counterPerKey.put(key, val);
+               }
+               return val;
+       }
+
+       @SuppressWarnings({ "unchecked", "rawtypes" })
+       public Object getValueAndReset() {
+               Map ret = new HashMap();
+               for (Map.Entry<String, CounterMetricAdapter> e : counterPerKey.entrySet()) {
+                       ret.put(e.getKey(), e.getValue().getValueAndReset());
+               }
+               return ret;
+       }
+
+}
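
From the bolt side, a MultiCountMetric registered through the TopologyContext is
resolved to this adapter, and each scope(key) call lazily registers a Flink
counter named after the key on the task's metric group. A minimal sketch (the
bolt, the metric name and the stream ids are hypothetical, not part of the diff):

    import java.util.Map;

    import org.apache.storm.metric.api.MultiCountMetric;
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Tuple;

    public class PerStreamCountingBolt extends BaseRichBolt {
        private transient MultiCountMetric perStream;
        private transient OutputCollector collector;

        @Override
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
            // When running via flink-storm this returns a MultiCountMetricAdapter.
            this.perStream = context.registerMetric("emit.per.stream", new MultiCountMetric(), 60);
        }

        @Override
        public void execute(Tuple input) {
            // Each scope key is backed by its own Flink counter.
            perStream.scope(input.getSourceStreamId()).incr();
            collector.ack(input);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // this sketch declares no output streams
        }
    }
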
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java
index e84abccefba..a67946ebca3 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java
@@ -17,13 +17,20 @@
 
 package org.apache.flink.storm.wrappers;
 
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.storm.metric.CounterMetricAdapter;
+import org.apache.flink.storm.metric.MultiCountMetricAdapter;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+
 import clojure.lang.Atom;
 import org.apache.storm.generated.StormTopology;
 import org.apache.storm.hooks.ITaskHook;
 import org.apache.storm.metric.api.CombinedMetric;
+import org.apache.storm.metric.api.CountMetric;
 import org.apache.storm.metric.api.ICombiner;
 import org.apache.storm.metric.api.IMetric;
 import org.apache.storm.metric.api.IReducer;
+import org.apache.storm.metric.api.MultiCountMetric;
 import org.apache.storm.metric.api.ReducedMetric;
 import org.apache.storm.state.ISubscribedState;
 import org.apache.storm.task.TopologyContext;
@@ -39,11 +46,14 @@
  */
 final class FlinkTopologyContext extends TopologyContext {
 
+       private final StreamingRuntimeContext context;
+
        /**
         * Instantiates a new {@link FlinkTopologyContext} for a given Storm topology. The context object is instantiated
         * for each parallel task
         */
-       FlinkTopologyContext(final StormTopology topology,
+       @SuppressWarnings("unchecked")
+       FlinkTopologyContext(StreamingRuntimeContext context, final StormTopology topology,
                        @SuppressWarnings("rawtypes") final Map stormConf,
                        final Map<Integer, String> taskToComponent, final Map<String, List<Integer>> componentToSortedTasks,
                        final Map<String, Map<String, Fields>> componentToStreamToFields, final String stormId, final String codeDir,
@@ -54,6 +64,7 @@
                super(topology, stormConf, taskToComponent, componentToSortedTasks, componentToStreamToFields, stormId,
                                codeDir, pidDir, taskId, workerPort, workerTasks, defaultResources, userResources, executorData,
                                registeredMetrics, openOrPrepareWasCalled);
+               this.context = context;
        }
 
        /**
@@ -86,8 +97,7 @@ public void addTaskHook(final ITaskHook hook) {
         */
        @Override
        public IMetric getRegisteredMetricByName(final String name) {
-               throw new UnsupportedOperationException("Metrics are not supported by Flink");
-
+               return super.getRegisteredMetricByName(name);
        }
 
        /**
@@ -115,14 +125,21 @@ public ReducedMetric registerMetric(final String name, final IReducer combiner,
        }
 
        /**
-        * Not supported by Flink.
-        *
-        * @throws UnsupportedOperationException
-        *              at every invocation
+        * Registers {@link CountMetric} and {@link MultiCountMetric} instances as adapters backed by Flink counters.
+        *
+        * @throws UnsupportedOperationException
+        *              for Storm metric types that are not supported by Flink
         */
+       @SuppressWarnings("unchecked")
        @Override
        public <T extends IMetric> T registerMetric(final String name, final T metric, final int timeBucketSizeInSecs) {
-               throw new UnsupportedOperationException("Metrics are not supported by Flink");
+               IMetric stormMetric = null;
+               if (metric instanceof CountMetric) {
+                       Counter flinkc = context.getMetricGroup().counter(name);
+                       stormMetric = new CounterMetricAdapter(flinkc);
+               } else if (metric instanceof MultiCountMetric) {
+                       stormMetric = new MultiCountMetricAdapter(context);
+               } else {
+                       throw new UnsupportedOperationException("Metrics are not supported by Flink");
+               }
+               return super.registerMetric(name, (T) stormMetric, timeBucketSizeInSecs);
        }
 
        /**
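
With this dispatch in place, count-style Storm metrics registered in open() or
prepare() are mapped onto Flink counters, while reducer-, combiner- and
gauge-style metrics still hit the UnsupportedOperationException branch. A hedged
sketch of what calling code can expect (the helper class and metric names are
hypothetical, not part of the diff):

    import org.apache.storm.metric.api.CountMetric;
    import org.apache.storm.metric.api.MeanReducer;
    import org.apache.storm.metric.api.ReducedMetric;
    import org.apache.storm.task.TopologyContext;

    public final class StormMetricRegistrations {

        private StormMetricRegistrations() {
        }

        // Call from a spout's open() or a bolt's prepare().
        public static CountMetric registerAckCounter(TopologyContext context) {
            // Supported: on Flink this returns a CounterMetricAdapter backed by a Flink Counter.
            return context.registerMetric("acked", new CountMetric(), 60);
        }

        public static void registerLatencyReducer(TopologyContext context) {
            try {
                // Still unsupported: reducer-based metrics are not adapted.
                context.registerMetric("latency", new ReducedMetric(new MeanReducer()), 60);
            } catch (UnsupportedOperationException e) {
                // expected when running via flink-storm
            }
        }
    }
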
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/WrapperSetupHelper.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/WrapperSetupHelper.java
index 1d3a544a830..e9c76131146 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/WrapperSetupHelper.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/WrapperSetupHelper.java
@@ -129,7 +129,7 @@ static synchronized TopologyContext createTopologyContext(
                final Map<String, Object> userResources = new HashMap<String, Object>();
                final Map<String, Object> executorData = new HashMap<String, Object>();
                final Map registeredMetrics = new HashMap();
-               Atom openOrPrepareWasCalled = null;
+               Atom openOrPrepareWasCalled = new Atom(new Boolean(true));
 
                if (stormTopology == null) {
                        // embedded mode
@@ -196,7 +196,7 @@ static synchronized TopologyContext createTopologyContext(
                        stormConfig.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 30); // Storm default value
                }
 
-               return new FlinkTopologyContext(stormTopology, stormConfig, taskToComponents,
+               return new FlinkTopologyContext(context, stormTopology, stormConfig, taskToComponents,
                                componentToSortedTasks, componentToStreamToFields, stormId, codeDir, pidDir,
                                taskId, workerPort, workerTasks, defaultResources, userResources, executorData,
                                registeredMetrics, openOrPrepareWasCalled);
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/FlinkTopologyContextTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/FlinkTopologyContextTest.java
index 0ee796b4db6..32b9e873d04 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/FlinkTopologyContextTest.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/FlinkTopologyContextTest.java
@@ -17,18 +17,38 @@
 
 package org.apache.flink.storm.wrappers;
 
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.metrics.CharacterFilter;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.state.TestTaskStateManager;
 import org.apache.flink.storm.util.AbstractTest;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 
+import clojure.lang.Atom;
 import org.apache.storm.generated.Bolt;
 import org.apache.storm.generated.SpoutSpec;
 import org.apache.storm.generated.StateSpoutSpec;
 import org.apache.storm.generated.StormTopology;
+import org.apache.storm.metric.api.CountMetric;
 import org.apache.storm.metric.api.ICombiner;
 import org.apache.storm.metric.api.IMetric;
 import org.apache.storm.metric.api.IReducer;
+import org.apache.storm.metric.api.MultiCountMetric;
+import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.Map;
 
 /**
  * FlinkTopologyContext.getSources(componentId) and FlinkTopologyContext.getTargets(componentId) are not tested here,
@@ -38,7 +58,7 @@
 
        @Test(expected = UnsupportedOperationException.class)
        public void testAddTaskHook() {
-               new FlinkTopologyContext(new StormTopology(new HashMap<String, SpoutSpec>(),
+               new FlinkTopologyContext(null, new StormTopology(new HashMap<String, SpoutSpec>(),
                                new HashMap<String, Bolt>(), new HashMap<String, StateSpoutSpec>()), null, null,
                                null, null, null, null, null, null, null, null, null, null, null, null, null)
                .addTaskHook(null);
@@ -46,7 +66,7 @@ public void testAddTaskHook() {
 
        @Test(expected = UnsupportedOperationException.class)
        public void testGetHooks() {
-               new FlinkTopologyContext(new StormTopology(new HashMap<String, SpoutSpec>(),
+               new FlinkTopologyContext(null, new StormTopology(new HashMap<String, SpoutSpec>(),
                                new HashMap<String, Bolt>(), new HashMap<String, StateSpoutSpec>()), null, null,
                                null, null, null, null, null, null, null, null, null, null, null, null, null)
                .getHooks();
@@ -55,7 +75,7 @@ public void testGetHooks() {
        @SuppressWarnings("rawtypes")
        @Test(expected = UnsupportedOperationException.class)
        public void testRegisteredMetric1() {
-               new FlinkTopologyContext(new StormTopology(new HashMap<String, SpoutSpec>(),
+               new FlinkTopologyContext(null, new StormTopology(new HashMap<String, SpoutSpec>(),
                                new HashMap<String, Bolt>(), new HashMap<String, StateSpoutSpec>()), null, null,
                                null, null, null, null, null, null, null, null, null, null, null, null, null)
                .registerMetric(null, (ICombiner) null, 0);
@@ -64,7 +84,7 @@ public void testRegisteredMetric1() {
        @SuppressWarnings("rawtypes")
        @Test(expected = UnsupportedOperationException.class)
        public void testRegisteredMetric2() {
-               new FlinkTopologyContext(new StormTopology(new HashMap<String, SpoutSpec>(),
+               new FlinkTopologyContext(null, new StormTopology(new HashMap<String, SpoutSpec>(),
                                new HashMap<String, Bolt>(), new HashMap<String, StateSpoutSpec>()), null, null,
                                null, null, null, null, null, null, null, null, null, null, null, null, null)
                .registerMetric(null, (IReducer) null, 0);
@@ -72,23 +92,206 @@ public void testRegisteredMetric2() {
 
        @Test(expected = UnsupportedOperationException.class)
        public void testRegisteredMetric3() {
-               new FlinkTopologyContext(new StormTopology(new HashMap<String, SpoutSpec>(),
+               new FlinkTopologyContext(null, new StormTopology(new HashMap<String, SpoutSpec>(),
                                new HashMap<String, Bolt>(), new HashMap<String, StateSpoutSpec>()), null, null,
                                null, null, null, null, null, null, null, null, null, null, null, null, null)
                .registerMetric(null, (IMetric) null, 0);
        }
 
-       @Test(expected = UnsupportedOperationException.class)
+       private static class MockRuntimeContext extends StreamingRuntimeContext {
+               MockMetricGroup mmg = new MockMetricGroup();
+               private MockRuntimeContext() {
+                       super(
+                               new MockStreamOperator(),
+                               new MockEnvironment(
+                                       "mockTask",
+                                       4 * MemoryManager.DEFAULT_PAGE_SIZE,
+                                       null,
+                                       16,
+                                       new TestTaskStateManager()),
+                                       Collections.<String, Accumulator<?, 
?>>emptyMap());
+               }
+
+               @Override
+               public MetricGroup getMetricGroup() {
+                       return mmg;
+               }
+
+               @Override
+               public boolean isCheckpointingEnabled() {
+                       return false;
+               }
+
+               @Override
+               public int getIndexOfThisSubtask() {
+                       return 0;
+               }
+
+               @Override
+               public int getNumberOfParallelSubtasks() {
+                       return 0;
+               }
+
+               // ------------------------------------------------------------------------
+
+               public static class MockStreamOperator extends AbstractStreamOperator<Integer> {
+                       private static final long serialVersionUID = -1153976702711944427L;
+
+                       @Override
+                       public ExecutionConfig getExecutionConfig() {
+                               return new ExecutionConfig();
+                       }
+               }
+
+               public static class MockMetricGroup implements MetricGroup {
+                       private Map<String, Counter> counters = new HashMap<String, Counter>();
+
+                       @Override
+                       public Counter counter(int name) {
+                               Counter c = new SimpleCounter();
+                               counters.put(String.valueOf(name), c);
+                               return c;
+                       }
+
+                       @Override
+                       public Counter counter(String name) {
+                               Counter c = new SimpleCounter();
+                               counters.put(name, c);
+                               return c;
+                       }
+
+                       @Override
+                       public <C extends Counter> C counter(int name, C counter) {
+                               return null;
+                       }
+
+                       @Override
+                       public <C extends Counter> C counter(String name, C counter) {
+                               return null;
+                       }
+
+                       @Override
+                       public <T, G extends Gauge<T>> G gauge(int name, G gauge) {
+                               return null;
+                       }
+
+                       @Override
+                       public <T, G extends Gauge<T>> G gauge(String name, G gauge) {
+                               return null;
+                       }
+
+                       @Override
+                       public <H extends Histogram> H histogram(String name, H histogram) {
+                               return null;
+                       }
+
+                       @Override
+                       public <H extends Histogram> H histogram(int name, H histogram) {
+                               return null;
+                       }
+
+                       @Override
+                       public <M extends Meter> M meter(String name, M meter) {
+                               return null;
+                       }
+
+                       @Override
+                       public <M extends Meter> M meter(int name, M meter) {
+                               return null;
+                       }
+
+                       @Override
+                       public MetricGroup addGroup(int name) {
+                               return null;
+                       }
+
+                       @Override
+                       public MetricGroup addGroup(String name) {
+                               return null;
+                       }
+
+                       @Override
+                       public MetricGroup addGroup(String key, String value) {
+                               return null;
+                       }
+
+                       @Override
+                       public String[] getScopeComponents() {
+                               return null;
+                       }
+
+                       @Override
+                       public Map<String, String> getAllVariables() {
+                               return null;
+                       }
+
+                       @Override
+                       public String getMetricIdentifier(String metricName) {
+                               return null;
+                       }
+
+                       @Override
+                       public String getMetricIdentifier(String metricName, CharacterFilter filter) {
+                               return null;
+                       }
+
+                       public Counter getCounter(String name) {
+                               return counters.get(name);
+                       }
+
+                       public Map<String, Counter> getCounters() {
+                               return counters;
+                       }
+               }
+       }
+
+       @Test
+       public void testRegisteredMetric_count() {
+               StreamingRuntimeContext streamContext = new MockRuntimeContext();
+               CountMetric cm = new FlinkTopologyContext(streamContext, new StormTopology(new HashMap<String, SpoutSpec>(),
+                               new HashMap<String, Bolt>(), new HashMap<String, StateSpoutSpec>()), null, null,
+                               null, null, null, null, null, null, null, null, null, null, null, new HashMap<Integer, Map<Integer, Map<String, IMetric>>>(), new Atom(new Boolean(false)))
+               .registerMetric("test-count", new CountMetric(), 1);
+
+               Counter counter = ((MockRuntimeContext.MockMetricGroup) streamContext.getMetricGroup()).getCounter("test-count");
+               Assert.assertNotNull(counter);
+               cm.incr();
+               Assert.assertEquals(1, counter.getCount());
+               cm.incrBy(2);
+               Assert.assertEquals(3, counter.getCount());
+       }
+
+       @Test
+       public void testRegisteredMetric_MultiCount() {
+               StreamingRuntimeContext streamContext = new MockRuntimeContext();
+               MultiCountMetric multiCount = new FlinkTopologyContext(streamContext, new StormTopology(new HashMap<String, SpoutSpec>(),
+                               new HashMap<String, Bolt>(), new HashMap<String, StateSpoutSpec>()), null, null,
+                               null, null, null, null, null, null, null, null, null, null, null, new HashMap<Integer, Map<Integer, Map<String, IMetric>>>(), new Atom(new Boolean(false)))
+                       .registerMetric("test-multi-count", new MultiCountMetric(), 1);
+               CountMetric cm = multiCount.scope("count1");
+
+               MockRuntimeContext.MockMetricGroup mmg = (MockRuntimeContext.MockMetricGroup) streamContext.getMetricGroup();
+               Counter counter = mmg.getCounter("count1");
+               Assert.assertNotNull(counter);
+               cm.incr();
+               Assert.assertEquals(1, counter.getCount());
+               cm.incrBy(2);
+               Assert.assertEquals(3, counter.getCount());
+       }
+
+       @Test
        public void testGetRegisteredMetricByName() {
-               new FlinkTopologyContext(new StormTopology(new HashMap<String, SpoutSpec>(),
+               org.junit.Assert.assertNull(new FlinkTopologyContext(null, new StormTopology(new HashMap<String, SpoutSpec>(),
                                new HashMap<String, Bolt>(), new HashMap<String, StateSpoutSpec>()), null, null,
-                               null, null, null, null, null, null, null, null, null, null, null, null, null)
-               .getRegisteredMetricByName(null);
+                               null, null, null, null, null, null, null, null, null, null, null, new HashMap<Integer, Map<Integer, Map<String, IMetric>>>(), new Atom(new Boolean(false)))
+               .getRegisteredMetricByName(null));
        }
 
        @Test(expected = UnsupportedOperationException.class)
        public void testSetAllSubscribedState() {
-               new FlinkTopologyContext(new StormTopology(new HashMap<String, SpoutSpec>(),
+               new FlinkTopologyContext(null, new StormTopology(new HashMap<String, SpoutSpec>(),
                                new HashMap<String, Bolt>(), new HashMap<String, StateSpoutSpec>()), null, null,
                                null, null, null, null, null, null, null, null, null, null, null, null, null)
                .setAllSubscribedState(null);
@@ -96,15 +299,14 @@ public void testSetAllSubscribedState() {
 
        @Test(expected = UnsupportedOperationException.class)
        public void testSetSubscribedState1() {
-               new FlinkTopologyContext(new StormTopology(new HashMap<String, SpoutSpec>(),
-                               new HashMap<String, Bolt>(), new HashMap<String, StateSpoutSpec>()), null, null,
-                               null, null, null, null, null, null, null, null, null, null, null, null, null)
+               new FlinkTopologyContext(null, new StormTopology(new HashMap<String, SpoutSpec>(),
+                               new HashMap<String, Bolt>(), new HashMap<String, StateSpoutSpec>()), null, null, null, null, null, null, null, null, null, null, null, null, null, null, null)
                .setSubscribedState(null, null);
        }
 
        @Test(expected = UnsupportedOperationException.class)
        public void testSetSubscribedState2() {
-               new FlinkTopologyContext(new StormTopology(new HashMap<String, SpoutSpec>(),
+               new FlinkTopologyContext(null, new StormTopology(new HashMap<String, SpoutSpec>(),
                                new HashMap<String, Bolt>(), new HashMap<String, StateSpoutSpec>()), null, null,
                                null, null, null, null, null, null, null, null, null, null, null, null, null)
                .setSubscribedState(null, null, null);
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupInLocalClusterTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupInLocalClusterTest.java
index 2b0b2753f2c..80e93e06b19 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupInLocalClusterTest.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupInLocalClusterTest.java
@@ -187,11 +187,6 @@ public void testCreateTopologyContext() {
                                topologyContext.getHooks();
                                Assert.fail();
                        } catch (UnsupportedOperationException e) { /* expected */ }
-
-                       try {
-                               topologyContext.getRegisteredMetricByName(null);
-                               Assert.fail();
-                       } catch (UnsupportedOperationException e) { /* expected */ }
                }
        }
 
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 6dd30674921..920353a256a 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -1032,6 +1032,8 @@
        @Deprecated
        public static final String FLINK_JVM_OPTIONS = "env.java.opts";
 
+       public static final String FLINK_DETACHED_JOB_SUBMISSION = "flink.detached.job.submission";
+
        // --------------------------- High Availability --------------------------
 
        /** @deprecated Deprecated in favour of {@link HighAvailabilityOptions#HA_MODE} */
@@ -1992,6 +1994,9 @@
        @Deprecated
        public static final int DEFAULT_METRICS_LATENCY_HISTORY_SIZE = 128;
 
+       // ----------------------------- Misc ----------------------------
+       public static final boolean DEFAULT_FLINK_DETACHED_JOB_SUBMISSION = true;
+
        // ----------------------------- Environment Variables ----------------------------
 
        /** The environment variable name which contains the location of the configuration directory. */
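
The ClusterClient and ConfigConstants hunks above also introduce a
"flink.detached.job.submission" key (default: true) that the client reads in its
constructor. A small sketch of the equivalent lookup, assuming the constants
from this diff are on the classpath (the class name is hypothetical):

    import org.apache.flink.configuration.ConfigConstants;
    import org.apache.flink.configuration.Configuration;

    public class DetachedSubmissionFlagSketch {
        public static void main(String[] args) {
            Configuration flinkConfig = new Configuration();
            // Equivalent to setting `flink.detached.job.submission: false` in flink-conf.yaml.
            flinkConfig.setBoolean(ConfigConstants.FLINK_DETACHED_JOB_SUBMISSION, false);

            boolean detached = flinkConfig.getBoolean(
                    ConfigConstants.FLINK_DETACHED_JOB_SUBMISSION,
                    ConfigConstants.DEFAULT_FLINK_DETACHED_JOB_SUBMISSION);
            System.out.println("detached job submission: " + detached);
        }
    }
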


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
