[jira] [Updated] (FLINK-5498) Add support for outer joins with non-equality predicates (and 1+ equality predicates)
[ https://issues.apache.org/jira/browse/FLINK-5498?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Fabian Hueske updated FLINK-5498:
---------------------------------
    Summary: Add support for outer joins with non-equality predicates (and 1+ equality predicates)  (was: OuterJoins with non-equality predicates compute wrong results)

> Add support for outer joins with non-equality predicates (and 1+ equality predicates)
> -------------------------------------------------------------------------------------
>
>                 Key: FLINK-5498
>                 URL: https://issues.apache.org/jira/browse/FLINK-5498
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>    Affects Versions: 1.3.0
>            Reporter: lincoln.lee
>            Assignee: lincoln.lee
>            Priority: Minor
>
> I found the expected result of a unit test case incorrect compared to that in an RDBMS, see
> flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/JoinITCase.scala
> {code:title=JoinITCase.scala}
> def testRightJoinWithNotOnlyEquiJoin(): Unit = {
>   ...
>   val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
>   val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
>   val joinT = ds1.rightOuterJoin(ds2, 'a === 'd && 'b < 'h).select('c, 'g)
>
>   val expected = "Hello world,BCD\n"
>   val results = joinT.toDataSet[Row].collect()
>   TestBaseUtils.compareResultAsText(results.asJava, expected)
> }
> {code}
> Then I took some time to learn about the 'outer join' in relational databases. The right result of the above case should be (tested in SQL Server and MySQL, the results are the same):
> {code}
> select c, g from tuple3 right outer join tuple5 on a=f and b<h
>
> NULL         Hallo
> NULL         Hallo Welt
> NULL         Hallo Welt wie
> NULL         Hallo Welt wie gehts?
> NULL         ABC
> Hello world  BCD
> NULL         CDE
> NULL         DEF
> NULL         EFG
> NULL         FGH
> NULL         GHI
> NULL         HIJ
> NULL         IJK
> NULL         JKL
> NULL         KLM
> {code}
> The join condition {{rightOuterJoin('a === 'd && 'b < 'h)}} is not equivalent to {{rightOuterJoin('a === 'd).where('b < 'h)}}.
> The problem is rooted in the code-generated {{JoinFunction}} (see {{DataSetJoin.translateToPlan()}}, line 188). If the join condition does not match, we must emit the outer row padded with nulls instead of returning from the function without emitting anything.
> The code-generated {{JoinFunction}} also includes the equality predicates. These should be removed before generating the code, e.g., in {{DataSetJoinRule}} when generating the {{DataSetJoin}} with the help of {{JoinInfo.getRemaining()}}.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
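The non-equivalence described above can be sketched in plain Java (a minimal illustration with hypothetical helper names, not Flink code): with the inequality inside the ON condition, a right row whose equality key matches but whose inequality fails still surfaces as a null-padded row; applying the same inequality as a WHERE filter after an equality-only outer join drops that row entirely.

```java
import java.util.ArrayList;
import java.util.List;

class OuterJoinSemantics {

    // Left rows are {a, b}; right rows are {d, h}.
    interface Pred { boolean test(int a, int b, int d, int h); }

    // Right outer join: a right row with no left partner satisfying the
    // FULL join condition is emitted once, padded with nulls on the left.
    static List<Integer[]> rightOuterJoin(int[][] left, int[][] right, Pred on) {
        List<Integer[]> out = new ArrayList<>();
        for (int[] r : right) {
            boolean matched = false;
            for (int[] l : left) {
                if (on.test(l[0], l[1], r[0], r[1])) {
                    out.add(new Integer[]{l[0], l[1], r[0], r[1]});
                    matched = true;
                }
            }
            if (!matched) {
                out.add(new Integer[]{null, null, r[0], r[1]}); // null padding
            }
        }
        return out;
    }

    public static void main(String[] args) {
        int[][] left  = {{1, 9}};  // a=1, b=9
        int[][] right = {{1, 5}};  // d=1, h=5

        // ON a = d AND b < h: equality matches but 9 < 5 fails, so the
        // right row survives as (null, null, 1, 5).
        List<Integer[]> onJoin =
            rightOuterJoin(left, right, (a, b, d, h) -> a == d && b < h);

        // ON a = d, then WHERE b < h: the matched row (1, 9, 1, 5) is built
        // first, and the filter afterwards removes it, leaving nothing.
        List<Integer[]> whereJoin = new ArrayList<>();
        for (Integer[] row : rightOuterJoin(left, right, (a, b, d, h) -> a == d)) {
            if (row[1] != null && row[1] < row[3]) { // null never passes
                whereJoin.add(row);
            }
        }

        System.out.println(onJoin.size());    // prints 1
        System.out.println(whereJoin.size()); // prints 0
    }
}
```

This is why the expected result of the test above must contain the null-padded rows rather than only "Hello world,BCD".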
[jira] [Commented] (FLINK-5498) OuterJoins with non-equality predicates compute wrong results
[ https://issues.apache.org/jira/browse/FLINK-5498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15825602#comment-15825602 ]

Fabian Hueske commented on FLINK-5498:
--------------------------------------

Fair enough, those are valid concerns. Then let's split this into:
1. Disable non-equality predicates on outer joins. This fix should go into Flink 1.2.0, see FLINK-5520.
2. Enable and correctly implement non-equality predicates on outer joins. This will be an improvement for Flink 1.3.0.
[jira] [Updated] (FLINK-5498) OuterJoins with non-equality predicates compute wrong results
[ https://issues.apache.org/jira/browse/FLINK-5498?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Fabian Hueske updated FLINK-5498:
---------------------------------
    Issue Type: New Feature  (was: Bug)
[jira] [Updated] (FLINK-5498) OuterJoins with non-equality predicates compute wrong results
[ https://issues.apache.org/jira/browse/FLINK-5498?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Fabian Hueske updated FLINK-5498:
---------------------------------
    Affects Version/s:     (was: 1.1.4)
                           (was: 1.2.0)
[jira] [Updated] (FLINK-5498) OuterJoins with non-equality predicates compute wrong results
[ https://issues.apache.org/jira/browse/FLINK-5498?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Fabian Hueske updated FLINK-5498:
---------------------------------
    Priority: Minor  (was: Critical)
[jira] [Created] (FLINK-5520) Disable outer joins with non-equality predicates
Fabian Hueske created FLINK-5520:
------------------------------------
             Summary: Disable outer joins with non-equality predicates
                 Key: FLINK-5520
                 URL: https://issues.apache.org/jira/browse/FLINK-5520
             Project: Flink
          Issue Type: Bug
          Components: Table API & SQL
    Affects Versions: 1.2.0
            Reporter: Fabian Hueske
            Priority: Blocker
             Fix For: 1.2.0

Outer joins with non-equality predicates (and at least one equality predicate) compute incorrect results. Since this is not a very common requirement, I propose to disable this feature for the 1.2.0 release and correctly implement it for a later version.

The fix should add checks in the Table API validation phase (to get a good error message) and in the DataSetJoinRule to prevent translation of SQL queries with non-equality predicates on outer joins.
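The proposed check can be sketched as follows. This is a hypothetical stand-in, not Flink's validation code: predicates are modeled as strings, and anything that is not a simple `x=y` equality is rejected on a non-inner join.

```java
import java.util.List;

class OuterJoinValidation {

    enum JoinType { INNER, LEFT, RIGHT, FULL }

    // Reject outer joins whose condition contains anything other than
    // conjunctive equality predicates, mirroring the FLINK-5520 proposal.
    static void validate(JoinType type, List<String> predicates) {
        for (String p : predicates) {
            boolean isEquality = p.matches("\\w+\\s*=\\s*\\w+");
            if (!isEquality && type != JoinType.INNER) {
                throw new IllegalArgumentException(
                    "Outer joins with non-equality predicates are not supported: " + p);
            }
        }
    }
}
```

An inner join with `a=d` and `b<h` passes, while any outer join containing `b<h` fails validation with an explicit error message instead of silently computing wrong results.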
[jira] [Commented] (FLINK-5256) Extend DataSetSingleRowJoin to support Left and Right joins
[ https://issues.apache.org/jira/browse/FLINK-5256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15825599#comment-15825599 ]

Anton Mushin commented on FLINK-5256:
-------------------------------------

Hi, I updated the PR, but it is not correct yet. I have a problem with {{MapJoinRightRunner}}: while debugging I noticed that execution never reaches {{MapJoinRightRunner}} for any SQL query with an outer join. Does it make sense to support joins with null records in {{MapJoinRightRunner}} in this case?

> Extend DataSetSingleRowJoin to support Left and Right joins
> -----------------------------------------------------------
>
>                 Key: FLINK-5256
>                 URL: https://issues.apache.org/jira/browse/FLINK-5256
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API & SQL
>    Affects Versions: 1.2.0
>            Reporter: Fabian Hueske
>            Assignee: Anton Mushin
>
> The {{DataSetSingleRowJoin}} is a broadcast-map join that supports arbitrary inner joins where one input is a single row.
> I found that Calcite translates certain subqueries into non-equi left and right joins with a single-row input. These cases can be handled if the {{DataSetSingleRowJoin}} is extended to support outer joins on the non-single-row input, i.e., left joins if the right side is a single row and vice versa.
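The extension requested above can be illustrated with a plain-Java sketch (hypothetical names, not Flink's actual classes): the single broadcast row is paired with every probe-side row, and when the join condition fails, the probe row is still emitted, padded with null on the single-row side.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiPredicate;

class SingleRowOuterJoin {

    // Left outer join where the right input is a single broadcast row.
    static List<String> leftOuterJoin(List<Integer> probeSide,
                                      Integer singleRow,
                                      BiPredicate<Integer, Integer> condition) {
        List<String> out = new ArrayList<>();
        for (Integer probe : probeSide) {
            if (condition.test(probe, singleRow)) {
                out.add(probe + "," + singleRow);
            } else {
                out.add(probe + ",null"); // outer side survives, padded
            }
        }
        return out;
    }
}
```

For example, joining probe rows [1, 2, 3] with the single row 2 under the non-equi condition `probe > single` yields "1,null", "2,null", "3,2": every probe row appears exactly once, which is the behavior the extended {{DataSetSingleRowJoin}} needs for left and right joins.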
[jira] [Commented] (FLINK-5256) Extend DataSetSingleRowJoin to support Left and Right joins
[ https://issues.apache.org/jira/browse/FLINK-5256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15825594#comment-15825594 ]

ASF GitHub Bot commented on FLINK-5256:
---------------------------------------

Github user ex00 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3033#discussion_r96356761

    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/JoinITCase.scala ---
    @@ -372,9 +372,163 @@ class JoinITCase(
         val table = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a1, 'a2, 'a3)
         tEnv.registerTable("A", table)
    -    val sqlQuery1 = "SELECT * FROM A CROSS JOIN (SELECT count(*) FROM A HAVING count(*) < 0)"
    -    val result = tEnv.sql(sqlQuery1).count()
    +    val sqlQuery1 = "SELECT * FROM A CROSS JOIN " +
    +      "(SELECT count(*) FROM A HAVING count(*) < 0)"
    +    val result = tEnv.sql(sqlQuery1)
    +    val expected = Seq(
    +      "2,2,Hello,null",
    +      "1,1,Hi,null",
    +      "3,2,Hello world,null").mkString("\n")
    --- End diff --

    This test was changed because we now join with null records; see [testSingleRowLeftOuterJoinWithNullRecords](https://github.com/apache/flink/pull/3033/files#diff-102e5d9e330260c0acf5e4e54ff3bdceR478).
[GitHub] flink pull request #3033: [FLINK-5256] Extend DataSetSingleRowJoin to suppor...
Github user ex00 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3033#discussion_r96356761

    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/JoinITCase.scala ---
    @@ -372,9 +372,163 @@ class JoinITCase(
         val table = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a1, 'a2, 'a3)
         tEnv.registerTable("A", table)
    -    val sqlQuery1 = "SELECT * FROM A CROSS JOIN (SELECT count(*) FROM A HAVING count(*) < 0)"
    -    val result = tEnv.sql(sqlQuery1).count()
    +    val sqlQuery1 = "SELECT * FROM A CROSS JOIN " +
    +      "(SELECT count(*) FROM A HAVING count(*) < 0)"
    +    val result = tEnv.sql(sqlQuery1)
    +    val expected = Seq(
    +      "2,2,Hello,null",
    +      "1,1,Hi,null",
    +      "3,2,Hello world,null").mkString("\n")
    --- End diff --

    This test was changed because we now join with null records; see [testSingleRowLeftOuterJoinWithNullRecords](https://github.com/apache/flink/pull/3033/files#diff-102e5d9e330260c0acf5e4e54ff3bdceR478).

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---
[jira] [Commented] (FLINK-5256) Extend DataSetSingleRowJoin to support Left and Right joins
[ https://issues.apache.org/jira/browse/FLINK-5256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15825591#comment-15825591 ]

ASF GitHub Bot commented on FLINK-5256:
---------------------------------------

Github user ex00 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3033#discussion_r96356416

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetJoinRule.scala ---
    @@ -43,7 +43,7 @@ class DataSetJoinRule
           !joinInfo.pairs().isEmpty
       }
    -  override def convert(rel: RelNode): RelNode = {
    +override def convert(rel: RelNode): RelNode = {
    --- End diff --

    The changes will be reverted in the next PR update.
[GitHub] flink pull request #3033: [FLINK-5256] Extend DataSetSingleRowJoin to suppor...
Github user ex00 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3033#discussion_r96356416

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetJoinRule.scala ---
    @@ -43,7 +43,7 @@ class DataSetJoinRule
           !joinInfo.pairs().isEmpty
       }
    -  override def convert(rel: RelNode): RelNode = {
    +override def convert(rel: RelNode): RelNode = {
    --- End diff --

    The changes will be reverted in the next PR update.
[jira] [Commented] (FLINK-5256) Extend DataSetSingleRowJoin to support Left and Right joins
[ https://issues.apache.org/jira/browse/FLINK-5256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15825588#comment-15825588 ]

ASF GitHub Bot commented on FLINK-5256:
---------------------------------------

Github user ex00 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3033#discussion_r96356194

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSingleRowJoin.scala ---
    @@ -144,21 +150,46 @@ class DataSetSingleRowJoin(
         val condition = codeGenerator.generateExpression(joinCondition)
    -    val joinMethodBody = s"""
    -      |${condition.code}
    -      |if (${condition.resultTerm}) {
    -      |  ${conversion.code}
    -      |  ${codeGenerator.collectorTerm}.collect(${conversion.resultTerm});
    -      |}
    -      |""".stripMargin
    +    val joinMethodBody =
    +      if (joinType == JoinRelType.INNER) {
    +        s"""
    +          |${condition.code}
    +          |if (${condition.resultTerm}) {
    +          |  ${conversion.code}
    +          |  ${codeGenerator.collectorTerm}.collect(${conversion.resultTerm});
    +          |}
    +          |""".stripMargin
    +      } else {
    +        val singleNode =
    +          if (leftIsSingle) {
    +            leftNode
    +          } else {
    +            rightNode
    +          }
    +        val notSuitedToCondition = singleNode
    +          .getRowType
    +          .getFieldList
    +          .map(field => getRowType.getFieldNames.indexOf(field.getName))
    +          .map(i => s"${conversion.resultTerm}.setField($i,null);")
    +
    +        s"""
    +          |${condition.code}
    +          |${conversion.code}
    +          |if(!${condition.resultTerm}){
    +          |${notSuitedToCondition.mkString("\n")}
    --- End diff --

    I think this part is excessive, because in the resulting code we get:

    ```java
    // ${condition.code}
    boolean isNull$55 = isNull$45 || isNull$50;
    boolean result$54;
    if (isNull$55) {
      result$54 = false;
    } else {
      result$54 = result$44 > result$49;
    }
    // ${conversion.code}
    if (isNull$45) {
      out.setField(0, null);
    } else {
      out.setField(0, result$44);
    }
    if (isNull$50) {
      out.setField(1, null);
    } else {
      out.setField(1, result$49);
    }
    // if (!${condition.resultTerm}) { ${notSuitedToCondition.mkString("\n")} }
    if (!result$54) {
      out.setField(1, null);
    }
    ```

    As a result we write field 1 twice: first the value, which is then overwritten with null. I don't know how to do it better.
[GitHub] flink pull request #3033: [FLINK-5256] Extend DataSetSingleRowJoin to suppor...
Github user ex00 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3033#discussion_r96356194

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSingleRowJoin.scala ---
    @@ -144,21 +150,46 @@ class DataSetSingleRowJoin(
         val condition = codeGenerator.generateExpression(joinCondition)
    -    val joinMethodBody = s"""
    -      |${condition.code}
    -      |if (${condition.resultTerm}) {
    -      |  ${conversion.code}
    -      |  ${codeGenerator.collectorTerm}.collect(${conversion.resultTerm});
    -      |}
    -      |""".stripMargin
    +    val joinMethodBody =
    +      if (joinType == JoinRelType.INNER) {
    +        s"""
    +          |${condition.code}
    +          |if (${condition.resultTerm}) {
    +          |  ${conversion.code}
    +          |  ${codeGenerator.collectorTerm}.collect(${conversion.resultTerm});
    +          |}
    +          |""".stripMargin
    +      } else {
    +        val singleNode =
    +          if (leftIsSingle) {
    +            leftNode
    +          } else {
    +            rightNode
    +          }
    +        val notSuitedToCondition = singleNode
    +          .getRowType
    +          .getFieldList
    +          .map(field => getRowType.getFieldNames.indexOf(field.getName))
    +          .map(i => s"${conversion.resultTerm}.setField($i,null);")
    +
    +        s"""
    +          |${condition.code}
    +          |${conversion.code}
    +          |if(!${condition.resultTerm}){
    +          |${notSuitedToCondition.mkString("\n")}
    --- End diff --

    I think this part is excessive, because in the resulting code we get:

    ```java
    // ${condition.code}
    boolean isNull$55 = isNull$45 || isNull$50;
    boolean result$54;
    if (isNull$55) {
      result$54 = false;
    } else {
      result$54 = result$44 > result$49;
    }
    // ${conversion.code}
    if (isNull$45) {
      out.setField(0, null);
    } else {
      out.setField(0, result$44);
    }
    if (isNull$50) {
      out.setField(1, null);
    } else {
      out.setField(1, result$49);
    }
    // if (!${condition.resultTerm}) { ${notSuitedToCondition.mkString("\n")} }
    if (!result$54) {
      out.setField(1, null);
    }
    ```

    As a result we write field 1 twice: first the value, which is then overwritten with null. I don't know how to do it better.
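One way around the double write discussed in the review comment, shown here as an illustrative alternative (not code from the PR), is to decide on the join condition first and then populate each field exactly once, choosing between the value and null for fields that come from the single-row side.

```java
class PaddedRowEmit {

    // Simplified stand-in for the generated join method body: fields holds
    // the joined values, isSingleSide marks fields from the single-row input.
    static Object[] emit(Object[] fields, boolean[] isSingleSide, boolean conditionHolds) {
        Object[] out = new Object[fields.length];
        for (int i = 0; i < fields.length; i++) {
            // Each field is written exactly once: null when it comes from the
            // single-row side and the condition failed, its value otherwise.
            out[i] = (!conditionHolds && isSingleSide[i]) ? null : fields[i];
        }
        return out;
    }
}
```

For the two-field row in the generated code above, a failed condition would yield the probe-side value in field 0 and null in field 1, without ever writing field 1 twice.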
[jira] [Updated] (FLINK-5498) OuterJoins with non-equality predicates compute wrong results
[ https://issues.apache.org/jira/browse/FLINK-5498?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

lincoln.lee updated FLINK-5498:
-------------------------------
    Description: I found the expected result of a unit test case incorrect compared to that in an RDBMS, see flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/JoinITCase.scala
[jira] [Commented] (FLINK-5519) scala-maven-plugin version all change to 3.2.2
[ https://issues.apache.org/jira/browse/FLINK-5519?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15825571#comment-15825571 ]

ASF GitHub Bot commented on FLINK-5519:
---------------------------------------

GitHub user shijinkui opened a pull request:

    https://github.com/apache/flink/pull/3132

    [FLINK-5519] [build] scala-maven-plugin version all change to 3.2.2

    1. Change the scala-maven-plugin version to 3.2.2 in all modules.
    2. Change the parent POM version from apache-14 to apache-18.

    - [X] General
      - The pull request references the related JIRA issue ("[FLINK-5519] scala-maven-plugin version all change to 3.2.2")
      - The pull request addresses only one issue
      - Each commit in the PR has a meaningful commit message (including the JIRA id)
    - [X] Documentation
      - Documentation has been added for new functionality
      - Old documentation affected by the pull request has been updated
      - JavaDoc for public methods has been added
    - [X] Tests & Build
      - Functionality added by the pull request is covered by tests
      - `mvn clean verify` has been executed successfully locally or a Travis build has passed

    You can merge this pull request into a Git repository by running:

        $ git pull https://github.com/shijinkui/flink FLINK-5519_scala-maven-plugin

    Alternatively you can review and apply these changes as the patch at:

        https://github.com/apache/flink/pull/3132.patch

    To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

        This closes #3132

    commit d583bf6038c2ffa5a50f7dd4bcc53815ed4cb003
    Author: shijinkui
    Date:   2017-01-17T07:10:52Z

        [FLINK-5519] [build] scala-maven-plugin version all change to 3.2.2

> scala-maven-plugin version all change to 3.2.2
> ----------------------------------------------
>
>                 Key: FLINK-5519
>                 URL: https://issues.apache.org/jira/browse/FLINK-5519
>             Project: Flink
>          Issue Type: Improvement
>          Components: Build System
>            Reporter: shijinkui
[GitHub] flink pull request #3132: [FLINK-5519] [build] scala-maven-plugin version al...
GitHub user shijinkui opened a pull request:

    https://github.com/apache/flink/pull/3132

    [FLINK-5519] [build] scala-maven-plugin version all change to 3.2.2

    1. Change the scala-maven-plugin version to 3.2.2 in all modules.
    2. Change the parent POM version from apache-14 to apache-18.

    - [X] General
      - The pull request references the related JIRA issue ("[FLINK-5519] scala-maven-plugin version all change to 3.2.2")
      - The pull request addresses only one issue
      - Each commit in the PR has a meaningful commit message (including the JIRA id)
    - [X] Documentation
      - Documentation has been added for new functionality
      - Old documentation affected by the pull request has been updated
      - JavaDoc for public methods has been added
    - [X] Tests & Build
      - Functionality added by the pull request is covered by tests
      - `mvn clean verify` has been executed successfully locally or a Travis build has passed

    You can merge this pull request into a Git repository by running:

        $ git pull https://github.com/shijinkui/flink FLINK-5519_scala-maven-plugin

    Alternatively you can review and apply these changes as the patch at:

        https://github.com/apache/flink/pull/3132.patch

    To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

        This closes #3132

    commit d583bf6038c2ffa5a50f7dd4bcc53815ed4cb003
    Author: shijinkui
    Date:   2017-01-17T07:10:52Z

        [FLINK-5519] [build] scala-maven-plugin version all change to 3.2.2
[jira] [Created] (FLINK-5519) scala-maven-plugin version all change to 3.2.2
shijinkui created FLINK-5519:
--------------------------------
             Summary: scala-maven-plugin version all change to 3.2.2
                 Key: FLINK-5519
                 URL: https://issues.apache.org/jira/browse/FLINK-5519
             Project: Flink
          Issue Type: Improvement
          Components: Build System
            Reporter: shijinkui

1. Change the scala-maven-plugin version to 3.2.2 in all modules.
2. Change the parent POM version from apache-14 to apache-18.
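The two changes amount to pom.xml edits along these lines (a sketch of the intended fragments, not the actual diff; the module list is that of the Flink build at the time):

```xml
<!-- Root pom.xml: bump the ASF parent POM from apache-14 to apache-18. -->
<parent>
  <groupId>org.apache</groupId>
  <artifactId>apache</artifactId>
  <version>18</version>
</parent>

<!-- In each Scala module's build section: pin scala-maven-plugin to 3.2.2. -->
<plugin>
  <groupId>net.alchim31.maven</groupId>
  <artifactId>scala-maven-plugin</artifactId>
  <version>3.2.2</version>
</plugin>
```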
[jira] [Updated] (FLINK-5518) HadoopInputFormat throws NPE when close() is called before open()
[ https://issues.apache.org/jira/browse/FLINK-5518?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jakub Havlik updated FLINK-5518:
--------------------------------
    Description:
When developing a simple Flink application reading ORC files, it crashes with a NullPointerException when the number of instances/executor threads is higher than the number of files, because it tries to close a HadoopInputFormat which in turn tries to close a RecordReader that was never initialized, as there was no file for which it should have been opened. The issue is caused by

{code:java}
public void run(SourceContext<OUT> ctx) throws Exception {
    try {
        ...
        while (isRunning) {
            format.open(splitIterator.next());
        ...
    } finally {
        format.close();
    ...
}
{code}

in file {{InputFormatSourceFunction.java}}, which calls

{code:java}
public void close() throws IOException {
    // enforce sequential close() calls
    synchronized (CLOSE_MUTEX) {
        this.recordReader.close();
    }
}
{code}

from {{HadoopInputFormatBase.java}}.

As there is just this one implementation of the {{close()}} method, it may be enough to add a null check for {{this.recordReader}} there.

> HadoopInputFormat throws NPE when close() is called before open()
> -----------------------------------------------------------------
>
>                 Key: FLINK-5518
>                 URL: https://issues.apache.org/jira/browse/FLINK-5518
>             Project: Flink
>          Issue Type: Bug
>          Components: Batch Connectors and Input/Output Formats
>    Affects Versions: 1.1.4
>            Reporter: Jakub Havlik
>              Labels: beginner, easyfix, newbie
[jira] [Created] (FLINK-5518) HadoopInputFormat throws NPE when close() is called before open()
Jakub Havlik created FLINK-5518: --- Summary: HadoopInputFormat throws NPE when close() is called before open() Key: FLINK-5518 URL: https://issues.apache.org/jira/browse/FLINK-5518 Project: Flink Issue Type: Bug Components: Batch Connectors and Input/Output Formats Affects Versions: 1.1.4 Reporter: Jakub Havlik When developing a simple Flink application reading ORC files, it crashes with a NullPointerException when the number of instances/executor threads is higher than the number of files, because it tries to close a HadoopInputFormat, which in turn tries to close a RecordReader that was never initialized, as there was no file for which it should have been opened. The issue is caused by {code:java} public void run(SourceContext ctx) throws Exception { try { ... while (isRunning) { format.open(splitIterator.next()); ... } finally { format.close(); ... } {code} in {{InputFormatSourceFunction.java}}, which calls {code:java} public void close() throws IOException { // enforce sequential close() calls synchronized (CLOSE_MUTEX) { this.recordReader.close(); } } {code} from {{HadoopInputFormatBase.java}}. As there is just this one implementation of the {{close()}} method, it may be enough to add a null check for {{this.recordReader}} there. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
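The null guard proposed in the issue could look like the following sketch. The surrounding class is a stand-in for illustration only; the field and mutex names ({{recordReader}}, {{CLOSE_MUTEX}}) follow the snippet quoted above, and this is not the final patch.

```java
import java.io.Closeable;
import java.io.IOException;

/**
 * Minimal sketch of the null guard suggested for HadoopInputFormatBase#close().
 * The class is a stand-in, not Flink code; only the guard logic is the point.
 */
class GuardedFormat {
    private static final Object CLOSE_MUTEX = new Object();
    private Closeable recordReader; // stays null until open() succeeds

    void open(Closeable reader) {
        this.recordReader = reader;
    }

    void close() throws IOException {
        // close() may run before open() when there are more source instances
        // than input splits; without this check it throws NullPointerException
        if (this.recordReader == null) {
            return;
        }
        // enforce sequential close() calls, as in the original snippet
        synchronized (CLOSE_MUTEX) {
            this.recordReader.close();
        }
    }
}
```

Calling `close()` on a never-opened instance then simply returns instead of dereferencing a null reader.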
[jira] [Commented] (FLINK-2168) Add HBaseTableSource
[ https://issues.apache.org/jira/browse/FLINK-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15825551#comment-15825551 ] ramkrishna.s.vasudevan commented on FLINK-2168: --- Trying your suggestions: bq. Since the code in flink-hbase is implemented in Java, the new HBaseTableSource should do the same. If I try writing HBaseTableSource in Java, I don't have Java versions of BatchTableSource or ProjectableTableSource (they are defined in Scala) - so do I need to create them first? And then link flink-hbase under flink-connectors to make use of flink-table? > Add HBaseTableSource > > > Key: FLINK-2168 > URL: https://issues.apache.org/jira/browse/FLINK-2168 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Affects Versions: 0.9 >Reporter: Fabian Hueske >Assignee: ramkrishna.s.vasudevan >Priority: Minor > Labels: starter > > Add a {{HBaseTableSource}} to read data from a HBase table. The > {{HBaseTableSource}} should implement the {{ProjectableTableSource}} > (FLINK-3848) and {{FilterableTableSource}} (FLINK-3849) interfaces. > The implementation can be based on Flink's {{TableInputFormat}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
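On the Java-vs-Scala question above: a Scala trait that declares only abstract methods compiles to an ordinary JVM interface, so a Java class can usually implement the Scala-defined {{BatchTableSource}}/{{ProjectableTableSource}} directly, without first porting them to Java. A toy illustration of that interop point (the {{BatchSourceLike}} interface and its {{rows()}} method are made-up stand-ins, not the real Flink signatures):

```java
import java.util.Arrays;

// Stand-in mimicking how a Scala trait with only abstract methods appears
// to Java code: on the JVM it is just an interface.
interface BatchSourceLike<T> {
    Iterable<T> rows(); // illustrative; the real BatchTableSource API differs
}

// A Java class can implement such a trait/interface directly.
class HBaseTableSourceSketch implements BatchSourceLike<String> {
    @Override
    public Iterable<String> rows() {
        // a real implementation would scan HBase, e.g. via TableInputFormat
        return Arrays.asList("row1", "row2");
    }
}
```

So the Scala traits generally do not need Java counterparts before an HBaseTableSource can be written in Java (traits with concrete members are the harder case).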
[jira] [Commented] (FLINK-5386) Refactoring Window Clause
[ https://issues.apache.org/jira/browse/FLINK-5386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15825546#comment-15825546 ] sunjincheng commented on FLINK-5386: Hi [~fhueske] [~shaoxuan], thanks for the reply. [~fhueske] You are right: no matter whether it is a stream table or a batch table, we need to ensure correctness. As you said, we must check the window's properties at the implementation phase; I agree with you. BTW, "groupBy('w)" is consistent not only with the row-window but also with the Calcite SQL. For instance: GroupBy: {code} SELECT STREAM TUMBLE_END(rowtime, INTERVAL '1' HOUR) AS rowtime, productId, COUNT(*) AS c, SUM(units) AS units FROM Orders GROUP BY TUMBLE(rowtime, INTERVAL '1' HOUR), productId; {code} Over: {code} SELECT STREAM * FROM ( SELECT STREAM rowtime, productId, units, AVG(units) OVER product (RANGE INTERVAL '10' MINUTE PRECEDING) AS m10, AVG(units) OVER product (RANGE INTERVAL '7' DAY PRECEDING) AS d7 FROM Orders WINDOW product AS ( ORDER BY rowtime PARTITION BY productId)) WHERE m10 > d7; {code} The following two statements are supported by the current changes: #1. windows are defined at the start and used later: {code} val windowedTable = table .window(Slide over 10.milli every 5.milli as 'w1) .window(Tumble over 5.milli as 'w2) .groupBy('w1, 'key) .select('string, 'int.count as 'count, 'w1.start) .groupBy( 'w2, 'key) .select('string, 'count.sum as sum2) {code} #2. windows are defined with groupBy: {code} val windowedTable = table .window(Slide over 10.milli every 5.milli as 'w1) .groupBy('w1, 'key) .select('string, 'int.count as 'count, 'w1.start) .window(Tumble over 5.milli as 'w2) .groupBy( 'w2, 'key) .select('string, 'count.sum as sum2) {code} I hope this makes sense to you. When you said "by tying window and groupBy together, we could avoid such situations", did you mean something like #2, or must it be written as "groupBy().window()"? 
reference: Azure: https://msdn.microsoft.com/en-us/library/azure/dn835051.aspx Calcite: http://calcite.apache.org/docs/stream.html#tumbling-windows > Refactoring Window Clause > - > > Key: FLINK-5386 > URL: https://issues.apache.org/jira/browse/FLINK-5386 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: sunjincheng >Assignee: sunjincheng > > Similar to the SQL, window clause is defined "as" a symbol which is > explicitly used in groupby/over. We are proposing to refactor the way to > write groupby+window tableAPI as follows: > {code} > val windowedTable = table > .window(Slide over 10.milli every 5.milli as 'w1) > .window(Tumble over 5.milli as 'w2) > .groupBy('w1, 'key) > .select('string, 'int.count as 'count, 'w1.start) > .groupBy( 'w2, 'key) > .select('string, 'count.sum as sum2) > .window(Tumble over 5.milli as 'w3) > .groupBy( 'w3) // windowAll > .select('sum2, 'w3.start, 'w3.end) > {code} > In this way, we can remove both GroupWindowedTable and the window() method in > GroupedTable which makes the API a bit clean. In addition, for row-window, we > anyway need to define window clause as a symbol. This change will make the > API of window and row-window consistent, example for row-window: > {code} > .window(RowXXXWindow as ‘x, RowYYYWindow as ‘y) > .select(‘a, ‘b.count over ‘x as ‘xcnt, ‘c.count over ‘y as ‘ycnt, ‘x.start, > ‘x.end) > {code} > What do you think? [~fhueske] [~twalthr] -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (FLINK-4266) Remote Database Statebackend
[ https://issues.apache.org/jira/browse/FLINK-4266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15824927#comment-15824927 ] Syinchwun Leo edited comment on FLINK-4266 at 1/17/17 4:12 AM: --- We also met this problem and plan to use an incremental method. In the current master branch, there are four window state types: ValueState, ListState, ReducingState and FoldingState. In ReducingState and FoldingState, the window only keeps intermediate results calculated by the UDF reducing or folding function. When an element arrives, the related intermediate result is recomputed by applying the UDF function. In my opinion it is costly to record all the changes in real time; what's more, we have to scan the pipeline in reverse for recovery. The current sliding window's saving method has a big drawback: an element may be kept in multiple windows, and when checkpointing, all these copies are exported to persistent storage, which is time- and storage-consuming. We have modified the saving method of the sliding window (including SlidingProcessingTimeWindow and SlidingEventTimeWindow), and it is under testing. You can keep in touch with me (liu...@mail.ustc.edu.cn), and we can develop this feature in cooperation. was (Author: syinchwunleo): We also meet this problem,and plan to using incremental method. In current master branch, there are four window state: ValueState, ListState, ReduingState and FoldingState. In ReducingState and FoldingState, window only keeps intermediate results calculated by UDF reducing function or folding function. When a element coming, the related intermediate result is recomputed again by applying the UDF function. In my opinion it is costly to record all the changes in realtime, what's more, we have to scan the pipeline reversely for the purpose of recovery . 
In current sliding window's saving method, there is a big drawback: an element may be kept in multiple windows, when checkpointing, all these backups are export to persistent storage, it's time and storage consuming. we have modify the saving method of sliding window (including SlidingProcessingTimeWindow and SlidingEventTimeWIndow), and it is testing. You can keep in touch with me(liu...@mail.ustc.edu.cn), and we can develop this feature in cooperation. > Remote Database Statebackend > > > Key: FLINK-4266 > URL: https://issues.apache.org/jira/browse/FLINK-4266 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Affects Versions: 1.2.0 >Reporter: Chen Qin >Priority: Minor > > Current FileSystem statebackend limits whole state size to disk space. > Dealing with scale out checkpoint states beyond local disk space such as long > running task that hold window content for long period of time. Pipelines > needs to split out states to durable remote storage even replicated to > different data centers. > We draft a design that leverage checkpoint id as mono incremental logic > timestamp and perform range query to get evicited state k/v. we also > introduce checkpoint time commit and eviction threshold that reduce "hot > states" hitting remote db per every update between adjacent checkpoints by > tracking update logs and merge them, do batch updates only when checkpoint; > lastly, we are looking for eviction policy that can identify "hot keys" in > k/v state and lazy load those "cold keys" from remote storage(e.g Cassandra). > For now, we don't have good story regarding to data retirement. We might have > to keep forever until manually run command and clean per job related state > data. Some of features might related to incremental checkpointing feature, we > hope to align with effort there also. > Welcome comments, I will try to put a draft design doc after gathering some > feedback. 
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (FLINK-4266) Remote Database Statebackend
[ https://issues.apache.org/jira/browse/FLINK-4266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15824927#comment-15824927 ] Syinchwun Leo edited comment on FLINK-4266 at 1/17/17 4:11 AM: --- We also meet this problem,and plan to using incremental method. In current master branch, there are four window state: ValueState, ListState, ReduingState and FoldingState. In ReducingState and FoldingState, window only keeps intermediate results calculated by UDF reducing function or folding function. When a element coming, the related intermediate result is recomputed again by applying the UDF function. In my opinion it is costly to record all the changes in realtime, what's more, we have to scan the pipeline reversely for the purpose of recovery . In current sliding window's saving method, there is a big drawback: an element may be kept in multiple windows, when checkpointing, all these backups are export to persistent storage, it's time and storage consuming. we have modify the saving method of sliding window (including SlidingProcessingTimeWindow and SlidingEventTimeWIndow), and it is testing. You can keep in touch with me(liu...@mail.ustc.edu.cn), and we can develop this feature in cooperation. was (Author: syinchwunleo): We also meet this problem,and plan to using incremental method. In current master branch, there are four window state: ValueState, ListState, ReduingState and FoldingState. In ReducingState and FoldingState, window only keeps intermediate results calculated by UDF reducing function or folding function. When a element coming, the related intermediate result is recomputed again by applying the UDF function. In my opinion it is costly to record all the changes in realtime, what's more, we have to scan the pipeline reversely for the purpose of recovery . 
In current sliding window's saving method, there is a big drawback: an element may be kept in multiple windows, when checkpointing, all these backups are export to persistent stores, it's time and storage consuming. we have modify the saving method of sliding window (including SlidingProcessingTimeWindow and SlidingEventTimeWIndow), and it is testing. You can keep in touch with me(liu...@mail.ustc.edu.cn), and we can develop this feature in cooperation. > Remote Database Statebackend > > > Key: FLINK-4266 > URL: https://issues.apache.org/jira/browse/FLINK-4266 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Affects Versions: 1.2.0 >Reporter: Chen Qin >Priority: Minor > > Current FileSystem statebackend limits whole state size to disk space. > Dealing with scale out checkpoint states beyond local disk space such as long > running task that hold window content for long period of time. Pipelines > needs to split out states to durable remote storage even replicated to > different data centers. > We draft a design that leverage checkpoint id as mono incremental logic > timestamp and perform range query to get evicited state k/v. we also > introduce checkpoint time commit and eviction threshold that reduce "hot > states" hitting remote db per every update between adjacent checkpoints by > tracking update logs and merge them, do batch updates only when checkpoint; > lastly, we are looking for eviction policy that can identify "hot keys" in > k/v state and lazy load those "cold keys" from remote storage(e.g Cassandra). > For now, we don't have good story regarding to data retirement. We might have > to keep forever until manually run command and clean per job related state > data. Some of features might related to incremental checkpointing feature, we > hope to align with effort there also. > Welcome comments, I will try to put a draft design doc after gathering some > feedback. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (FLINK-4266) Remote Database Statebackend
[ https://issues.apache.org/jira/browse/FLINK-4266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15824927#comment-15824927 ] Syinchwun Leo edited comment on FLINK-4266 at 1/17/17 4:09 AM: --- We also meet this problem,and plan to using incremental method. In current master branch, there are four window state: ValueState, ListState, ReduingState and FoldingState. In ReducingState and FoldingState, window only keeps intermediate results calculated by UDF reducing function or folding function. When a element coming, the related intermediate result is recomputed again by applying the UDF function. In my opinion it is costly to record all the changes in realtime, what's more, we have to scan the pipeline reversely for the purpose of recovery . In current sliding window's saving method, there is a big drawback: an element may be kept in multiple windows, when checkpointing, all these backups are export to persistent stores, it's time and storage consuming. we have modify the saving method of sliding window (including SlidingProcessingTimeWindow and SlidingEventTimeWIndow), and it is testing. You can keep in touch with me(liu...@mail.ustc.edu.cn), and we can develop this feature in cooperation. was (Author: syinchwunleo): We also meet this problem,and plan to using incremental method. In currently master branch, there are four window state: ValueState, ListState, ReduingState and FoldingState. In ReducingState and FoldingState, window only keeps intermediate results calculated by UDF reducing function or folding function. When a element coming, the related intermediate result is recomputed again by applying the UDF function. In my opinion it is costly to record all the changes in realtime, what's more, we have to scan the pipeline reversely for the purpose of recovery . 
In current sliding window's saving method, there is a big drawback: an element may be kept in multiple windows, when checkpointing, all these backups are export to persistent stores, it's time and storage consuming. we have modify the saving method of sliding window (including SlidingProcessingTimeWindow and SlidingEventTimeWIndow), and it is testing. You can keep in touch with me(liu...@mail.ustc.edu.cn), and we can develop this feature in cooperation. > Remote Database Statebackend > > > Key: FLINK-4266 > URL: https://issues.apache.org/jira/browse/FLINK-4266 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Affects Versions: 1.2.0 >Reporter: Chen Qin >Priority: Minor > > Current FileSystem statebackend limits whole state size to disk space. > Dealing with scale out checkpoint states beyond local disk space such as long > running task that hold window content for long period of time. Pipelines > needs to split out states to durable remote storage even replicated to > different data centers. > We draft a design that leverage checkpoint id as mono incremental logic > timestamp and perform range query to get evicited state k/v. we also > introduce checkpoint time commit and eviction threshold that reduce "hot > states" hitting remote db per every update between adjacent checkpoints by > tracking update logs and merge them, do batch updates only when checkpoint; > lastly, we are looking for eviction policy that can identify "hot keys" in > k/v state and lazy load those "cold keys" from remote storage(e.g Cassandra). > For now, we don't have good story regarding to data retirement. We might have > to keep forever until manually run command and clean per job related state > data. Some of features might related to incremental checkpointing feature, we > hope to align with effort there also. > Welcome comments, I will try to put a draft design doc after gathering some > feedback. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4266) Remote Database Statebackend
[ https://issues.apache.org/jira/browse/FLINK-4266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15824927#comment-15824927 ] Syinchwun Leo commented on FLINK-4266: -- We also met this problem and plan to use an incremental method. In the current master branch, there are four window state types: ValueState, ListState, ReducingState and FoldingState. In ReducingState and FoldingState, the window only keeps intermediate results calculated by the UDF reducing or folding function. When an element arrives, the related intermediate result is recomputed by applying the UDF function. In my opinion it is costly to record all the changes in real time; what's more, we have to scan the pipeline in reverse for recovery. The current sliding window's saving method has a big drawback: an element may be kept in multiple windows, and when checkpointing, all these copies are exported to persistent storage, which is time- and storage-consuming. We have modified the saving method of the sliding window (including SlidingProcessingTimeWindow and SlidingEventTimeWindow), and it is under testing. You can keep in touch with me (liu...@mail.ustc.edu.cn), and we can develop this feature in cooperation. > Remote Database Statebackend > > > Key: FLINK-4266 > URL: https://issues.apache.org/jira/browse/FLINK-4266 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Affects Versions: 1.2.0 >Reporter: Chen Qin >Priority: Minor > > Current FileSystem statebackend limits whole state size to disk space. > Dealing with scale out checkpoint states beyond local disk space such as long > running task that hold window content for long period of time. Pipelines > needs to split out states to durable remote storage even replicated to > different data centers. > We draft a design that leverage checkpoint id as mono incremental logic > timestamp and perform range query to get evicited state k/v. 
we also > introduce checkpoint time commit and eviction threshold that reduce "hot > states" hitting remote db per every update between adjacent checkpoints by > tracking update logs and merge them, do batch updates only when checkpoint; > lastly, we are looking for eviction policy that can identify "hot keys" in > k/v state and lazy load those "cold keys" from remote storage(e.g Cassandra). > For now, we don't have good story regarding to data retirement. We might have > to keep forever until manually run command and clean per job related state > data. Some of features might related to incremental checkpointing feature, we > hope to align with effort there also. > Welcome comments, I will try to put a draft design doc after gathering some > feedback. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
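The description's idea of tracking update logs between checkpoints and doing batch writes only at checkpoint time can be sketched as a simple write-back buffer. This is a hand-rolled illustration of the batching idea only; the {{RemoteStore}} interface is a hypothetical stand-in for e.g. a Cassandra-backed table, not Flink or Cassandra API.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Sketch of checkpoint-time batching: updates between checkpoints are merged
 * in a local buffer so each key hits the remote store at most once per
 * checkpoint, shielding the store from "hot" keys.
 */
class WriteBackState<K, V> {
    /** Hypothetical remote k/v store (stand-in for e.g. Cassandra). */
    interface RemoteStore<K, V> {
        void batchPut(Map<K, V> entries);
    }

    private final Map<K, V> dirty = new HashMap<>();
    private final RemoteStore<K, V> store;

    WriteBackState(RemoteStore<K, V> store) {
        this.store = store;
    }

    /** Updates are merged locally; a hot key overwrites its buffered value. */
    void update(K key, V value) {
        dirty.put(key, value);
    }

    /** On checkpoint, flush the merged updates to the store in one batch. */
    void checkpoint() {
        if (!dirty.isEmpty()) {
            store.batchPut(new HashMap<>(dirty));
            dirty.clear();
        }
    }
}
```

With this shape, many updates to the same key between two checkpoints cost one remote write, which is the "reduce hot states hitting remote db" effect described in the issue.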
[jira] [Comment Edited] (FLINK-5386) Refactoring Window Clause
[ https://issues.apache.org/jira/browse/FLINK-5386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15824844#comment-15824844 ] Shaoxuan Wang edited comment on FLINK-5386 at 1/17/17 3:37 AM: --- [~sunjincheng121], thanks for the updates. Hi [~fhueske], The major reason we propose this change is that, from a semantic point of view, window is a clause, not an operator. This change gives users the flexibility to keep the window clause and the groupBy close together (just move the window definition right before the groupBy) if they want. I think you have raised a good question about the "scope of window" for a batch window on a certain column (which could be removed by some operators). We should make sure this still works. We will check the design and add test cases for this. was (Author: shaoxuanwang): [~sunjincheng121], thanks for the updates. Hi [~fhueske], The major reason we propose this change is because of the row window. For row window, there could be no groupby keys. As the current proposal in FLIP11, the tableAPI is as follows: {code} .window(RowWindow as ‘x) .select(‘b.count over ‘x as ‘xcnt, ‘x.start, ‘x.end) {code} If we want to partition the data and trigger the result using window function, we have to translate the .window operator to a kind of grouping by query plan, which is a little weird. With this proposal, groupby operator will be able to not only groupby keys, but also window clause. I think this is the correct semantic. The above example will be written in this way: {code} .window(RowWindow as ‘x) .groupby('x) .select(‘b.count over ‘x as ‘xcnt, ‘x.start, ‘x.end) {code} What do you think? This changes give more flexibility to users such that they can still put window clause and groupby close (just move the window definition before groupby) if they want. I think your have raised a good question on "scope of window" for batch window on a certain column (which could be removed by some operators). 
We should make sure this will still work. We will check the design and add the test cases for this. > Refactoring Window Clause > - > > Key: FLINK-5386 > URL: https://issues.apache.org/jira/browse/FLINK-5386 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: sunjincheng >Assignee: sunjincheng > > Similar to the SQL, window clause is defined "as" a symbol which is > explicitly used in groupby/over. We are proposing to refactor the way to > write groupby+window tableAPI as follows: > {code} > val windowedTable = table > .window(Slide over 10.milli every 5.milli as 'w1) > .window(Tumble over 5.milli as 'w2) > .groupBy('w1, 'key) > .select('string, 'int.count as 'count, 'w1.start) > .groupBy( 'w2, 'key) > .select('string, 'count.sum as sum2) > .window(Tumble over 5.milli as 'w3) > .groupBy( 'w3) // windowAll > .select('sum2, 'w3.start, 'w3.end) > {code} > In this way, we can remove both GroupWindowedTable and the window() method in > GroupedTable which makes the API a bit clean. In addition, for row-window, we > anyway need to define window clause as a symbol. This change will make the > API of window and row-window consistent, example for row-window: > {code} > .window(RowXXXWindow as ‘x, RowYYYWindow as ‘y) > .select(‘a, ‘b.count over ‘x as ‘xcnt, ‘c.count over ‘y as ‘ycnt, ‘x.start, > ‘x.end) > {code} > What do you think? [~fhueske] [~twalthr] -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5372) Fix RocksDBAsyncSnapshotTest.testCancelFullyAsyncCheckpoints()
[ https://issues.apache.org/jira/browse/FLINK-5372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15824885#comment-15824885 ] Syinchwun Leo commented on FLINK-5372: -- When I run the RocksDBStateBackend tests alone, they pass. But when running Flink's whole test suite, it hangs as follows: --- T E S T S --- Running org.apache.flink.contrib.streaming.state.RocksDBInitResetTest Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.056 sec - in org.apache.flink.contrib.streaming.state.RocksDBInitResetTest Running org.apache.flink.contrib.streaming.state.RocksDBStateBackendTest Tests run: 25, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.961 sec - in org.apache.flink.contrib.streaming.state.RocksDBStateBackendTest Running org.apache.flink.contrib.streaming.state.RocksDBStateBackendConfigTest Tests run: 11, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.049 sec - in org.apache.flink.contrib.streaming.state.RocksDBStateBackendConfigTest Running org.apache.flink.contrib.streaming.state.RocksDBMergeIteratorTest Tests run: 2, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 1.359 sec - in org.apache.flink.contrib.streaming.state.RocksDBMergeIteratorTest Running org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactoryTest Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.002 sec - in org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactoryTest Running org.apache.flink.contrib.streaming.state.RocksDBAsyncSnapshotTest No test result > Fix RocksDBAsyncSnapshotTest.testCancelFullyAsyncCheckpoints() > -- > > Key: FLINK-5372 > URL: https://issues.apache.org/jira/browse/FLINK-5372 > Project: Flink > Issue Type: Bug > Components: Streaming >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek > > The test is currently {{@Ignored}}. We have to change > {{AsyncCheckpointOperator}} to make sure that we can run fully > asynchronously. 
Then, the test will still fail because the canceling > behaviour was changed in the meantime. > {code} > public static class AsyncCheckpointOperator > extends AbstractStreamOperator > implements OneInputStreamOperator{ > @Override > public void open() throws Exception { > super.open(); > // also get the state in open, this way we are sure that it was > created before > // we trigger the test checkpoint > ValueState state = getPartitionedState( > VoidNamespace.INSTANCE, > VoidNamespaceSerializer.INSTANCE, > new ValueStateDescriptor<>("count", > StringSerializer.INSTANCE, "hello")); > } > @Override > public void processElement(StreamRecord element) throws Exception > { > // we also don't care > ValueState state = getPartitionedState( > VoidNamespace.INSTANCE, > VoidNamespaceSerializer.INSTANCE, > new ValueStateDescriptor<>("count", > StringSerializer.INSTANCE, "hello")); > state.update(element.getValue()); > } > @Override > public void snapshotState(StateSnapshotContext context) throws Exception { > // do nothing so that we don't block > } > } > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5386) Refactoring Window Clause
[ https://issues.apache.org/jira/browse/FLINK-5386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15824844#comment-15824844 ] Shaoxuan Wang commented on FLINK-5386: -- [~sunjincheng121], thanks for the updates. Hi [~fhueske], The major reason we propose this change is the row window. For a row window, there could be no groupBy keys. Per the current proposal in FLIP-11, the Table API is as follows: {code} .window(RowWindow as ‘x) .select(‘b.count over ‘x as ‘xcnt, ‘x.start, ‘x.end) {code} If we want to partition the data and trigger the result using a window function, we have to translate the .window operator into a kind of group-by query plan, which is a little weird. With this proposal, the groupBy operator will be able to group by not only keys but also a window clause. I think this is the correct semantic. The above example will be written in this way: {code} .window(RowWindow as ‘x) .groupby('x) .select(‘b.count over ‘x as ‘xcnt, ‘x.start, ‘x.end) {code} What do you think? This change gives users more flexibility: they can still keep the window clause and the groupBy close together (just move the window definition right before the groupBy) if they want. I think you have raised a good question about the "scope of window" for a batch window on a certain column (which could be removed by some operators). We should make sure this will still work. We will check the design and add test cases for this. > Refactoring Window Clause > - > > Key: FLINK-5386 > URL: https://issues.apache.org/jira/browse/FLINK-5386 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: sunjincheng >Assignee: sunjincheng > > Similar to the SQL, window clause is defined "as" a symbol which is > explicitly used in groupby/over. 
We are proposing to refactor the way to > write groupby+window tableAPI as follows: > {code} > val windowedTable = table > .window(Slide over 10.milli every 5.milli as 'w1) > .window(Tumble over 5.milli as 'w2) > .groupBy('w1, 'key) > .select('string, 'int.count as 'count, 'w1.start) > .groupBy( 'w2, 'key) > .select('string, 'count.sum as sum2) > .window(Tumble over 5.milli as 'w3) > .groupBy( 'w3) // windowAll > .select('sum2, 'w3.start, 'w3.end) > {code} > In this way, we can remove both GroupWindowedTable and the window() method in > GroupedTable which makes the API a bit clean. In addition, for row-window, we > anyway need to define window clause as a symbol. This change will make the > API of window and row-window consistent, example for row-window: > {code} > .window(RowXXXWindow as ‘x, RowYYYWindow as ‘y) > .select(‘a, ‘b.count over ‘x as ‘xcnt, ‘c.count over ‘y as ‘ycnt, ‘x.start, > ‘x.end) > {code} > What do you think? [~fhueske] [~twalthr] -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-5517) Upgrade hbase version to 1.3.0
Ted Yu created FLINK-5517: - Summary: Upgrade hbase version to 1.3.0 Key: FLINK-5517 URL: https://issues.apache.org/jira/browse/FLINK-5517 Project: Flink Issue Type: Improvement Reporter: Ted Yu In the thread 'Help using HBase with Flink 1.1.4', Giuliano reported seeing: {code} java.lang.IllegalAccessError: tried to access method com.google.common.base.Stopwatch.<init>()V from class org.apache.hadoop.hbase.zookeeper.MetaTableLocator {code} The above has been solved by HBASE-14963. hbase 1.3.0 is being released; we should upgrade the hbase dependency to 1.3.0. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
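The upgrade itself would be a version bump in the flink-hbase build. A sketch assuming the dependency is declared directly (in practice Flink typically manages the version via a Maven property, in which case only that property changes):

```xml
<!-- flink-hbase pom: bump the HBase client dependency to 1.3.0,
     which contains the HBASE-14963 fix for the Guava Stopwatch issue -->
<dependency>
  <groupId>org.apache.hbase</groupId>
  <artifactId>hbase-client</artifactId>
  <version>1.3.0</version>
</dependency>
```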
[GitHub] flink pull request #3111: [FLINK-3475] [Table] DISTINCT aggregate function s...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3111#discussion_r96329552 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala --- @@ -96,6 +96,13 @@ object FlinkRuleSets { ProjectToCalcRule.INSTANCE, CalcMergeRule.INSTANCE, +// distinct aggregate rule for FLINK-3475 --- End diff -- I think this comment can be moved to `FlinkAggregateExpandDistinctAggregatesRule`, and we should open a JIRA to track removing this class after we upgrade Calcite. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3475) DISTINCT aggregate function support for SQL queries
[ https://issues.apache.org/jira/browse/FLINK-3475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15824818#comment-15824818 ] ASF GitHub Bot commented on FLINK-3475: --- Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3111#discussion_r96329552 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala --- @@ -96,6 +96,13 @@ object FlinkRuleSets { ProjectToCalcRule.INSTANCE, CalcMergeRule.INSTANCE, +// distinct aggregate rule for FLINK-3475 --- End diff -- I think this comment can move to `FlinkAggregateExpandDistinctAggregatesRule`, and open a jira to track to remove this class after we upgrade Calcite. > DISTINCT aggregate function support for SQL queries > --- > > Key: FLINK-3475 > URL: https://issues.apache.org/jira/browse/FLINK-3475 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Chengxiang Li >Assignee: Zhenghua Gao > > DISTINCT aggregate function may be able to reuse the aggregate function > instead of separate implementation, and let Flink runtime take care of > duplicate records. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
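For context on what the rule being discussed does: `FlinkAggregateExpandDistinctAggregatesRule` (adapted from Calcite) rewrites a DISTINCT aggregate into a plain aggregate over a de-duplicating inner aggregate, so the runtime never needs special DISTINCT handling. A minimal Python sketch of that equivalence, with hypothetical row data:

```python
def count_distinct_naive(rows, column):
    """Direct COUNT(DISTINCT column) over a list of row dicts."""
    return len({row[column] for row in rows})

def count_distinct_expanded(rows, column):
    """Expanded plan: inner GROUP BY removes duplicates, outer COUNT(*) counts."""
    inner = {row[column] for row in rows}  # inner aggregate: GROUP BY column
    return sum(1 for _ in inner)           # outer aggregate: plain COUNT(*)

rows = [{"user": "a"}, {"user": "b"}, {"user": "a"}]
# Both plans agree: two distinct users.
```

The rewrite is purely a plan transformation, which is why it can live in the rule set until an upgraded Calcite provides it directly.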
[jira] [Updated] (FLINK-4266) Remote Database Statebackend
[ https://issues.apache.org/jira/browse/FLINK-4266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chen Qin updated FLINK-4266: Affects Version/s: (was: 1.0.3) > Remote Database Statebackend > > > Key: FLINK-4266 > URL: https://issues.apache.org/jira/browse/FLINK-4266 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Affects Versions: 1.2.0 >Reporter: Chen Qin >Priority: Minor > > The current FileSystem state backend limits the total state size to local disk space. > Pipelines whose checkpoint state outgrows the local disk, such as long-running tasks that hold window contents for a long period of time, need to spill state to durable remote storage, possibly even replicated to different data centers. > We drafted a design that leverages the checkpoint id as a monotonically increasing logical timestamp and performs range queries to fetch evicted state k/v pairs. We also introduce a checkpoint-time commit and an eviction threshold that keep "hot state" from hitting the remote db on every update between adjacent checkpoints: update logs are tracked and merged, and batch updates are performed only at checkpoint time. Lastly, we are looking for an eviction policy that can identify "hot keys" in k/v state and lazily load "cold keys" from remote storage (e.g. Cassandra). > For now, we don't have a good story regarding data retirement. We might have to keep state forever until a command is run manually to clean up per-job state data. Some of these features might relate to the incremental checkpointing effort; we hope to align with that work as well. > Welcome comments, I will try to put up a draft design doc after gathering some feedback. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
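The update-batching idea in the description above can be sketched as follows. This is a simplified illustration under assumed interfaces (the `InMemoryRemoteStore` with `get`/`batch_put` is hypothetical, not part of the proposal): updates between checkpoints are merged in a local buffer and flushed as one batch at checkpoint time.

```python
class InMemoryRemoteStore:
    """Hypothetical stand-in for a remote k/v store such as Cassandra."""
    def __init__(self):
        self.data = {}
        self.batches = []  # records (checkpoint_id, number_of_merged_updates)

    def get(self, key):
        return self.data.get(key)

    def batch_put(self, checkpoint_id, updates):
        self.batches.append((checkpoint_id, len(updates)))
        self.data.update(updates)

class BatchingStateBackend:
    """Buffer k/v updates locally; flush one merged batch per checkpoint."""
    def __init__(self, remote_store):
        self.remote = remote_store
        self.pending = {}  # merged update log: later writes overwrite earlier ones

    def update(self, key, value):
        # Hot keys hit only this in-memory buffer between checkpoints.
        self.pending[key] = value

    def get(self, key):
        if key in self.pending:
            return self.pending[key]
        return self.remote.get(key)  # lazily load "cold" keys from remote storage

    def checkpoint(self, checkpoint_id):
        # One batched write per checkpoint instead of one write per update.
        self.remote.batch_put(checkpoint_id, dict(self.pending))
        self.pending.clear()

store = InMemoryRemoteStore()
backend = BatchingStateBackend(store)
for i in range(100):
    backend.update("counter", i)  # 100 updates to one hot key...
backend.checkpoint(checkpoint_id=1)  # ...reach the remote store as a single merged write
```

The design choice mirrored here is that the remote store sees write traffic proportional to the number of checkpoints, not the number of state updates.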
[jira] [Commented] (FLINK-5447) Sync documentation of built-in functions for Table API with SQL
[ https://issues.apache.org/jira/browse/FLINK-5447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15824565#comment-15824565 ] ASF GitHub Bot commented on FLINK-5447: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3126#discussion_r96254835 --- Diff: docs/dev/table_api.md --- @@ -1508,522 +1508,601 @@ Both the Table API and SQL come with a set of built-in functions for data transf - Function + Comparison functions Description + {% highlight java %} -ANY.as(name [, name ]* ) +ANY === ANY {% endhighlight %} -Specifies a name for an expression i.e. a field. Additional names can be specified if the expression expands to multiple fields. +Equals. {% highlight java %} -ANY.isNull +ANY !== ANY {% endhighlight %} -Returns true if the given expression is null. +Not equal. {% highlight java %} -ANY.isNotNull +ANY > ANY {% endhighlight %} -Returns true if the given expression is not null. +Greater than. {% highlight java %} -BOOLEAN.isTrue +ANY >= ANY {% endhighlight %} -Returns true if the given boolean expression is true. False otherwise (for null and false). +Greater than or equal. {% highlight java %} -BOOLEAN.isFalse +ANY < ANY {% endhighlight %} -Returns true if given boolean expression is false. False otherwise (for null and true). +Less than. {% highlight java %} -BOOLEAN.isNotTrue +ANY <= ANY {% endhighlight %} -Returns true if the given boolean expression is not true (for null and false). False otherwise. +Less than or equal. {% highlight java %} -BOOLEAN.isNotFalse +ANY.isNull {% endhighlight %} -Returns true if given boolean expression is not false (for null and true). False otherwise. +Returns true if the given expression is null. {% highlight java %} -NUMERIC.exp() +ANY.isNotNull {% endhighlight %} -Calculates the Euler's number raised to the given power. +Returns true if the given expression is not null. 
{% highlight java %} -NUMERIC.log10() +STRING.like(STRING) {% endhighlight %} -Calculates the base 10 logarithm of given value. +Returns true, if a string matches the specified LIKE pattern. E.g. "Jo_n%" matches all strings that start with "Jo(arbitrary letter)n". - {% highlight java %} -NUMERIC.ln() +STRING.similar(STRING) {% endhighlight %} -Calculates the natural logarithm of given value. +Returns true, if a string matches the specified SQL regex pattern. E.g. "A+" matches all strings that consist of at least one "A". + + + + + + + Logical functions + Description + + + + + {% highlight java %} -NUMERIC.power(NUMERIC) +boolean1 || boolean2 {% endhighlight %} -Calculates the given number raised to the power of the other value. +Whether boolean1 is true or boolean2 is true. --- End diff -- I think `returns true if` is more explicit than`Whether`. > Sync documentation of built-in functions for Table API with SQL > --- > > Key: FLINK-5447 > URL: https://issues.apache.org/jira/browse/FLINK-5447 > Project: Flink > Issue Type: Improvement > Components: Documentation, Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Minor > > I will split up the documentation for the built-in functions similar to the > SQL structure. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5447) Sync documentation of built-in functions for Table API with SQL
[ https://issues.apache.org/jira/browse/FLINK-5447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15824566#comment-15824566 ] ASF GitHub Bot commented on FLINK-5447: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3126#discussion_r96255434 --- Diff: docs/dev/table_api.md --- @@ -1508,522 +1508,601 @@ Both the Table API and SQL come with a set of built-in functions for data transf - Function + Comparison functions Description + {% highlight java %} -ANY.as(name [, name ]* ) +ANY === ANY {% endhighlight %} -Specifies a name for an expression i.e. a field. Additional names can be specified if the expression expands to multiple fields. +Equals. {% highlight java %} -ANY.isNull +ANY !== ANY {% endhighlight %} -Returns true if the given expression is null. +Not equal. {% highlight java %} -ANY.isNotNull +ANY > ANY {% endhighlight %} -Returns true if the given expression is not null. +Greater than. {% highlight java %} -BOOLEAN.isTrue +ANY >= ANY {% endhighlight %} -Returns true if the given boolean expression is true. False otherwise (for null and false). +Greater than or equal. {% highlight java %} -BOOLEAN.isFalse +ANY < ANY {% endhighlight %} -Returns true if given boolean expression is false. False otherwise (for null and true). +Less than. {% highlight java %} -BOOLEAN.isNotTrue +ANY <= ANY {% endhighlight %} -Returns true if the given boolean expression is not true (for null and false). False otherwise. +Less than or equal. {% highlight java %} -BOOLEAN.isNotFalse +ANY.isNull {% endhighlight %} -Returns true if given boolean expression is not false (for null and true). False otherwise. +Returns true if the given expression is null. {% highlight java %} -NUMERIC.exp() +ANY.isNotNull {% endhighlight %} -Calculates the Euler's number raised to the given power. +Returns true if the given expression is not null. 
{% highlight java %} -NUMERIC.log10() +STRING.like(STRING) {% endhighlight %} -Calculates the base 10 logarithm of given value. +Returns true, if a string matches the specified LIKE pattern. E.g. "Jo_n%" matches all strings that start with "Jo(arbitrary letter)n". - {% highlight java %} -NUMERIC.ln() +STRING.similar(STRING) {% endhighlight %} -Calculates the natural logarithm of given value. +Returns true, if a string matches the specified SQL regex pattern. E.g. "A+" matches all strings that consist of at least one "A". + + + + + + + Logical functions + Description + + + + + {% highlight java %} -NUMERIC.power(NUMERIC) +boolean1 || boolean2 {% endhighlight %} -Calculates the given number raised to the power of the other value. +Whether boolean1 is true or boolean2 is true. {% highlight java %} -NUMERIC.sqrt() +boolean1 && boolean2 {% endhighlight %} -Calculates the square root of a given value. +Whether boolean1 and boolean2 are both true. {% highlight java %} -NUMERIC.abs() +!BOOLEAN {% endhighlight %} -Calculates the absolute value of given value. +Whether boolean expression is not true; returns null if boolean is null. {% highlight java %} -NUMERIC.floor()
[GitHub] flink pull request #3126: [FLINK-5447] [table] Sync documentation of built-i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3126#discussion_r96254835 --- Diff: docs/dev/table_api.md --- @@ -1508,522 +1508,601 @@ Both the Table API and SQL come with a set of built-in functions for data transf - Function + Comparison functions Description + {% highlight java %} -ANY.as(name [, name ]* ) +ANY === ANY {% endhighlight %} -Specifies a name for an expression i.e. a field. Additional names can be specified if the expression expands to multiple fields. +Equals. {% highlight java %} -ANY.isNull +ANY !== ANY {% endhighlight %} -Returns true if the given expression is null. +Not equal. {% highlight java %} -ANY.isNotNull +ANY > ANY {% endhighlight %} -Returns true if the given expression is not null. +Greater than. {% highlight java %} -BOOLEAN.isTrue +ANY >= ANY {% endhighlight %} -Returns true if the given boolean expression is true. False otherwise (for null and false). +Greater than or equal. {% highlight java %} -BOOLEAN.isFalse +ANY < ANY {% endhighlight %} -Returns true if given boolean expression is false. False otherwise (for null and true). +Less than. {% highlight java %} -BOOLEAN.isNotTrue +ANY <= ANY {% endhighlight %} -Returns true if the given boolean expression is not true (for null and false). False otherwise. +Less than or equal. {% highlight java %} -BOOLEAN.isNotFalse +ANY.isNull {% endhighlight %} -Returns true if given boolean expression is not false (for null and true). False otherwise. +Returns true if the given expression is null. {% highlight java %} -NUMERIC.exp() +ANY.isNotNull {% endhighlight %} -Calculates the Euler's number raised to the given power. +Returns true if the given expression is not null. {% highlight java %} -NUMERIC.log10() +STRING.like(STRING) {% endhighlight %} -Calculates the base 10 logarithm of given value. +Returns true, if a string matches the specified LIKE pattern. E.g. "Jo_n%" matches all strings that start with "Jo(arbitrary letter)n". 
- {% highlight java %} -NUMERIC.ln() +STRING.similar(STRING) {% endhighlight %} -Calculates the natural logarithm of given value. +Returns true, if a string matches the specified SQL regex pattern. E.g. "A+" matches all strings that consist of at least one "A". + + + + + + + Logical functions + Description + + + + + {% highlight java %} -NUMERIC.power(NUMERIC) +boolean1 || boolean2 {% endhighlight %} -Calculates the given number raised to the power of the other value. +Whether boolean1 is true or boolean2 is true. --- End diff -- I think `returns true if` is more explicit than`Whether`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3126: [FLINK-5447] [table] Sync documentation of built-i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3126#discussion_r96255434 --- Diff: docs/dev/table_api.md --- @@ -1508,522 +1508,601 @@ Both the Table API and SQL come with a set of built-in functions for data transf - Function + Comparison functions Description + {% highlight java %} -ANY.as(name [, name ]* ) +ANY === ANY {% endhighlight %} -Specifies a name for an expression i.e. a field. Additional names can be specified if the expression expands to multiple fields. +Equals. {% highlight java %} -ANY.isNull +ANY !== ANY {% endhighlight %} -Returns true if the given expression is null. +Not equal. {% highlight java %} -ANY.isNotNull +ANY > ANY {% endhighlight %} -Returns true if the given expression is not null. +Greater than. {% highlight java %} -BOOLEAN.isTrue +ANY >= ANY {% endhighlight %} -Returns true if the given boolean expression is true. False otherwise (for null and false). +Greater than or equal. {% highlight java %} -BOOLEAN.isFalse +ANY < ANY {% endhighlight %} -Returns true if given boolean expression is false. False otherwise (for null and true). +Less than. {% highlight java %} -BOOLEAN.isNotTrue +ANY <= ANY {% endhighlight %} -Returns true if the given boolean expression is not true (for null and false). False otherwise. +Less than or equal. {% highlight java %} -BOOLEAN.isNotFalse +ANY.isNull {% endhighlight %} -Returns true if given boolean expression is not false (for null and true). False otherwise. +Returns true if the given expression is null. {% highlight java %} -NUMERIC.exp() +ANY.isNotNull {% endhighlight %} -Calculates the Euler's number raised to the given power. +Returns true if the given expression is not null. {% highlight java %} -NUMERIC.log10() +STRING.like(STRING) {% endhighlight %} -Calculates the base 10 logarithm of given value. +Returns true, if a string matches the specified LIKE pattern. E.g. "Jo_n%" matches all strings that start with "Jo(arbitrary letter)n". 
- {% highlight java %} -NUMERIC.ln() +STRING.similar(STRING) {% endhighlight %} -Calculates the natural logarithm of given value. +Returns true, if a string matches the specified SQL regex pattern. E.g. "A+" matches all strings that consist of at least one "A". + + + + + + + Logical functions + Description + + + + + {% highlight java %} -NUMERIC.power(NUMERIC) +boolean1 || boolean2 {% endhighlight %} -Calculates the given number raised to the power of the other value. +Whether boolean1 is true or boolean2 is true. {% highlight java %} -NUMERIC.sqrt() +boolean1 && boolean2 {% endhighlight %} -Calculates the square root of a given value. +Whether boolean1 and boolean2 are both true. {% highlight java %} -NUMERIC.abs() +!BOOLEAN {% endhighlight %} -Calculates the absolute value of given value. +Whether boolean expression is not true; returns null if boolean is null. {% highlight java %} -NUMERIC.floor() +BOOLEAN.isTrue {% endhighlight %} -Calculates the largest integer less than or equal to a given number. +Returns true if the given boolean expression is true. False otherwise (for null and
[jira] [Closed] (FLINK-4959) Write Documentation for ProcessFunction
[ https://issues.apache.org/jira/browse/FLINK-4959?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen closed FLINK-4959. --- > Write Documentation for ProcessFunction > --- > > Key: FLINK-4959 > URL: https://issues.apache.org/jira/browse/FLINK-4959 > Project: Flink > Issue Type: Sub-task > Components: Streaming >Affects Versions: 1.2.0 >Reporter: Aljoscha Krettek >Assignee: Stephan Ewen >Priority: Critical > Fix For: 1.2.0, 1.3.0 > > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3617) NPE from CaseClassSerializer when dealing with null Option field
[ https://issues.apache.org/jira/browse/FLINK-3617?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15824554#comment-15824554 ] Stephan Ewen commented on FLINK-3617: - Improved the null check in - 1.2.0 via 27c11e1b79bd68cbd2e8275c7938478e2e9532e6 - 1.3.0 via fdce1f319c512fc845b64cbb7cbfb10f9d899021 > NPE from CaseClassSerializer when dealing with null Option field > > > Key: FLINK-3617 > URL: https://issues.apache.org/jira/browse/FLINK-3617 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.0.0 >Reporter: Jamie Grier >Assignee: Alexander Chermenin > > This error occurs when serializing a Scala case class with a field of > Option[] type where the value is not Some or None, but null. > If this is not supported we should have a good error message. > java.lang.RuntimeException: ConsumerThread threw an exception: null > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.run(FlinkKafkaConsumer09.java:336) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78) > at > org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.RuntimeException > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:81) > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:39) > at > org.apache.flink.streaming.api.operators.StreamSource$NonTimestampContext.collect(StreamSource.java:158) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09$ConsumerThread.run(FlinkKafkaConsumer09.java:473) > Caused by: java.lang.NullPointerException > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:100) > at > 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:30) > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:100) > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:30) > at > org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.serialize(StreamRecordSerializer.java:107) > at > org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.serialize(StreamRecordSerializer.java:42) > at > org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56) > at > org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:79) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:84) > at > org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:86) > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:78) > ... 3 more -- This message was sent by Atlassian JIRA (v6.3.4#6332)
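The root cause is a field declared as Option[...] holding a raw null (neither Some nor None), which the serializer then dereferences. A Python analogue of the suggested improvement — a clear error message instead of a bare NullPointerException — might look like this (all names here are hypothetical, not the Flink fix):

```python
class Some:
    """Stand-in for Scala's Some."""
    def __init__(self, value):
        self.value = value

NONE = object()  # stand-in for Scala's None singleton

def serialize_option(field):
    """Serialize an Option-typed field, rejecting a raw null with a clear message."""
    if field is None:  # the case that produced the bare NullPointerException
        raise ValueError(
            "Option-typed field is null; use Some(value) or None instead of null")
    if field is NONE:
        return b"\x00"
    if isinstance(field, Some):
        return b"\x01" + repr(field.value).encode()
    raise TypeError(f"not an Option: {field!r}")

serialize_option(Some(42))  # ok
serialize_option(NONE)      # ok
try:
    serialize_option(None)  # raw null: clear error instead of an NPE
except ValueError as err:
    msg = str(err)
```

The point is the explicit null check up front: the error then names the offending contract rather than surfacing as an NPE deep inside the serializer.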
[jira] [Resolved] (FLINK-4959) Write Documentation for ProcessFunction
[ https://issues.apache.org/jira/browse/FLINK-4959?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen resolved FLINK-4959. - Resolution: Fixed Fixed in - 1.2.0 via 2eb926f2bed5723f160620b94f3b67e5dc418387 - 1.3.0 via 7a339a65f13bfccec1f374e035d557290b45bd01 > Write Documentation for ProcessFunction > --- > > Key: FLINK-4959 > URL: https://issues.apache.org/jira/browse/FLINK-4959 > Project: Flink > Issue Type: Sub-task > Components: Streaming >Affects Versions: 1.2.0 >Reporter: Aljoscha Krettek >Assignee: Stephan Ewen >Priority: Critical > Fix For: 1.2.0, 1.3.0 > > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5345) IOManager failed to properly clean up temp file directory
[ https://issues.apache.org/jira/browse/FLINK-5345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15824552#comment-15824552 ] Stephan Ewen commented on FLINK-5345: - [~tonycox] Sorry for the confusion in this issue. > IOManager failed to properly clean up temp file directory > - > > Key: FLINK-5345 > URL: https://issues.apache.org/jira/browse/FLINK-5345 > Project: Flink > Issue Type: Bug >Affects Versions: 1.1.3 >Reporter: Robert Metzger >Assignee: Stephan Ewen > Labels: simplex, starter > Fix For: 1.2.0, 1.3.0 > > > While testing 1.1.3 RC3, I have the following message in my log: > {code} > 2016-12-15 14:46:05,450 INFO > org.apache.flink.streaming.runtime.tasks.StreamTask - Timer service > is shutting down. > 2016-12-15 14:46:05,452 INFO org.apache.flink.runtime.taskmanager.Task > - Source: control events generator (29/40) > (73915a232ba09e642f9dff92f8c8773a) switched from CANCELING to CANCELED. > 2016-12-15 14:46:05,452 INFO org.apache.flink.runtime.taskmanager.Task > - Freeing task resources for Source: control events generator > (29/40) (73915a232ba09e642f9dff92f8c8773a). > 2016-12-15 14:46:05,454 INFO org.apache.flink.yarn.YarnTaskManager > - Un-registering task and sending final execution state > CANCELED to JobManager for task Source: control events genera > tor (73915a232ba09e642f9dff92f8c8773a) > 2016-12-15 14:46:40,609 INFO org.apache.flink.yarn.YarnTaskManagerRunner > - RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested. > 2016-12-15 14:46:40,611 INFO org.apache.flink.runtime.blob.BlobCache > - Shutting down BlobCache > 2016-12-15 14:46:40,724 WARN akka.remote.ReliableDeliverySupervisor > - Association with remote system > [akka.tcp://flink@10.240.0.34:33635] has failed, address is now gated for > [5000] ms. > Reason is: [Disassociated]. 
> 2016-12-15 14:46:40,808 ERROR > org.apache.flink.runtime.io.disk.iomanager.IOManager - IOManager > failed to properly clean up temp file directory: > /yarn/nm/usercache/robert/appcache/application_148129128 > 9979_0024/flink-io-f0ff3f66-b9e2-4560-881f-2ab43bc448b5 > java.lang.IllegalArgumentException: > /yarn/nm/usercache/robert/appcache/application_1481291289979_0024/flink-io-f0ff3f66-b9e2-4560-881f-2ab43bc448b5/62e14e1891fe1e334c921dfd19a32a84/StreamMap_11_24/dummy_state > does not exist > at org.apache.commons.io.FileUtils.cleanDirectory(FileUtils.java:1637) > at > org.apache.commons.io.FileUtils.deleteDirectory(FileUtils.java:1535) > at org.apache.commons.io.FileUtils.forceDelete(FileUtils.java:2270) > at org.apache.commons.io.FileUtils.cleanDirectory(FileUtils.java:1653) > at > org.apache.commons.io.FileUtils.deleteDirectory(FileUtils.java:1535) > at org.apache.commons.io.FileUtils.forceDelete(FileUtils.java:2270) > at org.apache.commons.io.FileUtils.cleanDirectory(FileUtils.java:1653) > at > org.apache.commons.io.FileUtils.deleteDirectory(FileUtils.java:1535) > at org.apache.commons.io.FileUtils.forceDelete(FileUtils.java:2270) > at org.apache.commons.io.FileUtils.cleanDirectory(FileUtils.java:1653) > at > org.apache.commons.io.FileUtils.deleteDirectory(FileUtils.java:1535) > at > org.apache.flink.runtime.io.disk.iomanager.IOManager.shutdown(IOManager.java:109) > at > org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync.shutdown(IOManagerAsync.java:185) > at > org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$1.run(IOManagerAsync.java:105) > {code} > This was the last message logged from that machine. I suspect two threads are > trying to clean up the directories during shutdown? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
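The suspected race — two shutdown paths deleting the same temp directory tree, so one of them sees a path that "does not exist" mid-deletion — can be made harmless by treating a vanished path as already cleaned. A hedged Python sketch of that idea (not the actual Flink fix):

```python
import os
import shutil
import tempfile

def cleanup_temp_dir(path):
    """Delete a directory tree, treating already-removed paths as success."""
    def ignore_missing(func, p, exc_info):
        # A concurrent cleaner may have removed the path first; that is fine.
        if not isinstance(exc_info[1], FileNotFoundError):
            raise exc_info[1]
    shutil.rmtree(path, onerror=ignore_missing)

tmp = tempfile.mkdtemp()
os.makedirs(os.path.join(tmp, "StreamMap_11_24"))
open(os.path.join(tmp, "StreamMap_11_24", "dummy_state"), "w").close()

cleanup_temp_dir(tmp)  # first cleaner removes the whole tree
cleanup_temp_dir(tmp)  # second cleaner finds nothing left, but does not fail
```

By contrast, `FileUtils.cleanDirectory` throws `IllegalArgumentException` when the path no longer exists, which is exactly the failure in the log above; tolerating the missing-path case makes the cleanup idempotent.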
[jira] [Resolved] (FLINK-5345) IOManager failed to properly clean up temp file directory
[ https://issues.apache.org/jira/browse/FLINK-5345?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen resolved FLINK-5345. - Resolution: Fixed Fixed in - 1.2.0 via d1b86aab09061627d8b8c8f99b4277cc60e3dc28 - 1.3.0 via c4626cbae074ba288e54308c40f93258e14c9667 > IOManager failed to properly clean up temp file directory > - > > Key: FLINK-5345 > URL: https://issues.apache.org/jira/browse/FLINK-5345 > Project: Flink > Issue Type: Bug >Affects Versions: 1.1.3 >Reporter: Robert Metzger >Assignee: Stephan Ewen > Labels: simplex, starter > Fix For: 1.2.0, 1.3.0 > > > While testing 1.1.3 RC3, I have the following message in my log: > {code} > 2016-12-15 14:46:05,450 INFO > org.apache.flink.streaming.runtime.tasks.StreamTask - Timer service > is shutting down. > 2016-12-15 14:46:05,452 INFO org.apache.flink.runtime.taskmanager.Task > - Source: control events generator (29/40) > (73915a232ba09e642f9dff92f8c8773a) switched from CANCELING to CANCELED. > 2016-12-15 14:46:05,452 INFO org.apache.flink.runtime.taskmanager.Task > - Freeing task resources for Source: control events generator > (29/40) (73915a232ba09e642f9dff92f8c8773a). > 2016-12-15 14:46:05,454 INFO org.apache.flink.yarn.YarnTaskManager > - Un-registering task and sending final execution state > CANCELED to JobManager for task Source: control events genera > tor (73915a232ba09e642f9dff92f8c8773a) > 2016-12-15 14:46:40,609 INFO org.apache.flink.yarn.YarnTaskManagerRunner > - RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested. > 2016-12-15 14:46:40,611 INFO org.apache.flink.runtime.blob.BlobCache > - Shutting down BlobCache > 2016-12-15 14:46:40,724 WARN akka.remote.ReliableDeliverySupervisor > - Association with remote system > [akka.tcp://flink@10.240.0.34:33635] has failed, address is now gated for > [5000] ms. > Reason is: [Disassociated]. 
> 2016-12-15 14:46:40,808 ERROR > org.apache.flink.runtime.io.disk.iomanager.IOManager - IOManager > failed to properly clean up temp file directory: > /yarn/nm/usercache/robert/appcache/application_148129128 > 9979_0024/flink-io-f0ff3f66-b9e2-4560-881f-2ab43bc448b5 > java.lang.IllegalArgumentException: > /yarn/nm/usercache/robert/appcache/application_1481291289979_0024/flink-io-f0ff3f66-b9e2-4560-881f-2ab43bc448b5/62e14e1891fe1e334c921dfd19a32a84/StreamMap_11_24/dummy_state > does not exist > at org.apache.commons.io.FileUtils.cleanDirectory(FileUtils.java:1637) > at > org.apache.commons.io.FileUtils.deleteDirectory(FileUtils.java:1535) > at org.apache.commons.io.FileUtils.forceDelete(FileUtils.java:2270) > at org.apache.commons.io.FileUtils.cleanDirectory(FileUtils.java:1653) > at > org.apache.commons.io.FileUtils.deleteDirectory(FileUtils.java:1535) > at org.apache.commons.io.FileUtils.forceDelete(FileUtils.java:2270) > at org.apache.commons.io.FileUtils.cleanDirectory(FileUtils.java:1653) > at > org.apache.commons.io.FileUtils.deleteDirectory(FileUtils.java:1535) > at org.apache.commons.io.FileUtils.forceDelete(FileUtils.java:2270) > at org.apache.commons.io.FileUtils.cleanDirectory(FileUtils.java:1653) > at > org.apache.commons.io.FileUtils.deleteDirectory(FileUtils.java:1535) > at > org.apache.flink.runtime.io.disk.iomanager.IOManager.shutdown(IOManager.java:109) > at > org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync.shutdown(IOManagerAsync.java:185) > at > org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$1.run(IOManagerAsync.java:105) > {code} > This was the last message logged from that machine. I suspect two threads are > trying to clean up the directories during shutdown? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Closed] (FLINK-5345) IOManager failed to properly clean up temp file directory
[ https://issues.apache.org/jira/browse/FLINK-5345?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen closed FLINK-5345. --- > IOManager failed to properly clean up temp file directory > - > > Key: FLINK-5345 > URL: https://issues.apache.org/jira/browse/FLINK-5345 > Project: Flink > Issue Type: Bug >Affects Versions: 1.1.3 >Reporter: Robert Metzger >Assignee: Stephan Ewen > Labels: simplex, starter > Fix For: 1.2.0, 1.3.0 > > > While testing 1.1.3 RC3, I have the following message in my log: > {code} > 2016-12-15 14:46:05,450 INFO > org.apache.flink.streaming.runtime.tasks.StreamTask - Timer service > is shutting down. > 2016-12-15 14:46:05,452 INFO org.apache.flink.runtime.taskmanager.Task > - Source: control events generator (29/40) > (73915a232ba09e642f9dff92f8c8773a) switched from CANCELING to CANCELED. > 2016-12-15 14:46:05,452 INFO org.apache.flink.runtime.taskmanager.Task > - Freeing task resources for Source: control events generator > (29/40) (73915a232ba09e642f9dff92f8c8773a). > 2016-12-15 14:46:05,454 INFO org.apache.flink.yarn.YarnTaskManager > - Un-registering task and sending final execution state > CANCELED to JobManager for task Source: control events genera > tor (73915a232ba09e642f9dff92f8c8773a) > 2016-12-15 14:46:40,609 INFO org.apache.flink.yarn.YarnTaskManagerRunner > - RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested. > 2016-12-15 14:46:40,611 INFO org.apache.flink.runtime.blob.BlobCache > - Shutting down BlobCache > 2016-12-15 14:46:40,724 WARN akka.remote.ReliableDeliverySupervisor > - Association with remote system > [akka.tcp://flink@10.240.0.34:33635] has failed, address is now gated for > [5000] ms. > Reason is: [Disassociated]. 
> 2016-12-15 14:46:40,808 ERROR > org.apache.flink.runtime.io.disk.iomanager.IOManager - IOManager > failed to properly clean up temp file directory: > /yarn/nm/usercache/robert/appcache/application_148129128 > 9979_0024/flink-io-f0ff3f66-b9e2-4560-881f-2ab43bc448b5 > java.lang.IllegalArgumentException: > /yarn/nm/usercache/robert/appcache/application_1481291289979_0024/flink-io-f0ff3f66-b9e2-4560-881f-2ab43bc448b5/62e14e1891fe1e334c921dfd19a32a84/StreamMap_11_24/dummy_state > does not exist > at org.apache.commons.io.FileUtils.cleanDirectory(FileUtils.java:1637) > at > org.apache.commons.io.FileUtils.deleteDirectory(FileUtils.java:1535) > at org.apache.commons.io.FileUtils.forceDelete(FileUtils.java:2270) > at org.apache.commons.io.FileUtils.cleanDirectory(FileUtils.java:1653) > at > org.apache.commons.io.FileUtils.deleteDirectory(FileUtils.java:1535) > at org.apache.commons.io.FileUtils.forceDelete(FileUtils.java:2270) > at org.apache.commons.io.FileUtils.cleanDirectory(FileUtils.java:1653) > at > org.apache.commons.io.FileUtils.deleteDirectory(FileUtils.java:1535) > at org.apache.commons.io.FileUtils.forceDelete(FileUtils.java:2270) > at org.apache.commons.io.FileUtils.cleanDirectory(FileUtils.java:1653) > at > org.apache.commons.io.FileUtils.deleteDirectory(FileUtils.java:1535) > at > org.apache.flink.runtime.io.disk.iomanager.IOManager.shutdown(IOManager.java:109) > at > org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync.shutdown(IOManagerAsync.java:185) > at > org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$1.run(IOManagerAsync.java:105) > {code} > This was the last message logged from that machine. I suspect two threads are > trying to clean up the directories during shutdown? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Closed] (FLINK-5485) Mark compiled web frontend files as binary when processed by git diff
[ https://issues.apache.org/jira/browse/FLINK-5485?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen closed FLINK-5485. --- > Mark compiled web frontend files as binary when processed by git diff > - > > Key: FLINK-5485 > URL: https://issues.apache.org/jira/browse/FLINK-5485 > Project: Flink > Issue Type: Improvement > Components: Webfrontend >Affects Versions: 1.3.0 >Reporter: Greg Hogan >Assignee: Greg Hogan >Priority: Trivial > Fix For: 1.2.0, 1.3.0 > > > Particularly beneficial now that javascript is minified, we can mark compiled > web frontend files as binary when processed by git diff. > https://linux.die.net/man/5/gitattributes > This does not affect how files are displayed by github. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (FLINK-5485) Mark compiled web frontend files as binary when processed by git diff
[ https://issues.apache.org/jira/browse/FLINK-5485?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen resolved FLINK-5485. - Resolution: Fixed Fix Version/s: 1.2.0 Fixed in - 1.2.0 via 6b3c683450eb7aee1c9c65be75a0fddb06cea2ce - 1.3.0 via faee74ed8ba2476117b6c9e56932b3541f69ac5c > Mark compiled web frontend files as binary when processed by git diff > - > > Key: FLINK-5485 > URL: https://issues.apache.org/jira/browse/FLINK-5485 > Project: Flink > Issue Type: Improvement > Components: Webfrontend >Affects Versions: 1.3.0 >Reporter: Greg Hogan >Assignee: Greg Hogan >Priority: Trivial > Fix For: 1.2.0, 1.3.0 > > > Particularly beneficial now that javascript is minified, we can mark compiled > web frontend files as binary when processed by git diff. > https://linux.die.net/man/5/gitattributes > This does not affect how files are displayed by github. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
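The mechanism described in the linked gitattributes man page is the `-diff` attribute, which makes `git diff` report "Binary files differ" instead of printing a minified-javascript diff. A plausible `.gitattributes` entry for this change (the path pattern here is an assumption; the exact paths are in the commits referenced above):

```
# Treat compiled/minified web dashboard assets as binary for diff purposes
flink-runtime-web/web-dashboard/web/** -diff
```

As the issue notes, this only affects local `git diff` output; GitHub's rendering is unchanged.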
[jira] [Closed] (FLINK-5438) Typo in JobGraph generator Exception
[ https://issues.apache.org/jira/browse/FLINK-5438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen closed FLINK-5438. --- > Typo in JobGraph generator Exception > - > > Key: FLINK-5438 > URL: https://issues.apache.org/jira/browse/FLINK-5438 > Project: Flink > Issue Type: Improvement > Components: Client >Reporter: Ufuk Celebi >Priority: Trivial > Fix For: 1.2.0, 1.3.0 > > > When trying to run a job with parallelism > max parallelism there is a typo > in the error message: > {code} > Caused by: java.lang.IllegalStateException: The maximum parallelism (1) of > the stream node Flat Map-3 is smaller than the parallelism (18). Increase the > maximum parallelism or decrease the parallelism >>>ofthis<<< operator. > at > org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobVertex(StreamingJobGraphGenerator.java:318) > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (FLINK-5438) Typo in JobGraph generator Exception
[ https://issues.apache.org/jira/browse/FLINK-5438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen resolved FLINK-5438. - Resolution: Fixed Fix Version/s: 1.3.0 1.2.0 Fixed in - 1.2.0 via 30b467f266970e267792411d3148b4379ec23439 - 1.3.0 via c2f28c013116328583043ca1433c45c85e32de30 > Typo in JobGraph generator Exception > - > > Key: FLINK-5438 > URL: https://issues.apache.org/jira/browse/FLINK-5438 > Project: Flink > Issue Type: Improvement > Components: Client >Reporter: Ufuk Celebi >Priority: Trivial > Fix For: 1.2.0, 1.3.0 > > > When trying to run a job with parallelism > max parallelism there is a typo > in the error message: > {code} > Caused by: java.lang.IllegalStateException: The maximum parallelism (1) of > the stream node Flat Map-3 is smaller than the parallelism (18). Increase the > maximum parallelism or decrease the parallelism >>>ofthis<<< operator. > at > org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobVertex(StreamingJobGraphGenerator.java:318) > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Closed] (FLINK-5448) Fix typo in StateAssignmentOperation Exception
[ https://issues.apache.org/jira/browse/FLINK-5448?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen closed FLINK-5448. --- > Fix typo in StateAssignmentOperation Exception > -- > > Key: FLINK-5448 > URL: https://issues.apache.org/jira/browse/FLINK-5448 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Ufuk Celebi >Priority: Trivial > Fix For: 1.2.0, 1.3.0 > > > {code} > Cannot restore the latest checkpoint because the operator > cbc357ccb763df2852fee8c4fc7d55f2 has non-partitioned state and its > parallelism changed. The operatorcbc357ccb763df2852fee8c4fc7d55f2 has > parallelism 2 whereas the correspondingstate object has a parallelism of 4 > {code} > White space is missing in some places. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (FLINK-5448) Fix typo in StateAssignmentOperation Exception
[ https://issues.apache.org/jira/browse/FLINK-5448?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen resolved FLINK-5448. - Resolution: Fixed Fix Version/s: 1.3.0 1.2.0 Fixed in - 1.2.0 via 119d39b6edac6a0a80f90bb07794eca1f31425f7 - 1.3.0 via fa67ef409c9d0d152d22c74e3ace4d56bc8aa7da > Fix typo in StateAssignmentOperation Exception > -- > > Key: FLINK-5448 > URL: https://issues.apache.org/jira/browse/FLINK-5448 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Ufuk Celebi >Priority: Trivial > Fix For: 1.2.0, 1.3.0 > > > {code} > Cannot restore the latest checkpoint because the operator > cbc357ccb763df2852fee8c4fc7d55f2 has non-partitioned state and its > parallelism changed. The operatorcbc357ccb763df2852fee8c4fc7d55f2 has > parallelism 2 whereas the correspondingstate object has a parallelism of 4 > {code} > White space is missing in some places. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
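Both this ticket and FLINK-5438 ("ofthis") stem from the same pattern: long messages concatenated across line breaks, where a space at a wrap point is easy to drop. Building the message with `String.format` placeholders makes each boundary explicit. A hypothetical helper for illustration (not the exact code of the Flink fix):

```java
public class CheckpointMessages {

    /** Builds the restore-failure message with explicit format placeholders so a
     *  space cannot silently go missing at a line wrap, as happened with
     *  "operatorcbc..." and "correspondingstate" above. */
    static String nonPartitionedStateMessage(String operatorId, int parallelism, int stateParallelism) {
        return String.format(
            "Cannot restore the latest checkpoint because the operator %s has"
                + " non-partitioned state and its parallelism changed. The operator %s has"
                + " parallelism %d whereas the corresponding state object has a parallelism of %d.",
            operatorId, operatorId, parallelism, stateParallelism);
    }
}
```

Starting each continuation line with the separating space (rather than ending the previous line with it) also makes a missing space visible at a glance during review.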
[jira] [Resolved] (FLINK-4450) update storm version to 1.0.0
[ https://issues.apache.org/jira/browse/FLINK-4450?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen resolved FLINK-4450. - Resolution: Fixed Fix Version/s: (was: 2.0.0) 1.3.0 Fixed via 475c0b1a6c74744e3431b268bc1a2ee764052cf1 Thank you for the contribution! > update storm version to 1.0.0 > - > > Key: FLINK-4450 > URL: https://issues.apache.org/jira/browse/FLINK-4450 > Project: Flink > Issue Type: Improvement > Components: flink-contrib >Reporter: yuzhongliu > Fix For: 1.3.0 > > > The storm package path was changed in the new version > storm old version package: > backtype.storm.* > storm new version package: > org.apache.storm.* > shall we update the flink/flink-storm code to the new storm version?
[jira] [Commented] (FLINK-5448) Fix typo in StateAssignmentOperation Exception
[ https://issues.apache.org/jira/browse/FLINK-5448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15824537#comment-15824537 ] ASF GitHub Bot commented on FLINK-5448: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3097 > Fix typo in StateAssignmentOperation Exception > -- > > Key: FLINK-5448 > URL: https://issues.apache.org/jira/browse/FLINK-5448 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Ufuk Celebi >Priority: Trivial > > {code} > Cannot restore the latest checkpoint because the operator > cbc357ccb763df2852fee8c4fc7d55f2 has non-partitioned state and its > parallelism changed. The operatorcbc357ccb763df2852fee8c4fc7d55f2 has > parallelism 2 whereas the correspondingstate object has a parallelism of 4 > {code} > White space is missing in some places.
[jira] [Commented] (FLINK-5438) Typo in JobGraph generator Exception
[ https://issues.apache.org/jira/browse/FLINK-5438?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15824536#comment-15824536 ] ASF GitHub Bot commented on FLINK-5438: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3098 > Typo in JobGraph generator Exception > - > > Key: FLINK-5438 > URL: https://issues.apache.org/jira/browse/FLINK-5438 > Project: Flink > Issue Type: Improvement > Components: Client >Reporter: Ufuk Celebi >Priority: Trivial > > When trying to run a job with parallelism > max parallelism there is a typo > in the error message: > {code} > Caused by: java.lang.IllegalStateException: The maximum parallelism (1) of > the stream node Flat Map-3 is smaller than the parallelism (18). Increase the > maximum parallelism or decrease the parallelism >>>ofthis<<< operator. > at > org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobVertex(StreamingJobGraphGenerator.java:318) > {code}
[GitHub] flink pull request #3097: [FLINK-5448] Fix typo in StateAssignmentOperation ...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3097 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3037: Flink-4450 update storm version to 1.0
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3037
[jira] [Commented] (FLINK-5485) Mark compiled web frontend files as binary when processed by git diff
[ https://issues.apache.org/jira/browse/FLINK-5485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15824538#comment-15824538 ] ASF GitHub Bot commented on FLINK-5485: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3122 > Mark compiled web frontend files as binary when processed by git diff > - > > Key: FLINK-5485 > URL: https://issues.apache.org/jira/browse/FLINK-5485 > Project: Flink > Issue Type: Improvement > Components: Webfrontend >Affects Versions: 1.3.0 >Reporter: Greg Hogan >Assignee: Greg Hogan >Priority: Trivial > Fix For: 1.3.0 > > > Particularly beneficial now that javascript is minified, we can mark compiled > web frontend files as binary when processed by git diff. > https://linux.die.net/man/5/gitattributes > This does not affect how files are displayed by github.
[GitHub] flink pull request #3122: [FLINK-5485] [webfrontend] Mark compiled web front...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3122
[GitHub] flink issue #3068: [FLINK-5380] Fix task metrics reuse for single-operator c...
Github user rmetzger commented on the issue: https://github.com/apache/flink/pull/3068 Mh, I just realized that the streaming APIs multichaining is leading to some ugly side effects. In below screenshot, you see that the number of outgoing records is reported as 247. These records are the number of elements emitted by the filter. The task below the source is the window that is consuming the data from the filter. Now the next two tasks also consume data from the same task, but without the filter. You can see that they've consumed much more data (because its unfiltered). ![image](https://cloud.githubusercontent.com/assets/89049/21997216/58c989a2-dc2e-11e6-9065-214dffcc6bbc.png)
[jira] [Commented] (FLINK-5380) Number of outgoing records not reported in web interface
[ https://issues.apache.org/jira/browse/FLINK-5380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15824522#comment-15824522 ] ASF GitHub Bot commented on FLINK-5380: --- Github user rmetzger commented on the issue: https://github.com/apache/flink/pull/3068 Mh, I just realized that the streaming APIs multichaining is leading to some ugly side effects. In below screenshot, you see that the number of outgoing records is reported as 247. These records are the number of elements emitted by the filter. The task below the source is the window that is consuming the data from the filter. Now the next two tasks also consume data from the same task, but without the filter. You can see that they've consumed much more data (because its unfiltered). ![image](https://cloud.githubusercontent.com/assets/89049/21997216/58c989a2-dc2e-11e6-9065-214dffcc6bbc.png) > Number of outgoing records not reported in web interface > > > Key: FLINK-5380 > URL: https://issues.apache.org/jira/browse/FLINK-5380 > Project: Flink > Issue Type: Bug > Components: Metrics, Streaming, Webfrontend >Affects Versions: 1.2.0, 1.3.0 >Reporter: Robert Metzger >Assignee: Chesnay Schepler >Priority: Blocker > Fix For: 1.2.0, 1.3.0 > > Attachments: outRecordsNotreported.png > > > The web frontend does not report any outgoing records in the web frontend. > The amount of data in MB is reported correctly. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5516) Hardcoded paths in flink-python/.../PythonPlanBinder.java
[ https://issues.apache.org/jira/browse/FLINK-5516?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Felix seibert updated FLINK-5516: - Summary: Hardcoded paths in flink-python/.../PythonPlanBinder.java (was: Hardcoded paths in flink-python/.../) > Hardcoded paths in flink-python/.../PythonPlanBinder.java > - > > Key: FLINK-5516 > URL: https://issues.apache.org/jira/browse/FLINK-5516 > Project: Flink > Issue Type: Improvement >Reporter: Felix seibert > > The PythonPlanBinder.java contains three hardcoded filesystem paths: > {code:java} > public static final String FLINK_PYTHON_FILE_PATH = > System.getProperty("java.io.tmpdir") + File.separator + "flink_plan"; > private static String FLINK_HDFS_PATH = "hdfs:/tmp"; > public static final String FLINK_TMP_DATA_DIR = > System.getProperty("java.io.tmpdir") + File.separator + "flink_data"; > {code} > _FLINK_PYTHON_FILE_PATH_ and _FLINK_TMP_DATA_DIR_ are configurable by > modifying _java.io.tmpdir_. > For _FLINK_HDFS_PATH_, there is no way of configuring otherwise but modifying > the source. > Is it possible to make all three parameters configurable in the usual flink > configuration files (like flink-conf.yaml)? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5516) Hardcoded paths in flink-python/.../
[ https://issues.apache.org/jira/browse/FLINK-5516?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Felix seibert updated FLINK-5516: - Summary: Hardcoded paths in flink-python/.../ (was: Hardcoded paths in flink-python) > Hardcoded paths in flink-python/.../ > > > Key: FLINK-5516 > URL: https://issues.apache.org/jira/browse/FLINK-5516 > Project: Flink > Issue Type: Improvement >Reporter: Felix seibert > > The PythonPlanBinder.java contains three hardcoded filesystem paths: > {code:java} > public static final String FLINK_PYTHON_FILE_PATH = > System.getProperty("java.io.tmpdir") + File.separator + "flink_plan"; > private static String FLINK_HDFS_PATH = "hdfs:/tmp"; > public static final String FLINK_TMP_DATA_DIR = > System.getProperty("java.io.tmpdir") + File.separator + "flink_data"; > {code} > _FLINK_PYTHON_FILE_PATH_ and _FLINK_TMP_DATA_DIR_ are configurable by > modifying _java.io.tmpdir_. > For _FLINK_HDFS_PATH_, there is no way of configuring otherwise but modifying > the source. > Is it possible to make all three parameters configurable in the usual flink > configuration files (like flink-conf.yaml)? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5516) Hardcoded paths in flink-python
[ https://issues.apache.org/jira/browse/FLINK-5516?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Felix seibert updated FLINK-5516: - Description: The PythonPlanBinder.java contains three hardcoded filesystem paths: {code:java} public static final String FLINK_PYTHON_FILE_PATH = System.getProperty("java.io.tmpdir") + File.separator + "flink_plan"; private static String FLINK_HDFS_PATH = "hdfs:/tmp"; public static final String FLINK_TMP_DATA_DIR = System.getProperty("java.io.tmpdir") + File.separator + "flink_data"; {code} _FLINK_PYTHON_FILE_PATH_ and _FLINK_TMP_DATA_DIR_ are configurable by modifying _java.io.tmpdir_. For _FLINK_HDFS_PATH_, there is no way of configuring otherwise but modifying the source. Is it possible to make all three parameters configurable in the usual flink configuration files (like flink-conf.yaml)? was: The PythonPlanBinder.java contains two hardcoded filesystem paths: {code:java} public static final String FLINK_PYTHON_FILE_PATH = System.getProperty("java.io.tmpdir") + File.separator + "flink_plan"; private static String FLINK_HDFS_PATH = "hdfs:/tmp"; public static final String FLINK_TMP_DATA_DIR = System.getProperty("java.io.tmpdir") + File.separator + "flink_data"; {code} _FLINK_PYTHON_FILE_PATH_ and {noformat}FLINK_TMP_DATA_DIR{noformat} are configurable by modifying {noformat}java.io.tmpdir{noformat}. For {noformat}FLINK_HDFS_PATH{noformat}, there is no way of configuring otherwise but modifying the source. Is it possible to make all three parameters configurable in the usual flink configuration files (like flink-conf.yaml)? 
> Hardcoded paths in flink-python > --- > > Key: FLINK-5516 > URL: https://issues.apache.org/jira/browse/FLINK-5516 > Project: Flink > Issue Type: Improvement >Reporter: Felix seibert > > The PythonPlanBinder.java contains three hardcoded filesystem paths: > {code:java} > public static final String FLINK_PYTHON_FILE_PATH = > System.getProperty("java.io.tmpdir") + File.separator + "flink_plan"; > private static String FLINK_HDFS_PATH = "hdfs:/tmp"; > public static final String FLINK_TMP_DATA_DIR = > System.getProperty("java.io.tmpdir") + File.separator + "flink_data"; > {code} > _FLINK_PYTHON_FILE_PATH_ and _FLINK_TMP_DATA_DIR_ are configurable by > modifying _java.io.tmpdir_. > For _FLINK_HDFS_PATH_, there is no way of configuring otherwise but modifying > the source. > Is it possible to make all three parameters configurable in the usual flink > configuration files (like flink-conf.yaml)? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5516) Hardcoded paths in flink-python
[ https://issues.apache.org/jira/browse/FLINK-5516?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Felix seibert updated FLINK-5516: - Description: The PythonPlanBinder.java contains two hardcoded filesystem paths: {code:java} public static final String FLINK_PYTHON_FILE_PATH = System.getProperty("java.io.tmpdir") + File.separator + "flink_plan"; private static String FLINK_HDFS_PATH = "hdfs:/tmp"; public static final String FLINK_TMP_DATA_DIR = System.getProperty("java.io.tmpdir") + File.separator + "flink_data"; {code} _FLINK_PYTHON_FILE_PATH_ and {noformat}FLINK_TMP_DATA_DIR{noformat} are configurable by modifying {noformat}java.io.tmpdir{noformat}. For {noformat}FLINK_HDFS_PATH{noformat}, there is no way of configuring otherwise but modifying the source. Is it possible to make all three parameters configurable in the usual flink configuration files (like flink-conf.yaml)? was: The PythonPlanBinder.java contains two hardcoded filesystem paths: {code:java} public static final String FLINK_PYTHON_FILE_PATH = System.getProperty("java.io.tmpdir") + File.separator + "flink_plan"; private static String FLINK_HDFS_PATH = "hdfs:/tmp"; public static final String FLINK_TMP_DATA_DIR = System.getProperty("java.io.tmpdir") + File.separator + "flink_data"; {code} {noformat}FLINK_PYTHON_FILE_PATH{noformat} and {noformat}FLINK_TMP_DATA_DIR{noformat} are configurable by modifying {noformat}java.io.tmpdir{noformat}. For {noformat}FLINK_HDFS_PATH{noformat}, there is no way of configuring otherwise but modifying the source. Is it possible to make all three parameters configurable in the usual flink configuration files (like flink-conf.yaml)? 
> Hardcoded paths in flink-python > --- > > Key: FLINK-5516 > URL: https://issues.apache.org/jira/browse/FLINK-5516 > Project: Flink > Issue Type: Improvement >Reporter: Felix seibert > > The PythonPlanBinder.java contains two hardcoded filesystem paths: > {code:java} > public static final String FLINK_PYTHON_FILE_PATH = > System.getProperty("java.io.tmpdir") + File.separator + "flink_plan"; > private static String FLINK_HDFS_PATH = "hdfs:/tmp"; > public static final String FLINK_TMP_DATA_DIR = > System.getProperty("java.io.tmpdir") + File.separator + "flink_data"; > {code} > _FLINK_PYTHON_FILE_PATH_ and {noformat}FLINK_TMP_DATA_DIR{noformat} are > configurable by modifying {noformat}java.io.tmpdir{noformat}. > For {noformat}FLINK_HDFS_PATH{noformat}, there is no way of configuring > otherwise but modifying the source. > Is it possible to make all three parameters configurable in the usual flink > configuration files (like flink-conf.yaml)? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-5516) Hardcoded paths in flink-python
Felix seibert created FLINK-5516: Summary: Hardcoded paths in flink-python Key: FLINK-5516 URL: https://issues.apache.org/jira/browse/FLINK-5516 Project: Flink Issue Type: Improvement Reporter: Felix seibert The PythonPlanBinder.java contains two hardcoded filesystem paths: {code:java} public static final String FLINK_PYTHON_FILE_PATH = System.getProperty("java.io.tmpdir") + File.separator + "flink_plan"; private static String FLINK_HDFS_PATH = "hdfs:/tmp"; public static final String FLINK_TMP_DATA_DIR = System.getProperty("java.io.tmpdir") + File.separator + "flink_data"; {code} {noformat}FLINK_PYTHON_FILE_PATH{noformat} and {noformat}FLINK_TMP_DATA_DIR{noformat} are configurable by modifying {noformat}java.io.tmpdir{noformat}. For {noformat}FLINK_HDFS_PATH{noformat}, there is no way of configuring otherwise but modifying the source. Is it possible to make all three parameters configurable in the usual flink configuration files (like flink-conf.yaml)?
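The change requested in FLINK-5516 amounts to resolving each of the three paths from configuration, falling back to the current hardcoded value. A library-independent sketch using `java.util.Properties` and hypothetical key names (the actual Flink `ConfigOption` names, if any were later added, are not shown in this thread):

```java
import java.io.File;
import java.util.Properties;

public class PythonPathDefaults {
    // Hypothetical configuration keys mirroring the three hardcoded constants.
    static final String KEY_PLAN_DIR = "python.binder.plan.dir";
    static final String KEY_HDFS_PATH = "python.binder.hdfs.path";
    static final String KEY_DATA_DIR = "python.binder.data.dir";

    static String planFilePath(Properties conf) {
        return conf.getProperty(KEY_PLAN_DIR,
            System.getProperty("java.io.tmpdir") + File.separator + "flink_plan");
    }

    static String hdfsPath(Properties conf) {
        // Previously not configurable at all without modifying the source.
        return conf.getProperty(KEY_HDFS_PATH, "hdfs:/tmp");
    }

    static String tmpDataDir(Properties conf) {
        return conf.getProperty(KEY_DATA_DIR,
            System.getProperty("java.io.tmpdir") + File.separator + "flink_data");
    }
}
```

Reading the values through a lookup-with-default keeps existing deployments working unchanged while letting `flink-conf.yaml` (or any properties source) override each path independently.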
[jira] [Assigned] (FLINK-4959) Write Documentation for ProcessFunction
[ https://issues.apache.org/jira/browse/FLINK-4959?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen reassigned FLINK-4959: --- Assignee: Stephan Ewen > Write Documentation for ProcessFunction > --- > > Key: FLINK-4959 > URL: https://issues.apache.org/jira/browse/FLINK-4959 > Project: Flink > Issue Type: Sub-task > Components: Streaming >Affects Versions: 1.2.0 >Reporter: Aljoscha Krettek >Assignee: Stephan Ewen >Priority: Critical > Fix For: 1.2.0, 1.3.0 > >
[jira] [Updated] (FLINK-4959) Write Documentation for ProcessFunction
[ https://issues.apache.org/jira/browse/FLINK-4959?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen updated FLINK-4959: Fix Version/s: 1.3.0 > Write Documentation for ProcessFunction > --- > > Key: FLINK-4959 > URL: https://issues.apache.org/jira/browse/FLINK-4959 > Project: Flink > Issue Type: Sub-task > Components: Streaming >Affects Versions: 1.2.0 >Reporter: Aljoscha Krettek >Assignee: Stephan Ewen >Priority: Critical > Fix For: 1.2.0, 1.3.0 > >
[jira] [Commented] (FLINK-5423) Implement Stochastic Outlier Selection
[ https://issues.apache.org/jira/browse/FLINK-5423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15824451#comment-15824451 ] ASF GitHub Bot commented on FLINK-5423: --- Github user Fokko commented on a diff in the pull request: https://github.com/apache/flink/pull/3077#discussion_r96290365 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/outlier/StochasticOutlierSelection.scala --- @@ -0,0 +1,367 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.outlier + +/** An implementation of the Stochastic Outlier Selection algorithm by Jeroen Jansen + * + * For more information about SOS, see https://github.com/jeroenjanssens/sos + * J.H.M. Janssens, F. Huszar, E.O. Postma, and H.J. van den Herik. Stochastic + * Outlier Selection. Technical Report TiCC TR 2012-001, Tilburg University, + * Tilburg, the Netherlands, 2012. + * + * @example + * {{{ + * val inputDS = env.fromCollection(List( + * LabeledVector(0.0, DenseVector(1.0, 1.0)), + * LabeledVector(1.0, DenseVector(2.0, 1.0)), + * LabeledVector(2.0, DenseVector(1.0, 2.0)), + * LabeledVector(3.0, DenseVector(2.0, 2.0)), + * LabeledVector(4.0, DenseVector(5.0, 8.0)) // The outlier! 
+ * )) + * + * val sos = StochasticOutlierSelection() + * .setPerplexity(3) + * + * val outputDS = sos.transform(inputDS) + * + * val expectedOutputDS = Array( + *0.2790094479202896, + *0.25775014551682535, + *0.22136130977995766, + *0.12707053787018444, + *0.9922779902453757 // The outlier! + * ) + * + * assert(outputDS == expectedOutputDS) + * }}} + * + * =Parameters= + * + * - [[org.apache.flink.ml.outlier.StochasticOutlierSelection.Perplexity]]: + * Perplexity can be interpreted as the k in k-nearest neighbor algorithms. The difference is that + * in SOS being a neighbor is not a binary property, but a probabilistic one. Should be between + * 1 and n-1, where n is the number of observations. + * (Default value: '''30''') + * + * - [[org.apache.flink.ml.outlier.StochasticOutlierSelection.ErrorTolerance]]: + * The accepted error tolerance. When increasing this number, it will sacrifice accuracy in + * return for reduced computational time. + * (Default value: '''1e-20''') + * + * - [[org.apache.flink.ml.outlier.StochasticOutlierSelection.MaxIterations]]: + * The maximum number of iterations to perform. 
(Default value: '''5000''') + */ + +import breeze.linalg.functions.euclideanDistance +import breeze.linalg.{sum, DenseVector => BreezeDenseVector, Vector => BreezeVector} +import org.apache.flink.api.common.operators.Order +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.utils._ +import org.apache.flink.ml.common.{LabeledVector, Parameter, ParameterMap, WithParameters} +import org.apache.flink.ml.math.Breeze._ +import org.apache.flink.ml.math.{BreezeVectorConverter, Vector} +import org.apache.flink.ml.pipeline.{TransformDataSetOperation, Transformer} + +import scala.language.implicitConversions +import scala.reflect.ClassTag + +class StochasticOutlierSelection extends Transformer[StochasticOutlierSelection] { + + import StochasticOutlierSelection._ + + + /** Sets the perplexity of the outlier selection algorithm, can be seen as the k of kNN +* For more information, please read the Stochastic Outlier Selection algorithm paper +* +* @param perplexity the perplexity of the affinity fit +*
[GitHub] flink pull request #3077: [FLINK-5423] Implement Stochastic Outlier Selectio...
Github user Fokko commented on a diff in the pull request: https://github.com/apache/flink/pull/3077#discussion_r96290365 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/outlier/StochasticOutlierSelection.scala --- @@ -0,0 +1,367 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.outlier + +/** An implementation of the Stochastic Outlier Selection algorithm by Jeroen Jansen + * + * For more information about SOS, see https://github.com/jeroenjanssens/sos + * J.H.M. Janssens, F. Huszar, E.O. Postma, and H.J. van den Herik. Stochastic + * Outlier Selection. Technical Report TiCC TR 2012-001, Tilburg University, + * Tilburg, the Netherlands, 2012. + * + * @example + * {{{ + * val inputDS = env.fromCollection(List( + * LabeledVector(0.0, DenseVector(1.0, 1.0)), + * LabeledVector(1.0, DenseVector(2.0, 1.0)), + * LabeledVector(2.0, DenseVector(1.0, 2.0)), + * LabeledVector(3.0, DenseVector(2.0, 2.0)), + * LabeledVector(4.0, DenseVector(5.0, 8.0)) // The outlier! 
+ * )) + * + * val sos = StochasticOutlierSelection() + * .setPerplexity(3) + * + * val outputDS = sos.transform(inputDS) + * + * val expectedOutputDS = Array( + *0.2790094479202896, + *0.25775014551682535, + *0.22136130977995766, + *0.12707053787018444, + *0.9922779902453757 // The outlier! + * ) + * + * assert(outputDS == expectedOutputDS) + * }}} + * + * =Parameters= + * + * - [[org.apache.flink.ml.outlier.StochasticOutlierSelection.Perplexity]]: + * Perplexity can be interpreted as the k in k-nearest neighbor algorithms. The difference is that + * in SOS being a neighbor is not a binary property, but a probabilistic one. Should be between + * 1 and n-1, where n is the number of observations. + * (Default value: '''30''') + * + * - [[org.apache.flink.ml.outlier.StochasticOutlierSelection.ErrorTolerance]]: + * The accepted error tolerance. When increasing this number, it will sacrifice accuracy in + * return for reduced computational time. + * (Default value: '''1e-20''') + * + * - [[org.apache.flink.ml.outlier.StochasticOutlierSelection.MaxIterations]]: + * The maximum number of iterations to perform. 
(Default value: '''5000''') + */ + +import breeze.linalg.functions.euclideanDistance +import breeze.linalg.{sum, DenseVector => BreezeDenseVector, Vector => BreezeVector} +import org.apache.flink.api.common.operators.Order +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.utils._ +import org.apache.flink.ml.common.{LabeledVector, Parameter, ParameterMap, WithParameters} +import org.apache.flink.ml.math.Breeze._ +import org.apache.flink.ml.math.{BreezeVectorConverter, Vector} +import org.apache.flink.ml.pipeline.{TransformDataSetOperation, Transformer} + +import scala.language.implicitConversions +import scala.reflect.ClassTag + +class StochasticOutlierSelection extends Transformer[StochasticOutlierSelection] { + + import StochasticOutlierSelection._ + + + /** Sets the perplexity of the outlier selection algorithm, can be seen as the k of kNN +* For more information, please read the Stochastic Outlier Selection algorithm paper +* +* @param perplexity the perplexity of the affinity fit +* @return +*/ + def setPerplexity(perplexity: Double): StochasticOutlierSelection = { +require(perplexity >= 1, "Perplexity must be at least one.") +parameters.add(Perplexity, perplexity) +this + }
[jira] [Commented] (FLINK-5423) Implement Stochastic Outlier Selection
[ https://issues.apache.org/jira/browse/FLINK-5423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15824449#comment-15824449 ]

ASF GitHub Bot commented on FLINK-5423:
---------------------------------------

Github user Fokko commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3077#discussion_r96290195

    --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/outlier/StochasticOutlierSelection.scala ---
    @@ -0,0 +1,367 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.ml.outlier
    +
    +/** An implementation of the Stochastic Outlier Selection algorithm by Jeroen Janssens
    + *
    + * For more information about SOS, see https://github.com/jeroenjanssens/sos
    + * J.H.M. Janssens, F. Huszar, E.O. Postma, and H.J. van den Herik. Stochastic
    + * Outlier Selection. Technical Report TiCC TR 2012-001, Tilburg University,
    + * Tilburg, the Netherlands, 2012.
    + *
    + * @example
    + * {{{
    + *   val inputDS = env.fromCollection(List(
    + *     LabeledVector(0.0, DenseVector(1.0, 1.0)),
    + *     LabeledVector(1.0, DenseVector(2.0, 1.0)),
    + *     LabeledVector(2.0, DenseVector(1.0, 2.0)),
    + *     LabeledVector(3.0, DenseVector(2.0, 2.0)),
    + *     LabeledVector(4.0, DenseVector(5.0, 8.0)) // The outlier!
    + *   ))
    + *
    + *   val sos = StochasticOutlierSelection()
    + *     .setPerplexity(3)
    + *
    + *   val outputDS = sos.transform(inputDS)
    + *
    + *   val expectedOutputDS = Array(
    + *     0.2790094479202896,
    + *     0.25775014551682535,
    + *     0.22136130977995766,
    + *     0.12707053787018444,
    + *     0.9922779902453757 // The outlier!
    + *   )
    + *
    + *   assert(outputDS == expectedOutputDS)
    + * }}}
    + *
    + * =Parameters=
    + *
    + * - [[org.apache.flink.ml.outlier.StochasticOutlierSelection.Perplexity]]:
    + *   Perplexity can be interpreted as the k in k-nearest neighbor algorithms. The difference is that
    + *   in SOS being a neighbor is not a binary property, but a probabilistic one. Should be between
    + *   1 and n-1, where n is the number of observations.
    + *   (Default value: '''30''')
    + *
    + * - [[org.apache.flink.ml.outlier.StochasticOutlierSelection.ErrorTolerance]]:
    + *   The accepted error tolerance. Increasing this number sacrifices accuracy in
    + *   return for reduced computational time.
    + *   (Default value: '''1e-20''')
    + *
    + * - [[org.apache.flink.ml.outlier.StochasticOutlierSelection.MaxIterations]]:
    + *   The maximum number of iterations to perform.
    + *   (Default value: '''5000''')
    + */
    +
    +import breeze.linalg.functions.euclideanDistance
    +import breeze.linalg.{sum, DenseVector => BreezeDenseVector, Vector => BreezeVector}
    +import org.apache.flink.api.common.operators.Order
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.scala._
    +import org.apache.flink.api.scala.utils._
    +import org.apache.flink.ml.common.{LabeledVector, Parameter, ParameterMap, WithParameters}
    +import org.apache.flink.ml.math.Breeze._
    +import org.apache.flink.ml.math.{BreezeVectorConverter, Vector}
    +import org.apache.flink.ml.pipeline.{TransformDataSetOperation, Transformer}
    +
    +import scala.language.implicitConversions
    +import scala.reflect.ClassTag
    +
    +class StochasticOutlierSelection extends Transformer[StochasticOutlierSelection] {
    +
    +  import StochasticOutlierSelection._
    +
    +  /** Sets the perplexity of the outlier selection algorithm, can be seen as the k of kNN.
    +    * For more information, please read the Stochastic Outlier Selection algorithm paper.
    +    *
    +    * @param perplexity the perplexity of the affinity fit
    +    * @return
    +    */
    +  def setPerplexity(perplexity: Double): StochasticOutlierSelection = {
    +    require(perplexity >= 1, "Perplexity must be at least one.")
    +    parameters.add(Perplexity, perplexity)
    +    this
    +  }
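The perplexity parameter documented above fixes the entropy of each point's affinity distribution, and the implementation tunes a per-point bandwidth by binary search until the entropy matches `log(perplexity)`. As a rough illustration only (a generic Python sketch of the t-SNE-style search, not the Flink/Breeze implementation; the function name and the `exp(-beta * d^2)` affinity form are assumptions):

```python
import math

def binary_search_affinity(dissimilarities, log_perplexity, max_iter=5000, tol=1e-20):
    """Find a precision beta such that the entropy of the normalized
    affinities exp(-beta * d^2) matches log(perplexity), by bisection."""
    beta = 1.0
    beta_min, beta_max = 0.0, float("inf")
    for _ in range(max_iter):
        # Unnormalized affinities for the current beta.
        a = [math.exp(-d * d * beta) for d in dissimilarities]
        s = sum(a)
        # Shannon entropy of the normalized affinities:
        # H = log(S) + beta * sum(p_i * d_i^2)
        h = math.log(s) + beta * sum(d * d * x for d, x in zip(dissimilarities, a)) / s
        diff = h - log_perplexity
        if abs(diff) <= tol:
            break
        if diff > 0:
            # Entropy too high: sharpen the distribution (larger beta).
            beta_min = beta
            beta = beta * 2 if beta_max == float("inf") else (beta + beta_max) / 2
        else:
            # Entropy too low: flatten the distribution (smaller beta).
            beta_max = beta
            beta = (beta + beta_min) / 2
    s = sum(math.exp(-d * d * beta) for d in dissimilarities)
    return [math.exp(-d * d * beta) / s for d in dissimilarities]
```

With this formulation the returned affinities sum to one and their perplexity (exp of the entropy) converges to the requested value, which is the contract the `binarySearch` test below exercises.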
[GitHub] flink pull request #3077: [FLINK-5423] Implement Stochastic Outlier Selectio...
Github user Fokko commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3077#discussion_r96290218

    --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/outlier/StochasticOutlierSelection.scala ---
    (diff hunk omitted; identical to the hunk quoted in full above)
[jira] [Commented] (FLINK-5423) Implement Stochastic Outlier Selection
[ https://issues.apache.org/jira/browse/FLINK-5423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15824450#comment-15824450 ]

ASF GitHub Bot commented on FLINK-5423:
---------------------------------------

Github user Fokko commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3077#discussion_r96290218

    --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/outlier/StochasticOutlierSelection.scala ---
    (diff hunk omitted; identical to the hunk quoted in full above)
[GitHub] flink pull request #3077: [FLINK-5423] Implement Stochastic Outlier Selectio...
Github user Fokko commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3077#discussion_r96290195

    --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/outlier/StochasticOutlierSelection.scala ---
    (diff hunk omitted; identical to the hunk quoted in full above)
[jira] [Commented] (FLINK-5423) Implement Stochastic Outlier Selection
[ https://issues.apache.org/jira/browse/FLINK-5423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15824436#comment-15824436 ]

ASF GitHub Bot commented on FLINK-5423:
---------------------------------------

Github user Fokko commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3077#discussion_r96289143

    --- Diff: flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/outlier/StochasticOutlierSelectionITSuite.scala ---
    @@ -0,0 +1,240 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.ml.outlier
    +
    +import breeze.linalg.{sum, DenseVector => BreezeDenseVector}
    +import org.apache.flink.api.scala._
    +import org.apache.flink.ml.common.LabeledVector
    +import org.apache.flink.ml.math.DenseVector
    +import org.apache.flink.ml.outlier.StochasticOutlierSelection.BreezeLabeledVector
    +import org.apache.flink.ml.util.FlinkTestBase
    +import org.scalatest.{FlatSpec, Matchers}
    +
    +class StochasticOutlierSelectionITSuite extends FlatSpec with Matchers with FlinkTestBase {
    +  behavior of "Stochastic Outlier Selection algorithm"
    +  val EPSILON = 1e-16
    +
    +  /*
    +    Unit tests created based on the Python scripts of the algorithm's author:
    +    https://github.com/jeroenjanssens/scikit-sos
    +
    +    For more information about SOS, see https://github.com/jeroenjanssens/sos
    +    J.H.M. Janssens, F. Huszar, E.O. Postma, and H.J. van den Herik. Stochastic
    +    Outlier Selection. Technical Report TiCC TR 2012-001, Tilburg University,
    +    Tilburg, the Netherlands, 2012.
    +  */
    +
    +  val perplexity = 3
    +  val errorTolerance = 0
    +  val maxIterations = 5000
    +  val parameters = new StochasticOutlierSelection().setPerplexity(perplexity).parameters
    +
    +  val env = ExecutionEnvironment.getExecutionEnvironment
    +
    +  it should "Compute the perplexity of the vector and return the correct error" in {
    +    val vector = BreezeDenseVector(Array(1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 8.0, 9.0, 10.0))
    +
    +    val output = Array(
    +      0.39682901665799636,
    +      0.15747326846175236,
    +      0.06248996227359784,
    +      0.024797830280027126,
    +      0.009840498605275054,
    +      0.0039049953849556816,
    +      6.149323865970302E-4,
    +      2.4402301428445443E-4,
    +      9.683541280042027E-5
    +    )
    +
    +    val search = StochasticOutlierSelection.binarySearch(
    +      vector,
    +      Math.log(perplexity),
    +      maxIterations,
    +      errorTolerance
    +    ).toArray
    +
    +    search should be(output)
    +  }
    +
    +  it should "Compute the distance matrix and give symmetrical distances" in {
    +
    +    val data = env.fromCollection(List(
    +      BreezeLabeledVector(0, BreezeDenseVector(Array(1.0, 3.0))),
    +      BreezeLabeledVector(1, BreezeDenseVector(Array(5.0, 1.0)))
    +    ))
    +
    +    val distanceMatrix = StochasticOutlierSelection
    +      .computeDissimilarityVectors(data)
    +      .map(_.data)
    +      .collect()
    +      .toArray
    +
    +    print(distanceMatrix)

--- End diff --

Oops, still in there from the debugging.

> Implement Stochastic Outlier Selection
> --------------------------------------
>
>                 Key: FLINK-5423
>                 URL: https://issues.apache.org/jira/browse/FLINK-5423
>             Project: Flink
>          Issue Type: Improvement
>          Components: Machine Learning Library
>            Reporter: Fokko Driesprong
>            Assignee: Fokko Driesprong
>
> I've implemented the Stochastic Outlier Selection (SOS) algorithm by Jeroen
> Janssens.
> http://jeroenjanssens.com/2013/11/24/stochastic-outlier-selection.html
> Integrated as much as possible with the components from the machine learning
> library.
> The algorithm itself has been compared to four other algorithms and it
> shows that SOS has a higher performance on most of these real-world datasets.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
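The symmetry test quoted above boils down to plain pairwise Euclidean distances: the distance from point i to point j must equal the distance from j to i. A minimal stand-alone Python sketch of that property (the function name `dissimilarity_matrix` is hypothetical, not the Flink API):

```python
import math

def dissimilarity_matrix(points):
    """Pairwise Euclidean distances between points; row i holds the
    distances from point i to every point (the diagonal is 0)."""
    n = len(points)
    return [[math.dist(points[i], points[j]) for j in range(n)] for i in range(n)]
```

For the two vectors used in the test, `(1.0, 3.0)` and `(5.0, 1.0)`, both off-diagonal entries are `sqrt((1-5)^2 + (3-1)^2) = sqrt(20)`, which is exactly the symmetry the `computeDissimilarityVectors` test asserts.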
[jira] [Commented] (FLINK-5515) fix unused kvState.getSerializedValue call in KvStateServerHandler
[ https://issues.apache.org/jira/browse/FLINK-5515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15824387#comment-15824387 ]

ASF GitHub Bot commented on FLINK-5515:
---------------------------------------

GitHub user NicoK opened a pull request:

    https://github.com/apache/flink/pull/3131

    [FLINK-5515] remove unused kvState.getSerializedValue call in KvStateServerHandler

    This seems like a simple left-over from a merge that is doing unnecessary extra work.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/NicoK/flink flink-5515

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3131.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #3131

----
commit a92186af10e1dfa035f1650a7aaa3450b91ab037
Author: Nico Kruber
Date:   2017-01-16T17:45:49Z

    [FLINK-5515] remove unused kvState.getSerializedValue call in KvStateServerHandler

> fix unused kvState.getSerializedValue call in KvStateServerHandler
> ------------------------------------------------------------------
>
>                 Key: FLINK-5515
>                 URL: https://issues.apache.org/jira/browse/FLINK-5515
>             Project: Flink
>          Issue Type: Improvement
>            Reporter: Nico Kruber
>
> This was added in 4809f5367b08a9734fc1bd4875be51a9f3bb65aa and is probably a
> left-over from a merge.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Created] (FLINK-5515) fix unused kvState.getSerializedValue call in KvStateServerHandler
Nico Kruber created FLINK-5515: -- Summary: fix unused kvState.getSerializedValue call in KvStateServerHandler Key: FLINK-5515 URL: https://issues.apache.org/jira/browse/FLINK-5515 Project: Flink Issue Type: Improvement Reporter: Nico Kruber This was added in 4809f5367b08a9734fc1bd4875be51a9f3bb65aa and is probably a left-over from a merge. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support
[ https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15824325#comment-15824325 ]

ASF GitHub Bot commented on FLINK-4988:
---------------------------------------

Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3112#discussion_r96274454

    --- Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java ---
    @@ -0,0 +1,234 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.elasticsearch;
    +
    +import org.apache.flink.api.java.utils.ParameterTool;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    +import org.apache.flink.util.InstantiationUtil;
    +import org.elasticsearch.action.ActionRequest;
    +import org.elasticsearch.action.bulk.BulkItemResponse;
    +import org.elasticsearch.action.bulk.BulkProcessor;
    +import org.elasticsearch.action.bulk.BulkRequest;
    +import org.elasticsearch.action.bulk.BulkResponse;
    +import org.elasticsearch.client.Client;
    +import org.elasticsearch.common.unit.ByteSizeUnit;
    +import org.elasticsearch.common.unit.ByteSizeValue;
    +import org.elasticsearch.common.unit.TimeValue;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.Map;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * Base class for all Flink Elasticsearch Sinks.
    + *
    + * This class implements the common behaviour across Elasticsearch versions, such as
    + * the use of an internal {@link BulkProcessor} to buffer multiple {@link ActionRequest}s before
    + * sending the requests to the cluster, as well as passing input records to the user provided
    + * {@link ElasticsearchSinkFunction} for processing.
    + *
    + * The version specific behaviours for creating a {@link Client} to connect to an Elasticsearch cluster
    + * should be defined by concrete implementations of an {@link ElasticsearchClientFactory}, which is to be provided to the
    + * constructor of this class.
    + *
    + * @param <T> Type of the elements handled by this sink
    + */
    +public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
    +
    +	private static final long serialVersionUID = -1007596293618451942L;
    +
    +	private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSinkBase.class);
    +
    +	// ------------------------------------------------------------------
    +	//  Internal bulk processor configuration
    +	// ------------------------------------------------------------------
    +
    +	public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions";
    +	public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb";
    +	public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms";
    +
    +	private final Integer bulkProcessorFlushMaxActions;
    +	private final Integer bulkProcessorFlushMaxSizeMb;
    +	private final Integer bulkProcessorFlushIntervalMillis;
    +
    +	// ------------------------------------------------------------------
    +	//  User-facing API and configuration
    +	// ------------------------------------------------------------------
    +
    +	/** The user specified config map that we forward to Elasticsearch when we create the {@link Client}. */
    +	private final Map<String, String> userConfig;
    +
    +	/** The function that is used to construct multiple {@link ActionRequest ActionRequests} from each incoming element. */
    +	private final ElasticsearchSinkFunction<T> elasticsearchSinkFunction;
    +
    +	/** Provided to the user via the {@link ElasticsearchSinkFunction} to add {@link ActionRequest ActionRequests}. */
    +	private transient BulkProcessorIndexer requestIndexer;
[GitHub] flink pull request #3112: [FLINK-4988] [elasticsearch] Add Elasticsearch 5.x...
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3112#discussion_r96274454

    --- Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java ---
    (diff hunk omitted; identical to the hunk quoted in full above)
[jira] [Commented] (FLINK-5394) the estimateRowCount method of DataSetCalc didn't work
[ https://issues.apache.org/jira/browse/FLINK-5394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15824320#comment-15824320 ] ASF GitHub Bot commented on FLINK-5394: --- Github user twalthr commented on the issue: https://github.com/apache/flink/pull/3058 I will also look at it tomorrow. > the estimateRowCount method of DataSetCalc didn't work > -- > > Key: FLINK-5394 > URL: https://issues.apache.org/jira/browse/FLINK-5394 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: zhangjing >Assignee: zhangjing > > The estimateRowCount method of DataSetCalc does not work at the moment. > If I run the following code, > {code} > Table table = tableEnv > .fromDataSet(data, "a, b, c") > .groupBy("a") > .select("a, a.avg, b.sum, c.count") > .where("a == 1"); > {code} > the cost of every node in the optimized node tree is: > {code} > DataSetAggregate(groupBy=[a], select=[a, AVG(a) AS TMP_0, SUM(b) AS TMP_1, > COUNT(c) AS TMP_2]): rowcount = 1000.0, cumulative cost = {3000.0 rows, > 5000.0 cpu, 28000.0 io} > DataSetCalc(select=[a, b, c], where=[=(a, 1)]): rowcount = 1000.0, > cumulative cost = {2000.0 rows, 2000.0 cpu, 0.0 io} > DataSetScan(table=[[_DataSetTable_0]]): rowcount = 1000.0, cumulative > cost = {1000.0 rows, 1000.0 cpu, 0.0 io} > {code} > We expect the input rowcount of DataSetAggregate to be less than 1000; however, the > actual input rowcount is still 1000 because the estimateRowCount method > of DataSetCalc does not work. > There are two reasons for this: > 1. No custom metadataProvider is provided yet, so when DataSetAggregate calls > RelMetadataQuery.getRowCount(DataSetCalc) to estimate its input rowcount, > the call is dispatched to RelMdRowCount. > 2. DataSetCalc is a subclass of SingleRel, so the previous call matches > getRowCount(SingleRel rel, RelMetadataQuery mq), which never uses > DataSetCalc.estimateRowCount. > The same problem applies to all Flink RelNodes that are subclasses of > SingleRel. 
> I plan to resolve this problem by adding a FlinkRelMdRowCount which contains > the specific getRowCount implementations for Flink RelNodes. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
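The dispatch problem above is easiest to see with numbers. The sketch below (Python, purely illustrative — this is neither Flink nor Calcite code) mimics Calcite's default `RelMdUtil.guessSelectivity` heuristics to show what a working row-count estimate on the Calc node should produce for the `where("a == 1")` example: well under the scan's 1000 rows, instead of the unchanged 1000 reported in the issue.

```python
def guess_selectivity(predicate: str) -> float:
    """Rough sketch of Calcite's RelMdUtil.guessSelectivity defaults:
    equality predicates ~0.15, range comparisons ~0.5, everything else ~0.25.
    (Illustrative only -- not the actual implementation.)"""
    if "=" in predicate and not any(op in predicate for op in ("<", ">", "<=", ">=")):
        return 0.15
    if "<" in predicate or ">" in predicate:
        return 0.5
    return 0.25

def estimate_calc_row_count(input_rows: float, predicates) -> float:
    """A Calc node's estimated output rows: the input row count times the
    product of the selectivities of its filter predicates."""
    selectivity = 1.0
    for p in predicates:
        selectivity *= guess_selectivity(p)
    return input_rows * selectivity

# With the filter `a = 1` from the example, the Calc's estimate should
# drop below the scan's 1000 rows instead of staying at 1000:
print(estimate_calc_row_count(1000.0, ["a = 1"]))  # 150.0
```

With a correct metadata provider in place, DataSetAggregate would then see ~150 input rows rather than 1000, changing the cumulative cost accordingly.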
[GitHub] flink issue #3058: [FLINK-5394] [Table API & SQL]the estimateRowCount method...
Github user twalthr commented on the issue: https://github.com/apache/flink/pull/3058 I will also look at it tomorrow. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-2168) Add HBaseTableSource
[ https://issues.apache.org/jira/browse/FLINK-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15824291#comment-15824291 ] ramkrishna.s.vasudevan commented on FLINK-2168: --- bq.However, I think we would need a custom TableInputFormat anyway which returns Row instead of Tuple which is restricted to 25 fields and does not support null. That was the reason. Actually, I did not extend TableInputFormat but rather extended RichInputFormat. I created HBaseTableSource in flink-table, and I wrote it in Scala. I can change it to Java. Thanks [~fhueske]. > Add HBaseTableSource > > > Key: FLINK-2168 > URL: https://issues.apache.org/jira/browse/FLINK-2168 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Affects Versions: 0.9 >Reporter: Fabian Hueske >Assignee: ramkrishna.s.vasudevan >Priority: Minor > Labels: starter > > Add a {{HBaseTableSource}} to read data from an HBase table. The > {{HBaseTableSource}} should implement the {{ProjectableTableSource}} > (FLINK-3848) and {{FilterableTableSource}} (FLINK-3849) interfaces. > The implementation can be based on Flink's {{TableInputFormat}}. 
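For context on the Row-vs-Tuple point quoted above, here is a minimal sketch (Python, a hypothetical stand-in — not Flink's actual `Row` class) of why a positional row container with null support fits HBase results better than a fixed-arity tuple: any HBase column may be absent for a given row key, and wide tables easily exceed 25 columns.

```python
class Row:
    """Hypothetical stand-in for a Row type: arbitrary arity, positional
    access, and None for SQL NULL. Flink's TupleN types, by contrast, are
    capped at 25 fields and cannot hold nulls, which is why a Row-returning
    input format is preferred for HBase."""
    def __init__(self, arity: int):
        self._fields = [None] * arity

    def set_field(self, pos: int, value):
        self._fields[pos] = value

    def get_field(self, pos: int):
        return self._fields[pos]

    @property
    def arity(self) -> int:
        return len(self._fields)

# A 40-column HBase row where most columns are missing for this row key:
row = Row(40)                  # more than 25 fields: impossible with TupleN
row.set_field(0, "rowkey-1")
row.set_field(39, 42)          # fields 1..38 stay None (NULL)
```

The absent columns simply surface as `None` instead of forcing a sentinel value or a failed tuple construction.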
[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support
[ https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15824230#comment-15824230 ] ASF GitHub Bot commented on FLINK-4988: --- Github user fpompermaier commented on a diff in the pull request: https://github.com/apache/flink/pull/3112#discussion_r96265039 --- Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java --- @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.elasticsearch; + +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.util.InstantiationUtil; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkProcessor; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Base class for all Flink Elasticsearch Sinks. + * + * <p>This class implements the common behaviour across Elasticsearch versions, such as + * the use of an internal {@link BulkProcessor} to buffer multiple {@link ActionRequest}s before + * sending the requests to the cluster, as well as passing input records to the user provided + * {@link ElasticsearchSinkFunction} for processing. + * + * <p>The version specific behaviours for creating a {@link Client} to connect to an Elasticsearch cluster + * should be defined by concrete implementations of a {@link ElasticsearchClientFactory}, which is to be provided to the + * constructor of this class. 
+ * + * @param <T> Type of the elements handled by this sink + */ +public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> { + + private static final long serialVersionUID = -1007596293618451942L; + + private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSinkBase.class); + + // + // Internal bulk processor configuration + // + + public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions"; + public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb"; + public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms"; + + private final Integer bulkProcessorFlushMaxActions; + private final Integer bulkProcessorFlushMaxSizeMb; + private final Integer bulkProcessorFlushIntervalMillis; + + // + // User-facing API and configuration + // + + /** The user specified config map that we forward to Elasticsearch when we create the {@link Client}. */ + private final Map<String, String> userConfig; + + /** The function that is used to construct multiple {@link ActionRequest ActionRequests} from each incoming element. */ + private final ElasticsearchSinkFunction<T> elasticsearchSinkFunction; + + /** Provided to the user via the {@link
[GitHub] flink pull request #3112: [FLINK-4988] [elasticsearch] Add Elasticsearch 5.x...
Github user fpompermaier commented on a diff in the pull request: https://github.com/apache/flink/pull/3112#discussion_r96265039 --- Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java --- @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.elasticsearch; + +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.util.InstantiationUtil; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkProcessor; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Base class for all Flink Elasticsearch Sinks. + * + * <p>This class implements the common behaviour across Elasticsearch versions, such as + * the use of an internal {@link BulkProcessor} to buffer multiple {@link ActionRequest}s before + * sending the requests to the cluster, as well as passing input records to the user provided + * {@link ElasticsearchSinkFunction} for processing. + * + * <p>The version specific behaviours for creating a {@link Client} to connect to an Elasticsearch cluster + * should be defined by concrete implementations of a {@link ElasticsearchClientFactory}, which is to be provided to the + * constructor of this class. 
+ * + * @param <T> Type of the elements handled by this sink + */ +public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> { + + private static final long serialVersionUID = -1007596293618451942L; + + private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSinkBase.class); + + // + // Internal bulk processor configuration + // + + public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions"; + public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb"; + public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms"; + + private final Integer bulkProcessorFlushMaxActions; + private final Integer bulkProcessorFlushMaxSizeMb; + private final Integer bulkProcessorFlushIntervalMillis; + + // + // User-facing API and configuration + // + + /** The user specified config map that we forward to Elasticsearch when we create the {@link Client}. */ + private final Map<String, String> userConfig; + + /** The function that is used to construct multiple {@link ActionRequest ActionRequests} from each incoming element. */ + private final ElasticsearchSinkFunction<T> elasticsearchSinkFunction; + + /** Provided to the user via the {@link ElasticsearchSinkFunction} to add {@link ActionRequest ActionRequests}. */ + private transient BulkProcessorIndexer requestIndexer; + + // + // Internals
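The three `bulk.flush.*` keys in the diff above configure when the internal `BulkProcessor` sends its buffered requests: after a maximum number of actions, after a maximum buffered size, or on a time interval. A rough sketch of that flush-trigger policy (Python, illustrative only — the real logic lives inside Elasticsearch's `BulkProcessor`, and the class name here is invented):

```python
import time

class BulkBufferSketch:
    """Hedged sketch of the buffering policy that bulk.flush.max.actions,
    bulk.flush.max.size.mb, and bulk.flush.interval.ms control. Any
    threshold left as None is disabled, mirroring the optional config keys."""
    def __init__(self, max_actions=None, max_size_mb=None, interval_ms=None):
        self.max_actions = max_actions
        self.max_bytes = max_size_mb * 1024 * 1024 if max_size_mb else None
        self.interval_ms = interval_ms
        self.actions = []
        self.buffered_bytes = 0
        self.last_flush_ms = time.monotonic() * 1000
        self.flushed_batches = []          # stands in for requests sent to the cluster

    def add(self, action: bytes):
        """Buffer one serialized action, then flush if any threshold is hit."""
        self.actions.append(action)
        self.buffered_bytes += len(action)
        now_ms = time.monotonic() * 1000
        if self.max_actions is not None and len(self.actions) >= self.max_actions:
            self.flush()
        elif self.max_bytes is not None and self.buffered_bytes >= self.max_bytes:
            self.flush()
        elif self.interval_ms is not None and now_ms - self.last_flush_ms >= self.interval_ms:
            self.flush()

    def flush(self):
        """Send everything buffered so far as one bulk request."""
        if self.actions:
            self.flushed_batches.append(list(self.actions))
            self.actions.clear()
            self.buffered_bytes = 0
        self.last_flush_ms = time.monotonic() * 1000

buf = BulkBufferSketch(max_actions=3)
for _ in range(7):
    buf.add(b"{doc}")
# 7 actions with max_actions=3 -> two full bulk requests, one action still buffered
```

In the real sink these thresholds map onto `BulkProcessor.Builder`'s bulk-actions, bulk-size, and flush-interval settings; the sketch only shows why all three triggers can coexist.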
[jira] [Updated] (FLINK-5384) clean up jira issues
[ https://issues.apache.org/jira/browse/FLINK-5384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Anton Solovev updated FLINK-5384: - Description: h4. must be closed: FLINK-37 -> from stratosphere; FLINK-87 -> from stratosphere; FLINK-481 -> from stratosphere; FLINK-605 -> from stratosphere; FLINK-639 -> from stratosphere; FLINK-650 -> from stratosphere; FLINK-735 -> from stratosphere; FLINK-456 -> from stratosphere; FLINK-788 -> from stratosphere; FLINK-796 -> from stratosphere; FLINK-805 -> from stratosphere ; FLINK-867 -> from stratosphere; FLINK-879 -> from stratosphere; FLINK-1166 -> closed on github; FLINK-1293 -> outdated; FLINK-1946 -> closed on github; FLINK-2119 -> closed on github; FLINK-2157 -> closed on github; FLINK-2220 -> closed on github; FLINK-2363 -> closed on github; FLINK-2480 -> closed on github; FLINK-2609 -> closed on github; FLINK-2823 -> closed on github; FLINK-3155 -> closed on github; FLINK-3964 -> closed on github; FLINK-4653 -> closed on github; FLINK-4717 -> closed on github; FLINK-4829 -> closed on github; FLINK-5016 -> closed on github; FLINK-2319 -> closed on github, outdated; FLINK-2399 -> closed on github, outdated; FLINK-2428 -> closed on github, outdated; FLINK-2472 -> closed on github, outdated; h4. should be discussed before: FLINK-1055 ; FLINK-1098 -> create other issue to add a colectEach(); FLINK-1100 ; FLINK-1146 ; FLINK-1335 -> maybe rename?; FLINK-1439 ; FLINK-1447 -> firefox problem?; FLINK-1521 -> closed on github; FLINK-1538 -> gsoc2015, is it solved?; FLINK-1541 -> gsoc2015, is it solved?; FLINK-1723 -> almost done? ; FLINK-1814 ; FLINK-1858 -> is QA bot deleted?; FLINK-2023 -> does not block Scala Graph API; FLINK-2032 -> all subtasks done; FLINK-2108 -> almost done? ; FLINK-2309 -> maybe it's worth to merge with FLINK-2316 ? 
; FLINK-3109 -> its PR is stuck; FLINK-3154 -> must be addressed as part of a bigger initiative; FLINK-3297 -> solved as third party lib; FLINK-4042 -> outdated; FLINK-4313 -> outdated; FLINK-4760 -> fixed? FLINK-5282 -> closed on github; h4. far from done yet: FLINK-1926 FLINK-3331 h4. still in progress? FLINK-1999; FLINK-1735; FLINK-2030, FLINK-2274, Flink-1727 was: h4. must be closed: FLINK-37 -> from stratosphere; FLINK-87 -> from stratosphere; FLINK-481 -> from stratosphere; FLINK-605 -> from stratosphere; FLINK-639 -> from stratosphere; FLINK-650 -> from stratosphere; FLINK-735 -> from stratosphere; FLINK-456 -> from stratosphere; FLINK-788 -> from stratosphere; FLINK-796 -> from stratosphere; FLINK-805 -> from stratosphere ; FLINK-867 -> from stratosphere; FLINK-879 -> from stratosphere; FLINK-1166 -> closed on github; FLINK-1293 -> outdated; FLINK-1946 -> closed on github; FLINK-2119 -> closed on github; FLINK-2157 -> closed on github; FLINK-2220 -> closed on github; FLINK-2363 -> closed on github; FLINK-2480 -> closed on github; FLINK-2609 -> closed on github; FLINK-2823 -> closed on github; FLINK-3155 -> closed on github; FLINK-3964 -> closed on github; FLINK-4653 -> closed on github; FLINK-4717 -> closed on github; FLINK-4829 -> closed on github; FLINK-5016 -> closed on github; FLINK-2319 -> closed on github, outdated; FLINK-2399 -> closed on github, outdated; FLINK-2428 -> closed on github, outdated; FLINK-2472 -> closed on github, outdated; h4. should be discussed before: FLINK-1055 ; FLINK-1098 -> create other issue to add a colectEach(); FLINK-1100 ; FLINK-1146 ; FLINK-1335 -> maybe rename?; FLINK-1439 ; FLINK-1447 -> firefox problem?; FLINK-1521 -> closed on github; FLINK-1538 -> gsoc2015, is it solved?; FLINK-1541 -> gsoc2015, is it solved?; FLINK-1723 -> almost done? ; FLINK-1814 ; FLINK-1858 -> is QA bot deleted?; FLINK-2023 -> does not block Scala Graph API; FLINK-2032 -> all subtasks done; FLINK-2108 -> almost done? 
; FLINK-2309 -> maybe it's worth to merge with FLINK-2316 ? ; FLINK-3109 -> its PR is stuck; FLINK-3154 -> must be addressed as part of a bigger initiative; FLINK-3297 -> solved as third party lib; FLINK-4042 -> outdated; FLINK-4313 -> outdated; FLINK-4760 -> fixed? FLINK-5282 -> closed on github; h4. far from done yet: FLINK-1926 FLINK-3331 h4. still in progress? FLINK-1999; FLINK-1735; > clean up jira issues > > > Key: FLINK-5384 > URL: https://issues.apache.org/jira/browse/FLINK-5384 > Project: Flink > Issue Type: Improvement >Reporter: Anton Solovev >Assignee: Anton Solovev >Priority: Minor > > h4. must be closed: > FLINK-37 -> from stratosphere; > FLINK-87 -> from stratosphere; > FLINK-481 -> from stratosphere; > FLINK-605 -> from stratosphere; > FLINK-639 -> from stratosphere; > FLINK-650 -> from stratosphere; > FLINK-735 -> from stratosphere; > FLINK-456 -> from stratosphere; > FLINK-788 -> from stratosphere; > FLINK-796 -> from stratosphere; > FLINK-805 -> from stratosphere ; > FLINK-867 -> from
[jira] [Updated] (FLINK-5384) clean up jira issues
[ https://issues.apache.org/jira/browse/FLINK-5384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Anton Solovev updated FLINK-5384: - Description: h4. must be closed: FLINK-37 -> from stratosphere; FLINK-87 -> from stratosphere; FLINK-481 -> from stratosphere; FLINK-605 -> from stratosphere; FLINK-639 -> from stratosphere; FLINK-650 -> from stratosphere; FLINK-735 -> from stratosphere; FLINK-456 -> from stratosphere; FLINK-788 -> from stratosphere; FLINK-796 -> from stratosphere; FLINK-805 -> from stratosphere ; FLINK-867 -> from stratosphere; FLINK-879 -> from stratosphere; FLINK-1166 -> closed on github; FLINK-1293 -> outdated; FLINK-1946 -> closed on github; FLINK-2119 -> closed on github; FLINK-2157 -> closed on github; FLINK-2220 -> closed on github; FLINK-2363 -> closed on github; FLINK-2480 -> closed on github; FLINK-2609 -> closed on github; FLINK-2823 -> closed on github; FLINK-3155 -> closed on github; FLINK-3964 -> closed on github; FLINK-4653 -> closed on github; FLINK-4717 -> closed on github; FLINK-4829 -> closed on github; FLINK-5016 -> closed on github; FLINK-2319 -> closed on github, outdated; FLINK-2399 -> closed on github, outdated; FLINK-2428 -> closed on github, outdated; FLINK-2472 -> closed on github, outdated; h4. should be discussed before: FLINK-1055 ; FLINK-1098 -> create other issue to add a colectEach(); FLINK-1100 ; FLINK-1146 ; FLINK-1335 -> maybe rename?; FLINK-1439 ; FLINK-1447 -> firefox problem?; FLINK-1521 -> closed on github; FLINK-1538 -> gsoc2015, is it solved?; FLINK-1541 -> gsoc2015, is it solved?; FLINK-1723 -> almost done? ; FLINK-1814 ; FLINK-1858 -> is QA bot deleted?; FLINK-2023 -> does not block Scala Graph API; FLINK-2032 -> all subtasks done; FLINK-2108 -> almost done? ; FLINK-2309 -> maybe it's worth to merge with FLINK-2316 ? 
; FLINK-3109 -> its PR is stuck; FLINK-3154 -> must be addressed as part of a bigger initiative; FLINK-3297 -> solved as third party lib; FLINK-4042 -> outdated; FLINK-4313 -> outdated; FLINK-4760 -> fixed? FLINK-5282 -> closed on github; h4. far from done yet: FLINK-1926 FLINK-3331 h4. still in progress? FLINK-1999; FLINK-1735; was: h4. must be closed: FLINK-37 -> from stratosphere; FLINK-87 -> from stratosphere; FLINK-481 -> from stratosphere; FLINK-605 -> from stratosphere; FLINK-639 -> from stratosphere; FLINK-650 -> from stratosphere; FLINK-735 -> from stratosphere; FLINK-456 -> from stratosphere; FLINK-788 -> from stratosphere; FLINK-796 -> from stratosphere; FLINK-805 -> from stratosphere ; FLINK-867 -> from stratosphere; FLINK-879 -> from stratosphere; FLINK-1166 -> closed on github; FLINK-1293 -> outdated; FLINK-1946 -> closed on github; FLINK-2119 -> closed on github; FLINK-2157 -> closed on github; FLINK-2220 -> closed on github; FLINK-2363 -> closed on github; FLINK-2480 -> closed on github; FLINK-2609 -> closed on github; FLINK-2823 -> closed on github; FLINK-3155 -> closed on github; FLINK-3964 -> closed on github; FLINK-4653 -> closed on github; FLINK-4717 -> closed on github; FLINK-4829 -> closed on github; FLINK-5016 -> closed on github; FLINK-2319 -> closed on github, outdated; FLINK-2399 -> closed on github, outdated; FLINK-2428 -> closed on github, outdated; FLINK-2472 -> closed on github, outdated; h4. should be discussed before: FLINK-1055 ; FLINK-1098 -> create other issue to add a colectEach(); FLINK-1100 ; FLINK-1146 ; FLINK-1335 -> maybe rename?; FLINK-1439 ; FLINK-1447 -> firefox problem?; FLINK-1521 -> closed on github; FLINK-1538 -> gsoc2015, is it solved?; FLINK-1541 -> gsoc2015, is it solved?; FLINK-1723 -> almost done? ; FLINK-1814 ; FLINK-1858 -> is QA bot deleted?; FLINK-2023 -> does not block Scala Graph API; FLINK-2032 -> all subtasks done; FLINK-2108 -> almost done? ; FLINK-2309 -> maybe it's worth to merge with FLINK-2316 ? 
; FLINK-3109 -> its PR is stuck; FLINK-3154 -> must be addressed as part of a bigger initiative; FLINK-3297 -> solved as third party lib; FLINK-4042 -> outdated; FLINK-4313 -> outdated; FLINK-4760 -> fixed? FLINK-5282 -> closed on github; h4. far from done yet: FLINK-1926 FLINK-3331 > clean up jira issues > > > Key: FLINK-5384 > URL: https://issues.apache.org/jira/browse/FLINK-5384 > Project: Flink > Issue Type: Improvement >Reporter: Anton Solovev >Assignee: Anton Solovev >Priority: Minor > > h4. must be closed: > FLINK-37 -> from stratosphere; > FLINK-87 -> from stratosphere; > FLINK-481 -> from stratosphere; > FLINK-605 -> from stratosphere; > FLINK-639 -> from stratosphere; > FLINK-650 -> from stratosphere; > FLINK-735 -> from stratosphere; > FLINK-456 -> from stratosphere; > FLINK-788 -> from stratosphere; > FLINK-796 -> from stratosphere; > FLINK-805 -> from stratosphere ; > FLINK-867 -> from stratosphere; > FLINK-879 -> from stratosphere; > FLINK-1166 -> closed on github; >
[jira] [Commented] (FLINK-5417) Fix the wrong config file name
[ https://issues.apache.org/jira/browse/FLINK-5417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15824194#comment-15824194 ] ASF GitHub Bot commented on FLINK-5417: --- Github user WangTaoTheTonic commented on the issue: https://github.com/apache/flink/pull/3071 Surely not :) > Fix the wrong config file name > --- > > Key: FLINK-5417 > URL: https://issues.apache.org/jira/browse/FLINK-5417 > Project: Flink > Issue Type: Bug > Components: Documentation >Reporter: Tao Wang >Priority: Trivial > > As the config file name is conf/flink-conf.yaml, the usage > "conf/flink-config.yaml" in the documentation is wrong and easily confuses > users. We should correct it. 
[GitHub] flink issue #3071: [FLINK-5417][DOCUMENTATION]correct the wrong config file ...
Github user WangTaoTheTonic commented on the issue: https://github.com/apache/flink/pull/3071 Surely not :)
[jira] [Resolved] (FLINK-3580) Reintroduce Date/Time and implement scalar functions for it
[ https://issues.apache.org/jira/browse/FLINK-3580?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther resolved FLINK-3580. - Resolution: Fixed Fix Version/s: 1.2.0 All functions mentioned in this issue have been implemented in 1.2.0. > Reintroduce Date/Time and implement scalar functions for it > --- > > Key: FLINK-3580 > URL: https://issues.apache.org/jira/browse/FLINK-3580 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther > Fix For: 1.2.0 > > > This task includes: > {code} > DATETIME_PLUS > EXTRACT_DATE > FLOOR > CEIL > CURRENT_TIME > CURRENT_TIMESTAMP > LOCALTIME > LOCALTIMESTAMP > {code} 
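For readers unfamiliar with the functions listed in FLINK-3580, the following sketch (Python, illustrative only — not the Table API implementation) shows the semantics of two of them, `EXTRACT` and `FLOOR`, on timestamps:

```python
from datetime import datetime

def extract(unit: str, ts: datetime) -> int:
    """SQL EXTRACT(unit FROM ts): pull one date/time field out of a timestamp."""
    return {"YEAR": ts.year, "MONTH": ts.month, "DAY": ts.day,
            "HOUR": ts.hour, "MINUTE": ts.minute, "SECOND": ts.second}[unit]

def floor_to(unit: str, ts: datetime) -> datetime:
    """SQL FLOOR(ts TO unit): truncate a timestamp to the start of the unit.
    (CEIL is the same idea, rounding up to the next unit boundary instead.)"""
    if unit == "DAY":
        return ts.replace(hour=0, minute=0, second=0, microsecond=0)
    if unit == "HOUR":
        return ts.replace(minute=0, second=0, microsecond=0)
    if unit == "MINUTE":
        return ts.replace(second=0, microsecond=0)
    raise ValueError("unsupported unit: " + unit)

ts = datetime(2017, 1, 16, 14, 37, 52)
print(extract("HOUR", ts))   # 14
print(floor_to("HOUR", ts))  # 2017-01-16 14:00:00
```

`CURRENT_TIMESTAMP`, `LOCALTIME`, and the other listed functions are simple accessors of the evaluation-time clock and need no sketch.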
[GitHub] flink issue #3071: [FLINK-5417][DOCUMENTATION]correct the wrong config file ...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/3071 Could be; it's a bit frightening to see how much the results differ based on which application one uses :/ Anyway, I will add this to my batch of commits to merge. However, the massive diff drove me crazy, so I massaged the svg file a bit and the diff is now a bit smaller (see here: https://github.com/zentol/flink/commit/4c8d2bde83aeb9d066845c26ea937228d077f867). Hope you don't mind that.
[jira] [Created] (FLINK-5514) Implement an efficient physical execution for CUBE/ROLLUP/GROUPING SETS
Timo Walther created FLINK-5514: --- Summary: Implement an efficient physical execution for CUBE/ROLLUP/GROUPING SETS Key: FLINK-5514 URL: https://issues.apache.org/jira/browse/FLINK-5514 Project: Flink Issue Type: Improvement Components: Table API & SQL Reporter: Timo Walther Initial support for GROUPING SETS was added in FLINK-5303. However, the current runtime implementation is not very efficient, as it essentially maps logical operators one-to-one to physical operators; i.e., grouping sets are currently translated into multiple groupings that are unioned together. A rough design document for this has been created in FLINK-2980.
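To make the inefficiency concrete, here is a sketch (Python, with hypothetical data and helper names) contrasting the current translation — one full grouping per set, results unioned — with a single-pass expansion that scans the input only once, which is the kind of physical execution this issue asks for:

```python
from collections import defaultdict

def grouping_sets_naive(rows, sets, key_fields, agg_field):
    """Current translation sketched: one full group-by per grouping set,
    results unioned -- the input is scanned once *per* grouping set."""
    out = []
    for gset in sets:
        groups = defaultdict(int)
        for row in rows:
            # Fields outside the grouping set become None (SQL NULL).
            key = tuple(row[f] if f in gset else None for f in key_fields)
            groups[key] += row[agg_field]
        out.extend(groups.items())
    return sorted(out, key=repr)

def grouping_sets_single_pass(rows, sets, key_fields, agg_field):
    """More efficient execution sketched: expand each input row into one
    partial key per grouping set, so the input is scanned only once."""
    groups = defaultdict(int)
    for row in rows:
        for i, gset in enumerate(sets):
            # Tag the key with the set index so sets cannot collide.
            key = (i,) + tuple(row[f] if f in gset else None for f in key_fields)
            groups[key] += row[agg_field]
    return sorted(((k[1:], v) for k, v in groups.items()), key=repr)

# GROUPING SETS ((a), (b)) over a tiny hypothetical table:
rows = [{"a": 1, "b": "x", "v": 10},
        {"a": 1, "b": "y", "v": 5},
        {"a": 2, "b": "x", "v": 7}]
sets = [{"a"}, {"b"}]
result = grouping_sets_naive(rows, sets, ["a", "b"], "v")
```

Both variants return the same result; the difference is purely in how many passes over the (potentially huge) input the physical plan requires.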
[jira] [Resolved] (FLINK-5303) Add CUBE/ROLLUP/GROUPING SETS operator in SQL
[ https://issues.apache.org/jira/browse/FLINK-5303?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther resolved FLINK-5303. - Resolution: Fixed Fix Version/s: 1.3.0 Fixed in 1.3.0: ef8cdfe5930201f79c78f34cc9f462b4e88b3da1 > Add CUBE/ROLLUP/GROUPING SETS operator in SQL > - > > Key: FLINK-5303 > URL: https://issues.apache.org/jira/browse/FLINK-5303 > Project: Flink > Issue Type: New Feature > Components: Documentation, Table API & SQL >Reporter: Alexander Chermenin >Assignee: Alexander Chermenin > Fix For: 1.3.0 > > > Add support for such operators as CUBE, ROLLUP and GROUPING SETS in SQL. 
[jira] [Updated] (FLINK-5384) clean up jira issues
[ https://issues.apache.org/jira/browse/FLINK-5384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Anton Solovev updated FLINK-5384: - Description: h4. must be closed: FLINK-37 -> from stratosphere; FLINK-87 -> from stratosphere; FLINK-481 -> from stratosphere; FLINK-605 -> from stratosphere; FLINK-639 -> from stratosphere; FLINK-650 -> from stratosphere; FLINK-735 -> from stratosphere; FLINK-456 -> from stratosphere; FLINK-788 -> from stratosphere; FLINK-796 -> from stratosphere; FLINK-805 -> from stratosphere ; FLINK-867 -> from stratosphere; FLINK-879 -> from stratosphere; FLINK-1166 -> closed on github; FLINK-1293 -> outdated; FLINK-1946 -> closed on github; FLINK-2119 -> closed on github; FLINK-2157 -> closed on github; FLINK-2220 -> closed on github; FLINK-2363 -> closed on github; FLINK-2480 -> closed on github; FLINK-2609 -> closed on github; FLINK-2823 -> closed on github; FLINK-3155 -> closed on github; FLINK-3964 -> closed on github; FLINK-4653 -> closed on github; FLINK-4717 -> closed on github; FLINK-4829 -> closed on github; FLINK-5016 -> closed on github; FLINK-2319 -> closed on github, outdated; FLINK-2399 -> closed on github, outdated; FLINK-2428 -> closed on github, outdated; FLINK-2472 -> closed on github, outdated; h4. should be discussed before: FLINK-1055 ; FLINK-1098 -> create other issue to add a colectEach(); FLINK-1100 ; FLINK-1146 ; FLINK-1335 -> maybe rename?; FLINK-1439 ; FLINK-1447 -> firefox problem?; FLINK-1521 -> closed on github; FLINK-1538 -> gsoc2015, is it solved?; FLINK-1541 -> gsoc2015, is it solved?; FLINK-1723 -> almost done? ; FLINK-1814 ; FLINK-1858 -> is QA bot deleted?; FLINK-2023 -> does not block Scala Graph API; FLINK-2032 -> all subtasks done; FLINK-2108 -> almost done? ; FLINK-2309 -> maybe it's worth to merge with FLINK-2316 ? 
; FLINK-3109 -> its PR is stuck; FLINK-3154 -> must be addressed as part of a bigger initiative; FLINK-3297 -> solved as third party lib; FLINK-4042 -> outdated; FLINK-4313 -> outdated; FLINK-4760 -> fixed? FLINK-5282 -> closed on github; h4. far from done yet: FLINK-1926 FLINK-3331 was: h4. must be closed: FLINK-37 -> from stratosphere; FLINK-87 -> from stratosphere; FLINK-481 -> from stratosphere; FLINK-605 -> from stratosphere; FLINK-639 -> from stratosphere; FLINK-650 -> from stratosphere; FLINK-735 -> from stratosphere; FLINK-456 -> from stratosphere; FLINK-788 -> from stratosphere; FLINK-796 -> from stratosphere; FLINK-805 -> from stratosphere ; FLINK-867 -> from stratosphere; FLINK-879 -> from stratosphere; FLINK-1166 -> closed on github; FLINK-1946 -> closed on github; FLINK-2119 -> closed on github; FLINK-2157 -> closed on github; FLINK-2220 -> closed on github; FLINK-2363 -> closed on github; FLINK-2480 -> closed on github; FLINK-2609 -> closed on github; FLINK-2823 -> closed on github; FLINK-3155 -> closed on github; FLINK-3964 -> closed on github; FLINK-4653 -> closed on github; FLINK-4717 -> closed on github; FLINK-4829 -> closed on github; FLINK-5016 -> closed on github; FLINK-2319 -> closed on github, outdated; FLINK-2399 -> closed on github, outdated; FLINK-2428 -> closed on github, outdated; FLINK-2472 -> closed on github, outdated; h4. should be discussed before: FLINK-1055 ; FLINK-1098 -> create other issue to add a colectEach(); FLINK-1100 ; FLINK-1146 ; FLINK-1335 -> maybe rename?; FLINK-1439 ; FLINK-1447 -> firefox problem?; FLINK-1521 -> closed on github; FLINK-1538 -> gsoc2015, is it solved?; FLINK-1541 -> gsoc2015, is it solved?; FLINK-1723 -> almost done? ; FLINK-1814 ; FLINK-1858 -> is QA bot deleted?; FLINK-2023 -> does not block Scala Graph API; FLINK-2032 -> all subtasks done; FLINK-2108 -> almost done? ; FLINK-2309 -> maybe it's worth to merge with FLINK-2316 ? 
; FLINK-3109 -> its PR is stuck; FLINK-3154 -> must be addressed as part of a bigger initiative; FLINK-3297 -> solved as third party lib; FLINK-4042 -> outdated; FLINK-4313 -> outdated; FLINK-4760 -> fixed? FLINK-5282 -> closed on github; h4. far from done yet: FLINK-1926 FLINK-3331 > clean up jira issues > > > Key: FLINK-5384 > URL: https://issues.apache.org/jira/browse/FLINK-5384 > Project: Flink > Issue Type: Improvement >Reporter: Anton Solovev >Assignee: Anton Solovev >Priority: Minor > > h4. must be closed: > FLINK-37 -> from stratosphere; > FLINK-87 -> from stratosphere; > FLINK-481 -> from stratosphere; > FLINK-605 -> from stratosphere; > FLINK-639 -> from stratosphere; > FLINK-650 -> from stratosphere; > FLINK-735 -> from stratosphere; > FLINK-456 -> from stratosphere; > FLINK-788 -> from stratosphere; > FLINK-796 -> from stratosphere; > FLINK-805 -> from stratosphere ; > FLINK-867 -> from stratosphere; > FLINK-879 -> from stratosphere; > FLINK-1166 -> closed on github; > FLINK-1293 -> outdated; > FLINK-1946 -> closed on github; > FLINK-2119 -> closed
[jira] [Commented] (FLINK-5303) Add CUBE/ROLLUP/GROUPING SETS operator in SQL
[ https://issues.apache.org/jira/browse/FLINK-5303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15824176#comment-15824176 ] ASF GitHub Bot commented on FLINK-5303: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2976 > Add CUBE/ROLLUP/GROUPING SETS operator in SQL > - > > Key: FLINK-5303 > URL: https://issues.apache.org/jira/browse/FLINK-5303 > Project: Flink > Issue Type: New Feature > Components: Documentation, Table API & SQL >Reporter: Alexander Chermenin >Assignee: Alexander Chermenin > Fix For: 1.3.0 > > > Add support for such operators as CUBE, ROLLUP and GROUPING SETS in SQL. 
[GitHub] flink pull request #2976: [FLINK-5303] [table] Support for SQL GROUPING SETS...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2976 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5417) Fix the wrong config file name
[ https://issues.apache.org/jira/browse/FLINK-5417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15824170#comment-15824170 ]

ASF GitHub Bot commented on FLINK-5417:
---

Github user WangTaoTheTonic commented on the issue:

    https://github.com/apache/flink/pull/3071

    I guess it is probably that the illustrator added sth.

> Fix the wrong config file name
> ---
>
> Key: FLINK-5417
> URL: https://issues.apache.org/jira/browse/FLINK-5417
> Project: Flink
> Issue Type: Bug
> Components: Documentation
> Reporter: Tao Wang
> Priority: Trivial
>
> As the config file name is conf/flink-conf.yaml, the usage
> "conf/flink-config.yaml" in document is wrong and easy to confuse user. We
> should correct them.
[GitHub] flink issue #3071: [FLINK-5417][DOCUMENTATION]correct the wrong config file ...
Github user WangTaoTheTonic commented on the issue: https://github.com/apache/flink/pull/3071 I guess it is probably that the illustrator added sth.
[GitHub] flink issue #243: [FLINK-1293] Add support for out-of-place aggregations
Github user tonycox commented on the issue: https://github.com/apache/flink/pull/243 Ok, cool. I can go through abandoned PRs and add them to https://issues.apache.org/jira/browse/FLINK-5384. Wouldn't you mind @fhueske ?
[jira] [Commented] (FLINK-1293) Add support for out-of-place aggregations
[ https://issues.apache.org/jira/browse/FLINK-1293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15824168#comment-15824168 ]

ASF GitHub Bot commented on FLINK-1293:
---

Github user tonycox commented on the issue:

    https://github.com/apache/flink/pull/243

    Ok, cool. I can go through abandoned PRs and add them to https://issues.apache.org/jira/browse/FLINK-5384. Wouldn't you mind @fhueske ?

> Add support for out-of-place aggregations
> -
>
> Key: FLINK-1293
> URL: https://issues.apache.org/jira/browse/FLINK-1293
> Project: Flink
> Issue Type: Improvement
> Components: DataSet API
> Affects Versions: 0.7.0-incubating
> Reporter: Viktor Rosenfeld
> Assignee: Fabian Hueske
> Priority: Minor
>
> Currently, the output of an aggregation is of the same type as the input.
> This restriction has two major drawbacks:
> 1. Every tuple field can only be used in one aggregation because the
> aggregation's result is stored in the field.
> 2. Aggregations having a return type that is different from the input type,
> e.g., count or average, cannot be implemented.
> It would be nice to have the aggregation return any kind of tuple as a
> result, so the restrictions above no longer apply.
> See also:
> - https://github.com/stratosphere/stratosphere/wiki/Design-of-Aggregate-Operator
> - http://apache-flink-incubator-mailing-list-archive.1008284.n3.nabble.com/Hi-Aggregation-support-td2311.html
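The FLINK-1293 description above contrasts in-place aggregation (the result has the input's type) with out-of-place aggregation (the result type may differ, which is what count and average need). A minimal plain-Scala sketch of that distinction, not Flink's actual operators; the names `AggregationSketch`, `inPlaceMax`, and `outOfPlaceCountAvg` are made up for illustration:

```scala
object AggregationSketch {
  // Input "tuples" of a fixed dataset shape: (id, value)
  val input: Seq[(Int, Double)] = Seq((1, 2.0), (1, 4.0), (2, 6.0))

  // In-place style: the result must have the input's type (Int, Double),
  // so each field can carry only one aggregate, e.g. max over the value field.
  def inPlaceMax(rows: Seq[(Int, Double)]): (Int, Double) =
    rows.reduce((a, b) => (a._1, math.max(a._2, b._2)))

  // Out-of-place style: the result type (count: Long, avg: Double) differs
  // from the input type, which the in-place restriction cannot express.
  def outOfPlaceCountAvg(rows: Seq[(Int, Double)]): (Long, Double) = {
    val n = rows.size.toLong
    (n, rows.map(_._2).sum / n)
  }
}
```

Because `outOfPlaceCountAvg` returns `(Long, Double)` instead of the input's `(Int, Double)`, it can carry a count and an average at once, which is exactly what the two drawbacks in the issue description rule out for in-place aggregation.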
[GitHub] flink issue #2976: [FLINK-5303] [table] Support for SQL GROUPING SETS clause...
Github user twalthr commented on the issue: https://github.com/apache/flink/pull/2976 Thanks for the update @chermenin. I will have a final look over the code and merge it. I will also add a follow-up issue for efficiency.
[jira] [Commented] (FLINK-5303) Add CUBE/ROLLUP/GROUPING SETS operator in SQL
[ https://issues.apache.org/jira/browse/FLINK-5303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15824169#comment-15824169 ]

ASF GitHub Bot commented on FLINK-5303:
---

Github user twalthr commented on the issue:

    https://github.com/apache/flink/pull/2976

    Thanks for the update @chermenin. I will have a final look over the code and merge it. I will also add a follow-up issue for efficiency.

> Add CUBE/ROLLUP/GROUPING SETS operator in SQL
> -
>
> Key: FLINK-5303
> URL: https://issues.apache.org/jira/browse/FLINK-5303
> Project: Flink
> Issue Type: New Feature
> Components: Documentation, Table API & SQL
> Reporter: Alexander Chermenin
> Assignee: Alexander Chermenin
>
> Add support for such operators as CUBE, ROLLUP and GROUPING SETS in SQL.
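FLINK-5303 adds the CUBE, ROLLUP, and GROUPING SETS clauses to Flink SQL. As a rough sketch of the semantics these clauses compute (not Flink's or Calcite's implementation; all names here are hypothetical), each grouping set is a separate GROUP BY whose rolled-up columns are emitted as NULL, modeled below with `Option`:

```scala
object GroupingSetsSketch {
  // rows: (region, product, amount)
  val rows: Seq[(String, String, Int)] =
    Seq(("EU", "A", 10), ("EU", "B", 20), ("US", "A", 30))

  // A result key; None stands in for the NULL that SQL emits
  // for a column rolled up in the current grouping set.
  type Key = (Option[String], Option[String])

  // SUM(amount) for one grouping set, chosen by which columns are grouped.
  def aggregate(useRegion: Boolean, useProduct: Boolean): Map[Key, Int] =
    rows
      .groupBy { case (r, p, _) =>
        (if (useRegion) Some(r) else None, if (useProduct) Some(p) else None)
      }
      .map { case (k, g) => k -> g.map(_._3).sum }

  // ROLLUP(region, product) = GROUPING SETS ((region, product), (region), ())
  val rollup: Map[Key, Int] =
    aggregate(useRegion = true, useProduct = true) ++
      aggregate(useRegion = true, useProduct = false) ++
      aggregate(useRegion = false, useProduct = false)

  // CUBE(region, product) additionally includes the (product) grouping set.
  val cube: Map[Key, Int] =
    rollup ++ aggregate(useRegion = false, useProduct = true)
}
```

The `None` components play the role of SQL's NULLs in the result rows, so for example the rollup entry keyed by `(Some("EU"), None)` corresponds to the per-region subtotal row.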
[jira] [Commented] (FLINK-1293) Add support for out-of-place aggregations
[ https://issues.apache.org/jira/browse/FLINK-1293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15824160#comment-15824160 ]

ASF GitHub Bot commented on FLINK-1293:
---

Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/243

    Thanks for the ping @tonycox. I think this PR can be closed. The functionality it adds is available in the Table API.

> Add support for out-of-place aggregations
> -
>
> Key: FLINK-1293
> URL: https://issues.apache.org/jira/browse/FLINK-1293
> Project: Flink
> Issue Type: Improvement
> Components: DataSet API
> Affects Versions: 0.7.0-incubating
> Reporter: Viktor Rosenfeld
> Assignee: Fabian Hueske
> Priority: Minor
>
> Currently, the output of an aggregation is of the same type as the input.
> This restriction has two major drawbacks:
> 1. Every tuple field can only be used in one aggregation because the
> aggregation's result is stored in the field.
> 2. Aggregations having a return type that is different from the input type,
> e.g., count or average, cannot be implemented.
> It would be nice to have the aggregation return any kind of tuple as a
> result, so the restrictions above no longer apply.
> See also:
> - https://github.com/stratosphere/stratosphere/wiki/Design-of-Aggregate-Operator
> - http://apache-flink-incubator-mailing-list-archive.1008284.n3.nabble.com/Hi-Aggregation-support-td2311.html
[GitHub] flink issue #243: [FLINK-1293] Add support for out-of-place aggregations
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/243 Thanks for the ping @tonycox. I think this PR can be closed. The functionality it adds is available in the Table API.