[jira] [Created] (FLINK-16254) Support -p/--parallelism option for StatefulFunctionsClusterEntryPoint
Tzu-Li (Gordon) Tai created FLINK-16254:
---
Summary: Support -p/--parallelism option for StatefulFunctionsClusterEntryPoint
Key: FLINK-16254
URL: https://issues.apache.org/jira/browse/FLINK-16254
Project: Flink
Issue Type: New Feature
Components: Stateful Functions
Affects Versions: statefun-1.1
Reporter: Tzu-Li (Gordon) Tai
Assignee: Tzu-Li (Gordon) Tai

Currently the only way for users to specify parallelism > 1 for Stateful Functions applications is to provide a value for {{parallelism.default}} via {{flink-conf.yaml}}. That is cumbersome, as users would essentially need to rebuild the Stateful Functions application image just to change the parallelism.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
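The issue above boils down to a simple precedence rule: a `-p`/`--parallelism` CLI option, when present, should override the `parallelism.default` value from `flink-conf.yaml`. A minimal Python sketch of that resolution logic (purely illustrative — the real entrypoint is Java, and `resolve_parallelism` is a hypothetical name, not Flink code):

```python
import argparse

def resolve_parallelism(argv, flink_conf):
    """Return the effective parallelism: the CLI flag wins over the config default."""
    parser = argparse.ArgumentParser(add_help=False)
    parser.add_argument("-p", "--parallelism", type=int, default=None)
    # parse_known_args ignores any other entrypoint arguments in argv
    args, _ = parser.parse_known_args(argv)
    if args.parallelism is not None:
        return args.parallelism
    # Fall back to the flink-conf.yaml value, defaulting to 1 like Flink does
    return flink_conf.get("parallelism.default", 1)
```

With such a rule in place, changing parallelism no longer requires rebuilding the application image — only the launch command changes.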
[GitHub] [flink] flinkbot commented on issue #11198: [FLINK-16248][python][ml] Add interfaces for MLEnvironment and MLEnvironmentFactory
flinkbot commented on issue #11198: [FLINK-16248][python][ml] Add interfaces for MLEnvironment and MLEnvironmentFactory
URL: https://github.com/apache/flink/pull/11198#issuecomment-590196380

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review.

## Automated Checks
Last check on commit 2683ef7e092d4026bad2307c4cbb204aa052be1d (Mon Feb 24 07:31:21 UTC 2020)

**Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

## Review Progress
* ❓ 1. The [description] looks good.
* ❓ 2. There is [consensus] that the contribution should go into Flink.
* ❓ 3. Needs [attention] from.
* ❓ 4. The change fits into the overall [architecture].
* ❓ 5. Overall code [quality] is good.

Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process. The bot is tracking the review progress through labels, which are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands
The @flinkbot bot supports the following commands:
- `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until `architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Assigned] (FLINK-16161) Statistics zero should be unknown in HiveCatalog
[ https://issues.apache.org/jira/browse/FLINK-16161?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jingsong Lee reassigned FLINK-16161:
Assignee: Jingsong Lee

> Statistics zero should be unknown in HiveCatalog
> Key: FLINK-16161
> URL: https://issues.apache.org/jira/browse/FLINK-16161
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Hive
> Reporter: Jingsong Lee
> Assignee: Jingsong Lee
> Priority: Blocker
> Fix For: 1.10.1
>
> Hive treats zero statistics as unknown, but Flink's HiveCatalog treats zero as a real value. This leads to wrong inputs for the CBO.
> Previously discussed in https://github.com/apache/flink/pull/10380
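The fix described in FLINK-16161 can be illustrated with a toy sketch (not the actual HiveCatalog code — names are illustrative): any statistic the Hive metastore reports as 0 is mapped to "unknown" (`None` here) instead of being handed to the cost-based optimizer as a real value.

```python
def normalize_stat(value):
    """Mirror Hive's convention: a statistic of 0 means 'unknown', not 'empty'."""
    return None if value == 0 else value

def normalize_stats(stats):
    """Apply the zero-means-unknown rule to a dict of catalog statistics."""
    return {name: normalize_stat(v) for name, v in stats.items()}
```

Passing `None` (unknown) instead of 0 lets the planner fall back to its defaults rather than assuming an empty table, which is what produced the wrong CBO inputs.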
[GitHub] [flink] hequn8128 commented on issue #11198: [FLINK-16248][python][ml] Add interfaces for MLEnvironment and MLEnvironmentFactory
hequn8128 commented on issue #11198: [FLINK-16248][python][ml] Add interfaces for MLEnvironment and MLEnvironmentFactory
URL: https://github.com/apache/flink/pull/11198#issuecomment-590192859

@becketqin @walterddr Would be great if you can also take a look.
[jira] [Updated] (FLINK-16248) Add interfaces for MLEnvironment and MLEnvironmentFactory
[ https://issues.apache.org/jira/browse/FLINK-16248?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-16248:
---
Labels: pull-request-available (was: )

> Add interfaces for MLEnvironment and MLEnvironmentFactory
> Key: FLINK-16248
> URL: https://issues.apache.org/jira/browse/FLINK-16248
> Project: Flink
> Issue Type: Sub-task
> Components: API / Python
> Reporter: Hequn Cheng
> Assignee: Hequn Cheng
> Priority: Major
> Labels: pull-request-available
>
> Align the interfaces for MLEnvironment and MLEnvironmentFactory, so Python users can use the Python MLEnvironmentFactory to maintain the execution environment and table environment.
[GitHub] [flink] hequn8128 opened a new pull request #11198: [FLINK-16248][python][ml] Add interfaces for MLEnvironment and MLEnvironmentFactory
hequn8128 opened a new pull request #11198: [FLINK-16248][python][ml] Add interfaces for MLEnvironment and MLEnvironmentFactory
URL: https://github.com/apache/flink/pull/11198

## What is the purpose of the change
Align the interfaces for MLEnvironment and MLEnvironmentFactory, so Python users can use the Python MLEnvironmentFactory to maintain the execution environment and table environment.

## Brief change log
- Add the MLEnvironment and MLEnvironmentFactory interfaces.
- Remove the is_blink_planner parameter from the TableEnvironment init method.
- Add tests.

## Verifying this change
This change added tests and can be verified as follows:
- Add MLEnvironmentTest to test the logic in MLEnvironment.
- Add MLEnvironmentFactoryTest to test the logic in MLEnvironmentFactory.

## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)

## Documentation
- Does this pull request introduce a new feature? (yes)
- If yes, how is the feature documented? (PythonDocs)
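The PR above keeps a registry of environments keyed by id so Python code can share one execution/table environment pair. A minimal Python sketch of that factory idea — method and attribute names here are illustrative guesses, not the actual PyFlink ML API:

```python
class MLEnvironment:
    """Bundles an execution environment and a table environment (stand-ins here)."""
    def __init__(self, exe_env=None, table_env=None):
        self.exe_env = exe_env
        self.table_env = table_env

class MLEnvironmentFactory:
    """Registry handing out MLEnvironments by integer id; id 0 is the default."""
    _envs = {0: MLEnvironment()}
    _next_id = 1

    @classmethod
    def get_default(cls):
        return cls._envs[0]

    @classmethod
    def register(cls, env):
        # Store the environment under a fresh id and return the id to the caller
        env_id = cls._next_id
        cls._envs[env_id] = env
        cls._next_id += 1
        return env_id

    @classmethod
    def get(cls, env_id):
        return cls._envs[env_id]
```

A factory like this lets algorithm code look environments up by id instead of threading environment objects through every call.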
[GitHub] [flink] PatrickRen commented on a change in pull request #11151: [FLINK-16128][docs]Translate "Google Cloud PubSub" page into Chinese
PatrickRen commented on a change in pull request #11151: [FLINK-16128][docs] Translate "Google Cloud PubSub" page into Chinese
URL: https://github.com/apache/flink/pull/11151#discussion_r383101507

## File path: docs/dev/connectors/pubsub.zh.md

@@ -23,9 +23,7 @@ specific language governing permissions and limitations under the License. -->
-This connector provides a Source and Sink that can read from and write to
-[Google Cloud PubSub](https://cloud.google.com/pubsub). To use this connector, add the
-following dependency to your project:
+这个连接器可向 [Google Cloud PubSub](https://cloud.google.com/pubsub) 读取与写入数据。添加下面的依赖来使用此连接器:

Review comment: "Source and Sink" is not shown in the translated sentence.
[GitHub] [flink] PatrickRen commented on a change in pull request #11151: [FLINK-16128][docs]Translate "Google Cloud PubSub" page into Chinese
PatrickRen commented on a change in pull request #11151: [FLINK-16128][docs] Translate "Google Cloud PubSub" page into Chinese
URL: https://github.com/apache/flink/pull/11151#discussion_r383103413

## File path: docs/dev/connectors/pubsub.zh.md

@@ -106,17 +97,18 @@ dataStream.addSink(pubsubSink);
### Google Credentials
-Google uses [Credentials](https://cloud.google.com/docs/authentication/production) to authenticate and authorize applications so that they can use Google Cloud Platform resources (such as PubSub).
+应用程序需要使用 [Credentials](https://cloud.google.com/docs/authentication/production) 来通过认证和授权才能使用 Google Cloud Platform 的资源，例如PubSub。

Review comment: 例如[space]PubSub
[GitHub] [flink] PatrickRen commented on a change in pull request #11151: [FLINK-16128][docs]Translate "Google Cloud PubSub" page into Chinese
PatrickRen commented on a change in pull request #11151: [FLINK-16128][docs] Translate "Google Cloud PubSub" page into Chinese
URL: https://github.com/apache/flink/pull/11151#discussion_r383102874

## File path: docs/dev/connectors/pubsub.zh.md

@@ -39,24 +37,17 @@ following dependency to your project:
Note: This connector has been added to Flink recently. It has not received widespread testing yet.
-Note that the streaming connectors are currently not part of the binary
-distribution. See
-[here]({{ site.baseurl }}/dev/projectsetup/dependencies.html)
-for information about how to package the program with the libraries for
-cluster execution.
+注意连接器目前还不是二进制发行版的一部分，添加依赖、打包配置以及集群运行信息请参考[这里]({{ site.baseurl }}/dev/projectsetup/dependencies.html)

## Consuming or Producing PubSubMessages
-The connector provides a connectors for receiving and sending messages from and to Google PubSub.
-Google PubSub has an `at-least-once` guarantee and as such the connector delivers the same guarantees.
+连接器可以接收和发送 Google PubSub 的信息。和 Google PubSub 一样，这个连接器能够保证`至少一次`的语义。

### PubSub SourceFunction
-The class `PubSubSource` has a builder to create PubSubsources: `PubSubSource.newBuilder(...)`
+`PubSubSource` 类的对象由构建类来构建: `PubSubSource.newBuilder(...)`
-There are several optional methods to alter how the PubSubSource is created, the bare minimum is to provide a Google project, Pubsub subscription and a way to deserialize the PubSubMessages.
+除了 Google project，Pubsub subscription 和反序列化 PubSubMessage的方法是必须的，你有多种可选的方法来创建 PubSubSource。

Review comment: I think it's better to be: 有多种可选的方法来创建 PubSubSource，但最低要求是要提供 Google Project、Pubsub 订阅和反序列化 PubSubMessages 的方法。
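The passage under review describes the contract of `PubSubSource.newBuilder(...)`: many settings are optional, but a Google project, a Pub/Sub subscription, and a way to deserialize the messages are the bare minimum. A language-neutral sketch of that builder contract in Python (illustrative only — the real connector API is Java, and these method names are assumptions modeled on the doc, not the connector itself):

```python
class PubSubSourceBuilder:
    """Builder with three required settings and validation at build() time."""
    def __init__(self):
        self._project = None
        self._subscription = None
        self._deserializer = None

    def with_project_name(self, name):
        self._project = name
        return self

    def with_subscription_name(self, name):
        self._subscription = name
        return self

    def with_deserialization_schema(self, fn):
        self._deserializer = fn
        return self

    def build(self):
        # Enforce the documented bare minimum before constructing the source
        required = [("project", self._project),
                    ("subscription", self._subscription),
                    ("deserializer", self._deserializer)]
        missing = [name for name, value in required if value is None]
        if missing:
            raise ValueError("missing required settings: " + ", ".join(missing))
        return {"project": self._project,
                "subscription": self._subscription,
                "deserializer": self._deserializer}
```

Failing fast in `build()` is what makes "several optional methods, three required settings" a safe API: a half-configured source can never be constructed.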
[GitHub] [flink] PatrickRen commented on a change in pull request #11151: [FLINK-16128][docs]Translate "Google Cloud PubSub" page into Chinese
PatrickRen commented on a change in pull request #11151: [FLINK-16128][docs] Translate "Google Cloud PubSub" page into Chinese
URL: https://github.com/apache/flink/pull/11151#discussion_r383103168

## File path: docs/dev/connectors/pubsub.zh.md

@@ -77,13 +68,13 @@ streamExecEnv.addSource(source);
-Currently the source functions [pulls](https://cloud.google.com/pubsub/docs/pull) messages from PubSub, [push endpoints](https://cloud.google.com/pubsub/docs/push) are not supported.
+当前还不支持 PubSub 的source functions [pulls](https://cloud.google.com/pubsub/docs/pull) messages和 [push endpoints](https://cloud.google.com/pubsub/docs/push)。

Review comment: PubSub 的[space]source functions ... messages[space]和...
[GitHub] [flink] PatrickRen commented on a change in pull request #11151: [FLINK-16128][docs]Translate "Google Cloud PubSub" page into Chinese
PatrickRen commented on a change in pull request #11151: [FLINK-16128][docs] Translate "Google Cloud PubSub" page into Chinese
URL: https://github.com/apache/flink/pull/11151#discussion_r383101821

## File path: docs/dev/connectors/pubsub.zh.md

@@ -39,24 +37,17 @@ following dependency to your project:
Note: This connector has been added to Flink recently. It has not received widespread testing yet.

Review comment: The note here is not translated.
[GitHub] [flink] PatrickRen commented on a change in pull request #11151: [FLINK-16128][docs]Translate "Google Cloud PubSub" page into Chinese
PatrickRen commented on a change in pull request #11151: [FLINK-16128][docs] Translate "Google Cloud PubSub" page into Chinese
URL: https://github.com/apache/flink/pull/11151#discussion_r383105282

## File path: docs/dev/connectors/pubsub.zh.md

@@ -143,22 +135,22 @@ env.addSource(pubsubSource)
-### Atleast once guarantee
+### 精确一次语义保证
SourceFunction
-There are several reasons why a message might be send multiple times, such as failure scenarios on Google PubSub's side.
+有很多原因导致会一个信息会被多次发出，例如 Google PubSub 的故障。
-Another reason is when the acknowledgement deadline has passed. This is the time between receiving the message and between acknowledging the message. The PubSubSource will only acknowledge a message on successful checkpoints to guarantee Atleast-Once. This does mean if the time between successful checkpoints is larger than the acknowledgment deadline of your subscription messages will most likely be processed multiple times.
+另一个可能的原因是超过了确认的截止时间。这是收到信息的间隔和信息确认的间隔。PubSubSource 只有在信息被成功快照之后才会确认以保证至少一次的语义。这意味着，如果你的快照间隔大于信息确认的截止时间，那么你订阅的信息很有可能会被多次处理。

Review comment: I doubt that the second "between" is added by mistake in the original English document:
> This is the time between receiving the message and between acknowledging the message.
If so, the translation would be: 另一个可能的原因是超过了确认的截止时间，即收到与确认信息之间的时间间隔。
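The at-least-once behaviour discussed in this thread can be sketched with a toy model: the source only acknowledges messages at checkpoints, so whenever the checkpoint interval exceeds the subscription's acknowledgement deadline, Pub/Sub redelivers and the same message is processed again. Purely illustrative — `deliveries` is a made-up helper, not connector code:

```python
def deliveries(checkpoint_interval, ack_deadline):
    """Count how often one message is processed before the checkpoint finally acks it.

    Assumes redelivery happens each time the ack deadline elapses before
    the next successful checkpoint (a simplification of real Pub/Sub timing).
    """
    count = 1   # the first delivery is processed immediately
    t = 0.0
    while t + ack_deadline < checkpoint_interval:
        t += ack_deadline   # deadline passes before the checkpoint...
        count += 1          # ...so Pub/Sub redelivers and we process again
    return count
```

This is why the docs recommend a checkpoint interval (much) lower than the acknowledgement deadline: with `checkpoint_interval < ack_deadline` the count stays at 1.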
[GitHub] [flink] PatrickRen commented on a change in pull request #11151: [FLINK-16128][docs]Translate "Google Cloud PubSub" page into Chinese
PatrickRen commented on a change in pull request #11151: [FLINK-16128][docs] Translate "Google Cloud PubSub" page into Chinese
URL: https://github.com/apache/flink/pull/11151#discussion_r383106065

## File path: docs/dev/connectors/pubsub.zh.md

@@ -143,22 +135,22 @@ env.addSource(pubsubSource)
-### Atleast once guarantee
+### 精确一次语义保证
SourceFunction
-There are several reasons why a message might be send multiple times, such as failure scenarios on Google PubSub's side.
+有很多原因导致会一个信息会被多次发出，例如 Google PubSub 的故障。
-Another reason is when the acknowledgement deadline has passed. This is the time between receiving the message and between acknowledging the message. The PubSubSource will only acknowledge a message on successful checkpoints to guarantee Atleast-Once. This does mean if the time between successful checkpoints is larger than the acknowledgment deadline of your subscription messages will most likely be processed multiple times.
+另一个可能的原因是超过了确认的截止时间。这是收到信息的间隔和信息确认的间隔。PubSubSource 只有在信息被成功快照之后才会确认以保证至少一次的语义。这意味着，如果你的快照间隔大于信息确认的截止时间，那么你订阅的信息很有可能会被多次处理。
-For this reason it's recommended to have a (much) lower checkpoint interval than acknowledgement deadline.
+因此，我们建议把快照的间隔设置得比信息确认截止时间更短。
-See [PubSub](https://cloud.google.com/pubsub/docs/subscriber) for details on how to increase the acknowledgment deadline of your subscription.
+参照 [PubSub](https://cloud.google.com/pubsub/docs/subscriber) 来增加信息确认截止时间。
-Note: The metric `PubSubMessagesProcessedNotAcked` shows how many messages are waiting for the next checkpoint before they will be acknowledged.
+注意: `PubSubMessagesProcessedNotAcked` 显示了有多少信息正在等待下一个快照还没被确认。

Review comment: I think we don't need to translate "checkpoint" according to the Flink Translation Specifications.
[GitHub] [flink] PatrickRen commented on a change in pull request #11151: [FLINK-16128][docs]Translate "Google Cloud PubSub" page into Chinese
PatrickRen commented on a change in pull request #11151: [FLINK-16128][docs]Translate "Google Cloud PubSub" page into Chinese URL: https://github.com/apache/flink/pull/11151#discussion_r383106631 ## File path: docs/dev/connectors/pubsub.zh.md ## @@ -143,22 +135,22 @@ env.addSource(pubsubSource) -### Atleast once guarantee +### 精确一次语义保证 SourceFunction -There are several reasons why a message might be send multiple times, such as failure scenarios on Google PubSub's side. +有很多原因导致会一个信息会被多次发出,例如 Google PubSub 的故障。 -Another reason is when the acknowledgement deadline has passed. This is the time between receiving the message and between acknowledging the message. The PubSubSource will only acknowledge a message on successful checkpoints to guarantee Atleast-Once. This does mean if the time between successful checkpoints is larger than the acknowledgment deadline of your subscription messages will most likely be processed multiple times. +另一个可能的原因是超过了确认的截止时间。这是收到信息的间隔和信息确认的间隔。PubSubSource 只有在信息被成功快照之后才会确认以保证至少一次的语义。这意味着,如果你的快照间隔大于信息确认的截止时间,那么你订阅的信息很有可能会被多次处理。 -For this reason it's recommended to have a (much) lower checkpoint interval than acknowledgement deadline. +因此,我们建议把快照的间隔设置得比信息确认截止时间更短。 -See [PubSub](https://cloud.google.com/pubsub/docs/subscriber) for details on how to increase the acknowledgment deadline of your subscription. +参照 [PubSub](https://cloud.google.com/pubsub/docs/subscriber) 来增加信息确认截止时间。 -Note: The metric `PubSubMessagesProcessedNotAcked` shows how many messages are waiting for the next checkpoint before they will be acknowledged. +注意: `PubSubMessagesProcessedNotAcked` 显示了有多少信息正在等待下一个快照还没被确认。 SinkFunction -The sink function buffers messages that are to be send to PubSub for a short amount of time for performance reasons. Before each checkpoint this buffer is flushed and the checkpoint will not succeed unless the messages have been delivered to PubSub. 
+Sink function 会把准备发到 PubSub 的信息短暂地缓存以提高性能。每次快照前，它会 flush 缓冲区，并且只有当所有信息成功发送到 PubSub 之后，快照才会成功完成。

Review comment: Same issue of "checkpoint" here. I think "flush" can be translated as "刷新".
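The sink behaviour quoted above — buffer messages for performance, flush the buffer before each checkpoint, and let the checkpoint succeed only once everything was delivered — can be sketched as follows. Illustrative only: `BufferingSink`, `invoke`, and `snapshot_state` are stand-in names modeled on the description, not the connector's actual classes.

```python
class BufferingSink:
    """Buffers outgoing messages and flushes them when a checkpoint is taken."""
    def __init__(self, publish):
        self._publish = publish   # callable that sends one message to the backend
        self._buffer = []

    def invoke(self, message):
        # Buffer instead of publishing immediately, for performance
        self._buffer.append(message)

    def snapshot_state(self):
        # Flush before the checkpoint completes; an exception raised here
        # would fail the checkpoint, preserving at-least-once delivery
        for m in self._buffer:
            self._publish(m)
        self._buffer.clear()
```

Tying the flush to `snapshot_state` is the key design point: a successful checkpoint then implies every buffered message reached Pub/Sub.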
[GitHub] [flink] PatrickRen commented on a change in pull request #11151: [FLINK-16128][docs]Translate "Google Cloud PubSub" page into Chinese
PatrickRen commented on a change in pull request #11151: [FLINK-16128][docs] Translate "Google Cloud PubSub" page into Chinese
URL: https://github.com/apache/flink/pull/11151#discussion_r383107003

## File path: docs/dev/connectors/pubsub.md

@@ -143,13 +143,13 @@ env.addSource(pubsubSource)
-### Atleast once guarantee
+### At least once guarantee
SourceFunction
There are several reasons why a message might be send multiple times, such as failure scenarios on Google PubSub's side.
-Another reason is when the acknowledgement deadline has passed. This is the time between receiving the message and between acknowledging the message. The PubSubSource will only acknowledge a message on successful checkpoints to guarantee Atleast-Once. This does mean if the time between successful checkpoints is larger than the acknowledgment deadline of your subscription messages will most likely be processed multiple times.
+Another reason is when the acknowledgement deadline has passed. This is the time between receiving the message and between acknowledging the message. The PubSubSource will only acknowledge a message on successful checkpoints to guarantee at-least-Once. This does mean if the time between successful checkpoints is larger than the acknowledgment deadline of your subscription messages will most likely be processed multiple times.

Review comment: I think using lower case for all three words is better: "at-least-once"
[GitHub] [flink] flinkbot edited a comment on issue #11112: [FLINK-16121][python] Introduce ArrowReader and ArrowWriter for Arrow format data read and write
flinkbot edited a comment on issue #11112: [FLINK-16121][python] Introduce ArrowReader and ArrowWriter for Arrow format data read and write
URL: https://github.com/apache/flink/pull/11112#issuecomment-587035354

## CI report:
* dba261b36b56aedec766097042dbe40858e5fc8c Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/149292329) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5252)
* fc594cbe4670795421281fd5d19cd5b3d8109a9e Travis: [PENDING](https://travis-ci.com/flink-ci/flink/builds/150247518)

Bot commands
The @flinkbot bot supports the following commands:
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] liupc commented on issue #9703: [FLINK-14038]Add default GC options for flink on yarn to facilitate debugging
liupc commented on issue #9703: [FLINK-14038] Add default GC options for flink on yarn to facilitate debugging
URL: https://github.com/apache/flink/pull/9703#issuecomment-590186785

Thanks @tillrohrmann, I think your concern is right, and the docs in #11165 look good!
[GitHub] [flink] flinkbot edited a comment on issue #11112: [FLINK-16121][python] Introduce ArrowReader and ArrowWriter for Arrow format data read and write
flinkbot edited a comment on issue #11112: [FLINK-16121][python] Introduce ArrowReader and ArrowWriter for Arrow format data read and write
URL: https://github.com/apache/flink/pull/11112#issuecomment-587035354

## CI report:
* dba261b36b56aedec766097042dbe40858e5fc8c Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/149292329) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5252)
* fc594cbe4670795421281fd5d19cd5b3d8109a9e UNKNOWN
[GitHub] [flink] zhijiangW commented on a change in pull request #11155: [FLINK-14818][benchmark] Fix receiving InputGate setup of StreamNetworkBenchmarkEnvironment.
zhijiangW commented on a change in pull request #11155: [FLINK-14818][benchmark] Fix receiving InputGate setup of StreamNetworkBenchmarkEnvironment. URL: https://github.com/apache/flink/pull/11155#discussion_r383102059 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java ## @@ -272,11 +287,161 @@ private static ShuffleDescriptor createShuffleDescriptor( ResultPartitionID resultPartitionID, ResourceID location, TaskManagerLocation senderLocation, - int channel) { + int connectionIndex) { final NettyShuffleDescriptorBuilder builder = NettyShuffleDescriptorBuilder.newBuilder() .setId(resultPartitionID) .setProducerInfoFromTaskManagerLocation(senderLocation) - .setConnectionIndex(channel); + .setConnectionIndex(connectionIndex); return localMode ? builder.setProducerLocation(location).buildLocal() : builder.buildRemote(); } + + /** +* A {@link SingleInputGateFactory} which replaces the default {@link RemoteInputChannel} and +* {@link LocalInputChannel} implementation with costume ones. 
+*/ + private static class TestSingleInputGateFactory extends SingleInputGateFactory { + + private final ResourceID taskExecutorResourceId; + private final int partitionRequestInitialBackoff; + private final int partitionRequestMaxBackoff; + private final ConnectionManager connectionManager; + private final ResultPartitionManager partitionManager; + private final TaskEventPublisher taskEventPublisher; + private final NetworkBufferPool networkBufferPool; + + public TestSingleInputGateFactory( + @Nonnull ResourceID taskExecutorResourceId, + @Nonnull NettyShuffleEnvironmentConfiguration networkConfig, + @Nonnull ConnectionManager connectionManager, + @Nonnull ResultPartitionManager partitionManager, + @Nonnull TaskEventPublisher taskEventPublisher, + @Nonnull NetworkBufferPool networkBufferPool) { + super( + taskExecutorResourceId, + networkConfig, + connectionManager, + partitionManager, + taskEventPublisher, + networkBufferPool); + this.networkBufferPool = networkBufferPool; + this.taskEventPublisher = taskEventPublisher; + this.partitionManager = partitionManager; + this.connectionManager = connectionManager; + this.partitionRequestMaxBackoff = networkConfig.partitionRequestMaxBackoff(); + this.partitionRequestInitialBackoff = networkConfig.partitionRequestInitialBackoff(); + this.taskExecutorResourceId = taskExecutorResourceId; + } + + @Override + protected InputChannel createKnownInputChannel( + SingleInputGate inputGate, + int index, + NettyShuffleDescriptor inputChannelDescriptor, + ChannelStatistics channelStatistics, + InputChannelMetrics metrics) { + ResultPartitionID partitionId = inputChannelDescriptor.getResultPartitionID(); + if (inputChannelDescriptor.isLocalTo(taskExecutorResourceId)) { + return new TestLocalInputChannel( + inputGate, + index, + partitionId, + partitionManager, + taskEventPublisher, + partitionRequestInitialBackoff, + partitionRequestMaxBackoff, + metrics); + } else { + return new TestRemoteInputChannel( + inputGate, + index, + 
partitionId, + inputChannelDescriptor.getConnectionId(), + connectionManager, + partitionRequestInitialBackoff, +
[GitHub] [flink] zhijiangW commented on a change in pull request #11155: [FLINK-14818][benchmark] Fix receiving InputGate setup of StreamNetworkBenchmarkEnvironment.
zhijiangW commented on a change in pull request #11155: [FLINK-14818][benchmark] Fix receiving InputGate setup of StreamNetworkBenchmarkEnvironment. URL: https://github.com/apache/flink/pull/11155#discussion_r383102071 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java ## @@ -272,11 +287,161 @@ private static ShuffleDescriptor createShuffleDescriptor( ResultPartitionID resultPartitionID, ResourceID location, TaskManagerLocation senderLocation, - int channel) { + int connectionIndex) { final NettyShuffleDescriptorBuilder builder = NettyShuffleDescriptorBuilder.newBuilder() .setId(resultPartitionID) .setProducerInfoFromTaskManagerLocation(senderLocation) - .setConnectionIndex(channel); + .setConnectionIndex(connectionIndex); return localMode ? builder.setProducerLocation(location).buildLocal() : builder.buildRemote(); } + + /** +* A {@link SingleInputGateFactory} which replaces the default {@link RemoteInputChannel} and +* {@link LocalInputChannel} implementation with costume ones. 
+*/ + private static class TestSingleInputGateFactory extends SingleInputGateFactory { + + private final ResourceID taskExecutorResourceId; + private final int partitionRequestInitialBackoff; + private final int partitionRequestMaxBackoff; + private final ConnectionManager connectionManager; + private final ResultPartitionManager partitionManager; + private final TaskEventPublisher taskEventPublisher; + private final NetworkBufferPool networkBufferPool; + + public TestSingleInputGateFactory( + @Nonnull ResourceID taskExecutorResourceId, + @Nonnull NettyShuffleEnvironmentConfiguration networkConfig, + @Nonnull ConnectionManager connectionManager, + @Nonnull ResultPartitionManager partitionManager, + @Nonnull TaskEventPublisher taskEventPublisher, + @Nonnull NetworkBufferPool networkBufferPool) { + super( + taskExecutorResourceId, + networkConfig, + connectionManager, + partitionManager, + taskEventPublisher, + networkBufferPool); + this.networkBufferPool = networkBufferPool; + this.taskEventPublisher = taskEventPublisher; + this.partitionManager = partitionManager; + this.connectionManager = connectionManager; + this.partitionRequestMaxBackoff = networkConfig.partitionRequestMaxBackoff(); + this.partitionRequestInitialBackoff = networkConfig.partitionRequestInitialBackoff(); + this.taskExecutorResourceId = taskExecutorResourceId; + } + + @Override + protected InputChannel createKnownInputChannel( + SingleInputGate inputGate, + int index, + NettyShuffleDescriptor inputChannelDescriptor, + ChannelStatistics channelStatistics, + InputChannelMetrics metrics) { + ResultPartitionID partitionId = inputChannelDescriptor.getResultPartitionID(); + if (inputChannelDescriptor.isLocalTo(taskExecutorResourceId)) { + return new TestLocalInputChannel( + inputGate, + index, + partitionId, + partitionManager, + taskEventPublisher, + partitionRequestInitialBackoff, + partitionRequestMaxBackoff, + metrics); + } else { + return new TestRemoteInputChannel( + inputGate, + index, + 
partitionId, + inputChannelDescriptor.getConnectionId(), + connectionManager, + partitionRequestInitialBackoff, +
[GitHub] [flink] dianfu commented on issue #11112: [FLINK-16121][python] Introduce ArrowReader and ArrowWriter for Arrow format data read and write
dianfu commented on issue #11112: [FLINK-16121][python] Introduce ArrowReader and ArrowWriter for Arrow format data read and write
URL: https://github.com/apache/flink/pull/11112#issuecomment-590184765

@hequn8128 Thanks a lot for your review. Have updated the PR accordingly.
[GitHub] [flink] dianfu commented on a change in pull request #11112: [FLINK-16121][python] Introduce ArrowReader and ArrowWriter for Arrow format data read and write
dianfu commented on a change in pull request #11112: [FLINK-16121][python] Introduce ArrowReader and ArrowWriter for Arrow format data read and write URL: https://github.com/apache/flink/pull/11112#discussion_r383100703 ## File path: flink-python/src/main/java/org/apache/flink/table/runtime/arrow/ArrowFieldReader.java ## @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.arrow; Review comment: I recall that we have reached consensus that we should use functions.python instead of python.functions. However, I'm also not sure which one is best. What about discussing whether we should improve the package name in a separate thread? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #11155: [FLINK-14818][benchmark] Fix receiving InputGate setup of StreamNetworkBenchmarkEnvironment.
zhijiangW commented on a change in pull request #11155: [FLINK-14818][benchmark] Fix receiving InputGate setup of StreamNetworkBenchmarkEnvironment. URL: https://github.com/apache/flink/pull/11155#discussion_r383090816 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java ## @@ -272,11 +287,161 @@ private static ShuffleDescriptor createShuffleDescriptor( ResultPartitionID resultPartitionID, ResourceID location, TaskManagerLocation senderLocation, - int channel) { + int connectionIndex) { final NettyShuffleDescriptorBuilder builder = NettyShuffleDescriptorBuilder.newBuilder() .setId(resultPartitionID) .setProducerInfoFromTaskManagerLocation(senderLocation) - .setConnectionIndex(channel); + .setConnectionIndex(connectionIndex); return localMode ? builder.setProducerLocation(location).buildLocal() : builder.buildRemote(); } + + /** +* A {@link SingleInputGateFactory} which replaces the default {@link RemoteInputChannel} and +* {@link LocalInputChannel} implementations with custom ones. 
+*/ + private static class TestSingleInputGateFactory extends SingleInputGateFactory { + + private final ResourceID taskExecutorResourceId; + private final int partitionRequestInitialBackoff; + private final int partitionRequestMaxBackoff; + private final ConnectionManager connectionManager; + private final ResultPartitionManager partitionManager; + private final TaskEventPublisher taskEventPublisher; + private final NetworkBufferPool networkBufferPool; + + public TestSingleInputGateFactory( + @Nonnull ResourceID taskExecutorResourceId, + @Nonnull NettyShuffleEnvironmentConfiguration networkConfig, + @Nonnull ConnectionManager connectionManager, + @Nonnull ResultPartitionManager partitionManager, + @Nonnull TaskEventPublisher taskEventPublisher, + @Nonnull NetworkBufferPool networkBufferPool) { + super( + taskExecutorResourceId, + networkConfig, + connectionManager, + partitionManager, + taskEventPublisher, + networkBufferPool); + this.networkBufferPool = networkBufferPool; + this.taskEventPublisher = taskEventPublisher; + this.partitionManager = partitionManager; + this.connectionManager = connectionManager; + this.partitionRequestMaxBackoff = networkConfig.partitionRequestMaxBackoff(); + this.partitionRequestInitialBackoff = networkConfig.partitionRequestInitialBackoff(); + this.taskExecutorResourceId = taskExecutorResourceId; + } + + @Override + protected InputChannel createKnownInputChannel( + SingleInputGate inputGate, + int index, + NettyShuffleDescriptor inputChannelDescriptor, + ChannelStatistics channelStatistics, + InputChannelMetrics metrics) { + ResultPartitionID partitionId = inputChannelDescriptor.getResultPartitionID(); + if (inputChannelDescriptor.isLocalTo(taskExecutorResourceId)) { + return new TestLocalInputChannel( + inputGate, + index, + partitionId, + partitionManager, + taskEventPublisher, + partitionRequestInitialBackoff, + partitionRequestMaxBackoff, + metrics); + } else { + return new TestRemoteInputChannel( + inputGate, + index, + 
partitionId, + inputChannelDescriptor.getConnectionId(), + connectionManager, + partitionRequestInitialBackoff, +
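The quoted TestSingleInputGateFactory relies on the factory-method pattern: the base factory exposes a protected creation hook, and a subclass overrides only that hook to substitute custom channel implementations while keeping the shared wiring logic. A minimal sketch of that pattern follows; all names here (ChannelFactory, Channel, and so on) are illustrative stand-ins, not Flink's actual API.

```java
/** Product type; the test variant overrides its behavior. */
class Channel {
    final int index;
    Channel(int index) { this.index = index; }
    String describe() { return "default:" + index; }
}

class ChannelFactory {
    /** Protected creation hook that subclasses may override. */
    protected Channel createChannel(int index) {
        return new Channel(index);
    }

    /** Shared wiring logic stays in the base class. */
    Channel build(int index) {
        return createChannel(index);
    }
}

/** Test/benchmark variant: swaps in a custom channel without copying build(). */
class TestChannelFactory extends ChannelFactory {
    @Override
    protected Channel createChannel(int index) {
        return new Channel(index) {
            @Override
            String describe() { return "test:" + index; }
        };
    }
}

public class FactoryOverrideSketch {
    public static void main(String[] args) {
        System.out.println(new ChannelFactory().build(0).describe());     // default:0
        System.out.println(new TestChannelFactory().build(0).describe()); // test:0
    }
}
```

This mirrors how createKnownInputChannel is overridden above to return TestLocalInputChannel or TestRemoteInputChannel while the rest of the gate setup is inherited unchanged.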
[GitHub] [flink] zhijiangW commented on a change in pull request #11155: [FLINK-14818][benchmark] Fix receiving InputGate setup of StreamNetworkBenchmarkEnvironment.
zhijiangW commented on a change in pull request #11155: [FLINK-14818][benchmark] Fix receiving InputGate setup of StreamNetworkBenchmarkEnvironment. URL: https://github.com/apache/flink/pull/11155#discussion_r383090260 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java ## @@ -272,11 +287,161 @@ private static ShuffleDescriptor createShuffleDescriptor( ResultPartitionID resultPartitionID, ResourceID location, TaskManagerLocation senderLocation, - int channel) { + int connectionIndex) { final NettyShuffleDescriptorBuilder builder = NettyShuffleDescriptorBuilder.newBuilder() .setId(resultPartitionID) .setProducerInfoFromTaskManagerLocation(senderLocation) - .setConnectionIndex(channel); + .setConnectionIndex(connectionIndex); return localMode ? builder.setProducerLocation(location).buildLocal() : builder.buildRemote(); } + + /** +* A {@link SingleInputGateFactory} which replaces the default {@link RemoteInputChannel} and +* {@link LocalInputChannel} implementations with custom ones. 
+*/ + private static class TestSingleInputGateFactory extends SingleInputGateFactory { + + private final ResourceID taskExecutorResourceId; + private final int partitionRequestInitialBackoff; + private final int partitionRequestMaxBackoff; + private final ConnectionManager connectionManager; + private final ResultPartitionManager partitionManager; + private final TaskEventPublisher taskEventPublisher; + private final NetworkBufferPool networkBufferPool; + + public TestSingleInputGateFactory( + @Nonnull ResourceID taskExecutorResourceId, + @Nonnull NettyShuffleEnvironmentConfiguration networkConfig, + @Nonnull ConnectionManager connectionManager, + @Nonnull ResultPartitionManager partitionManager, + @Nonnull TaskEventPublisher taskEventPublisher, + @Nonnull NetworkBufferPool networkBufferPool) { + super( + taskExecutorResourceId, + networkConfig, + connectionManager, + partitionManager, + taskEventPublisher, + networkBufferPool); + this.networkBufferPool = networkBufferPool; + this.taskEventPublisher = taskEventPublisher; + this.partitionManager = partitionManager; + this.connectionManager = connectionManager; + this.partitionRequestMaxBackoff = networkConfig.partitionRequestMaxBackoff(); + this.partitionRequestInitialBackoff = networkConfig.partitionRequestInitialBackoff(); + this.taskExecutorResourceId = taskExecutorResourceId; + } + + @Override + protected InputChannel createKnownInputChannel( + SingleInputGate inputGate, + int index, + NettyShuffleDescriptor inputChannelDescriptor, + ChannelStatistics channelStatistics, + InputChannelMetrics metrics) { + ResultPartitionID partitionId = inputChannelDescriptor.getResultPartitionID(); + if (inputChannelDescriptor.isLocalTo(taskExecutorResourceId)) { + return new TestLocalInputChannel( + inputGate, + index, + partitionId, + partitionManager, + taskEventPublisher, + partitionRequestInitialBackoff, + partitionRequestMaxBackoff, + metrics); + } else { + return new TestRemoteInputChannel( + inputGate, + index, + 
partitionId, + inputChannelDescriptor.getConnectionId(), + connectionManager, + partitionRequestInitialBackoff, +
[GitHub] [flink] zhijiangW commented on a change in pull request #11155: [FLINK-14818][benchmark] Fix receiving InputGate setup of StreamNetworkBenchmarkEnvironment.
zhijiangW commented on a change in pull request #11155: [FLINK-14818][benchmark] Fix receiving InputGate setup of StreamNetworkBenchmarkEnvironment. URL: https://github.com/apache/flink/pull/11155#discussion_r383090336 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/SingleInputGateBenchmarkFactory.java ## @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.runtime.io.benchmark; + +import org.apache.flink.core.memory.MemorySegmentProvider; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.io.network.ConnectionID; +import org.apache.flink.runtime.io.network.ConnectionManager; +import org.apache.flink.runtime.io.network.TaskEventPublisher; +import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; +import org.apache.flink.runtime.io.network.metrics.InputChannelMetrics; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; +import org.apache.flink.runtime.io.network.partition.consumer.InputChannel; +import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel; +import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel; +import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; +import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateFactory; +import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor; +import org.apache.flink.runtime.taskmanager.NettyShuffleEnvironmentConfiguration; + +import java.io.IOException; + +/** + * A benchmark-specific input gate factory which overrides the respective methods of creating + * {@link RemoteInputChannel} and {@link LocalInputChannel} for requesting specific subpartitions. 
+ */ +public class SingleInputGateBenchmarkFactory extends SingleInputGateFactory { + + public SingleInputGateBenchmarkFactory( + ResourceID taskExecutorResourceId, + NettyShuffleEnvironmentConfiguration networkConfig, + ConnectionManager connectionManager, + ResultPartitionManager partitionManager, + TaskEventPublisher taskEventPublisher, + NetworkBufferPool networkBufferPool) { + super( + taskExecutorResourceId, + networkConfig, + connectionManager, + partitionManager, + taskEventPublisher, + networkBufferPool); + } + + @Override + protected InputChannel createKnownInputChannel( + SingleInputGate inputGate, + int index, + NettyShuffleDescriptor inputChannelDescriptor, + SingleInputGateFactory.ChannelStatistics channelStatistics, + InputChannelMetrics metrics) { + ResultPartitionID partitionId = inputChannelDescriptor.getResultPartitionID(); + if (inputChannelDescriptor.isLocalTo(taskExecutorResourceId)) { + return new TestLocalInputChannel( + inputGate, + index, + partitionId, + partitionManager, + taskEventPublisher, + partitionRequestInitialBackoff, + partitionRequestMaxBackoff, + metrics); + } else { + return new TestRemoteInputChannel( + inputGate, + index, + partitionId, + inputChannelDescriptor.getConnectionId(), + connectionManager, + partitionRequestInitialBackoff, + partitionRequestMaxBackoff, +
[GitHub] [flink] zhijiangW commented on a change in pull request #11155: [FLINK-14818][benchmark] Fix receiving InputGate setup of StreamNetworkBenchmarkEnvironment.
zhijiangW commented on a change in pull request #11155: [FLINK-14818][benchmark] Fix receiving InputGate setup of StreamNetworkBenchmarkEnvironment. URL: https://github.com/apache/flink/pull/11155#discussion_r383094730 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/SingleInputGateBenchmarkFactory.java ## @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.runtime.io.benchmark; + +import org.apache.flink.core.memory.MemorySegmentProvider; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.io.network.ConnectionID; +import org.apache.flink.runtime.io.network.ConnectionManager; +import org.apache.flink.runtime.io.network.TaskEventPublisher; +import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; +import org.apache.flink.runtime.io.network.metrics.InputChannelMetrics; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; +import org.apache.flink.runtime.io.network.partition.consumer.InputChannel; +import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel; +import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel; +import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; +import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateFactory; +import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor; +import org.apache.flink.runtime.taskmanager.NettyShuffleEnvironmentConfiguration; + +import java.io.IOException; + +/** + * A benchmark-specific input gate factory which overrides the respective methods of creating + * {@link RemoteInputChannel} and {@link LocalInputChannel} for requesting specific subpartitions. 
+ */ +public class SingleInputGateBenchmarkFactory extends SingleInputGateFactory { + + public SingleInputGateBenchmarkFactory( + ResourceID taskExecutorResourceId, + NettyShuffleEnvironmentConfiguration networkConfig, + ConnectionManager connectionManager, + ResultPartitionManager partitionManager, + TaskEventPublisher taskEventPublisher, + NetworkBufferPool networkBufferPool) { + super( + taskExecutorResourceId, + networkConfig, + connectionManager, + partitionManager, + taskEventPublisher, + networkBufferPool); + } + + @Override + protected InputChannel createKnownInputChannel( + SingleInputGate inputGate, + int index, + NettyShuffleDescriptor inputChannelDescriptor, + SingleInputGateFactory.ChannelStatistics channelStatistics, + InputChannelMetrics metrics) { + ResultPartitionID partitionId = inputChannelDescriptor.getResultPartitionID(); + if (inputChannelDescriptor.isLocalTo(taskExecutorResourceId)) { + return new TestLocalInputChannel( + inputGate, + index, + partitionId, + partitionManager, + taskEventPublisher, + partitionRequestInitialBackoff, + partitionRequestMaxBackoff, + metrics); + } else { + return new TestRemoteInputChannel( + inputGate, + index, + partitionId, + inputChannelDescriptor.getConnectionId(), + connectionManager, + partitionRequestInitialBackoff, + partitionRequestMaxBackoff, +
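Note that SingleInputGateBenchmarkFactory above, unlike the earlier TestSingleInputGateFactory, does not re-store taskExecutorResourceId, partitionManager, or the backoff values: its createKnownInputChannel reads them directly, which suggests they are accessible (e.g. protected) on the superclass. A hypothetical sketch of that design, with illustrative names rather than Flink's actual API, shows why this avoids duplicated state that could drift out of sync:

```java
/** Base factory exposing its configuration to subclasses via protected fields. */
class BaseGateFactory {
    protected final int partitionRequestInitialBackoff;
    protected final int partitionRequestMaxBackoff;

    BaseGateFactory(int initialBackoff, int maxBackoff) {
        this.partitionRequestInitialBackoff = initialBackoff;
        this.partitionRequestMaxBackoff = maxBackoff;
    }
}

/** Benchmark variant: no duplicated fields, inherited state is used as-is. */
class BenchmarkGateFactory extends BaseGateFactory {
    BenchmarkGateFactory(int initialBackoff, int maxBackoff) {
        super(initialBackoff, maxBackoff);
    }

    int backoffRange() {
        // Reads the inherited protected fields directly instead of copies.
        return partitionRequestMaxBackoff - partitionRequestInitialBackoff;
    }
}

public class ProtectedStateSketch {
    public static void main(String[] args) {
        System.out.println(new BenchmarkGateFactory(100, 10_000).backoffRange()); // 9900
    }
}
```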
[GitHub] [flink] zhijiangW commented on a change in pull request #11155: [FLINK-14818][benchmark] Fix receiving InputGate setup of StreamNetworkBenchmarkEnvironment.
zhijiangW commented on a change in pull request #11155: [FLINK-14818][benchmark] Fix receiving InputGate setup of StreamNetworkBenchmarkEnvironment. URL: https://github.com/apache/flink/pull/11155#discussion_r383094754 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/SingleInputGateBenchmarkFactory.java ## @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.runtime.io.benchmark; + +import org.apache.flink.core.memory.MemorySegmentProvider; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.io.network.ConnectionID; +import org.apache.flink.runtime.io.network.ConnectionManager; +import org.apache.flink.runtime.io.network.TaskEventPublisher; +import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; +import org.apache.flink.runtime.io.network.metrics.InputChannelMetrics; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; +import org.apache.flink.runtime.io.network.partition.consumer.InputChannel; +import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel; +import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel; +import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; +import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateFactory; +import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor; +import org.apache.flink.runtime.taskmanager.NettyShuffleEnvironmentConfiguration; + +import java.io.IOException; + +/** + * A benchmark-specific input gate factory which overrides the respective methods of creating + * {@link RemoteInputChannel} and {@link LocalInputChannel} for requesting specific subpartitions. 
+ */ +public class SingleInputGateBenchmarkFactory extends SingleInputGateFactory { + + public SingleInputGateBenchmarkFactory( + ResourceID taskExecutorResourceId, + NettyShuffleEnvironmentConfiguration networkConfig, + ConnectionManager connectionManager, + ResultPartitionManager partitionManager, + TaskEventPublisher taskEventPublisher, + NetworkBufferPool networkBufferPool) { + super( + taskExecutorResourceId, + networkConfig, + connectionManager, + partitionManager, + taskEventPublisher, + networkBufferPool); + } + + @Override + protected InputChannel createKnownInputChannel( + SingleInputGate inputGate, + int index, + NettyShuffleDescriptor inputChannelDescriptor, + SingleInputGateFactory.ChannelStatistics channelStatistics, + InputChannelMetrics metrics) { + ResultPartitionID partitionId = inputChannelDescriptor.getResultPartitionID(); + if (inputChannelDescriptor.isLocalTo(taskExecutorResourceId)) { + return new TestLocalInputChannel( + inputGate, + index, + partitionId, + partitionManager, + taskEventPublisher, + partitionRequestInitialBackoff, + partitionRequestMaxBackoff, + metrics); + } else { + return new TestRemoteInputChannel( + inputGate, + index, + partitionId, + inputChannelDescriptor.getConnectionId(), + connectionManager, + partitionRequestInitialBackoff, + partitionRequestMaxBackoff, +
[GitHub] [flink] zhijiangW commented on a change in pull request #11155: [FLINK-14818][benchmark] Fix receiving InputGate setup of StreamNetworkBenchmarkEnvironment.
zhijiangW commented on a change in pull request #11155: [FLINK-14818][benchmark] Fix receiving InputGate setup of StreamNetworkBenchmarkEnvironment. URL: https://github.com/apache/flink/pull/11155#discussion_r383088089 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java ## @@ -272,11 +287,161 @@ private static ShuffleDescriptor createShuffleDescriptor( ResultPartitionID resultPartitionID, ResourceID location, TaskManagerLocation senderLocation, - int channel) { + int connectionIndex) { final NettyShuffleDescriptorBuilder builder = NettyShuffleDescriptorBuilder.newBuilder() .setId(resultPartitionID) .setProducerInfoFromTaskManagerLocation(senderLocation) - .setConnectionIndex(channel); + .setConnectionIndex(connectionIndex); return localMode ? builder.setProducerLocation(location).buildLocal() : builder.buildRemote(); } + + /** +* A {@link SingleInputGateFactory} which replaces the default {@link RemoteInputChannel} and +* {@link LocalInputChannel} implementations with custom ones. 
+*/ + private static class TestSingleInputGateFactory extends SingleInputGateFactory { + + private final ResourceID taskExecutorResourceId; + private final int partitionRequestInitialBackoff; + private final int partitionRequestMaxBackoff; + private final ConnectionManager connectionManager; + private final ResultPartitionManager partitionManager; + private final TaskEventPublisher taskEventPublisher; + private final NetworkBufferPool networkBufferPool; + + public TestSingleInputGateFactory( + @Nonnull ResourceID taskExecutorResourceId, + @Nonnull NettyShuffleEnvironmentConfiguration networkConfig, + @Nonnull ConnectionManager connectionManager, + @Nonnull ResultPartitionManager partitionManager, + @Nonnull TaskEventPublisher taskEventPublisher, + @Nonnull NetworkBufferPool networkBufferPool) { + super( + taskExecutorResourceId, + networkConfig, + connectionManager, + partitionManager, + taskEventPublisher, + networkBufferPool); + this.networkBufferPool = networkBufferPool; + this.taskEventPublisher = taskEventPublisher; + this.partitionManager = partitionManager; + this.connectionManager = connectionManager; + this.partitionRequestMaxBackoff = networkConfig.partitionRequestMaxBackoff(); + this.partitionRequestInitialBackoff = networkConfig.partitionRequestInitialBackoff(); + this.taskExecutorResourceId = taskExecutorResourceId; + } + + @Override + protected InputChannel createKnownInputChannel( + SingleInputGate inputGate, + int index, + NettyShuffleDescriptor inputChannelDescriptor, + ChannelStatistics channelStatistics, + InputChannelMetrics metrics) { + ResultPartitionID partitionId = inputChannelDescriptor.getResultPartitionID(); + if (inputChannelDescriptor.isLocalTo(taskExecutorResourceId)) { + return new TestLocalInputChannel( + inputGate, + index, + partitionId, + partitionManager, + taskEventPublisher, + partitionRequestInitialBackoff, + partitionRequestMaxBackoff, + metrics); + } else { + return new TestRemoteInputChannel( + inputGate, + index, + 
partitionId, + inputChannelDescriptor.getConnectionId(), + connectionManager, + partitionRequestInitialBackoff, +
[GitHub] [flink] zhijiangW commented on issue #11155: [FLINK-14818][benchmark] Fix receiving InputGate setup of StreamNetworkBenchmarkEnvironment.
zhijiangW commented on issue #11155: [FLINK-14818][benchmark] Fix receiving InputGate setup of StreamNetworkBenchmarkEnvironment. URL: https://github.com/apache/flink/pull/11155#issuecomment-590170854 Thanks for the updates @wsry; it generally looks good to me now, I just left two tiny comments. As for the previous main concern, I left some suggestions [here](https://github.com/apache/flink/pull/11155#discussion_r383088089). After you provide the micro-benchmark results, I will also run them on our benchmark machine to confirm the numbers. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #11155: [FLINK-14818][benchmark] Fix receiving InputGate setup of StreamNetworkBenchmarkEnvironment.
zhijiangW commented on a change in pull request #11155: [FLINK-14818][benchmark] Fix receiving InputGate setup of StreamNetworkBenchmarkEnvironment. URL: https://github.com/apache/flink/pull/11155#discussion_r383088311 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/SingleInputGateBenchmarkFactory.java ## @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.runtime.io.benchmark; + +import org.apache.flink.core.memory.MemorySegmentProvider; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.io.network.ConnectionID; +import org.apache.flink.runtime.io.network.ConnectionManager; +import org.apache.flink.runtime.io.network.TaskEventPublisher; +import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; +import org.apache.flink.runtime.io.network.metrics.InputChannelMetrics; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; +import org.apache.flink.runtime.io.network.partition.consumer.InputChannel; +import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel; +import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel; +import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; +import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateFactory; +import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor; +import org.apache.flink.runtime.taskmanager.NettyShuffleEnvironmentConfiguration; + +import java.io.IOException; + +/** + * A benchmark-specific input gate factory which overrides the respective methods of creating + * {@link RemoteInputChannel} and {@link LocalInputChannel} for requesting specific subpartitions. 
+ */ +public class SingleInputGateBenchmarkFactory extends SingleInputGateFactory { + + public SingleInputGateBenchmarkFactory( + ResourceID taskExecutorResourceId, + NettyShuffleEnvironmentConfiguration networkConfig, + ConnectionManager connectionManager, + ResultPartitionManager partitionManager, + TaskEventPublisher taskEventPublisher, + NetworkBufferPool networkBufferPool) { + super( + taskExecutorResourceId, + networkConfig, + connectionManager, + partitionManager, + taskEventPublisher, + networkBufferPool); + } + + @Override + protected InputChannel createKnownInputChannel( + SingleInputGate inputGate, + int index, + NettyShuffleDescriptor inputChannelDescriptor, + SingleInputGateFactory.ChannelStatistics channelStatistics, + InputChannelMetrics metrics) { + ResultPartitionID partitionId = inputChannelDescriptor.getResultPartitionID(); + if (inputChannelDescriptor.isLocalTo(taskExecutorResourceId)) { + return new TestLocalInputChannel( + inputGate, + index, + partitionId, + partitionManager, + taskEventPublisher, + partitionRequestInitialBackoff, + partitionRequestMaxBackoff, + metrics); + } else { + return new TestRemoteInputChannel( + inputGate, + index, + partitionId, + inputChannelDescriptor.getConnectionId(), + connectionManager, + partitionRequestInitialBackoff, + partitionRequestMaxBackoff, +
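The factory-method override in the diff above can be reduced to a small self-contained sketch. All names below are illustrative stand-ins, not the actual Flink types: a base factory exposes a protected creation hook behind a public entry point (as `SingleInputGateFactory` does with `createKnownInputChannel`), and a benchmark-specific subclass overrides only that hook to substitute test channels without duplicating the rest of the factory's wiring.

```java
// Hypothetical sketch of the override pattern used by SingleInputGateBenchmarkFactory.
interface Channel {
    String describe();
}

class ChannelFactory {
    // Public entry point; delegates channel construction to the protected hook.
    public Channel create(boolean isLocalToThisTaskExecutor) {
        return createKnownChannel(isLocalToThisTaskExecutor);
    }

    // Protected hook: subclasses can swap in alternative channel implementations.
    protected Channel createKnownChannel(boolean isLocal) {
        return () -> isLocal ? "local" : "remote";
    }
}

class BenchmarkChannelFactory extends ChannelFactory {
    @Override
    protected Channel createKnownChannel(boolean isLocal) {
        // Mirrors the TestLocalInputChannel / TestRemoteInputChannel substitution
        // in the diff above.
        return () -> isLocal ? "test-local" : "test-remote";
    }
}

public class FactoryOverrideSketch {
    public static void main(String[] args) {
        ChannelFactory factory = new BenchmarkChannelFactory();
        System.out.println(factory.create(true).describe());  // prints "test-local"
        System.out.println(factory.create(false).describe()); // prints "test-remote"
    }
}
```

The benefit for a benchmark is that locality detection, backoff configuration, and the rest of the construction logic stay in the base class; only channel creation is replaced.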
[GitHub] [flink] Jiayi-Liao commented on a change in pull request #11179: [FLINK-16178][FLINK-16192][checkpointing] Clean up checkpoint metadata code and remove remaining bits of "legacy state"
Jiayi-Liao commented on a change in pull request #11179: [FLINK-16178][FLINK-16192][checkpointing] Clean up checkpoint metadata code and remove remaining bits of "legacy state" URL: https://github.com/apache/flink/pull/11179#discussion_r383065779 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializer.java ## @@ -52,6 +37,5 @@ * @return The deserialized savepoint * @throws IOException Serialization failures are forwarded */ - T deserialize(DataInputStream dis, ClassLoader userCodeClassLoader) throws IOException; - + SavepointV2 deserialize(DataInputStream dis, ClassLoader userCodeClassLoader) throws IOException; Review comment: You're right. I was misguided by the previous interface. Would it be better if we change the function name to "deserializeToLatestVersion" ? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #11155: [FLINK-14818][benchmark] Fix receiving InputGate setup of StreamNetworkBenchmarkEnvironment.
zhijiangW commented on a change in pull request #11155: [FLINK-14818][benchmark] Fix receiving InputGate setup of StreamNetworkBenchmarkEnvironment. URL: https://github.com/apache/flink/pull/11155#discussion_r383081286 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java ## @@ -158,6 +157,10 @@ public void tearDown() { suppressExceptions(receiverEnv::close); } + /** +* Note: It should be guaranteed that {@link #createResultPartitionWriter(int)} has been called before +* creating the receiver. Review comment: nit: might supplement the reason why we should guarantee this. E.g. `Otherwise it might cause unexpected behaviors when {@link PartitionNotFoundException} happens in {@link TestRemoteInputChannel}`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Assigned] (FLINK-15289) Run sql appear error of "Zero-length character strings have no serializable string representation".
[ https://issues.apache.org/jira/browse/FLINK-15289?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee reassigned FLINK-15289: Assignee: (was: Jingsong Lee) > Run sql appear error of "Zero-length character strings have no serializable > string representation". > --- > > Key: FLINK-15289 > URL: https://issues.apache.org/jira/browse/FLINK-15289 > Project: Flink > Issue Type: Bug > Components: Table SQL / Client >Affects Versions: 1.10.0 >Reporter: xiaojin.wy >Priority: Major > Fix For: 1.11.0 > > > *The sql is:* > CREATE TABLE `INT8_TBL` ( > q1 BIGINT, > q2 BIGINT > ) WITH ( > 'format.field-delimiter'=',', > 'connector.type'='filesystem', > 'format.derive-schema'='true', > > 'connector.path'='/defender_test_data/daily_regression_batch_postgres_1.10/test_bigint/sources/INT8_TBL.csv', > 'format.type'='csv' > ); > SELECT '' AS five, q1 AS plus, -q1 AS xm FROM INT8_TBL; > *The error detail is :* > 2019-12-17 15:35:07,026 ERROR org.apache.flink.table.client.SqlClient - SQL > Client must stop. Unexpected exception. This is a bug. Please consider filing > an issue. > org.apache.flink.table.api.TableException: Zero-length character strings > have no serializable string representation. 
> at > org.apache.flink.table.types.logical.CharType.asSerializableString(CharType.java:116) > at > org.apache.flink.table.descriptors.DescriptorProperties.putTableSchema(DescriptorProperties.java:218) > at > org.apache.flink.table.catalog.CatalogTableImpl.toProperties(CatalogTableImpl.java:75) > at > org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSink(TableFactoryUtil.java:85) > at > org.apache.flink.table.client.gateway.local.LocalExecutor.executeQueryAndPersistInternal(LocalExecutor.java:688) > at > org.apache.flink.table.client.gateway.local.LocalExecutor.executeQueryAndPersist(LocalExecutor.java:488) > at org.apache.flink.table.client.cli.CliClient.callSelect(CliClient.java:601) > at > org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:385) > at java.util.Optional.ifPresent(Optional.java:159) > at > org.apache.flink.table.client.cli.CliClient.submitSQLFile(CliClient.java:271) > at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:125) > at org.apache.flink.table.client.SqlClient.start(SqlClient.java:104) > at org.apache.flink.table.client.SqlClient.main(SqlClient.java:180) > *The input data is:* > 123,456 > 123,4567890123456789 > 4567890123456789,123 > 4567890123456789,4567890123456789 > 4567890123456789,-4567890123456789 -- This message was sent by Atlassian Jira (v8.3.4#803005)
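The stack trace above comes down to a type-serialization rule that can be sketched in a few lines. This is a hedged illustration, not the actual `CharType` implementation: an empty string literal gets a zero-length character type, but SQL's `CHAR(n)` requires `n >= 1`, so there is no DDL string that round-trips the type, and the conversion of the schema to properties for the sink factory fails.

```java
public class CharTypeSketch {
    // Illustrative stand-in for CharType#asSerializableString: CHAR(n) is only
    // expressible in DDL for n >= 1, so a zero-length type cannot be serialized.
    static String asSerializableString(int length) {
        if (length < 1) {
            throw new IllegalStateException(
                "Zero-length character strings have no serializable string representation.");
        }
        return "CHAR(" + length + ")";
    }

    public static void main(String[] args) {
        System.out.println(asSerializableString(5)); // prints "CHAR(5)"
        try {
            asSerializableString(0); // the `SELECT '' AS five` case from the report
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```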
[jira] [Assigned] (FLINK-14257) Integrate csv to FileSystemTableFactory
[ https://issues.apache.org/jira/browse/FLINK-14257?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee reassigned FLINK-14257: Assignee: (was: Jingsong Lee) > Integrate csv to FileSystemTableFactory > --- > > Key: FLINK-14257 > URL: https://issues.apache.org/jira/browse/FLINK-14257 > Project: Flink > Issue Type: Sub-task > Components: Connectors / FileSystem >Reporter: Jingsong Lee >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (FLINK-15397) Streaming and batch has different value in the case of count function
[ https://issues.apache.org/jira/browse/FLINK-15397?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee reassigned FLINK-15397: Assignee: (was: Jingsong Lee) > Streaming and batch has different value in the case of count function > - > > Key: FLINK-15397 > URL: https://issues.apache.org/jira/browse/FLINK-15397 > Project: Flink > Issue Type: Bug > Components: Table SQL / Client >Affects Versions: 1.10.0 >Reporter: xiaojin.wy >Priority: Major > Fix For: 1.11.0 > > > *The sql is:* > CREATE TABLE `testdata` ( > a INT, > b INT > ) WITH ( > > 'connector.path'='/defender_test_data/daily_regression_batch_spark_1.10/test_group_agg/sources/testdata.csv', > 'format.empty-column-as-null'='true', > 'format.field-delimiter'='|', > 'connector.type'='filesystem', > 'format.derive-schema'='true', > 'format.type'='csv' > ); > SELECT COUNT(1) FROM testdata WHERE false; > If the configuration's type is batch, the result will be 0, but if the > configuration is streaming, there will be no value. > *The configuration is:* > execution: > planner: blink > type: streaming > *The input data is:* > {code:java} > 1|1 > 1|2 > 2|1 > 2|2 > 3|1 > 3|2 > |1 > 3| > | > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
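The behavioral difference reported above can be illustrated with a small self-contained sketch in plain Java (not Flink API): a batch-style global aggregate is evaluated once over the whole input and therefore always produces exactly one result row, even when the predicate matches nothing, while a streaming-style aggregate only emits an updated result when a matching record actually arrives, so `WHERE false` yields no output at all.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

public class CountSemanticsSketch {
    // Batch-style: the aggregate runs once over the (filtered) input,
    // so an empty input still yields a single result row: 0.
    static long batchCount(List<Integer> input, Predicate<Integer> where) {
        return input.stream().filter(where).count();
    }

    // Streaming-style (simplified): the aggregate only emits when a matching
    // record arrives; if nothing matches, nothing is ever emitted.
    static Optional<Long> streamingCount(List<Integer> input, Predicate<Integer> where) {
        long count = 0;
        Optional<Long> lastEmitted = Optional.empty();
        for (Integer record : input) {
            if (where.test(record)) {
                count++;
                lastEmitted = Optional.of(count);
            }
        }
        return lastEmitted;
    }

    public static void main(String[] args) {
        List<Integer> rows = List.of(1, 2, 3);
        Predicate<Integer> alwaysFalse = x -> false; // like WHERE false
        System.out.println(batchCount(rows, alwaysFalse));     // prints 0
        System.out.println(streamingCount(rows, alwaysFalse)); // prints Optional.empty
    }
}
```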
[jira] [Closed] (FLINK-15407) Add document to explain how to write a table with PK
[ https://issues.apache.org/jira/browse/FLINK-15407?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee closed FLINK-15407. Resolution: Won't Fix > Add document to explain how to write a table with PK > > > Key: FLINK-15407 > URL: https://issues.apache.org/jira/browse/FLINK-15407 > Project: Flink > Issue Type: Bug > Components: Documentation, Table SQL / API >Reporter: Jingsong Lee >Assignee: Jingsong Lee >Priority: Major > Fix For: 1.11.0 > > > I have had several user problems: > Why is an error reported when writing to the upsert sink: TableException: > UpsertStreamTableSink requires that Table has a full primary keys if it is > updated. > Users are confused. > I think we can consider writing a document to describe it. > Users need to be careful, e.g.: > > {code:java} > insert into result_table select pk1, if(pk2 is null, '', pk2) as pk2, > count(*), sum(f3) from source group by pk1, pk2; {code} > This will fail. > > {code:java} > insert into result_table select pk1, pk2, count(*), sum(f1) from (select pk1, > if(pk2 is null, '', pk2) as pk2, f1 from source) group by pk1, pk2; > {code} > This can work. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (FLINK-14258) Integrate hive to FileSystemTableFactory
[ https://issues.apache.org/jira/browse/FLINK-14258?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee reassigned FLINK-14258: Assignee: (was: Jingsong Lee) > Integrate hive to FileSystemTableFactory > > > Key: FLINK-14258 > URL: https://issues.apache.org/jira/browse/FLINK-14258 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Hive >Reporter: Jingsong Lee >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (FLINK-14256) Introduce FileSystemTableFactory with partitioned support
[ https://issues.apache.org/jira/browse/FLINK-14256?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee reassigned FLINK-14256: Assignee: (was: Jingsong Lee) > Introduce FileSystemTableFactory with partitioned support > - > > Key: FLINK-14256 > URL: https://issues.apache.org/jira/browse/FLINK-14256 > Project: Flink > Issue Type: New Feature > Components: Table SQL / Planner >Reporter: Jingsong Lee >Priority: Major > > Introduce FileSystemTableFactory to unify all file system connectors. > FileSystemTableFactory use FileSystemInputFormatFactory to get the format > reader. > FileSystemTableFactory use FileSystemOutputFormatFactory to get the format > writer. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-15407) Add document to explain how to write a table with PK
[ https://issues.apache.org/jira/browse/FLINK-15407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17043152#comment-17043152 ] Jingsong Lee commented on FLINK-15407: -- We should come up with a better design for this together with changelog support, so I am closing this one. > Add document to explain how to write a table with PK > > > Key: FLINK-15407 > URL: https://issues.apache.org/jira/browse/FLINK-15407 > Project: Flink > Issue Type: Bug > Components: Documentation, Table SQL / API >Reporter: Jingsong Lee >Assignee: Jingsong Lee >Priority: Major > Fix For: 1.11.0 > > > I have had several user problems: > Why is an error reported when writing to the upsert sink: TableException: > UpsertStreamTableSink requires that Table has a full primary keys if it is > updated. > Users are confused. > I think we can consider writing a document to describe it. > Users need to be careful, e.g.: > > {code:java} > insert into result_table select pk1, if(pk2 is null, '', pk2) as pk2, > count(*), sum(f3) from source group by pk1, pk2; {code} > This will fail. > > {code:java} > insert into result_table select pk1, pk2, count(*), sum(f1) from (select pk1, > if(pk2 is null, '', pk2) as pk2, f1 from source) group by pk1, pk2; > {code} > This can work. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (FLINK-15960) support creating Hive tables, views, functions within Flink
[ https://issues.apache.org/jira/browse/FLINK-15960?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee reassigned FLINK-15960: Assignee: Rui Li (was: Jingsong Lee) > support creating Hive tables, views, functions within Flink > --- > > Key: FLINK-15960 > URL: https://issues.apache.org/jira/browse/FLINK-15960 > Project: Flink > Issue Type: Improvement > Components: Connectors / Hive >Reporter: Bowen Li >Assignee: Rui Li >Priority: Major > Fix For: 1.11.0 > > > support creating Hive tables, views, functions within Flink, to achieve > higher interoperability between Flink and Hive, and not requiring users to > switch between Flink and Hive CLIs. > Have heard such requests from multiple Flink-Hive users > > cc [~ykt836] [~lirui] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (FLINK-15255) document how to create Hive table from java API and DDL
[ https://issues.apache.org/jira/browse/FLINK-15255?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee reassigned FLINK-15255: Assignee: Rui Li (was: Jingsong Lee) > document how to create Hive table from java API and DDL > --- > > Key: FLINK-15255 > URL: https://issues.apache.org/jira/browse/FLINK-15255 > Project: Flink > Issue Type: Task > Components: Documentation >Reporter: Bowen Li >Assignee: Rui Li >Priority: Major > > documentation Jira for FLINK-15960 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (FLINK-15809) component stack page needs to be updated for blink planner
[ https://issues.apache.org/jira/browse/FLINK-15809?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee reassigned FLINK-15809: Assignee: (was: Jingsong Lee) > component stack page needs to be updated for blink planner > -- > > Key: FLINK-15809 > URL: https://issues.apache.org/jira/browse/FLINK-15809 > Project: Flink > Issue Type: Bug > Components: Documentation >Reporter: Bowen Li >Priority: Critical > Fix For: 1.10.1, 1.11.0 > > > [https://ci.apache.org/projects/flink/flink-docs-master/internals/components.html] > needs to be updated to reflect latest stack components > > cc [~ykt836] [~jark] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (FLINK-16027) kafka connector's 'connector.topic' property should be optional rather than required
[ https://issues.apache.org/jira/browse/FLINK-16027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee reassigned FLINK-16027: Assignee: (was: Jingsong Lee) > kafka connector's 'connector.topic' property should be optional rather than > required > > > Key: FLINK-16027 > URL: https://issues.apache.org/jira/browse/FLINK-16027 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka >Reporter: Bowen Li >Priority: Major > Fix For: 1.11.0 > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (FLINK-16024) support filter pushdown in jdbc connector
[ https://issues.apache.org/jira/browse/FLINK-16024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee reassigned FLINK-16024: Assignee: (was: Jingsong Lee) > support filter pushdown in jdbc connector > - > > Key: FLINK-16024 > URL: https://issues.apache.org/jira/browse/FLINK-16024 > Project: Flink > Issue Type: Improvement > Components: Connectors / JDBC >Reporter: Bowen Li >Priority: Major > Fix For: 1.11.0 > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (FLINK-13777) Introduce sql function wrappers and conversion to ExpressionConverter
[ https://issues.apache.org/jira/browse/FLINK-13777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee reassigned FLINK-13777: Assignee: (was: Jingsong Lee) > Introduce sql function wrappers and conversion to ExpressionConverter > - > > Key: FLINK-13777 > URL: https://issues.apache.org/jira/browse/FLINK-13777 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Planner >Reporter: Jingsong Lee >Priority: Major > > To remove the extended Calcite SQL functions, we introduce wrappers that wrap > Flink's FunctionDefinition: > 1. Add SqlReturnTypeInferenceWrapper to wrap TypeStrategy > 2. Add SqlOperandTypeInferenceWrapper to wrap InputTypeStrategy > 3. Add SqlOperandTypeCheckerWrapper to wrap InputTypeValidator > 4. Add SqlFunctionWrapper to wrap SqlFunction > 5. Add a SqlFunctionWrapper converter and a standard SQL converter -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (FLINK-13113) Introduce range partition in blink
[ https://issues.apache.org/jira/browse/FLINK-13113?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee reassigned FLINK-13113: Assignee: (was: Jingsong Lee) > Introduce range partition in blink > -- > > Key: FLINK-13113 > URL: https://issues.apache.org/jira/browse/FLINK-13113 > Project: Flink > Issue Type: New Feature > Components: Table SQL / Runtime >Reporter: Jingsong Lee >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (FLINK-14676) Introduce parallelism inference for source
[ https://issues.apache.org/jira/browse/FLINK-14676?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee reassigned FLINK-14676: Assignee: (was: Jingsong Lee) > Introduce parallelism inference for source > -- > > Key: FLINK-14676 > URL: https://issues.apache.org/jira/browse/FLINK-14676 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Planner >Reporter: Jingsong Lee >Priority: Major > Labels: pull-request-available > Time Spent: 20m > Remaining Estimate: 0h > > FLINK-12801 introduced a parallelism setting for tables, but because > a TableSource generates a DataStream that may not be a real source, this could > lead to shuffle errors. So FLINK-13494 removed these implementations. > In this ticket, I would like to introduce parallelism inference only for > InputFormatTableSource; the RowCount of InputFormatTableSource is more > accurate than that of downstream stages, so it is worthwhile to infer its > parallelism automatically. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (FLINK-13786) Implement type inference for other functions
[ https://issues.apache.org/jira/browse/FLINK-13786?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee reassigned FLINK-13786: Assignee: (was: Jingsong Lee) > Implement type inference for other functions > > > Key: FLINK-13786 > URL: https://issues.apache.org/jira/browse/FLINK-13786 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: Jingsong Lee >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (FLINK-13782) Implement type inference for logic functions
[ https://issues.apache.org/jira/browse/FLINK-13782?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jingsong Lee reassigned FLINK-13782:
------------------------------------

    Assignee: (was: Jingsong Lee)

> Implement type inference for logic functions
> --------------------------------------------
>
>           Key: FLINK-13782
>           URL: https://issues.apache.org/jira/browse/FLINK-13782
>       Project: Flink
>    Issue Type: Sub-task
>    Components: Table SQL / API
>      Reporter: Jingsong Lee
>      Priority: Major
>
[jira] [Assigned] (FLINK-13783) Implement type inference for string functions
[ https://issues.apache.org/jira/browse/FLINK-13783?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jingsong Lee reassigned FLINK-13783:
------------------------------------

    Assignee: (was: Jingsong Lee)

> Implement type inference for string functions
> ---------------------------------------------
>
>           Key: FLINK-13783
>           URL: https://issues.apache.org/jira/browse/FLINK-13783
>       Project: Flink
>    Issue Type: Sub-task
>    Components: Table SQL / API
>      Reporter: Jingsong Lee
>      Priority: Major
>
[jira] [Assigned] (FLINK-13780) Introduce ExpressionConverter to legacy planner
[ https://issues.apache.org/jira/browse/FLINK-13780?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jingsong Lee reassigned FLINK-13780:
------------------------------------

    Assignee: (was: Jingsong Lee)

> Introduce ExpressionConverter to legacy planner
> -----------------------------------------------
>
>           Key: FLINK-13780
>           URL: https://issues.apache.org/jira/browse/FLINK-13780
>       Project: Flink
>    Issue Type: Sub-task
>    Components: Table SQL / Legacy Planner
>      Reporter: Jingsong Lee
>      Priority: Major
>
[jira] [Assigned] (FLINK-13781) Use new Expression in RexNodeToExpressionConverter
[ https://issues.apache.org/jira/browse/FLINK-13781?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jingsong Lee reassigned FLINK-13781:
------------------------------------

    Assignee: (was: Jingsong Lee)

> Use new Expression in RexNodeToExpressionConverter
> --------------------------------------------------
>
>           Key: FLINK-13781
>           URL: https://issues.apache.org/jira/browse/FLINK-13781
>       Project: Flink
>    Issue Type: Sub-task
>    Components: Table SQL / Legacy Planner
>      Reporter: Jingsong Lee
>      Priority: Major
>
[jira] [Assigned] (FLINK-13785) Implement type inference for time functions
[ https://issues.apache.org/jira/browse/FLINK-13785?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jingsong Lee reassigned FLINK-13785:
------------------------------------

    Assignee: (was: Jingsong Lee)

> Implement type inference for time functions
> -------------------------------------------
>
>           Key: FLINK-13785
>           URL: https://issues.apache.org/jira/browse/FLINK-13785
>       Project: Flink
>    Issue Type: Sub-task
>    Components: Table SQL / API
>      Reporter: Jingsong Lee
>      Priority: Major
>
[jira] [Assigned] (FLINK-13784) Implement type inference for math functions
[ https://issues.apache.org/jira/browse/FLINK-13784?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jingsong Lee reassigned FLINK-13784:
------------------------------------

    Assignee: (was: Jingsong Lee)

> Implement type inference for math functions
> -------------------------------------------
>
>           Key: FLINK-13784
>           URL: https://issues.apache.org/jira/browse/FLINK-13784
>       Project: Flink
>    Issue Type: Sub-task
>    Components: Table SQL / API
>      Reporter: Jingsong Lee
>      Priority: Major
>
[jira] [Assigned] (FLINK-13773) Rework of the Expression Design
[ https://issues.apache.org/jira/browse/FLINK-13773?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jingsong Lee reassigned FLINK-13773:
------------------------------------

    Assignee: (was: Jingsong Lee)

> Rework of the Expression Design
> -------------------------------
>
>           Key: FLINK-13773
>           URL: https://issues.apache.org/jira/browse/FLINK-13773
>       Project: Flink
>    Issue Type: Improvement
>    Components: Table SQL / API, Table SQL / Legacy Planner, Table SQL / Planner
>      Reporter: Jingsong Lee
>      Priority: Major
>
> This JIRA addresses several shortcomings of the current design:
> - The new Expressions still rely on PlannerExpressions for type inference and for conversion to RexNode, so the Flink planner and the Blink planner contain a lot of duplicated code and logic.
> - Make the Table API and Calcite definitions consistent.
> - Reduce the complexity of function development.
> - Provide more powerful functions for users.
>
> Key changes can be summarized as follows:
> - Improve the FunctionDefinition interface.
> - Introduce type inference for built-in functions.
> - Introduce ExpressionConverter to convert Expressions to Calcite RexNodes.
> - Remove duplicated code and logic from the planners.
>
> Details: https://cwiki.apache.org/confluence/display/FLINK/FLIP-51%3A+Rework+of+the+Expression+Design
[jira] [Commented] (FLINK-16253) Switch to Log4j 2 by default for flink-kubernetes submodule
[ https://issues.apache.org/jira/browse/FLINK-16253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17043148#comment-17043148 ]

Canbin Zheng commented on FLINK-16253:
--------------------------------------

cc [~chesnay]

> Switch to Log4j 2 by default for flink-kubernetes submodule
> -----------------------------------------------------------
>
>           Key: FLINK-16253
>           URL: https://issues.apache.org/jira/browse/FLINK-16253
>       Project: Flink
>    Issue Type: Improvement
>    Components: Deployment / Kubernetes
>      Reporter: Canbin Zheng
>       Fix For: 1.11.0
>
> Switch to Log4j 2 by default for the flink-kubernetes submodule, including the script and the container startup command or parameters.
[jira] [Created] (FLINK-16253) Switch to Log4j 2 by default for flink-kubernetes submodule
Canbin Zheng created FLINK-16253:
---------------------------------

             Summary: Switch to Log4j 2 by default for flink-kubernetes submodule
                 Key: FLINK-16253
                 URL: https://issues.apache.org/jira/browse/FLINK-16253
             Project: Flink
          Issue Type: Improvement
          Components: Deployment / Kubernetes
            Reporter: Canbin Zheng
             Fix For: 1.11.0


Switch to Log4j 2 by default for the flink-kubernetes submodule, including the script and the container startup command or parameters.
[jira] [Commented] (FLINK-15786) Load connector code with separate classloader
[ https://issues.apache.org/jira/browse/FLINK-15786?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17043146#comment-17043146 ]

Yang Wang commented on FLINK-15786:
-----------------------------------

[~maguowei] Since filesystems are already well supported via the plugin mechanism, do you mean to load the connectors the same way? BTW, I think the metrics reporters are in a similar situation.

> Load connector code with separate classloader
> ---------------------------------------------
>
>           Key: FLINK-15786
>           URL: https://issues.apache.org/jira/browse/FLINK-15786
>       Project: Flink
>    Issue Type: Improvement
>    Components: Runtime / Task
>      Reporter: Guowei Ma
>      Priority: Major
>        Labels: usability
>
> Currently, connector code can be seen as part of user code. Usually, users only need to add the corresponding connector as a dependency and package it in the user jar. This is convenient enough.
> However, connectors usually need to interact with external systems and often introduce heavy dependencies, so there is a high possibility of a class conflict between different connectors or with the user code of the same job. For example, every one or two weeks we receive issue reports about connector class conflicts from our users. The problem gets worse when users want to analyze data from different sources and write output to different sinks.
> Using a separate classloader to load each connector's code could resolve the problem.
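The isolation idea discussed above can be sketched with plain JDK classloading. This is only an illustration of the general technique, not Flink's actual plugin implementation, and the jar path in the comment is hypothetical:

```java
import java.net.URL;
import java.net.URLClassLoader;

// Illustrative sketch: give each connector its own URLClassLoader whose parent
// skips the application classpath, so the connector's transitive dependencies
// cannot clash with other connectors or with user code.
public class IsolatedConnectorLoader {

    static URLClassLoader newConnectorLoader(URL... connectorJars) {
        // Parenting to the platform classloader means only JDK classes are shared;
        // everything else must come from the connector's own jars.
        return new URLClassLoader(connectorJars, ClassLoader.getPlatformClassLoader());
    }

    public static void main(String[] args) throws Exception {
        // e.g. newConnectorLoader(new URL("file:/path/to/connector.jar"))
        try (URLClassLoader loader = newConnectorLoader()) {
            // JDK classes still resolve through the parent chain...
            Class<?> string = Class.forName("java.lang.String", false, loader);
            System.out.println("java.lang.String resolved: " + (string == String.class));
            // ...but application-classpath classes are deliberately invisible here,
            // so two connectors can each bundle conflicting dependency versions.
        }
    }
}
```

The same pattern underlies plugin mechanisms in general: the parent-first/child-first decision and the set of shared packages are the main design knobs.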
[GitHub] [flink] flinkbot edited a comment on issue #11176: [FLINK-15948][yarn] Enrich the warning log for YARN minimum allocation memory
flinkbot edited a comment on issue #11176: [FLINK-15948][yarn] Enrich the warning log for YARN minimum allocation memory URL: https://github.com/apache/flink/pull/11176#issuecomment-589671142

## CI report:

* 754693bae424353bac4cdd6d0db556ed8f6234f9 Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/150231364) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5494)
[GitHub] [flink] KarmaGYZ commented on issue #11197: [FLINK-16288][travis] Remove redundant double-quote at the end of the…
KarmaGYZ commented on issue #11197: [FLINK-16288][travis] Remove redundant double-quote at the end of the… URL: https://github.com/apache/flink/pull/11197#issuecomment-590157417

Travis link: https://travis-ci.org/KarmaGYZ/flink/jobs/654212375. I don't know why the "e2e - container - scala 2.12" profile keeps failing with timeouts in the "flink-connector-gcp-pubsub" and "flink-azure-fs-hadoop" modules, but it seems unrelated to this PR.
[GitHub] [flink] lirui-apache commented on issue #11175: [FLINK-16197][hive] Failed to query partitioned table when partition …
lirui-apache commented on issue #11175: [FLINK-16197][hive] Failed to query partitioned table when partition … URL: https://github.com/apache/flink/pull/11175#issuecomment-590155149

Hi @bowenli86, could you elaborate on why it's not a good long-term strategy? IMHO, it should be up to each connector to decide how to handle a discrepancy between metadata and storage, because such discrepancies may be treated differently by the external systems: one system may consider the discrepancy a fatal error, while another may expect it to happen from time to time and choose to tolerate it. Therefore, I think each connector should follow the behavior of the external system it connects to.
[GitHub] [flink] wangyang0918 commented on issue #11117: [FLINK-16115][filesystem] Make aliyun oss filesystem could work with plugin mechanism
wangyang0918 commented on issue #11117: [FLINK-16115][filesystem] Make aliyun oss filesystem could work with plugin mechanism URL: https://github.com/apache/flink/pull/11117#issuecomment-590153300

@zentol Could you have another look and help with merging?
[jira] [Commented] (FLINK-16215) Start redundant TaskExecutor when JM failed
[ https://issues.apache.org/jira/browse/FLINK-16215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17043137#comment-17043137 ]

Xintong Song commented on FLINK-16215:
--------------------------------------

I share [~trohrmann]'s concern. On a YARN deployment, {{YarnResourceManager}} starts a {{TaskExecutor}} in two steps:
1. Request a container from YARN.
2. Launch the {{TaskExecutor}} process inside the allocated container.

If the JM failover happens between the two steps, the container will be recovered but no {{TaskExecutor}} will be started inside it. I think it is a problem that such a container neither gets a {{TaskExecutor}} started in it nor gets released. This might be solved by FLINK-13554, which introduces a timeout for starting new {{TaskExecutor}}s; we could apply that timeout to recovered containers as well. FYI, the Kubernetes deployment does not have this problem, because the pod/container is allocated and the {{TaskExecutor}} is started in one step.

> Start redundant TaskExecutor when JM failed
> -------------------------------------------
>
>              Key: FLINK-16215
>              URL: https://issues.apache.org/jira/browse/FLINK-16215
>          Project: Flink
>       Issue Type: Bug
>       Components: Runtime / Coordination
> Affects Versions: 1.10.0
>         Reporter: YufeiLiu
>         Priority: Major
>
> The TaskExecutors will reconnect to the new ResourceManager leader when the JM fails, and the JobMaster will restart and reschedule the job. If a job's slot requests arrive earlier than the TM registrations, the RM will start new workers rather than reuse the existing TMs.
> It's hard to reproduce because the TM registrations usually come first, and the timeout check will stop redundant TMs.
> But I think it would be better if we turned {{recoverWokerNode}} into an interface and put recovered slots in {{pendingSlots}} to wait for TM reconnection.
[jira] [Commented] (FLINK-16215) Start redundant TaskExecutor when JM failed
[ https://issues.apache.org/jira/browse/FLINK-16215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17043134#comment-17043134 ]

Yang Wang commented on FLINK-16215:
-----------------------------------

I think even if we turn {{recoverWokerNode}} into an interface and do the recovery before the slot requests come in, we still cannot completely avoid this problem, since there is no guarantee that we get all the previous containers from the recovery process; some containers may also be returned via subsequent heartbeats. Maybe the {{JobMaster}} should be aware of the failover and recover the running tasks from the {{TaskManager}}; if that fails with a timeout, it could then allocate a new slot from the {{ResourceManager}}. It is just a rough thought; please correct me if I am wrong.

> Start redundant TaskExecutor when JM failed
> -------------------------------------------
>
>              Key: FLINK-16215
>              URL: https://issues.apache.org/jira/browse/FLINK-16215
>          Project: Flink
>       Issue Type: Bug
>       Components: Runtime / Coordination
> Affects Versions: 1.10.0
>         Reporter: YufeiLiu
>         Priority: Major
>
> The TaskExecutors will reconnect to the new ResourceManager leader when the JM fails, and the JobMaster will restart and reschedule the job. If a job's slot requests arrive earlier than the TM registrations, the RM will start new workers rather than reuse the existing TMs.
> It's hard to reproduce because the TM registrations usually come first, and the timeout check will stop redundant TMs.
> But I think it would be better if we turned {{recoverWokerNode}} into an interface and put recovered slots in {{pendingSlots}} to wait for TM reconnection.
[GitHub] [flink] Jiayi-Liao commented on a change in pull request #11179: [FLINK-16178][FLINK-16192][checkpointing] Clean up checkpoint metadata code and remove remaining bits of "legacy state"
Jiayi-Liao commented on a change in pull request #11179: [FLINK-16178][FLINK-16192][checkpointing] Clean up checkpoint metadata code and remove remaining bits of "legacy state" URL: https://github.com/apache/flink/pull/11179#discussion_r383065779

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializer.java

@@ -52,6 +37,5 @@
	 * @return The deserialized savepoint
	 * @throws IOException Serialization failures are forwarded
	 */
-	T deserialize(DataInputStream dis, ClassLoader userCodeClassLoader) throws IOException;
-
+	SavepointV2 deserialize(DataInputStream dis, ClassLoader userCodeClassLoader) throws IOException;

Review comment: You're right. I was misled by the previous interface.
[GitHub] [flink] flinkbot edited a comment on issue #11176: [FLINK-15948][yarn] Enrich the warning log for YARN minimum allocation memory
flinkbot edited a comment on issue #11176: [FLINK-15948][yarn] Enrich the warning log for YARN minimum allocation memory URL: https://github.com/apache/flink/pull/11176#issuecomment-589671142

## CI report:

* adb2e9e62190ba505ee86554a97452984fbc2de4 Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/150018467) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5423)
* 754693bae424353bac4cdd6d0db556ed8f6234f9 Travis: [PENDING](https://travis-ci.com/flink-ci/flink/builds/150231364)
[GitHub] [flink] danny0405 commented on issue #11190: [FLINK-16089][docs] Translate "Data Type" page of "Table API & SQL" into Chinese
danny0405 commented on issue #11190: [FLINK-16089][docs] Translate "Data Type" page of "Table API & SQL" into Chinese URL: https://github.com/apache/flink/pull/11190#issuecomment-590146367

@wuchong, sure, let me take over the review.
[GitHub] [flink] danny0405 commented on a change in pull request #11116: [FLINK-15349] add 'create catalog' DDL to blink planner
danny0405 commented on a change in pull request #11116: [FLINK-15349] add 'create catalog' DDL to blink planner URL: https://github.com/apache/flink/pull/11116#discussion_r383064990

## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java

@@ -663,6 +665,15 @@ public void sqlUpdate(String stmt) {
 			DropTempSystemFunctionOperation dropTempSystemFunctionOperation = (DropTempSystemFunctionOperation) operation;
 			dropSystemFunction(dropTempSystemFunctionOperation);
+		} else if (operation instanceof CreateCatalogOperation) {
+			CreateCatalogOperation createCatalogOperation = (CreateCatalogOperation) operation;
+			String exMsg = getDDLOpExecuteErrorMsg(createCatalogOperation.asSummaryString());
+			try {
+				catalogManager.registerCatalog(
+					createCatalogOperation.getCatalogName(), createCatalogOperation.getCatalog());
+			} catch (CatalogException e) {
+				throw new ValidationException(exMsg, e);
+			}

Review comment: Why not throw `CatalogException` directly — do all the failures come from validation?
[GitHub] [flink] xintongsong commented on a change in pull request #11176: [FLINK-15948][yarn] Enrich the warning log for YARN minimum allocation memory
xintongsong commented on a change in pull request #11176: [FLINK-15948][yarn] Enrich the warning log for YARN minimum allocation memory URL: https://github.com/apache/flink/pull/11176#discussion_r383064789

## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java

@@ -521,11 +521,18 @@ private ClusterSpecification validateClusterResources(
 		int jobManagerMemoryMb = clusterSpecification.getMasterMemoryMB();
 		final int taskManagerMemoryMb = clusterSpecification.getTaskManagerMemoryMB();
-		if (jobManagerMemoryMb < yarnMinAllocationMB || taskManagerMemoryMb < yarnMinAllocationMB) {
-			LOG.warn("The JobManager or TaskManager memory is below the smallest possible YARN Container size. "
-				+ "The value of 'yarn.scheduler.minimum-allocation-mb' is '" + yarnMinAllocationMB + "'. Please increase the memory size. "
-				+ "YARN will allocate the smaller containers but the scheduler will account for the minimum-allocation-mb, maybe not all instances "
-				+ "you requested will start.");
+		final List<String> noteMem = new ArrayList<>();
+		if (jobManagerMemoryMb % yarnMinAllocationMB != 0 || jobManagerMemoryMb == 0) {
+			noteMem.add("JobManager memory(" + jobManagerMemoryMb + ")");
+		}
+		if (taskManagerMemoryMb % yarnMinAllocationMB != 0 || taskManagerMemoryMb == 0) {
+			noteMem.add("TaskManager memory(" + taskManagerMemoryMb + ")");
+		}
+		if (noteMem.size() > 0) {
+			LOG.warn("The {} is not a multiple of YARN minimum allocation memory({}), so some extra memory will be wasted, "
+				+ "because YARN will always normalize the resource request by ensuring that the requested memory is a multiple "
+				+ "of the minimum allocation. The value of 'yarn.scheduler.minimum-allocation-mb' is '" + yarnMinAllocationMB + "'. "
+				+ "Please increase the specified memory size.", noteMem.toString(), yarnMinAllocationMB);

Review comment: I would suggest a more concise log message. The current one mentions "multiple of YARN min allocation" and the configured value of yarnMinAllocationMB twice each, which is unnecessary. I would suggest the following: `The configured {} memory is {} MB. YARN will allocate {} MB to make up an integer multiple of its minimum allocation memory ({} MB, configured via 'yarn.scheduler.minimum-allocation-mb'). The extra {} MB may not be used by Flink.`
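The rounding behavior this review thread is about can be made concrete with a small sketch. The helper below is illustrative arithmetic, not YARN's or Flink's actual code: YARN rounds every container request up to the next integer multiple of `yarn.scheduler.minimum-allocation-mb`, and the difference is the "extra" memory the warning describes.

```java
// Illustrative sketch of YARN's container-request normalization.
public class YarnNormalizationSketch {

    // Round requestedMb up to the next integer multiple of minAllocationMb.
    static int normalize(int requestedMb, int minAllocationMb) {
        return ((requestedMb + minAllocationMb - 1) / minAllocationMb) * minAllocationMb;
    }

    public static void main(String[] args) {
        int minAllocationMb = 1024; // yarn.scheduler.minimum-allocation-mb
        int requestedMb = 1536;     // e.g. a TaskManager asking for 1.5 GB
        int grantedMb = normalize(requestedMb, minAllocationMb);
        // With these values: requested 1536 MB, granted 2048 MB, extra 512 MB.
        System.out.println("requested " + requestedMb + " MB, granted " + grantedMb
            + " MB, extra " + (grantedMb - requestedMb) + " MB");
    }
}
```

This is why a request that is already an exact multiple of the minimum allocation wastes nothing, while any other value leaves part of the granted container unused by Flink.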
[GitHub] [flink] xintongsong commented on a change in pull request #11176: [FLINK-15948][yarn] Enrich the warning log for YARN minimum allocation memory
xintongsong commented on a change in pull request #11176: [FLINK-15948][yarn] Enrich the warning log for YARN minimum allocation memory URL: https://github.com/apache/flink/pull/11176#discussion_r383064879

## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java

@@ -521,11 +521,18 @@ private ClusterSpecification validateClusterResources(
 		int jobManagerMemoryMb = clusterSpecification.getMasterMemoryMB();
 		final int taskManagerMemoryMb = clusterSpecification.getTaskManagerMemoryMB();
-		if (jobManagerMemoryMb < yarnMinAllocationMB || taskManagerMemoryMb < yarnMinAllocationMB) {
-			LOG.warn("The JobManager or TaskManager memory is below the smallest possible YARN Container size. "
-				+ "The value of 'yarn.scheduler.minimum-allocation-mb' is '" + yarnMinAllocationMB + "'. Please increase the memory size. "
-				+ "YARN will allocate the smaller containers but the scheduler will account for the minimum-allocation-mb, maybe not all instances "
-				+ "you requested will start.");
+		final List<String> noteMem = new ArrayList<>();
+		if (jobManagerMemoryMb % yarnMinAllocationMB != 0 || jobManagerMemoryMb == 0) {
+			noteMem.add("JobManager memory(" + jobManagerMemoryMb + ")");
+		}
+		if (taskManagerMemoryMb % yarnMinAllocationMB != 0 || taskManagerMemoryMb == 0) {
+			noteMem.add("TaskManager memory(" + taskManagerMemoryMb + ")");
+		}
+		if (noteMem.size() > 0) {
+			LOG.warn("The {} is not a multiple of YARN minimum allocation memory({}), so some extra memory will be wasted, "
+				+ "because YARN will always normalize the resource request by ensuring that the requested memory is a multiple "
+				+ "of the minimum allocation. The value of 'yarn.scheduler.minimum-allocation-mb' is '" + yarnMinAllocationMB + "'. "
+				+ "Please increase the specified memory size.", noteMem.toString(), yarnMinAllocationMB);

Review comment: I would suggest wrapping the check and the logging into one method, and calling it twice, once for the JM and once for the TM. The method could look like: `logIfComponentMemNotIntegerMultipleOfYarnMinAllocation(String componentName, int componentMemoryMB, int yarnMinAllocationMB)`
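A minimal sketch of the reviewer's proposed helper could look like the following. The method name follows the suggestion above; the message wording comes from the earlier review comment, and the choice to return the warning text (for testability) instead of logging directly is an assumption of this sketch, not the merged Flink code:

```java
// Sketch of the suggested per-component check. Returns the warning text, or
// null when the memory is already an exact multiple of the YARN minimum
// allocation. Names and wording are illustrative, not Flink's implementation.
public class MinAllocationWarning {

    static String logIfComponentMemNotIntegerMultipleOfYarnMinAllocation(
            String componentName, int componentMemoryMB, int yarnMinAllocationMB) {
        if (componentMemoryMB > 0 && componentMemoryMB % yarnMinAllocationMB == 0) {
            return null; // nothing wasted, nothing to warn about
        }
        // YARN rounds the request up to the next multiple of the minimum allocation.
        int normalized =
            ((componentMemoryMB + yarnMinAllocationMB - 1) / yarnMinAllocationMB) * yarnMinAllocationMB;
        return String.format(
            "The configured %s memory is %d MB. YARN will allocate %d MB to make up an integer "
                + "multiple of its minimum allocation memory (%d MB, configured via "
                + "'yarn.scheduler.minimum-allocation-mb'). The extra %d MB may not be used by Flink.",
            componentName, componentMemoryMB, normalized, yarnMinAllocationMB,
            normalized - componentMemoryMB);
    }

    public static void main(String[] args) {
        // Called twice, once per component, as the review suggests.
        System.out.println(logIfComponentMemNotIntegerMultipleOfYarnMinAllocation("JobManager", 1600, 1024));
        System.out.println(logIfComponentMemNotIntegerMultipleOfYarnMinAllocation("TaskManager", 2048, 1024));
    }
}
```

Factoring the check out this way removes the list-building from the caller and produces one clear message per misconfigured component.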
[jira] [Assigned] (FLINK-16252) Optimize the progress of the process_outputs in Python UDF
[ https://issues.apache.org/jira/browse/FLINK-16252?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dian Fu reassigned FLINK-16252:
-------------------------------

    Assignee: Huang Xingbo

> Optimize the progress of the process_outputs in Python UDF
> ----------------------------------------------------------
>
>           Key: FLINK-16252
>           URL: https://issues.apache.org/jira/browse/FLINK-16252
>       Project: Flink
>    Issue Type: Improvement
>    Components: API / Python
>      Reporter: Huang Xingbo
>      Assignee: Huang Xingbo
>      Priority: Major
>       Fix For: 1.11.0
>
> We need to optimize the function call chains in process_outputs to improve the performance of Python UDFs.
[jira] [Assigned] (FLINK-16251) Optimize the cost of function call in ScalarFunctionOpertation
[ https://issues.apache.org/jira/browse/FLINK-16251?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dian Fu reassigned FLINK-16251:
-------------------------------

    Assignee: Huang Xingbo

> Optimize the cost of function call in ScalarFunctionOpertation
> --------------------------------------------------------------
>
>           Key: FLINK-16251
>           URL: https://issues.apache.org/jira/browse/FLINK-16251
>       Project: Flink
>    Issue Type: Improvement
>    Components: API / Python
>      Reporter: Huang Xingbo
>      Assignee: Huang Xingbo
>      Priority: Major
>       Fix For: 1.11.0
>
> Currently, there is too much extra function-call cost in ScalarFunctionOpertation. We need to optimize it to improve the performance of Python UDFs.
[jira] [Created] (FLINK-16252) Optimize the progress of the process_outputs in Python UDF
Huang Xingbo created FLINK-16252:
---------------------------------

             Summary: Optimize the progress of the process_outputs in Python UDF
                 Key: FLINK-16252
                 URL: https://issues.apache.org/jira/browse/FLINK-16252
             Project: Flink
          Issue Type: Improvement
          Components: API / Python
            Reporter: Huang Xingbo
             Fix For: 1.11.0


We need to optimize the function call chains in process_outputs to improve the performance of Python UDFs.
[GitHub] [flink] danny0405 commented on a change in pull request #11116: [FLINK-15349] add 'create catalog' DDL to blink planner
danny0405 commented on a change in pull request #11116: [FLINK-15349] add 'create catalog' DDL to blink planner URL: https://github.com/apache/flink/pull/11116#discussion_r383064486

## File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/catalog/CatalogITCase.java

@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.catalog;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+
+import org.junit.Test;
+
+import static org.apache.flink.table.descriptors.GenericInMemoryCatalogValidator.CATALOG_TYPE_VALUE_GENERIC_IN_MEMORY;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * IT Case for catalog ddl.
+ */
+public class CatalogITCase {
+
+	@Test
+	public void testCreateCatalog() {
+		String name = "c1";
+		TableEnvironment tableEnv = getTableEnvironment();
+		String ddl = String.format("create catalog %s with('type'='%s')", name, CATALOG_TYPE_VALUE_GENERIC_IN_MEMORY);
+
+		tableEnv.sqlUpdate(ddl);
+
+		assertTrue(tableEnv.getCatalog(name).isPresent());

Review comment: I know that, just curious whether they should have the same life cycle.
[GitHub] [flink] flinkbot edited a comment on issue #11176: [FLINK-15948][yarn] Enrich the warning log for YARN minimum allocation memory
flinkbot edited a comment on issue #11176: [FLINK-15948][yarn] Enrich the warning log for YARN minimum allocation memory URL: https://github.com/apache/flink/pull/11176#issuecomment-589671142

## CI report:

* adb2e9e62190ba505ee86554a97452984fbc2de4 Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/150018467) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5423)
* 754693bae424353bac4cdd6d0db556ed8f6234f9 UNKNOWN
[jira] [Created] (FLINK-16251) Optimize the cost of function call in ScalarFunctionOpertation
Huang Xingbo created FLINK-16251:
---------------------------------

             Summary: Optimize the cost of function call in ScalarFunctionOpertation
                 Key: FLINK-16251
                 URL: https://issues.apache.org/jira/browse/FLINK-16251
             Project: Flink
          Issue Type: Improvement
          Components: API / Python
            Reporter: Huang Xingbo
             Fix For: 1.11.0


Currently, there is too much extra function-call cost in ScalarFunctionOpertation. We need to optimize it to improve the performance of Python UDFs.
[jira] [Created] (FLINK-16250) Add interfaces for PipelineStage and Pipeline
Hequn Cheng created FLINK-16250:
--------------------------------

             Summary: Add interfaces for PipelineStage and Pipeline
                 Key: FLINK-16250
                 URL: https://issues.apache.org/jira/browse/FLINK-16250
             Project: Flink
          Issue Type: Sub-task
          Components: API / Python
            Reporter: Hequn Cheng
            Assignee: Hequn Cheng


A pipeline is a linear workflow that chains some PipelineStages, e.g., Estimators and Transformers, to execute an algorithm. After this issue is addressed, Python users can write Python Pipelines.
[jira] [Resolved] (FLINK-15172) Optimize the operator algorithm to lazily allocate memory
[ https://issues.apache.org/jira/browse/FLINK-15172?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jingsong Lee resolved FLINK-15172.
----------------------------------
    Resolution: Fixed

Merged in master:
c11c696cca3b6ecafab605cceb36b7541444993f
56acfe9f666f590352453c9df6fbc385894de22f
f144e13acfe4ab37bfd9f65ae5cffd0b84fd78fe
adb595cd1445d0eefe9e23f85c79d8e9640cf07b
a907f29e24d27d1da9c981abed8ece2b211355eb

> Optimize the operator algorithm to lazily allocate memory
> ---------------------------------------------------------
>
>           Key: FLINK-15172
>           URL: https://issues.apache.org/jira/browse/FLINK-15172
>       Project: Flink
>    Issue Type: Sub-task
>    Components: Table SQL / Runtime
>      Reporter: Jingsong Lee
>      Assignee: Jingsong Lee
>      Priority: Major
>        Labels: pull-request-available
>       Fix For: 1.11.0
>
>     Time Spent: 20m
>     Remaining Estimate: 0h
>
> Since FLINK-14063, operators get all of the TaskManager's managed memory. The cost of over-allocating memory is very high, leading to performance regressions for small batch SQL jobs:
> * Allocating memory incurs the cost of the memory-management algorithm.
> * Allocating memory incurs initialization cost: all memory is set to zero, and this initialization requires the operating system to actually allocate physical memory.
> * Over-allocating memory squashes the file cache too.
> We can optimize the operator algorithms to apply lazy allocation and avoid meaningless memory allocation.
[jira] [Created] (FLINK-16249) Add interfaces for Params, ParamInfo and WithParams
Hequn Cheng created FLINK-16249:
--------------------------------

             Summary: Add interfaces for Params, ParamInfo and WithParams
                 Key: FLINK-16249
                 URL: https://issues.apache.org/jira/browse/FLINK-16249
             Project: Flink
          Issue Type: Sub-task
          Components: API / Python
            Reporter: Hequn Cheng
            Assignee: Hequn Cheng


Parameters are widely used in the machine learning realm. These classes define common interfaces to interact with classes that have parameters.
[GitHub] [flink] JingsongLi commented on issue #10797: [FLINK-15172][table-blink] Optimize the operator algorithm to lazily allocate memory
JingsongLi commented on issue #10797: [FLINK-15172][table-blink] Optimize the operator algorithm to lazily allocate memory URL: https://github.com/apache/flink/pull/10797#issuecomment-590144331

Thanks @TsReaper for the review. Merged.
[GitHub] [flink] JingsongLi closed pull request #10797: [FLINK-15172][table-blink] Optimize the operator algorithm to lazily allocate memory
JingsongLi closed pull request #10797: [FLINK-15172][table-blink] Optimize the operator algorithm to lazily allocate memory URL: https://github.com/apache/flink/pull/10797 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Created] (FLINK-16248) Add interfaces for MLEnvironment and MLEnvironmentFactory
Hequn Cheng created FLINK-16248: --- Summary: Add interfaces for MLEnvironment and MLEnvironmentFactory Key: FLINK-16248 URL: https://issues.apache.org/jira/browse/FLINK-16248 Project: Flink Issue Type: Sub-task Components: API / Python Reporter: Hequn Cheng Assignee: Hequn Cheng Align the interfaces for MLEnvironment and MLEnvironmentFactory so that Python users can use the Python MLEnvironmentFactory to maintain the execution environment and table environment. -- This message was sent by Atlassian Jira (v8.3.4#803005)
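The factory pattern this ticket aligns across Java and Python keeps environments in a registry keyed by a numeric id, so both language APIs can refer to the same execution/table environment pair by number. The sketch below is illustrative: the `MLEnvironment` fields are string stand-ins, not Flink's real environment classes, and the registry details are assumptions.

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in for MLEnvironment: in Flink this would wrap an
// ExecutionEnvironment and a TableEnvironment, not strings.
class MLEnvironment {
    final String execEnv;
    final String tableEnv;
    MLEnvironment(String execEnv, String tableEnv) {
        this.execEnv = execEnv;
        this.tableEnv = tableEnv;
    }
}

// Simplified factory: a process-wide registry of environments keyed by id.
class MLEnvironmentFactory {
    static final long DEFAULT_ML_ENVIRONMENT_ID = 0L;
    private static final Map<Long, MLEnvironment> envs = new HashMap<>();
    private static long nextId = 1L;

    // Lazily creates the default environment on first access.
    static synchronized MLEnvironment getDefault() {
        return envs.computeIfAbsent(DEFAULT_ML_ENVIRONMENT_ID,
            id -> new MLEnvironment("defaultExec", "defaultTable"));
    }

    // Registers a new environment and returns its id, which a Python
    // caller could later use to look the environment up again.
    static synchronized long register(MLEnvironment env) {
        long id = nextId++;
        envs.put(id, env);
        return id;
    }

    static synchronized MLEnvironment get(long id) {
        MLEnvironment env = envs.get(id);
        if (env == null) {
            throw new IllegalArgumentException("Cannot find MLEnvironment with id " + id);
        }
        return env;
    }
}
```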
[jira] [Assigned] (FLINK-14509) Improve the README.md in pyflink to prepare for PyPI release
[ https://issues.apache.org/jira/browse/FLINK-14509?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dian Fu reassigned FLINK-14509: --- Assignee: Wei Zhong (was: Weizhong) > Improve the README.md in pyflink to prepare for PyPI release > > > Key: FLINK-14509 > URL: https://issues.apache.org/jira/browse/FLINK-14509 > Project: Flink > Issue Type: Sub-task > Components: API / Python >Reporter: Dian Fu >Assignee: Wei Zhong >Priority: Major > Labels: pull-request-available > Fix For: 1.10.0 > > Time Spent: 20m > Remaining Estimate: 0h > > Currently there are still some information missing for the README.md in > pyflink, such as the pyflink documentation link, etc. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] KarmaGYZ commented on a change in pull request #11100: [FLINK-15562][docs] Add Example settings.xml for maven archetype command wh…
KarmaGYZ commented on a change in pull request #11100: [FLINK-15562][docs] Add Example settings.xml for maven archetype command wh… URL: https://github.com/apache/flink/pull/11100#discussion_r383061588 ## File path: docs/dev/projectsetup/scala_api_quickstart.md ## @@ -128,27 +128,46 @@ Use one of the following commands to __create a project__: -{% highlight bash %} -$ mvn archetype:generate \ - -DarchetypeGroupId=org.apache.flink \ - -DarchetypeArtifactId=flink-quickstart-scala \{% unless site.is_stable %} - -DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/ \{% endunless %} - -DarchetypeVersion={{site.version}} -{% endhighlight %} +{% highlight bash %} +$ mvn archetype:generate \ + -DarchetypeGroupId=org.apache.flink \ + -DarchetypeArtifactId=flink-quickstart-scala \{% unless site.is_stable %} + -DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/ \{% endunless %} + -DarchetypeVersion={{site.version}} +{% endhighlight %} This allows you to name your newly created project. It will interactively ask you for the groupId, artifactId, and package name. {% highlight bash %} {% if site.is_stable %} -$ curl https://flink.apache.org/q/quickstart-scala.sh | bash -s {{site.version}} +$ curl https://flink.apache.org/q/quickstart-scala.sh | bash -s {{site.version}} {% else %} -$ curl https://flink.apache.org/q/quickstart-scala-SNAPSHOT.sh | bash -s {{site.version}} +$ curl https://flink.apache.org/q/quickstart-scala-SNAPSHOT.sh | bash -s {{site.version}} {% endif %} {% endhighlight %} {% unless site.is_stable %} -Note: For Maven 3.0 or higher, it is no longer possible to specify the repository (-DarchetypeCatalog) via the commandline. If you wish to use the snapshot repository, you need to add a repository entry to your settings.xml. 
For details about this change, please refer to <a href="http://maven.apache.org/archetype/maven-archetype-plugin/archetype-repository.html">Maven official document</a> +Note: For Maven 3.0 or higher, it is no longer possible to specify the repository (-DarchetypeCatalog) via the command line. For details about this change, please refer to <a href="http://maven.apache.org/archetype/maven-archetype-plugin/archetype-repository.html">Maven official document</a> +If you wish to use the snapshot repository, you need to add a repository entry to your settings.xml. For example: +{% highlight bash %} + + Review comment: Sorry, I did not understand what you mean. Do you mean to set it as the default profile? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
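The settings.xml snippet in the quoted diff was truncated by the archive. For reference, a typical profile that enables the Apache snapshots repository for archetype resolution looks roughly like this (the profile id `apache-snapshots` is illustrative, not necessarily what the PR used):

```xml
<settings>
  <activeProfiles>
    <activeProfile>apache-snapshots</activeProfile>
  </activeProfiles>
  <profiles>
    <profile>
      <id>apache-snapshots</id>
      <repositories>
        <repository>
          <id>archetype</id>
          <url>https://repository.apache.org/content/repositories/snapshots/</url>
          <releases>
            <enabled>false</enabled>
          </releases>
          <snapshots>
            <enabled>true</enabled>
          </snapshots>
        </repository>
      </repositories>
    </profile>
  </profiles>
</settings>
```

With such a profile active, `mvn archetype:generate` can resolve SNAPSHOT archetype versions even though `-DarchetypeCatalog` is no longer honored on the command line.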
[jira] [Assigned] (FLINK-16031) Improve the description in the README file of PyFlink 1.9.x
[ https://issues.apache.org/jira/browse/FLINK-16031?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dian Fu reassigned FLINK-16031: --- Assignee: Wei Zhong (was: Weizhong) > Improve the description in the README file of PyFlink 1.9.x > > > Key: FLINK-16031 > URL: https://issues.apache.org/jira/browse/FLINK-16031 > Project: Flink > Issue Type: Improvement > Components: API / Python >Affects Versions: 1.9.1 >Reporter: Wei Zhong >Assignee: Wei Zhong >Priority: Major > Labels: pull-request-available > Fix For: 1.9.2, 1.9.3 > > Time Spent: 10m > Remaining Estimate: 0h > > Currently, the description in the README file of PyFlink 1.9.x is not > suitable for publishing in PyPI. It should be changed to be more > user-friendly. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (FLINK-16031) Improve the description in the README file of PyFlink 1.9.x
[ https://issues.apache.org/jira/browse/FLINK-16031?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dian Fu reassigned FLINK-16031: --- Assignee: Weizhong > Improve the description in the README file of PyFlink 1.9.x > > > Key: FLINK-16031 > URL: https://issues.apache.org/jira/browse/FLINK-16031 > Project: Flink > Issue Type: Improvement > Components: API / Python >Affects Versions: 1.9.1 >Reporter: Wei Zhong >Assignee: Weizhong >Priority: Major > Labels: pull-request-available > Fix For: 1.9.2, 1.9.3 > > Time Spent: 10m > Remaining Estimate: 0h > > Currently, the description in the README file of PyFlink 1.9.x is not > suitable for publishing in PyPI. It should be changed to be more > user-friendly. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (FLINK-14509) Improve the README.md in pyflink to prepare for PyPI release
[ https://issues.apache.org/jira/browse/FLINK-14509?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dian Fu reassigned FLINK-14509: --- Assignee: Weizhong (was: Wei Zhong (old)) > Improve the README.md in pyflink to prepare for PyPI release > > > Key: FLINK-14509 > URL: https://issues.apache.org/jira/browse/FLINK-14509 > Project: Flink > Issue Type: Sub-task > Components: API / Python >Reporter: Dian Fu >Assignee: Weizhong >Priority: Major > Labels: pull-request-available > Fix For: 1.10.0 > > Time Spent: 20m > Remaining Estimate: 0h > > Currently there are still some information missing for the README.md in > pyflink, such as the pyflink documentation link, etc. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] flinkbot edited a comment on issue #11192: [FLINK-16237][build] Add Log4j2 configuration properties
flinkbot edited a comment on issue #11192: [FLINK-16237][build] Add Log4j2 configuration properties URL: https://github.com/apache/flink/pull/11192#issuecomment-590057592 ## CI report: * 6500c5170375b837320f00a032d64a7eec69135d Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/150225021) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot commented on issue #11197: [FLINK-16288][travis] Remove redundant double-quote at the end of the…
flinkbot commented on issue #11197: [FLINK-16288][travis] Remove redundant double-quote at the end of the… URL: https://github.com/apache/flink/pull/11197#issuecomment-590137926 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Automated Checks Last check on commit 45a5042fe34c700f4f9d84e207a788c44e36c055 (Mon Feb 24 00:52:46 UTC 2020) **Warnings:** * No documentation files were touched! Remember to keep the Flink docs up to date! Mention the bot in a comment to re-run the automated checks. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] KarmaGYZ opened a new pull request #11197: [FLINK-16288][travis] Remove redundant double-quote at the end of the…
KarmaGYZ opened a new pull request #11197: [FLINK-16288][travis] Remove redundant double-quote at the end of the… URL: https://github.com/apache/flink/pull/11197 … env field ## What is the purpose of the change Remove the redundant double-quote at the end of the env field, which causes the mesos e2e test to fail. ## Brief change log Remove the redundant double-quote at the end of the env field. ## Verifying this change Trigger the e2e test ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r383055795 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/ByteBufUtils.java ## @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; + +/** + * Utility routines to process Netty ByteBuf. + */ +public class ByteBufUtils { + + /** +* Cumulates data from the source buffer to the target buffer. +* +* @param cumulationBuf The target buffer. +* @param src The source buffer. +* @param expectedSize The expected length to cumulate. +* +* @return The ByteBuf containing cumulated data or null if not enough data has been cumulated. +*/ + public static ByteBuf cumulate(ByteBuf cumulationBuf, ByteBuf src, int expectedSize) { Review comment: Have renamed `cumulate` into `accumulate` and `cumulation` into `accumulation`, and renamed the variable to target. This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
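The accumulate-until-complete logic discussed in this review thread can be shown self-contained by transposing it from Netty's ByteBuf to java.nio.ByteBuffer. The real `ByteBufUtils.accumulate` operates on Netty buffers and has a zero-copy fast path when the target is empty and the source already holds enough bytes; this sketch keeps only the copying path, and the names (`accumulate`, `target`) mirror the renamed API from the review.

```java
import java.nio.ByteBuffer;

// Illustrative sketch (java.nio, not Netty): gather bytes from src into
// target until expectedSize bytes are available, possibly across multiple
// network reads.
class AccumulateSketch {

    // Returns a buffer ready for reading once expectedSize bytes have been
    // gathered, or null if more data is still needed.
    static ByteBuffer accumulate(ByteBuffer target, ByteBuffer src, int expectedSize) {
        int missing = expectedSize - target.position();
        int toCopy = Math.min(missing, src.remaining());
        for (int i = 0; i < toCopy; i++) {
            target.put(src.get());
        }
        if (target.position() < expectedSize) {
            return null; // not enough data accumulated yet
        }
        target.flip(); // switch to read mode for the caller
        return target;
    }
}
```

The null return is what lets a decoder suspend mid-message: it simply waits for the next channel read and calls `accumulate` again with the same target.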
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r383055217 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/ByteBufUtils.java ## @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; + +/** + * Utility routines to process Netty ByteBuf. + */ +public class ByteBufUtils { + + /** +* Cumulates data from the source buffer to the target buffer. +* +* @param cumulationBuf The target buffer. +* @param src The source buffer. +* @param expectedSize The expected length to cumulate. +* +* @return The ByteBuf containing cumulated data or null if not enough data has been cumulated. +*/ + public static ByteBuf cumulate(ByteBuf cumulationBuf, ByteBuf src, int expectedSize) { + // If the cumulation buffer is empty and src has enought bytes, Review comment: Fixed the typo. This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r383055205 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NetworkBufferAllocator.java ## @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.io.network.NetworkClientHandler; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; +import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; +import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID; +import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel; + +import javax.annotation.Nullable; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * An allocator used for requesting buffers in the receiver side of netty handlers. 
+ */ +class NetworkBufferAllocator { + private final NetworkClientHandler partitionRequestClientHandler; Review comment: Renamed the variable. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r383055079 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/BufferResponseDecoder.java ## @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; + +import static org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse; +import static org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse.MESSAGE_HEADER_LENGTH; + +/** + * The parser for {@link org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse}. + */ +class BufferResponseDecoder extends NettyMessageDecoder { + + /** The Flink Buffer allocator. */ + private final NetworkBufferAllocator allocator; + + /** The cumulation buffer of message header. */ + private ByteBuf messageHeaderBuffer; + + /** +* The current BufferResponse message that are process the buffer part. 
+* If it is null, we are still processing the message header part, otherwise +* we are processing the buffer part. +*/ + private BufferResponse currentResponse; + + /** How much bytes have been received or discarded for the buffer part. */ Review comment: Have modified accordingly. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
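The two-phase design the quoted javadoc describes (accumulate a fixed-size header first, then stream the body) is a standard decoder state machine. The sketch below models it with java.nio and a simple 4-byte length prefix; the wire format and class name are assumptions for illustration, not Flink's actual BufferResponse layout.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Illustrative two-phase decoder: while body == null we are in the header
// phase; once the header completes we switch to the body phase, exactly as
// the currentResponse field does in the reviewed BufferResponseDecoder.
class TwoPhaseDecoder {
    private static final int HEADER_LENGTH = 4; // assumed: 4-byte length prefix

    private final ByteBuffer header = ByteBuffer.allocate(HEADER_LENGTH);
    private byte[] body;        // null => header phase, non-null => body phase
    private int bodyBytesRead;  // how many body bytes have been received so far
    private final List<byte[]> decoded = new ArrayList<>();

    // Feeds one network read; complete payloads accumulate in decoded.
    void onChannelRead(ByteBuffer data) {
        while (data.hasRemaining()) {
            if (body == null) {
                header.put(data.get()); // still accumulating the header
                if (!header.hasRemaining()) {
                    header.flip();
                    body = new byte[header.getInt()]; // header complete: read length
                    bodyBytesRead = 0;
                    header.clear();
                }
            } else {
                // Body phase: bulk-copy straight into the message buffer.
                int n = Math.min(data.remaining(), body.length - bodyBytesRead);
                data.get(body, bodyBytesRead, n);
                bodyBytesRead += n;
                if (bodyBytesRead == body.length) {
                    decoded.add(body);
                    body = null; // message done: back to header phase
                }
            }
        }
    }

    List<byte[]> decodedMessages() {
        return decoded;
    }
}
```

The point of the split is the same as in the PR: header bytes may need accumulation across reads, but body bytes can be written directly into their final buffer without an intermediate cumulation copy.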
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r383055067 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/BufferResponseDecoder.java ## @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; + +import static org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse; +import static org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse.MESSAGE_HEADER_LENGTH; + +/** + * The parser for {@link org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse}. Review comment: Have modified Accordingly. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r383055157 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NonBufferResponseDecoder.java ## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; + +import java.net.ProtocolException; + +/** + * The parser for messages without specific parser. It receives the whole + * messages and then delegate the parsing to the targeted messages. + */ +class NonBufferResponseDecoder extends NettyMessageDecoder { + + /** The initial size of the message header cumulator buffer. */ + private static final int INITIAL_MESSAGE_HEADER_BUFFER_LENGTH = 128; + + /** The cumulation buffer of message header. 
*/ + private ByteBuf messageBuffer; + + @Override + public void onChannelActive(ChannelHandlerContext ctx) { + messageBuffer = ctx.alloc().directBuffer(INITIAL_MESSAGE_HEADER_BUFFER_LENGTH); + } + + @Override + public ParseResult onChannelRead(ByteBuf data) throws Exception { + ensureBufferCapacityIfNewMessage(); + + ByteBuf toDecode = ByteBufUtils.cumulate(messageBuffer, data, messageLength); + + if (toDecode == null) { + return ParseResult.notFinished(); + } + + NettyMessage nettyMessage; + switch (msgId) { + case NettyMessage.ErrorResponse.ID: Review comment: Modified to add `import static org.apache.flink.runtime.io.network.netty.NettyMessage.ErrorResponse` in advance. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services