[GitHub] [flink] flinkbot commented on issue #9738: [FLINK-13746]Whitelist [Terror] to avoid end to end test failure in e…

2019-09-20 Thread GitBox
flinkbot commented on issue #9738: [FLINK-13746]Whitelist [Terror] to avoid end 
to end test failure in e…
URL: https://github.com/apache/flink/pull/9738#issuecomment-533770354
 
 
   
   ## CI report:
   
   * b13b6edb9952c74ba5b68d0f64bd6393534c73ea : UNKNOWN
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-13746) Elasticsearch (v2.3.5) sink end-to-end test fails on Travis

2019-09-20 Thread Zijie Lu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13746?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16934921#comment-16934921
 ] 

Zijie Lu commented on FLINK-13746:
--

I have submitted a pull request [https://github.com/apache/flink/pull/9738] . 
[~trohrmann]

 

> Elasticsearch (v2.3.5) sink end-to-end test fails on Travis
> ---
>
> Key: FLINK-13746
> URL: https://issues.apache.org/jira/browse/FLINK-13746
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / ElasticSearch, Tests
>Affects Versions: 1.9.0, 1.10.0
>Reporter: Till Rohrmann
>Assignee: Zijie Lu
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.10.0, 1.9.1
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> The {{Elasticsearch (v2.3.5) sink end-to-end test}} fails on Travis because 
> it logs contain the following line:
> {code}
> INFO  org.elasticsearch.plugins - [Terror] modules [], plugins [], sites []
> {code}
> Due to this, the error check is triggered.
> https://api.travis-ci.org/v3/job/572255901/log.txt



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot commented on issue #9738: [FLINK-13746]Whitelist [Terror] to avoid end to end test failure in e…

2019-09-20 Thread GitBox
flinkbot commented on issue #9738: [FLINK-13746]Whitelist [Terror] to avoid end 
to end test failure in e…
URL: https://github.com/apache/flink/pull/9738#issuecomment-533770121
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit b13b6edb9952c74ba5b68d0f64bd6393534c73ea (Sat Sep 21 
05:38:12 UTC 2019)
   
   **Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up 
to date!
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer of PMC member is required Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] TszKitLo40 opened a new pull request #9738: [FLINK-13746]Whitelist [Terror] to avoid end to end test failure in e…

2019-09-20 Thread GitBox
TszKitLo40 opened a new pull request #9738: [FLINK-13746]Whitelist [Terror] to 
avoid end to end test failure in e…
URL: https://github.com/apache/flink/pull/9738
 
 
   …s 2.3.5
   
   
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluser with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-13746) Elasticsearch (v2.3.5) sink end-to-end test fails on Travis

2019-09-20 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13746?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-13746:
---
Labels: pull-request-available test-stability  (was: test-stability)

> Elasticsearch (v2.3.5) sink end-to-end test fails on Travis
> ---
>
> Key: FLINK-13746
> URL: https://issues.apache.org/jira/browse/FLINK-13746
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / ElasticSearch, Tests
>Affects Versions: 1.9.0, 1.10.0
>Reporter: Till Rohrmann
>Assignee: Zijie Lu
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.10.0, 1.9.1
>
>
> The {{Elasticsearch (v2.3.5) sink end-to-end test}} fails on Travis because 
> it logs contain the following line:
> {code}
> INFO  org.elasticsearch.plugins - [Terror] modules [], plugins [], sites []
> {code}
> Due to this, the error check is triggered.
> https://api.travis-ci.org/v3/job/572255901/log.txt



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] lirui-apache commented on a change in pull request #9721: [FLINK-14129][hive] HiveTableSource should implement ProjectableTable…

2019-09-20 Thread GitBox
lirui-apache commented on a change in pull request #9721: [FLINK-14129][hive] 
HiveTableSource should implement ProjectableTable…
URL: https://github.com/apache/flink/pull/9721#discussion_r326846024
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableInputFormat.java
 ##
 @@ -203,30 +210,40 @@ protected void fetchNext() throws IOException {
}
 
@Override
-   public Row nextRecord(Row ignore) throws IOException {
+   public Row nextRecord(Row reuse) throws IOException {
if (reachedEnd()) {
return null;
}
-   Row row = new Row(rowArity);
try {
//Use HiveDeserializer to deserialize an object out of 
a Writable blob
Object hiveRowStruct = deserializer.deserialize(value);
-   int index = 0;
-   for (; index < structFields.size(); index++) {
-   StructField structField = 
structFields.get(index);
-   Object object = 
HiveInspectors.toFlinkObject(structField.getFieldObjectInspector(),
-   
structObjectInspector.getStructFieldData(hiveRowStruct, structField));
-   row.setField(index, object);
-   }
-   for (String partition : partitionColNames){
-   row.setField(index++, 
hiveTablePartition.getPartitionSpec().get(partition));
+   for (int i = 0; i < fields.length; i++) {
+   // set non-partition columns
+   if (fields[i] < structFields.size()) {
+   StructField structField = 
structFields.get(fields[i]);
+   Object object = 
HiveInspectors.toFlinkObject(structField.getFieldObjectInspector(),
+   
structObjectInspector.getStructFieldData(hiveRowStruct, structField));
+   reuse.setField(i, object);
+   }
}
-   } catch (Exception e){
+   } catch (Exception e) {
logger.error("Error happens when converting hive data 
type to flink data type.");
throw new FlinkHiveException(e);
}
+   if (!rowReused) {
 
 Review comment:
   > it's always triggered for the first row, but still have to be checked for 
all other remaining rows regardless?
   
   Yes. But it's still slightly better than populating each partition columns 
for a row.
   
   > maybe we can add a Row member variable to HiveTableInputFormet, and 
precompute and set partition values for it based on projection, and just reuse 
that row in nextRecord()?
   
   That's possible. My concern is we don't have guarantee that the fields of 
the row we return in `nextRecord` won't be manipulated by the framework or 
downstream operators. Although both solutions have this problem, the current 
implementation is a little bit more "compliant" with the interface contract -- 
that the Row parameter is the instance to be reused.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #9416: [FLINK-13646][build system] Add basic ARM CI job definition

2019-09-20 Thread GitBox
flinkbot edited a comment on issue #9416: [FLINK-13646][build system] Add basic 
ARM CI job definition
URL: https://github.com/apache/flink/pull/9416#issuecomment-520285701
 
 
   
   ## CI report:
   
   * dbf749f0d0c3167265287b182dd1ea7b4aed84c1 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/122738631)
   * 141896dbe8372442f95f7b6425b23a7cbb4cb624 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/122757944)
   * df40969ada1c232fb00e6aea33f9a1c88b07855c : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/124324441)
   * 39cc2828bc5b12844736bbfe41e1dcf4f2795c24 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/125522179)
   * d95749e9417977ba20fe7e1486cbeaa951cba1db : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/126026434)
   * 43a27e72238f9e06b4e9b89f2d366eba7cbd732f : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/128592640)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #9416: [FLINK-13646][build system] Add basic ARM CI job definition

2019-09-20 Thread GitBox
flinkbot edited a comment on issue #9416: [FLINK-13646][build system] Add basic 
ARM CI job definition
URL: https://github.com/apache/flink/pull/9416#issuecomment-520285701
 
 
   
   ## CI report:
   
   * dbf749f0d0c3167265287b182dd1ea7b4aed84c1 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/122738631)
   * 141896dbe8372442f95f7b6425b23a7cbb4cb624 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/122757944)
   * df40969ada1c232fb00e6aea33f9a1c88b07855c : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/124324441)
   * 39cc2828bc5b12844736bbfe41e1dcf4f2795c24 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/125522179)
   * d95749e9417977ba20fe7e1486cbeaa951cba1db : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/126026434)
   * 43a27e72238f9e06b4e9b89f2d366eba7cbd732f : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/128592640)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-14159) flink rocksdb StreamCompressionDecorator not right

2019-09-20 Thread jackylau (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jackylau updated FLINK-14159:
-
Description: 
I think the current flink rocksdb StreamCompressionDecorator is not right when 
calling method 

getCompressionDecorator(executionConfig) ,which defalut value is false.That is 
to say, current compression is none.But I find rocksdb  using 
{{options.compression}} to specify the compression to use. By default it is 
Snappy, which you can see here 
[https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide]. And I use 
rocksdb tool sstdump to find it is indeed snappy compression.

So I think it should be return SnappyStreamCompressionDecorator.INSTANCE  
rather than getCompressionDecorator( executionConfig) 

Coud i commit a PR?

  was:
I think the current flink rocksdb StreamCompressionDecorator is not right 
calling method 

getCompressionDecorator(executionConfig) which defalut value is false.That is 
to say, current compression is none.But I find rocksdb  using 
{{options.compression}} to specify the compression to use. By default it is 
Snappy, which you can see here 
[https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide]. And I use 
rocksdb tool sstdump to find it is indeed snappy compression.

So I think it should be return SnappyStreamCompressionDecorator.INSTANCE  
rather than getCompressionDecorator( executionConfig) 

Coud i commit a PR?


> flink rocksdb StreamCompressionDecorator not right
> --
>
> Key: FLINK-14159
> URL: https://issues.apache.org/jira/browse/FLINK-14159
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.9.0
>Reporter: jackylau
>Priority: Major
> Fix For: 1.10.0
>
>
> I think the current flink rocksdb StreamCompressionDecorator is not right 
> when calling method 
> getCompressionDecorator(executionConfig) ,which defalut value is false.That 
> is to say, current compression is none.But I find rocksdb  using 
> {{options.compression}} to specify the compression to use. By default it is 
> Snappy, which you can see here 
> [https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide]. And I use 
> rocksdb tool sstdump to find it is indeed snappy compression.
> So I think it should be return SnappyStreamCompressionDecorator.INSTANCE  
> rather than getCompressionDecorator( executionConfig) 
> Coud i commit a PR?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-14159) flink rocksdb StreamCompressionDecorator not right

2019-09-20 Thread jackylau (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16934884#comment-16934884
 ] 

jackylau edited comment on FLINK-14159 at 9/21/19 2:36 AM:
---

I want to konw who is responsible for , and who can see this issue 
[~lzljs3620320] [~jark]


was (Author: jackylau):
who is responsible for , and who can see this issue [~lzljs3620320]

> flink rocksdb StreamCompressionDecorator not right
> --
>
> Key: FLINK-14159
> URL: https://issues.apache.org/jira/browse/FLINK-14159
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.9.0
>Reporter: jackylau
>Priority: Major
> Fix For: 1.10.0
>
>
> I think the current flink rocksdb StreamCompressionDecorator is not right 
> calling method 
> getCompressionDecorator(executionConfig) which defalut value is false.That is 
> to say, current compression is none.But I find rocksdb  using 
> {{options.compression}} to specify the compression to use. By default it is 
> Snappy, which you can see here 
> [https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide]. And I use 
> rocksdb tool sstdump to find it is indeed snappy compression.
> So I think it should be return SnappyStreamCompressionDecorator.INSTANCE  
> rather than getCompressionDecorator( executionConfig) 
> Coud i commit a PR?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14159) flink rocksdb StreamCompressionDecorator not right

2019-09-20 Thread jackylau (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16934884#comment-16934884
 ] 

jackylau commented on FLINK-14159:
--

who is responsible for , and who can see this issue [~lzljs3620320]

> flink rocksdb StreamCompressionDecorator not right
> --
>
> Key: FLINK-14159
> URL: https://issues.apache.org/jira/browse/FLINK-14159
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.9.0
>Reporter: jackylau
>Priority: Major
> Fix For: 1.10.0
>
>
> I think the current flink rocksdb StreamCompressionDecorator is not right 
> calling method 
> getCompressionDecorator(executionConfig) which defalut value is false.That is 
> to say, current compression is none.But I find rocksdb  using 
> {{options.compression}} to specify the compression to use. By default it is 
> Snappy, which you can see here 
> [https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide]. And I use 
> rocksdb tool sstdump to find it is indeed snappy compression.
> So I think it should be return SnappyStreamCompressionDecorator.INSTANCE  
> rather than getCompressionDecorator( executionConfig) 
> Coud i commit a PR?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on issue #9416: [FLINK-13646][build system] Add basic ARM CI job definition

2019-09-20 Thread GitBox
flinkbot edited a comment on issue #9416: [FLINK-13646][build system] Add basic 
ARM CI job definition
URL: https://github.com/apache/flink/pull/9416#issuecomment-520285701
 
 
   
   ## CI report:
   
   * dbf749f0d0c3167265287b182dd1ea7b4aed84c1 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/122738631)
   * 141896dbe8372442f95f7b6425b23a7cbb4cb624 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/122757944)
   * df40969ada1c232fb00e6aea33f9a1c88b07855c : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/124324441)
   * 39cc2828bc5b12844736bbfe41e1dcf4f2795c24 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/125522179)
   * d95749e9417977ba20fe7e1486cbeaa951cba1db : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/126026434)
   * 43a27e72238f9e06b4e9b89f2d366eba7cbd732f : UNKNOWN
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-14159) flink rocksdb StreamCompressionDecorator not right

2019-09-20 Thread jackylau (Jira)
jackylau created FLINK-14159:


 Summary: flink rocksdb StreamCompressionDecorator not right
 Key: FLINK-14159
 URL: https://issues.apache.org/jira/browse/FLINK-14159
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends
Affects Versions: 1.9.0
Reporter: jackylau
 Fix For: 1.10.0


I think the current flink rocksdb StreamCompressionDecorator is not right 
calling method 

getCompressionDecorator(executionConfig) which defalut value is false.That is 
to say, current compression is none.But I find rocksdb  using 
{{options.compression}} to specify the compression to use. By default it is 
Snappy, which you can see here 
[https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide]. And I use 
rocksdb tool sstdump to find it is indeed snappy compression.

So I think it should be return SnappyStreamCompressionDecorator.INSTANCE  
rather than getCompressionDecorator( executionConfig) 

Coud i commit a PR?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] wangxiyuan commented on issue #9416: [FLINK-13646][build system] Add basic ARM CI job definition

2019-09-20 Thread GitBox
wangxiyuan commented on issue #9416: [FLINK-13646][build system] Add basic ARM 
CI job definition
URL: https://github.com/apache/flink/pull/9416#issuecomment-533758675
 
 
   check_arm


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #9737: [FLINK-14158] Update Mesos scheduler to make leaseOfferExpiration and declinedOfferRefuse duration configurable

2019-09-20 Thread GitBox
flinkbot edited a comment on issue #9737: [FLINK-14158] Update Mesos scheduler 
to make leaseOfferExpiration and declinedOfferRefuse duration configurable
URL: https://github.com/apache/flink/pull/9737#issuecomment-533692308
 
 
   
   ## CI report:
   
   * 6570149a99fac86e45cc3b0c9a83b32ff7b32365 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/128567899)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #9737: [FLINK-14158] Update Mesos scheduler to make leaseOfferExpiration and declinedOfferRefuse duration configurable

2019-09-20 Thread GitBox
flinkbot edited a comment on issue #9737: [FLINK-14158] Update Mesos scheduler 
to make leaseOfferExpiration and declinedOfferRefuse duration configurable
URL: https://github.com/apache/flink/pull/9737#issuecomment-533692308
 
 
   
   ## CI report:
   
   * 6570149a99fac86e45cc3b0c9a83b32ff7b32365 : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/128567899)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #9721: [FLINK-14129][hive] HiveTableSource should implement ProjectableTable…

2019-09-20 Thread GitBox
bowenli86 commented on a change in pull request #9721: [FLINK-14129][hive] 
HiveTableSource should implement ProjectableTable…
URL: https://github.com/apache/flink/pull/9721#discussion_r326790235
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableInputFormat.java
 ##
 @@ -203,30 +210,40 @@ protected void fetchNext() throws IOException {
}
 
@Override
-   public Row nextRecord(Row ignore) throws IOException {
+   public Row nextRecord(Row reuse) throws IOException {
if (reachedEnd()) {
return null;
}
-   Row row = new Row(rowArity);
try {
//Use HiveDeserializer to deserialize an object out of 
a Writable blob
Object hiveRowStruct = deserializer.deserialize(value);
-   int index = 0;
-   for (; index < structFields.size(); index++) {
-   StructField structField = 
structFields.get(index);
-   Object object = 
HiveInspectors.toFlinkObject(structField.getFieldObjectInspector(),
-   
structObjectInspector.getStructFieldData(hiveRowStruct, structField));
-   row.setField(index, object);
-   }
-   for (String partition : partitionColNames){
-   row.setField(index++, 
hiveTablePartition.getPartitionSpec().get(partition));
+   for (int i = 0; i < fields.length; i++) {
+   // set non-partition columns
+   if (fields[i] < structFields.size()) {
+   StructField structField = 
structFields.get(fields[i]);
+   Object object = 
HiveInspectors.toFlinkObject(structField.getFieldObjectInspector(),
+   
structObjectInspector.getStructFieldData(hiveRowStruct, structField));
+   reuse.setField(i, object);
+   }
}
-   } catch (Exception e){
+   } catch (Exception e) {
logger.error("Error happens when converting hive data 
type to flink data type.");
throw new FlinkHiveException(e);
}
+   if (!rowReused) {
 
 Review comment:
   It seems to be initialization logic to me, as it's always triggered for the 
first row, but still have to be checked for all other remaining rows regardless?
   
   maybe we can add a `Row` member variable to HiveTableInputFormet, and 
precompute and set partition values for it based on projection, and just reuse 
that row in `nextRecord()`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #9737: [FLINK-14158] Update Mesos scheduler to make leaseOfferExpiration and declinedOfferRefuse duration configurable

2019-09-20 Thread GitBox
flinkbot commented on issue #9737: [FLINK-14158] Update Mesos scheduler to make 
leaseOfferExpiration and declinedOfferRefuse duration configurable
URL: https://github.com/apache/flink/pull/9737#issuecomment-533692308
 
 
   
   ## CI report:
   
   * 6570149a99fac86e45cc3b0c9a83b32ff7b32365 : UNKNOWN
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] kl0u commented on issue #9581: [FLINK-13864][streaming]: Modify the StreamingFileSink Builder interface to allow for easier subclassing of StreamingFileSink

2019-09-20 Thread GitBox
kl0u commented on issue #9581: [FLINK-13864][streaming]: Modify the 
StreamingFileSink Builder interface to allow for easier subclassing of 
StreamingFileSink 
URL: https://github.com/apache/flink/pull/9581#issuecomment-533688638
 
 
   @tweise It would be nice if it can be addressed before merging. I was 
thinking that I will be the one to merge this PR so I was planning to do it 
then but if you are willing to do it, it would be much appreciated if you could 
add the two separate constructors (one for row and one for bulk) before merging.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #9737: [FLINK-14158] Update Mesos scheduler to make leaseOfferExpiration and declinedOfferRefuse duration configurable

2019-09-20 Thread GitBox
flinkbot commented on issue #9737: [FLINK-14158] Update Mesos scheduler to make 
leaseOfferExpiration and declinedOfferRefuse duration configurable
URL: https://github.com/apache/flink/pull/9737#issuecomment-533687706
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit 6570149a99fac86e45cc3b0c9a83b32ff7b32365 (Fri Sep 20 
19:51:24 UTC 2019)
   
   **Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up 
to date!
* **This pull request references an unassigned [Jira 
ticket](https://issues.apache.org/jira/browse/FLINK-14158).** According to the 
[code contribution 
guide](https://flink.apache.org/contributing/contribute-code.html), tickets 
need to be assigned before starting with the implementation work.
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer of PMC member is required Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-14158) Update Mesos configs to add leaseOfferExpiration and declinedOfferRefuse durations

2019-09-20 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14158?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-14158:
---
Labels: pull-request-available  (was: )

> Update Mesos configs to add leaseOfferExpiration and declinedOfferRefuse 
> durations
> --
>
> Key: FLINK-14158
> URL: https://issues.apache.org/jira/browse/FLINK-14158
> Project: Flink
>  Issue Type: Bug
>Reporter: Piyush Narang
>Priority: Minor
>  Labels: pull-request-available
>
> While debugging some Flink on Mesos scheduling issues (tied to our use of 
> Mesos quotas) we end up getting skewed offers that are useless fairly often. 
> As we are not rejecting these offers fast enough and as we are not telling 
> Mesos to not re-send for a long enough period, we end up not being able to 
> schedule our job for upwards of an hour (~30 Mesos containers). 
> The Fenzo default is to reject expired and unused Mesos offers after 120s, 
> this can be overridden using their TaskScheduler builder. Additionally, Mesos 
> allows us to override the time for which it won't re-send offers (default is 
> 5s). We found that updating to reject more aggressively (every 1s instead of 
> 120s) and keeping rejected offers away for longer (60s instead of 5s) 
> dramatically increases our chances of scheduling our jobs on Mesos. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] piyushnarang opened a new pull request #9737: [FLINK-14158] Update Mesos scheduler to make leaseOfferExpiration and declinedOfferRefuse duration configurable

2019-09-20 Thread GitBox
piyushnarang opened a new pull request #9737: [FLINK-14158] Update Mesos 
scheduler to make leaseOfferExpiration and declinedOfferRefuse duration 
configurable
URL: https://github.com/apache/flink/pull/9737
 
 
   
   
   ## What is the purpose of the change
   
   While debugging some Flink on Mesos scheduling issues (tied to our use of 
Mesos quotas) we end up getting skewed offers that are useless fairly often. As 
we are not rejecting these offers fast enough and as we are not telling Mesos 
to not re-send for a long enough period, we end up not being able to schedule 
our job for upwards of an hour (~30 Mesos containers).
   
   The Fenzo default is to reject expired and unused Mesos offers after 120s, 
this can be overridden using their TaskScheduler builder. Additionally, Mesos 
allows us to override the time for which it won't re-send offers (default is 
5s). We found that updating to reject more aggressively (every 1s instead of 
120s) and keeping rejected offers away for longer (60s instead of 5s) 
dramatically increases our chances of scheduling our jobs on Mesos.
   
   (As the defaults, I've left the current values that Flink on Mesos users 
have as of today to not result in any surprises when they upgrade / pull in 
this change. )
   
   This is a follow-up item that was discussed in 
https://github.com/apache/flink/pull/9652
   
   
   ## Brief change log
   
 - Add config options for unused offer expiry time and declined offer 
refuse duration
 - Plumb change through to Fenzo TaskScheduler and Mesos driver.
   
   ## Verifying this change
   
   
   Existing unit tests + manually tested config options to verify that they 
work as expected. 
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): No
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: No
 - The serializers: No
 - The runtime per-record code paths (performance sensitive): No
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: Yes (Mesos)
 - The S3 file system connector: No
   
   ## Documentation
   
 - Does this pull request introduce a new feature? No just config options
 - If yes, how is the feature documented? Documented config options
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-14158) Update Mesos configs to add leaseOfferExpiration and declinedOfferRefuse durations

2019-09-20 Thread Piyush Narang (Jira)
Piyush Narang created FLINK-14158:
-

 Summary: Update Mesos configs to add leaseOfferExpiration and 
declinedOfferRefuse durations
 Key: FLINK-14158
 URL: https://issues.apache.org/jira/browse/FLINK-14158
 Project: Flink
  Issue Type: Bug
Reporter: Piyush Narang


While debugging some Flink on Mesos scheduling issues (tied to our use of Mesos 
quotas) we end up getting skewed offers that are useless fairly often. As we 
are not rejecting these offers fast enough and as we are not telling Mesos to 
not re-send for a long enough period, we end up not being able to schedule our 
job for upwards of an hour (~30 Mesos containers). 

The Fenzo default is to reject expired and unused Mesos offers after 120s, this 
can be overridden using their TaskScheduler builder. Additionally, Mesos allows 
us to override the time for which it won't re-send offers (default is 5s). We 
found that updating to reject more aggressively (every 1s instead of 120s) and 
keeping rejected offers away for longer (60s instead of 5s) dramatically 
increases our chances of scheduling our jobs on Mesos. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] tweise commented on issue #9581: [FLINK-13864][streaming]: Modify the StreamingFileSink Builder interface to allow for easier subclassing of StreamingFileSink

2019-09-20 Thread GitBox
tweise commented on issue #9581: [FLINK-13864][streaming]: Modify the 
StreamingFileSink Builder interface to allow for easier subclassing of 
StreamingFileSink 
URL: https://github.com/apache/flink/pull/9581#issuecomment-533679202
 
 
   @kl0u thanks for working through this. Should the constructor issue be 
addressed before merging or you prefer to take it up separately?
   
   @yxu-valleytider please rebase your branch and squash.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yxu-valleytider removed a comment on issue #9581: [FLINK-13864][streaming]: Modify the StreamingFileSink Builder interface to allow for easier subclassing of StreamingFileSink

2019-09-20 Thread GitBox
yxu-valleytider removed a comment on issue #9581: [FLINK-13864][streaming]: 
Modify the StreamingFileSink Builder interface to allow for easier subclassing 
of StreamingFileSink 
URL: https://github.com/apache/flink/pull/9581#issuecomment-533666729
 
 
    
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yxu-valleytider commented on issue #9581: [FLINK-13864][streaming]: Modify the StreamingFileSink Builder interface to allow for easier subclassing of StreamingFileSink

2019-09-20 Thread GitBox
yxu-valleytider commented on issue #9581: [FLINK-13864][streaming]: Modify the 
StreamingFileSink Builder interface to allow for easier subclassing of 
StreamingFileSink 
URL: https://github.com/apache/flink/pull/9581#issuecomment-533666729
 
 
    
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yxu-valleytider commented on issue #9581: [FLINK-13864][streaming]: Modify the StreamingFileSink Builder interface to allow for easier subclassing of StreamingFileSink

2019-09-20 Thread GitBox
yxu-valleytider commented on issue #9581: [FLINK-13864][streaming]: Modify the 
StreamingFileSink Builder interface to allow for easier subclassing of 
StreamingFileSink 
URL: https://github.com/apache/flink/pull/9581#issuecomment-533666702
 
 
   > @yxu-valleytider I think we can merge this PR as is for now and re-iterate 
within this release cycle some of the details that need to be ironed out.
   
   Thanks @kl0u  for the review, much appreciated!   Yes definitely open to 
further improvements/consolidations around the `StreamingFileSink` interface. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #9726: [FLINK-14113][client] Remove class JobWithJars

2019-09-20 Thread GitBox
flinkbot edited a comment on issue #9726: [FLINK-14113][client] Remove class 
JobWithJars
URL: https://github.com/apache/flink/pull/9726#issuecomment-533472035
 
 
   
   ## CI report:
   
   * 9d72829c8c3697d2e2582005ba60863d525ffc66 : UNKNOWN
   * 49a801cce0eb397c0248250d986ce99ade485941 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/128479586)
   * a9ac0bfe3bb8c8d48988b8aa68ee085e73e61982 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/128539473)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools

2019-09-20 Thread GitBox
flinkbot edited a comment on issue #9693: [FLINK-13984] Separate on-heap and 
off-heap managed memory pools
URL: https://github.com/apache/flink/pull/9693#issuecomment-531808855
 
 
   
   ## CI report:
   
   * 62ad65e31df8f36747991513ab89504126758af6 : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/127816671)
   * 95e4a9cff8a946937939e96c75e96d6489ac2d77 : UNKNOWN
   * 7d455542c48bea126f644dbb96dc7fb298253f7a : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/127820213)
   * 48ee1c457622faaa7e644bf4f4fc4b6a22e306a5 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/128536424)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #9468: [FLINK-13689] [Connectors/ElasticSearch] Fix thread leak when elasticsearch6 rest high level cli…

2019-09-20 Thread GitBox
flinkbot edited a comment on issue #9468: [FLINK-13689] 
[Connectors/ElasticSearch] Fix thread leak when elasticsearch6 rest high level 
cli…
URL: https://github.com/apache/flink/pull/9468#issuecomment-522106295
 
 
   
   ## CI report:
   
   * 78654e8f80aa0b1a01654e7684f4610eb1d3aff4 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/123548600)
   * f4a0f59543372d85e7fd1dc5156ba6f165737a03 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/128536515)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] aljoscha commented on a change in pull request #9726: [FLINK-14113][client] Remove class JobWithJars

2019-09-20 Thread GitBox
aljoscha commented on a change in pull request #9726: [FLINK-14113][client] 
Remove class JobWithJars
URL: https://github.com/apache/flink/pull/9726#discussion_r326714019
 
 

 ##
 File path: flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
 ##
 @@ -124,6 +47,7 @@ public static void checkJarFile(URL jar) throws IOException 
{
throw new IOException("JAR file can't be read '" + 
jarFile.getAbsolutePath() + '\'');
}
 
+   //noinspection EmptyTryBlock
 
 Review comment:
   Yes, having a discussion about this is a good point.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #9726: [FLINK-14113][client] Remove class JobWithJars

2019-09-20 Thread GitBox
flinkbot edited a comment on issue #9726: [FLINK-14113][client] Remove class 
JobWithJars
URL: https://github.com/apache/flink/pull/9726#issuecomment-533472035
 
 
   
   ## CI report:
   
   * 9d72829c8c3697d2e2582005ba60863d525ffc66 : UNKNOWN
   * 49a801cce0eb397c0248250d986ce99ade485941 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/128479586)
   * a9ac0bfe3bb8c8d48988b8aa68ee085e73e61982 : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/128539473)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #9726: [FLINK-14113][client] Remove class JobWithJars

2019-09-20 Thread GitBox
flinkbot edited a comment on issue #9726: [FLINK-14113][client] Remove class 
JobWithJars
URL: https://github.com/apache/flink/pull/9726#issuecomment-533472035
 
 
   
   ## CI report:
   
   * 9d72829c8c3697d2e2582005ba60863d525ffc66 : UNKNOWN
   * 49a801cce0eb397c0248250d986ce99ade485941 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/128479586)
   * a9ac0bfe3bb8c8d48988b8aa68ee085e73e61982 : UNKNOWN
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #9468: [FLINK-13689] [Connectors/ElasticSearch] Fix thread leak when elasticsearch6 rest high level cli…

2019-09-20 Thread GitBox
flinkbot edited a comment on issue #9468: [FLINK-13689] 
[Connectors/ElasticSearch] Fix thread leak when elasticsearch6 rest high level 
cli…
URL: https://github.com/apache/flink/pull/9468#issuecomment-522106295
 
 
   
   ## CI report:
   
   * 78654e8f80aa0b1a01654e7684f4610eb1d3aff4 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/123548600)
   * f4a0f59543372d85e7fd1dc5156ba6f165737a03 : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/128536515)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools

2019-09-20 Thread GitBox
flinkbot edited a comment on issue #9693: [FLINK-13984] Separate on-heap and 
off-heap managed memory pools
URL: https://github.com/apache/flink/pull/9693#issuecomment-531808855
 
 
   
   ## CI report:
   
   * 62ad65e31df8f36747991513ab89504126758af6 : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/127816671)
   * 95e4a9cff8a946937939e96c75e96d6489ac2d77 : UNKNOWN
   * 7d455542c48bea126f644dbb96dc7fb298253f7a : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/127820213)
   * 48ee1c457622faaa7e644bf4f4fc4b6a22e306a5 : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/128536424)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] TisonKun commented on a change in pull request #9727: [FLINK-14145] Fix getLatestCheckpoint(true) returns wrong checkpoint

2019-09-20 Thread GitBox
TisonKun commented on a change in pull request #9727: [FLINK-14145] Fix 
getLatestCheckpoint(true) returns wrong checkpoint
URL: https://github.com/apache/flink/pull/9727#discussion_r326695695
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
 ##
 @@ -55,31 +54,26 @@
 * added.
 */
default CompletedCheckpoint getLatestCheckpoint(boolean 
isPreferCheckpointForRecovery) throws Exception {
-   if (getAllCheckpoints().isEmpty()) {
+   List allCheckpoints = getAllCheckpoints();
+   if (allCheckpoints.isEmpty()) {
return null;
}
 
-   CompletedCheckpoint candidate = 
getAllCheckpoints().get(getAllCheckpoints().size() - 1);
-   if (isPreferCheckpointForRecovery && getAllCheckpoints().size() 
> 1) {
-   List allCheckpoints;
-   try {
-   allCheckpoints = getAllCheckpoints();
-   ListIterator listIterator 
= allCheckpoints.listIterator(allCheckpoints.size() - 1);
-   while (listIterator.hasPrevious()) {
-   CompletedCheckpoint prev = 
listIterator.previous();
-   if 
(!prev.getProperties().isSavepoint()) {
-   candidate = prev;
-   LOG.info("Found a completed 
checkpoint before the latest savepoint, will use it to recover!");
-   break;
-   }
+   CompletedCheckpoint lastCompleted = 
allCheckpoints.get(allCheckpoints.size() - 1);
+
+   if (lastCompleted.getProperties().isSavepoint() && 
isPreferCheckpointForRecovery && allCheckpoints.size() > 1) {
+   ListIterator listIterator = 
allCheckpoints.listIterator(allCheckpoints.size() - 1);
 
 Review comment:
   @gyfora Yes. My last concern is also about logging. In your version given 
the last completed is a checkpoint we actually log nothing :P


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #9468: [FLINK-13689] [Connectors/ElasticSearch] Fix thread leak when elasticsearch6 rest high level cli…

2019-09-20 Thread GitBox
flinkbot edited a comment on issue #9468: [FLINK-13689] 
[Connectors/ElasticSearch] Fix thread leak when elasticsearch6 rest high level 
cli…
URL: https://github.com/apache/flink/pull/9468#issuecomment-522106295
 
 
   
   ## CI report:
   
   * 78654e8f80aa0b1a01654e7684f4610eb1d3aff4 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/123548600)
   * f4a0f59543372d85e7fd1dc5156ba6f165737a03 : UNKNOWN
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools

2019-09-20 Thread GitBox
flinkbot edited a comment on issue #9693: [FLINK-13984] Separate on-heap and 
off-heap managed memory pools
URL: https://github.com/apache/flink/pull/9693#issuecomment-531808855
 
 
   
   ## CI report:
   
   * 62ad65e31df8f36747991513ab89504126758af6 : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/127816671)
   * 95e4a9cff8a946937939e96c75e96d6489ac2d77 : UNKNOWN
   * 7d455542c48bea126f644dbb96dc7fb298253f7a : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/127820213)
   * 48ee1c457622faaa7e644bf4f4fc4b6a22e306a5 : UNKNOWN
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] TisonKun commented on a change in pull request #9726: [FLINK-14113][client] Remove class JobWithJars

2019-09-20 Thread GitBox
TisonKun commented on a change in pull request #9726: [FLINK-14113][client] 
Remove class JobWithJars
URL: https://github.com/apache/flink/pull/9726#discussion_r326691424
 
 

 ##
 File path: flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
 ##
 @@ -124,6 +47,7 @@ public static void checkJarFile(URL jar) throws IOException 
{
throw new IOException("JAR file can't be read '" + 
jarFile.getAbsolutePath() + '\'');
}
 
+   //noinspection EmptyTryBlock
 
 Review comment:
   Thanks for your information. I think it is not a warning for java compiler. 
Besides, we have over 100 usages of `//noinspection`. I'm ok to revert this 
line fair enough but it might be worth a community discussion :-)


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] TisonKun commented on a change in pull request #9726: [FLINK-14113][client] Remove class JobWithJars

2019-09-20 Thread GitBox
TisonKun commented on a change in pull request #9726: [FLINK-14113][client] 
Remove class JobWithJars
URL: https://github.com/apache/flink/pull/9726#discussion_r326691424
 
 

 ##
 File path: flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
 ##
 @@ -124,6 +47,7 @@ public static void checkJarFile(URL jar) throws IOException 
{
throw new IOException("JAR file can't be read '" + 
jarFile.getAbsolutePath() + '\'');
}
 
+   //noinspection EmptyTryBlock
 
 Review comment:
   Thanks for your information. I think it is not a warning for java compiler. 
Besides, we over 100 usages of `//noinspection`. I'm ok to revert this line 
fair enough but it might be worth a community discussion :-)


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] ex00 commented on a change in pull request #9184: [FLINK-13339][ml] Add an implementation of pipeline's api

2019-09-20 Thread GitBox
ex00 commented on a change in pull request #9184: [FLINK-13339][ml] Add an 
implementation of pipeline's api
URL: https://github.com/apache/flink/pull/9184#discussion_r326684479
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/AlgoOperator.java
 ##
 @@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.common;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.ml.api.misc.param.Params;
+import org.apache.flink.ml.api.misc.param.WithParams;
+import org.apache.flink.ml.batchoperator.source.TableSourceBatchOp;
+import org.apache.flink.ml.params.shared.HasMLEnvironmentId;
+import org.apache.flink.ml.streamoperator.source.TableSourceStreamOp;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+
+import java.io.Serializable;
+
+/**
+ * Base class for algorithm operators.
+ *
+ * Hold a Table as its output.
+ *
+ * @param  The class type of the {@link AlgoOperator} implementation itself
+ */
+public abstract class AlgoOperator>
+   implements WithParams, HasMLEnvironmentId, Serializable {
+
+   /**
+* Params for algorithms.
+*/
+   private Params params;
+
+   /**
+* The table held by operator.
+*/
+   private Table output = null;
+
+   /**
+* The side outputs of operator that be similar to the stream's side 
outputs.
+*/
+   private Table[] sideOutputs = null;
+
+   /**
+* Construct the operator with empty Params.
+*/
+   protected AlgoOperator() {
+   this(null);
+   }
+
+   /**
+* Construct the operator with the initial Params.
+*/
+   protected AlgoOperator(Params params) {
+   if (null == params) {
+   this.params = new Params();
+   } else {
+   this.params = params.clone();
+   }
+   }
+
+   @Override
+   public Params getParams() {
+   if (null == this.params) {
+   this.params = new Params();
 
 Review comment:
   duplicate logic from constructor. 
   If we cant set `params` as null we don't need check all time it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] ex00 commented on a change in pull request #9184: [FLINK-13339][ml] Add an implementation of pipeline's api

2019-09-20 Thread GitBox
ex00 commented on a change in pull request #9184: [FLINK-13339][ml] Add an 
implementation of pipeline's api
URL: https://github.com/apache/flink/pull/9184#discussion_r326669328
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/AlgoOperator.java
 ##
 @@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.common;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.ml.api.misc.param.Params;
+import org.apache.flink.ml.api.misc.param.WithParams;
+import org.apache.flink.ml.batchoperator.source.TableSourceBatchOp;
+import org.apache.flink.ml.params.shared.HasMLEnvironmentId;
+import org.apache.flink.ml.streamoperator.source.TableSourceStreamOp;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+
+import java.io.Serializable;
+
+/**
+ * Base class for algorithm operators.
+ *
+ * Hold a Table as its output.
+ *
+ * @param  The class type of the {@link AlgoOperator} implementation itself
+ */
+public abstract class AlgoOperator>
+   implements WithParams, HasMLEnvironmentId, Serializable {
+
+   /**
+* Params for algorithms.
+*/
+   private Params params;
+
+   /**
+* The table held by operator.
+*/
+   private Table output = null;
+
+   /**
+* The side outputs of operator that be similar to the stream's side 
outputs.
+*/
+   private Table[] sideOutputs = null;
+
+   /**
+* Construct the operator with empty Params.
+*/
+   protected AlgoOperator() {
 
 Review comment:
   probably need only one default constructor with not nullable `Table` 
argument and  nullable `Params` 
   
   big part of methods in this class depends on `output` field
   ```
   public AlgoOperator(Params params, Table table) {
if (null == table) {
throw new IllegalArgumentException('Table shoud be non 
null');
}
this.output = table;
   
   if (null == params) {
this.params = new Params();
} else {
this.params = params.clone();
}
}
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] ex00 commented on a change in pull request #9184: [FLINK-13339][ml] Add an implementation of pipeline's api

2019-09-20 Thread GitBox
ex00 commented on a change in pull request #9184: [FLINK-13339][ml] Add an 
implementation of pipeline's api
URL: https://github.com/apache/flink/pull/9184#discussion_r32594
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/AlgoOperator.java
 ##
 @@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.common;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.ml.api.misc.param.Params;
+import org.apache.flink.ml.api.misc.param.WithParams;
+import org.apache.flink.ml.batchoperator.source.TableSourceBatchOp;
+import org.apache.flink.ml.params.shared.HasMLEnvironmentId;
+import org.apache.flink.ml.streamoperator.source.TableSourceStreamOp;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+
+import java.io.Serializable;
+
+/**
+ * Base class for algorithm operators.
+ *
+ * Hold a Table as its output.
+ *
+ * @param  The class type of the {@link AlgoOperator} implementation itself
+ */
+public abstract class AlgoOperator>
+   implements WithParams, HasMLEnvironmentId, Serializable {
+
+   /**
+* Params for algorithms.
+*/
+   private Params params;
+
+   /**
+* The table held by operator.
+*/
+   private Table output = null;
+
+   /**
+* The side outputs of operator that be similar to the stream's side 
outputs.
+*/
+   private Table[] sideOutputs = null;
+
+   /**
+* Construct the operator with empty Params.
+*/
+   protected AlgoOperator() {
+   this(null);
+   }
+
+   /**
+* Construct the operator with the initial Params.
+*/
+   protected AlgoOperator(Params params) {
+   if (null == params) {
+   this.params = new Params();
+   } else {
+   this.params = params.clone();
+   }
+   }
+
+   @Override
+   public Params getParams() {
+   if (null == this.params) {
+   this.params = new Params();
+   }
+   return this.params;
+   }
+
+   /**
+* Returns the table held by operator.
+*/
+   public Table getOutput() {
+   return this.output;
+   }
+
+   /**
+* Returns the side outputs.
+*/
+   public Table[] getSideOutputs() {
+   return this.sideOutputs;
+   }
+
+   /**
+* Set the side outputs.
+*
+* @param sideOutputs the side outputs set the operator.
+*/
+   protected void setSideOutputs(Table[] sideOutputs) {
+   this.sideOutputs = sideOutputs;
+   }
+
+   /**
+* Set the table held by operator.
+*
+* @param output the output table.
+*/
+   protected void setOutput(Table output) {
+   this.output = output;
+   }
+
+   /**
+* Returns the column names of the output table.
+*/
+   public String[] getColNames() {
+   return getSchema().getFieldNames();
+   }
+
+   /**
+* Returns the column types of the output table.
+*/
+   public TypeInformation [] getColTypes() {
+   return getSchema().getFieldTypes();
+   }
+
+   /**
+* Returns the schema of the output table.
+*/
+   public TableSchema getSchema() {
+   return this.getOutput().getSchema();
+   }
+
+   @Override
+   public String toString() {
+   return getOutput().toString();
+   }
+
+   /**
+* create a new AlgoOperator from table.
+* @param table the input table
+* @return the new AlgoOperator
+*/
+   public static AlgoOperator sourceFrom(Table table) {
+   if (((TableImpl) table).getTableEnvironment() instanceof 
StreamTableEnvironment) {
+   return new TableSourceStreamOp(table);
 
 Review comment:
   the cyclic 

[GitHub] [flink] ex00 commented on a change in pull request #9184: [FLINK-13339][ml] Add an implementation of pipeline's api

2019-09-20 Thread GitBox
ex00 commented on a change in pull request #9184: [FLINK-13339][ml] Add an 
implementation of pipeline's api
URL: https://github.com/apache/flink/pull/9184#discussion_r326674127
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/batchoperator/source/TableSourceBatchOp.java
 ##
 @@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.batchoperator.source;
+
+import org.apache.flink.ml.batchoperator.BatchOperator;
+import org.apache.flink.table.api.Table;
+
+/**
+ * Transform the Table to SourceBatchOp.
+ */
+public final class TableSourceBatchOp extends 
BatchOperator {
+
+   public TableSourceBatchOp(Table table) {
 
 Review comment:
   this logic could be moved to parent, it same as in `TableSourceStreamOp `


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] ex00 commented on a change in pull request #9184: [FLINK-13339][ml] Add an implementation of pipeline's api

2019-09-20 Thread GitBox
ex00 commented on a change in pull request #9184: [FLINK-13339][ml] Add an 
implementation of pipeline's api
URL: https://github.com/apache/flink/pull/9184#discussion_r326684479
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/AlgoOperator.java
 ##
 @@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.common;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.ml.api.misc.param.Params;
+import org.apache.flink.ml.api.misc.param.WithParams;
+import org.apache.flink.ml.batchoperator.source.TableSourceBatchOp;
+import org.apache.flink.ml.params.shared.HasMLEnvironmentId;
+import org.apache.flink.ml.streamoperator.source.TableSourceStreamOp;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+
+import java.io.Serializable;
+
+/**
+ * Base class for algorithm operators.
+ *
+ * Hold a Table as its output.
+ *
+ * @param  The class type of the {@link AlgoOperator} implementation itself
+ */
+public abstract class AlgoOperator>
+   implements WithParams, HasMLEnvironmentId, Serializable {
+
+   /**
+* Params for algorithms.
+*/
+   private Params params;
+
+   /**
+* The table held by operator.
+*/
+   private Table output = null;
+
+   /**
+* The side outputs of operator that be similar to the stream's side 
outputs.
+*/
+   private Table[] sideOutputs = null;
+
+   /**
+* Construct the operator with empty Params.
+*/
+   protected AlgoOperator() {
+   this(null);
+   }
+
+   /**
+* Construct the operator with the initial Params.
+*/
+   protected AlgoOperator(Params params) {
+   if (null == params) {
+   this.params = new Params();
+   } else {
+   this.params = params.clone();
+   }
+   }
+
+   @Override
+   public Params getParams() {
+   if (null == this.params) {
+   this.params = new Params();
 
 Review comment:
   duplicate logic from constructor. 
   If we cant set `params` as null we don't need check any time it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] ex00 commented on a change in pull request #9184: [FLINK-13339][ml] Add an implementation of pipeline's api

2019-09-20 Thread GitBox
ex00 commented on a change in pull request #9184: [FLINK-13339][ml] Add an 
implementation of pipeline's api
URL: https://github.com/apache/flink/pull/9184#discussion_r326669328
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/AlgoOperator.java
 ##
 @@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.common;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.ml.api.misc.param.Params;
+import org.apache.flink.ml.api.misc.param.WithParams;
+import org.apache.flink.ml.batchoperator.source.TableSourceBatchOp;
+import org.apache.flink.ml.params.shared.HasMLEnvironmentId;
+import org.apache.flink.ml.streamoperator.source.TableSourceStreamOp;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+
+import java.io.Serializable;
+
+/**
+ * Base class for algorithm operators.
+ *
+ * Hold a Table as its output.
+ *
+ * @param  The class type of the {@link AlgoOperator} implementation itself
+ */
+public abstract class AlgoOperator>
+   implements WithParams, HasMLEnvironmentId, Serializable {
+
+   /**
+* Params for algorithms.
+*/
+   private Params params;
+
+   /**
+* The table held by operator.
+*/
+   private Table output = null;
+
+   /**
+* The side outputs of operator that be similar to the stream's side 
outputs.
+*/
+   private Table[] sideOutputs = null;
+
+   /**
+* Construct the operator with empty Params.
+*/
+   protected AlgoOperator() {
 
 Review comment:
   probably need only one default constructor with not nullable `Table` 
argument  nullable `Params` 
   
   big part of methods in this class depends on `output` field
   ```
   public AlgoOperator(Params params, Table table) {
if (null == table) {
throw new IllegalArgumentException('Table shoud be non 
null');
}
this.output = table;
   
   if (null == params) {
this.params = new Params();
} else {
this.params = params.clone();
}
}
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #9727: [FLINK-14145] Fix getLatestCheckpoint(true) returns wrong checkpoint

2019-09-20 Thread GitBox
flinkbot edited a comment on issue #9727: [FLINK-14145] Fix 
getLatestCheckpoint(true) returns wrong checkpoint
URL: https://github.com/apache/flink/pull/9727#issuecomment-533484572
 
 
   
   ## CI report:
   
   * 31a78ba191c36561f073b0b5a0a2c1b6aa2d18a0 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/128484543)
   * 88ba2336842deb46124e5bd86ae0393ec0217e05 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/128524337)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] gyfora commented on issue #9727: [FLINK-14145] Fix getLatestCheckpoint(true) returns wrong checkpoint

2019-09-20 Thread GitBox
gyfora commented on issue #9727: [FLINK-14145] Fix getLatestCheckpoint(true) 
returns wrong checkpoint
URL: https://github.com/apache/flink/pull/9727#issuecomment-533599941
 
 
   > Thanks for your updates @gyfora. Tests part looks good to me. I will share 
my version of `getLatestCheckpoint`. You're the man decide which one to adopt.
   > 
   > ```java
   >default CompletedCheckpoint getLatestCheckpoint(boolean 
isPreferCheckpointForRecovery) throws Exception {
   >List allCheckpoints = getAllCheckpoints();
   > 
   >if (allCheckpoints.isEmpty()) {
   >return null;
   >}
   > 
   >Predicate isSavepoint = i -> 
allCheckpoints.get(i).getProperties().isSavepoint();
   > 
   >int i = allCheckpoints.size() - 1;
   >while (isPreferCheckpointForRecovery && i > 0 && 
isSavepoint.test(i)) {
   >i--;
   >}
   > 
   >CompletedCheckpoint checkpoint = allCheckpoints.get(i);
   > 
   >LOG.info(
   >"Use {} ({}) to recover.",
   >checkpoint.getProperties().isSavepoint() ? "savepoint" 
: "checkpoint",
   >checkpoint);
   > 
   >return checkpoint;
   >}
   > ```
   
   I don't think one is much better than the other but I feel that from a 
readability perspective my proposal is slightly better. It also includes some 
nice logging to know when we actually preferred a checkpoint or when we fell 
back to a savepoint anyways.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] azagrebin commented on issue #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools

2019-09-20 Thread GitBox
azagrebin commented on issue #9693: [FLINK-13984] Separate on-heap and off-heap 
managed memory pools
URL: https://github.com/apache/flink/pull/9693#issuecomment-533599524
 
 
   Thanks for the review @KarmaGYZ 
   I have updated the PR.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] aljoscha commented on a change in pull request #9726: [FLINK-14113][client] Remove class JobWithJars

2019-09-20 Thread GitBox
aljoscha commented on a change in pull request #9726: [FLINK-14113][client] 
Remove class JobWithJars
URL: https://github.com/apache/flink/pull/9726#discussion_r326609037
 
 

 ##
 File path: flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
 ##
 @@ -124,6 +47,7 @@ public static void checkJarFile(URL jar) throws IOException 
{
throw new IOException("JAR file can't be read '" + 
jarFile.getAbsolutePath() + '\'');
}
 
+   //noinspection EmptyTryBlock
 
 Review comment:
   `//noinspection` is only an IntelliJ concept, for warning suppression we 
should use `@SuppressWarnings`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] gyfora commented on a change in pull request #9727: [FLINK-14145] Fix getLatestCheckpoint(true) returns wrong checkpoint

2019-09-20 Thread GitBox
gyfora commented on a change in pull request #9727: [FLINK-14145] Fix 
getLatestCheckpoint(true) returns wrong checkpoint
URL: https://github.com/apache/flink/pull/9727#discussion_r326680093
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
 ##
 @@ -55,31 +54,26 @@
 * added.
 */
default CompletedCheckpoint getLatestCheckpoint(boolean 
isPreferCheckpointForRecovery) throws Exception {
-   if (getAllCheckpoints().isEmpty()) {
+   List allCheckpoints = getAllCheckpoints();
+   if (allCheckpoints.isEmpty()) {
return null;
}
 
-   CompletedCheckpoint candidate = 
getAllCheckpoints().get(getAllCheckpoints().size() - 1);
-   if (isPreferCheckpointForRecovery && getAllCheckpoints().size() 
> 1) {
-   List allCheckpoints;
-   try {
-   allCheckpoints = getAllCheckpoints();
-   ListIterator listIterator 
= allCheckpoints.listIterator(allCheckpoints.size() - 1);
-   while (listIterator.hasPrevious()) {
-   CompletedCheckpoint prev = 
listIterator.previous();
-   if 
(!prev.getProperties().isSavepoint()) {
-   candidate = prev;
-   LOG.info("Found a completed 
checkpoint before the latest savepoint, will use it to recover!");
-   break;
-   }
+   CompletedCheckpoint lastCompleted = 
allCheckpoints.get(allCheckpoints.size() - 1);
+
+   if (lastCompleted.getProperties().isSavepoint() && 
isPreferCheckpointForRecovery && allCheckpoints.size() > 1) {
+   ListIterator listIterator = 
allCheckpoints.listIterator(allCheckpoints.size() - 1);
 
 Review comment:
   This way it's easy to give a log message when we fall back to a checkpoint


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] azagrebin commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools

2019-09-20 Thread GitBox
azagrebin commented on a change in pull request #9693: [FLINK-13984] Separate 
on-heap and off-heap managed memory pools
URL: https://github.com/apache/flink/pull/9693#discussion_r326679037
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
 ##
 @@ -474,60 +403,121 @@ public void releaseAll(Object owner) {
return;
}
 
-   //  BEGIN CRITICAL SECTION 
---
-   synchronized (lock) {
-   if (isShutDown) {
-   throw new IllegalStateException("Memory manager 
has been shut down.");
-   }
-
-   // get all segments
-   final Set segments = 
allocatedSegments.remove(owner);
+   Preconditions.checkState(!isShutDown, "Memory manager has been 
shut down.");
 
-   // all segments may have been freed previously 
individually
-   if (segments == null || segments.isEmpty()) {
-   return;
-   }
+   // get all segments
+   Set segments = allocatedSegments.remove(owner);
 
-   // free each segment
-   if (isPreAllocated) {
-   for (MemorySegment seg : segments) {
-   memoryPool.returnSegmentToPool(seg);
-   }
-   }
-   else {
-   for (MemorySegment seg : segments) {
-   seg.free();
-   }
-   numNonAllocatedPages += segments.size();
-   }
+   // all segments may have been freed previously individually
+   if (segments == null || segments.isEmpty()) {
+   return;
+   }
 
-   segments.clear();
+   // free each segment
+   EnumMap releasedMemory = new 
EnumMap<>(MemoryType.class);
+   for (MemorySegment segment : segments) {
+   releaseSegment(segment, releasedMemory);
}
-   //  END CRITICAL SECTION ---
+   budgetByType.releaseBudgetForKeys(releasedMemory);
+
+   segments.clear();
}
 
-   // 

-   //  Properties, sizes and size conversions
-   // 

+   /**
+* Reserves memory of a certain type for an owner from this memory 
manager.
+*
+* @param owner The owner to associate with the memory reservation, for 
the fallback release.
+* @param memoryType type of memory to reserve (heap / off-heap).
+* @param size size of memory to reserve.
+* @throws MemoryAllocationException Thrown, if this memory manager 
does not have the requested amount
+*   of memory any more.
+*/
+   public void reserveMemory(Object owner, MemoryType memoryType, long 
size) throws MemoryAllocationException {
+   checkMemoryReservationPreconditions(owner, memoryType, size);
+   if (size == 0L) {
+   return;
+   }
+
+   long acquiredMemory = 
budgetByType.acquireBudgetForKey(memoryType, size);
+   if (acquiredMemory < size) {
+   throw new MemoryAllocationException(
+   String.format("Could not allocate %d bytes. 
Only %d bytes are remaining.", size, acquiredMemory));
+   }
+
+   reservedMemory.compute(owner, (o, reservations) -> {
+   Map newReservations = reservations;
+   if (reservations == null) {
+   newReservations = new 
EnumMap<>(MemoryType.class);
+   newReservations.put(memoryType, size);
+   } else {
+   reservations.compute(
+   memoryType,
+   (mt, currentlyReserved) -> 
currentlyReserved == null ? size : currentlyReserved + size);
+   }
+   return newReservations;
+   });
+
+   Preconditions.checkState(!isShutDown, "Memory manager has been 
concurrently shut down.");
+   }
 
/**
-* Gets the type of memory (heap / off-heap) managed by this memory 
manager.
+* Releases memory of a certain type from an owner to this memory 
manager.
 *
-* @return The type of memory managed by this memory manager.
+* 

[GitHub] [flink] azagrebin commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools

2019-09-20 Thread GitBox
azagrebin commented on a change in pull request #9693: [FLINK-13984] Separate 
on-heap and off-heap managed memory pools
URL: https://github.com/apache/flink/pull/9693#discussion_r326676164
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
 ##
 @@ -474,60 +403,121 @@ public void releaseAll(Object owner) {
return;
}
 
-   //  BEGIN CRITICAL SECTION 
---
-   synchronized (lock) {
-   if (isShutDown) {
-   throw new IllegalStateException("Memory manager 
has been shut down.");
-   }
-
-   // get all segments
-   final Set segments = 
allocatedSegments.remove(owner);
+   Preconditions.checkState(!isShutDown, "Memory manager has been 
shut down.");
 
-   // all segments may have been freed previously 
individually
-   if (segments == null || segments.isEmpty()) {
-   return;
-   }
+   // get all segments
+   Set segments = allocatedSegments.remove(owner);
 
-   // free each segment
-   if (isPreAllocated) {
-   for (MemorySegment seg : segments) {
-   memoryPool.returnSegmentToPool(seg);
-   }
-   }
-   else {
-   for (MemorySegment seg : segments) {
-   seg.free();
-   }
-   numNonAllocatedPages += segments.size();
-   }
+   // all segments may have been freed previously individually
+   if (segments == null || segments.isEmpty()) {
+   return;
+   }
 
-   segments.clear();
+   // free each segment
+   EnumMap releasedMemory = new 
EnumMap<>(MemoryType.class);
+   for (MemorySegment segment : segments) {
+   releaseSegment(segment, releasedMemory);
}
-   //  END CRITICAL SECTION ---
+   budgetByType.releaseBudgetForKeys(releasedMemory);
+
+   segments.clear();
}
 
-   // 

-   //  Properties, sizes and size conversions
-   // 

+   /**
+* Reserves memory of a certain type for an owner from this memory 
manager.
+*
+* @param owner The owner to associate with the memory reservation, for 
the fallback release.
+* @param memoryType type of memory to reserve (heap / off-heap).
+* @param size size of memory to reserve.
+* @throws MemoryAllocationException Thrown, if this memory manager 
does not have the requested amount
+*   of memory any more.
+*/
+   public void reserveMemory(Object owner, MemoryType memoryType, long 
size) throws MemoryAllocationException {
+   checkMemoryReservationPreconditions(owner, memoryType, size);
+   if (size == 0L) {
+   return;
+   }
+
+   long acquiredMemory = 
budgetByType.acquireBudgetForKey(memoryType, size);
+   if (acquiredMemory < size) {
+   throw new MemoryAllocationException(
+   String.format("Could not allocate %d bytes. 
Only %d bytes are remaining.", size, acquiredMemory));
+   }
+
+   reservedMemory.compute(owner, (o, reservations) -> {
+   Map newReservations = reservations;
+   if (reservations == null) {
+   newReservations = new 
EnumMap<>(MemoryType.class);
+   newReservations.put(memoryType, size);
+   } else {
+   reservations.compute(
+   memoryType,
+   (mt, currentlyReserved) -> 
currentlyReserved == null ? size : currentlyReserved + size);
+   }
+   return newReservations;
+   });
+
+   Preconditions.checkState(!isShutDown, "Memory manager has been 
concurrently shut down.");
+   }
 
/**
-* Gets the type of memory (heap / off-heap) managed by this memory 
manager.
+* Releases memory of a certain type from an owner to this memory 
manager.
 *
-* @return The type of memory managed by this memory manager.
+* 

[GitHub] [flink] azagrebin commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools

2019-09-20 Thread GitBox
azagrebin commented on a change in pull request #9693: [FLINK-13984] Separate 
on-heap and off-heap managed memory pools
URL: https://github.com/apache/flink/pull/9693#discussion_r326675307
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
 ##
 @@ -394,74 +318,79 @@ public void release(Collection segments) {
return;
}
 
-   //  BEGIN CRITICAL SECTION 
---
-   synchronized (lock) {
-   if (isShutDown) {
-   throw new IllegalStateException("Memory manager 
has been shut down.");
-   }
+   Preconditions.checkState(!isShutDown, "Memory manager has been 
shut down.");
 
-   // since concurrent modifications to the collection
-   // can disturb the release, we need to try potentially 
multiple times
-   boolean successfullyReleased = false;
-   do {
-   final Iterator segmentsIterator 
= segments.iterator();
+   EnumMap releasedMemory = new 
EnumMap<>(MemoryType.class);
 
-   Object lastOwner = null;
-   Set segsForOwner = null;
+   // since concurrent modifications to the collection
+   // can disturb the release, we need to try potentially multiple 
times
+   boolean successfullyReleased = false;
+   do {
+   Iterator segmentsIterator = 
segments.iterator();
 
-   try {
-   // go over all segments
-   while (segmentsIterator.hasNext()) {
-
-   final MemorySegment seg = 
segmentsIterator.next();
-   if (seg == null || 
seg.isFreed()) {
-   continue;
-   }
-
-   final Object owner = 
seg.getOwner();
-
-   try {
-   // get the list of 
segments by this owner only if it is a different owner than for
-   // the previous one (or 
it is the first segment)
-   if (lastOwner != owner) 
{
-   lastOwner = 
owner;
-   segsForOwner = 
this.allocatedSegments.get(owner);
-   }
-
-   // remove the segment 
from the list
-   if (segsForOwner != 
null) {
-   
segsForOwner.remove(seg);
-   if 
(segsForOwner.isEmpty()) {
-   
this.allocatedSegments.remove(owner);
-   }
-   }
-
-   if (isPreAllocated) {
-   
memoryPool.returnSegmentToPool(seg);
-   }
-   else {
-   seg.free();
-   
numNonAllocatedPages++;
-   }
-   }
-   catch (Throwable t) {
-   throw new 
RuntimeException(
-   "Error 
removing book-keeping reference to allocated memory segment.", t);
-   }
+   //noinspection ProhibitedExceptionCaught
+   try {
+   MemorySegment segment = null;
+   while (segment == null && 
segmentsIterator.hasNext()) {
+   segment = segmentsIterator.next();
+   if (segment.isFreed()) {
+   segment = null;
}
+   }
+   while (segment != null) {
+   

[GitHub] [flink] azagrebin commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools

2019-09-20 Thread GitBox
azagrebin commented on a change in pull request #9693: [FLINK-13984] Separate 
on-heap and off-heap managed memory pools
URL: https://github.com/apache/flink/pull/9693#discussion_r326675307
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
 ##
 @@ -394,74 +318,79 @@ public void release(Collection segments) {
return;
}
 
-   //  BEGIN CRITICAL SECTION 
---
-   synchronized (lock) {
-   if (isShutDown) {
-   throw new IllegalStateException("Memory manager 
has been shut down.");
-   }
+   Preconditions.checkState(!isShutDown, "Memory manager has been 
shut down.");
 
-   // since concurrent modifications to the collection
-   // can disturb the release, we need to try potentially 
multiple times
-   boolean successfullyReleased = false;
-   do {
-   final Iterator segmentsIterator 
= segments.iterator();
+   EnumMap releasedMemory = new 
EnumMap<>(MemoryType.class);
 
-   Object lastOwner = null;
-   Set segsForOwner = null;
+   // since concurrent modifications to the collection
+   // can disturb the release, we need to try potentially multiple 
times
+   boolean successfullyReleased = false;
+   do {
+   Iterator segmentsIterator = 
segments.iterator();
 
-   try {
-   // go over all segments
-   while (segmentsIterator.hasNext()) {
-
-   final MemorySegment seg = 
segmentsIterator.next();
-   if (seg == null || 
seg.isFreed()) {
-   continue;
-   }
-
-   final Object owner = 
seg.getOwner();
-
-   try {
-   // get the list of 
segments by this owner only if it is a different owner than for
-   // the previous one (or 
it is the first segment)
-   if (lastOwner != owner) 
{
-   lastOwner = 
owner;
-   segsForOwner = 
this.allocatedSegments.get(owner);
-   }
-
-   // remove the segment 
from the list
-   if (segsForOwner != 
null) {
-   
segsForOwner.remove(seg);
-   if 
(segsForOwner.isEmpty()) {
-   
this.allocatedSegments.remove(owner);
-   }
-   }
-
-   if (isPreAllocated) {
-   
memoryPool.returnSegmentToPool(seg);
-   }
-   else {
-   seg.free();
-   
numNonAllocatedPages++;
-   }
-   }
-   catch (Throwable t) {
-   throw new 
RuntimeException(
-   "Error 
removing book-keeping reference to allocated memory segment.", t);
-   }
+   //noinspection ProhibitedExceptionCaught
+   try {
+   MemorySegment segment = null;
+   while (segment == null && 
segmentsIterator.hasNext()) {
+   segment = segmentsIterator.next();
+   if (segment.isFreed()) {
+   segment = null;
}
+   }
+   while (segment != null) {
+   

[GitHub] [flink] azagrebin commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools

2019-09-20 Thread GitBox
azagrebin commented on a change in pull request #9693: [FLINK-13984] Separate 
on-heap and off-heap managed memory pools
URL: https://github.com/apache/flink/pull/9693#discussion_r326673300
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
 ##
 @@ -257,133 +216,98 @@ public boolean verifyEmpty() {
 *   of memory pages any more.
 */
public List allocatePages(Object owner, int numPages) 
throws MemoryAllocationException {
-   final ArrayList segs = new 
ArrayList(numPages);
-   allocatePages(owner, segs, numPages);
-   return segs;
+   List segments = new ArrayList<>(numPages);
+   allocatePages(owner, segments, numPages);
+   return segments;
}
 
/**
-* Allocates a set of memory segments from this memory manager. If the 
memory manager pre-allocated the
-* segments, they will be taken from the pool of memory segments. 
Otherwise, they will be allocated
-* as part of this call.
+* Allocates a set of memory segments from this memory manager.
+*
+* The returned segments can have any memory type. The total 
allocated memory for each type will not exceed its
+* size limit, announced in the constructor.
 *
 * @param owner The owner to associate with the memory segment, for the 
fallback release.
 * @param target The list into which to put the allocated memory pages.
 * @param numPages The number of pages to allocate.
 * @throws MemoryAllocationException Thrown, if this memory manager 
does not have the requested amount
 *   of memory pages any more.
 */
-   public void allocatePages(Object owner, List target, int 
numPages)
-   throws MemoryAllocationException {
+   public void allocatePages(
+   Object owner,
+   Collection target,
+   int numPages) throws MemoryAllocationException {
// sanity check
-   if (owner == null) {
-   throw new IllegalArgumentException("The memory owner 
must not be null.");
-   }
+   Preconditions.checkNotNull(owner, "The memory owner must not be 
null.");
+   Preconditions.checkState(!isShutDown, "Memory manager has been 
shut down.");
 
// reserve array space, if applicable
if (target instanceof ArrayList) {
((ArrayList) 
target).ensureCapacity(numPages);
}
 
-   //  BEGIN CRITICAL SECTION 
---
-   synchronized (lock) {
-   if (isShutDown) {
-   throw new IllegalStateException("Memory manager 
has been shut down.");
-   }
-
-   // in the case of pre-allocated memory, the 
'numNonAllocatedPages' is zero, in the
-   // lazy case, the 'freeSegments.size()' is zero.
-   if (numPages > 
(memoryPool.getNumberOfAvailableMemorySegments() + numNonAllocatedPages)) {
-   throw new MemoryAllocationException("Could not 
allocate " + numPages + " pages. Only " +
-   
(memoryPool.getNumberOfAvailableMemorySegments() + numNonAllocatedPages)
-   + " pages are remaining.");
-   }
-
-   Set segmentsForOwner = 
allocatedSegments.get(owner);
-   if (segmentsForOwner == null) {
-   segmentsForOwner = new 
HashSet(numPages);
-   allocatedSegments.put(owner, segmentsForOwner);
-   }
+   // in the case of pre-allocated memory, the 
'numNonAllocatedPages' is zero, in the
 
 Review comment:
   Good catch, thanks, I will remove it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] azagrebin commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools

2019-09-20 Thread GitBox
azagrebin commented on a change in pull request #9693: [FLINK-13984] Separate 
on-heap and off-heap managed memory pools
URL: https://github.com/apache/flink/pull/9693#discussion_r326673300
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
 ##
 @@ -257,133 +216,98 @@ public boolean verifyEmpty() {
 *   of memory pages any more.
 */
public List allocatePages(Object owner, int numPages) 
throws MemoryAllocationException {
-   final ArrayList segs = new 
ArrayList(numPages);
-   allocatePages(owner, segs, numPages);
-   return segs;
+   List segments = new ArrayList<>(numPages);
+   allocatePages(owner, segments, numPages);
+   return segments;
}
 
/**
-* Allocates a set of memory segments from this memory manager. If the 
memory manager pre-allocated the
-* segments, they will be taken from the pool of memory segments. 
Otherwise, they will be allocated
-* as part of this call.
+* Allocates a set of memory segments from this memory manager.
+*
+* The returned segments can have any memory type. The total 
allocated memory for each type will not exceed its
+* size limit, announced in the constructor.
 *
 * @param owner The owner to associate with the memory segment, for the 
fallback release.
 * @param target The list into which to put the allocated memory pages.
 * @param numPages The number of pages to allocate.
 * @throws MemoryAllocationException Thrown, if this memory manager 
does not have the requested amount
 *   of memory pages any more.
 */
-   public void allocatePages(Object owner, List target, int 
numPages)
-   throws MemoryAllocationException {
+   public void allocatePages(
+   Object owner,
+   Collection target,
+   int numPages) throws MemoryAllocationException {
// sanity check
-   if (owner == null) {
-   throw new IllegalArgumentException("The memory owner 
must not be null.");
-   }
+   Preconditions.checkNotNull(owner, "The memory owner must not be 
null.");
+   Preconditions.checkState(!isShutDown, "Memory manager has been 
shut down.");
 
// reserve array space, if applicable
if (target instanceof ArrayList) {
((ArrayList) 
target).ensureCapacity(numPages);
}
 
-   //  BEGIN CRITICAL SECTION 
---
-   synchronized (lock) {
-   if (isShutDown) {
-   throw new IllegalStateException("Memory manager 
has been shut down.");
-   }
-
-   // in the case of pre-allocated memory, the 
'numNonAllocatedPages' is zero, in the
-   // lazy case, the 'freeSegments.size()' is zero.
-   if (numPages > 
(memoryPool.getNumberOfAvailableMemorySegments() + numNonAllocatedPages)) {
-   throw new MemoryAllocationException("Could not 
allocate " + numPages + " pages. Only " +
-   
(memoryPool.getNumberOfAvailableMemorySegments() + numNonAllocatedPages)
-   + " pages are remaining.");
-   }
-
-   Set segmentsForOwner = 
allocatedSegments.get(owner);
-   if (segmentsForOwner == null) {
-   segmentsForOwner = new 
HashSet(numPages);
-   allocatedSegments.put(owner, segmentsForOwner);
-   }
+   // in the case of pre-allocated memory, the 
'numNonAllocatedPages' is zero, in the
 
 Review comment:
   Good catch, thanks, I will change it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] azagrebin commented on a change in pull request #8479: [FLINK-11193][State Backends]Use user passed configuration overriding default configuration loading from file

2019-09-20 Thread GitBox
azagrebin commented on a change in pull request #8479: [FLINK-11193][State 
Backends]Use user passed configuration overriding default configuration loading 
from file
URL: https://github.com/apache/flink/pull/8479#discussion_r323107972
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
 ##
 @@ -507,7 +507,7 @@ public CheckpointStorage createCheckpointStorage(JobID 
jobId) throws IOException
keyGroupRange,
executionConfig,
localRecoveryConfig,
-   priorityQueueStateType,
+   getPriorityQueueStateType(priorityQueueStateType),
 
 Review comment:
   `priorityQueueStateType` is a class field, do we actually need to pass it to 
`getPriorityQueueStateType ` if it is already available there?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] azagrebin commented on a change in pull request #8479: [FLINK-11193][State Backends]Use user passed configuration overriding default configuration loading from file

2019-09-20 Thread GitBox
azagrebin commented on a change in pull request #8479: [FLINK-11193][State 
Backends]Use user passed configuration overriding default configuration loading 
from file
URL: https://github.com/apache/flink/pull/8479#discussion_r323107474
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
 ##
 @@ -716,6 +716,16 @@ public void enableTtlCompactionFilter() {
enableTtlCompactionFilter = TernaryBoolean.TRUE;
}
 
+   /**
+* Gets the PriorityQueueStateType. It will fallback to the default 
value, if it is not explicitly set.
 
 Review comment:
   nit: `Gets the type of the priority queue state. It will ..`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] azagrebin commented on a change in pull request #8479: [FLINK-11193][State Backends]Use user passed configuration overriding default configuration loading from file

2019-09-20 Thread GitBox
azagrebin commented on a change in pull request #8479: [FLINK-11193][State 
Backends]Use user passed configuration overriding default configuration loading 
from file
URL: https://github.com/apache/flink/pull/8479#discussion_r323108356
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
 ##
 @@ -716,6 +716,16 @@ public void enableTtlCompactionFilter() {
enableTtlCompactionFilter = TernaryBoolean.TRUE;
}
 
+   /**
+* Gets the PriorityQueueStateType. It will fallback to the default 
value, if it is not explicitly set.
+* @param origin
+* @return
 
 Review comment:
   I think empty `@return` doc does not add any value. I would either remove it 
or add:
   `the type of the priority queue state`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #9727: [FLINK-14145] Fix getLatestCheckpoint(true) returns wrong checkpoint

2019-09-20 Thread GitBox
flinkbot edited a comment on issue #9727: [FLINK-14145] Fix 
getLatestCheckpoint(true) returns wrong checkpoint
URL: https://github.com/apache/flink/pull/9727#issuecomment-533484572
 
 
   
   ## CI report:
   
   * 31a78ba191c36561f073b0b5a0a2c1b6aa2d18a0 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/128484543)
   * 88ba2336842deb46124e5bd86ae0393ec0217e05 : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/128524337)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #9736: [FLINK-14157] Disable StreamingFileSink e2e test for java 11

2019-09-20 Thread GitBox
flinkbot edited a comment on issue #9736: [FLINK-14157] Disable 
StreamingFileSink e2e test for java 11
URL: https://github.com/apache/flink/pull/9736#issuecomment-533561281
 
 
   
   ## CI report:
   
   * c8f2280889a4f076dc56329912b7890c8f828dd7 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/128517732)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #9735: [FLINK-14156][runtime] Submit timer trigger letters to task's mailbox with operator's precedence

2019-09-20 Thread GitBox
flinkbot edited a comment on issue #9735: [FLINK-14156][runtime] Submit timer 
trigger letters to task's mailbox with operator's precedence
URL: https://github.com/apache/flink/pull/9735#issuecomment-533554096
 
 
   
   ## CI report:
   
   * 51167d9aaf8082150738dc5bd8b9e2319dd01570 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/128514569)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] 1u0 commented on a change in pull request #9727: [FLINK-14145] Fix getLatestCheckpoint(true) returns wrong checkpoint

2019-09-20 Thread GitBox
1u0 commented on a change in pull request #9727: [FLINK-14145] Fix 
getLatestCheckpoint(true) returns wrong checkpoint
URL: https://github.com/apache/flink/pull/9727#discussion_r326659911
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
 ##
 @@ -55,31 +54,26 @@
 * added.
 */
default CompletedCheckpoint getLatestCheckpoint(boolean 
isPreferCheckpointForRecovery) throws Exception {
-   if (getAllCheckpoints().isEmpty()) {
+   List allCheckpoints = getAllCheckpoints();
+   if (allCheckpoints.isEmpty()) {
return null;
}
 
-   CompletedCheckpoint candidate = 
getAllCheckpoints().get(getAllCheckpoints().size() - 1);
-   if (isPreferCheckpointForRecovery && getAllCheckpoints().size() 
> 1) {
-   List allCheckpoints;
-   try {
-   allCheckpoints = getAllCheckpoints();
-   ListIterator listIterator 
= allCheckpoints.listIterator(allCheckpoints.size() - 1);
-   while (listIterator.hasPrevious()) {
-   CompletedCheckpoint prev = 
listIterator.previous();
-   if 
(!prev.getProperties().isSavepoint()) {
-   candidate = prev;
-   LOG.info("Found a completed 
checkpoint before the latest savepoint, will use it to recover!");
-   break;
-   }
+   CompletedCheckpoint lastCompleted = 
allCheckpoints.get(allCheckpoints.size() - 1);
+
+   if (lastCompleted.getProperties().isSavepoint() && 
isPreferCheckpointForRecovery && allCheckpoints.size() > 1) {
+   ListIterator listIterator = 
allCheckpoints.listIterator(allCheckpoints.size() - 1);
 
 Review comment:
   You can simplify code further: as I've mentioned before, you don't need a 
special `lastCompleted.getProperties().isSavepoint()` check, instead 
instantiate the iterator from the very last element 
`allCheckpoints.listIterator(allCheckpoints.size())`.
   
   The whole method can be structured as:
   ```
   if (allCheckpoints.isEmpty()) {
   return null;
   }
   if (isPreferCheckpointForRecovery) {
   // try to find the latest checkpoint by reverse traversal over 
allCheckpoints
   // return it if found
   }
   // return the latest (be it checkpoint or savepoint)
   return allCheckpoints.get(allCheckpoints.size() - 1);
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #9727: [FLINK-14145] Fix getLatestCheckpoint(true) returns wrong checkpoint

2019-09-20 Thread GitBox
flinkbot edited a comment on issue #9727: [FLINK-14145] Fix 
getLatestCheckpoint(true) returns wrong checkpoint
URL: https://github.com/apache/flink/pull/9727#issuecomment-533484572
 
 
   
   ## CI report:
   
   * 31a78ba191c36561f073b0b5a0a2c1b6aa2d18a0 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/128484543)
   * 88ba2336842deb46124e5bd86ae0393ec0217e05 : UNKNOWN
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-14123) Change taskmanager.memory.fraction default value to 0.6

2019-09-20 Thread Stephan Ewen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14123?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16934441#comment-16934441
 ] 

Stephan Ewen commented on FLINK-14123:
--

Would the behavior be the same when using G1 GC? Or CMS?

CMS my be used mainly be streaming-only users, but G1 is definitely used as the 
default also by various batch users.

> Change taskmanager.memory.fraction default value to 0.6
> ---
>
> Key: FLINK-14123
> URL: https://issues.apache.org/jira/browse/FLINK-14123
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Configuration
>Affects Versions: 1.9.0
>Reporter: liupengcheng
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently, we are testing flink batch task, such as terasort, however, it 
> started only awhile then it failed due to OOM. 
>  
> {code:java}
> org.apache.flink.client.program.ProgramInvocationException: Job failed. 
> (JobID: a807e1d635bd4471ceea4282477f8850)
>   at 
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:262)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:326)
>   at 
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)
>   at 
> org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:539)
>   at 
> com.github.ehiggs.spark.terasort.FlinkTeraSort$.main(FlinkTeraSort.scala:89)
>   at 
> com.github.ehiggs.spark.terasort.FlinkTeraSort.main(FlinkTeraSort.scala)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:604)
>   at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:466)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
>   at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
>   at 
> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
>   at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
>   at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1007)
>   at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1080)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1886)
>   at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>   at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1080)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
>   at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>   at 
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
>   ... 23 more
> Caused by: java.lang.RuntimeException: Error obtaining the sorted input: 
> Thread 'SortMerger Reading Thread' terminated due to an exception: GC 
> overhead limit exceeded
>   at 
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:650)
>   at 
> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1109)
>   at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:82)
>   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:504)
>   at 
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:369)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated 
> due to an exception: GC overhead limit exceeded
>   at 
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:831)
> Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
>   at 
> org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:84)
>   at 
> 

[GitHub] [flink] gyfora commented on issue #9727: [FLINK-14145] Fix getLatestCheckpoint(true) returns wrong checkpoint

2019-09-20 Thread GitBox
gyfora commented on issue #9727: [FLINK-14145] Fix getLatestCheckpoint(true) 
returns wrong checkpoint
URL: https://github.com/apache/flink/pull/9727#issuecomment-533572489
 
 
   @TisonKun @1u0 I pushed a new commit that addresses the comments + adds more 
tests


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #9734: [FLINK-14155][ml] Add a wrapper class of a JSON library to provide the unified json format.

2019-09-20 Thread GitBox
flinkbot edited a comment on issue #9734: [FLINK-14155][ml] Add a wrapper class 
of a JSON library to provide the unified json format.
URL: https://github.com/apache/flink/pull/9734#issuecomment-533547640
 
 
   
   ## CI report:
   
   * d0eb5e2b5a8f664a1de56dbe0ea3aa592452b7c2 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/128511658)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #9736: [FLINK-14157] Disable StreamingFileSink e2e test for java 11

2019-09-20 Thread GitBox
flinkbot edited a comment on issue #9736: [FLINK-14157] Disable 
StreamingFileSink e2e test for java 11
URL: https://github.com/apache/flink/pull/9736#issuecomment-533561281
 
 
   
   ## CI report:
   
   * c8f2280889a4f076dc56329912b7890c8f828dd7 : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/128517732)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #9733: [FLINK-14154][ml] Add the class for multivariate Gaussian Distribution.

2019-09-20 Thread GitBox
flinkbot edited a comment on issue #9733: [FLINK-14154][ml] Add the class for 
multivariate Gaussian Distribution.
URL: https://github.com/apache/flink/pull/9733#issuecomment-533547601
 
 
   
   ## CI report:
   
   * ea3d0cd6da22ebd2846bf0f3a2fa1d9f21964675 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/128511616)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhuzhurk commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub

2019-09-20 Thread GitBox
zhuzhurk commented on a change in pull request #9663: 
[WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r326644539
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
 ##
 @@ -23,40 +23,93 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler;
+import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategy;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.FailureHandlingResult;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
 import org.apache.flink.runtime.executiongraph.restart.ThrowingRestartStrategy;
 import org.apache.flink.runtime.io.network.partition.PartitionTracker;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
 import 
org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import 
org.apache.flink.runtime.scheduler.strategy.LazyFromSourcesSchedulingStrategy;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategy;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
 import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 
 import org.slf4j.Logger;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
 
 /**
- * Stub implementation of the future default scheduler.
+ * The future default scheduler.
  */
-public class DefaultScheduler extends LegacyScheduler {
+public class DefaultScheduler extends SchedulerBase implements 
SchedulerOperations {
+
+   private final Logger log;
+
+   private final ClassLoader userCodeLoader;
+
+   private final ExecutionSlotAllocator executionSlotAllocator;
+
+   private final ExecutionFailureHandler executionFailureHandler;
+
+   private final ScheduledExecutor delayExecutor;
+
+   private final SchedulingStrategy schedulingStrategy;
+
+   private final ExecutionVertexVersioner executionVertexVersioner;
+
+   private final ExecutionVertexOperations executionVertexOperations;
 
public DefaultScheduler(
-   final Logger log,
 
 Review comment:
   I think the benefit of double indentation is to avoid the param in function 
declaration mixed up with the function body which has single indentation. But 
as you did, the confusion can be reduced by adding an empty line between them.
   
   I don't have much preference for this style actually, so it's fine to not 
change it if double indentation is not explicitly required.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Closed] (FLINK-14096) Merge NewClusterClient into ClusterClient

2019-09-20 Thread tison (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14096?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

tison closed FLINK-14096.
-
Resolution: Resolved

master via 1c20b8397299b17279e30ed4dca1f9efe6b8d9ec

> Merge NewClusterClient into ClusterClient
> -
>
> Key: FLINK-14096
> URL: https://issues.apache.org/jira/browse/FLINK-14096
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client / Job Submission
>Reporter: tison
>Assignee: tison
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
> With the effort under FLINK-10392 we don't need the bridge class 
> {{NewClusterClient}} any more. We can just merge {{NewClusterClient}} into 
> {{ClusterClient}} towards an interface-ized {{ClusterClient}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] kl0u commented on issue #9701: [FLINK-14096][client] Merge NewClusterClient into ClusterClient

2019-09-20 Thread GitBox
kl0u commented on issue #9701: [FLINK-14096][client] Merge NewClusterClient 
into ClusterClient
URL: https://github.com/apache/flink/pull/9701#issuecomment-533562510
 
 
   Thanks @TisonKun !


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] TisonKun merged pull request #9701: [FLINK-14096][client] Merge NewClusterClient into ClusterClient

2019-09-20 Thread GitBox
TisonKun merged pull request #9701: [FLINK-14096][client] Merge 
NewClusterClient into ClusterClient
URL: https://github.com/apache/flink/pull/9701
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] kl0u closed pull request #9695: [FLINK-13748][S3][build] Fix jaxb relocation for S3.

2019-09-20 Thread GitBox
kl0u closed pull request #9695: [FLINK-13748][S3][build] Fix jaxb relocation 
for S3.
URL: https://github.com/apache/flink/pull/9695
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #9735: [FLINK-14156][runtime] Submit timer trigger letters to task's mailbox with operator's precedence

2019-09-20 Thread GitBox
flinkbot edited a comment on issue #9735: [FLINK-14156][runtime] Submit timer 
trigger letters to task's mailbox with operator's precedence
URL: https://github.com/apache/flink/pull/9735#issuecomment-533554096
 
 
   
   ## CI report:
   
   * 51167d9aaf8082150738dc5bd8b9e2319dd01570 : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/128514569)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #9736: [FLINK-14157] Disable StreamingFileSink e2e test for java 11

2019-09-20 Thread GitBox
flinkbot commented on issue #9736: [FLINK-14157] Disable StreamingFileSink e2e 
test for java 11
URL: https://github.com/apache/flink/pull/9736#issuecomment-533561281
 
 
   
   ## CI report:
   
   * c8f2280889a4f076dc56329912b7890c8f828dd7 : UNKNOWN
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] 1u0 commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-09-20 Thread GitBox
1u0 commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r326635582
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
 ##
 @@ -293,140 +246,120 @@ public void close() throws Exception {
waitInFlightInputsFinished();
}
finally {
-   Exception exception = null;
-
-   try {
-   super.close();
-   } catch (InterruptedException interrupted) {
-   exception = interrupted;
-
-   Thread.currentThread().interrupt();
-   } catch (Exception e) {
-   exception = e;
-   }
-
-   try {
-   // terminate the emitter, the emitter thread 
and the executor
-   stopResources(true);
-   } catch (InterruptedException interrupted) {
-   exception = 
ExceptionUtils.firstOrSuppressed(interrupted, exception);
-
-   Thread.currentThread().interrupt();
-   } catch (Exception e) {
-   exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
-   }
-
-   if (exception != null) {
-   LOG.warn("Errors occurred while closing the 
AsyncWaitOperator.", exception);
-   }
+   super.close();
}
}
 
-   @Override
-   public void dispose() throws Exception {
-   Exception exception = null;
+   /**
+* Add the given stream element to the operator's stream element queue. 
This operation blocks until the element
+* has been added.
+*
+* Between two insertion attempts, this method yields the execution 
to the mailbox, such that events as well
+* as asynchronous results can be processed.
+*
+* @param streamElement to add to the operator's queue
+* @throws InterruptedException if the current thread has been 
interrupted while yielding to mailbox
+* @return a handle that allows to set the result of the async 
computation for the given element.
+*/
+   private ResultFuture addToWorkQueue(StreamElement streamElement) 
throws InterruptedException {
+   assert(Thread.holdsLock(checkpointingLock));
 
-   try {
-   super.dispose();
-   } catch (InterruptedException interrupted) {
-   exception = interrupted;
+   pendingStreamElement = streamElement;
 
-   Thread.currentThread().interrupt();
-   } catch (Exception e) {
-   exception = e;
+   Optional> queueEntry;
+   while (!(queueEntry = queue.tryPut(streamElement)).isPresent()) 
{
+   mailboxExecutor.yield();
}
 
-   try {
-   stopResources(false);
-   } catch (InterruptedException interrupted) {
-   exception = 
ExceptionUtils.firstOrSuppressed(interrupted, exception);
+   pendingStreamElement = null;
 
-   Thread.currentThread().interrupt();
-   } catch (Exception e) {
-   exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
-   }
+   return queueEntry.get();
+   }
 
-   if (exception != null) {
-   throw exception;
+   private void waitInFlightInputsFinished() throws InterruptedException {
+   assert(Thread.holdsLock(checkpointingLock));
+
+   while (!queue.isEmpty()) {
+   mailboxExecutor.yield();
}
}
 
/**
-* Close the operator's resources. They include the emitter thread and 
the executor to run
-* the queue's complete operation.
+* Batch output of all completed elements. Watermarks are always 
completed if it's their turn to be processed.
 *
-* @param waitForShutdown is true if the method should wait for the 
resources to be freed;
-*   otherwise false.
-* @throws InterruptedException if current thread has been interrupted
+* This method will be called from {@link 
#processWatermark(Watermark)} and from a mail processing the result
+* of an async function call.
 */
-   private void stopResources(boolean waitForShutdown) throws 
InterruptedException {
-   emitter.stop();
-   

[GitHub] [flink] TisonKun commented on issue #9701: [FLINK-14096][client] Merge NewClusterClient into ClusterClient

2019-09-20 Thread GitBox
TisonKun commented on issue #9701: [FLINK-14096][client] Merge NewClusterClient 
into ClusterClient
URL: https://github.com/apache/flink/pull/9701#issuecomment-533559645
 
 
   Thanks for your explanation @kl0u. Then I will try to merge this pull 
request myself :-)


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-14140) The Flink Logo Displayed in Flink Python Shell is Broken

2019-09-20 Thread Hequn Cheng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16934409#comment-16934409
 ] 

Hequn Cheng commented on FLINK-14140:
-

[~zhongwei] Thanks a lot for the checking. The PR looks good. Will merge later.

> The Flink Logo Displayed in Flink Python Shell is Broken
> 
>
> Key: FLINK-14140
> URL: https://issues.apache.org/jira/browse/FLINK-14140
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.9.0, 1.10.0
>Reporter: Wei Zhong
>Assignee: Wei Zhong
>Priority: Trivial
>  Labels: pull-request-available
> Attachments: image-2019-09-20-15-03-09-111.png, 
> image-2019-09-20-15-23-17-810.png, image-2019-09-20-15-25-23-674.png
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> when executing "./pyflink-shell.sh local",we can get such a Logo:
> !image-2019-09-20-15-03-09-111.png|width=396,height=553!
> It seems that the upper right corner of the squirrel is broken.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] hequn8128 commented on issue #9725: [FLINK-14140][python] Fix the broken flink logo in flink python shell.

2019-09-20 Thread GitBox
hequn8128 commented on issue #9725: [FLINK-14140][python] Fix the broken flink 
logo in flink python shell.
URL: https://github.com/apache/flink/pull/9725#issuecomment-533558849
 
 
   @WeiZhong94 Thanks a lot for the fixing. LGTM.
   Merging...


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] pnowojski edited a comment on issue #9706: [FLINK-14118][runtime]Reduce the unnecessary flushing when there is no data available for flush.

2019-09-20 Thread GitBox
pnowojski edited a comment on issue #9706: [FLINK-14118][runtime]Reduce the 
unnecessary flushing when there is no data available for flush.
URL: https://github.com/apache/flink/pull/9706#issuecomment-533557963
 
 
   Could you either show that this change improves some existing benchmarks or 
add a new benchmark to cover for this for the future?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] pnowojski commented on a change in pull request #9706: [FLINK-14118][runtime]Reduce the unnecessary flushing when there is no data available for flush.

2019-09-20 Thread GitBox
pnowojski commented on a change in pull request #9706: 
[FLINK-14118][runtime]Reduce the unnecessary flushing when there is no data 
available for flush.
URL: https://github.com/apache/flink/pull/9706#discussion_r326623064
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferConsumer.java
 ##
 @@ -163,6 +170,10 @@ public int getCached() {
return PositionMarker.getAbsolute(cachedPosition);
}
 
+   private int getLatest() {
+   return PositionMarker.getAbsolute(positionMarker.get());
 
 Review comment:
   This is adding new synchronisation point (`volatile` read) and will probably 
decrease performance in low latency or huge clusters high throughput scenarios.
   
   Let me think a bit about this issue.
   
   In the meantime could you provide results of `networkThroughput` benchmark 
with `1000,1ms` parameters before and after the change?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] pnowojski commented on issue #9706: [FLINK-14118][runtime]Reduce the unnecessary flushing when there is no data available for flush.

2019-09-20 Thread GitBox
pnowojski commented on issue #9706: [FLINK-14118][runtime]Reduce the 
unnecessary flushing when there is no data available for flush.
URL: https://github.com/apache/flink/pull/9706#issuecomment-533557963
 
 
   Could you provide at least results of `networkThroughput` benchmark with 
`1000,1ms` parameters before and after the change?
   
   Also could you either show that this change improves some existing 
benchmarks or to add a new benchmark to cover for this for the future?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #9736: [FLINK-14157] Disable StreamingFileSink e2e test for java 11

2019-09-20 Thread GitBox
flinkbot commented on issue #9736: [FLINK-14157] Disable StreamingFileSink e2e 
test for java 11
URL: https://github.com/apache/flink/pull/9736#issuecomment-533557856
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit c8f2280889a4f076dc56329912b7890c8f828dd7 (Fri Sep 20 
13:39:35 UTC 2019)
   
   **Warnings:**
* **2 pom.xml files were touched**: Check for build and licensing issues.
* No documentation files were touched! Remember to keep the Flink docs up 
to date!
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer of PMC member is required Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-14157) Temporarily remove S3 StreamingFileSink end-to-end test

2019-09-20 Thread Kostas Kloudas (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14157?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas updated FLINK-14157:
---
Description: This issue temporarily disables the failing test for Java 11 
so that we can have a green travis build, until a proper solution is found. In 
addition, it removes the relocations for Java 8 so that the production code 
works for Java 8, the main java version Flink supports.  (was: This issue 
temporarily disables the failing test so that we can have a green travis build, 
until a proper solution is found.)

> Temporarily remove S3 StreamingFileSink end-to-end test
> ---
>
> Key: FLINK-14157
> URL: https://issues.apache.org/jira/browse/FLINK-14157
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / FileSystem, Tests
>Affects Versions: 1.10.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> This issue temporarily disables the failing test for Java 11 so that we can 
> have a green travis build, until a proper solution is found. In addition, it 
> removes the relocations for Java 8 so that the production code works for Java 
> 8, the main java version Flink supports.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on issue #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-09-20 Thread GitBox
flinkbot edited a comment on issue #9717: [FLINK-14044] [runtime] Reducing 
synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#issuecomment-533068819
 
 
   
   ## CI report:
   
   * 9cfda801891969ac460f45ea639d636b519f22db : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/128317194)
   * af7f6be848f0bd90a049d5ee9f7a38b1c3e2b972 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/128336908)
   * 43157eb165d9409fbdf4a2f773ef7d52dd74e759 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/128365390)
   * ff400d149ed36c63a90d2f3fcd517bde738e5af1 : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/128402339)
   * 02dbf61e50e6c913610df7f586d7eb0f20529c13 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/128406923)
   * 68cf941c7e165a5c6b2a27f7ac716b29f76d1918 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/128472867)
   * 54aa0043ec73b03580eda2e0310b44bdee352a55 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/128495989)
   * e081ea5c536d6ebb828eb8b68c404379d23f5aef : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/128502020)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] gyfora edited a comment on issue #9727: [FLINK-14145] Fix getLatestCheckpoint(true) returns wrong checkpoint

2019-09-20 Thread GitBox
gyfora edited a comment on issue #9727: [FLINK-14145] Fix 
getLatestCheckpoint(true) returns wrong checkpoint
URL: https://github.com/apache/flink/pull/9727#issuecomment-533556876
 
 
   Sure I can probably clean this up a bit. I wanted to the minimal change that 
makes this correct so I added the explicit return in case the last entry is a 
checkpoint.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-14157) Temporarily remove S3 StreamingFileSink end-to-end test

2019-09-20 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14157?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-14157:
---
Labels: pull-request-available  (was: )

> Temporarily remove S3 StreamingFileSink end-to-end test
> ---
>
> Key: FLINK-14157
> URL: https://issues.apache.org/jira/browse/FLINK-14157
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / FileSystem, Tests
>Affects Versions: 1.10.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Major
>  Labels: pull-request-available
>
> This issue temporarily disables the failing test so that we can have a green 
> travis build, until a proper solution is found.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] gyfora commented on issue #9727: [FLINK-14145] Fix getLatestCheckpoint(true) returns wrong checkpoint

2019-09-20 Thread GitBox
gyfora commented on issue #9727: [FLINK-14145] Fix getLatestCheckpoint(true) 
returns wrong checkpoint
URL: https://github.com/apache/flink/pull/9727#issuecomment-533556876
 
 
   Sure I can probably clean this up if a bit. I wanted to the minimal change 
that makes this correct so I added the explicit return in case the last entry 
is a checkpoint.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] kl0u opened a new pull request #9736: [FLINK-14157] Disable StreamingFileSink e2e test for java 11

2019-09-20 Thread GitBox
kl0u opened a new pull request #9736: [FLINK-14157] Disable StreamingFileSink 
e2e test for java 11
URL: https://github.com/apache/flink/pull/9736
 
 
   ## What is the purpose of the change
   
   This PR disables the `StreamingFileSink` S3 test for java 11 and removes the 
`jaxb` relocations for java 8. This is a temporary solution to have green cron 
jobs until https://issues.apache.org/jira/browse/FLINK-13748 is properly solved.
   
   ## Brief change log
   
   * The relocations are removed from the `pom.xml` files of the 
`flink-s3-fs-hadoop` and `flink-s3-fs-presto` modules.
   
   * The S3 `StreamingFileSink` test is disabled for java 11 in the 
`split_misc.sh`.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (**yes** / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
 - The S3 file system connector: (**yes** / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #9733: [FLINK-14154][ml] Add the class for multivariate Gaussian Distribution.

2019-09-20 Thread GitBox
flinkbot edited a comment on issue #9733: [FLINK-14154][ml] Add the class for 
multivariate Gaussian Distribution.
URL: https://github.com/apache/flink/pull/9733#issuecomment-533547601
 
 
   
   ## CI report:
   
   * ea3d0cd6da22ebd2846bf0f3a2fa1d9f21964675 : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/128511616)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #9735: [FLINK-14156][runtime] Submit timer trigger letters to task's mailbox with operator's precedence

2019-09-20 Thread GitBox
flinkbot commented on issue #9735: [FLINK-14156][runtime] Submit timer trigger 
letters to task's mailbox with operator's precedence
URL: https://github.com/apache/flink/pull/9735#issuecomment-533554096
 
 
   
   ## CI report:
   
   * 51167d9aaf8082150738dc5bd8b9e2319dd01570 : UNKNOWN
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #9734: [FLINK-14155][ml] Add a wrapper class of a JSON library to provide the unified json format.

2019-09-20 Thread GitBox
flinkbot edited a comment on issue #9734: [FLINK-14155][ml] Add a wrapper class 
of a JSON library to provide the unified json format.
URL: https://github.com/apache/flink/pull/9734#issuecomment-533547640
 
 
   
   ## CI report:
   
   * d0eb5e2b5a8f664a1de56dbe0ea3aa592452b7c2 : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/128511658)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #9730: [FLINK-14151][ml] Add class for DocHashIDFVectorizerModelMapper.

2019-09-20 Thread GitBox
flinkbot edited a comment on issue #9730: [FLINK-14151][ml] Add class for 
DocHashIDFVectorizerModelMapper.
URL: https://github.com/apache/flink/pull/9730#issuecomment-533540803
 
 
   
   ## CI report:
   
   * 67cf93e4ddd84ac53a64ac2e27950eda8fb1f1b2 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/128508865)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #9731: [FLINK-14152][ml] Add class for DocCountVectorizerMapper.

2019-09-20 Thread GitBox
flinkbot edited a comment on issue #9731: [FLINK-14152][ml] Add class for 
DocCountVectorizerMapper.
URL: https://github.com/apache/flink/pull/9731#issuecomment-533540895
 
 
   
   ## CI report:
   
   * 3ae6c348886b5e395e6701407e5667d682e9bdde : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/128508897)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] hequn8128 commented on a change in pull request #9707: [FLINK-14015][python] Introduces PythonScalarFunctionOperator to execute Python user-defined functions

2019-09-20 Thread GitBox
hequn8128 commented on a change in pull request #9707: [FLINK-14015][python] 
Introduces PythonScalarFunctionOperator to execute Python user-defined functions
URL: https://github.com/apache/flink/pull/9707#discussion_r326549641
 
 

 ##
 File path: 
flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/PythonScalarFunctionOperator.java
 ##
 @@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.python.PythonFunctionRunner;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.functions.python.PythonFunctionInfo;
+import org.apache.flink.table.functions.python.PythonScalarFunctionRunner;
+import org.apache.flink.table.runtime.types.CRow;
+import org.apache.flink.table.runtime.types.CRowTypeInfo;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.utils.TypeConversions;
+import org.apache.flink.types.Row;
+
+import org.apache.beam.sdk.fn.data.FnDataReceiver;
+
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * The {@link PythonScalarFunctionOperator} is responsible for executing 
Python {@link ScalarFunction}s.
+ * It executes the Python {@link ScalarFunction}s in separate Python execution 
environment.
+ *
+ * The inputs are assumed as the following format:
+ * {{{
+ *   +--+--+
+ *   | forwarded fields | extra fields |
+ *   +--+--+
+ * }}}.
+ *
+ * The Python UDFs may take input columns directly from the input row or 
the execution result of Java UDFs:
+ * 1) The input columns from the input row can be referred from the 'forwarded 
fields';
+ * 2) The Java UDFs will be computed and the execution results can be referred 
from the 'extra fields'.
+ *
+ * The outputs will be as the following format:
+ * {{{
+ *   +--+-+
+ *   | forwarded fields | scalar function results |
+ *   +--+-+
+ * }}}.
+ */
+@Internal
+public class PythonScalarFunctionOperator extends 
AbstractPythonScalarFunctionOperator {
 
 Review comment:
   Could we further merge the `PythonScalarFunctionOperator ` and 
`BaseRowPythonScalarFunctionOperator`? I see a lot of code(and class comments) 
could be moved into the base class. In this way, we can avoid introducing 
redundant code. For example:
   - Most part of the comments could be moved into the base class.
   - The member variable `forwardedInputQueue` and `udfResultQueue` could be 
moved into the base class with generic type.
   - `createPythonFunctionRunner` method could also be moved into the base 
class.
   - Inner class `PythonScalarFunctionRunnerWrapper` as well(maybe need some 
refactor). 
   
   This problem also exist in the test class.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-14118) Reduce the unnecessary flushing when there is no data available for flush

2019-09-20 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16934395#comment-16934395
 ] 

Piotr Nowojski commented on FLINK-14118:


As I wrote in the PR, I would be afraid it will make low latency use cases 
worse, as it's adding extra {{volatile}} read on the critical path.

One thing to ask. Have you only observed higher CPU usage while idling? Or 
actually reduced throughput? Because low latency changes from 1.5 are known to 
increase CPU usage when idling, while under full throughput everything goes 
back to normal.

> Reduce the unnecessary flushing when there is no data available for flush
> -
>
> Key: FLINK-14118
> URL: https://issues.apache.org/jira/browse/FLINK-14118
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Reporter: Yingjie Cao
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.10.0, 1.9.1, 1.8.3
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> The new flush implementation which works by triggering a netty user event may 
> cause performance regression compared to the old synchronization-based one. 
> More specifically, when there is exactly one BufferConsumer in the buffer 
> queue of subpartition and no new data will be added for a while in the future 
> (may because of just no input or the logic of the operator is to collect some 
> data for processing and will not emit records immediately), that is, there is 
> no data to send, the OutputFlusher will continuously notify data available 
> and wake up the netty thread, though no data will be returned by the 
> pollBuffer method.
> For some of our production jobs, this will incur 20% to 40% CPU overhead 
> compared to the old implementation. We tried to fix the problem by checking 
> if there is new data available when flushing, if there is no new data, the 
> netty thread will not be notified. It works for our jobs and the cpu usage 
> falls to previous level.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] hequn8128 commented on a change in pull request #9707: [FLINK-14015][python] Introduces PythonScalarFunctionOperator to execute Python user-defined functions

2019-09-20 Thread GitBox
hequn8128 commented on a change in pull request #9707: [FLINK-14015][python] 
Introduces PythonScalarFunctionOperator to execute Python user-defined functions
URL: https://github.com/apache/flink/pull/9707#discussion_r326526010
 
 

 ##
 File path: 
flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/PythonScalarFunctionOperator.java
 ##
 @@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.python.PythonFunctionRunner;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.functions.python.PythonFunctionInfo;
+import org.apache.flink.table.functions.python.PythonScalarFunctionRunner;
+import org.apache.flink.table.runtime.types.CRow;
+import org.apache.flink.table.runtime.types.CRowTypeInfo;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.utils.TypeConversions;
+import org.apache.flink.types.Row;
+
+import org.apache.beam.sdk.fn.data.FnDataReceiver;
+
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * The {@link PythonScalarFunctionOperator} is responsible for executing 
Python {@link ScalarFunction}s.
+ * It executes the Python {@link ScalarFunction}s in separate Python execution 
environment.
+ *
+ * The inputs are assumed as the following format:
+ * {{{
+ *   +--+--+
+ *   | forwarded fields | extra fields |
+ *   +--+--+
+ * }}}.
+ *
+ * The Python UDFs may take input columns directly from the input row or 
the execution result of Java UDFs:
+ * 1) The input columns from the input row can be referred from the 'forwarded 
fields';
+ * 2) The Java UDFs will be computed and the execution results can be referred 
from the 'extra fields'.
+ *
+ * The outputs will be as the following format:
+ * {{{
+ *   +--+-+
+ *   | forwarded fields | scalar function results |
+ *   +--+-+
+ * }}}.
+ */
+@Internal
+public class PythonScalarFunctionOperator extends 
AbstractPythonScalarFunctionOperator {
+
+   private static final long serialVersionUID = 1L;
+
+   /**
+* The collector used to collect records.
+*/
+   private transient StreamRecordCRowWrappingCollector cRowWrapper;
+
+   /**
+* The queue holding the input elements for which the execution results 
have not been received.
+*/
+   private transient LinkedBlockingQueue forwardedInputQueue;
+
+   /**
+* The queue holding the user-defined function execution results. The 
execution results are in
+* the same order as the input elements.
+*/
+   private transient LinkedBlockingQueue udfResultQueue;
+
+   /**
+* The type serializer for the forwarded fields.
+*/
+   private transient TypeSerializer forwardedInputSerializer;
+
+   public PythonScalarFunctionOperator(
+   PythonFunctionInfo[] scalarFunctions,
+   RowType inputType,
+   RowType outputType,
+   int[] udfInputOffsets,
+   int forwardedFieldCnt) {
+   super(scalarFunctions, inputType, outputType, udfInputOffsets, 
forwardedFieldCnt);
+   }
+
+   @Override
+   public void open() throws Exception {
+   super.open();
+   this.cRowWrapper = new 
StreamRecordCRowWrappingCollector(output);
+   this.forwardedInputQueue = new LinkedBlockingQueue<>();
+   this.udfResultQueue = new LinkedBlockingQueue<>();
+
+   CRowTypeInfo forwardedInputTypeInfo = new CRowTypeInfo(new 
RowTypeInfo(
+   getInputType().getFields().stream()
+   .limit(getForwardedFieldCnt())
+   .map(RowType.RowField::getType)
+

[GitHub] [flink] hequn8128 commented on a change in pull request #9707: [FLINK-14015][python] Introduces PythonScalarFunctionOperator to execute Python user-defined functions

2019-09-20 Thread GitBox
hequn8128 commented on a change in pull request #9707: [FLINK-14015][python] 
Introduces PythonScalarFunctionOperator to execute Python user-defined functions
URL: https://github.com/apache/flink/pull/9707#discussion_r326614662
 
 

 ##
 File path: 
flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
 ##
 @@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.python;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.python.PythonFunctionRunner;
+import org.apache.flink.python.PythonOptions;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Base class for all stream operators to execute Python functions.
+ */
+@Internal
+public abstract class AbstractPythonFunctionOperator
+   extends AbstractStreamOperator
+   implements OneInputStreamOperator, BoundedOneInput {
+
+   private static final long serialVersionUID = 1L;
+
+   /**
+* The {@link PythonFunctionRunner} which is responsible for Python 
user-defined function execution.
+*/
+   private transient PythonFunctionRunner pythonFunctionRunner;
+
+   /**
+* Use an AtomicBoolean because we start/stop bundles by a timer thread.
+*/
+   private transient AtomicBoolean bundleStarted;
+
+   /**
+* Number of processed elements in the current bundle.
+*/
+   private transient int elementCount;
+
+   /**
+* Max number of elements to include in a bundle.
+*/
+   private transient int maxBundleSize;
+
+   /**
+* Max duration of a bundle.
+*/
+   private transient long maxBundleTimeMills;
+
+   /**
+* Time that the last bundle was finished.
+*/
+   private transient long lastFinishBundleTime;
+
+   /**
+* A timer that finishes the current bundle after a fixed amount of 
time.
+*/
+   private transient ScheduledFuture checkFinishBundleTimer;
+
+   /**
+* Callback to be executed after the current bundle was finished.
+*/
+   private transient Runnable bundleFinishedCallback;
+
+   @Override
+   public void open() throws Exception {
+   try {
+   this.bundleStarted = new AtomicBoolean(false);
+
+   this.maxBundleSize = 
getOperatorConfig().getConfiguration().getInteger(PythonOptions.MAX_BUNDLE_SIZE);
+   this.maxBundleTimeMills =
+   
getOperatorConfig().getConfiguration().getLong(PythonOptions.MAX_BUNDLE_TIME_MILLS);
+
+   this.pythonFunctionRunner = 
createPythonFunctionRunner();
+   this.pythonFunctionRunner.open();
+
+   this.elementCount = 0;
+   this.lastFinishBundleTime = 
getProcessingTimeService().getCurrentProcessingTime();
+
+   // Schedule timer to check timeout of finish bundle.
+   long bundleCheckPeriod = 
Math.max(this.maxBundleTimeMills, 1);
+   this.checkFinishBundleTimer =
+   getProcessingTimeService()
+   .scheduleAtFixedRate(
+   // ProcessingTimeService 
callbacks are executed under the checkpointing lock
+   timestamp -> 
checkInvokeFinishBundleByTime(), bundleCheckPeriod, bundleCheckPeriod);
+   } finally {
+   super.open();
+   }
+   }
+
+   @Override
+   public void close() throws Exception {
+   try {
+   invokeFinishBundle();
+   } finally 

  1   2   3   4   >