[GitHub] flink pull request: [FLINK-3678] Make Flink logs directory configu...
Github user stefanobaghino commented on a diff in the pull request: https://github.com/apache/flink/pull/1837#discussion_r57986135 --- Diff: docs/setup/config.md --- @@ -275,6 +275,10 @@ For example when running Flink on YARN on an environment with a restrictive fire - `recovery.job.delay`: (Default 'akka.ask.timeout') Defines the delay before persisted jobs are recovered in case of a recovery situation. +## Environment + +- `env.log.dir`: (Defaults to the `log` directory under Flink's home) Defines the directory where the Flink logs are saved. --- End diff -- I did some research, but checking for a valid path across all possible operating systems and filesystems would be prohibitive; I guess we'll have to settle for the documentation at this time. :smiley: Thanks for pointing it out! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
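A full cross-platform path-validity check may be prohibitive, as noted above, but a partial startup check is cheap with `java.nio.file.Path`, which already abstracts over platform path syntax. The helper below is a hypothetical sketch of such a check (the method name and class are illustrative, not part of the actual pull request):

```java
import java.nio.file.InvalidPathException;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper illustrating the kind of check discussed in the thread:
// reject syntactically malformed or relative values for a log-directory setting.
public class LogDirCheck {
    static boolean isUsableLogDir(String value) {
        try {
            Path p = Paths.get(value);   // throws InvalidPathException on malformed input
            return p.isAbsolute();       // the thread notes only absolute paths are accepted
        } catch (InvalidPathException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isUsableLogDir("/var/log/flink")); // true on POSIX systems
        System.out.println(isUsableLogDir("log"));            // false: relative path
    }
}
```

A shell-level equivalent in `config.sh` could simply test whether the configured value starts with `/`; the Java version is shown here only to make the semantics precise.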
[jira] [Commented] (FLINK-3678) Make Flink logs directory configurable
[ https://issues.apache.org/jira/browse/FLINK-3678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15219122#comment-15219122 ] ASF GitHub Bot commented on FLINK-3678: --- Github user stefanobaghino commented on a diff in the pull request: https://github.com/apache/flink/pull/1837#discussion_r57986135 --- Diff: docs/setup/config.md --- @@ -275,6 +275,10 @@ For example when running Flink on YARN on an environment with a restrictive fire - `recovery.job.delay`: (Default 'akka.ask.timeout') Defines the delay before persisted jobs are recovered in case of a recovery situation. +## Environment + +- `env.log.dir`: (Defaults to the `log` directory under Flink's home) Defines the directory where the Flink logs are saved. --- End diff -- I did some research, but checking for a valid path across all possible operating systems and filesystems would be prohibitive; I guess we'll have to settle for the documentation at this time. :smiley: Thanks for pointing it out! > Make Flink logs directory configurable > -- > > Key: FLINK-3678 > URL: https://issues.apache.org/jira/browse/FLINK-3678 > Project: Flink > Issue Type: Improvement > Components: Start-Stop Scripts >Affects Versions: 1.0.0 >Reporter: Stefano Baghino >Assignee: Stefano Baghino >Priority: Minor > Fix For: 1.0.1 > > > Currently Flink logs are stored under {{$FLINK_HOME/log}} and the user cannot > configure an alternative storage location. It would be nice to add a > configuration key in the {{flink-conf.yaml}} and edit the {{bin/flink}} > launch script accordingly to get the value if present or default to the > current behavior if no value is provided. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3678) Make Flink logs directory configurable
[ https://issues.apache.org/jira/browse/FLINK-3678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15218761#comment-15218761 ] ASF GitHub Bot commented on FLINK-3678: --- Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/1837#discussion_r57955999 --- Diff: docs/setup/config.md --- @@ -275,6 +275,10 @@ For example when running Flink on YARN on an environment with a restrictive fire - `recovery.job.delay`: (Default 'akka.ask.timeout') Defines the delay before persisted jobs are recovered in case of a recovery situation. +## Environment + +- `env.log.dir`: (Defaults to the `log` directory under Flink's home) Defines the directory where the Flink logs are saved. --- End diff -- Yes, adding a comment would be very good! A check even better. :+1: > Make Flink logs directory configurable > -- > > Key: FLINK-3678 > URL: https://issues.apache.org/jira/browse/FLINK-3678 > Project: Flink > Issue Type: Improvement > Components: Start-Stop Scripts >Affects Versions: 1.0.0 >Reporter: Stefano Baghino >Assignee: Stefano Baghino >Priority: Minor > Fix For: 1.0.1 > > > Currently Flink logs are stored under {{$FLINK_HOME/log}} and the user cannot > configure an alternative storage location. It would be nice to add a > configuration key in the {{flink-conf.yaml}} and edit the {{bin/flink}} > launch script accordingly to get the value if present or default to the > current behavior if no value is provided.
[GitHub] flink pull request: [FLINK-3678] Make Flink logs directory configu...
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/1837#discussion_r57955999 --- Diff: docs/setup/config.md --- @@ -275,6 +275,10 @@ For example when running Flink on YARN on an environment with a restrictive fire - `recovery.job.delay`: (Default 'akka.ask.timeout') Defines the delay before persisted jobs are recovered in case of a recovery situation. +## Environment + +- `env.log.dir`: (Defaults to the `log` directory under Flink's home) Defines the directory where the Flink logs are saved. --- End diff -- Yes, adding a comment would be very good! A check even better. :+1:
[jira] [Commented] (FLINK-3579) Improve String concatenation
[ https://issues.apache.org/jira/browse/FLINK-3579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15218417#comment-15218417 ] ASF GitHub Bot commented on FLINK-3579: --- Github user ramkrish86 commented on a diff in the pull request: https://github.com/apache/flink/pull/1821#discussion_r57930954 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala --- @@ -59,6 +59,8 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers { Literal(str.toInt) } else if (str.endsWith("f") | str.endsWith("F")) { Literal(str.toFloat) +} else if (str.endsWith(".")) { --- End diff -- So in that sense what is the valid number expression? I cannot say 1.CAST(STRING) then? If that is not valid then 1.abs() is also not valid right? > Improve String concatenation > > > Key: FLINK-3579 > URL: https://issues.apache.org/jira/browse/FLINK-3579 > Project: Flink > Issue Type: Bug > Components: Table API >Reporter: Timo Walther >Assignee: ramkrishna.s.vasudevan >Priority: Minor > > Concatenation of a String and non-String does not work properly. > e.g. {{f0 + 42}} leads to RelBuilder Exception > ExpressionParser does not like {{f0 + 42.cast(STRING)}} either.
[jira] [Commented] (FLINK-3664) Create a method to easily Summarize a DataSet
[ https://issues.apache.org/jira/browse/FLINK-3664?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15218418#comment-15218418 ] Todd Lisonbee commented on FLINK-3664: -- I've completed a first pass for this implementation and would like any early feedback, https://github.com/tlisonbee/flink/commit/2a7ad55d704bd3188ea8ae4cbfb7f40319474eef (the important changes you might want to look at are in Aggregator, NumericSummaryAggregator, and DataSetUtils) My "to do" list before submitting pull request: - Blanket the code with comments, unit tests, and integration tests - Incorporate any early feedback Tasks I was planning on doing under a follow-on JIRA (not part of initial pull request): - Add support for more data types (unless any others seem like must-have, I can do now) - Add a summarize() method for GroupedDataSets Thanks. > Create a method to easily Summarize a DataSet > - > > Key: FLINK-3664 > URL: https://issues.apache.org/jira/browse/FLINK-3664 > Project: Flink > Issue Type: Improvement >Reporter: Todd Lisonbee > Attachments: DataSet-Summary-Design-March2016-v1.txt > > > Here is an example: > {code} > /** > * Summarize a DataSet of Tuples by collecting single pass statistics for all > columns > */ > public Tuple summarize() > Dataset> input = // [...] > Tuple3 summary > = input.summarize() > summary.getField(0).stddev() > summary.getField(1).maxStringLength() > {code}
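The "single pass statistics" the design above proposes can be sketched in plain Java. The class and method names below are illustrative only (they do not reflect the actual `NumericSummaryAggregator` code in the linked commit); variance is accumulated with Welford's online algorithm so that one scan suffices:

```java
// Hedged sketch of a single-pass numeric summary in the spirit of the proposal:
// one scan over the data yields min, max, mean and sample standard deviation.
public class NumericSummary {
    long count;
    double min = Double.POSITIVE_INFINITY, max = Double.NEGATIVE_INFINITY;
    double mean, m2; // running mean and sum of squared deviations (Welford)

    void aggregate(double x) {
        count++;
        min = Math.min(min, x);
        max = Math.max(max, x);
        double delta = x - mean;
        mean += delta / count;          // incremental mean update
        m2 += delta * (x - mean);       // incremental variance accumulator
    }

    double stddev() { return count > 1 ? Math.sqrt(m2 / (count - 1)) : 0.0; }

    public static void main(String[] args) {
        NumericSummary s = new NumericSummary();
        for (double x : new double[]{1, 2, 3, 4}) s.aggregate(x);
        System.out.println(s.min + " " + s.max + " " + s.mean); // 1.0 4.0 2.5
    }
}
```

An aggregator of this shape combines naturally per column, which is what would let `summarize()` return one summary field per tuple position.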
[GitHub] flink pull request: FLINK-3579 Improve String concatenation (Ram)
Github user ramkrish86 commented on a diff in the pull request: https://github.com/apache/flink/pull/1821#discussion_r57930954 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala --- @@ -59,6 +59,8 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers { Literal(str.toInt) } else if (str.endsWith("f") | str.endsWith("F")) { Literal(str.toFloat) +} else if (str.endsWith(".")) { --- End diff -- So in that sense what is the valid number expression? I cannot say 1.CAST(STRING) then? If that is not valid then 1.abs() is also not valid right?
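For readers following the suffix dispatch in the quoted Scala diff, here is a hedged Java analogue (illustrative only, not Flink's actual parser) of the idea under discussion: a trailing `.` can be read as the start of a method call such as `1.abs()` rather than as part of a broken decimal literal:

```java
// Illustrative classifier mirroring the suffix checks in the ExpressionParser
// snippet above. The category names are made up for this sketch.
public class LiteralClassifier {
    static String classify(String str) {
        if (str.endsWith("l") || str.endsWith("L")) return "long";
        if (str.endsWith("f") || str.endsWith("F")) return "float";
        if (str.endsWith(".")) return "int-with-method-call"; // e.g. "1." from "1.abs()"
        if (str.contains(".")) return "double";
        return "int";
    }

    public static void main(String[] args) {
        System.out.println(classify("42L"));  // long
        System.out.println(classify("42.3")); // double
        System.out.println(classify("1."));   // int-with-method-call
    }
}
```

This also frames the question in the comment: once `1.` is treated as "integer followed by a call", `1.abs()` and `1.cast(STRING)` should either both parse or both fail.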
[jira] [Commented] (FLINK-3682) CEP operator does not set the processing timestamp correctly
[ https://issues.apache.org/jira/browse/FLINK-3682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15218358#comment-15218358 ] ASF GitHub Bot commented on FLINK-3682: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/1841 > CEP operator does not set the processing timestamp correctly > > > Key: FLINK-3682 > URL: https://issues.apache.org/jira/browse/FLINK-3682 > Project: Flink > Issue Type: Bug > Components: CEP >Affects Versions: 1.0.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Fix For: 1.1.0, 1.0.1 > > > In the wake of reworking the timestamp assignment where the processing > timestamp has to be set now by the {{StreamOperator}}, the CEP operators have > not been adapted. This causes that the timestamp value assigned to the > {{StreamRecord}} is used. In case of processing time this is > {{Long.MIN_VALUE}}. In combination with a CEP time window, this can lead to > an underflow in the NFA where the window time is subtracted from the current > timestamp value.
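The underflow described in the issue is easy to reproduce in isolation; the window length below is an arbitrary illustrative value, not taken from the actual CEP code:

```java
// Demonstrates the underflow from the issue: subtracting a window duration
// from Long.MIN_VALUE (the unset processing-time timestamp) wraps around to
// a large positive value instead of going further negative.
public class UnderflowDemo {
    public static void main(String[] args) {
        long timestamp = Long.MIN_VALUE; // timestamp of a StreamRecord in processing time
        long windowTime = 1000L;         // illustrative CEP window length in ms
        long lowerBound = timestamp - windowTime;
        System.out.println(lowerBound > 0); // true: wrapped past Long.MAX_VALUE
    }
}
```

Any window-pruning comparison against such a wrapped lower bound would then be silently wrong, which is what the fix addresses by setting the processing timestamp in the operator.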
[jira] [Commented] (FLINK-3681) CEP library does not support Java 8 lambdas as select function
[ https://issues.apache.org/jira/browse/FLINK-3681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15218357#comment-15218357 ] ASF GitHub Bot commented on FLINK-3681: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/1840 > CEP library does not support Java 8 lambdas as select function > -- > > Key: FLINK-3681 > URL: https://issues.apache.org/jira/browse/FLINK-3681 > Project: Flink > Issue Type: Bug > Components: CEP >Affects Versions: 1.0.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Minor > Fix For: 1.1.0, 1.0.1 > > > Currently, the CEP library does not support Java 8 lambdas to be used as > {{select}} or {{flatSelect}} function. The problem is that the > {{TypeExtractor}} has different semantics when calling > {{TypeExtractor.getUnaryOperatorReturnType}} either with a Java 8 lambda or > an instance of an UDF function. > To illustrate the problem assume we have the following UDF function > {code} > public interface MyFunction[T, O] { > O foobar(Map inputElements); > } > {code} > When calling the {{TypeExtractor}} with an anonymous class which implements > this interface, the first type parameter is considered being the input type > of the function, namely {{T}}. > In contrast, when providing a Java 8 lambda for this interface, the > {{TypeExtractor}} will see an input type of {{Map }}. > This problem also occurs with a {{FlatMapFunction}} whose first type argument > is {{T}} but whose first parameter of a Java 8 lambda is {{Iterable}}. In > order to solve the problem here, the > {{TypeExtractor.getUnaryOperatorReturnType}} method has the parameters > {{hasIterable}} and {{hasCollector}}. If these values are {{true}}, then a > special code path is taken (in case of a Java 8 lambda), where the input type > is compared to the first type argument of the first input parameter of the > lambda (here an {{Iterable}}). This hand-knitted solution does not > generalize well, as it will fail for all parameterized types which have the > input type at a different position (e.g. {{Map }}. > In order to solve the problem, I propose to generalize the > {{getUnaryOperatorReturnType}} a little bit so that one can specify at which > position the input type is specified by a parameterized type.
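The indexing problem described above can be illustrated with plain Java reflection. The interface and helper below are hypothetical stand-ins for this sketch, not Flink's actual `TypeExtractor` API: for a parameter of type `Map<String, Integer>` there is no single "the" type argument, which is why the proposal is to pass the position explicitly:

```java
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Map;

// Minimal reflection sketch of why an index into a parameterized parameter is
// needed: the input type of foobar could be hiding at position 0 or position 1.
public class TypeArgumentDemo {
    interface MyFunction {
        Integer foobar(Map<String, Integer> inputElements);
    }

    static Type extractTypeArgument(Type t, int index) {
        return ((ParameterizedType) t).getActualTypeArguments()[index];
    }

    static Type argumentOfFoobar(int index) {
        try {
            Method m = MyFunction.class.getMethod("foobar", Map.class);
            Type param = m.getGenericParameterTypes()[0]; // Map<String, Integer>
            return extractTypeArgument(param, index);
        } catch (NoSuchMethodException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(argumentOfFoobar(0)); // class java.lang.String
        System.out.println(argumentOfFoobar(1)); // class java.lang.Integer
    }
}
```

A hard-coded "unwrap the first type argument" rule (the `hasIterable` path) picks position 0 unconditionally; parameterizing the position generalizes it to shapes like the `Map` above.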
[GitHub] flink pull request: [FLINK-3682] [cep] Assign processing timestamp...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/1841
[GitHub] flink pull request: [FLINK-3681] [cep, typeextractor] Generalize T...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/1840
[GitHub] flink pull request: FLINK-3579 Improve String concatenation (Ram)
Github user ramkrish86 commented on the pull request: https://github.com/apache/flink/pull/1821#issuecomment-203530162 Sorry for the wrong observation. I can see that 'a + 42l.abs().cast(STRING)' works fine. But 'a + 42.3.abs().cast(STRING)' does not work. It fails here ` case DOUBLE => val decimal = BigDecimal(value.asInstanceOf[java.math.BigDecimal]) if (decimal.isValidDouble) { generateNonNullLiteral(resultType, decimal.doubleValue().toString) } else { throw new CodeGenException("Decimal can not be converted to double.") }`
[jira] [Commented] (FLINK-3579) Improve String concatenation
[ https://issues.apache.org/jira/browse/FLINK-3579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15218329#comment-15218329 ] ASF GitHub Bot commented on FLINK-3579: --- Github user ramkrish86 commented on the pull request: https://github.com/apache/flink/pull/1821#issuecomment-203530162 Sorry for the wrong observation. I can see that 'a + 42l.abs().cast(STRING)' works fine. But 'a + 42.3.abs().cast(STRING)' does not work. It fails here ` case DOUBLE => val decimal = BigDecimal(value.asInstanceOf[java.math.BigDecimal]) if (decimal.isValidDouble) { generateNonNullLiteral(resultType, decimal.doubleValue().toString) } else { throw new CodeGenException("Decimal can not be converted to double.") }` > Improve String concatenation > > > Key: FLINK-3579 > URL: https://issues.apache.org/jira/browse/FLINK-3579 > Project: Flink > Issue Type: Bug > Components: Table API >Reporter: Timo Walther >Assignee: ramkrishna.s.vasudevan >Priority: Minor > > Concatenation of a String and non-String does not work properly. > e.g. {{f0 + 42}} leads to RelBuilder Exception > ExpressionParser does not like {{f0 + 42.cast(STRING)}} either.
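The failing branch quoted above hinges on Scala's `BigDecimal.isValidDouble`, which, to the best of my reading, is true only when the value round-trips through a `double` exactly. A Java sketch of an equivalent check (my own formulation, not the Flink code) shows why `42.3` is rejected even though it comfortably fits a double's range:

```java
import java.math.BigDecimal;

// Round-trip check: a decimal is "a valid double" only if converting it to
// double and back yields the same numeric value. 42.3 has no exact binary
// representation, so it fails; 0.5 is an exact binary fraction, so it passes.
public class ValidDoubleCheck {
    static boolean isValidDouble(BigDecimal decimal) {
        double d = decimal.doubleValue();
        return !Double.isInfinite(d) && new BigDecimal(d).compareTo(decimal) == 0;
    }

    public static void main(String[] args) {
        System.out.println(isValidDouble(new BigDecimal("0.5")));  // true
        System.out.println(isValidDouble(new BigDecimal("42.3"))); // false
    }
}
```

This suggests the code-gen branch needs a tolerance for inexact-but-representable literals rather than an exact round-trip test, which would explain why `42.3` triggers the `CodeGenException`.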
[jira] [Commented] (FLINK-3579) Improve String concatenation
[ https://issues.apache.org/jira/browse/FLINK-3579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15218281#comment-15218281 ] ASF GitHub Bot commented on FLINK-3579: --- Github user ramkrish86 commented on a diff in the pull request: https://github.com/apache/flink/pull/1821#discussion_r57921166 --- Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java --- @@ -121,6 +120,66 @@ public void testNonWorkingSubstring2() throws Exception { resultSet.collect(); } + @Test + public void testStringConcat() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + TableEnvironment tableEnv = new TableEnvironment(); + + DataSet> ds = env.fromElements( + new Tuple2<>("ABCD", 3), + new Tuple2<>("ABCD", 2)); + + Table in = tableEnv.fromDataSet(ds, "a, b"); + + Table result = in + .select("a + b + 42"); --- End diff -- `What happens if you do "42 + a" or even "42 + b + a"?` Am able to make the above work. `a + 42.abs().cast(STRING)` I think as you said 42.abs() also does not work leave alone the .CAST added to it. abs() does not work because it is not added in the ExpressionParser.scala itself. Reading the comment on ImplicitExpressionOperations i can see that the expressionDsl.scala and ExpressionParser should be in sync. You want me to add support for abs in this patch only? Let me know. > Improve String concatenation > > > Key: FLINK-3579 > URL: https://issues.apache.org/jira/browse/FLINK-3579 > Project: Flink > Issue Type: Bug > Components: Table API >Reporter: Timo Walther >Assignee: ramkrishna.s.vasudevan >Priority: Minor > > Concatenation of a String and non-String does not work properly. > e.g. {{f0 + 42}} leads to RelBuilder Exception > ExpressionParser does not like {{f0 + 42.cast(STRING)}} either.
[GitHub] flink pull request: FLINK-3579 Improve String concatenation (Ram)
Github user ramkrish86 commented on a diff in the pull request: https://github.com/apache/flink/pull/1821#discussion_r57921166 --- Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java --- @@ -121,6 +120,66 @@ public void testNonWorkingSubstring2() throws Exception { resultSet.collect(); } + @Test + public void testStringConcat() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + TableEnvironment tableEnv = new TableEnvironment(); + + DataSet> ds = env.fromElements( + new Tuple2<>("ABCD", 3), + new Tuple2<>("ABCD", 2)); + + Table in = tableEnv.fromDataSet(ds, "a, b"); + + Table result = in + .select("a + b + 42"); --- End diff -- `What happens if you do "42 + a" or even "42 + b + a"?` Am able to make the above work. `a + 42.abs().cast(STRING)` I think as you said 42.abs() also does not work leave alone the .CAST added to it. abs() does not work because it is not added in the ExpressionParser.scala itself. Reading the comment on ImplicitExpressionOperations i can see that the expressionDsl.scala and ExpressionParser should be in sync. You want me to add support for abs in this patch only? Let me know.
[jira] [Commented] (FLINK-3682) CEP operator does not set the processing timestamp correctly
[ https://issues.apache.org/jira/browse/FLINK-3682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15218257#comment-15218257 ] ASF GitHub Bot commented on FLINK-3682: --- Github user uce commented on the pull request: https://github.com/apache/flink/pull/1841#issuecomment-203513525 Till's local Travis passed (https://travis-ci.org/tillrohrmann/flink/builds/119528848) with one unrelated failing build. I'm going to merge this. > CEP operator does not set the processing timestamp correctly > > > Key: FLINK-3682 > URL: https://issues.apache.org/jira/browse/FLINK-3682 > Project: Flink > Issue Type: Bug > Components: CEP >Affects Versions: 1.0.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Fix For: 1.1.0, 1.0.1 > > > In the wake of reworking the timestamp assignment where the processing > timestamp has to be set now by the {{StreamOperator}}, the CEP operators have > not been adapted. This causes that the timestamp value assigned to the > {{StreamRecord}} is used. In case of processing time this is > {{Long.MIN_VALUE}}. In combination with a CEP time window, this can lead to > an underflow in the NFA where the window time is subtracted from the current > timestamp value.
[GitHub] flink pull request: [FLINK-3682] [cep] Assign processing timestamp...
Github user uce commented on the pull request: https://github.com/apache/flink/pull/1841#issuecomment-203513525 Till's local Travis passed (https://travis-ci.org/tillrohrmann/flink/builds/119528848) with one unrelated failing build. I'm going to merge this.
[jira] [Commented] (FLINK-3678) Make Flink logs directory configurable
[ https://issues.apache.org/jira/browse/FLINK-3678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15218198#comment-15218198 ] ASF GitHub Bot commented on FLINK-3678: --- Github user stefanobaghino commented on a diff in the pull request: https://github.com/apache/flink/pull/1837#discussion_r57913426 --- Diff: docs/setup/config.md --- @@ -275,6 +275,10 @@ For example when running Flink on YARN on an environment with a restrictive fire - `recovery.job.delay`: (Default 'akka.ask.timeout') Defines the delay before persisted jobs are recovered in case of a recovery situation. +## Environment + +- `env.log.dir`: (Defaults to the `log` directory under Flink's home) Defines the directory where the Flink logs are saved. --- End diff -- No, the user has to input an absolute path. I wouldn't recommend adding this option; a possible improvement would be to document this restriction and to check at startup for malformed paths. Would `config.sh` be a good place to perform this check? > Make Flink logs directory configurable > -- > > Key: FLINK-3678 > URL: https://issues.apache.org/jira/browse/FLINK-3678 > Project: Flink > Issue Type: Improvement > Components: Start-Stop Scripts >Affects Versions: 1.0.0 >Reporter: Stefano Baghino >Assignee: Stefano Baghino >Priority: Minor > Fix For: 1.0.1 > > > Currently Flink logs are stored under {{$FLINK_HOME/log}} and the user cannot > configure an alternative storage location. It would be nice to add a > configuration key in the {{flink-conf.yaml}} and edit the {{bin/flink}} > launch script accordingly to get the value if present or default to the > current behavior if no value is provided.
[GitHub] flink pull request: [FLINK-3678] Make Flink logs directory configu...
Github user stefanobaghino commented on a diff in the pull request: https://github.com/apache/flink/pull/1837#discussion_r57913426 --- Diff: docs/setup/config.md --- @@ -275,6 +275,10 @@ For example when running Flink on YARN on an environment with a restrictive fire - `recovery.job.delay`: (Default 'akka.ask.timeout') Defines the delay before persisted jobs are recovered in case of a recovery situation. +## Environment + +- `env.log.dir`: (Defaults to the `log` directory under Flink's home) Defines the directory where the Flink logs are saved. --- End diff -- No, the user has to input an absolute path. I wouldn't recommend adding this option; a possible improvement would be to document this restriction and to check at startup for malformed paths. Would `config.sh` be a good place to perform this check?
[jira] [Commented] (FLINK-3681) CEP library does not support Java 8 lambdas as select function
[ https://issues.apache.org/jira/browse/FLINK-3681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15218175#comment-15218175 ] ASF GitHub Bot commented on FLINK-3681: --- Github user uce commented on the pull request: https://github.com/apache/flink/pull/1840#issuecomment-203494813 Till's local Travis passed (https://travis-ci.org/tillrohrmann/flink/builds/119523927). I'm going to merge this. > CEP library does not support Java 8 lambdas as select function > -- > > Key: FLINK-3681 > URL: https://issues.apache.org/jira/browse/FLINK-3681 > Project: Flink > Issue Type: Bug > Components: CEP >Affects Versions: 1.0.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Minor > Fix For: 1.1.0, 1.0.1 > > > Currently, the CEP library does not support Java 8 lambdas to be used as > {{select}} or {{flatSelect}} function. The problem is that the > {{TypeExtractor}} has different semantics when calling > {{TypeExtractor.getUnaryOperatorReturnType}} either with a Java 8 lambda or > an instance of an UDF function. > To illustrate the problem assume we have the following UDF function > {code} > public interface MyFunction[T, O] { > O foobar(MapinputElements); > } > {code} > When calling the {{TypeExtractor}} with an anonymous class which implements > this interface, the first type parameter is considered being the input type > of the function, namely {{T}}. > In contrast, when providing a Java 8 lambda for this interface, the > {{TypeExtractor}} will see an input type of {{Map }}. > This problem also occurs with a {{FlatMapFunction}} whose first type argument > is {{T}} but whose first parameter of a Java 8 lambda is {{Iterable}}. In > order to solve the problem here, the > {{TypeExtractor.getUnaryOperatorReturnType}} method has the parameters > {{hasIterable}} and {{hasCollector}}. 
If these values are {{true}}, then a > special code path is taken (in case of a Java 8 lambda), where the input type > is compared to the first type argument of the first input parameter of the > lambda (here an {{Iterable}}). This hand-knitted solution does not > generalize well, as it will fail for all parameterized types which have the > input type at a different position (e.g. {{Map }}. > In order to solve the problem, I propose to generalize the > {{getUnaryOperatorReturnType}} a little bit so that one can specify at which > position the input type is specified by a parameterized type.
[GitHub] flink pull request: [FLINK-3681] [cep, typeextractor] Generalize T...
Github user uce commented on the pull request: https://github.com/apache/flink/pull/1840#issuecomment-203494813 Till's local Travis passed (https://travis-ci.org/tillrohrmann/flink/builds/119523927). I'm going to merge this.
[GitHub] flink pull request:
Github user uce commented on the pull request: https://github.com/apache/flink/commit/ce448cdbdd301585b0faea4fe4c920856ee37818#commitcomment-16897467 Did not add this to `master` yet as the ASF might support `rsync` for `home.apache.org` again by `1.1`...
[GitHub] flink pull request: [FLINK-3681] [cep, typeextractor] Generalize T...
Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/1840#issuecomment-203466325 Thanks for the review @uce
[jira] [Commented] (FLINK-3681) CEP library does not support Java 8 lambdas as select function
[ https://issues.apache.org/jira/browse/FLINK-3681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15218057#comment-15218057 ] ASF GitHub Bot commented on FLINK-3681: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1840#discussion_r57900102 --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java --- @@ -291,12 +347,15 @@ protected TypeExtractor() { // parameters must be accessed from behind, since JVM can add additional parameters e.g. when using local variables inside lambda function final int paramLen = m.getGenericParameterTypes().length - 1; - final Type input = (hasCollector)? m.getGenericParameterTypes()[paramLen - 1] : m.getGenericParameterTypes()[paramLen]; - validateInputType((hasIterable)?removeGenericWrapper(input) : input, inType); + final Type input = (outputTypeArgumentIndex >= 0) ? m.getGenericParameterTypes()[paramLen - 1] : m.getGenericParameterTypes()[paramLen]; + validateInputType((inputTypeArgumentIndex >= 0) ? extractTypeArgument(input, inputTypeArgumentIndex) : input, inType); if(function instanceof ResultTypeQueryable) { return ((ResultTypeQueryable) function).getProducedType(); } - return new TypeExtractor().privateCreateTypeInfo((hasCollector)? removeGenericWrapper(m.getGenericParameterTypes()[paramLen]) : m.getGenericReturnType(), inType, null); + return new TypeExtractor().privateCreateTypeInfo(( + outputTypeArgumentIndex >= 0)? extractTypeArgument(m.getGenericParameterTypes()[paramLen], outputTypeArgumentIndex) : m.getGenericReturnType(), --- End diff -- Good catch. Will fix it. 
> CEP library does not support Java 8 lambdas as select function
> --
>
> Key: FLINK-3681
> URL: https://issues.apache.org/jira/browse/FLINK-3681
> Project: Flink
> Issue Type: Bug
> Components: CEP
> Affects Versions: 1.0.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Priority: Minor
> Fix For: 1.1.0, 1.0.1
>
> Currently, the CEP library does not support Java 8 lambdas to be used as {{select}} or {{flatSelect}} functions. The problem is that the {{TypeExtractor}} has different semantics when calling {{TypeExtractor.getUnaryOperatorReturnType}} with either a Java 8 lambda or an instance of a UDF function. To illustrate the problem, assume we have the following UDF function:
> {code}
> public interface MyFunction<T, O> {
>   O foobar(Map<String, T> inputElements);
> }
> {code}
> When calling the {{TypeExtractor}} with an anonymous class which implements this interface, the first type parameter is considered to be the input type of the function, namely {{T}}. In contrast, when providing a Java 8 lambda for this interface, the {{TypeExtractor}} will see an input type of {{Map<String, T>}}. This problem also occurs with a {{FlatMapFunction}} whose first type argument is {{T}} but whose first parameter in a Java 8 lambda is an {{Iterable<T>}}. In order to solve the problem here, the {{TypeExtractor.getUnaryOperatorReturnType}} method has the parameters {{hasIterable}} and {{hasCollector}}. If these values are {{true}}, then a special code path is taken (in case of a Java 8 lambda), where the input type is compared to the first type argument of the first input parameter of the lambda (here an {{Iterable<T>}}). This hand-knitted solution does not generalize well, as it will fail for all parameterized types which have the input type at a different position (e.g. {{Map<String, T>}}).
> In order to solve the problem, I propose to generalize {{getUnaryOperatorReturnType}} a little bit so that one can specify at which position the input type is specified by a parameterized type.
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
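The generalization proposed above — letting the caller say at which type-argument position the input type sits — boils down to reading a specific actual type argument from a `ParameterizedType`. A minimal, self-contained sketch (class and method names are mine, not Flink's):

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Map;

public class TypeArgumentDemo {
    // Sketch of the proposed helper: instead of always unwrapping the first
    // type argument, pick the argument at an explicit index, so a type like
    // Map<String, T> can contribute its second argument as the input type.
    static Type extractTypeArgument(Type type, int index) {
        if (type instanceof ParameterizedType) {
            return ((ParameterizedType) type).getActualTypeArguments()[index];
        }
        throw new IllegalArgumentException(type + " is not a parameterized type");
    }

    // A field declaration gives us a concrete ParameterizedType to inspect.
    static Map<String, Integer> sample;

    static Type sampleMapType() {
        try {
            return TypeArgumentDemo.class.getDeclaredField("sample").getGenericType();
        } catch (NoSuchFieldException e) {
            throw new AssertionError(e);
        }
    }

    public static void main(String[] args) {
        // For Map<String, Integer>, index 0 is String and index 1 is Integer.
        System.out.println(extractTypeArgument(sampleMapType(), 1)); // class java.lang.Integer
    }
}
```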
[jira] [Updated] (FLINK-3683) Unstable tests: in the Kafka09ITCase, the testMultipleSourcesOnePartition() and the testOneToOneSources()
[ https://issues.apache.org/jira/browse/FLINK-3683?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-3683: Labels: test-stability (was: )

> Unstable tests: in the Kafka09ITCase, the testMultipleSourcesOnePartition() and the testOneToOneSources()
> -
>
> Key: FLINK-3683
> URL: https://issues.apache.org/jira/browse/FLINK-3683
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector
> Reporter: Kostas Kloudas
> Labels: test-stability
>
> The aforementioned tests fail sometimes. To reproduce the behavior, put them in a for-loop and let them run 100 times. In this case the problem seems to be that the topic was not deleted before being recreated for the next run. For a trace on Travis, look here: https://api.travis-ci.org/jobs/119493332/log.txt?deansi=true (although this was not on the master branch)
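The reproduction recipe in the description — "put them in a for-loop and let them run 100 times" — can be sketched as a small repeat harness (names and structure are my assumptions, not the actual test code):

```java
public class RepeatRunner {
    // Run a flaky test body repeatedly so that state leaking between runs
    // (here, a Kafka topic that was not deleted before being recreated)
    // surfaces as a deterministic failure.
    interface TestBody {
        void run() throws Exception;
    }

    static void repeat(int times, TestBody body) throws Exception {
        for (int i = 0; i < times; i++) {
            body.run(); // in practice: testOneToOneSources() etc.
        }
    }

    public static void main(String[] args) throws Exception {
        int[] runs = {0};
        repeat(100, () -> runs[0]++);
        System.out.println(runs[0]); // 100
    }
}
```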
[jira] [Commented] (FLINK-3678) Make Flink logs directory configurable
[ https://issues.apache.org/jira/browse/FLINK-3678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15218068#comment-15218068 ] ASF GitHub Bot commented on FLINK-3678: --- Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/1837#discussion_r57900813

--- Diff: docs/setup/config.md ---
@@ -275,6 +275,10 @@ For example when running Flink on YARN on an environment with a restrictive fire
 - `recovery.job.delay`: (Default 'akka.ask.timeout') Defines the delay before persisted jobs are recovered in case of a recovery situation.
+## Environment
+
+- `env.log.dir`: (Defaults to the `log` directory under Flink's home) Defines the directory where the Flink logs are saved.
--- End diff --

Are relative paths resolved relative to Flink's root dir?

> Make Flink logs directory configurable
> --
>
> Key: FLINK-3678
> URL: https://issues.apache.org/jira/browse/FLINK-3678
> Project: Flink
> Issue Type: Improvement
> Components: Start-Stop Scripts
> Affects Versions: 1.0.0
> Reporter: Stefano Baghino
> Assignee: Stefano Baghino
> Priority: Minor
> Fix For: 1.0.1
>
> Currently Flink logs are stored under {{$FLINK_HOME/log}} and the user cannot configure an alternative storage location. It would be nice to add a configuration key in the {{flink-conf.yaml}} and edit the {{bin/flink}} launch script accordingly to get the value if present or default to the current behavior if no value is provided.
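The fallback behavior the issue asks for — use the configured value if present, otherwise default to `log` under Flink's home — is a simple get-or-default. A hedged sketch (the helper name is mine; the real change lives in the launch scripts, not Java):

```java
import java.util.HashMap;
import java.util.Map;

public class LogDirResolver {
    // Resolve the log directory: the `env.log.dir` key from the parsed
    // configuration wins; otherwise fall back to <flink home>/log.
    static String resolveLogDir(Map<String, String> conf, String flinkHome) {
        return conf.getOrDefault("env.log.dir", flinkHome + "/log");
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        System.out.println(resolveLogDir(conf, "/opt/flink"));  // /opt/flink/log
        conf.put("env.log.dir", "/var/log/flink");
        System.out.println(resolveLogDir(conf, "/opt/flink"));  // /var/log/flink
    }
}
```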
[jira] [Commented] (FLINK-3681) CEP library does not support Java 8 lambdas as select function
[ https://issues.apache.org/jira/browse/FLINK-3681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15218066#comment-15218066 ] ASF GitHub Bot commented on FLINK-3681: --- Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/1840#issuecomment-203466649 Once Travis gives green light I'll merge it.
[jira] [Commented] (FLINK-3682) CEP operator does not set the processing timestamp correctly
[ https://issues.apache.org/jira/browse/FLINK-3682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15218065#comment-15218065 ] ASF GitHub Bot commented on FLINK-3682: --- Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/1841#issuecomment-203466535 Once Travis gives green light I'll merge it.

> CEP operator does not set the processing timestamp correctly
> --
>
> Key: FLINK-3682
> URL: https://issues.apache.org/jira/browse/FLINK-3682
> Project: Flink
> Issue Type: Bug
> Components: CEP
> Affects Versions: 1.0.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Fix For: 1.1.0, 1.0.1
>
> In the wake of reworking the timestamp assignment, where the processing timestamp now has to be set by the {{StreamOperator}}, the CEP operators have not been adapted. As a consequence, the timestamp value already assigned to the {{StreamRecord}} is used; in case of processing time this is {{Long.MIN_VALUE}}. In combination with a CEP time window, this can lead to an underflow in the NFA where the window time is subtracted from the current timestamp value.
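The underflow described above is plain two's-complement wrap-around: subtracting a window time from an unset `Long.MIN_VALUE` timestamp wraps past `Long.MAX_VALUE`. A small sketch (method names and the 10s window are my assumptions, not Flink code):

```java
public class TimestampUnderflowDemo {
    // The NFA's lower-bound computation as described above: window time
    // subtracted from the record's timestamp.
    static long windowLowerBound(long timestamp, long windowTime) {
        return timestamp - windowTime;
    }

    public static void main(String[] args) {
        long unset = Long.MIN_VALUE; // processing timestamp never assigned
        long window = 10_000L;       // hypothetical 10s CEP window
        long bound = windowLowerBound(unset, window);
        // Instead of a very small lower bound, the subtraction wraps past
        // Long.MAX_VALUE to a huge positive value:
        System.out.println(bound);     // 9223372036854765808
        System.out.println(bound > 0); // true
    }
}
```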
[jira] [Commented] (FLINK-3681) CEP library does not support Java 8 lambdas as select function
[ https://issues.apache.org/jira/browse/FLINK-3681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15218063#comment-15218063 ] ASF GitHub Bot commented on FLINK-3681: --- Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/1840#issuecomment-203466325 Thanks for the review @uce
[jira] [Commented] (FLINK-3682) CEP operator does not set the processing timestamp correctly
[ https://issues.apache.org/jira/browse/FLINK-3682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15218064#comment-15218064 ] ASF GitHub Bot commented on FLINK-3682: --- Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/1841#issuecomment-203466453 Thanks for the review @uce
[jira] [Commented] (FLINK-3681) CEP library does not support Java 8 lambdas as select function
[ https://issues.apache.org/jira/browse/FLINK-3681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15218060#comment-15218060 ] ASF GitHub Bot commented on FLINK-3681: --- Github user uce commented on the pull request: https://github.com/apache/flink/pull/1840#issuecomment-203465710 Looks like a nice generalization of the iterable and collector cases. Good that you did not introduce a `hasMap` ;-) +1 to merge to `master` and `release-1.0` after tests pass.
[jira] [Commented] (FLINK-3678) Make Flink logs directory configurable
[ https://issues.apache.org/jira/browse/FLINK-3678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15218049#comment-15218049 ] ASF GitHub Bot commented on FLINK-3678: --- Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/1837#issuecomment-203462424 Looks good to me!
[jira] [Commented] (FLINK-3681) CEP library does not support Java 8 lambdas as select function
[ https://issues.apache.org/jira/browse/FLINK-3681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15218041#comment-15218041 ] ASF GitHub Bot commented on FLINK-3681: --- Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/1840#discussion_r57898915

--- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java ---
@@ -291,12 +347,15 @@ protected TypeExtractor() {
 		// parameters must be accessed from behind, since JVM can add additional parameters e.g. when using local variables inside lambda function
 		final int paramLen = m.getGenericParameterTypes().length - 1;
-		final Type input = (hasCollector)? m.getGenericParameterTypes()[paramLen - 1] : m.getGenericParameterTypes()[paramLen];
-		validateInputType((hasIterable)?removeGenericWrapper(input) : input, inType);
+		final Type input = (outputTypeArgumentIndex >= 0) ? m.getGenericParameterTypes()[paramLen - 1] : m.getGenericParameterTypes()[paramLen];
+		validateInputType((inputTypeArgumentIndex >= 0) ? extractTypeArgument(input, inputTypeArgumentIndex) : input, inType);
 		if(function instanceof ResultTypeQueryable) {
 			return ((ResultTypeQueryable) function).getProducedType();
 		}
-		return new TypeExtractor().privateCreateTypeInfo((hasCollector)? removeGenericWrapper(m.getGenericParameterTypes()[paramLen]) : m.getGenericReturnType(), inType, null);
+		return new TypeExtractor().privateCreateTypeInfo((
+			outputTypeArgumentIndex >= 0)? extractTypeArgument(m.getGenericParameterTypes()[paramLen], outputTypeArgumentIndex) : m.getGenericReturnType(),
--- End diff --

Trivial:
- line break after second `(` on purpose?
- White space before `?` missing
[jira] [Commented] (FLINK-3682) CEP operator does not set the processing timestamp correctly
[ https://issues.apache.org/jira/browse/FLINK-3682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15218029#comment-15218029 ] ASF GitHub Bot commented on FLINK-3682: --- Github user uce commented on the pull request: https://github.com/apache/flink/pull/1841#issuecomment-203456547 Good catch! +1 to merge to `master` and `release-1.0`
[jira] [Commented] (FLINK-3579) Improve String concatenation
[ https://issues.apache.org/jira/browse/FLINK-3579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15218028#comment-15218028 ] ASF GitHub Bot commented on FLINK-3579: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1821#discussion_r57896997

--- Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java ---
@@ -121,6 +120,66 @@ public void testNonWorkingSubstring2() throws Exception {
 		resultSet.collect();
 	}
+	@Test
+	public void testStringConcat() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		TableEnvironment tableEnv = new TableEnvironment();
+
+		DataSet<Tuple2<String, Integer>> ds = env.fromElements(
+			new Tuple2<>("ABCD", 3),
+			new Tuple2<>("ABCD", 2));
+
+		Table in = tableEnv.fromDataSet(ds, "a, b");
+
+		Table result = in
+			.select("a + b + 42");
--- End diff --

Just out of curiosity, and because I'm at a loss here: why does `a + 42.abs().cast(STRING)` not work?

> Improve String concatenation
> --
>
> Key: FLINK-3579
> URL: https://issues.apache.org/jira/browse/FLINK-3579
> Project: Flink
> Issue Type: Bug
> Components: Table API
> Reporter: Timo Walther
> Assignee: ramkrishna.s.vasudevan
> Priority: Minor
>
> Concatenation of a String and a non-String does not work properly, e.g. {{f0 + 42}} leads to a RelBuilder exception. The ExpressionParser does not like {{f0 + 42.cast(STRING)}} either.
[jira] [Commented] (FLINK-3579) Improve String concatenation
[ https://issues.apache.org/jira/browse/FLINK-3579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15218019#comment-15218019 ] ASF GitHub Bot commented on FLINK-3579: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1821#discussion_r57896311

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala ---
@@ -59,6 +59,8 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
 	Literal(str.toInt)
 } else if (str.endsWith("f") | str.endsWith("F")) {
 	Literal(str.toFloat)
+} else if (str.endsWith(".")) {
--- End diff --

Seems a little bit like a hack to me. Ideally, you should design your grammar such that you won't see the token `1.` if it is not a valid number expression.
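The trailing-dot branch discussed in this review can be sketched in isolation. This is a hedged re-implementation of the suffix dispatch (method name is mine, not Flink's actual parser): treating `42.` as the integer `42` lets the expression parser read `42.cast(STRING)` as a literal followed by a method call rather than a malformed double.

```java
public class LiteralClassifier {
    // Suffix dispatch modeled on the ExpressionParser branch quoted above:
    // the trailing-dot case strips the dot and parses an int, so `42.` can
    // precede a method call such as `.cast(STRING)`.
    static Object classify(String str) {
        if (str.endsWith("L") || str.endsWith("l")) {
            return Long.parseLong(str.substring(0, str.length() - 1));
        } else if (str.endsWith("f") || str.endsWith("F")) {
            return Float.parseFloat(str);
        } else if (str.endsWith(".")) {
            return Integer.parseInt(str.substring(0, str.length() - 1));
        } else {
            return Integer.parseInt(str);
        }
    }

    public static void main(String[] args) {
        System.out.println(classify("42."));  // 42 (an Integer, not a double)
        System.out.println(classify("3.5f")); // 3.5 (a Float)
    }
}
```

Till's objection still applies to this sketch: the classification happens after tokenization, so the grammar must already have produced the ambiguous `1.` token for this branch to see it.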
[jira] [Issue Comment Deleted] (FLINK-3683) Unstable tests: in the Kafka09ITCase, the testMultipleSourcesOnePartition() and the testOneToOneSources()
[ https://issues.apache.org/jira/browse/FLINK-3683?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-3683: Comment: was deleted (was: Looked at the failures, they are not related to Kafka, but FLINK-3594.) > Unstable tests: in the Kafka09ITCase, the testMultipleSourcesOnePartition() > and the testOneToOneSources() > - > > Key: FLINK-3683 > URL: https://issues.apache.org/jira/browse/FLINK-3683 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Reporter: Kostas Kloudas > > The aforementioned tests fail sometimes. To reproduce the behavior put them > in a for-loop and let them run 100 times. In this case the problem seems to > be that the topic was not deleted before being recreated for the next run. > And for a trace on Travis, look here: > https://api.travis-ci.org/jobs/119493332/log.txt?deansi=true > (although this was not on the master branch)
[jira] [Commented] (FLINK-3683) Unstable tests: in the Kafka09ITCase, the testMultipleSourcesOnePartition() and the testOneToOneSources()
[ https://issues.apache.org/jira/browse/FLINK-3683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15218015#comment-15218015 ] Chesnay Schepler commented on FLINK-3683: - Looked at the failures, they are not related to Kafka, but FLINK-3594. > Unstable tests: in the Kafka09ITCase, the testMultipleSourcesOnePartition() > and the testOneToOneSources() > - > > Key: FLINK-3683 > URL: https://issues.apache.org/jira/browse/FLINK-3683 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Reporter: Kostas Kloudas > > The aforementioned tests fail sometimes. To reproduce the behavior put them > in a for-loop and let them run 100 times. In this case the problem seems to > be that the topic was not deleted before being recreated for the next run. > And for a trace on Travis, look here: > https://api.travis-ci.org/jobs/119493332/log.txt?deansi=true > (although this was not on the master branch)
[jira] [Updated] (FLINK-3683) Unstable tests: in the Kafka09ITCase, the testMultipleSourcesOnePartition() and the testOneToOneSources()
[ https://issues.apache.org/jira/browse/FLINK-3683?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kostas Kloudas updated FLINK-3683: -- Description: The aforementioned tests fail sometimes. To reproduce the behavior put them in a for-loop and let them run 100 times. In this case the problem seems to be that the topic was not deleted before being recreated for the next run. And for a trace on Travis, look here: https://api.travis-ci.org/jobs/119493332/log.txt?deansi=true (although this was not on the master branch) was: The aforementioned tests fail sometimes. To reproduce the behavior put them in a for-loop and let them run 100 times. In this case the problem seems to be that the topic was not deleted before being recreated for the next run. > Unstable tests: in the Kafka09ITCase, the testMultipleSourcesOnePartition() > and the testOneToOneSources() > - > > Key: FLINK-3683 > URL: https://issues.apache.org/jira/browse/FLINK-3683 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Reporter: Kostas Kloudas > > The aforementioned tests fail sometimes. To reproduce the behavior put them > in a for-loop and let them run 100 times. In this case the problem seems to > be that the topic was not deleted before being recreated for the next run. > And for a trace on Travis, look here: > https://api.travis-ci.org/jobs/119493332/log.txt?deansi=true > (although this was not on the master branch)
[jira] [Created] (FLINK-3683) Unstable tests: in the Kafka09ITCase, the testMultipleSourcesOnePartition() and the testOneToOneSources()
Kostas Kloudas created FLINK-3683: - Summary: Unstable tests: in the Kafka09ITCase, the testMultipleSourcesOnePartition() and the testOneToOneSources() Key: FLINK-3683 URL: https://issues.apache.org/jira/browse/FLINK-3683 Project: Flink Issue Type: Bug Components: Kafka Connector Reporter: Kostas Kloudas The aforementioned tests fail sometimes. To reproduce the behavior put them in a for-loop and let them run 100 times. In this case the problem seems to be that the topic was not deleted before being recreated for the next run.
[jira] [Commented] (FLINK-3682) CEP operator does not set the processing timestamp correctly
[ https://issues.apache.org/jira/browse/FLINK-3682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15217969#comment-15217969 ] ASF GitHub Bot commented on FLINK-3682: --- GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/1841 [FLINK-3682] [cep] Assign processing timestamp in CEP operators This PR fixes the problem that the CEP operators did not assign the wall clock time as the timestamp to incoming StreamRecords if the TimeCharacteristic was set to ProcessingTime. Processing elements with a Long.MIN_VALUE timestamp can lead to underflows in the NFA if a positive window length is subtracted from the timestamp. For this underflow a sanity check has been added to notify the user with an exception about it. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink fixUnderflowPruning Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1841.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1841 commit 3adc65e0d65604b6255f792f00a76ec017170f0d Author: Till Rohrmann Date: 2016-03-30T13:27:21Z [FLINK-3682] [cep] Assign processing timestamp in CEP operators This PR fixes the problem that the CEP operators did not assign the wall clock time as the timestamp to incoming StreamRecords if the TimeCharacteristic was set to ProcessingTime. Processing elements with a Long.MIN_VALUE timestamp can lead to underflows in the NFA if a positive window length is subtracted from the timestamp. For this underflow a sanity check has been added to notify the user with an exception about it. 
> CEP operator does not set the processing timestamp correctly > > > Key: FLINK-3682 > URL: https://issues.apache.org/jira/browse/FLINK-3682 > Project: Flink > Issue Type: Bug > Components: CEP >Affects Versions: 1.0.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Fix For: 1.1.0, 1.0.1 > > > In the wake of reworking the timestamp assignment where the processing > timestamp has to be set now by the {{StreamOperator}}, the CEP operators have > not been adapted. This causes the timestamp value assigned to the > {{StreamRecord}} to be used. In case of processing time this is > {{Long.MIN_VALUE}}. In combination with a CEP time window, this can lead to > an underflow in the NFA where the window time is subtracted from the current > timestamp value.
[jira] [Updated] (FLINK-3681) CEP library does not support Java 8 lambdas as select function
[ https://issues.apache.org/jira/browse/FLINK-3681?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann updated FLINK-3681: - Fix Version/s: 1.1.0 > CEP library does not support Java 8 lambdas as select function > -- > > Key: FLINK-3681 > URL: https://issues.apache.org/jira/browse/FLINK-3681 > Project: Flink > Issue Type: Bug > Components: CEP >Affects Versions: 1.0.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Minor > Fix For: 1.1.0, 1.0.1 > > > Currently, the CEP library does not support Java 8 lambdas to be used as > {{select}} or {{flatSelect}} function. The problem is that the > {{TypeExtractor}} has different semantics when calling > {{TypeExtractor.getUnaryOperatorReturnType}} either with a Java 8 lambda or > an instance of an UDF function. > To illustrate the problem assume we have the following UDF function > {code} > public interface MyFunction[T, O] { > O foobar(Map inputElements); > } > {code} > When calling the {{TypeExtractor}} with an anonymous class which implements > this interface, the first type parameter is considered being the input type > of the function, namely {{T}}. > In contrast, when providing a Java 8 lambda for this interface, the > {{TypeExtractor}} will see an input type of {{Map }}. > This problem also occurs with a {{FlatMapFunction}} whose first type argument > is {{T}} but whose first parameter of a Java 8 lambda is {{Iterable}}. In > order to solve the problem here, the > {{TypeExtractor.getUnaryOperatorReturnType}} method has the parameters > {{hasIterable}} and {{hasCollector}}. If these values are {{true}}, then a > special code path is taken (in case of a Java 8 lambda), where the input type > is compared to the first type argument of the first input parameter of the > lambda (here an {{Iterable}}). This hand-knitted solution does not > generalize well, as it will fail for all parameterized types which have the > input type at a different position (e.g. {{Map }}. 
> In order to solve the problem, I propose to generalize the > {{getUnaryOperatorReturnType}} a little bit so that one can specify at which > position the input type is specified by a parameterized type.
[GitHub] flink pull request: [FLINK-3682] [cep] Assign processing timestamp...
GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/1841 [FLINK-3682] [cep] Assign processing timestamp in CEP operators This PR fixes the problem that the CEP operators did not assign the wall clock time as the timestamp to incoming StreamRecords if the TimeCharacteristic was set to ProcessingTime. Processing elements with a Long.MIN_VALUE timestamp can lead to underflows in the NFA if a positive window length is subtracted from the timestamp. For this underflow a sanity check has been added to notify the user with an exception about it. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink fixUnderflowPruning Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1841.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1841 commit 3adc65e0d65604b6255f792f00a76ec017170f0d Author: Till Rohrmann Date: 2016-03-30T13:27:21Z [FLINK-3682] [cep] Assign processing timestamp in CEP operators This PR fixes the problem that the CEP operators did not assign the wall clock time as the timestamp to incoming StreamRecords if the TimeCharacteristic was set to ProcessingTime. Processing elements with a Long.MIN_VALUE timestamp can lead to underflows in the NFA if a positive window length is subtracted from the timestamp. For this underflow a sanity check has been added to notify the user with an exception about it.
[jira] [Created] (FLINK-3682) CEP operator does not set the processing timestamp correctly
Till Rohrmann created FLINK-3682: Summary: CEP operator does not set the processing timestamp correctly Key: FLINK-3682 URL: https://issues.apache.org/jira/browse/FLINK-3682 Project: Flink Issue Type: Bug Components: CEP Affects Versions: 1.0.0 Reporter: Till Rohrmann Assignee: Till Rohrmann Fix For: 1.1.0, 1.0.1 In the wake of reworking the timestamp assignment where the processing timestamp has to be set now by the {{StreamOperator}}, the CEP operators have not been adapted. This causes the timestamp value assigned to the {{StreamRecord}} to be used. In case of processing time this is {{Long.MIN_VALUE}}. In combination with a CEP time window, this can lead to an underflow in the NFA where the window time is subtracted from the current timestamp value.
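The underflow described above, and the sanity check the fix adds, can be illustrated with a small sketch (hypothetical names, not the actual NFA code): pruning subtracts the window length from the current timestamp, and with a record that kept the default `Long.MIN_VALUE` timestamp the subtraction would wrap around.

```java
public class PruningGuard {
    // Sketch: pruning old elements compares against (timestamp - windowTime).
    // If the record kept the default Long.MIN_VALUE timestamp (i.e. no
    // processing timestamp was assigned), the subtraction would underflow,
    // so we fail fast with a descriptive exception instead.
    public static long pruningThreshold(long timestamp, long windowTime) {
        if (timestamp < Long.MIN_VALUE + windowTime) {
            throw new IllegalStateException(
                "Computing " + timestamp + " - " + windowTime
                + " would underflow; was a processing timestamp assigned?");
        }
        return timestamp - windowTime;
    }
}
```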
[jira] [Commented] (FLINK-3681) CEP library does not support Java 8 lambdas as select function
[ https://issues.apache.org/jira/browse/FLINK-3681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15217943#comment-15217943 ] ASF GitHub Bot commented on FLINK-3681: --- GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/1840 [FLINK-3681] [cep, typeextractor] Generalize TypeExtractor to support more lambdas The TypeExtractor.getUnaryOperatorReturnType and TypeExtractor.getBinaryOperatorReturnType methods have been extended to support positional arguments for the input types. This allows to support parameterized types as Java 8 lambda arguments where the input type is not specified by the first type argument (e.g. Map). This also solves the problem that the CEP library did not support Java 8 lambdas as select functions. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink fixCEPJava8 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1840.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1840 commit 4e626b79e290acb7a5fd546a985310aef17db8c2 Author: Till Rohrmann Date: 2016-03-30T12:55:27Z [FLINK-3681] [cep, typeextractor] Generalize TypeExtractor to support more lambdas The TypeExtractor.getUnaryOperatorReturnType and TypeExtractor.getBinaryOperatorReturnType methods have been extended to support positional arguments for the input types. This allows to support parameterized types as Java 8 lambda arguments where the input type is not specified by the first type argument (e.g. Map ). This also solves the problem that the CEP library did not support Java 8 lambdas as select functions. 
> CEP library does not support Java 8 lambdas as select function > -- > > Key: FLINK-3681 > URL: https://issues.apache.org/jira/browse/FLINK-3681 > Project: Flink > Issue Type: Bug > Components: CEP >Affects Versions: 1.0.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Minor > Fix For: 1.0.1 > > > Currently, the CEP library does not support Java 8 lambdas to be used as > {{select}} or {{flatSelect}} function. The problem is that the > {{TypeExtractor}} has different semantics when calling > {{TypeExtractor.getUnaryOperatorReturnType}} either with a Java 8 lambda or > an instance of an UDF function. > To illustrate the problem assume we have the following UDF function > {code} > public interface MyFunction[T, O] { > O foobar(Map inputElements); > } > {code} > When calling the {{TypeExtractor}} with an anonymous class which implements > this interface, the first type parameter is considered being the input type > of the function, namely {{T}}. > In contrast, when providing a Java 8 lambda for this interface, the > {{TypeExtractor}} will see an input type of {{Map }}. > This problem also occurs with a {{FlatMapFunction}} whose first type argument > is {{T}} but whose first parameter of a Java 8 lambda is {{Iterable}}. In > order to solve the problem here, the > {{TypeExtractor.getUnaryOperatorReturnType}} method has the parameters > {{hasIterable}} and {{hasCollector}}. If these values are {{true}}, then a > special code path is taken (in case of a Java 8 lambda), where the input type > is compared to the first type argument of the first input parameter of the > lambda (here an {{Iterable}}). This hand-knitted solution does not > generalize well, as it will fail for all parameterized types which have the > input type at a different position (e.g. {{Map }}. 
> In order to solve the problem, I propose to generalize the > {{getUnaryOperatorReturnType}} a little bit so that one can specify at which > position the input type is specified by a parameterized type.
[GitHub] flink pull request: [FLINK-3681] [cep, typeextractor] Generalize T...
GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/1840 [FLINK-3681] [cep, typeextractor] Generalize TypeExtractor to support more lambdas The TypeExtractor.getUnaryOperatorReturnType and TypeExtractor.getBinaryOperatorReturnType methods have been extended to support positional arguments for the input types. This allows to support parameterized types as Java 8 lambda arguments where the input type is not specified by the first type argument (e.g. Map). This also solves the problem that the CEP library did not support Java 8 lambdas as select functions. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink fixCEPJava8 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1840.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1840 commit 4e626b79e290acb7a5fd546a985310aef17db8c2 Author: Till Rohrmann Date: 2016-03-30T12:55:27Z [FLINK-3681] [cep, typeextractor] Generalize TypeExtractor to support more lambdas The TypeExtractor.getUnaryOperatorReturnType and TypeExtractor.getBinaryOperatorReturnType methods have been extended to support positional arguments for the input types. This allows to support parameterized types as Java 8 lambda arguments where the input type is not specified by the first type argument (e.g. Map ). This also solves the problem that the CEP library did not support Java 8 lambdas as select functions.
[jira] [Commented] (FLINK-3673) Annotations for code generation
[ https://issues.apache.org/jira/browse/FLINK-3673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15217940#comment-15217940 ] Gabor Horvath commented on FLINK-3673: -- Sure, I have extended the description of the ticket. > Annotations for code generation > --- > > Key: FLINK-3673 > URL: https://issues.apache.org/jira/browse/FLINK-3673 > Project: Flink > Issue Type: Sub-task > Components: Type Serialization System >Reporter: Gabor Horvath >Assignee: Gabor Horvath > Labels: gsoc2016 > > Annotations should be utilized to generate more efficient serialization code. > Planned improvements: > * Using never null annotations on a field, the serialized representation can > omit the 1 byte null tags and the serializer code handling this tag. > * Using never null annotation on the POJO, we can omit the top level null > tag. > * Making a POJO final we can omit the subclass tag. > The very same annotations can be used to make the getLength method much > smarter. > Code generation is a prerequisite, to avoid runtime checks which could make > the common codepath (without annotations) slower. > I could also annotate some internal Flink types to make them more efficient. > The main risk: it would break savepoints created with a Flink version that > did not have annotations. We could either introduce a compatibility mode, or > force users to recreate those save points.
[jira] [Updated] (FLINK-3673) Annotations for code generation
[ https://issues.apache.org/jira/browse/FLINK-3673?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gabor Horvath updated FLINK-3673: - Description: Annotations should be utilized to generate more efficient serialization code. Planned improvements: * Using never null annotations on a field, the serialized representation can omit the 1 byte null tags and the serializer code handling this tag. * Using never null annotation on the POJO, we can omit the top level null tag. * Making a POJO final we can omit the subclass tag. The very same annotations can be used to make the getLength method much smarter. Code generation is a prerequisite, to avoid runtime checks which could make the common codepath (without annotations) slower. I could also annotate some internal Flink types to make them more efficient. The main risk: it would break savepoints created with a Flink version that did not have annotations. We could either introduce a compatibility mode, or force users to recreate those save points. was: Annotations should be utilized to generate more efficient serialization code. The very same annotations can be used to make the getLength method much smarter. > Annotations for code generation > --- > > Key: FLINK-3673 > URL: https://issues.apache.org/jira/browse/FLINK-3673 > Project: Flink > Issue Type: Sub-task > Components: Type Serialization System >Reporter: Gabor Horvath >Assignee: Gabor Horvath > Labels: gsoc2016 > > Annotations should be utilized to generate more efficient serialization code. > Planned improvements: > * Using never null annotations on a field, the serialized representation can > omit the 1 byte null tags and the serializer code handling this tag. > * Using never null annotation on the POJO, we can omit the top level null > tag. > * Making a POJO final we can omit the subclass tag. > The very same annotations can be used to make the getLength method much > smarter. 
> Code generation is a prerequisite, to avoid runtime checks which could make > the common codepath (without annotations) slower. > I could also annotate some internal Flink types to make them more efficient. > The main risk: it would break savepoints created with a Flink version that > did not have annotations. We could either introduce a compatibility mode, or > force users to recreate those save points.
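The 1-byte null tag the proposal wants to omit for never-null fields can be sketched like this (an illustrative round-trip only, not Flink's serializer code; the class and method names are made up for the example):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

public class NullableStringSerializer {
    // Today's layout for a nullable field: one tag byte, then the value.
    // A never-null annotation would let generated code drop both the tag
    // byte on the wire and the null branch at runtime.
    public static void serialize(String value, DataOutput out) throws IOException {
        out.writeBoolean(value == null); // the 1-byte null tag
        if (value != null) {
            out.writeUTF(value);
        }
    }

    public static String deserialize(DataInput in) throws IOException {
        return in.readBoolean() ? null : in.readUTF();
    }
}
```

This also shows why dropping the tag breaks old savepoints: a reader without the tag byte would misinterpret data written with it, hence the suggested compatibility mode.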
[jira] [Created] (FLINK-3681) CEP library does not support Java 8 lambdas as select function
Till Rohrmann created FLINK-3681: Summary: CEP library does not support Java 8 lambdas as select function Key: FLINK-3681 URL: https://issues.apache.org/jira/browse/FLINK-3681 Project: Flink Issue Type: Bug Components: CEP Affects Versions: 1.0.0 Reporter: Till Rohrmann Assignee: Till Rohrmann Priority: Minor Fix For: 1.0.1 Currently, the CEP library does not support Java 8 lambdas to be used as {{select}} or {{flatSelect}} function. The problem is that the {{TypeExtractor}} has different semantics when calling {{TypeExtractor.getUnaryOperatorReturnType}} either with a Java 8 lambda or an instance of an UDF function. To illustrate the problem assume we have the following UDF function {code} public interface MyFunction[T, O] { O foobar(Map inputElements); } {code} When calling the {{TypeExtractor}} with an anonymous class which implements this interface, the first type parameter is considered being the input type of the function, namely {{T}}. In contrast, when providing a Java 8 lambda for this interface, the {{TypeExtractor}} will see an input type of {{Map }}. This problem also occurs with a {{FlatMapFunction}} whose first type argument is {{T}} but whose first parameter of a Java 8 lambda is {{Iterable}}. In order to solve the problem here, the {{TypeExtractor.getUnaryOperatorReturnType}} method has the parameters {{hasIterable}} and {{hasCollector}}. If these values are {{true}}, then a special code path is taken (in case of a Java 8 lambda), where the input type is compared to the first type argument of the first input parameter of the lambda (here an {{Iterable}}). This hand-knitted solution does not generalize well, as it will fail for all parameterized types which have the input type at a different position (e.g. {{Map }}. In order to solve the problem, I propose to generalize the {{getUnaryOperatorReturnType}} a little bit so that one can specify at which position the input type is specified by a parameterized type. 
[jira] [Updated] (FLINK-1502) Expose metrics to graphite, ganglia and JMX.
[ https://issues.apache.org/jira/browse/FLINK-1502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dongwon Kim updated FLINK-1502: --- Assignee: (was: Dongwon Kim) > Expose metrics to graphite, ganglia and JMX. > > > Key: FLINK-1502 > URL: https://issues.apache.org/jira/browse/FLINK-1502 > Project: Flink > Issue Type: Sub-task > Components: JobManager, TaskManager >Affects Versions: 0.9 >Reporter: Robert Metzger >Priority: Minor > Fix For: pre-apache > > > The metrics library allows to expose collected metrics easily to other > systems such as graphite, ganglia or Java's JVM (VisualVM).
[jira] [Commented] (FLINK-3579) Improve String concatenation
[ https://issues.apache.org/jira/browse/FLINK-3579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15217848#comment-15217848 ] ASF GitHub Bot commented on FLINK-3579: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/1821#discussion_r57876138 --- Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java --- @@ -121,6 +120,66 @@ public void testNonWorkingSubstring2() throws Exception { resultSet.collect(); } + @Test + public void testStringConcat() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + TableEnvironment tableEnv = new TableEnvironment(); + + DataSet<Tuple2<String, Integer>> ds = env.fromElements( + new Tuple2<>("ABCD", 3), + new Tuple2<>("ABCD", 2)); + + Table in = tableEnv.fromDataSet(ds, "a, b"); + + Table result = in + .select("a + b + 42"); --- End diff -- Maybe you need to rethink your approach. Expressions like `a + 42.abs().cast(STRING)` also do not work. It might be necessary to introduce a modified grammar where the dot is a special symbol followed by symbols for `cast`, functions etc. without a dot. > Improve String concatenation > > > Key: FLINK-3579 > URL: https://issues.apache.org/jira/browse/FLINK-3579 > Project: Flink > Issue Type: Bug > Components: Table API >Reporter: Timo Walther >Assignee: ramkrishna.s.vasudevan >Priority: Minor > > Concatenation of a String and non-String does not work properly. > e.g. {{f0 + 42}} leads to RelBuilder Exception > ExpressionParser does not like {{f0 + 42.cast(STRING)}} either.
[GitHub] flink pull request: FLINK-3579 Improve String concatenation (Ram)
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/1821#discussion_r57876138 --- Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java --- @@ -121,6 +120,66 @@ public void testNonWorkingSubstring2() throws Exception { resultSet.collect(); } + @Test + public void testStringConcat() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + TableEnvironment tableEnv = new TableEnvironment(); + + DataSet<Tuple2<String, Integer>> ds = env.fromElements( + new Tuple2<>("ABCD", 3), + new Tuple2<>("ABCD", 2)); + + Table in = tableEnv.fromDataSet(ds, "a, b"); + + Table result = in + .select("a + b + 42"); --- End diff -- Maybe you need to rethink your approach. Expressions like `a + 42.abs().cast(STRING)` also do not work. It might be necessary to introduce a modified grammar where the dot is a special symbol followed by symbols for `cast`, functions etc. without a dot.
[jira] [Assigned] (FLINK-3138) Method References are not supported as lambda expressions
[ https://issues.apache.org/jira/browse/FLINK-3138?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther reassigned FLINK-3138: --- Assignee: Timo Walther > Method References are not supported as lambda expressions > - > > Key: FLINK-3138 > URL: https://issues.apache.org/jira/browse/FLINK-3138 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 0.10.2 >Reporter: Stephan Ewen >Assignee: Timo Walther > Fix For: 1.0.0 > > > For many functions (here for example KeySelectors), one can use lambda > expressions: > {code} > DataStream<MyType> stream = ...; > stream.keyBy( v -> v.getId() ) > {code} > Java's other syntax for this, Method References, do not work: > {code} > DataStream<MyType> stream = ...; > stream.keyBy( MyType::getId ) > {code}
[GitHub] flink pull request: Kafka-3375: Allows Watermark Generation in the...
Github user kl0u commented on the pull request: https://github.com/apache/flink/pull/1795#issuecomment-203373358 No I opened a new one. It is this one https://github.com/apache/flink/pull/1839
[GitHub] flink pull request: Kafka-3375: Allows Watermark Generation in the...
Github user mxm commented on the pull request: https://github.com/apache/flink/pull/1795#issuecomment-203368412 @kl0u Has this been merged or are you going to open a new pr?
[jira] [Commented] (FLINK-3375) Allow Watermark Generation in the Kafka Source
[ https://issues.apache.org/jira/browse/FLINK-3375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15217751#comment-15217751 ] ASF GitHub Bot commented on FLINK-3375: --- Github user kl0u commented on the pull request: https://github.com/apache/flink/pull/1839#issuecomment-203354870 This PR is for FLINK-3375. Please review. > Allow Watermark Generation in the Kafka Source > -- > > Key: FLINK-3375 > URL: https://issues.apache.org/jira/browse/FLINK-3375 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.0.0 >Reporter: Stephan Ewen >Assignee: Kostas Kloudas > Fix For: 1.0.0 > > > It is a common case that event timestamps are ascending inside one Kafka > Partition. Ascending timestamps are easy for users, because they are handled > by ascending timestamp extraction. > If the Kafka source has multiple partitions per source task, then the records > become out of order before timestamps can be extracted and watermarks can be > generated. > If we make the FlinkKafkaConsumer an event time source function, it can > generate watermarks itself. It would internally implement the same logic as > the regular operators that merge streams, keeping track of event time > progress per partition and generating watermarks based on the current > guaranteed event time progress.
[GitHub] flink pull request: Adds watermark emission to the Kafka source by...
Github user kl0u commented on the pull request: https://github.com/apache/flink/pull/1839#issuecomment-203354870 This PR is for FLINK-3375. Please review.
[GitHub] flink pull request: Updates the AssignerWithPunctuatedWatermarks a...
Github user kl0u commented on the pull request:
https://github.com/apache/flink/pull/1811#issuecomment-203351512

This PR just updates documentation. Please review.
[GitHub] flink pull request: [FLINK-2998] Support range partition compariso...
Github user gallenvara commented on the pull request:
https://github.com/apache/flink/pull/1838#issuecomment-203350617

@fhueske Yes, `TwoInputNode` rebuilds the channels, so the `child` nodes don't have the `data distribution` information. I have added the information to them, and the PR has been updated.
[GitHub] flink pull request: Adds watermark emission to the Kafka source by...
GitHub user kl0u opened a pull request:
https://github.com/apache/flink/pull/1839

Adds watermark emission to the Kafka source by allowing the user to specify her TimestampExtractor.

This PR allows the user to embed her desired TimestampExtractor (or watermark emitter) in the Kafka source itself, and it makes the source responsible for synchronizing between the different partitions/topics with lagging timestamps.

In more detail, suppose a task handles two partitions, the first with timestamps from 0 to 100 and the other from 1000 to 2000. If it were to emit a watermark with a timestamp equal to the maximum timestamp seen, then as soon as an element from the partition with the 1000-to-2000 timestamps arrived, it would render all elements in the other partition late. To avoid this, the source monitors the per-partition/topic max timestamps and emits a watermark with a timestamp equal to the minimum across the per-partition, per-topic maxima. The emission is done in a periodic or a punctuated way, depending on the type of timestamp extractor the user specified.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kl0u/flink kafka_wm_redesign

Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/1839.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #1839

commit 74dbb5c81ef7793202f151a96dee2ee0bd980a36
Author: kl0u
Date: 2016-03-08T16:35:14Z

    Adds watermark emission to the Kafka source by allowing the user to specify her TimestampExtractor.
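The minimum-across-partition-maxima rule described in the PR can be sketched as follows; the class and method names are illustrative, not Flink's actual API:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the per-partition watermark logic described above: track the max
// timestamp seen per partition/topic and emit a watermark equal to the minimum
// across those maxima, so a fast partition cannot render a slow partition's
// elements late. All names here are assumptions for illustration.
public class PartitionWatermarkTracker {
    private final Map<String, Long> maxTimestampPerPartition = new HashMap<>();

    /** Record an element's timestamp for its partition. */
    public void onElement(String partition, long timestamp) {
        maxTimestampPerPartition.merge(partition, timestamp, Math::max);
    }

    /** The watermark is the minimum of the per-partition maxima. */
    public long currentWatermark() {
        return maxTimestampPerPartition.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(Long.MIN_VALUE);
    }
}
```

With the example from the PR description (one partition at timestamps up to 100, the other up to 2000), the watermark stays at 100 until the slower partition catches up.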
[jira] [Commented] (FLINK-2998) Support range partition comparison for multi input nodes.
[ https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15217738#comment-15217738 ]

ASF GitHub Bot commented on FLINK-2998:
---------------------------------------

Github user gallenvara commented on the pull request:
https://github.com/apache/flink/pull/1838#issuecomment-203350617

@fhueske Yes, `TwoInputNode` rebuilds the channels, so the `child` nodes don't have the `data distribution` information. I have added the information to them, and the PR has been updated.

> Support range partition comparison for multi input nodes.
> ---------------------------------------------------------
>
> Key: FLINK-2998
> URL: https://issues.apache.org/jira/browse/FLINK-2998
> Project: Flink
> Issue Type: New Feature
> Components: Optimizer
> Reporter: Chengxiang Li
> Priority: Minor
>
> The optimizer may have an opportunity to optimize the DAG when it finds that
> two input range partitions are equivalent, but we do not support this
> comparison yet.
[GitHub] flink pull request: Kafka-3375: Allows Watermark Generation in the...
Github user kl0u closed the pull request at:
https://github.com/apache/flink/pull/1795
[jira] [Commented] (FLINK-3524) Provide a JSONDeserialisationSchema in the kafka connector package
[ https://issues.apache.org/jira/browse/FLINK-3524?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15217695#comment-15217695 ]

ASF GitHub Bot commented on FLINK-3524:
---------------------------------------

Github user zentol commented on the pull request:
https://github.com/apache/flink/pull/1834#issuecomment-203335575

@StephanEwen yes, it's only for Kafka. It relies on other classes (KeyedDeserializationSchema) that are only present in the Kafka module.

> Provide a JSONDeserialisationSchema in the kafka connector package
> ------------------------------------------------------------------
>
> Key: FLINK-3524
> URL: https://issues.apache.org/jira/browse/FLINK-3524
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Reporter: Robert Metzger
> Assignee: Chesnay Schepler
> Labels: starter
>
> (I don't want to include this into 1.0.0)
> Currently, there is no standardized way of parsing JSON data from a Kafka
> stream. I see a lot of users using JSON in their topics. It would make things
> easier for our users to provide a serializer for them.
> I suggest to use the jackson library because we already have it as a
> dependency in Flink and it can parse from a byte[].
> I would suggest to provide the following classes:
> - JSONDeserializationSchema()
> - JSONDeKeyValueSerializationSchema(bool includeMetadata)
> The second variant should produce a record like this:
> {code}
> {"key": "keydata",
>  "value": "valuedata",
>  "metadata": {"offset": 123, "topic": "", "partition": 2 } }
> {code}
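The record shape proposed for the second variant can be sketched with plain maps (the actual implementation would use Jackson, per the issue); all names here are assumptions:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch of the key/value/metadata record shape proposed in the
// issue for the includeMetadata=true variant. Plain maps stand in for
// Jackson's ObjectNode; this is not the actual schema class.
public class JsonRecordShape {
    public static Map<String, Object> wrap(String key, String value,
                                           long offset, String topic, int partition) {
        Map<String, Object> metadata = new LinkedHashMap<>();
        metadata.put("offset", offset);
        metadata.put("topic", topic);
        metadata.put("partition", partition);

        Map<String, Object> record = new LinkedHashMap<>();
        record.put("key", key);
        record.put("value", value);
        record.put("metadata", metadata);
        return record;
    }
}
```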
[GitHub] flink pull request: [FLINK-3524] [kafka] Add JSONDeserializationSc...
Github user zentol commented on the pull request:
https://github.com/apache/flink/pull/1834#issuecomment-203335575

@StephanEwen yes, it's only for Kafka. It relies on other classes (KeyedDeserializationSchema) that are only present in the Kafka module.
[jira] [Commented] (FLINK-3678) Make Flink logs directory configurable
[ https://issues.apache.org/jira/browse/FLINK-3678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15217668#comment-15217668 ]

ASF GitHub Bot commented on FLINK-3678:
---------------------------------------

Github user chiwanpark commented on the pull request:
https://github.com/apache/flink/pull/1837#issuecomment-203323834

I tested this PR in a local cluster. Looks good to me. +1

> Make Flink logs directory configurable
> --------------------------------------
>
> Key: FLINK-3678
> URL: https://issues.apache.org/jira/browse/FLINK-3678
> Project: Flink
> Issue Type: Improvement
> Components: Start-Stop Scripts
> Affects Versions: 1.0.0
> Reporter: Stefano Baghino
> Assignee: Stefano Baghino
> Priority: Minor
> Fix For: 1.0.1
>
> Currently Flink logs are stored under {{$FLINK_HOME/log}} and the user cannot
> configure an alternative storage location. It would be nice to add a
> configuration key in the {{flink-conf.yaml}} and edit the {{bin/flink}}
> launch script accordingly to get the value if present or default to the
> current behavior if no value is provided.
[GitHub] flink pull request: [FLINK-3678] Make Flink logs directory configu...
Github user chiwanpark commented on the pull request:
https://github.com/apache/flink/pull/1837#issuecomment-203323834

I tested this PR in a local cluster. Looks good to me. +1
[jira] [Commented] (FLINK-2998) Support range partition comparison for multi input nodes.
[ https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15217621#comment-15217621 ]

ASF GitHub Bot commented on FLINK-2998:
---------------------------------------

Github user fhueske commented on the pull request:
https://github.com/apache/flink/pull/1838#issuecomment-203307525

I'm currently on vacation. Will have a closer look when I'm back in about a week. I am not sure that we need to touch the Join and CoGroup operators to pass the distributions. The optimizer is able to get this from the GlobalProperties to decide whether the partitionings are valid and equivalent.
[GitHub] flink pull request: [FLINK-2998] Support range partition compariso...
Github user fhueske commented on the pull request:
https://github.com/apache/flink/pull/1838#issuecomment-203307525

I'm currently on vacation. Will have a closer look when I'm back in about a week. I am not sure that we need to touch the Join and CoGroup operators to pass the distributions. The optimizer is able to get this from the GlobalProperties to decide whether the partitionings are valid and equivalent.
[jira] [Commented] (FLINK-3669) WindowOperator registers a lot of timers at StreamTask
[ https://issues.apache.org/jira/browse/FLINK-3669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15217614#comment-15217614 ]

Konstantin Knauf commented on FLINK-3669:
-----------------------------------------

Hi Aljoscha, I think there is more to it, although this would definitely mitigate the problem. Even if we only registered one timer per second, we would still create a lot of TriggerTasks, which do not get cleaned up when TriggerContext.deleteProcessingTimeTimer is invoked. This could still lead to GC problems if the trigger is far in the future. When calling deleteProcessingTimeTimer, shouldn't the ScheduledFuture for this timer be cancelled, which together with ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy(true) would remove the TriggerTask from the ExecutorService and let GC do the cleanup? Cheers, Konstantin

> WindowOperator registers a lot of timers at StreamTask
> ------------------------------------------------------
>
> Key: FLINK-3669
> URL: https://issues.apache.org/jira/browse/FLINK-3669
> Project: Flink
> Issue Type: Bug
> Components: Streaming
> Affects Versions: 1.0.1
> Reporter: Aljoscha Krettek
> Priority: Blocker
>
> Right now, the WindowOperator registers a timer at the StreamTask for every
> processing-time timer that a Trigger registers. We should combine several
> registered trigger timers to only register one low-level timer (timer
> coalescing).
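The cleanup Konstantin describes relies on standard JDK behavior: cancelling a ScheduledFuture on a ScheduledThreadPoolExecutor configured with setRemoveOnCancelPolicy(true) removes the task from the work queue immediately, so far-future trigger tasks become garbage-collectable instead of lingering until they would have fired. A minimal, self-contained demonstration:

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Demonstrates remove-on-cancel: after cancelling a far-future timer, the
// executor's queue no longer holds the task, so it can be garbage-collected.
public class TimerCancellation {
    public static int pendingAfterCancel() {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        executor.setRemoveOnCancelPolicy(true);

        // Register a "trigger" far in the future, then delete it.
        ScheduledFuture<?> timer = executor.schedule(() -> { }, 1, TimeUnit.HOURS);
        timer.cancel(false);

        int pending = executor.getQueue().size(); // 0 with remove-on-cancel
        executor.shutdownNow();
        return pending;
    }
}
```

Without setRemoveOnCancelPolicy(true), the cancelled task would stay queued until its scheduled time, which is exactly the retention problem described above.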
[jira] [Commented] (FLINK-3579) Improve String concatenation
[ https://issues.apache.org/jira/browse/FLINK-3579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15217550#comment-15217550 ]

ASF GitHub Bot commented on FLINK-3579:
---------------------------------------

Github user ramkrish86 commented on a diff in the pull request:
https://github.com/apache/flink/pull/1821#discussion_r57844972

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala ---
@@ -59,6 +59,8 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
         Literal(str.toInt)
       } else if (str.endsWith("f") | str.endsWith("F")) {
         Literal(str.toFloat)
+      } else if (str.endsWith(".")) {
--- End diff --

Since the 'cast()' function parsing accepts '.' as '.cast()', we need to do it this way. Otherwise the parser does not know that the '.' is specific to the 'cast' operation. But I am not so well versed with this parser, so suggestions are welcome.

> Improve String concatenation
> ----------------------------
>
> Key: FLINK-3579
> URL: https://issues.apache.org/jira/browse/FLINK-3579
> Project: Flink
> Issue Type: Bug
> Components: Table API
> Reporter: Timo Walther
> Assignee: ramkrishna.s.vasudevan
> Priority: Minor
>
> Concatenation of a String and non-String does not work properly.
> e.g. {{f0 + 42}} leads to a RelBuilder Exception.
> ExpressionParser does not like {{f0 + 42.cast(STRING)}} either.
[jira] [Commented] (FLINK-3579) Improve String concatenation
[ https://issues.apache.org/jira/browse/FLINK-3579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15217547#comment-15217547 ]

ASF GitHub Bot commented on FLINK-3579:
---------------------------------------

Github user ramkrish86 commented on a diff in the pull request:
https://github.com/apache/flink/pull/1821#discussion_r57844837

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala ---
@@ -116,7 +118,18 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
     atom <~ ".cast(BOOL)" ^^ { e => Cast(e, BasicTypeInfo.BOOLEAN_TYPE_INFO) } |
     atom <~ ".cast(BOOLEAN)" ^^ { e => Cast(e, BasicTypeInfo.BOOLEAN_TYPE_INFO) } |
     atom <~ ".cast(STRING)" ^^ { e => Cast(e, BasicTypeInfo.STRING_TYPE_INFO) } |
-    atom <~ ".cast(DATE)" ^^ { e => Cast(e, BasicTypeInfo.DATE_TYPE_INFO) }
+    atom <~ ".cast(DATE)" ^^ { e => Cast(e, BasicTypeInfo.DATE_TYPE_INFO) } |
+    // When an integer is directly casted.
+    atom <~ "cast(BYTE)" ^^ { e => Cast(e, BasicTypeInfo.BYTE_TYPE_INFO) } |
+    atom <~ "cast(SHORT)" ^^ { e => Cast(e, BasicTypeInfo.SHORT_TYPE_INFO) } |
+    atom <~ "cast(INT)" ^^ { e => Cast(e, BasicTypeInfo.INT_TYPE_INFO) } |
+    atom <~ "cast(LONG)" ^^ { e => Cast(e, BasicTypeInfo.LONG_TYPE_INFO) } |
+    atom <~ "cast(FLOAT)" ^^ { e => Cast(e, BasicTypeInfo.FLOAT_TYPE_INFO) } |
+    atom <~ "cast(DOUBLE)" ^^ { e => Cast(e, BasicTypeInfo.DOUBLE_TYPE_INFO) } |
+    atom <~ "cast(BOOL)" ^^ { e => Cast(e, BasicTypeInfo.BOOLEAN_TYPE_INFO) } |
+    atom <~ "cast(BOOLEAN)" ^^ { e => Cast(e, BasicTypeInfo.BOOLEAN_TYPE_INFO) } |
+    atom <~ "cast(STRING)" ^^ { e => Cast(e, BasicTypeInfo.STRING_TYPE_INFO) } |
+    atom <~ "cast(DATE)" ^^ { e => Cast(e, BasicTypeInfo.DATE_TYPE_INFO) }
--- End diff --

Since I am handling the '.' operator specifically, I needed this combination, where the cast can come without a '.' in it.
[GitHub] flink pull request: FLINK-3579 Improve String concatenation (Ram)
Github user ramkrish86 commented on a diff in the pull request:
https://github.com/apache/flink/pull/1821#discussion_r57844972

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala ---
@@ -59,6 +59,8 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
         Literal(str.toInt)
       } else if (str.endsWith("f") | str.endsWith("F")) {
         Literal(str.toFloat)
+      } else if (str.endsWith(".")) {
--- End diff --

Since the 'cast()' function parsing accepts '.' as '.cast()', we need to do it this way. Otherwise the parser does not know that the '.' is specific to the 'cast' operation. But I am not so well versed with this parser, so suggestions are welcome.
[jira] [Commented] (FLINK-3579) Improve String concatenation
[ https://issues.apache.org/jira/browse/FLINK-3579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15217544#comment-15217544 ]

ASF GitHub Bot commented on FLINK-3579:
---------------------------------------

Github user ramkrish86 commented on a diff in the pull request:
https://github.com/apache/flink/pull/1821#discussion_r57844753

--- Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java ---
@@ -121,6 +120,66 @@ public void testNonWorkingSubstring2() throws Exception {
         resultSet.collect();
     }

+    @Test
+    public void testStringConcat() throws Exception {
+        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+        TableEnvironment tableEnv = new TableEnvironment();
+
+        DataSet<Tuple2<String, Integer>> ds = env.fromElements(
+            new Tuple2<>("ABCD", 3),
+            new Tuple2<>("ABCD", 2));
+
+        Table in = tableEnv.fromDataSet(ds, "a, b");
+
+        Table result = in
+            .select("a + b + 42");
--- End diff --

Let me check. I think it goes with the previous comment. If that is done, this should work too.
[GitHub] flink pull request: FLINK-3579 Improve String concatenation (Ram)
Github user ramkrish86 commented on a diff in the pull request:
https://github.com/apache/flink/pull/1821#discussion_r57844837

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala ---
@@ -116,7 +118,18 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
     atom <~ ".cast(BOOL)" ^^ { e => Cast(e, BasicTypeInfo.BOOLEAN_TYPE_INFO) } |
     atom <~ ".cast(BOOLEAN)" ^^ { e => Cast(e, BasicTypeInfo.BOOLEAN_TYPE_INFO) } |
     atom <~ ".cast(STRING)" ^^ { e => Cast(e, BasicTypeInfo.STRING_TYPE_INFO) } |
-    atom <~ ".cast(DATE)" ^^ { e => Cast(e, BasicTypeInfo.DATE_TYPE_INFO) }
+    atom <~ ".cast(DATE)" ^^ { e => Cast(e, BasicTypeInfo.DATE_TYPE_INFO) } |
+    // When an integer is directly casted.
+    atom <~ "cast(BYTE)" ^^ { e => Cast(e, BasicTypeInfo.BYTE_TYPE_INFO) } |
+    atom <~ "cast(SHORT)" ^^ { e => Cast(e, BasicTypeInfo.SHORT_TYPE_INFO) } |
+    atom <~ "cast(INT)" ^^ { e => Cast(e, BasicTypeInfo.INT_TYPE_INFO) } |
+    atom <~ "cast(LONG)" ^^ { e => Cast(e, BasicTypeInfo.LONG_TYPE_INFO) } |
+    atom <~ "cast(FLOAT)" ^^ { e => Cast(e, BasicTypeInfo.FLOAT_TYPE_INFO) } |
+    atom <~ "cast(DOUBLE)" ^^ { e => Cast(e, BasicTypeInfo.DOUBLE_TYPE_INFO) } |
+    atom <~ "cast(BOOL)" ^^ { e => Cast(e, BasicTypeInfo.BOOLEAN_TYPE_INFO) } |
+    atom <~ "cast(BOOLEAN)" ^^ { e => Cast(e, BasicTypeInfo.BOOLEAN_TYPE_INFO) } |
+    atom <~ "cast(STRING)" ^^ { e => Cast(e, BasicTypeInfo.STRING_TYPE_INFO) } |
+    atom <~ "cast(DATE)" ^^ { e => Cast(e, BasicTypeInfo.DATE_TYPE_INFO) }
--- End diff --

Since I am handling the '.' operator specifically, I needed this combination, where the cast can come without a '.' in it.
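Why the dot-less `cast(...)` alternatives are needed can be illustrated outside the Scala parser: a greedy numeric-literal rule that accepts an optional trailing '.' (the `str.endsWith(".")` branch) consumes the dot of `42.cast(STRING)`, leaving only `cast(STRING)` for the suffix parser. A hypothetical sketch; the regex stands in for the actual JavaTokenParsers grammar and is an assumption:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Demonstrates how a literal rule with an optional trailing '.' swallows the
// dot, so the cast suffix arrives without it. Pattern is illustrative only.
public class LiteralDotDemo {
    private static final Pattern NUMERIC_LITERAL = Pattern.compile("\\d+\\.?");

    /** Return the input with the leading numeric literal (if any) removed. */
    public static String remainderAfterLiteral(String input) {
        Matcher m = NUMERIC_LITERAL.matcher(input);
        return m.lookingAt() ? input.substring(m.end()) : input;
    }
}
```

After the literal rule runs, `42.cast(STRING)` leaves `cast(STRING)` behind, which is why the grammar above must also match `cast(...)` without a leading dot.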
[jira] [Commented] (FLINK-3579) Improve String concatenation
[ https://issues.apache.org/jira/browse/FLINK-3579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15217543#comment-15217543 ]

ASF GitHub Bot commented on FLINK-3579:
---------------------------------------

Github user ramkrish86 commented on a diff in the pull request:
https://github.com/apache/flink/pull/1821#discussion_r57844716

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala ---
@@ -134,7 +136,13 @@ object RexNodeTranslator {
       case Plus(left, right) =>
         val l = toRexNode(left, relBuilder)
         val r = toRexNode(right, relBuilder)
-        relBuilder.call(SqlStdOperatorTable.PLUS, l, r)
+        if (SqlTypeName.STRING_TYPES.contains(l.getType.getSqlTypeName)) {
--- End diff --

Valid point. I thought of handling both sides. Let me check this.
[GitHub] flink pull request: FLINK-3579 Improve String concatenation (Ram)
Github user ramkrish86 commented on a diff in the pull request:
https://github.com/apache/flink/pull/1821#discussion_r57844753

--- Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java ---
@@ -121,6 +120,66 @@ public void testNonWorkingSubstring2() throws Exception {
         resultSet.collect();
     }

+    @Test
+    public void testStringConcat() throws Exception {
+        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+        TableEnvironment tableEnv = new TableEnvironment();
+
+        DataSet<Tuple2<String, Integer>> ds = env.fromElements(
+            new Tuple2<>("ABCD", 3),
+            new Tuple2<>("ABCD", 2));
+
+        Table in = tableEnv.fromDataSet(ds, "a, b");
+
+        Table result = in
+            .select("a + b + 42");
--- End diff --

Let me check. I think it goes with the previous comment. If that is done, this should work too.
[GitHub] flink pull request: FLINK-3579 Improve String concatenation (Ram)
Github user ramkrish86 commented on a diff in the pull request:
https://github.com/apache/flink/pull/1821#discussion_r57844716

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala ---
@@ -134,7 +136,13 @@ object RexNodeTranslator {
       case Plus(left, right) =>
         val l = toRexNode(left, relBuilder)
         val r = toRexNode(right, relBuilder)
-        relBuilder.call(SqlStdOperatorTable.PLUS, l, r)
+        if (SqlTypeName.STRING_TYPES.contains(l.getType.getSqlTypeName)) {
--- End diff --

Valid point. I thought of handling both sides. Let me check this.
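The special-casing of Plus discussed here — when either operand of `+` is a string, cast the other side to a string and concatenate instead of emitting a numeric PLUS — can be sketched in plain Java. This mirrors the idea only; it is not Flink's RexNodeTranslator code, and the names are assumptions:

```java
// Sketch of string-aware '+': if either side is a string, the non-string side
// is cast to a string and the result is a concatenation; otherwise '+' stays
// numeric. Illustrative only, not the actual Table API translation.
public class StringPlus {
    public static Object plus(Object left, Object right) {
        if (left instanceof String || right instanceof String) {
            // Cast the non-string operand to a string, then concatenate.
            return String.valueOf(left) + String.valueOf(right);
        }
        if (left instanceof Number && right instanceof Number) {
            return ((Number) left).longValue() + ((Number) right).longValue();
        }
        throw new IllegalArgumentException("unsupported operand types");
    }
}
```

Under this rule the issue's example `f0 + 42` (with `f0` a string) yields a concatenation rather than a type error, and checking both operands covers fhueske's point about not only testing the left side.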