[jira] [Commented] (FLINK-10597) Enable UDFs support in MATCH_RECOGNIZE
[ https://issues.apache.org/jira/browse/FLINK-10597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16704880#comment-16704880 ] ASF GitHub Bot commented on FLINK-10597: dawidwys closed pull request #7189: [FLINK-10597][table] Enabled UDFs support in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7189 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/docs/dev/table/streaming/match_recognize.md b/docs/dev/table/streaming/match_recognize.md index 3cd1ed09cc9..e61d384075e 100644 --- a/docs/dev/table/streaming/match_recognize.md +++ b/docs/dev/table/streaming/match_recognize.md @@ -838,5 +838,4 @@ Unsupported features include: * Physical offsets - `PREV/NEXT`, which indexes all events seen rather than only those that were mapped to a pattern variable(as in [logical offsets](#logical-offsets) case). * Extracting time attributes - there is currently no possibility to get a time attribute for subsequent time-based operations. * Aggregates - one cannot use aggregates in `MEASURES` nor `DEFINE` clauses. -* User defined functions cannot be used within `MATCH_RECOGNIZE`. * `MATCH_RECOGNIZE` is supported only for SQL. There is no equivalent in the Table API. diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CepRuntimeContext.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CepRuntimeContext.java index ff6e610845c..ed93f8e52cd 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CepRuntimeContext.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CepRuntimeContext.java @@ -103,6 +103,11 @@ public ClassLoader getUserCodeClassLoader() { return runtimeContext.getUserCodeClassLoader(); } + @Override + public DistributedCache getDistributedCache() { + return runtimeContext.getDistributedCache(); + } + // --- // Unsupported operations // --- @@ -159,11 +164,6 @@ public boolean hasBroadcastVariable(String name) { throw new UnsupportedOperationException("Broadcast variables are not supported."); } - @Override - public DistributedCache getDistributedCache() { - throw new UnsupportedOperationException("Distributed cache is not supported."); - } - @Override public ValueState getState(ValueStateDescriptor stateProperties) { throw new UnsupportedOperationException("State is not supported."); diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CepRuntimeContextTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CepRuntimeContextTest.java index 6bc4081da54..ef7ee89881e 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CepRuntimeContextTest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CepRuntimeContextTest.java @@ -20,6 +20,7 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.accumulators.Accumulator; +import org.apache.flink.api.common.cache.DistributedCache; import org.apache.flink.api.common.functions.AggregateFunction; import org.apache.flink.api.common.functions.BroadcastVariableInitializer; import org.apache.flink.api.common.functions.FoldFunction; @@ -134,6 +135,7 @@ private void verifyRuntimeContext(final RichFunction function) { final String taskNameWithSubtask = "barfoo"; final ExecutionConfig executionConfig = mock(ExecutionConfig.class); final ClassLoader userCodeClassLoader = mock(ClassLoader.class); + final DistributedCache distributedCache = mock(DistributedCache.class); RuntimeContext mockedRuntimeContext = mock(RuntimeContext.class); @@ -145,6 +147,7 @@ private void verifyRuntimeContext(final RichFunction function) { when(mockedRuntimeContext.getTaskNameWithSubtasks()).thenReturn(taskNameWithSubtask); when(mockedRuntimeContext.getExecutionConfig()).thenReturn(executionConfig); when(mockedRuntimeContext.getUserCodeClassLoader()).thenReturn(userCodeClassLoader); + when(mockedRuntimeContext.getDistributedCache()).thenReturn(distributedCache); function.setRuntimeContext(mockedRuntimeContext); @@ -159,13 +162,7 @@ private void verifyRuntimeContext(final RichFunction function) { assertEquals(taskNameWithSubtask,
[jira] [Commented] (FLINK-10597) Enable UDFs support in MATCH_RECOGNIZE
[ https://issues.apache.org/jira/browse/FLINK-10597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16704405#comment-16704405 ] ASF GitHub Bot commented on FLINK-10597: dawidwys commented on a change in pull request #7189: [FLINK-10597][table] Enabled UDFs support in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7189#discussion_r237778606 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/match/PatternSelectFunctionRunner.scala ## @@ -35,20 +36,26 @@ import org.apache.flink.util.Collector class PatternSelectFunctionRunner( name: String, code: String) - extends PatternFlatSelectFunction[Row, CRow] - with Compiler[PatternSelectFunction[Row, Row]] + extends RichPatternFlatSelectFunction[Row, CRow] Review comment: Agree, had that in mind. I would change that either during introducing the `ALL ROWS PER MATCH` or as a separate issue, as it is not directly connected with this one and requires more changes in the code generator. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Enable UDFs support in MATCH_RECOGNIZE > -- > > Key: FLINK-10597 > URL: https://issues.apache.org/jira/browse/FLINK-10597 > Project: Flink > Issue Type: Sub-task > Components: CEP, Table API SQL >Reporter: Dawid Wysakowicz >Assignee: Dawid Wysakowicz >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10597) Enable UDFs support in MATCH_RECOGNIZE
[ https://issues.apache.org/jira/browse/FLINK-10597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16704135#comment-16704135 ] ASF GitHub Bot commented on FLINK-10597: dianfu commented on a change in pull request #7189: [FLINK-10597][table] Enabled UDFs support in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7189#discussion_r237722370 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/match/PatternSelectFunctionRunner.scala ## @@ -35,20 +36,26 @@ import org.apache.flink.util.Collector class PatternSelectFunctionRunner( name: String, code: String) - extends PatternFlatSelectFunction[Row, CRow] - with Compiler[PatternSelectFunction[Row, Row]] + extends RichPatternFlatSelectFunction[Row, CRow] Review comment: @dawidwys I have rethought about this issue and what about always generating a `PatternFlatSelectFunction` no matter `ONE ROW PER MATCH` or `ALL ROWS PER MATCH` is specified? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Enable UDFs support in MATCH_RECOGNIZE > -- > > Key: FLINK-10597 > URL: https://issues.apache.org/jira/browse/FLINK-10597 > Project: Flink > Issue Type: Sub-task > Components: CEP, Table API SQL >Reporter: Dawid Wysakowicz >Assignee: Dawid Wysakowicz >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10597) Enable UDFs support in MATCH_RECOGNIZE
[ https://issues.apache.org/jira/browse/FLINK-10597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703095#comment-16703095 ] ASF GitHub Bot commented on FLINK-10597: dianfu commented on a change in pull request #7189: [FLINK-10597][table] Enabled UDFs support in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7189#discussion_r237464451 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala ## @@ -131,47 +132,39 @@ class MatchCodeGenerator( : GeneratedFunction[F, T] = { Review comment: We can add something like `reusablePrepareInputStatements` in CodeGenerator and place it before `reuseInputUnboxingCode` when generating the function. Then we can make use of `reusablePrepareInputStatements` to construct the `reusePatternLists`. But I'm fine with the current implementation as `reusablePrepareInputStatements` is currently only useful for CEP. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Enable UDFs support in MATCH_RECOGNIZE > -- > > Key: FLINK-10597 > URL: https://issues.apache.org/jira/browse/FLINK-10597 > Project: Flink > Issue Type: Sub-task > Components: CEP, Table API SQL >Reporter: Dawid Wysakowicz >Assignee: Dawid Wysakowicz >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10597) Enable UDFs support in MATCH_RECOGNIZE
[ https://issues.apache.org/jira/browse/FLINK-10597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703096#comment-16703096 ] ASF GitHub Bot commented on FLINK-10597: dianfu commented on a change in pull request #7189: [FLINK-10597][table] Enabled UDFs support in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7189#discussion_r237464564 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/match/PatternSelectFunctionRunner.scala ## @@ -35,20 +36,26 @@ import org.apache.flink.util.Collector class PatternSelectFunctionRunner( name: String, code: String) - extends PatternFlatSelectFunction[Row, CRow] - with Compiler[PatternSelectFunction[Row, Row]] + extends RichPatternFlatSelectFunction[Row, CRow] Review comment: OK. Make sense. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Enable UDFs support in MATCH_RECOGNIZE > -- > > Key: FLINK-10597 > URL: https://issues.apache.org/jira/browse/FLINK-10597 > Project: Flink > Issue Type: Sub-task > Components: CEP, Table API SQL >Reporter: Dawid Wysakowicz >Assignee: Dawid Wysakowicz >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10597) Enable UDFs support in MATCH_RECOGNIZE
[ https://issues.apache.org/jira/browse/FLINK-10597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702986#comment-16702986 ] ASF GitHub Bot commented on FLINK-10597: twalthr commented on a change in pull request #7189: [FLINK-10597][table] Enabled UDFs support in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7189#discussion_r237428381 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/MatchRecognizeITCase.scala ## @@ -543,10 +543,79 @@ class MatchRecognizeITCase extends StreamingWithStateTestBase { // We do not assert the proctime in the result, cause it is currently Review comment: We just introduced a new rich class. I think it would also be a perfect time to think about this shortcoming. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Enable UDFs support in MATCH_RECOGNIZE > -- > > Key: FLINK-10597 > URL: https://issues.apache.org/jira/browse/FLINK-10597 > Project: Flink > Issue Type: Sub-task > Components: CEP, Table API SQL >Reporter: Dawid Wysakowicz >Assignee: Dawid Wysakowicz >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10597) Enable UDFs support in MATCH_RECOGNIZE
[ https://issues.apache.org/jira/browse/FLINK-10597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702953#comment-16702953 ] ASF GitHub Bot commented on FLINK-10597: dawidwys commented on a change in pull request #7189: [FLINK-10597][table] Enabled UDFs support in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7189#discussion_r237417851 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/match/PatternSelectFunctionRunner.scala ## @@ -35,20 +36,26 @@ import org.apache.flink.util.Collector class PatternSelectFunctionRunner( name: String, code: String) - extends PatternFlatSelectFunction[Row, CRow] - with Compiler[PatternSelectFunction[Row, Row]] + extends RichPatternFlatSelectFunction[Row, CRow] Review comment: We need access to the collector to be able to erase timestamp. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Enable UDFs support in MATCH_RECOGNIZE > -- > > Key: FLINK-10597 > URL: https://issues.apache.org/jira/browse/FLINK-10597 > Project: Flink > Issue Type: Sub-task > Components: CEP, Table API SQL >Reporter: Dawid Wysakowicz >Assignee: Dawid Wysakowicz >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10597) Enable UDFs support in MATCH_RECOGNIZE
[ https://issues.apache.org/jira/browse/FLINK-10597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702952#comment-16702952 ] ASF GitHub Bot commented on FLINK-10597: dawidwys commented on a change in pull request #7189: [FLINK-10597][table] Enabled UDFs support in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7189#discussion_r237417676 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/match/IterativeConditionRunner.scala ## @@ -31,30 +31,27 @@ import org.apache.flink.types.Row class IterativeConditionRunner( name: String, code: String) - extends IterativeCondition[Row] - with Compiler[IterativeCondition[Row]] + extends RichIterativeCondition[Row] + with Compiler[RichIterativeCondition[Row]] with Logging { - @transient private var function: IterativeCondition[Row] = _ + @transient private var function: RichIterativeCondition[Row] = _ - def init(): Unit = { + override def open(parameters: Configuration): Unit = { LOG.debug(s"Compiling IterativeCondition: $name \n\n Code:\n$code") -// We cannot get user's classloader currently, see FLINK-6938 for details -val clazz = compile(Thread.currentThread().getContextClassLoader, name, code) +val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code) LOG.debug("Instantiating IterativeCondition.") function = clazz.newInstance() -// TODO add logic for opening and closing the function once it can be a RichFunction +FunctionUtils.setFunctionRuntimeContext(function, getRuntimeContext) Review comment: I don't think there is any particular reason. Just wanted to maximally limit the scope. If it is important though, I will change that. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Enable UDFs support in MATCH_RECOGNIZE > -- > > Key: FLINK-10597 > URL: https://issues.apache.org/jira/browse/FLINK-10597 > Project: Flink > Issue Type: Sub-task > Components: CEP, Table API SQL >Reporter: Dawid Wysakowicz >Assignee: Dawid Wysakowicz >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10597) Enable UDFs support in MATCH_RECOGNIZE
[ https://issues.apache.org/jira/browse/FLINK-10597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702949#comment-16702949 ] ASF GitHub Bot commented on FLINK-10597: dawidwys commented on a change in pull request #7189: [FLINK-10597][table] Enabled UDFs support in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7189#discussion_r237416695 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/MatchRecognizeITCase.scala ## @@ -543,10 +543,79 @@ class MatchRecognizeITCase extends StreamingWithStateTestBase { // We do not assert the proctime in the result, cause it is currently Review comment: Unfortunately not, there is a separate issue for this: https://issues.apache.org/jira/browse/FLINK-10596 This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Enable UDFs support in MATCH_RECOGNIZE > -- > > Key: FLINK-10597 > URL: https://issues.apache.org/jira/browse/FLINK-10597 > Project: Flink > Issue Type: Sub-task > Components: CEP, Table API SQL >Reporter: Dawid Wysakowicz >Assignee: Dawid Wysakowicz >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10597) Enable UDFs support in MATCH_RECOGNIZE
[ https://issues.apache.org/jira/browse/FLINK-10597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702940#comment-16702940 ] ASF GitHub Bot commented on FLINK-10597: dawidwys commented on a change in pull request #7189: [FLINK-10597][table] Enabled UDFs support in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7189#discussion_r237414177 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala ## @@ -131,47 +132,39 @@ class MatchCodeGenerator( : GeneratedFunction[F, T] = { Review comment: There is one additional obstacle right now. The `reusePatternLists()` call, I would remove the comment and leave it separated, as there might be some additional differences in the future. What do you think? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Enable UDFs support in MATCH_RECOGNIZE > -- > > Key: FLINK-10597 > URL: https://issues.apache.org/jira/browse/FLINK-10597 > Project: Flink > Issue Type: Sub-task > Components: CEP, Table API SQL >Reporter: Dawid Wysakowicz >Assignee: Dawid Wysakowicz >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10597) Enable UDFs support in MATCH_RECOGNIZE
[ https://issues.apache.org/jira/browse/FLINK-10597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702933#comment-16702933 ] ASF GitHub Bot commented on FLINK-10597: dianfu commented on a change in pull request #7189: [FLINK-10597][table] Enabled UDFs support in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7189#discussion_r237407834 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala ## @@ -131,47 +132,39 @@ class MatchCodeGenerator( : GeneratedFunction[F, T] = { Review comment: We can already reuse `FunctionCodeGenerator.generateFunction()` as IterativeCondition and Pattern(Flat)SelectFunction already support rich interfaces. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Enable UDFs support in MATCH_RECOGNIZE > -- > > Key: FLINK-10597 > URL: https://issues.apache.org/jira/browse/FLINK-10597 > Project: Flink > Issue Type: Sub-task > Components: CEP, Table API SQL >Reporter: Dawid Wysakowicz >Assignee: Dawid Wysakowicz >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10597) Enable UDFs support in MATCH_RECOGNIZE
[ https://issues.apache.org/jira/browse/FLINK-10597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702934#comment-16702934 ] ASF GitHub Bot commented on FLINK-10597: dianfu commented on a change in pull request #7189: [FLINK-10597][table] Enabled UDFs support in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7189#discussion_r237411315 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/match/PatternSelectFunctionRunner.scala ## @@ -35,20 +36,26 @@ import org.apache.flink.util.Collector class PatternSelectFunctionRunner( name: String, code: String) - extends PatternFlatSelectFunction[Row, CRow] - with Compiler[PatternSelectFunction[Row, Row]] + extends RichPatternFlatSelectFunction[Row, CRow] Review comment: Is there any reason we use a `RichPatternFlatSelectFunction` which actually wraps a `RichPatternSelectFunction`? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Enable UDFs support in MATCH_RECOGNIZE > -- > > Key: FLINK-10597 > URL: https://issues.apache.org/jira/browse/FLINK-10597 > Project: Flink > Issue Type: Sub-task > Components: CEP, Table API SQL >Reporter: Dawid Wysakowicz >Assignee: Dawid Wysakowicz >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10597) Enable UDFs support in MATCH_RECOGNIZE
[ https://issues.apache.org/jira/browse/FLINK-10597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702894#comment-16702894 ] ASF GitHub Bot commented on FLINK-10597: twalthr commented on a change in pull request #7189: [FLINK-10597][table] Enabled UDFs support in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7189#discussion_r237402448 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/MatchRecognizeITCase.scala ## @@ -543,10 +543,79 @@ class MatchRecognizeITCase extends StreamingWithStateTestBase { // We do not assert the proctime in the result, cause it is currently // accessed from System.currentTimeMillis(), so there is no graceful way to assert the proctime } + + @Test + def testRichUdfs(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +env.setParallelism(1) +val tEnv = TableEnvironment.getTableEnvironment(env) +tEnv.getConfig.setMaxGeneratedCodeLength(1) +StreamITCase.clear + +val data = new mutable.MutableList[(Int, String, Long)] +data.+=((1, "a", 1)) +data.+=((2, "a", 1)) +data.+=((3, "a", 1)) +data.+=((4, "a", 1)) +data.+=((5, "a", 1)) +data.+=((6, "b", 1)) +data.+=((7, "a", 1)) +data.+=((8, "a", 1)) +data.+=((9, "f", 1)) + +val t = env.fromCollection(data) + .toTable(tEnv, 'id, 'name, 'price, 'proctime.proctime) +tEnv.registerTable("MyTable", t) +tEnv.registerFunction("prefix", new RichScalarFunc) +val prefix = "PREF" +UserDefinedFunctionTestUtils + .setJobParameters(env, Map("prefix" -> prefix)) + +val sqlQuery = + s""" + |SELECT * + |FROM MyTable + |MATCH_RECOGNIZE ( + | ORDER BY proctime + | MEASURES + |FIRST(id) as firstId, + |prefix(A.name) as prefixedNameA, + |LAST(id) as lastId + | AFTER MATCH SKIP PAST LAST ROW + | PATTERN (A+ C) + | DEFINE + |A AS prefix(A.name) = '$prefix:a' + |) AS T + |""".stripMargin + +val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row] +result.addSink(new StreamITCase.StringSink[Row]) +env.execute() + +val expected = mutable.MutableList("1,PREF:a,6", "7,PREF:a,9") +assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } } class ToMillis extends ScalarFunction { def eval(t: Timestamp): Long = { t.toInstant.toEpochMilli + TimeZone.getDefault.getOffset(t.toInstant.toEpochMilli) } } + +private class RichScalarFunc extends ScalarFunction { Review comment: Give more meaningful name. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Enable UDFs support in MATCH_RECOGNIZE > -- > > Key: FLINK-10597 > URL: https://issues.apache.org/jira/browse/FLINK-10597 > Project: Flink > Issue Type: Sub-task > Components: CEP, Table API SQL >Reporter: Dawid Wysakowicz >Assignee: Dawid Wysakowicz >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10597) Enable UDFs support in MATCH_RECOGNIZE
[ https://issues.apache.org/jira/browse/FLINK-10597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702899#comment-16702899 ] ASF GitHub Bot commented on FLINK-10597: twalthr commented on a change in pull request #7189: [FLINK-10597][table] Enabled UDFs support in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7189#discussion_r237401057 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/MatchRecognizeITCase.scala ## @@ -543,10 +543,79 @@ class MatchRecognizeITCase extends StreamingWithStateTestBase { // We do not assert the proctime in the result, cause it is currently // accessed from System.currentTimeMillis(), so there is no graceful way to assert the proctime } + + @Test + def testRichUdfs(): Unit = { Review comment: nit: There are no `RichUdfs` just `UserDefinedFunctions` ;-) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Enable UDFs support in MATCH_RECOGNIZE > -- > > Key: FLINK-10597 > URL: https://issues.apache.org/jira/browse/FLINK-10597 > Project: Flink > Issue Type: Sub-task > Components: CEP, Table API SQL >Reporter: Dawid Wysakowicz >Assignee: Dawid Wysakowicz >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10597) Enable UDFs support in MATCH_RECOGNIZE
[ https://issues.apache.org/jira/browse/FLINK-10597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702896#comment-16702896 ] ASF GitHub Bot commented on FLINK-10597: twalthr commented on a change in pull request #7189: [FLINK-10597][table] Enabled UDFs support in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7189#discussion_r237399713 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/match/IterativeConditionRunner.scala ## @@ -31,30 +31,27 @@ import org.apache.flink.types.Row class IterativeConditionRunner( name: String, code: String) - extends IterativeCondition[Row] - with Compiler[IterativeCondition[Row]] + extends RichIterativeCondition[Row] + with Compiler[RichIterativeCondition[Row]] with Logging { - @transient private var function: IterativeCondition[Row] = _ + @transient private var function: RichIterativeCondition[Row] = _ - def init(): Unit = { + override def open(parameters: Configuration): Unit = { LOG.debug(s"Compiling IterativeCondition: $name \n\n Code:\n$code") -// We cannot get user's classloader currently, see FLINK-6938 for details -val clazz = compile(Thread.currentThread().getContextClassLoader, name, code) +val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code) LOG.debug("Instantiating IterativeCondition.") function = clazz.newInstance() -// TODO add logic for opening and closing the function once it can be a RichFunction +FunctionUtils.setFunctionRuntimeContext(function, getRuntimeContext) +FunctionUtils.openFunction(function, parameters) } override def filter(value: Row, ctx: IterativeCondition.Context[Row]): Boolean = { function.filter(value, ctx) } - @throws(classOf[IOException]) - private def readObject(in: ObjectInputStream): Unit = { -in.defaultReadObject() -if (function == null) { - init() -} + override def close(): Unit = { +super.close() +function.close() Review comment: Use `org.apache.flink.api.common.functions.util.FunctionUtils#closeFunction` instead? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Enable UDFs support in MATCH_RECOGNIZE > -- > > Key: FLINK-10597 > URL: https://issues.apache.org/jira/browse/FLINK-10597 > Project: Flink > Issue Type: Sub-task > Components: CEP, Table API SQL >Reporter: Dawid Wysakowicz >Assignee: Dawid Wysakowicz >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10597) Enable UDFs support in MATCH_RECOGNIZE
[ https://issues.apache.org/jira/browse/FLINK-10597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702898#comment-16702898 ] ASF GitHub Bot commented on FLINK-10597: twalthr commented on a change in pull request #7189: [FLINK-10597][table] Enabled UDFs support in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7189#discussion_r237400562 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/MatchRecognizeITCase.scala ## @@ -543,10 +543,79 @@ class MatchRecognizeITCase extends StreamingWithStateTestBase { // We do not assert the proctime in the result, cause it is currently Review comment: I guess this problem has not been solved with the new rich function? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Enable UDFs support in MATCH_RECOGNIZE > -- > > Key: FLINK-10597 > URL: https://issues.apache.org/jira/browse/FLINK-10597 > Project: Flink > Issue Type: Sub-task > Components: CEP, Table API SQL >Reporter: Dawid Wysakowicz >Assignee: Dawid Wysakowicz >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10597) Enable UDFs support in MATCH_RECOGNIZE
[ https://issues.apache.org/jira/browse/FLINK-10597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702895#comment-16702895 ] ASF GitHub Bot commented on FLINK-10597: twalthr commented on a change in pull request #7189: [FLINK-10597][table] Enabled UDFs support in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7189#discussion_r237398248 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/match/IterativeConditionRunner.scala ## @@ -31,30 +31,27 @@ import org.apache.flink.types.Row class IterativeConditionRunner( name: String, code: String) - extends IterativeCondition[Row] - with Compiler[IterativeCondition[Row]] + extends RichIterativeCondition[Row] + with Compiler[RichIterativeCondition[Row]] with Logging { - @transient private var function: IterativeCondition[Row] = _ + @transient private var function: RichIterativeCondition[Row] = _ - def init(): Unit = { + override def open(parameters: Configuration): Unit = { LOG.debug(s"Compiling IterativeCondition: $name \n\n Code:\n$code") -// We cannot get user's classloader currently, see FLINK-6938 for details -val clazz = compile(Thread.currentThread().getContextClassLoader, name, code) +val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code) LOG.debug("Instantiating IterativeCondition.") function = clazz.newInstance() -// TODO add logic for opening and closing the function once it can be a RichFunction +FunctionUtils.setFunctionRuntimeContext(function, getRuntimeContext) Review comment: Is there a reason why a distributed cache is not supported in a `CepRuntimeContext`? We don't expose many context properties in SQL UDFs but the distributed cache apparently seems to be important. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Enable UDFs support in MATCH_RECOGNIZE > -- > > Key: FLINK-10597 > URL: https://issues.apache.org/jira/browse/FLINK-10597 > Project: Flink > Issue Type: Sub-task > Components: CEP, Table API SQL >Reporter: Dawid Wysakowicz >Assignee: Dawid Wysakowicz >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10597) Enable UDFs support in MATCH_RECOGNIZE
[ https://issues.apache.org/jira/browse/FLINK-10597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702892#comment-16702892 ] ASF GitHub Bot commented on FLINK-10597: twalthr commented on a change in pull request #7189: [FLINK-10597][table] Enabled UDFs support in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7189#discussion_r237396997 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala ## @@ -131,47 +132,39 @@ class MatchCodeGenerator( : GeneratedFunction[F, T] = { val funcName = newName(name) val collectorTypeTerm = classOf[Collector[Any]].getCanonicalName -val (functionClass, signature, inputStatements, isInterface) = - if (clazz == classOf[IterativeCondition[_]]) { -val baseClass = classOf[IterativeCondition[_]] +val (functionClass, signature, inputStatements) = + if (clazz == classOf[IterativeCondition[_]] || clazz == classOf[RichIterativeCondition[_]]) { Review comment: Is there a reason to support both interface and rich class? I would keep it simple here and only support the rich variant. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Enable UDFs support in MATCH_RECOGNIZE > -- > > Key: FLINK-10597 > URL: https://issues.apache.org/jira/browse/FLINK-10597 > Project: Flink > Issue Type: Sub-task > Components: CEP, Table API SQL >Reporter: Dawid Wysakowicz >Assignee: Dawid Wysakowicz >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10597) Enable UDFs support in MATCH_RECOGNIZE
[ https://issues.apache.org/jira/browse/FLINK-10597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702900#comment-16702900 ] ASF GitHub Bot commented on FLINK-10597: twalthr commented on a change in pull request #7189: [FLINK-10597][table] Enabled UDFs support in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7189#discussion_r237402642 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/MatchRecognizeITCase.scala ## @@ -543,10 +543,79 @@ class MatchRecognizeITCase extends StreamingWithStateTestBase { // We do not assert the proctime in the result, cause it is currently // accessed from System.currentTimeMillis(), so there is no graceful way to assert the proctime } + + @Test + def testRichUdfs(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +env.setParallelism(1) +val tEnv = TableEnvironment.getTableEnvironment(env) +tEnv.getConfig.setMaxGeneratedCodeLength(1) +StreamITCase.clear + +val data = new mutable.MutableList[(Int, String, Long)] +data.+=((1, "a", 1)) +data.+=((2, "a", 1)) +data.+=((3, "a", 1)) +data.+=((4, "a", 1)) +data.+=((5, "a", 1)) +data.+=((6, "b", 1)) +data.+=((7, "a", 1)) +data.+=((8, "a", 1)) +data.+=((9, "f", 1)) + +val t = env.fromCollection(data) + .toTable(tEnv, 'id, 'name, 'price, 'proctime.proctime) +tEnv.registerTable("MyTable", t) +tEnv.registerFunction("prefix", new RichScalarFunc) +val prefix = "PREF" +UserDefinedFunctionTestUtils + .setJobParameters(env, Map("prefix" -> prefix)) + +val sqlQuery = + s""" + |SELECT * + |FROM MyTable + |MATCH_RECOGNIZE ( + | ORDER BY proctime + | MEASURES + |FIRST(id) as firstId, + |prefix(A.name) as prefixedNameA, + |LAST(id) as lastId + | AFTER MATCH SKIP PAST LAST ROW + | PATTERN (A+ C) + | DEFINE + |A AS prefix(A.name) = '$prefix:a' + |) AS T + |""".stripMargin + +val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row] +result.addSink(new StreamITCase.StringSink[Row]) +env.execute() + +val expected = mutable.MutableList("1,PREF:a,6", "7,PREF:a,9") +assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } } class ToMillis extends ScalarFunction { def eval(t: Timestamp): Long = { t.toInstant.toEpochMilli + TimeZone.getDefault.getOffset(t.toInstant.toEpochMilli) } } + +private class RichScalarFunc extends ScalarFunction { + + private var prefix = "ERROR_VALUE" + + override def open(context: FunctionContext): Unit = { +prefix = context.getJobParameter("prefix", "") + } + + def eval(value: String): String = { +s"$prefix:$value" + } + + override def close(): Unit = { +prefix = "ERROR_VALUE" Review comment: This line has no meaning so far. Remove the entire method? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Enable UDFs support in MATCH_RECOGNIZE > -- > > Key: FLINK-10597 > URL: https://issues.apache.org/jira/browse/FLINK-10597 > Project: Flink > Issue Type: Sub-task > Components: CEP, Table API SQL >Reporter: Dawid Wysakowicz >Assignee: Dawid Wysakowicz >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10597) Enable UDFs support in MATCH_RECOGNIZE
[ https://issues.apache.org/jira/browse/FLINK-10597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702893#comment-16702893 ] ASF GitHub Bot commented on FLINK-10597: twalthr commented on a change in pull request #7189: [FLINK-10597][table] Enabled UDFs support in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7189#discussion_r237398634 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/match/IterativeConditionRunner.scala ## @@ -31,30 +31,27 @@ import org.apache.flink.types.Row class IterativeConditionRunner( name: String, code: String) - extends IterativeCondition[Row] - with Compiler[IterativeCondition[Row]] + extends RichIterativeCondition[Row] + with Compiler[RichIterativeCondition[Row]] with Logging { - @transient private var function: IterativeCondition[Row] = _ + @transient private var function: RichIterativeCondition[Row] = _ - def init(): Unit = { + override def open(parameters: Configuration): Unit = { LOG.debug(s"Compiling IterativeCondition: $name \n\n Code:\n$code") -// We cannot get user's classloader currently, see FLINK-6938 for details -val clazz = compile(Thread.currentThread().getContextClassLoader, name, code) +val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code) LOG.debug("Instantiating IterativeCondition.") function = clazz.newInstance() -// TODO add logic for opening and closing the function once it can be a RichFunction +FunctionUtils.setFunctionRuntimeContext(function, getRuntimeContext) +FunctionUtils.openFunction(function, parameters) } override def filter(value: Row, ctx: IterativeCondition.Context[Row]): Boolean = { function.filter(value, ctx) } - @throws(classOf[IOException]) - private def readObject(in: ObjectInputStream): Unit = { -in.defaultReadObject() -if (function == null) { - init() -} + override def close(): Unit = { +super.close() Review comment: Remove this line? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Enable UDFs support in MATCH_RECOGNIZE > -- > > Key: FLINK-10597 > URL: https://issues.apache.org/jira/browse/FLINK-10597 > Project: Flink > Issue Type: Sub-task > Components: CEP, Table API SQL >Reporter: Dawid Wysakowicz >Assignee: Dawid Wysakowicz >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10597) Enable UDFs support in MATCH_RECOGNIZE
[ https://issues.apache.org/jira/browse/FLINK-10597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702897#comment-16702897 ] ASF GitHub Bot commented on FLINK-10597: twalthr commented on a change in pull request #7189: [FLINK-10597][table] Enabled UDFs support in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7189#discussion_r237399504 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/match/PatternSelectFunctionRunner.scala ## @@ -60,17 +67,9 @@ class PatternSelectFunctionRunner( out.collect(outCRow) } - @throws(classOf[IOException]) - private def readObject(in: ObjectInputStream): Unit = { -in.defaultReadObject() - -if (outCRow == null) { - outCRow = new CRow(null, true) -} - -if (function == null) { - init() -} + override def close(): Unit = { +super.close() Review comment: Remove this line? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Enable UDFs support in MATCH_RECOGNIZE > -- > > Key: FLINK-10597 > URL: https://issues.apache.org/jira/browse/FLINK-10597 > Project: Flink > Issue Type: Sub-task > Components: CEP, Table API SQL >Reporter: Dawid Wysakowicz >Assignee: Dawid Wysakowicz >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10597) Enable UDFs support in MATCH_RECOGNIZE
[ https://issues.apache.org/jira/browse/FLINK-10597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702901#comment-16702901 ] ASF GitHub Bot commented on FLINK-10597: twalthr commented on a change in pull request #7189: [FLINK-10597][table] Enabled UDFs support in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7189#discussion_r237399688 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/match/PatternSelectFunctionRunner.scala ## @@ -60,17 +67,9 @@ class PatternSelectFunctionRunner( out.collect(outCRow) } - @throws(classOf[IOException]) - private def readObject(in: ObjectInputStream): Unit = { -in.defaultReadObject() - -if (outCRow == null) { - outCRow = new CRow(null, true) -} - -if (function == null) { - init() -} + override def close(): Unit = { +super.close() +function.close() Review comment: Use `org.apache.flink.api.common.functions.util.FunctionUtils#closeFunction` instead? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Enable UDFs support in MATCH_RECOGNIZE > -- > > Key: FLINK-10597 > URL: https://issues.apache.org/jira/browse/FLINK-10597 > Project: Flink > Issue Type: Sub-task > Components: CEP, Table API SQL >Reporter: Dawid Wysakowicz >Assignee: Dawid Wysakowicz >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10597) Enable UDFs support in MATCH_RECOGNIZE
[ https://issues.apache.org/jira/browse/FLINK-10597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16701970#comment-16701970 ] ASF GitHub Bot commented on FLINK-10597: dawidwys opened a new pull request #7189: [FLINK-10597] Enabled UDFs support in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7189 ## What is the purpose of the change This PR enables user defined scalar function in MATCH_RECOGNIZE ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (**yes** / no) - If yes, how is the feature documented? (not applicable / **docs** / JavaDocs / not documented) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Enable UDFs support in MATCH_RECOGNIZE > -- > > Key: FLINK-10597 > URL: https://issues.apache.org/jira/browse/FLINK-10597 > Project: Flink > Issue Type: Sub-task > Components: CEP, Table API SQL >Reporter: Dawid Wysakowicz >Assignee: Dawid Wysakowicz >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)