[jira] [Assigned] (FLINK-10500) Let ExecutionGraphDriver react to fail signal
[ https://issues.apache.org/jira/browse/FLINK-10500?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shimin Yang reassigned FLINK-10500: --- Assignee: Shimin Yang > Let ExecutionGraphDriver react to fail signal > - > > Key: FLINK-10500 > URL: https://issues.apache.org/jira/browse/FLINK-10500 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Affects Versions: 1.7.0 >Reporter: Till Rohrmann >Assignee: Shimin Yang >Priority: Major > Fix For: 1.7.0 > > > In order to scale down when there are not enough resources available or if > TMs died, the {{ExecutionGraphDriver}} needs to learn about a failure. > Depending on the failure type and the available set of resources, it can then > decide to scale the job down or simply restart. In the scope of this issue, > the {{ExecutionGraphDriver}} should simply call into the {{RestartStrategy}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
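The scope of this issue (on a failure, simply call into the restart strategy) can be pictured with a minimal Java sketch. `ToyExecutionGraphDriver`, `ToyRestartStrategy` and `FixedAttemptsRestartStrategy` are hypothetical names for illustration only, not Flink's actual classes or signatures. The scale-down decision mentioned in the issue is deliberately left out.

```java
// Hypothetical restart-strategy contract; Flink's real RestartStrategy differs.
interface ToyRestartStrategy {
    boolean canRestart();
    void restart(Runnable restartAction);
}

// Allows a fixed number of restarts, then gives up.
final class FixedAttemptsRestartStrategy implements ToyRestartStrategy {
    private final int maxAttempts;
    private int attempts;

    FixedAttemptsRestartStrategy(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    @Override
    public boolean canRestart() {
        return attempts < maxAttempts;
    }

    @Override
    public void restart(Runnable restartAction) {
        attempts++;
        restartAction.run();
    }
}

// Hypothetical driver: it only learns about failures and defers to the strategy.
// A later version could inspect the failure type and the available resources
// to choose between scaling down and restarting.
final class ToyExecutionGraphDriver {
    private final ToyRestartStrategy restartStrategy;
    private int restarts;
    private Throwable terminalFailure;

    ToyExecutionGraphDriver(ToyRestartStrategy restartStrategy) {
        this.restartStrategy = restartStrategy;
    }

    void onFailure(Throwable cause) {
        if (terminalFailure != null) {
            return; // the job has already failed terminally
        }
        if (restartStrategy.canRestart()) {
            restartStrategy.restart(() -> restarts++);
        } else {
            terminalFailure = cause;
        }
    }

    int restarts() {
        return restarts;
    }

    Throwable terminalFailure() {
        return terminalFailure;
    }
}
```

With two allowed attempts, the first two failures trigger restarts and the third one becomes terminal.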
[jira] [Commented] (FLINK-10618) Introduce catalog for Flink tables
[ https://issues.apache.org/jira/browse/FLINK-10618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16661688#comment-16661688 ] Shimin Yang commented on FLINK-10618: - Sounds good. Do you have a draft of the design doc? > Introduce catalog for Flink tables > -- > > Key: FLINK-10618 > URL: https://issues.apache.org/jira/browse/FLINK-10618 > Project: Flink > Issue Type: New Feature > Components: SQL Client >Affects Versions: 1.6.1 >Reporter: Xuefu Zhang >Assignee: Xuefu Zhang >Priority: Major > > Besides meta objects such as tables that may come from an > {{ExternalCatalog}}, Flink also deals with tables/views/functions that are > created on the fly (in memory) or specified in a configuration file. Those > objects don't belong to any {{ExternalCatalog}}; Flink either stores them > in memory, where they are not persisted, or recreates them from a file, which is > a big pain for the user. Those objects are only known to Flink, yet Flink > manages them poorly. > Since they are typical objects in a database catalog, it's natural to have a > catalog that manages them. The interface will be similar to > {{ExternalCatalog}}, which contains meta objects that are not managed by > Flink. There are several possible implementations of the Flink internal > catalog interface: memory, file, external registry (such as the Confluent Schema > Registry or the Hive metastore), relational database, etc. > The initial functionality as well as the catalog hierarchy could be very > simple. The basic functionality of the catalog will mostly be to create, alter, > and drop tables, views, functions, etc. Obviously, this can evolve over > time. > We plan to provide implementations backed by memory, files, and the Hive metastore, which > will be plugged in at the SQL Client layer. > Please provide your feedback.
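The create/alter/drop functionality proposed for the catalog, together with its simplest (in-memory) implementation, might look roughly like the following Java sketch. `ToyCatalog` and `InMemoryToyCatalog` are hypothetical names, not the interface that the proposal eventually defines. Schemas are plain strings for brevity, and only tables are covered: views and functions would follow the same pattern.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical catalog contract: create, alter and drop for tables only.
interface ToyCatalog {
    void createTable(String name, String schema, boolean ignoreIfExists);
    void alterTable(String name, String newSchema);
    void dropTable(String name, boolean ignoreIfNotExists);
    Optional<String> getTable(String name);
}

// Memory-backed implementation: non-persistent, matching the "memory" option in the proposal.
final class InMemoryToyCatalog implements ToyCatalog {
    private final Map<String, String> tables = new HashMap<>();

    @Override
    public void createTable(String name, String schema, boolean ignoreIfExists) {
        if (tables.containsKey(name)) {
            if (ignoreIfExists) {
                return;
            }
            throw new IllegalStateException("Table already exists: " + name);
        }
        tables.put(name, schema);
    }

    @Override
    public void alterTable(String name, String newSchema) {
        if (!tables.containsKey(name)) {
            throw new IllegalStateException("Table does not exist: " + name);
        }
        tables.put(name, newSchema);
    }

    @Override
    public void dropTable(String name, boolean ignoreIfNotExists) {
        if (tables.remove(name) == null && !ignoreIfNotExists) {
            throw new IllegalStateException("Table does not exist: " + name);
        }
    }

    @Override
    public Optional<String> getTable(String name) {
        return Optional.ofNullable(tables.get(name));
    }
}
```

A file-backed or Hive-metastore-backed variant would implement the same interface and differ only in where the entries live.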
[jira] [Created] (FLINK-10659) get rid of splitting by comma for ProgramArgsQueryParameter
Kan created FLINK-10659: --- Summary: get rid of splitting by comma for ProgramArgsQueryParameter Key: FLINK-10659 URL: https://issues.apache.org/jira/browse/FLINK-10659 Project: Flink Issue Type: Improvement Components: REST Environment: I encountered this issue on Flink 1.6. Reporter: Kan The Flink REST handler splits the query parameter program-args by comma, so a value that contains commas becomes several values, and the handler then reports the error "Expected only one value". The same happens with the other query parameters. Should we remove the comma splitting from the corresponding XxxQueryParameter classes?
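The failure described in the issue can be reproduced in a few lines of Java. `ProgramArgsParsing` and its methods are hypothetical and are not Flink's `ProgramArgsQueryParameter`. The error message only mirrors the one quoted in the issue. The sketch compares splitting the raw value on commas with keeping it whole, as proposed.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class ProgramArgsParsing {
    // Behavior described in the issue: the raw value is split on commas.
    static List<String> splitOnComma(String rawValue) {
        return Arrays.asList(rawValue.split(","));
    }

    // Proposed behavior: keep the raw value as a single element.
    static List<String> keepWhole(String rawValue) {
        return Collections.singletonList(rawValue);
    }

    // A parameter that accepts exactly one value rejects anything else.
    static String requireSingleValue(List<String> values) {
        if (values.size() != 1) {
            throw new IllegalArgumentException("Expected only one value " + values);
        }
        return values.get(0);
    }
}
```

For example, `--input hdfs:///a,hdfs:///b --parallelism 4` becomes two values after splitting and is rejected, while the unsplit value passes through unchanged.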
[jira] [Commented] (FLINK-10658) org.apache.flink.util.FlinkException: Releasing shared slot parent.
[ https://issues.apache.org/jira/browse/FLINK-10658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16661682#comment-16661682 ] vinoyang commented on FLINK-10658: -- Releasing slots is usually triggered by the Flink framework and does not require application-level intervention. Therefore, if you see this exception, it may indicate unexpected behavior (perhaps a bug). > org.apache.flink.util.FlinkException: Releasing shared slot parent. > --- > > Key: FLINK-10658 > URL: https://issues.apache.org/jira/browse/FLINK-10658 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.5.4 >Reporter: chauncy >Priority: Major > > i don't when throw the exception who tell me ? thanks
[jira] [Commented] (FLINK-10658) org.apache.flink.util.FlinkException: Releasing shared slot parent.
[ https://issues.apache.org/jira/browse/FLINK-10658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16661676#comment-16661676 ] chauncy commented on FLINK-10658: - [~yanghua] I'm confused: how are those child slots released? In my application code or by the Flink framework?
[jira] [Commented] (FLINK-10658) org.apache.flink.util.FlinkException: Releasing shared slot parent.
[ https://issues.apache.org/jira/browse/FLINK-10658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16661669#comment-16661669 ] vinoyang commented on FLINK-10658: -- Hi [~chauncy], when the SharedSlot is being released, this exception is generated if it still has child slots that have not been released.
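The parent/child relationship behind this exception can be modeled in a simplified Java sketch. `ToySharedSlot` and `ToyChildSlot` are hypothetical classes, not Flink's slot-sharing implementation, and a plain `Exception` stands in for `FlinkException`. A child that its task has already returned is unaffected. A child that is still allocated when the parent is released fails with the "Releasing shared slot parent." cause, which wraps the original reason.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, heavily simplified model of a shared slot (not Flink's classes).
final class ToySharedSlot {
    private final List<ToyChildSlot> children = new ArrayList<>();

    ToyChildSlot allocateChild() {
        ToyChildSlot child = new ToyChildSlot(this);
        children.add(child);
        return child;
    }

    void removeChild(ToyChildSlot child) {
        children.remove(child);
    }

    // Releasing the parent fails every child that is still allocated,
    // wrapping the original cause.
    void release(Exception cause) {
        for (ToyChildSlot child : new ArrayList<>(children)) {
            child.releaseInternal(new Exception("Releasing shared slot parent.", cause));
        }
        children.clear();
    }
}

final class ToyChildSlot {
    private final ToySharedSlot parent;
    private boolean released;
    private Exception releaseCause;

    ToyChildSlot(ToySharedSlot parent) {
        this.parent = parent;
    }

    // Normal path: the task finishes and hands its slot back before the parent goes away.
    void releaseByTask() {
        released = true;
        parent.removeChild(this);
    }

    // Called by the parent when it is released while this child is still allocated.
    void releaseInternal(Exception cause) {
        released = true;
        releaseCause = cause;
    }

    boolean isReleased() {
        return released;
    }

    Exception releaseCause() {
        return releaseCause;
    }
}
```

In this model, the message is a consequence of an earlier event (here, the parent being released for some other reason), so the wrapped cause is the part worth investigating.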
[jira] [Updated] (FLINK-10658) org.apache.flink.util.FlinkException: Releasing shared slot parent.
[ https://issues.apache.org/jira/browse/FLINK-10658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] chauncy updated FLINK-10658: Description: i don't when throw the exception (was: i don't why throw the )
[jira] [Updated] (FLINK-10658) org.apache.flink.util.FlinkException: Releasing shared slot parent.
[ https://issues.apache.org/jira/browse/FLINK-10658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] chauncy updated FLINK-10658: Description: i don't when throw the exception who tell me ? thanks (was: i don't when throw the exception)
[jira] [Updated] (FLINK-10658) org.apache.flink.util.FlinkException: Releasing shared slot parent.
[ https://issues.apache.org/jira/browse/FLINK-10658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] chauncy updated FLINK-10658: Description: i don't why throw the
[jira] [Created] (FLINK-10658) org.apache.flink.util.FlinkException: Releasing shared slot parent.
chauncy created FLINK-10658: --- Summary: org.apache.flink.util.FlinkException: Releasing shared slot parent. Key: FLINK-10658 URL: https://issues.apache.org/jira/browse/FLINK-10658 Project: Flink Issue Type: Bug Components: DataStream API Affects Versions: 1.5.4 Reporter: chauncy
[jira] [Commented] (FLINK-9999) Add ISNUMERIC supported in Table API/SQL
[ https://issues.apache.org/jira/browse/FLINK-?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16661627#comment-16661627 ] ASF GitHub Bot commented on FLINK-: --- yanghua closed pull request #6473: [FLINK-] [table] Add ISNUMERIC supported in Table API/SQL URL: https://github.com/apache/flink/pull/6473
[GitHub] yanghua closed pull request #6473: [FLINK-9999] [table] Add ISNUMERIC supported in Table API/SQL
yanghua closed pull request #6473: [FLINK-] [table] Add ISNUMERIC supported in Table API/SQL URL: https://github.com/apache/flink/pull/6473 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md index 366e3fdcc64..76d801ca960 100644 --- a/docs/dev/table/sql.md +++ b/docs/dev/table/sql.md @@ -1842,6 +1842,16 @@ RPAD(text string, len integer, pad string) {% highlight text %} +ISNUMERIC(text string) +{% endhighlight %} + + +Returns an Integer to indicate if the text string is a numeric value, supports some characters that are not numbers, such as plus (+), minus (-), and valid currency symbols such as the dollar sign ($), if true, returns 1, otherwise returns 0, if text is NULL, returns NULL. E.g. ISNUMERIC('123.0') returns 1. + + + + +{% highlight text %} FROM_BASE64(text string) {% endhighlight %} diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md index 6e202f19d5d..61f74d51dee 100644 --- a/docs/dev/table/tableApi.md +++ b/docs/dev/table/tableApi.md @@ -2477,6 +2477,17 @@ STRING.rpad(len INT, pad STRING) {% highlight java %} +STRING.isNumeric() +{% endhighlight %} + + + +Returns an Integer to indicate if the text string is a numeric value, supports some characters that are not numbers, such as plus (+), minus (-), and valid currency symbols such as the dollar sign ($), if true, returns 1, otherwise returns 0, if text is NULL, returns NULL. E.g. "123".isNumeric() returns 1. 
+ + + + +{% highlight java %} STRING.fromBase64() {% endhighlight %} @@ -4025,6 +4036,18 @@ STRING.initCap() + + +{% highlight scala %} +STRING.isNumeric() +{% endhighlight %} + + + +Returns an Integer to indicate if the text string is a numeric value, supports some characters that are not numbers, such as plus (+), minus (-), and valid currency symbols such as the dollar sign ($), if true, returns 1, otherwise returns 0, if text is NULL, returns NULL. E.g. "123".isNumeric() returns 1. + + + diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala index 35d2167848a..4c3c411bbe0 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala @@ -544,6 +544,11 @@ trait ImplicitExpressionOperations { def overlay(newString: Expression, starting: Expression, length: Expression) = Overlay(expr, newString, starting, length) + /** +* Returns an Integer to indicate if the text string is a numeric value. +*/ + def isNumeric() = IsNumeric(expr) + /** * Returns the base string decoded with base64. 
*/ diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala index 0e0f709eabc..e02dc6e8869 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala @@ -113,5 +113,7 @@ object BuiltInMethods { val BIN = Types.lookupMethod(classOf[JLong], "toBinaryString", classOf[Long]) + val ISNUMERIC = Types.lookupMethod(classOf[ScalarFunctions], "isNumeric", classOf[String]) + val FROMBASE64 = Types.lookupMethod(classOf[ScalarFunctions], "fromBase64", classOf[String]) } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala index a5c275ab415..39f1f4948b1 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala @@ -146,6 +146,12 @@ object FunctionGenerator { STRING_TYPE_INFO, BuiltInMethod.OVERLAY.method) + addSqlFunctionMethod( +ISNUMERIC, +Seq(STRING_TYPE_INFO), +INT_TYPE_INFO, +BuiltInMethods.ISNUMERIC) + addSqlFunctionMethod( FROM_BASE64, Seq(STRING_TYPE_INFO), diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala
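The documented ISNUMERIC semantics (return 1 for numeric text, 0 otherwise, NULL for NULL, with signs and a currency symbol tolerated) can be illustrated with a small Java sketch. It is not the `ScalarFunctions.isNumeric` implementation from the PR, whose body does not appear in this excerpt. It assumes that a sign may precede the currency symbol, that `$` is the only currency symbol handled, and that anything `java.math.BigDecimal` can parse (including exponents such as `1e5`) counts as numeric.

```java
import java.math.BigDecimal;

final class IsNumericSketch {
    // Returns null for null input, 1 if the text looks numeric, 0 otherwise.
    static Integer isNumeric(String text) {
        if (text == null) {
            return null; // NULL in, NULL out
        }
        String s = text.trim();
        int i = 0;
        // Optional leading sign.
        if (i < s.length() && (s.charAt(i) == '+' || s.charAt(i) == '-')) {
            i++;
        }
        // Optional currency symbol; only '$' is handled in this sketch.
        if (i < s.length() && s.charAt(i) == '$') {
            i++;
        }
        String number = s.substring(i);
        // BigDecimal would accept a second sign, so require a digit or '.' here.
        if (number.isEmpty() || !(Character.isDigit(number.charAt(0)) || number.charAt(0) == '.')) {
            return 0;
        }
        try {
            new BigDecimal(number); // also accepts exponents such as "1e5"
            return 1;
        } catch (NumberFormatException e) {
            return 0;
        }
    }
}
```

Returning a boxed `Integer` keeps the NULL case distinct from 0. This matches the `INT_TYPE_INFO` result type registered in `FunctionGenerator` in the diff.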
[jira] [Updated] (FLINK-10425) taskmanager.host is not respected
[ https://issues.apache.org/jira/browse/FLINK-10425?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-10425: --- Labels: pull-request-available (was: ) > taskmanager.host is not respected > - > > Key: FLINK-10425 > URL: https://issues.apache.org/jira/browse/FLINK-10425 > Project: Flink > Issue Type: Bug > Components: TaskManager >Affects Versions: 1.6.1 >Reporter: Andrew Kowpak >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > > The documentation states that taskmanager.host can be set to override the > discovered hostname; however, setting this value has no effect. > Looking at the code, the value never seems to be used. Instead, the > deprecated taskmanager.hostname is still used.
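The expected precedence is: the new `taskmanager.host` key first, then the deprecated `taskmanager.hostname` key, and finally the discovered hostname. A minimal Java sketch of that order follows. In Flink this fallback is usually declared on the option itself via `ConfigOption#withDeprecatedKeys`. The sketch uses a plain `Map` instead, and `TaskManagerHostResolution` is a hypothetical name, not the fix in PR #6913.

```java
import java.util.Map;

final class TaskManagerHostResolution {
    static final String HOST_KEY = "taskmanager.host";
    static final String DEPRECATED_HOST_KEY = "taskmanager.hostname";

    // Precedence: new key, then deprecated key, then the discovered hostname.
    static String resolveHost(Map<String, String> config, String discoveredHost) {
        String host = config.get(HOST_KEY);
        if (host == null) {
            host = config.get(DEPRECATED_HOST_KEY);
        }
        return host != null ? host : discoveredHost;
    }
}
```

The bug in the issue corresponds to skipping the first lookup, so only the deprecated key is ever consulted.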
[GitHub] yanghua opened a new pull request #6913: [FLINK-10425] taskmanager.host is not respected
yanghua opened a new pull request #6913: [FLINK-10425] taskmanager.host is not respected URL: https://github.com/apache/flink/pull/6913 ## What is the purpose of the change *This pull request fixes the bug that `taskmanager.host` is not respected.* ## Brief change log - *Fix `taskmanager.host` not being respected* ## Verifying this change This change is already covered by existing tests. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / **not documented**) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10425) taskmanager.host is not respected
[ https://issues.apache.org/jira/browse/FLINK-10425?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16661563#comment-16661563 ] ASF GitHub Bot commented on FLINK-10425: yanghua opened a new pull request #6913: [FLINK-10425] taskmanager.host is not respected URL: https://github.com/apache/flink/pull/6913
[jira] [Updated] (FLINK-10657) TPCHQuery3 fail with IllegalAccessException
[ https://issues.apache.org/jira/browse/FLINK-10657?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] JIN SUN updated FLINK-10657: Issue Type: Bug (was: Improvement) > TPCHQuery3 fail with IllegalAccessException > --- > > Key: FLINK-10657 > URL: https://issues.apache.org/jira/browse/FLINK-10657 > Project: Flink > Issue Type: Bug > Components: E2E Tests >Reporter: JIN SUN >Assignee: JIN SUN >Priority: Trivial > Labels: easy-fix, pull-request-available > Fix For: 1.7.0 > > > Similar to [FLINK-7998|https://issues.apache.org/jira/browse/FLINK-7998], > ShoppingPriorityItem in the TPCHQuery3.java example is declared private. This > causes an IllegalAccessException because of the reflection check during > dynamic class instantiation. Making it public resolves the problem.
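The failure mode can be reproduced outside Flink with plain JDK reflection. In this sketch, `ReflectiveInstantiator` and `TpchLikeExample` are hypothetical names. `ReflectiveInstantiator` stands in for framework code in a different class that instantiates user types reflectively. A private nested class receives an implicitly private default constructor, so `newInstance()` fails its access check, while a public nested class works. Because both classes here share a package, the failure comes from the private constructor. This is generic Java behavior, not Flink's own instantiation utility.

```java
// Stand-in for framework code that instantiates user types reflectively.
// As a separate top-level class, it has no access to private members of TpchLikeExample.
final class ReflectiveInstantiator {
    static <T> T instantiate(Class<T> clazz) throws ReflectiveOperationException {
        // getDeclaredConstructor() finds the constructor even if it is private;
        // the access check happens in newInstance().
        return clazz.getDeclaredConstructor().newInstance();
    }
}

final class TpchLikeExample {
    // A private nested class: its implicit default constructor is private too.
    private static class PrivateItem { }

    // The fix: a public nested class gets a public default constructor.
    public static class PublicItem { }

    static String tryInstantiate(Class<?> clazz) {
        try {
            ReflectiveInstantiator.instantiate(clazz);
            return "ok";
        } catch (IllegalAccessException e) {
            return "IllegalAccessException";
        } catch (ReflectiveOperationException e) {
            return e.getClass().getSimpleName();
        }
    }

    static String privateResult() {
        return tryInstantiate(PrivateItem.class);
    }

    static String publicResult() {
        return tryInstantiate(PublicItem.class);
    }

    public static void main(String[] args) {
        System.out.println(privateResult()); // prints "IllegalAccessException"
        System.out.println(publicResult());  // prints "ok"
    }
}
```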
[jira] [Updated] (FLINK-10657) TPCHQuery3 fail with IllegalAccessException
[ https://issues.apache.org/jira/browse/FLINK-10657?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-10657: --- Labels: easy-fix pull-request-available (was: easy-fix)
[jira] [Commented] (FLINK-10657) TPCHQuery3 fail with IllegalAccessException
[ https://issues.apache.org/jira/browse/FLINK-10657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16661532#comment-16661532 ] ASF GitHub Bot commented on FLINK-10657: isunjin opened a new pull request #6912: [FLINK-10657] TPCHQuery3 fail with IllegalAccessException URL: https://github.com/apache/flink/pull/6912 This is a trivial fix. Similar to [FLINK-7998](https://issues.apache.org/jira/browse/FLINK-7998), ShoppingPriorityItem in the TPCHQuery3.java example is declared private. This causes an IllegalAccessException because of the reflection check during dynamic class instantiation. Making it public resolves the problem.
[GitHub] isunjin opened a new pull request #6912: [FLINK-10657] TPCHQuery3 fail with IllegalAccessException
isunjin opened a new pull request #6912: [FLINK-10657] TPCHQuery3 fail with IllegalAccessException URL: https://github.com/apache/flink/pull/6912
[jira] [Created] (FLINK-10657) TPCHQuery3 fail with IllegalAccessException
JIN SUN created FLINK-10657: --- Summary: TPCHQuery3 fail with IllegalAccessException Key: FLINK-10657 URL: https://issues.apache.org/jira/browse/FLINK-10657 Project: Flink Issue Type: Improvement Components: E2E Tests Reporter: JIN SUN Assignee: JIN SUN Fix For: 1.7.0 Similar to [FLINK-7998|https://issues.apache.org/jira/browse/FLINK-7998], ShoppingPriorityItem in the TPCHQuery3.java example is declared private. This causes an IllegalAccessException because of the reflection check during dynamic class instantiation. Making it public resolves the problem.
[jira] [Updated] (FLINK-10656) Refactor org.apache.flink.runtime.io.network.api.reader.ReaderBase
[ https://issues.apache.org/jira/browse/FLINK-10656?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-10656: --- Labels: pull-request-available (was: ) > Refactor org.apache.flink.runtime.io.network.api.reader.ReaderBase > -- > > Key: FLINK-10656 > URL: https://issues.apache.org/jira/browse/FLINK-10656 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Reporter: JIN SUN >Assignee: JIN SUN >Priority: Minor > Labels: pull-request-available > Fix For: 1.8.0 > > > The interface of org.apache.flink.runtime.io.network.api.reader.ReaderBase is > not very clean: its methods are only used for iterations and for handling events, > which is unrelated to the name ReaderBase. Since these two functionalities are > independent, I propose renaming the interface and splitting it into two > separate interfaces. > For more details, please see the PR.
[jira] [Commented] (FLINK-10656) Refactor org.apache.flink.runtime.io.network.api.reader.ReaderBase
[ https://issues.apache.org/jira/browse/FLINK-10656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16661403#comment-16661403 ] ASF GitHub Bot commented on FLINK-10656: isunjin opened a new pull request #6911: [FLINK-10656] Refactor org.apache.flink.runtime.io.network.api.reader… URL: https://github.com/apache/flink/pull/6911 ## What is the purpose of the change The interface `org.apache.flink.runtime.io.network.api.reader.ReaderBase` is not very clean: its methods are only used for iterations and for handling events, which is unrelated to the name `ReaderBase`. Since these two functionalities are independent, this PR proposes renaming the interface and splitting it into two separate interfaces. ## Brief change log - *Split interface `ReaderBase` into `IterationReader` and `TaskEventSender`* - *Rename `getCurrentBuffer` to `resetCurrentBuffer`* ## Verifying this change This change is a trivial rework / code cleanup without any test coverage.
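The proposed split follows the interface-segregation idea: one broad reader interface becomes two narrow ones, and callers depend only on the one they use. Below is a self-contained Java sketch under stated assumptions. The interface names `IterationReader` and `TaskEventSender` come from the PR description. The method names are loosely modeled on `ReaderBase`'s iteration and event methods, but the exact grouping in PR #6911 is not shown here, so treat it as an assumption. `ToyTaskEvent`, `ToyReader` and `ToyEventPublisher` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a task event.
final class ToyTaskEvent {
    final String name;

    ToyTaskEvent(String name) {
        this.name = name;
    }
}

// Iteration concerns only (method grouping is an assumption).
interface IterationReader {
    void setIterativeReader();
    void startNextSuperstep();
    boolean hasReachedEndOfSuperstep();
}

// Event concerns only.
interface TaskEventSender {
    void sendTaskEvent(ToyTaskEvent event);
}

// One class may still implement both, but callers can depend on the narrower interface.
final class ToyReader implements IterationReader, TaskEventSender {
    private final List<ToyTaskEvent> sentEvents = new ArrayList<>();
    private boolean iterative;
    private boolean endOfSuperstep;
    private int superstep;

    @Override
    public void setIterativeReader() {
        iterative = true;
    }

    @Override
    public void startNextSuperstep() {
        if (!iterative) {
            throw new IllegalStateException("Not an iterative reader");
        }
        superstep++;
        endOfSuperstep = false;
    }

    // Simulates the arrival of an end-of-superstep marker.
    void onEndOfSuperstepMarker() {
        endOfSuperstep = true;
    }

    @Override
    public boolean hasReachedEndOfSuperstep() {
        return endOfSuperstep;
    }

    @Override
    public void sendTaskEvent(ToyTaskEvent event) {
        sentEvents.add(event);
    }

    int superstep() {
        return superstep;
    }

    List<ToyTaskEvent> sentEvents() {
        return sentEvents;
    }
}

// A component that only sends events sees nothing iteration-related.
final class ToyEventPublisher {
    static void announce(TaskEventSender sender, String name) {
        sender.sendTaskEvent(new ToyTaskEvent(name));
    }
}
```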
[GitHub] isunjin opened a new pull request #6911: [FLINK-10656] Refactor org.apache.flink.runtime.io.network.api.reader…
isunjin opened a new pull request #6911: [FLINK-10656] Refactor org.apache.flink.runtime.io.network.api.reader… URL: https://github.com/apache/flink/pull/6911
[jira] [Created] (FLINK-10656) Refactor org.apache.flink.runtime.io.network.api.reader.ReaderBase
JIN SUN created FLINK-10656: --- Summary: Refactor org.apache.flink.runtime.io.network.api.reader.ReaderBase Key: FLINK-10656 URL: https://issues.apache.org/jira/browse/FLINK-10656 Project: Flink Issue Type: Improvement Components: Distributed Coordination Reporter: JIN SUN Assignee: JIN SUN Fix For: 1.8.0 The interface org.apache.flink.runtime.io.network.api.reader.ReaderBase is not very clean: its methods are called only for iterations and event handling, which is unrelated to the name ReaderBase. Since these two functionalities are independent, we propose to rename the interface and split it into two separate interfaces. For more details, please see the PR. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
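A minimal Java sketch of the interface split proposed in FLINK-10656 follows. Only the names `IterationReader` and `TaskEventSender` come from the PR; the method names are loosely modeled on `ReaderBase`, and `TaskEvent` and `ToyReader` are hypothetical stand-ins so the example compiles on its own. The exact assignment of methods to interfaces in PR #6911 may differ.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Flink's TaskEvent so the sketch compiles on its own.
class TaskEvent {
    final String name;

    TaskEvent(String name) {
        this.name = name;
    }
}

// Iteration-related methods only (names modeled on ReaderBase; the split is assumed).
interface IterationReader {
    void setIterativeReader();

    void startNextSuperstep();

    boolean hasReachedEndOfSuperstep();
}

// Event-related methods only.
interface TaskEventSender {
    void sendTaskEvent(TaskEvent event) throws IOException;
}

// Toy reader playing both roles; callers can depend on just the role they need.
class ToyReader implements IterationReader, TaskEventSender {
    private boolean iterative;
    private boolean endOfSuperstep = true;
    private int superstep;
    final List<TaskEvent> sentEvents = new ArrayList<>();

    @Override
    public void setIterativeReader() {
        iterative = true;
    }

    @Override
    public void startNextSuperstep() {
        if (!iterative) {
            throw new IllegalStateException("Reader is not iterative.");
        }
        if (!endOfSuperstep) {
            throw new IllegalStateException("Current superstep has not ended yet.");
        }
        superstep++;
        endOfSuperstep = false;
    }

    @Override
    public boolean hasReachedEndOfSuperstep() {
        return endOfSuperstep;
    }

    // Toy hook standing in for the arrival of an end-of-superstep event.
    void markEndOfSuperstep() {
        endOfSuperstep = true;
    }

    int currentSuperstep() {
        return superstep;
    }

    @Override
    public void sendTaskEvent(TaskEvent event) {
        sentEvents.add(event); // a real reader would forward the event to its producers
    }
}
```

With this split, a component that only drives supersteps can accept an `IterationReader` and never sees the event-sending methods, and vice versa.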
[GitHub] tzulitai commented on a change in pull request #6875: [FLINK-9808] [state backends] Migrate state when necessary in state backends
tzulitai commented on a change in pull request #6875: [FLINK-9808] [state backends] Migrate state when necessary in state backends URL: https://github.com/apache/flink/pull/6875#discussion_r227538700 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java ## @@ -0,0 +1,775 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.checkpoint.StateObjectCollection; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.operators.testutils.DummyEnvironment; +import org.apache.flink.types.StringValue; +import org.apache.flink.util.TestLogger; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.RunnableFuture; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for the {@link KeyedStateBackend} and {@link OperatorStateBackend} as produced + * by various {@link StateBackend}s. + */ +@SuppressWarnings("serial") +public abstract class StateBackendMigrationTestBase extends TestLogger { + + @Rule + public final ExpectedException expectedException = ExpectedException.none(); + + // lazily initialized stream storage + private CheckpointStorageLocation checkpointStorageLocation; + + /** +* Different "personalities" of {@link CustomStringSerializer}. Instead of creating +* different classes we parameterize the serializer with this and +* {@link CustomStringSerializerSnapshot} will instantiate serializers with the correct +* personality. 
+*/ + public enum SerializerVersion { + INITIAL, + RESTORE, + NEW + } + + /** +* The compatibility behaviour of {@link CustomStringSerializer}. This controls what +* type of serializer {@link CustomStringSerializerSnapshot} will create for +* the different methods that return/create serializers. +*/ + public enum SerializerCompatibilityType { + COMPATIBLE_AS_IS, + REQUIRES_MIGRATION + } + + /** +* The serialization timeliness behaviour of the state backend under test. +*/ + public enum BackendSerializationTimeliness { + ON_ACCESS, + ON_CHECKPOINTS + } + + @Test + @SuppressWarnings("unchecked") + public void testValueStateWithSerializerRequiringMigration() throws Exception { + CustomStringSerializer.resetCountingMaps(); + + CheckpointStreamFactory streamFactory = createStreamFactory(); + SharedStateRegistry sharedStateRegistry = new SharedStateRegistry(); + AbstractKeyedStateBackend backend = createKeyedBackend(IntSerializer.INSTANCE); + + ValueStateDescriptor kvId = new ValueStateDescriptor<>( + "id", + new CustomStringSerializer( + org.apache.flink.runtime.state.StateBackendMigrationTestBase.SerializerCompatibilityType.REQUIRES_MIGRATION, org.apache.flink.runtime.state.StateBackendMigrationTestBase.SerializerVersion.INITIAL)); Review comment: Fixed This is an automated message from the Apache Git Service. To respond to the message, please
[jira] [Commented] (FLINK-9808) Implement state conversion procedure in state backends
[ https://issues.apache.org/jira/browse/FLINK-9808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16661178#comment-16661178 ] ASF GitHub Bot commented on FLINK-9808: --- tzulitai commented on a change in pull request #6875: [FLINK-9808] [state backends] Migrate state when necessary in state backends URL: https://github.com/apache/flink/pull/6875#discussion_r227538700 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java ## @@ -0,0 +1,775 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.checkpoint.StateObjectCollection; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.operators.testutils.DummyEnvironment; +import org.apache.flink.types.StringValue; +import org.apache.flink.util.TestLogger; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.RunnableFuture; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for the {@link KeyedStateBackend} and {@link OperatorStateBackend} as produced + * by various {@link StateBackend}s. + */ +@SuppressWarnings("serial") +public abstract class StateBackendMigrationTestBase extends TestLogger { + + @Rule + public final ExpectedException expectedException = ExpectedException.none(); + + // lazily initialized stream storage + private CheckpointStorageLocation checkpointStorageLocation; + + /** +* Different "personalities" of {@link CustomStringSerializer}. Instead of creating +* different classes we parameterize the serializer with this and +* {@link CustomStringSerializerSnapshot} will instantiate serializers with the correct +* personality. 
+*/ + public enum SerializerVersion { + INITIAL, + RESTORE, + NEW + } + + /** +* The compatibility behaviour of {@link CustomStringSerializer}. This controls what +* type of serializer {@link CustomStringSerializerSnapshot} will create for +* the different methods that return/create serializers. +*/ + public enum SerializerCompatibilityType { + COMPATIBLE_AS_IS, + REQUIRES_MIGRATION + } + + /** +* The serialization timeliness behaviour of the state backend under test. +*/ + public enum BackendSerializationTimeliness { + ON_ACCESS, + ON_CHECKPOINTS + } + + @Test + @SuppressWarnings("unchecked") + public void testValueStateWithSerializerRequiringMigration() throws Exception { + CustomStringSerializer.resetCountingMaps(); + + CheckpointStreamFactory streamFactory = createStreamFactory(); + SharedStateRegistry sharedStateRegistry = new SharedStateRegistry(); + AbstractKeyedStateBackend backend = createKeyedBackend(IntSerializer.INSTANCE); + + ValueStateDescriptor kvId = new ValueStateDescriptor<>( + "id", + new CustomStringSerializer( + org.apache.flink.runtime.state.StateBackendMigrationTestBase.SerializerCompatibilityType.REQUIRES_MIGRATION,
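FLINK-9808 is about migrating state when the serializer restored for it is not compatible as-is; the test base above parameterizes this with `COMPATIBLE_AS_IS` and `REQUIRES_MIGRATION`. Below is a simplified Java sketch of the idea, assuming migration means reading every stored value with the previous serializer and rewriting it with the new one. `SimpleSerializer`, `Compatibility`, `StateMigration` and the two toy string formats are hypothetical and are not Flink's `TypeSerializer` API.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified serializer abstraction (not Flink's TypeSerializer API).
interface SimpleSerializer<T> {
    byte[] serialize(T value);

    T deserialize(byte[] bytes);
}

// Mirrors the two compatibility outcomes exercised by StateBackendMigrationTestBase.
enum Compatibility { COMPATIBLE_AS_IS, REQUIRES_MIGRATION }

final class StateMigration {
    // Returns the state bytes to keep, rewriting them only if migration is required.
    static <T> List<byte[]> migrateIfNecessary(
            List<byte[]> storedState,
            SimpleSerializer<T> previousSerializer,
            SimpleSerializer<T> newSerializer,
            Compatibility compatibility) {
        if (compatibility == Compatibility.COMPATIBLE_AS_IS) {
            return storedState; // the new serializer can read the existing bytes directly
        }
        List<byte[]> migrated = new ArrayList<>(storedState.size());
        for (byte[] bytes : storedState) {
            // Read with the serializer that wrote the bytes, write with the new one.
            T value = previousSerializer.deserialize(bytes);
            migrated.add(newSerializer.serialize(value));
        }
        return migrated;
    }
}

// Toy format 1: plain UTF-8 bytes.
final class Utf8StringSerializer implements SimpleSerializer<String> {
    @Override
    public byte[] serialize(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String deserialize(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }
}

// Toy format 2: one length byte followed by UTF-8 bytes (payloads up to 127 bytes).
final class LengthPrefixedStringSerializer implements SimpleSerializer<String> {
    @Override
    public byte[] serialize(String value) {
        byte[] payload = value.getBytes(StandardCharsets.UTF_8);
        if (payload.length > Byte.MAX_VALUE) {
            throw new IllegalArgumentException("Toy format supports at most 127 bytes.");
        }
        byte[] out = new byte[payload.length + 1];
        out[0] = (byte) payload.length;
        System.arraycopy(payload, 0, out, 1, payload.length);
        return out;
    }

    @Override
    public String deserialize(byte[] bytes) {
        return new String(bytes, 1, bytes[0], StandardCharsets.UTF_8);
    }
}
```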
[jira] [Commented] (FLINK-10490) OperatorSnapshotUtil should probably use SavepointV2Serializer
[ https://issues.apache.org/jira/browse/FLINK-10490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660994#comment-16660994 ] ASF GitHub Bot commented on FLINK-10490: StefanRRichter opened a new pull request #6910: [FLINK-10490][tests] OperatorSnapshotUtil should use SavepointV2Seria… URL: https://github.com/apache/flink/pull/6910 ## Brief change log Methods in `OperatorSnapshotUtil` that are used in current tests are still based on the outdated `SavepointV1Serializer`. This PR updates this to the current `SavepointV2Serializer`. ## Verifying this change This change is already covered by existing tests. ## Does this pull request potentially affect one of the following parts: Change is only relevant for tests. ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > OperatorSnapshotUtil should probably use SavepointV2Serializer > -- > > Key: FLINK-10490 > URL: https://issues.apache.org/jira/browse/FLINK-10490 > Project: Flink > Issue Type: Test > Components: Tests >Affects Versions: 1.7.0 >Reporter: Stefan Richter >Priority: Major > Labels: pull-request-available > > {{OperatorSnapshotUtil}} is used for testing savepoint migration. This > utility internally still uses {{SavepointV1Serializer}} and I would assume > that it should use {{SavepointV2Serializer}}. I wonder if that means that > some newer cases are actually not covered in the migration tests. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10490) OperatorSnapshotUtil should probably use SavepointV2Serializer
[ https://issues.apache.org/jira/browse/FLINK-10490?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-10490: --- Labels: pull-request-available (was: ) > OperatorSnapshotUtil should probably use SavepointV2Serializer > -- > > Key: FLINK-10490 > URL: https://issues.apache.org/jira/browse/FLINK-10490 > Project: Flink > Issue Type: Test > Components: Tests >Affects Versions: 1.7.0 >Reporter: Stefan Richter >Priority: Major > Labels: pull-request-available > > {{OperatorSnapshotUtil}} is used for testing savepoint migration. This > utility internally still uses {{SavepointV1Serializer}} and I would assume > that it should use {{SavepointV2Serializer}}. I wonder if that means that > some newer cases are actually not covered in the migration tests. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8995) Add a test operator with keyed state that uses custom, stateful serializer
[ https://issues.apache.org/jira/browse/FLINK-8995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660969#comment-16660969 ] ASF GitHub Bot commented on FLINK-8995: --- StefanRRichter commented on issue #6909: [FLINK-8995][tests] Add keyed state that uses custom stateful seriali… URL: https://github.com/apache/flink/pull/6909#issuecomment-432332081 CC @tzulitai This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add a test operator with keyed state that uses custom, stateful serializer > -- > > Key: FLINK-8995 > URL: https://issues.apache.org/jira/browse/FLINK-8995 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.5.0 >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > This test should figure out problems in places where multiple threads would > share the same serializer instead of properly duplicating it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-8995) Add a test operator with keyed state that uses custom, stateful serializer
[ https://issues.apache.org/jira/browse/FLINK-8995?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-8995: -- Labels: pull-request-available (was: ) > Add a test operator with keyed state that uses custom, stateful serializer > -- > > Key: FLINK-8995 > URL: https://issues.apache.org/jira/browse/FLINK-8995 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.5.0 >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > This test should figure out problems in places where multiple threads would > share the same serializer instead of properly duplicating it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8995) Add a test operator with keyed state that uses custom, stateful serializer
[ https://issues.apache.org/jira/browse/FLINK-8995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660964#comment-16660964 ] ASF GitHub Bot commented on FLINK-8995: --- StefanRRichter opened a new pull request #6909: [FLINK-8995][tests] Add keyed state that uses custom stateful seriali… URL: https://github.com/apache/flink/pull/6909 ## What is the purpose of the change This PR adds keyed state that uses a custom stateful serializer to the allround e2e test job, so that we cover this case. ## Verifying this change For example, run `./run-single-test.sh test-scripts/test_ha_datastream.sh` ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add a test operator with keyed state that uses custom, stateful serializer > -- > > Key: FLINK-8995 > URL: https://issues.apache.org/jira/browse/FLINK-8995 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.5.0 >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > This test should figure out problems in places where multiple threads would > share the same serializer instead of properly duplicating it.
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
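The FLINK-8995 test targets places where several threads share one stateful serializer instead of duplicating it. A hypothetical Java sketch of the pattern: `BufferingSerializer` keeps a mutable buffer, so its `duplicate()` must return a fresh instance, and each worker thread gets its own copy. The class names are illustrative and not part of Flink; in Flink, `TypeSerializer#duplicate()` plays the analogous role.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical serializer with mutable internal state: a reusable buffer that is NOT thread-safe.
final class BufferingSerializer {
    private final StringBuilder buffer = new StringBuilder();

    String serialize(int key, String value) {
        buffer.setLength(0);
        buffer.append(key).append('=').append(value);
        return buffer.toString();
    }

    // A stateful serializer must return a fresh instance; a stateless one could return this.
    BufferingSerializer duplicate() {
        return new BufferingSerializer();
    }
}

final class PerThreadSerializerCheck {
    // Each task gets its own duplicate; returns how many results were corrupted (expected 0).
    static int countCorruptedResults(BufferingSerializer template, int threads, int iterations)
            throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        AtomicInteger corrupted = new AtomicInteger();
        for (int t = 0; t < threads; t++) {
            final int key = t;
            final BufferingSerializer own = template.duplicate(); // never shared across threads
            pool.execute(() -> {
                for (int i = 0; i < iterations; i++) {
                    String expected = key + "=" + i;
                    if (!own.serialize(key, Integer.toString(i)).equals(expected)) {
                        corrupted.incrementAndGet();
                    }
                }
            });
        }
        pool.shutdown();
        if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
            throw new IllegalStateException("Workers did not finish in time.");
        }
        return corrupted.get();
    }
}
```

Passing `template` itself to every thread would make corruption timing-dependent rather than guaranteed, which is one reason such bugs are easier to surface in a long-running end-to-end job than in a unit test.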
[jira] [Created] (FLINK-10655) RemoteRpcInvocation not overwriting ObjectInputStream's ClassNotFoundException
Nico Kruber created FLINK-10655: --- Summary: RemoteRpcInvocation not overwriting ObjectInputStream's ClassNotFoundException Key: FLINK-10655 URL: https://issues.apache.org/jira/browse/FLINK-10655 Project: Flink Issue Type: Bug Components: Distributed Coordination Affects Versions: 1.6.1, 1.5.4, 1.7.0 Environment: {{RemoteRpcInvocation}} tries to give a more detailed {{ClassNotFoundException}} if the deserialization of the method's types/arguments fails. However, it turns out that once {{ObjectInputStream}} has received a {{ClassNotFoundException}}, it will not overwrite it anymore, so we cannot provide a more detailed {{ClassNotFoundException}}. Instead, the least invasive solution would be to add a suppressed {{ClassNotFoundException}} to the existing one. While at it, we could also add more details, i.e. the successfully deserialized types and arguments. Reporter: Nico Kruber Assignee: Nico Kruber Fix For: 1.5.6, 1.6.3, 1.7.0 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
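The least invasive fix described in FLINK-10655 keeps the stream's original `ClassNotFoundException` and attaches a more detailed one via `Throwable#addSuppressed`. A self-contained Java sketch, assuming the arguments are read one by one from an `ObjectInputStream`. `RpcArgumentReader` and `MissingClassObjectInputStream` are hypothetical, and this is not Flink's `RemoteRpcInvocation` code. The helper stream simulates a class missing from the classpath by overriding `resolveClass`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.util.Arrays;

final class RpcArgumentReader {

    // Test helper: pretends that one class cannot be found on the classpath.
    static final class MissingClassObjectInputStream extends ObjectInputStream {
        private final String missingClassName;

        MissingClassObjectInputStream(InputStream in, String missingClassName) throws IOException {
            super(in);
            this.missingClassName = missingClassName;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
            if (desc.getName().equals(missingClassName)) {
                throw new ClassNotFoundException(desc.getName());
            }
            return super.resolveClass(desc);
        }
    }

    static byte[] serialize(Object... args) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            for (Object arg : args) {
                out.writeObject(arg);
            }
        }
        return bytes.toByteArray();
    }

    static Object[] readArguments(byte[] data, int count, String missingClassName)
            throws IOException, ClassNotFoundException {
        Object[] args = new Object[count];
        int index = 0;
        try (ObjectInputStream in =
                new MissingClassObjectInputStream(new ByteArrayInputStream(data), missingClassName)) {
            for (; index < count; index++) {
                args[index] = in.readObject();
            }
            return args;
        } catch (ClassNotFoundException e) {
            // Keep the stream's exception; attach the extra context as a suppressed exception.
            e.addSuppressed(new ClassNotFoundException(
                    "Could not deserialize argument " + index
                            + "; successfully deserialized so far: "
                            + Arrays.toString(Arrays.copyOf(args, index))));
            throw e;
        }
    }
}
```

Callers and logs still see the original exception type and message, while the suppressed exception carries the successfully deserialized arguments.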
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660910#comment-16660910 ] ASF GitHub Bot commented on FLINK-10431: StefanRRichter commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler URL: https://github.com/apache/flink/pull/6898#discussion_r227466414 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSlotSelectionStrategy.java ## @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.SlotProfile; +import org.apache.flink.runtime.jobmanager.scheduler.Locality; +import org.apache.flink.runtime.jobmaster.SlotInfo; + +import javax.annotation.Nonnull; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * This class implements a {@link SlotSelectionStrategy} that is based on previous allocations and + * falls back to using location preference hints if there is no previous allocation. + */ +public class PreviousAllocationSlotSelectionStrategy implements SlotSelectionStrategy { + + public static final PreviousAllocationSlotSelectionStrategy INSTANCE = + new PreviousAllocationSlotSelectionStrategy(); + + private final LocationPreferenceSlotSelection locationPreferenceSlotSelection; + + private PreviousAllocationSlotSelectionStrategy() { + this.locationPreferenceSlotSelection = LocationPreferenceSlotSelection.INSTANCE; + } + + @Nonnull + @Override + public SlotInfoAndLocality selectBestSlotForProfile( + @Nonnull List availableSlots, + @Nonnull SlotProfile slotProfile) { + + Collection priorAllocations = slotProfile.getPreferredAllocations(); + + // First, if there was a prior allocation try to schedule to the same/old slot + if (!priorAllocations.isEmpty()) { + + HashSet priorAllocationsSet = new HashSet<>(priorAllocations); Review comment: In theory: it is a collection, and calling contains on a collection can be expensive, depending on the implementation. A hash set offers ideal performance for contains. In practice, this collection is currently always of size 1, so either way it should not matter much. But as the code could evolve, I prefer not to create a potential complexity bomb :) This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Extract scheduling-related code from SlotPool > - > > Key: FLINK-10431 > URL: https://issues.apache.org/jira/browse/FLINK-10431 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Affects Versions: 1.7.0 >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Labels: pull-request-available > > The other half of the current scheduling logic is the management of slot > sharing and is located in the SlotPool. We need to extract this logic into > our new Scheduler component from the previous step. This leaves us with a > simpler SlotPool that mainly cares about obtaining, holding, and releasing > slots in interaction with a ResourceManager. The new Scheduler can now > identify slot sharing groups and interacts with the SlotPool. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
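The reviewer's argument can be illustrated with a simplified Java sketch. Copying the prior allocations into a `HashSet` makes each membership check O(1) on average, so scanning the available slots costs O(slots + priorAllocations) instead of up to O(slots * priorAllocations) when the incoming collection is list-backed. `ToySlot` and `selectPreviousAllocation` are hypothetical. The real strategy works on `SlotInfo`/`AllocationID` and falls back to location preferences, which this sketch only signals by returning an empty `Optional`.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

// Hypothetical, simplified slot description; the real SlotInfo / AllocationID types differ.
final class ToySlot {
    final String allocationId;
    final String host;

    ToySlot(String allocationId, String host) {
        this.allocationId = allocationId;
        this.host = host;
    }
}

final class PriorAllocationSelection {
    // Returns the first available slot whose allocation id was used before, if any.
    static Optional<ToySlot> selectPreviousAllocation(
            List<ToySlot> availableSlots, Collection<String> priorAllocations) {
        if (priorAllocations.isEmpty()) {
            return Optional.empty(); // a caller would fall back to location preferences here
        }
        // One O(m) copy, then O(1) average-time contains() per slot,
        // regardless of which Collection implementation was passed in.
        Set<String> priorSet = new HashSet<>(priorAllocations);
        for (ToySlot slot : availableSlots) {
            if (priorSet.contains(slot.allocationId)) {
                return Optional.of(slot);
            }
        }
        return Optional.empty();
    }
}
```

As the review notes, with a single prior allocation the copy buys nothing today; it only guards against quadratic behavior if the collection grows.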
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660888#comment-16660888 ] ASF GitHub Bot commented on FLINK-10431: tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler URL: https://github.com/apache/flink/pull/6898#discussion_r227450833 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultSchedulerFactory.java ## @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; + +import javax.annotation.Nonnull; + +import java.util.HashMap; + +/** + * Default implementation of a {@link SchedulerFactory}. 
+ */ +public class DefaultSchedulerFactory implements SchedulerFactory { + + @Nonnull + private final SlotSelectionStrategy slotSelectionStrategy; + + public DefaultSchedulerFactory(@Nonnull SlotSelectionStrategy slotSelectionStrategy) { + this.slotSelectionStrategy = slotSelectionStrategy; + } + + @Nonnull + @Override + public Scheduler createScheduler(@Nonnull SlotPoolGateway slotPoolGateway) { + return new Scheduler(new HashMap<>(), slotSelectionStrategy, slotPoolGateway); Review comment: What could be a reasonable initial size of the `HashMap`? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Extract scheduling-related code from SlotPool > - > > Key: FLINK-10431 > URL: https://issues.apache.org/jira/browse/FLINK-10431 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Affects Versions: 1.7.0 >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Labels: pull-request-available > > The other half of the current scheduling logic is the management of slot > sharing and is located in the SlotPool. We need to extract this logic into > our new Scheduler component from the previous step. This leaves us with a > simpler SlotPool that mainly cares about obtaining, holding, and releasing > slots in interaction with a ResourceManager. The new Scheduler can now > identify slot sharing groups and interacts with the SlotPool. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
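One common answer to the initial-size question is to size the map for the expected number of entries under `HashMap`'s default load factor of 0.75, so that no rehash happens while it fills up. A Java sketch, assuming an estimate of the number of slot sharing groups is available (the thread does not give one). `MapSizing` is a hypothetical helper. On JDK 19 and later, `HashMap.newHashMap(int)` performs this calculation itself.

```java
import java.util.HashMap;
import java.util.Map;

final class MapSizing {
    // Initial capacity that holds expectedEntries without resizing at the default
    // load factor of 0.75 (a rule of thumb similar to Guava's newHashMapWithExpectedSize).
    static int capacityFor(int expectedEntries) {
        if (expectedEntries < 0) {
            throw new IllegalArgumentException("expectedEntries must be >= 0");
        }
        return (int) (expectedEntries / 0.75f + 1.0f);
    }

    static <K, V> Map<K, V> newMapForExpectedSize(int expectedEntries) {
        return new HashMap<>(capacityFor(expectedEntries));
    }
}
```

If no reasonable estimate exists, the default constructor is a sensible choice, since a resize is cheap for a map that stays small.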
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660895#comment-16660895 ] ASF GitHub Bot commented on FLINK-10431: tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler URL: https://github.com/apache/flink/pull/6898#discussion_r227456207 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/Scheduler.java ## @@ -0,0 +1,516 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.clusterframework.types.SlotProfile; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.instance.SlotSharingGroupId; +import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint; +import org.apache.flink.runtime.jobmanager.scheduler.Locality; +import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; +import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; +import org.apache.flink.runtime.jobmaster.LogicalSlot; +import org.apache.flink.runtime.jobmaster.SlotContext; +import org.apache.flink.runtime.jobmaster.SlotInfo; +import org.apache.flink.runtime.jobmaster.SlotOwner; +import org.apache.flink.runtime.jobmaster.SlotRequestId; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.util.AbstractID; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.Executor; +import java.util.function.BooleanSupplier; + +/** + * Scheduler that assigns tasks to slots. This class is currently work in progress, comments will be updated as we + * move forward. + */ +public class Scheduler implements SlotProvider, SlotOwner { + + /** Logger */ + private final Logger log = LoggerFactory.getLogger(getClass()); + + /** Strategy that selects the best slot for a given slot allocation request. */ + @Nonnull + private final SlotSelectionStrategy slotSelectionStrategy; + + /** Managers for the different slot sharing groups. 
*/ + @Nonnull + private final Map slotSharingManagersMap; + + /** The slot pool from which slots are allocated. */ + @Nonnull + private final SlotPoolGateway slotPoolGateway; + + /** Executor for running tasks in the job master's main thread. */ + @Nonnull + private Executor componentMainThreadExecutor; + + /** Predicate to check if the current thread is the job master's main thread. */ + @Nonnull + private BooleanSupplier componentMainThreadCheck; + + + public Scheduler( + @Nonnull Map slotSharingManagersMap, + @Nonnull SlotSelectionStrategy slotSelectionStrategy, + @Nonnull SlotPoolGateway slotPoolGateway) { + + this.slotSelectionStrategy = slotSelectionStrategy; + this.slotSharingManagersMap = slotSharingManagersMap; + this.slotPoolGateway = slotPoolGateway; + this.componentMainThreadExecutor = (runnable) -> { + throw new IllegalStateException("Main thread executor not initialized."); + }; + this.componentMainThreadCheck = () -> { + throw new IllegalStateException("Main thread checker not initialized"); + }; + } + + public void start(@Nonnull Executor mainThreadExecutor, @Nonnull BooleanSupplier mainThreadCheck) { + this.componentMainThreadExecutor = mainThreadExecutor; + this.componentMainThreadCheck = mainThreadCheck; + }
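The `Scheduler` constructor quoted above installs an `Executor` and a `BooleanSupplier` that both throw `IllegalStateException` until `start(...)` replaces them with the real main-thread executor and check. Any use before start-up therefore fails fast instead of silently running on the wrong thread. Below is a minimal sketch of that pattern. `StartableComponent`, `runInMainThread` and `isInMainThread` are hypothetical names and are not part of the PR.

```java
import java.util.concurrent.Executor;
import java.util.function.BooleanSupplier;

// Hypothetical stand-in for the Scheduler's start-up handling; not Flink code.
final class StartableComponent {

    private Executor mainThreadExecutor;
    private BooleanSupplier mainThreadCheck;

    StartableComponent() {
        // Placeholders keep the fields non-null but fail fast if used before start().
        this.mainThreadExecutor = runnable -> {
            throw new IllegalStateException("Main thread executor not initialized.");
        };
        this.mainThreadCheck = () -> {
            throw new IllegalStateException("Main thread checker not initialized.");
        };
    }

    // Swaps the placeholders for the real executor and thread check.
    void start(Executor executor, BooleanSupplier check) {
        this.mainThreadExecutor = executor;
        this.mainThreadCheck = check;
    }

    // Hands the action to the main-thread executor.
    void runInMainThread(Runnable action) {
        mainThreadExecutor.execute(action);
    }

    // Reports whether the caller is on the main thread, per the installed check.
    boolean isInMainThread() {
        return mainThreadCheck.getAsBoolean();
    }

    public static void main(String[] args) {
        StartableComponent component = new StartableComponent();
        try {
            component.runInMainThread(() -> System.out.println("too early"));
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        // Runnable::run executes on the calling thread, standing in for a real main-thread executor.
        component.start(Runnable::run, () -> true);
        component.runInMainThread(() -> System.out.println("ran after start"));
    }
}
```

Compared with nullable fields, the throwing placeholders stay consistent with the `@Nonnull` annotations on the fields while still reporting a clear error on misuse.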
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660896#comment-16660896 ] ASF GitHub Bot commented on FLINK-10431: tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler URL: https://github.com/apache/flink/pull/6898#discussion_r227460500 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/Scheduler.java ## @@ -0,0 +1,516 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.clusterframework.types.SlotProfile; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.instance.SlotSharingGroupId; +import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint; +import org.apache.flink.runtime.jobmanager.scheduler.Locality; +import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; +import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; +import org.apache.flink.runtime.jobmaster.LogicalSlot; +import org.apache.flink.runtime.jobmaster.SlotContext; +import org.apache.flink.runtime.jobmaster.SlotInfo; +import org.apache.flink.runtime.jobmaster.SlotOwner; +import org.apache.flink.runtime.jobmaster.SlotRequestId; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.util.AbstractID; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.Executor; +import java.util.function.BooleanSupplier; + +/** + * Scheduler that assigns tasks to slots. This class is currently work in progress, comments will be updated as we + * move forward. + */ +public class Scheduler implements SlotProvider, SlotOwner { + + /** Logger */ + private final Logger log = LoggerFactory.getLogger(getClass()); + + /** Strategy that selects the best slot for a given slot allocation request. */ + @Nonnull + private final SlotSelectionStrategy slotSelectionStrategy; + + /** Managers for the different slot sharing groups. 
*/ + @Nonnull + private final Map slotSharingManagersMap; + + /** The slot pool from which slots are allocated. */ + @Nonnull + private final SlotPoolGateway slotPoolGateway; + + /** Executor for running tasks in the job master's main thread. */ + @Nonnull + private Executor componentMainThreadExecutor; + + /** Predicate to check if the current thread is the job master's main thread. */ + @Nonnull + private BooleanSupplier componentMainThreadCheck; + + + public Scheduler( + @Nonnull Map slotSharingManagersMap, + @Nonnull SlotSelectionStrategy slotSelectionStrategy, + @Nonnull SlotPoolGateway slotPoolGateway) { + + this.slotSelectionStrategy = slotSelectionStrategy; + this.slotSharingManagersMap = slotSharingManagersMap; + this.slotPoolGateway = slotPoolGateway; + this.componentMainThreadExecutor = (runnable) -> { + throw new IllegalStateException("Main thread executor not initialized."); + }; + this.componentMainThreadCheck = () -> { + throw new IllegalStateException("Main thread checker not initialized"); + }; + } + + public void start(@Nonnull Executor mainThreadExecutor, @Nonnull BooleanSupplier mainThreadCheck) { + this.componentMainThreadExecutor = mainThreadExecutor; + this.componentMainThreadCheck = mainThreadCheck; + }
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660893#comment-16660893 ] ASF GitHub Bot commented on FLINK-10431: tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler URL: https://github.com/apache/flink/pull/6898#discussion_r227453190 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/LocationPreferenceSlotSelection.java ## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.clusterframework.types.SlotProfile; +import org.apache.flink.runtime.jobmanager.scheduler.Locality; +import org.apache.flink.runtime.jobmaster.SlotInfo; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; + +import javax.annotation.Nonnull; + +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.BiFunction; + +/** + * This class implements a {@link SlotSelectionStrategy} that is based on location preference hints. + */ +public class LocationPreferenceSlotSelection implements SlotSelectionStrategy { + + public static final LocationPreferenceSlotSelection INSTANCE = new LocationPreferenceSlotSelection(); + + private LocationPreferenceSlotSelection() { + } + + /** +* Calculates the candidate's locality score. +*/ + private static final BiFunction LOCALITY_EVALUATION_FUNCTION = + (localWeigh, hostLocalWeigh) -> localWeigh * 10 + hostLocalWeigh; + + @Nonnull + @Override + public SlotInfoAndLocality selectBestSlotForProfile( + @Nonnull List availableSlots, + @Nonnull SlotProfile slotProfile) { + + Collection locationPreferences = slotProfile.getPreferredLocations(); + + if (availableSlots.isEmpty()) { + return SlotInfoAndLocality.of(null, Locality.UNKNOWN); + } + + final ResourceProfile resourceProfile = slotProfile.getResourceProfile(); + + // if we have no location preferences, we can only filter by the additional requirements. 
+ if (locationPreferences.isEmpty()) { + for (SlotInfo candidate : availableSlots) { + if (candidate.getResourceProfile().isMatching(resourceProfile)) { + return SlotInfoAndLocality.of(candidate, Locality.UNCONSTRAINED); + } + } + return SlotInfoAndLocality.of(null, Locality.UNKNOWN); + } + + // we build up two indexes, one for resource id and one for host names of the preferred locations. + final Map preferredResourceIDs = new HashMap<>(locationPreferences.size()); + final Map preferredFQHostNames = new HashMap<>(locationPreferences.size()); + + for (TaskManagerLocation locationPreference : locationPreferences) { + preferredResourceIDs.merge(locationPreference.getResourceID(), 1, Integer::sum); + preferredFQHostNames.merge(locationPreference.getFQDNHostname(), 1, Integer::sum); + } + + SlotInfo bestCandidate = null; + Locality bestCandidateLocality = Locality.UNKNOWN; + int bestCandidateScore = Integer.MIN_VALUE; + + for (SlotInfo candidate : availableSlots) { + + if (candidate.getResourceProfile().isMatching(resourceProfile)) { + + // this gets candidate is local-weigh + Integer localWeigh = preferredResourceIDs.getOrDefault(candidate.getTaskManagerLocation().getResourceID(), 0); + +
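The locality score above combines two counts. `localWeigh` is how often a candidate's TaskManager resource ID appears among the preferred locations, and `hostLocalWeigh` is how often its host name does. The quoted hunk is cut off before `hostLocalWeigh` is computed, so this sketch assumes it is looked up in `preferredFQHostNames` the same way. Plain strings stand in for `ResourceID` and FQDN host names. `LocalityScoring`, `Location` and `score` are hypothetical.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Simplified sketch of the locality scoring in LocationPreferenceSlotSelection.
// Strings stand in for ResourceID and FQDN host names (assumption for illustration).
final class LocalityScoring {

    // Same formula as LOCALITY_EVALUATION_FUNCTION in the quoted diff.
    static final BiFunction<Integer, Integer, Integer> LOCALITY_EVALUATION_FUNCTION =
        (localWeigh, hostLocalWeigh) -> localWeigh * 10 + hostLocalWeigh;

    // Hypothetical location: a TaskManager id and the host it runs on.
    static final class Location {
        final String resourceId;
        final String host;

        Location(String resourceId, String host) {
            this.resourceId = resourceId;
            this.host = host;
        }
    }

    static int score(List<Location> preferences, Location candidate) {
        // Count how often each resource id and each host occurs among the preferences,
        // mirroring the merge(..., 1, Integer::sum) indexing in the diff.
        Map<String, Integer> preferredResourceIds = new HashMap<>(preferences.size());
        Map<String, Integer> preferredHosts = new HashMap<>(preferences.size());
        for (Location preference : preferences) {
            preferredResourceIds.merge(preference.resourceId, 1, Integer::sum);
            preferredHosts.merge(preference.host, 1, Integer::sum);
        }
        int localWeigh = preferredResourceIds.getOrDefault(candidate.resourceId, 0);
        // Assumed: the host-local weight is looked up the same way (cut off in the diff).
        int hostLocalWeigh = preferredHosts.getOrDefault(candidate.host, 0);
        return LOCALITY_EVALUATION_FUNCTION.apply(localWeigh, hostLocalWeigh);
    }

    public static void main(String[] args) {
        List<Location> preferences = Arrays.asList(
            new Location("tm1", "hostA"),
            new Location("tm2", "hostA"),
            new Location("tm3", "hostB"));
        System.out.println(score(preferences, new Location("tm1", "hostA"))); // prints 12
        System.out.println(score(preferences, new Location("tm4", "hostA"))); // prints 2
        System.out.println(score(preferences, new Location("tm5", "hostC"))); // prints 0
    }
}
```

With the preferences `tm1@hostA`, `tm2@hostA` and `tm3@hostB`, a slot on `tm1@hostA` scores 1 * 10 + 2 = 12. A slot on another TaskManager on `hostA` scores 2, and a slot on an unrelated host scores 0. Because of the factor 10, a single TaskManager-local match outweighs up to nine host-local matches.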
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660890#comment-16660890 ] ASF GitHub Bot commented on FLINK-10431: tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler URL: https://github.com/apache/flink/pull/6898#discussion_r227449157 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ## @@ -285,7 +290,9 @@ public JobMaster( this.slotPool = checkNotNull(slotPoolFactory).createSlotPool(jobGraph.getJobID()); - this.slotPoolGateway = slotPool.getSelfGateway(SlotPoolGateway.class); + this.slotPoolGateway = slotPool; Review comment: Can we remove this field?
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660889#comment-16660889 ] ASF GitHub Bot commented on FLINK-10431: tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler URL: https://github.com/apache/flink/pull/6898#discussion_r227450350 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotActions.java ## @@ -39,10 +38,23 @@ * @param slotRequestId identifying the slot to release * @param slotSharingGroupId identifying the slot sharing group to which the slot belongs, null if none * @param cause of the slot release, null if none -* @return Acknowledge (future) after the slot has been released +* @return Acknowledge after the slot has been released */ - CompletableFuture releaseSlot( + @Deprecated + Acknowledge releaseSlot( Review comment: Could be `void`?
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660892#comment-16660892 ] ASF GitHub Bot commented on FLINK-10431: tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler URL: https://github.com/apache/flink/pull/6898#discussion_r227450458 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotActions.java ## @@ -39,10 +38,23 @@ * @param slotRequestId identifying the slot to release * @param slotSharingGroupId identifying the slot sharing group to which the slot belongs, null if none * @param cause of the slot release, null if none -* @return Acknowledge (future) after the slot has been released +* @return Acknowledge after the slot has been released */ - CompletableFuture releaseSlot( + @Deprecated + Acknowledge releaseSlot( SlotRequestId slotRequestId, @Nullable SlotSharingGroupId slotSharingGroupId, @Nullable Throwable cause); + + /** +* Releases the slot with the given {@link SlotRequestId}. Additionally, one can provide a cause for the slot release. +* +* @param slotRequestId identifying the slot to release +* @param cause of the slot release, null if none +* @return Acknowledge after the slot has been released +*/ + @Nonnull + Acknowledge releaseSlot( Review comment: Same here with `void`?
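The reviewer asks whether the now-synchronous `releaseSlot` overloads could return `void`. Once the result is no longer a `CompletableFuture`, an `Acknowledge` tells the caller nothing that a normal return does not. A hypothetical sketch of that signature change follows. Strings stand in for `SlotRequestId` and `SlotSharingGroupId`, and `SlotReleaseActions` and `RecordingSlotReleaser` are invented names. Having the deprecated overload delegate to the new one is an assumption made for the sketch; the diff does not show how the two relate.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of void-returning release methods; not Flink's AllocatedSlotActions.
// Strings stand in for SlotRequestId and SlotSharingGroupId.
interface SlotReleaseActions {

    // Releases the slot for the given request id; cause may be null.
    void releaseSlot(String slotRequestId, Throwable cause);

    // Older overload kept for compatibility. In this sketch it simply
    // delegates and ignores the sharing group id (an assumption).
    @Deprecated
    default void releaseSlot(String slotRequestId, String slotSharingGroupId, Throwable cause) {
        releaseSlot(slotRequestId, cause);
    }
}

// Test double that records which slot requests were released.
final class RecordingSlotReleaser implements SlotReleaseActions {

    final List<String> released = new ArrayList<>();

    @Override
    public void releaseSlot(String slotRequestId, Throwable cause) {
        released.add(slotRequestId);
    }

    public static void main(String[] args) {
        RecordingSlotReleaser releaser = new RecordingSlotReleaser();
        releaser.releaseSlot("request-1", null);
        releaser.releaseSlot("request-2", "group-a", new RuntimeException("test"));
        System.out.println(releaser.released); // prints [request-1, request-2]
    }
}
```

Failures can still be signalled by throwing, so dropping the return value loses nothing that returning `Acknowledge.get()` conveyed.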
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660894#comment-16660894 ] ASF GitHub Bot commented on FLINK-10431: tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler URL: https://github.com/apache/flink/pull/6898#discussion_r227452749 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/LocationPreferenceSlotSelection.java ## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.clusterframework.types.SlotProfile; +import org.apache.flink.runtime.jobmanager.scheduler.Locality; +import org.apache.flink.runtime.jobmaster.SlotInfo; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; + +import javax.annotation.Nonnull; + +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.BiFunction; + +/** + * This class implements a {@link SlotSelectionStrategy} that is based on location preference hints. + */ +public class LocationPreferenceSlotSelection implements SlotSelectionStrategy { + + public static final LocationPreferenceSlotSelection INSTANCE = new LocationPreferenceSlotSelection(); + + private LocationPreferenceSlotSelection() { + } + + /** +* Calculates the candidate's locality score. +*/ + private static final BiFunction LOCALITY_EVALUATION_FUNCTION = + (localWeigh, hostLocalWeigh) -> localWeigh * 10 + hostLocalWeigh; + + @Nonnull + @Override + public SlotInfoAndLocality selectBestSlotForProfile( + @Nonnull List availableSlots, + @Nonnull SlotProfile slotProfile) { + + Collection locationPreferences = slotProfile.getPreferredLocations(); + + if (availableSlots.isEmpty()) { + return SlotInfoAndLocality.of(null, Locality.UNKNOWN); + } + + final ResourceProfile resourceProfile = slotProfile.getResourceProfile(); + + // if we have no location preferences, we can only filter by the additional requirements. 
+ if (locationPreferences.isEmpty()) { + for (SlotInfo candidate : availableSlots) { + if (candidate.getResourceProfile().isMatching(resourceProfile)) { + return SlotInfoAndLocality.of(candidate, Locality.UNCONSTRAINED); + } + } + return SlotInfoAndLocality.of(null, Locality.UNKNOWN); Review comment: Factor out in separate method, e.g. `selectBestSlotByResourceProfile`
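The suggested refactoring moves the no-location-preference branch into its own method, so that `selectBestSlotForProfile` only dispatches. Below is a hypothetical sketch using simplified stand-ins. A slot offers an integer amount of resources, `isMatching` means it offers at least the requested amount, and the location-aware path is left out. Only the method name `selectBestSlotByResourceProfile` comes from the review comment; the other types are invented.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of the suggested extraction; simplified stand-ins, not Flink's types.
final class SlotSelectionSketch {

    enum Locality { UNCONSTRAINED, UNKNOWN }

    // A slot offering an integer amount of resources (simplification).
    static final class Slot {
        final String id;
        final int resources;

        Slot(String id, int resources) {
            this.id = id;
            this.resources = resources;
        }

        boolean isMatching(int requested) {
            return resources >= requested;
        }
    }

    static final class SlotAndLocality {
        final Slot slot; // null if no slot matched
        final Locality locality;

        SlotAndLocality(Slot slot, Locality locality) {
            this.slot = slot;
            this.locality = locality;
        }
    }

    // Extracted "no location preferences" branch: the first matching slot wins.
    static SlotAndLocality selectBestSlotByResourceProfile(List<Slot> availableSlots, int requested) {
        for (Slot candidate : availableSlots) {
            if (candidate.isMatching(requested)) {
                return new SlotAndLocality(candidate, Locality.UNCONSTRAINED);
            }
        }
        return new SlotAndLocality(null, Locality.UNKNOWN);
    }

    static SlotAndLocality selectBestSlotForProfile(
            List<Slot> availableSlots, List<String> preferredLocations, int requested) {
        if (availableSlots.isEmpty()) {
            return new SlotAndLocality(null, Locality.UNKNOWN);
        }
        if (preferredLocations.isEmpty()) {
            return selectBestSlotByResourceProfile(availableSlots, requested);
        }
        // Location-aware scoring would follow here; it is omitted from this sketch.
        throw new UnsupportedOperationException("location-based selection not sketched");
    }

    public static void main(String[] args) {
        List<Slot> slots = Arrays.asList(new Slot("s1", 1), new Slot("s2", 4));
        SlotAndLocality result = selectBestSlotForProfile(slots, Collections.<String>emptyList(), 2);
        System.out.println(result.slot.id + " " + result.locality); // prints s2 UNCONSTRAINED
    }
}
```

Keeping the empty-list check in the caller leaves the extracted method with a single concern: filtering by resource profile.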
[GitHub] tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler URL: https://github.com/apache/flink/pull/6898#discussion_r227448691 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java ## @@ -172,8 +172,8 @@ else if (ret instanceof CompletableFuture) { } @Override - public CompletableFuture cancelSlotRequest(SlotRequestId slotRequestId, @Nullable SlotSharingGroupId slotSharingGroupId, Throwable cause) { - return CompletableFuture.completedFuture(Acknowledge.get()); + public Acknowledge cancelSlotRequest(SlotRequestId slotRequestId, @Nullable SlotSharingGroupId slotSharingGroupId, Throwable cause) { Review comment: I guess it could have a `void` return type.
[GitHub] tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler URL: https://github.com/apache/flink/pull/6898#discussion_r227456417 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/Scheduler.java ## @@ -0,0 +1,516 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.clusterframework.types.SlotProfile; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.instance.SlotSharingGroupId; +import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint; +import org.apache.flink.runtime.jobmanager.scheduler.Locality; +import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; +import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; +import org.apache.flink.runtime.jobmaster.LogicalSlot; +import org.apache.flink.runtime.jobmaster.SlotContext; +import org.apache.flink.runtime.jobmaster.SlotInfo; +import org.apache.flink.runtime.jobmaster.SlotOwner; +import org.apache.flink.runtime.jobmaster.SlotRequestId; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.util.AbstractID; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.Executor; +import java.util.function.BooleanSupplier; + +/** + * Scheduler that assigns tasks to slots. This class is currently work in progress, comments will be updated as we + * move forward. + */ +public class Scheduler implements SlotProvider, SlotOwner { + + /** Logger */ + private final Logger log = LoggerFactory.getLogger(getClass()); + + /** Strategy that selects the best slot for a given slot allocation request. */ + @Nonnull + private final SlotSelectionStrategy slotSelectionStrategy; + + /** Managers for the different slot sharing groups. 
*/ + @Nonnull + private final Map slotSharingManagersMap; + + /** The slot pool from which slots are allocated. */ + @Nonnull + private final SlotPoolGateway slotPoolGateway; + + /** Executor for running tasks in the job master's main thread. */ + @Nonnull + private Executor componentMainThreadExecutor; + + /** Predicate to check if the current thread is the job master's main thread. */ + @Nonnull + private BooleanSupplier componentMainThreadCheck; + + + public Scheduler( + @Nonnull Map slotSharingManagersMap, + @Nonnull SlotSelectionStrategy slotSelectionStrategy, + @Nonnull SlotPoolGateway slotPoolGateway) { + + this.slotSelectionStrategy = slotSelectionStrategy; + this.slotSharingManagersMap = slotSharingManagersMap; + this.slotPoolGateway = slotPoolGateway; + this.componentMainThreadExecutor = (runnable) -> { + throw new IllegalStateException("Main thread executor not initialized."); + }; + this.componentMainThreadCheck = () -> { + throw new IllegalStateException("Main thread checker not initialized"); + }; + } + + public void start(@Nonnull Executor mainThreadExecutor, @Nonnull BooleanSupplier mainThreadCheck) { + this.componentMainThreadExecutor = mainThreadExecutor; + this.componentMainThreadCheck = mainThreadCheck; + } + + //--- + + @Override + public CompletableFuture allocateSlot( + SlotRequestId slotRequestId, + ScheduledUnit scheduledUnit, + boolean
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660887#comment-16660887 ] ASF GitHub Bot commented on FLINK-10431: tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler URL: https://github.com/apache/flink/pull/6898#discussion_r227448691 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java ## @@ -172,8 +172,8 @@ else if (ret instanceof CompletableFuture) { } @Override - public CompletableFuture cancelSlotRequest(SlotRequestId slotRequestId, @Nullable SlotSharingGroupId slotSharingGroupId, Throwable cause) { - return CompletableFuture.completedFuture(Acknowledge.get()); + public Acknowledge cancelSlotRequest(SlotRequestId slotRequestId, @Nullable SlotSharingGroupId slotSharingGroupId, Throwable cause) { Review comment: I guess it could have a `void` return type. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Extract scheduling-related code from SlotPool > - > > Key: FLINK-10431 > URL: https://issues.apache.org/jira/browse/FLINK-10431 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Affects Versions: 1.7.0 >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Labels: pull-request-available > > The other half of the current scheduling logic is the management of slot > sharing and is located in the SlotPool. We need to extract this logic into > our new Scheduler component from the previous step. This leaves us with a > simpler SlotPool that mainly cares about obtaining, holding, and releasing > slots in interaction with a ResourceManager. 
The new Scheduler can now > identify slot sharing groups and interact with the SlotPool. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
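The `void` suggestion can be illustrated with simplified, hypothetical types. Once `cancelSlotRequest` no longer returns a `CompletableFuture`, the call is synchronous. An `Acknowledge` return value then says nothing beyond "the call returned normally", and failures can still surface as exceptions. `Ack`, `PendingRequests`, `cancelWithAck` and `cancel` are illustrative names, not Flink's API. `Ack` only stands in for a singleton acknowledgement token such as Flink's `Acknowledge`.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a singleton acknowledgement token. */
enum Ack { INSTANCE }

/** Hypothetical registry of pending slot requests, keyed by request id. */
class PendingRequests {
    private final Map<String, String> pending = new HashMap<>();

    void add(String requestId, String sharingGroup) {
        pending.put(requestId, sharingGroup);
    }

    boolean isPending(String requestId) {
        return pending.containsKey(requestId);
    }

    /** Style under review: always returns the same token, so the caller learns nothing from it. */
    Ack cancelWithAck(String requestId) {
        pending.remove(requestId);
        return Ack.INSTANCE;
    }

    /** Suggested style: returning normally already signals that the cancellation happened. */
    void cancel(String requestId) {
        pending.remove(requestId);
    }
}
```

The same reasoning applies to any synchronous method whose only possible return value is a constant acknowledgement.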
[GitHub] tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler URL: https://github.com/apache/flink/pull/6898#discussion_r227460500 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/Scheduler.java ## @@ -0,0 +1,516 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.clusterframework.types.SlotProfile; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.instance.SlotSharingGroupId; +import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint; +import org.apache.flink.runtime.jobmanager.scheduler.Locality; +import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; +import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; +import org.apache.flink.runtime.jobmaster.LogicalSlot; +import org.apache.flink.runtime.jobmaster.SlotContext; +import org.apache.flink.runtime.jobmaster.SlotInfo; +import org.apache.flink.runtime.jobmaster.SlotOwner; +import org.apache.flink.runtime.jobmaster.SlotRequestId; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.util.AbstractID; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.Executor; +import java.util.function.BooleanSupplier; + +/** + * Scheduler that assigns tasks to slots. This class is currently work in progress, comments will be updated as we + * move forward. + */ +public class Scheduler implements SlotProvider, SlotOwner { + + /** Logger */ + private final Logger log = LoggerFactory.getLogger(getClass()); + + /** Strategy that selects the best slot for a given slot allocation request. */ + @Nonnull + private final SlotSelectionStrategy slotSelectionStrategy; + + /** Managers for the different slot sharing groups. 
*/ + @Nonnull + private final Map slotSharingManagersMap; + + /** The slot pool from which slots are allocated. */ + @Nonnull + private final SlotPoolGateway slotPoolGateway; + + /** Executor for running tasks in the job master's main thread. */ + @Nonnull + private Executor componentMainThreadExecutor; + + /** Predicate to check if the current thread is the job master's main thread. */ + @Nonnull + private BooleanSupplier componentMainThreadCheck; + + + public Scheduler( + @Nonnull Map slotSharingManagersMap, + @Nonnull SlotSelectionStrategy slotSelectionStrategy, + @Nonnull SlotPoolGateway slotPoolGateway) { + + this.slotSelectionStrategy = slotSelectionStrategy; + this.slotSharingManagersMap = slotSharingManagersMap; + this.slotPoolGateway = slotPoolGateway; + this.componentMainThreadExecutor = (runnable) -> { + throw new IllegalStateException("Main thread executor not initialized."); + }; + this.componentMainThreadCheck = () -> { + throw new IllegalStateException("Main thread checker not initialized"); + }; + } + + public void start(@Nonnull Executor mainThreadExecutor, @Nonnull BooleanSupplier mainThreadCheck) { + this.componentMainThreadExecutor = mainThreadExecutor; + this.componentMainThreadCheck = mainThreadCheck; + } + + //--- + + @Override + public CompletableFuture allocateSlot( + SlotRequestId slotRequestId, + ScheduledUnit scheduledUnit, + boolean
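The `Scheduler` constructor quoted above installs placeholder callbacks that throw `IllegalStateException` until `start(...)` supplies the job master's real main-thread executor and thread check. Below is a minimal, standalone sketch of that fail-fast initialization pattern. `MainThreadBoundComponent`, `runInMainThread` and `isInMainThread` are hypothetical names. Only `java.util.concurrent.Executor` and `java.util.function.BooleanSupplier` are real JDK types.

```java
import java.util.concurrent.Executor;
import java.util.function.BooleanSupplier;

/** Hypothetical component that must be started before it may use its main-thread executor. */
class MainThreadBoundComponent {
    private Executor mainThreadExecutor;
    private BooleanSupplier mainThreadCheck;

    MainThreadBoundComponent() {
        // Placeholders: any use before start() fails loudly instead of running on the wrong thread.
        this.mainThreadExecutor = runnable -> {
            throw new IllegalStateException("Main thread executor not initialized.");
        };
        this.mainThreadCheck = () -> {
            throw new IllegalStateException("Main thread checker not initialized.");
        };
    }

    /** Swaps the placeholders for the real executor and thread check. */
    void start(Executor mainThreadExecutor, BooleanSupplier mainThreadCheck) {
        this.mainThreadExecutor = mainThreadExecutor;
        this.mainThreadCheck = mainThreadCheck;
    }

    void runInMainThread(Runnable action) {
        mainThreadExecutor.execute(action);
    }

    boolean isInMainThread() {
        return mainThreadCheck.getAsBoolean();
    }
}
```

Compared with nullable fields, the placeholders keep the fields non-null, which fits the `@Nonnull` annotations in the hunk. A premature call also produces a descriptive error rather than a `NullPointerException`.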
[GitHub] tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler URL: https://github.com/apache/flink/pull/6898#discussion_r227454366 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSlotSelectionStrategy.java ## @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.SlotProfile; +import org.apache.flink.runtime.jobmanager.scheduler.Locality; +import org.apache.flink.runtime.jobmaster.SlotInfo; + +import javax.annotation.Nonnull; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * This class implements a {@link SlotSelectionStrategy} that is based on previous allocations and + * falls back to using location preference hints if there is no previous allocation. 
+ */ +public class PreviousAllocationSlotSelectionStrategy implements SlotSelectionStrategy { + + public static final PreviousAllocationSlotSelectionStrategy INSTANCE = + new PreviousAllocationSlotSelectionStrategy(); + + private final LocationPreferenceSlotSelection locationPreferenceSlotSelection; + + private PreviousAllocationSlotSelectionStrategy() { + this.locationPreferenceSlotSelection = LocationPreferenceSlotSelection.INSTANCE; + } + + @Nonnull + @Override + public SlotInfoAndLocality selectBestSlotForProfile( + @Nonnull List availableSlots, + @Nonnull SlotProfile slotProfile) { + + Collection priorAllocations = slotProfile.getPreferredAllocations(); + + // First, if there was a prior allocation try to schedule to the same/old slot + if (!priorAllocations.isEmpty()) { + + HashSet priorAllocationsSet = new HashSet<>(priorAllocations); Review comment: Why are we creating a new `HashSet` here?
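One plausible motive for the copy the reviewer questions is membership testing. If every available slot is checked against the prior allocations, a `HashSet` gives expected constant-time lookups instead of a linear scan of an arbitrary `Collection`. The sketch below avoids the copy when the caller already passes a `Set`. Whether the copy is needed in the PR depends on what `getPreferredAllocations()` actually returns, which the hunk does not show. The types are hypothetical simplifications: allocation ids are plain `String`s, and `SimpleSlot` and `PriorAllocationMatcher` are illustrative names.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

/** Hypothetical simplified slot: only the allocation id matters for this sketch. */
final class SimpleSlot {
    final String allocationId;

    SimpleSlot(String allocationId) {
        this.allocationId = allocationId;
    }
}

final class PriorAllocationMatcher {
    private PriorAllocationMatcher() {
    }

    /** Returns the first available slot whose allocation id appears among the prior allocations. */
    static Optional<SimpleSlot> findPreviousSlot(List<SimpleSlot> available, Collection<String> priorAllocations) {
        if (priorAllocations.isEmpty()) {
            return Optional.empty();
        }
        // Reuse the argument if it is already a Set; copy into a HashSet only otherwise.
        Set<String> prior = priorAllocations instanceof Set
            ? (Set<String>) priorAllocations
            : new HashSet<>(priorAllocations);
        for (SimpleSlot slot : available) {
            if (prior.contains(slot.allocationId)) {
                return Optional.of(slot);
            }
        }
        return Optional.empty();
    }
}
```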
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660897#comment-16660897 ] ASF GitHub Bot commented on FLINK-10431: tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler URL: https://github.com/apache/flink/pull/6898#discussion_r227459571 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/Scheduler.java ## @@ -0,0 +1,516 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.clusterframework.types.SlotProfile; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.instance.SlotSharingGroupId; +import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint; +import org.apache.flink.runtime.jobmanager.scheduler.Locality; +import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; +import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; +import org.apache.flink.runtime.jobmaster.LogicalSlot; +import org.apache.flink.runtime.jobmaster.SlotContext; +import org.apache.flink.runtime.jobmaster.SlotInfo; +import org.apache.flink.runtime.jobmaster.SlotOwner; +import org.apache.flink.runtime.jobmaster.SlotRequestId; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.util.AbstractID; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.Executor; +import java.util.function.BooleanSupplier; + +/** + * Scheduler that assigns tasks to slots. This class is currently work in progress, comments will be updated as we + * move forward. + */ +public class Scheduler implements SlotProvider, SlotOwner { + + /** Logger */ + private final Logger log = LoggerFactory.getLogger(getClass()); + + /** Strategy that selects the best slot for a given slot allocation request. */ + @Nonnull + private final SlotSelectionStrategy slotSelectionStrategy; + + /** Managers for the different slot sharing groups. 
*/ + @Nonnull + private final Map slotSharingManagersMap; + + /** The slot pool from which slots are allocated. */ + @Nonnull + private final SlotPoolGateway slotPoolGateway; + + /** Executor for running tasks in the job master's main thread. */ + @Nonnull + private Executor componentMainThreadExecutor; + + /** Predicate to check if the current thread is the job master's main thread. */ + @Nonnull + private BooleanSupplier componentMainThreadCheck; + + + public Scheduler( + @Nonnull Map slotSharingManagersMap, + @Nonnull SlotSelectionStrategy slotSelectionStrategy, + @Nonnull SlotPoolGateway slotPoolGateway) { + + this.slotSelectionStrategy = slotSelectionStrategy; + this.slotSharingManagersMap = slotSharingManagersMap; + this.slotPoolGateway = slotPoolGateway; + this.componentMainThreadExecutor = (runnable) -> { + throw new IllegalStateException("Main thread executor not initialized."); + }; + this.componentMainThreadCheck = () -> { + throw new IllegalStateException("Main thread checker not initialized"); + }; + } + + public void start(@Nonnull Executor mainThreadExecutor, @Nonnull BooleanSupplier mainThreadCheck) { + this.componentMainThreadExecutor = mainThreadExecutor; + this.componentMainThreadCheck = mainThreadCheck; + }
[GitHub] tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler URL: https://github.com/apache/flink/pull/6898#discussion_r227449157 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ## @@ -285,7 +290,9 @@ public JobMaster( this.slotPool = checkNotNull(slotPoolFactory).createSlotPool(jobGraph.getJobID()); - this.slotPoolGateway = slotPool.getSelfGateway(SlotPoolGateway.class); + this.slotPoolGateway = slotPool; Review comment: Can we remove this field?
[GitHub] tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler URL: https://github.com/apache/flink/pull/6898#discussion_r227450458 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotActions.java ## @@ -39,10 +38,23 @@ * @param slotRequestId identifying the slot to release * @param slotSharingGroupId identifying the slot sharing group to which the slot belongs, null if none * @param cause of the slot release, null if none -* @return Acknowledge (future) after the slot has been released +* @return Acknowledge after the slot has been released */ - CompletableFuture<Acknowledge> releaseSlot( + @Deprecated + Acknowledge releaseSlot( SlotRequestId slotRequestId, @Nullable SlotSharingGroupId slotSharingGroupId, @Nullable Throwable cause); + + /** +* Releases the slot with the given {@link SlotRequestId}. Additionally, one can provide a cause for the slot release. +* +* @param slotRequestId identifying the slot to release +* @param cause of the slot release, null if none +* @return Acknowledge after the slot has been released +*/ + @Nonnull + Acknowledge releaseSlot( Review comment: Same here with `void`?
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660901#comment-16660901 ] ASF GitHub Bot commented on FLINK-10431: tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler URL: https://github.com/apache/flink/pull/6898#discussion_r227456417 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/Scheduler.java ## @@ -0,0 +1,516 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.clusterframework.types.SlotProfile; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.instance.SlotSharingGroupId; +import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint; +import org.apache.flink.runtime.jobmanager.scheduler.Locality; +import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; +import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; +import org.apache.flink.runtime.jobmaster.LogicalSlot; +import org.apache.flink.runtime.jobmaster.SlotContext; +import org.apache.flink.runtime.jobmaster.SlotInfo; +import org.apache.flink.runtime.jobmaster.SlotOwner; +import org.apache.flink.runtime.jobmaster.SlotRequestId; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.util.AbstractID; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.Executor; +import java.util.function.BooleanSupplier; + +/** + * Scheduler that assigns tasks to slots. This class is currently work in progress, comments will be updated as we + * move forward. + */ +public class Scheduler implements SlotProvider, SlotOwner { + + /** Logger */ + private final Logger log = LoggerFactory.getLogger(getClass()); + + /** Strategy that selects the best slot for a given slot allocation request. */ + @Nonnull + private final SlotSelectionStrategy slotSelectionStrategy; + + /** Managers for the different slot sharing groups. 
*/ + @Nonnull + private final Map slotSharingManagersMap; + + /** The slot pool from which slots are allocated. */ + @Nonnull + private final SlotPoolGateway slotPoolGateway; + + /** Executor for running tasks in the job master's main thread. */ + @Nonnull + private Executor componentMainThreadExecutor; + + /** Predicate to check if the current thread is the job master's main thread. */ + @Nonnull + private BooleanSupplier componentMainThreadCheck; + + + public Scheduler( + @Nonnull Map slotSharingManagersMap, + @Nonnull SlotSelectionStrategy slotSelectionStrategy, + @Nonnull SlotPoolGateway slotPoolGateway) { + + this.slotSelectionStrategy = slotSelectionStrategy; + this.slotSharingManagersMap = slotSharingManagersMap; + this.slotPoolGateway = slotPoolGateway; + this.componentMainThreadExecutor = (runnable) -> { + throw new IllegalStateException("Main thread executor not initialized."); + }; + this.componentMainThreadCheck = () -> { + throw new IllegalStateException("Main thread checker not initialized"); + }; + } + + public void start(@Nonnull Executor mainThreadExecutor, @Nonnull BooleanSupplier mainThreadCheck) { + this.componentMainThreadExecutor = mainThreadExecutor; + this.componentMainThreadCheck = mainThreadCheck; + }
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660891#comment-16660891 ] ASF GitHub Bot commented on FLINK-10431: tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler URL: https://github.com/apache/flink/pull/6898#discussion_r227451887 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/LocationPreferenceSlotSelection.java ## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.clusterframework.types.SlotProfile; +import org.apache.flink.runtime.jobmanager.scheduler.Locality; +import org.apache.flink.runtime.jobmaster.SlotInfo; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; + +import javax.annotation.Nonnull; + +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.BiFunction; + +/** + * This class implements a {@link SlotSelectionStrategy} that is based on location preference hints. + */ +public class LocationPreferenceSlotSelection implements SlotSelectionStrategy { Review comment: Singletons are best implemented as an `enum`:
```
enum LocationPreferenceSlotSelection implements SlotSelectionStrategy {
    INSTANCE;

    @Override
    public SlotInfoAndLocality selectBestSlotForProfile(...) {
        ...
    }
}
```
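Below is a minimal, runnable sketch of the enum-based singleton the reviewer recommends. It uses a hypothetical simplified `SlotChooser` interface in place of Flink's `SlotSelectionStrategy`, and slots are plain `"host:index"` strings. The JVM creates `INSTANCE` once, when the enum class is initialized. Enum constants cannot be instantiated reflectively, and deserialization returns the existing constant. These guarantees are why the enum form is generally preferred over a private constructor plus a `public static final` field.

```java
import java.util.List;
import java.util.Optional;

/** Hypothetical, simplified stand-in for a slot selection strategy interface. */
interface SlotChooser {
    Optional<String> choose(List<String> availableSlots, String preferredHost);
}

/** Enum singleton: INSTANCE is the only instance of this strategy. */
enum HostPreferenceChooser implements SlotChooser {
    INSTANCE;

    @Override
    public Optional<String> choose(List<String> availableSlots, String preferredHost) {
        // Prefer a slot on the requested host; otherwise fall back to the first available slot.
        for (String slot : availableSlots) {
            if (slot.startsWith(preferredHost + ":")) {
                return Optional.of(slot);
            }
        }
        return availableSlots.stream().findFirst();
    }
}
```

Callers keep using `HostPreferenceChooser.INSTANCE` exactly as they would use a `static final` field, so switching to the enum form does not change call sites.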
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660898#comment-16660898 ] ASF GitHub Bot commented on FLINK-10431: tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler URL: https://github.com/apache/flink/pull/6898#discussion_r227453757 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSlotSelectionStrategy.java ## @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.SlotProfile; +import org.apache.flink.runtime.jobmanager.scheduler.Locality; +import org.apache.flink.runtime.jobmaster.SlotInfo; + +import javax.annotation.Nonnull; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * This class implements a {@link SlotSelectionStrategy} that is based on previous allocations and + * falls back to using location preference hints if there is no previous allocation. + */ +public class PreviousAllocationSlotSelectionStrategy implements SlotSelectionStrategy { Review comment: Same comment concerning singletons: this class could also be implemented as an `enum`.
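The `PreviousAllocationSlotSelectionStrategy` hunk quoted earlier stores `LocationPreferenceSlotSelection.INSTANCE` in a field assigned by a private constructor. As an `enum`, the strategy can keep that fallback by referring to the other singleton directly. The sketch below is hypothetical and simplified: slots and allocations are plain `String`s, and `Strategy`, `FirstFitStrategy` and `PreviousAllocationFirstStrategy` are illustrative names. It tries a previously used slot first and otherwise delegates to the fallback singleton.

```java
import java.util.Collection;
import java.util.List;
import java.util.Optional;

/** Hypothetical simplified selection interface. */
interface Strategy {
    Optional<String> select(List<String> availableSlots, Collection<String> previousSlots);
}

/** Fallback singleton: picks the first available slot. */
enum FirstFitStrategy implements Strategy {
    INSTANCE;

    @Override
    public Optional<String> select(List<String> availableSlots, Collection<String> previousSlots) {
        return availableSlots.stream().findFirst();
    }
}

/** Singleton that prefers previously used slots and otherwise delegates to the fallback. */
enum PreviousAllocationFirstStrategy implements Strategy {
    INSTANCE;

    @Override
    public Optional<String> select(List<String> availableSlots, Collection<String> previousSlots) {
        for (String slot : availableSlots) {
            if (previousSlots.contains(slot)) {
                return Optional.of(slot);
            }
        }
        // No previously used slot is available: delegate to the other enum singleton.
        return FirstFitStrategy.INSTANCE.select(availableSlots, previousSlots);
    }
}
```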
[GitHub] tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler URL: https://github.com/apache/flink/pull/6898#discussion_r227453190 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/LocationPreferenceSlotSelection.java ## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.clusterframework.types.SlotProfile; +import org.apache.flink.runtime.jobmanager.scheduler.Locality; +import org.apache.flink.runtime.jobmaster.SlotInfo; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; + +import javax.annotation.Nonnull; + +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.BiFunction; + +/** + * This class implements a {@link SlotSelectionStrategy} that is based on location preference hints. 
+ */ +public class LocationPreferenceSlotSelection implements SlotSelectionStrategy { + + public static final LocationPreferenceSlotSelection INSTANCE = new LocationPreferenceSlotSelection(); + + private LocationPreferenceSlotSelection() { + } + + /** +* Calculates the candidate's locality score. +*/ + private static final BiFunction LOCALITY_EVALUATION_FUNCTION = + (localWeigh, hostLocalWeigh) -> localWeigh * 10 + hostLocalWeigh; + + @Nonnull + @Override + public SlotInfoAndLocality selectBestSlotForProfile( + @Nonnull List availableSlots, + @Nonnull SlotProfile slotProfile) { + + Collection locationPreferences = slotProfile.getPreferredLocations(); + + if (availableSlots.isEmpty()) { + return SlotInfoAndLocality.of(null, Locality.UNKNOWN); + } + + final ResourceProfile resourceProfile = slotProfile.getResourceProfile(); + + // if we have no location preferences, we can only filter by the additional requirements. + if (locationPreferences.isEmpty()) { + for (SlotInfo candidate : availableSlots) { + if (candidate.getResourceProfile().isMatching(resourceProfile)) { + return SlotInfoAndLocality.of(candidate, Locality.UNCONSTRAINED); + } + } + return SlotInfoAndLocality.of(null, Locality.UNKNOWN); + } + + // we build up two indexes, one for resource id and one for host names of the preferred locations. 
+ final Map preferredResourceIDs = new HashMap<>(locationPreferences.size()); + final Map preferredFQHostNames = new HashMap<>(locationPreferences.size()); + + for (TaskManagerLocation locationPreference : locationPreferences) { + preferredResourceIDs.merge(locationPreference.getResourceID(), 1, Integer::sum); + preferredFQHostNames.merge(locationPreference.getFQDNHostname(), 1, Integer::sum); + } + + SlotInfo bestCandidate = null; + Locality bestCandidateLocality = Locality.UNKNOWN; + int bestCandidateScore = Integer.MIN_VALUE; + + for (SlotInfo candidate : availableSlots) { + + if (candidate.getResourceProfile().isMatching(resourceProfile)) { + + // this gets candidate is local-weigh + Integer localWeigh = preferredResourceIDs.getOrDefault(candidate.getTaskManagerLocation().getResourceID(), 0); + + // this gets candidate is host-local-weigh + Integer hostLocalWeigh = preferredFQHostNames.getOrDefault(candidate.getTaskManagerLocation().getFQDNHostname(), 0); + +
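The diff above scores each candidate slot with `LOCALITY_EVALUATION_FUNCTION`. A match on a preferred TaskManager's resource ID counts ten times as much as a match on its host name. The following self-contained sketch reproduces that scoring loop under a few simplifying assumptions:

- `Candidate` is a hypothetical stand-in for `SlotInfo`/`TaskManagerLocation`.
- Plain `String`s replace `ResourceID` and host names.
- The resource-profile check is omitted.
- Ties keep the earlier candidate. The diff is cut off before its own tie-breaking logic, so this is an assumption.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical stand-in for SlotInfo/TaskManagerLocation: only the two location attributes.
final class Candidate {
    final String resourceId;
    final String host;

    Candidate(String resourceId, String host) {
        this.resourceId = resourceId;
        this.host = host;
    }
}

final class LocalityScoring {

    // Same weighting as in the diff: an exact TaskManager match counts ten times a same-host match.
    static final BiFunction<Integer, Integer, Integer> LOCALITY_EVALUATION_FUNCTION =
        (localWeight, hostLocalWeight) -> localWeight * 10 + hostLocalWeight;

    /** Returns the highest-scoring candidate; ties keep the earlier one; null for an empty list. */
    static Candidate selectBest(List<Candidate> candidates, List<Candidate> preferences) {
        // Index the preferences by resource ID and by host, counting duplicates.
        Map<String, Integer> preferredIds = new HashMap<>();
        Map<String, Integer> preferredHosts = new HashMap<>();
        for (Candidate preference : preferences) {
            preferredIds.merge(preference.resourceId, 1, Integer::sum);
            preferredHosts.merge(preference.host, 1, Integer::sum);
        }

        Candidate best = null;
        int bestScore = Integer.MIN_VALUE;
        for (Candidate candidate : candidates) {
            int score = LOCALITY_EVALUATION_FUNCTION.apply(
                preferredIds.getOrDefault(candidate.resourceId, 0),
                preferredHosts.getOrDefault(candidate.host, 0));
            if (score > bestScore) {
                best = candidate;
                bestScore = score;
            }
        }
        return best;
    }
}
```

Take a single preference `tm1@hostA`. A slot on `tm1` scores 11, because it matches both the TaskManager and the host. A slot elsewhere on `hostA` scores 1, and a slot on another host scores 0.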
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660899#comment-16660899 ] ASF GitHub Bot commented on FLINK-10431: tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler URL: https://github.com/apache/flink/pull/6898#discussion_r227455130 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSlotSelectionStrategy.java ## @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.SlotProfile; +import org.apache.flink.runtime.jobmanager.scheduler.Locality; +import org.apache.flink.runtime.jobmaster.SlotInfo; + +import javax.annotation.Nonnull; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * This class implements a {@link SlotSelectionStrategy} that is based on previous allocations and + * falls back to using location preference hints if there is no previous allocation. + */ +public class PreviousAllocationSlotSelectionStrategy implements SlotSelectionStrategy { + + public static final PreviousAllocationSlotSelectionStrategy INSTANCE = + new PreviousAllocationSlotSelectionStrategy(); + + private final LocationPreferenceSlotSelection locationPreferenceSlotSelection; + + private PreviousAllocationSlotSelectionStrategy() { + this.locationPreferenceSlotSelection = LocationPreferenceSlotSelection.INSTANCE; + } + + @Nonnull + @Override + public SlotInfoAndLocality selectBestSlotForProfile( + @Nonnull List availableSlots, + @Nonnull SlotProfile slotProfile) { + + Collection priorAllocations = slotProfile.getPreferredAllocations(); + + // First, if there was a prior allocation try to schedule to the same/old slot + if (!priorAllocations.isEmpty()) { + + HashSet priorAllocationsSet = new HashSet<>(priorAllocations); + for (SlotInfo availableSlot : availableSlots) { + if (priorAllocationsSet.contains(availableSlot.getAllocationId())) { + return SlotInfoAndLocality.of(availableSlot, Locality.LOCAL); + } + } + } + + // Second, select based on location preference, excluding blacklisted allocations + Set blackListedAllocations = slotProfile.getPreviousExecutionGraphAllocations(); + if (blackListedAllocations.isEmpty()) { + return 
locationPreferenceSlotSelection.selectBestSlotForProfile(availableSlots, slotProfile); + } else { + ArrayList availableAndAllowedSlots = new ArrayList<>(availableSlots.size()); + for (SlotInfo availableSlot : availableSlots) { + if (!blackListedAllocations.contains(availableSlot.getAllocationId())) { + availableAndAllowedSlots.add(availableSlot); + } + } + return locationPreferenceSlotSelection.selectBestSlotForProfile(availableAndAllowedSlots, slotProfile); + } Review comment: I would move `locationPreferenceSlotSelection.selectBestSlotForProfile(availableAndAllowedSlots, slotProfile)` out of the if-else blocks and only do the filtering of `availableSlots` in the branches. This reduces code duplication a bit. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about
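The refactoring suggested in the review can be sketched as follows. The branches only compute the list of allowed slots, and the fallback strategy is called in a single place. This is a simplified illustration, not the PR's code. Slots are represented only by hypothetical `String` allocation IDs, and the `fallback` function stands in for `LocationPreferenceSlotSelection`.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

final class PreviousAllocationSelection {

    /**
     * Hypothetical simplification: slots are identified only by their allocation ID, and
     * {@code fallback} stands in for the location-preference strategy.
     */
    static String select(
            List<String> availableSlots,
            Collection<String> priorAllocations,
            Set<String> blackListedAllocations,
            Function<List<String>, String> fallback) {

        // First, prefer a slot from a previous allocation.
        if (!priorAllocations.isEmpty()) {
            Set<String> priorAllocationsSet = new HashSet<>(priorAllocations);
            for (String slot : availableSlots) {
                if (priorAllocationsSet.contains(slot)) {
                    return slot;
                }
            }
        }

        // Second, the branches only decide which slots are allowed ...
        final List<String> allowedSlots;
        if (blackListedAllocations.isEmpty()) {
            allowedSlots = availableSlots;
        } else {
            allowedSlots = new ArrayList<>(availableSlots.size());
            for (String slot : availableSlots) {
                if (!blackListedAllocations.contains(slot)) {
                    allowedSlots.add(slot);
                }
            }
        }

        // ... and the fallback strategy is invoked in exactly one place.
        return fallback.apply(allowedSlots);
    }
}
```

Hoisting the call this way also means any later change to the fallback invocation, such as extra arguments, only has to be made once.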
[GitHub] tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler URL: https://github.com/apache/flink/pull/6898#discussion_r227451887 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/LocationPreferenceSlotSelection.java ## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.clusterframework.types.SlotProfile; +import org.apache.flink.runtime.jobmanager.scheduler.Locality; +import org.apache.flink.runtime.jobmaster.SlotInfo; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; + +import javax.annotation.Nonnull; + +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.BiFunction; + +/** + * This class implements a {@link SlotSelectionStrategy} that is based on location preference hints. 
+ */ +public class LocationPreferenceSlotSelection implements SlotSelectionStrategy { Review comment: Singletons are best implemented using an `enum`:
```
enum LocationPreferenceSlotSelection implements SlotSelectionStrategy {
    INSTANCE;

    @Override
    public SlotInfoAndLocality selectBestSlotForProfile(
            @Nonnull List<SlotInfo> availableSlots,
            @Nonnull SlotProfile slotProfile) {
        // selection logic as before
    }
}
```
This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
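Below is a minimal, compilable illustration of the enum-based singleton the reviewer recommends. The interface and enum names are hypothetical simplifications, not Flink's `SlotSelectionStrategy`. The point is that the JVM guarantees a single `INSTANCE`, even across serialization and reflection. No hand-written private constructor or `static final` field is needed.

```java
import java.util.List;

// Hypothetical, heavily simplified stand-in for a slot selection strategy interface.
interface SimpleSlotSelectionStrategy {
    String selectBestSlot(List<String> availableSlots);
}

// Enum singleton: the constructor is implicitly private, and the JVM creates
// exactly one INSTANCE when the enum class is first initialized.
enum FirstAvailableSlotSelection implements SimpleSlotSelectionStrategy {
    INSTANCE;

    @Override
    public String selectBestSlot(List<String> availableSlots) {
        // Trivial placeholder policy: take the first slot, or null if there is none.
        return availableSlots.isEmpty() ? null : availableSlots.get(0);
    }
}
```

Callers keep using `FirstAvailableSlotSelection.INSTANCE` exactly as they would use a classic `public static final INSTANCE` field.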
[GitHub] tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler URL: https://github.com/apache/flink/pull/6898#discussion_r227459571 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/Scheduler.java ## @@ -0,0 +1,516 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.clusterframework.types.SlotProfile; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.instance.SlotSharingGroupId; +import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint; +import org.apache.flink.runtime.jobmanager.scheduler.Locality; +import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; +import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; +import org.apache.flink.runtime.jobmaster.LogicalSlot; +import org.apache.flink.runtime.jobmaster.SlotContext; +import org.apache.flink.runtime.jobmaster.SlotInfo; +import org.apache.flink.runtime.jobmaster.SlotOwner; +import org.apache.flink.runtime.jobmaster.SlotRequestId; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.util.AbstractID; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.Executor; +import java.util.function.BooleanSupplier; + +/** + * Scheduler that assigns tasks to slots. This class is currently work in progress, comments will be updated as we + * move forward. + */ +public class Scheduler implements SlotProvider, SlotOwner { + + /** Logger */ + private final Logger log = LoggerFactory.getLogger(getClass()); + + /** Strategy that selects the best slot for a given slot allocation request. */ + @Nonnull + private final SlotSelectionStrategy slotSelectionStrategy; + + /** Managers for the different slot sharing groups. 
*/ + @Nonnull + private final Map slotSharingManagersMap; + + /** The slot pool from which slots are allocated. */ + @Nonnull + private final SlotPoolGateway slotPoolGateway; + + /** Executor for running tasks in the job master's main thread. */ + @Nonnull + private Executor componentMainThreadExecutor; + + /** Predicate to check if the current thread is the job master's main thread. */ + @Nonnull + private BooleanSupplier componentMainThreadCheck; + + + public Scheduler( + @Nonnull Map slotSharingManagersMap, + @Nonnull SlotSelectionStrategy slotSelectionStrategy, + @Nonnull SlotPoolGateway slotPoolGateway) { + + this.slotSelectionStrategy = slotSelectionStrategy; + this.slotSharingManagersMap = slotSharingManagersMap; + this.slotPoolGateway = slotPoolGateway; + this.componentMainThreadExecutor = (runnable) -> { + throw new IllegalStateException("Main thread executor not initialized."); + }; + this.componentMainThreadCheck = () -> { + throw new IllegalStateException("Main thread checker not initialized"); + }; + } + + public void start(@Nonnull Executor mainThreadExecutor, @Nonnull BooleanSupplier mainThreadCheck) { + this.componentMainThreadExecutor = mainThreadExecutor; + this.componentMainThreadCheck = mainThreadCheck; + } + + //--- + + @Override + public CompletableFuture allocateSlot( + SlotRequestId slotRequestId, + ScheduledUnit scheduledUnit, + boolean
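The `Scheduler` constructor in the diff installs placeholder callbacks that throw `IllegalStateException` until `start(...)` supplies the real main-thread executor and thread check. Below is a reduced sketch of that two-phase initialization. The `MainThreadBoundComponent` class and its method names are hypothetical.

```java
import java.util.concurrent.Executor;
import java.util.function.BooleanSupplier;

// Hypothetical reduction of the Scheduler's two-phase initialization shown in the diff.
final class MainThreadBoundComponent {

    // Placeholders fail loudly if the component is used before start() was called.
    private Executor mainThreadExecutor = runnable -> {
        throw new IllegalStateException("Main thread executor not initialized.");
    };

    private BooleanSupplier mainThreadCheck = () -> {
        throw new IllegalStateException("Main thread checker not initialized.");
    };

    void start(Executor mainThreadExecutor, BooleanSupplier mainThreadCheck) {
        this.mainThreadExecutor = mainThreadExecutor;
        this.mainThreadCheck = mainThreadCheck;
    }

    void runInMainThread(Runnable action) {
        mainThreadExecutor.execute(action);
    }

    boolean isInMainThread() {
        return mainThreadCheck.getAsBoolean();
    }
}
```

One plausible reason for this design is that a missing `start(...)` call surfaces immediately as an exception. Otherwise the work might silently run without the job master's threading guarantees.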
[GitHub] tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler URL: https://github.com/apache/flink/pull/6898#discussion_r227452749 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/LocationPreferenceSlotSelection.java ## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.clusterframework.types.SlotProfile; +import org.apache.flink.runtime.jobmanager.scheduler.Locality; +import org.apache.flink.runtime.jobmaster.SlotInfo; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; + +import javax.annotation.Nonnull; + +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.BiFunction; + +/** + * This class implements a {@link SlotSelectionStrategy} that is based on location preference hints. 
+ */ +public class LocationPreferenceSlotSelection implements SlotSelectionStrategy { + + public static final LocationPreferenceSlotSelection INSTANCE = new LocationPreferenceSlotSelection(); + + private LocationPreferenceSlotSelection() { + } + + /** +* Calculates the candidate's locality score. +*/ + private static final BiFunction LOCALITY_EVALUATION_FUNCTION = + (localWeigh, hostLocalWeigh) -> localWeigh * 10 + hostLocalWeigh; + + @Nonnull + @Override + public SlotInfoAndLocality selectBestSlotForProfile( + @Nonnull List availableSlots, + @Nonnull SlotProfile slotProfile) { + + Collection locationPreferences = slotProfile.getPreferredLocations(); + + if (availableSlots.isEmpty()) { + return SlotInfoAndLocality.of(null, Locality.UNKNOWN); + } + + final ResourceProfile resourceProfile = slotProfile.getResourceProfile(); + + // if we have no location preferences, we can only filter by the additional requirements. + if (locationPreferences.isEmpty()) { + for (SlotInfo candidate : availableSlots) { + if (candidate.getResourceProfile().isMatching(resourceProfile)) { + return SlotInfoAndLocality.of(candidate, Locality.UNCONSTRAINED); + } + } + return SlotInfoAndLocality.of(null, Locality.UNKNOWN); Review comment: Factor this out into a separate method, e.g. `selectBestSlotByResourceProfile`.
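Here is a sketch of the helper the reviewer asks for, using the name `selectBestSlotByResourceProfile` from the comment. Several parts are hypothetical simplifications of Flink's types:

- The slot type is a generic parameter.
- A `Predicate` stands in for `ResourceProfile.isMatching`.
- `SketchLocality` and `SlotAndLocality` replace `Locality` and `SlotInfoAndLocality`.

```java
import java.util.List;
import java.util.function.Predicate;

// Hypothetical mirror of the two Locality values this code path can return.
enum SketchLocality { UNCONSTRAINED, UNKNOWN }

// Hypothetical simplification of SlotInfoAndLocality; slot is null when nothing matched.
final class SlotAndLocality<S> {
    final S slot;
    final SketchLocality locality;

    SlotAndLocality(S slot, SketchLocality locality) {
        this.slot = slot;
        this.locality = locality;
    }
}

final class ResourceProfileSelection {

    /** Returns the first slot whose resource profile matches, as in the no-location-preference branch. */
    static <S> SlotAndLocality<S> selectBestSlotByResourceProfile(
            List<S> availableSlots, Predicate<S> matchesResourceProfile) {
        for (S candidate : availableSlots) {
            if (matchesResourceProfile.test(candidate)) {
                return new SlotAndLocality<>(candidate, SketchLocality.UNCONSTRAINED);
            }
        }
        return new SlotAndLocality<>(null, SketchLocality.UNKNOWN);
    }
}
```

The no-preference branch in `selectBestSlotForProfile` would then shrink to a single `return selectBestSlotByResourceProfile(...)` call.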
[GitHub] tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler URL: https://github.com/apache/flink/pull/6898#discussion_r227450833 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultSchedulerFactory.java ## @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; + +import javax.annotation.Nonnull; + +import java.util.HashMap; + +/** + * Default implementation of a {@link SchedulerFactory}. 
+ */ +public class DefaultSchedulerFactory implements SchedulerFactory { + + @Nonnull + private final SlotSelectionStrategy slotSelectionStrategy; + + public DefaultSchedulerFactory(@Nonnull SlotSelectionStrategy slotSelectionStrategy) { + this.slotSelectionStrategy = slotSelectionStrategy; + } + + @Nonnull + @Override + public Scheduler createScheduler(@Nonnull SlotPoolGateway slotPoolGateway) { + return new Scheduler(new HashMap<>(), slotSelectionStrategy, slotPoolGateway); Review comment: What could be a reasonable initial size of the `HashMap`?
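The thread leaves this question open. A reasonable value depends on how many slot sharing groups a job typically has, and the discussion does not say. If an expected count were known, one common way to derive an initial capacity is sketched below. The goal is to avoid rehashing under `HashMap`'s default load factor of 0.75. The helper names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

final class MapSizing {

    /**
     * Initial capacity large enough that a HashMap with the default load factor (0.75)
     * holds {@code expectedSize} entries without resizing.
     */
    static int initialCapacityFor(int expectedSize) {
        if (expectedSize < 0) {
            throw new IllegalArgumentException("expectedSize must be non-negative: " + expectedSize);
        }
        return (int) Math.ceil(expectedSize / 0.75);
    }

    // Hypothetical helper: a HashMap pre-sized for the expected number of entries.
    static <K, V> Map<K, V> newMapForExpectedSize(int expectedSize) {
        return new HashMap<>(initialCapacityFor(expectedSize));
    }
}
```

On Java 19 and later, `HashMap.newHashMap(int numMappings)` performs the same calculation in the JDK itself.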
[GitHub] tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler URL: https://github.com/apache/flink/pull/6898#discussion_r227450350 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotActions.java ## @@ -39,10 +38,23 @@ * @param slotRequestId identifying the slot to release * @param slotSharingGroupId identifying the slot sharing group to which the slot belongs, null if none * @param cause of the slot release, null if none -* @return Acknowledge (future) after the slot has been released +* @return Acknowledge after the slot has been released */ - CompletableFuture releaseSlot( + @Deprecated + Acknowledge releaseSlot( Review comment: Could this be `void`?
[GitHub] tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler URL: https://github.com/apache/flink/pull/6898#discussion_r227456207 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/Scheduler.java ## @@ -0,0 +1,516 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.clusterframework.types.SlotProfile; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.instance.SlotSharingGroupId; +import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint; +import org.apache.flink.runtime.jobmanager.scheduler.Locality; +import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; +import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; +import org.apache.flink.runtime.jobmaster.LogicalSlot; +import org.apache.flink.runtime.jobmaster.SlotContext; +import org.apache.flink.runtime.jobmaster.SlotInfo; +import org.apache.flink.runtime.jobmaster.SlotOwner; +import org.apache.flink.runtime.jobmaster.SlotRequestId; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.util.AbstractID; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.Executor; +import java.util.function.BooleanSupplier; + +/** + * Scheduler that assigns tasks to slots. This class is currently work in progress, comments will be updated as we + * move forward. + */ +public class Scheduler implements SlotProvider, SlotOwner { + + /** Logger */ + private final Logger log = LoggerFactory.getLogger(getClass()); + + /** Strategy that selects the best slot for a given slot allocation request. */ + @Nonnull + private final SlotSelectionStrategy slotSelectionStrategy; + + /** Managers for the different slot sharing groups. 
*/ + @Nonnull + private final Map slotSharingManagersMap; + + /** The slot pool from which slots are allocated. */ + @Nonnull + private final SlotPoolGateway slotPoolGateway; + + /** Executor for running tasks in the job master's main thread. */ + @Nonnull + private Executor componentMainThreadExecutor; + + /** Predicate to check if the current thread is the job master's main thread. */ + @Nonnull + private BooleanSupplier componentMainThreadCheck; + + + public Scheduler( + @Nonnull Map slotSharingManagersMap, + @Nonnull SlotSelectionStrategy slotSelectionStrategy, + @Nonnull SlotPoolGateway slotPoolGateway) { + + this.slotSelectionStrategy = slotSelectionStrategy; + this.slotSharingManagersMap = slotSharingManagersMap; + this.slotPoolGateway = slotPoolGateway; + this.componentMainThreadExecutor = (runnable) -> { + throw new IllegalStateException("Main thread executor not initialized."); + }; + this.componentMainThreadCheck = () -> { + throw new IllegalStateException("Main thread checker not initialized"); + }; + } + + public void start(@Nonnull Executor mainThreadExecutor, @Nonnull BooleanSupplier mainThreadCheck) { + this.componentMainThreadExecutor = mainThreadExecutor; + this.componentMainThreadCheck = mainThreadCheck; + } + + //--- + + @Override + public CompletableFuture allocateSlot( + SlotRequestId slotRequestId, + ScheduledUnit scheduledUnit, + boolean
[GitHub] tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler URL: https://github.com/apache/flink/pull/6898#discussion_r227453757 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSlotSelectionStrategy.java ## @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.SlotProfile; +import org.apache.flink.runtime.jobmanager.scheduler.Locality; +import org.apache.flink.runtime.jobmaster.SlotInfo; + +import javax.annotation.Nonnull; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * This class implements a {@link SlotSelectionStrategy} that is based on previous allocations and + * falls back to using location preference hints if there is no previous allocation. 
+ */ +public class PreviousAllocationSlotSelectionStrategy implements SlotSelectionStrategy { Review comment: Same comment concerning singletons and using an `enum` for them. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
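The enum-singleton idiom the reviewer refers to can be sketched as follows. This sketch assumes a simplified, hypothetical `SlotSelectionStrategy` interface that picks a slot ID from a collection of candidates. Flink's actual interface works with `SlotProfile` and `SlotInfo` and has a different signature. The enum yields exactly one `INSTANCE` per class loader, stays a singleton under Java serialization, and needs no private constructor or static factory.

```java
import java.util.Collection;
import java.util.Optional;

// Hypothetical, simplified stand-in for Flink's SlotSelectionStrategy interface.
interface SlotSelectionStrategy {
    Optional<String> selectSlot(Collection<String> availableSlotIds, String previousAllocationId);
}

// Enum singleton: the JVM creates exactly one INSTANCE, and it stays a singleton
// even across Java serialization.
enum PreviousAllocationSlotSelectionStrategy implements SlotSelectionStrategy {
    INSTANCE;

    @Override
    public Optional<String> selectSlot(Collection<String> availableSlotIds, String previousAllocationId) {
        // Prefer the slot that the task occupied in its previous allocation.
        if (previousAllocationId != null && availableSlotIds.contains(previousAllocationId)) {
            return Optional.of(previousAllocationId);
        }
        // Fallback placeholder: the real strategy would consult location preference hints.
        return availableSlotIds.stream().findFirst();
    }
}
```

Call sites would then use `PreviousAllocationSlotSelectionStrategy.INSTANCE` instead of constructing new instances.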
[jira] [Commented] (FLINK-10052) Tolerate temporarily suspended ZooKeeper connections
[ https://issues.apache.org/jira/browse/FLINK-10052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660864#comment-16660864 ] Elias Levy commented on FLINK-10052: [~Wosinsan] any progress on this issue? > Tolerate temporarily suspended ZooKeeper connections > > > Key: FLINK-10052 > URL: https://issues.apache.org/jira/browse/FLINK-10052 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.4.2, 1.5.2, 1.6.0 >Reporter: Till Rohrmann >Assignee: Dominik Wosiński >Priority: Major > > This issue results from FLINK-10011 which uncovered a problem with Flink's HA > recovery and proposed the following solution to harden Flink: > The {{ZooKeeperLeaderElectionService}} uses the {{LeaderLatch}} Curator > recipe for leader election. The leader latch revokes leadership in case of a > suspended ZooKeeper connection. This can be premature if the system > can reconnect to ZooKeeper before its session expires. The effect of the lost > leadership is that all jobs will be canceled and directly restarted after > regaining the leadership. > Instead of directly revoking the leadership upon a SUSPENDED ZooKeeper > connection, it would be better to wait until the ZooKeeper connection is > LOST. That way we would allow the system to reconnect and not lose the > leadership. This could be achieved by using Curator's {{LeaderSelector}} > instead of the {{LeaderLatch}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
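The difference between the two behaviors comes down to which connection state triggers a revocation. The following is a simplified model, not Curator code. `ZkConnectionState` mirrors a subset of Curator's `ConnectionState` values, and `TolerantLeadershipTracker` is a hypothetical class. With `revokeOnSuspend = true` it models the `LeaderLatch` behavior described above. With `false` it keeps leadership through SUSPENDED and only gives it up on LOST, as the issue proposes.

```java
// Local model of a subset of Curator's connection states; not the Curator enum itself.
enum ZkConnectionState { CONNECTED, SUSPENDED, RECONNECTED, LOST }

// Hypothetical leadership tracker illustrating "revoke on SUSPENDED" vs. "revoke on LOST".
class TolerantLeadershipTracker {
    private final boolean revokeOnSuspend; // true models the current LeaderLatch behavior
    private boolean leader;

    TolerantLeadershipTracker(boolean revokeOnSuspend) {
        this.revokeOnSuspend = revokeOnSuspend;
    }

    void grantLeadership() {
        leader = true;
    }

    void onStateChange(ZkConnectionState state) {
        switch (state) {
            case SUSPENDED:
                // Connection interrupted, but the ZooKeeper session may still be alive.
                if (revokeOnSuspend) {
                    leader = false;
                }
                break;
            case LOST:
                // Session considered expired: another contender may already be leader.
                leader = false;
                break;
            default:
                // CONNECTED / RECONNECTED: no revocation in this model.
                break;
        }
    }

    boolean isLeader() {
        return leader;
    }
}
```

In the tolerant variant, a SUSPENDED followed by RECONNECTED leaves leadership untouched, so running jobs would not be canceled and restarted.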
[jira] [Commented] (FLINK-10637) Start MiniCluster with random REST port
[ https://issues.apache.org/jira/browse/FLINK-10637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660850#comment-16660850 ] ASF GitHub Bot commented on FLINK-10637: zentol commented on issue #6899: [FLINK-10637] Use MiniClusterResource for tests in flink-runtime URL: https://github.com/apache/flink/pull/6899#issuecomment-432301818 sounds good to me @tillrohrmann . This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Start MiniCluster with random REST port > --- > > Key: FLINK-10637 > URL: https://issues.apache.org/jira/browse/FLINK-10637 > Project: Flink > Issue Type: Improvement > Components: Tests >Affects Versions: 1.5.4, 1.6.1, 1.7.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Critical > Labels: pull-request-available > Fix For: 1.5.6, 1.6.3, 1.7.0 > > > The {{MiniCluster}} picks a random port for the {{RpcService}} but not for > the REST server endpoint. As a result, the REST endpoint falls back to port {{8081}}. This can > lead to port conflicts if tests are executed concurrently. > I propose to rename {{MiniClusterResource}} to > {{MiniClusterResourceWithRestClient}} and add a new {{MiniClusterResource}} > which only starts a {{MiniCluster}} with the REST port set to 0. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
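Setting the REST port to 0 relies on the standard socket convention that binding to port 0 lets the operating system choose a free ephemeral port. The following is a minimal JDK-only sketch of that idea, not Flink's `MiniCluster` code, and the class name `EphemeralPortDemo` is hypothetical. Two sockets bound to port 0 at the same time receive different ports, so concurrently running tests cannot collide on a fixed default.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;

class EphemeralPortDemo {
    // Port 0 asks the OS for any free ephemeral port; 50 is the JDK's default backlog.
    static ServerSocket bindRandomPort() throws IOException {
        return new ServerSocket(0, 50, InetAddress.getLoopbackAddress());
    }
}
```

After binding, the actual port has to be read back (here via `getLocalPort()`) and handed to clients. This is why a REST client for such a cluster cannot simply assume 8081. In Flink's configuration, the corresponding setting is, as far as we know, the `rest.port` option.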
[GitHub] zentol commented on issue #6899: [FLINK-10637] Use MiniClusterResource for tests in flink-runtime
zentol commented on issue #6899: [FLINK-10637] Use MiniClusterResource for tests in flink-runtime URL: https://github.com/apache/flink/pull/6899#issuecomment-432301818 sounds good to me @tillrohrmann . This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10037) Document details event time behavior in a single location
[ https://issues.apache.org/jira/browse/FLINK-10037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660845#comment-16660845 ] ASF GitHub Bot commented on FLINK-10037: pnowojski commented on issue #6481: [FLINK-10037] [Documentation] Add Event Time Details documentation page URL: https://github.com/apache/flink/pull/6481#issuecomment-432298148 @eliaslevy what @aljoscha had in mind (correct me if I'm wrong) is that this new `"Event Time Details"` page should be deduplicated with the existing content (I think in three pages `event_time***.md`: [1](https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/event_time.html), [2](https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/event_timestamps_watermarks.html) and [3](https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/event_timestamp_extractors.html)). Could you read/compare them and either move content between them or cross-reference it? If we keep more or less the same content duplicated in the docs, there is a high chance that they will drift out of sync and that one of them will become outdated. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Document details event time behavior in a single location > - > > Key: FLINK-10037 > URL: https://issues.apache.org/jira/browse/FLINK-10037 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.5.2 >Reporter: Elias Levy >Assignee: Elias Levy >Priority: Minor > Labels: pull-request-available > > A description of event time and watermarks, how they are generated, assigned, and > handled, is spread across many pages in the documentation. It would be useful > to have it all in a single place and to include missing information, such as > how Flink assigns timestamps to new records generated by operators.
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] pnowojski commented on issue #6481: [FLINK-10037] [Documentation] Add Event Time Details documentation page
pnowojski commented on issue #6481: [FLINK-10037] [Documentation] Add Event Time Details documentation page URL: https://github.com/apache/flink/pull/6481#issuecomment-432298148 @eliaslevy what @aljoscha had in mind (correct me if I'm wrong) is that this new `"Event Time Details"` page should be deduplicated with the existing content (I think in three pages `event_time***.md`: [1](https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/event_time.html), [2](https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/event_timestamps_watermarks.html) and [3](https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/event_timestamp_extractors.html)). Could you read/compare them and either move content between them or cross-reference it? If we keep more or less the same content duplicated in the docs, there is a high chance that they will drift out of sync and that one of them will become outdated. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660839#comment-16660839 ] ASF GitHub Bot commented on FLINK-10431: tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler URL: https://github.com/apache/flink/pull/6898#discussion_r227447242 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ## @@ -1676,6 +1684,35 @@ public void updateAccumulators(AccumulatorSnapshot accumulatorSnapshot) { } } + /** +* Computes and returns a set with the prior allocation ids from all execution vertices in the graph. +*/ + private Set computeAllPriorAllocationIds() { + HashSet allPreviousAllocationIds = new HashSet<>(); + Iterable ejvIterable = getVerticesTopologically(); + for (ExecutionJobVertex executionJobVertex : ejvIterable) { + for (ExecutionVertex executionVertex : executionJobVertex.getTaskVertices()) { Review comment: `getAllExecutionVertices` should be easier to use here. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Extract scheduling-related code from SlotPool > - > > Key: FLINK-10431 > URL: https://issues.apache.org/jira/browse/FLINK-10431 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Affects Versions: 1.7.0 >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Labels: pull-request-available > > The other half of the current scheduling logic is the management of slot > sharing and is located in the SlotPool. We need to extract this logic into > our new Scheduler component from the previous step. 
This leaves us with a > simpler SlotPool that mainly cares about obtaining, holding, and releasing > slots in interaction with a ResourceManager. The new Scheduler can now > identify slot sharing groups and interact with the SlotPool. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler URL: https://github.com/apache/flink/pull/6898#discussion_r227447242 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ## @@ -1676,6 +1684,35 @@ public void updateAccumulators(AccumulatorSnapshot accumulatorSnapshot) { } } + /** +* Computes and returns a set with the prior allocation ids from all execution vertices in the graph. +*/ + private Set computeAllPriorAllocationIds() { + HashSet allPreviousAllocationIds = new HashSet<>(); + Iterable ejvIterable = getVerticesTopologically(); + for (ExecutionJobVertex executionJobVertex : ejvIterable) { + for (ExecutionVertex executionVertex : executionJobVertex.getTaskVertices()) { Review comment: `getAllExecutionVertices` should be easier to use here. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
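The reviewer's point is that a flat iterable over all execution vertices removes the nested job-vertex/task-vertex loop. The following is a minimal sketch with hypothetical stub classes (`ExecutionVertexStub`, `ExecutionGraphStub`) standing in for Flink's `ExecutionVertex` and `ExecutionGraph`. Only the method names `getAllExecutionVertices` and `computeAllPriorAllocationIds` come from this thread, and `String` stands in for `AllocationID`.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

// Hypothetical stand-in for ExecutionVertex: only the prior allocation ID is modeled.
class ExecutionVertexStub {
    private final String priorAllocationId; // null if the vertex has not run before

    ExecutionVertexStub(String priorAllocationId) {
        this.priorAllocationId = priorAllocationId;
    }

    Optional<String> getPriorAllocationId() {
        return Optional.ofNullable(priorAllocationId);
    }
}

// Hypothetical stand-in for ExecutionGraph: each job vertex is a list of task vertices.
class ExecutionGraphStub {
    private final List<List<ExecutionVertexStub>> jobVertices = new ArrayList<>();

    void addJobVertex(List<ExecutionVertexStub> taskVertices) {
        jobVertices.add(taskVertices);
    }

    // Flat view over the task vertices of all job vertices.
    Iterable<ExecutionVertexStub> getAllExecutionVertices() {
        List<ExecutionVertexStub> all = new ArrayList<>();
        for (List<ExecutionVertexStub> taskVertices : jobVertices) {
            all.addAll(taskVertices);
        }
        return all;
    }

    // A single loop replaces the nested job-vertex / task-vertex iteration.
    Set<String> computeAllPriorAllocationIds() {
        Set<String> ids = new HashSet<>();
        for (ExecutionVertexStub vertex : getAllExecutionVertices()) {
            vertex.getPriorAllocationId().ifPresent(ids::add);
        }
        return ids;
    }
}
```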
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660838#comment-16660838 ] ASF GitHub Bot commented on FLINK-10431: tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler URL: https://github.com/apache/flink/pull/6898#discussion_r227446797 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ## @@ -1676,6 +1684,35 @@ public void updateAccumulators(AccumulatorSnapshot accumulatorSnapshot) { } } + /** +* Computes and returns a set with the prior allocation ids from all execution vertices in the graph. +*/ + private Set computeAllPriorAllocationIds() { + HashSet allPreviousAllocationIds = new HashSet<>(); Review comment: Initialize `HashSet` with `getNumberOfExecutionJobVertices()` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler URL: https://github.com/apache/flink/pull/6898#discussion_r227446797 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ## @@ -1676,6 +1684,35 @@ public void updateAccumulators(AccumulatorSnapshot accumulatorSnapshot) { } } + /** +* Computes and returns a set with the prior allocation ids from all execution vertices in the graph. +*/ + private Set computeAllPriorAllocationIds() { + HashSet allPreviousAllocationIds = new HashSet<>(); Review comment: Initialize `HashSet` with `getNumberOfExecutionJobVertices()` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
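Passing a size hint to the `HashSet` constructor avoids repeated rehashing as the set grows. Note that `HashSet(int)` takes an initial capacity, not an element count. With the default load factor of 0.75, the backing map still resizes once it holds more than about capacity × 0.75 elements. The following minimal sketch converts an expected element count into a capacity. The class and method names are hypothetical, although Guava offers a similar `Sets.newHashSetWithExpectedSize`. The reviewer's hint, `getNumberOfExecutionJobVertices()`, counts job vertices, while the set collects IDs from all execution vertices (parallel subtasks), so it may underestimate the final size.

```java
import java.util.HashSet;
import java.util.Set;

class PresizedSets {
    private static final float DEFAULT_LOAD_FACTOR = 0.75f; // HashMap's default load factor

    // Smallest initial capacity that holds expectedSize elements without a resize.
    static int expectedSizeToCapacity(int expectedSize) {
        if (expectedSize < 0) {
            throw new IllegalArgumentException("expectedSize must be non-negative");
        }
        return (int) Math.ceil(expectedSize / (double) DEFAULT_LOAD_FACTOR);
    }

    static <T> Set<T> newHashSetWithExpectedSize(int expectedSize) {
        return new HashSet<>(expectedSizeToCapacity(expectedSize));
    }
}
```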
[GitHub] tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler URL: https://github.com/apache/flink/pull/6898#discussion_r227446397 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ## @@ -939,13 +942,18 @@ public void scheduleForExecution() throws JobException { // collecting all the slots may resize and fail in that operation without slots getting lost final ArrayList> allAllocationFutures = new ArrayList<>(getNumberOfExecutionJobVertices()); + // this is a (temporary) to avoid collecting the previous allocations for all executions again and again Review comment: noun missing in the comment This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660835#comment-16660835 ] ASF GitHub Bot commented on FLINK-10431: StefanRRichter commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler URL: https://github.com/apache/flink/pull/6898#discussion_r227446372 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ## @@ -450,6 +454,7 @@ public void setInitialState(@Nullable JobManagerTaskRestore taskRestore) { SlotProvider slotProvider, boolean queued, LocationPreferenceConstraint locationPreferenceConstraint, + @Nullable Set allPreviousExecutionGraphAllocationIds, Review comment: Will do. Overall this construct that uses `null` will go away once we introduce group scheduling. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660836#comment-16660836 ] ASF GitHub Bot commented on FLINK-10431: tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler URL: https://github.com/apache/flink/pull/6898#discussion_r227446397 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ## @@ -939,13 +942,18 @@ public void scheduleForExecution() throws JobException { // collecting all the slots may resize and fail in that operation without slots getting lost final ArrayList> allAllocationFutures = new ArrayList<>(getNumberOfExecutionJobVertices()); + // this is a (temporary) to avoid collecting the previous allocations for all executions again and again Review comment: noun missing in the comment This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] StefanRRichter commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
StefanRRichter commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler URL: https://github.com/apache/flink/pull/6898#discussion_r227446372 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ## @@ -450,6 +454,7 @@ public void setInitialState(@Nullable JobManagerTaskRestore taskRestore) { SlotProvider slotProvider, boolean queued, LocationPreferenceConstraint locationPreferenceConstraint, + @Nullable Set allPreviousExecutionGraphAllocationIds, Review comment: Will do. Overall this construct that uses `null` will go away once we introduce group scheduling. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660833#comment-16660833 ] ASF GitHub Bot commented on FLINK-10431: tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler URL: https://github.com/apache/flink/pull/6898#discussion_r227445757 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ## @@ -450,6 +454,7 @@ public void setInitialState(@Nullable JobManagerTaskRestore taskRestore) { SlotProvider slotProvider, boolean queued, LocationPreferenceConstraint locationPreferenceConstraint, + @Nullable Set allPreviousExecutionGraphAllocationIds, Review comment: Please update the JavaDocs stating what `null` for this parameter means. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660827#comment-16660827 ] ASF GitHub Bot commented on FLINK-10431: tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler URL: https://github.com/apache/flink/pull/6898#discussion_r227444801 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ## @@ -450,6 +454,7 @@ public void setInitialState(@Nullable JobManagerTaskRestore taskRestore) { SlotProvider slotProvider, boolean queued, LocationPreferenceConstraint locationPreferenceConstraint, + @Nullable Set allPreviousExecutionGraphAllocationIds, Review comment: Same here with `@Nullable` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler URL: https://github.com/apache/flink/pull/6898#discussion_r227445757 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ## @@ -450,6 +454,7 @@ public void setInitialState(@Nullable JobManagerTaskRestore taskRestore) { SlotProvider slotProvider, boolean queued, LocationPreferenceConstraint locationPreferenceConstraint, + @Nullable Set allPreviousExecutionGraphAllocationIds, Review comment: Please update the JavaDocs stating what `null` for this parameter means. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
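One way to address the JavaDoc request is sketched below, under several assumptions. The semantics (`null` means "not provided, compute it inside"; an empty set means "provided, and there are none") are taken from Stefan Richter's reply in this thread. The class name, the `String` ID type, and the `computeFallback` supplier are hypothetical. The real parameter's `@Nullable` annotation is omitted to keep the sketch free of the JSR-305 dependency.

```java
import java.util.Set;
import java.util.function.Supplier;

class PreviousAllocationsSketch {
    /**
     * Resolves the prior allocation IDs of all executions in the execution graph.
     *
     * @param allPreviousExecutionGraphAllocationIds the prior allocation IDs of all executions
     *     in the graph, if the caller already computed them. An empty set means the IDs were
     *     computed and there are none. {@code null} means the caller did not provide them; they
     *     are then computed here via {@code computeFallback}. (Annotated {@code @Nullable} in
     *     real code; omitted here to avoid a JSR-305 dependency.)
     * @param computeFallback computes the IDs when the first argument is {@code null}
     * @return the provided or computed set; non-null if {@code computeFallback} returns non-null
     */
    static Set<String> resolvePreviousAllocationIds(
            Set<String> allPreviousExecutionGraphAllocationIds,
            Supplier<Set<String>> computeFallback) {
        return allPreviousExecutionGraphAllocationIds != null
                ? allPreviousExecutionGraphAllocationIds
                : computeFallback.get();
    }
}
```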
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660829#comment-16660829 ] ASF GitHub Bot commented on FLINK-10431: StefanRRichter commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler URL: https://github.com/apache/flink/pull/6898#discussion_r227445579 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ## @@ -385,7 +386,8 @@ public void setInitialState(@Nullable JobManagerTaskRestore taskRestore) { return scheduleForExecution( resourceProvider, allowQueued, - LocationPreferenceConstraint.ANY); + LocationPreferenceConstraint.ANY, + null); Review comment: The point is that `null` and `emptySet` have different meanings here: `null` means it was not previously computed/provided and has to be determined inside the execution; `emptySet` means it has been provided and the result was just empty. Overall, this whole construct should go away very soon when we move to group scheduling. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660828#comment-16660828 ] ASF GitHub Bot commented on FLINK-10431: tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler URL: https://github.com/apache/flink/pull/6898#discussion_r227444632 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ## @@ -402,13 +404,15 @@ public void setInitialState(@Nullable JobManagerTaskRestore taskRestore) { public CompletableFuture scheduleForExecution( SlotProvider slotProvider, boolean queued, - LocationPreferenceConstraint locationPreferenceConstraint) { + LocationPreferenceConstraint locationPreferenceConstraint, + @Nullable Set allPreviousExecutionGraphAllocationIds) { Review comment: Let's try not to pass in null arguments This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660831#comment-16660831 ] ASF GitHub Bot commented on FLINK-10431: StefanRRichter commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler URL: https://github.com/apache/flink/pull/6898#discussion_r227445579 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ## @@ -385,7 +386,8 @@ public void setInitialState(@Nullable JobManagerTaskRestore taskRestore) { return scheduleForExecution( resourceProvider, allowQueued, - LocationPreferenceConstraint.ANY); + LocationPreferenceConstraint.ANY, + null); Review comment: Point is that`null` and `emptySet` have different meanings here: `null` means it was not previously computed/provided and has tp be determined inside the execution. `emptySet` means it has been provided and the result was just empty. Overall, this whole construct should go away very soon when we move ro group scheduling. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Extract scheduling-related code from SlotPool > - > > Key: FLINK-10431 > URL: https://issues.apache.org/jira/browse/FLINK-10431 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Affects Versions: 1.7.0 >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Labels: pull-request-available > > The other half of the current scheduling logic is the management of slot > sharing and is located in the SlotPool. We need to extract this logic into > our new Scheduler component from the previous step. 
This leaves us with a > simpler SlotPool that mainly cares about obtaining, holding, and releasing > slots in interaction with a ResourceManager. The new Scheduler can now > identify slot sharing groups and interacts with the SlotPool. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
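As a rough illustration of the split described in the issue, here is a minimal sketch, assuming slots and slot sharing groups are identified by plain strings. The classes `SlotPoolSketch` and `SchedulerSketch` and all of their methods are hypothetical and not Flink's actual `SlotPool`/`Scheduler` API. The pool only obtains, holds, and releases slots; the scheduler owns the slot sharing logic and asks the pool for a new slot only when it sees a sharing group for the first time.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch, not Flink's SlotPool: it only obtains, holds, and releases slots.
class SlotPoolSketch {
    private final Deque<String> freeSlots = new ArrayDeque<>();

    // Stand-in for a slot offered to the pool (e.g. after a ResourceManager request).
    void offerSlot(String slotId) {
        freeSlots.push(slotId);
    }

    // Returns a free slot, or null if the pool currently holds none (simplified).
    String allocateSlot() {
        return freeSlots.poll();
    }

    void releaseSlot(String slotId) {
        freeSlots.push(slotId);
    }

    int availableSlots() {
        return freeSlots.size();
    }
}

// Hypothetical sketch, not Flink's Scheduler: it knows about slot sharing groups
// and maps every task of the same group onto one physical slot from the pool.
class SchedulerSketch {
    private final SlotPoolSketch slotPool;
    private final Map<String, String> slotBySharingGroup = new HashMap<>();

    SchedulerSketch(SlotPoolSketch slotPool) {
        this.slotPool = slotPool;
    }

    // Reuses the group's slot if one exists; otherwise takes a new slot from the pool.
    String allocateForTask(String sharingGroupId) {
        return slotBySharingGroup.computeIfAbsent(sharingGroupId, group -> {
            String slot = slotPool.allocateSlot();
            if (slot == null) {
                throw new IllegalStateException("No free slot for sharing group " + group);
            }
            return slot;
        });
    }
}
```

With two offered slots, two tasks in the same sharing group end up in one slot and a task from a second group takes the other. Keeping the sharing map in the scheduler is what lets the pool stay unaware of sharing groups.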
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660830#comment-16660830 ] ASF GitHub Bot commented on FLINK-10431: tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler URL: https://github.com/apache/flink/pull/6898#discussion_r22705 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ## @@ -385,7 +386,8 @@ public void setInitialState(@Nullable JobManagerTaskRestore taskRestore) { return scheduleForExecution( resourceProvider, allowQueued, - LocationPreferenceConstraint.ANY); + LocationPreferenceConstraint.ANY, + null); Review comment: Let's not use `null` but instead `Collections.emptySet()` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] StefanRRichter commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
StefanRRichter commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler URL: https://github.com/apache/flink/pull/6898#discussion_r227445579 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ## @@ -385,7 +386,8 @@ public void setInitialState(@Nullable JobManagerTaskRestore taskRestore) { return scheduleForExecution( resourceProvider, allowQueued, - LocationPreferenceConstraint.ANY); + LocationPreferenceConstraint.ANY, + null); Review comment: The point is that `null` and `emptySet` have different meanings here: `null` means the set was not previously computed/provided and has to be determined inside the execution. `emptySet` means it has been provided and the result was just empty. Overall, this whole construct should go away very soon when we move to group scheduling. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
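The convention in this comment (`null` means "not provided, determine it here", an empty set means "provided, and there were none") can be illustrated with a minimal sketch, assuming allocation IDs are plain strings. `PreviousAllocationsSketch`, `resolve`, and `computePreviousAllocationIds` are hypothetical names, not code from the pull request.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of the null-vs-empty-set convention; not Flink code.
final class PreviousAllocationsSketch {
    // Counts how often the (potentially expensive) lookup actually runs.
    static int computeCalls = 0;

    // Stand-in for determining the previous allocation IDs inside the execution.
    static Set<String> computePreviousAllocationIds() {
        computeCalls++;
        Set<String> ids = new HashSet<>();
        ids.add("alloc-1");
        return ids;
    }

    // null: the caller did not compute the set, so it is determined here.
    // Empty set: the caller computed it, and the result was simply empty.
    static Set<String> resolve(Set<String> providedOrNull) {
        if (providedOrNull == null) {
            return computePreviousAllocationIds();
        }
        return providedOrNull;
    }
}
```

Passing `Collections.emptySet()` instead of `null` therefore changes behavior: it skips the internal lookup entirely, which is the distinction this comment is drawing.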
[GitHub] tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler URL: https://github.com/apache/flink/pull/6898#discussion_r227444632 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ## @@ -402,13 +404,15 @@ public void setInitialState(@Nullable JobManagerTaskRestore taskRestore) { public CompletableFuture scheduleForExecution( SlotProvider slotProvider, boolean queued, - LocationPreferenceConstraint locationPreferenceConstraint) { + LocationPreferenceConstraint locationPreferenceConstraint, + @Nullable Set allPreviousExecutionGraphAllocationIds) { Review comment: Let's try not to pass in null arguments This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler URL: https://github.com/apache/flink/pull/6898#discussion_r22705 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ## @@ -385,7 +386,8 @@ public void setInitialState(@Nullable JobManagerTaskRestore taskRestore) { return scheduleForExecution( resourceProvider, allowQueued, - LocationPreferenceConstraint.ANY); + LocationPreferenceConstraint.ANY, + null); Review comment: Let's not use `null` but instead `Collections.emptySet()` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler URL: https://github.com/apache/flink/pull/6898#discussion_r227444801 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ## @@ -450,6 +454,7 @@ public void setInitialState(@Nullable JobManagerTaskRestore taskRestore) { SlotProvider slotProvider, boolean queued, LocationPreferenceConstraint locationPreferenceConstraint, + @Nullable Set allPreviousExecutionGraphAllocationIds, Review comment: Same here with `@Nullable` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
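One way to avoid `@Nullable` parameters while keeping the distinction between "not provided" and "provided but empty" is to split the method into two overloads. This is a hypothetical sketch with string IDs; `SchedulingOverloadsSketch`, `schedule`, and `lookUpPreviousAllocationIds` are illustrative names, not Flink's actual `scheduleForExecution` signature.

```java
import java.util.Collections;
import java.util.Objects;
import java.util.Set;

// Hypothetical sketch; not Flink's scheduleForExecution API.
final class SchedulingOverloadsSketch {
    // Callers without precomputed IDs use this overload; the set is determined internally.
    static String schedule(String task) {
        return schedule(task, lookUpPreviousAllocationIds(task));
    }

    // Callers that already computed the IDs pass them explicitly; null is rejected.
    static String schedule(String task, Set<String> previousAllocationIds) {
        Objects.requireNonNull(previousAllocationIds, "previousAllocationIds");
        return task + " scheduled with " + previousAllocationIds.size() + " previous allocation(s)";
    }

    // Hypothetical stand-in for the lookup that would otherwise happen inside the execution.
    static String lookUpPreviousAllocationIds(String task) == null ? null : null;
}
```

The absence of the set is expressed by calling the one-argument overload rather than by passing `null`, and the two-argument overload fails fast via `Objects.requireNonNull` if `null` slips through anyway.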
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660824#comment-16660824 ] ASF GitHub Bot commented on FLINK-10431: tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler URL: https://github.com/apache/flink/pull/6898#discussion_r227444801 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ## @@ -450,6 +454,7 @@ public void setInitialState(@Nullable JobManagerTaskRestore taskRestore) { SlotProvider slotProvider, boolean queued, LocationPreferenceConstraint locationPreferenceConstraint, + @Nullable Set allPreviousExecutionGraphAllocationIds, Review comment: Same here with `@Nullable` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660823#comment-16660823 ] ASF GitHub Bot commented on FLINK-10431: tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler URL: https://github.com/apache/flink/pull/6898#discussion_r227444632 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ## @@ -402,13 +404,15 @@ public void setInitialState(@Nullable JobManagerTaskRestore taskRestore) { public CompletableFuture scheduleForExecution( SlotProvider slotProvider, boolean queued, - LocationPreferenceConstraint locationPreferenceConstraint) { + LocationPreferenceConstraint locationPreferenceConstraint, + @Nullable Set allPreviousExecutionGraphAllocationIds) { Review comment: Let's try not to pass in null arguments This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660822#comment-16660822 ] ASF GitHub Bot commented on FLINK-10431: tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler URL: https://github.com/apache/flink/pull/6898#discussion_r22705 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ## @@ -385,7 +386,8 @@ public void setInitialState(@Nullable JobManagerTaskRestore taskRestore) { return scheduleForExecution( resourceProvider, allowQueued, - LocationPreferenceConstraint.ANY); + LocationPreferenceConstraint.ANY, + null); Review comment: Let's not use `null` but instead `Collections.emptySet()` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-10622) Add end-to-end test for temporal table joins
[ https://issues.apache.org/jira/browse/FLINK-10622?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski closed FLINK-10622. -- Resolution: Won't Fix After thinking more about this, it should be enough to cover Temporal Joins with unit tests and ITCases. > Add end-to-end test for temporal table joins > > > Key: FLINK-10622 > URL: https://issues.apache.org/jira/browse/FLINK-10622 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL, Tests >Affects Versions: 1.7.0 >Reporter: Till Rohrmann >Assignee: Piotr Nowojski >Priority: Critical > Fix For: 1.7.0 > > > We should add an end-to-end test to test the new time versioned joins > functionality. Maybe we can add this to the existing Stream SQL end-to-end > test {{test_streaming_sql.sh}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10220) StreamSQL E2E test fails on travis
[ https://issues.apache.org/jira/browse/FLINK-10220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660814#comment-16660814 ] ASF GitHub Bot commented on FLINK-10220: dawidwys opened a new pull request #6908: [FLINK-10220][e2e] Removing logs for streaming sql e2e test before validation URL: https://github.com/apache/flink/pull/6908 ## Brief change log * Logs are now properly checked for exceptions and errors (previously the check was nondeterministic) * Logs of the streaming SQL e2e test are removed before validation, because exceptions there are expected Explanation for this change: Previously, the last grep command used the `-iq` flag, so grep exited after the first occurrence of an exception/error; this caused a pipe failure, and as a result the whole assertion did not work. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > StreamSQL E2E test fails on travis > -- > > Key: FLINK-10220 > URL: https://issues.apache.org/jira/browse/FLINK-10220 > Project: Flink > Issue Type: Bug > Components: Table API SQL, Tests >Affects Versions: 1.7.0 >Reporter: Chesnay Schepler >Assignee: Hequn Cheng >Priority: Critical > Labels: pull-request-available > Fix For: 1.7.0 > > > https://travis-ci.org/zentol/flink-ci/jobs/420972344 > {code} > [FAIL] 'Streaming SQL end-to-end test' failed after 1 minutes and 49 seconds! > Test exited with exit code 0 but the logs contained errors, exceptions or > non-empty .out files > 2018-08-27 07:34:36,311 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph- window: > (TumblingGroupWindow('w$, 'rowtime, 2.millis)), select: ($SUM0(correct) > AS correct, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS > w$rowtime, proctime('w$) AS w$proctime) -> select: (correct, w$start AS > rowtime) -> to: Row -> Map -> Sink: Unnamed (1/1) > (97d055e4661ff3361a504626257d406d) switched from RUNNING to FAILED. 
> java.lang.RuntimeException: Exception occurred while processing valve output > watermark: > at > org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265) > at > org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189) > at > org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111) > at > org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) > at java.lang.Thread.run(Thread.java:748) > Caused by: > org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: > Could not forward element to next operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:596) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667) > at >