[jira] [Commented] (FLINK-7822) Add documentation for the new queryable state client.
[ https://issues.apache.org/jira/browse/FLINK-7822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241854#comment-16241854 ]

ASF GitHub Bot commented on FLINK-7822:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4966#discussion_r149337820

    --- Diff: docs/dev/stream/state/queryable_state.md ---
    @@ -32,38 +32,67 @@ under the License.
     likely that there will be breaking API changes on the client side in the upcoming Flink versions.

    -In a nutshell, this feature allows users to query Flink's managed partitioned state
    -(see [Working with State]({{ site.baseurl }}/dev/stream/state/state.html)) from outside of
    -Flink. For some scenarios, queryable state thus eliminates the need for distributed
    -operations/transactions with external systems such as key-value stores which are often the
    -bottleneck in practice.
    +In a nutshell, this feature exposes Flink's managed keyed (partitioned) state
    +(see [Working with State]({{ site.baseurl }}/dev/stream/state/state.html)) to the outside world and
    +allows the user to query a job's state from outside Flink. For some scenarios, queryable state
    +eliminates the need for distributed operations/transactions with external systems such as key-value
    +stores which are often the bottleneck in practice. In addition, this feature may be particularly
    +useful for debugging purposes.

    - Attention: Queryable state accesses keyed state from a concurrent thread rather
    - than synchronizing with the operator and potentially blocking its operation. Since any state
    - backend using Java heap space, e.g. MemoryStateBackend or
    - FsStateBackend, does not work with copies when retrieving values but instead directly
    - references the stored values, read-modify-write patterns are unsafe and may cause the
    - queryable state server to fail due to concurrent modifications.
    - The RocksDBStateBackend is safe from these issues.
    + Attention: When querying a state object, that object is accessed from a concurrent
    + thread without any synchronization or copying. This is a design choice, as any of the above would lead
    + to increased job latency, which we wanted to avoid. Since any state backend using Java heap space,
    + e.g. MemoryStateBackend or FsStateBackend, does not work
    + with copies when retrieving values but instead directly references the stored values, read-modify-write
    + patterns are unsafe and may cause the queryable state server to fail due to concurrent modifications.
    + The RocksDBStateBackend is safe from these issues.

    +## Architecture
    +
    +Before showing how to use the Queryable State, it is useful to briefly describe the entities that compose it.
    +The Queryable State consists of three main entities:
    +
    + 1. the `QueryableStateClient`, which (potentially) runs outside the Flink cluster and submits the user queries,
    + 2. the `QueryableStateClientProxy`, which runs on each `TaskManager` (*i.e.* inside the Flink cluster) and is responsible
    +    for receiving the client's queries, fetching the requested state on his behalf, and returning it to the client, and
    + 3. the `QueryableStateServer` which runs on each `TaskManager` and is responsible for serving the locally stored state.
    +
    +In a nutshell, the client will connect to one of the proxies and send a request for the state associated with a specific
    +key, `k`. As stated in [Working with State]({{ site.baseurl }}/dev/stream/state/state.html), keyed state is organized in
    +*Key Groups*, and each `TaskManager` is assigned a number of these key groups. To discover which `TaskManager` is
    +responsible for the key group holding `k`, the proxy will ask the `JobManager`. Based on the answer, the proxy will
    +then query the `QueryableStateServer` running on that `TaskManager` for the state associated with `k`, and forward the
    +response back to the client.
    +
    +## Activating Queryable State
    +
    +To enable queryable state on your Flink cluster, you just have to copy the
    +`flink-queryable-state-runtime{{ site.scala_version_suffix }}-{{site.version }}.jar`
    +from the `opt/` folder of your [Flink distribution](https://flink.apache.org/downloads.html "Apache Flink: Downloads"),
    +to the `lib/` folder. In other case, the queryable state feature is not enabled.
    +
    +To verify that your cluster is running with queryable state enabled, check the logs of any
    +task manager for the line: `"Started the Queryable State Proxy Server @ ..."`.
    +
     ## Making State Queryable

    -In order to make state queryable, the queryable state server first needs to be enabled globally
    -by setting the `query.server.enable` configuration parameter
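The key-to-TaskManager routing described in the Architecture section of the diff above (key → key group → owning parallel instance) can be sketched in a few lines. This is an illustrative simplification, not Flink's actual code: Flink derives the key group from a murmur-hashed `key.hashCode()` and assigns contiguous key-group ranges to the parallel instances; the use of Python's built-in `hash()` and the function names below are assumptions made for the sketch.

```python
# Simplified sketch of keyed-state routing -- NOT Flink's implementation.
# A key is hashed into one of `max_parallelism` key groups, and each
# parallel instance (standing in for a TaskManager here) owns a
# contiguous range of key groups.

def key_group_for(key, max_parallelism=128):
    # Assumption: Python's hash() stands in for Flink's murmur hash of key.hashCode().
    return hash(key) % max_parallelism

def operator_index_for(key_group, max_parallelism, parallelism):
    # Instance i owns key groups [i*max_parallelism/parallelism, (i+1)*max_parallelism/parallelism).
    return key_group * parallelism // max_parallelism

# The proxy would resolve a key roughly like this before querying the
# QueryableStateServer on the owning instance:
kg = key_group_for("some-key")
owner = operator_index_for(kg, max_parallelism=128, parallelism=4)
assert 0 <= owner < 4
```

With `max_parallelism=128` and `parallelism=4`, each instance owns a contiguous block of 32 key groups, which is what lets the proxy resolve ownership from the key group alone.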
[jira] [Commented] (FLINK-7822) Add documentation for the new queryable state client.
[ https://issues.apache.org/jira/browse/FLINK-7822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241856#comment-16241856 ]

ASF GitHub Bot commented on FLINK-7822:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4966#discussion_r149337649

    --- Diff: docs/dev/stream/state/queryable_state.md ---
    +## Activating Queryable State
    +
    +To enable queryable state on your Flink cluster, you just have to copy the

--- End diff --

nit: omit "the"

> Add documentation for the new queryable state client.
> -----------------------------------------------------
>
>                 Key: FLINK-7822
>                 URL: https://issues.apache.org/jira/browse/FLINK-7822
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Documentation, Queryable State
>    Affects Versions: 1.4.0
>            Reporter: Kostas Kloudas
>            Assignee: Kostas Kloudas
>            Priority: Blocker
>             Fix For: 1.4.0
>
> This has to include:
> 1) how to enable queryable state (jars)...
> 2) configuration parameters
> 3) new APIs
> 4) Guarantees/Semantics/Limitations (e.g. memory backend)

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
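The activation step this comment refers to — making the queryable-state runtime jar visible to the cluster — boils down to one copy from `opt/` to `lib/`. A sketch, assuming `FLINK_HOME` points at the distribution root; the exact jar name (Scala suffix and Flink version) varies per distribution, hence the glob:

```shell
#!/bin/sh
# Copy the queryable-state runtime jar from opt/ to lib/ so the
# TaskManagers pick it up on startup. FLINK_HOME is an assumption.
FLINK_HOME=${FLINK_HOME:-/opt/flink}

jar=$(ls "$FLINK_HOME"/opt/flink-queryable-state-runtime*.jar 2>/dev/null | head -n 1)
if [ -n "$jar" ]; then
    cp "$jar" "$FLINK_HOME/lib/"
    echo "copied $(basename "$jar") into $FLINK_HOME/lib/"
else
    echo "no queryable-state runtime jar found under $FLINK_HOME/opt/"
fi
```

After restarting the cluster, the task manager logs should contain the "Started the Queryable State Proxy Server" line mentioned in the documentation diff.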
[jira] [Commented] (FLINK-7822) Add documentation for the new queryable state client.
[ https://issues.apache.org/jira/browse/FLINK-7822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241852#comment-16241852 ]

ASF GitHub Bot commented on FLINK-7822:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4966#discussion_r149337260

    --- Diff: docs/dev/stream/state/queryable_state.md ---
    +## Architecture
    +
    +Before showing how to use the Queryable State, it is useful to briefly describe the entities that compose it.
    +The Queryable State consists of three main entities:

--- End diff --

nit: "The Queryable State feature"

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[GitHub] flink pull request #4966: [FLINK-7822][FLINK-7823] Adds documentation and fi...
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4966#discussion_r149337397

    --- Diff: docs/dev/stream/state/queryable_state.md ---
    + 1. the `QueryableStateClient`, which (potentially) runs outside the Flink cluster and submits the user queries,
    + 2. the `QueryableStateClientProxy`, which runs on each `TaskManager` (*i.e.* inside the Flink cluster) and is responsible
    +    for receiving the client's queries, fetching the requested state on his behalf, and returning it to the client, and

--- End diff --

maybe "fetching the requested state from the responsible TaskManager"

---
[GitHub] flink pull request #4966: [FLINK-7822][FLINK-7823] Adds documentation and fi...
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4966#discussion_r149337715

    --- Diff: docs/dev/stream/state/queryable_state.md ---
    +## Activating Queryable State
    +
    +To enable queryable state on your Flink cluster, you just have to copy the
    +`flink-queryable-state-runtime{{ site.scala_version_suffix }}-{{site.version }}.jar`
    +from the `opt/` folder of your [Flink distribution](https://flink.apache.org/downloads.html "Apache Flink: Downloads"),
    +to the `lib/` folder. In other case, the queryable state feature is not enabled.

--- End diff --

nit: "Otherwise, ..."

---
[jira] [Commented] (FLINK-7822) Add documentation for the new queryable state client.
[ https://issues.apache.org/jira/browse/FLINK-7822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241834#comment-16241834 ]

ASF GitHub Bot commented on FLINK-7822:
---------------------------------------

GitHub user kl0u opened a pull request:

    https://github.com/apache/flink/pull/4966

    [FLINK-7822][FLINK-7823] Adds documentation and fixes configuration of QS.

    ## What is the purpose of the change

    This PR adds documentation for the new queryable state.

    R @aljoscha

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kl0u/flink qs-doc

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4966.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #4966

commit 91c7082158e534ffc30894e9f23e6ea6c94fab07
Author: kkloudas
Date:   2017-11-06T11:43:18Z

    [FLINK-7823][QS] Update Queryable State configuration parameters.

commit 2a076d6119eb01a4b4bc48925a37dde1b439dd54
Author: kkloudas
Date:   2017-11-06T16:21:45Z

    [FLINK-7822][QS][doc] Update Queryable State docs.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[jira] [Created] (FLINK-8006) flink-daemon.sh: line 103: binary operator expected
Alejandro created FLINK-8006: - Summary: flink-daemon.sh: line 103: binary operator expected Key: FLINK-8006 URL: https://issues.apache.org/jira/browse/FLINK-8006 Project: Flink Issue Type: Bug Components: Startup Shell Scripts Affects Versions: 1.3.2 Environment: Linux 4.12.12-gentoo #2 SMP x86_64 Intel(R) Core(TM) i3-3110M CPU @ 2.40GHz GenuineIntel GNU/Linux Reporter: Alejandro When executing `./bin/start-local.sh` I get flink-1.3.2/bin/flink-daemon.sh: line 79: $pid: ambiguous redirect flink-1.3.2/bin/flink-daemon.sh: line 103: [: /tmp/flink-Alejandro: binary operator expected I solved the problem by replacing $pid with "$pid" in lines 79 and 103. Should I make a PR to the repo? -- This message was sent by Atlassian JIRA (v6.4.14#64029)
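The "binary operator expected" failure reported above is the classic unquoted-variable bug: when `$pid` expands to a value containing a space, word splitting hands `[` a malformed expression. A minimal reproduction, independent of Flink (the value `/tmp/flink Alejandro` is hypothetical; the report's actual path is only partially visible in the error message):

```shell
# Minimal reproduction of the unquoted-variable bug from flink-daemon.sh:
# a value containing a space splits into two words inside [ ... ].
pid="/tmp/flink Alejandro"       # hypothetical value containing a space

[ -f "$pid" ]                    # quoted: well-formed test (file merely absent)
echo "quoted exit code: $?"      # 1 = file does not exist

[ -f $pid ] 2>/dev/null          # unquoted: [ sees '-f' '/tmp/flink' 'Alejandro'
echo "unquoted exit code: $?"    # 2 = test syntax error
```

Exit status 2 is how `test`/`[` reports the "binary operator expected" syntax error, which is exactly what the quoting fix in lines 79 and 103 of the script addresses.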
[GitHub] flink pull request #4964: [hotfix][docs] Add type for numLateRecordsDropped ...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4964 ---
[jira] [Commented] (FLINK-8004) Sample code in Debugging & Monitoring Metrics documentation section is not valid java
[ https://issues.apache.org/jira/browse/FLINK-8004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241815#comment-16241815 ] ASF GitHub Bot commented on FLINK-8004: --- GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/4965 [FLINK-8004][metrics][docs] Fix usage examples ## What is the purpose of the change This PR fixes several issues in the metric usage examples in the documentation. ## Brief change log * replace wrong `@public` sections with a proper `@Override` and public modifier * streamline all examples to just return the input (required adjusting types) * added missing return statements * modified all examples to override all required methods (open() & map()) You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink 8004 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4965.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4965 commit 984473a58a70daf09d7d0cc2f409c05d391a4be1 Author: zentol Date: 2017-11-07T10:40:15Z [FLINK-8004][metrics][docs] Fix usage examples > Sample code in Debugging & Monitoring Metrics documentation section is not > valid java > -- > > Key: FLINK-8004 > URL: https://issues.apache.org/jira/browse/FLINK-8004 > Project: Flink > Issue Type: Improvement >Reporter: Andreas Hecke >Assignee: Chesnay Schepler >Priority: Minor > > Hi, we have been stumbled about some documentation inconsistencies in how to > use metrics in flink. 
Seems there is some invalid java code posted as samples > like having methods declared as @public and missing return statements, see > [here|https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/metrics.html#counter] > I raised a question on > [SO|https://stackoverflow.com/questions/47153424/what-does-public-modifier-mean-in-method-signature] > and Fabian asked me to open a Jira issue -- This message was sent by Atlassian JIRA (v6.4.14#64029)
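The fix described above boils down to making the samples valid Java: `@public` is not a Java construct (the sample needs the `@Override` annotation plus the `public` modifier), and a non-void `map()` must return a value. Below is a self-contained sketch of the corrected shape; `Counter` and `MapFunction` here are minimal stand-ins so the example compiles without Flink on the classpath (the real types live in `org.apache.flink.metrics` and Flink's `RichMapFunction`, where the counter is registered in `open()` via `getRuntimeContext().getMetricGroup()`).

```java
// Self-contained sketch of the corrected metrics sample; Counter and
// MapFunction are minimal stand-ins for the Flink types.
public class MetricsExample {

    // Stand-in for org.apache.flink.metrics.Counter.
    static class Counter {
        private long count;
        public void inc() { count++; }
        public long getCount() { return count; }
    }

    interface MapFunction<T, O> {
        O map(T value) throws Exception;
    }

    static class MyMapper implements MapFunction<String, String> {
        private final Counter counter = new Counter();

        @Override                 // the broken sample wrote "@public" here
        public String map(String value) {
            counter.inc();
            return value;         // the broken sample omitted this return
        }

        Counter getCounter() { return counter; }
    }

    public static void main(String[] args) throws Exception {
        MyMapper mapper = new MyMapper();
        mapper.map("a");
        mapper.map("b");
        System.out.println("processed: " + mapper.getCounter().getCount());  // processed: 2
    }
}
```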
[jira] [Updated] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues
[ https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gary Yao updated FLINK-8005: Fix Version/s: 1.4.0 > Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues > -- > > Key: FLINK-8005 > URL: https://issues.apache.org/jira/browse/FLINK-8005 > Project: Flink > Issue Type: Bug > Components: Core, Kafka Connector, State Backends, Checkpointing >Affects Versions: 1.4.0 >Reporter: Gary Yao >Priority: Blocker > Fix For: 1.4.0 > > > *Problem Description* > Classes in the user code jar cannot be loaded by the snapshot thread’s > context class loader ({{AppClassLoader}}). > For example, when creating instances of {{KafkaProducer}}, Strings are > resolved to class objects by Kafka. > Find below an extract from {{ConfigDef.java}}: > {code} > case CLASS: > if (value instanceof Class) > return value; > else if (value instanceof String) > return Class.forName(trimmed, true, > Utils.getContextOrKafkaClassLoader()); > else > throw new ConfigException(name, value, "Expected a Class instance or > class name."); > {code} > *Exception/Stacktrace* > {noformat} > Caused by: java.lang.Exception: Could not complete snapshot 1 for operator > Source: Collection Source -> Sink: kafka-sink-1510048188383 (1/1). > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1077) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1026) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:659) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:595) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:526) > ... 
7 more > Caused by: org.apache.kafka.common.config.ConfigException: Invalid value > org.apache.kafka.common.serialization.ByteArraySerializer for configuration > key.serializer: Class > org.apache.kafka.common.serialization.ByteArraySerializer could not be found. > at > org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:715) > at > org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:460) > at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:453) > at > org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:62) > at > org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:75) > at > org.apache.kafka.clients.producer.ProducerConfig.(ProducerConfig.java:360) > at > org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:288) > at > org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.(FlinkKafkaProducer.java:114) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initProducer(FlinkKafkaProducer011.java:913) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initTransactionalProducer(FlinkKafkaProducer011.java:904) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.createOrGetProducerFromPool(FlinkKafkaProducer011.java:637) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:613) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:94) > at > org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:359) > at > org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:294) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.snapshotState(FlinkKafkaProducer011.java:756) > at > 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118) > at > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:357) > ... 12 more > {noformat} > *How to reproduce* > Note that the problem only appears when a job is deployed on a cluster. > # Build Flink 1.4 > # Build test job
[jira] [Updated] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues
[ https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gary Yao updated FLINK-8005: Description: *Problem Description* Classes in the user code jar cannot be loaded by the snapshot thread’s context class loader ({{AppClassLoader}}). For example, when creating instances of {{KafkaProducer}}, Strings are resolved to class objects by Kafka. Find below an extract from {{ConfigDef.java}}: {code} case CLASS: if (value instanceof Class) return value; else if (value instanceof String) return Class.forName(trimmed, true, Utils.getContextOrKafkaClassLoader()); else throw new ConfigException(name, value, "Expected a Class instance or class name."); {code} *Exception/Stacktrace* {noformat} Caused by: java.lang.Exception: Could not complete snapshot 1 for operator Source: Collection Source -> Sink: kafka-sink-1510048188383 (1/1). at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379) at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1077) at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1026) at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:659) at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:595) at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:526) ... 7 more Caused by: org.apache.kafka.common.config.ConfigException: Invalid value org.apache.kafka.common.serialization.ByteArraySerializer for configuration key.serializer: Class org.apache.kafka.common.serialization.ByteArraySerializer could not be found. 
at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:715) at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:460) at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:453) at org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:62) at org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:75) at org.apache.kafka.clients.producer.ProducerConfig.(ProducerConfig.java:360) at org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:288) at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.(FlinkKafkaProducer.java:114) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initProducer(FlinkKafkaProducer011.java:913) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initTransactionalProducer(FlinkKafkaProducer011.java:904) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.createOrGetProducerFromPool(FlinkKafkaProducer011.java:637) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:613) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:94) at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:359) at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:294) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.snapshotState(FlinkKafkaProducer011.java:756) at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118) at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90) at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:357) ... 12 more {noformat} *How to reproduce* Note that the problem only appears when a job is deployed on a cluster. # Build Flink 1.4 # Build test job https://github.com/GJL/flink-kafka011-producer-test with {{mvn -o clean install -Pbuild-jar}} # Start job: {noformat} bin/flink run -c com.garyyao.StreamingJob /pathto/flink-kafka011-producer/target/flink-kafka011-producer-1.0-SNAPSHOT.jar {noformat} was: *Problem Description* Classes in the user code jar cannot be loaded by the snapshot thread’s context class loader. For example, when creating instances of {{KafkaProducer}}, Strings are resolved to class objects by Kafka. Find below an extract from {{ConfigDef.java}}: {code} case CLASS: if (value instanceof Class) return value; else if
[jira] [Commented] (FLINK-7419) Shade jackson dependency in flink-avro
[ https://issues.apache.org/jira/browse/FLINK-7419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241812#comment-16241812 ] ASF GitHub Bot commented on FLINK-7419: --- Github user zjureel closed the pull request at: https://github.com/apache/flink/pull/4524 > Shade jackson dependency in flink-avro > -- > > Key: FLINK-7419 > URL: https://issues.apache.org/jira/browse/FLINK-7419 > Project: Flink > Issue Type: Sub-task > Components: Build System >Reporter: Stephan Ewen >Assignee: Fang Yong >Priority: Blocker > Fix For: 1.4.0 > > > Avro uses {{org.codehaus.jackson}} which also exists in multiple > incompatible versions. We should shade it to > {{org.apache.flink.shaded.avro.org.codehaus.jackson}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
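The shading FLINK-7419 asks for is typically done with the maven-shade-plugin's relocation mechanism. A hedged sketch of the relevant fragment (the relocation target follows the pattern named in the issue; `org.codehaus.jackson` is the package Avro actually depends on):

```xml
<!-- maven-shade-plugin configuration fragment (illustrative): rewrite the
     jackson packages pulled in via Avro into Flink's shaded namespace. -->
<relocations>
  <relocation>
    <pattern>org.codehaus.jackson</pattern>
    <shadedPattern>org.apache.flink.shaded.avro.org.codehaus.jackson</shadedPattern>
  </relocation>
</relocations>
```

Relocation rewrites both the class files and all bytecode references to them, so two incompatible jackson versions can coexist on the same classpath.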
[jira] [Updated] (FLINK-8004) Sample code in Debugging & Monitoring Metrics documentation section is not valid java
[ https://issues.apache.org/jira/browse/FLINK-8004?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-8004: Description: Hi, we have been stumbled about some documentation inconsistencies in how to use metrics in flink. Seems there is some invalid java code posted as samples like having methods declared as @public and missing return statements, see [here|https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/metrics.html#counter] I raised a question on [SO|https://stackoverflow.com/questions/47153424/what-does-public-modifier-mean-in-method-signature] and Fabian asked me to open a Jira issue was: Hi, we have been stumbled about some documentation inconsistencies in how to use metrics in flink. Seems there is some invalid java code posted as samples like having methods declared as @public and missing return statements, see [here|https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/metrics.html#counter] I raised a question on [SO|https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/metrics.html#counter] and Fabian asked me to open a Jira issue > Sample code in Debugging & Monitoring Metrics documentation section is not > valid java > -- > > Key: FLINK-8004 > URL: https://issues.apache.org/jira/browse/FLINK-8004 > Project: Flink > Issue Type: Improvement >Reporter: Andreas Hecke >Assignee: Chesnay Schepler >Priority: Minor > > Hi, we have been stumbled about some documentation inconsistencies in how to > use metrics in flink. 
Seems there is some invalid java code posted as samples > like having methods declared as @public and missing return statements, see > [here|https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/metrics.html#counter] > I raised a question on > [SO|https://stackoverflow.com/questions/47153424/what-does-public-modifier-mean-in-method-signature] > and Fabian asked me to open a Jira issue -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Comment Edited] (FLINK-8004) Sample code in Debugging & Monitoring Metrics documentation section is not valid java
[ https://issues.apache.org/jira/browse/FLINK-8004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241798#comment-16241798 ] Chesnay Schepler edited comment on FLINK-8004 at 11/7/17 10:33 AM: --- -Could you add the SO link? Both your links point to the documentation.- Found it. was (Author: zentol): Could you add the SO link? Both your links point to the documentation. > Sample code in Debugging & Monitoring Metrics documentation section is not > valid java > -- > > Key: FLINK-8004 > URL: https://issues.apache.org/jira/browse/FLINK-8004 > Project: Flink > Issue Type: Improvement >Reporter: Andreas Hecke >Assignee: Chesnay Schepler >Priority: Minor > > Hi, we have been stumbled about some documentation inconsistencies in how to > use metrics in flink. Seems there is some invalid java code posted as samples > like having methods declared as @public and missing return statements, see > [here|https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/metrics.html#counter] > I raised a question on > [SO|https://stackoverflow.com/questions/47153424/what-does-public-modifier-mean-in-method-signature] > and Fabian asked me to open a Jira issue -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-8004) Sample code in Debugging & Monitoring Metrics documentation section is not valid java
Andreas Hecke created FLINK-8004: Summary: Sample code in Debugging & Monitoring Metrics documentation section is not valid java Key: FLINK-8004 URL: https://issues.apache.org/jira/browse/FLINK-8004 Project: Flink Issue Type: Improvement Reporter: Andreas Hecke Priority: Minor Hi, we have been stumbled about some documentation inconsistencies in how to use metrics in flink. Seems there is some invalid java code posted as samples like having methods declared as @public and missing return statements, see [here|https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/metrics.html#counter] I raised a question on [SO|https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/metrics.html#counter] and Fabian asked me to open a Jira issue -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7978) Kafka011 exactly-once Producer sporadically fails to commit under high parallelism
[ https://issues.apache.org/jira/browse/FLINK-7978?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241799#comment-16241799 ] ASF GitHub Bot commented on FLINK-7978: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/4955#discussion_r149326628 --- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGenerator.java --- @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.internal; + +import java.util.HashSet; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Class responsible for generating transactional ids to use when communicating with Kafka. + * + * It guarantees that: + * - generated ids to use will never clash with ids to use from different subtasks + * - generated ids to abort will never clash with ids to abort from different subtasks + * - generated ids to use will never clash with ids to abort from different subtasks + * + * In other words, any particular generated id will always be assigned to one and only one subtask. 
+ */ +public class TransactionalIdsGenerator { --- End diff -- another nitpick : `` tags are usually not closed in Javadoc: http://www.oracle.com/technetwork/articles/java/index-137868.html > Kafka011 exactly-once Producer sporadically fails to commit under high > parallelism > -- > > Key: FLINK-7978 > URL: https://issues.apache.org/jira/browse/FLINK-7978 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: Gary Yao >Assignee: Piotr Nowojski >Priority: Blocker > Fix For: 1.4.0 > > > The Kafka011 exactly-once producer sporadically fails to commit/confirm the > first checkpoint. The behavior seems to be easier reproduced under high job > parallelism. > *Logs/Stacktrace* > {noformat} > 10:24:35,347 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator >- Completed checkpoint 1 (191029 bytes in 1435 ms). > 10:24:35,349 INFO > org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction - > FlinkKafkaProducer011 2/32 - checkpoint 1 complete, committing transaction > TransactionHolder{handle=KafkaTransactionState [transactionalId=Sink: > kafka-sink-1509787467330-12], transactionStartTime=1509787474588} from > checkpoint 1 > 10:24:35,349 INFO > org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction - > FlinkKafkaProducer011 1/32 - checkpoint 1 complete, committing transaction > TransactionHolder{handle=KafkaTransactionState [transactionalId=Sink: > kafka-sink-1509787467330-8], transactionStartTime=1509787474393} from > checkpoint 1 > 10:24:35,349 INFO > org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction - > FlinkKafkaProducer011 0/32 - checkpoint 1 complete, committing transaction > TransactionHolder{handle=KafkaTransactionState [transactionalId=Sink: > kafka-sink-1509787467330-4], transactionStartTime=1509787474448} from > checkpoint 1 > 10:24:35,350 INFO > org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction - > FlinkKafkaProducer011 6/32 - 
checkpoint 1 complete, committing transaction > TransactionHolder{handle=KafkaTransactionState [transactionalId=Sink: > kafka-sink-1509787467330-34], transactionStartTime=1509787474742} from > checkpoint 1 > 10:24:35,350 INFO > org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction - > FlinkKafkaProducer011 4/32 - checkpoint 1 complete, committing transaction > TransactionHolder{handle=KafkaTransactionState [transactionalId=Sink: > kafka-sink-1509787467330-23], transactionStartTime=1509787474777} from > checkpoint 1 >
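The guarantee the `TransactionalIdsGenerator` Javadoc above describes — ids to use and ids to abort never clash across subtasks — can be obtained by carving disjoint integer ranges out of an id space per subtask. The sketch below is an assumed scheme for illustration, not Flink's actual implementation; the `poolSize` parameter and the `prefix-index` id format are invented here:

```java
import java.util.HashSet;
import java.util.Set;

// Assumed scheme (not Flink's actual code): subtask i owns the id range
// [i * poolSize, (i + 1) * poolSize), so the ranges of different subtasks
// are disjoint by construction and generated ids never clash across subtasks.
public class TransactionalIdRanges {

    static Set<String> idsForSubtask(String prefix, int subtaskIndex, int poolSize) {
        Set<String> ids = new HashSet<>();
        int start = subtaskIndex * poolSize;
        for (int i = start; i < start + poolSize; i++) {
            ids.add(prefix + "-" + i);   // hypothetical id format
        }
        return ids;
    }

    public static void main(String[] args) {
        Set<String> subtask0 = idsForSubtask("kafka-sink", 0, 5);
        Set<String> subtask1 = idsForSubtask("kafka-sink", 1, 5);
        subtask0.retainAll(subtask1);
        System.out.println("overlap: " + subtask0.size());   // overlap: 0
    }
}
```

Because each id belongs to exactly one range, any particular id is assigned to one and only one subtask, which is the property the Javadoc states.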
[jira] [Commented] (FLINK-8004) Sample code in Debugging & Monitoring Metrics documentation section is not valid java
[ https://issues.apache.org/jira/browse/FLINK-8004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241798#comment-16241798 ] Chesnay Schepler commented on FLINK-8004: - Could you add the SO link? Both your links point to the documentation. > Sample code in Debugging & Monitoring Metrics documentation section is not > valid java > -- > > Key: FLINK-8004 > URL: https://issues.apache.org/jira/browse/FLINK-8004 > Project: Flink > Issue Type: Improvement >Reporter: Andreas Hecke >Assignee: Chesnay Schepler >Priority: Minor > > Hi, we have been stumbled about some documentation inconsistencies in how to > use metrics in flink. Seems there is some invalid java code posted as samples > like having methods declared as @public and missing return statements, see > [here|https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/metrics.html#counter] > I raised a question on > [SO|https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/metrics.html#counter] > and Fabian asked me to open a Jira issue -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues
Gary Yao created FLINK-8005:
---
Summary: Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues
Key: FLINK-8005
URL: https://issues.apache.org/jira/browse/FLINK-8005
Project: Flink
Issue Type: Bug
Components: Core, Kafka Connector, State Backends, Checkpointing
Affects Versions: 1.4.0
Reporter: Gary Yao
Priority: Blocker

*Problem Description*
Classes in the user code jar cannot be loaded by the snapshot thread’s context class loader. For example, when creating instances of {{KafkaProducer}}, Strings are resolved to class objects by Kafka. Find below an extract from {{ConfigDef.java}}:
{code}
case CLASS:
    if (value instanceof Class)
        return value;
    else if (value instanceof String)
        return Class.forName(trimmed, true, Utils.getContextOrKafkaClassLoader());
    else
        throw new ConfigException(name, value, "Expected a Class instance or class name.");
{code}

*Exception/Stacktrace*
{noformat}
Caused by: java.lang.Exception: Could not complete snapshot 1 for operator Source: Collection Source -> Sink: kafka-sink-1510048188383 (1/1).
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379)
	at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1077)
	at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1026)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:659)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:595)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:526)
	... 7 more
Caused by: org.apache.kafka.common.config.ConfigException: Invalid value org.apache.kafka.common.serialization.ByteArraySerializer for configuration key.serializer: Class org.apache.kafka.common.serialization.ByteArraySerializer could not be found.
	at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:715)
	at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:460)
	at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:453)
	at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:62)
	at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:75)
	at org.apache.kafka.clients.producer.ProducerConfig.<init>(ProducerConfig.java:360)
	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:288)
	at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:114)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initProducer(FlinkKafkaProducer011.java:913)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initTransactionalProducer(FlinkKafkaProducer011.java:904)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.createOrGetProducerFromPool(FlinkKafkaProducer011.java:637)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:613)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:94)
	at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:359)
	at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:294)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.snapshotState(FlinkKafkaProducer011.java:756)
	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:357)
	... 12 more
{noformat}

*How to reproduce*
Note that the problem only appears when a job is deployed on a cluster.
# Build Flink 1.4
# Build test job https://github.com/GJL/flink-kafka011-producer-test with {{mvn -o clean install -Pbuild-jar}}
# Start job:
{noformat}
bin/flink run -c com.garyyao.StreamingJob /pathto/flink-kafka011-producer/target/flink-kafka011-producer-1.0-SNAPSHOT.jar
{noformat}

-- This message was sent by Atlassian JIRA (v6.4.14#64029)
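The failure above boils down to Kafka resolving serializer class names against the snapshot thread's context class loader, which cannot see the user-jar classes. The usual remedy is to pin the right loader around the offending call and restore the previous one afterwards. Below is a minimal, self-contained sketch of that pattern; the helper name and the `Supplier`-based signature are illustrative, not Flink's actual fix.

```java
import java.util.function.Supplier;

// Sketch: run an action with a specific context class loader, restoring the
// previous loader afterwards. In the scenario above, `cl` would be the
// user-code class loader that can see the classes in the job jar.
final class ContextClassLoaderUtil {

    private ContextClassLoaderUtil() {}

    static <T> T runWithContextClassLoader(ClassLoader cl, Supplier<T> action) {
        Thread current = Thread.currentThread();
        ClassLoader previous = current.getContextClassLoader();
        current.setContextClassLoader(cl);
        try {
            return action.get();
        } finally {
            // Always restore, even if the action throws.
            current.setContextClassLoader(previous);
        }
    }
}
```

With this in place, `Class.forName(name, true, Utils.getContextOrKafkaClassLoader())` inside the wrapped action resolves against the intended loader.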
[jira] [Assigned] (FLINK-8004) Sample code in Debugging & Monitoring Metrics documentation section is not valid java
[ https://issues.apache.org/jira/browse/FLINK-8004?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler reassigned FLINK-8004: --- Assignee: Chesnay Schepler > Sample code in Debugging & Monitoring Metrics documentation section is not > valid java > -- > > Key: FLINK-8004 > URL: https://issues.apache.org/jira/browse/FLINK-8004 > Project: Flink > Issue Type: Improvement >Reporter: Andreas Hecke >Assignee: Chesnay Schepler >Priority: Minor > > Hi, we have been stumbled about some documentation inconsistencies in how to > use metrics in flink. Seems there is some invalid java code posted as samples > like having methods declared as @public and missing return statements, see > [here|https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/metrics.html#counter] > I raised a question on > [SO|https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/metrics.html#counter] > and Fabian asked me to open a Jira issue -- This message was sent by Atlassian JIRA (v6.4.14#64029)
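For reference, the documented Counter sample fails to compile because of the stray `@public` modifiers and a missing return statement. The sketch below shows the corrected shape in self-contained form; `Counter`, `SimpleCounter`, and `MyMapper` are hypothetical stand-ins for Flink's `Counter` and `RichMapFunction` types so the example runs without Flink on the classpath.

```java
// Self-contained sketch of the corrected Counter sample. In Flink the counter
// would be registered in open() via getRuntimeContext().getMetricGroup();
// the types here are simplified stand-ins.
class CounterExample {

    interface Counter {
        void inc();
        long getCount();
    }

    static final class SimpleCounter implements Counter {
        private long count;
        @Override public void inc() { count++; }
        @Override public long getCount() { return count; }
    }

    static final class MyMapper {
        private final Counter counter = new SimpleCounter();

        // The docs sample was missing this return statement and used the
        // invalid `@public` modifier; plain `public`/package-private is correct.
        String map(String value) {
            counter.inc();
            return value;
        }

        long emittedCount() {
            return counter.getCount();
        }
    }
}
```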
[jira] [Commented] (FLINK-7773) Test instability in UtilsTest#testYarnFlinkResourceManagerJobManagerLostLeadership
[ https://issues.apache.org/jira/browse/FLINK-7773?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241788#comment-16241788 ] ASF GitHub Bot commented on FLINK-7773: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/4957 +1 > Test instability in > UtilsTest#testYarnFlinkResourceManagerJobManagerLostLeadership > -- > > Key: FLINK-7773 > URL: https://issues.apache.org/jira/browse/FLINK-7773 > Project: Flink > Issue Type: Bug > Components: Tests, YARN >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Till Rohrmann >Priority: Critical > Labels: test-stability > Fix For: 1.4.0 > > > {{UtilsTest#testYarnFlinkResourceManagerJobManagerLostLeadership}} may result > in the following exception (repeated run in IntelliJ until failure, but also > on Travis here: https://travis-ci.org/NicoK/flink/jobs/283696974 ) > {code} > org.apache.flink.yarn.UtilsTest "Until Failure" > org.mockito.exceptions.misusing.UnfinishedStubbingException: > Unfinished stubbing detected here: > -> at org.apache.flink.yarn.UtilsTest$1.(UtilsTest.java:171) > E.g. thenReturn() may be missing. > Examples of correct stubbing: > when(mock.isOk()).thenReturn(true); > when(mock.isOk()).thenThrow(exception); > doThrow(exception).when(mock).someVoidMethod(); > Hints: > 1. missing thenReturn() > 2. you are trying to stub a final method, you naughty developer! 
> 3: you are stubbing the behaviour of another mock inside before 'thenReturn' > instruction if completed > at org.apache.flink.yarn.UtilsTest$1.(UtilsTest.java:179) > at > org.apache.flink.yarn.UtilsTest.testYarnFlinkResourceManagerJobManagerLostLeadership(UtilsTest.java:95) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) > at org.junit.rules.RunRules.evaluate(RunRules.java:20) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at org.junit.runners.ParentRunner.run(ParentRunner.java:363) > at org.junit.runner.JUnitCore.run(JUnitCore.java:137) > at > 
com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68) > at > com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:67) > at > com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242) > at > com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70) > {code} > The incriminating code is this: > {code} > doAnswer(new Answer() { > @Override > public Object answer(InvocationOnMock invocation) throws Throwable { > Container container = (Container) invocation.getArguments()[0]; > resourceManagerGateway.tell(new > NotifyResourceStarted(YarnFlinkResourceManager.extractResourceID(container)), > leader1Gateway); > return null; > } > }).when(nodeManagerClient).startContainer(Matchers.any(Container.class), > Matchers.any(ContainerLaunchContext.class)); > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4957: [FLINK-7773] [tests] Move all mocking before testing code...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/4957 +1 ---
[jira] [Commented] (FLINK-8000) Sort REST handler URLs in RestServerEndpoint
[ https://issues.apache.org/jira/browse/FLINK-8000?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241794#comment-16241794 ] ASF GitHub Bot commented on FLINK-8000: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4958#discussion_r149325305 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java --- @@ -268,4 +287,69 @@ private static void registerHandler(Router router, Tuple2The comparator orders the Rest URLs such that URLs with path parameters are ordered behind +* those without parameters. E.g.: +* /jobs +* /jobs/overview +* /jobs/:jobid +* /jobs/:jobid/config +* /:* +* +* IMPORTANT: This comparator is highly specific to how Netty path parameter are encoded. Namely +* via a preceding ':' character. +*/ + static final class RestHandlerUrlComparator implements Comparator>, Serializable { + + private static final long serialVersionUID = 2388466767835547926L; + + private static final Comparator CASE_INSENSITIVE_ORDER = new CaseInsensitiveOrderComparator(); + + static final RestHandlerUrlComparator INSTANCE = new RestHandlerUrlComparator(); + + @Override + public int compare( + Tuple2 o1, + Tuple2 o2) { + return CASE_INSENSITIVE_ORDER.compare(o1.f0.getTargetRestEndpointURL(), o2.f0.getTargetRestEndpointURL()); + } + + static final class CaseInsensitiveOrderComparator implements Comparator, Serializable { + private static final long serialVersionUID = 8550835445193437027L; + + @Override + public int compare(String s1, String s2) { + int n1 = s1.length(); + int n2 = s2.length(); + int min = Math.min(n1, n2); + for (int i = 0; i < min; i++) { + char c1 = s1.charAt(i); + char c2 = s2.charAt(i); + if (c1 != c2) { + c1 = Character.toUpperCase(c1); + c2 = Character.toUpperCase(c2); + if (c1 != c2) { + c1 = Character.toLowerCase(c1); --- End diff -- why are we checking both lower case (example edge case)? 
> Sort REST handler URLs in RestServerEndpoint > > > Key: FLINK-8000 > URL: https://issues.apache.org/jira/browse/FLINK-8000 > Project: Flink > Issue Type: Improvement > Components: REST >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Minor > Labels: flip-6 > > In order to make the {{RestServerEndpoint}} more easily extendable, we should > automatically sort the returned list of rest handler when calling > {{RestServerEndpoint#initializeHandlers}}. That way the order in which the > handlers are added to the list is independent of the actual registration > order. This is, for example, important for the static file server which > always needs to be registered last. > I propose to add a special {{String}} {{Comparator}} which considers the > charactor {{':'}} to be the character with the largest value. That way we > should get always the following sort order: > - URLs without path parameters have precedence over similar URLs where parts > are replaced by path parameters (e.g. {{/jobs/overview}}, {{/jobs/:jobid}} > and {{/jobs/:jobid/config}}, {{/jobs/:jobid/vertices/:vertexId}}) > - Prefixes are sorted before URLs containing the prefix (e.g. {{/jobs}}, > {{/jobs/overview}}) -- This message was sent by Atlassian JIRA (v6.4.14#64029)
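The proposed ordering can be sketched as a small comparator: case-insensitive, except that {{':'}} (Netty's path-parameter marker) sorts after every other character, and a prefix sorts before any URL that extends it. This is an illustrative reimplementation of the rule described above, not the code from the pull request.

```java
import java.util.Comparator;

// Sketch: order REST URLs so that concrete segments precede parameterized
// ones (':jobid', ':*') and prefixes precede their extensions.
final class RestUrlOrder {

    static final Comparator<String> INSTANCE = (s1, s2) -> {
        int min = Math.min(s1.length(), s2.length());
        for (int i = 0; i < min; i++) {
            char c1 = Character.toLowerCase(s1.charAt(i));
            char c2 = Character.toLowerCase(s2.charAt(i));
            if (c1 == c2) {
                continue;
            }
            if (c1 == ':') {
                return 1;  // path parameter sorts behind any concrete character
            }
            if (c2 == ':') {
                return -1;
            }
            return Character.compare(c1, c2);
        }
        return Integer.compare(s1.length(), s2.length()); // shorter prefix first
    };

    private RestUrlOrder() {}
}
```

Sorting {{/jobs}}, {{/jobs/overview}}, {{/jobs/:jobid}}, {{/jobs/:jobid/config}}, {{/:*}} with this comparator yields exactly the order shown in the javadoc above, with the static file server's {{/:*}} last.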
[jira] [Resolved] (FLINK-7985) Update findbugs-maven-plugin version to 3.0.2
[ https://issues.apache.org/jira/browse/FLINK-7985?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler resolved FLINK-7985. - Resolution: Not A Problem Fix Version/s: (was: 1.5.0) > Update findbugs-maven-plugin version to 3.0.2 > - > > Key: FLINK-7985 > URL: https://issues.apache.org/jira/browse/FLINK-7985 > Project: Flink > Issue Type: Improvement > Components: Build System >Affects Versions: 1.4.0 >Reporter: Hai Zhou UTC+8 >Assignee: Hai Zhou UTC+8 > > The findbug version used by flink is pretty old (1.3.9). The old version of > Findbugs itself have some bugs (like > http://sourceforge.net/p/findbugs/bugs/918/, hit by HADOOP-10474). and the > latest version 3.0.2 fixed the "Missing test classes" issue > (https://github.com/gleclaire/findbugs-maven-plugin/issues/15). -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4955: [FLINK-7978][kafka] Ensure that transactional ids ...
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/4955#discussion_r149322930 --- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGenerator.java --- @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.internal; + +import java.util.HashSet; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Class responsible for generating transactional ids to use when communicating with Kafka. + * + * It guarantees that: + * - generated ids to use will never clash with ids to use from different subtasks + * - generated ids to abort will never clash with ids to abort from different subtasks + * - generated ids to use will never clash with ids to abort from different subtasks + * + * In other words, any particular generated id will always be assigned to one and only one subtask. + */ +public class TransactionalIdsGenerator { --- End diff -- ops, forgot about it ---
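The test quoted later in this thread (`generateIdsToUse(36)` yielding `test-42`, `test-43`, `test-44` for subtask 2 with pool size 3) suggests a simple block-partitioning scheme: each subtask claims a disjoint block of `poolSize` ids above a shared offset. The sketch below reconstructs that scheme to show why the disjointness guarantees hold; it is an inference from the quoted test, not the actual `TransactionalIdsGenerator` source.

```java
import java.util.HashSet;
import java.util.Set;

// Sketch: per-subtask block partitioning of transactional ids. With a shared
// offset, blocks of different subtasks never overlap, which gives the three
// "never clash" guarantees from the class javadoc.
class TransactionalIdsGeneratorSketch {
    private final String prefix;
    private final int subtaskIndex;
    private final int totalSubtasks;
    private final int poolSize;
    private final int safeScaleDownFactor;

    TransactionalIdsGeneratorSketch(String prefix, int subtaskIndex, int totalSubtasks,
                                    int poolSize, int safeScaleDownFactor) {
        this.prefix = prefix;
        this.subtaskIndex = subtaskIndex;
        this.totalSubtasks = totalSubtasks;
        this.poolSize = poolSize;
        this.safeScaleDownFactor = safeScaleDownFactor;
    }

    // Ids this subtask will use: a block of poolSize ids starting at
    // offset + subtaskIndex * poolSize.
    Set<String> generateIdsToUse(long nextFreeTransactionalId) {
        Set<String> ids = new HashSet<>();
        long start = nextFreeTransactionalId + (long) subtaskIndex * poolSize;
        for (int i = 0; i < poolSize; i++) {
            ids.add(prefix + "-" + (start + i));
        }
        return ids;
    }

    // Ids this subtask may need to abort after a scale-down: its own block in
    // each of the first safeScaleDownFactor "generations" of ids.
    Set<String> generateIdsToAbort() {
        Set<String> ids = new HashSet<>();
        for (int i = 0; i < safeScaleDownFactor; i++) {
            ids.addAll(generateIdsToUse((long) i * poolSize * totalSubtasks));
        }
        return ids;
    }
}
```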
[jira] [Commented] (FLINK-7998) Make case classes in TPCHQuery3.java public to allow dynamic instantiation
[ https://issues.apache.org/jira/browse/FLINK-7998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241787#comment-16241787 ] Chesnay Schepler commented on FLINK-7998: - Could you expand a bit more under which circumstances this is a problem? Is the example straight-up not working? > Make case classes in TPCHQuery3.java public to allow dynamic instantiation > -- > > Key: FLINK-7998 > URL: https://issues.apache.org/jira/browse/FLINK-7998 > Project: Flink > Issue Type: Bug > Components: Examples >Affects Versions: 1.3.2 >Reporter: Keren Zhu >Priority: Minor > Labels: easyfix > Original Estimate: 5m > Remaining Estimate: 5m > > Case classes Lineitem, Customer and Order in example TPCHQuery3.java are set > to private. This causes an IllegalAccessException exception because of > reflection check in dynamic class instantiation. Making them public resolves > the problem (which is what implicitly suggested by _case class_ in > TPCHQuery3.scala) -- This message was sent by Atlassian JIRA (v6.4.14#64029)
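The access failure is easy to reproduce without Flink: reflective instantiation of a private nested class from an unrelated caller class fails the runtime access check, while a public class works. `ReflectionDemo` and `Instantiator` below are hypothetical stand-ins for the example's case classes and for Flink's reflective instantiation, respectively.

```java
// Sketch: PrivateItem plays the role of the private Lineitem/Customer/Order
// classes in TPCHQuery3.java; Instantiator plays the role of the framework
// code that creates instances reflectively.
class ReflectionDemo {
    private static class PrivateItem {
        public PrivateItem() {}
    }

    public static class PublicItem {
        public PublicItem() {}
    }

    static Class<?> privateItemClass() { return PrivateItem.class; }
    static Class<?> publicItemClass()  { return PublicItem.class; }
}

class Instantiator {
    // Reflection performs the access check against this caller class, which
    // is not a nestmate of ReflectionDemo, so the private class fails with
    // IllegalAccessException while the public one succeeds.
    static boolean canInstantiate(Class<?> clazz) {
        try {
            clazz.getDeclaredConstructor().newInstance();
            return true;
        } catch (ReflectiveOperationException e) {
            return false;
        }
    }
}
```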
[GitHub] flink issue #4960: Update version to 1.5-SNAPSHOT
Github user Aitozi commented on the issue: https://github.com/apache/flink/pull/4960 👍 ---
[jira] [Commented] (FLINK-7978) Kafka011 exactly-once Producer sporadically fails to commit under high parallelism
[ https://issues.apache.org/jira/browse/FLINK-7978?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241783#comment-16241783 ] ASF GitHub Bot commented on FLINK-7978: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/4955#discussion_r149322930 --- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGenerator.java --- @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.internal; + +import java.util.HashSet; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Class responsible for generating transactional ids to use when communicating with Kafka. + * + * It guarantees that: + * - generated ids to use will never clash with ids to use from different subtasks + * - generated ids to abort will never clash with ids to abort from different subtasks + * - generated ids to use will never clash with ids to abort from different subtasks + * + * In other words, any particular generated id will always be assigned to one and only one subtask. 
+ */ +public class TransactionalIdsGenerator { --- End diff -- ops, forgot about it > Kafka011 exactly-once Producer sporadically fails to commit under high > parallelism > -- > > Key: FLINK-7978 > URL: https://issues.apache.org/jira/browse/FLINK-7978 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: Gary Yao >Assignee: Piotr Nowojski >Priority: Blocker > Fix For: 1.4.0 > > > The Kafka011 exactly-once producer sporadically fails to commit/confirm the > first checkpoint. The behavior seems to be easier reproduced under high job > parallelism. > *Logs/Stacktrace* > {noformat} > 10:24:35,347 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator >- Completed checkpoint 1 (191029 bytes in 1435 ms). > 10:24:35,349 INFO > org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction - > FlinkKafkaProducer011 2/32 - checkpoint 1 complete, committing transaction > TransactionHolder{handle=KafkaTransactionState [transactionalId=Sink: > kafka-sink-1509787467330-12], transactionStartTime=1509787474588} from > checkpoint 1 > 10:24:35,349 INFO > org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction - > FlinkKafkaProducer011 1/32 - checkpoint 1 complete, committing transaction > TransactionHolder{handle=KafkaTransactionState [transactionalId=Sink: > kafka-sink-1509787467330-8], transactionStartTime=1509787474393} from > checkpoint 1 > 10:24:35,349 INFO > org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction - > FlinkKafkaProducer011 0/32 - checkpoint 1 complete, committing transaction > TransactionHolder{handle=KafkaTransactionState [transactionalId=Sink: > kafka-sink-1509787467330-4], transactionStartTime=1509787474448} from > checkpoint 1 > 10:24:35,350 INFO > org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction - > FlinkKafkaProducer011 6/32 - checkpoint 1 complete, committing transaction > TransactionHolder{handle=KafkaTransactionState 
[transactionalId=Sink: > kafka-sink-1509787467330-34], transactionStartTime=1509787474742} from > checkpoint 1 > 10:24:35,350 INFO > org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction - > FlinkKafkaProducer011 4/32 - checkpoint 1 complete, committing transaction > TransactionHolder{handle=KafkaTransactionState [transactionalId=Sink: > kafka-sink-1509787467330-23], transactionStartTime=1509787474777} from > checkpoint 1 > 10:24:35,353 INFO > org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction - >
[GitHub] flink issue #4960: Update version to 1.5-SNAPSHOT
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/4960 Crazy realization is that there are 91 pom.xml files in Flink. It's a large project :-) ---
[jira] [Commented] (FLINK-7996) Add support for (left.time = right.time) predicates to window join.
[ https://issues.apache.org/jira/browse/FLINK-7996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241767#comment-16241767 ] Fabian Hueske commented on FLINK-7996: -- Yes, {{left.time = right.time + C}} should be supported as well. I'd suggest to go for the easy case {{left.time = right.time}} first and include this in the 1.4 release. The case with a constant offset can be added later. What do you think? Best, Fabian > Add support for (left.time = right.time) predicates to window join. > --- > > Key: FLINK-7996 > URL: https://issues.apache.org/jira/browse/FLINK-7996 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Fabian Hueske >Assignee: Xingcan Cui >Priority: Critical > Fix For: 1.4.0 > > > A common operation is to join the result of two window aggregations on the > same timestamp. > However, window joins do not support equality predicates on time attributes > such as {{left.time = right.time}} but require two range predicates such as > {{left.time >= right.time AND left.time <= right.time}}. > This can be fixed in the translation code (the operator does not have to be > touched). -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4960: Update version to 1.5-SNAPSHOT
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/4960 :yellow_heart: Very nice! ---
[jira] [Updated] (FLINK-7265) FileSystems should describe their kind and consistency level
[ https://issues.apache.org/jira/browse/FLINK-7265?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nico Kruber updated FLINK-7265: --- Fix Version/s: (was: 1.4.0) 1.5.0 > FileSystems should describe their kind and consistency level > > > Key: FLINK-7265 > URL: https://issues.apache.org/jira/browse/FLINK-7265 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 1.3.1 >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 1.3.3, 1.5.0 > > > Currently, all {{FileSystem}} types look the same to Flink. > However, certain operations should only be executed on certain kinds of file > systems. > For example, it makes no sense to attempt to delete an empty parent directory > on S3, because there are no such thinks as directories, only hierarchical > naming in the keys (file names). -- This message was sent by Atlassian JIRA (v6.4.14#64029)
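A minimal sketch of the proposed self-description, assuming an enum of file-system kinds and a query method on the file system; the names follow the ticket's intent, and the eventual Flink API may differ.

```java
// Sketch: a FileSystem that describes its kind lets callers skip operations
// that make no sense for object stores, such as deleting "empty directories"
// on S3, where directories are only emulated by key prefixes.
class FsKindDemo {

    enum FileSystemKind { FILE_SYSTEM, OBJECT_STORE }

    interface DescribedFileSystem {
        FileSystemKind getKind();
    }

    static boolean supportsEmptyDirectories(DescribedFileSystem fs) {
        return fs.getKind() == FileSystemKind.FILE_SYSTEM;
    }
}
```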
[jira] [Commented] (FLINK-7883) Stop fetching source before a cancel with savepoint
[ https://issues.apache.org/jira/browse/FLINK-7883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241762#comment-16241762 ] Antoine Philippot commented on FLINK-7883: -- Hi [~aljoscha], is there any news about this subject, I will be interested to participate on this feature considering that we really need to not reprocess kafka messages for each job restart and we cannot afford to maintain our fork forever > Stop fetching source before a cancel with savepoint > --- > > Key: FLINK-7883 > URL: https://issues.apache.org/jira/browse/FLINK-7883 > Project: Flink > Issue Type: Improvement > Components: DataStream API, Kafka Connector, State Backends, > Checkpointing >Affects Versions: 1.4.0, 1.3.2 >Reporter: Antoine Philippot > > For a cancel with savepoint command, the JobManager trigger the cancel call > once the savepoint is finished, but during the savepoint execution, kafka > source continue to poll new messages which will not be part of the savepoint > and will be replayed on the next application start. > A solution could be to stop fetching the source stream task before triggering > the savepoint. > I suggest to add an interface {{StoppableFetchingSourceFunction}} with a > method {{stopFetching}} that existant SourceFunction implementations could > implement. > We can add a {{stopFetchingSource}} property in > {{CheckpointOptions}} class to pass the desired behaviour from > {{JobManager.handleMessage(CancelJobWithSavepoint)}} to > {{SourceStreamTask.triggerCheckpoint}} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
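The proposal can be sketched as follows, using the interface and method name from the ticket; the counting source and its run loop are hypothetical, standing in for a Kafka fetch loop that checks a volatile flag before each poll.

```java
// Sketch: a source whose fetch loop honors stopFetching(). In the real
// proposal, the JobManager would invoke stopFetching() before triggering the
// savepoint, so no records polled after that point leak past the savepoint.
class StoppableSourceDemo {

    interface StoppableFetchingSourceFunction {
        void stopFetching();
    }

    static class CountingSource implements StoppableFetchingSourceFunction {
        private volatile boolean fetching = true; // volatile: set from another thread in practice
        private long emitted;

        void run(long maxRecords) {
            while (fetching && emitted < maxRecords) {
                emitted++; // stand-in for polling and emitting one record
                if (emitted == 3) {
                    stopFetching(); // simulates the pre-savepoint stop signal
                }
            }
        }

        @Override
        public void stopFetching() {
            fetching = false;
        }

        long getEmitted() {
            return emitted;
        }
    }
}
```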
[GitHub] flink pull request #4955: [FLINK-7978][kafka] Ensure that transactional ids ...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/4955#discussion_r149316761 --- Diff: flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGeneratorTest.java --- @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.internal; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for {@link TransactionalIdsGenerator}. 
+ */ +public class TransactionalIdsGeneratorTest { + private static final int POOL_SIZE = 3; + private static final int SAFE_SCALE_DOWN_FACTOR = 3; + private static final int SUBTASKS_COUNT = 5; + + @Test + public void testGenerateIdsToUse() { + TransactionalIdsGenerator generator = new TransactionalIdsGenerator("test", 2, SUBTASKS_COUNT, POOL_SIZE, SAFE_SCALE_DOWN_FACTOR); + + assertEquals( + new HashSet<>(Arrays.asList("test-42", "test-43", "test-44")), + generator.generateIdsToUse(36)); + } + + /** +* Ids to abort and to use should never clash between subtasks. +*/ + @Test + public void testGeneratedIdsDoNotClash() { + ListidsToAbort = new ArrayList<>(); + List idsToUse = new ArrayList<>(); + + for (int subtask = 0; subtask < SUBTASKS_COUNT; subtask++) { + TransactionalIdsGenerator generator = new TransactionalIdsGenerator("test", subtask, SUBTASKS_COUNT, POOL_SIZE, SAFE_SCALE_DOWN_FACTOR); + idsToUse.add(generator.generateIdsToUse(0)); + idsToAbort.add(generator.generateIdsToAbort()); + } + + for (int subtask1 = 0; subtask1 < SUBTASKS_COUNT; subtask1++) { + for (int subtask2 = 0; subtask2 < SUBTASKS_COUNT; subtask2++) { + if (subtask2 == subtask1) { + continue; + } + assertIntersectionIsEmpty(idsToAbort.get(subtask2), idsToAbort.get(subtask1)); + assertIntersectionIsEmpty(idsToUse.get(subtask2), idsToUse.get(subtask1)); + assertIntersectionIsEmpty(idsToAbort.get(subtask2), idsToUse.get(subtask1)); + } + } + } + + private void assertIntersectionIsEmpty(Set first, Set second) { --- End diff -- Maybe `assertDisjoint` ---
[jira] [Commented] (FLINK-7978) Kafka011 exactly-once Producer sporadically fails to commit under high parallelism
[ https://issues.apache.org/jira/browse/FLINK-7978?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241760#comment-16241760 ] ASF GitHub Bot commented on FLINK-7978: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/4955#discussion_r149316761 --- Diff: flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGeneratorTest.java --- @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.internal; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for {@link TransactionalIdsGenerator}. 
+ */ +public class TransactionalIdsGeneratorTest { + private static final int POOL_SIZE = 3; + private static final int SAFE_SCALE_DOWN_FACTOR = 3; + private static final int SUBTASKS_COUNT = 5; + + @Test + public void testGenerateIdsToUse() { + TransactionalIdsGenerator generator = new TransactionalIdsGenerator("test", 2, SUBTASKS_COUNT, POOL_SIZE, SAFE_SCALE_DOWN_FACTOR); + + assertEquals( + new HashSet<>(Arrays.asList("test-42", "test-43", "test-44")), + generator.generateIdsToUse(36)); + } + + /** +* Ids to abort and to use should never clash between subtasks. +*/ + @Test + public void testGeneratedIdsDoNotClash() { + ListidsToAbort = new ArrayList<>(); + List idsToUse = new ArrayList<>(); + + for (int subtask = 0; subtask < SUBTASKS_COUNT; subtask++) { + TransactionalIdsGenerator generator = new TransactionalIdsGenerator("test", subtask, SUBTASKS_COUNT, POOL_SIZE, SAFE_SCALE_DOWN_FACTOR); + idsToUse.add(generator.generateIdsToUse(0)); + idsToAbort.add(generator.generateIdsToAbort()); + } + + for (int subtask1 = 0; subtask1 < SUBTASKS_COUNT; subtask1++) { + for (int subtask2 = 0; subtask2 < SUBTASKS_COUNT; subtask2++) { + if (subtask2 == subtask1) { + continue; + } + assertIntersectionIsEmpty(idsToAbort.get(subtask2), idsToAbort.get(subtask1)); + assertIntersectionIsEmpty(idsToUse.get(subtask2), idsToUse.get(subtask1)); + assertIntersectionIsEmpty(idsToAbort.get(subtask2), idsToUse.get(subtask1)); + } + } + } + + private void assertIntersectionIsEmpty(Set first, Set second) { --- End diff -- Maybe `assertDisjoint` > Kafka011 exactly-once Producer sporadically fails to commit under high > parallelism > -- > > Key: FLINK-7978 > URL: https://issues.apache.org/jira/browse/FLINK-7978 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: Gary Yao >Assignee: Piotr Nowojski >Priority: Blocker > Fix For: 1.4.0 > > > The Kafka011 exactly-once producer sporadically fails to commit/confirm the > first checkpoint. 
The behavior seems to be easier reproduced under high job > parallelism. > *Logs/Stacktrace* > {noformat} > 10:24:35,347 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator >- Completed checkpoint 1 (191029 bytes in 1435 ms). > 10:24:35,349 INFO > org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction - >
[jira] [Commented] (FLINK-7978) Kafka011 exactly-once Producer sporadically fails to commit under high parallelism
[ https://issues.apache.org/jira/browse/FLINK-7978?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241735#comment-16241735 ] ASF GitHub Bot commented on FLINK-7978: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/4955#discussion_r149312052 --- Diff: flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGeneratorTest.java --- @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.internal; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for {@link TransactionalIdsGenerator}. 
+ */
+public class TransactionalIdsGeneratorTest {
+	private static final int POOL_SIZE = 3;
+	private static final int SAFE_SCALE_DOWN_FACTOR = 3;
+	private static final int SUBTASKS_COUNT = 5;
+
+	@Test
+	public void testGenerateIdsToUse() {
+		TransactionalIdsGenerator generator = new TransactionalIdsGenerator("test", 2, SUBTASKS_COUNT, POOL_SIZE, SAFE_SCALE_DOWN_FACTOR);
+
+		assertEquals(
+			new HashSet<>(Arrays.asList("test-42", "test-43", "test-44")),
+			generator.generateIdsToUse(36));
+	}
+
+	/**
+	 * Ids to abort and to use should never clash between subtasks.
+	 */
+	@Test
+	public void testGeneratedIdsDoNotClash() {
+		List<Set<String>> idsToAbort = new ArrayList<>();
+		List<Set<String>> idsToUse = new ArrayList<>();
+
+		for (int subtask = 0; subtask < SUBTASKS_COUNT; subtask++) {
+			TransactionalIdsGenerator generator = new TransactionalIdsGenerator("test", subtask, SUBTASKS_COUNT, POOL_SIZE, SAFE_SCALE_DOWN_FACTOR);
+			idsToUse.add(generator.generateIdsToUse(0));
+			idsToAbort.add(generator.generateIdsToAbort());
+		}
+
+		for (int subtask1 = 0; subtask1 < SUBTASKS_COUNT; subtask1++) {
+			for (int subtask2 = 0; subtask2 < SUBTASKS_COUNT; subtask2++) {
+				if (subtask2 == subtask1) {
+					continue;
+				}
+				assertIntersectionIsEmpty(idsToAbort.get(subtask2), idsToAbort.get(subtask1));
+				assertIntersectionIsEmpty(idsToUse.get(subtask2), idsToUse.get(subtask1));
+				assertIntersectionIsEmpty(idsToAbort.get(subtask2), idsToUse.get(subtask1));
--- End diff --

what about `assertIntersectionIsEmpty(idsToUse.get(subtask2), idsToAbort.get(subtask1));`?

> Kafka011 exactly-once Producer sporadically fails to commit under high parallelism
> --
>
> Key: FLINK-7978
> URL: https://issues.apache.org/jira/browse/FLINK-7978
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector
> Affects Versions: 1.4.0
> Reporter: Gary Yao
> Assignee: Piotr Nowojski
> Priority: Blocker
> Fix For: 1.4.0
>
> The Kafka011 exactly-once producer sporadically fails to commit/confirm the first checkpoint.
> The behavior seems to be easier reproduced under high job parallelism.
> *Logs/Stacktrace*
> {noformat}
> 10:24:35,347 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 1 (191029 bytes in 1435 ms).
> 10:24:35,349 INFO org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction - FlinkKafkaProducer011 2/32 - checkpoint 1 complete,
[jira] [Commented] (FLINK-7878) Extend the resource type user can define in ResourceSpec
[ https://issues.apache.org/jira/browse/FLINK-7878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241752#comment-16241752 ] ASF GitHub Bot commented on FLINK-7878: --- Github user shuai-xu commented on the issue: https://github.com/apache/flink/pull/4911

@tillrohrmann, there is no generic way that works for both YARN and Mesos, as their resource allocation interfaces are different. I think the YARN/Mesos resource managers should handle it in their own way. For example, YarnResourceManager can add all extended resources to the YARN Resource class by calling setResourceValue(name, value). Then, as soon as YARN supports a new resource type, users can define it without any code changes in Flink.

> Extend the resource type user can define in ResourceSpec
> --
>
> Key: FLINK-7878
> URL: https://issues.apache.org/jira/browse/FLINK-7878
> Project: Flink
> Issue Type: Improvement
> Components: DataSet API, DataStream API
> Reporter: shuai.xu
> Assignee: shuai.xu
> Labels: flip-6
>
> Currently, Flink only lets users define how much CPU and memory an operator uses, but cluster resources are more varied than that. For example, an application for image processing may need GPUs; others may need FPGAs. CPU and memory alone are not enough, and the number of resource types keeps growing, so we need to make ResourceSpec extensible.

-- This message was sent by Atlassian JIRA (v6.4.14#64029)
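The extensibility shuai-xu describes can be sketched with a map-based resource spec: fixed CPU/memory fields plus a name-to-amount map for arbitrary extended resources, so new resource types need no code changes. This is an illustrative design only, not Flink's actual ResourceSpec API; all names here are made up.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of an extensible resource spec. Beyond the fixed
// CPU/memory fields, arbitrary extended resources (e.g. GPUs, FPGAs) live
// in a name -> amount map, so a new resource type is just a new map key.
public final class SimpleResourceSpec {
	private final double cpuCores;
	private final int memoryMb;
	private final Map<String, Double> extendedResources;

	public SimpleResourceSpec(double cpuCores, int memoryMb, Map<String, Double> extended) {
		this.cpuCores = cpuCores;
		this.memoryMb = memoryMb;
		// defensive copy so callers cannot mutate the spec afterwards
		this.extendedResources = new HashMap<>(extended);
	}

	public double getCpuCores() {
		return cpuCores;
	}

	public int getMemoryMb() {
		return memoryMb;
	}

	// unknown resource names simply report zero rather than failing
	public double getExtendedResource(String name) {
		return extendedResources.getOrDefault(name, 0.0);
	}

	public static void main(String[] args) {
		SimpleResourceSpec spec = new SimpleResourceSpec(
			2.0, 4096, Collections.singletonMap("GPU", 1.0));
		System.out.println(spec.getExtendedResource("GPU"));
		System.out.println(spec.getExtendedResource("FPGA"));
	}
}
```

A cluster-specific resource manager could then iterate the map and translate each entry into the scheduler's own representation, which is the per-framework handling the comment argues for.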
[GitHub] flink pull request #4955: [FLINK-7978][kafka] Ensure that transactional ids ...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/4955#discussion_r149315581 --- Diff: flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGeneratorTest.java --- @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.internal; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for {@link TransactionalIdsGenerator}. 
+ */
+public class TransactionalIdsGeneratorTest {
+	private static final int POOL_SIZE = 3;
+	private static final int SAFE_SCALE_DOWN_FACTOR = 3;
+	private static final int SUBTASKS_COUNT = 5;
+
+	@Test
+	public void testGenerateIdsToUse() {
+		TransactionalIdsGenerator generator = new TransactionalIdsGenerator("test", 2, SUBTASKS_COUNT, POOL_SIZE, SAFE_SCALE_DOWN_FACTOR);
+
+		assertEquals(
+			new HashSet<>(Arrays.asList("test-42", "test-43", "test-44")),
+			generator.generateIdsToUse(36));
+	}
+
+	/**
+	 * Ids to abort and to use should never clash between subtasks.
+	 */
+	@Test
+	public void testGeneratedIdsDoNotClash() {
+		List<Set<String>> idsToAbort = new ArrayList<>();
+		List<Set<String>> idsToUse = new ArrayList<>();
+
+		for (int subtask = 0; subtask < SUBTASKS_COUNT; subtask++) {
+			TransactionalIdsGenerator generator = new TransactionalIdsGenerator("test", subtask, SUBTASKS_COUNT, POOL_SIZE, SAFE_SCALE_DOWN_FACTOR);
+			idsToUse.add(generator.generateIdsToUse(0));
+			idsToAbort.add(generator.generateIdsToAbort());
+		}
+
+		for (int subtask1 = 0; subtask1 < SUBTASKS_COUNT; subtask1++) {
+			for (int subtask2 = 0; subtask2 < SUBTASKS_COUNT; subtask2++) {
+				if (subtask2 == subtask1) {
+					continue;
+				}
+				assertIntersectionIsEmpty(idsToAbort.get(subtask2), idsToAbort.get(subtask1));
+				assertIntersectionIsEmpty(idsToUse.get(subtask2), idsToUse.get(subtask1));
+				assertIntersectionIsEmpty(idsToAbort.get(subtask2), idsToUse.get(subtask1));
--- End diff --

ah you're right, thanks for the explanation :)

---
[jira] [Commented] (FLINK-7978) Kafka011 exactly-once Producer sporadically fails to commit under high parallelism
[ https://issues.apache.org/jira/browse/FLINK-7978?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241749#comment-16241749 ] ASF GitHub Bot commented on FLINK-7978: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/4955#discussion_r149315173 --- Diff: flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGeneratorTest.java --- @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.internal; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for {@link TransactionalIdsGenerator}. 
+ */
+public class TransactionalIdsGeneratorTest {
+	private static final int POOL_SIZE = 3;
+	private static final int SAFE_SCALE_DOWN_FACTOR = 3;
+	private static final int SUBTASKS_COUNT = 5;
+
+	@Test
+	public void testGenerateIdsToUse() {
+		TransactionalIdsGenerator generator = new TransactionalIdsGenerator("test", 2, SUBTASKS_COUNT, POOL_SIZE, SAFE_SCALE_DOWN_FACTOR);
+
+		assertEquals(
+			new HashSet<>(Arrays.asList("test-42", "test-43", "test-44")),
+			generator.generateIdsToUse(36));
+	}
+
+	/**
+	 * Ids to abort and to use should never clash between subtasks.
+	 */
+	@Test
+	public void testGeneratedIdsDoNotClash() {
+		List<Set<String>> idsToAbort = new ArrayList<>();
+		List<Set<String>> idsToUse = new ArrayList<>();
+
+		for (int subtask = 0; subtask < SUBTASKS_COUNT; subtask++) {
+			TransactionalIdsGenerator generator = new TransactionalIdsGenerator("test", subtask, SUBTASKS_COUNT, POOL_SIZE, SAFE_SCALE_DOWN_FACTOR);
+			idsToUse.add(generator.generateIdsToUse(0));
+			idsToAbort.add(generator.generateIdsToAbort());
+		}
+
+		for (int subtask1 = 0; subtask1 < SUBTASKS_COUNT; subtask1++) {
+			for (int subtask2 = 0; subtask2 < SUBTASKS_COUNT; subtask2++) {
+				if (subtask2 == subtask1) {
+					continue;
+				}
+				assertIntersectionIsEmpty(idsToAbort.get(subtask2), idsToAbort.get(subtask1));
+				assertIntersectionIsEmpty(idsToUse.get(subtask2), idsToUse.get(subtask1));
+				assertIntersectionIsEmpty(idsToAbort.get(subtask2), idsToUse.get(subtask1));
--- End diff --

that will be covered by the second half of the looping (every time I start from index 0). It would be necessary if the loop looked like:
```
for (int subtask1 = 0; subtask1 < SUBTASKS_COUNT; subtask1++) {
	for (int subtask2 = subtask1; subtask2 < SUBTASKS_COUNT; subtask2++) {
```
instead of as it is now:
```
for (int subtask1 = 0; subtask1 < SUBTASKS_COUNT; subtask1++) {
	for (int subtask2 = 0; subtask2 < SUBTASKS_COUNT; subtask2++) {
```

> Kafka011 exactly-once Producer sporadically fails to commit under high parallelism
> --
>
> Key: FLINK-7978
> URL: https://issues.apache.org/jira/browse/FLINK-7978
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector
> Affects Versions: 1.4.0
> Reporter: Gary Yao
> Assignee: Piotr Nowojski
> Priority: Blocker
> Fix For: 1.4.0
>
> The Kafka011 exactly-once producer sporadically fails to commit/confirm the first checkpoint. The behavior seems to be
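The reviewer's point, that iterating the full cross product makes a mirrored assertion redundant, can be sketched in isolation. The class and method names below are illustrative, not part of the Flink test.

```java
import java.util.HashSet;
import java.util.Set;

// Illustrative sketch (not the Flink test itself): a double loop over the
// full cross product of subtask indices visits every unordered pair twice,
// once as (a, b) and once as (b, a). A one-sided disjointness check inside
// such a loop therefore also covers the mirrored combination, so adding
// assertIntersectionIsEmpty(idsToUse.get(subtask2), idsToAbort.get(subtask1))
// would check nothing new.
public class CrossProductPairs {

	static Set<String> orderedPairs(int subtasksCount) {
		Set<String> visited = new HashSet<>();
		for (int subtask1 = 0; subtask1 < subtasksCount; subtask1++) {
			for (int subtask2 = 0; subtask2 < subtasksCount; subtask2++) {
				if (subtask2 == subtask1) {
					continue;
				}
				// record the ordered pair that a check would run against
				visited.add(subtask1 + "," + subtask2);
			}
		}
		return visited;
	}

	public static void main(String[] args) {
		Set<String> pairs = orderedPairs(5);
		// both orientations of every pair are visited: 5 * 4 = 20 ordered pairs
		System.out.println(pairs.contains("0,1") && pairs.contains("1,0"));
		System.out.println(pairs.size());
	}
}
```

With the inner loop starting at `subtask1` instead of 0, only one orientation of each pair would be visited, and the extra mirrored assertion really would be needed.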
[jira] [Commented] (FLINK-7978) Kafka011 exactly-once Producer sporadically fails to commit under high parallelism
[ https://issues.apache.org/jira/browse/FLINK-7978?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241748#comment-16241748 ] ASF GitHub Bot commented on FLINK-7978: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/4955#discussion_r149315106 --- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGenerator.java --- @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.internal; + +import java.util.HashSet; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Class responsible for generating transactional ids to use when communicating with Kafka. + * + * It guarantees that: + * - generated ids to use will never clash with ids to use from different subtasks + * - generated ids to abort will never clash with ids to abort from different subtasks + * - generated ids to use will never clash with ids to abort from different subtasks + * + * In other words, any particular generated id will always be assigned to one and only one subtask. 
+ */
+public class TransactionalIdsGenerator {
--- End diff --

It is better to use the unordered list tag, e.g.,
```
 * <ul>
 *   <li>generated ids to use will never clash with ids to use from different subtasks</li>
 *   <li>generated ids to abort will never clash with ids to abort from different subtasks</li>
 *   <li>generated ids to use will never clash with ids to abort from different subtasks</li>
 * </ul>
```
Otherwise it is rendered like this:
![image](https://user-images.githubusercontent.com/1681921/32487045-c97ffe8a-c3a8-11e7-95e8-f3ee127072b9.png)

> Kafka011 exactly-once Producer sporadically fails to commit under high parallelism
> --
>
> Key: FLINK-7978
> URL: https://issues.apache.org/jira/browse/FLINK-7978
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector
> Affects Versions: 1.4.0
> Reporter: Gary Yao
> Assignee: Piotr Nowojski
> Priority: Blocker
> Fix For: 1.4.0
>
> The Kafka011 exactly-once producer sporadically fails to commit/confirm the first checkpoint. The behavior seems to be easier reproduced under high job parallelism.
> *Logs/Stacktrace*
> {noformat}
> 10:24:35,347 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 1 (191029 bytes in 1435 ms).
> 10:24:35,349 INFO org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction - FlinkKafkaProducer011 2/32 - checkpoint 1 complete, committing transaction TransactionHolder{handle=KafkaTransactionState [transactionalId=Sink: kafka-sink-1509787467330-12], transactionStartTime=1509787474588} from checkpoint 1
> 10:24:35,349 INFO org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction - FlinkKafkaProducer011 1/32 - checkpoint 1 complete, committing transaction TransactionHolder{handle=KafkaTransactionState [transactionalId=Sink: kafka-sink-1509787467330-8], transactionStartTime=1509787474393} from checkpoint 1
> 10:24:35,349 INFO org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction - FlinkKafkaProducer011 0/32 - checkpoint 1 complete, committing transaction TransactionHolder{handle=KafkaTransactionState [transactionalId=Sink: kafka-sink-1509787467330-4], transactionStartTime=1509787474448} from checkpoint 1
> 10:24:35,350 INFO org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction - FlinkKafkaProducer011 6/32 - checkpoint 1 complete, committing transaction TransactionHolder{handle=KafkaTransactionState [transactionalId=Sink: kafka-sink-1509787467330-34],
[jira] [Commented] (FLINK-6022) Improve support for Avro GenericRecord
[ https://issues.apache.org/jira/browse/FLINK-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241747#comment-16241747 ] Stephan Ewen commented on FLINK-6022:

I think this should be resolved, because Generic records and all non-specific records should go through the ReflectDatumReader/Writer. However, it would be great if one of the people who originally opened the ticket could comment on this.

> Improve support for Avro GenericRecord
> --
>
> Key: FLINK-6022
> URL: https://issues.apache.org/jira/browse/FLINK-6022
> Project: Flink
> Issue Type: Improvement
> Components: Type Serialization System
> Reporter: Robert Metzger
> Assignee: Stephan Ewen
> Priority: Blocker
> Fix For: 1.4.0
>
> Currently, Flink serializes the schema for each Avro GenericRecord in the stream. This leads to a lot of overhead over the wire/disk plus high serialization costs. I therefore propose to improve the support for GenericRecord in Flink by shipping the schema to each serializer through the AvroTypeInformation. Then we can only support GenericRecords of the same type per stream, but the performance will be much better.

-- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7978) Kafka011 exactly-once Producer sporadically fails to commit under high parallelism
[ https://issues.apache.org/jira/browse/FLINK-7978?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241736#comment-16241736 ] ASF GitHub Bot commented on FLINK-7978: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/4955#discussion_r149310451 --- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGenerator.java --- @@ -19,27 +19,35 @@ import java.util.HashSet; import java.util.Set; -import java.util.stream.Collectors; -import java.util.stream.LongStream; import static org.apache.flink.util.Preconditions.checkNotNull; /** * Class responsible for generating transactional ids to use when communicating with Kafka. + * + * It guarantees that: + * - generated ids to use will never clash with ids to use from different subtasks + * - generated ids to abort will never clash with ids to abort from different subtasks + * - generated ids to use will never clash with ids to abort from different subtasks + * + * In other words, any particular generated id will always be assigned to one and only one subtask. */ public class TransactionalIdsGenerator { private final String prefix; private final int subtaskIndex; + private final int totalNumberOfSubtasks; private final int poolSize; private final int safeScaleDownFactor; public TransactionalIdsGenerator( String prefix, int subtaskIndex, + int totalNumberOfSubtasks, int poolSize, int safeScaleDownFactor) { this.prefix = checkNotNull(prefix); this.subtaskIndex = subtaskIndex; + this.totalNumberOfSubtasks = totalNumberOfSubtasks; --- End diff -- Maybe we should add some argument checks for subtask index and totalNumberOfSubtasks, at least. 
> Kafka011 exactly-once Producer sporadically fails to commit under high > parallelism > -- > > Key: FLINK-7978 > URL: https://issues.apache.org/jira/browse/FLINK-7978 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: Gary Yao >Assignee: Piotr Nowojski >Priority: Blocker > Fix For: 1.4.0 > > > The Kafka011 exactly-once producer sporadically fails to commit/confirm the > first checkpoint. The behavior seems to be easier reproduced under high job > parallelism. > *Logs/Stacktrace* > {noformat} > 10:24:35,347 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator >- Completed checkpoint 1 (191029 bytes in 1435 ms). > 10:24:35,349 INFO > org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction - > FlinkKafkaProducer011 2/32 - checkpoint 1 complete, committing transaction > TransactionHolder{handle=KafkaTransactionState [transactionalId=Sink: > kafka-sink-1509787467330-12], transactionStartTime=1509787474588} from > checkpoint 1 > 10:24:35,349 INFO > org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction - > FlinkKafkaProducer011 1/32 - checkpoint 1 complete, committing transaction > TransactionHolder{handle=KafkaTransactionState [transactionalId=Sink: > kafka-sink-1509787467330-8], transactionStartTime=1509787474393} from > checkpoint 1 > 10:24:35,349 INFO > org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction - > FlinkKafkaProducer011 0/32 - checkpoint 1 complete, committing transaction > TransactionHolder{handle=KafkaTransactionState [transactionalId=Sink: > kafka-sink-1509787467330-4], transactionStartTime=1509787474448} from > checkpoint 1 > 10:24:35,350 INFO > org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction - > FlinkKafkaProducer011 6/32 - checkpoint 1 complete, committing transaction > TransactionHolder{handle=KafkaTransactionState [transactionalId=Sink: > kafka-sink-1509787467330-34], transactionStartTime=1509787474742} from > 
checkpoint 1 > 10:24:35,350 INFO > org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction - > FlinkKafkaProducer011 4/32 - checkpoint 1 complete, committing transaction > TransactionHolder{handle=KafkaTransactionState [transactionalId=Sink: > kafka-sink-1509787467330-23], transactionStartTime=1509787474777} from > checkpoint 1 > 10:24:35,353 INFO > org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction - > FlinkKafkaProducer011 10/32 - checkpoint 1 complete, committing transaction > TransactionHolder{handle=KafkaTransactionState [transactionalId=Sink: >
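The argument checks tzulitai suggests in the review above could look like the following sketch (plain JDK exceptions instead of Flink's Preconditions so the snippet is self-contained; the class is a simplified stand-in, not the real constructor):

```java
/** Simplified stand-in for the constructor, showing the suggested argument checks. */
public class GeneratorArgsSketch {
    private final String prefix;
    private final int subtaskIndex;
    private final int totalNumberOfSubtasks;

    public GeneratorArgsSketch(String prefix, int subtaskIndex, int totalNumberOfSubtasks) {
        if (prefix == null) {
            throw new NullPointerException("prefix must not be null");
        }
        if (totalNumberOfSubtasks <= 0) {
            throw new IllegalArgumentException("totalNumberOfSubtasks must be positive");
        }
        // a valid subtask index lies in [0, totalNumberOfSubtasks)
        if (subtaskIndex < 0 || subtaskIndex >= totalNumberOfSubtasks) {
            throw new IllegalArgumentException("subtaskIndex out of range: " + subtaskIndex);
        }
        this.prefix = prefix;
        this.subtaskIndex = subtaskIndex;
        this.totalNumberOfSubtasks = totalNumberOfSubtasks;
    }
}
```

Rejecting an out-of-range index at construction time surfaces a wiring bug immediately instead of producing overlapping id ranges later.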
[jira] [Commented] (FLINK-7996) Add support for (left.time = right.time) predicates to window join.
[ https://issues.apache.org/jira/browse/FLINK-7996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241732#comment-16241732 ] Xingcan Cui commented on FLINK-7996: Hi [~fhueske], thanks for opening this. IMO, the conditions like {{left.time = right.time + C}} should also be valid since they can be transformed to a window like {{left.time >= right.time + C && left.time <= right.time + C}}. While this seems to be sort of contradictory with FLINK-7800, in which the row expression like {{left.attr = right.attr + C}} will always be translated to a calculation with an equi-join (i.e., $1 = right.time + C and left.time = $1). I need to consider the two problems as a whole. Best, Xingcan > Add support for (left.time = right.time) predicates to window join. > --- > > Key: FLINK-7996 > URL: https://issues.apache.org/jira/browse/FLINK-7996 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Fabian Hueske >Assignee: Xingcan Cui >Priority: Critical > Fix For: 1.4.0 > > > A common operation is to join the result of two window aggregations on the > same timestamp. > However, window joins do not support equality predicates on time attributes > such as {{left.time = right.time}} but require two range predicates such as > {{left.time >= right.time AND left.time <= right.time}}. > This can be fixed in the translation code (the operator does not have to be > touched). -- This message was sent by Atlassian JIRA (v6.4.14#64029)
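The transformation discussed above, treating `left.time = right.time + C` as a degenerate window whose lower and upper bounds coincide, can be sketched as follows (class and method names are illustrative, not Flink internals):

```java
/** Sketch: an equality time predicate is a window whose lower and upper bounds coincide. */
public class TimeWindowBoundsSketch {
    final long lowerBound; // left.time >= right.time + lowerBound
    final long upperBound; // left.time <= right.time + upperBound

    TimeWindowBoundsSketch(long lowerBound, long upperBound) {
        this.lowerBound = lowerBound;
        this.upperBound = upperBound;
    }

    /** left.time = right.time + offsetC  ==>  bounds [offsetC, offsetC]. */
    static TimeWindowBoundsSketch fromEquality(long offsetC) {
        return new TimeWindowBoundsSketch(offsetC, offsetC);
    }

    /** True iff the pair of timestamps satisfies both range predicates. */
    boolean joins(long leftTime, long rightTime) {
        long diff = leftTime - rightTime;
        return diff >= lowerBound && diff <= upperBound;
    }
}
```

With `lowerBound == upperBound` the window admits exactly the pairs the equality predicate would, which is why the fix can live purely in the translation code.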
[jira] [Commented] (FLINK-6022) Improve support for Avro GenericRecord
[ https://issues.apache.org/jira/browse/FLINK-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241726#comment-16241726 ] Aljoscha Krettek commented on FLINK-6022: - [~StephanEwen] We should probably open a follow-up issue and mark this one as resolved? Or move to 1.5. > Improve support for Avro GenericRecord > -- > > Key: FLINK-6022 > URL: https://issues.apache.org/jira/browse/FLINK-6022 > Project: Flink > Issue Type: Improvement > Components: Type Serialization System >Reporter: Robert Metzger >Assignee: Stephan Ewen >Priority: Blocker > Fix For: 1.4.0 > > > Currently, Flink is serializing the schema for each Avro GenericRecord in the > stream. > This leads to a lot of overhead over the wire/disk + high serialization costs. > Therefore, I'm proposing to improve the support for GenericRecord in Flink by > shipping the schema to each serializer through the AvroTypeInformation. > Then, we can only support GenericRecords with the same type per stream, but > the performance will be much better. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
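The overhead the issue describes is easy to quantify: with N records, a per-record schema costs N times the schema size extra on the wire, versus shipping the schema once through the type information. A back-of-the-envelope sketch (all sizes illustrative):

```java
/** Sketch: wire cost of schema-per-record vs. schema-shipped-once (illustrative sizes). */
public class SchemaOverheadSketch {

    static long bytesOnWire(long records, long schemaBytes, long payloadBytes, boolean schemaPerRecord) {
        long perRecord = payloadBytes + (schemaPerRecord ? schemaBytes : 0);
        long upFront = schemaPerRecord ? 0 : schemaBytes; // schema shipped once with the serializer
        return upFront + records * perRecord;
    }

    public static void main(String[] args) {
        // a 500-byte schema attached to every 100-byte record dominates the wire cost
        System.out.println(bytesOnWire(1_000_000, 500, 100, true));  // schema per record
        System.out.println(bytesOnWire(1_000_000, 500, 100, false)); // schema shipped once
    }
}
```

The gap grows with the schema-to-payload ratio, which is why the issue flags this as a blocker for GenericRecord streams.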
[GitHub] flink pull request #4960: Update version to 1.5-SNAPSHOT
Github user aljoscha closed the pull request at: https://github.com/apache/flink/pull/4960 ---
[GitHub] flink pull request #4729: [FLINK-7076] [ResourceManager] implement YARN stop...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4729#discussion_r149307271 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnWorkerNode.java --- @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn; + +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable; + +import org.apache.hadoop.yarn.api.records.Container; + +import java.io.Serializable; + +/** + * A stored YARN worker, which contains the YARN container. + */ +public class YarnWorkerNode implements ResourceIDRetrievable, Serializable { --- End diff -- Remove the type constraint in `ResourceManager`, I would suggest. ---
[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling
[ https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241718#comment-16241718 ] ASF GitHub Bot commented on FLINK-7076: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4729#discussion_r149307271 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnWorkerNode.java --- @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn; + +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable; + +import org.apache.hadoop.yarn.api.records.Container; + +import java.io.Serializable; + +/** + * A stored YARN worker, which contains the YARN container. + */ +public class YarnWorkerNode implements ResourceIDRetrievable, Serializable { --- End diff -- Remove the type constraint in `ResourceManager`, I would suggest. 
> Implement container release to support dynamic scaling > -- > > Key: FLINK-7076 > URL: https://issues.apache.org/jira/browse/FLINK-7076 > Project: Flink > Issue Type: Sub-task > Components: ResourceManager >Reporter: Till Rohrmann >Assignee: Shuyi Chen > Labels: flip-6 > > In order to support dynamic scaling, the {{YarnResourceManager}} has to be > able to dynamically free containers. We have to implement the > {{YarnResourceManager#stopWorker}} method. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
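The `stopWorker` mechanics the issue asks for reduce to looking a tracked worker up by its resource id and handing it back for release. A simplified, container-free sketch (the registry class is hypothetical, not the YarnResourceManager API):

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical registry sketch: stopWorker = remove the tracked node, then release its container. */
public class WorkerRegistrySketch<W> {
    private final Map<String, W> workersByResourceId = new HashMap<>();

    void register(String resourceId, W worker) {
        workersByResourceId.put(resourceId, worker);
    }

    /** Returns the removed worker so the caller can release the underlying YARN container, or null. */
    W stopWorker(String resourceId) {
        return workersByResourceId.remove(resourceId);
    }

    int numRegisteredWorkers() {
        return workersByResourceId.size();
    }
}
```

Keeping the container inside the tracked worker (as `YarnWorkerNode` does in the diff) is what makes the release step possible: the resource manager only ever sees a `ResourceID`, and the registry maps it back to the YARN-level handle.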
[GitHub] flink issue #4911: [FLINK-7878] [api] make resource type extendible in Resou...
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/4911 How do you pass the resource specification in a generic way to Yarn and Mesos? Is there some kind of interface defined for that? ---
[jira] [Commented] (FLINK-7878) Extend the resource type user can define in ResourceSpec
[ https://issues.apache.org/jira/browse/FLINK-7878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241715#comment-16241715 ] ASF GitHub Bot commented on FLINK-7878: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/4911 How do you pass the resource specification in a generic way to Yarn and Mesos? Is there some kind of interface defined for that? > Extend the resource type user can define in ResourceSpec > > > Key: FLINK-7878 > URL: https://issues.apache.org/jira/browse/FLINK-7878 > Project: Flink > Issue Type: Improvement > Components: DataSet API, DataStream API >Reporter: shuai.xu >Assignee: shuai.xu > Labels: flip-6 > > Now, flink only support user define how much CPU and MEM used in an operator, > but now the resource in a cluster is various. For example, an application for > image processing may need GPU, some others may need FPGA. > Only CPU and MEM is not enough, and the resource type is becoming more and > more, so we need to make the ResourSpec extendible. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
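What "extendible" could mean concretely: instead of fixed CPU/MEM fields, key arbitrary resource types (GPU, FPGA, ...) by name. A sketch of that shape (hypothetical class, not the actual ResourceSpec API):

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical extensible resource spec: arbitrary resource types keyed by name. */
public class ExtensibleResourceSpecSketch {
    private final Map<String, Double> resources = new HashMap<>();

    ExtensibleResourceSpecSketch set(String resourceName, double amount) {
        resources.put(resourceName, amount);
        return this;
    }

    /** Unknown resource types default to zero demand. */
    double get(String resourceName) {
        return resources.getOrDefault(resourceName, 0.0);
    }

    /** Sum two specs, e.g. when two chained operators share a slot. */
    ExtensibleResourceSpecSketch merge(ExtensibleResourceSpecSketch other) {
        ExtensibleResourceSpecSketch result = new ExtensibleResourceSpecSketch();
        result.resources.putAll(this.resources);
        other.resources.forEach((name, amount) -> result.resources.merge(name, amount, Double::sum));
        return result;
    }
}
```

Till's question in the thread is exactly about the boundary of such a map: the cluster framework (YARN/Mesos) still needs an agreed-upon vocabulary of resource names to act on the spec.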
[jira] [Commented] (FLINK-7856) Port JobVertexBackPressureHandler to REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241710#comment-16241710 ] ASF GitHub Bot commented on FLINK-7856: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4893#discussion_r149305450 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandler.java --- @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest.handler.job; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.WebOptions; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; +import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker; +import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStats; +import org.apache.flink.runtime.rest.handler.legacy.backpressure.StackTraceSampleCoordinator; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobVertexBackPressureInfo; +import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter; +import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +/** + * Request handler for the job vertex back pressure. + */ +public class JobVertexBackPressureHandler extends AbstractExecutionGraphHandler{ + /** Back pressure stats tracker. 
*/ + private final BackPressureStatsTracker backPressureStatsTracker; + + /** Time after which stats are considered outdated. */ + private final int refreshInterval; + + public JobVertexBackPressureHandler( + CompletableFuture localRestAddress, + GatewayRetriever leaderRetriever, + Time timeout, + MessageHeaders messageHeaders, + ExecutionGraphCache executionGraphCache, + Executor executor, + Configuration clusterConfiguration) { + super(localRestAddress, leaderRetriever, timeout, messageHeaders, executionGraphCache, executor); + + // Back pressure stats tracker config + this.refreshInterval = clusterConfiguration.getInteger(WebOptions.BACKPRESSURE_REFRESH_INTERVAL); + this.backPressureStatsTracker = new BackPressureStatsTracker( + new StackTraceSampleCoordinator(executor, 6), + clusterConfiguration.getInteger(WebOptions.BACKPRESSURE_CLEANUP_INTERVAL), + clusterConfiguration.getInteger(WebOptions.BACKPRESSURE_NUM_SAMPLES), + Time.milliseconds(clusterConfiguration.getInteger(WebOptions.BACKPRESSURE_DELAY))); + } + + @Override + protected
[jira] [Commented] (FLINK-7856) Port JobVertexBackPressureHandler to REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241708#comment-16241708 ] ASF GitHub Bot commented on FLINK-7856: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4893#discussion_r149305275 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfo.java --- @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages; + +import org.apache.flink.runtime.rest.handler.job.JobVertexBackPressureHandler; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.List; +import java.util.Objects; + +/** + * Response type of the {@link JobVertexBackPressureHandler}. 
+ */ +public class JobVertexBackPressureInfo implements ResponseBody { + + public static final String FIELD_NAME_STATUS = "status"; + public static final String FIELD_NAME_BACKPRESSURE_LEVEL = "backpressure-level"; + public static final String FIELD_NAME_END_TIMESTAMP = "end-timestamp"; + public static final String FIELD_NAME_SUBTASKS = "subtasks"; + + @JsonProperty(FIELD_NAME_STATUS) + private final String status; + + @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL) + private final String backpressureLevel; + + @JsonProperty(FIELD_NAME_END_TIMESTAMP) + private final Long endTimestamp; + + @JsonProperty(FIELD_NAME_SUBTASKS) + private final List subtasks; + + @JsonCreator + public JobVertexBackPressureInfo( + @JsonProperty(FIELD_NAME_STATUS) String status, + @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL) String backpressureLevel, + @JsonProperty(FIELD_NAME_END_TIMESTAMP) Long endTimestamp, + @JsonProperty(FIELD_NAME_SUBTASKS) List subtasks) { + this.status = status; + this.backpressureLevel = backpressureLevel; + this.endTimestamp = endTimestamp; + this.subtasks = subtasks; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + JobVertexBackPressureInfo that = (JobVertexBackPressureInfo) o; + return Objects.equals(status, that.status) && + Objects.equals(backpressureLevel, that.backpressureLevel) && + Objects.equals(endTimestamp, that.endTimestamp) && + Objects.equals(subtasks, that.subtasks); + } + + @Override + public int hashCode() { + return Objects.hash(status, backpressureLevel, endTimestamp, subtasks); + } + + //- + // Static helper classes + //- + + /** +* Nested class to encapsulate the sub tasks back pressure. 
+*/ + public static final class SubtaskBackPressureInfo { + + public static final String FIELD_NAME_STATUS = "subtask"; + public static final String FIELD_NAME_BACKPRESSURE_LEVEL = "backpressure-level"; + public static final String FIELD_NAME_RATIO = "ratio"; + + @JsonProperty(FIELD_NAME_STATUS) + private final int subtask; + + @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL) + private final String backpressureLevel; + + @JsonProperty(FIELD_NAME_RATIO) + private final double ratio; + +
[jira] [Commented] (FLINK-7856) Port JobVertexBackPressureHandler to REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241707#comment-16241707 ] ASF GitHub Bot commented on FLINK-7856: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4893#discussion_r149305236 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfo.java --- @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages; + +import org.apache.flink.runtime.rest.handler.job.JobVertexBackPressureHandler; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.List; +import java.util.Objects; + +/** + * Response type of the {@link JobVertexBackPressureHandler}. 
+ */ +public class JobVertexBackPressureInfo implements ResponseBody { + + public static final String FIELD_NAME_STATUS = "status"; + public static final String FIELD_NAME_BACKPRESSURE_LEVEL = "backpressure-level"; + public static final String FIELD_NAME_END_TIMESTAMP = "end-timestamp"; + public static final String FIELD_NAME_SUBTASKS = "subtasks"; + + @JsonProperty(FIELD_NAME_STATUS) + private final String status; + + @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL) + private final String backpressureLevel; + + @JsonProperty(FIELD_NAME_END_TIMESTAMP) + private final Long endTimestamp; + + @JsonProperty(FIELD_NAME_SUBTASKS) + private final List subtasks; + + @JsonCreator + public JobVertexBackPressureInfo( + @JsonProperty(FIELD_NAME_STATUS) String status, + @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL) String backpressureLevel, + @JsonProperty(FIELD_NAME_END_TIMESTAMP) Long endTimestamp, + @JsonProperty(FIELD_NAME_SUBTASKS) List subtasks) { + this.status = status; + this.backpressureLevel = backpressureLevel; + this.endTimestamp = endTimestamp; + this.subtasks = subtasks; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + JobVertexBackPressureInfo that = (JobVertexBackPressureInfo) o; + return Objects.equals(status, that.status) && + Objects.equals(backpressureLevel, that.backpressureLevel) && + Objects.equals(endTimestamp, that.endTimestamp) && + Objects.equals(subtasks, that.subtasks); + } + + @Override + public int hashCode() { + return Objects.hash(status, backpressureLevel, endTimestamp, subtasks); + } + + //- + // Static helper classes + //- + + /** +* Nested class to encapsulate the sub tasks back pressure. 
+*/ + public static final class SubtaskBackPressureInfo { + + public static final String FIELD_NAME_STATUS = "subtask"; + public static final String FIELD_NAME_BACKPRESSURE_LEVEL = "backpressure-level"; + public static final String FIELD_NAME_RATIO = "ratio"; + + @JsonProperty(FIELD_NAME_STATUS) + private final int subtask; --- End diff -- is that the status or the subtask name? Adapt the field annotation accordingly. > Port JobVertexBackPressureHandler to REST endpoint > -- > >
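The `backpressure-level` string in the response above is derived from the sampled ratio of stack traces stuck in output buffers. A sketch of that mapping (the 0.10 / 0.50 thresholds are illustrative, not necessarily the exact values Flink uses):

```java
/** Sketch: classify a sampled back-pressure ratio into the response's level string.
 *  Thresholds are illustrative. */
public class BackPressureLevelSketch {

    static String backpressureLevel(double ratio) {
        if (ratio <= 0.10) {
            return "ok";
        } else if (ratio <= 0.50) {
            return "low";
        }
        return "high";
    }
}
```

Deriving the level on the server keeps the REST response self-describing, so the web UI does not need to re-implement the thresholds.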
[jira] [Commented] (FLINK-5633) ClassCastException: X cannot be cast to X when re-submitting a job.
[ https://issues.apache.org/jira/browse/FLINK-5633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241701#comment-16241701 ] Erik van Oosten commented on FLINK-5633: [~StephanEwen] We need to process 130K msg/s, I guess that can be called often :) . Our process is CPU bound and parsing Avro is ±15% of that. Any improvement means we can run with fewer machines. For every message we create a new SpecificDatumReader. If I follow the code correctly that should _not_ give a large overhead. The Schema instances we pass to it _are_ cached. Then we call {{SpecificDatumReader.read}} to parse each Avro message. In that call you eventually end up in {{SpecificData.newInstance}} to create a new instance of the target class. The constructor of that class is looked up in a cache. That cache is declared as {{static}}. I do not understand how instantiating a new {{SpecificData}} for every call to {{read}} helps because it would still use the same cache. The code I pasted above also uses a constructor cache but the cache is not {{static}}. Reversing the class loader order should also work. > ClassCastException: X cannot be cast to X when re-submitting a job. > --- > > Key: FLINK-5633 > URL: https://issues.apache.org/jira/browse/FLINK-5633 > Project: Flink > Issue Type: Bug > Components: Job-Submission, YARN >Affects Versions: 1.1.4 >Reporter: Giuliano Caliari >Priority: Minor > > I’m running a job on my local cluster and the first time I submit the job > everything works but whenever I cancel and re-submit the same job it fails > with: > {quote} > org.apache.flink.client.program.ProgramInvocationException: The program > execution failed: Job execution failed.
> at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427) > at > org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:101) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:400) > at > org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66) > at > org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:634) > at au.com.my.package.pTraitor.OneTrait.execute(Traitor.scala:147) > at > au.com.my.package.pTraitor.TraitorAppOneTrait$.delayedEndpoint$au$com$my$package$pTraitor$TraitorAppOneTrait$1(TraitorApp.scala:22) > at > au.com.my.package.pTraitor.TraitorAppOneTrait$delayedInit$body.apply(TraitorApp.scala:21) > at scala.Function0$class.apply$mcV$sp(Function0.scala:34) > at > scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12) > at scala.App$$anonfun$main$1.apply(App.scala:76) > at scala.App$$anonfun$main$1.apply(App.scala:76) > at scala.collection.immutable.List.foreach(List.scala:381) > at > scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35) > at scala.App$class.main(App.scala:76) > at > au.com.my.package.pTraitor.TraitorAppOneTrait$.main(TraitorApp.scala:21) > at au.com.my.package.pTraitor.TraitorAppOneTrait.main(TraitorApp.scala) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528) > at > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:419) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:339) > at > 
org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:831) > at org.apache.flink.client.CliFrontend.run(CliFrontend.java:256) > at > org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1073) > at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1120) > at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1117) > at > org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:29) > at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1116) > Caused by: org.apache.flink.runtime.client.JobExecutionException: Job > execution failed. > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:900) > at >
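The non-static constructor cache Erik describes above can be sketched as follows. This is a hypothetical illustration of the idea, not Avro's or Flink's actual code: because the cache is an instance field rather than `static`, discarding the reader instance on job cancellation also discards the cached `Constructor` references, so no user class loader stays pinned across re-submissions:

```java
import java.lang.reflect.Constructor;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of a per-instance (non-static) constructor cache, in
// contrast to the static cache in Avro's SpecificData. Dropping this object
// drops all cached Constructor references, releasing the classes they pin.
final class InstanceConstructorCache {

    private final Map<Class<?>, Constructor<?>> cache = new ConcurrentHashMap<>();

    <T> T newInstance(Class<T> clazz) {
        try {
            Constructor<?> ctor = cache.computeIfAbsent(clazz, c -> {
                try {
                    Constructor<?> candidate = c.getDeclaredConstructor();
                    candidate.setAccessible(true);
                    return candidate;
                } catch (NoSuchMethodException e) {
                    throw new IllegalStateException("no default constructor: " + c, e);
                }
            });
            return clazz.cast(ctor.newInstance());
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot instantiate " + clazz, e);
        }
    }
}
```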
[jira] [Commented] (FLINK-7856) Port JobVertexBackPressureHandler to REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241702#comment-16241702 ] ASF GitHub Bot commented on FLINK-7856: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4893#discussion_r149305060 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfo.java --- @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages; + +import org.apache.flink.runtime.rest.handler.job.JobVertexBackPressureHandler; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.List; +import java.util.Objects; + +/** + * Response type of the {@link JobVertexBackPressureHandler}. 
+ */ +public class JobVertexBackPressureInfo implements ResponseBody { + + public static final String FIELD_NAME_STATUS = "status"; + public static final String FIELD_NAME_BACKPRESSURE_LEVEL = "backpressure-level"; + public static final String FIELD_NAME_END_TIMESTAMP = "end-timestamp"; + public static final String FIELD_NAME_SUBTASKS = "subtasks"; + + @JsonProperty(FIELD_NAME_STATUS) + private final String status; + + @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL) + private final String backpressureLevel; + + @JsonProperty(FIELD_NAME_END_TIMESTAMP) + private final Long endTimestamp; --- End diff -- no need for the object type here. We can use a primitive long. > Port JobVertexBackPressureHandler to REST endpoint > -- > > Key: FLINK-7856 > URL: https://issues.apache.org/jira/browse/FLINK-7856 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, REST, Webfrontend >Reporter: Fang Yong >Assignee: Fang Yong > > Port JobVertexBackPressureHandler to REST endpoint -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4893: [FLINK-7856][flip6] Port JobVertexBackPressureHand...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4893#discussion_r149305184 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfo.java --- @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages; + +import org.apache.flink.runtime.rest.handler.job.JobVertexBackPressureHandler; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.List; +import java.util.Objects; + +/** + * Response type of the {@link JobVertexBackPressureHandler}. 
+ */ +public class JobVertexBackPressureInfo implements ResponseBody { + + public static final String FIELD_NAME_STATUS = "status"; + public static final String FIELD_NAME_BACKPRESSURE_LEVEL = "backpressure-level"; + public static final String FIELD_NAME_END_TIMESTAMP = "end-timestamp"; + public static final String FIELD_NAME_SUBTASKS = "subtasks"; + + @JsonProperty(FIELD_NAME_STATUS) + private final String status; + + @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL) + private final String backpressureLevel; + + @JsonProperty(FIELD_NAME_END_TIMESTAMP) + private final Long endTimestamp; + + @JsonProperty(FIELD_NAME_SUBTASKS) + private final List subtasks; + + @JsonCreator + public JobVertexBackPressureInfo( + @JsonProperty(FIELD_NAME_STATUS) String status, + @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL) String backpressureLevel, + @JsonProperty(FIELD_NAME_END_TIMESTAMP) Long endTimestamp, + @JsonProperty(FIELD_NAME_SUBTASKS) List subtasks) { + this.status = status; + this.backpressureLevel = backpressureLevel; + this.endTimestamp = endTimestamp; + this.subtasks = subtasks; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + JobVertexBackPressureInfo that = (JobVertexBackPressureInfo) o; + return Objects.equals(status, that.status) && + Objects.equals(backpressureLevel, that.backpressureLevel) && + Objects.equals(endTimestamp, that.endTimestamp) && + Objects.equals(subtasks, that.subtasks); + } + + @Override + public int hashCode() { + return Objects.hash(status, backpressureLevel, endTimestamp, subtasks); + } + + //- + // Static helper classes + //- + + /** +* Nested class to encapsulate the sub tasks back pressure. 
+*/ + public static final class SubtaskBackPressureInfo { + + public static final String FIELD_NAME_STATUS = "subtask"; + public static final String FIELD_NAME_BACKPRESSURE_LEVEL = "backpressure-level"; + public static final String FIELD_NAME_RATIO = "ratio"; + + @JsonProperty(FIELD_NAME_STATUS) + private final int subtask; + + @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL) + private final String backpressureLevel; --- End diff -- enum ---
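The "enum" suggestion in the review comment above can be sketched like this. The enum name and constants are hypothetical (chosen to match the lowercase levels the legacy handler emitted), not Flink's eventual implementation:

```java
// Hypothetical sketch of the enum the reviewer asks for: a closed set of
// back-pressure levels instead of a free-form String field. toString()
// yields the lowercase JSON value; fromString() parses it back strictly.
enum BackPressureLevel {
    OK, LOW, HIGH;

    /** Lowercase form used in the REST response, e.g. "ok". */
    @Override
    public String toString() {
        return name().toLowerCase(java.util.Locale.ROOT);
    }

    /** Parses the lowercase JSON value; throws IllegalArgumentException on unknown input. */
    static BackPressureLevel fromString(String value) {
        return valueOf(value.toUpperCase(java.util.Locale.ROOT));
    }
}
```

With Jackson, such an enum can be bound directly to the JSON field, so invalid levels are rejected at deserialization time instead of flowing through as arbitrary strings.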
[GitHub] flink pull request #4893: [FLINK-7856][flip6] Port JobVertexBackPressureHand...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4893#discussion_r149305450 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandler.java --- @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest.handler.job; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.WebOptions; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; +import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker; +import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStats; +import org.apache.flink.runtime.rest.handler.legacy.backpressure.StackTraceSampleCoordinator; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobVertexBackPressureInfo; +import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter; +import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +/** + * Request handler for the job vertex back pressure. + */ +public class JobVertexBackPressureHandler extends AbstractExecutionGraphHandler{ + /** Back pressure stats tracker. 
*/ + private final BackPressureStatsTracker backPressureStatsTracker; + + /** Time after which stats are considered outdated. */ + private final int refreshInterval; + + public JobVertexBackPressureHandler( + CompletableFuture localRestAddress, + GatewayRetriever leaderRetriever, + Time timeout, + MessageHeaders messageHeaders, + ExecutionGraphCache executionGraphCache, + Executor executor, + Configuration clusterConfiguration) { + super(localRestAddress, leaderRetriever, timeout, messageHeaders, executionGraphCache, executor); + + // Back pressure stats tracker config + this.refreshInterval = clusterConfiguration.getInteger(WebOptions.BACKPRESSURE_REFRESH_INTERVAL); + this.backPressureStatsTracker = new BackPressureStatsTracker( + new StackTraceSampleCoordinator(executor, 6), + clusterConfiguration.getInteger(WebOptions.BACKPRESSURE_CLEANUP_INTERVAL), + clusterConfiguration.getInteger(WebOptions.BACKPRESSURE_NUM_SAMPLES), + Time.milliseconds(clusterConfiguration.getInteger(WebOptions.BACKPRESSURE_DELAY))); + } + + @Override + protected JobVertexBackPressureInfo handleRequest(HandlerRequest request, AccessExecutionGraph executionGraph) throws RestHandlerException { + JobVertexID jobVertexID =
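The handler above keeps a `refreshInterval` ("time after which stats are considered outdated"). The decision that field implies can be sketched in isolation — a hypothetical helper, not the handler's actual code:

```java
// Hypothetical sketch of the staleness check behind refreshInterval: stats
// whose end timestamp is older than the configured interval are considered
// outdated, which is the handler's cue to trigger a new stack trace sample.
final class StatsRefresh {
    static boolean isOutdated(long statsEndTimestamp, long now, int refreshIntervalMillis) {
        return statsEndTimestamp + refreshIntervalMillis <= now;
    }
}
```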
[GitHub] flink pull request #4893: [FLINK-7856][flip6] Port JobVertexBackPressureHand...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4893#discussion_r149305096 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfo.java --- @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages; + +import org.apache.flink.runtime.rest.handler.job.JobVertexBackPressureHandler; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.List; +import java.util.Objects; + +/** + * Response type of the {@link JobVertexBackPressureHandler}. 
+ */ +public class JobVertexBackPressureInfo implements ResponseBody { + + public static final String FIELD_NAME_STATUS = "status"; + public static final String FIELD_NAME_BACKPRESSURE_LEVEL = "backpressure-level"; + public static final String FIELD_NAME_END_TIMESTAMP = "end-timestamp"; + public static final String FIELD_NAME_SUBTASKS = "subtasks"; + + @JsonProperty(FIELD_NAME_STATUS) + private final String status; + + @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL) + private final String backpressureLevel; + + @JsonProperty(FIELD_NAME_END_TIMESTAMP) + private final Long endTimestamp; + + @JsonProperty(FIELD_NAME_SUBTASKS) + private final List subtasks; + + @JsonCreator + public JobVertexBackPressureInfo( + @JsonProperty(FIELD_NAME_STATUS) String status, + @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL) String backpressureLevel, + @JsonProperty(FIELD_NAME_END_TIMESTAMP) Long endTimestamp, + @JsonProperty(FIELD_NAME_SUBTASKS) List subtasks) { + this.status = status; + this.backpressureLevel = backpressureLevel; + this.endTimestamp = endTimestamp; + this.subtasks = subtasks; --- End diff -- null checks are missing. ---
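The "null checks are missing" comment above can be addressed as in the following sketch. Field names mirror the diff, but the class is a simplified stand-in using `java.util.Objects.requireNonNull` (Flink itself would typically use its `Preconditions.checkNotNull`):

```java
import java.util.List;
import java.util.Objects;

// Simplified sketch of the constructor with the requested null checks.
// The boxed Long from the diff is also replaced by a primitive long,
// per the other review comment on this class.
final class BackPressureResponse {
    private final String status;
    private final String backpressureLevel;
    private final long endTimestamp;
    private final List<Integer> subtasks;

    BackPressureResponse(String status, String backpressureLevel, long endTimestamp, List<Integer> subtasks) {
        this.status = Objects.requireNonNull(status, "status must not be null");
        this.backpressureLevel = Objects.requireNonNull(backpressureLevel, "backpressureLevel must not be null");
        this.endTimestamp = endTimestamp;
        this.subtasks = Objects.requireNonNull(subtasks, "subtasks must not be null");
    }
}
```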
[GitHub] flink pull request #4893: [FLINK-7856][flip6] Port JobVertexBackPressureHand...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4893#discussion_r149305060 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfo.java --- @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages; + +import org.apache.flink.runtime.rest.handler.job.JobVertexBackPressureHandler; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.List; +import java.util.Objects; + +/** + * Response type of the {@link JobVertexBackPressureHandler}. 
+ */ +public class JobVertexBackPressureInfo implements ResponseBody { + + public static final String FIELD_NAME_STATUS = "status"; + public static final String FIELD_NAME_BACKPRESSURE_LEVEL = "backpressure-level"; + public static final String FIELD_NAME_END_TIMESTAMP = "end-timestamp"; + public static final String FIELD_NAME_SUBTASKS = "subtasks"; + + @JsonProperty(FIELD_NAME_STATUS) + private final String status; + + @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL) + private final String backpressureLevel; + + @JsonProperty(FIELD_NAME_END_TIMESTAMP) + private final Long endTimestamp; --- End diff -- no need for the object type here. We can use a primitive long. ---
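The reason behind the "primitive long" comment above is that a boxed `Long` field can be `null`, and unboxing it later throws a `NullPointerException` far from where the null was introduced. A minimal, hypothetical illustration:

```java
// Why a primitive long is preferable to a boxed Long for a mandatory field:
// auto-unboxing a null Long throws NullPointerException at use time, whereas
// a primitive field makes the null state unrepresentable.
final class Timestamps {
    static long unbox(Long boxed) {
        return boxed; // auto-unboxing: throws NullPointerException if boxed == null
    }
}
```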
[GitHub] flink pull request #4893: [FLINK-7856][flip6] Port JobVertexBackPressureHand...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4893#discussion_r149305275 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfo.java --- @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages; + +import org.apache.flink.runtime.rest.handler.job.JobVertexBackPressureHandler; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.List; +import java.util.Objects; + +/** + * Response type of the {@link JobVertexBackPressureHandler}. 
+ */ +public class JobVertexBackPressureInfo implements ResponseBody { + + public static final String FIELD_NAME_STATUS = "status"; + public static final String FIELD_NAME_BACKPRESSURE_LEVEL = "backpressure-level"; + public static final String FIELD_NAME_END_TIMESTAMP = "end-timestamp"; + public static final String FIELD_NAME_SUBTASKS = "subtasks"; + + @JsonProperty(FIELD_NAME_STATUS) + private final String status; + + @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL) + private final String backpressureLevel; + + @JsonProperty(FIELD_NAME_END_TIMESTAMP) + private final Long endTimestamp; + + @JsonProperty(FIELD_NAME_SUBTASKS) + private final List subtasks; + + @JsonCreator + public JobVertexBackPressureInfo( + @JsonProperty(FIELD_NAME_STATUS) String status, + @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL) String backpressureLevel, + @JsonProperty(FIELD_NAME_END_TIMESTAMP) Long endTimestamp, + @JsonProperty(FIELD_NAME_SUBTASKS) List subtasks) { + this.status = status; + this.backpressureLevel = backpressureLevel; + this.endTimestamp = endTimestamp; + this.subtasks = subtasks; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + JobVertexBackPressureInfo that = (JobVertexBackPressureInfo) o; + return Objects.equals(status, that.status) && + Objects.equals(backpressureLevel, that.backpressureLevel) && + Objects.equals(endTimestamp, that.endTimestamp) && + Objects.equals(subtasks, that.subtasks); + } + + @Override + public int hashCode() { + return Objects.hash(status, backpressureLevel, endTimestamp, subtasks); + } + + //- + // Static helper classes + //- + + /** +* Nested class to encapsulate the sub tasks back pressure. 
+*/ + public static final class SubtaskBackPressureInfo { + + public static final String FIELD_NAME_STATUS = "subtask"; + public static final String FIELD_NAME_BACKPRESSURE_LEVEL = "backpressure-level"; + public static final String FIELD_NAME_RATIO = "ratio"; + + @JsonProperty(FIELD_NAME_STATUS) + private final int subtask; + + @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL) + private final String backpressureLevel; + + @JsonProperty(FIELD_NAME_RATIO) + private final double ratio; + + public SubtaskBackPressureInfo( + @JsonProperty(FIELD_NAME_STATUS) int subtask, + @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL) String backpressureLevel, +
[GitHub] flink pull request #4893: [FLINK-7856][flip6] Port JobVertexBackPressureHand...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4893#discussion_r149304998 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfo.java --- @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages; + +import org.apache.flink.runtime.rest.handler.job.JobVertexBackPressureHandler; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.List; +import java.util.Objects; + +/** + * Response type of the {@link JobVertexBackPressureHandler}. 
+ */ +public class JobVertexBackPressureInfo implements ResponseBody { + + public static final String FIELD_NAME_STATUS = "status"; + public static final String FIELD_NAME_BACKPRESSURE_LEVEL = "backpressure-level"; + public static final String FIELD_NAME_END_TIMESTAMP = "end-timestamp"; + public static final String FIELD_NAME_SUBTASKS = "subtasks"; + + @JsonProperty(FIELD_NAME_STATUS) + private final String status; + + @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL) + private final String backpressureLevel; --- End diff -- Same here, this should be an enum. ---
[jira] [Commented] (FLINK-7856) Port JobVertexBackPressureHandler to REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241700#comment-16241700 ] ASF GitHub Bot commented on FLINK-7856: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4893#discussion_r149304998 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfo.java --- @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages; + +import org.apache.flink.runtime.rest.handler.job.JobVertexBackPressureHandler; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.List; +import java.util.Objects; + +/** + * Response type of the {@link JobVertexBackPressureHandler}. 
+ */ +public class JobVertexBackPressureInfo implements ResponseBody { + + public static final String FIELD_NAME_STATUS = "status"; + public static final String FIELD_NAME_BACKPRESSURE_LEVEL = "backpressure-level"; + public static final String FIELD_NAME_END_TIMESTAMP = "end-timestamp"; + public static final String FIELD_NAME_SUBTASKS = "subtasks"; + + @JsonProperty(FIELD_NAME_STATUS) + private final String status; + + @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL) + private final String backpressureLevel; --- End diff -- Same here, this should be an enum. > Port JobVertexBackPressureHandler to REST endpoint > -- > > Key: FLINK-7856 > URL: https://issues.apache.org/jira/browse/FLINK-7856 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, REST, Webfrontend >Reporter: Fang Yong >Assignee: Fang Yong > > Port JobVertexBackPressureHandler to REST endpoint -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7856) Port JobVertexBackPressureHandler to REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241705#comment-16241705 ] ASF GitHub Bot commented on FLINK-7856: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4893#discussion_r149305184 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfo.java --- @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages; + +import org.apache.flink.runtime.rest.handler.job.JobVertexBackPressureHandler; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.List; +import java.util.Objects; + +/** + * Response type of the {@link JobVertexBackPressureHandler}. 
+ */ +public class JobVertexBackPressureInfo implements ResponseBody { + + public static final String FIELD_NAME_STATUS = "status"; + public static final String FIELD_NAME_BACKPRESSURE_LEVEL = "backpressure-level"; + public static final String FIELD_NAME_END_TIMESTAMP = "end-timestamp"; + public static final String FIELD_NAME_SUBTASKS = "subtasks"; + + @JsonProperty(FIELD_NAME_STATUS) + private final String status; + + @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL) + private final String backpressureLevel; + + @JsonProperty(FIELD_NAME_END_TIMESTAMP) + private final Long endTimestamp; + + @JsonProperty(FIELD_NAME_SUBTASKS) + private final List subtasks; + + @JsonCreator + public JobVertexBackPressureInfo( + @JsonProperty(FIELD_NAME_STATUS) String status, + @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL) String backpressureLevel, + @JsonProperty(FIELD_NAME_END_TIMESTAMP) Long endTimestamp, + @JsonProperty(FIELD_NAME_SUBTASKS) List subtasks) { + this.status = status; + this.backpressureLevel = backpressureLevel; + this.endTimestamp = endTimestamp; + this.subtasks = subtasks; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + JobVertexBackPressureInfo that = (JobVertexBackPressureInfo) o; + return Objects.equals(status, that.status) && + Objects.equals(backpressureLevel, that.backpressureLevel) && + Objects.equals(endTimestamp, that.endTimestamp) && + Objects.equals(subtasks, that.subtasks); + } + + @Override + public int hashCode() { + return Objects.hash(status, backpressureLevel, endTimestamp, subtasks); + } + + //- + // Static helper classes + //- + + /** +* Nested class to encapsulate the sub tasks back pressure. 
+*/ + public static final class SubtaskBackPressureInfo { + + public static final String FIELD_NAME_STATUS = "subtask"; + public static final String FIELD_NAME_BACKPRESSURE_LEVEL = "backpressure-level"; + public static final String FIELD_NAME_RATIO = "ratio"; + + @JsonProperty(FIELD_NAME_STATUS) + private final int subtask; + + @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL) + private final String backpressureLevel; --- End diff -- enum > Port JobVertexBackPressureHandler to REST endpoint >
[jira] [Commented] (FLINK-7856) Port JobVertexBackPressureHandler to REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241703#comment-16241703 ] ASF GitHub Bot commented on FLINK-7856: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4893#discussion_r149305096 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfo.java --- @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages; + +import org.apache.flink.runtime.rest.handler.job.JobVertexBackPressureHandler; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.List; +import java.util.Objects; + +/** + * Response type of the {@link JobVertexBackPressureHandler}. 
+ */ +public class JobVertexBackPressureInfo implements ResponseBody { + + public static final String FIELD_NAME_STATUS = "status"; + public static final String FIELD_NAME_BACKPRESSURE_LEVEL = "backpressure-level"; + public static final String FIELD_NAME_END_TIMESTAMP = "end-timestamp"; + public static final String FIELD_NAME_SUBTASKS = "subtasks"; + + @JsonProperty(FIELD_NAME_STATUS) + private final String status; + + @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL) + private final String backpressureLevel; + + @JsonProperty(FIELD_NAME_END_TIMESTAMP) + private final Long endTimestamp; + + @JsonProperty(FIELD_NAME_SUBTASKS) + private final List subtasks; + + @JsonCreator + public JobVertexBackPressureInfo( + @JsonProperty(FIELD_NAME_STATUS) String status, + @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL) String backpressureLevel, + @JsonProperty(FIELD_NAME_END_TIMESTAMP) Long endTimestamp, + @JsonProperty(FIELD_NAME_SUBTASKS) List subtasks) { + this.status = status; + this.backpressureLevel = backpressureLevel; + this.endTimestamp = endTimestamp; + this.subtasks = subtasks; --- End diff -- null checks are missing. > Port JobVertexBackPressureHandler to REST endpoint > -- > > Key: FLINK-7856 > URL: https://issues.apache.org/jira/browse/FLINK-7856 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, REST, Webfrontend >Reporter: Fang Yong >Assignee: Fang Yong > > Port JobVertexBackPressureHandler to REST endpoint -- This message was sent by Atlassian JIRA (v6.4.14#64029)
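The "null checks are missing" remark points at the bare field assignments in the constructor. Flink code usually guards these with `Preconditions.checkNotNull`; the sketch below uses `java.util.Objects.requireNonNull` instead so it compiles without Flink on the classpath, and trims the class down to the constructor pattern under discussion (field types simplified, Jackson annotations omitted):

```java
import java.util.List;
import java.util.Objects;

/** Trimmed-down sketch of the response type with the requested constructor null checks. */
class JobVertexBackPressureInfoSketch {

    private final String status;
    private final String backpressureLevel;
    private final Long endTimestamp;
    private final List<String> subtasks;

    JobVertexBackPressureInfoSketch(
            String status,
            String backpressureLevel,
            Long endTimestamp,
            List<String> subtasks) {
        // Objects.requireNonNull stands in for Flink's Preconditions.checkNotNull here.
        this.status = Objects.requireNonNull(status, "status must not be null");
        this.backpressureLevel = Objects.requireNonNull(backpressureLevel, "backpressureLevel must not be null");
        this.endTimestamp = Objects.requireNonNull(endTimestamp, "endTimestamp must not be null");
        this.subtasks = Objects.requireNonNull(subtasks, "subtasks must not be null");
    }

    String getStatus() {
        return status;
    }
}
```

With this change, passing `null` for any argument fails fast in the constructor instead of surfacing later as a `NullPointerException` during JSON serialization.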
[GitHub] flink pull request #4893: [FLINK-7856][flip6] Port JobVertexBackPressureHand...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4893#discussion_r149304880 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfo.java --- @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages; + +import org.apache.flink.runtime.rest.handler.job.JobVertexBackPressureHandler; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.List; +import java.util.Objects; + +/** + * Response type of the {@link JobVertexBackPressureHandler}. + */ +public class JobVertexBackPressureInfo implements ResponseBody { + + public static final String FIELD_NAME_STATUS = "status"; + public static final String FIELD_NAME_BACKPRESSURE_LEVEL = "backpressure-level"; + public static final String FIELD_NAME_END_TIMESTAMP = "end-timestamp"; + public static final String FIELD_NAME_SUBTASKS = "subtasks"; + + @JsonProperty(FIELD_NAME_STATUS) + private final String status; --- End diff -- This should be an enum, I think. ---
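The review asks for the level to be modeled as an enum rather than a raw `String`. A minimal sketch of what that could look like — the constant names, the ratio thresholds, and the lower-case JSON form are illustrative assumptions, not Flink's actual API:

```java
import java.util.Locale;

/** Sketch of a back-pressure level enum replacing the raw String field. */
enum BackPressureLevel {
    OK, LOW, HIGH;

    /** Map a measured back-pressure ratio to a level; the 0.10/0.50 cut-offs are illustrative. */
    static BackPressureLevel fromRatio(double ratio) {
        if (ratio <= 0.10) {
            return OK;
        } else if (ratio <= 0.50) {
            return LOW;
        }
        return HIGH;
    }

    /** Lower-case form for the "backpressure-level" JSON field, e.g. "high". */
    String toJsonValue() {
        return name().toLowerCase(Locale.ROOT);
    }
}
```

An enum confines the set of legal values at compile time, so a typo like `"hgih"` cannot leak into the REST response the way it can with a free-form `String` field.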
[GitHub] flink issue #4893: [FLINK-7856][flip6] Port JobVertexBackPressureHandler to ...
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/4893 I think the plan sounds good @zjureel. We should however not do it in this PR. Thus, I would suggest that we create a `JobVertexBackPressureHandler` which directly inherits from `AbstractRestHandler` and for the time being returns an empty `JobVertexBackPressureInfo`. Then I would add the `BackPressureStatsTracker` to the `JobMaster` in a separate PR. What do you think? ---
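The interim plan sketched in the comment — a `JobVertexBackPressureHandler` extending `AbstractRestHandler` that returns an empty `JobVertexBackPressureInfo` until the `BackPressureStatsTracker` is wired into the `JobMaster` — can be approximated without Flink on the classpath roughly like this (all types here are stand-ins for the real Flink classes, and the placeholder status value is an assumption):

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Stand-in for Flink's AbstractRestHandler: subclasses supply the request handling. */
abstract class AbstractRestHandlerStub<R> {
    abstract CompletableFuture<R> handleRequest();
}

/** Empty payload mirroring an empty JobVertexBackPressureInfo. */
class BackPressureInfoStub {
    final String status = "unavailable"; // hypothetical placeholder until real stats exist
    final List<Object> subtasks = Collections.emptyList();
}

/** Stub handler that, for the time being, always completes with an empty payload. */
class JobVertexBackPressureHandlerStub extends AbstractRestHandlerStub<BackPressureInfoStub> {
    @Override
    CompletableFuture<BackPressureInfoStub> handleRequest() {
        return CompletableFuture.completedFuture(new BackPressureInfoStub());
    }
}
```

This keeps the REST endpoint and its message types mergeable on their own, with the stats-tracker integration following in a separate PR exactly as the comment proposes.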
[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling
[ https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241696#comment-16241696 ] ASF GitHub Bot commented on FLINK-7076: --- Github user suez1224 commented on a diff in the pull request: https://github.com/apache/flink/pull/4729#discussion_r149304158 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnWorkerNode.java --- @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn; + +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable; + +import org.apache.hadoop.yarn.api.records.Container; + +import java.io.Serializable; + +/** + * A stored YARN worker, which contains the YARN container. + */ +public class YarnWorkerNode implements ResourceIDRetrievable, Serializable { --- End diff -- @tillrohrmann But in ResourceManager.java, WorkerType has to be Serializable (i.e., "ResourceManager"). Do you have any recommendation? 
> Implement container release to support dynamic scaling > -- > > Key: FLINK-7076 > URL: https://issues.apache.org/jira/browse/FLINK-7076 > Project: Flink > Issue Type: Sub-task > Components: ResourceManager >Reporter: Till Rohrmann >Assignee: Shuyi Chen > Labels: flip-6 > > In order to support dynamic scaling, the {{YarnResourceManager}} has to be > able to dynamically free containers. We have to implement the > {{YarnResourceManager#stopWorker}} method. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
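The tension raised in this thread is a generic bound: if the resource manager declares its worker type as `Serializable`, then `YarnWorkerNode` is forced to implement `Serializable` even though the wrapped YARN `Container` is not. The shape of the problem can be illustrated with stand-in types (these are not the real Flink classes):

```java
import java.io.Serializable;

/** Stand-in for Flink's ResourceIDRetrievable. */
interface ResourceIDRetrievableStub {
    String getResourceID();
}

/** Stand-in resource manager whose type bound forces workers to be Serializable. */
class ResourceManagerStub<W extends ResourceIDRetrievableStub & Serializable> {
    String register(W worker) {
        return "registered " + worker.getResourceID();
    }
}

/**
 * Worker node implementing Serializable to satisfy the bound. In the real
 * YarnWorkerNode the wrapped Container field would defeat serialization,
 * which is the problem the reviewers are discussing.
 */
class WorkerNodeStub implements ResourceIDRetrievableStub, Serializable {
    private final String resourceId;

    WorkerNodeStub(String resourceId) {
        this.resourceId = resourceId;
    }

    @Override
    public String getResourceID() {
        return resourceId;
    }
}
```

Declaring `implements Serializable` compiles, but actually serializing an instance holding a non-serializable `Container` would throw `NotSerializableException` at runtime — which is why the bound itself is being questioned.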
[jira] [Commented] (FLINK-7866) Weigh list of preferred locations for scheduling
[ https://issues.apache.org/jira/browse/FLINK-7866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241691#comment-16241691 ] ASF GitHub Bot commented on FLINK-7866: --- Github user sihuazhou commented on the issue: https://github.com/apache/flink/pull/4949 Hi @tillrohrmann, could you please have a rough look at this? I'm not sure this is what you wanted. > Weigh list of preferred locations for scheduling > > > Key: FLINK-7866 > URL: https://issues.apache.org/jira/browse/FLINK-7866 > Project: Flink > Issue Type: Improvement > Components: Scheduler >Affects Versions: 1.4.0, 1.3.2 >Reporter: Till Rohrmann >Assignee: Sihua Zhou > Fix For: 1.5.0 > > > [~sihuazhou] proposed to not only use the list of preferred locations to > decide where to schedule a task, but to also weigh the list according to how > often a location appeared and then select the location based on the weight. > That way, we would obtain better locality in some cases. > Example: > Preferred locations list: {{[location1, location2, location2]}} > Weighted preferred locations list {{[(location2 , 2), (location1, 1)]}} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
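The weighting described in the issue — count how often each location appears in the preferred-locations list, then order locations by that count descending — can be sketched as follows (a standalone illustration using plain strings for locations, not the actual scheduler types):

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Sketch of the proposed weighting: occurrences per location, sorted by weight descending. */
final class PreferredLocationWeighting {

    static LinkedHashMap<String, Long> weigh(List<String> preferredLocations) {
        return preferredLocations.stream()
                // count occurrences of each location
                .collect(Collectors.groupingBy(loc -> loc, Collectors.counting()))
                .entrySet().stream()
                // highest weight first
                .sorted(Map.Entry.<String, Long>comparingByValue().reversed())
                // LinkedHashMap preserves the sorted order
                .collect(Collectors.toMap(
                        Map.Entry::getKey,
                        Map.Entry::getValue,
                        (a, b) -> a,
                        LinkedHashMap::new));
    }
}
```

For the issue's example, `[location1, location2, location2]` yields `{location2=2, location1=1}`, so the scheduler would try `location2` first.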
[GitHub] flink issue #4960: Update version to 1.5-SNAPSHOT
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/4960 +1 ---
[GitHub] flink issue #4935: [Flink-7945][Metrics]Fix per partition-lag metr...
Github user Aitozi commented on the issue: https://github.com/apache/flink/pull/4935 Hi @tzulitai, could you take a look at this again? :-) ---
[jira] [Commented] (FLINK-7652) Port CurrentJobIdsHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7652?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241669#comment-16241669 ] ASF GitHub Bot commented on FLINK-7652: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/4734 @tillrohrmann yes, reusing `DispatcherGateway#requestJobDetails` was an option that did occur to me, but I was also not sure of the redundant cost. But sure, we can avoid adding yet another RPC for now. Thanks for the branch link. I'll rebase onto that. > Port CurrentJobIdsHandler to new REST endpoint > -- > > Key: FLINK-7652 > URL: https://issues.apache.org/jira/browse/FLINK-7652 > Project: Flink > Issue Type: Sub-task > Components: REST, Webfrontend >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > Labels: flip-6 > Fix For: 1.4.0 > > > Port existing {{CurrentJobIdsHandler}} to new REST endpoint -- This message was sent by Atlassian JIRA (v6.4.14#64029)