[jira] [Created] (FLINK-10769) port InMemoryExternalCatalog to Java

2018-11-02 Thread Bowen Li (JIRA)
Bowen Li created FLINK-10769:


 Summary: port InMemoryExternalCatalog to Java
 Key: FLINK-10769
 URL: https://issues.apache.org/jira/browse/FLINK-10769
 Project: Flink
  Issue Type: Sub-task
  Components: Table API & SQL
Reporter: Bowen Li
Assignee: Bowen Li
 Fix For: 1.8.0


In the Flink-Hive integration design, we propose a new FlinkInMemoryCatalog 
(FLINK-10697) for production use. FlinkInMemoryCatalog will share some parts 
with the existing InMemoryExternalCatalog, thus we need to make changes to 
InMemoryExternalCatalog.

As we are moving away from Scala to Java, we should write all new code/features 
in Java. Therefore, we will first port InMemoryExternalCatalog to Java without 
any feature or behavior change.

This is a prerequisite for FLINK-10697.
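
A minimal sketch of what an in-memory catalog along these lines could look like; this is purely illustrative, and the class name and table representation are assumptions rather than the actual InMemoryExternalCatalog API:

{code}
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

/** Illustrative sketch only; not the actual ExternalCatalog API. */
public class SimpleInMemoryCatalog {

    // Table metadata is kept purely in memory, keyed by table name.
    private final Map<String, Object> tables = new LinkedHashMap<>();

    public synchronized void createTable(String name, Object table, boolean ignoreIfExists) {
        if (tables.containsKey(name) && !ignoreIfExists) {
            throw new IllegalArgumentException("Table already exists: " + name);
        }
        tables.put(name, table);
    }

    public synchronized Optional<Object> getTable(String name) {
        return Optional.ofNullable(tables.get(name));
    }

    public synchronized void dropTable(String name, boolean ignoreIfNotExists) {
        if (tables.remove(name) == null && !ignoreIfNotExists) {
            throw new IllegalArgumentException("Table does not exist: " + name);
        }
    }
}
{code}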



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10768) Move external catalog related code from TableEnvironment to CatalogManager

2018-11-02 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10768?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-10768:
---
Labels: pull-request-available  (was: )

> Move external catalog related code from TableEnvironment to CatalogManager
> --
>
> Key: FLINK-10768
> URL: https://issues.apache.org/jira/browse/FLINK-10768
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> Add a new CatalogManager class and port the existing code directly related to 
> Calcite from TableEnvironment into CatalogManager.
> This is NOT a feature but purely a refactor, thus no new functionality should 
> be introduced. It is a prerequisite for FLINK-10698.
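
A minimal sketch of the kind of registry such a CatalogManager could provide; the class and method names below are assumptions for illustration, not the actual Flink code:

{code}
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

/** Illustrative sketch only; not the actual CatalogManager implementation. */
public class CatalogManagerSketch<C> {

    // External catalogs keyed by name, plus the name of the default catalog.
    private final Map<String, C> externalCatalogs = new LinkedHashMap<>();
    private String defaultCatalogName;

    public synchronized void registerExternalCatalog(String name, C catalog) {
        if (externalCatalogs.putIfAbsent(name, catalog) != null) {
            throw new IllegalArgumentException("Catalog already registered: " + name);
        }
    }

    public synchronized Optional<C> getExternalCatalog(String name) {
        return Optional.ofNullable(externalCatalogs.get(name));
    }

    public synchronized void setDefaultCatalog(String name) {
        if (!externalCatalogs.containsKey(name)) {
            throw new IllegalArgumentException("Unknown catalog: " + name);
        }
        this.defaultCatalogName = name;
    }

    public synchronized Optional<C> getDefaultCatalog() {
        return getExternalCatalog(defaultCatalogName);
    }
}
{code}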



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10768) Move external catalog related code from TableEnvironment to CatalogManager

2018-11-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10768?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673922#comment-16673922
 ] 

ASF GitHub Bot commented on FLINK-10768:


bowenli86 opened a new pull request #7011: [FLINK-10768][Table & SQL] Move 
external catalog related code from TableEnvironment to CatalogManager
URL: https://github.com/apache/flink/pull/7011
 
 
   ## What is the purpose of the change
   
   In the design of the Flink-Hive integration and external catalog enhancements, 
we propose using `CatalogManager` to manage all external catalogs and related 
aspects for a clearer separation. Currently, all external catalogs are managed 
by `TableEnvironment`, which makes `TableEnvironment` bloated.
   
   Also, as discussed with @twalthr, since we are moving away from Scala, all 
new feature code will be in Java.
   
   This PR moves all EXISTING external-catalog-related code from 
`TableEnvironment` to `CatalogManager`, and it is the initial step for 
`CatalogManager`. It blocks [FLINK-10698: Create CatalogManager class that 
manages all external catalogs and temporary meta 
objects](https://issues.apache.org/jira/browse/FLINK-10698).
   
   This PR is a PURE REFACTOR: it only moves APIs around, with NO behavior 
changes and NO new features added. 
   
   ## Brief change log
   
 - added `CatalogManager` in Java
 - moved external catalog related code from `TableEnvironment` to 
`CatalogManager`
 - added unit tests in `CatalogManagerTest`
   
   ## Verifying this change
   
   - This change is already covered by existing tests, such as 
*TableEnvironmentTest*.
   - This change added tests and can be verified as follows:
 - Added unit tests in `CatalogManagerTest`
   
   ## Does this pull request potentially affect one of the following parts:
   
   none
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Move external catalog related code from TableEnvironment to CatalogManager
> --
>
> Key: FLINK-10768
> URL: https://issues.apache.org/jira/browse/FLINK-10768
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> Add a new CatalogManager class and port the existing code directly related to 
> Calcite from TableEnvironment into CatalogManager.
> This is NOT a feature but purely a refactor, thus no new functionality should 
> be introduced. It is a prerequisite for FLINK-10698.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] bowenli86 opened a new pull request #7011: [FLINK-10768][Table & SQL] Move external catalog related code from TableEnvironment to CatalogManager

2018-11-02 Thread GitBox
bowenli86 opened a new pull request #7011: [FLINK-10768][Table & SQL] Move 
external catalog related code from TableEnvironment to CatalogManager
URL: https://github.com/apache/flink/pull/7011
 
 
   ## What is the purpose of the change
   
   In the design of the Flink-Hive integration and external catalog enhancements, 
we propose using `CatalogManager` to manage all external catalogs and related 
aspects for a clearer separation. Currently, all external catalogs are managed 
by `TableEnvironment`, which makes `TableEnvironment` bloated.
   
   Also, as discussed with @twalthr, since we are moving away from Scala, all 
new feature code will be in Java.
   
   This PR moves all EXISTING external-catalog-related code from 
`TableEnvironment` to `CatalogManager`, and it is the initial step for 
`CatalogManager`. It blocks [FLINK-10698: Create CatalogManager class that 
manages all external catalogs and temporary meta 
objects](https://issues.apache.org/jira/browse/FLINK-10698).
   
   This PR is a PURE REFACTOR: it only moves APIs around, with NO behavior 
changes and NO new features added. 
   
   ## Brief change log
   
 - added `CatalogManager` in Java
 - moved external catalog related code from `TableEnvironment` to 
`CatalogManager`
 - added unit tests in `CatalogManagerTest`
   
   ## Verifying this change
   
   - This change is already covered by existing tests, such as 
*TableEnvironmentTest*.
   - This change added tests and can be verified as follows:
 - Added unit tests in `CatalogManagerTest`
   
   ## Does this pull request potentially affect one of the following parts:
   
   none
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Comment Edited] (FLINK-10766) Add a link to flink-china.org in Flink website

2018-11-02 Thread Hequn Cheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10766?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673911#comment-16673911
 ] 

Hequn Cheng edited comment on FLINK-10766 at 11/3/18 3:35 AM:
--

[~jark] Hi, thanks for the proposal. It is a good idea! There are a lot of 
Flink enthusiasts in China. The translated documents and meetup information 
would be very helpful. 



was (Author: hequn8128):
[~jark] Hi, thanks for the proposal. It is a good idea! There are a lot of 
Flink enthusiasts in China. The translated document and meetup informations 
would be very helpful. 


> Add a link to flink-china.org in Flink website
> --
>
> Key: FLINK-10766
> URL: https://issues.apache.org/jira/browse/FLINK-10766
> Project: Flink
>  Issue Type: Wish
>  Components: Project Website
>Reporter: Jark Wu
>Priority: Minor
>
> The Apache Flink China website has been reworked and published in the last 
> few days; see http://flink-china.org for more.
> flink-china.org is a very popular website for Flink users in China. The website 
> now includes the latest translated documentation (v1.6), use cases, and meetup 
> events.
> We hope to add a link to flink-china.org on the Flink website 
> (flink.apache.org), so that we can help attract more Chinese Flink users 
> and improve the community.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10766) Add a link to flink-china.org in Flink website

2018-11-02 Thread Hequn Cheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10766?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673911#comment-16673911
 ] 

Hequn Cheng commented on FLINK-10766:
-

[~jark] Hi, thanks for the proposal. It is a good idea! There are a lot of 
Flink enthusiasts in China. The translated documents and meetup information 
would be very helpful. 


> Add a link to flink-china.org in Flink website
> --
>
> Key: FLINK-10766
> URL: https://issues.apache.org/jira/browse/FLINK-10766
> Project: Flink
>  Issue Type: Wish
>  Components: Project Website
>Reporter: Jark Wu
>Priority: Minor
>
> The Apache Flink China website has been reworked and published in the last 
> few days; see http://flink-china.org for more.
> flink-china.org is a very popular website for Flink users in China. The website 
> now includes the latest translated documentation (v1.6), use cases, and meetup 
> events.
> We hope to add a link to flink-china.org on the Flink website 
> (flink.apache.org), so that we can help attract more Chinese Flink users 
> and improve the community.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9172) Support external catalog factory that comes default with SQL-Client

2018-11-02 Thread Xuefu Zhang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673895#comment-16673895
 ] 

Xuefu Zhang commented on FLINK-9172:


Hi [~eronwright], just curious about your progress on this. I asked because 
this aligns well with our effort in FLINK-10744, and we'd like to move this 
forward. Please let me know if any help is needed. Thanks.

> Support external catalog factory that comes default with SQL-Client
> ---
>
> Key: FLINK-9172
> URL: https://issues.apache.org/jira/browse/FLINK-9172
> Project: Flink
>  Issue Type: Sub-task
>  Components: SQL Client
>Reporter: Rong Rong
>Assignee: Eron Wright 
>Priority: Major
>
> It doesn't seem that the configuration (YAML) file currently allows specifying 
> external catalogs. The request here is to add support for external catalog 
> specifications in the YAML file. Users should also be able to specify one 
> catalog as the default.
> It would be great to have the SQL Client support some external catalogs 
> out-of-the-box for SQL users to configure and utilize easily. I am currently 
> thinking of having an external catalog factory that spins up both streaming and 
> batch external catalog table sources and sinks. This could greatly unify things 
> and provide easy access for SQL users. 
> The catalog-related configurations then need to be processed and passed to 
> TableEnvironment accordingly by calling the relevant APIs.
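
A minimal sketch of what such a factory could look like; the interface, the properties map, and the default-catalog flag are assumptions for illustration, not an existing Flink or SQL Client API:

{code}
import java.util.Map;

/** Illustrative sketch only; not an actual Flink API. */
public interface ExternalCatalogFactorySketch {

    /** Placeholder for whatever catalog implementation the factory produces. */
    interface Catalog { }

    /** Creates a catalog from properties, e.g. parsed from the SQL Client YAML file. */
    Catalog createCatalog(String name, Map<String, String> properties);

    /** Whether the created catalog should be registered as the default catalog. */
    default boolean isDefaultCatalog(Map<String, String> properties) {
        return Boolean.parseBoolean(properties.getOrDefault("default", "false"));
    }
}
{code}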



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-4372) Add ability to take savepoints from job manager web UI

2018-11-02 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-4372?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-4372:
---

Assignee: vinoyang

> Add ability to take savepoints from job manager web UI
> --
>
> Key: FLINK-4372
> URL: https://issues.apache.org/jira/browse/FLINK-4372
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing, Webfrontend
>Reporter: Zhenzhong Xu
>Assignee: vinoyang
>Priority: Major
>
> subtask of FLINK-4336



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10768) Move external catalog related code from TableEnvironment to CatalogManager

2018-11-02 Thread Bowen Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10768?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li updated FLINK-10768:
-
Summary: Move external catalog related code from TableEnvironment to 
CatalogManager  (was: Move Calcite related code from TableEnvironment to 
CatalogManager)

> Move external catalog related code from TableEnvironment to CatalogManager
> --
>
> Key: FLINK-10768
> URL: https://issues.apache.org/jira/browse/FLINK-10768
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.8.0
>
>
> Add a new CatalogManager class and port the existing code directly related to 
> Calcite from TableEnvironment into CatalogManager.
> This is NOT a feature but purely a refactor, thus no new functionality should 
> be introduced. It is a prerequisite for FLINK-10698.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10768) Move Calcite related code from TableEnvironment to CatalogManager

2018-11-02 Thread Bowen Li (JIRA)
Bowen Li created FLINK-10768:


 Summary: Move Calcite related code from TableEnvironment to 
CatalogManager
 Key: FLINK-10768
 URL: https://issues.apache.org/jira/browse/FLINK-10768
 Project: Flink
  Issue Type: Sub-task
  Components: Table API & SQL
Reporter: Bowen Li
Assignee: Bowen Li
 Fix For: 1.8.0


Add a new CatalogManager class and port the existing code directly related to 
Calcite from TableEnvironment into CatalogManager.

This is NOT a feature but purely a refactor, thus no new functionality should be 
introduced. It is a prerequisite for FLINK-10698.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-4372) Add ability to take savepoints from job manager web UI

2018-11-02 Thread Oleksandr Soldatov (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-4372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673611#comment-16673611
 ] 

Oleksandr Soldatov commented on FLINK-4372:
---

Hello, I guess an even more useful button would be `cancel with savepoint`, 
perhaps disabled if the job has no savepoint configuration (or configurable via 
a dialog).

> Add ability to take savepoints from job manager web UI
> --
>
> Key: FLINK-4372
> URL: https://issues.apache.org/jira/browse/FLINK-4372
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing, Webfrontend
>Reporter: Zhenzhong Xu
>Priority: Major
>
> subtask of FLINK-4336



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10736) Shaded Hadoop S3A end-to-end test failed on Travis

2018-11-02 Thread Stephan Ewen (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10736?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673512#comment-16673512
 ] 

Stephan Ewen commented on FLINK-10736:
--

I would suggest changing that test to use a constant test data file. That 
would spare us the brittleness of relying on an S3 upload in a bash script and 
would save us from eventual-consistency visibility issues.

> Shaded Hadoop S3A end-to-end test failed on Travis
> --
>
> Key: FLINK-10736
> URL: https://issues.apache.org/jira/browse/FLINK-10736
> Project: Flink
>  Issue Type: Bug
>  Components: E2E Tests
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.7.0
>
>
> The {{Shaded Hadoop S3A end-to-end test}} failed on Travis because it could 
> not find a file stored on S3:
> {code}
> org.apache.flink.client.program.ProgramInvocationException: Job failed. 
> (JobID: f28270bedd943ed6b41548b60f5cea73)
>   at 
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:487)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:475)
>   at 
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)
>   at 
> org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:85)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
>   at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
>   at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
>   at 
> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
>   at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>   at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
>   at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
>   at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>   at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
>   at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>   at 
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
>   ... 21 more
> Caused by: java.io.IOException: Error opening the Input Split 
> s3://[secure]/flink-end-to-end-test-shaded-s3a [0,44]: No such file or 
> directory: s3://[secure]/flink-end-to-end-test-shaded-s3a
>   at 
> org.apache.flink.api.common.io.FileInputFormat.open(FileInputFormat.java:824)
>   at 
> org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:470)
>   at 
> org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:47)
>   at 
> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:170)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.FileNotFoundException: No such file or directory: 
> s3://[secure]/flink-end-to-end-test-shaded-s3a
>   at 
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2255)
>   at 
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2149)
>   at 
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2088)
>   at 
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.open(S3AFileSystem.java:699)
>   at 
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.open(FileSystem.java:950)
>   at 
> 

[jira] [Commented] (FLINK-10732) force-shading 1.5.5 maven artifact was not released

2018-11-02 Thread Chesnay Schepler (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673506#comment-16673506
 ] 

Chesnay Schepler commented on FLINK-10732:
--

Option 2 is what I'm shooting for; I'm just double-checking with INFRA that we 
can simply deploy it without any issues (since this is done rarely). I've 
already linked the INFRA ticket.

> force-shading 1.5.5 maven artifact was not released
> ---
>
> Key: FLINK-10732
> URL: https://issues.apache.org/jira/browse/FLINK-10732
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.5.5
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.5.5
>
>
> The 1.5.5 maven artifact for {{force-shading}} was not deployed. We have to 
> investigate whether other artifacts are missing as well and how to remedy the 
> problem, i.e. re-deploy the missing artifacts (which may or may not require a 
> separate vote).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10520) Job save points REST API fails unless parameters are specified

2018-11-02 Thread Huide Yin (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673505#comment-16673505
 ] 

Huide Yin commented on FLINK-10520:
---

I wonder if `state.savepoints.dir` is specified as the default savepoint path in 
your flink-conf.yaml file?

`target-directory` is a must-have for specifying the target location where the 
savepoint is saved.

`cancel-job` is a must since there's no separate `cancel` request defined here: 
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html
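
A minimal sketch of a complete request, assuming the endpoint and field names described in the REST documentation linked above; the host, job id, and target directory are placeholders:

{code}
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class TriggerSavepointExample {
    public static void main(String[] args) throws Exception {
        // Placeholders: point these at your own cluster, job id and savepoint directory.
        URL url = new URL("http://localhost:8081/jobs/<jobid>/savepoints");
        String body = "{\"target-directory\": \"hdfs:///flink/savepoints\", \"cancel-job\": false}";

        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("POST");
        conn.setRequestProperty("Content-Type", "application/json");
        conn.setDoOutput(true);
        try (OutputStream out = conn.getOutputStream()) {
            out.write(body.getBytes(StandardCharsets.UTF_8));
        }
        // The response body contains a trigger id that can be used to poll the savepoint status.
        System.out.println("HTTP " + conn.getResponseCode());
    }
}
{code}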

> Job save points REST API fails unless parameters are specified
> --
>
> Key: FLINK-10520
> URL: https://issues.apache.org/jira/browse/FLINK-10520
> Project: Flink
>  Issue Type: Bug
>  Components: REST
>Affects Versions: 1.6.1
>Reporter: Elias Levy
>Assignee: Chesnay Schepler
>Priority: Minor
>
> The new REST API POST endpoint, {{/jobs/:jobid/savepoints}}, returns an error 
> unless the request includes a body with all parameters ({{target-directory}} 
> and {{cancel-job}}), even though the 
> [documentation|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.html]
>  suggests these are optional.
> If a POST request with no data is made, the response is a 400 status code 
> with the error message "Bad request received."
> If the POST request submits an empty JSON object ( {} ), the response is a 
> 400 status code with the error message "Request did not match expected format 
> SavepointTriggerRequestBody."  The same is true if only the 
> {{target-directory}} or {{cancel-job}} parameters are included.
> As the system is configured with a default savepoint location, there 
> shouldn't be a need to include the parameter in the request.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9920) BucketingSinkFaultToleranceITCase fails on travis

2018-11-02 Thread Bowen Li (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9920?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673482#comment-16673482
 ] 

Bowen Li commented on FLINK-9920:
-

Still saw it. https://api.travis-ci.org/v3/job/449685897/log.txt

> BucketingSinkFaultToleranceITCase fails on travis
> -
>
> Key: FLINK-9920
> URL: https://issues.apache.org/jira/browse/FLINK-9920
> Project: Flink
>  Issue Type: Bug
>  Components: filesystem-connector
>Affects Versions: 1.6.0, 1.7.0
>Reporter: Chesnay Schepler
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.7.0
>
>
> https://travis-ci.org/zentol/flink/jobs/407021898
> {code}
> Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 13.082 sec 
> <<< FAILURE! - in 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSinkFaultToleranceITCase
> runCheckpointedProgram(org.apache.flink.streaming.connectors.fs.bucketing.BucketingSinkFaultToleranceITCase)
>   Time elapsed: 5.696 sec  <<< FAILURE!
> java.lang.AssertionError: Read line does not match expected pattern.
>   at org.junit.Assert.fail(Assert.java:88)
>   at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSinkFaultToleranceITCase.postSubmit(BucketingSinkFaultToleranceITCase.java:182)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10732) force-shading 1.5.5 maven artifact was not released

2018-11-02 Thread Maximilian Michels (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673474#comment-16673474
 ] 

Maximilian Michels commented on FLINK-10732:


Possible fixes:

1) Release a new version of all the other artifacts referencing 
force-shading:1.5.5 and let them point to an older version, e.g. 
force-shading:1.5.4 or flink-shaded-force-shading:1.0.

2) Release force-shading:1.5.5 (from what I know this wouldn't be too hard, and 
we have done it for other artifacts in the past).

3) Let downstream deal with it by letting them exclude force-shading during 
their build (works, but not really a good solution).

> force-shading 1.5.5 maven artifact was not released
> ---
>
> Key: FLINK-10732
> URL: https://issues.apache.org/jira/browse/FLINK-10732
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.5.5
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.5.5
>
>
> The 1.5.5 maven artifact for {{force-shading}} was not deployed. We have to 
> investigate whether other artifacts are missing as well and how to remedy the 
> problem, i.e. re-deploy the missing artifacts (which may or may not require a 
> separate vote).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10356) Add sanity checks to SpillingAdaptiveSpanningRecordDeserializer

2018-11-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673460#comment-16673460
 ] 

ASF GitHub Bot commented on FLINK-10356:


NicoK commented on a change in pull request #6705: [FLINK-10356][network] add 
sanity checks to SpillingAdaptiveSpanningRecordDeserializer
URL: https://github.com/apache/flink/pull/6705#discussion_r230452399
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
 ##
 @@ -104,11 +120,236 @@ public void testHandleMixedLargeRecords() throws 
Exception {
testSerializationRoundTrip(originalRecords, segmentSize);
}
 
+   /**
+* Non-spanning, deserialization reads one byte too many and succeeds.
+*/
+   @Test
+   public void testHandleDeserializingTooMuchNonSpanning1() throws 
Exception {
+   testHandleWrongDeserialization(
+   new StringValueDeserializingTooMuch("Test string"),
+   32 * 1024,
+   null);
+   }
+
+   /**
+* Non-spanning, serialization length is 16 (including headers), 
deserialization reads one byte
+* too many and succeeds.
+*/
+   @Test
+   public void testHandleDeserializingTooMuchNonSpanning2() throws 
Exception {
+   testHandleWrongDeserialization(
+   new StringValueDeserializingTooMuch("Test string"),
 
 Review comment:
   actually, the type does not really matter - I could just as well use a 
`LargeObjectType` of an arbitrary length


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add sanity checks to SpillingAdaptiveSpanningRecordDeserializer
> ---
>
> Key: FLINK-10356
> URL: https://issues.apache.org/jira/browse/FLINK-10356
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.5.3, 1.5.4, 1.6.0, 1.6.1, 1.7.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
>  Labels: pull-request-available
>
> {{SpillingAdaptiveSpanningRecordDeserializer}} doesn't have any consistency 
> checks that it is used correctly or that serializers behave properly, e.g. read 
> only as many bytes as are available/promised for that record. At least these 
> checks should be added:
>  # Check that buffers have not been read from yet before adding them (this is 
> an invariant {{SpillingAdaptiveSpanningRecordDeserializer}} works with and, 
> from what I can see, it is followed now).
>  # Check that after deserialization, we actually consumed {{recordLength}} 
> bytes
>  ** If not, in the spanning deserializer, we currently simply skip the 
> remaining bytes.
>  ** But in the non-spanning deserializer, we currently continue from the 
> wrong offset.
>  # Protect against {{setNextBuffer}} being called before draining all 
> available records
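
A minimal sketch of what check #2 could look like; the buffer handling and exception type are assumptions for illustration, not the actual deserializer code:

{code}
import java.io.IOException;
import java.nio.ByteBuffer;

/** Illustrative sketch only. */
final class RecordLengthSanityCheck {

    /** Fails if a deserializer did not consume exactly the announced record length. */
    static void checkFullyConsumed(ByteBuffer buffer, int positionBeforeRead, int recordLength)
            throws IOException {
        int consumed = buffer.position() - positionBeforeRead;
        if (consumed != recordLength) {
            throw new IOException("Deserializer consumed " + consumed
                    + " bytes, but the record length is " + recordLength + " bytes.");
        }
    }
}
{code}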



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10356) Add sanity checks to SpillingAdaptiveSpanningRecordDeserializer

2018-11-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673459#comment-16673459
 ] 

ASF GitHub Bot commented on FLINK-10356:


NicoK commented on a change in pull request #6705: [FLINK-10356][network] add 
sanity checks to SpillingAdaptiveSpanningRecordDeserializer
URL: https://github.com/apache/flink/pull/6705#discussion_r230452399
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
 ##
 @@ -104,11 +120,236 @@ public void testHandleMixedLargeRecords() throws 
Exception {
testSerializationRoundTrip(originalRecords, segmentSize);
}
 
+   /**
+* Non-spanning, deserialization reads one byte too many and succeeds.
+*/
+   @Test
+   public void testHandleDeserializingTooMuchNonSpanning1() throws 
Exception {
+   testHandleWrongDeserialization(
+   new StringValueDeserializingTooMuch("Test string"),
+   32 * 1024,
+   null);
+   }
+
+   /**
+* Non-spanning, serialization length is 16 (including headers), 
deserialization reads one byte
+* too many and succeeds.
+*/
+   @Test
+   public void testHandleDeserializingTooMuchNonSpanning2() throws 
Exception {
+   testHandleWrongDeserialization(
+   new StringValueDeserializingTooMuch("Test string"),
 
 Review comment:
   actually, the type does not really matter - I could just as well use a 
`LargeObjectType` of an arbitrary length


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add sanity checks to SpillingAdaptiveSpanningRecordDeserializer
> ---
>
> Key: FLINK-10356
> URL: https://issues.apache.org/jira/browse/FLINK-10356
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.5.3, 1.5.4, 1.6.0, 1.6.1, 1.7.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
>  Labels: pull-request-available
>
> {{SpillingAdaptiveSpanningRecordDeserializer}} doesn't have any consistency 
> checks that it is used correctly or that serializers behave properly, e.g. read 
> only as many bytes as are available/promised for that record. At least these 
> checks should be added:
>  # Check that buffers have not been read from yet before adding them (this is 
> an invariant {{SpillingAdaptiveSpanningRecordDeserializer}} works with and, 
> from what I can see, it is followed now).
>  # Check that after deserialization, we actually consumed {{recordLength}} 
> bytes
>  ** If not, in the spanning deserializer, we currently simply skip the 
> remaining bytes.
>  ** But in the non-spanning deserializer, we currently continue from the 
> wrong offset.
>  # Protect against {{setNextBuffer}} being called before draining all 
> available records



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] NicoK commented on a change in pull request #6705: [FLINK-10356][network] add sanity checks to SpillingAdaptiveSpanningRecordDeserializer

2018-11-02 Thread GitBox
NicoK commented on a change in pull request #6705: [FLINK-10356][network] add 
sanity checks to SpillingAdaptiveSpanningRecordDeserializer
URL: https://github.com/apache/flink/pull/6705#discussion_r230452399
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
 ##
 @@ -104,11 +120,236 @@ public void testHandleMixedLargeRecords() throws 
Exception {
testSerializationRoundTrip(originalRecords, segmentSize);
}
 
+   /**
+* Non-spanning, deserialization reads one byte too many and succeeds.
+*/
+   @Test
+   public void testHandleDeserializingTooMuchNonSpanning1() throws 
Exception {
+   testHandleWrongDeserialization(
+   new StringValueDeserializingTooMuch("Test string"),
+   32 * 1024,
+   null);
+   }
+
+   /**
+* Non-spanning, serialization length is 16 (including headers), 
deserialization reads one byte
+* too many and succeeds.
+*/
+   @Test
+   public void testHandleDeserializingTooMuchNonSpanning2() throws 
Exception {
+   testHandleWrongDeserialization(
+   new StringValueDeserializingTooMuch("Test string"),
 
 Review comment:
   actually, the type does not really matter - I could just as well use a 
`LargeObjectType` of an arbitrary length


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] NicoK commented on a change in pull request #6705: [FLINK-10356][network] add sanity checks to SpillingAdaptiveSpanningRecordDeserializer

2018-11-02 Thread GitBox
NicoK commented on a change in pull request #6705: [FLINK-10356][network] add 
sanity checks to SpillingAdaptiveSpanningRecordDeserializer
URL: https://github.com/apache/flink/pull/6705#discussion_r230452399
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
 ##
 @@ -104,11 +120,236 @@ public void testHandleMixedLargeRecords() throws 
Exception {
testSerializationRoundTrip(originalRecords, segmentSize);
}
 
+   /**
+* Non-spanning, deserialization reads one byte too many and succeeds.
+*/
+   @Test
+   public void testHandleDeserializingTooMuchNonSpanning1() throws 
Exception {
+   testHandleWrongDeserialization(
+   new StringValueDeserializingTooMuch("Test string"),
+   32 * 1024,
+   null);
+   }
+
+   /**
+* Non-spanning, serialization length is 16 (including headers), 
deserialization reads one byte
+* too many and succeeds.
+*/
+   @Test
+   public void testHandleDeserializingTooMuchNonSpanning2() throws 
Exception {
+   testHandleWrongDeserialization(
+   new StringValueDeserializingTooMuch("Test string"),
 
 Review comment:
   actually, the type does not really matter - I could just as well use a 
`LargeObjectType` of an arbitrary length


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Assigned] (FLINK-10353) Restoring a KafkaProducer with Semantic.EXACTLY_ONCE from a savepoint written with Semantic.AT_LEAST_ONCE fails with NPE

2018-11-02 Thread Stefan Richter (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10353?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter reassigned FLINK-10353:
--

Assignee: Stefan Richter

> Restoring a KafkaProducer with Semantic.EXACTLY_ONCE from a savepoint written 
> with Semantic.AT_LEAST_ONCE fails with NPE
> 
>
> Key: FLINK-10353
> URL: https://issues.apache.org/jira/browse/FLINK-10353
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.5.3, 1.6.0
>Reporter: Konstantin Knauf
>Assignee: Stefan Richter
>Priority: Critical
>  Labels: pull-request-available
>
> If a KafkaProducer with {{Semantic.EXACTLY_ONCE}} is restored from a 
> savepoint written with {{Semantic.AT_LEAST_ONCE}}, the job fails on restore 
> with the NPE below. This makes it impossible to upgrade an AT_LEAST_ONCE 
> pipeline to an EXACTLY_ONCE pipeline statefully.
> {quote}
> java.lang.NullPointerException
>   at java.util.Hashtable.put(Hashtable.java:460)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initTransactionalProducer(FlinkKafkaProducer011.java:955)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.recoverAndCommit(FlinkKafkaProducer011.java:733)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.recoverAndCommit(FlinkKafkaProducer011.java:93)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.recoverAndCommitInternal(TwoPhaseCommitSinkFunction.java:373)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:333)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initializeState(FlinkKafkaProducer011.java:867)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:254)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>   at java.lang.Thread.run(Thread.java:748){quote}
> The reason is that, for {{Semantic.AT_LEAST_ONCE}}, the snapshotted state of 
> the {{TwoPhaseCommitFunction}} is of the form 
> "TransactionHolder\{handle=KafkaTransactionState [transactionalId=null, 
> producerId=-1, epoch=-1], transactionStartTime=1537175471175}".



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10720) Add stress deployment end-to-end test

2018-11-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10720?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673442#comment-16673442
 ] 

ASF GitHub Bot commented on FLINK-10720:


StefanRRichter commented on issue #6994: [FLINK-10720][tests] Add deployment 
end-to-end stress test with many …
URL: https://github.com/apache/flink/pull/6994#issuecomment-435449615
 
 
   @tillrohrmann that depends on how we want to interpret the result. Right now, 
the test passes when it somehow manages to do one successful recovery. If 
we want to test that the first recovery must already be successful, we could 
limit the number of allowed failed attempts to 1. Would you prefer to enforce 
successful recovery in the first attempt?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add stress deployment end-to-end test
> -
>
> Key: FLINK-10720
> URL: https://issues.apache.org/jira/browse/FLINK-10720
> Project: Flink
>  Issue Type: Sub-task
>  Components: E2E Tests
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> In order to test Flink's scalability, I suggest adding an end-to-end test 
> which tests the deployment of a very demanding job. The job should 
> have large {{TaskDeploymentDescriptors}} (e.g. a job using union state or 
> having a high degree of parallelism). That way we can test that the 
> serialization overhead of the TDDs does not affect the health of the cluster 
> (e.g. heartbeats are not affected because the serialization does not happen 
> in the main thread).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] StefanRRichter commented on issue #6994: [FLINK-10720][tests] Add deployment end-to-end stress test with many …

2018-11-02 Thread GitBox
StefanRRichter commented on issue #6994: [FLINK-10720][tests] Add deployment 
end-to-end stress test with many …
URL: https://github.com/apache/flink/pull/6994#issuecomment-435449615
 
 
   @tillrohrmann that depends on how we want to interpret the result. Right now, 
the test passes when it somehow manages to do one successful recovery. If 
we want to test that the first recovery must already be successful, we could 
limit the number of allowed failed attempts to 1. Would you prefer to enforce 
successful recovery in the first attempt?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10455) Potential Kafka producer leak in case of failures

2018-11-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673441#comment-16673441
 ] 

ASF GitHub Bot commented on FLINK-10455:


azagrebin commented on a change in pull request #6989: [FLINK-10455][Kafka Tx] 
Close transactional producers in case of failure and termination
URL: https://github.com/apache/flink/pull/6989#discussion_r230447543
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
 ##
 @@ -671,6 +672,9 @@ public void close() throws FlinkKafka011Exception {
}
// make sure we propagate pending errors
checkErroneous();
+   pendingTransactions().values().forEach(transaction ->
 
 Review comment:
   `TransactionManager#maybeFailWithError` contains just a check; it does not 
help with the deadlock so far


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Potential Kafka producer leak in case of failures
> -
>
> Key: FLINK-10455
> URL: https://issues.apache.org/jira/browse/FLINK-10455
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.5.2
>Reporter: Nico Kruber
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
>
> If the Kafka brokers' timeout is too low for our checkpoint interval [1], we 
> may get a {{ProducerFencedException}}. Documentation around 
> {{ProducerFencedException}} explicitly states that we should close the 
> producer after encountering it.
> By looking at the code, it doesn't seem like this is actually done in 
> {{FlinkKafkaProducer011}}. Also, in case one transaction's commit in 
> {{TwoPhaseCommitSinkFunction#notifyCheckpointComplete}} fails with an 
> exception, we don't clean up (nor try to commit) any other transaction.
> -> from what I see, {{TwoPhaseCommitSinkFunction#notifyCheckpointComplete}} 
> simply iterates over the {{pendingCommitTransactions}} which is not touched 
> during {{close()}}
> Now if we restart the failing job on the same Flink cluster, any resources 
> from the previous attempt will still linger around.
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/connectors/kafka.html#kafka-011
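
A minimal sketch of the kind of best-effort cleanup discussed here (closing the producers of pending transactions when the sink fails or closes); the types and method below are assumptions for illustration, not the actual FlinkKafkaProducer011 code:

{code}
/** Illustrative sketch only. */
final class PendingProducerCleanup {

    /** Best-effort close so that one failing producer does not leak the remaining ones. */
    static void closeAll(Iterable<? extends AutoCloseable> pendingProducers) {
        for (AutoCloseable producer : pendingProducers) {
            try {
                producer.close();
            } catch (Exception e) {
                // Log and continue; cleanup should not abort on the first failure.
                System.err.println("Failed to close pending producer: " + e);
            }
        }
    }
}
{code}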



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] azagrebin commented on a change in pull request #6989: [FLINK-10455][Kafka Tx] Close transactional producers in case of failure and termination

2018-11-02 Thread GitBox
azagrebin commented on a change in pull request #6989: [FLINK-10455][Kafka Tx] 
Close transactional producers in case of failure and termination
URL: https://github.com/apache/flink/pull/6989#discussion_r230447543
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
 ##
 @@ -671,6 +672,9 @@ public void close() throws FlinkKafka011Exception {
}
// make sure we propagate pending errors
checkErroneous();
+   pendingTransactions().values().forEach(transaction ->
 
 Review comment:
   `TransactionManager#maybeFailWithError` contains just a check; it does not 
help with the deadlock so far


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10353) Restoring a KafkaProducer with Semantic.EXACTLY_ONCE from a savepoint written with Semantic.AT_LEAST_ONCE fails with NPE

2018-11-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673437#comment-16673437
 ] 

ASF GitHub Bot commented on FLINK-10353:


StefanRRichter opened a new pull request #7010: [FLINK-10353][kafka] Support 
change of transactional semantics in Kaf…
URL: https://github.com/apache/flink/pull/7010
 
 
   …ka Producer with existing state
   
   ## What is the purpose of the change
   
   This PR changes `FlinkKafkaProducer` and  `FlinkKafkaProducer011` to support 
a change of transactional semantics when restoring from existing state, e.g. a 
savepoint.
   
   ## Brief change log
   
   - Introduced `KafkaTransactionState#isTransactional` to distinguish which 
transactional handling should be applied instead of relying on what is 
currently configured.
   
   - Call `initializeUserContext` in `TwoPhaseCommitSinkFunction` for all cases 
that did not recover a user context.
   
   - Consider removing the transactional id from the properties when creating a new 
producer to deactivate transactional semantics if no longer required.
   
   
   ## Verifying this change
   
   Added `FlinkKafkaProducer(11)ITCase#testMigrateFromAtLeastOnceToExactlyOnce` 
and `#testMigrateFromAtExactlyOnceToAtLeastOnce`
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Restoring a KafkaProducer with Semantic.EXACTLY_ONCE from a savepoint written 
> with Semantic.AT_LEAST_ONCE fails with NPE
> 
>
> Key: FLINK-10353
> URL: https://issues.apache.org/jira/browse/FLINK-10353
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.5.3, 1.6.0
>Reporter: Konstantin Knauf
>Priority: Critical
>  Labels: pull-request-available
>
> If a KafkaProducer with {{Semantic.EXACTLY_ONCE}} is restored from a 
> savepoint written with {{Semantic.AT_LEAST_ONCE}}, the job fails on restore 
> with the NPE below. This makes it impossible to upgrade an AT_LEAST_ONCE 
> pipeline to an EXACTLY_ONCE pipeline statefully.
> {quote}
> java.lang.NullPointerException
>   at java.util.Hashtable.put(Hashtable.java:460)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initTransactionalProducer(FlinkKafkaProducer011.java:955)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.recoverAndCommit(FlinkKafkaProducer011.java:733)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.recoverAndCommit(FlinkKafkaProducer011.java:93)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.recoverAndCommitInternal(TwoPhaseCommitSinkFunction.java:373)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:333)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initializeState(FlinkKafkaProducer011.java:867)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:254)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>   at java.lang.Thread.run(Thread.java:748){quote}
> The reason is, that for 

[jira] [Updated] (FLINK-10353) Restoring a KafkaProducer with Semantic.EXACTLY_ONCE from a savepoint written with Semantic.AT_LEAST_ONCE fails with NPE

2018-11-02 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10353?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-10353:
---
Labels: pull-request-available  (was: )

> Restoring a KafkaProducer with Semantic.EXACTLY_ONCE from a savepoint written 
> with Semantic.AT_LEAST_ONCE fails with NPE
> 
>
> Key: FLINK-10353
> URL: https://issues.apache.org/jira/browse/FLINK-10353
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.5.3, 1.6.0
>Reporter: Konstantin Knauf
>Priority: Critical
>  Labels: pull-request-available
>
> If a KafkaProducer with {{Semantic.EXACTLY_ONCE}} is restored from a 
> savepoint written with {{Semantic.AT_LEAST_ONCE}}, the job fails on restore 
> with the NPE below. This makes it impossible to upgrade an AT_LEAST_ONCE 
> pipeline to an EXACTLY_ONCE pipeline statefully.
> {quote}
> java.lang.NullPointerException
>   at java.util.Hashtable.put(Hashtable.java:460)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initTransactionalProducer(FlinkKafkaProducer011.java:955)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.recoverAndCommit(FlinkKafkaProducer011.java:733)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.recoverAndCommit(FlinkKafkaProducer011.java:93)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.recoverAndCommitInternal(TwoPhaseCommitSinkFunction.java:373)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:333)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initializeState(FlinkKafkaProducer011.java:867)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:254)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>   at java.lang.Thread.run(Thread.java:748){quote}
> The reason is that, for {{Semantic.AT_LEAST_ONCE}}, the snapshotted state of 
> the {{TwoPhaseCommitFunction}} is of the form 
> "TransactionHolder\{handle=KafkaTransactionState [transactionalId=null, 
> producerId=-1, epoch=-1], transactionStartTime=1537175471175}".



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] StefanRRichter opened a new pull request #7010: [FLINK-10353][kafka] Support change of transactional semantics in Kaf…

2018-11-02 Thread GitBox
StefanRRichter opened a new pull request #7010: [FLINK-10353][kafka] Support 
change of transactional semantics in Kaf…
URL: https://github.com/apache/flink/pull/7010
 
 
   …ka Producer with existing state
   
   ## What is the purpose of the change
   
   This PR changes `FlinkKafkaProducer` and  `FlinkKafkaProducer011` to support 
a change of transactional semantics when restoring from existing state, e.g. a 
savepoint.
   
   ## Brief change log
   
   - Introduced `KafkaTransactionState#isTransactional` to distinguish which 
transactional handling should be applied instead of relying on what is 
currently configured.
   
   - Call `initializeUserContext` in `TwoPhaseCommitSinkFunction` for all cases 
that did not recover a user context.
   
   - Consider removing the transactional id from the properties when creating a new 
producer to deactivate transactional semantics if no longer required.
   
   
   ## Verifying this change
   
   Added `FlinkKafkaProducer(11)ITCase#testMigrateFromAtLeastOnceToExactlyOnce` 
and `#testMigrateFromAtExactlyOnceToAtLeastOnce`
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] isunjin commented on a change in pull request #6684: [FLINK-10205] Batch Job: InputSplit Fault tolerant for DataSource…

2018-11-02 Thread GitBox
isunjin commented on a change in pull request #6684: [FLINK-10205] Batch 
Job: InputSplit Fault tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6684#discussion_r230445728
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 ##
 @@ -249,6 +254,19 @@ public CoLocationConstraint getLocationConstraint() {
return locationConstraint;
}
 
+   public InputSplit getNextInputSplit(int index, String host) {
+   final int taskId = this.getParallelSubtaskIndex();
+   synchronized (this.inputSplits) {
+   if (index < this.inputSplits.size()) {
+   return this.inputSplits.get(index);
 
 Review comment:
   Thanks, removed.
   It was intended to support speculative execution, so that multiple versions of 
the same ExecutionVertex can run at the same time, but I will postpone the 
feature and add it later.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-11-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673434#comment-16673434
 ] 

ASF GitHub Bot commented on FLINK-10205:


isunjin commented on a change in pull request #6684: [FLINK-10205] Batch 
Job: InputSplit Fault tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6684#discussion_r230445728
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 ##
 @@ -249,6 +254,19 @@ public CoLocationConstraint getLocationConstraint() {
return locationConstraint;
}
 
+   public InputSplit getNextInputSplit(int index, String host) {
+   final int taskId = this.getParallelSubtaskIndex();
+   synchronized (this.inputSplits) {
+   if (index < this.inputSplits.size()) {
+   return this.inputSplits.get(index);
 
 Review comment:
   Thanks, removed.
   It was intended to support speculative execution, so that multiple versions of 
the same ExecutionVertex can run at the same time, but I will postpone that 
feature and add it later.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Batch Job: InputSplit Fault tolerant for DataSourceTask
> ---
>
> Key: FLINK-10205
> URL: https://issues.apache.org/jira/browse/FLINK-10205
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager
>Affects Versions: 1.6.1, 1.6.2, 1.7.0
>Reporter: JIN SUN
>Assignee: JIN SUN
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> Today the DataSourceTask pulls InputSplits from the JobManager to achieve better 
> performance; however, when a DataSourceTask fails and is rerun, it will not get 
> the same splits as its previous attempt. This will introduce inconsistent 
> results or even data corruption.
> Furthermore, if two executions run at the same time (in a batch 
> scenario), these two executions should process the same splits.
> We need to fix this issue to make the inputs of a DataSourceTask 
> deterministic. The proposal is to save all splits in the ExecutionVertex, and the 
> DataSourceTask will pull splits from there.
>  document:
> [https://docs.google.com/document/d/1FdZdcA63tPUEewcCimTFy9Iz2jlVlMRANZkO4RngIuk/edit?usp=sharing]
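
A minimal sketch of the proposed idea (hypothetical, not the actual patch): the 
ExecutionVertex remembers the splits it has already handed out, so a restarted 
attempt that asks for split index i receives exactly the same split again.

// Hypothetical sketch of deterministic split assignment per ExecutionVertex.
// Not the actual Flink code; it only illustrates that a rerun attempt asking
// for the i-th split gets the same split as the previous attempt did.
import java.util.ArrayList;
import java.util.List;

class SketchInputSplit {
    final int splitNumber;

    SketchInputSplit(int splitNumber) {
        this.splitNumber = splitNumber;
    }
}

class SketchExecutionVertex {
    private final List<SketchInputSplit> assignedSplits = new ArrayList<>();
    private int nextSplitNumber = 0;

    // Replays previously assigned splits so a failed-and-restarted task sees the
    // identical sequence; otherwise assigns a fresh split (in the real system this
    // would be pulled from the InputSplitAssigner on the JobManager).
    synchronized SketchInputSplit getNextInputSplit(int index) {
        if (index < assignedSplits.size()) {
            return assignedSplits.get(index);
        }
        SketchInputSplit split = new SketchInputSplit(nextSplitNumber++);
        assignedSplits.add(split);
        return split;
    }
}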



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10633) End-to-end test: Prometheus reporter

2018-11-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673430#comment-16673430
 ] 

ASF GitHub Bot commented on FLINK-10633:


zentol commented on issue #7003: [FLINK-10633][prometheus] Add E2E test
URL: https://github.com/apache/flink/pull/7003#issuecomment-435447361
 
 
   For one, this was an experiment; I hated writing this test in bash, so 
instead I ported it to Java. We may use it for future tests, or we may rework it 
entirely. But from what I've seen, things get done significantly faster if you 
at least have some starting point. The majority of the code here is sane and 
easily re-usable in whatever direction we want to go. The mistake we made 
in the past was to jump ahead with the first thing we came up with (our bash 
stuff) and not critically analyze it when doing so.
   
   Furthermore, I don't consider this a framework. It's too simple for that; 
all it adds is a programmatic way to interact with flink-dist. That's it.
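
   As a rough illustration of what "programmatic interaction with flink-dist" means 
here, a test could use the new JUnit resource roughly like this. The class and method 
names startFlinkCluster/stopFlinkCluster are assumptions for illustration, not 
necessarily the exact API added in this PR:

// Hypothetical usage sketch only; the test class name and the start/stop helpers
// are assumptions, not confirmed API of this PR.
import org.junit.Rule;
import org.junit.Test;

public class PrometheusReporterEndToEndITCaseSketch {

    // JUnit rule that backs up flink-conf.yaml before the test and restores it afterwards
    @Rule
    public final FlinkDistribution dist = new FlinkDistribution();

    @Test
    public void testReporterMetricsAreExposed() throws Exception {
        dist.startFlinkCluster();  // assumed helper: start a local cluster from the distribution
        // ... submit a job and scrape the Prometheus endpoint over HTTP ...
        dist.stopFlinkCluster();   // clean shutdown, as done in the class shown later in this thread
    }
}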


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> End-to-end test: Prometheus reporter
> 
>
> Key: FLINK-10633
> URL: https://issues.apache.org/jira/browse/FLINK-10633
> Project: Flink
>  Issue Type: Sub-task
>  Components: E2E Tests
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Add an end-to-end test using the {{PrometheusReporter}} to verify that all 
> metrics are properly reported. Additionally, verify that the newly introduced 
> RocksDB metrics are accessible (see FLINK-10423).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] zentol commented on issue #7003: [FLINK-10633][prometheus] Add E2E test

2018-11-02 Thread GitBox
zentol commented on issue #7003: [FLINK-10633][prometheus] Add E2E test
URL: https://github.com/apache/flink/pull/7003#issuecomment-435447361
 
 
   For one, this was an experiment; I hated writing this test in bash, so 
instead I ported it to Java. We may use it for future tests, or we may rework it 
entirely. But from what I've seen, things get done significantly faster if you 
at least have some starting point. The majority of the code here is sane and 
easily re-usable in whatever direction we want to go. The mistake we made 
in the past was to jump ahead with the first thing we came up with (our bash 
stuff) and not critically analyze it when doing so.
   
   Furthermore, I don't consider this a framework. It's too simple for that; 
all it adds is a programmatic way to interact with flink-dist. That's it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10720) Add stress deployment end-to-end test

2018-11-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10720?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673421#comment-16673421
 ] 

ASF GitHub Bot commented on FLINK-10720:


tillrohrmann commented on a change in pull request #6994: [FLINK-10720][tests] 
Add deployment end-to-end stress test with many …
URL: https://github.com/apache/flink/pull/6994#discussion_r230441106
 
 

 ##
 File path: flink-end-to-end-tests/test-scripts/test_heavy_deployment.sh
 ##
 @@ -0,0 +1,50 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+source "$(dirname "$0")"/common.sh
+
+CHECKPOINT_DIR="file://$TEST_DATA_DIR/savepoint-e2e-test-chckpt-dir"
+
+TEST=flink-heavy-deployment-stress-test
+TEST_PROGRAM_NAME=HeavyDeploymentStressTestProgram
+TEST_PROGRAM_JAR=${END_TO_END_DIR}/$TEST/target/$TEST_PROGRAM_NAME.jar
+
+set_conf "taskmanager.heap.mb" "256" # 256Mb x 20 TMs = 5Gb total heap
+
+set_conf "taskmanager.memory.size" "8" # 8Mb
+set_conf "taskmanager.network.memory.min" "8mb"
+set_conf "taskmanager.network.memory.max" "8mb"
+set_conf "taskmanager.memory.segment-size" "8kb"
+
+set_conf "taskmanager.numberOfTaskSlots" "11"
+
+start_cluster
+
+NUM_TMS=20
+
+#20 x 13 slots to support a parallelism of 256
+echo "Start $NUM_TMS more task managers"
+for i in `seq 1 $NUM_TMS`; do
+$FLINK_DIR/bin/taskmanager.sh start
+done
+
+$FLINK_DIR/bin/flink run -p 200 ${TEST_PROGRAM_JAR} \
+--environment.max_parallelism 1024 --environment.parallelism 200 \
 
 Review comment:
   Why don't we use all slots?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add stress deployment end-to-end test
> -
>
> Key: FLINK-10720
> URL: https://issues.apache.org/jira/browse/FLINK-10720
> Project: Flink
>  Issue Type: Sub-task
>  Components: E2E Tests
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> In order to test Flink's scalability, I suggest adding an end-to-end test 
> that tests the deployment of a very demanding job. The job should 
> have large {{TaskDeploymentDescriptors}} (e.g. a job using union state or 
> having a high degree of parallelism). That way we can test that the 
> serialization overhead of the TDDs does not affect the health of the cluster 
> (e.g. heartbeats are not affected because the serialization does not happen 
> in the main thread).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10720) Add stress deployment end-to-end test

2018-11-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10720?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673426#comment-16673426
 ] 

ASF GitHub Bot commented on FLINK-10720:


tillrohrmann commented on a change in pull request #6994: [FLINK-10720][tests] 
Add deployment end-to-end stress test with many …
URL: https://github.com/apache/flink/pull/6994#discussion_r230440912
 
 

 ##
 File path: flink-end-to-end-tests/test-scripts/test_heavy_deployment.sh
 ##
 @@ -0,0 +1,50 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+source "$(dirname "$0")"/common.sh
+
+CHECKPOINT_DIR="file://$TEST_DATA_DIR/savepoint-e2e-test-chckpt-dir"
+
+TEST=flink-heavy-deployment-stress-test
+TEST_PROGRAM_NAME=HeavyDeploymentStressTestProgram
+TEST_PROGRAM_JAR=${END_TO_END_DIR}/$TEST/target/$TEST_PROGRAM_NAME.jar
+
+set_conf "taskmanager.heap.mb" "256" # 256Mb x 20 TMs = 5Gb total heap
+
+set_conf "taskmanager.memory.size" "8" # 8Mb
+set_conf "taskmanager.network.memory.min" "8mb"
+set_conf "taskmanager.network.memory.max" "8mb"
+set_conf "taskmanager.memory.segment-size" "8kb"
+
+set_conf "taskmanager.numberOfTaskSlots" "11"
+
+start_cluster
 
 Review comment:
   `start_cluster` already starts a single TM. Thus, we should have 21 TMs 
running. Is this correct?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add stress deployment end-to-end test
> -
>
> Key: FLINK-10720
> URL: https://issues.apache.org/jira/browse/FLINK-10720
> Project: Flink
>  Issue Type: Sub-task
>  Components: E2E Tests
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> In order to test Flink's scalability, I suggest adding an end-to-end test 
> that tests the deployment of a very demanding job. The job should 
> have large {{TaskDeploymentDescriptors}} (e.g. a job using union state or 
> having a high degree of parallelism). That way we can test that the 
> serialization overhead of the TDDs does not affect the health of the cluster 
> (e.g. heartbeats are not affected because the serialization does not happen 
> in the main thread).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10720) Add stress deployment end-to-end test

2018-11-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10720?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673422#comment-16673422
 ] 

ASF GitHub Bot commented on FLINK-10720:


tillrohrmann commented on a change in pull request #6994: [FLINK-10720][tests] 
Add deployment end-to-end stress test with many …
URL: https://github.com/apache/flink/pull/6994#discussion_r230439584
 
 

 ##
 File path: flink-end-to-end-tests/test-scripts/test_heavy_deployment.sh
 ##
 @@ -0,0 +1,50 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+source "$(dirname "$0")"/common.sh
+
+CHECKPOINT_DIR="file://$TEST_DATA_DIR/savepoint-e2e-test-chckpt-dir"
+
+TEST=flink-heavy-deployment-stress-test
+TEST_PROGRAM_NAME=HeavyDeploymentStressTestProgram
+TEST_PROGRAM_JAR=${END_TO_END_DIR}/$TEST/target/$TEST_PROGRAM_NAME.jar
+
+set_conf "taskmanager.heap.mb" "256" # 256Mb x 20 TMs = 5Gb total heap
+
+set_conf "taskmanager.memory.size" "8" # 8Mb
+set_conf "taskmanager.network.memory.min" "8mb"
+set_conf "taskmanager.network.memory.max" "8mb"
+set_conf "taskmanager.memory.segment-size" "8kb"
+
+set_conf "taskmanager.numberOfTaskSlots" "11"
+
+start_cluster
+
+NUM_TMS=20
+
+#20 x 13 slots to support a parallelism of 256
 
 Review comment:
   Comment is not correct.
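
   For reference, with the values configured in this script the arithmetic would be 
as follows (assuming, as noted in another comment, that start_cluster already brings 
up one TaskManager):

# Slot arithmetic for the values used in this script (sketch only):
TOTAL_TMS=$((1 + 20))              # 1 TM from start_cluster + NUM_TMS=20 extra TMs
TOTAL_SLOTS=$((TOTAL_TMS * 11))    # taskmanager.numberOfTaskSlots = 11
echo "${TOTAL_SLOTS} slots for a requested parallelism of 200"   # prints "231 slots ..."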


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add stress deployment end-to-end test
> -
>
> Key: FLINK-10720
> URL: https://issues.apache.org/jira/browse/FLINK-10720
> Project: Flink
>  Issue Type: Sub-task
>  Components: E2E Tests
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> In order to test Flink's scalability, I suggest adding an end-to-end test 
> that tests the deployment of a very demanding job. The job should 
> have large {{TaskDeploymentDescriptors}} (e.g. a job using union state or 
> having a high degree of parallelism). That way we can test that the 
> serialization overhead of the TDDs does not affect the health of the cluster 
> (e.g. heartbeats are not affected because the serialization does not happen 
> in the main thread).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10720) Add stress deployment end-to-end test

2018-11-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10720?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673424#comment-16673424
 ] 

ASF GitHub Bot commented on FLINK-10720:


tillrohrmann commented on a change in pull request #6994: [FLINK-10720][tests] 
Add deployment end-to-end stress test with many …
URL: https://github.com/apache/flink/pull/6994#discussion_r230443405
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-heavy-deployment-stress-test/src/main/java/org/apache/flink/deployment/HeavyDeploymentStressTestProgram.java
 ##
 @@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.deployment;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.util.Preconditions;
+
+import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.setupEnvironment;
+
+
+/**
+ * End-to-end test for heavy deployment descriptors. This test creates a heavy 
deployment by producing inflated meta
+ * data for the source's operator state. The state is registered as union 
state and will be multiplied in deployment.
+ */
+public class HeavyDeploymentStressTestProgram {
+
+   private static final ConfigOption NUM_LIST_STATES_PER_OP = 
ConfigOptions
+   .key("heavy_deployment_test.num_list_states_per_op")
+   .defaultValue(100);
+
+   private static final ConfigOption 
NUM_PARTITIONS_PER_LIST_STATE = ConfigOptions
+   .key("heavy_deployment_test.num_partitions_per_list_state")
+   .defaultValue(100);
+
+   public static void main(String[] args) throws Exception {
+
+   final ParameterTool pt = ParameterTool.fromArgs(args);
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   setupEnvironment(env, pt);
+
+   final int numStates =
+   pt.getInt(NUM_LIST_STATES_PER_OP.key(), 
NUM_LIST_STATES_PER_OP.defaultValue());
+   final int numPartitionsPerState =
+   pt.getInt(NUM_PARTITIONS_PER_LIST_STATE.key(), 
NUM_PARTITIONS_PER_LIST_STATE.defaultValue());
+
+   Preconditions.checkState(env.getCheckpointInterval() > 0L, 
"Checkpointing must be enabled for this test!");
+
+   env.addSource(new 
SimpleEndlessSourceWithBloatedState(numStates, 
numPartitionsPerState)).setParallelism(env.getParallelism())
+   .addSink(new DiscardingSink<>()).setParallelism(1);
+
+   env.execute("HeavyDeploymentStressTestProgram");
+   }
+
+   /**
+* Source with dummy operator state that results in inflated meta data.
+*/
+   static class SimpleEndlessSourceWithBloatedState extends 
RichParallelSourceFunction
 
 Review comment:
   `serialVersionUID` missing
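
   For reference, the fix being asked for is just an explicit serial version UID on 
the serializable class, e.g. (a minimal illustration; the value is a placeholder):

import java.io.Serializable;

// Minimal illustration of the reviewer's point: a Serializable class should
// declare an explicit serialVersionUID so serialized instances stay compatible.
class SerialVersionUidExample implements Serializable {
    private static final long serialVersionUID = 1L; // placeholder value
}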


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add stress deployment end-to-end test
> -
>
> Key: FLINK-10720
> URL: https://issues.apache.org/jira/browse/FLINK-10720
> Project: 

[jira] [Commented] (FLINK-10720) Add stress deployment end-to-end test

2018-11-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10720?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673428#comment-16673428
 ] 

ASF GitHub Bot commented on FLINK-10720:


tillrohrmann commented on a change in pull request #6994: [FLINK-10720][tests] 
Add deployment end-to-end stress test with many …
URL: https://github.com/apache/flink/pull/6994#discussion_r230439813
 
 

 ##
 File path: flink-end-to-end-tests/test-scripts/test_heavy_deployment.sh
 ##
 @@ -0,0 +1,50 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+source "$(dirname "$0")"/common.sh
+
+CHECKPOINT_DIR="file://$TEST_DATA_DIR/savepoint-e2e-test-chckpt-dir"
+
+TEST=flink-heavy-deployment-stress-test
+TEST_PROGRAM_NAME=HeavyDeploymentStressTestProgram
+TEST_PROGRAM_JAR=${END_TO_END_DIR}/$TEST/target/$TEST_PROGRAM_NAME.jar
+
+set_conf "taskmanager.heap.mb" "256" # 256Mb x 20 TMs = 5Gb total heap
+
+set_conf "taskmanager.memory.size" "8" # 8Mb
+set_conf "taskmanager.network.memory.min" "8mb"
+set_conf "taskmanager.network.memory.max" "8mb"
+set_conf "taskmanager.memory.segment-size" "8kb"
+
+set_conf "taskmanager.numberOfTaskSlots" "11"
+
+start_cluster
+
+NUM_TMS=20
+
+#20 x 13 slots to support a parallelism of 256
+echo "Start $NUM_TMS more task managers"
+for i in `seq 1 $NUM_TMS`; do
+$FLINK_DIR/bin/taskmanager.sh start
+done
 
 Review comment:
   I think it's better to use `common.sh#start_taskmanagers` here.
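
   A sketch of the suggested change (assuming that start_taskmanagers in common.sh 
accepts the number of TaskManagers to launch; the exact signature is an assumption 
here):

# Replace the manual loop with the shared helper from common.sh.
# Assumption: start_taskmanagers accepts the number of additional TMs to start.
NUM_TMS=20
start_taskmanagers ${NUM_TMS}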


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add stress deployment end-to-end test
> -
>
> Key: FLINK-10720
> URL: https://issues.apache.org/jira/browse/FLINK-10720
> Project: Flink
>  Issue Type: Sub-task
>  Components: E2E Tests
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> In order to test Flink's scalability, I suggest adding an end-to-end test 
> that tests the deployment of a very demanding job. The job should 
> have large {{TaskDeploymentDescriptors}} (e.g. a job using union state or 
> having a high degree of parallelism). That way we can test that the 
> serialization overhead of the TDDs does not affect the health of the cluster 
> (e.g. heartbeats are not affected because the serialization does not happen 
> in the main thread).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10720) Add stress deployment end-to-end test

2018-11-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10720?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673423#comment-16673423
 ] 

ASF GitHub Bot commented on FLINK-10720:


tillrohrmann commented on a change in pull request #6994: [FLINK-10720][tests] 
Add deployment end-to-end stress test with many …
URL: https://github.com/apache/flink/pull/6994#discussion_r230440331
 
 

 ##
 File path: flink-end-to-end-tests/test-scripts/test_heavy_deployment.sh
 ##
 @@ -0,0 +1,50 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+source "$(dirname "$0")"/common.sh
+
+CHECKPOINT_DIR="file://$TEST_DATA_DIR/savepoint-e2e-test-chckpt-dir"
+
+TEST=flink-heavy-deployment-stress-test
+TEST_PROGRAM_NAME=HeavyDeploymentStressTestProgram
+TEST_PROGRAM_JAR=${END_TO_END_DIR}/$TEST/target/$TEST_PROGRAM_NAME.jar
+
+set_conf "taskmanager.heap.mb" "256" # 256Mb x 20 TMs = 5Gb total heap
+
+set_conf "taskmanager.memory.size" "8" # 8Mb
+set_conf "taskmanager.network.memory.min" "8mb"
+set_conf "taskmanager.network.memory.max" "8mb"
+set_conf "taskmanager.memory.segment-size" "8kb"
+
+set_conf "taskmanager.numberOfTaskSlots" "11"
+
+start_cluster
+
+NUM_TMS=20
+
+#20 x 13 slots to support a parallelism of 256
+echo "Start $NUM_TMS more task managers"
+for i in `seq 1 $NUM_TMS`; do
+$FLINK_DIR/bin/taskmanager.sh start
+done
+
+$FLINK_DIR/bin/flink run -p 200 ${TEST_PROGRAM_JAR} \
+--environment.max_parallelism 1024 --environment.parallelism 200 \
 
 Review comment:
   `-p 200` should not be needed if we set `--environment.parallelism`.
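
   Applied to the script above, the submission would then rely solely on the job's 
own parallelism parameter, e.g.:

# Sketch of the suggested simplification: drop "-p 200" and let
# --environment.parallelism control the job parallelism.
# (The remaining program arguments stay exactly as in the script above.)
$FLINK_DIR/bin/flink run ${TEST_PROGRAM_JAR} \
  --environment.max_parallelism 1024 --environment.parallelism 200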


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add stress deployment end-to-end test
> -
>
> Key: FLINK-10720
> URL: https://issues.apache.org/jira/browse/FLINK-10720
> Project: Flink
>  Issue Type: Sub-task
>  Components: E2E Tests
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> In order to test Flink's scalability, I suggest adding an end-to-end test 
> that tests the deployment of a very demanding job. The job should 
> have large {{TaskDeploymentDescriptors}} (e.g. a job using union state or 
> having a high degree of parallelism). That way we can test that the 
> serialization overhead of the TDDs does not affect the health of the cluster 
> (e.g. heartbeats are not affected because the serialization does not happen 
> in the main thread).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10720) Add stress deployment end-to-end test

2018-11-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10720?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673427#comment-16673427
 ] 

ASF GitHub Bot commented on FLINK-10720:


tillrohrmann commented on a change in pull request #6994: [FLINK-10720][tests] 
Add deployment end-to-end stress test with many …
URL: https://github.com/apache/flink/pull/6994#discussion_r230443577
 
 

 ##
 File path: flink-end-to-end-tests/flink-heavy-deployment-stress-test/pom.xml
 ##
 @@ -0,0 +1,99 @@
+
+
+http://maven.apache.org/POM/4.0.0; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd;>
+
+   4.0.0
+   
+   
+   org.apache.flink
+   flink-end-to-end-tests
+   1.7-SNAPSHOT
+   ..
+   
+
+   flink-heavy-deployment-stress-test
+   flink-heavy-deployment-stress-test
+   jar
+
+   
+   
+   org.apache.flink
+   flink-core
+   ${project.version}
+   
+   
+   org.apache.flink
+   flink-java
+   ${project.version}
+   
+   
+   org.apache.flink
+   flink-streaming-java_2.11
+   ${project.version}
+   
+   
+   org.apache.flink
+   flink-datastream-allround-test
+   ${project.version}
+   
+   
+   junit
+   junit
+   
 
 Review comment:
   Not needed imo


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add stress deployment end-to-end test
> -
>
> Key: FLINK-10720
> URL: https://issues.apache.org/jira/browse/FLINK-10720
> Project: Flink
>  Issue Type: Sub-task
>  Components: E2E Tests
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> In order to test Flink's scalability, I suggest adding an end-to-end test 
> that tests the deployment of a very demanding job. The job should 
> have large {{TaskDeploymentDescriptors}} (e.g. a job using union state or 
> having a high degree of parallelism). That way we can test that the 
> serialization overhead of the TDDs does not affect the health of the cluster 
> (e.g. heartbeats are not affected because the serialization does not happen 
> in the main thread).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10720) Add stress deployment end-to-end test

2018-11-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10720?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673425#comment-16673425
 ] 

ASF GitHub Bot commented on FLINK-10720:


tillrohrmann commented on a change in pull request #6994: [FLINK-10720][tests] 
Add deployment end-to-end stress test with many …
URL: https://github.com/apache/flink/pull/6994#discussion_r230443486
 
 

 ##
 File path: flink-end-to-end-tests/flink-heavy-deployment-stress-test/pom.xml
 ##
 @@ -0,0 +1,99 @@
+
+
+http://maven.apache.org/POM/4.0.0; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd;>
+
+   4.0.0
+   
+   
+   org.apache.flink
+   flink-end-to-end-tests
+   1.7-SNAPSHOT
+   ..
+   
+
+   flink-heavy-deployment-stress-test
+   flink-heavy-deployment-stress-test
+   jar
+
+   
+   
+   org.apache.flink
+   flink-core
+   ${project.version}
+   
+   
+   org.apache.flink
+   flink-java
+   ${project.version}
+   
 
 Review comment:
   I think we don't need these two dependencies.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add stress deployment end-to-end test
> -
>
> Key: FLINK-10720
> URL: https://issues.apache.org/jira/browse/FLINK-10720
> Project: Flink
>  Issue Type: Sub-task
>  Components: E2E Tests
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> In order to test Flink's scalability, I suggest adding an end-to-end test 
> that tests the deployment of a very demanding job. The job should 
> have large {{TaskDeploymentDescriptors}} (e.g. a job using union state or 
> having a high degree of parallelism). That way we can test that the 
> serialization overhead of the TDDs does not affect the health of the cluster 
> (e.g. heartbeats are not affected because the serialization does not happen 
> in the main thread).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10720) Add stress deployment end-to-end test

2018-11-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10720?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673420#comment-16673420
 ] 

ASF GitHub Bot commented on FLINK-10720:


tillrohrmann commented on a change in pull request #6994: [FLINK-10720][tests] 
Add deployment end-to-end stress test with many …
URL: https://github.com/apache/flink/pull/6994#discussion_r230443678
 
 

 ##
 File path: flink-end-to-end-tests/flink-heavy-deployment-stress-test/pom.xml
 ##
 @@ -0,0 +1,99 @@
+
+
+http://maven.apache.org/POM/4.0.0; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd;>
+
+   4.0.0
+   
+   
+   org.apache.flink
+   flink-end-to-end-tests
+   1.7-SNAPSHOT
+   ..
+   
+
+   flink-heavy-deployment-stress-test
+   flink-heavy-deployment-stress-test
+   jar
+
+   
+   
+   org.apache.flink
+   flink-core
+   ${project.version}
+   
+   
+   org.apache.flink
+   flink-java
+   ${project.version}
+   
+   
+   org.apache.flink
+   flink-streaming-java_2.11
+   ${project.version}
+   
+   
+   org.apache.flink
+   flink-datastream-allround-test
+   ${project.version}
+   
+   
+   junit
+   junit
+   
+   
+   org.apache.flink
+   flink-test-utils_2.11
+   ${project.version}
+   compile
+   
+   
+   log4j
+   log4j
+   
 
 Review comment:
   I don't think that we need this dependency.
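
   Taken together with the other dependency comments on this pom, the trimmed 
dependency section might look roughly like this (a sketch; whether flink-test-utils 
keeps the compile scope is an assumption):

<!-- Hypothetical trimmed dependencies section with flink-core, flink-java,
     junit and log4j removed, as suggested in the review comments. -->
<dependencies>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-streaming-java_2.11</artifactId>
		<version>${project.version}</version>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-datastream-allround-test</artifactId>
		<version>${project.version}</version>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-test-utils_2.11</artifactId>
		<version>${project.version}</version>
		<scope>compile</scope>
	</dependency>
</dependencies>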


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add stress deployment end-to-end test
> -
>
> Key: FLINK-10720
> URL: https://issues.apache.org/jira/browse/FLINK-10720
> Project: Flink
>  Issue Type: Sub-task
>  Components: E2E Tests
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> In order to test Flink's scalability, I suggest adding an end-to-end test 
> that tests the deployment of a very demanding job. The job should 
> have large {{TaskDeploymentDescriptors}} (e.g. a job using union state or 
> having a high degree of parallelism). That way we can test that the 
> serialization overhead of the TDDs does not affect the health of the cluster 
> (e.g. heartbeats are not affected because the serialization does not happen 
> in the main thread).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] tillrohrmann commented on a change in pull request #6994: [FLINK-10720][tests] Add deployment end-to-end stress test with many …

2018-11-02 Thread GitBox
tillrohrmann commented on a change in pull request #6994: [FLINK-10720][tests] 
Add deployment end-to-end stress test with many …
URL: https://github.com/apache/flink/pull/6994#discussion_r230439584
 
 

 ##
 File path: flink-end-to-end-tests/test-scripts/test_heavy_deployment.sh
 ##
 @@ -0,0 +1,50 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+source "$(dirname "$0")"/common.sh
+
+CHECKPOINT_DIR="file://$TEST_DATA_DIR/savepoint-e2e-test-chckpt-dir"
+
+TEST=flink-heavy-deployment-stress-test
+TEST_PROGRAM_NAME=HeavyDeploymentStressTestProgram
+TEST_PROGRAM_JAR=${END_TO_END_DIR}/$TEST/target/$TEST_PROGRAM_NAME.jar
+
+set_conf "taskmanager.heap.mb" "256" # 256Mb x 20 TMs = 5Gb total heap
+
+set_conf "taskmanager.memory.size" "8" # 8Mb
+set_conf "taskmanager.network.memory.min" "8mb"
+set_conf "taskmanager.network.memory.max" "8mb"
+set_conf "taskmanager.memory.segment-size" "8kb"
+
+set_conf "taskmanager.numberOfTaskSlots" "11"
+
+start_cluster
+
+NUM_TMS=20
+
+#20 x 13 slots to support a parallelism of 256
 
 Review comment:
   Comment is not correct.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tillrohrmann commented on a change in pull request #6994: [FLINK-10720][tests] Add deployment end-to-end stress test with many …

2018-11-02 Thread GitBox
tillrohrmann commented on a change in pull request #6994: [FLINK-10720][tests] 
Add deployment end-to-end stress test with many …
URL: https://github.com/apache/flink/pull/6994#discussion_r230440331
 
 

 ##
 File path: flink-end-to-end-tests/test-scripts/test_heavy_deployment.sh
 ##
 @@ -0,0 +1,50 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+source "$(dirname "$0")"/common.sh
+
+CHECKPOINT_DIR="file://$TEST_DATA_DIR/savepoint-e2e-test-chckpt-dir"
+
+TEST=flink-heavy-deployment-stress-test
+TEST_PROGRAM_NAME=HeavyDeploymentStressTestProgram
+TEST_PROGRAM_JAR=${END_TO_END_DIR}/$TEST/target/$TEST_PROGRAM_NAME.jar
+
+set_conf "taskmanager.heap.mb" "256" # 256Mb x 20 TMs = 5Gb total heap
+
+set_conf "taskmanager.memory.size" "8" # 8Mb
+set_conf "taskmanager.network.memory.min" "8mb"
+set_conf "taskmanager.network.memory.max" "8mb"
+set_conf "taskmanager.memory.segment-size" "8kb"
+
+set_conf "taskmanager.numberOfTaskSlots" "11"
+
+start_cluster
+
+NUM_TMS=20
+
+#20 x 13 slots to support a parallelism of 256
+echo "Start $NUM_TMS more task managers"
+for i in `seq 1 $NUM_TMS`; do
+$FLINK_DIR/bin/taskmanager.sh start
+done
+
+$FLINK_DIR/bin/flink run -p 200 ${TEST_PROGRAM_JAR} \
+--environment.max_parallelism 1024 --environment.parallelism 200 \
 
 Review comment:
   `-p 200` should not be needed if we set `--environment.parallelism`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tillrohrmann commented on a change in pull request #6994: [FLINK-10720][tests] Add deployment end-to-end stress test with many …

2018-11-02 Thread GitBox
tillrohrmann commented on a change in pull request #6994: [FLINK-10720][tests] 
Add deployment end-to-end stress test with many …
URL: https://github.com/apache/flink/pull/6994#discussion_r230439813
 
 

 ##
 File path: flink-end-to-end-tests/test-scripts/test_heavy_deployment.sh
 ##
 @@ -0,0 +1,50 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+source "$(dirname "$0")"/common.sh
+
+CHECKPOINT_DIR="file://$TEST_DATA_DIR/savepoint-e2e-test-chckpt-dir"
+
+TEST=flink-heavy-deployment-stress-test
+TEST_PROGRAM_NAME=HeavyDeploymentStressTestProgram
+TEST_PROGRAM_JAR=${END_TO_END_DIR}/$TEST/target/$TEST_PROGRAM_NAME.jar
+
+set_conf "taskmanager.heap.mb" "256" # 256Mb x 20 TMs = 5Gb total heap
+
+set_conf "taskmanager.memory.size" "8" # 8Mb
+set_conf "taskmanager.network.memory.min" "8mb"
+set_conf "taskmanager.network.memory.max" "8mb"
+set_conf "taskmanager.memory.segment-size" "8kb"
+
+set_conf "taskmanager.numberOfTaskSlots" "11"
+
+start_cluster
+
+NUM_TMS=20
+
+#20 x 13 slots to support a parallelism of 256
+echo "Start $NUM_TMS more task managers"
+for i in `seq 1 $NUM_TMS`; do
+$FLINK_DIR/bin/taskmanager.sh start
+done
 
 Review comment:
   I think it's better to use `common.sh#start_taskmanagers` here.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tillrohrmann commented on a change in pull request #6994: [FLINK-10720][tests] Add deployment end-to-end stress test with many …

2018-11-02 Thread GitBox
tillrohrmann commented on a change in pull request #6994: [FLINK-10720][tests] 
Add deployment end-to-end stress test with many …
URL: https://github.com/apache/flink/pull/6994#discussion_r230443577
 
 

 ##
 File path: flink-end-to-end-tests/flink-heavy-deployment-stress-test/pom.xml
 ##
 @@ -0,0 +1,99 @@
+
+
+http://maven.apache.org/POM/4.0.0; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd;>
+
+   4.0.0
+   
+   
+   org.apache.flink
+   flink-end-to-end-tests
+   1.7-SNAPSHOT
+   ..
+   
+
+   flink-heavy-deployment-stress-test
+   flink-heavy-deployment-stress-test
+   jar
+
+   
+   
+   org.apache.flink
+   flink-core
+   ${project.version}
+   
+   
+   org.apache.flink
+   flink-java
+   ${project.version}
+   
+   
+   org.apache.flink
+   flink-streaming-java_2.11
+   ${project.version}
+   
+   
+   org.apache.flink
+   flink-datastream-allround-test
+   ${project.version}
+   
+   
+   junit
+   junit
+   
 
 Review comment:
   Not needed imo


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tillrohrmann commented on a change in pull request #6994: [FLINK-10720][tests] Add deployment end-to-end stress test with many …

2018-11-02 Thread GitBox
tillrohrmann commented on a change in pull request #6994: [FLINK-10720][tests] 
Add deployment end-to-end stress test with many …
URL: https://github.com/apache/flink/pull/6994#discussion_r230443678
 
 

 ##
 File path: flink-end-to-end-tests/flink-heavy-deployment-stress-test/pom.xml
 ##
 @@ -0,0 +1,99 @@
+
+
+http://maven.apache.org/POM/4.0.0; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd;>
+
+   4.0.0
+   
+   
+   org.apache.flink
+   flink-end-to-end-tests
+   1.7-SNAPSHOT
+   ..
+   
+
+   flink-heavy-deployment-stress-test
+   flink-heavy-deployment-stress-test
+   jar
+
+   
+   
+   org.apache.flink
+   flink-core
+   ${project.version}
+   
+   
+   org.apache.flink
+   flink-java
+   ${project.version}
+   
+   
+   org.apache.flink
+   flink-streaming-java_2.11
+   ${project.version}
+   
+   
+   org.apache.flink
+   flink-datastream-allround-test
+   ${project.version}
+   
+   
+   junit
+   junit
+   
+   
+   org.apache.flink
+   flink-test-utils_2.11
+   ${project.version}
+   compile
+   
+   
+   log4j
+   log4j
+   
 
 Review comment:
   I don't think that we need this dependency.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tillrohrmann commented on a change in pull request #6994: [FLINK-10720][tests] Add deployment end-to-end stress test with many …

2018-11-02 Thread GitBox
tillrohrmann commented on a change in pull request #6994: [FLINK-10720][tests] 
Add deployment end-to-end stress test with many …
URL: https://github.com/apache/flink/pull/6994#discussion_r230443405
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-heavy-deployment-stress-test/src/main/java/org/apache/flink/deployment/HeavyDeploymentStressTestProgram.java
 ##
 @@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.deployment;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.util.Preconditions;
+
+import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.setupEnvironment;
+
+
+/**
+ * End-to-end test for heavy deployment descriptors. This test creates a heavy 
deployment by producing inflated meta
+ * data for the source's operator state. The state is registered as union 
state and will be multiplied in deployment.
+ */
+public class HeavyDeploymentStressTestProgram {
+
+   private static final ConfigOption NUM_LIST_STATES_PER_OP = 
ConfigOptions
+   .key("heavy_deployment_test.num_list_states_per_op")
+   .defaultValue(100);
+
+   private static final ConfigOption 
NUM_PARTITIONS_PER_LIST_STATE = ConfigOptions
+   .key("heavy_deployment_test.num_partitions_per_list_state")
+   .defaultValue(100);
+
+   public static void main(String[] args) throws Exception {
+
+   final ParameterTool pt = ParameterTool.fromArgs(args);
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   setupEnvironment(env, pt);
+
+   final int numStates =
+   pt.getInt(NUM_LIST_STATES_PER_OP.key(), 
NUM_LIST_STATES_PER_OP.defaultValue());
+   final int numPartitionsPerState =
+   pt.getInt(NUM_PARTITIONS_PER_LIST_STATE.key(), 
NUM_PARTITIONS_PER_LIST_STATE.defaultValue());
+
+   Preconditions.checkState(env.getCheckpointInterval() > 0L, 
"Checkpointing must be enabled for this test!");
+
+   env.addSource(new 
SimpleEndlessSourceWithBloatedState(numStates, 
numPartitionsPerState)).setParallelism(env.getParallelism())
+   .addSink(new DiscardingSink<>()).setParallelism(1);
+
+   env.execute("HeavyDeploymentStressTestProgram");
+   }
+
+   /**
+* Source with dummy operator state that results in inflated meta data.
+*/
+   static class SimpleEndlessSourceWithBloatedState extends 
RichParallelSourceFunction
 
 Review comment:
   `serialVersionUID` missing


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tillrohrmann commented on a change in pull request #6994: [FLINK-10720][tests] Add deployment end-to-end stress test with many …

2018-11-02 Thread GitBox
tillrohrmann commented on a change in pull request #6994: [FLINK-10720][tests] 
Add deployment end-to-end stress test with many …
URL: https://github.com/apache/flink/pull/6994#discussion_r230441106
 
 

 ##
 File path: flink-end-to-end-tests/test-scripts/test_heavy_deployment.sh
 ##
 @@ -0,0 +1,50 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+source "$(dirname "$0")"/common.sh
+
+CHECKPOINT_DIR="file://$TEST_DATA_DIR/savepoint-e2e-test-chckpt-dir"
+
+TEST=flink-heavy-deployment-stress-test
+TEST_PROGRAM_NAME=HeavyDeploymentStressTestProgram
+TEST_PROGRAM_JAR=${END_TO_END_DIR}/$TEST/target/$TEST_PROGRAM_NAME.jar
+
+set_conf "taskmanager.heap.mb" "256" # 256Mb x 20 TMs = 5Gb total heap
+
+set_conf "taskmanager.memory.size" "8" # 8Mb
+set_conf "taskmanager.network.memory.min" "8mb"
+set_conf "taskmanager.network.memory.max" "8mb"
+set_conf "taskmanager.memory.segment-size" "8kb"
+
+set_conf "taskmanager.numberOfTaskSlots" "11"
+
+start_cluster
+
+NUM_TMS=20
+
+#20 x 13 slots to support a parallelism of 256
+echo "Start $NUM_TMS more task managers"
+for i in `seq 1 $NUM_TMS`; do
+$FLINK_DIR/bin/taskmanager.sh start
+done
+
+$FLINK_DIR/bin/flink run -p 200 ${TEST_PROGRAM_JAR} \
+--environment.max_parallelism 1024 --environment.parallelism 200 \
 
 Review comment:
   Why don't we use all slots?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tillrohrmann commented on a change in pull request #6994: [FLINK-10720][tests] Add deployment end-to-end stress test with many …

2018-11-02 Thread GitBox
tillrohrmann commented on a change in pull request #6994: [FLINK-10720][tests] 
Add deployment end-to-end stress test with many …
URL: https://github.com/apache/flink/pull/6994#discussion_r230443486
 
 

 ##
 File path: flink-end-to-end-tests/flink-heavy-deployment-stress-test/pom.xml
 ##
 @@ -0,0 +1,99 @@
+
+
+http://maven.apache.org/POM/4.0.0; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd;>
+
+   4.0.0
+   
+   
+   org.apache.flink
+   flink-end-to-end-tests
+   1.7-SNAPSHOT
+   ..
+   
+
+   flink-heavy-deployment-stress-test
+   flink-heavy-deployment-stress-test
+   jar
+
+   
+   
+   org.apache.flink
+   flink-core
+   ${project.version}
+   
+   
+   org.apache.flink
+   flink-java
+   ${project.version}
+   
 
 Review comment:
   I think we don't need these two dependencies.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tillrohrmann commented on a change in pull request #6994: [FLINK-10720][tests] Add deployment end-to-end stress test with many …

2018-11-02 Thread GitBox
tillrohrmann commented on a change in pull request #6994: [FLINK-10720][tests] 
Add deployment end-to-end stress test with many …
URL: https://github.com/apache/flink/pull/6994#discussion_r230440912
 
 

 ##
 File path: flink-end-to-end-tests/test-scripts/test_heavy_deployment.sh
 ##
 @@ -0,0 +1,50 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+source "$(dirname "$0")"/common.sh
+
+CHECKPOINT_DIR="file://$TEST_DATA_DIR/savepoint-e2e-test-chckpt-dir"
+
+TEST=flink-heavy-deployment-stress-test
+TEST_PROGRAM_NAME=HeavyDeploymentStressTestProgram
+TEST_PROGRAM_JAR=${END_TO_END_DIR}/$TEST/target/$TEST_PROGRAM_NAME.jar
+
+set_conf "taskmanager.heap.mb" "256" # 256Mb x 20 TMs = 5Gb total heap
+
+set_conf "taskmanager.memory.size" "8" # 8Mb
+set_conf "taskmanager.network.memory.min" "8mb"
+set_conf "taskmanager.network.memory.max" "8mb"
+set_conf "taskmanager.memory.segment-size" "8kb"
+
+set_conf "taskmanager.numberOfTaskSlots" "11"
+
+start_cluster
 
 Review comment:
   `start_cluster` already starts a single TM. Thus, we should have 21 TMs 
running. Is this correct?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zentol commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test

2018-11-02 Thread GitBox
zentol commented on a change in pull request #7003: [FLINK-10633][prometheus] 
Add E2E test
URL: https://github.com/apache/flink/pull/7003#discussion_r230443206
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java
 ##
 @@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.UnmodifiableConfiguration;
+import org.apache.flink.util.ExceptionUtils;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.junit.Assert;
+import org.junit.rules.ExternalResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * A wrapper around a Flink distribution.
+ */
+public final class FlinkDistribution extends ExternalResource {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FlinkDistribution.class);
+
+   private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+   private final List filesToDelete = new ArrayList<>(4);
+
+   private static final Path FLINK_CONF_YAML = 
Paths.get("flink-conf.yaml");
+   private static final Path FLINK_CONF_YAML_BACKUP = 
Paths.get("flink-conf.yaml.bak");
+
+   private final Path opt;
+   private final Path lib;
+   private final Path conf;
+   private final Path log;
+   private final Path bin;
+
+   private Configuration defaultConfig;
+
+   public FlinkDistribution() {
+   final String distDirProperty = System.getProperty("distDir");
+   if (distDirProperty == null) {
+   Assert.fail("The distDir property was not set. You can 
set it when running maven via -DdistDir= .");
+   }
+   final Path flinkDir = Paths.get(distDirProperty);
+   bin = flinkDir.resolve("bin");
+   opt = flinkDir.resolve("opt");
+   lib = flinkDir.resolve("lib");
+   conf = flinkDir.resolve("conf");
+   log = flinkDir.resolve("log");
+   }
+
+   @Override
+   protected void before() throws IOException {
+   defaultConfig = new 
UnmodifiableConfiguration(GlobalConfiguration.loadConfiguration(conf.toAbsolutePath().toString()));
+   final Path originalConfig = conf.resolve(FLINK_CONF_YAML);
+   final Path backupConfig = conf.resolve(FLINK_CONF_YAML_BACKUP);
+   Files.copy(originalConfig, backupConfig);
+   filesToDelete.add(new AutoClosablePath(backupConfig));
+   }
+
+   @Override
+   protected void after() {
+   try {
+   stopFlinkCluster();
+   } catch (IOException e) {
+   LOG.error("Failure while shutting down Flink cluster.", 
e);
+   }
+
+   final Path originalConfig = conf.resolve(FLINK_CONF_YAML);
+   final Path backupConfig = conf.resolve(FLINK_CONF_YAML_BACKUP);
+
+   try {
+   Files.move(backupConfig, originalConfig, 
StandardCopyOption.REPLACE_EXISTING);
+   } catch (IOException e) {
+   LOG.error("Failed to restore flink-conf.yaml", e);
+   }
+
+  
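For context, since `FlinkDistribution` extends JUnit's `ExternalResource`, it is presumably meant to be used as a test rule; below is a minimal hypothetical usage sketch. The class name and the empty test body are made up for illustration, since only part of `FlinkDistribution`'s API is visible in the quoted diff.

```java
import org.apache.flink.tests.util.FlinkDistribution;

import org.junit.Rule;
import org.junit.Test;

public class FlinkDistributionUsageSketch {

	// before() backs up flink-conf.yaml; after() stops the cluster and restores the config.
	@Rule
	public final FlinkDistribution dist = new FlinkDistribution();

	@Test
	public void runAgainstDistribution() throws Exception {
		// Hypothetical test body; the concrete calls against the distribution
		// are omitted because they are not shown in the quoted diff above.
	}
}
```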

[jira] [Commented] (FLINK-10633) End-to-end test: Prometheus reporter

2018-11-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673414#comment-16673414
 ] 

ASF GitHub Bot commented on FLINK-10633:


zentol commented on a change in pull request #7003: [FLINK-10633][prometheus] 
Add E2E test
URL: https://github.com/apache/flink/pull/7003#discussion_r230443206
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java
 ##
 @@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.UnmodifiableConfiguration;
+import org.apache.flink.util.ExceptionUtils;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.junit.Assert;
+import org.junit.rules.ExternalResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * A wrapper around a Flink distribution.
+ */
+public final class FlinkDistribution extends ExternalResource {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FlinkDistribution.class);
+
+   private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+   private final List filesToDelete = new ArrayList<>(4);
+
+   private static final Path FLINK_CONF_YAML = 
Paths.get("flink-conf.yaml");
+   private static final Path FLINK_CONF_YAML_BACKUP = 
Paths.get("flink-conf.yaml.bak");
+
+   private final Path opt;
+   private final Path lib;
+   private final Path conf;
+   private final Path log;
+   private final Path bin;
+
+   private Configuration defaultConfig;
+
+   public FlinkDistribution() {
+   final String distDirProperty = System.getProperty("distDir");
+   if (distDirProperty == null) {
+   Assert.fail("The distDir property was not set. You can 
set it when running maven via -DdistDir= .");
+   }
+   final Path flinkDir = Paths.get(distDirProperty);
+   bin = flinkDir.resolve("bin");
+   opt = flinkDir.resolve("opt");
+   lib = flinkDir.resolve("lib");
+   conf = flinkDir.resolve("conf");
+   log = flinkDir.resolve("log");
+   }
+
+   @Override
+   protected void before() throws IOException {
+   defaultConfig = new 
UnmodifiableConfiguration(GlobalConfiguration.loadConfiguration(conf.toAbsolutePath().toString()));
+   final Path originalConfig = conf.resolve(FLINK_CONF_YAML);
+   final Path backupConfig = conf.resolve(FLINK_CONF_YAML_BACKUP);
+   Files.copy(originalConfig, backupConfig);
+   filesToDelete.add(new AutoClosablePath(backupConfig));
+   }
+
+   @Override
+   protected void after() {
+   try {
+   stopFlinkCluster();
+   } catch (IOException e) {
+   LOG.error("Failure while shutting down Flink cluster.", 
e);
+   }
+
+   final Path originalConfig = conf.resolve(FLINK_CONF_YAML);
+   final Path backupConfig = conf.resolve(FLINK_CONF_YAML_BACKUP);
+
+   try {

[jira] [Commented] (FLINK-10732) force-shading 1.5.5 maven artifact was not released

2018-11-02 Thread Chesnay Schepler (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673412#comment-16673412
 ] 

Chesnay Schepler commented on FLINK-10732:
--

True, we could also switch to {{flink-shaded-force-shading}}, which we've had 
for over a year; however, this doesn't solve the immediate issue of an 
unreleased artifact of a release.

> force-shading 1.5.5 maven artifact was not released
> ---
>
> Key: FLINK-10732
> URL: https://issues.apache.org/jira/browse/FLINK-10732
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.5.5
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.5.5
>
>
> The 1.5.5 maven artifact for {{force-shading}} was not deployed. We have to 
> investigate whether other artifacts are missing as well and how to remedy the 
> problem, i.e. re-deploy the missing artifacts (which may or may not require a 
> separate vote).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10356) Add sanity checks to SpillingAdaptiveSpanningRecordDeserializer

2018-11-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673405#comment-16673405
 ] 

ASF GitHub Bot commented on FLINK-10356:


NicoK commented on a change in pull request #6705: [FLINK-10356][network] add 
sanity checks to SpillingAdaptiveSpanningRecordDeserializer
URL: https://github.com/apache/flink/pull/6705#discussion_r230440178
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
 ##
 @@ -104,11 +120,236 @@ public void testHandleMixedLargeRecords() throws 
Exception {
testSerializationRoundTrip(originalRecords, segmentSize);
}
 
+   /**
+* Non-spanning, deserialization reads one byte too many and succeeds.
+*/
+   @Test
+   public void testHandleDeserializingTooMuchNonSpanning1() throws 
Exception {
+   testHandleWrongDeserialization(
+   new StringValueDeserializingTooMuch("Test string"),
+   32 * 1024,
+   null);
+   }
+
+   /**
+* Non-spanning, serialization length is 16 (including headers), 
deserialization reads one byte
+* too many and succeeds.
+*/
+   @Test
+   public void testHandleDeserializingTooMuchNonSpanning2() throws 
Exception {
+   testHandleWrongDeserialization(
+   new StringValueDeserializingTooMuch("Test string"),
 
 Review comment:
   I'm not quite getting where you are going with this? Do you want it 
randomised?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add sanity checks to SpillingAdaptiveSpanningRecordDeserializer
> ---
>
> Key: FLINK-10356
> URL: https://issues.apache.org/jira/browse/FLINK-10356
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.5.3, 1.5.4, 1.6.0, 1.6.1, 1.7.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
>  Labels: pull-request-available
>
> {{SpillingAdaptiveSpanningRecordDeserializer}} doesn't have any consistency 
> checks for usage calls or serializers behaving properly, e.g. to read only as 
> many bytes as available/promised for that record. At least these checks 
> should be added:
>  # Check that buffers have not been read from yet before adding them (this is 
> an invariant {{SpillingAdaptiveSpanningRecordDeserializer}} works with and, 
> from what I can see, it is followed now).
>  # Check that after deserialization, we actually consumed {{recordLength}} 
> bytes
>  ** If not, in the spanning deserializer, we currently simply skip the 
> remaining bytes.
>  ** But in the non-spanning deserializer, we currently continue from the 
> wrong offset.
>  # Protect against {{setNextBuffer}} being called before draining all 
> available records
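
To illustrate the second check in the list above, here is a minimal, hypothetical sketch of a record-length sanity check. The class and method names are made up for illustration; this is not the actual Flink code.

```java
import java.io.IOException;

/**
 * Hypothetical sketch of the "consumed exactly recordLength bytes" check
 * described in point 2 above; not the actual Flink implementation.
 */
final class RecordLengthSanityCheck {

	/**
	 * Verifies that a deserializer consumed exactly the number of bytes
	 * announced in the record's length header.
	 *
	 * @param positionBefore read position before deserialization
	 * @param positionAfter  read position after deserialization
	 * @param recordLength   length announced in the record header
	 */
	static void checkFullyConsumed(int positionBefore, int positionAfter, int recordLength)
			throws IOException {
		int consumed = positionAfter - positionBefore;
		if (consumed != recordLength) {
			throw new IOException("Serializer consumed " + consumed
				+ " bytes, but the record length was " + recordLength + " bytes.");
		}
	}
}
```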



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] NicoK commented on a change in pull request #6705: [FLINK-10356][network] add sanity checks to SpillingAdaptiveSpanningRecordDeserializer

2018-11-02 Thread GitBox
NicoK commented on a change in pull request #6705: [FLINK-10356][network] add 
sanity checks to SpillingAdaptiveSpanningRecordDeserializer
URL: https://github.com/apache/flink/pull/6705#discussion_r230440178
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
 ##
 @@ -104,11 +120,236 @@ public void testHandleMixedLargeRecords() throws 
Exception {
testSerializationRoundTrip(originalRecords, segmentSize);
}
 
+   /**
+* Non-spanning, deserialization reads one byte too many and succeeds.
+*/
+   @Test
+   public void testHandleDeserializingTooMuchNonSpanning1() throws 
Exception {
+   testHandleWrongDeserialization(
+   new StringValueDeserializingTooMuch("Test string"),
+   32 * 1024,
+   null);
+   }
+
+   /**
+* Non-spanning, serialization length is 16 (including headers), 
deserialization reads one byte
+* too many and succeeds.
+*/
+   @Test
+   public void testHandleDeserializingTooMuchNonSpanning2() throws 
Exception {
+   testHandleWrongDeserialization(
+   new StringValueDeserializingTooMuch("Test string"),
 
 Review comment:
   I'm not quite getting where you are going with this? Do you want it 
randomised?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10712) RestartPipelinedRegionStrategy does not restore state

2018-11-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673402#comment-16673402
 ] 

ASF GitHub Bot commented on FLINK-10712:


Myasuka opened a new pull request #7009: [FLINK-10712] Support to restore state 
when using RestartPipelinedRegionStrategy
URL: https://github.com/apache/flink/pull/7009
 
 
   ## What is the purpose of the change
   
   Currently, RestartPipelinedRegionStrategy does not perform any state 
restore. This is a big problem because all restored regions will be restarted 
with empty state. This PR adds support for restoring state when using 
RestartPipelinedRegionStrategy.
   
   
   ## Brief change log
   
 - Implement a new `restoreLatestCheckpointedState` API for region-based 
failover in `CheckpointCoordinator`.
 - Reload checkpointed state when `FailoverRegion` calls its `restart` method.
 - `StateAssignmentOperation` can assign state to the given execution vertices.

   
   ## Verifying this change
   This change added tests and can be verified as follows:
   
 - Added unit tests for `FailoverRegion` to ensure the failover region actually 
calls the new `restoreLatestCheckpointedState` API within `CheckpointCoordinator`.
 - Added unit tests in `CheckpointCoordinatorTest` to ensure 
`CheckpointCoordinator` can restore with `RestartPipelinedRegionStrategy`.
 - Added unit tests in `CheckpointStateRestoreTest` to ensure 
`RestartPipelinedRegionStrategy` correctly restores state from a checkpoint to 
the task executions.
 - Added a new integration test, `RegionFailoverITCase`, to verify that state 
can be restored properly when the job consists of multiple regions.
 - Refactored `StreamFaultToleranceTestBase` so that all subclass ITs can fail 
over with state using RestartPipelinedRegionStrategy.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): don't know
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? yes
 - If yes, how is the feature documented? JavaDocs
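
To make the flow in the change log above concrete, here is a hedged sketch of how a region-scoped restore could be wired together. All types and signatures below are illustrative stand-ins, not the actual code from this pull request.

```java
import java.util.Set;

interface CheckpointCoordinatorSketch {
	// Hypothetical region-scoped variant of restoreLatestCheckpointedState.
	boolean restoreLatestCheckpointedState(Set<String> verticesInRegion,
			boolean errorIfNoCheckpoint) throws Exception;
}

final class FailoverRegionSketch {
	private final CheckpointCoordinatorSketch checkpointCoordinator;
	private final Set<String> verticesInRegion;

	FailoverRegionSketch(CheckpointCoordinatorSketch checkpointCoordinator,
			Set<String> verticesInRegion) {
		this.checkpointCoordinator = checkpointCoordinator;
		this.verticesInRegion = verticesInRegion;
	}

	void restart() throws Exception {
		// Before redeploying the region's tasks, reload the latest checkpointed
		// state for exactly the vertices belonging to this failover region.
		checkpointCoordinator.restoreLatestCheckpointedState(verticesInRegion, false);
		// ... redeploy the execution vertices of this region ...
	}
}
```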
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> RestartPipelinedRegionStrategy does not restore state
> -
>
> Key: FLINK-10712
> URL: https://issues.apache.org/jira/browse/FLINK-10712
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.3.3, 1.4.2, 1.5.5, 1.6.2, 1.7.0
>Reporter: Stefan Richter
>Assignee: Yun Tang
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> RestartPipelinedRegionStrategy does not perform any state restore. This is a 
> big problem because all restored regions will be restarted with empty state. 
> We need to take checkpoints into account when restoring.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10712) RestartPipelinedRegionStrategy does not restore state

2018-11-02 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10712?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-10712:
---
Labels: pull-request-available  (was: )

> RestartPipelinedRegionStrategy does not restore state
> -
>
> Key: FLINK-10712
> URL: https://issues.apache.org/jira/browse/FLINK-10712
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.3.3, 1.4.2, 1.5.5, 1.6.2, 1.7.0
>Reporter: Stefan Richter
>Assignee: Yun Tang
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> RestartPipelinedRegionStrategy does not perform any state restore. This is a 
> big problem because all restored regions will be restarted with empty state. 
> We need to take checkpoints into account when restoring.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] Myasuka opened a new pull request #7009: [FLINK-10712] Support to restore state when using RestartPipelinedRegionStrategy

2018-11-02 Thread GitBox
Myasuka opened a new pull request #7009: [FLINK-10712] Support to restore state 
when using RestartPipelinedRegionStrategy
URL: https://github.com/apache/flink/pull/7009
 
 
   ## What is the purpose of the change
   
   Currently, RestartPipelinedRegionStrategy does not perform any state 
restore. This is a big problem because all restored regions will be restarted 
with empty state. This PR adds support for restoring state when using 
RestartPipelinedRegionStrategy.
   
   
   ## Brief change log
   
 - Implement a new `restoreLatestCheckpointedState` API for region-based 
failover in `CheckpointCoordinator`.
 - Reload checkpointed state when `FailoverRegion` calls its `restart` method.
 - `StateAssignmentOperation` can assign state to the given execution vertices.

   
   ## Verifying this change
   This change added tests and can be verified as follows:
   
 - Added unit tests for `FailoverRegion` to ensure the failover region actually 
calls the new `restoreLatestCheckpointedState` API within `CheckpointCoordinator`.
 - Added unit tests in `CheckpointCoordinatorTest` to ensure 
`CheckpointCoordinator` can restore with `RestartPipelinedRegionStrategy`.
 - Added unit tests in `CheckpointStateRestoreTest` to ensure 
`RestartPipelinedRegionStrategy` correctly restores state from a checkpoint to 
the task executions.
 - Added a new integration test, `RegionFailoverITCase`, to verify that state 
can be restored properly when the job consists of multiple regions.
 - Refactored `StreamFaultToleranceTestBase` so that all subclass ITs can fail 
over with state using RestartPipelinedRegionStrategy.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): don't know
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? yes
 - If yes, how is the feature documented? JavaDocs
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10633) End-to-end test: Prometheus reporter

2018-11-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673398#comment-16673398
 ] 

ASF GitHub Bot commented on FLINK-10633:


kl0u commented on issue #7003: [FLINK-10633][prometheus] Add E2E test
URL: https://github.com/apache/flink/pull/7003#issuecomment-435440138
 
 
   And one more note: a more in-depth discussion and a design doc could be very 
helpful for correctly designing a testing framework for e2e testing.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> End-to-end test: Prometheus reporter
> 
>
> Key: FLINK-10633
> URL: https://issues.apache.org/jira/browse/FLINK-10633
> Project: Flink
>  Issue Type: Sub-task
>  Components: E2E Tests
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Add an end-to-end test using the {{PrometheusReporter}} to verify that all 
> metrics are properly reported. Additionally verify that the newly introduced 
> RocksDB metrics are accessible (see FLINK-10423).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] kl0u commented on issue #7003: [FLINK-10633][prometheus] Add E2E test

2018-11-02 Thread GitBox
kl0u commented on issue #7003: [FLINK-10633][prometheus] Add E2E test
URL: https://github.com/apache/flink/pull/7003#issuecomment-435440138
 
 
   And one more note: a more in-depth discussion and a design doc could be very 
helpful for correctly designing a testing framework for e2e testing.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10633) End-to-end test: Prometheus reporter

2018-11-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673391#comment-16673391
 ] 

ASF GitHub Bot commented on FLINK-10633:


kl0u edited a comment on issue #7003: [FLINK-10633][prometheus] Add E2E test
URL: https://github.com/apache/flink/pull/7003#issuecomment-435437266
 
 
   Btw, three more notes:
   1) there should be a discussion about how a testing framework should look 
in the future, so that we are all on board
   2) it would be nice to have more javadocs
   3) I would also suggest (slowly but steadily) moving away from `sed`, 
`wget`, and the like (in future JIRAs).


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> End-to-end test: Prometheus reporter
> 
>
> Key: FLINK-10633
> URL: https://issues.apache.org/jira/browse/FLINK-10633
> Project: Flink
>  Issue Type: Sub-task
>  Components: E2E Tests
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Add an end-to-end test using the {{PrometheusReporter}} to verify that all 
> metrics are properly reported. Additionally verify that the newly introduced 
> RocksDB metrics are accessible (see FLINK-10423).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] kl0u edited a comment on issue #7003: [FLINK-10633][prometheus] Add E2E test

2018-11-02 Thread GitBox
kl0u edited a comment on issue #7003: [FLINK-10633][prometheus] Add E2E test
URL: https://github.com/apache/flink/pull/7003#issuecomment-435437266
 
 
   Btw, three more notes:
   1) there should be a discussion about how a testing framework should look 
in the future, so that we are all on board
   2) it would be nice to have more javadocs
   3) I would also suggest (slowly but steadily) moving away from `sed`, 
`wget`, and the like (in future JIRAs).


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10729) Create a Hive connector for Hive data access in Flink

2018-11-02 Thread Xuefu Zhang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673382#comment-16673382
 ] 

Xuefu Zhang commented on FLINK-10729:
-

@li Thanks for your interest. Yes, metadata integration is in progress and 
tracked by FLINK-10744. This one covers the data aspect. We can only declare 
the integration complete after both are in place. Your contribution is 
certainly welcome.

BTW, I'm currently coming up with a rough design, which will be shared soon. I 
can imagine that a prototype is needed.

> Create a Hive connector for Hive data access in Flink
> -
>
> Key: FLINK-10729
> URL: https://issues.apache.org/jira/browse/FLINK-10729
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API  SQL
>Affects Versions: 1.6.2
>Reporter: Xuefu Zhang
>Assignee: Xuefu Zhang
>Priority: Major
>
> As part of the Flink-Hive integration effort, it's important for Flink to access 
> (read/write) Hive data, which is the responsibility of the Hive connector. While 
> there is an HCatalog data connector in the code base, it's not complete (i.e., 
> missing all connector-related classes such as validators, etc.). Further, the 
> HCatalog interface has many limitations, such as accessing only a subset of Hive 
> data, supporting only a subset of Hive data types, etc. In addition, it's not 
> actively maintained. In fact, it's now only a sub-project in Hive.
> Therefore, here we propose a complete connector set for Hive tables, not via 
> HCatalog, but via the direct Hive interface. The HCatalog connector will be 
> deprecated.
> Please note that connector on Hive metadata is already covered in other 
> JIRAs, as {{HiveExternalCatalog}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10633) End-to-end test: Prometheus reporter

2018-11-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673379#comment-16673379
 ] 

ASF GitHub Bot commented on FLINK-10633:


kl0u commented on issue #7003: [FLINK-10633][prometheus] Add E2E test
URL: https://github.com/apache/flink/pull/7003#issuecomment-435437266
 
 
   Btw, two more general notes:
   1) it would be nice to have more javadocs
   2) I would also suggest (slowly but steadily) moving away from `sed`, 
`wget`, and the like (in future JIRAs).


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> End-to-end test: Prometheus reporter
> 
>
> Key: FLINK-10633
> URL: https://issues.apache.org/jira/browse/FLINK-10633
> Project: Flink
>  Issue Type: Sub-task
>  Components: E2E Tests
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Add an end-to-end test using the {{PrometheusReporter}} to verify that all 
> metrics are properly reported. Additionally verify that the newly introduced 
> RocksDB metrics are accessible (see FLINK-10423).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] kl0u commented on issue #7003: [FLINK-10633][prometheus] Add E2E test

2018-11-02 Thread GitBox
kl0u commented on issue #7003: [FLINK-10633][prometheus] Add E2E test
URL: https://github.com/apache/flink/pull/7003#issuecomment-435437266
 
 
   Btw, two more general notes:
   1) it would be nice to have more javadocs
   2) I would also suggest (slowly but steadily) moving away from `sed`, 
`wget`, and the like (in future JIRAs).


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10633) End-to-end test: Prometheus reporter

2018-11-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673343#comment-16673343
 ] 

ASF GitHub Bot commented on FLINK-10633:


kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add 
E2E test
URL: https://github.com/apache/flink/pull/7003#discussion_r230429154
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java
 ##
 @@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.UnmodifiableConfiguration;
+import org.apache.flink.util.ExceptionUtils;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.junit.Assert;
+import org.junit.rules.ExternalResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * A wrapper around a Flink distribution.
+ */
+public final class FlinkDistribution extends ExternalResource {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FlinkDistribution.class);
+
+   private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+   private final List filesToDelete = new ArrayList<>(4);
+
+   private static final Path FLINK_CONF_YAML = 
Paths.get("flink-conf.yaml");
+   private static final Path FLINK_CONF_YAML_BACKUP = 
Paths.get("flink-conf.yaml.bak");
+
+   private final Path opt;
+   private final Path lib;
+   private final Path conf;
+   private final Path log;
+   private final Path bin;
+
+   private Configuration defaultConfig;
+
+   public FlinkDistribution() {
+   final String distDirProperty = System.getProperty("distDir");
+   if (distDirProperty == null) {
+   Assert.fail("The distDir property was not set. You can 
set it when running maven via -DdistDir= .");
+   }
+   final Path flinkDir = Paths.get(distDirProperty);
+   bin = flinkDir.resolve("bin");
+   opt = flinkDir.resolve("opt");
+   lib = flinkDir.resolve("lib");
+   conf = flinkDir.resolve("conf");
+   log = flinkDir.resolve("log");
+   }
+
+   @Override
+   protected void before() throws IOException {
+   defaultConfig = new 
UnmodifiableConfiguration(GlobalConfiguration.loadConfiguration(conf.toAbsolutePath().toString()));
+   final Path originalConfig = conf.resolve(FLINK_CONF_YAML);
+   final Path backupConfig = conf.resolve(FLINK_CONF_YAML_BACKUP);
+   Files.copy(originalConfig, backupConfig);
+   filesToDelete.add(new AutoClosablePath(backupConfig));
+   }
+
+   @Override
+   protected void after() {
+   try {
+   stopFlinkCluster();
+   } catch (IOException e) {
+   LOG.error("Failure while shutting down Flink cluster.", 
e);
+   }
+
+   final Path originalConfig = conf.resolve(FLINK_CONF_YAML);
+   final Path backupConfig = conf.resolve(FLINK_CONF_YAML_BACKUP);
+
+   try {
+  

[GitHub] kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test

2018-11-02 Thread GitBox
kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add 
E2E test
URL: https://github.com/apache/flink/pull/7003#discussion_r230429154
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java
 ##
 @@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.UnmodifiableConfiguration;
+import org.apache.flink.util.ExceptionUtils;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.junit.Assert;
+import org.junit.rules.ExternalResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * A wrapper around a Flink distribution.
+ */
+public final class FlinkDistribution extends ExternalResource {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FlinkDistribution.class);
+
+   private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+   private final List filesToDelete = new ArrayList<>(4);
+
+   private static final Path FLINK_CONF_YAML = 
Paths.get("flink-conf.yaml");
+   private static final Path FLINK_CONF_YAML_BACKUP = 
Paths.get("flink-conf.yaml.bak");
+
+   private final Path opt;
+   private final Path lib;
+   private final Path conf;
+   private final Path log;
+   private final Path bin;
+
+   private Configuration defaultConfig;
+
+   public FlinkDistribution() {
+   final String distDirProperty = System.getProperty("distDir");
+   if (distDirProperty == null) {
+   Assert.fail("The distDir property was not set. You can 
set it when running maven via -DdistDir= .");
+   }
+   final Path flinkDir = Paths.get(distDirProperty);
+   bin = flinkDir.resolve("bin");
+   opt = flinkDir.resolve("opt");
+   lib = flinkDir.resolve("lib");
+   conf = flinkDir.resolve("conf");
+   log = flinkDir.resolve("log");
+   }
+
+   @Override
+   protected void before() throws IOException {
+   defaultConfig = new 
UnmodifiableConfiguration(GlobalConfiguration.loadConfiguration(conf.toAbsolutePath().toString()));
+   final Path originalConfig = conf.resolve(FLINK_CONF_YAML);
+   final Path backupConfig = conf.resolve(FLINK_CONF_YAML_BACKUP);
+   Files.copy(originalConfig, backupConfig);
+   filesToDelete.add(new AutoClosablePath(backupConfig));
+   }
+
+   @Override
+   protected void after() {
+   try {
+   stopFlinkCluster();
+   } catch (IOException e) {
+   LOG.error("Failure while shutting down Flink cluster.", 
e);
+   }
+
+   final Path originalConfig = conf.resolve(FLINK_CONF_YAML);
+   final Path backupConfig = conf.resolve(FLINK_CONF_YAML_BACKUP);
+
+   try {
+   Files.move(backupConfig, originalConfig, 
StandardCopyOption.REPLACE_EXISTING);
+   } catch (IOException e) {
+   LOG.error("Failed to restore flink-conf.yaml", e);
+   }
+
+

[jira] [Commented] (FLINK-10455) Potential Kafka producer leak in case of failures

2018-11-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673336#comment-16673336
 ] 

ASF GitHub Bot commented on FLINK-10455:


pnowojski commented on issue #6989: [FLINK-10455][Kafka Tx] Close transactional 
producers in case of failure and termination
URL: https://github.com/apache/flink/pull/6989#issuecomment-435429850
 
 
   Regarding your question @azagrebin, I think my bug fix for 
https://issues.apache.org/jira/browse/FLINK-8086 (which added "ignore 
ProducerFencedException") indeed might have turned @GJL's timeouts code:
   ```
} catch (final Exception e) {
final long elapsedTime = clock.millis() - 
transactionHolder.transactionStartTime;
if (ignoreFailuresAfterTransactionTimeout && 
elapsedTime > transactionTimeout) {
LOG.error("Error while committing transaction 
{}. " +
"Transaction has been open for 
longer than the transaction timeout ({})." +
"Commit will not be attempted 
again. Data loss might have occurred.",
transactionHolder.handle, 
transactionTimeout, e);
} else {
throw e;
}
}
   ```
   into dead code at the moment. But I think my bug fix is more important 
(from my commit message):
   > [FLINK-8086][kafka] Ignore ProducerFencedException 
   >
   > during recovery  ProducerFencedException can happen if we restore twice 
from the same checkpoint or if we restore from an old savepoint. In both cases 
transactional.ids that we want to recoverAndCommit have been already committed 
and reused. Reusing means that they will be known by Kafka's brokers under newer 
producerId/epochId, which will result in ProducerFencedException if we try to 
commit again some old (and already committed) transaction.  
   >
   > Ignoring this exception might hide some bugs/issues, because instead of 
failing we might have a semi-silent (with a warning) data loss.
   
   I think the last sentence answers your question.
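
   To picture the recovery behaviour described in the quoted commit message, here is a hedged sketch of ignoring `ProducerFencedException` during a recover-and-commit step. The class and the `ResumedTransaction` interface are illustrative stand-ins (assuming kafka-clients and slf4j on the classpath); this is not FlinkKafkaProducer011's actual code.
   ```java
   import org.apache.kafka.common.errors.ProducerFencedException;
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;

   final class RecoverAndCommitSketch {

       private static final Logger LOG = LoggerFactory.getLogger(RecoverAndCommitSketch.class);

       /** Stand-in for a resumed Kafka transaction; not a real Flink or Kafka type. */
       interface ResumedTransaction {
           void commit();
       }

       // Hypothetical recover-and-commit: a fenced producer during recovery means the
       // transactional.id was already committed and reused, so the exception is logged
       // and swallowed instead of failing the job.
       static void recoverAndCommit(ResumedTransaction transaction) {
           try {
               transaction.commit();
           } catch (ProducerFencedException e) {
               LOG.warn("Transaction was probably already committed and its transactional.id reused; ignoring.", e);
           }
       }
   }
   ```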


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Potential Kafka producer leak in case of failures
> -
>
> Key: FLINK-10455
> URL: https://issues.apache.org/jira/browse/FLINK-10455
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.5.2
>Reporter: Nico Kruber
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
>
> If the Kafka brokers' timeout is too low for our checkpoint interval [1], we 
> may get a {{ProducerFencedException}}. Documentation around 
> {{ProducerFencedException}} explicitly states that we should close the 
> producer after encountering it.
> By looking at the code, it doesn't seem like this is actually done in 
> {{FlinkKafkaProducer011}}. Also, in case one transaction's commit in 
> {{TwoPhaseCommitSinkFunction#notifyCheckpointComplete}} fails with an 
> exception, we don't clean up (nor try to commit) any other transaction.
> -> from what I see, {{TwoPhaseCommitSinkFunction#notifyCheckpointComplete}} 
> simply iterates over the {{pendingCommitTransactions}} which is not touched 
> during {{close()}}
> Now if we restart the failing job on the same Flink cluster, any resources 
> from the previous attempt will still linger around.
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/connectors/kafka.html#kafka-011



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] pnowojski commented on issue #6989: [FLINK-10455][Kafka Tx] Close transactional producers in case of failure and termination

2018-11-02 Thread GitBox
pnowojski commented on issue #6989: [FLINK-10455][Kafka Tx] Close transactional 
producers in case of failure and termination
URL: https://github.com/apache/flink/pull/6989#issuecomment-435429850
 
 
   Regarding your question @azagrebin, I think my bug fix for 
https://issues.apache.org/jira/browse/FLINK-8086 (which added "ignore 
ProducerFencedException") indeed might have turned @GJL's timeouts code:
   ```
} catch (final Exception e) {
final long elapsedTime = clock.millis() - 
transactionHolder.transactionStartTime;
if (ignoreFailuresAfterTransactionTimeout && 
elapsedTime > transactionTimeout) {
LOG.error("Error while committing transaction 
{}. " +
"Transaction has been open for 
longer than the transaction timeout ({})." +
"Commit will not be attempted 
again. Data loss might have occurred.",
transactionHolder.handle, 
transactionTimeout, e);
} else {
throw e;
}
}
   ```
   into dead code at the moment. But I think my bug fix is more important 
(from my commit message):
   > [FLINK-8086][kafka] Ignore ProducerFencedException 
   >
   > during recovery  ProducerFencedException can happen if we restore twice 
from the same checkpoint or if we restore from an old savepoint. In both cases 
transactional.ids that we want to recoverAndCommit have been already committed 
and reused. Reusing means that they will be known by Kafka's brokers under newer 
producerId/epochId, which will result in ProducerFencedException if we try to 
commit again some old (and already committed) transaction.  
   >
   > Ignoring this exception might hide some bugs/issues, because instead of 
failing we might have a semi-silent (with a warning) data loss.
   
   I think the last sentence answers your question.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10633) End-to-end test: Prometheus reporter

2018-11-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673329#comment-16673329
 ] 

ASF GitHub Bot commented on FLINK-10633:


zentol commented on a change in pull request #7003: [FLINK-10633][prometheus] 
Add E2E test
URL: https://github.com/apache/flink/pull/7003#discussion_r230425448
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java
 ##
 @@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.UnmodifiableConfiguration;
+import org.apache.flink.util.ExceptionUtils;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.junit.Assert;
+import org.junit.rules.ExternalResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * A wrapper around a Flink distribution.
+ */
+public final class FlinkDistribution extends ExternalResource {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FlinkDistribution.class);
+
+   private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+   private final List filesToDelete = new ArrayList<>(4);
+
+   private static final Path FLINK_CONF_YAML = 
Paths.get("flink-conf.yaml");
+   private static final Path FLINK_CONF_YAML_BACKUP = 
Paths.get("flink-conf.yaml.bak");
+
+   private final Path opt;
+   private final Path lib;
+   private final Path conf;
+   private final Path log;
+   private final Path bin;
+
+   private Configuration defaultConfig;
+
+   public FlinkDistribution() {
+   final String distDirProperty = System.getProperty("distDir");
+   if (distDirProperty == null) {
+   Assert.fail("The distDir property was not set. You can 
set it when running maven via -DdistDir= .");
+   }
+   final Path flinkDir = Paths.get(distDirProperty);
+   bin = flinkDir.resolve("bin");
+   opt = flinkDir.resolve("opt");
+   lib = flinkDir.resolve("lib");
+   conf = flinkDir.resolve("conf");
+   log = flinkDir.resolve("log");
+   }
+
+   @Override
+   protected void before() throws IOException {
+   defaultConfig = new 
UnmodifiableConfiguration(GlobalConfiguration.loadConfiguration(conf.toAbsolutePath().toString()));
+   final Path originalConfig = conf.resolve(FLINK_CONF_YAML);
+   final Path backupConfig = conf.resolve(FLINK_CONF_YAML_BACKUP);
+   Files.copy(originalConfig, backupConfig);
+   filesToDelete.add(new AutoClosablePath(backupConfig));
+   }
+
+   @Override
+   protected void after() {
+   try {
+   stopFlinkCluster();
+   } catch (IOException e) {
+   LOG.error("Failure while shutting down Flink cluster.", 
e);
+   }
+
+   final Path originalConfig = conf.resolve(FLINK_CONF_YAML);
+   final Path backupConfig = conf.resolve(FLINK_CONF_YAML_BACKUP);
+
+   try {

[GitHub] zentol commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test

2018-11-02 Thread GitBox
zentol commented on a change in pull request #7003: [FLINK-10633][prometheus] 
Add E2E test
URL: https://github.com/apache/flink/pull/7003#discussion_r230425448
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java
 ##
 @@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.UnmodifiableConfiguration;
+import org.apache.flink.util.ExceptionUtils;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.junit.Assert;
+import org.junit.rules.ExternalResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * A wrapper around a Flink distribution.
+ */
+public final class FlinkDistribution extends ExternalResource {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FlinkDistribution.class);
+
+   private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+   private final List filesToDelete = new ArrayList<>(4);
+
+   private static final Path FLINK_CONF_YAML = 
Paths.get("flink-conf.yaml");
+   private static final Path FLINK_CONF_YAML_BACKUP = 
Paths.get("flink-conf.yaml.bak");
+
+   private final Path opt;
+   private final Path lib;
+   private final Path conf;
+   private final Path log;
+   private final Path bin;
+
+   private Configuration defaultConfig;
+
+   public FlinkDistribution() {
+   final String distDirProperty = System.getProperty("distDir");
+   if (distDirProperty == null) {
+   Assert.fail("The distDir property was not set. You can 
set it when running maven via -DdistDir= .");
+   }
+   final Path flinkDir = Paths.get(distDirProperty);
+   bin = flinkDir.resolve("bin");
+   opt = flinkDir.resolve("opt");
+   lib = flinkDir.resolve("lib");
+   conf = flinkDir.resolve("conf");
+   log = flinkDir.resolve("log");
+   }
+
+   @Override
+   protected void before() throws IOException {
+   defaultConfig = new 
UnmodifiableConfiguration(GlobalConfiguration.loadConfiguration(conf.toAbsolutePath().toString()));
+   final Path originalConfig = conf.resolve(FLINK_CONF_YAML);
+   final Path backupConfig = conf.resolve(FLINK_CONF_YAML_BACKUP);
+   Files.copy(originalConfig, backupConfig);
+   filesToDelete.add(new AutoClosablePath(backupConfig));
+   }
+
+   @Override
+   protected void after() {
+   try {
+   stopFlinkCluster();
+   } catch (IOException e) {
+   LOG.error("Failure while shutting down Flink cluster.", 
e);
+   }
+
+   final Path originalConfig = conf.resolve(FLINK_CONF_YAML);
+   final Path backupConfig = conf.resolve(FLINK_CONF_YAML_BACKUP);
+
+   try {
+   Files.move(backupConfig, originalConfig, 
StandardCopyOption.REPLACE_EXISTING);
+   } catch (IOException e) {
+   LOG.error("Failed to restore flink-conf.yaml", e);
+   }
+
+  

[jira] [Commented] (FLINK-10638) Invalid table scan resolution for temporal join queries

2018-11-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673297#comment-16673297
 ] 

ASF GitHub Bot commented on FLINK-10638:


pnowojski commented on a change in pull request #6981: [FLINK-10638][table] 
Invalid table scan resolution for temporal join queries
URL: https://github.com/apache/flink/pull/6981#discussion_r230420946
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/TemporalJoinITCase.scala
 ##
 @@ -142,11 +140,16 @@ class TemporalJoinITCase extends 
StreamingWithStateTestBase {
 
 tEnv.registerTable("Orders", orders)
 tEnv.registerTable("RatesHistory", ratesHistory)
+tEnv.registerTable("FilteredRatesHistory", 
tEnv.scan("RatesHistory").filter('rate > 110L))
 tEnv.registerFunction(
   "Rates",
-  ratesHistory.createTemporalTableFunction('rowtime, 'currency))
+  tEnv.scan("FilteredRatesHistory").createTemporalTableFunction('rowtime, 
'currency))
+tEnv.registerTable("TemporalJoinResult", tEnv.sqlQuery(sqlQuery))
 
-val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+// Scan from registered table to test for interplay between
+// LogicalCorrelateToTemporalTableJoinRule and TableScanRule
+val result = tEnv.scan("TemporalJoinResult").toAppendStream[Row]
+System.out.println(tEnv.explain(tEnv.sqlQuery(sqlQuery)))
 
 Review comment:
   ops  


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Invalid table scan resolution for temporal join queries
> ---
>
> Key: FLINK-10638
> URL: https://issues.apache.org/jira/browse/FLINK-10638
> Project: Flink
>  Issue Type: Bug
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Registered tables that contain a temporal join are not properly resolved when 
> performing a table scan.
> A planning error occurs when registering a table with a temporal join and 
> reading from it again.
> {code}
> LogicalProject(amount=[*($0, $4)])
>   LogicalFilter(condition=[=($3, $1)])
> LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
> requiredColumns=[{2}])
>   LogicalTableScan(table=[[_DataStreamTable_0]])
>   
> LogicalTableFunctionScan(invocation=[Rates(CAST($cor0.rowtime):TIMESTAMP(3) 
> NOT NULL)], rowType=[RecordType(VARCHAR(65536) currency, BIGINT rate, TIME 
> ATTRIBUTE(ROWTIME) rowtime)], elementType=[class [Ljava.lang.Object;])
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
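For illustration, a minimal Java sketch of the pattern discussed above (not the committed Scala test): it assumes a Flink 1.7 StreamTableEnvironment `tEnv` on which "Orders"(amount, currency, rowtime) and "RatesHistory"(currency, rate, rowtime) are already registered with rowtime as a time attribute; the SQL text and class name are placeholders.

{code}
// Illustrative sketch only: register the temporal join result as a table and scan it
// again, which is the resolution path FLINK-10638 fixes.
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TemporalTableFunction;
import org.apache.flink.types.Row;

public final class TemporalScanSketch {

	public static DataStream<Row> scanRegisteredTemporalJoin(StreamTableEnvironment tEnv) {
		// Temporal table function over the rates history, keyed by currency.
		TemporalTableFunction rates = tEnv
			.scan("RatesHistory")
			.createTemporalTableFunction("rowtime", "currency");
		tEnv.registerFunction("Rates", rates);

		String sqlQuery =
			"SELECT o.amount * r.rate AS amount "
				+ "FROM Orders AS o, LATERAL TABLE (Rates(o.rowtime)) AS r "
				+ "WHERE r.currency = o.currency";

		// Registering the query result and scanning it again exercises the interplay
		// between LogicalCorrelateToTemporalTableJoinRule and TableScanRule.
		Table temporalJoinResult = tEnv.sqlQuery(sqlQuery);
		tEnv.registerTable("TemporalJoinResult", temporalJoinResult);
		return tEnv.toAppendStream(tEnv.scan("TemporalJoinResult"), Row.class);
	}
}
{code}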




[jira] [Commented] (FLINK-10356) Add sanity checks to SpillingAdaptiveSpanningRecordDeserializer

2018-11-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673295#comment-16673295
 ] 

ASF GitHub Bot commented on FLINK-10356:


NicoK commented on a change in pull request #6705: [FLINK-10356][network] add 
sanity checks to SpillingAdaptiveSpanningRecordDeserializer
URL: https://github.com/apache/flink/pull/6705#discussion_r230420404
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
 ##
 @@ -104,11 +120,236 @@ public void testHandleMixedLargeRecords() throws 
Exception {
testSerializationRoundTrip(originalRecords, segmentSize);
}
 
+   /**
+* Non-spanning, deserialization reads one byte too many and succeeds.
+*/
+   @Test
+   public void testHandleDeserializingTooMuchNonSpanning1() throws 
Exception {
+   testHandleWrongDeserialization(
+   new StringValueDeserializingTooMuch("Test string"),
+   32 * 1024,
+   null);
+   }
+
+   /**
+* Non-spanning, serialization length is 16 (including headers), 
deserialization reads one byte
+* too many and succeeds.
+*/
+   @Test
+   public void testHandleDeserializingTooMuchNonSpanning2() throws 
Exception {
+   testHandleWrongDeserialization(
+   new StringValueDeserializingTooMuch("Test string"),
+   17,
 
 Review comment:
   yes, that would be really nice but is not available out of the box since 
`StringValue#getBinaryLength` only returns `-1` and additionally, we do add one 
more length header from the `SpanningRecordSerializer` - I could evaluate the 
actual value through serializing it (maybe only once, statically) though and 
keep using that or so...


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add sanity checks to SpillingAdaptiveSpanningRecordDeserializer
> ---
>
> Key: FLINK-10356
> URL: https://issues.apache.org/jira/browse/FLINK-10356
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.5.3, 1.5.4, 1.6.0, 1.6.1, 1.7.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
>  Labels: pull-request-available
>
> {{SpillingAdaptiveSpanningRecordDeserializer}} doesn't have any consistency 
> checks for usage calls or serializers behaving properly, e.g. to read only as 
> many bytes as available/promised for that record. At least these checks 
> should be added:
>  # Check that buffers have not been read from yet before adding them (this is 
> an invariant {{SpillingAdaptiveSpanningRecordDeserializer}} works with and 
> from what I can see, it is followed now.
>  # Check that after deserialization, we actually consumed {{recordLength}} 
> bytes
>  ** If not, in the spanning deserializer, we currently simply skip the 
> remaining bytes.
>  ** But in the non-spanning deserializer, we currently continue from the 
> wrong offset.
>  # Protect against {{setNextBuffer}} being called before draining all 
> available records



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
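If the reviewer's idea of computing the value once were followed, it could look roughly like the sketch below; it assumes flink-core's DataOutputSerializer and is illustrative, not code from the PR. The extra four bytes stand in for the length header that the SpanningRecordSerializer prepends, so segment sizes such as the 17 above could be derived from the result instead of being hard-coded.

{code}
// Illustrative sketch: derive the serialized size of a StringValue record once,
// instead of hard-coding the record's on-the-wire size in the test.
import java.io.IOException;

import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.types.StringValue;

final class SerializedLengthSketch {

	// Payload size as written by StringValue#write plus the 4-byte length header
	// added by the SpanningRecordSerializer.
	static int serializedLengthWithHeader(StringValue value) throws IOException {
		DataOutputSerializer out = new DataOutputSerializer(64);
		value.write(out);
		return Integer.BYTES + out.length();
	}
}
{code}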




[jira] [Commented] (FLINK-10633) End-to-end test: Prometheus reporter

2018-11-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673294#comment-16673294
 ] 

ASF GitHub Bot commented on FLINK-10633:


zentol commented on a change in pull request #7003: [FLINK-10633][prometheus] 
Add E2E test
URL: https://github.com/apache/flink/pull/7003#discussion_r230420322
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/src/test/java/org/apache/flink/metrics/prometheus/tests/PrometheusReporterEndToEndITCase.java
 ##
 @@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.prometheus.tests;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.prometheus.PrometheusReporter;
+import org.apache.flink.tests.util.AutoClosableProcess;
+import org.apache.flink.tests.util.CommandLineWrapper;
+import org.apache.flink.tests.util.FlinkDistribution;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.TestLogger;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.List;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.tests.util.AutoClosableProcess.runBlocking;
+import static org.apache.flink.tests.util.AutoClosableProcess.runNonBlocking;
+
+/**
+ * End-to-end test for the PrometheusReporter.
+ */
+public class PrometheusReporterEndToEndITCase extends TestLogger {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(PrometheusReporterEndToEndITCase.class);
+
+   private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+   private static final String PROMETHEUS_FILE_NAME = 
"prometheus-2.4.3.linux-amd64";
+
+   private static final Pattern LOG_REPORTER_PORT_PATTERN = 
Pattern.compile(".*Started PrometheusReporter HTTP server on port ([0-9]+).*");
+
+   @Rule
+   public final FlinkDistribution dist = new FlinkDistribution();
+
+   @Rule
+   public final TemporaryFolder tmp = new TemporaryFolder();
+
+   @Test
+   public void testReporter() throws Exception {
+   LOG.info("starting test");
+   dist.copyOptJarsToLib("flink-metrics-prometheus");
+
+   final Configuration config = new Configuration();
+   config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"prom." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
PrometheusReporter.class.getCanonicalName());
+   config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"prom.port", "9000-9100");
+
+   dist.appendConfiguration(config);
+
+   final Path tmpPrometheusDir = 
tmp.newFolder().toPath().resolve("prometheus");
+   final Path prometheusArchive = 
tmpPrometheusDir.resolve(PROMETHEUS_FILE_NAME + ".tar.gz");
+   final Path prometheusBinDir = 
tmpPrometheusDir.resolve(PROMETHEUS_FILE_NAME);
+   final Path prometheusConfig = 
prometheusBinDir.resolve("prometheus.yml");
+   final Path prometheusBinary = 
prometheusBinDir.resolve("prometheus");
+   Files.createDirectory(tmpPrometheusDir);
+
+   runBlocking(
+   "Download of Prometheus",
+   Duration.ofMinutes(5),
+   CommandLineWrapper
+   .wget("https://github.com/prometheus/prometheus/releases/download/v2.4.3/" + prometheusArchive.getFileName())
 
 Review comment:
   hmm...maybe but I doubt we'll change the version anytime soon.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
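Should the version ever need to change after all, one way to keep the archive name and download URL in sync would be a single version constant; the sketch below is only an illustration built around the constants shown in the test, not part of the PR.

{code}
// Illustrative sketch: build the archive name and download URL from one constant,
// so a future version bump only touches PROMETHEUS_VERSION.
final class PrometheusDownloadSketch {
	static final String PROMETHEUS_VERSION = "2.4.3";
	static final String PROMETHEUS_FILE_NAME = "prometheus-" + PROMETHEUS_VERSION + ".linux-amd64";
	static final String PROMETHEUS_ARCHIVE = PROMETHEUS_FILE_NAME + ".tar.gz";
	static final String PROMETHEUS_DOWNLOAD_URL =
		"https://github.com/prometheus/prometheus/releases/download/v" + PROMETHEUS_VERSION + "/" + PROMETHEUS_ARCHIVE;
}
{code}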


[jira] [Commented] (FLINK-10633) End-to-end test: Prometheus reporter

2018-11-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673292#comment-16673292
 ] 

ASF GitHub Bot commented on FLINK-10633:


zentol commented on a change in pull request #7003: [FLINK-10633][prometheus] 
Add E2E test
URL: https://github.com/apache/flink/pull/7003#discussion_r230420018
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java
 ##
 @@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.UnmodifiableConfiguration;
+import org.apache.flink.util.ExceptionUtils;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.junit.Assert;
+import org.junit.rules.ExternalResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * A wrapper around a Flink distribution.
+ */
+public final class FlinkDistribution extends ExternalResource {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FlinkDistribution.class);
+
+   private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+   private final List filesToDelete = new ArrayList<>(4);
+
+   private static final Path FLINK_CONF_YAML = 
Paths.get("flink-conf.yaml");
+   private static final Path FLINK_CONF_YAML_BACKUP = 
Paths.get("flink-conf.yaml.bak");
+
+   private final Path opt;
+   private final Path lib;
+   private final Path conf;
+   private final Path log;
+   private final Path bin;
+
+   private Configuration defaultConfig;
+
+   public FlinkDistribution() {
+   final String distDirProperty = System.getProperty("distDir");
+   if (distDirProperty == null) {
+   Assert.fail("The distDir property was not set. You can 
set it when running maven via -DdistDir= .");
+   }
+   final Path flinkDir = Paths.get(distDirProperty);
+   bin = flinkDir.resolve("bin");
+   opt = flinkDir.resolve("opt");
+   lib = flinkDir.resolve("lib");
+   conf = flinkDir.resolve("conf");
+   log = flinkDir.resolve("log");
+   }
+
+   @Override
+   protected void before() throws IOException {
+   defaultConfig = new 
UnmodifiableConfiguration(GlobalConfiguration.loadConfiguration(conf.toAbsolutePath().toString()));
+   final Path originalConfig = conf.resolve(FLINK_CONF_YAML);
+   final Path backupConfig = conf.resolve(FLINK_CONF_YAML_BACKUP);
+   Files.copy(originalConfig, backupConfig);
+   filesToDelete.add(new AutoClosablePath(backupConfig));
+   }
+
+   @Override
+   protected void after() {
+   try {
 
 Review comment:
   This is purely for safety and as such I'd like to keep it without any 
condition.
   I can see the point, and maybe we should return a `Closable` in 
`startCluster` to allow better management in the test.
   
   `FlinkDistribution` does not represent a flink cluster but `flink-dist`; it 
thus doesn't make sense to start a cluster in `before`.
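A purely hypothetical sketch of the "return a Closable from start" idea follows; neither this method nor its name exists in the PR, and it assumes FlinkDistribution already has matching startFlinkCluster() and stopFlinkCluster() methods (the latter is invoked in after() above).

{code}
// Hypothetical addition to FlinkDistribution, not part of the PR: hand the caller an
// AutoCloseable so the test controls the cluster lifecycle via try-with-resources.
public AutoCloseable startFlinkClusterManaged() throws IOException {
	startFlinkCluster();            // assumed existing start method
	return this::stopFlinkCluster;  // the same shutdown call used in after()
}

// Hypothetical usage in a test:
// try (AutoCloseable cluster = dist.startFlinkClusterManaged()) {
//     // submit job, scrape metrics, assert ...
// }
{code}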



[jira] [Commented] (FLINK-10633) End-to-end test: Prometheus reporter

2018-11-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673284#comment-16673284
 ] 

ASF GitHub Bot commented on FLINK-10633:


zentol commented on a change in pull request #7003: [FLINK-10633][prometheus] 
Add E2E test
URL: https://github.com/apache/flink/pull/7003#discussion_r230418196
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java
 ##
 @@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.UnmodifiableConfiguration;
+import org.apache.flink.util.ExceptionUtils;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.junit.Assert;
+import org.junit.rules.ExternalResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * A wrapper around a Flink distribution.
+ */
+public final class FlinkDistribution extends ExternalResource {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FlinkDistribution.class);
+
+   private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+   private final List filesToDelete = new ArrayList<>(4);
+
+   private static final Path FLINK_CONF_YAML = 
Paths.get("flink-conf.yaml");
+   private static final Path FLINK_CONF_YAML_BACKUP = 
Paths.get("flink-conf.yaml.bak");
+
+   private final Path opt;
+   private final Path lib;
+   private final Path conf;
+   private final Path log;
+   private final Path bin;
+
+   private Configuration defaultConfig;
+
+   public FlinkDistribution() {
+   final String distDirProperty = System.getProperty("distDir");
+   if (distDirProperty == null) {
+   Assert.fail("The distDir property was not set. You can 
set it when running maven via -DdistDir= .");
+   }
+   final Path flinkDir = Paths.get(distDirProperty);
+   bin = flinkDir.resolve("bin");
+   opt = flinkDir.resolve("opt");
+   lib = flinkDir.resolve("lib");
+   conf = flinkDir.resolve("conf");
+   log = flinkDir.resolve("log");
+   }
+
+   @Override
+   protected void before() throws IOException {
+   defaultConfig = new 
UnmodifiableConfiguration(GlobalConfiguration.loadConfiguration(conf.toAbsolutePath().toString()));
+   final Path originalConfig = conf.resolve(FLINK_CONF_YAML);
+   final Path backupConfig = conf.resolve(FLINK_CONF_YAML_BACKUP);
+   Files.copy(originalConfig, backupConfig);
+   filesToDelete.add(new AutoClosablePath(backupConfig));
+   }
+
+   @Override
+   protected void after() {
+   try {
+   stopFlinkCluster();
+   } catch (IOException e) {
+   LOG.error("Failure while shutting down Flink cluster.", 
e);
+   }
+
+   final Path originalConfig = conf.resolve(FLINK_CONF_YAML);
+   final Path backupConfig = conf.resolve(FLINK_CONF_YAML_BACKUP);
+
+   try {


[jira] [Commented] (FLINK-10633) End-to-end test: Prometheus reporter

2018-11-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673283#comment-16673283
 ] 

ASF GitHub Bot commented on FLINK-10633:


zentol commented on a change in pull request #7003: [FLINK-10633][prometheus] 
Add E2E test
URL: https://github.com/apache/flink/pull/7003#discussion_r230417575
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java
 ##
 @@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.UnmodifiableConfiguration;
+import org.apache.flink.util.ExceptionUtils;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.junit.Assert;
+import org.junit.rules.ExternalResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * A wrapper around a Flink distribution.
+ */
+public final class FlinkDistribution extends ExternalResource {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FlinkDistribution.class);
+
+   private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+   private final List filesToDelete = new ArrayList<>(4);
+
+   private static final Path FLINK_CONF_YAML = 
Paths.get("flink-conf.yaml");
+   private static final Path FLINK_CONF_YAML_BACKUP = 
Paths.get("flink-conf.yaml.bak");
+
+   private final Path opt;
+   private final Path lib;
+   private final Path conf;
+   private final Path log;
+   private final Path bin;
+
+   private Configuration defaultConfig;
+
+   public FlinkDistribution() {
+   final String distDirProperty = System.getProperty("distDir");
+   if (distDirProperty == null) {
+   Assert.fail("The distDir property was not set. You can 
set it when running maven via -DdistDir= .");
+   }
+   final Path flinkDir = Paths.get(distDirProperty);
+   bin = flinkDir.resolve("bin");
+   opt = flinkDir.resolve("opt");
+   lib = flinkDir.resolve("lib");
+   conf = flinkDir.resolve("conf");
+   log = flinkDir.resolve("log");
+   }
+
+   @Override
+   protected void before() throws IOException {
+   defaultConfig = new 
UnmodifiableConfiguration(GlobalConfiguration.loadConfiguration(conf.toAbsolutePath().toString()));
+   final Path originalConfig = conf.resolve(FLINK_CONF_YAML);
+   final Path backupConfig = conf.resolve(FLINK_CONF_YAML_BACKUP);
+   Files.copy(originalConfig, backupConfig);
+   filesToDelete.add(new AutoClosablePath(backupConfig));
+   }
+
+   @Override
+   protected void after() {
+   try {
+   stopFlinkCluster();
+   } catch (IOException e) {
+   LOG.error("Failure while shutting down Flink cluster.", 
e);
+   }
+
+   final Path originalConfig = conf.resolve(FLINK_CONF_YAML);
+   final Path backupConfig = conf.resolve(FLINK_CONF_YAML_BACKUP);
+
+   try {


[jira] [Commented] (FLINK-10633) End-to-end test: Prometheus reporter

2018-11-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673281#comment-16673281
 ] 

ASF GitHub Bot commented on FLINK-10633:


zentol commented on a change in pull request #7003: [FLINK-10633][prometheus] 
Add E2E test
URL: https://github.com/apache/flink/pull/7003#discussion_r230417518
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java
 ##
 @@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.UnmodifiableConfiguration;
+import org.apache.flink.util.ExceptionUtils;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.junit.Assert;
+import org.junit.rules.ExternalResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * A wrapper around a Flink distribution.
+ */
+public final class FlinkDistribution extends ExternalResource {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FlinkDistribution.class);
+
+   private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+   private final List filesToDelete = new ArrayList<>(4);
+
+   private static final Path FLINK_CONF_YAML = 
Paths.get("flink-conf.yaml");
+   private static final Path FLINK_CONF_YAML_BACKUP = 
Paths.get("flink-conf.yaml.bak");
+
+   private final Path opt;
+   private final Path lib;
+   private final Path conf;
+   private final Path log;
+   private final Path bin;
+
+   private Configuration defaultConfig;
+
+   public FlinkDistribution() {
+   final String distDirProperty = System.getProperty("distDir");
+   if (distDirProperty == null) {
+   Assert.fail("The distDir property was not set. You can 
set it when running maven via -DdistDir= .");
+   }
+   final Path flinkDir = Paths.get(distDirProperty);
+   bin = flinkDir.resolve("bin");
+   opt = flinkDir.resolve("opt");
+   lib = flinkDir.resolve("lib");
+   conf = flinkDir.resolve("conf");
+   log = flinkDir.resolve("log");
+   }
+
+   @Override
+   protected void before() throws IOException {
+   defaultConfig = new 
UnmodifiableConfiguration(GlobalConfiguration.loadConfiguration(conf.toAbsolutePath().toString()));
+   final Path originalConfig = conf.resolve(FLINK_CONF_YAML);
+   final Path backupConfig = conf.resolve(FLINK_CONF_YAML_BACKUP);
+   Files.copy(originalConfig, backupConfig);
+   filesToDelete.add(new AutoClosablePath(backupConfig));
+   }
+
+   @Override
+   protected void after() {
+   try {
+   stopFlinkCluster();
+   } catch (IOException e) {
+   LOG.error("Failure while shutting down Flink cluster.", 
e);
+   }
+
+   final Path originalConfig = conf.resolve(FLINK_CONF_YAML);
+   final Path backupConfig = conf.resolve(FLINK_CONF_YAML_BACKUP);
+
+   try {


[jira] [Commented] (FLINK-10633) End-to-end test: Prometheus reporter

2018-11-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673279#comment-16673279
 ] 

ASF GitHub Bot commented on FLINK-10633:


zentol commented on a change in pull request #7003: [FLINK-10633][prometheus] 
Add E2E test
URL: https://github.com/apache/flink/pull/7003#discussion_r230416606
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java
 ##
 @@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.UnmodifiableConfiguration;
+import org.apache.flink.util.ExceptionUtils;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.junit.Assert;
+import org.junit.rules.ExternalResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * A wrapper around a Flink distribution.
+ */
+public final class FlinkDistribution extends ExternalResource {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FlinkDistribution.class);
+
+   private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+   private final List filesToDelete = new ArrayList<>(4);
+
+   private static final Path FLINK_CONF_YAML = 
Paths.get("flink-conf.yaml");
+   private static final Path FLINK_CONF_YAML_BACKUP = 
Paths.get("flink-conf.yaml.bak");
+
+   private final Path opt;
+   private final Path lib;
+   private final Path conf;
+   private final Path log;
+   private final Path bin;
+
+   private Configuration defaultConfig;
+
+   public FlinkDistribution() {
+   final String distDirProperty = System.getProperty("distDir");
+   if (distDirProperty == null) {
+   Assert.fail("The distDir property was not set. You can 
set it when running maven via -DdistDir= .");
+   }
+   final Path flinkDir = Paths.get(distDirProperty);
+   bin = flinkDir.resolve("bin");
+   opt = flinkDir.resolve("opt");
+   lib = flinkDir.resolve("lib");
+   conf = flinkDir.resolve("conf");
+   log = flinkDir.resolve("log");
+   }
+
+   @Override
+   protected void before() throws IOException {
+   defaultConfig = new 
UnmodifiableConfiguration(GlobalConfiguration.loadConfiguration(conf.toAbsolutePath().toString()));
+   final Path originalConfig = conf.resolve(FLINK_CONF_YAML);
 
 Review comment:
   There's no use-case for multiple backups. The backup only exists to reset 
flink-dist to the state before the test.
   
   Tests can already set up multiple configs for separate clusters by calling 
`appendConfiguration` multiple times.
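
   As a concrete illustration of that pattern, here is a minimal sketch (not part of the PR): it assumes `appendConfiguration(Configuration)` merges the given entries into the distribution's flink-conf.yaml, as it is used in `PrometheusReporterEndToEndITCase` later in this thread, and the metrics keys are written as plain strings purely for brevity.

{code:java}
// Sketch: one backup in before(), several configuration layers appended by the test.
@Rule
public final FlinkDistribution dist = new FlinkDistribution();

@Test
public void testWithLayeredConfiguration() throws Exception {
	final Configuration reporterConfig = new Configuration();
	reporterConfig.setString("metrics.reporter.prom.class", PrometheusReporter.class.getCanonicalName());

	final Configuration portConfig = new Configuration();
	portConfig.setString("metrics.reporter.prom.port", "9000-9100");

	// Both calls append to the same flink-conf.yaml; the backup taken in before()
	// restores the original file once the test finishes.
	dist.appendConfiguration(reporterConfig);
	dist.appendConfiguration(portConfig);
}
{code}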


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> End-to-end test: Prometheus reporter
> 
>
>

[GitHub] zentol commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test

2018-11-02 Thread GitBox
zentol commented on a change in pull request #7003: [FLINK-10633][prometheus] 
Add E2E test
URL: https://github.com/apache/flink/pull/7003#discussion_r230416606
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java
 ##
 @@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.UnmodifiableConfiguration;
+import org.apache.flink.util.ExceptionUtils;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.junit.Assert;
+import org.junit.rules.ExternalResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * A wrapper around a Flink distribution.
+ */
+public final class FlinkDistribution extends ExternalResource {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FlinkDistribution.class);
+
+   private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+   private final List filesToDelete = new ArrayList<>(4);
+
+   private static final Path FLINK_CONF_YAML = 
Paths.get("flink-conf.yaml");
+   private static final Path FLINK_CONF_YAML_BACKUP = 
Paths.get("flink-conf.yaml.bak");
+
+   private final Path opt;
+   private final Path lib;
+   private final Path conf;
+   private final Path log;
+   private final Path bin;
+
+   private Configuration defaultConfig;
+
+   public FlinkDistribution() {
+   final String distDirProperty = System.getProperty("distDir");
+   if (distDirProperty == null) {
+   Assert.fail("The distDir property was not set. You can 
set it when running maven via -DdistDir= .");
+   }
+   final Path flinkDir = Paths.get(distDirProperty);
+   bin = flinkDir.resolve("bin");
+   opt = flinkDir.resolve("opt");
+   lib = flinkDir.resolve("lib");
+   conf = flinkDir.resolve("conf");
+   log = flinkDir.resolve("log");
+   }
+
+   @Override
+   protected void before() throws IOException {
+   defaultConfig = new 
UnmodifiableConfiguration(GlobalConfiguration.loadConfiguration(conf.toAbsolutePath().toString()));
+   final Path originalConfig = conf.resolve(FLINK_CONF_YAML);
 
 Review comment:
   There's no use-case for multiple backups. The backup only exists to reset 
flink-dist to the state before the test.
   
   Tests can already set up multiple configs for separate clusters by calling 
`appendConfiguration` multiple times.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tillrohrmann edited a comment on issue #6998: [FLINK-10757] [tests] Avoid port conflicts in AbstractTaskManagerProc…

2018-11-02 Thread GitBox
tillrohrmann edited a comment on issue #6998: [FLINK-10757] [tests] Avoid port 
conflicts in AbstractTaskManagerProc…
URL: https://github.com/apache/flink/pull/6998#issuecomment-435419268
 
 
   There are 3 more builds in front of this one. It should be done in 2 hours.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10757) TaskManagerProcessFailureStreamingRecoveryITCase failed to initialize the cluster entrypoint StandaloneSessionClusterEntrypoint.

2018-11-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673272#comment-16673272
 ] 

ASF GitHub Bot commented on FLINK-10757:


tillrohrmann commented on issue #6998: [FLINK-10757] [tests] Avoid port 
conflicts in AbstractTaskManagerProc…
URL: https://github.com/apache/flink/pull/6998#issuecomment-435419268
 
 
   There are 3 more builds in front of this one. It should be done in 4 hours.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> TaskManagerProcessFailureStreamingRecoveryITCase failed to initialize the 
> cluster entrypoint StandaloneSessionClusterEntrypoint.
> 
>
> Key: FLINK-10757
> URL: https://issues.apache.org/jira/browse/FLINK-10757
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.6.2, 1.7.0
>Reporter: Bowen Li
>Assignee: TisonKun
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> {code:java}
> Failed tests: 
>   ...
>  
> TaskManagerProcessFailureStreamingRecoveryITCase>AbstractTaskManagerProcessFailureRecoveryTest.testTaskManagerProcessFailure:223
>  Failed to initialize the cluster entrypoint 
> StandaloneSessionClusterEntrypoint.
> Tests in error: 
> {code}
> https://travis-ci.org/apache/flink/jobs/449439623



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] tillrohrmann commented on issue #6998: [FLINK-10757] [tests] Avoid port conflicts in AbstractTaskManagerProc…

2018-11-02 Thread GitBox
tillrohrmann commented on issue #6998: [FLINK-10757] [tests] Avoid port 
conflicts in AbstractTaskManagerProc…
URL: https://github.com/apache/flink/pull/6998#issuecomment-435419268
 
 
   There are 3 more builds in front of this one. It should be done in 4 hours.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10757) TaskManagerProcessFailureStreamingRecoveryITCase failed to initialize the cluster entrypoint StandaloneSessionClusterEntrypoint.

2018-11-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673274#comment-16673274
 ] 

ASF GitHub Bot commented on FLINK-10757:


tillrohrmann edited a comment on issue #6998: [FLINK-10757] [tests] Avoid port 
conflicts in AbstractTaskManagerProc…
URL: https://github.com/apache/flink/pull/6998#issuecomment-435419268
 
 
   There are 3 more builds in front of this one. It should be done in 2 hours.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> TaskManagerProcessFailureStreamingRecoveryITCase failed to initialize the 
> cluster entrypoint StandaloneSessionClusterEntrypoint.
> 
>
> Key: FLINK-10757
> URL: https://issues.apache.org/jira/browse/FLINK-10757
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.6.2, 1.7.0
>Reporter: Bowen Li
>Assignee: TisonKun
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> {code:java}
> Failed tests: 
>   ...
>  
> TaskManagerProcessFailureStreamingRecoveryITCase>AbstractTaskManagerProcessFailureRecoveryTest.testTaskManagerProcessFailure:223
>  Failed to initialize the cluster entrypoint 
> StandaloneSessionClusterEntrypoint.
> Tests in error: 
> {code}
> https://travis-ci.org/apache/flink/jobs/449439623



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10633) End-to-end test: Prometheus reporter

2018-11-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673269#comment-16673269
 ] 

ASF GitHub Bot commented on FLINK-10633:


zentol commented on a change in pull request #7003: [FLINK-10633][prometheus] 
Add E2E test
URL: https://github.com/apache/flink/pull/7003#discussion_r230415501
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java
 ##
 @@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.UnmodifiableConfiguration;
+import org.apache.flink.util.ExceptionUtils;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.junit.Assert;
+import org.junit.rules.ExternalResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * A wrapper around a Flink distribution.
+ */
+public final class FlinkDistribution extends ExternalResource {
 
 Review comment:
   eh, I prefer `FlinkDistributionResource` since this class should really only 
provide primitive operations that map closely to `flink-dist`. Anything more 
complex than that belongs in a separate class.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> End-to-end test: Prometheus reporter
> 
>
> Key: FLINK-10633
> URL: https://issues.apache.org/jira/browse/FLINK-10633
> Project: Flink
>  Issue Type: Sub-task
>  Components: E2E Tests
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Add an end-to-end test using the {{PrometheusReporter}} to verify that all 
> metrics are properly reported. Additionally verify that the newly introduced 
> RocksDB metrics are accessible (see FLINK-10423).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] zentol commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test

2018-11-02 Thread GitBox
zentol commented on a change in pull request #7003: [FLINK-10633][prometheus] 
Add E2E test
URL: https://github.com/apache/flink/pull/7003#discussion_r230415501
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java
 ##
 @@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.UnmodifiableConfiguration;
+import org.apache.flink.util.ExceptionUtils;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.junit.Assert;
+import org.junit.rules.ExternalResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * A wrapper around a Flink distribution.
+ */
+public final class FlinkDistribution extends ExternalResource {
 
 Review comment:
   eh, I prefer `FlinkDistributionResource` since this class should really only 
provide primitive operations that map closely to `flink-dist`. Anything more 
complex than that belongs in a separate class.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zentol commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test

2018-11-02 Thread GitBox
zentol commented on a change in pull request #7003: [FLINK-10633][prometheus] 
Add E2E test
URL: https://github.com/apache/flink/pull/7003#discussion_r230414679
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/CommandLineWrapper.java
 ##
 @@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Utility class for setting up command-line tool usages in a readable fashion.
+ */
+public enum CommandLineWrapper {
+   ;
+
+   public static WGetBuilder wget(String url) {
+   return new WGetBuilder(url);
+   }
+
+   /**
+* Wrapper around wget used for downloading files.
+*/
+   public static final class WGetBuilder {
+
+   private final String url;
+   private Path targetDir;
+
+   WGetBuilder(String url) {
+   this.url = url;
+   }
+
+   public WGetBuilder targetDir(Path dir) {
 
 Review comment:
   it's harder to extend, since with a variety of options you'll end up with a 
bloated constructor.
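
   To illustrate the trade-off being argued here, a hedged sketch of how the builder absorbs a new option without touching existing call sites; the `timeout` option and the chaining of `targetDir` returning `this` are assumptions made for the example and are not taken from the PR.

{code:java}
// Hypothetical illustration: a new option becomes one more chainable method,
// whereas a constructor would need another overload per option combination.
public static final class WGetBuilder {

	private final String url;
	private Path targetDir;
	private int timeoutSeconds = -1; // invented option, not in the PR

	WGetBuilder(String url) {
		this.url = url;
	}

	public WGetBuilder targetDir(Path dir) {
		this.targetDir = dir;
		return this;
	}

	public WGetBuilder timeout(int seconds) {
		this.timeoutSeconds = seconds;
		return this;
	}
}

// Call sites only mention the options they actually need, e.g.:
// CommandLineWrapper.wget(url).targetDir(tmpDir).timeout(120)...
{code}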


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10633) End-to-end test: Prometheus reporter

2018-11-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673268#comment-16673268
 ] 

ASF GitHub Bot commented on FLINK-10633:


zentol commented on a change in pull request #7003: [FLINK-10633][prometheus] 
Add E2E test
URL: https://github.com/apache/flink/pull/7003#discussion_r230414679
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/CommandLineWrapper.java
 ##
 @@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Utility class for setting up command-line tool usages in a readable fashion.
+ */
+public enum CommandLineWrapper {
+   ;
+
+   public static WGetBuilder wget(String url) {
+   return new WGetBuilder(url);
+   }
+
+   /**
+* Wrapper around wget used for downloading files.
+*/
+   public static final class WGetBuilder {
+
+   private final String url;
+   private Path targetDir;
+
+   WGetBuilder(String url) {
+   this.url = url;
+   }
+
+   public WGetBuilder targetDir(Path dir) {
 
 Review comment:
   it's harder to extend, since with a variety of options you'll end up with a 
bloated constructor.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> End-to-end test: Prometheus reporter
> 
>
> Key: FLINK-10633
> URL: https://issues.apache.org/jira/browse/FLINK-10633
> Project: Flink
>  Issue Type: Sub-task
>  Components: E2E Tests
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Add an end-to-end test using the {{PrometheusReporter}} to verify that all 
> metrics are properly reported. Additionally verify that the newly introduced 
> RocksDB metrics are accessible (see FLINK-10423).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10633) End-to-end test: Prometheus reporter

2018-11-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673245#comment-16673245
 ] 

ASF GitHub Bot commented on FLINK-10633:


kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add 
E2E test
URL: https://github.com/apache/flink/pull/7003#discussion_r230410636
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/src/test/java/org/apache/flink/metrics/prometheus/tests/PrometheusReporterEndToEndITCase.java
 ##
 @@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.prometheus.tests;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.prometheus.PrometheusReporter;
+import org.apache.flink.tests.util.AutoClosableProcess;
+import org.apache.flink.tests.util.CommandLineWrapper;
+import org.apache.flink.tests.util.FlinkDistribution;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.TestLogger;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.List;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.tests.util.AutoClosableProcess.runBlocking;
+import static org.apache.flink.tests.util.AutoClosableProcess.runNonBlocking;
+
+/**
+ * End-to-end test for the PrometheusReporter.
+ */
+public class PrometheusReporterEndToEndITCase extends TestLogger {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(PrometheusReporterEndToEndITCase.class);
+
+   private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+   private static final String PROMETHEUS_FILE_NAME = 
"prometheus-2.4.3.linux-amd64";
+
+   private static final Pattern LOG_REPORTER_PORT_PATTERN = 
Pattern.compile(".*Started PrometheusReporter HTTP server on port ([0-9]+).*");
+
+   @Rule
+   public final FlinkDistribution dist = new FlinkDistribution();
+
+   @Rule
+   public final TemporaryFolder tmp = new TemporaryFolder();
+
+   @Test
+   public void testReporter() throws Exception {
+   LOG.info("starting test");
+   dist.copyOptJarsToLib("flink-metrics-prometheus");
+
+   final Configuration config = new Configuration();
+   config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"prom." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
PrometheusReporter.class.getCanonicalName());
+   config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"prom.port", "9000-9100");
+
+   dist.appendConfiguration(config);
+
+   final Path tmpPrometheusDir = 
tmp.newFolder().toPath().resolve("prometheus");
+   final Path prometheusArchive = 
tmpPrometheusDir.resolve(PROMETHEUS_FILE_NAME + ".tar.gz");
+   final Path prometheusBinDir = 
tmpPrometheusDir.resolve(PROMETHEUS_FILE_NAME);
+   final Path prometheusConfig = 
prometheusBinDir.resolve("prometheus.yml");
+   final Path prometheusBinary = 
prometheusBinDir.resolve("prometheus");
+   Files.createDirectory(tmpPrometheusDir);
+
+   runBlocking(
+   "Download of Prometheus",
+   Duration.ofMinutes(5),
+   CommandLineWrapper
+   .wget("https://github.com/prometheus/prometheus/releases/download/v2.4.3/" + prometheusArchive.getFileName())
 
 Review comment:
   Given that the version is already in the path, does it make sense to move 
the whole path or the part that includes the version to a class variable?
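
   One possible shape of that refactoring, sketched here with an assumed `PROMETHEUS_VERSION` constant; the names are illustrative and not necessarily what the PR ends up using.

{code:java}
// Keep the version in a single constant so the archive name and the
// download URL cannot drift apart.
private static final String PROMETHEUS_VERSION = "2.4.3";
private static final String PROMETHEUS_FILE_NAME = "prometheus-" + PROMETHEUS_VERSION + ".linux-amd64";
private static final String PROMETHEUS_DOWNLOAD_URL =
	"https://github.com/prometheus/prometheus/releases/download/v" + PROMETHEUS_VERSION + "/";

// ...and in the test body:
// .wget(PROMETHEUS_DOWNLOAD_URL + prometheusArchive.getFileName())
{code}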


This is an 

[jira] [Commented] (FLINK-10633) End-to-end test: Prometheus reporter

2018-11-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673242#comment-16673242
 ] 

ASF GitHub Bot commented on FLINK-10633:


kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add 
E2E test
URL: https://github.com/apache/flink/pull/7003#discussion_r230401483
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/AutoClosableProcess.java
 ##
 @@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Utility class to terminate a given {@link Process} when exiting a 
try-with-resources statement.
+ */
+public class AutoClosableProcess implements AutoCloseable {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(AutoClosableProcess.class);
+
+   private final Process process;
+
+   public AutoClosableProcess(Process process) {
+   this.process = process;
 
 Review comment:
   Add `null` checks.
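
   For completeness, a minimal sketch of the requested check, using Flink's `Preconditions.checkNotNull`; a plain `java.util.Objects.requireNonNull` would serve the same purpose.

{code:java}
import static org.apache.flink.util.Preconditions.checkNotNull;

public AutoClosableProcess(Process process) {
	// Fail fast with a clear message instead of a later NullPointerException in close().
	this.process = checkNotNull(process, "process must not be null");
}
{code}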


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> End-to-end test: Prometheus reporter
> 
>
> Key: FLINK-10633
> URL: https://issues.apache.org/jira/browse/FLINK-10633
> Project: Flink
>  Issue Type: Sub-task
>  Components: E2E Tests
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Add an end-to-end test using the {{PrometheusReporter}} to verify that all 
> metrics are properly reported. Additionally verify that the newly introduced 
> RocksDB metrics are accessible (see FLINK-10423).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10633) End-to-end test: Prometheus reporter

2018-11-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673249#comment-16673249
 ] 

ASF GitHub Bot commented on FLINK-10633:


kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add 
E2E test
URL: https://github.com/apache/flink/pull/7003#discussion_r230407576
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java
 ##
 @@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.UnmodifiableConfiguration;
+import org.apache.flink.util.ExceptionUtils;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.junit.Assert;
+import org.junit.rules.ExternalResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * A wrapper around a Flink distribution.
+ */
+public final class FlinkDistribution extends ExternalResource {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FlinkDistribution.class);
+
+   private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+   private final List filesToDelete = new ArrayList<>(4);
+
+   private static final Path FLINK_CONF_YAML = 
Paths.get("flink-conf.yaml");
+   private static final Path FLINK_CONF_YAML_BACKUP = 
Paths.get("flink-conf.yaml.bak");
+
+   private final Path opt;
+   private final Path lib;
+   private final Path conf;
+   private final Path log;
+   private final Path bin;
+
+   private Configuration defaultConfig;
+
+   public FlinkDistribution() {
+   final String distDirProperty = System.getProperty("distDir");
+   if (distDirProperty == null) {
+   Assert.fail("The distDir property was not set. You can 
set it when running maven via -DdistDir= .");
+   }
+   final Path flinkDir = Paths.get(distDirProperty);
+   bin = flinkDir.resolve("bin");
+   opt = flinkDir.resolve("opt");
+   lib = flinkDir.resolve("lib");
+   conf = flinkDir.resolve("conf");
+   log = flinkDir.resolve("log");
+   }
+
+   @Override
+   protected void before() throws IOException {
+   defaultConfig = new 
UnmodifiableConfiguration(GlobalConfiguration.loadConfiguration(conf.toAbsolutePath().toString()));
+   final Path originalConfig = conf.resolve(FLINK_CONF_YAML);
 
 Review comment:
   I would move lines 95-98 into a separate method (e.g. 
`backupOriginal/DefaultConfig`) and call it in `before()`, because 
such a method can also be useful on its own. 
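
   A sketch of what that extraction could look like, rearranging only the code quoted above; the method name `backupDefaultConfig` is illustrative.

{code:java}
// Extracted backup step, reusable outside of before().
private void backupDefaultConfig() throws IOException {
	final Path originalConfig = conf.resolve(FLINK_CONF_YAML);
	final Path backupConfig = conf.resolve(FLINK_CONF_YAML_BACKUP);
	Files.copy(originalConfig, backupConfig);
	filesToDelete.add(new AutoClosablePath(backupConfig));
}

@Override
protected void before() throws IOException {
	defaultConfig = new UnmodifiableConfiguration(
		GlobalConfiguration.loadConfiguration(conf.toAbsolutePath().toString()));
	backupDefaultConfig();
}
{code}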


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> End-to-end test: Prometheus reporter
> 
>
> Key: FLINK-10633
> URL: 

[jira] [Commented] (FLINK-10633) End-to-end test: Prometheus reporter

2018-11-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673237#comment-16673237
 ] 

ASF GitHub Bot commented on FLINK-10633:


kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add 
E2E test
URL: https://github.com/apache/flink/pull/7003#discussion_r230401171
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/AutoClosablePath.java
 ##
 @@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util;
+
+import org.apache.flink.util.FileUtils;
+
+import java.io.IOException;
+import java.nio.file.Path;
+
+/**
+ * Utility class to delete a given {@link Path} when exiting a 
try-with-resources statement.
+ */
+public final class AutoClosablePath implements AutoCloseable {
+
+   private final Path file;
+
+   public AutoClosablePath(final Path file) {
+   this.file = file;
+   }
+
+   @Override
+   public void close() throws IOException {
+   FileUtils.deleteFileOrDirectory(file.toFile());
+   }
+
+   public Path getFile() {
 
 Review comment:
   This is not used anywhere, so why not remove it and add it back if a future 
test needs it?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> End-to-end test: Prometheus reporter
> 
>
> Key: FLINK-10633
> URL: https://issues.apache.org/jira/browse/FLINK-10633
> Project: Flink
>  Issue Type: Sub-task
>  Components: E2E Tests
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Add an end-to-end test using the {{PrometheusReporter}} to verify that all 
> metrics are properly reported. Additionally verify that the newly introduced 
> RocksDB metrics are accessible (see FLINK-10423).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10633) End-to-end test: Prometheus reporter

2018-11-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673252#comment-16673252
 ] 

ASF GitHub Bot commented on FLINK-10633:


kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add 
E2E test
URL: https://github.com/apache/flink/pull/7003#discussion_r230404317
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java
 ##
 @@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.UnmodifiableConfiguration;
+import org.apache.flink.util.ExceptionUtils;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.junit.Assert;
+import org.junit.rules.ExternalResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * A wrapper around a Flink distribution.
+ */
+public final class FlinkDistribution extends ExternalResource {
 
 Review comment:
   This is more like an `E2ETestHarness` than a `FlinkDistribution`, right?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> End-to-end test: Prometheus reporter
> 
>
> Key: FLINK-10633
> URL: https://issues.apache.org/jira/browse/FLINK-10633
> Project: Flink
>  Issue Type: Sub-task
>  Components: E2E Tests
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Add an end-to-end test using the {{PrometheusReporter}} to verify that all 
> metrics are properly reported. Additionally verify that the newly introduced 
> RocksDB metrics are accessible (see FLINK-10423).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

