[jira] [Commented] (FLINK-10597) Enable UDFs support in MATCH_RECOGNIZE

2018-11-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16704880#comment-16704880
 ] 

ASF GitHub Bot commented on FLINK-10597:


dawidwys closed pull request #7189: [FLINK-10597][table] Enabled UDFs support 
in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7189
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/docs/dev/table/streaming/match_recognize.md b/docs/dev/table/streaming/match_recognize.md
index 3cd1ed09cc9..e61d384075e 100644
--- a/docs/dev/table/streaming/match_recognize.md
+++ b/docs/dev/table/streaming/match_recognize.md
@@ -838,5 +838,4 @@ Unsupported features include:
 * Physical offsets - `PREV/NEXT`, which indexes all events seen rather than only those that were mapped to a pattern variable(as in [logical offsets](#logical-offsets) case).
 * Extracting time attributes - there is currently no possibility to get a time attribute for subsequent time-based operations.
 * Aggregates - one cannot use aggregates in `MEASURES` nor `DEFINE` clauses.
-* User defined functions cannot be used within `MATCH_RECOGNIZE`.
 * `MATCH_RECOGNIZE` is supported only for SQL. There is no equivalent in the Table API.
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CepRuntimeContext.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CepRuntimeContext.java
index ff6e610845c..ed93f8e52cd 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CepRuntimeContext.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CepRuntimeContext.java
@@ -103,6 +103,11 @@ public ClassLoader getUserCodeClassLoader() {
         return runtimeContext.getUserCodeClassLoader();
     }
 
+    @Override
+    public DistributedCache getDistributedCache() {
+        return runtimeContext.getDistributedCache();
+    }
+
     // ---
     // Unsupported operations
     // ---
@@ -159,11 +164,6 @@ public boolean hasBroadcastVariable(String name) {
         throw new UnsupportedOperationException("Broadcast variables are not supported.");
     }
 
-    @Override
-    public DistributedCache getDistributedCache() {
-        throw new UnsupportedOperationException("Distributed cache is not supported.");
-    }
-
     @Override
     public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) {
         throw new UnsupportedOperationException("State is not supported.");
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CepRuntimeContextTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CepRuntimeContextTest.java
index 6bc4081da54..ef7ee89881e 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CepRuntimeContextTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CepRuntimeContextTest.java
@@ -20,6 +20,7 @@
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.api.common.functions.AggregateFunction;
 import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
 import org.apache.flink.api.common.functions.FoldFunction;
@@ -134,6 +135,7 @@ private void verifyRuntimeContext(final RichFunction function) {
         final String taskNameWithSubtask = "barfoo";
         final ExecutionConfig executionConfig = mock(ExecutionConfig.class);
         final ClassLoader userCodeClassLoader = mock(ClassLoader.class);
+        final DistributedCache distributedCache = mock(DistributedCache.class);
 
         RuntimeContext mockedRuntimeContext = mock(RuntimeContext.class);
 
@@ -145,6 +147,7 @@ private void verifyRuntimeContext(final RichFunction function) {
         when(mockedRuntimeContext.getTaskNameWithSubtasks()).thenReturn(taskNameWithSubtask);
         when(mockedRuntimeContext.getExecutionConfig()).thenReturn(executionConfig);
         when(mockedRuntimeContext.getUserCodeClassLoader()).thenReturn(userCodeClassLoader);
+        when(mockedRuntimeContext.getDistributedCache()).thenReturn(distributedCache);
 
         function.setRuntimeContext(mockedRuntimeContext);
 
@@ -159,13 +162,7 @@ private void verifyRuntimeContext(final RichFunction function) {
         assertEquals(taskNameWithSubtask, 

[jira] [Commented] (FLINK-10597) Enable UDFs support in MATCH_RECOGNIZE

2018-11-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16704405#comment-16704405
 ] 

ASF GitHub Bot commented on FLINK-10597:


dawidwys commented on a change in pull request #7189: [FLINK-10597][table] 
Enabled UDFs support in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7189#discussion_r237778606
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/match/PatternSelectFunctionRunner.scala
 ##
 @@ -35,20 +36,26 @@ import org.apache.flink.util.Collector
 class PatternSelectFunctionRunner(
 name: String,
 code: String)
-  extends PatternFlatSelectFunction[Row, CRow]
-  with Compiler[PatternSelectFunction[Row, Row]]
+  extends RichPatternFlatSelectFunction[Row, CRow]
 
 Review comment:
   Agreed, I had that in mind. I would change that either when introducing 
`ALL ROWS PER MATCH` or as a separate issue, as it is not directly connected 
with this one and requires more changes in the code generator.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Enable UDFs support in MATCH_RECOGNIZE
> --
>
> Key: FLINK-10597
> URL: https://issues.apache.org/jira/browse/FLINK-10597
> Project: Flink
> Issue Type: Sub-task
> Components: CEP, Table API & SQL
> Reporter: Dawid Wysakowicz
> Assignee: Dawid Wysakowicz
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.8.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10597) Enable UDFs support in MATCH_RECOGNIZE

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16704135#comment-16704135
 ] 

ASF GitHub Bot commented on FLINK-10597:


dianfu commented on a change in pull request #7189: [FLINK-10597][table] 
Enabled UDFs support in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7189#discussion_r237722370
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/match/PatternSelectFunctionRunner.scala
 ##
 @@ -35,20 +36,26 @@ import org.apache.flink.util.Collector
 class PatternSelectFunctionRunner(
 name: String,
 code: String)
-  extends PatternFlatSelectFunction[Row, CRow]
-  with Compiler[PatternSelectFunction[Row, Row]]
+  extends RichPatternFlatSelectFunction[Row, CRow]
 
 Review comment:
   @dawidwys I have thought about this issue again. What about always 
generating a `PatternFlatSelectFunction`, regardless of whether `ONE ROW PER 
MATCH` or `ALL ROWS PER MATCH` is specified?



[jira] [Commented] (FLINK-10597) Enable UDFs support in MATCH_RECOGNIZE

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703095#comment-16703095
 ] 

ASF GitHub Bot commented on FLINK-10597:


dianfu commented on a change in pull request #7189: [FLINK-10597][table] 
Enabled UDFs support in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7189#discussion_r237464451
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
 ##
 @@ -131,47 +132,39 @@ class MatchCodeGenerator(
 : GeneratedFunction[F, T] = {
 
 Review comment:
   We can add something like `reusablePrepareInputStatements` to `CodeGenerator` 
and place it before `reuseInputUnboxingCode` when generating the function. 
Then we can use `reusablePrepareInputStatements` to construct the 
`reusePatternLists`. But I'm fine with the current implementation, as 
`reusablePrepareInputStatements` would currently only be useful for CEP.
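
To make the ordering suggested above concrete, here is a minimal Java sketch. It is not Flink's `CodeGenerator`: `CodeGenSketch`, `addPrepareInput`, `addInputUnboxing`, and `generateBody` are hypothetical names. The only point it illustrates is that statements preparing the input (such as the pattern lists) are emitted before the input unboxing code that reads from them.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical, simplified code generator; not Flink's CodeGenerator.
final class CodeGenSketch {
    // LinkedHashSet de-duplicates statements while keeping insertion order.
    private final Set<String> prepareInputStatements = new LinkedHashSet<>();
    private final Set<String> inputUnboxingStatements = new LinkedHashSet<>();

    void addPrepareInput(String statement) {
        prepareInputStatements.add(statement);
    }

    void addInputUnboxing(String statement) {
        inputUnboxingStatements.add(statement);
    }

    // Emits "prepare input" statements first, then unboxing code, then the body.
    String generateBody(String body) {
        StringBuilder sb = new StringBuilder();
        for (String s : prepareInputStatements) {
            sb.append(s).append('\n');
        }
        for (String s : inputUnboxingStatements) {
            sb.append(s).append('\n');
        }
        return sb.append(body).append('\n').toString();
    }
}
```

Registration order between the two buckets does not matter in this sketch; the emit order is fixed by `generateBody`.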



[jira] [Commented] (FLINK-10597) Enable UDFs support in MATCH_RECOGNIZE

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703096#comment-16703096
 ] 

ASF GitHub Bot commented on FLINK-10597:


dianfu commented on a change in pull request #7189: [FLINK-10597][table] 
Enabled UDFs support in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7189#discussion_r237464564
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/match/PatternSelectFunctionRunner.scala
 ##
 @@ -35,20 +36,26 @@ import org.apache.flink.util.Collector
 class PatternSelectFunctionRunner(
 name: String,
 code: String)
-  extends PatternFlatSelectFunction[Row, CRow]
-  with Compiler[PatternSelectFunction[Row, Row]]
+  extends RichPatternFlatSelectFunction[Row, CRow]
 
 Review comment:
   OK. Makes sense.



[jira] [Commented] (FLINK-10597) Enable UDFs support in MATCH_RECOGNIZE

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702986#comment-16702986
 ] 

ASF GitHub Bot commented on FLINK-10597:


twalthr commented on a change in pull request #7189: [FLINK-10597][table] 
Enabled UDFs support in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7189#discussion_r237428381
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/MatchRecognizeITCase.scala
 ##
 @@ -543,10 +543,79 @@ class MatchRecognizeITCase extends StreamingWithStateTestBase {
     // We do not assert the proctime in the result, cause it is currently
 
 Review comment:
   We just introduced a new rich class. I think it would also be a perfect time 
to think about this shortcoming.



[jira] [Commented] (FLINK-10597) Enable UDFs support in MATCH_RECOGNIZE

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702953#comment-16702953
 ] 

ASF GitHub Bot commented on FLINK-10597:


dawidwys commented on a change in pull request #7189: [FLINK-10597][table] 
Enabled UDFs support in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7189#discussion_r237417851
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/match/PatternSelectFunctionRunner.scala
 ##
 @@ -35,20 +36,26 @@ import org.apache.flink.util.Collector
 class PatternSelectFunctionRunner(
 name: String,
 code: String)
-  extends PatternFlatSelectFunction[Row, CRow]
-  with Compiler[PatternSelectFunction[Row, Row]]
+  extends RichPatternFlatSelectFunction[Row, CRow]
 
 Review comment:
   We need access to the collector to be able to erase the timestamp.
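
A minimal Java sketch of why the flat variant helps, under stated assumptions: `SelectSketch`, `FlatSelectSketch`, `RecordingCollector`, and `FlatWrapperSketch` are hypothetical stand-ins, not Flink classes. A select-style function only returns a value, so it never sees the collector. A flat-select wrapper receives the collector and can therefore clear the timestamp before emitting.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Stand-in for a select function: returns one result and never sees the collector.
interface SelectSketch<IN, OUT> {
    OUT select(Map<String, List<IN>> match);
}

// Stand-in for a flat select function: receives the collector directly.
interface FlatSelectSketch<IN, OUT> {
    void flatSelect(Map<String, List<IN>> match, RecordingCollector<OUT> out);
}

// Hypothetical collector that attaches its current timestamp to emitted records.
final class RecordingCollector<T> {
    final List<String> emitted = new ArrayList<>();
    private Long timestamp;

    void setTimestamp(long ts) {
        timestamp = ts;
    }

    void eraseTimestamp() {
        timestamp = null;
    }

    void collect(T record) {
        emitted.add(record + "@" + timestamp);
    }
}

// Wraps a select function in a flat select function to gain access to the collector.
final class FlatWrapperSketch<IN, OUT> implements FlatSelectSketch<IN, OUT> {
    private final SelectSketch<IN, OUT> inner;

    FlatWrapperSketch(SelectSketch<IN, OUT> inner) {
        this.inner = inner;
    }

    @Override
    public void flatSelect(Map<String, List<IN>> match, RecordingCollector<OUT> out) {
        OUT result = inner.select(match);
        out.eraseTimestamp(); // only possible because the collector is in scope here
        out.collect(result);
    }
}
```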



[jira] [Commented] (FLINK-10597) Enable UDFs support in MATCH_RECOGNIZE

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702952#comment-16702952
 ] 

ASF GitHub Bot commented on FLINK-10597:


dawidwys commented on a change in pull request #7189: [FLINK-10597][table] 
Enabled UDFs support in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7189#discussion_r237417676
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/match/IterativeConditionRunner.scala
 ##
 @@ -31,30 +31,27 @@ import org.apache.flink.types.Row
 class IterativeConditionRunner(
     name: String,
     code: String)
-  extends IterativeCondition[Row]
-  with Compiler[IterativeCondition[Row]]
+  extends RichIterativeCondition[Row]
+  with Compiler[RichIterativeCondition[Row]]
   with Logging {
 
-  @transient private var function: IterativeCondition[Row] = _
+  @transient private var function: RichIterativeCondition[Row] = _
 
-  def init(): Unit = {
+  override def open(parameters: Configuration): Unit = {
     LOG.debug(s"Compiling IterativeCondition: $name \n\n Code:\n$code")
-    // We cannot get user's classloader currently, see FLINK-6938 for details
-    val clazz = compile(Thread.currentThread().getContextClassLoader, name, code)
+    val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
     LOG.debug("Instantiating IterativeCondition.")
     function = clazz.newInstance()
-    // TODO add logic for opening and closing the function once it can be a RichFunction
+    FunctionUtils.setFunctionRuntimeContext(function, getRuntimeContext)
 
 Review comment:
   I don't think there is any particular reason. I just wanted to limit the 
scope as much as possible. If it is important though, I will change that.



[jira] [Commented] (FLINK-10597) Enable UDFs support in MATCH_RECOGNIZE

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702949#comment-16702949
 ] 

ASF GitHub Bot commented on FLINK-10597:


dawidwys commented on a change in pull request #7189: [FLINK-10597][table] 
Enabled UDFs support in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7189#discussion_r237416695
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/MatchRecognizeITCase.scala
 ##
 @@ -543,10 +543,79 @@ class MatchRecognizeITCase extends StreamingWithStateTestBase {
     // We do not assert the proctime in the result, cause it is currently
 
 Review comment:
   Unfortunately not, there is a separate issue for this: 
https://issues.apache.org/jira/browse/FLINK-10596



[jira] [Commented] (FLINK-10597) Enable UDFs support in MATCH_RECOGNIZE

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702940#comment-16702940
 ] 

ASF GitHub Bot commented on FLINK-10597:


dawidwys commented on a change in pull request #7189: [FLINK-10597][table] 
Enabled UDFs support in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7189#discussion_r237414177
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
 ##
 @@ -131,47 +132,39 @@ class MatchCodeGenerator(
 : GeneratedFunction[F, T] = {
 
 Review comment:
   There is one additional obstacle right now: the `reusePatternLists()` call. 
I would remove the comment and keep it separate, as there might be some 
additional differences in the future. What do you think?



[jira] [Commented] (FLINK-10597) Enable UDFs support in MATCH_RECOGNIZE

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702933#comment-16702933
 ] 

ASF GitHub Bot commented on FLINK-10597:


dianfu commented on a change in pull request #7189: [FLINK-10597][table] 
Enabled UDFs support in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7189#discussion_r237407834
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
 ##
 @@ -131,47 +132,39 @@ class MatchCodeGenerator(
 : GeneratedFunction[F, T] = {
 
 Review comment:
   We can already reuse `FunctionCodeGenerator.generateFunction()`, as 
`IterativeCondition` and `Pattern(Flat)SelectFunction` already support rich 
interfaces.



[jira] [Commented] (FLINK-10597) Enable UDFs support in MATCH_RECOGNIZE

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702934#comment-16702934
 ] 

ASF GitHub Bot commented on FLINK-10597:


dianfu commented on a change in pull request #7189: [FLINK-10597][table] 
Enabled UDFs support in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7189#discussion_r237411315
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/match/PatternSelectFunctionRunner.scala
 ##
 @@ -35,20 +36,26 @@ import org.apache.flink.util.Collector
 class PatternSelectFunctionRunner(
 name: String,
 code: String)
-  extends PatternFlatSelectFunction[Row, CRow]
-  with Compiler[PatternSelectFunction[Row, Row]]
+  extends RichPatternFlatSelectFunction[Row, CRow]
 
 Review comment:
   Is there any reason we use a `RichPatternFlatSelectFunction` which actually 
wraps a `RichPatternSelectFunction`?



[jira] [Commented] (FLINK-10597) Enable UDFs support in MATCH_RECOGNIZE

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702894#comment-16702894
 ] 

ASF GitHub Bot commented on FLINK-10597:


twalthr commented on a change in pull request #7189: [FLINK-10597][table] 
Enabled UDFs support in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7189#discussion_r237402448
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/MatchRecognizeITCase.scala
 ##
 @@ -543,10 +543,79 @@ class MatchRecognizeITCase extends StreamingWithStateTestBase {
     // We do not assert the proctime in the result, cause it is currently
     // accessed from System.currentTimeMillis(), so there is no graceful way to assert the proctime
   }
+
+  @Test
+  def testRichUdfs(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setParallelism(1)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    tEnv.getConfig.setMaxGeneratedCodeLength(1)
+    StreamITCase.clear
+
+    val data = new mutable.MutableList[(Int, String, Long)]
+    data.+=((1, "a", 1))
+    data.+=((2, "a", 1))
+    data.+=((3, "a", 1))
+    data.+=((4, "a", 1))
+    data.+=((5, "a", 1))
+    data.+=((6, "b", 1))
+    data.+=((7, "a", 1))
+    data.+=((8, "a", 1))
+    data.+=((9, "f", 1))
+
+    val t = env.fromCollection(data)
+      .toTable(tEnv, 'id, 'name, 'price, 'proctime.proctime)
+    tEnv.registerTable("MyTable", t)
+    tEnv.registerFunction("prefix", new RichScalarFunc)
+    val prefix = "PREF"
+    UserDefinedFunctionTestUtils
+      .setJobParameters(env, Map("prefix" -> prefix))
+
+    val sqlQuery =
+      s"""
+         |SELECT *
+         |FROM MyTable
+         |MATCH_RECOGNIZE (
+         |  ORDER BY proctime
+         |  MEASURES
+         |    FIRST(id) as firstId,
+         |    prefix(A.name) as prefixedNameA,
+         |    LAST(id) as lastId
+         |  AFTER MATCH SKIP PAST LAST ROW
+         |  PATTERN (A+ C)
+         |  DEFINE
+         |    A AS prefix(A.name) = '$prefix:a'
+         |) AS T
+         |""".stripMargin
+
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+    result.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = mutable.MutableList("1,PREF:a,6", "7,PREF:a,9")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
 }
 
 class ToMillis extends ScalarFunction {
   def eval(t: Timestamp): Long = {
     t.toInstant.toEpochMilli + TimeZone.getDefault.getOffset(t.toInstant.toEpochMilli)
   }
 }
+
+private class RichScalarFunc extends ScalarFunction {
 
 Review comment:
   Give this a more meaningful name.



[jira] [Commented] (FLINK-10597) Enable UDFs support in MATCH_RECOGNIZE

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702899#comment-16702899
 ] 

ASF GitHub Bot commented on FLINK-10597:


twalthr commented on a change in pull request #7189: [FLINK-10597][table] 
Enabled UDFs support in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7189#discussion_r237401057
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/MatchRecognizeITCase.scala
 ##
 @@ -543,10 +543,79 @@ class MatchRecognizeITCase extends StreamingWithStateTestBase {
     // We do not assert the proctime in the result, cause it is currently
     // accessed from System.currentTimeMillis(), so there is no graceful way to assert the proctime
   }
+
+  @Test
+  def testRichUdfs(): Unit = {
 
 Review comment:
   nit: There are no `RichUdfs` just `UserDefinedFunctions` ;-)



[jira] [Commented] (FLINK-10597) Enable UDFs support in MATCH_RECOGNIZE

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702896#comment-16702896
 ] 

ASF GitHub Bot commented on FLINK-10597:


twalthr commented on a change in pull request #7189: [FLINK-10597][table] 
Enabled UDFs support in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7189#discussion_r237399713
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/match/IterativeConditionRunner.scala
 ##
 @@ -31,30 +31,27 @@ import org.apache.flink.types.Row
 class IterativeConditionRunner(
     name: String,
     code: String)
-  extends IterativeCondition[Row]
-  with Compiler[IterativeCondition[Row]]
+  extends RichIterativeCondition[Row]
+  with Compiler[RichIterativeCondition[Row]]
   with Logging {
 
-  @transient private var function: IterativeCondition[Row] = _
+  @transient private var function: RichIterativeCondition[Row] = _
 
-  def init(): Unit = {
+  override def open(parameters: Configuration): Unit = {
     LOG.debug(s"Compiling IterativeCondition: $name \n\n Code:\n$code")
-    // We cannot get user's classloader currently, see FLINK-6938 for details
-    val clazz = compile(Thread.currentThread().getContextClassLoader, name, code)
+    val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
     LOG.debug("Instantiating IterativeCondition.")
     function = clazz.newInstance()
-    // TODO add logic for opening and closing the function once it can be a RichFunction
+    FunctionUtils.setFunctionRuntimeContext(function, getRuntimeContext)
+    FunctionUtils.openFunction(function, parameters)
   }
 
   override def filter(value: Row, ctx: IterativeCondition.Context[Row]): Boolean = {
     function.filter(value, ctx)
   }
 
-  @throws(classOf[IOException])
-  private def readObject(in: ObjectInputStream): Unit = {
-    in.defaultReadObject()
-    if (function == null) {
-      init()
-    }
+  override def close(): Unit = {
+    super.close()
+    function.close()
 
 Review comment:
   Use `org.apache.flink.api.common.functions.util.FunctionUtils#closeFunction` 
instead?
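
For illustration, a helper of this kind typically closes a function only if it is a rich function, which keeps call sites free of casts and type checks and mirrors the `openFunction` call used in `open()`. The sketch below uses hypothetical stand-in types (`FunctionSketch`, `RichFunctionSketch`, `LifecycleSketch`, `CountingFunction`) rather than Flink's own classes, and omits checked exceptions for brevity.

```java
// Stand-ins for a plain function and a function with lifecycle hooks.
interface FunctionSketch {
}

interface RichFunctionSketch extends FunctionSketch {
    void open();

    void close();
}

// Hypothetical lifecycle helper: acts only on functions that have lifecycle hooks.
final class LifecycleSketch {
    private LifecycleSketch() {
    }

    static void openFunction(FunctionSketch function) {
        if (function instanceof RichFunctionSketch) {
            ((RichFunctionSketch) function).open();
        }
    }

    static void closeFunction(FunctionSketch function) {
        // instanceof is false for null, so a never-initialized function is skipped.
        if (function instanceof RichFunctionSketch) {
            ((RichFunctionSketch) function).close();
        }
    }
}

// Small rich function that counts lifecycle calls, for demonstration only.
final class CountingFunction implements RichFunctionSketch {
    int opened;
    int closed;

    @Override
    public void open() {
        opened++;
    }

    @Override
    public void close() {
        closed++;
    }
}
```

In this sketch, calling `closeFunction` on a wrapper whose `open()` never ran does nothing, whereas a direct `function.close()` on a null field would throw.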



[jira] [Commented] (FLINK-10597) Enable UDFs support in MATCH_RECOGNIZE

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702898#comment-16702898
 ] 

ASF GitHub Bot commented on FLINK-10597:


twalthr commented on a change in pull request #7189: [FLINK-10597][table] 
Enabled UDFs support in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7189#discussion_r237400562
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/MatchRecognizeITCase.scala
 ##
 @@ -543,10 +543,79 @@ class MatchRecognizeITCase extends StreamingWithStateTestBase {
     // We do not assert the proctime in the result, cause it is currently
 
 Review comment:
   I guess this problem has not been solved with the new rich function?




[jira] [Commented] (FLINK-10597) Enable UDFs support in MATCH_RECOGNIZE

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702895#comment-16702895
 ] 

ASF GitHub Bot commented on FLINK-10597:


twalthr commented on a change in pull request #7189: [FLINK-10597][table] 
Enabled UDFs support in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7189#discussion_r237398248
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/match/IterativeConditionRunner.scala
 ##
 @@ -31,30 +31,27 @@ import org.apache.flink.types.Row
 class IterativeConditionRunner(
     name: String,
     code: String)
-  extends IterativeCondition[Row]
-  with Compiler[IterativeCondition[Row]]
+  extends RichIterativeCondition[Row]
+  with Compiler[RichIterativeCondition[Row]]
   with Logging {
 
-  @transient private var function: IterativeCondition[Row] = _
+  @transient private var function: RichIterativeCondition[Row] = _
 
-  def init(): Unit = {
+  override def open(parameters: Configuration): Unit = {
     LOG.debug(s"Compiling IterativeCondition: $name \n\n Code:\n$code")
-    // We cannot get user's classloader currently, see FLINK-6938 for details
-    val clazz = compile(Thread.currentThread().getContextClassLoader, name, code)
+    val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
     LOG.debug("Instantiating IterativeCondition.")
     function = clazz.newInstance()
-    // TODO add logic for opening and closing the function once it can be a RichFunction
+    FunctionUtils.setFunctionRuntimeContext(function, getRuntimeContext)
 
 Review comment:
   Is there a reason why a distributed cache is not supported in a 
`CepRuntimeContext`? We don't expose many context properties in SQL UDFs but 
the distributed cache apparently seems to be important.
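
The pattern in question, a context wrapper that forwards some properties and rejects others, can be sketched as follows. This is only an illustration under stated assumptions: `SketchContext`, `RestrictedContextSketch`, and `fullContext` are hypothetical names, and the choice of which methods to forward is made up for the example rather than taken from `CepRuntimeContext`.

```java
// Hypothetical stand-in for a runtime context; not Flink's RuntimeContext.
interface SketchContext {
    String getTaskName();
    ClassLoader getUserCodeClassLoader();
    Object getDistributedCache();
}

// Wrapper that exposes only a subset of the underlying context.
final class RestrictedContextSketch implements SketchContext {
    private final SketchContext delegate;

    RestrictedContextSketch(SketchContext delegate) {
        this.delegate = delegate;
    }

    @Override
    public String getTaskName() {
        return delegate.getTaskName(); // forwarded unchanged
    }

    @Override
    public ClassLoader getUserCodeClassLoader() {
        return delegate.getUserCodeClassLoader(); // forwarded unchanged
    }

    @Override
    public Object getDistributedCache() {
        // Rejected in this sketch; forwarding would be a one-line delegation.
        throw new UnsupportedOperationException("Distributed cache is not supported.");
    }

    // Hypothetical full-featured context, used only to drive the example.
    static SketchContext fullContext(String taskName) {
        return new SketchContext() {
            @Override
            public String getTaskName() {
                return taskName;
            }

            @Override
            public ClassLoader getUserCodeClassLoader() {
                return Thread.currentThread().getContextClassLoader();
            }

            @Override
            public Object getDistributedCache() {
                return "cache";
            }
        };
    }

    public static void main(String[] args) {
        SketchContext restricted = new RestrictedContextSketch(fullContext("match"));
        System.out.println(restricted.getTaskName());
        try {
            restricted.getDistributedCache();
        } catch (UnsupportedOperationException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```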




[jira] [Commented] (FLINK-10597) Enable UDFs support in MATCH_RECOGNIZE

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702892#comment-16702892
 ] 

ASF GitHub Bot commented on FLINK-10597:


twalthr commented on a change in pull request #7189: [FLINK-10597][table] 
Enabled UDFs support in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7189#discussion_r237396997
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
 ##
 @@ -131,47 +132,39 @@ class MatchCodeGenerator(
     : GeneratedFunction[F, T] = {
     val funcName = newName(name)
     val collectorTypeTerm = classOf[Collector[Any]].getCanonicalName
-    val (functionClass, signature, inputStatements, isInterface) =
-      if (clazz == classOf[IterativeCondition[_]]) {
-        val baseClass = classOf[IterativeCondition[_]]
+    val (functionClass, signature, inputStatements) =
+      if (clazz == classOf[IterativeCondition[_]] || clazz == classOf[RichIterativeCondition[_]]) {
 
 Review comment:
   Is there a reason to support both interface and rich class? I would keep it 
simple here and only support the rich variant.




[jira] [Commented] (FLINK-10597) Enable UDFs support in MATCH_RECOGNIZE

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702900#comment-16702900
 ] 

ASF GitHub Bot commented on FLINK-10597:


twalthr commented on a change in pull request #7189: [FLINK-10597][table] 
Enabled UDFs support in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7189#discussion_r237402642
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/MatchRecognizeITCase.scala
 ##
 @@ -543,10 +543,79 @@ class MatchRecognizeITCase extends StreamingWithStateTestBase {
     // We do not assert the proctime in the result, cause it is currently
     // accessed from System.currentTimeMillis(), so there is no graceful way to assert the proctime
   }
+
+  @Test
+  def testRichUdfs(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setParallelism(1)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    tEnv.getConfig.setMaxGeneratedCodeLength(1)
+    StreamITCase.clear
+
+    val data = new mutable.MutableList[(Int, String, Long)]
+    data.+=((1, "a", 1))
+    data.+=((2, "a", 1))
+    data.+=((3, "a", 1))
+    data.+=((4, "a", 1))
+    data.+=((5, "a", 1))
+    data.+=((6, "b", 1))
+    data.+=((7, "a", 1))
+    data.+=((8, "a", 1))
+    data.+=((9, "f", 1))
+
+    val t = env.fromCollection(data)
+      .toTable(tEnv, 'id, 'name, 'price, 'proctime.proctime)
+    tEnv.registerTable("MyTable", t)
+    tEnv.registerFunction("prefix", new RichScalarFunc)
+    val prefix = "PREF"
+    UserDefinedFunctionTestUtils
+      .setJobParameters(env, Map("prefix" -> prefix))
+
+    val sqlQuery =
+      s"""
+         |SELECT *
+         |FROM MyTable
+         |MATCH_RECOGNIZE (
+         |  ORDER BY proctime
+         |  MEASURES
+         |    FIRST(id) as firstId,
+         |    prefix(A.name) as prefixedNameA,
+         |    LAST(id) as lastId
+         |  AFTER MATCH SKIP PAST LAST ROW
+         |  PATTERN (A+ C)
+         |  DEFINE
+         |    A AS prefix(A.name) = '$prefix:a'
+         |) AS T
+         |""".stripMargin
+
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+    result.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = mutable.MutableList("1,PREF:a,6", "7,PREF:a,9")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
 }
 
 class ToMillis extends ScalarFunction {
   def eval(t: Timestamp): Long = {
     t.toInstant.toEpochMilli + TimeZone.getDefault.getOffset(t.toInstant.toEpochMilli)
   }
 }
+
+private class RichScalarFunc extends ScalarFunction {
+
+  private var prefix = "ERROR_VALUE"
+
+  override def open(context: FunctionContext): Unit = {
+    prefix = context.getJobParameter("prefix", "")
+  }
+
+  def eval(value: String): String = {
+    s"$prefix:$value"
+  }
+
+  override def close(): Unit = {
+    prefix = "ERROR_VALUE"
 
 Review comment:
   This line has no meaning so far. Remove the entire method?
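
For context, the lifecycle that this test UDF relies on can be sketched in plain Java without Flink. The class below is a hypothetical stand-in: a `Map` replaces Flink's `FunctionContext`, and the method names merely mirror `open` and `eval` from the diff above.

```java
import java.util.Collections;
import java.util.Map;

// Plain-Java stand-in for the test UDF above; the Map replaces Flink's
// FunctionContext and is an assumption made for illustration.
final class PrefixUdfSketch {

    // Sentinel that stays visible in the output if open() is never called.
    private String prefix = "ERROR_VALUE";

    // Mirrors open(): read the "prefix" job parameter, defaulting to "".
    void open(Map<String, String> jobParameters) {
        prefix = jobParameters.getOrDefault("prefix", "");
    }

    // Mirrors eval(): prepend the configured prefix.
    String eval(String value) {
        return prefix + ":" + value;
    }

    public static void main(String[] args) {
        PrefixUdfSketch opened = new PrefixUdfSketch();
        opened.open(Collections.singletonMap("prefix", "PREF"));
        System.out.println(opened.eval("a")); // PREF:a

        PrefixUdfSketch neverOpened = new PrefixUdfSketch();
        System.out.println(neverOpened.eval("a")); // ERROR_VALUE:a
    }
}
```

Because the field starts as a sentinel, the `PREF:a` values asserted in the test can only appear if `open` ran before `eval`. Resetting the field in `close` adds nothing to that check, since nothing reads the field afterwards.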




[jira] [Commented] (FLINK-10597) Enable UDFs support in MATCH_RECOGNIZE

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702893#comment-16702893
 ] 

ASF GitHub Bot commented on FLINK-10597:


twalthr commented on a change in pull request #7189: [FLINK-10597][table] 
Enabled UDFs support in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7189#discussion_r237398634
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/match/IterativeConditionRunner.scala
 ##
 @@ -31,30 +31,27 @@ import org.apache.flink.types.Row
 class IterativeConditionRunner(
     name: String,
     code: String)
-  extends IterativeCondition[Row]
-  with Compiler[IterativeCondition[Row]]
+  extends RichIterativeCondition[Row]
+  with Compiler[RichIterativeCondition[Row]]
   with Logging {
 
-  @transient private var function: IterativeCondition[Row] = _
+  @transient private var function: RichIterativeCondition[Row] = _
 
-  def init(): Unit = {
+  override def open(parameters: Configuration): Unit = {
     LOG.debug(s"Compiling IterativeCondition: $name \n\n Code:\n$code")
-    // We cannot get user's classloader currently, see FLINK-6938 for details
-    val clazz = compile(Thread.currentThread().getContextClassLoader, name, code)
+    val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
     LOG.debug("Instantiating IterativeCondition.")
     function = clazz.newInstance()
-    // TODO add logic for opening and closing the function once it can be a RichFunction
+    FunctionUtils.setFunctionRuntimeContext(function, getRuntimeContext)
+    FunctionUtils.openFunction(function, parameters)
   }
 
   override def filter(value: Row, ctx: IterativeCondition.Context[Row]): Boolean = {
     function.filter(value, ctx)
   }
 
-  @throws(classOf[IOException])
-  private def readObject(in: ObjectInputStream): Unit = {
-    in.defaultReadObject()
-    if (function == null) {
-      init()
-    }
+  override def close(): Unit = {
+    super.close()
 
 Review comment:
   Remove this line?




[jira] [Commented] (FLINK-10597) Enable UDFs support in MATCH_RECOGNIZE

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702897#comment-16702897
 ] 

ASF GitHub Bot commented on FLINK-10597:


twalthr commented on a change in pull request #7189: [FLINK-10597][table] 
Enabled UDFs support in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7189#discussion_r237399504
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/match/PatternSelectFunctionRunner.scala
 ##
 @@ -60,17 +67,9 @@ class PatternSelectFunctionRunner(
     out.collect(outCRow)
   }
 
-  @throws(classOf[IOException])
-  private def readObject(in: ObjectInputStream): Unit = {
-    in.defaultReadObject()
-
-    if (outCRow == null) {
-      outCRow = new CRow(null, true)
-    }
-
-    if (function == null) {
-      init()
-    }
+  override def close(): Unit = {
+    super.close()
 
 Review comment:
   Remove this line?




[jira] [Commented] (FLINK-10597) Enable UDFs support in MATCH_RECOGNIZE

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702901#comment-16702901
 ] 

ASF GitHub Bot commented on FLINK-10597:


twalthr commented on a change in pull request #7189: [FLINK-10597][table] 
Enabled UDFs support in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7189#discussion_r237399688
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/match/PatternSelectFunctionRunner.scala
 ##
 @@ -60,17 +67,9 @@ class PatternSelectFunctionRunner(
     out.collect(outCRow)
   }
 
-  @throws(classOf[IOException])
-  private def readObject(in: ObjectInputStream): Unit = {
-    in.defaultReadObject()
-
-    if (outCRow == null) {
-      outCRow = new CRow(null, true)
-    }
-
-    if (function == null) {
-      init()
-    }
+  override def close(): Unit = {
+    super.close()
+    function.close()
 
 Review comment:
   Use `org.apache.flink.api.common.functions.util.FunctionUtils#closeFunction` 
instead?




[jira] [Commented] (FLINK-10597) Enable UDFs support in MATCH_RECOGNIZE

2018-11-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16701970#comment-16701970
 ] 

ASF GitHub Bot commented on FLINK-10597:


dawidwys opened a new pull request #7189: [FLINK-10597] Enabled UDFs support in 
MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7189
 
 
   ## What is the purpose of the change
   
   This PR enables user-defined scalar functions in `MATCH_RECOGNIZE`.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (yes / **no**)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
     - The serializers: (yes / **no** / don't know)
     - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
     - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (**yes** / no)
     - If yes, how is the feature documented? (not applicable / **docs** / JavaDocs / not documented)
   

