http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormInMemorySpout.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormInMemorySpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormInMemorySpout.java deleted file mode 100644 index 5efff66..0000000 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormInMemorySpout.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.stormcompatibility.util; - -import org.apache.flink.stormcompatibility.util.FiniteStormSpout; - -/** - * Implements a Storm Spout that reads String[] data stored in memory. The spout stops - * automatically once it has emitted all of the data. 
- */ -public class FiniteStormInMemorySpout extends StormInMemorySpout<String> implements - FiniteStormSpout { - private static final long serialVersionUID = -4008858647468647019L; - - public FiniteStormInMemorySpout(String[] source) { - super(source); - } - - @Override - public boolean reachedEnd() { - return counter >= source.length; - } - -}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/OutputFormatter.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/OutputFormatter.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/OutputFormatter.java deleted file mode 100644 index ec9adfe..0000000 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/OutputFormatter.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.stormcompatibility.util; - -import backtype.storm.tuple.Tuple; - -import java.io.Serializable; - -public interface OutputFormatter extends Serializable { - - /** - * Converts a Storm {@link Tuple} to a string. This method is used for formatting the output - * tuples before writing them out to a file or to the console. 
- * - * @param input The tuple to be formatted - * @return The string result of the formatting - */ - public String format(Tuple input); - -} http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/SimpleOutputFormatter.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/SimpleOutputFormatter.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/SimpleOutputFormatter.java deleted file mode 100644 index 0702e94..0000000 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/SimpleOutputFormatter.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.stormcompatibility.util; - -import backtype.storm.tuple.Tuple; - -public class SimpleOutputFormatter implements OutputFormatter { - private static final long serialVersionUID = 6349573860144270338L; - - /** - * Converts a Storm {@link Tuple} with 1 field to a string by retrieving the value of that - * field. This method is used for formatting raw outputs wrapped in tuples, before writing them - * out to a file or to the console. - * - * @param input - * The tuple to be formatted - * @return The string result of the formatting - */ - @Override - public String format(final Tuple input) { - if (input.getValues().size() != 1) { - throw new RuntimeException("The output is not raw"); - } - return input.getValue(0).toString(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormBoltFileSink.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormBoltFileSink.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormBoltFileSink.java deleted file mode 100644 index ee8dca4..0000000 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormBoltFileSink.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.stormcompatibility.util; - -import backtype.storm.task.TopologyContext; - -import java.io.BufferedWriter; -import java.io.FileWriter; -import java.io.IOException; -import java.util.Map; - -/** - * Implements a sink that writes the received data to the given file (as a result of {@code Object.toString()} for each - * attribute). - */ -public final class StormBoltFileSink extends AbstractStormBoltSink { - private static final long serialVersionUID = 2014027288631273666L; - - private final String path; - private BufferedWriter writer; - - public StormBoltFileSink(final String path) { - this(path, new SimpleOutputFormatter()); - } - - public StormBoltFileSink(final String path, final OutputFormatter formatter) { - super(formatter); - this.path = path; - } - - @SuppressWarnings("rawtypes") - @Override - public void prepareSimple(final Map stormConf, final TopologyContext context) { - try { - this.writer = new BufferedWriter(new FileWriter(this.path)); - } catch (final IOException e) { - throw new RuntimeException(e); - } - } - - @Override - public void writeExternal(final String line) { - try { - this.writer.write(line + "\n"); - } catch (final IOException e) { - throw new RuntimeException(e); - } - } - - @Override - public void cleanup() { - if (this.writer != null) { - try { - this.writer.close(); - } catch (final IOException e) { - throw new RuntimeException(e); - } - } - } - -} 
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormBoltPrintSink.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormBoltPrintSink.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormBoltPrintSink.java deleted file mode 100644 index 3bf49d0..0000000 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormBoltPrintSink.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.stormcompatibility.util; - -import backtype.storm.task.TopologyContext; - -import java.util.Map; - -/** - * Implements a sink that prints the received data to {@code stdout}. 
- */ -public final class StormBoltPrintSink extends AbstractStormBoltSink { - private static final long serialVersionUID = -6650011223001009519L; - - public StormBoltPrintSink(OutputFormatter formatter) { - super(formatter); - } - - @SuppressWarnings("rawtypes") - @Override - public void prepareSimple(final Map stormConf, final TopologyContext context) { - /* nothing to do */ - } - - @Override - public void writeExternal(final String line) { - System.out.println(line); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormFileSpout.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormFileSpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormFileSpout.java deleted file mode 100644 index 0611e37..0000000 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormFileSpout.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.stormcompatibility.util; - -import backtype.storm.spout.SpoutOutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.tuple.Values; - -import java.io.BufferedReader; -import java.io.FileNotFoundException; -import java.io.FileReader; -import java.io.IOException; -import java.util.Map; - -/** - * Implements a Storm Spout that reads data from a given local file. - */ -public class StormFileSpout extends AbstractStormSpout { - private static final long serialVersionUID = -6996907090003590436L; - - public final static String INPUT_FILE_PATH = "input.path"; - - protected String path = null; - protected BufferedReader reader; - - public StormFileSpout() {} - - public StormFileSpout(final String path) { - this.path = path; - } - - @SuppressWarnings("rawtypes") - @Override - public void open(final Map conf, final TopologyContext context, final SpoutOutputCollector collector) { - super.open(conf, context, collector); - - Object configuredPath = conf.get(INPUT_FILE_PATH); - if(configuredPath != null) { - this.path = (String)configuredPath; - } - - try { - this.reader = new BufferedReader(new FileReader(this.path)); - } catch (final FileNotFoundException e) { - throw new RuntimeException(e); - } - } - - @Override - public void close() { - if (this.reader != null) { - try { - this.reader.close(); - } catch (final IOException e) { - throw new RuntimeException(e); - } - } - } - - @Override - public void nextTuple() { - String line; - try { - line = this.reader.readLine(); - if (line != null) { - 
this.collector.emit(new Values(line)); - } - } catch (final IOException e) { - throw new RuntimeException(e); - } - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormInMemorySpout.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormInMemorySpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormInMemorySpout.java deleted file mode 100644 index f6ae622..0000000 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormInMemorySpout.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.stormcompatibility.util; - -import backtype.storm.tuple.Values; - -/** - * Implements a Storm Spout that reads data from an in-memory array. 
- */ -public class StormInMemorySpout<T> extends AbstractStormSpout { - private static final long serialVersionUID = -4008858647468647019L; - - protected T[] source; - protected int counter = 0; - - public StormInMemorySpout(T[] source) { - this.source = source; - } - - @Override - public void nextTuple() { - if (this.counter < source.length) { - this.collector.emit(new Values(source[this.counter++])); - } - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/TupleOutputFormatter.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/TupleOutputFormatter.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/TupleOutputFormatter.java deleted file mode 100644 index 6419ee3..0000000 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/TupleOutputFormatter.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.stormcompatibility.util; - -import backtype.storm.tuple.Tuple; - -public class TupleOutputFormatter implements OutputFormatter { - private static final long serialVersionUID = -599665757723851761L; - - @Override - public String format(final Tuple input) { - final StringBuilder stringBuilder = new StringBuilder(); - stringBuilder.append("("); - for (final Object attribute : input.getValues()) { - stringBuilder.append(attribute); - stringBuilder.append(","); - } - stringBuilder.replace(stringBuilder.length() - 1, stringBuilder.length(), ")"); - return stringBuilder.toString(); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCount.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCount.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCount.java deleted file mode 100644 index 6f7b6fb..0000000 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCount.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.stormcompatibility.wordcount; - -import backtype.storm.topology.IRichBolt; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.examples.java.wordcount.util.WordCountData; -import org.apache.flink.stormcompatibility.wordcount.stormoperators.StormBoltTokenizer; -import org.apache.flink.stormcompatibility.wrappers.StormBoltWrapper; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; - -/** - * Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming - * fashion. The tokenizer step is performed by a Storm {@link IRichBolt bolt}. - * <p/> - * <p/> - * The input is a plain text file with lines separated by newline characters. - * <p/> - * <p/> - * Usage: <code>WordCount <text path> <result path></code><br> - * If no parameters are provided, the program is run with default data from {@link WordCountData}. 
- * <p/> - * <p/> - * This example shows how to: - * <ul> - * <li>use a Storm bolt within a Flink Streaming program.</li> - * </ul> - */ -public class BoltTokenizerWordCount { - - // ************************************************************************* - // PROGRAM - // ************************************************************************* - - public static void main(final String[] args) throws Exception { - - if (!parseParameters(args)) { - return; - } - - // set up the execution environment - final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - // get input data - final DataStream<String> text = getTextDataStream(env); - - final DataStream<Tuple2<String, Integer>> counts = text - // split up the lines in pairs (2-tuples) containing: (word,1) - // this is done by a Storm bolt that is wrapped accordingly - .transform("StormBoltTokenizer", - TypeExtractor.getForObject(new Tuple2<String, Integer>("", 0)), - new StormBoltWrapper<String, Tuple2<String, Integer>>(new StormBoltTokenizer())) - // split up the lines in pairs (2-tuples) containing: (word,1) - // group by the tuple field "0" and sum up tuple field "1" - .keyBy(0).sum(1); - - // emit result - if (fileOutput) { - counts.writeAsText(outputPath); - } else { - counts.print(); - } - - // execute program - env.execute("Streaming WordCount with Storm bolt tokenizer"); - } - - // ************************************************************************* - // UTIL METHODS - // ************************************************************************* - - private static boolean fileOutput = false; - private static String textPath; - private static String outputPath; - - private static boolean parseParameters(final String[] args) { - - if (args.length > 0) { - // parse input arguments - fileOutput = true; - if (args.length == 2) { - textPath = args[0]; - outputPath = args[1]; - } else { - System.err.println("Usage: BoltTokenizerWordCount <text path> <result path>"); - 
return false; - } - } else { - System.out.println("Executing BoltTokenizerWordCount example with built-in default data"); - System.out.println(" Provide parameters to read input data from a file"); - System.out.println(" Usage: BoltTokenizerWordCount <text path> <result path>"); - } - return true; - } - - private static DataStream<String> getTextDataStream(final StreamExecutionEnvironment env) { - if (fileOutput) { - // read the text file from given input path - return env.readTextFile(textPath); - } - - return env.fromElements(WordCountData.WORDS); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountPojo.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountPojo.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountPojo.java deleted file mode 100644 index 300f5bc..0000000 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountPojo.java +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.stormcompatibility.wordcount; - -import backtype.storm.topology.IRichBolt; - -import org.apache.flink.api.java.io.CsvInputFormat; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.typeutils.PojoTypeInfo; -import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.core.fs.Path; -import org.apache.flink.examples.java.wordcount.util.WordCountData; -import org.apache.flink.stormcompatibility.wordcount.stormoperators.StormBoltTokenizerByName; -import org.apache.flink.stormcompatibility.wordcount.stormoperators.WordCountDataPojos; -import org.apache.flink.stormcompatibility.wordcount.stormoperators.WordCountDataPojos.Sentence; -import org.apache.flink.stormcompatibility.wrappers.StormBoltWrapper; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; - -/** - * Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming - * fashion. The tokenizer step is performed by a Storm {@link IRichBolt bolt}. In contrast to - * {@link BoltTokenizerWordCount} the tokenizer's input is a POJO type and the single field is accessed by name. - * <p/> - * <p/> - * The input is a plain text file with lines separated by newline characters. - * <p/> - * <p/> - * Usage: <code>WordCount <text path> <result path></code><br> - * If no parameters are provided, the program is run with default data from {@link WordCountData}. 
- * <p/> - * <p/> - * This example shows how to: - * <ul> - * <li>how to access attributes by name for POJO type input streams - * </ul> - */ -public class BoltTokenizerWordCountPojo { - - // ************************************************************************* - // PROGRAM - // ************************************************************************* - - public static void main(final String[] args) throws Exception { - - if (!parseParameters(args)) { - return; - } - - // set up the execution environment - final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - // get input data - final DataStream<Sentence> text = getTextDataStream(env); - - final DataStream<Tuple2<String, Integer>> counts = text - // split up the lines in pairs (2-tuples) containing: (word,1) - // this is done by a Storm bolt that is wrapped accordingly - .transform("StormBoltTokenizer", - TypeExtractor.getForObject(new Tuple2<String, Integer>("", 0)), - new StormBoltWrapper<Sentence, Tuple2<String, Integer>>( - new StormBoltTokenizerByName())) - // split up the lines in pairs (2-tuples) containing: (word,1) - // group by the tuple field "0" and sum up tuple field "1" - .keyBy(0).sum(1); - - // emit result - if (fileOutput) { - counts.writeAsText(outputPath); - } else { - counts.print(); - } - - // execute program - env.execute("Streaming WordCount with Storm bolt tokenizer"); - } - - // ************************************************************************* - // UTIL METHODS - // ************************************************************************* - - private static boolean fileOutput = false; - private static String textPath; - private static String outputPath; - - private static boolean parseParameters(final String[] args) { - - if (args.length > 0) { - // parse input arguments - fileOutput = true; - if (args.length == 2) { - textPath = args[0]; - outputPath = args[1]; - } else { - System.err.println("Usage: BoltTokenizerWordCount <text path> 
<result path>"); - return false; - } - } else { - System.out.println("Executing BoltTokenizerWordCount example with built-in default data"); - System.out.println(" Provide parameters to read input data from a file"); - System.out.println(" Usage: BoltTokenizerWordCount <text path> <result path>"); - } - return true; - } - - private static DataStream<Sentence> getTextDataStream(final StreamExecutionEnvironment env) { - if (fileOutput) { - // read the text file from given input path - PojoTypeInfo<Sentence> sourceType = (PojoTypeInfo)TypeExtractor - .getForObject(new Sentence("")); - return env.createInput(new CsvInputFormat<Sentence>(new Path( - textPath), CsvInputFormat.DEFAULT_LINE_DELIMITER, - CsvInputFormat.DEFAULT_LINE_DELIMITER, sourceType), - sourceType); - } - - return env.fromElements(WordCountDataPojos.SENTENCES); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountWithNames.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountWithNames.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountWithNames.java deleted file mode 100644 index ed01181..0000000 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountWithNames.java +++ /dev/null @@ -1,138 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.stormcompatibility.wordcount; - -import backtype.storm.topology.IRichBolt; -import backtype.storm.tuple.Fields; - -import org.apache.flink.api.java.io.CsvInputFormat; -import org.apache.flink.api.java.tuple.Tuple; -import org.apache.flink.api.java.tuple.Tuple1; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.typeutils.TupleTypeInfo; -import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.core.fs.Path; -import org.apache.flink.examples.java.wordcount.util.WordCountData; -import org.apache.flink.stormcompatibility.wordcount.stormoperators.StormBoltTokenizerByName; -import org.apache.flink.stormcompatibility.wordcount.stormoperators.WordCountDataTuple; -import org.apache.flink.stormcompatibility.wrappers.StormBoltWrapper; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; - -/** - * Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming - * fashion. The tokenizer step is performed by a Storm {@link IRichBolt bolt}. In contrast to - * {@link BoltTokenizerWordCount} the tokenizer's input is a {@link Tuple} type and the single field is accessed by - * name. - * <p/> - * <p/> - * The input is a plain text file with lines separated by newline characters. 
- * <p/> - * <p/> - * Usage: <code>WordCount <text path> <result path></code><br> - * If no parameters are provided, the program is run with default data from {@link WordCountData}. - * <p/> - * <p/> - * This example shows how to: - * <ul> - * <li>how to access attributes by name for {@link Tuple} type input streams - * </ul> - */ -public class BoltTokenizerWordCountWithNames { - - // ************************************************************************* - // PROGRAM - // ************************************************************************* - - public static void main(final String[] args) throws Exception { - - if (!parseParameters(args)) { - return; - } - - // set up the execution environment - final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - // get input data - final DataStream<Tuple1<String>> text = getTextDataStream(env); - - final DataStream<Tuple2<String, Integer>> counts = text - // split up the lines in pairs (2-tuples) containing: (word,1) - // this is done by a Storm bolt that is wrapped accordingly - .transform("StormBoltTokenizer", - TypeExtractor.getForObject(new Tuple2<String, Integer>("", 0)), - new StormBoltWrapper<Tuple1<String>, Tuple2<String, Integer>>( - new StormBoltTokenizerByName(), new Fields("sentence"))) - // split up the lines in pairs (2-tuples) containing: (word,1) - // group by the tuple field "0" and sum up tuple field "1" - .keyBy(0).sum(1); - - // emit result - if (fileOutput) { - counts.writeAsText(outputPath); - } else { - counts.print(); - } - - // execute program - env.execute("Streaming WordCount with Storm bolt tokenizer"); - } - - // ************************************************************************* - // UTIL METHODS - // ************************************************************************* - - private static boolean fileOutput = false; - private static String textPath; - private static String outputPath; - - private static boolean parseParameters(final 
String[] args) { - - if (args.length > 0) { - // parse input arguments - fileOutput = true; - if (args.length == 2) { - textPath = args[0]; - outputPath = args[1]; - } else { - System.err.println("Usage: BoltTokenizerWordCount <text path> <result path>"); - return false; - } - } else { - System.out.println("Executing BoltTokenizerWordCount example with built-in default data"); - System.out.println(" Provide parameters to read input data from a file"); - System.out.println(" Usage: BoltTokenizerWordCount <text path> <result path>"); - } - return true; - } - - private static DataStream<Tuple1<String>> getTextDataStream(final StreamExecutionEnvironment env) { - if (fileOutput) { - // read the text file from given input path - TupleTypeInfo<Tuple1<String>> sourceType = (TupleTypeInfo<Tuple1<String>>)TypeExtractor - .getForObject(new Tuple1<String>("")); - return env.createInput(new CsvInputFormat<Tuple1<String>>(new Path( - textPath), CsvInputFormat.DEFAULT_LINE_DELIMITER, - CsvInputFormat.DEFAULT_LINE_DELIMITER, sourceType), - sourceType); - } - - return env.fromElements(WordCountDataTuple.TUPLES); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.java deleted file mode 100644 index 21d7811..0000000 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.java +++ /dev/null @@ -1,157 +0,0 @@ 
-/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.stormcompatibility.wordcount; - -import backtype.storm.topology.IRichSpout; -import backtype.storm.utils.Utils; - -import org.apache.flink.api.common.functions.FlatMapFunction; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.examples.java.wordcount.util.WordCountData; -import org.apache.flink.stormcompatibility.wordcount.stormoperators.StormWordCountFileSpout; -import org.apache.flink.stormcompatibility.wordcount.stormoperators.StormWordCountInMemorySpout; -import org.apache.flink.stormcompatibility.wrappers.StormFiniteSpoutWrapper; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.util.Collector; - -/** - * Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming - * fashion. The used data source is a Storm {@link IRichSpout bolt}. - * <p/> - * <p/> - * The input is a plain text file with lines separated by newline characters. 
- * <p/> - * <p/> - * Usage: <code>WordCount <text path> <result path></code><br> - * If no parameters are provided, the program is run with default data from {@link WordCountData}. - * <p/> - * <p/> - * This example shows how to: - * <ul> - * <li>use a Storm spout within a Flink Streaming program.</li> - * </ul> - */ -public class SpoutSourceWordCount { - - // ************************************************************************* - // PROGRAM - // ************************************************************************* - - public static void main(final String[] args) throws Exception { - - if (!parseParameters(args)) { - return; - } - - // set up the execution environment - final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - // get input data - final DataStream<String> text = getTextDataStream(env); - - final DataStream<Tuple2<String, Integer>> counts = - // split up the lines in pairs (2-tuples) containing: (word,1) - text.flatMap(new Tokenizer()) - // group by the tuple field "0" and sum up tuple field "1" - .keyBy(0).sum(1); - - // emit result - if (fileOutput) { - counts.writeAsText(outputPath); - } else { - counts.print(); - } - - // execute program - env.execute("Streaming WordCount with Storm spout source"); - } - - // ************************************************************************* - // USER FUNCTIONS - // ************************************************************************* - - /** - * Implements the string tokenizer that splits sentences into words as a user-defined FlatMapFunction. The function - * takes a line (String) and splits it into multiple pairs in the form of "(word,1)" (Tuple2<String, Integer>). 
- */ - public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> { - private static final long serialVersionUID = 1L; - - @Override - public void flatMap(final String value, final Collector<Tuple2<String, Integer>> out) throws Exception { - // normalize and split the line - final String[] tokens = value.toLowerCase().split("\\W+"); - - // emit the pairs - for (final String token : tokens) { - if (token.length() > 0) { - out.collect(new Tuple2<String, Integer>(token, 1)); - } - } - } - } - - // ************************************************************************* - // UTIL METHODS - // ************************************************************************* - - private static boolean fileOutput = false; - private static String textPath; - private static String outputPath; - - private static boolean parseParameters(final String[] args) { - - if (args.length > 0) { - // parse input arguments - fileOutput = true; - if (args.length == 2) { - textPath = args[0]; - outputPath = args[1]; - } else { - System.err.println("Usage: SpoutSourceWordCount <text path> <result path>"); - return false; - } - } else { - System.out.println("Executing SpoutSourceWordCount example with built-in default data"); - System.out.println(" Provide parameters to read input data from a file"); - System.out.println(" Usage: SpoutSourceWordCount <text path> <result path>"); - } - return true; - } - - private static DataStream<String> getTextDataStream(final StreamExecutionEnvironment env) { - if (fileOutput) { - // read the text file from given input path - final String[] tokens = textPath.split(":"); - final String localFile = tokens[tokens.length - 1]; - return env.addSource( - new StormFiniteSpoutWrapper<String>(new StormWordCountFileSpout(localFile), - new String[] { Utils.DEFAULT_STREAM_ID }), - TypeExtractor.getForClass(String.class)).setParallelism(1); - } - - return env.addSource( - new StormFiniteSpoutWrapper<String>(new 
StormWordCountInMemorySpout(), - new String[] { Utils.DEFAULT_STREAM_ID }), - TypeExtractor.getForClass(String.class)).setParallelism(1); - - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocal.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocal.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocal.java deleted file mode 100644 index 836c8e9..0000000 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocal.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.stormcompatibility.wordcount; - -import backtype.storm.LocalCluster; -import backtype.storm.generated.StormTopology; -import backtype.storm.utils.Utils; -import org.apache.flink.examples.java.wordcount.util.WordCountData; -import org.apache.flink.stormcompatibility.api.FlinkLocalCluster; -import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder; - -/** - * Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming - * fashion. The program is constructed as a regular {@link StormTopology} and submitted to Flink for execution in the - * same way as to a Storm {@link LocalCluster}. - * <p/> - * This example shows how to run program directly within Java, thus it cannot be used to submit a {@link StormTopology} - * via Flink command line clients (ie, bin/flink). - * <p/> - * <p/> - * The input is a plain text file with lines separated by newline characters. - * <p/> - * <p/> - * Usage: <code>WordCount <text path> <result path></code><br> - * If no parameters are provided, the program is run with default data from {@link WordCountData}. 
- * <p/> - * <p/> - * This example shows how to: - * <ul> - * <li>run a regular Storm program locally on Flink</li> - * </ul> - */ -public class StormWordCountLocal { - public final static String topologyId = "Streaming WordCount"; - - // ************************************************************************* - // PROGRAM - // ************************************************************************* - - public static void main(final String[] args) throws Exception { - - if (!WordCountTopology.parseParameters(args)) { - return; - } - - // build Topology the Storm way - final FlinkTopologyBuilder builder = WordCountTopology.buildTopology(); - - // execute program locally - final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster(); - cluster.submitTopology(topologyId, null, builder.createTopology()); - - Utils.sleep(10 * 1000); - - // TODO kill does no do anything so far - cluster.killTopology(topologyId); - cluster.shutdown(); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountNamedLocal.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountNamedLocal.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountNamedLocal.java deleted file mode 100644 index f51afab..0000000 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountNamedLocal.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.stormcompatibility.wordcount; - -import backtype.storm.LocalCluster; -import backtype.storm.generated.StormTopology; -import backtype.storm.utils.Utils; -import org.apache.flink.examples.java.wordcount.util.WordCountData; -import org.apache.flink.stormcompatibility.api.FlinkLocalCluster; -import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder; - -/** - * Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming - * fashion. The program is constructed as a regular {@link StormTopology} and submitted to Flink for execution in the - * same way as to a Storm {@link LocalCluster}. In contrast to {@link StormWordCountLocal} all bolts access the field of - * input tuples by name instead of index. - * <p/> - * This example shows how to run program directly within Java, thus it cannot be used to submit a {@link StormTopology} - * via Flink command line clients (ie, bin/flink). - * <p/> - * <p/> - * The input is a plain text file with lines separated by newline characters. - * <p/> - * <p/> - * Usage: <code>WordCount <text path> <result path></code><br> - * If no parameters are provided, the program is run with default data from {@link WordCountData}. 
- * <p/> - * <p/> - * This example shows how to: - * <ul> - * <li>run a regular Storm program locally on Flink - * </ul> - */ -public class StormWordCountNamedLocal { - public final static String topologyId = "Streaming WordCountName"; - - // ************************************************************************* - // PROGRAM - // ************************************************************************* - - public static void main(final String[] args) throws Exception { - - if (!WordCountTopology.parseParameters(args)) { - return; - } - - // build Topology the Storm way - final FlinkTopologyBuilder builder = WordCountTopology.buildTopology(false); - - // execute program locally - final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster(); - cluster.submitTopology(topologyId, null, builder.createTopology()); - - Utils.sleep(10 * 1000); - - // TODO kill does no do anything so far - cluster.killTopology(topologyId); - cluster.shutdown(); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteByClient.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteByClient.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteByClient.java deleted file mode 100644 index 3c79eda..0000000 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteByClient.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.stormcompatibility.wordcount; - -import backtype.storm.Config; -import backtype.storm.generated.AlreadyAliveException; -import backtype.storm.generated.InvalidTopologyException; -import backtype.storm.generated.NotAliveException; -import backtype.storm.generated.StormTopology; -import backtype.storm.utils.NimbusClient; -import backtype.storm.utils.Utils; -import org.apache.flink.examples.java.wordcount.util.WordCountData; -import org.apache.flink.stormcompatibility.api.FlinkClient; -import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder; - -/** - * Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming - * fashion. The program is constructed as a regular {@link StormTopology} and submitted to Flink for execution in the - * same way as to a Storm cluster similar to {@link NimbusClient}. The Flink cluster can be local or remote. - * <p/> - * This example shows how to submit the program via Java, thus it cannot be used to submit a {@link StormTopology} via - * Flink command line clients (ie, bin/flink). - * <p/> - * <p/> - * The input is a plain text file with lines separated by newline characters. 
- * <p/> - * <p/> - * Usage: <code>WordCount <text path> <result path></code><br> - * If no parameters are provided, the program is run with default data from {@link WordCountData}. - * <p/> - * <p/> - * This example shows how to: - * <ul> - * <li>submit a regular Storm program to a local or remote Flink cluster.</li> - * </ul> - */ -public class StormWordCountRemoteByClient { - public final static String topologyId = "Streaming WordCount"; - private final static String uploadedJarLocation = "target/WordCount-StormTopology.jar"; - - // ************************************************************************* - // PROGRAM - // ************************************************************************* - - public static void main(final String[] args) throws AlreadyAliveException, InvalidTopologyException, - NotAliveException { - - if (!WordCountTopology.parseParameters(args)) { - return; - } - - // build Topology the Storm way - final FlinkTopologyBuilder builder = WordCountTopology.buildTopology(); - - // execute program on Flink cluster - final Config conf = new Config(); - // can be changed to remote address - conf.put(Config.NIMBUS_HOST, "localhost"); - // use default flink jobmanger.rpc.port - conf.put(Config.NIMBUS_THRIFT_PORT, 6123); - - final FlinkClient cluster = FlinkClient.getConfiguredClient(conf); - cluster.submitTopology(topologyId, uploadedJarLocation, builder.createTopology()); - - Utils.sleep(5 * 1000); - - cluster.killTopology(topologyId); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteBySubmitter.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteBySubmitter.java 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteBySubmitter.java deleted file mode 100644 index de84f55..0000000 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteBySubmitter.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.stormcompatibility.wordcount; - -import backtype.storm.Config; -import backtype.storm.StormSubmitter; -import backtype.storm.generated.StormTopology; -import org.apache.flink.examples.java.wordcount.util.WordCountData; -import org.apache.flink.stormcompatibility.api.FlinkClient; -import org.apache.flink.stormcompatibility.api.FlinkSubmitter; -import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder; - -/** - * Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming - * fashion. The program is constructed as a regular {@link StormTopology} and submitted to Flink for execution in the - * same way as to a Storm cluster similar to {@link StormSubmitter}. 
The Flink cluster can be local or remote. - * <p/> - * This example shows how to submit the program via Java as well as Flink's command line client (ie, bin/flink). - * <p/> - * <p/> - * The input is a plain text file with lines separated by newline characters. - * <p/> - * <p/> - * Usage: <code>WordCount <text path> <result path></code><br> - * If no parameters are provided, the program is run with default data from {@link WordCountData}. - * <p/> - * <p/> - * This example shows how to: - * <ul> - * <li>submit a regular Storm program to a local or remote Flink cluster.</li> - * </ul> - */ -public class StormWordCountRemoteBySubmitter { - public final static String topologyId = "Streaming WordCount"; - - // ************************************************************************* - // PROGRAM - // ************************************************************************* - - public static void main(final String[] args) throws Exception { - - if (!WordCountTopology.parseParameters(args)) { - return; - } - - // build Topology the Storm way - final FlinkTopologyBuilder builder = WordCountTopology.buildTopology(); - - // execute program on Flink cluster - final Config conf = new Config(); - // We can set Jobmanager host/port values manually or leave them blank - // if not set and - // - executed within Java, default values "localhost" and "6123" are set by FlinkSubmitter - // - executed via bin/flink values from flink-conf.yaml are set by FlinkSubmitter. - // conf.put(Config.NIMBUS_HOST, "localhost"); - // conf.put(Config.NIMBUS_THRIFT_PORT, new Integer(6123)); - - // The user jar file must be specified via JVM argument if executed via Java. - // => -Dstorm.jar=target/WordCount-StormTopology.jar - // If bin/flink is used, the jar file is detected automatically. 
- FlinkSubmitter.submitTopology(topologyId, conf, builder.createTopology()); - - Thread.sleep(5 * 1000); - - FlinkClient.getConfiguredClient(conf).killTopology(topologyId); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/WordCountTopology.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/WordCountTopology.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/WordCountTopology.java deleted file mode 100644 index 45be821..0000000 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/WordCountTopology.java +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.stormcompatibility.wordcount; - -import backtype.storm.generated.StormTopology; -import backtype.storm.tuple.Fields; - -import org.apache.flink.examples.java.wordcount.util.WordCountData; -import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder; -import org.apache.flink.stormcompatibility.util.OutputFormatter; -import org.apache.flink.stormcompatibility.util.StormBoltFileSink; -import org.apache.flink.stormcompatibility.util.StormBoltPrintSink; -import org.apache.flink.stormcompatibility.util.TupleOutputFormatter; -import org.apache.flink.stormcompatibility.wordcount.stormoperators.StormBoltCounter; -import org.apache.flink.stormcompatibility.wordcount.stormoperators.StormBoltCounterByName; -import org.apache.flink.stormcompatibility.wordcount.stormoperators.StormBoltTokenizer; -import org.apache.flink.stormcompatibility.wordcount.stormoperators.StormBoltTokenizerByName; -import org.apache.flink.stormcompatibility.wordcount.stormoperators.StormWordCountFileSpout; -import org.apache.flink.stormcompatibility.wordcount.stormoperators.StormWordCountInMemorySpout; - -/** - * Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming - * fashion. The program is constructed as a regular {@link StormTopology}. - * <p/> - * <p/> - * The input is a plain text file with lines separated by newline characters. - * <p/> - * <p/> - * Usage: <code>WordCount <text path> <result path></code><br> - * If no parameters are provided, the program is run with default data from {@link WordCountData}. 
- * <p/> - * <p/> - * This example shows how to: - * <ul> - * <li>how to construct a regular Storm topology as Flink program</li> - * </ul> - */ -public class WordCountTopology { - public final static String spoutId = "source"; - public final static String tokenierzerId = "tokenizer"; - public final static String counterId = "counter"; - public final static String sinkId = "sink"; - private final static OutputFormatter formatter = new TupleOutputFormatter(); - - public static FlinkTopologyBuilder buildTopology() { - return buildTopology(true); - } - - public static FlinkTopologyBuilder buildTopology(boolean indexOrName) { - - final FlinkTopologyBuilder builder = new FlinkTopologyBuilder(); - - // get input data - if (fileInputOutput) { - // read the text file from given input path - final String[] tokens = textPath.split(":"); - final String inputFile = tokens[tokens.length - 1]; - builder.setSpout(spoutId, new StormWordCountFileSpout(inputFile)); - } else { - builder.setSpout(spoutId, new StormWordCountInMemorySpout()); - } - - if (indexOrName) { - // split up the lines in pairs (2-tuples) containing: (word,1) - builder.setBolt(tokenierzerId, new StormBoltTokenizer(), 4).shuffleGrouping(spoutId); - // group by the tuple field "0" and sum up tuple field "1" - builder.setBolt(counterId, new StormBoltCounter(), 4).fieldsGrouping(tokenierzerId, - new Fields(StormBoltTokenizer.ATTRIBUTE_WORD)); - } else { - // split up the lines in pairs (2-tuples) containing: (word,1) - builder.setBolt(tokenierzerId, new StormBoltTokenizerByName(), 4).shuffleGrouping( - spoutId); - // group by the tuple field "0" and sum up tuple field "1" - builder.setBolt(counterId, new StormBoltCounterByName(), 4).fieldsGrouping( - tokenierzerId, new Fields(StormBoltTokenizerByName.ATTRIBUTE_WORD)); - } - - // emit result - if (fileInputOutput) { - // read the text file from given input path - final String[] tokens = outputPath.split(":"); - final String outputFile = tokens[tokens.length - 1]; - 
builder.setBolt(sinkId, new StormBoltFileSink(outputFile, formatter)).shuffleGrouping(counterId); - } else { - builder.setBolt(sinkId, new StormBoltPrintSink(formatter), 4).shuffleGrouping(counterId); - } - - return builder; - } - - // ************************************************************************* - // UTIL METHODS - // ************************************************************************* - - private static boolean fileInputOutput = false; - private static String textPath; - private static String outputPath; - - static boolean parseParameters(final String[] args) { - - if (args.length > 0) { - // parse input arguments - fileInputOutput = true; - if (args.length == 2) { - textPath = args[0]; - outputPath = args[1]; - } else { - System.err.println("Usage: StormWordCount* <text path> <result path>"); - return false; - } - } else { - System.out.println("Executing StormWordCount* example with built-in default data"); - System.out.println(" Provide parameters to read input data from a file"); - System.out.println(" Usage: StormWordCount* <text path> <result path>"); - } - - return true; - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltCounter.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltCounter.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltCounter.java deleted file mode 100644 index 1544c19..0000000 --- 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltCounter.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.stormcompatibility.wordcount.stormoperators; - -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.IRichBolt; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Tuple; -import backtype.storm.tuple.Values; - -import java.util.HashMap; -import java.util.Map; - -/** - * Implements the word counter that the occurrence of each unique word. The bolt takes a pair (input tuple schema: - * {@code <String,Integer>}) and sums the given word count for each unique word (output tuple schema: - * {@code <String,Integer>} ). 
- */ -public class StormBoltCounter implements IRichBolt { - private static final long serialVersionUID = 399619605462625934L; - - public static final String ATTRIBUTE_WORD = "word"; - public static final String ATTRIBUTE_COUNT = "count"; - - private final HashMap<String, Count> counts = new HashMap<String, Count>(); - private OutputCollector collector; - - @SuppressWarnings("rawtypes") - @Override - public void prepare(final Map stormConf, final TopologyContext context, final OutputCollector collector) { - this.collector = collector; - } - - @Override - public void execute(final Tuple input) { - final String word = input.getString(StormBoltTokenizer.ATTRIBUTE_WORD_INDEX); - - Count currentCount = this.counts.get(word); - if (currentCount == null) { - currentCount = new Count(); - this.counts.put(word, currentCount); - } - currentCount.count += input.getInteger(StormBoltTokenizer.ATTRIBUTE_COUNT_INDEX); - - this.collector.emit(new Values(word, currentCount.count)); - } - - @Override - public void cleanup() {/* nothing to do */} - - @Override - public void declareOutputFields(final OutputFieldsDeclarer declarer) { - declarer.declare(new Fields(ATTRIBUTE_WORD, ATTRIBUTE_COUNT)); - } - - @Override - public Map<String, Object> getComponentConfiguration() { - return null; - } - - /** - * A counter helper to emit immutable tuples to the given stormCollector and avoid unnecessary object - * creating/deletion. 
- */ - private static final class Count { - public int count; - - public Count() {/* nothing to do */} - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltCounterByName.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltCounterByName.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltCounterByName.java deleted file mode 100644 index bf940c3..0000000 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltCounterByName.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.stormcompatibility.wordcount.stormoperators; - -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.IRichBolt; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Tuple; -import backtype.storm.tuple.Values; - -import java.util.HashMap; -import java.util.Map; - -/** - * Implements the word counter that the occurrence of each unique word. The bolt takes a pair (input tuple schema: - * {@code <String,Integer>}) and sums the given word count for each unique word (output tuple schema: - * {@code <String,Integer>} ). - */ -public class StormBoltCounterByName implements IRichBolt { - private static final long serialVersionUID = 399619605462625934L; - - public static final String ATTRIBUTE_WORD = "word"; - public static final String ATTRIBUTE_COUNT = "count"; - - private final HashMap<String, Count> counts = new HashMap<String, Count>(); - private OutputCollector collector; - - @SuppressWarnings("rawtypes") - @Override - public void prepare(final Map stormConf, final TopologyContext context, final OutputCollector collector) { - this.collector = collector; - } - - @Override - public void execute(final Tuple input) { - final String word = input.getStringByField(StormBoltTokenizer.ATTRIBUTE_WORD); - - Count currentCount = this.counts.get(word); - if (currentCount == null) { - currentCount = new Count(); - this.counts.put(word, currentCount); - } - currentCount.count += input.getIntegerByField(StormBoltTokenizer.ATTRIBUTE_COUNT); - - this.collector.emit(new Values(word, currentCount.count)); - } - - @Override - public void cleanup() {/* nothing to do */} - - @Override - public void declareOutputFields(final OutputFieldsDeclarer declarer) { - declarer.declare(new Fields(ATTRIBUTE_WORD, ATTRIBUTE_COUNT)); - } - - @Override - public Map<String, Object> getComponentConfiguration() { - return null; - } - - /** - * 
A counter helper to emit immutable tuples to the given stormCollector and avoid unnecessary object - * creating/deletion. - */ - private static final class Count { - public int count; - - public Count() {/* nothing to do */} - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltTokenizer.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltTokenizer.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltTokenizer.java deleted file mode 100644 index dfb3e37..0000000 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltTokenizer.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.stormcompatibility.wordcount.stormoperators; - -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.IRichBolt; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Tuple; -import backtype.storm.tuple.Values; - -import java.util.Map; - -/** - * Implements the string tokenizer that splits sentences into words as a Storm bolt. The bolt takes a line (input tuple - * schema: {@code <String>}) and splits it into multiple pairs in the form of "(word,1)" (output tuple schema: - * {@code <String,Integer>}). - * <p> - * Same as {@link StormBoltTokenizerByName}, but accesses input attribute by index (instead of name). - */ -public final class StormBoltTokenizer implements IRichBolt { - private static final long serialVersionUID = -8589620297208175149L; - - public static final String ATTRIBUTE_WORD = "word"; - public static final String ATTRIBUTE_COUNT = "count"; - - public static final int ATTRIBUTE_WORD_INDEX = 0; - public static final int ATTRIBUTE_COUNT_INDEX = 1; - - private OutputCollector collector; - - @SuppressWarnings("rawtypes") - @Override - public void prepare(final Map stormConf, final TopologyContext context, final OutputCollector collector) { - this.collector = collector; - } - - @Override - public void execute(final Tuple input) { - final String[] tokens = input.getString(0).toLowerCase().split("\\W+"); - - for (final String token : tokens) { - if (token.length() > 0) { - this.collector.emit(new Values(token, 1)); - } - } - } - - @Override - public void cleanup() {/* nothing to do */} - - @Override - public void declareOutputFields(final OutputFieldsDeclarer declarer) { - declarer.declare(new Fields(ATTRIBUTE_WORD, ATTRIBUTE_COUNT)); - } - - @Override - public Map<String, Object> getComponentConfiguration() { - return null; - } - -} 
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltTokenizerByName.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltTokenizerByName.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltTokenizerByName.java deleted file mode 100644 index 8796b95..0000000 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltTokenizerByName.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.stormcompatibility.wordcount.stormoperators; - -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.IRichBolt; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Tuple; -import backtype.storm.tuple.Values; - -import java.util.Map; - -/** - * Implements the string tokenizer that splits sentences into words as a Storm bolt. The bolt takes a line (input tuple - * schema: {@code <String>}) and splits it into multiple pairs in the form of "(word,1)" (output tuple schema: - * {@code <String,Integer>}). - * <p> - * Same as {@link StormBoltTokenizer}, but accesses input attribute by name (instead of index). - */ -public final class StormBoltTokenizerByName implements IRichBolt { - private static final long serialVersionUID = -8589620297208175149L; - - public static final String ATTRIBUTE_WORD = "word"; - public static final String ATTRIBUTE_COUNT = "count"; - - public static final int ATTRIBUTE_WORD_INDEX = 0; - public static final int ATTRIBUTE_COUNT_INDEX = 1; - - private OutputCollector collector; - - @SuppressWarnings("rawtypes") - @Override - public void prepare(final Map stormConf, final TopologyContext context, final OutputCollector collector) { - this.collector = collector; - } - - @Override - public void execute(final Tuple input) { - final String[] tokens = input.getStringByField("sentence").toLowerCase().split("\\W+"); - - for (final String token : tokens) { - if (token.length() > 0) { - this.collector.emit(new Values(token, 1)); - } - } - } - - @Override - public void cleanup() {/* nothing to do */} - - @Override - public void declareOutputFields(final OutputFieldsDeclarer declarer) { - declarer.declare(new Fields(ATTRIBUTE_WORD, ATTRIBUTE_COUNT)); - } - - @Override - public Map<String, Object> getComponentConfiguration() { - return null; - } - -} 
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormWordCountFileSpout.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormWordCountFileSpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormWordCountFileSpout.java deleted file mode 100644 index e994760..0000000 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormWordCountFileSpout.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.stormcompatibility.wordcount.stormoperators; - -import org.apache.flink.stormcompatibility.util.StormFileSpout; - -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.tuple.Fields; - -/** - * Implements a Storm Spout that reads data from a given local file. - */ -public final class StormWordCountFileSpout extends StormFileSpout { - private static final long serialVersionUID = 2372251989250954503L; - - public StormWordCountFileSpout(String path) { - super(path); - } - - @Override - public void declareOutputFields(final OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("sentence")); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormWordCountInMemorySpout.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormWordCountInMemorySpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormWordCountInMemorySpout.java deleted file mode 100644 index 372f66f..0000000 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormWordCountInMemorySpout.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.stormcompatibility.wordcount.stormoperators; - -import org.apache.flink.examples.java.wordcount.util.WordCountData; -import org.apache.flink.stormcompatibility.util.StormInMemorySpout; - -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.tuple.Fields; - -/** - * Implements a Storm Spout that reads data from {@link WordCountData#WORDS}. - */ -public final class StormWordCountInMemorySpout extends StormInMemorySpout<String> { - private static final long serialVersionUID = 8832143302409465843L; - - public StormWordCountInMemorySpout() { - super(WordCountData.WORDS); - } - - @Override - public void declareOutputFields(final OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("sentence")); - } -}