[GitHub] flink issue #4502: [FLINK-7062] [table, cep] Support the basic functionality...

2018-07-25 Thread dianfu
Github user dianfu commented on the issue:

https://github.com/apache/flink/pull/4502
  
@twalthr  I have updated the PR and am looking forward to your comments.


---


[GitHub] flink pull request #5141: [FLINK-8226] [cep] Dangling reference generated af...

2017-12-31 Thread dianfu
Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/5141#discussion_r159149194
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
 ---
@@ -191,22 +191,28 @@ public boolean isEmpty() {
 */
public boolean prune(long pruningTimestamp) {
Iterator<Map.Entry<K, SharedBufferPage<K, V>>> iter = 
pages.entrySet().iterator();
-   boolean pruned = false;
+   List<SharedBufferEntry<K, V>> prunedEntries = new ArrayList<>();
 
while (iter.hasNext()) {
SharedBufferPage<K, V> page = iter.next().getValue();
 
-   if (page.prune(pruningTimestamp)) {
-   pruned = true;
-   }
+   page.prune(pruningTimestamp, prunedEntries);
 
if (page.isEmpty()) {
// delete page if it is empty
iter.remove();
}
}
 
-   return pruned;
+   if (!prunedEntries.isEmpty()) {
+   for (Map.Entry<K, SharedBufferPage<K, V>> entry : 
pages.entrySet()) {
+   entry.getValue().removeEdges(prunedEntries);
+   }
+   prunedEntries.clear();
--- End diff --

Updated.


---


[GitHub] flink pull request #5141: [FLINK-8226] [cep] Dangling reference generated af...

2017-12-27 Thread dianfu
Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/5141#discussion_r158909117
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
 ---
@@ -82,9 +82,12 @@
 
private transient Map<K, SharedBufferPage<K, V>> pages;
 
+   private final transient List<SharedBufferEntry<K, V>> prunedEntries;
--- End diff --

Updated the PR. Thanks a lot for the review. :)


---


[GitHub] flink pull request #5141: [FLINK-8226] [cep] Dangling reference generated af...

2017-12-27 Thread dianfu
Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/5141#discussion_r158908615
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
 ---
@@ -82,9 +82,12 @@
 
private transient Map<K, SharedBufferPage<K, V>> pages;
 
+   private final transient List<SharedBufferEntry<K, V>> prunedEntries;
--- End diff --

Makes sense; I will change it to a local variable.


---


[GitHub] flink pull request #5141: [FLINK-8226] [cep] Dangling reference generated af...

2017-12-27 Thread dianfu
Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/5141#discussion_r15833
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
 ---
@@ -82,9 +82,12 @@
 
private transient Map<K, SharedBufferPage<K, V>> pages;
 
+   private final transient List<SharedBufferEntry<K, V>> prunedEntries;
--- End diff --

It is for object reuse. What are your thoughts?


---


[GitHub] flink issue #5142: [FLINK-8227] Optimize the performance of SharedBufferSeri...

2017-12-27 Thread dianfu
Github user dianfu commented on the issue:

https://github.com/apache/flink/pull/5142
  
The page count equals the number of patterns, so it is usually very small. The 
`long` is only needed in theory, as you said, with MAX_INT * MAX_INT 
resulting in MAX_LONG. If that reasoning holds, then the `HashMap` should perhaps 
also be replaced, as the number of entries in one page could in theory reach MAX_LONG too.

Changing `int` to `long` would introduce a state-incompatibility issue, and we 
would need to take care of that when performing serialization/deserialization. I just 
wonder if it is worth making this change.


---


[GitHub] flink issue #5141: [FLINK-8226] [cep] Dangling reference generated after NFA...

2017-12-20 Thread dianfu
Github user dianfu commented on the issue:

https://github.com/apache/flink/pull/5141
  
@dawidwys  Could you take a look at this PR? It is a bug fix, and the issue 
can easily be reproduced with the test case included in the PR. Thanks a lot.


---


[GitHub] flink issue #5142: [FLINK-8227] Optimize the performance of SharedBufferSeri...

2017-12-20 Thread dianfu
Github user dianfu commented on the issue:

https://github.com/apache/flink/pull/5142
  
@dawidwys  @StephanEwen Sorry for the late response. For questions 1 and 2, I 
have the same thoughts as @dawidwys and have updated the PR accordingly. For 
question 3, I think `int` is enough, as we currently store `SharedBufferEntry` 
in a `HashMap` for each `SharedBufferPage`, and the size of a `HashMap` is an 
`int`. If we want to support `long`, we would also have to change `HashMap` to 
something else. What are your thoughts?


---


[GitHub] flink pull request #5142: [FLINK-8227] Optimize the performance of SharedBuf...

2017-12-08 Thread dianfu
GitHub user dianfu opened a pull request:

https://github.com/apache/flink/pull/5142

[FLINK-8227] Optimize the performance of SharedBufferSerializer

## What is the purpose of the change

*This pull request optimizes the performance of the SharedBufferSerializer.*

## Verifying this change

This change is a performance improvement without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dianfu/flink optimize_sharedbuffer

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5142.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5142


commit b586abec579ef7f251333032c9385d7e71f3799b
Author: Dian Fu <fudian...@alibaba-inc.com>
Date:   2017-12-09T03:51:04Z

[FLINK-8227] Optimize the performance of SharedBufferSerializer




---


[GitHub] flink pull request #5141: [FLINK-8226] [cep] Dangling reference generated af...

2017-12-08 Thread dianfu
GitHub user dianfu opened a pull request:

https://github.com/apache/flink/pull/5141

[FLINK-8226] [cep] Dangling reference generated after NFA clean up timed 
out SharedBufferEntry




## What is the purpose of the change

*This pull request fixes the issue that a dangling reference is generated 
after the NFA cleans up a timed-out SharedBufferEntry; an exception is thrown 
when serializing the NFA.*


## Verifying this change

This change added tests and can be verified as follows:

  - *Added test NFATest#testTimeoutWindowPruning2*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dianfu/flink dangling_ref

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5141.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5141


commit 982bfafaabcfbfd78f4fcbdd9438eab9c8be65bb
Author: Dian Fu <fudian...@alibaba-inc.com>
Date:   2017-12-09T02:55:14Z

[FLINK-8226] [cep] Dangling reference generated after NFA clean up timed 
out SharedBufferEntry




---


[GitHub] flink issue #5063: [FLINK-8144] [table] Optimize the timer logic in RowTimeU...

2017-11-28 Thread dianfu
Github user dianfu commented on the issue:

https://github.com/apache/flink/pull/5063
  
The currently known impact is that more timers will need to be checkpointed. As 
for the impact on performance, I will do some benchmarks later. :)   
I think we only need to consider this optimization if there are cases where 
users have to use punctuated watermarks in a way that generates one watermark 
for every incoming event. I will reopen this PR if we find a persuasive use 
case and reason. :)


---


[GitHub] flink pull request #5036: [FLINK-8106] [cep] Optimize the timer logic in Abs...

2017-11-28 Thread dianfu
Github user dianfu closed the pull request at:

https://github.com/apache/flink/pull/5036


---


[GitHub] flink issue #5036: [FLINK-8106] [cep] Optimize the timer logic in AbstractKe...

2017-11-28 Thread dianfu
Github user dianfu commented on the issue:

https://github.com/apache/flink/pull/5036
  
Closing this PR for the same reason as #5063.


---


[GitHub] flink issue #5063: [FLINK-8144] [table] Optimize the timer logic in RowTimeU...

2017-11-28 Thread dianfu
Github user dianfu commented on the issue:

https://github.com/apache/flink/pull/5063
  
For historical reasons, we currently only use punctuated watermarks. But I 
think you are right, we should consider using periodic watermarks instead. 
Thanks a lot for your explanation. I will close this PR.


---


[GitHub] flink pull request #5063: [FLINK-8144] [table] Optimize the timer logic in R...

2017-11-28 Thread dianfu
Github user dianfu closed the pull request at:

https://github.com/apache/flink/pull/5063


---


[GitHub] flink issue #5063: [FLINK-8144] [table] Optimize the timer logic in RowTimeU...

2017-11-27 Thread dianfu
Github user dianfu commented on the issue:

https://github.com/apache/flink/pull/5063
  
@fhueske Thanks a lot for your comments. I definitely agree with you on the 
motivation of punctuated watermarks. But in practice, there are many cases where 
there is no watermark information in the source data at all, so we have to 
generate one watermark for every incoming event.


---


[GitHub] flink pull request #5080: [FLINK-8159] [cep] Add rich support for SelectWrap...

2017-11-27 Thread dianfu
GitHub user dianfu opened a pull request:

https://github.com/apache/flink/pull/5080

[FLINK-8159] [cep] Add rich support for SelectWrapper and FlatSelectWrapper

## What is the purpose of the change

*This pull request adds rich function support for SelectWrapper and 
FlatSelectWrapper. If the wrapped functions are rich functions, they should be 
processed correctly.*


## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.


## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dianfu/flink SelectFunction

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5080.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5080


commit 4c3ccb008b38d44189578975b5eee9208561567b
Author: Dian Fu <fudian...@alibaba-inc.com>
Date:   2017-11-27T12:50:30Z

[FLINK-8159] [cep] Add rich support for SelectWrapper and FlatSelectWrapper




---


[GitHub] flink issue #5063: [FLINK-8144] [table] Optimize the timer logic in RowTimeU...

2017-11-26 Thread dianfu
Github user dianfu commented on the issue:

https://github.com/apache/flink/pull/5063
  
@fhueske  Yes, the bad performance is related to the way we generate the 
watermarks. But this is how punctuated watermarks work. If we do not optimize 
for punctuated watermarks, are we then suggesting that users not use punctuated 
watermarks for unbounded over windows?

For the side effect `By registering timers on different timestamps, we have 
many timers that all will fire when a watermark is received.`, I think we can 
add a variable such as `lastProcessedWatermark` to record the last processed 
watermark. Then there will not be much overhead. Thoughts?
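The `lastProcessedWatermark` idea above can be sketched as follows. This is a minimal, self-contained model under stated assumptions, not the actual `RowTimeUnboundedOver` code: the class, its buffer layout, and the `onElement`/`onTimer` names are hypothetical stand-ins for the operator's state and callbacks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Minimal model of the proposed optimization: many timers may fire for the
// same watermark, but the buffered rows are only scanned once per new
// watermark because the operator remembers the last watermark it processed.
class UnboundedOverModel {
    private final TreeMap<Long, String> buffer = new TreeMap<>(); // rowtime -> row
    private long lastProcessedWatermark = Long.MIN_VALUE;
    final List<String> emitted = new ArrayList<>();

    void onElement(long timestamp, String row) {
        buffer.put(timestamp, row);
    }

    // Called once per registered timer with the current watermark.
    void onTimer(long watermark) {
        if (watermark <= lastProcessedWatermark) {
            return; // already handled by an earlier timer firing
        }
        // emit all buffered rows whose timestamp is not after the watermark
        NavigableMap<Long, String> ready = buffer.headMap(watermark, true);
        emitted.addAll(ready.values());
        ready.clear(); // clearing the view also removes the entries from buffer
        lastProcessedWatermark = watermark;
    }
}
```

With this guard, duplicate timer firings for an already-processed watermark return immediately instead of rescanning the state.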


---


[GitHub] flink pull request #5063: [FLINK-8144] [table] Optimize the timer logic in R...

2017-11-24 Thread dianfu
Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/5063#discussion_r152955709
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala
 ---
@@ -116,7 +116,7 @@ abstract class RowTimeUnboundedOver(
 // discard late record
 if (timestamp > curWatermark) {
   // ensure every key just registers one timer
-  ctx.timerService.registerEventTimeTimer(curWatermark + 1)
+  ctx.timerService.registerEventTimeTimer(timestamp)
--- End diff --

@fhueske Thanks a lot for your comments. Your concern makes sense to me. I 
think the current implementation is ok under periodic watermark. But I'm not 
sure if it's optimal under punctuated watermark. We will perform some 
performance test for unbounded over under punctuated watermark and share the 
results.


---


[GitHub] flink issue #5063: [FLINK-8144] [table] Optimize the timer logic in RowTimeU...

2017-11-24 Thread dianfu
Github user dianfu commented on the issue:

https://github.com/apache/flink/pull/5063
  
{{AbstractKeyedCEPPatternOperator}} has similar logic to 
{{RowTimeUnboundedOver}}. As described in FLINK-8106, we found that the 
performance is very bad under the current logic for 
{{AbstractKeyedCEPPatternOperator}}. After optimizing the timer logic, the 
throughput increased from 10+ tps to about 3500 tps for one operator in the 
case of RocksDBStateBackend. I think the optimization should also apply to 
{{RowTimeUnboundedOver}}.

BTW: the watermark assigner we use in the CEP use case is 
{{AssignerWithPunctuatedWatermarks}}. It generates one watermark for every 
input element.
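A punctuated assigner of the kind described, emitting one watermark per input element, looks roughly like this. To keep the sketch self-contained, the interface is modeled inline rather than importing Flink's `AssignerWithPunctuatedWatermarks`; the method shapes mirror that interface, but the names here and the event layout (event time in the first field) are illustrative assumptions.

```java
// Self-contained model of a punctuated watermark assigner that emits one
// watermark per element. In Flink this logic would live in an
// AssignerWithPunctuatedWatermarks<T> implementation.
interface PunctuatedAssigner<T> {
    long extractTimestamp(T element);

    // returning the extracted timestamp emits a watermark after this element
    long checkAndGetNextWatermark(T element, long extractedTimestamp);
}

class PerElementWatermarkAssigner implements PunctuatedAssigner<long[]> {
    @Override
    public long extractTimestamp(long[] element) {
        return element[0]; // assumption: event time is the first field
    }

    @Override
    public long checkAndGetNextWatermark(long[] element, long extractedTimestamp) {
        // emit a watermark for every incoming event
        return extractedTimestamp;
    }
}
```

Because every element advances the watermark, every element can also trigger event-time timers, which is what makes the timer logic performance-critical in this setup.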


---


[GitHub] flink pull request #5063: [FLINK-8144] [table] Optimize the timer logic in R...

2017-11-24 Thread dianfu
GitHub user dianfu opened a pull request:

https://github.com/apache/flink/pull/5063

[FLINK-8144] [table] Optimize the timer logic in RowTimeUnboundedOver

## What is the purpose of the change

*This pull request optimizes the timer handling in RowTimeUnboundedOver. 
Currently the MapState is scanned many times if the watermark arrives 
some seconds later than the event.*

## Verifying this change

This change is already covered by existing tests, such as 
*OverWindowHarnessTest.testRowTimeUnboundedRangeOver*.


## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dianfu/flink optimize_timer_over

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5063.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5063


commit a7d7635be1b126283573ac5b55472a79d94ac0fb
Author: Dian Fu <fudian...@alibaba-inc.com>
Date:   2017-11-24T09:28:02Z

[FLINK-8144] [table] Optimize the timer logic in RowTimeUnboundedOver




---


[GitHub] flink pull request #5036: [FLINK-8106] [cep] Optimize the timer logic in Abs...

2017-11-20 Thread dianfu
GitHub user dianfu opened a pull request:

https://github.com/apache/flink/pull/5036

[FLINK-8106] [cep] Optimize the timer logic in 
AbstractKeyedCEPPatternOperator

## What is the purpose of the change

*This pull request optimizes the performance of 
AbstractKeyedCEPPatternOperator.*

## Verifying this change
  - *Existing tests*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dianfu/flink optimize_timer_cep

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5036.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5036


commit 74d432f998afe38ed85ff24481521e5db09805c0
Author: Dian Fu <fudian...@alibaba-inc.com>
Date:   2017-11-20T08:20:53Z

[FLINK-8106] [cep] Optimize the timer logic in 
AbstractKeyedCEPPatternOperator




---


[GitHub] flink pull request #5025: [FLINK-8096] [table] Fix time material issue when ...

2017-11-17 Thread dianfu
Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/5025#discussion_r151625442
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ---
@@ -234,11 +234,12 @@ abstract class StreamTableEnvironment(
 "UpsertStreamTableSink requires that Table has a full primary 
keys if it is updated.")
 }
 val outputType = sink.getOutputType
+val resultType = getResultType(table.getRelNode, optimizedPlan)
--- End diff --

The `resultType` generated from the optimized plan contains time indicator 
information. In `StreamTableEnvironment.translate`, this information is needed 
to transform the time indicator column to the `Timestamp` type. As for the type-
consistency issue, it is validated when converting CRow to the output type:

https://github.com/apache/flink/blob/81dc260dc653085b9dbf098e8fd70a72d2d0828e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala#L941


---


[GitHub] flink pull request #5025: [FLINK-8096] [table] Fix time material issue when ...

2017-11-16 Thread dianfu
Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/5025#discussion_r151609882
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
 ---
@@ -179,6 +179,32 @@ class TimeAttributesITCase extends 
StreamingMultipleProgramsTestBase {
   }
 
   @Test
+  def testCalcMaterialization3(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+val tEnv = TableEnvironment.getTableEnvironment(env)
+MemoryTableSinkUtil.clear
+
+val stream = env
+  .fromCollection(data)
+  .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
+val table = stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 
'float, 'bigdec, 'string)
+
+val t = table
--- End diff --

Removed.


---


[GitHub] flink pull request #5025: [FLINK-8096] [table] Fix time material issue when ...

2017-11-16 Thread dianfu
Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/5025#discussion_r151609700
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ---
@@ -234,11 +234,12 @@ abstract class StreamTableEnvironment(
 "UpsertStreamTableSink requires that Table has a full primary 
keys if it is updated.")
 }
 val outputType = sink.getOutputType
+val resultType = getResultType(table.getRelNode, optimizedPlan)
--- End diff --

Do you mean adding a method such as `getResultType` to `TableSink`? The 
`resultType` is the type of the result of the current SQL query. IMO, even 
if we could specify the `resultType` in `TableSink`, it could only be used for 
validation purposes; we should not use it directly. Thoughts?


---


[GitHub] flink pull request #5027: [FLINK-8097] [table] Add built-in support for min/...

2017-11-16 Thread dianfu
GitHub user dianfu opened a pull request:

https://github.com/apache/flink/pull/5027

[FLINK-8097] [table] Add built-in support for min/max aggregation for 
Date/Time

## What is the purpose of the change

*This PR adds built-in support for min/max aggregation for Date/Time*

## Verifying this change

  - *Added tests in MaxAggFunctionTest, MaxWithRetractAggFunctionTest, 
MinAggFunctionTest, MinWithRetractAggFunctionTest*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dianfu/flink FLINK-8097

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5027.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5027


commit 7c0cf68279570cb8f8bd00e18b8fb38de943a645
Author: Dian Fu <fudian...@alibaba-inc.com>
Date:   2017-11-17T05:22:47Z

[FLINK-8097] [table] Add built-in support for min/max aggregation for 
Date/Time




---


[GitHub] flink pull request #5025: [FLINK-8096] [table] Fix time material issue when ...

2017-11-16 Thread dianfu
GitHub user dianfu opened a pull request:

https://github.com/apache/flink/pull/5025

[FLINK-8096] [table] Fix time material issue when write to TableSink

## What is the purpose of the change

*This pull request fixes the time materialization issue when writing to a TableSink.*

## Verifying this change

This change added tests and can be verified as follows:

  - *Added test TimeAttributesITCase.testCalcMaterialization3*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dianfu/flink fix-result-type

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5025.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5025


commit e672bf610e87dfd5e9847fa6324bf3ab2572b50b
Author: Dian Fu <fudian...@alibaba-inc.com>
Date:   2017-11-17T02:53:31Z

[FLINK-8096] [table] Fix time material issue when write to TableSink




---


[GitHub] flink issue #4936: [FLINK-7962] Add built-in support for min/max aggregation...

2017-11-14 Thread dianfu
Github user dianfu commented on the issue:

https://github.com/apache/flink/pull/4936
  
@wuchong Thanks a lot for your review. 
@fhueske It would be great if you can also give some feedback.



---


[GitHub] flink pull request #4936: [FLINK-7962] Add built-in support for min/max aggr...

2017-11-01 Thread dianfu
GitHub user dianfu opened a pull request:

https://github.com/apache/flink/pull/4936

[FLINK-7962] Add built-in support for min/max aggregation for Timestamp

## What is the purpose of the change

*This PR adds built-in support for min/max aggregation for Timestamp.*


## Brief change log

  - *Add TimestampMinAggFunction, TimestampMinWithRetractAggFunction, 
TimestampMaxAggFunction and TimestampMaxWithRetractAggFunction*
  - *Add TimestampOrdering*

## Verifying this change

This change added tests and can be verified as follows:

  - *Added TimestampMaxAggFunctionTest, 
TimestampMaxWithRetractAggFunctionTest, TimestampMinAggFunctionTest and 
TimestampMinWithRetractAggFunctionTest*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dianfu/flink timestamp_minmax

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4936.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4936


commit 8c69d1daae156852fc88b5d5ac20683a8979c353
Author: Dian Fu <fudian...@alibaba-inc.com>
Date:   2017-11-02T03:26:25Z

[FLINK-7962] Add built-in support for min/max aggregation for Timestamp




---


[GitHub] flink issue #4502: [FLINK-7062] [table, cep] Support the basic functionality...

2017-09-14 Thread dianfu
Github user dianfu commented on the issue:

https://github.com/apache/flink/pull/4502
  
@fhueske Could you take a look at this PR in case you missed it? It has been 
ready for more than one month. Much appreciated.  


---


[GitHub] flink issue #4513: [FLINK-6938] [cep] IterativeCondition should support Rich...

2017-09-06 Thread dianfu
Github user dianfu commented on the issue:

https://github.com/apache/flink/pull/4513
  
@dawidwys Any comments?


---


[GitHub] flink issue #4513: [FLINK-6938] [cep] IterativeCondition should support Rich...

2017-08-27 Thread dianfu
Github user dianfu commented on the issue:

https://github.com/apache/flink/pull/4513
  
@dawidwys It would be great if you could take a look. Much appreciated!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4296: [FLINK-7147] [cep] Support greedy quantifier in CE...

2017-08-27 Thread dianfu
Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4296#discussion_r135441351
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
 ---
@@ -421,6 +437,15 @@ private void addStopStateToLooping(final State 
loopingState) {
untilCondition,
true);
 
+   if 
(currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.GREEDY)
 &&
+   times.getFrom() != times.getTo()) {
+   if (untilCondition != null) {
+   State sinkStateCopy = 
copy(sinkState);
+   
originalStateMap.put(sinkState.getName(), sinkStateCopy);
--- End diff --

originalStateMap is used when compiling the NFA and will be garbage collected 
after the NFA is created, so I think it's unnecessary to clear the entries.


---


[GitHub] flink pull request #4296: [FLINK-7147] [cep] Support greedy quantifier in CE...

2017-08-27 Thread dianfu
Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4296#discussion_r135440842
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
 ---
@@ -526,18 +551,32 @@ private boolean isPatternOptional(Pattern<T, ?> 
pattern) {
return createGroupPatternState((GroupPattern) 
currentPattern, sinkState, proceedState, isOptional);
}
 
-   final IterativeCondition trueFunction = 
getTrueFunction();
-
final State singletonState = 
createState(currentPattern.getName(), State.StateType.Normal);
// if event is accepted then all notPatterns previous 
to the optional states are no longer valid
final State sink = 
copyWithoutTransitiveNots(sinkState);
singletonState.addTake(sink, takeCondition);
 
+   // if no element accepted the previous nots are still 
valid.
+   final IterativeCondition proceedCondition = 
getTrueFunction();
+
// for the first state of a group pattern, its PROCEED 
edge should point to the following state of
// that group pattern and the edge will be added at the 
end of creating the NFA for that group pattern
if (isOptional && !headOfGroup(currentPattern)) {
-   // if no element accepted the previous nots are 
still valid.
-   singletonState.addProceed(proceedState, 
trueFunction);
+   if 
(currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.GREEDY))
 {
+   final IterativeCondition 
untilCondition =
+   (IterativeCondition) 
currentPattern.getUntilCondition();
+   if (untilCondition != null) {
+   singletonState.addProceed(
+   
originalStateMap.get(proceedState.getName()),
+   new 
AndCondition<>(proceedCondition, untilCondition));
--- End diff --

When untilCondition holds, the loop should break and the state should 
proceed to the next state. This is covered by the test case 
GreedyITCase#testGreedyUntilWithDummyEventsBeforeQuantifier.
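
As a standalone illustration of that semantics (plain Java, not the Flink NFA classes; the names here are made up for the sketch): a greedy loop keeps taking matching events, but stops looping and proceeds as soon as the until condition holds.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class GreedyUntilSketch {

    // Consume matching events greedily until the until-condition fires,
    // then stop (the NFA would then proceed to the next state).
    static List<String> greedyTakeUntil(List<String> events,
                                        Predicate<String> takeCondition,
                                        Predicate<String> untilCondition) {
        List<String> taken = new ArrayList<>();
        for (String e : events) {
            if (untilCondition.test(e)) {
                break; // until holds: break the loop, proceed to the next state
            }
            if (takeCondition.test(e)) {
                taken.add(e); // greedy: keep taking while until does not hold
            }
        }
        return taken;
    }

    public static void main(String[] args) {
        List<String> taken = greedyTakeUntil(
            List.of("a1", "a2", "stop", "a3"),
            e -> e.startsWith("a"),
            e -> e.equals("stop"));
        System.out.println(taken); // [a1, a2]
    }
}
```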


---


[GitHub] flink issue #4296: [FLINK-7147] [cep] Support greedy quantifier in CEP

2017-08-24 Thread dianfu
Github user dianfu commented on the issue:

https://github.com/apache/flink/pull/4296
  
Sure. Have created FLINK-7496 to track this issue.


---


[GitHub] flink issue #4513: [FLINK-6938] [cep] IterativeCondition should support Rich...

2017-08-21 Thread dianfu
Github user dianfu commented on the issue:

https://github.com/apache/flink/pull/4513
  
@dawidwys I have updated the PR and it currently only contains changes of 
the RichFunction.


---


[GitHub] flink issue #4563: [FLINK-7479] [cep] Support to retrieve the past event by ...

2017-08-21 Thread dianfu
Github user dianfu commented on the issue:

https://github.com/apache/flink/pull/4563
  
@dawidwys Have updated the PR according to your comments.


---


[GitHub] flink issue #4563: [FLINK-7479] [cep] Support to retrieve the past event by ...

2017-08-21 Thread dianfu
Github user dianfu commented on the issue:

https://github.com/apache/flink/pull/4563
  
Thanks @dawidwys for the suggestion. I will update the PR accordingly.


---


[GitHub] flink issue #4513: [FLINK-6938][FLINK-6939] [cep] Not store IterativeConditi...

2017-08-21 Thread dianfu
Github user dianfu commented on the issue:

https://github.com/apache/flink/pull/4513
  
Thanks @dawidwys for the reminder. Yes, you're right and that makes sense to 
me. I will update the PR and remove the ConditionRegistry related changes.


---


[GitHub] flink pull request #4563: [FLINK-7479] [cep] Support to retrieve the past ev...

2017-08-18 Thread dianfu
GitHub user dianfu opened a pull request:

https://github.com/apache/flink/pull/4563

[FLINK-7479] [cep] Support to retrieve the past event by an offset

## What is the purpose of the change

*Currently, it's already possible to retrieve events matched to the specified 
pattern in IterativeCondition.Context. However, there are also requirements to 
retrieve events by a physical offset. The retrieved events may not be matched 
to any pattern.*


## Brief change log

  - *Add API retain() in Pattern*
  - *Buffer the past events in NFA.process*
  - *Access the past events by the newly added API getEventByOffset in 
IterativeCondition.Context*
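
As a rough standalone sketch of the buffering idea behind these changes (plain Java; the retention limit and the offset semantics are simplified here and are not the PR's exact implementation): the NFA keeps the last N events, and a condition can look one of them up by its physical offset.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class PastEventBuffer<T> {
    private final int retain;
    private final Deque<T> buffer = new ArrayDeque<>();

    public PastEventBuffer(int retain) {
        this.retain = retain;
    }

    public void add(T event) {
        if (buffer.size() == retain) {
            buffer.removeFirst(); // evict the oldest buffered event
        }
        buffer.addLast(event);
    }

    /**
     * Returns the event {@code offset} positions before the latest one
     * (offset 0 = latest), or null if it is no longer retained.
     */
    public T getEventByOffset(int offset) {
        int target = buffer.size() - 1 - offset;
        if (target < 0) {
            return null;
        }
        int i = 0;
        for (T e : buffer) {
            if (i++ == target) {
                return e;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        PastEventBuffer<String> buf = new PastEventBuffer<>(2);
        buf.add("a");
        buf.add("b");
        buf.add("c"); // evicts "a"
        System.out.println(buf.getEventByOffset(0) + " " + buf.getEventByOffset(1)); // c b
    }
}
```

Events older than the retention limit are simply gone, which is why a retain() API on the pattern is needed to bound the buffer.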


## Verifying this change

This change added tests and can be verified as follows:

  - *Added test in IterativeConditionsITCase*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (docs / JavaDocs)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dianfu/flink FLINK-7479

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4563.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4563


commit e9feb55e6c2f7d32ec6266049be0ea9bbff967b2
Author: Dian Fu <fudian...@alibaba-inc.com>
Date:   2017-08-18T12:50:25Z

[FLINK-7479] [cep] Support to retrieve the past event by an offset




---


[GitHub] flink issue #4502: [FLINK-7062] [table, cep] Support the basic functionality...

2017-08-17 Thread dianfu
Github user dianfu commented on the issue:

https://github.com/apache/flink/pull/4502
  
@fhueske @kl0u @dawidwys @wuchong Any feedback?


---


[GitHub] flink issue #4523: [FLINK-7123] [cep] Support timesOrMore in CEP

2017-08-17 Thread dianfu
Github user dianfu commented on the issue:

https://github.com/apache/flink/pull/4523
  
@kl0u That's OK. Thanks a lot. :)


---


[GitHub] flink issue #4513: [FLINK-6938][FLINK-6939] [cep] Not store IterativeConditi...

2017-08-16 Thread dianfu
Github user dianfu commented on the issue:

https://github.com/apache/flink/pull/4513
  
@dawidwys @kl0u In case you missed this PR, could you help take a look? It 
would be very much appreciated.


---


[GitHub] flink issue #4523: [FLINK-7123] [cep] Support timesOrMore in CEP

2017-08-16 Thread dianfu
Github user dianfu commented on the issue:

https://github.com/apache/flink/pull/4523
  
@dawidwys @kl0u Could you help to take a look at this PR? Thanks a lot.


---


[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

2017-08-13 Thread dianfu
Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r132867483
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -340,6 +362,65 @@ public void resetNFAChanged() {
return Tuple2.of(result, timeoutResult);
}
 
+   private void 
discardComputationStatesAccordingToStrategy(Queue<ComputationState> 
computationStates,
+   Collection<Map<String, List>> matchedResult, 
AfterMatchSkipStrategy afterMatchSkipStrategy) {
+   Set discardEvents = new HashSet<>();
+   switch(afterMatchSkipStrategy.getStrategy()) {
+   case SKIP_TO_LAST:
+   for (Map<String, List> resultMap: 
matchedResult) {
+   for (Map.Entry<String, List> 
keyMatches : resultMap.entrySet()) {
+   if 
(keyMatches.getKey().equals(afterMatchSkipStrategy.getPatternName())) {
+   
discardEvents.addAll(keyMatches.getValue().subList(0, 
keyMatches.getValue().size() - 1));
+   break;
+   } else {
+   
discardEvents.addAll(keyMatches.getValue());
+   }
+   }
+   }
+   break;
+   case SKIP_TO_FIRST:
+   for (Map<String, List> resultMap: 
matchedResult) {
+   for (Map.Entry<String, List> 
keyMatches : resultMap.entrySet()) {
+   if 
(keyMatches.getKey().equals(afterMatchSkipStrategy.getPatternName())) {
+   break;
+   } else {
+   
discardEvents.addAll(keyMatches.getValue());
+   }
+   }
+   }
+   break;
+   case SKIP_PAST_LAST_EVENT:
+   for (Map<String, List> resultMap: 
matchedResult) {
+   for (List eventList: 
resultMap.values()) {
+   discardEvents.addAll(eventList);
+   }
+   }
+   break;
+   }
+   if (!discardEvents.isEmpty()) {
+   List<ComputationState> discardStates = new 
ArrayList<>();
+   for (ComputationState computationState : 
computationStates) {
+   Map<String, List> partialMatch = 
extractCurrentMatches(computationState);
+   for (List list: partialMatch.values()) {
+   for (T e: list) {
+   if (discardEvents.contains(e)) {
+   // discard the 
computation state.
+   
eventSharedBuffer.release(
+   
NFAStateNameHandler.getOriginalNameFromInternal(
+   
computationState.getState().getName()),
+   
computationState.getEvent(),
+   
computationState.getTimestamp(),
+   
computationState.getCounter()
+   );
+   
discardStates.add(computationState);
--- End diff --

Should add **break;** after **discardStates.add(computationState);**, right?
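
Abstracting away the shared buffer, the three strategies in the diff select the discard set roughly as follows (standalone sketch with simplified types; the names are illustrative, not Flink's classes):

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class SkipStrategySketch {
    enum Strategy { SKIP_PAST_LAST_EVENT, SKIP_TO_FIRST, SKIP_TO_LAST }

    // `match` maps pattern names (in match order) to the events they matched.
    static Set<String> discardEvents(LinkedHashMap<String, List<String>> match,
                                     Strategy strategy, String patternName) {
        Set<String> discard = new HashSet<>();
        for (Map.Entry<String, List<String>> e : match.entrySet()) {
            switch (strategy) {
                case SKIP_PAST_LAST_EVENT:
                    discard.addAll(e.getValue()); // drop every event of the match
                    break;
                case SKIP_TO_FIRST:
                    if (e.getKey().equals(patternName)) {
                        return discard; // keep events from this pattern onwards
                    }
                    discard.addAll(e.getValue());
                    break;
                case SKIP_TO_LAST:
                    if (e.getKey().equals(patternName)) {
                        List<String> v = e.getValue();
                        discard.addAll(v.subList(0, v.size() - 1));
                        return discard; // keep only the last event of this pattern
                    }
                    discard.addAll(e.getValue());
                    break;
            }
        }
        return discard;
    }

    public static void main(String[] args) {
        LinkedHashMap<String, List<String>> match = new LinkedHashMap<>();
        match.put("a", List.of("a1", "a2"));
        match.put("b", List.of("b1"));
        System.out.println(new TreeSet<>(discardEvents(match, Strategy.SKIP_TO_FIRST, "b"))); // [a1, a2]
    }
}
```

Partial matches that reference any discarded event would then be released, which is where the missing break above matters.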


---


[GitHub] flink pull request #4523: [FLINK-7123] [cep] Support timesOrMore in CEP

2017-08-10 Thread dianfu
GitHub user dianfu opened a pull request:

https://github.com/apache/flink/pull/4523

[FLINK-7123] [cep] Support timesOrMore in CEP


## What is the purpose of the change

*This pull request adds timesOrMore to the CEP Pattern API*
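
As a rough sketch of the quantifier's meaning (standalone Java, not the NFACompiler code, and under the simplifying assumption of default non-greedy, relaxed contiguity): for a run of consecutive matching events, `timesOrMore(n)` produces a match for every prefix containing at least `n` events.

```java
import java.util.ArrayList;
import java.util.List;

public class TimesOrMoreSketch {

    // Every prefix of the run with at least n events is emitted as a match:
    // the looping state may keep accepting after the minimum n is reached.
    static List<List<String>> matches(List<String> run, int n) {
        List<List<String>> result = new ArrayList<>();
        for (int len = n; len <= run.size(); len++) {
            result.add(new ArrayList<>(run.subList(0, len)));
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(matches(List.of("a1", "a2", "a3"), 2));
        // [[a1, a2], [a1, a2, a3]]
    }
}
```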


## Verifying this change

This change added tests and can be verified as follows:

  - *Added test in TimesOrMoreITCase*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (docs)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dianfu/flink timesOrMore

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4523.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4523


commit 6ccbb0f53f1a860ae05b3d24c17408cf22a5aab0
Author: Dian Fu <fudian...@alibaba-inc.com>
Date:   2017-08-11T03:29:01Z

[FLINK-7123] [cep] Support timesOrMore in CEP




---


[GitHub] flink issue #4502: [FLINK-7062] [table, cep] Support the basic functionality...

2017-08-10 Thread dianfu
Github user dianfu commented on the issue:

https://github.com/apache/flink/pull/4502
  
@fhueske Great, thanks.


---


[GitHub] flink issue #4296: [FLINK-7147] [cep] Support greedy quantifier in CEP

2017-08-10 Thread dianfu
Github user dianfu commented on the issue:

https://github.com/apache/flink/pull/4296
  
@dawidwys Any comments?


---


[GitHub] flink issue #4513: [FLINK-6938][FLINK-6939] [cep] Not store IterativeConditi...

2017-08-10 Thread dianfu
Github user dianfu commented on the issue:

https://github.com/apache/flink/pull/4513
  
As discussed with @wuchong offline, I will continue the work of #4145 as it is 
required by the CEP on SQL feature; see the PR for FLINK-7062 for details.

@dawidwys @kl0u Could you help review? Actually, I just rebased the code of 
#4145 and addressed the comments there. Most of the fixes refer to the test 
branch https://github.com/kl0u/flink/tree/cep-iter-pr by @kl0u. I have not 
removed IterativeCondition as I think that breaks backward compatibility.


---


[GitHub] flink pull request #4513: [FLINK-6938][FLINK-6939] [cep] Not store Iterative...

2017-08-10 Thread dianfu
GitHub user dianfu opened a pull request:

https://github.com/apache/flink/pull/4513

[FLINK-6938][FLINK-6939] [cep] Not store IterativeCondition with NFA state 
and support RichFunction interface

## What is the purpose of the change

*The core idea is that each StateTransition is unique within an NFA graph. So we 
store the conditions in a map from StateTransition to IterativeCondition, and 
the conditions are not serialized with the NFA state. If I missed something, 
please point it out.

This PR also includes FLINK-6938: IterativeCondition supports the RichFunction 
interface.*
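
The core idea can be sketched standalone (plain Java; the class and method names are illustrative, not Flink's): conditions live in a transient registry keyed by the unique transition, so only the transition identifiers need to travel with the serialized NFA state.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

public class ConditionRegistrySketch {

    // Rebuilt from the compiled pattern on restore; never serialized
    // with the NFA state itself.
    private final transient Map<String, Predicate<String>> conditions = new HashMap<>();

    public void register(String transitionId, Predicate<String> condition) {
        conditions.put(transitionId, condition);
    }

    // The serialized state only needs to carry transitionId, not the condition.
    public boolean evaluate(String transitionId, String event) {
        return conditions.get(transitionId).test(event);
    }

    public static void main(String[] args) {
        ConditionRegistrySketch registry = new ConditionRegistrySketch();
        registry.register("start->middle", e -> e.equals("a"));
        System.out.println(registry.evaluate("start->middle", "a")); // true
    }
}
```

Keeping the conditions out of the serialized state is also what makes RichFunction-style conditions (with open/close lifecycle) feasible.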


## Verifying this change

This change is already covered by existing tests.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (JavaDocs)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dianfu/flink IterativeCondition

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4513.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4513


commit e8f4bfd55eb50b151b8160c7c8f8901114aa7606
Author: Jark Wu <wuchong...@alibaba-inc.com>
Date:   2017-06-20T06:02:21Z

[FLINK-6938][FLINK-6939] [cep] Not store IterativeCondition with NFA state 
and support RichFunction interface

commit 994bc06c11bffbf129d7160a4db2a16db01199d8
Author: Jark Wu <wuchong...@alibaba-inc.com>
Date:   2017-06-20T13:25:59Z

minor change

commit ae26dab180943f9b6609341d7447718f772b8b19
Author: Jark Wu <wuchong...@alibaba-inc.com>
Date:   2017-06-22T03:17:47Z

address dawid's comments

commit e8c28049d2eb96500f53d6e7f284ae074280b5a1
Author: Dian Fu <fudian...@alibaba-inc.com>
Date:   2017-08-10T12:25:50Z

Rebase the code




---


[GitHub] flink issue #4502: [FLINK-7062] [table, cep] Support the basic functionality...

2017-08-10 Thread dianfu
Github user dianfu commented on the issue:

https://github.com/apache/flink/pull/4502
  
@kl0u @dawidwys @wuchong It would be great if you could take a look at this 
PR. It adds the basic support for CEP on SQL. Thanks in advance. :)


---


[GitHub] flink pull request #4502: [FLINK-7062] [table, cep] Support the basic functi...

2017-08-09 Thread dianfu
GitHub user dianfu opened a pull request:

https://github.com/apache/flink/pull/4502

[FLINK-7062] [table, cep] Support the basic functionality of MATCH_RECOGNIZE

## What is the purpose of the change

*This pull request adds the basic support for MATCH_RECOGNIZE.*
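
For reference, the kind of query this enables looks roughly like the following (the MATCH_RECOGNIZE clause names are standard SQL:2016, but the table and column names here are made up, and the exact clause coverage of this initial version may differ):

```java
public class MatchRecognizeExample {

    // Builds an illustrative MATCH_RECOGNIZE query string; the Table API
    // entry point for executing it is not shown here.
    static String query() {
        return String.join("\n",
            "SELECT T.aid, T.bid",
            "FROM MyTable",
            "MATCH_RECOGNIZE (",
            "  ORDER BY proctime",
            "  MEASURES A.id AS aid, B.id AS bid",
            "  PATTERN (A B)",
            "  DEFINE",
            "    A AS A.name = 'a',",
            "    B AS B.name = 'b'",
            ") AS T");
    }

    public static void main(String[] args) {
        System.out.println(query());
    }
}
```

The clause is translated into a CEP job using the existing CEP Pattern API, as described in the change log below.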


## Brief change log

  - *The MATCH_RECOGNIZE clause is transformed to CEP job with the existing 
CEP API*


## Verifying this change


This change added tests and can be verified as follows:
  - *Added test that validates MATCH_RECOGNIZE is parsed correctly and the 
expected results are obtained*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (JavaDocs)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dianfu/flink FLINK-7062

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4502.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4502


commit 34f7704f72cdb9cf0e3808e779b35d72eadb3e7f
Author: Dian Fu <fudian...@alibaba-inc.com>
Date:   2017-08-08T11:10:24Z

[FLINK-7062] [table, cep] Support the basic functionality of MATCH_RECOGNIZE




---


[GitHub] flink pull request #4296: [FLINK-7147] [cep] Support greedy quantifier in CE...

2017-08-07 Thread dianfu
Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4296#discussion_r131804949
  
--- Diff: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GreedyITCase.java
 ---
@@ -0,0 +1,1068 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import org.apache.flink.cep.Event;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.TestLogger;
+
+import com.google.common.collect.Lists;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
+import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
+
+/**
+ * IT tests covering {@link Pattern#greedy()}.
+ */
+public class GreedyITCase extends TestLogger {
+
+   @Test
+   public void testGreedyZeroOrMore() {
+   List<StreamRecord> inputEvents = new ArrayList<>();
+
+   Event c = new Event(40, "c", 1.0);
+   Event a1 = new Event(41, "a", 2.0);
+   Event a2 = new Event(42, "a", 2.0);
+   Event a3 = new Event(43, "a", 2.0);
+   Event d = new Event(44, "d", 3.0);
+
+   inputEvents.add(new StreamRecord<>(c, 1));
+   inputEvents.add(new StreamRecord<>(a1, 2));
+   inputEvents.add(new StreamRecord<>(a2, 3));
+   inputEvents.add(new StreamRecord<>(a3, 4));
+   inputEvents.add(new StreamRecord<>(d, 5));
+
+   // c a* d
+   Pattern<Event, ?> pattern = 
Pattern.begin("start").where(new SimpleCondition() {
+   private static final long serialVersionUID = 
5726188262756267490L;
+
+   @Override
+   public boolean filter(Event value) throws Exception {
+   return value.getName().equals("c");
+   }
+   }).followedBy("middle").where(new SimpleCondition() {
+   private static final long serialVersionUID = 
5726188262756267490L;
+
+   @Override
+   public boolean filter(Event value) throws Exception {
+   return value.getName().equals("a");
+   }
+   }).oneOrMore().optional().greedy().followedBy("end").where(new 
SimpleCondition() {
+   private static final long serialVersionUID = 
5726188262756267490L;
+
+   @Override
+   public boolean filter(Event value) throws Exception {
+   return value.getName().equals("d");
+   }
+   });
+
+   NFA nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
+
+   final List<List> resultingPatterns = 
feedNFA(inputEvents, nfa);
+
+   compareMaps(resultingPatterns, Lists.<List>newArrayList(
+   Lists.newArrayList(c, a1, a2, a3, d)
+   ));
+   }
+
+   @Test
+   public void testGreedyZeroOrMoreConsecutiveEndWithOptional() {
+   List<StreamRecord> inputEvents = new ArrayList<>();
+
+   Event c = new Event(40, "c", 1.0);
+   Event a1 = new Event(41, "a", 2.0);
+   Event a2 = new Event(42, "a", 2.0);
+   Event a3 = new Event(43, "a", 2.0);
+   Event d = new Event(44, "d", 3.0);
+
+   inputEvents.add(new Stream

[GitHub] flink pull request #4296: [FLINK-7147] [cep] Support greedy quantifier in CE...

2017-08-07 Thread dianfu
Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4296#discussion_r131804902
  
--- Diff: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GreedyITCase.java
 ---
@@ -0,0 +1,1068 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import org.apache.flink.cep.Event;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.TestLogger;
+
+import com.google.common.collect.Lists;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
+import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
+
+/**
+ * IT tests covering {@link Pattern#greedy()}.
+ */
+public class GreedyITCase extends TestLogger {
+
+   @Test
+   public void testGreedyZeroOrMore() {
+   List<StreamRecord> inputEvents = new ArrayList<>();
+
+   Event c = new Event(40, "c", 1.0);
+   Event a1 = new Event(41, "a", 2.0);
+   Event a2 = new Event(42, "a", 2.0);
+   Event a3 = new Event(43, "a", 2.0);
+   Event d = new Event(44, "d", 3.0);
+
+   inputEvents.add(new StreamRecord<>(c, 1));
+   inputEvents.add(new StreamRecord<>(a1, 2));
+   inputEvents.add(new StreamRecord<>(a2, 3));
+   inputEvents.add(new StreamRecord<>(a3, 4));
+   inputEvents.add(new StreamRecord<>(d, 5));
+
+   // c a* d
+   Pattern<Event, ?> pattern = 
Pattern.begin("start").where(new SimpleCondition() {
+   private static final long serialVersionUID = 
5726188262756267490L;
+
+   @Override
+   public boolean filter(Event value) throws Exception {
+   return value.getName().equals("c");
+   }
+   }).followedBy("middle").where(new SimpleCondition() {
+   private static final long serialVersionUID = 
5726188262756267490L;
+
+   @Override
+   public boolean filter(Event value) throws Exception {
+   return value.getName().equals("a");
+   }
+   }).oneOrMore().optional().greedy().followedBy("end").where(new 
SimpleCondition() {
+   private static final long serialVersionUID = 
5726188262756267490L;
+
+   @Override
+   public boolean filter(Event value) throws Exception {
+   return value.getName().equals("d");
+   }
+   });
+
+   NFA nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
+
+   final List<List> resultingPatterns = 
feedNFA(inputEvents, nfa);
+
+   compareMaps(resultingPatterns, Lists.<List>newArrayList(
+   Lists.newArrayList(c, a1, a2, a3, d)
+   ));
+   }
+
+   @Test
+   public void testGreedyZeroOrMoreConsecutiveEndWithOptional() {
+   List<StreamRecord> inputEvents = new ArrayList<>();
+
+   Event c = new Event(40, "c", 1.0);
+   Event a1 = new Event(41, "a", 2.0);
+   Event a2 = new Event(42, "a", 2.0);
+   Event a3 = new Event(43, "a", 2.0);
+   Event d = new Event(44, "d", 3.0);
+
+   inputEvents.add(new Stream

[GitHub] flink pull request #4296: [FLINK-7147] [cep] Support greedy quantifier in CE...

2017-08-07 Thread dianfu
Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4296#discussion_r131804887
  
--- Diff: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GreedyITCase.java
 ---
@@ -0,0 +1,1068 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
[GitHub] flink pull request #4296: [FLINK-7147] [cep] Support greedy quantifier in CE...

2017-08-07 Thread dianfu
Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4296#discussion_r131804932
  
--- Diff: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GreedyITCase.java
 ---
@@ -0,0 +1,1068 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import org.apache.flink.cep.Event;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.TestLogger;
+
+import com.google.common.collect.Lists;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
+import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
+
+/**
+ * IT tests covering {@link Pattern#greedy()}.
+ */
+public class GreedyITCase extends TestLogger {
+
+   @Test
+   public void testGreedyZeroOrMore() {
+   List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+   Event c = new Event(40, "c", 1.0);
+   Event a1 = new Event(41, "a", 2.0);
+   Event a2 = new Event(42, "a", 2.0);
+   Event a3 = new Event(43, "a", 2.0);
+   Event d = new Event(44, "d", 3.0);
+
+   inputEvents.add(new StreamRecord<>(c, 1));
+   inputEvents.add(new StreamRecord<>(a1, 2));
+   inputEvents.add(new StreamRecord<>(a2, 3));
+   inputEvents.add(new StreamRecord<>(a3, 4));
+   inputEvents.add(new StreamRecord<>(d, 5));
+
+   // c a* d
+   Pattern<Event, ?> pattern = Pattern.begin("start").where(new SimpleCondition<Event>() {
+   private static final long serialVersionUID = 5726188262756267490L;
+
+   @Override
+   public boolean filter(Event value) throws Exception {
+   return value.getName().equals("c");
+   }
+   }).followedBy("middle").where(new SimpleCondition<Event>() {
+   private static final long serialVersionUID = 5726188262756267490L;
+
+   @Override
+   public boolean filter(Event value) throws Exception {
+   return value.getName().equals("a");
+   }
+   }).oneOrMore().optional().greedy().followedBy("end").where(new SimpleCondition<Event>() {
+   private static final long serialVersionUID = 5726188262756267490L;
+
+   @Override
+   public boolean filter(Event value) throws Exception {
+   return value.getName().equals("d");
+   }
+   });
+
+   NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+   final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+   compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+   Lists.newArrayList(c, a1, a2, a3, d)
+   ));
+   }
+
+   @Test
+   public void testGreedyZeroOrMoreConsecutiveEndWithOptional() {
+   List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+   Event c = new Event(40, "c", 1.0);
+   Event a1 = new Event(41, "a", 2.0);
+   Event a2 = new Event(42, "a", 2.0);
+   Event a3 = new Event(43, "a", 2.0);
+   Event d = new Event(44, "d", 3.0);
+
+   inputEvents.add(new Stream

[GitHub] flink pull request #4296: [FLINK-7147] [cep] Support greedy quantifier in CE...

2017-08-07 Thread dianfu
Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4296#discussion_r131804764
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
 ---
@@ -637,9 +675,23 @@ private boolean isPatternOptional(Pattern<T, ?> pattern) {
untilCondition,
true);
 
-   final IterativeCondition proceedCondition = getTrueFunction();
+   IterativeCondition proceedCondition = getTrueFunction();
final State loopingState = createState(currentPattern.getName(), State.StateType.Normal);
-   loopingState.addProceed(sinkState, proceedCondition);
+
+   if (currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.GREEDY)) {
+   State sinkStateCopy = copy(sinkState);
--- End diff --

Makes sense. Updated.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4296: [FLINK-7147] [cep] Support greedy quantifier in CE...

2017-08-07 Thread dianfu
Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4296#discussion_r131804398
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
 ---
@@ -386,6 +387,19 @@ private void checkPatternNameUniqueness(final Pattern 
pattern) {
return copyOfSink;
}
 
+   private State copy(final State state) {
+   final State copyOfState = new State<>(state.getName(), state.getStateType());
--- End diff --

Updated.
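The point of the copy(State) discussed above can be sketched independently of Flink's internals with a minimal stand-in class (the class shape below is an assumption for illustration; the real class is org.apache.flink.cep.nfa.State): the copy keeps the same name and type but gets its own transition list, so later edits to the copy never mutate the original state.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal stand-in for the NFA State discussed above (an assumption for
// illustration only; not the actual org.apache.flink.cep.nfa.State).
public class SimpleState {
    private final String name;
    private final String stateType;
    private final List<String> transitions = new ArrayList<>();

    public SimpleState(String name, String stateType) {
        this.name = name;
        this.stateType = stateType;
    }

    public String getName() { return name; }
    public void addTransition(String t) { transitions.add(t); }
    public List<String> getTransitions() { return transitions; }

    // Defensive copy: same name and type, but an independent transition
    // list, so edits to the copy (e.g. the greedy sink-state copy) do not
    // leak back into the original state.
    public SimpleState copy() {
        SimpleState copyOfState = new SimpleState(name, stateType);
        copyOfState.transitions.addAll(transitions);
        return copyOfState;
    }
}
```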




[GitHub] flink issue #4418: [FLINK-7293] [cep] Support custom order by in PatternStre...

2017-08-04 Thread dianfu
Github user dianfu commented on the issue:

https://github.com/apache/flink/pull/4418
  
@kl0u Good catch. Have updated the PR according to the comments.




[GitHub] flink issue #4296: [FLINK-7147] [cep] Support greedy quantifier in CEP

2017-08-03 Thread dianfu
Github user dianfu commented on the issue:

https://github.com/apache/flink/pull/4296
  
@dawidwys Regarding times().greedy(), the result was not as expected; I have fixed the issue in the latest PR. Also updated the doc.




[GitHub] flink issue #4418: [FLINK-7293] [cep] Support custom order by in PatternStre...

2017-08-03 Thread dianfu
Github user dianfu commented on the issue:

https://github.com/apache/flink/pull/4418
  
@dawidwys Updated the contribution checklist.




[GitHub] flink issue #4418: [FLINK-7293] [cep] Support custom order by in PatternStre...

2017-08-02 Thread dianfu
Github user dianfu commented on the issue:

https://github.com/apache/flink/pull/4418
  
@dawidwys  Thanks a lot for your review. Have updated the PR.




[GitHub] flink pull request #4418: [FLINK-7293] [cep] Support custom order by in Patt...

2017-08-02 Thread dianfu
Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4418#discussion_r130896153
  
--- Diff: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
 ---
@@ -923,6 +934,126 @@ public boolean filter(Event value) throws Exception {
}
}
 
+   @Test
+   public void testCEPOperatorComparatorProcessTime() throws Exception {
+   Event startEvent1 = new Event(42, "start", 1.0);
+   Event startEvent2 = new Event(42, "start", 2.0);
+   SubEvent middleEvent1 = new SubEvent(42, "foo1", 3.0, 10.0);
+   SubEvent middleEvent2 = new SubEvent(42, "foo2", 4.0, 10.0);
+   Event endEvent1 = new Event(42, "end", 1.0);
+
+   Event startEventK2 = new Event(43, "start", 1.0);
+
+   KeyedCEPPatternOperator<Event, Integer> operator = getKeyedCepOpearatorWithComparator(true);
+   OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = getCepTestHarness(operator);
+
+   try {
+   harness.open();
+
+   harness.setProcessingTime(0L);
+
+   harness.processElement(new StreamRecord<>(startEvent1, 1L));
--- End diff --

Updated




[GitHub] flink pull request #4418: [FLINK-7293] [cep] Support custom order by in Patt...

2017-08-02 Thread dianfu
Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4418#discussion_r130896034
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
 ---
@@ -257,7 +289,32 @@ public void onEventTime(InternalTimer<KEY, VoidNamespace> timer) throws Exceptio
 
@Override
public void onProcessingTime(InternalTimer<KEY, VoidNamespace> timer) throws Exception {
-   // not used
+   NFA nfa = getNFA();
+
+   // emit the events in order
+   for (IN event : sort(bufferedEvents.get())) {
+   processEvent(nfa, event, getProcessingTimeService().getCurrentProcessingTime());
+   }
+
+   // remove all buffered rows
+   bufferedEvents.clear();
+
+   updateNFA(nfa);
+   }
+
+   private Iterable sort(Iterable iter) {
+   if (comparator == null) {
+   return iter;
+   } else {
+   // insert all events into the sort buffer
+   List sortBuffer = new ArrayList<>();
--- End diff --

Good advice. Updated.
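The sorting logic under discussion can be reduced to a small self-contained helper (a hedged sketch of the idea, not the actual AbstractKeyedCEPPatternOperator code): when no comparator is configured, the buffered events keep their arrival order; otherwise they are copied into a buffer and sorted before being fed to the NFA.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class SortHelper {
    // Sketch of the sort(...) helper from the diff above: a null comparator
    // means "keep arrival order"; otherwise copy into a buffer and sort.
    public static <T> Iterable<T> sort(Iterable<T> events, Comparator<T> comparator) {
        if (comparator == null) {
            return events;
        }
        List<T> sortBuffer = new ArrayList<>();
        for (T e : events) {
            sortBuffer.add(e);
        }
        sortBuffer.sort(comparator);
        return sortBuffer;
    }
}
```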




[GitHub] flink pull request #4418: [FLINK-7293] [cep] Support custom order by in Patt...

2017-08-02 Thread dianfu
Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4418#discussion_r130896085
  
--- Diff: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
 ---
@@ -923,6 +934,126 @@ public boolean filter(Event value) throws Exception {
}
}
 
+   @Test
+   public void testCEPOperatorComparatorProcessTime() throws Exception {
+   Event startEvent1 = new Event(42, "start", 1.0);
+   Event startEvent2 = new Event(42, "start", 2.0);
+   SubEvent middleEvent1 = new SubEvent(42, "foo1", 3.0, 10.0);
+   SubEvent middleEvent2 = new SubEvent(42, "foo2", 4.0, 10.0);
+   Event endEvent1 = new Event(42, "end", 1.0);
+
+   Event startEventK2 = new Event(43, "start", 1.0);
+
+   KeyedCEPPatternOperator<Event, Integer> operator = getKeyedCepOpearatorWithComparator(true);
+   OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = getCepTestHarness(operator);
+
+   try {
+   harness.open();
+
+   harness.setProcessingTime(0L);
+
+   harness.processElement(new StreamRecord<>(startEvent1, 1L));
--- End diff --

Updated.




[GitHub] flink pull request #4418: [FLINK-7293] [cep] Support custom order by in Patt...

2017-08-02 Thread dianfu
Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4418#discussion_r130895852
  
--- Diff: 
flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/CEP.scala
 ---
@@ -36,8 +38,26 @@ object CEP {
 * @tparam T Type of the input events
 * @return Resulting pattern stream
 */
-  def pattern[T](input: DataStream[T], pattern: Pattern[T, _]): PatternStream[T] = {
+  def pattern[T](input: DataStream[T], pattern: Pattern[T, _ <: T]): PatternStream[T] = {
 wrapPatternStream(JCEP.pattern(input.javaStream, pattern.wrappedPattern))
   }
+
+  /**
+* Transforms a [[DataStream]] into a [[PatternStream]] in the Scala API.
+* See [[org.apache.flink.cep.CEP]] for a more detailed description of how the underlying
+* Java API works.
+*
+* @param input  DataStream containing the input events
+* @param pattern    Pattern specification which shall be detected
+* @param comparator Comparator to sort events
--- End diff --

Updated the doc.




[GitHub] flink pull request #4418: [FLINK-7293] [cep] Support custom order by in Patt...

2017-08-02 Thread dianfu
Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4418#discussion_r130895782
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CEP.java ---
@@ -38,4 +40,17 @@
public static <T> PatternStream<T> pattern(DataStream<T> input, Pattern<T, ?> pattern) {
return new PatternStream<>(input, pattern);
}
+
+   /**
+* Creates a {@link PatternStream} from an input data stream and a 
pattern.
+*
+* @param input DataStream containing the input events
+* @param pattern Pattern specification which shall be detected
+* @param comparator Comparator to sort events
--- End diff --

Updated the doc.




[GitHub] flink pull request #4296: [FLINK-7147] [cep] Support greedy quantifier in CE...

2017-08-02 Thread dianfu
Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4296#discussion_r130883414
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
 ---
@@ -657,25 +663,34 @@ private boolean isPatternOptional(Pattern<T, ?> 
pattern) {
true);
 
IterativeCondition proceedCondition = getTrueFunction();
-   if (currentPattern.getQuantifier().isGreedy()) {
-   proceedCondition = getGreedyCondition(proceedCondition, Lists.newArrayList(takeCondition));
+   if (currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.GREEDY)) {
+   proceedCondition = getGreedyCondition(
+   proceedCondition,
+   takeCondition,
+   ignoreCondition,
+   followingTakeCondition);;
}
final State loopingState = createState(currentPattern.getName(), State.StateType.Normal,
-   currentPattern.getQuantifier().isGreedy());
+   currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.GREEDY));
loopingState.addProceed(sinkState, proceedCondition);
loopingState.addTake(takeCondition);
 
addStopStateToLooping(loopingState);
 
if (ignoreCondition != null) {
final State ignoreState = createState(currentPattern.getName(), State.StateType.Normal,
-   currentPattern.getQuantifier().isGreedy());
+   currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.GREEDY));
ignoreState.addTake(loopingState, takeCondition);
ignoreState.addIgnore(ignoreCondition);
loopingState.addIgnore(ignoreState, ignoreCondition);
+   if (currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.GREEDY)) {
--- End diff --

@dawidwys  Good advice. Thanks a lot :). I have updated the PR per the 
solution you suggested.
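The greedy semantics being discussed boil down to a small predicate: under a greedy quantifier, the automaton should only proceed past the looping state when the current event can no longer extend the loop. A hedged, self-contained sketch (the boolean inputs stand in for the evaluated take/ignore/proceed conditions; this is an illustration, not Flink's actual IterativeCondition wiring):

```java
public class GreedyProceed {
    // Under a greedy quantifier, PROCEED should fire only if neither the
    // TAKE nor the IGNORE condition matches the current event -- i.e. the
    // loop is preferred whenever it can still consume input.
    public static boolean shouldProceed(boolean proceedMatches,
                                        boolean takeMatches,
                                        boolean ignoreMatches) {
        return proceedMatches && !(takeMatches || ignoreMatches);
    }
}
```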




[GitHub] flink pull request #4373: [FLINK-6429] [table] Bump up Calcite version to 1....

2017-08-02 Thread dianfu
Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4373#discussion_r130829080
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
 ---
@@ -55,12 +55,13 @@ class JoinTest extends TableTestBase {
   unaryNode(
 "DataStreamCalc",
 streamTableNode(1),
-term("select", "a", "b", "proctime")
+term("select", "a", "b", "-(proctime, 360) AS -",
--- End diff --

I think this issue may be caused by CALCITE-1753; I'm still investigating the changes in that JIRA.




[GitHub] flink pull request #4373: [FLINK-6429] [table] Bump up Calcite version to 1....

2017-08-02 Thread dianfu
Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4373#discussion_r130801549
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
 ---
@@ -55,12 +55,13 @@ class JoinTest extends TableTestBase {
   unaryNode(
 "DataStreamCalc",
 streamTableNode(1),
-term("select", "a", "b", "proctime")
+term("select", "a", "b", "-(proctime, 360) AS -",
--- End diff --

In the original test, the calculation of **proctime** happened during the **Join**, while currently it is pushed down to the **DataStreamCalc**. It seems that this is not correct?




[GitHub] flink issue #4373: [FLINK-6429] [table] Bump up Calcite version to 1.13.

2017-08-01 Thread dianfu
Github user dianfu commented on the issue:

https://github.com/apache/flink/pull/4373
  
I have done some investigation of the test failure of **JoinITCase.testJoinWithExpressionPreds** and would like to share my findings and solutions for your reference. The cause of this issue is that for preserved expressions in **PushProjector#createProjectRefsAndExprs**, the generated column names are the operator names of the expressions. For example, for the expression **a - 1** in the test case, the column corresponding to it will be **-**. I think this behavior is not expected, so I have copied **PushProjector** from Calcite and made some changes to it (line 507). Please refer to [here](https://github.com/dianfu/flink/commit/efa9641e0bd395a3679b0d496b60e3d42aa7b832) for more information.

Of the files copied from Calcite, all can be removed except **SqlTimestampAddFunction** and **AuxiliaryConverter**.
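The naming problem can be illustrated with a small sketch (a hedged illustration, not the actual PushProjector code): deriving the column name from the expression's operator yields `-` for `a - 1`, so a fallback to a synthetic positional name avoids the issue (the `$f<i>` convention below is an assumption borrowed from Calcite's usual generated names).

```java
public class ProjectFieldNames {
    // If the name derived from a preserved expression is not a sane
    // identifier (e.g. the operator name "-" for "a - 1"), fall back to a
    // synthetic positional name such as "$f2" instead of the operator name.
    public static String fieldName(String derivedName, int fieldIndex) {
        boolean validIdentifier = derivedName != null
            && derivedName.matches("[A-Za-z_][A-Za-z0-9_]*");
        return validIdentifier ? derivedName : "$f" + fieldIndex;
    }
}
```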




[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

2017-07-30 Thread dianfu
Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r130264164
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -551,18 +579,47 @@ private boolean isSelfIgnore(final StateTransition 
edge) {
nextVersion,
startTimestamp);
}
+
+   switch (skipStrategy.getStrategy()) {
+   case SKIP_PAST_LAST_EVENT:
+   if (nextState.isFinal()) {
--- End diff --

Should also consider the situation **Proceed to Final state**.




[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

2017-07-30 Thread dianfu
Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r130266394
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -551,18 +579,47 @@ private boolean isSelfIgnore(final StateTransition 
edge) {
nextVersion,
startTimestamp);
}
+
+   switch (skipStrategy.getStrategy()) {
+   case SKIP_PAST_LAST_EVENT:
+   if (nextState.isFinal()) {
+   resultingComputationStates.add(createStartComputationState(computationState, event));
+   }
+   break;
+   case SKIP_TO_FIRST:
+   if (nextState.getName().equals(skipStrategy.getPatternName()) &&
+   !nextState.getName().equals(currentState.getName())) {
+   ComputationState startComputationState = createStartComputationState(computationState, event);
+   if (callLevel > 0) {
+   throw new RuntimeException("infinite loop! Will abort the match process, please rewrite your pattern query");
+   }
+   // feed current matched event to the state.
+   Collection<ComputationState> computationStates = computeNextStates(startComputationState, event, timestamp, callLevel++);
+   resultingComputationStates.addAll(computationStates);
+   } else if (previousState == null && currentState.getName().equals(skipStrategy.getPatternName())) {
+   throw new RuntimeException("infinite loop! Will abort the match process, please rewrite your pattern query");
+   }
+   break;
+   case SKIP_TO_LAST:
+   if (currentState.getName().equals(skipStrategy.getPatternName()) &&
+   !nextState.getName().equals(currentState.getName())) {
+   ComputationState startComputationState = createStartComputationState(computationState, event);
+   if (callLevel > 0) {
+   throw new RuntimeException("infinite loop! Will abort the match process, please rewrite your pattern query");
+   }
+   // feed current matched event to the state.
+   Collection<ComputationState> computationStates = computeNextStates(startComputationState, event, timestamp, callLevel++);
+   resultingComputationStates.addAll(computationStates);
+   }
+   break;
+   }
break;
}
}
 
-   if (computationState.isStartState()) {
-   int totalBranches = calculateIncreasingSelfState(
-   outgoingEdges.getTotalIgnoreBranches(),
-   outgoingEdges.getTotalTakeBranches());
-
-   DeweyNumber startVersion = computationState.getVersion().increase(totalBranches);
-   ComputationState startState = ComputationState.createStartState(this, computationState.getState(), startVersion);
-   resultingComputationStates.add(startState);
+   if (computationState.isStartState() &&
+   skipStrategy.getStrategy(

[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

2017-07-30 Thread dianfu
Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r130264936
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -551,18 +579,47 @@ private boolean isSelfIgnore(final StateTransition 
edge) {
nextVersion,
startTimestamp);
}
+
+   switch (skipStrategy.getStrategy()) {
+   case SKIP_PAST_LAST_EVENT:
+   if (nextState.isFinal()) {
+   resultingComputationStates.add(createStartComputationState(computationState, event));
+   }
+   break;
+   case SKIP_TO_FIRST:
+   if (nextState.getName().equals(skipStrategy.getPatternName()) &&
--- End diff --

Should use NFAStateNameHandler.getOriginalNameFromInternal() to compare state names.
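The idea behind that comparison can be sketched with a self-contained helper (the `:`-suffix format for internal state names is an assumption for illustration; the real logic lives in NFAStateNameHandler):

```java
public class StateNames {
    // Internal NFA state names may carry a delimiter-separated suffix
    // (assumed here to be ":<n>", e.g. "middle:2" for a generated loop
    // state). Comparisons against a user-supplied pattern name must strip
    // that suffix first, otherwise "middle:2" never matches "middle".
    public static String getOriginalNameFromInternal(String internalName) {
        int delim = internalName.indexOf(':');
        return delim < 0 ? internalName : internalName.substring(0, delim);
    }
}
```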




[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

2017-07-30 Thread dianfu
Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r130265354
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -551,18 +579,47 @@ private boolean isSelfIgnore(final StateTransition 
edge) {
nextVersion,
startTimestamp);
}
+
+   switch (skipStrategy.getStrategy()) {
+   case SKIP_PAST_LAST_EVENT:
+   if (nextState.isFinal()) {
+   resultingComputationStates.add(createStartComputationState(computationState, event));
+   }
+   break;
+   case SKIP_TO_FIRST:
+   if (nextState.getName().equals(skipStrategy.getPatternName()) &&
+   !nextState.getName().equals(currentState.getName())) {
+   ComputationState startComputationState = createStartComputationState(computationState, event);
+   if (callLevel > 0) {
--- End diff --

Why is the callLevel needed?




[GitHub] flink pull request #4418: [FLINK-7293] [cep] Support custom order by in Patt...

2017-07-28 Thread dianfu
GitHub user dianfu opened a pull request:

https://github.com/apache/flink/pull/4418

[FLINK-7293] [cep] Support custom order by in PatternStream

*Thank you very much for contributing to Apache Flink - we are happy that 
you want to help us improve Flink. To help the community review your 
contribution in the best possible way, please go through the checklist below, 
which will get the contribution into a shape in which it can be best reviewed.*

*Please understand that we do not do this to make contributions to Flink a 
hassle. In order to uphold a high standard of quality for code contributions, 
while at the same time managing a large number of contributions, we need 
contributors to prepare the contributions well, and give reviewers enough 
contextual information for the review. Please also understand that 
contributions that do not follow this guide will take longer to review and thus 
typically be picked up with lower priority by the community.*

## Contribution Checklist

  - Make sure that the pull request corresponds to a [JIRA 
issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are 
made for typos in JavaDoc or documentation files, which need no JIRA issue.
  
  - Name the pull request in the form "[FLINK-1234] [component] Title of 
the pull request", where *FLINK-1234* should be replaced by the actual issue 
number. Skip *component* if you are unsure about which is the best component.
  Typo fixes that have no associated JIRA issue should be named following 
this pattern: `[hotfix] [docs] Fix typo in event time introduction` or 
`[hotfix] [javadocs] Expand JavaDoc for PuncuatedWatermarkGenerator`.

  - Fill out the template below to describe the changes contributed by the 
pull request. That will give reviewers the context they need to do the review.
  
  - Make sure that the change passes the automated tests, i.e., `mvn clean 
verify` passes. You can set up Travis CI to do that following [this 
guide](http://flink.apache.org/contribute-code.html#best-practices).

  - Each pull request should address only one issue, not mix up code from 
multiple issues.
  
  - Each commit in the pull request has a meaningful commit message 
(including the JIRA id)

  - Once all items of the checklist are addressed, remove the above text 
and this checklist, leaving only the filled out template below.


**(The sections below can be removed for hotfixes of typos)**

## What is the purpose of the change

*(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*


## Brief change log

*(for example:)*
  - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
  - *Deployments RPC transmits only the blob storage reference*
  - *TaskManagers retrieve the TaskInfo from the blob cache*


## Verifying this change

*(Please pick either of the following options)*

This change is a trivial rework / code cleanup without any test coverage.

*(or)*

This change is already covered by existing tests, such as *(please describe 
tests)*.

*(or)*

This change added tests and can be verified as follows:

*(example:)*
  - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
  - *Extended integration test for recovery after master (JobManager) 
failure*
  - *Added test that validates that TaskInfo is transferred only once 
across recoveries*
  - *Manually verified the change by running a 4 node cluser with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
  - The serializers: (yes / no / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / no)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / not documented)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dianfu/flink support_comparator

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4418.patch

[GitHub] flink issue #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP function in CE...

2017-07-27 Thread dianfu
Github user dianfu commented on the issue:

https://github.com/apache/flink/pull/4331
  
Sorry for the late response. I think this feature is very useful and agree that 
we should have a clear picture of what the behavior should be for each skip 
strategy. I noticed that there are already some discussions in FLINK-3703 which 
we can refer to. I will take a look at this PR and at FLINK-3703 over the next 
two days and will post my thoughts.
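To make the discussion concrete, here is a small self-contained sketch (plain Java, deliberately independent of the Flink CEP API; the strategy name mirrors the SQL row-pattern-recognition terminology discussed in FLINK-7169) of what a SKIP_PAST_LAST_EVENT strategy would do to overlapping matches:

```java
import java.util.ArrayList;
import java.util.List;

public class SkipPastLastEventSketch {

    /**
     * Keeps only matches that start after the last event of the previously
     * kept match. Each match is a [start, end] range of event indices and
     * the input is assumed to be sorted by start index.
     */
    static List<int[]> skipPastLastEvent(List<int[]> matches) {
        List<int[]> kept = new ArrayList<>();
        int lastEnd = -1;
        for (int[] m : matches) {
            if (m[0] > lastEnd) {
                kept.add(m);        // no overlap with a previously kept match
                lastEnd = m[1];     // resume matching past this match's last event
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<int[]> matches = new ArrayList<>();
        matches.add(new int[]{0, 2});
        matches.add(new int[]{1, 3});   // overlaps [0, 2] -> dropped
        matches.add(new int[]{3, 4});   // starts after index 2 -> kept
        System.out.println(skipPastLastEvent(matches).size());   // prints 2
    }
}
```

The other strategies would differ only in how far matching is allowed to resume, so a tiny reference model per strategy can serve as a checklist while reviewing the NFA changes.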


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #4296: [FLINK-7147] [cep] Support greedy quantifier in CEP

2017-07-27 Thread dianfu
Github user dianfu commented on the issue:

https://github.com/apache/flink/pull/4296
  
@dawidwys Thanks a lot for the review. I have updated the patch. Currently, 
there is something wrong when a greedy state is followed by an optional state. 
This case is covered by the test 
GreedyITCase.testGreedyZeroOrMoreBeforeOptional2 (duplicate results will be 
produced). The solutions I can think of are removing the duplicate results 
before the NFA returns them, or disabling this case for the time being. What 
are your thoughts? Do you have any suggestions for this problem?
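As an illustration of the ambiguity (a regex analogy only, not the CEP implementation): a greedy `a*` immediately followed by an optional `a` admits several splits of the same run of a's, which is exactly where duplicate results come from. `java.util.regex` resolves the ambiguity deterministically, whereas an NFA that enumerates every run would emit one result per split:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class GreedyOptionalAnalogy {
    public static void main(String[] args) {
        // "c a* a? d" over "caaad": the greedy a* consumes all a's first,
        // so the optional a? is left matching the empty string. An engine
        // exploring every split would also report a* = "aa", a? = "a", etc.
        Matcher m = Pattern.compile("c(a*)(a?)d").matcher("caaad");
        if (m.matches()) {
            System.out.println(m.group(1));            // prints aaa
            System.out.println(m.group(2).isEmpty());  // prints true
        }
    }
}
```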




[GitHub] flink pull request #4296: [FLINK-7147] [cep] Support greedy quantifier in CE...

2017-07-27 Thread dianfu
Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4296#discussion_r129832284
  
--- Diff: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GreedyITCase.java
 ---
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import org.apache.flink.cep.Event;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.TestLogger;
+
+import com.google.common.collect.Lists;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
+import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
+
+/**
+ * IT tests covering {@link Pattern#greedy()}.
+ */
+public class GreedyITCase extends TestLogger {
+
+   @Test
+   public void testGreedyFollowedBy() {
+   List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+   Event c = new Event(40, "c", 1.0);
+   Event a1 = new Event(41, "a", 2.0);
+   Event a2 = new Event(42, "a", 2.0);
+   Event a3 = new Event(43, "a", 2.0);
+   Event d = new Event(44, "d", 3.0);
+
+   inputEvents.add(new StreamRecord<>(c, 1));
+   inputEvents.add(new StreamRecord<>(a1, 2));
+   inputEvents.add(new StreamRecord<>(a2, 3));
+   inputEvents.add(new StreamRecord<>(a3, 4));
+   inputEvents.add(new StreamRecord<>(d, 5));
+
+   // c a* d
+   Pattern<Event, ?> pattern = Pattern.begin("start").where(new SimpleCondition<Event>() {
+       private static final long serialVersionUID = 5726188262756267490L;
+
+       @Override
+       public boolean filter(Event value) throws Exception {
+           return value.getName().equals("c");
+       }
+   }).followedBy("middle").where(new SimpleCondition<Event>() {
+       private static final long serialVersionUID = 5726188262756267490L;
+
+       @Override
+       public boolean filter(Event value) throws Exception {
+           return value.getName().equals("a");
+       }
+   }).oneOrMore().optional().greedy().followedBy("end").where(new SimpleCondition<Event>() {
+       private static final long serialVersionUID = 5726188262756267490L;
+
+       @Override
+       public boolean filter(Event value) throws Exception {
+           return value.getName().equals("d");
+       }
+   });
+
+   NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+   final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+   compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+       Lists.newArrayList(c, a1, a2, a3, d)
+   ));
+   }
+
+   @Test
+   public void testGreedyUtil() {
+   List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+   Event c = new Event(40, "c", 1.0);
+   Event a1 = new Event(41, "a", 2.0);
+   Event a2 = new Event(42, "a", 3.0);
+   Event a3 = new Event(43, "a", 4.0);
+   Event d = new Event(44, "d", 3.0);
+
+   inputEvents.add(new StreamRecord<>(c, 1));
+   

[GitHub] flink pull request #4296: [FLINK-7147] [cep] Support greedy quantifier in CE...

2017-07-27 Thread dianfu
Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4296#discussion_r129830324
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
 ---
@@ -105,6 +107,14 @@ public void optional() {
properties.add(Quantifier.QuantifierProperty.OPTIONAL);
}
 
+   public void greedy() {
+   greedy = true;
--- End diff --

Make sense. Updated.




[GitHub] flink pull request #4296: [FLINK-7147] [cep] Support greedy quantifier in CE...

2017-07-27 Thread dianfu
Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4296#discussion_r129830367
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
 ---
@@ -492,4 +506,10 @@ private void checkIfQuantifierApplied() {
"Current quantifier is: " + quantifier);
}
}
+
+   private void checkIfNoFollowedByAny() {
+   if (quantifier.getConsumingStrategy() == 
ConsumingStrategy.SKIP_TILL_ANY) {
--- End diff --

Make sense. Updated.




[GitHub] flink issue #4296: [FLINK-7147] [cep] Support greedy quantifier in CEP

2017-07-13 Thread dianfu
Github user dianfu commented on the issue:

https://github.com/apache/flink/pull/4296
  
@dawidwys OK. Have a good time. :)




[GitHub] flink issue #4296: [FLINK-7147] [cep] Support greedy quantifier in CEP

2017-07-13 Thread dianfu
Github user dianfu commented on the issue:

https://github.com/apache/flink/pull/4296
  
@dawidwys Could you help to take a look at this PR? Thanks a lot in advance.




[GitHub] flink pull request #4318: [FLINK-7170] [cep] Fix until condition when the co...

2017-07-13 Thread dianfu
GitHub user dianfu opened a pull request:

https://github.com/apache/flink/pull/4318

[FLINK-7170] [cep] Fix until condition when the contiguity is strict

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dianfu/flink fix-until-condition

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4318.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4318


commit 605311b8db8669ab1086eebc953892f90eb27ffa
Author: Dian Fu <fudian...@alibaba-inc.com>
Date:   2017-07-13T09:21:30Z

[FLINK-7170] [cep] Fix until condition when the contiguity is strict
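For reviewers less familiar with `until()`: under strict contiguity the looping state must take events back-to-back, and the until condition ends the loop as soon as it fires. A minimal self-contained model of that intended behavior (plain Java, not the actual NFA code; the event names "a" and "stop" are made up for illustration):

```java
import java.util.ArrayList;
import java.util.List;

public class StrictUntilSketch {

    /**
     * Collects a strictly contiguous run of events matching the looping
     * condition ("a"), stopping (exclusively) at the first event for which
     * the until condition ("stop") holds.
     */
    static List<String> takeWhileUntil(List<String> events) {
        List<String> run = new ArrayList<>();
        for (String e : events) {
            if (e.equals("stop")) {
                break;              // until condition fires -> end the loop
            }
            if (!e.equals("a")) {
                break;              // strict contiguity: any gap ends the run
            }
            run.add(e);
        }
        return run;
    }

    public static void main(String[] args) {
        List<String> events = List.of("a", "a", "stop", "a");
        System.out.println(takeWhileUntil(events).size());   // prints 2
    }
}
```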






[GitHub] flink pull request #4296: [FLINK-7147] [cep] Support greedy quantifier in CE...

2017-07-11 Thread dianfu
GitHub user dianfu opened a pull request:

https://github.com/apache/flink/pull/4296

[FLINK-7147] [cep] Support greedy quantifier in CEP

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dianfu/flink greedy_quantifier

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4296.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4296


commit 647d4c58309990d9fc924bc4343ef811dacf4ef1
Author: Dian Fu <fudian...@alibaba-inc.com>
Date:   2017-07-11T08:03:42Z

[FLINK-7147] [cep] Support greedy quantifier in CEP






[GitHub] flink issue #4153: [FLINK-6927] [cep] Support pattern group in CEP

2017-07-03 Thread dianfu
Github user dianfu commented on the issue:

https://github.com/apache/flink/pull/4153
  
@dawidwys Thanks a lot for the review. Updated the doc.
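For readers skimming the thread: a pattern group lets a quantifier apply to a whole sub-pattern rather than to a single state, much like a parenthesized group in a regular expression. A plain `java.util.regex` analogy (not the CEP API itself):

```java
import java.util.regex.Pattern;

public class PatternGroupAnalogy {
    public static void main(String[] args) {
        // "c (a b){2} d": the {2} quantifier applies to the grouped
        // sub-pattern "ab", which is what GroupPattern in this PR
        // enables for CEP patterns via times(2) on a nested Pattern.
        System.out.println(Pattern.matches("c(ab){2}d", "cababd"));  // prints true
        System.out.println(Pattern.matches("c(ab){2}d", "cabd"));    // prints false
    }
}
```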




[GitHub] flink issue #4153: [FLINK-6927] [cep] Support pattern group in CEP

2017-07-02 Thread dianfu
Github user dianfu commented on the issue:

https://github.com/apache/flink/pull/4153
  
@dawidwys thanks a lot for your comments. Have updated the PR and it should 
have addressed all the comments. :)




[GitHub] flink pull request #4153: [FLINK-6927] [cep] Support pattern group in CEP

2017-07-02 Thread dianfu
Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4153#discussion_r125178823
  
--- Diff: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GroupITCase.java
 ---
@@ -0,0 +1,807 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import org.apache.flink.cep.Event;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.pattern.GroupPattern;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.TestLogger;
+
+import com.google.common.collect.Lists;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
+import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
+
+/**
+ * IT tests covering {@link GroupPattern}.
+ */
+@SuppressWarnings("unchecked")
+public class GroupITCase extends TestLogger {
--- End diff --

updated.




[GitHub] flink pull request #4153: [FLINK-6927] [cep] Support pattern group in CEP

2017-07-02 Thread dianfu
Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4153#discussion_r125176853
  
--- Diff: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GroupITCase.java
 ---
@@ -0,0 +1,807 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import org.apache.flink.cep.Event;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.pattern.GroupPattern;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.TestLogger;
+
+import com.google.common.collect.Lists;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
+import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
+
+/**
+ * IT tests covering {@link GroupPattern}.
+ */
+@SuppressWarnings("unchecked")
+public class GroupITCase extends TestLogger {
+
+   @Test
+   public void testGroupFollowedBy() {
+   List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+   Event c = new Event(40, "c", 1.0);
+   Event a1 = new Event(41, "a", 2.0);
+   Event b1 = new Event(42, "b", 3.0);
+   Event a2 = new Event(43, "a", 4.0);
+   Event b2 = new Event(44, "b", 5.0);
+   Event d = new Event(45, "d", 6.0);
+
+   inputEvents.add(new StreamRecord<>(c, 1));
+   inputEvents.add(new StreamRecord<>(a1, 2));
+   inputEvents.add(new StreamRecord<>(b1, 3));
+   inputEvents.add(new StreamRecord<>(a2, 4));
+   inputEvents.add(new StreamRecord<>(b2, 5));
+   inputEvents.add(new StreamRecord<>(d, 6));
+
+   Pattern<Event, ?> pattern = Pattern.begin("start").where(new SimpleCondition<Event>() {
+       private static final long serialVersionUID = 5726188262756267490L;
+
+       @Override
+       public boolean filter(Event value) throws Exception {
+           return value.getName().equals("c");
+       }
+   }).followedBy(Pattern.begin("middle1").where(new SimpleCondition<Event>() {
+       private static final long serialVersionUID = 5726188262756267490L;
+
+       @Override
+       public boolean filter(Event value) throws Exception {
+           return value.getName().equals("a");
+       }
+   }).followedBy("middle2").where(new SimpleCondition<Event>() {
+       private static final long serialVersionUID = 5726188262756267490L;
+
+       @Override
+       public boolean filter(Event value) throws Exception {
+           return value.getName().equals("b");
+       }
+   })).times(2).followedBy("end").where(new SimpleCondition<Event>() {
+       private static final long serialVersionUID = 5726188262756267490L;
+
+       @Override
+       public boolean filter(Event value) throws Exception {
+           return value.getName().equals("d");
+       }
+   });
+
+   NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+   final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+   compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+   Lists.newArrayList(c, a1, b1,

[GitHub] flink pull request #4153: [FLINK-6927] [cep] Support pattern group in CEP

2017-07-02 Thread dianfu
Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4153#discussion_r125176801
  
--- Diff: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GroupITCase.java
 ---

[GitHub] flink pull request #4153: [FLINK-6927] [cep] Support pattern group in CEP

2017-07-02 Thread dianfu
Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4153#discussion_r125176457
  
--- Diff: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GroupITCase.java
 ---

[GitHub] flink pull request #4153: [FLINK-6927] [cep] Support pattern group in CEP

2017-07-02 Thread dianfu
Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4153#discussion_r125176383
  
--- Diff: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GroupITCase.java
 ---
@@ -0,0 +1,807 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import org.apache.flink.cep.Event;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.pattern.GroupPattern;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.TestLogger;
+
+import com.google.common.collect.Lists;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
+import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
+
+/**
+ * IT tests covering {@link GroupPattern}.
+ */
+@SuppressWarnings("unchecked")
+public class GroupITCase extends TestLogger {
+
+   @Test
+   public void testGroupFollowedBy() {
+   List<StreamRecord> inputEvents = new ArrayList<>();
+
+   Event c = new Event(40, "c", 1.0);
+   Event a1 = new Event(41, "a", 2.0);
+   Event b1 = new Event(42, "b", 3.0);
+   Event a2 = new Event(43, "a", 4.0);
+   Event b2 = new Event(44, "b", 5.0);
+   Event d = new Event(45, "d", 6.0);
+
+   inputEvents.add(new StreamRecord<>(c, 1));
+   inputEvents.add(new StreamRecord<>(a1, 2));
+   inputEvents.add(new StreamRecord<>(b1, 3));
+   inputEvents.add(new StreamRecord<>(a2, 4));
+   inputEvents.add(new StreamRecord<>(b2, 5));
+   inputEvents.add(new StreamRecord<>(d, 6));
+
+   Pattern<Event, ?> pattern = Pattern.begin("start").where(new SimpleCondition<Event>() {
+   private static final long serialVersionUID = 5726188262756267490L;
+
+   @Override
+   public boolean filter(Event value) throws Exception {
+   return value.getName().equals("c");
+   }
+   }).followedBy(Pattern.begin("middle1").where(new SimpleCondition<Event>() {
+   private static final long serialVersionUID = 5726188262756267490L;
+
+   @Override
+   public boolean filter(Event value) throws Exception {
+   return value.getName().equals("a");
+   }
+   }).followedBy("middle2").where(new SimpleCondition<Event>() {
+   private static final long serialVersionUID = 5726188262756267490L;
+
+   @Override
+   public boolean filter(Event value) throws Exception {
+   return value.getName().equals("b");
+   }
+   })).times(2).followedBy("end").where(new SimpleCondition<Event>() {
+   private static final long serialVersionUID = 5726188262756267490L;
+
+   @Override
+   public boolean filter(Event value) throws Exception {
+   return value.getName().equals("d");
+   }
+   });
+
+   NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+   final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+   compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+   Lists.newArrayList(c, a1, b1,

[GitHub] flink pull request #4153: [FLINK-6927] [cep] Support pattern group in CEP

2017-07-02 Thread dianfu
Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4153#discussion_r125176384
  
--- Diff: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GroupITCase.java
 ---
@@ -0,0 +1,807 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import org.apache.flink.cep.Event;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.pattern.GroupPattern;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.TestLogger;
+
+import com.google.common.collect.Lists;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
+import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
+
+/**
+ * IT tests covering {@link GroupPattern}.
+ */
+@SuppressWarnings("unchecked")
+public class GroupITCase extends TestLogger {
+
+   @Test
+   public void testGroupFollowedBy() {
--- End diff --

done


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
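As background for the `testGroupFollowedBy` test quoted above: `followedBy` in Flink CEP means relaxed contiguity, i.e. unrelated events may occur between the matched ones. The following stand-alone sketch (hypothetical class and method names, unrelated to the actual NFA implementation) illustrates that subsequence-style matching idea:

```java
import java.util.Arrays;
import java.util.List;

public class RelaxedContiguity {

    // Returns true if the names in `sequence` occur in order within `events`,
    // allowing unrelated events in between (relaxed contiguity, like followedBy).
    static boolean matchesInOrder(List<String> events, List<String> sequence) {
        int i = 0;
        for (String event : events) {
            if (i < sequence.size() && event.equals(sequence.get(i))) {
                i++;  // advance only when the next expected name is seen
            }
        }
        return i == sequence.size();
    }

    public static void main(String[] args) {
        // mirrors the event names used in the test: c, a, b, a, b, d
        List<String> events = Arrays.asList("c", "a", "b", "a", "b", "d");
        System.out.println(matchesInOrder(events, Arrays.asList("c", "a", "b", "a", "b", "d"))); // true
        System.out.println(matchesInOrder(events, Arrays.asList("d", "c"))); // false
    }
}
```

This is only the intuition behind relaxed contiguity; the real CEP engine tracks all partial matches through a shared buffer rather than a single greedy scan.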


[GitHub] flink pull request #4153: [FLINK-6927] [cep] Support pattern group in CEP

2017-07-01 Thread dianfu
Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4153#discussion_r125173912
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
 ---
@@ -430,6 +431,54 @@ public Quantifier getQuantifier() {
return this;
}
 
+   /**
+* Starts a new pattern sequence. The provided pattern is the initial pattern
+* of the new sequence.
+*
+* @param group the pattern to begin with
+* @return the first pattern of a pattern sequence
+*/
+   public static <T, F extends T> GroupPattern<T, F> begin(Pattern<T, F> group) {
+   return new GroupPattern<>(null, group);
+   }
+
+   /**
+* Appends a new pattern to the existing one. The new pattern enforces non-strict
--- End diff --

done


---


[GitHub] flink pull request #4153: [FLINK-6927] [cep] Support pattern group in CEP

2017-07-01 Thread dianfu
Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4153#discussion_r125173877
  
--- Diff: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesRangeITCase.java
 ---
@@ -92,6 +92,57 @@ public boolean filter(Event value) throws Exception {
}
 
@Test
+   public void testTimesRangeFromZero() {
--- End diff --

done


---


[GitHub] flink pull request #4153: [FLINK-6927] [cep] Support pattern group in CEP

2017-07-01 Thread dianfu
Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4153#discussion_r125173876
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
 ---
@@ -153,9 +153,8 @@ public int hashCode() {
private final int to;
 
private Times(int from, int to) {
-   Preconditions.checkArgument(from >= 0, "The from should be a non-negative number greater than or equal to 0.");
+   Preconditions.checkArgument(from > 0, "The from should be a positive number greater than 0.");
--- End diff --

done.


---
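The diff above tightens the `Times` precondition so the lower bound must be strictly positive (a lower bound of zero is instead expressed through optionality, per the related `Pattern.java` change in this thread). A minimal framework-free sketch of the same argument checks — a hypothetical `TimesRange` class, not Flink's `Quantifier.Times`:

```java
public class TimesRange {
    final int from;
    final int to;

    // Mirrors the tightened precondition: from must be strictly positive,
    // and the upper bound must not be smaller than the lower bound.
    TimesRange(int from, int to) {
        if (from <= 0) {
            throw new IllegalArgumentException("The from should be a positive number greater than 0.");
        }
        if (to < from) {
            throw new IllegalArgumentException("The to must not be smaller than from.");
        }
        this.from = from;
        this.to = to;
    }

    public static void main(String[] args) {
        TimesRange ok = new TimesRange(1, 3);
        System.out.println(ok.from + ".." + ok.to);
        try {
            new TimesRange(0, 3);  // now rejected instead of special-cased
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```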


[GitHub] flink pull request #4153: [FLINK-6927] [cep] Support pattern group in CEP

2017-07-01 Thread dianfu
Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4153#discussion_r125173871
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFAStateNameHandler.java
 ---
@@ -55,6 +55,11 @@ public void checkNameUniqueness(String name) {
if (usedNames.contains(name)) {
throw new MalformedPatternException("Duplicate pattern name: " + name + ". Names must be unique.");
}
+   usedNames.add(name);
+   }
+
--- End diff --

updated


---
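The `NFAStateNameHandler` change above makes `checkNameUniqueness` record each checked name, so that a second use of the same pattern name is actually detected. A small self-contained sketch of that behaviour (hypothetical class, using `IllegalStateException` in place of Flink's `MalformedPatternException`):

```java
import java.util.HashSet;
import java.util.Set;

public class StateNameChecker {
    private final Set<String> usedNames = new HashSet<>();

    // Set.add returns false when the element was already present,
    // so checking and recording the name is a single operation.
    void checkNameUniqueness(String name) {
        if (!usedNames.add(name)) {
            throw new IllegalStateException("Duplicate pattern name: " + name + ". Names must be unique.");
        }
    }

    public static void main(String[] args) {
        StateNameChecker checker = new StateNameChecker();
        checker.checkNameUniqueness("start");
        checker.checkNameUniqueness("middle");
        try {
            checker.checkNameUniqueness("start");  // duplicate is now caught
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The original bug was precisely the missing `usedNames.add(name)`: without recording, the `contains` check could never fire.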


[GitHub] flink pull request #4153: [FLINK-6927] [cep] Support pattern group in CEP

2017-07-01 Thread dianfu
Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4153#discussion_r125173832
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
 ---
@@ -455,6 +548,76 @@ private void addStopStateToLooping(final State<T> loopingState) {
}
 
/**
+* Create all the states for the group pattern.
+*
+* @param groupPattern the group pattern to create the states for
+* @param sinkState the state that the group pattern being converted should point to
+* @param proceedState the state that the group pattern being converted should proceed to
+* @param isOptional whether the group pattern being converted is optional
+* @return the first state of the states of the group pattern
+*/
+   private State<T> createGroupPatternState(
+   final GroupPattern<T, ?> groupPattern,
+   final State<T> sinkState,
+   final State<T> proceedState,
+   final boolean isOptional) {
+   final IterativeCondition<T> trueFunction = BooleanConditions.trueFunction();
+
+   Pattern<T, ?> oldCurrentPattern = currentPattern;
+   Pattern<T, ?> oldFollowingPattern = followingPattern;
+   GroupPattern<T, ?> oldGroupPattern = currentGroupPattern;
+   try {
+   State<T> lastSink = sinkState;
+   currentGroupPattern = groupPattern;
+   currentPattern = groupPattern.getRawPattern();
+   lastSink = createMiddleStates(lastSink);
+   lastSink = convertPattern(lastSink);
+   if (isOptional) {
+   // for the first state of a group pattern, its PROCEED edge should point to
+   // the following state of that group pattern
+   lastSink.addProceed(proceedState, trueFunction);
+   }
+   return lastSink;
+   } finally {
+   currentPattern = oldCurrentPattern;
+   followingPattern = oldFollowingPattern;
+   currentGroupPattern = oldGroupPattern;
+   }
+   }
+
+   /**
+* Create the states for the group pattern as a looping one.
+*
+* @param groupPattern the group pattern to create the states for
+* @param sinkState the state that the group pattern being converted should point to
+* @return the first state of the states of the group pattern
+*/
+   private State<T> createLoopingGroupPatternState(
+   final GroupPattern<T, ?> groupPattern,
+   final State<T> sinkState) {
+   final IterativeCondition<T> trueFunction = BooleanConditions.trueFunction();
+
+   Pattern<T, ?> oldCurrentPattern = currentPattern;
+   Pattern<T, ?> oldFollowingPattern = followingPattern;
+   GroupPattern<T, ?> oldGroupPattern = currentGroupPattern;
+   try {
--- End diff --

updated.


---
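The `createGroupPatternState` code in the diff above follows a save/mutate/restore idiom: the compiler's mutable fields are saved, rewired to the nested group pattern, and unconditionally restored in a `finally` block so the outer compilation continues unaffected. A tiny stand-alone sketch of that idiom (hypothetical `CompilerContext` class; the string return is a stand-in for the real state construction):

```java
public class CompilerContext {
    private String currentPattern = "outer";

    // Save the mutable field, point it at the nested pattern for the
    // duration of the nested compilation, and restore it in finally so
    // the outer context survives even if compilation throws.
    String compileNested(String nestedPattern) {
        String oldCurrentPattern = currentPattern;
        try {
            currentPattern = nestedPattern;
            return "compiled:" + currentPattern;  // stand-in for convertPattern(...)
        } finally {
            currentPattern = oldCurrentPattern;   // restored on every exit path
        }
    }

    public static void main(String[] args) {
        CompilerContext ctx = new CompilerContext();
        System.out.println(ctx.compileNested("group"));
        System.out.println(ctx.currentPattern);  // back to "outer"
    }
}
```

Using `try`/`finally` rather than restoring at the end of the method is what makes the restore exception-safe, which matters in a recursive compiler where nested conversion can fail.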


[GitHub] flink pull request #4153: [FLINK-6927] [cep] Support pattern group in CEP

2017-07-01 Thread dianfu
Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4153#discussion_r125173828
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
 ---
@@ -455,6 +548,76 @@ private void addStopStateToLooping(final State<T> loopingState) {
}
 
/**
+* Create all the states for the group pattern.
+*
+* @param groupPattern the group pattern to create the states for
+* @param sinkState the state that the group pattern being converted should point to
+* @param proceedState the state that the group pattern being converted should proceed to
+* @param isOptional whether the group pattern being converted is optional
+* @return the first state of the states of the group pattern
+*/
+   private State<T> createGroupPatternState(
+   final GroupPattern<T, ?> groupPattern,
+   final State<T> sinkState,
+   final State<T> proceedState,
+   final boolean isOptional) {
+   final IterativeCondition<T> trueFunction = BooleanConditions.trueFunction();
+
+   Pattern<T, ?> oldCurrentPattern = currentPattern;
+   Pattern<T, ?> oldFollowingPattern = followingPattern;
+   GroupPattern<T, ?> oldGroupPattern = currentGroupPattern;
+   try {
--- End diff --

updated.


---


[GitHub] flink pull request #4153: [FLINK-6927] [cep] Support pattern group in CEP

2017-07-01 Thread dianfu
Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4153#discussion_r125173760
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
 ---
@@ -366,8 +366,9 @@ public Quantifier getQuantifier() {
checkIfNoNotPattern();
checkIfQuantifierApplied();
this.quantifier = Quantifier.times(quantifier.getConsumingStrategy());
-   if (from == 0) {
--- End diff --

Thanks for the suggestion, created PR: 
https://github.com/apache/flink/pull/4242


---

