[GitHub] [flink] TisonKun commented on issue #9861: [FLINK-14237][yarn] No need to rename shipped Flink jar
TisonKun commented on issue #9861: [FLINK-14237][yarn] No need to rename shipped Flink jar URL: https://github.com/apache/flink/pull/9861#issuecomment-540927126 Thanks for your review @tillrohrmann! I would verify the change on a YARN cluster first and address the comments while merging. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #9805: [FLINK-11635][docs-zh] translate dev/stream/state/checkpointing into Chinese
flinkbot edited a comment on issue #9805: [FLINK-11635][docs-zh] translate dev/stream/state/checkpointing into Chinese URL: https://github.com/apache/flink/pull/9805#issuecomment-536295524

## CI report:

* 9ad039de6bbb953c7e2dd74b3118d80ae552f31e : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/129590285)
* 5542dca83dbfb476114efb5aa2f2aa8d0626183a : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/129626942)
* d4c5b15213eb89ee29880c7c8259622c9c7f9bb9 : UNKNOWN

Bot commands: The @flinkbot bot supports the following commands:

* `@flinkbot run travis` re-run the last Travis build
[GitHub] [flink] flinkbot edited a comment on issue #9831: [FLINK-14278] Extend DispatcherResourceManagerComponentFactory.create to take ioExecutor
flinkbot edited a comment on issue #9831: [FLINK-14278] Extend DispatcherResourceManagerComponentFactory.create to take ioExecutor URL: https://github.com/apache/flink/pull/9831#issuecomment-537027384

## CI report:

* e1d37ab03c1bc62b3d45c45ee11d616ed2c78b0c : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/129858618)
* ba5dc1be84ded55f91fad1698e0edb2ca7416add : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/130066271)
* 56535ce31d6f51fc781b18b601e086ea5d0634d7 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/130858216)
* 4ea8b42e4ce029ee7f74f28dd0006fe22abf5f44 : UNKNOWN
[GitHub] [flink] flinkbot edited a comment on issue #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session
flinkbot edited a comment on issue #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session URL: https://github.com/apache/flink/pull/9832#issuecomment-537049332

## CI report:

* abaae048fef753455970fac9d6ab421b660b0536 : UNKNOWN
* b96c63552ccd322adae7a41a410615e95b538ece : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/129867595)
* 2c95a3939dbf0259d694af6c69451f0ede3c3891 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/130066310)
* 6e030add922011ea54178690f171911d0139f14b : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/130858235)
* 271703eda6f6c55b1641a54206109ef659f62854 : UNKNOWN
[GitHub] [flink] flinkbot edited a comment on issue #9805: [FLINK-11635][docs-zh] translate dev/stream/state/checkpointing into Chinese
flinkbot edited a comment on issue #9805: [FLINK-11635][docs-zh] translate dev/stream/state/checkpointing into Chinese URL: https://github.com/apache/flink/pull/9805#issuecomment-536295524

## CI report:

* 9ad039de6bbb953c7e2dd74b3118d80ae552f31e : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/129590285)
* 5542dca83dbfb476114efb5aa2f2aa8d0626183a : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/129626942)
* d4c5b15213eb89ee29880c7c8259622c9c7f9bb9 : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/131454148)
* a8394e9c2ba5aa8186e67895ad68a325f24a8a31 : UNKNOWN
[GitHub] [flink] zhijiangW commented on a change in pull request #9879: [FLINK-14300][runtime, test] Add test making sure that RecordWriter is properly closed in case of early StreamTask failure
zhijiangW commented on a change in pull request #9879: [FLINK-14300][runtime, test] Add test making sure that RecordWriter is properly closed in case of early StreamTask failure URL: https://github.com/apache/flink/pull/9879#discussion_r333846291

## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java

## @@ -109,14 +132,14 @@ private void head(StreamOperator headOperator, OperatorID headOperatorID) {
 		tailConfig.setStateKeySerializer(inputSerializer);
 	}
 	tailConfig.setChainIndex(chainIndex);
+	tailConfig.setBufferTimeout(bufferTimeout);

Review comment: Why do we change the previous timeout of 0 here? It would result in some unit test failures on Travis.
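For context on the `bufferTimeout` being propagated in this test helper: in Flink's public API the output-flush interval is controlled via `StreamExecutionEnvironment#setBufferTimeout`, which is what `StreamConfig#getBufferTimeout` ultimately reflects. A minimal sketch (the job itself is an illustrative example, not code from this PR):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BufferTimeoutSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // A timeout of 0 flushes after every record (lowest latency, most overhead);
        // -1 flushes only when an output buffer is full (highest throughput).
        env.setBufferTimeout(100); // flush pending output buffers at least every 100 ms

        env.fromElements(1, 2, 3).print();
        env.execute("buffer-timeout-sketch");
    }
}
```

A per-operator override also exists via `SingleOutputStreamOperator#setBufferTimeout`, which is why the value has to be carried through per-operator `StreamConfig` instances as in the chainer above.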
[GitHub] [flink] flinkbot edited a comment on issue #9805: [FLINK-11635][docs-zh] translate dev/stream/state/checkpointing into Chinese
flinkbot edited a comment on issue #9805: [FLINK-11635][docs-zh] translate dev/stream/state/checkpointing into Chinese URL: https://github.com/apache/flink/pull/9805#issuecomment-536295524

## CI report:

* 9ad039de6bbb953c7e2dd74b3118d80ae552f31e : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/129590285)
* 5542dca83dbfb476114efb5aa2f2aa8d0626183a : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/129626942)
* d4c5b15213eb89ee29880c7c8259622c9c7f9bb9 : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/131454148)
* a8394e9c2ba5aa8186e67895ad68a325f24a8a31 : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/131455928)
[jira] [Updated] (FLINK-14318) JDK11 build stalls during shading
[ https://issues.apache.org/jira/browse/FLINK-14318?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yu Li updated FLINK-14318:
Labels: test-stability (was: )

Another instance: https://api.travis-ci.org/v3/job/596081750/log.txt

> JDK11 build stalls during shading
>
> Key: FLINK-14318
> URL: https://issues.apache.org/jira/browse/FLINK-14318
> Project: Flink
> Issue Type: Bug
> Components: Build System
> Affects Versions: 1.10.0
> Reporter: Gary Yao
> Priority: Critical
> Labels: test-stability
>
> JDK11 build stalls during shading.
> Travis stage: e2d - misc - jdk11
> https://travis-ci.org/apache/flink/builds/593022581?utm_source=slack_medium=notification
> https://api.travis-ci.org/v3/job/593022629/log.txt
> Relevant excerpt from logs:
> {noformat}
> 01:53:43.889 [INFO] Building flink-metrics-reporter-prometheus-test 1.10-SNAPSHOT
> ...
> 01:53:44.508 [INFO] Including org.apache.flink:force-shading:jar:1.10-SNAPSHOT in the shaded jar.
> 01:53:44.508 [INFO] Excluding org.slf4j:slf4j-api:jar:1.7.15 from the shaded jar.
> 01:53:44.508 [INFO] Excluding com.google.code.findbugs:jsr305:jar:1.3.9 from the shaded jar.
> 01:53:44.508 [INFO] No artifact matching filter io.netty:netty
> 01:53:44.522 [INFO] Replacing original artifact with shaded artifact.
> 01:53:44.523 [INFO] Replacing /home/travis/build/apache/flink/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/target/flink-metrics-reporter-prometheus-test-1.10-SNAPSHOT.jar with /home/travis/build/apache/flink/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/target/flink-metrics-reporter-prometheus-test-1.10-SNAPSHOT-shaded.jar
> 01:53:44.524 [INFO] Replacing original test artifact with shaded test artifact.
> 01:53:44.524 [INFO] Replacing /home/travis/build/apache/flink/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/target/flink-metrics-reporter-prometheus-test-1.10-SNAPSHOT-tests.jar with /home/travis/build/apache/flink/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/target/flink-metrics-reporter-prometheus-test-1.10-SNAPSHOT-shaded-tests.jar
> 01:53:44.524 [INFO] Dependency-reduced POM written at: /home/travis/build/apache/flink/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/target/dependency-reduced-pom.xml
> No output has been received in the last 10m0s, this potentially indicates a stalled build or something wrong with the build itself.
> Check the details on how to adjust your build configuration on: https://docs.travis-ci.com/user/common-build-problems/#build-times-out-because-no-output-was-received
> The build has been terminated
> {noformat}

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (FLINK-14212) Support Python UDFs without arguments
[ https://issues.apache.org/jira/browse/FLINK-14212?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hequn Cheng reassigned FLINK-14212:
Assignee: Wei Zhong

> Support Python UDFs without arguments
>
> Key: FLINK-14212
> URL: https://issues.apache.org/jira/browse/FLINK-14212
> Project: Flink
> Issue Type: Sub-task
> Components: API / Python
> Reporter: Dian Fu
> Assignee: Wei Zhong
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.10.0
> Time Spent: 10m
> Remaining Estimate: 0h
>
> We should support Python UDFs without arguments
[GitHub] [flink] Henvealf commented on issue #9805: [FLINK-11635][docs-zh] translate dev/stream/state/checkpointing into Chinese
Henvealf commented on issue #9805: [FLINK-11635][docs-zh] translate dev/stream/state/checkpointing into Chinese URL: https://github.com/apache/flink/pull/9805#issuecomment-540922883

> Thanks @wuchong, I do have other concerns.
>
> I've left several comments regarding translations that IMO are not easy to understand. These comments were marked 'resolved' by @Henvealf without being addressed or any further explanation.

I have committed the update.
[jira] [Assigned] (FLINK-14208) Optimize Python UDFs with parameters of constant values
[ https://issues.apache.org/jira/browse/FLINK-14208?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hequn Cheng reassigned FLINK-14208:
Assignee: Huang Xingbo

> Optimize Python UDFs with parameters of constant values
>
> Key: FLINK-14208
> URL: https://issues.apache.org/jira/browse/FLINK-14208
> Project: Flink
> Issue Type: Sub-task
> Components: API / Python
> Reporter: Dian Fu
> Assignee: Huang Xingbo
> Priority: Major
> Fix For: 1.10.0
>
> We need to support Python UDFs with parameters of constant values. Note that the constant parameters do not need to be transferred between the Java operator and the Python worker.
[GitHub] [flink] zhijiangW commented on a change in pull request #9879: [FLINK-14300][runtime, test] Add test making sure that RecordWriter is properly closed in case of early StreamTask failure
zhijiangW commented on a change in pull request #9879: [FLINK-14300][runtime, test] Add test making sure that RecordWriter is properly closed in case of early StreamTask failure URL: https://github.com/apache/flink/pull/9879#discussion_r333842749

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java

## @@ -56,6 +57,10 @@
 */
public abstract class RecordWriter {

+	/** Default name for teh output flush thread, if no name with a task reference is given. */

Review comment: typo: teh?
[GitHub] [flink] zhijiangW commented on a change in pull request #9879: [FLINK-14300][runtime, test] Add test making sure that RecordWriter is properly closed in case of early StreamTask failure
zhijiangW commented on a change in pull request #9879: [FLINK-14300][runtime, test] Add test making sure that RecordWriter is properly closed in case of early StreamTask failure URL: https://github.com/apache/flink/pull/9879#discussion_r333843383

## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java

## @@ -42,22 +44,29 @@ public class StreamConfigChainer {
 	private final StreamConfig headConfig;
 	private final Map chainedConfigs = new HashMap<>();
+	private final long bufferTimeout;
 	private StreamConfig tailConfig;
 	private int chainIndex = 0;

 	public StreamConfigChainer(OperatorID headOperatorID, StreamOperator headOperator, StreamConfig headConfig) {
+		this(headOperatorID, SimpleOperatorFactory.of(headOperator), headConfig);
+	}
+
+	public StreamConfigChainer(OperatorID headOperatorID, StreamOperatorFactory headOperatorFactory, StreamConfig headConfig) {
 		this.headConfig = checkNotNull(headConfig);
 		this.tailConfig = checkNotNull(headConfig);
+		this.bufferTimeout = headConfig.getBufferTimeout();

-		head(headOperator, headOperatorID);
+		head(headOperatorID, headOperatorFactory);
 	}

-	private void head(StreamOperator headOperator, OperatorID headOperatorID) {
-		headConfig.setStreamOperator(headOperator);
+	private void head(OperatorID headOperatorID, StreamOperatorFactory headOperatorFactory) {
+		headConfig.setStreamOperatorFactory(headOperatorFactory);
 		headConfig.setOperatorID(headOperatorID);
 		headConfig.setChainStart();
 		headConfig.setChainIndex(chainIndex);
+		headConfig.setBufferTimeout(bufferTimeout);

Review comment: No need to set this for the head config? The `bufferTimeout` is already initialized via `headConfig.getBufferTimeout()` in the constructor.
[GitHub] [flink] flinkbot edited a comment on issue #9831: [FLINK-14278] Extend DispatcherResourceManagerComponentFactory.create to take ioExecutor
flinkbot edited a comment on issue #9831: [FLINK-14278] Extend DispatcherResourceManagerComponentFactory.create to take ioExecutor URL: https://github.com/apache/flink/pull/9831#issuecomment-537027384

## CI report:

* e1d37ab03c1bc62b3d45c45ee11d616ed2c78b0c : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/129858618)
* ba5dc1be84ded55f91fad1698e0edb2ca7416add : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/130066271)
* 56535ce31d6f51fc781b18b601e086ea5d0634d7 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/130858216)
* 4ea8b42e4ce029ee7f74f28dd0006fe22abf5f44 : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/131454163)
[GitHub] [flink] flinkbot edited a comment on issue #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session
flinkbot edited a comment on issue #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session URL: https://github.com/apache/flink/pull/9832#issuecomment-537049332

## CI report:

* abaae048fef753455970fac9d6ab421b660b0536 : UNKNOWN
* b96c63552ccd322adae7a41a410615e95b538ece : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/129867595)
* 2c95a3939dbf0259d694af6c69451f0ede3c3891 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/130066310)
* 6e030add922011ea54178690f171911d0139f14b : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/130858235)
* 271703eda6f6c55b1641a54206109ef659f62854 : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/131454171)
[GitHub] [flink] xintongsong commented on a change in pull request #9805: [FLINK-11635][docs-zh] translate dev/stream/state/checkpointing into Chinese
xintongsong commented on a change in pull request #9805: [FLINK-11635][docs-zh] translate dev/stream/state/checkpointing into Chinese URL: https://github.com/apache/flink/pull/9805#discussion_r333847561

## File path: docs/dev/stream/state/checkpointing.zh.md

## @@ -25,146 +25,138 @@ under the License.
 * ToC
 {:toc}

-Every function and operator in Flink can be **stateful** (see [working with state](state.html) for details).
-Stateful functions store data across the processing of individual elements/events, making state a critical building block for
-any type of more elaborate operation.
+Flink 中的每个方法或算子都能够是**有状态的**(阅读 [working with state](state.html) 查看详细)。
+状态化的方法在处理单个 元素/事件 的时候存储数据,让状态成为使各个类型的算子更加精细的重要部分。
+为了让状态容错,Flink 需要为状态添加**Checkpoint(检查点)**。Checkpoint 使得 Flink 能够恢复状态和在流中的位置,从而向应用提供和无故障执行时一样的语义。

-In order to make state fault tolerant, Flink needs to **checkpoint** the state. Checkpoints allow Flink to recover state and positions
-in the streams to give the application the same semantics as a failure-free execution.
+[Documentation on streaming fault tolerance]({{ site.baseurl }}/zh/internals/stream_checkpointing.html) 介绍了 Flink 流计算容错机制的内部技术原理。

-The [documentation on streaming fault tolerance]({{ site.baseurl }}/internals/stream_checkpointing.html) describes in detail the technique behind Flink's streaming fault tolerance mechanism.
+## 前提条件

-## Prerequisites
+Flink 的 Checkpoint 机制会和持久化存储进行交互,交换流与状态。一般需要:

-Flink's checkpointing mechanism interacts with durable storage for streams and state. In general, it requires:
+ - 一个能够回放一段时间内数据的持久化数据源,例如持久化消息队列(例如 Apache Kafka、RabbitMQ、 Amazon Kinesis、 Google PubSub 等)或文件系统(例如 HDFS、 S3、 GFS、 NFS、 Ceph 等)。
+ - 存放状态的持久化存储,通常为分布式文件系统(比如 HDFS、 S3、 GFS、 NFS、 Ceph 等)。

- - A *persistent* (or *durable*) data source that can replay records for a certain amount of time.
-Examples for such sources are persistent messages queues (e.g., Apache Kafka, RabbitMQ, Amazon Kinesis, Google PubSub) or file systems (e.g., HDFS, S3, GFS, NFS, Ceph, ...).
- - A persistent storage for state, typically a distributed filesystem (e.g., HDFS, S3, GFS, NFS, Ceph, ...)
+## 激活与配置 Checkpoint
+默认情况下,Checkpoint 是禁用的。通过调用 `StreamExecutionEnvironment` 的 `enableCheckpointing(n)` 来激活 Checkpoint,里面的 *n* 是进行 Checkpoint 的间隔,单位毫秒。

-## Enabling and Configuring Checkpointing
+Checkpoint 其他的属性包括:

-By default, checkpointing is disabled. To enable checkpointing, call `enableCheckpointing(n)` on the `StreamExecutionEnvironment`, where *n* is the checkpoint interval in milliseconds.
-
-Other parameters for checkpointing include:
-
- - *exactly-once vs. at-least-once*: You can optionally pass a mode to the `enableCheckpointing(n)` method to choose between the two guarantee levels.
-Exactly-once is preferable for most applications. At-least-once may be relevant for certain super-low-latency (consistently few milliseconds) applications.
-
- - *checkpoint timeout*: The time after which a checkpoint-in-progress is aborted, if it did not complete by then.
-
- - *minimum time between checkpoints*: To make sure that the streaming application makes a certain amount of progress between checkpoints,
-one can define how much time needs to pass between checkpoints. If this value is set for example to *5000*, the next checkpoint will be
-started no sooner than 5 seconds after the previous checkpoint completed, regardless of the checkpoint duration and the checkpoint interval.
-Note that this implies that the checkpoint interval will never be smaller than this parameter.
+ - *精确一次(exactly-once) 对比 至少一次(at-least-once)*:你可以选择向 `enableCheckpointing(n)` 方法中传入一个模式来选择使用两种保证等级中的哪一种。
+对于大多数应用来说,精确一次是较好的选择。至少一次可能与某些延迟超低(始终只有几毫秒)的应用的关联较大。
+
+ - *Checkpoint超时(checkpoint timeout)*:如果过了这个时间,还在进行中的 checkpoint 操作就会被抛弃。
+
+ - *checkpoint 之间的最小时间(minimum time between checkpoints)*: 为了确保流应用在 checkpoint 之间有足够的进展,可以定义在 checkpoint 之间需要多久的时间。如果值设置为了 *5000*,
+无论 checkpoint 持续时间与间隔是多久,在前一个 checkpoint 完成的五秒后才会开始下一个 checkpoint。

-It is often easier to configure applications by defining the "time between checkpoints" than the checkpoint interval, because the "time between checkpoints"
-is not susceptible to the fact that checkpoints may sometimes take longer than on average (for example if the target storage system is temporarily slow).
-
-Note that this value also implies that the number of concurrent checkpoints is *one*.
-
- - *number of concurrent checkpoints*: By default, the system will not trigger another checkpoint while one is still in progress.
-This ensures that the topology does not spend too much time on checkpoints and not make progress with processing the streams.
-It is possible to allow for multiple overlapping checkpoints, which is interesting for pipelines that have a certain processing delay
-(for example because the functions call external services that need some time to respond) but
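The checkpoint parameters described in the hunk above (interval, guarantee mode, timeout, minimum pause, concurrency) map onto Flink's `CheckpointConfig`. A minimal sketch using standard Flink streaming APIs; the concrete values are arbitrary examples, not recommendations:

```java
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointConfigSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpointing is disabled by default; enable it with a 1000 ms interval.
        env.enableCheckpointing(1000);

        CheckpointConfig config = env.getCheckpointConfig();
        // Guarantee level: exactly-once (default) vs. at-least-once.
        config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // Abort a checkpoint that does not complete within 60 s.
        config.setCheckpointTimeout(60000);
        // Leave at least 500 ms between the end of one checkpoint and the start of the next.
        config.setMinPauseBetweenCheckpoints(500);
        // Allow at most one checkpoint in flight at a time.
        config.setMaxConcurrentCheckpoints(1);
    }
}
```

Note that setting a minimum pause implicitly caps concurrent checkpoints at one, which is exactly the "Note that this value also implies..." remark in the English source text.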
[GitHub] [flink] xintongsong commented on a change in pull request #9805: [FLINK-11635][docs-zh] translate dev/stream/state/checkpointing into Chinese
xintongsong commented on a change in pull request #9805: [FLINK-11635][docs-zh] translate dev/stream/state/checkpointing into Chinese URL: https://github.com/apache/flink/pull/9805#discussion_r333847601

## File path: docs/dev/stream/state/checkpointing.zh.md
[GitHub] [flink] flinkbot edited a comment on issue #9860: [FLINK-14331][runtime] Reset vertices right after they transition to terminated states
flinkbot edited a comment on issue #9860: [FLINK-14331][runtime] Reset vertices right after they transition to terminated states URL: https://github.com/apache/flink/pull/9860#issuecomment-539900648

## CI report:

* 80c100513c089a9dd0930aa547383ae970c4e7f8 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/131094955)
* 2de47e991caa74adea1792b6b9153dee94e46b95 : UNKNOWN
[jira] [Updated] (FLINK-13567) Avro Confluent Schema Registry nightly end-to-end test failed on Travis
[ https://issues.apache.org/jira/browse/FLINK-13567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yu Li updated FLINK-13567:
Priority: Blocker (was: Critical)

bq. Would it make sense to disable this test for the moment and make this issue a blocker for 1.10?

+1, marking this as a blocker makes sense according to the failure frequency.

> Avro Confluent Schema Registry nightly end-to-end test failed on Travis
>
> Key: FLINK-13567
> URL: https://issues.apache.org/jira/browse/FLINK-13567
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka, Tests
> Affects Versions: 1.8.2, 1.9.0, 1.10.0
> Reporter: Till Rohrmann
> Priority: Blocker
> Labels: test-stability
> Fix For: 1.10.0
>
> The {{Avro Confluent Schema Registry nightly end-to-end test}} failed on Travis with
> {code}
> [FAIL] 'Avro Confluent Schema Registry nightly end-to-end test' failed after 2 minutes and 11 seconds! Test exited with exit code 1
> No taskexecutor daemon (pid: 29044) is running anymore on travis-job-b0823aec-c4ec-4d4b-8b59-e9f968de9501.
> No standalonesession daemon to stop on host travis-job-b0823aec-c4ec-4d4b-8b59-e9f968de9501.
> rm: cannot remove '/home/travis/build/apache/flink/flink-dist/target/flink-1.10-SNAPSHOT-bin/flink-1.10-SNAPSHOT/plugins': No such file or directory
> {code}
> https://api.travis-ci.org/v3/job/567273939/log.txt
[jira] [Commented] (FLINK-14360) Flink on yarn should support obtain delegation tokens of multi hdfs namespaces
[ https://issues.apache.org/jira/browse/FLINK-14360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16949209#comment-16949209 ]

Shen Yinjie commented on FLINK-14360:

Hi [~karmagyz], we deploy Flink on YARN with RM federation and multiple HDFS namespaces and clusters; Flink fails to connect to some HDFS NameNodes due to a lack of delegation tokens. We provide a configuration for users to set a list of HDFS scheme names, so that before the Flink ApplicationMaster starts, it obtains delegation tokens for the specified HDFS NameNodes.

> Flink on yarn should support obtain delegation tokens of multi hdfs namespaces
>
> Key: FLINK-14360
> URL: https://issues.apache.org/jira/browse/FLINK-14360
> Project: Flink
> Issue Type: Improvement
> Components: Deployment / YARN
> Reporter: Shen Yinjie
> Priority: Major
> Labels: pull-request-available
> Time Spent: 10m
> Remaining Estimate: 0h
>
> There's a scenario when deploying Flink on YARN with multiple HDFS clusters or HDFS federation: Flink needs to get delegation tokens for all the namespaces before starting the ApplicationMaster.
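For readers unfamiliar with the mechanism discussed here: fetching delegation tokens for several namespaces up front can be sketched with Hadoop's standard `FileSystem#addDelegationTokens` API. The namespace URIs and the renewer name below are made-up examples, not Flink's actual configuration keys or implementation:

```java
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.security.Credentials;

public class MultiNamespaceTokenSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Credentials credentials = new Credentials();

        // Hypothetical list of extra namespaces to fetch tokens for before the AM starts.
        String[] namespaces = {"hdfs://ns1", "hdfs://ns2"};
        for (String ns : namespaces) {
            FileSystem fs = FileSystem.get(URI.create(ns), conf);
            // Asks the NameNode for a delegation token (a no-op when security is off)
            // and adds it to the credentials shipped with the YARN containers.
            fs.addDelegationTokens("yarn", credentials);
        }
    }
}
```

The key point of the issue is that this loop must run over every configured namespace, not only the default `fs.defaultFS`, before the ApplicationMaster is launched.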
[GitHub] [flink] hequn8128 commented on a change in pull request #9865: [FLINK-14212][python] Support no-argument Python UDFs.
hequn8128 commented on a change in pull request #9865: [FLINK-14212][python] Support no-argument Python UDFs. URL: https://github.com/apache/flink/pull/9865#discussion_r333858302 ## File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/ExpressionReductionRulesTest.scala ## @@ -46,4 +47,21 @@ class ExpressionReductionRulesTest extends TableTestBase { util.verifyPlan("SELECT myUdf(1) FROM MyTable") } + @Test + def testExpressionReductionWithPythonUDF(): Unit = { +util.addFunction("PyUdf", MockedPythonUDFWithoutArguments) +util.addFunction("MyUdf", Func1) +util.verifyPlan("SELECT PyUdf(), MyUdf(1) FROM MyTable") + } +} + +object MockedPythonUDFWithoutArguments extends ScalarFunction { Review comment: Check both deterministic and non-deterministic python udfs? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] hequn8128 commented on a change in pull request #9865: [FLINK-14212][python] Support no-argument Python UDFs.
hequn8128 commented on a change in pull request #9865: [FLINK-14212][python] Support no-argument Python UDFs. URL: https://github.com/apache/flink/pull/9865#discussion_r333842540 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExpressionReducer.scala ## @@ -62,6 +63,9 @@ class ExpressionReducer( val literals = constExprs.asScala.map(e => (e.getType.getSqlTypeName, e)).flatMap { + // skip expressions that contain python functions Review comment: Maybe it's better to be more specific about why we skip Python functions.
[GitHub] [flink] hequn8128 commented on a change in pull request #9865: [FLINK-14212][python] Support no-argument Python UDFs.
hequn8128 commented on a change in pull request #9865: [FLINK-14212][python] Support no-argument Python UDFs. URL: https://github.com/apache/flink/pull/9865#discussion_r333860456 ## File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/ExpressionReductionRulesTest.scala ## @@ -510,3 +528,14 @@ object DeterministicNullFunc extends ScalarFunction { def eval(): String = null override def isDeterministic = true } + +object MockedPythonUDFWithoutArguments extends ScalarFunction { + + override def getLanguage: FunctionLanguage = FunctionLanguage.PYTHON + + def eval(): Long = { Review comment: Simplify the eval to `def eval(): Long = 1L` directly, and change the class name to `DeterministicPythonFunc`?
[GitHub] [flink] hequn8128 commented on a change in pull request #9865: [FLINK-14212][python] Support no-argument Python UDFs.
hequn8128 commented on a change in pull request #9865: [FLINK-14212][python] Support no-argument Python UDFs. URL: https://github.com/apache/flink/pull/9865#discussion_r333854883 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExpressionReducer.scala ## @@ -120,59 +124,64 @@ class ExpressionReducer( var reducedIdx = 0 while (i < constExprs.size()) { val unreduced = constExprs.get(i) - unreduced.getType.getSqlTypeName match { -// we insert the original expression for object literals -case SqlTypeName.ANY | - SqlTypeName.ROW | - SqlTypeName.ARRAY | - SqlTypeName.MAP | - SqlTypeName.MULTISET => - reducedValues.add(unreduced) -case SqlTypeName.VARCHAR | SqlTypeName.CHAR => - val escapeVarchar = StringEscapeUtils - .escapeJava(safeToString(reduced.getField(reducedIdx).asInstanceOf[BinaryString])) - reducedValues.add(maySkipNullLiteralReduce(rexBuilder, escapeVarchar, unreduced)) - reducedIdx += 1 -case SqlTypeName.VARBINARY | SqlTypeName.BINARY => - val reducedValue = reduced.getField(reducedIdx) - val value = if (null != reducedValue) { -new ByteString(reduced.getField(reducedIdx).asInstanceOf[Array[Byte]]) - } else { -reducedValue - } - reducedValues.add(maySkipNullLiteralReduce(rexBuilder, value, unreduced)) - reducedIdx += 1 -case SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE => - val value = if (!reduced.isNullAt(reducedIdx)) { -val mills = reduced.getField(reducedIdx).asInstanceOf[Long] -Long.box(SqlDateTimeUtils.timestampWithLocalZoneToTimestamp( - mills, TimeZone.getTimeZone(config.getLocalTimeZone))) - } else { -null - } - reducedValues.add(maySkipNullLiteralReduce(rexBuilder, value, unreduced)) - reducedIdx += 1 -case SqlTypeName.DECIMAL => - val reducedValue = reduced.getField(reducedIdx) - val value = if (reducedValue != null) { -reducedValue.asInstanceOf[Decimal].toBigDecimal - } else { -reducedValue - } - reducedValues.add(maySkipNullLiteralReduce(rexBuilder, value, unreduced)) - reducedIdx += 1 -case 
_ => - val reducedValue = reduced.getField(reducedIdx) - // RexBuilder handle double literal incorrectly, convert it into BigDecimal manually - val value = if (reducedValue != null && -unreduced.getType.getSqlTypeName == SqlTypeName.DOUBLE) { -new java.math.BigDecimal(reducedValue.asInstanceOf[Number].doubleValue()) - } else { -reducedValue - } - - reducedValues.add(maySkipNullLiteralReduce(rexBuilder, value, unreduced)) - reducedIdx += 1 + if (PythonUtil.containsFunctionOf(unreduced, FunctionLanguage.PYTHON)) { Review comment: Rex tree traverse is expensive. We traverse twice in this reduce() method. How about do some reuse here, e.g., ``` val jvmUdfs = constExprs.asScala.zipWithIndex .filter(p => PythonUtil.containsFunctionOf(p._1, FunctionLanguage.JVM)) ... if (!jvmUdfIndexes.contains(i)) { // if contains python function then just insert the original expression. reducedValues.add(unreduced) } ``` In this way, we can reduce some optimization time. What do you think?
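The reuse the reviewer suggests can be illustrated with a toy model (plain Python, not the Scala planner code): walk each constant expression tree once, remember which indices contain a Python function, and consult that set in the later reduction loop instead of traversing the trees a second time. The `(op, children)` tuple representation below is an illustrative stand-in for Calcite's `RexNode` tree.

```python
# Sketch of the single-traversal reuse pattern: the expensive recursive
# check runs once per expression, and the resulting index set is reused.

def contains_python_function(expr) -> bool:
    """Recursively check a toy (op, children) expression tree."""
    op, children = expr
    if op == "python_udf":
        return True
    return any(contains_python_function(c) for c in children)

def python_expr_indexes(const_exprs) -> set:
    """One traversal pass over all constant expressions."""
    return {i for i, e in enumerate(const_exprs) if contains_python_function(e)}

def reduce_exprs(const_exprs, reduce_fn):
    """Reduce every expression except those containing Python functions."""
    skip = python_expr_indexes(const_exprs)  # computed once, reused below
    return [e if i in skip else reduce_fn(e) for i, e in enumerate(const_exprs)]
```

The second loop then does a constant-time set lookup per expression rather than a second tree walk, which is the optimization-time saving the comment is after.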
[jira] [Comment Edited] (FLINK-14360) Flink on yarn should support obtain delegation tokens of multi hdfs namespaces
[ https://issues.apache.org/jira/browse/FLINK-14360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16949216#comment-16949216 ] Yangze Guo edited comment on FLINK-14360 at 10/11/19 7:37 AM: -- cc [~wangyang] Could you take a look at this ticket? was (Author: karmagyz): cc [~taoyang] [~yangwang166] Could you take a look at this ticket?
[jira] [Created] (FLINK-14366) Annotate MiniCluster tests in flink-tests with AlsoRunWithSchedulerNG
Zhu Zhu created FLINK-14366: --- Summary: Annotate MiniCluster tests in flink-tests with AlsoRunWithSchedulerNG Key: FLINK-14366 URL: https://issues.apache.org/jira/browse/FLINK-14366 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination, Tests Affects Versions: 1.10.0 Reporter: Zhu Zhu Fix For: 1.10.0 This task is to annotate all MiniCluster tests in flink-tests with AlsoRunWithSchedulerNG, so that we can catch breaking changes in time while further improving the new generation scheduler. We should also ensure the annotated tests pass, either by fixing failed tests, or by not annotating a failed test and opening a ticket to track it. The tickets for failed tests should be linked in this task.
[jira] [Commented] (FLINK-14360) Flink on yarn should support obtain delegation tokens of multi hdfs namespaces
[ https://issues.apache.org/jira/browse/FLINK-14360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16949216#comment-16949216 ] Yangze Guo commented on FLINK-14360: cc [~taoyang] [~yangwang166] Could you take a look at this ticket?
[jira] [Comment Edited] (FLINK-14360) Flink on yarn should support obtain delegation tokens of multi hdfs namespaces
[ https://issues.apache.org/jira/browse/FLINK-14360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16949216#comment-16949216 ] Yangze Guo edited comment on FLINK-14360 at 10/11/19 7:40 AM: -- cc [~fly_in_gis] Could you take a look at this ticket? was (Author: karmagyz): cc [~wangyang] Could you take a look at this ticket?
[GitHub] [flink] flinkbot edited a comment on issue #9824: [FLINK-14302] FlinkKafkaInternalProducer should not send `ADD_PARTITIONS_TO_TXN` request if `newPartitionsInTransaction` is empty when enable
flinkbot edited a comment on issue #9824: [FLINK-14302] FlinkKafkaInternalProducer should not send `ADD_PARTITIONS_TO_TXN` request if `newPartitionsInTransaction` is empty when enable EoS URL: https://github.com/apache/flink/pull/9824#issuecomment-536928428 ## CI report: * 87ae788d6490da7af5284c404648647c9919a6df : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/129820284) * c4b5a18b039a90aa6b313c43f423622536bc8cd4 : UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build
[GitHub] [flink] WeiZhong94 commented on issue #9865: [FLINK-14212][python] Support no-argument Python UDFs.
WeiZhong94 commented on issue #9865: [FLINK-14212][python] Support no-argument Python UDFs. URL: https://github.com/apache/flink/pull/9865#issuecomment-540956430 @hequn8128 Thanks for your feedback! Dian Fu and I have discussed these two approaches and agreed to skip the optimization in `ExpressionReducer` and optimize the UDFs at runtime. Our thoughts are as follows: 1. How to optimize Python UDFs at runtime? After supporting constant parameters in Python UDFs (see [this PR](https://github.com/apache/flink/pull/9858)), we can do this optimization when evaluating the chained Python UDFs in the Python worker: if the evaluated Python UDF is deterministic and has no arguments, or its arguments are all constant values, it will always return a constant value, so we replace it with the constant result value. This rule can be applied recursively until all the deterministic UDFs with constant inputs have been replaced. If the root nodes of the chained Python UDFs become constant values, we can transmit them only once and replace them with None in subsequent transmissions to save IO resources. The Java operator also knows which fields of the evaluated result should be constant values rather than None, because the reduce rule can be applied on the Java side too. No additional interaction between the Java operator and the Python worker is needed in this design. 2. Why not optimize Python UDFs during the optimization phase? Reducing Python UDFs in the optimization phase is not easy in the current design. It would mean the generated Java wrappers of Python UDFs can be evaluated and return the correct result; in other words, we would need to run Python UDFs on the client side. But Python UDFs are designed to run on the cluster, whose Python environment may differ from the client side once we introduce environment and dependency management in the future. To solve the environment problem, we would need to prepare a Python environment identical to the one on the cluster before reducing Python UDFs. To evaluate the Python UDFs, we would need to implement a new UDF runner that does not depend on Apache Beam (the client side of the Flink Python API does not depend on Apache Beam). If we completed all of that it would be a perfect solution to this problem, but it is too expensive compared to runtime optimization.
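The recursive runtime reduction described in point 1 can be modeled with a small sketch. This is a toy stand-in, not the actual Flink/Beam runner: a call node whose function is deterministic and whose arguments have all been folded to constants is itself replaced by the constant result, from the leaves up.

```python
# Toy constant-folding model for chained UDF calls. Nodes are either
# ("const", value) or ("call", fn, deterministic, args). A deterministic
# call with all-constant (or no) arguments folds to its constant result.

def fold(node):
    if node[0] == "const":
        return node
    _, fn, deterministic, args = node
    folded_args = [fold(a) for a in args]          # reduce children first
    if deterministic and all(a[0] == "const" for a in folded_args):
        return ("const", fn(*[a[1] for a in folded_args]))
    return ("call", fn, deterministic, folded_args)
```

A no-argument deterministic UDF is the base case: `all(...)` over an empty argument list is true, so it folds immediately, and the folding then propagates to any deterministic parent whose other arguments are constants — matching the rule applied "recursively until all the deterministic UDFs with constant inputs have been replaced". A non-deterministic node stays a call and blocks folding above it.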
[GitHub] [flink] flinkbot edited a comment on issue #9831: [FLINK-14278] Extend DispatcherResourceManagerComponentFactory.create to take ioExecutor
flinkbot edited a comment on issue #9831: [FLINK-14278] Extend DispatcherResourceManagerComponentFactory.create to take ioExecutor URL: https://github.com/apache/flink/pull/9831#issuecomment-537027384 ## CI report: * e1d37ab03c1bc62b3d45c45ee11d616ed2c78b0c : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/129858618) * ba5dc1be84ded55f91fad1698e0edb2ca7416add : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/130066271) * 56535ce31d6f51fc781b18b601e086ea5d0634d7 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/130858216) * 4ea8b42e4ce029ee7f74f28dd0006fe22abf5f44 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/131454163)
[jira] [Commented] (FLINK-7151) Add a basic function SQL DDL
[ https://issues.apache.org/jira/browse/FLINK-7151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16949235#comment-16949235 ] Timo Walther commented on FLINK-7151: - Thank you [~ZhenqiuHuang]. We also have a discussion about giving properties to functions in the ML discussion "[DISCUSS] FLIP-64: Support for Temporary Objects in Table module". I think Postgres also has a {{WITH}} clause for this reason, which aligns nicely with our {{CREATE TABLE xxx () WITH (...)}} clause. > Add a basic function SQL DDL > > > Key: FLINK-7151 > URL: https://issues.apache.org/jira/browse/FLINK-7151 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: yuemeng >Assignee: Zhenqiu Huang >Priority: Critical > Labels: pull-request-available > Fix For: 1.10.0 > > Time Spent: 10m > Remaining Estimate: 0h > > Based on create function and table, we can register a udf, udaf, or udtf using SQL: > {code} > CREATE FUNCTION [IF NOT EXISTS] [catalog_name.db_name.]function_name AS > class_name; > DROP FUNCTION [IF EXISTS] [catalog_name.db_name.]function_name; > ALTER FUNCTION [IF EXISTS] [catalog_name.db_name.]function_name RENAME TO > new_name; > {code} > {code} > CREATE function 'TOPK' AS > 'com..aggregate.udaf.distinctUdaf.topk.ITopKUDAF'; > INSERT INTO db_sink SELECT id, TOPK(price, 5, 'DESC') FROM kafka_source GROUP > BY id; > {code} > This ticket can assume that the function class is already loaded in the classpath > by users. Advanced syntax, like how to dynamically load udf libraries from > external locations, can be on a separate ticket.
[jira] [Commented] (FLINK-14364) Allow comments fail when not ignore parse errors in CsvRowDeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-14364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16949243#comment-16949243 ] Jingsong Lee commented on FLINK-14364: -- {code:java} String string = "#Test,12,Test"; final TypeInformation<Row> rowInfo = Types.ROW(Types.STRING, Types.INT, Types.STRING); final CsvRowDeserializationSchema.Builder deserSchemaBuilder = new CsvRowDeserializationSchema.Builder(rowInfo) .setIgnoreParseErrors(false) .setAllowComments(true); System.out.println(deserialize(deserSchemaBuilder, string)); {code} Here is an example~ > Allow comments fail when not ignore parse errors in > CsvRowDeserializationSchema > --- > > Key: FLINK-14364 > URL: https://issues.apache.org/jira/browse/FLINK-14364 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Reporter: Jingsong Lee >Priority: Major > Fix For: 1.10.0 > > > Using CsvRowDeserializationSchema with setIgnoreParseErrors(false) and > setAllowComments(true): > if there are comments in the message, it will throw MismatchedInputException. > Is this a bug? Should we catch MismatchedInputException and return null?
[jira] [Commented] (FLINK-14356) Introduce "single-field" format to (de)serialize message to a single field
[ https://issues.apache.org/jira/browse/FLINK-14356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16949244#comment-16949244 ] Timo Walther commented on FLINK-14356: -- If we go for STRING as well, I suggest implementing at least: CHAR/VARCHAR/BINARY/VARBINARY/TINYINT/INT/SMALLINT/BIGINT. The implementation effort is not very big. > Introduce "single-field" format to (de)serialize message to a single field > -- > > Key: FLINK-14356 > URL: https://issues.apache.org/jira/browse/FLINK-14356 > Project: Flink > Issue Type: Improvement > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table > SQL / API >Reporter: jinfeng >Assignee: jinfeng >Priority: Major > > I want to use Flink SQL to write Kafka messages directly to HDFS, with no > serialization or deserialization of messages in between: the bytes of the > message directly become the first field of the Row. However, the current > RowSerializationSchema does not support the conversion of bytes to VARBINARY. > Can we add some special RowSerializationSchema and RowDeserializationSchema?
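The "single-field" format being proposed can be sketched in a few lines. This toy Python model (not the real Flink `SerializationSchema`/`DeserializationSchema` interfaces) shows the essential property: the raw message bytes pass through unchanged as the single field of a one-column row, with no parsing in between.

```python
# Minimal single-field (de)serialization sketch: a row is modeled as a
# plain tuple; the real schemas would target SQL types like VARBINARY.

def deserialize_single_field(message: bytes) -> tuple:
    """bytes -> one-field row, with the payload passed through untouched."""
    return (message,)

def serialize_single_field(row: tuple) -> bytes:
    """one-field row -> bytes; round-trips with deserialize_single_field."""
    (field,) = row
    return field
```

Because nothing is interpreted, the pair round-trips exactly, which is what makes it suitable for piping Kafka messages straight to HDFS; supporting the other primitive targets Timo lists (CHAR, TINYINT, BIGINT, ...) would just add a decode step per type.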
[jira] [Commented] (FLINK-14364) Allow comments fail when not ignore parse errors in CsvRowDeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-14364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16949247#comment-16949247 ] Timo Walther commented on FLINK-14364: -- Yes, this seems like a bug. It should be null. Jackson should not throw an exception here.
[GitHub] [flink] flinkbot edited a comment on issue #9805: [FLINK-11635][docs-zh] translate dev/stream/state/checkpointing into Chinese
flinkbot edited a comment on issue #9805: [FLINK-11635][docs-zh] translate dev/stream/state/checkpointing into Chinese URL: https://github.com/apache/flink/pull/9805#issuecomment-536295524 ## CI report: * 9ad039de6bbb953c7e2dd74b3118d80ae552f31e : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/129590285) * 5542dca83dbfb476114efb5aa2f2aa8d0626183a : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/129626942) * d4c5b15213eb89ee29880c7c8259622c9c7f9bb9 : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/131454148) * a8394e9c2ba5aa8186e67895ad68a325f24a8a31 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/131455928) * b9268914cafdf59bd10cc47fb04616b926f55612 : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/131461987)
[GitHub] [flink] 1u0 commented on a change in pull request #9870: [FLINK-14350][metrics] Introduce dedicated MetricScope
1u0 commented on a change in pull request #9870: [FLINK-14350][metrics] Introduce dedicated MetricScope URL: https://github.com/apache/flink/pull/9870#discussion_r333877706 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/FrontMetricGroup.java ## @@ -30,28 +34,54 @@ */ public class FrontMetricGroup<P extends AbstractMetricGroup<?>> extends ProxyMetricGroup<P> { - protected int reporterIndex; + private final ReporterIndexInjectingMetricScope scope; public FrontMetricGroup(int reporterIndex, P reference) { super(reference); - this.reporterIndex = reporterIndex; + this.scope = new ReporterIndexInjectingMetricScope(reporterIndex, this.parentMetricGroup.getScope()); } @Override public String getMetricIdentifier(String metricName) { - return parentMetricGroup.getMetricIdentifier(metricName, null, this.reporterIndex); + return scope.getMetricIdentifier(metricName); } @Override public String getMetricIdentifier(String metricName, CharacterFilter filter) { - return parentMetricGroup.getMetricIdentifier(metricName, filter, this.reporterIndex); + return scope.getMetricIdentifier(metricName, filter); } public String getLogicalScope(CharacterFilter filter) { return parentMetricGroup.getLogicalScope(filter); } public String getLogicalScope(CharacterFilter filter, char delimiter) { - return parentMetricGroup.getLogicalScope(filter, delimiter, this.reporterIndex); + return parentMetricGroup.getLogicalScope(filter, delimiter); Review comment: **NB:** If I understand correctly, without `reporterIndex`, you are making this call non-cached. This would impact the InfluxDB, Jmx, and Prometheus reporters.
[GitHub] [flink] 1u0 commented on a change in pull request #9870: [FLINK-14350][metrics] Introduce dedicated MetricScope
1u0 commented on a change in pull request #9870: [FLINK-14350][metrics] Introduce dedicated MetricScope URL: https://github.com/apache/flink/pull/9870#discussion_r333874262 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/InternalMetricScope.java ## @@ -0,0 +1,89 @@ +package org.apache.flink.runtime.metrics.scope; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.metrics.CharacterFilter; +import org.apache.flink.metrics.MetricScope; +import org.apache.flink.runtime.metrics.DelimiterProvider; + +import java.util.Map; +import java.util.function.Supplier; + +/** + * Default scope implementation. Contains additional methods assembling identifiers based on reporter-specific delimiters. + */ +@Internal +public class InternalMetricScope implements MetricScope { + + private final DelimiterProvider delimiterProvider; + private final Supplier<Map<String, String>> variablesProvider; + + /** +* The map containing all variables and their associated values, lazily computed. +*/ + protected volatile Map<String, String> variables; + + /** +* The metrics scope represented by this group. +* For example ["host-7", "taskmanager-2", "window_word_count", "my-mapper" ]. +*/ + private final String[] scopeComponents; + + /** +* Array containing the metrics scope represented by this group for each reporter, as a concatenated string, lazily computed. +* For example: "host-7.taskmanager-2.window_word_count.my-mapper" +*/ + private final String[] scopeStrings; + + public InternalMetricScope(DelimiterProvider delimiterProvider, String[] scopeComponents, Supplier<Map<String, String>> variablesProvider) { + this.delimiterProvider = delimiterProvider; + this.variablesProvider = variablesProvider; + this.scopeComponents = scopeComponents; + this.scopeStrings = new String[delimiterProvider.getNumberReporters()]; + } + + @Override + public Map<String, String> getAllVariables() { + if (variables == null) { // avoid synchronization for common case + synchronized (this) { + if (variables == null) { + variables = variablesProvider.get(); + } + } + } + return variables; + } + + public String[] geScopeComponents() { + return scopeStrings; + } Review comment: Also, you are returning `scopeStrings` here instead of `scopeComponents` (I hope this would be revealed by tests, once CI reaches them). Side note: I think a better name for `scopeStrings` would be something like `cachedIdentifierScope`.
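The caching concern raised in these two comments can be illustrated with a toy model (plain Python, not the Flink metrics code): the assembled scope string is computed once per reporter, since each reporter may use a different delimiter, and then cached so repeated metric-identifier lookups do not re-join the scope components.

```python
# Sketch of per-reporter lazy caching of the concatenated scope string,
# analogous to the `scopeStrings` array discussed above.

class ScopeCache:
    def __init__(self, components, delimiters):
        self.components = components          # e.g. ["host-7", "taskmanager-2"]
        self.delimiters = delimiters          # one delimiter per reporter
        self._cached = [None] * len(delimiters)

    def scope_string(self, reporter_index: int) -> str:
        # Join only on first access for this reporter; reuse afterwards.
        if self._cached[reporter_index] is None:
            self._cached[reporter_index] = self.delimiters[reporter_index].join(self.components)
        return self._cached[reporter_index]
```

Dropping the `reporterIndex` parameter, as the reviewed change does for `getLogicalScope`, removes the slot to cache into — which is exactly why the reviewer flags the call as becoming non-cached for reporters that query it on every report cycle.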
[jira] [Commented] (FLINK-8225) Use JsonRowDeserializationSchema without Kafka connector dependency
[ https://issues.apache.org/jira/browse/FLINK-8225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16949268#comment-16949268 ] Sendoh commented on FLINK-8225: --- cool. will close it > Use JsonRowDeserializationSchema without Kafka connector dependency > > > Key: FLINK-8225 > URL: https://issues.apache.org/jira/browse/FLINK-8225 > Project: Flink > Issue Type: Wish > Components: Table SQL / Ecosystem >Reporter: Sendoh >Priority: Minor > > Now when using JsonRowDeserializationSchema, user needs to add Kafka > connector dependency. Nevertheless JsonRowDeserializationSchema can be used > without using Kafka connector. > AC: > move JsonRowDeserializationSchema to a dedicated module > Ref: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/the-location-of-JsonRowDeserializationSchema-java-td17063.html
[jira] [Closed] (FLINK-8225) Use JsonRowDeserializationSchema without Kafka connector dependency
[ https://issues.apache.org/jira/browse/FLINK-8225?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sendoh closed FLINK-8225. - Resolution: Implemented
[GitHub] [flink] WeiZhong94 commented on issue #9865: [FLINK-14212][python] Support no-argument Python UDFs.
WeiZhong94 commented on issue #9865: [FLINK-14212][python] Support no-argument Python UDFs. URL: https://github.com/apache/flink/pull/9865#issuecomment-540973875 @hequn8128 Thanks for your review! I have addressed your comments in the latest commit.
[GitHub] [flink] WeiZhong94 commented on a change in pull request #9865: [FLINK-14212][python] Support no-argument Python UDFs.
WeiZhong94 commented on a change in pull request #9865: [FLINK-14212][python] Support no-argument Python UDFs. URL: https://github.com/apache/flink/pull/9865#discussion_r333886126 ## File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/ExpressionReductionRulesTest.scala ## @@ -46,4 +47,21 @@ class ExpressionReductionRulesTest extends TableTestBase { util.verifyPlan("SELECT myUdf(1) FROM MyTable") } + @Test + def testExpressionReductionWithPythonUDF(): Unit = { +util.addFunction("PyUdf", MockedPythonUDFWithoutArguments) +util.addFunction("MyUdf", Func1) +util.verifyPlan("SELECT PyUdf(), MyUdf(1) FROM MyTable") + } +} + +object MockedPythonUDFWithoutArguments extends ScalarFunction { Review comment: IMO it's not necessary to check non-deterministic Python UDFs in this rule test. My reasons are: 1. If the UDF is non-deterministic, it won't match this rule whether it is a Python UDF or not, so this test case won't cover more code. 2. We already have an ITCase about non-deterministic Python UDFs in `test_udf.py` to make sure they work properly.
[jira] [Created] (FLINK-14367) Fully support for converting RexNode to Expression
Jark Wu created FLINK-14367: --- Summary: Fully support for converting RexNode to Expression Key: FLINK-14367 URL: https://issues.apache.org/jira/browse/FLINK-14367 Project: Flink Issue Type: Improvement Components: Table SQL / Planner Reporter: Jark Wu Currently, the {{RexNodeToExpressionConverter}} in both planners does not provide full support. There are many RexNodes that cannot be converted to Expressions: 1) RexFieldAccess -> GET call 2) Literals in interval types and complex types 3) TRIM(BOTH/LEADING/HEADING, ..) ... We should have comprehensive tests to cover all cases. A good idea is to verify with {{ExpressionConverter}} together.
[jira] [Created] (FLINK-14368) KafkaProducerAtLeastOnceITCase>KafkaProducerTestBase.testCustomPartitioning failed on Travis
Till Rohrmann created FLINK-14368: - Summary: KafkaProducerAtLeastOnceITCase>KafkaProducerTestBase.testCustomPartitioning failed on Travis Key: FLINK-14368 URL: https://issues.apache.org/jira/browse/FLINK-14368 Project: Flink Issue Type: Bug Components: Connectors / Kafka Affects Versions: 1.10.0 Reporter: Till Rohrmann

The {{KafkaProducerAtLeastOnceITCase>KafkaProducerTestBase.testCustomPartitioning}} fails on Travis with

{code}
Test testCustomPartitioning(org.apache.flink.streaming.connectors.kafka.KafkaProducerAtLeastOnceITCase) failed with:
java.lang.AssertionError: Create test topic : defaultTopic failed, org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
	at org.junit.Assert.fail(Assert.java:88)
	at org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl.createTestTopic(KafkaTestEnvironmentImpl.java:180)
	at org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment.createTestTopic(KafkaTestEnvironment.java:115)
	at org.apache.flink.streaming.connectors.kafka.KafkaTestBase.createTestTopic(KafkaTestBase.java:196)
	at org.apache.flink.streaming.connectors.kafka.KafkaProducerTestBase.testCustomPartitioning(KafkaProducerTestBase.java:112)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
	at org.junit.rules.RunRules.evaluate(RunRules.java:20)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
	at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
	at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
	at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
	at org.junit.rules.RunRules.evaluate(RunRules.java:20)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
	at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365)
	at org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:273)
	at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:238)
	at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:159)
	at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)
	at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)
	at org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126)
	at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418)
{code}

https://api.travis-ci.com/v3/job/244297223/log.txt
[jira] [Created] (FLINK-14369) KafkaProducerAtLeastOnceITCase>KafkaProducerTestBase.testOneToOneAtLeastOnceCustomOperator fails on Travis
Till Rohrmann created FLINK-14369: - Summary: KafkaProducerAtLeastOnceITCase>KafkaProducerTestBase.testOneToOneAtLeastOnceCustomOperator fails on Travis Key: FLINK-14369 URL: https://issues.apache.org/jira/browse/FLINK-14369 Project: Flink Issue Type: Bug Components: Connectors / Kafka Affects Versions: 1.10.0 Reporter: Till Rohrmann

The {{KafkaProducerAtLeastOnceITCase>KafkaProducerTestBase.testOneToOneAtLeastOnceCustomOperator}} fails on Travis with

{code}
Test testOneToOneAtLeastOnceCustomOperator(org.apache.flink.streaming.connectors.kafka.KafkaProducerAtLeastOnceITCase) failed with:
java.lang.AssertionError: Create test topic : oneToOneTopicCustomOperator failed, org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
	at org.junit.Assert.fail(Assert.java:88)
	at org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl.createTestTopic(KafkaTestEnvironmentImpl.java:180)
	at org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment.createTestTopic(KafkaTestEnvironment.java:115)
	at org.apache.flink.streaming.connectors.kafka.KafkaTestBase.createTestTopic(KafkaTestBase.java:196)
	at org.apache.flink.streaming.connectors.kafka.KafkaProducerTestBase.testOneToOneAtLeastOnce(KafkaProducerTestBase.java:231)
	at org.apache.flink.streaming.connectors.kafka.KafkaProducerTestBase.testOneToOneAtLeastOnceCustomOperator(KafkaProducerTestBase.java:214)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
	at org.junit.rules.RunRules.evaluate(RunRules.java:20)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
	at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
	at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
	at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
	at org.junit.rules.RunRules.evaluate(RunRules.java:20)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
	at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365)
	at org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:273)
	at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:238)
	at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:159)
	at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)
	at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)
	at org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126)
	at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418)
{code}

https://api.travis-ci.com/v3/job/244297223/log.txt
[jira] [Created] (FLINK-14370) KafkaProducerAtLeastOnceITCase>KafkaProducerTestBase.testOneToOneAtLeastOnceRegularSink fails on Travis
Till Rohrmann created FLINK-14370: - Summary: KafkaProducerAtLeastOnceITCase>KafkaProducerTestBase.testOneToOneAtLeastOnceRegularSink fails on Travis Key: FLINK-14370 URL: https://issues.apache.org/jira/browse/FLINK-14370 Project: Flink Issue Type: Bug Components: Connectors / Kafka Affects Versions: 1.10.0 Reporter: Till Rohrmann

The {{KafkaProducerAtLeastOnceITCase>KafkaProducerTestBase.testOneToOneAtLeastOnceRegularSink}} fails on Travis with

{code}
Test testOneToOneAtLeastOnceRegularSink(org.apache.flink.streaming.connectors.kafka.KafkaProducerAtLeastOnceITCase) failed with:
java.lang.AssertionError: Job should fail!
	at org.junit.Assert.fail(Assert.java:88)
	at org.apache.flink.streaming.connectors.kafka.KafkaProducerTestBase.testOneToOneAtLeastOnce(KafkaProducerTestBase.java:280)
	at org.apache.flink.streaming.connectors.kafka.KafkaProducerTestBase.testOneToOneAtLeastOnceRegularSink(KafkaProducerTestBase.java:206)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
	at org.junit.rules.RunRules.evaluate(RunRules.java:20)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
	at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
	at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
	at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
	at org.junit.rules.RunRules.evaluate(RunRules.java:20)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
	at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365)
	at org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:273)
	at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:238)
	at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:159)
	at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)
	at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)
	at org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126)
	at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418)
{code}

https://api.travis-ci.com/v3/job/244297223/log.txt
[GitHub] [flink] flinkbot edited a comment on issue #9805: [FLINK-11635][docs-zh] translate dev/stream/state/checkpointing into Chinese
flinkbot edited a comment on issue #9805: [FLINK-11635][docs-zh] translate dev/stream/state/checkpointing into Chinese URL: https://github.com/apache/flink/pull/9805#issuecomment-536295524 ## CI report: * 9ad039de6bbb953c7e2dd74b3118d80ae552f31e : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/129590285) * 5542dca83dbfb476114efb5aa2f2aa8d0626183a : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/129626942) * d4c5b15213eb89ee29880c7c8259622c9c7f9bb9 : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/131454148) * a8394e9c2ba5aa8186e67895ad68a325f24a8a31 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/131455928) * b9268914cafdf59bd10cc47fb04616b926f55612 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/131461987) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #9865: [FLINK-14212][python] Support no-argument Python UDFs.
flinkbot edited a comment on issue #9865: [FLINK-14212][python] Support no-argument Python UDFs. URL: https://github.com/apache/flink/pull/9865#issuecomment-539919841

## CI report:
* bf1d566ea2f91d61c2f436bb92b5337088b7 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/131102433)
* 3110f74ae60ed10bdb2bdbed7dd1facfde9fdeea : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/131266172)
* f48b910ceee34ff7eb9f6f75d7782b49005c587f : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/131275232)
* c97adb28424b4518535c9922b011d7adcf5e842f : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/131279546)
* 4edf1202d5b9c34f329ca29ed96430b69a5807cd : UNKNOWN
[jira] [Created] (FLINK-14365) Annotate MiniCluster tests in core modules with AlsoRunWithSchedulerNG
Zhu Zhu created FLINK-14365: --- Summary: Annotate MiniCluster tests in core modules with AlsoRunWithSchedulerNG Key: FLINK-14365 URL: https://issues.apache.org/jira/browse/FLINK-14365 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Affects Versions: 1.10.0 Reporter: Zhu Zhu Fix For: 1.10.0

This task is to annotate MiniCluster tests with AlsoRunWithSchedulerNG in the Flink core modules, so that we learn about breaking changes in time when further improving the new generation scheduler. Core modules are the basic Flink modules as defined in {{MODULES_CORE}} in flink/travis/stage.sh:

MODULES_CORE="\
flink-annotations,\
flink-test-utils-parent/flink-test-utils,\
flink-state-backends/flink-statebackend-rocksdb,\
flink-clients,\
flink-core,\
flink-java,\
flink-optimizer,\
flink-runtime,\
flink-runtime-web,\
flink-scala,\
flink-streaming-java,\
flink-streaming-scala,\
flink-metrics,\
flink-metrics/flink-metrics-core"
[jira] [Updated] (FLINK-14246) Annotate all MiniCluster tests in flink-runtime with AlsoRunWithSchedulerNG
[ https://issues.apache.org/jira/browse/FLINK-14246?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhu Zhu updated FLINK-14246: Component/s: Tests

> Annotate all MiniCluster tests in flink-runtime with AlsoRunWithSchedulerNG
> ---
>
> Key: FLINK-14246
> URL: https://issues.apache.org/jira/browse/FLINK-14246
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / Coordination, Tests
> Affects Versions: 1.10.0
> Reporter: Zhu Zhu
> Assignee: Zhu Zhu
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.10.0
>
> Time Spent: 10m
> Remaining Estimate: 0h
>
> This task is to annotate all MiniCluster tests with AlsoRunWithSchedulerNG in flink-runtime, so that we can know breaking changes in time when further improving the new generation scheduler.
> We should also guarantee the annotated tests pass, either by fixing failed tests, or by not annotating a failed test and opening a ticket to track it.
[jira] [Updated] (FLINK-14365) Annotate MiniCluster tests in core modules with AlsoRunWithSchedulerNG
[ https://issues.apache.org/jira/browse/FLINK-14365?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhu Zhu updated FLINK-14365: Component/s: Tests

> Annotate MiniCluster tests in core modules with AlsoRunWithSchedulerNG
> --
>
> Key: FLINK-14365
> URL: https://issues.apache.org/jira/browse/FLINK-14365
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / Coordination, Tests
> Affects Versions: 1.10.0
> Reporter: Zhu Zhu
> Priority: Major
> Fix For: 1.10.0
>
> This task is to annotate MiniCluster tests with AlsoRunWithSchedulerNG in flink core modules, so that we can know breaking changes in time when further improving the new generation scheduler.
> Core modules are the basic flink modules as defined in {{MODULES_CORE}} in flink/travis/stage.sh.
> MODULES_CORE="\
> flink-annotations,\
> flink-test-utils-parent/flink-test-utils,\
> flink-state-backends/flink-statebackend-rocksdb,\
> flink-clients,\
> flink-core,\
> flink-java,\
> flink-optimizer,\
> flink-runtime,\
> flink-runtime-web,\
> flink-scala,\
> flink-streaming-java,\
> flink-streaming-scala,\
> flink-metrics,\
> flink-metrics/flink-metrics-core"
[GitHub] [flink] Henvealf commented on issue #9805: [FLINK-11635][docs-zh] translate dev/stream/state/checkpointing into Chinese
Henvealf commented on issue #9805: [FLINK-11635][docs-zh] translate dev/stream/state/checkpointing into Chinese URL: https://github.com/apache/flink/pull/9805#issuecomment-540951519

> Thanks for addressing the comments, @Henvealf.
> I found two occurrences of 'savepoint', which should not be translated according to the translation specifications. Besides that, I have no other concerns.

Thanks! I have fixed this.
[jira] [Commented] (FLINK-14364) Allow comments fail when not ignore parse errors in CsvRowDeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-14364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16949221#comment-16949221 ] Timo Walther commented on FLINK-14364: -- If {{setIgnoreParseErrors}} is set to {{false}}, it is fine to throw an exception. But if you enabled comments, there should be no exception. Can you provide an example that causes this error?

> Allow comments fail when not ignore parse errors in CsvRowDeserializationSchema
> ---
>
> Key: FLINK-14364
> URL: https://issues.apache.org/jira/browse/FLINK-14364
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / API
> Reporter: Jingsong Lee
> Priority: Major
> Fix For: 1.10.0
>
> Use CsvRowDeserializationSchema with setIgnoreParseErrors(false) and setAllowComments(true).
> If there are comments in the message, it will throw a MismatchedInputException.
> Is this a bug? And should we catch the MismatchedInputException and return null?
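The semantics Timo describes above — comment lines are skipped outright, so enabling comments should never raise, while genuine parse errors still surface when errors are not ignored — can be sketched in a few lines. This is plain Python for illustration, not the CsvRowDeserializationSchema API; the two flags mimic setAllowComments/setIgnoreParseErrors.

```python
import csv
import io

def parse_csv(text, allow_comments=True, ignore_parse_errors=False):
    rows = []
    for line in io.StringIO(text):
        line = line.rstrip("\n")
        if not line:
            continue
        if allow_comments and line.startswith("#"):
            continue  # comment lines are dropped before any parsing happens
        fields = next(csv.reader([line]))
        try:
            rows.append((fields[0], int(fields[1])))  # schema: (string, int)
        except (IndexError, ValueError):
            if not ignore_parse_errors:
                raise  # surface the error, as with ignoreParseErrors=false
            rows.append(None)  # ignored errors become null records
    return rows

print(parse_csv("# a comment\nx,1\n"))  # [('x', 1)] -- comment never parsed
```

Under these semantics a comment line cannot trigger a parse error even with strict error handling, which is the behaviour the comment argues the schema should have.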
[GitHub] [flink] flinkbot edited a comment on issue #9805: [FLINK-11635][docs-zh] translate dev/stream/state/checkpointing into Chinese
flinkbot edited a comment on issue #9805: [FLINK-11635][docs-zh] translate dev/stream/state/checkpointing into Chinese URL: https://github.com/apache/flink/pull/9805#issuecomment-536295524

## CI report:
* 9ad039de6bbb953c7e2dd74b3118d80ae552f31e : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/129590285)
* 5542dca83dbfb476114efb5aa2f2aa8d0626183a : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/129626942)
* d4c5b15213eb89ee29880c7c8259622c9c7f9bb9 : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/131454148)
* a8394e9c2ba5aa8186e67895ad68a325f24a8a31 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/131455928)
* b9268914cafdf59bd10cc47fb04616b926f55612 : UNKNOWN
[GitHub] [flink] flinkbot edited a comment on issue #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session
flinkbot edited a comment on issue #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session URL: https://github.com/apache/flink/pull/9832#issuecomment-537049332

## CI report:
* abaae048fef753455970fac9d6ab421b660b0536 : UNKNOWN
* b96c63552ccd322adae7a41a410615e95b538ece : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/129867595)
* 2c95a3939dbf0259d694af6c69451f0ede3c3891 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/130066310)
* 6e030add922011ea54178690f171911d0139f14b : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/130858235)
* 271703eda6f6c55b1641a54206109ef659f62854 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/131454171)
[GitHub] [flink] flinkbot edited a comment on issue #9860: [FLINK-14331][runtime] Reset vertices right after they transition to terminated states
flinkbot edited a comment on issue #9860: [FLINK-14331][runtime] Reset vertices right after they transition to terminated states URL: https://github.com/apache/flink/pull/9860#issuecomment-539900648

## CI report:
* 80c100513c089a9dd0930aa547383ae970c4e7f8 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/131094955)
* 2de47e991caa74adea1792b6b9153dee94e46b95 : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/131457976)
[GitHub] [flink] flinkbot edited a comment on issue #9877: [FLINK-14360][yarn] flink on yarn support obtain delegation token for multi hdfs.
flinkbot edited a comment on issue #9877: [FLINK-14360][yarn] flink on yarn support obtain delegation token for multi hdfs. URL: https://github.com/apache/flink/pull/9877#issuecomment-540485056

## CI report:
* fdeaa9ae3a6e7f7f9ab22ca60d17bfae4cd24670 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/131283573)
* 084b311d28ff32d5a82fa56d4077b2a30a5c6f64 : UNKNOWN
[GitHub] [flink] KarmaGYZ commented on a change in pull request #9873: [FLINK-14336][DataStream] Log Exception for failed checkpoint on TaskExecutor side
KarmaGYZ commented on a change in pull request #9873: [FLINK-14336][DataStream] Log Exception for failed checkpoint on TaskExecutor side URL: https://github.com/apache/flink/pull/9873#discussion_r333869118

## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java

@@ -1115,6 +1115,12 @@ public void run() {
 				checkpointMetaData.getCheckpointId());
 			}
 		} catch (Exception e) {
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("{} - asynchronous part of checkpoint {} could not be completed.",

Review comment: Makes sense. +1 for merge.
[GitHub] [flink] Myasuka commented on issue #9386: [FLINK-13601][tests] Harden RegionFailoverITCase by recording info when checkpoint just completed
Myasuka commented on issue #9386: [FLINK-13601][tests] Harden RegionFailoverITCase by recording info when checkpoint just completed URL: https://github.com/apache/flink/pull/9386#issuecomment-540963010 @zentol Would you please take a look again at this PR?
[GitHub] [flink] flinkbot edited a comment on issue #9824: [FLINK-14302] FlinkKafkaInternalProducer should not send `ADD_PARTITIONS_TO_TXN` request if `newPartitionsInTransaction` is empty when enable
flinkbot edited a comment on issue #9824: [FLINK-14302] FlinkKafkaInternalProducer should not send `ADD_PARTITIONS_TO_TXN` request if `newPartitionsInTransaction` is empty when enable EoS URL: https://github.com/apache/flink/pull/9824#issuecomment-536928428

## CI report:
* 87ae788d6490da7af5284c404648647c9919a6df : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/129820284)
* c4b5a18b039a90aa6b313c43f423622536bc8cd4 : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/131462013)
[GitHub] [flink] flinkbot edited a comment on issue #9860: [FLINK-14331][runtime] Reset vertices right after they transition to terminated states
flinkbot edited a comment on issue #9860: [FLINK-14331][runtime] Reset vertices right after they transition to terminated states URL: https://github.com/apache/flink/pull/9860#issuecomment-539900648

## CI report:
* 80c100513c089a9dd0930aa547383ae970c4e7f8 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/131094955)
* 2de47e991caa74adea1792b6b9153dee94e46b95 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/131457976)
[GitHub] [flink] flinkbot edited a comment on issue #9877: [FLINK-14360][yarn] flink on yarn support obtain delegation token for multi hdfs.
flinkbot edited a comment on issue #9877: [FLINK-14360][yarn] flink on yarn support obtain delegation token for multi hdfs. URL: https://github.com/apache/flink/pull/9877#issuecomment-540485056

## CI report:
* fdeaa9ae3a6e7f7f9ab22ca60d17bfae4cd24670 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/131283573)
* 084b311d28ff32d5a82fa56d4077b2a30a5c6f64 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/131462022)
[jira] [Commented] (FLINK-14359) Create a module called flink-sql-connector-hbase to shade HBase
[ https://issues.apache.org/jira/browse/FLINK-14359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16949271#comment-16949271 ] Zheng Hu commented on FLINK-14359: -- When run the sql-client using the packed shaded hbase lib, I found hbase client still try to load some non-shaded hbase class such as the following stracktrace: {code} Flink SQL> select * from MyHBaseSource; [ERROR] Could not execute SQL statement. Reason: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., (JobManagerRunnerImpl.java:152) at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:84) at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:380) at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34) ... 7 more Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 'Source: HBaseTableSource(rowkey, a, b) -> Map -> to: Tuple2 -> Sink: SQL Client Stream Collect Sink': Configuring the input format (null) failed: Cannot create connection to HBase. 
at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:218) at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:106) at org.apache.flink.runtime.scheduler.SchedulerBase.createExecutionGraph(SchedulerBase.java:222) at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:199) at org.apache.flink.runtime.scheduler.SchedulerBase.(SchedulerBase.java:188) at org.apache.flink.runtime.scheduler.LegacyScheduler.(LegacyScheduler.java:65) at org.apache.flink.runtime.scheduler.LegacySchedulerFactory.createInstance(LegacySchedulerFactory.java:70) at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:278) at org.apache.flink.runtime.jobmaster.JobMaster.(JobMaster.java:266) at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98) at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40) at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.(JobManagerRunnerImpl.java:146) ... 10 more Caused by: java.lang.Exception: Configuring the input format (null) failed: Cannot create connection to HBase. at org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.initializeOnMaster(InputOutputFormatVertex.java:80) at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:214) ... 21 more Caused by: java.lang.RuntimeException: Cannot create connection to HBase. at org.apache.flink.addons.hbase.HBaseRowInputFormat.connectToTable(HBaseRowInputFormat.java:103) at org.apache.flink.addons.hbase.HBaseRowInputFormat.configure(HBaseRowInputFormat.java:68) at org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.initializeOnMaster(InputOutputFormatVertex.java:77) ... 
22 more Caused by: java.io.IOException: java.lang.reflect.InvocationTargetException at org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:240) at org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:218) at org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:119) at org.apache.flink.addons.hbase.HBaseRowInputFormat.connectToTable(HBaseRowInputFormat.java:96) ... 24 more Caused by: java.lang.reflect.InvocationTargetException at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:238) ... 27 more Caused by: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.hbase.client.ClusterStatusListener$MulticastListener not found at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2299) at
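A likely explanation for the non-shaded class name at the bottom of the trace: Hadoop's `Configuration.getClass` resolves class names that are stored as plain strings in the configuration, and the shade plugin's bytecode relocation cannot rewrite strings that are only assembled at runtime. A minimal, self-contained sketch of this failure mode (only the class name is taken from the trace above; everything else is illustrative):

```java
public class ShadedNamePitfall {
    public static void main(String[] args) {
        // The shade plugin relocates compiled class references, e.g.
        //   org.apache.hadoop.hbase.X -> org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.X
        // but a class name that exists only as a runtime string is left untouched:
        String fromConfig =
            "org.apache.hadoop.hbase.client.ClusterStatusListener$MulticastListener";
        try {
            // Inside the shaded jar only the relocated name exists, so this fails.
            Class.forName(fromConfig);
            System.out.println("loaded");
        } catch (ClassNotFoundException e) {
            System.out.println("not found: " + fromConfig);
        }
    }
}
```

Fixes for this class of problem usually either relocate the default values shipped in configuration resources as well, or set the already-relocated class name explicitly in the `Configuration` before connecting.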
[jira] [Commented] (FLINK-14224) Kafka010ProducerITCase>KafkaProducerTestBase.testOneToOneAtLeastOnceCustomOperator fails on Travis
[ https://issues.apache.org/jira/browse/FLINK-14224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16949276#comment-16949276 ] Till Rohrmann commented on FLINK-14224: --- [~1u0] have you tried looping the test on Travis? > Kafka010ProducerITCase>KafkaProducerTestBase.testOneToOneAtLeastOnceCustomOperator > fails on Travis > -- > > Key: FLINK-14224 > URL: https://issues.apache.org/jira/browse/FLINK-14224 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.10.0 >Reporter: Till Rohrmann >Assignee: Alex >Priority: Blocker > Labels: test-stability > Fix For: 1.10.0 > > > The > {{Kafka010ProducerITCase>KafkaProducerTestBase.testOneToOneAtLeastOnceCustomOperator}} > fails on Travis with > {code} > Test > testOneToOneAtLeastOnceCustomOperator(org.apache.flink.streaming.connectors.kafka.Kafka010ProducerITCase) > failed with: > java.lang.AssertionError: Job should fail! > at org.junit.Assert.fail(Assert.java:88) > at > org.apache.flink.streaming.connectors.kafka.KafkaProducerTestBase.testOneToOneAtLeastOnce(KafkaProducerTestBase.java:280) > at > org.apache.flink.streaming.connectors.kafka.KafkaProducerTestBase.testOneToOneAtLeastOnceCustomOperator(KafkaProducerTestBase.java:214) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) > at 
org.junit.rules.RunRules.evaluate(RunRules.java:20) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) > at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) > at org.junit.rules.RunRules.evaluate(RunRules.java:20) > at org.junit.runners.ParentRunner.run(ParentRunner.java:363) > at > org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365) > at > org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:273) > at > org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:238) > at > org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:159) > at > org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384) > at > org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345) > at > org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126) > at > org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418) > {code} > https://api.travis-ci.com/v3/job/238920411/log.txt -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] flinkbot edited a comment on issue #9386: [FLINK-13601][tests] Harden RegionFailoverITCase by recording info when checkpoint just completed
flinkbot edited a comment on issue #9386: [FLINK-13601][tests] Harden RegionFailoverITCase by recording info when checkpoint just completed URL: https://github.com/apache/flink/pull/9386#issuecomment-519211363 ## CI report: * 9a4ab3785d8cf1c87d11df6390ee435664cfd964 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/122323718) * 2b6eeb771d5baa303e2663d17bc4b7d77d8b4280 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/124107744) * d1549ae43bdf7863057bb75860042c84048db67b : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/124156091) * 39c3034ffdbe7498d5a78534c2e0969852e81fdf : UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-14309) Kafka09ProducerITCase>KafkaProducerTestBase.testOneToOneAtLeastOnceRegularSink fails on Travis
[ https://issues.apache.org/jira/browse/FLINK-14309?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16949307#comment-16949307 ] Till Rohrmann commented on FLINK-14309: --- Does it make sense to backport this fix to earlier release branches [~becket_qin]? > Kafka09ProducerITCase>KafkaProducerTestBase.testOneToOneAtLeastOnceRegularSink > fails on Travis > -- > > Key: FLINK-14309 > URL: https://issues.apache.org/jira/browse/FLINK-14309 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.10.0 >Reporter: Till Rohrmann >Assignee: Jiayi Liao >Priority: Critical > Labels: pull-request-available, test-stability > Fix For: 1.10.0 > > Time Spent: 20m > Remaining Estimate: 0h > > The > {{Kafka09ProducerITCase>KafkaProducerTestBase.testOneToOneAtLeastOnceRegularSink}} > fails on Travis with > {code} > Test > testOneToOneAtLeastOnceRegularSink(org.apache.flink.streaming.connectors.kafka.Kafka09ProducerITCase) > failed with: > java.lang.AssertionError: Job should fail! 
> at org.junit.Assert.fail(Assert.java:88) > at > org.apache.flink.streaming.connectors.kafka.KafkaProducerTestBase.testOneToOneAtLeastOnce(KafkaProducerTestBase.java:280) > at > org.apache.flink.streaming.connectors.kafka.KafkaProducerTestBase.testOneToOneAtLeastOnceRegularSink(KafkaProducerTestBase.java:206) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) > at org.junit.rules.RunRules.evaluate(RunRules.java:20) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) > at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) 
> at org.junit.rules.RunRules.evaluate(RunRules.java:20) > at org.junit.runners.ParentRunner.run(ParentRunner.java:363) > at > org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365) > at > org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:273) > at > org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:238) > at > org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:159) > at > org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384) > at > org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345) > at > org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126) > at > org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418) > {code} > https://api.travis-ci.com/v3/job/240747188/log.txt -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] pnowojski commented on a change in pull request #9646: [FLINK-14004][runtime] Define SourceReaderOperator to verify the integration with StreamOneInputProcessor
pnowojski commented on a change in pull request #9646: [FLINK-14004][runtime] Define SourceReaderOperator to verify the integration with StreamOneInputProcessor URL: https://github.com/apache/flink/pull/9646#discussion_r333898900 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceReaderOperator.java ## @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput; + +/** + * Base source operator only used for integrating the source reader which is proposed by FLIP-27. It implements + * the interface of {@link PushingAsyncDataInput} for naturally compatible with one input processing in runtime + * stack. + * + * @param The output type of the operator + */ +@PublicEvolving +public abstract class SourceReaderOperator extends AbstractStreamOperator implements PushingAsyncDataInput { Review comment: nit: maybe add a note, that we are expecting this to be changed to the concrete class once `SourceReader` interface is introduced? This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] pnowojski commented on a change in pull request #9646: [FLINK-14004][runtime] Define SourceReaderOperator to verify the integration with StreamOneInputProcessor
pnowojski commented on a change in pull request #9646: [FLINK-14004][runtime] Define SourceReaderOperator to verify the integration with StreamOneInputProcessor URL: https://github.com/apache/flink/pull/9646#discussion_r333898651 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceReaderOperator.java ## @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput; + +/** + * Base source operator only used for integrating the source reader which is proposed by FLIP-27. It implements + * the interface of {@link PushingAsyncDataInput} for naturally compatible with one input processing in runtime + * stack. + * + * @param The output type of the operator + */ +@PublicEvolving Review comment: I think I would mark it as `Internal` and let the API folks later decide whether to expose this, or just the `SourceReader`. (Imo there is no need to expose the `SourceReaderOperator`). This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #9824: [FLINK-14302] FlinkKafkaInternalProducer should not send `ADD_PARTITIONS_TO_TXN` request if `newPartitionsInTransaction` is empty when enable
flinkbot edited a comment on issue #9824: [FLINK-14302] FlinkKafkaInternalProducer should not send `ADD_PARTITIONS_TO_TXN` request if `newPartitionsInTransaction` is empty when enable EoS URL: https://github.com/apache/flink/pull/9824#issuecomment-536928428 ## CI report: * 87ae788d6490da7af5284c404648647c9919a6df : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/129820284) * c4b5a18b039a90aa6b313c43f423622536bc8cd4 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/131462013) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Created] (FLINK-14371) Enable ClassLoaderITCase to pass with scheduler NG
Zhu Zhu created FLINK-14371: --- Summary: Enable ClassLoaderITCase to pass with scheduler NG Key: FLINK-14371 URL: https://issues.apache.org/jira/browse/FLINK-14371 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination, Tests Affects Versions: 1.10.0 Reporter: Zhu Zhu Fix For: 1.10.0 ClassLoaderITCase now fails with scheduler NG. There are 3 reasons for the failure: 1. state restore is not supported in scheduler NG yet 2. the cause of the expected exception is a bit different 3. there are multiple tasks in multiple regions, which will result in more failovers than expected as scheduler NG is using region failover We need to support the state restore in scheduler NG and then fix this case. And then annotate it with AlsoRunWithSchedulerNG. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] flinkbot edited a comment on issue #9865: [FLINK-14212][python] Support no-argument Python UDFs.
flinkbot edited a comment on issue #9865: [FLINK-14212][python] Support no-argument Python UDFs. URL: https://github.com/apache/flink/pull/9865#issuecomment-539919841 ## CI report: * bf1d566ea2f91d61c2f436bb92b5337088b7 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/131102433) * 3110f74ae60ed10bdb2bdbed7dd1facfde9fdeea : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/131266172) * f48b910ceee34ff7eb9f6f75d7782b49005c587f : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/131275232) * c97adb28424b4518535c9922b011d7adcf5e842f : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/131279546) * 4edf1202d5b9c34f329ca29ed96430b69a5807cd : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/131468804) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Created] (FLINK-14372) Enable KeyedStateCheckpointingITCase to pass with scheduler NG
Zhu Zhu created FLINK-14372: --- Summary: Enable KeyedStateCheckpointingITCase to pass with scheduler NG Key: FLINK-14372 URL: https://issues.apache.org/jira/browse/FLINK-14372 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination, Tests Affects Versions: 1.10.0 Reporter: Zhu Zhu Fix For: 1.10.0 ClassLoaderITCase now fails with scheduler NG. The failure cause is that state restore is not supported in scheduler NG yet. We need to support the state restore in scheduler NG and annotate it with AlsoRunWithSchedulerNG. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-14372) Enable KeyedStateCheckpointingITCase to pass with scheduler NG
[ https://issues.apache.org/jira/browse/FLINK-14372?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhu Zhu updated FLINK-14372: Description: KeyedStateCheckpointingITCase currently fails with scheduler NG. The failure cause is that state restore is not supported in scheduler NG yet. We need to support the state restore in scheduler NG and annotate it with AlsoRunWithSchedulerNG. was: ClassLoaderITCase now fails with scheduler NG. The failure cause is that state restore is not supported in scheduler NG yet. We need to support the state restore in scheduler NG and annotate it with AlsoRunWithSchedulerNG. > Enable KeyedStateCheckpointingITCase to pass with scheduler NG > -- > > Key: FLINK-14372 > URL: https://issues.apache.org/jira/browse/FLINK-14372 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination, Tests >Affects Versions: 1.10.0 >Reporter: Zhu Zhu >Priority: Major > Fix For: 1.10.0 > > > KeyedStateCheckpointingITCase currently fails with scheduler NG. > The failure cause is that state restore is not supported in scheduler NG yet. > We need to support the state restore in scheduler NG and annotate it with > AlsoRunWithSchedulerNG. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] flinkbot edited a comment on issue #9386: [FLINK-13601][tests] Harden RegionFailoverITCase by recording info when checkpoint just completed
flinkbot edited a comment on issue #9386: [FLINK-13601][tests] Harden RegionFailoverITCase by recording info when checkpoint just completed URL: https://github.com/apache/flink/pull/9386#issuecomment-519211363 ## CI report: * 9a4ab3785d8cf1c87d11df6390ee435664cfd964 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/122323718) * 2b6eeb771d5baa303e2663d17bc4b7d77d8b4280 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/124107744) * d1549ae43bdf7863057bb75860042c84048db67b : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/124156091) * 39c3034ffdbe7498d5a78534c2e0969852e81fdf : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/131468822) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Created] (FLINK-14373) Enable ZooKeeperHighAvailabilityITCase to pass with scheduler NG
Zhu Zhu created FLINK-14373: --- Summary: Enable ZooKeeperHighAvailabilityITCase to pass with scheduler NG Key: FLINK-14373 URL: https://issues.apache.org/jira/browse/FLINK-14373 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination, Tests Affects Versions: 1.10.0 Reporter: Zhu Zhu Fix For: 1.10.0 ZooKeeperHighAvailabilityITCase currently fails with scheduler NG. The failure cause is that it will invoke ExecutionGraph#failGlobal but that method is not ready for use in scheduler NG. We need to support failGlobal in scheduler NG to make this case pass with scheduler NG. And then annotate it with AlsoRunWithSchedulerNG. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (FLINK-13567) Avro Confluent Schema Registry nightly end-to-end test failed on Travis
[ https://issues.apache.org/jira/browse/FLINK-13567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16949313#comment-16949313 ] Till Rohrmann edited comment on FLINK-13567 at 10/11/19 9:30 AM: - I've disabled the test in {{master}} via 2c2095bdad3d47f27973a585112ed820f457de6f until the problem has been fixed. was (Author: till.rohrmann): I've disabled the test via 2c2095bdad3d47f27973a585112ed820f457de6f until the problem has been fixed. > Avro Confluent Schema Registry nightly end-to-end test failed on Travis > --- > > Key: FLINK-13567 > URL: https://issues.apache.org/jira/browse/FLINK-13567 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka, Tests >Affects Versions: 1.8.2, 1.9.0, 1.10.0 >Reporter: Till Rohrmann >Priority: Blocker > Labels: test-stability > Fix For: 1.10.0 > > > The {{Avro Confluent Schema Registry nightly end-to-end test}} failed on > Travis with > {code} > [FAIL] 'Avro Confluent Schema Registry nightly end-to-end test' failed after > 2 minutes and 11 seconds! Test exited with exit code 1 > No taskexecutor daemon (pid: 29044) is running anymore on > travis-job-b0823aec-c4ec-4d4b-8b59-e9f968de9501. > No standalonesession daemon to stop on host > travis-job-b0823aec-c4ec-4d4b-8b59-e9f968de9501. > rm: cannot remove > '/home/travis/build/apache/flink/flink-dist/target/flink-1.10-SNAPSHOT-bin/flink-1.10-SNAPSHOT/plugins': > No such file or directory > {code} > https://api.travis-ci.org/v3/job/567273939/log.txt -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-13567) Avro Confluent Schema Registry nightly end-to-end test failed on Travis
[ https://issues.apache.org/jira/browse/FLINK-13567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16949313#comment-16949313 ] Till Rohrmann commented on FLINK-13567: --- I've disabled the test via 2c2095bdad3d47f27973a585112ed820f457de6f until the problem has been fixed. > Avro Confluent Schema Registry nightly end-to-end test failed on Travis > --- > > Key: FLINK-13567 > URL: https://issues.apache.org/jira/browse/FLINK-13567 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka, Tests >Affects Versions: 1.8.2, 1.9.0, 1.10.0 >Reporter: Till Rohrmann >Priority: Blocker > Labels: test-stability > Fix For: 1.10.0 > > > The {{Avro Confluent Schema Registry nightly end-to-end test}} failed on > Travis with > {code} > [FAIL] 'Avro Confluent Schema Registry nightly end-to-end test' failed after > 2 minutes and 11 seconds! Test exited with exit code 1 > No taskexecutor daemon (pid: 29044) is running anymore on > travis-job-b0823aec-c4ec-4d4b-8b59-e9f968de9501. > No standalonesession daemon to stop on host > travis-job-b0823aec-c4ec-4d4b-8b59-e9f968de9501. > rm: cannot remove > '/home/travis/build/apache/flink/flink-dist/target/flink-1.10-SNAPSHOT-bin/flink-1.10-SNAPSHOT/plugins': > No such file or directory > {code} > https://api.travis-ci.org/v3/job/567273939/log.txt -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] flinkbot edited a comment on issue #9820: [FLINK-14290] Decouple plan translation from job execution/ClusterClient
flinkbot edited a comment on issue #9820: [FLINK-14290] Decouple plan translation from job execution/ClusterClient URL: https://github.com/apache/flink/pull/9820#issuecomment-536452582 ## CI report: * ff3ef7ae21616fb0295e3bdc53fa349c8f136a4b : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/129646465) * 654524c3508c89d08d5039a451ba425a5a2e3a41 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/129666135) * 7f462e2cad4ca8866abccda492b53007a45b3407 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/129712387) * 196dde727329e41b1a08dc6d5093d5190f5c0ea4 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/131524743) * 013151873e2ad7a1c79284c3807380571b663b98 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/131530297) * e0328ab5dc8c3c24a842281afbf36ecaf98234a3 : UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] Henvealf commented on a change in pull request #9805: [FLINK-11635][docs-zh] translate dev/stream/state/checkpointing into Chinese
Henvealf commented on a change in pull request #9805: [FLINK-11635][docs-zh] translate dev/stream/state/checkpointing into Chinese URL: https://github.com/apache/flink/pull/9805#discussion_r334032024 ## File path: docs/dev/stream/state/checkpointing.zh.md ## @@ -25,146 +25,138 @@ under the License. * ToC {:toc} -Every function and operator in Flink can be **stateful** (see [working with state](state.html) for details). -Stateful functions store data across the processing of individual elements/events, making state a critical building block for -any type of more elaborate operation. +Flink 中的每个方法或算子都能够是**有状态的**(阅读 [working with state](state.html) 查看详细)。 +状态化的方法在处理单个 元素/事件 的时候存储数据,让状态成为使各个类型的算子更加精细的重要部分。 +为了让状态容错,Flink 需要为状态添加**Checkpoint(检查点)**。Checkpoint 使得 Flink 能够恢复状态和在流中的位置,从而向应用提供和无故障执行时一样的语义。 -In order to make state fault tolerant, Flink needs to **checkpoint** the state. Checkpoints allow Flink to recover state and positions -in the streams to give the application the same semantics as a failure-free execution. +[Documentation on streaming fault tolerance]({{ site.baseurl }}/zh/internals/stream_checkpointing.html) 介绍了 Flink 流计算容错机制的内部技术原理。 Review comment: 好 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] xintongsong commented on a change in pull request #9760: [FLINK-13982][runtime] Implement memory calculation logics
xintongsong commented on a change in pull request #9760: [FLINK-13982][runtime] Implement memory calculation logics URL: https://github.com/apache/flink/pull/9760#discussion_r334039016 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java ## @@ -0,0 +1,659 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.clusterframework; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.configuration.MemorySize; +import org.apache.flink.configuration.NettyShuffleEnvironmentOptions; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.runtime.util.ConfigurationParserUtils; + +import java.util.HashMap; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Utility class for TaskExecutor memory configurations. + * + * A TaskExecutor's memory consists of the following components. 
+ *
+ * - Framework Heap Memory
+ * - Task Heap Memory
+ * - Task Off-Heap Memory
+ * - Shuffle Memory
+ * - Managed Memory
+ *   - On-Heap Managed Memory
+ *   - Off-Heap Managed Memory
+ * - JVM Metaspace
+ * - JVM Overhead
+ *
+ * Among all the components, Framework Heap Memory, Task Heap Memory and On-Heap Managed Memory use on-heap memory,
+ * while the rest use off-heap memory. We use Total Process Memory to refer to all the memory components, with Total
+ * Flink Memory referring to all the components except JVM Metaspace and JVM Overhead.
+ *
+ * The relationships of the TaskExecutor memory components are shown below.
+ *
+ * Total Process Memory
+ * ├─ Total Flink Memory
+ * │  ├─ Framework Heap Memory        (on-heap)
+ * │  ├─ Task Heap Memory             (on-heap)
+ * │  ├─ Task Off-Heap Memory         (off-heap)
+ * │  ├─ Shuffle Memory               (off-heap)
+ * │  └─ Managed Memory
+ * │     ├─ On-Heap Managed Memory    (on-heap)
+ * │     └─ Off-Heap Managed Memory   (off-heap)
+ * ├─ JVM Metaspace                   (off-heap)
+ * └─ JVM Overhead                    (off-heap)
+ */
+public class TaskExecutorResourceUtils {
+
+	private TaskExecutorResourceUtils() {}
+
+	//
+	// Generating JVM Parameters
+	//
+
+	public static String generateJvmParametersStr(final TaskExecutorResourceSpec taskExecutorResourceSpec) {
+		final MemorySize jvmHeapSize = taskExecutorResourceSpec.getFrameworkHeapSize()
+			.add(taskExecutorResourceSpec.getTaskHeapSize())
+			.add(taskExecutorResourceSpec.getOnHeapManagedMemorySize());
+		final MemorySize jvmDirectSize = taskExecutorResourceSpec.getTaskOffHeapSize()
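The JVM heap parameter assembled in `generateJvmParametersStr` above is the sum of the three on-heap components. A simplified sketch of that arithmetic with plain `long`s and hypothetical component sizes (the real code uses Flink's `MemorySize` and values from `TaskExecutorResourceSpec`):

```java
public class JvmHeapSketch {
    public static void main(String[] args) {
        // Hypothetical component sizes for illustration only.
        long frameworkHeap = 128L << 20;   // 128 MiB
        long taskHeap      = 1024L << 20;  // 1 GiB
        long onHeapManaged = 512L << 20;   // 512 MiB

        // JVM heap = Framework Heap + Task Heap + On-Heap Managed Memory
        long jvmHeapBytes = frameworkHeap + taskHeap + onHeapManaged;
        System.out.println("-Xmx" + jvmHeapBytes + " -Xms" + jvmHeapBytes);
        // prints: -Xmx1744830464 -Xms1744830464
    }
}
```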
[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r334028658 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java ## @@ -1524,4 +1481,31 @@ public void update(StateEntry stateEntry, S newValue) { } } + /** +* Iterate versions of the given node. +*/ + class ValueVersionIterator implements Iterator { Review comment: `private` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r334020491 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java ## @@ -310,57 +299,21 @@ public S removeAndGetOld(K key, N namespace) { * @param keyByteBuffer byte buffer storing the key. * @param keyOffset offset of the key. * @param keyLenlength of the key. -* @return id of the node. NIL_NODE will be returned if key does no exist. +* @return the state. Null will be returned if key does not exist. */ @VisibleForTesting - long getNode(ByteBuffer keyByteBuffer, int keyOffset, int keyLen) { - int deleteCount = 0; - long prevNode = findPredecessor(keyByteBuffer, keyOffset, 1); - long currentNode = helpGetNextNode(prevNode, 0); - long nextNode; - - int c; - while (currentNode != NIL_NODE) { - nextNode = helpGetNextNode(currentNode, 0); - - boolean isRemoved = isNodeRemoved(currentNode); - if (isRemoved) { - // remove the node physically when there is no snapshot running - if (highestRequiredSnapshotVersion == 0 && deleteCount < numKeysToDeleteOneTime) { - doPhysicalRemove(currentNode, prevNode, nextNode); - logicallyRemovedNodes.remove(currentNode); - totalSize--; - deleteCount++; - } else { - prevNode = currentNode; - } - currentNode = nextNode; - continue; - } - - c = compareByteBufferAndNode(keyByteBuffer, keyOffset, keyLen, currentNode); - - // find the key - if (c == 0) { - return currentNode; - } - - // the key is less than the current node, and nodes after current - // node can not be equal to the key. 
- if (c < 0) { - break; - } - - prevNode = currentNode; - currentNode = helpGetNextNode(currentNode, 0); - } - - return NIL_NODE; + S getNode(ByteBuffer keyByteBuffer, int keyOffset, int keyLen) { Review comment: `@Nullable` annotation is missing This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r334028005 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java ## @@ -1136,10 +1088,9 @@ S helpGetState(long valuePointer, SkipListValueSerializer serializer) { return null; } - Chunk chunk = spaceAllocator.getChunkById(SpaceUtils.getChunkIdByAddress(valuePointer)); - int offsetInChunk = SpaceUtils.getChunkOffsetByAddress(valuePointer); - ByteBuffer bb = chunk.getByteBuffer(offsetInChunk); - int offsetInByteBuffer = chunk.getOffsetInByteBuffer(offsetInChunk); + Tuple2 tuple2 = getNodeByteBufferAndOffset(valuePointer); + ByteBuffer bb = tuple2.f0; + int offsetInByteBuffer = tuple2.f1; Review comment: if the `ByteBuffer` and the offset are needed by all methods to perform an operation on it, then one could directly pass the combined value object instead of unwrapping the individual fields and passing them individually to the modifying method. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
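The reviewer's suggestion — pass one combined value object rather than unwrapping a `Tuple2<ByteBuffer, Integer>` at every call site — could look like the sketch below. The class name `BufferPosition` and its methods are hypothetical, not part of Flink's actual API:

```java
import java.nio.ByteBuffer;

// Hypothetical value object combining the ByteBuffer and its offset, so that
// methods operating on a node position take one argument instead of two
// fields unwrapped from a Tuple2.
public class BufferPosition {
	private final ByteBuffer buffer;
	private final int offset;

	public BufferPosition(ByteBuffer buffer, int offset) {
		this.buffer = buffer;
		this.offset = offset;
	}

	public ByteBuffer getBuffer() {
		return buffer;
	}

	public int getOffset() {
		return offset;
	}

	/** Convenience factory so callers never handle the raw pair. */
	public static BufferPosition of(byte[] data, int offset) {
		return new BufferPosition(ByteBuffer.wrap(data), offset);
	}

	/** Example operation that previously needed both fields passed separately. */
	public byte firstByte() {
		return buffer.get(offset);
	}
}
```

A method like `helpGetState(long valuePointer, ...)` would then call `position.firstByte()`-style accessors instead of threading `bb` and `offsetInByteBuffer` through every helper.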
[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r334020903 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java ## @@ -310,57 +299,21 @@ public S removeAndGetOld(K key, N namespace) { * @param keyByteBuffer byte buffer storing the key. * @param keyOffset offset of the key. * @param keyLenlength of the key. -* @return id of the node. NIL_NODE will be returned if key does no exist. +* @return the state. Null will be returned if key does not exist. */ @VisibleForTesting - long getNode(ByteBuffer keyByteBuffer, int keyOffset, int keyLen) { - int deleteCount = 0; - long prevNode = findPredecessor(keyByteBuffer, keyOffset, 1); - long currentNode = helpGetNextNode(prevNode, 0); - long nextNode; - - int c; - while (currentNode != NIL_NODE) { - nextNode = helpGetNextNode(currentNode, 0); - - boolean isRemoved = isNodeRemoved(currentNode); - if (isRemoved) { - // remove the node physically when there is no snapshot running - if (highestRequiredSnapshotVersion == 0 && deleteCount < numKeysToDeleteOneTime) { - doPhysicalRemove(currentNode, prevNode, nextNode); - logicallyRemovedNodes.remove(currentNode); - totalSize--; - deleteCount++; - } else { - prevNode = currentNode; - } - currentNode = nextNode; - continue; - } - - c = compareByteBufferAndNode(keyByteBuffer, keyOffset, keyLen, currentNode); - - // find the key - if (c == 0) { - return currentNode; - } - - // the key is less than the current node, and nodes after current - // node can not be equal to the key. 
- if (c < 0) { - break; - } - - prevNode = currentNode; - currentNode = helpGetNextNode(currentNode, 0); - } - - return NIL_NODE; + S getNode(ByteBuffer keyByteBuffer, int keyOffset, int keyLen) { + Tuple4 result = iterateAndProcess(keyByteBuffer, keyOffset, keyLen, Review comment: I would refrain from using `Tuple4`. Instead I suggest to create an explicit class with descriptive fields. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
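An explicit class with descriptive fields, as the reviewer asks for in place of `Tuple4`, might be sketched as follows. All names here are illustrative stand-ins for whatever `iterateAndProcess` actually returns, not Flink's real types:

```java
// Hypothetical replacement for the Tuple4 returned by iterateAndProcess.
// Descriptive fields make call sites like "result.f2 ? result.f3 : null"
// read as a single intention-revealing accessor instead.
public class NodeLookupResult<S> {
	private final long prevNode;
	private final long currentNode;
	private final boolean keyFound;
	private final S state;

	public NodeLookupResult(long prevNode, long currentNode, boolean keyFound, S state) {
		this.prevNode = prevNode;
		this.currentNode = currentNode;
		this.keyFound = keyFound;
		this.state = state;
	}

	public long getPrevNode() {
		return prevNode;
	}

	public long getCurrentNode() {
		return currentNode;
	}

	/** Replaces the opaque "result.f2 ? result.f3 : null" expression. */
	public S getStateOrNull() {
		return keyFound ? state : null;
	}
}
```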
[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r334021225 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java ## @@ -310,57 +299,21 @@ public S removeAndGetOld(K key, N namespace) { * @param keyByteBuffer byte buffer storing the key. * @param keyOffset offset of the key. * @param keyLenlength of the key. -* @return id of the node. NIL_NODE will be returned if key does no exist. +* @return the state. Null will be returned if key does not exist. */ @VisibleForTesting - long getNode(ByteBuffer keyByteBuffer, int keyOffset, int keyLen) { - int deleteCount = 0; - long prevNode = findPredecessor(keyByteBuffer, keyOffset, 1); - long currentNode = helpGetNextNode(prevNode, 0); - long nextNode; - - int c; - while (currentNode != NIL_NODE) { - nextNode = helpGetNextNode(currentNode, 0); - - boolean isRemoved = isNodeRemoved(currentNode); - if (isRemoved) { - // remove the node physically when there is no snapshot running - if (highestRequiredSnapshotVersion == 0 && deleteCount < numKeysToDeleteOneTime) { - doPhysicalRemove(currentNode, prevNode, nextNode); - logicallyRemovedNodes.remove(currentNode); - totalSize--; - deleteCount++; - } else { - prevNode = currentNode; - } - currentNode = nextNode; - continue; - } - - c = compareByteBufferAndNode(keyByteBuffer, keyOffset, keyLen, currentNode); - - // find the key - if (c == 0) { - return currentNode; - } - - // the key is less than the current node, and nodes after current - // node can not be equal to the key. 
- if (c < 0) { - break; - } - - prevNode = currentNode; - currentNode = helpGetNextNode(currentNode, 0); - } - - return NIL_NODE; + S getNode(ByteBuffer keyByteBuffer, int keyOffset, int keyLen) { + Tuple4 result = iterateAndProcess(keyByteBuffer, keyOffset, keyLen, + (tuple3, isRemoved) -> { + long currentNode = tuple3.f1; + return isRemoved ? null : getNodeStateHelper(currentNode); + }); + return result.f2 ? result.f3 : null; Review comment: I have no idea what this expresses. I think there is a way to make it more expressive. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] xintongsong commented on a change in pull request #9760: [FLINK-13982][runtime] Implement memory calculation logics
xintongsong commented on a change in pull request #9760: [FLINK-13982][runtime] Implement memory calculation logics URL: https://github.com/apache/flink/pull/9760#discussion_r334046721 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java ## @@ -0,0 +1,659 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.clusterframework; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.configuration.MemorySize; +import org.apache.flink.configuration.NettyShuffleEnvironmentOptions; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.runtime.util.ConfigurationParserUtils; + +import java.util.HashMap; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Utility class for TaskExecutor memory configurations. + * + * A TaskExecutor's memory consists of the following components. 
+ *
+ *   - Framework Heap Memory
+ *   - Task Heap Memory
+ *   - Task Off-Heap Memory
+ *   - Shuffle Memory
+ *   - Managed Memory
+ *       - On-Heap Managed Memory
+ *       - Off-Heap Managed Memory
+ *   - JVM Metaspace
+ *   - JVM Overhead
+ *
+ * Among all the components, Framework Heap Memory, Task Heap Memory and On-Heap Managed Memory use on-heap memory,
+ * while the rest use off-heap memory. We use Total Process Memory to refer to all the memory components, and Total
+ * Flink Memory to refer to all the components except JVM Metaspace and JVM Overhead.
+ *
+ * The relationships of the TaskExecutor memory components are shown below.
+ *
+ * Total Process Memory
+ *  ├─ Total Flink Memory
+ *  │   ├─ Framework Heap Memory        (on-heap)
+ *  │   ├─ Task Heap Memory             (on-heap)
+ *  │   ├─ Task Off-Heap Memory         (off-heap)
+ *  │   ├─ Shuffle Memory               (off-heap)
+ *  │   └─ Managed Memory
+ *  │       ├─ On-Heap Managed Memory   (on-heap)
+ *  │       └─ Off-Heap Managed Memory  (off-heap)
+ *  ├─ JVM Metaspace                    (off-heap)
+ *  └─ JVM Overhead                     (off-heap)
+ */
+public class TaskExecutorResourceUtils {
+
+	private TaskExecutorResourceUtils() {}
+
+	//
+	// Generating JVM Parameters
+	//
+
+	public static String generateJvmParametersStr(final TaskExecutorResourceSpec taskExecutorResourceSpec) {
+		final MemorySize jvmHeapSize = taskExecutorResourceSpec.getFrameworkHeapSize()
+			.add(taskExecutorResourceSpec.getTaskHeapSize())
+			.add(taskExecutorResourceSpec.getOnHeapManagedMemorySize());
+		final MemorySize jvmDirectSize = taskExecutorResourceSpec.getTaskOffHeapSize() +
[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r334018453 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/NodeStatus.java ## @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.heap; + +/** + * Status of the node. + */ +public enum NodeStatus { + + PUT((byte) 0), REMOVE((byte) 1); + + private byte value; Review comment: `final` is missing This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r334035510 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMapTest.java ## @@ -115,577 +144,783 @@ public void testInitStateMap() { assertFalse(stateMap.getStateIncrementalVisitor(100).hasNext()); stateMap.close(); - assertEquals(0, stateMap.size()); - assertEquals(0, stateMap.totalSize()); - assertTrue(stateMap.isClosed()); } /** -* Test basic operations. +* Test state put operation. */ @Test - public void testBasicOperations() throws Exception { - TypeSerializer keySerializer = IntSerializer.INSTANCE; - TypeSerializer namespaceSerializer = LongSerializer.INSTANCE; - TypeSerializer stateSerializer = StringSerializer.INSTANCE; - CopyOnWriteSkipListStateMap stateMap = new CopyOnWriteSkipListStateMap<>( - keySerializer, namespaceSerializer, stateSerializer, spaceAllocator); + public void testPutState() { + testWithFunction((totalSize, stateMap, referenceStates) -> getDefaultSizes(totalSize)); + } - ThreadLocalRandom random = ThreadLocalRandom.current(); - // map to store expected states, namespace -> key -> state - Map> referenceStates = new HashMap<>(); - int totalSize = 0; + /** +* Test remove existing state. +*/ + @Test + public void testRemoveExistingState() { + testRemoveState(false, false); + } - // put some states - for (long namespace = 0; namespace < 10; namespace++) { - for (int key = 0; key < 100; key++) { - totalSize++; - String state = String.valueOf(key * namespace); - if (random.nextBoolean()) { - stateMap.put(key, namespace, state); - } else { - assertNull(stateMap.putAndGetOld(key, namespace, state)); + /** +* Test remove and get existing state. 
+*/ + @Test + public void testRemoveAndGetExistingState() { + testRemoveState(false, true); + } + + /** +* Test remove absent state. +*/ + @Test + public void testRemoveAbsentState() { + testRemoveState(true, true); + } + + /** +* Test remove previously removed state. +*/ + @Test + public void testPutPreviouslyRemovedState() { + testWithFunction( + (totalSize, stateMap, referenceStates) -> applyFunctionAfterRemove(stateMap, referenceStates, + (removedCnt, removedStates) -> { + int size = totalSize - removedCnt; + for (Map.Entry> entry : removedStates.entrySet()) { + long namespace = entry.getKey(); + for (int key : entry.getValue()) { + size++; + String state = String.valueOf(key * namespace); + assertNull(stateMap.putAndGetOld(key, namespace, state)); + referenceStates.computeIfAbsent(namespace, (none) -> new HashMap<>()).put(key, String.valueOf(state)); + } + } + return getDefaultSizes(size); } - referenceStates.computeIfAbsent(namespace, (none) -> new HashMap<>()).put(key, state); - assertEquals(totalSize, stateMap.size()); - assertEquals(totalSize, stateMap.totalSize()); - } - } + ) + ); + } - // validates space allocation. Each pair need 2 spaces - assertEquals(totalSize * 2, spaceAllocator.getTotalSpaceNumber()); - verifyState(referenceStates, stateMap); + private void testRemoveState(boolean removeAbsent, boolean getOld) { + testWithFunction( + (totalSize, stateMap, referenceStates) -> { + if (removeAbsent) { +
[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r334018883 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/NodeStatus.java ## @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.heap; + +/** + * Status of the node. + */ +public enum NodeStatus { + + PUT((byte) 0), REMOVE((byte) 1); + + private byte value; + + NodeStatus(byte value) { + this.value = value; + } + + public byte getValue() { + return value; + } + + public static NodeStatus valueOf(byte value) { + switch (value) { + case 0: + return PUT; + case 1: + return REMOVE; + default: + throw new RuntimeException("Unknown type: " + value); Review comment: I would throw an `IllegalArgumentException` similar to `Enum#valueOf` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
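Putting the two `NodeStatus` review comments together — the backing field should be `final`, and `valueOf` should throw `IllegalArgumentException` as `Enum#valueOf` does — the corrected enum could look like this sketch (overloading `valueOf(byte)` alongside the inherited `valueOf(String)` is legal Java):

```java
// Sketch of NodeStatus with both review comments applied: a final backing
// field, and an IllegalArgumentException for unknown values, mirroring the
// contract of the built-in Enum#valueOf(String).
public enum NodeStatus {

	PUT((byte) 0), REMOVE((byte) 1);

	private final byte value;

	NodeStatus(byte value) {
		this.value = value;
	}

	public byte getValue() {
		return value;
	}

	public static NodeStatus valueOf(byte value) {
		switch (value) {
			case 0:
				return PUT;
			case 1:
				return REMOVE;
			default:
				throw new IllegalArgumentException("Unknown status value: " + value);
		}
	}
}
```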
[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r334038508 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMapTest.java ## @@ -115,577 +144,783 @@ public void testInitStateMap() { assertFalse(stateMap.getStateIncrementalVisitor(100).hasNext()); stateMap.close(); - assertEquals(0, stateMap.size()); - assertEquals(0, stateMap.totalSize()); - assertTrue(stateMap.isClosed()); } /** -* Test basic operations. +* Test state put operation. */ @Test - public void testBasicOperations() throws Exception { - TypeSerializer keySerializer = IntSerializer.INSTANCE; - TypeSerializer namespaceSerializer = LongSerializer.INSTANCE; - TypeSerializer stateSerializer = StringSerializer.INSTANCE; - CopyOnWriteSkipListStateMap stateMap = new CopyOnWriteSkipListStateMap<>( - keySerializer, namespaceSerializer, stateSerializer, spaceAllocator); + public void testPutState() { + testWithFunction((totalSize, stateMap, referenceStates) -> getDefaultSizes(totalSize)); + } - ThreadLocalRandom random = ThreadLocalRandom.current(); - // map to store expected states, namespace -> key -> state - Map> referenceStates = new HashMap<>(); - int totalSize = 0; + /** +* Test remove existing state. +*/ + @Test + public void testRemoveExistingState() { + testRemoveState(false, false); + } - // put some states - for (long namespace = 0; namespace < 10; namespace++) { - for (int key = 0; key < 100; key++) { - totalSize++; - String state = String.valueOf(key * namespace); - if (random.nextBoolean()) { - stateMap.put(key, namespace, state); - } else { - assertNull(stateMap.putAndGetOld(key, namespace, state)); + /** +* Test remove and get existing state. 
+*/ + @Test + public void testRemoveAndGetExistingState() { + testRemoveState(false, true); + } + + /** +* Test remove absent state. +*/ + @Test + public void testRemoveAbsentState() { + testRemoveState(true, true); + } + + /** +* Test remove previously removed state. +*/ + @Test + public void testPutPreviouslyRemovedState() { + testWithFunction( + (totalSize, stateMap, referenceStates) -> applyFunctionAfterRemove(stateMap, referenceStates, + (removedCnt, removedStates) -> { + int size = totalSize - removedCnt; + for (Map.Entry> entry : removedStates.entrySet()) { + long namespace = entry.getKey(); + for (int key : entry.getValue()) { + size++; + String state = String.valueOf(key * namespace); + assertNull(stateMap.putAndGetOld(key, namespace, state)); + referenceStates.computeIfAbsent(namespace, (none) -> new HashMap<>()).put(key, String.valueOf(state)); + } + } + return getDefaultSizes(size); } - referenceStates.computeIfAbsent(namespace, (none) -> new HashMap<>()).put(key, state); - assertEquals(totalSize, stateMap.size()); - assertEquals(totalSize, stateMap.totalSize()); - } - } + ) + ); + } - // validates space allocation. Each pair need 2 spaces - assertEquals(totalSize * 2, spaceAllocator.getTotalSpaceNumber()); - verifyState(referenceStates, stateMap); + private void testRemoveState(boolean removeAbsent, boolean getOld) { + testWithFunction( + (totalSize, stateMap, referenceStates) -> { + if (removeAbsent) { +
[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r334032961 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMapTest.java ## @@ -115,577 +144,783 @@ public void testInitStateMap() { assertFalse(stateMap.getStateIncrementalVisitor(100).hasNext()); stateMap.close(); - assertEquals(0, stateMap.size()); - assertEquals(0, stateMap.totalSize()); - assertTrue(stateMap.isClosed()); } /** -* Test basic operations. +* Test state put operation. */ @Test - public void testBasicOperations() throws Exception { - TypeSerializer keySerializer = IntSerializer.INSTANCE; - TypeSerializer namespaceSerializer = LongSerializer.INSTANCE; - TypeSerializer stateSerializer = StringSerializer.INSTANCE; - CopyOnWriteSkipListStateMap stateMap = new CopyOnWriteSkipListStateMap<>( - keySerializer, namespaceSerializer, stateSerializer, spaceAllocator); + public void testPutState() { + testWithFunction((totalSize, stateMap, referenceStates) -> getDefaultSizes(totalSize)); + } - ThreadLocalRandom random = ThreadLocalRandom.current(); - // map to store expected states, namespace -> key -> state - Map> referenceStates = new HashMap<>(); - int totalSize = 0; + /** +* Test remove existing state. +*/ + @Test + public void testRemoveExistingState() { + testRemoveState(false, false); + } - // put some states - for (long namespace = 0; namespace < 10; namespace++) { - for (int key = 0; key < 100; key++) { - totalSize++; - String state = String.valueOf(key * namespace); - if (random.nextBoolean()) { - stateMap.put(key, namespace, state); - } else { - assertNull(stateMap.putAndGetOld(key, namespace, state)); + /** +* Test remove and get existing state. 
+*/ + @Test + public void testRemoveAndGetExistingState() { + testRemoveState(false, true); + } + + /** +* Test remove absent state. +*/ + @Test + public void testRemoveAbsentState() { + testRemoveState(true, true); + } + + /** +* Test remove previously removed state. +*/ + @Test + public void testPutPreviouslyRemovedState() { + testWithFunction( + (totalSize, stateMap, referenceStates) -> applyFunctionAfterRemove(stateMap, referenceStates, + (removedCnt, removedStates) -> { + int size = totalSize - removedCnt; + for (Map.Entry> entry : removedStates.entrySet()) { + long namespace = entry.getKey(); + for (int key : entry.getValue()) { + size++; + String state = String.valueOf(key * namespace); + assertNull(stateMap.putAndGetOld(key, namespace, state)); + referenceStates.computeIfAbsent(namespace, (none) -> new HashMap<>()).put(key, String.valueOf(state)); + } + } + return getDefaultSizes(size); } - referenceStates.computeIfAbsent(namespace, (none) -> new HashMap<>()).put(key, state); - assertEquals(totalSize, stateMap.size()); - assertEquals(totalSize, stateMap.totalSize()); - } - } + ) + ); Review comment: tbh I'm not sure whether I really understand what's going on here. All the different layers of functions which take other functions to do something makes it super hard to reason about the test case. Usually for tests I'm a big fan of explicitness. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use
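The explicitness the reviewer asks for — each test stating its setup, action and expectation inline instead of threading lambdas through shared helpers — can be illustrated with the sketch below. A plain `HashMap` stands in for the real `CopyOnWriteSkipListStateMap`, and the method names are hypothetical:

```java
import java.util.HashMap;
import java.util.Map;

// Illustration of the explicit test style suggested in the review: setup,
// action and assertion are visible in each test body, with no function-taking-
// function indirection. A HashMap stands in for the skip-list state map.
public class ExplicitStyleExample {

	static final Map<Integer, String> stateMap = new HashMap<>();

	static void testPutState() {
		stateMap.clear();                      // setup: empty map
		stateMap.put(1, "one");                // action under test
		assert "one".equals(stateMap.get(1));  // explicit expectation
		assert stateMap.size() == 1;
	}

	static void testRemoveExistingState() {
		stateMap.clear();
		stateMap.put(1, "one");                // setup: one existing entry
		String old = stateMap.remove(1);       // action under test
		assert "one".equals(old);
		assert stateMap.isEmpty();
	}

	public static void main(String[] args) {
		testPutState();
		testRemoveExistingState();
	}
}
```

The trade-off is some repetition between cases, but each failure points directly at a readable scenario rather than at a lambda several layers deep.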
[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r334021079 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java ## @@ -310,57 +299,21 @@ public S removeAndGetOld(K key, N namespace) { * @param keyByteBuffer byte buffer storing the key. * @param keyOffset offset of the key. * @param keyLenlength of the key. -* @return id of the node. NIL_NODE will be returned if key does no exist. +* @return the state. Null will be returned if key does not exist. */ @VisibleForTesting - long getNode(ByteBuffer keyByteBuffer, int keyOffset, int keyLen) { - int deleteCount = 0; - long prevNode = findPredecessor(keyByteBuffer, keyOffset, 1); - long currentNode = helpGetNextNode(prevNode, 0); - long nextNode; - - int c; - while (currentNode != NIL_NODE) { - nextNode = helpGetNextNode(currentNode, 0); - - boolean isRemoved = isNodeRemoved(currentNode); - if (isRemoved) { - // remove the node physically when there is no snapshot running - if (highestRequiredSnapshotVersion == 0 && deleteCount < numKeysToDeleteOneTime) { - doPhysicalRemove(currentNode, prevNode, nextNode); - logicallyRemovedNodes.remove(currentNode); - totalSize--; - deleteCount++; - } else { - prevNode = currentNode; - } - currentNode = nextNode; - continue; - } - - c = compareByteBufferAndNode(keyByteBuffer, keyOffset, keyLen, currentNode); - - // find the key - if (c == 0) { - return currentNode; - } - - // the key is less than the current node, and nodes after current - // node can not be equal to the key. 
- if (c < 0) { - break; - } - - prevNode = currentNode; - currentNode = helpGetNextNode(currentNode, 0); - } - - return NIL_NODE; + S getNode(ByteBuffer keyByteBuffer, int keyOffset, int keyLen) { + Tuple4 result = iterateAndProcess(keyByteBuffer, keyOffset, keyLen, + (tuple3, isRemoved) -> { Review comment: Same here. It is super hard to know what's inside of `tuple3`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r334028341 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java ## @@ -1367,6 +1317,14 @@ public Long next() { } } + private Tuple2 getNodeByteBufferAndOffset(long node) { Review comment: I would introduce a dedicated class for this instead of using the `Tuple2` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r334036060 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMapTest.java ## @@ -115,577 +144,783 @@ public void testInitStateMap() { assertFalse(stateMap.getStateIncrementalVisitor(100).hasNext()); stateMap.close(); - assertEquals(0, stateMap.size()); - assertEquals(0, stateMap.totalSize()); - assertTrue(stateMap.isClosed()); } /** -* Test basic operations. +* Test state put operation. */ @Test - public void testBasicOperations() throws Exception { - TypeSerializer<Integer> keySerializer = IntSerializer.INSTANCE; - TypeSerializer<Long> namespaceSerializer = LongSerializer.INSTANCE; - TypeSerializer<String> stateSerializer = StringSerializer.INSTANCE; - CopyOnWriteSkipListStateMap<Integer, Long, String> stateMap = new CopyOnWriteSkipListStateMap<>( - keySerializer, namespaceSerializer, stateSerializer, spaceAllocator); + public void testPutState() { + testWithFunction((totalSize, stateMap, referenceStates) -> getDefaultSizes(totalSize)); + } - ThreadLocalRandom random = ThreadLocalRandom.current(); - // map to store expected states, namespace -> key -> state - Map<Long, Map<Integer, String>> referenceStates = new HashMap<>(); - int totalSize = 0; + /** +* Test remove existing state. +*/ + @Test + public void testRemoveExistingState() { + testRemoveState(false, false); + } - // put some states - for (long namespace = 0; namespace < 10; namespace++) { - for (int key = 0; key < 100; key++) { - totalSize++; - String state = String.valueOf(key * namespace); - if (random.nextBoolean()) { - stateMap.put(key, namespace, state); - } else { - assertNull(stateMap.putAndGetOld(key, namespace, state)); + /** +* Test remove and get existing state.
+*/ + @Test + public void testRemoveAndGetExistingState() { + testRemoveState(false, true); + } + + /** +* Test remove absent state. +*/ + @Test + public void testRemoveAbsentState() { + testRemoveState(true, true); + } + + /** +* Test remove previously removed state. +*/ + @Test + public void testPutPreviouslyRemovedState() { + testWithFunction( + (totalSize, stateMap, referenceStates) -> applyFunctionAfterRemove(stateMap, referenceStates, + (removedCnt, removedStates) -> { + int size = totalSize - removedCnt; + for (Map.Entry> entry : removedStates.entrySet()) { + long namespace = entry.getKey(); + for (int key : entry.getValue()) { + size++; + String state = String.valueOf(key * namespace); + assertNull(stateMap.putAndGetOld(key, namespace, state)); + referenceStates.computeIfAbsent(namespace, (none) -> new HashMap<>()).put(key, String.valueOf(state)); + } + } + return getDefaultSizes(size); } - referenceStates.computeIfAbsent(namespace, (none) -> new HashMap<>()).put(key, state); - assertEquals(totalSize, stateMap.size()); - assertEquals(totalSize, stateMap.totalSize()); - } - } + ) + ); + } - // validates space allocation. Each pair need 2 spaces - assertEquals(totalSize * 2, spaceAllocator.getTotalSpaceNumber()); - verifyState(referenceStates, stateMap); + private void testRemoveState(boolean removeAbsent, boolean getOld) { + testWithFunction( + (totalSize, stateMap, referenceStates) -> { + if (removeAbsent) { +
[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r334021831 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java ## @@ -371,72 +324,44 @@ long getNode(ByteBuffer keyByteBuffer, int keyOffset, int keyLen) { */ @VisibleForTesting S putNode(ByteBuffer keyByteBuffer, int keyOffset, int keyLen, byte[] value, boolean returnOldState) { - int deleteCount = 0; - long prevNode = findPredecessor(keyByteBuffer, keyOffset, 1); - long currentNode = helpGetNextNode(prevNode, 0); - long nextNode; - - int c; - for ( ; ; ) { - if (currentNode != NIL_NODE) { - nextNode = helpGetNextNode(currentNode, 0); + Tuple4 result = iterateAndProcess(keyByteBuffer, keyOffset, keyLen, + (tuple3, isRemoved) -> { Review comment: I would suggest factoring this out into a separate method.
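One way to act on this suggestion is to move the lambda body into a named private method and pass a method reference. A simplified sketch — the types here are stand-ins for the skip-list's actual callback interface, and every name is illustrative, not from the PR:

```java
import java.util.function.BiFunction;

class PutNodeSketch {
    // The former inline lambda, now a named method with self-documenting locals.
    static String processExistingNode(long[] nodePointers, boolean isRemoved) {
        long prevNode = nodePointers[0];
        long currentNode = nodePointers[1];
        long nextNode = nodePointers[2];
        // Stand-in logic: a real implementation would update or resurrect the node.
        return isRemoved ? "resurrect:" + currentNode : "update:" + currentNode;
    }

    // Stand-in for iterateAndProcess: applies the callback to a found node.
    static String iterateAndProcess(long[] pointers, boolean removed,
                                    BiFunction<long[], Boolean, String> callback) {
        return callback.apply(pointers, removed);
    }

    static String putNode(long[] pointers, boolean removed) {
        // The call site shrinks to a method reference instead of a long lambda.
        return iterateAndProcess(pointers, removed, PutNodeSketch::processExistingNode);
    }
}
```

The design gain is that the extracted method gets a name, a javadoc slot, and named locals for the tuple fields, which addresses both this comment and the `tuple3` readability concern.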
[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r334028713 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java ## @@ -1524,4 +1481,31 @@ public void update(StateEntry stateEntry, S newValue) { } } + /** +* Iterate versions of the given node. +*/ + class ValueVersionIterator implements Iterator { + long valuePointer; Review comment: `private`
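Making the cursor field `private` costs nothing here, since even the enclosing class would go through the iterator's methods. A simplified, self-contained sketch of such a version-chain iterator with the field private — the pointer chasing is simulated with an array, whereas the real map reads chunk memory, and all names are illustrative:

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

/** Sketch of an iterator over a node's value-version chain, with a private cursor field. */
class ValueVersionIteratorSketch implements Iterator<Long> {
    private long valuePointer;        // private cursor, per the review comment
    private final long[] nextPointer; // nextPointer[p] = pointer after p; -1 ends the chain

    ValueVersionIteratorSketch(long headPointer, long[] nextPointer) {
        this.valuePointer = headPointer;
        this.nextPointer = nextPointer;
    }

    @Override
    public boolean hasNext() {
        return valuePointer != -1;
    }

    @Override
    public Long next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        long current = valuePointer;
        valuePointer = nextPointer[(int) current]; // advance along the version chain
        return current;
    }
}
```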
[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r334024027 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java ## @@ -472,6 +397,74 @@ S putNode(ByteBuffer keyByteBuffer, int keyOffset, int keyLen, byte[] value, boo * @return the old state. Null will be returned if key does not exist or returnOldState is false. */ private S removeNode(ByteBuffer keyByteBuffer, int keyOffset, int keyLen, boolean returnOldState) { + Tuple4 result = iterateAndProcess(keyByteBuffer, keyOffset, keyLen, + (tuple3, isRemoved) -> { + long prevNode = tuple3.f0; + long currentNode = tuple3.f1; + long nextNode = tuple3.f2; + // if the node has been logically removed, and can not be physically + // removed here, just return null + if (isRemoved && highestRequiredSnapshotVersion != 0) { + return null; + } + + long oldValuePointer; + boolean oldValueNeedFree; + + if (highestRequiredSnapshotVersion == 0) { + // do physically remove only when there is no snapshot running + oldValuePointer = doPhysicalRemoveAndGetValue(currentNode, prevNode, nextNode); + // the node has been logically removed, and remove it from the set + if (isRemoved) { + logicallyRemovedNodes.remove(currentNode); + } + oldValueNeedFree = true; + } else { + int version = SkipListUtils.helpGetNodeLatestVersion(currentNode, spaceAllocator); + if (version < highestRequiredSnapshotVersion) { + // the newest-version value may be used by snapshots, and update it with copy-on-write + oldValuePointer = updateValueWithCopyOnWrite(currentNode, null); + oldValueNeedFree = false; + } else { + // replace the newest-version value. 
+ oldValuePointer = updateValueWithReplace(currentNode, null); + oldValueNeedFree = true; + } + + helpSetNodeStatus(currentNode, NodeStatus.REMOVE); + logicallyRemovedNodes.add(currentNode); + } + + S oldState = null; + if (returnOldState) { + oldState = helpGetState(oldValuePointer); + } + + if (oldValueNeedFree) { + spaceAllocator.free(oldValuePointer); + } + + return oldState; + }); + return result.f2 ? result.f3 : null; + } + + /** +* Iterator the skip list and perform given function. +* +* @param keyByteBuffer byte buffer storing the key. +* @param keyOffset offset of the key. +* @param keyLen length of the key. +* @param function the function to apply when the skip list contains the given key, which accepts two +* parameters: a tuple3 of [previous_node, current_node, next_node] and a boolean indicating +* whether the node with same key has been logically removed, and returns a state. +* @return a tuple4 of [previous_node, current_node, key_found, state_by_applying_function] +*/ + private Tuple4<Long, Long, Boolean, S> iterateAndProcess( Review comment: Does this method really iterate the nodes? It looks more like a `findNodeAndApply` because after we have found the entry, we stop the iteration.
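The renaming suggestion reflects the method's actual control flow: it walks forward only until the key's position is decided, applies the callback at a match, and stops. A toy model of that "find, then apply" shape over a sorted key array — all names and types here are invented for illustration, not the real skip-list code:

```java
import java.util.function.IntFunction;

class FindNodeAndApplySketch {
    /**
     * Walks a sorted key array until the search key is found or passed,
     * applies fn at the match, and returns null otherwise -- mirroring why
     * "findNodeAndApply" describes the behavior better than "iterate".
     */
    static <S> S findNodeAndApply(int[] sortedKeys, int searchKey, IntFunction<S> fn) {
        for (int i = 0; i < sortedKeys.length; i++) {
            if (sortedKeys[i] == searchKey) {
                return fn.apply(i); // found: apply and stop iterating
            }
            if (sortedKeys[i] > searchKey) {
                break; // sorted order: no later entry can match
            }
        }
        return null; // key absent
    }
}
```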
[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r334022091 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java ## @@ -472,6 +397,74 @@ S putNode(ByteBuffer keyByteBuffer, int keyOffset, int keyLen, byte[] value, boo * @return the old state. Null will be returned if key does not exist or returnOldState is false. */ private S removeNode(ByteBuffer keyByteBuffer, int keyOffset, int keyLen, boolean returnOldState) { + Tuple4 result = iterateAndProcess(keyByteBuffer, keyOffset, keyLen, + (tuple3, isRemoved) -> { Review comment: I would suggest factoring this method out into a separate method.
[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r334042929 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMapTest.java ## @@ -115,577 +144,783 @@
[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r334025401 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java ## @@ -590,10 +542,9 @@ private int compareByteBufferAndNode(ByteBuffer keyByteBuffer, int keyOffset, in * equal to, or greater than the second. */ private int compareNamespaceAndNode(ByteBuffer namespaceByteBuffer, int namespaceOffset, int namespaceLen, long targetNode) { - Chunk chunk = spaceAllocator.getChunkById(SpaceUtils.getChunkIdByAddress(targetNode)); - int offsetInChunk = SpaceUtils.getChunkOffsetByAddress(targetNode); - ByteBuffer targetKeyByteBuffer = chunk.getByteBuffer(offsetInChunk); - int offsetInByteBuffer = chunk.getOffsetInByteBuffer(offsetInChunk); + Tuple2<ByteBuffer, Integer> tuple2 = getNodeByteBufferAndOffset(targetNode); + ByteBuffer targetKeyByteBuffer = tuple2.f0; + int offsetInByteBuffer = tuple2.f1; Review comment: Hmm, it looks a bit strange that we introduced `getNodeByteBufferAndOffset`, which returns a `ByteBuffer` and its offset together, just to unwrap it here and then pass the parts individually into `SkipListUtils.getLevel`. The same applies to the call into `getValuePointer`.
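One way to resolve the unwrap-and-pass pattern the reviewer points out would be overloads of the utilities that accept the pair directly. A hedged sketch — the decoding logic and every name here are invented for illustration and are not the real `SkipListUtils` API:

```java
import java.nio.ByteBuffer;

class LevelUtilSketch {
    /** Minimal stand-in for the buffer/offset pair (illustrative). */
    static final class BufferAndOffset {
        private final ByteBuffer byteBuffer;
        private final int offset;

        BufferAndOffset(ByteBuffer byteBuffer, int offset) {
            this.byteBuffer = byteBuffer;
            this.offset = offset;
        }

        ByteBuffer getByteBuffer() { return byteBuffer; }
        int getOffset() { return offset; }
    }

    /** Existing-style utility taking buffer and offset separately. */
    static int getLevel(ByteBuffer buffer, int offset) {
        return buffer.get(offset) & 0xFF; // invented decoding, for the sketch only
    }

    /** Overload accepting the pair, so call sites never unwrap it themselves. */
    static int getLevel(BufferAndOffset node) {
        return getLevel(node.getByteBuffer(), node.getOffset());
    }
}
```

With the overload, `compareNamespaceAndNode` could pass the pair straight through, and the wrapper type stops being a value that every caller immediately dismantles.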