[GitHub] flink pull request: FLINK-1560 - Add ITCases for streaming example...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/519 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: FLINK-1560 - Add ITCases for streaming example...
Github user mbalassi commented on the pull request: https://github.com/apache/flink/pull/519#issuecomment-90250843 Thanks, now the Twitter one runs fine. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: FLINK-1560 - Add ITCases for streaming example...
Github user szape commented on the pull request: https://github.com/apache/flink/pull/519#issuecomment-86565008 I tried to fix issue FLINK-1662, that is related to IterateExample. Gyula, what do you think? To see the actual bug, run IterateExample with parallelism 1 without the last bugfix commit. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: FLINK-1560 - Add ITCases for streaming example...
Github user mbalassi commented on the pull request: https://github.com/apache/flink/pull/519#issuecomment-86569586 @szape: I've started merging this: The TwitterStreamITCase seems to fail spuriously. Could you look into it? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: FLINK-1560 - Add ITCases for streaming example...
Github user szape commented on the pull request: https://github.com/apache/flink/pull/519#issuecomment-86582509 It runs just fine on my computer. Did you pull the latest state of the branch? The TwitterStreamData class was updated not so long ago. Also, the IterateExample isn't running with parallelism higher than one. It throws something like not enough slot. I couldn't resolve this bug by now. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: FLINK-1560 - Add ITCases for streaming example...
GitHub user szape opened a pull request: https://github.com/apache/flink/pull/519 FLINK-1560 - Add ITCases for streaming examples ITCases for streaming examples have been added, except for IterateExample and StockPrices. I replaced the IterateExample with one that generates Fibonacci-sequences. Something is bugous around iteration, so the ITCase have to wait. StockPrices is untestable because of the windowJoin operator, hence it will not have an ITCase right now. You can merge this pull request into a Git repository by running: $ git pull https://github.com/mbalassi/flink FLINK-1560 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/519.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #519 commit 16da0d092ddc192afdae3a3c5a0a57144390c8ff Author: szape nemderogator...@gmail.com Date: 2015-03-05T15:27:44Z [Flink-1560] [streaming] Streaming examples rework commit 45fe9bd3381ef9cf99e8c9f0570e66e38d78b383 Author: szape nemderogator...@gmail.com Date: 2015-03-05T15:30:31Z [Flink-1560] [streaming] Added ITCases to streaming examples commit 237f08d3eaf2da36c8a9d48145b2544dcbb41012 Author: szape nemderogator...@gmail.com Date: 2015-03-23T09:28:37Z [FLINK-1560] [streaming] Added iterate example --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: FLINK-1560 - Add ITCases for streaming example...
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/519#discussion_r26938775 --- Diff: flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/socket/SocketTextStreamWordCountITCase.java --- @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.examples.test.socket; + +import org.apache.flink.streaming.examples.socket.SocketTextStreamWordCount; +import org.apache.flink.streaming.examples.socket.util.SocketTextStreamWordCountData; +import org.apache.flink.streaming.util.StreamingProgramTestBase; + +import java.io.PrintWriter; +import java.net.ServerSocket; +import java.net.Socket; + +public class SocketTextStreamWordCountITCase extends StreamingProgramTestBase { + + private static final String HOST = localhost; + private static final String PORT = ; + protected String resultPath; + + private ServerSocket temporarySocket; + + @Override + protected void preSubmit() throws Exception { + temporarySocket = createSocket(HOST, Integer.valueOf(PORT), SocketTextStreamWordCountData.SOCKET_TEXT); + resultPath = getTempDirPath(result); + } + + @Override + protected void postSubmit() throws Exception { + compareResultsByLinesInMemory(SocketTextStreamWordCountData.STREAMING_COUNTS_AS_TUPLES, resultPath); + temporarySocket.close(); + } + + @Override + protected void testProgram() throws Exception { + SocketTextStreamWordCount.main(new String[]{HOST, PORT, resultPath}); + } + + public ServerSocket createSocket(String host, int port, String contents) throws Exception { + ServerSocket serverSocket = new ServerSocket(port); + ServerThread st = new ServerThread(serverSocket, contents); + st.start(); + return serverSocket; + } + + private static class ServerThread extends Thread { + + private ServerSocket serverSocket; + private String contents; + private Thread t; + + public ServerThread(ServerSocket serverSocket, String contents) { + this.serverSocket = serverSocket; + this.contents = contents; + t = new Thread(this); + } + + public void waitForAccept() throws Exception { + Socket socket = serverSocket.accept(); + PrintWriter writer = new PrintWriter(socket.getOutputStream(), true); + writer.println(contents); + writer.close(); + socket.close(); + } + + public void run() { + try { + waitForAccept(); + } catch (Exception e) { + e.printStackTrace(); --- End diff -- I would fail the test in case of an exception. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: FLINK-1560 - Add ITCases for streaming example...
Github user mbalassi commented on a diff in the pull request: https://github.com/apache/flink/pull/519#discussion_r26939215 --- Diff: flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java --- @@ -103,57 +115,124 @@ public static void main(String[] args) throws Exception { // * /** -* Iteration step function which takes an input (Double , Integer) and -* produces an output (Double + random, Integer + 1). +* Generate random integer pairs from the range from 0 to BOUND/2 +*/ + private static class RandomFibonacciSource implements SourceFunctionTuple2Integer, Integer { + private static final long serialVersionUID = 1L; + + private Random rnd = new Random(); + + @Override + public void run(CollectorTuple2Integer, Integer collector) throws Exception { + while(true) { + int first = rnd.nextInt(BOUND/2 - 1) + 1; + int second = rnd.nextInt(BOUND/2 - 1) + 1; + + collector.collect(new Tuple2Integer, Integer(first, second)); + Thread.sleep(100L); + } + } + + @Override + public void cancel() { + // no cleanup needed + } + } + + /** +* Generate random integer pairs from the range from 0 to BOUND/2 */ - public static class Step extends - RichMapFunctionTuple2Double, Integer, Tuple2Double, Integer { + private static class FibonacciInputMap implements MapFunctionString, Tuple2Integer, Integer { private static final long serialVersionUID = 1L; - private transient Random rnd; - public void open(Configuration parameters) { - rnd = new Random(); + @Override + public Tuple2Integer, Integer map(String value) throws Exception { + Thread.sleep(100L); + String record = value.substring(1, value.length()-1); + String[] splitted = record.split(,); + return new Tuple2Integer, Integer(Integer.parseInt(splitted[0]), Integer.parseInt(splitted[1])); } + } + + /** +* Map the inputs so that the next Fibonacci numbers can be calculated while preserving the original input tuple +* A counter is attached to the tuple and incremented in every iteration step +*/ + public static class InputMap implements MapFunctionTuple2Integer, Integer, Tuple5Integer, Integer, Integer, Integer, Integer { + + @Override + public Tuple5Integer, Integer, Integer, Integer, Integer map(Tuple2Integer, Integer value) throws + Exception { + return new Tuple5Integer, Integer, Integer, Integer, Integer(value.f0, value.f1, value.f0, value.f1, 0); + } + } + + /** +* Iteration step function that calculates the next Fibonacci number +*/ + public static class Step implements + MapFunctionTuple5Integer, Integer, Integer, Integer, Integer, Tuple5Integer, Integer, Integer, Integer, Integer { + private static final long serialVersionUID = 1L; @Override - public Tuple2Double, Integer map(Tuple2Double, Integer value) throws Exception { - return new Tuple2Double, Integer(value.f0 + rnd.nextDouble(), value.f1 + 1); + public Tuple5Integer, Integer, Integer, Integer, Integer map(Tuple5Integer, Integer, Integer, Integer, Integer value) throws Exception { + return new Tuple5Integer, Integer, Integer, Integer, Integer(value.f0, value.f1, value.f3, value.f2 + value.f3, ++value.f4); } } /** * OutputSelector testing which tuple needs to be iterated again. */ - public static class MySelector implements OutputSelectorTuple2Double, Integer { + public static class MySelector implements OutputSelectorTuple5Integer, Integer, Integer, Integer, Integer { private static final long serialVersionUID = 1L; @Override - public IterableString select(Tuple2Double, Integer value) { + public IterableString select(Tuple5Integer, Integer, Integer, Integer, Integer value) { ListString output = new ArrayListString(); - if (value.f0 100) { - output.add(output); -
[GitHub] flink pull request: FLINK-1560 - Add ITCases for streaming example...
Github user mbalassi commented on the pull request: https://github.com/apache/flink/pull/519#issuecomment-85019063 Please update this for parallelism of 4. Besides that and the inline comments looks good. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: FLINK-1560 - Add ITCases for streaming example...
Github user mbalassi commented on a diff in the pull request: https://github.com/apache/flink/pull/519#discussion_r26939465 --- Diff: flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java --- @@ -176,11 +176,14 @@ private static boolean parseParameters(String[] args) { // parse input arguments fileOutput = true; if (args.length == 2) { + fileInput = true; textPath = args[0]; outputPath = args[1]; + } else if (args.length == 1) { + outputPath = args[0]; } else { System.err.println(USAGE:\nTwitterStream pathToPropertiesFile result path); - return false; + return true; } --- End diff -- Why return true here? This way this function always returns true. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: FLINK-1560 - Add ITCases for streaming example...
Github user szape commented on the pull request: https://github.com/apache/flink/pull/519#issuecomment-85075533 Thank you guys! I corrected the errors. Updating the ITCases for parallelism 4 will be easier than I thought, but windowing makes it hard to test WindowJoin and TopSpeedWindowingExample with higher parallelism. So, parallelism will be set to 4 by default, but in the two mentioned cases it is better set to 1. What dou you think? Is there an easy workaround to this problem? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---