[GitHub] flink pull request #6170: [FLINK-9563]: Using a custom sink function for tes...

2018-07-13 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6170


---


[GitHub] flink pull request #6170: [FLINK-9563]: Using a custom sink function for tes...

2018-06-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6170#discussion_r197716343
  
--- Diff: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java ---
@@ -658,20 +668,26 @@ public String select(Map> 
pattern) {
StringBuilder builder = new 
StringBuilder();
 

builder.append(pattern.get("start").get(0).getId()).append(",")
-   
.append(pattern.get("middle").get(0).getId()).append(",")
-   
.append(pattern.get("end").get(0).getId());
+   
.append(pattern.get("middle").get(0).getId()).append(",")
--- End diff --

still needs to be reverted


---


[GitHub] flink pull request #6170: [FLINK-9563]: Using a custom sink function for tes...

2018-06-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6170#discussion_r197716214
  
--- Diff: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java ---
@@ -229,7 +206,7 @@ public void testSimplePatternEventTime() throws 
Exception {
Tuple2.of(new Event(4, "end", 4.0), 10L),
Tuple2.of(new Event(5, "middle", 5.0), 7L),
// last element for high final watermark
-   Tuple2.of(new Event(5, "middle", 5.0), 100L)
+   Tuple2.of(new Event(6, "middle", 5.0), 100L)
--- End diff --

why is this change necessary?


---


[GitHub] flink pull request #6170: [FLINK-9563]: Using a custom sink function for tes...

2018-06-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6170#discussion_r197717301
  
--- Diff: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java ---
@@ -580,12 +563,17 @@ public String select(Map> 
pattern) {
}
});
 
-   result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+   List resultList = new ArrayList<>();
+
+   
DataStreamUtils.collect(result).forEachRemaining(resultList::add);
+
+   List expected = 
Arrays.asList("1,5,6\n1,2,3\n4,5,6\n1,2,6".split("\n"));
--- End diff --

changes this line to work like other tests, inline `Arrays.asList` into the 
`assertEquals` call, split the list right away instead of with `split()`.


---


[GitHub] flink pull request #6170: [FLINK-9563]: Using a custom sink function for tes...

2018-06-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6170#discussion_r197716849
  
--- Diff: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java ---
@@ -512,12 +491,16 @@ public String select(Map> 
pattern) {
}
);
 
-   result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+   List> resultList = new ArrayList<>();
+
+   
DataStreamUtils.collect(result).forEachRemaining(resultList::add);
 
-   // the expected sequences of matching event ids
-   expected = 
"Left(1.0)\nLeft(2.0)\nLeft(2.0)\nRight(2.0,2.0,2.0)";
+   resultList.sort(Comparator.comparing(either -> 
either.toString()));
 
-   env.execute();
+   List> expected = 
Arrays.asList(Either.Left.of("1.0"), Either.Left.of("2.0"),
--- End diff --

put each  element on a new line:
```
Arrays.asList(
a,
b
c
``` 


---


[GitHub] flink pull request #6170: [FLINK-9563]: Using a custom sink function for tes...

2018-06-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6170#discussion_r197716928
  
--- Diff: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java ---
@@ -512,12 +491,16 @@ public String select(Map> 
pattern) {
}
);
 
-   result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+   List> resultList = new ArrayList<>();
+
+   
DataStreamUtils.collect(result).forEachRemaining(resultList::add);
 
-   // the expected sequences of matching event ids
-   expected = 
"Left(1.0)\nLeft(2.0)\nLeft(2.0)\nRight(2.0,2.0,2.0)";
+   resultList.sort(Comparator.comparing(either -> 
either.toString()));
 
-   env.execute();
+   List> expected = 
Arrays.asList(Either.Left.of("1.0"), Either.Left.of("2.0"),
+   
Either.Left.of("2.0"), Either.Right.of("2.0,2.0,2.0"));
--- End diff --

indentation should only be 1 tab


---


[GitHub] flink pull request #6170: [FLINK-9563]: Using a custom sink function for tes...

2018-06-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6170#discussion_r197717352
  
--- Diff: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java ---
@@ -666,12 +654,17 @@ public String select(Map> 
pattern) {
}
);
 
-   result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+   List resultList = new ArrayList<>();
+
+   
DataStreamUtils.collect(result).forEachRemaining(resultList::add);
+
+   List expected = 
Arrays.asList("1,6,4\n1,5,4".split("\n"));
--- End diff --

same as above


---


[GitHub] flink pull request #6170: [FLINK-9563]: Using a custom sink function for tes...

2018-06-15 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6170#discussion_r195826270
  
--- Diff: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java ---
@@ -708,10 +724,32 @@ public boolean filter(Tuple2 rec) 
throws Exception {
}
});
 
-   result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+   CollectSink.VALUES.clear();
 
-   expected = "(1,a)\n(3,a)";
+   result.map(new MapFunction, String>() {
+   @Override
+   public String map(Tuple2 value) throws 
Exception {
+   return value.toString();
+   }
+   }).addSink(new CollectSink());
 
env.execute();
+
+   CollectSink.VALUES.sort(String::compareTo);
+
+   List expected = 
Arrays.asList("(1,a)\n(3,a)".split("\n"));
+
+   assertEquals(expected, CollectSink.VALUES);
}
+
+   private static class CollectSink implements SinkFunction {
+
+   public static final List VALUES = new ArrayList<>();
+
+   @Override
+   public synchronized void invoke(String value) throws Exception {
--- End diff --

this synchronization may not work correctly if multiple instances of this 
function exists. Synchronize on `VALUES` instead.


---


[GitHub] flink pull request #6170: [FLINK-9563]: Using a custom sink function for tes...

2018-06-15 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6170#discussion_r195824130
  
--- Diff: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java ---
@@ -573,19 +577,25 @@ public String select(Map> 
pattern) {
StringBuilder builder = new StringBuilder();
 

builder.append(pattern.get("start").get(0).getId()).append(",")
-   
.append(pattern.get("middle").get(0).getId()).append(",")
-   
.append(pattern.get("end").get(0).getId());
+   
.append(pattern.get("middle").get(0).getId()).append(",")
--- End diff --

indendation


---


[GitHub] flink pull request #6170: [FLINK-9563]: Using a custom sink function for tes...

2018-06-15 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6170#discussion_r195824665
  
--- Diff: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java ---
@@ -658,20 +668,26 @@ public String select(Map> 
pattern) {
StringBuilder builder = new 
StringBuilder();
 

builder.append(pattern.get("start").get(0).getId()).append(",")
-   
.append(pattern.get("middle").get(0).getId()).append(",")
-   
.append(pattern.get("end").get(0).getId());
+   
.append(pattern.get("middle").get(0).getId()).append(",")
--- End diff --

indentation


---


[GitHub] flink pull request #6170: [FLINK-9563]: Using a custom sink function for tes...

2018-06-15 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6170#discussion_r195823948
  
--- Diff: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java ---
@@ -360,27 +345,30 @@ public boolean filter(Event value) throws Exception {
});
 
DataStream result = CEP.pattern(input, pattern).select(
-   new PatternSelectFunction() {
+   new PatternSelectFunction() {
 
-   @Override
-   public String select(Map> 
pattern) {
-   StringBuilder builder = new 
StringBuilder();
+   @Override
--- End diff --

indendation changes that weren't reversed


---


[GitHub] flink pull request #6170: [FLINK-9563]: Using a custom sink function for tes...

2018-06-15 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6170#discussion_r195824079
  
--- Diff: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java ---
@@ -504,20 +498,30 @@ public String select(Map> 
pattern) {
StringBuilder builder = new 
StringBuilder();
 

builder.append(pattern.get("start").get(0).getPrice()).append(",")
-   
.append(pattern.get("middle").get(0).getPrice()).append(",")
-   
.append(pattern.get("end").get(0).getPrice());
+   
.append(pattern.get("middle").get(0).getPrice()).append(",")
+   
.append(pattern.get("end").get(0).getPrice());
--- End diff --

indendation change, revert like the others


---


[GitHub] flink pull request #6170: [FLINK-9563]: Using a custom sink function for tes...

2018-06-15 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6170#discussion_r195824580
  
--- Diff: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java ---
@@ -708,10 +724,32 @@ public boolean filter(Tuple2 rec) 
throws Exception {
}
});
 
-   result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+   CollectSink.VALUES.clear();
 
-   expected = "(1,a)\n(3,a)";
+   result.map(new MapFunction, String>() {
+   @Override
+   public String map(Tuple2 value) throws 
Exception {
+   return value.toString();
--- End diff --

please do not compare as strings. This was only done for simplicity in the 
previous code so we don't have to re-parse the contents of the text file.

This also applies to other tests.


---


[GitHub] flink pull request #6170: [FLINK-9563]: Using a custom sink function for tes...

2018-06-14 Thread deepaks4077
GitHub user deepaks4077 opened a pull request:

https://github.com/apache/flink/pull/6170

[FLINK-9563]: Using a custom sink function for tests in CEPITCase instead 
of writing to disk

## What is the purpose of the change

This change modifies the CEPITCase integration test to use a custom sink 
function to collect and compare test results, instead of writing them to a 
file. It does not add/remove any constituent tests.

## Brief change log

- Removed Before and After junit annotations
- Added a custom sink function with a static arraylist to collect and 
compare test results

## Verifying this change

This change is already covered by existing tests, such as CEPITCase, which 
is the end to end test of the CEP API.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? no
  - If yes, how is the feature documented? not applicable


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/deepaks4077/flink FLINK-9563

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6170.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6170


commit 8fc629a557af4a56ab7638cc5eb519e163267cdc
Author: Deepak Sharnma 
Date:   2018-06-13T02:41:27Z

[FLINK-9563]: Using a custom sink function for tests in CEPITCase




---