[GitHub] flink pull request #6170: [FLINK-9563]: Using a custom sink function for tes...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6170 ---
[GitHub] flink pull request #6170: [FLINK-9563]: Using a custom sink function for tes...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6170#discussion_r197716343 --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java --- @@ -658,20 +668,26 @@ public String select(Map> pattern) { StringBuilder builder = new StringBuilder(); builder.append(pattern.get("start").get(0).getId()).append(",") - .append(pattern.get("middle").get(0).getId()).append(",") - .append(pattern.get("end").get(0).getId()); + .append(pattern.get("middle").get(0).getId()).append(",") --- End diff -- still needs to be reverted ---
[GitHub] flink pull request #6170: [FLINK-9563]: Using a custom sink function for tes...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6170#discussion_r197716214 --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java --- @@ -229,7 +206,7 @@ public void testSimplePatternEventTime() throws Exception { Tuple2.of(new Event(4, "end", 4.0), 10L), Tuple2.of(new Event(5, "middle", 5.0), 7L), // last element for high final watermark - Tuple2.of(new Event(5, "middle", 5.0), 100L) + Tuple2.of(new Event(6, "middle", 5.0), 100L) --- End diff -- why is this change necessary? ---
[GitHub] flink pull request #6170: [FLINK-9563]: Using a custom sink function for tes...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6170#discussion_r197717301 --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java --- @@ -580,12 +563,17 @@ public String select(Map> pattern) { } }); - result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + List resultList = new ArrayList<>(); + + DataStreamUtils.collect(result).forEachRemaining(resultList::add); + + List expected = Arrays.asList("1,5,6\n1,2,3\n4,5,6\n1,2,6".split("\n")); --- End diff -- changes this line to work like other tests, inline `Arrays.asList` into the `assertEquals` call, split the list right away instead of with `split()`. ---
[GitHub] flink pull request #6170: [FLINK-9563]: Using a custom sink function for tes...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6170#discussion_r197716849 --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java --- @@ -512,12 +491,16 @@ public String select(Map> pattern) { } ); - result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + List> resultList = new ArrayList<>(); + + DataStreamUtils.collect(result).forEachRemaining(resultList::add); - // the expected sequences of matching event ids - expected = "Left(1.0)\nLeft(2.0)\nLeft(2.0)\nRight(2.0,2.0,2.0)"; + resultList.sort(Comparator.comparing(either -> either.toString())); - env.execute(); + List> expected = Arrays.asList(Either.Left.of("1.0"), Either.Left.of("2.0"), --- End diff -- put each element on a new line: ``` Arrays.asList( a, b c ``` ---
[GitHub] flink pull request #6170: [FLINK-9563]: Using a custom sink function for tes...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6170#discussion_r197716928 --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java --- @@ -512,12 +491,16 @@ public String select(Map> pattern) { } ); - result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + List> resultList = new ArrayList<>(); + + DataStreamUtils.collect(result).forEachRemaining(resultList::add); - // the expected sequences of matching event ids - expected = "Left(1.0)\nLeft(2.0)\nLeft(2.0)\nRight(2.0,2.0,2.0)"; + resultList.sort(Comparator.comparing(either -> either.toString())); - env.execute(); + List> expected = Arrays.asList(Either.Left.of("1.0"), Either.Left.of("2.0"), + Either.Left.of("2.0"), Either.Right.of("2.0,2.0,2.0")); --- End diff -- indentation should only be 1 tab ---
[GitHub] flink pull request #6170: [FLINK-9563]: Using a custom sink function for tes...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6170#discussion_r197717352 --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java --- @@ -666,12 +654,17 @@ public String select(Map> pattern) { } ); - result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + List resultList = new ArrayList<>(); + + DataStreamUtils.collect(result).forEachRemaining(resultList::add); + + List expected = Arrays.asList("1,6,4\n1,5,4".split("\n")); --- End diff -- same as above ---
[GitHub] flink pull request #6170: [FLINK-9563]: Using a custom sink function for tes...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6170#discussion_r195826270 --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java --- @@ -708,10 +724,32 @@ public boolean filter(Tuple2 rec) throws Exception { } }); - result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + CollectSink.VALUES.clear(); - expected = "(1,a)\n(3,a)"; + result.map(new MapFunction, String>() { + @Override + public String map(Tuple2 value) throws Exception { + return value.toString(); + } + }).addSink(new CollectSink()); env.execute(); + + CollectSink.VALUES.sort(String::compareTo); + + List expected = Arrays.asList("(1,a)\n(3,a)".split("\n")); + + assertEquals(expected, CollectSink.VALUES); } + + private static class CollectSink implements SinkFunction { + + public static final List VALUES = new ArrayList<>(); + + @Override + public synchronized void invoke(String value) throws Exception { --- End diff -- this synchronization may not work correctly if multiple instances of this function exists. Synchronize on `VALUES` instead. ---
[GitHub] flink pull request #6170: [FLINK-9563]: Using a custom sink function for tes...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6170#discussion_r195824130 --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java --- @@ -573,19 +577,25 @@ public String select(Map> pattern) { StringBuilder builder = new StringBuilder(); builder.append(pattern.get("start").get(0).getId()).append(",") - .append(pattern.get("middle").get(0).getId()).append(",") - .append(pattern.get("end").get(0).getId()); + .append(pattern.get("middle").get(0).getId()).append(",") --- End diff -- indendation ---
[GitHub] flink pull request #6170: [FLINK-9563]: Using a custom sink function for tes...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6170#discussion_r195824665 --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java --- @@ -658,20 +668,26 @@ public String select(Map> pattern) { StringBuilder builder = new StringBuilder(); builder.append(pattern.get("start").get(0).getId()).append(",") - .append(pattern.get("middle").get(0).getId()).append(",") - .append(pattern.get("end").get(0).getId()); + .append(pattern.get("middle").get(0).getId()).append(",") --- End diff -- indentation ---
[GitHub] flink pull request #6170: [FLINK-9563]: Using a custom sink function for tes...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6170#discussion_r195823948 --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java --- @@ -360,27 +345,30 @@ public boolean filter(Event value) throws Exception { }); DataStream result = CEP.pattern(input, pattern).select( - new PatternSelectFunction() { + new PatternSelectFunction() { - @Override - public String select(Map> pattern) { - StringBuilder builder = new StringBuilder(); + @Override --- End diff -- indendation changes that weren't reversed ---
[GitHub] flink pull request #6170: [FLINK-9563]: Using a custom sink function for tes...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6170#discussion_r195824079 --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java --- @@ -504,20 +498,30 @@ public String select(Map> pattern) { StringBuilder builder = new StringBuilder(); builder.append(pattern.get("start").get(0).getPrice()).append(",") - .append(pattern.get("middle").get(0).getPrice()).append(",") - .append(pattern.get("end").get(0).getPrice()); + .append(pattern.get("middle").get(0).getPrice()).append(",") + .append(pattern.get("end").get(0).getPrice()); --- End diff -- indendation change, revert like the others ---
[GitHub] flink pull request #6170: [FLINK-9563]: Using a custom sink function for tes...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6170#discussion_r195824580 --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java --- @@ -708,10 +724,32 @@ public boolean filter(Tuple2 rec) throws Exception { } }); - result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + CollectSink.VALUES.clear(); - expected = "(1,a)\n(3,a)"; + result.map(new MapFunction, String>() { + @Override + public String map(Tuple2 value) throws Exception { + return value.toString(); --- End diff -- please do not compare as strings. This was only done for simplicity in the previous code so we don't have to re-parse the contents of the text file. This also applies to other tests. ---
[GitHub] flink pull request #6170: [FLINK-9563]: Using a custom sink function for tes...
GitHub user deepaks4077 opened a pull request: https://github.com/apache/flink/pull/6170 [FLINK-9563]: Using a custom sink function for tests in CEPITCase instead of writing to disk ## What is the purpose of the change This change modifies the CEPITCase integration test to use a custom sink function to collect and compare test results, instead of writing them to a file. It does not add/remove any constituent tests. ## Brief change log - Removed Before and After junit annotations - Added a custom sink function with a static arraylist to collect and compare test results ## Verifying this change This change is already covered by existing tests, such as CEPITCase, which is the end to end test of the CEP API. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? not applicable You can merge this pull request into a Git repository by running: $ git pull https://github.com/deepaks4077/flink FLINK-9563 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6170.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6170 commit 8fc629a557af4a56ab7638cc5eb519e163267cdc Author: Deepak Sharnma Date: 2018-06-13T02:41:27Z [FLINK-9563]: Using a custom sink function for tests in CEPITCase ---