[jira] [Created] (FLINK-16254) Support -p/--parallelism option for StatefulFunctionsClusterEntryPoint

2020-02-23 Thread Tzu-Li (Gordon) Tai (Jira)
Tzu-Li (Gordon) Tai created FLINK-16254:
---

 Summary: Support -p/--parallelism option for 
StatefulFunctionsClusterEntryPoint
 Key: FLINK-16254
 URL: https://issues.apache.org/jira/browse/FLINK-16254
 Project: Flink
  Issue Type: New Feature
  Components: Stateful Functions
Affects Versions: statefun-1.1
Reporter: Tzu-Li (Gordon) Tai
Assignee: Tzu-Li (Gordon) Tai


Currently the only way for users to specify parallelism > 1 for Stateful 
Functions applications is to provide a value for {{parallelism.default}} via 
{{flink-conf.yaml}}.

That is cumbersome, as users would essentially need to rebuild the Stateful
Functions application image just to change the parallelism.
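For context, the current workaround is a config entry baked into the image (the key below is real; the value is only an example):

```yaml
# flink-conf.yaml — currently the only way to set parallelism > 1;
# changing it requires rebuilding the application image
parallelism.default: 4
```

The ticket proposes accepting the same value at startup via a `-p`/`--parallelism` option on StatefulFunctionsClusterEntryPoint, so no image rebuild would be needed.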



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot commented on issue #11198: [FLINK-16248][python][ml] Add interfaces for MLEnvironment and MLEnvironmentFactory

2020-02-23 Thread GitBox
flinkbot commented on issue #11198: [FLINK-16248][python][ml] Add interfaces 
for MLEnvironment and MLEnvironmentFactory
URL: https://github.com/apache/flink/pull/11198#issuecomment-590196380
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit 2683ef7e092d4026bad2307c4cbb204aa052be1d (Mon Feb 24 
07:31:21 UTC 2020)
   
   **Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up 
to date!
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied
according to the order of the review items. For consensus, approval by a Flink
committer or PMC member is required.

   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Assigned] (FLINK-16161) Statistics zero should be unknown in HiveCatalog

2020-02-23 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16161?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee reassigned FLINK-16161:


Assignee: Jingsong Lee

> Statistics zero should be unknown in HiveCatalog
> 
>
> Key: FLINK-16161
> URL: https://issues.apache.org/jira/browse/FLINK-16161
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>Priority: Blocker
> Fix For: 1.10.1
>
>
> In Hive, a statistic of zero is treated as unknown, but in Flink's HiveCatalog,
> zero is treated as a real value.
> This leads to wrong inputs to the CBO.
> Previously discussed in [https://github.com/apache/flink/pull/10380]
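The fix described above amounts to mapping a zero statistic to an explicit "unknown" marker before it reaches the optimizer. A minimal sketch, assuming -1 as the "unknown" sentinel (the helper name and sentinel are illustrative assumptions, not Flink's actual code):

```python
UNKNOWN = -1  # assumed sentinel meaning "statistic not available"


def normalize_hive_stat(value: int) -> int:
    """Hive reports 0 when a statistic was never collected, so a zero read
    from the Hive metastore must be mapped to 'unknown' rather than handed
    to the cost-based optimizer as a genuine row count of 0."""
    return UNKNOWN if value == 0 else value
```

With this normalization, the CBO sees a collected statistic only when one actually exists.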





[GitHub] [flink] hequn8128 commented on issue #11198: [FLINK-16248][python][ml] Add interfaces for MLEnvironment and MLEnvironmentFactory

2020-02-23 Thread GitBox
hequn8128 commented on issue #11198: [FLINK-16248][python][ml] Add interfaces 
for MLEnvironment and MLEnvironmentFactory
URL: https://github.com/apache/flink/pull/11198#issuecomment-590192859
 
 
   @becketqin @walterddr Would be great if you can also take a look. 




[jira] [Updated] (FLINK-16248) Add interfaces for MLEnvironment and MLEnvironmentFactory

2020-02-23 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16248?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-16248:
---
Labels: pull-request-available  (was: )

> Add interfaces for MLEnvironment and MLEnvironmentFactory
> -
>
> Key: FLINK-16248
> URL: https://issues.apache.org/jira/browse/FLINK-16248
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>Priority: Major
>  Labels: pull-request-available
>
> Align the interfaces for MLEnvironment and MLEnvironmentFactory so that Python
> users can use the Python MLEnvironmentFactory to maintain the execution
> environment and table environment.





[GitHub] [flink] hequn8128 opened a new pull request #11198: [FLINK-16248][python][ml] Add interfaces for MLEnvironment and MLEnvironmentFactory

2020-02-23 Thread GitBox
hequn8128 opened a new pull request #11198: [FLINK-16248][python][ml] Add 
interfaces for MLEnvironment and MLEnvironmentFactory
URL: https://github.com/apache/flink/pull/11198
 
 
   
   ## What is the purpose of the change
   
   Align the interfaces for MLEnvironment and MLEnvironmentFactory so that Python
users can use the Python MLEnvironmentFactory to maintain the execution
environment and table environment.
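The factory pattern described here can be sketched in a few lines of Python (names follow the Java counterparts mentioned in the PR title, but this is an illustrative stand-in, not the PR's actual code):

```python
class MLEnvironment:
    """Bundles the execution environment and table environment (sketch)."""

    def __init__(self, exec_env, table_env):
        self._exec_env = exec_env
        self._table_env = table_env

    def get_execution_environment(self):
        return self._exec_env

    def get_table_environment(self):
        return self._table_env


class MLEnvironmentFactory:
    """Registry handing out ids for MLEnvironments, so callers can refer
    to a shared environment by id instead of passing objects around (sketch)."""

    _next_id = 0
    _environments = {}

    @classmethod
    def register_ml_environment(cls, ml_env):
        env_id = cls._next_id
        cls._environments[env_id] = ml_env
        cls._next_id += 1
        return env_id

    @classmethod
    def get(cls, env_id):
        return cls._environments[env_id]
```

A caller would register an environment once and retrieve it by id wherever both the execution and table environments are needed.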
   
   
   ## Brief change log
   
 - Add MLEnvironment and MLEnvironmentFactory interfaces.
 - Remove the is_blink_planner parameter from the TableEnvironment init method.
 - Add tests.
   
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
 - Add MLEnvironmentTest to test the logic in MLEnvironment.
 - Add MLEnvironmentFactoryTest to test the logic in MLEnvironmentFactory.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (Yes)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes)
 - If yes, how is the feature documented? (PythonDocs)
   




[GitHub] [flink] PatrickRen commented on a change in pull request #11151: [FLINK-16128][docs]Translate "Google Cloud PubSub" page into Chinese

2020-02-23 Thread GitBox
PatrickRen commented on a change in pull request #11151: 
[FLINK-16128][docs]Translate "Google Cloud PubSub" page into Chinese
URL: https://github.com/apache/flink/pull/11151#discussion_r383101507
 
 

 ##
 File path: docs/dev/connectors/pubsub.zh.md
 ##
 @@ -23,9 +23,7 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-This connector provides a Source and Sink that can read from and write to
-[Google Cloud PubSub](https://cloud.google.com/pubsub). To use this connector, 
add the
-following dependency to your project:
+这个连接器可向 [Google Cloud PubSub](https://cloud.google.com/pubsub) 
读取与写入数据。添加下面的依赖来使用此连接器:
 
 Review comment:
   "Source and Sink" is not shown in the translated sentence




[GitHub] [flink] PatrickRen commented on a change in pull request #11151: [FLINK-16128][docs]Translate "Google Cloud PubSub" page into Chinese

2020-02-23 Thread GitBox
PatrickRen commented on a change in pull request #11151: 
[FLINK-16128][docs]Translate "Google Cloud PubSub" page into Chinese
URL: https://github.com/apache/flink/pull/11151#discussion_r383103413
 
 

 ##
 File path: docs/dev/connectors/pubsub.zh.md
 ##
 @@ -106,17 +97,18 @@ dataStream.addSink(pubsubSink);
 
 ### Google Credentials
 
-Google uses 
[Credentials](https://cloud.google.com/docs/authentication/production) to 
authenticate and authorize applications so that they can use Google Cloud 
Platform resources (such as PubSub).
+应用程序需要使用 
[Credentials](https://cloud.google.com/docs/authentication/production) 
来通过认证和授权才能使用 Google Cloud Platform 的资源,例如PubSub。
 
 Review comment:
   例如[space]PubSub




[GitHub] [flink] PatrickRen commented on a change in pull request #11151: [FLINK-16128][docs]Translate "Google Cloud PubSub" page into Chinese

2020-02-23 Thread GitBox
PatrickRen commented on a change in pull request #11151: 
[FLINK-16128][docs]Translate "Google Cloud PubSub" page into Chinese
URL: https://github.com/apache/flink/pull/11151#discussion_r383102874
 
 

 ##
 File path: docs/dev/connectors/pubsub.zh.md
 ##
 @@ -39,24 +37,17 @@ following dependency to your project:
 Note: This connector has been added to Flink recently. It has not 
received widespread testing yet.
 
 
-Note that the streaming connectors are currently not part of the binary
-distribution. See
-[here]({{ site.baseurl }}/dev/projectsetup/dependencies.html)
-for information about how to package the program with the libraries for
-cluster execution.
-
-
+注意连接器目前还不是二进制发行版的一部分,添加依赖、打包配置以及集群运行信息请参考[这里]({{ site.baseurl 
}}/dev/projectsetup/dependencies.html)
 
 ## Consuming or Producing PubSubMessages
 
-The connector provides a connectors for receiving and sending messages from 
and to Google PubSub.
-Google PubSub has an `at-least-once` guarantee and as such the connector 
delivers the same guarantees.
+连接器可以接收和发送 Google PubSub 的信息。和 Google PubSub 一样,这个连接器能够保证`至少一次`的语义。
 
 ### PubSub SourceFunction
 
-The class `PubSubSource` has a builder to create PubSubsources: 
`PubSubSource.newBuilder(...)`
+`PubSubSource` 类的对象由构建类来构建: `PubSubSource.newBuilder(...)`
 
-There are several optional methods to alter how the PubSubSource is created, 
the bare minimum is to provide a Google project, Pubsub subscription and a way 
to deserialize the PubSubMessages.
+除了 Google project,Pubsub subscription 和反序列化 PubSubMessage的方法是必须的,你有多种可选的方法来创建 
PubSubSource。
 
 Review comment:
   I think it's better to be:
   有多种可选的方法来创建 PubSubSource,但最低要求是要提供 Google Project、Pubsub 订阅和反序列化 
PubSubMessages 的方法。




[GitHub] [flink] PatrickRen commented on a change in pull request #11151: [FLINK-16128][docs]Translate "Google Cloud PubSub" page into Chinese

2020-02-23 Thread GitBox
PatrickRen commented on a change in pull request #11151: 
[FLINK-16128][docs]Translate "Google Cloud PubSub" page into Chinese
URL: https://github.com/apache/flink/pull/11151#discussion_r383103168
 
 

 ##
 File path: docs/dev/connectors/pubsub.zh.md
 ##
 @@ -77,13 +68,13 @@ streamExecEnv.addSource(source);
 
 
 
-Currently the source functions 
[pulls](https://cloud.google.com/pubsub/docs/pull) messages from PubSub, [push 
endpoints](https://cloud.google.com/pubsub/docs/push) are not supported.
+当前还不支持 PubSub 的source functions 
[pulls](https://cloud.google.com/pubsub/docs/pull) messages和 [push 
endpoints](https://cloud.google.com/pubsub/docs/push)。
 
 Review comment:
   PubSub 的[space]source functions ... messages[space]和...




[GitHub] [flink] PatrickRen commented on a change in pull request #11151: [FLINK-16128][docs]Translate "Google Cloud PubSub" page into Chinese

2020-02-23 Thread GitBox
PatrickRen commented on a change in pull request #11151: 
[FLINK-16128][docs]Translate "Google Cloud PubSub" page into Chinese
URL: https://github.com/apache/flink/pull/11151#discussion_r383101821
 
 

 ##
 File path: docs/dev/connectors/pubsub.zh.md
 ##
 @@ -39,24 +37,17 @@ following dependency to your project:
 Note: This connector has been added to Flink recently. It has not 
received widespread testing yet.
 
 Review comment:
   The note here is not translated




[GitHub] [flink] PatrickRen commented on a change in pull request #11151: [FLINK-16128][docs]Translate "Google Cloud PubSub" page into Chinese

2020-02-23 Thread GitBox
PatrickRen commented on a change in pull request #11151: 
[FLINK-16128][docs]Translate "Google Cloud PubSub" page into Chinese
URL: https://github.com/apache/flink/pull/11151#discussion_r383105282
 
 

 ##
 File path: docs/dev/connectors/pubsub.zh.md
 ##
 @@ -143,22 +135,22 @@ env.addSource(pubsubSource)
 
 
 
-### Atleast once guarantee
+### 精确一次语义保证
 
  SourceFunction
 
-There are several reasons why a message might be send multiple times, such as 
failure scenarios on Google PubSub's side.
+有很多原因导致会一个信息会被多次发出,例如 Google PubSub 的故障。
 
-Another reason is when the acknowledgement deadline has passed. This is the 
time between receiving the message and between acknowledging the message. The 
PubSubSource will only acknowledge a message on successful checkpoints to 
guarantee Atleast-Once. This does mean if the time between successful 
checkpoints is larger than the acknowledgment deadline of your subscription 
messages will most likely be processed multiple times.
+另一个可能的原因是超过了确认的截止时间。这是收到信息的间隔和信息确认的间隔。PubSubSource 
只有在信息被成功快照之后才会确认以保证至少一次的语义。这意味着,如果你的快照间隔大于信息确认的截止时间,那么你订阅的信息很有可能会被多次处理。
 
 Review comment:
   I doubt that the second "between" is added by mistake in the original 
English document:
   > This is the time between receiving the message and between 
acknowledging the message.
   
   If so, the translation would be:
   另一个可能的原因是超过了确认的截止时间,即收到与确认信息之间的时间间隔。





[GitHub] [flink] PatrickRen commented on a change in pull request #11151: [FLINK-16128][docs]Translate "Google Cloud PubSub" page into Chinese

2020-02-23 Thread GitBox
PatrickRen commented on a change in pull request #11151: 
[FLINK-16128][docs]Translate "Google Cloud PubSub" page into Chinese
URL: https://github.com/apache/flink/pull/11151#discussion_r383106065
 
 

 ##
 File path: docs/dev/connectors/pubsub.zh.md
 ##
 @@ -143,22 +135,22 @@ env.addSource(pubsubSource)
 
 
 
-### Atleast once guarantee
+### 精确一次语义保证
 
  SourceFunction
 
-There are several reasons why a message might be send multiple times, such as 
failure scenarios on Google PubSub's side.
+有很多原因导致会一个信息会被多次发出,例如 Google PubSub 的故障。
 
-Another reason is when the acknowledgement deadline has passed. This is the 
time between receiving the message and between acknowledging the message. The 
PubSubSource will only acknowledge a message on successful checkpoints to 
guarantee Atleast-Once. This does mean if the time between successful 
checkpoints is larger than the acknowledgment deadline of your subscription 
messages will most likely be processed multiple times.
+另一个可能的原因是超过了确认的截止时间。这是收到信息的间隔和信息确认的间隔。PubSubSource 
只有在信息被成功快照之后才会确认以保证至少一次的语义。这意味着,如果你的快照间隔大于信息确认的截止时间,那么你订阅的信息很有可能会被多次处理。
 
-For this reason it's recommended to have a (much) lower checkpoint interval 
than acknowledgement deadline.
+因此,我们建议把快照的间隔设置得比信息确认截止时间更短。
 
-See [PubSub](https://cloud.google.com/pubsub/docs/subscriber) for details on 
how to increase the acknowledgment deadline of your subscription.
+参照 [PubSub](https://cloud.google.com/pubsub/docs/subscriber) 来增加信息确认截止时间。
 
-Note: The metric `PubSubMessagesProcessedNotAcked` shows how many messages are 
waiting for the next checkpoint before they will be acknowledged.
+注意: `PubSubMessagesProcessedNotAcked` 显示了有多少信息正在等待下一个快照还没被确认。
 
 Review comment:
   I think we don't need to translate "checkpoint" according to Flink 
Translation Specifications




[GitHub] [flink] PatrickRen commented on a change in pull request #11151: [FLINK-16128][docs]Translate "Google Cloud PubSub" page into Chinese

2020-02-23 Thread GitBox
PatrickRen commented on a change in pull request #11151: 
[FLINK-16128][docs]Translate "Google Cloud PubSub" page into Chinese
URL: https://github.com/apache/flink/pull/11151#discussion_r383106631
 
 

 ##
 File path: docs/dev/connectors/pubsub.zh.md
 ##
 @@ -143,22 +135,22 @@ env.addSource(pubsubSource)
 
 
 
-### Atleast once guarantee
+### 精确一次语义保证
 
  SourceFunction
 
-There are several reasons why a message might be send multiple times, such as 
failure scenarios on Google PubSub's side.
+有很多原因导致会一个信息会被多次发出,例如 Google PubSub 的故障。
 
-Another reason is when the acknowledgement deadline has passed. This is the 
time between receiving the message and between acknowledging the message. The 
PubSubSource will only acknowledge a message on successful checkpoints to 
guarantee Atleast-Once. This does mean if the time between successful 
checkpoints is larger than the acknowledgment deadline of your subscription 
messages will most likely be processed multiple times.
+另一个可能的原因是超过了确认的截止时间。这是收到信息的间隔和信息确认的间隔。PubSubSource 
只有在信息被成功快照之后才会确认以保证至少一次的语义。这意味着,如果你的快照间隔大于信息确认的截止时间,那么你订阅的信息很有可能会被多次处理。
 
-For this reason it's recommended to have a (much) lower checkpoint interval 
than acknowledgement deadline.
+因此,我们建议把快照的间隔设置得比信息确认截止时间更短。
 
-See [PubSub](https://cloud.google.com/pubsub/docs/subscriber) for details on 
how to increase the acknowledgment deadline of your subscription.
+参照 [PubSub](https://cloud.google.com/pubsub/docs/subscriber) 来增加信息确认截止时间。
 
-Note: The metric `PubSubMessagesProcessedNotAcked` shows how many messages are 
waiting for the next checkpoint before they will be acknowledged.
+注意: `PubSubMessagesProcessedNotAcked` 显示了有多少信息正在等待下一个快照还没被确认。
 
  SinkFunction
 
-The sink function buffers messages that are to be send to PubSub for a short 
amount of time for performance reasons. Before each checkpoint this buffer is 
flushed and the checkpoint will not succeed unless the messages have been 
delivered to PubSub.
+Sink function 会把准备发到 PubSub 的信息短暂地缓存以提高性能。每次快照前,它会 flush 缓冲区,并且只有当所有信息成功发送到 
PubSub 之后,快照才会成功完成。
 
 Review comment:
   Same issue of "checkpoint" here. I think "flush" can be translated as "刷新". 




[GitHub] [flink] PatrickRen commented on a change in pull request #11151: [FLINK-16128][docs]Translate "Google Cloud PubSub" page into Chinese

2020-02-23 Thread GitBox
PatrickRen commented on a change in pull request #11151: 
[FLINK-16128][docs]Translate "Google Cloud PubSub" page into Chinese
URL: https://github.com/apache/flink/pull/11151#discussion_r383107003
 
 

 ##
 File path: docs/dev/connectors/pubsub.md
 ##
 @@ -143,13 +143,13 @@ env.addSource(pubsubSource)
 
 
 
-### Atleast once guarantee
+### At least once guarantee
 
  SourceFunction
 
 There are several reasons why a message might be send multiple times, such as 
failure scenarios on Google PubSub's side.
 
-Another reason is when the acknowledgement deadline has passed. This is the 
time between receiving the message and between acknowledging the message. The 
PubSubSource will only acknowledge a message on successful checkpoints to 
guarantee Atleast-Once. This does mean if the time between successful 
checkpoints is larger than the acknowledgment deadline of your subscription 
messages will most likely be processed multiple times.
+Another reason is when the acknowledgement deadline has passed. This is the 
time between receiving the message and between acknowledging the message. The 
PubSubSource will only acknowledge a message on successful checkpoints to 
guarantee at-least-Once. This does mean if the time between successful 
checkpoints is larger than the acknowledgment deadline of your subscription 
messages will most likely be processed multiple times.
 
 Review comment:
   I think using lower case for all three words is better: "at-least-once"
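The redelivery condition in the quoted paragraph reduces to a simple comparison (an illustrative helper, not part of the connector):

```python
def likely_redelivered(checkpoint_interval_s: float, ack_deadline_s: float) -> bool:
    """Messages are acknowledged only on a successful checkpoint; if the
    subscription's acknowledgement deadline elapses first, PubSub re-delivers
    the message, so duplicate processing becomes likely."""
    return checkpoint_interval_s > ack_deadline_s
```

This is why the documentation recommends a checkpoint interval (much) lower than the acknowledgement deadline.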




[GitHub] [flink] flinkbot edited a comment on issue #11112: [FLINK-16121][python] Introduce ArrowReader and ArrowWriter for Arrow format data read and write

2020-02-23 Thread GitBox
flinkbot edited a comment on issue #11112: [FLINK-16121][python] Introduce 
ArrowReader and ArrowWriter for Arrow format data read and write
URL: https://github.com/apache/flink/pull/11112#issuecomment-587035354
 
 
   
   ## CI report:
   
   * dba261b36b56aedec766097042dbe40858e5fc8c Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/149292329) Azure: 
[SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5252)
 
   * fc594cbe4670795421281fd5d19cd5b3d8109a9e Travis: 
[PENDING](https://travis-ci.com/flink-ci/flink/builds/150247518) 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   




[GitHub] [flink] liupc commented on issue #9703: [FLINK-14038]Add default GC options for flink on yarn to facilitate debugging

2020-02-23 Thread GitBox
liupc commented on issue #9703: [FLINK-14038]Add default GC options for flink 
on yarn to facilitate debugging
URL: https://github.com/apache/flink/pull/9703#issuecomment-590186785
 
 
   Thanks @tillrohrmann , I think your concern is right, and the docs #11165 
looks good! 




[GitHub] [flink] flinkbot edited a comment on issue #11112: [FLINK-16121][python] Introduce ArrowReader and ArrowWriter for Arrow format data read and write

2020-02-23 Thread GitBox
flinkbot edited a comment on issue #11112: [FLINK-16121][python] Introduce 
ArrowReader and ArrowWriter for Arrow format data read and write
URL: https://github.com/apache/flink/pull/11112#issuecomment-587035354
 
 
   
   ## CI report:
   
   * dba261b36b56aedec766097042dbe40858e5fc8c Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/149292329) Azure: 
[SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5252)
 
   * fc594cbe4670795421281fd5d19cd5b3d8109a9e UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   




[GitHub] [flink] zhijiangW commented on a change in pull request #11155: [FLINK-14818][benchmark] Fix receiving InputGate setup of StreamNetworkBenchmarkEnvironment.

2020-02-23 Thread GitBox
zhijiangW commented on a change in pull request #11155: 
[FLINK-14818][benchmark] Fix receiving InputGate setup of 
StreamNetworkBenchmarkEnvironment.
URL: https://github.com/apache/flink/pull/11155#discussion_r383102059
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
 ##
 @@ -272,11 +287,161 @@ private static ShuffleDescriptor 
createShuffleDescriptor(
ResultPartitionID resultPartitionID,
ResourceID location,
TaskManagerLocation senderLocation,
-   int channel) {
+   int connectionIndex) {
final NettyShuffleDescriptorBuilder builder = 
NettyShuffleDescriptorBuilder.newBuilder()
.setId(resultPartitionID)
.setProducerInfoFromTaskManagerLocation(senderLocation)
-   .setConnectionIndex(channel);
+   .setConnectionIndex(connectionIndex);
return localMode ? 
builder.setProducerLocation(location).buildLocal() : builder.buildRemote();
}
+
+   /**
+* A {@link SingleInputGateFactory} which replaces the default {@link 
RemoteInputChannel} and
+* {@link LocalInputChannel} implementation with costume ones.
+*/
+   private static class TestSingleInputGateFactory extends 
SingleInputGateFactory {
+
+   private final ResourceID taskExecutorResourceId;
+   private final int partitionRequestInitialBackoff;
+   private final int partitionRequestMaxBackoff;
+   private final ConnectionManager connectionManager;
+   private final ResultPartitionManager partitionManager;
+   private final TaskEventPublisher taskEventPublisher;
+   private final NetworkBufferPool networkBufferPool;
+
+   public TestSingleInputGateFactory(
+   @Nonnull ResourceID taskExecutorResourceId,
+   @Nonnull NettyShuffleEnvironmentConfiguration 
networkConfig,
+   @Nonnull ConnectionManager connectionManager,
+   @Nonnull ResultPartitionManager 
partitionManager,
+   @Nonnull TaskEventPublisher taskEventPublisher,
+   @Nonnull NetworkBufferPool networkBufferPool) {
+   super(
+   taskExecutorResourceId,
+   networkConfig,
+   connectionManager,
+   partitionManager,
+   taskEventPublisher,
+   networkBufferPool);
+   this.networkBufferPool = networkBufferPool;
+   this.taskEventPublisher = taskEventPublisher;
+   this.partitionManager = partitionManager;
+   this.connectionManager = connectionManager;
+   this.partitionRequestMaxBackoff = 
networkConfig.partitionRequestMaxBackoff();
+   this.partitionRequestInitialBackoff = 
networkConfig.partitionRequestInitialBackoff();
+   this.taskExecutorResourceId = taskExecutorResourceId;
+   }
+
+   @Override
+   protected InputChannel createKnownInputChannel(
+   SingleInputGate inputGate,
+   int index,
+   NettyShuffleDescriptor inputChannelDescriptor,
+   ChannelStatistics channelStatistics,
+   InputChannelMetrics metrics) {
+   ResultPartitionID partitionId = 
inputChannelDescriptor.getResultPartitionID();
+   if 
(inputChannelDescriptor.isLocalTo(taskExecutorResourceId)) {
+   return new TestLocalInputChannel(
+   inputGate,
+   index,
+   partitionId,
+   partitionManager,
+   taskEventPublisher,
+   partitionRequestInitialBackoff,
+   partitionRequestMaxBackoff,
+   metrics);
+   } else {
+   return new TestRemoteInputChannel(
+   inputGate,
+   index,
+   partitionId,
+   
inputChannelDescriptor.getConnectionId(),
+   connectionManager,
+   partitionRequestInitialBackoff,
+

[GitHub] [flink] zhijiangW commented on a change in pull request #11155: [FLINK-14818][benchmark] Fix receiving InputGate setup of StreamNetworkBenchmarkEnvironment.

2020-02-23 Thread GitBox
zhijiangW commented on a change in pull request #11155: 
[FLINK-14818][benchmark] Fix receiving InputGate setup of 
StreamNetworkBenchmarkEnvironment.
URL: https://github.com/apache/flink/pull/11155#discussion_r383102071
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
 ##
 @@ -272,11 +287,161 @@ private static ShuffleDescriptor 
createShuffleDescriptor(
ResultPartitionID resultPartitionID,
ResourceID location,
TaskManagerLocation senderLocation,
-   int channel) {
+   int connectionIndex) {
final NettyShuffleDescriptorBuilder builder = 
NettyShuffleDescriptorBuilder.newBuilder()
.setId(resultPartitionID)
.setProducerInfoFromTaskManagerLocation(senderLocation)
-   .setConnectionIndex(channel);
+   .setConnectionIndex(connectionIndex);
return localMode ? 
builder.setProducerLocation(location).buildLocal() : builder.buildRemote();
}
+
+   /**
+* A {@link SingleInputGateFactory} which replaces the default {@link 
RemoteInputChannel} and
+* {@link LocalInputChannel} implementation with costume ones.
+*/
+   private static class TestSingleInputGateFactory extends 
SingleInputGateFactory {
+
+   private final ResourceID taskExecutorResourceId;
+   private final int partitionRequestInitialBackoff;
+   private final int partitionRequestMaxBackoff;
+   private final ConnectionManager connectionManager;
+   private final ResultPartitionManager partitionManager;
+   private final TaskEventPublisher taskEventPublisher;
+   private final NetworkBufferPool networkBufferPool;
+
+   public TestSingleInputGateFactory(
+   @Nonnull ResourceID taskExecutorResourceId,
+   @Nonnull NettyShuffleEnvironmentConfiguration 
networkConfig,
+   @Nonnull ConnectionManager connectionManager,
+   @Nonnull ResultPartitionManager 
partitionManager,
+   @Nonnull TaskEventPublisher taskEventPublisher,
+   @Nonnull NetworkBufferPool networkBufferPool) {
+   super(
+   taskExecutorResourceId,
+   networkConfig,
+   connectionManager,
+   partitionManager,
+   taskEventPublisher,
+   networkBufferPool);
+   this.networkBufferPool = networkBufferPool;
+   this.taskEventPublisher = taskEventPublisher;
+   this.partitionManager = partitionManager;
+   this.connectionManager = connectionManager;
+   this.partitionRequestMaxBackoff = 
networkConfig.partitionRequestMaxBackoff();
+   this.partitionRequestInitialBackoff = 
networkConfig.partitionRequestInitialBackoff();
+   this.taskExecutorResourceId = taskExecutorResourceId;
+   }
+
+   @Override
+   protected InputChannel createKnownInputChannel(
+   SingleInputGate inputGate,
+   int index,
+   NettyShuffleDescriptor inputChannelDescriptor,
+   ChannelStatistics channelStatistics,
+   InputChannelMetrics metrics) {
+   ResultPartitionID partitionId = 
inputChannelDescriptor.getResultPartitionID();
+   if 
(inputChannelDescriptor.isLocalTo(taskExecutorResourceId)) {
+   return new TestLocalInputChannel(
+   inputGate,
+   index,
+   partitionId,
+   partitionManager,
+   taskEventPublisher,
+   partitionRequestInitialBackoff,
+   partitionRequestMaxBackoff,
+   metrics);
+   } else {
+   return new TestRemoteInputChannel(
+   inputGate,
+   index,
+   partitionId,
+   
inputChannelDescriptor.getConnectionId(),
+   connectionManager,
+   partitionRequestInitialBackoff,
+
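
The `createShuffleDescriptor` snippet above builds one fluent builder and then
picks `buildLocal()` or `buildRemote()` depending on `localMode`. A minimal
sketch of that conditional-build pattern follows; all names are simplified
stand-ins, not Flink's real `NettyShuffleDescriptorBuilder` API.

```java
// Conditional builder sketch: one fluent builder, two terminal build
// methods, selected by a mode flag (mirrors the localMode ternary in
// createShuffleDescriptor). All names are simplified stand-ins.

class Descriptor {
    final String kind;          // "local" or "remote"
    final int connectionIndex;

    Descriptor(String kind, int connectionIndex) {
        this.kind = kind;
        this.connectionIndex = connectionIndex;
    }
}

class DescriptorBuilder {
    private int connectionIndex;

    static DescriptorBuilder newBuilder() {
        return new DescriptorBuilder();
    }

    // Fluent setter, as in the quoted builder usage.
    DescriptorBuilder setConnectionIndex(int idx) {
        this.connectionIndex = idx;
        return this;
    }

    Descriptor buildLocal() {
        return new Descriptor("local", connectionIndex);
    }

    Descriptor buildRemote() {
        return new Descriptor("remote", connectionIndex);
    }
}

public class ConditionalBuildSketch {
    static Descriptor create(boolean localMode, int connectionIndex) {
        DescriptorBuilder builder =
                DescriptorBuilder.newBuilder().setConnectionIndex(connectionIndex);
        // Same shape as: localMode ? builder.buildLocal() : builder.buildRemote()
        return localMode ? builder.buildLocal() : builder.buildRemote();
    }

    public static void main(String[] args) {
        System.out.println(create(true, 0).kind);  // prints local
        System.out.println(create(false, 1).kind); // prints remote
    }
}
```

The advantage over two separate builders is that all shared setup
(`setConnectionIndex`, etc.) is written once and only the terminal build call
differs.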

[GitHub] [flink] dianfu commented on issue #11112: [FLINK-16121][python] Introduce ArrowReader and ArrowWriter for Arrow format data read and write

2020-02-23 Thread GitBox
dianfu commented on issue #11112: [FLINK-16121][python] Introduce ArrowReader 
and ArrowWriter for Arrow format data read and write
URL: https://github.com/apache/flink/pull/11112#issuecomment-590184765
 
 
   @hequn8128 Thanks a lot for your review. Have updated the PR accordingly. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dianfu commented on a change in pull request #11112: [FLINK-16121][python] Introduce ArrowReader and ArrowWriter for Arrow format data read and write

2020-02-23 Thread GitBox
dianfu commented on a change in pull request #11112: [FLINK-16121][python] 
Introduce ArrowReader and ArrowWriter for Arrow format data read and write
URL: https://github.com/apache/flink/pull/11112#discussion_r383100703
 
 

 ##
 File path: 
flink-python/src/main/java/org/apache/flink/table/runtime/arrow/ArrowFieldReader.java
 ##
 @@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.arrow;
 
 Review comment:
   I recall that we have reached consensus that we should use functions.python 
instead of python.functions. However, I'm also not sure which one is best. What 
about discussing whether we should improve the package name in a separate 
thread? 




[GitHub] [flink] zhijiangW commented on a change in pull request #11155: [FLINK-14818][benchmark] Fix receiving InputGate setup of StreamNetworkBenchmarkEnvironment.

2020-02-23 Thread GitBox
zhijiangW commented on a change in pull request #11155: 
[FLINK-14818][benchmark] Fix receiving InputGate setup of 
StreamNetworkBenchmarkEnvironment.
URL: https://github.com/apache/flink/pull/11155#discussion_r383090816
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
 ##
 @@ -272,11 +287,161 @@ private static ShuffleDescriptor 
createShuffleDescriptor(
ResultPartitionID resultPartitionID,
ResourceID location,
TaskManagerLocation senderLocation,
-   int channel) {
+   int connectionIndex) {
final NettyShuffleDescriptorBuilder builder = 
NettyShuffleDescriptorBuilder.newBuilder()
.setId(resultPartitionID)
.setProducerInfoFromTaskManagerLocation(senderLocation)
-   .setConnectionIndex(channel);
+   .setConnectionIndex(connectionIndex);
return localMode ? 
builder.setProducerLocation(location).buildLocal() : builder.buildRemote();
}
+
+   /**
+* A {@link SingleInputGateFactory} which replaces the default {@link 
RemoteInputChannel} and
+* {@link LocalInputChannel} implementations with custom ones.
+*/
+   private static class TestSingleInputGateFactory extends 
SingleInputGateFactory {
+
+   private final ResourceID taskExecutorResourceId;
+   private final int partitionRequestInitialBackoff;
+   private final int partitionRequestMaxBackoff;
+   private final ConnectionManager connectionManager;
+   private final ResultPartitionManager partitionManager;
+   private final TaskEventPublisher taskEventPublisher;
+   private final NetworkBufferPool networkBufferPool;
+
+   public TestSingleInputGateFactory(
+   @Nonnull ResourceID taskExecutorResourceId,
+   @Nonnull NettyShuffleEnvironmentConfiguration 
networkConfig,
+   @Nonnull ConnectionManager connectionManager,
+   @Nonnull ResultPartitionManager 
partitionManager,
+   @Nonnull TaskEventPublisher taskEventPublisher,
+   @Nonnull NetworkBufferPool networkBufferPool) {
+   super(
+   taskExecutorResourceId,
+   networkConfig,
+   connectionManager,
+   partitionManager,
+   taskEventPublisher,
+   networkBufferPool);
+   this.networkBufferPool = networkBufferPool;
+   this.taskEventPublisher = taskEventPublisher;
+   this.partitionManager = partitionManager;
+   this.connectionManager = connectionManager;
+   this.partitionRequestMaxBackoff = 
networkConfig.partitionRequestMaxBackoff();
+   this.partitionRequestInitialBackoff = 
networkConfig.partitionRequestInitialBackoff();
+   this.taskExecutorResourceId = taskExecutorResourceId;
+   }
+
+   @Override
+   protected InputChannel createKnownInputChannel(
+   SingleInputGate inputGate,
+   int index,
+   NettyShuffleDescriptor inputChannelDescriptor,
+   ChannelStatistics channelStatistics,
+   InputChannelMetrics metrics) {
+   ResultPartitionID partitionId = 
inputChannelDescriptor.getResultPartitionID();
+   if 
(inputChannelDescriptor.isLocalTo(taskExecutorResourceId)) {
+   return new TestLocalInputChannel(
+   inputGate,
+   index,
+   partitionId,
+   partitionManager,
+   taskEventPublisher,
+   partitionRequestInitialBackoff,
+   partitionRequestMaxBackoff,
+   metrics);
+   } else {
+   return new TestRemoteInputChannel(
+   inputGate,
+   index,
+   partitionId,
+   
inputChannelDescriptor.getConnectionId(),
+   connectionManager,
+   partitionRequestInitialBackoff,
+
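
The `TestSingleInputGateFactory` quoted above follows the factory-method
override pattern: a subclass overrides a single protected creation hook so a
benchmark can substitute channel implementations while reusing the rest of the
gate setup in the superclass. A minimal sketch of the pattern; none of these
simplified stand-in classes are Flink's actual API.

```java
// Factory-method override sketch: the base factory exposes one protected
// creation hook; the benchmark subclass swaps in test channel types
// without touching any other construction logic.

class InputChannel {
    final int index;
    InputChannel(int index) { this.index = index; }
}

class LocalChannel extends InputChannel {
    LocalChannel(int index) { super(index); }
}

class RemoteChannel extends InputChannel {
    RemoteChannel(int index) { super(index); }
}

class GateFactory {
    // Production hook: picks local vs. remote based on producer locality.
    protected InputChannel createKnownInputChannel(int index, boolean isLocal) {
        return isLocal ? new LocalChannel(index) : new RemoteChannel(index);
    }
}

class TestLocalChannel extends LocalChannel {
    TestLocalChannel(int index) { super(index); }
}

class TestRemoteChannel extends RemoteChannel {
    TestRemoteChannel(int index) { super(index); }
}

// The benchmark factory replaces both channel types in one place.
class BenchmarkGateFactory extends GateFactory {
    @Override
    protected InputChannel createKnownInputChannel(int index, boolean isLocal) {
        return isLocal ? new TestLocalChannel(index) : new TestRemoteChannel(index);
    }
}

public class FactoryOverrideSketch {
    public static void main(String[] args) {
        GateFactory factory = new BenchmarkGateFactory();
        System.out.println(factory.createKnownInputChannel(0, true)
                .getClass().getSimpleName());  // prints TestLocalChannel
        System.out.println(factory.createKnownInputChannel(1, false)
                .getClass().getSimpleName());  // prints TestRemoteChannel
    }
}
```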

[GitHub] [flink] zhijiangW commented on a change in pull request #11155: [FLINK-14818][benchmark] Fix receiving InputGate setup of StreamNetworkBenchmarkEnvironment.

2020-02-23 Thread GitBox
zhijiangW commented on a change in pull request #11155: 
[FLINK-14818][benchmark] Fix receiving InputGate setup of 
StreamNetworkBenchmarkEnvironment.
URL: https://github.com/apache/flink/pull/11155#discussion_r383090260
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
 ##
 @@ -272,11 +287,161 @@ private static ShuffleDescriptor 
createShuffleDescriptor(
ResultPartitionID resultPartitionID,
ResourceID location,
TaskManagerLocation senderLocation,
-   int channel) {
+   int connectionIndex) {
final NettyShuffleDescriptorBuilder builder = 
NettyShuffleDescriptorBuilder.newBuilder()
.setId(resultPartitionID)
.setProducerInfoFromTaskManagerLocation(senderLocation)
-   .setConnectionIndex(channel);
+   .setConnectionIndex(connectionIndex);
return localMode ? 
builder.setProducerLocation(location).buildLocal() : builder.buildRemote();
}
+
+   /**
+* A {@link SingleInputGateFactory} which replaces the default {@link 
RemoteInputChannel} and
+* {@link LocalInputChannel} implementations with custom ones.
+*/
+   private static class TestSingleInputGateFactory extends 
SingleInputGateFactory {
+
+   private final ResourceID taskExecutorResourceId;
+   private final int partitionRequestInitialBackoff;
+   private final int partitionRequestMaxBackoff;
+   private final ConnectionManager connectionManager;
+   private final ResultPartitionManager partitionManager;
+   private final TaskEventPublisher taskEventPublisher;
+   private final NetworkBufferPool networkBufferPool;
+
+   public TestSingleInputGateFactory(
+   @Nonnull ResourceID taskExecutorResourceId,
+   @Nonnull NettyShuffleEnvironmentConfiguration 
networkConfig,
+   @Nonnull ConnectionManager connectionManager,
+   @Nonnull ResultPartitionManager 
partitionManager,
+   @Nonnull TaskEventPublisher taskEventPublisher,
+   @Nonnull NetworkBufferPool networkBufferPool) {
+   super(
+   taskExecutorResourceId,
+   networkConfig,
+   connectionManager,
+   partitionManager,
+   taskEventPublisher,
+   networkBufferPool);
+   this.networkBufferPool = networkBufferPool;
+   this.taskEventPublisher = taskEventPublisher;
+   this.partitionManager = partitionManager;
+   this.connectionManager = connectionManager;
+   this.partitionRequestMaxBackoff = 
networkConfig.partitionRequestMaxBackoff();
+   this.partitionRequestInitialBackoff = 
networkConfig.partitionRequestInitialBackoff();
+   this.taskExecutorResourceId = taskExecutorResourceId;
+   }
+
+   @Override
+   protected InputChannel createKnownInputChannel(
+   SingleInputGate inputGate,
+   int index,
+   NettyShuffleDescriptor inputChannelDescriptor,
+   ChannelStatistics channelStatistics,
+   InputChannelMetrics metrics) {
+   ResultPartitionID partitionId = 
inputChannelDescriptor.getResultPartitionID();
+   if 
(inputChannelDescriptor.isLocalTo(taskExecutorResourceId)) {
+   return new TestLocalInputChannel(
+   inputGate,
+   index,
+   partitionId,
+   partitionManager,
+   taskEventPublisher,
+   partitionRequestInitialBackoff,
+   partitionRequestMaxBackoff,
+   metrics);
+   } else {
+   return new TestRemoteInputChannel(
+   inputGate,
+   index,
+   partitionId,
+   
inputChannelDescriptor.getConnectionId(),
+   connectionManager,
+   partitionRequestInitialBackoff,
+

[GitHub] [flink] zhijiangW commented on a change in pull request #11155: [FLINK-14818][benchmark] Fix receiving InputGate setup of StreamNetworkBenchmarkEnvironment.

2020-02-23 Thread GitBox
zhijiangW commented on a change in pull request #11155: 
[FLINK-14818][benchmark] Fix receiving InputGate setup of 
StreamNetworkBenchmarkEnvironment.
URL: https://github.com/apache/flink/pull/11155#discussion_r383090336
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/SingleInputGateBenchmarkFactory.java
 ##
 @@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io.benchmark;
+
+import org.apache.flink.core.memory.MemorySegmentProvider;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.io.network.ConnectionID;
+import org.apache.flink.runtime.io.network.ConnectionManager;
+import org.apache.flink.runtime.io.network.TaskEventPublisher;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.io.network.metrics.InputChannelMetrics;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import 
org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
+import 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+import 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateFactory;
+import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor;
+import 
org.apache.flink.runtime.taskmanager.NettyShuffleEnvironmentConfiguration;
+
+import java.io.IOException;
+
+/**
+ * A benchmark-specific input gate factory which overrides the respective 
methods of creating
+ * {@link RemoteInputChannel} and {@link LocalInputChannel} for requesting 
specific subpartitions.
+ */
+public class SingleInputGateBenchmarkFactory extends SingleInputGateFactory {
+
+   public SingleInputGateBenchmarkFactory(
+   ResourceID taskExecutorResourceId,
+   NettyShuffleEnvironmentConfiguration networkConfig,
+   ConnectionManager connectionManager,
+   ResultPartitionManager partitionManager,
+   TaskEventPublisher taskEventPublisher,
+   NetworkBufferPool networkBufferPool) {
+   super(
+   taskExecutorResourceId,
+   networkConfig,
+   connectionManager,
+   partitionManager,
+   taskEventPublisher,
+   networkBufferPool);
+   }
+
+   @Override
+   protected InputChannel createKnownInputChannel(
+   SingleInputGate inputGate,
+   int index,
+   NettyShuffleDescriptor inputChannelDescriptor,
+   SingleInputGateFactory.ChannelStatistics 
channelStatistics,
+   InputChannelMetrics metrics) {
+   ResultPartitionID partitionId = 
inputChannelDescriptor.getResultPartitionID();
+   if (inputChannelDescriptor.isLocalTo(taskExecutorResourceId)) {
+   return new TestLocalInputChannel(
+   inputGate,
+   index,
+   partitionId,
+   partitionManager,
+   taskEventPublisher,
+   partitionRequestInitialBackoff,
+   partitionRequestMaxBackoff,
+   metrics);
+   } else {
+   return new TestRemoteInputChannel(
+   inputGate,
+   index,
+   partitionId,
+   inputChannelDescriptor.getConnectionId(),
+   connectionManager,
+   partitionRequestInitialBackoff,
+   partitionRequestMaxBackoff,
+   

[GitHub] [flink] zhijiangW commented on a change in pull request #11155: [FLINK-14818][benchmark] Fix receiving InputGate setup of StreamNetworkBenchmarkEnvironment.

2020-02-23 Thread GitBox
zhijiangW commented on a change in pull request #11155: 
[FLINK-14818][benchmark] Fix receiving InputGate setup of 
StreamNetworkBenchmarkEnvironment.
URL: https://github.com/apache/flink/pull/11155#discussion_r383094730
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/SingleInputGateBenchmarkFactory.java
 ##
 @@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io.benchmark;
+
+import org.apache.flink.core.memory.MemorySegmentProvider;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.io.network.ConnectionID;
+import org.apache.flink.runtime.io.network.ConnectionManager;
+import org.apache.flink.runtime.io.network.TaskEventPublisher;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.io.network.metrics.InputChannelMetrics;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import 
org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
+import 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+import 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateFactory;
+import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor;
+import 
org.apache.flink.runtime.taskmanager.NettyShuffleEnvironmentConfiguration;
+
+import java.io.IOException;
+
+/**
+ * A benchmark-specific input gate factory which overrides the respective 
methods of creating
+ * {@link RemoteInputChannel} and {@link LocalInputChannel} for requesting 
specific subpartitions.
+ */
+public class SingleInputGateBenchmarkFactory extends SingleInputGateFactory {
+
+   public SingleInputGateBenchmarkFactory(
+   ResourceID taskExecutorResourceId,
+   NettyShuffleEnvironmentConfiguration networkConfig,
+   ConnectionManager connectionManager,
+   ResultPartitionManager partitionManager,
+   TaskEventPublisher taskEventPublisher,
+   NetworkBufferPool networkBufferPool) {
+   super(
+   taskExecutorResourceId,
+   networkConfig,
+   connectionManager,
+   partitionManager,
+   taskEventPublisher,
+   networkBufferPool);
+   }
+
+   @Override
+   protected InputChannel createKnownInputChannel(
+   SingleInputGate inputGate,
+   int index,
+   NettyShuffleDescriptor inputChannelDescriptor,
+   SingleInputGateFactory.ChannelStatistics 
channelStatistics,
+   InputChannelMetrics metrics) {
+   ResultPartitionID partitionId = 
inputChannelDescriptor.getResultPartitionID();
+   if (inputChannelDescriptor.isLocalTo(taskExecutorResourceId)) {
+   return new TestLocalInputChannel(
+   inputGate,
+   index,
+   partitionId,
+   partitionManager,
+   taskEventPublisher,
+   partitionRequestInitialBackoff,
+   partitionRequestMaxBackoff,
+   metrics);
+   } else {
+   return new TestRemoteInputChannel(
+   inputGate,
+   index,
+   partitionId,
+   inputChannelDescriptor.getConnectionId(),
+   connectionManager,
+   partitionRequestInitialBackoff,
+   partitionRequestMaxBackoff,
+   

[GitHub] [flink] zhijiangW commented on a change in pull request #11155: [FLINK-14818][benchmark] Fix receiving InputGate setup of StreamNetworkBenchmarkEnvironment.

2020-02-23 Thread GitBox
zhijiangW commented on a change in pull request #11155: 
[FLINK-14818][benchmark] Fix receiving InputGate setup of 
StreamNetworkBenchmarkEnvironment.
URL: https://github.com/apache/flink/pull/11155#discussion_r383094754
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/SingleInputGateBenchmarkFactory.java
 ##
 @@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io.benchmark;
+
+import org.apache.flink.core.memory.MemorySegmentProvider;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.io.network.ConnectionID;
+import org.apache.flink.runtime.io.network.ConnectionManager;
+import org.apache.flink.runtime.io.network.TaskEventPublisher;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.io.network.metrics.InputChannelMetrics;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import 
org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
+import 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+import 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateFactory;
+import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor;
+import 
org.apache.flink.runtime.taskmanager.NettyShuffleEnvironmentConfiguration;
+
+import java.io.IOException;
+
+/**
+ * A benchmark-specific input gate factory which overrides the respective 
methods of creating
+ * {@link RemoteInputChannel} and {@link LocalInputChannel} for requesting 
specific subpartitions.
+ */
+public class SingleInputGateBenchmarkFactory extends SingleInputGateFactory {
+
+   public SingleInputGateBenchmarkFactory(
+   ResourceID taskExecutorResourceId,
+   NettyShuffleEnvironmentConfiguration networkConfig,
+   ConnectionManager connectionManager,
+   ResultPartitionManager partitionManager,
+   TaskEventPublisher taskEventPublisher,
+   NetworkBufferPool networkBufferPool) {
+   super(
+   taskExecutorResourceId,
+   networkConfig,
+   connectionManager,
+   partitionManager,
+   taskEventPublisher,
+   networkBufferPool);
+   }
+
+   @Override
+   protected InputChannel createKnownInputChannel(
+   SingleInputGate inputGate,
+   int index,
+   NettyShuffleDescriptor inputChannelDescriptor,
+   SingleInputGateFactory.ChannelStatistics 
channelStatistics,
+   InputChannelMetrics metrics) {
+   ResultPartitionID partitionId = 
inputChannelDescriptor.getResultPartitionID();
+   if (inputChannelDescriptor.isLocalTo(taskExecutorResourceId)) {
+   return new TestLocalInputChannel(
+   inputGate,
+   index,
+   partitionId,
+   partitionManager,
+   taskEventPublisher,
+   partitionRequestInitialBackoff,
+   partitionRequestMaxBackoff,
+   metrics);
+   } else {
+   return new TestRemoteInputChannel(
+   inputGate,
+   index,
+   partitionId,
+   inputChannelDescriptor.getConnectionId(),
+   connectionManager,
+   partitionRequestInitialBackoff,
+   partitionRequestMaxBackoff,
+   
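
A design note on the two subclasses quoted in this thread:
`TestSingleInputGateFactory` re-declares private copies of every dependency it
already passes to `super(...)`, while `SingleInputGateBenchmarkFactory` uses
the inherited state directly (its `createKnownInputChannel` references
`partitionManager`, `connectionManager`, etc. without redeclaring them, which
implies those superclass fields are accessible to subclasses). A minimal
sketch of the difference, assuming protected superclass fields; all names are
simplified stand-ins, not Flink's real classes.

```java
// Two subclassing styles for a factory with injected dependencies.
// Assumption: the base class keeps its fields protected, as the quoted
// SingleInputGateBenchmarkFactory implies. Stand-in names only.

class ConnectionManager {}

class BaseFactory {
    protected final ConnectionManager connectionManager;
    protected final int initialBackoff;

    BaseFactory(ConnectionManager connectionManager, int initialBackoff) {
        this.connectionManager = connectionManager;
        this.initialBackoff = initialBackoff;
    }
}

// Style 1 (TestSingleInputGateFactory): shadow copies of state the
// superclass already holds -- extra fields, extra assignments.
class ShadowingFactory extends BaseFactory {
    private final ConnectionManager connectionManager; // duplicates super's field
    private final int initialBackoff;                  // duplicates super's field

    ShadowingFactory(ConnectionManager cm, int backoff) {
        super(cm, backoff);
        this.connectionManager = cm;
        this.initialBackoff = backoff;
    }

    int backoff() {
        return initialBackoff; // reads the shadow copy
    }
}

// Style 2 (SingleInputGateBenchmarkFactory): no extra fields, the
// inherited protected state is used as-is.
class LeanFactory extends BaseFactory {
    LeanFactory(ConnectionManager cm, int backoff) {
        super(cm, backoff);
    }

    int backoff() {
        return initialBackoff; // reads the inherited field
    }
}

public class FieldReuseSketch {
    public static void main(String[] args) {
        ConnectionManager cm = new ConnectionManager();
        System.out.println(new ShadowingFactory(cm, 100).backoff()); // prints 100
        System.out.println(new LeanFactory(cm, 100).backoff());      // prints 100
    }
}
```

Both behave identically; the lean style simply avoids keeping two copies of
the same immutable references in every instance.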

[GitHub] [flink] zhijiangW commented on a change in pull request #11155: [FLINK-14818][benchmark] Fix receiving InputGate setup of StreamNetworkBenchmarkEnvironment.

2020-02-23 Thread GitBox
zhijiangW commented on a change in pull request #11155: 
[FLINK-14818][benchmark] Fix receiving InputGate setup of 
StreamNetworkBenchmarkEnvironment.
URL: https://github.com/apache/flink/pull/11155#discussion_r383090816
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
 ##
 @@ -272,11 +287,161 @@ private static ShuffleDescriptor 
createShuffleDescriptor(
ResultPartitionID resultPartitionID,
ResourceID location,
TaskManagerLocation senderLocation,
-   int channel) {
+   int connectionIndex) {
final NettyShuffleDescriptorBuilder builder = 
NettyShuffleDescriptorBuilder.newBuilder()
.setId(resultPartitionID)
.setProducerInfoFromTaskManagerLocation(senderLocation)
-   .setConnectionIndex(channel);
+   .setConnectionIndex(connectionIndex);
return localMode ? 
builder.setProducerLocation(location).buildLocal() : builder.buildRemote();
}
+
+   /**
+* A {@link SingleInputGateFactory} which replaces the default {@link 
RemoteInputChannel} and
+* {@link LocalInputChannel} implementation with costume ones.
+*/
+   private static class TestSingleInputGateFactory extends 
SingleInputGateFactory {
+
+   private final ResourceID taskExecutorResourceId;
+   private final int partitionRequestInitialBackoff;
+   private final int partitionRequestMaxBackoff;
+   private final ConnectionManager connectionManager;
+   private final ResultPartitionManager partitionManager;
+   private final TaskEventPublisher taskEventPublisher;
+   private final NetworkBufferPool networkBufferPool;
+
+   public TestSingleInputGateFactory(
+   @Nonnull ResourceID taskExecutorResourceId,
+   @Nonnull NettyShuffleEnvironmentConfiguration 
networkConfig,
+   @Nonnull ConnectionManager connectionManager,
+   @Nonnull ResultPartitionManager 
partitionManager,
+   @Nonnull TaskEventPublisher taskEventPublisher,
+   @Nonnull NetworkBufferPool networkBufferPool) {
+   super(
+   taskExecutorResourceId,
+   networkConfig,
+   connectionManager,
+   partitionManager,
+   taskEventPublisher,
+   networkBufferPool);
+   this.networkBufferPool = networkBufferPool;
+   this.taskEventPublisher = taskEventPublisher;
+   this.partitionManager = partitionManager;
+   this.connectionManager = connectionManager;
+   this.partitionRequestMaxBackoff = 
networkConfig.partitionRequestMaxBackoff();
+   this.partitionRequestInitialBackoff = 
networkConfig.partitionRequestInitialBackoff();
+   this.taskExecutorResourceId = taskExecutorResourceId;
+   }
+
+   @Override
+   protected InputChannel createKnownInputChannel(
+   SingleInputGate inputGate,
+   int index,
+   NettyShuffleDescriptor inputChannelDescriptor,
+   ChannelStatistics channelStatistics,
+   InputChannelMetrics metrics) {
+   ResultPartitionID partitionId = 
inputChannelDescriptor.getResultPartitionID();
+   if 
(inputChannelDescriptor.isLocalTo(taskExecutorResourceId)) {
+   return new TestLocalInputChannel(
+   inputGate,
+   index,
+   partitionId,
+   partitionManager,
+   taskEventPublisher,
+   partitionRequestInitialBackoff,
+   partitionRequestMaxBackoff,
+   metrics);
+   } else {
+   return new TestRemoteInputChannel(
+   inputGate,
+   index,
+   partitionId,
+   
inputChannelDescriptor.getConnectionId(),
+   connectionManager,
+   partitionRequestInitialBackoff,
+

[GitHub] [flink] zhijiangW commented on a change in pull request #11155: [FLINK-14818][benchmark] Fix receiving InputGate setup of StreamNetworkBenchmarkEnvironment.

2020-02-23 Thread GitBox
zhijiangW commented on a change in pull request #11155: 
[FLINK-14818][benchmark] Fix receiving InputGate setup of 
StreamNetworkBenchmarkEnvironment.
URL: https://github.com/apache/flink/pull/11155#discussion_r383090336
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/SingleInputGateBenchmarkFactory.java
 ##
 @@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io.benchmark;
+
+import org.apache.flink.core.memory.MemorySegmentProvider;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.io.network.ConnectionID;
+import org.apache.flink.runtime.io.network.ConnectionManager;
+import org.apache.flink.runtime.io.network.TaskEventPublisher;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.io.network.metrics.InputChannelMetrics;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import 
org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
+import 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+import 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateFactory;
+import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor;
+import 
org.apache.flink.runtime.taskmanager.NettyShuffleEnvironmentConfiguration;
+
+import java.io.IOException;
+
+/**
+ * A benchmark-specific input gate factory which overrides the respective 
methods of creating
+ * {@link RemoteInputChannel} and {@link LocalInputChannel} for requesting 
specific subpartitions.
+ */
+public class SingleInputGateBenchmarkFactory extends SingleInputGateFactory {
+
+   public SingleInputGateBenchmarkFactory(
+   ResourceID taskExecutorResourceId,
+   NettyShuffleEnvironmentConfiguration networkConfig,
+   ConnectionManager connectionManager,
+   ResultPartitionManager partitionManager,
+   TaskEventPublisher taskEventPublisher,
+   NetworkBufferPool networkBufferPool) {
+   super(
+   taskExecutorResourceId,
+   networkConfig,
+   connectionManager,
+   partitionManager,
+   taskEventPublisher,
+   networkBufferPool);
+   }
+
+   @Override
+   protected InputChannel createKnownInputChannel(
+   SingleInputGate inputGate,
+   int index,
+   NettyShuffleDescriptor inputChannelDescriptor,
+   SingleInputGateFactory.ChannelStatistics 
channelStatistics,
+   InputChannelMetrics metrics) {
+   ResultPartitionID partitionId = 
inputChannelDescriptor.getResultPartitionID();
+   if (inputChannelDescriptor.isLocalTo(taskExecutorResourceId)) {
+   return new TestLocalInputChannel(
+   inputGate,
+   index,
+   partitionId,
+   partitionManager,
+   taskEventPublisher,
+   partitionRequestInitialBackoff,
+   partitionRequestMaxBackoff,
+   metrics);
+   } else {
+   return new TestRemoteInputChannel(
+   inputGate,
+   index,
+   partitionId,
+   inputChannelDescriptor.getConnectionId(),
+   connectionManager,
+   partitionRequestInitialBackoff,
+   partitionRequestMaxBackoff,
+   

[GitHub] [flink] zhijiangW commented on a change in pull request #11155: [FLINK-14818][benchmark] Fix receiving InputGate setup of StreamNetworkBenchmarkEnvironment.

2020-02-23 Thread GitBox
zhijiangW commented on a change in pull request #11155: 
[FLINK-14818][benchmark] Fix receiving InputGate setup of 
StreamNetworkBenchmarkEnvironment.
URL: https://github.com/apache/flink/pull/11155#discussion_r383090260
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
 ##
 @@ -272,11 +287,161 @@ private static ShuffleDescriptor 
createShuffleDescriptor(
ResultPartitionID resultPartitionID,
ResourceID location,
TaskManagerLocation senderLocation,
-   int channel) {
+   int connectionIndex) {
final NettyShuffleDescriptorBuilder builder = 
NettyShuffleDescriptorBuilder.newBuilder()
.setId(resultPartitionID)
.setProducerInfoFromTaskManagerLocation(senderLocation)
-   .setConnectionIndex(channel);
+   .setConnectionIndex(connectionIndex);
return localMode ? 
builder.setProducerLocation(location).buildLocal() : builder.buildRemote();
}
+
+   /**
+* A {@link SingleInputGateFactory} which replaces the default {@link 
RemoteInputChannel} and
+* {@link LocalInputChannel} implementations with custom ones.
+*/
+   private static class TestSingleInputGateFactory extends 
SingleInputGateFactory {
+
+   private final ResourceID taskExecutorResourceId;
+   private final int partitionRequestInitialBackoff;
+   private final int partitionRequestMaxBackoff;
+   private final ConnectionManager connectionManager;
+   private final ResultPartitionManager partitionManager;
+   private final TaskEventPublisher taskEventPublisher;
+   private final NetworkBufferPool networkBufferPool;
+
+   public TestSingleInputGateFactory(
+   @Nonnull ResourceID taskExecutorResourceId,
+   @Nonnull NettyShuffleEnvironmentConfiguration 
networkConfig,
+   @Nonnull ConnectionManager connectionManager,
+   @Nonnull ResultPartitionManager 
partitionManager,
+   @Nonnull TaskEventPublisher taskEventPublisher,
+   @Nonnull NetworkBufferPool networkBufferPool) {
+   super(
+   taskExecutorResourceId,
+   networkConfig,
+   connectionManager,
+   partitionManager,
+   taskEventPublisher,
+   networkBufferPool);
+   this.networkBufferPool = networkBufferPool;
+   this.taskEventPublisher = taskEventPublisher;
+   this.partitionManager = partitionManager;
+   this.connectionManager = connectionManager;
+   this.partitionRequestMaxBackoff = 
networkConfig.partitionRequestMaxBackoff();
+   this.partitionRequestInitialBackoff = 
networkConfig.partitionRequestInitialBackoff();
+   this.taskExecutorResourceId = taskExecutorResourceId;
+   }
+
+   @Override
+   protected InputChannel createKnownInputChannel(
+   SingleInputGate inputGate,
+   int index,
+   NettyShuffleDescriptor inputChannelDescriptor,
+   ChannelStatistics channelStatistics,
+   InputChannelMetrics metrics) {
+   ResultPartitionID partitionId = 
inputChannelDescriptor.getResultPartitionID();
+   if 
(inputChannelDescriptor.isLocalTo(taskExecutorResourceId)) {
+   return new TestLocalInputChannel(
+   inputGate,
+   index,
+   partitionId,
+   partitionManager,
+   taskEventPublisher,
+   partitionRequestInitialBackoff,
+   partitionRequestMaxBackoff,
+   metrics);
+   } else {
+   return new TestRemoteInputChannel(
+   inputGate,
+   index,
+   partitionId,
+   
inputChannelDescriptor.getConnectionId(),
+   connectionManager,
+   partitionRequestInitialBackoff,
+

[GitHub] [flink] zhijiangW commented on a change in pull request #11155: [FLINK-14818][benchmark] Fix receiving InputGate setup of StreamNetworkBenchmarkEnvironment.

2020-02-23 Thread GitBox
zhijiangW commented on a change in pull request #11155: 
[FLINK-14818][benchmark] Fix receiving InputGate setup of 
StreamNetworkBenchmarkEnvironment.
URL: https://github.com/apache/flink/pull/11155#discussion_r383088089
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
 ##
 @@ -272,11 +287,161 @@ private static ShuffleDescriptor 
createShuffleDescriptor(
ResultPartitionID resultPartitionID,
ResourceID location,
TaskManagerLocation senderLocation,
-   int channel) {
+   int connectionIndex) {
final NettyShuffleDescriptorBuilder builder = 
NettyShuffleDescriptorBuilder.newBuilder()
.setId(resultPartitionID)
.setProducerInfoFromTaskManagerLocation(senderLocation)
-   .setConnectionIndex(channel);
+   .setConnectionIndex(connectionIndex);
return localMode ? 
builder.setProducerLocation(location).buildLocal() : builder.buildRemote();
}
+
+   /**
+* A {@link SingleInputGateFactory} which replaces the default {@link 
RemoteInputChannel} and
+* {@link LocalInputChannel} implementations with custom ones.
+*/
+   private static class TestSingleInputGateFactory extends 
SingleInputGateFactory {
+
+   private final ResourceID taskExecutorResourceId;
+   private final int partitionRequestInitialBackoff;
+   private final int partitionRequestMaxBackoff;
+   private final ConnectionManager connectionManager;
+   private final ResultPartitionManager partitionManager;
+   private final TaskEventPublisher taskEventPublisher;
+   private final NetworkBufferPool networkBufferPool;
+
+   public TestSingleInputGateFactory(
+   @Nonnull ResourceID taskExecutorResourceId,
+   @Nonnull NettyShuffleEnvironmentConfiguration 
networkConfig,
+   @Nonnull ConnectionManager connectionManager,
+   @Nonnull ResultPartitionManager 
partitionManager,
+   @Nonnull TaskEventPublisher taskEventPublisher,
+   @Nonnull NetworkBufferPool networkBufferPool) {
+   super(
+   taskExecutorResourceId,
+   networkConfig,
+   connectionManager,
+   partitionManager,
+   taskEventPublisher,
+   networkBufferPool);
+   this.networkBufferPool = networkBufferPool;
+   this.taskEventPublisher = taskEventPublisher;
+   this.partitionManager = partitionManager;
+   this.connectionManager = connectionManager;
+   this.partitionRequestMaxBackoff = 
networkConfig.partitionRequestMaxBackoff();
+   this.partitionRequestInitialBackoff = 
networkConfig.partitionRequestInitialBackoff();
+   this.taskExecutorResourceId = taskExecutorResourceId;
+   }
+
+   @Override
+   protected InputChannel createKnownInputChannel(
+   SingleInputGate inputGate,
+   int index,
+   NettyShuffleDescriptor inputChannelDescriptor,
+   ChannelStatistics channelStatistics,
+   InputChannelMetrics metrics) {
+   ResultPartitionID partitionId = 
inputChannelDescriptor.getResultPartitionID();
+   if 
(inputChannelDescriptor.isLocalTo(taskExecutorResourceId)) {
+   return new TestLocalInputChannel(
+   inputGate,
+   index,
+   partitionId,
+   partitionManager,
+   taskEventPublisher,
+   partitionRequestInitialBackoff,
+   partitionRequestMaxBackoff,
+   metrics);
+   } else {
+   return new TestRemoteInputChannel(
+   inputGate,
+   index,
+   partitionId,
+   
inputChannelDescriptor.getConnectionId(),
+   connectionManager,
+   partitionRequestInitialBackoff,
+
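The `partitionRequestInitialBackoff` / `partitionRequestMaxBackoff` pair threaded through the factory above governs how an input channel retries a partition request that is not yet available. A minimal sketch of the usual doubling-backoff behavior (illustrative only; the class and method names below are not Flink's actual API):

```java
/** Illustrative doubling backoff for partition requests, capped at a maximum. */
public class PartitionRequestBackoff {
    private final int initialBackoff;
    private final int maxBackoff;
    private int currentBackoff; // 0 means "no backoff attempted yet"

    public PartitionRequestBackoff(int initialBackoff, int maxBackoff) {
        this.initialBackoff = initialBackoff;
        this.maxBackoff = maxBackoff;
        this.currentBackoff = 0;
    }

    /** Returns false once the backoff can no longer be increased. */
    public boolean increaseBackoff() {
        if (currentBackoff == 0) {
            currentBackoff = initialBackoff;
            return true;
        }
        if (currentBackoff < maxBackoff) {
            currentBackoff = Math.min(currentBackoff * 2, maxBackoff);
            return true;
        }
        return false; // exhausted: the caller should fail the request
    }

    public int getCurrentBackoff() {
        return currentBackoff;
    }

    public static void main(String[] args) {
        PartitionRequestBackoff backoff = new PartitionRequestBackoff(100, 1000);
        while (backoff.increaseBackoff()) {
            System.out.println("retry after " + backoff.getCurrentBackoff() + " ms");
        }
    }
}
```

With an initial backoff of 100 ms and a maximum of 1000 ms, the retry delays grow as 100, 200, 400, 800, 1000 before the request is given up.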

[GitHub] [flink] zhijiangW commented on issue #11155: [FLINK-14818][benchmark] Fix receiving InputGate setup of StreamNetworkBenchmarkEnvironment.

2020-02-23 Thread GitBox
zhijiangW commented on issue #11155: [FLINK-14818][benchmark] Fix receiving 
InputGate setup of StreamNetworkBenchmarkEnvironment.
URL: https://github.com/apache/flink/pull/11155#issuecomment-590170854
 
 
   Thanks for the updates @wsry. It generally looks good to me now; I just 
left two tiny comments.
   As for the previous main concern, I left some suggestions 
[here](https://github.com/apache/flink/pull/11155#discussion_r383088089).
   
   After you provide the micro-benchmark results, I will also execute them on 
our benchmark machine to confirm the numbers.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #11155: [FLINK-14818][benchmark] Fix receiving InputGate setup of StreamNetworkBenchmarkEnvironment.

2020-02-23 Thread GitBox
zhijiangW commented on a change in pull request #11155: 
[FLINK-14818][benchmark] Fix receiving InputGate setup of 
StreamNetworkBenchmarkEnvironment.
URL: https://github.com/apache/flink/pull/11155#discussion_r383088311
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/SingleInputGateBenchmarkFactory.java
 ##
 @@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io.benchmark;
+
+import org.apache.flink.core.memory.MemorySegmentProvider;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.io.network.ConnectionID;
+import org.apache.flink.runtime.io.network.ConnectionManager;
+import org.apache.flink.runtime.io.network.TaskEventPublisher;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.io.network.metrics.InputChannelMetrics;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import 
org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
+import 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+import 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateFactory;
+import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor;
+import 
org.apache.flink.runtime.taskmanager.NettyShuffleEnvironmentConfiguration;
+
+import java.io.IOException;
+
+/**
+ * A benchmark-specific input gate factory which overrides the respective 
methods of creating
+ * {@link RemoteInputChannel} and {@link LocalInputChannel} for requesting 
specific subpartitions.
+ */
+public class SingleInputGateBenchmarkFactory extends SingleInputGateFactory {
+
+   public SingleInputGateBenchmarkFactory(
+   ResourceID taskExecutorResourceId,
+   NettyShuffleEnvironmentConfiguration networkConfig,
+   ConnectionManager connectionManager,
+   ResultPartitionManager partitionManager,
+   TaskEventPublisher taskEventPublisher,
+   NetworkBufferPool networkBufferPool) {
+   super(
+   taskExecutorResourceId,
+   networkConfig,
+   connectionManager,
+   partitionManager,
+   taskEventPublisher,
+   networkBufferPool);
+   }
+
+   @Override
+   protected InputChannel createKnownInputChannel(
+   SingleInputGate inputGate,
+   int index,
+   NettyShuffleDescriptor inputChannelDescriptor,
+   SingleInputGateFactory.ChannelStatistics 
channelStatistics,
+   InputChannelMetrics metrics) {
+   ResultPartitionID partitionId = 
inputChannelDescriptor.getResultPartitionID();
+   if (inputChannelDescriptor.isLocalTo(taskExecutorResourceId)) {
+   return new TestLocalInputChannel(
+   inputGate,
+   index,
+   partitionId,
+   partitionManager,
+   taskEventPublisher,
+   partitionRequestInitialBackoff,
+   partitionRequestMaxBackoff,
+   metrics);
+   } else {
+   return new TestRemoteInputChannel(
+   inputGate,
+   index,
+   partitionId,
+   inputChannelDescriptor.getConnectionId(),
+   connectionManager,
+   partitionRequestInitialBackoff,
+   partitionRequestMaxBackoff,
+   
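The overridden `createKnownInputChannel` quoted above dispatches on producer locality: a partition local to the task executor gets a `LocalInputChannel` variant, anything else a `RemoteInputChannel` variant. The dispatch can be sketched generically (illustrative types, not the benchmark's actual classes):

```java
/** Illustrative locality dispatch, mirroring createKnownInputChannel above. */
public class ChannelDispatch {
    interface Channel { String describe(); }

    static final class LocalChannel implements Channel {
        public String describe() { return "local"; }
    }

    static final class RemoteChannel implements Channel {
        public String describe() { return "remote"; }
    }

    /** Picks the channel type by comparing producer and consumer locations. */
    static Channel createKnownChannel(String producerLocation, String consumerLocation) {
        if (producerLocation.equals(consumerLocation)) {
            return new LocalChannel();  // same task executor: read locally
        }
        return new RemoteChannel();     // different task executor: go over the network
    }

    public static void main(String[] args) {
        System.out.println(createKnownChannel("tm-1", "tm-1").describe()); // local
        System.out.println(createKnownChannel("tm-1", "tm-2").describe()); // remote
    }
}
```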


[GitHub] [flink] Jiayi-Liao commented on a change in pull request #11179: [FLINK-16178][FLINK-16192][checkpointing] Clean up checkpoint metadata code and remove remaining bits of "legacy state"

2020-02-23 Thread GitBox
Jiayi-Liao commented on a change in pull request #11179: 
[FLINK-16178][FLINK-16192][checkpointing] Clean up checkpoint metadata code and 
remove remaining bits of "legacy state"
URL: https://github.com/apache/flink/pull/11179#discussion_r383065779
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializer.java
 ##
 @@ -52,6 +37,5 @@
 * @return The deserialized savepoint
 * @throws IOException Serialization failures are forwarded
 */
-   T deserialize(DataInputStream dis, ClassLoader userCodeClassLoader) 
throws IOException;
-
+   SavepointV2 deserialize(DataInputStream dis, ClassLoader 
userCodeClassLoader) throws IOException;
 
 Review comment:
   You're right. I was misled by the previous interface. Would it be better 
if we changed the function name to "deserializeToLatestVersion"?




[GitHub] [flink] zhijiangW commented on a change in pull request #11155: [FLINK-14818][benchmark] Fix receiving InputGate setup of StreamNetworkBenchmarkEnvironment.

2020-02-23 Thread GitBox
zhijiangW commented on a change in pull request #11155: 
[FLINK-14818][benchmark] Fix receiving InputGate setup of 
StreamNetworkBenchmarkEnvironment.
URL: https://github.com/apache/flink/pull/11155#discussion_r383081286
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
 ##
 @@ -158,6 +157,10 @@ public void tearDown() {
suppressExceptions(receiverEnv::close);
}
 
+   /**
+* Note: It should be guaranteed that {@link 
#createResultPartitionWriter(int)} has been called before
+* creating the receiver.
 
 Review comment:
   nit: consider adding the reason why we should guarantee this. 
   E.g. `Otherwise it might cause unexpected behaviors when {@link 
PartitionNotFoundException} happens in {@link TestRemoteInputChannel}`.
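The ordering the nit refers to can be made explicit with a guard: requesting a partition that has not been registered yet should fail fast. A generic sketch of such a registration-before-request check (illustrative, not the benchmark's actual classes):

```java
import java.util.HashSet;
import java.util.Set;

/** Illustrative registry that rejects requests for unregistered partitions. */
public class PartitionRegistry {
    private final Set<String> registered = new HashSet<>();

    /** Corresponds to creating the result partition writer first. */
    public void registerPartition(String partitionId) {
        registered.add(partitionId);
    }

    /** Mimics the PartitionNotFoundException raised for unknown partitions. */
    public void requestPartition(String partitionId) {
        if (!registered.contains(partitionId)) {
            throw new IllegalStateException(
                "Partition " + partitionId + " not found; register the writer first");
        }
    }

    public static void main(String[] args) {
        PartitionRegistry registry = new PartitionRegistry();
        registry.registerPartition("p-0");  // writer created before receiver setup
        registry.requestPartition("p-0");   // succeeds
        try {
            registry.requestPartition("p-1"); // wrong order: request before register
        } catch (IllegalStateException expected) {
            System.out.println("caught: " + expected.getMessage());
        }
    }
}
```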




[jira] [Assigned] (FLINK-15289) Run sql appear error of "Zero-length character strings have no serializable string representation".

2020-02-23 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15289?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee reassigned FLINK-15289:


Assignee: (was: Jingsong Lee)

> Run sql appear error of "Zero-length character strings have no serializable 
> string representation".
> ---
>
> Key: FLINK-15289
> URL: https://issues.apache.org/jira/browse/FLINK-15289
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.10.0
>Reporter: xiaojin.wy
>Priority: Major
> Fix For: 1.11.0
>
>
> *The sql is:*
>  CREATE TABLE `INT8_TBL` (
>  q1 BIGINT,
>  q2 BIGINT
>  ) WITH (
>  'format.field-delimiter'=',',
>  'connector.type'='filesystem',
>  'format.derive-schema'='true',
>  
> 'connector.path'='/defender_test_data/daily_regression_batch_postgres_1.10/test_bigint/sources/INT8_TBL.csv',
>  'format.type'='csv'
>  );
> SELECT '' AS five, q1 AS plus, -q1 AS xm FROM INT8_TBL;
> *The error detail is :*
>  2019-12-17 15:35:07,026 ERROR org.apache.flink.table.client.SqlClient - SQL 
> Client must stop. Unexpected exception. This is a bug. Please consider filing 
> an issue.
>  org.apache.flink.table.api.TableException: Zero-length character strings 
> have no serializable string representation.
>  at 
> org.apache.flink.table.types.logical.CharType.asSerializableString(CharType.java:116)
>  at 
> org.apache.flink.table.descriptors.DescriptorProperties.putTableSchema(DescriptorProperties.java:218)
>  at 
> org.apache.flink.table.catalog.CatalogTableImpl.toProperties(CatalogTableImpl.java:75)
>  at 
> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSink(TableFactoryUtil.java:85)
>  at 
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeQueryAndPersistInternal(LocalExecutor.java:688)
>  at 
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeQueryAndPersist(LocalExecutor.java:488)
>  at org.apache.flink.table.client.cli.CliClient.callSelect(CliClient.java:601)
>  at 
> org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:385)
>  at java.util.Optional.ifPresent(Optional.java:159)
>  at 
> org.apache.flink.table.client.cli.CliClient.submitSQLFile(CliClient.java:271)
>  at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:125)
>  at org.apache.flink.table.client.SqlClient.start(SqlClient.java:104)
>  at org.apache.flink.table.client.SqlClient.main(SqlClient.java:180)
> *The input data is:*
>  123,456
>  123,4567890123456789
>  4567890123456789,123
>  4567890123456789,4567890123456789
>  4567890123456789,-4567890123456789



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-14257) Integrate csv to FileSystemTableFactory

2020-02-23 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14257?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee reassigned FLINK-14257:


Assignee: (was: Jingsong Lee)

> Integrate csv to FileSystemTableFactory
> ---
>
> Key: FLINK-14257
> URL: https://issues.apache.org/jira/browse/FLINK-14257
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / FileSystem
>Reporter: Jingsong Lee
>Priority: Major
>






[jira] [Assigned] (FLINK-15397) Streaming and batch has different value in the case of count function

2020-02-23 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15397?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee reassigned FLINK-15397:


Assignee: (was: Jingsong Lee)

> Streaming and batch has different value in the case of count function
> -
>
> Key: FLINK-15397
> URL: https://issues.apache.org/jira/browse/FLINK-15397
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.10.0
>Reporter: xiaojin.wy
>Priority: Major
> Fix For: 1.11.0
>
>
> *The sql is:*
> CREATE TABLE `testdata` (
>   a INT,
>   b INT
> ) WITH (
>   
> 'connector.path'='/defender_test_data/daily_regression_batch_spark_1.10/test_group_agg/sources/testdata.csv',
>   'format.empty-column-as-null'='true',
>   'format.field-delimiter'='|',
>   'connector.type'='filesystem',
>   'format.derive-schema'='true',
>   'format.type'='csv'
> );
> SELECT COUNT(1) FROM testdata WHERE false;
> If the configured execution type is batch, the result will be 0, but if it 
> is streaming, no value is produced at all;
> *The configuration is:*
> execution:
>   planner: blink
>   type: streaming
> *The input data is:*
> {code:java}
> 1|1
> 1|2
> 2|1
> 2|2
> 3|1
> 3|2
> |1
> 3|
> |
> {code}
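The observed difference is consistent with aggregate semantics: a batch global aggregate emits exactly one final row even over empty input, while a streaming group aggregate only emits updates triggered by incoming records, so a `WHERE false` predicate produces no output at all. A minimal sketch of the two behaviors (illustrative, not Flink's runtime code):

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative batch vs. streaming semantics of a global COUNT. */
public class CountSemantics {
    /** Batch: always produces exactly one result row. */
    public static long batchCount(List<Integer> rows) {
        return rows.size();
    }

    /** Streaming: emits an updated count per input record; empty input emits nothing. */
    public static List<Long> streamingCount(List<Integer> rows) {
        List<Long> emitted = new ArrayList<>();
        long count = 0;
        for (Integer ignored : rows) {
            count++;
            emitted.add(count); // one update per incoming record
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Integer> empty = new ArrayList<>();
        System.out.println("batch: " + batchCount(empty));      // 0
        System.out.println("stream: " + streamingCount(empty)); // [] -- no emission
    }
}
```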





[jira] [Closed] (FLINK-15407) Add document to explain how to write a table with PK

2020-02-23 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15407?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee closed FLINK-15407.

Resolution: Won't Fix

> Add document to explain how to write a table with PK
> 
>
> Key: FLINK-15407
> URL: https://issues.apache.org/jira/browse/FLINK-15407
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation, Table SQL / API
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>Priority: Major
> Fix For: 1.11.0
>
>
> I have had several user questions:
> Why is an error reported when writing to the upsert sink: TableException: 
> UpsertStreamTableSink requires that Table has a full primary keys if it is 
> updated.
> Users are confused.
> I think we can consider writing a document to describe it.
> Users need to be careful with cases like:
>  
> {code:java}
> insert into result_table select pk1, if(pk2 is null, '', pk2) as pk2, 
> count(*), sum(f3) from source group by pk1, pk2; {code}
> This will fail.
>  
> {code:java}
> insert into result_table select pk1, pk2, count(*), sum(f1) from (select pk1, 
> if(pk2 is null, '', pk2) as pk2, f1 from source) group by pk1, pk2; 
> {code}
> This can work.





[jira] [Assigned] (FLINK-14258) Integrate hive to FileSystemTableFactory

2020-02-23 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14258?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee reassigned FLINK-14258:


Assignee: (was: Jingsong Lee)

> Integrate hive to FileSystemTableFactory
> 
>
> Key: FLINK-14258
> URL: https://issues.apache.org/jira/browse/FLINK-14258
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Hive
>Reporter: Jingsong Lee
>Priority: Major
>






[jira] [Assigned] (FLINK-14256) Introduce FileSystemTableFactory with partitioned support

2020-02-23 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14256?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee reassigned FLINK-14256:


Assignee: (was: Jingsong Lee)

> Introduce FileSystemTableFactory with partitioned support
> -
>
> Key: FLINK-14256
> URL: https://issues.apache.org/jira/browse/FLINK-14256
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Planner
>Reporter: Jingsong Lee
>Priority: Major
>
> Introduce FileSystemTableFactory to unify all file system connectors.
> FileSystemTableFactory use FileSystemInputFormatFactory to get the format 
> reader.
> FileSystemTableFactory use FileSystemOutputFormatFactory to get the format 
> writer.





[jira] [Commented] (FLINK-15407) Add document to explain how to write a table with PK

2020-02-23 Thread Jingsong Lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17043152#comment-17043152
 ] 

Jingsong Lee commented on FLINK-15407:
--

We should have a better design for this with changelog support, so I am closing this one.

> Add document to explain how to write a table with PK
> 
>
> Key: FLINK-15407
> URL: https://issues.apache.org/jira/browse/FLINK-15407
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation, Table SQL / API
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>Priority: Major
> Fix For: 1.11.0
>
>
> I have had several user questions:
> Why is an error reported when writing to the upsert sink: TableException: 
> UpsertStreamTableSink requires that Table has a full primary keys if it is 
> updated.
> Users are confused.
> I think we can consider writing a document to describe it.
> Users need to be careful with cases like:
>  
> {code:java}
> insert into result_table select pk1, if(pk2 is null, '', pk2) as pk2, 
> count(*), sum(f3) from source group by pk1, pk2; {code}
> This will fail.
>  
> {code:java}
> insert into result_table select pk1, pk2, count(*), sum(f1) from (select pk1, 
> if(pk2 is null, '', pk2) as pk2, f1 from source) group by pk1, pk2; 
> {code}
> This can work.





[jira] [Assigned] (FLINK-15960) support creating Hive tables, views, functions within Flink

2020-02-23 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15960?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee reassigned FLINK-15960:


Assignee: Rui Li  (was: Jingsong Lee)

> support creating Hive tables, views, functions within Flink
> ---
>
> Key: FLINK-15960
> URL: https://issues.apache.org/jira/browse/FLINK-15960
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Hive
>Reporter: Bowen Li
>Assignee: Rui Li
>Priority: Major
> Fix For: 1.11.0
>
>
> support creating Hive tables, views, and functions within Flink, to achieve 
> higher interoperability between Flink and Hive without requiring users to 
> switch between the Flink and Hive CLIs.
> We have heard such requests from multiple Flink-Hive users
>  
> cc [~ykt836] [~lirui]





[jira] [Assigned] (FLINK-15255) document how to create Hive table from java API and DDL

2020-02-23 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15255?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee reassigned FLINK-15255:


Assignee: Rui Li  (was: Jingsong Lee)

> document how to create Hive table from java API and DDL
> ---
>
> Key: FLINK-15255
> URL: https://issues.apache.org/jira/browse/FLINK-15255
> Project: Flink
>  Issue Type: Task
>  Components: Documentation
>Reporter: Bowen Li
>Assignee: Rui Li
>Priority: Major
>
> documentation Jira for  FLINK-15960





[jira] [Assigned] (FLINK-15809) component stack page needs to be updated for blink planner

2020-02-23 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15809?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee reassigned FLINK-15809:


Assignee: (was: Jingsong Lee)

> component stack page needs to be updated for blink planner
> --
>
> Key: FLINK-15809
> URL: https://issues.apache.org/jira/browse/FLINK-15809
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Reporter: Bowen Li
>Priority: Critical
> Fix For: 1.10.1, 1.11.0
>
>
> [https://ci.apache.org/projects/flink/flink-docs-master/internals/components.html]
> needs to be updated to reflect latest stack components
>  
> cc [~ykt836] [~jark]





[jira] [Assigned] (FLINK-16027) kafka connector's 'connector.topic' property should be optional rather than required

2020-02-23 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee reassigned FLINK-16027:


Assignee: (was: Jingsong Lee)

> kafka connector's 'connector.topic' property should be optional rather than 
> required
> 
>
> Key: FLINK-16027
> URL: https://issues.apache.org/jira/browse/FLINK-16027
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Reporter: Bowen Li
>Priority: Major
> Fix For: 1.11.0
>
>






[jira] [Assigned] (FLINK-16024) support filter pushdown in jdbc connector

2020-02-23 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee reassigned FLINK-16024:


Assignee: (was: Jingsong Lee)

> support filter pushdown in jdbc connector
> -
>
> Key: FLINK-16024
> URL: https://issues.apache.org/jira/browse/FLINK-16024
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / JDBC
>Reporter: Bowen Li
>Priority: Major
> Fix For: 1.11.0
>
>






[jira] [Assigned] (FLINK-13777) Introduce sql function wrappers and conversion to ExpressionConverter

2020-02-23 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee reassigned FLINK-13777:


Assignee: (was: Jingsong Lee)

> Introduce sql function wrappers and conversion to ExpressionConverter
> -
>
> Key: FLINK-13777
> URL: https://issues.apache.org/jira/browse/FLINK-13777
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Reporter: Jingsong Lee
>Priority: Major
>
> To remove the extended Calcite SQL functions, we introduce wrappers around 
> Flink's FunctionDefinition:
> 1.Add SqlReturnTypeInferenceWrapper to wrap TypeStrategy
> 2.Add SqlOperandTypeInferenceWrapper to wrap InputTypeStrategy
> 3.Add SqlOperandTypeCheckerWrapper to wrap InputTypeValidator
> 4.Add SqlFunctionWrapper to wrap SqlFunction
> 5.Add SqlFunctionWrapper converter and Standard sql converter



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-13113) Introduce range partition in blink

2020-02-23 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13113?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee reassigned FLINK-13113:


Assignee: (was: Jingsong Lee)

> Introduce range partition in blink
> --
>
> Key: FLINK-13113
> URL: https://issues.apache.org/jira/browse/FLINK-13113
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Runtime
>Reporter: Jingsong Lee
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-14676) Introduce parallelism inference for source

2020-02-23 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14676?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee reassigned FLINK-14676:


Assignee: (was: Jingsong Lee)

> Introduce parallelism inference for source
> --
>
> Key: FLINK-14676
> URL: https://issues.apache.org/jira/browse/FLINK-14676
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Reporter: Jingsong Lee
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> FLINK-12801 introduced parallelism setting for tables, but because the 
> DataStream generated by a TableSource may not be a real source, this could 
> lead to shuffle errors. So FLINK-13494 removed those implementations.
> In this ticket, I would like to introduce parallelism inference only for 
> InputFormatTableSource, because its row count estimate is more accurate than 
> that of downstream stages, so it is worthwhile to automatically infer its 
> parallelism.
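The row-count-based inference described above can be sketched as follows. The names and the rows-per-subtask heuristic are illustrative assumptions, not Flink's actual planner API:

```java
// Minimal sketch of row-count-based parallelism inference.
// All names and the rows-per-subtask heuristic are hypothetical.
public class ParallelismInference {

    /**
     * Infers a source parallelism from an estimated row count:
     * one subtask per rowsPerSubtask rows, clamped to [1, maxParallelism].
     */
    public static int inferParallelism(long rowCount, long rowsPerSubtask, int maxParallelism) {
        long subtasks = (rowCount + rowsPerSubtask - 1) / rowsPerSubtask; // ceiling division
        return (int) Math.max(1, Math.min(maxParallelism, subtasks));
    }

    public static void main(String[] args) {
        System.out.println(inferParallelism(10_000_000L, 1_000_000L, 128)); // 10
        System.out.println(inferParallelism(42L, 1_000_000L, 128));         // 1
    }
}
```

Applying this only to InputFormatTableSource avoids the shuffle problems mentioned above, since the inferred parallelism is attached to a genuine source rather than an arbitrary DataStream transformation.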



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-13786) Implement type inference for other functions

2020-02-23 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13786?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee reassigned FLINK-13786:


Assignee: (was: Jingsong Lee)

> Implement type inference for other functions
> 
>
> Key: FLINK-13786
> URL: https://issues.apache.org/jira/browse/FLINK-13786
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Jingsong Lee
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-13782) Implement type inference for logic functions

2020-02-23 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13782?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee reassigned FLINK-13782:


Assignee: (was: Jingsong Lee)

> Implement type inference for logic functions
> 
>
> Key: FLINK-13782
> URL: https://issues.apache.org/jira/browse/FLINK-13782
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Jingsong Lee
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-13783) Implement type inference for string functions

2020-02-23 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13783?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee reassigned FLINK-13783:


Assignee: (was: Jingsong Lee)

> Implement type inference for string functions
> -
>
> Key: FLINK-13783
> URL: https://issues.apache.org/jira/browse/FLINK-13783
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Jingsong Lee
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-13780) Introduce ExpressionConverter to legacy planner

2020-02-23 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13780?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee reassigned FLINK-13780:


Assignee: (was: Jingsong Lee)

> Introduce ExpressionConverter to legacy planner
> ---
>
> Key: FLINK-13780
> URL: https://issues.apache.org/jira/browse/FLINK-13780
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Legacy Planner
>Reporter: Jingsong Lee
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-13781) Use new Expression in RexNodeToExpressionConverter

2020-02-23 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13781?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee reassigned FLINK-13781:


Assignee: (was: Jingsong Lee)

> Use new Expression in RexNodeToExpressionConverter
> --
>
> Key: FLINK-13781
> URL: https://issues.apache.org/jira/browse/FLINK-13781
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Legacy Planner
>Reporter: Jingsong Lee
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-13785) Implement type inference for time functions

2020-02-23 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13785?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee reassigned FLINK-13785:


Assignee: (was: Jingsong Lee)

> Implement type inference for time functions
> ---
>
> Key: FLINK-13785
> URL: https://issues.apache.org/jira/browse/FLINK-13785
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Jingsong Lee
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-13784) Implement type inference for math functions

2020-02-23 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13784?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee reassigned FLINK-13784:


Assignee: (was: Jingsong Lee)

> Implement type inference for math functions
> ---
>
> Key: FLINK-13784
> URL: https://issues.apache.org/jira/browse/FLINK-13784
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Jingsong Lee
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-13773) Rework of the Expression Design

2020-02-23 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13773?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee reassigned FLINK-13773:


Assignee: (was: Jingsong Lee)

> Rework of the Expression Design
> ---
>
> Key: FLINK-13773
> URL: https://issues.apache.org/jira/browse/FLINK-13773
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API, Table SQL / Legacy Planner, Table SQL / 
> Planner
>Reporter: Jingsong Lee
>Priority: Major
>
> This JIRA addresses several shortcomings of the current design:
>    - New Expressions still rely on PlannerExpressions for type inference and
> conversion to RexNode; flink-planner and blink-planner have a lot of
> repetitive code and logic.
>    - Make Table API and Calcite definitions consistent.
>    - Reduce the complexity of function development.
>    - Provide more powerful functions for users.
>  
> Key changes can be summarized as follows:
>    - Improve the interface of FunctionDefinition.
>    - Introduce type inference for built-in functions.
>    - Introduce ExpressionConverter to convert Expression to calcite
> RexNode.
>    - Remove repetitive code and logic in planners.
>  
> Details: 
> [https://cwiki.apache.org/confluence/display/FLINK/FLIP-51%3A+Rework+of+the+Expression+Design]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16253) Switch to Log4j 2 by default for flink-kubernetes submodule

2020-02-23 Thread Canbin Zheng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17043148#comment-17043148
 ] 

Canbin Zheng commented on FLINK-16253:
--

cc [~chesnay]

> Switch to Log4j 2 by default for flink-kubernetes submodule
> ---
>
> Key: FLINK-16253
> URL: https://issues.apache.org/jira/browse/FLINK-16253
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / Kubernetes
>Reporter: Canbin Zheng
>Priority: Major
> Fix For: 1.11.0
>
>
> Switch to Log4j 2 by default for flink-kubernetes submodule, including the 
> script and the container startup command or parameters.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16253) Switch to Log4j 2 by default for flink-kubernetes submodule

2020-02-23 Thread Canbin Zheng (Jira)
Canbin Zheng created FLINK-16253:


 Summary: Switch to Log4j 2 by default for flink-kubernetes 
submodule
 Key: FLINK-16253
 URL: https://issues.apache.org/jira/browse/FLINK-16253
 Project: Flink
  Issue Type: Improvement
  Components: Deployment / Kubernetes
Reporter: Canbin Zheng
 Fix For: 1.11.0


Switch to Log4j 2 by default for flink-kubernetes submodule, including the 
script and the container startup command or parameters.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15786) Load connector code with separate classloader

2020-02-23 Thread Yang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15786?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17043146#comment-17043146
 ] 

Yang Wang commented on FLINK-15786:
---

[~maguowei] Since filesystems are already well supported via the plugin 
mechanism, do you mean to load the connectors the same way? BTW, I think the 
metrics reporters are in a similar situation.

> Load connector code with separate classloader
> -
>
> Key: FLINK-15786
> URL: https://issues.apache.org/jira/browse/FLINK-15786
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Reporter: Guowei Ma
>Priority: Major
>  Labels: usability
>
> Currently, connector code can be seen as part of user code. Usually, users 
> only need to add the corresponding connector as a dependency and package it 
> in the user jar. This is convenient enough.
> However, connectors usually need to interact with external systems and often 
> introduce heavy dependencies, so there is a high possibility of a class 
> conflict between different connectors or with the user code of the same job. 
> For example, every one or two weeks we receive issue reports related to 
> connector class conflicts from our users. The problem gets worse when users 
> want to analyze data from different sources and write output to different 
> sinks.
> Using a separate classloader to load each connector's code could resolve the 
> problem.
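The isolation proposed in this ticket can be sketched with a child-first classloader per connector. This is an illustrative pattern, not Flink's actual plugin or classloading implementation:

```java
import java.net.URL;
import java.net.URLClassLoader;

// Minimal sketch of loading connector code through a separate, child-first
// classloader, so each connector's dependencies stay isolated from the user
// code and from other connectors. Illustrative only, not Flink's actual code.
public class ConnectorClassLoader extends URLClassLoader {

    public ConnectorClassLoader(URL[] connectorJars, ClassLoader parent) {
        super(connectorJars, parent);
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        synchronized (getClassLoadingLock(name)) {
            Class<?> c = findLoadedClass(name);
            if (c == null) {
                try {
                    // Child-first: try the connector's own jars before the parent,
                    // so the connector's dependency versions win over conflicting ones.
                    c = findClass(name);
                } catch (ClassNotFoundException e) {
                    c = super.loadClass(name, false); // fall back to the parent
                }
            }
            if (resolve) {
                resolveClass(c);
            }
            return c;
        }
    }
}
```

In practice, Flink runtime and API classes would be excluded from the child-first lookup (always loaded parent-first) so that a single copy of the core interfaces is shared; that filtering is omitted here for brevity.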



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on issue #11176: [FLINK-15948][yarn] Enrich the warning log for YARN minimum allocation memory

2020-02-23 Thread GitBox
flinkbot edited a comment on issue #11176: [FLINK-15948][yarn] Enrich the 
warning log for YARN minimum allocation memory
URL: https://github.com/apache/flink/pull/11176#issuecomment-589671142
 
 
   
   ## CI report:
   
   * 754693bae424353bac4cdd6d0db556ed8f6234f9 Travis: 
[SUCCESS](https://travis-ci.com/flink-ci/flink/builds/150231364) Azure: 
[SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5494)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] KarmaGYZ commented on issue #11197: [FLINK-16288][travis] Remove redundant double-quote at the end of the…

2020-02-23 Thread GitBox
KarmaGYZ commented on issue #11197: [FLINK-16288][travis] Remove redundant 
double-quote at the end of the…
URL: https://github.com/apache/flink/pull/11197#issuecomment-590157417
 
 
   Travis link https://travis-ci.org/KarmaGYZ/flink/jobs/654212375.
   I don't know why the "e2e - container - scala 2.12" profile keeps failing 
with timeouts in the "flink-connector-gcp-pubsub" and "flink-azure-fs-hadoop" 
modules, but it seems unrelated to this PR.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] lirui-apache commented on issue #11175: [FLINK-16197][hive] Failed to query partitioned table when partition …

2020-02-23 Thread GitBox
lirui-apache commented on issue #11175: [FLINK-16197][hive] Failed to query 
partitioned table when partition …
URL: https://github.com/apache/flink/pull/11175#issuecomment-590155149
 
 
   Hi @bowenli86 , could you elaborate on why it's not a good long-term 
strategy? IMHO, it should be up to each connector to decide how to handle 
discrepancies between metadata and storage, because such discrepancies might be 
treated differently by the external systems. One system may consider a 
discrepancy a fatal error, while another may expect it to happen from time to 
time and choose to tolerate it. Therefore I think each connector should follow 
the behavior of the external system it connects to.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wangyang0918 commented on issue #11117: [FLINK-16115][filesystem] Make aliyun oss filesystem could work with plugin mechanism

2020-02-23 Thread GitBox
wangyang0918 commented on issue #11117: [FLINK-16115][filesystem] Make aliyun 
oss filesystem could work with plugin mechanism
URL: https://github.com/apache/flink/pull/11117#issuecomment-590153300
 
 
   @zentol Could you have another look and help with merging?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-16215) Start redundant TaskExecutor when JM failed

2020-02-23 Thread Xintong Song (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17043137#comment-17043137
 ] 

Xintong Song commented on FLINK-16215:
--

I share [~trohrmann]'s concern.

On a Yarn deployment, {{YarnResourceManager}} starts a {{TaskExecutor}} in two 
steps:
1. Request a container from Yarn.
2. Launch the {{TaskExecutor}} process inside the allocated container.
If the JM failover happens between the two steps, the container will be 
recovered but no {{TaskExecutor}} will be started inside it.

The problem is that such a container neither gets a {{TaskExecutor}} started 
in it nor gets released. This might be solved by FLINK-13554, which introduces 
a timeout for starting new {{TaskExecutor}}s; we can apply this timeout to 
recovered containers as well.

FYI, the Kubernetes deployment does not have this problem, because the 
pod/container is allocated and {{TaskExecutor}} is started in one step.

> Start redundant TaskExecutor when JM failed
> ---
>
> Key: FLINK-16215
> URL: https://issues.apache.org/jira/browse/FLINK-16215
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0
>Reporter: YufeiLiu
>Priority: Major
>
> TaskExecutor will reconnect to the new ResourceManager leader when the JM 
> fails, and JobMaster will restart and reschedule the job. If the job's slot 
> requests arrive earlier than the TM registrations, RM will start new workers 
> rather than reuse the existing TMs.
> It's hard to reproduce because TM registration usually comes first, and the 
> timeout check will stop redundant TMs.
> But I think it would be better if we turned {{recoverWorkerNode}} into an 
> interface and put recovered slots in {{pendingSlots}} to wait for TM 
> reconnection.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16215) Start redundant TaskExecutor when JM failed

2020-02-23 Thread Yang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17043134#comment-17043134
 ] 

Yang Wang commented on FLINK-16215:
---

I think even if we make {{recoverWorkerNode}} an interface and do the recovery 
before the slot requests come, we still could not completely avoid this 
problem, since there is no guarantee that we get all the previous containers 
from the recovery process; some containers may also be returned via the 
subsequent heartbeats.

Maybe the {{JobMaster}} should be aware of the failover and recover the 
running tasks from the {{TaskManager}}. If that fails with a timeout, it could 
then allocate a new slot from the {{ResourceManager}}. It is just a rough 
thought; please correct me if I am wrong.

 

> Start redundant TaskExecutor when JM failed
> ---
>
> Key: FLINK-16215
> URL: https://issues.apache.org/jira/browse/FLINK-16215
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0
>Reporter: YufeiLiu
>Priority: Major
>
> TaskExecutor will reconnect to the new ResourceManager leader when the JM 
> fails, and JobMaster will restart and reschedule the job. If the job's slot 
> requests arrive earlier than the TM registrations, RM will start new workers 
> rather than reuse the existing TMs.
> It's hard to reproduce because TM registration usually comes first, and the 
> timeout check will stop redundant TMs.
> But I think it would be better if we turned {{recoverWorkerNode}} into an 
> interface and put recovered slots in {{pendingSlots}} to wait for TM 
> reconnection.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] Jiayi-Liao commented on a change in pull request #11179: [FLINK-16178][FLINK-16192][checkpointing] Clean up checkpoint metadata code and remove remaining bits of "legacy state"

2020-02-23 Thread GitBox
Jiayi-Liao commented on a change in pull request #11179: 
[FLINK-16178][FLINK-16192][checkpointing] Clean up checkpoint metadata code and 
remove remaining bits of "legacy state"
URL: https://github.com/apache/flink/pull/11179#discussion_r383065779
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializer.java
 ##
 @@ -52,6 +37,5 @@
 * @return The deserialized savepoint
 * @throws IOException Serialization failures are forwarded
 */
-   T deserialize(DataInputStream dis, ClassLoader userCodeClassLoader) 
throws IOException;
-
+   SavepointV2 deserialize(DataInputStream dis, ClassLoader 
userCodeClassLoader) throws IOException;
 
 Review comment:
   You're right, I was misled by the previous interface. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #11176: [FLINK-15948][yarn] Enrich the warning log for YARN minimum allocation memory

2020-02-23 Thread GitBox
flinkbot edited a comment on issue #11176: [FLINK-15948][yarn] Enrich the 
warning log for YARN minimum allocation memory
URL: https://github.com/apache/flink/pull/11176#issuecomment-589671142
 
 
   
   ## CI report:
   
   * adb2e9e62190ba505ee86554a97452984fbc2de4 Travis: 
[SUCCESS](https://travis-ci.com/flink-ci/flink/builds/150018467) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5423)
 
   * 754693bae424353bac4cdd6d0db556ed8f6234f9 Travis: 
[PENDING](https://travis-ci.com/flink-ci/flink/builds/150231364) 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] danny0405 commented on issue #11190: [FLINK-16089][docs] Translate "Data Type" page of "Table API & SQL" into Chinese

2020-02-23 Thread GitBox
danny0405 commented on issue #11190: [FLINK-16089][docs] Translate "Data Type" 
page of "Table API & SQL" into Chinese
URL: https://github.com/apache/flink/pull/11190#issuecomment-590146367
 
 
   @wuchong , sure, let me take the review work.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] danny0405 commented on a change in pull request #11116: [FLINK-15349] add 'create catalog' DDL to blink planner

2020-02-23 Thread GitBox
danny0405 commented on a change in pull request #11116: [FLINK-15349] add 
'create catalog' DDL to blink planner
URL: https://github.com/apache/flink/pull/11116#discussion_r383064990
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
 ##
 @@ -663,6 +665,15 @@ public void sqlUpdate(String stmt) {
DropTempSystemFunctionOperation 
dropTempSystemFunctionOperation =
(DropTempSystemFunctionOperation) operation;
dropSystemFunction(dropTempSystemFunctionOperation);
+   } else if (operation instanceof CreateCatalogOperation) {
+   CreateCatalogOperation createCatalogOperation = 
(CreateCatalogOperation) operation;
+   String exMsg = 
getDDLOpExecuteErrorMsg(createCatalogOperation.asSummaryString());
+   try {
+   catalogManager.registerCatalog(
+   
createCatalogOperation.getCatalogName(), createCatalogOperation.getCatalog());
+   } catch (CatalogException e) {
+   throw new ValidationException(exMsg, e);
+   }
 
 Review comment:
   Why not throw `CatalogException` directly? Do all the failures come from 
validation?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xintongsong commented on a change in pull request #11176: [FLINK-15948][yarn] Enrich the warning log for YARN minimum allocation memory

2020-02-23 Thread GitBox
xintongsong commented on a change in pull request #11176: [FLINK-15948][yarn] 
Enrich the warning log for YARN minimum allocation memory
URL: https://github.com/apache/flink/pull/11176#discussion_r383064789
 
 

 ##
 File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
 ##
 @@ -521,11 +521,18 @@ private ClusterSpecification validateClusterResources(
int jobManagerMemoryMb = 
clusterSpecification.getMasterMemoryMB();
final int taskManagerMemoryMb = 
clusterSpecification.getTaskManagerMemoryMB();
 
-   if (jobManagerMemoryMb < yarnMinAllocationMB || 
taskManagerMemoryMb < yarnMinAllocationMB) {
-   LOG.warn("The JobManager or TaskManager memory is below 
the smallest possible YARN Container size. "
-   + "The value of 
'yarn.scheduler.minimum-allocation-mb' is '" + yarnMinAllocationMB + "'. Please 
increase the memory size." +
-   "YARN will allocate the smaller 
containers but the scheduler will account for the minimum-allocation-mb, maybe 
not all instances " +
-   "you requested will start.");
+   final List noteMem = new ArrayList<>();
+   if (jobManagerMemoryMb % yarnMinAllocationMB != 0 || 
jobManagerMemoryMb == 0) {
+   noteMem.add("JobManager memory(" + jobManagerMemoryMb + 
")");
+   }
+   if (taskManagerMemoryMb % yarnMinAllocationMB != 0 || 
taskManagerMemoryMb == 0) {
+   noteMem.add("TaskManager memory(" + taskManagerMemoryMb 
+ ")");
+   }
+   if (noteMem.size() > 0) {
+   LOG.warn("The {} is not a multiple of YARN minimum 
allocation memory({}), so some extra memory will be wasted. "
+   + "Because YARN will always normalize the 
resource request by insuring that the requested memory is a multiple "
+   + "of minimum allocation. The value of 
'yarn.scheduler.minimum-allocation-mb' is '" + yarnMinAllocationMB + "'. "
+   + "Please increase the specified memory size.", 
noteMem.toString(), yarnMinAllocationMB);
 
 Review comment:
   I would suggest a more concise log message. The current one mentions 
"multiple of YARN min allocation" and the configured value of 
yarnMinAllocationMB twice each, which is unnecessary.
   
   I would suggest the following:
   `The configured {} memory is {} MB. YARN will allocate {} MB to make up an 
integer multiple of its minimum allocation memory ({} MB, configured via 
'yarn.scheduler.minimum-allocation-mb'). The extra {} MB may not be used by 
Flink.`
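   
   The normalization arithmetic behind the suggested message can be sketched as follows. The class and method names are illustrative, not part of Flink or YARN:

```java
// Sketch of YARN's request normalization referenced in this review:
// YARN rounds each memory request up to an integer multiple of
// 'yarn.scheduler.minimum-allocation-mb'. Names are illustrative.
public class YarnMemoryNormalization {

    /** Memory YARN actually allocates for a request of requestedMb. */
    public static int normalize(int requestedMb, int minAllocationMb) {
        int multiples = (requestedMb + minAllocationMb - 1) / minAllocationMb; // round up
        return Math.max(1, multiples) * minAllocationMb; // at least one min allocation
    }

    /** Extra memory the container gets but Flink did not ask for. */
    public static int wastedMb(int requestedMb, int minAllocationMb) {
        return normalize(requestedMb, minAllocationMb) - requestedMb;
    }

    public static void main(String[] args) {
        // e.g. 1600 MB requested with a 1024 MB minimum allocation
        System.out.println(normalize(1600, 1024)); // 2048
        System.out.println(wastedMb(1600, 1024));  // 448
    }
}
```

   Note that a request of 0 MB still normalizes to one minimum allocation, which is why the patch also warns when the configured memory is 0.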


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xintongsong commented on a change in pull request #11176: [FLINK-15948][yarn] Enrich the warning log for YARN minimum allocation memory

2020-02-23 Thread GitBox
xintongsong commented on a change in pull request #11176: [FLINK-15948][yarn] 
Enrich the warning log for YARN minimum allocation memory
URL: https://github.com/apache/flink/pull/11176#discussion_r383064879
 
 

 ##
 File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
 ##
 @@ -521,11 +521,18 @@ private ClusterSpecification validateClusterResources(
int jobManagerMemoryMb = 
clusterSpecification.getMasterMemoryMB();
final int taskManagerMemoryMb = 
clusterSpecification.getTaskManagerMemoryMB();
 
-   if (jobManagerMemoryMb < yarnMinAllocationMB || 
taskManagerMemoryMb < yarnMinAllocationMB) {
-   LOG.warn("The JobManager or TaskManager memory is below 
the smallest possible YARN Container size. "
-   + "The value of 
'yarn.scheduler.minimum-allocation-mb' is '" + yarnMinAllocationMB + "'. Please 
increase the memory size." +
-   "YARN will allocate the smaller 
containers but the scheduler will account for the minimum-allocation-mb, maybe 
not all instances " +
-   "you requested will start.");
+   final List noteMem = new ArrayList<>();
+   if (jobManagerMemoryMb % yarnMinAllocationMB != 0 || 
jobManagerMemoryMb == 0) {
+   noteMem.add("JobManager memory(" + jobManagerMemoryMb + 
")");
+   }
+   if (taskManagerMemoryMb % yarnMinAllocationMB != 0 || 
taskManagerMemoryMb == 0) {
+   noteMem.add("TaskManager memory(" + taskManagerMemoryMb 
+ ")");
+   }
+   if (noteMem.size() > 0) {
+   LOG.warn("The {} is not a multiple of YARN minimum 
allocation memory({}), so some extra memory will be wasted. "
+   + "Because YARN will always normalize the 
resource request by insuring that the requested memory is a multiple "
+   + "of minimum allocation. The value of 
'yarn.scheduler.minimum-allocation-mb' is '" + yarnMinAllocationMB + "'. "
+   + "Please increase the specified memory size.", 
noteMem.toString(), yarnMinAllocationMB);
 
 Review comment:
   I would suggest wrapping the checking and the logging into one method and 
calling it twice, once for the JM and once for the TM. The method could look 
like:
   `logIfComponentMemNotIntegerMultipleOfYarnMinAllocation(String 
componentName, int componentMemoryMB, int yarnMinAllocationMB)`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Assigned] (FLINK-16252) Optimize the progress of the process_outputs in Python UDF

2020-02-23 Thread Dian Fu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16252?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu reassigned FLINK-16252:
---

Assignee: Huang Xingbo

> Optimize the progress of the process_outputs in Python UDF
> --
>
> Key: FLINK-16252
> URL: https://issues.apache.org/jira/browse/FLINK-16252
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Python
>Reporter: Huang Xingbo
>Assignee: Huang Xingbo
>Priority: Major
> Fix For: 1.11.0
>
>
> We need to optimize the function call chains in process_outputs to improve 
> the performance of Python UDFs



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-16251) Optimize the cost of function call in ScalarFunctionOpertation

2020-02-23 Thread Dian Fu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16251?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu reassigned FLINK-16251:
---

Assignee: Huang Xingbo

> Optimize the cost of function call  in ScalarFunctionOpertation
> ---
>
> Key: FLINK-16251
> URL: https://issues.apache.org/jira/browse/FLINK-16251
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Python
>Reporter: Huang Xingbo
>Assignee: Huang Xingbo
>Priority: Major
> Fix For: 1.11.0
>
>
> Currently, there is too much extra function call overhead in 
> ScalarFunctionOpertation. We need to optimize it to improve the performance 
> of Python UDFs.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16252) Optimize the progress of the process_outputs in Python UDF

2020-02-23 Thread Huang Xingbo (Jira)
Huang Xingbo created FLINK-16252:


 Summary: Optimize the progress of the process_outputs in Python UDF
 Key: FLINK-16252
 URL: https://issues.apache.org/jira/browse/FLINK-16252
 Project: Flink
  Issue Type: Improvement
  Components: API / Python
Reporter: Huang Xingbo
 Fix For: 1.11.0


We need to optimize the function call chains in process_outputs to improve the 
performance of Python UDFs



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] danny0405 commented on a change in pull request #11116: [FLINK-15349] add 'create catalog' DDL to blink planner

2020-02-23 Thread GitBox
danny0405 commented on a change in pull request #11116: [FLINK-15349] add 
'create catalog' DDL to blink planner
URL: https://github.com/apache/flink/pull/11116#discussion_r383064486
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/catalog/CatalogITCase.java
 ##
 @@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.catalog;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+
+import org.junit.Test;
+
+import static 
org.apache.flink.table.descriptors.GenericInMemoryCatalogValidator.CATALOG_TYPE_VALUE_GENERIC_IN_MEMORY;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * IT Case for catalog ddl.
+ */
+public class CatalogITCase {
+
+   @Test
+   public void testCreateCatalog() {
+   String name = "c1";
+   TableEnvironment tableEnv = getTableEnvironment();
+   String ddl = String.format("create catalog %s 
with('type'='%s')", name, CATALOG_TYPE_VALUE_GENERIC_IN_MEMORY);
+
+   tableEnv.sqlUpdate(ddl);
+
+   assertTrue(tableEnv.getCatalog(name).isPresent());
 
 Review comment:
   I know that, just curious if they should have the same life cycle.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #11176: [FLINK-15948][yarn] Enrich the warning log for YARN minimum allocation memory

2020-02-23 Thread GitBox
flinkbot edited a comment on issue #11176: [FLINK-15948][yarn] Enrich the 
warning log for YARN minimum allocation memory
URL: https://github.com/apache/flink/pull/11176#issuecomment-589671142
 
 
   
   ## CI report:
   
   * adb2e9e62190ba505ee86554a97452984fbc2de4 Travis: 
[SUCCESS](https://travis-ci.com/flink-ci/flink/builds/150018467) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5423)
 
   * 754693bae424353bac4cdd6d0db556ed8f6234f9 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   




[jira] [Created] (FLINK-16251) Optimize the cost of function calls in ScalarFunctionOperation

2020-02-23 Thread Huang Xingbo (Jira)
Huang Xingbo created FLINK-16251:


 Summary: Optimize the cost of function calls in 
ScalarFunctionOperation
 Key: FLINK-16251
 URL: https://issues.apache.org/jira/browse/FLINK-16251
 Project: Flink
  Issue Type: Improvement
  Components: API / Python
Reporter: Huang Xingbo
 Fix For: 1.11.0


Currently, there are too many extra function calls in 
ScalarFunctionOperation. We need to optimize it to improve the performance of 
Python UDFs.





[jira] [Created] (FLINK-16250) Add interfaces for PipelineStage and Pipeline

2020-02-23 Thread Hequn Cheng (Jira)
Hequn Cheng created FLINK-16250:
---

 Summary: Add interfaces for PipelineStage and Pipeline
 Key: FLINK-16250
 URL: https://issues.apache.org/jira/browse/FLINK-16250
 Project: Flink
  Issue Type: Sub-task
  Components: API / Python
Reporter: Hequn Cheng
Assignee: Hequn Cheng


A pipeline is a linear workflow that chains several PipelineStages, e.g., 
Estimators and Transformers, to execute an algorithm. After this issue is 
addressed, Python users will be able to write Python Pipelines.
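A minimal sketch of the linear-pipeline idea; the class names below only mirror the proposal and are not the real flink-ml API.

```python
# Illustrative sketch: a Pipeline is itself a stage that applies its
# child stages in order, so pipelines can be nested and chained.

class PipelineStage:
    def transform(self, data):
        raise NotImplementedError

class AddOne(PipelineStage):
    def transform(self, data):
        return [x + 1 for x in data]

class Double(PipelineStage):
    def transform(self, data):
        return [x * 2 for x in data]

class Pipeline(PipelineStage):
    def __init__(self, stages):
        self.stages = stages

    def transform(self, data):
        # The output of each stage becomes the input of the next.
        for stage in self.stages:
            data = stage.transform(data)
        return data

pipeline = Pipeline([AddOne(), Double()])
result = pipeline.transform([1, 2, 3])  # [2, 3, 4] after AddOne, then doubled
```

Because `Pipeline` extends `PipelineStage`, a whole pipeline can be used wherever a single stage is expected.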





[jira] [Resolved] (FLINK-15172) Optimize the operator algorithm to lazily allocate memory

2020-02-23 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15172?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee resolved FLINK-15172.
--
Resolution: Fixed

Merged in Master:

c11c696cca3b6ecafab605cceb36b7541444993f

56acfe9f666f590352453c9df6fbc385894de22f

f144e13acfe4ab37bfd9f65ae5cffd0b84fd78fe

adb595cd1445d0eefe9e23f85c79d8e9640cf07b

a907f29e24d27d1da9c981abed8ece2b211355eb

> Optimize the operator algorithm to lazily allocate memory
> -
>
> Key: FLINK-15172
> URL: https://issues.apache.org/jira/browse/FLINK-15172
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Runtime
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Now, after FLINK-14063, operators will get all of the TaskManager's managed 
> memory. The cost of over-allocating memory is very high, leading to 
> performance regressions for small batch SQL jobs:
>  * Allocating memory incurs the cost of the memory management algorithm.
>  * Allocating memory incurs the cost of memory initialization, which sets all 
> memory to zero. This initialization requires the operating system to 
> actually allocate physical memory.
>  * Over-allocating memory also squashes the file cache.
> We can optimize the operator algorithms to allocate memory lazily and avoid 
> meaningless memory allocation.
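The lazy-allocation idea can be sketched generically (illustrative Python, not Flink's actual MemoryManager): the pool only records a budget up front, and the physical allocation and zeroing happen on first use, so an operator that touches two segments never pays for its full quota.

```python
# Sketch of lazy segment allocation: nothing is materialized until an
# operator actually requests a segment.

class LazyMemoryPool:
    def __init__(self, max_segments, segment_size):
        self.max_segments = max_segments    # the budget, reserved but not allocated
        self.segment_size = segment_size
        self.allocated = []

    def next_segment(self):
        if len(self.allocated) >= self.max_segments:
            return None  # budget exhausted
        # Physical allocation (and zero-initialization) happens here,
        # only on demand -- this is the cost being deferred.
        segment = bytearray(self.segment_size)
        self.allocated.append(segment)
        return segment

pool = LazyMemoryPool(max_segments=4, segment_size=1024)
assert len(pool.allocated) == 0   # nothing allocated up front
seg = pool.next_segment()
assert len(pool.allocated) == 1   # allocated only when first requested
```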





[jira] [Created] (FLINK-16249) Add interfaces for Params, ParamInfo and WithParams

2020-02-23 Thread Hequn Cheng (Jira)
Hequn Cheng created FLINK-16249:
---

 Summary: Add interfaces for Params, ParamInfo and WithParams
 Key: FLINK-16249
 URL: https://issues.apache.org/jira/browse/FLINK-16249
 Project: Flink
  Issue Type: Sub-task
  Components: API / Python
Reporter: Hequn Cheng
Assignee: Hequn Cheng


Parameters are widely used in the machine learning realm. These classes define 
common interfaces for interacting with parameterized classes.
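A hedged sketch of the Params/ParamInfo/WithParams pattern; the names mirror the flink-ml Java interfaces being ported, but the implementation below is illustrative, not the actual API.

```python
# ParamInfo describes a parameter; Params is a typed bag of values;
# WithParams is the mixin for anything configured through such a bag.

class ParamInfo:
    def __init__(self, name, description, default=None):
        self.name = name
        self.description = description
        self.default = default

class Params:
    def __init__(self):
        self._values = {}

    def set(self, info, value):
        self._values[info.name] = value
        return self  # allow fluent chaining

    def get(self, info):
        # Fall back to the ParamInfo's default when unset.
        return self._values.get(info.name, info.default)

class WithParams:
    def __init__(self):
        self.params = Params()

MAX_ITER = ParamInfo("maxIter", "maximum number of iterations", default=10)

stage = WithParams()
assert stage.params.get(MAX_ITER) == 10   # default applies
stage.params.set(MAX_ITER, 100)
assert stage.params.get(MAX_ITER) == 100
```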





[GitHub] [flink] JingsongLi commented on issue #10797: [FLINK-15172][table-blink] Optimize the operator algorithm to lazily allocate memory

2020-02-23 Thread GitBox
JingsongLi commented on issue #10797: [FLINK-15172][table-blink] Optimize the 
operator algorithm to lazily allocate memory
URL: https://github.com/apache/flink/pull/10797#issuecomment-590144331
 
 
   Thanks @TsReaper for the reviewing. Merged.




[GitHub] [flink] JingsongLi closed pull request #10797: [FLINK-15172][table-blink] Optimize the operator algorithm to lazily allocate memory

2020-02-23 Thread GitBox
JingsongLi closed pull request #10797: [FLINK-15172][table-blink] Optimize the 
operator algorithm to lazily allocate memory
URL: https://github.com/apache/flink/pull/10797
 
 
   




[jira] [Created] (FLINK-16248) Add interfaces for MLEnvironment and MLEnvironmentFactory

2020-02-23 Thread Hequn Cheng (Jira)
Hequn Cheng created FLINK-16248:
---

 Summary: Add interfaces for MLEnvironment and MLEnvironmentFactory
 Key: FLINK-16248
 URL: https://issues.apache.org/jira/browse/FLINK-16248
 Project: Flink
  Issue Type: Sub-task
  Components: API / Python
Reporter: Hequn Cheng
Assignee: Hequn Cheng


Align the interfaces for MLEnvironment and MLEnvironmentFactory so that Python 
users can use a Python MLEnvironmentFactory to maintain the execution 
environment and table environment.
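A hypothetical sketch of what such a factory could look like: a registry that hands out environment bundles by id, with a default entry. The class and method names here are assumptions, not the actual PyFlink ML API.

```python
# Illustrative only: MLEnvironment bundles the execution and table
# environments; the factory maintains a default plus registered instances.

class MLEnvironment:
    def __init__(self, exec_env=None, table_env=None):
        self.exec_env = exec_env
        self.table_env = table_env

class MLEnvironmentFactory:
    _environments = {0: MLEnvironment()}  # id 0 is the default
    _next_id = 1

    @classmethod
    def get_default(cls):
        return cls._environments[0]

    @classmethod
    def register(cls, env):
        # Hand out a fresh id so callers can retrieve this environment later.
        env_id = cls._next_id
        cls._next_id += 1
        cls._environments[env_id] = env
        return env_id

    @classmethod
    def get(cls, env_id):
        return cls._environments[env_id]

env_id = MLEnvironmentFactory.register(MLEnvironment())
assert MLEnvironmentFactory.get(env_id) is not MLEnvironmentFactory.get_default()
```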





[jira] [Assigned] (FLINK-14509) Improve the README.md in pyflink to prepare for PyPI release

2020-02-23 Thread Dian Fu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14509?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu reassigned FLINK-14509:
---

Assignee: Wei Zhong  (was: Weizhong)

> Improve the README.md in pyflink to prepare for PyPI release
> 
>
> Key: FLINK-14509
> URL: https://issues.apache.org/jira/browse/FLINK-14509
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python
>Reporter: Dian Fu
>Assignee: Wei Zhong
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Currently, some information is still missing from the README.md in 
> pyflink, such as the pyflink documentation link.





[GitHub] [flink] KarmaGYZ commented on a change in pull request #11100: [FLINK-15562][docs] Add Example settings.xml for maven archetype command wh…

2020-02-23 Thread GitBox
KarmaGYZ commented on a change in pull request #11100: [FLINK-15562][docs] Add 
Example settings.xml for maven archetype command wh…
URL: https://github.com/apache/flink/pull/11100#discussion_r383061588
 
 

 ##
 File path: docs/dev/projectsetup/scala_api_quickstart.md
 ##
 @@ -128,27 +128,46 @@ Use one of the following commands to __create a 
project__:
 
 
 
-{% highlight bash %}
-$ mvn archetype:generate   \
-  -DarchetypeGroupId=org.apache.flink  \
-  -DarchetypeArtifactId=flink-quickstart-scala \{% unless 
site.is_stable %}
-  
-DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/
 \{% endunless %}
-  -DarchetypeVersion={{site.version}}
-{% endhighlight %}
+{% highlight bash %}
+$ mvn archetype:generate   \
+  -DarchetypeGroupId=org.apache.flink  \
+  -DarchetypeArtifactId=flink-quickstart-scala \{% unless site.is_stable %}
+  
-DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/
 \{% endunless %}
+  -DarchetypeVersion={{site.version}}
+{% endhighlight %}
 This allows you to name your newly created project. It 
will interactively ask you for the groupId, artifactId, and package name.
 
 
 {% highlight bash %}
 {% if site.is_stable %}
-$ curl https://flink.apache.org/q/quickstart-scala.sh | bash -s 
{{site.version}}
+$ curl https://flink.apache.org/q/quickstart-scala.sh | bash -s 
{{site.version}}
 {% else %}
-$ curl https://flink.apache.org/q/quickstart-scala-SNAPSHOT.sh | bash -s 
{{site.version}}
+$ curl https://flink.apache.org/q/quickstart-scala-SNAPSHOT.sh | bash -s 
{{site.version}}
 {% endif %}
 {% endhighlight %}
 
 {% unless site.is_stable %}
 
-Note: For Maven 3.0 or higher, it is no longer possible to 
specify the repository (-DarchetypeCatalog) via the commandline. If you wish to 
use the snapshot repository, you need to add a repository entry to your 
settings.xml. For details about this change, please refer to the <a href="http://maven.apache.org/archetype/maven-archetype-plugin/archetype-repository.html">Maven official document</a>
+Note: For Maven 3.0 or higher, it is no longer possible to 
specify the repository (-DarchetypeCatalog) via the command line. For details 
about this change, please refer to the <a href="http://maven.apache.org/archetype/maven-archetype-plugin/archetype-repository.html">Maven official document</a>
+If you wish to use the snapshot repository, you need to add a 
repository entry to your settings.xml. For example:
+{% highlight bash %}
+
+  
 
 Review comment:
   Sorry, I did not understand what you mean. Do you mean to set it as the 
default profile?




[jira] [Assigned] (FLINK-16031) Improve the description in the README file of PyFlink 1.9.x

2020-02-23 Thread Dian Fu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16031?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu reassigned FLINK-16031:
---

Assignee: Wei Zhong  (was: Weizhong)

>  Improve the description in the README file of PyFlink 1.9.x
> 
>
> Key: FLINK-16031
> URL: https://issues.apache.org/jira/browse/FLINK-16031
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Python
>Affects Versions: 1.9.1
>Reporter: Wei Zhong
>Assignee: Wei Zhong
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.2, 1.9.3
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently, the description in the README file of PyFlink 1.9.x is not 
> suitable for publishing in PyPI. It should be changed to be more 
> user-friendly.





[jira] [Assigned] (FLINK-16031) Improve the description in the README file of PyFlink 1.9.x

2020-02-23 Thread Dian Fu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16031?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu reassigned FLINK-16031:
---

Assignee: Weizhong

>  Improve the description in the README file of PyFlink 1.9.x
> 
>
> Key: FLINK-16031
> URL: https://issues.apache.org/jira/browse/FLINK-16031
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Python
>Affects Versions: 1.9.1
>Reporter: Wei Zhong
>Assignee: Weizhong
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.2, 1.9.3
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently, the description in the README file of PyFlink 1.9.x is not 
> suitable for publishing in PyPI. It should be changed to be more 
> user-friendly.





[jira] [Assigned] (FLINK-14509) Improve the README.md in pyflink to prepare for PyPI release

2020-02-23 Thread Dian Fu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14509?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu reassigned FLINK-14509:
---

Assignee: Weizhong  (was: Wei Zhong (old))

> Improve the README.md in pyflink to prepare for PyPI release
> 
>
> Key: FLINK-14509
> URL: https://issues.apache.org/jira/browse/FLINK-14509
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python
>Reporter: Dian Fu
>Assignee: Weizhong
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Currently, some information is still missing from the README.md in 
> pyflink, such as the pyflink documentation link.





[GitHub] [flink] flinkbot edited a comment on issue #11192: [FLINK-16237][build] Add Log4j2 configuration properties

2020-02-23 Thread GitBox
flinkbot edited a comment on issue #11192: [FLINK-16237][build] Add Log4j2 
configuration properties
URL: https://github.com/apache/flink/pull/11192#issuecomment-590057592
 
 
   
   ## CI report:
   
   * 6500c5170375b837320f00a032d64a7eec69135d Travis: 
[SUCCESS](https://travis-ci.com/flink-ci/flink/builds/150225021) 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   




[GitHub] [flink] flinkbot commented on issue #11197: [FLINK-16288][travis] Remove redundant double-quote at the end of the…

2020-02-23 Thread GitBox
flinkbot commented on issue #11197: [FLINK-16288][travis] Remove redundant 
double-quote at the end of the…
URL: https://github.com/apache/flink/pull/11197#issuecomment-590137926
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit 45a5042fe34c700f4f9d84e207a788c44e36c055 (Mon Feb 24 
00:52:46 UTC 2020)
   
   **Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up 
to date!
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   




[GitHub] [flink] KarmaGYZ opened a new pull request #11197: [FLINK-16288][travis] Remove redundant double-quote at the end of the…

2020-02-23 Thread GitBox
KarmaGYZ opened a new pull request #11197: [FLINK-16288][travis] Remove 
redundant double-quote at the end of the…
URL: https://github.com/apache/flink/pull/11197
 
 
   … env field
   
   
   
   ## What is the purpose of the change
   
   Remove the redundant double-quote at the end of the env field, which causes 
the Mesos e2e test to fail.
   
   ## Brief change log
   
   Remove redundant double-quote at the end of the env field.
   
   ## Verifying this change
   
   Trigger e2e test
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   




[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode

2020-02-23 Thread GitBox
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] 
Let Netty use Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#discussion_r383055795
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/ByteBufUtils.java
 ##
 @@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+
+/**
+ * Utility routines to process Netty ByteBuf.
+ */
+public class ByteBufUtils {
+
+   /**
+* Cumulates data from the source buffer to the target buffer.
+*
+* @param cumulationBuf The target buffer.
+* @param src The source buffer.
+* @param expectedSize The expected length to cumulate.
+*
+* @return The ByteBuf containing cumulated data or null if not enough 
data has been cumulated.
+*/
+   public static ByteBuf cumulate(ByteBuf cumulationBuf, ByteBuf src, int 
expectedSize) {
 
 Review comment:
   Have renamed `cumulate` into `accumulate` and `cumulation` into 
`accumulation`, and renamed the variable to target.
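The accumulate contract described in the javadoc above (append incoming bytes until the expected size is reached, returning the full chunk or null while more data is needed) can be sketched in language-neutral form. This is illustrative Python, not the Netty-based Java implementation; a real decoder would also track the source buffer's reader index.

```python
# Sketch: accumulate bytes from `src` into `target` until `expected_size`
# is reached; return the completed chunk, or None if still incomplete.

def accumulate(target: bytearray, src: bytes, expected_size: int):
    missing = expected_size - len(target)
    target.extend(src[:missing])       # take only what is still needed
    if len(target) < expected_size:
        return None                    # not enough data accumulated yet
    return bytes(target)

buf = bytearray()
assert accumulate(buf, b"ab", 5) is None        # only 2 of 5 bytes so far
assert accumulate(buf, b"cdef", 5) == b"abcde"  # completed with 3 more bytes
```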




[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode

2020-02-23 Thread GitBox
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] 
Let Netty use Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#discussion_r383055217
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/ByteBufUtils.java
 ##
 @@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+
+/**
+ * Utility routines to process Netty ByteBuf.
+ */
+public class ByteBufUtils {
+
+   /**
+* Cumulates data from the source buffer to the target buffer.
+*
+* @param cumulationBuf The target buffer.
+* @param src The source buffer.
+* @param expectedSize The expected length to cumulate.
+*
+* @return The ByteBuf containing cumulated data or null if not enough 
data has been cumulated.
+*/
+   public static ByteBuf cumulate(ByteBuf cumulationBuf, ByteBuf src, int 
expectedSize) {
+   // If the cumulation buffer is empty and src has enought bytes,
 
 Review comment:
   Fixed the typo.




[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode

2020-02-23 Thread GitBox
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] 
Let Netty use Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#discussion_r383055205
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NetworkBufferAllocator.java
 ##
 @@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.NetworkClientHandler;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
+import 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
+
+import javax.annotation.Nullable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An allocator used for requesting buffers in the receiver side of netty 
handlers.
+ */
+class NetworkBufferAllocator {
+   private final NetworkClientHandler partitionRequestClientHandler;
 
 Review comment:
   Renamed the variable.




[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode

2020-02-23 Thread GitBox
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] 
Let Netty use Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#discussion_r383055079
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/BufferResponseDecoder.java
 ##
 @@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+
+import static 
org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse;
+import static 
org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse.MESSAGE_HEADER_LENGTH;
+
+/**
+ * The parser for {@link 
org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse}.
+ */
+class BufferResponseDecoder extends NettyMessageDecoder {
+
+   /** The Flink Buffer allocator. */
+   private final NetworkBufferAllocator allocator;
+
+   /** The cumulation buffer of message header. */
+   private ByteBuf messageHeaderBuffer;
+
+   /**
+* The current BufferResponse message that are process the buffer part.
+* If it is null, we are still processing the message header part, 
otherwise
+* we are processing the buffer part.
+*/
+   private BufferResponse currentResponse;
+
+   /** How much bytes have been received or discarded for the buffer part. 
*/
 
 Review comment:
   Have modified accordingly.




[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode

2020-02-23 Thread GitBox
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] 
Let Netty use Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#discussion_r383055067
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/BufferResponseDecoder.java
 ##
 @@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+
+import static 
org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse;
+import static 
org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse.MESSAGE_HEADER_LENGTH;
+
+/**
+ * The parser for {@link 
org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse}.
 
 Review comment:
   Have modified accordingly.




[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode

2020-02-23 Thread GitBox
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#discussion_r383055157
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NonBufferResponseDecoder.java
 ##
 @@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+
+import java.net.ProtocolException;
+
+/**
+ * The parser for messages that have no dedicated parser. It accumulates the
+ * whole message and then delegates the parsing to the targeted message class.
+ */
+class NonBufferResponseDecoder extends NettyMessageDecoder {
+
+   /** The initial size of the message header cumulator buffer. */
+   private static final int INITIAL_MESSAGE_HEADER_BUFFER_LENGTH = 128;
+
+   /** The cumulation buffer of message header. */
+   private ByteBuf messageBuffer;
+
+   @Override
+   public void onChannelActive(ChannelHandlerContext ctx) {
+   messageBuffer = ctx.alloc().directBuffer(INITIAL_MESSAGE_HEADER_BUFFER_LENGTH);
+   }
+
+   @Override
+   public ParseResult onChannelRead(ByteBuf data) throws Exception {
+   ensureBufferCapacityIfNewMessage();
+
+   ByteBuf toDecode = ByteBufUtils.cumulate(messageBuffer, data, messageLength);
+
+   if (toDecode == null) {
+   return ParseResult.notFinished();
+   }
+
+   NettyMessage nettyMessage;
+   switch (msgId) {
+   case NettyMessage.ErrorResponse.ID:
 
 Review comment:
   Modified to add `import static org.apache.flink.runtime.io.network.netty.NettyMessage.ErrorResponse` in advance.
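The decoder above accumulates partial network reads into a staging buffer until a full message is available, then dispatches on the message ID. The following standalone class is a minimal sketch of that cumulation idea; it is a hypothetical illustration using `java.nio.ByteBuffer`, not the PR's actual Netty-based `ByteBufUtils.cumulate` implementation, and the class and method names are invented for this example.

```java
import java.nio.ByteBuffer;

/**
 * Sketch of message cumulation: bytes for one message may arrive split across
 * several network reads, so the decoder copies them into a staging buffer
 * until the declared message length is reached, then hands the complete
 * message to the per-type parser.
 */
class MessageCumulator {
    private final ByteBuffer staging;
    private final int messageLength;

    MessageCumulator(int messageLength) {
        this.messageLength = messageLength;
        this.staging = ByteBuffer.allocate(messageLength);
    }

    /** Returns the complete message once enough bytes have arrived, else null. */
    ByteBuffer cumulate(ByteBuffer data) {
        // Copy only as many bytes as the current message still needs.
        int toCopy = Math.min(data.remaining(), messageLength - staging.position());
        for (int i = 0; i < toCopy; i++) {
            staging.put(data.get());
        }
        if (staging.position() < messageLength) {
            return null; // not finished yet; wait for the next read
        }
        staging.flip(); // make the accumulated bytes readable
        return staging;
    }
}
```

This mirrors the `ParseResult.notFinished()` pattern in the decoder: a null (or "not finished") result tells the caller to keep the partial state and retry once more bytes arrive.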



