[GitHub] [flink] klion26 commented on a change in pull request #13212: [FLINK-18973][docs-zh] Translate the 'History Server' page of 'Debugging & Monitoring' into Chinese
klion26 commented on a change in pull request #13212: URL: https://github.com/apache/flink/pull/13212#discussion_r477046283

## File path: docs/monitoring/historyserver.zh.md

## @@ -22,62 +22,67 @@ specific language governing permissions and limitations under the License. -->
-Flink has a history server that can be used to query the statistics of completed jobs after the corresponding Flink cluster has been shut down.
+Flink 提供了 history server,可以在相应的 Flink 集群关闭之后查询已完成作业的统计信息。
-Furthermore, it exposes a REST API that accepts HTTP requests and responds with JSON data.
+此外,它暴露了一套 REST API,该 API 接受 HTTP 请求并以 JSON 数据格式进行响应。
 * This will be replaced by the TOC {:toc}
-## Overview
+## 概览
-The HistoryServer allows you to query the status and statistics of completed jobs that have been archived by a JobManager.
+HistoryServer 允许查询 JobManager 存档的已完成作业的状态和统计信息。
-After you have configured the HistoryServer *and* JobManager, you start and stop the HistoryServer via its corresponding startup script:
+在配置 HistoryServer *和* JobManager 之后,你可以使用其相应的启动脚本来启动和停止 HistoryServer:
 {% highlight shell %}
-# Start or stop the HistoryServer
+# 启动或者停止 HistoryServer
 bin/historyserver.sh (start|start-foreground|stop)
 {% endhighlight %}
-By default, this server binds to `localhost` and listens at port `8082`.
+默认情况下,此服务器绑定到 `localhost` 并监听 `8082` 端口。
-Currently, you can only run it as a standalone process.
+目前,只能将 HistoryServer 作为独立的进程运行。
-## Configuration
+## 配置参数
-The configuration keys `jobmanager.archive.fs.dir` and `historyserver.archive.fs.refresh-interval` need to be adjusted for archiving and displaying archived jobs.
+配置项 `jobmanager.archive.fs.dir` 和 `historyserver.archive.fs.refresh-interval` 需要根据归档路径和已归档作业刷新间隔进行调整。

On `+此外,它暴露了一套 REST API,该 API 接受 HTTP 请求并以 JSON 数据格式进行响应。`
Review comment: Would `并返回 JSON 格式的数据` be better?

On `+在配置 HistoryServer *和* JobManager 之后,你可以使用其相应的启动脚本来启动和停止 HistoryServer:`
Review comment: Would `你可以使用相应的脚本来启动和停止 HistoryServer` be better?

On `+默认情况下,此服务器绑定到 `localhost` 并监听 `8082` 端口。`
Review comment: Would "此服务器绑定到 `localhost` 的 `8082` 端口" read better?

On `+配置项 `jobmanager.archive.fs.dir` 和 `historyserver.archive.fs.refresh-interval` 需要根据归档路径和已归档作业刷新间隔进行调整。`
Review comment: The meaning of `和已归档作业刷新间隔进行调整` comes through, but it reads a bit awkwardly; is there a better way to phrase this? Let's also see whether others have better suggestions.
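The two configuration keys discussed in the last review comment are flink-conf.yaml entries. As a hedged sketch of how they are typically set together (the paths and interval below are illustrative, not taken from the PR), note that the HistoryServer also needs its own `historyserver.archive.fs.dir` pointing at the same location the JobManager archives to:

```yaml
# Directory into which the JobManager writes archives of completed jobs
# (illustrative HDFS path; any Flink-supported filesystem works)
jobmanager.archive.fs.dir: hdfs:///completed-jobs/

# Directory the HistoryServer monitors for new job archives
historyserver.archive.fs.dir: hdfs:///completed-jobs/

# How often (milliseconds) the HistoryServer polls for new archives
historyserver.archive.fs.refresh-interval: 10000
```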
[jira] [Closed] (FLINK-11712) Add cost model for both batch and streaming
[ https://issues.apache.org/jira/browse/FLINK-11712?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

godfrey he closed FLINK-11712.
Resolution: Resolved

> Add cost model for both batch and streaming
> ---
>
> Key: FLINK-11712
> URL: https://issues.apache.org/jira/browse/FLINK-11712
> Project: Flink
> Issue Type: New Feature
> Components: Table SQL / Planner
> Reporter: godfrey he
> Priority: Major

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink-statefun] billyrrr commented on pull request #131: [FLINK-18968] Translate README.md to Chinese
billyrrr commented on pull request #131: URL: https://github.com/apache/flink-statefun/pull/131#issuecomment-680671983

Hi, @carp84 Thank you for the review. I think that "stateful functions" should be translated into Chinese to promote communication among people who primarily speak Chinese. Suppose that someone is proficient at processing stateful streams but not proficient in English; they would not be able to notice the target use case of Flink Stateful Functions. On another note, when someone wants to refer Flink Stateful Functions to another person, it is easier to use the term 'Flink有状态函数' in their native language rather than 'Flink Stateful Functions'. Nevertheless, the English term on its own has its merits of accuracy and ease of finding related documentation. So I do understand your point too. I think there is more benefit in translating 'Flink Stateful Functions' to 'Flink有状态函数'. Can we discuss this decision further? Thanks!

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #13225: [FLINK-18974][docs-zh]Translate the 'User-Defined Functions' page of "Application Development's DataStream API" into Chinese
flinkbot edited a comment on pull request #13225: URL: https://github.com/apache/flink/pull/13225#issuecomment-678953566

## CI report:
* 6bfd27b44774c770a2d28ee9ca1bde0e11bfbe29 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5870)

Bot commands
The @flinkbot bot supports the following commands:
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
[jira] [Commented] (FLINK-19025) table sql write orc file but hive2.1.1 can not read
[ https://issues.apache.org/jira/browse/FLINK-19025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17184947#comment-17184947 ] Rui Li commented on FLINK-19025: [~McClone] The stacktrace indicates you're writing streaming data. Therefore you'll need FLINK-18659 to fix the issue. > table sql write orc file but hive2.1.1 can not read > --- > > Key: FLINK-19025 > URL: https://issues.apache.org/jira/browse/FLINK-19025 > Project: Flink > Issue Type: Bug > Components: Connectors / ORC >Affects Versions: 1.11.0 >Reporter: McClone >Priority: Major > > table sql write orc file but hive2.1.1 create external table can not read > data.Because flink use orc-core-1.5.6.jar but hive 2.1.1 use his own orcfile > jar. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-19025) table sql write orc file but hive2.1.1 can not read
[ https://issues.apache.org/jira/browse/FLINK-19025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17184946#comment-17184946 ]

McClone commented on FLINK-19025:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/orc/PhysicalWriter
 at org.apache.flink.table.catalog.hive.client.HiveShimV200.createOrcBulkWriterFactory(HiveShimV200.java:60)
 at org.apache.flink.connectors.hive.HiveTableSink.createBulkWriterFactory(HiveTableSink.java:251)
 at org.apache.flink.connectors.hive.HiveTableSink.consumeDataStream(HiveTableSink.java:192)
 at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:114)
 at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48)
 at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
 at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48)
 at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:67)
 at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66)
 at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at scala.collection.Iterator$class.foreach(Iterator.scala:891)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
 at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
 at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
 at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
 at scala.collection.AbstractTraversable.map(Traversable.scala:104)
 at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:66)
 at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:166)
 at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1248)
 at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:694)
 at org.apache.flink.table.api.internal.StatementSetImpl.execute(StatementSetImpl.java:97)
 at com.galaxy.sunny.flink.StartTableJob.main(StartTableJob.java:45)
Caused by: java.lang.ClassNotFoundException: org.apache.orc.PhysicalWriter
 at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
 at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
 ... 23 more
[GitHub] [flink] flinkbot edited a comment on pull request #13225: [FLINK-18974][docs-zh]Translate the 'User-Defined Functions' page of "Application Development's DataStream API" into Chinese
flinkbot edited a comment on pull request #13225: URL: https://github.com/apache/flink/pull/13225#issuecomment-678953566

## CI report:
* 40658a82560ebc79ee2047a789db4fa003115099 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5808)
* 6bfd27b44774c770a2d28ee9ca1bde0e11bfbe29 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5870)
[jira] [Commented] (FLINK-19022) AkkaRpcActor failed to start but no exception information
[ https://issues.apache.org/jira/browse/FLINK-19022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17184921#comment-17184921 ]

tartarus commented on FLINK-19022:

[~trohrmann] I want to confirm the little details. We pass a {{FatalErrorHandler}} to the {{DispatcherResourceManagerComponent}}, and then we need to remove the {{FatalErrorHandler}} from {{ResourceManager}} and {{Dispatcher}}? Just register the {{TerminationFuture}} of {{ResourceManager}} and {{Dispatcher}} to {{DispatcherResourceManagerComponent}}.

> AkkaRpcActor failed to start but no exception information
> -
>
> Key: FLINK-19022
> URL: https://issues.apache.org/jira/browse/FLINK-19022
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Coordination
> Affects Versions: 1.10.0, 1.12.0, 1.11.1
> Reporter: tartarus
> Assignee: tartarus
> Priority: Critical
> Fix For: 1.12.0, 1.11.2, 1.10.3
>
> My job appeared that JM could not start normally, and the JM container was finally killed by RM.
> In the end, I found through debug that AkkaRpcActor failed to start because the version of yarn in my job was incompatible with the version in the cluster.
> [AkkaRpcActor exception handling|https://github.com/apache/flink/blob/478c9657fe1240acdc1eb08ad32ea93e08b0cd5e/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java#L550]
> I add log printing here, and then found the specific problem.
> {code:java}
> 2020-08-21 21:31:16,985 ERROR
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState
> [flink-akka.actor.default-dispatcher-4] - Could not start RpcEndpoint
> resourcemanager.
> java.lang.NoSuchMethodError: > org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB.registerApplicationMaster(Lcom/google/protobuf/RpcController;Lorg/apache/hadoop/yarn/proto/YarnServiceProtos$RegisterApplicationMasterRequestProto;)Lorg/apache/hadoop/yarn/proto/YarnServiceProtos$RegisterApplicationMasterResponseProto; > at > org.apache.hadoop.yarn.api.impl.pb.client.ApplicationMasterProtocolPBClientImpl.registerApplicationMaster(ApplicationMasterProtocolPBClientImpl.java:106) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191) > at > org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102) > at com.sun.proxy.$Proxy25.registerApplicationMaster(Unknown Source) > at > org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.registerApplicationMaster(AMRMClientImpl.java:222) > at > org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.registerApplicationMaster(AMRMClientImpl.java:214) > at > org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl.registerApplicationMaster(AMRMClientAsyncImpl.java:138) > at > org.apache.flink.yarn.YarnResourceManager.createAndStartResourceManagerClient(YarnResourceManager.java:229) > at > org.apache.flink.yarn.YarnResourceManager.initialize(YarnResourceManager.java:262) > at > org.apache.flink.runtime.resourcemanager.ResourceManager.startResourceManagerServices(ResourceManager.java:204) > at > org.apache.flink.runtime.resourcemanager.ResourceManager.onStart(ResourceManager.java:192) > at > org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStart(RpcEndpoint.java:185) > at > 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:544) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:169) > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) > at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) > at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) > at akka.actor.Actor$class.aroundReceive(Actor.scala:517) > at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) > at akka.actor.ActorCell.invoke(ActorCell.scala:561) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) > at akka.dispatch.Mailbox.run(Mailbox.scala:225) > at akka.dispatch.Mailbox.exec(Mailbox.scala:235) >
[GitHub] [flink] flinkbot edited a comment on pull request #13225: [FLINK-18974][docs-zh]Translate the 'User-Defined Functions' page of "Application Development's DataStream API" into Chinese
flinkbot edited a comment on pull request #13225: URL: https://github.com/apache/flink/pull/13225#issuecomment-678953566

## CI report:
* 40658a82560ebc79ee2047a789db4fa003115099 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5808)
* 6bfd27b44774c770a2d28ee9ca1bde0e11bfbe29 UNKNOWN
[GitHub] [flink] flinkbot edited a comment on pull request #13244: [FLINK-18333][jdbc] UnsignedTypeConversionITCase failed caused by MariaDB4j "Asked to waitFor Program"
flinkbot edited a comment on pull request #13244: URL: https://github.com/apache/flink/pull/13244#issuecomment-680535784

## CI report:
* f5fee133d28436ab73b03d05b1f6fadc8648f5d0 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5869)
[jira] [Commented] (FLINK-19025) table sql write orc file but hive2.1.1 can not read
[ https://issues.apache.org/jira/browse/FLINK-19025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17184918#comment-17184918 ]

Rui Li commented on FLINK-19025:

Hey [~McClone], are you writing streaming data into a hive orc table? If so, there's indeed a known issue which has been fixed in FLINK-18659. You should be able to run this use case if you apply that patch and set {{table.exec.hive.fallback-mapred-writer=true}}, which is the default setting. If you hit the issue when writing batch data into hive, please provide the stacktrace of the exception.
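For context, the setting Rui Li refers to is a table-config option; in the SQL client it can be toggled like this (a sketch only; per the comment it already defaults to true, and the FLINK-18659 patch is assumed to be applied):

```sql
-- Fall back to Hive's mapred record writer instead of Flink's native ORC
-- writer, so the produced files use the ORC version bundled with Hive.
SET table.exec.hive.fallback-mapred-writer=true;
```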
[GitHub] [flink] zhuxiaoshang commented on pull request #13225: [FLINK-18974][docs-zh]Translate the 'User-Defined Functions' page of "Application Development's DataStream API" into Chinese
zhuxiaoshang commented on pull request #13225: URL: https://github.com/apache/flink/pull/13225#issuecomment-680556966

Thanks for your review, @RocMarshal. I have made some changes according to your suggestions.
[GitHub] [flink] zhuxiaoshang commented on a change in pull request #13225: [FLINK-18974][docs-zh]Translate the 'User-Defined Functions' page of "Application Development's DataStream API" into Chinese
zhuxiaoshang commented on a change in pull request #13225: URL: https://github.com/apache/flink/pull/13225#discussion_r477020865

## File path: docs/dev/user_defined_functions.zh.md

## @@ -147,95 +142,75 @@ data.map (new RichMapFunction[String, Int] {
-Rich functions provide, in addition to the user-defined function (map, reduce, etc), four methods: `open`, `close`, `getRuntimeContext`, and `setRuntimeContext`. These are useful for parameterizing the function (see [Passing Parameters to Functions]({{ site.baseurl }}/dev/batch/index.html#passing-parameters-to-functions)), creating and finalizing local state, accessing broadcast variables (see [Broadcast Variables]({{ site.baseurl }}/dev/batch/index.html#broadcast-variables)), and for accessing runtime information such as accumulators and counters (see [Accumulators and Counters](#accumulators--counters)), and information on iterations (see [Iterations]({{ site.baseurl }}/dev/batch/iterations.html)).
+除了用户自定义的功能(map,reduce 等),富函数还提供了四个方法:`open`、`close`、`getRuntimeContext` 和 `setRuntimeContext`。这些对于参数化功能很有用(参阅 [给函数传递参数]({{ site.baseurl }}/zh/dev/batch/index.html#passing-parameters-to-functions)),创建和最终确定本地状态,访问广播变量(参阅 [广播变量]({{ site.baseurl }}/zh/dev/batch/index.html#broadcast-variables)),以及访问运行时信息,例如累加器和计数器(参阅

Review comment: These link tags link to the correct pages, so I think it's unnecessary to change them.
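The open/close lifecycle the quoted passage describes can be sketched without any Flink dependency. The classes below (`RichFunctionSketch`, `LengthMapper`, the stand-in `RuntimeContext`) are invented for illustration; they model the pattern only and are not Flink's actual API:

```java
import java.util.ArrayList;
import java.util.List;

public class RichFunctionSketch {
    // Stand-in for Flink's RuntimeContext (the real one exposes accumulators,
    // broadcast variables, task info, etc.)
    static class RuntimeContext {
        final String taskName;
        RuntimeContext(String taskName) { this.taskName = taskName; }
    }

    // Toy model of a rich function: user logic plus lifecycle hooks.
    static abstract class RichMapFunction<I, O> {
        private RuntimeContext ctx;
        public void setRuntimeContext(RuntimeContext ctx) { this.ctx = ctx; }
        public RuntimeContext getRuntimeContext() { return ctx; }
        public void open() {}   // called once before any map() call
        public void close() {}  // called once after the last map() call
        public abstract O map(I value);
    }

    static class LengthMapper extends RichMapFunction<String, Integer> {
        List<Integer> localState;
        @Override public void open() { localState = new ArrayList<>(); }   // create local state
        @Override public Integer map(String s) {
            int n = s.length();
            localState.add(n);
            return n;
        }
        @Override public void close() { localState = null; }               // release local state
    }

    // Drives the lifecycle the way a task runner would.
    static List<Integer> run(RichMapFunction<String, Integer> f, List<String> input) {
        f.setRuntimeContext(new RuntimeContext("map-task-0"));
        f.open();
        List<Integer> out = new ArrayList<>();
        for (String s : input) out.add(f.map(s));
        f.close();
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run(new LengthMapper(), List.of("flink", "doc"))); // prints [5, 3]
    }
}
```

In the real API, parameterization and state setup belong in `open` rather than the constructor, because the function object is serialized and shipped to the workers before the lifecycle starts.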
[GitHub] [flink] flinkbot commented on pull request #13244: [FLINK-18333][jdbc] UnsignedTypeConversionITCase failed caused by MariaDB4j "Asked to waitFor Program"
flinkbot commented on pull request #13244: URL: https://github.com/apache/flink/pull/13244#issuecomment-680535784

## CI report:
* f5fee133d28436ab73b03d05b1f6fadc8648f5d0 UNKNOWN
[GitHub] [flink] flinkbot commented on pull request #13244: [FLINK-18333][jdbc] UnsignedTypeConversionITCase failed caused by MariaDB4j "Asked to waitFor Program"
flinkbot commented on pull request #13244: URL: https://github.com/apache/flink/pull/13244#issuecomment-680526208

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review.

## Automated Checks
Last check on commit f5fee133d28436ab73b03d05b1f6fadc8648f5d0 (Wed Aug 26 03:47:11 UTC 2020)

**Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

## Review Progress
* ❓ 1. The [description] looks good.
* ❓ 2. There is [consensus] that the contribution should go into Flink.
* ❓ 3. Needs [attention] from.
* ❓ 4. The change fits into the overall [architecture].
* ❓ 5. Overall code [quality] is good.

Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands
The @flinkbot bot supports the following commands:
- `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until `architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
[jira] [Updated] (FLINK-18333) UnsignedTypeConversionITCase failed caused by MariaDB4j "Asked to waitFor Program"
[ https://issues.apache.org/jira/browse/FLINK-18333?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-18333: --- Labels: pull-request-available test-stability (was: test-stability) > UnsignedTypeConversionITCase failed caused by MariaDB4j "Asked to waitFor > Program" > -- > > Key: FLINK-18333 > URL: https://issues.apache.org/jira/browse/FLINK-18333 > Project: Flink > Issue Type: Bug > Components: Connectors / JDBC, Table SQL / Ecosystem >Affects Versions: 1.12.0 >Reporter: Jark Wu >Assignee: Leonard Xu >Priority: Critical > Labels: pull-request-available, test-stability > > https://dev.azure.com/rmetzger/Flink/_build/results?buildId=8173=logs=66592496-52df-56bb-d03e-37509e1d9d0f=ae0269db-6796-5583-2e5f-d84757d711aa > {code} > 2020-06-16T08:23:26.3013987Z [INFO] Running > org.apache.flink.connector.jdbc.table.JdbcDynamicTableSourceITCase > 2020-06-16T08:23:30.2252334Z Tue Jun 16 08:23:30 UTC 2020 Thread[main,5,main] > java.lang.NoSuchFieldException: DEV_NULL > 2020-06-16T08:23:31.2907920Z > > 2020-06-16T08:23:31.2913806Z Tue Jun 16 08:23:30 UTC 2020: > 2020-06-16T08:23:31.2914839Z Booting Derby version The Apache Software > Foundation - Apache Derby - 10.14.2.0 - (1828579): instance > a816c00e-0172-bc39-e4b1-0e4ce818 > 2020-06-16T08:23:31.2915845Z on database directory > memory:/__w/1/s/flink-connectors/flink-connector-jdbc/target/test with class > loader sun.misc.Launcher$AppClassLoader@677327b6 > 2020-06-16T08:23:31.2916637Z Loaded from > file:/__w/1/.m2/repository/org/apache/derby/derby/10.14.2.0/derby-10.14.2.0.jar > 2020-06-16T08:23:31.2916968Z java.vendor=Private Build > 2020-06-16T08:23:31.2917461Z > java.runtime.version=1.8.0_242-8u242-b08-0ubuntu3~16.04-b08 > 2020-06-16T08:23:31.2922200Z > user.dir=/__w/1/s/flink-connectors/flink-connector-jdbc/target > 2020-06-16T08:23:31.2922516Z os.name=Linux > 2020-06-16T08:23:31.2922709Z os.arch=amd64 > 2020-06-16T08:23:31.2923086Z os.version=4.15.0-1083-azure > 
2020-06-16T08:23:31.2923316Z derby.system.home=null > 2020-06-16T08:23:31.2923616Z > derby.stream.error.field=org.apache.flink.connector.jdbc.JdbcTestBase.DEV_NULL > 2020-06-16T08:23:31.2924790Z Database Class Loader started - > derby.database.classpath='' > 2020-06-16T08:23:37.4354243Z [INFO] Tests run: 2, Failures: 0, Errors: 0, > Skipped: 0, Time elapsed: 11.133 s - in > org.apache.flink.connector.jdbc.table.JdbcDynamicTableSourceITCase > 2020-06-16T08:23:38.1880075Z [INFO] Running > org.apache.flink.connector.jdbc.table.JdbcTableSourceITCase > 2020-06-16T08:23:41.3718038Z Tue Jun 16 08:23:41 UTC 2020 Thread[main,5,main] > java.lang.NoSuchFieldException: DEV_NULL > 2020-06-16T08:23:41.4383244Z > > 2020-06-16T08:23:41.4401761Z Tue Jun 16 08:23:41 UTC 2020: > 2020-06-16T08:23:41.4402797Z Booting Derby version The Apache Software > Foundation - Apache Derby - 10.14.2.0 - (1828579): instance > a816c00e-0172-bc3a-103b-0e4b0610 > 2020-06-16T08:23:41.4403758Z on database directory > memory:/__w/1/s/flink-connectors/flink-connector-jdbc/target/test with class > loader sun.misc.Launcher$AppClassLoader@677327b6 > 2020-06-16T08:23:41.4404581Z Loaded from > file:/__w/1/.m2/repository/org/apache/derby/derby/10.14.2.0/derby-10.14.2.0.jar > 2020-06-16T08:23:41.4404945Z java.vendor=Private Build > 2020-06-16T08:23:41.4405497Z > java.runtime.version=1.8.0_242-8u242-b08-0ubuntu3~16.04-b08 > 2020-06-16T08:23:41.4406048Z > user.dir=/__w/1/s/flink-connectors/flink-connector-jdbc/target > 2020-06-16T08:23:41.4406303Z os.name=Linux > 2020-06-16T08:23:41.4406494Z os.arch=amd64 > 2020-06-16T08:23:41.4406878Z os.version=4.15.0-1083-azure > 2020-06-16T08:23:41.4407097Z derby.system.home=null > 2020-06-16T08:23:41.4407415Z > derby.stream.error.field=org.apache.flink.connector.jdbc.JdbcTestBase.DEV_NULL > 2020-06-16T08:23:41.5287219Z Database Class Loader started - > derby.database.classpath='' > 2020-06-16T08:23:46.4567063Z [ERROR] Tests run: 1, Failures: 0, Errors: 1, > Skipped: 0, Time 
elapsed: 23.729 s <<< FAILURE! - in > org.apache.flink.connector.jdbc.table.UnsignedTypeConversionITCase > 2020-06-16T08:23:46.4575785Z [ERROR] > org.apache.flink.connector.jdbc.table.UnsignedTypeConversionITCase Time > elapsed: 23.729 s <<< ERROR! > 2020-06-16T08:23:46.4576490Z ch.vorburger.exec.ManagedProcessException: An > error occurred while running a command: create database if not exists `test`; > 2020-06-16T08:23:46.4577193Z at ch.vorburger.mariadb4j.DB.run(DB.java:300) > 2020-06-16T08:23:46.4577537Z at ch.vorburger.mariadb4j.DB.run(DB.java:265) > 2020-06-16T08:23:46.4577861Z
[GitHub] [flink] leonardBang opened a new pull request #13244: [FLINK-18333][jdbc] UnsignedTypeConversionITCase failed caused by MariaDB4j "Asked to waitFor Program"
leonardBang opened a new pull request #13244: URL: https://github.com/apache/flink/pull/13244

## What is the purpose of the change
* This pull request fixes the unstable test UnsignedTypeConversionITCase.

## Brief change log
- Avoid using MariaDB4jRule to create a DB instance, because MariaDB4jRule pulls up a `ManagedProcess` to run the create-db statement, which may exit unexpectedly.
- Add retry logic for initializing the DB instance.

## Verifying this change
This change is an unstable-test fix.

## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)

## Documentation
- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
[GitHub] [flink] flinkbot edited a comment on pull request #13243: [FLINK-18992][table] Fix Table API renameColumns method annotation error
flinkbot edited a comment on pull request #13243: URL: https://github.com/apache/flink/pull/13243#issuecomment-680489296 ## CI report: * 7abb7dec5769d7d9569b17de97c477c989f67ed2 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5868) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink-benchmarks] zhijiangW commented on a change in pull request #2: [FLINK-19003][checkpointing] Add micro-benchmark for unaligned checkpoints
zhijiangW commented on a change in pull request #2: URL: https://github.com/apache/flink-benchmarks/pull/2#discussion_r477009866 ## File path: src/main/java/org/apache/flink/benchmark/RemoteChannelThroughputBenchmark.java ## @@ -80,6 +84,7 @@ public void remoteRebalance(FlinkEnvironmentContext context) throws Exception { StreamExecutionEnvironment env = context.env; env.enableCheckpointing(CHECKPOINT_INTERVAL_MS); env.setParallelism(PARALLELISM); + env.getCheckpointConfig().enableUnalignedCheckpoints(!mode.equals("AlignedCheckpoint")); Review comment: Either option sounds fine to me. But where is the codespeed UI and can I modify it?
[GitHub] [flink] flinkbot edited a comment on pull request #13186: [FLINK-18720][k8s] Migrate KubernetesResourceManager to the new KubernetesResourceManagerDriver
flinkbot edited a comment on pull request #13186: URL: https://github.com/apache/flink/pull/13186#issuecomment-675299227 ## CI report: * 63c2f658cc309661c434451a5d8c0b38d2e748d3 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5848) * 952d92954bb418aa30820b811bc5a41f76e9ab21 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5867)
[GitHub] [flink] flinkbot commented on pull request #13243: [FLINK-18992][table] Fix Table API renameColumns method annotation error
flinkbot commented on pull request #13243: URL: https://github.com/apache/flink/pull/13243#issuecomment-680489296 ## CI report: * 7abb7dec5769d7d9569b17de97c477c989f67ed2 UNKNOWN
[GitHub] [flink-benchmarks] zhijiangW commented on a change in pull request #2: [FLINK-19003][checkpointing] Add micro-benchmark for unaligned checkpoints
zhijiangW commented on a change in pull request #2: URL: https://github.com/apache/flink-benchmarks/pull/2#discussion_r477008616 ## File path: src/main/java/org/apache/flink/benchmark/RemoteChannelThroughputBenchmark.java ## @@ -48,6 +49,9 @@ private MiniCluster miniCluster; +@Param({"AlignedCheckpoint", "UnalignedCheckpoint"}) Review comment: Actually, I tried the shorter name in an early version, but I was concerned it might confuse external users, so I eventually switched to the semantic name. Anyway, I have no preference here and will change it as you suggested.
[GitHub] [flink] flinkbot commented on pull request #13243: [FLINK-18992][table] Fix Table API renameColumns method annotation error
flinkbot commented on pull request #13243: URL: https://github.com/apache/flink/pull/13243#issuecomment-680478070 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Automated Checks Last check on commit 7abb7dec5769d7d9569b17de97c477c989f67ed2 (Wed Aug 26 03:12:56 UTC 2020) **Warnings:** * No documentation files were touched! Remember to keep the Flink docs up to date! * **This pull request references an unassigned [Jira ticket](https://issues.apache.org/jira/browse/FLINK-18992).** According to the [code contribution guide](https://flink.apache.org/contributing/contribute-code.html), tickets need to be assigned before starting with the implementation work. Mention the bot in a comment to re-run the automated checks. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items.
For consensus, approval by a Flink committer or PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[jira] [Updated] (FLINK-18992) Table API renameColumns method annotation error
[ https://issues.apache.org/jira/browse/FLINK-18992?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-18992: --- Labels: pull-request-available (was: ) > Table API renameColumns method annotation error > --- > > Key: FLINK-18992 > URL: https://issues.apache.org/jira/browse/FLINK-18992 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.11.1 >Reporter: tzxxh >Priority: Major > Labels: pull-request-available > Attachments: image-2020-08-19-09-07-06-405.png, > image-2020-08-19-09-07-28-148.png, image-2020-08-19-09-08-30-227.png > > > !image-2020-08-19-09-07-28-148.png! > > when I use this method > !image-2020-08-19-09-08-30-227.png! -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] Fin-chan opened a new pull request #13243: [FLINK-18992][table] Fix Table API renameColumns method annotation error
Fin-chan opened a new pull request #13243: URL: https://github.com/apache/flink/pull/13243 Fix Table API renameColumns method annotation error ## What is the purpose of the change *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)* ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. *(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluser with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## 
Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
[jira] [Commented] (FLINK-19025) table sql write orc file but hive2.1.1 can not read
[ https://issues.apache.org/jira/browse/FLINK-19025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17184887#comment-17184887 ] McClone commented on FLINK-19025: - It really has problems. The Flink Hive connector uses some Hive code, but hive-exec 2.1.1 cannot provide it. > table sql write orc file but hive2.1.1 can not read > --- > > Key: FLINK-19025 > URL: https://issues.apache.org/jira/browse/FLINK-19025 > Project: Flink > Issue Type: Bug > Components: Connectors / ORC >Affects Versions: 1.11.0 >Reporter: McClone >Priority: Major > > table sql write orc file but hive2.1.1 create external table can not read > data.Because flink use orc-core-1.5.6.jar but hive 2.1.1 use his own orcfile > jar.
[GitHub] [flink] flinkbot edited a comment on pull request #13186: [FLINK-18720][k8s] Migrate KubernetesResourceManager to the new KubernetesResourceManagerDriver
flinkbot edited a comment on pull request #13186: URL: https://github.com/apache/flink/pull/13186#issuecomment-675299227 ## CI report: * 63c2f658cc309661c434451a5d8c0b38d2e748d3 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5848) * 952d92954bb418aa30820b811bc5a41f76e9ab21 UNKNOWN
[jira] [Commented] (FLINK-19025) table sql write orc file but hive2.1.1 can not read
[ https://issues.apache.org/jira/browse/FLINK-19025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17184885#comment-17184885 ] McClone commented on FLINK-19025: - Rui Li, now I use the Hive connector, but flink-hive needs PhysicalWriter.java, which hive-exec-2.1.1 cannot provide. If I import orc-file.jar, Hive cannot read the table correctly. > table sql write orc file but hive2.1.1 can not read > --- > > Key: FLINK-19025 > URL: https://issues.apache.org/jira/browse/FLINK-19025 > Project: Flink > Issue Type: Bug > Components: Connectors / ORC >Affects Versions: 1.11.0 >Reporter: McClone >Priority: Major > > table sql write orc file but hive2.1.1 create external table can not read > data.Because flink use orc-core-1.5.6.jar but hive 2.1.1 use his own orcfile > jar.
[GitHub] [flink] xintongsong commented on pull request #13186: [FLINK-18720][k8s] Migrate KubernetesResourceManager to the new KubernetesResourceManagerDriver
xintongsong commented on pull request #13186: URL: https://github.com/apache/flink/pull/13186#issuecomment-680452263 Thanks for the review, @tillrohrmann. I replied about the `flinkClientConfig`. The rest of the comments are addressed.
[GitHub] [flink] xintongsong commented on a change in pull request #13186: [FLINK-18720][k8s] Migrate KubernetesResourceManager to the new KubernetesResourceManagerDriver
xintongsong commented on a change in pull request #13186: URL: https://github.com/apache/flink/pull/13186#discussion_r476996403 ## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java ## @@ -0,0 +1,301 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.kubernetes; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; +import org.apache.flink.kubernetes.configuration.KubernetesResourceManagerConfiguration; +import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient; +import org.apache.flink.kubernetes.kubeclient.factory.KubernetesTaskManagerFactory; +import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesTaskManagerParameters; +import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod; +import org.apache.flink.kubernetes.kubeclient.resources.KubernetesWatch; +import org.apache.flink.kubernetes.utils.KubernetesUtils; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.BootstrapTools; +import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; +import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.externalresource.ExternalResourceUtils; +import org.apache.flink.runtime.resourcemanager.active.AbstractResourceManagerDriver; +import org.apache.flink.runtime.resourcemanager.active.ResourceManagerDriver; +import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +/** + * Implementation of {@link ResourceManagerDriver} for Kubernetes deployment. 
+ */ +public class KubernetesResourceManagerDriver extends AbstractResourceManagerDriver<KubernetesWorkerNode> + implements FlinkKubeClient.PodCallbackHandler { + + /** The taskmanager pod name pattern is {clusterId}-{taskmanager}-{attemptId}-{podIndex}. */ + private static final String TASK_MANAGER_POD_FORMAT = "%s-taskmanager-%d-%d"; + + private final String clusterId; + + private final FlinkKubeClient kubeClient; + + /** Request resource futures, keyed by pod names. */ + private final Map<String, CompletableFuture<KubernetesWorkerNode>> requestResourceFutures; + + /** When ResourceManager failover, the max attempt should recover. */ + private long currentMaxAttemptId = 0; + + /** Current max pod index. When creating a new pod, it should increase one. */ + private long currentMaxPodId = 0; + + private KubernetesWatch podsWatch; + + public KubernetesResourceManagerDriver( + Configuration flinkConfig, + FlinkKubeClient kubeClient, + KubernetesResourceManagerConfiguration configuration) { + super(flinkConfig, GlobalConfiguration.loadConfiguration()); + + this.clusterId = Preconditions.checkNotNull(configuration.getClusterId()); + this.kubeClient = Preconditions.checkNotNull(kubeClient); + this.requestResourceFutures = new HashMap<>(); + } + + // + // ResourceManagerDriver + // + + @Override + protected void initializeInternal() throws Exception { + recoverWorkerNodesFromPreviousAttempts(); + + podsWatch = kubeClient.watchPodsAndDoCallback( + KubernetesUtils.getTaskManagerLabels(clusterId), +
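For illustration, the `TASK_MANAGER_POD_FORMAT` pattern quoted above ("%s-taskmanager-%d-%d", filled with {clusterId}, {attemptId}, {podIndex}) expands as in this standalone sketch. The cluster id and counter values are made up for the example; the real driver derives them from its configuration and from recovered pods.

```java
/** Demonstrates the taskmanager pod naming pattern from the quoted diff. */
public final class PodNameDemo {
    // Same format string as in the quoted KubernetesResourceManagerDriver code.
    private static final String TASK_MANAGER_POD_FORMAT = "%s-taskmanager-%d-%d";

    private PodNameDemo() {}

    /** Builds a pod name as {clusterId}-taskmanager-{attemptId}-{podIndex}. */
    public static String podName(String clusterId, long attemptId, long podIndex) {
        return String.format(TASK_MANAGER_POD_FORMAT, clusterId, attemptId, podIndex);
    }
}
```

For example, a cluster id of "my-cluster" on attempt 2 with pod index 7 yields the pod name "my-cluster-taskmanager-2-7".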
[GitHub] [flink] xintongsong commented on a change in pull request #13186: [FLINK-18720][k8s] Migrate KubernetesResourceManager to the new KubernetesResourceManagerDriver
xintongsong commented on a change in pull request #13186: URL: https://github.com/apache/flink/pull/13186#discussion_r476988674 ## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java ## @@ -0,0 +1,301 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.kubernetes; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; +import org.apache.flink.kubernetes.configuration.KubernetesResourceManagerConfiguration; +import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient; +import org.apache.flink.kubernetes.kubeclient.factory.KubernetesTaskManagerFactory; +import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesTaskManagerParameters; +import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod; +import org.apache.flink.kubernetes.kubeclient.resources.KubernetesWatch; +import org.apache.flink.kubernetes.utils.KubernetesUtils; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.BootstrapTools; +import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; +import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.externalresource.ExternalResourceUtils; +import org.apache.flink.runtime.resourcemanager.active.AbstractResourceManagerDriver; +import org.apache.flink.runtime.resourcemanager.active.ResourceManagerDriver; +import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +/** + * Implementation of {@link ResourceManagerDriver} for Kubernetes deployment. 
+ */ +public class KubernetesResourceManagerDriver extends AbstractResourceManagerDriver + implements FlinkKubeClient.PodCallbackHandler { + + /** The taskmanager pod name pattern is {clusterId}-{taskmanager}-{attemptId}-{podIndex}. */ + private static final String TASK_MANAGER_POD_FORMAT = "%s-taskmanager-%d-%d"; + + private final String clusterId; + + private final FlinkKubeClient kubeClient; + + /** Request resource futures, keyed by pod names. */ + private final Map> requestResourceFutures; + + /** When ResourceManager failover, the max attempt should recover. */ + private long currentMaxAttemptId = 0; + + /** Current max pod index. When creating a new pod, it should increase one. */ + private long currentMaxPodId = 0; + + private KubernetesWatch podsWatch; + + public KubernetesResourceManagerDriver( + Configuration flinkConfig, + FlinkKubeClient kubeClient, + KubernetesResourceManagerConfiguration configuration) { + super(flinkConfig, GlobalConfiguration.loadConfiguration()); Review comment: > The problem is that it is not guaranteed at this point that you can read a proper Flink configuration. That sounds to be a problem. Could you explain what might prevent reading a proper Flink configuration? > Moreover, you are missing all dynamically configured properties which are processed in the entrypoints. I think that is exactly what we want. Please see my other comment about `flinkClientConfig`. ## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java ## @@ -0,0 +1,301 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed
[GitHub] [flink] xintongsong commented on a change in pull request #13186: [FLINK-18720][k8s] Migrate KubernetesResourceManager to the new KubernetesResourceManagerDriver
xintongsong commented on a change in pull request #13186: URL: https://github.com/apache/flink/pull/13186#discussion_r476986171 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/AbstractResourceManagerDriver.java ## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.resourcemanager.active; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable; +import org.apache.flink.runtime.concurrent.ScheduledExecutor; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Abstract common base class for implementations of {@link ResourceManagerDriver}. + */ +public abstract class AbstractResourceManagerDriver<WorkerType extends ResourceIDRetrievable> + implements ResourceManagerDriver<WorkerType> { + + protected final Logger log = LoggerFactory.getLogger(getClass()); + + protected final Configuration flinkConfig; + protected final Configuration flinkClientConfig; Review comment: The background is that the RM will not send the complete configuration to the TMs, but only the differences compared to the original configuration submitted by the client.
Currently, when submitting the job, the client will ship the local `flink-conf.yaml` file. This file is available to both the JM and the TMs, through the Yarn Shared Cache and the Kubernetes ConfigMap. A TM should not use that original `flink-conf.yaml` directly, because the configuration might be overwritten on the JM side (e.g., dynamic properties, JM address, TM resources, etc.). Therefore, the RM will compare the effective configuration (`flinkConfig`) with the original one shipped from the client side (`flinkClientConfig`), and send the differences to the TMs as dynamic properties, which overwrite the original configuration on the TM side.
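The scheme described in this comment — shipping only the entries of the effective configuration that differ from the client's original `flink-conf.yaml` — can be sketched as a simple map diff. This is illustrative only: `ConfigDiff` and `dynamicProperties` are invented names, not Flink's actual implementation, and real configurations are `Configuration` objects rather than plain maps.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative sketch of computing the dynamic-property overrides for TMs. */
public final class ConfigDiff {
    private ConfigDiff() {}

    /**
     * Returns the entries of {@code effective} that are absent from, or
     * different in, {@code clientOriginal}. Conceptually, these are the
     * overrides a TM would apply on top of the shipped flink-conf.yaml.
     */
    public static Map<String, String> dynamicProperties(
            Map<String, String> clientOriginal, Map<String, String> effective) {
        Map<String, String> diff = new HashMap<>();
        for (Map.Entry<String, String> e : effective.entrySet()) {
            // Keep only entries whose effective value differs from the client's copy.
            if (!e.getValue().equals(clientOriginal.get(e.getKey()))) {
                diff.put(e.getKey(), e.getValue());
            }
        }
        return diff;
    }
}
```

With this shape, an entry such as a JM address rewritten on the JM side shows up in the diff, while unchanged entries (e.g. a default parallelism the client already set) are not re-sent.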
[GitHub] [flink] xintongsong commented on a change in pull request #13186: [FLINK-18720][k8s] Migrate KubernetesResourceManager to the new KubernetesResourceManagerDriver
xintongsong commented on a change in pull request #13186: URL: https://github.com/apache/flink/pull/13186#discussion_r476975877 ## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java ## @@ -0,0 +1,301 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.kubernetes; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; +import org.apache.flink.kubernetes.configuration.KubernetesResourceManagerConfiguration; +import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient; +import org.apache.flink.kubernetes.kubeclient.factory.KubernetesTaskManagerFactory; +import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesTaskManagerParameters; +import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod; +import org.apache.flink.kubernetes.kubeclient.resources.KubernetesWatch; +import org.apache.flink.kubernetes.utils.KubernetesUtils; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.BootstrapTools; +import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; +import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.externalresource.ExternalResourceUtils; +import org.apache.flink.runtime.resourcemanager.active.AbstractResourceManagerDriver; +import org.apache.flink.runtime.resourcemanager.active.ResourceManagerDriver; +import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +/** + * Implementation of {@link ResourceManagerDriver} for Kubernetes deployment. 
+ */ +public class KubernetesResourceManagerDriver extends AbstractResourceManagerDriver<KubernetesWorkerNode> + implements FlinkKubeClient.PodCallbackHandler { + + /** The taskmanager pod name pattern is {clusterId}-{taskmanager}-{attemptId}-{podIndex}. */ + private static final String TASK_MANAGER_POD_FORMAT = "%s-taskmanager-%d-%d"; + + private final String clusterId; + + private final FlinkKubeClient kubeClient; + + /** Request resource futures, keyed by pod names. */ + private final Map<String, CompletableFuture<KubernetesWorkerNode>> requestResourceFutures; + + /** When ResourceManager failover, the max attempt should recover. */ + private long currentMaxAttemptId = 0; + + /** Current max pod index. When creating a new pod, it should increase one. */ + private long currentMaxPodId = 0; + + private KubernetesWatch podsWatch; + + public KubernetesResourceManagerDriver( + Configuration flinkConfig, + FlinkKubeClient kubeClient, + KubernetesResourceManagerConfiguration configuration) { + super(flinkConfig, GlobalConfiguration.loadConfiguration()); + + this.clusterId = Preconditions.checkNotNull(configuration.getClusterId()); + this.kubeClient = Preconditions.checkNotNull(kubeClient); + this.requestResourceFutures = new HashMap<>(); + } + + // + // ResourceManagerDriver + // + + @Override + protected void initializeInternal() throws Exception { + recoverWorkerNodesFromPreviousAttempts(); + + podsWatch = kubeClient.watchPodsAndDoCallback( + KubernetesUtils.getTaskManagerLabels(clusterId), +
[GitHub] [flink] xintongsong commented on a change in pull request #13186: [FLINK-18720][k8s] Migrate KubernetesResourceManager to the new KubernetesResourceManagerDriver
xintongsong commented on a change in pull request #13186: URL: https://github.com/apache/flink/pull/13186#discussion_r476973166 ## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java ## @@ -0,0 +1,325 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.kubernetes; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; +import org.apache.flink.kubernetes.configuration.KubernetesResourceManagerDriverConfiguration; +import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient; +import org.apache.flink.kubernetes.kubeclient.factory.KubernetesTaskManagerFactory; +import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesTaskManagerParameters; +import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod; +import org.apache.flink.kubernetes.kubeclient.resources.KubernetesWatch; +import org.apache.flink.kubernetes.utils.KubernetesUtils; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.BootstrapTools; +import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; +import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.externalresource.ExternalResourceUtils; +import org.apache.flink.runtime.resourcemanager.active.AbstractResourceManagerDriver; +import org.apache.flink.runtime.resourcemanager.active.ResourceManagerDriver; +import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +/** + * Implementation of {@link 
ResourceManagerDriver} for Kubernetes deployment. + */ +public class KubernetesResourceManagerDriver extends AbstractResourceManagerDriver + implements FlinkKubeClient.PodCallbackHandler { + + /** The taskmanager pod name pattern is {clusterId}-{taskmanager}-{attemptId}-{podIndex}. */ + private static final String TASK_MANAGER_POD_FORMAT = "%s-taskmanager-%d-%d"; + + private final String clusterId; + + private final Time podCreationRetryInterval; + + private final FlinkKubeClient kubeClient; + + /** Request resource futures, keyed by pod names. */ + private final Map> requestResourceFutures; + + /** When ResourceManager failover, the max attempt should recover. */ + private long currentMaxAttemptId = 0; + + /** Current max pod index. When creating a new pod, it should increase one. */ + private long currentMaxPodId = 0; + + private KubernetesWatch podsWatch; + + /** +* Incompletion of this future indicates that there was a pod creation failure recently and the driver should not +* retry creating pods until the future become completed again. It's guaranteed to be modified in main thread. +*/ + private CompletableFuture podCreationCoolDown; + + public KubernetesResourceManagerDriver( + Configuration flinkConfig, + FlinkKubeClient kubeClient, + KubernetesResourceManagerDriverConfiguration configuration) { + super(flinkConfig, GlobalConfiguration.loadConfiguration()); + + this.clusterId = Preconditions.checkNotNull(configuration.getClusterId()); + this.podCreationRetryInterval = Preconditions.checkNotNull(configuration.getPodCreationRetryInterval()); + this.kubeClient = Preconditions.checkNotNull(kubeClient); +
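The naming scheme in `TASK_MANAGER_POD_FORMAT` carries exactly the state the driver needs to recover after a ResourceManager failover: the attempt id and pod index can be parsed back out of the names of pods that survived the previous attempt. A minimal sketch of that idea, written in Python for brevity (the function names are illustrative only; Flink's actual implementation is the Java code quoted above, using `"%s-taskmanager-%d-%d"`):

```python
# Illustrative sketch of the pod-naming scheme; not Flink's actual Java API.
TASK_MANAGER_POD_FORMAT = "{cluster_id}-taskmanager-{attempt_id}-{pod_index}"

def pod_name(cluster_id, attempt_id, pod_index):
    """Build a TaskManager pod name: {clusterId}-taskmanager-{attemptId}-{podIndex}."""
    return TASK_MANAGER_POD_FORMAT.format(
        cluster_id=cluster_id, attempt_id=attempt_id, pod_index=pod_index)

def recover_max_attempt_id(existing_pod_names):
    """On ResourceManager failover, recover the max attempt id seen among
    surviving pods, so the new attempt can pick a larger one
    (cf. currentMaxAttemptId in the quoted code)."""
    max_attempt = 0
    for name in existing_pod_names:
        # The cluster id may itself contain '-', so split from the right.
        parts = name.rsplit("-", 2)
        if len(parts) == 3 and parts[0].endswith("-taskmanager") and parts[1].isdigit():
            max_attempt = max(max_attempt, int(parts[1]))
    return max_attempt
```

Encoding the attempt id in the name makes recovery stateless: the driver only needs to list the pods labeled with its cluster id, no extra bookkeeping store is required.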
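The `podCreationCoolDown` future described in the Javadoc acts as a gate: while the future is incomplete, the driver must not retry creating pods; completing it re-enables creation. A rough Python model of that pattern (the class and method names are hypothetical; in the PR, completion would presumably be scheduled on the main thread after `podCreationRetryInterval`, while this sketch completes it manually to stay deterministic):

```python
from concurrent.futures import Future

class PodCreationGate:
    """Hypothetical model of the podCreationCoolDown idea: an incomplete
    future signals a recent pod creation failure and blocks retries."""

    def __init__(self):
        self._cool_down = Future()
        self._cool_down.set_result(None)  # initially completed: creation allowed

    def can_create(self):
        return self._cool_down.done()

    def on_creation_failure(self):
        # Enter cool-down: replace the completed future with an incomplete one.
        if self._cool_down.done():
            self._cool_down = Future()

    def end_cool_down(self):
        # In the PR this would fire after podCreationRetryInterval; here it is
        # triggered manually to keep the sketch deterministic.
        if not self._cool_down.done():
            self._cool_down.set_result(None)
```

Using a future rather than a boolean flag lets pending resource requests simply chain onto the cool-down and resume automatically once it completes, which fits the single-main-thread model the Javadoc guarantees.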
[jira] [Closed] (FLINK-19041) Add dependency management for ConnectedStream in Python DataStream API.
[ https://issues.apache.org/jira/browse/FLINK-19041?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hequn Cheng closed FLINK-19041. --- Resolution: Resolved > Add dependency management for ConnectedStream in Python DataStream API. > --- > > Key: FLINK-19041 > URL: https://issues.apache.org/jira/browse/FLINK-19041 > Project: Flink > Issue Type: Bug > Components: API / Python >Affects Versions: 1.12.0 >Reporter: Shuiqiang Chen >Assignee: Shuiqiang Chen >Priority: Major > Labels: pull-request-available > Fix For: 1.12.0 > > > We failed to set merged configurations into > DataStreamTwoInputPythonStatelessFunctionOperator when finally generating the > StreamGraph. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-19041) Add dependency management for ConnectedStream in Python DataStream API.
[ https://issues.apache.org/jira/browse/FLINK-19041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17184825#comment-17184825 ] Hequn Cheng commented on FLINK-19041: - Resolved in 1.12.0 via 66797ac6352c38c98e310fd2b34aa39088a6eacb > Add dependency management for ConnectedStream in Python DataStream API. > --- > > Key: FLINK-19041 > URL: https://issues.apache.org/jira/browse/FLINK-19041 > Project: Flink > Issue Type: Bug > Components: API / Python >Affects Versions: 1.12.0 >Reporter: Shuiqiang Chen >Assignee: Shuiqiang Chen >Priority: Major > Labels: pull-request-available > Fix For: 1.12.0 > > > We failed to set merged configurations into > DataStreamTwoInputPythonStatelessFunctionOperator when finally generating the > StreamGraph.
[GitHub] [flink] hequn8128 merged pull request #13236: [FLINK-19041][python] Add dependency management for ConnectedStream i…
hequn8128 merged pull request #13236: URL: https://github.com/apache/flink/pull/13236 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] hequn8128 commented on pull request #13236: [FLINK-19041][python] Add dependency management for ConnectedStream i…
hequn8128 commented on pull request #13236: URL: https://github.com/apache/flink/pull/13236#issuecomment-680393618 @shuiqiangchen Thanks a lot for fixing the bug. LGTM.
[GitHub] [flink] zentol commented on a change in pull request #13163: [FLINK-16789][runtime] Support JMX RMI random port assign
zentol commented on a change in pull request #13163: URL: https://github.com/apache/flink/pull/13163#discussion_r476790261 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/management/JMXService.java ## @@ -85,6 +86,9 @@ private static JMXServer startJMXServerWithPortRanges(Iterator<Integer> ports) { while (ports.hasNext() && successfullyStartedServer == null) { JMXServer server = new JMXServer(); int port = ports.next(); + if (port == 0) { // try poke with a random port when port is set to zero Review comment: @walterddr Yes, I think we can close the PR. It is admittedly slightly less convenient to explicitly define a port range, but the users will also get more predictable behavior, and we don't have to deal with the added complexity.
[GitHub] [flink] zentol commented on a change in pull request #13163: [FLINK-16789][runtime] Support JMX RMI random port assign
zentol commented on a change in pull request #13163: URL: https://github.com/apache/flink/pull/13163#discussion_r476645574 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/management/JMXService.java ## @@ -85,6 +86,9 @@ private static JMXServer startJMXServerWithPortRanges(Iterator<Integer> ports) { while (ports.hasNext() && successfullyStartedServer == null) { JMXServer server = new JMXServer(); int port = ports.next(); + if (port == 0) { // try poke with a random port when port is set to zero Review comment: > it would be better to do this conversion in the realm of the JMXServer Agreed. > Do we know that the startup period will be super long if we don't do this? It probably isn't long. In the port-probing version, you are essentially entering a race condition between all processes on this node. I would consider it unlikely that you will have more than a hundred TaskExecutors on a given node, and only at that point should it be relevant in the first place. I only suggested the special iterator to work around the worst-case scenario, something like the first 1000 ports already being allocated, where the current approach would likely fare miserably. But there are clear downsides to having this be applied in general to all users of the NetUtils; it is _very_ convenient to define a port range and be pretty much guaranteed that all processes start at the front, and form a continuous port sequence if present on a single node.
[GitHub] [flink] walterddr commented on a change in pull request #13163: [FLINK-16789][runtime] Support JMX RMI random port assign
walterddr commented on a change in pull request #13163: URL: https://github.com/apache/flink/pull/13163#discussion_r476778503 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/management/JMXService.java ## @@ -85,6 +86,9 @@ private static JMXServer startJMXServerWithPortRanges(Iterator<Integer> ports) { while (ports.hasNext() && successfullyStartedServer == null) { JMXServer server = new JMXServer(); int port = ports.next(); + if (port == 0) { // try poke with a random port when port is set to zero Review comment: actually, with some consideration, I don't think this PR is even necessary from a realistic standpoint - setting the port to `"0"` vs setting it to some port range is pretty much equal from a platform job-management standpoint. WRT the random port assignment, yeah, I am not sure that's actually needed - we didn't do any profiling. I intend to close this PR and comment on the JIRA ticket if we all agree. (thank you so much for chiming in and sharing your thoughts @tillrohrmann @zentol :-) )
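The trade-off this thread settles on, probing an explicit port range in order versus passing `0` and letting the OS pick an ephemeral port, can be sketched with plain sockets. This is an illustration only, not Flink's `NetUtils` or `JMXServer` code:

```python
import socket

def bind_first_available(candidate_ports):
    """Try to bind the given candidate ports in order; a candidate of 0 asks
    the OS for an ephemeral port. Returns the bound socket and actual port."""
    for port in candidate_ports:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.bind(("127.0.0.1", port))
            return sock, sock.getsockname()[1]
        except OSError:
            sock.close()  # port taken; probe the next candidate
    raise OSError("no candidate port available")
```

Probing a range means processes on the same node race for the low ports but end up on predictable, often contiguous ports; binding `0` avoids the race entirely at the cost of an unpredictable port, which is zentol's argument for keeping explicit ranges in the general NetUtils path.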
[jira] [Comment Edited] (FLINK-18192) Upgrade to Avro version 1.9.2 from 1.8.2
[ https://issues.apache.org/jira/browse/FLINK-18192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17183877#comment-17183877 ] Chesnay Schepler edited comment on FLINK-18192 at 8/25/20, 9:31 PM: [~twalthr] Could you find someone to gauge how difficult this would be? I'm interested in this because of CVE-2019-10172 transitively affecting Avro 1.8.X. I know that we expect users to provide whatever version they wish (and in fact if they can just use 1.9 it may not be a problem), but we should think about how/when to upgrade avro. was (Author: zentol): [~twalthr] Could you find someone to gauge how difficult this would be? > Upgrade to Avro version 1.9.2 from 1.8.2 > > > Key: FLINK-18192 > URL: https://issues.apache.org/jira/browse/FLINK-18192 > Project: Flink > Issue Type: Improvement > Components: Build System, Formats (JSON, Avro, Parquet, ORC, > SequenceFile) >Reporter: Lucas Heimberg >Priority: Major > Fix For: 1.12.0 > > > As of version 1.11, Flink (i.e., flink-avro) still uses Avro in version 1.8.2. > Avro 1.9.2 contains many bugfixes, in particular in respect to the support > for logical types. A further advantage would be that an upgrade to Avro 1.9.2 > would also allow to use the Confluent Schema Registry client and Avro > deserializer in version 5.5.0, which finally support schema references. > Therefore it would be great if Flink could make use of Avro 1.9.2 or higher > in future releases. > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-18546) Upgrade to Kafka Schema Registry Client 5.5.0
[ https://issues.apache.org/jira/browse/FLINK-18546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17184742#comment-17184742 ] Chesnay Schepler commented on FLINK-18546: -- How would this work in terms of backwards compatibility? > Upgrade to Kafka Schema Registry Client 5.5.0 > - > > Key: FLINK-18546 > URL: https://issues.apache.org/jira/browse/FLINK-18546 > Project: Flink > Issue Type: Improvement > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Affects Versions: 1.11.0 >Reporter: Lucas Heimberg >Priority: Major > Labels: Avro, avro > > As of version 1.11, Flink (i.e., flink-avro-confluent-registry) still uses > kafka-schema-registry-client in version 4.1.0. > From version 5.5.0 on, the Kafka schema registry client supports resolving of > schema references (see > [https://docs.confluent.io/current/schema-registry/serdes-develop/index.html#referenced-schemas]), > i.e., schemas that are defined using schemas from other subjects. > Therefore it would be great if Flink could make use of > kafka-schema-registry-client in version 5.5.0 or higher in future releases. > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] flinkbot edited a comment on pull request #13242: [FLINK-19009][metrics] Fixed the downtime metric issue and updated the comment
flinkbot edited a comment on pull request #13242: URL: https://github.com/apache/flink/pull/13242#issuecomment-680108384 ## CI report: * fc6dc041ef06000c57767f9665c34ecc70cff6da Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5863) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] AHeise commented on a change in pull request #13228: [FLINK-19026][network] Improve threading model of CheckpointBarrierUnaligner
AHeise commented on a change in pull request #13228: URL: https://github.com/apache/flink/pull/13228#discussion_r476683415 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java ## @@ -92,306 +90,173 @@ super(toNotifyOnCheckpoint); this.taskName = taskName; - hasInflightBuffers = Arrays.stream(inputGates) + this.inputGates = inputGates; + storeNewBuffers = Arrays.stream(inputGates) .flatMap(gate -> gate.getChannelInfos().stream()) .collect(Collectors.toMap(Function.identity(), info -> false)); - threadSafeUnaligner = new ThreadSafeUnaligner(checkNotNull(checkpointCoordinator), this, inputGates); + numOpenChannels = storeNewBuffers.size(); + this.checkpointCoordinator = checkpointCoordinator; } - /** -* We still need to trigger checkpoint via {@link ThreadSafeUnaligner#notifyBarrierReceived(CheckpointBarrier, InputChannelInfo)} -* while reading the first barrier from one channel, because this might happen -* earlier than the previous async trigger via mailbox by netty thread. -* -* Note this is also suitable for the trigger case of local input channel. 
-*/ @Override - public void processBarrier(CheckpointBarrier receivedBarrier, InputChannelInfo channelInfo) throws IOException { - long barrierId = receivedBarrier.getId(); - if (currentConsumedCheckpointId > barrierId || (currentConsumedCheckpointId == barrierId && !isCheckpointPending())) { + public void processBarrier(CheckpointBarrier barrier, InputChannelInfo channelInfo) throws IOException { + long barrierId = barrier.getId(); + if (currentCheckpointId > barrierId || (currentCheckpointId == barrierId && !isCheckpointPending())) { // ignore old and cancelled barriers return; } - if (currentConsumedCheckpointId < barrierId) { - currentConsumedCheckpointId = barrierId; - numBarrierConsumed = 0; - hasInflightBuffers.entrySet().forEach(hasInflightBuffer -> hasInflightBuffer.setValue(true)); + if (currentCheckpointId < barrierId) { + handleNewCheckpoint(barrier); + notifyCheckpoint(barrier, 0); } - if (currentConsumedCheckpointId == barrierId) { - hasInflightBuffers.put(channelInfo, false); - numBarrierConsumed++; + if (currentCheckpointId == barrierId) { + if (storeNewBuffers.put(channelInfo, false)) { + LOG.debug("{}: Received barrier from channel {} @ {}.", taskName, channelInfo, barrierId); + + inputGates[channelInfo.getGateIdx()].getChannel(channelInfo.getInputChannelIdx()) + .spillInflightBuffers(barrierId, checkpointCoordinator.getChannelStateWriter()); + + if (++numBarriersReceived == numOpenChannels) { + allBarriersReceivedFuture.complete(null); + } + } } - threadSafeUnaligner.notifyBarrierReceived(receivedBarrier, channelInfo); } @Override public void abortPendingCheckpoint(long checkpointId, CheckpointException exception) throws IOException { - threadSafeUnaligner.tryAbortPendingCheckpoint(checkpointId, exception); + tryAbortPendingCheckpoint(checkpointId, exception); - if (checkpointId > currentConsumedCheckpointId) { - resetPendingCheckpoint(checkpointId); + if (checkpointId > currentCheckpointId) { + resetPendingCheckpoint(); } } @Override public void 
processCancellationBarrier(CancelCheckpointMarker cancelBarrier) throws IOException { final long cancelledId = cancelBarrier.getCheckpointId(); - boolean shouldAbort = threadSafeUnaligner.setCancelledCheckpointId(cancelledId); + boolean shouldAbort = setCancelledCheckpointId(cancelledId); if (shouldAbort) { notifyAbort( cancelledId, new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER)); } - if (cancelledId >= currentConsumedCheckpointId) { - resetPendingCheckpoint(cancelledId); - currentConsumedCheckpointId =
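The control flow of the rewritten `processBarrier` in the diff above boils down to: drop stale barriers, reset per-checkpoint state when a newer barrier id arrives, count at most one barrier per channel, and signal completion once every open channel has delivered its barrier. A condensed Python model of that bookkeeping (names are illustrative; the real code additionally triggers the unaligned checkpoint on the first barrier, spills in-flight buffers, and handles cancellation markers):

```python
class BarrierTracker:
    """Condensed model of the barrier bookkeeping in processBarrier."""

    def __init__(self, num_channels):
        self.num_open_channels = num_channels
        self.current_checkpoint_id = -1
        self.barriers_received = 0
        self.seen_channels = set()
        self.complete = False  # stands in for allBarriersReceivedFuture

    def process_barrier(self, barrier_id, channel):
        if barrier_id < self.current_checkpoint_id:
            return  # stale barrier from an already superseded checkpoint
        if barrier_id > self.current_checkpoint_id:
            # A newer checkpoint starts: reset the per-checkpoint state.
            self.current_checkpoint_id = barrier_id
            self.barriers_received = 0
            self.seen_channels.clear()
            self.complete = False
        if channel not in self.seen_channels:
            # Count at most one barrier per channel.
            self.seen_channels.add(channel)
            self.barriers_received += 1
            if self.barriers_received == self.num_open_channels:
                self.complete = True
```

Keeping all of this state on the task thread is the point of the PR: the separate `ThreadSafeUnaligner` and its cross-thread synchronization disappear because barriers are now only observed from one thread.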
[GitHub] [flink] AHeise commented on a change in pull request #13228: [FLINK-19026][network] Improve threading model of CheckpointBarrierUnaligner
AHeise commented on a change in pull request #13228: URL: https://github.com/apache/flink/pull/13228#discussion_r476678903 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java ## @@ -159,10 +158,12 @@ public InputStatus emitNext(DataOutput output) throws Exception { if (bufferOrEvent.isPresent()) { // return to the mailbox after receiving a checkpoint barrier to avoid processing of // data after the barrier before checkpoint is performed for unaligned checkpoint mode - if (bufferOrEvent.get().isEvent() && bufferOrEvent.get().getEvent() instanceof CheckpointBarrier) { + if (bufferOrEvent.get().isBuffer()) { + processBuffer(bufferOrEvent.get()); + } else { + processEvent(bufferOrEvent.get()); Review comment: Nope, it's a refactoring that I should move out. We could also revert back. I had an intermediate version where `processEvent` signaled whether the loop should be broken or not, and there the split made more sense.
[GitHub] [flink] morsapaes commented on a change in pull request #13203: [FLINK-18984][python][docs] Add tutorial documentation for Python DataStream API
morsapaes commented on a change in pull request #13203: URL: https://github.com/apache/flink/pull/13203#discussion_r476650361 ## File path: docs/dev/python/getting-started/tutorial/datastream_tutorial.md ## @@ -0,0 +1,126 @@ +--- +title: "Python DataStream API Tutorial" +nav-parent_id: python_tutorial +nav-pos: 30 +--- + + +This walkthrough will quickly get you started building a pure Python Flink DataStream project. + +Please refer to the PyFlink [installation guide]({{ site.baseurl }}/dev/python/getting-started/installation.html) on how to set up the Python execution environments. + +* This will be replaced by the TOC +{:toc} + +## Setting up a Python Project + +You can begin by creating a Python project and installing the PyFlink package following the [installation guide]({{ site.baseurl }}/dev/python/getting-started/installation.html#installation-of-pyflink). + +## Writing a Flink Python DataStream API Program + +DataStream API applications begin by declaring a `StreamExecutionEnvironment`. +This is the context in which a streaming program is executed. +It can be used for setting execution parameters such as restart strategy, default parallelism, etc. + +{% highlight python %} +env = StreamExecutionEnvironment.get_execution_environment() +env.set_parallelism(1) +{% endhighlight %} + +Once a `StreamExecutionEnvironment` created, you can declare your source with it. + +{% highlight python %} +ds = env.from_collection( +collection=[(1, 'aaa'), (2, 'bbb')], +type_info=Types.ROW([Types.INT(), Types.STRING()])) +{% endhighlight %} + +This creates a data stream from the given collection. The type is that of the elements in the collection. In this example, the type is a Row type with two fields. The type of the first field is integer type while the second is string type. + +You can now perform transformations on the datastream or writes the data into external system with sink. 
Review comment: ```suggestion You can now perform transformations on this data stream, or just write the data to an external system using a _sink_. This walkthrough uses the `StreamingFileSink` sink connector to write the data into a file in the `/tmp/output` directory. ``` ## File path: docs/dev/python/getting-started/tutorial/datastream_tutorial.md ## @@ -0,0 +1,126 @@ +--- +title: "Python DataStream API Tutorial" +nav-parent_id: python_tutorial +nav-pos: 30 +--- + + +This walkthrough will quickly get you started building a pure Python Flink DataStream project. + +Please refer to the PyFlink [installation guide]({{ site.baseurl }}/dev/python/getting-started/installation.html) on how to set up the Python execution environments. Review comment: Installation with `pip` is pretty straightforward, so why not just add this to the tutorial instead of making the user go to a different page? If we are restructuring these anyways, I'd suggest to follow the same structure as the existing tutorials: https://ci.apache.org/projects/flink/flink-docs-release-1.11/try-flink/datastream_api.html ## File path: docs/dev/python/getting-started/tutorial/datastream_tutorial.md ## @@ -0,0 +1,126 @@ +--- +title: "Python DataStream API Tutorial" +nav-parent_id: python_tutorial +nav-pos: 30 +--- + + +This walkthrough will quickly get you started building a pure Python Flink DataStream project. + +Please refer to the PyFlink [installation guide]({{ site.baseurl }}/dev/python/getting-started/installation.html) on how to set up the Python execution environments. + +* This will be replaced by the TOC +{:toc} + +## Setting up a Python Project + +You can begin by creating a Python project and installing the PyFlink package following the [installation guide]({{ site.baseurl }}/dev/python/getting-started/installation.html#installation-of-pyflink). + +## Writing a Flink Python DataStream API Program + +DataStream API applications begin by declaring a `StreamExecutionEnvironment`. 
+This is the context in which a streaming program is executed. +It can be used for setting execution parameters such as restart strategy, default parallelism, etc. + +{% highlight python %} +env = StreamExecutionEnvironment.get_execution_environment() +env.set_parallelism(1) +{% endhighlight %} + +Once a `StreamExecutionEnvironment` created, you can declare your source with it. + +{% highlight python %} +ds = env.from_collection( +collection=[(1, 'aaa'), (2, 'bbb')], +type_info=Types.ROW([Types.INT(), Types.STRING()])) +{% endhighlight %} + +This creates a data stream from the given collection. The type is that of the elements in the collection. In this example, the type is a Row type with two fields. The type of the first field is integer type while the second is string type. + +You can now perform transformations on the datastream or writes the data into external system with sink. + +{% highlight python %} +ds.add_sink(StreamingFileSink +
[GitHub] [flink] AHeise commented on a change in pull request #13228: [FLINK-19026][network] Improve threading model of CheckpointBarrierUnaligner
AHeise commented on a change in pull request #13228: URL: https://github.com/apache/flink/pull/13228#discussion_r476677925 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java ## @@ -74,34 +106,34 @@ public CheckpointedInputGate( } @Override - public Optional pollNext() throws Exception { - while (true) { - Optional next = inputGate.pollNext(); + public Optional pollNext() throws IOException, InterruptedException { + Optional next = inputGate.pollNext(); - if (!next.isPresent()) { - return handleEmptyBuffer(); - } + if (!next.isPresent()) { + return handleEmptyBuffer(); + } - BufferOrEvent bufferOrEvent = next.get(); - checkState(!barrierHandler.isBlocked(bufferOrEvent.getChannelInfo())); + BufferOrEvent bufferOrEvent = next.get(); + checkState(!barrierHandler.isBlocked(bufferOrEvent.getChannelInfo())); - if (bufferOrEvent.isBuffer()) { - return next; - } - else if (bufferOrEvent.getEvent().getClass() == CheckpointBarrier.class) { - CheckpointBarrier checkpointBarrier = (CheckpointBarrier) bufferOrEvent.getEvent(); - barrierHandler.processBarrier(checkpointBarrier, bufferOrEvent.getChannelInfo()); - return next; - } - else if (bufferOrEvent.getEvent().getClass() == CancelCheckpointMarker.class) { - barrierHandler.processCancellationBarrier((CancelCheckpointMarker) bufferOrEvent.getEvent()); - } - else { - if (bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class) { - barrierHandler.processEndOfPartition(); - } - return next; - } + if (bufferOrEvent.isEvent()) { + handleEvent(bufferOrEvent); + } else { + barrierHandler.processBuffer(bufferOrEvent.getBuffer(), bufferOrEvent.getChannelInfo()); Review comment: Yes, it's used in the next commit to persist in-flight data (replaces `notifyBufferReceived`). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] AHeise commented on a change in pull request #13228: [FLINK-19026][network] Improve threading model of CheckpointBarrierUnaligner
AHeise commented on a change in pull request #13228: URL: https://github.com/apache/flink/pull/13228#discussion_r476674004 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java ## @@ -74,34 +106,34 @@ public CheckpointedInputGate( } @Override - public Optional pollNext() throws Exception { - while (true) { Review comment: Yes, it's changing semantics (as I had written in the commit message). I have not found a good reason why it's not always exited, and it makes things easier, especially since this method can now be used to process priority events. Btw, I think it also changes semantics for all events that are not handled at all, but I'm not sure which events survive at this point (Superstep?).
[GitHub] [flink] AHeise commented on a change in pull request #13228: [FLINK-19026][network] Improve threading model of CheckpointBarrierUnaligner
AHeise commented on a change in pull request #13228: URL: https://github.com/apache/flink/pull/13228#discussion_r476675096 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java ## @@ -63,9 +66,38 @@ */ public CheckpointedInputGate( InputGate inputGate, - CheckpointBarrierHandler barrierHandler) { + CheckpointBarrierHandler barrierHandler, + MailboxExecutor mailboxExecutor) { this.inputGate = inputGate; this.barrierHandler = barrierHandler; + this.mailboxExecutor = mailboxExecutor; + + waitForPriorityEvents(inputGate, mailboxExecutor); + } + + /** +* Eagerly pulls and processes all priority events. Must be called from task thread. +* +* Basic assumption is that no priority event needs to be handled by the {@link StreamTaskNetworkInput}. +*/ + private void processPriorityEvents() throws IOException, InterruptedException { + // check if the priority event is still not processed (could have been pulled before mail was being executed) + final boolean hasPriorityEvents = inputGate.getPriorityEventAvailableFuture().isDone(); + if (hasPriorityEvents) { + // process as many priority events as possible + while (pollNext().map(BufferOrEvent::morePriorityEvents).orElse(false)) { + } Review comment: Yes, first this method checks if there is at least one priority event (priority future completed). If there is at least one, it starts processing the first one. At this point, it relies on `BufferOrEvent::morePriorityEvents` to be correct in both directions (no false positives or negatives; although a false negative would just be a tad slower). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
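The drain pattern described in this comment — keep polling while the last returned element reports `morePriorityEvents` — can be reduced to a toy model. All names below are invented stand-ins (a plain `Deque` replaces the input gate, and `Event` replaces `BufferOrEvent`); this is a sketch of the control flow, not the actual Flink classes.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Optional;

public class PriorityDrain {

    // Hypothetical stand-in for BufferOrEvent: it only carries the flag
    // telling whether more priority events remain after this one.
    static final class Event {
        final boolean morePriorityEvents;

        Event(boolean morePriorityEvents) {
            this.morePriorityEvents = morePriorityEvents;
        }
    }

    // Mirrors the shape of processPriorityEvents: the "priority future
    // completed" check becomes a non-emptiness check, and the drain relies
    // on morePriorityEvents being correct in both directions.
    static int processPriorityEvents(Deque<Event> queue) {
        int processed = 0;
        boolean more = !queue.isEmpty();
        while (more) {
            Optional<Event> next = Optional.ofNullable(queue.poll());
            processed++;
            more = next.map(e -> e.morePriorityEvents).orElse(false);
        }
        return processed;
    }

    public static void main(String[] args) {
        Deque<Event> queue = new ArrayDeque<>();
        queue.add(new Event(true));
        queue.add(new Event(true));
        queue.add(new Event(false)); // last priority event
        queue.add(new Event(false)); // ordinary element, must stay queued
        System.out.println(processPriorityEvents(queue)); // 3
        System.out.println(queue.size());                 // 1
    }
}
```

As the comment notes, a false negative in `morePriorityEvents` would only delay processing until the next mail, while a false positive would drain a non-priority element — hence the "correct in both directions" requirement.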
[GitHub] [flink] AHeise commented on a change in pull request #13228: [FLINK-19026][network] Improve threading model of CheckpointBarrierUnaligner
AHeise commented on a change in pull request #13228: URL: https://github.com/apache/flink/pull/13228#discussion_r476673355 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java ## @@ -63,9 +66,38 @@ */ public CheckpointedInputGate( InputGate inputGate, - CheckpointBarrierHandler barrierHandler) { + CheckpointBarrierHandler barrierHandler, + MailboxExecutor mailboxExecutor) { this.inputGate = inputGate; this.barrierHandler = barrierHandler; + this.mailboxExecutor = mailboxExecutor; + + waitForPriorityEvents(inputGate, mailboxExecutor); + } + + /** +* Eagerly pulls and processes all priority events. Must be called from task thread. +* +* Basic assumption is that no priority event needs to be handled by the {@link StreamTaskNetworkInput}. +*/ + private void processPriorityEvents() throws IOException, InterruptedException { + // check if the priority event is still not processed (could have been pulled before mail was being executed) + final boolean hasPriorityEvents = inputGate.getPriorityEventAvailableFuture().isDone(); + if (hasPriorityEvents) { + // process as many priority events as possible + while (pollNext().map(BufferOrEvent::morePriorityEvents).orElse(false)) { + } + } + + // re-enqueue mail to process priority events + waitForPriorityEvents(inputGate, mailboxExecutor); + } + + private void waitForPriorityEvents(InputGate inputGate, MailboxExecutor mailboxExecutor) { + final CompletableFuture priorityEventAvailableFuture = inputGate.getPriorityEventAvailableFuture(); + priorityEventAvailableFuture.thenRun(() -> { + mailboxExecutor.execute(this::processPriorityEvents, "process priority even @ gate %s", inputGate); + }); Review comment: 1. Nope, this assumption does not hold. That's why the first thing that `processPriorityEvents` does is to check if the future is still completed. If the task polled the only priority event in the meantime, the future has been reset. 
During the execution of `processPriorityEvents` in the task thread, the task cannot concurrently pull the priority event, so this is safe. 2.+3. The basic idea of not involving `StreamTaskNetworkInput#emitNext` or using `pollNext()` is to avoid making non-blocking output more complicated. Currently, `emitNext` or `pollNext` are only called when an output buffer is available; in the meantime only mails are processed. Hence, I used a mail to perform `processPriorityEvents`. Note that the assumption here is that no priority event ever needs to be handled in `emitNext` (which currently only handles `EndOfPartitionEvent`).
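The re-arm cycle described in point 1 — register a callback on the priority future, enqueue a mail when it completes, and have the mail itself re-check the future because the task thread may have polled the only priority event in the meantime — can be modeled with a plain `CompletableFuture` and a queue of `Runnable`s standing in for the mailbox. `MailboxSketch` and its fields are invented names, not the Flink API.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

public class MailboxSketch {

    // Toy single-threaded mailbox: the task thread runs these mails.
    final Queue<Runnable> mailbox = new ArrayDeque<>();
    CompletableFuture<Void> priorityEventAvailable = new CompletableFuture<>();
    int eventsProcessed = 0;

    // When the future completes, enqueue a mail. The mail itself re-checks
    // the future, because the task thread may already have polled the only
    // priority event (resetting the future) before the mail runs.
    void waitForPriorityEvents() {
        priorityEventAvailable.thenRun(
                () -> mailbox.add(this::processPriorityEvents));
    }

    void processPriorityEvents() {
        if (priorityEventAvailable.isDone()) {
            eventsProcessed++;                                  // "drain" one event
            priorityEventAvailable = new CompletableFuture<>(); // reset
        }
        waitForPriorityEvents(); // re-arm for the next priority event
    }

    public static void main(String[] args) {
        MailboxSketch gate = new MailboxSketch();
        gate.waitForPriorityEvents();
        gate.priorityEventAvailable.complete(null); // a priority event arrives
        while (!gate.mailbox.isEmpty()) {
            gate.mailbox.poll().run(); // task thread drains its mailbox
        }
        System.out.println(gate.eventsProcessed); // 1
    }
}
```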
[GitHub] [flink] AHeise commented on a change in pull request #13228: [FLINK-19026][network] Improve threading model of CheckpointBarrierUnaligner
AHeise commented on a change in pull request #13228: URL: https://github.com/apache/flink/pull/13228#discussion_r476665640 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java ## @@ -195,28 +193,27 @@ void retriggerSubpartitionRequest(int subpartitionIndex) throws IOException { } @Override - public void spillInflightBuffers(long checkpointId, ChannelStateWriter channelStateWriter) throws IOException { + public void spillInflightBuffers(long checkpointId, ChannelStateWriter channelStateWriter) { synchronized (receivedBuffers) { - checkState(checkpointId > lastRequestedCheckpointId, "Need to request the next checkpointId"); - - final List inflightBuffers = new ArrayList<>(receivedBuffers.size()); - for (Buffer buffer : receivedBuffers) { - CheckpointBarrier checkpointBarrier = parseCheckpointBarrierOrNull(buffer); - if (checkpointBarrier != null && checkpointBarrier.getId() >= checkpointId) { - break; + final Integer numRecords = numRecordsOvertaken.remove(checkpointId); Review comment: Good catch, a leak could happen when the checkpoint is cancelled through another channel. The map itself is rather small, but it could add up over all channels and gates. I don't have a good idea on how to properly abstract this cleanup except by adding some kind of checkpoint-cancelled hook though. Alternatively, checkpoint barrier handler becomes more aware of the buffers to be spilled. So instead of calling `channel.spillInflightBuffers`, it could be `channel.getSpilledBuffers().forEach(channelStateWriter::write)` on a good checkpoint and `channel.getSpilledBuffers().forEach(Buffer::recycle)` on cancelled checkpoints, where `getSpilledBuffers` always cleans up this map. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
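The alternative proposed at the end of this comment — a `getSpilledBuffers`-style accessor that always removes the bookkeeping entry, so both the good-checkpoint path (write to state) and the cancellation path (recycle) clean up — could look roughly like the following. All names are hypothetical; strings stand in for buffers.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SpillCleanupSketch {

    // Hypothetical channel: tracks, per pending checkpoint, the buffers
    // that overtook the barrier (mirrors the numRecordsOvertaken idea).
    static final class Channel {
        final Map<Long, List<String>> overtakenBuffers = new HashMap<>();

        // Always removes the entry, so neither the success path nor the
        // cancellation path can leak map entries.
        List<String> getSpilledBuffers(long checkpointId) {
            List<String> buffers = overtakenBuffers.remove(checkpointId);
            return buffers == null ? new ArrayList<>() : buffers;
        }
    }

    public static void main(String[] args) {
        Channel channel = new Channel();
        channel.overtakenBuffers.put(42L, List.of("buf-1", "buf-2"));

        // Good checkpoint would do: getSpilledBuffers(id).forEach(writer::write)
        // Cancelled checkpoint would do: getSpilledBuffers(id).forEach(Buffer::recycle)
        List<String> spilled = channel.getSpilledBuffers(42L);
        System.out.println(spilled);                         // [buf-1, buf-2]
        // Either way, the bookkeeping entry is gone:
        System.out.println(channel.overtakenBuffers.size()); // 0
    }
}
```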
[GitHub] [flink] AHeise commented on a change in pull request #13228: [FLINK-19026][network] Improve threading model of CheckpointBarrierUnaligner
AHeise commented on a change in pull request #13228: URL: https://github.com/apache/flink/pull/13228#discussion_r476662192 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java ## @@ -770,34 +808,50 @@ void triggerPartitionStateCheck(ResultPartitionID partitionId) { })); } - private void queueChannel(InputChannel channel) { - int availableChannels; + private void queueChannel(InputChannel channel, boolean priority) { Review comment: Well this change is less about expressing priority events and more about making sure that channels with priority events are always polled first. It's some kind of potential double notification, where the priority notification overrides the normal data available notification. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] AHeise commented on a change in pull request #13228: [FLINK-19026][network] Improve threading model of CheckpointBarrierUnaligner
AHeise commented on a change in pull request #13228: URL: https://github.com/apache/flink/pull/13228#discussion_r476660758 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java ## @@ -133,14 +136,14 @@ public boolean isAvailable() { * @param bufferAndBacklog * current buffer and backlog including information about the next buffer */ - private boolean isAvailable(BufferAndBacklog bufferAndBacklog) { + @Nullable + private Buffer.DataType getNextDataType(BufferAndBacklog bufferAndBacklog) { // BEWARE: this must be in sync with #isAvailable()! - if (numCreditsAvailable > 0) { - return bufferAndBacklog.isDataAvailable(); - } - else { - return bufferAndBacklog.isEventAvailable(); + final Buffer.DataType nextDataType = bufferAndBacklog.getNextDataType(); + if (numCreditsAvailable > 0 || (nextDataType != null && nextDataType.isEvent())) { + return nextDataType; } + return null; Review comment: An enum type NONE would work for me and might make the code a bit clearer. However, be aware that this is mostly a copy I don't think it would simplify any code path. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
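The `NONE` enum member idea from this exchange can be sketched as follows. This is an assumed simplification of `Buffer.DataType`, mirroring only the credit check in `getNextDataType`; the member names are illustrative.

```java
public class DataTypeSketch {

    // Assumed simplification of Buffer.DataType with an explicit NONE,
    // so callers need no @Nullable return value or null checks.
    enum DataType {
        NONE, DATA_BUFFER, EVENT_BUFFER;

        boolean isEvent() {
            return this == EVENT_BUFFER;
        }
    }

    // Mirrors getNextDataType: events pass regardless of credit,
    // data buffers only when credit is available.
    static DataType getNextDataType(int numCreditsAvailable, DataType next) {
        if (numCreditsAvailable > 0 || next.isEvent()) {
            return next;
        }
        return DataType.NONE;
    }

    public static void main(String[] args) {
        System.out.println(getNextDataType(0, DataType.EVENT_BUFFER)); // EVENT_BUFFER
        System.out.println(getNextDataType(0, DataType.DATA_BUFFER));  // NONE
        System.out.println(getNextDataType(1, DataType.DATA_BUFFER));  // DATA_BUFFER
    }
}
```

The upside over `@Nullable` is that callers can exhaustively `switch` on the result, and the "nothing available" case is a first-class value rather than an easily forgotten null check.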
[GitHub] [flink] AHeise commented on a change in pull request #13228: [FLINK-19026][network] Improve threading model of CheckpointBarrierUnaligner
AHeise commented on a change in pull request #13228: URL: https://github.com/apache/flink/pull/13228#discussion_r476660921 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java ## @@ -621,61 +626,84 @@ public boolean isFinished() { return Optional.of(transformToBufferOrEvent( inputWithData.data.buffer(), inputWithData.moreAvailable, - inputWithData.input)); + inputWithData.input, + inputWithData.morePriorityEvents)); } private Optional> waitAndGetNextData(boolean blocking) throws IOException, InterruptedException { while (true) { - Optional inputChannel = getChannel(blocking); - if (!inputChannel.isPresent()) { + Optional inputChannelOpt = getChannel(blocking); + if (!inputChannelOpt.isPresent()) { return Optional.empty(); } // Do not query inputChannel under the lock, to avoid potential deadlocks coming from // notifications. - Optional result = inputChannel.get().getNextBuffer(); + final InputChannel inputChannel = inputChannelOpt.get(); + Optional bufferAndAvailabilityOpt = inputChannel.getNextBuffer(); synchronized (inputChannelsWithData) { - if (result.isPresent() && result.get().moreAvailable()) { + if (!bufferAndAvailabilityOpt.isPresent()) { + if (inputChannelsWithData.isEmpty()) { + availabilityHelper.resetUnavailable(); + } + continue; Review comment: Sry, I will split. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
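The loop shape under review in `waitAndGetNextData` — poll a channel under the lock, query it outside the lock, and on an empty result reset availability under the lock and `continue` to the next channel — can be reduced to a toy model. All names below are invented; strings stand in for channels and buffers.

```java
import java.util.ArrayDeque;
import java.util.Optional;
import java.util.Queue;

public class GateLoopSketch {

    final Queue<String> channelsWithData = new ArrayDeque<>();
    final Object lock = new Object();
    boolean available = true;

    // Invented per-channel lookup: channels named "empty-*" model a stale
    // data-available notification with nothing left to read.
    Optional<String> getNextBuffer(String channel) {
        return channel.startsWith("empty")
                ? Optional.empty()
                : Optional.of("buffer@" + channel);
    }

    Optional<String> waitAndGetNextData() {
        while (true) {
            String channel;
            synchronized (lock) {
                channel = channelsWithData.poll();
            }
            if (channel == null) {
                return Optional.empty();
            }
            // Query the channel OUTSIDE the lock, to avoid deadlocks with
            // availability notifications coming from other threads.
            Optional<String> buffer = getNextBuffer(channel);
            synchronized (lock) {
                if (!buffer.isPresent()) {
                    if (channelsWithData.isEmpty()) {
                        available = false; // resetUnavailable()
                    }
                    continue; // stale notification: try the next channel
                }
            }
            return buffer;
        }
    }

    public static void main(String[] args) {
        GateLoopSketch gate = new GateLoopSketch();
        gate.channelsWithData.add("empty-1"); // stale notification
        gate.channelsWithData.add("chan-2");
        System.out.println(gate.waitAndGetNextData().get()); // buffer@chan-2
        System.out.println(gate.available);                  // true
    }
}
```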
[GitHub] [flink] AHeise commented on a change in pull request #13228: [FLINK-19026][network] Improve threading model of CheckpointBarrierUnaligner
AHeise commented on a change in pull request #13228: URL: https://github.com/apache/flink/pull/13228#discussion_r476659669 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java ## @@ -171,19 +175,42 @@ private void handleAddingBarrier(BufferConsumer bufferConsumer, boolean insertAs checkState(inflightBufferSnapshot.isEmpty(), "Supporting only one concurrent checkpoint in unaligned " + "checkpoints"); - // Meanwhile prepare the collection of in-flight buffers which would be fetched in the next step later. - for (BufferConsumer buffer : buffers) { - try (BufferConsumer bc = buffer.copy()) { - if (bc.isBuffer()) { - inflightBufferSnapshot.add(bc.build()); + final int pos = buffers.getNumPriorityElements(); + buffers.addPriorityElement(bufferConsumer); + + boolean unalignedCheckpoint = isUnalignedCheckpoint(bufferConsumer); + if (unalignedCheckpoint) { + final Iterator iterator = buffers.iterator(); + Iterators.advance(iterator, pos + 1); + while (iterator.hasNext()) { + BufferConsumer buffer = iterator.next(); + + if (buffer.isBuffer()) { + try (BufferConsumer bc = buffer.copy()) { + inflightBufferSnapshot.add(bc.build()); + } } } } + return; + } Review comment: In general, I wanted to drop the assumption that there is only one priority event going on at any given time. That's especially true when we make cancellation events also a priority and we have a more or less fully blocked channel. Specifically, this change had following motivations: * drop the assumption that all priority events are unaligned checkpoints. * drop the assumption that the new priority event is always at position 0. * a small performance improvement where buffers are only copied after it's clear that they are not containing an event. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[GitHub] [flink] AHeise commented on a change in pull request #13228: [FLINK-19026][network] Improve threading model of CheckpointBarrierUnaligner
AHeise commented on a change in pull request #13228: URL: https://github.com/apache/flink/pull/13228#discussion_r476657231 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java ## @@ -171,19 +175,42 @@ private void handleAddingBarrier(BufferConsumer bufferConsumer, boolean insertAs checkState(inflightBufferSnapshot.isEmpty(), "Supporting only one concurrent checkpoint in unaligned " + "checkpoints"); - // Meanwhile prepare the collection of in-flight buffers which would be fetched in the next step later. - for (BufferConsumer buffer : buffers) { - try (BufferConsumer bc = buffer.copy()) { - if (bc.isBuffer()) { - inflightBufferSnapshot.add(bc.build()); + final int pos = buffers.getNumPriorityElements(); + buffers.addPriorityElement(bufferConsumer); + + boolean unalignedCheckpoint = isUnalignedCheckpoint(bufferConsumer); + if (unalignedCheckpoint) { + final Iterator iterator = buffers.iterator(); + Iterators.advance(iterator, pos + 1); + while (iterator.hasNext()) { + BufferConsumer buffer = iterator.next(); + + if (buffer.isBuffer()) { + try (BufferConsumer bc = buffer.copy()) { + inflightBufferSnapshot.add(bc.build()); + } } } } + return; + } + buffers.add(bufferConsumer); Review comment: Yes, good idea. In general that change looks a bit odd, because it's isolated from the upcoming changes (I had to split somewhere and probably didn't hit the sweet spot everywhere). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
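The priority-queueing semantics discussed in these two comments — `addPriorityElement` inserting behind the existing priority elements but ahead of all normal buffers, dropping the assumption that the new priority event is always at position 0 — can be illustrated with a minimal deque model. The class name is invented and `String` payloads stand in for `BufferConsumer`s.

```java
import java.util.LinkedList;

public class PrioritizedDequeSketch {

    // Minimal model: priority elements sit at the head of the deque,
    // in their own FIFO order, ahead of all normal elements.
    private final LinkedList<String> elements = new LinkedList<>();
    private int numPriorityElements;

    public void add(String element) {
        elements.addLast(element);
    }

    // Insert behind the existing priority elements but ahead of all
    // normal ones, so several in-flight priority events keep FIFO order.
    public void addPriorityElement(String element) {
        elements.add(numPriorityElements, element);
        numPriorityElements++;
    }

    public int getNumPriorityElements() {
        return numPriorityElements;
    }

    public String poll() {
        if (numPriorityElements > 0) {
            numPriorityElements--;
        }
        return elements.poll();
    }

    public static void main(String[] args) {
        PrioritizedDequeSketch queue = new PrioritizedDequeSketch();
        queue.add("buffer-1");
        queue.add("buffer-2");
        queue.addPriorityElement("barrier-A"); // jumps ahead of the buffers
        queue.addPriorityElement("barrier-B"); // behind A, ahead of buffers
        System.out.println(queue.getNumPriorityElements()); // 2
        System.out.println(queue.poll());                   // barrier-A
        System.out.println(queue.poll());                   // barrier-B
        System.out.println(queue.poll());                   // buffer-1
    }
}
```

This also shows why the diff advances the iterator by `pos + 1` before snapshotting: the buffers to persist for an unaligned checkpoint are exactly those behind the barrier's insertion point.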
[GitHub] [flink] zentol commented on a change in pull request #13163: [FLINK-16789][runtime] Support JMX RMI random port assign
zentol commented on a change in pull request #13163: URL: https://github.com/apache/flink/pull/13163#discussion_r476645574 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/management/JMXService.java ## @@ -85,6 +86,9 @@ private static JMXServer startJMXServerWithPortRanges(Iterator ports) { while (ports.hasNext() && successfullyStartedServer == null) { JMXServer server = new JMXServer(); int port = ports.next(); + if (port == 0) { // try poke with a random port when port is set to zero Review comment: > it would be better to do this conversion in the realm of the JMXServer Agreed. > Do we know that the startup period will be super long if we don't do this? It probably isn't long. In the port-probing version, you are essentially entering a race condition between all processes on this node. I would consider it unlikely that you will have more than a hundred TaskExecutors on a given node, and only at that point should it be relevant in the first place. I only suggested the special iterator to work around the worst-case scenario, something like the first 1000 ports already being allocated, where the current approach would likely fare miserably. But there are clear downsides to applying this in general to all users of the NetUtils; it is _very_ convenient to define a port range and be pretty much guaranteed that all processes start at the front and form a continuous port sequence if present on a single node.
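The "port 0 means let the OS pick" convention under discussion can be demonstrated with a plain `ServerSocket` — a sketch of the general JDK mechanism, not the `JMXServer` code itself. Note the caveat in the comment: once the probing socket is closed, another process may grab the port, so real code should hand the open socket (not just the number) onward.

```java
import java.io.IOException;
import java.net.ServerSocket;

public class RandomPortSketch {

    // Binding to port 0 asks the OS for any free ephemeral port, which
    // sidesteps the race of many processes probing the same fixed range.
    static int bindToFreePort() throws IOException {
        try (ServerSocket socket = new ServerSocket(0)) {
            return socket.getLocalPort();
        }
    }

    public static void main(String[] args) throws IOException {
        int port = bindToFreePort();
        System.out.println(port > 0 && port <= 65535); // true
    }
}
```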
[GitHub] [flink] flinkbot edited a comment on pull request #13239: [FLINK-19021][network] Various cleanups and refactorings of the ResultPartition
flinkbot edited a comment on pull request #13239: URL: https://github.com/apache/flink/pull/13239#issuecomment-679946229 ## CI report: * fbf7c39bb2d6cb79f52a93efabe310cb2bafdee9 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5858) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-training] alpinegizmo merged pull request #13: [FLINK-18630] Redo the LongRideAlerts solutions to cover a corner case
alpinegizmo merged pull request #13: URL: https://github.com/apache/flink-training/pull/13
[GitHub] [flink] flinkbot edited a comment on pull request #13240: [FLINK-19050][Documentation]
flinkbot edited a comment on pull request #13240: URL: https://github.com/apache/flink/pull/13240#issuecomment-680007217 ## CI report: * 11e18bf9587cfcc3a948ead892c826114a03bb4c Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5857)
[GitHub] [flink] flinkbot edited a comment on pull request #13236: [FLINK-19041][python] Add dependency management for ConnectedStream i…
flinkbot edited a comment on pull request #13236: URL: https://github.com/apache/flink/pull/13236#issuecomment-679860054 ## CI report: * 5ed0da03dffc085997903aa6868e97ea98340433 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5856) Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5842)
[GitHub] [flink] flinkbot edited a comment on pull request #13241: [hotfix] Fix a typo in the watermark docs
flinkbot edited a comment on pull request #13241: URL: https://github.com/apache/flink/pull/13241#issuecomment-680066941 ## CI report: * b8271b12c25e16aab38b9beb58d0f29835bee01a Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5859)
[GitHub] [flink] flinkbot edited a comment on pull request #13034: [FLINK-9992][tests] Fix FsStorageLocationReferenceTest#testEncodeAndDecode by adding retries to generate a valid path
flinkbot edited a comment on pull request #13034: URL: https://github.com/apache/flink/pull/13034#issuecomment-666932731 ## CI report: * db2c909c2b516c253c5f4f2afc1a0d200df72d85 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5854)
[GitHub] [flink] pnowojski commented on a change in pull request #13228: [FLINK-19026][network] Improve threading model of CheckpointBarrierUnaligner
pnowojski commented on a change in pull request #13228: URL: https://github.com/apache/flink/pull/13228#discussion_r476481681 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java ## @@ -133,14 +136,14 @@ public boolean isAvailable() { * @param bufferAndBacklog * current buffer and backlog including information about the next buffer */ - private boolean isAvailable(BufferAndBacklog bufferAndBacklog) { + @Nullable + private Buffer.DataType getNextDataType(BufferAndBacklog bufferAndBacklog) { // BEWARE: this must be in sync with #isAvailable()! - if (numCreditsAvailable > 0) { - return bufferAndBacklog.isDataAvailable(); - } - else { - return bufferAndBacklog.isEventAvailable(); + final Buffer.DataType nextDataType = bufferAndBacklog.getNextDataType(); + if (numCreditsAvailable > 0 || (nextDataType != null && nextDataType.isEvent())) { + return nextDataType; } + return null; Review comment: hmmm, maybe add another enum type for this purpose, instead of having `null`? (I'm not sure, just brain storming) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] pnowojski commented on a change in pull request #13228: [FLINK-19026][network] Improve threading model of CheckpointBarrierUnaligner
pnowojski commented on a change in pull request #13228: URL: https://github.com/apache/flink/pull/13228#discussion_r476498494 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java ## @@ -621,61 +626,84 @@ public boolean isFinished() { return Optional.of(transformToBufferOrEvent( inputWithData.data.buffer(), inputWithData.moreAvailable, - inputWithData.input)); + inputWithData.input, + inputWithData.morePriorityEvents)); } private Optional> waitAndGetNextData(boolean blocking) throws IOException, InterruptedException { while (true) { - Optional inputChannel = getChannel(blocking); - if (!inputChannel.isPresent()) { + Optional inputChannelOpt = getChannel(blocking); + if (!inputChannelOpt.isPresent()) { return Optional.empty(); } // Do not query inputChannel under the lock, to avoid potential deadlocks coming from // notifications. - Optional result = inputChannel.get().getNextBuffer(); + final InputChannel inputChannel = inputChannelOpt.get(); + Optional bufferAndAvailabilityOpt = inputChannel.getNextBuffer(); synchronized (inputChannelsWithData) { - if (result.isPresent() && result.get().moreAvailable()) { + if (!bufferAndAvailabilityOpt.isPresent()) { + if (inputChannelsWithData.isEmpty()) { + availabilityHelper.resetUnavailable(); + } + continue; Review comment: maybe if the `result` variable rename and adding `continue` branch had happened in an independent "refactor" commit, It would have saved me a couple of minutes while reading this code while I was trying to understand the change :( maybe not, as I can see how the changes are a bit interconnected. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] pnowojski commented on a change in pull request #13228: [FLINK-19026][network] Improve threading model of CheckpointBarrierUnaligner
pnowojski commented on a change in pull request #13228: URL: https://github.com/apache/flink/pull/13228#discussion_r476480862 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java ## @@ -133,14 +136,14 @@ public boolean isAvailable() { * @param bufferAndBacklog * current buffer and backlog including information about the next buffer */ - private boolean isAvailable(BufferAndBacklog bufferAndBacklog) { + @Nullable Review comment: nit: add a javadoc explaining the returned value? ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java ## @@ -171,19 +175,42 @@ private void handleAddingBarrier(BufferConsumer bufferConsumer, boolean insertAs checkState(inflightBufferSnapshot.isEmpty(), "Supporting only one concurrent checkpoint in unaligned " + "checkpoints"); - // Meanwhile prepare the collection of in-flight buffers which would be fetched in the next step later. - for (BufferConsumer buffer : buffers) { - try (BufferConsumer bc = buffer.copy()) { - if (bc.isBuffer()) { - inflightBufferSnapshot.add(bc.build()); + final int pos = buffers.getNumPriorityElements(); + buffers.addPriorityElement(bufferConsumer); + + boolean unalignedCheckpoint = isUnalignedCheckpoint(bufferConsumer); + if (unalignedCheckpoint) { + final Iterator iterator = buffers.iterator(); + Iterators.advance(iterator, pos + 1); + while (iterator.hasNext()) { + BufferConsumer buffer = iterator.next(); + + if (buffer.isBuffer()) { + try (BufferConsumer bc = buffer.copy()) { + inflightBufferSnapshot.add(bc.build()); + } } } } + return; + } Review comment: Why do we need this change? In what scenarios are you expecting more than one priority event in the output buffer? 
(if there is a reason that I'm forgetting about, please add it to the commit message) edit: (after reading commit message a couple of times) Or you are just re-using here a class, that you are mostly intending to use later in the future (on the inputs?)? If so maybe it needs some more explanation in the commit message? ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java ## @@ -195,28 +193,27 @@ void retriggerSubpartitionRequest(int subpartitionIndex) throws IOException { } @Override - public void spillInflightBuffers(long checkpointId, ChannelStateWriter channelStateWriter) throws IOException { + public void spillInflightBuffers(long checkpointId, ChannelStateWriter channelStateWriter) { synchronized (receivedBuffers) { - checkState(checkpointId > lastRequestedCheckpointId, "Need to request the next checkpointId"); - - final List inflightBuffers = new ArrayList<>(receivedBuffers.size()); - for (Buffer buffer : receivedBuffers) { - CheckpointBarrier checkpointBarrier = parseCheckpointBarrierOrNull(buffer); - if (checkpointBarrier != null && checkpointBarrier.getId() >= checkpointId) { - break; + final Integer numRecords = numRecordsOvertaken.remove(checkpointId); Review comment: shouldn't we remove also obsolete values from this map? (to prevent a potential memory leak?) ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java ## @@ -171,19 +175,42 @@ private void handleAddingBarrier(BufferConsumer bufferConsumer, boolean insertAs checkState(inflightBufferSnapshot.isEmpty(), "Supporting only one concurrent checkpoint in unaligned " + "checkpoints"); - // Meanwhile prepare the collection of in-flight buffers which would be fetched in the next step later. - for (BufferConsumer buffer : buffers) { - try (BufferConsumer bc = buffer.copy()) { -
[GitHub] [flink] flinkbot edited a comment on pull request #13242: [FLINK-19009][metrics] Fixed the downtime metric issue and updated the comment
flinkbot edited a comment on pull request #13242: URL: https://github.com/apache/flink/pull/13242#issuecomment-680108384 ## CI report: * fc6dc041ef06000c57767f9665c34ecc70cff6da Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5863)
[jira] [Updated] (FLINK-19005) used metaspace grow on every execution
[ https://issues.apache.org/jira/browse/FLINK-19005?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guillermo Sánchez updated FLINK-19005: -- Attachment: (was: 0_executions.zip) > used metaspace grow on every execution > -- > > Key: FLINK-19005 > URL: https://issues.apache.org/jira/browse/FLINK-19005 > Project: Flink > Issue Type: Bug > Components: Client / Job Submission, Runtime / Configuration, > Runtime / Coordination >Affects Versions: 1.11.1 >Reporter: Guillermo Sánchez >Assignee: Chesnay Schepler >Priority: Major > Attachments: heap_dump_after_10_executions.zip, > heap_dump_after_1_execution.zip, heap_dump_echo_lee.tar.xz > > > Hi ! > Im running a 1.11.1 flink cluster, where I execute batch jobs made with > DataSet API. > I submit these jobs every day to calculate daily data. > In every execution, cluster's used metaspace increase by 7MB and its never > released. > This ends up with an OutOfMemoryError caused by Metaspace every 15 days and i > need to restart the cluster to clean the metaspace > taskmanager.memory.jvm-metaspace.size is set to 512mb > Any idea of what could be causing this metaspace grow and why is it not > released ? > > > === Summary == > > Case 1, reported by [~gestevez]: > * Flink 1.11.1 > * Java 11 > * Maximum Metaspace size set to 512mb > * Custom Batch job, submitted daily > * Requires restart every 15 days after an OOM > Case 2, reported by [~Echo Lee]: > * Flink 1.11.0 > * Java 11 > * G1GC > * WordCount Batch job, submitted every second / every 5 minutes > * eventually fails TaskExecutor with OOM > Case 3, reported by [~DaDaShen] > * Flink 1.11.0 > * Java 11 > * WordCount Batch job, submitted every 5 seconds > * growing Metaspace, eventually OOM > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-19005) used metaspace grow on every execution
[ https://issues.apache.org/jira/browse/FLINK-19005?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guillermo Sánchez updated FLINK-19005: -- Attachment: (was: WordCount_1_execution.zip)
[jira] [Updated] (FLINK-19005) used metaspace grow on every execution
[ https://issues.apache.org/jira/browse/FLINK-19005?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guillermo Sánchez updated FLINK-19005: -- Attachment: (was: WordCount_15_executions.zip)
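The growth pattern reported in FLINK-19005 — used metaspace increasing with every submitted job and never shrinking — is the classic symptom of per-job user-code class loaders staying reachable after their jobs finish. The sketch below is a hypothetical, Flink-free illustration of that mechanism; the class and method names are invented for the example and do not correspond to Flink internals.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.List;

// Illustrative only, NOT Flink code: mimics the failure mode where each
// submitted job gets its own user-code class loader, and some reference keeps
// the loader reachable, so its classes can never be unloaded from metaspace.
class MetaspaceLeakSketch {

    // The "leak": a process-wide collection that outlives individual jobs.
    static final List<ClassLoader> retainedLoaders = new ArrayList<>();

    // Simulates n job submissions and returns how many loaders are now retained.
    static int submitJobs(int n) {
        for (int i = 0; i < n; i++) {
            ClassLoader jobClassLoader = new URLClassLoader(new URL[0]);
            // Classes defined through jobClassLoader occupy metaspace; as long
            // as the loader stays reachable, that metaspace is never reclaimed.
            retainedLoaders.add(jobClassLoader);
        }
        return retainedLoaders.size();
    }
}
```

In a real cluster the retaining reference is usually something less obvious than an explicit list — a thread-local, a registered JMX bean, or a static cache inside a user-code library — which is why the heap dumps attached to the issue are the right debugging tool.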
[GitHub] [flink] flinkbot commented on pull request #13242: [FLINK-19009][metrics] Fixed the downtime metric issue and updated the comment
flinkbot commented on pull request #13242: URL: https://github.com/apache/flink/pull/13242#issuecomment-680108384 ## CI report: * fc6dc041ef06000c57767f9665c34ecc70cff6da UNKNOWN
[jira] [Assigned] (FLINK-19013) Log start/end of state restoration
[ https://issues.apache.org/jira/browse/FLINK-19013?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler reassigned FLINK-19013: Assignee: Matt Wang > Log start/end of state restoration > -- > > Key: FLINK-19013 > URL: https://issues.apache.org/jira/browse/FLINK-19013 > Project: Flink > Issue Type: Improvement > Components: Runtime / State Backends >Reporter: Chesnay Schepler >Assignee: Matt Wang >Priority: Major > Fix For: 1.12.0 > > > State restoration can take a significant amount of time if the state is large > enough, or in special cases like FLINK-19008. > It would be useful for debugging if we'd log the start/end of > {{RestoreOperation#restore.}} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] flinkbot commented on pull request #13242: [FLINK-19009][metrics] Fixed the downtime metric issue and updated the comment
flinkbot commented on pull request #13242: URL: https://github.com/apache/flink/pull/13242#issuecomment-680105014 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Automated Checks Last check on commit fc6dc041ef06000c57767f9665c34ecc70cff6da (Tue Aug 25 15:41:53 UTC 2020) **Warnings:** * No documentation files were touched! Remember to keep the Flink docs up to date! Mention the bot in a comment to re-run the automated checks. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[GitHub] [flink] aljoscha closed pull request #13237: [FLINK-17159] Harden ElasticsearchSinkITCase
aljoscha closed pull request #13237: URL: https://github.com/apache/flink/pull/13237
[GitHub] [flink] aljoscha commented on pull request #13237: [FLINK-17159] Harden ElasticsearchSinkITCase
aljoscha commented on pull request #13237: URL: https://github.com/apache/flink/pull/13237#issuecomment-680103938 Merged! Thanks for the speedy review! (And for the (external) suggestion to use the liveness probe)
[jira] [Updated] (FLINK-19009) wrong way to calculate the "downtime" metric
[ https://issues.apache.org/jira/browse/FLINK-19009?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-19009: --- Labels: pull-request-available (was: ) > wrong way to calculate the "downtime" metric > > > Key: FLINK-19009 > URL: https://issues.apache.org/jira/browse/FLINK-19009 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination, Runtime / Metrics >Affects Versions: 1.7.2, 1.8.0 >Reporter: Zhinan Cheng >Assignee: kevin liu >Priority: Trivial > Labels: pull-request-available > Fix For: 1.12.0 > > Original Estimate: 1h > Remaining Estimate: 1h > > Currently the way the Flink system metric "downtime" is calculated is not > consistent with its description in the docs: the downtime is actually the > current timestamp minus the timestamp when the job started. > > But the Flink docs (https://flink.apache.org/gettinghelp.html) describe > the time as the current timestamp minus the timestamp when the job failed. > > I believe we should update the code for this metric to match the Flink docs. An > easy way to solve this is to subtract the latest uptime timestamp from the > current timestamp.
[GitHub] [flink] Flyangz opened a new pull request #13242: [FLINK-19009][metrics] Fixed the downtime metric issue and updated the comment
Flyangz opened a new pull request #13242: URL: https://github.com/apache/flink/pull/13242 ## What is the purpose of the change Currently the way the Flink system metric "downtime" is calculated is not consistent with its description in the docs: the downtime is actually the current timestamp minus the timestamp when the job started. This pull request fixes this bug. ## Brief change log Calculate the downtime metric by subtracting the latest failing time from the current system timestamp. ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no
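The corrected computation described in this PR can be sketched as a plain function: downtime is measured from the latest failure timestamp, not from the job start. The class, method, and parameter names below are illustrative and do not reflect Flink's actual `ExecutionGraph` metric internals.

```java
// Illustrative sketch, not Flink's implementation: "downtime" should be the
// current time minus the latest failure timestamp, and zero while running.
class DowntimeGaugeSketch {

    // Returns the downtime in milliseconds for a job whose last failure was at
    // latestFailureMillis. A running job has no current downtime.
    static long downtime(long nowMillis, long latestFailureMillis, boolean jobRunning) {
        if (jobRunning) {
            return 0L;
        }
        return Math.max(0L, nowMillis - latestFailureMillis);
    }
}
```

The buggy behavior the issue describes corresponds to passing the job-start timestamp instead of `latestFailureMillis`, which makes the gauge grow monotonically with job age rather than measuring the current outage.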
[jira] [Closed] (FLINK-17159) ES6 ElasticsearchSinkITCase unstable
[ https://issues.apache.org/jira/browse/FLINK-17159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek closed FLINK-17159. Fix Version/s: 1.11.2 1.12.0 Resolution: Fixed Fix on release-1.11: 6c97f22913197e7a5a948f67b7a0b7f8fb480fa7 Please re-open if the issue persists. > ES6 ElasticsearchSinkITCase unstable > > > Key: FLINK-17159 > URL: https://issues.apache.org/jira/browse/FLINK-17159 > Project: Flink > Issue Type: Bug > Components: Connectors / ElasticSearch, Tests >Affects Versions: 1.11.0, 1.12.0 >Reporter: Chesnay Schepler >Assignee: Aljoscha Krettek >Priority: Critical > Labels: pull-request-available, test-stability > Fix For: 1.12.0, 1.11.2 > > > [https://dev.azure.com/rmetzger/Flink/_build/results?buildId=7482=logs=64110e28-73be-50d7-9369-8750330e0bf1=aa84fb9a-59ae-5696-70f7-011bc086e59b] > {code:java} > 2020-04-15T02:37:04.4289477Z [ERROR] > testElasticsearchSinkWithSmile(org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSinkITCase) > Time elapsed: 0.145 s <<< ERROR! > 2020-04-15T02:37:04.4290310Z > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. 
> 2020-04-15T02:37:04.4290790Z at > org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147) > 2020-04-15T02:37:04.4291404Z at > org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:659) > 2020-04-15T02:37:04.4291956Z at > org.apache.flink.streaming.util.TestStreamEnvironment.execute(TestStreamEnvironment.java:77) > 2020-04-15T02:37:04.4292548Z at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1643) > 2020-04-15T02:37:04.4293254Z at > org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase.runElasticSearchSinkTest(ElasticsearchSinkTestBase.java:128) > 2020-04-15T02:37:04.4293990Z at > org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase.runElasticsearchSinkSmileTest(ElasticsearchSinkTestBase.java:106) > 2020-04-15T02:37:04.4295096Z at > org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSinkITCase.testElasticsearchSinkWithSmile(ElasticsearchSinkITCase.java:45) > 2020-04-15T02:37:04.4295923Z at > sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > 2020-04-15T02:37:04.4296489Z at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > 2020-04-15T02:37:04.4297076Z at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > 2020-04-15T02:37:04.4297513Z at > java.lang.reflect.Method.invoke(Method.java:498) > 2020-04-15T02:37:04.4297951Z at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > 2020-04-15T02:37:04.4298688Z at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > 2020-04-15T02:37:04.4299374Z at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > 2020-04-15T02:37:04.4300069Z at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > 2020-04-15T02:37:04.4300960Z at > 
org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) > 2020-04-15T02:37:04.4301705Z at > org.junit.rules.RunRules.evaluate(RunRules.java:20) > 2020-04-15T02:37:04.4302204Z at > org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) > 2020-04-15T02:37:04.4302661Z at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) > 2020-04-15T02:37:04.4303234Z at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) > 2020-04-15T02:37:04.4303706Z at > org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > 2020-04-15T02:37:04.4304127Z at > org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > 2020-04-15T02:37:04.4304716Z at > org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > 2020-04-15T02:37:04.4305394Z at > org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > 2020-04-15T02:37:04.4305965Z at > org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > 2020-04-15T02:37:04.4306425Z at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > 2020-04-15T02:37:04.4306942Z at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > 2020-04-15T02:37:04.4307466Z at > org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) > 2020-04-15T02:37:04.4307920Z at > org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) > 2020-04-15T02:37:04.4308375Z at > org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) >
[GitHub] [flink] flinkbot edited a comment on pull request #13217: [FLINK-16866] Make job submission non-blocking
flinkbot edited a comment on pull request #13217: URL: https://github.com/apache/flink/pull/13217#issuecomment-678285884 ## CI report: * 3655fcea1966bfbcb85c86d6a159c354f20d6cc7 UNKNOWN * 52a49f0b0840aa9220de72d64c50a6b33f6adf92 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5832) * 805f3142619f652b96804ab3c65beca2ba50f5d4 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5860)
[jira] [Commented] (FLINK-17159) ES6 ElasticsearchSinkITCase unstable
[ https://issues.apache.org/jira/browse/FLINK-17159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17184125#comment-17184125 ] Aljoscha Krettek commented on FLINK-17159: -- Potential fix on master: f6467816334ae04da9e72ee759ad007d60bfdca7 > ES6 ElasticsearchSinkITCase unstable > > > Key: FLINK-17159 > URL: https://issues.apache.org/jira/browse/FLINK-17159 > Project: Flink > Issue Type: Bug > Components: Connectors / ElasticSearch, Tests >Affects Versions: 1.11.0, 1.12.0 >Reporter: Chesnay Schepler >Assignee: Aljoscha Krettek >Priority: Critical > Labels: pull-request-available, test-stability
[GitHub] [flink] RocMarshal commented on a change in pull request #13225: [FLINK-18974][docs-zh]Translate the 'User-Defined Functions' page of "Application Development's DataStream API" into Chinese
RocMarshal commented on a change in pull request #13225: URL: https://github.com/apache/flink/pull/13225#discussion_r476500930 ## File path: docs/dev/user_defined_functions.zh.md ## @@ -23,16 +23,14 @@ specific language governing permissions and limitations under the License. --> -Most operations require a user-defined function. This section lists different -ways of how they can be specified. We also cover `Accumulators`, which can be -used to gain insights into your Flink application. +大多数操作都需要用户自定义函数。本节列出了实现用户自定义函数的不同方式。还会介绍 `Accumulators`(累加器),可用于深入了解你的 Flink 应用程序。 -## Implementing an interface +## 实现接口 Review comment: I think that we should improve on the section-titles in the whole page according to the item `4. 标题锚点链接` of [Flink Translation Specifications](https://cwiki.apache.org/confluence/display/FLINK/Flink+Translation+Specifications) ## File path: docs/dev/user_defined_functions.zh.md ## @@ -147,95 +142,75 @@ data.map (new RichMapFunction[String, Int] { -Rich functions provide, in addition to the user-defined function (map, -reduce, etc), four methods: `open`, `close`, `getRuntimeContext`, and -`setRuntimeContext`. These are useful for parameterizing the function -(see [Passing Parameters to Functions]({{ site.baseurl }}/dev/batch/index.html#passing-parameters-to-functions)), -creating and finalizing local state, accessing broadcast variables (see -[Broadcast Variables]({{ site.baseurl }}/dev/batch/index.html#broadcast-variables)), and for accessing runtime -information such as accumulators and counters (see -[Accumulators and Counters](#accumulators--counters)), and information -on iterations (see [Iterations]({{ site.baseurl }}/dev/batch/iterations.html)). 
+除了用户自定义的功能(map,reduce 等),富函数还提供了四个方法:`open`、`close`、`getRuntimeContext` 和 +`setRuntimeContext`。这些对于参数化功能很有用 +(参阅 [给函数传递参数]({{ site.baseurl }}/zh/dev/batch/index.html#passing-parameters-to-functions)), +创建和最终确定本地状态,访问广播变量(参阅 +[广播变量]({{ site.baseurl }}/zh/dev/batch/index.html#broadcast-variables)),以及访问运行时信息,例如累加器和计数器(参阅 +[累加器和计数器](#累加器和计数器)),以及迭代器的相关信息(参阅 [迭代器]({{ site.baseurl }}/zh/dev/batch/iterations.html))。 {% top %} -## Accumulators & Counters +## 累加器和计数器 -Accumulators are simple constructs with an **add operation** and a **final accumulated result**, -which is available after the job ended. +累加器是具有**加法运算**和**最终累加结果**的一种简单结构,可在作业结束后使用。 -The most straightforward accumulator is a **counter**: You can increment it using the -```Accumulator.add(V value)``` method. At the end of the job Flink will sum up (merge) all partial -results and send the result to the client. Accumulators are useful during debugging or if you -quickly want to find out more about your data. +最简单的累加器就是**计数器**: 你可以使用 +```Accumulator.add(V value)``` 方法将其递增。在作业结束时,Flink 会汇总(合并)所有部分的结果并将其发送给客户端。 +在调试过程中或在你想快速了解有关数据更多信息时,累加器作用很大。 -Flink currently has the following **built-in accumulators**. Each of them implements the -{% gh_link /flink-core/src/main/java/org/apache/flink/api/common/accumulators/Accumulator.java "Accumulator" %} -interface. +Flink 目前有如下**内置累加器**。每个都实现了 +{% gh_link /flink-core/src/main/java/org/apache/flink/api/common/accumulators/Accumulator.java "累加器" %} +接口。 - {% gh_link /flink-core/src/main/java/org/apache/flink/api/common/accumulators/IntCounter.java "__IntCounter__" %}, {% gh_link /flink-core/src/main/java/org/apache/flink/api/common/accumulators/LongCounter.java "__LongCounter__" %} - and {% gh_link /flink-core/src/main/java/org/apache/flink/api/common/accumulators/DoubleCounter.java "__DoubleCounter__" %}: - See below for an example using a counter. 
-- {% gh_link /flink-core/src/main/java/org/apache/flink/api/common/accumulators/Histogram.java "__Histogram__" %}: - A histogram implementation for a discrete number of bins. Internally it is just a map from Integer - to Integer. You can use this to compute distributions of values, e.g. the distribution of - words-per-line for a word count program. + 和 {% gh_link /flink-core/src/main/java/org/apache/flink/api/common/accumulators/DoubleCounter.java "__DoubleCounter__" %}: + 有关使用计数器的示例,请参见下文。 +- {% gh_link /flink-core/src/main/java/org/apache/flink/api/common/accumulators/Histogram.java "__直方图__" %}: + 离散数量的柱状直方图实现。 在内部,它只是整形到整形的映射。你可以使用它来计算值的分布,例如,单词计数程序的每行单词的分布情况。 -__How to use accumulators:__ +__如何使用累加器:__ -First you have to create an accumulator object (here a counter) in the user-defined transformation -function where you want to use it. +首先,你要在需要使用累加器的用户自定义的转换函数中创建一个累加器对象(此处是计数器) 。 {% highlight java %} private IntCounter numLines = new IntCounter(); {% endhighlight %} -Second you have to register the accumulator object, typically in the ```open()``` method of the -*rich* function. Here you also define the name. +其次,你必须在富函数的```open()```方法中注册累加器对象。也可以在此处定义名称。 {% highlight java %} getRuntimeContext().addAccumulator("num-lines", this.numLines); {% endhighlight %}
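The accumulator pattern walked through in this translated section — create a counter, register it in the rich function's `open()`, read the merged result after the job ends — can be boiled down to a Flink-free toy class. The real `IntCounter` implements Flink's `Accumulator` interface; this stand-in only mirrors its add/merge/read behavior for illustration.

```java
// Toy stand-in for Flink's IntCounter: supports an add operation and merging
// of partial results, with the accumulated value read after the "job" ends.
class IntCounterSketch {

    private int count;

    void add(int value) {
        count += value;
    }

    // Flink merges the partial accumulators of all parallel subtasks at job end.
    void merge(IntCounterSketch other) {
        count += other.count;
    }

    int getLocalValue() {
        return count;
    }
}
```

Each parallel subtask would own one such counter and call `add` per record; the runtime then merges all partial counters and ships the total to the client, which is exactly the flow the section describes for `Accumulator.add(V value)`.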
[GitHub] [flink] flinkbot edited a comment on pull request #13241: [hotfix] Fix a typo in the watermark docs
flinkbot edited a comment on pull request #13241: URL: https://github.com/apache/flink/pull/13241#issuecomment-680066941 ## CI report: * b8271b12c25e16aab38b9beb58d0f29835bee01a Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5859)
[GitHub] [flink] rmetzger commented on pull request #13217: [FLINK-16866] Make job submission non-blocking
rmetzger commented on pull request #13217: URL: https://github.com/apache/flink/pull/13217#issuecomment-680075151 Thanks a lot for your extensive and detailed review. It was very helpful for me! I have addressed most of your comments. I extended the `DispatcherJobTest`. I'm in the process of testing whether the refactoring to the client has introduced any new test failures, and I might clean up my new tests in `DispatcherJob` (I would thus recommend you to review them last). I might push a small change to the client code later today to better distinguish between initialization and runtime failures, so that we can solve the unstable `FunctionITCase.testInvalidUseOfTableFunction()` test.
[jira] [Commented] (FLINK-18695) Allow NettyBufferPool to allocate heap buffers
[ https://issues.apache.org/jira/browse/FLINK-18695?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17184102#comment-17184102 ] Chesnay Schepler commented on FLINK-18695: -- Given that we will allocate heap memory anyway when passing things to the SSL engine, it should be fine to set heap arenas to 0. That _should_ behave exactly like the current code. > Allow NettyBufferPool to allocate heap buffers > -- > > Key: FLINK-18695 > URL: https://issues.apache.org/jira/browse/FLINK-18695 > Project: Flink > Issue Type: Improvement > Components: Runtime / Network >Reporter: Chesnay Schepler >Assignee: Yun Gao >Priority: Major > Fix For: 1.12.0 > > > in 4.1.43 netty made a change to their SslHandler to always use heap buffers > for JDK SSLEngine implementations, to avoid an additional memory copy. > However, our {{NettyBufferPool}} forbids heap buffer allocations. > We will either have to allow heap buffer allocations, or create a custom > SslHandler implementation that does not use heap buffers (although this seems > ill-advised?). > /cc [~sewen] [~uce] [~NicoK] [~zjwang] [~pnowojski]
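The copy Netty's SslHandler is avoiding can be seen with plain `java.nio` buffers: JDK `SSLEngine` implementations ultimately work on `byte[]`-backed (heap) buffers, so data in a direct buffer has to be copied into heap memory first. A small sketch of that distinction — the class and method names here are invented for illustration, not Netty or Flink API:

```java
import java.nio.ByteBuffer;

// Illustrative: a direct buffer exposes no accessible backing byte[], so data
// handed to a JDK SSLEngine from it must first be copied into a heap buffer.
// This is the extra copy the netty 4.1.43 SslHandler change avoids by
// allocating heap buffers up front.
class SslBufferCopySketch {

    static boolean needsCopyForJdkSsl(ByteBuffer buffer) {
        return !buffer.hasArray();
    }
}
```

This is also why setting the allocator's heap arenas to 0, as suggested in the comment, should behave like the status quo: unpooled heap allocations happen either way once the SSL engine is involved.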
[jira] [Updated] (FLINK-19005) used metaspace grow on every execution
[ https://issues.apache.org/jira/browse/FLINK-19005?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guillermo Sánchez updated FLINK-19005: -- Attachment: 0_executions.zip WordCount_1_execution.zip WordCount_15_executions.zip
[jira] [Commented] (FLINK-19013) Log start/end of state restoration
[ https://issues.apache.org/jira/browse/FLINK-19013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17184098#comment-17184098 ] Matt Wang commented on FLINK-19013: --- Agreed, it would be useful while debugging. Hi [~chesnay], I'm interested in this, can I take this? > Log start/end of state restoration > -- > > Key: FLINK-19013 > URL: https://issues.apache.org/jira/browse/FLINK-19013 > Project: Flink > Issue Type: Improvement > Components: Runtime / State Backends >Reporter: Chesnay Schepler >Priority: Major > Fix For: 1.12.0 > > > State restoration can take a significant amount of time if the state is large > enough, or in special cases like FLINK-19008. > It would be useful for debugging if we'd log the start/end of > {{RestoreOperation#restore}}.
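The logging this ticket asks for amounts to a timing wrapper around the restore call. A hypothetical sketch, assuming nothing about Flink internals (`loggedRestore` and `backendName` are illustrative names, not Flink API):

```java
import java.util.function.Supplier;
import java.util.logging.Logger;

public class RestoreLogging {
    private static final Logger LOG = Logger.getLogger(RestoreLogging.class.getName());

    /**
     * Runs a restore operation with start/end log lines and a duration,
     * mirroring what the ticket proposes around RestoreOperation#restore.
     */
    static <T> T loggedRestore(String backendName, Supplier<T> restore) {
        LOG.info(() -> "Starting state restoration for backend " + backendName);
        long start = System.nanoTime();
        try {
            return restore.get();
        } finally {
            long millis = (System.nanoTime() - start) / 1_000_000;
            LOG.info(() -> "Finished state restoration for backend " + backendName
                    + " in " + millis + " ms");
        }
    }
}
```

Logging in a `finally` block ensures the end marker (with duration) also appears when restoration fails, which is exactly the case that matters when debugging.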
[GitHub] [flink] tillrohrmann commented on a change in pull request #13186: [FLINK-18720][k8s] Migrate KubernetesResourceManager to the new KubernetesResourceManagerDriver
tillrohrmann commented on a change in pull request #13186: URL: https://github.com/apache/flink/pull/13186#discussion_r476473271 ## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java ## @@ -0,0 +1,301 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.kubernetes; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; +import org.apache.flink.kubernetes.configuration.KubernetesResourceManagerConfiguration; +import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient; +import org.apache.flink.kubernetes.kubeclient.factory.KubernetesTaskManagerFactory; +import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesTaskManagerParameters; +import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod; +import org.apache.flink.kubernetes.kubeclient.resources.KubernetesWatch; +import org.apache.flink.kubernetes.utils.KubernetesUtils; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.BootstrapTools; +import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; +import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.externalresource.ExternalResourceUtils; +import org.apache.flink.runtime.resourcemanager.active.AbstractResourceManagerDriver; +import org.apache.flink.runtime.resourcemanager.active.ResourceManagerDriver; +import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +/** + * Implementation of {@link ResourceManagerDriver} for Kubernetes deployment. 
+ */ +public class KubernetesResourceManagerDriver extends AbstractResourceManagerDriver + implements FlinkKubeClient.PodCallbackHandler { + + /** The taskmanager pod name pattern is {clusterId}-{taskmanager}-{attemptId}-{podIndex}. */ + private static final String TASK_MANAGER_POD_FORMAT = "%s-taskmanager-%d-%d"; + + private final String clusterId; + + private final FlinkKubeClient kubeClient; + + /** Request resource futures, keyed by pod names. */ + private final Map> requestResourceFutures; + + /** When ResourceManager failover, the max attempt should recover. */ + private long currentMaxAttemptId = 0; + + /** Current max pod index. When creating a new pod, it should increase one. */ + private long currentMaxPodId = 0; + + private KubernetesWatch podsWatch; + + public KubernetesResourceManagerDriver( + Configuration flinkConfig, + FlinkKubeClient kubeClient, + KubernetesResourceManagerConfiguration configuration) { + super(flinkConfig, GlobalConfiguration.loadConfiguration()); + + this.clusterId = Preconditions.checkNotNull(configuration.getClusterId()); + this.kubeClient = Preconditions.checkNotNull(kubeClient); + this.requestResourceFutures = new HashMap<>(); + } + + // + // ResourceManagerDriver + // + + @Override + protected void initializeInternal() throws Exception { + recoverWorkerNodesFromPreviousAttempts(); + + podsWatch = kubeClient.watchPodsAndDoCallback( + KubernetesUtils.getTaskManagerLabels(clusterId), +
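The TaskManager pod naming scheme documented in the driver above ({clusterId}-taskmanager-{attemptId}-{podIndex}) can be demonstrated standalone. The helper `podName` is an illustrative sketch, not the PR's actual method:

```java
public class PodNameDemo {

    /** Pattern from the driver above: {clusterId}-taskmanager-{attemptId}-{podIndex}. */
    private static final String TASK_MANAGER_POD_FORMAT = "%s-taskmanager-%d-%d";

    static String podName(String clusterId, long attemptId, long podIndex) {
        return String.format(TASK_MANAGER_POD_FORMAT, clusterId, attemptId, podIndex);
    }

    public static void main(String[] args) {
        // e.g. the third pod created by the first attempt of cluster "my-flink-cluster"
        System.out.println(podName("my-flink-cluster", 1, 3));
        // → my-flink-cluster-taskmanager-1-3
    }
}
```

Embedding the attempt id in the name is what lets a recovering ResourceManager parse existing pod names and continue numbering from the recovered maximum, as the driver's `currentMaxAttemptId`/`currentMaxPodId` fields suggest.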
[GitHub] [flink] flinkbot commented on pull request #13241: [hotfix] Fix a typo in the watermark docs
flinkbot commented on pull request #13241: URL: https://github.com/apache/flink/pull/13241#issuecomment-680066941 ## CI report: * b8271b12c25e16aab38b9beb58d0f29835bee01a UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #13217: [FLINK-16866] Make job submission non-blocking
flinkbot edited a comment on pull request #13217: URL: https://github.com/apache/flink/pull/13217#issuecomment-678285884 ## CI report: * 3655fcea1966bfbcb85c86d6a159c354f20d6cc7 UNKNOWN * 52a49f0b0840aa9220de72d64c50a6b33f6adf92 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5832) * 805f3142619f652b96804ab3c65beca2ba50f5d4 UNKNOWN
[GitHub] [flink] flinkbot commented on pull request #13241: [hotfix] Fix a typo in the watermark docs
flinkbot commented on pull request #13241: URL: https://github.com/apache/flink/pull/13241#issuecomment-680063288 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Automated Checks Last check on commit b8271b12c25e16aab38b9beb58d0f29835bee01a (Tue Aug 25 14:34:15 UTC 2020) **Warnings:** * Documentation files were touched, but no `.zh.md` files: Update Chinese documentation or file Jira ticket. Mention the bot in a comment to re-run the automated checks. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[GitHub] [flink] afedulov opened a new pull request #13241: [hotfix] Fix a typo in the watermark docs
afedulov opened a new pull request #13241: URL: https://github.com/apache/flink/pull/13241
[jira] [Updated] (FLINK-19043) Translate the 'Logging' page of 'Debugging & Monitoring' into Chinese
[ https://issues.apache.org/jira/browse/FLINK-19043?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Roc Marshal updated FLINK-19043: Summary: Translate the 'Logging' page of 'Debugging & Monitoring' into Chinese (was: Translate the page 'Logging' of 'Debugging & Monitoring' into Chinese) > Translate the 'Logging' page of 'Debugging & Monitoring' into Chinese > - > > Key: FLINK-19043 > URL: https://issues.apache.org/jira/browse/FLINK-19043 > Project: Flink > Issue Type: Improvement > Components: chinese-translation, Documentation >Affects Versions: 1.10.0, 1.10.1, 1.11.0, 1.10.2, 1.11.1 >Reporter: Roc Marshal >Assignee: Roc Marshal >Priority: Major > Labels: Documentation, Translation, translation-zh > > The page url is : > [Logging|https://ci.apache.org/projects/flink/flink-docs-release-1.11/monitoring/logging.html] > The markdown file location is : flink/docs/monitoring/logging.zh.md
[GitHub] [flink] flinkbot edited a comment on pull request #13238: [FLINK-19042][hive] Remove print table sink from HiveTableSourceITCas…
flinkbot edited a comment on pull request #13238: URL: https://github.com/apache/flink/pull/13238#issuecomment-679946137 ## CI report: * e85e2428cac727ca774025ca106d2cb0c319e78e Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5852)
[jira] [Commented] (FLINK-18695) Allow NettyBufferPool to allocate heap buffers
[ https://issues.apache.org/jira/browse/FLINK-18695?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17184089#comment-17184089 ] Yun Gao commented on FLINK-18695: - Hi [~chesnay], I also agree with you that there are both cases in the implementation of SslHandler. I mentioned the PREFER_DIRECT flag in the test since it seems to have some impact on the overall direct memory used, but it should not be a blocker for the current issue and we could consider it separately. For enabling heap buffers, another possible option from my side might be to keep PREFER_DIRECT = true unchanged and allocate 0 heap arenas. In this case the heap buffer would be allocated directly from the process's memory without caching (since caching causes more memory consumption). This method does change the footprint of the heap memory, but since the actual amount of heap memory used seems to be limited (several hundred KB) according to the current tests, I'm not sure if it is acceptable. > Allow NettyBufferPool to allocate heap buffers > -- > > Key: FLINK-18695 > URL: https://issues.apache.org/jira/browse/FLINK-18695 > Project: Flink > Issue Type: Improvement > Components: Runtime / Network >Reporter: Chesnay Schepler >Assignee: Yun Gao >Priority: Major > Fix For: 1.12.0 > > > In 4.1.43 Netty made a change to their SslHandler to always use heap buffers > for JDK SSLEngine implementations, to avoid an additional memory copy. > However, our {{NettyBufferPool}} forbids heap buffer allocations. > We will either have to allow heap buffer allocations, or create a custom > SslHandler implementation that does not use heap buffers (although this seems > ill-advised?). > /cc [~sewen] [~uce] [~NicoK] [~zjwang] [~pnowojski]
[GitHub] [flink] flinkbot edited a comment on pull request #13239: [FLINK-19021][network] Various cleanups and refactorings of the ResultPartition
flinkbot edited a comment on pull request #13239: URL: https://github.com/apache/flink/pull/13239#issuecomment-679946229 ## CI report: * af13efd2abeedd3f3d3a7db9e28d6a32456f8e52 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5853) * fbf7c39bb2d6cb79f52a93efabe310cb2bafdee9 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5858)
[jira] [Updated] (FLINK-19041) Add dependency management for ConnectedStream in Python DataStream API.
[ https://issues.apache.org/jira/browse/FLINK-19041?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hequn Cheng updated FLINK-19041: Affects Version/s: 1.12.0 > Add dependency management for ConnectedStream in Python DataStream API. > --- > > Key: FLINK-19041 > URL: https://issues.apache.org/jira/browse/FLINK-19041 > Project: Flink > Issue Type: Bug > Components: API / Python >Affects Versions: 1.12.0 >Reporter: Shuiqiang Chen >Assignee: Shuiqiang Chen >Priority: Major > Labels: pull-request-available > Fix For: 1.12.0 > > > We failed to set merged configurations into > DataStreamTwoInputPythonStatelessFunctionOperator when finally generating the > StreamGraph.
[jira] [Commented] (FLINK-18695) Allow NettyBufferPool to allocate heap buffers
[ https://issues.apache.org/jira/browse/FLINK-18695?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17184076#comment-17184076 ] Chesnay Schepler commented on FLINK-18695: -- [~gaoyunhaii] there are certainly cases where PREFER_DIRECT can have an effect; it's just that there are [others|https://github.com/netty/netty/blob/8c5b72aaf02e7f349a9972dd9179b449b5a6067b/handler/src/main/java/io/netty/handler/ssl/SslHandler.java#L312] [where|https://github.com/netty/netty/blob/8c5b72aaf02e7f349a9972dd9179b449b5a6067b/handler/src/main/java/io/netty/handler/ssl/SslHandler.java#L22369] it might not. The first instance specifically is the one that was changed in [this commit|https://github.com/netty/netty/commit/39cc7a673939dec96258ff27f5b1874671838af0], which appears to be the crux of the issue.