[jira] [Commented] (FLINK-7822) Add documentation for the new queryable state client.

2017-11-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16241854#comment-16241854
 ] 

ASF GitHub Bot commented on FLINK-7822:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4966#discussion_r149337820
  
--- Diff: docs/dev/stream/state/queryable_state.md ---
@@ -32,38 +32,67 @@ under the License.
   likely that there will be breaking API changes on the client side in the 
upcoming Flink versions.
 
 
-In a nutshell, this feature allows users to query Flink's managed 
partitioned state
-(see [Working with State]({{ site.baseurl }}/dev/stream/state/state.html)) 
from outside of
-Flink. For some scenarios, queryable state thus eliminates the need for 
distributed
-operations/transactions with external systems such as key-value stores 
which are often the
-bottleneck in practice.
+In a nutshell, this feature exposes Flink's managed keyed (partitioned) 
state
+(see [Working with State]({{ site.baseurl }}/dev/stream/state/state.html)) 
to the outside world and 
+allows the user to query a job's state from outside Flink. For some 
scenarios, queryable state 
+eliminates the need for distributed operations/transactions with external 
systems such as key-value 
+stores which are often the bottleneck in practice. In addition, this 
feature may be particularly 
+useful for debugging purposes.
 
 
-  Attention: Queryable state accesses keyed state from a 
concurrent thread rather
-  than synchronizing with the operator and potentially blocking its 
operation. Since any state
-  backend using Java heap space, e.g. MemoryStateBackend or
-  FsStateBackend, does not work with copies when retrieving values but 
instead directly
-  references the stored values, read-modify-write patterns are unsafe and 
may cause the
-  queryable state server to fail due to concurrent modifications.
-  The RocksDBStateBackend is safe from these issues.
+  Attention: When querying a state object, that object is 
accessed from a concurrent 
+  thread without any synchronization or copying. This is a design choice, 
as any of the above would lead
+  to increased job latency, which we wanted to avoid. Since any state 
backend using Java heap space, 
+  e.g. MemoryStateBackend or 
FsStateBackend, does not work 
+  with copies when retrieving values but instead directly references the 
stored values, read-modify-write 
+  patterns are unsafe and may cause the queryable state server to fail due 
to concurrent modifications.
+  The RocksDBStateBackend is safe from these issues.
 
 
+## Architecture
+
+Before showing how to use the Queryable State, it is useful to briefly 
describe the entities that compose it.
+The Queryable State consists of three main entities:
+
+ 1. the `QueryableStateClient`, which (potentially) runs outside the Flink 
cluster and submits the user queries, 
+ 2. the `QueryableStateClientProxy`, which runs on each `TaskManager` 
(*i.e.* inside the Flink cluster) and is responsible 
+ for receiving the client's queries, fetching the requested state on his 
behalf, and returning it to the client, and 
+ 3. the `QueryableStateServer` which runs on each `TaskManager` and is 
responsible for serving the locally stored state.
+ 
+In a nutshell, the client will connect to one of the proxies and send a 
request for the state associated with a specific 
+key, `k`. As stated in [Working with State]({{ site.baseurl 
}}/dev/stream/state/state.html), keyed state is organized in 
+*Key Groups*, and each `TaskManager` is assigned a number of these key 
groups. To discover which `TaskManager` is 
+responsible for the key group holding `k`, the proxy will ask the 
`JobManager`. Based on the answer, the proxy will 
+then query the `QueryableStateServer` running on that `TaskManager` for 
the state associated with `k`, and forward the
+response back to the client.
+
+## Activating Queryable State
+
+To enable queryable state on your Flink cluster, you just have to copy the 
+`flink-queryable-state-runtime{{ site.scala_version_suffix 
}}-{{site.version }}.jar` 
+from the `opt/` folder of your [Flink 
distribution](https://flink.apache.org/downloads.html "Apache Flink: 
Downloads"), 
+to the `lib/` folder. In other case, the queryable state feature is not 
enabled. 
+
+To verify that your cluster is running with queryable state enabled, check 
the logs of any 
+task manager for the line: `"Started the Queryable State Proxy Server @ 
..."`.
+
 ## Making State Queryable
 
-In order to make state queryable, the queryable state server first needs 
to be enabled globally
-by setting the `query.server.enable` configuration parameter to `true` (current default).
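
For readers following along: the architecture described in the hunk above boils down to a small client-side API. Below is a minimal sketch of a query against a running job, assuming the Flink 1.4 `QueryableStateClient`; the proxy host, port, job id, state name, and key are all placeholders:

{code}
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.queryablestate.client.QueryableStateClient;

import java.util.concurrent.CompletableFuture;

public class QueryStateSketch {
    public static void main(String[] args) throws Exception {
        // Connect to the Queryable State Client Proxy on any TaskManager
        // (9069 is the default proxy port; "proxy-host" is a placeholder).
        QueryableStateClient client = new QueryableStateClient("proxy-host", 9069);

        // Must match the descriptor used when the state was made queryable.
        ValueStateDescriptor<Long> descriptor =
                new ValueStateDescriptor<>("count", Long.class);

        // The proxy locates the TaskManager owning the key group of "some-key",
        // queries its QueryableStateServer, and completes the future.
        CompletableFuture<ValueState<Long>> result = client.getKvState(
                JobID.fromHexString("00000000000000000000000000000000"), // placeholder
                "query-name",
                "some-key",
                BasicTypeInfo.STRING_TYPE_INFO,
                descriptor);

        System.out.println(result.get().value());
    }
}
{code}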

[jira] [Commented] (FLINK-7822) Add documentation for the new queryable state client.

2017-11-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16241856#comment-16241856
 ] 

ASF GitHub Bot commented on FLINK-7822:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4966#discussion_r149337649
  
--- Diff: docs/dev/stream/state/queryable_state.md ---
@@ -32,38 +32,67 @@ under the License.
   likely that there will be breaking API changes on the client side in the 
upcoming Flink versions.
 
 
-In a nutshell, this feature allows users to query Flink's managed 
partitioned state
-(see [Working with State]({{ site.baseurl }}/dev/stream/state/state.html)) 
from outside of
-Flink. For some scenarios, queryable state thus eliminates the need for 
distributed
-operations/transactions with external systems such as key-value stores 
which are often the
-bottleneck in practice.
+In a nutshell, this feature exposes Flink's managed keyed (partitioned) 
state
+(see [Working with State]({{ site.baseurl }}/dev/stream/state/state.html)) 
to the outside world and 
+allows the user to query a job's state from outside Flink. For some 
scenarios, queryable state 
+eliminates the need for distributed operations/transactions with external 
systems such as key-value 
+stores which are often the bottleneck in practice. In addition, this 
feature may be particularly 
+useful for debugging purposes.
 
 
-  Attention: Queryable state accesses keyed state from a 
concurrent thread rather
-  than synchronizing with the operator and potentially blocking its 
operation. Since any state
-  backend using Java heap space, e.g. MemoryStateBackend or
-  FsStateBackend, does not work with copies when retrieving values but 
instead directly
-  references the stored values, read-modify-write patterns are unsafe and 
may cause the
-  queryable state server to fail due to concurrent modifications.
-  The RocksDBStateBackend is safe from these issues.
+  Attention: When querying a state object, that object is 
accessed from a concurrent 
+  thread without any synchronization or copying. This is a design choice, 
as any of the above would lead
+  to increased job latency, which we wanted to avoid. Since any state 
backend using Java heap space, 
+  e.g. MemoryStateBackend or 
FsStateBackend, does not work 
+  with copies when retrieving values but instead directly references the 
stored values, read-modify-write 
+  patterns are unsafe and may cause the queryable state server to fail due 
to concurrent modifications.
+  The RocksDBStateBackend is safe from these issues.
 
 
+## Architecture
+
+Before showing how to use the Queryable State, it is useful to briefly 
describe the entities that compose it.
+The Queryable State consists of three main entities:
+
+ 1. the `QueryableStateClient`, which (potentially) runs outside the Flink 
cluster and submits the user queries, 
+ 2. the `QueryableStateClientProxy`, which runs on each `TaskManager` 
(*i.e.* inside the Flink cluster) and is responsible 
+ for receiving the client's queries, fetching the requested state on his 
behalf, and returning it to the client, and 
+ 3. the `QueryableStateServer` which runs on each `TaskManager` and is 
responsible for serving the locally stored state.
+ 
+In a nutshell, the client will connect to one of the proxies and send a 
request for the state associated with a specific 
+key, `k`. As stated in [Working with State]({{ site.baseurl 
}}/dev/stream/state/state.html), keyed state is organized in 
+*Key Groups*, and each `TaskManager` is assigned a number of these key 
groups. To discover which `TaskManager` is 
+responsible for the key group holding `k`, the proxy will ask the 
`JobManager`. Based on the answer, the proxy will 
+then query the `QueryableStateServer` running on that `TaskManager` for 
the state associated with `k`, and forward the
+response back to the client.
+
+## Activating Queryable State
+
+To enable queryable state on your Flink cluster, you just have to copy the 
--- End diff --

nit: omit "the"


> Add documentation for the new queryable state client.
> -
>
> Key: FLINK-7822
> URL: https://issues.apache.org/jira/browse/FLINK-7822
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation, Queryable State
>Affects Versions: 1.4.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Blocker
> Fix For: 1.4.0
>
>
> This has to include:
> 1) how to enable queryable state (jars)...
> 2) configuration parameters
> 3) new APIs
> 4) Guarantees/Semantics/Limitations (e.g. memory backend)

[jira] [Commented] (FLINK-7822) Add documentation for the new queryable state client.

2017-11-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16241852#comment-16241852
 ] 

ASF GitHub Bot commented on FLINK-7822:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4966#discussion_r149337260
  
--- Diff: docs/dev/stream/state/queryable_state.md ---
@@ -32,38 +32,67 @@ under the License.
   likely that there will be breaking API changes on the client side in the 
upcoming Flink versions.
 
 
-In a nutshell, this feature allows users to query Flink's managed 
partitioned state
-(see [Working with State]({{ site.baseurl }}/dev/stream/state/state.html)) 
from outside of
-Flink. For some scenarios, queryable state thus eliminates the need for 
distributed
-operations/transactions with external systems such as key-value stores 
which are often the
-bottleneck in practice.
+In a nutshell, this feature exposes Flink's managed keyed (partitioned) 
state
+(see [Working with State]({{ site.baseurl }}/dev/stream/state/state.html)) 
to the outside world and 
+allows the user to query a job's state from outside Flink. For some 
scenarios, queryable state 
+eliminates the need for distributed operations/transactions with external 
systems such as key-value 
+stores which are often the bottleneck in practice. In addition, this 
feature may be particularly 
+useful for debugging purposes.
 
 
-  Attention: Queryable state accesses keyed state from a 
concurrent thread rather
-  than synchronizing with the operator and potentially blocking its 
operation. Since any state
-  backend using Java heap space, e.g. MemoryStateBackend or
-  FsStateBackend, does not work with copies when retrieving values but 
instead directly
-  references the stored values, read-modify-write patterns are unsafe and 
may cause the
-  queryable state server to fail due to concurrent modifications.
-  The RocksDBStateBackend is safe from these issues.
+  Attention: When querying a state object, that object is 
accessed from a concurrent 
+  thread without any synchronization or copying. This is a design choice, 
as any of the above would lead
+  to increased job latency, which we wanted to avoid. Since any state 
backend using Java heap space, 
+  e.g. MemoryStateBackend or 
FsStateBackend, does not work 
+  with copies when retrieving values but instead directly references the 
stored values, read-modify-write 
+  patterns are unsafe and may cause the queryable state server to fail due 
to concurrent modifications.
+  The RocksDBStateBackend is safe from these issues.
 
 
+## Architecture
+
+Before showing how to use the Queryable State, it is useful to briefly 
describe the entities that compose it.
+The Queryable State consists of three main entities:
--- End diff --

nit: "The Queryable State feature"


> Add documentation for the new queryable state client.
> -
>
> Key: FLINK-7822
> URL: https://issues.apache.org/jira/browse/FLINK-7822
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation, Queryable State
>Affects Versions: 1.4.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Blocker
> Fix For: 1.4.0
>
>
> This has to include:
> 1) how to enable queryable state (jars)...
> 2) configuration parameters
> 3) new APIs
> 4) Guarantees/Semantics/Limitations (e.g. memory backend)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4966: [FLINK-7822][FLINK-7823] Adds documentation and fi...

2017-11-07 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4966#discussion_r149337397
  
--- Diff: docs/dev/stream/state/queryable_state.md ---
@@ -32,38 +32,67 @@ under the License.
   likely that there will be breaking API changes on the client side in the 
upcoming Flink versions.
 
 
-In a nutshell, this feature allows users to query Flink's managed 
partitioned state
-(see [Working with State]({{ site.baseurl }}/dev/stream/state/state.html)) 
from outside of
-Flink. For some scenarios, queryable state thus eliminates the need for 
distributed
-operations/transactions with external systems such as key-value stores 
which are often the
-bottleneck in practice.
+In a nutshell, this feature exposes Flink's managed keyed (partitioned) 
state
+(see [Working with State]({{ site.baseurl }}/dev/stream/state/state.html)) 
to the outside world and 
+allows the user to query a job's state from outside Flink. For some 
scenarios, queryable state 
+eliminates the need for distributed operations/transactions with external 
systems such as key-value 
+stores which are often the bottleneck in practice. In addition, this 
feature may be particularly 
+useful for debugging purposes.
 
 
-  Attention: Queryable state accesses keyed state from a 
concurrent thread rather
-  than synchronizing with the operator and potentially blocking its 
operation. Since any state
-  backend using Java heap space, e.g. MemoryStateBackend or
-  FsStateBackend, does not work with copies when retrieving values but 
instead directly
-  references the stored values, read-modify-write patterns are unsafe and 
may cause the
-  queryable state server to fail due to concurrent modifications.
-  The RocksDBStateBackend is safe from these issues.
+  Attention: When querying a state object, that object is 
accessed from a concurrent 
+  thread without any synchronization or copying. This is a design choice, 
as any of the above would lead
+  to increased job latency, which we wanted to avoid. Since any state 
backend using Java heap space, 
+  e.g. MemoryStateBackend or 
FsStateBackend, does not work 
+  with copies when retrieving values but instead directly references the 
stored values, read-modify-write 
+  patterns are unsafe and may cause the queryable state server to fail due 
to concurrent modifications.
+  The RocksDBStateBackend is safe from these issues.
 
 
+## Architecture
+
+Before showing how to use the Queryable State, it is useful to briefly 
describe the entities that compose it.
+The Queryable State consists of three main entities:
+
+ 1. the `QueryableStateClient`, which (potentially) runs outside the Flink 
cluster and submits the user queries, 
+ 2. the `QueryableStateClientProxy`, which runs on each `TaskManager` 
(*i.e.* inside the Flink cluster) and is responsible 
+ for receiving the client's queries, fetching the requested state on his 
behalf, and returning it to the client, and 
--- End diff --

maybe "fetching the requested state from the responsible TaskManager"


---


[GitHub] flink pull request #4966: [FLINK-7822][FLINK-7823] Adds documentation and fi...

2017-11-07 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4966#discussion_r149337715
  
--- Diff: docs/dev/stream/state/queryable_state.md ---
@@ -32,38 +32,67 @@ under the License.
   likely that there will be breaking API changes on the client side in the 
upcoming Flink versions.
 
 
-In a nutshell, this feature allows users to query Flink's managed 
partitioned state
-(see [Working with State]({{ site.baseurl }}/dev/stream/state/state.html)) 
from outside of
-Flink. For some scenarios, queryable state thus eliminates the need for 
distributed
-operations/transactions with external systems such as key-value stores 
which are often the
-bottleneck in practice.
+In a nutshell, this feature exposes Flink's managed keyed (partitioned) 
state
+(see [Working with State]({{ site.baseurl }}/dev/stream/state/state.html)) 
to the outside world and 
+allows the user to query a job's state from outside Flink. For some 
scenarios, queryable state 
+eliminates the need for distributed operations/transactions with external 
systems such as key-value 
+stores which are often the bottleneck in practice. In addition, this 
feature may be particularly 
+useful for debugging purposes.
 
 
-  Attention: Queryable state accesses keyed state from a 
concurrent thread rather
-  than synchronizing with the operator and potentially blocking its 
operation. Since any state
-  backend using Java heap space, e.g. MemoryStateBackend or
-  FsStateBackend, does not work with copies when retrieving values but 
instead directly
-  references the stored values, read-modify-write patterns are unsafe and 
may cause the
-  queryable state server to fail due to concurrent modifications.
-  The RocksDBStateBackend is safe from these issues.
+  Attention: When querying a state object, that object is 
accessed from a concurrent 
+  thread without any synchronization or copying. This is a design choice, 
as any of the above would lead
+  to increased job latency, which we wanted to avoid. Since any state 
backend using Java heap space, 
+  e.g. MemoryStateBackend or 
FsStateBackend, does not work 
+  with copies when retrieving values but instead directly references the 
stored values, read-modify-write 
+  patterns are unsafe and may cause the queryable state server to fail due 
to concurrent modifications.
+  The RocksDBStateBackend is safe from these issues.
 
 
+## Architecture
+
+Before showing how to use the Queryable State, it is useful to briefly 
describe the entities that compose it.
+The Queryable State consists of three main entities:
+
+ 1. the `QueryableStateClient`, which (potentially) runs outside the Flink 
cluster and submits the user queries, 
+ 2. the `QueryableStateClientProxy`, which runs on each `TaskManager` 
(*i.e.* inside the Flink cluster) and is responsible 
+ for receiving the client's queries, fetching the requested state on his 
behalf, and returning it to the client, and 
+ 3. the `QueryableStateServer` which runs on each `TaskManager` and is 
responsible for serving the locally stored state.
+ 
+In a nutshell, the client will connect to one of the proxies and send a 
request for the state associated with a specific 
+key, `k`. As stated in [Working with State]({{ site.baseurl 
}}/dev/stream/state/state.html), keyed state is organized in 
+*Key Groups*, and each `TaskManager` is assigned a number of these key 
groups. To discover which `TaskManager` is 
+responsible for the key group holding `k`, the proxy will ask the 
`JobManager`. Based on the answer, the proxy will 
+then query the `QueryableStateServer` running on that `TaskManager` for 
the state associated with `k`, and forward the
+response back to the client.
+
+## Activating Queryable State
+
+To enable queryable state on your Flink cluster, you just have to copy the 
+`flink-queryable-state-runtime{{ site.scala_version_suffix 
}}-{{site.version }}.jar` 
+from the `opt/` folder of your [Flink 
distribution](https://flink.apache.org/downloads.html "Apache Flink: 
Downloads"), 
+to the `lib/` folder. In other case, the queryable state feature is not 
enabled. 
--- End diff --

nit: "Otherwise, ..."


---


[GitHub] flink pull request #4966: [FLINK-7822][FLINK-7823] Adds documentation and fi...

2017-11-07 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4966#discussion_r149337260
  
--- Diff: docs/dev/stream/state/queryable_state.md ---
@@ -32,38 +32,67 @@ under the License.
   likely that there will be breaking API changes on the client side in the 
upcoming Flink versions.
 
 
-In a nutshell, this feature allows users to query Flink's managed 
partitioned state
-(see [Working with State]({{ site.baseurl }}/dev/stream/state/state.html)) 
from outside of
-Flink. For some scenarios, queryable state thus eliminates the need for 
distributed
-operations/transactions with external systems such as key-value stores 
which are often the
-bottleneck in practice.
+In a nutshell, this feature exposes Flink's managed keyed (partitioned) 
state
+(see [Working with State]({{ site.baseurl }}/dev/stream/state/state.html)) 
to the outside world and 
+allows the user to query a job's state from outside Flink. For some 
scenarios, queryable state 
+eliminates the need for distributed operations/transactions with external 
systems such as key-value 
+stores which are often the bottleneck in practice. In addition, this 
feature may be particularly 
+useful for debugging purposes.
 
 
-  Attention: Queryable state accesses keyed state from a 
concurrent thread rather
-  than synchronizing with the operator and potentially blocking its 
operation. Since any state
-  backend using Java heap space, e.g. MemoryStateBackend or
-  FsStateBackend, does not work with copies when retrieving values but 
instead directly
-  references the stored values, read-modify-write patterns are unsafe and 
may cause the
-  queryable state server to fail due to concurrent modifications.
-  The RocksDBStateBackend is safe from these issues.
+  Attention: When querying a state object, that object is 
accessed from a concurrent 
+  thread without any synchronization or copying. This is a design choice, 
as any of the above would lead
+  to increased job latency, which we wanted to avoid. Since any state 
backend using Java heap space, 
+  e.g. MemoryStateBackend or 
FsStateBackend, does not work 
+  with copies when retrieving values but instead directly references the 
stored values, read-modify-write 
+  patterns are unsafe and may cause the queryable state server to fail due 
to concurrent modifications.
+  The RocksDBStateBackend is safe from these issues.
 
 
+## Architecture
+
+Before showing how to use the Queryable State, it is useful to briefly 
describe the entities that compose it.
+The Queryable State consists of three main entities:
--- End diff --

nit: "The Queryable State feature"


---


[GitHub] flink pull request #4966: [FLINK-7822][FLINK-7823] Adds documentation and fi...

2017-11-07 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4966#discussion_r149337820
  
--- Diff: docs/dev/stream/state/queryable_state.md ---
@@ -32,38 +32,67 @@ under the License.
   likely that there will be breaking API changes on the client side in the 
upcoming Flink versions.
 
 
-In a nutshell, this feature allows users to query Flink's managed 
partitioned state
-(see [Working with State]({{ site.baseurl }}/dev/stream/state/state.html)) 
from outside of
-Flink. For some scenarios, queryable state thus eliminates the need for 
distributed
-operations/transactions with external systems such as key-value stores 
which are often the
-bottleneck in practice.
+In a nutshell, this feature exposes Flink's managed keyed (partitioned) 
state
+(see [Working with State]({{ site.baseurl }}/dev/stream/state/state.html)) 
to the outside world and 
+allows the user to query a job's state from outside Flink. For some 
scenarios, queryable state 
+eliminates the need for distributed operations/transactions with external 
systems such as key-value 
+stores which are often the bottleneck in practice. In addition, this 
feature may be particularly 
+useful for debugging purposes.
 
 
-  Attention: Queryable state accesses keyed state from a 
concurrent thread rather
-  than synchronizing with the operator and potentially blocking its 
operation. Since any state
-  backend using Java heap space, e.g. MemoryStateBackend or
-  FsStateBackend, does not work with copies when retrieving values but 
instead directly
-  references the stored values, read-modify-write patterns are unsafe and 
may cause the
-  queryable state server to fail due to concurrent modifications.
-  The RocksDBStateBackend is safe from these issues.
+  Attention: When querying a state object, that object is 
accessed from a concurrent 
+  thread without any synchronization or copying. This is a design choice, 
as any of the above would lead
+  to increased job latency, which we wanted to avoid. Since any state 
backend using Java heap space, 
+  e.g. MemoryStateBackend or 
FsStateBackend, does not work 
+  with copies when retrieving values but instead directly references the 
stored values, read-modify-write 
+  patterns are unsafe and may cause the queryable state server to fail due 
to concurrent modifications.
+  The RocksDBStateBackend is safe from these issues.
 
 
+## Architecture
+
+Before showing how to use the Queryable State, it is useful to briefly 
describe the entities that compose it.
+The Queryable State consists of three main entities:
+
+ 1. the `QueryableStateClient`, which (potentially) runs outside the Flink 
cluster and submits the user queries, 
+ 2. the `QueryableStateClientProxy`, which runs on each `TaskManager` 
(*i.e.* inside the Flink cluster) and is responsible 
+ for receiving the client's queries, fetching the requested state on his 
behalf, and returning it to the client, and 
+ 3. the `QueryableStateServer` which runs on each `TaskManager` and is 
responsible for serving the locally stored state.
+ 
+In a nutshell, the client will connect to one of the proxies and send a 
request for the state associated with a specific 
+key, `k`. As stated in [Working with State]({{ site.baseurl 
}}/dev/stream/state/state.html), keyed state is organized in 
+*Key Groups*, and each `TaskManager` is assigned a number of these key 
groups. To discover which `TaskManager` is 
+responsible for the key group holding `k`, the proxy will ask the 
`JobManager`. Based on the answer, the proxy will 
+then query the `QueryableStateServer` running on that `TaskManager` for 
the state associated with `k`, and forward the
+response back to the client.
+
+## Activating Queryable State
+
+To enable queryable state on your Flink cluster, you just have to copy the 
+`flink-queryable-state-runtime{{ site.scala_version_suffix 
}}-{{site.version }}.jar` 
+from the `opt/` folder of your [Flink 
distribution](https://flink.apache.org/downloads.html "Apache Flink: 
Downloads"), 
+to the `lib/` folder. In other case, the queryable state feature is not 
enabled. 
+
+To verify that your cluster is running with queryable state enabled, check 
the logs of any 
+task manager for the line: `"Started the Queryable State Proxy Server @ 
..."`.
+
 ## Making State Queryable
 
-In order to make state queryable, the queryable state server first needs 
to be enabled globally
-by setting the `query.server.enable` configuration parameter to `true` 
(current default).
-Then appropriate state needs to be made queryable by using either
+Now that you have activated queryable state on your cluster, it is time to 
see how to use it. In order for a state to 
--- End diff --
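
Picking up where the hunk cuts off: per the 1.4 docs there are two ways to make state queryable. A sketch with assumed names (the stream, state names, and types below are placeholders, not the docs' exact example):

{code}
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;

public class MakeQueryableSketch {

    static void makeQueryable(DataStream<Tuple2<String, Long>> stream) {
        // Option 1: mark a state descriptor as queryable and use it as usual
        // inside a rich function; "query-name" is a placeholder.
        ValueStateDescriptor<Long> descriptor =
                new ValueStateDescriptor<>("count", Long.class);
        descriptor.setQueryable("query-name");

        // Option 2: expose a keyed stream directly as queryable state;
        // this acts as a sink storing the latest value per key.
        stream.keyBy(0).asQueryableState("query-name-2");
    }
}
{code}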

[GitHub] flink pull request #4966: [FLINK-7822][FLINK-7823] Adds documentation and fi...

2017-11-07 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4966#discussion_r149337649
  
--- Diff: docs/dev/stream/state/queryable_state.md ---
@@ -32,38 +32,67 @@ under the License.
   likely that there will be breaking API changes on the client side in the 
upcoming Flink versions.
 
 
-In a nutshell, this feature allows users to query Flink's managed 
partitioned state
-(see [Working with State]({{ site.baseurl }}/dev/stream/state/state.html)) 
from outside of
-Flink. For some scenarios, queryable state thus eliminates the need for 
distributed
-operations/transactions with external systems such as key-value stores 
which are often the
-bottleneck in practice.
+In a nutshell, this feature exposes Flink's managed keyed (partitioned) 
state
+(see [Working with State]({{ site.baseurl }}/dev/stream/state/state.html)) 
to the outside world and 
+allows the user to query a job's state from outside Flink. For some 
scenarios, queryable state 
+eliminates the need for distributed operations/transactions with external 
systems such as key-value 
+stores which are often the bottleneck in practice. In addition, this 
feature may be particularly 
+useful for debugging purposes.
 
 
-  Attention: Queryable state accesses keyed state from a 
concurrent thread rather
-  than synchronizing with the operator and potentially blocking its 
operation. Since any state
-  backend using Java heap space, e.g. MemoryStateBackend or
-  FsStateBackend, does not work with copies when retrieving values but 
instead directly
-  references the stored values, read-modify-write patterns are unsafe and 
may cause the
-  queryable state server to fail due to concurrent modifications.
-  The RocksDBStateBackend is safe from these issues.
+  Attention: When querying a state object, that object is 
accessed from a concurrent 
+  thread without any synchronization or copying. This is a design choice, 
as any of the above would lead
+  to increased job latency, which we wanted to avoid. Since any state 
backend using Java heap space, 
+  e.g. MemoryStateBackend or 
FsStateBackend, does not work 
+  with copies when retrieving values but instead directly references the 
stored values, read-modify-write 
+  patterns are unsafe and may cause the queryable state server to fail due 
to concurrent modifications.
+  The RocksDBStateBackend is safe from these issues.
 
 
+## Architecture
+
+Before showing how to use the Queryable State, it is useful to briefly 
describe the entities that compose it.
+The Queryable State consists of three main entities:
+
+ 1. the `QueryableStateClient`, which (potentially) runs outside the Flink 
cluster and submits the user queries, 
+ 2. the `QueryableStateClientProxy`, which runs on each `TaskManager` 
(*i.e.* inside the Flink cluster) and is responsible 
+ for receiving the client's queries, fetching the requested state on his 
behalf, and returning it to the client, and 
+ 3. the `QueryableStateServer` which runs on each `TaskManager` and is 
responsible for serving the locally stored state.
+ 
+In a nutshell, the client will connect to one of the proxies and send a 
request for the state associated with a specific 
+key, `k`. As stated in [Working with State]({{ site.baseurl 
}}/dev/stream/state/state.html), keyed state is organized in 
+*Key Groups*, and each `TaskManager` is assigned a number of these key 
groups. To discover which `TaskManager` is 
+responsible for the key group holding `k`, the proxy will ask the 
`JobManager`. Based on the answer, the proxy will 
+then query the `QueryableStateServer` running on that `TaskManager` for 
the state associated with `k`, and forward the
+response back to the client.
+
+## Activating Queryable State
+
+To enable queryable state on your Flink cluster, you just have to copy the 
--- End diff --

nit: omit "the"


---


[jira] [Commented] (FLINK-7822) Add documentation for the new queryable state client.

2017-11-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16241834#comment-16241834
 ] 

ASF GitHub Bot commented on FLINK-7822:
---

GitHub user kl0u opened a pull request:

https://github.com/apache/flink/pull/4966

[FLINK-7822][FLINK-7823] Adds documentation and fixes configuration of QS.

## What is the purpose of the change

This PR adds documentation for the new queryable state.

R @aljoscha 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kl0u/flink qs-doc

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4966.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4966


commit 91c7082158e534ffc30894e9f23e6ea6c94fab07
Author: kkloudas 
Date:   2017-11-06T11:43:18Z

[FLINK-7823][QS] Update Queryable State configuration parameters.

commit 2a076d6119eb01a4b4bc48925a37dde1b439dd54
Author: kkloudas 
Date:   2017-11-06T16:21:45Z

[FLINK-7822][QS][doc] Update Queryable State docs.




> Add documentation for the new queryable state client.
> -
>
> Key: FLINK-7822
> URL: https://issues.apache.org/jira/browse/FLINK-7822
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation, Queryable State
>Affects Versions: 1.4.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Blocker
> Fix For: 1.4.0
>
>
> This has to include:
> 1) how to enable queryable state (jars)...
> 2) configuration parameters
> 3) new APIs
> 4) Guarantees/Semantics/Limitations (e.g. memory backend)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4966: [FLINK-7822][FLINK-7823] Adds documentation and fi...

2017-11-07 Thread kl0u
GitHub user kl0u opened a pull request:

https://github.com/apache/flink/pull/4966

[FLINK-7822][FLINK-7823] Adds documentation and fixes configuration of QS.

## What is the purpose of the change

This PR adds documentation for the new queryable state.

R @aljoscha 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kl0u/flink qs-doc

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4966.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4966


commit 91c7082158e534ffc30894e9f23e6ea6c94fab07
Author: kkloudas 
Date:   2017-11-06T11:43:18Z

[FLINK-7823][QS] Update Queryable State configuration parameters.

commit 2a076d6119eb01a4b4bc48925a37dde1b439dd54
Author: kkloudas 
Date:   2017-11-06T16:21:45Z

[FLINK-7822][QS][doc] Update Queryable State docs.




---


[jira] [Created] (FLINK-8006) flink-daemon.sh: line 103: binary operator expected

2017-11-07 Thread Alejandro (JIRA)
Alejandro  created FLINK-8006:
-

 Summary: flink-daemon.sh: line 103: binary operator expected
 Key: FLINK-8006
 URL: https://issues.apache.org/jira/browse/FLINK-8006
 Project: Flink
  Issue Type: Bug
  Components: Startup Shell Scripts
Affects Versions: 1.3.2
 Environment: Linux 4.12.12-gentoo #2 SMP x86_64 Intel(R) Core(TM) 
i3-3110M CPU @ 2.40GHz GenuineIntel GNU/Linux
Reporter: Alejandro 


When executing `./bin/start-local.sh` I get:

flink-1.3.2/bin/flink-daemon.sh: line 79: $pid: ambiguous redirect
flink-1.3.2/bin/flink-daemon.sh: line 103: [: /tmp/flink-Alejandro: binary operator expected

I solved the problem by replacing $pid with "$pid" in lines 79 and 103.

Should I make a PR to the repo?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4964: [hotfix][docs] Add type for numLateRecordsDropped ...

2017-11-07 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4964


---


[jira] [Commented] (FLINK-8004) Sample code in Debugging & Monitoring Metrics documentation section is not valid java

2017-11-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16241815#comment-16241815
 ] 

ASF GitHub Bot commented on FLINK-8004:
---

GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/4965

[FLINK-8004][metrics][docs] Fix usage examples

## What is the purpose of the change

This PR fixes several issues in the metric usage examples in the 
documentation.

## Brief change log
* replace wrong `@public` annotations with a proper `@Override` and public modifier
* streamline all examples to just return the input (required adjusting types)
* added missing return statements
* modified all examples to override all required methods (open() & map())
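
For reference, the corrected shape of such an example would look roughly like this (a sketch consistent with the fixes above, not necessarily the PR's exact code):

{code}
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;

public class MyMapper extends RichMapFunction<String, String> {
    private transient Counter counter;

    @Override
    public void open(Configuration config) {
        // Proper @Override and public modifier instead of a bogus @public.
        this.counter = getRuntimeContext()
                .getMetricGroup()
                .counter("myCounter");
    }

    @Override
    public String map(String value) {
        this.counter.inc();
        return value; // streamlined: just return the input
    }
}
{code}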


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 8004

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4965.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4965


commit 984473a58a70daf09d7d0cc2f409c05d391a4be1
Author: zentol 
Date:   2017-11-07T10:40:15Z

[FLINK-8004][metrics][docs] Fix usage examples




> Sample code in Debugging & Monitoring Metrics documentation section is not 
> valid java 
> --
>
> Key: FLINK-8004
> URL: https://issues.apache.org/jira/browse/FLINK-8004
> Project: Flink
>  Issue Type: Improvement
>Reporter: Andreas Hecke
>Assignee: Chesnay Schepler
>Priority: Minor
>
> Hi, we have been stumbled about some documentation inconsistencies in how to 
> use metrics in flink. Seems there is some invalid java code posted as samples 
> like having methods declared as @public and missing return statements, see 
> [here|https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/metrics.html#counter]
> I raised a question on 
> [SO|https://stackoverflow.com/questions/47153424/what-does-public-modifier-mean-in-method-signature]
>  and Fabian asked me to open a Jira issue



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4965: [FLINK-8004][metrics][docs] Fix usage examples

2017-11-07 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/4965

[FLINK-8004][metrics][docs] Fix usage examples

## What is the purpose of the change

This PR fixes several issues in the metric usage examples in the 
documentation.

## Brief change log
* replace wrong `@public` annotations with a proper `@Override` and public modifier
* streamline all examples to just return the input (required adjusting types)
* added missing return statements
* modified all examples to override all required methods (open() & map())


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 8004

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4965.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4965


commit 984473a58a70daf09d7d0cc2f409c05d391a4be1
Author: zentol 
Date:   2017-11-07T10:40:15Z

[FLINK-8004][metrics][docs] Fix usage examples




---


[jira] [Updated] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues

2017-11-07 Thread Gary Yao (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao updated FLINK-8005:

Fix Version/s: 1.4.0

> Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues
> --
>
> Key: FLINK-8005
> URL: https://issues.apache.org/jira/browse/FLINK-8005
> Project: Flink
>  Issue Type: Bug
>  Components: Core, Kafka Connector, State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: Gary Yao
>Priority: Blocker
> Fix For: 1.4.0
>
>
> *Problem Description*
> Classes in the user code jar cannot be loaded by the snapshot thread’s 
> context class loader ({{AppClassLoader}}).
> For example, when creating instances of {{KafkaProducer}}, Strings are 
> resolved to class objects by Kafka.
> Find below an extract from {{ConfigDef.java}}:
> {code}
> case CLASS:
> if (value instanceof Class)
> return value;
> else if (value instanceof String)
> return Class.forName(trimmed, true, 
> Utils.getContextOrKafkaClassLoader());
> else
> throw new ConfigException(name, value, "Expected a Class instance or 
> class name.");
> {code}
> *Exception/Stacktrace*
> {noformat}
> Caused by: java.lang.Exception: Could not complete snapshot 1 for operator 
> Source: Collection Source -> Sink: kafka-sink-1510048188383 (1/1).
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1077)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1026)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:659)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:595)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:526)
>   ... 7 more
> Caused by: org.apache.kafka.common.config.ConfigException: Invalid value 
> org.apache.kafka.common.serialization.ByteArraySerializer for configuration 
> key.serializer: Class 
> org.apache.kafka.common.serialization.ByteArraySerializer could not be found.
>   at 
> org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:715)
>   at 
> org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:460)
>   at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:453)
>   at 
> org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:62)
>   at 
> org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:75)
>   at 
> org.apache.kafka.clients.producer.ProducerConfig.(ProducerConfig.java:360)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:288)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.(FlinkKafkaProducer.java:114)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initProducer(FlinkKafkaProducer011.java:913)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initTransactionalProducer(FlinkKafkaProducer011.java:904)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.createOrGetProducerFromPool(FlinkKafkaProducer011.java:637)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:613)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:94)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:359)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:294)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.snapshotState(FlinkKafkaProducer011.java:756)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:357)
>   ... 12 more
> {noformat}
> *How to reproduce*
> Note that the problem only appears when a job is deployed on a cluster. 
> # Build Flink 1.4
> # Build test job 

[jira] [Updated] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues

2017-11-07 Thread Gary Yao (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao updated FLINK-8005:

Description: 
*Problem Description*
Classes in the user code jar cannot be loaded by the snapshot thread’s context 
class loader ({{AppClassLoader}}).
For example, when creating instances of {{KafkaProducer}}, Strings are resolved 
to class objects by Kafka.
Find below an extract from {{ConfigDef.java}}:
{code}
case CLASS:
if (value instanceof Class)
return value;
else if (value instanceof String)
return Class.forName(trimmed, true, 
Utils.getContextOrKafkaClassLoader());
else
throw new ConfigException(name, value, "Expected a Class instance or 
class name.");
{code}
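
To make the failure mode concrete: the lookup above resolves the class name against the thread context class loader, which on the snapshot thread cannot see the user jar. A hypothetical sketch of the general pattern, not Flink's actual fix ({{userCodeClassLoader}} is an assumed variable):

{code}
ClassLoader previous = Thread.currentThread().getContextClassLoader();
try {
    // Swap in a loader that can see classes from the user jar.
    Thread.currentThread().setContextClassLoader(userCodeClassLoader);
    // Utils.getContextOrKafkaClassLoader() would now return that loader,
    // so the serializer class resolves.
    Class<?> clazz = Class.forName(
        "org.apache.kafka.common.serialization.ByteArraySerializer",
        true,
        Thread.currentThread().getContextClassLoader());
} finally {
    Thread.currentThread().setContextClassLoader(previous);
}
{code}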

*Exception/Stacktrace*
{noformat}
Caused by: java.lang.Exception: Could not complete snapshot 1 for operator 
Source: Collection Source -> Sink: kafka-sink-1510048188383 (1/1).
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1077)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1026)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:659)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:595)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:526)
... 7 more
Caused by: org.apache.kafka.common.config.ConfigException: Invalid value 
org.apache.kafka.common.serialization.ByteArraySerializer for configuration 
key.serializer: Class org.apache.kafka.common.serialization.ByteArraySerializer 
could not be found.
at 
org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:715)
at 
org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:460)
at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:453)
at 
org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:62)
at 
org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:75)
at 
org.apache.kafka.clients.producer.ProducerConfig.(ProducerConfig.java:360)
at 
org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:288)
at 
org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.(FlinkKafkaProducer.java:114)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initProducer(FlinkKafkaProducer011.java:913)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initTransactionalProducer(FlinkKafkaProducer011.java:904)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.createOrGetProducerFromPool(FlinkKafkaProducer011.java:637)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:613)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:94)
at 
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:359)
at 
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:294)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.snapshotState(FlinkKafkaProducer011.java:756)
at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:357)
... 12 more
{noformat}

*How to reproduce*
Note that the problem only appears when a job is deployed on a cluster. 
# Build Flink 1.4
# Build test job https://github.com/GJL/flink-kafka011-producer-test with {{mvn 
-o clean install -Pbuild-jar}}
# Start job:
{noformat}
bin/flink run -c com.garyyao.StreamingJob 
/pathto/flink-kafka011-producer/target/flink-kafka011-producer-1.0-SNAPSHOT.jar
{noformat}




  was:
*Problem Description*
Classes in the user code jar cannot be loaded by the snapshot thread’s context 
class loader.
For example, when creating instances of {{KafkaProducer}}, Strings are resolved 
to class objects by Kafka.
Find below an extract from {{ConfigDef.java}}:
{code}
case CLASS:
if (value instanceof Class)
return value;
else if 

[jira] [Commented] (FLINK-7419) Shade jackson dependency in flink-avro

2017-11-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16241812#comment-16241812
 ] 

ASF GitHub Bot commented on FLINK-7419:
---

Github user zjureel closed the pull request at:

https://github.com/apache/flink/pull/4524


> Shade jackson dependency in flink-avro
> --
>
> Key: FLINK-7419
> URL: https://issues.apache.org/jira/browse/FLINK-7419
> Project: Flink
>  Issue Type: Sub-task
>  Components: Build System
>Reporter: Stephan Ewen
>Assignee: Fang Yong
>Priority: Blocker
> Fix For: 1.4.0
>
>
> Avro uses {{org.codehouse.jackson}} which also exists in multiple 
> incompatible versions. We should shade it to 
> {{org.apache.flink.shaded.avro.org.codehouse.jackson}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4524: [FLINK-7419] Shade jackson dependency in flink-avr...

2017-11-07 Thread zjureel
Github user zjureel closed the pull request at:

https://github.com/apache/flink/pull/4524


---


[jira] [Updated] (FLINK-8004) Sample code in Debugging & Monitoring Metrics documentation section is not valid java

2017-11-07 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8004?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-8004:

Description: 
Hi, we have been stumbled about some documentation inconsistencies in how to 
use metrics in flink. Seems there is some invalid java code posted as samples 
like having methods declared as @public and missing return statements, see 
[here|https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/metrics.html#counter]
I raised a question on 
[SO|https://stackoverflow.com/questions/47153424/what-does-public-modifier-mean-in-method-signature]
 and Fabian asked me to open a Jira issue

  was:
Hi, we have been stumbled about some documentation inconsistencies in how to 
use metrics in flink. Seems there is some invalid java code posted as samples 
like having methods declared as @public and missing return statements, see 
[here|https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/metrics.html#counter]
I raised a question on 
[SO|https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/metrics.html#counter]
 and Fabian asked me to open a Jira issue


> Sample code in Debugging & Monitoring Metrics documentation section is not 
> valid java 
> --
>
> Key: FLINK-8004
> URL: https://issues.apache.org/jira/browse/FLINK-8004
> Project: Flink
>  Issue Type: Improvement
>Reporter: Andreas Hecke
>Assignee: Chesnay Schepler
>Priority: Minor
>
> Hi, we have been stumbled about some documentation inconsistencies in how to 
> use metrics in flink. Seems there is some invalid java code posted as samples 
> like having methods declared as @public and missing return statements, see 
> [here|https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/metrics.html#counter]
> I raised a question on 
> [SO|https://stackoverflow.com/questions/47153424/what-does-public-modifier-mean-in-method-signature]
>  and Fabian asked me to open a Jira issue



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (FLINK-8004) Sample code in Debugging & Monitoring Metrics documentation section is not valid java

2017-11-07 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16241798#comment-16241798
 ] 

Chesnay Schepler edited comment on FLINK-8004 at 11/7/17 10:33 AM:
---

-Could you add the SO link? Both your links point to the documentation.-

Found it.


was (Author: zentol):
Could you add the SO link? Both your links point to the documentation.

> Sample code in Debugging & Monitoring Metrics documentation section is not 
> valid java 
> --
>
> Key: FLINK-8004
> URL: https://issues.apache.org/jira/browse/FLINK-8004
> Project: Flink
>  Issue Type: Improvement
>Reporter: Andreas Hecke
>Assignee: Chesnay Schepler
>Priority: Minor
>
> Hi, we have been stumbled about some documentation inconsistencies in how to 
> use metrics in flink. Seems there is some invalid java code posted as samples 
> like having methods declared as @public and missing return statements, see 
> [here|https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/metrics.html#counter]
> I raised a question on 
> [SO|https://stackoverflow.com/questions/47153424/what-does-public-modifier-mean-in-method-signature]
>  and Fabian asked me to open a Jira issue



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-8004) Sample code in Debugging & Monitoring Metrics documentation section is not valid java

2017-11-07 Thread Andreas Hecke (JIRA)
Andreas Hecke created FLINK-8004:


 Summary: Sample code in Debugging & Monitoring Metrics 
documentation section is not valid java 
 Key: FLINK-8004
 URL: https://issues.apache.org/jira/browse/FLINK-8004
 Project: Flink
  Issue Type: Improvement
Reporter: Andreas Hecke
Priority: Minor


Hi, we have been stumbled about some documentation inconsistencies in how to 
use metrics in flink. Seems there is some invalid java code posted as samples 
like having methods declared as @public and missing return statements, see 
[here|https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/metrics.html#counter]
I raised a question on 
[SO|https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/metrics.html#counter]
 and Fabian asked me to open a Jira issue



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7978) Kafka011 exactly-once Producer sporadically fails to commit under high parallelism

2017-11-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7978?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16241799#comment-16241799
 ] 

ASF GitHub Bot commented on FLINK-7978:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4955#discussion_r149326628
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGenerator.java
 ---
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Class responsible for generating transactional ids to use when 
communicating with Kafka.
+ *
+ * It guarantees that:
+ * - generated ids to use will never clash with ids to use from different 
subtasks
+ * - generated ids to abort will never clash with ids to abort from 
different subtasks
+ * - generated ids to use will never clash with ids to abort from 
different subtasks
+ *
+ * In other words, any particular generated id will always be assigned 
to one and only one subtask.
+ */
+public class TransactionalIdsGenerator {
--- End diff --

another nitpick : `` tags are usually not closed in Javadoc: 
http://www.oracle.com/technetwork/articles/java/index-137868.html


> Kafka011 exactly-once Producer sporadically fails to commit under high 
> parallelism
> --
>
> Key: FLINK-7978
> URL: https://issues.apache.org/jira/browse/FLINK-7978
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: Gary Yao
>Assignee: Piotr Nowojski
>Priority: Blocker
> Fix For: 1.4.0
>
>
> The Kafka011 exactly-once producer sporadically fails to commit/confirm the 
> first checkpoint. The behavior seems to be easier reproduced under high job 
> parallelism.
> *Logs/Stacktrace*
> {noformat}
> 10:24:35,347 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>- Completed checkpoint 1 (191029 bytes in 1435 ms).
> 10:24:35,349 INFO  
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction  - 
> FlinkKafkaProducer011 2/32 - checkpoint 1 complete, committing transaction 
> TransactionHolder{handle=KafkaTransactionState [transactionalId=Sink: 
> kafka-sink-1509787467330-12], transactionStartTime=1509787474588} from 
> checkpoint 1
> 10:24:35,349 INFO  
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction  - 
> FlinkKafkaProducer011 1/32 - checkpoint 1 complete, committing transaction 
> TransactionHolder{handle=KafkaTransactionState [transactionalId=Sink: 
> kafka-sink-1509787467330-8], transactionStartTime=1509787474393} from 
> checkpoint 1
> 10:24:35,349 INFO  
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction  - 
> FlinkKafkaProducer011 0/32 - checkpoint 1 complete, committing transaction 
> TransactionHolder{handle=KafkaTransactionState [transactionalId=Sink: 
> kafka-sink-1509787467330-4], transactionStartTime=1509787474448} from 
> checkpoint 1
> 10:24:35,350 INFO  
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction  - 
> FlinkKafkaProducer011 6/32 - checkpoint 1 complete, committing transaction 
> TransactionHolder{handle=KafkaTransactionState [transactionalId=Sink: 
> kafka-sink-1509787467330-34], transactionStartTime=1509787474742} from 
> checkpoint 1
> 10:24:35,350 INFO  
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction  - 
> FlinkKafkaProducer011 4/32 - checkpoint 1 complete, committing transaction 
> TransactionHolder{handle=KafkaTransactionState [transactionalId=Sink: 
> kafka-sink-1509787467330-23], transactionStartTime=1509787474777} from 
> checkpoint 1
> 

[jira] [Commented] (FLINK-8004) Sample code in Debugging & Monitoring Metrics documentation section is not valid java

2017-11-07 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241798#comment-16241798
 ] 

Chesnay Schepler commented on FLINK-8004:
-

Could you add the SO link? Both your links point to the documentation.

> Sample code in Debugging & Monitoring Metrics documentation section is not 
> valid java 
> --
>
> Key: FLINK-8004
> URL: https://issues.apache.org/jira/browse/FLINK-8004
> Project: Flink
>  Issue Type: Improvement
>Reporter: Andreas Hecke
>Assignee: Chesnay Schepler
>Priority: Minor
>
> Hi, we stumbled upon some documentation inconsistencies in how to use 
> metrics in Flink. It seems some invalid Java code is posted as samples, 
> like methods declared as @public and missing return statements, see 
> [here|https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/metrics.html#counter]
> I raised a question on 
> [SO|https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/metrics.html#counter]
>  and Fabian asked me to open a Jira issue



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4955: [FLINK-7978][kafka] Ensure that transactional ids ...

2017-11-07 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4955#discussion_r149326628
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGenerator.java
 ---
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Class responsible for generating transactional ids to use when 
communicating with Kafka.
+ *
+ * It guarantees that:
+ * - generated ids to use will never clash with ids to use from different 
subtasks
+ * - generated ids to abort will never clash with ids to abort from 
different subtasks
+ * - generated ids to use will never clash with ids to abort from 
different subtasks
+ *
+ * In other words, any particular generated id will always be assigned 
to one and only one subtask.
+ */
+public class TransactionalIdsGenerator {
--- End diff --

another nitpick 😉: `<p>` tags are usually not closed in Javadoc: 
http://www.oracle.com/technetwork/articles/java/index-137868.html


---


[jira] [Created] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues

2017-11-07 Thread Gary Yao (JIRA)
Gary Yao created FLINK-8005:
---

 Summary: Snapshotting FlinkKafkaProducer011 fails due to 
ClassLoader issues
 Key: FLINK-8005
 URL: https://issues.apache.org/jira/browse/FLINK-8005
 Project: Flink
  Issue Type: Bug
  Components: Core, Kafka Connector, State Backends, Checkpointing
Affects Versions: 1.4.0
Reporter: Gary Yao
Priority: Blocker


*Problem Description*
Classes in the user code jar cannot be loaded by the snapshot thread’s context 
class loader.
For example, when creating instances of {{KafkaProducer}}, Strings are resolved 
to class objects by Kafka.
Find below an extract from {{ConfigDef.java}}:
{code}
case CLASS:
    if (value instanceof Class)
        return value;
    else if (value instanceof String)
        return Class.forName(trimmed, true, Utils.getContextOrKafkaClassLoader());
    else
        throw new ConfigException(name, value, "Expected a Class instance or class name.");
{code}
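
To illustrate the class-loading issue, here is a minimal, self-contained 
sketch (not Flink code; the jar path and class name are hypothetical): a class 
that only the user-code class loader can see is resolvable through that loader 
directly, but not through a thread's context class loader that was never set 
to it.

{code}
import java.io.File;
import java.net.URL;
import java.net.URLClassLoader;

public class ContextClassLoaderDemo {

	public static void main(String[] args) throws Exception {
		// Hypothetical user-code jar; parent null, so only this loader sees it.
		URLClassLoader userCodeLoader = new URLClassLoader(
			new URL[] { new File("/path/to/user-job.jar").toURI().toURL() }, null);

		Thread snapshotThread = new Thread(() -> {
			try {
				// Works: lookup goes through the user-code loader explicitly.
				Class.forName("com.example.MySerializer", true, userCodeLoader);
			} catch (ClassNotFoundException e) {
				System.out.println("direct lookup failed: " + e);
			}
			try {
				// Fails: this thread's context class loader was never set to
				// the user-code loader, mirroring the snapshot thread problem.
				Class.forName("com.example.MySerializer", true,
					Thread.currentThread().getContextClassLoader());
			} catch (ClassNotFoundException e) {
				System.out.println("context-loader lookup failed: " + e);
			}
		});
		snapshotThread.start();
		snapshotThread.join();
	}
}
{code}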

*Exception/Stacktrace*
{noformat}
Caused by: java.lang.Exception: Could not complete snapshot 1 for operator 
Source: Collection Source -> Sink: kafka-sink-1510048188383 (1/1).
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1077)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1026)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:659)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:595)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:526)
... 7 more
Caused by: org.apache.kafka.common.config.ConfigException: Invalid value 
org.apache.kafka.common.serialization.ByteArraySerializer for configuration 
key.serializer: Class org.apache.kafka.common.serialization.ByteArraySerializer 
could not be found.
at 
org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:715)
at 
org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:460)
at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:453)
at 
org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:62)
at 
org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:75)
at 
org.apache.kafka.clients.producer.ProducerConfig.<init>(ProducerConfig.java:360)
at 
org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:288)
at 
org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:114)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initProducer(FlinkKafkaProducer011.java:913)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initTransactionalProducer(FlinkKafkaProducer011.java:904)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.createOrGetProducerFromPool(FlinkKafkaProducer011.java:637)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:613)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:94)
at 
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:359)
at 
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:294)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.snapshotState(FlinkKafkaProducer011.java:756)
at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:357)
... 12 more
{noformat}

*How to reproduce*
Note that the problem only appears when a job is deployed on a cluster. 
# Build Flink 1.4
# Build test job https://github.com/GJL/flink-kafka011-producer-test with {{mvn 
-o clean install -Pbuild-jar}}
# Start job:
{noformat}
bin/flink run -c com.garyyao.StreamingJob 
/pathto/flink-kafka011-producer/target/flink-kafka011-producer-1.0-SNAPSHOT.jar
{noformat}






--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (FLINK-8004) Sample code in Debugging & Monitoring Metrics documentation section is not valid java

2017-11-07 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8004?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler reassigned FLINK-8004:
---

Assignee: Chesnay Schepler

> Sample code in Debugging & Monitoring Metrics documentation section is not 
> valid java 
> --
>
> Key: FLINK-8004
> URL: https://issues.apache.org/jira/browse/FLINK-8004
> Project: Flink
>  Issue Type: Improvement
>Reporter: Andreas Hecke
>Assignee: Chesnay Schepler
>Priority: Minor
>
> Hi, we stumbled upon some documentation inconsistencies in how to use 
> metrics in Flink. It seems some invalid Java code is posted as samples, 
> like methods declared as @public and missing return statements, see 
> [here|https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/metrics.html#counter]
> I raised a question on 
> [SO|https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/metrics.html#counter]
>  and Fabian asked me to open a Jira issue



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7773) Test instability in UtilsTest#testYarnFlinkResourceManagerJobManagerLostLeadership

2017-11-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7773?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241788#comment-16241788
 ] 

ASF GitHub Bot commented on FLINK-7773:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/4957
  
+1


> Test instability in 
> UtilsTest#testYarnFlinkResourceManagerJobManagerLostLeadership
> --
>
> Key: FLINK-7773
> URL: https://issues.apache.org/jira/browse/FLINK-7773
> Project: Flink
>  Issue Type: Bug
>  Components: Tests, YARN
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Till Rohrmann
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.4.0
>
>
> {{UtilsTest#testYarnFlinkResourceManagerJobManagerLostLeadership}} may result 
> in the following exception (repeated run in IntelliJ until failure, but also 
> on Travis here: https://travis-ci.org/NicoK/flink/jobs/283696974 )
> {code}
> org.apache.flink.yarn.UtilsTest "Until Failure"
> org.mockito.exceptions.misusing.UnfinishedStubbingException: 
> Unfinished stubbing detected here:
> -> at org.apache.flink.yarn.UtilsTest$1.<init>(UtilsTest.java:171)
> E.g. thenReturn() may be missing.
> Examples of correct stubbing:
> when(mock.isOk()).thenReturn(true);
> when(mock.isOk()).thenThrow(exception);
> doThrow(exception).when(mock).someVoidMethod();
> Hints:
>  1. missing thenReturn()
>  2. you are trying to stub a final method, you naughty developer!
>  3: you are stubbing the behaviour of another mock inside before 'thenReturn' 
> instruction if completed
>   at org.apache.flink.yarn.UtilsTest$1.<init>(UtilsTest.java:179)
>   at 
> org.apache.flink.yarn.UtilsTest.testYarnFlinkResourceManagerJobManagerLostLeadership(UtilsTest.java:95)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>   at 
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
>   at 
> com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:67)
>   at 
> com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
>   at 
> com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
> {code}
> The incriminating code is this:
> {code}
> doAnswer(new Answer() {
>   @Override
>   public Object answer(InvocationOnMock invocation) throws Throwable {
>   Container container = (Container) invocation.getArguments()[0];
>   resourceManagerGateway.tell(new 
> NotifyResourceStarted(YarnFlinkResourceManager.extractResourceID(container)),
>   leader1Gateway);
>   return null;
>   }
> }).when(nodeManagerClient).startContainer(Matchers.any(Container.class), 
> Matchers.any(ContainerLaunchContext.class));
> {code}
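
To make the failure mode concrete, here is a hedged sketch of the fix 
direction the PR title describes (move all mocking before the testing code); 
the types below are hypothetical stand-ins, not the actual YARN/Flink classes:

{code}
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;

public class StubbingOrderSketch {

	// Hypothetical stand-ins for the types used in the test.
	interface Container {}
	interface ContainerLaunchContext {}
	interface NodeManagerClient {
		void startContainer(Container container, ContainerLaunchContext context);
	}

	public static void main(String[] args) {
		NodeManagerClient nodeManagerClient = mock(NodeManagerClient.class);

		// 1) Complete ALL stubbing first, on the test thread.
		doAnswer(invocation -> {
			Container container = (Container) invocation.getArguments()[0];
			// ...notify the resource manager that the container started...
			return null;
		}).when(nodeManagerClient)
			.startContainer(any(Container.class), any(ContainerLaunchContext.class));

		// 2) Only afterwards start the actors/threads that invoke the mock.
		// If another thread calls the mock while the doAnswer(...).when(...)
		// chain above is still being built, Mockito can report the
		// UnfinishedStubbingException shown in this issue.
	}
}
{code}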



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4957: [FLINK-7773] [tests] Move all mocking before testing code...

2017-11-07 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/4957
  
+1


---


[jira] [Commented] (FLINK-8000) Sort REST handler URLs in RestServerEndpoint

2017-11-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8000?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241794#comment-16241794
 ] 

ASF GitHub Bot commented on FLINK-8000:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4958#discussion_r149325305
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
 ---
@@ -268,4 +287,69 @@ private static void registerHandler(Router router, Tuple2<RestHandlerSpecification, ChannelInboundHandler> specificationHandler) {
+   /**
+    * <p>The comparator orders the Rest URLs such that URLs with path parameters are ordered behind
+    * those without parameters. E.g.:
+    * /jobs
+    * /jobs/overview
+    * /jobs/:jobid
+    * /jobs/:jobid/config
+    * /:*
+    *
+    * <p>IMPORTANT: This comparator is highly specific to how Netty path parameters are encoded. Namely
+    * via a preceding ':' character.
+    */
+   static final class RestHandlerUrlComparator implements Comparator<Tuple2<RestHandlerSpecification, ChannelInboundHandler>>, Serializable {
+
+       private static final long serialVersionUID = 2388466767835547926L;
+
+       private static final Comparator<String> CASE_INSENSITIVE_ORDER = new CaseInsensitiveOrderComparator();
+
+       static final RestHandlerUrlComparator INSTANCE = new RestHandlerUrlComparator();
+
+       @Override
+       public int compare(
+               Tuple2<RestHandlerSpecification, ChannelInboundHandler> o1,
+               Tuple2<RestHandlerSpecification, ChannelInboundHandler> o2) {
+           return CASE_INSENSITIVE_ORDER.compare(o1.f0.getTargetRestEndpointURL(), o2.f0.getTargetRestEndpointURL());
+       }
+
+       static final class CaseInsensitiveOrderComparator implements Comparator<String>, Serializable {
+           private static final long serialVersionUID = 8550835445193437027L;
+
+           @Override
+           public int compare(String s1, String s2) {
+               int n1 = s1.length();
+               int n2 = s2.length();
+               int min = Math.min(n1, n2);
+               for (int i = 0; i < min; i++) {
+                   char c1 = s1.charAt(i);
+                   char c2 = s2.charAt(i);
+                   if (c1 != c2) {
+                       c1 = Character.toUpperCase(c1);
+                       c2 = Character.toUpperCase(c2);
+                       if (c1 != c2) {
+                           c1 = Character.toLowerCase(c1);
--- End diff --

why are we checking both upper and lower case (example edge case)?


> Sort REST handler URLs in RestServerEndpoint
> 
>
> Key: FLINK-8000
> URL: https://issues.apache.org/jira/browse/FLINK-8000
> Project: Flink
>  Issue Type: Improvement
>  Components: REST
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
>
> In order to make the {{RestServerEndpoint}} more easily extendable, we should 
> automatically sort the returned list of rest handlers when calling 
> {{RestServerEndpoint#initializeHandlers}}. That way the order in which the 
> handlers are added to the list is independent of the actual registration 
> order. This is, for example, important for the static file server which 
> always needs to be registered last.
> I propose to add a special {{String}} {{Comparator}} which considers the 
> character {{':'}} to be the character with the largest value. That way we 
> should always get the following sort order:
> - URLs without path parameters have precedence over similar URLs where parts 
> are replaced by path parameters (e.g. {{/jobs/overview}}, {{/jobs/:jobid}} 
> and {{/jobs/:jobid/config}}, {{/jobs/:jobid/vertices/:vertexId}})
> - Prefixes are sorted before URLs containing the prefix (e.g. {{/jobs}}, 
> {{/jobs/overview}})
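
A hedged sketch of the described ordering (illustrative only, not the actual 
Flink comparator): compare character by character and treat ':' as the largest 
character, so path-parameter segments sort behind concrete segments.

{code}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class RestUrlOrderingSketch {

	// ':' sorts after every other character; otherwise plain char order,
	// with shorter prefixes first.
	static final Comparator<String> URL_ORDER = (s1, s2) -> {
		int min = Math.min(s1.length(), s2.length());
		for (int i = 0; i < min; i++) {
			char c1 = s1.charAt(i);
			char c2 = s2.charAt(i);
			if (c1 != c2) {
				if (c1 == ':') {
					return 1;
				}
				if (c2 == ':') {
					return -1;
				}
				return Character.compare(c1, c2);
			}
		}
		return Integer.compare(s1.length(), s2.length());
	};

	public static void main(String[] args) {
		List<String> urls = new ArrayList<>(Arrays.asList(
			"/jobs/:jobid", "/jobs", "/:*", "/jobs/overview", "/jobs/:jobid/config"));
		urls.sort(URL_ORDER);
		// Prints: [/jobs, /jobs/overview, /jobs/:jobid, /jobs/:jobid/config, /:*]
		System.out.println(urls);
	}
}
{code}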



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4958: [FLINK-8000] Sort Rest handler URLS in RestServerE...

2017-11-07 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4958#discussion_r149325305
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
 ---
@@ -268,4 +287,69 @@ private static void registerHandler(Router router, Tuple2<RestHandlerSpecification, ChannelInboundHandler> specificationHandler) {
+   /**
+    * <p>The comparator orders the Rest URLs such that URLs with path parameters are ordered behind
+    * those without parameters. E.g.:
+    * /jobs
+    * /jobs/overview
+    * /jobs/:jobid
+    * /jobs/:jobid/config
+    * /:*
+    *
+    * <p>IMPORTANT: This comparator is highly specific to how Netty path parameters are encoded. Namely
+    * via a preceding ':' character.
+    */
+   static final class RestHandlerUrlComparator implements Comparator<Tuple2<RestHandlerSpecification, ChannelInboundHandler>>, Serializable {
+
+       private static final long serialVersionUID = 2388466767835547926L;
+
+       private static final Comparator<String> CASE_INSENSITIVE_ORDER = new CaseInsensitiveOrderComparator();
+
+       static final RestHandlerUrlComparator INSTANCE = new RestHandlerUrlComparator();
+
+       @Override
+       public int compare(
+               Tuple2<RestHandlerSpecification, ChannelInboundHandler> o1,
+               Tuple2<RestHandlerSpecification, ChannelInboundHandler> o2) {
+           return CASE_INSENSITIVE_ORDER.compare(o1.f0.getTargetRestEndpointURL(), o2.f0.getTargetRestEndpointURL());
+       }
+
+       static final class CaseInsensitiveOrderComparator implements Comparator<String>, Serializable {
+           private static final long serialVersionUID = 8550835445193437027L;
+
+           @Override
+           public int compare(String s1, String s2) {
+               int n1 = s1.length();
+               int n2 = s2.length();
+               int min = Math.min(n1, n2);
+               for (int i = 0; i < min; i++) {
+                   char c1 = s1.charAt(i);
+                   char c2 = s2.charAt(i);
+                   if (c1 != c2) {
+                       c1 = Character.toUpperCase(c1);
+                       c2 = Character.toUpperCase(c2);
+                       if (c1 != c2) {
+                           c1 = Character.toLowerCase(c1);
--- End diff --

why are we checking both upper and lower case (example edge case)?


---


[jira] [Resolved] (FLINK-7985) Update findbugs-maven-plugin version to 3.0.2

2017-11-07 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7985?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler resolved FLINK-7985.
-
   Resolution: Not A Problem
Fix Version/s: (was: 1.5.0)

> Update findbugs-maven-plugin version to 3.0.2
> -
>
> Key: FLINK-7985
> URL: https://issues.apache.org/jira/browse/FLINK-7985
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Affects Versions: 1.4.0
>Reporter: Hai Zhou UTC+8
>Assignee: Hai Zhou UTC+8
>
> The FindBugs version used by Flink is pretty old (1.3.9). The old version of 
> FindBugs itself has some bugs (like 
> http://sourceforge.net/p/findbugs/bugs/918/, hit by HADOOP-10474), and the 
> latest version 3.0.2 fixed the "Missing test classes" issue 
> (https://github.com/gleclaire/findbugs-maven-plugin/issues/15).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4955: [FLINK-7978][kafka] Ensure that transactional ids ...

2017-11-07 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4955#discussion_r149322930
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGenerator.java
 ---
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Class responsible for generating transactional ids to use when 
communicating with Kafka.
+ *
+ * It guarantees that:
+ * - generated ids to use will never clash with ids to use from different 
subtasks
+ * - generated ids to abort will never clash with ids to abort from 
different subtasks
+ * - generated ids to use will never clash with ids to abort from 
different subtasks
+ *
+ * In other words, any particular generated id will always be assigned 
to one and only one subtask.
+ */
+public class TransactionalIdsGenerator {
--- End diff --

ops, forgot about it


---


[jira] [Commented] (FLINK-7998) Make case classes in TPCHQuery3.java public to allow dynamic instantiation

2017-11-07 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241787#comment-16241787
 ] 

Chesnay Schepler commented on FLINK-7998:
-

Could you expand a bit more under which circumstances this is a problem? Is the 
example straight-up not working?

> Make case classes in TPCHQuery3.java public to allow dynamic instantiation
> --
>
> Key: FLINK-7998
> URL: https://issues.apache.org/jira/browse/FLINK-7998
> Project: Flink
>  Issue Type: Bug
>  Components: Examples
>Affects Versions: 1.3.2
>Reporter: Keren Zhu
>Priority: Minor
>  Labels: easyfix
>   Original Estimate: 5m
>  Remaining Estimate: 5m
>
> Case classes Lineitem, Customer and Order in the example TPCHQuery3.java are 
> set to private. This causes an IllegalAccessException because of the 
> reflection access check during dynamic class instantiation. Making them 
> public resolves the problem (which is what is implicitly suggested by _case 
> class_ in TPCHQuery3.scala)
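
A minimal sketch of the failure (hypothetical names, not the actual example 
code): reflectively instantiating a private nested class from outside fails 
the access check, even when its constructor is public.

{code}
import java.lang.reflect.Constructor;

class Holder {
	// Private nested class, like the private case classes in TPCHQuery3.java.
	private static class Lineitem {
		public Lineitem() {}
	}

	static Class<?> lineitemClass() {
		return Lineitem.class;
	}
}

public class ReflectionAccessSketch {
	public static void main(String[] args) throws Exception {
		Constructor<?> ctor = Holder.lineitemClass().getDeclaredConstructor();
		// Throws IllegalAccessException: Holder.Lineitem is private, so code
		// outside Holder cannot instantiate it reflectively. Declaring the
		// class public resolves this.
		Object instance = ctor.newInstance();
		System.out.println(instance);
	}
}
{code}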



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4960: Update version to 1.5-SNAPSHOT

2017-11-07 Thread Aitozi
Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/4960
  
🎉 


---


[jira] [Commented] (FLINK-7978) Kafka011 exactly-once Producer sporadically fails to commit under high parallelism

2017-11-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7978?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241783#comment-16241783
 ] 

ASF GitHub Bot commented on FLINK-7978:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4955#discussion_r149322930
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGenerator.java
 ---
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Class responsible for generating transactional ids to use when 
communicating with Kafka.
+ *
+ * It guarantees that:
+ * - generated ids to use will never clash with ids to use from different 
subtasks
+ * - generated ids to abort will never clash with ids to abort from 
different subtasks
+ * - generated ids to use will never clash with ids to abort from 
different subtasks
+ *
+ * In other words, any particular generated id will always be assigned 
to one and only one subtask.
+ */
+public class TransactionalIdsGenerator {
--- End diff --

ops, forgot about it


> Kafka011 exactly-once Producer sporadically fails to commit under high 
> parallelism
> --
>
> Key: FLINK-7978
> URL: https://issues.apache.org/jira/browse/FLINK-7978
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: Gary Yao
>Assignee: Piotr Nowojski
>Priority: Blocker
> Fix For: 1.4.0
>
>
> The Kafka011 exactly-once producer sporadically fails to commit/confirm the 
> first checkpoint. The behavior seems to be easier reproduced under high job 
> parallelism.
> *Logs/Stacktrace*
> {noformat}
> 10:24:35,347 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>- Completed checkpoint 1 (191029 bytes in 1435 ms).
> 10:24:35,349 INFO  
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction  - 
> FlinkKafkaProducer011 2/32 - checkpoint 1 complete, committing transaction 
> TransactionHolder{handle=KafkaTransactionState [transactionalId=Sink: 
> kafka-sink-1509787467330-12], transactionStartTime=1509787474588} from 
> checkpoint 1
> 10:24:35,349 INFO  
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction  - 
> FlinkKafkaProducer011 1/32 - checkpoint 1 complete, committing transaction 
> TransactionHolder{handle=KafkaTransactionState [transactionalId=Sink: 
> kafka-sink-1509787467330-8], transactionStartTime=1509787474393} from 
> checkpoint 1
> 10:24:35,349 INFO  
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction  - 
> FlinkKafkaProducer011 0/32 - checkpoint 1 complete, committing transaction 
> TransactionHolder{handle=KafkaTransactionState [transactionalId=Sink: 
> kafka-sink-1509787467330-4], transactionStartTime=1509787474448} from 
> checkpoint 1
> 10:24:35,350 INFO  
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction  - 
> FlinkKafkaProducer011 6/32 - checkpoint 1 complete, committing transaction 
> TransactionHolder{handle=KafkaTransactionState [transactionalId=Sink: 
> kafka-sink-1509787467330-34], transactionStartTime=1509787474742} from 
> checkpoint 1
> 10:24:35,350 INFO  
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction  - 
> FlinkKafkaProducer011 4/32 - checkpoint 1 complete, committing transaction 
> TransactionHolder{handle=KafkaTransactionState [transactionalId=Sink: 
> kafka-sink-1509787467330-23], transactionStartTime=1509787474777} from 
> checkpoint 1
> 10:24:35,353 INFO  
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction  - 
> 

[GitHub] flink issue #4960: Update version to 1.5-SNAPSHOT

2017-11-07 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/4960
  
Crazy realization is that there are 91 pom.xml files in Flink.
It's a large project :-)


---


[jira] [Commented] (FLINK-7996) Add support for (left.time = right.time) predicates to window join.

2017-11-07 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241767#comment-16241767
 ] 

Fabian Hueske commented on FLINK-7996:
--

Yes, {{left.time = right.time + C}} should be supported as well.

I'd suggest going for the easy case {{left.time = right.time}} first and 
including it in the 1.4 release. 
The case with a constant offset can be added later.

What do you think?

Best, Fabian

> Add support for (left.time = right.time) predicates to window join.
> ---
>
> Key: FLINK-7996
> URL: https://issues.apache.org/jira/browse/FLINK-7996
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Fabian Hueske
>Assignee: Xingcan Cui
>Priority: Critical
> Fix For: 1.4.0
>
>
> A common operation is to join the result of two window aggregations on the 
> same timestamp. 
> However, window joins do not support equality predicates on time attributes 
> such as {{left.time = right.time}} but require two range predicates such as 
> {{left.time >= right.time AND left.time <= right.time}}.
> This can be fixed in the translation code (the operator does not have to be 
> touched).
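
A hedged illustration of the proposed translation (not actual planner code): 
the equality predicate is rewritten into the two range predicates that the 
window join already supports, leaving the operator untouched.

{code}
// Query as the user writes it:
String withEquality =
	"SELECT * FROM L, R WHERE L.id = R.id AND L.t = R.t";

// Equivalent form the window join already accepts after translation:
String asRanges =
	"SELECT * FROM L, R WHERE L.id = R.id AND L.t >= R.t AND L.t <= R.t";
{code}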



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4960: Update version to 1.5-SNAPSHOT

2017-11-07 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/4960
  
:yellow_heart:

Very nice!


---


[jira] [Updated] (FLINK-7265) FileSystems should describe their kind and consistency level

2017-11-07 Thread Nico Kruber (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7265?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Kruber updated FLINK-7265:
---
Fix Version/s: (was: 1.4.0)
   1.5.0

> FileSystems should describe their kind and consistency level
> 
>
> Key: FLINK-7265
> URL: https://issues.apache.org/jira/browse/FLINK-7265
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.3.1
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.3.3, 1.5.0
>
>
> Currently, all {{FileSystem}} types look the same to Flink.
> However, certain operations should only be executed on certain kinds of file 
> systems.
> For example, it makes no sense to attempt to delete an empty parent directory 
> on S3, because there are no such things as directories, only hierarchical 
> naming in the keys (file names).
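
A minimal sketch of what such a self-description could look like (illustrative 
names; not claiming this is the Flink API):

{code}
/**
 * Lets a FileSystem declare what kind of store backs it, so callers can skip
 * operations that make no sense for that store.
 */
public enum FileSystemKind {

	/** A true hierarchical file system (e.g. POSIX, HDFS). */
	FILE_SYSTEM,

	/** An object store (e.g. S3): no real directories, only hierarchical key names. */
	OBJECT_STORE
}
{code}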



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7883) Stop fetching source before a cancel with savepoint

2017-11-07 Thread Antoine Philippot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241762#comment-16241762
 ] 

Antoine Philippot commented on FLINK-7883:
--

Hi [~aljoscha], is there any news on this subject? I would be interested in 
participating in this feature, considering that we really need to avoid 
reprocessing Kafka messages on each job restart and cannot afford to maintain 
our fork forever.


> Stop fetching source before a cancel with savepoint
> ---
>
> Key: FLINK-7883
> URL: https://issues.apache.org/jira/browse/FLINK-7883
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API, Kafka Connector, State Backends, 
> Checkpointing
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Antoine Philippot
>
> For a cancel-with-savepoint command, the JobManager triggers the cancel call 
> once the savepoint is finished, but during the savepoint execution the Kafka 
> source continues to poll new messages, which will not be part of the 
> savepoint and will be replayed on the next application start.
> A solution could be to stop fetching in the source stream task before 
> triggering the savepoint.
> I suggest adding an interface {{StoppableFetchingSourceFunction}} with a 
> method {{stopFetching}} that existing SourceFunction implementations could 
> implement.
> We can add a {{stopFetchingSource}} property in 
>  {{CheckpointOptions}} class to pass the desired behaviour from 
> {{JobManager.handleMessage(CancelJobWithSavepoint)}} to 
> {{SourceStreamTask.triggerCheckpoint}}
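
A sketch of the proposed interface (names taken from the proposal above; this 
is not an existing Flink API):

{code}
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public interface StoppableFetchingSourceFunction<T> extends SourceFunction<T> {

	/**
	 * Signals the source to stop fetching new records (e.g. stop polling
	 * Kafka) while keeping the task running, so that a savepoint triggered
	 * afterwards does not read records that would be replayed on the next
	 * application start.
	 */
	void stopFetching();
}
{code}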



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4955: [FLINK-7978][kafka] Ensure that transactional ids ...

2017-11-07 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4955#discussion_r149316761
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGeneratorTest.java
 ---
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link TransactionalIdsGenerator}.
+ */
+public class TransactionalIdsGeneratorTest {
+   private static final int POOL_SIZE = 3;
+   private static final int SAFE_SCALE_DOWN_FACTOR = 3;
+   private static final int SUBTASKS_COUNT = 5;
+
+   @Test
+   public void testGenerateIdsToUse() {
+   TransactionalIdsGenerator generator = new 
TransactionalIdsGenerator("test", 2, SUBTASKS_COUNT, POOL_SIZE, 
SAFE_SCALE_DOWN_FACTOR);
+
+   assertEquals(
+   new HashSet<>(Arrays.asList("test-42", "test-43", 
"test-44")),
+   generator.generateIdsToUse(36));
+   }
+
+   /**
+* Ids to abort and to use should never clash between subtasks.
+*/
+   @Test
+   public void testGeneratedIdsDoNotClash() {
+   List<Set<String>> idsToAbort = new ArrayList<>();
+   List<Set<String>> idsToUse = new ArrayList<>();
+
+   for (int subtask = 0; subtask < SUBTASKS_COUNT; subtask++) {
+   TransactionalIdsGenerator generator = new 
TransactionalIdsGenerator("test", subtask, SUBTASKS_COUNT, POOL_SIZE, 
SAFE_SCALE_DOWN_FACTOR);
+   idsToUse.add(generator.generateIdsToUse(0));
+   idsToAbort.add(generator.generateIdsToAbort());
+   }
+
+   for (int subtask1 = 0; subtask1 < SUBTASKS_COUNT; subtask1++) {
+   for (int subtask2 = 0; subtask2 < SUBTASKS_COUNT; 
subtask2++) {
+   if (subtask2 == subtask1) {
+   continue;
+   }
+   
assertIntersectionIsEmpty(idsToAbort.get(subtask2), idsToAbort.get(subtask1));
+   
assertIntersectionIsEmpty(idsToUse.get(subtask2), idsToUse.get(subtask1));
+   
assertIntersectionIsEmpty(idsToAbort.get(subtask2), idsToUse.get(subtask1));
+   }
+   }
+   }
+
+   private <T> void assertIntersectionIsEmpty(Set<T> first, Set<T> second) {
--- End diff --

Maybe `assertDisjoint`


---


[jira] [Commented] (FLINK-7978) Kafka011 exactly-once Producer sporadically fails to commit under high parallelism

2017-11-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7978?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241760#comment-16241760
 ] 

ASF GitHub Bot commented on FLINK-7978:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4955#discussion_r149316761
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGeneratorTest.java
 ---
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link TransactionalIdsGenerator}.
+ */
+public class TransactionalIdsGeneratorTest {
+   private static final int POOL_SIZE = 3;
+   private static final int SAFE_SCALE_DOWN_FACTOR = 3;
+   private static final int SUBTASKS_COUNT = 5;
+
+   @Test
+   public void testGenerateIdsToUse() {
+   TransactionalIdsGenerator generator = new 
TransactionalIdsGenerator("test", 2, SUBTASKS_COUNT, POOL_SIZE, 
SAFE_SCALE_DOWN_FACTOR);
+
+   assertEquals(
+   new HashSet<>(Arrays.asList("test-42", "test-43", 
"test-44")),
+   generator.generateIdsToUse(36));
+   }
+
+   /**
+* Ids to abort and to use should never clash between subtasks.
+*/
+   @Test
+   public void testGeneratedIdsDoNotClash() {
+   List<Set<String>> idsToAbort = new ArrayList<>();
+   List<Set<String>> idsToUse = new ArrayList<>();
+
+   for (int subtask = 0; subtask < SUBTASKS_COUNT; subtask++) {
+   TransactionalIdsGenerator generator = new 
TransactionalIdsGenerator("test", subtask, SUBTASKS_COUNT, POOL_SIZE, 
SAFE_SCALE_DOWN_FACTOR);
+   idsToUse.add(generator.generateIdsToUse(0));
+   idsToAbort.add(generator.generateIdsToAbort());
+   }
+
+   for (int subtask1 = 0; subtask1 < SUBTASKS_COUNT; subtask1++) {
+   for (int subtask2 = 0; subtask2 < SUBTASKS_COUNT; 
subtask2++) {
+   if (subtask2 == subtask1) {
+   continue;
+   }
+   
assertIntersectionIsEmpty(idsToAbort.get(subtask2), idsToAbort.get(subtask1));
+   
assertIntersectionIsEmpty(idsToUse.get(subtask2), idsToUse.get(subtask1));
+   
assertIntersectionIsEmpty(idsToAbort.get(subtask2), idsToUse.get(subtask1));
+   }
+   }
+   }
+
+   private <T> void assertIntersectionIsEmpty(Set<T> first, Set<T> second) {
--- End diff --

Maybe `assertDisjoint`


> Kafka011 exactly-once Producer sporadically fails to commit under high 
> parallelism
> --
>
> Key: FLINK-7978
> URL: https://issues.apache.org/jira/browse/FLINK-7978
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: Gary Yao
>Assignee: Piotr Nowojski
>Priority: Blocker
> Fix For: 1.4.0
>
>
> The Kafka011 exactly-once producer sporadically fails to commit/confirm the 
> first checkpoint. The behavior seems to be easier reproduced under high job 
> parallelism.
> *Logs/Stacktrace*
> {noformat}
> 10:24:35,347 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>- Completed checkpoint 1 (191029 bytes in 1435 ms).
> 10:24:35,349 INFO  
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction  - 
> 

[jira] [Commented] (FLINK-7978) Kafka011 exactly-once Producer sporadically fails to commit under high parallelism

2017-11-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7978?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241735#comment-16241735
 ] 

ASF GitHub Bot commented on FLINK-7978:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4955#discussion_r149312052
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGeneratorTest.java
 ---
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link TransactionalIdsGenerator}.
+ */
+public class TransactionalIdsGeneratorTest {
+   private static final int POOL_SIZE = 3;
+   private static final int SAFE_SCALE_DOWN_FACTOR = 3;
+   private static final int SUBTASKS_COUNT = 5;
+
+   @Test
+   public void testGenerateIdsToUse() {
+   TransactionalIdsGenerator generator = new 
TransactionalIdsGenerator("test", 2, SUBTASKS_COUNT, POOL_SIZE, 
SAFE_SCALE_DOWN_FACTOR);
+
+   assertEquals(
+   new HashSet<>(Arrays.asList("test-42", "test-43", 
"test-44")),
+   generator.generateIdsToUse(36));
+   }
+
+   /**
+* Ids to abort and to use should never clash between subtasks.
+*/
+   @Test
+   public void testGeneratedIdsDoNotClash() {
+   List<Set<String>> idsToAbort = new ArrayList<>();
+   List<Set<String>> idsToUse = new ArrayList<>();
+
+   for (int subtask = 0; subtask < SUBTASKS_COUNT; subtask++) {
+   TransactionalIdsGenerator generator = new 
TransactionalIdsGenerator("test", subtask, SUBTASKS_COUNT, POOL_SIZE, 
SAFE_SCALE_DOWN_FACTOR);
+   idsToUse.add(generator.generateIdsToUse(0));
+   idsToAbort.add(generator.generateIdsToAbort());
+   }
+
+   for (int subtask1 = 0; subtask1 < SUBTASKS_COUNT; subtask1++) {
+   for (int subtask2 = 0; subtask2 < SUBTASKS_COUNT; 
subtask2++) {
+   if (subtask2 == subtask1) {
+   continue;
+   }
+   
assertIntersectionIsEmpty(idsToAbort.get(subtask2), idsToAbort.get(subtask1));
+   
assertIntersectionIsEmpty(idsToUse.get(subtask2), idsToUse.get(subtask1));
+   
assertIntersectionIsEmpty(idsToAbort.get(subtask2), idsToUse.get(subtask1));
--- End diff --

what about 
`assertIntersectionIsEmpty(idsToUse.get(subtask2), 
idsToAbort.get(subtask1));`?


> Kafka011 exactly-once Producer sporadically fails to commit under high 
> parallelism
> --
>
> Key: FLINK-7978
> URL: https://issues.apache.org/jira/browse/FLINK-7978
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: Gary Yao
>Assignee: Piotr Nowojski
>Priority: Blocker
> Fix For: 1.4.0
>
>
> The Kafka011 exactly-once producer sporadically fails to commit/confirm the 
> first checkpoint. The behavior seems to be easier reproduced under high job 
> parallelism.
> *Logs/Stacktrace*
> {noformat}
> 10:24:35,347 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>- Completed checkpoint 1 (191029 bytes in 1435 ms).
> 10:24:35,349 INFO  
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction  - 
> FlinkKafkaProducer011 2/32 - checkpoint 1 complete, 

[GitHub] flink pull request #4955: [FLINK-7978][kafka] Ensure that transactional ids ...

2017-11-07 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4955#discussion_r149312052
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGeneratorTest.java
 ---
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link TransactionalIdsGenerator}.
+ */
+public class TransactionalIdsGeneratorTest {
+   private static final int POOL_SIZE = 3;
+   private static final int SAFE_SCALE_DOWN_FACTOR = 3;
+   private static final int SUBTASKS_COUNT = 5;
+
+   @Test
+   public void testGenerateIdsToUse() {
+   TransactionalIdsGenerator generator = new 
TransactionalIdsGenerator("test", 2, SUBTASKS_COUNT, POOL_SIZE, 
SAFE_SCALE_DOWN_FACTOR);
+
+   assertEquals(
+   new HashSet<>(Arrays.asList("test-42", "test-43", 
"test-44")),
+   generator.generateIdsToUse(36));
+   }
+
+   /**
+* Ids to abort and to use should never clash between subtasks.
+*/
+   @Test
+   public void testGeneratedIdsDoNotClash() {
+   List<Set<String>> idsToAbort = new ArrayList<>();
+   List<Set<String>> idsToUse = new ArrayList<>();
+
+   for (int subtask = 0; subtask < SUBTASKS_COUNT; subtask++) {
+   TransactionalIdsGenerator generator = new 
TransactionalIdsGenerator("test", subtask, SUBTASKS_COUNT, POOL_SIZE, 
SAFE_SCALE_DOWN_FACTOR);
+   idsToUse.add(generator.generateIdsToUse(0));
+   idsToAbort.add(generator.generateIdsToAbort());
+   }
+
+   for (int subtask1 = 0; subtask1 < SUBTASKS_COUNT; subtask1++) {
+   for (int subtask2 = 0; subtask2 < SUBTASKS_COUNT; 
subtask2++) {
+   if (subtask2 == subtask1) {
+   continue;
+   }
+   
assertIntersectionIsEmpty(idsToAbort.get(subtask2), idsToAbort.get(subtask1));
+   
assertIntersectionIsEmpty(idsToUse.get(subtask2), idsToUse.get(subtask1));
+   
assertIntersectionIsEmpty(idsToAbort.get(subtask2), idsToUse.get(subtask1));
--- End diff --

what about 
`assertIntersectionIsEmpty(idsToUse.get(subtask2), 
idsToAbort.get(subtask1));`?


---


[jira] [Commented] (FLINK-7878) Extend the resource type user can define in ResourceSpec

2017-11-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241752#comment-16241752
 ] 

ASF GitHub Bot commented on FLINK-7878:
---

Github user shuai-xu commented on the issue:

https://github.com/apache/flink/pull/4911
  
@tillrohrmann, there is no generic way for both YARN and Mesos, as their 
resource allocation interfaces are different. I think the YARN/Mesos resource 
managers should handle it in their own way. For example, the 
YarnResourceManager can add all extended resources to the YARN Resource class 
by calling setResourceValue(name, value). Then, as soon as YARN supports a new 
resource type, users can define it without any code change in Flink. 
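
A hedged sketch of what an extensible spec could look like (illustrative only, 
not the actual Flink ResourceSpec): extended resources live in a name-to-value 
map, so a new resource type needs no Flink code change.

{code}
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class ExtensibleResourceSpec {

	private final double cpuCores;
	private final int heapMemoryMb;
	private final Map<String, Double> extendedResources = new HashMap<>();

	public ExtensibleResourceSpec(double cpuCores, int heapMemoryMb) {
		this.cpuCores = cpuCores;
		this.heapMemoryMb = heapMemoryMb;
	}

	/** E.g. setExtendedResource("GPU", 2.0); the name is passed through to YARN/Mesos. */
	public ExtensibleResourceSpec setExtendedResource(String name, double value) {
		extendedResources.put(name, value);
		return this;
	}

	public double getCpuCores() {
		return cpuCores;
	}

	public int getHeapMemoryMb() {
		return heapMemoryMb;
	}

	public Map<String, Double> getExtendedResources() {
		return Collections.unmodifiableMap(extendedResources);
	}
}
{code}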


> Extend the resource type user can define in ResourceSpec
> 
>
> Key: FLINK-7878
> URL: https://issues.apache.org/jira/browse/FLINK-7878
> Project: Flink
>  Issue Type: Improvement
>  Components: DataSet API, DataStream API
>Reporter: shuai.xu
>Assignee: shuai.xu
>  Labels: flip-6
>
> Currently, Flink only lets users define how much CPU and memory an operator 
> uses, but the resources in a cluster are varied. For example, an application 
> for image processing may need GPUs, while others may need FPGAs. 
> CPU and memory alone are not enough, and the set of resource types keeps 
> growing, so we need to make the ResourceSpec extensible.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4955: [FLINK-7978][kafka] Ensure that transactional ids ...

2017-11-07 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4955#discussion_r149315581
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGeneratorTest.java
 ---
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link TransactionalIdsGenerator}.
+ */
+public class TransactionalIdsGeneratorTest {
+   private static final int POOL_SIZE = 3;
+   private static final int SAFE_SCALE_DOWN_FACTOR = 3;
+   private static final int SUBTASKS_COUNT = 5;
+
+   @Test
+   public void testGenerateIdsToUse() {
+   TransactionalIdsGenerator generator = new 
TransactionalIdsGenerator("test", 2, SUBTASKS_COUNT, POOL_SIZE, 
SAFE_SCALE_DOWN_FACTOR);
+
+   assertEquals(
+   new HashSet<>(Arrays.asList("test-42", "test-43", 
"test-44")),
+   generator.generateIdsToUse(36));
+   }
+
+   /**
+* Ids to abort and to use should never clash between subtasks.
+*/
+   @Test
+   public void testGeneratedIdsDoNotClash() {
+   List<Set<String>> idsToAbort = new ArrayList<>();
+   List<Set<String>> idsToUse = new ArrayList<>();
+
+   for (int subtask = 0; subtask < SUBTASKS_COUNT; subtask++) {
+   TransactionalIdsGenerator generator = new 
TransactionalIdsGenerator("test", subtask, SUBTASKS_COUNT, POOL_SIZE, 
SAFE_SCALE_DOWN_FACTOR);
+   idsToUse.add(generator.generateIdsToUse(0));
+   idsToAbort.add(generator.generateIdsToAbort());
+   }
+
+   for (int subtask1 = 0; subtask1 < SUBTASKS_COUNT; subtask1++) {
+   for (int subtask2 = 0; subtask2 < SUBTASKS_COUNT; 
subtask2++) {
+   if (subtask2 == subtask1) {
+   continue;
+   }
+   
assertIntersectionIsEmpty(idsToAbort.get(subtask2), idsToAbort.get(subtask1));
+   
assertIntersectionIsEmpty(idsToUse.get(subtask2), idsToUse.get(subtask1));
+   
assertIntersectionIsEmpty(idsToAbort.get(subtask2), idsToUse.get(subtask1));
--- End diff --

ah you're right, thanks for the explanation :)


---


[jira] [Commented] (FLINK-7978) Kafka011 exactly-once Producer sporadically fails to commit under high parallelism

2017-11-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7978?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241751#comment-16241751
 ] 

ASF GitHub Bot commented on FLINK-7978:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4955#discussion_r149315581
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGeneratorTest.java
 ---
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link TransactionalIdsGenerator}.
+ */
+public class TransactionalIdsGeneratorTest {
+   private static final int POOL_SIZE = 3;
+   private static final int SAFE_SCALE_DOWN_FACTOR = 3;
+   private static final int SUBTASKS_COUNT = 5;
+
+   @Test
+   public void testGenerateIdsToUse() {
+      TransactionalIdsGenerator generator = new TransactionalIdsGenerator("test", 2, SUBTASKS_COUNT, POOL_SIZE, SAFE_SCALE_DOWN_FACTOR);
+
+      assertEquals(
+         new HashSet<>(Arrays.asList("test-42", "test-43", "test-44")),
+         generator.generateIdsToUse(36));
+   }
+
+   /**
+    * Ids to abort and to use should never clash between subtasks.
+    */
+   @Test
+   public void testGeneratedIdsDoNotClash() {
+      List<Set<String>> idsToAbort = new ArrayList<>();
+      List<Set<String>> idsToUse = new ArrayList<>();
+
+      for (int subtask = 0; subtask < SUBTASKS_COUNT; subtask++) {
+         TransactionalIdsGenerator generator = new TransactionalIdsGenerator("test", subtask, SUBTASKS_COUNT, POOL_SIZE, SAFE_SCALE_DOWN_FACTOR);
+         idsToUse.add(generator.generateIdsToUse(0));
+         idsToAbort.add(generator.generateIdsToAbort());
+      }
+
+      for (int subtask1 = 0; subtask1 < SUBTASKS_COUNT; subtask1++) {
+         for (int subtask2 = 0; subtask2 < SUBTASKS_COUNT; subtask2++) {
+            if (subtask2 == subtask1) {
+               continue;
+            }
+            assertIntersectionIsEmpty(idsToAbort.get(subtask2), idsToAbort.get(subtask1));
+            assertIntersectionIsEmpty(idsToUse.get(subtask2), idsToUse.get(subtask1));
+            assertIntersectionIsEmpty(idsToAbort.get(subtask2), idsToUse.get(subtask1));
--- End diff --

ah you're right, thanks for the explanation :)


> Kafka011 exactly-once Producer sporadically fails to commit under high 
> parallelism
> --
>
> Key: FLINK-7978
> URL: https://issues.apache.org/jira/browse/FLINK-7978
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: Gary Yao
>Assignee: Piotr Nowojski
>Priority: Blocker
> Fix For: 1.4.0
>
>
> The Kafka011 exactly-once producer sporadically fails to commit/confirm the 
> first checkpoint. The behavior seems to be easier reproduced under high job 
> parallelism.
> *Logs/Stacktrace*
> {noformat}
> 10:24:35,347 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>- Completed checkpoint 1 (191029 bytes in 1435 ms).
> 10:24:35,349 INFO  
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction  - 
> FlinkKafkaProducer011 2/32 - checkpoint 1 complete, committing transaction 
> 

[GitHub] flink issue #4911: [FLINK-7878] [api] make resource type extendible in Resou...

2017-11-07 Thread shuai-xu
Github user shuai-xu commented on the issue:

https://github.com/apache/flink/pull/4911
  
@tillrohrmann, there is no generic way that works for both YARN and Mesos, as their 
resource allocation interfaces are different. I think the YARN/Mesos resource 
managers should each handle it in their own way. For example, the YarnResourceManager 
can add all extended resources to the YARN Resource class by calling 
setResourceValue(name, value). Then, whenever YARN supports a new resource type, 
users can define it without any code changes in Flink. This is sketched below.
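
As a rough illustration of that idea (a sketch only: it assumes Hadoop 3.x's 
resource-types API, and `getExtendedResources()` is a hypothetical accessor, not 
necessarily the actual ResourceSpec method):
```
// Sketch: copy Flink's extended resources onto a YARN Resource.
// Assumes Hadoop 3.x, where Resource.setResourceValue(String, long) exists.
Resource yarnResource = Resource.newInstance(memoryInMB, numVcores);
for (Map.Entry<String, Long> extended : resourceSpec.getExtendedResources().entrySet()) {
   yarnResource.setResourceValue(extended.getKey(), extended.getValue());
}
```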


---


[jira] [Commented] (FLINK-7978) Kafka011 exactly-once Producer sporadically fails to commit under high parallelism

2017-11-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7978?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241749#comment-16241749
 ] 

ASF GitHub Bot commented on FLINK-7978:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4955#discussion_r149315173
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGeneratorTest.java
 ---
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link TransactionalIdsGenerator}.
+ */
+public class TransactionalIdsGeneratorTest {
+   private static final int POOL_SIZE = 3;
+   private static final int SAFE_SCALE_DOWN_FACTOR = 3;
+   private static final int SUBTASKS_COUNT = 5;
+
+   @Test
+   public void testGenerateIdsToUse() {
+      TransactionalIdsGenerator generator = new TransactionalIdsGenerator("test", 2, SUBTASKS_COUNT, POOL_SIZE, SAFE_SCALE_DOWN_FACTOR);
+
+      assertEquals(
+         new HashSet<>(Arrays.asList("test-42", "test-43", "test-44")),
+         generator.generateIdsToUse(36));
+   }
+
+   /**
+    * Ids to abort and to use should never clash between subtasks.
+    */
+   @Test
+   public void testGeneratedIdsDoNotClash() {
+      List<Set<String>> idsToAbort = new ArrayList<>();
+      List<Set<String>> idsToUse = new ArrayList<>();
+
+      for (int subtask = 0; subtask < SUBTASKS_COUNT; subtask++) {
+         TransactionalIdsGenerator generator = new TransactionalIdsGenerator("test", subtask, SUBTASKS_COUNT, POOL_SIZE, SAFE_SCALE_DOWN_FACTOR);
+         idsToUse.add(generator.generateIdsToUse(0));
+         idsToAbort.add(generator.generateIdsToAbort());
+      }
+
+      for (int subtask1 = 0; subtask1 < SUBTASKS_COUNT; subtask1++) {
+         for (int subtask2 = 0; subtask2 < SUBTASKS_COUNT; subtask2++) {
+            if (subtask2 == subtask1) {
+               continue;
+            }
+            assertIntersectionIsEmpty(idsToAbort.get(subtask2), idsToAbort.get(subtask1));
+            assertIntersectionIsEmpty(idsToUse.get(subtask2), idsToUse.get(subtask1));
+            assertIntersectionIsEmpty(idsToAbort.get(subtask2), idsToUse.get(subtask1));
--- End diff --

That will be covered by the second half of the looping (every time I start from 
index 0). It would only be necessary if the loop looked like:
```
for (int subtask1 = 0; subtask1 < SUBTASKS_COUNT; subtask1++) {
for (int subtask2 = subtask1; subtask2 < SUBTASKS_COUNT; subtask2++) {
```
instead of the current form:
```
for (int subtask1 = 0; subtask1 < SUBTASKS_COUNT; subtask1++) {
for (int subtask2 = 0; subtask2 < SUBTASKS_COUNT; subtask2++) {
```
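
(For reference, the `assertIntersectionIsEmpty` helper the test calls is cut off 
from the quoted diff; a minimal sketch of what such a helper could look like, given 
the test's `Collections` import, might be:)
```
// Hypothetical reconstruction, not the exact code from the PR.
// assertTrue comes from org.junit.Assert.
private void assertIntersectionIsEmpty(Set<String> first, Set<String> second) {
   // Collections.disjoint returns true iff the two sets share no element.
   assertTrue("ids clash between subtasks", Collections.disjoint(first, second));
}
```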


> Kafka011 exactly-once Producer sporadically fails to commit under high 
> parallelism
> --
>
> Key: FLINK-7978
> URL: https://issues.apache.org/jira/browse/FLINK-7978
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: Gary Yao
>Assignee: Piotr Nowojski
>Priority: Blocker
> Fix For: 1.4.0
>
>
> The Kafka011 exactly-once producer sporadically fails to commit/confirm the 
> first checkpoint. The behavior seems to be 

[jira] [Commented] (FLINK-7978) Kafka011 exactly-once Producer sporadically fails to commit under high parallelism

2017-11-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7978?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241748#comment-16241748
 ] 

ASF GitHub Bot commented on FLINK-7978:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4955#discussion_r149315106
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGenerator.java
 ---
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Class responsible for generating transactional ids to use when communicating with Kafka.
+ *
+ * It guarantees that:
+ * - generated ids to use will never clash with ids to use from different subtasks
+ * - generated ids to abort will never clash with ids to abort from different subtasks
+ * - generated ids to use will never clash with ids to abort from different subtasks
+ *
+ * In other words, any particular generated id will always be assigned to one and only one subtask.
+ */
+public class TransactionalIdsGenerator {
--- End diff --

It is better to use the unordered list tag, e.g.,
```
  * <ul>
  *   <li>generated ids to use will never clash with ids to use from different subtasks</li>
  *   <li>generated ids to abort will never clash with ids to abort from different subtasks</li>
  *   <li>generated ids to use will never clash with ids to abort from different subtasks</li>
  * </ul>
```
Otherwise it is rendered like this:

![image](https://user-images.githubusercontent.com/1681921/32487045-c97ffe8a-c3a8-11e7-95e8-f3ee127072b9.png)



> Kafka011 exactly-once Producer sporadically fails to commit under high 
> parallelism
> --
>
> Key: FLINK-7978
> URL: https://issues.apache.org/jira/browse/FLINK-7978
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: Gary Yao
>Assignee: Piotr Nowojski
>Priority: Blocker
> Fix For: 1.4.0
>
>
> The Kafka011 exactly-once producer sporadically fails to commit/confirm the 
> first checkpoint. The behavior seems to be easier reproduced under high job 
> parallelism.
> *Logs/Stacktrace*
> {noformat}
> 10:24:35,347 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>- Completed checkpoint 1 (191029 bytes in 1435 ms).
> 10:24:35,349 INFO  
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction  - 
> FlinkKafkaProducer011 2/32 - checkpoint 1 complete, committing transaction 
> TransactionHolder{handle=KafkaTransactionState [transactionalId=Sink: 
> kafka-sink-1509787467330-12], transactionStartTime=1509787474588} from 
> checkpoint 1
> 10:24:35,349 INFO  
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction  - 
> FlinkKafkaProducer011 1/32 - checkpoint 1 complete, committing transaction 
> TransactionHolder{handle=KafkaTransactionState [transactionalId=Sink: 
> kafka-sink-1509787467330-8], transactionStartTime=1509787474393} from 
> checkpoint 1
> 10:24:35,349 INFO  
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction  - 
> FlinkKafkaProducer011 0/32 - checkpoint 1 complete, committing transaction 
> TransactionHolder{handle=KafkaTransactionState [transactionalId=Sink: 
> kafka-sink-1509787467330-4], transactionStartTime=1509787474448} from 
> checkpoint 1
> 10:24:35,350 INFO  
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction  - 
> FlinkKafkaProducer011 6/32 - checkpoint 1 complete, committing transaction 
> TransactionHolder{handle=KafkaTransactionState [transactionalId=Sink: 
> kafka-sink-1509787467330-34], 


[jira] [Commented] (FLINK-6022) Improve support for Avro GenericRecord

2017-11-07 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241747#comment-16241747
 ] 

Stephan Ewen commented on FLINK-6022:
-

I think this can be resolved, because generic records and all non-specific 
records should go through the ReflectDatumReader/Writer.

However, it would be great if one of the people who originally opened the 
ticket could comment on this.

> Improve support for Avro GenericRecord
> --
>
> Key: FLINK-6022
> URL: https://issues.apache.org/jira/browse/FLINK-6022
> Project: Flink
>  Issue Type: Improvement
>  Components: Type Serialization System
>Reporter: Robert Metzger
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.4.0
>
>
> Currently, Flink is serializing the schema for each Avro GenericRecord in the 
> stream.
> This leads to a lot of overhead over the wire/disk + high serialization costs.
> Therefore, I'm proposing to improve the support for GenericRecord in Flink by 
> shipping the schema to each serializer  through the AvroTypeInformation.
> Then, we can only support GenericRecords with the same type per stream, but 
> the performance will be much better.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)



[jira] [Commented] (FLINK-7978) Kafka011 exactly-once Producer sporadically fails to commit under high parallelism

2017-11-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7978?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241736#comment-16241736
 ] 

ASF GitHub Bot commented on FLINK-7978:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4955#discussion_r149310451
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGenerator.java
 ---
@@ -19,27 +19,35 @@
 
 import java.util.HashSet;
 import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.LongStream;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Class responsible for generating transactional ids to use when communicating with Kafka.
+ *
+ * It guarantees that:
+ * - generated ids to use will never clash with ids to use from different subtasks
+ * - generated ids to abort will never clash with ids to abort from different subtasks
+ * - generated ids to use will never clash with ids to abort from different subtasks
+ *
+ * In other words, any particular generated id will always be assigned to one and only one subtask.
  */
 public class TransactionalIdsGenerator {
private final String prefix;
private final int subtaskIndex;
+   private final int totalNumberOfSubtasks;
private final int poolSize;
private final int safeScaleDownFactor;
 
public TransactionalIdsGenerator(
String prefix,
int subtaskIndex,
+   int totalNumberOfSubtasks,
int poolSize,
int safeScaleDownFactor) {
this.prefix = checkNotNull(prefix);
this.subtaskIndex = subtaskIndex;
+   this.totalNumberOfSubtasks = totalNumberOfSubtasks;
--- End diff --

Maybe we should at least add some argument checks for the subtask index and 
totalNumberOfSubtasks.
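
A possible shape for such checks, next to the existing `checkNotNull` (illustrative 
only; assumes `checkArgument` from the same Preconditions utility class):
```
// Hypothetical argument checks, not part of the quoted diff.
checkArgument(totalNumberOfSubtasks > 0, "totalNumberOfSubtasks must be positive");
checkArgument(subtaskIndex >= 0 && subtaskIndex < totalNumberOfSubtasks,
   "subtaskIndex must be in [0, totalNumberOfSubtasks)");
checkArgument(poolSize > 0, "poolSize must be positive");
```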


> Kafka011 exactly-once Producer sporadically fails to commit under high 
> parallelism
> --
>
> Key: FLINK-7978
> URL: https://issues.apache.org/jira/browse/FLINK-7978
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: Gary Yao
>Assignee: Piotr Nowojski
>Priority: Blocker
> Fix For: 1.4.0
>
>
> The Kafka011 exactly-once producer sporadically fails to commit/confirm the 
> first checkpoint. The behavior seems to be easier reproduced under high job 
> parallelism.
> *Logs/Stacktrace*
> {noformat}
> 10:24:35,347 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>- Completed checkpoint 1 (191029 bytes in 1435 ms).
> 10:24:35,349 INFO  
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction  - 
> FlinkKafkaProducer011 2/32 - checkpoint 1 complete, committing transaction 
> TransactionHolder{handle=KafkaTransactionState [transactionalId=Sink: 
> kafka-sink-1509787467330-12], transactionStartTime=1509787474588} from 
> checkpoint 1
> 10:24:35,349 INFO  
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction  - 
> FlinkKafkaProducer011 1/32 - checkpoint 1 complete, committing transaction 
> TransactionHolder{handle=KafkaTransactionState [transactionalId=Sink: 
> kafka-sink-1509787467330-8], transactionStartTime=1509787474393} from 
> checkpoint 1
> 10:24:35,349 INFO  
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction  - 
> FlinkKafkaProducer011 0/32 - checkpoint 1 complete, committing transaction 
> TransactionHolder{handle=KafkaTransactionState [transactionalId=Sink: 
> kafka-sink-1509787467330-4], transactionStartTime=1509787474448} from 
> checkpoint 1
> 10:24:35,350 INFO  
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction  - 
> FlinkKafkaProducer011 6/32 - checkpoint 1 complete, committing transaction 
> TransactionHolder{handle=KafkaTransactionState [transactionalId=Sink: 
> kafka-sink-1509787467330-34], transactionStartTime=1509787474742} from 
> checkpoint 1
> 10:24:35,350 INFO  
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction  - 
> FlinkKafkaProducer011 4/32 - checkpoint 1 complete, committing transaction 
> TransactionHolder{handle=KafkaTransactionState [transactionalId=Sink: 
> kafka-sink-1509787467330-23], transactionStartTime=1509787474777} from 
> checkpoint 1
> 10:24:35,353 INFO  
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction  - 
> FlinkKafkaProducer011 10/32 - checkpoint 1 complete, committing transaction 
> TransactionHolder{handle=KafkaTransactionState [transactionalId=Sink: 
> 


[jira] [Commented] (FLINK-7996) Add support for (left.time = right.time) predicates to window join.

2017-11-07 Thread Xingcan Cui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241732#comment-16241732
 ] 

Xingcan Cui commented on FLINK-7996:


Hi [~fhueske], thanks for opening this. IMO, conditions like {{left.time = 
right.time + C}} should also be valid, since they can be transformed into a window 
such as {{left.time >= right.time + C && left.time <= right.time + C}}. However, 
this seems somewhat contradictory with FLINK-7800, in which a row expression like 
{{left.attr = right.attr + C}} is always translated into a calculation followed by 
an equi-join (i.e., $1 = right.time + C and left.time = $1). I need to consider the 
two problems as a whole.

Best, Xingcan

> Add support for (left.time = right.time) predicates to window join.
> ---
>
> Key: FLINK-7996
> URL: https://issues.apache.org/jira/browse/FLINK-7996
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Fabian Hueske
>Assignee: Xingcan Cui
>Priority: Critical
> Fix For: 1.4.0
>
>
> A common operation is to join the result of two window aggregations on the 
> same timestamp. 
> However, window joins do not support equality predicates on time attributes 
> such as {{left.time = right.time}} but require two range predicates such as 
> {{left.time >= right.time AND left.time <= right.time}}.
> This can be fixed in the translation code (the operator does not have to be 
> touched).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6022) Improve support for Avro GenericRecord

2017-11-07 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241726#comment-16241726
 ] 

Aljoscha Krettek commented on FLINK-6022:
-

[~StephanEwen] We should probably open a follow-up issue and mark this one as 
resolved? Or move to 1.5.

> Improve support for Avro GenericRecord
> --
>
> Key: FLINK-6022
> URL: https://issues.apache.org/jira/browse/FLINK-6022
> Project: Flink
>  Issue Type: Improvement
>  Components: Type Serialization System
>Reporter: Robert Metzger
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.4.0
>
>
> Currently, Flink is serializing the schema for each Avro GenericRecord in the 
> stream.
> This leads to a lot of overhead over the wire/disk + high serialization costs.
> Therefore, I'm proposing to improve the support for GenericRecord in Flink by 
> shipping the schema to each serializer  through the AvroTypeInformation.
> Then, we can only support GenericRecords with the same type per stream, but 
> the performance will be much better.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4960: Update version to 1.5-SNAPSHOT

2017-11-07 Thread aljoscha
Github user aljoscha closed the pull request at:

https://github.com/apache/flink/pull/4960


---



[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-11-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241718#comment-16241718
 ] 

ASF GitHub Bot commented on FLINK-7076:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r149307271
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnWorkerNode.java ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
+
+import org.apache.hadoop.yarn.api.records.Container;
+
+import java.io.Serializable;
+
+/**
+ * A stored YARN worker, which contains the YARN container.
+ */
+public class YarnWorkerNode implements ResourceIDRetrievable, Serializable {
--- End diff --

I would suggest removing the type constraint in `ResourceManager`.
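
Illustratively, and assuming the bound in question is the `Serializable` one, the 
suggestion amounts to relaxing something like:
```
// Hypothetical before/after; the actual ResourceManager signature may differ.
class ResourceManager<WorkerType extends ResourceIDRetrievable & Serializable> { /* ... */ }
// relaxed to
class ResourceManager<WorkerType extends ResourceIDRetrievable> { /* ... */ }
```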


> Implement container release to support dynamic scaling
> --
>
> Key: FLINK-7076
> URL: https://issues.apache.org/jira/browse/FLINK-7076
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager
>Reporter: Till Rohrmann
>Assignee: Shuyi Chen
>  Labels: flip-6
>
> In order to support dynamic scaling, the {{YarnResourceManager}} has to be 
> able to dynamically free containers. We have to implement the 
> {{YarnResourceManager#stopWorker}} method.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)



[jira] [Commented] (FLINK-7878) Extend the resource type user can define in ResourceSpec

2017-11-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241715#comment-16241715
 ] 

ASF GitHub Bot commented on FLINK-7878:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4911
  
How do you pass the resource specification in a generic way to Yarn and 
Mesos? Is there some kind of interface defined for that?


> Extend the resource type user can define in ResourceSpec
> 
>
> Key: FLINK-7878
> URL: https://issues.apache.org/jira/browse/FLINK-7878
> Project: Flink
>  Issue Type: Improvement
>  Components: DataSet API, DataStream API
>Reporter: shuai.xu
>Assignee: shuai.xu
>  Labels: flip-6
>
> Now, flink only support user define how much CPU and MEM used in an operator, 
> but now the resource in a cluster is various. For example, an application for 
> image processing may need GPU, some others may need FPGA. 
> Only CPU and MEM is not enough, and the resource type is becoming more and 
> more, so we need to make the ResourSpec extendible.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7856) Port JobVertexBackPressureHandler to REST endpoint

2017-11-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241710#comment-16241710
 ] 

ASF GitHub Bot commented on FLINK-7856:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4893#discussion_r149305450
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandler.java
 ---
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
+import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStats;
+import org.apache.flink.runtime.rest.handler.legacy.backpressure.StackTraceSampleCoordinator;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobVertexBackPressureInfo;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Request handler for the job vertex back pressure.
+ */
+public class JobVertexBackPressureHandler extends AbstractExecutionGraphHandler<JobVertexBackPressureInfo, JobVertexMessageParameters> {
+   /** Back pressure stats tracker. */
+   private final BackPressureStatsTracker backPressureStatsTracker;
+
+   /** Time after which stats are considered outdated. */
+   private final int refreshInterval;
+
+   public JobVertexBackPressureHandler(
+         CompletableFuture<String> localRestAddress,
+         GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+         Time timeout,
+         MessageHeaders<EmptyRequestBody, JobVertexBackPressureInfo, JobVertexMessageParameters> messageHeaders,
+         ExecutionGraphCache executionGraphCache,
+         Executor executor,
+         Configuration clusterConfiguration) {
+      super(localRestAddress, leaderRetriever, timeout, messageHeaders, executionGraphCache, executor);
+
+      // Back pressure stats tracker config
+      this.refreshInterval = clusterConfiguration.getInteger(WebOptions.BACKPRESSURE_REFRESH_INTERVAL);
+      this.backPressureStatsTracker = new BackPressureStatsTracker(
+         new StackTraceSampleCoordinator(executor, 6),
+         clusterConfiguration.getInteger(WebOptions.BACKPRESSURE_CLEANUP_INTERVAL),
+         clusterConfiguration.getInteger(WebOptions.BACKPRESSURE_NUM_SAMPLES),
+         Time.milliseconds(clusterConfiguration.getInteger(WebOptions.BACKPRESSURE_DELAY)));
+   }
+
+   @Override
+   protected 

[jira] [Commented] (FLINK-7856) Port JobVertexBackPressureHandler to REST endpoint

2017-11-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241708#comment-16241708
 ] 

ASF GitHub Bot commented on FLINK-7856:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4893#discussion_r149305275
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfo.java
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.rest.handler.job.JobVertexBackPressureHandler;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Response type of the {@link JobVertexBackPressureHandler}.
+ */
+public class JobVertexBackPressureInfo implements ResponseBody {
+
+   public static final String FIELD_NAME_STATUS = "status";
+   public static final String FIELD_NAME_BACKPRESSURE_LEVEL = "backpressure-level";
+   public static final String FIELD_NAME_END_TIMESTAMP = "end-timestamp";
+   public static final String FIELD_NAME_SUBTASKS = "subtasks";
+
+   @JsonProperty(FIELD_NAME_STATUS)
+   private final String status;
+
+   @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL)
+   private final String backpressureLevel;
+
+   @JsonProperty(FIELD_NAME_END_TIMESTAMP)
+   private final Long endTimestamp;
+
+   @JsonProperty(FIELD_NAME_SUBTASKS)
+   private final List<SubtaskBackPressureInfo> subtasks;
+
+   @JsonCreator
+   public JobVertexBackPressureInfo(
+         @JsonProperty(FIELD_NAME_STATUS) String status,
+         @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL) String backpressureLevel,
+         @JsonProperty(FIELD_NAME_END_TIMESTAMP) Long endTimestamp,
+         @JsonProperty(FIELD_NAME_SUBTASKS) List<SubtaskBackPressureInfo> subtasks) {
+      this.status = status;
+      this.backpressureLevel = backpressureLevel;
+      this.endTimestamp = endTimestamp;
+      this.subtasks = subtasks;
+   }
+
+   @Override
+   public boolean equals(Object o) {
+      if (this == o) {
+         return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+         return false;
+      }
+      JobVertexBackPressureInfo that = (JobVertexBackPressureInfo) o;
+      return Objects.equals(status, that.status) &&
+         Objects.equals(backpressureLevel, that.backpressureLevel) &&
+         Objects.equals(endTimestamp, that.endTimestamp) &&
+         Objects.equals(subtasks, that.subtasks);
+   }
+
+   @Override
+   public int hashCode() {
+      return Objects.hash(status, backpressureLevel, endTimestamp, subtasks);
+   }
+
+   //------------------------------------------------------------------------
+   // Static helper classes
+   //------------------------------------------------------------------------
+
+   /**
+    * Nested class to encapsulate the sub tasks back pressure.
+    */
+   public static final class SubtaskBackPressureInfo {
+
+      public static final String FIELD_NAME_STATUS = "subtask";
+      public static final String FIELD_NAME_BACKPRESSURE_LEVEL = "backpressure-level";
+      public static final String FIELD_NAME_RATIO = "ratio";
+
+      @JsonProperty(FIELD_NAME_STATUS)
+      private final int subtask;
+
+      @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL)
+      private final String backpressureLevel;
+
+      @JsonProperty(FIELD_NAME_RATIO)
+      private final double ratio;
+
+  

[jira] [Commented] (FLINK-7856) Port JobVertexBackPressureHandler to REST endpoint

2017-11-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241707#comment-16241707
 ] 

ASF GitHub Bot commented on FLINK-7856:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4893#discussion_r149305236
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfo.java
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.rest.handler.job.JobVertexBackPressureHandler;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Response type of the {@link JobVertexBackPressureHandler}.
+ */
+public class JobVertexBackPressureInfo implements ResponseBody {
+
+   public static final String FIELD_NAME_STATUS = "status";
+   public static final String FIELD_NAME_BACKPRESSURE_LEVEL = "backpressure-level";
+   public static final String FIELD_NAME_END_TIMESTAMP = "end-timestamp";
+   public static final String FIELD_NAME_SUBTASKS = "subtasks";
+
+   @JsonProperty(FIELD_NAME_STATUS)
+   private final String status;
+
+   @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL)
+   private final String backpressureLevel;
+
+   @JsonProperty(FIELD_NAME_END_TIMESTAMP)
+   private final Long endTimestamp;
+
+   @JsonProperty(FIELD_NAME_SUBTASKS)
+   private final List<SubtaskBackPressureInfo> subtasks;
+
+   @JsonCreator
+   public JobVertexBackPressureInfo(
+         @JsonProperty(FIELD_NAME_STATUS) String status,
+         @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL) String backpressureLevel,
+         @JsonProperty(FIELD_NAME_END_TIMESTAMP) Long endTimestamp,
+         @JsonProperty(FIELD_NAME_SUBTASKS) List<SubtaskBackPressureInfo> subtasks) {
+      this.status = status;
+      this.backpressureLevel = backpressureLevel;
+      this.endTimestamp = endTimestamp;
+      this.subtasks = subtasks;
+   }
+
+   @Override
+   public boolean equals(Object o) {
+      if (this == o) {
+         return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+         return false;
+      }
+      JobVertexBackPressureInfo that = (JobVertexBackPressureInfo) o;
+      return Objects.equals(status, that.status) &&
+         Objects.equals(backpressureLevel, that.backpressureLevel) &&
+         Objects.equals(endTimestamp, that.endTimestamp) &&
+         Objects.equals(subtasks, that.subtasks);
+   }
+
+   @Override
+   public int hashCode() {
+      return Objects.hash(status, backpressureLevel, endTimestamp, subtasks);
+   }
+
+   //------------------------------------------------------------------------
+   // Static helper classes
+   //------------------------------------------------------------------------
+
+   /**
+    * Nested class to encapsulate the sub tasks back pressure.
+    */
+   public static final class SubtaskBackPressureInfo {
+
+      public static final String FIELD_NAME_STATUS = "subtask";
+      public static final String FIELD_NAME_BACKPRESSURE_LEVEL = "backpressure-level";
+      public static final String FIELD_NAME_RATIO = "ratio";
+
+      @JsonProperty(FIELD_NAME_STATUS)
+      private final int subtask;
--- End diff --

is that the status or the subtask name? Adapt the field annotation 
accordingly.


> Port JobVertexBackPressureHandler to REST endpoint
> --
>
> 



[jira] [Commented] (FLINK-5633) ClassCastException: X cannot be cast to X when re-submitting a job.

2017-11-07 Thread Erik van Oosten (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241701#comment-16241701
 ] 

Erik van Oosten commented on FLINK-5633:


[~StephanEwen] We need to process 130K msg/s, so I guess that can be called often 
:). Our process is CPU bound and parsing Avro is ±15% of that, so any improvement 
means we can run with fewer machines.

For every message we create a new SpecificDatumReader. If I follow the code 
correctly, that should _not_ add much overhead; the Schema instances we pass to it 
_are_ cached.

We then call {{SpecificDatumReader.read}} to parse each Avro message. That call 
eventually ends up in {{SpecificData.newInstance}}, which creates a new instance of 
the target class. The constructor of that class is looked up in a cache that is 
declared {{static}}. I do not understand how instantiating a new {{SpecificData}} 
for every call to {{read}} would help, because it would still use the same cache. 
The code I pasted above also uses a constructor cache, but that cache is not 
{{static}}. Reversing the classloader order should also work.
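
To illustrate the difference (a sketch only, not the actual Avro or pasted code):
```
// Static cache: shared JVM-wide, so entries created under an old user-code
// classloader outlive a job resubmission.
private static final Map<Class<?>, Constructor<?>> STATIC_CTOR_CACHE = new ConcurrentHashMap<>();

// Non-static cache: scoped to the reader instance, so a resubmitted job
// starts fresh with the new classloader.
private final Map<Class<?>, Constructor<?>> ctorCache = new HashMap<>();

private Object newInstance(Class<?> clazz) throws ReflectiveOperationException {
   Constructor<?> ctor = ctorCache.get(clazz);
   if (ctor == null) {
      ctor = clazz.getDeclaredConstructor();
      ctor.setAccessible(true);
      ctorCache.put(clazz, ctor);
   }
   return ctor.newInstance();
}
```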

> ClassCastException: X cannot be cast to X when re-submitting a job.
> ---
>
> Key: FLINK-5633
> URL: https://issues.apache.org/jira/browse/FLINK-5633
> Project: Flink
>  Issue Type: Bug
>  Components: Job-Submission, YARN
>Affects Versions: 1.1.4
>Reporter: Giuliano Caliari
>Priority: Minor
>
> I’m running a job on my local cluster and the first time I submit the job 
> everything works but whenever I cancel and re-submit the same job it fails 
> with:
> {quote}
> org.apache.flink.client.program.ProgramInvocationException: The program 
> execution failed: Job execution failed.
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
>   at 
> org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:101)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:400)
>   at 
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
>   at 
> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:634)
>   at au.com.my.package.pTraitor.OneTrait.execute(Traitor.scala:147)
>   at 
> au.com.my.package.pTraitor.TraitorAppOneTrait$.delayedEndpoint$au$com$my$package$pTraitor$TraitorAppOneTrait$1(TraitorApp.scala:22)
>   at 
> au.com.my.package.pTraitor.TraitorAppOneTrait$delayedInit$body.apply(TraitorApp.scala:21)
>   at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
>   at 
> scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
>   at scala.App$$anonfun$main$1.apply(App.scala:76)
>   at scala.App$$anonfun$main$1.apply(App.scala:76)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at 
> scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
>   at scala.App$class.main(App.scala:76)
>   at 
> au.com.my.package.pTraitor.TraitorAppOneTrait$.main(TraitorApp.scala:21)
>   at au.com.my.package.pTraitor.TraitorAppOneTrait.main(TraitorApp.scala)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
>   at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:419)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:339)
>   at 
> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:831)
>   at org.apache.flink.client.CliFrontend.run(CliFrontend.java:256)
>   at 
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1073)
>   at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1120)
>   at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1117)
>   at 
> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:29)
>   at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1116)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:900)
>   at 
> 

[jira] [Commented] (FLINK-7856) Port JobVertexBackPressureHandler to REST endpoint

2017-11-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241702#comment-16241702
 ] 

ASF GitHub Bot commented on FLINK-7856:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4893#discussion_r149305060
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfo.java
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import 
org.apache.flink.runtime.rest.handler.job.JobVertexBackPressureHandler;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Response type of the {@link JobVertexBackPressureHandler}.
+ */
+public class JobVertexBackPressureInfo implements ResponseBody {
+
+   public static final String FIELD_NAME_STATUS = "status";
+   public static final String FIELD_NAME_BACKPRESSURE_LEVEL = 
"backpressure-level";
+   public static final String FIELD_NAME_END_TIMESTAMP = "end-timestamp";
+   public static final String FIELD_NAME_SUBTASKS = "subtasks";
+
+   @JsonProperty(FIELD_NAME_STATUS)
+   private final String status;
+
+   @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL)
+   private final String backpressureLevel;
+
+   @JsonProperty(FIELD_NAME_END_TIMESTAMP)
+   private final Long endTimestamp;
--- End diff --

No need for the object type here; we can use a primitive long.
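
For illustration, the change being asked for would look roughly like this
(a minimal sketch; the field and constant names are taken from the quoted diff):

    @JsonProperty(FIELD_NAME_END_TIMESTAMP)
    private final long endTimestamp;

Using the primitive avoids boxing and documents that the timestamp is always
present; the @JsonCreator constructor parameter would change to long as well.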


> Port JobVertexBackPressureHandler to REST endpoint
> --
>
> Key: FLINK-7856
> URL: https://issues.apache.org/jira/browse/FLINK-7856
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Fang Yong
>Assignee: Fang Yong
>
> Port JobVertexBackPressureHandler to REST endpoint



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4893: [FLINK-7856][flip6] Port JobVertexBackPressureHand...

2017-11-07 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4893#discussion_r149305184
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfo.java
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import 
org.apache.flink.runtime.rest.handler.job.JobVertexBackPressureHandler;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Response type of the {@link JobVertexBackPressureHandler}.
+ */
+public class JobVertexBackPressureInfo implements ResponseBody {
+
+   public static final String FIELD_NAME_STATUS = "status";
+   public static final String FIELD_NAME_BACKPRESSURE_LEVEL = 
"backpressure-level";
+   public static final String FIELD_NAME_END_TIMESTAMP = "end-timestamp";
+   public static final String FIELD_NAME_SUBTASKS = "subtasks";
+
+   @JsonProperty(FIELD_NAME_STATUS)
+   private final String status;
+
+   @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL)
+   private final String backpressureLevel;
+
+   @JsonProperty(FIELD_NAME_END_TIMESTAMP)
+   private final Long endTimestamp;
+
+   @JsonProperty(FIELD_NAME_SUBTASKS)
+   private final List<SubtaskBackPressureInfo> subtasks;
+
+   @JsonCreator
+   public JobVertexBackPressureInfo(
+   @JsonProperty(FIELD_NAME_STATUS) String status,
+   @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL) String 
backpressureLevel,
+   @JsonProperty(FIELD_NAME_END_TIMESTAMP) Long endTimestamp,
+   @JsonProperty(FIELD_NAME_SUBTASKS) 
List<SubtaskBackPressureInfo> subtasks) {
+   this.status = status;
+   this.backpressureLevel = backpressureLevel;
+   this.endTimestamp = endTimestamp;
+   this.subtasks = subtasks;
+   }
+
+   @Override
+   public boolean equals(Object o) {
+   if (this == o) {
+   return true;
+   }
+   if (o == null || getClass() != o.getClass()) {
+   return false;
+   }
+   JobVertexBackPressureInfo that = (JobVertexBackPressureInfo) o;
+   return Objects.equals(status, that.status) &&
+   Objects.equals(backpressureLevel, 
that.backpressureLevel) &&
+   Objects.equals(endTimestamp, that.endTimestamp) &&
+   Objects.equals(subtasks, that.subtasks);
+   }
+
+   @Override
+   public int hashCode() {
+   return Objects.hash(status, backpressureLevel, endTimestamp, 
subtasks);
+   }
+
+   
//-
+   // Static helper classes
+   
//-
+
+   /**
+* Nested class to encapsulate the subtasks' back pressure.
+*/
+   public static final class SubtaskBackPressureInfo {
+
+   public static final String FIELD_NAME_STATUS = "subtask";
+   public static final String FIELD_NAME_BACKPRESSURE_LEVEL = 
"backpressure-level";
+   public static final String FIELD_NAME_RATIO = "ratio";
+
+   @JsonProperty(FIELD_NAME_STATUS)
+   private final int subtask;
+
+   @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL)
+   private final String backpressureLevel;
--- End diff --

enum
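
A minimal sketch of what an enum-backed level could look like (the name
VertexBackPressureLevel and its constants are assumptions, not part of the
quoted diff; @JsonValue is Jackson's standard annotation for controlling the
serialized form):

    /** Hypothetical back pressure level enum; constants assumed from the web UI levels. */
    public enum VertexBackPressureLevel {
        OK, LOW, HIGH;

        @JsonValue
        public String toJson() {
            return name().toLowerCase();
        }
    }

The field and constructor parameter would then use VertexBackPressureLevel
instead of String, and Jackson can serialize it without a custom serializer.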


---


[GitHub] flink pull request #4893: [FLINK-7856][flip6] Port JobVertexBackPressureHand...

2017-11-07 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4893#discussion_r149305450
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandler.java
 ---
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import 
org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
+import 
org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStats;
+import 
org.apache.flink.runtime.rest.handler.legacy.backpressure.StackTraceSampleCoordinator;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobVertexBackPressureInfo;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Request handler for the job vertex back pressure.
+ */
+public class JobVertexBackPressureHandler extends 
AbstractExecutionGraphHandler<JobVertexBackPressureInfo, JobVertexMessageParameters> {
+   /** Back pressure stats tracker. */
+   private final BackPressureStatsTracker backPressureStatsTracker;
+
+   /** Time after which stats are considered outdated. */
+   private final int refreshInterval;
+
+   public JobVertexBackPressureHandler(
+   CompletableFuture<String> localRestAddress,
+   GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+   Time timeout,
+   MessageHeaders<EmptyRequestBody, JobVertexBackPressureInfo, JobVertexMessageParameters> messageHeaders,
+   ExecutionGraphCache executionGraphCache,
+   Executor executor,
+   Configuration clusterConfiguration) {
+   super(localRestAddress, leaderRetriever, timeout, 
messageHeaders, executionGraphCache, executor);
+
+   // Back pressure stats tracker config
+   this.refreshInterval = 
clusterConfiguration.getInteger(WebOptions.BACKPRESSURE_REFRESH_INTERVAL);
+   this.backPressureStatsTracker = new BackPressureStatsTracker(
+   new StackTraceSampleCoordinator(executor, 6),
+   
clusterConfiguration.getInteger(WebOptions.BACKPRESSURE_CLEANUP_INTERVAL),
+   
clusterConfiguration.getInteger(WebOptions.BACKPRESSURE_NUM_SAMPLES),
+   
Time.milliseconds(clusterConfiguration.getInteger(WebOptions.BACKPRESSURE_DELAY)));
+   }
+
+   @Override
+   protected JobVertexBackPressureInfo handleRequest(
HandlerRequest<EmptyRequestBody, JobVertexMessageParameters> request, 
AccessExecutionGraph executionGraph) throws RestHandlerException {
+   JobVertexID jobVertexID = 

[GitHub] flink pull request #4893: [FLINK-7856][flip6] Port JobVertexBackPressureHand...

2017-11-07 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4893#discussion_r149305096
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfo.java
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import 
org.apache.flink.runtime.rest.handler.job.JobVertexBackPressureHandler;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Response type of the {@link JobVertexBackPressureHandler}.
+ */
+public class JobVertexBackPressureInfo implements ResponseBody {
+
+   public static final String FIELD_NAME_STATUS = "status";
+   public static final String FIELD_NAME_BACKPRESSURE_LEVEL = 
"backpressure-level";
+   public static final String FIELD_NAME_END_TIMESTAMP = "end-timestamp";
+   public static final String FIELD_NAME_SUBTASKS = "subtasks";
+
+   @JsonProperty(FIELD_NAME_STATUS)
+   private final String status;
+
+   @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL)
+   private final String backpressureLevel;
+
+   @JsonProperty(FIELD_NAME_END_TIMESTAMP)
+   private final Long endTimestamp;
+
+   @JsonProperty(FIELD_NAME_SUBTASKS)
+   private final List<SubtaskBackPressureInfo> subtasks;
+
+   @JsonCreator
+   public JobVertexBackPressureInfo(
+   @JsonProperty(FIELD_NAME_STATUS) String status,
+   @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL) String 
backpressureLevel,
+   @JsonProperty(FIELD_NAME_END_TIMESTAMP) Long endTimestamp,
+   @JsonProperty(FIELD_NAME_SUBTASKS) 
List<SubtaskBackPressureInfo> subtasks) {
+   this.status = status;
+   this.backpressureLevel = backpressureLevel;
+   this.endTimestamp = endTimestamp;
+   this.subtasks = subtasks;
--- End diff --

null checks are missing.
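
A sketch of the missing checks, assuming the reviewer has Flink's
org.apache.flink.util.Preconditions utility in mind:

    import static org.apache.flink.util.Preconditions.checkNotNull;

    // Inside the @JsonCreator constructor: fail fast on null arguments.
    this.status = checkNotNull(status);
    this.backpressureLevel = checkNotNull(backpressureLevel);
    this.endTimestamp = checkNotNull(endTimestamp);
    this.subtasks = checkNotNull(subtasks);

If endTimestamp becomes a primitive long, as suggested in another comment,
its null check disappears along with the boxing.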


---


[GitHub] flink pull request #4893: [FLINK-7856][flip6] Port JobVertexBackPressureHand...

2017-11-07 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4893#discussion_r149305060
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfo.java
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import 
org.apache.flink.runtime.rest.handler.job.JobVertexBackPressureHandler;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Response type of the {@link JobVertexBackPressureHandler}.
+ */
+public class JobVertexBackPressureInfo implements ResponseBody {
+
+   public static final String FIELD_NAME_STATUS = "status";
+   public static final String FIELD_NAME_BACKPRESSURE_LEVEL = 
"backpressure-level";
+   public static final String FIELD_NAME_END_TIMESTAMP = "end-timestamp";
+   public static final String FIELD_NAME_SUBTASKS = "subtasks";
+
+   @JsonProperty(FIELD_NAME_STATUS)
+   private final String status;
+
+   @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL)
+   private final String backpressureLevel;
+
+   @JsonProperty(FIELD_NAME_END_TIMESTAMP)
+   private final Long endTimestamp;
--- End diff --

No need for the object type here; we can use a primitive long.


---


[GitHub] flink pull request #4893: [FLINK-7856][flip6] Port JobVertexBackPressureHand...

2017-11-07 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4893#discussion_r149305275
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfo.java
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import 
org.apache.flink.runtime.rest.handler.job.JobVertexBackPressureHandler;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Response type of the {@link JobVertexBackPressureHandler}.
+ */
+public class JobVertexBackPressureInfo implements ResponseBody {
+
+   public static final String FIELD_NAME_STATUS = "status";
+   public static final String FIELD_NAME_BACKPRESSURE_LEVEL = 
"backpressure-level";
+   public static final String FIELD_NAME_END_TIMESTAMP = "end-timestamp";
+   public static final String FIELD_NAME_SUBTASKS = "subtasks";
+
+   @JsonProperty(FIELD_NAME_STATUS)
+   private final String status;
+
+   @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL)
+   private final String backpressureLevel;
+
+   @JsonProperty(FIELD_NAME_END_TIMESTAMP)
+   private final Long endTimestamp;
+
+   @JsonProperty(FIELD_NAME_SUBTASKS)
+   private final List<SubtaskBackPressureInfo> subtasks;
+
+   @JsonCreator
+   public JobVertexBackPressureInfo(
+   @JsonProperty(FIELD_NAME_STATUS) String status,
+   @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL) String 
backpressureLevel,
+   @JsonProperty(FIELD_NAME_END_TIMESTAMP) Long endTimestamp,
+   @JsonProperty(FIELD_NAME_SUBTASKS) 
List<SubtaskBackPressureInfo> subtasks) {
+   this.status = status;
+   this.backpressureLevel = backpressureLevel;
+   this.endTimestamp = endTimestamp;
+   this.subtasks = subtasks;
+   }
+
+   @Override
+   public boolean equals(Object o) {
+   if (this == o) {
+   return true;
+   }
+   if (o == null || getClass() != o.getClass()) {
+   return false;
+   }
+   JobVertexBackPressureInfo that = (JobVertexBackPressureInfo) o;
+   return Objects.equals(status, that.status) &&
+   Objects.equals(backpressureLevel, 
that.backpressureLevel) &&
+   Objects.equals(endTimestamp, that.endTimestamp) &&
+   Objects.equals(subtasks, that.subtasks);
+   }
+
+   @Override
+   public int hashCode() {
+   return Objects.hash(status, backpressureLevel, endTimestamp, 
subtasks);
+   }
+
+   
//-
+   // Static helper classes
+   
//-
+
+   /**
+* Nested class to encapsulate the subtasks' back pressure.
+*/
+   public static final class SubtaskBackPressureInfo {
+
+   public static final String FIELD_NAME_STATUS = "subtask";
+   public static final String FIELD_NAME_BACKPRESSURE_LEVEL = 
"backpressure-level";
+   public static final String FIELD_NAME_RATIO = "ratio";
+
+   @JsonProperty(FIELD_NAME_STATUS)
+   private final int subtask;
+
+   @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL)
+   private final String backpressureLevel;
+
+   @JsonProperty(FIELD_NAME_RATIO)
+   private final double ratio;
+
+   public SubtaskBackPressureInfo(
+   @JsonProperty(FIELD_NAME_STATUS) int subtask,
+   @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL) String 
backpressureLevel,
+   

[GitHub] flink pull request #4893: [FLINK-7856][flip6] Port JobVertexBackPressureHand...

2017-11-07 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4893#discussion_r149304998
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfo.java
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import 
org.apache.flink.runtime.rest.handler.job.JobVertexBackPressureHandler;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Response type of the {@link JobVertexBackPressureHandler}.
+ */
+public class JobVertexBackPressureInfo implements ResponseBody {
+
+   public static final String FIELD_NAME_STATUS = "status";
+   public static final String FIELD_NAME_BACKPRESSURE_LEVEL = 
"backpressure-level";
+   public static final String FIELD_NAME_END_TIMESTAMP = "end-timestamp";
+   public static final String FIELD_NAME_SUBTASKS = "subtasks";
+
+   @JsonProperty(FIELD_NAME_STATUS)
+   private final String status;
+
+   @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL)
+   private final String backpressureLevel;
--- End diff --

Same here, this should be an enum.


---


[jira] [Commented] (FLINK-7856) Port JobVertexBackPressureHandler to REST endpoint

2017-11-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241700#comment-16241700
 ] 

ASF GitHub Bot commented on FLINK-7856:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4893#discussion_r149304998
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfo.java
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import 
org.apache.flink.runtime.rest.handler.job.JobVertexBackPressureHandler;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Response type of the {@link JobVertexBackPressureHandler}.
+ */
+public class JobVertexBackPressureInfo implements ResponseBody {
+
+   public static final String FIELD_NAME_STATUS = "status";
+   public static final String FIELD_NAME_BACKPRESSURE_LEVEL = 
"backpressure-level";
+   public static final String FIELD_NAME_END_TIMESTAMP = "end-timestamp";
+   public static final String FIELD_NAME_SUBTASKS = "subtasks";
+
+   @JsonProperty(FIELD_NAME_STATUS)
+   private final String status;
+
+   @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL)
+   private final String backpressureLevel;
--- End diff --

Same here, this should be an enum.


> Port JobVertexBackPressureHandler to REST endpoint
> --
>
> Key: FLINK-7856
> URL: https://issues.apache.org/jira/browse/FLINK-7856
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Fang Yong
>Assignee: Fang Yong
>
> Port JobVertexBackPressureHandler to REST endpoint



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7856) Port JobVertexBackPressureHandler to REST endpoint

2017-11-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241705#comment-16241705
 ] 

ASF GitHub Bot commented on FLINK-7856:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4893#discussion_r149305184
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfo.java
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import 
org.apache.flink.runtime.rest.handler.job.JobVertexBackPressureHandler;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Response type of the {@link JobVertexBackPressureHandler}.
+ */
+public class JobVertexBackPressureInfo implements ResponseBody {
+
+   public static final String FIELD_NAME_STATUS = "status";
+   public static final String FIELD_NAME_BACKPRESSURE_LEVEL = 
"backpressure-level";
+   public static final String FIELD_NAME_END_TIMESTAMP = "end-timestamp";
+   public static final String FIELD_NAME_SUBTASKS = "subtasks";
+
+   @JsonProperty(FIELD_NAME_STATUS)
+   private final String status;
+
+   @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL)
+   private final String backpressureLevel;
+
+   @JsonProperty(FIELD_NAME_END_TIMESTAMP)
+   private final Long endTimestamp;
+
+   @JsonProperty(FIELD_NAME_SUBTASKS)
+   private final List<SubtaskBackPressureInfo> subtasks;
+
+   @JsonCreator
+   public JobVertexBackPressureInfo(
+   @JsonProperty(FIELD_NAME_STATUS) String status,
+   @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL) String 
backpressureLevel,
+   @JsonProperty(FIELD_NAME_END_TIMESTAMP) Long endTimestamp,
+   @JsonProperty(FIELD_NAME_SUBTASKS) 
List<SubtaskBackPressureInfo> subtasks) {
+   this.status = status;
+   this.backpressureLevel = backpressureLevel;
+   this.endTimestamp = endTimestamp;
+   this.subtasks = subtasks;
+   }
+
+   @Override
+   public boolean equals(Object o) {
+   if (this == o) {
+   return true;
+   }
+   if (o == null || getClass() != o.getClass()) {
+   return false;
+   }
+   JobVertexBackPressureInfo that = (JobVertexBackPressureInfo) o;
+   return Objects.equals(status, that.status) &&
+   Objects.equals(backpressureLevel, 
that.backpressureLevel) &&
+   Objects.equals(endTimestamp, that.endTimestamp) &&
+   Objects.equals(subtasks, that.subtasks);
+   }
+
+   @Override
+   public int hashCode() {
+   return Objects.hash(status, backpressureLevel, endTimestamp, 
subtasks);
+   }
+
+   
//-
+   // Static helper classes
+   
//-
+
+   /**
+* Nested class to encapsulate the subtasks' back pressure.
+*/
+   public static final class SubtaskBackPressureInfo {
+
+   public static final String FIELD_NAME_STATUS = "subtask";
+   public static final String FIELD_NAME_BACKPRESSURE_LEVEL = 
"backpressure-level";
+   public static final String FIELD_NAME_RATIO = "ratio";
+
+   @JsonProperty(FIELD_NAME_STATUS)
+   private final int subtask;
+
+   @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL)
+   private final String backpressureLevel;
--- End diff --

enum


> Port JobVertexBackPressureHandler to REST endpoint
> 

[jira] [Commented] (FLINK-7856) Port JobVertexBackPressureHandler to REST endpoint

2017-11-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241703#comment-16241703
 ] 

ASF GitHub Bot commented on FLINK-7856:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4893#discussion_r149305096
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfo.java
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import 
org.apache.flink.runtime.rest.handler.job.JobVertexBackPressureHandler;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Response type of the {@link JobVertexBackPressureHandler}.
+ */
+public class JobVertexBackPressureInfo implements ResponseBody {
+
+   public static final String FIELD_NAME_STATUS = "status";
+   public static final String FIELD_NAME_BACKPRESSURE_LEVEL = 
"backpressure-level";
+   public static final String FIELD_NAME_END_TIMESTAMP = "end-timestamp";
+   public static final String FIELD_NAME_SUBTASKS = "subtasks";
+
+   @JsonProperty(FIELD_NAME_STATUS)
+   private final String status;
+
+   @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL)
+   private final String backpressureLevel;
+
+   @JsonProperty(FIELD_NAME_END_TIMESTAMP)
+   private final Long endTimestamp;
+
+   @JsonProperty(FIELD_NAME_SUBTASKS)
+   private final List<SubtaskBackPressureInfo> subtasks;
+
+   @JsonCreator
+   public JobVertexBackPressureInfo(
+   @JsonProperty(FIELD_NAME_STATUS) String status,
+   @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL) String 
backpressureLevel,
+   @JsonProperty(FIELD_NAME_END_TIMESTAMP) Long endTimestamp,
+   @JsonProperty(FIELD_NAME_SUBTASKS) 
List<SubtaskBackPressureInfo> subtasks) {
+   this.status = status;
+   this.backpressureLevel = backpressureLevel;
+   this.endTimestamp = endTimestamp;
+   this.subtasks = subtasks;
--- End diff --

null checks are missing.


> Port JobVertexBackPressureHandler to REST endpoint
> --
>
> Key: FLINK-7856
> URL: https://issues.apache.org/jira/browse/FLINK-7856
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Fang Yong
>Assignee: Fang Yong
>
> Port JobVertexBackPressureHandler to REST endpoint



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4893: [FLINK-7856][flip6] Port JobVertexBackPressureHand...

2017-11-07 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4893#discussion_r149304880
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfo.java
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import 
org.apache.flink.runtime.rest.handler.job.JobVertexBackPressureHandler;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Response type of the {@link JobVertexBackPressureHandler}.
+ */
+public class JobVertexBackPressureInfo implements ResponseBody {
+
+   public static final String FIELD_NAME_STATUS = "status";
+   public static final String FIELD_NAME_BACKPRESSURE_LEVEL = 
"backpressure-level";
+   public static final String FIELD_NAME_END_TIMESTAMP = "end-timestamp";
+   public static final String FIELD_NAME_SUBTASKS = "subtasks";
+
+   @JsonProperty(FIELD_NAME_STATUS)
+   private final String status;
--- End diff --

This should be an enum, I think.
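
As with the level field, a status enum might look like this sketch (the name
VertexBackPressureStatus and its constants are assumptions, mirroring the
"ok"/"deprecated" strings the legacy handler returned):

    /** Hypothetical status enum for the back pressure response. */
    public enum VertexBackPressureStatus {
        DEPRECATED, OK;

        @JsonValue
        public String toJson() {
            return name().toLowerCase();
        }
    }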


---


[GitHub] flink issue #4893: [FLINK-7856][flip6] Port JobVertexBackPressureHandler to ...

2017-11-07 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4893
  
I think the plan sounds good @zjureel. We should however not do it in this 
PR. Thus, I would suggest that we create a `JobVertexBackPressureHandler` which 
directly inherits from `AbstractRestHandler` and for the time being returns an 
empty `JobVertexBackPressureInfo`. Then I would add the 
`BackPressureStatsTracker` to the `JobMaster` in a separate PR. What do you 
think?
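
A rough sketch of the interim handler being proposed (hypothetical code: the
AbstractRestHandler signatures are abbreviated, and the "empty"
JobVertexBackPressureInfo constructed here is a placeholder until the
BackPressureStatsTracker lands on the JobMaster):

    // Interim version: no stats tracker available yet, so always answer
    // with an empty back pressure info.
    @Override
    protected CompletableFuture<JobVertexBackPressureInfo> handleRequest(
            HandlerRequest<EmptyRequestBody, JobVertexMessageParameters> request,
            RestfulGateway gateway) {
        return CompletableFuture.completedFuture(
            new JobVertexBackPressureInfo(null, null, null, Collections.emptyList()));
    }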


---


[jira] [Commented] (FLINK-7856) Port JobVertexBackPressureHandler to REST endpoint

2017-11-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241698#comment-16241698
 ] 

ASF GitHub Bot commented on FLINK-7856:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4893
  
I think the plan sounds good @zjureel. We should however not do it in this 
PR. Thus, I would suggest that we create a `JobVertexBackPressureHandler` which 
directly inherits from `AbstractRestHandler` and for the time being returns an 
empty `JobVertexBackPressureInfo`. Then I would add the 
`BackPressureStatsTracker` to the `JobMaster` in a separate PR. What do you 
think?


> Port JobVertexBackPressureHandler to REST endpoint
> --
>
> Key: FLINK-7856
> URL: https://issues.apache.org/jira/browse/FLINK-7856
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Fang Yong
>Assignee: Fang Yong
>
> Port JobVertexBackPressureHandler to REST endpoint



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7856) Port JobVertexBackPressureHandler to REST endpoint

2017-11-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241699#comment-16241699
 ] 

ASF GitHub Bot commented on FLINK-7856:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4893#discussion_r149304880
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfo.java
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import 
org.apache.flink.runtime.rest.handler.job.JobVertexBackPressureHandler;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Response type of the {@link JobVertexBackPressureHandler}.
+ */
+public class JobVertexBackPressureInfo implements ResponseBody {
+
+   public static final String FIELD_NAME_STATUS = "status";
+   public static final String FIELD_NAME_BACKPRESSURE_LEVEL = 
"backpressure-level";
+   public static final String FIELD_NAME_END_TIMESTAMP = "end-timestamp";
+   public static final String FIELD_NAME_SUBTASKS = "subtasks";
+
+   @JsonProperty(FIELD_NAME_STATUS)
+   private final String status;
--- End diff --

This should be an enum, I think.


> Port JobVertexBackPressureHandler to REST endpoint
> --
>
> Key: FLINK-7856
> URL: https://issues.apache.org/jira/browse/FLINK-7856
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Fang Yong
>Assignee: Fang Yong
>
> Port JobVertexBackPressureHandler to REST endpoint



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-11-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241696#comment-16241696
 ] 

ASF GitHub Bot commented on FLINK-7076:
---

Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r149304158
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnWorkerNode.java ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import 
org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
+
+import org.apache.hadoop.yarn.api.records.Container;
+
+import java.io.Serializable;
+
+/**
+ * A stored YARN worker, which contains the YARN container.
+ */
+public class YarnWorkerNode implements ResourceIDRetrievable, Serializable 
{
--- End diff --

@tillrohrmann But in ResourceManager.java, WorkerType has to be 
Serializable (i.e., "ResourceManager<WorkerType extends Serializable>"). Do you 
have any recommendation?
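
For context, a minimal illustration of the constraint being discussed
(hypothetical names; this is a paraphrase of the bound, not Flink's exact
class declaration):

    // A bound like this forces every WorkerType, including YarnWorkerNode,
    // to be Serializable, which is why the class in the quoted diff
    // implements both ResourceIDRetrievable and Serializable.
    abstract class ResourceManagerSketch<WorkerType extends java.io.Serializable> {
        abstract WorkerType workerNode();
    }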


> Implement container release to support dynamic scaling
> --
>
> Key: FLINK-7076
> URL: https://issues.apache.org/jira/browse/FLINK-7076
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager
>Reporter: Till Rohrmann
>Assignee: Shuyi Chen
>  Labels: flip-6
>
> In order to support dynamic scaling, the {{YarnResourceManager}} has to be 
> able to dynamically free containers. We have to implement the 
> {{YarnResourceManager#stopWorker}} method.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4729: [FLINK-7076] [ResourceManager] implement YARN stop...

2017-11-07 Thread suez1224
Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r149304158
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnWorkerNode.java ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import 
org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
+
+import org.apache.hadoop.yarn.api.records.Container;
+
+import java.io.Serializable;
+
+/**
+ * A stored YARN worker, which contains the YARN container.
+ */
+public class YarnWorkerNode implements ResourceIDRetrievable, Serializable 
{
--- End diff --

@tillrohrmann But in ResourceManager.java, WorkerType has to be 
Serializable (i.e., "ResourceManager<WorkerType extends Serializable>"). Do you 
have any recommendation?


---


[GitHub] flink issue #4949: [FLINK-7866] [runtime] Weigh list of preferred locations ...

2017-11-07 Thread sihuazhou
Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/4949
  
Hi @tillrohrmann, could you please have a rough look at this? I'm not sure 
this is what you wanted.


---


[jira] [Commented] (FLINK-7866) Weigh list of preferred locations for scheduling

2017-11-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241691#comment-16241691
 ] 

ASF GitHub Bot commented on FLINK-7866:
---

Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/4949
  
Hi @tillrohrmann, could you please have a rough look at this? I'm not sure 
this is what you wanted.


> Weigh list of preferred locations for scheduling
> 
>
> Key: FLINK-7866
> URL: https://issues.apache.org/jira/browse/FLINK-7866
> Project: Flink
>  Issue Type: Improvement
>  Components: Scheduler
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Till Rohrmann
>Assignee: Sihua Zhou
> Fix For: 1.5.0
>
>
> [~sihuazhou] proposed to not only use the list of preferred locations to 
> decide where to schedule a task, but to also weigh the list according to how 
> often a location appeared and then select the location based on the weight. 
> That way, we would obtain better locality in some cases.
> Example:
> Preferred locations list: {{[location1, location2, location2]}}
> Weighted preferred locations list {{[(location2 , 2), (location1, 1)]}}
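
A minimal sketch of the proposed weighting (hypothetical helper; the element
type is kept generic since the real code would use the scheduler's location
type):

    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;

    final class PreferredLocationWeigher {
        /** Counts how often each location appears and sorts by descending weight. */
        static <L> List<Map.Entry<L, Long>> weigh(List<L> preferredLocations) {
            Map<L, Long> counts = preferredLocations.stream()
                .collect(Collectors.groupingBy(l -> l, Collectors.counting()));
            return counts.entrySet().stream()
                .sorted(Map.Entry.<L, Long>comparingByValue().reversed())
                .collect(Collectors.toList());
        }
    }

For the example above, weigh([location1, location2, location2]) yields
[(location2, 2), (location1, 1)], so location2 would be tried first.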



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4960: Update version to 1.5-SNAPSHOT

2017-11-07 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4960
  
+1


---


[GitHub] flink issue #4935: [Flink-7945][Metrics]Fix per partition-lag metr...

2017-11-07 Thread Aitozi
Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/4935
  
Hi @tzulitai, could you take a look at this again? :-)


---


[jira] [Commented] (FLINK-7652) Port CurrentJobIdsHandler to new REST endpoint

2017-11-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7652?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241669#comment-16241669
 ] 

ASF GitHub Bot commented on FLINK-7652:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/4734
  
@tillrohrmann yes, reusing `DispatcherGateway#requestJobDetails` was an 
option that did occur to me, but I was also not sure about the redundant cost. 
But sure, we can avoid adding yet another RPC for now.

Thanks for the branch link. I'll rebase onto that.


> Port CurrentJobIdsHandler to new REST endpoint
> --
>
> Key: FLINK-7652
> URL: https://issues.apache.org/jira/browse/FLINK-7652
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> Port existing {{CurrentJobIdsHandler}} to new REST endpoint



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4734: [FLINK-7652] [flip6] Port CurrentJobIdsHandler to new RES...

2017-11-07 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/4734
  
@tillrohrmann yes, reusing `DispatcherGateway#requestJobDetails` was an 
option that did occur to me, but I was also not sure about the redundant cost. 
But sure, we can avoid adding yet another RPC for now.

Thanks for the branch link. I'll rebase onto that.


---

