http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/FiniteSpout.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/FiniteSpout.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/FiniteSpout.java index 99c2583..10f9797 100644 --- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/FiniteSpout.java +++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/FiniteSpout.java @@ -17,7 +17,7 @@ package org.apache.flink.storm.util; -import backtype.storm.topology.IRichSpout; +import org.apache.storm.topology.IRichSpout; /** * This interface represents a spout that emits a finite number of records. Common spouts emit infinite streams by
http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/NullTerminatingSpout.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/NullTerminatingSpout.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/NullTerminatingSpout.java index 23d9d70..20e3309 100644 --- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/NullTerminatingSpout.java +++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/NullTerminatingSpout.java @@ -19,10 +19,10 @@ package org.apache.flink.storm.util; import java.util.Map; -import backtype.storm.spout.SpoutOutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.IRichSpout; -import backtype.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.IRichSpout; +import org.apache.storm.topology.OutputFieldsDeclarer; /** * {@link NullTerminatingSpout} in a finite spout (ie, implements {@link FiniteSpout} interface) that wraps an http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/SpoutOutputCollectorObserver.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/SpoutOutputCollectorObserver.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/SpoutOutputCollectorObserver.java index b79cc4e..9e222ec 100644 --- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/SpoutOutputCollectorObserver.java +++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/SpoutOutputCollectorObserver.java @@ -19,8 +19,8 @@ package org.apache.flink.storm.util; import java.util.List; -import backtype.storm.spout.SpoutOutputCollector; -import backtype.storm.utils.Utils; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.utils.Utils; /** * Observes if a call to any {@code emit(...)} or {@code emitDirect(...)} method is made. http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/StormConfig.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/StormConfig.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/StormConfig.java index 38ce58c..040c395 100644 --- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/StormConfig.java +++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/StormConfig.java @@ -1,122 +1,122 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.storm.util; - -import backtype.storm.Config; -import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters; - -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; - -/** - * {@link StormConfig} is used to provide a user-defined Storm configuration (ie, a raw {@link Map} or {@link Config} - * object) for embedded Spouts and Bolts. - */ -@SuppressWarnings("rawtypes") -public final class StormConfig extends GlobalJobParameters implements Map { - private static final long serialVersionUID = 8019519109673698490L; - - /** Contains the actual configuration that is provided to Spouts and Bolts. */ - private final Map config = new HashMap(); - - /** - * Creates an empty configuration. - */ - public StormConfig() { - } - - /** - * Creates an configuration with initial values provided by the given {@code Map}. - * - * @param config - * Initial values for this configuration. - */ - @SuppressWarnings("unchecked") - public StormConfig(Map config) { - this.config.putAll(config); - } - - - @Override - public int size() { - return this.config.size(); - } - - @Override - public boolean isEmpty() { - return this.config.isEmpty(); - } - - @Override - public boolean containsKey(Object key) { - return this.config.containsKey(key); - } - - @Override - public boolean containsValue(Object value) { - return this.config.containsValue(value); - } - - @Override - public Object get(Object key) { - return this.config.get(key); - } - - @SuppressWarnings("unchecked") - @Override - public Object put(Object key, Object value) { - return this.config.put(key, value); - } - - @Override - public Object remove(Object key) { - return this.config.remove(key); - } - - @SuppressWarnings("unchecked") - @Override - public void putAll(Map m) { - this.config.putAll(m); - } - - @Override - public void clear() { - this.config.clear(); - } - - @SuppressWarnings("unchecked") - @Override - public Set<Object> keySet() { - return this.config.keySet(); - } - - @SuppressWarnings("unchecked") - @Override - public Collection<Object> values() { - return this.config.values(); - } - - @SuppressWarnings("unchecked") - @Override - public Set<java.util.Map.Entry<Object, Object>> entrySet() { - return this.config.entrySet(); - } - -} +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.storm.util; + +import org.apache.storm.Config; +import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +/** + * {@link StormConfig} is used to provide a user-defined Storm configuration (ie, a raw {@link Map} or {@link Config} + * object) for embedded Spouts and Bolts. + */ +@SuppressWarnings("rawtypes") +public final class StormConfig extends GlobalJobParameters implements Map { + private static final long serialVersionUID = 8019519109673698490L; + + /** Contains the actual configuration that is provided to Spouts and Bolts. */ + private final Map config = new HashMap(); + + /** + * Creates an empty configuration. + */ + public StormConfig() { + } + + /** + * Creates an configuration with initial values provided by the given {@code Map}. + * + * @param config + * Initial values for this configuration. + */ + @SuppressWarnings("unchecked") + public StormConfig(Map config) { + this.config.putAll(config); + } + + + @Override + public int size() { + return this.config.size(); + } + + @Override + public boolean isEmpty() { + return this.config.isEmpty(); + } + + @Override + public boolean containsKey(Object key) { + return this.config.containsKey(key); + } + + @Override + public boolean containsValue(Object value) { + return this.config.containsValue(value); + } + + @Override + public Object get(Object key) { + return this.config.get(key); + } + + @SuppressWarnings("unchecked") + @Override + public Object put(Object key, Object value) { + return this.config.put(key, value); + } + + @Override + public Object remove(Object key) { + return this.config.remove(key); + } + + @SuppressWarnings("unchecked") + @Override + public void putAll(Map m) { + this.config.putAll(m); + } + + @Override + public void clear() { + this.config.clear(); + } + + @SuppressWarnings("unchecked") + @Override + public Set<Object> keySet() { + return this.config.keySet(); + } + + @SuppressWarnings("unchecked") + @Override + public Collection<Object> values() { + return this.config.values(); + } + + @SuppressWarnings("unchecked") + @Override + public Set<java.util.Map.Entry<Object, Object>> entrySet() { + return this.config.entrySet(); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltCollector.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltCollector.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltCollector.java index 2196a1c..7b94707 100644 --- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltCollector.java +++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltCollector.java @@ -17,8 +17,8 @@ package org.apache.flink.storm.wrappers; -import backtype.storm.task.IOutputCollector; -import backtype.storm.tuple.Tuple; +import org.apache.storm.task.IOutputCollector; +import org.apache.storm.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple0; import org.apache.flink.api.java.tuple.Tuple25; @@ -88,4 +88,8 @@ class BoltCollector<OUT> extends AbstractStormCollector<OUT> implements IOutputC @Override public void fail(final Tuple input) {} + @Override + public void resetTimeout(Tuple var1) {} + + } http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java index 55a8e28..731f28f 100644 --- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java +++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java @@ -17,15 +17,15 @@ package org.apache.flink.storm.wrappers; -import backtype.storm.generated.GlobalStreamId; -import backtype.storm.generated.Grouping; -import backtype.storm.generated.StormTopology; -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.IRichBolt; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.MessageId; -import backtype.storm.utils.Utils; +import org.apache.storm.generated.GlobalStreamId; +import org.apache.storm.generated.Grouping; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.IRichBolt; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.MessageId; +import org.apache.storm.utils.Utils; import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters; import org.apache.flink.api.java.tuple.Tuple; http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java index 52d39a7..f55f0e3 100644 --- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java +++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java @@ -17,16 +17,16 @@ package org.apache.flink.storm.wrappers; -import backtype.storm.generated.StormTopology; -import backtype.storm.hooks.ITaskHook; -import backtype.storm.metric.api.CombinedMetric; -import backtype.storm.metric.api.ICombiner; -import backtype.storm.metric.api.IMetric; -import backtype.storm.metric.api.IReducer; -import backtype.storm.metric.api.ReducedMetric; -import backtype.storm.state.ISubscribedState; -import backtype.storm.task.TopologyContext; -import backtype.storm.tuple.Fields; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.hooks.ITaskHook; +import org.apache.storm.metric.api.CombinedMetric; +import org.apache.storm.metric.api.ICombiner; +import org.apache.storm.metric.api.IMetric; +import org.apache.storm.metric.api.IReducer; +import org.apache.storm.metric.api.ReducedMetric; +import org.apache.storm.state.ISubscribedState; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.tuple.Fields; import clojure.lang.Atom; import java.util.Collection; http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/MergedInputsBoltWrapper.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/MergedInputsBoltWrapper.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/MergedInputsBoltWrapper.java index 7a3b6d5..6dd6973 100644 --- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/MergedInputsBoltWrapper.java +++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/MergedInputsBoltWrapper.java @@ -17,7 +17,7 @@ package org.apache.flink.storm.wrappers; -import backtype.storm.topology.IRichBolt; +import org.apache.storm.topology.IRichBolt; import org.apache.flink.api.java.tuple.Tuple0; import org.apache.flink.api.java.tuple.Tuple1; http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SetupOutputFieldsDeclarer.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SetupOutputFieldsDeclarer.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SetupOutputFieldsDeclarer.java index daf9252..d927f0e 100644 --- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SetupOutputFieldsDeclarer.java +++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SetupOutputFieldsDeclarer.java @@ -17,9 +17,9 @@ package org.apache.flink.storm.wrappers; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.tuple.Fields; -import backtype.storm.utils.Utils; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.tuple.Fields; +import org.apache.storm.utils.Utils; import java.util.HashMap; http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutCollector.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutCollector.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutCollector.java index 0e2190e..5404027 100644 --- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutCollector.java +++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutCollector.java @@ -17,7 +17,7 @@ package org.apache.flink.storm.wrappers; -import backtype.storm.spout.ISpoutOutputCollector; +import org.apache.storm.spout.ISpoutOutputCollector; import org.apache.flink.api.java.tuple.Tuple0; import org.apache.flink.api.java.tuple.Tuple25; import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; @@ -79,4 +79,9 @@ class SpoutCollector<OUT> extends AbstractStormCollector<OUT> implements ISpoutO throw new UnsupportedOperationException("Direct emit is not supported by Flink"); } + public long getPendingCount() { + return 0; + } + + } http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java index c171ccc..3dd1e10 100644 --- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java +++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java @@ -17,10 +17,10 @@ package org.apache.flink.storm.wrappers; -import backtype.storm.generated.StormTopology; -import backtype.storm.spout.SpoutOutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.IRichSpout; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.IRichSpout; import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters; import org.apache.flink.api.common.functions.StoppableFunction; http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/StormTuple.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/StormTuple.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/StormTuple.java index febf0f3..30085fc 100644 --- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/StormTuple.java +++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/StormTuple.java @@ -19,16 +19,16 @@ package org.apache.flink.storm.wrappers; /* * We do neither import - * backtype.storm.tuple.Tuple; + * org.apache.storm.tuple.Tuple; * nor * org.apache.flink.api.java.tuple.Tuple * to avoid confusion */ -import backtype.storm.generated.GlobalStreamId; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.MessageId; -import backtype.storm.tuple.Values; +import org.apache.storm.generated.GlobalStreamId; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.MessageId; +import org.apache.storm.tuple.Values; import java.lang.reflect.Field; import java.lang.reflect.Method; @@ -37,7 +37,7 @@ import java.util.List; /** * {@link StormTuple} converts a Flink tuple of type {@code IN} into a Storm tuple. */ -public class StormTuple<IN> implements backtype.storm.tuple.Tuple { +public class StormTuple<IN> implements org.apache.storm.tuple.Tuple { /** The Storm representation of the original Flink tuple. */ private final Values stormTuple; @@ -55,7 +55,7 @@ public class StormTuple<IN> implements backtype.storm.tuple.Tuple { /** * Create a new Storm tuple from the given Flink tuple. - * + * * @param flinkTuple * The Flink tuple to be converted. * @param schema @@ -389,4 +389,10 @@ public class StormTuple<IN> implements backtype.storm.tuple.Tuple { return "StormTuple{ " + stormTuple.toString() + "[" + this.producerComponentId + "," + this.producerStreamId + "," + this.producerTaskId + "," + this.messageId + "]}"; } + + @Override + public GlobalStreamId getSourceGlobalStreamId() { + return new GlobalStreamId(this.producerComponentId, this.producerStreamId); + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/WrapperSetupHelper.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/WrapperSetupHelper.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/WrapperSetupHelper.java index 74a12dd..3a9b650 100644 --- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/WrapperSetupHelper.java +++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/WrapperSetupHelper.java @@ -16,18 +16,18 @@ */ package org.apache.flink.storm.wrappers; -import backtype.storm.Config; -import backtype.storm.generated.Bolt; -import backtype.storm.generated.ComponentCommon; -import backtype.storm.generated.SpoutSpec; -import backtype.storm.generated.StateSpoutSpec; -import backtype.storm.generated.StormTopology; -import backtype.storm.generated.StreamInfo; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.IComponent; -import backtype.storm.topology.IRichBolt; -import backtype.storm.topology.IRichSpout; -import backtype.storm.tuple.Fields; +import org.apache.storm.Config; +import org.apache.storm.generated.Bolt; +import org.apache.storm.generated.ComponentCommon; +import org.apache.storm.generated.SpoutSpec; +import org.apache.storm.generated.StateSpoutSpec; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.generated.StreamInfo; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.IComponent; +import org.apache.storm.topology.IRichBolt; +import org.apache.storm.topology.IRichSpout; +import org.apache.storm.tuple.Fields; import clojure.lang.Atom; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarerTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarerTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarerTest.java index 90a82ba..ddbeaff 100644 --- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarerTest.java +++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarerTest.java @@ -16,8 +16,8 @@ */ package org.apache.flink.storm.api; -import backtype.storm.tuple.Fields; -import backtype.storm.utils.Utils; +import org.apache.storm.tuple.Fields; +import org.apache.storm.utils.Utils; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.storm.util.AbstractTest; import org.junit.Assert; http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkTopologyTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkTopologyTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkTopologyTest.java index 39b01d8..0ec0179 100644 --- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkTopologyTest.java +++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkTopologyTest.java @@ -16,8 +16,8 @@ */ package org.apache.flink.storm.api; -import backtype.storm.topology.TopologyBuilder; -import backtype.storm.tuple.Fields; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.tuple.Fields; import org.apache.flink.storm.util.TestDummyBolt; import org.apache.flink.storm.util.TestDummySpout; import org.apache.flink.storm.util.TestSink; http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestBolt.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestBolt.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestBolt.java index 6077534..0f617fb 100644 --- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestBolt.java +++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestBolt.java @@ -16,11 +16,11 @@ */ package org.apache.flink.storm.api; -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.IRichBolt; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.tuple.Tuple; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.IRichBolt; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.tuple.Tuple; import java.util.Map; http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestSpout.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestSpout.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestSpout.java index 846ae51..1b185a7 100644 --- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestSpout.java +++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestSpout.java @@ -16,10 +16,10 @@ */ package org.apache.flink.storm.api; -import backtype.storm.spout.SpoutOutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.IRichSpout; -import backtype.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.IRichSpout; +import org.apache.storm.topology.OutputFieldsDeclarer; import java.util.Map; http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/FiniteTestSpout.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/FiniteTestSpout.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/FiniteTestSpout.java index 1b320e5..9a5b1cd 100644 --- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/FiniteTestSpout.java +++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/FiniteTestSpout.java @@ -17,12 +17,12 @@ package org.apache.flink.storm.util; -import backtype.storm.spout.SpoutOutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.IRichSpout; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Values; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.IRichSpout; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; import java.util.Map; http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/NullTerminatingSpoutTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/NullTerminatingSpoutTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/NullTerminatingSpoutTest.java index da2021c..1eaed4a 100644 --- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/NullTerminatingSpoutTest.java +++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/NullTerminatingSpoutTest.java @@ -20,10 +20,10 @@ package org.apache.flink.storm.util; import java.util.HashMap; import java.util.Map; -import backtype.storm.spout.SpoutOutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.IRichSpout; -import backtype.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.IRichSpout; +import org.apache.storm.topology.OutputFieldsDeclarer; import org.junit.Assert; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/SpoutOutputCollectorObserverTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/SpoutOutputCollectorObserverTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/SpoutOutputCollectorObserverTest.java index 0e3784a..a5b96bd 100644 --- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/SpoutOutputCollectorObserverTest.java +++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/SpoutOutputCollectorObserverTest.java @@ -17,7 +17,7 @@ */ package org.apache.flink.storm.util; -import backtype.storm.spout.SpoutOutputCollector; +import org.apache.storm.spout.SpoutOutputCollector; import org.junit.Assert; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummyBolt.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummyBolt.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummyBolt.java index 0fc7df9..2ad8f2e 100644 --- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummyBolt.java +++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummyBolt.java @@ -16,13 +16,13 @@ */ package org.apache.flink.storm.util; -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.IRichBolt; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Tuple; -import backtype.storm.tuple.Values; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.IRichBolt; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; import java.util.Map; http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummySpout.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummySpout.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummySpout.java index 7fe8df7..82506e4 100644 --- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummySpout.java +++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummySpout.java @@ -16,13 +16,13 @@ */ package org.apache.flink.storm.util; -import backtype.storm.spout.SpoutOutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.IRichSpout; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Values; -import backtype.storm.utils.Utils; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.IRichSpout; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.Utils; import java.util.Map; http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestSink.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestSink.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestSink.java index c11597c..1f4da55 100644 --- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestSink.java +++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestSink.java @@ -16,11 +16,11 @@ */ package org.apache.flink.storm.util; -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.IRichBolt; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.tuple.Tuple; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.IRichBolt; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.tuple.Tuple; import java.util.LinkedList; import java.util.List; http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltCollectorTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltCollectorTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltCollectorTest.java index e8748d0..9e3165b 100644 --- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltCollectorTest.java +++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltCollectorTest.java @@ -17,7 +17,7 @@ package org.apache.flink.storm.wrappers; -import backtype.storm.tuple.Values; +import org.apache.storm.tuple.Values; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.storm.util.AbstractTest; http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java index 1440b51..1f8f773 100644 --- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java +++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java @@ -17,14 +17,14 @@ package org.apache.flink.storm.wrappers; -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.IRichBolt; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.MessageId; -import backtype.storm.tuple.Values; -import backtype.storm.utils.Utils; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.IRichBolt; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.MessageId; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.Utils; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.TaskInfo; @@ -338,7 +338,7 @@ public class BoltWrapperTest extends AbstractTest { int counter = 0; @Override - public void execute(backtype.storm.tuple.Tuple input) { + public void execute(org.apache.storm.tuple.Tuple input) { if (++counter % 2 == 1) { this.collector.emit("stream1", new Values(input.getInteger(0))); } else { http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/FlinkTopologyContextTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/FlinkTopologyContextTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/FlinkTopologyContextTest.java index de9be2a..9a23b0f 100644 --- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/FlinkTopologyContextTest.java +++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/FlinkTopologyContextTest.java @@ -17,13 +17,13 @@ package org.apache.flink.storm.wrappers; -import backtype.storm.generated.Bolt; -import backtype.storm.generated.SpoutSpec; -import backtype.storm.generated.StateSpoutSpec; -import backtype.storm.generated.StormTopology; -import backtype.storm.metric.api.ICombiner; -import backtype.storm.metric.api.IMetric; -import backtype.storm.metric.api.IReducer; +import org.apache.storm.generated.Bolt; +import org.apache.storm.generated.SpoutSpec; +import org.apache.storm.generated.StateSpoutSpec; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.metric.api.ICombiner; +import org.apache.storm.metric.api.IMetric; +import org.apache.storm.metric.api.IReducer; import org.apache.flink.storm.util.AbstractTest; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SetupOutputFieldsDeclarerTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SetupOutputFieldsDeclarerTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SetupOutputFieldsDeclarerTest.java index 481cb5c..94a88fe 100644 --- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SetupOutputFieldsDeclarerTest.java +++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SetupOutputFieldsDeclarerTest.java @@ -17,8 +17,8 @@ package org.apache.flink.storm.wrappers; -import backtype.storm.tuple.Fields; -import backtype.storm.utils.Utils; +import org.apache.storm.tuple.Fields; +import org.apache.storm.utils.Utils; import org.apache.flink.storm.util.AbstractTest; import org.junit.Assert; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutCollectorTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutCollectorTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutCollectorTest.java index fac2582..eb91c63 100644 --- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutCollectorTest.java +++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutCollectorTest.java @@ -17,7 +17,7 @@ package org.apache.flink.storm.wrappers; -import backtype.storm.tuple.Values; +import org.apache.storm.tuple.Values; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.storm.util.AbstractTest; http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutWrapperTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutWrapperTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutWrapperTest.java index dc84b33..265e705 100644 --- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutWrapperTest.java +++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutWrapperTest.java @@ -17,10 +17,10 @@ package org.apache.flink.storm.wrappers; -import backtype.storm.spout.SpoutOutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.IRichSpout; -import backtype.storm.tuple.Fields; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.IRichSpout; +import org.apache.storm.tuple.Fields; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.configuration.Configuration; http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/StormTupleTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/StormTupleTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/StormTupleTest.java index eba611e..7ea4b76 100644 --- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/StormTupleTest.java +++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/StormTupleTest.java @@ -17,10 +17,10 @@ package org.apache.flink.storm.wrappers; -import backtype.storm.generated.GlobalStreamId; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.MessageId; -import backtype.storm.tuple.Values; +import org.apache.storm.generated.GlobalStreamId; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.MessageId; +import org.apache.storm.tuple.Values; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple5; http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java index b4e153a..5e29ac4 100644 --- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java +++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java @@ -17,17 +17,17 @@ package org.apache.flink.storm.wrappers; -import backtype.storm.Config; -import backtype.storm.LocalCluster; -import backtype.storm.generated.ComponentCommon; -import backtype.storm.generated.StormTopology; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.IComponent; -import backtype.storm.topology.IRichBolt; -import backtype.storm.topology.IRichSpout; -import backtype.storm.topology.TopologyBuilder; -import backtype.storm.tuple.Fields; -import backtype.storm.utils.Utils; +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.generated.ComponentCommon; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.IComponent; +import org.apache.storm.topology.IRichBolt; +import org.apache.storm.topology.IRichSpout; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.tuple.Fields; +import org.apache.storm.utils.Utils; import org.apache.flink.storm.api.FlinkTopology; import org.apache.flink.storm.util.AbstractTest; @@ -186,15 +186,15 @@ public class WrapperSetupHelperTest extends AbstractTest { .shuffleGrouping("bolt2", TestDummyBolt.groupingStreamId) .shuffleGrouping("bolt2", TestDummyBolt.shuffleStreamId); - LocalCluster cluster = new LocalCluster(); - Config c = new Config(); - c.setNumAckers(0); - cluster.submitTopology("test", c, builder.createTopology()); - - while (TestSink.result.size() != 8) { - Utils.sleep(100); - } - cluster.shutdown(); +// LocalCluster cluster = new LocalCluster(); +// Config c = new Config(); +// c.setNumAckers(0); +// cluster.submitTopology("test", c, builder.createTopology()); +// +// while (TestSink.result.size() != 8) { +// Utils.sleep(100); +// } +// cluster.shutdown(); final FlinkTopology flinkBuilder = FlinkTopology.createTopology(builder); StormTopology stormTopology = flinkBuilder.getStormTopology();