[GitHub] [flink] TisonKun commented on issue #9861: [FLINK-14237][yarn] No need to rename shipped Flink jar

2019-10-11 Thread GitBox
TisonKun commented on issue #9861: [FLINK-14237][yarn] No need to rename 
shipped Flink jar
URL: https://github.com/apache/flink/pull/9861#issuecomment-540927126
 
 
   Thanks for your review @tillrohrmann! I'll verify the change on a YARN 
cluster first and address the comments while merging.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #9805: [FLINK-11635][docs-zh] translate dev/stream/state/checkpointing into Chinese

2019-10-11 Thread GitBox
flinkbot edited a comment on issue #9805: [FLINK-11635][docs-zh] translate 
dev/stream/state/checkpointing into Chinese
URL: https://github.com/apache/flink/pull/9805#issuecomment-536295524
 
 
   
   ## CI report:
   
   * 9ad039de6bbb953c7e2dd74b3118d80ae552f31e : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/129590285)
   * 5542dca83dbfb476114efb5aa2f2aa8d0626183a : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/129626942)
   * d4c5b15213eb89ee29880c7c8259622c9c7f9bb9 : UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #9831: [FLINK-14278] Extend DispatcherResourceManagerComponentFactory.create to take ioExecutor

2019-10-11 Thread GitBox
flinkbot edited a comment on issue #9831: [FLINK-14278] Extend 
DispatcherResourceManagerComponentFactory.create to take ioExecutor
URL: https://github.com/apache/flink/pull/9831#issuecomment-537027384
 
 
   
   ## CI report:
   
   * e1d37ab03c1bc62b3d45c45ee11d616ed2c78b0c : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/129858618)
   * ba5dc1be84ded55f91fad1698e0edb2ca7416add : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/130066271)
   * 56535ce31d6f51fc781b18b601e086ea5d0634d7 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/130858216)
   * 4ea8b42e4ce029ee7f74f28dd0006fe22abf5f44 : UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session

2019-10-11 Thread GitBox
flinkbot edited a comment on issue #9832: [FLINK-11843] Bind lifespan of 
Dispatcher to leader session
URL: https://github.com/apache/flink/pull/9832#issuecomment-537049332
 
 
   
   ## CI report:
   
   * abaae048fef753455970fac9d6ab421b660b0536 : UNKNOWN
   * b96c63552ccd322adae7a41a410615e95b538ece : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/129867595)
   * 2c95a3939dbf0259d694af6c69451f0ede3c3891 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/130066310)
   * 6e030add922011ea54178690f171911d0139f14b : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/130858235)
   * 271703eda6f6c55b1641a54206109ef659f62854 : UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #9805: [FLINK-11635][docs-zh] translate dev/stream/state/checkpointing into Chinese

2019-10-11 Thread GitBox
flinkbot edited a comment on issue #9805: [FLINK-11635][docs-zh] translate 
dev/stream/state/checkpointing into Chinese
URL: https://github.com/apache/flink/pull/9805#issuecomment-536295524
 
 
   
   ## CI report:
   
   * 9ad039de6bbb953c7e2dd74b3118d80ae552f31e : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/129590285)
   * 5542dca83dbfb476114efb5aa2f2aa8d0626183a : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/129626942)
   * d4c5b15213eb89ee29880c7c8259622c9c7f9bb9 : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/131454148)
   * a8394e9c2ba5aa8186e67895ad68a325f24a8a31 : UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #9879: [FLINK-14300][runtime, test] Add test making sure that RecordWriter is properly closed in case of early StreamTask failure

2019-10-11 Thread GitBox
zhijiangW commented on a change in pull request #9879: [FLINK-14300][runtime, 
test] Add test making sure that RecordWriter is properly closed in case of 
early StreamTask failure
URL: https://github.com/apache/flink/pull/9879#discussion_r333846291
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java
 ##
 @@ -109,14 +132,14 @@ private void head(StreamOperator<?> headOperator, OperatorID headOperatorID) {
tailConfig.setStateKeySerializer(inputSerializer);
}
tailConfig.setChainIndex(chainIndex);
+   tailConfig.setBufferTimeout(bufferTimeout);
 
 Review comment:
   Why do we change the previous timeout of 0 here? It would result in some 
unit test failures on Travis.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #9805: [FLINK-11635][docs-zh] translate dev/stream/state/checkpointing into Chinese

2019-10-11 Thread GitBox
flinkbot edited a comment on issue #9805: [FLINK-11635][docs-zh] translate 
dev/stream/state/checkpointing into Chinese
URL: https://github.com/apache/flink/pull/9805#issuecomment-536295524
 
 
   
   ## CI report:
   
   * 9ad039de6bbb953c7e2dd74b3118d80ae552f31e : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/129590285)
   * 5542dca83dbfb476114efb5aa2f2aa8d0626183a : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/129626942)
   * d4c5b15213eb89ee29880c7c8259622c9c7f9bb9 : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/131454148)
   * a8394e9c2ba5aa8186e67895ad68a325f24a8a31 : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/131455928)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-14318) JDK11 build stalls during shading

2019-10-11 Thread Yu Li (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14318?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yu Li updated FLINK-14318:
--
Labels: test-stability  (was: )

Another instance: https://api.travis-ci.org/v3/job/596081750/log.txt

> JDK11 build stalls during shading
> -
>
> Key: FLINK-14318
> URL: https://issues.apache.org/jira/browse/FLINK-14318
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Affects Versions: 1.10.0
>Reporter: Gary Yao
>Priority: Critical
>  Labels: test-stability
>
> JDK11 build stalls during shading.
> Travis stage: e2e - misc - jdk11
> https://travis-ci.org/apache/flink/builds/593022581?utm_source=slack&utm_medium=notification
> https://api.travis-ci.org/v3/job/593022629/log.txt
> Relevant excerpt from logs:
> {noformat}
> 01:53:43.889 [INFO] 
> 
> 01:53:43.889 [INFO] Building flink-metrics-reporter-prometheus-test 
> 1.10-SNAPSHOT
> 01:53:43.889 [INFO] 
> 
> ...
> 01:53:44.508 [INFO] Including 
> org.apache.flink:force-shading:jar:1.10-SNAPSHOT in the shaded jar.
> 01:53:44.508 [INFO] Excluding org.slf4j:slf4j-api:jar:1.7.15 from the shaded 
> jar.
> 01:53:44.508 [INFO] Excluding com.google.code.findbugs:jsr305:jar:1.3.9 from 
> the shaded jar.
> 01:53:44.508 [INFO] No artifact matching filter io.netty:netty
> 01:53:44.522 [INFO] Replacing original artifact with shaded artifact.
> 01:53:44.523 [INFO] Replacing 
> /home/travis/build/apache/flink/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/target/flink-metrics-reporter-prometheus-test-1.10-SNAPSHOT.jar
>  with 
> /home/travis/build/apache/flink/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/target/flink-metrics-reporter-prometheus-test-1.10-SNAPSHOT-shaded.jar
> 01:53:44.524 [INFO] Replacing original test artifact with shaded test 
> artifact.
> 01:53:44.524 [INFO] Replacing 
> /home/travis/build/apache/flink/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/target/flink-metrics-reporter-prometheus-test-1.10-SNAPSHOT-tests.jar
>  with 
> /home/travis/build/apache/flink/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/target/flink-metrics-reporter-prometheus-test-1.10-SNAPSHOT-shaded-tests.jar
> 01:53:44.524 [INFO] Dependency-reduced POM written at: 
> /home/travis/build/apache/flink/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/target/dependency-reduced-pom.xml
> No output has been received in the last 10m0s, this potentially indicates a 
> stalled build or something wrong with the build itself.
> Check the details on how to adjust your build configuration on: 
> https://docs.travis-ci.com/user/common-build-problems/#build-times-out-because-no-output-was-received
> The build has been terminated
> {noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-14212) Support Python UDFs without arguments

2019-10-11 Thread Hequn Cheng (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14212?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hequn Cheng reassigned FLINK-14212:
---

Assignee: Wei Zhong

> Support Python UDFs without arguments
> -
>
> Key: FLINK-14212
> URL: https://issues.apache.org/jira/browse/FLINK-14212
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python
>Reporter: Dian Fu
>Assignee: Wei Zhong
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> We should support Python UDFs without arguments.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] Henvealf commented on issue #9805: [FLINK-11635][docs-zh] translate dev/stream/state/checkpointing into Chinese

2019-10-11 Thread GitBox
Henvealf commented on issue #9805: [FLINK-11635][docs-zh] translate 
dev/stream/state/checkpointing into Chinese
URL: https://github.com/apache/flink/pull/9805#issuecomment-540922883
 
 
   > Thanks @wuchong, I do have other concerns.
   > 
   > I've left several comments regarding translations that IMO are not easy 
to understand. These comments are marked 'resolved' by @Henvealf without 
being addressed or any further explanation.
   
   I have committed the update.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Assigned] (FLINK-14208) Optimize Python UDFs with parameters of constant values

2019-10-11 Thread Hequn Cheng (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14208?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hequn Cheng reassigned FLINK-14208:
---

Assignee: Huang Xingbo

> Optimize Python UDFs with parameters of constant values
> ---
>
> Key: FLINK-14208
> URL: https://issues.apache.org/jira/browse/FLINK-14208
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python
>Reporter: Dian Fu
>Assignee: Huang Xingbo
>Priority: Major
> Fix For: 1.10.0
>
>
> We need to support Python UDFs with parameters of constant values. Note that 
> the constant parameters do not need to be transferred between the Java 
> operator and the Python worker.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] zhijiangW commented on a change in pull request #9879: [FLINK-14300][runtime, test] Add test making sure that RecordWriter is properly closed in case of early StreamTask failure

2019-10-11 Thread GitBox
zhijiangW commented on a change in pull request #9879: [FLINK-14300][runtime, 
test] Add test making sure that RecordWriter is properly closed in case of 
early StreamTask failure
URL: https://github.com/apache/flink/pull/9879#discussion_r333842749
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 ##
 @@ -56,6 +57,10 @@
  */
 public abstract class RecordWriter<T extends IOReadableWritable> {
 
+   /** Default name for teh output flush thread, if no name with a task 
reference is given. */
 
 Review comment:
   typo: teh?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #9879: [FLINK-14300][runtime, test] Add test making sure that RecordWriter is properly closed in case of early StreamTask failure

2019-10-11 Thread GitBox
zhijiangW commented on a change in pull request #9879: [FLINK-14300][runtime, 
test] Add test making sure that RecordWriter is properly closed in case of 
early StreamTask failure
URL: https://github.com/apache/flink/pull/9879#discussion_r333843383
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java
 ##
 @@ -42,22 +44,29 @@
 public class StreamConfigChainer {
private final StreamConfig headConfig;
	private final Map<Integer, StreamConfig> chainedConfigs = new HashMap<>();
+   private final long bufferTimeout;
 
private StreamConfig tailConfig;
private int chainIndex = 0;
 
	public StreamConfigChainer(OperatorID headOperatorID, StreamOperator<?> headOperator, StreamConfig headConfig) {
+   this(headOperatorID, SimpleOperatorFactory.of(headOperator), 
headConfig);
+   }
+
+   public StreamConfigChainer(OperatorID headOperatorID, StreamOperatorFactory<?> headOperatorFactory, StreamConfig headConfig) {
this.headConfig = checkNotNull(headConfig);
this.tailConfig = checkNotNull(headConfig);
+   this.bufferTimeout = headConfig.getBufferTimeout();
 
-   head(headOperator, headOperatorID);
+   head(headOperatorID, headOperatorFactory);
}
 
-   private void head(StreamOperator<?> headOperator, OperatorID headOperatorID) {
-   headConfig.setStreamOperator(headOperator);
+   private void head(OperatorID headOperatorID, StreamOperatorFactory<?> headOperatorFactory) {
+   headConfig.setStreamOperatorFactory(headOperatorFactory);
headConfig.setOperatorID(headOperatorID);
headConfig.setChainStart();
headConfig.setChainIndex(chainIndex);
+   headConfig.setBufferTimeout(bufferTimeout);
 
 Review comment:
   No need to set it for the head config? Because the `bufferTimeout` is 
initialized via `headConfig.getBufferTimeout()` in the constructor.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #9831: [FLINK-14278] Extend DispatcherResourceManagerComponentFactory.create to take ioExecutor

2019-10-11 Thread GitBox
flinkbot edited a comment on issue #9831: [FLINK-14278] Extend 
DispatcherResourceManagerComponentFactory.create to take ioExecutor
URL: https://github.com/apache/flink/pull/9831#issuecomment-537027384
 
 
   
   ## CI report:
   
   * e1d37ab03c1bc62b3d45c45ee11d616ed2c78b0c : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/129858618)
   * ba5dc1be84ded55f91fad1698e0edb2ca7416add : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/130066271)
   * 56535ce31d6f51fc781b18b601e086ea5d0634d7 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/130858216)
   * 4ea8b42e4ce029ee7f74f28dd0006fe22abf5f44 : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/131454163)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session

2019-10-11 Thread GitBox
flinkbot edited a comment on issue #9832: [FLINK-11843] Bind lifespan of 
Dispatcher to leader session
URL: https://github.com/apache/flink/pull/9832#issuecomment-537049332
 
 
   
   ## CI report:
   
   * abaae048fef753455970fac9d6ab421b660b0536 : UNKNOWN
   * b96c63552ccd322adae7a41a410615e95b538ece : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/129867595)
   * 2c95a3939dbf0259d694af6c69451f0ede3c3891 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/130066310)
   * 6e030add922011ea54178690f171911d0139f14b : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/130858235)
   * 271703eda6f6c55b1641a54206109ef659f62854 : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/131454171)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xintongsong commented on a change in pull request #9805: [FLINK-11635][docs-zh] translate dev/stream/state/checkpointing into Chinese

2019-10-11 Thread GitBox
xintongsong commented on a change in pull request #9805: [FLINK-11635][docs-zh] 
translate dev/stream/state/checkpointing into Chinese
URL: https://github.com/apache/flink/pull/9805#discussion_r333847561
 
 

 ##
 File path: docs/dev/stream/state/checkpointing.zh.md
 ##
 @@ -25,146 +25,138 @@ under the License.
 * ToC
 {:toc}
 
-Every function and operator in Flink can be **stateful** (see [working with 
state](state.html) for details).
-Stateful functions store data across the processing of individual 
elements/events, making state a critical building block for
-any type of more elaborate operation.
+Flink 中的每个方法或算子都能够是**有状态的**(阅读 [working with state](state.html) 查看详细)。
+状态化的方法在处理单个 元素/事件 的时候存储数据,让状态成为使各个类型的算子更加精细的重要部分。
+为了让状态容错,Flink 需要为状态添加**Checkpoint(检查点)**。Checkpoint 使得 Flink 
能够恢复状态和在流中的位置,从而向应用提供和无故障执行时一样的语义。
 
-In order to make state fault tolerant, Flink needs to **checkpoint** the 
state. Checkpoints allow Flink to recover state and positions
-in the streams to give the application the same semantics as a failure-free 
execution.
+[Documentation on streaming fault tolerance]({{ site.baseurl 
}}/zh/internals/stream_checkpointing.html) 介绍了 Flink 流计算容错机制的内部技术原理。
 
-The [documentation on streaming fault tolerance]({{ site.baseurl 
}}/internals/stream_checkpointing.html) describes in detail the technique 
behind Flink's streaming fault tolerance mechanism.
 
+## 前提条件
 
-## Prerequisites
+Flink 的 Checkpoint 机制会和持久化存储进行交互,交换流与状态。一般需要:
 
-Flink's checkpointing mechanism interacts with durable storage for streams and 
state. In general, it requires:
+  - 一个能够回放一段时间内数据的持久化数据源,例如持久化消息队列(例如 Apache Kafka、RabbitMQ、 Amazon Kinesis、 
Google PubSub 等)或文件系统(例如 HDFS、 S3、 GFS、 NFS、 Ceph 等)。
+  - 存放状态的持久化存储,通常为分布式文件系统(比如 HDFS、 S3、 GFS、 NFS、 Ceph 等)。
 
-  - A *persistent* (or *durable*) data source that can replay records for a 
certain amount of time. Examples for such sources are persistent messages 
queues (e.g., Apache Kafka, RabbitMQ, Amazon Kinesis, Google PubSub) or file 
systems (e.g., HDFS, S3, GFS, NFS, Ceph, ...).
-  - A persistent storage for state, typically a distributed filesystem (e.g., 
HDFS, S3, GFS, NFS, Ceph, ...)
+## 激活与配置 Checkpoint
 
+默认情况下,Checkpoint 是禁用的。通过调用 `StreamExecutionEnvironment` 的 
`enableCheckpointing(n)` 来激活 Checkpoint,里面的 *n* 是进行 Checkpoint 的间隔,单位毫秒。
 
-## Enabling and Configuring Checkpointing
+Checkpoint 其他的属性包括:
 
-By default, checkpointing is disabled. To enable checkpointing, call 
`enableCheckpointing(n)` on the `StreamExecutionEnvironment`, where *n* is the 
checkpoint interval in milliseconds.
-
-Other parameters for checkpointing include:
-
-  - *exactly-once vs. at-least-once*: You can optionally pass a mode to the 
`enableCheckpointing(n)` method to choose between the two guarantee levels.
-Exactly-once is preferable for most applications. At-least-once may be 
relevant for certain super-low-latency (consistently few milliseconds) 
applications.
-
-  - *checkpoint timeout*: The time after which a checkpoint-in-progress is 
aborted, if it did not complete by then.
-
-  - *minimum time between checkpoints*: To make sure that the streaming 
application makes a certain amount of progress between checkpoints,
-one can define how much time needs to pass between checkpoints. If this 
value is set for example to *5000*, the next checkpoint will be
-started no sooner than 5 seconds after the previous checkpoint completed, 
regardless of the checkpoint duration and the checkpoint interval.
-Note that this implies that the checkpoint interval will never be smaller 
than this parameter.
+  - *精确一次(exactly-once) 对比 至少一次(at-least-once)*:你可以选择向 
`enableCheckpointing(n)` 方法中传入一个模式来选择使用两种保证等级中的哪一种。
+对于大多数应用来说,精确一次是较好的选择。至少一次可能与某些延迟超低(始终只有几毫秒)的应用的关联较大。
+  
+  - *Checkpoint超时(checkpoint timeout)*:如果过了这个时间,还在进行中的 checkpoint 操作就会被抛弃。
+  
+  - *checkpoint 之间的最小时间(minimum time between checkpoints)*: 为了确保流应用在 
checkpoint 之间有足够的进展,可以定义在 checkpoint 之间需要多久的时间。如果值设置为了 *5000*,
+无论 checkpoint 持续时间与间隔是多久,在前一个 checkpoint 完成的五秒后才会开始下一个 checkpoint。
 
-It is often easier to configure applications by defining the "time between 
checkpoints" than the checkpoint interval, because the "time between 
checkpoints"
-is not susceptible to the fact that checkpoints may sometimes take longer 
than on average (for example if the target storage system is temporarily slow).
-
-Note that this value also implies that the number of concurrent 
checkpoints is *one*.
-
-  - *number of concurrent checkpoints*: By default, the system will not 
trigger another checkpoint while one is still in progress.
-This ensures that the topology does not spend too much time on checkpoints 
and not make progress with processing the streams.
-It is possible to allow for multiple overlapping checkpoints, which is 
interesting for pipelines that have a certain processing delay
-(for example because the functions call external services that need some 
time to respond) but 

[GitHub] [flink] xintongsong commented on a change in pull request #9805: [FLINK-11635][docs-zh] translate dev/stream/state/checkpointing into Chinese

2019-10-11 Thread GitBox
xintongsong commented on a change in pull request #9805: [FLINK-11635][docs-zh] 
translate dev/stream/state/checkpointing into Chinese
URL: https://github.com/apache/flink/pull/9805#discussion_r333847601
 
 

 ##
 File path: docs/dev/stream/state/checkpointing.zh.md
 ##
 @@ -25,146 +25,138 @@ under the License.
 * ToC
 {:toc}
 
-Every function and operator in Flink can be **stateful** (see [working with 
state](state.html) for details).
-Stateful functions store data across the processing of individual 
elements/events, making state a critical building block for
-any type of more elaborate operation.
+Flink 中的每个方法或算子都能够是**有状态的**(阅读 [working with state](state.html) 查看详细)。
+状态化的方法在处理单个 元素/事件 的时候存储数据,让状态成为使各个类型的算子更加精细的重要部分。
+为了让状态容错,Flink 需要为状态添加**Checkpoint(检查点)**。Checkpoint 使得 Flink 
能够恢复状态和在流中的位置,从而向应用提供和无故障执行时一样的语义。
 
-In order to make state fault tolerant, Flink needs to **checkpoint** the 
state. Checkpoints allow Flink to recover state and positions
-in the streams to give the application the same semantics as a failure-free 
execution.
+[Documentation on streaming fault tolerance]({{ site.baseurl 
}}/zh/internals/stream_checkpointing.html) 介绍了 Flink 流计算容错机制的内部技术原理。
 
-The [documentation on streaming fault tolerance]({{ site.baseurl 
}}/internals/stream_checkpointing.html) describes in detail the technique 
behind Flink's streaming fault tolerance mechanism.
 
+## 前提条件
 
-## Prerequisites
+Flink 的 Checkpoint 机制会和持久化存储进行交互,交换流与状态。一般需要:
 
-Flink's checkpointing mechanism interacts with durable storage for streams and 
state. In general, it requires:
+  - 一个能够回放一段时间内数据的持久化数据源,例如持久化消息队列(例如 Apache Kafka、RabbitMQ、 Amazon Kinesis、 
Google PubSub 等)或文件系统(例如 HDFS、 S3、 GFS、 NFS、 Ceph 等)。
+  - 存放状态的持久化存储,通常为分布式文件系统(比如 HDFS、 S3、 GFS、 NFS、 Ceph 等)。
 
-  - A *persistent* (or *durable*) data source that can replay records for a 
certain amount of time. Examples for such sources are persistent messages 
queues (e.g., Apache Kafka, RabbitMQ, Amazon Kinesis, Google PubSub) or file 
systems (e.g., HDFS, S3, GFS, NFS, Ceph, ...).
-  - A persistent storage for state, typically a distributed filesystem (e.g., 
HDFS, S3, GFS, NFS, Ceph, ...)
+## 激活与配置 Checkpoint
 
+默认情况下,Checkpoint 是禁用的。通过调用 `StreamExecutionEnvironment` 的 
`enableCheckpointing(n)` 来激活 Checkpoint,里面的 *n* 是进行 Checkpoint 的间隔,单位毫秒。
 
-## Enabling and Configuring Checkpointing
+Checkpoint 其他的属性包括:
 
-By default, checkpointing is disabled. To enable checkpointing, call 
`enableCheckpointing(n)` on the `StreamExecutionEnvironment`, where *n* is the 
checkpoint interval in milliseconds.
-
-Other parameters for checkpointing include:
-
-  - *exactly-once vs. at-least-once*: You can optionally pass a mode to the 
`enableCheckpointing(n)` method to choose between the two guarantee levels.
-Exactly-once is preferable for most applications. At-least-once may be 
relevant for certain super-low-latency (consistently few milliseconds) 
applications.
-
-  - *checkpoint timeout*: The time after which a checkpoint-in-progress is 
aborted, if it did not complete by then.
-
-  - *minimum time between checkpoints*: To make sure that the streaming 
application makes a certain amount of progress between checkpoints,
-one can define how much time needs to pass between checkpoints. If this 
value is set for example to *5000*, the next checkpoint will be
-started no sooner than 5 seconds after the previous checkpoint completed, 
regardless of the checkpoint duration and the checkpoint interval.
-Note that this implies that the checkpoint interval will never be smaller 
than this parameter.
+  - *精确一次(exactly-once) 对比 至少一次(at-least-once)*:你可以选择向 
`enableCheckpointing(n)` 方法中传入一个模式来选择使用两种保证等级中的哪一种。
+对于大多数应用来说,精确一次是较好的选择。至少一次可能与某些延迟超低(始终只有几毫秒)的应用的关联较大。
+  
+  - *Checkpoint超时(checkpoint timeout)*:如果过了这个时间,还在进行中的 checkpoint 操作就会被抛弃。
+  
+  - *checkpoint 之间的最小时间(minimum time between checkpoints)*: 为了确保流应用在 
checkpoint 之间有足够的进展,可以定义在 checkpoint 之间需要多久的时间。如果值设置为了 *5000*,
+无论 checkpoint 持续时间与间隔是多久,在前一个 checkpoint 完成的五秒后才会开始下一个 checkpoint。
 
-It is often easier to configure applications by defining the "time between 
checkpoints" than the checkpoint interval, because the "time between 
checkpoints"
-is not susceptible to the fact that checkpoints may sometimes take longer 
than on average (for example if the target storage system is temporarily slow).
-
-Note that this value also implies that the number of concurrent 
checkpoints is *one*.
-
-  - *number of concurrent checkpoints*: By default, the system will not 
trigger another checkpoint while one is still in progress.
-This ensures that the topology does not spend too much time on checkpoints 
and not make progress with processing the streams.
-It is possible to allow for multiple overlapping checkpoints, which is 
interesting for pipelines that have a certain processing delay
-(for example because the functions call external services that need some 
time to respond) but 

[GitHub] [flink] flinkbot edited a comment on issue #9860: [FLINK-14331][runtime] Reset vertices right after they transition to terminated states

2019-10-11 Thread GitBox
flinkbot edited a comment on issue #9860: [FLINK-14331][runtime] Reset vertices 
right after they transition to terminated states
URL: https://github.com/apache/flink/pull/9860#issuecomment-539900648
 
 
   
   ## CI report:
   
   * 80c100513c089a9dd0930aa547383ae970c4e7f8 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/131094955)
   * 2de47e991caa74adea1792b6b9153dee94e46b95 : UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-13567) Avro Confluent Schema Registry nightly end-to-end test failed on Travis

2019-10-11 Thread Yu Li (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yu Li updated FLINK-13567:
--
Priority: Blocker  (was: Critical)

bq. Would it make sense to disable this test for the moment and make this issue 
a blocker for 1.10?
+1, marking this as a blocker makes sense given the failure frequency.

> Avro Confluent Schema Registry nightly end-to-end test failed on Travis
> ---
>
> Key: FLINK-13567
> URL: https://issues.apache.org/jira/browse/FLINK-13567
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka, Tests
>Affects Versions: 1.8.2, 1.9.0, 1.10.0
>Reporter: Till Rohrmann
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.10.0
>
>
> The {{Avro Confluent Schema Registry nightly end-to-end test}} failed on 
> Travis with
> {code}
> [FAIL] 'Avro Confluent Schema Registry nightly end-to-end test' failed after 
> 2 minutes and 11 seconds! Test exited with exit code 1
> No taskexecutor daemon (pid: 29044) is running anymore on 
> travis-job-b0823aec-c4ec-4d4b-8b59-e9f968de9501.
> No standalonesession daemon to stop on host 
> travis-job-b0823aec-c4ec-4d4b-8b59-e9f968de9501.
> rm: cannot remove 
> '/home/travis/build/apache/flink/flink-dist/target/flink-1.10-SNAPSHOT-bin/flink-1.10-SNAPSHOT/plugins':
>  No such file or directory
> {code}
> https://api.travis-ci.org/v3/job/567273939/log.txt



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14360) Flink on yarn should support obtain delegation tokens of multi hdfs namespaces

2019-10-11 Thread Shen Yinjie (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16949209#comment-16949209
 ] 

Shen Yinjie commented on FLINK-14360:
-

Hi [~karmagyz], we deploy Flink on YARN with RM federation, multiple HDFS 
namespaces, and multiple HDFS clusters; Flink would fail to connect to some 
HDFS namenodes due to the lack of delegation tokens.
We provide a configuration for users to set a list of HDFS scheme names, so 
that before the Flink AppMaster starts, it will obtain delegation tokens for 
the specified HDFS namenodes.
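A hedged sketch of the idea, using only the stock Hadoop 
{{FileSystem#addDelegationTokens}} API; the helper class, its name, and the 
way the namespace URIs are passed in are hypothetical, not the actual patch:

{code:java}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.Credentials;

// Hypothetical helper: collect delegation tokens for every configured
// namespace before the application master is started.
public class MultiNamespaceTokenFetcher {

    public static Credentials fetchTokens(Configuration conf, String renewer,
            String... namespaceUris) throws Exception {
        Credentials credentials = new Credentials();
        for (String uri : namespaceUris) {
            // e.g. "hdfs://nameservice1/", "hdfs://nameservice2/"
            FileSystem fs = new Path(uri).getFileSystem(conf);
            // Adds this namenode's delegation token(s) to the credentials;
            // tokens already present are not fetched again.
            fs.addDelegationTokens(renewer, credentials);
        }
        return credentials;
    }
}
{code}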

> Flink on yarn should support obtain delegation tokens of multi hdfs namespaces
> --
>
> Key: FLINK-14360
> URL: https://issues.apache.org/jira/browse/FLINK-14360
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Reporter: Shen Yinjie
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> There's a scenario when deploying Flink on YARN with multiple HDFS clusters 
> or HDFS federation: Flink needs to get delegation tokens for all the 
> namespaces before starting the AppMaster.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] hequn8128 commented on a change in pull request #9865: [FLINK-14212][python] Support no-argument Python UDFs.

2019-10-11 Thread GitBox
hequn8128 commented on a change in pull request #9865: [FLINK-14212][python] 
Support no-argument Python UDFs.
URL: https://github.com/apache/flink/pull/9865#discussion_r333858302
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/ExpressionReductionRulesTest.scala
 ##
 @@ -46,4 +47,21 @@ class ExpressionReductionRulesTest extends TableTestBase {
 util.verifyPlan("SELECT myUdf(1) FROM MyTable")
   }
 
+  @Test
+  def testExpressionReductionWithPythonUDF(): Unit = {
+util.addFunction("PyUdf", MockedPythonUDFWithoutArguments)
+util.addFunction("MyUdf", Func1)
+util.verifyPlan("SELECT PyUdf(), MyUdf(1) FROM MyTable")
+  }
+}
+
+object MockedPythonUDFWithoutArguments extends ScalarFunction {
 
 Review comment:
   Check both deterministic and non-deterministic Python UDFs?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] hequn8128 commented on a change in pull request #9865: [FLINK-14212][python] Support no-argument Python UDFs.

2019-10-11 Thread GitBox
hequn8128 commented on a change in pull request #9865: [FLINK-14212][python] 
Support no-argument Python UDFs.
URL: https://github.com/apache/flink/pull/9865#discussion_r333842540
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExpressionReducer.scala
 ##
 @@ -62,6 +63,9 @@ class ExpressionReducer(
 
 val literals = constExprs.asScala.map(e => (e.getType.getSqlTypeName, 
e)).flatMap {
 
+  // skip expressions that contain python functions
 
 Review comment:
   Maybe it's better to be more specific about why we skip Python functions.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] hequn8128 commented on a change in pull request #9865: [FLINK-14212][python] Support no-argument Python UDFs.

2019-10-11 Thread GitBox
hequn8128 commented on a change in pull request #9865: [FLINK-14212][python] 
Support no-argument Python UDFs.
URL: https://github.com/apache/flink/pull/9865#discussion_r333860456
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/ExpressionReductionRulesTest.scala
 ##
 @@ -510,3 +528,14 @@ object DeterministicNullFunc extends ScalarFunction {
   def eval(): String = null
   override def isDeterministic = true
 }
+
+object MockedPythonUDFWithoutArguments extends ScalarFunction {
+
+  override def getLanguage: FunctionLanguage = FunctionLanguage.PYTHON
+
+  def eval(): Long = {
 
 Review comment:
   Simplify the eval to `def eval(): Long = 1L` directly, and change the 
class name to `DeterministicPythonFunc`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] hequn8128 commented on a change in pull request #9865: [FLINK-14212][python] Support no-argument Python UDFs.

2019-10-11 Thread GitBox
hequn8128 commented on a change in pull request #9865: [FLINK-14212][python] 
Support no-argument Python UDFs.
URL: https://github.com/apache/flink/pull/9865#discussion_r333854883
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExpressionReducer.scala
 ##
 @@ -120,59 +124,64 @@ class ExpressionReducer(
 var reducedIdx = 0
 while (i < constExprs.size()) {
   val unreduced = constExprs.get(i)
-  unreduced.getType.getSqlTypeName match {
-// we insert the original expression for object literals
-case SqlTypeName.ANY |
- SqlTypeName.ROW |
- SqlTypeName.ARRAY |
- SqlTypeName.MAP |
- SqlTypeName.MULTISET =>
-  reducedValues.add(unreduced)
-case SqlTypeName.VARCHAR | SqlTypeName.CHAR =>
-  val escapeVarchar = StringEscapeUtils
-
.escapeJava(safeToString(reduced.getField(reducedIdx).asInstanceOf[BinaryString]))
-  reducedValues.add(maySkipNullLiteralReduce(rexBuilder, 
escapeVarchar, unreduced))
-  reducedIdx += 1
-case SqlTypeName.VARBINARY | SqlTypeName.BINARY =>
-  val reducedValue = reduced.getField(reducedIdx)
-  val value = if (null != reducedValue) {
-new 
ByteString(reduced.getField(reducedIdx).asInstanceOf[Array[Byte]])
-  } else {
-reducedValue
-  }
-  reducedValues.add(maySkipNullLiteralReduce(rexBuilder, value, 
unreduced))
-  reducedIdx += 1
-case SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE =>
-  val value = if (!reduced.isNullAt(reducedIdx)) {
-val mills = reduced.getField(reducedIdx).asInstanceOf[Long]
-Long.box(SqlDateTimeUtils.timestampWithLocalZoneToTimestamp(
-  mills, TimeZone.getTimeZone(config.getLocalTimeZone)))
-  } else {
-null
-  }
-  reducedValues.add(maySkipNullLiteralReduce(rexBuilder, value, 
unreduced))
-  reducedIdx += 1
-case SqlTypeName.DECIMAL =>
-  val reducedValue = reduced.getField(reducedIdx)
-  val value = if (reducedValue != null) {
-reducedValue.asInstanceOf[Decimal].toBigDecimal
-  } else {
-reducedValue
-  }
-  reducedValues.add(maySkipNullLiteralReduce(rexBuilder, value, 
unreduced))
-  reducedIdx += 1
-case _ =>
-  val reducedValue = reduced.getField(reducedIdx)
-  // RexBuilder handle double literal incorrectly, convert it into 
BigDecimal manually
-  val value = if (reducedValue != null &&
-unreduced.getType.getSqlTypeName == SqlTypeName.DOUBLE) {
-new 
java.math.BigDecimal(reducedValue.asInstanceOf[Number].doubleValue())
-  } else {
-reducedValue
-  }
-
-  reducedValues.add(maySkipNullLiteralReduce(rexBuilder, value, 
unreduced))
-  reducedIdx += 1
+  if (PythonUtil.containsFunctionOf(unreduced, FunctionLanguage.PYTHON)) {
 
 Review comment:
   Rex tree traversal is expensive, and we traverse twice in this reduce() 
method. How about some reuse here, e.g.,
   ```scala
   val jvmUdfIndexes = constExprs.asScala.zipWithIndex
     .filter(p => PythonUtil.containsFunctionOf(p._1, FunctionLanguage.JVM))
     .map(_._2)
     .toSet
   ...
   if (!jvmUdfIndexes.contains(i)) {
     // if the expression contains a Python function, just insert the original expression
     reducedValues.add(unreduced)
   }
   ```
   In this way, we can reduce some optimization time. What do you think?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Comment Edited] (FLINK-14360) Flink on yarn should support obtain delegation tokens of multi hdfs namespaces

2019-10-11 Thread Yangze Guo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16949216#comment-16949216
 ] 

Yangze Guo edited comment on FLINK-14360 at 10/11/19 7:37 AM:
--

cc [~wangyang] Could you take a look at this ticket?


was (Author: karmagyz):
cc [~taoyang] [~yangwang166] Could you take a look at this ticket?

> Flink on yarn should support obtain delegation tokens of multi hdfs namespaces
> --
>
> Key: FLINK-14360
> URL: https://issues.apache.org/jira/browse/FLINK-14360
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Reporter: Shen Yinjie
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> There's a scenario when deploying Flink on YARN with multiple HDFS clusters 
> or HDFS federation: Flink needs to get delegation tokens for all the 
> namespaces before starting the AppMaster.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-14366) Annotate MiniCluster tests in flink-tests with AlsoRunWithSchedulerNG

2019-10-11 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-14366:
---

 Summary: Annotate MiniCluster tests in flink-tests with 
AlsoRunWithSchedulerNG
 Key: FLINK-14366
 URL: https://issues.apache.org/jira/browse/FLINK-14366
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination, Tests
Affects Versions: 1.10.0
Reporter: Zhu Zhu
 Fix For: 1.10.0


This task is to annotate all MiniCluster tests in flink-tests with 
AlsoRunWithSchedulerNG, so that we can notice breaking changes in time while 
further improving the new-generation scheduler.

We should also guarantee that the annotated tests pass, either by fixing 
failed tests, or by not annotating a failed test and opening a ticket to 
track it. The tickets for failed tests should be linked in this task.
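As a sketch, annotating a test would look like the following (the JUnit 
category mechanism is assumed here; the import path of AlsoRunWithSchedulerNG 
and the test body are placeholders, not actual Flink code):

{code:java}
import org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG; // assumed path

import org.junit.Test;
import org.junit.experimental.categories.Category;

// Marks the test so that it is also run with the new-generation scheduler.
@Category(AlsoRunWithSchedulerNG.class)
public class MyMiniClusterITCase {

    @Test
    public void testJobRunsOnMiniCluster() throws Exception {
        // ... submit a job to a MiniCluster and assert on the result ...
    }
}
{code}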



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14360) Flink on yarn should support obtain delegation tokens of multi hdfs namespaces

2019-10-11 Thread Yangze Guo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16949216#comment-16949216
 ] 

Yangze Guo commented on FLINK-14360:


cc [~taoyang] [~yangwang166] Could you take a look at this ticket?

> Flink on yarn should support obtain delegation tokens of multi hdfs namespaces
> --
>
> Key: FLINK-14360
> URL: https://issues.apache.org/jira/browse/FLINK-14360
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Reporter: Shen Yinjie
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> There's a scenario when deploying Flink on YARN with multiple HDFS clusters 
> or HDFS federation: Flink needs to get delegation tokens for all the 
> namespaces before starting the AppMaster.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-14360) Flink on yarn should support obtain delegation tokens of multi hdfs namespaces

2019-10-11 Thread Yangze Guo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16949216#comment-16949216
 ] 

Yangze Guo edited comment on FLINK-14360 at 10/11/19 7:40 AM:
--

cc [~fly_in_gis] Could you take a look at this ticket?


was (Author: karmagyz):
cc [~wangyang] Could you take a look at this ticket?

> Flink on yarn should support obtain delegation tokens of multi hdfs namespaces
> --
>
> Key: FLINK-14360
> URL: https://issues.apache.org/jira/browse/FLINK-14360
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Reporter: Shen Yinjie
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> There's a scenario when deploying Flink on YARN with multiple HDFS clusters 
> or HDFS federation: Flink needs to get delegation tokens for all the 
> namespaces before starting the AppMaster.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on issue #9824: [FLINK-14302] FlinkKafkaInternalProducer should not send `ADD_PARTITIONS_TO_TXN` request if `newPartitionsInTransaction` is empty when enable

2019-10-11 Thread GitBox
flinkbot edited a comment on issue #9824: [FLINK-14302] 
FlinkKafkaInternalProducer should not send `ADD_PARTITIONS_TO_TXN` request if 
`newPartitionsInTransaction` is empty when enable EoS
URL: https://github.com/apache/flink/pull/9824#issuecomment-536928428
 
 
   
   ## CI report:
   
   * 87ae788d6490da7af5284c404648647c9919a6df : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/129820284)
   * c4b5a18b039a90aa6b313c43f423622536bc8cd4 : UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] WeiZhong94 commented on issue #9865: [FLINK-14212][python] Support no-argument Python UDFs.

2019-10-11 Thread GitBox
WeiZhong94 commented on issue #9865: [FLINK-14212][python] Support no-argument 
Python UDFs.
URL: https://github.com/apache/flink/pull/9865#issuecomment-540956430
 
 
   @hequn8128 Thanks for your feedback! Dian Fu and I have talked about these 
two approaches, and we came to an agreement: skip the optimization in 
`ExpressionReducer` and optimize the UDFs at runtime. Our thoughts are as 
follows:
   1. How to optimize Python UDFs at runtime?
   After supporting constant parameters in Python UDFs (see [this 
PR](https://github.com/apache/flink/pull/9858)), we can do this optimization 
when evaluating the chained Python UDFs in the Python worker: 
   If the evaluated Python UDF is deterministic and has no arguments, or its 
arguments are all constant values, which means it will always return a 
constant value, replace it with the constant result value.
   This rule can be applied recursively until all the deterministic UDFs with 
constant inputs have been replaced. If the root nodes of the chained Python 
UDFs become constant values, we can transmit them only once and replace them 
with None in subsequent transmissions to save IO resources. The Java operator 
also knows which fields of the evaluated result should be constant values 
rather than None, because the reduce rule can be applied on the Java side too. 
No additional interaction between the Java operator and the Python worker is 
needed in this design.
   2. Why not optimize Python UDFs during the optimization phase?
   Reducing Python UDFs in the optimization phase is not easy in the current 
design. It means the generated Java wrappers of the Python UDFs would have to 
be evaluated and return the correct result. In other words, we would need to 
run Python UDFs on the client side, but Python UDFs are designed to run on the 
cluster, whose Python environment may differ from the client side once we 
introduce environment and dependency management in the future. To solve the 
environment problem, we would need to prepare a Python environment that is the 
same as the Python environment on the cluster before reducing the Python UDFs. 
To evaluate the Python UDFs, we would need to implement a new UDF runner that 
does not depend on Apache Beam (the client side of the Flink Python API does 
not depend on Apache Beam). If we completed all of this it would be a perfect 
solution to the problem, but it is too expensive compared to the runtime 
optimization.
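   A minimal, illustrative sketch of the folding rule from point 1; the 
`Expr`/`Constant`/`UdfCall` types below are hypothetical stand-ins for the 
real expression representation (plain Java, not Flink or Beam code):
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.function.Function;
   
   final class ConstantFolding {
   
       interface Expr {}
   
       // A literal value; also the result of a successful fold.
       static final class Constant implements Expr {
           final Object value;
           Constant(Object value) { this.value = value; }
       }
   
       // A UDF call with zero or more argument expressions.
       static final class UdfCall implements Expr {
           final Function<List<Object>, Object> udf;
           final boolean deterministic;
           final List<Expr> args;
           UdfCall(Function<List<Object>, Object> udf, boolean deterministic, List<Expr> args) {
               this.udf = udf;
               this.deterministic = deterministic;
               this.args = args;
           }
       }
   
       // Bottom-up recursion: fold the arguments first; a deterministic call
       // whose (possibly zero) arguments are all constants is replaced by the
       // constant value it would always evaluate to.
       static Expr fold(Expr expr) {
           if (!(expr instanceof UdfCall)) {
               return expr;
           }
           UdfCall call = (UdfCall) expr;
           List<Expr> foldedArgs = new ArrayList<>();
           boolean allConstant = true;
           for (Expr arg : call.args) {
               Expr folded = fold(arg);
               foldedArgs.add(folded);
               allConstant &= folded instanceof Constant;
           }
           if (call.deterministic && allConstant) {
               List<Object> values = new ArrayList<>();
               for (Expr arg : foldedArgs) {
                   values.add(((Constant) arg).value);
               }
               return new Constant(call.udf.apply(values));
           }
           return new UdfCall(call.udf, call.deterministic, foldedArgs);
       }
   }
   ```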


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #9831: [FLINK-14278] Extend DispatcherResourceManagerComponentFactory.create to take ioExecutor

2019-10-11 Thread GitBox
flinkbot edited a comment on issue #9831: [FLINK-14278] Extend 
DispatcherResourceManagerComponentFactory.create to take ioExecutor
URL: https://github.com/apache/flink/pull/9831#issuecomment-537027384
 
 
   
   ## CI report:
   
   * e1d37ab03c1bc62b3d45c45ee11d616ed2c78b0c : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/129858618)
   * ba5dc1be84ded55f91fad1698e0edb2ca7416add : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/130066271)
   * 56535ce31d6f51fc781b18b601e086ea5d0634d7 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/130858216)
   * 4ea8b42e4ce029ee7f74f28dd0006fe22abf5f44 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/131454163)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-7151) Add a basic function SQL DDL

2019-10-11 Thread Timo Walther (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-7151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16949235#comment-16949235
 ] 

Timo Walther commented on FLINK-7151:
-

Thank you [~ZhenqiuHuang]. We also have a discussion about giving properties 
to functions in the mailing list discussion "[DISCUSS] FLIP-64: Support for 
Temporary Objects in Table module". I think Postgres also has a {{WITH}} 
clause for this reason, which aligns nicely with our {{CREATE TABLE xxx () 
WITH (...)}} clause.

> Add a basic function SQL DDL
> 
>
> Key: FLINK-7151
> URL: https://issues.apache.org/jira/browse/FLINK-7151
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: yuemeng
>Assignee: Zhenqiu Huang
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Based on CREATE FUNCTION and CREATE TABLE, we can register a UDF, UDAF, or UDTF using SQL:
> {code}
> CREATE FUNCTION [IF NOT EXISTS] [catalog_name.db_name.]function_name AS 
> class_name;
> DROP FUNCTION [IF EXISTS] [catalog_name.db_name.]function_name;
> ALTER FUNCTION [IF EXISTS] [catalog_name.db_name.]function_name RENAME TO 
> new_name;
> {code}
> {code}
> CREATE function 'TOPK' AS 
> 'com..aggregate.udaf.distinctUdaf.topk.ITopKUDAF';
> INSERT INTO db_sink SELECT id, TOPK(price, 5, 'DESC') FROM kafka_source GROUP 
> BY id;
> {code}
> This ticket can assume that the function class is already loaded in the 
> classpath by users. Advanced syntax, like how to dynamically load UDF 
> libraries from external locations, can be on a separate ticket.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14364) Allow comments fail when not ignore parse errors in CsvRowDeserializationSchema

2019-10-11 Thread Jingsong Lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16949243#comment-16949243
 ] 

Jingsong Lee commented on FLINK-14364:
--

{code:java}
String string = "#Test,12,Test";
final TypeInformation<Row> rowInfo = Types.ROW(Types.STRING, Types.INT, Types.STRING);
final CsvRowDeserializationSchema.Builder deserSchemaBuilder = new CsvRowDeserializationSchema.Builder(rowInfo)
  .setIgnoreParseErrors(false)
  .setAllowComments(true);
System.out.println(deserialize(deserSchemaBuilder, string));
{code}
Here is an example~

> Allow comments fail when not ignore parse errors in 
> CsvRowDeserializationSchema
> ---
>
> Key: FLINK-14364
> URL: https://issues.apache.org/jira/browse/FLINK-14364
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Reporter: Jingsong Lee
>Priority: Major
> Fix For: 1.10.0
>
>
> When using CsvRowDeserializationSchema with setIgnoreParseErrors(false) and 
> setAllowComments(true), if there are comments in the message, it will throw 
> a MismatchedInputException.
> Is this a bug? Should we catch the MismatchedInputException and return null?
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14356) Introduce "single-field" format to (de)serialize message to a single field

2019-10-11 Thread Timo Walther (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16949244#comment-16949244
 ] 

Timo Walther commented on FLINK-14356:
--

If we go for STRING as well, I suggest implementing at least 
CHAR/VARCHAR/BINARY/VARBINARY/TINYINT/INT/SMALLINT/BIGINT. The implementation 
effort is not very big.

> Introduce "single-field" format to (de)serialize message to a single field
> --
>
> Key: FLINK-14356
> URL: https://issues.apache.org/jira/browse/FLINK-14356
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table 
> SQL / API
>Reporter: jinfeng
>Assignee: jinfeng
>Priority: Major
>
> I want to use Flink SQL to write Kafka messages directly to HDFS, with no 
> serialization or deserialization of the messages involved in the middle: 
> the bytes of the message are directly converted into the first field of the 
> Row. However, the current RowSerializationSchema does not support the 
> conversion of bytes to VARBINARY. Can we add some special 
> RowSerializationSchema and RowDeserializationSchema?
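As a hedged sketch of what the deserialization side of such a "single-field" 
format could look like (the class name is made up; only the standard 
DeserializationSchema methods are used):

{code:java}
import java.io.IOException;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;

// Illustrative "single-field" deserializer: the raw message bytes become the
// first (and only) field of the produced Row, with no parsing in between.
public class SingleFieldDeserializationSchema implements DeserializationSchema<Row> {

    @Override
    public Row deserialize(byte[] message) throws IOException {
        return Row.of((Object) message);
    }

    @Override
    public boolean isEndOfStream(Row nextElement) {
        return false;
    }

    @Override
    public TypeInformation<Row> getProducedType() {
        // A single VARBINARY-like field that holds the raw bytes.
        return new RowTypeInfo(Types.PRIMITIVE_ARRAY(Types.BYTE));
    }
}
{code}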



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14364) Allow comments fail when not ignore parse errors in CsvRowDeserializationSchema

2019-10-11 Thread Timo Walther (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16949247#comment-16949247
 ] 

Timo Walther commented on FLINK-14364:
--

Yes, this seems like a bug. It should be null. Jackson should not throw an 
exception here.

> Allow comments fail when not ignore parse errors in 
> CsvRowDeserializationSchema
> ---
>
> Key: FLINK-14364
> URL: https://issues.apache.org/jira/browse/FLINK-14364
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Reporter: Jingsong Lee
>Priority: Major
> Fix For: 1.10.0
>
>
> When using CsvRowDeserializationSchema with setIgnoreParseErrors(false) and 
> setAllowComments(true), if there are comments in the message, it will throw 
> a MismatchedInputException.
> Is this a bug? Should we catch the MismatchedInputException and return null?
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on issue #9805: [FLINK-11635][docs-zh] translate dev/stream/state/checkpointing into Chinese

2019-10-11 Thread GitBox
flinkbot edited a comment on issue #9805: [FLINK-11635][docs-zh] translate 
dev/stream/state/checkpointing into Chinese
URL: https://github.com/apache/flink/pull/9805#issuecomment-536295524
 
 
   
   ## CI report:
   
   * 9ad039de6bbb953c7e2dd74b3118d80ae552f31e : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/129590285)
   * 5542dca83dbfb476114efb5aa2f2aa8d0626183a : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/129626942)
   * d4c5b15213eb89ee29880c7c8259622c9c7f9bb9 : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/131454148)
   * a8394e9c2ba5aa8186e67895ad68a325f24a8a31 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/131455928)
   * b9268914cafdf59bd10cc47fb04616b926f55612 : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/131461987)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] 1u0 commented on a change in pull request #9870: [FLINK-14350][metrics] Introduce dedicated MetricScope

2019-10-11 Thread GitBox
1u0 commented on a change in pull request #9870: [FLINK-14350][metrics] 
Introduce dedicated MetricScope
URL: https://github.com/apache/flink/pull/9870#discussion_r333877706
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/FrontMetricGroup.java
 ##
 @@ -30,28 +34,54 @@
  */
 public class FrontMetricGroup<P extends AbstractMetricGroup<?>> extends ProxyMetricGroup<P> {
 
-   protected int reporterIndex;
+   private final ReporterIndexInjectingMetricScope scope;
 
public FrontMetricGroup(int reporterIndex, P reference) {
super(reference);
-   this.reporterIndex = reporterIndex;
+   this.scope = new 
ReporterIndexInjectingMetricScope(reporterIndex, 
this.parentMetricGroup.getScope());
}
 
@Override
public String getMetricIdentifier(String metricName) {
-   return parentMetricGroup.getMetricIdentifier(metricName, null, 
this.reporterIndex);
+   return scope.getMetricIdentifier(metricName);
}
 
@Override
public String getMetricIdentifier(String metricName, CharacterFilter 
filter) {
-   return parentMetricGroup.getMetricIdentifier(metricName, 
filter, this.reporterIndex);
+   return scope.getMetricIdentifier(metricName, filter);
}
 
public String getLogicalScope(CharacterFilter filter) {
return parentMetricGroup.getLogicalScope(filter);
}
 
public String getLogicalScope(CharacterFilter filter, char delimiter) {
-   return parentMetricGroup.getLogicalScope(filter, delimiter, 
this.reporterIndex);
+   return parentMetricGroup.getLogicalScope(filter, delimiter);
 
 Review comment:
   **NB:** If I understand correctly, without `reporterIndex`, you are making 
this call non-cached.
   This would impact the InfluxDB, JMX and Prometheus reporters. 
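
   For reference, the per-reporter caching this refers to, as a self-contained sketch (all names made up; it also glosses over the fact that different `CharacterFilter`s can produce different strings for the same slot):

```java
import org.apache.flink.metrics.CharacterFilter;

/** Hypothetical sketch: cache one assembled logical-scope string per reporter. */
final class CachedLogicalScope {

	private final String[] cache;            // one slot per configured reporter
	private final String[] scopeComponents;  // e.g. {"taskmanager", "job", "task"}

	CachedLogicalScope(int numReporters, String[] scopeComponents) {
		this.cache = new String[numReporters];
		this.scopeComponents = scopeComponents;
	}

	String getLogicalScope(CharacterFilter filter, char delimiter, int reporterIndex) {
		if (cache[reporterIndex] == null) {
			// assembled once per reporter, then reused on every metric registration
			cache[reporterIndex] = assemble(filter, delimiter);
		}
		return cache[reporterIndex];
	}

	private String assemble(CharacterFilter filter, char delimiter) {
		StringBuilder sb = new StringBuilder();
		for (int i = 0; i < scopeComponents.length; i++) {
			if (i > 0) {
				sb.append(delimiter);
			}
			sb.append(filter.filterCharacters(scopeComponents[i]));
		}
		return sb.toString();
	}
}
```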


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] 1u0 commented on a change in pull request #9870: [FLINK-14350][metrics] Introduce dedicated MetricScope

2019-10-11 Thread GitBox
1u0 commented on a change in pull request #9870: [FLINK-14350][metrics] 
Introduce dedicated MetricScope
URL: https://github.com/apache/flink/pull/9870#discussion_r333874262
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/InternalMetricScope.java
 ##
 @@ -0,0 +1,89 @@
+package org.apache.flink.runtime.metrics.scope;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.metrics.CharacterFilter;
+import org.apache.flink.metrics.MetricScope;
+import org.apache.flink.runtime.metrics.DelimiterProvider;
+
+import java.util.Map;
+import java.util.function.Supplier;
+
+/**
+ * Default scope implementation. Contains additional methods assembling 
identifiers based on reporter-specific delimiters.
+ */
+@Internal
+public class InternalMetricScope implements MetricScope {
+
+   private final DelimiterProvider delimiterProvider;
+   private final Supplier<Map<String, String>> variablesProvider;
+
+   /**
+* The map containing all variables and their associated values, lazily 
computed.
+*/
+   protected volatile Map<String, String> variables;
+
+   /**
+* The metrics scope represented by this group.
+* For example ["host-7", "taskmanager-2", "window_word_count", 
"my-mapper" ].
+*/
+   private final String[] scopeComponents;
+
+   /**
+* Array containing the metrics scope represented by this group for 
each reporter, as a concatenated string, lazily computed.
+* For example: "host-7.taskmanager-2.window_word_count.my-mapper"
+*/
+   private final String[] scopeStrings;
+
+   public InternalMetricScope(DelimiterProvider delimiterProvider, String[] scopeComponents, Supplier<Map<String, String>> variablesProvider) {
+   this.delimiterProvider = delimiterProvider;
+   this.variablesProvider = variablesProvider;
+   this.scopeComponents = scopeComponents;
+   this.scopeStrings = new 
String[delimiterProvider.getNumberReporters()];
+   }
+
+   @Override
+   public Map<String, String> getAllVariables() {
+   if (variables == null) { // avoid synchronization for common 
case
+   synchronized (this) {
+   if (variables == null) {
+   variables = variablesProvider.get();
+   }
+   }
+   }
+   return variables;
+   }
+
+   public String[] geScopeComponents() {
+   return scopeStrings;
+   }
 
 Review comment:
   Also, you are returning `scopeStrings` here instead of `scopeComponents` (I 
hope this would be revealed by tests, once CI reaches them).
   
   Side note: I think a better name for `scopeStrings` would be something like 
`cachedIdentifierScope`.
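
   A possible fix in the spirit of the comment (also fixing the `geScopeComponents` typo):

```java
public String[] getScopeComponents() {
	return scopeComponents; // not scopeStrings, which caches the assembled identifiers
}
```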


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-8225) Use JsonRowDeserializationSchema without Kafka connector dependency

2019-10-11 Thread Sendoh (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-8225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16949268#comment-16949268
 ] 

Sendoh commented on FLINK-8225:
---

cool. will close it

> Use JsonRowDeserializationSchema without Kafka connector dependency 
> 
>
> Key: FLINK-8225
> URL: https://issues.apache.org/jira/browse/FLINK-8225
> Project: Flink
>  Issue Type: Wish
>  Components: Table SQL / Ecosystem
>Reporter: Sendoh
>Priority: Minor
>
> Now, when using JsonRowDeserializationSchema, the user needs to add the Kafka 
> connector dependency. Nevertheless, JsonRowDeserializationSchema can be used 
> without the Kafka connector.
> AC:
> move JsonRowDeserializationSchema to a dedicated module
> Ref:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/the-location-of-JsonRowDeserializationSchema-java-td17063.html



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-8225) Use JsonRowDeserializationSchema without Kafka connector dependency

2019-10-11 Thread Sendoh (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-8225?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sendoh closed FLINK-8225.
-
Resolution: Implemented

> Use JsonRowDeserializationSchema without Kafka connector dependency 
> 
>
> Key: FLINK-8225
> URL: https://issues.apache.org/jira/browse/FLINK-8225
> Project: Flink
>  Issue Type: Wish
>  Components: Table SQL / Ecosystem
>Reporter: Sendoh
>Priority: Minor
>
> Now, when using JsonRowDeserializationSchema, the user needs to add the Kafka 
> connector dependency. Nevertheless, JsonRowDeserializationSchema can be used 
> without the Kafka connector.
> AC:
> move JsonRowDeserializationSchema to a dedicated module
> Ref:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/the-location-of-JsonRowDeserializationSchema-java-td17063.html



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] WeiZhong94 commented on issue #9865: [FLINK-14212][python] Support no-argument Python UDFs.

2019-10-11 Thread GitBox
WeiZhong94 commented on issue #9865: [FLINK-14212][python] Support no-argument 
Python UDFs.
URL: https://github.com/apache/flink/pull/9865#issuecomment-540973875
 
 
   @hequn8128 Thanks for your review! I have addressed your comments in the 
latest commit.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] WeiZhong94 commented on a change in pull request #9865: [FLINK-14212][python] Support no-argument Python UDFs.

2019-10-11 Thread GitBox
WeiZhong94 commented on a change in pull request #9865: [FLINK-14212][python] 
Support no-argument Python UDFs.
URL: https://github.com/apache/flink/pull/9865#discussion_r333886126
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/ExpressionReductionRulesTest.scala
 ##
 @@ -46,4 +47,21 @@ class ExpressionReductionRulesTest extends TableTestBase {
 util.verifyPlan("SELECT myUdf(1) FROM MyTable")
   }
 
+  @Test
+  def testExpressionReductionWithPythonUDF(): Unit = {
+util.addFunction("PyUdf", MockedPythonUDFWithoutArguments)
+util.addFunction("MyUdf", Func1)
+util.verifyPlan("SELECT PyUdf(), MyUdf(1) FROM MyTable")
+  }
+}
+
+object MockedPythonUDFWithoutArguments extends ScalarFunction {
 
 Review comment:
   IMO it's not necessary to check non-deterministic Python UDFs in this rule 
test. My reasons are:
   1. If the UDF is non-deterministic, it won't match this rule whether it is a 
Python UDF or not, so such a test case wouldn't cover more code.
   2. We already have an ITCase about non-deterministic Python UDFs in 
`test_udf.py` to make sure they work properly. (A sketch of such a UDF follows 
below.)
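
   For context, a sketch of the kind of function point 1 is about (a hypothetical Java UDF, not from this PR):

```java
import org.apache.flink.table.functions.ScalarFunction;

// A non-deterministic UDF: isDeterministic() = false tells the planner not to
// pre-evaluate calls during expression reduction, Python UDF or not.
public class RandomSuffix extends ScalarFunction {

	public String eval(String prefix) {
		return prefix + Math.random();
	}

	@Override
	public boolean isDeterministic() {
		return false;
	}
}
```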


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-14367) Fully support for converting RexNode to Expression

2019-10-11 Thread Jark Wu (Jira)
Jark Wu created FLINK-14367:
---

 Summary: Fully support for converting RexNode to Expression
 Key: FLINK-14367
 URL: https://issues.apache.org/jira/browse/FLINK-14367
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Reporter: Jark Wu


Currently, the {{RexNodeToExpressionConverter}} in both planners is incomplete. 
Many RexNodes cannot be converted to Expressions, for example:

1) RexFieldAccess -> GET call
2) Literals of interval types and complex types
3) TRIM(BOTH/LEADING/TRAILING, ..)
...

We should have comprehensive tests to cover all cases. A good idea is to verify 
it together with {{ExpressionConverter}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-14368) KafkaProducerAtLeastOnceITCase>KafkaProducerTestBase.testCustomPartitioning failed on Travis

2019-10-11 Thread Till Rohrmann (Jira)
Till Rohrmann created FLINK-14368:
-

 Summary: 
KafkaProducerAtLeastOnceITCase>KafkaProducerTestBase.testCustomPartitioning 
failed on Travis
 Key: FLINK-14368
 URL: https://issues.apache.org/jira/browse/FLINK-14368
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Affects Versions: 1.10.0
Reporter: Till Rohrmann


The 
{{KafkaProducerAtLeastOnceITCase>KafkaProducerTestBase.testCustomPartitioning}} 
fails on Travis with

{code}
Test 
testCustomPartitioning(org.apache.flink.streaming.connectors.kafka.KafkaProducerAtLeastOnceITCase)
 failed with:
java.lang.AssertionError: Create test topic : defaultTopic failed, 
org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node 
assignment.
at org.junit.Assert.fail(Assert.java:88)
at 
org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl.createTestTopic(KafkaTestEnvironmentImpl.java:180)
at 
org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment.createTestTopic(KafkaTestEnvironment.java:115)
at 
org.apache.flink.streaming.connectors.kafka.KafkaTestBase.createTestTopic(KafkaTestBase.java:196)
at 
org.apache.flink.streaming.connectors.kafka.KafkaProducerTestBase.testCustomPartitioning(KafkaProducerTestBase.java:112)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:273)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:238)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:159)
at 
org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)
at 
org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)
at 
org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126)
at 
org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418)
{code}

https://api.travis-ci.com/v3/job/244297223/log.txt



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-14369) KafkaProducerAtLeastOnceITCase>KafkaProducerTestBase.testOneToOneAtLeastOnceCustomOperator fails on Travis

2019-10-11 Thread Till Rohrmann (Jira)
Till Rohrmann created FLINK-14369:
-

 Summary: 
KafkaProducerAtLeastOnceITCase>KafkaProducerTestBase.testOneToOneAtLeastOnceCustomOperator
 fails on Travis
 Key: FLINK-14369
 URL: https://issues.apache.org/jira/browse/FLINK-14369
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Affects Versions: 1.10.0
Reporter: Till Rohrmann


The 
{{KafkaProducerAtLeastOnceITCase>KafkaProducerTestBase.testOneToOneAtLeastOnceCustomOperator}}
 fails on Travis with 

{code}
Test 
testOneToOneAtLeastOnceCustomOperator(org.apache.flink.streaming.connectors.kafka.KafkaProducerAtLeastOnceITCase)
 failed with:
java.lang.AssertionError: Create test topic : oneToOneTopicCustomOperator 
failed, org.apache.kafka.common.errors.TimeoutException: Timed out waiting for 
a node assignment.
at org.junit.Assert.fail(Assert.java:88)
at 
org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl.createTestTopic(KafkaTestEnvironmentImpl.java:180)
at 
org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment.createTestTopic(KafkaTestEnvironment.java:115)
at 
org.apache.flink.streaming.connectors.kafka.KafkaTestBase.createTestTopic(KafkaTestBase.java:196)
at 
org.apache.flink.streaming.connectors.kafka.KafkaProducerTestBase.testOneToOneAtLeastOnce(KafkaProducerTestBase.java:231)
at 
org.apache.flink.streaming.connectors.kafka.KafkaProducerTestBase.testOneToOneAtLeastOnceCustomOperator(KafkaProducerTestBase.java:214)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:273)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:238)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:159)
at 
org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)
at 
org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)
at 
org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126)
at 
org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418)
{code}

https://api.travis-ci.com/v3/job/244297223/log.txt



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-14370) KafkaProducerAtLeastOnceITCase>KafkaProducerTestBase.testOneToOneAtLeastOnceRegularSink fails on Travis

2019-10-11 Thread Till Rohrmann (Jira)
Till Rohrmann created FLINK-14370:
-

 Summary: 
KafkaProducerAtLeastOnceITCase>KafkaProducerTestBase.testOneToOneAtLeastOnceRegularSink
 fails on Travis
 Key: FLINK-14370
 URL: https://issues.apache.org/jira/browse/FLINK-14370
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Affects Versions: 1.10.0
Reporter: Till Rohrmann


The 
{{KafkaProducerAtLeastOnceITCase>KafkaProducerTestBase.testOneToOneAtLeastOnceRegularSink}}
 fails on Travis with

{code}
Test 
testOneToOneAtLeastOnceRegularSink(org.apache.flink.streaming.connectors.kafka.KafkaProducerAtLeastOnceITCase)
 failed with:
java.lang.AssertionError: Job should fail!
at org.junit.Assert.fail(Assert.java:88)
at 
org.apache.flink.streaming.connectors.kafka.KafkaProducerTestBase.testOneToOneAtLeastOnce(KafkaProducerTestBase.java:280)
at 
org.apache.flink.streaming.connectors.kafka.KafkaProducerTestBase.testOneToOneAtLeastOnceRegularSink(KafkaProducerTestBase.java:206)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:273)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:238)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:159)
at 
org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)
at 
org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)
at 
org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126)
at 
org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418)
{code}

https://api.travis-ci.com/v3/job/244297223/log.txt



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on issue #9805: [FLINK-11635][docs-zh] translate dev/stream/state/checkpointing into Chinese

2019-10-11 Thread GitBox
flinkbot edited a comment on issue #9805: [FLINK-11635][docs-zh] translate 
dev/stream/state/checkpointing into Chinese
URL: https://github.com/apache/flink/pull/9805#issuecomment-536295524
 
 
   
   ## CI report:
   
   * 9ad039de6bbb953c7e2dd74b3118d80ae552f31e : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/129590285)
   * 5542dca83dbfb476114efb5aa2f2aa8d0626183a : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/129626942)
   * d4c5b15213eb89ee29880c7c8259622c9c7f9bb9 : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/131454148)
   * a8394e9c2ba5aa8186e67895ad68a325f24a8a31 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/131455928)
   * b9268914cafdf59bd10cc47fb04616b926f55612 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/131461987)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #9865: [FLINK-14212][python] Support no-argument Python UDFs.

2019-10-11 Thread GitBox
flinkbot edited a comment on issue #9865: [FLINK-14212][python] Support 
no-argument Python UDFs.
URL: https://github.com/apache/flink/pull/9865#issuecomment-539919841
 
 
   
   ## CI report:
   
   * bf1d566ea2f91d61c2f436bb92b5337088b7 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/131102433)
   * 3110f74ae60ed10bdb2bdbed7dd1facfde9fdeea : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/131266172)
   * f48b910ceee34ff7eb9f6f75d7782b49005c587f : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/131275232)
   * c97adb28424b4518535c9922b011d7adcf5e842f : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/131279546)
   * 4edf1202d5b9c34f329ca29ed96430b69a5807cd : UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-14365) Annotate MiniCluster tests in core modules with AlsoRunWithSchedulerNG

2019-10-11 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-14365:
---

 Summary: Annotate MiniCluster tests in core modules with 
AlsoRunWithSchedulerNG
 Key: FLINK-14365
 URL: https://issues.apache.org/jira/browse/FLINK-14365
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Affects Versions: 1.10.0
Reporter: Zhu Zhu
 Fix For: 1.10.0


This task is to annotate MiniCluster tests with AlsoRunWithSchedulerNG in the 
Flink core modules, so that we can catch breaking changes in time while further 
improving the new generation scheduler.

Core modules are the basic Flink modules as defined in {{MODULES_CORE}} in 
flink/travis/stage.sh.

MODULES_CORE="\
flink-annotations,\
flink-test-utils-parent/flink-test-utils,\
flink-state-backends/flink-statebackend-rocksdb,\
flink-clients,\
flink-core,\
flink-java,\
flink-optimizer,\
flink-runtime,\
flink-runtime-web,\
flink-scala,\
flink-streaming-java,\
flink-streaming-scala,\
flink-metrics,\
flink-metrics/flink-metrics-core"



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-14246) Annotate all MiniCluster tests in flink-runtime with AlsoRunWithSchedulerNG

2019-10-11 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14246?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-14246:

Component/s: Tests

> Annotate all MiniCluster tests in flink-runtime with AlsoRunWithSchedulerNG
> ---
>
> Key: FLINK-14246
> URL: https://issues.apache.org/jira/browse/FLINK-14246
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination, Tests
>Affects Versions: 1.10.0
>Reporter: Zhu Zhu
>Assignee: Zhu Zhu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> This task is to annotate all MiniCluster tests with AlsoRunWithSchedulerNG in 
> flink-runtime, so that we can catch breaking changes in time while further 
> improving the new generation scheduler.
> We should also ensure the annotated tests pass, either by fixing failed 
> tests, or by not annotating a failed test and opening a ticket to track it.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-14365) Annotate MiniCluster tests in core modules with AlsoRunWithSchedulerNG

2019-10-11 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14365?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-14365:

Component/s: Tests

> Annotate MiniCluster tests in core modules with AlsoRunWithSchedulerNG
> --
>
> Key: FLINK-14365
> URL: https://issues.apache.org/jira/browse/FLINK-14365
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination, Tests
>Affects Versions: 1.10.0
>Reporter: Zhu Zhu
>Priority: Major
> Fix For: 1.10.0
>
>
> This task is to annotate MiniCluster tests with AlsoRunWithSchedulerNG in the 
> Flink core modules, so that we can catch breaking changes in time while 
> further improving the new generation scheduler.
> Core modules are the basic Flink modules as defined in {{MODULES_CORE}} in 
> flink/travis/stage.sh.
> MODULES_CORE="\
> flink-annotations,\
> flink-test-utils-parent/flink-test-utils,\
> flink-state-backends/flink-statebackend-rocksdb,\
> flink-clients,\
> flink-core,\
> flink-java,\
> flink-optimizer,\
> flink-runtime,\
> flink-runtime-web,\
> flink-scala,\
> flink-streaming-java,\
> flink-streaming-scala,\
> flink-metrics,\
> flink-metrics/flink-metrics-core"



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] Henvealf commented on issue #9805: [FLINK-11635][docs-zh] translate dev/stream/state/checkpointing into Chinese

2019-10-11 Thread GitBox
Henvealf commented on issue #9805: [FLINK-11635][docs-zh] translate 
dev/stream/state/checkpointing into Chinese
URL: https://github.com/apache/flink/pull/9805#issuecomment-540951519
 
 
   > Thanks for addressing the comments, @Henvealf.
   > I find two occurrences of 'savepoint' which is suggested not to be 
translated according to the translation specifications. Besides that, I have no 
other concerns.
   
   Thanks! I have fixed this.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-14364) Allow comments fail when not ignore parse errors in CsvRowDeserializationSchema

2019-10-11 Thread Timo Walther (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16949221#comment-16949221
 ] 

Timo Walther commented on FLINK-14364:
--

If {{setIgnoreParseErrors}} is set to {{false}}, it is fine to throw an 
exception. But if comments are enabled, there should be no exception. Can you 
provide an example that causes this error?
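
For reference, something along these lines should reproduce it (a sketch; 
{{typeInfo}} stands for the table's row type):

{code}
CsvRowDeserializationSchema schema = new CsvRowDeserializationSchema.Builder(typeInfo)
	.setAllowComments(true)
	.setIgnoreParseErrors(false)
	.build();

// a message consisting only of a comment line:
schema.deserialize("# a comment\n".getBytes(StandardCharsets.UTF_8));
// expected: null (no record), actual: MismatchedInputException is thrown
{code}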

> Allow comments fail when not ignore parse errors in 
> CsvRowDeserializationSchema
> ---
>
> Key: FLINK-14364
> URL: https://issues.apache.org/jira/browse/FLINK-14364
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Reporter: Jingsong Lee
>Priority: Major
> Fix For: 1.10.0
>
>
> Using CsvRowDeserializationSchema with setIgnoreParseErrors(false) and 
> setAllowComments(true):
> if there are comments in the message, it will throw a MismatchedInputException.
> Is this a bug? Should we catch the MismatchedInputException and return null?
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on issue #9805: [FLINK-11635][docs-zh] translate dev/stream/state/checkpointing into Chinese

2019-10-11 Thread GitBox
flinkbot edited a comment on issue #9805: [FLINK-11635][docs-zh] translate 
dev/stream/state/checkpointing into Chinese
URL: https://github.com/apache/flink/pull/9805#issuecomment-536295524
 
 
   
   ## CI report:
   
   * 9ad039de6bbb953c7e2dd74b3118d80ae552f31e : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/129590285)
   * 5542dca83dbfb476114efb5aa2f2aa8d0626183a : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/129626942)
   * d4c5b15213eb89ee29880c7c8259622c9c7f9bb9 : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/131454148)
   * a8394e9c2ba5aa8186e67895ad68a325f24a8a31 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/131455928)
   * b9268914cafdf59bd10cc47fb04616b926f55612 : UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session

2019-10-11 Thread GitBox
flinkbot edited a comment on issue #9832: [FLINK-11843] Bind lifespan of 
Dispatcher to leader session
URL: https://github.com/apache/flink/pull/9832#issuecomment-537049332
 
 
   
   ## CI report:
   
   * abaae048fef753455970fac9d6ab421b660b0536 : UNKNOWN
   * b96c63552ccd322adae7a41a410615e95b538ece : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/129867595)
   * 2c95a3939dbf0259d694af6c69451f0ede3c3891 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/130066310)
   * 6e030add922011ea54178690f171911d0139f14b : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/130858235)
   * 271703eda6f6c55b1641a54206109ef659f62854 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/131454171)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #9860: [FLINK-14331][runtime] Reset vertices right after they transition to terminated states

2019-10-11 Thread GitBox
flinkbot edited a comment on issue #9860: [FLINK-14331][runtime] Reset vertices 
right after they transition to terminated states
URL: https://github.com/apache/flink/pull/9860#issuecomment-539900648
 
 
   
   ## CI report:
   
   * 80c100513c089a9dd0930aa547383ae970c4e7f8 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/131094955)
   * 2de47e991caa74adea1792b6b9153dee94e46b95 : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/131457976)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #9877: [FLINK-14360][yarn] flink on yarn support obtain delegation token for multi hdfs.

2019-10-11 Thread GitBox
flinkbot edited a comment on issue #9877: [FLINK-14360][yarn] flink on yarn 
support obtain delegation token for multi hdfs.
URL: https://github.com/apache/flink/pull/9877#issuecomment-540485056
 
 
   
   ## CI report:
   
   * fdeaa9ae3a6e7f7f9ab22ca60d17bfae4cd24670 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/131283573)
   * 084b311d28ff32d5a82fa56d4077b2a30a5c6f64 : UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] KarmaGYZ commented on a change in pull request #9873: [FLINK-14336][DataStream] Log Exception for failed checkpoint on TaskExecutor side

2019-10-11 Thread GitBox
KarmaGYZ commented on a change in pull request #9873: [FLINK-14336][DataStream] 
Log Exception for failed checkpoint on TaskExecutor side
URL: https://github.com/apache/flink/pull/9873#discussion_r333869118
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 ##
 @@ -1115,6 +1115,12 @@ public void run() {

checkpointMetaData.getCheckpointId());
}
} catch (Exception e) {
+   if (LOG.isDebugEnabled()) {
+   LOG.debug("{} - asynchronous part of 
checkpoint {} could not be completed.",
 
 Review comment:
   Makes sense. +1 for merge.
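
   For reference, the pattern from the diff as a self-contained sketch (class and method names are made up; slf4j as in `StreamTask`):

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class CheckpointLoggingExample {

	private static final Logger LOG = LoggerFactory.getLogger(CheckpointLoggingExample.class);

	void logAsyncPartFailure(String taskName, long checkpointId, Exception e) {
		// the isDebugEnabled() guard skips argument boxing entirely when debug
		// logging is off; passing the exception as the last argument makes
		// slf4j log the full stack trace alongside the formatted message
		if (LOG.isDebugEnabled()) {
			LOG.debug("{} - asynchronous part of checkpoint {} could not be completed.",
				taskName, checkpointId, e);
		}
	}
}
```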


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] Myasuka commented on issue #9386: [FLINK-13601][tests] Harden RegionFailoverITCase by recording info when checkpoint just completed

2019-10-11 Thread GitBox
Myasuka commented on issue #9386: [FLINK-13601][tests] Harden 
RegionFailoverITCase by recording info when checkpoint just completed
URL: https://github.com/apache/flink/pull/9386#issuecomment-540963010
 
 
   @zentol Would you please take a look again at this PR?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #9824: [FLINK-14302] FlinkKafkaInternalProducer should not send `ADD_PARTITIONS_TO_TXN` request if `newPartitionsInTransaction` is empty when enable

2019-10-11 Thread GitBox
flinkbot edited a comment on issue #9824: [FLINK-14302] 
FlinkKafkaInternalProducer should not send `ADD_PARTITIONS_TO_TXN` request if 
`newPartitionsInTransaction` is empty when enable EoS
URL: https://github.com/apache/flink/pull/9824#issuecomment-536928428
 
 
   
   ## CI report:
   
   * 87ae788d6490da7af5284c404648647c9919a6df : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/129820284)
   * c4b5a18b039a90aa6b313c43f423622536bc8cd4 : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/131462013)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #9860: [FLINK-14331][runtime] Reset vertices right after they transition to terminated states

2019-10-11 Thread GitBox
flinkbot edited a comment on issue #9860: [FLINK-14331][runtime] Reset vertices 
right after they transition to terminated states
URL: https://github.com/apache/flink/pull/9860#issuecomment-539900648
 
 
   
   ## CI report:
   
   * 80c100513c089a9dd0930aa547383ae970c4e7f8 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/131094955)
   * 2de47e991caa74adea1792b6b9153dee94e46b95 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/131457976)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #9877: [FLINK-14360][yarn] flink on yarn support obtain delegation token for multi hdfs.

2019-10-11 Thread GitBox
flinkbot edited a comment on issue #9877: [FLINK-14360][yarn] flink on yarn 
support obtain delegation token for multi hdfs.
URL: https://github.com/apache/flink/pull/9877#issuecomment-540485056
 
 
   
   ## CI report:
   
   * fdeaa9ae3a6e7f7f9ab22ca60d17bfae4cd24670 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/131283573)
   * 084b311d28ff32d5a82fa56d4077b2a30a5c6f64 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/131462022)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-14359) Create a module called flink-sql-connector-hbase to shade HBase

2019-10-11 Thread Zheng Hu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16949271#comment-16949271
 ] 

Zheng Hu commented on FLINK-14359:
--

When running the sql-client with the packaged shaded HBase lib, I found that the 
HBase client still tries to load some non-shaded HBase classes, as shown in the 
following stack trace: 
{code}
Flink SQL> select * from MyHBaseSource;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.runtime.rest.util.RestClientException: [Internal server 
error., (JobManagerRunnerImpl.java:152)
at 
org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:84)
at 
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:380)
at 
org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
... 7 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot 
initialize task 'Source: HBaseTableSource(rowkey, a, b) -> Map -> to: Tuple2 -> 
Sink: SQL Client Stream Collect Sink': Configuring the input format (null) 
failed: Cannot create connection to HBase.
at 
org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:218)
at 
org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:106)
at 
org.apache.flink.runtime.scheduler.SchedulerBase.createExecutionGraph(SchedulerBase.java:222)
at 
org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:199)
at 
org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:188)
at 
org.apache.flink.runtime.scheduler.LegacyScheduler.<init>(LegacyScheduler.java:65)
at 
org.apache.flink.runtime.scheduler.LegacySchedulerFactory.createInstance(LegacySchedulerFactory.java:70)
at 
org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:278)
at 
org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:266)
at 
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
at 
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
at 
org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:146)
... 10 more
Caused by: java.lang.Exception: Configuring the input format (null) failed: 
Cannot create connection to HBase.
at 
org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.initializeOnMaster(InputOutputFormatVertex.java:80)
at 
org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:214)
... 21 more
Caused by: java.lang.RuntimeException: Cannot create connection to HBase.
at 
org.apache.flink.addons.hbase.HBaseRowInputFormat.connectToTable(HBaseRowInputFormat.java:103)
at 
org.apache.flink.addons.hbase.HBaseRowInputFormat.configure(HBaseRowInputFormat.java:68)
at 
org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.initializeOnMaster(InputOutputFormatVertex.java:77)
... 22 more
Caused by: java.io.IOException: java.lang.reflect.InvocationTargetException
at 
org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:240)
at 
org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:218)
at 
org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:119)
at 
org.apache.flink.addons.hbase.HBaseRowInputFormat.connectToTable(HBaseRowInputFormat.java:96)
... 24 more
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at 
org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:238)
... 27 more
Caused by: java.lang.RuntimeException: java.lang.RuntimeException: 
java.lang.ClassNotFoundException: Class 
org.apache.hadoop.hbase.client.ClusterStatusListener$MulticastListener not found
at 
org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2299)
...
{code}

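What this suggests (a hypothetical minimal sketch, assuming Hadoop's 
Configuration API and HBase's "hbase.status.listener.class" default): the 
maven-shade-plugin relocates compiled class references, but class names stored 
as plain strings in configuration files are not rewritten, so the lookup still 
resolves the original, non-shaded name.

{code}
import org.apache.hadoop.conf.Configuration;

public class RelocationPitfall {

	public static void main(String[] args) throws ClassNotFoundException {
		Configuration conf = new Configuration();
		// hbase-default.xml ships the listener class name as a plain string;
		// string values are not rewritten by shade-plugin relocation:
		conf.set("hbase.status.listener.class",
			"org.apache.hadoop.hbase.client.ClusterStatusListener$MulticastListener");
		// Inside the shaded jar only the relocated name exists
		// (org.apache.flink.hbase.shaded.org.apache.hadoop.hbase...), so
		// resolving the original name fails with ClassNotFoundException:
		Class<?> listener = conf.getClassByName(conf.get("hbase.status.listener.class"));
		System.out.println(listener);
	}
}
{code}
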
[jira] [Commented] (FLINK-14224) Kafka010ProducerITCase>KafkaProducerTestBase.testOneToOneAtLeastOnceCustomOperator fails on Travis

2019-10-11 Thread Till Rohrmann (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16949276#comment-16949276
 ] 

Till Rohrmann commented on FLINK-14224:
---

[~1u0] have you tried looping the test on Travis?

> Kafka010ProducerITCase>KafkaProducerTestBase.testOneToOneAtLeastOnceCustomOperator
>  fails on Travis
> --
>
> Key: FLINK-14224
> URL: https://issues.apache.org/jira/browse/FLINK-14224
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.10.0
>Reporter: Till Rohrmann
>Assignee: Alex
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.10.0
>
>
> The 
> {{Kafka010ProducerITCase>KafkaProducerTestBase.testOneToOneAtLeastOnceCustomOperator}}
>  fails on Travis with
> {code}
> Test 
> testOneToOneAtLeastOnceCustomOperator(org.apache.flink.streaming.connectors.kafka.Kafka010ProducerITCase)
>  failed with:
> java.lang.AssertionError: Job should fail!
>   at org.junit.Assert.fail(Assert.java:88)
>   at 
> org.apache.flink.streaming.connectors.kafka.KafkaProducerTestBase.testOneToOneAtLeastOnce(KafkaProducerTestBase.java:280)
>   at 
> org.apache.flink.streaming.connectors.kafka.KafkaProducerTestBase.testOneToOneAtLeastOnceCustomOperator(KafkaProducerTestBase.java:214)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365)
>   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:273)
>   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:238)
>   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:159)
>   at 
> org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)
>   at 
> org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)
>   at 
> org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126)
>   at 
> org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418)
> {code}
> https://api.travis-ci.com/v3/job/238920411/log.txt



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on issue #9386: [FLINK-13601][tests] Harden RegionFailoverITCase by recording info when checkpoint just completed

2019-10-11 Thread GitBox
flinkbot edited a comment on issue #9386: [FLINK-13601][tests] Harden 
RegionFailoverITCase by recording info when checkpoint just completed
URL: https://github.com/apache/flink/pull/9386#issuecomment-519211363
 
 
   
   ## CI report:
   
   * 9a4ab3785d8cf1c87d11df6390ee435664cfd964 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/122323718)
   * 2b6eeb771d5baa303e2663d17bc4b7d77d8b4280 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/124107744)
   * d1549ae43bdf7863057bb75860042c84048db67b : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/124156091)
   * 39c3034ffdbe7498d5a78534c2e0969852e81fdf : UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-14309) Kafka09ProducerITCase>KafkaProducerTestBase.testOneToOneAtLeastOnceRegularSink fails on Travis

2019-10-11 Thread Till Rohrmann (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14309?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16949307#comment-16949307
 ] 

Till Rohrmann commented on FLINK-14309:
---

Does it make sense to backport this fix to earlier release branches 
[~becket_qin]?

> Kafka09ProducerITCase>KafkaProducerTestBase.testOneToOneAtLeastOnceRegularSink
>  fails on Travis
> --
>
> Key: FLINK-14309
> URL: https://issues.apache.org/jira/browse/FLINK-14309
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.10.0
>Reporter: Till Rohrmann
>Assignee: Jiayi Liao
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.10.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> The 
> {{Kafka09ProducerITCase>KafkaProducerTestBase.testOneToOneAtLeastOnceRegularSink}}
>  fails on Travis with 
> {code}
> Test 
> testOneToOneAtLeastOnceRegularSink(org.apache.flink.streaming.connectors.kafka.Kafka09ProducerITCase)
>  failed with:
> java.lang.AssertionError: Job should fail!
>   at org.junit.Assert.fail(Assert.java:88)
>   at 
> org.apache.flink.streaming.connectors.kafka.KafkaProducerTestBase.testOneToOneAtLeastOnce(KafkaProducerTestBase.java:280)
>   at 
> org.apache.flink.streaming.connectors.kafka.KafkaProducerTestBase.testOneToOneAtLeastOnceRegularSink(KafkaProducerTestBase.java:206)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365)
>   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:273)
>   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:238)
>   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:159)
>   at 
> org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)
>   at 
> org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)
>   at 
> org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126)
>   at 
> org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418)
> {code}
> https://api.travis-ci.com/v3/job/240747188/log.txt



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] pnowojski commented on a change in pull request #9646: [FLINK-14004][runtime] Define SourceReaderOperator to verify the integration with StreamOneInputProcessor

2019-10-11 Thread GitBox
pnowojski commented on a change in pull request #9646:  [FLINK-14004][runtime] 
Define SourceReaderOperator to verify the integration with 
StreamOneInputProcessor
URL: https://github.com/apache/flink/pull/9646#discussion_r333898900
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceReaderOperator.java
 ##
 @@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
+
+/**
+ * Base source operator only used for integrating the source reader proposed by FLIP-27. It implements
+ * the {@link PushingAsyncDataInput} interface to be naturally compatible with one-input processing in
+ * the runtime stack.
+ *
+ * @param  The output type of the operator
+ */
+@PublicEvolving
+public abstract class SourceReaderOperator extends 
AbstractStreamOperator implements PushingAsyncDataInput {
 
 Review comment:
   nit: maybe add a note that we expect this to be changed to a concrete 
class once the `SourceReader` interface is introduced?
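
   A possible wording for such a note, as a sketch (the phrasing is a suggestion, not taken from the PR):
   
   ```java
   /**
    * <p>Note: this abstract class is a temporary integration point. It is expected to
    * be replaced by a concrete operator class once the FLIP-27 {@code SourceReader}
    * interface is introduced.
    */
   public abstract class SourceReaderOperator<OUT> {
       // body elided in this sketch; see the diff above for the real declaration
   }
   ```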


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] pnowojski commented on a change in pull request #9646: [FLINK-14004][runtime] Define SourceReaderOperator to verify the integration with StreamOneInputProcessor

2019-10-11 Thread GitBox
pnowojski commented on a change in pull request #9646:  [FLINK-14004][runtime] 
Define SourceReaderOperator to verify the integration with 
StreamOneInputProcessor
URL: https://github.com/apache/flink/pull/9646#discussion_r333898651
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceReaderOperator.java
 ##
 @@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
+
+/**
+ * Base source operator used only for integrating the source reader proposed by FLIP-27. It implements
+ * the {@link PushingAsyncDataInput} interface to be naturally compatible with one-input processing in the
+ * runtime stack.
+ *
+ * @param <OUT> The output type of the operator
+ */
+@PublicEvolving
 
 Review comment:
   I think I would mark it as `Internal` and let the API folks later decide 
whether to expose this, or just the `SourceReader`. (Imo there is no need to 
expose the `SourceReaderOperator`).
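
   For illustration, the suggested change is a one-annotation swap; a minimal sketch assuming the class shape from the diff above:
   
   ```java
   package org.apache.flink.streaming.api.operators;
   
   import org.apache.flink.annotation.Internal;
   import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
   
   // @Internal keeps the operator out of the public API surface until the API
   // owners decide whether to expose it alongside SourceReader.
   @Internal
   public abstract class SourceReaderOperator<OUT> extends AbstractStreamOperator<OUT>
   		implements PushingAsyncDataInput<OUT> {
   }
   ```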


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #9824: [FLINK-14302] FlinkKafkaInternalProducer should not send `ADD_PARTITIONS_TO_TXN` request if `newPartitionsInTransaction` is empty when enable

2019-10-11 Thread GitBox
flinkbot edited a comment on issue #9824: [FLINK-14302] 
FlinkKafkaInternalProducer should not send `ADD_PARTITIONS_TO_TXN` request if 
`newPartitionsInTransaction` is empty when enable EoS
URL: https://github.com/apache/flink/pull/9824#issuecomment-536928428
 
 
   
   ## CI report:
   
   * 87ae788d6490da7af5284c404648647c9919a6df : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/129820284)
   * c4b5a18b039a90aa6b313c43f423622536bc8cd4 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/131462013)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-14371) Enable ClassLoaderITCase to pass with scheduler NG

2019-10-11 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-14371:
---

 Summary: Enable ClassLoaderITCase to pass with scheduler NG
 Key: FLINK-14371
 URL: https://issues.apache.org/jira/browse/FLINK-14371
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination, Tests
Affects Versions: 1.10.0
Reporter: Zhu Zhu
 Fix For: 1.10.0


ClassLoaderITCase now fails with scheduler NG.
There are 3 reasons for the failure:
1. state restore is not supported in scheduler NG yet
2. the cause of the expected exception is a bit different
3. there are multiple tasks in multiple regions, which will result in more 
failovers than expected, as scheduler NG uses region failover

We need to support state restore in scheduler NG, then fix this case, 
and then annotate it with AlsoRunWithSchedulerNG.
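
For reference, once state restore works the annotation itself is a small change. A minimal sketch, assuming AlsoRunWithSchedulerNG is the JUnit category class that the scheduler NG build profile picks up:

{code}
import org.junit.experimental.categories.Category;

// Sketch only: the category makes the ITCase additionally run in the
// scheduler NG CI profile.
@Category(AlsoRunWithSchedulerNG.class)
public class ClassLoaderITCase {
	// existing test methods stay unchanged
}
{code}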



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on issue #9865: [FLINK-14212][python] Support no-argument Python UDFs.

2019-10-11 Thread GitBox
flinkbot edited a comment on issue #9865: [FLINK-14212][python] Support 
no-argument Python UDFs.
URL: https://github.com/apache/flink/pull/9865#issuecomment-539919841
 
 
   
   ## CI report:
   
   * bf1d566ea2f91d61c2f436bb92b5337088b7 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/131102433)
   * 3110f74ae60ed10bdb2bdbed7dd1facfde9fdeea : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/131266172)
   * f48b910ceee34ff7eb9f6f75d7782b49005c587f : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/131275232)
   * c97adb28424b4518535c9922b011d7adcf5e842f : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/131279546)
   * 4edf1202d5b9c34f329ca29ed96430b69a5807cd : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/131468804)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-14372) Enable KeyedStateCheckpointingITCase to pass with scheduler NG

2019-10-11 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-14372:
---

 Summary: Enable KeyedStateCheckpointingITCase to pass with 
scheduler NG
 Key: FLINK-14372
 URL: https://issues.apache.org/jira/browse/FLINK-14372
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination, Tests
Affects Versions: 1.10.0
Reporter: Zhu Zhu
 Fix For: 1.10.0


ClassLoaderITCase now fails with scheduler NG.
The failure cause is that state restore is not supported in scheduler NG yet.

We need to support the state restore in scheduler NG and annotate it with 
AlsoRunWithSchedulerNG.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-14372) Enable KeyedStateCheckpointingITCase to pass with scheduler NG

2019-10-11 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14372?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-14372:

Description: 
KeyedStateCheckpointingITCase currently fails with scheduler NG.
The failure cause is that state restore is not supported in scheduler NG yet.

We need to support the state restore in scheduler NG and annotate it with 
AlsoRunWithSchedulerNG.

  was:
ClassLoaderITCase now fails with scheduler NG.
The failure cause is that state restore is not supported in scheduler NG yet.

We need to support the state restore in scheduler NG and annotate it with 
AlsoRunWithSchedulerNG.


> Enable KeyedStateCheckpointingITCase to pass with scheduler NG
> --
>
> Key: FLINK-14372
> URL: https://issues.apache.org/jira/browse/FLINK-14372
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination, Tests
>Affects Versions: 1.10.0
>Reporter: Zhu Zhu
>Priority: Major
> Fix For: 1.10.0
>
>
> KeyedStateCheckpointingITCase currently fails with scheduler NG.
> The failure cause is that state restore is not supported in scheduler NG yet.
> We need to support the state restore in scheduler NG and annotate it with 
> AlsoRunWithSchedulerNG.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on issue #9386: [FLINK-13601][tests] Harden RegionFailoverITCase by recording info when checkpoint just completed

2019-10-11 Thread GitBox
flinkbot edited a comment on issue #9386: [FLINK-13601][tests] Harden 
RegionFailoverITCase by recording info when checkpoint just completed
URL: https://github.com/apache/flink/pull/9386#issuecomment-519211363
 
 
   
   ## CI report:
   
   * 9a4ab3785d8cf1c87d11df6390ee435664cfd964 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/122323718)
   * 2b6eeb771d5baa303e2663d17bc4b7d77d8b4280 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/124107744)
   * d1549ae43bdf7863057bb75860042c84048db67b : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/124156091)
   * 39c3034ffdbe7498d5a78534c2e0969852e81fdf : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/131468822)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-14373) Enable ZooKeeperHighAvailabilityITCase to pass with scheduler NG

2019-10-11 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-14373:
---

 Summary: Enable ZooKeeperHighAvailabilityITCase to pass with 
scheduler NG
 Key: FLINK-14373
 URL: https://issues.apache.org/jira/browse/FLINK-14373
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination, Tests
Affects Versions: 1.10.0
Reporter: Zhu Zhu
 Fix For: 1.10.0


ZooKeeperHighAvailabilityITCase currently fails with scheduler NG.
The failure cause is that it will invoke ExecutionGraph#failGlobal but that 
method is not ready for use in scheduler NG.

We need to support failGlobal in scheduler NG to make this case pass, and then 
annotate it with AlsoRunWithSchedulerNG.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-13567) Avro Confluent Schema Registry nightly end-to-end test failed on Travis

2019-10-11 Thread Till Rohrmann (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16949313#comment-16949313
 ] 

Till Rohrmann edited comment on FLINK-13567 at 10/11/19 9:30 AM:
-

I've disabled the test in {{master}} via 
2c2095bdad3d47f27973a585112ed820f457de6f until the problem has been fixed.


was (Author: till.rohrmann):
I've disabled the test via 2c2095bdad3d47f27973a585112ed820f457de6f until the 
problem has been fixed.

> Avro Confluent Schema Registry nightly end-to-end test failed on Travis
> ---
>
> Key: FLINK-13567
> URL: https://issues.apache.org/jira/browse/FLINK-13567
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka, Tests
>Affects Versions: 1.8.2, 1.9.0, 1.10.0
>Reporter: Till Rohrmann
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.10.0
>
>
> The {{Avro Confluent Schema Registry nightly end-to-end test}} failed on 
> Travis with
> {code}
> [FAIL] 'Avro Confluent Schema Registry nightly end-to-end test' failed after 
> 2 minutes and 11 seconds! Test exited with exit code 1
> No taskexecutor daemon (pid: 29044) is running anymore on 
> travis-job-b0823aec-c4ec-4d4b-8b59-e9f968de9501.
> No standalonesession daemon to stop on host 
> travis-job-b0823aec-c4ec-4d4b-8b59-e9f968de9501.
> rm: cannot remove 
> '/home/travis/build/apache/flink/flink-dist/target/flink-1.10-SNAPSHOT-bin/flink-1.10-SNAPSHOT/plugins':
>  No such file or directory
> {code}
> https://api.travis-ci.org/v3/job/567273939/log.txt



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-13567) Avro Confluent Schema Registry nightly end-to-end test failed on Travis

2019-10-11 Thread Till Rohrmann (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16949313#comment-16949313
 ] 

Till Rohrmann commented on FLINK-13567:
---

I've disabled the test via 2c2095bdad3d47f27973a585112ed820f457de6f until the 
problem has been fixed.

> Avro Confluent Schema Registry nightly end-to-end test failed on Travis
> ---
>
> Key: FLINK-13567
> URL: https://issues.apache.org/jira/browse/FLINK-13567
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka, Tests
>Affects Versions: 1.8.2, 1.9.0, 1.10.0
>Reporter: Till Rohrmann
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.10.0
>
>
> The {{Avro Confluent Schema Registry nightly end-to-end test}} failed on 
> Travis with
> {code}
> [FAIL] 'Avro Confluent Schema Registry nightly end-to-end test' failed after 
> 2 minutes and 11 seconds! Test exited with exit code 1
> No taskexecutor daemon (pid: 29044) is running anymore on 
> travis-job-b0823aec-c4ec-4d4b-8b59-e9f968de9501.
> No standalonesession daemon to stop on host 
> travis-job-b0823aec-c4ec-4d4b-8b59-e9f968de9501.
> rm: cannot remove 
> '/home/travis/build/apache/flink/flink-dist/target/flink-1.10-SNAPSHOT-bin/flink-1.10-SNAPSHOT/plugins':
>  No such file or directory
> {code}
> https://api.travis-ci.org/v3/job/567273939/log.txt



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on issue #9820: [FLINK-14290] Decouple plan translation from job execution/ClusterClient

2019-10-11 Thread GitBox
flinkbot edited a comment on issue #9820: [FLINK-14290] Decouple plan 
translation from job execution/ClusterClient
URL: https://github.com/apache/flink/pull/9820#issuecomment-536452582
 
 
   
   ## CI report:
   
   * ff3ef7ae21616fb0295e3bdc53fa349c8f136a4b : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/129646465)
   * 654524c3508c89d08d5039a451ba425a5a2e3a41 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/129666135)
   * 7f462e2cad4ca8866abccda492b53007a45b3407 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/129712387)
   * 196dde727329e41b1a08dc6d5093d5190f5c0ea4 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/131524743)
   * 013151873e2ad7a1c79284c3807380571b663b98 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/131530297)
   * e0328ab5dc8c3c24a842281afbf36ecaf98234a3 : UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] Henvealf commented on a change in pull request #9805: [FLINK-11635][docs-zh] translate dev/stream/state/checkpointing into Chinese

2019-10-11 Thread GitBox
Henvealf commented on a change in pull request #9805: [FLINK-11635][docs-zh] 
translate dev/stream/state/checkpointing into Chinese
URL: https://github.com/apache/flink/pull/9805#discussion_r334032024
 
 

 ##
 File path: docs/dev/stream/state/checkpointing.zh.md
 ##
 @@ -25,146 +25,138 @@ under the License.
 * ToC
 {:toc}
 
-Every function and operator in Flink can be **stateful** (see [working with 
state](state.html) for details).
-Stateful functions store data across the processing of individual 
elements/events, making state a critical building block for
-any type of more elaborate operation.
+Flink 中的每个方法或算子都能够是**有状态的**(阅读 [working with state](state.html) 查看详细)。
+状态化的方法在处理单个 元素/事件 的时候存储数据,让状态成为使各个类型的算子更加精细的重要部分。
+为了让状态容错,Flink 需要为状态添加**Checkpoint(检查点)**。Checkpoint 使得 Flink 
能够恢复状态和在流中的位置,从而向应用提供和无故障执行时一样的语义。
 
-In order to make state fault tolerant, Flink needs to **checkpoint** the 
state. Checkpoints allow Flink to recover state and positions
-in the streams to give the application the same semantics as a failure-free 
execution.
+[Documentation on streaming fault tolerance]({{ site.baseurl 
}}/zh/internals/stream_checkpointing.html) 介绍了 Flink 流计算容错机制的内部技术原理。
 
 Review comment:
   OK.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xintongsong commented on a change in pull request #9760: [FLINK-13982][runtime] Implement memory calculation logics

2019-10-11 Thread GitBox
xintongsong commented on a change in pull request #9760: [FLINK-13982][runtime] 
Implement memory calculation logics
URL: https://github.com/apache/flink/pull/9760#discussion_r334039016
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
 ##
 @@ -0,0 +1,659 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.util.ConfigurationParserUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Utility class for TaskExecutor memory configurations.
+ *
+ * A TaskExecutor's memory consists of the following components.
+ * 
+ * Framework Heap Memory
+ * Task Heap Memory
+ * Task Off-Heap Memory
+ * Shuffle Memory
+ * Managed Memory
+ * 
+ * On-Heap Managed Memory
+ * Off-Heap Managed Memory
+ * 
+ * JVM Metaspace
+ * JVM Overhead
+ * 
+ * Among all the components, Framework Heap Memory, Task Heap Memory and On-Heap Managed Memory use on-heap memory,
+ * while the rest use off-heap memory. We use Total Process Memory to refer to all the memory components, and Total
+ * Flink Memory to refer to all the components except JVM Metaspace and JVM Overhead.
+ *
+ * The relationships of TaskExecutor memory components are shown below.
+ * 
+ *   ┌ ─ ─ Total Process Memory  ─ ─ ┐
+ *┌ ─ ─ Total Flink Memory  ─ ─ ┐
+ *   │ ┌───┐ │
+ *││   Framework Heap Memory   ││  ─┐
+ *   │ └───┘ │  │
+ *│┌───┐│   │
+ *   │ │ Task Heap Memory  │ │ ─┤
+ *│└───┘│   │
+ *   │ ┌───┐ │  │
+ *┌─  ││   Task Off-Heap Memory││   │
+ *│  │ └───┘ │  ├─ On-Heap
+ *│   │┌───┐│   │
+ *├─ │ │  Shuffle Memory   │ │  │
+ *│   │└───┘│   │
+ *│  │ ┌─ Managed Memory ──┐ │  │
+ *│   ││┌─┐││   │
+ *│  │ ││ On-Heap Managed Memory  ││ │ ─┘
+ *│   ││├─┤││
+ *  Off-Heap ─┼─ │ ││ Off-Heap Managed Memory ││ │
+ *│   ││└─┘││
+ *│  │ └───┘ │
+ *│   └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
+ *│  │┌─┐│
+ *├─  │JVM Metaspace│
+ *│  │└─┘│
+ *│   ┌─┐
+ *└─ ││JVM Overhead ││
+ *└─┘
+ *   └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
+ * 
+ */
+public class TaskExecutorResourceUtils {
+
+   private TaskExecutorResourceUtils() {}
+
+   // 

+   //  Generating JVM Parameters
+   // 

+
+   public static String generateJvmParametersStr(final 
TaskExecutorResourceSpec taskExecutorResourceSpec) {
+   final MemorySize jvmHeapSize = 
taskExecutorResourceSpec.getFrameworkHeapSize()
+   .add(taskExecutorResourceSpec.getTaskHeapSize())
+   
.add(taskExecutorResourceSpec.getOnHeapManagedMemorySize());
+   final MemorySize jvmDirectSize = 
taskExecutorResourceSpec.getTaskOffHeapSize()
+   

[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-10-11 Thread GitBox
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r334028658
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java
 ##
 @@ -1524,4 +1481,31 @@ public void update(StateEntry<K, N, S> stateEntry, S newValue) {
}
}
 
+   /**
+* Iterate versions of the given node.
+*/
+   class ValueVersionIterator implements Iterator<Long> {
 
 Review comment:
   `private`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-10-11 Thread GitBox
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r334020491
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java
 ##
 @@ -310,57 +299,21 @@ public S removeAndGetOld(K key, N namespace) {
 * @param keyByteBuffer byte buffer storing the key.
 * @param keyOffset offset of the key.
 * @param keyLenlength of the key.
-* @return id of the node. NIL_NODE will be returned if key does no 
exist.
+* @return the state. Null will be returned if key does not exist.
 */
@VisibleForTesting
-   long getNode(ByteBuffer keyByteBuffer, int keyOffset, int keyLen) {
-   int deleteCount = 0;
-   long prevNode = findPredecessor(keyByteBuffer, keyOffset, 1);
-   long currentNode  = helpGetNextNode(prevNode, 0);
-   long nextNode;
-
-   int c;
-   while (currentNode != NIL_NODE) {
-   nextNode = helpGetNextNode(currentNode, 0);
-
-   boolean isRemoved = isNodeRemoved(currentNode);
-   if (isRemoved) {
-   // remove the node physically when there is no 
snapshot running
-   if (highestRequiredSnapshotVersion == 0 && 
deleteCount < numKeysToDeleteOneTime) {
-   doPhysicalRemove(currentNode, prevNode, 
nextNode);
-   
logicallyRemovedNodes.remove(currentNode);
-   totalSize--;
-   deleteCount++;
-   } else {
-   prevNode = currentNode;
-   }
-   currentNode = nextNode;
-   continue;
-   }
-
-   c = compareByteBufferAndNode(keyByteBuffer, keyOffset, 
keyLen, currentNode);
-
-   // find the key
-   if (c == 0) {
-   return currentNode;
-   }
-
-   // the key is less than the current node, and nodes 
after current
-   // node can not be equal to the key.
-   if (c < 0) {
-   break;
-   }
-
-   prevNode = currentNode;
-   currentNode = helpGetNextNode(currentNode, 0);
-   }
-
-   return NIL_NODE;
+   S getNode(ByteBuffer keyByteBuffer, int keyOffset, int keyLen) {
 
 Review comment:
   `@Nullable` annotation is missing
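
   A minimal self-contained sketch of the fix (class name reduced for illustration):
   
   ```java
   import java.nio.ByteBuffer;
   import javax.annotation.Nullable;
   
   class Lookup<S> {
   	// @Nullable makes the "null if the key does not exist" contract from the
   	// Javadoc visible to callers and to static analysis.
   	@Nullable
   	S getNode(ByteBuffer keyByteBuffer, int keyOffset, int keyLen) {
   		return null; // placeholder body for the sketch
   	}
   }
   ```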


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-10-11 Thread GitBox
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r334028005
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java
 ##
 @@ -1136,10 +1088,9 @@ S helpGetState(long valuePointer, SkipListValueSerializer<S> serializer) {
return null;
}
 
-   Chunk chunk = 
spaceAllocator.getChunkById(SpaceUtils.getChunkIdByAddress(valuePointer));
-   int offsetInChunk = 
SpaceUtils.getChunkOffsetByAddress(valuePointer);
-   ByteBuffer bb = chunk.getByteBuffer(offsetInChunk);
-   int offsetInByteBuffer = 
chunk.getOffsetInByteBuffer(offsetInChunk);
+   Tuple2<ByteBuffer, Integer> tuple2 = getNodeByteBufferAndOffset(valuePointer);
+   ByteBuffer bb = tuple2.f0;
+   int offsetInByteBuffer = tuple2.f1;
 
 Review comment:
   if the `ByteBuffer` and the offset are needed by all methods to perform an 
operation on it, then one could directly pass the combined value object instead 
of unwrapping the individual fields and passing them individually to the 
modifying method.
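
   A sketch of what the combined value object could look like, with hypothetical names (`BufferAndOffset` is a placeholder, not from the PR):
   
   ```java
   import java.nio.ByteBuffer;
   
   // Hypothetical holder: methods operating on a value location take this object
   // directly instead of an unwrapped (ByteBuffer, int) pair.
   final class BufferAndOffset {
   	final ByteBuffer buffer;
   	final int offset;
   
   	BufferAndOffset(ByteBuffer buffer, int offset) {
   		this.buffer = buffer;
   		this.offset = offset;
   	}
   }
   ```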


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-10-11 Thread GitBox
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r334020903
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java
 ##
 @@ -310,57 +299,21 @@ public S removeAndGetOld(K key, N namespace) {
 * @param keyByteBuffer byte buffer storing the key.
 * @param keyOffset offset of the key.
 * @param keyLenlength of the key.
-* @return id of the node. NIL_NODE will be returned if key does no 
exist.
+* @return the state. Null will be returned if key does not exist.
 */
@VisibleForTesting
-   long getNode(ByteBuffer keyByteBuffer, int keyOffset, int keyLen) {
-   int deleteCount = 0;
-   long prevNode = findPredecessor(keyByteBuffer, keyOffset, 1);
-   long currentNode  = helpGetNextNode(prevNode, 0);
-   long nextNode;
-
-   int c;
-   while (currentNode != NIL_NODE) {
-   nextNode = helpGetNextNode(currentNode, 0);
-
-   boolean isRemoved = isNodeRemoved(currentNode);
-   if (isRemoved) {
-   // remove the node physically when there is no 
snapshot running
-   if (highestRequiredSnapshotVersion == 0 && 
deleteCount < numKeysToDeleteOneTime) {
-   doPhysicalRemove(currentNode, prevNode, 
nextNode);
-   
logicallyRemovedNodes.remove(currentNode);
-   totalSize--;
-   deleteCount++;
-   } else {
-   prevNode = currentNode;
-   }
-   currentNode = nextNode;
-   continue;
-   }
-
-   c = compareByteBufferAndNode(keyByteBuffer, keyOffset, 
keyLen, currentNode);
-
-   // find the key
-   if (c == 0) {
-   return currentNode;
-   }
-
-   // the key is less than the current node, and nodes 
after current
-   // node can not be equal to the key.
-   if (c < 0) {
-   break;
-   }
-
-   prevNode = currentNode;
-   currentNode = helpGetNextNode(currentNode, 0);
-   }
-
-   return NIL_NODE;
+   S getNode(ByteBuffer keyByteBuffer, int keyOffset, int keyLen) {
+   Tuple4<Long, Long, Boolean, S> result = iterateAndProcess(keyByteBuffer, keyOffset, keyLen,
 
 Review comment:
   I would refrain from using `Tuple4`. Instead I suggest creating an explicit 
class with descriptive fields.
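
   A sketch of such a class, with field names guessed from the surrounding lambda (the real semantics of the four tuple fields would need to be confirmed against the code):
   
   ```java
   import javax.annotation.Nullable;
   
   // Hypothetical replacement for the Tuple4: descriptive names instead of f0..f3.
   final class IterateAndProcessResult<S> {
   	final long prevNode;
   	final long currentNode;
   	final boolean keyFound;
   	@Nullable
   	final S state;
   
   	IterateAndProcessResult(long prevNode, long currentNode, boolean keyFound, @Nullable S state) {
   		this.prevNode = prevNode;
   		this.currentNode = currentNode;
   		this.keyFound = keyFound;
   		this.state = state;
   	}
   }
   ```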


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-10-11 Thread GitBox
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r334021225
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java
 ##
 @@ -310,57 +299,21 @@ public S removeAndGetOld(K key, N namespace) {
 * @param keyByteBuffer byte buffer storing the key.
 * @param keyOffset offset of the key.
 * @param keyLenlength of the key.
-* @return id of the node. NIL_NODE will be returned if key does no 
exist.
+* @return the state. Null will be returned if key does not exist.
 */
@VisibleForTesting
-   long getNode(ByteBuffer keyByteBuffer, int keyOffset, int keyLen) {
-   int deleteCount = 0;
-   long prevNode = findPredecessor(keyByteBuffer, keyOffset, 1);
-   long currentNode  = helpGetNextNode(prevNode, 0);
-   long nextNode;
-
-   int c;
-   while (currentNode != NIL_NODE) {
-   nextNode = helpGetNextNode(currentNode, 0);
-
-   boolean isRemoved = isNodeRemoved(currentNode);
-   if (isRemoved) {
-   // remove the node physically when there is no 
snapshot running
-   if (highestRequiredSnapshotVersion == 0 && 
deleteCount < numKeysToDeleteOneTime) {
-   doPhysicalRemove(currentNode, prevNode, 
nextNode);
-   
logicallyRemovedNodes.remove(currentNode);
-   totalSize--;
-   deleteCount++;
-   } else {
-   prevNode = currentNode;
-   }
-   currentNode = nextNode;
-   continue;
-   }
-
-   c = compareByteBufferAndNode(keyByteBuffer, keyOffset, 
keyLen, currentNode);
-
-   // find the key
-   if (c == 0) {
-   return currentNode;
-   }
-
-   // the key is less than the current node, and nodes 
after current
-   // node can not be equal to the key.
-   if (c < 0) {
-   break;
-   }
-
-   prevNode = currentNode;
-   currentNode = helpGetNextNode(currentNode, 0);
-   }
-
-   return NIL_NODE;
+   S getNode(ByteBuffer keyByteBuffer, int keyOffset, int keyLen) {
+   Tuple4<Long, Long, Boolean, S> result = iterateAndProcess(keyByteBuffer, keyOffset, keyLen,
+   (tuple3, isRemoved) -> {
+   long currentNode = tuple3.f1;
+   return isRemoved ? null : 
getNodeStateHelper(currentNode);
+   });
+   return result.f2 ? result.f3 : null;
 
 Review comment:
   I have no idea what this expresses. I think there is a way to make it more 
expressive.
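
   With a named result type (see the sibling comment about replacing `Tuple4`), the line could read, as a sketch with hypothetical field names:
   
   ```java
   // "Return the state if the key was found, otherwise null" becomes self-describing.
   return result.keyFound ? result.state : null;
   ```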


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xintongsong commented on a change in pull request #9760: [FLINK-13982][runtime] Implement memory calculation logics

2019-10-11 Thread GitBox
xintongsong commented on a change in pull request #9760: [FLINK-13982][runtime] 
Implement memory calculation logics
URL: https://github.com/apache/flink/pull/9760#discussion_r334046721
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
 ##
 @@ -0,0 +1,659 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.util.ConfigurationParserUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Utility class for TaskExecutor memory configurations.
+ *
+ * A TaskExecutor's memory consists of the following components.
+ * 
+ * Framework Heap Memory
+ * Task Heap Memory
+ * Task Off-Heap Memory
+ * Shuffle Memory
+ * Managed Memory
+ * 
+ * On-Heap Managed Memory
+ * Off-Heap Managed Memory
+ * 
+ * JVM Metaspace
+ * JVM Overhead
+ * 
+ * Among all the components, Framework Heap Memory, Task Heap Memory and On-Heap Managed Memory use on-heap memory,
+ * while the rest use off-heap memory. We use Total Process Memory to refer to all the memory components, and Total
+ * Flink Memory to refer to all the components except JVM Metaspace and JVM Overhead.
+ *
+ * The relationships of TaskExecutor memory components are shown below.
+ * 
+ *   ┌ ─ ─ Total Process Memory  ─ ─ ┐
+ *┌ ─ ─ Total Flink Memory  ─ ─ ┐
+ *   │ ┌───┐ │
+ *││   Framework Heap Memory   ││  ─┐
+ *   │ └───┘ │  │
+ *│┌───┐│   │
+ *   │ │ Task Heap Memory  │ │ ─┤
+ *│└───┘│   │
+ *   │ ┌───┐ │  │
+ *┌─  ││   Task Off-Heap Memory││   │
+ *│  │ └───┘ │  ├─ On-Heap
+ *│   │┌───┐│   │
+ *├─ │ │  Shuffle Memory   │ │  │
+ *│   │└───┘│   │
+ *│  │ ┌─ Managed Memory ──┐ │  │
+ *│   ││┌─┐││   │
+ *│  │ ││ On-Heap Managed Memory  ││ │ ─┘
+ *│   ││├─┤││
+ *  Off-Heap ─┼─ │ ││ Off-Heap Managed Memory ││ │
+ *│   ││└─┘││
+ *│  │ └───┘ │
+ *│   └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
+ *│  │┌─┐│
+ *├─  │JVM Metaspace│
+ *│  │└─┘│
+ *│   ┌─┐
+ *└─ ││JVM Overhead ││
+ *└─┘
+ *   └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
+ * 
+ */
+public class TaskExecutorResourceUtils {
+
+   private TaskExecutorResourceUtils() {}
+
+   // 

+   //  Generating JVM Parameters
+   // 

+
+   public static String generateJvmParametersStr(final 
TaskExecutorResourceSpec taskExecutorResourceSpec) {
+   final MemorySize jvmHeapSize = 
taskExecutorResourceSpec.getFrameworkHeapSize()
+   .add(taskExecutorResourceSpec.getTaskHeapSize())
+   
.add(taskExecutorResourceSpec.getOnHeapManagedMemorySize());
+   final MemorySize jvmDirectSize = 
taskExecutorResourceSpec.getTaskOffHeapSize()
+   

[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-10-11 Thread GitBox
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r334018453
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/NodeStatus.java
 ##
 @@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+/**
+ * Status of the node.
+ */
+public enum NodeStatus {
+
+   PUT((byte) 0), REMOVE((byte) 1);
+
+   private byte value;
 
 Review comment:
   `final` is missing
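
   The fixed declaration, for clarity:
   
   ```java
   public enum NodeStatus {
   
   	PUT((byte) 0), REMOVE((byte) 1);
   
   	// enum instance fields should be immutable
   	private final byte value;
   
   	NodeStatus(byte value) {
   		this.value = value;
   	}
   
   	public byte getValue() {
   		return value;
   	}
   }
   ```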


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-10-11 Thread GitBox
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r334035510
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMapTest.java
 ##
 @@ -115,577 +144,783 @@ public void testInitStateMap() {
assertFalse(stateMap.getStateIncrementalVisitor(100).hasNext());
 
stateMap.close();
-   assertEquals(0, stateMap.size());
-   assertEquals(0, stateMap.totalSize());
-   assertTrue(stateMap.isClosed());
}
 
/**
-* Test basic operations.
+* Test state put operation.
 */
@Test
-   public void testBasicOperations() throws Exception {
-   TypeSerializer<Integer> keySerializer = IntSerializer.INSTANCE;
-   TypeSerializer<Long> namespaceSerializer = LongSerializer.INSTANCE;
-   TypeSerializer<String> stateSerializer = StringSerializer.INSTANCE;
-   CopyOnWriteSkipListStateMap<Integer, Long, String> stateMap = new CopyOnWriteSkipListStateMap<>(
-   keySerializer, namespaceSerializer, stateSerializer, spaceAllocator);
+   public void testPutState() {
+   testWithFunction((totalSize, stateMap, referenceStates) -> 
getDefaultSizes(totalSize));
+   }
 
-   ThreadLocalRandom random = ThreadLocalRandom.current();
-   // map to store expected states, namespace -> key -> state
-   Map<Long, Map<Integer, String>> referenceStates = new HashMap<>();
-   int totalSize = 0;
+   /**
+* Test remove existing state.
+*/
+   @Test
+   public void testRemoveExistingState() {
+   testRemoveState(false, false);
+   }
 
-   // put some states
-   for (long namespace = 0; namespace < 10; namespace++) {
-   for (int key = 0; key < 100; key++) {
-   totalSize++;
-   String state = String.valueOf(key * namespace);
-   if (random.nextBoolean()) {
-   stateMap.put(key, namespace, state);
-   } else {
-   assertNull(stateMap.putAndGetOld(key, 
namespace, state));
+   /**
+* Test remove and get existing state.
+*/
+   @Test
+   public void testRemoveAndGetExistingState() {
+   testRemoveState(false, true);
+   }
+
+   /**
+* Test remove absent state.
+*/
+   @Test
+   public void testRemoveAbsentState() {
+   testRemoveState(true, true);
+   }
+
+   /**
+* Test remove previously removed state.
+*/
+   @Test
+   public void testPutPreviouslyRemovedState() {
+   testWithFunction(
+   (totalSize, stateMap, referenceStates) -> 
applyFunctionAfterRemove(stateMap, referenceStates,
+   (removedCnt, removedStates) -> {
+   int size = totalSize - removedCnt;
+   for (Map.Entry> 
entry : removedStates.entrySet()) {
+   long namespace = entry.getKey();
+   for (int key : 
entry.getValue()) {
+   size++;
+   String state = 
String.valueOf(key * namespace);
+   
assertNull(stateMap.putAndGetOld(key, namespace, state));
+   
referenceStates.computeIfAbsent(namespace, (none) -> new HashMap<>()).put(key, 
String.valueOf(state));
+   }
+   }
+   return getDefaultSizes(size);
}
-   referenceStates.computeIfAbsent(namespace, 
(none) -> new HashMap<>()).put(key, state);
-   assertEquals(totalSize, stateMap.size());
-   assertEquals(totalSize, stateMap.totalSize());
-   }
-   }
+   )
+   );
+   }
 
-   // validates space allocation. Each pair need 2 spaces
-   assertEquals(totalSize * 2, 
spaceAllocator.getTotalSpaceNumber());
-   verifyState(referenceStates, stateMap);
+   private void testRemoveState(boolean removeAbsent, boolean getOld) {
+   testWithFunction(
+   (totalSize, stateMap, referenceStates) -> {
+   if (removeAbsent) {
+ 

[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-10-11 Thread GitBox
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r334018883
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/NodeStatus.java
 ##
 @@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+/**
+ * Status of the node.
+ */
+public enum NodeStatus {
+
+   PUT((byte) 0), REMOVE((byte) 1);
+
+   private byte value;
+
+   NodeStatus(byte value) {
+   this.value = value;
+   }
+
+   public byte getValue() {
+   return value;
+   }
+
+   public static NodeStatus valueOf(byte value) {
+   switch (value) {
+   case 0:
+   return PUT;
+   case 1:
+   return REMOVE;
+   default:
+   throw new RuntimeException("Unknown type: " + 
value);
 
 Review comment:
   I would throw an `IllegalArgumentException` similar to `Enum#valueOf`
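
   Applied to the quoted method, the suggestion would look like this sketch:
   
   ```java
   public static NodeStatus valueOf(byte value) {
   	switch (value) {
   		case 0:
   			return PUT;
   		case 1:
   			return REMOVE;
   		default:
   			// matches the contract of Enum#valueOf, which throws
   			// IllegalArgumentException for unknown constants
   			throw new IllegalArgumentException("Unknown NodeStatus value: " + value);
   	}
   }
   ```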


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-10-11 Thread GitBox
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r334038508
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMapTest.java
 ##
 @@ -115,577 +144,783 @@ public void testInitStateMap() {
assertFalse(stateMap.getStateIncrementalVisitor(100).hasNext());
 
stateMap.close();
-   assertEquals(0, stateMap.size());
-   assertEquals(0, stateMap.totalSize());
-   assertTrue(stateMap.isClosed());
}
 
/**
-* Test basic operations.
+* Test state put operation.
 */
@Test
-   public void testBasicOperations() throws Exception {
-   TypeSerializer<Integer> keySerializer = IntSerializer.INSTANCE;
-   TypeSerializer<Long> namespaceSerializer = LongSerializer.INSTANCE;
-   TypeSerializer<String> stateSerializer = StringSerializer.INSTANCE;
-   CopyOnWriteSkipListStateMap<Integer, Long, String> stateMap = new CopyOnWriteSkipListStateMap<>(
-   keySerializer, namespaceSerializer, stateSerializer, spaceAllocator);
+   public void testPutState() {
+   testWithFunction((totalSize, stateMap, referenceStates) -> 
getDefaultSizes(totalSize));
+   }
 
-   ThreadLocalRandom random = ThreadLocalRandom.current();
-   // map to store expected states, namespace -> key -> state
-   Map<Long, Map<Integer, String>> referenceStates = new HashMap<>();
-   int totalSize = 0;
+   /**
+* Test remove existing state.
+*/
+   @Test
+   public void testRemoveExistingState() {
+   testRemoveState(false, false);
+   }
 
-   // put some states
-   for (long namespace = 0; namespace < 10; namespace++) {
-   for (int key = 0; key < 100; key++) {
-   totalSize++;
-   String state = String.valueOf(key * namespace);
-   if (random.nextBoolean()) {
-   stateMap.put(key, namespace, state);
-   } else {
-   assertNull(stateMap.putAndGetOld(key, 
namespace, state));
+   /**
+* Test remove and get existing state.
+*/
+   @Test
+   public void testRemoveAndGetExistingState() {
+   testRemoveState(false, true);
+   }
+
+   /**
+* Test remove absent state.
+*/
+   @Test
+   public void testRemoveAbsentState() {
+   testRemoveState(true, true);
+   }
+
+   /**
+* Test remove previously removed state.
+*/
+   @Test
+   public void testPutPreviouslyRemovedState() {
+   testWithFunction(
+   (totalSize, stateMap, referenceStates) -> 
applyFunctionAfterRemove(stateMap, referenceStates,
+   (removedCnt, removedStates) -> {
+   int size = totalSize - removedCnt;
+   for (Map.Entry> 
entry : removedStates.entrySet()) {
+   long namespace = entry.getKey();
+   for (int key : 
entry.getValue()) {
+   size++;
+   String state = 
String.valueOf(key * namespace);
+   
assertNull(stateMap.putAndGetOld(key, namespace, state));
+   
referenceStates.computeIfAbsent(namespace, (none) -> new HashMap<>()).put(key, 
String.valueOf(state));
+   }
+   }
+   return getDefaultSizes(size);
}
-   referenceStates.computeIfAbsent(namespace, 
(none) -> new HashMap<>()).put(key, state);
-   assertEquals(totalSize, stateMap.size());
-   assertEquals(totalSize, stateMap.totalSize());
-   }
-   }
+   )
+   );
+   }
 
-   // validates space allocation. Each pair need 2 spaces
-   assertEquals(totalSize * 2, 
spaceAllocator.getTotalSpaceNumber());
-   verifyState(referenceStates, stateMap);
+   private void testRemoveState(boolean removeAbsent, boolean getOld) {
+   testWithFunction(
+   (totalSize, stateMap, referenceStates) -> {
+   if (removeAbsent) {
+ 

[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-10-11 Thread GitBox
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r334032961
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMapTest.java
 ##
 @@ -115,577 +144,783 @@ public void testInitStateMap() {
assertFalse(stateMap.getStateIncrementalVisitor(100).hasNext());
 
stateMap.close();
-   assertEquals(0, stateMap.size());
-   assertEquals(0, stateMap.totalSize());
-   assertTrue(stateMap.isClosed());
}
 
/**
-* Test basic operations.
+* Test state put operation.
 */
@Test
-   public void testBasicOperations() throws Exception {
-   TypeSerializer<Integer> keySerializer = IntSerializer.INSTANCE;
-   TypeSerializer<Long> namespaceSerializer = LongSerializer.INSTANCE;
-   TypeSerializer<String> stateSerializer = StringSerializer.INSTANCE;
-   CopyOnWriteSkipListStateMap<Integer, Long, String> stateMap = new CopyOnWriteSkipListStateMap<>(
-   keySerializer, namespaceSerializer, stateSerializer, spaceAllocator);
+   public void testPutState() {
+   testWithFunction((totalSize, stateMap, referenceStates) -> 
getDefaultSizes(totalSize));
+   }
 
-   ThreadLocalRandom random = ThreadLocalRandom.current();
-   // map to store expected states, namespace -> key -> state
-   Map<Long, Map<Integer, String>> referenceStates = new HashMap<>();
-   int totalSize = 0;
+   /**
+* Test remove existing state.
+*/
+   @Test
+   public void testRemoveExistingState() {
+   testRemoveState(false, false);
+   }
 
-   // put some states
-   for (long namespace = 0; namespace < 10; namespace++) {
-   for (int key = 0; key < 100; key++) {
-   totalSize++;
-   String state = String.valueOf(key * namespace);
-   if (random.nextBoolean()) {
-   stateMap.put(key, namespace, state);
-   } else {
-   assertNull(stateMap.putAndGetOld(key, 
namespace, state));
+   /**
+* Test remove and get existing state.
+*/
+   @Test
+   public void testRemoveAndGetExistingState() {
+   testRemoveState(false, true);
+   }
+
+   /**
+* Test remove absent state.
+*/
+   @Test
+   public void testRemoveAbsentState() {
+   testRemoveState(true, true);
+   }
+
+   /**
+* Test remove previously removed state.
+*/
+   @Test
+   public void testPutPreviouslyRemovedState() {
+   testWithFunction(
+   (totalSize, stateMap, referenceStates) -> 
applyFunctionAfterRemove(stateMap, referenceStates,
+   (removedCnt, removedStates) -> {
+   int size = totalSize - removedCnt;
+   for (Map.Entry> 
entry : removedStates.entrySet()) {
+   long namespace = entry.getKey();
+   for (int key : 
entry.getValue()) {
+   size++;
+   String state = 
String.valueOf(key * namespace);
+   
assertNull(stateMap.putAndGetOld(key, namespace, state));
+   
referenceStates.computeIfAbsent(namespace, (none) -> new HashMap<>()).put(key, 
String.valueOf(state));
+   }
+   }
+   return getDefaultSizes(size);
}
-   referenceStates.computeIfAbsent(namespace, 
(none) -> new HashMap<>()).put(key, state);
-   assertEquals(totalSize, stateMap.size());
-   assertEquals(totalSize, stateMap.totalSize());
-   }
-   }
+   )
+   );
 
 Review comment:
   tbh I'm not sure whether I really understand what's going on here. All the 
different layers of functions which take other functions to do something make 
it super hard to reason about the test case. Usually for tests I'm a big fan of 
explicitness.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use 

[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-10-11 Thread GitBox
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r334021079
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java
 ##
 @@ -310,57 +299,21 @@ public S removeAndGetOld(K key, N namespace) {
 * @param keyByteBuffer byte buffer storing the key.
 * @param keyOffset offset of the key.
 * @param keyLenlength of the key.
-* @return id of the node. NIL_NODE will be returned if key does no 
exist.
+* @return the state. Null will be returned if key does not exist.
 */
@VisibleForTesting
-   long getNode(ByteBuffer keyByteBuffer, int keyOffset, int keyLen) {
-   int deleteCount = 0;
-   long prevNode = findPredecessor(keyByteBuffer, keyOffset, 1);
-   long currentNode  = helpGetNextNode(prevNode, 0);
-   long nextNode;
-
-   int c;
-   while (currentNode != NIL_NODE) {
-   nextNode = helpGetNextNode(currentNode, 0);
-
-   boolean isRemoved = isNodeRemoved(currentNode);
-   if (isRemoved) {
-   // remove the node physically when there is no 
snapshot running
-   if (highestRequiredSnapshotVersion == 0 && 
deleteCount < numKeysToDeleteOneTime) {
-   doPhysicalRemove(currentNode, prevNode, 
nextNode);
-   
logicallyRemovedNodes.remove(currentNode);
-   totalSize--;
-   deleteCount++;
-   } else {
-   prevNode = currentNode;
-   }
-   currentNode = nextNode;
-   continue;
-   }
-
-   c = compareByteBufferAndNode(keyByteBuffer, keyOffset, 
keyLen, currentNode);
-
-   // find the key
-   if (c == 0) {
-   return currentNode;
-   }
-
-   // the key is less than the current node, and nodes 
after current
-   // node can not be equal to the key.
-   if (c < 0) {
-   break;
-   }
-
-   prevNode = currentNode;
-   currentNode = helpGetNextNode(currentNode, 0);
-   }
-
-   return NIL_NODE;
+   S getNode(ByteBuffer keyByteBuffer, int keyOffset, int keyLen) {
+   Tuple4<Long, Long, Boolean, S> result = iterateAndProcess(keyByteBuffer, keyOffset, keyLen,
+   (tuple3, isRemoved) -> {
 
 Review comment:
   Same here. It is super hard to know what's inside of `tuple3`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-10-11 Thread GitBox
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r334028341
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java
 ##
 @@ -1367,6 +1317,14 @@ public Long next() {
}
}
 
+   private Tuple2<ByteBuffer, Integer> getNodeByteBufferAndOffset(long node) {
 
 Review comment:
   I would introduce a dedicated class for this instead of using the `Tuple2`
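
   A sketch of such a dedicated type, with hypothetical names (the fields mirror what `tuple2.f0`/`tuple2.f1` hold at the call sites):
   
   ```java
   import java.nio.ByteBuffer;
   
   // Hypothetical return type for getNodeByteBufferAndOffset instead of
   // Tuple2<ByteBuffer, Integer>: the field names document the contents.
   final class NodeLocation {
   	final ByteBuffer byteBuffer;
   	final int offsetInByteBuffer;
   
   	NodeLocation(ByteBuffer byteBuffer, int offsetInByteBuffer) {
   		this.byteBuffer = byteBuffer;
   		this.offsetInByteBuffer = offsetInByteBuffer;
   	}
   }
   ```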


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-10-11 Thread GitBox
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r334036060
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMapTest.java
 ##
 @@ -115,577 +144,783 @@ public void testInitStateMap() {
assertFalse(stateMap.getStateIncrementalVisitor(100).hasNext());
 
stateMap.close();
-   assertEquals(0, stateMap.size());
-   assertEquals(0, stateMap.totalSize());
-   assertTrue(stateMap.isClosed());
}
 
/**
-* Test basic operations.
+* Test state put operation.
 */
@Test
-   public void testBasicOperations() throws Exception {
-   TypeSerializer<Integer> keySerializer = IntSerializer.INSTANCE;
-   TypeSerializer<Long> namespaceSerializer = LongSerializer.INSTANCE;
-   TypeSerializer<String> stateSerializer = StringSerializer.INSTANCE;
-   CopyOnWriteSkipListStateMap<Integer, Long, String> stateMap = new CopyOnWriteSkipListStateMap<>(
-   keySerializer, namespaceSerializer, stateSerializer, spaceAllocator);
+   public void testPutState() {
+   testWithFunction((totalSize, stateMap, referenceStates) -> 
getDefaultSizes(totalSize));
+   }
 
-   ThreadLocalRandom random = ThreadLocalRandom.current();
-   // map to store expected states, namespace -> key -> state
-   Map<Long, Map<Integer, String>> referenceStates = new HashMap<>();
-   int totalSize = 0;
+   /**
+* Test remove existing state.
+*/
+   @Test
+   public void testRemoveExistingState() {
+   testRemoveState(false, false);
+   }
 
-   // put some states
-   for (long namespace = 0; namespace < 10; namespace++) {
-   for (int key = 0; key < 100; key++) {
-   totalSize++;
-   String state = String.valueOf(key * namespace);
-   if (random.nextBoolean()) {
-   stateMap.put(key, namespace, state);
-   } else {
-   assertNull(stateMap.putAndGetOld(key, 
namespace, state));
+   /**
+* Test remove and get existing state.
+*/
+   @Test
+   public void testRemoveAndGetExistingState() {
+   testRemoveState(false, true);
+   }
+
+   /**
+* Test remove absent state.
+*/
+   @Test
+   public void testRemoveAbsentState() {
+   testRemoveState(true, true);
+   }
+
+   /**
+* Test remove previously removed state.
+*/
+   @Test
+   public void testPutPreviouslyRemovedState() {
+   testWithFunction(
+   (totalSize, stateMap, referenceStates) -> 
applyFunctionAfterRemove(stateMap, referenceStates,
+   (removedCnt, removedStates) -> {
+   int size = totalSize - removedCnt;
+   for (Map.Entry> 
entry : removedStates.entrySet()) {
+   long namespace = entry.getKey();
+   for (int key : 
entry.getValue()) {
+   size++;
+   String state = 
String.valueOf(key * namespace);
+   
assertNull(stateMap.putAndGetOld(key, namespace, state));
+   
referenceStates.computeIfAbsent(namespace, (none) -> new HashMap<>()).put(key, 
String.valueOf(state));
+   }
+   }
+   return getDefaultSizes(size);
}
-   referenceStates.computeIfAbsent(namespace, 
(none) -> new HashMap<>()).put(key, state);
-   assertEquals(totalSize, stateMap.size());
-   assertEquals(totalSize, stateMap.totalSize());
-   }
-   }
+   )
+   );
+   }
 
-   // validates space allocation. Each pair need 2 spaces
-   assertEquals(totalSize * 2, 
spaceAllocator.getTotalSpaceNumber());
-   verifyState(referenceStates, stateMap);
+   private void testRemoveState(boolean removeAbsent, boolean getOld) {
+   testWithFunction(
+   (totalSize, stateMap, referenceStates) -> {
+   if (removeAbsent) {
+ 

[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-10-11 Thread GitBox
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r334021831
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java
 ##
 @@ -371,72 +324,44 @@ long getNode(ByteBuffer keyByteBuffer, int keyOffset, int keyLen) {
 	 */
 	@VisibleForTesting
 	S putNode(ByteBuffer keyByteBuffer, int keyOffset, int keyLen, byte[] value, boolean returnOldState) {
-		int deleteCount = 0;
-		long prevNode = findPredecessor(keyByteBuffer, keyOffset, 1);
-		long currentNode = helpGetNextNode(prevNode, 0);
-		long nextNode;
-
-		int c;
-		for ( ; ; ) {
-			if (currentNode != NIL_NODE) {
-				nextNode = helpGetNextNode(currentNode, 0);
+		Tuple4<Long, Long, Boolean, S> result = iterateAndProcess(keyByteBuffer, keyOffset, keyLen,
+			(tuple3, isRemoved) -> {
 
 Review comment:
   I would suggest factoring this out into a separate method.
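   
   A self-contained sketch of that refactoring, with all names invented for illustration: the inline lambda body moves into a named private method, and the call site shrinks to a method reference.
   
   import java.util.function.BiFunction;
   
   public class ExtractLambdaDemo {
   
       // Stand-in for iterateAndProcess: accepts some node-handling logic.
       private static String process(BiFunction<Long, Boolean, String> fn) {
           return fn.apply(42L, false);
       }
   
       // The former lambda body, now named and testable in isolation.
       private static String handleNode(Long node, Boolean isRemoved) {
           return isRemoved ? null : "node-" + node;
       }
   
       public static void main(String[] args) {
           // Before: process((node, isRemoved) -> { /* many inline lines */ });
           // After:
           System.out.println(process(ExtractLambdaDemo::handleNode)); // prints node-42
       }
   }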


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-10-11 Thread GitBox
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r334028713
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java
 ##
 @@ -1524,4 +1481,31 @@ public void update(StateEntry stateEntry, S newValue) {
 		}
 	}
 
+	/**
+	 * Iterate versions of the given node.
+	 */
+	class ValueVersionIterator implements Iterator<Long> {
+		long valuePointer;
 
 Review comment:
   `private`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-10-11 Thread GitBox
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r334024027
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java
 ##
 @@ -472,6 +397,74 @@ S putNode(ByteBuffer keyByteBuffer, int keyOffset, int keyLen, byte[] value, boolean returnOldState) {
 	 * @return the old state. Null will be returned if key does not exist or returnOldState is false.
 	 */
 	private S removeNode(ByteBuffer keyByteBuffer, int keyOffset, int keyLen, boolean returnOldState) {
+		Tuple4<Long, Long, Boolean, S> result = iterateAndProcess(keyByteBuffer, keyOffset, keyLen,
+			(tuple3, isRemoved) -> {
+				long prevNode = tuple3.f0;
+				long currentNode = tuple3.f1;
+				long nextNode = tuple3.f2;
+				// if the node has been logically removed, and can not be physically
+				// removed here, just return null
+				if (isRemoved && highestRequiredSnapshotVersion != 0) {
+					return null;
+				}
+
+				long oldValuePointer;
+				boolean oldValueNeedFree;
+
+				if (highestRequiredSnapshotVersion == 0) {
+					// do physically remove only when there is no snapshot running
+					oldValuePointer = doPhysicalRemoveAndGetValue(currentNode, prevNode, nextNode);
+					// the node has been logically removed, and remove it from the set
+					if (isRemoved) {
+						logicallyRemovedNodes.remove(currentNode);
+					}
+					oldValueNeedFree = true;
+				} else {
+					int version = SkipListUtils.helpGetNodeLatestVersion(currentNode, spaceAllocator);
+					if (version < highestRequiredSnapshotVersion) {
+						// the newest-version value may be used by snapshots, and update it with copy-on-write
+						oldValuePointer = updateValueWithCopyOnWrite(currentNode, null);
+						oldValueNeedFree = false;
+					} else {
+						// replace the newest-version value.
+						oldValuePointer = updateValueWithReplace(currentNode, null);
+						oldValueNeedFree = true;
+					}
+
+					helpSetNodeStatus(currentNode, NodeStatus.REMOVE);
+					logicallyRemovedNodes.add(currentNode);
+				}
+
+				S oldState = null;
+				if (returnOldState) {
+					oldState = helpGetState(oldValuePointer);
+				}
+
+				if (oldValueNeedFree) {
+					spaceAllocator.free(oldValuePointer);
+				}
+
+				return oldState;
+			});
+		return result.f2 ? result.f3 : null;
+	}
+
+	/**
+	 * Iterate the skip list and perform the given function.
+	 *
+	 * @param keyByteBuffer byte buffer storing the key.
+	 * @param keyOffset offset of the key.
+	 * @param keyLen length of the key.
+	 * @param function the function to apply when the skip list contains the given key, which accepts two
+	 *                 parameters: a tuple3 of [previous_node, current_node, next_node] and a boolean indicating
+	 *                 whether the node with the same key has been logically removed, and returns a state.
+	 * @return a tuple4 of [previous_node, current_node, key_found, state_by_applying_function]
+	 */
+	private Tuple4<Long, Long, Boolean, S> iterateAndProcess(
 
 Review comment:
   Does this method really iterate the nodes? It looks more like a `findNodeAndApply`, because after we have found the entry we stop the iteration.
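   
   To make the naming point concrete, here is a minimal, self-contained Java sketch of the "find, then apply, then stop" shape the reviewer describes; the map, the names, and the toy logic are all illustrative and not taken from the PR:
   
   import java.util.Map;
   import java.util.function.BiFunction;
   
   public class FindAndApplyDemo {
   
       // Despite the loop, the method returns as soon as the key is found,
       // which is why a name like findNodeAndApply says more than iterateAndProcess.
       static <V, R> R findAndApply(Map<Integer, V> map, int key, BiFunction<Integer, V, R> fn) {
           for (Map.Entry<Integer, V> e : map.entrySet()) {
               if (e.getKey() == key) {
                   return fn.apply(e.getKey(), e.getValue()); // stop at the first hit
               }
           }
           return null; // key not found
       }
   
       public static void main(String[] args) {
           Map<Integer, String> m = Map.of(1, "a", 2, "b");
           System.out.println(findAndApply(m, 2, (k, v) -> k + "=" + v)); // prints 2=b
       }
   }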


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-10-11 Thread GitBox
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r334022091
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java
 ##
 @@ -472,6 +397,74 @@ S putNode(ByteBuffer keyByteBuffer, int keyOffset, int keyLen, byte[] value, boolean returnOldState) {
 	 * @return the old state. Null will be returned if key does not exist or returnOldState is false.
 	 */
 	private S removeNode(ByteBuffer keyByteBuffer, int keyOffset, int keyLen, boolean returnOldState) {
+		Tuple4<Long, Long, Boolean, S> result = iterateAndProcess(keyByteBuffer, keyOffset, keyLen,
+			(tuple3, isRemoved) -> {
 
 Review comment:
   I would suggest factoring this lambda out into a separate method.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-10-11 Thread GitBox
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r334042929
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMapTest.java
 ##
 @@ -115,577 +144,783 @@ public void testInitStateMap() {
 		assertFalse(stateMap.getStateIncrementalVisitor(100).hasNext());
 
 		stateMap.close();
-		assertEquals(0, stateMap.size());
-		assertEquals(0, stateMap.totalSize());
-		assertTrue(stateMap.isClosed());
 	}
 
 	/**
-	 * Test basic operations.
+	 * Test state put operation.
 	 */
 	@Test
-	public void testBasicOperations() throws Exception {
-		TypeSerializer<Integer> keySerializer = IntSerializer.INSTANCE;
-		TypeSerializer<Long> namespaceSerializer = LongSerializer.INSTANCE;
-		TypeSerializer<String> stateSerializer = StringSerializer.INSTANCE;
-		CopyOnWriteSkipListStateMap<Integer, Long, String> stateMap = new CopyOnWriteSkipListStateMap<>(
-			keySerializer, namespaceSerializer, stateSerializer, spaceAllocator);
+	public void testPutState() {
+		testWithFunction((totalSize, stateMap, referenceStates) -> getDefaultSizes(totalSize));
+	}
 
-		ThreadLocalRandom random = ThreadLocalRandom.current();
-		// map to store expected states, namespace -> key -> state
-		Map<Long, Map<Integer, String>> referenceStates = new HashMap<>();
-		int totalSize = 0;
+	/**
+	 * Test remove existing state.
+	 */
+	@Test
+	public void testRemoveExistingState() {
+		testRemoveState(false, false);
+	}
 
-		// put some states
-		for (long namespace = 0; namespace < 10; namespace++) {
-			for (int key = 0; key < 100; key++) {
-				totalSize++;
-				String state = String.valueOf(key * namespace);
-				if (random.nextBoolean()) {
-					stateMap.put(key, namespace, state);
-				} else {
-					assertNull(stateMap.putAndGetOld(key, namespace, state));
+	/**
+	 * Test remove and get existing state.
+	 */
+	@Test
+	public void testRemoveAndGetExistingState() {
+		testRemoveState(false, true);
+	}
+
+	/**
+	 * Test remove absent state.
+	 */
+	@Test
+	public void testRemoveAbsentState() {
+		testRemoveState(true, true);
+	}
+
+	/**
+	 * Test put previously removed state.
+	 */
+	@Test
+	public void testPutPreviouslyRemovedState() {
+		testWithFunction(
+			(totalSize, stateMap, referenceStates) -> applyFunctionAfterRemove(stateMap, referenceStates,
+				(removedCnt, removedStates) -> {
+					int size = totalSize - removedCnt;
+					for (Map.Entry<Long, Set<Integer>> entry : removedStates.entrySet()) {
+						long namespace = entry.getKey();
+						for (int key : entry.getValue()) {
+							size++;
+							String state = String.valueOf(key * namespace);
+							assertNull(stateMap.putAndGetOld(key, namespace, state));
+							referenceStates.computeIfAbsent(namespace, (none) -> new HashMap<>()).put(key, String.valueOf(state));
+						}
+					}
+					return getDefaultSizes(size);
 				}
-				referenceStates.computeIfAbsent(namespace, (none) -> new HashMap<>()).put(key, state);
-				assertEquals(totalSize, stateMap.size());
-				assertEquals(totalSize, stateMap.totalSize());
-			}
-		}
+			)
+		);
+	}
 
-		// validates space allocation. Each pair need 2 spaces
-		assertEquals(totalSize * 2, spaceAllocator.getTotalSpaceNumber());
-		verifyState(referenceStates, stateMap);
+	private void testRemoveState(boolean removeAbsent, boolean getOld) {
+		testWithFunction(
+			(totalSize, stateMap, referenceStates) -> {
+				if (removeAbsent) {
+ 

[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-10-11 Thread GitBox
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r334025401
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java
 ##
 @@ -590,10 +542,9 @@ private int compareByteBufferAndNode(ByteBuffer keyByteBuffer, int keyOffset, int keyLen, long targetNode) {
 	 * equal to, or greater than the second.
 	 */
 	private int compareNamespaceAndNode(ByteBuffer namespaceByteBuffer, int namespaceOffset, int namespaceLen, long targetNode) {
-		Chunk chunk = spaceAllocator.getChunkById(SpaceUtils.getChunkIdByAddress(targetNode));
-		int offsetInChunk = SpaceUtils.getChunkOffsetByAddress(targetNode);
-		ByteBuffer targetKeyByteBuffer = chunk.getByteBuffer(offsetInChunk);
-		int offsetInByteBuffer = chunk.getOffsetInByteBuffer(offsetInChunk);
+		Tuple2<ByteBuffer, Integer> tuple2 = getNodeByteBufferAndOffset(targetNode);
+		ByteBuffer targetKeyByteBuffer = tuple2.f0;
+		int offsetInByteBuffer = tuple2.f1;
 
 Review comment:
   Hmm, it looks a bit strange that we introduce `getNodeByteBufferAndOffset`, which returns a `ByteBuffer` and its offset together, just to unwrap the pair here and then pass the parts individually into `SkipListUtils.getLevel`. The same applies to the call into `getValuePointer`.
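   
   One hedged sketch of an alternative, built on an invented `NodeAddress` class (none of these names are from the PR): keep buffer and offset together and unwrap them exactly once, inside the carrier, so call sites never touch f0/f1.
   
   import java.nio.ByteBuffer;
   import java.util.function.BiFunction;
   
   final class NodeAddress {
       private final ByteBuffer buffer;
       private final int offset;
   
       NodeAddress(ByteBuffer buffer, int offset) {
           this.buffer = buffer;
           this.offset = offset;
       }
   
       // The single place where the pair is unwrapped. A call site would read
       // something like: int level = address.read(SkipListUtils::getLevel);
       // assuming the utility takes a (ByteBuffer, int) pair.
       <R> R read(BiFunction<ByteBuffer, Integer, R> reader) {
           return reader.apply(buffer, offset);
       }
   }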


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

