[jira] [Assigned] (FLINK-10500) Let ExecutionGraphDriver react to fail signal

2018-10-23 Thread Shimin Yang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10500?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shimin Yang reassigned FLINK-10500:
---

Assignee: Shimin Yang

> Let ExecutionGraphDriver react to fail signal
> -
>
> Key: FLINK-10500
> URL: https://issues.apache.org/jira/browse/FLINK-10500
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: Shimin Yang
>Priority: Major
> Fix For: 1.7.0
>
>
> In order to scale down when there are not enough resources available or if 
> TMs died, the {{ExecutionGraphDriver}} needs to learn about a failure. 
> Depending on the failure type and the available set of resources, it can then 
> decide to scale the job down or simply restart. In the scope of this issue, 
> the {{ExecutionGraphDriver}} should simply call into the {{RestartStrategy}}.

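A minimal sketch of the control flow described in the issue: on a failure signal, the driver simply asks a restart strategy whether it may restart and either restarts or fails the job. All names here (`SimpleRestartStrategy`, `MaxAttemptsRestartStrategy`, `DriverSketch`, `onFailure`) are hypothetical stand-ins, not Flink's actual `RestartStrategy` or `ExecutionGraphDriver` API. Scaling down is left out, as it is out of scope for this issue.

```java
// Hypothetical stand-in for a restart strategy; not Flink's RestartStrategy API.
interface SimpleRestartStrategy {
    boolean canRestart();

    void restart(Runnable restartAction);
}

// Allows at most maxAttempts restarts (no delay between attempts, for brevity).
class MaxAttemptsRestartStrategy implements SimpleRestartStrategy {
    private final int maxAttempts;
    private int attempts;

    MaxAttemptsRestartStrategy(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    @Override
    public boolean canRestart() {
        return attempts < maxAttempts;
    }

    @Override
    public void restart(Runnable restartAction) {
        attempts++;
        restartAction.run();
    }
}

// Hypothetical driver that reacts to a fail signal by calling into the strategy.
class DriverSketch {
    private final SimpleRestartStrategy restartStrategy;
    private int restarts;
    private boolean terminallyFailed;

    DriverSketch(SimpleRestartStrategy restartStrategy) {
        this.restartStrategy = restartStrategy;
    }

    // Invoked when a failure is reported, e.g. because a TaskManager died.
    void onFailure(Throwable cause) {
        if (terminallyFailed) {
            return; // already gave up; ignore further signals
        }
        if (restartStrategy.canRestart()) {
            // A real driver would redeploy the job here; the sketch only counts.
            restartStrategy.restart(() -> restarts++);
        } else {
            terminallyFailed = true; // no restarts left: fail the job
        }
    }

    int restarts() {
        return restarts;
    }

    boolean isTerminallyFailed() {
        return terminallyFailed;
    }

    public static void main(String[] args) {
        DriverSketch driver = new DriverSketch(new MaxAttemptsRestartStrategy(2));
        for (int i = 0; i < 3; i++) {
            driver.onFailure(new RuntimeException("TaskManager lost"));
        }
        System.out.println(driver.restarts() + " restarts, failed=" + driver.isTerminallyFailed());
    }
}
```

Keeping the decision inside the strategy means a later, resource-aware policy (restart vs. scale down) could replace it without changing how the driver receives failures.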


--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10618) Introduce catalog for Flink tables

2018-10-23 Thread Shimin Yang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16661688#comment-16661688
 ] 

Shimin Yang commented on FLINK-10618:
-

Sounds good. Do you have a draft of the design doc?

> Introduce catalog for Flink tables
> --
>
> Key: FLINK-10618
> URL: https://issues.apache.org/jira/browse/FLINK-10618
> Project: Flink
>  Issue Type: New Feature
>  Components: SQL Client
>Affects Versions: 1.6.1
>Reporter: Xuefu Zhang
>Assignee: Xuefu Zhang
>Priority: Major
>
> Besides meta objects such as tables that may come from an 
> {{ExternalCatalog}}, Flink also deals with tables/views/functions that are 
> created on the fly (in memory) or specified in a configuration file. Those 
> objects don't belong to any {{ExternalCatalog}}, yet Flink either keeps them 
> in memory, where they are not persisted, or recreates them from a file, which 
> is a big pain for the user. Those objects are known only to Flink, but Flink 
> manages them poorly.
> Since they are typical objects in a database catalog, it's natural to have a 
> catalog that manages them. The interface will be similar to 
> {{ExternalCatalog}}, which contains meta objects that are not managed by 
> Flink. There are several possible implementations of the Flink internal 
> catalog interface: memory, file, external registry (such as the Confluent 
> Schema Registry or the Hive metastore), relational database, etc. 
> The initial functionality as well as the catalog hierarchy could be very 
> simple. The basic functionality of the catalog will mostly be creating, 
> altering, and dropping tables, views, functions, etc. Obviously, this can 
> evolve over time.
> We plan to provide implementations backed by memory, file, and the Hive 
> metastore, which will be plugged in at the SQL Client layer.
> Please provide your feedback.

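To make the proposal concrete, here is a minimal sketch of a catalog interface with the basic create/alter/drop operations and a memory-backed implementation. The names `SimpleCatalog`, `InMemoryCatalog`, `TableMeta`, and the `ignoreIfExists`/`ignoreIfNotExists` flags are hypothetical; they are not the interface proposed in the issue nor Flink's `ExternalCatalog` API. A file, Hive metastore, or database implementation would implement the same interface with persistent storage.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical table metadata: a name plus a schema string, for illustration only.
final class TableMeta {
    final String name;
    final String schema;

    TableMeta(String name, String schema) {
        this.name = name;
        this.schema = schema;
    }
}

// Hypothetical catalog interface covering the basic operations.
interface SimpleCatalog {
    void createTable(TableMeta table, boolean ignoreIfExists);

    void alterTable(TableMeta table);

    void dropTable(String name, boolean ignoreIfNotExists);

    Optional<TableMeta> getTable(String name);

    List<String> listTables();
}

// Memory-backed implementation: simple, but nothing survives a restart.
class InMemoryCatalog implements SimpleCatalog {
    private final Map<String, TableMeta> tables = new HashMap<>();

    @Override
    public void createTable(TableMeta table, boolean ignoreIfExists) {
        if (tables.containsKey(table.name)) {
            if (ignoreIfExists) {
                return; // keep the existing definition
            }
            throw new IllegalArgumentException("Table already exists: " + table.name);
        }
        tables.put(table.name, table);
    }

    @Override
    public void alterTable(TableMeta table) {
        if (!tables.containsKey(table.name)) {
            throw new IllegalArgumentException("Table does not exist: " + table.name);
        }
        tables.put(table.name, table);
    }

    @Override
    public void dropTable(String name, boolean ignoreIfNotExists) {
        if (tables.remove(name) == null && !ignoreIfNotExists) {
            throw new IllegalArgumentException("Table does not exist: " + name);
        }
    }

    @Override
    public Optional<TableMeta> getTable(String name) {
        return Optional.ofNullable(tables.get(name));
    }

    @Override
    public List<String> listTables() {
        List<String> names = new ArrayList<>(tables.keySet());
        Collections.sort(names); // deterministic order for callers
        return names;
    }

    public static void main(String[] args) {
        InMemoryCatalog catalog = new InMemoryCatalog();
        catalog.createTable(new TableMeta("orders", "id INT, amount DOUBLE"), false);
        System.out.println(catalog.listTables());
    }
}
```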




[jira] [Created] (FLINK-10659) get rid of splitting by comma for ProgramArgsQueryParameter

2018-10-23 Thread Kan (JIRA)
Kan created FLINK-10659:
---

 Summary: get rid of splitting by comma for 
ProgramArgsQueryParameter
 Key: FLINK-10659
 URL: https://issues.apache.org/jira/browse/FLINK-10659
 Project: Flink
  Issue Type: Improvement
  Components: REST
 Environment: I encountered this issue on Flink 1.6.
Reporter: Kan


The Flink REST handler splits the query parameter {{program-args}} by comma, 
but then reports the error "Expected only one value". The same applies to the 
other query parameters.

 

Shall we remove the splitting by comma from the corresponding XxxQueryParameter 
class?

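The failure mode can be illustrated with a small, self-contained model: a program-args value that itself contains a comma is split into several values, which then trips the single-value check. The class and method names are hypothetical; this is not Flink's REST handler code, and the exact error text produced by Flink may differ from the one modelled here.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class ProgramArgsSplitSketch {
    // Hypothetical model of the reported behaviour: the raw value is split by comma.
    static List<String> parseSplittingByComma(String rawValue) {
        return Arrays.asList(rawValue.split(","));
    }

    // Proposed behaviour: the raw value is kept as a single element.
    static List<String> parseWithoutSplitting(String rawValue) {
        return Collections.singletonList(rawValue);
    }

    // Single-value check that produces an "Expected only one value" error.
    static String requireSingleValue(List<String> values) {
        if (values.size() != 1) {
            throw new IllegalArgumentException("Expected only one value " + values + ".");
        }
        return values.get(0);
    }

    public static void main(String[] args) {
        String programArgs = "--input a.csv,b.csv --output out";
        System.out.println(requireSingleValue(parseWithoutSplitting(programArgs)));
        try {
            requireSingleValue(parseSplittingByComma(programArgs));
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```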




[jira] [Commented] (FLINK-10658) org.apache.flink.util.FlinkException: Releasing shared slot parent.

2018-10-23 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16661682#comment-16661682
 ] 

vinoyang commented on FLINK-10658:
--

Releasing slots is usually triggered by the Flink framework and does not 
require application-level intervention. Therefore, if you see this exception, it 
may indicate unexpected behavior (perhaps a bug).

> org.apache.flink.util.FlinkException: Releasing shared slot parent.
> ---
>
> Key: FLINK-10658
> URL: https://issues.apache.org/jira/browse/FLINK-10658
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.5.4
>Reporter: chauncy
>Priority: Major
>
> I don't know when this exception is thrown. Can anyone tell me? Thanks.





[jira] [Commented] (FLINK-10658) org.apache.flink.util.FlinkException: Releasing shared slot parent.

2018-10-23 Thread chauncy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16661676#comment-16661676
 ] 

chauncy commented on FLINK-10658:
-

[~yanghua] I'm confused: how are those child slots released, in my application 
code or by the Flink framework?

> org.apache.flink.util.FlinkException: Releasing shared slot parent.
> ---
>
> Key: FLINK-10658
> URL: https://issues.apache.org/jira/browse/FLINK-10658
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.5.4
>Reporter: chauncy
>Priority: Major
>
> I don't know when this exception is thrown. Can anyone tell me? Thanks.





[jira] [Commented] (FLINK-10658) org.apache.flink.util.FlinkException: Releasing shared slot parent.

2018-10-23 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16661669#comment-16661669
 ] 

vinoyang commented on FLINK-10658:
--

Hi [~chauncy], when a SharedSlot is being released, this exception is generated 
if it still has child slots that have not been released.

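A minimal model of the mechanism described in the comment above: when a shared (parent) slot is released while some of its child slots are still in use, those children are released with an exception as the cause, which is presumably the exception the reporter sees. `SharedSlotSketch` and `ChildSlotSketch` are hypothetical; Flink's actual slot-sharing classes are more involved, and a plain `Exception` stands in for `org.apache.flink.util.FlinkException`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical child slot; releaseCause stays null for a normal release.
class ChildSlotSketch {
    private boolean released;
    private Exception releaseCause;

    void releaseNormally() {
        released = true;
    }

    void releaseWithCause(Exception cause) {
        if (!released) { // children that already finished are left untouched
            released = true;
            releaseCause = cause;
        }
    }

    boolean isReleased() {
        return released;
    }

    Exception releaseCause() {
        return releaseCause;
    }
}

// Hypothetical parent slot shared by several children.
class SharedSlotSketch {
    private final List<ChildSlotSketch> children = new ArrayList<>();

    ChildSlotSketch allocateChild() {
        ChildSlotSketch child = new ChildSlotSketch();
        children.add(child);
        return child;
    }

    // Releasing the parent fails every child that has not been released yet.
    void release() {
        Exception cause = new Exception("Releasing shared slot parent.");
        for (ChildSlotSketch child : children) {
            child.releaseWithCause(cause);
        }
        children.clear();
    }

    public static void main(String[] args) {
        SharedSlotSketch parent = new SharedSlotSketch();
        ChildSlotSketch running = parent.allocateChild();
        parent.release();
        System.out.println(running.releaseCause().getMessage());
    }
}
```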
> org.apache.flink.util.FlinkException: Releasing shared slot parent.
> ---
>
> Key: FLINK-10658
> URL: https://issues.apache.org/jira/browse/FLINK-10658
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.5.4
>Reporter: chauncy
>Priority: Major
>
> I don't know when this exception is thrown. Can anyone tell me? Thanks.





[jira] [Updated] (FLINK-10658) org.apache.flink.util.FlinkException: Releasing shared slot parent.

2018-10-23 Thread chauncy (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

chauncy updated FLINK-10658:

Description: I don't know when this exception is thrown.  (was: i don't why throw the )

> org.apache.flink.util.FlinkException: Releasing shared slot parent.
> ---
>
> Key: FLINK-10658
> URL: https://issues.apache.org/jira/browse/FLINK-10658
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.5.4
>Reporter: chauncy
>Priority: Major
>
> I don't know when this exception is thrown.





[jira] [Updated] (FLINK-10658) org.apache.flink.util.FlinkException: Releasing shared slot parent.

2018-10-23 Thread chauncy (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

chauncy updated FLINK-10658:

Description: I don't know when this exception is thrown. Can anyone tell me? Thanks.  (was: 
I don't know when this exception is thrown.)

> org.apache.flink.util.FlinkException: Releasing shared slot parent.
> ---
>
> Key: FLINK-10658
> URL: https://issues.apache.org/jira/browse/FLINK-10658
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.5.4
>Reporter: chauncy
>Priority: Major
>
> I don't know when this exception is thrown. Can anyone tell me? Thanks.





[jira] [Updated] (FLINK-10658) org.apache.flink.util.FlinkException: Releasing shared slot parent.

2018-10-23 Thread chauncy (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

chauncy updated FLINK-10658:

Description: i don't why throw the 

> org.apache.flink.util.FlinkException: Releasing shared slot parent.
> ---
>
> Key: FLINK-10658
> URL: https://issues.apache.org/jira/browse/FLINK-10658
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.5.4
>Reporter: chauncy
>Priority: Major
>
> i don't why throw the 





[jira] [Created] (FLINK-10658) org.apache.flink.util.FlinkException: Releasing shared slot parent.

2018-10-23 Thread chauncy (JIRA)
chauncy created FLINK-10658:
---

 Summary: org.apache.flink.util.FlinkException: Releasing shared 
slot parent.
 Key: FLINK-10658
 URL: https://issues.apache.org/jira/browse/FLINK-10658
 Project: Flink
  Issue Type: Bug
  Components: DataStream API
Affects Versions: 1.5.4
Reporter: chauncy








[jira] [Commented] (FLINK-9999) Add ISNUMERIC supported in Table API/SQL

2018-10-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16661627#comment-16661627
 ] 

ASF GitHub Bot commented on FLINK-9999:
---

yanghua closed pull request #6473: [FLINK-9999] [table] Add ISNUMERIC supported in Table API/SQL
URL: https://github.com/apache/flink/pull/6473
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md
index 366e3fdcc64..76d801ca960 100644
--- a/docs/dev/table/sql.md
+++ b/docs/dev/table/sql.md
@@ -1842,6 +1842,16 @@ RPAD(text string, len integer, pad string)
 
   
 {% highlight text %}
+ISNUMERIC(text string)
+{% endhighlight %}
+  
+  
+Returns an Integer to indicate if the text string is a numeric value, supports some characters that are not numbers, such as plus (+), minus (-), and valid currency symbols such as the dollar sign ($), if true, returns 1, otherwise returns 0, if text is NULL, returns NULL. E.g. ISNUMERIC('123.0') returns 1.
+  
+
+
+  
+{% highlight text %}
 FROM_BASE64(text string)
 {% endhighlight %}
   
diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md
index 6e202f19d5d..61f74d51dee 100644
--- a/docs/dev/table/tableApi.md
+++ b/docs/dev/table/tableApi.md
@@ -2477,6 +2477,17 @@ STRING.rpad(len INT, pad STRING)
 
   
 {% highlight java %}
+STRING.isNumeric()
+{% endhighlight %}
+  
+
+  
+Returns an Integer to indicate if the text string is a numeric value, supports some characters that are not numbers, such as plus (+), minus (-), and valid currency symbols such as the dollar sign ($), if true, returns 1, otherwise returns 0, if text is NULL, returns NULL. E.g. "123".isNumeric() returns 1.
+  
+
+
+  
+{% highlight java %}
 STRING.fromBase64()
 {% endhighlight %}
   
@@ -4025,6 +4036,18 @@ STRING.initCap()
   
 
 
+
+  
+{% highlight scala %}
+STRING.isNumeric()
+{% endhighlight %}
+  
+
+  
+Returns an Integer to indicate if the text string is a numeric value, supports some characters that are not numbers, such as plus (+), minus (-), and valid currency symbols such as the dollar sign ($), if true, returns 1, otherwise returns 0, if text is NULL, returns NULL. E.g. "123".isNumeric() returns 1.
+  
+
+
   
 
 
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
index 35d2167848a..4c3c411bbe0 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
@@ -544,6 +544,11 @@ trait ImplicitExpressionOperations {
   def overlay(newString: Expression, starting: Expression, length: Expression) =
 Overlay(expr, newString, starting, length)
 
+  /**
+* Returns an Integer to indicate if the text string is a numeric value.
+*/
+  def isNumeric() = IsNumeric(expr)
+
   /**
 * Returns the base string decoded with base64.
 */
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
index 0e0f709eabc..e02dc6e8869 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
@@ -113,5 +113,7 @@ object BuiltInMethods {
 
   val BIN = Types.lookupMethod(classOf[JLong], "toBinaryString", classOf[Long])
 
+  val ISNUMERIC = Types.lookupMethod(classOf[ScalarFunctions], "isNumeric", classOf[String])
+
   val FROMBASE64 = Types.lookupMethod(classOf[ScalarFunctions], "fromBase64", classOf[String])
 }
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
index a5c275ab415..39f1f4948b1 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
@@ -146,6 +146,12 @@ object FunctionGenerator {
 STRING_TYPE_INFO,
 BuiltInMethod.OVERLAY.method)
 
+  addSqlFunctionMethod(
+ISNUMERIC,
+Seq(STRING_TYPE_INFO),
+

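The documented ISNUMERIC semantics (1 for numeric text, 0 otherwise, NULL for NULL input, with signs and currency symbols tolerated) can be approximated by a small Java sketch. This is not the PR's implementation: the diff references `ScalarFunctions.isNumeric` but does not show its body. The regular expression and the choice of `$` as the only accepted currency symbol are assumptions made for illustration.

```java
import java.util.regex.Pattern;

class IsNumericSketch {
    // Assumption: optional sign, optional '$', then digits with an optional fraction
    // (or a bare fraction such as ".5"). Only '$' is treated as a currency symbol.
    private static final Pattern NUMERIC =
            Pattern.compile("[+-]?\\$?(\\d+(\\.\\d*)?|\\.\\d+)");

    // Returns 1 if the text is numeric, 0 otherwise, and null for null input (SQL NULL).
    static Integer isNumeric(String text) {
        if (text == null) {
            return null;
        }
        return NUMERIC.matcher(text.trim()).matches() ? 1 : 0;
    }

    public static void main(String[] args) {
        for (String s : new String[] {"123.0", "-$42", "12a", null}) {
            System.out.println(s + " -> " + isNumeric(s));
        }
    }
}
```

Returning a boxed `Integer` rather than `int` is what allows the sketch to propagate NULL, mirroring the documented behaviour.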
[GitHub] yanghua closed pull request #6473: [FLINK-9999] [table] Add ISNUMERIC supported in Table API/SQL

2018-10-23 Thread GitBox
yanghua closed pull request #6473: [FLINK-9999] [table] Add ISNUMERIC supported in Table API/SQL
URL: https://github.com/apache/flink/pull/6473
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md
index 366e3fdcc64..76d801ca960 100644
--- a/docs/dev/table/sql.md
+++ b/docs/dev/table/sql.md
@@ -1842,6 +1842,16 @@ RPAD(text string, len integer, pad string)
 
   
 {% highlight text %}
+ISNUMERIC(text string)
+{% endhighlight %}
+  
+  
+Returns an Integer to indicate if the text string is a numeric value, supports some characters that are not numbers, such as plus (+), minus (-), and valid currency symbols such as the dollar sign ($), if true, returns 1, otherwise returns 0, if text is NULL, returns NULL. E.g. ISNUMERIC('123.0') returns 1.
+  
+
+
+  
+{% highlight text %}
 FROM_BASE64(text string)
 {% endhighlight %}
   
diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md
index 6e202f19d5d..61f74d51dee 100644
--- a/docs/dev/table/tableApi.md
+++ b/docs/dev/table/tableApi.md
@@ -2477,6 +2477,17 @@ STRING.rpad(len INT, pad STRING)
 
   
 {% highlight java %}
+STRING.isNumeric()
+{% endhighlight %}
+  
+
+  
+Returns an Integer to indicate if the text string is a numeric value, supports some characters that are not numbers, such as plus (+), minus (-), and valid currency symbols such as the dollar sign ($), if true, returns 1, otherwise returns 0, if text is NULL, returns NULL. E.g. "123".isNumeric() returns 1.
+  
+
+
+  
+{% highlight java %}
 STRING.fromBase64()
 {% endhighlight %}
   
@@ -4025,6 +4036,18 @@ STRING.initCap()
   
 
 
+
+  
+{% highlight scala %}
+STRING.isNumeric()
+{% endhighlight %}
+  
+
+  
+Returns an Integer to indicate if the text string is a numeric value, supports some characters that are not numbers, such as plus (+), minus (-), and valid currency symbols such as the dollar sign ($), if true, returns 1, otherwise returns 0, if text is NULL, returns NULL. E.g. "123".isNumeric() returns 1.
+  
+
+
   
 
 
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
index 35d2167848a..4c3c411bbe0 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
@@ -544,6 +544,11 @@ trait ImplicitExpressionOperations {
   def overlay(newString: Expression, starting: Expression, length: Expression) =
 Overlay(expr, newString, starting, length)
 
+  /**
+* Returns an Integer to indicate if the text string is a numeric value.
+*/
+  def isNumeric() = IsNumeric(expr)
+
   /**
 * Returns the base string decoded with base64.
 */
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
index 0e0f709eabc..e02dc6e8869 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
@@ -113,5 +113,7 @@ object BuiltInMethods {
 
   val BIN = Types.lookupMethod(classOf[JLong], "toBinaryString", classOf[Long])
 
+  val ISNUMERIC = Types.lookupMethod(classOf[ScalarFunctions], "isNumeric", classOf[String])
+
   val FROMBASE64 = Types.lookupMethod(classOf[ScalarFunctions], "fromBase64", classOf[String])
 }
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
index a5c275ab415..39f1f4948b1 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
@@ -146,6 +146,12 @@ object FunctionGenerator {
 STRING_TYPE_INFO,
 BuiltInMethod.OVERLAY.method)
 
+  addSqlFunctionMethod(
+ISNUMERIC,
+Seq(STRING_TYPE_INFO),
+INT_TYPE_INFO,
+BuiltInMethods.ISNUMERIC)
+
   addSqlFunctionMethod(
 FROM_BASE64,
 Seq(STRING_TYPE_INFO),
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala
 

[jira] [Updated] (FLINK-10425) taskmanager.host is not respected

2018-10-23 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10425?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-10425:
---
Labels: pull-request-available  (was: )

> taskmanager.host is not respected
> -
>
> Key: FLINK-10425
> URL: https://issues.apache.org/jira/browse/FLINK-10425
> Project: Flink
>  Issue Type: Bug
>  Components: TaskManager
>Affects Versions: 1.6.1
>Reporter: Andrew Kowpak
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> The documentation states that taskmanager.host can be set to override the 
> discovered hostname; however, setting this value has no effect.
> Looking at the code, the value never seems to be used. Instead, the 
> deprecated taskmanager.hostname is still used.

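The expected behaviour can be sketched as a lookup with an explicit precedence in which `taskmanager.host` wins and the deprecated `taskmanager.hostname` is only a fallback before the discovered hostname. The precedence order and the `resolveHost` helper are assumptions for illustration, not the code from the fix, and a plain `Map` stands in for Flink's `Configuration`.

```java
import java.util.HashMap;
import java.util.Map;

class TaskManagerHostSketch {
    static final String HOST_KEY = "taskmanager.host";
    static final String DEPRECATED_HOST_KEY = "taskmanager.hostname";

    // Assumed precedence: new key, then deprecated key, then the discovered hostname.
    static String resolveHost(Map<String, String> config, String discoveredHost) {
        String host = config.get(HOST_KEY);
        if (host == null) {
            host = config.get(DEPRECATED_HOST_KEY); // fallback for old configurations
        }
        return host != null ? host : discoveredHost;
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put(HOST_KEY, "tm-1.example.internal");
        System.out.println(resolveHost(config, "discovered-host"));
    }
}
```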




[GitHub] yanghua opened a new pull request #6913: [FLINK-10425] taskmanager.host is not respected

2018-10-23 Thread GitBox
yanghua opened a new pull request #6913: [FLINK-10425] taskmanager.host is not 
respected
URL: https://github.com/apache/flink/pull/6913
 
 
   ## What is the purpose of the change
   
   *This pull request fixes the bug that taskmanager.host is not respected.*
   
   
   ## Brief change log
   
 - *Make the TaskManager respect taskmanager.host*
   
   ## Verifying this change
   
   This change is already covered by existing tests.
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ **not documented**)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10425) taskmanager.host is not respected

2018-10-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10425?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16661563#comment-16661563
 ] 

ASF GitHub Bot commented on FLINK-10425:


yanghua opened a new pull request #6913: [FLINK-10425] taskmanager.host is not 
respected
URL: https://github.com/apache/flink/pull/6913
 
 
   ## What is the purpose of the change
   
   *This pull request fixes the bug that taskmanager.host is not respected.*
   
   
   ## Brief change log
   
 - *Make the TaskManager respect taskmanager.host*
   
   ## Verifying this change
   
   This change is already covered by existing tests.
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ **not documented**)
   




> taskmanager.host is not respected
> -
>
> Key: FLINK-10425
> URL: https://issues.apache.org/jira/browse/FLINK-10425
> Project: Flink
>  Issue Type: Bug
>  Components: TaskManager
>Affects Versions: 1.6.1
>Reporter: Andrew Kowpak
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> The documentation states that taskmanager.host can be set to override the 
> discovered hostname; however, setting this value has no effect.
> Looking at the code, the value never seems to be used. Instead, the 
> deprecated taskmanager.hostname is still used.





[jira] [Updated] (FLINK-10657) TPCHQuery3 fail with IllegalAccessException

2018-10-23 Thread JIN SUN (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10657?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

JIN SUN updated FLINK-10657:

Issue Type: Bug  (was: Improvement)

> TPCHQuery3 fail with IllegalAccessException
> ---
>
> Key: FLINK-10657
> URL: https://issues.apache.org/jira/browse/FLINK-10657
> Project: Flink
>  Issue Type: Bug
>  Components: E2E Tests
>Reporter: JIN SUN
>Assignee: JIN SUN
>Priority: Trivial
>  Labels: easy-fix, pull-request-available
> Fix For: 1.7.0
>
>
> Similar to [FLINK-7998|https://issues.apache.org/jira/browse/FLINK-7998], 
> types such as ShoppingPriorityItem in the TPCHQuery3.java example are declared 
> private. This causes an IllegalAccessException because of the reflection check 
> during dynamic class instantiation. Making them public resolves the problem.

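The failure can be reproduced without Flink: framework code that instantiates a user type reflectively, without calling `setAccessible(true)`, is rejected when the type's implicit no-argument constructor is private, which is the case for a private nested class (the default constructor takes the class's access modifier). `ReflectiveInstantiator` and `Tpch3Sketch` are hypothetical stand-ins for Flink's instantiation code and the example's nested types.

```java
import java.lang.reflect.Constructor;

// Hypothetical stand-in for framework code (such as a serializer) that
// creates user objects reflectively; this is not Flink's actual code.
class ReflectiveInstantiator {
    static String tryInstantiate(Class<?> clazz) {
        try {
            Constructor<?> ctor = clazz.getDeclaredConstructor();
            // No setAccessible(true), so the normal Java access rules apply.
            ctor.newInstance();
            return "ok";
        } catch (ReflectiveOperationException e) {
            return e.getClass().getSimpleName();
        }
    }
}

class Tpch3Sketch {
    // Private nested type: its implicit no-arg constructor is private too.
    private static class PrivateItem {
    }

    // Public nested type: its implicit no-arg constructor is public.
    public static class PublicItem {
    }

    static String instantiatePrivate() {
        return ReflectiveInstantiator.tryInstantiate(PrivateItem.class);
    }

    static String instantiatePublic() {
        return ReflectiveInstantiator.tryInstantiate(PublicItem.class);
    }

    public static void main(String[] args) {
        System.out.println("private nested type: " + instantiatePrivate());
        System.out.println("public nested type: " + instantiatePublic());
    }
}
```

The private case fails with `IllegalAccessException` and the public case succeeds, which matches the fix proposed in the issue.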




[jira] [Updated] (FLINK-10657) TPCHQuery3 fail with IllegalAccessException

2018-10-23 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10657?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-10657:
---
Labels: easy-fix pull-request-available  (was: easy-fix)

> TPCHQuery3 fail with IllegalAccessException
> ---
>
> Key: FLINK-10657
> URL: https://issues.apache.org/jira/browse/FLINK-10657
> Project: Flink
>  Issue Type: Improvement
>  Components: E2E Tests
>Reporter: JIN SUN
>Assignee: JIN SUN
>Priority: Trivial
>  Labels: easy-fix, pull-request-available
> Fix For: 1.7.0
>
>
> Similar to [FLINK-7998|https://issues.apache.org/jira/browse/FLINK-7998], 
> types such as ShoppingPriorityItem in the TPCHQuery3.java example are declared 
> private. This causes an IllegalAccessException because of the reflection check 
> during dynamic class instantiation. Making them public resolves the problem.





[jira] [Commented] (FLINK-10657) TPCHQuery3 fail with IllegalAccessException

2018-10-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16661532#comment-16661532
 ] 

ASF GitHub Bot commented on FLINK-10657:


isunjin opened a new pull request #6912: [FLINK-10657] TPCHQuery3 fail with 
IllegalAccessException
URL: https://github.com/apache/flink/pull/6912
 
 
   This is a trivial fix.
   
   Similar to 
[FLINK-7998](https://issues.apache.org/jira/browse/FLINK-7998), types such as 
ShoppingPriorityItem in the TPCHQuery3.java example are declared private. This 
causes an IllegalAccessException because of the reflection check during dynamic 
class instantiation. Making them public resolves the problem.




> TPCHQuery3 fail with IllegalAccessException
> ---
>
> Key: FLINK-10657
> URL: https://issues.apache.org/jira/browse/FLINK-10657
> Project: Flink
>  Issue Type: Improvement
>  Components: E2E Tests
>Reporter: JIN SUN
>Assignee: JIN SUN
>Priority: Trivial
>  Labels: easy-fix, pull-request-available
> Fix For: 1.7.0
>
>
> Similar to [FLINK-7998|https://issues.apache.org/jira/browse/FLINK-7998], 
> types such as ShoppingPriorityItem in the TPCHQuery3.java example are declared 
> private. This causes an IllegalAccessException because of the reflection check 
> during dynamic class instantiation. Making them public resolves the problem.





[GitHub] isunjin opened a new pull request #6912: [FLINK-10657] TPCHQuery3 fail with IllegalAccessException

2018-10-23 Thread GitBox
isunjin opened a new pull request #6912: [FLINK-10657] TPCHQuery3 fail with 
IllegalAccessException
URL: https://github.com/apache/flink/pull/6912
 
 
   This is a trivial fix.
   
   Similar to 
[FLINK-7998](https://issues.apache.org/jira/browse/FLINK-7998), types such as 
ShoppingPriorityItem in the TPCHQuery3.java example are declared private. This 
causes an IllegalAccessException because of the reflection check during dynamic 
class instantiation. Making them public resolves the problem.




[jira] [Created] (FLINK-10657) TPCHQuery3 fail with IllegalAccessException

2018-10-23 Thread JIN SUN (JIRA)
JIN SUN created FLINK-10657:
---

 Summary: TPCHQuery3 fail with IllegalAccessException
 Key: FLINK-10657
 URL: https://issues.apache.org/jira/browse/FLINK-10657
 Project: Flink
  Issue Type: Improvement
  Components: E2E Tests
Reporter: JIN SUN
Assignee: JIN SUN
 Fix For: 1.7.0


Similar to [FLINK-7998|https://issues.apache.org/jira/browse/FLINK-7998], 
types such as ShoppingPriorityItem in the TPCHQuery3.java example are declared 
private. This causes an IllegalAccessException because of the reflection check 
during dynamic class instantiation. Making them public resolves the problem.





[jira] [Updated] (FLINK-10656) Refactor org.apache.flink.runtime.io.network.api.reader.ReaderBase

2018-10-23 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10656?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-10656:
---
Labels: pull-request-available  (was: )

> Refactor org.apache.flink.runtime.io.network.api.reader.ReaderBase
> --
>
> Key: FLINK-10656
> URL: https://issues.apache.org/jira/browse/FLINK-10656
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Reporter: JIN SUN
>Assignee: JIN SUN
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> The interface org.apache.flink.runtime.io.network.api.reader.ReaderBase is 
> not very clean: its methods are only used by iterations and for event 
> handling, which is unrelated to the name ReaderBase. Since the two pieces of 
> functionality are independent, I propose renaming the interface and splitting 
> it into two separate interfaces. 
> For more details, please see the PR.

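The proposed split can be illustrated with two narrow interfaces, so that iteration code and event-sending code each depend only on the part they use. The interface names follow the PR's change log (`IterationReader`, `TaskEventSender`, suffixed here with `Sketch`), but the method sets and `SketchTaskEvent` are assumptions for illustration and may not match the PR.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical event type standing in for Flink's TaskEvent.
final class SketchTaskEvent {
    final String name;

    SketchTaskEvent(String name) {
        this.name = name;
    }
}

// Iteration-related concerns only (method names are assumptions).
interface IterationReaderSketch {
    void startNextSuperstep();

    boolean hasReachedEndOfSuperstep();
}

// Event-related concerns only (method name is an assumption).
interface TaskEventSenderSketch {
    void sendTaskEvent(SketchTaskEvent event);
}

// A reader may still implement both, but each caller depends only on what it uses.
class SketchReader implements IterationReaderSketch, TaskEventSenderSketch {
    private final List<SketchTaskEvent> sentEvents = new ArrayList<>();
    private int superstep;
    private boolean endOfSuperstep;

    @Override
    public void startNextSuperstep() {
        superstep++;
        endOfSuperstep = false;
    }

    @Override
    public boolean hasReachedEndOfSuperstep() {
        return endOfSuperstep;
    }

    // Hypothetical hook: called when an end-of-superstep marker arrives.
    void onEndOfSuperstepMarker() {
        endOfSuperstep = true;
    }

    @Override
    public void sendTaskEvent(SketchTaskEvent event) {
        sentEvents.add(event); // a real reader would forward the event upstream
    }

    int superstep() {
        return superstep;
    }

    List<SketchTaskEvent> sentEvents() {
        return sentEvents;
    }

    public static void main(String[] args) {
        SketchReader reader = new SketchReader();
        IterationReaderSketch iteration = reader; // iteration code sees only this view
        TaskEventSenderSketch events = reader;    // event code sees only this view
        iteration.startNextSuperstep();
        events.sendTaskEvent(new SketchTaskEvent("sync"));
        System.out.println(reader.superstep() + " superstep(s), " + reader.sentEvents().size() + " event(s)");
    }
}
```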




[jira] [Commented] (FLINK-10656) Refactor org.apache.flink.runtime.io.network.api.reader.ReaderBase

2018-10-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16661403#comment-16661403
 ] 

ASF GitHub Bot commented on FLINK-10656:


isunjin opened a new pull request #6911: [FLINK-10656] Refactor 
org.apache.flink.runtime.io.network.api.reader…
URL: https://github.com/apache/flink/pull/6911
 
 
   
   ## What is the purpose of the change
   
   The interface 
```org.apache.flink.runtime.io.network.api.reader.ReaderBase``` is not very 
clean: its methods are only used by iterations and for event handling, which is 
unrelated to the name ```ReaderBase```. Since the two pieces of functionality are 
independent, I propose renaming the interface and splitting it into two separate interfaces. 
   
   ## Brief change log
   
 - *Split interface ```ReaderBase``` to ```IterationReader``` and 
```TaskEventSender```*
 - *rename ```getCurrentBuffer``` to ```resetCurrentBuffer```*
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   




> Refactor org.apache.flink.runtime.io.network.api.reader.ReaderBase
> --
>
> Key: FLINK-10656
> URL: https://issues.apache.org/jira/browse/FLINK-10656
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Reporter: JIN SUN
>Assignee: JIN SUN
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> The interface of org.apache.flink.runtime.io.network.api.reader.ReaderBase is 
> not very clean: its methods are called only for iterations and event handling, 
> which is unrelated to the name ReaderBase. Since these two functionalities are 
> independent, I propose renaming the interface and splitting it into two 
> separate interfaces. 
> More details are in the PR.





[GitHub] isunjin opened a new pull request #6911: [FLINK-10656] Refactor org.apache.flink.runtime.io.network.api.reader…

2018-10-23 Thread GitBox
isunjin opened a new pull request #6911: [FLINK-10656] Refactor 
org.apache.flink.runtime.io.network.api.reader…
URL: https://github.com/apache/flink/pull/6911
 
 
   
   ## What is the purpose of the change
   
   The interface of 
```org.apache.flink.runtime.io.network.api.reader.ReaderBase``` is not very 
clean: its methods are called only for iterations and event handling, which is 
unrelated to the name ```ReaderBase```. Since these two functionalities are 
independent, I propose renaming the interface and splitting it into two 
separate interfaces. 
   
   ## Brief change log
   
 - *Split interface ```ReaderBase``` into ```IterationReader``` and 
```TaskEventSender```*
 - *Rename ```getCurrentBuffer``` to ```resetCurrentBuffer```*
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   




With regards,
Apache Git Services


[jira] [Created] (FLINK-10656) Refactor org.apache.flink.runtime.io.network.api.reader.ReaderBase

2018-10-23 Thread JIN SUN (JIRA)
JIN SUN created FLINK-10656:
---

 Summary: Refactor 
org.apache.flink.runtime.io.network.api.reader.ReaderBase
 Key: FLINK-10656
 URL: https://issues.apache.org/jira/browse/FLINK-10656
 Project: Flink
  Issue Type: Improvement
  Components: Distributed Coordination
Reporter: JIN SUN
Assignee: JIN SUN
 Fix For: 1.8.0


The interface of org.apache.flink.runtime.io.network.api.reader.ReaderBase is 
not very clean: its methods are called only for iterations and event handling, 
which is unrelated to the name ReaderBase. Since these two functionalities are 
independent, I propose renaming the interface and splitting it into two 
separate interfaces. 

More details are in the PR.





[GitHub] tzulitai commented on a change in pull request #6875: [FLINK-9808] [state backends] Migrate state when necessary in state backends

2018-10-23 Thread GitBox
tzulitai commented on a change in pull request #6875: [FLINK-9808] [state 
backends] Migrate state when necessary in state backends
URL: https://github.com/apache/flink/pull/6875#discussion_r227538700
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java
 ##
 @@ -0,0 +1,775 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.StateObjectCollection;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.types.StringValue;
+import org.apache.flink.util.TestLogger;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.RunnableFuture;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for the {@link KeyedStateBackend} and {@link OperatorStateBackend} as produced
+ * by various {@link StateBackend}s.
+ */
+@SuppressWarnings("serial")
+public abstract class StateBackendMigrationTestBase extends TestLogger {
+
+   @Rule
+   public final ExpectedException expectedException = ExpectedException.none();
+
+   // lazily initialized stream storage
+   private CheckpointStorageLocation checkpointStorageLocation;
+
+   /**
+    * Different "personalities" of {@link CustomStringSerializer}. Instead of creating
+    * different classes we parameterize the serializer with this and
+    * {@link CustomStringSerializerSnapshot} will instantiate serializers with the correct
+    * personality.
+    */
+   public enum SerializerVersion {
+   INITIAL,
+   RESTORE,
+   NEW
+   }
+
+   /**
+    * The compatibility behaviour of {@link CustomStringSerializer}. This controls what
+    * type of serializer {@link CustomStringSerializerSnapshot} will create for
+    * the different methods that return/create serializers.
+    */
+   public enum SerializerCompatibilityType {
+   COMPATIBLE_AS_IS,
+   REQUIRES_MIGRATION
+   }
+
+   /**
+    * The serialization timeliness behaviour of the state backend under test.
+    */
+   public enum BackendSerializationTimeliness {
+   ON_ACCESS,
+   ON_CHECKPOINTS
+   }
+
+   @Test
+   @SuppressWarnings("unchecked")
+   public void testValueStateWithSerializerRequiringMigration() throws Exception {
+   CustomStringSerializer.resetCountingMaps();
+
+   CheckpointStreamFactory streamFactory = createStreamFactory();
+   SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
+   AbstractKeyedStateBackend backend = createKeyedBackend(IntSerializer.INSTANCE);
+
+   ValueStateDescriptor kvId = new ValueStateDescriptor<>(
+   "id",
+   new CustomStringSerializer(
+   org.apache.flink.runtime.state.StateBackendMigrationTestBase.SerializerCompatibilityType.REQUIRES_MIGRATION,
+   org.apache.flink.runtime.state.StateBackendMigrationTestBase.SerializerVersion.INITIAL));
 
 Review comment:
   Fixed
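   To illustrate the "personality" idea behind `SerializerVersion` and `SerializerCompatibilityType`, here is a self-contained sketch in plain Java. It deliberately does not use Flink's `TypeSerializer` API. `FormatVersion`, `Compatibility`, `StringCodec` and `StateMigration` are hypothetical names. A single codec class is parameterized with a version instead of writing separate classes. When the codec that wrote the restored state differs from the new one, each entry is read with the old codec and rewritten with the new one. This is assumed here as a simplified model of "migrating state when necessary".

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical counterpart of SerializerCompatibilityType.
enum Compatibility { COMPATIBLE_AS_IS, REQUIRES_MIGRATION }

// Hypothetical counterpart of SerializerVersion: each personality picks a binary format.
enum FormatVersion {
    INITIAL(StandardCharsets.UTF_8),
    NEW(StandardCharsets.UTF_16BE);

    final Charset charset;

    FormatVersion(Charset charset) { this.charset = charset; }
}

// One codec class whose behaviour is chosen by a constructor argument.
final class StringCodec {
    final FormatVersion version;

    StringCodec(FormatVersion version) { this.version = version; }

    byte[] serialize(String value) { return value.getBytes(version.charset); }

    String deserialize(byte[] bytes) { return new String(bytes, version.charset); }

    // Compares this (new) codec with the one that wrote the restored state.
    Compatibility resolveCompatibility(StringCodec previous) {
        return previous.version == version
                ? Compatibility.COMPATIBLE_AS_IS
                : Compatibility.REQUIRES_MIGRATION;
    }
}

final class StateMigration {
    // Returns the stored bytes unchanged if compatible; otherwise re-encodes every entry
    // by reading it with the restore codec and writing it with the new codec.
    static List<byte[]> migrateIfNecessary(List<byte[]> stored, StringCodec restore, StringCodec current) {
        if (current.resolveCompatibility(restore) == Compatibility.COMPATIBLE_AS_IS) {
            return stored;
        }
        List<byte[]> migrated = new ArrayList<>(stored.size());
        for (byte[] bytes : stored) {
            migrated.add(current.serialize(restore.deserialize(bytes)));
        }
        return migrated;
    }
}
```

   The sketch does not model `BackendSerializationTimeliness`. It only shows the read-old/write-new step that migration requires.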



[jira] [Commented] (FLINK-9808) Implement state conversion procedure in state backends

2018-10-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16661178#comment-16661178
 ] 

ASF GitHub Bot commented on FLINK-9808:
---

tzulitai commented on a change in pull request #6875: [FLINK-9808] [state 
backends] Migrate state when necessary in state backends
URL: https://github.com/apache/flink/pull/6875#discussion_r227538700
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java
 ##
 @@ -0,0 +1,775 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.StateObjectCollection;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.types.StringValue;
+import org.apache.flink.util.TestLogger;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.RunnableFuture;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for the {@link KeyedStateBackend} and {@link OperatorStateBackend} as produced
+ * by various {@link StateBackend}s.
+ */
+@SuppressWarnings("serial")
+public abstract class StateBackendMigrationTestBase extends TestLogger {
+
+   @Rule
+   public final ExpectedException expectedException = ExpectedException.none();
+
+   // lazily initialized stream storage
+   private CheckpointStorageLocation checkpointStorageLocation;
+
+   /**
+    * Different "personalities" of {@link CustomStringSerializer}. Instead of creating
+    * different classes we parameterize the serializer with this and
+    * {@link CustomStringSerializerSnapshot} will instantiate serializers with the correct
+    * personality.
+    */
+   public enum SerializerVersion {
+   INITIAL,
+   RESTORE,
+   NEW
+   }
+
+   /**
+    * The compatibility behaviour of {@link CustomStringSerializer}. This controls what
+    * type of serializer {@link CustomStringSerializerSnapshot} will create for
+    * the different methods that return/create serializers.
+    */
+   public enum SerializerCompatibilityType {
+   COMPATIBLE_AS_IS,
+   REQUIRES_MIGRATION
+   }
+
+   /**
+    * The serialization timeliness behaviour of the state backend under test.
+    */
+   public enum BackendSerializationTimeliness {
+   ON_ACCESS,
+   ON_CHECKPOINTS
+   }
+
+   @Test
+   @SuppressWarnings("unchecked")
+   public void testValueStateWithSerializerRequiringMigration() throws Exception {
+   CustomStringSerializer.resetCountingMaps();
+
+   CheckpointStreamFactory streamFactory = createStreamFactory();
+   SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
+   AbstractKeyedStateBackend backend = createKeyedBackend(IntSerializer.INSTANCE);
+
+   ValueStateDescriptor kvId = new ValueStateDescriptor<>(
+   "id",
+   new CustomStringSerializer(
+   org.apache.flink.runtime.state.StateBackendMigrationTestBase.SerializerCompatibilityType.REQUIRES_MIGRATION,
 

[jira] [Commented] (FLINK-10490) OperatorSnapshotUtil should probably use SavepointV2Serializer

2018-10-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660994#comment-16660994
 ] 

ASF GitHub Bot commented on FLINK-10490:


StefanRRichter opened a new pull request #6910: [FLINK-10490][tests] 
OperatorSnapshotUtil should use SavepointV2Seria…
URL: https://github.com/apache/flink/pull/6910
 
 
   ## Brief change log
   
   Methods in `OperatorSnapshotUtil` that are used in current tests are still 
based on the outdated `SavepointV1Serializer`. This PR updates them to use the 
current `SavepointV2Serializer`.
   
   ## Verifying this change
   
   This change is already covered by existing tests.
   
   ## Does this pull request potentially affect one of the following parts:
   
   Change is only relevant for tests.
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   




> OperatorSnapshotUtil should probably use SavepointV2Serializer
> --
>
> Key: FLINK-10490
> URL: https://issues.apache.org/jira/browse/FLINK-10490
> Project: Flink
>  Issue Type: Test
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
>
> {{OperatorSnapshotUtil}} is used for testing savepoint migration. This 
> utility internally still uses {{SavepointV1Serializer}} and I would assume 
> that it should use {{SavepointV2Serializer}}. I wonder if that means that 
> some newer cases are actually not covered in the migration tests.
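The concern in this issue can be illustrated with a small, hypothetical versioned format. This is not Flink's savepoint format; `Snapshot`, `extraState` and the version numbers are invented for the sketch. If a test utility always writes with the old version, any field that only the newer version can represent is silently dropped. Round-trip tests built on that utility then never exercise it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical snapshot; extraState can only be represented from format version 2 on.
final class Snapshot {
    final String operatorId;
    final String extraState; // may be null

    Snapshot(String operatorId, String extraState) {
        this.operatorId = operatorId;
        this.extraState = extraState;
    }
}

final class VersionedSnapshotSerializer {
    static byte[] write(Snapshot snapshot, int version) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(version);
            out.writeUTF(snapshot.operatorId);
            if (version >= 2) {
                // Version 1 has no slot for this field, so it is silently dropped there.
                out.writeBoolean(snapshot.extraState != null);
                if (snapshot.extraState != null) {
                    out.writeUTF(snapshot.extraState);
                }
            }
        }
        return bytes.toByteArray();
    }

    static Snapshot read(byte[] data) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int version = in.readInt();
            if (version != 1 && version != 2) {
                throw new IOException("Unknown snapshot version " + version);
            }
            String operatorId = in.readUTF();
            String extraState = null;
            if (version >= 2 && in.readBoolean()) {
                extraState = in.readUTF();
            }
            return new Snapshot(operatorId, extraState);
        }
    }
}
```

A migration test that writes with version 1 would still pass here, even though the version 2 field is never covered.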





[jira] [Updated] (FLINK-10490) OperatorSnapshotUtil should probably use SavepointV2Serializer

2018-10-23 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10490?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-10490:
---
Labels: pull-request-available  (was: )

> OperatorSnapshotUtil should probably use SavepointV2Serializer
> --
>
> Key: FLINK-10490
> URL: https://issues.apache.org/jira/browse/FLINK-10490
> Project: Flink
>  Issue Type: Test
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
>
> {{OperatorSnapshotUtil}} is used for testing savepoint migration. This 
> utility internally still uses {{SavepointV1Serializer}} and I would assume 
> that it should use {{SavepointV2Serializer}}. I wonder if that means that 
> some newer cases are actually not covered in the migration tests.





[GitHub] StefanRRichter opened a new pull request #6910: [FLINK-10490][tests] OperatorSnapshotUtil should use SavepointV2Seria…

2018-10-23 Thread GitBox
StefanRRichter opened a new pull request #6910: [FLINK-10490][tests] 
OperatorSnapshotUtil should use SavepointV2Seria…
URL: https://github.com/apache/flink/pull/6910
 
 
   ## Brief change log
   
   Methods in `OperatorSnapshotUtil` that are used in current tests are still 
based on the outdated `SavepointV1Serializer`. This PR updates them to use the 
current `SavepointV2Serializer`.
   
   ## Verifying this change
   
   This change is already covered by existing tests.
   
   ## Does this pull request potentially affect one of the following parts:
   
   Change is only relevant for tests.
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   




[jira] [Commented] (FLINK-8995) Add a test operator with keyed state that uses custom, stateful serializer

2018-10-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660969#comment-16660969
 ] 

ASF GitHub Bot commented on FLINK-8995:
---

StefanRRichter commented on issue #6909: [FLINK-8995][tests] Add keyed state 
that uses custom stateful seriali…
URL: https://github.com/apache/flink/pull/6909#issuecomment-432332081
 
 
   CC @tzulitai 




> Add a test operator with keyed state that uses custom, stateful serializer
> --
>
> Key: FLINK-8995
> URL: https://issues.apache.org/jira/browse/FLINK-8995
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> This test should uncover problems in places where multiple threads would 
> share the same serializer instead of properly duplicating it.
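A sketch of the failure mode this test targets, in plain Java rather than Flink's API. `ReusingStringSerializer` and `ParallelSerialization` are hypothetical. The serializer keeps an internal scratch buffer, so one instance must not be used by two threads at once. Each worker thread therefore gets its own copy, in the spirit of Flink's `TypeSerializer#duplicate()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stateful serializer: it reuses one internal buffer, so a single
// instance must never be used by two threads at the same time.
final class ReusingStringSerializer {
    private final StringBuilder scratch = new StringBuilder();

    String serialize(String value) {
        scratch.setLength(0);
        scratch.append('[').append(value).append(']');
        return scratch.toString();
    }

    // Stateful serializers hand out a fresh copy instead of sharing themselves.
    ReusingStringSerializer duplicate() {
        return new ReusingStringSerializer();
    }
}

final class ParallelSerialization {
    static List<String> serializeInParallel(ReusingStringSerializer shared, List<String> values, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        // Each worker thread lazily gets its own duplicate instead of touching 'shared'.
        ThreadLocal<ReusingStringSerializer> perThread = ThreadLocal.withInitial(shared::duplicate);
        try {
            List<CompletableFuture<String>> futures = new ArrayList<>();
            for (String value : values) {
                futures.add(CompletableFuture.supplyAsync(() -> perThread.get().serialize(value), pool));
            }
            List<String> results = new ArrayList<>();
            for (CompletableFuture<String> future : futures) {
                results.add(future.join()); // results keep the input order
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }
}
```

Calling `shared.serialize(value)` directly inside the lambda would let threads overwrite each other's buffer. That kind of race is nondeterministic, which is why a dedicated test job is useful for exposing it.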





[GitHub] StefanRRichter commented on issue #6909: [FLINK-8995][tests] Add keyed state that uses custom stateful seriali…

2018-10-23 Thread GitBox
StefanRRichter commented on issue #6909: [FLINK-8995][tests] Add keyed state 
that uses custom stateful seriali…
URL: https://github.com/apache/flink/pull/6909#issuecomment-432332081
 
 
   CC @tzulitai 




[jira] [Updated] (FLINK-8995) Add a test operator with keyed state that uses custom, stateful serializer

2018-10-23 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8995?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-8995:
--
Labels: pull-request-available  (was: )

> Add a test operator with keyed state that uses custom, stateful serializer
> --
>
> Key: FLINK-8995
> URL: https://issues.apache.org/jira/browse/FLINK-8995
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> This test should uncover problems in places where multiple threads would 
> share the same serializer instead of properly duplicating it.





[jira] [Commented] (FLINK-8995) Add a test operator with keyed state that uses custom, stateful serializer

2018-10-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660964#comment-16660964
 ] 

ASF GitHub Bot commented on FLINK-8995:
---

StefanRRichter opened a new pull request #6909: [FLINK-8995][tests] Add keyed 
state that uses custom stateful seriali…
URL: https://github.com/apache/flink/pull/6909
 
 
   ## What is the purpose of the change
   
   This PR adds keyed state that uses a custom stateful serializer to the 
allround e2e test job, so that we cover this case.
   
   ## Verifying this change
   
   For example, run `./run-single-test.sh test-scripts/test_ha_datastream.sh`
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   




> Add a test operator with keyed state that uses custom, stateful serializer
> --
>
> Key: FLINK-8995
> URL: https://issues.apache.org/jira/browse/FLINK-8995
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> This test should uncover problems in places where multiple threads would 
> share the same serializer instead of properly duplicating it.





[GitHub] StefanRRichter opened a new pull request #6909: [FLINK-8995][tests] Add keyed state that uses custom stateful seriali…

2018-10-23 Thread GitBox
StefanRRichter opened a new pull request #6909: [FLINK-8995][tests] Add keyed 
state that uses custom stateful seriali…
URL: https://github.com/apache/flink/pull/6909
 
 
   ## What is the purpose of the change
   
   This PR adds keyed state that uses a custom stateful serializer to the 
allround e2e test job, so that we cover this case.
   
   ## Verifying this change
   
   For example, run `./run-single-test.sh test-scripts/test_ha_datastream.sh`
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   




[jira] [Created] (FLINK-10655) RemoteRpcInvocation not overwriting ObjectInputStream's ClassNotFoundException

2018-10-23 Thread Nico Kruber (JIRA)
Nico Kruber created FLINK-10655:
---

 Summary: RemoteRpcInvocation not overwriting ObjectInputStream's 
ClassNotFoundException
 Key: FLINK-10655
 URL: https://issues.apache.org/jira/browse/FLINK-10655
 Project: Flink
  Issue Type: Bug
  Components: Distributed Coordination
Affects Versions: 1.6.1, 1.5.4, 1.7.0
 Environment: {{RemoteRpcInvocation}} tries to give a more detailed 
{{ClassNotFoundException}} if the method type/argument deserialization fails. 
However, it turns out that once {{ObjectInputStream}} has received a 
{{ClassNotFoundException}}, it will not overwrite it anymore, and we therefore 
cannot provide a more detailed {{ClassNotFoundException}}.

Instead, the least invasive solution would be to add a suppressed 
{{ClassNotFoundException}} to the existing one. While at it, we could also add 
more details, i.e. the successfully deserialized types and arguments.
Reporter: Nico Kruber
Assignee: Nico Kruber
 Fix For: 1.5.6, 1.6.3, 1.7.0
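A sketch of the "suppressed exception" approach proposed above. This is not the actual `RemoteRpcInvocation` code. The wire layout (method name, argument count, arguments), `InvocationCodec` and the test helper `RestrictedObjectInputStream` are hypothetical. The original `ClassNotFoundException` from the stream is rethrown unchanged, and a second one carrying the extra context is attached via `Throwable#addSuppressed`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.util.ArrayList;
import java.util.List;

final class InvocationCodec {
    // Hypothetical wire layout: method name, argument count, then each argument.
    static byte[] encode(String methodName, Object... args) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeUTF(methodName);
            out.writeInt(args.length);
            for (Object arg : args) {
                out.writeObject(arg);
            }
        }
        return bytes.toByteArray();
    }

    static List<Object> readArguments(ObjectInputStream in) throws IOException, ClassNotFoundException {
        String methodName = in.readUTF();
        int count = in.readInt();
        List<Object> args = new ArrayList<>();
        try {
            for (int i = 0; i < count; i++) {
                args.add(in.readObject());
            }
            return args;
        } catch (ClassNotFoundException e) {
            // Keep the stream's original exception and attach the extra context to it
            // as a suppressed exception instead of trying to replace it.
            e.addSuppressed(new ClassNotFoundException("Could not deserialize argument " + args.size()
                    + " of " + count + " for method '" + methodName + "'; already deserialized: " + args));
            throw e;
        }
    }
}

// Test helper that simulates a class missing from the receiver's classpath.
final class RestrictedObjectInputStream extends ObjectInputStream {
    private final String unavailableClass;

    RestrictedObjectInputStream(byte[] data, String unavailableClass) throws IOException {
        super(new ByteArrayInputStream(data));
        this.unavailableClass = unavailableClass;
    }

    @Override
    protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
        if (desc.getName().equals(unavailableClass)) {
            throw new ClassNotFoundException(desc.getName());
        }
        return super.resolveClass(desc);
    }
}
```

The caller still sees the stream's own exception as the primary cause, while the suppressed one reports which arguments had already been deserialized.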








[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660910#comment-16660910
 ] 

ASF GitHub Bot commented on FLINK-10431:


StefanRRichter commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227466414
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSlotSelectionStrategy.java
 ##
 @@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This class implements a {@link SlotSelectionStrategy} that is based on previous allocations and
+ * falls back to using location preference hints if there is no previous allocation.
+ */
+public class PreviousAllocationSlotSelectionStrategy implements SlotSelectionStrategy {
+
+   public static final PreviousAllocationSlotSelectionStrategy INSTANCE =
+   new PreviousAllocationSlotSelectionStrategy();
+
+   private final LocationPreferenceSlotSelection locationPreferenceSlotSelection;
+
+   private PreviousAllocationSlotSelectionStrategy() {
+   this.locationPreferenceSlotSelection = LocationPreferenceSlotSelection.INSTANCE;
+   }
+
+   @Nonnull
+   @Override
+   public SlotInfoAndLocality selectBestSlotForProfile(
+   @Nonnull List availableSlots,
+   @Nonnull SlotProfile slotProfile) {
+
+   Collection priorAllocations = slotProfile.getPreferredAllocations();
+
+   // First, if there was a prior allocation try to schedule to the same/old slot
+   if (!priorAllocations.isEmpty()) {
+
+   HashSet priorAllocationsSet = new HashSet<>(priorAllocations);
 
 Review comment:
   In theory: it is a collection, and `contains` on a collection can be expensive, 
depending on the implementation. A hash set offers ideal performance for 
`contains`.
   In practice, this collection is currently always of size 1, so either way 
it should not matter much. But as the code could evolve, I prefer not to create 
a potential complexity bomb :)
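   A simplified sketch of the selection logic under discussion. Plain strings stand in for `AllocationID` and the task manager location, and `SlotCandidate` and `PreviousAllocationSelection` are hypothetical names, not the classes in the PR. Copying the prior allocations into a `HashSet` once makes each membership check O(1) on average. Checking `m` slots against a `List` of `n` prior allocations would cost O(n·m) instead. If no prior allocation matches, the sketch falls back to a simplified location preference.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

// Hypothetical, simplified slot: strings instead of AllocationID and TaskManagerLocation.
final class SlotCandidate {
    final String allocationId;
    final String host;

    SlotCandidate(String allocationId, String host) {
        this.allocationId = allocationId;
        this.host = host;
    }
}

final class PreviousAllocationSelection {
    static Optional<SlotCandidate> select(
            List<SlotCandidate> available, Collection<String> priorAllocations, String preferredHost) {
        if (!priorAllocations.isEmpty()) {
            // Copy once into a HashSet: each contains() is then O(1) on average,
            // whatever Collection implementation the caller passed in.
            Set<String> prior = new HashSet<>(priorAllocations);
            for (SlotCandidate slot : available) {
                if (prior.contains(slot.allocationId)) {
                    return Optional.of(slot);
                }
            }
        }
        // Fallback (simplified location preference): a slot on the preferred host, else any slot.
        for (SlotCandidate slot : available) {
            if (slot.host.equals(preferredHost)) {
                return Optional.of(slot);
            }
        }
        return available.isEmpty() ? Optional.empty() : Optional.of(available.get(0));
    }
}
```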




> Extract scheduling-related code from SlotPool
> -
>
> Key: FLINK-10431
> URL: https://issues.apache.org/jira/browse/FLINK-10431
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.7.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
>
> The other half of the current scheduling logic is the management of slot 
> sharing and is located in the SlotPool. We need to extract this logic into 
> our new Scheduler component from the previous step. This leaves us with a 
> simpler SlotPool that mainly cares about obtaining, holding, and releasing 
> slots in interaction with a ResourceManager. The new Scheduler can now 
> identify slot sharing groups and interact with the SlotPool.
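A heavily simplified sketch of the division of responsibilities described above. All names (`SimpleSlotPool`, `InMemorySlotPool`, `SharingScheduler`) are hypothetical and are not Flink's `SlotPool` or `Scheduler` interfaces. The pool only obtains, holds and releases whole slots. The scheduler owns the slot-sharing rule: subtasks of different vertices in one sharing group may share a slot, but two subtasks of the same vertex may not.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical pool: only obtains, holds and releases whole slots.
interface SimpleSlotPool {
    String obtainSlot(); // a real pool would request this from the ResourceManager
    void releaseSlot(String slotId);
}

final class InMemorySlotPool implements SimpleSlotPool {
    private int nextId;
    final Set<String> heldSlots = new HashSet<>();

    @Override
    public String obtainSlot() {
        String slotId = "slot-" + nextId++;
        heldSlots.add(slotId);
        return slotId;
    }

    @Override
    public void releaseSlot(String slotId) {
        heldSlots.remove(slotId);
    }
}

// Hypothetical scheduler that owns the slot-sharing logic on top of the pool.
final class SharingScheduler {
    private static final class SharedSlot {
        final String slotId;
        final Set<String> vertices = new HashSet<>();

        SharedSlot(String slotId) { this.slotId = slotId; }
    }

    private final SimpleSlotPool pool;
    private final Map<String, List<SharedSlot>> slotsPerGroup = new HashMap<>();

    SharingScheduler(SimpleSlotPool pool) { this.pool = pool; }

    // Places one subtask of vertexId into a slot of its sharing group, asking the
    // pool for a new slot only if every existing shared slot already hosts that vertex.
    String allocate(String sharingGroupId, String vertexId) {
        List<SharedSlot> slots = slotsPerGroup.computeIfAbsent(sharingGroupId, g -> new ArrayList<>());
        for (SharedSlot slot : slots) {
            if (slot.vertices.add(vertexId)) { // true only if the vertex was not in this slot yet
                return slot.slotId;
            }
        }
        SharedSlot fresh = new SharedSlot(pool.obtainSlot());
        fresh.vertices.add(vertexId);
        slots.add(fresh);
        return fresh.slotId;
    }
}
```

Keeping the sharing bookkeeping in the scheduler leaves the pool unaware of sharing groups, which matches the simpler SlotPool the issue aims for.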





[GitHub] StefanRRichter commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler

2018-10-23 Thread GitBox
StefanRRichter commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227466414
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSlotSelectionStrategy.java
 ##
 @@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This class implements a {@link SlotSelectionStrategy} that is based on previous allocations and
+ * falls back to using location preference hints if there is no previous allocation.
+ */
+public class PreviousAllocationSlotSelectionStrategy implements SlotSelectionStrategy {
+
+   public static final PreviousAllocationSlotSelectionStrategy INSTANCE =
+   new PreviousAllocationSlotSelectionStrategy();
+
+   private final LocationPreferenceSlotSelection locationPreferenceSlotSelection;
+
+   private PreviousAllocationSlotSelectionStrategy() {
+   this.locationPreferenceSlotSelection = LocationPreferenceSlotSelection.INSTANCE;
+   }
+
+   @Nonnull
+   @Override
+   public SlotInfoAndLocality selectBestSlotForProfile(
+   @Nonnull List availableSlots,
+   @Nonnull SlotProfile slotProfile) {
+
+   Collection priorAllocations = slotProfile.getPreferredAllocations();
+
+   // First, if there was a prior allocation try to schedule to the same/old slot
+   if (!priorAllocations.isEmpty()) {
+
+   HashSet priorAllocationsSet = new HashSet<>(priorAllocations);
 
 Review comment:
   In theory: it is a `Collection`, and `contains` on a collection can be expensive, depending on the implementation (e.g. linear time for a `List`). A hash set offers ideal performance for `contains` (expected constant time).
   In practice, this collection is currently always of size 1, so either way it should not matter much. But as the code could evolve, I prefer not to create a potential complexity bomb :)
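To illustrate the reviewer's point, here is a minimal, simplified sketch of "previous allocation first, then fall back" selection. It is not the PR's code. Plain `String` IDs stand in for `AllocationID`, and the hypothetical `fallback` function stands in for the location-preference selection. Copying the prior allocations into a `HashSet` once keeps each `contains` check at expected constant time, whatever `Collection` implementation the caller passes in.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;

/** Hypothetical model of "prefer the previous allocation, otherwise fall back". */
final class PriorAllocationSelectionSketch {

    private PriorAllocationSelectionSketch() {
    }

    static Optional<String> selectSlot(
            List<String> availableAllocationIds,
            Collection<String> priorAllocations,
            Function<List<String>, Optional<String>> fallback) {

        if (!priorAllocations.isEmpty()) {
            // Copy once so every contains() below is expected O(1),
            // whatever Collection implementation the caller handed us.
            Set<String> priorAllocationsSet = new HashSet<>(priorAllocations);
            for (String candidate : availableAllocationIds) {
                if (priorAllocationsSet.contains(candidate)) {
                    return Optional.of(candidate);
                }
            }
        }

        // No usable prior allocation: defer to the fallback
        // (location-preference selection in the reviewed code).
        return fallback.apply(availableAllocationIds);
    }
}
```

With a single prior allocation, as the comment notes, the copy costs little. The benefit only appears if the number of prior allocations grows.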


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660888#comment-16660888
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227450833
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultSchedulerFactory.java
 ##
 @@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+
+import javax.annotation.Nonnull;
+
+import java.util.HashMap;
+
+/**
+ * Default implementation of a {@link SchedulerFactory}.
+ */
+public class DefaultSchedulerFactory implements SchedulerFactory {
+
+   @Nonnull
+   private final SlotSelectionStrategy slotSelectionStrategy;
+
+   public DefaultSchedulerFactory(@Nonnull SlotSelectionStrategy slotSelectionStrategy) {
+   this.slotSelectionStrategy = slotSelectionStrategy;
+   }
+
+   @Nonnull
+   @Override
+   public Scheduler createScheduler(@Nonnull SlotPoolGateway slotPoolGateway) {
+   return new Scheduler(new HashMap<>(), slotSelectionStrategy, slotPoolGateway);
 
 Review comment:
   What could be a reasonable initial size of the `HashMap`?
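One common answer, sketched here as an assumption rather than anything decided in the PR, applies when the expected number of entries is known. In that case, size the map so it never rehashes under `HashMap`'s default load factor of 0.75, which means an initial capacity of `ceil(expected / 0.75)`. The helper names below are hypothetical. On Java 19 and later, `HashMap.newHashMap(int)` performs the same calculation.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helpers for pre-sizing a HashMap. */
final class HashMapSizingSketch {

    /** HashMap's default load factor. */
    private static final double DEFAULT_LOAD_FACTOR = 0.75;

    private HashMapSizingSketch() {
    }

    /**
     * An initial capacity large enough that a HashMap with the default load factor
     * can hold expectedSize entries without resizing.
     */
    static int capacityFor(int expectedSize) {
        if (expectedSize < 0) {
            throw new IllegalArgumentException("expectedSize must not be negative: " + expectedSize);
        }
        return (int) Math.ceil(expectedSize / DEFAULT_LOAD_FACTOR);
    }

    static <K, V> Map<K, V> newHashMapWithExpectedSize(int expectedSize) {
        return new HashMap<>(capacityFor(expectedSize));
    }
}
```

For the scheduler, the expected size would be the number of slot sharing groups in the job. The factory in the quoted diff does not know that number when it creates the map, so keeping the default capacity of 16 may be the simplest option.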




> Extract scheduling-related code from SlotPool
> -
>
> Key: FLINK-10431
> URL: https://issues.apache.org/jira/browse/FLINK-10431
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.7.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
>
> The other half of the current scheduling logic is the management of slot 
> sharing and is located in the SlotPool. We need to extract this logic into 
> our new Scheduler component from the previous step. This leaves us with a 
> simpler SlotPool that mainly cares about obtaining, holding, and releasing 
> slots in interaction with a ResourceManager. The new Scheduler can now 
> identify slot sharing groups and interacts with the SlotPool.
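Below is a minimal sketch of how a scheduler might "identify slot sharing groups": it keeps one manager per group ID and creates each manager lazily on the first request. This is a hypothetical model. `String` IDs and the nested `Manager` class stand in for `SlotSharingGroupId` and `SlotSharingManager`, and the real Scheduler's lookup may differ.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model: one manager per slot sharing group, created on first use. */
final class SlotSharingLookupSketch {

    /** Stand-in for a per-group slot sharing manager. */
    static final class Manager {
        final String groupId;
        int assignedTasks;

        Manager(String groupId) {
            this.groupId = groupId;
        }
    }

    private final Map<String, Manager> managersByGroup = new HashMap<>();

    /** Returns the group's manager, creating it the first time the group is seen. */
    Manager managerFor(String slotSharingGroupId) {
        return managersByGroup.computeIfAbsent(slotSharingGroupId, Manager::new);
    }

    int groupCount() {
        return managersByGroup.size();
    }
}
```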



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660895#comment-16660895
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227456207
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/Scheduler.java
 ##
 @@ -0,0 +1,516 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.function.BooleanSupplier;
+
+/**
+ * Scheduler that assigns tasks to slots. This class is currently a work in progress; comments will be updated as we
+ * move forward.
+ */
+public class Scheduler implements SlotProvider, SlotOwner {
+
+   /** Logger */
+   private final Logger log = LoggerFactory.getLogger(getClass());
+
+   /** Strategy that selects the best slot for a given slot allocation request. */
+   @Nonnull
+   private final SlotSelectionStrategy slotSelectionStrategy;
+
+   /** Managers for the different slot sharing groups. */
+   @Nonnull
+   private final Map<SlotSharingGroupId, SlotSharingManager> slotSharingManagersMap;
+
+   /** The slot pool from which slots are allocated. */
+   @Nonnull
+   private final SlotPoolGateway slotPoolGateway;
+
+   /** Executor for running tasks in the job master's main thread. */
+   @Nonnull
+   private Executor componentMainThreadExecutor;
+
+   /** Predicate to check if the current thread is the job master's main thread. */
+   @Nonnull
+   private BooleanSupplier componentMainThreadCheck;
+
+
+   public Scheduler(
+   @Nonnull Map<SlotSharingGroupId, SlotSharingManager> slotSharingManagersMap,
+   @Nonnull SlotSelectionStrategy slotSelectionStrategy,
+   @Nonnull SlotPoolGateway slotPoolGateway) {
+
+   this.slotSelectionStrategy = slotSelectionStrategy;
+   this.slotSharingManagersMap = slotSharingManagersMap;
+   this.slotPoolGateway = slotPoolGateway;
+   this.componentMainThreadExecutor = (runnable) -> {
+   throw new IllegalStateException("Main thread executor not initialized.");
+   };
+   this.componentMainThreadCheck = () -> {
+   throw new IllegalStateException("Main thread checker not initialized.");
+   };
+   }
+
+   public void start(@Nonnull Executor mainThreadExecutor, @Nonnull BooleanSupplier mainThreadCheck) {
+   this.componentMainThreadExecutor = mainThreadExecutor;
+   this.componentMainThreadCheck = mainThreadCheck;
+   }
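The constructor above installs placeholder hooks that throw until `start(...)` supplies the real main-thread executor and checker. A self-contained sketch of that fail-fast, late-binding pattern follows. The class and method names are hypothetical, and in the usage check `Runnable::run` merely stands in for the job master's main-thread executor.

```java
import java.util.concurrent.Executor;
import java.util.function.BooleanSupplier;

/** Hypothetical component that binds its main-thread hooks late, like the quoted Scheduler. */
final class LateBoundMainThreadSketch {

    // Placeholders fail fast until start() installs the real hooks.
    private Executor mainThreadExecutor = runnable -> {
        throw new IllegalStateException("Main thread executor not initialized.");
    };

    private BooleanSupplier mainThreadCheck = () -> {
        throw new IllegalStateException("Main thread checker not initialized.");
    };

    void start(Executor executor, BooleanSupplier check) {
        this.mainThreadExecutor = executor;
        this.mainThreadCheck = check;
    }

    /** Hands the action to whatever main-thread executor is currently installed. */
    void runInMainThread(Runnable action) {
        mainThreadExecutor.execute(action);
    }

    boolean isInMainThread() {
        return mainThreadCheck.getAsBoolean();
    }
}
```

Failing loudly when the component is used before `start` surfaces initialization-order bugs immediately, instead of silently running work on the wrong thread.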

[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660896#comment-16660896
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227460500
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/Scheduler.java
 ##
 @@ -0,0 +1,516 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.function.BooleanSupplier;
+
+/**
+ * Scheduler that assigns tasks to slots. This class is currently a work in progress; comments will be updated as we
+ * move forward.
+ */
+public class Scheduler implements SlotProvider, SlotOwner {
+
+   /** Logger */
+   private final Logger log = LoggerFactory.getLogger(getClass());
+
+   /** Strategy that selects the best slot for a given slot allocation request. */
+   @Nonnull
+   private final SlotSelectionStrategy slotSelectionStrategy;
+
+   /** Managers for the different slot sharing groups. */
+   @Nonnull
+   private final Map<SlotSharingGroupId, SlotSharingManager> slotSharingManagersMap;
+
+   /** The slot pool from which slots are allocated. */
+   @Nonnull
+   private final SlotPoolGateway slotPoolGateway;
+
+   /** Executor for running tasks in the job master's main thread. */
+   @Nonnull
+   private Executor componentMainThreadExecutor;
+
+   /** Predicate to check if the current thread is the job master's main thread. */
+   @Nonnull
+   private BooleanSupplier componentMainThreadCheck;
+
+
+   public Scheduler(
+   @Nonnull Map<SlotSharingGroupId, SlotSharingManager> slotSharingManagersMap,
+   @Nonnull SlotSelectionStrategy slotSelectionStrategy,
+   @Nonnull SlotPoolGateway slotPoolGateway) {
+
+   this.slotSelectionStrategy = slotSelectionStrategy;
+   this.slotSharingManagersMap = slotSharingManagersMap;
+   this.slotPoolGateway = slotPoolGateway;
+   this.componentMainThreadExecutor = (runnable) -> {
+   throw new IllegalStateException("Main thread executor not initialized.");
+   };
+   this.componentMainThreadCheck = () -> {
+   throw new IllegalStateException("Main thread checker not initialized.");
+   };
+   }
+
+   public void start(@Nonnull Executor mainThreadExecutor, @Nonnull BooleanSupplier mainThreadCheck) {
+   this.componentMainThreadExecutor = mainThreadExecutor;
+   this.componentMainThreadCheck = mainThreadCheck;
+   }

[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660893#comment-16660893
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227453190
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/LocationPreferenceSlotSelection.java
 ##
 @@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import javax.annotation.Nonnull;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiFunction;
+
+/**
+ * This class implements a {@link SlotSelectionStrategy} that is based on location preference hints.
+ */
+public class LocationPreferenceSlotSelection implements SlotSelectionStrategy {
+
+   public static final LocationPreferenceSlotSelection INSTANCE = new LocationPreferenceSlotSelection();
+
+   private LocationPreferenceSlotSelection() {
+   }
+
+   /**
+* Calculates the candidate's locality score.
+*/
+   private static final BiFunction<Integer, Integer, Integer> LOCALITY_EVALUATION_FUNCTION =
+   (localWeigh, hostLocalWeigh) -> localWeigh * 10 + hostLocalWeigh;
+
+   @Nonnull
+   @Override
+   public SlotInfoAndLocality selectBestSlotForProfile(
+   @Nonnull List<SlotInfo> availableSlots,
+   @Nonnull SlotProfile slotProfile) {
+
+   Collection<TaskManagerLocation> locationPreferences = slotProfile.getPreferredLocations();
+
+   if (availableSlots.isEmpty()) {
+   return SlotInfoAndLocality.of(null, Locality.UNKNOWN);
+   }
+
+   final ResourceProfile resourceProfile = slotProfile.getResourceProfile();
+
+   // if we have no location preferences, we can only filter by the additional requirements.
+   if (locationPreferences.isEmpty()) {
+   for (SlotInfo candidate : availableSlots) {
+   if (candidate.getResourceProfile().isMatching(resourceProfile)) {
+   return SlotInfoAndLocality.of(candidate, Locality.UNCONSTRAINED);
+   }
+   }
+   return SlotInfoAndLocality.of(null, Locality.UNKNOWN);
+   }
+
+   // we build up two indexes, one for resource id and one for host names of the preferred locations.
+   final Map<ResourceID, Integer> preferredResourceIDs = new HashMap<>(locationPreferences.size());
+   final Map<String, Integer> preferredFQHostNames = new HashMap<>(locationPreferences.size());
+
+   for (TaskManagerLocation locationPreference : locationPreferences) {
+   preferredResourceIDs.merge(locationPreference.getResourceID(), 1, Integer::sum);
+   preferredFQHostNames.merge(locationPreference.getFQDNHostname(), 1, Integer::sum);
+   }
+
+   SlotInfo bestCandidate = null;
+   Locality bestCandidateLocality = Locality.UNKNOWN;
+   int bestCandidateScore = Integer.MIN_VALUE;
+
+   for (SlotInfo candidate : availableSlots) {
+
+   if (candidate.getResourceProfile().isMatching(resourceProfile)) {
+
+   // this gets the candidate's local weight
+   Integer localWeigh = preferredResourceIDs.getOrDefault(candidate.getTaskManagerLocation().getResourceID(), 0);
+
+
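The quoted hunk is cut off mid-loop, but the scoring idea is visible. The code counts how often each resource ID and each fully qualified host name appears among the preferred locations, then combines the two counts as `localWeigh * 10 + hostLocalWeigh`. The sketch below models only that scoring, on simplified hypothetical types. Picking the highest score, with the first candidate winning ties, is an assumption. The `Locality` assignment that the real code performs is omitted.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

/** Hypothetical, simplified model of location-preference scoring. */
final class LocalityScoringSketch {

    /** Stand-in for TaskManagerLocation: a resource ID plus a host name. */
    static final class Location {
        final String resourceId;
        final String host;

        Location(String resourceId, String host) {
            this.resourceId = resourceId;
            this.host = host;
        }
    }

    /** Same weighting as the quoted LOCALITY_EVALUATION_FUNCTION. */
    static final BiFunction<Integer, Integer, Integer> LOCALITY_EVALUATION_FUNCTION =
        (localWeigh, hostLocalWeigh) -> localWeigh * 10 + hostLocalWeigh;

    private LocalityScoringSketch() {
    }

    /** Returns the index of the highest-scoring candidate, or -1 if there is none. */
    static int bestCandidate(List<Location> candidates, List<Location> preferences) {
        // Index the preferences: how often each resource ID / host is preferred.
        Map<String, Integer> preferredResourceIds = new HashMap<>();
        Map<String, Integer> preferredHosts = new HashMap<>();
        for (Location preference : preferences) {
            preferredResourceIds.merge(preference.resourceId, 1, Integer::sum);
            preferredHosts.merge(preference.host, 1, Integer::sum);
        }

        int bestIndex = -1;
        int bestScore = Integer.MIN_VALUE;
        for (int i = 0; i < candidates.size(); i++) {
            Location candidate = candidates.get(i);
            int localWeigh = preferredResourceIds.getOrDefault(candidate.resourceId, 0);
            int hostLocalWeigh = preferredHosts.getOrDefault(candidate.host, 0);
            int score = LOCALITY_EVALUATION_FUNCTION.apply(localWeigh, hostLocalWeigh);
            // Strict '>' keeps the earliest candidate when scores tie.
            if (score > bestScore) {
                bestScore = score;
                bestIndex = i;
            }
        }
        return bestIndex;
    }
}
```

Consider a single preferred location:

- A slot on that very TaskManager scores 1 * 10 + 1 = 11 (same resource ID and same host).
- A slot on another TaskManager on the same host scores 0 * 10 + 1 = 1.
- A slot elsewhere scores 0.

The factor of 10 therefore makes a resource-ID match dominate host-only matches, unless many preferences point at the same host.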

[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660890#comment-16660890
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227449157
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
 ##
 @@ -285,7 +290,9 @@ public JobMaster(
 
this.slotPool = checkNotNull(slotPoolFactory).createSlotPool(jobGraph.getJobID());
 
-   this.slotPoolGateway = slotPool.getSelfGateway(SlotPoolGateway.class);
+   this.slotPoolGateway = slotPool;
 
 Review comment:
   Can we remove this field?




[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660889#comment-16660889
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227450350
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotActions.java
 ##
 @@ -39,10 +38,23 @@
 * @param slotRequestId identifying the slot to release
 * @param slotSharingGroupId identifying the slot sharing group to which the slot belongs, null if none
 * @param cause of the slot release, null if none
-* @return Acknowledge (future) after the slot has been released
+* @return Acknowledge after the slot has been released
 */
-   CompletableFuture<Acknowledge> releaseSlot(
+   @Deprecated
+   Acknowledge releaseSlot(
 
 Review comment:
   Could be `void`?
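A note on the `void` suggestion: once `releaseSlot` is a direct, synchronous call, a constant `Acknowledge` return value carries no information, and failures could surface as exceptions instead. Callers that still want a future can wrap the call themselves. The sketch below shows this with hypothetical, simplified types; they are not Flink's `AllocatedSlotActions`. Here `CompletableFuture.runAsync` produces a `CompletableFuture<Void>` whose completion plays the role of the acknowledgement.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/** Hypothetical, simplified slot actions; not Flink's AllocatedSlotActions. */
final class VoidReleaseSketch {

    interface SlotActions {
        /** Releases the slot; failures would be reported by throwing. */
        void releaseSlot(String slotRequestId, Throwable cause);
    }

    /** Records released request IDs in place of a real slot pool. */
    static final class RecordingSlotActions implements SlotActions {
        final List<String> released = new ArrayList<>();

        @Override
        public void releaseSlot(String slotRequestId, Throwable cause) {
            released.add(slotRequestId);
        }
    }

    private VoidReleaseSketch() {
    }

    /** Wraps the void call for callers that still want a future to chain on. */
    static CompletableFuture<Void> releaseAsync(
            SlotActions actions, String slotRequestId, Throwable cause, Executor executor) {
        return CompletableFuture.runAsync(() -> actions.releaseSlot(slotRequestId, cause), executor);
    }
}
```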




[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660892#comment-16660892
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227450458
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotActions.java
 ##
 @@ -39,10 +38,23 @@
 * @param slotRequestId identifying the slot to release
 * @param slotSharingGroupId identifying the slot sharing group to which the slot belongs, null if none
 * @param cause of the slot release, null if none
-* @return Acknowledge (future) after the slot has been released
+* @return Acknowledge after the slot has been released
 */
-   CompletableFuture<Acknowledge> releaseSlot(
+   @Deprecated
+   Acknowledge releaseSlot(
SlotRequestId slotRequestId,
@Nullable SlotSharingGroupId slotSharingGroupId,
@Nullable Throwable cause);
+
+   /**
+* Releases the slot with the given {@link SlotRequestId}. Additionally, one can provide a cause for the slot release.
+*
+* @param slotRequestId identifying the slot to release
+* @param cause of the slot release, null if none
+* @return Acknowledge after the slot has been released
+*/
+   @Nonnull
+   Acknowledge releaseSlot(
 
 Review comment:
   Same here with `void`?




[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660894#comment-16660894
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227452749
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/LocationPreferenceSlotSelection.java
 ##
 @@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import javax.annotation.Nonnull;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiFunction;
+
+/**
+ * This class implements a {@link SlotSelectionStrategy} that is based on location preference hints.
+ */
+public class LocationPreferenceSlotSelection implements SlotSelectionStrategy {
+
+   public static final LocationPreferenceSlotSelection INSTANCE = new LocationPreferenceSlotSelection();
+
+   private LocationPreferenceSlotSelection() {
+   }
+
+   /**
+* Calculates the candidate's locality score.
+*/
+   private static final BiFunction<Integer, Integer, Integer> LOCALITY_EVALUATION_FUNCTION =
+   (localWeigh, hostLocalWeigh) -> localWeigh * 10 + hostLocalWeigh;
+
+   @Nonnull
+   @Override
+   public SlotInfoAndLocality selectBestSlotForProfile(
+   @Nonnull List<SlotInfo> availableSlots,
+   @Nonnull SlotProfile slotProfile) {
+
+   Collection<TaskManagerLocation> locationPreferences = slotProfile.getPreferredLocations();
+
+   if (availableSlots.isEmpty()) {
+   return SlotInfoAndLocality.of(null, Locality.UNKNOWN);
+   }
+
+   final ResourceProfile resourceProfile = slotProfile.getResourceProfile();
+
+   // if we have no location preferences, we can only filter by the additional requirements.
+   if (locationPreferences.isEmpty()) {
+   for (SlotInfo candidate : availableSlots) {
+   if (candidate.getResourceProfile().isMatching(resourceProfile)) {
+   return SlotInfoAndLocality.of(candidate, Locality.UNCONSTRAINED);
+   }
+   }
+   return SlotInfoAndLocality.of(null, Locality.UNKNOWN);
 
 Review comment:
   Factor out into a separate method, e.g. `selectBestSlotByResourceProfile`
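Here is a sketch of the suggested extraction on simplified types. The "no location preferences" branch becomes a helper that returns the first slot whose resources match. The caller then maps the result to `UNCONSTRAINED` or `UNKNOWN`, exactly as the inline code does. The method name follows the review suggestion. `Slot`, the memory-only matching rule, and `localityFor` are hypothetical stand-ins for `SlotInfo`, `ResourceProfile#isMatching`, and `SlotInfoAndLocality`.

```java
import java.util.List;
import java.util.Optional;

/** Hypothetical sketch of the suggested selectBestSlotByResourceProfile extraction. */
final class ResourceProfileSelectionSketch {

    enum Locality { UNCONSTRAINED, UNKNOWN }

    /** Stand-in for SlotInfo, reduced to an ID and available memory. */
    static final class Slot {
        final String id;
        final int memoryMb;

        Slot(String id, int memoryMb) {
            this.id = id;
            this.memoryMb = memoryMb;
        }
    }

    private ResourceProfileSelectionSketch() {
    }

    /** Returns the first slot that satisfies the request, if any. */
    static Optional<Slot> selectBestSlotByResourceProfile(List<Slot> availableSlots, int requiredMemoryMb) {
        for (Slot candidate : availableSlots) {
            // Simplified stand-in for ResourceProfile#isMatching.
            if (candidate.memoryMb >= requiredMemoryMb) {
                return Optional.of(candidate);
            }
        }
        return Optional.empty();
    }

    /** Mirrors the inline branch: UNCONSTRAINED on a match, UNKNOWN otherwise. */
    static Locality localityFor(Optional<Slot> selection) {
        return selection.isPresent() ? Locality.UNCONSTRAINED : Locality.UNKNOWN;
    }
}
```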



[GitHub] tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler

2018-10-23 Thread GitBox
tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227448691
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
 ##
 @@ -172,8 +172,8 @@ else if (ret instanceof CompletableFuture) {
}
 
@Override
-   public CompletableFuture<Acknowledge> cancelSlotRequest(SlotRequestId slotRequestId, @Nullable SlotSharingGroupId slotSharingGroupId, Throwable cause) {
-   return CompletableFuture.completedFuture(Acknowledge.get());
+   public Acknowledge cancelSlotRequest(SlotRequestId slotRequestId, @Nullable SlotSharingGroupId slotSharingGroupId, Throwable cause) {
 
 Review comment:
   I guess it could have a `void` return type.




[GitHub] tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler

2018-10-23 Thread GitBox
tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227456417
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/Scheduler.java
 ##
 @@ -0,0 +1,516 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.function.BooleanSupplier;
+
+/**
+ * Scheduler that assigns tasks to slots. This class is currently work in progress, comments will be updated as we
+ * move forward.
+ */
+public class Scheduler implements SlotProvider, SlotOwner {
+
+   /** Logger */
+   private final Logger log = LoggerFactory.getLogger(getClass());
+
+   /** Strategy that selects the best slot for a given slot allocation request. */
+   @Nonnull
+   private final SlotSelectionStrategy slotSelectionStrategy;
+
+   /** Managers for the different slot sharing groups. */
+   @Nonnull
+   private final Map slotSharingManagersMap;
+
+   /** The slot pool from which slots are allocated. */
+   @Nonnull
+   private final SlotPoolGateway slotPoolGateway;
+
+   /** Executor for running tasks in the job master's main thread. */
+   @Nonnull
+   private Executor componentMainThreadExecutor;
+
+   /** Predicate to check if the current thread is the job master's main thread. */
+   @Nonnull
+   private BooleanSupplier componentMainThreadCheck;
+
+
+   public Scheduler(
+   @Nonnull Map slotSharingManagersMap,
+   @Nonnull SlotSelectionStrategy slotSelectionStrategy,
+   @Nonnull SlotPoolGateway slotPoolGateway) {
+
+   this.slotSelectionStrategy = slotSelectionStrategy;
+   this.slotSharingManagersMap = slotSharingManagersMap;
+   this.slotPoolGateway = slotPoolGateway;
+   this.componentMainThreadExecutor = (runnable) -> {
+   throw new IllegalStateException("Main thread executor not initialized.");
+   };
+   this.componentMainThreadCheck = () -> {
+   throw new IllegalStateException("Main thread checker not initialized");
+   };
+   }
+
+   public void start(@Nonnull Executor mainThreadExecutor, @Nonnull BooleanSupplier mainThreadCheck) {
+   this.componentMainThreadExecutor = mainThreadExecutor;
+   this.componentMainThreadCheck = mainThreadCheck;
+   }
+
+   //---
+
+   @Override
+   public CompletableFuture allocateSlot(
+   SlotRequestId slotRequestId,
+   ScheduledUnit scheduledUnit,
+   boolean 

[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660887#comment-16660887
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227448691
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
 ##
 @@ -172,8 +172,8 @@ else if (ret instanceof CompletableFuture) {
}
 
@Override
-   public CompletableFuture<Acknowledge> cancelSlotRequest(SlotRequestId slotRequestId, @Nullable SlotSharingGroupId slotSharingGroupId, Throwable cause) {
-   return CompletableFuture.completedFuture(Acknowledge.get());
+   public Acknowledge cancelSlotRequest(SlotRequestId slotRequestId, @Nullable SlotSharingGroupId slotSharingGroupId, Throwable cause) {
 
 Review comment:
   I guess it could have a `void` return type.




> Extract scheduling-related code from SlotPool
> -
>
> Key: FLINK-10431
> URL: https://issues.apache.org/jira/browse/FLINK-10431
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.7.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
>
> The other half of the current scheduling logic is the management of slot 
> sharing and is located in the SlotPool. We need to extract this logic into 
> our new Scheduler component from the previous step. This leaves us with a 
> simpler SlotPool that mainly cares about obtaining, holding, and releasing 
> slots in interaction with a ResourceManager. The new Scheduler can now 
> identify slot sharing groups and interacts with the SlotPool.
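The intended split of responsibilities can be sketched with a deliberately simplified, synchronous model. `ToySlotPool` and `ToyScheduler` are hypothetical names; the real Flink `SlotPool` and `Scheduler` APIs are asynchronous and considerably richer. In this sketch the pool only obtains, holds and releases slots, while the scheduler owns the slot-sharing logic and maps each sharing group onto one physical slot taken from the pool.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical slot pool: only obtains, holds and releases slots.
final class ToySlotPool {
    private final Deque<String> freeSlots = new ArrayDeque<>();

    // Called when a slot is offered, e.g. after a ResourceManager provided one.
    void offerSlot(String slotId) {
        freeSlots.addLast(slotId);
    }

    String takeSlot() {
        String slot = freeSlots.pollFirst();
        if (slot == null) {
            throw new IllegalStateException("No slot available");
        }
        return slot;
    }

    void releaseSlot(String slotId) {
        freeSlots.addLast(slotId);
    }

    int freeSlotCount() {
        return freeSlots.size();
    }
}

// Hypothetical scheduler: owns slot sharing and asks the pool for physical slots.
final class ToyScheduler {
    private final ToySlotPool pool;
    // All tasks of one sharing group are placed into the same physical slot.
    private final Map<String, String> slotBySharingGroup = new HashMap<>();

    ToyScheduler(ToySlotPool pool) {
        this.pool = pool;
    }

    String allocate(String sharingGroupId) {
        return slotBySharingGroup.computeIfAbsent(sharingGroupId, group -> pool.takeSlot());
    }
}
```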



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler

2018-10-23 Thread GitBox
tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227460500
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/Scheduler.java
 ##
 @@ -0,0 +1,516 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.function.BooleanSupplier;
+
+/**
+ * Scheduler that assigns tasks to slots. This class is currently work in progress, comments will be updated as we
+ * move forward.
+ */
+public class Scheduler implements SlotProvider, SlotOwner {
+
+   /** Logger */
+   private final Logger log = LoggerFactory.getLogger(getClass());
+
+   /** Strategy that selects the best slot for a given slot allocation request. */
+   @Nonnull
+   private final SlotSelectionStrategy slotSelectionStrategy;
+
+   /** Managers for the different slot sharing groups. */
+   @Nonnull
+   private final Map slotSharingManagersMap;
+
+   /** The slot pool from which slots are allocated. */
+   @Nonnull
+   private final SlotPoolGateway slotPoolGateway;
+
+   /** Executor for running tasks in the job master's main thread. */
+   @Nonnull
+   private Executor componentMainThreadExecutor;
+
+   /** Predicate to check if the current thread is the job master's main thread. */
+   @Nonnull
+   private BooleanSupplier componentMainThreadCheck;
+
+
+   public Scheduler(
+   @Nonnull Map slotSharingManagersMap,
+   @Nonnull SlotSelectionStrategy slotSelectionStrategy,
+   @Nonnull SlotPoolGateway slotPoolGateway) {
+
+   this.slotSelectionStrategy = slotSelectionStrategy;
+   this.slotSharingManagersMap = slotSharingManagersMap;
+   this.slotPoolGateway = slotPoolGateway;
+   this.componentMainThreadExecutor = (runnable) -> {
+   throw new IllegalStateException("Main thread executor not initialized.");
+   };
+   this.componentMainThreadCheck = () -> {
+   throw new IllegalStateException("Main thread checker not initialized");
+   };
+   }
+
+   public void start(@Nonnull Executor mainThreadExecutor, @Nonnull BooleanSupplier mainThreadCheck) {
+   this.componentMainThreadExecutor = mainThreadExecutor;
+   this.componentMainThreadCheck = mainThreadCheck;
+   }
+
+   //---
+
+   @Override
+   public CompletableFuture allocateSlot(
+   SlotRequestId slotRequestId,
+   ScheduledUnit scheduledUnit,
+   boolean 

[GitHub] tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler

2018-10-23 Thread GitBox
tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227454366
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSlotSelectionStrategy.java
 ##
 @@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This class implements a {@link SlotSelectionStrategy} that is based on previous allocations and
+ * falls back to using location preference hints if there is no previous allocation.
+ */
+public class PreviousAllocationSlotSelectionStrategy implements SlotSelectionStrategy {
+
+   public static final PreviousAllocationSlotSelectionStrategy INSTANCE =
+   new PreviousAllocationSlotSelectionStrategy();
+
+   private final LocationPreferenceSlotSelection locationPreferenceSlotSelection;
+
+   private PreviousAllocationSlotSelectionStrategy() {
+   this.locationPreferenceSlotSelection = LocationPreferenceSlotSelection.INSTANCE;
+   }
+
+   @Nonnull
+   @Override
+   public SlotInfoAndLocality selectBestSlotForProfile(
+   @Nonnull List availableSlots,
+   @Nonnull SlotProfile slotProfile) {
+
+   Collection priorAllocations = slotProfile.getPreferredAllocations();
+
+   // First, if there was a prior allocation try to schedule to the same/old slot
+   if (!priorAllocations.isEmpty()) {
+
+   HashSet priorAllocationsSet = new HashSet<>(priorAllocations);
 
 Review comment:
   Why are we creating a new `HashSet` here?
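One possible answer, sketched under assumptions: copying into a `HashSet` only pays off if the preferred-allocations collection is not already a `Set` and is probed repeatedly, because `List.contains` is linear while `HashSet.contains` is constant time on average. `SimpleSlot` and `PriorAllocationMatcher` below are hypothetical simplifications, not Flink's `SlotInfo` or `PreviousAllocationSlotSelectionStrategy`.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

// Hypothetical, simplified slot description (not Flink's SlotInfo).
final class SimpleSlot {
    final String allocationId;

    SimpleSlot(String allocationId) {
        this.allocationId = allocationId;
    }
}

final class PriorAllocationMatcher {
    // Returns the first available slot that was previously allocated under one of the given ids.
    static Optional<SimpleSlot> findPriorSlot(List<SimpleSlot> availableSlots, Collection<String> priorAllocations) {
        if (priorAllocations.isEmpty()) {
            return Optional.empty();
        }
        // Only copy when needed: a Set already offers fast contains(), a List does not.
        Set<String> lookup = (priorAllocations instanceof Set)
            ? (Set<String>) priorAllocations
            : new HashSet<String>(priorAllocations);
        for (SimpleSlot slot : availableSlots) {
            if (lookup.contains(slot.allocationId)) {
                return Optional.of(slot);
            }
        }
        return Optional.empty();
    }
}
```

If the preferred allocations are typically just one or two ids, a plain `contains` on the original collection is likely just as fast, which may be what the question is getting at.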




[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660897#comment-16660897
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227459571
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/Scheduler.java
 ##
 @@ -0,0 +1,516 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.function.BooleanSupplier;
+
+/**
+ * Scheduler that assigns tasks to slots. This class is currently work in progress, comments will be updated as we
+ * move forward.
+ */
+public class Scheduler implements SlotProvider, SlotOwner {
+
+   /** Logger */
+   private final Logger log = LoggerFactory.getLogger(getClass());
+
+   /** Strategy that selects the best slot for a given slot allocation request. */
+   @Nonnull
+   private final SlotSelectionStrategy slotSelectionStrategy;
+
+   /** Managers for the different slot sharing groups. */
+   @Nonnull
+   private final Map slotSharingManagersMap;
+
+   /** The slot pool from which slots are allocated. */
+   @Nonnull
+   private final SlotPoolGateway slotPoolGateway;
+
+   /** Executor for running tasks in the job master's main thread. */
+   @Nonnull
+   private Executor componentMainThreadExecutor;
+
+   /** Predicate to check if the current thread is the job master's main thread. */
+   @Nonnull
+   private BooleanSupplier componentMainThreadCheck;
+
+
+   public Scheduler(
+   @Nonnull Map slotSharingManagersMap,
+   @Nonnull SlotSelectionStrategy slotSelectionStrategy,
+   @Nonnull SlotPoolGateway slotPoolGateway) {
+
+   this.slotSelectionStrategy = slotSelectionStrategy;
+   this.slotSharingManagersMap = slotSharingManagersMap;
+   this.slotPoolGateway = slotPoolGateway;
+   this.componentMainThreadExecutor = (runnable) -> {
+   throw new IllegalStateException("Main thread executor not initialized.");
+   };
+   this.componentMainThreadCheck = () -> {
+   throw new IllegalStateException("Main thread checker not initialized");
+   };
+   }
+
+   public void start(@Nonnull Executor mainThreadExecutor, @Nonnull BooleanSupplier mainThreadCheck) {
+   this.componentMainThreadExecutor = mainThreadExecutor;
+   this.componentMainThreadCheck = mainThreadCheck;
+   }
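The `Scheduler` constructor above installs placeholder `Executor` and `BooleanSupplier` instances that throw until `start(...)` supplies the real ones, so any use before initialization fails fast instead of silently dropping work. A minimal, self-contained sketch of that pattern follows; `LateBoundComponent` and its methods are hypothetical and not part of Flink.

```java
import java.util.concurrent.Executor;
import java.util.function.BooleanSupplier;

// Hypothetical component illustrating the "fail fast until started" pattern.
final class LateBoundComponent {
    private Executor mainThreadExecutor;
    private BooleanSupplier mainThreadCheck;

    LateBoundComponent() {
        // Placeholders throw instead of silently ignoring calls made before start().
        this.mainThreadExecutor = runnable -> {
            throw new IllegalStateException("Main thread executor not initialized.");
        };
        this.mainThreadCheck = () -> {
            throw new IllegalStateException("Main thread checker not initialized.");
        };
    }

    // Replaces the placeholders with the real executor and thread check.
    void start(Executor executor, BooleanSupplier check) {
        this.mainThreadExecutor = executor;
        this.mainThreadCheck = check;
    }

    void runInMainThread(Runnable runnable) {
        mainThreadExecutor.execute(runnable);
    }

    boolean isMainThread() {
        return mainThreadCheck.getAsBoolean();
    }
}
```

Because the fields are non-final and replaced in `start(...)`, this pattern assumes `start` is called once from the owning thread before the component is shared.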

[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660900#comment-16660900
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227454366
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSlotSelectionStrategy.java
 ##
 @@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This class implements a {@link SlotSelectionStrategy} that is based on previous allocations and
+ * falls back to using location preference hints if there is no previous allocation.
+ */
+public class PreviousAllocationSlotSelectionStrategy implements SlotSelectionStrategy {
+
+   public static final PreviousAllocationSlotSelectionStrategy INSTANCE =
+   new PreviousAllocationSlotSelectionStrategy();
+
+   private final LocationPreferenceSlotSelection locationPreferenceSlotSelection;
+
+   private PreviousAllocationSlotSelectionStrategy() {
+   this.locationPreferenceSlotSelection = LocationPreferenceSlotSelection.INSTANCE;
+   }
+
+   @Nonnull
+   @Override
+   public SlotInfoAndLocality selectBestSlotForProfile(
+   @Nonnull List availableSlots,
+   @Nonnull SlotProfile slotProfile) {
+
+   Collection priorAllocations = slotProfile.getPreferredAllocations();
+
+   // First, if there was a prior allocation try to schedule to the same/old slot
+   if (!priorAllocations.isEmpty()) {
+
+   HashSet priorAllocationsSet = new HashSet<>(priorAllocations);
 
 Review comment:
   Why are we creating a new `HashSet` here?




[GitHub] tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler

2018-10-23 Thread GitBox
tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227449157
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
 ##
 @@ -285,7 +290,9 @@ public JobMaster(
 
this.slotPool = checkNotNull(slotPoolFactory).createSlotPool(jobGraph.getJobID());
 
-   this.slotPoolGateway = slotPool.getSelfGateway(SlotPoolGateway.class);
+   this.slotPoolGateway = slotPool;
 
 Review comment:
   Can we remove this field?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler

2018-10-23 Thread GitBox
tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227450458
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotActions.java
 ##
 @@ -39,10 +38,23 @@
 * @param slotRequestId identifying the slot to release
 * @param slotSharingGroupId identifying the slot sharing group to which the slot belongs, null if none
 * @param cause of the slot release, null if none
-* @return Acknowledge (future) after the slot has been released
+* @return Acknowledge after the slot has been released
 */
-   CompletableFuture releaseSlot(
+   @Deprecated
+   Acknowledge releaseSlot(
SlotRequestId slotRequestId,
@Nullable SlotSharingGroupId slotSharingGroupId,
@Nullable Throwable cause);
+
+   /**
+* Releases the slot with the given {@link SlotRequestId}. Additionally, one can provide a cause for the slot release.
+*
+* @param slotRequestId identifying the slot to release
+* @param cause of the slot release, null if none
+* @return Acknowledge after the slot has been released
+*/
+   @Nonnull
+   Acknowledge releaseSlot(
 
 Review comment:
   Same here with `void`?
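A sketch of how the deprecated three-argument `releaseSlot` could delegate to a `void` two-argument variant, assuming the slot sharing group id is simply no longer needed for release. `SlotReleaser` and `RecordingSlotReleaser` are hypothetical, use plain `String` ids instead of Flink's `SlotRequestId`/`SlotSharingGroupId`, and do not reflect what the PR actually implements.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical interface; ids are plain Strings instead of Flink's SlotRequestId/SlotSharingGroupId.
interface SlotReleaser {
    /**
     * Old variant kept for compatibility.
     *
     * @deprecated the slot sharing group is assumed to be no longer needed for release
     */
    @Deprecated
    default void releaseSlot(String slotRequestId, String slotSharingGroupId, Throwable cause) {
        // Ignore the sharing group and delegate to the new variant.
        releaseSlot(slotRequestId, cause);
    }

    // Release happens synchronously, so there is nothing to acknowledge.
    void releaseSlot(String slotRequestId, Throwable cause);
}

// Toy implementation that records which requests were released.
final class RecordingSlotReleaser implements SlotReleaser {
    final List<String> released = new ArrayList<>();

    @Override
    public void releaseSlot(String slotRequestId, Throwable cause) {
        released.add(slotRequestId);
    }
}
```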




[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660901#comment-16660901
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227456417
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/Scheduler.java
 ##
 @@ -0,0 +1,516 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.function.BooleanSupplier;
+
+/**
+ * Scheduler that assigns tasks to slots. This class is currently work in progress, comments will be updated as we
+ * move forward.
+ */
+public class Scheduler implements SlotProvider, SlotOwner {
+
+   /** Logger */
+   private final Logger log = LoggerFactory.getLogger(getClass());
+
+   /** Strategy that selects the best slot for a given slot allocation request. */
+   @Nonnull
+   private final SlotSelectionStrategy slotSelectionStrategy;
+
+   /** Managers for the different slot sharing groups. */
+   @Nonnull
+   private final Map slotSharingManagersMap;
+
+   /** The slot pool from which slots are allocated. */
+   @Nonnull
+   private final SlotPoolGateway slotPoolGateway;
+
+   /** Executor for running tasks in the job master's main thread. */
+   @Nonnull
+   private Executor componentMainThreadExecutor;
+
+   /** Predicate to check if the current thread is the job master's main thread. */
+   @Nonnull
+   private BooleanSupplier componentMainThreadCheck;
+
+
+   public Scheduler(
+   @Nonnull Map slotSharingManagersMap,
+   @Nonnull SlotSelectionStrategy slotSelectionStrategy,
+   @Nonnull SlotPoolGateway slotPoolGateway) {
+
+   this.slotSelectionStrategy = slotSelectionStrategy;
+   this.slotSharingManagersMap = slotSharingManagersMap;
+   this.slotPoolGateway = slotPoolGateway;
+   this.componentMainThreadExecutor = (runnable) -> {
+   throw new IllegalStateException("Main thread executor not initialized.");
+   };
+   this.componentMainThreadCheck = () -> {
+   throw new IllegalStateException("Main thread checker not initialized");
+   };
+   }
+
+   public void start(@Nonnull Executor mainThreadExecutor, @Nonnull BooleanSupplier mainThreadCheck) {
+   this.componentMainThreadExecutor = mainThreadExecutor;
+   this.componentMainThreadCheck = mainThreadCheck;
+   }

[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660891#comment-16660891
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227451887
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/LocationPreferenceSlotSelection.java
 ##
 @@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import javax.annotation.Nonnull;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiFunction;
+
+/**
+ * This class implements a {@link SlotSelectionStrategy} that is based on location preference hints.
+ */
+public class LocationPreferenceSlotSelection implements SlotSelectionStrategy {
 
 Review comment:
   Singletons are best implemented using an `enum`:
   ```
   enum LocationPreferenceSlotSelection implements SlotSelectionStrategy {
       INSTANCE;

       @Override
       public SlotInfoAndLocality selectBestSlotForProfile(
               @Nonnull List<SlotInfo> availableSlots,
               @Nonnull SlotProfile slotProfile) {
           // ...
       }
   }
   ```

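For reference, a self-contained sketch of the enum-singleton pattern the review suggests. `SlotChooser` and `FirstSlotChooser` are hypothetical, simplified stand-ins for `SlotSelectionStrategy` and its implementations; no Flink types are used.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical, simplified strategy interface (not Flink's SlotSelectionStrategy).
interface SlotChooser {
    Optional<String> choose(List<String> availableSlots);
}

// Enum singleton: the JVM creates exactly one INSTANCE when the enum class is
// initialized, and neither serialization nor reflection can create a second one.
enum FirstSlotChooser implements SlotChooser {
    INSTANCE;

    @Override
    public Optional<String> choose(List<String> availableSlots) {
        // Trivial policy for illustration: pick the first available slot, if any.
        return availableSlots.isEmpty() ? Optional.empty() : Optional.of(availableSlots.get(0));
    }
}
```

Compared with a `public static final INSTANCE` field plus a private constructor, the enum form needs no extra code to stay a true singleton under Java serialization and reflective construction.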

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Extract scheduling-related code from SlotPool
> -
>
> Key: FLINK-10431
> URL: https://issues.apache.org/jira/browse/FLINK-10431
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.7.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
>
> The other half of the current scheduling logic is the management of slot 
> sharing and is located in the SlotPool. We need to extract this logic into 
> our new Scheduler component from the previous step. This leaves us with a 
> simpler SlotPool that mainly cares about obtaining, holding, and releasing 
> slots in interaction with a ResourceManager. The new Scheduler can now 
> identify slot sharing groups and interacts with the SlotPool.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660898#comment-16660898
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227453757
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSlotSelectionStrategy.java
 ##
 @@ -0,0 +1,82 @@
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This class implements a {@link SlotSelectionStrategy} that is based on previous allocations and
+ * falls back to using location preference hints if there is no previous allocation.
+ */
+public class PreviousAllocationSlotSelectionStrategy implements SlotSelectionStrategy {
 
 Review comment:
   Same comment concerning singletons and using an `enum` for them.



[GitHub] tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler

2018-10-23 Thread GitBox
tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227453190
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/LocationPreferenceSlotSelection.java
 ##
 @@ -0,0 +1,111 @@
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import javax.annotation.Nonnull;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiFunction;
+
+/**
+ * This class implements a {@link SlotSelectionStrategy} that is based on location preference hints.
+ */
+public class LocationPreferenceSlotSelection implements SlotSelectionStrategy {
+
+   public static final LocationPreferenceSlotSelection INSTANCE = new LocationPreferenceSlotSelection();
+
+   private LocationPreferenceSlotSelection() {
+   }
+
+   /**
+* Calculates the candidate's locality score.
+*/
+   private static final BiFunction<Integer, Integer, Integer> LOCALITY_EVALUATION_FUNCTION =
+   (localWeigh, hostLocalWeigh) -> localWeigh * 10 + hostLocalWeigh;
+
+   @Nonnull
+   @Override
+   public SlotInfoAndLocality selectBestSlotForProfile(
+   @Nonnull List<SlotInfo> availableSlots,
+   @Nonnull SlotProfile slotProfile) {
+
+   Collection<TaskManagerLocation> locationPreferences = slotProfile.getPreferredLocations();
+
+   if (availableSlots.isEmpty()) {
+   return SlotInfoAndLocality.of(null, Locality.UNKNOWN);
+   }
+
+   final ResourceProfile resourceProfile = slotProfile.getResourceProfile();
+
+   // if we have no location preferences, we can only filter by the additional requirements.
+   if (locationPreferences.isEmpty()) {
+   for (SlotInfo candidate : availableSlots) {
+   if (candidate.getResourceProfile().isMatching(resourceProfile)) {
+   return SlotInfoAndLocality.of(candidate, Locality.UNCONSTRAINED);
+   }
+   }
+   return SlotInfoAndLocality.of(null, Locality.UNKNOWN);
+   }
+
+   // we build up two indexes, one for resource id and one for host names of the preferred locations.
+   final Map<ResourceID, Integer> preferredResourceIDs = new HashMap<>(locationPreferences.size());
+   final Map<String, Integer> preferredFQHostNames = new HashMap<>(locationPreferences.size());
+
+   for (TaskManagerLocation locationPreference : locationPreferences) {
+   preferredResourceIDs.merge(locationPreference.getResourceID(), 1, Integer::sum);
+   preferredFQHostNames.merge(locationPreference.getFQDNHostname(), 1, Integer::sum);
+   }
+
+   SlotInfo bestCandidate = null;
+   Locality bestCandidateLocality = Locality.UNKNOWN;
+   int bestCandidateScore = Integer.MIN_VALUE;
+
+   for (SlotInfo candidate : availableSlots) {
+
+   if (candidate.getResourceProfile().isMatching(resourceProfile)) {
+
+   // this gets the candidate's local weight
+   Integer localWeigh = preferredResourceIDs.getOrDefault(candidate.getTaskManagerLocation().getResourceID(), 0);
+
+   // this gets the candidate's host-local weight
+   Integer hostLocalWeigh = preferredFQHostNames.getOrDefault(candidate.getTaskManagerLocation().getFQDNHostname(), 0);
+
+   

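The excerpt above is cut off before the selection loop finishes, so the following is only a sketch of how the pieces shown (the `merge(..., 1, Integer::sum)` indexes and `LOCALITY_EVALUATION_FUNCTION`) can combine into a score. `Slot`, `LocalityScoring`, and the rule that the highest score wins (first candidate on ties) are assumptions; the resource-profile check and the `Locality` classification are omitted.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical, simplified slot: only the TaskManager resource id and host name.
final class Slot {
    final String resourceId;
    final String host;

    Slot(String resourceId, String host) {
        this.resourceId = resourceId;
        this.host = host;
    }
}

final class LocalityScoring {
    // Same weighting as LOCALITY_EVALUATION_FUNCTION: a resource-id match counts 10x a host match.
    static final BiFunction<Integer, Integer, Integer> SCORE =
        (localWeigh, hostLocalWeigh) -> localWeigh * 10 + hostLocalWeigh;

    // Returns the highest-scoring slot (first one wins on ties), or null if there are no slots.
    static Slot best(List<Slot> slots, List<Slot> preferredLocations) {
        // Count how often each resource id / host name appears among the preferences.
        Map<String, Integer> preferredResourceIds = new HashMap<>();
        Map<String, Integer> preferredHosts = new HashMap<>();
        for (Slot p : preferredLocations) {
            preferredResourceIds.merge(p.resourceId, 1, Integer::sum);
            preferredHosts.merge(p.host, 1, Integer::sum);
        }

        Slot bestCandidate = null;
        int bestScore = Integer.MIN_VALUE;
        for (Slot candidate : slots) {
            int local = preferredResourceIds.getOrDefault(candidate.resourceId, 0);
            int hostLocal = preferredHosts.getOrDefault(candidate.host, 0);
            int score = SCORE.apply(local, hostLocal);
            if (score > bestScore) {
                bestCandidate = candidate;
                bestScore = score;
            }
        }
        return bestCandidate;
    }
}
```

With a single preference `("tm-1", "host-a")`, a slot on `tm-1` at `host-a` scores 11, a different TaskManager on `host-a` scores 1, and a slot on another host scores 0.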
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660899#comment-16660899
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227455130
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSlotSelectionStrategy.java
 ##
 @@ -0,0 +1,82 @@
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This class implements a {@link SlotSelectionStrategy} that is based on previous allocations and
+ * falls back to using location preference hints if there is no previous allocation.
+ */
+public class PreviousAllocationSlotSelectionStrategy implements SlotSelectionStrategy {
+
+   public static final PreviousAllocationSlotSelectionStrategy INSTANCE =
+   new PreviousAllocationSlotSelectionStrategy();
+
+   private final LocationPreferenceSlotSelection locationPreferenceSlotSelection;
+
+   private PreviousAllocationSlotSelectionStrategy() {
+   this.locationPreferenceSlotSelection = LocationPreferenceSlotSelection.INSTANCE;
+   }
+
+   @Nonnull
+   @Override
+   public SlotInfoAndLocality selectBestSlotForProfile(
+   @Nonnull List<SlotInfo> availableSlots,
+   @Nonnull SlotProfile slotProfile) {
+
+   Collection<AllocationID> priorAllocations = slotProfile.getPreferredAllocations();
+
+   // First, if there was a prior allocation try to schedule to the same/old slot
+   if (!priorAllocations.isEmpty()) {
+
+   HashSet<AllocationID> priorAllocationsSet = new HashSet<>(priorAllocations);
+   for (SlotInfo availableSlot : availableSlots) {
+   if (priorAllocationsSet.contains(availableSlot.getAllocationId())) {
+   return SlotInfoAndLocality.of(availableSlot, Locality.LOCAL);
+   }
+   }
+   }
+
+   // Second, select based on location preference, excluding blacklisted allocations
+   Set<AllocationID> blackListedAllocations = slotProfile.getPreviousExecutionGraphAllocations();
+   if (blackListedAllocations.isEmpty()) {
+   return locationPreferenceSlotSelection.selectBestSlotForProfile(availableSlots, slotProfile);
+   } else {
+   ArrayList<SlotInfo> availableAndAllowedSlots = new ArrayList<>(availableSlots.size());
+   for (SlotInfo availableSlot : availableSlots) {
+   if (!blackListedAllocations.contains(availableSlot.getAllocationId())) {
+   availableAndAllowedSlots.add(availableSlot);
+   }
+   }
+   return locationPreferenceSlotSelection.selectBestSlotForProfile(availableAndAllowedSlots, slotProfile);
+   }
 
 Review comment:
   I would move `locationPreferenceSlotSelection.selectBestSlotForProfile(availableAndAllowedSlots, slotProfile)` out of the if-else blocks and only do the filtering of `availableSlots` in the branches. This reduces code duplication a bit.

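A sketch of the restructuring suggested in the review, using hypothetical simplified types (allocation-id strings instead of `SlotInfo`, and a `Function` standing in for `LocationPreferenceSlotSelection`): the branches only compute the candidate list, and the fallback is invoked in one place.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

// Hypothetical, simplified version of the suggested refactoring (not Flink code).
final class PreviousAllocationSelection {
    static String select(
            List<String> availableSlots,
            Set<String> priorAllocations,
            Set<String> blackListedAllocations,
            Function<List<String>, String> locationPreferenceFallback) {

        // First, try to reuse a slot from a prior allocation.
        for (String slot : availableSlots) {
            if (priorAllocations.contains(slot)) {
                return slot;
            }
        }

        // Second, the branches differ only in how the candidate list is built ...
        final List<String> candidates;
        if (blackListedAllocations.isEmpty()) {
            candidates = availableSlots;
        } else {
            candidates = new ArrayList<>(availableSlots.size());
            for (String slot : availableSlots) {
                if (!blackListedAllocations.contains(slot)) {
                    candidates.add(slot);
                }
            }
        }

        // ... so the fallback strategy is called exactly once.
        return locationPreferenceFallback.apply(candidates);
    }
}
```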



[GitHub] tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler

2018-10-23 Thread GitBox
tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227459571
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/Scheduler.java
 ##
 @@ -0,0 +1,516 @@
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.function.BooleanSupplier;
+
+/**
+ * Scheduler that assigns tasks to slots. This class is currently a work in progress; comments will be updated as we
+ * move forward.
+ */
+public class Scheduler implements SlotProvider, SlotOwner {
+
+   /** Logger */
+   private final Logger log = LoggerFactory.getLogger(getClass());
+
+   /** Strategy that selects the best slot for a given slot allocation request. */
+   @Nonnull
+   private final SlotSelectionStrategy slotSelectionStrategy;
+
+   /** Managers for the different slot sharing groups. */
+   @Nonnull
+   private final Map<SlotSharingGroupId, SlotSharingManager> slotSharingManagersMap;
+
+   /** The slot pool from which slots are allocated. */
+   @Nonnull
+   private final SlotPoolGateway slotPoolGateway;
+
+   /** Executor for running tasks in the job master's main thread. */
+   @Nonnull
+   private Executor componentMainThreadExecutor;
+
+   /** Predicate to check if the current thread is the job master's main thread. */
+   @Nonnull
+   private BooleanSupplier componentMainThreadCheck;
+
+
+   public Scheduler(
+   @Nonnull Map<SlotSharingGroupId, SlotSharingManager> slotSharingManagersMap,
+   @Nonnull SlotSelectionStrategy slotSelectionStrategy,
+   @Nonnull SlotPoolGateway slotPoolGateway) {
+
+   this.slotSelectionStrategy = slotSelectionStrategy;
+   this.slotSharingManagersMap = slotSharingManagersMap;
+   this.slotPoolGateway = slotPoolGateway;
+   this.componentMainThreadExecutor = (runnable) -> {
+   throw new IllegalStateException("Main thread executor not initialized.");
+   };
+   this.componentMainThreadCheck = () -> {
+   throw new IllegalStateException("Main thread checker not initialized");
+   };
+   }
+
+   public void start(@Nonnull Executor mainThreadExecutor, @Nonnull BooleanSupplier mainThreadCheck) {
+   this.componentMainThreadExecutor = mainThreadExecutor;
+   this.componentMainThreadCheck = mainThreadCheck;
+   }
+
+   //---
+
+   @Override
+   public CompletableFuture<LogicalSlot> allocateSlot(
+   SlotRequestId slotRequestId,
+   ScheduledUnit scheduledUnit,
+   boolean 

[GitHub] tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler

2018-10-23 Thread GitBox
tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227452749
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/LocationPreferenceSlotSelection.java
 ##
 @@ -0,0 +1,111 @@
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import javax.annotation.Nonnull;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiFunction;
+
+/**
+ * This class implements a {@link SlotSelectionStrategy} that is based on location preference hints.
+ */
+public class LocationPreferenceSlotSelection implements SlotSelectionStrategy {
+
+   public static final LocationPreferenceSlotSelection INSTANCE = new LocationPreferenceSlotSelection();
+
+   private LocationPreferenceSlotSelection() {
+   }
+
+   /**
+* Calculates the candidate's locality score.
+*/
+   private static final BiFunction<Integer, Integer, Integer> LOCALITY_EVALUATION_FUNCTION =
+   (localWeigh, hostLocalWeigh) -> localWeigh * 10 + hostLocalWeigh;
+
+   @Nonnull
+   @Override
+   public SlotInfoAndLocality selectBestSlotForProfile(
+   @Nonnull List<SlotInfo> availableSlots,
+   @Nonnull SlotProfile slotProfile) {
+
+   Collection<TaskManagerLocation> locationPreferences = slotProfile.getPreferredLocations();
+
+   if (availableSlots.isEmpty()) {
+   return SlotInfoAndLocality.of(null, Locality.UNKNOWN);
+   }
+
+   final ResourceProfile resourceProfile = slotProfile.getResourceProfile();
+
+   // if we have no location preferences, we can only filter by the additional requirements.
+   if (locationPreferences.isEmpty()) {
+   for (SlotInfo candidate : availableSlots) {
+   if (candidate.getResourceProfile().isMatching(resourceProfile)) {
+   return SlotInfoAndLocality.of(candidate, Locality.UNCONSTRAINED);
+   }
+   }
+   return SlotInfoAndLocality.of(null, Locality.UNKNOWN);
 
 Review comment:
   Factor this out into a separate method, e.g. `selectBestSlotByResourceProfile`.

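A sketch of the extraction suggested in the review. The helper name comes from the comment; `OfferedSlot`, the memory-only matching rule (standing in for `ResourceProfile#isMatching`), and the `Optional` return type (standing in for `SlotInfoAndLocality` with `UNCONSTRAINED`/`UNKNOWN`) are assumptions for illustration.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical, simplified slot: an id and the memory it offers (not Flink's SlotInfo).
final class OfferedSlot {
    final String id;
    final long memoryMb;

    OfferedSlot(String id, long memoryMb) {
        this.id = id;
        this.memoryMb = memoryMb;
    }
}

final class ResourceProfileSelection {
    // Returns the first slot whose resources satisfy the request, or empty if none does.
    // Assumed matching rule: the slot offers at least the requested memory.
    static Optional<OfferedSlot> selectBestSlotByResourceProfile(
            List<OfferedSlot> availableSlots, long requiredMemoryMb) {
        for (OfferedSlot candidate : availableSlots) {
            if (candidate.memoryMb >= requiredMemoryMb) {
                return Optional.of(candidate);
            }
        }
        return Optional.empty();
    }
}
```

The caller's "no location preferences" branch then reduces to a single call to this helper.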



With regards,
Apache Git Services


[GitHub] tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler

2018-10-23 Thread GitBox
tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227450833
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultSchedulerFactory.java
 ##
 @@ -0,0 +1,59 @@
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+
+import javax.annotation.Nonnull;
+
+import java.util.HashMap;
+
+/**
+ * Default implementation of a {@link SchedulerFactory}.
+ */
+public class DefaultSchedulerFactory implements SchedulerFactory {
+
+   @Nonnull
+   private final SlotSelectionStrategy slotSelectionStrategy;
+
+   public DefaultSchedulerFactory(@Nonnull SlotSelectionStrategy slotSelectionStrategy) {
+   this.slotSelectionStrategy = slotSelectionStrategy;
+   }
+
+   @Nonnull
+   @Override
+   public Scheduler createScheduler(@Nonnull SlotPoolGateway slotPoolGateway) {
+   return new Scheduler(new HashMap<>(), slotSelectionStrategy, slotPoolGateway);
 
 Review comment:
   What could be a reasonable initial size of the `HashMap`?

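One way to pick the initial size, sketched under the assumption that the expected number of slot sharing groups is known up front (the review leaves this open): `new HashMap<>(n)` takes a capacity, not an expected entry count, and with the default load factor of 0.75 the map resizes once its size exceeds three quarters of its capacity. `MapSizing` is a hypothetical helper.

```java
// Hypothetical helper: smallest initial capacity that lets a java.util.HashMap with the
// default load factor (0.75) hold expectedSize entries without resizing.
final class MapSizing {
    static int initialCapacityFor(int expectedSize) {
        if (expectedSize < 0) {
            throw new IllegalArgumentException("expectedSize must be non-negative");
        }
        // A HashMap resizes once size > capacity * loadFactor, so request
        // capacity >= expectedSize / 0.75. HashMap rounds this up to a power of two.
        return (int) Math.ceil(expectedSize / 0.75);
    }
}
```

Usage would then look like `new HashMap<>(MapSizing.initialCapacityFor(expectedGroups))`; if the number of groups is not known when the scheduler is created, the default constructor avoids guessing.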



With regards,
Apache Git Services


[GitHub] tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler

2018-10-23 Thread GitBox
tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227455130
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSlotSelectionStrategy.java
 ##
 @@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This class implements a {@link SlotSelectionStrategy} that is based on 
previous allocations and
+ * falls back to using location preference hints if there is no previous 
allocation.
+ */
+public class PreviousAllocationSlotSelectionStrategy implements 
SlotSelectionStrategy {
+
+   public static final PreviousAllocationSlotSelectionStrategy INSTANCE =
+   new PreviousAllocationSlotSelectionStrategy();
+
+   private final LocationPreferenceSlotSelection 
locationPreferenceSlotSelection;
+
+   private PreviousAllocationSlotSelectionStrategy() {
+   this.locationPreferenceSlotSelection = 
LocationPreferenceSlotSelection.INSTANCE;
+   }
+
+   @Nonnull
+   @Override
+   public SlotInfoAndLocality selectBestSlotForProfile(
+   @Nonnull List<SlotInfo> availableSlots,
+   @Nonnull SlotProfile slotProfile) {
+
+   Collection<AllocationID> priorAllocations = slotProfile.getPreferredAllocations();
+
+   // First, if there was a prior allocation try to schedule to the same/old slot
+   if (!priorAllocations.isEmpty()) {
+
+   HashSet<AllocationID> priorAllocationsSet = new HashSet<>(priorAllocations);
+   for (SlotInfo availableSlot : availableSlots) {
+   if (priorAllocationsSet.contains(availableSlot.getAllocationId())) {
+   return SlotInfoAndLocality.of(availableSlot, Locality.LOCAL);
+   }
+   }
+   }
+
+   // Second, select based on location preference, excluding blacklisted allocations
+   Set<AllocationID> blackListedAllocations = slotProfile.getPreviousExecutionGraphAllocations();
+   if (blackListedAllocations.isEmpty()) {
+   return locationPreferenceSlotSelection.selectBestSlotForProfile(availableSlots, slotProfile);
+   } else {
+   ArrayList<SlotInfo> availableAndAllowedSlots = new ArrayList<>(availableSlots.size());
+   for (SlotInfo availableSlot : availableSlots) {
+   if (!blackListedAllocations.contains(availableSlot.getAllocationId())) {
+   availableAndAllowedSlots.add(availableSlot);
+   }
+   }
+   return locationPreferenceSlotSelection.selectBestSlotForProfile(availableAndAllowedSlots, slotProfile);
+   }
 
 Review comment:
   I would move `locationPreferenceSlotSelection.selectBestSlotForProfile(availableAndAllowedSlots, slotProfile)` 
out of the if-else blocks and only do the filtering of `availableSlots` in the 
branches. This reduces code duplication a bit.
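
The refactoring suggested above can be sketched as follows. This is a minimal, self-contained sketch, not Flink code: `SlotSketch` and `BlacklistFilteringSketch` are hypothetical stand-ins for `SlotInfo` and the strategy class, and the `Function` parameter plays the role of `locationPreferenceSlotSelection`. The branches only decide which list of slots is allowed; the fallback is invoked once, outside the if-else.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

// Hypothetical stand-in for SlotInfo: only the allocation id is modeled.
final class SlotSketch {
    final String allocationId;

    SlotSketch(String allocationId) {
        this.allocationId = allocationId;
    }
}

final class BlacklistFilteringSketch {

    // The branches only decide which list to use; no delegation happens here.
    static List<SlotSketch> allowedSlots(List<SlotSketch> availableSlots, Set<String> blacklisted) {
        if (blacklisted.isEmpty()) {
            // Fast path: nothing to exclude, so no copy is made.
            return availableSlots;
        }
        List<SlotSketch> allowed = new ArrayList<>(availableSlots.size());
        for (SlotSketch slot : availableSlots) {
            if (!blacklisted.contains(slot.allocationId)) {
                allowed.add(slot);
            }
        }
        return allowed;
    }

    // The fallback strategy is called exactly once, after the filtering step.
    static <R> R selectWithFallback(
            List<SlotSketch> availableSlots,
            Set<String> blacklisted,
            Function<List<SlotSketch>, R> fallbackStrategy) {
        return fallbackStrategy.apply(allowedSlots(availableSlots, blacklisted));
    }
}
```

Returning `availableSlots` unchanged for an empty blacklist keeps the no-copy behavior of the original code, while the single call site removes the duplicated delegation.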


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler

2018-10-23 Thread GitBox
tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227450350
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotActions.java
 ##
 @@ -39,10 +38,23 @@
 * @param slotRequestId identifying the slot to release
 * @param slotSharingGroupId identifying the slot sharing group to which the slot belongs, null if none
 * @param cause of the slot release, null if none
-* @return Acknowledge (future) after the slot has been released
+* @return Acknowledge after the slot has been released
 */
-   CompletableFuture<Acknowledge> releaseSlot(
+   @Deprecated
+   Acknowledge releaseSlot(
 
 Review comment:
   Could be `void`?




[GitHub] tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler

2018-10-23 Thread GitBox
tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227456207
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/Scheduler.java
 ##
 @@ -0,0 +1,516 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.function.BooleanSupplier;
+
+/**
+ * Scheduler that assigns tasks to slots. This class is currently work in progress, comments will be updated as we
+ * move forward.
+ */
+public class Scheduler implements SlotProvider, SlotOwner {
+
+   /** Logger */
+   private final Logger log = LoggerFactory.getLogger(getClass());
+
+   /** Strategy that selects the best slot for a given slot allocation request. */
+   @Nonnull
+   private final SlotSelectionStrategy slotSelectionStrategy;
+
+   /** Managers for the different slot sharing groups. */
+   @Nonnull
+   private final Map slotSharingManagersMap;
+
+   /** The slot pool from which slots are allocated. */
+   @Nonnull
+   private final SlotPoolGateway slotPoolGateway;
+
+   /** Executor for running tasks in the job master's main thread. */
+   @Nonnull
+   private Executor componentMainThreadExecutor;
+
+   /** Predicate to check if the current thread is the job master's main thread. */
+   @Nonnull
+   private BooleanSupplier componentMainThreadCheck;
+
+
+   public Scheduler(
+   @Nonnull Map slotSharingManagersMap,
+   @Nonnull SlotSelectionStrategy slotSelectionStrategy,
+   @Nonnull SlotPoolGateway slotPoolGateway) {
+
+   this.slotSelectionStrategy = slotSelectionStrategy;
+   this.slotSharingManagersMap = slotSharingManagersMap;
+   this.slotPoolGateway = slotPoolGateway;
+   this.componentMainThreadExecutor = (runnable) -> {
+   throw new IllegalStateException("Main thread executor not initialized.");
+   };
+   this.componentMainThreadCheck = () -> {
+   throw new IllegalStateException("Main thread checker not initialized");
+   };
+   }
+
+   public void start(@Nonnull Executor mainThreadExecutor, @Nonnull BooleanSupplier mainThreadCheck) {
+   this.componentMainThreadExecutor = mainThreadExecutor;
+   this.componentMainThreadCheck = mainThreadCheck;
+   }
+
+   //---
+
+   @Override
+   public CompletableFuture allocateSlot(
+   SlotRequestId slotRequestId,
+   ScheduledUnit scheduledUnit,
+   boolean 

[GitHub] tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler

2018-10-23 Thread GitBox
tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227453757
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSlotSelectionStrategy.java
 ##
 @@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This class implements a {@link SlotSelectionStrategy} that is based on previous allocations and
+ * falls back to using location preference hints if there is no previous allocation.
+ */
+public class PreviousAllocationSlotSelectionStrategy implements SlotSelectionStrategy {
 
 Review comment:
   Same comment concerning singletons and using an `enum` for them.
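
The earlier comment referred to here is not part of this excerpt. The usual Java idiom for a stateless singleton is an `enum` with a single constant; a minimal sketch, assuming a simplified strategy interface (`SelectionStrategySketch` and `FirstCandidateStrategy` are hypothetical names, not Flink classes):

```java
import java.util.List;
import java.util.Optional;

// Hypothetical, simplified strategy interface standing in for SlotSelectionStrategy.
interface SelectionStrategySketch {
    Optional<String> selectBest(List<String> candidates);
}

// Enum-based singleton: the JVM creates exactly one INSTANCE, no explicit
// private constructor is needed, and neither serialization nor reflection
// can create a second instance.
enum FirstCandidateStrategy implements SelectionStrategySketch {
    INSTANCE;

    @Override
    public Optional<String> selectBest(List<String> candidates) {
        // Trivial selection logic, only to give the singleton some behavior.
        return candidates.isEmpty() ? Optional.empty() : Optional.of(candidates.get(0));
    }
}
```

Compared with a `public static final INSTANCE` field plus a private constructor, as in the class under review, the enum form needs less boilerplate and stays a singleton without extra effort.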




[jira] [Commented] (FLINK-10052) Tolerate temporarily suspended ZooKeeper connections

2018-10-23 Thread Elias Levy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660864#comment-16660864
 ] 

Elias Levy commented on FLINK-10052:


[~Wosinsan] any progress on this issue?

> Tolerate temporarily suspended ZooKeeper connections
> 
>
> Key: FLINK-10052
> URL: https://issues.apache.org/jira/browse/FLINK-10052
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.4.2, 1.5.2, 1.6.0
>Reporter: Till Rohrmann
>Assignee: Dominik Wosiński
>Priority: Major
>
> This issue results from FLINK-10011 which uncovered a problem with Flink's HA 
> recovery and proposed the following solution to harden Flink:
> The {{ZooKeeperLeaderElectionService}} uses the {{LeaderLatch}} Curator 
> recipe for leader election. The leader latch revokes leadership in case of a 
> suspended ZooKeeper connection. This can be premature if the system 
> can reconnect to ZooKeeper before its session expires. The effect of the lost 
> leadership is that all jobs will be canceled and directly restarted after 
> regaining the leadership.
> Instead of directly revoking the leadership upon a SUSPENDED ZooKeeper 
> connection, it would be better to wait until the ZooKeeper connection is 
> LOST. That way we would allow the system to reconnect and not lose the 
> leadership. This could be achieved by using Curator's {{LeaderSelector}} 
> instead of the {{LeaderLatch}}.
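
The proposal boils down to treating a SUSPENDED connection differently from a LOST one. A minimal, library-free sketch of that policy follows; `ZkConnectionState` is a hypothetical stand-in that mirrors only a subset of Curator's connection states, and `TolerantLeadershipSketch` is neither Flink's nor Curator's API:

```java
// Hypothetical stand-in mirroring a subset of Curator's connection states.
// This is NOT the Curator API, only a simplified model for illustration.
enum ZkConnectionState { CONNECTED, SUSPENDED, RECONNECTED, LOST }

// Hypothetical leadership tracker that tolerates a temporarily suspended
// connection and revokes leadership only once the connection is LOST.
final class TolerantLeadershipSketch {
    private boolean leader;

    void grantLeadership() {
        leader = true;
    }

    boolean isLeader() {
        return leader;
    }

    void onConnectionStateChanged(ZkConnectionState newState) {
        switch (newState) {
            case SUSPENDED:
                // Keep leadership: the session may still be alive and the
                // client may reconnect before the session expires.
                break;
            case LOST:
                // The session is gone, so leadership must be given up.
                leader = false;
                break;
            default:
                // CONNECTED / RECONNECTED: nothing to revoke.
                break;
        }
    }
}
```

A real implementation would hook this decision into Curator's connection-state callbacks; the sketch only models when leadership is kept and when it is revoked.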



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10637) Start MiniCluster with random REST port

2018-10-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660850#comment-16660850
 ] 

ASF GitHub Bot commented on FLINK-10637:


zentol commented on issue #6899: [FLINK-10637] Use MiniClusterResource for 
tests in flink-runtime
URL: https://github.com/apache/flink/pull/6899#issuecomment-432301818
 
 
   sounds good to me @tillrohrmann .




> Start MiniCluster with random REST port
> ---
>
> Key: FLINK-10637
> URL: https://issues.apache.org/jira/browse/FLINK-10637
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.5.4, 1.6.1, 1.7.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.5.6, 1.6.3, 1.7.0
>
>
> The {{MiniCluster}} picks a random port for the {{RpcService}} but not for 
> the REST server endpoint. As a result, the REST endpoint falls back to the default port {{8081}}. This can 
> lead to port conflicts if tests are executed concurrently.
> I propose to rename the {{MiniClusterResource}} to 
> {{MiniClusterResourceWithRestClient}} and add a new {{MiniClusterResource}} 
> which only starts a {{MiniCluster}} with the REST port set to 0.
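
Setting a port to 0 is the standard way to let the operating system choose a free ephemeral port. The sketch below demonstrates that behavior with plain `java.net.ServerSocket`; it does not use Flink's `MiniCluster` configuration, and `EphemeralPortSketch` is a hypothetical name:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;

final class EphemeralPortSketch {

    // Binds two servers to port 0 on the loopback interface at the same time.
    // Port 0 asks the operating system for any free port; the port actually
    // assigned is only known after binding, via getLocalPort().
    static int[] bindTwoEphemeralPorts() {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (ServerSocket first = new ServerSocket(0, 50, loopback);
             ServerSocket second = new ServerSocket(0, 50, loopback)) {
            return new int[] {first.getLocalPort(), second.getLocalPort()};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because both sockets are open at the same time, the two assigned ports are guaranteed to differ, which is what concurrently executed tests need; a fixed port such as 8081 gives no such guarantee.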



[jira] [Commented] (FLINK-10037) Document details event time behavior in a single location

2018-10-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660845#comment-16660845
 ] 

ASF GitHub Bot commented on FLINK-10037:


pnowojski commented on issue #6481: [FLINK-10037] [Documentation] Add Event 
Time Details documentation page
URL: https://github.com/apache/flink/pull/6481#issuecomment-432298148
 
 
   @eliaslevy what @aljoscha had in mind (correct me if I'm wrong) is that this 
new `"Event Time Details"` page should be deduplicated with the existing 
content (I think in three pages `event_time***.md`: 
[1](https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/event_time.html),
 
[2](https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/event_timestamps_watermarks.html)
 and 
[3](https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/event_timestamp_extractors.html)).
   
   Could you read/compare them and either move content between them or 
cross-reference it? If we keep more or less the same content duplicated in the 
docs, there is a high chance that the copies will drift out of sync and that one 
of them will become outdated.




> Document details event time behavior in a single location
> -
>
> Key: FLINK-10037
> URL: https://issues.apache.org/jira/browse/FLINK-10037
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.5.2
>Reporter: Elias Levy
>Assignee: Elias Levy
>Priority: Minor
>  Labels: pull-request-available
>
> A description of event time and watermarks (how they are generated, assigned, and 
> handled) is spread across many pages in the documentation. It would be useful 
> to have it all in a single place that also includes missing information, such as 
> how Flink assigns timestamps to new records generated by operators.



[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660839#comment-16660839
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227447242
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 ##
 @@ -1676,6 +1684,35 @@ public void updateAccumulators(AccumulatorSnapshot accumulatorSnapshot) {
}
}
 
+   /**
+* Computes and returns a set with the prior allocation ids from all execution vertices in the graph.
+*/
+   private Set<AllocationID> computeAllPriorAllocationIds() {
+   HashSet<AllocationID> allPreviousAllocationIds = new HashSet<>();
+   Iterable<ExecutionJobVertex> ejvIterable = getVerticesTopologically();
+   for (ExecutionJobVertex executionJobVertex : ejvIterable) {
+   for (ExecutionVertex executionVertex : executionJobVertex.getTaskVertices()) {
 
 Review comment:
   `getAllExecutionVertices` should be easier to use here.
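
A minimal sketch of what the suggestion amounts to: with a flattened accessor over all execution vertices, the nested loop collapses into a single one. The classes below are hypothetical stand-ins (not Flink's `ExecutionGraph`, `ExecutionJobVertex` or `ExecutionVertex`), and modeling a missing prior allocation as `null` is an assumption of this sketch:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for an execution vertex; only the prior allocation is modeled.
final class TaskVertexSketch {
    final String priorAllocationId; // null if the vertex has no prior allocation

    TaskVertexSketch(String priorAllocationId) {
        this.priorAllocationId = priorAllocationId;
    }
}

// Hypothetical stand-in for an execution job vertex holding its task vertices.
final class JobVertexSketch {
    final List<TaskVertexSketch> taskVertices;

    JobVertexSketch(TaskVertexSketch... taskVertices) {
        this.taskVertices = Arrays.asList(taskVertices);
    }
}

final class GraphSketch {
    final List<JobVertexSketch> jobVertices = new ArrayList<>();

    // Flattened view over all task vertices, playing the role of a
    // getAllExecutionVertices()-style accessor.
    Iterable<TaskVertexSketch> getAllTaskVertices() {
        List<TaskVertexSketch> all = new ArrayList<>();
        for (JobVertexSketch jobVertex : jobVertices) {
            all.addAll(jobVertex.taskVertices);
        }
        return all;
    }

    // With the flattened accessor, the caller needs only a single loop.
    Set<String> computeAllPriorAllocationIds() {
        Set<String> ids = new HashSet<>();
        for (TaskVertexSketch vertex : getAllTaskVertices()) {
            if (vertex.priorAllocationId != null) {
                ids.add(vertex.priorAllocationId);
            }
        }
        return ids;
    }
}
```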




> Extract scheduling-related code from SlotPool
> -
>
> Key: FLINK-10431
> URL: https://issues.apache.org/jira/browse/FLINK-10431
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.7.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
>
> The other half of the current scheduling logic is the management of slot 
> sharing and is located in the SlotPool. We need to extract this logic into 
> our new Scheduler component from the previous step. This leaves us with a 
> simpler SlotPool that mainly cares about obtaining, holding, and releasing 
> slots in interaction with a ResourceManager. The new Scheduler can now 
> identify slot sharing groups and interact with the SlotPool.



[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660838#comment-16660838
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227446797
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 ##
 @@ -1676,6 +1684,35 @@ public void updateAccumulators(AccumulatorSnapshot accumulatorSnapshot) {
}
}
 
+   /**
+* Computes and returns a set with the prior allocation ids from all execution vertices in the graph.
+*/
+   private Set<AllocationID> computeAllPriorAllocationIds() {
+   HashSet<AllocationID> allPreviousAllocationIds = new HashSet<>();
 
 Review comment:
   Initialize `HashSet` with `getNumberOfExecutionJobVertices()`
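
Note that the `HashSet` constructor argument is an initial capacity, not an expected element count. A minimal sketch of converting one into the other, assuming the default load factor of 0.75 (`PresizedSetSketch` is a hypothetical helper, not part of the JDK or Flink):

```java
import java.util.HashSet;
import java.util.Set;

final class PresizedSetSketch {

    // HashSet's default load factor is 0.75, so a set created with
    // new HashSet<>(n) can hold at least 0.75 * n elements before rehashing.
    // This helper computes an initial capacity that avoids rehashing for
    // the expected number of elements.
    static int capacityFor(int expectedSize) {
        return (int) Math.ceil(expectedSize / 0.75);
    }

    static <T> Set<T> newSetWithExpectedSize(int expectedSize) {
        return new HashSet<>(capacityFor(expectedSize));
    }
}
```

Passing the expected count directly to `new HashSet<>(n)` still works; it may just trigger one resize once the set grows past its threshold.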


[GitHub] tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler

2018-10-23 Thread GitBox
tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227446397
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 ##
 @@ -939,13 +942,18 @@ public void scheduleForExecution() throws JobException {
// collecting all the slots may resize and fail in that operation without slots getting lost
final ArrayList> allAllocationFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
 
+   // this is a (temporary) to avoid collecting the previous allocations for all executions again and again
 
 Review comment:
   noun missing in the comment




[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660835#comment-16660835
 ] 

ASF GitHub Bot commented on FLINK-10431:


StefanRRichter commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227446372
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ##
 @@ -450,6 +454,7 @@ public void setInitialState(@Nullable JobManagerTaskRestore taskRestore) {
SlotProvider slotProvider,
boolean queued,
LocationPreferenceConstraint locationPreferenceConstraint,
+   @Nullable Set<AllocationID> allPreviousExecutionGraphAllocationIds,
 
 Review comment:
   Will do. Overall this construct that uses `null` will go away once we 
introduce group scheduling.


[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660833#comment-16660833
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227445757
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ##
 @@ -450,6 +454,7 @@ public void setInitialState(@Nullable JobManagerTaskRestore taskRestore) {
SlotProvider slotProvider,
boolean queued,
LocationPreferenceConstraint locationPreferenceConstraint,
+   @Nullable Set<AllocationID> allPreviousExecutionGraphAllocationIds,
 
 Review comment:
   Please update the JavaDocs stating what `null` for this parameter means.
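A minimal sketch of what such a JavaDoc could say. It uses the meaning of `null` that StefanRRichter explains in this thread: `null` means the IDs were not computed yet and must be determined inside the execution, while an empty set means they were computed and there were none. The class `NullableParamSketch`, the helper `computeAllPreviousAllocationIds()`, and the use of `String` instead of Flink's allocation ID type are hypothetical simplifications. The real `Execution` method takes more parameters and marks this one `@Nullable`.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

class NullableParamSketch {

    /**
     * Schedules this execution (hypothetical, heavily simplified signature).
     *
     * @param allPreviousExecutionGraphAllocationIds the allocation IDs previously used by the
     *     whole execution graph, or {@code null} if they have not been computed yet, in which
     *     case they are determined inside this method. An empty set means the IDs were
     *     computed and there were none.
     * @return the allocation IDs that scheduling will take into account
     */
    static Set<String> scheduleForExecution(Set<String> allPreviousExecutionGraphAllocationIds) {
        if (allPreviousExecutionGraphAllocationIds == null) {
            // Not provided by the caller: compute it here.
            return computeAllPreviousAllocationIds();
        }
        // Provided by the caller (possibly empty): use it as-is, without recomputing.
        return allPreviousExecutionGraphAllocationIds;
    }

    /** Hypothetical stand-in for walking the execution graph to collect prior allocations. */
    static Set<String> computeAllPreviousAllocationIds() {
        Set<String> ids = new HashSet<>();
        ids.add("alloc-1");
        return ids;
    }

    public static void main(String[] args) {
        System.out.println(scheduleForExecution(null));                   // prints [alloc-1]
        System.out.println(scheduleForExecution(Collections.emptySet())); // prints []
    }
}
```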


> Extract scheduling-related code from SlotPool
> -
>
> Key: FLINK-10431
> URL: https://issues.apache.org/jira/browse/FLINK-10431
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.7.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
>
> The other half of the current scheduling logic is the management of slot 
> sharing and is located in the SlotPool. We need to extract this logic into 
> our new Scheduler component from the previous step. This leaves us with a 
> simpler SlotPool that mainly cares about obtaining, holding, and releasing 
> slots in interaction with a ResourceManager. The new Scheduler can now 
> identify slot sharing groups and interacts with the SlotPool.
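The responsibility split described in this issue can be pictured with a small, hypothetical sketch. A slot pool only obtains, holds, and releases physical slots. A scheduler on top of it maps slot sharing groups onto those slots. None of these types (`SketchSlotPool`, `CountingSlotPool`, `SketchScheduler`) are Flink's actual classes, and slot IDs are plain strings. The sketch also ignores real constraints such as per-slot capacity and the interaction with the ResourceManager.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class SchedulerSplitSketch {

    /** Slot pool side: only obtains, holds and releases physical slots. */
    interface SketchSlotPool {
        String allocateSlot();

        void releaseSlot(String slotId);
    }

    /** Toy pool that hands out numbered slots and records releases. */
    static class CountingSlotPool implements SketchSlotPool {
        private int nextSlot = 0;
        final List<String> released = new ArrayList<>();

        @Override
        public String allocateSlot() {
            return "slot-" + nextSlot++;
        }

        @Override
        public void releaseSlot(String slotId) {
            released.add(slotId);
        }
    }

    /** Scheduler side: knows about slot sharing groups and delegates physical slots to the pool. */
    static class SketchScheduler {
        private final SketchSlotPool slotPool;
        // Simplification: one shared physical slot per sharing group, no capacity limit.
        private final Map<String, String> slotPerSharingGroup = new HashMap<>();

        SketchScheduler(SketchSlotPool slotPool) {
            this.slotPool = slotPool;
        }

        /** Tasks of the same sharing group reuse the group's slot; a new group gets a new slot. */
        String allocateForTask(String sharingGroupId) {
            return slotPerSharingGroup.computeIfAbsent(sharingGroupId, group -> slotPool.allocateSlot());
        }

        /** Returns the group's shared slot to the pool once the group is finished. */
        void releaseGroup(String sharingGroupId) {
            String slotId = slotPerSharingGroup.remove(sharingGroupId);
            if (slotId != null) {
                slotPool.releaseSlot(slotId);
            }
        }
    }

    public static void main(String[] args) {
        CountingSlotPool pool = new CountingSlotPool();
        SketchScheduler scheduler = new SketchScheduler(pool);
        String a = scheduler.allocateForTask("group-A");
        String b = scheduler.allocateForTask("group-A");
        String c = scheduler.allocateForTask("group-B");
        System.out.println(a + " " + b + " " + c); // prints slot-0 slot-0 slot-1
        scheduler.releaseGroup("group-A");
        System.out.println(pool.released); // prints [slot-0]
    }
}
```

Keeping the sharing-group bookkeeping in the scheduler means the pool never needs to know why a slot was requested. This matches the "simpler SlotPool" goal stated above.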



[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660827#comment-16660827
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227444801
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ##
 @@ -450,6 +454,7 @@ public void setInitialState(@Nullable 
JobManagerTaskRestore taskRestore) {
SlotProvider slotProvider,
boolean queued,
LocationPreferenceConstraint 
locationPreferenceConstraint,
+   @Nullable Set 
allPreviousExecutionGraphAllocationIds,
 
 Review comment:
   Same here with `@Nullable`


[GitHub] tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler

2018-10-23 Thread GitBox
tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227445757
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ##
 @@ -450,6 +454,7 @@ public void setInitialState(@Nullable 
JobManagerTaskRestore taskRestore) {
SlotProvider slotProvider,
boolean queued,
LocationPreferenceConstraint 
locationPreferenceConstraint,
+   @Nullable Set 
allPreviousExecutionGraphAllocationIds,
 
 Review comment:
   Please update the JavaDocs stating what `null` for this parameter means.


[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660829#comment-16660829
 ] 

ASF GitHub Bot commented on FLINK-10431:


StefanRRichter commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227445579
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ##
 @@ -385,7 +386,8 @@ public void setInitialState(@Nullable 
JobManagerTaskRestore taskRestore) {
return scheduleForExecution(
resourceProvider,
allowQueued,
-   LocationPreferenceConstraint.ANY);
+   LocationPreferenceConstraint.ANY,
+   null);
 
 Review comment:
   Point is that `null` and `emptySet` have different meanings here: `null` 
means it was not previously computed/provided and has to be determined inside 
the execution. `emptySet` means it has been provided and the result was just 
empty. Overall, this whole construct should go away very soon when we move to 
group scheduling.


[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660828#comment-16660828
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227444632
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ##
 @@ -402,13 +404,15 @@ public void setInitialState(@Nullable 
JobManagerTaskRestore taskRestore) {
public CompletableFuture scheduleForExecution(
SlotProvider slotProvider,
boolean queued,
-   LocationPreferenceConstraint 
locationPreferenceConstraint) {
+   LocationPreferenceConstraint 
locationPreferenceConstraint,
+   @Nullable Set 
allPreviousExecutionGraphAllocationIds) {
 
 Review comment:
   Let's try not to pass in null arguments
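Two overloads are one common way to avoid passing `null` while keeping the distinction between "not computed" and "computed but empty" explained in this thread. Callers that have not computed the IDs call the shorter overload, which determines them internally. This is only a sketch of that general technique, not what the pull request did. The class name, the `String` parameters, and the helper `lookUpPreviousAllocationIds()` are hypothetical.

```java
import java.util.Collections;
import java.util.Objects;
import java.util.Set;

class OverloadInsteadOfNullSketch {

    /** For callers that have not computed the IDs: they are determined here, so no null is passed. */
    static String scheduleForExecution(String slotProvider) {
        return scheduleForExecution(slotProvider, lookUpPreviousAllocationIds());
    }

    /** For callers that already computed the IDs; an empty set means there were none. */
    static String scheduleForExecution(String slotProvider, Set<String> allPreviousExecutionGraphAllocationIds) {
        Objects.requireNonNull(allPreviousExecutionGraphAllocationIds, "use the other overload instead of null");
        return slotProvider + " considers " + allPreviousExecutionGraphAllocationIds.size() + " previous allocation(s)";
    }

    /** Hypothetical stand-in for collecting the allocation IDs from the execution graph. */
    static Set<String> lookUpPreviousAllocationIds() {
        return Collections.singleton("alloc-1");
    }

    public static void main(String[] args) {
        System.out.println(scheduleForExecution("provider"));
        // prints: provider considers 1 previous allocation(s)
        System.out.println(scheduleForExecution("provider", Collections.emptySet()));
        // prints: provider considers 0 previous allocation(s)
    }
}
```

With this shape, the single-argument overload plays the role that `null` played before. The two meanings stay distinct without a nullable parameter.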


[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660831#comment-16660831
 ] 

ASF GitHub Bot commented on FLINK-10431:


StefanRRichter commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227445579
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ##
 @@ -385,7 +386,8 @@ public void setInitialState(@Nullable 
JobManagerTaskRestore taskRestore) {
return scheduleForExecution(
resourceProvider,
allowQueued,
-   LocationPreferenceConstraint.ANY);
+   LocationPreferenceConstraint.ANY,
+   null);
 
 Review comment:
   Point is that `null` and `emptySet` have different meanings here: `null` 
means it was not previously computed/provided and has to be determined inside 
the execution. `emptySet` means it has been provided and the result was just 
empty. Overall, this whole construct should go away very soon when we move to 
group scheduling.


[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660830#comment-16660830
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r22705
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ##
 @@ -385,7 +386,8 @@ public void setInitialState(@Nullable 
JobManagerTaskRestore taskRestore) {
return scheduleForExecution(
resourceProvider,
allowQueued,
-   LocationPreferenceConstraint.ANY);
+   LocationPreferenceConstraint.ANY,
+   null);
 
 Review comment:
   Let's not use `null` but instead `Collections.emptySet()`


[GitHub] StefanRRichter commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler

2018-10-23 Thread GitBox
StefanRRichter commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227445579
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ##
 @@ -385,7 +386,8 @@ public void setInitialState(@Nullable 
JobManagerTaskRestore taskRestore) {
return scheduleForExecution(
resourceProvider,
allowQueued,
-   LocationPreferenceConstraint.ANY);
+   LocationPreferenceConstraint.ANY,
+   null);
 
 Review comment:
   Point is that `null` and `emptySet` have different meanings here: `null` 
means it was not previously computed/provided and has to be determined inside 
the execution. `emptySet` means it has been provided and the result was just 
empty. Overall, this whole construct should go away very soon when we move to 
group scheduling.


[GitHub] tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler

2018-10-23 Thread GitBox
tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227444632
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ##
 @@ -402,13 +404,15 @@ public void setInitialState(@Nullable 
JobManagerTaskRestore taskRestore) {
public CompletableFuture scheduleForExecution(
SlotProvider slotProvider,
boolean queued,
-   LocationPreferenceConstraint 
locationPreferenceConstraint) {
+   LocationPreferenceConstraint 
locationPreferenceConstraint,
+   @Nullable Set 
allPreviousExecutionGraphAllocationIds) {
 
 Review comment:
   Let's try not to pass in null arguments


[GitHub] tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler

2018-10-23 Thread GitBox
tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r22705
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ##
 @@ -385,7 +386,8 @@ public void setInitialState(@Nullable 
JobManagerTaskRestore taskRestore) {
return scheduleForExecution(
resourceProvider,
allowQueued,
-   LocationPreferenceConstraint.ANY);
+   LocationPreferenceConstraint.ANY,
+   null);
 
 Review comment:
   Let's not use `null` but instead `Collections.emptySet()`


[GitHub] tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler

2018-10-23 Thread GitBox
tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227444801
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ##
 @@ -450,6 +454,7 @@ public void setInitialState(@Nullable 
JobManagerTaskRestore taskRestore) {
SlotProvider slotProvider,
boolean queued,
LocationPreferenceConstraint 
locationPreferenceConstraint,
+   @Nullable Set 
allPreviousExecutionGraphAllocationIds,
 
 Review comment:
   Same here with `@Nullable`


[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660824#comment-16660824
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227444801
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ##
 @@ -450,6 +454,7 @@ public void setInitialState(@Nullable 
JobManagerTaskRestore taskRestore) {
SlotProvider slotProvider,
boolean queued,
LocationPreferenceConstraint 
locationPreferenceConstraint,
+   @Nullable Set 
allPreviousExecutionGraphAllocationIds,
 
 Review comment:
   Same here with `@Nullable`


[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660823#comment-16660823
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227444632
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ##
 @@ -402,13 +404,15 @@ public void setInitialState(@Nullable 
JobManagerTaskRestore taskRestore) {
public CompletableFuture scheduleForExecution(
SlotProvider slotProvider,
boolean queued,
-   LocationPreferenceConstraint 
locationPreferenceConstraint) {
+   LocationPreferenceConstraint 
locationPreferenceConstraint,
+   @Nullable Set 
allPreviousExecutionGraphAllocationIds) {
 
 Review comment:
   Let's try not to pass in null arguments


[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660822#comment-16660822
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r22705
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ##
 @@ -385,7 +386,8 @@ public void setInitialState(@Nullable 
JobManagerTaskRestore taskRestore) {
return scheduleForExecution(
resourceProvider,
allowQueued,
-   LocationPreferenceConstraint.ANY);
+   LocationPreferenceConstraint.ANY,
+   null);
 
 Review comment:
   Let's not use `null` but instead `Collections.emptySet()`


[jira] [Closed] (FLINK-10622) Add end-to-end test for temporal table joins

2018-10-23 Thread Piotr Nowojski (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10622?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski closed FLINK-10622.
--
Resolution: Won't Fix

After thinking more about this, it should be enough to cover Temporal Joins 
with unit tests and ITCases. 

> Add end-to-end test for temporal table joins
> 
>
> Key: FLINK-10622
> URL: https://issues.apache.org/jira/browse/FLINK-10622
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL, Tests
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: Piotr Nowojski
>Priority: Critical
> Fix For: 1.7.0
>
>
> We should add an end-to-end test to test the new time versioned joins 
> functionality. Maybe we can add this to the existing Stream SQL end-to-end 
> test {{test_streaming_sql.sh}}.



[jira] [Commented] (FLINK-10220) StreamSQL E2E test fails on travis

2018-10-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660814#comment-16660814
 ] 

ASF GitHub Bot commented on FLINK-10220:


dawidwys opened a new pull request #6908: [FLINK-10220][e2e] Removing logs for 
streaming sql e2e test before validation
URL: https://github.com/apache/flink/pull/6908
 
 
   ## Brief change log
   
   * It properly checks the logs for exceptions and errors (previously the check 
was nondeterministic)
   * It removes the logs of the streaming SQL e2e test before validation, 
because exceptions there are expected
   
   Explanation for this change:
   Previously the last grep command used the `-iq` flags, so grep exited after 
the first occurrence of an exception/error. This caused a pipe failure, which 
meant the whole assertion did not work.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> StreamSQL E2E test fails on travis
> --
>
> Key: FLINK-10220
> URL: https://issues.apache.org/jira/browse/FLINK-10220
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL, Tests
>Affects Versions: 1.7.0
>Reporter: Chesnay Schepler
>Assignee: Hequn Cheng
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> https://travis-ci.org/zentol/flink-ci/jobs/420972344
> {code}
> [FAIL] 'Streaming SQL end-to-end test' failed after 1 minutes and 49 seconds! 
> Test exited with exit code 0 but the logs contained errors, exceptions or 
> non-empty .out files
> 2018-08-27 07:34:36,311 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- window: 
> (TumblingGroupWindow('w$, 'rowtime, 2.millis)), select: ($SUM0(correct) 
> AS correct, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS 
> w$rowtime, proctime('w$) AS w$proctime) -> select: (correct, w$start AS 
> rowtime) -> to: Row -> Map -> Sink: Unnamed (1/1) 
> (97d055e4661ff3361a504626257d406d) switched from RUNNING to FAILED.
> java.lang.RuntimeException: Exception occurred while processing valve output 
> watermark: 
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
>   at 
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
>   at 
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
>   at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: 
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
> Could not forward element to next operator
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:596)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
>   at 
> 
