[GitHub] [flink] klion26 commented on a change in pull request #13212: [FLINK-18973][docs-zh] Translate the 'History Server' page of 'Debugging & Monitoring' into Chinese

2020-08-25 Thread GitBox


klion26 commented on a change in pull request #13212:
URL: https://github.com/apache/flink/pull/13212#discussion_r477046283



##
File path: docs/monitoring/historyserver.zh.md
##
@@ -22,62 +22,67 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-Flink has a history server that can be used to query the statistics of 
completed jobs after the corresponding Flink cluster has been shut down.
+Flink 提供了 history server,可以在相应的 Flink 集群关闭之后查询已完成作业的统计信息。
 
-Furthermore, it exposes a REST API that accepts HTTP requests and responds 
with JSON data.
+此外,它暴露了一套 REST API,该 API 接受 HTTP 请求并以 JSON 数据格式进行响应。

Review comment:
   Would `并返回 JSON 格式的数据` read better here?

##
File path: docs/monitoring/historyserver.zh.md
##
@@ -22,62 +22,67 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-Flink has a history server that can be used to query the statistics of 
completed jobs after the corresponding Flink cluster has been shut down.
+Flink 提供了 history server,可以在相应的 Flink 集群关闭之后查询已完成作业的统计信息。
 
-Furthermore, it exposes a REST API that accepts HTTP requests and responds 
with JSON data.
+此外,它暴露了一套 REST API,该 API 接受 HTTP 请求并以 JSON 数据格式进行响应。
 
 * This will be replaced by the TOC
 {:toc}
 
-## Overview
+
 
-The HistoryServer allows you to query the status and statistics of completed 
jobs that have been archived by a JobManager.
+## 概览
 
-After you have configured the HistoryServer *and* JobManager, you start and 
stop the HistoryServer via its corresponding startup script:
+HistoryServer 允许查询 JobManager 存档的已完成作业的状态和统计信息。
+
+在配置 HistoryServer *和* JobManager 之后,你可以使用其相应的启动脚本来启动和停止 HistoryServer:

Review comment:
   Would `你可以使用相应的脚本来启动和停止 HistoryServer` read better?

##
File path: docs/monitoring/historyserver.zh.md
##
@@ -22,62 +22,67 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-Flink has a history server that can be used to query the statistics of 
completed jobs after the corresponding Flink cluster has been shut down.
+Flink 提供了 history server,可以在相应的 Flink 集群关闭之后查询已完成作业的统计信息。
 
-Furthermore, it exposes a REST API that accepts HTTP requests and responds 
with JSON data.
+此外,它暴露了一套 REST API,该 API 接受 HTTP 请求并以 JSON 数据格式进行响应。
 
 * This will be replaced by the TOC
 {:toc}
 
-## Overview
+
 
-The HistoryServer allows you to query the status and statistics of completed 
jobs that have been archived by a JobManager.
+## 概览
 
-After you have configured the HistoryServer *and* JobManager, you start and 
stop the HistoryServer via its corresponding startup script:
+HistoryServer 允许查询 JobManager 存档的已完成作业的状态和统计信息。
+
+在配置 HistoryServer *和* JobManager 之后,你可以使用其相应的启动脚本来启动和停止 HistoryServer:
 
 {% highlight shell %}
-# Start or stop the HistoryServer
+# 启动或者停止 HistoryServer
 bin/historyserver.sh (start|start-foreground|stop)
 {% endhighlight %}
 
-By default, this server binds to `localhost` and listens at port `8082`.
+默认情况下,此服务器绑定到 `localhost` 并监听 `8082` 端口。

Review comment:
   Would “此服务器绑定到 `localhost` 的 `8082` 端口” be a better way to phrase this?
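
   Once the server is up with these defaults, the REST API can be exercised directly (a hedged example; the exact endpoint set served by the HistoryServer depends on the Flink version):

```shell
# Query the HistoryServer REST API at its default bind address and port
curl http://localhost:8082/jobs/overview
```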

##
File path: docs/monitoring/historyserver.zh.md
##
@@ -22,62 +22,67 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-Flink has a history server that can be used to query the statistics of 
completed jobs after the corresponding Flink cluster has been shut down.
+Flink 提供了 history server,可以在相应的 Flink 集群关闭之后查询已完成作业的统计信息。
 
-Furthermore, it exposes a REST API that accepts HTTP requests and responds 
with JSON data.
+此外,它暴露了一套 REST API,该 API 接受 HTTP 请求并以 JSON 数据格式进行响应。
 
 * This will be replaced by the TOC
 {:toc}
 
-## Overview
+
 
-The HistoryServer allows you to query the status and statistics of completed 
jobs that have been archived by a JobManager.
+## 概览
 
-After you have configured the HistoryServer *and* JobManager, you start and 
stop the HistoryServer via its corresponding startup script:
+HistoryServer 允许查询 JobManager 存档的已完成作业的状态和统计信息。
+
+在配置 HistoryServer *和* JobManager 之后,你可以使用其相应的启动脚本来启动和停止 HistoryServer:
 
 {% highlight shell %}
-# Start or stop the HistoryServer
+# 启动或者停止 HistoryServer
 bin/historyserver.sh (start|start-foreground|stop)
 {% endhighlight %}
 
-By default, this server binds to `localhost` and listens at port `8082`.
+默认情况下,此服务器绑定到 `localhost` 并监听 `8082` 端口。
+
+目前,只能将 HistoryServer 作为独立的进程运行。
 
-Currently, you can only run it as a standalone process.
+
 
-## Configuration
+## 配置参数
 
-The configuration keys `jobmanager.archive.fs.dir` and 
`historyserver.archive.fs.refresh-interval` need to be adjusted for archiving 
and displaying archived jobs.
+配置项 `jobmanager.archive.fs.dir` 和 `historyserver.archive.fs.refresh-interval` 
需要根据归档路径和已归档作业刷新间隔进行调整。

Review comment:
   The meaning of `和已归档作业刷新间隔进行调整` comes through, but it reads a little awkwardly. Is there a better wording for this? Or let's see whether others have a better suggestion.
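
   For context, a minimal `flink-conf.yaml` sketch that wires these keys together; the paths and the interval are illustrative placeholders, and the HistoryServer-side directory key is added here for completeness:

```yaml
# Directory into which the JobManager archives completed jobs
jobmanager.archive.fs.dir: hdfs:///completed-jobs/
# Directory the HistoryServer monitors for new archives
historyserver.archive.fs.dir: hdfs:///completed-jobs/
# Polling interval for new archives, in milliseconds
historyserver.archive.fs.refresh-interval: 10000
```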

##
File path: docs/monitoring/historyserver.zh.md
##
@@ -22,62 +22,67 @@ specific language governing permissions and limitations
 under the 

[jira] [Closed] (FLINK-11712) Add cost model for both batch and streaming

2020-08-25 Thread godfrey he (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-11712?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

godfrey he closed FLINK-11712.
--
Resolution: Resolved

> Add cost model for both batch and streaming
> ---
>
> Key: FLINK-11712
> URL: https://issues.apache.org/jira/browse/FLINK-11712
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Planner
>Reporter: godfrey he
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink-statefun] billyrrr commented on pull request #131: [FLINK-18968] Translate README.md to Chinese

2020-08-25 Thread GitBox


billyrrr commented on pull request #131:
URL: https://github.com/apache/flink-statefun/pull/131#issuecomment-680671983


   Hi, @carp84 Thank you for the review. 
   
   I think that "stateful functions" should be translated into Chinese to promote 
communication among people who primarily speak Chinese. Suppose someone is 
proficient at processing stateful streams but not proficient in English; they 
would not be able to notice the target use case of Flink Stateful Functions. On 
another note, when someone wants to point another person to Flink Stateful 
Functions, it is easier to use the term 'Flink有状态函数' in their native 
language rather than 'Flink Stateful Functions'. Nevertheless, the English term 
on its own has the merit of accuracy and of making related documentation easy 
to find, so I do understand your point too. 
   
   I think there is more benefit in translating 'Flink Stateful Functions' to 
'Flink有状态函数'. Can we discuss this decision further? Thanks! 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #13225: [FLINK-18974][docs-zh]Translate the 'User-Defined Functions' page of "Application Development's DataStream API" into Chinese

2020-08-25 Thread GitBox


flinkbot edited a comment on pull request #13225:
URL: https://github.com/apache/flink/pull/13225#issuecomment-678953566


   
   ## CI report:
   
   * 6bfd27b44774c770a2d28ee9ca1bde0e11bfbe29 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5870)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-19025) table sql write orc file but hive2.1.1 can not read

2020-08-25 Thread Rui Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17184947#comment-17184947
 ] 

Rui Li commented on FLINK-19025:


[~McClone] The stacktrace indicates you're writing streaming data. Therefore 
you'll need FLINK-18659 to fix the issue.

> table sql write orc file but hive2.1.1 can not read
> ---
>
> Key: FLINK-19025
> URL: https://issues.apache.org/jira/browse/FLINK-19025
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / ORC
>Affects Versions: 1.11.0
>Reporter: McClone
>Priority: Major
>
> Table SQL writes an ORC file, but a Hive 2.1.1 external table cannot read the 
> data, because Flink uses orc-core-1.5.6.jar while Hive 2.1.1 uses its own ORC 
> jar.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19025) table sql write orc file but hive2.1.1 can not read

2020-08-25 Thread McClone (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17184946#comment-17184946
 ] 

McClone commented on FLINK-19025:
-

Exception in thread "main" java.lang.NoClassDefFoundError: 
org/apache/orc/PhysicalWriterException in thread "main" 
java.lang.NoClassDefFoundError: org/apache/orc/PhysicalWriter at 
org.apache.flink.table.catalog.hive.client.HiveShimV200.createOrcBulkWriterFactory(HiveShimV200.java:60)
 at 
org.apache.flink.connectors.hive.HiveTableSink.createBulkWriterFactory(HiveTableSink.java:251)
 at 
org.apache.flink.connectors.hive.HiveTableSink.consumeDataStream(HiveTableSink.java:192)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:114)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48)
 at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48)
 at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:67)
 at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at scala.collection.Iterator$class.foreach(Iterator.scala:891) at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at 
scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at 
scala.collection.AbstractIterable.foreach(Iterable.scala:54) at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at 
scala.collection.AbstractTraversable.map(Traversable.scala:104) at 
org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:66)
 at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:166)
 at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1248)
 at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:694)
 at 
org.apache.flink.table.api.internal.StatementSetImpl.execute(StatementSetImpl.java:97)
 at com.galaxy.sunny.flink.StartTableJob.main(StartTableJob.java:45)Caused by: 
java.lang.ClassNotFoundException: org.apache.orc.PhysicalWriter at 
java.net.URLClassLoader.findClass(URLClassLoader.java:381) at 
java.lang.ClassLoader.loadClass(ClassLoader.java:424) at 
sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349) at 
java.lang.ClassLoader.loadClass(ClassLoader.java:357) ... 23 more
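
The `ClassNotFoundException` at the bottom shows that `orc-core` is missing from the user classpath. A minimal, hypothetical diagnostic (not from the ticket) to confirm whether the class is visible:

```java
public class OrcClasspathCheck {
    public static void main(String[] args) {
        try {
            // PhysicalWriter ships with orc-core >= 1.5.x, which Flink's ORC writer relies on
            Class.forName("org.apache.orc.PhysicalWriter");
            System.out.println("org.apache.orc.PhysicalWriter is on the classpath");
        } catch (ClassNotFoundException e) {
            System.out.println("orc-core is missing or too old: " + e);
        }
    }
}
```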

> table sql write orc file but hive2.1.1 can not read
> ---
>
> Key: FLINK-19025
> URL: https://issues.apache.org/jira/browse/FLINK-19025
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / ORC
>Affects Versions: 1.11.0
>Reporter: McClone
>Priority: Major
>
> Table SQL writes an ORC file, but a Hive 2.1.1 external table cannot read the 
> data, because Flink uses orc-core-1.5.6.jar while Hive 2.1.1 uses its own ORC 
> jar.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on pull request #13225: [FLINK-18974][docs-zh]Translate the 'User-Defined Functions' page of "Application Development's DataStream API" into Chinese

2020-08-25 Thread GitBox


flinkbot edited a comment on pull request #13225:
URL: https://github.com/apache/flink/pull/13225#issuecomment-678953566


   
   ## CI report:
   
   * 40658a82560ebc79ee2047a789db4fa003115099 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5808)
 
   * 6bfd27b44774c770a2d28ee9ca1bde0e11bfbe29 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5870)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-19022) AkkaRpcActor failed to start but no exception information

2020-08-25 Thread tartarus (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17184921#comment-17184921
 ] 

tartarus commented on FLINK-19022:
--

[~trohrmann] I want to confirm a few details.

We pass a {{FatalErrorHandler}} to the {{DispatcherResourceManagerComponent}}, 
and then we need to remove the {{FatalErrorHandler}} from {{ResourceManager}} 
and {{Dispatcher}}?

Just register the {{TerminationFuture}} of {{ResourceManager}} and 
{{Dispatcher}} with {{DispatcherResourceManagerComponent}}.
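
A minimal sketch of that wiring in plain Java (all names are hypothetical illustrations of the discussion, not actual Flink internals):

```java
import java.util.concurrent.CompletableFuture;

public class ComponentShutdownSketch {
    public static void main(String[] args) {
        // Termination futures exposed by the two sub-components
        CompletableFuture<Void> resourceManagerTerminated = new CompletableFuture<>();
        CompletableFuture<Void> dispatcherTerminated = new CompletableFuture<>();

        // The enclosing component reacts when either sub-component terminates,
        // instead of each sub-component invoking a shared FatalErrorHandler itself
        CompletableFuture.anyOf(resourceManagerTerminated, dispatcherTerminated)
                .thenRun(() -> System.out.println("shutting down enclosing component"));

        dispatcherTerminated.complete(null); // simulate one component terminating
    }
}
```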

> AkkaRpcActor failed to start but no exception information
> -
>
> Key: FLINK-19022
> URL: https://issues.apache.org/jira/browse/FLINK-19022
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0, 1.12.0, 1.11.1
>Reporter: tartarus
>Assignee: tartarus
>Priority: Critical
> Fix For: 1.12.0, 1.11.2, 1.10.3
>
>
> In my job, the JM could not start normally, and the JM container was finally 
> killed by the RM.
> In the end, I found through debugging that AkkaRpcActor failed to start 
> because the version of yarn in my job was incompatible with the version in 
> the cluster.
> [AkkaRpcActor exception 
> handling|https://github.com/apache/flink/blob/478c9657fe1240acdc1eb08ad32ea93e08b0cd5e/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java#L550]
> I added log printing there, and then found the specific problem.
> {code:java}
> 2020-08-21 21:31:16,985 ERROR 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState 
> [flink-akka.actor.default-dispatcher-4]  - Could not start RpcEndpoint 
> resourcemanager.
> java.lang.NoSuchMethodError: 
> org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB.registerApplicationMaster(Lcom/google/protobuf/RpcController;Lorg/apache/hadoop/yarn/proto/YarnServiceProtos$RegisterApplicationMasterRequestProto;)Lorg/apache/hadoop/yarn/proto/YarnServiceProtos$RegisterApplicationMasterResponseProto;
>   at 
> org.apache.hadoop.yarn.api.impl.pb.client.ApplicationMasterProtocolPBClientImpl.registerApplicationMaster(ApplicationMasterProtocolPBClientImpl.java:106)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
>   at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>   at com.sun.proxy.$Proxy25.registerApplicationMaster(Unknown Source)
>   at 
> org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.registerApplicationMaster(AMRMClientImpl.java:222)
>   at 
> org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.registerApplicationMaster(AMRMClientImpl.java:214)
>   at 
> org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl.registerApplicationMaster(AMRMClientAsyncImpl.java:138)
>   at 
> org.apache.flink.yarn.YarnResourceManager.createAndStartResourceManagerClient(YarnResourceManager.java:229)
>   at 
> org.apache.flink.yarn.YarnResourceManager.initialize(YarnResourceManager.java:262)
>   at 
> org.apache.flink.runtime.resourcemanager.ResourceManager.startResourceManagerServices(ResourceManager.java:204)
>   at 
> org.apache.flink.runtime.resourcemanager.ResourceManager.onStart(ResourceManager.java:192)
>   at 
> org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStart(RpcEndpoint.java:185)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:544)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:169)
>   at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>   at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>   at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>   at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>   at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>   at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>   at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>   at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>   at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>   at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>   at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>  

[GitHub] [flink] flinkbot edited a comment on pull request #13225: [FLINK-18974][docs-zh]Translate the 'User-Defined Functions' page of "Application Development's DataStream API" into Chinese

2020-08-25 Thread GitBox


flinkbot edited a comment on pull request #13225:
URL: https://github.com/apache/flink/pull/13225#issuecomment-678953566


   
   ## CI report:
   
   * 40658a82560ebc79ee2047a789db4fa003115099 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5808)
 
   * 6bfd27b44774c770a2d28ee9ca1bde0e11bfbe29 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #13244: [FLINK-18333][jdbc] UnsignedTypeConversionITCase failed caused by MariaDB4j "Asked to waitFor Program"

2020-08-25 Thread GitBox


flinkbot edited a comment on pull request #13244:
URL: https://github.com/apache/flink/pull/13244#issuecomment-680535784


   
   ## CI report:
   
   * f5fee133d28436ab73b03d05b1f6fadc8648f5d0 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5869)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-19025) table sql write orc file but hive2.1.1 can not read

2020-08-25 Thread Rui Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17184918#comment-17184918
 ] 

Rui Li commented on FLINK-19025:


Hey [~McClone], are you writing streaming data into hive orc table? If so, 
there's indeed a known issue which has been fixed in FLINK-18659. You should be 
able to run this use case if you apply that patch, and set 
{{table.exec.hive.fallback-mapred-writer=true}}, which is the default setting.

If you hit the issue when writing batch data into hive, please provide the 
stacktrace of the exception.
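
For readers following along, a hedged sketch of setting that option programmatically with the Flink 1.11-era Table API (the option key is taken verbatim from the comment above):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class HiveWriterFallbackSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());
        // Fall back to Hive's own mapred record writers instead of Flink's native ORC writer
        tEnv.getConfig().getConfiguration()
                .setBoolean("table.exec.hive.fallback-mapred-writer", true);
    }
}
```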

> table sql write orc file but hive2.1.1 can not read
> ---
>
> Key: FLINK-19025
> URL: https://issues.apache.org/jira/browse/FLINK-19025
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / ORC
>Affects Versions: 1.11.0
>Reporter: McClone
>Priority: Major
>
> Table SQL writes an ORC file, but a Hive 2.1.1 external table cannot read the 
> data, because Flink uses orc-core-1.5.6.jar while Hive 2.1.1 uses its own ORC 
> jar.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] zhuxiaoshang commented on pull request #13225: [FLINK-18974][docs-zh]Translate the 'User-Defined Functions' page of "Application Development's DataStream API" into Chinese

2020-08-25 Thread GitBox


zhuxiaoshang commented on pull request #13225:
URL: https://github.com/apache/flink/pull/13225#issuecomment-680556966


   Thanks for your review, @RocMarshal. I have made some changes according to 
your suggestions.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] zhuxiaoshang commented on a change in pull request #13225: [FLINK-18974][docs-zh]Translate the 'User-Defined Functions' page of "Application Development's DataStream API" into Chinese

2020-08-25 Thread GitBox


zhuxiaoshang commented on a change in pull request #13225:
URL: https://github.com/apache/flink/pull/13225#discussion_r477020865



##
File path: docs/dev/user_defined_functions.zh.md
##
@@ -147,95 +142,75 @@ data.map (new RichMapFunction[String, Int] {
 
 
 
-Rich functions provide, in addition to the user-defined function (map,
-reduce, etc), four methods: `open`, `close`, `getRuntimeContext`, and
-`setRuntimeContext`. These are useful for parameterizing the function
-(see [Passing Parameters to Functions]({{ site.baseurl 
}}/dev/batch/index.html#passing-parameters-to-functions)),
-creating and finalizing local state, accessing broadcast variables (see
-[Broadcast Variables]({{ site.baseurl 
}}/dev/batch/index.html#broadcast-variables)), and for accessing runtime
-information such as accumulators and counters (see
-[Accumulators and Counters](#accumulators--counters)), and information
-on iterations (see [Iterations]({{ site.baseurl }}/dev/batch/iterations.html)).
+除了用户自定义的功能(map,reduce 等),富函数还提供了四个方法:`open`、`close`、`getRuntimeContext` 和
+`setRuntimeContext`。这些对于参数化功能很有用
+(参阅 [给函数传递参数]({{ site.baseurl 
}}/zh/dev/batch/index.html#passing-parameters-to-functions)),
+创建和最终确定本地状态,访问广播变量(参阅
+[广播变量]({{ site.baseurl 
}}/zh/dev/batch/index.html#broadcast-variables)),以及访问运行时信息,例如累加器和计数器(参阅

Review comment:
   These link tags resolve to the correct pages, so I think it's unnecessary 
to change them.
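
To make the lifecycle concrete, here is a minimal Java sketch of the four rich-function methods discussed in the hunk above (a generic illustration, not code from the page being translated):

```java
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class LineLengthMapper extends RichMapFunction<String, Integer> {
    @Override
    public void open(Configuration parameters) {
        // Parameterize the function, initialize local state, or access
        // runtime information such as accumulators here
        getRuntimeContext().getIntCounter("lines");
    }

    @Override
    public Integer map(String value) {
        getRuntimeContext().getIntCounter("lines").add(1);
        return value.length();
    }

    @Override
    public void close() {
        // Release any resources acquired in open()
    }
}
```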





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot commented on pull request #13244: [FLINK-18333][jdbc] UnsignedTypeConversionITCase failed caused by MariaDB4j "Asked to waitFor Program"

2020-08-25 Thread GitBox


flinkbot commented on pull request #13244:
URL: https://github.com/apache/flink/pull/13244#issuecomment-680535784


   
   ## CI report:
   
   * f5fee133d28436ab73b03d05b1f6fadc8648f5d0 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot commented on pull request #13244: [FLINK-18333][jdbc] UnsignedTypeConversionITCase failed caused by MariaDB4j "Asked to waitFor Program"

2020-08-25 Thread GitBox


flinkbot commented on pull request #13244:
URL: https://github.com/apache/flink/pull/13244#issuecomment-680526208


   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit f5fee133d28436ab73b03d05b1f6fadc8648f5d0 (Wed Aug 26 
03:47:11 UTC 2020)
   
   **Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up 
to date!
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (FLINK-18333) UnsignedTypeConversionITCase failed caused by MariaDB4j "Asked to waitFor Program"

2020-08-25 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18333?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-18333:
---
Labels: pull-request-available test-stability  (was: test-stability)

> UnsignedTypeConversionITCase failed caused by MariaDB4j "Asked to waitFor 
> Program"
> --
>
> Key: FLINK-18333
> URL: https://issues.apache.org/jira/browse/FLINK-18333
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC, Table SQL / Ecosystem
>Affects Versions: 1.12.0
>Reporter: Jark Wu
>Assignee: Leonard Xu
>Priority: Critical
>  Labels: pull-request-available, test-stability
>
> https://dev.azure.com/rmetzger/Flink/_build/results?buildId=8173=logs=66592496-52df-56bb-d03e-37509e1d9d0f=ae0269db-6796-5583-2e5f-d84757d711aa
> {code}
> 2020-06-16T08:23:26.3013987Z [INFO] Running 
> org.apache.flink.connector.jdbc.table.JdbcDynamicTableSourceITCase
> 2020-06-16T08:23:30.2252334Z Tue Jun 16 08:23:30 UTC 2020 Thread[main,5,main] 
> java.lang.NoSuchFieldException: DEV_NULL
> 2020-06-16T08:23:31.2907920Z 
> 
> 2020-06-16T08:23:31.2913806Z Tue Jun 16 08:23:30 UTC 2020:
> 2020-06-16T08:23:31.2914839Z Booting Derby version The Apache Software 
> Foundation - Apache Derby - 10.14.2.0 - (1828579): instance 
> a816c00e-0172-bc39-e4b1-0e4ce818 
> 2020-06-16T08:23:31.2915845Z on database directory 
> memory:/__w/1/s/flink-connectors/flink-connector-jdbc/target/test with class 
> loader sun.misc.Launcher$AppClassLoader@677327b6 
> 2020-06-16T08:23:31.2916637Z Loaded from 
> file:/__w/1/.m2/repository/org/apache/derby/derby/10.14.2.0/derby-10.14.2.0.jar
> 2020-06-16T08:23:31.2916968Z java.vendor=Private Build
> 2020-06-16T08:23:31.2917461Z 
> java.runtime.version=1.8.0_242-8u242-b08-0ubuntu3~16.04-b08
> 2020-06-16T08:23:31.2922200Z 
> user.dir=/__w/1/s/flink-connectors/flink-connector-jdbc/target
> 2020-06-16T08:23:31.2922516Z os.name=Linux
> 2020-06-16T08:23:31.2922709Z os.arch=amd64
> 2020-06-16T08:23:31.2923086Z os.version=4.15.0-1083-azure
> 2020-06-16T08:23:31.2923316Z derby.system.home=null
> 2020-06-16T08:23:31.2923616Z 
> derby.stream.error.field=org.apache.flink.connector.jdbc.JdbcTestBase.DEV_NULL
> 2020-06-16T08:23:31.2924790Z Database Class Loader started - 
> derby.database.classpath=''
> 2020-06-16T08:23:37.4354243Z [INFO] Tests run: 2, Failures: 0, Errors: 0, 
> Skipped: 0, Time elapsed: 11.133 s - in 
> org.apache.flink.connector.jdbc.table.JdbcDynamicTableSourceITCase
> 2020-06-16T08:23:38.1880075Z [INFO] Running 
> org.apache.flink.connector.jdbc.table.JdbcTableSourceITCase
> 2020-06-16T08:23:41.3718038Z Tue Jun 16 08:23:41 UTC 2020 Thread[main,5,main] 
> java.lang.NoSuchFieldException: DEV_NULL
> 2020-06-16T08:23:41.4383244Z 
> 
> 2020-06-16T08:23:41.4401761Z Tue Jun 16 08:23:41 UTC 2020:
> 2020-06-16T08:23:41.4402797Z Booting Derby version The Apache Software 
> Foundation - Apache Derby - 10.14.2.0 - (1828579): instance 
> a816c00e-0172-bc3a-103b-0e4b0610 
> 2020-06-16T08:23:41.4403758Z on database directory 
> memory:/__w/1/s/flink-connectors/flink-connector-jdbc/target/test with class 
> loader sun.misc.Launcher$AppClassLoader@677327b6 
> 2020-06-16T08:23:41.4404581Z Loaded from 
> file:/__w/1/.m2/repository/org/apache/derby/derby/10.14.2.0/derby-10.14.2.0.jar
> 2020-06-16T08:23:41.4404945Z java.vendor=Private Build
> 2020-06-16T08:23:41.4405497Z 
> java.runtime.version=1.8.0_242-8u242-b08-0ubuntu3~16.04-b08
> 2020-06-16T08:23:41.4406048Z 
> user.dir=/__w/1/s/flink-connectors/flink-connector-jdbc/target
> 2020-06-16T08:23:41.4406303Z os.name=Linux
> 2020-06-16T08:23:41.4406494Z os.arch=amd64
> 2020-06-16T08:23:41.4406878Z os.version=4.15.0-1083-azure
> 2020-06-16T08:23:41.4407097Z derby.system.home=null
> 2020-06-16T08:23:41.4407415Z 
> derby.stream.error.field=org.apache.flink.connector.jdbc.JdbcTestBase.DEV_NULL
> 2020-06-16T08:23:41.5287219Z Database Class Loader started - 
> derby.database.classpath=''
> 2020-06-16T08:23:46.4567063Z [ERROR] Tests run: 1, Failures: 0, Errors: 1, 
> Skipped: 0, Time elapsed: 23.729 s <<< FAILURE! - in 
> org.apache.flink.connector.jdbc.table.UnsignedTypeConversionITCase
> 2020-06-16T08:23:46.4575785Z [ERROR] 
> org.apache.flink.connector.jdbc.table.UnsignedTypeConversionITCase  Time 
> elapsed: 23.729 s  <<< ERROR!
> 2020-06-16T08:23:46.4576490Z ch.vorburger.exec.ManagedProcessException: An 
> error occurred while running a command: create database if not exists `test`;
> 2020-06-16T08:23:46.4577193Z  at ch.vorburger.mariadb4j.DB.run(DB.java:300)
> 2020-06-16T08:23:46.4577537Z  at ch.vorburger.mariadb4j.DB.run(DB.java:265)
> 2020-06-16T08:23:46.4577861Z 

[GitHub] [flink] leonardBang opened a new pull request #13244: [FLINK-18333][jdbc] UnsignedTypeConversionITCase failed caused by MariaDB4j "Asked to waitFor Program"

2020-08-25 Thread GitBox


leonardBang opened a new pull request #13244:
URL: https://github.com/apache/flink/pull/13244


   ## What is the purpose of the change
   
   * This pull request fixes the unstable test UnsignedTypeConversionITCase.
   
   
   ## Brief change log
   
 - Avoid using MariaDB4jRule to create a DB instance, because MariaDB4jRule 
pulls up a `ManagedProcess` to run the create-database statement, which may 
exit unexpectedly.
   - Add retry logic for initializing the DB instance (see the sketch below).
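
A hedged sketch of the retry idea (the MariaDB4j calls match its public API as seen in the ticket's stack traces; the retry bounds are illustrative):

```java
import ch.vorburger.exec.ManagedProcessException;
import ch.vorburger.mariadb4j.DB;

public class RetryingDbSetup {
    static DB startDbWithRetry(int maxAttempts) throws Exception {
        for (int attempt = 1; ; attempt++) {
            try {
                DB db = DB.newEmbeddedDB(3306);
                db.start();
                db.run("create database if not exists `test`;");
                return db;
            } catch (ManagedProcessException e) {
                if (attempt >= maxAttempts) {
                    throw e; // give up after the last attempt
                }
                Thread.sleep(1000L * attempt); // back off before retrying
            }
        }
    }
}
```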
   
   ## Verifying this change
   
   This change is an unstable test fix.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): ( no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #13243: [FLINK-18992][table] Fix Table API renameColumns method annotation error

2020-08-25 Thread GitBox


flinkbot edited a comment on pull request #13243:
URL: https://github.com/apache/flink/pull/13243#issuecomment-680489296


   
   ## CI report:
   
   * 7abb7dec5769d7d9569b17de97c477c989f67ed2 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5868)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink-benchmarks] zhijiangW commented on a change in pull request #2: [FLINK-19003][checkpointing] Add micro-benchmark for unaligned checkpoints

2020-08-25 Thread GitBox


zhijiangW commented on a change in pull request #2:
URL: https://github.com/apache/flink-benchmarks/pull/2#discussion_r477009866



##
File path: 
src/main/java/org/apache/flink/benchmark/RemoteChannelThroughputBenchmark.java
##
@@ -80,6 +84,7 @@ public void remoteRebalance(FlinkEnvironmentContext context) 
throws Exception {
 StreamExecutionEnvironment env = context.env;
 env.enableCheckpointing(CHECKPOINT_INTERVAL_MS);
 env.setParallelism(PARALLELISM);
+
env.getCheckpointConfig().enableUnalignedCheckpoints(!mode.equals("AlignedCheckpoint"));

Review comment:
   Either option sounds fine to me. But where is the codespeed UI and can I 
modify it?
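
For reference, the toggle the benchmark exercises, in plain form (a minimal sketch against the Flink 1.11+ DataStream API; the interval is illustrative):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UnalignedCheckpointSetup {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(100); // checkpoint every 100 ms
        // The benchmark flips this flag based on the "mode" parameter shown in the diff
        env.getCheckpointConfig().enableUnalignedCheckpoints(true);
    }
}
```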





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #13186: [FLINK-18720][k8s] Migrate KubernetesResourceManager to the new KubernetesResourceManagerDriver

2020-08-25 Thread GitBox


flinkbot edited a comment on pull request #13186:
URL: https://github.com/apache/flink/pull/13186#issuecomment-675299227


   
   ## CI report:
   
   * 63c2f658cc309661c434451a5d8c0b38d2e748d3 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5848)
 
   * 952d92954bb418aa30820b811bc5a41f76e9ab21 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5867)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot commented on pull request #13243: [FLINK-18992][table] Fix Table API renameColumns method annotation error

2020-08-25 Thread GitBox


flinkbot commented on pull request #13243:
URL: https://github.com/apache/flink/pull/13243#issuecomment-680489296


   
   ## CI report:
   
   * 7abb7dec5769d7d9569b17de97c477c989f67ed2 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink-benchmarks] zhijiangW commented on a change in pull request #2: [FLINK-19003][checkpointing] Add micro-benchmark for unaligned checkpoints

2020-08-25 Thread GitBox


zhijiangW commented on a change in pull request #2:
URL: https://github.com/apache/flink-benchmarks/pull/2#discussion_r477008616



##
File path: 
src/main/java/org/apache/flink/benchmark/RemoteChannelThroughputBenchmark.java
##
@@ -48,6 +49,9 @@
 
 private MiniCluster miniCluster;
 
+@Param({"AlignedCheckpoint", "UnalignedCheckpoint"})

Review comment:
   Actually I tried the shorter name in an early version, but I was worried it 
might confuse external users, so I eventually changed to the semantic name. 
Anyway, I have no preference here and will change it as you suggested. 




This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot commented on pull request #13243: [FLINK-18992][table] Fix Table API renameColumns method annotation error

2020-08-25 Thread GitBox


flinkbot commented on pull request #13243:
URL: https://github.com/apache/flink/pull/13243#issuecomment-680478070


   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit 7abb7dec5769d7d9569b17de97c477c989f67ed2 (Wed Aug 26 
03:12:56 UTC 2020)
   
   **Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up 
to date!
* **This pull request references an unassigned [Jira 
ticket](https://issues.apache.org/jira/browse/FLINK-18992).** According to the 
[code contribution 
guide](https://flink.apache.org/contributing/contribute-code.html), tickets 
need to be assigned before starting with the implementation work.
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (FLINK-18992) Table API renameColumns method annotation error

2020-08-25 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18992?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-18992:
---
Labels: pull-request-available  (was: )

> Table API renameColumns method annotation error
> ---
>
> Key: FLINK-18992
> URL: https://issues.apache.org/jira/browse/FLINK-18992
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.11.1
>Reporter: tzxxh
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2020-08-19-09-07-06-405.png, 
> image-2020-08-19-09-07-28-148.png, image-2020-08-19-09-08-30-227.png
>
>
> !image-2020-08-19-09-07-28-148.png!
>  
> when I use this method
> !image-2020-08-19-09-08-30-227.png!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] Fin-chan opened a new pull request #13243: [FLINK-18992][table] Fix Table API renameColumns method annotation error

2020-08-25 Thread GitBox


Fin-chan opened a new pull request #13243:
URL: https://github.com/apache/flink/pull/13243


   Fix Table API renameColumns method annotation error
   
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluser with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (yes / no / don't 
know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-19025) table sql write orc file but hive2.1.1 can not read

2020-08-25 Thread McClone (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17184887#comment-17184887
 ] 

McClone commented on FLINK-19025:
-

It really has problems. The Flink Hive code uses some Hive classes that 
hive-exec 2.1.1 does not provide.

> table sql write orc file but hive2.1.1 can not read
> ---
>
> Key: FLINK-19025
> URL: https://issues.apache.org/jira/browse/FLINK-19025
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / ORC
>Affects Versions: 1.11.0
>Reporter: McClone
>Priority: Major
>
> Table SQL writes an ORC file, but a Hive 2.1.1 external table cannot read the 
> data, because Flink uses orc-core-1.5.6.jar while Hive 2.1.1 uses its own ORC 
> jar.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on pull request #13186: [FLINK-18720][k8s] Migrate KubernetesResourceManager to the new KubernetesResourceManagerDriver

2020-08-25 Thread GitBox


flinkbot edited a comment on pull request #13186:
URL: https://github.com/apache/flink/pull/13186#issuecomment-675299227


   
   ## CI report:
   
   * 63c2f658cc309661c434451a5d8c0b38d2e748d3 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5848)
 
   * 952d92954bb418aa30820b811bc5a41f76e9ab21 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-19025) table sql write orc file but hive2.1.1 can not read

2020-08-25 Thread McClone (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17184885#comment-17184885
 ] 

McClone commented on FLINK-19025:
-

Rui Li, now I use the Hive connector, but flink-hive needs PhysicalWriter.java, 
which hive-exec-2.1.1 cannot provide. If I import orc-file.jar, Hive cannot 
read the table correctly.

> table sql write orc file but hive2.1.1 can not read
> ---
>
> Key: FLINK-19025
> URL: https://issues.apache.org/jira/browse/FLINK-19025
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / ORC
>Affects Versions: 1.11.0
>Reporter: McClone
>Priority: Major
>
> Table SQL writes an ORC file, but a Hive 2.1.1 external table cannot read the 
> data, because Flink uses orc-core-1.5.6.jar while Hive 2.1.1 uses its own ORC 
> jar.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] xintongsong commented on pull request #13186: [FLINK-18720][k8s] Migrate KubernetesResourceManager to the new KubernetesResourceManagerDriver

2020-08-25 Thread GitBox


xintongsong commented on pull request #13186:
URL: https://github.com/apache/flink/pull/13186#issuecomment-680452263


   Thanks for the review, @tillrohrmann.
   I replied about the `flinkClientConfig`. The rest of the comments are 
addressed.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] xintongsong commented on a change in pull request #13186: [FLINK-18720][k8s] Migrate KubernetesResourceManager to the new KubernetesResourceManagerDriver

2020-08-25 Thread GitBox


xintongsong commented on a change in pull request #13186:
URL: https://github.com/apache/flink/pull/13186#discussion_r476996403



##
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java
##
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import 
org.apache.flink.kubernetes.configuration.KubernetesResourceManagerConfiguration;
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import 
org.apache.flink.kubernetes.kubeclient.factory.KubernetesTaskManagerFactory;
+import 
org.apache.flink.kubernetes.kubeclient.parameters.KubernetesTaskManagerParameters;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesWatch;
+import org.apache.flink.kubernetes.utils.KubernetesUtils;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.externalresource.ExternalResourceUtils;
+import 
org.apache.flink.runtime.resourcemanager.active.AbstractResourceManagerDriver;
+import org.apache.flink.runtime.resourcemanager.active.ResourceManagerDriver;
+import 
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Implementation of {@link ResourceManagerDriver} for Kubernetes deployment.
+ */
+public class KubernetesResourceManagerDriver extends AbstractResourceManagerDriver<KubernetesWorkerNode>
+   implements FlinkKubeClient.PodCallbackHandler {
+
+   /** The taskmanager pod name pattern is 
{clusterId}-{taskmanager}-{attemptId}-{podIndex}. */
+   private static final String TASK_MANAGER_POD_FORMAT = 
"%s-taskmanager-%d-%d";
+
+   private final String clusterId;
+
+   private final FlinkKubeClient kubeClient;
+
+   /** Request resource futures, keyed by pod names. */
+   private final Map<String, CompletableFuture<KubernetesWorkerNode>> requestResourceFutures;
+
+   /** When the ResourceManager fails over, the max attempt should be recovered. */
+   private long currentMaxAttemptId = 0;
+
+   /** Current max pod index. When creating a new pod, it should increase by one. */
+   private long currentMaxPodId = 0;
+
+   private KubernetesWatch podsWatch;
+
+   public KubernetesResourceManagerDriver(
+   Configuration flinkConfig,
+   FlinkKubeClient kubeClient,
+   KubernetesResourceManagerConfiguration configuration) {
+   super(flinkConfig, GlobalConfiguration.loadConfiguration());
+
+   this.clusterId = 
Preconditions.checkNotNull(configuration.getClusterId());
+   this.kubeClient = Preconditions.checkNotNull(kubeClient);
+   this.requestResourceFutures = new HashMap<>();
+   }
+
+   // 

+   //  ResourceManagerDriver
+   // 

+
+   @Override
+   protected void initializeInternal() throws Exception {
+   recoverWorkerNodesFromPreviousAttempts();
+
+   podsWatch = kubeClient.watchPodsAndDoCallback(
+   KubernetesUtils.getTaskManagerLabels(clusterId),
+ 

[GitHub] [flink] xintongsong commented on a change in pull request #13186: [FLINK-18720][k8s] Migrate KubernetesResourceManager to the new KubernetesResourceManagerDriver

2020-08-25 Thread GitBox


xintongsong commented on a change in pull request #13186:
URL: https://github.com/apache/flink/pull/13186#discussion_r476988674



##
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java
##
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import 
org.apache.flink.kubernetes.configuration.KubernetesResourceManagerConfiguration;
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import 
org.apache.flink.kubernetes.kubeclient.factory.KubernetesTaskManagerFactory;
+import 
org.apache.flink.kubernetes.kubeclient.parameters.KubernetesTaskManagerParameters;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesWatch;
+import org.apache.flink.kubernetes.utils.KubernetesUtils;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.externalresource.ExternalResourceUtils;
+import 
org.apache.flink.runtime.resourcemanager.active.AbstractResourceManagerDriver;
+import org.apache.flink.runtime.resourcemanager.active.ResourceManagerDriver;
+import 
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Implementation of {@link ResourceManagerDriver} for Kubernetes deployment.
+ */
+public class KubernetesResourceManagerDriver extends AbstractResourceManagerDriver<KubernetesWorkerNode>
+   implements FlinkKubeClient.PodCallbackHandler {
+
+   /** The taskmanager pod name pattern is 
{clusterId}-{taskmanager}-{attemptId}-{podIndex}. */
+   private static final String TASK_MANAGER_POD_FORMAT = 
"%s-taskmanager-%d-%d";
+
+   private final String clusterId;
+
+   private final FlinkKubeClient kubeClient;
+
+   /** Request resource futures, keyed by pod names. */
+   private final Map<String, CompletableFuture<KubernetesWorkerNode>> 
requestResourceFutures;
+
+   /** When the ResourceManager fails over, the max attempt ID should be recovered. */
+   private long currentMaxAttemptId = 0;
+
+   /** Current max pod index. When creating a new pod, it should increase 
by one. */
+   private long currentMaxPodId = 0;
+
+   private KubernetesWatch podsWatch;
+
+   public KubernetesResourceManagerDriver(
+   Configuration flinkConfig,
+   FlinkKubeClient kubeClient,
+   KubernetesResourceManagerConfiguration configuration) {
+   super(flinkConfig, GlobalConfiguration.loadConfiguration());

Review comment:
   > The problem is that it is not guaranteed at this point that you can 
read a proper Flink configuration.
   That sounds like a problem. Could you explain what might prevent reading a 
proper Flink configuration?
   
   > Moreover, you are missing all dynamically configured properties which are 
processed in the entrypoints.
   I think that is exactly what we want. Please see my other comment about 
`flinkClientConfig`.

##
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java
##
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed 

[GitHub] [flink] xintongsong commented on a change in pull request #13186: [FLINK-18720][k8s] Migrate KubernetesResourceManager to the new KubernetesResourceManagerDriver

2020-08-25 Thread GitBox


xintongsong commented on a change in pull request #13186:
URL: https://github.com/apache/flink/pull/13186#discussion_r476986171



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/AbstractResourceManagerDriver.java
##
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.active;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Abstract common base class for implementations of {@link 
ResourceManagerDriver}.
+ */
+public abstract class AbstractResourceManagerDriver<WorkerType extends ResourceIDRetrievable>
+   implements ResourceManagerDriver<WorkerType> {
+
+   protected final Logger log = LoggerFactory.getLogger(getClass());
+
+   protected final Configuration flinkConfig;
+   protected final Configuration flinkClientConfig;

Review comment:
   The background is that, RM will not send the complete configuration to 
the TMs, but only the differences compared to the original configuration 
submitted by the client.
   
   Currently, when submitting the job, the client will ship the local 
`flink-conf.yaml` file. This file is available to both the JM and the TMs, 
through the Yarn Shared Cache and a Kubernetes ConfigMap.
   
   The TMs should not use that original `flink-conf.yaml` directly, because the 
configuration might be overwritten on the JM side (e.g., dynamic properties, JM 
address, TM resource, etc.). Therefore, the RM will compare the effective 
configuration (`flinkConfig`) with the original one shipped from the client 
side (`flinkClientConfig`), and send the differences to the TMs as dynamic 
properties, which overwrite the original configuration on the TM side.
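   For illustration, a minimal sketch of that diffing step (assuming flat 
string-keyed configs; the class and method names here are illustrative, not 
Flink's actual implementation):
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   
   // Sketch: compute the dynamic properties to ship to TMs as the difference
   // between the effective RM-side config and the client-shipped config.
   public class ConfigDiffSketch {
   
       public static Map<String, String> dynamicProperties(
               Map<String, String> clientConfig,
               Map<String, String> effectiveConfig) {
           Map<String, String> diff = new HashMap<>();
           for (Map.Entry<String, String> entry : effectiveConfig.entrySet()) {
               // ship only entries that were added or overwritten on the JM/RM side
               if (!entry.getValue().equals(clientConfig.get(entry.getKey()))) {
                   diff.put(entry.getKey(), entry.getValue());
               }
           }
           return diff;
       }
   }
   ```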





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] xintongsong commented on a change in pull request #13186: [FLINK-18720][k8s] Migrate KubernetesResourceManager to the new KubernetesResourceManagerDriver

2020-08-25 Thread GitBox


xintongsong commented on a change in pull request #13186:
URL: https://github.com/apache/flink/pull/13186#discussion_r476975877



##
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java
##
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import 
org.apache.flink.kubernetes.configuration.KubernetesResourceManagerConfiguration;
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import 
org.apache.flink.kubernetes.kubeclient.factory.KubernetesTaskManagerFactory;
+import 
org.apache.flink.kubernetes.kubeclient.parameters.KubernetesTaskManagerParameters;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesWatch;
+import org.apache.flink.kubernetes.utils.KubernetesUtils;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.externalresource.ExternalResourceUtils;
+import 
org.apache.flink.runtime.resourcemanager.active.AbstractResourceManagerDriver;
+import org.apache.flink.runtime.resourcemanager.active.ResourceManagerDriver;
+import 
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Implementation of {@link ResourceManagerDriver} for Kubernetes deployment.
+ */
+public class KubernetesResourceManagerDriver extends 
AbstractResourceManagerDriver<KubernetesWorkerNode>
+   implements FlinkKubeClient.PodCallbackHandler {
+
+   /** The taskmanager pod name pattern is 
{clusterId}-{taskmanager}-{attemptId}-{podIndex}. */
+   private static final String TASK_MANAGER_POD_FORMAT = 
"%s-taskmanager-%d-%d";
+
+   private final String clusterId;
+
+   private final FlinkKubeClient kubeClient;
+
+   /** Request resource futures, keyed by pod names. */
+   private final Map<String, CompletableFuture<KubernetesWorkerNode>> 
requestResourceFutures;
+
+   /** When the ResourceManager fails over, the max attempt ID should be recovered. */
+   private long currentMaxAttemptId = 0;
+
+   /** Current max pod index. When creating a new pod, it should increase 
by one. */
+   private long currentMaxPodId = 0;
+
+   private KubernetesWatch podsWatch;
+
+   public KubernetesResourceManagerDriver(
+   Configuration flinkConfig,
+   FlinkKubeClient kubeClient,
+   KubernetesResourceManagerConfiguration configuration) {
+   super(flinkConfig, GlobalConfiguration.loadConfiguration());
+
+   this.clusterId = 
Preconditions.checkNotNull(configuration.getClusterId());
+   this.kubeClient = Preconditions.checkNotNull(kubeClient);
+   this.requestResourceFutures = new HashMap<>();
+   }
+
+   // 

+   //  ResourceManagerDriver
+   // 

+
+   @Override
+   protected void initializeInternal() throws Exception {
+   recoverWorkerNodesFromPreviousAttempts();
+
+   podsWatch = kubeClient.watchPodsAndDoCallback(
+   KubernetesUtils.getTaskManagerLabels(clusterId),
+ 

[GitHub] [flink] xintongsong commented on a change in pull request #13186: [FLINK-18720][k8s] Migrate KubernetesResourceManager to the new KubernetesResourceManagerDriver

2020-08-25 Thread GitBox


xintongsong commented on a change in pull request #13186:
URL: https://github.com/apache/flink/pull/13186#discussion_r476973166



##
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java
##
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import 
org.apache.flink.kubernetes.configuration.KubernetesResourceManagerDriverConfiguration;
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import 
org.apache.flink.kubernetes.kubeclient.factory.KubernetesTaskManagerFactory;
+import 
org.apache.flink.kubernetes.kubeclient.parameters.KubernetesTaskManagerParameters;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesWatch;
+import org.apache.flink.kubernetes.utils.KubernetesUtils;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.externalresource.ExternalResourceUtils;
+import 
org.apache.flink.runtime.resourcemanager.active.AbstractResourceManagerDriver;
+import org.apache.flink.runtime.resourcemanager.active.ResourceManagerDriver;
+import 
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Implementation of {@link ResourceManagerDriver} for Kubernetes deployment.
+ */
+public class KubernetesResourceManagerDriver extends 
AbstractResourceManagerDriver<KubernetesWorkerNode>
+   implements FlinkKubeClient.PodCallbackHandler {
+
+   /** The taskmanager pod name pattern is 
{clusterId}-{taskmanager}-{attemptId}-{podIndex}. */
+   private static final String TASK_MANAGER_POD_FORMAT = 
"%s-taskmanager-%d-%d";
+
+   private final String clusterId;
+
+   private final Time podCreationRetryInterval;
+
+   private final FlinkKubeClient kubeClient;
+
+   /** Request resource futures, keyed by pod names. */
+   private final Map<String, CompletableFuture<KubernetesWorkerNode>> 
requestResourceFutures;
+
+   /** When the ResourceManager fails over, the max attempt ID should be recovered. */
+   private long currentMaxAttemptId = 0;
+
+   /** Current max pod index. When creating a new pod, it should increase 
by one. */
+   private long currentMaxPodId = 0;
+
+   private KubernetesWatch podsWatch;
+
+   /**
+* Incompletion of this future indicates that there was a pod creation 
failure recently and the driver should not
+* retry creating pods until the future becomes completed again. It is 
guaranteed to be modified in the main thread.
+*/
+   private CompletableFuture<Void> podCreationCoolDown;
+
+   public KubernetesResourceManagerDriver(
+   Configuration flinkConfig,
+   FlinkKubeClient kubeClient,
+   KubernetesResourceManagerDriverConfiguration 
configuration) {
+   super(flinkConfig, GlobalConfiguration.loadConfiguration());
+
+   this.clusterId = 
Preconditions.checkNotNull(configuration.getClusterId());
+   this.podCreationRetryInterval = 
Preconditions.checkNotNull(configuration.getPodCreationRetryInterval());
+   this.kubeClient = Preconditions.checkNotNull(kubeClient);
+   

[jira] [Closed] (FLINK-19041) Add dependency management for ConnectedStream in Python DataStream API.

2020-08-25 Thread Hequn Cheng (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19041?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hequn Cheng closed FLINK-19041.
---
Resolution: Resolved

> Add dependency management for ConnectedStream in Python DataStream API.
> ---
>
> Key: FLINK-19041
> URL: https://issues.apache.org/jira/browse/FLINK-19041
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.12.0
>Reporter: Shuiqiang Chen
>Assignee: Shuiqiang Chen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.12.0
>
>
> We failed to set merged configurations into 
> DataStreamTwoInputPythonStatelessFunctionOperator when finally generating the 
> StreamGraph.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19041) Add dependency management for ConnectedStream in Python DataStream API.

2020-08-25 Thread Hequn Cheng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17184825#comment-17184825
 ] 

Hequn Cheng commented on FLINK-19041:
-

Resolved in 1.12.0 via 66797ac6352c38c98e310fd2b34aa39088a6eacb

> Add dependency management for ConnectedStream in Python DataStream API.
> ---
>
> Key: FLINK-19041
> URL: https://issues.apache.org/jira/browse/FLINK-19041
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.12.0
>Reporter: Shuiqiang Chen
>Assignee: Shuiqiang Chen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.12.0
>
>
> We failed to set merged configurations into 
> DataStreamTwoInputPythonStatelessFunctionOperator when finally generating the 
> StreamGraph.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] hequn8128 merged pull request #13236: [FLINK-19041][python] Add dependency management for ConnectedStream i…

2020-08-25 Thread GitBox


hequn8128 merged pull request #13236:
URL: https://github.com/apache/flink/pull/13236


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] hequn8128 commented on pull request #13236: [FLINK-19041][python] Add dependency management for ConnectedStream i…

2020-08-25 Thread GitBox


hequn8128 commented on pull request #13236:
URL: https://github.com/apache/flink/pull/13236#issuecomment-680393618


   @shuiqiangchen Thanks a lot for fixing the bug. LGTM.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] zentol commented on a change in pull request #13163: [FLINK-16789][runtime] Support JMX RMI random port assign

2020-08-25 Thread GitBox


zentol commented on a change in pull request #13163:
URL: https://github.com/apache/flink/pull/13163#discussion_r476790261



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/management/JMXService.java
##
@@ -85,6 +86,9 @@ private static JMXServer 
startJMXServerWithPortRanges(Iterator<Integer> ports) {
while (ports.hasNext() && successfullyStartedServer == null) {
JMXServer server = new JMXServer();
int port = ports.next();
+   if (port == 0) { // try poke with a random port when 
port is set to zero

Review comment:
   @walterddr Yes, I think we can close the PR. It is admittedly slightly 
less convenient to explicitly define a port range, but the users will also get 
more predictable behavior, and we don't have to deal with the added complexity.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] zentol commented on a change in pull request #13163: [FLINK-16789][runtime] Support JMX RMI random port assign

2020-08-25 Thread GitBox


zentol commented on a change in pull request #13163:
URL: https://github.com/apache/flink/pull/13163#discussion_r476645574



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/management/JMXService.java
##
@@ -85,6 +86,9 @@ private static JMXServer 
startJMXServerWithPortRanges(Iterator<Integer> ports) {
while (ports.hasNext() && successfullyStartedServer == null) {
JMXServer server = new JMXServer();
int port = ports.next();
+   if (port == 0) { // try poke with a random port when 
port is set to zero

Review comment:
   > it would be better to do this conversion in the realm of the JMXServer
   
   Agreed.
   
   > Do we know that the startup period will be super long if we don't do this?
   
   It probably isn't long. In the port-probing version, you are essentially 
entering a race condition between all processes on this node. I would consider 
it unlikely that you will have more than a hundred TaskExecutors on a given node, 
and only at that point should it be relevant in the first place.
   I only suggested the special iterator to work around the worst-case scenario, 
something like the first 1000 ports already being allocated, where the current 
approach would likely fare miserably.
   But there are clear downsides to having this applied in general to all users 
of the NetUtils; it is _very_ convenient to define a port range and be pretty 
much guaranteed that all processes start at the front and form a continuous 
port sequence if present on a single node.
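   For illustration, a minimal sketch of the probing loop (assumptions mine; 
not the actual NetUtils code). Each process walks the range from the front, and 
the OS-level bind is what resolves the race:
   
   ```java
   import java.io.IOException;
   import java.net.ServerSocket;
   import java.util.Iterator;
   import java.util.stream.IntStream;
   
   public class PortProbeSketch {
   
       // Returns a socket bound to the first free port in [from, to].
       public static ServerSocket bindFirstFreePort(int from, int to) throws IOException {
           Iterator<Integer> ports = IntStream.rangeClosed(from, to).iterator();
           while (ports.hasNext()) {
               int port = ports.next();
               try {
                   return new ServerSocket(port); // only one process can win this bind
               } catch (IOException e) {
                   // port taken, possibly by a concurrent process; try the next one
               }
           }
           throw new IOException("No free port in range " + from + "-" + to);
       }
   }
   ```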





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] walterddr commented on a change in pull request #13163: [FLINK-16789][runtime] Support JMX RMI random port assign

2020-08-25 Thread GitBox


walterddr commented on a change in pull request #13163:
URL: https://github.com/apache/flink/pull/13163#discussion_r476778503



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/management/JMXService.java
##
@@ -85,6 +86,9 @@ private static JMXServer 
startJMXServerWithPortRanges(Iterator<Integer> ports) {
while (ports.hasNext() && successfullyStartedServer == null) {
JMXServer server = new JMXServer();
int port = ports.next();
+   if (port == 0) { // try poke with a random port when 
port is set to zero

Review comment:
   Actually, with some consideration, I don't think this PR is even necessary 
from a realistic standpoint - setting the port to `"0"` vs setting it to some 
port range is pretty much equivalent from a platform job-management standpoint. 
   
   WRT the random port assignment, yeah, I am not sure that's actually needed - 
we didn't do any profiling. 
   I intend to close this PR and comment on the JIRA ticket if we all agree. 
(thank you so much for chiming in and sharing your thoughts @tillrohrmann 
@zentol :-) )





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (FLINK-18192) Upgrade to Avro version 1.9.2 from 1.8.2

2020-08-25 Thread Chesnay Schepler (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17183877#comment-17183877
 ] 

Chesnay Schepler edited comment on FLINK-18192 at 8/25/20, 9:31 PM:


[~twalthr] Could you find someone to gauge how difficult this would be?

I'm interested in this because of CVE-2019-10172 transitively affecting Avro 
1.8.X. I know that we expect users to provide whatever version they wish (and 
in fact, if they can just use 1.9, it may not be a problem), but we should think 
about how/when to upgrade Avro.


was (Author: zentol):
[~twalthr] Could you find someone to gauge how difficult this would be?

> Upgrade to Avro version 1.9.2 from 1.8.2
> 
>
> Key: FLINK-18192
> URL: https://issues.apache.org/jira/browse/FLINK-18192
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System, Formats (JSON, Avro, Parquet, ORC, 
> SequenceFile)
>Reporter: Lucas Heimberg
>Priority: Major
> Fix For: 1.12.0
>
>
> As of version 1.11, Flink (i.e., flink-avro) still uses Avro in version 1.8.2.
> Avro 1.9.2 contains many bugfixes, in particular in respect to the support 
> for logical types. A further advantage would be that an upgrade to Avro 1.9.2 
> would also allow to use the Confluent Schema Registry client and Avro 
> deserializer in version 5.5.0, which finally support schema references.
> Therefore it would be great if Flink could make use of Avro 1.9.2 or higher 
> in future releases.
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-18546) Upgrade to Kafka Schema Registry Client 5.5.0

2020-08-25 Thread Chesnay Schepler (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17184742#comment-17184742
 ] 

Chesnay Schepler commented on FLINK-18546:
--

How would this work in terms of backwards compatibility?

> Upgrade to Kafka Schema Registry Client 5.5.0
> -
>
> Key: FLINK-18546
> URL: https://issues.apache.org/jira/browse/FLINK-18546
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.11.0
>Reporter: Lucas Heimberg
>Priority: Major
>  Labels: Avro, avro
>
> As of version 1.11, Flink (i.e., flink-avro-confluent-registry) still uses 
> kafka-schema-registry-client in version 4.1.0.
> From version 5.5.0 on, the Kafka schema registry client supports resolving of 
> schema references (see 
> [https://docs.confluent.io/current/schema-registry/serdes-develop/index.html#referenced-schemas]),
>  i.e., schemas that are defined using schemas from other subjects. 
> Therefore it would be great if Flink could make use of 
> kafka-schema-registry-client in version 5.5.0 or higher in future releases.
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on pull request #13242: [FLINK-19009][metrics] Fixed the downtime metric issue and updated the comment

2020-08-25 Thread GitBox


flinkbot edited a comment on pull request #13242:
URL: https://github.com/apache/flink/pull/13242#issuecomment-680108384


   
   ## CI report:
   
   * fc6dc041ef06000c57767f9665c34ecc70cff6da Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5863)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] AHeise commented on a change in pull request #13228: [FLINK-19026][network] Improve threading model of CheckpointBarrierUnaligner

2020-08-25 Thread GitBox


AHeise commented on a change in pull request #13228:
URL: https://github.com/apache/flink/pull/13228#discussion_r476683415



##
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
##
@@ -92,306 +90,173 @@
super(toNotifyOnCheckpoint);
 
this.taskName = taskName;
-   hasInflightBuffers = Arrays.stream(inputGates)
+   this.inputGates = inputGates;
+   storeNewBuffers = Arrays.stream(inputGates)
.flatMap(gate -> gate.getChannelInfos().stream())
.collect(Collectors.toMap(Function.identity(), info -> 
false));
-   threadSafeUnaligner = new 
ThreadSafeUnaligner(checkNotNull(checkpointCoordinator), this, inputGates);
+   numOpenChannels = storeNewBuffers.size();
+   this.checkpointCoordinator = checkpointCoordinator;
}
 
-   /**
-* We still need to trigger checkpoint via {@link 
ThreadSafeUnaligner#notifyBarrierReceived(CheckpointBarrier, InputChannelInfo)}
-* while reading the first barrier from one channel, because this might 
happen
-* earlier than the previous async trigger via mailbox by netty thread.
-*
-* Note this is also suitable for the trigger case of local input 
channel.
-*/
@Override
-   public void processBarrier(CheckpointBarrier receivedBarrier, 
InputChannelInfo channelInfo) throws IOException {
-   long barrierId = receivedBarrier.getId();
-   if (currentConsumedCheckpointId > barrierId || 
(currentConsumedCheckpointId == barrierId && !isCheckpointPending())) {
+   public void processBarrier(CheckpointBarrier barrier, InputChannelInfo 
channelInfo) throws IOException {
+   long barrierId = barrier.getId();
+   if (currentCheckpointId > barrierId || (currentCheckpointId == 
barrierId && !isCheckpointPending())) {
// ignore old and cancelled barriers
return;
}
-   if (currentConsumedCheckpointId < barrierId) {
-   currentConsumedCheckpointId = barrierId;
-   numBarrierConsumed = 0;
-   hasInflightBuffers.entrySet().forEach(hasInflightBuffer 
-> hasInflightBuffer.setValue(true));
+   if (currentCheckpointId < barrierId) {
+   handleNewCheckpoint(barrier);
+   notifyCheckpoint(barrier, 0);
}
-   if (currentConsumedCheckpointId == barrierId) {
-   hasInflightBuffers.put(channelInfo, false);
-   numBarrierConsumed++;
+   if (currentCheckpointId == barrierId) {
+   if (storeNewBuffers.put(channelInfo, false)) {
+   LOG.debug("{}: Received barrier from channel {} 
@ {}.", taskName, channelInfo, barrierId);
+
+   
inputGates[channelInfo.getGateIdx()].getChannel(channelInfo.getInputChannelIdx())
+   .spillInflightBuffers(barrierId, 
checkpointCoordinator.getChannelStateWriter());
+
+   if (++numBarriersReceived == numOpenChannels) {
+   
allBarriersReceivedFuture.complete(null);
+   }
+   }
}
-   threadSafeUnaligner.notifyBarrierReceived(receivedBarrier, 
channelInfo);
}
 
@Override
public void abortPendingCheckpoint(long checkpointId, 
CheckpointException exception) throws IOException {
-   threadSafeUnaligner.tryAbortPendingCheckpoint(checkpointId, 
exception);
+   tryAbortPendingCheckpoint(checkpointId, exception);
 
-   if (checkpointId > currentConsumedCheckpointId) {
-   resetPendingCheckpoint(checkpointId);
+   if (checkpointId > currentCheckpointId) {
+   resetPendingCheckpoint();
}
}
 
@Override
public void processCancellationBarrier(CancelCheckpointMarker 
cancelBarrier) throws IOException {
final long cancelledId = cancelBarrier.getCheckpointId();
-   boolean shouldAbort = 
threadSafeUnaligner.setCancelledCheckpointId(cancelledId);
+   boolean shouldAbort = setCancelledCheckpointId(cancelledId);
if (shouldAbort) {
notifyAbort(
cancelledId,
new 
CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER));
}
 
-   if (cancelledId >= currentConsumedCheckpointId) {
-   resetPendingCheckpoint(cancelledId);
-   currentConsumedCheckpointId = 

[GitHub] [flink] AHeise commented on a change in pull request #13228: [FLINK-19026][network] Improve threading model of CheckpointBarrierUnaligner

2020-08-25 Thread GitBox


AHeise commented on a change in pull request #13228:
URL: https://github.com/apache/flink/pull/13228#discussion_r476678903



##
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java
##
@@ -159,10 +158,12 @@ public InputStatus emitNext(DataOutput output) throws 
Exception {
if (bufferOrEvent.isPresent()) {
// return to the mailbox after receiving a 
checkpoint barrier to avoid processing of
// data after the barrier before checkpoint is 
performed for unaligned checkpoint mode
-   if (bufferOrEvent.get().isEvent() && 
bufferOrEvent.get().getEvent() instanceof CheckpointBarrier) {
+   if (bufferOrEvent.get().isBuffer()) {
+   processBuffer(bufferOrEvent.get());
+   } else {
+   processEvent(bufferOrEvent.get());

Review comment:
   Nope, it's a refactoring that I should move out. We could also revert 
back. I had an intermediate version where `processEvent` signaled whether the 
loop should be broken or not, and there the split made more sense.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] morsapaes commented on a change in pull request #13203: [FLINK-18984][python][docs] Add tutorial documentation for Python DataStream API

2020-08-25 Thread GitBox


morsapaes commented on a change in pull request #13203:
URL: https://github.com/apache/flink/pull/13203#discussion_r476650361



##
File path: docs/dev/python/getting-started/tutorial/datastream_tutorial.md
##
@@ -0,0 +1,126 @@
+---
+title: "Python DataStream API Tutorial"
+nav-parent_id: python_tutorial
+nav-pos: 30
+---
+
+
+This walkthrough will quickly get you started building a pure Python Flink 
DataStream project.
+
+Please refer to the PyFlink [installation guide]({{ site.baseurl 
}}/dev/python/getting-started/installation.html) on how to set up the Python 
execution environments.
+
+* This will be replaced by the TOC
+{:toc}
+
+## Setting up a Python Project
+
+You can begin by creating a Python project and installing the PyFlink package 
following the [installation guide]({{ site.baseurl 
}}/dev/python/getting-started/installation.html#installation-of-pyflink).
+
+## Writing a Flink Python DataStream API Program
+
+DataStream API applications begin by declaring a `StreamExecutionEnvironment`.
+This is the context in which a streaming program is executed.
+It can be used for setting execution parameters such as restart strategy, 
default parallelism, etc.
+
+{% highlight python %}
+env = StreamExecutionEnvironment.get_execution_environment()
+env.set_parallelism(1)
+{% endhighlight %}
+
+Once a `StreamExecutionEnvironment` created, you can declare your source with 
it.
+
+{% highlight python %}
+ds = env.from_collection(
+collection=[(1, 'aaa'), (2, 'bbb')],
+type_info=Types.ROW([Types.INT(), Types.STRING()]))
+{% endhighlight %}
+
+This creates a data stream from the given collection. The type is that of the 
elements in the collection. In this example, the type is a Row type with two 
fields. The type of the first field is integer type while the second is string 
type.
+
+You can now perform transformations on the datastream or writes the data into 
external system with sink.

Review comment:
   ```suggestion
   You can now perform transformations on this data stream, or just write the 
data to an external system using a _sink_. This walkthrough uses the 
`StreamingFileSink` sink connector to write the data into a file in the 
`/tmp/output` directory.
   ```

##
File path: docs/dev/python/getting-started/tutorial/datastream_tutorial.md
##
@@ -0,0 +1,126 @@
+---
+title: "Python DataStream API Tutorial"
+nav-parent_id: python_tutorial
+nav-pos: 30
+---
+
+
+This walkthrough will quickly get you started building a pure Python Flink 
DataStream project.
+
+Please refer to the PyFlink [installation guide]({{ site.baseurl 
}}/dev/python/getting-started/installation.html) on how to set up the Python 
execution environments.

Review comment:
   Installation with `pip` is pretty straightforward, so why not just add 
this to the tutorial instead of making the user go to a different page?
   
   If we are restructuring these anyway, I'd suggest following the same 
structure as the existing tutorials: 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/try-flink/datastream_api.html

##
File path: docs/dev/python/getting-started/tutorial/datastream_tutorial.md
##
@@ -0,0 +1,126 @@
+---
+title: "Python DataStream API Tutorial"
+nav-parent_id: python_tutorial
+nav-pos: 30
+---
+
+
+This walkthrough will quickly get you started building a pure Python Flink 
DataStream project.
+
+Please refer to the PyFlink [installation guide]({{ site.baseurl 
}}/dev/python/getting-started/installation.html) on how to set up the Python 
execution environments.
+
+* This will be replaced by the TOC
+{:toc}
+
+## Setting up a Python Project
+
+You can begin by creating a Python project and installing the PyFlink package 
following the [installation guide]({{ site.baseurl 
}}/dev/python/getting-started/installation.html#installation-of-pyflink).
+
+## Writing a Flink Python DataStream API Program
+
+DataStream API applications begin by declaring a `StreamExecutionEnvironment`.
+This is the context in which a streaming program is executed.
+It can be used for setting execution parameters such as restart strategy, 
default parallelism, etc.
+
+{% highlight python %}
+env = StreamExecutionEnvironment.get_execution_environment()
+env.set_parallelism(1)
+{% endhighlight %}
+
+Once a `StreamExecutionEnvironment` created, you can declare your source with 
it.
+
+{% highlight python %}
+ds = env.from_collection(
+collection=[(1, 'aaa'), (2, 'bbb')],
+type_info=Types.ROW([Types.INT(), Types.STRING()]))
+{% endhighlight %}
+
+This creates a data stream from the given collection. The type is that of the 
elements in the collection. In this example, the type is a Row type with two 
fields. The type of the first field is integer type while the second is string 
type.
+
+You can now perform transformations on the datastream or writes the data into 
external system with sink.
+
+{% highlight python %}
+ds.add_sink(StreamingFileSink
+

[GitHub] [flink] AHeise commented on a change in pull request #13228: [FLINK-19026][network] Improve threading model of CheckpointBarrierUnaligner

2020-08-25 Thread GitBox


AHeise commented on a change in pull request #13228:
URL: https://github.com/apache/flink/pull/13228#discussion_r476677925



##
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java
##
@@ -74,34 +106,34 @@ public CheckpointedInputGate(
}
 
@Override
-   public Optional<BufferOrEvent> pollNext() throws Exception {
-   while (true) {
-   Optional<BufferOrEvent> next = inputGate.pollNext();
+   public Optional<BufferOrEvent> pollNext() throws IOException, 
InterruptedException {
+   Optional<BufferOrEvent> next = inputGate.pollNext();
-   if (!next.isPresent()) {
-   return handleEmptyBuffer();
-   }
+   if (!next.isPresent()) {
+   return handleEmptyBuffer();
+   }
 
-   BufferOrEvent bufferOrEvent = next.get();
-   
checkState(!barrierHandler.isBlocked(bufferOrEvent.getChannelInfo()));
+   BufferOrEvent bufferOrEvent = next.get();
+   
checkState(!barrierHandler.isBlocked(bufferOrEvent.getChannelInfo()));
 
-   if (bufferOrEvent.isBuffer()) {
-   return next;
-   }
-   else if (bufferOrEvent.getEvent().getClass() == 
CheckpointBarrier.class) {
-   CheckpointBarrier checkpointBarrier = 
(CheckpointBarrier) bufferOrEvent.getEvent();
-   
barrierHandler.processBarrier(checkpointBarrier, 
bufferOrEvent.getChannelInfo());
-   return next;
-   }
-   else if (bufferOrEvent.getEvent().getClass() == 
CancelCheckpointMarker.class) {
-   
barrierHandler.processCancellationBarrier((CancelCheckpointMarker) 
bufferOrEvent.getEvent());
-   }
-   else {
-   if (bufferOrEvent.getEvent().getClass() == 
EndOfPartitionEvent.class) {
-   barrierHandler.processEndOfPartition();
-   }
-   return next;
-   }
+   if (bufferOrEvent.isEvent()) {
+   handleEvent(bufferOrEvent);
+   } else {
+   barrierHandler.processBuffer(bufferOrEvent.getBuffer(), 
bufferOrEvent.getChannelInfo());

Review comment:
   Yes, it's used in the next commit to persist in-flight data (replaces 
`notifyBufferReceived`).





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] AHeise commented on a change in pull request #13228: [FLINK-19026][network] Improve threading model of CheckpointBarrierUnaligner

2020-08-25 Thread GitBox


AHeise commented on a change in pull request #13228:
URL: https://github.com/apache/flink/pull/13228#discussion_r476674004



##
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java
##
@@ -74,34 +106,34 @@ public CheckpointedInputGate(
}
 
@Override
-   public Optional<BufferOrEvent> pollNext() throws Exception {
-   while (true) {

Review comment:
   Yes, it's changing semantics (as I had written in the commit message). I 
have not found a good reason why it's not always exited, and it makes things 
easier, especially since this method can now be used to process priority events.
   Btw, I think it also changes semantics for all events that are not handled at 
all, but I'm not sure which events survive at this point (Superstep?).





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] AHeise commented on a change in pull request #13228: [FLINK-19026][network] Improve threading model of CheckpointBarrierUnaligner

2020-08-25 Thread GitBox


AHeise commented on a change in pull request #13228:
URL: https://github.com/apache/flink/pull/13228#discussion_r476675096



##
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java
##
@@ -63,9 +66,38 @@
 */
public CheckpointedInputGate(
InputGate inputGate,
-   CheckpointBarrierHandler barrierHandler) {
+   CheckpointBarrierHandler barrierHandler,
+   MailboxExecutor mailboxExecutor) {
this.inputGate = inputGate;
this.barrierHandler = barrierHandler;
+   this.mailboxExecutor = mailboxExecutor;
+
+   waitForPriorityEvents(inputGate, mailboxExecutor);
+   }
+
+   /**
+* Eagerly pulls and processes all priority events. Must be called from 
task thread.
+*
+* Basic assumption is that no priority event needs to be handled by 
the {@link StreamTaskNetworkInput}.
+*/
+   private void processPriorityEvents() throws IOException, 
InterruptedException {
+   // check if the priority event is still not processed (could 
have been pulled before mail was being executed)
+   final boolean hasPriorityEvents = 
inputGate.getPriorityEventAvailableFuture().isDone();
+   if (hasPriorityEvents) {
+   // process as many priority events as possible
+   while 
(pollNext().map(BufferOrEvent::morePriorityEvents).orElse(false)) {
+   }

Review comment:
   Yes, first this method checks if there is at least one priority event 
(priority future completed). If there is at least one, it starts processing the 
first one. At this point, it relies on `BufferOrEvent::morePriorityEvents` to 
be correct in both directions (no false positives or negatives; although a 
false negative would just be a tad slower).





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] AHeise commented on a change in pull request #13228: [FLINK-19026][network] Improve threading model of CheckpointBarrierUnaligner

2020-08-25 Thread GitBox


AHeise commented on a change in pull request #13228:
URL: https://github.com/apache/flink/pull/13228#discussion_r476673355



##
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java
##
@@ -63,9 +66,38 @@
 */
public CheckpointedInputGate(
InputGate inputGate,
-   CheckpointBarrierHandler barrierHandler) {
+   CheckpointBarrierHandler barrierHandler,
+   MailboxExecutor mailboxExecutor) {
this.inputGate = inputGate;
this.barrierHandler = barrierHandler;
+   this.mailboxExecutor = mailboxExecutor;
+
+   waitForPriorityEvents(inputGate, mailboxExecutor);
+   }
+
+   /**
+* Eagerly pulls and processes all priority events. Must be called from 
task thread.
+*
+* Basic assumption is that no priority event needs to be handled by 
the {@link StreamTaskNetworkInput}.
+*/
+   private void processPriorityEvents() throws IOException, 
InterruptedException {
+   // check if the priority event is still not processed (could 
have been pulled before mail was being executed)
+   final boolean hasPriorityEvents = 
inputGate.getPriorityEventAvailableFuture().isDone();
+   if (hasPriorityEvents) {
+   // process as many priority events as possible
+   while 
(pollNext().map(BufferOrEvent::morePriorityEvents).orElse(false)) {
+   }
+   }
+
+   // re-enqueue mail to process priority events
+   waitForPriorityEvents(inputGate, mailboxExecutor);
+   }
+
+   private void waitForPriorityEvents(InputGate inputGate, MailboxExecutor 
mailboxExecutor) {
+   final CompletableFuture<?> priorityEventAvailableFuture = 
inputGate.getPriorityEventAvailableFuture();
+   priorityEventAvailableFuture.thenRun(() -> {
+   mailboxExecutor.execute(this::processPriorityEvents, 
"process priority even @ gate %s", inputGate);
+   });

Review comment:
   1. Nope, this assumption does not hold. That's why the first thing that 
`processPriorityEvents` does is to check if the future is still completed. If 
the task polled the only priority event in the meantime, the future has been 
reset. During the execution of `processPriorityEvents` in the task thread, the 
task cannot concurrently pull the priority event, so this is safe.
   2.+3. The basic idea of not involving `StreamTaskNetworkInput#emitNext` or 
using `pollNext()` is to not make non-blocking output more complicated. 
Currently, `emitNext` or `pollNext` are only called when an output buffer is 
available. In the meantime, only mails are processed. Hence, I used a mail to 
perform `processPriorityEvents`.
   Note that the assumption here is that no priority event ever needs to be 
handled in `emitNext` (which currently only handles `EndOfPartitionEvent`).





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] AHeise commented on a change in pull request #13228: [FLINK-19026][network] Improve threading model of CheckpointBarrierUnaligner

2020-08-25 Thread GitBox


AHeise commented on a change in pull request #13228:
URL: https://github.com/apache/flink/pull/13228#discussion_r476665640



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##
@@ -195,28 +193,27 @@ void retriggerSubpartitionRequest(int subpartitionIndex) 
throws IOException {
}
 
@Override
-   public void spillInflightBuffers(long checkpointId, ChannelStateWriter 
channelStateWriter) throws IOException {
+   public void spillInflightBuffers(long checkpointId, ChannelStateWriter 
channelStateWriter) {
synchronized (receivedBuffers) {
-   checkState(checkpointId > lastRequestedCheckpointId, 
"Need to request the next checkpointId");
-
-   final List inflightBuffers = new 
ArrayList<>(receivedBuffers.size());
-   for (Buffer buffer : receivedBuffers) {
-   CheckpointBarrier checkpointBarrier = 
parseCheckpointBarrierOrNull(buffer);
-   if (checkpointBarrier != null && 
checkpointBarrier.getId() >= checkpointId) {
-   break;
+   final Integer numRecords = 
numRecordsOvertaken.remove(checkpointId);

Review comment:
   Good catch, a leak could happen when the checkpoint is cancelled through 
another channel. The map itself is rather small, but it could add up over all 
channels and gates.
   I don't have a good idea of how to properly abstract this cleanup, except by 
adding some kind of checkpoint-cancelled hook. 
   Alternatively, the checkpoint barrier handler could become more aware of the 
buffers to be spilled. So instead of calling `channel.spillInflightBuffers`, it 
could be `channel.getSpilledBuffers().forEach(channelStateWriter::write)` on a 
good checkpoint and `channel.getSpilledBuffers().forEach(Buffer::recycle)` on 
cancelled checkpoints, where `getSpilledBuffers` always cleans up this map.
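   A self-contained sketch of that drain-based shape (the types and method 
names are hypothetical, not Flink's actual API):
   
   ```java
   import java.util.ArrayDeque;
   import java.util.Deque;
   import java.util.function.Consumer;
   
   class SpilledBuffersSketch {
   
       interface Buffer {
           void recycle();
       }
   
       static class Channel {
           private final Deque<Buffer> spilledForCheckpoint = new ArrayDeque<>();
   
           // Removes and returns all recorded buffers, so the per-checkpoint
           // bookkeeping is cleared on every path and cannot leak.
           Deque<Buffer> takeSpilledBuffers() {
               Deque<Buffer> drained = new ArrayDeque<>(spilledForCheckpoint);
               spilledForCheckpoint.clear();
               return drained;
           }
       }
   
       static void onCheckpointDecision(Channel channel, boolean success, Consumer<Buffer> writer) {
           Deque<Buffer> spilled = channel.takeSpilledBuffers();
           if (success) {
               spilled.forEach(writer);          // persist in-flight data
           } else {
               spilled.forEach(Buffer::recycle); // cancelled checkpoint: release memory
           }
       }
   }
   ```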





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] AHeise commented on a change in pull request #13228: [FLINK-19026][network] Improve threading model of CheckpointBarrierUnaligner

2020-08-25 Thread GitBox


AHeise commented on a change in pull request #13228:
URL: https://github.com/apache/flink/pull/13228#discussion_r476662192



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
##
@@ -770,34 +808,50 @@ void triggerPartitionStateCheck(ResultPartitionID 
partitionId) {
}));
}
 
-   private void queueChannel(InputChannel channel) {
-   int availableChannels;
+   private void queueChannel(InputChannel channel, boolean priority) {

Review comment:
   Well, this change is less about expressing priority events and more about 
making sure that channels with priority events are always polled first. It's 
some kind of potential double notification, where the priority notification 
overrides the normal data-available notification.
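   As a rough sketch of that override behavior (illustrative only, not the 
actual `SingleInputGate` code):
   
   ```java
   import java.util.ArrayDeque;
   import java.util.Deque;
   
   // A priority notification moves the channel to the front of the poll order,
   // overriding an earlier plain data-available notification.
   class ChannelQueueSketch<C> {
       private final Deque<C> channels = new ArrayDeque<>();
   
       synchronized void queueChannel(C channel, boolean priority) {
           if (priority) {
               channels.remove(channel);   // drop a possibly earlier plain enqueueing
               channels.addFirst(channel); // channels with priority events are polled first
           } else if (!channels.contains(channel)) {
               channels.addLast(channel);
           }
       }
   
       synchronized C pollChannel() {
           return channels.poll();
       }
   }
   ```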





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] AHeise commented on a change in pull request #13228: [FLINK-19026][network] Improve threading model of CheckpointBarrierUnaligner

2020-08-25 Thread GitBox


AHeise commented on a change in pull request #13228:
URL: https://github.com/apache/flink/pull/13228#discussion_r476660758



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java
##
@@ -133,14 +136,14 @@ public boolean isAvailable() {
 * @param bufferAndBacklog
 *  current buffer and backlog including information about 
the next buffer
 */
-   private boolean isAvailable(BufferAndBacklog bufferAndBacklog) {
+   @Nullable
+   private Buffer.DataType getNextDataType(BufferAndBacklog 
bufferAndBacklog) {
// BEWARE: this must be in sync with #isAvailable()!
-   if (numCreditsAvailable > 0) {
-   return bufferAndBacklog.isDataAvailable();
-   }
-   else {
-   return bufferAndBacklog.isEventAvailable();
+   final Buffer.DataType nextDataType = 
bufferAndBacklog.getNextDataType();
+   if (numCreditsAvailable > 0 || (nextDataType != null && 
nextDataType.isEvent())) {
+   return nextDataType;
}
+   return null;

Review comment:
   An enum type NONE would work for me and might make the code a bit 
clearer. However, be aware that this is mostly a copy; I don't think it 
would simplify any code path.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] AHeise commented on a change in pull request #13228: [FLINK-19026][network] Improve threading model of CheckpointBarrierUnaligner

2020-08-25 Thread GitBox


AHeise commented on a change in pull request #13228:
URL: https://github.com/apache/flink/pull/13228#discussion_r476660921



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
##
@@ -621,61 +626,84 @@ public boolean isFinished() {
return Optional.of(transformToBufferOrEvent(
inputWithData.data.buffer(),
inputWithData.moreAvailable,
-   inputWithData.input));
+   inputWithData.input,
+   inputWithData.morePriorityEvents));
}
 
private Optional<InputWithData<InputChannel, BufferAndAvailability>> 
waitAndGetNextData(boolean blocking)
throws IOException, InterruptedException {
while (true) {
-   Optional<InputChannel> inputChannel = 
getChannel(blocking);
-   if (!inputChannel.isPresent()) {
+   Optional<InputChannel> inputChannelOpt = 
getChannel(blocking);
+   if (!inputChannelOpt.isPresent()) {
return Optional.empty();
}
 
// Do not query inputChannel under the lock, to avoid 
potential deadlocks coming from
// notifications.
-   Optional<BufferAndAvailability> result = 
inputChannel.get().getNextBuffer();
+   final InputChannel inputChannel = inputChannelOpt.get();
+   Optional<BufferAndAvailability> 
bufferAndAvailabilityOpt = inputChannel.getNextBuffer();
 
synchronized (inputChannelsWithData) {
-   if (result.isPresent() && 
result.get().moreAvailable()) {
+   if (!bufferAndAvailabilityOpt.isPresent()) {
+   if (inputChannelsWithData.isEmpty()) {
+   
availabilityHelper.resetUnavailable();
+   }
+   continue;

Review comment:
   Sorry, I will split.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] AHeise commented on a change in pull request #13228: [FLINK-19026][network] Improve threading model of CheckpointBarrierUnaligner

2020-08-25 Thread GitBox


AHeise commented on a change in pull request #13228:
URL: https://github.com/apache/flink/pull/13228#discussion_r476659669



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
##
@@ -171,19 +175,42 @@ private void handleAddingBarrier(BufferConsumer 
bufferConsumer, boolean insertAs
checkState(inflightBufferSnapshot.isEmpty(), 
"Supporting only one concurrent checkpoint in unaligned " +
"checkpoints");
 
-   // Meanwhile prepare the collection of in-flight 
buffers which would be fetched in the next step later.
-   for (BufferConsumer buffer : buffers) {
-   try (BufferConsumer bc = buffer.copy()) {
-   if (bc.isBuffer()) {
-   
inflightBufferSnapshot.add(bc.build());
+   final int pos = buffers.getNumPriorityElements();
+   buffers.addPriorityElement(bufferConsumer);
+
+   boolean unalignedCheckpoint = 
isUnalignedCheckpoint(bufferConsumer);
+   if (unalignedCheckpoint) {
+   final Iterator iterator = 
buffers.iterator();
+   Iterators.advance(iterator, pos + 1);
+   while (iterator.hasNext()) {
+   BufferConsumer buffer = iterator.next();
+
+   if (buffer.isBuffer()) {
+   try (BufferConsumer bc = 
buffer.copy()) {
+   
inflightBufferSnapshot.add(bc.build());
+   }
}
}
}
+   return;
+   }

Review comment:
   In general, I wanted to drop the assumption that there is only one 
priority event going on at any given time. That's especially true when we 
make cancellation events also a priority and we have a more or less fully 
blocked channel.
   
   Specifically, this change had the following motivations:
   * drop the assumption that all priority events are unaligned checkpoints.
   * drop the assumption that the new priority event is always at position 0.
   * a small performance improvement where buffers are only copied after it's 
clear that they do not contain an event (see the sketch below).
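   
   A self-contained sketch of a deque with such a priority section, in the 
spirit of the `addPriorityElement()`/`getNumPriorityElements()` calls in the 
diff above (an illustration under assumed names, not Flink's actual data 
structure):
   
```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Deque whose head holds a section of priority elements, polled first. */
public class PrioritySectionDeque<T> {
    private final Deque<T> deque = new ArrayDeque<>();
    private int numPriorityElements;

    /** Non-priority elements simply go to the back. */
    public void add(T element) {
        deque.addLast(element);
    }

    /** A new priority element goes behind all existing priority elements. */
    public void addPriorityElement(T element) {
        Deque<T> prefix = new ArrayDeque<>();
        for (int i = 0; i < numPriorityElements; i++) {
            prefix.addLast(deque.pollFirst()); // lift off the existing priority section
        }
        prefix.addLast(element);
        while (!prefix.isEmpty()) {
            deque.addFirst(prefix.pollLast()); // restore the section, new element last
        }
        numPriorityElements++;
    }

    public int getNumPriorityElements() {
        return numPriorityElements;
    }

    public T poll() {
        if (numPriorityElements > 0) {
            numPriorityElements--;
        }
        return deque.pollFirst();
    }

    public static void main(String[] args) {
        PrioritySectionDeque<String> buffers = new PrioritySectionDeque<>();
        buffers.add("data-1");
        buffers.add("data-2");
        buffers.addPriorityElement("barrier-1");
        buffers.addPriorityElement("barrier-2"); // lands after barrier-1, before data-1
        System.out.println(buffers.poll()); // barrier-1
        System.out.println(buffers.poll()); // barrier-2
    }
}
```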





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] AHeise commented on a change in pull request #13228: [FLINK-19026][network] Improve threading model of CheckpointBarrierUnaligner

2020-08-25 Thread GitBox


AHeise commented on a change in pull request #13228:
URL: https://github.com/apache/flink/pull/13228#discussion_r476657231



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
##
@@ -171,19 +175,42 @@ private void handleAddingBarrier(BufferConsumer 
bufferConsumer, boolean insertAs
checkState(inflightBufferSnapshot.isEmpty(), 
"Supporting only one concurrent checkpoint in unaligned " +
"checkpoints");
 
-   // Meanwhile prepare the collection of in-flight 
buffers which would be fetched in the next step later.
-   for (BufferConsumer buffer : buffers) {
-   try (BufferConsumer bc = buffer.copy()) {
-   if (bc.isBuffer()) {
-   
inflightBufferSnapshot.add(bc.build());
+   final int pos = buffers.getNumPriorityElements();
+   buffers.addPriorityElement(bufferConsumer);
+
+   boolean unalignedCheckpoint = 
isUnalignedCheckpoint(bufferConsumer);
+   if (unalignedCheckpoint) {
+   final Iterator iterator = 
buffers.iterator();
+   Iterators.advance(iterator, pos + 1);
+   while (iterator.hasNext()) {
+   BufferConsumer buffer = iterator.next();
+
+   if (buffer.isBuffer()) {
+   try (BufferConsumer bc = 
buffer.copy()) {
+   
inflightBufferSnapshot.add(bc.build());
+   }
}
}
}
+   return;
+   }
+   buffers.add(bufferConsumer);

Review comment:
   Yes, good idea. In general, that change looks a bit odd because it's 
isolated from the upcoming changes (I had to split somewhere and probably 
didn't hit the sweet spot everywhere).





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] zentol commented on a change in pull request #13163: [FLINK-16789][runtime] Support JMX RMI random port assign

2020-08-25 Thread GitBox


zentol commented on a change in pull request #13163:
URL: https://github.com/apache/flink/pull/13163#discussion_r476645574



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/management/JMXService.java
##
@@ -85,6 +86,9 @@ private static JMXServer 
startJMXServerWithPortRanges(Iterator ports) {
while (ports.hasNext() && successfullyStartedServer == null) {
JMXServer server = new JMXServer();
int port = ports.next();
+   if (port == 0) { // try poke with a random port when 
port is set to zero

Review comment:
   > it would be better to do this conversion in the realm of the JMXServer
   
   Agreed.
   
   > Do we know that the startup period will be super long if we don't do this?
   
   It probably isn't long. In the port-probing version, you are essentially 
entering a race condition between all processes on this node. I would consider 
it unlikely that you will have more than a hundred TaskExecutors on a given 
node, and only at that point should it be relevant in the first place.
   I only suggested the special iterator to work around the worst-case 
scenario, something like the first 1000 ports already being allocated, where 
the current approach would likely fare miserably.
   But there are clear downsides to having this applied in general to all 
users of the NetUtils; it is _very_ convenient to define a port range and be 
pretty much guaranteed that all processes start at the front, and form a 
continuous port sequence if present on a single node.
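   
   A self-contained sketch of the probing-plus-ephemeral-fallback idea 
(illustrative only, not Flink's actual NetUtils/JMXServer code; binding to 
port 0 simply lets the OS pick any free port):
   
```java
import java.io.IOException;
import java.net.ServerSocket;
import java.util.Arrays;
import java.util.Iterator;

/** Probe a fixed range first; a trailing candidate of 0 is the ephemeral fallback. */
public class PortSelectionSketch {

    static ServerSocket bindToFirstAvailable(Iterator<Integer> ports) throws IOException {
        while (ports.hasNext()) {
            int port = ports.next();
            try {
                return new ServerSocket(port);
            } catch (IOException e) {
                // port taken: this is the per-process race on a shared range
            }
        }
        throw new IOException("no free port among the candidates");
    }

    public static void main(String[] args) throws IOException {
        try (ServerSocket socket = bindToFirstAvailable(Arrays.asList(9010, 9011, 0).iterator())) {
            System.out.println("bound to port " + socket.getLocalPort());
        }
    }
}
```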





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #13239: [FLINK-19021][network] Various cleanups and refactorings of the ResultPartition

2020-08-25 Thread GitBox


flinkbot edited a comment on pull request #13239:
URL: https://github.com/apache/flink/pull/13239#issuecomment-679946229


   
   ## CI report:
   
   * fbf7c39bb2d6cb79f52a93efabe310cb2bafdee9 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5858)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink-training] alpinegizmo merged pull request #13: [FLINK-18630] Redo the LongRideAlerts solutions to cover a corner case

2020-08-25 Thread GitBox


alpinegizmo merged pull request #13:
URL: https://github.com/apache/flink-training/pull/13


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #13240: [FLINK-19050][Documentation]

2020-08-25 Thread GitBox


flinkbot edited a comment on pull request #13240:
URL: https://github.com/apache/flink/pull/13240#issuecomment-680007217


   
   ## CI report:
   
   * 11e18bf9587cfcc3a948ead892c826114a03bb4c Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5857)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #13236: [FLINK-19041][python] Add dependency management for ConnectedStream i…

2020-08-25 Thread GitBox


flinkbot edited a comment on pull request #13236:
URL: https://github.com/apache/flink/pull/13236#issuecomment-679860054


   
   ## CI report:
   
   * 5ed0da03dffc085997903aa6868e97ea98340433 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5856)
 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5842)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #13241: [hotfix] Fix a typo in the watermark docs

2020-08-25 Thread GitBox


flinkbot edited a comment on pull request #13241:
URL: https://github.com/apache/flink/pull/13241#issuecomment-680066941


   
   ## CI report:
   
   * b8271b12c25e16aab38b9beb58d0f29835bee01a Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5859)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #13034: [FLINK-9992][tests] Fix FsStorageLocationReferenceTest#testEncodeAndDecode by adding retries to generate a valid path

2020-08-25 Thread GitBox


flinkbot edited a comment on pull request #13034:
URL: https://github.com/apache/flink/pull/13034#issuecomment-666932731


   
   ## CI report:
   
   * db2c909c2b516c253c5f4f2afc1a0d200df72d85 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5854)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] pnowojski commented on a change in pull request #13228: [FLINK-19026][network] Improve threading model of CheckpointBarrierUnaligner

2020-08-25 Thread GitBox


pnowojski commented on a change in pull request #13228:
URL: https://github.com/apache/flink/pull/13228#discussion_r476481681



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java
##
@@ -133,14 +136,14 @@ public boolean isAvailable() {
 * @param bufferAndBacklog
 *  current buffer and backlog including information about 
the next buffer
 */
-   private boolean isAvailable(BufferAndBacklog bufferAndBacklog) {
+   @Nullable
+   private Buffer.DataType getNextDataType(BufferAndBacklog 
bufferAndBacklog) {
// BEWARE: this must be in sync with #isAvailable()!
-   if (numCreditsAvailable > 0) {
-   return bufferAndBacklog.isDataAvailable();
-   }
-   else {
-   return bufferAndBacklog.isEventAvailable();
+   final Buffer.DataType nextDataType = 
bufferAndBacklog.getNextDataType();
+   if (numCreditsAvailable > 0 || (nextDataType != null && 
nextDataType.isEvent())) {
+   return nextDataType;
}
+   return null;

Review comment:
   hmmm, maybe add another enum type for this purpose, instead of having 
`null`? (I'm not sure, just brainstorming)





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] pnowojski commented on a change in pull request #13228: [FLINK-19026][network] Improve threading model of CheckpointBarrierUnaligner

2020-08-25 Thread GitBox


pnowojski commented on a change in pull request #13228:
URL: https://github.com/apache/flink/pull/13228#discussion_r476498494



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
##
@@ -621,61 +626,84 @@ public boolean isFinished() {
return Optional.of(transformToBufferOrEvent(
inputWithData.data.buffer(),
inputWithData.moreAvailable,
-   inputWithData.input));
+   inputWithData.input,
+   inputWithData.morePriorityEvents));
}
 
private Optional> 
waitAndGetNextData(boolean blocking)
throws IOException, InterruptedException {
while (true) {
-   Optional inputChannel = 
getChannel(blocking);
-   if (!inputChannel.isPresent()) {
+   Optional inputChannelOpt = 
getChannel(blocking);
+   if (!inputChannelOpt.isPresent()) {
return Optional.empty();
}
 
// Do not query inputChannel under the lock, to avoid 
potential deadlocks coming from
// notifications.
-   Optional result = 
inputChannel.get().getNextBuffer();
+   final InputChannel inputChannel = inputChannelOpt.get();
+   Optional 
bufferAndAvailabilityOpt = inputChannel.getNextBuffer();
 
synchronized (inputChannelsWithData) {
-   if (result.isPresent() && 
result.get().moreAvailable()) {
+   if (!bufferAndAvailabilityOpt.isPresent()) {
+   if (inputChannelsWithData.isEmpty()) {
+   
availabilityHelper.resetUnavailable();
+   }
+   continue;

Review comment:
   maybe if the `result` variable rename and the added `continue` branch had 
happened in an independent "refactor" commit, it would have saved me a couple 
of minutes while trying to understand the change :( 
   
   maybe not, as I can see how the changes are a bit interconnected.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] pnowojski commented on a change in pull request #13228: [FLINK-19026][network] Improve threading model of CheckpointBarrierUnaligner

2020-08-25 Thread GitBox


pnowojski commented on a change in pull request #13228:
URL: https://github.com/apache/flink/pull/13228#discussion_r476480862



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java
##
@@ -133,14 +136,14 @@ public boolean isAvailable() {
 * @param bufferAndBacklog
 *  current buffer and backlog including information about 
the next buffer
 */
-   private boolean isAvailable(BufferAndBacklog bufferAndBacklog) {
+   @Nullable

Review comment:
   nit: add a javadoc explaining the returned value?

##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
##
@@ -171,19 +175,42 @@ private void handleAddingBarrier(BufferConsumer 
bufferConsumer, boolean insertAs
checkState(inflightBufferSnapshot.isEmpty(), 
"Supporting only one concurrent checkpoint in unaligned " +
"checkpoints");
 
-   // Meanwhile prepare the collection of in-flight 
buffers which would be fetched in the next step later.
-   for (BufferConsumer buffer : buffers) {
-   try (BufferConsumer bc = buffer.copy()) {
-   if (bc.isBuffer()) {
-   
inflightBufferSnapshot.add(bc.build());
+   final int pos = buffers.getNumPriorityElements();
+   buffers.addPriorityElement(bufferConsumer);
+
+   boolean unalignedCheckpoint = 
isUnalignedCheckpoint(bufferConsumer);
+   if (unalignedCheckpoint) {
+   final Iterator iterator = 
buffers.iterator();
+   Iterators.advance(iterator, pos + 1);
+   while (iterator.hasNext()) {
+   BufferConsumer buffer = iterator.next();
+
+   if (buffer.isBuffer()) {
+   try (BufferConsumer bc = 
buffer.copy()) {
+   
inflightBufferSnapshot.add(bc.build());
+   }
}
}
}
+   return;
+   }

Review comment:
   Why do we need this change? In what scenarios are you expecting more 
than one priority event in the output buffer? (if there is a reason that I'm 
forgetting about, please add it to the commit message)
   
   edit: (after reading the commit message a couple of times) Or are you just 
re-using a class here that you mostly intend to use later (on the inputs?)? 
If so, maybe it needs some more explanation in the commit message?

##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##
@@ -195,28 +193,27 @@ void retriggerSubpartitionRequest(int subpartitionIndex) 
throws IOException {
}
 
@Override
-   public void spillInflightBuffers(long checkpointId, ChannelStateWriter 
channelStateWriter) throws IOException {
+   public void spillInflightBuffers(long checkpointId, ChannelStateWriter 
channelStateWriter) {
synchronized (receivedBuffers) {
-   checkState(checkpointId > lastRequestedCheckpointId, 
"Need to request the next checkpointId");
-
-   final List inflightBuffers = new 
ArrayList<>(receivedBuffers.size());
-   for (Buffer buffer : receivedBuffers) {
-   CheckpointBarrier checkpointBarrier = 
parseCheckpointBarrierOrNull(buffer);
-   if (checkpointBarrier != null && 
checkpointBarrier.getId() >= checkpointId) {
-   break;
+   final Integer numRecords = 
numRecordsOvertaken.remove(checkpointId);

Review comment:
   shouldn't we remove also obsolete values from this map? (to prevent a 
potential memory leak?)
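   
   A self-contained sketch of such pruning (an illustrative assumption, not 
the actual RemoteInputChannel code), relying on checkpoint ids being 
monotonically increasing:
   
```java
import java.util.NavigableMap;
import java.util.TreeMap;

/** When checkpoint N is consumed, also drop bookkeeping for older checkpoints. */
public class OvertakenRecordsSketch {
    private final NavigableMap<Long, Integer> numRecordsOvertaken = new TreeMap<>();

    void record(long checkpointId, int numRecords) {
        numRecordsOvertaken.put(checkpointId, numRecords);
    }

    Integer takeAndPrune(long checkpointId) {
        Integer numRecords = numRecordsOvertaken.remove(checkpointId);
        // clear entries of obsolete (older) checkpoints to avoid a memory leak
        numRecordsOvertaken.headMap(checkpointId, false).clear();
        return numRecords;
    }

    public static void main(String[] args) {
        OvertakenRecordsSketch channel = new OvertakenRecordsSketch();
        channel.record(1L, 3);
        channel.record(2L, 5);
        System.out.println(channel.takeAndPrune(2L)); // 5; the entry for 1 is pruned too
        System.out.println(channel.takeAndPrune(1L)); // null, already cleaned up
    }
}
```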

##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
##
@@ -171,19 +175,42 @@ private void handleAddingBarrier(BufferConsumer 
bufferConsumer, boolean insertAs
checkState(inflightBufferSnapshot.isEmpty(), 
"Supporting only one concurrent checkpoint in unaligned " +
"checkpoints");
 
-   // Meanwhile prepare the collection of in-flight 
buffers which would be fetched in the next step later.
-   for (BufferConsumer buffer : buffers) {
-   try (BufferConsumer bc = buffer.copy()) {
-  

[GitHub] [flink] flinkbot edited a comment on pull request #13242: [FLINK-19009][metrics] Fixed the downtime metric issue and updated the comment

2020-08-25 Thread GitBox


flinkbot edited a comment on pull request #13242:
URL: https://github.com/apache/flink/pull/13242#issuecomment-680108384


   
   ## CI report:
   
   * fc6dc041ef06000c57767f9665c34ecc70cff6da Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5863)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (FLINK-19005) used metaspace grow on every execution

2020-08-25 Thread Jira


 [ 
https://issues.apache.org/jira/browse/FLINK-19005?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guillermo Sánchez updated FLINK-19005:
--
Attachment: (was: 0_executions.zip)

> used metaspace grow on every execution
> --
>
> Key: FLINK-19005
> URL: https://issues.apache.org/jira/browse/FLINK-19005
> Project: Flink
>  Issue Type: Bug
>  Components: Client / Job Submission, Runtime / Configuration, 
> Runtime / Coordination
>Affects Versions: 1.11.1
>Reporter: Guillermo Sánchez
>Assignee: Chesnay Schepler
>Priority: Major
> Attachments: heap_dump_after_10_executions.zip, 
> heap_dump_after_1_execution.zip, heap_dump_echo_lee.tar.xz
>
>
> Hi!
> I'm running a 1.11.1 Flink cluster, where I execute batch jobs made with 
> the DataSet API.
> I submit these jobs every day to calculate daily data.
> In every execution, the cluster's used metaspace increases by 7MB and it's 
> never released.
> This ends up with an OutOfMemoryError caused by Metaspace every 15 days, and 
> I need to restart the cluster to clean the metaspace.
> taskmanager.memory.jvm-metaspace.size is set to 512mb
> Any idea of what could be causing this metaspace growth and why it is not 
> released?
>  
> 
> === Summary ==
> 
> Case 1, reported by [~gestevez]:
> * Flink 1.11.1
> * Java 11
> * Maximum Metaspace size set to 512mb
> * Custom Batch job, submitted daily
> * Requires restart every 15 days after an OOM
>  Case 2, reported by [~Echo Lee]:
> * Flink 1.11.0
> * Java 11
> * G1GC
> * WordCount Batch job, submitted every second / every 5 minutes
> * eventually fails TaskExecutor with OOM
> Case 3, reported by [~DaDaShen]
> * Flink 1.11.0
> * Java 11
> * WordCount Batch job, submitted every 5 seconds
> * growing Metaspace, eventually OOM
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-19005) used metaspace grow on every execution

2020-08-25 Thread Jira


 [ 
https://issues.apache.org/jira/browse/FLINK-19005?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guillermo Sánchez updated FLINK-19005:
--
Attachment: (was: WordCount_1_execution.zip)

> used metaspace grow on every execution
> --
>
> Key: FLINK-19005
> URL: https://issues.apache.org/jira/browse/FLINK-19005
> Project: Flink
>  Issue Type: Bug
>  Components: Client / Job Submission, Runtime / Configuration, 
> Runtime / Coordination
>Affects Versions: 1.11.1
>Reporter: Guillermo Sánchez
>Assignee: Chesnay Schepler
>Priority: Major
> Attachments: heap_dump_after_10_executions.zip, 
> heap_dump_after_1_execution.zip, heap_dump_echo_lee.tar.xz
>
>
> Hi!
> I'm running a 1.11.1 Flink cluster, where I execute batch jobs made with 
> the DataSet API.
> I submit these jobs every day to calculate daily data.
> In every execution, the cluster's used metaspace increases by 7MB and it's 
> never released.
> This ends up with an OutOfMemoryError caused by Metaspace every 15 days, and 
> I need to restart the cluster to clean the metaspace.
> taskmanager.memory.jvm-metaspace.size is set to 512mb
> Any idea of what could be causing this metaspace growth and why it is not 
> released?
>  
> 
> === Summary ==
> 
> Case 1, reported by [~gestevez]:
> * Flink 1.11.1
> * Java 11
> * Maximum Metaspace size set to 512mb
> * Custom Batch job, submitted daily
> * Requires restart every 15 days after an OOM
>  Case 2, reported by [~Echo Lee]:
> * Flink 1.11.0
> * Java 11
> * G1GC
> * WordCount Batch job, submitted every second / every 5 minutes
> * eventually fails TaskExecutor with OOM
> Case 3, reported by [~DaDaShen]
> * Flink 1.11.0
> * Java 11
> * WordCount Batch job, submitted every 5 seconds
> * growing Metaspace, eventually OOM
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-19005) used metaspace grow on every execution

2020-08-25 Thread Jira


 [ 
https://issues.apache.org/jira/browse/FLINK-19005?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guillermo Sánchez updated FLINK-19005:
--
Attachment: (was: WordCount_15_executions.zip)

> used metaspace grow on every execution
> --
>
> Key: FLINK-19005
> URL: https://issues.apache.org/jira/browse/FLINK-19005
> Project: Flink
>  Issue Type: Bug
>  Components: Client / Job Submission, Runtime / Configuration, 
> Runtime / Coordination
>Affects Versions: 1.11.1
>Reporter: Guillermo Sánchez
>Assignee: Chesnay Schepler
>Priority: Major
> Attachments: heap_dump_after_10_executions.zip, 
> heap_dump_after_1_execution.zip, heap_dump_echo_lee.tar.xz
>
>
> Hi!
> I'm running a 1.11.1 Flink cluster, where I execute batch jobs made with 
> the DataSet API.
> I submit these jobs every day to calculate daily data.
> In every execution, the cluster's used metaspace increases by 7MB and it's 
> never released.
> This ends up with an OutOfMemoryError caused by Metaspace every 15 days, and 
> I need to restart the cluster to clean the metaspace.
> taskmanager.memory.jvm-metaspace.size is set to 512mb
> Any idea of what could be causing this metaspace growth and why it is not 
> released?
>  
> 
> === Summary ==
> 
> Case 1, reported by [~gestevez]:
> * Flink 1.11.1
> * Java 11
> * Maximum Metaspace size set to 512mb
> * Custom Batch job, submitted daily
> * Requires restart every 15 days after an OOM
>  Case 2, reported by [~Echo Lee]:
> * Flink 1.11.0
> * Java 11
> * G1GC
> * WordCount Batch job, submitted every second / every 5 minutes
> * eventually fails TaskExecutor with OOM
> Case 3, reported by [~DaDaShen]
> * Flink 1.11.0
> * Java 11
> * WordCount Batch job, submitted every 5 seconds
> * growing Metaspace, eventually OOM
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot commented on pull request #13242: [FLINK-19009][metrics] Fixed the downtime metric issue and updated the comment

2020-08-25 Thread GitBox


flinkbot commented on pull request #13242:
URL: https://github.com/apache/flink/pull/13242#issuecomment-680108384


   
   ## CI report:
   
   * fc6dc041ef06000c57767f9665c34ecc70cff6da UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (FLINK-19013) Log start/end of state restoration

2020-08-25 Thread Chesnay Schepler (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19013?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler reassigned FLINK-19013:


Assignee: Matt Wang

> Log start/end of state restoration
> --
>
> Key: FLINK-19013
> URL: https://issues.apache.org/jira/browse/FLINK-19013
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Chesnay Schepler
>Assignee: Matt Wang
>Priority: Major
> Fix For: 1.12.0
>
>
> State restoration can take a significant amount of time if the state is large 
> enough, or in special cases like FLINK-19008.
> It would be useful for debugging if we'd log the start/end of 
> {{RestoreOperation#restore.}}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot commented on pull request #13242: [FLINK-19009][metrics] Fixed the downtime metric issue and updated the comment

2020-08-25 Thread GitBox


flinkbot commented on pull request #13242:
URL: https://github.com/apache/flink/pull/13242#issuecomment-680105014


   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit fc6dc041ef06000c57767f9665c34ecc70cff6da (Tue Aug 25 
15:41:53 UTC 2020)
   
   **Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up 
to date!
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] aljoscha closed pull request #13237: [FLINK-17159] Harden ElasticsearchSinkITCase

2020-08-25 Thread GitBox


aljoscha closed pull request #13237:
URL: https://github.com/apache/flink/pull/13237


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] aljoscha commented on pull request #13237: [FLINK-17159] Harden ElasticsearchSinkITCase

2020-08-25 Thread GitBox


aljoscha commented on pull request #13237:
URL: https://github.com/apache/flink/pull/13237#issuecomment-680103938


   Merged! Thanks for the speedy review! (And for the (external) suggestion to 
use the liveness probe)



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (FLINK-19009) wrong way to calculate the "downtime" metric

2020-08-25 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19009?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-19009:
---
Labels: pull-request-available  (was: )

> wrong way to calculate the "downtime" metric
> 
>
> Key: FLINK-19009
> URL: https://issues.apache.org/jira/browse/FLINK-19009
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination, Runtime / Metrics
>Affects Versions: 1.7.2, 1.8.0
>Reporter: Zhinan Cheng
>Assignee: kevin liu
>Priority: Trivial
>  Labels: pull-request-available
> Fix For: 1.12.0
>
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> Currently, the way the Flink system metric "downtime" is calculated is not 
> consistent with the description in the doc: right now, the downtime is 
> actually the current timestamp minus the timestamp when the job started.
>    
> But the Flink doc (https://flink.apache.org/gettinghelp.html) clearly 
> describes the time as the current timestamp minus the timestamp when the job 
> failed.
>  
> I believe we should update the code for this metric to match the Flink doc. 
> The easy way to solve this is to subtract the latest uptime timestamp from 
> the current timestamp.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] Flyangz opened a new pull request #13242: [FLINK-19009][metrics] Fixed the downtime metric issue and updated the comment

2020-08-25 Thread GitBox


Flyangz opened a new pull request #13242:
URL: https://github.com/apache/flink/pull/13242


   
   
   ## What is the purpose of the change
   
   Currently, the way the Flink system metric "downtime" is calculated is not 
consistent with the description in the doc: the downtime is actually the 
current timestamp minus the timestamp when the job started. This pull request 
fixes this bug.
   
   ## Brief change log
   
   Calculate the downtime metric by subtracting the latest failing time from 
the current system timestamp. A minimal sketch of the intended computation is 
shown below.
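   
   A minimal, self-contained sketch of that computation (names are 
illustrative assumptions, not the actual ExecutionGraph gauge):
   
```java
/** Measure downtime from the latest failure, not from the job start. */
public class DowntimeGaugeSketch {
    private volatile boolean running = true;
    private volatile long lastFailingTimestamp = -1L; // -1 = never failed

    void onJobFailing(long nowMillis) {
        running = false;
        lastFailingTimestamp = nowMillis;
    }

    void onJobRunning() {
        running = true;
    }

    /** 0 while the job runs; otherwise the time elapsed since the latest failure. */
    long getDowntime(long nowMillis) {
        if (running || lastFailingTimestamp < 0) {
            return 0L;
        }
        return nowMillis - lastFailingTimestamp;
    }

    public static void main(String[] args) {
        DowntimeGaugeSketch gauge = new DowntimeGaugeSketch();
        gauge.onJobFailing(1_000L);
        System.out.println(gauge.getDowntime(4_000L)); // 3000: since the failure, not the start
    }
}
```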
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Closed] (FLINK-17159) ES6 ElasticsearchSinkITCase unstable

2020-08-25 Thread Aljoscha Krettek (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed FLINK-17159.

Fix Version/s: 1.11.2
   1.12.0
   Resolution: Fixed

Fix on release-1.11: 6c97f22913197e7a5a948f67b7a0b7f8fb480fa7

Please re-open if the issue persists.

> ES6 ElasticsearchSinkITCase unstable
> 
>
> Key: FLINK-17159
> URL: https://issues.apache.org/jira/browse/FLINK-17159
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / ElasticSearch, Tests
>Affects Versions: 1.11.0, 1.12.0
>Reporter: Chesnay Schepler
>Assignee: Aljoscha Krettek
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.12.0, 1.11.2
>
>
> [https://dev.azure.com/rmetzger/Flink/_build/results?buildId=7482=logs=64110e28-73be-50d7-9369-8750330e0bf1=aa84fb9a-59ae-5696-70f7-011bc086e59b]
> {code:java}
> 2020-04-15T02:37:04.4289477Z [ERROR] 
> testElasticsearchSinkWithSmile(org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSinkITCase)
>   Time elapsed: 0.145 s  <<< ERROR!
> 2020-04-15T02:37:04.4290310Z 
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> 2020-04-15T02:37:04.4290790Z  at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
> 2020-04-15T02:37:04.4291404Z  at 
> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:659)
> 2020-04-15T02:37:04.4291956Z  at 
> org.apache.flink.streaming.util.TestStreamEnvironment.execute(TestStreamEnvironment.java:77)
> 2020-04-15T02:37:04.4292548Z  at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1643)
> 2020-04-15T02:37:04.4293254Z  at 
> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase.runElasticSearchSinkTest(ElasticsearchSinkTestBase.java:128)
> 2020-04-15T02:37:04.4293990Z  at 
> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase.runElasticsearchSinkSmileTest(ElasticsearchSinkTestBase.java:106)
> 2020-04-15T02:37:04.4295096Z  at 
> org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSinkITCase.testElasticsearchSinkWithSmile(ElasticsearchSinkITCase.java:45)
> 2020-04-15T02:37:04.4295923Z  at 
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 2020-04-15T02:37:04.4296489Z  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 2020-04-15T02:37:04.4297076Z  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2020-04-15T02:37:04.4297513Z  at 
> java.lang.reflect.Method.invoke(Method.java:498)
> 2020-04-15T02:37:04.4297951Z  at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> 2020-04-15T02:37:04.4298688Z  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 2020-04-15T02:37:04.4299374Z  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> 2020-04-15T02:37:04.4300069Z  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 2020-04-15T02:37:04.4300960Z  at 
> org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
> 2020-04-15T02:37:04.4301705Z  at 
> org.junit.rules.RunRules.evaluate(RunRules.java:20)
> 2020-04-15T02:37:04.4302204Z  at 
> org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> 2020-04-15T02:37:04.4302661Z  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> 2020-04-15T02:37:04.4303234Z  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> 2020-04-15T02:37:04.4303706Z  at 
> org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> 2020-04-15T02:37:04.4304127Z  at 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> 2020-04-15T02:37:04.4304716Z  at 
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> 2020-04-15T02:37:04.4305394Z  at 
> org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> 2020-04-15T02:37:04.4305965Z  at 
> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> 2020-04-15T02:37:04.4306425Z  at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> 2020-04-15T02:37:04.4306942Z  at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> 2020-04-15T02:37:04.4307466Z  at 
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> 2020-04-15T02:37:04.4307920Z  at 
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> 2020-04-15T02:37:04.4308375Z  at 
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> 

[GitHub] [flink] flinkbot edited a comment on pull request #13217: [FLINK-16866] Make job submission non-blocking

2020-08-25 Thread GitBox


flinkbot edited a comment on pull request #13217:
URL: https://github.com/apache/flink/pull/13217#issuecomment-678285884


   
   ## CI report:
   
   * 3655fcea1966bfbcb85c86d6a159c354f20d6cc7 UNKNOWN
   * 52a49f0b0840aa9220de72d64c50a6b33f6adf92 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5832)
 
   * 805f3142619f652b96804ab3c65beca2ba50f5d4 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5860)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-17159) ES6 ElasticsearchSinkITCase unstable

2020-08-25 Thread Aljoscha Krettek (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17184125#comment-17184125
 ] 

Aljoscha Krettek commented on FLINK-17159:
--

Potential fix on master: f6467816334ae04da9e72ee759ad007d60bfdca7

> ES6 ElasticsearchSinkITCase unstable
> 
>
> Key: FLINK-17159
> URL: https://issues.apache.org/jira/browse/FLINK-17159
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / ElasticSearch, Tests
>Affects Versions: 1.11.0, 1.12.0
>Reporter: Chesnay Schepler
>Assignee: Aljoscha Krettek
>Priority: Critical
>  Labels: pull-request-available, test-stability
>
> [https://dev.azure.com/rmetzger/Flink/_build/results?buildId=7482=logs=64110e28-73be-50d7-9369-8750330e0bf1=aa84fb9a-59ae-5696-70f7-011bc086e59b]
> {code:java}
> 2020-04-15T02:37:04.4289477Z [ERROR] 
> testElasticsearchSinkWithSmile(org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSinkITCase)
>   Time elapsed: 0.145 s  <<< ERROR!
> 2020-04-15T02:37:04.4290310Z 
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> 2020-04-15T02:37:04.4290790Z  at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
> 2020-04-15T02:37:04.4291404Z  at 
> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:659)
> 2020-04-15T02:37:04.4291956Z  at 
> org.apache.flink.streaming.util.TestStreamEnvironment.execute(TestStreamEnvironment.java:77)
> 2020-04-15T02:37:04.4292548Z  at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1643)
> 2020-04-15T02:37:04.4293254Z  at 
> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase.runElasticSearchSinkTest(ElasticsearchSinkTestBase.java:128)
> 2020-04-15T02:37:04.4293990Z  at 
> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase.runElasticsearchSinkSmileTest(ElasticsearchSinkTestBase.java:106)
> 2020-04-15T02:37:04.4295096Z  at 
> org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSinkITCase.testElasticsearchSinkWithSmile(ElasticsearchSinkITCase.java:45)
> 2020-04-15T02:37:04.4295923Z  at 
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 2020-04-15T02:37:04.4296489Z  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 2020-04-15T02:37:04.4297076Z  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2020-04-15T02:37:04.4297513Z  at 
> java.lang.reflect.Method.invoke(Method.java:498)
> 2020-04-15T02:37:04.4297951Z  at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> 2020-04-15T02:37:04.4298688Z  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 2020-04-15T02:37:04.4299374Z  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> 2020-04-15T02:37:04.4300069Z  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 2020-04-15T02:37:04.4300960Z  at 
> org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
> 2020-04-15T02:37:04.4301705Z  at 
> org.junit.rules.RunRules.evaluate(RunRules.java:20)
> 2020-04-15T02:37:04.4302204Z  at 
> org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> 2020-04-15T02:37:04.4302661Z  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> 2020-04-15T02:37:04.4303234Z  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> 2020-04-15T02:37:04.4303706Z  at 
> org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> 2020-04-15T02:37:04.4304127Z  at 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> 2020-04-15T02:37:04.4304716Z  at 
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> 2020-04-15T02:37:04.4305394Z  at 
> org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> 2020-04-15T02:37:04.4305965Z  at 
> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> 2020-04-15T02:37:04.4306425Z  at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> 2020-04-15T02:37:04.4306942Z  at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> 2020-04-15T02:37:04.4307466Z  at 
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> 2020-04-15T02:37:04.4307920Z  at 
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> 2020-04-15T02:37:04.4308375Z  at 
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> 2020-04-15T02:37:04.4308782Z  at 
> org.junit.rules.RunRules.evaluate(RunRules.java:20)
> 2020-04-15T02:37:04.4309182Z  at 
> 

[GitHub] [flink] RocMarshal commented on a change in pull request #13225: [FLINK-18974][docs-zh]Translate the 'User-Defined Functions' page of "Application Development's DataStream API" into Chinese

2020-08-25 Thread GitBox


RocMarshal commented on a change in pull request #13225:
URL: https://github.com/apache/flink/pull/13225#discussion_r476500930



##
File path: docs/dev/user_defined_functions.zh.md
##
@@ -23,16 +23,14 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-Most operations require a user-defined function. This section lists different
-ways of how they can be specified. We also cover `Accumulators`, which can be
-used to gain insights into your Flink application.
+大多数操作都需要用户自定义函数。本节列出了实现用户自定义函数的不同方式。还会介绍 `Accumulators`(累加器),可用于深入了解你的 Flink 
应用程序。
 
 
 
 
-## Implementing an interface
+## 实现接口

Review comment:
   I think we should improve the section titles across the whole page 
according to item `4. 标题锚点链接` of the [Flink Translation 
Specifications](https://cwiki.apache.org/confluence/display/FLINK/Flink+Translation+Specifications)

##
File path: docs/dev/user_defined_functions.zh.md
##
@@ -147,95 +142,75 @@ data.map (new RichMapFunction[String, Int] {
 
 
 
-Rich functions provide, in addition to the user-defined function (map,
-reduce, etc), four methods: `open`, `close`, `getRuntimeContext`, and
-`setRuntimeContext`. These are useful for parameterizing the function
-(see [Passing Parameters to Functions]({{ site.baseurl 
}}/dev/batch/index.html#passing-parameters-to-functions)),
-creating and finalizing local state, accessing broadcast variables (see
-[Broadcast Variables]({{ site.baseurl 
}}/dev/batch/index.html#broadcast-variables)), and for accessing runtime
-information such as accumulators and counters (see
-[Accumulators and Counters](#accumulators--counters)), and information
-on iterations (see [Iterations]({{ site.baseurl }}/dev/batch/iterations.html)).
+除了用户自定义的功能(map,reduce 等),富函数还提供了四个方法:`open`、`close`、`getRuntimeContext` 和
+`setRuntimeContext`。这些对于参数化功能很有用
+(参阅 [给函数传递参数]({{ site.baseurl 
}}/zh/dev/batch/index.html#passing-parameters-to-functions)),
+创建和最终确定本地状态,访问广播变量(参阅
+[广播变量]({{ site.baseurl 
}}/zh/dev/batch/index.html#broadcast-variables)),以及访问运行时信息,例如累加器和计数器(参阅
+[累加器和计数器](#累加器和计数器)),以及迭代器的相关信息(参阅 [迭代器]({{ site.baseurl 
}}/zh/dev/batch/iterations.html))。
 
 {% top %}
 
-## Accumulators & Counters
+## 累加器和计数器
 
-Accumulators are simple constructs with an **add operation** and a **final 
accumulated result**,
-which is available after the job ended.
+累加器是具有**加法运算**和**最终累加结果**的一种简单结构,可在作业结束后使用。
 
-The most straightforward accumulator is a **counter**: You can increment it 
using the
-```Accumulator.add(V value)``` method. At the end of the job Flink will sum up 
(merge) all partial
-results and send the result to the client. Accumulators are useful during 
debugging or if you
-quickly want to find out more about your data.
+最简单的累加器就是**计数器**: 你可以使用
+```Accumulator.add(V value)``` 方法将其递增。在作业结束时,Flink 会汇总(合并)所有部分的结果并将其发送给客户端。
+在调试过程中或在你想快速了解有关数据更多信息时,累加器作用很大。
 
-Flink currently has the following **built-in accumulators**. Each of them 
implements the
-{% gh_link 
/flink-core/src/main/java/org/apache/flink/api/common/accumulators/Accumulator.java
 "Accumulator" %}
-interface.
+Flink 目前有如下**内置累加器**。每个都实现了
+{% gh_link 
/flink-core/src/main/java/org/apache/flink/api/common/accumulators/Accumulator.java
 "累加器" %}
+接口。
 
 - {% gh_link 
/flink-core/src/main/java/org/apache/flink/api/common/accumulators/IntCounter.java
 "__IntCounter__" %},
   {% gh_link 
/flink-core/src/main/java/org/apache/flink/api/common/accumulators/LongCounter.java
 "__LongCounter__" %}
-  and {% gh_link 
/flink-core/src/main/java/org/apache/flink/api/common/accumulators/DoubleCounter.java
 "__DoubleCounter__" %}:
-  See below for an example using a counter.
-- {% gh_link 
/flink-core/src/main/java/org/apache/flink/api/common/accumulators/Histogram.java
 "__Histogram__" %}:
-  A histogram implementation for a discrete number of bins. Internally it is 
just a map from Integer
-  to Integer. You can use this to compute distributions of values, e.g. the 
distribution of
-  words-per-line for a word count program.
+  和 {% gh_link 
/flink-core/src/main/java/org/apache/flink/api/common/accumulators/DoubleCounter.java
 "__DoubleCounter__" %}:
+  有关使用计数器的示例,请参见下文。
+- {% gh_link 
/flink-core/src/main/java/org/apache/flink/api/common/accumulators/Histogram.java
 "__直方图__" %}:
+  离散数量的柱状直方图实现。 在内部,它只是整形到整形的映射。你可以使用它来计算值的分布,例如,单词计数程序的每行单词的分布情况。
 
-__How to use accumulators:__
+__如何使用累加器:__
 
-First you have to create an accumulator object (here a counter) in the 
user-defined transformation
-function where you want to use it.
+首先,你要在需要使用累加器的用户自定义的转换函数中创建一个累加器对象(此处是计数器) 。
 
 {% highlight java %}
 private IntCounter numLines = new IntCounter();
 {% endhighlight %}
 
-Second you have to register the accumulator object, typically in the 
```open()``` method of the
-*rich* function. Here you also define the name.
+其次,你必须在富函数的```open()```方法中注册累加器对象。也可以在此处定义名称。
 
 {% highlight java %}
 getRuntimeContext().addAccumulator("num-lines", this.numLines);
 {% endhighlight %}
 

[GitHub] [flink] flinkbot edited a comment on pull request #13241: [hotfix] Fix a typo in the watermark docs

2020-08-25 Thread GitBox


flinkbot edited a comment on pull request #13241:
URL: https://github.com/apache/flink/pull/13241#issuecomment-680066941


   
   ## CI report:
   
   * b8271b12c25e16aab38b9beb58d0f29835bee01a Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5859)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] rmetzger commented on pull request #13217: [FLINK-16866] Make job submission non-blocking

2020-08-25 Thread GitBox


rmetzger commented on pull request #13217:
URL: https://github.com/apache/flink/pull/13217#issuecomment-680075151


   Thanks a lot for your extensive and detailed review. It was very helpful for 
me!
   
   I have addressed most of your comments. I extended the 
`DispatcherJobTest`. 
   I'm in the process of testing whether the refactoring of the client has 
introduced any new test failures, and I might clean up my new tests in 
`DispatcherJob` (I would thus recommend you review them last).
   
   I might push a small change to the client code later today to better 
distinguish between initialization and runtime failures, so that we can solve 
the unstable `FunctionITCase.testInvalidUseOfTableFunction()` test.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-18695) Allow NettyBufferPool to allocate heap buffers

2020-08-25 Thread Chesnay Schepler (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18695?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17184102#comment-17184102
 ] 

Chesnay Schepler commented on FLINK-18695:
--

Given that we will allocate heap memory anyway when passing things to the SSL 
engine, it should be fine to set heap arenas to 0. That _should_ behave exactly 
like the current code.
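
A sketch of that configuration (the constructor parameters are illustrative; 
with zero heap arenas, the pooled allocator should serve heap requests as 
unpooled buffers while direct buffers stay pooled):

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;

/** Pooled allocator with no heap arenas: heap allocations fall back to unpooled. */
public class ZeroHeapArenaSketch {
    public static void main(String[] args) {
        int pageSize = 8192;
        int maxOrder = 11; // chunk size = 8 KiB << 11 = 16 MiB
        PooledByteBufAllocator allocator = new PooledByteBufAllocator(
                true /* preferDirect */, 0 /* nHeapArena */, 2 /* nDirectArena */,
                pageSize, maxOrder);

        ByteBuf direct = allocator.directBuffer(); // pooled direct buffer
        ByteBuf heap = allocator.heapBuffer();     // unpooled, since there are no heap arenas
        System.out.println(direct.isDirect() + " / " + heap.isDirect()); // true / false
        direct.release();
        heap.release();
    }
}
```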

> Allow NettyBufferPool to allocate heap buffers
> --
>
> Key: FLINK-18695
> URL: https://issues.apache.org/jira/browse/FLINK-18695
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Reporter: Chesnay Schepler
>Assignee: Yun Gao
>Priority: Major
> Fix For: 1.12.0
>
>
> in 4.1.43 netty made a change to their SslHandler to always use heap buffers 
> for JDK SSLEngine implementations, to avoid an additional memory copy.
> However, our {{NettyBufferPool}} forbids heap buffer allocations.
> We will either have to allow heap buffer allocations, or create a custom 
> SslHandler implementation that does not use heap buffers (although this seems 
> ill-adviced?).
> /cc [~sewen] [~uce] [~NicoK] [~zjwang] [~pnowojski]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-19005) used metaspace grow on every execution

2020-08-25 Thread Jira


 [ 
https://issues.apache.org/jira/browse/FLINK-19005?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guillermo Sánchez updated FLINK-19005:
--
Attachment: 0_executions.zip
WordCount_1_execution.zip
WordCount_15_executions.zip

> used metaspace grow on every execution
> --
>
> Key: FLINK-19005
> URL: https://issues.apache.org/jira/browse/FLINK-19005
> Project: Flink
>  Issue Type: Bug
>  Components: Client / Job Submission, Runtime / Configuration, 
> Runtime / Coordination
>Affects Versions: 1.11.1
>Reporter: Guillermo Sánchez
>Assignee: Chesnay Schepler
>Priority: Major
> Attachments: 0_executions.zip, WordCount_15_executions.zip, 
> WordCount_1_execution.zip, heap_dump_after_10_executions.zip, 
> heap_dump_after_1_execution.zip, heap_dump_echo_lee.tar.xz
>
>
> Hi!
> I'm running a 1.11.1 Flink cluster, where I execute batch jobs made with 
> the DataSet API.
> I submit these jobs every day to calculate daily data.
> In every execution, the cluster's used metaspace increases by 7MB and it's 
> never released.
> This ends up with an OutOfMemoryError caused by Metaspace every 15 days, and 
> I need to restart the cluster to clean the metaspace.
> taskmanager.memory.jvm-metaspace.size is set to 512mb
> Any idea of what could be causing this metaspace growth and why it is not 
> released?
>  
> 
> === Summary ==
> 
> Case 1, reported by [~gestevez]:
> * Flink 1.11.1
> * Java 11
> * Maximum Metaspace size set to 512mb
> * Custom Batch job, submitted daily
> * Requires restart every 15 days after an OOM
>  Case 2, reported by [~Echo Lee]:
> * Flink 1.11.0
> * Java 11
> * G1GC
> * WordCount Batch job, submitted every second / every 5 minutes
> * eventually fails TaskExecutor with OOM
> Case 3, reported by [~DaDaShen]
> * Flink 1.11.0
> * Java 11
> * WordCount Batch job, submitted every 5 seconds
> * growing Metaspace, eventually OOM
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19013) Log start/end of state restoration

2020-08-25 Thread Matt Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17184098#comment-17184098
 ] 

Matt Wang commented on FLINK-19013:
---

Agreed, it would be useful while debugging. 
Hi [~chesnay], I'm interested in this, can I take this? 

> Log start/end of state restoration
> --
>
> Key: FLINK-19013
> URL: https://issues.apache.org/jira/browse/FLINK-19013
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Chesnay Schepler
>Priority: Major
> Fix For: 1.12.0
>
>
> State restoration can take a significant amount of time if the state is large 
> enough, or in special cases like FLINK-19008.
> It would be useful for debugging if we'd log the start/end of 
> {{RestoreOperation#restore}}.
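
A minimal sketch of what such logging could look like, assuming SLF4J and an 
illustrative wrapper type (the real change would live inside Flink's restore 
operation implementations):

```java
import java.util.concurrent.Callable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical wrapper for illustration; names are not Flink's actual classes.
public class LoggingRestore<R> {

    private static final Logger LOG = LoggerFactory.getLogger(LoggingRestore.class);

    private final Callable<R> restoreOperation;
    private final String backendName;

    public LoggingRestore(Callable<R> restoreOperation, String backendName) {
        this.restoreOperation = restoreOperation;
        this.backendName = backendName;
    }

    public R restore() throws Exception {
        LOG.info("Starting state restoration for backend {}.", backendName);
        long start = System.nanoTime();
        try {
            return restoreOperation.call();
        } finally {
            long durationMs = (System.nanoTime() - start) / 1_000_000;
            LOG.info("Finished state restoration for backend {} in {} ms.",
                    backendName, durationMs);
        }
    }
}
```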



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] tillrohrmann commented on a change in pull request #13186: [FLINK-18720][k8s] Migrate KubernetesResourceManager to the new KubernetesResourceManagerDriver

2020-08-25 Thread GitBox


tillrohrmann commented on a change in pull request #13186:
URL: https://github.com/apache/flink/pull/13186#discussion_r476473271



##
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java
##
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import 
org.apache.flink.kubernetes.configuration.KubernetesResourceManagerConfiguration;
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import 
org.apache.flink.kubernetes.kubeclient.factory.KubernetesTaskManagerFactory;
+import 
org.apache.flink.kubernetes.kubeclient.parameters.KubernetesTaskManagerParameters;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesWatch;
+import org.apache.flink.kubernetes.utils.KubernetesUtils;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.externalresource.ExternalResourceUtils;
+import 
org.apache.flink.runtime.resourcemanager.active.AbstractResourceManagerDriver;
+import org.apache.flink.runtime.resourcemanager.active.ResourceManagerDriver;
+import 
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Implementation of {@link ResourceManagerDriver} for Kubernetes deployment.
+ */
+public class KubernetesResourceManagerDriver extends 
AbstractResourceManagerDriver<KubernetesWorkerNode>
+   implements FlinkKubeClient.PodCallbackHandler {
+
+   /** The taskmanager pod name pattern is 
{clusterId}-{taskmanager}-{attemptId}-{podIndex}. */
+   private static final String TASK_MANAGER_POD_FORMAT = 
"%s-taskmanager-%d-%d";
+
+   private final String clusterId;
+
+   private final FlinkKubeClient kubeClient;
+
+   /** Request resource futures, keyed by pod names. */
+   private final Map<String, CompletableFuture<KubernetesWorkerNode>> 
requestResourceFutures;
+
+   /** When the ResourceManager fails over, the max attempt ID should be recovered. */
+   private long currentMaxAttemptId = 0;
+
+   /** Current max pod index. When creating a new pod, it should increase by 
one. */
+   private long currentMaxPodId = 0;
+
+   private KubernetesWatch podsWatch;
+
+   public KubernetesResourceManagerDriver(
+   Configuration flinkConfig,
+   FlinkKubeClient kubeClient,
+   KubernetesResourceManagerConfiguration configuration) {
+   super(flinkConfig, GlobalConfiguration.loadConfiguration());
+
+   this.clusterId = 
Preconditions.checkNotNull(configuration.getClusterId());
+   this.kubeClient = Preconditions.checkNotNull(kubeClient);
+   this.requestResourceFutures = new HashMap<>();
+   }
+
+   // 

+   //  ResourceManagerDriver
+   // 

+
+   @Override
+   protected void initializeInternal() throws Exception {
+   recoverWorkerNodesFromPreviousAttempts();
+
+   podsWatch = kubeClient.watchPodsAndDoCallback(
+   KubernetesUtils.getTaskManagerLabels(clusterId),
+

[GitHub] [flink] flinkbot commented on pull request #13241: [hotfix] Fix a typo in the watermark docs

2020-08-25 Thread GitBox


flinkbot commented on pull request #13241:
URL: https://github.com/apache/flink/pull/13241#issuecomment-680066941


   
   ## CI report:
   
   * b8271b12c25e16aab38b9beb58d0f29835bee01a UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #13217: [FLINK-16866] Make job submission non-blocking

2020-08-25 Thread GitBox


flinkbot edited a comment on pull request #13217:
URL: https://github.com/apache/flink/pull/13217#issuecomment-678285884


   
   ## CI report:
   
   * 3655fcea1966bfbcb85c86d6a159c354f20d6cc7 UNKNOWN
   * 52a49f0b0840aa9220de72d64c50a6b33f6adf92 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5832)
 
   * 805f3142619f652b96804ab3c65beca2ba50f5d4 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot commented on pull request #13241: [hotfix] Fix a typo in the watermark docs

2020-08-25 Thread GitBox


flinkbot commented on pull request #13241:
URL: https://github.com/apache/flink/pull/13241#issuecomment-680063288


   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit b8271b12c25e16aab38b9beb58d0f29835bee01a (Tue Aug 25 
14:34:15 UTC 2020)
   
   **Warnings:**
* Documentation files were touched, but no `.zh.md` files: Update Chinese 
documentation or file Jira ticket.
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] afedulov opened a new pull request #13241: [hotfix] Fix a typo in the watermark docs

2020-08-25 Thread GitBox


afedulov opened a new pull request #13241:
URL: https://github.com/apache/flink/pull/13241


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (FLINK-19043) Translate the 'Logging' page of 'Debugging & Monitoring' into Chinese

2020-08-25 Thread Roc Marshal (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19043?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Roc Marshal updated FLINK-19043:

Summary: Translate the 'Logging' page of 'Debugging & Monitoring' into 
Chinese  (was: Translate the page 'Logging' of 'Debugging & Monitoring' into 
Chinese)

> Translate the 'Logging' page of 'Debugging & Monitoring' into Chinese
> -
>
> Key: FLINK-19043
> URL: https://issues.apache.org/jira/browse/FLINK-19043
> Project: Flink
>  Issue Type: Improvement
>  Components: chinese-translation, Documentation
>Affects Versions: 1.10.0, 1.10.1, 1.11.0, 1.10.2, 1.11.1
>Reporter: Roc Marshal
>Assignee: Roc Marshal
>Priority: Major
>  Labels: Documentation, Translation, translation-zh
>
> The page url is : 
> [Logging|https://ci.apache.org/projects/flink/flink-docs-release-1.11/monitoring/logging.html]
>  The markdown file location is : flink/docs/monitoring/logging.zh.md



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on pull request #13238: [FLINK-19042][hive] Remove print table sink from HiveTableSourceITCas…

2020-08-25 Thread GitBox


flinkbot edited a comment on pull request #13238:
URL: https://github.com/apache/flink/pull/13238#issuecomment-679946137


   
   ## CI report:
   
   * e85e2428cac727ca774025ca106d2cb0c319e78e Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5852)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-18695) Allow NettyBufferPool to allocate heap buffers

2020-08-25 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18695?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17184089#comment-17184089
 ] 

Yun Gao commented on FLINK-18695:
-

Hi [~chesnay], I also agree with you that there are both cases in the 
implementation of SslHandler. I mentioned the PREFER_DIRECT flag in the test 
since it seems to have some impact on the overall direct memory used, but it 
should not be a barrier for the current issue and we could consider it 
separately.

For enabling heap buffers, another possible option from my side might be to 
keep PREFER_DIRECT = true unchanged and allocate 0 heap arenas. In this case 
heap buffers would be allocated directly from the process's memory without 
caching (since the cache causes more memory consumption). This does change the 
footprint of the heap memory, but since the actually used amount of heap 
memory seems to be limited (several hundred KB) according to the current 
tests, I'm not sure if it is acceptable.
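
A small sketch of that option, assuming Netty 4.1's allocator metrics API: 
with zero heap arenas there is no thread-local heap caching, so every 
heapBuffer() call allocates an unpooled buffer straight from the JVM heap.

```java
import io.netty.buffer.PooledByteBufAllocator;

public class HeapArenaMetricCheck {
    public static void main(String[] args) {
        // PREFER_DIRECT stays true; only the heap arena count changes.
        PooledByteBufAllocator allocator = new PooledByteBufAllocator(
                true, 0,
                PooledByteBufAllocator.defaultNumDirectArena(),
                PooledByteBufAllocator.defaultPageSize(),
                PooledByteBufAllocator.defaultMaxOrder());

        // Prints 0: no pooled heap arenas, hence no heap-side arena cache.
        System.out.println("heap arenas: " + allocator.metric().numHeapArenas());
    }
}
```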

> Allow NettyBufferPool to allocate heap buffers
> --
>
> Key: FLINK-18695
> URL: https://issues.apache.org/jira/browse/FLINK-18695
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Reporter: Chesnay Schepler
>Assignee: Yun Gao
>Priority: Major
> Fix For: 1.12.0
>
>
> In 4.1.43 Netty made a change to their SslHandler to always use heap buffers 
> for JDK SSLEngine implementations, to avoid an additional memory copy.
> However, our {{NettyBufferPool}} forbids heap buffer allocations.
> We will either have to allow heap buffer allocations, or create a custom 
> SslHandler implementation that does not use heap buffers (although this seems 
> ill-advised?).
> /cc [~sewen] [~uce] [~NicoK] [~zjwang] [~pnowojski]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on pull request #13239: [FLINK-19021][network] Various cleanups and refactorings of the ResultPartition

2020-08-25 Thread GitBox


flinkbot edited a comment on pull request #13239:
URL: https://github.com/apache/flink/pull/13239#issuecomment-679946229


   
   ## CI report:
   
   * af13efd2abeedd3f3d3a7db9e28d6a32456f8e52 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5853)
 
   * fbf7c39bb2d6cb79f52a93efabe310cb2bafdee9 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5858)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (FLINK-19041) Add dependency management for ConnectedStream in Python DataStream API.

2020-08-25 Thread Hequn Cheng (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19041?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hequn Cheng updated FLINK-19041:

Affects Version/s: 1.12.0

> Add dependency management for ConnectedStream in Python DataStream API.
> ---
>
> Key: FLINK-19041
> URL: https://issues.apache.org/jira/browse/FLINK-19041
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.12.0
>Reporter: Shuiqiang Chen
>Assignee: Shuiqiang Chen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.12.0
>
>
> We failed to set merged configurations into 
> DataStreamTwoInputPythonStatelessFunctionOperator when finally generating the 
> StreamGraph.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-18695) Allow NettyBufferPool to allocate heap buffers

2020-08-25 Thread Chesnay Schepler (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18695?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17184076#comment-17184076
 ] 

Chesnay Schepler commented on FLINK-18695:
--

[~gaoyunhaii] there are certainly cases where PREFER_DIRECT can have an effect; 
it's just that there are 
[others|https://github.com/netty/netty/blob/8c5b72aaf02e7f349a9972dd9179b449b5a6067b/handler/src/main/java/io/netty/handler/ssl/SslHandler.java#L312]
 
[where|https://github.com/netty/netty/blob/8c5b72aaf02e7f349a9972dd9179b449b5a6067b/handler/src/main/java/io/netty/handler/ssl/SslHandler.java#L22369]
 it might not. The first instance specifically is the one that was changed in 
[this 
commit|https://github.com/netty/netty/commit/39cc7a673939dec96258ff27f5b1874671838af0],
 which appears to be the crux of the issue.

> Allow NettyBufferPool to allocate heap buffers
> --
>
> Key: FLINK-18695
> URL: https://issues.apache.org/jira/browse/FLINK-18695
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Reporter: Chesnay Schepler
>Assignee: Yun Gao
>Priority: Major
> Fix For: 1.12.0
>
>
> In 4.1.43 Netty made a change to their SslHandler to always use heap buffers 
> for JDK SSLEngine implementations, to avoid an additional memory copy.
> However, our {{NettyBufferPool}} forbids heap buffer allocations.
> We will either have to allow heap buffer allocations, or create a custom 
> SslHandler implementation that does not use heap buffers (although this seems 
> ill-advised?).
> /cc [~sewen] [~uce] [~NicoK] [~zjwang] [~pnowojski]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


  1   2   3   >