[GitHub] [flink] flinkbot commented on issue #11147: [FLINK-15717] Update StatefulJobSavepointMigrationITCase.scala to restore from 1.10 savepoint
flinkbot commented on issue #11147: [FLINK-15717] Update StatefulJobSavepointMigrationITCase.scala to restore from 1.10 savepoint URL: https://github.com/apache/flink/pull/11147#issuecomment-588710322 ## CI report: * 8127feb053c5d657b8453a7ef02c65da627d037c UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot commented on issue #11148: [FLINK-16180][runtime] Replace the nullable vertexExecution in ScheduledUnit with a non-null executionVertexId
flinkbot commented on issue #11148: [FLINK-16180][runtime] Replace the nullable vertexExecution in ScheduledUnit with a non-null executionVertexId URL: https://github.com/apache/flink/pull/11148#issuecomment-588710480 ## CI report: * 9cf340d95807d82066316548ca6e503f8fd731e8 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink-statefun] tzulitai opened a new pull request #28: [FLINK-16159] [tests, build] Add verification integration test + integrate with Maven build
tzulitai opened a new pull request #28: [FLINK-16159] [tests, build] Add verification integration test + integrate with Maven build URL: https://github.com/apache/flink-statefun/pull/28 This PR achieves the following goals: - Adds a `statefun-integration-tests` module that is meant to maintain integration tests in the future. - Adds `statefun-sanity-itcase` to the integration tests module, which includes a simple application used for sanity verification and a JUnit-driven test that performs the verification over a docker setup. - Adds a Maven build profile so that the integration tests can be run with a simple command: `mvn clean verify -Prun-integration-tests` --- ### The verification app and test The verification app (`SanityVerificationModule`) is a simple application used for sanity verification. - The application reads commands from a Kafka ingress - Has multiple functions binded (currently 2 in this PR) that reacts to the commands (see class-level Javadoc) of `SanityVerificationModule` for a full description on the set of commands) - Reflects any state updates in the functions back to a Kafka egress. The Junit-driven test does the following: - Uses Testcontainers (https://www.testcontainers.org/) to start a Kafka broker, builds the image for the verification app on the fly and also starts a master + worker container for the app. - Writes some commands to the Kafka ingress topic - Reads state outputs for the Kafka egress topic, and verifies that they are correct Right now the test scenario does not have any randomization to it. We may consider to add that in the future as a follow-up. --- ### Maven `run-integration-tests` build profile and structure setup With this PR, to add new container-based integration tests in the future, a developer has to do the following: - Add a new sub-module under `statefun-integration-tests` - Add source code for the Stateful Functions application to test against - Add a test class named with the pattern `*ITCase` that setups the containers using testcontainers, and implements the test verification logic as a JUnit test. The classname pattern is important because only then would it be skipped by the unit test phase, and only kicks in with the `run-integration-tests` profile. To build while also running the integration tests, one simply does: `mvn clean verify -Prun-integration-tests` in the project root directory. This always re-builds the base Stateful Functions image before running the ITCases. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-16159) Add simple end-to-end test for Stateful Functions using testcontainers
[ https://issues.apache.org/jira/browse/FLINK-16159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-16159: --- Labels: pull-request-available (was: ) > Add simple end-to-end test for Stateful Functions using testcontainers > -- > > Key: FLINK-16159 > URL: https://issues.apache.org/jira/browse/FLINK-16159 > Project: Flink > Issue Type: Improvement > Components: Stateful Functions, Test Infrastructure >Affects Versions: statefun-1.1 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Major > Labels: pull-request-available > > Often in our review process of changes to the Stateful Functions project, we > often end up wanting to do a simple sanity check by running the simple > greeter example with docker-compose. > Ideally, this should really be an end-to-end verification program that: > * Starts a Kafka broker and a simple stateful function application that has a > Kafka ingress and egress > * Uses the Kafka Java client to write some inputs to the broker, reads and > verifies output > With testcontainers (https://www.testcontainers.org/) this might even be > achievable as a Junit test, which runs only after the Maven packaging phase. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] flinkbot edited a comment on issue #11146: [FLINK-15718] Update StatefulJobSavepointMigrationITCase to restore from 1.10 savepoint
flinkbot edited a comment on issue #11146: [FLINK-15718] Update StatefulJobSavepointMigrationITCase to restore from 1.10 savepoint URL: https://github.com/apache/flink/pull/11146#issuecomment-588660647 ## CI report: * 6d0d8f47466cf61ec549d85627e0349550e2433d Travis: [PENDING](https://travis-ci.com/flink-ci/flink/builds/149768484) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot commented on issue #11151: [FLINK-16128][docs]Translate "Google Cloud PubSub" page into Chinese
flinkbot commented on issue #11151: [FLINK-16128][docs]Translate "Google Cloud PubSub" page into Chinese URL: https://github.com/apache/flink/pull/11151#issuecomment-588705948 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Automated Checks Last check on commit 89d26f578174d90ce9979f076346376bf3597a5b (Thu Feb 20 07:51:39 UTC 2020) ✅no warnings Mention the bot in a comment to re-run the automated checks. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into to Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-16128) Translate "Google Cloud PubSub" page into Chinese
[ https://issues.apache.org/jira/browse/FLINK-16128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-16128: --- Labels: pull-request-available (was: ) > Translate "Google Cloud PubSub" page into Chinese > - > > Key: FLINK-16128 > URL: https://issues.apache.org/jira/browse/FLINK-16128 > Project: Flink > Issue Type: Sub-task > Components: chinese-translation, Documentation >Reporter: Jark Wu >Assignee: Xuannan Su >Priority: Major > Labels: pull-request-available > > The page url is > https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/connectors/pubsub.html > The markdown file is located in flink/docs/dev/connectors/pubsub.zh.md -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] Sxnan opened a new pull request #11151: [FLINK-16128][docs]Translate "Google Cloud PubSub" page into Chinese
Sxnan opened a new pull request #11151: [FLINK-16128][docs]Translate "Google Cloud PubSub" page into Chinese URL: https://github.com/apache/flink/pull/11151 ## What is the purpose of the change This pull request translates [Google Cloud PubSub](https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/connectors/pubsub.html) page of connectors into Chinese ## Brief change log - Translate the markdown file located at flink/docs/dev/connectors/pubsub.zh.md ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? not applicable This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot commented on issue #11149: [hotfix][table-api] Fixed ExpressionResolver rules modifiers
flinkbot commented on issue #11149: [hotfix][table-api] Fixed ExpressionResolver rules modifiers URL: https://github.com/apache/flink/pull/11149#issuecomment-588698826 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Automated Checks Last check on commit 306149eb700f0baa2d75c1e9971cbce50b273618 (Thu Feb 20 07:46:34 UTC 2020) **Warnings:** * No documentation files were touched! Remember to keep the Flink docs up to date! Mention the bot in a comment to re-run the automated checks. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into to Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot commented on issue #11150: [FLINK-15721] Update StatefulJobWBroadcastStateMigrationITCase to restore from 1.10 savepoint
flinkbot commented on issue #11150: [FLINK-15721] Update StatefulJobWBroadcastStateMigrationITCase to restore from 1.10 savepoint URL: https://github.com/apache/flink/pull/11150#issuecomment-588698772 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Automated Checks Last check on commit 8069d0cf582c214aedad280a56871be754a633ee (Thu Feb 20 07:46:31 UTC 2020) **Warnings:** * No documentation files were touched! Remember to keep the Flink docs up to date! * **This pull request references an unassigned [Jira ticket](https://issues.apache.org/jira/browse/FLINK-15721).** According to the [code contribution guide](https://flink.apache.org/contributing/contribute-code.html), tickets need to be assigned before starting with the implementation work. Mention the bot in a comment to re-run the automated checks. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into to Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] yanghua opened a new pull request #11150: [FLINK-15721] Update StatefulJobWBroadcastStateMigrationITCase to restore from 1.10 savepoint
yanghua opened a new pull request #11150: [FLINK-15721] Update StatefulJobWBroadcastStateMigrationITCase to restore from 1.10 savepoint URL: https://github.com/apache/flink/pull/11150 ## What is the purpose of the change * This pull request updates StatefulJobWBroadcastStateMigrationITCase to restore from 1.10 savepoint* ## Brief change log - *Update StatefulJobWBroadcastStateMigrationITCase to restore from 1.10 savepoint* ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / **not documented**) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-15721) Update StatefulJobWBroadcastStateMigrationITCase to restore from 1.10 savepoint
[ https://issues.apache.org/jira/browse/FLINK-15721?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-15721: --- Labels: pull-request-available (was: ) > Update StatefulJobWBroadcastStateMigrationITCase to restore from 1.10 > savepoint > --- > > Key: FLINK-15721 > URL: https://issues.apache.org/jira/browse/FLINK-15721 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.11.0 >Reporter: vinoyang >Priority: Major > Labels: pull-request-available > Fix For: 1.11.0 > > > Update {{StatefulJobWBroadcastStateMigrationITCase}} to restore from 1.10 > savepoint. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r381826952 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/BufferResponseDecoderDelegate.java ## @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.runtime.io.network.NetworkClientHandler; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; + +import static org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse; + +/** + * The parser for {@link org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse}. + */ +public class BufferResponseDecoderDelegate implements NettyMessageDecoderDelegate { + + /** The network client handler of current channel. */ + private final NetworkClientHandler networkClientHandler; + + /** The Flink Buffer allocator. */ + private final NetworkBufferAllocator allocator; + + /** The cumulation buffer of message header. */ + private ByteBuf messageHeaderBuffer; + + /** +* The current BufferResponse message that are process the buffer part. +* If it is null, we are still processing the message header part, otherwise +* we are processing the buffer part. +*/ + private BufferResponse currentResponse; + + /** How much bytes have been received or discarded for the buffer part. */ + private int decodedBytesOfBuffer; + + public BufferResponseDecoderDelegate(NetworkClientHandler networkClientHandler) { + this.networkClientHandler = networkClientHandler; Review comment: Do not need this class-level var based on the [comment](https://github.com/apache/flink/pull/7368/files#r381770789), and we can pass the `NetworkBufferAllocator` as constructor argument directly. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] dawidwys opened a new pull request #11149: [hotfix][table-api] Fixed ExpressionResolver rules modifiers
dawidwys opened a new pull request #11149: [hotfix][table-api] Fixed ExpressionResolver rules modifiers URL: https://github.com/apache/flink/pull/11149 It is a trivial hotfix. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r381827789 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/BufferResponseDecoderDelegate.java ## @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.runtime.io.network.NetworkClientHandler; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; + +import static org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse; + +/** + * The parser for {@link org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse}. + */ +public class BufferResponseDecoderDelegate implements NettyMessageDecoderDelegate { + + /** The network client handler of current channel. */ + private final NetworkClientHandler networkClientHandler; + + /** The Flink Buffer allocator. */ + private final NetworkBufferAllocator allocator; + + /** The cumulation buffer of message header. */ + private ByteBuf messageHeaderBuffer; + + /** +* The current BufferResponse message that are process the buffer part. +* If it is null, we are still processing the message header part, otherwise +* we are processing the buffer part. +*/ + private BufferResponse currentResponse; + + /** How much bytes have been received or discarded for the buffer part. */ + private int decodedBytesOfBuffer; + + public BufferResponseDecoderDelegate(NetworkClientHandler networkClientHandler) { Review comment: nit: package public This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r381827385 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/BufferResponseDecoderDelegate.java ## @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.runtime.io.network.NetworkClientHandler; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; + +import static org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse; + +/** + * The parser for {@link org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse}. + */ +public class BufferResponseDecoderDelegate implements NettyMessageDecoderDelegate { + + /** The network client handler of current channel. */ + private final NetworkClientHandler networkClientHandler; + + /** The Flink Buffer allocator. */ + private final NetworkBufferAllocator allocator; + + /** The cumulation buffer of message header. */ + private ByteBuf messageHeaderBuffer; + + /** +* The current BufferResponse message that are process the buffer part. +* If it is null, we are still processing the message header part, otherwise +* we are processing the buffer part. +*/ + private BufferResponse currentResponse; + + /** How much bytes have been received or discarded for the buffer part. */ + private int decodedBytesOfBuffer; + + public BufferResponseDecoderDelegate(NetworkClientHandler networkClientHandler) { + this.networkClientHandler = networkClientHandler; + this.allocator = new NetworkBufferAllocator(networkClientHandler); + } + + @Override + public void onChannelActive(ChannelHandlerContext ctx) { + messageHeaderBuffer = ctx.alloc().directBuffer(BufferResponse.MESSAGE_HEADER_LENGTH); + } + + @Override + public void startParsingMessage(int msgId, int messageLength) { + currentResponse = null; + decodedBytesOfBuffer = 0; + + messageHeaderBuffer.clear(); + } + + @Override + public ParseResult onChannelRead(ByteBuf data) throws Exception { + if (currentResponse == null) { + ByteBuf toDecode = ByteBufUtils.cumulate(messageHeaderBuffer, data, BufferResponse.MESSAGE_HEADER_LENGTH); Review comment: nit: maybe further import `BufferResponse.MESSAGE_HEADER_LENGTH` before class. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r381826952 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/BufferResponseDecoderDelegate.java ## @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.runtime.io.network.NetworkClientHandler; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; + +import static org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse; + +/** + * The parser for {@link org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse}. + */ +public class BufferResponseDecoderDelegate implements NettyMessageDecoderDelegate { + + /** The network client handler of current channel. */ + private final NetworkClientHandler networkClientHandler; + + /** The Flink Buffer allocator. */ + private final NetworkBufferAllocator allocator; + + /** The cumulation buffer of message header. */ + private ByteBuf messageHeaderBuffer; + + /** +* The current BufferResponse message that are process the buffer part. +* If it is null, we are still processing the message header part, otherwise +* we are processing the buffer part. +*/ + private BufferResponse currentResponse; + + /** How much bytes have been received or discarded for the buffer part. */ + private int decodedBytesOfBuffer; + + public BufferResponseDecoderDelegate(NetworkClientHandler networkClientHandler) { + this.networkClientHandler = networkClientHandler; Review comment: Do not need this class-level var based on the [comment](https://github.com/apache/flink/pull/7368/files#r381770789) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] HuangZhenQiu commented on issue #10980: [FLINK-12343][flink-yarn] add yarn file replication configuration
HuangZhenQiu commented on issue #10980: [FLINK-12343][flink-yarn] add yarn file replication configuration URL: https://github.com/apache/flink/pull/10980#issuecomment-588680293 @TisonKun Would you please take one more round of look? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-16180) Replacing vertexExecution in ScheduledUnit with executionVertexID
[ https://issues.apache.org/jira/browse/FLINK-16180?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhu Zhu updated FLINK-16180: Description: {{ScheduledUnit#vertexExecution}} is nullable but {{ProgrammedSlotProvider}} requires it to be non-null to work. This makes {{ProgrammedSlotProvider}} not able to be used by new scheduler tests since {{vertexExecution}} is never set in the new scheduler code path. It blocks us from reworking tests which are based legacy scheduling to base on the new scheduler. Besides that, there are 2 other problems caused by the nullable vertexExecution: 1. The log printed in SchedulerImpl#allocateSlotInternal(...) may contain no useful info since the vertexExecution can be null. 2. NPE issue reported in FLINK-16145. Thus I would propose to replace the nullable vertexExecution with a non-null executionVertexID. was: {{ScheduledUnit#vertexExecution}} is nullable but {{ProgrammedSlotProvider}} requires it to be non-null to work. This makes {{ProgrammedSlotProvider}} not able to be used by new scheduler tests since {{vertexExecution}} is never set in the new scheduler code path. It blocks us from reworking tests which are based legacy scheduling to base on the new scheduler. I would propose to replace the nullable vertexExecution with a non-null executionVertexID. This change would not break anything since {{vertexExecution}} is mainly used by {{ProgrammedSlotProvider}} for testing. {{ProgrammedSlotProvider}} uses it to retrieve the JobVertexID and subtaskIndex. The only other place where {{ScheduledUnit#vertexExecution}} is used is to log the involved task for slot allocation in {{SchedulerImpl#allocateSlotInternal(...)}}. The log is problematic at the moment with the new scheduler since the vertexExecution is null. This change can fix the problematic log. This change would also fix a NPE issue reported in FLINK-16145. > Replacing vertexExecution in ScheduledUnit with executionVertexID > - > > Key: FLINK-16180 > URL: https://issues.apache.org/jira/browse/FLINK-16180 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Affects Versions: 1.11.0 >Reporter: Zhu Zhu >Assignee: Zhu Zhu >Priority: Major > Labels: pull-request-available > Fix For: 1.11.0 > > Time Spent: 10m > Remaining Estimate: 0h > > {{ScheduledUnit#vertexExecution}} is nullable but {{ProgrammedSlotProvider}} > requires it to be non-null to work. This makes {{ProgrammedSlotProvider}} not > able to be used by new scheduler tests since {{vertexExecution}} is never set > in the new scheduler code path. It blocks us from reworking tests which are > based legacy scheduling to base on the new scheduler. > Besides that, there are 2 other problems caused by the nullable > vertexExecution: > 1. The log printed in SchedulerImpl#allocateSlotInternal(...) may contain no > useful info since the vertexExecution can be null. > 2. NPE issue reported in FLINK-16145. > Thus I would propose to replace the nullable vertexExecution with a non-null > executionVertexID. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] flinkbot commented on issue #11148: [FLINK-16180][runtime] Replace the nullable vertexExecution in ScheduledUnit with a non-null executionVertexId
flinkbot commented on issue #11148: [FLINK-16180][runtime] Replace the nullable vertexExecution in ScheduledUnit with a non-null executionVertexId URL: https://github.com/apache/flink/pull/11148#issuecomment-588668558 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Automated Checks Last check on commit 9cf340d95807d82066316548ca6e503f8fd731e8 (Thu Feb 20 07:24:56 UTC 2020) **Warnings:** * No documentation files were touched! Remember to keep the Flink docs up to date! Mention the bot in a comment to re-run the automated checks. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into to Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-16180) Replacing vertexExecution in ScheduledUnit with executionVertexID
[ https://issues.apache.org/jira/browse/FLINK-16180?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17040713#comment-17040713 ] Zhu Zhu commented on FLINK-16180: - cc [~gjy] [~trohrmann] > Replacing vertexExecution in ScheduledUnit with executionVertexID > - > > Key: FLINK-16180 > URL: https://issues.apache.org/jira/browse/FLINK-16180 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Affects Versions: 1.11.0 >Reporter: Zhu Zhu >Assignee: Zhu Zhu >Priority: Major > Labels: pull-request-available > Fix For: 1.11.0 > > Time Spent: 10m > Remaining Estimate: 0h > > {{ScheduledUnit#vertexExecution}} is nullable but {{ProgrammedSlotProvider}} > requires it to be non-null to work. This makes {{ProgrammedSlotProvider}} not > able to be used by new scheduler tests since {{vertexExecution}} is never set > in the new scheduler code path. It blocks us from reworking tests which are > based legacy scheduling to base on the new scheduler. > Besides that, there are 2 other problems caused by the nullable > vertexExecution: > 1. The log printed in SchedulerImpl#allocateSlotInternal(...) may contain no > useful info since the vertexExecution can be null. > 2. NPE issue reported in FLINK-16145. > Thus I would propose to replace the nullable vertexExecution with a non-null > executionVertexID. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-16180) Replacing vertexExecution in ScheduledUnit with executionVertexID
[ https://issues.apache.org/jira/browse/FLINK-16180?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-16180: --- Labels: pull-request-available (was: ) > Replacing vertexExecution in ScheduledUnit with executionVertexID > - > > Key: FLINK-16180 > URL: https://issues.apache.org/jira/browse/FLINK-16180 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Affects Versions: 1.11.0 >Reporter: Zhu Zhu >Assignee: Zhu Zhu >Priority: Major > Labels: pull-request-available > Fix For: 1.11.0 > > > {{ScheduledUnit#vertexExecution}} is nullable but {{ProgrammedSlotProvider}} > requires it to be non-null to work. This makes {{ProgrammedSlotProvider}} not > able to be used by new scheduler tests since {{vertexExecution}} is never set > in the new scheduler code path. It blocks us from reworking tests which are > based legacy scheduling to base on the new scheduler. > I would propose to replace the nullable vertexExecution with a non-null > executionVertexID. > This change would not break anything since {{vertexExecution}} is mainly used > by {{ProgrammedSlotProvider}} for testing. {{ProgrammedSlotProvider}} uses it > to retrieve the JobVertexID and subtaskIndex. > The only other place where {{ScheduledUnit#vertexExecution}} is used is to > log the involved task for slot allocation in > {{SchedulerImpl#allocateSlotInternal(...)}}. The log is problematic at the > moment with the new scheduler since the vertexExecution is null. This change > can fix the problematic log. > This change would also fix a NPE issue reported in FLINK-16145. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] zhuzhurk opened a new pull request #11148: [FLINK-16180][runtime] Replace the nullable vertexExecution in ScheduledUnit with a non-null executionVertexId
zhuzhurk opened a new pull request #11148: [FLINK-16180][runtime] Replace the nullable vertexExecution in ScheduledUnit with a non-null executionVertexId URL: https://github.com/apache/flink/pull/11148 ## What is the purpose of the change ScheduledUnit#vertexExecution is nullable but ProgrammedSlotProvider requires it to be non-null to work. This makes ProgrammedSlotProvider not able to be used by new scheduler tests since vertexExecution is never set in the new scheduler code path. It blocks us from reworking tests which are based legacy scheduling to base on the new scheduler. Thus I opened this PR to replace the nullable vertexExecution with a non-null executionVertexID. It also naturally fixes 2 other issues: 1. The log printed in SchedulerImpl#allocateSlotInternal(...) can be broken since the vertexExecution can be null. 2. an NPE issue reported in FLINK-16145. ## Brief change log See commits. ## Verifying this change This change is already covered by existing tests, such as *ExecutionTests*. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot commented on issue #11146: [FLINK-15718] Update StatefulJobSavepointMigrationITCase to restore from 1.10 savepoint
flinkbot commented on issue #11146: [FLINK-15718] Update StatefulJobSavepointMigrationITCase to restore from 1.10 savepoint URL: https://github.com/apache/flink/pull/11146#issuecomment-588660647 ## CI report: * 6d0d8f47466cf61ec549d85627e0349550e2433d UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot commented on issue #11147: [FLINK-15717] Update StatefulJobSavepointMigrationITCase.scala to restore from 1.10 savepoint
flinkbot commented on issue #11147: [FLINK-15717] Update StatefulJobSavepointMigrationITCase.scala to restore from 1.10 savepoint URL: https://github.com/apache/flink/pull/11147#issuecomment-588652456 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Automated Checks Last check on commit 8127feb053c5d657b8453a7ef02c65da627d037c (Thu Feb 20 07:11:52 UTC 2020) **Warnings:** * No documentation files were touched! Remember to keep the Flink docs up to date! * **This pull request references an unassigned [Jira ticket](https://issues.apache.org/jira/browse/FLINK-15717).** According to the [code contribution guide](https://flink.apache.org/contributing/contribute-code.html), tickets need to be assigned before starting with the implementation work. Mention the bot in a comment to re-run the automated checks. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into to Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-15717) Update StatefulJobSavepointMigrationITCase.scala to restore from 1.10 savepoint
[ https://issues.apache.org/jira/browse/FLINK-15717?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-15717: --- Labels: pull-request-available (was: ) > Update StatefulJobSavepointMigrationITCase.scala to restore from 1.10 > savepoint > --- > > Key: FLINK-15717 > URL: https://issues.apache.org/jira/browse/FLINK-15717 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.11.0 >Reporter: vinoyang >Priority: Major > Labels: pull-request-available > Fix For: 1.11.0 > > > Update {{StatefulJobSavepointMigrationITCase.scala}} to restore from 1.10 > savepoint -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] yanghua opened a new pull request #11147: [FLINK-15717] Update StatefulJobSavepointMigrationITCase.scala to restore from 1.10 savepoint
yanghua opened a new pull request #11147: [FLINK-15717] Update StatefulJobSavepointMigrationITCase.scala to restore from 1.10 savepoint URL: https://github.com/apache/flink/pull/11147 ## What is the purpose of the change *This pull request updates StatefulJobSavepointMigrationITCase.scala to restore from 1.10 savepoint* ## Brief change log - *Update StatefulJobSavepointMigrationITCase.scala to restore from 1.10 savepoint* ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / **not documented**) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-16141) HiveTableSourceTest unstable
[ https://issues.apache.org/jira/browse/FLINK-16141?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17040690#comment-17040690 ] Rui Li commented on FLINK-16141: Hey [~sewen], yes the test is setup locally. It launches a separate metastore process and connects to it. Judging from the stacktrace, either the metastore process crashed shortly after it started, or we had some problem establishing the connection. The log of the metastore process is disabled by default to avoid flooding travis output. We can't tell the root cause for sure unless we reproduce the issue with metastore log enabled. I'll see what I can do about the exception logs. I agree we should suppress them for negative test cases, while have them enabled for normal tests. > HiveTableSourceTest unstable > > > Key: FLINK-16141 > URL: https://issues.apache.org/jira/browse/FLINK-16141 > Project: Flink > Issue Type: Bug > Components: Connectors / Hive >Affects Versions: 1.10.0 >Reporter: Stephan Ewen >Priority: Blocker > Fix For: 1.11.0 > > > The test fails with various exceptions. > See https://api.travis-ci.org/v3/job/651866181/log.txt -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] flinkbot commented on issue #11146: [FLINK-15718] Update StatefulJobSavepointMigrationITCase to restore from 1.10 savepoint
flinkbot commented on issue #11146: [FLINK-15718] Update StatefulJobSavepointMigrationITCase to restore from 1.10 savepoint URL: https://github.com/apache/flink/pull/11146#issuecomment-588634455 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Automated Checks Last check on commit 6d0d8f47466cf61ec549d85627e0349550e2433d (Thu Feb 20 06:37:25 UTC 2020) **Warnings:** * No documentation files were touched! Remember to keep the Flink docs up to date! * **This pull request references an unassigned [Jira ticket](https://issues.apache.org/jira/browse/FLINK-15718).** According to the [code contribution guide](https://flink.apache.org/contributing/contribute-code.html), tickets need to be assigned before starting with the implementation work. Mention the bot in a comment to re-run the automated checks. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into to Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-15718) Update StatefulJobSavepointMigrationITCase to restore from 1.10 savepoint
[ https://issues.apache.org/jira/browse/FLINK-15718?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-15718: --- Labels: pull-request-available (was: ) > Update StatefulJobSavepointMigrationITCase to restore from 1.10 savepoint > - > > Key: FLINK-15718 > URL: https://issues.apache.org/jira/browse/FLINK-15718 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.11.0 >Reporter: vinoyang >Priority: Major > Labels: pull-request-available > Fix For: 1.11.0 > > > Update {{StatefulJobSavepointMigrationITCase}} to restore from 1.10 savepoint -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] yanghua opened a new pull request #11146: [FLINK-15718] Update StatefulJobSavepointMigrationITCase to restore from 1.10 savepoint
yanghua opened a new pull request #11146: [FLINK-15718] Update StatefulJobSavepointMigrationITCase to restore from 1.10 savepoint URL: https://github.com/apache/flink/pull/11146 ## What is the purpose of the change *This pull request updates StatefulJobSavepointMigrationITCase to restore from 1.10 savepoint* ## Brief change log - *Update StatefulJobSavepointMigrationITCase to restore from 1.10 savepoint* ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / **not documented**) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #11144: [FLINK-15715] Update ContinuousFileProcessingMigrationTest to restore from 1.10 savepoint
flinkbot edited a comment on issue #11144: [FLINK-15715] Update ContinuousFileProcessingMigrationTest to restore from 1.10 savepoint URL: https://github.com/apache/flink/pull/11144#issuecomment-588270086 ## CI report: * bafa0b96be75a242a6465becc11e82a1afa1de61 Travis: [CANCELED](https://travis-ci.com/flink-ci/flink/builds/149646329) * 66e240be3d3886212b32373a975e6f005a817d39 Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/149745729) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5336) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r381780564 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientDecoder.java ## @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.runtime.io.network.NetworkClientHandler; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter; + +import static org.apache.flink.runtime.io.network.netty.NettyMessage.FRAME_HEADER_LENGTH; +import static org.apache.flink.runtime.io.network.netty.NettyMessage.MAGIC_NUMBER; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Decodes messages from the fragmentary netty buffers. This decoder assumes the + * messages have the following format: + * +---++ + * | FRAME_HEADER || MESSAGE_HEADER | DATA BUFFER (Optional) | + * +---++ + * + * This decoder decodes the frame header and delegates the other following work + * to corresponding message parsers according to the message type. During the process + * of decoding, the decoder and parsers try best to eliminate copying. For the frame + * header and message header, it only cumulates data when they span multiple input buffers. + * For the buffer part, it copies directly to the input channels to avoid future copying. + * + * The format of the frame header is + * +--+--++ + * | FRAME LENGTH (4) | MAGIC NUMBER (4) | ID (1) | + * +--+--++ + */ +public class NettyMessageClientDecoder extends ChannelInboundHandlerAdapter { + + /** The message parser for buffer response. */ +private final NettyMessageDecoderDelegate bufferResponseDecoderDelegate; + +/** The message parser for other messages other than buffer response. */ + private final NettyMessageDecoderDelegate nonBufferResponseDecoderDelegate; + + /** The cumulation buffer for the frame header part. */ + private ByteBuf frameHeaderBuffer; + + /** +* The chosen message parser for the current message. If it is null, then +* we are decoding the frame header part, otherwise we are decoding the actual +* message. +*/ + private NettyMessageDecoderDelegate currentDecoderDelegate; + +NettyMessageClientDecoder(NetworkClientHandler networkClientHandler) { +this.bufferResponseDecoderDelegate = new BufferResponseDecoderDelegate(networkClientHandler); +this.nonBufferResponseDecoderDelegate = new NonBufferResponseDecoderDelegate(); +} + +@Override +public void channelActive(ChannelHandlerContext ctx) throws Exception { +super.channelActive(ctx); + +bufferResponseDecoderDelegate.onChannelActive(ctx); +nonBufferResponseDecoderDelegate.onChannelActive(ctx); + + frameHeaderBuffer = ctx.alloc().directBuffer(FRAME_HEADER_LENGTH); +} + +@Override +public void channelInactive(ChannelHandlerContext ctx) throws Exception { +super.channelInactive(ctx); + + bufferResponseDecoderDelegate.close(); + nonBufferResponseDecoderDelegate.close(); + + frameHeaderBuffer.release(); +} + +@Override +public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { +ByteBuf data = (ByteBuf) msg; + +try { +while (data.isReadable()) { + if (currentDecoderDelegate == null) { + decodeFrameHeader(data); + } + + if (data.isReadable() && currentDecoderDelegate != null) { +
[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r381779124 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java ## @@ -351,38 +347,68 @@ ByteBuf write(ByteBufAllocator allocator) throws IOException { CompositeByteBuf composityBuf = allocator.compositeDirectBuffer(); composityBuf.addComponent(headerBuf); - composityBuf.addComponent(buffer); + composityBuf.addComponent(buffer.asByteBuf()); // update writer index since we have data written to the components: - composityBuf.writerIndex(headerBuf.writerIndex() + buffer.writerIndex()); + composityBuf.writerIndex(headerBuf.writerIndex() + buffer.asByteBuf().writerIndex()); return composityBuf; } catch (Throwable t) { if (headerBuf != null) { headerBuf.release(); } - buffer.release(); + buffer.recycleBuffer(); ExceptionUtils.rethrowIOException(t); return null; // silence the compiler } } - static BufferResponse readFrom(ByteBuf buffer) { - InputChannelID receiverId = InputChannelID.fromByteBuf(buffer); - int sequenceNumber = buffer.readInt(); - int backlog = buffer.readInt(); - boolean isBuffer = buffer.readBoolean(); - boolean isCompressed = buffer.readBoolean(); - int size = buffer.readInt(); + /** +* Parses the message header part and composes a new BufferResponse with an empty data buffer. The +* data buffer will be filled in later. This method is used in credit-based network stack. +* +* @param messageHeader the serialized message header. +* @param bufferAllocator the allocator for network buffer. +* @return a BufferResponse object with the header parsed and the data buffer to fill in later. The +* data buffer will be null if the target channel has been released or the buffer size is 0. +*/ + static BufferResponse readFrom(ByteBuf messageHeader, NetworkBufferAllocator bufferAllocator) { + InputChannelID receiverId = InputChannelID.fromByteBuf(messageHeader); + int sequenceNumber = messageHeader.readInt(); + int backlog = messageHeader.readInt(); + boolean isBuffer = messageHeader.readBoolean(); + boolean isCompressed = messageHeader.readBoolean(); + int size = messageHeader.readInt(); + + Buffer dataBuffer = null; + if (size != 0) { + if (isBuffer) { + dataBuffer = bufferAllocator.allocatePooledNetworkBuffer(receiverId); + } else { + dataBuffer = bufferAllocator.allocateUnPooledNetworkBuffer(size); + } + } + + if (dataBuffer == null) { + dataBuffer = bufferAllocator.getPlaceHolderBuffer(); Review comment: Based on the [comment](https://github.com/apache/flink/pull/7368/files#r381770789), the `BufferResponse` should provide a way for getting released info in order to be handler by `PartitionRequestClientHandler`. There might be two options: - Rely on whether `dataBuffer` is null, not quite sure whether it is suggested to use `Optional` by the latest guideline . - Define a new boolean field in `BufferResponse` to describe this info, but it seems a bit redundant with `dataBuffer` state. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r381779124 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java ## @@ -351,38 +347,68 @@ ByteBuf write(ByteBufAllocator allocator) throws IOException { CompositeByteBuf composityBuf = allocator.compositeDirectBuffer(); composityBuf.addComponent(headerBuf); - composityBuf.addComponent(buffer); + composityBuf.addComponent(buffer.asByteBuf()); // update writer index since we have data written to the components: - composityBuf.writerIndex(headerBuf.writerIndex() + buffer.writerIndex()); + composityBuf.writerIndex(headerBuf.writerIndex() + buffer.asByteBuf().writerIndex()); return composityBuf; } catch (Throwable t) { if (headerBuf != null) { headerBuf.release(); } - buffer.release(); + buffer.recycleBuffer(); ExceptionUtils.rethrowIOException(t); return null; // silence the compiler } } - static BufferResponse readFrom(ByteBuf buffer) { - InputChannelID receiverId = InputChannelID.fromByteBuf(buffer); - int sequenceNumber = buffer.readInt(); - int backlog = buffer.readInt(); - boolean isBuffer = buffer.readBoolean(); - boolean isCompressed = buffer.readBoolean(); - int size = buffer.readInt(); + /** +* Parses the message header part and composes a new BufferResponse with an empty data buffer. The +* data buffer will be filled in later. This method is used in credit-based network stack. +* +* @param messageHeader the serialized message header. +* @param bufferAllocator the allocator for network buffer. +* @return a BufferResponse object with the header parsed and the data buffer to fill in later. The +* data buffer will be null if the target channel has been released or the buffer size is 0. +*/ + static BufferResponse readFrom(ByteBuf messageHeader, NetworkBufferAllocator bufferAllocator) { + InputChannelID receiverId = InputChannelID.fromByteBuf(messageHeader); + int sequenceNumber = messageHeader.readInt(); + int backlog = messageHeader.readInt(); + boolean isBuffer = messageHeader.readBoolean(); + boolean isCompressed = messageHeader.readBoolean(); + int size = messageHeader.readInt(); + + Buffer dataBuffer = null; + if (size != 0) { + if (isBuffer) { + dataBuffer = bufferAllocator.allocatePooledNetworkBuffer(receiverId); + } else { + dataBuffer = bufferAllocator.allocateUnPooledNetworkBuffer(size); + } + } + + if (dataBuffer == null) { + dataBuffer = bufferAllocator.getPlaceHolderBuffer(); Review comment: Based on the [comment](https://github.com/apache/flink/pull/7368/files#r381770789), the `BufferResponse` should provide a way for getting released info in order to be handled by `PartitionRequestClientHandler`. There might be two options: - Rely on whether `dataBuffer` is null, not quite sure whether it is suggested to use `Optional` by the latest guideline . - Define a new boolean field in `BufferResponse` to describe this info, but it seems a bit redundant with `dataBuffer` state. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r381773812 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/BufferResponseDecoderDelegate.java ## @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.runtime.io.network.NetworkClientHandler; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; + +import static org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse; + +/** + * The parser for {@link org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse}. + */ +public class BufferResponseDecoderDelegate implements NettyMessageDecoderDelegate { + + /** The network client handler of current channel. */ + private final NetworkClientHandler networkClientHandler; + + /** The Flink Buffer allocator. */ + private final NetworkBufferAllocator allocator; + + /** The cumulation buffer of message header. */ + private ByteBuf messageHeaderBuffer; + + /** +* The current BufferResponse message that are process the buffer part. +* If it is null, we are still processing the message header part, otherwise +* we are processing the buffer part. +*/ + private BufferResponse currentResponse; + + /** How much bytes have been received or discarded for the buffer part. */ + private int decodedBytesOfBuffer; + + public BufferResponseDecoderDelegate(NetworkClientHandler networkClientHandler) { + this.networkClientHandler = networkClientHandler; + this.allocator = new NetworkBufferAllocator(networkClientHandler); + } + + @Override + public void onChannelActive(ChannelHandlerContext ctx) { + messageHeaderBuffer = ctx.alloc().directBuffer(BufferResponse.MESSAGE_HEADER_LENGTH); + } + + @Override + public void startParsingMessage(int msgId, int messageLength) { + currentResponse = null; + decodedBytesOfBuffer = 0; + + messageHeaderBuffer.clear(); + } + + @Override + public ParseResult onChannelRead(ByteBuf data) throws Exception { + if (currentResponse == null) { + ByteBuf toDecode = ByteBufUtils.cumulate(messageHeaderBuffer, data, BufferResponse.MESSAGE_HEADER_LENGTH); + + if (toDecode != null) { + currentResponse = BufferResponse.readFrom(toDecode, allocator); + + if (currentResponse.bufferSize == 0) { + return ParseResult.finishedWith(currentResponse); + } + } + } + + if (currentResponse != null) { + boolean isDiscarding = allocator.isPlaceHolderBuffer(currentResponse.getBuffer()); + int remainingBufferSize = currentResponse.bufferSize - decodedBytesOfBuffer; + int actualBytesToDecode = Math.min(data.readableBytes(), remainingBufferSize); + + if (isDiscarding) { + data.readerIndex(data.readerIndex() + actualBytesToDecode); + } else { + currentResponse.getBuffer().asByteBuf().writeBytes(data, actualBytesToDecode); + } + + decodedBytesOfBuffer += actualBytesToDecode; + + if (decodedBytesOfBuffer == currentResponse.bufferSize) { + if (isDiscarding) { + networkClientHandler.cancelRequestFor(currentResponse.receiverId); + return ParseResult.finishedWith(null); Review comment: Based on the
[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r381773376 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/BufferResponseDecoderDelegate.java ## @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.runtime.io.network.NetworkClientHandler; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; + +import static org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse; + +/** + * The parser for {@link org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse}. + */ +public class BufferResponseDecoderDelegate implements NettyMessageDecoderDelegate { + + /** The network client handler of current channel. */ + private final NetworkClientHandler networkClientHandler; + + /** The Flink Buffer allocator. */ + private final NetworkBufferAllocator allocator; + + /** The cumulation buffer of message header. */ + private ByteBuf messageHeaderBuffer; + + /** +* The current BufferResponse message that are process the buffer part. +* If it is null, we are still processing the message header part, otherwise +* we are processing the buffer part. +*/ + private BufferResponse currentResponse; + + /** How much bytes have been received or discarded for the buffer part. */ + private int decodedBytesOfBuffer; + + public BufferResponseDecoderDelegate(NetworkClientHandler networkClientHandler) { + this.networkClientHandler = networkClientHandler; + this.allocator = new NetworkBufferAllocator(networkClientHandler); + } + + @Override + public void onChannelActive(ChannelHandlerContext ctx) { + messageHeaderBuffer = ctx.alloc().directBuffer(BufferResponse.MESSAGE_HEADER_LENGTH); + } + + @Override + public void startParsingMessage(int msgId, int messageLength) { + currentResponse = null; + decodedBytesOfBuffer = 0; + + messageHeaderBuffer.clear(); + } + + @Override + public ParseResult onChannelRead(ByteBuf data) throws Exception { + if (currentResponse == null) { + ByteBuf toDecode = ByteBufUtils.cumulate(messageHeaderBuffer, data, BufferResponse.MESSAGE_HEADER_LENGTH); + + if (toDecode != null) { + currentResponse = BufferResponse.readFrom(toDecode, allocator); + + if (currentResponse.bufferSize == 0) { + return ParseResult.finishedWith(currentResponse); + } + } + } + + if (currentResponse != null) { + boolean isDiscarding = allocator.isPlaceHolderBuffer(currentResponse.getBuffer()); + int remainingBufferSize = currentResponse.bufferSize - decodedBytesOfBuffer; + int actualBytesToDecode = Math.min(data.readableBytes(), remainingBufferSize); + + if (isDiscarding) { + data.readerIndex(data.readerIndex() + actualBytesToDecode); + } else { + currentResponse.getBuffer().asByteBuf().writeBytes(data, actualBytesToDecode); + } + + decodedBytesOfBuffer += actualBytesToDecode; + + if (decodedBytesOfBuffer == currentResponse.bufferSize) { + if (isDiscarding) { Review comment: We can also remove this logic based on [comment](https://github.com/apache/flink/pull/7368/files#r381770789), and another benefit is that we can avoid judging this condition twice to make the logic seem spread.
[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r381771843 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/BufferResponseDecoderDelegate.java ## @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.runtime.io.network.NetworkClientHandler; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator; + +/** + * The parser for {@link org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse}. + */ +public class BufferResponseDecoderDelegate implements NettyMessageDecoderDelegate { + + /** The network client handler of current channel. */ + private final NetworkClientHandler networkClientHandler; + + /** The Flink Buffer allocator. */ + private final NetworkBufferAllocator allocator; + + /** The cumulation buffer of message header. */ + private ByteBuf messageHeaderCumulationBuffer; + + /** +* The current BufferResponse message that are process the buffer part. +* If it is null, we are still processing the message header part, otherwise +* we are processing the buffer part. +*/ + private NettyMessage.BufferResponse currentResponse; + + /** How much bytes have been received or discarded for the buffer part. */ + private int decodedBytesOfBuffer; + + public BufferResponseDecoderDelegate(NetworkClientHandler networkClientHandler) { + this.networkClientHandler = networkClientHandler; + this.allocator = new NetworkBufferAllocator(networkClientHandler); + } + + @Override + public void onChannelActive(ByteBufAllocator alloc) { + messageHeaderCumulationBuffer = alloc.directBuffer(NettyMessage.BufferResponse.MESSAGE_HEADER_LENGTH); + } + + @Override + public void startParsingMessage(int msgId, int messageLength) { + currentResponse = null; + decodedBytesOfBuffer = 0; + + messageHeaderCumulationBuffer.clear(); + } + + @Override + public ParseResult onChannelRead(ByteBuf data) throws Exception { + if (currentResponse == null) { + ByteBuf toDecode = ByteBufUtils.cumulate(messageHeaderCumulationBuffer, data, NettyMessage.BufferResponse.MESSAGE_HEADER_LENGTH); + + if (toDecode != null) { + currentResponse = NettyMessage.BufferResponse.readFrom(toDecode, allocator); + + if (currentResponse.bufferSize == 0) { Review comment: This logic does not need to explicitly handle based on the [comment](https://github.com/apache/flink/pull/7368/files#r381770789), and it should be transparent with below process. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r381770789 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/BufferResponseDecoderDelegate.java ## @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.runtime.io.network.NetworkClientHandler; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; + +import static org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse; + +/** + * The parser for {@link org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse}. + */ +public class BufferResponseDecoderDelegate implements NettyMessageDecoderDelegate { Review comment: After thinking through, I think we should redefine the function of `BufferResponseDecoderDelegate`. Currently it is not only decoding from `ByteBuf` to `BufferResponse` message, and also needs to involve in the specific cases for `size = 0` and `released channel`, which would make the internal logics more complex. Instead we can make `BufferResponseDecoderDelegate` only handle the decoder work, and fire the complete `BufferResponse` message to next `PartitionRequestClientHandler` which would handle the specific logics more easily and clean. I would point out the related changes in detail codes. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r381770789 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/BufferResponseDecoderDelegate.java ## @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.runtime.io.network.NetworkClientHandler; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; + +import static org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse; + +/** + * The parser for {@link org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse}. + */ +public class BufferResponseDecoderDelegate implements NettyMessageDecoderDelegate { Review comment: After thinking through, it is necessary to redefine the function of `BufferResponseDecoderDelegate`. Currently it is not only decoding from `ByteBuf` to `BufferResponse` message, and also involving in the specific cases for `size = 0` and `released channel`, which would make the internal logics more complex. Instead we can make `BufferResponseDecoderDelegate` only handle the decoder work, and fire the complete `BufferResponse` message to next `PartitionRequestClientHandler` which would handle the specific logics more easily and clean. I would point out the related changes in detail codes. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] dianfu commented on a change in pull request #11118: [FLINK-16072][python] Optimize the performance of the write/read null mask
dianfu commented on a change in pull request #8: [FLINK-16072][python] Optimize the performance of the write/read null mask URL: https://github.com/apache/flink/pull/8#discussion_r381725940 ## File path: flink-python/pyflink/fn_execution/coder_impl.py ## @@ -24,57 +24,71 @@ from pyflink.table import Row +def generate_null_mask_search_table(): Review comment: Make it a private method of FlattenRowCoderImpl? I think there is no need to expose it. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] dianfu commented on a change in pull request #11118: [FLINK-16072][python] Optimize the performance of the write/read null mask
dianfu commented on a change in pull request #8: [FLINK-16072][python] Optimize the performance of the write/read null mask URL: https://github.com/apache/flink/pull/8#discussion_r381766299 ## File path: flink-python/pyflink/fn_execution/coder_impl.py ## @@ -24,57 +24,71 @@ from pyflink.table import Row +def generate_null_mask_search_table(): +""" +Each bit of one byte represents if the column at the specified position is None or not, e.g. +0x84 represents the first column and the sixth column are None. +""" +num = 256 +null_mask = [] +for b in range(num): +every_num_null_mask = [(b & 0x80) > 0, (b & 0x40) > 0, (b & 0x20) > 0, (b & 0x10) > 0, + (b & 0x08) > 0, (b & 0x04) > 0, (b & 0x02) > 0, (b & 0x01) > 0] +null_mask.append(tuple(every_num_null_mask)) + +return tuple(null_mask) + + class FlattenRowCoderImpl(StreamCoderImpl): +null_mask_search_table = generate_null_mask_search_table() Review comment: What about make it an instance variable as it's faster than static variable: https://stackoverflow.com/questions/2714573/instance-variables-vs-class-variables-in-python This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] dianfu commented on a change in pull request #11118: [FLINK-16072][python] Optimize the performance of the write/read null mask
dianfu commented on a change in pull request #8: [FLINK-16072][python] Optimize the performance of the write/read null mask URL: https://github.com/apache/flink/pull/8#discussion_r381768526 ## File path: flink-python/pyflink/fn_execution/coder_impl.py ## @@ -24,57 +24,71 @@ from pyflink.table import Row +def generate_null_mask_search_table(): +""" +Each bit of one byte represents if the column at the specified position is None or not, e.g. +0x84 represents the first column and the sixth column are None. +""" +num = 256 +null_mask = [] +for b in range(num): +every_num_null_mask = [(b & 0x80) > 0, (b & 0x40) > 0, (b & 0x20) > 0, (b & 0x10) > 0, + (b & 0x08) > 0, (b & 0x04) > 0, (b & 0x02) > 0, (b & 0x01) > 0] +null_mask.append(tuple(every_num_null_mask)) + +return tuple(null_mask) + + class FlattenRowCoderImpl(StreamCoderImpl): +null_mask_search_table = generate_null_mask_search_table() + +null_byte_search_table = (0x80, 0x40, 0x20, 0x10, 0x08, 0x04, 0x02, 0x01) def __init__(self, field_coders): self._field_coders = field_coders self._filed_count = len(field_coders) +self._complete_byte_num = self._filed_count // 8 +self._leading_bytes_num = self._filed_count % 8 def encode_to_stream(self, value, out_stream, nested): self.write_null_mask(value, out_stream) for i in range(self._filed_count): -if value[i] is not None: -self._field_coders[i].encode_to_stream(value[i], out_stream, nested) +item = value[i] +if item is not None: +self._field_coders[i].encode_to_stream(item, out_stream, nested) def decode_from_stream(self, in_stream, nested): -null_mask = self.read_null_mask(self._filed_count, in_stream) +null_mask = self.read_null_mask(in_stream) return [None if null_mask[idx] else self._field_coders[idx].decode_from_stream( in_stream, nested) for idx in range(0, self._filed_count)] -@staticmethod -def write_null_mask(value, out_stream): Review comment: Could we add some unit tests for the null mask related logic? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] dianfu commented on a change in pull request #11118: [FLINK-16072][python] Optimize the performance of the write/read null mask
dianfu commented on a change in pull request #8: [FLINK-16072][python] Optimize the performance of the write/read null mask URL: https://github.com/apache/flink/pull/8#discussion_r381769187 ## File path: flink-python/pyflink/fn_execution/coder_impl.py ## @@ -25,56 +25,331 @@ class FlattenRowCoderImpl(StreamCoderImpl): +null_mask_search_table = ((False, False, False, False, False, False, False, False), + (False, False, False, False, False, False, False, True), + (False, False, False, False, False, False, True, False), + (False, False, False, False, False, False, True, True), + (False, False, False, False, False, True, False, False), + (False, False, False, False, False, True, False, True), + (False, False, False, False, False, True, True, False), + (False, False, False, False, False, True, True, True), + (False, False, False, False, True, False, False, False), + (False, False, False, False, True, False, False, True), + (False, False, False, False, True, False, True, False), + (False, False, False, False, True, False, True, True), + (False, False, False, False, True, True, False, False), + (False, False, False, False, True, True, False, True), + (False, False, False, False, True, True, True, False), + (False, False, False, False, True, True, True, True), + (False, False, False, True, False, False, False, False), + (False, False, False, True, False, False, False, True), + (False, False, False, True, False, False, True, False), + (False, False, False, True, False, False, True, True), + (False, False, False, True, False, True, False, False), + (False, False, False, True, False, True, False, True), + (False, False, False, True, False, True, True, False), + (False, False, False, True, False, True, True, True), + (False, False, False, True, True, False, False, False), + (False, False, False, True, True, False, False, True), + (False, False, False, True, True, False, True, False), + (False, False, False, True, True, False, True, True), + (False, False, False, True, True, True, False, False), + (False, False, False, True, True, True, False, True), + (False, False, False, True, True, True, True, False), + (False, False, False, True, True, True, True, True), + (False, False, True, False, False, False, False, False), + (False, False, True, False, False, False, False, True), + (False, False, True, False, False, False, True, False), + (False, False, True, False, False, False, True, True), + (False, False, True, False, False, True, False, False), + (False, False, True, False, False, True, False, True), + (False, False, True, False, False, True, True, False), + (False, False, True, False, False, True, True, True), + (False, False, True, False, True, False, False, False), + (False, False, True, False, True, False, False, True), + (False, False, True, False, True, False, True, False), + (False, False, True, False, True, False, True, True), + (False, False, True, False, True, True, False, False), + (False, False, True, False, True, True, False, True), + (False, False, True, False, True, True, True, False), + (False, False, True, False, True, True, True, True), + (False, False, True, True, False, False, False, False), + (False, False, True, True, False, False, False, True), + (False, False, True, True, False, False, True, False), + (False, False, True, True, False, False, True, True), + (False, False, True, True, False, True, False, False), +
[GitHub] [flink] flinkbot edited a comment on issue #10904: [FLINK-15669] [sql client] fix SQL client can't cancel flink job
flinkbot edited a comment on issue #10904: [FLINK-15669] [sql client] fix SQL client can't cancel flink job URL: https://github.com/apache/flink/pull/10904#issuecomment-576004529 ## CI report: * dbfc485ecea3aec8bc0054ca8bacb310b417c5dc Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/145110835) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4479) * 8637d733546f1863b30ecf78766961a4b4471482 Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/145171018) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4500) * 202059d5e5c772b1b11ee50b9715b216b6ddeaad Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/147163955) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4765) * 6d89ef84c8f0673d93a1048350b302e7d24d7484 Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/149745708) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5335) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Comment Edited] (FLINK-16142) Memory Leak causes Metaspace OOM error on repeated job submission
[ https://issues.apache.org/jira/browse/FLINK-16142?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17040625#comment-17040625 ] Yingjie Cao edited comment on FLINK-16142 at 2/20/20 5:11 AM: -- Could you also print the contextClassLoader of the thread? I wonder if there are any threads which use Flink user ClassLoader as contextClassLoader. I have some doubts about this thread: java-sdk-http-connection-reaper. I searched the Internet and found these: [https://stackoverflow.com/questions/18069042/spring-mvc-webapp-schedule-java-sdk-http-connection-reaper-failed-to-stop], [https://github.com/aws/aws-sdk-java/blob/master/aws-java-sdk-core/src/main/java/com/amazonaws/http/IdleConnectionReaper.java]. I am not sure whether it's the cause or not, but maybe you can try the solution in the stackoverflow page. was (Author: kevin.cyj): Could you also print the contextClassLoader of the thread? I wonder if there are any threads which use Flink user ClassLoader as contextClassLoader. I have some doubts about this thread: java-sdk-http-connection-reaper. I searched the Internet and found these: [https://stackoverflow.com/questions/18069042/spring-mvc-webapp-schedule-java-sdk-http-connection-reaper-failed-to-stop], [https://github.com/aws/aws-sdk-java/blob/master/aws-java-sdk-core/src/main/java/com/amazonaws/http/IdleConnectionReaper.java]. I am not sure whether it's the cause or not, but maybe you can try the solution in the stackoverflow page. (the IdleConnectionReaper thread sleeps 60 seconds before checking the shutdown flag, so after shutting down the IdleConnectionReaper, you may need to wait more than 1min to submit the next job) > Memory Leak causes Metaspace OOM error on repeated job submission > - > > Key: FLINK-16142 > URL: https://issues.apache.org/jira/browse/FLINK-16142 > Project: Flink > Issue Type: Bug > Components: Client / Job Submission >Affects Versions: 1.10.0 >Reporter: Thomas Wozniakowski >Priority: Blocker > Fix For: 1.10.1, 1.11.0 > > > Hi Guys, > We've just tried deploying 1.10.0 as it has lots of shiny stuff that fits our > use-case exactly (RocksDB state backend running in a containerised cluster). > Unfortunately, it seems like there is a memory leak somewhere in the job > submission logic. We are getting this error: > {code:java} > 2020-02-18 10:22:10,020 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph - OPERATOR_NAME > switched from RUNNING to FAILED. > java.lang.OutOfMemoryError: Metaspace > at java.lang.ClassLoader.defineClass1(Native Method) > at java.lang.ClassLoader.defineClass(ClassLoader.java:757) > at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) > at java.net.URLClassLoader.defineClass(URLClassLoader.java:468) > at java.net.URLClassLoader.access$100(URLClassLoader.java:74) > at java.net.URLClassLoader$1.run(URLClassLoader.java:369) > at java.net.URLClassLoader$1.run(URLClassLoader.java:363) > at java.security.AccessController.doPrivileged(Native Method) > at java.net.URLClassLoader.findClass(URLClassLoader.java:362) > at java.lang.ClassLoader.loadClass(ClassLoader.java:419) > at > org.apache.flink.util.ChildFirstClassLoader.loadClass(ChildFirstClassLoader.java:60) > at java.lang.ClassLoader.loadClass(ClassLoader.java:352) > at > org.apache.flink.kinesis.shaded.com.amazonaws.jmx.SdkMBeanRegistrySupport.registerMetricAdminMBean(SdkMBeanRegistrySupport.java:27) > at > org.apache.flink.kinesis.shaded.com.amazonaws.metrics.AwsSdkMetrics.registerMetricAdminMBean(AwsSdkMetrics.java:398) > at > org.apache.flink.kinesis.shaded.com.amazonaws.metrics.AwsSdkMetrics.(AwsSdkMetrics.java:359) > at > org.apache.flink.kinesis.shaded.com.amazonaws.AmazonWebServiceClient.requestMetricCollector(AmazonWebServiceClient.java:728) > at > org.apache.flink.kinesis.shaded.com.amazonaws.AmazonWebServiceClient.isRMCEnabledAtClientOrSdkLevel(AmazonWebServiceClient.java:660) > at > org.apache.flink.kinesis.shaded.com.amazonaws.AmazonWebServiceClient.isRequestMetricsEnabled(AmazonWebServiceClient.java:652) > at > org.apache.flink.kinesis.shaded.com.amazonaws.AmazonWebServiceClient.createExecutionContext(AmazonWebServiceClient.java:611) > at > org.apache.flink.kinesis.shaded.com.amazonaws.AmazonWebServiceClient.createExecutionContext(AmazonWebServiceClient.java:606) > at > org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.executeListShards(AmazonKinesisClient.java:1534) > at > org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.listShards(AmazonKinesisClient.java:1528) > at > org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.listShards(KinesisProxy.java:439) > at >
[jira] [Commented] (FLINK-16142) Memory Leak causes Metaspace OOM error on repeated job submission
[ https://issues.apache.org/jira/browse/FLINK-16142?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17040625#comment-17040625 ] Yingjie Cao commented on FLINK-16142: - Could you also print the contextClassLoader of the thread? I wonder if there are any threads which use Flink user ClassLoader as contextClassLoader. I have some doubts about this thread: java-sdk-http-connection-reaper. I searched the Internet and found these: [https://stackoverflow.com/questions/18069042/spring-mvc-webapp-schedule-java-sdk-http-connection-reaper-failed-to-stop], [https://github.com/aws/aws-sdk-java/blob/master/aws-java-sdk-core/src/main/java/com/amazonaws/http/IdleConnectionReaper.java]. I am not sure whether it's the cause or not, but maybe you can try the solution in the stackoverflow page. (the IdleConnectionReaper thread sleeps 60 seconds before checking the shutdown flag, so after shutting down the IdleConnectionReaper, you may need to wait more than 1min to submit the next job) > Memory Leak causes Metaspace OOM error on repeated job submission > - > > Key: FLINK-16142 > URL: https://issues.apache.org/jira/browse/FLINK-16142 > Project: Flink > Issue Type: Bug > Components: Client / Job Submission >Affects Versions: 1.10.0 >Reporter: Thomas Wozniakowski >Priority: Blocker > Fix For: 1.10.1, 1.11.0 > > > Hi Guys, > We've just tried deploying 1.10.0 as it has lots of shiny stuff that fits our > use-case exactly (RocksDB state backend running in a containerised cluster). > Unfortunately, it seems like there is a memory leak somewhere in the job > submission logic. We are getting this error: > {code:java} > 2020-02-18 10:22:10,020 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph - OPERATOR_NAME > switched from RUNNING to FAILED. > java.lang.OutOfMemoryError: Metaspace > at java.lang.ClassLoader.defineClass1(Native Method) > at java.lang.ClassLoader.defineClass(ClassLoader.java:757) > at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) > at java.net.URLClassLoader.defineClass(URLClassLoader.java:468) > at java.net.URLClassLoader.access$100(URLClassLoader.java:74) > at java.net.URLClassLoader$1.run(URLClassLoader.java:369) > at java.net.URLClassLoader$1.run(URLClassLoader.java:363) > at java.security.AccessController.doPrivileged(Native Method) > at java.net.URLClassLoader.findClass(URLClassLoader.java:362) > at java.lang.ClassLoader.loadClass(ClassLoader.java:419) > at > org.apache.flink.util.ChildFirstClassLoader.loadClass(ChildFirstClassLoader.java:60) > at java.lang.ClassLoader.loadClass(ClassLoader.java:352) > at > org.apache.flink.kinesis.shaded.com.amazonaws.jmx.SdkMBeanRegistrySupport.registerMetricAdminMBean(SdkMBeanRegistrySupport.java:27) > at > org.apache.flink.kinesis.shaded.com.amazonaws.metrics.AwsSdkMetrics.registerMetricAdminMBean(AwsSdkMetrics.java:398) > at > org.apache.flink.kinesis.shaded.com.amazonaws.metrics.AwsSdkMetrics.(AwsSdkMetrics.java:359) > at > org.apache.flink.kinesis.shaded.com.amazonaws.AmazonWebServiceClient.requestMetricCollector(AmazonWebServiceClient.java:728) > at > org.apache.flink.kinesis.shaded.com.amazonaws.AmazonWebServiceClient.isRMCEnabledAtClientOrSdkLevel(AmazonWebServiceClient.java:660) > at > org.apache.flink.kinesis.shaded.com.amazonaws.AmazonWebServiceClient.isRequestMetricsEnabled(AmazonWebServiceClient.java:652) > at > org.apache.flink.kinesis.shaded.com.amazonaws.AmazonWebServiceClient.createExecutionContext(AmazonWebServiceClient.java:611) > at > org.apache.flink.kinesis.shaded.com.amazonaws.AmazonWebServiceClient.createExecutionContext(AmazonWebServiceClient.java:606) > at > org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.executeListShards(AmazonKinesisClient.java:1534) > at > org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.listShards(AmazonKinesisClient.java:1528) > at > org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.listShards(KinesisProxy.java:439) > at > org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getShardsOfStream(KinesisProxy.java:389) > at > org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getShardList(KinesisProxy.java:279) > at > org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.discoverNewShardsToSubscribe(KinesisDataFetcher.java:686) > at > org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.run(FlinkKinesisConsumer.java:287) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) > {code} > (The only change in the above text is the OPERATOR_NAME text where I removed > some of
[GitHub] [flink] flinkbot edited a comment on issue #11144: [FLINK-15715] Update ContinuousFileProcessingMigrationTest to restore from 1.10 savepoint
flinkbot edited a comment on issue #11144: [FLINK-15715] Update ContinuousFileProcessingMigrationTest to restore from 1.10 savepoint URL: https://github.com/apache/flink/pull/11144#issuecomment-588270086 ## CI report: * bafa0b96be75a242a6465becc11e82a1afa1de61 Travis: [CANCELED](https://travis-ci.com/flink-ci/flink/builds/149646329) * 66e240be3d3886212b32373a975e6f005a817d39 Travis: [PENDING](https://travis-ci.com/flink-ci/flink/builds/149745729) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5336) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r381736600 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientDecoder.java ## @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.runtime.io.network.NetworkClientHandler; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter; + +import static org.apache.flink.runtime.io.network.netty.NettyMessage.FRAME_HEADER_LENGTH; +import static org.apache.flink.runtime.io.network.netty.NettyMessage.MAGIC_NUMBER; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Decodes messages from the fragmentary netty buffers. This decoder assumes the + * messages have the following format: + * +---++ + * | FRAME_HEADER || MESSAGE_HEADER | DATA BUFFER (Optional) | + * +---++ + * + * This decoder decodes the frame header and delegates the other following work + * to corresponding message parsers according to the message type. During the process + * of decoding, the decoder and parsers try best to eliminate copying. For the frame + * header and message header, it only cumulates data when they span multiple input buffers. + * For the buffer part, it copies directly to the input channels to avoid future copying. + * + * The format of the frame header is + * +--+--++ + * | FRAME LENGTH (4) | MAGIC NUMBER (4) | ID (1) | + * +--+--++ + */ +public class NettyMessageClientDecoder extends ChannelInboundHandlerAdapter { + + /** The message parser for buffer response. */ +private final NettyMessageDecoderDelegate bufferResponseDecoderDelegate; + +/** The message parser for other messages other than buffer response. */ + private final NettyMessageDecoderDelegate nonBufferResponseDecoderDelegate; + + /** The cumulation buffer for the frame header part. */ + private ByteBuf frameHeaderBuffer; + + /** +* The chosen message parser for the current message. If it is null, then +* we are decoding the frame header part, otherwise we are decoding the actual +* message. +*/ + private NettyMessageDecoderDelegate currentDecoderDelegate; + +NettyMessageClientDecoder(NetworkClientHandler networkClientHandler) { +this.bufferResponseDecoderDelegate = new BufferResponseDecoderDelegate(networkClientHandler); +this.nonBufferResponseDecoderDelegate = new NonBufferResponseDecoderDelegate(); +} + +@Override +public void channelActive(ChannelHandlerContext ctx) throws Exception { +super.channelActive(ctx); + +bufferResponseDecoderDelegate.onChannelActive(ctx.alloc()); +nonBufferResponseDecoderDelegate.onChannelActive(ctx.alloc()); + + frameHeaderBuffer = ctx.alloc().directBuffer(FRAME_HEADER_LENGTH); +} + +@Override +public void channelInactive(ChannelHandlerContext ctx) throws Exception { Review comment: It might be better to handle them dependently, not relying on the implicit logic of other handlers. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #11144: [FLINK-15715] Update ContinuousFileProcessingMigrationTest to restore from 1.10 savepoint
flinkbot edited a comment on issue #11144: [FLINK-15715] Update ContinuousFileProcessingMigrationTest to restore from 1.10 savepoint URL: https://github.com/apache/flink/pull/11144#issuecomment-588270086 ## CI report: * bafa0b96be75a242a6465becc11e82a1afa1de61 Travis: [CANCELED](https://travis-ci.com/flink-ci/flink/builds/149646329) * 66e240be3d3886212b32373a975e6f005a817d39 Travis: [PENDING](https://travis-ci.com/flink-ci/flink/builds/149745729) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #10904: [FLINK-15669] [sql client] fix SQL client can't cancel flink job
flinkbot edited a comment on issue #10904: [FLINK-15669] [sql client] fix SQL client can't cancel flink job URL: https://github.com/apache/flink/pull/10904#issuecomment-576004529 ## CI report: * dbfc485ecea3aec8bc0054ca8bacb310b417c5dc Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/145110835) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4479) * 8637d733546f1863b30ecf78766961a4b4471482 Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/145171018) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4500) * 202059d5e5c772b1b11ee50b9715b216b6ddeaad Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/147163955) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4765) * 6d89ef84c8f0673d93a1048350b302e7d24d7484 Travis: [PENDING](https://travis-ci.com/flink-ci/flink/builds/149745708) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5335) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-15718) Update StatefulJobSavepointMigrationITCase to restore from 1.10 savepoint
[ https://issues.apache.org/jira/browse/FLINK-15718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17040607#comment-17040607 ] vinoyang commented on FLINK-15718: -- [~aljoscha] and [~chesnay] WDYT about {{LegacyStatefulJobSavepointMigrationITCase}}. We did not process the migration test for it after Flink 1.4. > Update StatefulJobSavepointMigrationITCase to restore from 1.10 savepoint > - > > Key: FLINK-15718 > URL: https://issues.apache.org/jira/browse/FLINK-15718 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.11.0 >Reporter: vinoyang >Priority: Major > Fix For: 1.11.0 > > > Update {{StatefulJobSavepointMigrationITCase}} to restore from 1.10 savepoint -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] flinkbot edited a comment on issue #11144: [FLINK-15715] Update ContinuousFileProcessingMigrationTest to restore from 1.10 savepoint
flinkbot edited a comment on issue #11144: [FLINK-15715] Update ContinuousFileProcessingMigrationTest to restore from 1.10 savepoint URL: https://github.com/apache/flink/pull/11144#issuecomment-588270086 ## CI report: * bafa0b96be75a242a6465becc11e82a1afa1de61 Travis: [PENDING](https://travis-ci.com/flink-ci/flink/builds/149646329) * 66e240be3d3886212b32373a975e6f005a817d39 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r381714621 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientDecoder.java ## @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.runtime.io.network.NetworkClientHandler; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter; + +import static org.apache.flink.runtime.io.network.netty.NettyMessage.FRAME_HEADER_LENGTH; +import static org.apache.flink.runtime.io.network.netty.NettyMessage.MAGIC_NUMBER; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Decodes messages from the fragmentary netty buffers. This decoder assumes the + * messages have the following format: + * +---++ + * | FRAME_HEADER || MESSAGE_HEADER | DATA BUFFER (Optional) | + * +---++ + * + * This decoder decodes the frame header and delegates the other following work + * to corresponding message parsers according to the message type. During the process + * of decoding, the decoder and parsers try best to eliminate copying. For the frame + * header and message header, it only cumulates data when they span multiple input buffers. + * For the buffer part, it copies directly to the input channels to avoid future copying. + * + * The format of the frame header is + * +--+--++ + * | FRAME LENGTH (4) | MAGIC NUMBER (4) | ID (1) | + * +--+--++ + */ +public class NettyMessageClientDecoder extends ChannelInboundHandlerAdapter { + + /** The message parser for buffer response. */ +private final NettyMessageDecoderDelegate bufferResponseDecoderDelegate; + +/** The message parser for other messages other than buffer response. */ + private final NettyMessageDecoderDelegate nonBufferResponseDecoderDelegate; + + /** The cumulation buffer for the frame header part. */ + private ByteBuf frameHeaderBuffer; + + /** +* The chosen message parser for the current message. If it is null, then +* we are decoding the frame header part, otherwise we are decoding the actual +* message. +*/ + private NettyMessageDecoderDelegate currentDecoderDelegate; + +NettyMessageClientDecoder(NetworkClientHandler networkClientHandler) { +this.bufferResponseDecoderDelegate = new BufferResponseDecoderDelegate(networkClientHandler); +this.nonBufferResponseDecoderDelegate = new NonBufferResponseDecoderDelegate(); +} + +@Override +public void channelActive(ChannelHandlerContext ctx) throws Exception { +super.channelActive(ctx); + +bufferResponseDecoderDelegate.onChannelActive(ctx.alloc()); +nonBufferResponseDecoderDelegate.onChannelActive(ctx.alloc()); + + frameHeaderBuffer = ctx.alloc().directBuffer(FRAME_HEADER_LENGTH); +} + +@Override +public void channelInactive(ChannelHandlerContext ctx) throws Exception { +super.channelInactive(ctx); + + bufferResponseDecoderDelegate.release(); + nonBufferResponseDecoderDelegate.release(); + + frameHeaderBuffer.release(); +} + +@Override +public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { +if (!(msg instanceof ByteBuf)) { +ctx.fireChannelRead(msg); +return; +} + +ByteBuf data = (ByteBuf) msg; + +try { +while (data.isReadable()) { + if (currentDecoderDelegate == null) { Review comment: If we refer to `StreamTaskNetworkInput#emitNext`, it always judges the
[GitHub] [flink] flinkbot edited a comment on issue #10904: [FLINK-15669] [sql client] fix SQL client can't cancel flink job
flinkbot edited a comment on issue #10904: [FLINK-15669] [sql client] fix SQL client can't cancel flink job URL: https://github.com/apache/flink/pull/10904#issuecomment-576004529 ## CI report: * dbfc485ecea3aec8bc0054ca8bacb310b417c5dc Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/145110835) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4479) * 8637d733546f1863b30ecf78766961a4b4471482 Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/145171018) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4500) * 202059d5e5c772b1b11ee50b9715b216b6ddeaad Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/147163955) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4765) * 6d89ef84c8f0673d93a1048350b302e7d24d7484 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r381704846 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/BufferResponseDecoderDelegate.java ## @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.runtime.io.network.NetworkClientHandler; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator; + +/** + * The parser for {@link org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse}. + */ +public class BufferResponseDecoderDelegate implements NettyMessageDecoderDelegate { + + /** The network client handler of current channel. */ + private final NetworkClientHandler networkClientHandler; + + /** The Flink Buffer allocator. */ + private final NetworkBufferAllocator allocator; + + /** The cumulation buffer of message header. */ + private ByteBuf messageHeaderCumulationBuffer; + + /** +* The current BufferResponse message that are process the buffer part. +* If it is null, we are still processing the message header part, otherwise +* we are processing the buffer part. +*/ + private NettyMessage.BufferResponse currentResponse; + + /** How much bytes have been received or discarded for the buffer part. */ + private int decodedBytesOfBuffer; + + public BufferResponseDecoderDelegate(NetworkClientHandler networkClientHandler) { + this.networkClientHandler = networkClientHandler; + this.allocator = new NetworkBufferAllocator(networkClientHandler); + } + + @Override + public void onChannelActive(ByteBufAllocator alloc) { + messageHeaderCumulationBuffer = alloc.directBuffer(NettyMessage.BufferResponse.MESSAGE_HEADER_LENGTH); + } + + @Override + public void startParsingMessage(int msgId, int messageLength) { + currentResponse = null; + decodedBytesOfBuffer = 0; + + messageHeaderCumulationBuffer.clear(); + } + + @Override + public ParseResult onChannelRead(ByteBuf data) throws Exception { + if (currentResponse == null) { + ByteBuf toDecode = ByteBufUtils.cumulate(messageHeaderCumulationBuffer, data, NettyMessage.BufferResponse.MESSAGE_HEADER_LENGTH); + + if (toDecode != null) { + currentResponse = NettyMessage.BufferResponse.readFrom(toDecode, allocator); + + if (currentResponse.bufferSize == 0) { Review comment: Yes, this case really exists now. The previous handling of this case only happens in one place, but now it happens in multiple places which was my previous concern. Let me think whether can improve it a bit. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink-statefun] tzulitai closed pull request #26: [FLINK-16176] Use PersistedAppendingBuffer
tzulitai closed pull request #26: [FLINK-16176] Use PersistedAppendingBuffer URL: https://github.com/apache/flink-statefun/pull/26 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r381703666 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientDecoder.java ## @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.runtime.io.network.NetworkClientHandler; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter; + +import static org.apache.flink.runtime.io.network.netty.NettyMessage.FRAME_HEADER_LENGTH; +import static org.apache.flink.runtime.io.network.netty.NettyMessage.MAGIC_NUMBER; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Decodes messages from the fragmentary netty buffers. This decoder assumes the + * messages have the following format: + * +---++ + * | FRAME_HEADER || MESSAGE_HEADER | DATA BUFFER (Optional) | + * +---++ + * + * This decoder decodes the frame header and delegates the other following work + * to corresponding message parsers according to the message type. During the process + * of decoding, the decoder and parsers try best to eliminate copying. For the frame + * header and message header, it only cumulates data when they span multiple input buffers. + * For the buffer part, it copies directly to the input channels to avoid future copying. + * + * The format of the frame header is + * +--+--++ + * | FRAME LENGTH (4) | MAGIC NUMBER (4) | ID (1) | + * +--+--++ + */ +public class NettyMessageClientDecoder extends ChannelInboundHandlerAdapter { + + /** The message parser for buffer response. */ +private final NettyMessageDecoderDelegate bufferResponseDecoderDelegate; + +/** The message parser for other messages other than buffer response. */ + private final NettyMessageDecoderDelegate nonBufferResponseDecoderDelegate; + + /** The cumulation buffer for the frame header part. */ + private ByteBuf frameHeaderBuffer; + + /** +* The chosen message parser for the current message. If it is null, then +* we are decoding the frame header part, otherwise we are decoding the actual +* message. +*/ + private NettyMessageDecoderDelegate currentDecoderDelegate; + +NettyMessageClientDecoder(NetworkClientHandler networkClientHandler) { +this.bufferResponseDecoderDelegate = new BufferResponseDecoderDelegate(networkClientHandler); +this.nonBufferResponseDecoderDelegate = new NonBufferResponseDecoderDelegate(); +} + +@Override +public void channelActive(ChannelHandlerContext ctx) throws Exception { +super.channelActive(ctx); + +bufferResponseDecoderDelegate.onChannelActive(ctx.alloc()); +nonBufferResponseDecoderDelegate.onChannelActive(ctx.alloc()); + + frameHeaderBuffer = ctx.alloc().directBuffer(FRAME_HEADER_LENGTH); +} + +@Override +public void channelInactive(ChannelHandlerContext ctx) throws Exception { +super.channelInactive(ctx); + + bufferResponseDecoderDelegate.release(); + nonBufferResponseDecoderDelegate.release(); + + frameHeaderBuffer.release(); +} + +@Override +public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { +if (!(msg instanceof ByteBuf)) { +ctx.fireChannelRead(msg); Review comment: It would be friendly for judging before type transformation and i guess it has no performance concern here. This is an automated message from the Apache Git Service. To respond to the message, please log on
[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r381703878 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/BufferResponseDecoderDelegate.java ## @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.runtime.io.network.NetworkClientHandler; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator; + +/** + * The parser for {@link org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse}. + */ +public class BufferResponseDecoderDelegate implements NettyMessageDecoderDelegate { + + /** The network client handler of current channel. */ + private final NetworkClientHandler networkClientHandler; + + /** The Flink Buffer allocator. */ + private final NetworkBufferAllocator allocator; + + /** The cumulation buffer of message header. */ + private ByteBuf messageHeaderCumulationBuffer; + + /** +* The current BufferResponse message that are process the buffer part. +* If it is null, we are still processing the message header part, otherwise +* we are processing the buffer part. +*/ + private NettyMessage.BufferResponse currentResponse; + + /** How much bytes have been received or discarded for the buffer part. */ + private int decodedBytesOfBuffer; + + public BufferResponseDecoderDelegate(NetworkClientHandler networkClientHandler) { + this.networkClientHandler = networkClientHandler; + this.allocator = new NetworkBufferAllocator(networkClientHandler); + } + + @Override + public void onChannelActive(ByteBufAllocator alloc) { + messageHeaderCumulationBuffer = alloc.directBuffer(NettyMessage.BufferResponse.MESSAGE_HEADER_LENGTH); + } + + @Override + public void startParsingMessage(int msgId, int messageLength) { + currentResponse = null; + decodedBytesOfBuffer = 0; + + messageHeaderCumulationBuffer.clear(); + } + + @Override + public ParseResult onChannelRead(ByteBuf data) throws Exception { + if (currentResponse == null) { + ByteBuf toDecode = ByteBufUtils.cumulate(messageHeaderCumulationBuffer, data, NettyMessage.BufferResponse.MESSAGE_HEADER_LENGTH); Review comment: I guess it would be fine, let me double check the codes then. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #11144: [FLINK-15715] Update ContinuousFileProcessingMigrationTest to restore from 1.10 savepoint
flinkbot edited a comment on issue #11144: [FLINK-15715] Update ContinuousFileProcessingMigrationTest to restore from 1.10 savepoint URL: https://github.com/apache/flink/pull/11144#issuecomment-588270086 ## CI report: * bafa0b96be75a242a6465becc11e82a1afa1de61 Travis: [PENDING](https://travis-ci.com/flink-ci/flink/builds/149646329) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-16181) IfCallGen will throw NPE for primitive types in blink
[ https://issues.apache.org/jira/browse/FLINK-16181?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17040595#comment-17040595 ] Benchao Li commented on FLINK-16181: [~lzljs3620320] Thanks for your confirmation, I'd like to fix this. > IfCallGen will throw NPE for primitive types in blink > - > > Key: FLINK-16181 > URL: https://issues.apache.org/jira/browse/FLINK-16181 > Project: Flink > Issue Type: Bug >Reporter: Benchao Li >Priority: Major > > It can be reproduced by a simple test case: > Add below to {{ScalarOperatorsTest}} > {code:java} > testSqlApi("IF(true, CAST('non-numeric' AS BIGINT), 0)", "null") > {code} > > IMO, it's {{IfCallGen}}'s bug, which should judge the nullTerm of operands > first before assignment. > cc [~lzljs3620320] [~jark] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r381696893 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessageDecoderDelegate.java ## @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator; + +import javax.annotation.Nullable; + +/** + * Parsers for specified netty messages. + */ +public interface NettyMessageDecoderDelegate { Review comment: We might not consider the `NettyMessageServerDecoder` atm, because it is still named `NettyMessageDecoder` now. If we want to refactor the server name accordingly future, we can consider the consistency with the client side then. There is a bit difference between client and server. On server side we only have one explicit decoder in flink stack now, but on client side there actually exits two decoders explicitly which should be distinguished for better understanding. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] godfreyhe commented on issue #10904: [FLINK-15669] [sql client] fix SQL client can't cancel flink job
godfreyhe commented on issue #10904: [FLINK-15669] [sql client] fix SQL client can't cancel flink job URL: https://github.com/apache/flink/pull/10904#issuecomment-588584960 Thanks for the suggestion @docete . Currently we can't get large result set based on `Accumulator`. so I think it's a temporary solution , we prepare to use [`Table#collect`](https://issues.apache.org/jira/browse/FLINK-14807) to solve this. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r381696893 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessageDecoderDelegate.java ## @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator; + +import javax.annotation.Nullable; + +/** + * Parsers for specified netty messages. + */ +public interface NettyMessageDecoderDelegate { Review comment: We might not consider the `NettyMessageServerDecoder` atm, because it is still named `NettyMessageDecoder` now. If we want to refactor the server name accordingly future, we can consider the client side then. There is a bit difference between client and server. On server side we only have one explicit decoder in flink stack now, but on client side there actually exits two decoders explicitly which should be distinguished for better understanding. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-16181) IfCallGen will throw NPE for primitive types in blink
[ https://issues.apache.org/jira/browse/FLINK-16181?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17040593#comment-17040593 ] Jingsong Lee commented on FLINK-16181: -- Thanks [~libenchao] for reporting. You are right, it is a bug. We should deal with nullTerms. Do you want to fix this? > IfCallGen will throw NPE for primitive types in blink > - > > Key: FLINK-16181 > URL: https://issues.apache.org/jira/browse/FLINK-16181 > Project: Flink > Issue Type: Bug >Reporter: Benchao Li >Priority: Major > > It can be reproduced by a simple test case: > Add below to {{ScalarOperatorsTest}} > {code:java} > testSqlApi("IF(true, CAST('non-numeric' AS BIGINT), 0)", "null") > {code} > > IMO, it's {{IfCallGen}}'s bug, which should judge the nullTerm of operands > first before assignment. > cc [~lzljs3620320] [~jark] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] flinkbot edited a comment on issue #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
flinkbot edited a comment on issue #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#issuecomment-567435407 ## CI report: * 6b12b52b99894864db993a2fa8ab2bfcce0edd5c Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/141736780) * 25fc5608d0d7143f4384a7648054e9c99ebb32e9 Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/145174867) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4502) * 0bd82aa136c43af8fdfde0f2a64284323c40c999 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/148123283) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4982) * 99f9c6528f53574829368b52de5fa6c3f0a41d44 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/148132166) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4986) * e61197d1bb5a61a55051b193a01543edfa8a22b6 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/148706320) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5117) * b1bc198a051be7c2eaace3f0c0601187ff0b98ed Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/149319970) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5258) * 3b41ea0500078556f85db2a1a47386e02c62c5b5 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/149736534) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5334) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Created] (FLINK-16181) IfCallGen will throw NPE for primitive types in blink
Benchao Li created FLINK-16181: -- Summary: IfCallGen will throw NPE for primitive types in blink Key: FLINK-16181 URL: https://issues.apache.org/jira/browse/FLINK-16181 Project: Flink Issue Type: Bug Reporter: Benchao Li It can be reproduced by a simple test case: Add below to {{ScalarOperatorsTest}} {code:java} testSqlApi("IF(true, CAST('non-numeric' AS BIGINT), 0)", "null") {code} IMO, it's {{IfCallGen}}'s bug, which should judge the nullTerm of operands first before assignment. cc [~lzljs3620320] [~jark] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] yanghua commented on issue #11144: [FLINK-15715] Update ContinuousFileProcessingMigrationTest to restore from 1.10 savepoint
yanghua commented on issue #11144: [FLINK-15715] Update ContinuousFileProcessingMigrationTest to restore from 1.10 savepoint URL: https://github.com/apache/flink/pull/11144#issuecomment-588578559 @flinkbot run travis This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-16180) Replacing vertexExecution in ScheduledUnit with executionVertexID
[ https://issues.apache.org/jira/browse/FLINK-16180?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhu Zhu updated FLINK-16180: Description: {{ScheduledUnit#vertexExecution}} is nullable but {{ProgrammedSlotProvider}} requires it to be non-null to work. This makes {{ProgrammedSlotProvider}} not able to be used by new scheduler tests since {{vertexExecution}} is never set in the new scheduler code path. It blocks us from reworking tests which are based legacy scheduling to base on the new scheduler. I would propose to replace the nullable vertexExecution with a non-null executionVertexID. This change would not break anything since {{vertexExecution}} is mainly used by {{ProgrammedSlotProvider}} for testing. {{ProgrammedSlotProvider}} uses it to retrieve the JobVertexID and subtaskIndex. The only other place where {{ScheduledUnit#vertexExecution}} is used is to log the involved task for slot allocation in {{SchedulerImpl#allocateSlotInternal(...)}}. The log is problematic at the moment with the new scheduler since the vertexExecution is null. This change can fix the problematic log. This change would also fix a NPE issue reported in FLINK-16145. was: {{ScheduledUnit#vertexExecution}} is nullable but {{ProgrammedSlotProvider}} requires it to be non-null to work. This makes {{ProgrammedSlotProvider}} not able to be used by new scheduler tests since {{vertexExecution}} is never set in the new scheduler code path. It blocks us from reworking tests which are based legacy scheduling to base on the new scheduler. I would propose to replace vertexExecution with a non-null executionVertexID. This change would not break anything since {{vertexExecution}} is mainly used by {{ProgrammedSlotProvider}} in tests and logging in production. And the log would be broken if it is null. This would also fix a potential bug, as reported in FLINK-16145. > Replacing vertexExecution in ScheduledUnit with executionVertexID > - > > Key: FLINK-16180 > URL: https://issues.apache.org/jira/browse/FLINK-16180 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Affects Versions: 1.11.0 >Reporter: Zhu Zhu >Assignee: Zhu Zhu >Priority: Major > Fix For: 1.11.0 > > > {{ScheduledUnit#vertexExecution}} is nullable but {{ProgrammedSlotProvider}} > requires it to be non-null to work. This makes {{ProgrammedSlotProvider}} not > able to be used by new scheduler tests since {{vertexExecution}} is never set > in the new scheduler code path. It blocks us from reworking tests which are > based legacy scheduling to base on the new scheduler. > I would propose to replace the nullable vertexExecution with a non-null > executionVertexID. > This change would not break anything since {{vertexExecution}} is mainly used > by {{ProgrammedSlotProvider}} for testing. {{ProgrammedSlotProvider}} uses it > to retrieve the JobVertexID and subtaskIndex. > The only other place where {{ScheduledUnit#vertexExecution}} is used is to > log the involved task for slot allocation in > {{SchedulerImpl#allocateSlotInternal(...)}}. The log is problematic at the > moment with the new scheduler since the vertexExecution is null. This change > can fix the problematic log. > This change would also fix a NPE issue reported in FLINK-16145. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-14807) Add Table#collect api for fetching data to client
[ https://issues.apache.org/jira/browse/FLINK-14807?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17040585#comment-17040585 ] godfrey he commented on FLINK-14807: Thanks for the propose, [~TsReaper]. About "What if the job restarts?" part, the sink can not guarantee the records's order, so the REST server can not just skip the results before the token. Maybe the REST server needs re-pull all data. > Add Table#collect api for fetching data to client > - > > Key: FLINK-14807 > URL: https://issues.apache.org/jira/browse/FLINK-14807 > Project: Flink > Issue Type: New Feature > Components: Table SQL / API >Affects Versions: 1.9.1 >Reporter: Jeff Zhang >Priority: Major > Labels: usability > Fix For: 1.11.0 > > Attachments: table-collect.png > > > Currently, it is very unconvinient for user to fetch data of flink job unless > specify sink expclitly and then fetch data from this sink via its api (e.g. > write to hdfs sink, then read data from hdfs). However, most of time user > just want to get the data and do whatever processing he want. So it is very > necessary for flink to provide api Table#collect for this purpose. > > Other apis such as Table#head, Table#print is also helpful. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink-statefun] tzulitai commented on a change in pull request #26: [FLINK-16176] Use PersistedAppendingBuffer
tzulitai commented on a change in pull request #26: [FLINK-16176] Use PersistedAppendingBuffer URL: https://github.com/apache/flink-statefun/pull/26#discussion_r381674329 ## File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunction.java ## @@ -221,11 +229,14 @@ private InvocationResponse unpackInvocationResultOrThrow( "Failure forwarding a message to a remote function " + self, asyncResult.throwable()); } InputStream httpResponseBody = responseBody(asyncResult.value()); -FromFunction fromFunction = parseProtobufOrThrow(FromFunction.parser(), httpResponseBody); -checkState( -fromFunction.hasInvocationResult(), -"The received HTTP payload does not contain an InvocationResult, but rather [%s]", -fromFunction); -return fromFunction.getInvocationResult(); +try { Review comment: @igalshilman The changes here seem unrelated to the ticket. Would be nice if this is at least a separate commit, as it is a bit confusing at first glance to see an unrelated change. Just a comment for the future, no need to change this now, I'll proceed to merge as is. Also as a side comment: this change is to make things easier for us to test the WIP remote http function, right? If yes, I think we deserve a JIRA to keep track of this. For instance, the method name `unpackInvocationResultOrThrow` no longer makes sense now given the change. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-16145) ScheduledUnit toString method throw NPE when Execution is null
[ https://issues.apache.org/jira/browse/FLINK-16145?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17040582#comment-17040582 ] Zhu Zhu commented on FLINK-16145: - Thanks for reporting this issue [~liuyufei]! This is a bug since {{vertexExecution}} is nullable while {{toString()}} always invokes {{vertexExecution.getVertexWithAttempt()}}. It does not cause production problem though at the moment. {{vertexExecution}} is also causing some other problems so I'm planning to replace it with an ExecutionVertexID (see FLINK-16180). That would also fix this potential issue. > ScheduledUnit toString method throw NPE when Execution is null > -- > > Key: FLINK-16145 > URL: https://issues.apache.org/jira/browse/FLINK-16145 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.10.0 >Reporter: YufeiLiu >Priority: Major > Attachments: image-2020-02-18-19-58-38-506.png > > > !image-2020-02-18-19-58-38-506.png! -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-16180) Replacing vertexExecution in ScheduledUnit with executionVertexID
[ https://issues.apache.org/jira/browse/FLINK-16180?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhu Zhu updated FLINK-16180: Parent: FLINK-15626 Issue Type: Sub-task (was: Improvement) > Replacing vertexExecution in ScheduledUnit with executionVertexID > - > > Key: FLINK-16180 > URL: https://issues.apache.org/jira/browse/FLINK-16180 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Affects Versions: 1.11.0 >Reporter: Zhu Zhu >Assignee: Zhu Zhu >Priority: Major > Fix For: 1.11.0 > > > {{ScheduledUnit#vertexExecution}} is nullable but {{ProgrammedSlotProvider}} > requires it to be non-null to work. This makes {{ProgrammedSlotProvider}} not > able to be used by new scheduler tests since {{vertexExecution}} is never set > in the new scheduler code path. It blocks us from reworking tests which are > based legacy scheduling to base on the new scheduler. > I would propose to replace vertexExecution with a non-null executionVertexID. > This change would not break anything since {{vertexExecution}} is mainly used > by {{ProgrammedSlotProvider}} in tests and logging in production. And the log > would be broken if it is null. > This would also fix a potential bug, as reported in FLINK-16145. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink-statefun] tzulitai commented on a change in pull request #26: [FLINK-16176] Use PersistedAppendingBuffer
tzulitai commented on a change in pull request #26: [FLINK-16176] Use PersistedAppendingBuffer URL: https://github.com/apache/flink-statefun/pull/26#discussion_r381674686 ## File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunction.java ## @@ -112,13 +107,26 @@ private void onAsyncResult( InvocationResponse invocationResult = unpackInvocationResultOrThrow(context.self(), asyncResult); handleInvocationResponse(context, invocationResult); -InvocationBatchRequest nextBatch = batch.get(); +InvocationBatchRequest.Builder nextBatch = getNextBatch(); if (nextBatch == null) { hasInFlightRpc.clear(); return; } batch.clear(); -sendToFunction(context, nextBatch.toBuilder()); +sendToFunction(context, nextBatch); + } + + @Nullable + private InvocationBatchRequest.Builder getNextBatch() { +@Nullable Iterable next = batch.view(); +if (next == null) { + return null; +} +InvocationBatchRequest.Builder builder = InvocationBatchRequest.newBuilder(); +for (Invocation invocation : next) { Review comment: I think this can be just `builder.addAllInvocations(nextBatch)` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink-statefun] tzulitai commented on a change in pull request #26: [FLINK-16176] Use PersistedAppendingBuffer
tzulitai commented on a change in pull request #26: [FLINK-16176] Use PersistedAppendingBuffer URL: https://github.com/apache/flink-statefun/pull/26#discussion_r381674329 ## File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunction.java ## @@ -221,11 +229,14 @@ private InvocationResponse unpackInvocationResultOrThrow( "Failure forwarding a message to a remote function " + self, asyncResult.throwable()); } InputStream httpResponseBody = responseBody(asyncResult.value()); -FromFunction fromFunction = parseProtobufOrThrow(FromFunction.parser(), httpResponseBody); -checkState( -fromFunction.hasInvocationResult(), -"The received HTTP payload does not contain an InvocationResult, but rather [%s]", -fromFunction); -return fromFunction.getInvocationResult(); +try { Review comment: @igalshilman The changes here seem unrelated to the ticket. Would be nice if this is at least a separate commit, as it is a bit confusing at first glance to see an unrelated change. Just a comment for the future, no need to change this now, I'll proceed to merge as is. Also as a side comment: this change is to make things easier for us to test the WIP remote http function, right? If yes, I think we deserve a JIRA to keep track of this. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Created] (FLINK-16180) Replacing vertexExecution in ScheduledUnit with executionVertexID
Zhu Zhu created FLINK-16180: --- Summary: Replacing vertexExecution in ScheduledUnit with executionVertexID Key: FLINK-16180 URL: https://issues.apache.org/jira/browse/FLINK-16180 Project: Flink Issue Type: Improvement Components: Runtime / Coordination Affects Versions: 1.11.0 Reporter: Zhu Zhu Assignee: Zhu Zhu Fix For: 1.11.0 {{ScheduledUnit#vertexExecution}} is nullable but {{ProgrammedSlotProvider}} requires it to be non-null to work. This makes {{ProgrammedSlotProvider}} not able to be used by new scheduler tests since {{vertexExecution}} is never set in the new scheduler code path. It blocks us from reworking tests which are based legacy scheduling to base on the new scheduler. I would propose to replace vertexExecution with a non-null executionVertexID. This change would not break anything since {{vertexExecution}} is mainly used by {{ProgrammedSlotProvider}} in tests and logging in production. And the log would be broken if it is null. This would also fix a potential bug, as reported in FLINK-16145. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] danny0405 commented on a change in pull request #11116: [FLINK-15349] add 'create catalog' DDL to blink planner
danny0405 commented on a change in pull request #6: [FLINK-15349] add 'create catalog' DDL to blink planner URL: https://github.com/apache/flink/pull/6#discussion_r381670264 ## File path: flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java ## @@ -95,6 +95,21 @@ public void testUseCatalog() { check("use catalog a", "USE CATALOG `A`"); } + @Test + public void testCreateCatalog() { + check( + "create catalog c1\n" + + " WITH (\n" + Review comment: But your syntax allows that. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Created] (FLINK-16179) Use configuration from TableFactory in hive connector
Jingsong Lee created FLINK-16179: Summary: Use configuration from TableFactory in hive connector Key: FLINK-16179 URL: https://issues.apache.org/jira/browse/FLINK-16179 Project: Flink Issue Type: Improvement Components: Connectors / Hive Reporter: Jingsong Lee Fix For: 1.11.0 Now {{HiveOptions}} is used for {{GlobalConfiguration.loadConfiguration()}} . It is not natural for table, we should use configuration from TableFactory to enable table config. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] lirui-apache commented on a change in pull request #11116: [FLINK-15349] add 'create catalog' DDL to blink planner
lirui-apache commented on a change in pull request #6: [FLINK-15349] add 'create catalog' DDL to blink planner URL: https://github.com/apache/flink/pull/6#discussion_r381654413 ## File path: flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl ## @@ -55,6 +55,30 @@ SqlUseCatalog SqlUseCatalog() : } } +/** +* Parses a create catalog statement. +* CREATE CATALOG catalog_name [WITH (property_name=property_value, ...)]; +*/ +SqlCreate SqlCreateCatalog(Span s, boolean replace) : +{ +SqlParserPos startPos; +SqlIdentifier catalogName; +SqlNodeList propertyList = SqlNodeList.EMPTY; +} +{ + { startPos = getPos(); } +catalogName = CompoundIdentifier() Review comment: Why do we support CompoundIdentifier for catalog names? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
flinkbot edited a comment on issue #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#issuecomment-567435407 ## CI report: * 6b12b52b99894864db993a2fa8ab2bfcce0edd5c Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/141736780) * 25fc5608d0d7143f4384a7648054e9c99ebb32e9 Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/145174867) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4502) * 0bd82aa136c43af8fdfde0f2a64284323c40c999 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/148123283) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4982) * 99f9c6528f53574829368b52de5fa6c3f0a41d44 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/148132166) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4986) * e61197d1bb5a61a55051b193a01543edfa8a22b6 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/148706320) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5117) * b1bc198a051be7c2eaace3f0c0601187ff0b98ed Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/149319970) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5258) * 3b41ea0500078556f85db2a1a47386e02c62c5b5 Travis: [PENDING](https://travis-ci.com/flink-ci/flink/builds/149736534) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5334) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
flinkbot edited a comment on issue #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#issuecomment-567435407 ## CI report: * 6b12b52b99894864db993a2fa8ab2bfcce0edd5c Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/141736780) * 25fc5608d0d7143f4384a7648054e9c99ebb32e9 Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/145174867) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4502) * 0bd82aa136c43af8fdfde0f2a64284323c40c999 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/148123283) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4982) * 99f9c6528f53574829368b52de5fa6c3f0a41d44 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/148132166) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4986) * e61197d1bb5a61a55051b193a01543edfa8a22b6 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/148706320) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5117) * b1bc198a051be7c2eaace3f0c0601187ff0b98ed Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/149319970) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5258) * 3b41ea0500078556f85db2a1a47386e02c62c5b5 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r381636671 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/BufferResponseDecoderDelegate.java ## @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.runtime.io.network.NetworkClientHandler; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator; + +/** + * The parser for {@link org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse}. + */ +public class BufferResponseDecoderDelegate implements NettyMessageDecoderDelegate { + + /** The network client handler of current channel. */ + private final NetworkClientHandler networkClientHandler; + + /** The Flink Buffer allocator. */ + private final NetworkBufferAllocator allocator; + + /** The cumulation buffer of message header. */ + private ByteBuf messageHeaderCumulationBuffer; + + /** +* The current BufferResponse message that are process the buffer part. +* If it is null, we are still processing the message header part, otherwise +* we are processing the buffer part. +*/ + private NettyMessage.BufferResponse currentResponse; + + /** How much bytes have been received or discarded for the buffer part. */ + private int decodedBytesOfBuffer; + + public BufferResponseDecoderDelegate(NetworkClientHandler networkClientHandler) { + this.networkClientHandler = networkClientHandler; + this.allocator = new NetworkBufferAllocator(networkClientHandler); + } + + @Override + public void onChannelActive(ByteBufAllocator alloc) { + messageHeaderCumulationBuffer = alloc.directBuffer(NettyMessage.BufferResponse.MESSAGE_HEADER_LENGTH); + } + + @Override + public void startParsingMessage(int msgId, int messageLength) { + currentResponse = null; + decodedBytesOfBuffer = 0; + + messageHeaderCumulationBuffer.clear(); + } + + @Override + public ParseResult onChannelRead(ByteBuf data) throws Exception { + if (currentResponse == null) { + ByteBuf toDecode = ByteBufUtils.cumulate(messageHeaderCumulationBuffer, data, NettyMessage.BufferResponse.MESSAGE_HEADER_LENGTH); + + if (toDecode != null) { + currentResponse = NettyMessage.BufferResponse.readFrom(toDecode, allocator); + + if (currentResponse.bufferSize == 0) { Review comment: Currently there is branch for `size = 0` cases. I think we could check if `size = 0` still occurs in separate issue and if there is no such condition, we might consider simplify the related process. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #7702: [FLINK-11088][Security][YARN] Allow YARN to discover pre-installed keytab files
flinkbot edited a comment on issue #7702: [FLINK-11088][Security][YARN] Allow YARN to discover pre-installed keytab files URL: https://github.com/apache/flink/pull/7702#issuecomment-572195960 ## CI report: * 72dd07f5f10a56adf6025e82083af21ada47c711 Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/143614040) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4198) * f2387288cb33f288164ed9d102b47868a93dc898 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/148710964) Azure: [CANCELED](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5118) * 68496011f53df6a9933c2d8723d8f5995cc96111 Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/149728083) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5333) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r381636153 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/BufferResponseDecoderDelegate.java ## @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.runtime.io.network.NetworkClientHandler; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator; + +/** + * The parser for {@link org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse}. + */ +public class BufferResponseDecoderDelegate implements NettyMessageDecoderDelegate { + + /** The network client handler of current channel. */ + private final NetworkClientHandler networkClientHandler; + + /** The Flink Buffer allocator. */ + private final NetworkBufferAllocator allocator; + + /** The cumulation buffer of message header. */ + private ByteBuf messageHeaderCumulationBuffer; + + /** +* The current BufferResponse message that are process the buffer part. +* If it is null, we are still processing the message header part, otherwise +* we are processing the buffer part. +*/ + private NettyMessage.BufferResponse currentResponse; + + /** How much bytes have been received or discarded for the buffer part. */ + private int decodedBytesOfBuffer; + + public BufferResponseDecoderDelegate(NetworkClientHandler networkClientHandler) { + this.networkClientHandler = networkClientHandler; + this.allocator = new NetworkBufferAllocator(networkClientHandler); + } + + @Override + public void onChannelActive(ByteBufAllocator alloc) { + messageHeaderCumulationBuffer = alloc.directBuffer(NettyMessage.BufferResponse.MESSAGE_HEADER_LENGTH); + } + + @Override + public void startParsingMessage(int msgId, int messageLength) { + currentResponse = null; + decodedBytesOfBuffer = 0; + + messageHeaderCumulationBuffer.clear(); + } + + @Override + public ParseResult onChannelRead(ByteBuf data) throws Exception { + if (currentResponse == null) { + ByteBuf toDecode = ByteBufUtils.cumulate(messageHeaderCumulationBuffer, data, NettyMessage.BufferResponse.MESSAGE_HEADER_LENGTH); + + if (toDecode != null) { + currentResponse = NettyMessage.BufferResponse.readFrom(toDecode, allocator); Review comment: Have import `NettyMessage.BufferResponse` at the beginning. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r381636064 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/BufferResponseDecoderDelegate.java ## @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.runtime.io.network.NetworkClientHandler; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator; + +/** + * The parser for {@link org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse}. + */ +public class BufferResponseDecoderDelegate implements NettyMessageDecoderDelegate { + + /** The network client handler of current channel. */ + private final NetworkClientHandler networkClientHandler; + + /** The Flink Buffer allocator. */ + private final NetworkBufferAllocator allocator; + + /** The cumulation buffer of message header. */ + private ByteBuf messageHeaderCumulationBuffer; + + /** +* The current BufferResponse message that are process the buffer part. +* If it is null, we are still processing the message header part, otherwise +* we are processing the buffer part. +*/ + private NettyMessage.BufferResponse currentResponse; + + /** How much bytes have been received or discarded for the buffer part. */ + private int decodedBytesOfBuffer; + + public BufferResponseDecoderDelegate(NetworkClientHandler networkClientHandler) { + this.networkClientHandler = networkClientHandler; + this.allocator = new NetworkBufferAllocator(networkClientHandler); + } + + @Override + public void onChannelActive(ByteBufAllocator alloc) { + messageHeaderCumulationBuffer = alloc.directBuffer(NettyMessage.BufferResponse.MESSAGE_HEADER_LENGTH); + } + + @Override + public void startParsingMessage(int msgId, int messageLength) { + currentResponse = null; + decodedBytesOfBuffer = 0; + + messageHeaderCumulationBuffer.clear(); + } + + @Override + public ParseResult onChannelRead(ByteBuf data) throws Exception { + if (currentResponse == null) { + ByteBuf toDecode = ByteBufUtils.cumulate(messageHeaderCumulationBuffer, data, NettyMessage.BufferResponse.MESSAGE_HEADER_LENGTH); Review comment: Could this problem be solved if we import `NettyMessage.BufferResponse` in advance ? As the following comments suggested. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] KarmaGYZ commented on issue #11100: [FLINK-15562][docs] Add Example settings.xml for maven archetype command wh…
KarmaGYZ commented on issue #11100: [FLINK-15562][docs] Add Example settings.xml for maven archetype command wh… URL: https://github.com/apache/flink/pull/11100#issuecomment-588556496 Thanks for the review and information @zentol . If it is against ASF rules. I agree to not give that advice. I'm not sure should we also remove the existing note. BTW, I'd like to keep those format refactor. WDYT? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r381633592 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/BufferResponseDecoderDelegate.java ## @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.runtime.io.network.NetworkClientHandler; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator; + +/** + * The parser for {@link org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse}. + */ +public class BufferResponseDecoderDelegate implements NettyMessageDecoderDelegate { + + /** The network client handler of current channel. */ + private final NetworkClientHandler networkClientHandler; + + /** The Flink Buffer allocator. */ + private final NetworkBufferAllocator allocator; + + /** The cumulation buffer of message header. */ + private ByteBuf messageHeaderCumulationBuffer; Review comment: I think it might be better to rename to `messageHeaderBuffer` since in concept we call the second part `Message Header`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r381632755 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientDecoder.java ## @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.runtime.io.network.NetworkClientHandler; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter; + +import static org.apache.flink.runtime.io.network.netty.NettyMessage.FRAME_HEADER_LENGTH; +import static org.apache.flink.runtime.io.network.netty.NettyMessage.MAGIC_NUMBER; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Decodes messages from the fragmentary netty buffers. This decoder assumes the + * messages have the following format: + * +---++ + * | FRAME_HEADER || MESSAGE_HEADER | DATA BUFFER (Optional) | + * +---++ + * + * This decoder decodes the frame header and delegates the other following work + * to corresponding message parsers according to the message type. During the process + * of decoding, the decoder and parsers try best to eliminate copying. For the frame + * header and message header, it only cumulates data when they span multiple input buffers. + * For the buffer part, it copies directly to the input channels to avoid future copying. + * + * The format of the frame header is + * +--+--++ + * | FRAME LENGTH (4) | MAGIC NUMBER (4) | ID (1) | + * +--+--++ + */ +public class NettyMessageClientDecoder extends ChannelInboundHandlerAdapter { + + /** The message parser for buffer response. */ +private final NettyMessageDecoderDelegate bufferResponseDecoderDelegate; + +/** The message parser for other messages other than buffer response. */ + private final NettyMessageDecoderDelegate nonBufferResponseDecoderDelegate; + + /** The cumulation buffer for the frame header part. */ + private ByteBuf frameHeaderBuffer; + + /** +* The chosen message parser for the current message. If it is null, then +* we are decoding the frame header part, otherwise we are decoding the actual +* message. +*/ + private NettyMessageDecoderDelegate currentDecoderDelegate; + +NettyMessageClientDecoder(NetworkClientHandler networkClientHandler) { +this.bufferResponseDecoderDelegate = new BufferResponseDecoderDelegate(networkClientHandler); +this.nonBufferResponseDecoderDelegate = new NonBufferResponseDecoderDelegate(); +} + +@Override +public void channelActive(ChannelHandlerContext ctx) throws Exception { +super.channelActive(ctx); + +bufferResponseDecoderDelegate.onChannelActive(ctx.alloc()); +nonBufferResponseDecoderDelegate.onChannelActive(ctx.alloc()); + + frameHeaderBuffer = ctx.alloc().directBuffer(FRAME_HEADER_LENGTH); +} + +@Override +public void channelInactive(ChannelHandlerContext ctx) throws Exception { Review comment: I think it might be ok to not deal with `exceptionCaught` explicitly, since `channelInactive` is called when channel get closed, and in `CreditBasedPartitionClientHander`, it closes the channel in `exceptionCaught`, thus it will close the channel. Besides, I think we should always be able to close the channel, otherwise we should have channel leakage problem. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at:
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r381624866 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessageDecoderDelegate.java ## @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator; + +import javax.annotation.Nullable; + +/** + * Parsers for specified netty messages. + */ +public interface NettyMessageDecoderDelegate { + + /** +* The result of message parsing with the provided data. +*/ + class ParseResult { + private final static ParseResult NOT_FINISHED = new ParseResult(false, null); + + static ParseResult notFinished() { + return NOT_FINISHED; + } + + static ParseResult finishedWith(@Nullable NettyMessage message) { + return new ParseResult(true, message); + } + + final boolean finished; + + @Nullable + final NettyMessage message; + + private ParseResult(boolean finished, NettyMessage message) { + this.finished = finished; + this.message = message; + } + } + + /** +* Notifies the underlying channel become active. +* +* @param alloc The netty buffer allocator. +*/ + void onChannelActive(ByteBufAllocator alloc); Review comment: Have modified accordingly. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r381624725 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessageDecoderDelegate.java ## @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator; + +import javax.annotation.Nullable; + +/** + * Parsers for specified netty messages. + */ +public interface NettyMessageDecoderDelegate { Review comment: I'm slightly hesitate to rename the classes since we also have `NettyMessageServerDecoder`, and if we change the name of client-side decoder to `NettyMessageDecoderDelegate`, it seems to be some kind of inconsistency. I think we might change the DecoderDelegate to something else? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r381624091 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessageDecoderDelegate.java ## @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator; + +import javax.annotation.Nullable; + +/** + * Parsers for specified netty messages. + */ +public interface NettyMessageDecoderDelegate { Review comment: Have modified accordingly. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r381623946 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientDecoder.java ## @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.runtime.io.network.NetworkClientHandler; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter; + +import static org.apache.flink.runtime.io.network.netty.NettyMessage.FRAME_HEADER_LENGTH; +import static org.apache.flink.runtime.io.network.netty.NettyMessage.MAGIC_NUMBER; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Decodes messages from the fragmentary netty buffers. This decoder assumes the + * messages have the following format: + * +---++ + * | FRAME_HEADER || MESSAGE_HEADER | DATA BUFFER (Optional) | + * +---++ + * + * This decoder decodes the frame header and delegates the other following work + * to corresponding message parsers according to the message type. During the process + * of decoding, the decoder and parsers try best to eliminate copying. For the frame + * header and message header, it only cumulates data when they span multiple input buffers. + * For the buffer part, it copies directly to the input channels to avoid future copying. + * + * The format of the frame header is + * +--+--++ + * | FRAME LENGTH (4) | MAGIC NUMBER (4) | ID (1) | + * +--+--++ + */ +public class NettyMessageClientDecoder extends ChannelInboundHandlerAdapter { + + /** The message parser for buffer response. */ +private final NettyMessageDecoderDelegate bufferResponseDecoderDelegate; + +/** The message parser for other messages other than buffer response. */ + private final NettyMessageDecoderDelegate nonBufferResponseDecoderDelegate; + + /** The cumulation buffer for the frame header part. */ + private ByteBuf frameHeaderBuffer; + + /** +* The chosen message parser for the current message. If it is null, then +* we are decoding the frame header part, otherwise we are decoding the actual +* message. +*/ + private NettyMessageDecoderDelegate currentDecoderDelegate; + +NettyMessageClientDecoder(NetworkClientHandler networkClientHandler) { +this.bufferResponseDecoderDelegate = new BufferResponseDecoderDelegate(networkClientHandler); +this.nonBufferResponseDecoderDelegate = new NonBufferResponseDecoderDelegate(); +} + +@Override +public void channelActive(ChannelHandlerContext ctx) throws Exception { +super.channelActive(ctx); + +bufferResponseDecoderDelegate.onChannelActive(ctx.alloc()); +nonBufferResponseDecoderDelegate.onChannelActive(ctx.alloc()); + + frameHeaderBuffer = ctx.alloc().directBuffer(FRAME_HEADER_LENGTH); +} + +@Override +public void channelInactive(ChannelHandlerContext ctx) throws Exception { +super.channelInactive(ctx); + + bufferResponseDecoderDelegate.release(); + nonBufferResponseDecoderDelegate.release(); + + frameHeaderBuffer.release(); +} + +@Override +public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { +if (!(msg instanceof ByteBuf)) { +ctx.fireChannelRead(msg); +return; +} + +ByteBuf data = (ByteBuf) msg; + +try { +while (data.isReadable()) { + if (currentDecoderDelegate == null) { Review comment: Have moved the logic of decoding frame header into separate method.
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r381623946 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientDecoder.java ## @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.runtime.io.network.NetworkClientHandler; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter; + +import static org.apache.flink.runtime.io.network.netty.NettyMessage.FRAME_HEADER_LENGTH; +import static org.apache.flink.runtime.io.network.netty.NettyMessage.MAGIC_NUMBER; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Decodes messages from the fragmentary netty buffers. This decoder assumes the + * messages have the following format: + * +---++ + * | FRAME_HEADER || MESSAGE_HEADER | DATA BUFFER (Optional) | + * +---++ + * + * This decoder decodes the frame header and delegates the other following work + * to corresponding message parsers according to the message type. During the process + * of decoding, the decoder and parsers try best to eliminate copying. For the frame + * header and message header, it only cumulates data when they span multiple input buffers. + * For the buffer part, it copies directly to the input channels to avoid future copying. + * + * The format of the frame header is + * +--+--++ + * | FRAME LENGTH (4) | MAGIC NUMBER (4) | ID (1) | + * +--+--++ + */ +public class NettyMessageClientDecoder extends ChannelInboundHandlerAdapter { + + /** The message parser for buffer response. */ +private final NettyMessageDecoderDelegate bufferResponseDecoderDelegate; + +/** The message parser for other messages other than buffer response. */ + private final NettyMessageDecoderDelegate nonBufferResponseDecoderDelegate; + + /** The cumulation buffer for the frame header part. */ + private ByteBuf frameHeaderBuffer; + + /** +* The chosen message parser for the current message. If it is null, then +* we are decoding the frame header part, otherwise we are decoding the actual +* message. +*/ + private NettyMessageDecoderDelegate currentDecoderDelegate; + +NettyMessageClientDecoder(NetworkClientHandler networkClientHandler) { +this.bufferResponseDecoderDelegate = new BufferResponseDecoderDelegate(networkClientHandler); +this.nonBufferResponseDecoderDelegate = new NonBufferResponseDecoderDelegate(); +} + +@Override +public void channelActive(ChannelHandlerContext ctx) throws Exception { +super.channelActive(ctx); + +bufferResponseDecoderDelegate.onChannelActive(ctx.alloc()); +nonBufferResponseDecoderDelegate.onChannelActive(ctx.alloc()); + + frameHeaderBuffer = ctx.alloc().directBuffer(FRAME_HEADER_LENGTH); +} + +@Override +public void channelInactive(ChannelHandlerContext ctx) throws Exception { +super.channelInactive(ctx); + + bufferResponseDecoderDelegate.release(); + nonBufferResponseDecoderDelegate.release(); + + frameHeaderBuffer.release(); +} + +@Override +public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { +if (!(msg instanceof ByteBuf)) { +ctx.fireChannelRead(msg); +return; +} + +ByteBuf data = (ByteBuf) msg; + +try { +while (data.isReadable()) { + if (currentDecoderDelegate == null) { Review comment: Have moved the logic of decoding frame header into separate method.
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r381623313 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientDecoder.java ## @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.runtime.io.network.NetworkClientHandler; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter; + +import static org.apache.flink.runtime.io.network.netty.NettyMessage.FRAME_HEADER_LENGTH; +import static org.apache.flink.runtime.io.network.netty.NettyMessage.MAGIC_NUMBER; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Decodes messages from the fragmentary netty buffers. This decoder assumes the + * messages have the following format: + * +---++ + * | FRAME_HEADER || MESSAGE_HEADER | DATA BUFFER (Optional) | + * +---++ + * + * This decoder decodes the frame header and delegates the other following work + * to corresponding message parsers according to the message type. During the process + * of decoding, the decoder and parsers try best to eliminate copying. For the frame + * header and message header, it only cumulates data when they span multiple input buffers. + * For the buffer part, it copies directly to the input channels to avoid future copying. + * + * The format of the frame header is + * +--+--++ + * | FRAME LENGTH (4) | MAGIC NUMBER (4) | ID (1) | + * +--+--++ + */ +public class NettyMessageClientDecoder extends ChannelInboundHandlerAdapter { + + /** The message parser for buffer response. */ +private final NettyMessageDecoderDelegate bufferResponseDecoderDelegate; + +/** The message parser for other messages other than buffer response. */ + private final NettyMessageDecoderDelegate nonBufferResponseDecoderDelegate; + + /** The cumulation buffer for the frame header part. */ + private ByteBuf frameHeaderBuffer; + + /** +* The chosen message parser for the current message. If it is null, then +* we are decoding the frame header part, otherwise we are decoding the actual +* message. +*/ + private NettyMessageDecoderDelegate currentDecoderDelegate; + +NettyMessageClientDecoder(NetworkClientHandler networkClientHandler) { +this.bufferResponseDecoderDelegate = new BufferResponseDecoderDelegate(networkClientHandler); +this.nonBufferResponseDecoderDelegate = new NonBufferResponseDecoderDelegate(); +} + +@Override +public void channelActive(ChannelHandlerContext ctx) throws Exception { +super.channelActive(ctx); + +bufferResponseDecoderDelegate.onChannelActive(ctx.alloc()); +nonBufferResponseDecoderDelegate.onChannelActive(ctx.alloc()); + + frameHeaderBuffer = ctx.alloc().directBuffer(FRAME_HEADER_LENGTH); +} + +@Override +public void channelInactive(ChannelHandlerContext ctx) throws Exception { +super.channelInactive(ctx); + + bufferResponseDecoderDelegate.release(); + nonBufferResponseDecoderDelegate.release(); + + frameHeaderBuffer.release(); +} + +@Override +public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { +if (!(msg instanceof ByteBuf)) { +ctx.fireChannelRead(msg); Review comment: Agree with that and removed the `firChannelRead` for objects other than `ByteBuf`. Besides, I think we could omit the explicit checking here for performance? It will throw exception when casting the `Object` to `ByteBuf`.
[jira] [Commented] (FLINK-16005) Propagate yarn.application.classpath from client to TaskManager Classpath
[ https://issues.apache.org/jira/browse/FLINK-16005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17040528#comment-17040528 ] Zhenqiu Huang commented on FLINK-16005: --- [~trohrmann] I feel specifying the yarn config key directly in the configuration and do not remove them is cleaner for users. I will create a PR with this solution. > Propagate yarn.application.classpath from client to TaskManager Classpath > - > > Key: FLINK-16005 > URL: https://issues.apache.org/jira/browse/FLINK-16005 > Project: Flink > Issue Type: Improvement > Components: Deployment / YARN >Reporter: Zhenqiu Huang >Priority: Major > > When Flink users want to override the hadoop yarn container classpath, they > should just specify the yarn.application.classpath in yarn-site.xml from cli > side. But currently, the classpath setting can only be used in flink > application master, the classpath of TM is still determined by the setting in > yarn host. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] flinkbot edited a comment on issue #7702: [FLINK-11088][Security][YARN] Allow YARN to discover pre-installed keytab files
flinkbot edited a comment on issue #7702: [FLINK-11088][Security][YARN] Allow YARN to discover pre-installed keytab files URL: https://github.com/apache/flink/pull/7702#issuecomment-572195960 ## CI report: * 72dd07f5f10a56adf6025e82083af21ada47c711 Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/143614040) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4198) * f2387288cb33f288164ed9d102b47868a93dc898 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/148710964) Azure: [CANCELED](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5118) * 68496011f53df6a9933c2d8723d8f5995cc96111 Travis: [PENDING](https://travis-ci.com/flink-ci/flink/builds/149728083) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #7702: [FLINK-11088][Security][YARN] Allow YARN to discover pre-installed keytab files
flinkbot edited a comment on issue #7702: [FLINK-11088][Security][YARN] Allow YARN to discover pre-installed keytab files URL: https://github.com/apache/flink/pull/7702#issuecomment-572195960 ## CI report: * 72dd07f5f10a56adf6025e82083af21ada47c711 Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/143614040) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4198) * f2387288cb33f288164ed9d102b47868a93dc898 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/148710964) Azure: [CANCELED](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5118) * 68496011f53df6a9933c2d8723d8f5995cc96111 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #9703: [FLINK-14038]Add default GC options for flink on yarn to facilitate debugging
flinkbot edited a comment on issue #9703: [FLINK-14038]Add default GC options for flink on yarn to facilitate debugging URL: https://github.com/apache/flink/pull/9703#issuecomment-532581942 ## CI report: * 1b930d19f27909ad5e2759eb6c5471c2ce07e8b4 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/128133485) * 977ccb5d91869e37027069d8b2b490bf850253ed Travis: [CANCELED](https://travis-ci.com/flink-ci/flink/builds/129659424) * 8347093d4cb32ed752bc01f5cd98abb2d803df94 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/130842273) * 796de65585c861a67c46ba8c578e08302ade2cdc Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/133371242) * 5817aa535fb834889eebb96478b7a40f936fb3c3 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/134200223) * 040b9878337aa7b919f16d2cfb1c9bc590b31a7e Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/134418855) * 32120e401687204ef737ebe01875e293be71d720 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/134550254) * bb787cffdb56629b880c50edbf368fa81f11db58 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/134797060) * 0cd39929e456b8ab0a1d1c20dc0b05b29d92d8b0 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/135018920) * 560c6f43ae1b26dd81b360d5f32207c459824def Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/135182722) * d50ce6eec61b0825c8c3df7e3a738390f69c14f5 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/135279137) * 0877d8ecdaf16b2529efc011de6585570ffa741c Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/135302579) * b64ed1deecad145deeeb05aa23b1bff77af9f1ed Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/137343176) * 290d9f7498478e04751d099eae7295ab99c3ddc5 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/149686285) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5330) * 40213c1684a264def3e489df1bd264fb8ad1871a Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/149715359) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5331) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #9703: [FLINK-14038]Add default GC options for flink on yarn to facilitate debugging
flinkbot edited a comment on issue #9703: [FLINK-14038]Add default GC options for flink on yarn to facilitate debugging URL: https://github.com/apache/flink/pull/9703#issuecomment-532581942 ## CI report: * 1b930d19f27909ad5e2759eb6c5471c2ce07e8b4 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/128133485) * 977ccb5d91869e37027069d8b2b490bf850253ed Travis: [CANCELED](https://travis-ci.com/flink-ci/flink/builds/129659424) * 8347093d4cb32ed752bc01f5cd98abb2d803df94 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/130842273) * 796de65585c861a67c46ba8c578e08302ade2cdc Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/133371242) * 5817aa535fb834889eebb96478b7a40f936fb3c3 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/134200223) * 040b9878337aa7b919f16d2cfb1c9bc590b31a7e Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/134418855) * 32120e401687204ef737ebe01875e293be71d720 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/134550254) * bb787cffdb56629b880c50edbf368fa81f11db58 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/134797060) * 0cd39929e456b8ab0a1d1c20dc0b05b29d92d8b0 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/135018920) * 560c6f43ae1b26dd81b360d5f32207c459824def Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/135182722) * d50ce6eec61b0825c8c3df7e3a738390f69c14f5 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/135279137) * 0877d8ecdaf16b2529efc011de6585570ffa741c Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/135302579) * b64ed1deecad145deeeb05aa23b1bff77af9f1ed Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/137343176) * 290d9f7498478e04751d099eae7295ab99c3ddc5 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/149686285) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5330) * 40213c1684a264def3e489df1bd264fb8ad1871a Travis: [PENDING](https://travis-ci.com/flink-ci/flink/builds/149715359) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #9703: [FLINK-14038]Add default GC options for flink on yarn to facilitate debugging
flinkbot edited a comment on issue #9703: [FLINK-14038]Add default GC options for flink on yarn to facilitate debugging URL: https://github.com/apache/flink/pull/9703#issuecomment-532581942 ## CI report: * 1b930d19f27909ad5e2759eb6c5471c2ce07e8b4 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/128133485) * 977ccb5d91869e37027069d8b2b490bf850253ed Travis: [CANCELED](https://travis-ci.com/flink-ci/flink/builds/129659424) * 8347093d4cb32ed752bc01f5cd98abb2d803df94 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/130842273) * 796de65585c861a67c46ba8c578e08302ade2cdc Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/133371242) * 5817aa535fb834889eebb96478b7a40f936fb3c3 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/134200223) * 040b9878337aa7b919f16d2cfb1c9bc590b31a7e Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/134418855) * 32120e401687204ef737ebe01875e293be71d720 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/134550254) * bb787cffdb56629b880c50edbf368fa81f11db58 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/134797060) * 0cd39929e456b8ab0a1d1c20dc0b05b29d92d8b0 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/135018920) * 560c6f43ae1b26dd81b360d5f32207c459824def Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/135182722) * d50ce6eec61b0825c8c3df7e3a738390f69c14f5 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/135279137) * 0877d8ecdaf16b2529efc011de6585570ffa741c Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/135302579) * b64ed1deecad145deeeb05aa23b1bff77af9f1ed Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/137343176) * 290d9f7498478e04751d099eae7295ab99c3ddc5 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/149686285) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5330) * 40213c1684a264def3e489df1bd264fb8ad1871a UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services