[jira] [Commented] (FLINK-10766) Add a link to flink-china.org in Flink website
[ https://issues.apache.org/jira/browse/FLINK-10766?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16684644#comment-16684644 ]

ASF GitHub Bot commented on FLINK-10766:

wuchong closed pull request #134: [FLINK-10766] Add a link to flink-china.org in Flink website
URL: https://github.com/apache/flink-web/pull/134

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/_config.yml b/_config.yml
index 44e65167..f9254a67 100644
--- a/_config.yml
+++ b/_config.yml
@@ -120,6 +120,7 @@
 stackoverflow: "https://stackoverflow.com/search?q=flink"
 twitter: "https://twitter.com/apacheflink"
 github: "https://github.com/apache/flink"
 wiki: "https://cwiki.apache.org/confluence/display/FLINK/Apache+Flink+Home"
+chinese: "https://flink-china.org/"
 twitter-handle: "ApacheFlink"

diff --git a/_includes/navbar.html b/_includes/navbar.html
index 3123650d..50363e5c 100755
--- a/_includes/navbar.html
+++ b/_includes/navbar.html
@@ -90,6 +90,8 @@
 Plan Visualizer
+
+中文站

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Add a link to flink-china.org in Flink website
> ----------------------------------------------
>
> Key: FLINK-10766
> URL: https://issues.apache.org/jira/browse/FLINK-10766
> Project: Flink
> Issue Type: Wish
> Components: Project Website
> Reporter: Jark Wu
> Priority: Minor
> Labels: pull-request-available
>
> The Apache Flink China website has been reworked and published in the last
> days; see http://flink-china.org for details.
> flink-china.org is a very popular website among Flink users in China. Now
> it includes the latest translated documentation (v1.6), use cases, and
> meetup events.
> We hope to add a link to flink-china.org on the Flink website
> (flink.apache.org), so that we can help and attract more Chinese Flink users
> and grow the community.

-- 
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[GitHub] maqingxiang opened a new pull request #7083: [hotfix][flink-runtime] modify maxAttempts when find a unique file name for the spilling channel
maqingxiang opened a new pull request #7083: [hotfix][flink-runtime] modify maxAttempts when find a unique file name for the spilling channel
URL: https://github.com/apache/flink/pull/7083

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services
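The PR title above refers to the bounded retry loop that picks a unique file name for a spilling channel. A minimal sketch of such a loop (hypothetical names and layout, not the actual flink-runtime implementation):

```java
import java.io.File;
import java.io.IOException;
import java.util.Random;

public class SpillFileCreator {
    private static final Random RND = new Random();

    /**
     * Tries up to maxAttempts times to create a fresh file with a random
     * name inside tmpDir. Throws if every candidate name already exists.
     */
    public static File createSpillFile(File tmpDir, int maxAttempts) throws IOException {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            File candidate = new File(tmpDir, "spill-" + Long.toHexString(RND.nextLong()) + ".tmp");
            // createNewFile succeeds only if no file with that name exists yet
            if (candidate.createNewFile()) {
                return candidate;
            }
        }
        throw new IOException(
            "Could not find a unique spill file name after " + maxAttempts + " attempts.");
    }
}
```

The point of capping the attempts is that an unbounded loop would hang forever if the directory is unwritable or name generation is broken; with a cap the failure surfaces as an exception.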
[GitHub] hequn8128 commented on a change in pull request #7079: [FLINK-10845][table] Support multiple different DISTINCT aggregates for batch
hequn8128 commented on a change in pull request #7079: [FLINK-10845][table] Support multiple different DISTINCT aggregates for batch
URL: https://github.com/apache/flink/pull/7079#discussion_r232887060

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
##
@@ -1309,6 +1312,18 @@ object ScalarOperators {
       |  $resultTerm = ${expr(left.resultTerm, right.resultTerm)};
       |}
       |""".stripMargin
+    } else if (nullCheck && compareNull) {
+      s"""
+        |${left.code}
+        |${right.code}
+        |boolean $nullTerm = false;
+        |$resultTypeTerm $resultTerm;
+        |if (${left.nullTerm} || ${right.nullTerm}) {
+        |  $resultTerm = ${left.nullTerm} && ${right.nullTerm};

Review comment: Add some null data in tests to test this logic.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services
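The generated branch under review implements null-safe equality (the semantics of SQL's IS NOT DISTINCT FROM): if either operand is null, the result is true only when both are null. A plain-Java rendering of that logic, for illustration only (the names are made up; the real code is a generated string template):

```java
public final class NullSafeEquals {
    private NullSafeEquals() {}

    /**
     * Mirrors the generated branch: when either side is null, the result is
     * "both are null" (resultTerm = left.nullTerm && right.nullTerm);
     * otherwise fall through to a normal value comparison.
     */
    public static boolean nullSafeEquals(Object left, Object right) {
        if (left == null || right == null) {
            return left == null && right == null;
        }
        return left.equals(right);
    }
}
```

This is also why the reviewer asks for null rows in the tests: without null data, the new branch is never exercised.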
[GitHub] hequn8128 commented on a change in pull request #7079: [FLINK-10845][table] Support multiple different DISTINCT aggregates for batch
hequn8128 commented on a change in pull request #7079: [FLINK-10845][table] Support multiple different DISTINCT aggregates for batch
URL: https://github.com/apache/flink/pull/7079#discussion_r232887066

## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala
##
@@ -559,4 +559,57 @@ class AggregateITCase(
     TestBaseUtils.compareResultAsText(result.asJava, expected)
   }
+
+  @Test
+  def testMultipleDistinctWithDiffParams(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+    val sqlQuery =
+      "SELECT b, " +
+        " SUM(DISTINCT (a / 3)), " +
+        " COUNT(DISTINCT SUBSTRING(c FROM 1 FOR 2))," +
+        " COUNT(DISTINCT c) " +
+        "FROM MyTable " +
+        "GROUP BY b " +
+        "ORDER BY b"
+
+    val data = new scala.collection.mutable.MutableList[(Int, Long, String)]

Review comment: Replace this with `CollectionDataSets.get5TupleDataSet(env)`. I think we can add a `case when` to generate null data. For example:

```
val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c, 'd, 'e)
tEnv.registerTable("MyTable", t)

val sqlAddNull =
  "SELECT a, b, c, e, CASE d WHEN 'Hallo' THEN null ELSE d END AS d FROM MyTable"

val sqlQuery =
  "SELECT d, " +
    " COUNT(DISTINCT d), " +
    " COUNT(DISTINCT e) " +
    s"FROM ($sqlAddNull) " +
    "GROUP BY d " +
    "ORDER BY d"
```

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services
[jira] [Commented] (FLINK-9555) Support table api in scala shell
[ https://issues.apache.org/jira/browse/FLINK-9555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16684704#comment-16684704 ]

sunjincheng commented on FLINK-9555:

Thanks for reporting this issue, Jeff Zhang! +1 to making the Table API supported by default in the Scala shell.

> Support table api in scala shell
> --------------------------------
>
> Key: FLINK-9555
> URL: https://issues.apache.org/jira/browse/FLINK-9555
> Project: Flink
> Issue Type: New Feature
> Components: Scala Shell
> Affects Versions: 1.5.0
> Reporter: Jeff Zhang
> Priority: Major
>
> It would be nice to have the Table API available in the Scala shell so that
> users can experience the Table API in an interactive way.

-- 
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (FLINK-10845) Support DISTINCT aggregates for batch
[ https://issues.apache.org/jira/browse/FLINK-10845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16684676#comment-16684676 ]

ASF GitHub Bot commented on FLINK-10845:

hequn8128 commented on a change in pull request #7079: [FLINK-10845][table] Support multiple different DISTINCT aggregates for batch
URL: https://github.com/apache/flink/pull/7079#discussion_r232887060

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
##
@@ -1309,6 +1312,18 @@ object ScalarOperators {
       |  $resultTerm = ${expr(left.resultTerm, right.resultTerm)};
       |}
       |""".stripMargin
+    } else if (nullCheck && compareNull) {
+      s"""
+        |${left.code}
+        |${right.code}
+        |boolean $nullTerm = false;
+        |$resultTypeTerm $resultTerm;
+        |if (${left.nullTerm} || ${right.nullTerm}) {
+        |  $resultTerm = ${left.nullTerm} && ${right.nullTerm};

Review comment: Add some null data in tests to test this logic.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Support DISTINCT aggregates for batch
> -------------------------------------
>
> Key: FLINK-10845
> URL: https://issues.apache.org/jira/browse/FLINK-10845
> Project: Flink
> Issue Type: New Feature
> Components: Table API SQL
> Reporter: Timo Walther
> Assignee: xueyu
> Priority: Major
> Labels: pull-request-available
>
> Currently, we support distinct aggregates for streaming. However, executing
> the same query on batch, as in the following test:
> {code}
> val env = ExecutionEnvironment.getExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(env)
> val sqlQuery =
>   "SELECT b, " +
>     " SUM(DISTINCT (a / 3)), " +
>     " COUNT(DISTINCT SUBSTRING(c FROM 1 FOR 2))," +
>     " COUNT(DISTINCT c) " +
>     "FROM MyTable " +
>     "GROUP BY b"
> val data = new mutable.MutableList[(Int, Long, String)]
> data.+=((1, 1L, "Hi"))
> data.+=((2, 2L, "Hello"))
> data.+=((3, 2L, "Hello world"))
> data.+=((4, 3L, "Hello world, how are you?"))
> data.+=((5, 3L, "I am fine."))
> data.+=((6, 3L, "Luke Skywalker"))
> data.+=((7, 4L, "Comment#1"))
> data.+=((8, 4L, "Comment#2"))
> data.+=((9, 4L, "Comment#3"))
> data.+=((10, 4L, "Comment#4"))
> data.+=((11, 5L, "Comment#5"))
> data.+=((12, 5L, "Comment#6"))
> data.+=((13, 5L, "Comment#7"))
> data.+=((14, 5L, "Comment#8"))
> data.+=((15, 5L, "Comment#9"))
> data.+=((16, 6L, "Comment#10"))
> data.+=((17, 6L, "Comment#11"))
> data.+=((18, 6L, "Comment#12"))
> data.+=((19, 6L, "Comment#13"))
> data.+=((20, 6L, "Comment#14"))
> data.+=((21, 6L, "Comment#15"))
> val t = env.fromCollection(data).toTable(tEnv).as('a, 'b, 'c)
> tEnv.registerTable("MyTable", t)
> tEnv.sqlQuery(sqlQuery).toDataSet[Row].print()
> {code}
> fails with:
> {code}
> org.apache.flink.table.codegen.CodeGenException: Unsupported call: IS NOT DISTINCT FROM
> If you think this function should be supported, you can create an issue and start a discussion for it.
>   at org.apache.flink.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:1027)
>   at org.apache.flink.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:1027)
>   at scala.Option.getOrElse(Option.scala:121)
>   at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:1027)
>   at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:66)
>   at org.apache.calcite.rex.RexCall.accept(RexCall.java:107)
>   at org.apache.flink.table.codegen.CodeGenerator.generateExpression(CodeGenerator.scala:247)
>   at org.apache.flink.table.plan.nodes.dataset.DataSetJoin.addInnerJoin(DataSetJoin.scala:221)
>   at org.apache.flink.table.plan.nodes.dataset.DataSetJoin.translateToPlan(DataSetJoin.scala:170)
>   at org.apache.flink.table.plan.nodes.dataset.DataSetCalc.translateToPlan(DataSetCalc.scala:91)
>   at org.apache.flink.table.plan.nodes.dataset.DataSetJoin.translateToPlan(DataSetJoin.scala:165)
>   at org.apache.flink.table.plan.nodes.dataset.DataSetCalc.translateToPlan(DataSetCalc.scala:91)
>   at org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:498)
>   at org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:476)
>   at
[jira] [Commented] (FLINK-10845) Support DISTINCT aggregates for batch
[ https://issues.apache.org/jira/browse/FLINK-10845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16684675#comment-16684675 ]

ASF GitHub Bot commented on FLINK-10845:

hequn8128 commented on a change in pull request #7079: [FLINK-10845][table] Support multiple different DISTINCT aggregates for batch
URL: https://github.com/apache/flink/pull/7079#discussion_r232887066

## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala
##
@@ -559,4 +559,57 @@ class AggregateITCase(
     TestBaseUtils.compareResultAsText(result.asJava, expected)
   }
+
+  @Test
+  def testMultipleDistinctWithDiffParams(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+    val sqlQuery =
+      "SELECT b, " +
+        " SUM(DISTINCT (a / 3)), " +
+        " COUNT(DISTINCT SUBSTRING(c FROM 1 FOR 2))," +
+        " COUNT(DISTINCT c) " +
+        "FROM MyTable " +
+        "GROUP BY b " +
+        "ORDER BY b"
+
+    val data = new scala.collection.mutable.MutableList[(Int, Long, String)]

Review comment: Replace this with `CollectionDataSets.get5TupleDataSet(env)`. I think we can add a `case when` to generate null data. For example:

```
val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c, 'd, 'e)
tEnv.registerTable("MyTable", t)

val sqlAddNull =
  "SELECT a, b, c, e, CASE d WHEN 'Hallo' THEN null ELSE d END AS d FROM MyTable"

val sqlQuery =
  "SELECT d, " +
    " COUNT(DISTINCT d), " +
    " COUNT(DISTINCT e) " +
    s"FROM ($sqlAddNull) " +
    "GROUP BY d " +
    "ORDER BY d"
```

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Support DISTINCT aggregates for batch
> -------------------------------------
>
> Key: FLINK-10845
> URL: https://issues.apache.org/jira/browse/FLINK-10845
> Project: Flink
> Issue Type: New Feature
> Components: Table API SQL
> Reporter: Timo Walther
> Assignee: xueyu
> Priority: Major
> Labels: pull-request-available
>
> Currently, we support distinct aggregates for streaming. However, executing
> the same query on batch, as in the following test:
> {code}
> val env = ExecutionEnvironment.getExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(env)
> val sqlQuery =
>   "SELECT b, " +
>     " SUM(DISTINCT (a / 3)), " +
>     " COUNT(DISTINCT SUBSTRING(c FROM 1 FOR 2))," +
>     " COUNT(DISTINCT c) " +
>     "FROM MyTable " +
>     "GROUP BY b"
> val data = new mutable.MutableList[(Int, Long, String)]
> data.+=((1, 1L, "Hi"))
> data.+=((2, 2L, "Hello"))
> data.+=((3, 2L, "Hello world"))
> data.+=((4, 3L, "Hello world, how are you?"))
> data.+=((5, 3L, "I am fine."))
> data.+=((6, 3L, "Luke Skywalker"))
> data.+=((7, 4L, "Comment#1"))
> data.+=((8, 4L, "Comment#2"))
> data.+=((9, 4L, "Comment#3"))
> data.+=((10, 4L, "Comment#4"))
> data.+=((11, 5L, "Comment#5"))
> data.+=((12, 5L, "Comment#6"))
> data.+=((13, 5L, "Comment#7"))
> data.+=((14, 5L, "Comment#8"))
> data.+=((15, 5L, "Comment#9"))
> data.+=((16, 6L, "Comment#10"))
> data.+=((17, 6L, "Comment#11"))
> data.+=((18, 6L, "Comment#12"))
> data.+=((19, 6L, "Comment#13"))
> data.+=((20, 6L, "Comment#14"))
> data.+=((21, 6L, "Comment#15"))
> val t = env.fromCollection(data).toTable(tEnv).as('a, 'b, 'c)
> tEnv.registerTable("MyTable", t)
> tEnv.sqlQuery(sqlQuery).toDataSet[Row].print()
> {code}
> fails with:
> {code}
> org.apache.flink.table.codegen.CodeGenException: Unsupported call: IS NOT DISTINCT FROM
> If you think this function should be supported, you can create an issue and start a discussion for it.
>   at org.apache.flink.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:1027)
>   at org.apache.flink.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:1027)
>   at scala.Option.getOrElse(Option.scala:121)
>   at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:1027)
>   at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:66)
>   at org.apache.calcite.rex.RexCall.accept(RexCall.java:107)
>   at org.apache.flink.table.codegen.CodeGenerator.generateExpression(CodeGenerator.scala:247)
>   at
[jira] [Closed] (FLINK-10851) sqlUpdate support complex insert grammar
[ https://issues.apache.org/jira/browse/FLINK-10851?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

frank wang closed FLINK-10851.
------------------------------
Resolution: Resolved

> sqlUpdate support complex insert grammar
> ----------------------------------------
>
> Key: FLINK-10851
> URL: https://issues.apache.org/jira/browse/FLINK-10851
> Project: Flink
> Issue Type: Bug
> Reporter: frank wang
> Priority: Major
> Labels: pull-request-available
>
> My code is:
> {{tableEnv.sqlUpdate("insert into kafka.sdkafka.product_4 select filedName1, filedName2 from kafka.sdkafka.order_4");}}
> but Flink gave me an error saying "No table was registered under the name kafka".
> I modified the code, and it works now.
> TableEnvironment.scala:
> {code:java}
> def sqlUpdate(stmt: String, config: QueryConfig): Unit = {
>   val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory)
>   // parse the sql query
>   val parsed = planner.parse(stmt)
>   parsed match {
>     case insert: SqlInsert =>
>       // validate the SQL query
>       val query = insert.getSource
>       val validatedQuery = planner.validate(query)
>       // get query result as Table
>       val queryResult = new Table(this, LogicalRelNode(planner.rel(validatedQuery).rel))
>       // get name of sink table
>       val targetTableName = insert.getTargetTable.asInstanceOf[SqlIdentifier].names.get(0)
>       // insert query result into sink table
>       insertInto(queryResult, targetTableName, config)
>     case _ =>
>       throw new TableException(
>         "Unsupported SQL query! sqlUpdate() only accepts SQL statements of type INSERT.")
>   }
> }
> {code}
> should be modified to this:
> {code:java}
> def sqlUpdate(stmt: String, config: QueryConfig): Unit = {
>   val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory)
>   // parse the sql query
>   val parsed = planner.parse(stmt)
>   parsed match {
>     case insert: SqlInsert =>
>       // validate the SQL query
>       val query = insert.getSource
>       val validatedQuery = planner.validate(query)
>       // get query result as Table
>       val queryResult = new Table(this, LogicalRelNode(planner.rel(validatedQuery).rel))
>       // get name of sink table
>       // val targetTableName = insert.getTargetTable.asInstanceOf[SqlIdentifier].names.get(0)
>       val targetTableName = insert.getTargetTable.toString
>       // insert query result into sink table
>       insertInto(queryResult, targetTableName, config)
>     case _ =>
>       throw new TableException(
>         "Unsupported SQL query! sqlUpdate() only accepts SQL statements of type INSERT.")
>   }
> }
> {code}
>
> I hope this can be accepted. Thanks!

-- 
This message was sent by Atlassian JIRA
(v7.6.3#76005)
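The root cause in the snippet above is that `names.get(0)` keeps only the first segment of a compound identifier such as `kafka.sdkafka.product_4`, so the planner then looks up a table literally named `kafka`. A toy illustration of the difference between the two lookup keys (hypothetical helper, not Flink code):

```java
import java.util.Arrays;
import java.util.List;

public class TargetTableName {
    /** Buggy variant: names.get(0) keeps only the first segment of "catalog.db.table". */
    public static String firstSegmentOnly(String identifier) {
        List<String> names = Arrays.asList(identifier.split("\\."));
        return names.get(0); // e.g. "kafka" -- the wrong lookup key
    }

    /** Fixed variant: keep the full dotted path, as insert.getTargetTable.toString does. */
    public static String fullPath(String identifier) {
        return String.join(".", identifier.split("\\."));
    }
}
```

With the full path preserved, the sink lookup receives `kafka.sdkafka.product_4` instead of just `kafka`, which matches the behavior of the reporter's fix.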
[jira] [Commented] (FLINK-10851) sqlUpdate support complex insert grammar
[ https://issues.apache.org/jira/browse/FLINK-10851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16684717#comment-16684717 ]

frank wang commented on FLINK-10851:

OK, it works now. Thanks!

> sqlUpdate support complex insert grammar
> ----------------------------------------
>
> Key: FLINK-10851
> URL: https://issues.apache.org/jira/browse/FLINK-10851
> Project: Flink
> Issue Type: Bug
> Reporter: frank wang
> Priority: Major
> Labels: pull-request-available
>
> My code is:
> {{tableEnv.sqlUpdate("insert into kafka.sdkafka.product_4 select filedName1, filedName2 from kafka.sdkafka.order_4");}}
> but Flink gave me an error saying "No table was registered under the name kafka".
> I modified the code, and it works now.
> TableEnvironment.scala:
> {code:java}
> def sqlUpdate(stmt: String, config: QueryConfig): Unit = {
>   val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory)
>   // parse the sql query
>   val parsed = planner.parse(stmt)
>   parsed match {
>     case insert: SqlInsert =>
>       // validate the SQL query
>       val query = insert.getSource
>       val validatedQuery = planner.validate(query)
>       // get query result as Table
>       val queryResult = new Table(this, LogicalRelNode(planner.rel(validatedQuery).rel))
>       // get name of sink table
>       val targetTableName = insert.getTargetTable.asInstanceOf[SqlIdentifier].names.get(0)
>       // insert query result into sink table
>       insertInto(queryResult, targetTableName, config)
>     case _ =>
>       throw new TableException(
>         "Unsupported SQL query! sqlUpdate() only accepts SQL statements of type INSERT.")
>   }
> }
> {code}
> should be modified to this:
> {code:java}
> def sqlUpdate(stmt: String, config: QueryConfig): Unit = {
>   val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory)
>   // parse the sql query
>   val parsed = planner.parse(stmt)
>   parsed match {
>     case insert: SqlInsert =>
>       // validate the SQL query
>       val query = insert.getSource
>       val validatedQuery = planner.validate(query)
>       // get query result as Table
>       val queryResult = new Table(this, LogicalRelNode(planner.rel(validatedQuery).rel))
>       // get name of sink table
>       // val targetTableName = insert.getTargetTable.asInstanceOf[SqlIdentifier].names.get(0)
>       val targetTableName = insert.getTargetTable.toString
>       // insert query result into sink table
>       insertInto(queryResult, targetTableName, config)
>     case _ =>
>       throw new TableException(
>         "Unsupported SQL query! sqlUpdate() only accepts SQL statements of type INSERT.")
>   }
> }
> {code}
>
> I hope this can be accepted. Thanks!

-- 
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[GitHub] hequn8128 commented on a change in pull request #7079: [FLINK-10845][table] Support multiple different DISTINCT aggregates for batch
hequn8128 commented on a change in pull request #7079: [FLINK-10845][table] Support multiple different DISTINCT aggregates for batch
URL: https://github.com/apache/flink/pull/7079#discussion_r232887066

## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala
##
@@ -559,4 +559,57 @@ class AggregateITCase(
     TestBaseUtils.compareResultAsText(result.asJava, expected)
   }
+
+  @Test
+  def testMultipleDistinctWithDiffParams(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+    val sqlQuery =
+      "SELECT b, " +
+        " SUM(DISTINCT (a / 3)), " +
+        " COUNT(DISTINCT SUBSTRING(c FROM 1 FOR 2))," +
+        " COUNT(DISTINCT c) " +
+        "FROM MyTable " +
+        "GROUP BY b " +
+        "ORDER BY b"
+
+    val data = new scala.collection.mutable.MutableList[(Int, Long, String)]

Review comment: Replace this with `CollectionDataSets.get5TupleDataSet(env)`. I think we can add a `case when` to generate null data. For example (just an example):

```
val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c, 'd, 'e)
tEnv.registerTable("MyTable", t)

val sqlAddNull =
  "SELECT a, b, c, e, CASE d WHEN 'Hallo' THEN null ELSE d END AS d FROM MyTable"

val sqlQuery =
  "SELECT d, " +
    " COUNT(DISTINCT d), " +
    " COUNT(DISTINCT e) " +
    s"FROM ($sqlAddNull) " +
    "GROUP BY d " +
    "ORDER BY d"
```

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services
[jira] [Commented] (FLINK-10845) Support DISTINCT aggregates for batch
[ https://issues.apache.org/jira/browse/FLINK-10845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16684693#comment-16684693 ]

ASF GitHub Bot commented on FLINK-10845:

hequn8128 commented on a change in pull request #7079: [FLINK-10845][table] Support multiple different DISTINCT aggregates for batch
URL: https://github.com/apache/flink/pull/7079#discussion_r232887066

## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala
##
@@ -559,4 +559,57 @@ class AggregateITCase(
     TestBaseUtils.compareResultAsText(result.asJava, expected)
   }
+
+  @Test
+  def testMultipleDistinctWithDiffParams(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+    val sqlQuery =
+      "SELECT b, " +
+        " SUM(DISTINCT (a / 3)), " +
+        " COUNT(DISTINCT SUBSTRING(c FROM 1 FOR 2))," +
+        " COUNT(DISTINCT c) " +
+        "FROM MyTable " +
+        "GROUP BY b " +
+        "ORDER BY b"
+
+    val data = new scala.collection.mutable.MutableList[(Int, Long, String)]

Review comment: Replace this with `CollectionDataSets.get5TupleDataSet(env)`. I think we can add a `case when` to generate null data. For example (just an example):

```
val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c, 'd, 'e)
tEnv.registerTable("MyTable", t)

val sqlAddNull =
  "SELECT a, b, c, e, CASE d WHEN 'Hallo' THEN null ELSE d END AS d FROM MyTable"

val sqlQuery =
  "SELECT d, " +
    " COUNT(DISTINCT d), " +
    " COUNT(DISTINCT e) " +
    s"FROM ($sqlAddNull) " +
    "GROUP BY d " +
    "ORDER BY d"
```

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Support DISTINCT aggregates for batch
> -------------------------------------
>
> Key: FLINK-10845
> URL: https://issues.apache.org/jira/browse/FLINK-10845
> Project: Flink
> Issue Type: New Feature
> Components: Table API SQL
> Reporter: Timo Walther
> Assignee: xueyu
> Priority: Major
> Labels: pull-request-available
>
> Currently, we support distinct aggregates for streaming. However, executing
> the same query on batch, as in the following test:
> {code}
> val env = ExecutionEnvironment.getExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(env)
> val sqlQuery =
>   "SELECT b, " +
>     " SUM(DISTINCT (a / 3)), " +
>     " COUNT(DISTINCT SUBSTRING(c FROM 1 FOR 2))," +
>     " COUNT(DISTINCT c) " +
>     "FROM MyTable " +
>     "GROUP BY b"
> val data = new mutable.MutableList[(Int, Long, String)]
> data.+=((1, 1L, "Hi"))
> data.+=((2, 2L, "Hello"))
> data.+=((3, 2L, "Hello world"))
> data.+=((4, 3L, "Hello world, how are you?"))
> data.+=((5, 3L, "I am fine."))
> data.+=((6, 3L, "Luke Skywalker"))
> data.+=((7, 4L, "Comment#1"))
> data.+=((8, 4L, "Comment#2"))
> data.+=((9, 4L, "Comment#3"))
> data.+=((10, 4L, "Comment#4"))
> data.+=((11, 5L, "Comment#5"))
> data.+=((12, 5L, "Comment#6"))
> data.+=((13, 5L, "Comment#7"))
> data.+=((14, 5L, "Comment#8"))
> data.+=((15, 5L, "Comment#9"))
> data.+=((16, 6L, "Comment#10"))
> data.+=((17, 6L, "Comment#11"))
> data.+=((18, 6L, "Comment#12"))
> data.+=((19, 6L, "Comment#13"))
> data.+=((20, 6L, "Comment#14"))
> data.+=((21, 6L, "Comment#15"))
> val t = env.fromCollection(data).toTable(tEnv).as('a, 'b, 'c)
> tEnv.registerTable("MyTable", t)
> tEnv.sqlQuery(sqlQuery).toDataSet[Row].print()
> {code}
> fails with:
> {code}
> org.apache.flink.table.codegen.CodeGenException: Unsupported call: IS NOT DISTINCT FROM
> If you think this function should be supported, you can create an issue and start a discussion for it.
>   at org.apache.flink.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:1027)
>   at org.apache.flink.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:1027)
>   at scala.Option.getOrElse(Option.scala:121)
>   at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:1027)
>   at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:66)
>   at org.apache.calcite.rex.RexCall.accept(RexCall.java:107)
>   at org.apache.flink.table.codegen.CodeGenerator.generateExpression(CodeGenerator.scala:247)
>   at
[jira] [Assigned] (FLINK-10855) CheckpointCoordinator does not delete checkpoint directory of late/failed checkpoints
[ https://issues.apache.org/jira/browse/FLINK-10855?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

vinoyang reassigned FLINK-10855:

Assignee: vinoyang

> CheckpointCoordinator does not delete checkpoint directory of late/failed checkpoints
> -------------------------------------------------------------------------------------
>
> Key: FLINK-10855
> URL: https://issues.apache.org/jira/browse/FLINK-10855
> Project: Flink
> Issue Type: Bug
> Components: State Backends, Checkpointing
> Affects Versions: 1.5.5, 1.6.2, 1.7.0
> Reporter: Till Rohrmann
> Assignee: vinoyang
> Priority: Major
>
> When an acknowledge checkpoint message arrives late, or a checkpoint cannot
> be acknowledged, we discard the subtask state in the {{CheckpointCoordinator}}.
> What does not happen in this case is deleting the parent directory of the
> checkpoint; that only happens in {{PendingCheckpoint#dispose}}.
> Due to this behaviour, it can happen that a checkpoint fails (e.g. because a
> task is not ready) and we delete the checkpoint directory. Next, another task
> writes its checkpoint data to the checkpoint directory (thereby creating it
> again) and sends an acknowledge message back to the {{CheckpointCoordinator}}.
> The {{CheckpointCoordinator}} will realize that there is no longer a
> {{PendingCheckpoint}} and will discard the subtask state. This removes the
> state files from the checkpoint directory but leaves the checkpoint directory
> itself untouched.

-- 
This message was sent by Atlassian JIRA
(v7.6.3#76005)
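The gap described in the issue is that discarding late subtask state removes the state files but never removes the (possibly re-created) checkpoint directory. A hedged sketch of the missing cleanup step, using plain `java.nio.file` (hypothetical helper, not the actual CheckpointCoordinator code):

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;

public class LateStateCleanup {
    /**
     * Discards a late subtask-state file and, if that leaves the enclosing
     * checkpoint directory empty, deletes the directory as well -- the step
     * the issue reports as currently missing.
     */
    public static void discardLateState(Path stateFile) throws IOException {
        Path checkpointDir = stateFile.getParent();
        Files.deleteIfExists(stateFile);
        // Only remove the directory when nothing else is left in it, so that
        // concurrently written state from other subtasks is not destroyed.
        try (DirectoryStream<Path> entries = Files.newDirectoryStream(checkpointDir)) {
            if (!entries.iterator().hasNext()) {
                Files.delete(checkpointDir);
            }
        }
    }
}
```

The emptiness check matters: other subtasks of the same late checkpoint may still be writing into the directory, so only the last discard should remove it.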
[jira] [Commented] (FLINK-8997) Add sliding window aggregation to the job
[ https://issues.apache.org/jira/browse/FLINK-8997?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16683414#comment-16683414 ]

ASF GitHub Bot commented on FLINK-8997:

kl0u commented on a change in pull request #7039: [FLINK-8997] Added sliding window aggregation to datastream test job
URL: https://github.com/apache/flink/pull/7039#discussion_r232569338

## File path: flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SlidingWindowCheckMapper.java
##
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Collector;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * This mapper validates sliding event-time windows. It checks that each event belongs to the
+ * appropriate number of consecutive windows.
+ */
+public class SlidingWindowCheckMapper extends RichFlatMapFunction<Tuple2<Integer, List<Event>>, String> {
+
+	private static final long serialVersionUID = -744070793650644485L;
+
+	/** This value state tracks previously seen events with the number of windows they appeared in. */
+	private transient ValueState<List<Tuple2<Event, Integer>>> eventsSeenSoFar;
+
+	private transient ValueState<Long> lastSequenceNumber;
+
+	private final int slideFactor;
+
+	SlidingWindowCheckMapper(int slideFactor) {
+		this.slideFactor = slideFactor;
+	}
+
+	@Override
+	public void open(Configuration parameters) {
+		ValueStateDescriptor<List<Tuple2<Event, Integer>>> previousWindowDescriptor =
+			new ValueStateDescriptor<>("eventsSeenSoFar",
+				new ListTypeInfo<>(new TupleTypeInfo<>(TypeInformation.of(Event.class), BasicTypeInfo.INT_TYPE_INFO)));
+
+		eventsSeenSoFar = getRuntimeContext().getState(previousWindowDescriptor);
+
+		ValueStateDescriptor<Long> lastSequenceNumberDescriptor =
+			new ValueStateDescriptor<>("lastSequenceNumber", BasicTypeInfo.LONG_TYPE_INFO);
+
+		lastSequenceNumber = getRuntimeContext().getState(lastSequenceNumberDescriptor);
+	}
+
+	@Override
+	public void flatMap(Tuple2<Integer, List<Event>> value, Collector<String> out) throws Exception {
+		List<Tuple2<Event, Integer>> previousWindowValues = Optional.ofNullable(eventsSeenSoFar.value()).orElseGet(
+			Collections::emptyList);
+
+		List<Event> newValues = value.f1;
+		Optional<Event> lastEventInWindow = verifyWindowContiguity(newValues, out);
+
+		Long lastSequenceNumberSeenSoFar = lastSequenceNumber.value();
+		List<Tuple2<Event, Integer>> newWindows =
+			verifyPreviousOccurences(previousWindowValues, newValues, out, lastSequenceNumberSeenSoFar);
+
+		if (lastEventInWindow.isPresent()) {
+			updateLastSeenSequenceNumber(lastEventInWindow.get(), lastSequenceNumberSeenSoFar);
+		}
+
+		eventsSeenSoFar.update(newWindows);
+	}
+
+	private void updateLastSeenSequenceNumber(
+			Event lastEventInWindow,
+			Long lastSequenceNumberSeenSoFar) throws IOException {
+		long lastSequenceNumberInWindow = lastEventInWindow.getSequenceNumber();
+		if (lastSequenceNumberSeenSoFar == null || lastSequenceNumberInWindow > lastSequenceNumberSeenSoFar) {
+			lastSequenceNumber.update(lastSequenceNumberInWindow);
+		}
+	}
+
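The mapper's core invariant (its verification methods are truncated in this digest) is that in a sliding window with slide factor N, every event must appear in exactly N consecutive fired windows. A standalone sketch of that counting check, assuming events are identified by sequence number (illustrative only, not the actual test-job code):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SlideFactorChecker {
    private final int slideFactor;

    /** sequenceNumber -> number of windows the event has been seen in so far. */
    private final Map<Long, Integer> seenCounts = new HashMap<>();

    public SlideFactorChecker(int slideFactor) {
        this.slideFactor = slideFactor;
    }

    /**
     * Feeds one fired window's events; returns the sequence numbers of events
     * that have now appeared in all slideFactor expected windows.
     */
    public List<Long> onWindow(List<Long> eventsInWindow) {
        List<Long> completed = new ArrayList<>();
        for (long seq : eventsInWindow) {
            int count = seenCounts.merge(seq, 1, Integer::sum);
            if (count == slideFactor) {
                // Event appeared in exactly the expected number of windows.
                seenCounts.remove(seq);
                completed.add(seq);
            }
        }
        return completed;
    }

    /** Events still waiting to complete their window count (should drain to empty). */
    public int pending() {
        return seenCounts.size();
    }
}
```

A real validator would additionally flag events that linger in `pending()` after their windows should have fired, which is the kind of failure the mapper reports via its `Collector<String>` output.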
[GitHub] kl0u commented on a change in pull request #7039: [FLINK-8997] Added sliding window aggregation to datastream test job
kl0u commented on a change in pull request #7039: [FLINK-8997] Added sliding window aggregation to datastream test job
URL: https://github.com/apache/flink/pull/7039#discussion_r232569338

## File path: flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SlidingWindowCheckMapper.java

@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */

[SlidingWindowCheckMapper.java diff repeated verbatim; omitted up to the method under review:]

+	/**
+	 * Verifies if all values from previous windows appear in the new one. Returns a union of all events seen so far
+	 * that were not yet seen slideFactor number of times.
+	 */
+	private List<Tuple2<Event, Integer>>
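The body of this method is cut off in the quoted diff. Based only on its javadoc, a plain-Java sketch of what such a verification could look like follows; everything here is a hypothetical reconstruction (the class name `PreviousOccurrences`, the `failures` list, and `Map.Entry` standing in for Flink's `Tuple2` are all assumptions, not the actual implementation).

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the (truncated) verifyPreviousOccurences logic:
// every event carried over from previous windows must reappear in the new
// window; events now seen slideFactor times are fully validated and dropped,
// the rest (including newly arrived events) are returned with updated counts.
public class PreviousOccurrences {

	public static List<Map.Entry<String, Integer>> verify(
			List<Map.Entry<String, Integer>> previousWindow, // (event, timesSeenSoFar)
			List<String> newWindow,
			int slideFactor,
			List<String> failures) {
		List<Map.Entry<String, Integer>> carryOver = new ArrayList<>();
		for (Map.Entry<String, Integer> seen : previousWindow) {
			if (!newWindow.contains(seen.getKey())) {
				failures.add("Event " + seen.getKey() + " did not reappear in the next window");
			} else if (seen.getValue() + 1 < slideFactor) {
				carryOver.add(new SimpleEntry<>(seen.getKey(), seen.getValue() + 1));
			}
			// else: seen slideFactor times, fully validated, drop it
		}
		for (String event : newWindow) {
			boolean previouslySeen = previousWindow.stream().anyMatch(e -> e.getKey().equals(event));
			if (!previouslySeen && slideFactor > 1) {
				carryOver.add(new SimpleEntry<>(event, 1)); // first sighting, expect more
			}
		}
		return carryOver;
	}
}
```

With `slideFactor = 2`, carrying `(a, 1)` into a new window `[a, b]` drops `a` (now seen twice) and returns `[(b, 1)]` with no failures.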
[jira] [Commented] (FLINK-8997) Add sliding window aggregation to the job
[ https://issues.apache.org/jira/browse/FLINK-8997?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16683418#comment-16683418 ]

ASF GitHub Bot commented on FLINK-8997:
---
kl0u commented on a change in pull request #7039: [FLINK-8997] Added sliding window aggregation to datastream test job
URL: https://github.com/apache/flink/pull/7039#discussion_r232569563

## File path: flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SlidingWindowCheckMapper.java

[License header and SlidingWindowCheckMapper.java diff repeated verbatim; omitted.]
[jira] [Commented] (FLINK-8997) Add sliding window aggregation to the job
[ https://issues.apache.org/jira/browse/FLINK-8997?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16683419#comment-16683419 ]

ASF GitHub Bot commented on FLINK-8997:
---
kl0u commented on a change in pull request #7039: [FLINK-8997] Added sliding window aggregation to datastream test job
URL: https://github.com/apache/flink/pull/7039#discussion_r232570916

## File path: flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SlidingWindowCheckMapper.java

[License header and SlidingWindowCheckMapper.java diff repeated verbatim; omitted up to the line under review:]

+		if (lastSequenceNumberSeenSoFar == null || lastSequenceNumberInWindow > lastSequenceNumberSeenSoFar) {

Review comment: if `(lastSequenceNumberInWindow < lastSequenceNumberSeenSoFar)` then we should fail the
[GitHub] kl0u commented on a change in pull request #7039: [FLINK-8997] Added sliding window aggregation to datastream test job
kl0u commented on a change in pull request #7039: [FLINK-8997] Added sliding window aggregation to datastream test job
URL: https://github.com/apache/flink/pull/7039#discussion_r232570104

## File path: flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SlidingWindowCheckMapper.java

[License header and SlidingWindowCheckMapper.java diff repeated verbatim; omitted.]
[jira] [Commented] (FLINK-8997) Add sliding window aggregation to the job
[ https://issues.apache.org/jira/browse/FLINK-8997?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16683416#comment-16683416 ]
ASF GitHub Bot commented on FLINK-8997: kl0u commented on a change in pull request #7039 (URL: https://github.com/apache/flink/pull/7039#discussion_r232570104). [Duplicate of the comment above; body omitted.]
[jira] [Commented] (FLINK-8997) Add sliding window aggregation to the job
[ https://issues.apache.org/jira/browse/FLINK-8997?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16683417#comment-16683417 ]

ASF GitHub Bot commented on FLINK-8997:
---
kl0u commented on a change in pull request #7039: [FLINK-8997] Added sliding window aggregation to datastream test job
URL: https://github.com/apache/flink/pull/7039#discussion_r232570322

## File path: flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SlidingWindowCheckMapper.java

[License header and SlidingWindowCheckMapper.java diff repeated verbatim; omitted.]
[GitHub] kl0u commented on a change in pull request #7039: [FLINK-8997] Added sliding window aggregation to datastream test job
kl0u commented on a change in pull request #7039: [FLINK-8997] Added sliding window aggregation to datastream test job URL: https://github.com/apache/flink/pull/7039#discussion_r232570726 ## File path: flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SlidingWindowCheckMapper.java ## @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.tests; + +import org.apache.flink.api.common.functions.RichFlatMapFunction; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.ListTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.Collector; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Optional; + +/** + * This mapper validates sliding event time window. It checks each event belongs to appropriate number of consecutive + * windows. + */ +public class SlidingWindowCheckMapper extends RichFlatMapFunction<Tuple2<Integer, List<Event>>, String> { + + private static final long serialVersionUID = -744070793650644485L; + + /** This value state tracks previously seen events with the number of windows they appeared in. */ + private transient ValueState<List<Tuple2<Event, Integer>>> eventsSeenSoFar; + + private transient ValueState<Long> lastSequenceNumber; + + private final int slideFactor; + + SlidingWindowCheckMapper(int slideFactor) { + this.slideFactor = slideFactor; + } + + @Override + public void open(Configuration parameters) { + ValueStateDescriptor<List<Tuple2<Event, Integer>>> previousWindowDescriptor = + new ValueStateDescriptor<>("eventsSeenSoFar", + new ListTypeInfo<>(new TupleTypeInfo<>(TypeInformation.of(Event.class), BasicTypeInfo.INT_TYPE_INFO))); + + eventsSeenSoFar = getRuntimeContext().getState(previousWindowDescriptor); + + ValueStateDescriptor<Long> lastSequenceNumberDescriptor = + new ValueStateDescriptor<>("lastSequenceNumber", BasicTypeInfo.LONG_TYPE_INFO); + + lastSequenceNumber = getRuntimeContext().getState(lastSequenceNumberDescriptor); + } + + @Override + public void flatMap(Tuple2<Integer, List<Event>> value, Collector<String> out) throws Exception { + List<Tuple2<Event, Integer>> previousWindowValues = Optional.ofNullable(eventsSeenSoFar.value()).orElseGet( + Collections::emptyList); + + List<Event> newValues = value.f1; + Optional<Event> lastEventInWindow = verifyWindowContiguity(newValues, out); + + Long lastSequenceNumberSeenSoFar = lastSequenceNumber.value(); + List<Tuple2<Event, Integer>> newWindows = + verifyPreviousOccurences(previousWindowValues, newValues, out, lastSequenceNumberSeenSoFar); + + if (lastEventInWindow.isPresent()) { + updateLastSeenSequenceNumber(lastEventInWindow.get(), lastSequenceNumberSeenSoFar); + } + + eventsSeenSoFar.update(newWindows); + } + + private void updateLastSeenSequenceNumber( + Event lastEventInWindow, + Long lastSequenceNumberSeenSoFar) throws IOException { + long lastSequenceNumberInWindow = lastEventInWindow.getSequenceNumber(); + if (lastSequenceNumberSeenSoFar == null || lastSequenceNumberInWindow > lastSequenceNumberSeenSoFar) { + lastSequenceNumber.update(lastSequenceNumberInWindow); + } + } + + /** + * Verifies if all values from previous windows appear in the new one. Returns union of all events seen so far that + * were not seen slideFactor number of times yet. + */ + private List<Tuple2<Event, Integer>>
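The javadoc above describes the bookkeeping that `verifyPreviousOccurences` performs: every event carried over from earlier windows must appear again in the new window, and an event is tracked until it has been seen `slideFactor` times. A hedged, self-contained sketch of that logic follows; the class name `WindowTracker`, the method names, and the use of `String` keys in place of the job's `Event` type are illustrative assumptions, not the real test code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the verification described in the javadoc above (assumed names).
public class WindowTracker {

    /** Maps event -> number of consecutive windows it has appeared in so far. */
    private final Map<String, Integer> seenSoFar = new HashMap<>();
    private final int slideFactor;

    public WindowTracker(int slideFactor) {
        this.slideFactor = slideFactor;
    }

    /** Feeds one new window's contents; returns validation errors for this step. */
    public List<String> advance(List<String> newWindow) {
        List<String> errors = new ArrayList<>();
        Map<String, Integer> next = new HashMap<>();
        for (Map.Entry<String, Integer> e : seenSoFar.entrySet()) {
            if (!newWindow.contains(e.getKey())) {
                // a tracked event vanished before appearing slideFactor times
                errors.add("missing event: " + e.getKey());
            } else if (e.getValue() + 1 < slideFactor) {
                next.put(e.getKey(), e.getValue() + 1);
            }
            // once an event has been seen slideFactor times it is dropped
        }
        for (String event : newWindow) {
            if (!seenSoFar.containsKey(event)) {
                next.put(event, 1); // first window this event appears in
            }
        }
        seenSoFar.clear();
        seenSoFar.putAll(next);
        return errors;
    }

    public int trackedCount() {
        return seenSoFar.size();
    }
}
```

With `slideFactor = 3`, an event that shows up in three consecutive windows is silently retired from tracking, while one that disappears early produces an error, which mirrors the intent of the check (the real mapper also reports the failure through the `Collector` rather than a returned list).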
[GitHub] kl0u commented on a change in pull request #7039: [FLINK-8997] Added sliding window aggregation to datastream test job
kl0u commented on a change in pull request #7039: [FLINK-8997] Added sliding window aggregation to datastream test job URL: https://github.com/apache/flink/pull/7039#discussion_r232570322 ## File path: flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SlidingWindowCheckMapper.java ##
[GitHub] kl0u commented on a change in pull request #7039: [FLINK-8997] Added sliding window aggregation to datastream test job
kl0u commented on a change in pull request #7039: [FLINK-8997] Added sliding window aggregation to datastream test job URL: https://github.com/apache/flink/pull/7039#discussion_r232570916 ## File path: flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SlidingWindowCheckMapper.java ## + private void updateLastSeenSequenceNumber( + Event lastEventInWindow, + Long lastSequenceNumberSeenSoFar) throws IOException { + long lastSequenceNumberInWindow = lastEventInWindow.getSequenceNumber(); + if (lastSequenceNumberSeenSoFar == null || lastSequenceNumberInWindow > lastSequenceNumberSeenSoFar) { + lastSequenceNumber.update(lastSequenceNumberInWindow); + } + } Review comment: if `(lastSequenceNumberInWindow < lastSequenceNumberSeenSoFar)` then we should fail the test, right? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
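The review comment above suggests that a sequence number moving backwards between windows should fail the test rather than be silently skipped. A minimal sketch of that stricter check follows; the return convention (empty means "progress, caller may update state") and the failure message format are assumptions, not the job's actual code.

```java
import java.util.Optional;

// Sketch of the stricter check proposed in the review (assumed names/format).
public class SequenceCheck {

    public static Optional<String> check(Long lastSeenSoFar, long lastInWindow) {
        if (lastSeenSoFar == null || lastInWindow > lastSeenSoFar) {
            return Optional.empty(); // progress: safe to update the state
        }
        if (lastInWindow < lastSeenSoFar) {
            return Optional.of("FAILURE: sequence number moved backwards: "
                + lastInWindow + " < " + lastSeenSoFar);
        }
        return Optional.empty(); // equal: no progress, but not a regression
    }
}
```

The original `if` only handles the "greater than" branch, so a regression falls through without any signal; separating the three cases makes the failure observable to a validating sink.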
[jira] [Commented] (FLINK-8997) Add sliding window aggregation to the job
[ https://issues.apache.org/jira/browse/FLINK-8997?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16683415#comment-16683415 ] ASF GitHub Bot commented on FLINK-8997: --- kl0u commented on a change in pull request #7039: [FLINK-8997] Added sliding window aggregation to datastream test job URL: https://github.com/apache/flink/pull/7039#discussion_r232570726 ## File path: flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SlidingWindowCheckMapper.java ##
[jira] [Commented] (FLINK-10826) Heavy deployment end-to-end test produces no output on Travis
[ https://issues.apache.org/jira/browse/FLINK-10826?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16683431#comment-16683431 ] ASF GitHub Bot commented on FLINK-10826: twalthr commented on issue #7066: [FLINK-10826][test] Decrease deployment size of heavy deplyment e2e t… URL: https://github.com/apache/flink/pull/7066#issuecomment-437803788 Thanks @StefanRRichter I will address @zentol comments and merge this to unblock #7045. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Heavy deployment end-to-end test produces no output on Travis > - > > Key: FLINK-10826 > URL: https://issues.apache.org/jira/browse/FLINK-10826 > Project: Flink > Issue Type: Bug > Components: E2E Tests >Affects Versions: 1.7.0 >Reporter: Timo Walther >Assignee: Stefan Richter >Priority: Critical > Labels: pull-request-available > Attachments: heavy_deployement_log.txt > > > The Heavy deployment end-to-end test produces no output on Travis such that > it is killed after 10 minutes. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10736) Shaded Hadoop S3A end-to-end test failed on Travis
[ https://issues.apache.org/jira/browse/FLINK-10736?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16683457#comment-16683457 ] Andrey Zagrebin commented on FLINK-10736: - I uploaded the words file as public s3 object in Flink testing: [https://s3.eu-central-1.amazonaws.com/flink-e2e-tests/words] or s3://flink-e2e-tests/words > Shaded Hadoop S3A end-to-end test failed on Travis > -- > > Key: FLINK-10736 > URL: https://issues.apache.org/jira/browse/FLINK-10736 > Project: Flink > Issue Type: Bug > Components: E2E Tests >Affects Versions: 1.7.0 >Reporter: Till Rohrmann >Assignee: Andrey Zagrebin >Priority: Critical > Labels: pull-request-available, test-stability > Fix For: 1.7.0 > > > The {{Shaded Hadoop S3A end-to-end test}} failed on Travis because it could > not find a file stored on S3: > {code} > org.apache.flink.client.program.ProgramInvocationException: Job failed. > (JobID: f28270bedd943ed6b41548b60f5cea73) > at > org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:487) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:475) > at > org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62) > at > org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:85) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529) > at > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427) > at > 
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813) > at > org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287) > at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213) > at > org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050) > at > org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836) > at > org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126) > Caused by: org.apache.flink.runtime.client.JobExecutionException: Job > execution failed. > at > org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146) > at > org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265) > ... 
21 more > Caused by: java.io.IOException: Error opening the Input Split > s3://[secure]/flink-end-to-end-test-shaded-s3a [0,44]: No such file or > directory: s3://[secure]/flink-end-to-end-test-shaded-s3a > at > org.apache.flink.api.common.io.FileInputFormat.open(FileInputFormat.java:824) > at > org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:470) > at > org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:47) > at > org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:170) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.io.FileNotFoundException: No such file or directory: > s3://[secure]/flink-end-to-end-test-shaded-s3a > at > org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2255) > at > org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2149) > at > org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2088) > at > org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.open(S3AFileSystem.java:699) > at > org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.open(FileSystem.java:950) > at >
[jira] [Commented] (FLINK-10809) Using DataStreamUtils.reinterpretAsKeyedStream produces corrupted keyed state after restore
[ https://issues.apache.org/jira/browse/FLINK-10809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16683474#comment-16683474 ] ASF GitHub Bot commented on FLINK-10809: StefanRRichter commented on a change in pull request #7048: [FLINK-10809][state] Include keyed state that is not from head operat… URL: https://github.com/apache/flink/pull/7048#discussion_r232590240 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/ReinterpretDataStreamAsKeyedStreamITCase.java ## @@ -227,5 +293,22 @@ public void close() throws Exception { Assert.assertEquals(expectedSum, runningSum); super.close(); } + + @Override + public void snapshotState(FunctionSnapshotContext context) throws Exception { + sumState.add(runningSum); + } + + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { + sumState = context.getOperatorStateStore().getListState( + new ListStateDescriptor<>("sumState", Integer.class)); + + if (context.isRestored()) { Review comment: Through through the restore flag and a validating sink? If state was dropped, the validation would go wrong. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Using DataStreamUtils.reinterpretAsKeyedStream produces corrupted keyed state > after restore > --- > > Key: FLINK-10809 > URL: https://issues.apache.org/jira/browse/FLINK-10809 > Project: Flink > Issue Type: Bug > Components: DataStream API, State Backends, Checkpointing >Affects Versions: 1.7.0 >Reporter: Dawid Wysakowicz >Assignee: Stefan Richter >Priority: Major > Labels: pull-request-available > > I've tried using {{DataStreamUtils.reinterpretAsKeyedStream}} for results of > windowed aggregation: > {code} > DataStream<Tuple2<Integer, List<Event>>> eventStream4 = > eventStream2.keyBy(Event::getKey) > > .window(SlidingEventTimeWindows.of(Time.milliseconds(150 * 3), > Time.milliseconds(150))) > .apply(new WindowFunction<Event, Tuple2<Integer, List<Event>>, Integer, TimeWindow>() { > private static final long serialVersionUID = > 3166250579972849440L; > @Override > public void apply( > Integer key, TimeWindow window, > Iterable<Event> input, > Collector<Tuple2<Integer, List<Event>>> > out) throws Exception { > out.collect(Tuple2.of(key, > StreamSupport.stream(input.spliterator(), > false).collect(Collectors.toList()))); > } > }); > DataStreamUtils.reinterpretAsKeyedStream(eventStream4, events -> > events.f0) > .flatMap(createSlidingWindowCheckMapper(pt)) > .addSink(new PrintSinkFunction<>()); > {code} > and then in the createSlidingWindowCheckMapper I verify that each event > belongs to 3 consecutive windows, for which I keep contents of last window in > ValueState. In a non-failure setup this check runs fine, but it misses few > windows after restore at the beginning. > {code} > public class SlidingWindowCheckMapper extends > RichFlatMapFunction<Tuple2<Integer, List<Event>>, String> { > private static final long serialVersionUID = -744070793650644485L; > /** This value state tracks previously seen events with the number of > windows they appeared in. */ > private transient ValueState<List<Tuple2<Event, Integer>>> > previousWindow; > private final int slideFactor; > SlidingWindowCheckMapper(int slideFactor) { > this.slideFactor = slideFactor; > } > @Override > public void open(Configuration parameters) throws Exception { > ValueStateDescriptor<List<Tuple2<Event, Integer>>> > previousWindowDescriptor = > new ValueStateDescriptor<>("previousWindow", > new ListTypeInfo<>(new > TupleTypeInfo<>(TypeInformation.of(Event.class), > BasicTypeInfo.INT_TYPE_INFO))); > previousWindow = > getRuntimeContext().getState(previousWindowDescriptor); > } > @Override > public void flatMap(Tuple2<Integer, List<Event>> value, > Collector<String> out) throws Exception { > List<Tuple2<Event, Integer>> previousWindowValues = >
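The job in the description uses `SlidingEventTimeWindows.of(Time.milliseconds(150 * 3), Time.milliseconds(150))`, so each event should land in 450 / 150 = 3 consecutive windows; that ratio is the `slideFactor` the check mapper is built with. A small sketch of the window-assignment arithmetic, assuming non-negative timestamps and a window size that is an exact multiple of the slide (window starts may still be negative near time zero, as with epoch-aligned assigners):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: which sliding windows (identified by their start timestamp) contain
// a given event timestamp. Window [start, start + size) contains the event.
public class SlidingWindows {

    public static List<Long> windowStartsFor(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestamp - (timestamp % slide); // latest containing window
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }
}
```

For timestamp 400 with size 450 and slide 150, the containing windows start at 300, 150, and 0, i.e. exactly three windows, which is what the FLINK-8997 mapper asserts for every event.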
[GitHub] yiduwangkai opened a new pull request #7080: flink sqlUpdate support complex insert
yiduwangkai opened a new pull request #7080: flink sqlUpdate support complex insert URL: https://github.com/apache/flink/pull/7080 My code is `tableEnv.sqlUpdate("insert into kafka.sdkafka.product_4 select filedName1, filedName2 from kafka.sdkafka.order_4");` but Flink gives me an error saying "No table was registered under the name kafka". I modified the code and it works now. TableEnvironment.scala ` def sqlUpdate(stmt: String, config: QueryConfig): Unit = { val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory) // parse the sql query val parsed = planner.parse(stmt) parsed match { case insert: SqlInsert => // validate the SQL query val query = insert.getSource val validatedQuery = planner.validate(query) // get query result as Table val queryResult = new Table(this, LogicalRelNode(planner.rel(validatedQuery).rel)) // get name of sink table val targetTableName = insert.getTargetTable.asInstanceOf[SqlIdentifier].names.get(0) // insert query result into sink table insertInto(queryResult, targetTableName, config) case _ => throw new TableException( "Unsupported SQL query! sqlUpdate() only accepts SQL statements of type INSERT.") } }` should be modified to this `def sqlUpdate(stmt: String, config: QueryConfig): Unit = { val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory) // parse the sql query val parsed = planner.parse(stmt) parsed match { case insert: SqlInsert => // validate the SQL query val query = insert.getSource val validatedQuery = planner.validate(query) // get query result as Table val queryResult = new Table(this, LogicalRelNode(planner.rel(validatedQuery).rel)) // get name of sink table //val targetTableName = insert.getTargetTable.asInstanceOf[SqlIdentifier].names.get(0) val targetTableName = insert.getTargetTable.toString // insert query result into sink table insertInto(queryResult, targetTableName, config) case _ => throw new TableException( "Unsupported SQL query! sqlUpdate() only accepts SQL statements of type INSERT.") } }` I hope this can be accepted, thanks. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
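The root cause reported above is that only the first component of the qualified sink name is used: for `kafka.sdkafka.product_4`, `names.get(0)` yields `"kafka"`, which is not a registered table. A tiny sketch of the difference, using plain string handling as a stand-in for Calcite's `SqlIdentifier` (the class and method names here are illustrative):

```java
import java.util.Arrays;
import java.util.List;

// Sketch of the bug described above: dropping all but the first component of
// a qualified table name loses the table path.
public class TargetTableName {

    /** Mirrors taking names.get(0) from the parsed identifier: first component only. */
    public static String firstComponentOnly(String qualified) {
        List<String> names = Arrays.asList(qualified.split("\\."));
        return names.get(0);
    }

    /** Mirrors insert.getTargetTable.toString: the full dotted name survives. */
    public static String fullName(String qualified) {
        return qualified;
    }
}
```

The proposed patch simply switches the lookup key from the first component to the full dotted name, so a catalog-qualified sink can be resolved.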
[GitHub] StefanRRichter commented on a change in pull request #7048: [FLINK-10809][state] Include keyed state that is not from head operat…
StefanRRichter commented on a change in pull request #7048: [FLINK-10809][state] Include keyed state that is not from head operat… URL: https://github.com/apache/flink/pull/7048#discussion_r232590240 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/ReinterpretDataStreamAsKeyedStreamITCase.java ## Review comment: Through through the restore flag and a validating sink? If state was dropped, the validation would go wrong. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Created] (FLINK-10851) sqlUpdate support complex insert grammar
frank wang created FLINK-10851: -- Summary: sqlUpdate support complex insert grammar Key: FLINK-10851 URL: https://issues.apache.org/jira/browse/FLINK-10851 Project: Flink Issue Type: Bug Reporter: frank wang My code is {{tableEnv.sqlUpdate("insert into kafka.sdkafka.product_4 select filedName1, filedName2 from kafka.sdkafka.order_4");}} but Flink gives me an error saying "No table was registered under the name kafka". I modified the code and it works now. TableEnvironment.scala {code:java} def sqlUpdate(stmt: String, config: QueryConfig): Unit = { val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory) // parse the sql query val parsed = planner.parse(stmt) parsed match { case insert: SqlInsert => // validate the SQL query val query = insert.getSource val validatedQuery = planner.validate(query) // get query result as Table val queryResult = new Table(this, LogicalRelNode(planner.rel(validatedQuery).rel)) // get name of sink table val targetTableName = insert.getTargetTable.asInstanceOf[SqlIdentifier].names.get(0) // insert query result into sink table insertInto(queryResult, targetTableName, config) case _ => throw new TableException( "Unsupported SQL query! sqlUpdate() only accepts SQL statements of type INSERT.") } } {code} should be modified to this {code:java} def sqlUpdate(stmt: String, config: QueryConfig): Unit = { val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory) // parse the sql query val parsed = planner.parse(stmt) parsed match { case insert: SqlInsert => // validate the SQL query val query = insert.getSource val validatedQuery = planner.validate(query) // get query result as Table val queryResult = new Table(this, LogicalRelNode(planner.rel(validatedQuery).rel)) // get name of sink table //val targetTableName = insert.getTargetTable.asInstanceOf[SqlIdentifier].names.get(0) val targetTableName = insert.getTargetTable.toString // insert query result into sink table insertInto(queryResult, targetTableName, config) case _ => throw new TableException( "Unsupported SQL query! sqlUpdate() only accepts SQL statements of type INSERT.") } } {code} I hope this can be accepted, thanks. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] StefanRRichter commented on a change in pull request #7061: [FLINK-10827][tests] Add test for duplicate() to SerializerTestBase
StefanRRichter commented on a change in pull request #7061: [FLINK-10827][tests] Add test for duplicate() to SerializerTestBase URL: https://github.com/apache/flink/pull/7061#discussion_r232572989

## File path: flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
##

@@ -436,6 +441,36 @@ public void testNullability() {
 	}
 }

+	@Test
+	public void testDuplicate() throws Exception {
+		final int numThreads = 10;
+		final TypeSerializer serializer = getSerializer();
+		final OneShotLatch startLatch = new OneShotLatch();
+		final List> concurrentRunners = new ArrayList<>(numThreads);
+		Assert.assertEquals(serializer, serializer.duplicate());
+
+		T[] testData = getData();
+		final int testDataIterations = Math.max(1, 250 / testData.length);
+
+		for (int i = 0; i < numThreads; ++i) {
+			SerializerRunner runner = new SerializerRunner<>(
+				startLatch,

Review comment: What would be the benefit? This feels like it does exactly what we want.

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
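The test under review exercises `TypeSerializer#duplicate()` from several threads, because stateful serializers are not thread-safe and must be duplicated before concurrent use. A minimal sketch of that pattern, with a hypothetical `Codec` interface standing in for Flink's serializer (names and behavior are illustrative only, not Flink's API):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class DuplicatePerThreadDemo {
    // Hypothetical stand-in for a stateful serializer: it carries mutable
    // per-instance state, so each thread must work on its own duplicate().
    interface Codec {
        String encode(String value);
        Codec duplicate();
    }

    static Codec statefulCodec() {
        return new Codec() {
            private final StringBuilder buffer = new StringBuilder(); // per-instance state

            public String encode(String value) {
                buffer.setLength(0);
                buffer.append(value).append('!');
                return buffer.toString();
            }

            public Codec duplicate() {
                return statefulCodec(); // fresh state per duplicate
            }
        };
    }

    public static void main(String[] args) throws Exception {
        final int numThreads = 10;
        final Codec shared = statefulCodec();
        ExecutorService pool = Executors.newFixedThreadPool(numThreads);
        List<Future<String>> results = new ArrayList<>();

        for (int i = 0; i < numThreads; i++) {
            final String input = "v" + i;
            // Each task duplicates the shared codec instead of using it directly,
            // mirroring what SerializerTestBase#testDuplicate verifies.
            results.add(pool.submit(() -> shared.duplicate().encode(input)));
        }
        for (Future<String> result : results) {
            System.out.println(result.get()); // "v0!" .. "v9!", in submission order
        }
        pool.shutdown();
    }
}
```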
[GitHub] asfgit closed pull request #7066: [FLINK-10826][test] Decrease deployment size of heavy deplyment e2e t…
asfgit closed pull request #7066: [FLINK-10826][test] Decrease deployment size of heavy deplyment e2e t… URL: https://github.com/apache/flink/pull/7066 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-end-to-end-tests/test-scripts/test_heavy_deployment.sh b/flink-end-to-end-tests/test-scripts/test_heavy_deployment.sh
index 895e4a7f302..b4646fc3260 100755
--- a/flink-end-to-end-tests/test-scripts/test_heavy_deployment.sh
+++ b/flink-end-to-end-tests/test-scripts/test_heavy_deployment.sh
@@ -25,7 +25,7 @@ TEST=flink-heavy-deployment-stress-test
 TEST_PROGRAM_NAME=HeavyDeploymentStressTestProgram
 TEST_PROGRAM_JAR=${END_TO_END_DIR}/$TEST/target/$TEST_PROGRAM_NAME.jar

-set_conf "taskmanager.heap.mb" "256" # 256Mb x 20TMs = 5Gb total heap
+set_conf "taskmanager.heap.mb" "512" # 512Mb x 10TMs = 5Gb total heap

 set_conf "taskmanager.memory.size" "8" # 8Mb
 set_conf "taskmanager.network.memory.min" "8mb"
@@ -35,12 +35,12 @@ set_conf "taskmanager.memory.segment-size" "8kb"
 set_conf "taskmanager.numberOfTaskSlots" "10" # 10 slots per TM

 start_cluster # this also starts 1TM
-start_taskmanagers 19 # 1TM + 19TM = 20TM a 10 slots = 200 slots
+start_taskmanagers 9 # 1TM + 9TM = 10TM a 10 slots = 100 slots

-# This call will result in a deployment with state meta data of 200 x 200 x 50 union states x each 75 entries.
+# This call will result in a deployment with state meta data of 100 x 100 x 50 union states x each 100 entries.
 # We can scale up the numbers to make the test even heavier.
 $FLINK_DIR/bin/flink run ${TEST_PROGRAM_JAR} \
---environment.max_parallelism 1024 --environment.parallelism 200 \
+--environment.max_parallelism 1024 --environment.parallelism 100 \
 --environment.restart_strategy fixed_delay --environment.restart_strategy.fixed_delay.attempts 3 \
 --state_backend.checkpoint_directory ${CHECKPOINT_DIR} \
---heavy_deployment_test.num_list_states_per_op 50 --heavy_deployment_test.num_partitions_per_list_state 75
+--heavy_deployment_test.num_list_states_per_op 50 --heavy_deployment_test.num_partitions_per_list_state 100

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
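The resized test keeps the 5 GB total heap and the 10 slots per TaskManager constant while halving the TaskManager count. The arithmetic behind the diff's comments can be checked directly (a throwaway sketch; the numbers are copied from the script above):

```java
public class HeavyDeploymentMath {
    public static void main(String[] args) {
        // Before: 20 TaskManagers x 256 MB heap; after: 10 TMs x 512 MB heap.
        int heapBeforeMb = 20 * 256;  // 5120 MB = 5 GB total heap
        int heapAfterMb  = 10 * 512;  // 5120 MB = 5 GB total heap

        // Slots drop from 200 to 100 at 10 slots per TM, matching the
        // parallelism change from 200 to 100 in the flink run invocation.
        int slotsBefore = 20 * 10;    // 200
        int slotsAfter  = 10 * 10;    // 100

        System.out.println("total heap unchanged: " + (heapBeforeMb == heapAfterMb));
        System.out.println("slots: " + slotsBefore + " -> " + slotsAfter);
    }
}
```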
[jira] [Commented] (FLINK-10634) End-to-end test: Metrics accessible via REST API
[ https://issues.apache.org/jira/browse/FLINK-10634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16683485#comment-16683485 ]

ASF GitHub Bot commented on FLINK-10634:

kl0u commented on a change in pull request #7056: [FLINK-10634][metrics][rest] Add metrics availability e2e test URL: https://github.com/apache/flink/pull/7056#discussion_r232591615

## File path: flink-end-to-end-tests/flink-metrics-availability-test/src/test/java/org/pache/flink/metrics/tests/MetricsAvailabilityITCase.java
##

@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.pache.flink.metrics.tests;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
+import org.apache.flink.runtime.rest.RestClient;
+import org.apache.flink.runtime.rest.RestClientConfiguration;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.job.metrics.JobManagerMetricsHeaders;
+import org.apache.flink.runtime.rest.messages.job.metrics.JobManagerMetricsMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.metrics.MetricCollectionResponseBody;
+import org.apache.flink.runtime.rest.messages.job.metrics.TaskManagerMetricsHeaders;
+import org.apache.flink.runtime.rest.messages.job.metrics.TaskManagerMetricsMessageParameters;
+import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo;
+import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersHeaders;
+import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersInfo;
+import org.apache.flink.tests.util.FlinkDistribution;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.SupplierWithException;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+/**
+ * End-to-end test for the availability of metrics.
+ */
+public class MetricsAvailabilityITCase extends TestLogger {
+
+	private static final String HOST = "localhost";
+	private static final int PORT = 8081;
+
+	@Rule
+	public final FlinkDistribution dist = new FlinkDistribution();
+
+	@Nullable
+	private static ScheduledExecutorService scheduledExecutorService = null;
+
+	@BeforeClass
+	public static void startExecutor() {
+		scheduledExecutorService = Executors.newScheduledThreadPool(4);
+	}
+
+	@AfterClass
+	public static void shutdownExecutor() {
+		if (scheduledExecutorService != null) {
+			scheduledExecutorService.shutdown();
+		}
+	}
+
+	@Test
+	public void testReporter() throws Exception {
+		dist.startFlinkCluster();
+
+		final RestClient restClient = new RestClient(RestClientConfiguration.fromConfiguration(new Configuration()), scheduledExecutorService);
+

Review comment: Do we need the `checkJobManagerMetricAvailability` given that later we also ask the JM for the `TaskManagerIds`? If not, then we can remove that method and rename the `getTaskManagerIds` to sth like `verifyJMMetricsAndFetch...`.

This is an automated message from the Apache Git Service. To
[jira] [Commented] (FLINK-10634) End-to-end test: Metrics accessible via REST API
[ https://issues.apache.org/jira/browse/FLINK-10634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16683484#comment-16683484 ] ASF GitHub Bot commented on FLINK-10634: kl0u commented on a change in pull request #7056: [FLINK-10634][metrics][rest] Add metrics availability e2e test URL: https://github.com/apache/flink/pull/7056#discussion_r232591854 ## File path: flink-end-to-end-tests/flink-metrics-availability-test/src/test/java/org/pache/flink/metrics/tests/MetricsAvailabilityITCase.java ## @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.pache.flink.metrics.tests; + +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter; +import org.apache.flink.runtime.rest.RestClient; +import org.apache.flink.runtime.rest.RestClientConfiguration; +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.job.metrics.JobManagerMetricsHeaders; +import org.apache.flink.runtime.rest.messages.job.metrics.JobManagerMetricsMessageParameters; +import org.apache.flink.runtime.rest.messages.job.metrics.MetricCollectionResponseBody; +import org.apache.flink.runtime.rest.messages.job.metrics.TaskManagerMetricsHeaders; +import org.apache.flink.runtime.rest.messages.job.metrics.TaskManagerMetricsMessageParameters; +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo; +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersHeaders; +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersInfo; +import org.apache.flink.tests.util.FlinkDistribution; +import org.apache.flink.util.TestLogger; +import org.apache.flink.util.function.SupplierWithException; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.time.Duration; +import java.util.Collection; +import java.util.Collections; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; 
+import java.util.concurrent.TimeoutException; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +/** + * End-to-end test for the availability of metrics. + */ +public class MetricsAvailabilityITCase extends TestLogger { + + private static final String HOST = "localhost"; + private static final int PORT = 8081; + + @Rule + public final FlinkDistribution dist = new FlinkDistribution(); + + @Nullable + private static ScheduledExecutorService scheduledExecutorService = null; + + @BeforeClass + public static void startExecutor() { + scheduledExecutorService = Executors.newScheduledThreadPool(4); + } + + @AfterClass + public static void shutdownExecutor() { + if (scheduledExecutorService != null) { + scheduledExecutorService.shutdown(); + } + } + + @Test + public void testReporter() throws Exception { + dist.startFlinkCluster(); + + final RestClient restClient = new RestClient(RestClientConfiguration.fromConfiguration(new Configuration()), scheduledExecutorService); + + checkJobManagerMetricAvailability(restClient); + + final Collection taskManagerIds = getTaskManagerIds(restClient); + + for (final ResourceID taskManagerId : taskManagerIds) { + checkTaskManagerMetricAvailability(restClient, taskManagerId); + } + } + + private static void
[jira] [Commented] (FLINK-10634) End-to-end test: Metrics accessible via REST API
[ https://issues.apache.org/jira/browse/FLINK-10634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16683483#comment-16683483 ] ASF GitHub Bot commented on FLINK-10634: kl0u commented on a change in pull request #7056: [FLINK-10634][metrics][rest] Add metrics availability e2e test URL: https://github.com/apache/flink/pull/7056#discussion_r232591919 ## File path: flink-end-to-end-tests/flink-metrics-availability-test/src/test/java/org/pache/flink/metrics/tests/MetricsAvailabilityITCase.java ## @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.pache.flink.metrics.tests; + +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter; +import org.apache.flink.runtime.rest.RestClient; +import org.apache.flink.runtime.rest.RestClientConfiguration; +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.job.metrics.JobManagerMetricsHeaders; +import org.apache.flink.runtime.rest.messages.job.metrics.JobManagerMetricsMessageParameters; +import org.apache.flink.runtime.rest.messages.job.metrics.MetricCollectionResponseBody; +import org.apache.flink.runtime.rest.messages.job.metrics.TaskManagerMetricsHeaders; +import org.apache.flink.runtime.rest.messages.job.metrics.TaskManagerMetricsMessageParameters; +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo; +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersHeaders; +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersInfo; +import org.apache.flink.tests.util.FlinkDistribution; +import org.apache.flink.util.TestLogger; +import org.apache.flink.util.function.SupplierWithException; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.time.Duration; +import java.util.Collection; +import java.util.Collections; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; 
+import java.util.concurrent.TimeoutException; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +/** + * End-to-end test for the availability of metrics. + */ +public class MetricsAvailabilityITCase extends TestLogger { + + private static final String HOST = "localhost"; + private static final int PORT = 8081; + + @Rule + public final FlinkDistribution dist = new FlinkDistribution(); + + @Nullable + private static ScheduledExecutorService scheduledExecutorService = null; + + @BeforeClass + public static void startExecutor() { + scheduledExecutorService = Executors.newScheduledThreadPool(4); + } + + @AfterClass + public static void shutdownExecutor() { + if (scheduledExecutorService != null) { + scheduledExecutorService.shutdown(); + } + } + + @Test + public void testReporter() throws Exception { + dist.startFlinkCluster(); + + final RestClient restClient = new RestClient(RestClientConfiguration.fromConfiguration(new Configuration()), scheduledExecutorService); + + checkJobManagerMetricAvailability(restClient); + + final Collection taskManagerIds = getTaskManagerIds(restClient); + + for (final ResourceID taskManagerId : taskManagerIds) { + checkTaskManagerMetricAvailability(restClient, taskManagerId); + } + } + + private static void
[GitHub] aljoscha commented on issue #7058: [FLINK-10817] Upgrade presto dependency to support path-style access
aljoscha commented on issue #7058: [FLINK-10817] Upgrade presto dependency to support path-style access URL: https://github.com/apache/flink/pull/7058#issuecomment-437829882 I bumped the version in all places and also added an exclude for the new `io.airlift.log-manager` dependency. PTAL. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10457) Support SequenceFile for StreamingFileSink
[ https://issues.apache.org/jira/browse/FLINK-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16683393#comment-16683393 ] ASF GitHub Bot commented on FLINK-10457: kl0u commented on issue #6774: [FLINK-10457] Support SequenceFile for StreamingFileSink URL: https://github.com/apache/flink/pull/6774#issuecomment-437795189 Hi @jihyun ! Currently we are busy testing the release candidate for Flink 1.7. Unfortunately, this PR will have to wait until the release happens. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Support SequenceFile for StreamingFileSink > -- > > Key: FLINK-10457 > URL: https://issues.apache.org/jira/browse/FLINK-10457 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors > Reporter: Jihyun Cho > Priority: Major > Labels: pull-request-available > > SequenceFile is a major file format in the Hadoop ecosystem. > It is simple to manage files and easy to combine with other tools. > So the SequenceFile format is still needed, even though StreamingFileSink supports > Parquet and ORC. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] twalthr commented on issue #7065: [FLINK-10626] [docs] [table] Add documentation for event-time temporal table joins
twalthr commented on issue #7065: [FLINK-10626] [docs] [table] Add documentation for event-time temporal table joins URL: https://github.com/apache/flink/pull/7065#issuecomment-437795049 Thanks @pnowojski. I will address your comments and merge this. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] kl0u commented on issue #6774: [FLINK-10457] Support SequenceFile for StreamingFileSink
kl0u commented on issue #6774: [FLINK-10457] Support SequenceFile for StreamingFileSink URL: https://github.com/apache/flink/pull/6774#issuecomment-437795189 Hi @jihyun ! Currently we are busy testing the release candidate for Flink 1.7. Unfortunately, this PR will have to wait until the release happens. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] igalshilman commented on a change in pull request #7072: [FLINK-10821] Fix resume from externalized checkpoints E2E Test
igalshilman commented on a change in pull request #7072: [FLINK-10821] Fix resume from externalized checkpoints E2E Test URL: https://github.com/apache/flink/pull/7072#discussion_r232572095

## File path: flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
##

@@ -125,9 +126,20 @@ fi

 echo "Restoring job with externalized checkpoint at $CHECKPOINT_PATH ..."

-BASE_JOB_CMD=`buildBaseJobCmd $NEW_DOP`
+BASE_JOB_CMD=`buildBaseJobCmd $NEW_DOP "-s file://${CHECKPOINT_PATH}"`
+JOB_CMD=""
+if [[ $SIMULATE_FAILURE == "true" ]]; then

Review comment: Thanks for looking into it @tillrohrmann! We construct a failing job (that doesn't actually fail, since all the parameters are 0) so that the recovered job and the original job have exactly the same operators (omitting `--test.simulate_failure true` omits the `FailureMapper` from the recovered job). Having the jobs differ doesn't allow recovering from the externalized checkpoint without supplying `--allowNonRestoredState` (unless there is a different way that I'm not aware of). The problems with passing this argument are:
1. I think it makes the test more fragile, as the recovered job could accidentally start with fresh state.
2. The `SemanticsCheckMapper` reports a violation in the following case:
```
run_test "Running HA per-job cluster (file, sync) end-to-end test" "$END_TO_END_DIR/test-scripts/test_ha_per_job_cluster_datastream.sh file false false" "skip_check_exceptions"
```
which looks like an artifact of how a job is constructed from its arguments via `DataStreamAllroundTestJobFactory` and the missing `.uid()`, but we can look into that in a followup ticket.

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] kl0u commented on a change in pull request #7056: [FLINK-10634][metrics][rest] Add metrics availability e2e test
kl0u commented on a change in pull request #7056: [FLINK-10634][metrics][rest] Add metrics availability e2e test URL: https://github.com/apache/flink/pull/7056#discussion_r232591854 ## File path: flink-end-to-end-tests/flink-metrics-availability-test/src/test/java/org/pache/flink/metrics/tests/MetricsAvailabilityITCase.java ## @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.pache.flink.metrics.tests; + +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter; +import org.apache.flink.runtime.rest.RestClient; +import org.apache.flink.runtime.rest.RestClientConfiguration; +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.job.metrics.JobManagerMetricsHeaders; +import org.apache.flink.runtime.rest.messages.job.metrics.JobManagerMetricsMessageParameters; +import org.apache.flink.runtime.rest.messages.job.metrics.MetricCollectionResponseBody; +import org.apache.flink.runtime.rest.messages.job.metrics.TaskManagerMetricsHeaders; +import org.apache.flink.runtime.rest.messages.job.metrics.TaskManagerMetricsMessageParameters; +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo; +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersHeaders; +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersInfo; +import org.apache.flink.tests.util.FlinkDistribution; +import org.apache.flink.util.TestLogger; +import org.apache.flink.util.function.SupplierWithException; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.time.Duration; +import java.util.Collection; +import java.util.Collections; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; 
+import java.util.concurrent.TimeoutException; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +/** + * End-to-end test for the availability of metrics. + */ +public class MetricsAvailabilityITCase extends TestLogger { + + private static final String HOST = "localhost"; + private static final int PORT = 8081; + + @Rule + public final FlinkDistribution dist = new FlinkDistribution(); + + @Nullable + private static ScheduledExecutorService scheduledExecutorService = null; + + @BeforeClass + public static void startExecutor() { + scheduledExecutorService = Executors.newScheduledThreadPool(4); + } + + @AfterClass + public static void shutdownExecutor() { + if (scheduledExecutorService != null) { + scheduledExecutorService.shutdown(); + } + } + + @Test + public void testReporter() throws Exception { + dist.startFlinkCluster(); + + final RestClient restClient = new RestClient(RestClientConfiguration.fromConfiguration(new Configuration()), scheduledExecutorService); + + checkJobManagerMetricAvailability(restClient); + + final Collection taskManagerIds = getTaskManagerIds(restClient); + + for (final ResourceID taskManagerId : taskManagerIds) { + checkTaskManagerMetricAvailability(restClient, taskManagerId); + } + } + + private static void checkJobManagerMetricAvailability(final RestClient restClient) throws Exception { + final JobManagerMetricsHeaders headers = JobManagerMetricsHeaders.getInstance(); + final JobManagerMetricsMessageParameters parameters =
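The ITCase repeatedly queries the REST API until the expected metrics appear, since metrics are registered asynchronously after the cluster starts. The underlying poll-until-deadline pattern, independent of Flink's `RestClient`, can be sketched as follows (the `pollUntil` helper and the toy supplier are illustrative only, not part of the PR):

```java
import java.time.Duration;
import java.time.Instant;
import java.util.function.Predicate;
import java.util.function.Supplier;

public class PollUntil {
    // Polls the supplier until the predicate accepts a result or the deadline
    // passes; this mirrors the retry loop such a test needs because the REST
    // endpoints only expose metrics once they have been registered.
    static <T> T pollUntil(Supplier<T> supplier,
                           Predicate<T> accept,
                           Duration timeout,
                           Duration interval) throws InterruptedException {
        Instant deadline = Instant.now().plus(timeout);
        while (Instant.now().isBefore(deadline)) {
            T result = supplier.get();
            if (accept.test(result)) {
                return result;
            }
            Thread.sleep(interval.toMillis());
        }
        throw new IllegalStateException("condition not met before deadline");
    }

    public static void main(String[] args) throws InterruptedException {
        // Toy stand-in for a REST call: counts invocations until "metrics appear".
        int[] calls = {0};
        Integer value = pollUntil(
            () -> ++calls[0],
            v -> v >= 3,
            Duration.ofSeconds(5),
            Duration.ofMillis(10));
        System.out.println(value); // 3
    }
}
```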
[GitHub] kl0u commented on a change in pull request #7056: [FLINK-10634][metrics][rest] Add metrics availability e2e test
kl0u commented on a change in pull request #7056: [FLINK-10634][metrics][rest] Add metrics availability e2e test URL: https://github.com/apache/flink/pull/7056#discussion_r232591615 ## File path: flink-end-to-end-tests/flink-metrics-availability-test/src/test/java/org/apache/flink/metrics/tests/MetricsAvailabilityITCase.java ## @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.metrics.tests; + +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter; +import org.apache.flink.runtime.rest.RestClient; +import org.apache.flink.runtime.rest.RestClientConfiguration; +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.job.metrics.JobManagerMetricsHeaders; +import org.apache.flink.runtime.rest.messages.job.metrics.JobManagerMetricsMessageParameters; +import org.apache.flink.runtime.rest.messages.job.metrics.MetricCollectionResponseBody; +import org.apache.flink.runtime.rest.messages.job.metrics.TaskManagerMetricsHeaders; +import org.apache.flink.runtime.rest.messages.job.metrics.TaskManagerMetricsMessageParameters; +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo; +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersHeaders; +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersInfo; +import org.apache.flink.tests.util.FlinkDistribution; +import org.apache.flink.util.TestLogger; +import org.apache.flink.util.function.SupplierWithException; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.time.Duration; +import java.util.Collection; +import java.util.Collections; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +/** + * End-to-end test for the availability of metrics. + */ +public class MetricsAvailabilityITCase extends TestLogger { + + private static final String HOST = "localhost"; + private static final int PORT = 8081; + + @Rule + public final FlinkDistribution dist = new FlinkDistribution(); + + @Nullable + private static ScheduledExecutorService scheduledExecutorService = null; + + @BeforeClass + public static void startExecutor() { + scheduledExecutorService = Executors.newScheduledThreadPool(4); + } + + @AfterClass + public static void shutdownExecutor() { + if (scheduledExecutorService != null) { + scheduledExecutorService.shutdown(); + } + } + + @Test + public void testReporter() throws Exception { + dist.startFlinkCluster(); + + final RestClient restClient = new RestClient(RestClientConfiguration.fromConfiguration(new Configuration()), scheduledExecutorService); + Review comment: Do we need the `checkJobManagerMetricAvailability` given that later we also ask the JM for the `TaskManagerIds`? If not, then we can remove that method and rename `getTaskManagerIds` to something like `verifyJMMetricsAndFetch...`. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
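The quoted diff cuts off right after the RestClient is created, but the imports (Deadline, FutureUtils, ScheduledExecutorServiceAdapter) suggest the test polls each metrics endpoint until it responds or a deadline expires. A minimal, self-contained sketch of that retry-until-success pattern, using only JDK types — the fetch callable and all names below are hypothetical stand-ins for the RestClient call, not the PR's actual helpers:

```java
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.Callable;
import java.util.function.Predicate;

public class RetryUntil {

    // Polls 'call' until 'accept' passes or the deadline expires.
    // Transient exceptions from 'call' are swallowed and retried.
    static <T> T retryUntil(Callable<T> call, Predicate<T> accept, Duration timeout, Duration pause)
            throws Exception {
        Instant deadline = Instant.now().plus(timeout);
        while (true) {
            try {
                T result = call.call();
                if (accept.test(result)) {
                    return result;
                }
            } catch (Exception e) {
                // transient failure (e.g. REST endpoint not up yet): fall through and retry
            }
            if (Instant.now().isAfter(deadline)) {
                throw new IllegalStateException("Deadline exceeded while waiting for metrics");
            }
            Thread.sleep(pause.toMillis());
        }
    }

    public static void main(String[] args) throws Exception {
        // Simulated "metrics endpoint" that only returns data on the third attempt.
        int[] attempts = {0};
        String metrics = retryUntil(
                () -> ++attempts[0] < 3 ? "" : "numRegisteredTaskManagers=1",
                s -> !s.isEmpty(),
                Duration.ofSeconds(5),
                Duration.ofMillis(10));
        System.out.println(metrics); // prints "numRegisteredTaskManagers=1" after the third attempt
    }
}
```

The real test presumably builds the equivalent on top of CompletableFuture via a retry helper in FutureUtils; the synchronous loop above just captures the same idea.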
[jira] [Commented] (FLINK-10634) End-to-end test: Metrics accessible via REST API
[ https://issues.apache.org/jira/browse/FLINK-10634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16683486#comment-16683486 ] ASF GitHub Bot commented on FLINK-10634: kl0u commented on a change in pull request #7056: [FLINK-10634][metrics][rest] Add metrics availability e2e test URL: https://github.com/apache/flink/pull/7056#discussion_r232592152 ## File path: flink-end-to-end-tests/flink-metrics-availability-test/src/test/java/org/apache/flink/metrics/tests/MetricsAvailabilityITCase.java ## @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.metrics.tests; + +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter; +import org.apache.flink.runtime.rest.RestClient; +import org.apache.flink.runtime.rest.RestClientConfiguration; +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.job.metrics.JobManagerMetricsHeaders; +import org.apache.flink.runtime.rest.messages.job.metrics.JobManagerMetricsMessageParameters; +import org.apache.flink.runtime.rest.messages.job.metrics.MetricCollectionResponseBody; +import org.apache.flink.runtime.rest.messages.job.metrics.TaskManagerMetricsHeaders; +import org.apache.flink.runtime.rest.messages.job.metrics.TaskManagerMetricsMessageParameters; +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo; +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersHeaders; +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersInfo; +import org.apache.flink.tests.util.FlinkDistribution; +import org.apache.flink.util.TestLogger; +import org.apache.flink.util.function.SupplierWithException; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.time.Duration; +import java.util.Collection; +import java.util.Collections; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +/** + * End-to-end test for the availability of metrics. + */ +public class MetricsAvailabilityITCase extends TestLogger { + + private static final String HOST = "localhost"; + private static final int PORT = 8081; + + @Rule + public final FlinkDistribution dist = new FlinkDistribution(); + + @Nullable + private static ScheduledExecutorService scheduledExecutorService = null; + + @BeforeClass + public static void startExecutor() { + scheduledExecutorService = Executors.newScheduledThreadPool(4); + } + + @AfterClass + public static void shutdownExecutor() { + if (scheduledExecutorService != null) { + scheduledExecutorService.shutdown(); + } + } + + @Test + public void testReporter() throws Exception { + dist.startFlinkCluster(); + + final RestClient restClient = new RestClient(RestClientConfiguration.fromConfiguration(new Configuration()), scheduledExecutorService); + + checkJobManagerMetricAvailability(restClient); + + final Collection<ResourceID> taskManagerIds = getTaskManagerIds(restClient); + + for (final ResourceID taskManagerId : taskManagerIds) { + checkTaskManagerMetricAvailability(restClient, taskManagerId); + } + } + + private static void
[GitHub] kl0u commented on a change in pull request #7056: [FLINK-10634][metrics][rest] Add metrics availability e2e test
kl0u commented on a change in pull request #7056: [FLINK-10634][metrics][rest] Add metrics availability e2e test URL: https://github.com/apache/flink/pull/7056#discussion_r232591919 ## File path: flink-end-to-end-tests/flink-metrics-availability-test/src/test/java/org/apache/flink/metrics/tests/MetricsAvailabilityITCase.java ## @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.metrics.tests; + +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter; +import org.apache.flink.runtime.rest.RestClient; +import org.apache.flink.runtime.rest.RestClientConfiguration; +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.job.metrics.JobManagerMetricsHeaders; +import org.apache.flink.runtime.rest.messages.job.metrics.JobManagerMetricsMessageParameters; +import org.apache.flink.runtime.rest.messages.job.metrics.MetricCollectionResponseBody; +import org.apache.flink.runtime.rest.messages.job.metrics.TaskManagerMetricsHeaders; +import org.apache.flink.runtime.rest.messages.job.metrics.TaskManagerMetricsMessageParameters; +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo; +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersHeaders; +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersInfo; +import org.apache.flink.tests.util.FlinkDistribution; +import org.apache.flink.util.TestLogger; +import org.apache.flink.util.function.SupplierWithException; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.time.Duration; +import java.util.Collection; +import java.util.Collections; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +/** + * End-to-end test for the availability of metrics. + */ +public class MetricsAvailabilityITCase extends TestLogger { + + private static final String HOST = "localhost"; + private static final int PORT = 8081; + + @Rule + public final FlinkDistribution dist = new FlinkDistribution(); + + @Nullable + private static ScheduledExecutorService scheduledExecutorService = null; + + @BeforeClass + public static void startExecutor() { + scheduledExecutorService = Executors.newScheduledThreadPool(4); + } + + @AfterClass + public static void shutdownExecutor() { + if (scheduledExecutorService != null) { + scheduledExecutorService.shutdown(); + } + } + + @Test + public void testReporter() throws Exception { + dist.startFlinkCluster(); + + final RestClient restClient = new RestClient(RestClientConfiguration.fromConfiguration(new Configuration()), scheduledExecutorService); + + checkJobManagerMetricAvailability(restClient); + + final Collection<ResourceID> taskManagerIds = getTaskManagerIds(restClient); + + for (final ResourceID taskManagerId : taskManagerIds) { + checkTaskManagerMetricAvailability(restClient, taskManagerId); + } + } + + private static void checkJobManagerMetricAvailability(final RestClient restClient) throws Exception { + final JobManagerMetricsHeaders headers = JobManagerMetricsHeaders.getInstance(); + final JobManagerMetricsMessageParameters parameters =
[GitHub] art4ul commented on issue #6608: [FLINK-10203]Support truncate method for old Hadoop versions in HadoopRecoverableFsDataOutputStream
art4ul commented on issue #6608: [FLINK-10203]Support truncate method for old Hadoop versions in HadoopRecoverableFsDataOutputStream URL: https://github.com/apache/flink/pull/6608#issuecomment-437836407 Hi @kl0u, could you maybe share the ticket regarding the implementation of the logic to support the supportsResume() method? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Resolved] (FLINK-10626) Add documentation for event-time temporal table joins
[ https://issues.apache.org/jira/browse/FLINK-10626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther resolved FLINK-10626. -- Resolution: Fixed Fixed in master: 17a6e7a2be756883f9109ba04febe1f6743944a3 Fixed in 1.7: 096918c6577b06bb9ac6250063c6c99a04907f77 > Add documentation for event-time temporal table joins > - > > Key: FLINK-10626 > URL: https://issues.apache.org/jira/browse/FLINK-10626 > Project: Flink > Issue Type: Sub-task > Components: Documentation, Table API & SQL >Affects Versions: 1.7.0 >Reporter: Till Rohrmann >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > The Flink documentation should be updated to cover the newly added > functionality. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] twalthr commented on a change in pull request #7045: [hotfix] Update nightly master cron jobs
twalthr commented on a change in pull request #7045: [hotfix] Update nightly master cron jobs URL: https://github.com/apache/flink/pull/7045#discussion_r232570011 ## File path: splits/split_misc.sh ## @@ -43,12 +43,16 @@ echo "Flink distribution directory: $FLINK_DIR" # run_test "" "$END_TO_END_DIR/test-scripts/" +run_test "Flink CLI end-to-end test" "$END_TO_END_DIR/test-scripts/test_cli.sh" Review comment: I will copy some tests but I think we need to further split `split_misc_hadoopfree` because we run out of time otherwise. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10826) Heavy deployment end-to-end test produces no output on Travis
[ https://issues.apache.org/jira/browse/FLINK-10826?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16683460#comment-16683460 ] ASF GitHub Bot commented on FLINK-10826: asfgit closed pull request #7066: [FLINK-10826][test] Decrease deployment size of heavy deplyment e2e t… URL: https://github.com/apache/flink/pull/7066 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-end-to-end-tests/test-scripts/test_heavy_deployment.sh b/flink-end-to-end-tests/test-scripts/test_heavy_deployment.sh index 895e4a7f302..b4646fc3260 100755 --- a/flink-end-to-end-tests/test-scripts/test_heavy_deployment.sh +++ b/flink-end-to-end-tests/test-scripts/test_heavy_deployment.sh @@ -25,7 +25,7 @@ TEST=flink-heavy-deployment-stress-test TEST_PROGRAM_NAME=HeavyDeploymentStressTestProgram TEST_PROGRAM_JAR=${END_TO_END_DIR}/$TEST/target/$TEST_PROGRAM_NAME.jar -set_conf "taskmanager.heap.mb" "256" # 256Mb x 20TMs = 5Gb total heap +set_conf "taskmanager.heap.mb" "512" # 512Mb x 10TMs = 5Gb total heap set_conf "taskmanager.memory.size" "8" # 8Mb set_conf "taskmanager.network.memory.min" "8mb" @@ -35,12 +35,12 @@ set_conf "taskmanager.memory.segment-size" "8kb" set_conf "taskmanager.numberOfTaskSlots" "10" # 10 slots per TM start_cluster # this also starts 1TM -start_taskmanagers 19 # 1TM + 19TM = 20TM a 10 slots = 200 slots +start_taskmanagers 9 # 1TM + 9TM = 10TM a 10 slots = 100 slots -# This call will result in a deployment with state meta data of 200 x 200 x 50 union states x each 75 entries. +# This call will result in a deployment with state meta data of 100 x 100 x 50 union states x each 100 entries. # We can scale up the numbers to make the test even heavier. 
$FLINK_DIR/bin/flink run ${TEST_PROGRAM_JAR} \ ---environment.max_parallelism 1024 --environment.parallelism 200 \ +--environment.max_parallelism 1024 --environment.parallelism 100 \ --environment.restart_strategy fixed_delay --environment.restart_strategy.fixed_delay.attempts 3 \ --state_backend.checkpoint_directory ${CHECKPOINT_DIR} \ ---heavy_deployment_test.num_list_states_per_op 50 --heavy_deployment_test.num_partitions_per_list_state 75 +--heavy_deployment_test.num_list_states_per_op 50 --heavy_deployment_test.num_partitions_per_list_state 100 This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Heavy deployment end-to-end test produces no output on Travis > - > > Key: FLINK-10826 > URL: https://issues.apache.org/jira/browse/FLINK-10826 > Project: Flink > Issue Type: Bug > Components: E2E Tests >Affects Versions: 1.7.0 >Reporter: Timo Walther >Assignee: Stefan Richter >Priority: Critical > Labels: pull-request-available > Attachments: heavy_deployement_log.txt > > > The Heavy deployment end-to-end test produces no output on Travis such that > it is killed after 10 minutes. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
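The arithmetic behind the patch above is easy to verify: total heap stays constant at roughly 5 GB while the TaskManager count, slot count, and parallelism are all halved. A quick sanity check of the numbers taken from the diff:

```java
public class HeavyDeploymentMath {
    public static void main(String[] args) {
        int heapBefore = 256 * 20;  // old config: 256 MB heap x 20 TaskManagers
        int heapAfter  = 512 * 10;  // new config: 512 MB heap x 10 TaskManagers
        int slotsBefore = 20 * 10;  // 20 TMs x 10 slots each = 200 slots (parallelism 200)
        int slotsAfter  = 10 * 10;  // 10 TMs x 10 slots each = 100 slots (parallelism 100)
        System.out.println("heap:  " + heapBefore + " MB -> " + heapAfter + " MB");
        System.out.println("slots: " + slotsBefore + " -> " + slotsAfter);
        // prints: heap:  5120 MB -> 5120 MB
        //         slots: 200 -> 100
    }
}
```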
[jira] [Commented] (FLINK-10817) Upgrade presto dependency to support path-style access
[ https://issues.apache.org/jira/browse/FLINK-10817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16683525#comment-16683525 ] ASF GitHub Bot commented on FLINK-10817: aljoscha commented on issue #7058: [FLINK-10817] Upgrade presto dependency to support path-style access URL: https://github.com/apache/flink/pull/7058#issuecomment-437829882 I bumped the version in all places and also added an exclude for the new `io.airlift.log-manager` dependency. PTAL. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Upgrade presto dependency to support path-style access > -- > > Key: FLINK-10817 > URL: https://issues.apache.org/jira/browse/FLINK-10817 > Project: Flink > Issue Type: Improvement >Reporter: Adam Lamar >Assignee: Aljoscha Krettek >Priority: Major > Labels: pull-request-available > > In order to use any given non-AWS s3 implementation backed by the presto s3 > filesystem, it is necessary to set at least one configuration parameter in > flink-conf.yaml: > * presto.s3.endpoint: https://example.com > This appears to work as expected for hosted s3 alternatives. > In order to use a bring-your-own, self-hosted s3 alternative like > [minio|https://www.minio.io/], at least two configuration parameters are > required: > * presto.s3.endpoint: https://example.com > * presto.s3.path-style-access: true > However, the second path-style-access parameter doesn't work because the > 0.185 version of presto doesn't support passing through that configuration > option to the hive s3 client. > To work around the issue, path-style-access can be forced on the s3 client by > using an IP address for the endpoint (instead of a hostname).
Without this > workaround, Flink attempts to use the virtual-host style at > bucketname.example.com, which fails unless the expected DNS records exist. > To solve this problem and enable non-IP endpoints, upgrade the > [pom|https://github.com/apache/flink/blob/master/flink-filesystems/flink-s3-fs-presto/pom.xml#L36] > to at least 0.186, which includes [this > commit|https://github.com/prestodb/presto/commit/0707f2f21a96d2fd30953fb3fa9a9a03f03d88bd] > Note that 0.213 is the latest presto release. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
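For reference, a minimal flink-conf.yaml sketch combining the two settings discussed in the issue (the endpoint hostname is a placeholder, not a real deployment):

```yaml
# Point the presto-backed s3 filesystem at a self-hosted endpoint (e.g. minio).
presto.s3.endpoint: https://s3.example.com
# Force path-style access (https://s3.example.com/bucketname/key) instead of
# virtual-host style (https://bucketname.s3.example.com/key).
# Only honored once the presto dependency is >= 0.186.
presto.s3.path-style-access: true
```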
[jira] [Commented] (FLINK-10203) Support truncate method for old Hadoop versions in HadoopRecoverableFsDataOutputStream
[ https://issues.apache.org/jira/browse/FLINK-10203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16683556#comment-16683556 ] ASF GitHub Bot commented on FLINK-10203: art4ul commented on issue #6608: [FLINK-10203]Support truncate method for old Hadoop versions in HadoopRecoverableFsDataOutputStream URL: https://github.com/apache/flink/pull/6608#issuecomment-437836407 Hi @kl0u , could you maybe share your ticket regarding the implementation of the logic to support the supportsResume() method? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Support truncate method for old Hadoop versions in > HadoopRecoverableFsDataOutputStream > -- > > Key: FLINK-10203 > URL: https://issues.apache.org/jira/browse/FLINK-10203 > Project: Flink > Issue Type: Bug > Components: DataStream API, filesystem-connector >Affects Versions: 1.6.0, 1.6.1, 1.7.0 >Reporter: Artsem Semianenka >Assignee: Artsem Semianenka >Priority: Major > Labels: pull-request-available > Attachments: legacy truncate logic.pdf > > > The new StreamingFileSink (introduced in Flink 1.6) uses the > HadoopRecoverableFsDataOutputStream wrapper to write data to HDFS. > HadoopRecoverableFsDataOutputStream is a wrapper for FSDataOutputStream that > makes it possible to restore from a certain point of a file after a failure and > continue writing data. To achieve this recovery functionality, > HadoopRecoverableFsDataOutputStream uses the "truncate" method, which was > introduced only in Hadoop 2.7. > Unfortunately, there are a few official Hadoop distributions whose latest > versions still use Hadoop 2.6 (among them: Cloudera, Pivotal HD). As > a result, Flink's Hadoop connector can't work with these distributions. 
> Flink declares that it supports Hadoop from version 2.4.0 upwards > ([https://ci.apache.org/projects/flink/flink-docs-release-1.6/start/building.html#hadoop-versions]) > I guess we should emulate the functionality of the "truncate" method for older > Hadoop versions. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
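The emulation the ticket proposes can be sketched in plain Java: keep the valid prefix of the file and rename it over the original. This is an illustrative sketch against java.nio.file with a made-up helper name (truncateByCopy), not the actual Hadoop FileSystem-based implementation from the attached design document:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Arrays;

/**
 * Illustrative emulation of truncate() for file systems that lack it
 * (Hadoop added FileSystem#truncate only in 2.7): rewrite the valid
 * prefix into a side file, then rename it over the original.
 */
public final class TruncateEmulation {

    public static void truncateByCopy(Path file, long length) throws IOException {
        // Reading the whole file is fine for a sketch, not for large files.
        byte[] contents = Files.readAllBytes(file);
        if (length > contents.length) {
            throw new IOException("Cannot truncate " + file + " beyond its current size");
        }
        Path side = file.resolveSibling(file.getFileName() + ".truncated");
        Files.write(side, Arrays.copyOf(contents, (int) length));
        // Rename over the original, mirroring rename-based recovery on HDFS.
        Files.move(side, file, StandardCopyOption.REPLACE_EXISTING);
    }
}
```

The real fix additionally has to work through Hadoop's FileSystem API and handle lease recovery, which this sketch omits.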
[jira] [Commented] (FLINK-10626) Add documentation for event-time temporal table joins
[ https://issues.apache.org/jira/browse/FLINK-10626?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16683392#comment-16683392 ] ASF GitHub Bot commented on FLINK-10626: twalthr commented on issue #7065: [FLINK-10626] [docs] [table] Add documentation for event-time temporal table joins URL: https://github.com/apache/flink/pull/7065#issuecomment-437795049 Thanks @pnowojski. I will address your comments and merge this. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add documentation for event-time temporal table joins > - > > Key: FLINK-10626 > URL: https://issues.apache.org/jira/browse/FLINK-10626 > Project: Flink > Issue Type: Sub-task > Components: Documentation, Table API SQL >Affects Versions: 1.7.0 >Reporter: Till Rohrmann >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > The Flink documentation should be updated to cover the newly added > functionality. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-10419) ClassNotFoundException while deserializing user exceptions from checkpointing
[ https://issues.apache.org/jira/browse/FLINK-10419?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dawid Wysakowicz reassigned FLINK-10419: Assignee: Dawid Wysakowicz > ClassNotFoundException while deserializing user exceptions from checkpointing > - > > Key: FLINK-10419 > URL: https://issues.apache.org/jira/browse/FLINK-10419 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination, State Backends, Checkpointing >Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.5.3, 1.5.4, 1.6.0, 1.6.1, 1.7.0 >Reporter: Nico Kruber >Assignee: Dawid Wysakowicz >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > It seems that somewhere in the operator's failure handling, we hand a > user-code exception to the checkpoint coordinator via Java serialization but > it will then fail during the de-serialization because the class is not > available. This will result in the following error shadowing the real one: > {code} > java.lang.ClassNotFoundException: > org.apache.flink.streaming.connectors.kafka.FlinkKafka011Exception > at java.net.URLClassLoader.findClass(URLClassLoader.java:381) > at java.lang.ClassLoader.loadClass(ClassLoader.java:424) > at sun.misc.Launcher.loadClass(Launcher.java:338) > at java.lang.ClassLoader.loadClass(ClassLoader.java:357) > at java.lang.Class.forName0(Native Method) > at java.lang.Class.forName(Class.java:348) > at > org.apache.flink.util.InstantiationUtil.resolveClass(InstantiationUtil.java:76) > at > java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1859) > at > java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1745) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2033) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567) > at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278) > at > java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:557) > at java.lang.Throwable.readObject(Throwable.java:914) 
> at sun.reflect.GeneratedMethodAccessor158.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1158) > at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567) > at java.io.ObjectInputStream.readObject(ObjectInputStream.java:427) > at > org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation.readObject(RemoteRpcInvocation.java:222) > at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1158) > at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567) > at java.io.ObjectInputStream.readObject(ObjectInputStream.java:427) > at > org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:502) > at > org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:489) > at > org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:477) > at > org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:58) > at > org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation.deserializeMethodInvocation(RemoteRpcInvocation.java:118) > at > org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation.getMethodName(RemoteRpcInvocation.java:59) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:214) > at > 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162) > at > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142) > at > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40) >
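The trace above shows the exception being deserialized with a class loader that does not know the user-code class. The standard pattern for this, and the direction `InstantiationUtil.resolveClass` takes, is to override `ObjectInputStream#resolveClass` with a caller-supplied class loader; a minimal self-contained sketch (the class name here is illustrative, not Flink's):

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectStreamClass;

/**
 * ObjectInputStream that resolves classes against a caller-supplied
 * ClassLoader, so user-code classes (e.g. exceptions from a connector jar)
 * can be deserialized even when they are not on the system class path.
 */
class ClassLoaderObjectInputStream extends ObjectInputStream {
    private final ClassLoader loader;

    ClassLoaderObjectInputStream(InputStream in, ClassLoader loader) throws IOException {
        super(in);
        this.loader = loader;
    }

    @Override
    protected Class<?> resolveClass(ObjectStreamClass desc)
            throws IOException, ClassNotFoundException {
        try {
            return Class.forName(desc.getName(), false, loader);
        } catch (ClassNotFoundException e) {
            // Fall back to default resolution (handles primitives, JDK classes).
            return super.resolveClass(desc);
        }
    }
}
```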
[GitHub] dawidwys commented on a change in pull request #7039: [FLINK-8997] Added sliding window aggregation to datastream test job
dawidwys commented on a change in pull request #7039: [FLINK-8997] Added sliding window aggregation to datastream test job URL: https://github.com/apache/flink/pull/7039#discussion_r232586645 ## File path: flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SlidingWindowCheckMapper.java ## @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.tests; + +import org.apache.flink.api.common.functions.RichFlatMapFunction; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.ListTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.Collector; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Optional; + +/** + * This mapper validates sliding event time window. It checks each event belongs to appropriate number of consecutive + * windows. + */ +public class SlidingWindowCheckMapper extends RichFlatMapFunction<Tuple2<Integer, List<Event>>, String> { + + private static final long serialVersionUID = -744070793650644485L; + + /** This value state tracks previously seen events with the number of windows they appeared in. 
*/ + private transient ValueState<List<Tuple2<Event, Integer>>> eventsSeenSoFar; + + private transient ValueState<Long> lastSequenceNumber; + + private final int slideFactor; + + SlidingWindowCheckMapper(int slideFactor) { + this.slideFactor = slideFactor; + } + + @Override + public void open(Configuration parameters) { + ValueStateDescriptor<List<Tuple2<Event, Integer>>> previousWindowDescriptor = + new ValueStateDescriptor<>("eventsSeenSoFar", + new ListTypeInfo<>(new TupleTypeInfo<>(TypeInformation.of(Event.class), BasicTypeInfo.INT_TYPE_INFO))); + + eventsSeenSoFar = getRuntimeContext().getState(previousWindowDescriptor); + + ValueStateDescriptor<Long> lastSequenceNumberDescriptor = + new ValueStateDescriptor<>("lastSequenceNumber", BasicTypeInfo.LONG_TYPE_INFO); + + lastSequenceNumber = getRuntimeContext().getState(lastSequenceNumberDescriptor); + } + + @Override + public void flatMap(Tuple2<Integer, List<Event>> value, Collector<String> out) throws Exception { + List<Tuple2<Event, Integer>> previousWindowValues = Optional.ofNullable(eventsSeenSoFar.value()).orElseGet( + Collections::emptyList); + + List<Event> newValues = value.f1; + Optional<Event> lastEventInWindow = verifyWindowContiguity(newValues, out); + + Long lastSequenceNumberSeenSoFar = lastSequenceNumber.value(); + List<Tuple2<Event, Integer>> newWindows = + verifyPreviousOccurences(previousWindowValues, newValues, out, lastSequenceNumberSeenSoFar); + + if (lastEventInWindow.isPresent()) { + updateLastSeenSequenceNumber(lastEventInWindow.get(), lastSequenceNumberSeenSoFar); + } + + eventsSeenSoFar.update(newWindows); + } + + private void updateLastSeenSequenceNumber( + Event lastEventInWindow, + Long lastSequenceNumberSeenSoFar) throws IOException { + long lastSequenceNumberInWindow = lastEventInWindow.getSequenceNumber(); + if (lastSequenceNumberSeenSoFar == null || lastSequenceNumberInWindow > lastSequenceNumberSeenSoFar) { + lastSequenceNumber.update(lastSequenceNumberInWindow); + } + } + + /** +* Verifies if all values from previous windows appear in the new one. 
Returns union of all events seen so far that +* were not seen slideFactor number of times yet. +*/ + private List<Tuple2<Event, Integer>>
[jira] [Commented] (FLINK-10634) End-to-end test: Metrics accessible via REST API
[ https://issues.apache.org/jira/browse/FLINK-10634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16683491#comment-16683491 ] ASF GitHub Bot commented on FLINK-10634: kl0u commented on issue #7056: [FLINK-10634][metrics][rest] Add metrics availability e2e test URL: https://github.com/apache/flink/pull/7056#issuecomment-437821227 BTW the Travis build you link to has failed. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > End-to-end test: Metrics accessible via REST API > > > Key: FLINK-10634 > URL: https://issues.apache.org/jira/browse/FLINK-10634 > Project: Flink > Issue Type: Sub-task > Components: E2E Tests, Metrics, REST >Affects Versions: 1.7.0 >Reporter: Till Rohrmann >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > > Verify that Flink's metrics can be accessed via the REST API. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-10736) Shaded Hadoop S3A end-to-end test failed on Travis
[ https://issues.apache.org/jira/browse/FLINK-10736?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dawid Wysakowicz reassigned FLINK-10736: Assignee: Andrey Zagrebin > Shaded Hadoop S3A end-to-end test failed on Travis > -- > > Key: FLINK-10736 > URL: https://issues.apache.org/jira/browse/FLINK-10736 > Project: Flink > Issue Type: Bug > Components: E2E Tests >Affects Versions: 1.7.0 >Reporter: Till Rohrmann >Assignee: Andrey Zagrebin >Priority: Critical > Labels: pull-request-available, test-stability > Fix For: 1.7.0 > > > The {{Shaded Hadoop S3A end-to-end test}} failed on Travis because it could > not find a file stored on S3: > {code} > org.apache.flink.client.program.ProgramInvocationException: Job failed. > (JobID: f28270bedd943ed6b41548b60f5cea73) > at > org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:487) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:475) > at > org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62) > at > org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:85) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529) > at > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427) > at > org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813) > at > org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287) > at 
org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213) > at > org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050) > at > org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836) > at > org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126) > Caused by: org.apache.flink.runtime.client.JobExecutionException: Job > execution failed. > at > org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146) > at > org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265) > ... 21 more > Caused by: java.io.IOException: Error opening the Input Split > s3://[secure]/flink-end-to-end-test-shaded-s3a [0,44]: No such file or > directory: s3://[secure]/flink-end-to-end-test-shaded-s3a > at > org.apache.flink.api.common.io.FileInputFormat.open(FileInputFormat.java:824) > at > org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:470) > at > org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:47) > at > org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:170) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.io.FileNotFoundException: No such file or directory: > s3://[secure]/flink-end-to-end-test-shaded-s3a > at > org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2255) > at > org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2149) > at > 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2088) > at > org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.open(S3AFileSystem.java:699) > at > org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.open(FileSystem.java:950) > at > org.apache.flink.fs.s3.common.hadoop.HadoopFileSystem.open(HadoopFileSystem.java:120) > at > org.apache.flink.fs.s3.common.hadoop.HadoopFileSystem.open(HadoopFileSystem.java:37) > at >
[jira] [Commented] (FLINK-10626) Add documentation for event-time temporal table joins
[ https://issues.apache.org/jira/browse/FLINK-10626?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16683400#comment-16683400 ] ASF GitHub Bot commented on FLINK-10626: asfgit closed pull request #7065: [FLINK-10626] [docs] [table] Add documentation for event-time temporal table joins URL: https://github.com/apache/flink/pull/7065 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/docs/dev/table/streaming/joins.md b/docs/dev/table/streaming/joins.md index 24c33b9be63..49abc022ee7 100644 --- a/docs/dev/table/streaming/joins.md +++ b/docs/dev/table/streaming/joins.md @@ -136,14 +136,14 @@ SELECT o.amount * r.rate AS amount FROM Orders AS o, - LATERAL TABLE (Rates(o.proctime)) AS r + LATERAL TABLE (Rates(o.rowtime)) AS r WHERE r.currency = o.currency {% endhighlight %} -Each record from the probe side will be joined with the version of the build side table at the time of the correlated time attribute of the probe side record. +Each record from the probe side will be joined with the version of the build side table at the time of the correlated time attribute of the probe side record. In order to support updates (overwrites) of previous values on the build side table, the table must define a primary key. -In our example, each record from `Orders` will be joined with the version of `Rates` at time `o.proctime`. Because the time attribute defines a processing-time notion, a newly appended order is always joined with the most recent version of `Rates` when executing the operation. The `currency` field has been defined as the primary key of `Rates` before and is used to connect both tables in our example. +In our example, each record from `Orders` will be joined with the version of `Rates` at time `o.rowtime`. 
The `currency` field has been defined as the primary key of `Rates` before and is used to connect both tables in our example. If the query were using a processing-time notion, a newly appended order would always be joined with the most recent version of `Rates` when executing the operation. In contrast to [regular joins](#regular-joins), this means that if there is a new record on the build side, it will not affect the previous results of the join. This again allows Flink to limit the number of elements that must be kept in the state. @@ -189,14 +189,26 @@ val result = orders +**Note**: State retention defined in a [query configuration](query_configuration.html) is not yet considered for temporal joins. This means that the required state to compute the query result might grow infinitely depending on the number of distinct primary keys for the history table. + ### Processing-time Temporal Joins With a processing-time time attribute, it is impossible to pass _past_ time attributes as an argument to the temporal table function. -By definition, it is always the current timestamp. Thus, processing-time temporal table function invocations will always return the latest known versions of the underlying table +By definition, it is always the current timestamp. Thus, invocations of a processing-time temporal table function will always return the latest known versions of the underlying table and any updates in the underlying history table will also immediately overwrite the current values. Only the latest versions (with respect to the defined primary key) of the build side records are kept in the state. New updates will have no effect on the previously results emitted/processed records from the probe side. -One can think about processing-time temporal join as a simple `HashMap` that stores all of the records from the build side. +One can think about a processing-time temporal join as a simple `HashMap` that stores all of the records from the build side. 
When a new record from the build side has the same key as some previous record, the old value is just simply overwritten. Every record from the probe side is always evaluated against the most recent/current state of the `HashMap`. + +### Event-time Temporal Joins + +With an event-time time attribute (i.e., a rowtime attribute), it is possible to pass _past_ time attributes to the temporal table function. This allows for joining the two tables at a common point in time. + +Compared to processing-time temporal joins, the temporal table does not only keep the latest version (with respect to the defined primary key) of the build side records in the state but stores multiple versions of them identified by time. + +For example, an incoming row with an event-time timestamp of `12:30:00` that is appended to the probe side table is joined with the version of the build side table at time
[jira] [Commented] (FLINK-10821) Resuming Externalized Checkpoint E2E test does not resume from Externalized Checkpoint
[ https://issues.apache.org/jira/browse/FLINK-10821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16683423#comment-16683423 ] ASF GitHub Bot commented on FLINK-10821: igalshilman commented on a change in pull request #7072: [FLINK-10821] Fix resume from externalized checkpoints E2E Test URL: https://github.com/apache/flink/pull/7072#discussion_r232572095 ## File path: flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh ## @@ -125,9 +126,20 @@ fi echo "Restoring job with externalized checkpoint at $CHECKPOINT_PATH ..." -BASE_JOB_CMD=`buildBaseJobCmd $NEW_DOP` +BASE_JOB_CMD=`buildBaseJobCmd $NEW_DOP "-s file://${CHECKPOINT_PATH}"` +JOB_CMD="" +if [[ $SIMULATE_FAILURE == "true" ]]; then Review comment: Thanks for looking into it @tillrohrmann! We construct a failing job (that doesn't actually fail since all the parameters are 0) so that the recovered job and the original job would have exactly the same operators (omitting the `--test.simulate_failure true` omits the `FailureMapper` from the recovered job.) Having the jobs differ doesn't allow recovering from the externalized checkpoint without supplying `--allowNonRestoredState` (unless there is a different way that I'm not aware of) The problem with passing this argument is: 1. I think It makes the test more fragile , if the recovered job would accidentally start with a fresh state. 2. The `SemanticsCheckMapper` reports violation in the following case: ```run_test "Running HA per-job cluster (file, sync) end-to-end test" "$END_TO_END_DIR/test-scripts/test_ha_per_job_cluster_datastream.sh file false false" "skip_check_exceptions" ``` which looks like an artifact of how a job is constructed from its arguments via `DataStreamAllroundTestJobFactory` and the missing `.uid()` but we can look into that in a followup ticket? This is an automated message from the Apache Git Service. 
To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Resuming Externalized Checkpoint E2E test does not resume from Externalized > Checkpoint > -- > > Key: FLINK-10821 > URL: https://issues.apache.org/jira/browse/FLINK-10821 > Project: Flink > Issue Type: Bug > Components: E2E Tests >Affects Versions: 1.7.0 >Reporter: Gary Yao >Assignee: Igal Shilman >Priority: Blocker > Labels: pull-request-available > Fix For: 1.7.0 > > > Path to externalized checkpoint is not passed as the {{-s}} argument: > https://github.com/apache/flink/blob/483507a65c7547347eaafb21a24967c470f94ed6/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh#L128 > That is, the test currently restarts the job without checkpoint. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
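The restore step under review can be sketched as self-contained bash: the externalized checkpoint path is threaded through the `-s` argument, and the failure-simulation flag is re-applied so the recovered job keeps the same operators as the original. The checkpoint path, jar name, and the inlined `buildBaseJobCmd` are illustrative stand-ins for the real test script's helpers:

```shell
#!/usr/bin/env bash
# Sketch of the corrected restore logic from the E2E test. All concrete
# values below (checkpoint path, parallelism, jar name) are placeholders.
CHECKPOINT_PATH="/tmp/externalized-checkpoints/chk-42"
NEW_DOP=2
SIMULATE_FAILURE="true"

# Stand-in for the helper the test script sources from common.sh.
buildBaseJobCmd() {
  local dop=$1 savepoint_args=$2
  echo "flink run -p ${dop} ${savepoint_args} DataStreamAllroundTestProgram.jar"
}

# The checkpoint path is always passed via -s.
BASE_JOB_CMD=$(buildBaseJobCmd "$NEW_DOP" "-s file://${CHECKPOINT_PATH}")
if [[ $SIMULATE_FAILURE == "true" ]]; then
  # Keep the FailureMapper in the recovered job so its operators match the
  # original's and --allowNonRestoredState is not needed.
  JOB_CMD="$BASE_JOB_CMD --test.simulate_failure true"
else
  JOB_CMD="$BASE_JOB_CMD"
fi
echo "$JOB_CMD"
```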
[GitHub] twalthr commented on issue #7066: [FLINK-10826][test] Decrease deployment size of heavy deplyment e2e t…
twalthr commented on issue #7066: [FLINK-10826][test] Decrease deployment size of heavy deplyment e2e t… URL: https://github.com/apache/flink/pull/7066#issuecomment-437803788 Thanks @StefanRRichter I will address @zentol comments and merge this to unblock #7045. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Created] (FLINK-10850) Job may hang on FAILING state if taskmanager updateTaskExecutionState failed
ouyangzhe created FLINK-10850: - Summary: Job may hang on FAILING state if taskmanager updateTaskExecutionState failed Key: FLINK-10850 URL: https://issues.apache.org/jira/browse/FLINK-10850 Project: Flink Issue Type: Bug Components: JobManager Affects Versions: 1.5.5 Reporter: ouyangzhe Fix For: 1.8.0 I encountered a job that ran out of memory and hung in the FAILING state. It had 3 slots left to release, and the corresponding task state was CANCELING. I found the following log in the taskmanager; it seems that the taskmanager tried to updateTaskExecutionState from CANCELING to CANCELED, but hit an OOM. {panel}
2018-11-08 18:01:23,250 INFO org.apache.flink.runtime.taskmanager.Task - PartialSolution (BulkIteration (Bulk Iteration)) (97/600) (46005ba837efc4ebf783fc92121e55a8) switched from RUNNING to CANCELING.
2018-11-08 18:01:23,257 INFO org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code PartialSolution (BulkIteration (Bulk Iteration)) (97/600) (46005ba837efc4ebf783fc92121e55a8).
2018-11-08 18:01:44,081 INFO org.apache.flink.runtime.taskmanager.Task - PartialSolution (BulkIteration (Bulk Iteration)) (97/600) (46005ba837efc4ebf783fc92121e55a8) switched from CANCELING to CANCELED.
2018-11-08 18:01:44,081 INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for PartialSolution (BulkIteration (Bulk Iteration)) (97/600) (46005ba837efc4ebf783fc92121e55a8). 
2018-11-08 18:02:03,097 WARN org.apache.flink.runtime.taskmanager.Task - Task 'PartialSolution (BulkIteration (Bulk Iteration)) (97/600)' did not react to cancelling signal for 30 seconds, but is stuck in method:
org.apache.flink.shaded.guava18.com.google.common.collect.Maps$EntryFunction$1.apply(Maps.java:86)
org.apache.flink.shaded.guava18.com.google.common.collect.Iterators$8.transform(Iterators.java:799)
org.apache.flink.shaded.guava18.com.google.common.collect.TransformedIterator.next(TransformedIterator.java:48)
java.util.AbstractCollection.toArray(AbstractCollection.java:141)
org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList.copyOf(ImmutableList.java:258)
org.apache.flink.runtime.io.network.partition.ResultPartitionManager.releasePartitionsProducedBy(ResultPartitionManager.java:100)
org.apache.flink.runtime.io.network.NetworkEnvironment.unregisterTask(NetworkEnvironment.java:275)
org.apache.flink.runtime.taskmanager.Task.run(Task.java:833)
java.lang.Thread.run(Thread.java:745)
2018-11-08 18:02:05,665 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Discarding the results produced by task execution e9141e20871e530dee904ddce11adca0.
2018-11-08 18:02:22,536 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Discarding the results produced by task execution 7fac76a5d76247d803e1f1c47a6b385f. 
2018-11-08 18:03:47,210 WARN org.apache.flink.runtime.taskmanager.Task - Task 'PartialSolution (BulkIteration (Bulk Iteration)) (97/600)' did not react to cancelling signal for 30 seconds, but is stuck in method:
org.apache.flink.runtime.memory.MemoryManager.releaseAll(MemoryManager.java:497)
org.apache.flink.runtime.taskmanager.Task.run(Task.java:837)
java.lang.Thread.run(Thread.java:745)
2018-11-08 18:03:47,213 INFO org.apache.flink.runtime.taskmanager.Task - Ensuring all FileSystem streams are closed for task PartialSolution (BulkIteration (Bulk Iteration)) (97/600) (46005ba837efc4ebf783fc92121e55a8) [CANCELED]
2018-11-08 18:03:47,215 WARN org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline - An exception was thrown by a user handler while handling an exception event ([id: 0x397132f7, /11.10.199.197:33286 => /11.9.137.228:40859] EXCEPTION: java.lang.OutOfMemoryError: GC overhead limit exceeded) java.lang.OutOfMemoryError: GC overhead limit exceeded
at org.apache.flink.shaded.akka.org.jboss.netty.buffer.HeapChannelBuffer.<init>(HeapChannelBuffer.java:42)
at org.apache.flink.shaded.akka.org.jboss.netty.buffer.BigEndianHeapChannelBuffer.<init>(BigEndianHeapChannelBuffer.java:34)
at org.apache.flink.shaded.akka.org.jboss.netty.buffer.ChannelBuffers.buffer(ChannelBuffers.java:134)
at org.apache.flink.shaded.akka.org.jboss.netty.buffer.HeapChannelBufferFactory.getBuffer(HeapChannelBufferFactory.java:68)
at org.apache.flink.shaded.akka.org.jboss.netty.buffer.AbstractChannelBufferFactory.getBuffer(AbstractChannelBufferFactory.java:48)
at org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.FrameDecoder.extractFrame(FrameDecoder.java:566) at
[GitHub] asfgit closed pull request #7065: [FLINK-10626] [docs] [table] Add documentation for event-time temporal table joins
asfgit closed pull request #7065: [FLINK-10626] [docs] [table] Add documentation for event-time temporal table joins URL: https://github.com/apache/flink/pull/7065 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/docs/dev/table/streaming/joins.md b/docs/dev/table/streaming/joins.md index 24c33b9be63..49abc022ee7 100644 --- a/docs/dev/table/streaming/joins.md +++ b/docs/dev/table/streaming/joins.md @@ -136,14 +136,14 @@ SELECT o.amount * r.rate AS amount FROM Orders AS o, - LATERAL TABLE (Rates(o.proctime)) AS r + LATERAL TABLE (Rates(o.rowtime)) AS r WHERE r.currency = o.currency {% endhighlight %} -Each record from the probe side will be joined with the version of the build side table at the time of the correlated time attribute of the probe side record. +Each record from the probe side will be joined with the version of the build side table at the time of the correlated time attribute of the probe side record. In order to support updates (overwrites) of previous values on the build side table, the table must define a primary key. -In our example, each record from `Orders` will be joined with the version of `Rates` at time `o.proctime`. Because the time attribute defines a processing-time notion, a newly appended order is always joined with the most recent version of `Rates` when executing the operation. The `currency` field has been defined as the primary key of `Rates` before and is used to connect both tables in our example. +In our example, each record from `Orders` will be joined with the version of `Rates` at time `o.rowtime`. The `currency` field has been defined as the primary key of `Rates` before and is used to connect both tables in our example. 
If the query were using a processing-time notion, a newly appended order would always be joined with the most recent version of `Rates` when executing the operation. In contrast to [regular joins](#regular-joins), this means that if there is a new record on the build side, it will not affect the previous results of the join. This again allows Flink to limit the number of elements that must be kept in the state. @@ -189,14 +189,26 @@ val result = orders +**Note**: State retention defined in a [query configuration](query_configuration.html) is not yet considered for temporal joins. This means that the required state to compute the query result might grow infinitely depending on the number of distinct primary keys for the history table. + ### Processing-time Temporal Joins With a processing-time time attribute, it is impossible to pass _past_ time attributes as an argument to the temporal table function. -By definition, it is always the current timestamp. Thus, processing-time temporal table function invocations will always return the latest known versions of the underlying table +By definition, it is always the current timestamp. Thus, invocations of a processing-time temporal table function will always return the latest known versions of the underlying table and any updates in the underlying history table will also immediately overwrite the current values. Only the latest versions (with respect to the defined primary key) of the build side records are kept in the state. New updates will have no effect on previously emitted/processed records from the probe side. -One can think about processing-time temporal join as a simple `HashMap` that stores all of the records from the build side. +One can think about a processing-time temporal join as a simple `HashMap` that stores all of the records from the build side. 
Every record from the probe side is always evaluated against the most recent/current state of the `HashMap`. + +### Event-time Temporal Joins + +With an event-time time attribute (i.e., a rowtime attribute), it is possible to pass _past_ time attributes to the temporal table function. This allows for joining the two tables at a common point in time. + +Compared to processing-time temporal joins, the temporal table does not only keep the latest version (with respect to the defined primary key) of the build side records in the state but stores multiple versions of them identified by time. + +For example, an incoming row with an event-time timestamp of `12:30:00` that is appended to the probe side table is joined with the version of the build side table at time `12:30:00` according to the [concept of temporal tables](temporal_tables.html). Thus, the incoming row is only joined with rows that have a timestamp lower or equal to `12:30:00` with applied updates according to the primary key until this point
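The state shapes described in the two sections above can be sketched in plain Java without any Flink dependencies (all names below are illustrative, not Flink API): the processing-time view is a `HashMap` that only ever holds the latest rate per currency, while the event-time view keeps multiple timestamped versions per key and answers a probe row with the newest version whose timestamp is lower than or equal to the probe row's rowtime.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Illustrative sketch (not Flink code) of the two temporal-join state shapes. */
public class TemporalJoinSketch {

    /** Processing-time view: only the latest rate per currency; old values are overwritten. */
    static final Map<String, Double> latestRates = new HashMap<>();

    /** Event-time view: multiple versions per currency, keyed by the rowtime at which they became valid. */
    static final Map<String, TreeMap<Long, Double>> versionedRates = new HashMap<>();

    static void updateRate(String currency, long rowtime, double rate) {
        latestRates.put(currency, rate); // processing-time: overwrite the single version
        versionedRates.computeIfAbsent(currency, c -> new TreeMap<>())
            .put(rowtime, rate);         // event-time: append another version
    }

    /** Event-time lookup: newest version with timestamp <= the probe row's rowtime. */
    static Double rateAsOf(String currency, long rowtime) {
        TreeMap<Long, Double> versions = versionedRates.get(currency);
        if (versions == null) {
            return null;
        }
        Map.Entry<Long, Double> entry = versions.floorEntry(rowtime);
        return entry == null ? null : entry.getValue();
    }

    public static void main(String[] args) {
        updateRate("EUR", 1000L, 1.14);
        updateRate("EUR", 2000L, 1.16);
        // A probe row with rowtime 1500 joins the version that was valid at that time:
        System.out.println(rateAsOf("EUR", 1500L)); // 1.14
        // The processing-time view always sees the latest version:
        System.out.println(latestRates.get("EUR")); // 1.16
    }
}
```

Note how a record appended to the build side after rowtime 1500 does not change the result of a probe at 1500, which is exactly why event-time temporal joins do not retroactively affect previously emitted results.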
[jira] [Commented] (FLINK-10827) Add test for duplicate() to SerializerTestBase
[ https://issues.apache.org/jira/browse/FLINK-10827?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16683428#comment-16683428 ] ASF GitHub Bot commented on FLINK-10827: StefanRRichter commented on a change in pull request #7061: [FLINK-10827][tests] Add test for duplicate() to SerializerTestBase URL: https://github.com/apache/flink/pull/7061#discussion_r232572989
## File path: flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
## @@ -436,6 +441,36 @@ public void testNullability() { } }
+ @Test
+ public void testDuplicate() throws Exception {
+ final int numThreads = 10;
+ final TypeSerializer<T> serializer = getSerializer();
+ final OneShotLatch startLatch = new OneShotLatch();
+ final List<SerializerRunner<T>> concurrentRunners = new ArrayList<>(numThreads);
+ Assert.assertEquals(serializer, serializer.duplicate());
+
+ T[] testData = getData();
+ final int testDataIterations = Math.max(1, 250 / testData.length);
+
+ for (int i = 0; i < numThreads; ++i) {
+ SerializerRunner<T> runner = new SerializerRunner<>(
+ startLatch,
Review comment: What would be the benefit? This feels like it does exactly what we want.
> Add test for duplicate() to SerializerTestBase > -- > > Key: FLINK-10827 > URL: https://issues.apache.org/jira/browse/FLINK-10827 > Project: Flink > Issue Type: Test > Components: Tests >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > In the past, we have had many bugs caused by type serializers that did not > properly implement the {{duplicate()}} method. A very common error > is forgetting to create a deep copy of some fields, which can lead to > concurrency problems in the backend. 
> We should add a test case that uses duplicated serializers from different > threads to expose concurrency problems. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
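The intent of the test above can be sketched without Flink's `SerializerTestBase`: give each thread its own `duplicate()` of a serializer that carries mutable internal state, release all threads at once via a latch (a stand-in for Flink's `OneShotLatch`), and check that no thread observes another thread's writes. Everything below is an illustrative assumption, not the actual Flink test code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

public class SerializerDuplicateSketch {

    /** Illustrative stand-in for a TypeSerializer with mutable internal state. */
    static class StatefulSerializer {
        private final byte[] buffer = new byte[64]; // per-instance scratch space; unsafe if shared

        /** A correct duplicate() hands out an instance with its own copy of all mutable state. */
        StatefulSerializer duplicate() {
            return new StatefulSerializer();
        }

        /** Writes into the scratch buffer and reads the value back; racy if the buffer is shared. */
        int roundTrip(int value) {
            buffer[0] = (byte) value;
            Thread.yield(); // widen the race window
            return buffer[0];
        }
    }

    /** Runs one duplicate per thread; returns true when no thread observed corrupted state. */
    static boolean runConcurrently(int numThreads, int iterations) throws InterruptedException {
        StatefulSerializer serializer = new StatefulSerializer();
        CountDownLatch startLatch = new CountDownLatch(1);
        AtomicBoolean corrupted = new AtomicBoolean(false);
        Thread[] runners = new Thread[numThreads];
        for (int i = 0; i < numThreads; i++) {
            final int value = i + 1; // small values, safe for the byte cast above
            // Each thread works on its own duplicate. With a broken duplicate()
            // (e.g. one returning 'serializer' itself) this check fails intermittently.
            final StatefulSerializer duplicate = serializer.duplicate();
            runners[i] = new Thread(() -> {
                try {
                    startLatch.await(); // all threads start working at the same time
                    for (int n = 0; n < iterations; n++) {
                        if (duplicate.roundTrip(value) != value) {
                            corrupted.set(true);
                        }
                    }
                } catch (InterruptedException ignored) {
                }
            });
            runners[i].start();
        }
        startLatch.countDown();
        for (Thread t : runners) {
            t.join();
        }
        return !corrupted.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runConcurrently(10, 10_000) ? "OK" : "CORRUPTED");
    }
}
```

The shared start latch matters: releasing all runners at once maximizes the chance that a shallow `duplicate()` is caught by overlapping writes, which is the concurrency bug class the JIRA issue describes.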
[jira] [Commented] (FLINK-8997) Add sliding window aggregation to the job
[ https://issues.apache.org/jira/browse/FLINK-8997?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16683463#comment-16683463 ] ASF GitHub Bot commented on FLINK-8997: --- dawidwys commented on a change in pull request #7039: [FLINK-8997] Added sliding window aggregation to datastream test job URL: https://github.com/apache/flink/pull/7039#discussion_r232586645 ## File path: flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SlidingWindowCheckMapper.java ## @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Collector;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * This mapper validates a sliding event-time window. It checks that each event belongs to the appropriate number of
+ * consecutive windows.
+ */
+public class SlidingWindowCheckMapper extends RichFlatMapFunction<Tuple2<Integer, List<Event>>, String> {
+
+ private static final long serialVersionUID = -744070793650644485L;
+
+ /** This value state tracks previously seen events with the number of windows they appeared in. 
 */
+ private transient ValueState<List<Tuple2<Event, Integer>>> eventsSeenSoFar;
+
+ private transient ValueState<Long> lastSequenceNumber;
+
+ private final int slideFactor;
+
+ SlidingWindowCheckMapper(int slideFactor) {
+ this.slideFactor = slideFactor;
+ }
+
+ @Override
+ public void open(Configuration parameters) {
+ ValueStateDescriptor<List<Tuple2<Event, Integer>>> previousWindowDescriptor =
+ new ValueStateDescriptor<>("eventsSeenSoFar",
+ new ListTypeInfo<>(new TupleTypeInfo<>(TypeInformation.of(Event.class), BasicTypeInfo.INT_TYPE_INFO)));
+
+ eventsSeenSoFar = getRuntimeContext().getState(previousWindowDescriptor);
+
+ ValueStateDescriptor<Long> lastSequenceNumberDescriptor =
+ new ValueStateDescriptor<>("lastSequenceNumber", BasicTypeInfo.LONG_TYPE_INFO);
+
+ lastSequenceNumber = getRuntimeContext().getState(lastSequenceNumberDescriptor);
+ }
+
+ @Override
+ public void flatMap(Tuple2<Integer, List<Event>> value, Collector<String> out) throws Exception {
+ List<Tuple2<Event, Integer>> previousWindowValues = Optional.ofNullable(eventsSeenSoFar.value()).orElseGet(
+ Collections::emptyList);
+
+ List<Event> newValues = value.f1;
+ Optional<Event> lastEventInWindow = verifyWindowContiguity(newValues, out);
+
+ Long lastSequenceNumberSeenSoFar = lastSequenceNumber.value();
+ List<Tuple2<Event, Integer>> newWindows =
+ verifyPreviousOccurences(previousWindowValues, newValues, out, lastSequenceNumberSeenSoFar);
+
+ if (lastEventInWindow.isPresent()) {
+ updateLastSeenSequenceNumber(lastEventInWindow.get(), lastSequenceNumberSeenSoFar);
+ }
+
+ eventsSeenSoFar.update(newWindows);
+ }
+
+ private void updateLastSeenSequenceNumber(
+ Event lastEventInWindow,
+ Long lastSequenceNumberSeenSoFar) throws IOException {
+ long lastSequenceNumberInWindow = lastEventInWindow.getSequenceNumber();
+ if (lastSequenceNumberSeenSoFar == null || lastSequenceNumberInWindow > lastSequenceNumberSeenSoFar) {
+ lastSequenceNumber.update(lastSequenceNumberInWindow);
+ }
+ }
+
+
[jira] [Resolved] (FLINK-10826) Heavy deployment end-to-end test produces no output on Travis
[ https://issues.apache.org/jira/browse/FLINK-10826?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther resolved FLINK-10826. -- Resolution: Fixed Fix Version/s: 1.7.0 Fixed in master: f393fb70b27cd2a64cb2c010d7d2e902f75c519f, 6115b05c87e387c301b21758a6d29e300d138a84 Fixed in 1.7: 7d9fcbaf1408b09ee1d409ec1568a5af51d726d0, 78ea3923782fddfd4fbb414c754d4db7701e24ca > Heavy deployment end-to-end test produces no output on Travis > - > > Key: FLINK-10826 > URL: https://issues.apache.org/jira/browse/FLINK-10826 > Project: Flink > Issue Type: Bug > Components: E2E Tests >Affects Versions: 1.7.0 >Reporter: Timo Walther >Assignee: Stefan Richter >Priority: Critical > Labels: pull-request-available > Fix For: 1.7.0 > > Attachments: heavy_deployement_log.txt > > > The Heavy deployment end-to-end test produces no output on Travis, so it is > killed after 10 minutes. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] kl0u commented on issue #7056: [FLINK-10634][metrics][rest] Add metrics availability e2e test
kl0u commented on issue #7056: [FLINK-10634][metrics][rest] Add metrics availability e2e test URL: https://github.com/apache/flink/pull/7056#issuecomment-437821227 BTW the Travis build you link to has failed.
[jira] [Created] (FLINK-10852) Decremented number of unfinished producers below 0. This is most likely a bug in the execution state/intermediate result partition management
ouyangzhe created FLINK-10852: - Summary: Decremented number of unfinished producers below 0. This is most likely a bug in the execution state/intermediate result partition management Key: FLINK-10852 URL: https://issues.apache.org/jira/browse/FLINK-10852 Project: Flink Issue Type: Bug Components: Core Affects Versions: 1.7.0 Reporter: ouyangzhe Fix For: 1.8.0 {panel:title=Jobs that use the DataSet iteration operator hang in the FAILING state on failover and throw the following exception when jobmanager.execution.failover-strategy: region is set.}
java.lang.IllegalStateException: Decremented number of unfinished producers below 0. This is most likely a bug in the execution state/intermediate result partition management.
at org.apache.flink.runtime.executiongraph.IntermediateResultPartition.markFinished(IntermediateResultPartition.java:103)
at org.apache.flink.runtime.executiongraph.ExecutionVertex.finishAllBlockingPartitions(ExecutionVertex.java:707)
at org.apache.flink.runtime.executiongraph.Execution.markFinished(Execution.java:939)
at org.apache.flink.runtime.executiongraph.ExecutionGraph.updateState(ExecutionGraph.java:1568)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:542)
at sun.reflect.GeneratedMethodAccessor25.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
at 
akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
{panel} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
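The exception above reports a broken invariant: each producer of an intermediate result partition may be marked finished exactly once, so a duplicate finish notification (as apparently happens here with region failover and iterations) drives the counter below zero. A minimal illustrative sketch of that guarded counter (not the actual Flink code):

```java
/**
 * Illustrative sketch of the invariant behind FLINK-10852: each producer may be
 * marked finished exactly once; a second notification for the same producer
 * pushes the counter below zero and triggers the IllegalStateException.
 */
public class PartitionProducerCounter {

    private int unfinishedProducers;

    public PartitionProducerCounter(int producers) {
        this.unfinishedProducers = producers;
    }

    /** Returns true when the last producer has finished and the partition is complete. */
    public boolean markProducerFinished() {
        int remaining = --unfinishedProducers;
        if (remaining < 0) {
            // A duplicate notification reached the counter -> the bug reported above.
            throw new IllegalStateException(
                "Decremented number of unfinished producers below 0. This is most likely "
                + "a bug in the execution state/intermediate result partition management.");
        }
        return remaining == 0;
    }
}
```

In the reported scenario the region failover strategy appears to deliver a second finish notification for an already-finished producer, which is what makes the check fire.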
[jira] [Commented] (FLINK-10674) DistinctAccumulator.remove lead to NPE
[ https://issues.apache.org/jira/browse/FLINK-10674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16684785#comment-16684785 ] ASF GitHub Bot commented on FLINK-10674: wenhuitang closed pull request #7076: [FLINK-10674] [table] Fix DistinctAccumulator.remove lead to NPE. URL: https://github.com/apache/flink/pull/7076 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/docs/build_docs.sh b/docs/build_docs.sh old mode 100755 new mode 100644 diff --git a/docs/check_links.sh b/docs/check_links.sh old mode 100755 new mode 100644 diff --git a/docs/docker/run.sh b/docs/docker/run.sh old mode 100755 new mode 100644 diff --git a/docs/fig/ssl_internal_external.svg b/docs/fig/ssl_internal_external.svg old mode 100755 new mode 100644 diff --git a/flink-container/docker/build.sh b/flink-container/docker/build.sh old mode 100755 new mode 100644 diff --git a/flink-container/docker/docker-entrypoint.sh b/flink-container/docker/docker-entrypoint.sh old mode 100755 new mode 100644 diff --git a/flink-contrib/docker-flink/bluemix-docker-compose.sh b/flink-contrib/docker-flink/bluemix-docker-compose.sh old mode 100755 new mode 100644 diff --git a/flink-contrib/docker-flink/build.sh b/flink-contrib/docker-flink/build.sh old mode 100755 new mode 100644 diff --git a/flink-contrib/docker-flink/create-docker-swarm-service.sh b/flink-contrib/docker-flink/create-docker-swarm-service.sh old mode 100755 new mode 100644 diff --git a/flink-contrib/docker-flink/docker-entrypoint.sh b/flink-contrib/docker-flink/docker-entrypoint.sh old mode 100755 new mode 100644 diff --git a/flink-contrib/docker-flink/remove-docker-swarm-service.sh b/flink-contrib/docker-flink/remove-docker-swarm-service.sh old mode 100755 new mode 100644 diff --git 
a/flink-dist/src/main/flink-bin/bin/config.sh b/flink-dist/src/main/flink-bin/bin/config.sh old mode 100755 new mode 100644 diff --git a/flink-dist/src/main/flink-bin/bin/jobmanager.sh b/flink-dist/src/main/flink-bin/bin/jobmanager.sh old mode 100755 new mode 100644 diff --git a/flink-dist/src/main/flink-bin/bin/start-cluster.sh b/flink-dist/src/main/flink-bin/bin/start-cluster.sh old mode 100755 new mode 100644 diff --git a/flink-dist/src/main/flink-bin/bin/start-zookeeper-quorum.sh b/flink-dist/src/main/flink-bin/bin/start-zookeeper-quorum.sh old mode 100755 new mode 100644 diff --git a/flink-dist/src/main/flink-bin/bin/stop-cluster.sh b/flink-dist/src/main/flink-bin/bin/stop-cluster.sh old mode 100755 new mode 100644 diff --git a/flink-dist/src/main/flink-bin/bin/stop-zookeeper-quorum.sh b/flink-dist/src/main/flink-bin/bin/stop-zookeeper-quorum.sh old mode 100755 new mode 100644 diff --git a/flink-dist/src/main/flink-bin/bin/taskmanager.sh b/flink-dist/src/main/flink-bin/bin/taskmanager.sh old mode 100755 new mode 100644 diff --git a/flink-dist/src/main/flink-bin/bin/zookeeper.sh b/flink-dist/src/main/flink-bin/bin/zookeeper.sh old mode 100755 new mode 100644 diff --git a/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster-job.sh b/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster-job.sh old mode 100755 new mode 100644 diff --git a/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster.sh b/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster.sh old mode 100755 new mode 100644 diff --git a/flink-dist/src/main/flink-bin/mesos-bin/mesos-taskmanager.sh b/flink-dist/src/main/flink-bin/mesos-bin/mesos-taskmanager.sh old mode 100755 new mode 100644 diff --git a/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh b/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh old mode 100755 new mode 100644 diff --git a/flink-end-to-end-tests/flink-parent-child-classloading-test/src/main/resources/.version.properties 
b/flink-end-to-end-tests/flink-parent-child-classloading-test/src/main/resources/.version.properties deleted file mode 100644 index 76f9c5aa859..000 --- a/flink-end-to-end-tests/flink-parent-child-classloading-test/src/main/resources/.version.properties +++ /dev/null @@ -1 +0,0 @@ -git.commit.id.abbrev=hello-there-42 diff --git a/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh b/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh old mode 100755 new mode 100644 diff --git a/flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh b/flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh old mode 100755 new mode 100644 diff --git a/flink-end-to-end-tests/test-scripts/test_yarn_kerberos_docker.sh b/flink-end-to-end-tests/test-scripts/test_yarn_kerberos_docker.sh old mode 100755 new mode 100644 diff --git a/flink-jepsen/docker/up.sh b/flink-jepsen/docker/up.sh old mode 100755 new mode 100644
[GitHub] wenhuitang closed pull request #7076: [FLINK-10674] [table] Fix DistinctAccumulator.remove lead to NPE.
wenhuitang closed pull request #7076: [FLINK-10674] [table] Fix DistinctAccumulator.remove lead to NPE. URL: https://github.com/apache/flink/pull/7076
[GitHub] wenhuitang opened a new pull request #7076: [FLINK-10674] [table] Fix DistinctAccumulator.remove lead to NPE.
wenhuitang opened a new pull request #7076: [FLINK-10674] [table] Fix DistinctAccumulator.remove lead to NPE. URL: https://github.com/apache/flink/pull/7076
## What is the purpose of the change
This pull request fixes the problem reported in FLINK-10674: DistinctAccumulator.remove leads to an NPE.
## Brief change log
DistinctAccumulator.remove now handles the situation where there is no corresponding entry for the given parameters in the distinct map.
## Verifying this change
This change is a trivial rework / code cleanup without any test coverage.
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
- The S3 file system connector: no
## Documentation
- Does this pull request introduce a new feature? no
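The essence of the fix can be sketched with a plain count map (illustrative Java, not the actual Scala `DistinctAccumulator`): `remove` must tolerate a key that is absent from the distinct map instead of unboxing a null count.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative sketch of a distinct-value accumulator backed by a count map. */
public class DistinctCountMap {

    private final Map<String, Long> distinctValueMap = new HashMap<>();

    /** Returns true if this is the first occurrence of the value. */
    public boolean add(String value) {
        Long count = distinctValueMap.get(value);
        if (count == null) {
            distinctValueMap.put(value, 1L);
            return true;
        }
        distinctValueMap.put(value, count + 1);
        return false;
    }

    /**
     * Returns true if the last occurrence of the value was removed.
     * Guarding against an absent key is the essence of the NPE fix:
     * without the null check, unboxing the missing count would throw.
     */
    public boolean remove(String value) {
        Long count = distinctValueMap.get(value);
        if (count == null) {
            return false; // no such value recorded -> nothing to remove, no NPE
        }
        if (count == 1L) {
            distinctValueMap.remove(value);
            return true;
        }
        distinctValueMap.put(value, count - 1);
        return false;
    }
}
```

A retraction for a value that was never accumulated (or was already fully retracted) now becomes a no-op instead of a NullPointerException.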
[jira] [Created] (FLINK-10857) Conflict between JMX and Prometheus Metrics reporter
Truong Duc Kien created FLINK-10857: --- Summary: Conflict between JMX and Prometheus Metrics reporter Key: FLINK-10857 URL: https://issues.apache.org/jira/browse/FLINK-10857 Project: Flink Issue Type: Bug Components: Metrics Affects Versions: 1.6.2 Reporter: Truong Duc Kien When registering both the JMX and Prometheus metrics reporters, the Prometheus reporter fails with an exception. {code:java} o.a.f.r.m.MetricRegistryImpl Error while registering metric. java.lang.IllegalArgumentException: Invalid metric name: flink_jobmanager.Status.JVM.Memory.Mapped_Count at org.apache.flink.shaded.io.prometheus.client.Collector.checkMetricName(Collector.java:182) at org.apache.flink.shaded.io.prometheus.client.SimpleCollector.<init>(SimpleCollector.java:164) at org.apache.flink.shaded.io.prometheus.client.Gauge.<init>(Gauge.java:68) at org.apache.flink.shaded.io.prometheus.client.Gauge$Builder.create(Gauge.java:74) at org.apache.flink.metrics.prometheus.AbstractPrometheusReporter.createCollector(AbstractPrometheusReporter.java:130) at org.apache.flink.metrics.prometheus.AbstractPrometheusReporter.notifyOfAddedMetric(AbstractPrometheusReporter.java:106) at org.apache.flink.runtime.metrics.MetricRegistryImpl.register(MetricRegistryImpl.java:329) at org.apache.flink.runtime.metrics.groups.AbstractMetricGroup.addMetric(AbstractMetricGroup.java:379) at org.apache.flink.runtime.metrics.groups.AbstractMetricGroup.gauge(AbstractMetricGroup.java:323) at org.apache.flink.runtime.metrics.util.MetricUtils.instantiateMemoryMetrics(MetricUtils.java:231) at org.apache.flink.runtime.metrics.util.MetricUtils.instantiateStatusMetrics(MetricUtils.java:100) at org.apache.flink.runtime.metrics.util.MetricUtils.instantiateJobManagerMetricGroup(MetricUtils.java:68) at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startClusterComponents(ClusterEntrypoint.java:342) at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:233) at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:191) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1869) at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:190) at org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:176) {code} This is a small program to reproduce the problem: https://github.com/dikei/flink-metrics-conflict-test I -- This message was sent by Atlassian JIRA (v7.6.3#76005)
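The failure above stems from Prometheus's naming rules: metric names must match `[a-zA-Z_:][a-zA-Z0-9_:]*`, so the dotted Flink scope name `flink_jobmanager.Status.JVM.Memory.Mapped_Count` is rejected by `Collector.checkMetricName`. A minimal sketch of the kind of character filtering a reporter can apply before registration (illustrative only, not Flink's actual sanitization code):

```java
// Sketch: map an arbitrary dotted metric scope to a Prometheus-legal name by
// replacing every character outside [a-zA-Z0-9_:] (digits disallowed at index 0)
// with an underscore. Illustrative only; class and method names are assumptions.
public class MetricNameSanitizer {
    static String sanitize(String name) {
        StringBuilder sb = new StringBuilder(name.length());
        for (int i = 0; i < name.length(); i++) {
            char c = name.charAt(i);
            boolean ok = (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z')
                    || c == '_' || c == ':'
                    || (i > 0 && c >= '0' && c <= '9');
            sb.append(ok ? c : '_');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // The dots in the Flink scope name become underscores.
        System.out.println(sanitize("flink_jobmanager.Status.JVM.Memory.Mapped_Count"));
    }
}
```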
[jira] [Updated] (FLINK-10857) Conflict between JMX and Prometheus Metrics reporter
[ https://issues.apache.org/jira/browse/FLINK-10857?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Truong Duc Kien updated FLINK-10857: Description: When registering both JMX and Prometheus metrics reporter, the Prometheus reporter will fail with many exceptions, such as. {code:java} o.a.f.r.m.MetricRegistryImpl Error while registering metric. java.lang.IllegalArgumentException: Invalid metric name: flink_jobmanager.Status.JVM.Memory.Mapped_Count at org.apache.flink.shaded.io.prometheus.client.Collector.checkMetricName(Collector.java:182) at org.apache.flink.shaded.io.prometheus.client.SimpleCollector.(SimpleCollector.java:164) at org.apache.flink.shaded.io.prometheus.client.Gauge.(Gauge.java:68) at org.apache.flink.shaded.io.prometheus.client.Gauge$Builder.create(Gauge.java:74) at org.apache.flink.metrics.prometheus.AbstractPrometheusReporter.createCollector(AbstractPrometheusReporter.java:130) at org.apache.flink.metrics.prometheus.AbstractPrometheusReporter.notifyOfAddedMetric(AbstractPrometheusReporter.java:106) at org.apache.flink.runtime.metrics.MetricRegistryImpl.register(MetricRegistryImpl.java:329) at org.apache.flink.runtime.metrics.groups.AbstractMetricGroup.addMetric(AbstractMetricGroup.java:379) at org.apache.flink.runtime.metrics.groups.AbstractMetricGroup.gauge(AbstractMetricGroup.java:323) at org.apache.flink.runtime.metrics.util.MetricUtils.instantiateMemoryMetrics(MetricUtils.java:231) at org.apache.flink.runtime.metrics.util.MetricUtils.instantiateStatusMetrics(MetricUtils.java:100) at org.apache.flink.runtime.metrics.util.MetricUtils.instantiateJobManagerMetricGroup(MetricUtils.java:68) at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startClusterComponents(ClusterEntrypoint.java:342) at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:233) at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:191) at 
java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1869) at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:190) at org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:176) {code} This is a small program to reproduce the problem: [https://github.com/dikei/flink-metrics-conflict-test] I was: When registering both JMX and Prometheus metrics reporter, the Prometheus reporter will fail with an exception. {code:java} o.a.f.r.m.MetricRegistryImpl Error while registering metric. java.lang.IllegalArgumentException: Invalid metric name: flink_jobmanager.Status.JVM.Memory.Mapped_Count at org.apache.flink.shaded.io.prometheus.client.Collector.checkMetricName(Collector.java:182) at org.apache.flink.shaded.io.prometheus.client.SimpleCollector.(SimpleCollector.java:164) at org.apache.flink.shaded.io.prometheus.client.Gauge.(Gauge.java:68) at org.apache.flink.shaded.io.prometheus.client.Gauge$Builder.create(Gauge.java:74) at org.apache.flink.metrics.prometheus.AbstractPrometheusReporter.createCollector(AbstractPrometheusReporter.java:130) at org.apache.flink.metrics.prometheus.AbstractPrometheusReporter.notifyOfAddedMetric(AbstractPrometheusReporter.java:106) at org.apache.flink.runtime.metrics.MetricRegistryImpl.register(MetricRegistryImpl.java:329) at org.apache.flink.runtime.metrics.groups.AbstractMetricGroup.addMetric(AbstractMetricGroup.java:379) at org.apache.flink.runtime.metrics.groups.AbstractMetricGroup.gauge(AbstractMetricGroup.java:323) at org.apache.flink.runtime.metrics.util.MetricUtils.instantiateMemoryMetrics(MetricUtils.java:231) at org.apache.flink.runtime.metrics.util.MetricUtils.instantiateStatusMetrics(MetricUtils.java:100) at 
org.apache.flink.runtime.metrics.util.MetricUtils.instantiateJobManagerMetricGroup(MetricUtils.java:68) at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startClusterComponents(ClusterEntrypoint.java:342) at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:233) at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:191) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1869)
[jira] [Comment Edited] (FLINK-10818) RestartStrategies.fixedDelayRestart Occur NoResourceAvailableException: Not enough free slots available to run the job.
[ https://issues.apache.org/jira/browse/FLINK-10818?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16684814#comment-16684814 ] ambition edited comment on FLINK-10818 at 11/13/18 7:37 AM: log files: [^jobmanager.log] ^[^taskmanager.log]^ was (Author: ambition): [^jobmanager.log] ^[^taskmanager.log]^ > RestartStrategies.fixedDelayRestart Occur NoResourceAvailableException: Not > enough free slots available to run the job. > > > Key: FLINK-10818 > URL: https://issues.apache.org/jira/browse/FLINK-10818 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.6.2 > Environment: JDK 1.8 > Flink 1.6.0 > Hadoop 2.7.3 >Reporter: ambition >Priority: Major > Attachments: image-2018-11-12-11-05-38-159.png, > image-2018-11-12-11-06-33-387.png, image-2018-11-12-11-08-13-572.png, > jobmanager.log, taskmanager.log > > > Our production Flink-on-YARN job sets a restart strategy like > {code:java} > exeEnv.setRestartStrategy(RestartStrategies.fixedDelayRestart(5,1000l)); > {code} > But after the job has been running for some days, this exception occurs: > {code:java} > org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: > Not enough free slots available to run the job. You can decrease the operator > parallelism or increase the number of slots per TaskManager in the > configuration. 
Task to schedule: < Attempt #5 (Source: KafkaJsonTableSource > -> Map -> where: (AND(OR(=(app_key, _UTF-16LE'C4FAF9CE1569F541'), =(app_key, > _UTF-16LE'F5C7F68C7117630B'), =(app_key, _UTF-16LE'57C6FF4B5A064D29')), > OR(=(LOWER(TRIM(FLAG(BOTH), _UTF-16LE' ', os_type)), _UTF-16LE'ios'), > =(LOWER(TRIM(FLAG(BOTH), _UTF-16LE' ', os_type)), _UTF-16LE'android')), IS > NOT NULL(server_id))), select: (MT_Date_Format_Mode(receive_time, > _UTF-16LE'MMddHHmm', 10) AS date_p, LOWER(TRIM(FLAG(BOTH), _UTF-16LE' ', > os_type)) AS os_type, MT_Date_Format_Mode(receive_time, _UTF-16LE'HHmm', 10) > AS date_mm, server_id) (1/6)) @ (unassigned) - [SCHEDULED] > with groupID < > cbc357ccb763df2852fee8c4fc7d55f2 > in sharing group < > 690dbad267a8ff37c8cb5e9dbedd0a6d >. Resources available to scheduler: Number > of instances=6, total number of slots=6, available slots=0 >at > org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleTask(Scheduler.java:281) >at > org.apache.flink.runtime.jobmanager.scheduler.Scheduler.allocateSlot(Scheduler.java:155) >at > org.apache.flink.runtime.executiongraph.Execution.lambda$allocateAndAssignSlotForExecution$2(Execution.java:491) >at > org.apache.flink.runtime.executiongraph.Execution$$Lambda$44/1664178385.apply(Unknown > Source) >at > java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:981) >at > java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2116) >at > org.apache.flink.runtime.executiongraph.Execution.allocateAndAssignSlotForExecution(Execution.java:489) >at > org.apache.flink.runtime.executiongraph.ExecutionJobVertex.allocateResourcesForAll(ExecutionJobVertex.java:521) >at > org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleEager(ExecutionGraph.java:945) >at > org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleForExecution(ExecutionGraph.java:875) >at > org.apache.flink.runtime.executiongraph.ExecutionGraph.restart(ExecutionGraph.java:1262) >at > 
org.apache.flink.runtime.executiongraph.restart.ExecutionGraphRestartCallback.triggerFullRecovery(ExecutionGraphRestartCallback.java:59) >at > org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy$1.run(FixedDelayRestartStrategy.java:68) >at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) >at java.util.concurrent.FutureTask.run(FutureTask.java:266) >at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) >at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) >at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) >at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) >at java.lang.Thread.run(Thread.java:745) > {code} > > this Exception happened when the job started. issue links to > https://issues.apache.org/jira/browse/FLINK-4486 > > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
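As the scheduler message above indicates, the cluster has 6 slots in total with 0 free, while eager scheduling of a streaming job needs at least as many free slots as its maximum operator parallelism (here 6) per slot-sharing group. The back-of-envelope check behind the error can be sketched as follows (illustrative names, not Flink API):

```java
// Sketch of the feasibility check implied by the NoResourceAvailableException:
// with slot sharing, all operators of a sharing group fit into maxParallelism
// slots, so a restart can only succeed if that many slots are free.
// Illustrative only; "fits" is an assumed helper, not a Flink method.
public class SlotCheck {
    static boolean fits(int freeSlots, int maxOperatorParallelism) {
        return freeSlots >= maxOperatorParallelism;
    }

    public static void main(String[] args) {
        // 6 total slots, 0 free, job parallelism 6: the restart cannot be
        // scheduled, matching the log above.
        System.out.println(fits(0, 6));
    }
}
```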
[jira] [Comment Edited] (FLINK-10818) RestartStrategies.fixedDelayRestart Occur NoResourceAvailableException: Not enough free slots available to run the job.
[ https://issues.apache.org/jira/browse/FLINK-10818?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16684814#comment-16684814 ] ambition edited comment on FLINK-10818 at 11/13/18 7:36 AM: [^jobmanager.log] ^[^taskmanager.log]^ was (Author: ambition): [^jobmanager.log][^taskmanager.log] > RestartStrategies.fixedDelayRestart Occur NoResourceAvailableException: Not > enough free slots available to run the job. > > > Key: FLINK-10818 > URL: https://issues.apache.org/jira/browse/FLINK-10818 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.6.2 > Environment: JDK 1.8 > Flink 1.6.0 > Hadoop 2.7.3 >Reporter: ambition >Priority: Major > Attachments: image-2018-11-12-11-05-38-159.png, > image-2018-11-12-11-06-33-387.png, image-2018-11-12-11-08-13-572.png, > jobmanager.log, taskmanager.log > > > Our Online Flink on Yarn environment operation job,code set restart tactic > like > {code:java} > exeEnv.setRestartStrategy(RestartStrategies.fixedDelayRestart(5,1000l)); > {code} > But job running some days, Occur Exception is : > {code:java} > org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: > Not enough free slots available to run the job. You can decrease the operator > parallelism or increase the number of slots per TaskManager in the > configuration. 
Task to schedule: < Attempt #5 (Source: KafkaJsonTableSource > -> Map -> where: (AND(OR(=(app_key, _UTF-16LE'C4FAF9CE1569F541'), =(app_key, > _UTF-16LE'F5C7F68C7117630B'), =(app_key, _UTF-16LE'57C6FF4B5A064D29')), > OR(=(LOWER(TRIM(FLAG(BOTH), _UTF-16LE' ', os_type)), _UTF-16LE'ios'), > =(LOWER(TRIM(FLAG(BOTH), _UTF-16LE' ', os_type)), _UTF-16LE'android')), IS > NOT NULL(server_id))), select: (MT_Date_Format_Mode(receive_time, > _UTF-16LE'MMddHHmm', 10) AS date_p, LOWER(TRIM(FLAG(BOTH), _UTF-16LE' ', > os_type)) AS os_type, MT_Date_Format_Mode(receive_time, _UTF-16LE'HHmm', 10) > AS date_mm, server_id) (1/6)) @ (unassigned) - [SCHEDULED] > with groupID < > cbc357ccb763df2852fee8c4fc7d55f2 > in sharing group < > 690dbad267a8ff37c8cb5e9dbedd0a6d >. Resources available to scheduler: Number > of instances=6, total number of slots=6, available slots=0 >at > org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleTask(Scheduler.java:281) >at > org.apache.flink.runtime.jobmanager.scheduler.Scheduler.allocateSlot(Scheduler.java:155) >at > org.apache.flink.runtime.executiongraph.Execution.lambda$allocateAndAssignSlotForExecution$2(Execution.java:491) >at > org.apache.flink.runtime.executiongraph.Execution$$Lambda$44/1664178385.apply(Unknown > Source) >at > java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:981) >at > java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2116) >at > org.apache.flink.runtime.executiongraph.Execution.allocateAndAssignSlotForExecution(Execution.java:489) >at > org.apache.flink.runtime.executiongraph.ExecutionJobVertex.allocateResourcesForAll(ExecutionJobVertex.java:521) >at > org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleEager(ExecutionGraph.java:945) >at > org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleForExecution(ExecutionGraph.java:875) >at > org.apache.flink.runtime.executiongraph.ExecutionGraph.restart(ExecutionGraph.java:1262) >at > 
org.apache.flink.runtime.executiongraph.restart.ExecutionGraphRestartCallback.triggerFullRecovery(ExecutionGraphRestartCallback.java:59) >at > org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy$1.run(FixedDelayRestartStrategy.java:68) >at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) >at java.util.concurrent.FutureTask.run(FutureTask.java:266) >at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) >at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) >at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) >at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) >at java.lang.Thread.run(Thread.java:745) > {code} > > this Exception happened when the job started. issue links to > https://issues.apache.org/jira/browse/FLINK-4486 > > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10818) RestartStrategies.fixedDelayRestart Occur NoResourceAvailableException: Not enough free slots available to run the job.
[ https://issues.apache.org/jira/browse/FLINK-10818?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16684814#comment-16684814 ] ambition commented on FLINK-10818: -- [^jobmanager.log][^taskmanager.log] > RestartStrategies.fixedDelayRestart Occur NoResourceAvailableException: Not > enough free slots available to run the job. > > > Key: FLINK-10818 > URL: https://issues.apache.org/jira/browse/FLINK-10818 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.6.2 > Environment: JDK 1.8 > Flink 1.6.0 > Hadoop 2.7.3 >Reporter: ambition >Priority: Major > Attachments: image-2018-11-12-11-05-38-159.png, > image-2018-11-12-11-06-33-387.png, image-2018-11-12-11-08-13-572.png, > jobmanager.log > > > Our Online Flink on Yarn environment operation job,code set restart tactic > like > {code:java} > exeEnv.setRestartStrategy(RestartStrategies.fixedDelayRestart(5,1000l)); > {code} > But job running some days, Occur Exception is : > {code:java} > org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: > Not enough free slots available to run the job. You can decrease the operator > parallelism or increase the number of slots per TaskManager in the > configuration. Task to schedule: < Attempt #5 (Source: KafkaJsonTableSource > -> Map -> where: (AND(OR(=(app_key, _UTF-16LE'C4FAF9CE1569F541'), =(app_key, > _UTF-16LE'F5C7F68C7117630B'), =(app_key, _UTF-16LE'57C6FF4B5A064D29')), > OR(=(LOWER(TRIM(FLAG(BOTH), _UTF-16LE' ', os_type)), _UTF-16LE'ios'), > =(LOWER(TRIM(FLAG(BOTH), _UTF-16LE' ', os_type)), _UTF-16LE'android')), IS > NOT NULL(server_id))), select: (MT_Date_Format_Mode(receive_time, > _UTF-16LE'MMddHHmm', 10) AS date_p, LOWER(TRIM(FLAG(BOTH), _UTF-16LE' ', > os_type)) AS os_type, MT_Date_Format_Mode(receive_time, _UTF-16LE'HHmm', 10) > AS date_mm, server_id) (1/6)) @ (unassigned) - [SCHEDULED] > with groupID < > cbc357ccb763df2852fee8c4fc7d55f2 > in sharing group < > 690dbad267a8ff37c8cb5e9dbedd0a6d >. 
Resources available to scheduler: Number > of instances=6, total number of slots=6, available slots=0 >at > org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleTask(Scheduler.java:281) >at > org.apache.flink.runtime.jobmanager.scheduler.Scheduler.allocateSlot(Scheduler.java:155) >at > org.apache.flink.runtime.executiongraph.Execution.lambda$allocateAndAssignSlotForExecution$2(Execution.java:491) >at > org.apache.flink.runtime.executiongraph.Execution$$Lambda$44/1664178385.apply(Unknown > Source) >at > java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:981) >at > java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2116) >at > org.apache.flink.runtime.executiongraph.Execution.allocateAndAssignSlotForExecution(Execution.java:489) >at > org.apache.flink.runtime.executiongraph.ExecutionJobVertex.allocateResourcesForAll(ExecutionJobVertex.java:521) >at > org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleEager(ExecutionGraph.java:945) >at > org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleForExecution(ExecutionGraph.java:875) >at > org.apache.flink.runtime.executiongraph.ExecutionGraph.restart(ExecutionGraph.java:1262) >at > org.apache.flink.runtime.executiongraph.restart.ExecutionGraphRestartCallback.triggerFullRecovery(ExecutionGraphRestartCallback.java:59) >at > org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy$1.run(FixedDelayRestartStrategy.java:68) >at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) >at java.util.concurrent.FutureTask.run(FutureTask.java:266) >at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) >at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) >at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) >at > 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) >at java.lang.Thread.run(Thread.java:745) > {code} > > this Exception happened when the job started. issue links to > https://issues.apache.org/jira/browse/FLINK-4486 > > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] zhijiangW commented on a change in pull request #7051: [FLINK-10820][network] Simplify the RebalancePartitioner implementation
zhijiangW commented on a change in pull request #7051: [FLINK-10820][network] Simplify the RebalancePartitioner implementation URL: https://github.com/apache/flink/pull/7051#discussion_r232926075 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitioner.java ## @@ -33,29 +33,18 @@ public class RebalancePartitioner extends StreamPartitioner { private static final long serialVersionUID = 1L; - private final int[] returnArray = {Integer.MAX_VALUE - 1}; + private final int[] returnArray = { -1 }; Review comment: Yes, I also considered using a boolean `isInitialized`, as you said. To avoid introducing an extra variable, I relied on `-1` directly, but I agree that a boolean flag would be cleaner than the current approach. You are right that the current `ChannelSelector` lacks an initialization mechanism, and it may be necessary to add this logic to the interface. I will double-check how many implementations need this logic, or whether it is only `RebalancePartitioner`, especially the batch implementations in `OutputEmitter`. Then I will decide whether to cover this refactoring in this PR or in a separate follow-up PR. :) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10820) Simplify the RebalancePartitioner implementation
[ https://issues.apache.org/jira/browse/FLINK-10820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16684825#comment-16684825 ] ASF GitHub Bot commented on FLINK-10820: zhijiangW commented on a change in pull request #7051: [FLINK-10820][network] Simplify the RebalancePartitioner implementation URL: https://github.com/apache/flink/pull/7051#discussion_r232926075 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitioner.java ## @@ -33,29 +33,18 @@ public class RebalancePartitioner extends StreamPartitioner { private static final long serialVersionUID = 1L; - private final int[] returnArray = {Integer.MAX_VALUE - 1}; + private final int[] returnArray = { -1 }; Review comment: Yes, I also considered using a boolean `isInitialized`, as you said. To avoid introducing an extra variable, I relied on `-1` directly, but I agree that a boolean flag would be cleaner than the current approach. You are right that the current `ChannelSelector` lacks an initialization mechanism, and it may be necessary to add this logic to the interface. I will double-check how many implementations need this logic, or whether it is only `RebalancePartitioner`, especially the batch implementations in `OutputEmitter`. Then I will decide whether to cover this refactoring in this PR or in a separate follow-up PR. :) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Simplify the RebalancePartitioner implementation > > > Key: FLINK-10820 > URL: https://issues.apache.org/jira/browse/FLINK-10820 > Project: Flink > Issue Type: Sub-task > Components: Network >Affects Versions: 1.8.0 >Reporter: zhijiang >Assignee: zhijiang >Priority: Minor > Labels: pull-request-available > > _The current {{RebalancePartitioner}} implementation seems a little hacky for > selecting a random number as the first channel index, and the following > selections based on this random index in round-robin fashion._ > _Especially for the corner case of {{numChannels = Integer.MAX_VALUE}}, it > would trigger next random index once reaching the last channel index. > Actually the random index should be selected only once at the first time._ -- This message was sent by Atlassian JIRA (v7.6.3#76005)
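The variant under review can be sketched like this: the first channel is chosen at random on the first call, flagged by a `-1` sentinel, and every later call advances round-robin. This is an illustrative Java sketch of the discussed logic, not the actual `RebalancePartitioner` source; an explicit boolean flag, as suggested in the review, would make the "not yet initialized" state more self-documenting.

```java
import java.util.concurrent.ThreadLocalRandom;

// Illustrative sketch of lazy-initialized round-robin channel selection using
// the -1 sentinel variant from the diff above. Not Flink's actual class.
public class RoundRobinSelector {
    private int nextChannel = -1; // -1 means "first channel not yet chosen"

    int selectChannel(int numberOfChannels) {
        if (nextChannel < 0) {
            // First call: pick a random starting channel exactly once.
            nextChannel = ThreadLocalRandom.current().nextInt(numberOfChannels);
        } else {
            // Subsequent calls: plain round-robin over all channels.
            nextChannel = (nextChannel + 1) % numberOfChannels;
        }
        return nextChannel;
    }
}
```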
[jira] [Commented] (FLINK-10674) DistinctAccumulator.remove lead to NPE
[ https://issues.apache.org/jira/browse/FLINK-10674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16684786#comment-16684786 ] ASF GitHub Bot commented on FLINK-10674: wenhuitang opened a new pull request #7076: [FLINK-10674] [table] Fix DistinctAccumulator.remove lead to NPE. URL: https://github.com/apache/flink/pull/7076 ## What is the purpose of the change This pull request fixes the problem reported by FLINK-10674 DistinctAccumulator.remove lead to NPE. ## Brief change log DistinctAccumulator.remove deals with the situation that there is no corresponding instance of the parameters in the distinct map. ## Verifying this change *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) no - The serializers: (yes / no / don't know) no - The runtime per-record code paths (performance sensitive): (yes / no / don't know) no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know) no - The S3 file system connector: (yes / no / don't know) no ## Documentation - Does this pull request introduce a new feature? (yes / no) no - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > DistinctAccumulator.remove lead to NPE > -- > > Key: FLINK-10674 > URL: https://issues.apache.org/jira/browse/FLINK-10674 > Project: Flink > Issue Type: Bug > Components: Table API SQL >Affects Versions: 1.6.1 > Environment: Flink 1.6.0 >Reporter: ambition >Assignee: winifredtang >Priority: Minor > Labels: pull-request-available > Attachments: image-2018-10-25-14-46-03-373.png > > > Our online Flink Job run about a week,job contain sql : > {code:java} > select `time`, > lower(trim(os_type)) as os_type, > count(distinct feed_id) as feed_total_view > from my_table > group by `time`, lower(trim(os_type)){code} > > then occur NPE: > > {code:java} > java.lang.NullPointerException > at scala.Predef$.Long2long(Predef.scala:363) > at > org.apache.flink.table.functions.aggfunctions.DistinctAccumulator.remove(DistinctAccumulator.scala:109) > at NonWindowedAggregationHelper$894.retract(Unknown Source) > at > org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.processElement(GroupAggProcessFunction.scala:124) > at > org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.processElement(GroupAggProcessFunction.scala:39) > at > org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.processElement(LegacyKeyedProcessOperator.java:88) > at > org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) > at java.lang.Thread.run(Thread.java:745) > {code} > > > View DistinctAccumulator.remove > !image-2018-10-25-14-46-03-373.png! 
> > this NPE should currentCnt = null lead to, so we simple handle like : > {code:java} > def remove(params: Row): Boolean = { > if(!distinctValueMap.contains(params)){ > true > }else{ > val currentCnt = distinctValueMap.get(params) > // > if (currentCnt == null || currentCnt == 1) { > distinctValueMap.remove(params) > true > } else { > var value = currentCnt - 1L > if(value < 0){ > value = 1 > } > distinctValueMap.put(params, value) > false > } > } > }{code} > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10818) RestartStrategies.fixedDelayRestart Occur NoResourceAvailableException: Not enough free slots available to run the job.
[ https://issues.apache.org/jira/browse/FLINK-10818?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ambition updated FLINK-10818: - Attachment: jobmanager.log

> RestartStrategies.fixedDelayRestart Occur NoResourceAvailableException: Not enough free slots available to run the job.
>
> Key: FLINK-10818
> URL: https://issues.apache.org/jira/browse/FLINK-10818
> Project: Flink
> Issue Type: Bug
> Components: Core
> Affects Versions: 1.6.2
> Environment: JDK 1.8, Flink 1.6.0, Hadoop 2.7.3
> Reporter: ambition
> Priority: Major
> Attachments: image-2018-11-12-11-05-38-159.png, image-2018-11-12-11-06-33-387.png, image-2018-11-12-11-08-13-572.png, jobmanager.log, taskmanager.log
>
> Our production Flink-on-YARN job sets its restart strategy like this:
> {code:java}
> exeEnv.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, 1000L));
> {code}
> After the job had been running for some days, the following exception occurred:
> {code:java}
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Not enough free slots available to run the job. You can decrease the operator parallelism or increase the number of slots per TaskManager in the configuration. Task to schedule: < Attempt #5 (Source: KafkaJsonTableSource -> Map -> where: (AND(OR(=(app_key, _UTF-16LE'C4FAF9CE1569F541'), =(app_key, _UTF-16LE'F5C7F68C7117630B'), =(app_key, _UTF-16LE'57C6FF4B5A064D29')), OR(=(LOWER(TRIM(FLAG(BOTH), _UTF-16LE' ', os_type)), _UTF-16LE'ios'), =(LOWER(TRIM(FLAG(BOTH), _UTF-16LE' ', os_type)), _UTF-16LE'android')), IS NOT NULL(server_id))), select: (MT_Date_Format_Mode(receive_time, _UTF-16LE'MMddHHmm', 10) AS date_p, LOWER(TRIM(FLAG(BOTH), _UTF-16LE' ', os_type)) AS os_type, MT_Date_Format_Mode(receive_time, _UTF-16LE'HHmm', 10) AS date_mm, server_id) (1/6)) @ (unassigned) - [SCHEDULED] with groupID < cbc357ccb763df2852fee8c4fc7d55f2 > in sharing group < 690dbad267a8ff37c8cb5e9dbedd0a6d >. Resources available to scheduler: Number of instances=6, total number of slots=6, available slots=0
>     at org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleTask(Scheduler.java:281)
>     at org.apache.flink.runtime.jobmanager.scheduler.Scheduler.allocateSlot(Scheduler.java:155)
>     at org.apache.flink.runtime.executiongraph.Execution.lambda$allocateAndAssignSlotForExecution$2(Execution.java:491)
>     at org.apache.flink.runtime.executiongraph.Execution$$Lambda$44/1664178385.apply(Unknown Source)
>     at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:981)
>     at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2116)
>     at org.apache.flink.runtime.executiongraph.Execution.allocateAndAssignSlotForExecution(Execution.java:489)
>     at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.allocateResourcesForAll(ExecutionJobVertex.java:521)
>     at org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleEager(ExecutionGraph.java:945)
>     at org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleForExecution(ExecutionGraph.java:875)
>     at org.apache.flink.runtime.executiongraph.ExecutionGraph.restart(ExecutionGraph.java:1262)
>     at org.apache.flink.runtime.executiongraph.restart.ExecutionGraphRestartCallback.triggerFullRecovery(ExecutionGraphRestartCallback.java:59)
>     at org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy$1.run(FixedDelayRestartStrategy.java:68)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> {code}
>
> This exception happened when the job started. Related issue: https://issues.apache.org/jira/browse/FLINK-4486
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
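The exception text itself names the two remedies: lower the operator parallelism or raise the number of slots per TaskManager. As a rough sketch, the relevant `flink-conf.yaml` keys look like this — the values are illustrative for the 6-instance cluster above, not a recommendation:

```yaml
# flink-conf.yaml -- illustrative values only.
# 6 TaskManagers x 4 slots = 24 total slots; a job with default
# parallelism 6 then leaves free slots available during restarts.
taskmanager.numberOfTaskSlots: 4
parallelism.default: 6
```

With only 6 slots in total and all of them occupied (available slots=0), any restart attempt that needs a slot before the old ones are released will fail exactly as reported.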
[jira] [Updated] (FLINK-10818) RestartStrategies.fixedDelayRestart Occur NoResourceAvailableException: Not enough free slots available to run the job.
[ https://issues.apache.org/jira/browse/FLINK-10818?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ambition updated FLINK-10818: - Attachment: taskmanager.log

> RestartStrategies.fixedDelayRestart Occur NoResourceAvailableException: Not enough free slots available to run the job.
[jira] [Commented] (FLINK-10827) Add test for duplicate() to SerializerTestBase
[ https://issues.apache.org/jira/browse/FLINK-10827?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16683604#comment-16683604 ] ASF GitHub Bot commented on FLINK-10827: StefanRRichter commented on a change in pull request #7061: [FLINK-10827][tests] Add test for duplicate() to SerializerTestBase URL: https://github.com/apache/flink/pull/7061#discussion_r232623087 ## File path: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java ## @@ -181,11 +181,13 @@ public boolean isImmutableType() { } } - if (stateful) { - return new PojoSerializer(clazz, duplicateFieldSerializers, fields, executionConfig); - } else { - return this; + if (!stateful) { + // as a small memory optimization, we can share the same object between instances + duplicateFieldSerializers = fieldSerializers; } + + // we must create a new instance, otherwise the subclassSerializerCache can create concurrency problems Review comment: I think that we always must create a new instance, no matter what the test says, because for example the cache map is a state that is shared but can be modified at runtime. As we always create a new object for this reason, I also think that `registeredSerializers` is handled automatically from the constructor that we use and therefore is not a problem. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Add test for duplicate() to SerializerTestBase
>
> Key: FLINK-10827
> URL: https://issues.apache.org/jira/browse/FLINK-10827
> Project: Flink
> Issue Type: Test
> Components: Tests
> Reporter: Stefan Richter
> Assignee: Stefan Richter
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.7.0
>
> In the past, we have had many bugs from type serializers that did not implement the {{duplicate()}} method properly. A very common error is forgetting to create a deep copy of some fields, which can lead to concurrency problems in the backend.
> We should add a test case that uses duplicated serializers from different threads to expose such concurrency problems.
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
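The failure mode discussed in this review can be shown without Flink at all. The sketch below uses hypothetical names (`CachingSerializer`, `subclassCache`) to stand in for `PojoSerializer` and its subclass-serializer cache; it only illustrates why `duplicate()` should return a fresh instance whenever the serializer carries mutable runtime state, rather than returning `this`:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a stateful serializer such as PojoSerializer.
class CachingSerializer {
    // Mutable runtime state; sharing this map across threads is unsafe
    // because one thread may modify it while another reads it.
    final Map<Class<?>, String> subclassCache = new HashMap<>();

    // Correct duplicate(): always create a new instance so every thread
    // gets its own cache, instead of returning `this`.
    CachingSerializer duplicate() {
        return new CachingSerializer();
    }
}

public class DuplicateDemo {
    public static void main(String[] args) {
        CachingSerializer original = new CachingSerializer();
        CachingSerializer copy = original.duplicate();
        // The duplicate must be a distinct object with its own cache.
        System.out.println(copy != original);                              // true
        System.out.println(copy.subclassCache != original.subclassCache);  // true
    }
}
```

This mirrors the reviewer's point: even when the field serializers are stateless, the cache map is shared mutable state, so a new instance must always be created.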
[GitHub] StefanRRichter commented on a change in pull request #7061: [FLINK-10827][tests] Add test for duplicate() to SerializerTestBase
StefanRRichter commented on a change in pull request #7061: [FLINK-10827][tests] Add test for duplicate() to SerializerTestBase URL: https://github.com/apache/flink/pull/7061#discussion_r232623842 ## File path: flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java ##
@@ -526,6 +561,55 @@ public void write(DataInputView source, int numBytes) throws IOException {
 	}
 }

+/**
+ * Runner to test serializer duplication via concurrency.
+ * @param <T> type of the test elements.
+ */
+static class SerializerRunner<T> extends Thread {
+	final OneShotLatch startLatch;
+	final TypeSerializer<T> serializer;
+	final T[] testData;
+	final int iterations;
+	Exception failure;
+
+	SerializerRunner(OneShotLatch startLatch, TypeSerializer<T> serializer, T[] testData, int iterations) {
+		this.startLatch = startLatch;
+		this.serializer = serializer;
+		this.testData = testData;
+		this.iterations = iterations;
+		this.failure = null;
+	}
+
+	@Override
+	public void run() {
+		DataInputDeserializer dataInputDeserializer = new DataInputDeserializer();
+		DataOutputSerializer dataOutputSerializer = new DataOutputSerializer(128);
+		try {
+			startLatch.await();
+			for (int repeat = 0; repeat < iterations; ++repeat) {
+				for (T testItem : testData) {
+					serializer.serialize(testItem, dataOutputSerializer);
+					dataInputDeserializer.setBuffer(
+						dataOutputSerializer.getSharedBuffer(),
+						0,
+						dataOutputSerializer.length());
+					T serdeTestItem = serializer.deserialize(dataInputDeserializer);
+					T copySerdeTestItem = serializer.copy(serdeTestItem);
+					dataOutputSerializer.clear();
+					Assert.assertEquals(testItem, copySerdeTestItem);
+				}
+			}
+		} catch (Exception ex) {
Review comment: I feel like we better just transform the assert into a `checkState` that just throws an exception. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
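The reviewer's suggestion — throw from the worker thread instead of calling `Assert.assertEquals`, so the failure travels through the runner's `failure` field — could look like the following sketch. The `checkState` helper is modeled on Flink's/Guava's `Preconditions.checkState` but is defined locally here to stay self-contained:

```java
public class CheckStateDemo {
    // Local stand-in for Preconditions.checkState: throws instead of
    // relying on a JUnit assertion inside a worker thread.
    static void checkState(boolean condition, String message) {
        if (!condition) {
            throw new IllegalStateException(message);
        }
    }

    public static void main(String[] args) {
        String testItem = "a";
        String copySerdeTestItem = "a";
        // In the worker thread, instead of Assert.assertEquals(...):
        checkState(testItem.equals(copySerdeTestItem),
                "serialization round trip changed the test element");
        System.out.println("round trip ok");
    }
}
```

The advantage over an in-thread JUnit assert is that the thrown exception can be caught in the runner's `catch` block, stored in `failure`, and re-reported on the main test thread.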
[GitHub] twalthr commented on a change in pull request #7070: [FLINK-10625] Documentation for MATCH_RECOGNIZE clause
twalthr commented on a change in pull request #7070: [FLINK-10625] Documentation for MATCH_RECOGNIZE clause URL: https://github.com/apache/flink/pull/7070#discussion_r232617404 ## File path: docs/dev/table/streaming/match_recognize.md ## @@ -0,0 +1,654 @@ +--- +title: 'Detecting event patterns Experimental' +nav-parent_id: streaming_tableapi +nav-title: 'Detecting event patterns' +nav-pos: 5 +--- + + +It is a common use-case to search for set event patterns, especially in case of data streams. Apache Flink +comes with [CEP library]({{ site.baseurl }}/dev/libs/cep.html) which allows for pattern detection in event streams. On the other hand Flink's +Table API & SQL provides a relational way to express queries that comes with multiple functions and +optimizations that can be used out of the box. In December 2016, ISO released a new version of the +international SQL standard (ISO/IEC 9075:2016) including the Row Pattern Recognition for complex event processing, +which allowed to consolidate those two APIs using MATCH_RECOGNIZE clause. + +* This will be replaced by the TOC +{:toc} + +Example query +- + +Row Pattern Recognition in SQL is performed using the MATCH_RECOGNIZE clause. MATCH_RECOGNIZE enables you to do the following tasks: +* Logically partition and order the data that is used in the MATCH_RECOGNIZE clause with its PARTITION BY and ORDER BY clauses. +* Define patterns of rows to seek using the PATTERN clause of the MATCH_RECOGNIZE clause. + These patterns use regular expression syntax, a powerful and expressive feature, applied to the pattern variables you define. +* Specify the logical conditions required to map a row to a row pattern variable in the DEFINE clause. +* Define measures, which are expressions usable in other parts of the SQL query, in the MEASURES clause. 
+ +For example to find periods of constantly decreasing price of a Ticker one could write a query like this: + +{% highlight sql %} +SELECT * +FROM Ticker +MATCH_RECOGNIZE ( +PARTITION BY symbol +ORDER BY rowtime +MEASURES + STRT_ROW.rowtime AS start_tstamp, + LAST(PRICE_DOWN.rowtime) AS bottom_tstamp, + LAST(PRICE_UP.rowtime) AS end_tstamp +ONE ROW PER MATCH +AFTER MATCH SKIP TO LAST UP +PATTERN (STRT_ROW PRICE_DOWN+ PRICE_UP+) +DEFINE + PRICE_DOWN AS PRICE_DOWN.price < LAST(PRICE_DOWN.price, 1) OR + (LAST(PRICE_DOWN.price, 1) IS NULL AND PRICE_DOWN.price < STRT_ROW.price)) + PRICE_UP AS PRICE_UP.price > LAST(PRICE_UP.price, 1) OR LAST(PRICE_UP.price, 1) IS NULL +) MR; +{% endhighlight %} + +This query given following input data: + +{% highlight text %} +SYMBOL ROWTIME PRICE +== === +'ACME' '01-Apr-11 10:00:00' 12 +'ACME' '01-Apr-11 10:00:01' 17 +'ACME' '01-Apr-11 10:00:02' 19 +'ACME' '01-Apr-11 10:00:03' 21 +'ACME' '01-Apr-11 10:00:04' 25 +'ACME' '01-Apr-11 10:00:05' 12 +'ACME' '01-Apr-11 10:00:06' 15 +'ACME' '01-Apr-11 10:00:07' 20 +'ACME' '01-Apr-11 10:00:08' 24 +'ACME' '01-Apr-11 10:00:09' 25 +'ACME' '01-Apr-11 10:00:10' 19 +{% endhighlight %} + +will produce a summary row for each found period in which the price was constantly decreasing. + +{% highlight text %} +SYMBOL START_TST BOTTOM_TS END_TSTAM += == == == +ACME 01-APR-11 10:00:04 01-APR-11 10:00:05 01-APR-11 10:00:09 +{% endhighlight %} + +The aforementioned query consists of following clauses: + +* [PARTITION BY](#partitioning) - defines logical partitioning of the stream, similar to `GROUP BY` operations. +* [ORDER BY](#order-of-events) - specifies how should the incoming events be order, this is essential as patterns define order. 
+* [MEASURES](#define--measures) - defines output of the clause, similar to `SELECT` clause +* [ONE ROW PER MATCH](#output-mode) - output mode which defines how many rows per match will be produced +* [AFTER MATCH SKIP](#after-match-skip) - allows to specify where next match should start, this is also a way to control to how many distinct matches a single event can belong +* [PATTERN](#defining-pattern) - clause that allows constructing patterns that will be searched for, pro +* [DEFINE](#define--measures) - this section defines conditions on events that should be met in order to be qualified to corresponding pattern variable + + +Installation guide +-- + +Match recognize uses Apache Flink's CEP library internally. In order to be able to use this clause one has to add +this library as dependency. Either by adding it to your uber-jar by adding dependency on: + +{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-cep{{ site.scala_version_suffix }}</artifactId>
+  <version>{{ site.version }}</version>
+</dependency>
+{% endhighlight %} + +or by adding it to the cluster classpath (see [here]({{ site.baseurl}}/dev/linking.html)). If you want to use
[GitHub] twalthr commented on a change in pull request #7070: [FLINK-10625] Documentation for MATCH_RECOGNIZE clause
twalthr commented on a change in pull request #7070: [FLINK-10625] Documentation for MATCH_RECOGNIZE clause URL: https://github.com/apache/flink/pull/7070#discussion_r232601742 ## File path: docs/dev/table/streaming/match_recognize.md ##
[GitHub] twalthr commented on a change in pull request #7070: [FLINK-10625] Documentation for MATCH_RECOGNIZE clause
twalthr commented on a change in pull request #7070: [FLINK-10625] Documentation for MATCH_RECOGNIZE clause URL: https://github.com/apache/flink/pull/7070#discussion_r232619371 ## File path: docs/dev/table/streaming/match_recognize.md ##
[GitHub] twalthr commented on a change in pull request #7070: [FLINK-10625] Documentation for MATCH_RECOGNIZE clause
twalthr commented on a change in pull request #7070: [FLINK-10625] Documentation for MATCH_RECOGNIZE clause URL: https://github.com/apache/flink/pull/7070#discussion_r232595246 ## File path: docs/dev/table/sql.md ## @@ -756,6 +796,51 @@ Group windows are defined in the `GROUP BY` clause of a SQL query. Just like que +### Match_recognize + + + + + + Operation + Description + + + + + +MATCH_RECOGNIZE +Streaming + + +Search for given event pattern in an incoming stream. For more though description see Detecting event patterns Review comment: "Searches for a given pattern in a streaming table according to the `MATCH_RECOGNIZE` standard [link]." "For a more detailed description see..." This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] twalthr commented on a change in pull request #7070: [FLINK-10625] Documentation for MATCH_RECOGNIZE clause
twalthr commented on a change in pull request #7070: [FLINK-10625] Documentation for MATCH_RECOGNIZE clause URL: https://github.com/apache/flink/pull/7070#discussion_r232604261 ## File path: docs/dev/table/streaming/match_recognize.md ##
+* [PATTERN](#defining-pattern) - clause that allows constructing patterns that will be searched for, pro
Review comment: `pro`?
[GitHub] twalthr commented on a change in pull request #7070: [FLINK-10625] Documentation for MATCH_RECOGNIZE clause
twalthr commented on a change in pull request #7070: [FLINK-10625] Documentation for MATCH_RECOGNIZE clause URL: https://github.com/apache/flink/pull/7070#discussion_r232593513 ## File path: docs/dev/table/sql.md ## @@ -756,6 +796,51 @@ Group windows are defined in the `GROUP BY` clause of a SQL query. Just like que +### Match_recognize Review comment: `Match_recognize` -> `Pattern Recognition` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] twalthr commented on a change in pull request #7070: [FLINK-10625] Documentation for MATCH_RECOGNIZE clause
twalthr commented on a change in pull request #7070: [FLINK-10625] Documentation for MATCH_RECOGNIZE clause URL: https://github.com/apache/flink/pull/7070#discussion_r232600187 ## File path: docs/dev/table/streaming/match_recognize.md ##
+This query given following input data:
Review comment: Introduce the input data first and explain each column. Esp. that rowtime is a time attribute. Then show the query with explanation. Then the output data with explanation.
[GitHub] twalthr commented on a change in pull request #7070: [FLINK-10625] Documentation for MATCH_RECOGNIZE clause
twalthr commented on a change in pull request #7070: [FLINK-10625] Documentation for MATCH_RECOGNIZE clause URL: https://github.com/apache/flink/pull/7070#discussion_r232603954 ## File path: docs/dev/table/streaming/match_recognize.md ## Review comment: Also mention that MATCH_RECOGNIZE can only be applied to an append table with time attributes and produces an append table.
[GitHub] twalthr commented on a change in pull request #7070: [FLINK-10625] Documentation for MATCH_RECOGNIZE clause
twalthr commented on a change in pull request #7070: [FLINK-10625] Documentation for MATCH_RECOGNIZE clause URL: https://github.com/apache/flink/pull/7070#discussion_r232597810 ## File path: docs/dev/table/streaming/match_recognize.md ## Review comment: Use backticks throughout the document to mark clauses.
[GitHub] twalthr commented on a change in pull request #7070: [FLINK-10625] Documentation for MATCH_RECOGNIZE clause
twalthr commented on a change in pull request #7070: [FLINK-10625] Documentation for MATCH_RECOGNIZE clause URL: https://github.com/apache/flink/pull/7070#discussion_r232598253 ## File path: docs/dev/table/streaming/match_recognize.md ## +* Logically partition and order the data that is used in the MATCH_RECOGNIZE clause with its PARTITION BY and ORDER BY clauses. +* Define patterns of rows to seek using the PATTERN clause of the MATCH_RECOGNIZE clause. Review comment: remove "of/in the MATCH_RECOGNIZE clause" for all list items
[GitHub] twalthr commented on a change in pull request #7070: [FLINK-10625] Documentation for MATCH_RECOGNIZE clause
twalthr commented on a change in pull request #7070: [FLINK-10625] Documentation for MATCH_RECOGNIZE clause URL: https://github.com/apache/flink/pull/7070#discussion_r232596586 ## File path: docs/dev/table/streaming/match_recognize.md ## @@ -0,0 +1,654 @@ +--- +title: 'Detecting event patterns Experimental' +nav-parent_id: streaming_tableapi +nav-title: 'Detecting event patterns' Review comment: Should we just call it `Detecting Patterns`? Because there are no events in the table world but only rows that are appended to a table.
[GitHub] twalthr commented on a change in pull request #7070: [FLINK-10625] Documentation for MATCH_RECOGNIZE clause
twalthr commented on a change in pull request #7070: [FLINK-10625] Documentation for MATCH_RECOGNIZE clause URL: https://github.com/apache/flink/pull/7070#discussion_r232601307 ## File path: docs/dev/table/streaming/match_recognize.md ##
+ +For example, to find periods of constantly decreasing price of a ticker, one could write a query like this: + +{% highlight sql %} +SELECT * +FROM Ticker +MATCH_RECOGNIZE ( +PARTITION BY symbol +ORDER BY rowtime +MEASURES + STRT_ROW.rowtime AS start_tstamp, + LAST(PRICE_DOWN.rowtime) AS bottom_tstamp, + LAST(PRICE_UP.rowtime) AS end_tstamp +ONE ROW PER MATCH +AFTER MATCH SKIP TO LAST UP +PATTERN (STRT_ROW PRICE_DOWN+ PRICE_UP+) +DEFINE + PRICE_DOWN AS PRICE_DOWN.price < LAST(PRICE_DOWN.price, 1) OR + (LAST(PRICE_DOWN.price, 1) IS NULL AND PRICE_DOWN.price < STRT_ROW.price), + PRICE_UP AS PRICE_UP.price > LAST(PRICE_UP.price, 1) OR LAST(PRICE_UP.price, 1) IS NULL +) MR; +{% endhighlight %} + +This query, given the following input data: + +{% highlight text %} +SYMBOL ROWTIME PRICE +====== ==================== ===== +'ACME' '01-Apr-11 10:00:00' 12 +'ACME' '01-Apr-11 10:00:01' 17 +'ACME' '01-Apr-11 10:00:02' 19 +'ACME' '01-Apr-11 10:00:03' 21 +'ACME' '01-Apr-11 10:00:04' 25 +'ACME' '01-Apr-11 10:00:05' 12 +'ACME' '01-Apr-11 10:00:06' 15 +'ACME' '01-Apr-11 10:00:07' 20 +'ACME' '01-Apr-11 10:00:08' 24 +'ACME' '01-Apr-11 10:00:09' 25 +'ACME' '01-Apr-11 10:00:10' 19 +{% endhighlight %} + +will produce a summary row for each found period in which the price was constantly decreasing. + +{% highlight text %} +SYMBOL START_TST BOTTOM_TS END_TSTAM +====== ================== ================== ================== +ACME 01-APR-11 10:00:04 01-APR-11 10:00:05 01-APR-11 10:00:09 +{% endhighlight %} + +The aforementioned query consists of the following clauses: + +* [PARTITION BY](#partitioning) - defines the logical partitioning of the stream, similar to `GROUP BY` operations. +* [ORDER BY](#order-of-events) - specifies how the incoming events should be ordered; this is essential as patterns depend on order. +* [MEASURES](#define--measures) - defines the output of the clause, similar to a `SELECT` clause. +* [ONE ROW PER MATCH](#output-mode) - output mode which defines how many rows per match will be produced. +* [AFTER MATCH SKIP](#after-match-skip) - specifies where the next match should start; this is also a way to control how many distinct matches a single event can belong to. +* [PATTERN](#defining-pattern) - clause that allows constructing patterns that will be searched for. +* [DEFINE](#define--measures) - this section defines the conditions that rows must meet in order to be mapped to the corresponding pattern variable. + + +Installation guide +-- + +MATCH_RECOGNIZE uses Apache Flink's CEP library internally. In order to use this clause, the library has to be added as a dependency, either by including it in your uber-jar: + +{% highlight xml %} +<dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-cep{{ site.scala_version_suffix }}</artifactId> + <version>{{ site.version }}</version> +</dependency> +{% endhighlight %} + +or by adding it to the cluster classpath (see [here]({{ site.baseurl }}/dev/linking.html)). If you want to use
[jira] [Commented] (FLINK-10731) Support AVG on Date fields
[ https://issues.apache.org/jira/browse/FLINK-10731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16683607#comment-16683607 ] Flavio Pompermaier commented on FLINK-10731: Fine for me [~twalthr] > Support AVG on Date fields > -- > > Key: FLINK-10731 > URL: https://issues.apache.org/jira/browse/FLINK-10731 > Project: Flink > Issue Type: Improvement > Components: Table API SQL >Affects Versions: 1.6.2 >Reporter: Flavio Pompermaier >Priority: Minor > > AVG function does not work on date fields right now -- This message was sent by Atlassian JIRA (v7.6.3#76005)
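Until `AVG` supports DATE fields directly, a common workaround is to average a numeric representation of the dates and convert the mean back. A hedged Python sketch of the idea (the helper name is illustrative, not a Flink API; in SQL the equivalent is casting the date to a number before applying `AVG`):

```python
from datetime import date

def avg_date(dates):
    """Average DATE values by averaging their ordinal day numbers and
    converting the rounded mean back to a date."""
    ordinals = [d.toordinal() for d in dates]
    return date.fromordinal(round(sum(ordinals) / len(ordinals)))

avg_date([date(2018, 1, 1), date(2018, 1, 3)])  # -> date(2018, 1, 2)
```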
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16683691#comment-16683691 ] ASF GitHub Bot commented on FLINK-9311: --- nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r232636055 ## File path: flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/SubscriberWrapperTest.java ## @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.pubsub; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.connectors.pubsub.common.PubSubSubscriberFactory; +import org.apache.flink.streaming.connectors.pubsub.common.SerializableCredentialsProvider; + +import com.google.api.core.ApiService; +import com.google.api.gax.core.CredentialsProvider; +import com.google.cloud.pubsub.v1.AckReplyConsumer; +import com.google.cloud.pubsub.v1.MessageReceiver; +import com.google.cloud.pubsub.v1.Subscriber; +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.ProjectSubscriptionName; +import com.google.pubsub.v1.PubsubMessage; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +import static org.apache.flink.api.java.ClosureCleaner.ensureSerializable; +import static org.apache.flink.streaming.connectors.pubsub.common.SerializableCredentialsProvider.withoutCredentials; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; +import static org.mockito.Mockito.when; + +/** + * Tests for {@link SubscriberWrapper}. 
+ */ +@RunWith(MockitoJUnitRunner.class) +public class SubscriberWrapperTest { + @Mock + private PubSubSubscriberFactory pubSubSubscriberFactory; + + @Mock + private Subscriber subscriber; + + @Mock + private ApiService apiService; + + private PubsubMessage pubsubMessage = pubSubMessage(); + + @Mock + private AckReplyConsumer ackReplyConsumer; + + private SubscriberWrapper subscriberWrapper; + + @Before + public void setup() throws Exception { + when(pubSubSubscriberFactory.getSubscriber(any(), any(), any())).thenReturn(subscriber); + subscriberWrapper = new SubscriberWrapper(withoutCredentials(), ProjectSubscriptionName.of("projectId", "subscriptionId"), pubSubSubscriberFactory); + } + + @Test + public void testSerializedSubscriberBuilder() throws Exception { + SubscriberWrapper subscriberWrapper = new SubscriberWrapper(withoutCredentials(), ProjectSubscriptionName.of("projectId", "subscriptionId"), SubscriberWrapperTest::subscriberFactory); + ensureSerializable(subscriberWrapper); + } + + @Test + public void testInitialisation() { + SerializableCredentialsProvider credentialsProvider = withoutCredentials(); + ProjectSubscriptionName projectSubscriptionName = ProjectSubscriptionName.of("projectId", "subscriptionId"); + SubscriberWrapper subscriberWrapper = new SubscriberWrapper(credentialsProvider, projectSubscriptionName, pubSubSubscriberFactory); + + subscriberWrapper.initialize(); + verify(pubSubSubscriberFactory, times(1)).getSubscriber(credentialsProvider, projectSubscriptionName, subscriberWrapper); + } + + @Test + public void testStart() { + when(subscriber.startAsync()).thenReturn(apiService); + subscriberWrapper.initialize(); + + subscriberWrapper.start(); + verify(apiService, times(1)).awaitRunning(); +
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16683693#comment-16683693 ] ASF GitHub Bot commented on FLINK-9311: --- nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r232636103 ## File path: docs/dev/connectors/pubsub.md ## @@ -0,0 +1,106 @@ +--- +title: "Google PubSub" +nav-title: PubSub +nav-parent_id: connectors +nav-pos: 7 +--- + + +This connector provides a Source and Sink that can read from and write to +[Google PubSub](https://cloud.google.com/pubsub). To use this connector, add the +following dependency to your project: + +{% highlight xml %} +<dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-pubsub{{ site.scala_version_suffix }}</artifactId> + <version>{{ site.version }}</version> +</dependency> +{% endhighlight %} + +Note that the streaming connectors are currently not part of the binary +distribution. See +[here]({{site.baseurl}}/dev/linking.html) +for information about how to package the program with the libraries for +cluster execution. + +### PubSub Source + +The connector provides a Source for reading data from Google PubSub to Apache Flink. PubSub has an Atleast-Once guarantee and as such. Review comment: Fixed This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > PubSub connector > > > Key: FLINK-9311 > URL: https://issues.apache.org/jira/browse/FLINK-9311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Richard Deurwaarder >Assignee: Niels Basjes >Priority: Minor > Labels: pull-request-available > > I would like to start adding some google cloud connectors starting with a PubSub > Source.
I have a basic implementation ready but I want it to be able to: > * easily scale up (should I have it extend RichParallelSourceFunction for > this?) > * Make it easier to provide the google cloud credentials. This would require > being able to send some json string / ServiceAccount to the nodes when > starting up this source. > Could this be something that would be useful for others and added to the > flink connectors repo? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
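The at-least-once guarantee discussed for this PubSub source boils down to "acknowledge only after successful processing". A minimal sketch of that contract (the `pull`, `process`, and `ack` callables are in-memory placeholders, not the real Google Cloud Pub/Sub client API):

```python
def consume_at_least_once(pull, process, ack):
    """Process messages with an at-least-once guarantee: a message id is
    acknowledged only after `process` returns successfully. If the worker
    crashes between `process` and `ack`, the broker redelivers the message,
    so duplicates are possible but no message is lost."""
    for msg_id, payload in pull():
        process(payload)   # may raise; unacked messages get redelivered
        ack(msg_id)        # acknowledge only after successful processing

# Toy run with in-memory stand-ins for the broker:
seen, acked = [], []
consume_at_least_once(lambda: [(1, "a"), (2, "b")], seen.append, acked.append)
# seen == ["a", "b"], acked == [1, 2]
```

Checkpointing in the actual connector would defer the ack until the enclosing Flink checkpoint completes; the ordering constraint (process, then ack) is the same.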