[GitHub] [flink] flinkbot commented on issue #11147: [FLINK-15717] Update StatefulJobSavepointMigrationITCase.scala to restore from 1.10 savepoint

2020-02-19 Thread GitBox
flinkbot commented on issue #11147: [FLINK-15717] Update 
StatefulJobSavepointMigrationITCase.scala to restore from 1.10 savepoint
URL: https://github.com/apache/flink/pull/11147#issuecomment-588710322
 
 
   
   ## CI report:
   
   * 8127feb053c5d657b8453a7ef02c65da627d037c UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   




[GitHub] [flink] flinkbot commented on issue #11148: [FLINK-16180][runtime] Replace the nullable vertexExecution in ScheduledUnit with a non-null executionVertexId

2020-02-19 Thread GitBox
flinkbot commented on issue #11148: [FLINK-16180][runtime] Replace the nullable 
vertexExecution in ScheduledUnit with a non-null executionVertexId
URL: https://github.com/apache/flink/pull/11148#issuecomment-588710480
 
 
   
   ## CI report:
   
   * 9cf340d95807d82066316548ca6e503f8fd731e8 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   




[GitHub] [flink-statefun] tzulitai opened a new pull request #28: [FLINK-16159] [tests, build] Add verification integration test + integrate with Maven build

2020-02-19 Thread GitBox
tzulitai opened a new pull request #28: [FLINK-16159] [tests, build] Add 
verification integration test + integrate with Maven build
URL: https://github.com/apache/flink-statefun/pull/28
 
 
   This PR achieves the following goals:
   - Adds a `statefun-integration-tests` module that is meant to maintain 
integration tests in the future.
   - Adds `statefun-sanity-itcase` to the integration tests module, which 
includes a simple application used for sanity verification and a JUnit-driven 
test that performs the verification against a Docker setup.
   - Adds a Maven build profile so that the integration tests can be run with a 
simple command: `mvn clean verify -Prun-integration-tests`
   
   ---
   
   ### The verification app and test
   
   The verification app (`SanityVerificationModule`) is a simple application 
used for sanity verification.
   - The application reads commands from a Kafka ingress
   - Has multiple functions bound (currently 2 in this PR) that react to the 
commands (see the class-level Javadoc of `SanityVerificationModule` for a full 
description of the set of commands)
   - Reflects any state updates in the functions back to a Kafka egress (a 
minimal sketch of such a function follows this list).
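
   For orientation, here is a minimal sketch of what such a command-reacting 
function could look like with the StateFun SDK. The class name, package, 
command semantics, and egress identifier below are illustrative assumptions, 
not the actual code in this PR:

```java
package org.apache.flink.statefun.itcases.sanity; // hypothetical package

import org.apache.flink.statefun.sdk.Context;
import org.apache.flink.statefun.sdk.StatefulFunction;
import org.apache.flink.statefun.sdk.annotations.Persisted;
import org.apache.flink.statefun.sdk.io.EgressIdentifier;
import org.apache.flink.statefun.sdk.state.PersistedValue;

/** Hypothetical command-reacting function; all names are placeholders. */
final class CommandReactingFunction implements StatefulFunction {

    // Assumed egress identifier; the real module defines its own.
    static final EgressIdentifier<String> STATE_EGRESS =
        new EgressIdentifier<>("sanity", "state-updates", String.class);

    @Persisted
    private final PersistedValue<Integer> count = PersistedValue.of("count", Integer.class);

    @Override
    public void invoke(Context context, Object input) {
        // Assume a single "increment" command for illustration; the real set
        // of commands is documented in SanityVerificationModule's Javadoc.
        int updated = count.getOrDefault(0) + 1;
        count.set(updated);

        // Reflect the state update back to the Kafka egress.
        context.send(STATE_EGRESS, context.self().id() + "=" + updated);
    }
}
```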
   
   The JUnit-driven test does the following (a rough sketch follows below):
   - Uses Testcontainers (https://www.testcontainers.org/) to start a Kafka 
broker, builds the image for the verification app on the fly, and also starts a 
master + worker container for the app.
   - Writes some commands to the Kafka ingress topic
   - Reads state outputs from the Kafka egress topic, and verifies that they 
are correct
   
   Right now the test scenario does not involve any randomization.
   We may consider adding that in the future as a follow-up.
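
   The sketch below shows the rough shape of such a Testcontainers-driven 
ITCase, under stated assumptions: the topic names and the expected record are 
invented, and building the app image and starting the master + worker 
containers (which the real test also does) is omitted here:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.ClassRule;
import org.junit.Test;
import org.testcontainers.containers.KafkaContainer;

import static org.junit.Assert.assertFalse;

public class SanitySketchITCase {

    // Testcontainers starts and stops the broker around the test class.
    @ClassRule
    public static final KafkaContainer KAFKA = new KafkaContainer();

    @Test
    public void commandsAreReflectedToTheEgress() throws Exception {
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", KAFKA.getBootstrapServers());
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Write a command to the ingress topic ("commands" is an assumed name).
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
            producer.send(new ProducerRecord<>("commands", "fn-1", "INCREMENT")).get();
        }

        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", KAFKA.getBootstrapServers());
        consumerProps.put("group.id", "verifier");
        consumerProps.put("auto.offset.reset", "earliest");
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // Read state updates from the egress topic and check something came back.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
            consumer.subscribe(Collections.singletonList("state-updates"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(30));
            assertFalse("expected at least one state update on the egress", records.isEmpty());
        }
    }
}
```

   A class shaped like this would be picked up by the `run-integration-tests` 
profile described in the next section.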
   
   ---
   
   ### Maven `run-integration-tests` build profile and structure setup
   
   With this PR, to add new container-based integration tests in the future, a 
developer has to do the following:
   - Add a new sub-module under `statefun-integration-tests`
   - Add source code for the Stateful Functions application to test against
   - Add a test class named with the pattern `*ITCase` that sets up the 
containers using Testcontainers and implements the test verification logic as 
a JUnit test. The class name pattern is important because it ensures the test 
is skipped during the unit test phase and runs only with the 
`run-integration-tests` profile.
   
   To build while also running the integration tests, simply run 
`mvn clean verify -Prun-integration-tests` in the project root directory.
   
   This always re-builds the base Stateful Functions image before running the 
ITCases.




[jira] [Updated] (FLINK-16159) Add simple end-to-end test for Stateful Functions using testcontainers

2020-02-19 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-16159:
---
Labels: pull-request-available  (was: )

> Add simple end-to-end test for Stateful Functions using testcontainers
> --
>
> Key: FLINK-16159
> URL: https://issues.apache.org/jira/browse/FLINK-16159
> Project: Flink
>  Issue Type: Improvement
>  Components: Stateful Functions, Test Infrastructure
>Affects Versions: statefun-1.1
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Major
>  Labels: pull-request-available
>
> In our review process of changes to the Stateful Functions project, we 
> often end up wanting to do a simple sanity check by running the simple 
> greeter example with docker-compose.
> Ideally, this should really be an end-to-end verification program that:
> * Starts a Kafka broker and a simple stateful function application that has a 
> Kafka ingress and egress
> * Uses the Kafka Java client to write some inputs to the broker, reads and 
> verifies output
> With testcontainers (https://www.testcontainers.org/) this might even be 
> achievable as a JUnit test, which runs only after the Maven packaging phase.





[GitHub] [flink] flinkbot edited a comment on issue #11146: [FLINK-15718] Update StatefulJobSavepointMigrationITCase to restore from 1.10 savepoint

2020-02-19 Thread GitBox
flinkbot edited a comment on issue #11146: [FLINK-15718] Update 
StatefulJobSavepointMigrationITCase to restore from 1.10 savepoint
URL: https://github.com/apache/flink/pull/11146#issuecomment-588660647
 
 
   
   ## CI report:
   
   * 6d0d8f47466cf61ec549d85627e0349550e2433d Travis: 
[PENDING](https://travis-ci.com/flink-ci/flink/builds/149768484) 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   




[GitHub] [flink] flinkbot commented on issue #11151: [FLINK-16128][docs]Translate "Google Cloud PubSub" page into Chinese

2020-02-19 Thread GitBox
flinkbot commented on issue #11151: [FLINK-16128][docs]Translate "Google Cloud 
PubSub" page into Chinese
URL: https://github.com/apache/flink/pull/11151#issuecomment-588705948
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit 89d26f578174d90ce9979f076346376bf3597a5b (Thu Feb 20 
07:51:39 UTC 2020)
   
✅ no warnings
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   




[jira] [Updated] (FLINK-16128) Translate "Google Cloud PubSub" page into Chinese

2020-02-19 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-16128:
---
Labels: pull-request-available  (was: )

> Translate "Google Cloud PubSub" page into Chinese
> -
>
> Key: FLINK-16128
> URL: https://issues.apache.org/jira/browse/FLINK-16128
> Project: Flink
>  Issue Type: Sub-task
>  Components: chinese-translation, Documentation
>Reporter: Jark Wu
>Assignee: Xuannan Su
>Priority: Major
>  Labels: pull-request-available
>
> The page url is 
> https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/connectors/pubsub.html
> The markdown file is located in flink/docs/dev/connectors/pubsub.zh.md





[GitHub] [flink] Sxnan opened a new pull request #11151: [FLINK-16128][docs]Translate "Google Cloud PubSub" page into Chinese

2020-02-19 Thread GitBox
Sxnan opened a new pull request #11151: [FLINK-16128][docs]Translate "Google 
Cloud PubSub" page into Chinese
URL: https://github.com/apache/flink/pull/11151
 
 
   ## What is the purpose of the change
   
   This pull request translates [Google Cloud 
PubSub](https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/connectors/pubsub.html)
 page of connectors into Chinese
   
   
   ## Brief change log
   
   - Translate the markdown file located at 
flink/docs/dev/connectors/pubsub.zh.md
   
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not applicable
   




[GitHub] [flink] flinkbot commented on issue #11149: [hotfix][table-api] Fixed ExpressionResolver rules modifiers

2020-02-19 Thread GitBox
flinkbot commented on issue #11149: [hotfix][table-api] Fixed 
ExpressionResolver rules modifiers
URL: https://github.com/apache/flink/pull/11149#issuecomment-588698826
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit 306149eb700f0baa2d75c1e9971cbce50b273618 (Thu Feb 20 
07:46:34 UTC 2020)
   
   **Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up 
to date!
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   




[GitHub] [flink] flinkbot commented on issue #11150: [FLINK-15721] Update StatefulJobWBroadcastStateMigrationITCase to restore from 1.10 savepoint

2020-02-19 Thread GitBox
flinkbot commented on issue #11150: [FLINK-15721] Update 
StatefulJobWBroadcastStateMigrationITCase to restore from 1.10 savepoint
URL: https://github.com/apache/flink/pull/11150#issuecomment-588698772
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit 8069d0cf582c214aedad280a56871be754a633ee (Thu Feb 20 
07:46:31 UTC 2020)
   
   **Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up 
to date!
* **This pull request references an unassigned [Jira 
ticket](https://issues.apache.org/jira/browse/FLINK-15721).** According to the 
[code contribution 
guide](https://flink.apache.org/contributing/contribute-code.html), tickets 
need to be assigned before starting with the implementation work.
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   




[GitHub] [flink] yanghua opened a new pull request #11150: [FLINK-15721] Update StatefulJobWBroadcastStateMigrationITCase to restore from 1.10 savepoint

2020-02-19 Thread GitBox
yanghua opened a new pull request #11150: [FLINK-15721] Update 
StatefulJobWBroadcastStateMigrationITCase to restore from 1.10 savepoint
URL: https://github.com/apache/flink/pull/11150
 
 
   
   
   ## What is the purpose of the change
   
   *This pull request updates StatefulJobWBroadcastStateMigrationITCase to 
restore from 1.10 savepoint*
   
   
   ## Brief change log
   
 - *Update StatefulJobWBroadcastStateMigrationITCase to restore from 1.10 
savepoint*
   
   
   ## Verifying this change
   
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ **not documented**)
   




[jira] [Updated] (FLINK-15721) Update StatefulJobWBroadcastStateMigrationITCase to restore from 1.10 savepoint

2020-02-19 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15721?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-15721:
---
Labels: pull-request-available  (was: )

> Update StatefulJobWBroadcastStateMigrationITCase to restore from 1.10 
> savepoint
> ---
>
> Key: FLINK-15721
> URL: https://issues.apache.org/jira/browse/FLINK-15721
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.11.0
>Reporter: vinoyang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>
> Update {{StatefulJobWBroadcastStateMigrationITCase}} to restore from 1.10 
> savepoint.





[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode

2020-02-19 Thread GitBox
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] 
Let Netty use Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#discussion_r381826952
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/BufferResponseDecoderDelegate.java
 ##
 @@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.runtime.io.network.NetworkClientHandler;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+
+import static 
org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse;
+
+/**
+ * The parser for {@link 
org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse}.
+ */
+public class BufferResponseDecoderDelegate implements 
NettyMessageDecoderDelegate {
+
+   /** The network client handler of current channel. */
+   private final NetworkClientHandler networkClientHandler;
+
+   /** The Flink Buffer allocator. */
+   private final NetworkBufferAllocator allocator;
+
+   /** The cumulation buffer of message header. */
+   private ByteBuf messageHeaderBuffer;
+
+   /**
+* The current BufferResponse message that are process the buffer part.
+* If it is null, we are still processing the message header part, 
otherwise
+* we are processing the buffer part.
+*/
+   private BufferResponse currentResponse;
+
+   /** How much bytes have been received or discarded for the buffer part. 
*/
+   private int decodedBytesOfBuffer;
+
+   public BufferResponseDecoderDelegate(NetworkClientHandler 
networkClientHandler) {
+   this.networkClientHandler = networkClientHandler;
 
 Review comment:
   This class-level variable is not needed, per the 
[comment](https://github.com/apache/flink/pull/7368/files#r381770789); we can 
pass the `NetworkBufferAllocator` directly as a constructor argument.
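
   A sketch of what that suggestion could look like (illustrative only, not 
the actual follow-up commit; the decoding callbacks from the quoted diff are 
elided and stay as they are):

```java
// Hypothetical revised shape: the allocator is injected directly, so the
// decoder no longer needs to keep a NetworkClientHandler field at all.
public class BufferResponseDecoderDelegate implements NettyMessageDecoderDelegate {

    private final NetworkBufferAllocator allocator;

    public BufferResponseDecoderDelegate(NetworkBufferAllocator allocator) {
        this.allocator = allocator;
    }

    // ... rest of the decoding logic unchanged ...
}
```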




[GitHub] [flink] dawidwys opened a new pull request #11149: [hotfix][table-api] Fixed ExpressionResolver rules modifiers

2020-02-19 Thread GitBox
dawidwys opened a new pull request #11149: [hotfix][table-api] Fixed 
ExpressionResolver rules modifiers
URL: https://github.com/apache/flink/pull/11149
 
 
   It is a trivial hotfix.




[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode

2020-02-19 Thread GitBox
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] 
Let Netty use Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#discussion_r381827789
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/BufferResponseDecoderDelegate.java
 ##
 @@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.runtime.io.network.NetworkClientHandler;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+
+import static 
org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse;
+
+/**
+ * The parser for {@link 
org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse}.
+ */
+public class BufferResponseDecoderDelegate implements 
NettyMessageDecoderDelegate {
+
+   /** The network client handler of current channel. */
+   private final NetworkClientHandler networkClientHandler;
+
+   /** The Flink Buffer allocator. */
+   private final NetworkBufferAllocator allocator;
+
+   /** The cumulation buffer of message header. */
+   private ByteBuf messageHeaderBuffer;
+
+   /**
+* The current BufferResponse message that are process the buffer part.
+* If it is null, we are still processing the message header part, 
otherwise
+* we are processing the buffer part.
+*/
+   private BufferResponse currentResponse;
+
+   /** How much bytes have been received or discarded for the buffer part. 
*/
+   private int decodedBytesOfBuffer;
+
+   public BufferResponseDecoderDelegate(NetworkClientHandler 
networkClientHandler) {
 
 Review comment:
   nit: this constructor could be package-private (default visibility).




[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode

2020-02-19 Thread GitBox
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] 
Let Netty use Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#discussion_r381827385
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/BufferResponseDecoderDelegate.java
 ##
 @@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.runtime.io.network.NetworkClientHandler;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+
+import static 
org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse;
+
+/**
+ * The parser for {@link 
org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse}.
+ */
+public class BufferResponseDecoderDelegate implements 
NettyMessageDecoderDelegate {
+
+   /** The network client handler of current channel. */
+   private final NetworkClientHandler networkClientHandler;
+
+   /** The Flink Buffer allocator. */
+   private final NetworkBufferAllocator allocator;
+
+   /** The cumulation buffer of message header. */
+   private ByteBuf messageHeaderBuffer;
+
+   /**
+* The current BufferResponse message that are process the buffer part.
+* If it is null, we are still processing the message header part, 
otherwise
+* we are processing the buffer part.
+*/
+   private BufferResponse currentResponse;
+
+   /** How much bytes have been received or discarded for the buffer part. 
*/
+   private int decodedBytesOfBuffer;
+
+   public BufferResponseDecoderDelegate(NetworkClientHandler 
networkClientHandler) {
+   this.networkClientHandler = networkClientHandler;
+   this.allocator = new 
NetworkBufferAllocator(networkClientHandler);
+   }
+
+   @Override
+   public void onChannelActive(ChannelHandlerContext ctx) {
+   messageHeaderBuffer = 
ctx.alloc().directBuffer(BufferResponse.MESSAGE_HEADER_LENGTH);
+   }
+
+   @Override
+   public void startParsingMessage(int msgId, int messageLength) {
+   currentResponse = null;
+   decodedBytesOfBuffer = 0;
+
+   messageHeaderBuffer.clear();
+   }
+
+   @Override
+   public ParseResult onChannelRead(ByteBuf data) throws Exception {
+   if (currentResponse == null) {
+   ByteBuf toDecode = 
ByteBufUtils.cumulate(messageHeaderBuffer, data, 
BufferResponse.MESSAGE_HEADER_LENGTH);
 
 Review comment:
   nit: maybe statically import `BufferResponse.MESSAGE_HEADER_LENGTH` at the 
top of the class.
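
   For illustration, the static import the nit refers to would look like this 
(a sketch; the constant can then be referenced unqualified):

```java
import static org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse.MESSAGE_HEADER_LENGTH;

// ... later in the decoder:
ByteBuf toDecode = ByteBufUtils.cumulate(messageHeaderBuffer, data, MESSAGE_HEADER_LENGTH);
```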




[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode

2020-02-19 Thread GitBox
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] 
Let Netty use Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#discussion_r381826952
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/BufferResponseDecoderDelegate.java
 ##
 @@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.runtime.io.network.NetworkClientHandler;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+
+import static 
org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse;
+
+/**
+ * The parser for {@link 
org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse}.
+ */
+public class BufferResponseDecoderDelegate implements 
NettyMessageDecoderDelegate {
+
+   /** The network client handler of current channel. */
+   private final NetworkClientHandler networkClientHandler;
+
+   /** The Flink Buffer allocator. */
+   private final NetworkBufferAllocator allocator;
+
+   /** The cumulation buffer of message header. */
+   private ByteBuf messageHeaderBuffer;
+
+   /**
+* The current BufferResponse message that are process the buffer part.
+* If it is null, we are still processing the message header part, 
otherwise
+* we are processing the buffer part.
+*/
+   private BufferResponse currentResponse;
+
+   /** How much bytes have been received or discarded for the buffer part. 
*/
+   private int decodedBytesOfBuffer;
+
+   public BufferResponseDecoderDelegate(NetworkClientHandler 
networkClientHandler) {
+   this.networkClientHandler = networkClientHandler;
 
 Review comment:
   This class-level variable is not needed, per the 
[comment](https://github.com/apache/flink/pull/7368/files#r381770789).




[GitHub] [flink] HuangZhenQiu commented on issue #10980: [FLINK-12343][flink-yarn] add yarn file replication configuration

2020-02-19 Thread GitBox
HuangZhenQiu commented on issue #10980: [FLINK-12343][flink-yarn] add yarn file 
replication configuration
URL: https://github.com/apache/flink/pull/10980#issuecomment-588680293
 
 
   @TisonKun 
   Would you please take one more round of review?




[jira] [Updated] (FLINK-16180) Replacing vertexExecution in ScheduledUnit with executionVertexID

2020-02-19 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16180?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-16180:

Description: 
{{ScheduledUnit#vertexExecution}} is nullable but {{ProgrammedSlotProvider}} 
requires it to be non-null to work. This makes {{ProgrammedSlotProvider}} not 
able to be used by new scheduler tests since {{vertexExecution}} is never set 
in the new scheduler code path. It blocks us from reworking tests that are 
based on legacy scheduling to run on the new scheduler.

Besides that, there are 2 other problems caused by the nullable vertexExecution:
1. The log printed in SchedulerImpl#allocateSlotInternal(...) may contain no 
useful info since the vertexExecution can be null.
2. NPE issue reported in FLINK-16145.

Thus I would propose to replace the nullable vertexExecution with a non-null 
executionVertexID.


  was:
{{ScheduledUnit#vertexExecution}} is nullable but {{ProgrammedSlotProvider}} 
requires it to be non-null to work. This makes {{ProgrammedSlotProvider}} not 
able to be used by new scheduler tests since {{vertexExecution}} is never set 
in the new scheduler code path. It blocks us from reworking tests that are 
based on legacy scheduling to run on the new scheduler.

I would propose to replace the nullable vertexExecution with a non-null 
executionVertexID.

This change would not break anything since {{vertexExecution}} is mainly used 
by {{ProgrammedSlotProvider}} for testing. {{ProgrammedSlotProvider}} uses it 
to retrieve the JobVertexID and subtaskIndex. 
The only other place where {{ScheduledUnit#vertexExecution}} is used is to log 
the involved task for slot allocation in 
{{SchedulerImpl#allocateSlotInternal(...)}}. The log is problematic at the 
moment with the new scheduler since the vertexExecution is null. This change 
can fix the problematic log.

This change would also fix a NPE issue reported in FLINK-16145.



> Replacing vertexExecution in ScheduledUnit with executionVertexID
> -
>
> Key: FLINK-16180
> URL: https://issues.apache.org/jira/browse/FLINK-16180
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.11.0
>Reporter: Zhu Zhu
>Assignee: Zhu Zhu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> {{ScheduledUnit#vertexExecution}} is nullable but {{ProgrammedSlotProvider}} 
> requires it to be non-null to work. This makes {{ProgrammedSlotProvider}} not 
> able to be used by new scheduler tests since {{vertexExecution}} is never set 
> in the new scheduler code path. It blocks us from reworking tests that are 
> based on legacy scheduling to run on the new scheduler.
> Besides that, there are 2 other problems caused by the nullable 
> vertexExecution:
> 1. The log printed in SchedulerImpl#allocateSlotInternal(...) may contain no 
> useful info since the vertexExecution can be null.
> 2. NPE issue reported in FLINK-16145.
> Thus I would propose to replace the nullable vertexExecution with a non-null 
> executionVertexID.





[GitHub] [flink] flinkbot commented on issue #11148: [FLINK-16180][runtime] Replace the nullable vertexExecution in ScheduledUnit with a non-null executionVertexId

2020-02-19 Thread GitBox
flinkbot commented on issue #11148: [FLINK-16180][runtime] Replace the nullable 
vertexExecution in ScheduledUnit with a non-null executionVertexId
URL: https://github.com/apache/flink/pull/11148#issuecomment-588668558
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit 9cf340d95807d82066316548ca6e503f8fd731e8 (Thu Feb 20 
07:24:56 UTC 2020)
   
   **Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up 
to date!
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   




[jira] [Commented] (FLINK-16180) Replacing vertexExecution in ScheduledUnit with executionVertexID

2020-02-19 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16180?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17040713#comment-17040713
 ] 

Zhu Zhu commented on FLINK-16180:
-

cc [~gjy] [~trohrmann]

> Replacing vertexExecution in ScheduledUnit with executionVertexID
> -
>
> Key: FLINK-16180
> URL: https://issues.apache.org/jira/browse/FLINK-16180
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.11.0
>Reporter: Zhu Zhu
>Assignee: Zhu Zhu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> {{ScheduledUnit#vertexExecution}} is nullable but {{ProgrammedSlotProvider}} 
> requires it to be non-null to work. This makes {{ProgrammedSlotProvider}} not 
> able to be used by new scheduler tests since {{vertexExecution}} is never set 
> in the new scheduler code path. It blocks us from reworking tests that are 
> based on legacy scheduling to run on the new scheduler.
> Besides that, there are 2 other problems caused by the nullable 
> vertexExecution:
> 1. The log printed in SchedulerImpl#allocateSlotInternal(...) may contain no 
> useful info since the vertexExecution can be null.
> 2. NPE issue reported in FLINK-16145.
> Thus I would propose to replace the nullable vertexExecution with a non-null 
> executionVertexID.





[jira] [Updated] (FLINK-16180) Replacing vertexExecution in ScheduledUnit with executionVertexID

2020-02-19 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16180?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-16180:
---
Labels: pull-request-available  (was: )

> Replacing vertexExecution in ScheduledUnit with executionVertexID
> -
>
> Key: FLINK-16180
> URL: https://issues.apache.org/jira/browse/FLINK-16180
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.11.0
>Reporter: Zhu Zhu
>Assignee: Zhu Zhu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>
> {{ScheduledUnit#vertexExecution}} is nullable but {{ProgrammedSlotProvider}} 
> requires it to be non-null to work. This makes {{ProgrammedSlotProvider}} not 
> able to be used by new scheduler tests since {{vertexExecution}} is never set 
> in the new scheduler code path. It blocks us from reworking tests that are 
> based on legacy scheduling to run on the new scheduler.
> I would propose to replace the nullable vertexExecution with a non-null 
> executionVertexID.
> This change would not break anything since {{vertexExecution}} is mainly used 
> by {{ProgrammedSlotProvider}} for testing. {{ProgrammedSlotProvider}} uses it 
> to retrieve the JobVertexID and subtaskIndex. 
> The only other place where {{ScheduledUnit#vertexExecution}} is used is to 
> log the involved task for slot allocation in 
> {{SchedulerImpl#allocateSlotInternal(...)}}. The log is problematic at the 
> moment with the new scheduler since the vertexExecution is null. This change 
> can fix the problematic log.
> This change would also fix a NPE issue reported in FLINK-16145.





[GitHub] [flink] zhuzhurk opened a new pull request #11148: [FLINK-16180][runtime] Replace the nullable vertexExecution in ScheduledUnit with a non-null executionVertexId

2020-02-19 Thread GitBox
zhuzhurk opened a new pull request #11148: [FLINK-16180][runtime] Replace the 
nullable vertexExecution in ScheduledUnit with a non-null executionVertexId
URL: https://github.com/apache/flink/pull/11148
 
 
   ## What is the purpose of the change
   
   ScheduledUnit#vertexExecution is nullable but ProgrammedSlotProvider 
requires it to be non-null to work. This makes ProgrammedSlotProvider not able 
to be used by new scheduler tests since vertexExecution is never set in the new 
scheduler code path. It blocks us from reworking tests that are based on 
legacy scheduling to run on the new scheduler.
   
   Thus I opened this PR to replace the nullable vertexExecution with a 
non-null executionVertexID.
   
   It also naturally fixes 2 other issues:
   1. The log printed in SchedulerImpl#allocateSlotInternal(...) can be broken 
since the vertexExecution can be null.
   2. an NPE issue reported in FLINK-16145.
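
   As a rough sketch of the proposed shape (illustrative only; the simplified 
class below omits the other fields `ScheduledUnit` carries, and the actual 
change is in this PR's commits):

```java
import static org.apache.flink.util.Preconditions.checkNotNull;

import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;

// Simplified sketch: ScheduledUnit identifies its task by a non-null
// ExecutionVertexID instead of a nullable Execution reference.
public class ScheduledUnit {

    private final ExecutionVertexID executionVertexId;

    public ScheduledUnit(ExecutionVertexID executionVertexId) {
        this.executionVertexId = checkNotNull(executionVertexId);
    }

    // Test slot providers such as ProgrammedSlotProvider can derive the
    // JobVertexID and subtask index without touching an Execution:
    public ExecutionVertexID getExecutionVertexId() {
        return executionVertexId;
    }
}
```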
   
   ## Brief change log
   
   See commits.
   
   ## Verifying this change
   
   This change is already covered by existing tests, such as *ExecutionTests*.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)
   




[GitHub] [flink] flinkbot commented on issue #11146: [FLINK-15718] Update StatefulJobSavepointMigrationITCase to restore from 1.10 savepoint

2020-02-19 Thread GitBox
flinkbot commented on issue #11146: [FLINK-15718] Update 
StatefulJobSavepointMigrationITCase to restore from 1.10 savepoint
URL: https://github.com/apache/flink/pull/11146#issuecomment-588660647
 
 
   
   ## CI report:
   
   * 6d0d8f47466cf61ec549d85627e0349550e2433d UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   




[GitHub] [flink] flinkbot commented on issue #11147: [FLINK-15717] Update StatefulJobSavepointMigrationITCase.scala to restore from 1.10 savepoint

2020-02-19 Thread GitBox
flinkbot commented on issue #11147: [FLINK-15717] Update 
StatefulJobSavepointMigrationITCase.scala to restore from 1.10 savepoint
URL: https://github.com/apache/flink/pull/11147#issuecomment-588652456
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit 8127feb053c5d657b8453a7ef02c65da627d037c (Thu Feb 20 
07:11:52 UTC 2020)
   
   **Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up 
to date!
* **This pull request references an unassigned [Jira 
ticket](https://issues.apache.org/jira/browse/FLINK-15717).** According to the 
[code contribution 
guide](https://flink.apache.org/contributing/contribute-code.html), tickets 
need to be assigned before starting with the implementation work.
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   




[jira] [Updated] (FLINK-15717) Update StatefulJobSavepointMigrationITCase.scala to restore from 1.10 savepoint

2020-02-19 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15717?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-15717:
---
Labels: pull-request-available  (was: )

> Update StatefulJobSavepointMigrationITCase.scala to restore from 1.10 
> savepoint
> ---
>
> Key: FLINK-15717
> URL: https://issues.apache.org/jira/browse/FLINK-15717
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.11.0
>Reporter: vinoyang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>
> Update {{StatefulJobSavepointMigrationITCase.scala}} to restore from 1.10 
> savepoint





[GitHub] [flink] yanghua opened a new pull request #11147: [FLINK-15717] Update StatefulJobSavepointMigrationITCase.scala to restore from 1.10 savepoint

2020-02-19 Thread GitBox
yanghua opened a new pull request #11147: [FLINK-15717] Update 
StatefulJobSavepointMigrationITCase.scala to restore from 1.10 savepoint
URL: https://github.com/apache/flink/pull/11147
 
 
   
   
   ## What is the purpose of the change
   
   *This pull request updates StatefulJobSavepointMigrationITCase.scala to 
restore from 1.10 savepoint*
   
   
   ## Brief change log
   
 - *Update StatefulJobSavepointMigrationITCase.scala to restore from 1.10 
savepoint*
   
   
   ## Verifying this change
   
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ **not documented**)
   




[jira] [Commented] (FLINK-16141) HiveTableSourceTest unstable

2020-02-19 Thread Rui Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16141?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17040690#comment-17040690
 ] 

Rui Li commented on FLINK-16141:


Hey [~sewen], yes, the test is set up locally. It launches a separate metastore 
process and connects to it. Judging from the stacktrace, either the metastore 
process crashed shortly after it started, or we had some problem establishing 
the connection. The log of the metastore process is disabled by default to 
avoid flooding the Travis output. We can't tell the root cause for sure unless 
we reproduce the issue with the metastore log enabled.

I'll see what I can do about the exception logs. I agree we should suppress 
them for negative test cases, while keeping them enabled for normal tests.

> HiveTableSourceTest unstable
> 
>
> Key: FLINK-16141
> URL: https://issues.apache.org/jira/browse/FLINK-16141
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.10.0
>Reporter: Stephan Ewen
>Priority: Blocker
> Fix For: 1.11.0
>
>
> The test fails with various exceptions.
> See https://api.travis-ci.org/v3/job/651866181/log.txt





[GitHub] [flink] flinkbot commented on issue #11146: [FLINK-15718] Update StatefulJobSavepointMigrationITCase to restore from 1.10 savepoint

2020-02-19 Thread GitBox
flinkbot commented on issue #11146: [FLINK-15718] Update 
StatefulJobSavepointMigrationITCase to restore from 1.10 savepoint
URL: https://github.com/apache/flink/pull/11146#issuecomment-588634455
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit 6d0d8f47466cf61ec549d85627e0349550e2433d (Thu Feb 20 
06:37:25 UTC 2020)
   
   **Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up 
to date!
* **This pull request references an unassigned [Jira 
ticket](https://issues.apache.org/jira/browse/FLINK-15718).** According to the 
[code contribution 
guide](https://flink.apache.org/contributing/contribute-code.html), tickets 
need to be assigned before starting with the implementation work.
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   




[jira] [Updated] (FLINK-15718) Update StatefulJobSavepointMigrationITCase to restore from 1.10 savepoint

2020-02-19 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15718?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-15718:
---
Labels: pull-request-available  (was: )

> Update StatefulJobSavepointMigrationITCase to restore from 1.10 savepoint
> -
>
> Key: FLINK-15718
> URL: https://issues.apache.org/jira/browse/FLINK-15718
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.11.0
>Reporter: vinoyang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>
> Update {{StatefulJobSavepointMigrationITCase}} to restore from 1.10 savepoint





[GitHub] [flink] yanghua opened a new pull request #11146: [FLINK-15718] Update StatefulJobSavepointMigrationITCase to restore from 1.10 savepoint

2020-02-19 Thread GitBox
yanghua opened a new pull request #11146: [FLINK-15718] Update 
StatefulJobSavepointMigrationITCase to restore from 1.10 savepoint
URL: https://github.com/apache/flink/pull/11146
 
 
   
   
   ## What is the purpose of the change
   
   *This pull request updates StatefulJobSavepointMigrationITCase to restore 
from 1.10 savepoint*
   
   
   ## Brief change log
   
 - *Update StatefulJobSavepointMigrationITCase to restore from 1.10 
savepoint*
   
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ **not documented**)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #11144: [FLINK-15715] Update ContinuousFileProcessingMigrationTest to restore from 1.10 savepoint

2020-02-19 Thread GitBox
flinkbot edited a comment on issue #11144: [FLINK-15715] Update 
ContinuousFileProcessingMigrationTest to restore from 1.10 savepoint
URL: https://github.com/apache/flink/pull/11144#issuecomment-588270086
 
 
   
   ## CI report:
   
   * bafa0b96be75a242a6465becc11e82a1afa1de61 Travis: 
[CANCELED](https://travis-ci.com/flink-ci/flink/builds/149646329) 
   * 66e240be3d3886212b32373a975e6f005a817d39 Travis: 
[SUCCESS](https://travis-ci.com/flink-ci/flink/builds/149745729) Azure: 
[SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5336)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode

2020-02-19 Thread GitBox
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] 
Let Netty use Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#discussion_r381780564
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientDecoder.java
 ##
 @@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.runtime.io.network.NetworkClientHandler;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
+
+import static org.apache.flink.runtime.io.network.netty.NettyMessage.FRAME_HEADER_LENGTH;
+import static org.apache.flink.runtime.io.network.netty.NettyMessage.MAGIC_NUMBER;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Decodes messages from the fragmentary netty buffers. This decoder assumes the
+ * messages have the following format:
+ *
+ * +--------------+----------------+------------------------+
+ * | FRAME_HEADER | MESSAGE_HEADER | DATA BUFFER (Optional) |
+ * +--------------+----------------+------------------------+
+ *
+ * This decoder decodes the frame header and delegates the other following work
+ * to corresponding message parsers according to the message type. During the process
+ * of decoding, the decoder and parsers try best to eliminate copying. For the frame
+ * header and message header, it only cumulates data when they span multiple input buffers.
+ * For the buffer part, it copies directly to the input channels to avoid future copying.
+ *
+ * The format of the frame header is
+ *
+ * +------------------+------------------+--------+
+ * | FRAME LENGTH (4) | MAGIC NUMBER (4) | ID (1) |
+ * +------------------+------------------+--------+
+ */
+public class NettyMessageClientDecoder extends ChannelInboundHandlerAdapter {
+
+	/** The message parser for buffer response. */
+	private final NettyMessageDecoderDelegate bufferResponseDecoderDelegate;
+
+	/** The message parser for messages other than buffer response. */
+	private final NettyMessageDecoderDelegate nonBufferResponseDecoderDelegate;
+
+	/** The cumulation buffer for the frame header part. */
+	private ByteBuf frameHeaderBuffer;
+
+	/**
+	 * The chosen message parser for the current message. If it is null, then
+	 * we are decoding the frame header part, otherwise we are decoding the actual
+	 * message.
+	 */
+	private NettyMessageDecoderDelegate currentDecoderDelegate;
+
+	NettyMessageClientDecoder(NetworkClientHandler networkClientHandler) {
+		this.bufferResponseDecoderDelegate = new BufferResponseDecoderDelegate(networkClientHandler);
+		this.nonBufferResponseDecoderDelegate = new NonBufferResponseDecoderDelegate();
+	}
+
+	@Override
+	public void channelActive(ChannelHandlerContext ctx) throws Exception {
+		super.channelActive(ctx);
+
+		bufferResponseDecoderDelegate.onChannelActive(ctx);
+		nonBufferResponseDecoderDelegate.onChannelActive(ctx);
+
+		frameHeaderBuffer = ctx.alloc().directBuffer(FRAME_HEADER_LENGTH);
+	}
+
+	@Override
+	public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+		super.channelInactive(ctx);
+
+		bufferResponseDecoderDelegate.close();
+		nonBufferResponseDecoderDelegate.close();
+
+		frameHeaderBuffer.release();
+	}
+
+	@Override
+	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+		ByteBuf data = (ByteBuf) msg;
+
+		try {
+			while (data.isReadable()) {
+				if (currentDecoderDelegate == null) {
+					decodeFrameHeader(data);
+				}
+
+				if (data.isReadable() && currentDecoderDelegate != null) {
+
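
A minimal sketch of the header-cumulation pattern described in the Javadoc above: header bytes are copied only when they span multiple input fragments. It assumes plain (unshaded) Netty, and the class and method names are invented for illustration; this is a guess at the semantics, not the PR's ByteBufUtils.

import io.netty.buffer.ByteBuf;

/**
 * Sketch of fixed-length header cumulation. Returns a buffer containing the
 * complete header, or null if more input is needed. The returned buffer is
 * either {@code data} itself (zero-copy fast path) or the cumulation buffer.
 */
final class HeaderCumulationSketch {

    static ByteBuf cumulate(ByteBuf target, ByteBuf data, int headerLength) {
        // Fast path: the first fragment already holds the whole header, so it
        // can be decoded in place without copying anything.
        if (target.readableBytes() == 0 && data.readableBytes() >= headerLength) {
            return data;
        }

        // Slow path: the header spans fragments; copy what is available into
        // the cumulation buffer.
        int copyLength = Math.min(headerLength - target.readableBytes(), data.readableBytes());
        target.writeBytes(data, copyLength);

        // Hand the header out only once it is complete.
        return target.readableBytes() == headerLength ? target : null;
    }
}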

[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode

2020-02-19 Thread GitBox
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] 
Let Netty use Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#discussion_r381779124
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
 ##
 @@ -351,38 +347,68 @@ ByteBuf write(ByteBufAllocator allocator) throws IOException {
 
 			CompositeByteBuf composityBuf = allocator.compositeDirectBuffer();
 			composityBuf.addComponent(headerBuf);
-			composityBuf.addComponent(buffer);
+			composityBuf.addComponent(buffer.asByteBuf());
 			// update writer index since we have data written to the components:
-			composityBuf.writerIndex(headerBuf.writerIndex() + buffer.writerIndex());
+			composityBuf.writerIndex(headerBuf.writerIndex() + buffer.asByteBuf().writerIndex());
 			return composityBuf;
 		}
 		catch (Throwable t) {
 			if (headerBuf != null) {
 				headerBuf.release();
 			}
-			buffer.release();
+			buffer.recycleBuffer();
 
 			ExceptionUtils.rethrowIOException(t);
 			return null; // silence the compiler
 		}
 	}
 
-	static BufferResponse readFrom(ByteBuf buffer) {
-		InputChannelID receiverId = InputChannelID.fromByteBuf(buffer);
-		int sequenceNumber = buffer.readInt();
-		int backlog = buffer.readInt();
-		boolean isBuffer = buffer.readBoolean();
-		boolean isCompressed = buffer.readBoolean();
-		int size = buffer.readInt();
+	/**
+	 * Parses the message header part and composes a new BufferResponse with an empty data buffer.
+	 * The data buffer will be filled in later. This method is used in credit-based network stack.
+	 *
+	 * @param messageHeader the serialized message header.
+	 * @param bufferAllocator the allocator for network buffer.
+	 * @return a BufferResponse object with the header parsed and the data buffer to fill in later.
+	 *         The data buffer will be null if the target channel has been released or the buffer size is 0.
+	 */
+	static BufferResponse readFrom(ByteBuf messageHeader, NetworkBufferAllocator bufferAllocator) {
+		InputChannelID receiverId = InputChannelID.fromByteBuf(messageHeader);
+		int sequenceNumber = messageHeader.readInt();
+		int backlog = messageHeader.readInt();
+		boolean isBuffer = messageHeader.readBoolean();
+		boolean isCompressed = messageHeader.readBoolean();
+		int size = messageHeader.readInt();
+
+		Buffer dataBuffer = null;
+		if (size != 0) {
+			if (isBuffer) {
+				dataBuffer = bufferAllocator.allocatePooledNetworkBuffer(receiverId);
+			} else {
+				dataBuffer = bufferAllocator.allocateUnPooledNetworkBuffer(size);
+			}
+		}
+
+		if (dataBuffer == null) {
+			dataBuffer = bufferAllocator.getPlaceHolderBuffer();
 
 Review comment:
   Based on the 
[comment](https://github.com/apache/flink/pull/7368/files#r381770789), 
`BufferResponse` should provide a way of exposing the released info so that it 
can be handled by `PartitionRequestClientHandler`. There might be two options:
   
   - Rely on whether `dataBuffer` is null; I am not quite sure whether the 
latest guideline suggests using `Optional` for this.
   
   - Define a new boolean field in `BufferResponse` to describe this info, but 
it seems a bit redundant with the `dataBuffer` state.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
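
For context on the two options above, a minimal sketch of the second one (an explicit boolean on the decoded message rather than relying on a null buffer). The field name isReceiverReleased and the use of java.nio.ByteBuffer as a stand-in for Flink's Buffer are assumptions, not the PR's code.

import java.nio.ByteBuffer;

/**
 * Sketch of the second option: carry an explicit "released" flag in the
 * decoded message. All names here are hypothetical, not taken from the PR.
 */
final class BufferResponseSketch {

    final ByteBuffer dataBuffer;        // may be a placeholder when the channel is gone
    final boolean isReceiverReleased;   // decouples "released" from "dataBuffer == null"

    BufferResponseSketch(ByteBuffer dataBuffer, boolean isReceiverReleased) {
        this.dataBuffer = dataBuffer;
        this.isReceiverReleased = isReceiverReleased;
    }
}

With such a flag the downstream handler can branch without inspecting the buffer reference, at the cost of the redundancy noted in the comment.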


[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode

2020-02-19 Thread GitBox
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] 
Let Netty use Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#discussion_r381773812
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/BufferResponseDecoderDelegate.java
 ##
 @@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.runtime.io.network.NetworkClientHandler;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+
+import static org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse;
+
+/**
+ * The parser for {@link org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse}.
+ */
+public class BufferResponseDecoderDelegate implements NettyMessageDecoderDelegate {
+
+	/** The network client handler of current channel. */
+	private final NetworkClientHandler networkClientHandler;
+
+	/** The Flink Buffer allocator. */
+	private final NetworkBufferAllocator allocator;
+
+	/** The cumulation buffer of message header. */
+	private ByteBuf messageHeaderBuffer;
+
+	/**
+	 * The current BufferResponse message that are process the buffer part.
+	 * If it is null, we are still processing the message header part, otherwise
+	 * we are processing the buffer part.
+	 */
+	private BufferResponse currentResponse;
+
+	/** How much bytes have been received or discarded for the buffer part. */
+	private int decodedBytesOfBuffer;
+
+	public BufferResponseDecoderDelegate(NetworkClientHandler networkClientHandler) {
+		this.networkClientHandler = networkClientHandler;
+		this.allocator = new NetworkBufferAllocator(networkClientHandler);
+	}
+
+	@Override
+	public void onChannelActive(ChannelHandlerContext ctx) {
+		messageHeaderBuffer = ctx.alloc().directBuffer(BufferResponse.MESSAGE_HEADER_LENGTH);
+	}
+
+	@Override
+	public void startParsingMessage(int msgId, int messageLength) {
+		currentResponse = null;
+		decodedBytesOfBuffer = 0;
+
+		messageHeaderBuffer.clear();
+	}
+
+	@Override
+	public ParseResult onChannelRead(ByteBuf data) throws Exception {
+		if (currentResponse == null) {
+			ByteBuf toDecode = ByteBufUtils.cumulate(messageHeaderBuffer, data, BufferResponse.MESSAGE_HEADER_LENGTH);
+
+			if (toDecode != null) {
+				currentResponse = BufferResponse.readFrom(toDecode, allocator);
+
+				if (currentResponse.bufferSize == 0) {
+					return ParseResult.finishedWith(currentResponse);
+				}
+			}
+		}
+
+		if (currentResponse != null) {
+			boolean isDiscarding = allocator.isPlaceHolderBuffer(currentResponse.getBuffer());
+			int remainingBufferSize = currentResponse.bufferSize - decodedBytesOfBuffer;
+			int actualBytesToDecode = Math.min(data.readableBytes(), remainingBufferSize);
+
+			if (isDiscarding) {
+				data.readerIndex(data.readerIndex() + actualBytesToDecode);
+			} else {
+				currentResponse.getBuffer().asByteBuf().writeBytes(data, actualBytesToDecode);
+			}
+
+			decodedBytesOfBuffer += actualBytesToDecode;
+
+			if (decodedBytesOfBuffer == currentResponse.bufferSize) {
+				if (isDiscarding) {
+					networkClientHandler.cancelRequestFor(currentResponse.receiverId);
+					return ParseResult.finishedWith(null);
 
 Review comment:
   Based on the 

[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode

2020-02-19 Thread GitBox
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] 
Let Netty use Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#discussion_r381773376
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/BufferResponseDecoderDelegate.java
 ##
 @@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.runtime.io.network.NetworkClientHandler;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+
+import static org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse;
+
+/**
+ * The parser for {@link org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse}.
+ */
+public class BufferResponseDecoderDelegate implements NettyMessageDecoderDelegate {
+
+	/** The network client handler of current channel. */
+	private final NetworkClientHandler networkClientHandler;
+
+	/** The Flink Buffer allocator. */
+	private final NetworkBufferAllocator allocator;
+
+	/** The cumulation buffer of message header. */
+	private ByteBuf messageHeaderBuffer;
+
+	/**
+	 * The current BufferResponse message that are process the buffer part.
+	 * If it is null, we are still processing the message header part, otherwise
+	 * we are processing the buffer part.
+	 */
+	private BufferResponse currentResponse;
+
+	/** How much bytes have been received or discarded for the buffer part. */
+	private int decodedBytesOfBuffer;
+
+	public BufferResponseDecoderDelegate(NetworkClientHandler networkClientHandler) {
+		this.networkClientHandler = networkClientHandler;
+		this.allocator = new NetworkBufferAllocator(networkClientHandler);
+	}
+
+	@Override
+	public void onChannelActive(ChannelHandlerContext ctx) {
+		messageHeaderBuffer = ctx.alloc().directBuffer(BufferResponse.MESSAGE_HEADER_LENGTH);
+	}
+
+	@Override
+	public void startParsingMessage(int msgId, int messageLength) {
+		currentResponse = null;
+		decodedBytesOfBuffer = 0;
+
+		messageHeaderBuffer.clear();
+	}
+
+	@Override
+	public ParseResult onChannelRead(ByteBuf data) throws Exception {
+		if (currentResponse == null) {
+			ByteBuf toDecode = ByteBufUtils.cumulate(messageHeaderBuffer, data, BufferResponse.MESSAGE_HEADER_LENGTH);
+
+			if (toDecode != null) {
+				currentResponse = BufferResponse.readFrom(toDecode, allocator);
+
+				if (currentResponse.bufferSize == 0) {
+					return ParseResult.finishedWith(currentResponse);
+				}
+			}
+		}
+
+		if (currentResponse != null) {
+			boolean isDiscarding = allocator.isPlaceHolderBuffer(currentResponse.getBuffer());
+			int remainingBufferSize = currentResponse.bufferSize - decodedBytesOfBuffer;
+			int actualBytesToDecode = Math.min(data.readableBytes(), remainingBufferSize);
+
+			if (isDiscarding) {
+				data.readerIndex(data.readerIndex() + actualBytesToDecode);
+			} else {
+				currentResponse.getBuffer().asByteBuf().writeBytes(data, actualBytesToDecode);
+			}
+
+			decodedBytesOfBuffer += actualBytesToDecode;
+
+			if (decodedBytesOfBuffer == currentResponse.bufferSize) {
+				if (isDiscarding) {
 
 Review comment:
   We can also remove this logic based on the 
[comment](https://github.com/apache/flink/pull/7368/files#r381770789); another 
benefit is that we avoid checking this condition twice, which otherwise makes 
the logic seem spread out.


[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode

2020-02-19 Thread GitBox
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] 
Let Netty use Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#discussion_r381771843
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/BufferResponseDecoderDelegate.java
 ##
 @@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.runtime.io.network.NetworkClientHandler;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator;
+
+/**
+ * The parser for {@link org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse}.
+ */
+public class BufferResponseDecoderDelegate implements NettyMessageDecoderDelegate {
+
+	/** The network client handler of current channel. */
+	private final NetworkClientHandler networkClientHandler;
+
+	/** The Flink Buffer allocator. */
+	private final NetworkBufferAllocator allocator;
+
+	/** The cumulation buffer of message header. */
+	private ByteBuf messageHeaderCumulationBuffer;
+
+	/**
+	 * The current BufferResponse message that are process the buffer part.
+	 * If it is null, we are still processing the message header part, otherwise
+	 * we are processing the buffer part.
+	 */
+	private NettyMessage.BufferResponse currentResponse;
+
+	/** How much bytes have been received or discarded for the buffer part. */
+	private int decodedBytesOfBuffer;
+
+	public BufferResponseDecoderDelegate(NetworkClientHandler networkClientHandler) {
+		this.networkClientHandler = networkClientHandler;
+		this.allocator = new NetworkBufferAllocator(networkClientHandler);
+	}
+
+	@Override
+	public void onChannelActive(ByteBufAllocator alloc) {
+		messageHeaderCumulationBuffer = alloc.directBuffer(NettyMessage.BufferResponse.MESSAGE_HEADER_LENGTH);
+	}
+
+	@Override
+	public void startParsingMessage(int msgId, int messageLength) {
+		currentResponse = null;
+		decodedBytesOfBuffer = 0;
+
+		messageHeaderCumulationBuffer.clear();
+	}
+
+	@Override
+	public ParseResult onChannelRead(ByteBuf data) throws Exception {
+		if (currentResponse == null) {
+			ByteBuf toDecode = ByteBufUtils.cumulate(messageHeaderCumulationBuffer, data, NettyMessage.BufferResponse.MESSAGE_HEADER_LENGTH);
+
+			if (toDecode != null) {
+				currentResponse = NettyMessage.BufferResponse.readFrom(toDecode, allocator);
+
+				if (currentResponse.bufferSize == 0) {
 
 Review comment:
   Based on the 
[comment](https://github.com/apache/flink/pull/7368/files#r381770789), this 
logic does not need to be handled explicitly here, and it should be transparent 
to the process below.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode

2020-02-19 Thread GitBox
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] 
Let Netty use Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#discussion_r381770789
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/BufferResponseDecoderDelegate.java
 ##
 @@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.runtime.io.network.NetworkClientHandler;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+
+import static org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse;
+
+/**
+ * The parser for {@link org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse}.
+ */
+public class BufferResponseDecoderDelegate implements NettyMessageDecoderDelegate {
 
 Review comment:
   After thinking it through, it is necessary to redefine the role of 
`BufferResponseDecoderDelegate`. Currently it not only decodes a `ByteBuf` into 
a `BufferResponse` message, but is also involved in the specific cases of 
`size = 0` and a released channel, which makes the internal logic more complex.
   
   Instead we can make `BufferResponseDecoderDelegate` handle only the decoding 
work and fire the complete `BufferResponse` message to the next 
`PartitionRequestClientHandler`, which can handle the specific logic more 
easily and cleanly. I will point out the related changes in the detailed code.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
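
To make the split proposed above concrete, here is a hypothetical handler-side sketch (plain Netty imports, invented type and method names; not the PR's code): the decoder only emits complete messages, and the special cases live in the client handler.

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * Sketch of the proposed division of labor: the decoder upstream emits only
 * complete, fully decoded messages, and this handler owns the special cases.
 * Class and method names are hypothetical, not taken from the PR.
 */
class ClientHandlerSketch extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        if (!(msg instanceof DecodedBufferResponse)) {
            ctx.fireChannelRead(msg);
            return;
        }
        DecodedBufferResponse response = (DecodedBufferResponse) msg;

        // Special cases live here, not in the decoder:
        if (response.isReceiverReleased()) {
            cancelRequest(response.receiverId());   // the channel is gone; drop the data
        } else if (response.size() == 0) {
            handleEmptyResponse(response);          // nothing to read for the buffer part
        } else {
            handleBufferResponse(response);
        }
    }

    // Hypothetical helpers standing in for the real handler logic.
    private void cancelRequest(Object receiverId) { }
    private void handleEmptyResponse(DecodedBufferResponse r) { }
    private void handleBufferResponse(DecodedBufferResponse r) { }

    /** Hypothetical decoded message type. */
    interface DecodedBufferResponse {
        boolean isReceiverReleased();
        int size();
        Object receiverId();
    }
}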


[GitHub] [flink] dianfu commented on a change in pull request #11118: [FLINK-16072][python] Optimize the performance of the write/read null mask

2020-02-19 Thread GitBox
dianfu commented on a change in pull request #11118: [FLINK-16072][python] 
Optimize the performance of the write/read null mask
URL: https://github.com/apache/flink/pull/11118#discussion_r381725940
 
 

 ##
 File path: flink-python/pyflink/fn_execution/coder_impl.py
 ##
 @@ -24,57 +24,71 @@
 from pyflink.table import Row
 
 
+def generate_null_mask_search_table():
 
 Review comment:
   Make it a private method of FlattenRowCoderImpl? I think there is no need to 
expose it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
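
The precomputed lookup table discussed in this PR is language-independent: for each of the 256 possible byte values, the eight "is null" flags are computed once up front. A minimal Java sketch of the same idea, with illustrative names that are not PyFlink's code:

/**
 * Sketch of the null-mask lookup table: for each of the 256 possible byte
 * values, precompute which of the eight bit positions (most significant bit
 * first) are set. Illustrative only.
 */
final class NullMaskTableSketch {

    static final boolean[][] NULL_MASK_SEARCH_TABLE = generate();

    private static boolean[][] generate() {
        boolean[][] table = new boolean[256][8];
        for (int b = 0; b < 256; b++) {
            for (int bit = 0; bit < 8; bit++) {
                // Index 0 of the mask is the highest-order bit (0x80), matching
                // the docstring's example: 0x84 marks columns 1 and 6 as null.
                table[b][bit] = (b & (0x80 >>> bit)) != 0;
            }
        }
        return table;
    }

    public static void main(String[] args) {
        boolean[] mask = NULL_MASK_SEARCH_TABLE[0x84];
        System.out.println(mask[0] && mask[5]);  // true: columns 1 and 6 are null
    }
}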


[GitHub] [flink] dianfu commented on a change in pull request #11118: [FLINK-16072][python] Optimize the performance of the write/read null mask

2020-02-19 Thread GitBox
dianfu commented on a change in pull request #11118: [FLINK-16072][python] 
Optimize the performance of the write/read null mask
URL: https://github.com/apache/flink/pull/11118#discussion_r381766299
 
 

 ##
 File path: flink-python/pyflink/fn_execution/coder_impl.py
 ##
 @@ -24,57 +24,71 @@
 from pyflink.table import Row
 
 
+def generate_null_mask_search_table():
+    """
+    Each bit of one byte represents if the column at the specified position is None or not,
+    e.g. 0x84 represents the first column and the sixth column are None.
+    """
+    num = 256
+    null_mask = []
+    for b in range(num):
+        every_num_null_mask = [(b & 0x80) > 0, (b & 0x40) > 0, (b & 0x20) > 0, (b & 0x10) > 0,
+                               (b & 0x08) > 0, (b & 0x04) > 0, (b & 0x02) > 0, (b & 0x01) > 0]
+        null_mask.append(tuple(every_num_null_mask))
+
+    return tuple(null_mask)
+
+
 class FlattenRowCoderImpl(StreamCoderImpl):
+    null_mask_search_table = generate_null_mask_search_table()
 
 Review comment:
   What about making it an instance variable, as it's faster than a static variable: 
https://stackoverflow.com/questions/2714573/instance-variables-vs-class-variables-in-python


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dianfu commented on a change in pull request #11118: [FLINK-16072][python] Optimize the performance of the write/read null mask

2020-02-19 Thread GitBox
dianfu commented on a change in pull request #11118: [FLINK-16072][python] 
Optimize the performance of the write/read null mask
URL: https://github.com/apache/flink/pull/11118#discussion_r381768526
 
 

 ##
 File path: flink-python/pyflink/fn_execution/coder_impl.py
 ##
 @@ -24,57 +24,71 @@
 from pyflink.table import Row
 
 
+def generate_null_mask_search_table():
+    """
+    Each bit of one byte represents if the column at the specified position is None or not,
+    e.g. 0x84 represents the first column and the sixth column are None.
+    """
+    num = 256
+    null_mask = []
+    for b in range(num):
+        every_num_null_mask = [(b & 0x80) > 0, (b & 0x40) > 0, (b & 0x20) > 0, (b & 0x10) > 0,
+                               (b & 0x08) > 0, (b & 0x04) > 0, (b & 0x02) > 0, (b & 0x01) > 0]
+        null_mask.append(tuple(every_num_null_mask))
+
+    return tuple(null_mask)
+
+
 class FlattenRowCoderImpl(StreamCoderImpl):
+    null_mask_search_table = generate_null_mask_search_table()
+
+    null_byte_search_table = (0x80, 0x40, 0x20, 0x10, 0x08, 0x04, 0x02, 0x01)
 
     def __init__(self, field_coders):
         self._field_coders = field_coders
         self._filed_count = len(field_coders)
+        self._complete_byte_num = self._filed_count // 8
+        self._leading_bytes_num = self._filed_count % 8
 
     def encode_to_stream(self, value, out_stream, nested):
         self.write_null_mask(value, out_stream)
         for i in range(self._filed_count):
-            if value[i] is not None:
-                self._field_coders[i].encode_to_stream(value[i], out_stream, nested)
+            item = value[i]
+            if item is not None:
+                self._field_coders[i].encode_to_stream(item, out_stream, nested)
 
     def decode_from_stream(self, in_stream, nested):
-        null_mask = self.read_null_mask(self._filed_count, in_stream)
+        null_mask = self.read_null_mask(in_stream)
         return [None if null_mask[idx] else self._field_coders[idx].decode_from_stream(
             in_stream, nested) for idx in range(0, self._filed_count)]
 
-    @staticmethod
-    def write_null_mask(value, out_stream):
 
 Review comment:
   Could we add some unit tests for the null mask related logic?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
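
A round-trip property of the kind requested here could, in Java terms, be asserted against the NullMaskTableSketch table sketched after an earlier comment; a hypothetical JUnit 4 sketch, not PyFlink's actual tests:

import static org.junit.Assert.assertEquals;

import org.junit.Test;

/**
 * Sketch of a round-trip check for the null-mask lookup table: re-encoding the
 * decoded mask bits must reproduce the original byte for all 256 values.
 */
public class NullMaskRoundTripSketchTest {

    @Test
    public void decodedMaskReencodesToSameByte() {
        for (int b = 0; b < 256; b++) {
            boolean[] mask = NullMaskTableSketch.NULL_MASK_SEARCH_TABLE[b];
            int reencoded = 0;
            for (int bit = 0; bit < 8; bit++) {
                if (mask[bit]) {
                    reencoded |= 0x80 >>> bit;  // most significant bit first
                }
            }
            assertEquals(b, reencoded);
        }
    }
}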


[GitHub] [flink] dianfu commented on a change in pull request #11118: [FLINK-16072][python] Optimize the performance of the write/read null mask

2020-02-19 Thread GitBox
dianfu commented on a change in pull request #11118: [FLINK-16072][python] 
Optimize the performance of the write/read null mask
URL: https://github.com/apache/flink/pull/11118#discussion_r381769187
 
 

 ##
 File path: flink-python/pyflink/fn_execution/coder_impl.py
 ##
 @@ -25,56 +25,331 @@
 
 
 class FlattenRowCoderImpl(StreamCoderImpl):
+    null_mask_search_table = ((False, False, False, False, False, False, False, False),
+                              (False, False, False, False, False, False, False, True),
+                              (False, False, False, False, False, False, True, False),
+                              (False, False, False, False, False, False, True, True),
+                              (False, False, False, False, False, True, False, False),
+                              (False, False, False, False, False, True, False, True),
+                              (False, False, False, False, False, True, True, False),
+                              (False, False, False, False, False, True, True, True),
+                              (False, False, False, False, True, False, False, False),
+                              (False, False, False, False, True, False, False, True),
+                              (False, False, False, False, True, False, True, False),
+                              (False, False, False, False, True, False, True, True),
+                              (False, False, False, False, True, True, False, False),
+                              (False, False, False, False, True, True, False, True),
+                              (False, False, False, False, True, True, True, False),
+                              (False, False, False, False, True, True, True, True),
+                              (False, False, False, True, False, False, False, False),
+                              (False, False, False, True, False, False, False, True),
+                              (False, False, False, True, False, False, True, False),
+                              (False, False, False, True, False, False, True, True),
+                              (False, False, False, True, False, True, False, False),
+                              (False, False, False, True, False, True, False, True),
+                              (False, False, False, True, False, True, True, False),
+                              (False, False, False, True, False, True, True, True),
+                              (False, False, False, True, True, False, False, False),
+                              (False, False, False, True, True, False, False, True),
+                              (False, False, False, True, True, False, True, False),
+                              (False, False, False, True, True, False, True, True),
+                              (False, False, False, True, True, True, False, False),
+                              (False, False, False, True, True, True, False, True),
+                              (False, False, False, True, True, True, True, False),
+                              (False, False, False, True, True, True, True, True),
+                              (False, False, True, False, False, False, False, False),
+                              (False, False, True, False, False, False, False, True),
+                              (False, False, True, False, False, False, True, False),
+                              (False, False, True, False, False, False, True, True),
+                              (False, False, True, False, False, True, False, False),
+                              (False, False, True, False, False, True, False, True),
+                              (False, False, True, False, False, True, True, False),
+                              (False, False, True, False, False, True, True, True),
+                              (False, False, True, False, True, False, False, False),
+                              (False, False, True, False, True, False, False, True),
+                              (False, False, True, False, True, False, True, False),
+                              (False, False, True, False, True, False, True, True),
+                              (False, False, True, False, True, True, False, False),
+                              (False, False, True, False, True, True, False, True),
+                              (False, False, True, False, True, True, True, False),
+                              (False, False, True, False, True, True, True, True),
+                              (False, False, True, True, False, False, False, False),
+                              (False, False, True, True, False, False, False, True),
+                              (False, False, True, True, False, False, True, False),
+                              (False, False, True, True, False, False, True, True),
+                              (False, False, True, True, False, True, False, False),
+

[GitHub] [flink] flinkbot edited a comment on issue #10904: [FLINK-15669] [sql client] fix SQL client can't cancel flink job

2020-02-19 Thread GitBox
flinkbot edited a comment on issue #10904: [FLINK-15669] [sql client] fix SQL 
client can't cancel flink job
URL: https://github.com/apache/flink/pull/10904#issuecomment-576004529
 
 
   
   ## CI report:
   
   * dbfc485ecea3aec8bc0054ca8bacb310b417c5dc Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/145110835) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4479)
 
   * 8637d733546f1863b30ecf78766961a4b4471482 Travis: 
[SUCCESS](https://travis-ci.com/flink-ci/flink/builds/145171018) Azure: 
[SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4500)
 
   * 202059d5e5c772b1b11ee50b9715b216b6ddeaad Travis: 
[SUCCESS](https://travis-ci.com/flink-ci/flink/builds/147163955) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4765)
 
   * 6d89ef84c8f0673d93a1048350b302e7d24d7484 Travis: 
[SUCCESS](https://travis-ci.com/flink-ci/flink/builds/149745708) Azure: 
[SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5335)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Comment Edited] (FLINK-16142) Memory Leak causes Metaspace OOM error on repeated job submission

2020-02-19 Thread Yingjie Cao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16142?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17040625#comment-17040625
 ] 

Yingjie Cao edited comment on FLINK-16142 at 2/20/20 5:11 AM:
--

Could you also print the contextClassLoader of each thread? I wonder if there 
are any threads which use the Flink user ClassLoader as their contextClassLoader.

I have some doubts about this thread: java-sdk-http-connection-reaper. I 
searched the Internet and found these: 
[https://stackoverflow.com/questions/18069042/spring-mvc-webapp-schedule-java-sdk-http-connection-reaper-failed-to-stop],
[https://github.com/aws/aws-sdk-java/blob/master/aws-java-sdk-core/src/main/java/com/amazonaws/http/IdleConnectionReaper.java].
I am not sure whether it's the cause or not, but maybe you can try the 
solution from the Stack Overflow page.


was (Author: kevin.cyj):
Could you also print the contextClassLoader of the thread? I wonder if there 
are any threads which use Flink user ClassLoader as contextClassLoader.

I have some doubts about this thread: java-sdk-http-connection-reaper. I 
searched the Internet and found these: 
[https://stackoverflow.com/questions/18069042/spring-mvc-webapp-schedule-java-sdk-http-connection-reaper-failed-to-stop],
 
[https://github.com/aws/aws-sdk-java/blob/master/aws-java-sdk-core/src/main/java/com/amazonaws/http/IdleConnectionReaper.java].
 I am not sure whether it's the cause or not, but maybe you can try the 
solution in the stackoverflow page. (the IdleConnectionReaper thread sleeps 60 
seconds before checking the shutdown flag, so after shutting down the 
IdleConnectionReaper, you may need to wait more than 1min to submit the next 
job)

> Memory Leak causes Metaspace OOM error on repeated job submission
> -
>
> Key: FLINK-16142
> URL: https://issues.apache.org/jira/browse/FLINK-16142
> Project: Flink
>  Issue Type: Bug
>  Components: Client / Job Submission
>Affects Versions: 1.10.0
>Reporter: Thomas Wozniakowski
>Priority: Blocker
> Fix For: 1.10.1, 1.11.0
>
>
> Hi Guys,
> We've just tried deploying 1.10.0 as it has lots of shiny stuff that fits our 
> use-case exactly (RocksDB state backend running in a containerised cluster). 
> Unfortunately, it seems like there is a memory leak somewhere in the job 
> submission logic. We are getting this error:
> {code:java}
> 2020-02-18 10:22:10,020 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph - OPERATOR_NAME 
> switched from RUNNING to FAILED.
> java.lang.OutOfMemoryError: Metaspace
> at java.lang.ClassLoader.defineClass1(Native Method)
> at java.lang.ClassLoader.defineClass(ClassLoader.java:757)
> at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
> at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
> at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
> at 
> org.apache.flink.util.ChildFirstClassLoader.loadClass(ChildFirstClassLoader.java:60)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
> at 
> org.apache.flink.kinesis.shaded.com.amazonaws.jmx.SdkMBeanRegistrySupport.registerMetricAdminMBean(SdkMBeanRegistrySupport.java:27)
> at 
> org.apache.flink.kinesis.shaded.com.amazonaws.metrics.AwsSdkMetrics.registerMetricAdminMBean(AwsSdkMetrics.java:398)
> at 
> org.apache.flink.kinesis.shaded.com.amazonaws.metrics.AwsSdkMetrics.(AwsSdkMetrics.java:359)
> at 
> org.apache.flink.kinesis.shaded.com.amazonaws.AmazonWebServiceClient.requestMetricCollector(AmazonWebServiceClient.java:728)
> at 
> org.apache.flink.kinesis.shaded.com.amazonaws.AmazonWebServiceClient.isRMCEnabledAtClientOrSdkLevel(AmazonWebServiceClient.java:660)
> at 
> org.apache.flink.kinesis.shaded.com.amazonaws.AmazonWebServiceClient.isRequestMetricsEnabled(AmazonWebServiceClient.java:652)
> at 
> org.apache.flink.kinesis.shaded.com.amazonaws.AmazonWebServiceClient.createExecutionContext(AmazonWebServiceClient.java:611)
> at 
> org.apache.flink.kinesis.shaded.com.amazonaws.AmazonWebServiceClient.createExecutionContext(AmazonWebServiceClient.java:606)
> at 
> org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.executeListShards(AmazonKinesisClient.java:1534)
> at 
> org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.listShards(AmazonKinesisClient.java:1528)
> at 
> org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.listShards(KinesisProxy.java:439)
> at 
> 

[jira] [Commented] (FLINK-16142) Memory Leak causes Metaspace OOM error on repeated job submission

2020-02-19 Thread Yingjie Cao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16142?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17040625#comment-17040625
 ] 

Yingjie Cao commented on FLINK-16142:
-

Could you also print the contextClassLoader of each thread? I wonder if there 
are any threads which use the Flink user ClassLoader as their contextClassLoader.

I have some doubts about this thread: java-sdk-http-connection-reaper. I 
searched the Internet and found these: 
[https://stackoverflow.com/questions/18069042/spring-mvc-webapp-schedule-java-sdk-http-connection-reaper-failed-to-stop],
[https://github.com/aws/aws-sdk-java/blob/master/aws-java-sdk-core/src/main/java/com/amazonaws/http/IdleConnectionReaper.java].
I am not sure whether it's the cause or not, but maybe you can try the 
solution from the Stack Overflow page. (The IdleConnectionReaper thread sleeps 60 
seconds before checking the shutdown flag, so after shutting down the 
IdleConnectionReaper, you may need to wait more than 1 min to submit the next 
job.)

> Memory Leak causes Metaspace OOM error on repeated job submission
> -
>
> Key: FLINK-16142
> URL: https://issues.apache.org/jira/browse/FLINK-16142
> Project: Flink
>  Issue Type: Bug
>  Components: Client / Job Submission
>Affects Versions: 1.10.0
>Reporter: Thomas Wozniakowski
>Priority: Blocker
> Fix For: 1.10.1, 1.11.0
>
>
> Hi Guys,
> We've just tried deploying 1.10.0 as it has lots of shiny stuff that fits our 
> use-case exactly (RocksDB state backend running in a containerised cluster). 
> Unfortunately, it seems like there is a memory leak somewhere in the job 
> submission logic. We are getting this error:
> {code:java}
> 2020-02-18 10:22:10,020 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph - OPERATOR_NAME 
> switched from RUNNING to FAILED.
> java.lang.OutOfMemoryError: Metaspace
> at java.lang.ClassLoader.defineClass1(Native Method)
> at java.lang.ClassLoader.defineClass(ClassLoader.java:757)
> at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
> at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
> at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
> at 
> org.apache.flink.util.ChildFirstClassLoader.loadClass(ChildFirstClassLoader.java:60)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
> at 
> org.apache.flink.kinesis.shaded.com.amazonaws.jmx.SdkMBeanRegistrySupport.registerMetricAdminMBean(SdkMBeanRegistrySupport.java:27)
> at 
> org.apache.flink.kinesis.shaded.com.amazonaws.metrics.AwsSdkMetrics.registerMetricAdminMBean(AwsSdkMetrics.java:398)
> at 
> org.apache.flink.kinesis.shaded.com.amazonaws.metrics.AwsSdkMetrics.(AwsSdkMetrics.java:359)
> at 
> org.apache.flink.kinesis.shaded.com.amazonaws.AmazonWebServiceClient.requestMetricCollector(AmazonWebServiceClient.java:728)
> at 
> org.apache.flink.kinesis.shaded.com.amazonaws.AmazonWebServiceClient.isRMCEnabledAtClientOrSdkLevel(AmazonWebServiceClient.java:660)
> at 
> org.apache.flink.kinesis.shaded.com.amazonaws.AmazonWebServiceClient.isRequestMetricsEnabled(AmazonWebServiceClient.java:652)
> at 
> org.apache.flink.kinesis.shaded.com.amazonaws.AmazonWebServiceClient.createExecutionContext(AmazonWebServiceClient.java:611)
> at 
> org.apache.flink.kinesis.shaded.com.amazonaws.AmazonWebServiceClient.createExecutionContext(AmazonWebServiceClient.java:606)
> at 
> org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.executeListShards(AmazonKinesisClient.java:1534)
> at 
> org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.listShards(AmazonKinesisClient.java:1528)
> at 
> org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.listShards(KinesisProxy.java:439)
> at 
> org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getShardsOfStream(KinesisProxy.java:389)
> at 
> org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getShardList(KinesisProxy.java:279)
> at 
> org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.discoverNewShardsToSubscribe(KinesisDataFetcher.java:686)
> at 
> org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.run(FlinkKinesisConsumer.java:287)
> at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> {code}
> (The only change in the above text is the OPERATOR_NAME text where I removed 
> some of 
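
The diagnostic suggested above needs only standard JDK APIs; a minimal sketch that dumps every live thread together with its context class loader:

import java.util.Map;

/**
 * Sketch of the suggested diagnostic: list every live thread and its context
 * class loader, to spot threads that still pin Flink's user-code class loader
 * (e.g. ChildFirstClassLoader) after a job finishes, which would keep the
 * job's classes and their Metaspace reachable.
 */
public class ContextClassLoaderDump {

    public static void main(String[] args) {
        for (Map.Entry<Thread, StackTraceElement[]> e : Thread.getAllStackTraces().entrySet()) {
            Thread t = e.getKey();
            System.out.println(t.getName() + " -> " + t.getContextClassLoader());
        }
    }
}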

[GitHub] [flink] flinkbot edited a comment on issue #11144: [FLINK-15715] Update ContinuousFileProcessingMigrationTest to restore from 1.10 savepoint

2020-02-19 Thread GitBox
flinkbot edited a comment on issue #11144: [FLINK-15715] Update 
ContinuousFileProcessingMigrationTest to restore from 1.10 savepoint
URL: https://github.com/apache/flink/pull/11144#issuecomment-588270086
 
 
   
   ## CI report:
   
   * bafa0b96be75a242a6465becc11e82a1afa1de61 Travis: 
[CANCELED](https://travis-ci.com/flink-ci/flink/builds/149646329) 
   * 66e240be3d3886212b32373a975e6f005a817d39 Travis: 
[PENDING](https://travis-ci.com/flink-ci/flink/builds/149745729) Azure: 
[SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5336)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode

2020-02-19 Thread GitBox
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] 
Let Netty use Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#discussion_r381736600
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientDecoder.java
 ##
 @@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.runtime.io.network.NetworkClientHandler;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
+
+import static org.apache.flink.runtime.io.network.netty.NettyMessage.FRAME_HEADER_LENGTH;
+import static org.apache.flink.runtime.io.network.netty.NettyMessage.MAGIC_NUMBER;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Decodes messages from the fragmentary netty buffers. This decoder assumes the
+ * messages have the following format:
+ *
+ * +--------------+----------------+------------------------+
+ * | FRAME_HEADER | MESSAGE_HEADER | DATA BUFFER (Optional) |
+ * +--------------+----------------+------------------------+
+ *
+ * This decoder decodes the frame header and delegates the other following work
+ * to corresponding message parsers according to the message type. During the process
+ * of decoding, the decoder and parsers try best to eliminate copying. For the frame
+ * header and message header, it only cumulates data when they span multiple input buffers.
+ * For the buffer part, it copies directly to the input channels to avoid future copying.
+ *
+ * The format of the frame header is
+ *
+ * +------------------+------------------+--------+
+ * | FRAME LENGTH (4) | MAGIC NUMBER (4) | ID (1) |
+ * +------------------+------------------+--------+
+ */
+public class NettyMessageClientDecoder extends ChannelInboundHandlerAdapter {
+
+   /** The message parser for buffer response. */
+private final NettyMessageDecoderDelegate bufferResponseDecoderDelegate;
+
+/** The message parser for messages other than buffer responses. */
+   private final NettyMessageDecoderDelegate 
nonBufferResponseDecoderDelegate;
+
+   /** The cumulation buffer for the frame header part. */
+   private ByteBuf frameHeaderBuffer;
+
+   /**
+* The chosen message parser for the current message. If it is null, 
then
+* we are decoding the frame header part, otherwise we are decoding the 
actual
+* message.
+*/
+   private NettyMessageDecoderDelegate currentDecoderDelegate;
+
+NettyMessageClientDecoder(NetworkClientHandler networkClientHandler) {
+this.bufferResponseDecoderDelegate = new 
BufferResponseDecoderDelegate(networkClientHandler);
+this.nonBufferResponseDecoderDelegate = new 
NonBufferResponseDecoderDelegate();
+}
+
+@Override
+public void channelActive(ChannelHandlerContext ctx) throws Exception {
+super.channelActive(ctx);
+
+bufferResponseDecoderDelegate.onChannelActive(ctx.alloc());
+nonBufferResponseDecoderDelegate.onChannelActive(ctx.alloc());
+
+   frameHeaderBuffer = 
ctx.alloc().directBuffer(FRAME_HEADER_LENGTH);
+}
+
+@Override
+public void channelInactive(ChannelHandlerContext ctx) throws Exception {
 
 Review comment:
   It might be better to handle them independently, not relying on the implicit 
logic of other handlers.  
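
   For illustration, a minimal sketch of a handler that releases its own 
resources in `channelInactive` instead of relying on another handler in the 
pipeline (class and field names are hypothetical, not the PR's code):

```java
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;

public class SelfReleasingDecoder extends ChannelInboundHandlerAdapter {

    private ByteBuf headerBuffer;

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        // 4 (frame length) + 4 (magic number) + 1 (message id) bytes.
        headerBuffer = ctx.alloc().directBuffer(9);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        // Release buffers owned by this handler explicitly, guarded against
        // the case where channelActive never ran for this channel.
        if (headerBuffer != null) {
            headerBuffer.release();
            headerBuffer = null;
        }
        // Only then propagate the event to the rest of the pipeline.
        super.channelInactive(ctx);
    }
}
```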


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #11144: [FLINK-15715] Update ContinuousFileProcessingMigrationTest to restore from 1.10 savepoint

2020-02-19 Thread GitBox
flinkbot edited a comment on issue #11144: [FLINK-15715] Update 
ContinuousFileProcessingMigrationTest to restore from 1.10 savepoint
URL: https://github.com/apache/flink/pull/11144#issuecomment-588270086
 
 
   
   ## CI report:
   
   * bafa0b96be75a242a6465becc11e82a1afa1de61 Travis: 
[CANCELED](https://travis-ci.com/flink-ci/flink/builds/149646329) 
   * 66e240be3d3886212b32373a975e6f005a817d39 Travis: 
[PENDING](https://travis-ci.com/flink-ci/flink/builds/149745729) 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #10904: [FLINK-15669] [sql client] fix SQL client can't cancel flink job

2020-02-19 Thread GitBox
flinkbot edited a comment on issue #10904: [FLINK-15669] [sql client] fix SQL 
client can't cancel flink job
URL: https://github.com/apache/flink/pull/10904#issuecomment-576004529
 
 
   
   ## CI report:
   
   * dbfc485ecea3aec8bc0054ca8bacb310b417c5dc Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/145110835) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4479)
 
   * 8637d733546f1863b30ecf78766961a4b4471482 Travis: 
[SUCCESS](https://travis-ci.com/flink-ci/flink/builds/145171018) Azure: 
[SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4500)
 
   * 202059d5e5c772b1b11ee50b9715b216b6ddeaad Travis: 
[SUCCESS](https://travis-ci.com/flink-ci/flink/builds/147163955) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4765)
 
   * 6d89ef84c8f0673d93a1048350b302e7d24d7484 Travis: 
[PENDING](https://travis-ci.com/flink-ci/flink/builds/149745708) Azure: 
[PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5335)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-15718) Update StatefulJobSavepointMigrationITCase to restore from 1.10 savepoint

2020-02-19 Thread vinoyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17040607#comment-17040607
 ] 

vinoyang commented on FLINK-15718:
--

[~aljoscha] and [~chesnay] WDYT about 
{{LegacyStatefulJobSavepointMigrationITCase}}? We have not updated the migration 
test for it since Flink 1.4.

> Update StatefulJobSavepointMigrationITCase to restore from 1.10 savepoint
> -
>
> Key: FLINK-15718
> URL: https://issues.apache.org/jira/browse/FLINK-15718
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.11.0
>Reporter: vinoyang
>Priority: Major
> Fix For: 1.11.0
>
>
> Update {{StatefulJobSavepointMigrationITCase}} to restore from 1.10 savepoint



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on issue #11144: [FLINK-15715] Update ContinuousFileProcessingMigrationTest to restore from 1.10 savepoint

2020-02-19 Thread GitBox
flinkbot edited a comment on issue #11144: [FLINK-15715] Update 
ContinuousFileProcessingMigrationTest to restore from 1.10 savepoint
URL: https://github.com/apache/flink/pull/11144#issuecomment-588270086
 
 
   
   ## CI report:
   
   * bafa0b96be75a242a6465becc11e82a1afa1de61 Travis: 
[PENDING](https://travis-ci.com/flink-ci/flink/builds/149646329) 
   * 66e240be3d3886212b32373a975e6f005a817d39 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode

2020-02-19 Thread GitBox
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] 
Let Netty use Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#discussion_r381714621
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientDecoder.java
 ##
 @@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.runtime.io.network.NetworkClientHandler;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
+
+import static 
org.apache.flink.runtime.io.network.netty.NettyMessage.FRAME_HEADER_LENGTH;
+import static 
org.apache.flink.runtime.io.network.netty.NettyMessage.MAGIC_NUMBER;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Decodes messages from the fragmentary netty buffers. This decoder assumes
+ * the messages have the following format:
+ * +--------------+----------------+------------------------+
+ * | FRAME_HEADER | MESSAGE_HEADER | DATA BUFFER (Optional) |
+ * +--------------+----------------+------------------------+
+ *
+ * This decoder decodes the frame header and delegates the following work
+ * to the corresponding message parsers according to the message type. During
+ * the decoding process, the decoder and parsers try their best to eliminate
+ * copying. For the frame header and message header, data is only cumulated
+ * when it spans multiple input buffers. For the buffer part, data is copied
+ * directly into the input channels' buffers to avoid further copying.
+ *
+ * The format of the frame header is
+ * +------------------+------------------+--------+
+ * | FRAME LENGTH (4) | MAGIC NUMBER (4) | ID (1) |
+ * +------------------+------------------+--------+
+ */
+public class NettyMessageClientDecoder extends ChannelInboundHandlerAdapter {
+
+   /** The message parser for buffer response. */
+private final NettyMessageDecoderDelegate bufferResponseDecoderDelegate;
+
+/** The message parser for messages other than buffer responses. */
+   private final NettyMessageDecoderDelegate 
nonBufferResponseDecoderDelegate;
+
+   /** The cumulation buffer for the frame header part. */
+   private ByteBuf frameHeaderBuffer;
+
+   /**
+* The chosen message parser for the current message. If it is null, 
then
+* we are decoding the frame header part, otherwise we are decoding the 
actual
+* message.
+*/
+   private NettyMessageDecoderDelegate currentDecoderDelegate;
+
+NettyMessageClientDecoder(NetworkClientHandler networkClientHandler) {
+this.bufferResponseDecoderDelegate = new 
BufferResponseDecoderDelegate(networkClientHandler);
+this.nonBufferResponseDecoderDelegate = new 
NonBufferResponseDecoderDelegate();
+}
+
+@Override
+public void channelActive(ChannelHandlerContext ctx) throws Exception {
+super.channelActive(ctx);
+
+bufferResponseDecoderDelegate.onChannelActive(ctx.alloc());
+nonBufferResponseDecoderDelegate.onChannelActive(ctx.alloc());
+
+   frameHeaderBuffer = 
ctx.alloc().directBuffer(FRAME_HEADER_LENGTH);
+}
+
+@Override
+public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+super.channelInactive(ctx);
+
+   bufferResponseDecoderDelegate.release();
+   nonBufferResponseDecoderDelegate.release();
+
+   frameHeaderBuffer.release();
+}
+
+@Override
+public void channelRead(ChannelHandlerContext ctx, Object msg) throws 
Exception {
+if (!(msg instanceof ByteBuf)) {
+ctx.fireChannelRead(msg);
+return;
+}
+
+ByteBuf data = (ByteBuf) msg;
+
+try {
+while (data.isReadable()) {
+   if (currentDecoderDelegate == null) {
 
 Review comment:
   If we refer to `StreamTaskNetworkInput#emitNext`, it always judges the 

[GitHub] [flink] flinkbot edited a comment on issue #10904: [FLINK-15669] [sql client] fix SQL client can't cancel flink job

2020-02-19 Thread GitBox
flinkbot edited a comment on issue #10904: [FLINK-15669] [sql client] fix SQL 
client can't cancel flink job
URL: https://github.com/apache/flink/pull/10904#issuecomment-576004529
 
 
   
   ## CI report:
   
   * dbfc485ecea3aec8bc0054ca8bacb310b417c5dc Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/145110835) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4479)
 
   * 8637d733546f1863b30ecf78766961a4b4471482 Travis: 
[SUCCESS](https://travis-ci.com/flink-ci/flink/builds/145171018) Azure: 
[SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4500)
 
   * 202059d5e5c772b1b11ee50b9715b216b6ddeaad Travis: 
[SUCCESS](https://travis-ci.com/flink-ci/flink/builds/147163955) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4765)
 
   * 6d89ef84c8f0673d93a1048350b302e7d24d7484 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode

2020-02-19 Thread GitBox
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] 
Let Netty use Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#discussion_r381704846
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/BufferResponseDecoderDelegate.java
 ##
 @@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.runtime.io.network.NetworkClientHandler;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator;
+
+/**
+ * The parser for {@link 
org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse}.
+ */
+public class BufferResponseDecoderDelegate implements 
NettyMessageDecoderDelegate {
+
+   /** The network client handler of current channel. */
+   private final NetworkClientHandler networkClientHandler;
+
+   /** The Flink Buffer allocator. */
+   private final NetworkBufferAllocator allocator;
+
+   /** The cumulation buffer of message header. */
+   private ByteBuf messageHeaderCumulationBuffer;
+
+   /**
+    * The current BufferResponse message whose buffer part is being processed.
+    * If it is null, we are still processing the message header part;
+    * otherwise we are processing the buffer part.
+*/
+   private NettyMessage.BufferResponse currentResponse;
+
+   /** How many bytes have been received or discarded for the buffer part. */
+   private int decodedBytesOfBuffer;
+
+   public BufferResponseDecoderDelegate(NetworkClientHandler 
networkClientHandler) {
+   this.networkClientHandler = networkClientHandler;
+   this.allocator = new 
NetworkBufferAllocator(networkClientHandler);
+   }
+
+   @Override
+   public void onChannelActive(ByteBufAllocator alloc) {
+   messageHeaderCumulationBuffer = 
alloc.directBuffer(NettyMessage.BufferResponse.MESSAGE_HEADER_LENGTH);
+   }
+
+   @Override
+   public void startParsingMessage(int msgId, int messageLength) {
+   currentResponse = null;
+   decodedBytesOfBuffer = 0;
+
+   messageHeaderCumulationBuffer.clear();
+   }
+
+   @Override
+   public ParseResult onChannelRead(ByteBuf data) throws Exception {
+   if (currentResponse == null) {
+   ByteBuf toDecode = 
ByteBufUtils.cumulate(messageHeaderCumulationBuffer, data, 
NettyMessage.BufferResponse.MESSAGE_HEADER_LENGTH);
+
+   if (toDecode != null) {
+   currentResponse = 
NettyMessage.BufferResponse.readFrom(toDecode, allocator);
+
+   if (currentResponse.bufferSize == 0) {
 
 Review comment:
   Yes, this case really exists now. The previous handling of this case only 
happened in one place, but now it happens in multiple places, which was my 
previous concern. Let me think about whether I can improve it a bit.
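
   As a generic illustration of that direction, the special case can be 
funneled through a single helper so it is written down only once (a sketch 
with invented names, not the PR's code):

```java
final class PayloadCompletion {

    /**
     * Returns true when the current message needs no further payload bytes.
     * The zero-size special case is handled here, in one place, instead of
     * being re-checked at every call site.
     */
    static boolean isPayloadComplete(int expectedBytes, int decodedBytes) {
        return expectedBytes == 0 || decodedBytes == expectedBytes;
    }
}
```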


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink-statefun] tzulitai closed pull request #26: [FLINK-16176] Use PersistedAppendingBuffer

2020-02-19 Thread GitBox
tzulitai closed pull request #26: [FLINK-16176] Use PersistedAppendingBuffer
URL: https://github.com/apache/flink-statefun/pull/26
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode

2020-02-19 Thread GitBox
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] 
Let Netty use Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#discussion_r381703666
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientDecoder.java
 ##
 @@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.runtime.io.network.NetworkClientHandler;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
+
+import static 
org.apache.flink.runtime.io.network.netty.NettyMessage.FRAME_HEADER_LENGTH;
+import static 
org.apache.flink.runtime.io.network.netty.NettyMessage.MAGIC_NUMBER;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Decodes messages from the fragmentary netty buffers. This decoder assumes
+ * the messages have the following format:
+ * +--------------+----------------+------------------------+
+ * | FRAME_HEADER | MESSAGE_HEADER | DATA BUFFER (Optional) |
+ * +--------------+----------------+------------------------+
+ *
+ * This decoder decodes the frame header and delegates the following work
+ * to the corresponding message parsers according to the message type. During
+ * the decoding process, the decoder and parsers try their best to eliminate
+ * copying. For the frame header and message header, data is only cumulated
+ * when it spans multiple input buffers. For the buffer part, data is copied
+ * directly into the input channels' buffers to avoid further copying.
+ *
+ * The format of the frame header is
+ * +------------------+------------------+--------+
+ * | FRAME LENGTH (4) | MAGIC NUMBER (4) | ID (1) |
+ * +------------------+------------------+--------+
+ */
+public class NettyMessageClientDecoder extends ChannelInboundHandlerAdapter {
+
+   /** The message parser for buffer response. */
+private final NettyMessageDecoderDelegate bufferResponseDecoderDelegate;
+
+/** The message parser for messages other than buffer responses. */
+   private final NettyMessageDecoderDelegate 
nonBufferResponseDecoderDelegate;
+
+   /** The cumulation buffer for the frame header part. */
+   private ByteBuf frameHeaderBuffer;
+
+   /**
+* The chosen message parser for the current message. If it is null, 
then
+* we are decoding the frame header part, otherwise we are decoding the 
actual
+* message.
+*/
+   private NettyMessageDecoderDelegate currentDecoderDelegate;
+
+NettyMessageClientDecoder(NetworkClientHandler networkClientHandler) {
+this.bufferResponseDecoderDelegate = new 
BufferResponseDecoderDelegate(networkClientHandler);
+this.nonBufferResponseDecoderDelegate = new 
NonBufferResponseDecoderDelegate();
+}
+
+@Override
+public void channelActive(ChannelHandlerContext ctx) throws Exception {
+super.channelActive(ctx);
+
+bufferResponseDecoderDelegate.onChannelActive(ctx.alloc());
+nonBufferResponseDecoderDelegate.onChannelActive(ctx.alloc());
+
+   frameHeaderBuffer = 
ctx.alloc().directBuffer(FRAME_HEADER_LENGTH);
+}
+
+@Override
+public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+super.channelInactive(ctx);
+
+   bufferResponseDecoderDelegate.release();
+   nonBufferResponseDecoderDelegate.release();
+
+   frameHeaderBuffer.release();
+}
+
+@Override
+public void channelRead(ChannelHandlerContext ctx, Object msg) throws 
Exception {
+if (!(msg instanceof ByteBuf)) {
+ctx.fireChannelRead(msg);
 
 Review comment:
   It would be friendlier to check the type before casting, and I guess it 
has no performance concern here.
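
   In isolation, the endorsed pattern looks like this (a generic sketch, not 
the PR's exact handler):

```java
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;

public class TypeGuardedHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // Verify the runtime type before casting; anything this handler
        // does not understand is forwarded to the next handler unchanged.
        if (!(msg instanceof ByteBuf)) {
            ctx.fireChannelRead(msg);
            return;
        }
        ByteBuf data = (ByteBuf) msg;
        try {
            // ... decode `data` here ...
        } finally {
            data.release();
        }
    }
}
```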


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode

2020-02-19 Thread GitBox
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] 
Let Netty use Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#discussion_r381703878
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/BufferResponseDecoderDelegate.java
 ##
 @@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.runtime.io.network.NetworkClientHandler;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator;
+
+/**
+ * The parser for {@link 
org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse}.
+ */
+public class BufferResponseDecoderDelegate implements 
NettyMessageDecoderDelegate {
+
+   /** The network client handler of current channel. */
+   private final NetworkClientHandler networkClientHandler;
+
+   /** The Flink Buffer allocator. */
+   private final NetworkBufferAllocator allocator;
+
+   /** The cumulation buffer of message header. */
+   private ByteBuf messageHeaderCumulationBuffer;
+
+   /**
+    * The current BufferResponse message whose buffer part is being processed.
+    * If it is null, we are still processing the message header part;
+    * otherwise we are processing the buffer part.
+*/
+   private NettyMessage.BufferResponse currentResponse;
+
+   /** How many bytes have been received or discarded for the buffer part. */
+   private int decodedBytesOfBuffer;
+
+   public BufferResponseDecoderDelegate(NetworkClientHandler 
networkClientHandler) {
+   this.networkClientHandler = networkClientHandler;
+   this.allocator = new 
NetworkBufferAllocator(networkClientHandler);
+   }
+
+   @Override
+   public void onChannelActive(ByteBufAllocator alloc) {
+   messageHeaderCumulationBuffer = 
alloc.directBuffer(NettyMessage.BufferResponse.MESSAGE_HEADER_LENGTH);
+   }
+
+   @Override
+   public void startParsingMessage(int msgId, int messageLength) {
+   currentResponse = null;
+   decodedBytesOfBuffer = 0;
+
+   messageHeaderCumulationBuffer.clear();
+   }
+
+   @Override
+   public ParseResult onChannelRead(ByteBuf data) throws Exception {
+   if (currentResponse == null) {
+   ByteBuf toDecode = 
ByteBufUtils.cumulate(messageHeaderCumulationBuffer, data, 
NettyMessage.BufferResponse.MESSAGE_HEADER_LENGTH);
 
 Review comment:
   I guess it would be fine; let me double-check the code then.
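
   For context, the cumulation contract referenced above can be sketched as 
follows (a simplified stand-in for the `ByteBufUtils.cumulate` under 
discussion, not the PR's actual implementation):

```java
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;

final class CumulationSketch {

    /**
     * Copies bytes from {@code in} into {@code target} until {@code target}
     * holds {@code expectedSize} bytes. Returns a buffer that is ready to be
     * decoded, or null while the header is still incomplete.
     */
    static ByteBuf cumulate(ByteBuf target, ByteBuf in, int expectedSize) {
        // Fast path: nothing accumulated yet and the input already contains
        // the whole header, so the caller can decode straight from `in`.
        if (target.readableBytes() == 0 && in.readableBytes() >= expectedSize) {
            return in;
        }
        int missing = expectedSize - target.readableBytes();
        target.writeBytes(in, Math.min(missing, in.readableBytes()));
        return target.readableBytes() == expectedSize ? target : null;
    }
}
```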


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #11144: [FLINK-15715] Update ContinuousFileProcessingMigrationTest to restore from 1.10 savepoint

2020-02-19 Thread GitBox
flinkbot edited a comment on issue #11144: [FLINK-15715] Update 
ContinuousFileProcessingMigrationTest to restore from 1.10 savepoint
URL: https://github.com/apache/flink/pull/11144#issuecomment-588270086
 
 
   
   ## CI report:
   
   * bafa0b96be75a242a6465becc11e82a1afa1de61 Travis: 
[PENDING](https://travis-ci.com/flink-ci/flink/builds/149646329) 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-16181) IfCallGen will throw NPE for primitive types in blink

2020-02-19 Thread Benchao Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16181?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17040595#comment-17040595
 ] 

Benchao Li commented on FLINK-16181:


[~lzljs3620320] Thanks for your confirmation, I'd like to fix this.

> IfCallGen will throw NPE for primitive types in blink
> -
>
> Key: FLINK-16181
> URL: https://issues.apache.org/jira/browse/FLINK-16181
> Project: Flink
>  Issue Type: Bug
>Reporter: Benchao Li
>Priority: Major
>
> It can be reproduced by a simple test case:
> Add below to {{ScalarOperatorsTest}}
> {code:java}
> testSqlApi("IF(true, CAST('non-numeric' AS BIGINT), 0)", "null")
> {code}
>  
> IMO, it's {{IfCallGen}}'s bug; it should check the nullTerm of the operands 
> before assignment.
> cc [~lzljs3620320] [~jark]
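
To make the failure mode concrete, here is a hand-written sketch of the shape 
the generated code takes (names are illustrative, not the actual generated 
terms):
{code:java}
public final class IfCallGenNpeSketch {

    // Stand-in for a CAST that fails and yields SQL NULL.
    static Long castToLongOrNull(String s) {
        try {
            return Long.valueOf(s);
        } catch (NumberFormatException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        Long trueTerm = castToLongOrNull("non-numeric"); // null
        boolean trueTermIsNull = (trueTerm == null);

        // Buggy shape: assigning the boxed operand to a primitive result
        // unboxes it before the null flag is consulted, throwing an NPE:
        //   long result = trueTerm;

        // Fixed shape: consult the null flag first, assign only if non-null.
        long result = 0L;
        boolean resultIsNull = trueTermIsNull;
        if (!resultIsNull) {
            result = trueTerm;
        }
        System.out.println(resultIsNull ? "null" : Long.toString(result));
    }
}
{code}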



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode

2020-02-19 Thread GitBox
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] 
Let Netty use Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#discussion_r381696893
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessageDecoderDelegate.java
 ##
 @@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator;
+
+import javax.annotation.Nullable;
+
+/**
+ * Parsers for specified netty messages.
+ */
+public interface NettyMessageDecoderDelegate {
 
 Review comment:
   We might not need to consider the `NettyMessageServerDecoder` atm, because it is 
still named `NettyMessageDecoder` now. If we want to refactor the server name 
accordingly in the future, we can consider the consistency with the client side then.
   
   There is a bit of a difference between client and server. On the server side we 
only have one explicit decoder in the Flink stack now, but on the client side there 
actually exist two decoders, which should be distinguished for better 
understanding.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] godfreyhe commented on issue #10904: [FLINK-15669] [sql client] fix SQL client can't cancel flink job

2020-02-19 Thread GitBox
godfreyhe commented on issue #10904: [FLINK-15669] [sql client] fix SQL client 
can't cancel flink job
URL: https://github.com/apache/flink/pull/10904#issuecomment-588584960
 
 
   Thanks for the suggestion @docete.
   Currently we can't get a large result set based on `Accumulator`, so I think 
it's a temporary solution; we plan to use 
[`Table#collect`](https://issues.apache.org/jira/browse/FLINK-14807) to solve 
this.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode

2020-02-19 Thread GitBox
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] 
Let Netty use Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#discussion_r381696893
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessageDecoderDelegate.java
 ##
 @@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator;
+
+import javax.annotation.Nullable;
+
+/**
+ * Parsers for specified netty messages.
+ */
+public interface NettyMessageDecoderDelegate {
 
 Review comment:
   We might not need to consider the `NettyMessageServerDecoder` atm, because it is 
still named `NettyMessageDecoder` now. If we want to refactor the server name 
accordingly in the future, we can consider the client side then.
   
   There is a bit of a difference between client and server. On the server side we 
only have one explicit decoder in the Flink stack now, but on the client side there 
actually exist two decoders, which should be distinguished for better 
understanding.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-16181) IfCallGen will throw NPE for primitive types in blink

2020-02-19 Thread Jingsong Lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16181?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17040593#comment-17040593
 ] 

Jingsong Lee commented on FLINK-16181:
--

Thanks [~libenchao] for reporting.

You are right, it is a bug. We should deal with nullTerms. Do you want to fix 
this?

> IfCallGen will throw NPE for primitive types in blink
> -
>
> Key: FLINK-16181
> URL: https://issues.apache.org/jira/browse/FLINK-16181
> Project: Flink
>  Issue Type: Bug
>Reporter: Benchao Li
>Priority: Major
>
> It can be reproduced by a simple test case:
> Add below to {{ScalarOperatorsTest}}
> {code:java}
> testSqlApi("IF(true, CAST('non-numeric' AS BIGINT), 0)", "null")
> {code}
>  
> IMO, it's {{IfCallGen}}'s bug; it should check the nullTerm of the operands 
> before assignment.
> cc [~lzljs3620320] [~jark]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on issue #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode

2020-02-19 Thread GitBox
flinkbot edited a comment on issue #7368: [FLINK-10742][network] Let Netty use 
Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#issuecomment-567435407
 
 
   
   ## CI report:
   
   * 6b12b52b99894864db993a2fa8ab2bfcce0edd5c Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/141736780) 
   * 25fc5608d0d7143f4384a7648054e9c99ebb32e9 Travis: 
[SUCCESS](https://travis-ci.com/flink-ci/flink/builds/145174867) Azure: 
[SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4502)
 
   * 0bd82aa136c43af8fdfde0f2a64284323c40c999 Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/148123283) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4982)
 
   * 99f9c6528f53574829368b52de5fa6c3f0a41d44 Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/148132166) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4986)
 
   * e61197d1bb5a61a55051b193a01543edfa8a22b6 Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/148706320) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5117)
 
   * b1bc198a051be7c2eaace3f0c0601187ff0b98ed Travis: 
[SUCCESS](https://travis-ci.com/flink-ci/flink/builds/149319970) Azure: 
[SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5258)
 
   * 3b41ea0500078556f85db2a1a47386e02c62c5b5 Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/149736534) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5334)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-16181) IfCallGen will throw NPE for primitive types in blink

2020-02-19 Thread Benchao Li (Jira)
Benchao Li created FLINK-16181:
--

 Summary: IfCallGen will throw NPE for primitive types in blink
 Key: FLINK-16181
 URL: https://issues.apache.org/jira/browse/FLINK-16181
 Project: Flink
  Issue Type: Bug
Reporter: Benchao Li


It can be reproduced by a simple test case:

Add below to {{ScalarOperatorsTest}}
{code:java}
testSqlApi("IF(true, CAST('non-numeric' AS BIGINT), 0)", "null")
{code}
 

IMO, it's {{IfCallGen}}'s bug; it should check the nullTerm of the operands 
before assignment.

cc [~lzljs3620320] [~jark]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] yanghua commented on issue #11144: [FLINK-15715] Update ContinuousFileProcessingMigrationTest to restore from 1.10 savepoint

2020-02-19 Thread GitBox
yanghua commented on issue #11144: [FLINK-15715] Update 
ContinuousFileProcessingMigrationTest to restore from 1.10 savepoint
URL: https://github.com/apache/flink/pull/11144#issuecomment-588578559
 
 
   @flinkbot run travis


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-16180) Replacing vertexExecution in ScheduledUnit with executionVertexID

2020-02-19 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16180?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-16180:

Description: 
{{ScheduledUnit#vertexExecution}} is nullable but {{ProgrammedSlotProvider}} 
requires it to be non-null to work. This makes {{ProgrammedSlotProvider}} not 
able to be used by new scheduler tests since {{vertexExecution}} is never set 
in the new scheduler code path. It blocks us from reworking tests which are 
based on legacy scheduling to be based on the new scheduler.

I would propose to replace the nullable vertexExecution with a non-null 
executionVertexID.

This change would not break anything since {{vertexExecution}} is mainly used 
by {{ProgrammedSlotProvider}} for testing. {{ProgrammedSlotProvider}} uses it 
to retrieve the JobVertexID and subtaskIndex. 
The only other place where {{ScheduledUnit#vertexExecution}} is used is to log 
the involved task for slot allocation in 
{{SchedulerImpl#allocateSlotInternal(...)}}. The log is problematic at the 
moment with the new scheduler since the vertexExecution is null. This change 
can fix the problematic log.

This change would also fix a NPE issue reported in FLINK-16145.


  was:
{{ScheduledUnit#vertexExecution}} is nullable but {{ProgrammedSlotProvider}} 
requires it to be non-null to work. This makes {{ProgrammedSlotProvider}} not 
able to be used by new scheduler tests since {{vertexExecution}} is never set 
in the new scheduler code path. It blocks us from reworking tests which are 
based on legacy scheduling to be based on the new scheduler.

I would propose to replace vertexExecution with a non-null executionVertexID.

This change would not break anything since {{vertexExecution}} is mainly used 
by {{ProgrammedSlotProvider}} in tests and logging in production. And the log 
would be broken if it is null.

This would also fix a potential bug, as reported in FLINK-16145.



> Replacing vertexExecution in ScheduledUnit with executionVertexID
> -
>
> Key: FLINK-16180
> URL: https://issues.apache.org/jira/browse/FLINK-16180
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.11.0
>Reporter: Zhu Zhu
>Assignee: Zhu Zhu
>Priority: Major
> Fix For: 1.11.0
>
>
> {{ScheduledUnit#vertexExecution}} is nullable but {{ProgrammedSlotProvider}} 
> requires it to be non-null to work. This makes {{ProgrammedSlotProvider}} not 
> able to be used by new scheduler tests since {{vertexExecution}} is never set 
> in the new scheduler code path. It blocks us from reworking tests which are 
> based on legacy scheduling to be based on the new scheduler.
> I would propose to replace the nullable vertexExecution with a non-null 
> executionVertexID.
> This change would not break anything since {{vertexExecution}} is mainly used 
> by {{ProgrammedSlotProvider}} for testing. {{ProgrammedSlotProvider}} uses it 
> to retrieve the JobVertexID and subtaskIndex. 
> The only other place where {{ScheduledUnit#vertexExecution}} is used is to 
> log the involved task for slot allocation in 
> {{SchedulerImpl#allocateSlotInternal(...)}}. The log is problematic at the 
> moment with the new scheduler since the vertexExecution is null. This change 
> can fix the problematic log.
> This change would also fix a NPE issue reported in FLINK-16145.
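
A rough sketch of the proposed shape (simplified for illustration; the import 
path matches Flink 1.10, everything else is hypothetical):
{code:java}
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;

import static org.apache.flink.util.Preconditions.checkNotNull;

// The unit carries a non-null ExecutionVertexID instead of a nullable
// Execution, so ProgrammedSlotProvider can always derive the JobVertexID
// and subtask index, and logging can never hit a null.
public class ScheduledUnitSketch {

    private final ExecutionVertexID executionVertexId;

    public ScheduledUnitSketch(ExecutionVertexID executionVertexId) {
        this.executionVertexId = checkNotNull(executionVertexId);
    }

    public ExecutionVertexID getExecutionVertexId() {
        return executionVertexId;
    }

    @Override
    public String toString() {
        return "ScheduledUnit{executionVertexId=" + executionVertexId + '}';
    }
}
{code}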



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14807) Add Table#collect api for fetching data to client

2020-02-19 Thread godfrey he (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14807?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17040585#comment-17040585
 ] 

godfrey he commented on FLINK-14807:


Thanks for the proposal, [~TsReaper].
About the "What if the job restarts?" part, the sink cannot guarantee the 
records' order, so the REST server cannot just skip the results before the 
token. Maybe the REST server needs to re-pull all the data.

> Add Table#collect api for fetching data to client
> -
>
> Key: FLINK-14807
> URL: https://issues.apache.org/jira/browse/FLINK-14807
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API
>Affects Versions: 1.9.1
>Reporter: Jeff Zhang
>Priority: Major
>  Labels: usability
> Fix For: 1.11.0
>
> Attachments: table-collect.png
>
>
> Currently, it is very inconvenient for users to fetch the data of a Flink job 
> unless they specify a sink explicitly and then fetch data from this sink via 
> its API (e.g. write to an HDFS sink, then read the data from HDFS). However, 
> most of the time users just want to get the data and do whatever processing 
> they want. So it is very necessary for Flink to provide an API Table#collect 
> for this purpose. 
>  
> Other APIs such as Table#head and Table#print are also helpful.  
>  
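
A hypothetical sketch of the usage being asked for ({{Table#collect}} did not 
exist at the time of this thread, so the method call below is the proposal, 
not a real API):
{code:java}
import org.apache.flink.table.api.Table;
import org.apache.flink.types.Row;

import java.util.Iterator;

public final class CollectUsageSketch {

    // Hypothetical: fetch the result rows of a table program directly in
    // the client, without declaring an explicit sink first.
    static void printAll(Table table) throws Exception {
        Iterator<Row> it = table.collect(); // proposed API, see this issue
        while (it.hasNext()) {
            System.out.println(it.next());
        }
    }
}
{code}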



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink-statefun] tzulitai commented on a change in pull request #26: [FLINK-16176] Use PersistedAppendingBuffer

2020-02-19 Thread GitBox
tzulitai commented on a change in pull request #26: [FLINK-16176] Use 
PersistedAppendingBuffer
URL: https://github.com/apache/flink-statefun/pull/26#discussion_r381674329
 
 

 ##
 File path: 
statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunction.java
 ##
 @@ -221,11 +229,14 @@ private InvocationResponse unpackInvocationResultOrThrow(
   "Failure forwarding a message to a remote function " + self, 
asyncResult.throwable());
 }
 InputStream httpResponseBody = responseBody(asyncResult.value());
-FromFunction fromFunction = parseProtobufOrThrow(FromFunction.parser(), 
httpResponseBody);
-checkState(
-fromFunction.hasInvocationResult(),
-"The received HTTP payload does not contain an InvocationResult, but 
rather [%s]",
-fromFunction);
-return fromFunction.getInvocationResult();
+try {
 
 Review comment:
   @igalshilman The changes here seem unrelated to the ticket. Would be nice if 
this is at least a separate commit, as it is a bit confusing at first glance to 
see an unrelated change.
   Just a comment for the future, no need to change this now, I'll proceed to 
merge as is.
   
   Also as a side comment: this change is to make things easier for us to test 
the WIP remote http function, right? If yes, I think we deserve a JIRA to keep 
track of this.
   For instance, the method name `unpackInvocationResultOrThrow` no longer 
makes sense given the change.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-16145) ScheduledUnit toString method throw NPE when Execution is null

2020-02-19 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16145?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17040582#comment-17040582
 ] 

Zhu Zhu commented on FLINK-16145:
-

Thanks for reporting this issue [~liuyufei]!
This is a bug since {{vertexExecution}} is nullable while {{toString()}} always 
invokes {{vertexExecution.getVertexWithAttempt()}}. It does not cause a 
production problem at the moment, though.
{{vertexExecution}} is also causing some other problems so I'm planning to 
replace it with an ExecutionVertexID (see FLINK-16180). That would also fix 
this potential issue.
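
Until then, a null-safe variant of the problematic method would look roughly 
like this (a sketch; only the names mentioned above are taken from Flink):
{code:java}
import org.apache.flink.runtime.executiongraph.Execution;

import javax.annotation.Nullable;

// The nullable execution is dereferenced only behind a null check.
public class NullSafeToStringSketch {

    @Nullable
    private final Execution vertexExecution;

    public NullSafeToStringSketch(@Nullable Execution vertexExecution) {
        this.vertexExecution = vertexExecution;
    }

    @Override
    public String toString() {
        String task = (vertexExecution == null)
                ? "(no execution assigned)"
                : vertexExecution.getVertexWithAttempt();
        return "ScheduledUnit{task=" + task + '}';
    }
}
{code}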

> ScheduledUnit toString method throw NPE when Execution is null
> --
>
> Key: FLINK-16145
> URL: https://issues.apache.org/jira/browse/FLINK-16145
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0
>Reporter: YufeiLiu
>Priority: Major
> Attachments: image-2020-02-18-19-58-38-506.png
>
>
>  !image-2020-02-18-19-58-38-506.png! 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-16180) Replacing vertexExecution in ScheduledUnit with executionVertexID

2020-02-19 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16180?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-16180:

Parent: FLINK-15626
Issue Type: Sub-task  (was: Improvement)

> Replacing vertexExecution in ScheduledUnit with executionVertexID
> -
>
> Key: FLINK-16180
> URL: https://issues.apache.org/jira/browse/FLINK-16180
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.11.0
>Reporter: Zhu Zhu
>Assignee: Zhu Zhu
>Priority: Major
> Fix For: 1.11.0
>
>
> {{ScheduledUnit#vertexExecution}} is nullable but {{ProgrammedSlotProvider}} 
> requires it to be non-null to work. This makes {{ProgrammedSlotProvider}} not 
> able to be used by new scheduler tests since {{vertexExecution}} is never set 
> in the new scheduler code path. It blocks us from reworking tests which are 
> based on legacy scheduling to be based on the new scheduler.
> I would propose to replace vertexExecution with a non-null executionVertexID.
> This change would not break anything since {{vertexExecution}} is mainly used 
> by {{ProgrammedSlotProvider}} in tests and logging in production. And the log 
> would be broken if it is null.
> This would also fix a potential bug, as reported in FLINK-16145.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink-statefun] tzulitai commented on a change in pull request #26: [FLINK-16176] Use PersistedAppendingBuffer

2020-02-19 Thread GitBox
tzulitai commented on a change in pull request #26: [FLINK-16176] Use 
PersistedAppendingBuffer
URL: https://github.com/apache/flink-statefun/pull/26#discussion_r381674686
 
 

 ##
 File path: 
statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunction.java
 ##
 @@ -112,13 +107,26 @@ private void onAsyncResult(
 InvocationResponse invocationResult =
 unpackInvocationResultOrThrow(context.self(), asyncResult);
 handleInvocationResponse(context, invocationResult);
-InvocationBatchRequest nextBatch = batch.get();
+InvocationBatchRequest.Builder nextBatch = getNextBatch();
 if (nextBatch == null) {
   hasInFlightRpc.clear();
   return;
 }
 batch.clear();
-sendToFunction(context, nextBatch.toBuilder());
+sendToFunction(context, nextBatch);
+  }
+
+  @Nullable
+  private InvocationBatchRequest.Builder getNextBatch() {
+@Nullable Iterable<Invocation> next = batch.view();
+if (next == null) {
+  return null;
+}
+InvocationBatchRequest.Builder builder = 
InvocationBatchRequest.newBuilder();
+for (Invocation invocation : next) {
 
 Review comment:
   I think this can be just `builder.addAllInvocations(nextBatch)`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink-statefun] tzulitai commented on a change in pull request #26: [FLINK-16176] Use PersistedAppendingBuffer

2020-02-19 Thread GitBox
tzulitai commented on a change in pull request #26: [FLINK-16176] Use 
PersistedAppendingBuffer
URL: https://github.com/apache/flink-statefun/pull/26#discussion_r381674329
 
 

 ##
 File path: 
statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunction.java
 ##
 @@ -221,11 +229,14 @@ private InvocationResponse unpackInvocationResultOrThrow(
   "Failure forwarding a message to a remote function " + self, 
asyncResult.throwable());
 }
 InputStream httpResponseBody = responseBody(asyncResult.value());
-FromFunction fromFunction = parseProtobufOrThrow(FromFunction.parser(), 
httpResponseBody);
-checkState(
-fromFunction.hasInvocationResult(),
-"The received HTTP payload does not contain an InvocationResult, but 
rather [%s]",
-fromFunction);
-return fromFunction.getInvocationResult();
+try {
 
 Review comment:
   @igalshilman The changes here seem unrelated to the ticket. Would be nice if 
this is at least a separate commit, as it is a bit confusing at first glance to 
see an unrelated change.
   Just a comment for the future, no need to change this now, I'll proceed to 
merge as is.
   
   Also as a side comment: this change is to make things easier for us to test 
the WIP remote http function, right? If yes, I think we deserve a JIRA to keep 
track of this.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-16180) Replacing vertexExecution in ScheduledUnit with executionVertexID

2020-02-19 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-16180:
---

 Summary: Replacing vertexExecution in ScheduledUnit with 
executionVertexID
 Key: FLINK-16180
 URL: https://issues.apache.org/jira/browse/FLINK-16180
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Affects Versions: 1.11.0
Reporter: Zhu Zhu
Assignee: Zhu Zhu
 Fix For: 1.11.0


{{ScheduledUnit#vertexExecution}} is nullable but {{ProgrammedSlotProvider}} 
requires it to be non-null to work. This makes {{ProgrammedSlotProvider}} not 
able to be used by new scheduler tests since {{vertexExecution}} is never set 
in the new scheduler code path. It blocks us from reworking tests which are 
based on legacy scheduling to be based on the new scheduler.

I would propose to replace vertexExecution with a non-null executionVertexID.

This change would not break anything since {{vertexExecution}} is mainly used 
by {{ProgrammedSlotProvider}} in tests and logging in production. And the log 
would be broken if it is null.

This would also fix a potential bug, as reported in FLINK-16145.




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] danny0405 commented on a change in pull request #11116: [FLINK-15349] add 'create catalog' DDL to blink planner

2020-02-19 Thread GitBox
danny0405 commented on a change in pull request #6: [FLINK-15349] add 
'create catalog' DDL to blink planner
URL: https://github.com/apache/flink/pull/6#discussion_r381670264
 
 

 ##
 File path: 
flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
 ##
 @@ -95,6 +95,21 @@ public void testUseCatalog() {
check("use catalog a", "USE CATALOG `A`");
}
 
+   @Test
+   public void testCreateCatalog() {
+   check(
+   "create catalog c1\n" +
+   " WITH (\n" +
 
 Review comment:
   But your syntax allows that.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-16179) Use configuration from TableFactory in hive connector

2020-02-19 Thread Jingsong Lee (Jira)
Jingsong Lee created FLINK-16179:


 Summary: Use configuration from TableFactory in hive connector
 Key: FLINK-16179
 URL: https://issues.apache.org/jira/browse/FLINK-16179
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Hive
Reporter: Jingsong Lee
 Fix For: 1.11.0


Now {{HiveOptions}} is read via {{GlobalConfiguration.loadConfiguration()}}.

This is not natural for tables; we should use the configuration from the 
TableFactory to enable table config.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] lirui-apache commented on a change in pull request #11116: [FLINK-15349] add 'create catalog' DDL to blink planner

2020-02-19 Thread GitBox
lirui-apache commented on a change in pull request #6: [FLINK-15349] add 
'create catalog' DDL to blink planner
URL: https://github.com/apache/flink/pull/6#discussion_r381654413
 
 

 ##
 File path: 
flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
 ##
 @@ -55,6 +55,30 @@ SqlUseCatalog SqlUseCatalog() :
 }
 }
 
+/**
+* Parses a create catalog statement.
+* CREATE CATALOG catalog_name [WITH (property_name=property_value, ...)];
+*/
+SqlCreate SqlCreateCatalog(Span s, boolean replace) :
+{
+SqlParserPos startPos;
+SqlIdentifier catalogName;
+SqlNodeList propertyList = SqlNodeList.EMPTY;
+}
+{
+    <CATALOG> { startPos = getPos(); }
+catalogName = CompoundIdentifier()
 
 Review comment:
   Why do we support CompoundIdentifier for catalog names?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode

2020-02-19 Thread GitBox
flinkbot edited a comment on issue #7368: [FLINK-10742][network] Let Netty use 
Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#issuecomment-567435407
 
 
   
   ## CI report:
   
   * 6b12b52b99894864db993a2fa8ab2bfcce0edd5c Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/141736780) 
   * 25fc5608d0d7143f4384a7648054e9c99ebb32e9 Travis: 
[SUCCESS](https://travis-ci.com/flink-ci/flink/builds/145174867) Azure: 
[SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4502)
 
   * 0bd82aa136c43af8fdfde0f2a64284323c40c999 Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/148123283) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4982)
 
   * 99f9c6528f53574829368b52de5fa6c3f0a41d44 Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/148132166) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4986)
 
   * e61197d1bb5a61a55051b193a01543edfa8a22b6 Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/148706320) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5117)
 
   * b1bc198a051be7c2eaace3f0c0601187ff0b98ed Travis: 
[SUCCESS](https://travis-ci.com/flink-ci/flink/builds/149319970) Azure: 
[SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5258)
 
   * 3b41ea0500078556f85db2a1a47386e02c62c5b5 Travis: 
[PENDING](https://travis-ci.com/flink-ci/flink/builds/149736534) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5334)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode

2020-02-19 Thread GitBox
flinkbot edited a comment on issue #7368: [FLINK-10742][network] Let Netty use 
Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#issuecomment-567435407
 
 
   
   ## CI report:
   
   * 6b12b52b99894864db993a2fa8ab2bfcce0edd5c Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/141736780) 
   * 25fc5608d0d7143f4384a7648054e9c99ebb32e9 Travis: 
[SUCCESS](https://travis-ci.com/flink-ci/flink/builds/145174867) Azure: 
[SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4502)
 
   * 0bd82aa136c43af8fdfde0f2a64284323c40c999 Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/148123283) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4982)
 
   * 99f9c6528f53574829368b52de5fa6c3f0a41d44 Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/148132166) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4986)
 
   * e61197d1bb5a61a55051b193a01543edfa8a22b6 Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/148706320) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5117)
 
   * b1bc198a051be7c2eaace3f0c0601187ff0b98ed Travis: 
[SUCCESS](https://travis-ci.com/flink-ci/flink/builds/149319970) Azure: 
[SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5258)
 
   * 3b41ea0500078556f85db2a1a47386e02c62c5b5 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode

2020-02-19 Thread GitBox
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] 
Let Netty use Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#discussion_r381636671
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/BufferResponseDecoderDelegate.java
 ##
 @@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.runtime.io.network.NetworkClientHandler;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator;
+
+/**
+ * The parser for {@link 
org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse}.
+ */
+public class BufferResponseDecoderDelegate implements 
NettyMessageDecoderDelegate {
+
+   /** The network client handler of current channel. */
+   private final NetworkClientHandler networkClientHandler;
+
+   /** The Flink Buffer allocator. */
+   private final NetworkBufferAllocator allocator;
+
+   /** The cumulation buffer of message header. */
+   private ByteBuf messageHeaderCumulationBuffer;
+
+   /**
+* The current BufferResponse message whose buffer part is being processed.
+* If it is null, we are still processing the message header part; otherwise
+* we are processing the buffer part.
+*/
+   private NettyMessage.BufferResponse currentResponse;
+
+   /** How many bytes have been received or discarded for the buffer part. */
+   private int decodedBytesOfBuffer;
+
+   public BufferResponseDecoderDelegate(NetworkClientHandler 
networkClientHandler) {
+   this.networkClientHandler = networkClientHandler;
+   this.allocator = new 
NetworkBufferAllocator(networkClientHandler);
+   }
+
+   @Override
+   public void onChannelActive(ByteBufAllocator alloc) {
+   messageHeaderCumulationBuffer = 
alloc.directBuffer(NettyMessage.BufferResponse.MESSAGE_HEADER_LENGTH);
+   }
+
+   @Override
+   public void startParsingMessage(int msgId, int messageLength) {
+   currentResponse = null;
+   decodedBytesOfBuffer = 0;
+
+   messageHeaderCumulationBuffer.clear();
+   }
+
+   @Override
+   public ParseResult onChannelRead(ByteBuf data) throws Exception {
+   if (currentResponse == null) {
+   ByteBuf toDecode = 
ByteBufUtils.cumulate(messageHeaderCumulationBuffer, data, 
NettyMessage.BufferResponse.MESSAGE_HEADER_LENGTH);
+
+   if (toDecode != null) {
+   currentResponse = 
NettyMessage.BufferResponse.readFrom(toDecode, allocator);
+
+   if (currentResponse.bufferSize == 0) {
 
 Review comment:
   Currently there is a branch for the `size = 0` case. I think we could check 
in a separate issue whether `size = 0` still occurs, and if there is no such 
condition, we might consider simplifying the related process. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #7702: [FLINK-11088][Security][YARN] Allow YARN to discover pre-installed keytab files

2020-02-19 Thread GitBox
flinkbot edited a comment on issue #7702: [FLINK-11088][Security][YARN] Allow 
YARN to discover pre-installed keytab files
URL: https://github.com/apache/flink/pull/7702#issuecomment-572195960
 
 
   
   ## CI report:
   
   * 72dd07f5f10a56adf6025e82083af21ada47c711 Travis: 
[SUCCESS](https://travis-ci.com/flink-ci/flink/builds/143614040) Azure: 
[SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4198)
 
   * f2387288cb33f288164ed9d102b47868a93dc898 Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/148710964) Azure: 
[CANCELED](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5118)
 
   * 68496011f53df6a9933c2d8723d8f5995cc96111 Travis: 
[SUCCESS](https://travis-ci.com/flink-ci/flink/builds/149728083) Azure: 
[SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5333)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode

2020-02-19 Thread GitBox
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] 
Let Netty use Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#discussion_r381636153
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/BufferResponseDecoderDelegate.java
 ##
 @@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.runtime.io.network.NetworkClientHandler;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator;
+
+/**
+ * The parser for {@link 
org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse}.
+ */
+public class BufferResponseDecoderDelegate implements 
NettyMessageDecoderDelegate {
+
+   /** The network client handler of current channel. */
+   private final NetworkClientHandler networkClientHandler;
+
+   /** The Flink Buffer allocator. */
+   private final NetworkBufferAllocator allocator;
+
+   /** The cumulation buffer of message header. */
+   private ByteBuf messageHeaderCumulationBuffer;
+
+   /**
+* The current BufferResponse message whose buffer part is being processed.
+* If it is null, we are still processing the message header part; otherwise
+* we are processing the buffer part.
+*/
+   private NettyMessage.BufferResponse currentResponse;
+
+   /** How many bytes have been received or discarded for the buffer part. */
+   private int decodedBytesOfBuffer;
+
+   public BufferResponseDecoderDelegate(NetworkClientHandler 
networkClientHandler) {
+   this.networkClientHandler = networkClientHandler;
+   this.allocator = new 
NetworkBufferAllocator(networkClientHandler);
+   }
+
+   @Override
+   public void onChannelActive(ByteBufAllocator alloc) {
+   messageHeaderCumulationBuffer = 
alloc.directBuffer(NettyMessage.BufferResponse.MESSAGE_HEADER_LENGTH);
+   }
+
+   @Override
+   public void startParsingMessage(int msgId, int messageLength) {
+   currentResponse = null;
+   decodedBytesOfBuffer = 0;
+
+   messageHeaderCumulationBuffer.clear();
+   }
+
+   @Override
+   public ParseResult onChannelRead(ByteBuf data) throws Exception {
+   if (currentResponse == null) {
+   ByteBuf toDecode = 
ByteBufUtils.cumulate(messageHeaderCumulationBuffer, data, 
NettyMessage.BufferResponse.MESSAGE_HEADER_LENGTH);
+
+   if (toDecode != null) {
+   currentResponse = 
NettyMessage.BufferResponse.readFrom(toDecode, allocator);
 
 Review comment:
   Have imported `NettyMessage.BufferResponse` at the beginning. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode

2020-02-19 Thread GitBox
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] 
Let Netty use Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#discussion_r381636064
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/BufferResponseDecoderDelegate.java
 ##
 @@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.runtime.io.network.NetworkClientHandler;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator;
+
+/**
+ * The parser for {@link 
org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse}.
+ */
+public class BufferResponseDecoderDelegate implements 
NettyMessageDecoderDelegate {
+
+   /** The network client handler of current channel. */
+   private final NetworkClientHandler networkClientHandler;
+
+   /** The Flink Buffer allocator. */
+   private final NetworkBufferAllocator allocator;
+
+   /** The cumulation buffer of message header. */
+   private ByteBuf messageHeaderCumulationBuffer;
+
+   /**
+* The current BufferResponse message whose buffer part is being processed.
+* If it is null, we are still processing the message header part; otherwise
+* we are processing the buffer part.
+*/
+   private NettyMessage.BufferResponse currentResponse;
+
+   /** How many bytes have been received or discarded for the buffer part. */
+   private int decodedBytesOfBuffer;
+
+   public BufferResponseDecoderDelegate(NetworkClientHandler 
networkClientHandler) {
+   this.networkClientHandler = networkClientHandler;
+   this.allocator = new 
NetworkBufferAllocator(networkClientHandler);
+   }
+
+   @Override
+   public void onChannelActive(ByteBufAllocator alloc) {
+   messageHeaderCumulationBuffer = 
alloc.directBuffer(NettyMessage.BufferResponse.MESSAGE_HEADER_LENGTH);
+   }
+
+   @Override
+   public void startParsingMessage(int msgId, int messageLength) {
+   currentResponse = null;
+   decodedBytesOfBuffer = 0;
+
+   messageHeaderCumulationBuffer.clear();
+   }
+
+   @Override
+   public ParseResult onChannelRead(ByteBuf data) throws Exception {
+   if (currentResponse == null) {
+   ByteBuf toDecode = 
ByteBufUtils.cumulate(messageHeaderCumulationBuffer, data, 
NettyMessage.BufferResponse.MESSAGE_HEADER_LENGTH);
 
 Review comment:
   Could this problem be solved if we import `NettyMessage.BufferResponse` in 
advance, as the following comments suggest? 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
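
To illustrate the cumulation pattern discussed here, a minimal sketch assuming 
Netty's ByteBuf API (this is not the actual ByteBufUtils implementation): header 
bytes are copied into the cumulation buffer only when they span input buffers, 
and null is returned until the full header is available.

    import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;

    final class CumulateSketch {

        /**
         * Returns a buffer holding {@code length} readable header bytes,
         * or null if more input is still needed.
         */
        static ByteBuf cumulate(ByteBuf target, ByteBuf src, int length) {
            if (target.readableBytes() == 0 && src.readableBytes() >= length) {
                // Fast path: the whole header sits in one input buffer, no copy needed.
                return src;
            }
            // Slow path: copy whatever is available into the cumulation buffer.
            int toCopy = Math.min(length - target.readableBytes(), src.readableBytes());
            target.writeBytes(src, toCopy);
            return target.readableBytes() == length ? target : null;
        }
    }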


[GitHub] [flink] KarmaGYZ commented on issue #11100: [FLINK-15562][docs] Add Example settings.xml for maven archetype command wh…

2020-02-19 Thread GitBox
KarmaGYZ commented on issue #11100: [FLINK-15562][docs] Add Example 
settings.xml for maven archetype command wh…
URL: https://github.com/apache/flink/pull/11100#issuecomment-588556496
 
 
   Thanks for the review and information @zentol. If it is against ASF rules, I 
agree not to give that advice. I'm not sure whether we should also remove the 
existing note.
   
   BTW, I'd like to keep those format refactorings. WDYT?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode

2020-02-19 Thread GitBox
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] 
Let Netty use Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#discussion_r381633592
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/BufferResponseDecoderDelegate.java
 ##
 @@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.runtime.io.network.NetworkClientHandler;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator;
+
+/**
+ * The parser for {@link 
org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse}.
+ */
+public class BufferResponseDecoderDelegate implements 
NettyMessageDecoderDelegate {
+
+   /** The network client handler of current channel. */
+   private final NetworkClientHandler networkClientHandler;
+
+   /** The Flink Buffer allocator. */
+   private final NetworkBufferAllocator allocator;
+
+   /** The cumulation buffer of message header. */
+   private ByteBuf messageHeaderCumulationBuffer;
 
 Review comment:
   I think it might be better to rename it to `messageHeaderBuffer`, since 
conceptually we call the second part the `Message Header`. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode

2020-02-19 Thread GitBox
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] 
Let Netty use Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#discussion_r381632755
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientDecoder.java
 ##
 @@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.runtime.io.network.NetworkClientHandler;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
+
+import static 
org.apache.flink.runtime.io.network.netty.NettyMessage.FRAME_HEADER_LENGTH;
+import static 
org.apache.flink.runtime.io.network.netty.NettyMessage.MAGIC_NUMBER;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Decodes messages from the fragmented netty buffers. This decoder assumes the
+ * messages have the following format:
+ *
+ * +--------------+------------------+------------------------+
+ * | FRAME_HEADER |  MESSAGE_HEADER  | DATA BUFFER (Optional) |
+ * +--------------+------------------+------------------------+
+ *
+ * This decoder decodes the frame header and delegates the remaining work to the
+ * corresponding message parsers according to the message type. During decoding,
+ * the decoder and parsers try their best to eliminate copying. For the frame
+ * header and message header, data is only cumulated when it spans multiple
+ * input buffers. The buffer part is copied directly into the input channels to
+ * avoid further copying.
+ *
+ * The format of the frame header is
+ *
+ * +------------------+------------------+--------+
+ * | FRAME LENGTH (4) | MAGIC NUMBER (4) | ID (1) |
+ * +------------------+------------------+--------+
+ */
+public class NettyMessageClientDecoder extends ChannelInboundHandlerAdapter {
+
+   /** The message parser for buffer response. */
+private final NettyMessageDecoderDelegate bufferResponseDecoderDelegate;
+
+/** The message parser for messages other than buffer response. */
+   private final NettyMessageDecoderDelegate 
nonBufferResponseDecoderDelegate;
+
+   /** The cumulation buffer for the frame header part. */
+   private ByteBuf frameHeaderBuffer;
+
+   /**
+* The chosen message parser for the current message. If it is null, 
then
+* we are decoding the frame header part, otherwise we are decoding the 
actual
+* message.
+*/
+   private NettyMessageDecoderDelegate currentDecoderDelegate;
+
+NettyMessageClientDecoder(NetworkClientHandler networkClientHandler) {
+this.bufferResponseDecoderDelegate = new 
BufferResponseDecoderDelegate(networkClientHandler);
+this.nonBufferResponseDecoderDelegate = new 
NonBufferResponseDecoderDelegate();
+}
+
+@Override
+public void channelActive(ChannelHandlerContext ctx) throws Exception {
+super.channelActive(ctx);
+
+bufferResponseDecoderDelegate.onChannelActive(ctx.alloc());
+nonBufferResponseDecoderDelegate.onChannelActive(ctx.alloc());
+
+   frameHeaderBuffer = 
ctx.alloc().directBuffer(FRAME_HEADER_LENGTH);
+}
+
+@Override
+public void channelInactive(ChannelHandlerContext ctx) throws Exception {
 
 Review comment:
   I think it might be OK not to deal with `exceptionCaught` explicitly, since 
`channelInactive` is called when the channel gets closed, and 
`CreditBasedPartitionRequestClientHandler` closes the channel in 
`exceptionCaught`, so the channel will be closed in that case as well. Besides, 
I think we should always be able to close the channel; otherwise we would have 
a channel leakage problem. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
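
A minimal sketch of the relationship described above, with a hypothetical 
handler name: closing the channel in exceptionCaught guarantees that 
channelInactive fires, which is where the decoders release their buffers.

    import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
    import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;

    // Hypothetical handler: any caught exception closes the channel, so the
    // channelInactive callback (and the buffer releases performed there) always runs.
    class CloseOnErrorHandler extends ChannelInboundHandlerAdapter {

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            ctx.close();
        }
    }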

[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode

2020-02-19 Thread GitBox
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] 
Let Netty use Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#discussion_r381624866
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessageDecoderDelegate.java
 ##
 @@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator;
+
+import javax.annotation.Nullable;
+
+/**
+ * Parsers for specified netty messages.
+ */
+public interface NettyMessageDecoderDelegate {
+
+   /**
+* The result of message parsing with the provided data.
+*/
+   class ParseResult {
+   private final static ParseResult NOT_FINISHED = new 
ParseResult(false, null);
+
+   static ParseResult notFinished() {
+   return NOT_FINISHED;
+   }
+
+   static ParseResult finishedWith(@Nullable NettyMessage message) 
{
+   return new ParseResult(true, message);
+   }
+
+   final boolean finished;
+
+   @Nullable
+   final NettyMessage message;
+
+   private ParseResult(boolean finished, NettyMessage message) {
+   this.finished = finished;
+   this.message = message;
+   }
+   }
+
+   /**
+* Notifies that the underlying channel has become active.
+*
+* @param alloc The netty buffer allocator.
+*/
+   void onChannelActive(ByteBufAllocator alloc);
 
 Review comment:
   Have modified accordingly. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
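
For reference, the finished/notFinished protocol quoted above would be consumed 
roughly as follows on the caller side; the surrounding class and the reset logic 
are assumptions for illustration, not the actual decoder code.

    import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
    import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;

    // Hypothetical caller, assumed to live in the same package as
    // NettyMessageDecoderDelegate so the ParseResult fields are accessible.
    class ParseResultLoopSketch {

        private NettyMessageDecoderDelegate currentDecoderDelegate;

        void onBytes(ChannelHandlerContext ctx, ByteBuf data) throws Exception {
            NettyMessageDecoderDelegate.ParseResult result = currentDecoderDelegate.onChannelRead(data);
            if (result.finished) {
                if (result.message != null) {
                    // Hand the completed message downstream.
                    ctx.fireChannelRead(result.message);
                }
                // The next readable bytes belong to a new frame header.
                currentDecoderDelegate = null;
            }
        }
    }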


[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode

2020-02-19 Thread GitBox
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] 
Let Netty use Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#discussion_r381624725
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessageDecoderDelegate.java
 ##
 @@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator;
+
+import javax.annotation.Nullable;
+
+/**
+ * Parsers for specified netty messages.
+ */
+public interface NettyMessageDecoderDelegate {
 
 Review comment:
   I'm slightly hesitant to rename the classes, since we also have 
`NettyMessageServerDecoder`, and if we change the name of the client-side 
decoder to `NettyMessageDecoderDelegate`, that seems to introduce some 
inconsistency. 
   
   I think we might change the `DecoderDelegate` part to something else?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode

2020-02-19 Thread GitBox
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] 
Let Netty use Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#discussion_r381624091
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessageDecoderDelegate.java
 ##
 @@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator;
+
+import javax.annotation.Nullable;
+
+/**
+ * Parsers for specified netty messages.
+ */
+public interface NettyMessageDecoderDelegate {
 
 Review comment:
   Have modified accordingly.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode

2020-02-19 Thread GitBox
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] 
Let Netty use Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#discussion_r381623946
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientDecoder.java
 ##
 @@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.runtime.io.network.NetworkClientHandler;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
+
+import static 
org.apache.flink.runtime.io.network.netty.NettyMessage.FRAME_HEADER_LENGTH;
+import static 
org.apache.flink.runtime.io.network.netty.NettyMessage.MAGIC_NUMBER;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Decodes messages from the fragmented netty buffers. This decoder assumes the
+ * messages have the following format:
+ *
+ * +--------------+------------------+------------------------+
+ * | FRAME_HEADER |  MESSAGE_HEADER  | DATA BUFFER (Optional) |
+ * +--------------+------------------+------------------------+
+ *
+ * This decoder decodes the frame header and delegates the remaining work to the
+ * corresponding message parsers according to the message type. During decoding,
+ * the decoder and parsers try their best to eliminate copying. For the frame
+ * header and message header, data is only cumulated when it spans multiple
+ * input buffers. The buffer part is copied directly into the input channels to
+ * avoid further copying.
+ *
+ * The format of the frame header is
+ *
+ * +------------------+------------------+--------+
+ * | FRAME LENGTH (4) | MAGIC NUMBER (4) | ID (1) |
+ * +------------------+------------------+--------+
+ */
+public class NettyMessageClientDecoder extends ChannelInboundHandlerAdapter {
+
+   /** The message parser for buffer response. */
+private final NettyMessageDecoderDelegate bufferResponseDecoderDelegate;
+
+/** The message parser for messages other than buffer response. */
+   private final NettyMessageDecoderDelegate 
nonBufferResponseDecoderDelegate;
+
+   /** The cumulation buffer for the frame header part. */
+   private ByteBuf frameHeaderBuffer;
+
+   /**
+* The chosen message parser for the current message. If it is null, 
then
+* we are decoding the frame header part, otherwise we are decoding the 
actual
+* message.
+*/
+   private NettyMessageDecoderDelegate currentDecoderDelegate;
+
+NettyMessageClientDecoder(NetworkClientHandler networkClientHandler) {
+this.bufferResponseDecoderDelegate = new 
BufferResponseDecoderDelegate(networkClientHandler);
+this.nonBufferResponseDecoderDelegate = new 
NonBufferResponseDecoderDelegate();
+}
+
+@Override
+public void channelActive(ChannelHandlerContext ctx) throws Exception {
+super.channelActive(ctx);
+
+bufferResponseDecoderDelegate.onChannelActive(ctx.alloc());
+nonBufferResponseDecoderDelegate.onChannelActive(ctx.alloc());
+
+   frameHeaderBuffer = 
ctx.alloc().directBuffer(FRAME_HEADER_LENGTH);
+}
+
+@Override
+public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+super.channelInactive(ctx);
+
+   bufferResponseDecoderDelegate.release();
+   nonBufferResponseDecoderDelegate.release();
+
+   frameHeaderBuffer.release();
+}
+
+@Override
+public void channelRead(ChannelHandlerContext ctx, Object msg) throws 
Exception {
+if (!(msg instanceof ByteBuf)) {
+ctx.fireChannelRead(msg);
+return;
+}
+
+ByteBuf data = (ByteBuf) msg;
+
+try {
+while (data.isReadable()) {
+   if (currentDecoderDelegate == null) {
 
 Review comment:
   Have moved the logic of decoding the frame header into a separate method. 
   

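A sketch of what such a separate frame-header decoding step could look like, 
following the 9-byte layout documented in the quoted Javadoc; the method shape 
and the magic-number constant below are assumptions for illustration, not the 
actual PR code.

    import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;

    final class FrameHeaderSketch {

        // Assumed to match NettyMessage.MAGIC_NUMBER.
        static final int MAGIC_NUMBER = 0xBADC0FFE;

        /** Reads the 9-byte frame header: frame length (4), magic number (4), message id (1). */
        static byte decodeFrameHeader(ByteBuf header) {
            int frameLength = header.readInt(); // consumed; not validated in this sketch
            int magicNumber = header.readInt();
            byte msgId = header.readByte();
            if (magicNumber != MAGIC_NUMBER) {
                throw new IllegalStateException("Network stream corrupted: received incorrect magic number.");
            }
            return msgId;
        }
    }
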
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode

2020-02-19 Thread GitBox
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] 
Let Netty use Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#discussion_r381623313
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientDecoder.java
 ##
 @@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.runtime.io.network.NetworkClientHandler;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
+
+import static 
org.apache.flink.runtime.io.network.netty.NettyMessage.FRAME_HEADER_LENGTH;
+import static 
org.apache.flink.runtime.io.network.netty.NettyMessage.MAGIC_NUMBER;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Decodes messages from the fragmented netty buffers. This decoder assumes the
+ * messages have the following format:
+ *
+ * +--------------+------------------+------------------------+
+ * | FRAME_HEADER |  MESSAGE_HEADER  | DATA BUFFER (Optional) |
+ * +--------------+------------------+------------------------+
+ *
+ * This decoder decodes the frame header and delegates the remaining work to the
+ * corresponding message parsers according to the message type. During decoding,
+ * the decoder and parsers try their best to eliminate copying. For the frame
+ * header and message header, data is only cumulated when it spans multiple
+ * input buffers. The buffer part is copied directly into the input channels to
+ * avoid further copying.
+ *
+ * The format of the frame header is
+ *
+ * +------------------+------------------+--------+
+ * | FRAME LENGTH (4) | MAGIC NUMBER (4) | ID (1) |
+ * +------------------+------------------+--------+
+ */
+public class NettyMessageClientDecoder extends ChannelInboundHandlerAdapter {
+
+   /** The message parser for buffer response. */
+private final NettyMessageDecoderDelegate bufferResponseDecoderDelegate;
+
+/** The message parser for messages other than buffer response. */
+   private final NettyMessageDecoderDelegate 
nonBufferResponseDecoderDelegate;
+
+   /** The cumulation buffer for the frame header part. */
+   private ByteBuf frameHeaderBuffer;
+
+   /**
+* The chosen message parser for the current message. If it is null, 
then
+* we are decoding the frame header part, otherwise we are decoding the 
actual
+* message.
+*/
+   private NettyMessageDecoderDelegate currentDecoderDelegate;
+
+NettyMessageClientDecoder(NetworkClientHandler networkClientHandler) {
+this.bufferResponseDecoderDelegate = new 
BufferResponseDecoderDelegate(networkClientHandler);
+this.nonBufferResponseDecoderDelegate = new 
NonBufferResponseDecoderDelegate();
+}
+
+@Override
+public void channelActive(ChannelHandlerContext ctx) throws Exception {
+super.channelActive(ctx);
+
+bufferResponseDecoderDelegate.onChannelActive(ctx.alloc());
+nonBufferResponseDecoderDelegate.onChannelActive(ctx.alloc());
+
+   frameHeaderBuffer = 
ctx.alloc().directBuffer(FRAME_HEADER_LENGTH);
+}
+
+@Override
+public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+super.channelInactive(ctx);
+
+   bufferResponseDecoderDelegate.release();
+   nonBufferResponseDecoderDelegate.release();
+
+   frameHeaderBuffer.release();
+}
+
+@Override
+public void channelRead(ChannelHandlerContext ctx, Object msg) throws 
Exception {
+if (!(msg instanceof ByteBuf)) {
+ctx.fireChannelRead(msg);
 
 Review comment:
   Agree with that, and removed the `fireChannelRead` for objects other than 
`ByteBuf`. Besides, I think we could omit the explicit check here for 
performance? An exception will be thrown anyway when casting the `Object` to 
`ByteBuf`.


[jira] [Commented] (FLINK-16005) Propagate yarn.application.classpath from client to TaskManager Classpath

2020-02-19 Thread Zhenqiu Huang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17040528#comment-17040528
 ] 

Zhenqiu Huang commented on FLINK-16005:
---

[~trohrmann]
I feel that specifying the yarn config keys directly in the configuration, 
without removing them, is cleaner for users. I will create a PR with this 
solution.

> Propagate yarn.application.classpath from client to TaskManager Classpath
> -
>
> Key: FLINK-16005
> URL: https://issues.apache.org/jira/browse/FLINK-16005
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Reporter: Zhenqiu Huang
>Priority: Major
>
> When Flink users want to override the Hadoop YARN container classpath, they 
> should just specify yarn.application.classpath in yarn-site.xml on the client 
> side. But currently, the classpath setting is only applied to the Flink 
> application master; the classpath of the TM is still determined by the setting 
> on the YARN host.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on issue #7702: [FLINK-11088][Security][YARN] Allow YARN to discover pre-installed keytab files

2020-02-19 Thread GitBox
flinkbot edited a comment on issue #7702: [FLINK-11088][Security][YARN] Allow 
YARN to discover pre-installed keytab files
URL: https://github.com/apache/flink/pull/7702#issuecomment-572195960
 
 
   
   ## CI report:
   
   * 72dd07f5f10a56adf6025e82083af21ada47c711 Travis: 
[SUCCESS](https://travis-ci.com/flink-ci/flink/builds/143614040) Azure: 
[SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4198)
 
   * f2387288cb33f288164ed9d102b47868a93dc898 Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/148710964) Azure: 
[CANCELED](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5118)
 
   * 68496011f53df6a9933c2d8723d8f5995cc96111 Travis: 
[PENDING](https://travis-ci.com/flink-ci/flink/builds/149728083) 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #7702: [FLINK-11088][Security][YARN] Allow YARN to discover pre-installed keytab files

2020-02-19 Thread GitBox
flinkbot edited a comment on issue #7702: [FLINK-11088][Security][YARN] Allow 
YARN to discover pre-installed keytab files
URL: https://github.com/apache/flink/pull/7702#issuecomment-572195960
 
 
   
   ## CI report:
   
   * 72dd07f5f10a56adf6025e82083af21ada47c711 Travis: 
[SUCCESS](https://travis-ci.com/flink-ci/flink/builds/143614040) Azure: 
[SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4198)
 
   * f2387288cb33f288164ed9d102b47868a93dc898 Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/148710964) Azure: 
[CANCELED](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5118)
 
   * 68496011f53df6a9933c2d8723d8f5995cc96111 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #9703: [FLINK-14038]Add default GC options for flink on yarn to facilitate debugging

2020-02-19 Thread GitBox
flinkbot edited a comment on issue #9703: [FLINK-14038]Add default GC options 
for flink on yarn to facilitate debugging
URL: https://github.com/apache/flink/pull/9703#issuecomment-532581942
 
 
   
   ## CI report:
   
   * 1b930d19f27909ad5e2759eb6c5471c2ce07e8b4 Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/128133485) 
   * 977ccb5d91869e37027069d8b2b490bf850253ed Travis: 
[CANCELED](https://travis-ci.com/flink-ci/flink/builds/129659424) 
   * 8347093d4cb32ed752bc01f5cd98abb2d803df94 Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/130842273) 
   * 796de65585c861a67c46ba8c578e08302ade2cdc Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/133371242) 
   * 5817aa535fb834889eebb96478b7a40f936fb3c3 Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/134200223) 
   * 040b9878337aa7b919f16d2cfb1c9bc590b31a7e Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/134418855) 
   * 32120e401687204ef737ebe01875e293be71d720 Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/134550254) 
   * bb787cffdb56629b880c50edbf368fa81f11db58 Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/134797060) 
   * 0cd39929e456b8ab0a1d1c20dc0b05b29d92d8b0 Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/135018920) 
   * 560c6f43ae1b26dd81b360d5f32207c459824def Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/135182722) 
   * d50ce6eec61b0825c8c3df7e3a738390f69c14f5 Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/135279137) 
   * 0877d8ecdaf16b2529efc011de6585570ffa741c Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/135302579) 
   * b64ed1deecad145deeeb05aa23b1bff77af9f1ed Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/137343176) 
   * 290d9f7498478e04751d099eae7295ab99c3ddc5 Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/149686285) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5330)
 
   * 40213c1684a264def3e489df1bd264fb8ad1871a Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/149715359) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5331)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #9703: [FLINK-14038]Add default GC options for flink on yarn to facilitate debugging

2020-02-19 Thread GitBox
flinkbot edited a comment on issue #9703: [FLINK-14038]Add default GC options 
for flink on yarn to facilitate debugging
URL: https://github.com/apache/flink/pull/9703#issuecomment-532581942
 
 
   
   ## CI report:
   
   * 1b930d19f27909ad5e2759eb6c5471c2ce07e8b4 Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/128133485) 
   * 977ccb5d91869e37027069d8b2b490bf850253ed Travis: 
[CANCELED](https://travis-ci.com/flink-ci/flink/builds/129659424) 
   * 8347093d4cb32ed752bc01f5cd98abb2d803df94 Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/130842273) 
   * 796de65585c861a67c46ba8c578e08302ade2cdc Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/133371242) 
   * 5817aa535fb834889eebb96478b7a40f936fb3c3 Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/134200223) 
   * 040b9878337aa7b919f16d2cfb1c9bc590b31a7e Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/134418855) 
   * 32120e401687204ef737ebe01875e293be71d720 Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/134550254) 
   * bb787cffdb56629b880c50edbf368fa81f11db58 Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/134797060) 
   * 0cd39929e456b8ab0a1d1c20dc0b05b29d92d8b0 Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/135018920) 
   * 560c6f43ae1b26dd81b360d5f32207c459824def Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/135182722) 
   * d50ce6eec61b0825c8c3df7e3a738390f69c14f5 Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/135279137) 
   * 0877d8ecdaf16b2529efc011de6585570ffa741c Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/135302579) 
   * b64ed1deecad145deeeb05aa23b1bff77af9f1ed Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/137343176) 
   * 290d9f7498478e04751d099eae7295ab99c3ddc5 Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/149686285) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5330)
 
   * 40213c1684a264def3e489df1bd264fb8ad1871a Travis: 
[PENDING](https://travis-ci.com/flink-ci/flink/builds/149715359) 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #9703: [FLINK-14038]Add default GC options for flink on yarn to facilitate debugging

2020-02-19 Thread GitBox
flinkbot edited a comment on issue #9703: [FLINK-14038]Add default GC options 
for flink on yarn to facilitate debugging
URL: https://github.com/apache/flink/pull/9703#issuecomment-532581942
 
 
   
   ## CI report:
   
   * 1b930d19f27909ad5e2759eb6c5471c2ce07e8b4 Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/128133485) 
   * 977ccb5d91869e37027069d8b2b490bf850253ed Travis: 
[CANCELED](https://travis-ci.com/flink-ci/flink/builds/129659424) 
   * 8347093d4cb32ed752bc01f5cd98abb2d803df94 Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/130842273) 
   * 796de65585c861a67c46ba8c578e08302ade2cdc Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/133371242) 
   * 5817aa535fb834889eebb96478b7a40f936fb3c3 Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/134200223) 
   * 040b9878337aa7b919f16d2cfb1c9bc590b31a7e Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/134418855) 
   * 32120e401687204ef737ebe01875e293be71d720 Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/134550254) 
   * bb787cffdb56629b880c50edbf368fa81f11db58 Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/134797060) 
   * 0cd39929e456b8ab0a1d1c20dc0b05b29d92d8b0 Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/135018920) 
   * 560c6f43ae1b26dd81b360d5f32207c459824def Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/135182722) 
   * d50ce6eec61b0825c8c3df7e3a738390f69c14f5 Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/135279137) 
   * 0877d8ecdaf16b2529efc011de6585570ffa741c Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/135302579) 
   * b64ed1deecad145deeeb05aa23b1bff77af9f1ed Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/137343176) 
   * 290d9f7498478e04751d099eae7295ab99c3ddc5 Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/149686285) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5330)
 
   * 40213c1684a264def3e489df1bd264fb8ad1871a UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

