[jira] [Created] (FLINK-10769) port InMemoryExternalCatalog to java
Bowen Li created FLINK-10769: Summary: port InMemoryExternalCatalog to java Key: FLINK-10769 URL: https://issues.apache.org/jira/browse/FLINK-10769 Project: Flink Issue Type: Sub-task Components: Table API SQL Reporter: Bowen Li Assignee: Bowen Li Fix For: 1.8.0 In the Flink-Hive integration design, we propose a new FlinkInMemoryCatalog (FLINK-10697) for production use. FlinkInMemoryCatalog will share some parts with the existing InMemoryExternalCatalog, thus we need to make changes to InMemoryExternalCatalog. As we are moving away from Scala to Java, we should write all new code/features in Java. Therefore, we will port InMemoryExternalCatalog to Java first without any feature or behavior change. This is a prerequisite for FLINK-10697 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
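As a rough illustration of what a straight Scala-to-Java port of an in-memory catalog could look like, here is a minimal, hypothetical sketch. The class and method names below are assumptions for illustration only, not Flink's actual `InMemoryExternalCatalog` API:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch only: a catalog that keeps table metadata in memory.
// Names and signatures are illustrative, not Flink's real InMemoryExternalCatalog.
public class SimpleInMemoryCatalog {
    // table name -> schema description, insertion-ordered for stable listing
    private final Map<String, String> tables = new LinkedHashMap<>();

    public void createTable(String name, String schema) {
        // putIfAbsent returns the previous value if the key was already present
        if (tables.putIfAbsent(name, schema) != null) {
            throw new IllegalArgumentException("Table already exists: " + name);
        }
    }

    public Optional<String> getTable(String name) {
        return Optional.ofNullable(tables.get(name));
    }

    public int tableCount() {
        return tables.size();
    }
}
```

Since the port is meant to carry no feature or behavior change, the Java version would mirror the existing Scala semantics one-to-one (e.g. failing on duplicate table registration).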
[jira] [Updated] (FLINK-10768) Move external catalog related code from TableEnvironment to CatalogManager
[ https://issues.apache.org/jira/browse/FLINK-10768?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-10768: --- Labels: pull-request-available (was: ) > Move external catalog related code from TableEnvironment to CatalogManager > -- > > Key: FLINK-10768 > URL: https://issues.apache.org/jira/browse/FLINK-10768 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > > Add a new CatalogManager class and port existing Calcite-directly-related > code from TableEnvironment into CatalogManager. > This is NOT a feature, but purely refactor, thus no new functions should be > introduced. It acts the pre-requisite for FLINK-10698 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10768) Move external catalog related code from TableEnvironment to CatalogManager
[ https://issues.apache.org/jira/browse/FLINK-10768?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673922#comment-16673922 ] ASF GitHub Bot commented on FLINK-10768: bowenli86 opened a new pull request #7011: [FLINK-10768][Table & SQL] Move external catalog related code from TableEnvironment to CatalogManager URL: https://github.com/apache/flink/pull/7011 ## What is the purpose of the change In the design of Flink-Hive integration and external catalog enhancements, we propose using `CatalogManager` to manage all external catalogs and related aspects for a clearer separation. Currently all external catalogs are managed by `TableEnvironment`, which makes TableEnvironment bloated. Also, as discussed with @twalthr , since we are moving away from Scala, all new feature code will be in Java. This PR moves all EXISTING external catalogs related code from `TableEnvironment` to `CatalogManager`, and it's the initial step for `CatalogManager`. It blocks `[[FLINK-10698]Create CatalogManager class manages all external catalogs and temporary meta objects](https://issues.apache.org/jira/browse/FLINK-10698)` This PR is a PURE REFACTOR, only moving APIs around, NO change on the behaviors and NO new feature is added. ## Brief change log - added `CatalogManager` in Java - moved external catalog related code from `TableEnvironment` to `CatalogManager` - added unit tests in `CatalogManagerTest` ## Verifying this change - This change is already covered by existing tests, such as *TableEnvironmentTest*. - This change added tests and can be verified as follows: - Added unit tests in `CatalogManagerTest` ## Does this pull request potentially affect one of the following parts: none ## Documentation - Does this pull request introduce a new feature? (no) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Move external catalog related code from TableEnvironment to CatalogManager > -- > > Key: FLINK-10768 > URL: https://issues.apache.org/jira/browse/FLINK-10768 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > > Add a new CatalogManager class and port existing Calcite-directly-related > code from TableEnvironment into CatalogManager. > This is NOT a feature, but purely refactor, thus no new functions should be > introduced. It acts the pre-requisite for FLINK-10698 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
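To make the refactor concrete, the kind of registry that could be pulled out of `TableEnvironment` might look like the sketch below. This is a hypothetical outline under assumed names; the actual `CatalogManager` introduced in PR #7011 is richer:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: a standalone registry for named external catalogs,
// extracted from the table environment. Not the actual Flink CatalogManager API.
public class CatalogRegistry {
    // catalog name -> catalog instance (Object stands in for ExternalCatalog here)
    private final Map<String, Object> externalCatalogs = new LinkedHashMap<>();

    public void registerExternalCatalog(String name, Object catalog) {
        if (externalCatalogs.putIfAbsent(name, catalog) != null) {
            throw new IllegalArgumentException("Catalog already registered: " + name);
        }
    }

    public Object getExternalCatalog(String name) {
        Object catalog = externalCatalogs.get(name);
        if (catalog == null) {
            throw new IllegalArgumentException("Catalog not found: " + name);
        }
        return catalog;
    }

    public Set<String> listCatalogs() {
        return externalCatalogs.keySet();
    }
}
```

With such a registry in place, `TableEnvironment` would delegate catalog registration and lookup to it instead of holding the catalog map itself, which is the separation the PR describes.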
[jira] [Comment Edited] (FLINK-10766) Add a link to flink-china.org in Flink website
[ https://issues.apache.org/jira/browse/FLINK-10766?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673911#comment-16673911 ] Hequn Cheng edited comment on FLINK-10766 at 11/3/18 3:35 AM: -- [~jark] Hi, thanks for the proposal. It is a good idea! There are a lot of Flink enthusiasts in China. The translated documents and meetup information would be very helpful. was (Author: hequn8128): [~jark] Hi, thanks for the proposal. It is a good idea! There are a lot of Flink enthusiasts in China. The translated document and meetup informations would be very helpful. > Add a link to flink-china.org in Flink website > -- > > Key: FLINK-10766 > URL: https://issues.apache.org/jira/browse/FLINK-10766 > Project: Flink > Issue Type: Wish > Components: Project Website >Reporter: Jark Wu >Priority: Minor > > The Apache Flink China website has been reworked and published in the last > days, see more http://flink-china.org > The flink-china.org is a very popular website in China for flink users. Now > we included the latest translated documentation (v1.6) and use-cases and > meetup events in the website. > We hope to add a link to flink-china.org in the flink website > (flink.apache.org), so that we can help and attract more Chinese Flink users > and improve the community. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10766) Add a link to flink-china.org in Flink website
[ https://issues.apache.org/jira/browse/FLINK-10766?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673911#comment-16673911 ] Hequn Cheng commented on FLINK-10766: - [~jark] Hi, thanks for the proposal. It is a good idea! There are a lot of Flink enthusiasts in China. The translated documents and meetup information would be very helpful. > Add a link to flink-china.org in Flink website > -- > > Key: FLINK-10766 > URL: https://issues.apache.org/jira/browse/FLINK-10766 > Project: Flink > Issue Type: Wish > Components: Project Website >Reporter: Jark Wu >Priority: Minor > > The Apache Flink China website has been reworked and published in the last > days, see more http://flink-china.org > The flink-china.org is a very popular website in China for flink users. Now > we included the latest translated documentation (v1.6) and use-cases and > meetup events in the website. > We hope to add a link to flink-china.org in the flink website > (flink.apache.org), so that we can help and attract more Chinese Flink users > and improve the community. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9172) Support external catalog factory that comes default with SQL-Client
[ https://issues.apache.org/jira/browse/FLINK-9172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673895#comment-16673895 ] Xuefu Zhang commented on FLINK-9172: Hi [~eronwright], Just curious about your progress on this. I asked because this aligns well with our effort in FLINK-10744, and I would like to move this forward. Please let me know if any help is needed. Thanks. > Support external catalog factory that comes default with SQL-Client > --- > > Key: FLINK-9172 > URL: https://issues.apache.org/jira/browse/FLINK-9172 > Project: Flink > Issue Type: Sub-task > Components: SQL Client >Reporter: Rong Rong >Assignee: Eron Wright >Priority: Major > > It doesn't seem that the configuration (YAML) file allows specifications of > external catalogs currently. The request here is to add support for external > catalog specifications in the YAML file. The user should also be able to specify one > catalog as the default. > It will be great to have SQL-Client support some external catalogs > out-of-the-box for SQL users to configure and utilize easily. I am currently > thinking of having an external catalog factory that spins up both streaming and > batch external catalog table sources and sinks. This could greatly unify and > provide easy access for SQL users. > The catalog-related configurations then need to be processed and passed to > TableEnvironment accordingly by calling relevant APIs. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-4372) Add ability to take savepoints from job manager web UI
[ https://issues.apache.org/jira/browse/FLINK-4372?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] vinoyang reassigned FLINK-4372: --- Assignee: vinoyang > Add ability to take savepoints from job manager web UI > -- > > Key: FLINK-4372 > URL: https://issues.apache.org/jira/browse/FLINK-4372 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing, Webfrontend >Reporter: Zhenzhong Xu >Assignee: vinoyang >Priority: Major > > subtask of FLINK-4336 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10768) Move external catalog related code from TableEnvironment to CatalogManager
[ https://issues.apache.org/jira/browse/FLINK-10768?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li updated FLINK-10768: - Summary: Move external catalog related code from TableEnvironment to CatalogManager (was: Move Calcite related code from TableEnvironment to CatalogManager) > Move external catalog related code from TableEnvironment to CatalogManager > -- > > Key: FLINK-10768 > URL: https://issues.apache.org/jira/browse/FLINK-10768 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Major > Fix For: 1.8.0 > > > Add a new CatalogManager class and port existing Calcite-directly-related > code from TableEnvironment into CatalogManager. > This is NOT a feature, but purely refactor, thus no new functions should be > introduced. It acts the pre-requisite for FLINK-10698 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10768) Move Calcite related code from TableEnvironment to CatalogManager
Bowen Li created FLINK-10768: Summary: Move Calcite related code from TableEnvironment to CatalogManager Key: FLINK-10768 URL: https://issues.apache.org/jira/browse/FLINK-10768 Project: Flink Issue Type: Sub-task Components: Table API SQL Reporter: Bowen Li Assignee: Bowen Li Fix For: 1.8.0 Add a new CatalogManager class and port existing Calcite-directly-related code from TableEnvironment into CatalogManager. This is NOT a feature, but purely a refactor, thus no new functions should be introduced. It acts as the prerequisite for FLINK-10698 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-4372) Add ability to take savepoints from job manager web UI
[ https://issues.apache.org/jira/browse/FLINK-4372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673611#comment-16673611 ] Oleksandr Soldatov commented on FLINK-4372: --- Hello, I guess an even more useful button would be `cancel with savepoint`, perhaps disabled if the job has no savepoint configuration (or configurable via a dialog). > Add ability to take savepoints from job manager web UI > -- > > Key: FLINK-4372 > URL: https://issues.apache.org/jira/browse/FLINK-4372 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing, Webfrontend >Reporter: Zhenzhong Xu >Priority: Major > > subtask of FLINK-4336 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10736) Shaded Hadoop S3A end-to-end test failed on Travis
[ https://issues.apache.org/jira/browse/FLINK-10736?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673512#comment-16673512 ] Stephan Ewen commented on FLINK-10736: -- I would suggest changing that test to use a constant test data file. That would save us the brittleness of relying on an S3 upload in a bash script and would save us from eventual consistency visibility issues. > Shaded Hadoop S3A end-to-end test failed on Travis > -- > > Key: FLINK-10736 > URL: https://issues.apache.org/jira/browse/FLINK-10736 > Project: Flink > Issue Type: Bug > Components: E2E Tests >Affects Versions: 1.7.0 >Reporter: Till Rohrmann >Priority: Critical > Labels: test-stability > Fix For: 1.7.0 > > > The {{Shaded Hadoop S3A end-to-end test}} failed on Travis because it could > not find a file stored on S3: > {code} > org.apache.flink.client.program.ProgramInvocationException: Job failed. > (JobID: f28270bedd943ed6b41548b60f5cea73) > at > org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:487) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:475) > at > org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62) > at > org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:85) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529) > at > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427) > at > 
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813) > at > org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287) > at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213) > at > org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050) > at > org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836) > at > org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126) > Caused by: org.apache.flink.runtime.client.JobExecutionException: Job > execution failed. > at > org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146) > at > org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265) > ... 
21 more > Caused by: java.io.IOException: Error opening the Input Split > s3://[secure]/flink-end-to-end-test-shaded-s3a [0,44]: No such file or > directory: s3://[secure]/flink-end-to-end-test-shaded-s3a > at > org.apache.flink.api.common.io.FileInputFormat.open(FileInputFormat.java:824) > at > org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:470) > at > org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:47) > at > org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:170) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.io.FileNotFoundException: No such file or directory: > s3://[secure]/flink-end-to-end-test-shaded-s3a > at > org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2255) > at > org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2149) > at > org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2088) > at > org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.open(S3AFileSystem.java:699) > at > org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.open(FileSystem.java:950) > at >
[jira] [Commented] (FLINK-10732) force-shading 1.5.5 maven artifact was not released
[ https://issues.apache.org/jira/browse/FLINK-10732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673506#comment-16673506 ] Chesnay Schepler commented on FLINK-10732: -- Option 2 is what I'm shooting for; I'm just double-checking with INFRA that we can simply deploy it without any issues (since this is done rarely). I've already linked the INFRA ticket. > force-shading 1.5.5 maven artifact was not released > --- > > Key: FLINK-10732 > URL: https://issues.apache.org/jira/browse/FLINK-10732 > Project: Flink > Issue Type: Bug >Affects Versions: 1.5.5 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Fix For: 1.5.5 > > > The 1.5.5 maven artifact for {{force-shading}} was not deployed. We have to > investigate whether other artifacts are missing as well and how to remedy the > problem, i.e. re-deploy the missing artifacts (which may or may not require a > separate vote). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10520) Job save points REST API fails unless parameters are specified
[ https://issues.apache.org/jira/browse/FLINK-10520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673505#comment-16673505 ] Huide Yin commented on FLINK-10520: --- I wonder if `state.savepoints.dir` is specified as the default savepoint path in your flink-conf.yaml file. `target-directory` is a must-have to specify the target location for saving the savepoint. `cancel-job` is a must since there's no separate `cancel` request defined here: https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html > Job save points REST API fails unless parameters are specified > -- > > Key: FLINK-10520 > URL: https://issues.apache.org/jira/browse/FLINK-10520 > Project: Flink > Issue Type: Bug > Components: REST >Affects Versions: 1.6.1 >Reporter: Elias Levy >Assignee: Chesnay Schepler >Priority: Minor > > The new REST API POST endpoint, {{/jobs/:jobid/savepoints}}, returns an error > unless the request includes a body with all parameters ({{target-directory}} > and {{cancel-job}}), even though the > [documentation|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.html] > suggests these are optional. > If a POST request with no data is made, the response is a 400 status code > with the error message "Bad request received." > If the POST request submits an empty JSON object ( {} ), the response is a > 400 status code with the error message "Request did not match expected format > SavepointTriggerRequestBody." The same is true if only the > {{target-directory}} or {{cancel-job}} parameters are included. > As the system is configured with a default savepoint location, there > shouldn't be a need to include the parameter in the request. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
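For reference, a request body that satisfies the expected `SavepointTriggerRequestBody` format can be sketched as below. The helper class itself is hypothetical; only the two field names come from the issue discussion above:

```java
// Illustrative helper for building the JSON body expected by
// POST /jobs/:jobid/savepoints. The class is hypothetical; the field names
// "target-directory" and "cancel-job" are the ones discussed in the issue.
public class SavepointTriggerBody {
    public static String json(String targetDirectory, boolean cancelJob) {
        // Per the issue, both fields must currently be present in the request.
        return "{\"target-directory\": \"" + targetDirectory + "\", "
            + "\"cancel-job\": " + cancelJob + "}";
    }
}
```

Sending such a complete body works around the 400 responses until the parameters are made genuinely optional.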
[jira] [Commented] (FLINK-9920) BucketingSinkFaultToleranceITCase fails on travis
[ https://issues.apache.org/jira/browse/FLINK-9920?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673482#comment-16673482 ] Bowen Li commented on FLINK-9920: - Still saw it. https://api.travis-ci.org/v3/job/449685897/log.txt > BucketingSinkFaultToleranceITCase fails on travis > - > > Key: FLINK-9920 > URL: https://issues.apache.org/jira/browse/FLINK-9920 > Project: Flink > Issue Type: Bug > Components: filesystem-connector >Affects Versions: 1.6.0, 1.7.0 >Reporter: Chesnay Schepler >Priority: Critical > Labels: test-stability > Fix For: 1.7.0 > > > https://travis-ci.org/zentol/flink/jobs/407021898 > {code} > Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 13.082 sec > <<< FAILURE! - in > org.apache.flink.streaming.connectors.fs.bucketing.BucketingSinkFaultToleranceITCase > runCheckpointedProgram(org.apache.flink.streaming.connectors.fs.bucketing.BucketingSinkFaultToleranceITCase) > Time elapsed: 5.696 sec <<< FAILURE! > java.lang.AssertionError: Read line does not match expected pattern. > at org.junit.Assert.fail(Assert.java:88) > at > org.apache.flink.streaming.connectors.fs.bucketing.BucketingSinkFaultToleranceITCase.postSubmit(BucketingSinkFaultToleranceITCase.java:182) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10732) force-shading 1.5.5 maven artifact was not released
[ https://issues.apache.org/jira/browse/FLINK-10732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673474#comment-16673474 ] Maximilian Michels commented on FLINK-10732: Possible fixes: 1) Release a new version of all the other artifacts referencing force-shading:1.5.5 and let it point to an older version, e.g. force-shading:1.5.4 or flink-shaded-force-shading:1.0. 2) Release force-shading:1.5.5 (from what I know this wouldn't be too hard and we have done it for other artifacts in the past) 3) Let downstream deal with it by letting them exclude force-shading during their build (works, but not really a good solution) > force-shading 1.5.5 maven artifact was not released > --- > > Key: FLINK-10732 > URL: https://issues.apache.org/jira/browse/FLINK-10732 > Project: Flink > Issue Type: Bug >Affects Versions: 1.5.5 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Fix For: 1.5.5 > > > The 1.5.5 maven artifact for {{force-shading}} was not deployed. We have to > investigate whether other artifacts are missing as well and how to remedy the > problem, i.e. re-deploy the missing artifacts (which may or may not require a > separate vote). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
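Option 3 (downstream exclusion) would look roughly like this in a consumer's POM. The dependency coordinates shown are illustrative; any Flink 1.5.5 artifact that pulls in force-shading would need the same exclusion:

```xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java_2.11</artifactId>
  <version>1.5.5</version>
  <exclusions>
    <!-- work around the missing force-shading:1.5.5 artifact -->
    <exclusion>
      <groupId>org.apache.flink</groupId>
      <artifactId>force-shading</artifactId>
    </exclusion>
  </exclusions>
</dependency>
```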
[jira] [Commented] (FLINK-10356) Add sanity checks to SpillingAdaptiveSpanningRecordDeserializer
[ https://issues.apache.org/jira/browse/FLINK-10356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673460#comment-16673460 ] ASF GitHub Bot commented on FLINK-10356: NicoK commented on a change in pull request #6705: [FLINK-10356][network] add sanity checks to SpillingAdaptiveSpanningRecordDeserializer URL: https://github.com/apache/flink/pull/6705#discussion_r230452399 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java ## @@ -104,11 +120,236 @@ public void testHandleMixedLargeRecords() throws Exception { testSerializationRoundTrip(originalRecords, segmentSize); } + /** +* Non-spanning, deserialization reads one byte too many and succeeds. +*/ + @Test + public void testHandleDeserializingTooMuchNonSpanning1() throws Exception { + testHandleWrongDeserialization( + new StringValueDeserializingTooMuch("Test string"), + 32 * 1024, + null); + } + + /** +* Non-spanning, serialization length is 16 (including headers), deserialization reads one byte +* too many and succeeds. +*/ + @Test + public void testHandleDeserializingTooMuchNonSpanning2() throws Exception { + testHandleWrongDeserialization( + new StringValueDeserializingTooMuch("Test string"), Review comment: actually, the type does not really matter - I could just as well use a `LargeObjectType` of an arbitrary length This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add sanity checks to SpillingAdaptiveSpanningRecordDeserializer > --- > > Key: FLINK-10356 > URL: https://issues.apache.org/jira/browse/FLINK-10356 > Project: Flink > Issue Type: Sub-task > Components: Network >Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.5.3, 1.5.4, 1.6.0, 1.6.1, 1.7.0 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Major > Labels: pull-request-available > > {{SpillingAdaptiveSpanningRecordDeserializer}} doesn't have any consistency > checks for usage calls or serializers behaving properly, e.g. to read only as > many bytes as available/promised for that record. At least these checks > should be added: > # Check that buffers have not been read from yet before adding them (this is > an invariant {{SpillingAdaptiveSpanningRecordDeserializer}} works with and, > from what I can see, it is followed now). > # Check that after deserialization, we actually consumed {{recordLength}} > bytes > ** If not, in the spanning deserializer, we currently simply skip the > remaining bytes. > ** But in the non-spanning deserializer, we currently continue from the > wrong offset. > # Protect against {{setNextBuffer}} being called before draining all > available records -- This message was sent by Atlassian JIRA (v7.6.3#76005)
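Check #2 from the list above can be sketched as a simple guard on positions before and after deserialization. The class and method names are hypothetical, not the actual code in PR #6705:

```java
// Hypothetical sketch of sanity check #2: after deserializing a record, the
// number of bytes actually consumed must equal the length announced in the
// record header; otherwise the deserializer read too much or too little.
public class RecordLengthCheck {

    /** Returns true iff exactly recordLength bytes were consumed. */
    public static boolean fullyConsumed(int positionBefore, int positionAfter, int recordLength) {
        return positionAfter - positionBefore == recordLength;
    }

    /** Fails fast when the consumed byte count does not match the record length. */
    public static void check(int positionBefore, int positionAfter, int recordLength) {
        if (!fullyConsumed(positionBefore, positionAfter, recordLength)) {
            throw new IllegalStateException("Deserializer consumed "
                + (positionAfter - positionBefore) + " bytes, expected " + recordLength);
        }
    }
}
```

Failing fast here matters because, as the issue notes, the non-spanning deserializer would otherwise continue reading from a wrong offset and silently corrupt subsequent records.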
[jira] [Assigned] (FLINK-10353) Restoring a KafkaProducer with Semantic.EXACTLY_ONCE from a savepoint written with Semantic.AT_LEAST_ONCE fails with NPE
[ https://issues.apache.org/jira/browse/FLINK-10353?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stefan Richter reassigned FLINK-10353: -- Assignee: Stefan Richter > Restoring a KafkaProducer with Semantic.EXACTLY_ONCE from a savepoint written > with Semantic.AT_LEAST_ONCE fails with NPE > > > Key: FLINK-10353 > URL: https://issues.apache.org/jira/browse/FLINK-10353 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.5.3, 1.6.0 >Reporter: Konstantin Knauf >Assignee: Stefan Richter >Priority: Critical > Labels: pull-request-available > > If a KafkaProducer with {{Semantic.EXACTLY_ONCE}} is restored from a > savepoint written with {{Semantic.AT_LEAST_ONCE}} the job fails on restore > with the NPE below. This makes it impossible to upgrade an AT_LEAST_ONCE > pipeline to an EXACTLY_ONCE pipeline statefully. > {quote} > java.lang.NullPointerException > at java.util.Hashtable.put(Hashtable.java:460) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initTransactionalProducer(FlinkKafkaProducer011.java:955) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.recoverAndCommit(FlinkKafkaProducer011.java:733) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.recoverAndCommit(FlinkKafkaProducer011.java:93) > at > org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.recoverAndCommitInternal(TwoPhaseCommitSinkFunction.java:373) > at > org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:333) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initializeState(FlinkKafkaProducer011.java:867) > at > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178) > at > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160) > at > 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:254) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) > at java.lang.Thread.run(Thread.java:748){quote} > The reason is that, for {{Semantic.AT_LEAST_ONCE}}, the snapshotted state of > the {{TwoPhaseCommitFunction}} is of the form > "TransactionHolder\{handle=KafkaTransactionState [transactionalId=null, > producerId=-1, epoch=-1], transactionStartTime=1537175471175}". -- This message was sent by Atlassian JIRA (v7.6.3#76005)
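The stack trace above bottoms out in `java.util.Hashtable.put`, which is the key to the bug: `java.util.Properties` extends `Hashtable`, and `Hashtable.put` throws `NullPointerException` on a null key or value. A minimal, hypothetical illustration of the needed guard follows; `producerConfig` is an invented helper, not Flink or Kafka API.

```java
// Hypothetical illustration of the failure mode: with AT_LEAST_ONCE state the
// restored transactionalId is null, and putting null into a Properties
// (a Hashtable) throws NPE, so the restore path must guard against it.
import java.util.Properties;

public class TransactionalRestoreGuard {

    static Properties producerConfig(String transactionalId) {
        Properties props = new Properties();
        if (transactionalId != null) { // guard avoids Hashtable.put(null) NPE
            props.put("transactional.id", transactionalId);
        }
        return props;
    }

    public static void main(String[] args) {
        // Restored AT_LEAST_ONCE snapshot: transactionalId == null, no NPE.
        System.out.println(producerConfig(null).containsKey("transactional.id")); // false
        // Restored EXACTLY_ONCE snapshot carries a real id.
        System.out.println(producerConfig("tx-7").getProperty("transactional.id")); // tx-7
    }
}
```

Without such a guard, the null id from the AT_LEAST_ONCE snapshot flows straight into the producer configuration and fails exactly as in the trace.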
[jira] [Commented] (FLINK-10720) Add stress deployment end-to-end test
[ https://issues.apache.org/jira/browse/FLINK-10720?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673442#comment-16673442 ] ASF GitHub Bot commented on FLINK-10720: StefanRRichter commented on issue #6994: [FLINK-10720][tests] Add deployment end-to-end stress test with many … URL: https://github.com/apache/flink/pull/6994#issuecomment-435449615 @tillrohrmann that depends on how we want to interpret the result. Right now, the test passes when it somehow manages to do one successful recovery. If we want to test that the first recovery must already be successful, we could limit the number of allowed failed attempts to 1. Would you prefer to enforce successful recovery in the first attempt? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add stress deployment end-to-end test > - > > Key: FLINK-10720 > URL: https://issues.apache.org/jira/browse/FLINK-10720 > Project: Flink > Issue Type: Sub-task > Components: E2E Tests >Affects Versions: 1.7.0 >Reporter: Till Rohrmann >Assignee: Stefan Richter >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > In order to test Flink's scalability, I suggest to add an end-to-end test > which tests the deployment of a job which is very demanding. The job should > have large {{TaskDeploymentDescriptors}} (e.g. a job using union state or > having a high degree of parallelism). That way we can test that the > serialization overhead of the TDDs does not affect the health of the cluster > (e.g. heartbeats are not affected because the serialization does not happen > in the main thread). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10455) Potential Kafka producer leak in case of failures
[ https://issues.apache.org/jira/browse/FLINK-10455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673441#comment-16673441 ] ASF GitHub Bot commented on FLINK-10455: azagrebin commented on a change in pull request #6989: [FLINK-10455][Kafka Tx] Close transactional producers in case of failure and termination URL: https://github.com/apache/flink/pull/6989#discussion_r230447543 ## File path: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java ## @@ -671,6 +672,9 @@ public void close() throws FlinkKafka011Exception { } // make sure we propagate pending errors checkErroneous(); + pendingTransactions().values().forEach(transaction -> Review comment: `TransactionManager#maybeFailWithError` contains just a check, it does not help with the deadlock so far This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Potential Kafka producer leak in case of failures > - > > Key: FLINK-10455 > URL: https://issues.apache.org/jira/browse/FLINK-10455 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.5.2 >Reporter: Nico Kruber >Assignee: Andrey Zagrebin >Priority: Major > Labels: pull-request-available > > If the Kafka brokers' timeout is too low for our checkpoint interval [1], we > may get an {{ProducerFencedException}}. Documentation around > {{ProducerFencedException}} explicitly states that we should close the > producer after encountering it. > By looking at the code, it doesn't seem like this is actually done in > {{FlinkKafkaProducer011}}. Also, in case one transaction's commit in > {{TwoPhaseCommitSinkFunction#notifyCheckpointComplete}} fails with an > exception, we don't clean up (nor try to commit) any other transaction. 
> -> from what I see, {{TwoPhaseCommitSinkFunction#notifyCheckpointComplete}} > simply iterates over the {{pendingCommitTransactions}} which is not touched > during {{close()}} > Now if we restart the failing job on the same Flink cluster, any resources > from the previous attempt will still linger around. > [1] > https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/connectors/kafka.html#kafka-011 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
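The gap described in the issue — `close()` never touches `pendingCommitTransactions`, so producers of pending transactions leak — can be sketched with a minimal, hypothetical cleanup loop. `TxProducer`, `closeAll`, and the collection layout are invented for this example; this is not the `FlinkKafkaProducer011` code.

```java
// Minimal, hypothetical sketch of the fix direction: on close(), visit every
// still-pending transaction and close its producer, remembering the first
// failure instead of aborting the loop, so no producer is left dangling.
import java.util.ArrayList;
import java.util.List;

public class PendingTransactionCleanup {

    static class TxProducer {
        boolean closed;
        void close() { closed = true; }
    }

    // Close every producer; rethrow the first error only after all were visited.
    static void closeAll(List<TxProducer> pendingTransactions) {
        RuntimeException firstError = null;
        for (TxProducer producer : pendingTransactions) {
            try {
                producer.close();
            } catch (RuntimeException e) {
                if (firstError == null) {
                    firstError = e;
                }
            }
        }
        if (firstError != null) {
            throw firstError;
        }
    }

    public static void main(String[] args) {
        List<TxProducer> pending = new ArrayList<>();
        pending.add(new TxProducer());
        pending.add(new TxProducer());
        closeAll(pending); // close() now iterates pending transactions too
        System.out.println(pending.stream().allMatch(t -> t.closed)); // true
    }
}
```

Collecting errors rather than failing fast matters here: one misbehaving producer should not prevent the remaining ones from being released.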
[jira] [Commented] (FLINK-10353) Restoring a KafkaProducer with Semantic.EXACTLY_ONCE from a savepoint written with Semantic.AT_LEAST_ONCE fails with NPE
[ https://issues.apache.org/jira/browse/FLINK-10353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673437#comment-16673437 ] ASF GitHub Bot commented on FLINK-10353: StefanRRichter opened a new pull request #7010: [FLINK-10353][kafka] Support change of transactional semantics in Kaf… URL: https://github.com/apache/flink/pull/7010 …ka Producer with existing state ## What is the purpose of the change This PR changes `FlinkKafkaProducer` and `FlinkKafkaProducer011` to support a change of transactional semantics when restoring from existing state, e.g. a savepoint. ## Brief change log - Introduced `KafkaTransactionState#isTransactional` to distinguish which transactional handling should be applied instead of relying on what is currently configured. - Call `initializeUserContext` in `TwoPhaseCommitSinkFunction` for all cases that did not recover a user context. - Consider removing the transactional id from the properties when creating a new producer to deactivate transactional semantics if no longer required. ## Verifying this change Added `FlinkKafkaProducer(11)ITCase#testMigrateFromAtLeastOnceToExactlyOnce` and `#testMigrateFromAtExactlyOnceToAtLeastOnce` ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Restoring a KafkaProducer with Semantic.EXACTLY_ONCE from a savepoint written > with Semantic.AT_LEAST_ONCE fails with NPE > > > Key: FLINK-10353 > URL: https://issues.apache.org/jira/browse/FLINK-10353 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.5.3, 1.6.0 >Reporter: Konstantin Knauf >Priority: Critical > Labels: pull-request-available > > If a KafkaProducer with {{Semantic.EXACTLY_ONCE}} is restored from a > savepoint written with {{Semantic.AT_LEAST_ONCE}} the job fails on restore > with the NPE below. This makes it impossible to upgrade an AT_LEAST_ONCE > pipeline to an EXACTLY_ONCE pipeline statefully. > {quote} > java.lang.NullPointerException > at java.util.Hashtable.put(Hashtable.java:460) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initTransactionalProducer(FlinkKafkaProducer011.java:955) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.recoverAndCommit(FlinkKafkaProducer011.java:733) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.recoverAndCommit(FlinkKafkaProducer011.java:93) > at > org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.recoverAndCommitInternal(TwoPhaseCommitSinkFunction.java:373) > at > org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:333) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initializeState(FlinkKafkaProducer011.java:867) > at > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178) > at > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) > at > 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:254) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) > at java.lang.Thread.run(Thread.java:748){quote} > The reason is, that for
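The `KafkaTransactionState#isTransactional` idea from the PR's change log can be sketched roughly as follows. The class shape, method bodies, and return strings are invented for illustration; the real Flink classes differ.

```java
// Rough sketch of the isTransactional idea: the restore path decides what to
// do from the *snapshotted* state itself, not from the currently configured
// semantic, so AT_LEAST_ONCE state (transactionalId == null) is handled safely.
public class KafkaTransactionStateSketch {

    final String transactionalId; // null when the snapshot was AT_LEAST_ONCE

    KafkaTransactionStateSketch(String transactionalId) {
        this.transactionalId = transactionalId;
    }

    boolean isTransactional() {
        return transactionalId != null;
    }

    String recoverAndCommit() {
        // Only genuinely transactional state needs a transactional producer,
        // so a null transactionalId never reaches the producer configuration.
        return isTransactional()
            ? "recover transaction " + transactionalId
            : "nothing to commit";
    }

    public static void main(String[] args) {
        System.out.println(new KafkaTransactionStateSketch(null).recoverAndCommit());
        System.out.println(new KafkaTransactionStateSketch("tx-7").recoverAndCommit());
    }
}
```

Keying the behavior off the restored state is what makes the semantic change safe in both directions, which is exactly what the two migration ITCases in the PR exercise.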
[jira] [Updated] (FLINK-10353) Restoring a KafkaProducer with Semantic.EXACTLY_ONCE from a savepoint written with Semantic.AT_LEAST_ONCE fails with NPE
[ https://issues.apache.org/jira/browse/FLINK-10353?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-10353: --- Labels: pull-request-available (was: ) > Restoring a KafkaProducer with Semantic.EXACTLY_ONCE from a savepoint written > with Semantic.AT_LEAST_ONCE fails with NPE > > > Key: FLINK-10353 > URL: https://issues.apache.org/jira/browse/FLINK-10353 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.5.3, 1.6.0 >Reporter: Konstantin Knauf >Priority: Critical > Labels: pull-request-available > > If a KafkaProducer with {{Semantic.EXACTLY_ONCE}} is restored from a > savepoint written with {{Semantic.AT_LEAST_ONCE}} the job fails on restore > with the NPE below. This makes it impossible to upgrade an AT_LEAST_ONCE > pipeline to an EXACTLY_ONCE pipeline statefully. > {quote} > java.lang.NullPointerException > at java.util.Hashtable.put(Hashtable.java:460) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initTransactionalProducer(FlinkKafkaProducer011.java:955) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.recoverAndCommit(FlinkKafkaProducer011.java:733) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.recoverAndCommit(FlinkKafkaProducer011.java:93) > at > org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.recoverAndCommitInternal(TwoPhaseCommitSinkFunction.java:373) > at > org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:333) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initializeState(FlinkKafkaProducer011.java:867) > at > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178) > at > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160) > at > 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:254) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) > at java.lang.Thread.run(Thread.java:748){quote} > The reason is that, for {{Semantic.AT_LEAST_ONCE}}, the snapshotted state of > the {{TwoPhaseCommitFunction}} is of the form > "TransactionHolder\{handle=KafkaTransactionState [transactionalId=null, > producerId=-1, epoch=-1], transactionStartTime=1537175471175}". -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] isunjin commented on a change in pull request #6684: [FLINK-10205] Batch Job: InputSplit Fault tolerant for DataSource…
isunjin commented on a change in pull request #6684: [FLINK-10205] Batch Job: InputSplit Fault tolerant for DataSource… URL: https://github.com/apache/flink/pull/6684#discussion_r230445728 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java ## @@ -249,6 +254,19 @@ public CoLocationConstraint getLocationConstraint() { return locationConstraint; } + public InputSplit getNextInputSplit(int index, String host) { + final int taskId = this.getParallelSubtaskIndex(); + synchronized (this.inputSplits) { + if (index < this.inputSplits.size()) { + return this.inputSplits.get(index); Review comment: Thanks, removed. It was intended to support speculative execution, so that multiple versions of the same ExecutionVertex can run at the same time, but I will postpone that feature and add it later. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
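The idea behind the diff above — cache assigned splits on the vertex so a restarted attempt replays the same splits by index — can be sketched as a small, hypothetical standalone class. Names are simplified (`String` stands in for `InputSplit`); this is not the actual `ExecutionVertex` code.

```java
// Hypothetical sketch: splits handed to a vertex are cached by index, so a
// restarted (or speculative) attempt asking for the same index gets exactly
// the same split back, making DataSourceTask inputs deterministic on rerun.
import java.util.ArrayList;
import java.util.List;

public class VertexSplitCache {

    private final List<String> inputSplits = new ArrayList<>();

    // Returns the split previously assigned at `index`, or records `freshSplit`
    // as the assignment for a first-time request.
    public String getNextInputSplit(int index, String freshSplit) {
        synchronized (inputSplits) {
            if (index < inputSplits.size()) {
                return inputSplits.get(index); // rerun: deterministic replay
            }
            inputSplits.add(freshSplit);       // first attempt: remember it
            return freshSplit;
        }
    }

    public static void main(String[] args) {
        VertexSplitCache vertex = new VertexSplitCache();
        System.out.println(vertex.getNextInputSplit(0, "split-A")); // split-A
        // A restarted attempt asking for index 0 gets the same split back:
        System.out.println(vertex.getNextInputSplit(0, "split-B")); // split-A
    }
}
```

The synchronization matters because the cache is read by a re-executed attempt while the scheduler may still be appending assignments for other subtasks.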
[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask
[ https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673434#comment-16673434 ] ASF GitHub Bot commented on FLINK-10205: isunjin commented on a change in pull request #6684: [FLINK-10205] Batch Job: InputSplit Fault tolerant for DataSource… URL: https://github.com/apache/flink/pull/6684#discussion_r230445728 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java ## @@ -249,6 +254,19 @@ public CoLocationConstraint getLocationConstraint() { return locationConstraint; } + public InputSplit getNextInputSplit(int index, String host) { + final int taskId = this.getParallelSubtaskIndex(); + synchronized (this.inputSplits) { + if (index < this.inputSplits.size()) { + return this.inputSplits.get(index); Review comment: Thanks, removed. It was intended to support speculative execution, so that multiple versions of the same ExecutionVertex can run at the same time, but I will postpone that feature and add it later. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Batch Job: InputSplit Fault tolerant for DataSourceTask > --- > > Key: FLINK-10205 > URL: https://issues.apache.org/jira/browse/FLINK-10205 > Project: Flink > Issue Type: Sub-task > Components: JobManager >Affects Versions: 1.6.1, 1.6.2, 1.7.0 >Reporter: JIN SUN >Assignee: JIN SUN >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > Original Estimate: 168h > Remaining Estimate: 168h > > Today the DataSourceTask pulls InputSplits from the JobManager to achieve better > performance; however, when a DataSourceTask fails and reruns, it will not get > the same splits as its previous attempt. This will introduce inconsistent > results or even data corruption. 
> Furthermore, if there are two executions running at the same time (in a batch > scenario), these two executions should process the same splits. > We need to fix this issue to make the inputs of a DataSourceTask > deterministic. The proposal is to save all splits into the ExecutionVertex, and the > DataSourceTask will pull splits from there. > document: > [https://docs.google.com/document/d/1FdZdcA63tPUEewcCimTFy9Iz2jlVlMRANZkO4RngIuk/edit?usp=sharing] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10633) End-to-end test: Prometheus reporter
[ https://issues.apache.org/jira/browse/FLINK-10633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673430#comment-16673430 ] ASF GitHub Bot commented on FLINK-10633: zentol commented on issue #7003: [FLINK-10633][prometheus] Add E2E test URL: https://github.com/apache/flink/pull/7003#issuecomment-435447361 For one, this was an experiment; I hated writing this test in bash, so instead I ported it to Java. We may use it for future tests, or we may rework it entirely. But from what I've seen, things get done significantly faster if you at least have some starting point. The majority of the code here is sane and easily re-usable in whatever direction we want to go. The mistake we made in the past was to jump ahead with the first thing we came up with (our bash stuff) and not critically analyze it when doing so. Furthermore, I don't consider this a framework. It's too simple for that; all it adds is a programmatic way to interact with flink-dist. That's it. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > End-to-end test: Prometheus reporter > > > Key: FLINK-10633 > URL: https://issues.apache.org/jira/browse/FLINK-10633 > Project: Flink > Issue Type: Sub-task > Components: E2E Tests >Affects Versions: 1.7.0 >Reporter: Till Rohrmann >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > Add an end-to-end test using the {{PrometheusReporter}} to verify that all > metrics are properly reported. Additionally verify that the newly introduced > RocksDB metrics are accessible (see FLINK-10423). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10720) Add stress deployment end-to-end test
[ https://issues.apache.org/jira/browse/FLINK-10720?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673421#comment-16673421 ] ASF GitHub Bot commented on FLINK-10720: tillrohrmann commented on a change in pull request #6994: [FLINK-10720][tests] Add deployment end-to-end stress test with many … URL: https://github.com/apache/flink/pull/6994#discussion_r230441106 ## File path: flink-end-to-end-tests/test-scripts/test_heavy_deployment.sh ## @@ -0,0 +1,50 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +source "$(dirname "$0")"/common.sh + +CHECKPOINT_DIR="file://$TEST_DATA_DIR/savepoint-e2e-test-chckpt-dir" + +TEST=flink-heavy-deployment-stress-test +TEST_PROGRAM_NAME=HeavyDeploymentStressTestProgram +TEST_PROGRAM_JAR=${END_TO_END_DIR}/$TEST/target/$TEST_PROGRAM_NAME.jar + +set_conf "taskmanager.heap.mb" "256" # 256Mb x 20 TMs = 5Gb total heap + +set_conf "taskmanager.memory.size" "8" # 8Mb +set_conf "taskmanager.network.memory.min" "8mb" +set_conf "taskmanager.network.memory.max" "8mb" +set_conf "taskmanager.memory.segment-size" "8kb" + +set_conf "taskmanager.numberOfTaskSlots" "11" + +start_cluster + +NUM_TMS=20 + +#20 x 13 slots to support a parallelism of 256 +echo "Start $NUM_TMS more task managers" +for i in `seq 1 $NUM_TMS`; do +$FLINK_DIR/bin/taskmanager.sh start +done + +$FLINK_DIR/bin/flink run -p 200 ${TEST_PROGRAM_JAR} \ +--environment.max_parallelism 1024 --environment.parallelism 200 \ Review comment: Why don't we use all slots? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add stress deployment end-to-end test > - > > Key: FLINK-10720 > URL: https://issues.apache.org/jira/browse/FLINK-10720 > Project: Flink > Issue Type: Sub-task > Components: E2E Tests >Affects Versions: 1.7.0 >Reporter: Till Rohrmann >Assignee: Stefan Richter >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > In order to test Flink's scalability, I suggest to add an end-to-end test > which tests the deployment of a job which is very demanding. The job should > have large {{TaskDeploymentDescriptors}} (e.g. a job using union state or > having a high degree of parallelism). That way we can test that the > serialization overhead of the TDDs does not affect the health of the cluster > (e.g. 
heartbeats are not affected because the serialization does not happen > in the main thread). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10720) Add stress deployment end-to-end test
[ https://issues.apache.org/jira/browse/FLINK-10720?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673426#comment-16673426 ] ASF GitHub Bot commented on FLINK-10720: tillrohrmann commented on a change in pull request #6994: [FLINK-10720][tests] Add deployment end-to-end stress test with many … URL: https://github.com/apache/flink/pull/6994#discussion_r230440912 ## File path: flink-end-to-end-tests/test-scripts/test_heavy_deployment.sh ## @@ -0,0 +1,50 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +source "$(dirname "$0")"/common.sh + +CHECKPOINT_DIR="file://$TEST_DATA_DIR/savepoint-e2e-test-chckpt-dir" + +TEST=flink-heavy-deployment-stress-test +TEST_PROGRAM_NAME=HeavyDeploymentStressTestProgram +TEST_PROGRAM_JAR=${END_TO_END_DIR}/$TEST/target/$TEST_PROGRAM_NAME.jar + +set_conf "taskmanager.heap.mb" "256" # 256Mb x 20 TMs = 5Gb total heap + +set_conf "taskmanager.memory.size" "8" # 8Mb +set_conf "taskmanager.network.memory.min" "8mb" +set_conf "taskmanager.network.memory.max" "8mb" +set_conf "taskmanager.memory.segment-size" "8kb" + +set_conf "taskmanager.numberOfTaskSlots" "11" + +start_cluster Review comment: `start_cluster` already starts a single TM. 
Thus, we should have 21 TMs running. Is this correct? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add stress deployment end-to-end test > - > > Key: FLINK-10720 > URL: https://issues.apache.org/jira/browse/FLINK-10720 > Project: Flink > Issue Type: Sub-task > Components: E2E Tests >Affects Versions: 1.7.0 >Reporter: Till Rohrmann >Assignee: Stefan Richter >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > In order to test Flink's scalability, I suggest to add an end-to-end test > which tests the deployment of a job which is very demanding. The job should > have large {{TaskDeploymentDescriptors}} (e.g. a job using union state or > having a high degree of parallelism). That way we can test that the > serialization overhead of the TDDs does not affect the health of the cluster > (e.g. heartbeats are not affected because the serialization does not happen > in the main thread). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
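For readers tallying the numbers in this thread: `start_cluster` brings up one TaskManager and the script starts `NUM_TMS=20` more, with `taskmanager.numberOfTaskSlots` set to 11. A minimal back-of-the-envelope sketch of that arithmetic (values copied from the quoted script; variable names are local to this sketch):

```shell
# Slot arithmetic for the reviewed test_heavy_deployment.sh (illustrative only).
NUM_EXTRA_TMS=20      # TaskManagers started explicitly by the script
SLOTS_PER_TM=11       # taskmanager.numberOfTaskSlots
TOTAL_TMS=$((NUM_EXTRA_TMS + 1))          # start_cluster already starts one TM
TOTAL_SLOTS=$((TOTAL_TMS * SLOTS_PER_TM))
echo "${TOTAL_TMS} TMs x ${SLOTS_PER_TM} slots = ${TOTAL_SLOTS} slots"
# prints: 21 TMs x 11 slots = 231 slots
```

231 slots comfortably covers the job's parallelism of 200, which is consistent with the reviewer's count of 21 TaskManagers.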
[jira] [Commented] (FLINK-10720) Add stress deployment end-to-end test
[ https://issues.apache.org/jira/browse/FLINK-10720?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673422#comment-16673422 ] ASF GitHub Bot commented on FLINK-10720: tillrohrmann commented on a change in pull request #6994: [FLINK-10720][tests] Add deployment end-to-end stress test with many … URL: https://github.com/apache/flink/pull/6994#discussion_r230439584
## File path: flink-end-to-end-tests/test-scripts/test_heavy_deployment.sh ##
+set_conf "taskmanager.numberOfTaskSlots" "11"
+
+start_cluster
+
+NUM_TMS=20
+
+#20 x 13 slots to support a parallelism of 256
Review comment: Comment is not correct.
[jira] [Commented] (FLINK-10720) Add stress deployment end-to-end test
[ https://issues.apache.org/jira/browse/FLINK-10720?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673424#comment-16673424 ] ASF GitHub Bot commented on FLINK-10720: tillrohrmann commented on a change in pull request #6994: [FLINK-10720][tests] Add deployment end-to-end stress test with many … URL: https://github.com/apache/flink/pull/6994#discussion_r230443405
## File path: flink-end-to-end-tests/flink-heavy-deployment-stress-test/src/main/java/org/apache/flink/deployment/HeavyDeploymentStressTestProgram.java ##
+	/**
+	 * Source with dummy operator state that results in inflated meta data.
+	 */
+	static class SimpleEndlessSourceWithBloatedState extends RichParallelSourceFunction
Review comment: `serialVersionUID` missing
[jira] [Commented] (FLINK-10720) Add stress deployment end-to-end test
[ https://issues.apache.org/jira/browse/FLINK-10720?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673428#comment-16673428 ] ASF GitHub Bot commented on FLINK-10720: tillrohrmann commented on a change in pull request #6994: [FLINK-10720][tests] Add deployment end-to-end stress test with many … URL: https://github.com/apache/flink/pull/6994#discussion_r230439813
## File path: flink-end-to-end-tests/test-scripts/test_heavy_deployment.sh ##
+start_cluster
+
+NUM_TMS=20
+
+#20 x 13 slots to support a parallelism of 256
+echo "Start $NUM_TMS more task managers"
+for i in `seq 1 $NUM_TMS`; do
+$FLINK_DIR/bin/taskmanager.sh start
+done
Review comment: I think it's better to use `common.sh#start_taskmanagers` here.
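To make the suggestion above concrete: a hedged sketch of the loop delegated to a helper. The name `start_taskmanagers` comes from the reviewer's pointer to common.sh; the body below is a local stand-in stub for illustration, not Flink's actual implementation:

```shell
# Stand-in stub illustrating the call shape of a start_taskmanagers helper;
# the real common.sh helper would run "$FLINK_DIR"/bin/taskmanager.sh start
# once per requested TaskManager.
start_taskmanagers() {
  local tmnum=$1
  for i in $(seq 1 "$tmnum"); do
    echo "starting TM $i"    # real helper starts an actual TaskManager here
  done
}

NUM_TMS=20
start_taskmanagers "$NUM_TMS"   # replaces the hand-rolled for-loop in the script
```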
[jira] [Commented] (FLINK-10720) Add stress deployment end-to-end test
[ https://issues.apache.org/jira/browse/FLINK-10720?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673423#comment-16673423 ] ASF GitHub Bot commented on FLINK-10720: tillrohrmann commented on a change in pull request #6994: [FLINK-10720][tests] Add deployment end-to-end stress test with many … URL: https://github.com/apache/flink/pull/6994#discussion_r230440331
## File path: flink-end-to-end-tests/test-scripts/test_heavy_deployment.sh ##
+$FLINK_DIR/bin/flink run -p 200 ${TEST_PROGRAM_JAR} \
+--environment.max_parallelism 1024 --environment.parallelism 200 \
Review comment: `-p 200` should not be needed if we set `--environment.parallelism`.
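Applying the comment on the submission line: drop the explicit `-p 200` and rely on `--environment.parallelism` alone. A sketch showing only the flags visible in the quoted diff (the script passes further options that are truncated in the quote); `FLINK_DIR` and `TEST_PROGRAM_JAR` are placeholders for the script's variables, and the command is echoed rather than executed:

```shell
# Sketch of the submission with the redundant -p flag removed.
FLINK_DIR=${FLINK_DIR:-/opt/flink}                              # placeholder
TEST_PROGRAM_JAR=${TEST_PROGRAM_JAR:-HeavyDeploymentStressTestProgram.jar}
CMD=("$FLINK_DIR/bin/flink" run "$TEST_PROGRAM_JAR"
  --environment.max_parallelism 1024 --environment.parallelism 200)
echo "${CMD[@]}"   # echoed for illustration instead of being run
```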
[jira] [Commented] (FLINK-10720) Add stress deployment end-to-end test
[ https://issues.apache.org/jira/browse/FLINK-10720?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673427#comment-16673427 ] ASF GitHub Bot commented on FLINK-10720: tillrohrmann commented on a change in pull request #6994: [FLINK-10720][tests] Add deployment end-to-end stress test with many … URL: https://github.com/apache/flink/pull/6994#discussion_r230443577
## File path: flink-end-to-end-tests/flink-heavy-deployment-stress-test/pom.xml ##
+	<dependency>
+		<groupId>junit</groupId>
+		<artifactId>junit</artifactId>
+	</dependency>
Review comment: Not needed imo
[jira] [Commented] (FLINK-10720) Add stress deployment end-to-end test
[ https://issues.apache.org/jira/browse/FLINK-10720?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673425#comment-16673425 ] ASF GitHub Bot commented on FLINK-10720: tillrohrmann commented on a change in pull request #6994: [FLINK-10720][tests] Add deployment end-to-end stress test with many … URL: https://github.com/apache/flink/pull/6994#discussion_r230443486
## File path: flink-end-to-end-tests/flink-heavy-deployment-stress-test/pom.xml ##
+	<dependency>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-core</artifactId>
+		<version>${project.version}</version>
+	</dependency>
+	<dependency>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-java</artifactId>
+		<version>${project.version}</version>
+	</dependency>
Review comment: I think we don't need these two dependencies.
[jira] [Commented] (FLINK-10720) Add stress deployment end-to-end test
[ https://issues.apache.org/jira/browse/FLINK-10720?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673420#comment-16673420 ] ASF GitHub Bot commented on FLINK-10720: tillrohrmann commented on a change in pull request #6994: [FLINK-10720][tests] Add deployment end-to-end stress test with many … URL: https://github.com/apache/flink/pull/6994#discussion_r230443678
## File path: flink-end-to-end-tests/flink-heavy-deployment-stress-test/pom.xml ##
+	<dependency>
+		<groupId>log4j</groupId>
+		<artifactId>log4j</artifactId>
+	</dependency>
Review comment: I don't think that we need this dependency.
[GitHub] tillrohrmann commented on a change in pull request #6994: [FLINK-10720][tests] Add deployment end-to-end stress test with many …
tillrohrmann commented on a change in pull request #6994: [FLINK-10720][tests] Add deployment end-to-end stress test with many … URL: https://github.com/apache/flink/pull/6994#discussion_r230439584 ## File path: flink-end-to-end-tests/test-scripts/test_heavy_deployment.sh ## @@ -0,0 +1,50 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +source "$(dirname "$0")"/common.sh + +CHECKPOINT_DIR="file://$TEST_DATA_DIR/savepoint-e2e-test-chckpt-dir" + +TEST=flink-heavy-deployment-stress-test +TEST_PROGRAM_NAME=HeavyDeploymentStressTestProgram +TEST_PROGRAM_JAR=${END_TO_END_DIR}/$TEST/target/$TEST_PROGRAM_NAME.jar + +set_conf "taskmanager.heap.mb" "256" # 256Mb x 20 TMs = 5Gb total heap + +set_conf "taskmanager.memory.size" "8" # 8Mb +set_conf "taskmanager.network.memory.min" "8mb" +set_conf "taskmanager.network.memory.max" "8mb" +set_conf "taskmanager.memory.segment-size" "8kb" + +set_conf "taskmanager.numberOfTaskSlots" "11" + +start_cluster + +NUM_TMS=20 + +#20 x 13 slots to support a parallelism of 256 Review comment: Comment is not correct. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] tillrohrmann commented on a change in pull request #6994: [FLINK-10720][tests] Add deployment end-to-end stress test with many …
tillrohrmann commented on a change in pull request #6994: [FLINK-10720][tests] Add deployment end-to-end stress test with many … URL: https://github.com/apache/flink/pull/6994#discussion_r230440331 ## File path: flink-end-to-end-tests/test-scripts/test_heavy_deployment.sh ## @@ -0,0 +1,50 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +source "$(dirname "$0")"/common.sh + +CHECKPOINT_DIR="file://$TEST_DATA_DIR/savepoint-e2e-test-chckpt-dir" + +TEST=flink-heavy-deployment-stress-test +TEST_PROGRAM_NAME=HeavyDeploymentStressTestProgram +TEST_PROGRAM_JAR=${END_TO_END_DIR}/$TEST/target/$TEST_PROGRAM_NAME.jar + +set_conf "taskmanager.heap.mb" "256" # 256Mb x 20 TMs = 5Gb total heap + +set_conf "taskmanager.memory.size" "8" # 8Mb +set_conf "taskmanager.network.memory.min" "8mb" +set_conf "taskmanager.network.memory.max" "8mb" +set_conf "taskmanager.memory.segment-size" "8kb" + +set_conf "taskmanager.numberOfTaskSlots" "11" + +start_cluster + +NUM_TMS=20 + +#20 x 13 slots to support a parallelism of 256 +echo "Start $NUM_TMS more task managers" +for i in `seq 1 $NUM_TMS`; do +$FLINK_DIR/bin/taskmanager.sh start +done + +$FLINK_DIR/bin/flink run -p 200 ${TEST_PROGRAM_JAR} \ +--environment.max_parallelism 1024 --environment.parallelism 200 \ Review comment: `-p 200` should not be needed if we set `--environment.parallelism`. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] tillrohrmann commented on a change in pull request #6994: [FLINK-10720][tests] Add deployment end-to-end stress test with many …
tillrohrmann commented on a change in pull request #6994: [FLINK-10720][tests] Add deployment end-to-end stress test with many … URL: https://github.com/apache/flink/pull/6994#discussion_r230439813 ## File path: flink-end-to-end-tests/test-scripts/test_heavy_deployment.sh ## @@ -0,0 +1,50 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +source "$(dirname "$0")"/common.sh + +CHECKPOINT_DIR="file://$TEST_DATA_DIR/savepoint-e2e-test-chckpt-dir" + +TEST=flink-heavy-deployment-stress-test +TEST_PROGRAM_NAME=HeavyDeploymentStressTestProgram +TEST_PROGRAM_JAR=${END_TO_END_DIR}/$TEST/target/$TEST_PROGRAM_NAME.jar + +set_conf "taskmanager.heap.mb" "256" # 256Mb x 20 TMs = 5Gb total heap + +set_conf "taskmanager.memory.size" "8" # 8Mb +set_conf "taskmanager.network.memory.min" "8mb" +set_conf "taskmanager.network.memory.max" "8mb" +set_conf "taskmanager.memory.segment-size" "8kb" + +set_conf "taskmanager.numberOfTaskSlots" "11" + +start_cluster + +NUM_TMS=20 + +#20 x 13 slots to support a parallelism of 256 +echo "Start $NUM_TMS more task managers" +for i in `seq 1 $NUM_TMS`; do +$FLINK_DIR/bin/taskmanager.sh start +done Review comment: I think it's better to use `common.sh#start_taskmanagers` here. 
This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] tillrohrmann commented on a change in pull request #6994: [FLINK-10720][tests] Add deployment end-to-end stress test with many …
tillrohrmann commented on a change in pull request #6994: [FLINK-10720][tests] Add deployment end-to-end stress test with many … URL: https://github.com/apache/flink/pull/6994#discussion_r230443577 ## File path: flink-end-to-end-tests/flink-heavy-deployment-stress-test/pom.xml ## @@ -0,0 +1,99 @@ + + +http://maven.apache.org/POM/4.0.0; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; +xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd;> + + 4.0.0 + + + org.apache.flink + flink-end-to-end-tests + 1.7-SNAPSHOT + .. + + + flink-heavy-deployment-stress-test + flink-heavy-deployment-stress-test + jar + + + + org.apache.flink + flink-core + ${project.version} + + + org.apache.flink + flink-java + ${project.version} + + + org.apache.flink + flink-streaming-java_2.11 + ${project.version} + + + org.apache.flink + flink-datastream-allround-test + ${project.version} + + + junit + junit + Review comment: Not needed imo This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] tillrohrmann commented on a change in pull request #6994: [FLINK-10720][tests] Add deployment end-to-end stress test with many …
tillrohrmann commented on a change in pull request #6994: [FLINK-10720][tests] Add deployment end-to-end stress test with many … URL: https://github.com/apache/flink/pull/6994#discussion_r230443678 ## File path: flink-end-to-end-tests/flink-heavy-deployment-stress-test/pom.xml ## @@ -0,0 +1,99 @@ + + +http://maven.apache.org/POM/4.0.0; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; +xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd;> + + 4.0.0 + + + org.apache.flink + flink-end-to-end-tests + 1.7-SNAPSHOT + .. + + + flink-heavy-deployment-stress-test + flink-heavy-deployment-stress-test + jar + + + + org.apache.flink + flink-core + ${project.version} + + + org.apache.flink + flink-java + ${project.version} + + + org.apache.flink + flink-streaming-java_2.11 + ${project.version} + + + org.apache.flink + flink-datastream-allround-test + ${project.version} + + + junit + junit + + + org.apache.flink + flink-test-utils_2.11 + ${project.version} + compile + + + log4j + log4j + Review comment: I don't think that we need this dependency. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] tillrohrmann commented on a change in pull request #6994: [FLINK-10720][tests] Add deployment end-to-end stress test with many …
tillrohrmann commented on a change in pull request #6994: [FLINK-10720][tests] Add deployment end-to-end stress test with many … URL: https://github.com/apache/flink/pull/6994#discussion_r230443405

## File path: flink-end-to-end-tests/flink-heavy-deployment-stress-test/src/main/java/org/apache/flink/deployment/HeavyDeploymentStressTestProgram.java

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.deployment;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.util.Preconditions;

import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.setupEnvironment;

/**
 * End-to-end test for heavy deployment descriptors. This test creates a heavy deployment by producing inflated meta
 * data for the source's operator state. The state is registered as union state and will be multiplied in deployment.
 */
public class HeavyDeploymentStressTestProgram {

	private static final ConfigOption<Integer> NUM_LIST_STATES_PER_OP = ConfigOptions
		.key("heavy_deployment_test.num_list_states_per_op")
		.defaultValue(100);

	private static final ConfigOption<Integer> NUM_PARTITIONS_PER_LIST_STATE = ConfigOptions
		.key("heavy_deployment_test.num_partitions_per_list_state")
		.defaultValue(100);

	public static void main(String[] args) throws Exception {

		final ParameterTool pt = ParameterTool.fromArgs(args);
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		setupEnvironment(env, pt);

		final int numStates =
			pt.getInt(NUM_LIST_STATES_PER_OP.key(), NUM_LIST_STATES_PER_OP.defaultValue());
		final int numPartitionsPerState =
			pt.getInt(NUM_PARTITIONS_PER_LIST_STATE.key(), NUM_PARTITIONS_PER_LIST_STATE.defaultValue());

		Preconditions.checkState(env.getCheckpointInterval() > 0L, "Checkpointing must be enabled for this test!");

		env.addSource(new SimpleEndlessSourceWithBloatedState(numStates, numPartitionsPerState))
			.setParallelism(env.getParallelism())
			.addSink(new DiscardingSink<>()).setParallelism(1);

		env.execute("HeavyDeploymentStressTestProgram");
	}

	/**
	 * Source with dummy operator state that results in inflated meta data.
	 */
	static class SimpleEndlessSourceWithBloatedState extends RichParallelSourceFunction
```

Review comment: `serialVersionUID` missing
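The review asks for an explicit `serialVersionUID` on the `Serializable` source function. A minimal sketch of the requested fix, on a hypothetical stand-in class (not the actual Flink source function), assuming plain Java serialization:

```java
import java.io.Serializable;

// Hypothetical stand-in for the reviewed source function: any Serializable
// class shipped to task managers should pin serialVersionUID so that editing
// the class later does not break deserialization of previously serialized
// instances. Without the explicit field, the JVM derives an id from the class
// shape, which changes on almost any code change.
class BloatedStateSource implements Serializable {

	// The explicit version pin the reviewer asks for.
	private static final long serialVersionUID = 1L;

	private final int numStates;
	private final int numPartitionsPerState;

	BloatedStateSource(int numStates, int numPartitionsPerState) {
		this.numStates = numStates;
		this.numPartitionsPerState = numPartitionsPerState;
	}

	int getNumStates() {
		return numStates;
	}

	int getNumPartitionsPerState() {
		return numPartitionsPerState;
	}
}
```

With the pinned id, an instance serialized before a compatible class change (e.g. a new method) still deserializes afterwards.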
[GitHub] tillrohrmann commented on a change in pull request #6994: [FLINK-10720][tests] Add deployment end-to-end stress test with many …
tillrohrmann commented on a change in pull request #6994: [FLINK-10720][tests] Add deployment end-to-end stress test with many … URL: https://github.com/apache/flink/pull/6994#discussion_r230441106 ## File path: flink-end-to-end-tests/test-scripts/test_heavy_deployment.sh ## @@ -0,0 +1,50 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +source "$(dirname "$0")"/common.sh + +CHECKPOINT_DIR="file://$TEST_DATA_DIR/savepoint-e2e-test-chckpt-dir" + +TEST=flink-heavy-deployment-stress-test +TEST_PROGRAM_NAME=HeavyDeploymentStressTestProgram +TEST_PROGRAM_JAR=${END_TO_END_DIR}/$TEST/target/$TEST_PROGRAM_NAME.jar + +set_conf "taskmanager.heap.mb" "256" # 256Mb x 20 TMs = 5Gb total heap + +set_conf "taskmanager.memory.size" "8" # 8Mb +set_conf "taskmanager.network.memory.min" "8mb" +set_conf "taskmanager.network.memory.max" "8mb" +set_conf "taskmanager.memory.segment-size" "8kb" + +set_conf "taskmanager.numberOfTaskSlots" "11" + +start_cluster + +NUM_TMS=20 + +#20 x 13 slots to support a parallelism of 256 +echo "Start $NUM_TMS more task managers" +for i in `seq 1 $NUM_TMS`; do +$FLINK_DIR/bin/taskmanager.sh start +done + +$FLINK_DIR/bin/flink run -p 200 ${TEST_PROGRAM_JAR} \ +--environment.max_parallelism 1024 --environment.parallelism 200 \ Review comment: Why don't we use all slots? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] tillrohrmann commented on a change in pull request #6994: [FLINK-10720][tests] Add deployment end-to-end stress test with many …
tillrohrmann commented on a change in pull request #6994: [FLINK-10720][tests] Add deployment end-to-end stress test with many … URL: https://github.com/apache/flink/pull/6994#discussion_r230443486

## File path: flink-end-to-end-tests/flink-heavy-deployment-stress-test/pom.xml

```xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
		xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
		xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">

	<modelVersion>4.0.0</modelVersion>

	<parent>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-end-to-end-tests</artifactId>
		<version>1.7-SNAPSHOT</version>
		<relativePath>..</relativePath>
	</parent>

	<artifactId>flink-heavy-deployment-stress-test</artifactId>
	<name>flink-heavy-deployment-stress-test</name>
	<packaging>jar</packaging>

	<dependencies>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-core</artifactId>
			<version>${project.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-java</artifactId>
			<version>${project.version}</version>
		</dependency>
```

Review comment: I think we don't need these two dependencies.
[GitHub] tillrohrmann commented on a change in pull request #6994: [FLINK-10720][tests] Add deployment end-to-end stress test with many …
tillrohrmann commented on a change in pull request #6994: [FLINK-10720][tests] Add deployment end-to-end stress test with many … URL: https://github.com/apache/flink/pull/6994#discussion_r230440912 ## File path: flink-end-to-end-tests/test-scripts/test_heavy_deployment.sh ## @@ -0,0 +1,50 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +source "$(dirname "$0")"/common.sh + +CHECKPOINT_DIR="file://$TEST_DATA_DIR/savepoint-e2e-test-chckpt-dir" + +TEST=flink-heavy-deployment-stress-test +TEST_PROGRAM_NAME=HeavyDeploymentStressTestProgram +TEST_PROGRAM_JAR=${END_TO_END_DIR}/$TEST/target/$TEST_PROGRAM_NAME.jar + +set_conf "taskmanager.heap.mb" "256" # 256Mb x 20 TMs = 5Gb total heap + +set_conf "taskmanager.memory.size" "8" # 8Mb +set_conf "taskmanager.network.memory.min" "8mb" +set_conf "taskmanager.network.memory.max" "8mb" +set_conf "taskmanager.memory.segment-size" "8kb" + +set_conf "taskmanager.numberOfTaskSlots" "11" + +start_cluster Review comment: `start_cluster` already starts a single TM. Thus, we should have 21 TMs running. Is this correct? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] zentol commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test
zentol commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test URL: https://github.com/apache/flink/pull/7003#discussion_r230443206 ## File path: flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java ## @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.tests.util; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.UnmodifiableConfiguration; +import org.apache.flink.util.ExceptionUtils; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import org.junit.Assert; +import org.junit.rules.ExternalResource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.function.Function; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * A wrapper around a Flink distribution. 
+ */ +public final class FlinkDistribution extends ExternalResource { + + private static final Logger LOG = LoggerFactory.getLogger(FlinkDistribution.class); + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private final List filesToDelete = new ArrayList<>(4); + + private static final Path FLINK_CONF_YAML = Paths.get("flink-conf.yaml"); + private static final Path FLINK_CONF_YAML_BACKUP = Paths.get("flink-conf.yaml.bak"); + + private final Path opt; + private final Path lib; + private final Path conf; + private final Path log; + private final Path bin; + + private Configuration defaultConfig; + + public FlinkDistribution() { + final String distDirProperty = System.getProperty("distDir"); + if (distDirProperty == null) { + Assert.fail("The distDir property was not set. You can set it when running maven via -DdistDir= ."); + } + final Path flinkDir = Paths.get(distDirProperty); + bin = flinkDir.resolve("bin"); + opt = flinkDir.resolve("opt"); + lib = flinkDir.resolve("lib"); + conf = flinkDir.resolve("conf"); + log = flinkDir.resolve("log"); + } + + @Override + protected void before() throws IOException { + defaultConfig = new UnmodifiableConfiguration(GlobalConfiguration.loadConfiguration(conf.toAbsolutePath().toString())); + final Path originalConfig = conf.resolve(FLINK_CONF_YAML); + final Path backupConfig = conf.resolve(FLINK_CONF_YAML_BACKUP); + Files.copy(originalConfig, backupConfig); + filesToDelete.add(new AutoClosablePath(backupConfig)); + } + + @Override + protected void after() { + try { + stopFlinkCluster(); + } catch (IOException e) { + LOG.error("Failure while shutting down Flink cluster.", e); + } + + final Path originalConfig = conf.resolve(FLINK_CONF_YAML); + final Path backupConfig = conf.resolve(FLINK_CONF_YAML_BACKUP); + + try { + Files.move(backupConfig, originalConfig, StandardCopyOption.REPLACE_EXISTING); + } catch (IOException e) { + LOG.error("Failed to restore flink-conf.yaml", e); + } + +
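The `before()`/`after()` hooks above back up `flink-conf.yaml` before a test mutates it and move the backup back afterwards. That pattern can be sketched standalone (hypothetical class name, plain `java.nio.file`, no JUnit `ExternalResource`):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Minimal sketch of the backup/restore pattern used by FlinkDistribution:
// copy the config file aside before the test mutates it, then move the
// backup over the (possibly modified) original afterwards.
final class ConfigBackup {

	private final Path original;
	private final Path backup;

	ConfigBackup(Path confDir, String fileName) {
		this.original = confDir.resolve(fileName);
		this.backup = confDir.resolve(fileName + ".bak");
	}

	void before() throws IOException {
		// Fails if a stale backup already exists (Files.copy's default),
		// which surfaces a previous run that did not clean up.
		Files.copy(original, backup);
	}

	void after() throws IOException {
		// Restore the pristine config, overwriting whatever the test wrote;
		// the move also removes the backup file in one step.
		Files.move(backup, original, StandardCopyOption.REPLACE_EXISTING);
	}
}
```

Doing the restore with a single `Files.move(..., REPLACE_EXISTING)` keeps restore and cleanup atomic from the caller's point of view, which is why the class above (and the reviewed code) prefers it over copy-then-delete.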
[jira] [Commented] (FLINK-10633) End-to-end test: Prometheus reporter
[ https://issues.apache.org/jira/browse/FLINK-10633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673414#comment-16673414 ] ASF GitHub Bot commented on FLINK-10633: zentol commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test URL: https://github.com/apache/flink/pull/7003#discussion_r230443206 ## File path: flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java ## @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.tests.util; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.UnmodifiableConfiguration; +import org.apache.flink.util.ExceptionUtils; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import org.junit.Assert; +import org.junit.rules.ExternalResource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.function.Function; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * A wrapper around a Flink distribution. 
+ */ +public final class FlinkDistribution extends ExternalResource { + + private static final Logger LOG = LoggerFactory.getLogger(FlinkDistribution.class); + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private final List filesToDelete = new ArrayList<>(4); + + private static final Path FLINK_CONF_YAML = Paths.get("flink-conf.yaml"); + private static final Path FLINK_CONF_YAML_BACKUP = Paths.get("flink-conf.yaml.bak"); + + private final Path opt; + private final Path lib; + private final Path conf; + private final Path log; + private final Path bin; + + private Configuration defaultConfig; + + public FlinkDistribution() { + final String distDirProperty = System.getProperty("distDir"); + if (distDirProperty == null) { + Assert.fail("The distDir property was not set. You can set it when running maven via -DdistDir= ."); + } + final Path flinkDir = Paths.get(distDirProperty); + bin = flinkDir.resolve("bin"); + opt = flinkDir.resolve("opt"); + lib = flinkDir.resolve("lib"); + conf = flinkDir.resolve("conf"); + log = flinkDir.resolve("log"); + } + + @Override + protected void before() throws IOException { + defaultConfig = new UnmodifiableConfiguration(GlobalConfiguration.loadConfiguration(conf.toAbsolutePath().toString())); + final Path originalConfig = conf.resolve(FLINK_CONF_YAML); + final Path backupConfig = conf.resolve(FLINK_CONF_YAML_BACKUP); + Files.copy(originalConfig, backupConfig); + filesToDelete.add(new AutoClosablePath(backupConfig)); + } + + @Override + protected void after() { + try { + stopFlinkCluster(); + } catch (IOException e) { + LOG.error("Failure while shutting down Flink cluster.", e); + } + + final Path originalConfig = conf.resolve(FLINK_CONF_YAML); + final Path backupConfig = conf.resolve(FLINK_CONF_YAML_BACKUP); + + try {
[jira] [Commented] (FLINK-10732) force-shading 1.5.5 maven artifact was not released
[ https://issues.apache.org/jira/browse/FLINK-10732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673412#comment-16673412 ] Chesnay Schepler commented on FLINK-10732: -- True, we could also switch to {{flink-shaded-force-shading}} which we've already had for over a year, however this doesn't solve the immediate issue of an unreleased artifact of a release. > force-shading 1.5.5 maven artifact was not released > --- > > Key: FLINK-10732 > URL: https://issues.apache.org/jira/browse/FLINK-10732 > Project: Flink > Issue Type: Bug >Affects Versions: 1.5.5 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Fix For: 1.5.5 > > > The 1.5.5 maven artifact for {{force-shading}} was not deployed. We have to > investigate whether other artifacts are missing as well and how to remedy the > problem, i.e. re-deploy the missing artifacts (which may or may not require a > separate vote). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10356) Add sanity checks to SpillingAdaptiveSpanningRecordDeserializer
[ https://issues.apache.org/jira/browse/FLINK-10356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673405#comment-16673405 ] ASF GitHub Bot commented on FLINK-10356: NicoK commented on a change in pull request #6705: [FLINK-10356][network] add sanity checks to SpillingAdaptiveSpanningRecordDeserializer URL: https://github.com/apache/flink/pull/6705#discussion_r230440178 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java ## @@ -104,11 +120,236 @@ public void testHandleMixedLargeRecords() throws Exception { testSerializationRoundTrip(originalRecords, segmentSize); } + /** +* Non-spanning, deserialization reads one byte too many and succeeds. +*/ + @Test + public void testHandleDeserializingTooMuchNonSpanning1() throws Exception { + testHandleWrongDeserialization( + new StringValueDeserializingTooMuch("Test string"), + 32 * 1024, + null); + } + + /** +* Non-spanning, serialization length is 16 (including headers), deserialization reads one byte +* too many and succeeds. +*/ + @Test + public void testHandleDeserializingTooMuchNonSpanning2() throws Exception { + testHandleWrongDeserialization( + new StringValueDeserializingTooMuch("Test string"), Review comment: I'm not quite getting where you are going with this? Do you want it randomised? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add sanity checks to SpillingAdaptiveSpanningRecordDeserializer > --- > > Key: FLINK-10356 > URL: https://issues.apache.org/jira/browse/FLINK-10356 > Project: Flink > Issue Type: Sub-task > Components: Network >Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.5.3, 1.5.4, 1.6.0, 1.6.1, 1.7.0 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Major > Labels: pull-request-available > > {{SpillingAdaptiveSpanningRecordDeserializer}} doesn't have any consistency > checks for usage calls or serializers behaving properly, e.g. to read only as > many bytes as available/promised for that record. At least these checks > should be added: > # Check that buffers have not been read from yet before adding them (this is > an invariant {{SpillingAdaptiveSpanningRecordDeserializer}} works with and > from what I can see, it is followed now. > # Check that after deserialization, we actually consumed {{recordLength}} > bytes > ** If not, in the spanning deserializer, we currently simply skip the > remaining bytes. > ** But in the non-spanning deserializer, we currently continue from the > wrong offset. > # Protect against {{setNextBuffer}} being called before draining all > available records -- This message was sent by Atlassian JIRA (v7.6.3#76005)
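Check #2 in the issue description — verifying that deserialization consumed exactly `recordLength` bytes — can be sketched standalone. This is an illustrative reduction using plain `java.io` and hypothetical helper names, not Flink's actual `SpillingAdaptiveSpanningRecordDeserializer`:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;

// Sketch of the proposed sanity check: count bytes consumed during
// deserialization and fail if the count differs from the record length,
// instead of silently continuing from a wrong offset.
class DeserializationSanityCheck {

	static byte[] serialize(String value) throws IOException {
		ByteArrayOutputStream bos = new ByteArrayOutputStream();
		new DataOutputStream(bos).writeUTF(value);
		return bos.toByteArray();
	}

	/** Deserializes one record and verifies exactly {@code recordLength} bytes were consumed. */
	static String readChecked(byte[] buffer, int recordLength) throws IOException {
		CountingInputStream counting = new CountingInputStream(new ByteArrayInputStream(buffer));
		String value = new DataInputStream(counting).readUTF();
		if (counting.bytesRead != recordLength) {
			throw new IOException("Deserializer consumed " + counting.bytesRead
				+ " bytes, expected " + recordLength);
		}
		return value;
	}

	// Wraps an input stream and counts every byte handed to the deserializer.
	static final class CountingInputStream extends FilterInputStream {
		int bytesRead;

		CountingInputStream(InputStream in) {
			super(in);
		}

		@Override
		public int read() throws IOException {
			int b = super.read();
			if (b >= 0) {
				bytesRead++;
			}
			return b;
		}

		@Override
		public int read(byte[] b, int off, int len) throws IOException {
			int n = super.read(b, off, len);
			if (n > 0) {
				bytesRead += n;
			}
			return n;
		}
	}
}
```

A misbehaving serializer that reads one byte too many (or too few) then fails loudly at the consumption check rather than corrupting the offset of the next record, which is the failure mode the issue describes for the non-spanning path.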
[GitHub] NicoK commented on a change in pull request #6705: [FLINK-10356][network] add sanity checks to SpillingAdaptiveSpanningRecordDeserializer
NicoK commented on a change in pull request #6705: [FLINK-10356][network] add sanity checks to SpillingAdaptiveSpanningRecordDeserializer URL: https://github.com/apache/flink/pull/6705#discussion_r230440178 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java ## @@ -104,11 +120,236 @@ public void testHandleMixedLargeRecords() throws Exception { testSerializationRoundTrip(originalRecords, segmentSize); } + /** +* Non-spanning, deserialization reads one byte too many and succeeds. +*/ + @Test + public void testHandleDeserializingTooMuchNonSpanning1() throws Exception { + testHandleWrongDeserialization( + new StringValueDeserializingTooMuch("Test string"), + 32 * 1024, + null); + } + + /** +* Non-spanning, serialization length is 16 (including headers), deserialization reads one byte +* too many and succeeds. +*/ + @Test + public void testHandleDeserializingTooMuchNonSpanning2() throws Exception { + testHandleWrongDeserialization( + new StringValueDeserializingTooMuch("Test string"), Review comment: I'm not quite getting where you are going with this? Do you want it randomised? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10712) RestartPipelinedRegionStrategy does not restore state
[ https://issues.apache.org/jira/browse/FLINK-10712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673402#comment-16673402 ] ASF GitHub Bot commented on FLINK-10712: Myasuka opened a new pull request #7009: [FLINK-10712] Support to restore state when using RestartPipelinedRegionStrategy URL: https://github.com/apache/flink/pull/7009 ## What is the purpose of the change Currently, RestartPipelinedRegionStrategy does not perform any state restore. This is a big problem because all restored regions will be restarted with empty state. This PR adds support for restoring state when using RestartPipelinedRegionStrategy. ## Brief change log - Implemented a new `restoreLatestCheckpointedState` API for region-based failover in `CheckpointCoordinator`. - Reload checkpointed state when `FailoverRegion` calls its `restart` method. - `StateAssignmentOperation` can assign state to a given set of execution vertices. ## Verifying this change This change added tests and can be verified as follows: - Added unit tests for `FailoverRegion` to ensure the failover region calls the new `restoreLatestCheckpointedState` API in `CheckpointCoordinator`. - Added unit tests in `CheckpointCoordinatorTest` to ensure `CheckpointCoordinator` can restore with `RestartPipelinedRegionStrategy`. - Added unit tests in `CheckpointStateRestoreTest` to ensure `RestartPipelinedRegionStrategy` correctly restores state from a checkpoint to the task executions. - Added a new integration test, `RegionFailoverITCase`, to verify that state is restored properly when the job consists of multiple regions. - Refactored `StreamFaultToleranceTestBase` so that all subclass ITs can fail over with state using RestartPipelinedRegionStrategy. 
## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): don't know - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? yes - If yes, how is the feature documented? JavaDocs This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > RestartPipelinedRegionStrategy does not restore state > - > > Key: FLINK-10712 > URL: https://issues.apache.org/jira/browse/FLINK-10712 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.3.3, 1.4.2, 1.5.5, 1.6.2, 1.7.0 >Reporter: Stefan Richter >Assignee: Yun Tang >Priority: Critical > Labels: pull-request-available > Fix For: 1.7.0 > > > RestartPipelinedRegionStrategy does not perform any state restore. This is > big problem because all restored regions will be restarted with empty state. > We need to take checkpoints into account when restoring. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10712) RestartPipelinedRegionStrategy does not restore state
[ https://issues.apache.org/jira/browse/FLINK-10712?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-10712: --- Labels: pull-request-available (was: ) > RestartPipelinedRegionStrategy does not restore state > - > > Key: FLINK-10712 > URL: https://issues.apache.org/jira/browse/FLINK-10712 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.3.3, 1.4.2, 1.5.5, 1.6.2, 1.7.0 >Reporter: Stefan Richter >Assignee: Yun Tang >Priority: Critical > Labels: pull-request-available > Fix For: 1.7.0 > > > RestartPipelinedRegionStrategy does not perform any state restore. This is > big problem because all restored regions will be restarted with empty state. > We need to take checkpoints into account when restoring. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] Myasuka opened a new pull request #7009: [FLINK-10712] Support to restore state when using RestartPipelinedRegionStrategy
Myasuka opened a new pull request #7009: [FLINK-10712] Support to restore state when using RestartPipelinedRegionStrategy URL: https://github.com/apache/flink/pull/7009 ## What is the purpose of the change Currently, RestartPipelinedRegionStrategy does not perform any state restore. This is a big problem because all restored regions will be restarted with empty state. This PR adds support for restoring state when using RestartPipelinedRegionStrategy. ## Brief change log - Implemented a new `restoreLatestCheckpointedState` API for region-based failover in `CheckpointCoordinator`. - Reload checkpointed state when `FailoverRegion` calls its `restart` method. - `StateAssignmentOperation` can assign state to a given set of execution vertices. ## Verifying this change This change added tests and can be verified as follows: - Added unit tests for `FailoverRegion` to ensure the failover region calls the new `restoreLatestCheckpointedState` API in `CheckpointCoordinator`. - Added unit tests in `CheckpointCoordinatorTest` to ensure `CheckpointCoordinator` can restore with `RestartPipelinedRegionStrategy`. - Added unit tests in `CheckpointStateRestoreTest` to ensure `RestartPipelinedRegionStrategy` correctly restores state from a checkpoint to the task executions. - Added a new integration test, `RegionFailoverITCase`, to verify that state is restored properly when the job consists of multiple regions. - Refactored `StreamFaultToleranceTestBase` so that all subclass ITs can fail over with state using RestartPipelinedRegionStrategy. 
## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): don't know - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? yes - If yes, how is the feature documented? JavaDocs This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10633) End-to-end test: Prometheus reporter
[ https://issues.apache.org/jira/browse/FLINK-10633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673398#comment-16673398 ] ASF GitHub Bot commented on FLINK-10633: kl0u commented on issue #7003: [FLINK-10633][prometheus] Add E2E test URL: https://github.com/apache/flink/pull/7003#issuecomment-435440138 And one more, a more in-depth discussion and a design doc could be very helpful to correctly design a testing framework for e2e testing. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > End-to-end test: Prometheus reporter > > > Key: FLINK-10633 > URL: https://issues.apache.org/jira/browse/FLINK-10633 > Project: Flink > Issue Type: Sub-task > Components: E2E Tests >Affects Versions: 1.7.0 >Reporter: Till Rohrmann >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > Add an end-to-end test using the {{PrometheusReporter}} to verify that all > metrics are properly reported. Additionally verify that the newly introduce > RocksDB metrics are accessible (see FLINK-10423). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10633) End-to-end test: Prometheus reporter
[ https://issues.apache.org/jira/browse/FLINK-10633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673391#comment-16673391 ] ASF GitHub Bot commented on FLINK-10633: kl0u edited a comment on issue #7003: [FLINK-10633][prometheus] Add E2E test URL: https://github.com/apache/flink/pull/7003#issuecomment-435437266 Btw, three more notes: 1) there should be a discussion about what a testing framework should look like in the future, so that we are all on board 2) it would be nice to have more javadocs 3) I would also suggest (slowly but steadily) moving away from `sed`, `wget`, and the like (in future JIRAs).
[jira] [Commented] (FLINK-10729) Create a Hive connector for Hive data access in Flink
[ https://issues.apache.org/jira/browse/FLINK-10729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673382#comment-16673382 ] Xuefu Zhang commented on FLINK-10729: - @li Thanks for your interest. Yes, metadata integration is in progress and tracked by FLINK-10744. This one covers the data aspect. We can only declare the integration complete after both are in place. Your contribution is certainly welcome. BTW, I'm currently coming up with a rough design and will share it soon. I can imagine that a prototype will be needed. > Create a Hive connector for Hive data access in Flink > - > > Key: FLINK-10729 > URL: https://issues.apache.org/jira/browse/FLINK-10729 > Project: Flink > Issue Type: New Feature > Components: Table API SQL >Affects Versions: 1.6.2 >Reporter: Xuefu Zhang >Assignee: Xuefu Zhang >Priority: Major > > As part of the Flink-Hive integration effort, it's important for Flink to access > (read/write) Hive data, which is the responsibility of the Hive connector. While > there is an HCatalog data connector in the code base, it's incomplete (i.e. > missing all connector-related classes such as validators, etc.). Further, the > HCatalog interface has many limitations, such as accessing only a subset of Hive > data and supporting only a subset of Hive data types. In addition, it's not > actively maintained; in fact, it's now only a sub-project in Hive. > Therefore, we propose a complete connector set for Hive tables, not via > HCatalog, but via the direct Hive interface. The HCatalog connector will be > deprecated. > Please note that the connector for Hive metadata is already covered in other > JIRAs, as {{HiveExternalCatalog}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10633) End-to-end test: Prometheus reporter
[ https://issues.apache.org/jira/browse/FLINK-10633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673379#comment-16673379 ] ASF GitHub Bot commented on FLINK-10633: kl0u commented on issue #7003: [FLINK-10633][prometheus] Add E2E test URL: https://github.com/apache/flink/pull/7003#issuecomment-435437266 Btw, two more general notes: 1) it would be nice to have more javadocs 2) I would also suggest (slowly but steadily) moving away from `sed`, `wget`, and the like (in future JIRAs).
[jira] [Commented] (FLINK-10633) End-to-end test: Prometheus reporter
[ https://issues.apache.org/jira/browse/FLINK-10633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673343#comment-16673343 ] ASF GitHub Bot commented on FLINK-10633: kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test URL: https://github.com/apache/flink/pull/7003#discussion_r230429154 ## File path: flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java ## @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.tests.util; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.UnmodifiableConfiguration; +import org.apache.flink.util.ExceptionUtils; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import org.junit.Assert; +import org.junit.rules.ExternalResource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.function.Function; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * A wrapper around a Flink distribution. 
+ */ +public final class FlinkDistribution extends ExternalResource { + + private static final Logger LOG = LoggerFactory.getLogger(FlinkDistribution.class); + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private final List filesToDelete = new ArrayList<>(4); + + private static final Path FLINK_CONF_YAML = Paths.get("flink-conf.yaml"); + private static final Path FLINK_CONF_YAML_BACKUP = Paths.get("flink-conf.yaml.bak"); + + private final Path opt; + private final Path lib; + private final Path conf; + private final Path log; + private final Path bin; + + private Configuration defaultConfig; + + public FlinkDistribution() { + final String distDirProperty = System.getProperty("distDir"); + if (distDirProperty == null) { + Assert.fail("The distDir property was not set. You can set it when running maven via -DdistDir= ."); + } + final Path flinkDir = Paths.get(distDirProperty); + bin = flinkDir.resolve("bin"); + opt = flinkDir.resolve("opt"); + lib = flinkDir.resolve("lib"); + conf = flinkDir.resolve("conf"); + log = flinkDir.resolve("log"); + } + + @Override + protected void before() throws IOException { + defaultConfig = new UnmodifiableConfiguration(GlobalConfiguration.loadConfiguration(conf.toAbsolutePath().toString())); + final Path originalConfig = conf.resolve(FLINK_CONF_YAML); + final Path backupConfig = conf.resolve(FLINK_CONF_YAML_BACKUP); + Files.copy(originalConfig, backupConfig); + filesToDelete.add(new AutoClosablePath(backupConfig)); + } + + @Override + protected void after() { + try { + stopFlinkCluster(); + } catch (IOException e) { + LOG.error("Failure while shutting down Flink cluster.", e); + } + + final Path originalConfig = conf.resolve(FLINK_CONF_YAML); + final Path backupConfig = conf.resolve(FLINK_CONF_YAML_BACKUP); + + try { +
[jira] [Commented] (FLINK-10455) Potential Kafka producer leak in case of failures
[ https://issues.apache.org/jira/browse/FLINK-10455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673336#comment-16673336 ] ASF GitHub Bot commented on FLINK-10455: pnowojski commented on issue #6989: [FLINK-10455][Kafka Tx] Close transactional producers in case of failure and termination URL: https://github.com/apache/flink/pull/6989#issuecomment-435429850 Regarding your question @azagrebin, I think my bug fix for https://issues.apache.org/jira/browse/FLINK-8086 (which added "ignore ProducerFencedException") may indeed have turned @GJL's timeout code:

```
} catch (final Exception e) {
    final long elapsedTime = clock.millis() - transactionHolder.transactionStartTime;
    if (ignoreFailuresAfterTransactionTimeout && elapsedTime > transactionTimeout) {
        LOG.error("Error while committing transaction {}. " +
            "Transaction has been open for longer than the transaction timeout ({})." +
            "Commit will not be attempted again. Data loss might have occurred.",
            transactionHolder.handle, transactionTimeout, e);
    } else {
        throw e;
    }
}
```

into dead code at the moment. But I think my bug fix is more important (from my commit message):

> [FLINK-8086][kafka] Ignore ProducerFencedException
>
> During recovery, a ProducerFencedException can happen if we restore twice from the same checkpoint or if we restore from an old savepoint. In both cases the transactional.ids that we want to recoverAndCommit have already been committed and reused. Reusing means that they will be known by Kafka's brokers under a newer producerId/epochId, which will result in a ProducerFencedException if we try to commit some old (and already committed) transaction again.
>
> Ignoring this exception might hide some bugs/issues, because instead of failing we might have a semi-silent (with a warning) data loss.

I think the last sentence answers your question.

> Potential Kafka producer leak in case of failures > - > > Key: FLINK-10455 > URL: https://issues.apache.org/jira/browse/FLINK-10455 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.5.2 >Reporter: Nico Kruber >Assignee: Andrey Zagrebin >Priority: Major > Labels: pull-request-available > > If the Kafka brokers' timeout is too low for our checkpoint interval [1], we > may get a {{ProducerFencedException}}. Documentation around > {{ProducerFencedException}} explicitly states that we should close the > producer after encountering it. > By looking at the code, it doesn't seem like this is actually done in > {{FlinkKafkaProducer011}}. Also, in case one transaction's commit in > {{TwoPhaseCommitSinkFunction#notifyCheckpointComplete}} fails with an > exception, we don't clean up (nor try to commit) any other transaction. > -> From what I see, {{TwoPhaseCommitSinkFunction#notifyCheckpointComplete}} > simply iterates over the {{pendingCommitTransactions}}, which is not touched > during {{close()}}. > Now if we restart the failing job on the same Flink cluster, any resources > from the previous attempt will still linger around. > [1] > https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/connectors/kafka.html#kafka-011 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
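The leak described in the issue — a fenced producer that is never closed, and sibling transactions that are never cleaned up when one commit throws — can be modeled with a small self-contained sketch. Here `TxProducer` and `IllegalStateException` stand in for Kafka's producer and `ProducerFencedException`; all names are illustrative, not Flink's actual internals:

```java
import java.util.ArrayList;
import java.util.List;

/** Stand-in for a transactional Kafka producer. */
interface TxProducer {
    void commitTransaction(); // may throw IllegalStateException ("fenced")
    void close();
}

/** Minimal model of the fix: commit every pending transaction, and close each
 *  producer whether or not its commit succeeded. */
final class TwoPhaseCommitSketch {
    static List<String> commitAll(List<TxProducer> pending) {
        List<String> failures = new ArrayList<>();
        for (TxProducer p : pending) {
            try {
                p.commitTransaction();
            } catch (IllegalStateException fenced) {
                failures.add(fenced.getMessage());
            } finally {
                p.close(); // the leak in the report: this close was missing on failure
            }
        }
        return failures;
    }
}
```

The `finally` block is the point: closing must happen regardless of commit outcome, so a job restarted on the same cluster does not inherit lingering producers from the previous attempt.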
[GitHub] pnowojski commented on issue #6989: [FLINK-10455][Kafka Tx] Close transactional producers in case of failure and termination
pnowojski commented on issue #6989: [FLINK-10455][Kafka Tx] Close transactional producers in case of failure and termination URL: https://github.com/apache/flink/pull/6989#issuecomment-435429850 Regarding your question @azagrebin, I think my bug fix for https://issues.apache.org/jira/browse/FLINK-8086 (which added "ignore ProducerFencedException") indeed might turned the @GJL's timeouts code: ``` } catch (final Exception e) { final long elapsedTime = clock.millis() - transactionHolder.transactionStartTime; if (ignoreFailuresAfterTransactionTimeout && elapsedTime > transactionTimeout) { LOG.error("Error while committing transaction {}. " + "Transaction has been open for longer than the transaction timeout ({})." + "Commit will not be attempted again. Data loss might have occurred.", transactionHolder.handle, transactionTimeout, e); } else { throw e; } } ``` into a dead code at the moment. But I think my bug fix is more important (from my commit message): > [FLINK-8086][kafka] Ignore ProducerFencedException > > during recovery ProducerFencedException can happen if we restore twice from the same checkpoint or if we restore from an old savepoint. In both cases transactional.ids that we want to recoverAndCommit have been already committed and reused. Reusing mean that they will be known by Kafka's brokers under newer producerId/epochId, which will result in ProducerFencedException if we try to commit again some old (and already committed) transaction. > > Ignoring this exception might hide some bugs/issues, because instead of failing we might have a semi silent (with a warning) data loss. I think the last sentence answers your question. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10633) End-to-end test: Prometheus reporter
[ https://issues.apache.org/jira/browse/FLINK-10633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673329#comment-16673329 ] ASF GitHub Bot commented on FLINK-10633: zentol commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test URL: https://github.com/apache/flink/pull/7003#discussion_r230425448 ## File path: flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java ## @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.tests.util; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.UnmodifiableConfiguration; +import org.apache.flink.util.ExceptionUtils; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import org.junit.Assert; +import org.junit.rules.ExternalResource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.function.Function; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * A wrapper around a Flink distribution. 
+ */ +public final class FlinkDistribution extends ExternalResource { + + private static final Logger LOG = LoggerFactory.getLogger(FlinkDistribution.class); + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private final List filesToDelete = new ArrayList<>(4); + + private static final Path FLINK_CONF_YAML = Paths.get("flink-conf.yaml"); + private static final Path FLINK_CONF_YAML_BACKUP = Paths.get("flink-conf.yaml.bak"); + + private final Path opt; + private final Path lib; + private final Path conf; + private final Path log; + private final Path bin; + + private Configuration defaultConfig; + + public FlinkDistribution() { + final String distDirProperty = System.getProperty("distDir"); + if (distDirProperty == null) { + Assert.fail("The distDir property was not set. You can set it when running maven via -DdistDir= ."); + } + final Path flinkDir = Paths.get(distDirProperty); + bin = flinkDir.resolve("bin"); + opt = flinkDir.resolve("opt"); + lib = flinkDir.resolve("lib"); + conf = flinkDir.resolve("conf"); + log = flinkDir.resolve("log"); + } + + @Override + protected void before() throws IOException { + defaultConfig = new UnmodifiableConfiguration(GlobalConfiguration.loadConfiguration(conf.toAbsolutePath().toString())); + final Path originalConfig = conf.resolve(FLINK_CONF_YAML); + final Path backupConfig = conf.resolve(FLINK_CONF_YAML_BACKUP); + Files.copy(originalConfig, backupConfig); + filesToDelete.add(new AutoClosablePath(backupConfig)); + } + + @Override + protected void after() { + try { + stopFlinkCluster(); + } catch (IOException e) { + LOG.error("Failure while shutting down Flink cluster.", e); + } + + final Path originalConfig = conf.resolve(FLINK_CONF_YAML); + final Path backupConfig = conf.resolve(FLINK_CONF_YAML_BACKUP); + + try {
[jira] [Commented] (FLINK-10638) Invalid table scan resolution for temporal join queries
[ https://issues.apache.org/jira/browse/FLINK-10638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673297#comment-16673297 ] ASF GitHub Bot commented on FLINK-10638: pnowojski commented on a change in pull request #6981: [FLINK-10638][table] Invalid table scan resolution for temporal join queries URL: https://github.com/apache/flink/pull/6981#discussion_r230420946 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/TemporalJoinITCase.scala ## @@ -142,11 +140,16 @@ class TemporalJoinITCase extends StreamingWithStateTestBase { tEnv.registerTable("Orders", orders) tEnv.registerTable("RatesHistory", ratesHistory) +tEnv.registerTable("FilteredRatesHistory", tEnv.scan("RatesHistory").filter('rate > 110L)) tEnv.registerFunction( "Rates", - ratesHistory.createTemporalTableFunction('rowtime, 'currency)) + tEnv.scan("FilteredRatesHistory").createTemporalTableFunction('rowtime, 'currency)) +tEnv.registerTable("TemporalJoinResult", tEnv.sqlQuery(sqlQuery)) -val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row] +// Scan from registered table to test for interplay between +// LogicalCorrelateToTemporalTableJoinRule and TableScanRule +val result = tEnv.scan("TemporalJoinResult").toAppendStream[Row] +System.out.println(tEnv.explain(tEnv.sqlQuery(sqlQuery))) Review comment: ops This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Invalid table scan resolution for temporal join queries > --- > > Key: FLINK-10638 > URL: https://issues.apache.org/jira/browse/FLINK-10638 > Project: Flink > Issue Type: Bug > Components: Table API SQL >Reporter: Timo Walther >Assignee: Piotr Nowojski >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > Registered tables that contain a temporal join are not properly resolved when > performing a table scan. > A planning error occurs when registering a table with a temporal join and > reading from it again. > {code} > LogicalProject(amount=[*($0, $4)]) > LogicalFilter(condition=[=($3, $1)]) > LogicalCorrelate(correlation=[$cor0], joinType=[inner], > requiredColumns=[{2}]) > LogicalTableScan(table=[[_DataStreamTable_0]]) > > LogicalTableFunctionScan(invocation=[Rates(CAST($cor0.rowtime):TIMESTAMP(3) > NOT NULL)], rowType=[RecordType(VARCHAR(65536) currency, BIGINT rate, TIME > ATTRIBUTE(ROWTIME) rowtime)], elementType=[class [Ljava.lang.Object;]) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10356) Add sanity checks to SpillingAdaptiveSpanningRecordDeserializer
[ https://issues.apache.org/jira/browse/FLINK-10356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673295#comment-16673295 ] ASF GitHub Bot commented on FLINK-10356: NicoK commented on a change in pull request #6705: [FLINK-10356][network] add sanity checks to SpillingAdaptiveSpanningRecordDeserializer URL: https://github.com/apache/flink/pull/6705#discussion_r230420404 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java ## @@ -104,11 +120,236 @@ public void testHandleMixedLargeRecords() throws Exception { testSerializationRoundTrip(originalRecords, segmentSize); } + /** +* Non-spanning, deserialization reads one byte too many and succeeds. +*/ + @Test + public void testHandleDeserializingTooMuchNonSpanning1() throws Exception { + testHandleWrongDeserialization( + new StringValueDeserializingTooMuch("Test string"), + 32 * 1024, + null); + } + + /** +* Non-spanning, serialization length is 16 (including headers), deserialization reads one byte +* too many and succeeds. +*/ + @Test + public void testHandleDeserializingTooMuchNonSpanning2() throws Exception { + testHandleWrongDeserialization( + new StringValueDeserializingTooMuch("Test string"), + 17, Review comment: yes, that would be really nice but is not available out of the box since `StringValue#getBinaryLength` only returns `-1` and additionally, we do add one more length header from the `SpanningRecordSerializer` - I could evaluate the actual value through serializing it (maybe only once, statically) though and keep using that or so... This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add sanity checks to SpillingAdaptiveSpanningRecordDeserializer > --- > > Key: FLINK-10356 > URL: https://issues.apache.org/jira/browse/FLINK-10356 > Project: Flink > Issue Type: Sub-task > Components: Network >Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.5.3, 1.5.4, 1.6.0, 1.6.1, 1.7.0 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Major > Labels: pull-request-available > > {{SpillingAdaptiveSpanningRecordDeserializer}} doesn't have any consistency > checks for usage calls or serializers behaving properly, e.g. to read only as > many bytes as available/promised for that record. At least these checks > should be added: > # Check that buffers have not been read from yet before adding them (this is > an invariant {{SpillingAdaptiveSpanningRecordDeserializer}} works with and > from what I can see, it is followed now. > # Check that after deserialization, we actually consumed {{recordLength}} > bytes > ** If not, in the spanning deserializer, we currently simply skip the > remaining bytes. > ** But in the non-spanning deserializer, we currently continue from the > wrong offset. > # Protect against {{setNextBuffer}} being called before draining all > available records -- This message was sent by Atlassian JIRA (v7.6.3#76005)
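Check #2 in the issue description (after deserialization, exactly {{recordLength}} bytes must have been consumed) can be illustrated with a byte-counting wrapper stream. The sketch below is a hedged, self-contained illustration of that consistency check; the class and method names are made up for the example and are not Flink's actual SpillingAdaptiveSpanningRecordDeserializer code:

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

/** Illustrative sketch of sanity check #2: the deserializer must consume exactly recordLength bytes. */
public class DeserializationSanityCheck {

    /** Counts every byte read, so the caller can compare against the record's length header. */
    static final class CountingInputStream extends FilterInputStream {
        long bytesRead;

        CountingInputStream(InputStream in) {
            super(in);
        }

        @Override
        public int read() throws IOException {
            int b = super.read();
            if (b >= 0) {
                bytesRead++;
            }
            return b;
        }

        @Override
        public int read(byte[] buf, int off, int len) throws IOException {
            int n = super.read(buf, off, len);
            if (n > 0) {
                bytesRead += n;
            }
            return n;
        }
    }

    /**
     * Simulates deserializing one record that actually reads {@code consumed} bytes out of a
     * buffer whose length header promised {@code recordLength} bytes, then applies the check.
     */
    public static long deserializeAndCheck(byte[] buffer, int recordLength, int consumed) {
        CountingInputStream counting = new CountingInputStream(new ByteArrayInputStream(buffer));
        try {
            // stand-in for the actual record deserializer
            new DataInputStream(counting).readFully(new byte[consumed]);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        if (counting.bytesRead != recordLength) {
            throw new IllegalStateException("Deserializer consumed " + counting.bytesRead
                    + " bytes, but the record length header promised " + recordLength);
        }
        return counting.bytesRead;
    }
}
```

A misbehaving serializer (one that reads too few or too many bytes) would trip the IllegalStateException instead of silently continuing from a wrong offset, which is the failure mode the issue describes for the non-spanning case.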
[jira] [Commented] (FLINK-10633) End-to-end test: Prometheus reporter
[ https://issues.apache.org/jira/browse/FLINK-10633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673294#comment-16673294 ] ASF GitHub Bot commented on FLINK-10633: zentol commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test URL: https://github.com/apache/flink/pull/7003#discussion_r230420322 ## File path: flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/src/test/java/org/apache/flink/metrics/prometheus/tests/PrometheusReporterEndToEndITCase.java ## @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.metrics.prometheus.tests; + +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.prometheus.PrometheusReporter; +import org.apache.flink.tests.util.AutoClosableProcess; +import org.apache.flink.tests.util.CommandLineWrapper; +import org.apache.flink.tests.util.FlinkDistribution; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.TestLogger; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Duration; +import java.util.List; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import static org.apache.flink.tests.util.AutoClosableProcess.runBlocking; +import static org.apache.flink.tests.util.AutoClosableProcess.runNonBlocking; + +/** + * End-to-end test for the PrometheusReporter. 
+ */ +public class PrometheusReporterEndToEndITCase extends TestLogger { + + private static final Logger LOG = LoggerFactory.getLogger(PrometheusReporterEndToEndITCase.class); + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private static final String PROMETHEUS_FILE_NAME = "prometheus-2.4.3.linux-amd64"; + + private static final Pattern LOG_REPORTER_PORT_PATTERN = Pattern.compile(".*Started PrometheusReporter HTTP server on port ([0-9]+).*"); + + @Rule + public final FlinkDistribution dist = new FlinkDistribution(); + + @Rule + public final TemporaryFolder tmp = new TemporaryFolder(); + + @Test + public void testReporter() throws Exception { + LOG.info("starting test"); + dist.copyOptJarsToLib("flink-metrics-prometheus"); + + final Configuration config = new Configuration(); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "prom." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, PrometheusReporter.class.getCanonicalName()); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "prom.port", "9000-9100"); + + dist.appendConfiguration(config); + + final Path tmpPrometheusDir = tmp.newFolder().toPath().resolve("prometheus"); + final Path prometheusArchive = tmpPrometheusDir.resolve(PROMETHEUS_FILE_NAME + ".tar.gz"); + final Path prometheusBinDir = tmpPrometheusDir.resolve(PROMETHEUS_FILE_NAME); + final Path prometheusConfig = prometheusBinDir.resolve("prometheus.yml"); + final Path prometheusBinary = prometheusBinDir.resolve("prometheus"); + Files.createDirectory(tmpPrometheusDir); + + runBlocking( + "Download of Prometheus", + Duration.ofMinutes(5), + CommandLineWrapper + .wget("https://github.com/prometheus/prometheus/releases/download/v2.4.3/" + prometheusArchive.getFileName()) Review comment: hmm...maybe but I doubt we'll change the version anytime soon. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
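Because `prom.port` is configured as a range (`9000-9100`), the test cannot know the reporter's actual port up front and has to grep it out of the JobManager log with the quoted `LOG_REPORTER_PORT_PATTERN`. A minimal standalone sketch of that extraction step (the class name here is illustrative, not part of the PR):

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Sketch: extract the PrometheusReporter port advertised in a Flink log line. */
public class ReporterPortExtractor {

    // Same pattern as quoted in the test above.
    private static final Pattern LOG_REPORTER_PORT_PATTERN =
            Pattern.compile(".*Started PrometheusReporter HTTP server on port ([0-9]+).*");

    /** Returns the port found in the given log line, or empty if the line does not match. */
    public static Optional<Integer> extractPort(String logLine) {
        Matcher matcher = LOG_REPORTER_PORT_PATTERN.matcher(logLine);
        if (matcher.matches()) {
            return Optional.of(Integer.parseInt(matcher.group(1)));
        }
        return Optional.empty();
    }
}
```

The E2E test would apply this to each line of the JobManager log and use the first match to build the Prometheus scrape target.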
[jira] [Commented] (FLINK-10633) End-to-end test: Prometheus reporter
[ https://issues.apache.org/jira/browse/FLINK-10633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673292#comment-16673292 ] ASF GitHub Bot commented on FLINK-10633: zentol commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test URL: https://github.com/apache/flink/pull/7003#discussion_r230420018 ## File path: flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java ## @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.tests.util; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.UnmodifiableConfiguration; +import org.apache.flink.util.ExceptionUtils; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import org.junit.Assert; +import org.junit.rules.ExternalResource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.function.Function; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * A wrapper around a Flink distribution. 
+ */ +public final class FlinkDistribution extends ExternalResource { + + private static final Logger LOG = LoggerFactory.getLogger(FlinkDistribution.class); + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private final List filesToDelete = new ArrayList<>(4); + + private static final Path FLINK_CONF_YAML = Paths.get("flink-conf.yaml"); + private static final Path FLINK_CONF_YAML_BACKUP = Paths.get("flink-conf.yaml.bak"); + + private final Path opt; + private final Path lib; + private final Path conf; + private final Path log; + private final Path bin; + + private Configuration defaultConfig; + + public FlinkDistribution() { + final String distDirProperty = System.getProperty("distDir"); + if (distDirProperty == null) { + Assert.fail("The distDir property was not set. You can set it when running maven via -DdistDir= ."); + } + final Path flinkDir = Paths.get(distDirProperty); + bin = flinkDir.resolve("bin"); + opt = flinkDir.resolve("opt"); + lib = flinkDir.resolve("lib"); + conf = flinkDir.resolve("conf"); + log = flinkDir.resolve("log"); + } + + @Override + protected void before() throws IOException { + defaultConfig = new UnmodifiableConfiguration(GlobalConfiguration.loadConfiguration(conf.toAbsolutePath().toString())); + final Path originalConfig = conf.resolve(FLINK_CONF_YAML); + final Path backupConfig = conf.resolve(FLINK_CONF_YAML_BACKUP); + Files.copy(originalConfig, backupConfig); + filesToDelete.add(new AutoClosablePath(backupConfig)); + } + + @Override + protected void after() { + try { Review comment: This is purely for safety and as such I'd like to keep it without any condition. I can see the point, and maybe we should return a `Closable` in `startCluster` to allow better management in the test. `FlinkDistribution` does not represent a flink cluster but `flink-dist`; it thus doesn't make sense to start a cluster in `before`.
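The before/after lifecycle under discussion (back up flink-conf.yaml before the test mutates it, restore it afterwards even if the test failed) reduces to a small copy/move pattern over java.nio.file. A self-contained sketch under that assumption; the class name and the `demo` round trip are illustrative, not part of `FlinkDistribution`:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/** Sketch of the backup/restore pattern used around flink-conf.yaml in the test rule. */
public class ConfigBackup {

    /** Copies the config aside before the test mutates it (the "before" step). */
    public static Path backup(Path originalConfig) throws IOException {
        Path backupConfig = originalConfig.resolveSibling(originalConfig.getFileName() + ".bak");
        Files.copy(originalConfig, backupConfig, StandardCopyOption.REPLACE_EXISTING);
        return backupConfig;
    }

    /** Moves the backup over the (possibly modified) original (the "after" step). */
    public static void restore(Path backupConfig, Path originalConfig) throws IOException {
        Files.move(backupConfig, originalConfig, StandardCopyOption.REPLACE_EXISTING);
    }

    /** Self-contained round trip: back up, clobber, restore; returns the restored content. */
    public static String demo() {
        try {
            Path dir = Files.createTempDirectory("conf-backup-demo");
            Path conf = dir.resolve("flink-conf.yaml");
            Files.write(conf, "taskmanager.numberOfTaskSlots: 1".getBytes(StandardCharsets.UTF_8));
            Path bak = backup(conf);
            // the test appends its own configuration, clobbering the file...
            Files.write(conf, "metrics.reporter.prom.port: 9000-9100".getBytes(StandardCharsets.UTF_8));
            // ...and the "after" step puts the pristine config back
            restore(bak, conf);
            return new String(Files.readAllBytes(conf), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because `restore` uses `Files.move`, the backup file disappears after a successful restore, so an unconditional restore in `after()` is idempotent in practice, which matches the "purely for safety" argument in the review comment.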
[jira] [Commented] (FLINK-10633) End-to-end test: Prometheus reporter
[ https://issues.apache.org/jira/browse/FLINK-10633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673284#comment-16673284 ] ASF GitHub Bot commented on FLINK-10633: zentol commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test URL: https://github.com/apache/flink/pull/7003#discussion_r230418196 ## File path: flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java ## @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.tests.util; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.UnmodifiableConfiguration; +import org.apache.flink.util.ExceptionUtils; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import org.junit.Assert; +import org.junit.rules.ExternalResource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.function.Function; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * A wrapper around a Flink distribution. 
+ */ +public final class FlinkDistribution extends ExternalResource { + + private static final Logger LOG = LoggerFactory.getLogger(FlinkDistribution.class); + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private final List filesToDelete = new ArrayList<>(4); + + private static final Path FLINK_CONF_YAML = Paths.get("flink-conf.yaml"); + private static final Path FLINK_CONF_YAML_BACKUP = Paths.get("flink-conf.yaml.bak"); + + private final Path opt; + private final Path lib; + private final Path conf; + private final Path log; + private final Path bin; + + private Configuration defaultConfig; + + public FlinkDistribution() { + final String distDirProperty = System.getProperty("distDir"); + if (distDirProperty == null) { + Assert.fail("The distDir property was not set. You can set it when running maven via -DdistDir= ."); + } + final Path flinkDir = Paths.get(distDirProperty); + bin = flinkDir.resolve("bin"); + opt = flinkDir.resolve("opt"); + lib = flinkDir.resolve("lib"); + conf = flinkDir.resolve("conf"); + log = flinkDir.resolve("log"); + } + + @Override + protected void before() throws IOException { + defaultConfig = new UnmodifiableConfiguration(GlobalConfiguration.loadConfiguration(conf.toAbsolutePath().toString())); + final Path originalConfig = conf.resolve(FLINK_CONF_YAML); + final Path backupConfig = conf.resolve(FLINK_CONF_YAML_BACKUP); + Files.copy(originalConfig, backupConfig); + filesToDelete.add(new AutoClosablePath(backupConfig)); + } + + @Override + protected void after() { + try { + stopFlinkCluster(); + } catch (IOException e) { + LOG.error("Failure while shutting down Flink cluster.", e); + } + + final Path originalConfig = conf.resolve(FLINK_CONF_YAML); + final Path backupConfig = conf.resolve(FLINK_CONF_YAML_BACKUP); + + try {
[GitHub] zentol commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test
zentol commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test
URL: https://github.com/apache/flink/pull/7003#discussion_r230418196

## File path: flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java

## @@ -0,0 +1,219 @@
```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.tests.util;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.util.ExceptionUtils;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import org.junit.Assert;
import org.junit.rules.ExternalResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * A wrapper around a Flink distribution.
 */
public final class FlinkDistribution extends ExternalResource {

    private static final Logger LOG = LoggerFactory.getLogger(FlinkDistribution.class);

    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    private final List filesToDelete = new ArrayList<>(4);

    private static final Path FLINK_CONF_YAML = Paths.get("flink-conf.yaml");
    private static final Path FLINK_CONF_YAML_BACKUP = Paths.get("flink-conf.yaml.bak");

    private final Path opt;
    private final Path lib;
    private final Path conf;
    private final Path log;
    private final Path bin;

    private Configuration defaultConfig;

    public FlinkDistribution() {
        final String distDirProperty = System.getProperty("distDir");
        if (distDirProperty == null) {
            Assert.fail("The distDir property was not set. You can set it when running maven via -DdistDir= .");
        }
        final Path flinkDir = Paths.get(distDirProperty);
        bin = flinkDir.resolve("bin");
        opt = flinkDir.resolve("opt");
        lib = flinkDir.resolve("lib");
        conf = flinkDir.resolve("conf");
        log = flinkDir.resolve("log");
    }

    @Override
    protected void before() throws IOException {
        defaultConfig = new UnmodifiableConfiguration(
            GlobalConfiguration.loadConfiguration(conf.toAbsolutePath().toString()));
        final Path originalConfig = conf.resolve(FLINK_CONF_YAML);
        final Path backupConfig = conf.resolve(FLINK_CONF_YAML_BACKUP);
        Files.copy(originalConfig, backupConfig);
        filesToDelete.add(new AutoClosablePath(backupConfig));
    }

    @Override
    protected void after() {
        try {
            stopFlinkCluster();
        } catch (IOException e) {
            LOG.error("Failure while shutting down Flink cluster.", e);
        }

        final Path originalConfig = conf.resolve(FLINK_CONF_YAML);
        final Path backupConfig = conf.resolve(FLINK_CONF_YAML_BACKUP);

        try {
            Files.move(backupConfig, originalConfig, StandardCopyOption.REPLACE_EXISTING);
        } catch (IOException e) {
            LOG.error("Failed to restore flink-conf.yaml", e);
        }
```
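As background for this review thread: `ExternalResource` is JUnit 4's base class for rules that run setup in `before()` and cleanup in `after()` around a test. The lifecycle can be mimicked in a few lines; the sketch below is a hypothetical stand-in (the names `MiniExternalResource` and `LifecycleDemo` are invented for illustration), not Flink or JUnit code:

```java
/** Tiny stand-in for JUnit 4's ExternalResource lifecycle: before() -> test -> after(). */
abstract class MiniExternalResource {
    protected void before() throws Exception {}

    protected void after() {}

    /** Runs a test body between before() and after(), guaranteeing cleanup. */
    public void apply(Runnable testBody) throws Exception {
        before();
        try {
            testBody.run();
        } finally {
            after(); // always restore state, even if the test throws
        }
    }
}

class LifecycleDemo {
    public static void main(String[] args) throws Exception {
        StringBuilder trace = new StringBuilder();
        MiniExternalResource resource = new MiniExternalResource() {
            @Override protected void before() { trace.append("before;"); }
            @Override protected void after() { trace.append("after;"); }
        };
        resource.apply(() -> trace.append("test;"));
        System.out.println(trace); // before;test;after;
    }
}
```

JUnit provides the same ordering via a `@Rule` field: `before()` runs ahead of the test method and `after()` runs even when the test fails, which is why `FlinkDistribution` can safely restore `flink-conf.yaml` there.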
[jira] [Commented] (FLINK-10633) End-to-end test: Prometheus reporter
[ https://issues.apache.org/jira/browse/FLINK-10633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673283#comment-16673283 ] ASF GitHub Bot commented on FLINK-10633: zentol commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test URL: https://github.com/apache/flink/pull/7003#discussion_r230417575 ## File path: flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java
[GitHub] zentol commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test
zentol commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test URL: https://github.com/apache/flink/pull/7003#discussion_r230417575 ## File path: flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java
[jira] [Commented] (FLINK-10633) End-to-end test: Prometheus reporter
[ https://issues.apache.org/jira/browse/FLINK-10633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673281#comment-16673281 ] ASF GitHub Bot commented on FLINK-10633: zentol commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test URL: https://github.com/apache/flink/pull/7003#discussion_r230417518 ## File path: flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java
[GitHub] zentol commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test
zentol commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test URL: https://github.com/apache/flink/pull/7003#discussion_r230417518 ## File path: flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java
[jira] [Commented] (FLINK-10633) End-to-end test: Prometheus reporter
[ https://issues.apache.org/jira/browse/FLINK-10633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673279#comment-16673279 ] ASF GitHub Bot commented on FLINK-10633:

zentol commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test
URL: https://github.com/apache/flink/pull/7003#discussion_r230416606

## File path: flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java

On the line `final Path originalConfig = conf.resolve(FLINK_CONF_YAML);`:

Review comment: There's no use-case for multiple backups. The backup only exists to reset flink-dist to the state before the test. Tests can already set up multiple configs for separate clusters by calling `appendConfiguration` multiple times.

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
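The single-backup scheme zentol describes (one snapshot of `flink-conf.yaml` taken in `before()`, restored wholesale in `after()`) can be sketched standalone. The following is a hypothetical simplification with invented names (`ConfigBackupSketch`, plain methods instead of the JUnit rule, a temp directory instead of a real Flink conf dir), not the actual FlinkDistribution code:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

/** Minimal sketch of the single-backup config reset discussed in the review. */
class ConfigBackupSketch {
    private static final String CONF_FILE = "flink-conf.yaml";
    private static final String BACKUP_FILE = "flink-conf.yaml.bak";

    private final Path conf;

    ConfigBackupSketch(Path conf) {
        this.conf = conf;
    }

    /** Before the test: snapshot the pristine config exactly once. */
    void before() throws IOException {
        Files.copy(conf.resolve(CONF_FILE), conf.resolve(BACKUP_FILE));
    }

    /** Tests may append settings any number of times; one backup still suffices. */
    void appendConfiguration(String line) throws IOException {
        Files.write(conf.resolve(CONF_FILE),
                (line + System.lineSeparator()).getBytes(),
                StandardOpenOption.APPEND);
    }

    /** After the test: move the snapshot back, discarding every appended setting. */
    void after() throws IOException {
        Files.move(conf.resolve(BACKUP_FILE), conf.resolve(CONF_FILE),
                StandardCopyOption.REPLACE_EXISTING);
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("conf-sketch");
        Files.write(dir.resolve(CONF_FILE),
                ("jobmanager.rpc.port: 6123" + System.lineSeparator()).getBytes());

        ConfigBackupSketch rule = new ConfigBackupSketch(dir);
        rule.before();
        rule.appendConfiguration("metrics.reporter.prom.port: 9200");
        rule.appendConfiguration("metrics.reporter.prom.class: PrometheusReporter");
        rule.after();

        // Restored file contains only the original single line again.
        System.out.println(Files.readAllLines(dir.resolve(CONF_FILE)).size());
    }
}
```

Because `after()` replaces the file with the snapshot, any number of `appendConfiguration` calls are undone in one move, which is why a single backup is enough.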
[GitHub] zentol commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test
zentol commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test URL: https://github.com/apache/flink/pull/7003#discussion_r230416606 ## File path: flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java
[GitHub] tillrohrmann edited a comment on issue #6998: [FLINK-10757] [tests] Avoid port conflicts in AbstractTaskManagerProc…
tillrohrmann edited a comment on issue #6998: [FLINK-10757] [tests] Avoid port conflicts in AbstractTaskManagerProc… URL: https://github.com/apache/flink/pull/6998#issuecomment-435419268 There are 3 more builds in front of this one. It should be done in 2 hours.
[jira] [Commented] (FLINK-10757) TaskManagerProcessFailureStreamingRecoveryITCase failed to initialize the cluster entrypoint StandaloneSessionClusterEntrypoint.
[ https://issues.apache.org/jira/browse/FLINK-10757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673272#comment-16673272 ] ASF GitHub Bot commented on FLINK-10757:

tillrohrmann commented on issue #6998: [FLINK-10757] [tests] Avoid port conflicts in AbstractTaskManagerProc…
URL: https://github.com/apache/flink/pull/6998#issuecomment-435419268

There are 3 more builds in front of this one. It should be done in 4 hours.

> TaskManagerProcessFailureStreamingRecoveryITCase failed to initialize the
> cluster entrypoint StandaloneSessionClusterEntrypoint.
>
> Key: FLINK-10757
> URL: https://issues.apache.org/jira/browse/FLINK-10757
> Project: Flink
> Issue Type: Bug
> Components: Tests
> Affects Versions: 1.6.2, 1.7.0
> Reporter: Bowen Li
> Assignee: TisonKun
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.7.0
>
> {code:java}
> Failed tests:
> ...
> TaskManagerProcessFailureStreamingRecoveryITCase>AbstractTaskManagerProcessFailureRecoveryTest.testTaskManagerProcessFailure:223
> Failed to initialize the cluster entrypoint
> StandaloneSessionClusterEntrypoint.
> Tests in error:
> {code}
> https://travis-ci.org/apache/flink/jobs/449439623

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] tillrohrmann commented on issue #6998: [FLINK-10757] [tests] Avoid port conflicts in AbstractTaskManagerProc…
tillrohrmann commented on issue #6998: [FLINK-10757] [tests] Avoid port conflicts in AbstractTaskManagerProc… URL: https://github.com/apache/flink/pull/6998#issuecomment-435419268 There are 3 more builds in front of this one. It should be done in 4 hours.
[jira] [Commented] (FLINK-10757) TaskManagerProcessFailureStreamingRecoveryITCase failed to initialize the cluster entrypoint StandaloneSessionClusterEntrypoint.
[ https://issues.apache.org/jira/browse/FLINK-10757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673274#comment-16673274 ] ASF GitHub Bot commented on FLINK-10757: tillrohrmann edited a comment on issue #6998: [FLINK-10757] [tests] Avoid port conflicts in AbstractTaskManagerProc… URL: https://github.com/apache/flink/pull/6998#issuecomment-435419268 There are 3 more builds in front of this one. It should be done in 2 hours. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
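The PR's diff itself is not included in this digest; the usual way to avoid such fixed-port conflicts in tests is to let the OS assign an ephemeral port by binding port 0. A minimal sketch (class and method names are hypothetical, not taken from the PR):

```java
import java.io.IOException;
import java.net.ServerSocket;

/**
 * Sketch (not the PR's code): asking the OS for an ephemeral port avoids
 * the hard-coded port collisions that make tests like this one flaky.
 */
public final class FreePortFinder {

    private FreePortFinder() {
    }

    /** Binds port 0 so the OS picks a currently unused port, then releases it. */
    public static int findFreePort() throws IOException {
        try (ServerSocket socket = new ServerSocket(0)) {
            return socket.getLocalPort();
        }
    }
}
```

Note the small race: the port is free when returned but could be taken before the test binds it, which is why retry loops around the bind are common in practice.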
[jira] [Commented] (FLINK-10633) End-to-end test: Prometheus reporter
[ https://issues.apache.org/jira/browse/FLINK-10633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673269#comment-16673269 ] ASF GitHub Bot commented on FLINK-10633: zentol commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test URL: https://github.com/apache/flink/pull/7003#discussion_r230415501 ## File path: flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java ## @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.tests.util; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.UnmodifiableConfiguration; +import org.apache.flink.util.ExceptionUtils; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import org.junit.Assert; +import org.junit.rules.ExternalResource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.function.Function; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * A wrapper around a Flink distribution. + */ +public final class FlinkDistribution extends ExternalResource { Review comment: eh, I prefer `FlinkDistributionResource` since this class should really only provide primitive operations that map closely to `flink-dist`. Anything more complex beyond that belongs into a separate class. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > End-to-end test: Prometheus reporter > > > Key: FLINK-10633 > URL: https://issues.apache.org/jira/browse/FLINK-10633 > Project: Flink > Issue Type: Sub-task > Components: E2E Tests >Affects Versions: 1.7.0 >Reporter: Till Rohrmann >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > Add an end-to-end test using the {{PrometheusReporter}} to verify that all > metrics are properly reported. Additionally verify that the newly introduced > RocksDB metrics are accessible (see FLINK-10423). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10633) End-to-end test: Prometheus reporter
[ https://issues.apache.org/jira/browse/FLINK-10633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673268#comment-16673268 ] ASF GitHub Bot commented on FLINK-10633: zentol commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test URL: https://github.com/apache/flink/pull/7003#discussion_r230414679 ## File path: flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/CommandLineWrapper.java ## @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.tests.util; + +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; + +/** + * Utility class for setting up command-line tool usages in a readable fashion. + */ +public enum CommandLineWrapper { + ; + + public static WGetBuilder wget(String url) { + return new WGetBuilder(url); + } + + /** +* Wrapper around wget used for downloading files. 
+*/ + public static final class WGetBuilder { + + private final String url; + private Path targetDir; + + WGetBuilder(String url) { + this.url = url; + } + + public WGetBuilder targetDir(Path dir) { Review comment: it's harder to extend: with a variety of options you'll end up with a bloated constructor. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
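zentol's extensibility point is the classic builder-pattern argument: each new option adds one chainable method instead of another constructor overload. A self-contained sketch in the style of the quoted `WGetBuilder` (the `timeoutSecs` option is hypothetical, added only to show the extension; it is not in the PR):

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

/** Builder-style command assembly: adding an option later never touches the constructor. */
final class WGetSketch {

    private final String url;
    private Path targetDir;
    private int timeoutSecs = -1; // hypothetical extra option, illustrating extension

    WGetSketch(String url) {
        this.url = url;
    }

    WGetSketch targetDir(Path dir) {
        this.targetDir = dir;
        return this;
    }

    WGetSketch timeoutSecs(int secs) {
        this.timeoutSecs = secs;
        return this;
    }

    /** Assembles the argv list; optional flags appear only when set. */
    List<String> build() {
        final List<String> command = new ArrayList<>();
        command.add("wget");
        if (targetDir != null) {
            command.add("-P");
            command.add(targetDir.toString());
        }
        if (timeoutSecs >= 0) {
            command.add("--timeout=" + timeoutSecs);
        }
        command.add(url);
        return command;
    }
}
```

With a telescoping constructor the same change would force a new overload and ripple through every call site; with the builder, existing callers compile unchanged.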
[jira] [Commented] (FLINK-10633) End-to-end test: Prometheus reporter
[ https://issues.apache.org/jira/browse/FLINK-10633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673245#comment-16673245 ] ASF GitHub Bot commented on FLINK-10633: kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test URL: https://github.com/apache/flink/pull/7003#discussion_r230410636 ## File path: flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/src/test/java/org/apache/flink/metrics/prometheus/tests/PrometheusReporterEndToEndITCase.java ## @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.metrics.prometheus.tests; + +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.prometheus.PrometheusReporter; +import org.apache.flink.tests.util.AutoClosableProcess; +import org.apache.flink.tests.util.CommandLineWrapper; +import org.apache.flink.tests.util.FlinkDistribution; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.TestLogger; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Duration; +import java.util.List; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import static org.apache.flink.tests.util.AutoClosableProcess.runBlocking; +import static org.apache.flink.tests.util.AutoClosableProcess.runNonBlocking; + +/** + * End-to-end test for the PrometheusReporter. 
+ */ +public class PrometheusReporterEndToEndITCase extends TestLogger { + + private static final Logger LOG = LoggerFactory.getLogger(PrometheusReporterEndToEndITCase.class); + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private static final String PROMETHEUS_FILE_NAME = "prometheus-2.4.3.linux-amd64"; + + private static final Pattern LOG_REPORTER_PORT_PATTERN = Pattern.compile(".*Started PrometheusReporter HTTP server on port ([0-9]+).*"); + + @Rule + public final FlinkDistribution dist = new FlinkDistribution(); + + @Rule + public final TemporaryFolder tmp = new TemporaryFolder(); + + @Test + public void testReporter() throws Exception { + LOG.info("starting test"); + dist.copyOptJarsToLib("flink-metrics-prometheus"); + + final Configuration config = new Configuration(); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "prom." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, PrometheusReporter.class.getCanonicalName()); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "prom.port", "9000-9100"); + + dist.appendConfiguration(config); + + final Path tmpPrometheusDir = tmp.newFolder().toPath().resolve("prometheus"); + final Path prometheusArchive = tmpPrometheusDir.resolve(PROMETHEUS_FILE_NAME + ".tar.gz"); + final Path prometheusBinDir = tmpPrometheusDir.resolve(PROMETHEUS_FILE_NAME); + final Path prometheusConfig = prometheusBinDir.resolve("prometheus.yml"); + final Path prometheusBinary = prometheusBinDir.resolve("prometheus"); + Files.createDirectory(tmpPrometheusDir); + + runBlocking( + "Download of Prometheus", + Duration.ofMinutes(5), + CommandLineWrapper + .wget("https://github.com/prometheus/prometheus/releases/download/v2.4.3/" + prometheusArchive.getFileName()) Review comment: Given that the version is already in the path, does it make sense to move the whole path or the part that includes the version to a class variable? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
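kl0u's suggestion of hoisting the version into a class variable, so the download URL and the archive name cannot drift apart, could look like the following sketch (the class name is hypothetical; the URL pattern follows the Prometheus 2.4.3 release the test already uses):

```java
/**
 * Sketch of a single source of truth for the Prometheus version used by the
 * E2E test. Bumping VERSION updates the archive name and URL consistently.
 */
final class PrometheusDist {

    static final String VERSION = "2.4.3";
    static final String FILE_NAME = "prometheus-" + VERSION + ".linux-amd64";
    static final String DOWNLOAD_URL =
        "https://github.com/prometheus/prometheus/releases/download/v" + VERSION + "/"
            + FILE_NAME + ".tar.gz";

    private PrometheusDist() {
    }
}
```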
[jira] [Commented] (FLINK-10633) End-to-end test: Prometheus reporter
[ https://issues.apache.org/jira/browse/FLINK-10633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673242#comment-16673242 ] ASF GitHub Bot commented on FLINK-10633: kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test URL: https://github.com/apache/flink/pull/7003#discussion_r230401483 ## File path: flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/AutoClosableProcess.java ## @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.tests.util; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.time.Duration; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * Utility class to terminate a given {@link Process} when exiting a try-with-resources statement. + */ +public class AutoClosableProcess implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(AutoClosableProcess.class); + + private final Process process; + + public AutoClosableProcess(Process process) { + this.process = process; Review comment: Add `null` checks. 
This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
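The `null` check kl0u asks for is typically a fail-fast `Objects.requireNonNull` in the constructor. A self-contained sketch of the wrapper with that check added (the `close()` body here is illustrative, not the PR's actual implementation):

```java
import java.util.Objects;

/** Terminates the wrapped process when exiting a try-with-resources statement. */
public class AutoClosableProcessSketch implements AutoCloseable {

    private final Process process;

    public AutoClosableProcessSketch(Process process) {
        // Fail fast at construction rather than with a confusing NPE later in close().
        this.process = Objects.requireNonNull(process, "process must not be null");
    }

    @Override
    public void close() {
        if (process.isAlive()) {
            process.destroy();
        }
    }
}
```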
[jira] [Commented] (FLINK-10633) End-to-end test: Prometheus reporter
[ https://issues.apache.org/jira/browse/FLINK-10633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673249#comment-16673249 ] ASF GitHub Bot commented on FLINK-10633: kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test URL: https://github.com/apache/flink/pull/7003#discussion_r230407576 ## File path: flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java ## @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.tests.util; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.UnmodifiableConfiguration; +import org.apache.flink.util.ExceptionUtils; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import org.junit.Assert; +import org.junit.rules.ExternalResource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.function.Function; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * A wrapper around a Flink distribution. 
+ */ +public final class FlinkDistribution extends ExternalResource { + + private static final Logger LOG = LoggerFactory.getLogger(FlinkDistribution.class); + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private final List filesToDelete = new ArrayList<>(4); + + private static final Path FLINK_CONF_YAML = Paths.get("flink-conf.yaml"); + private static final Path FLINK_CONF_YAML_BACKUP = Paths.get("flink-conf.yaml.bak"); + + private final Path opt; + private final Path lib; + private final Path conf; + private final Path log; + private final Path bin; + + private Configuration defaultConfig; + + public FlinkDistribution() { + final String distDirProperty = System.getProperty("distDir"); + if (distDirProperty == null) { + Assert.fail("The distDir property was not set. You can set it when running maven via -DdistDir= ."); + } + final Path flinkDir = Paths.get(distDirProperty); + bin = flinkDir.resolve("bin"); + opt = flinkDir.resolve("opt"); + lib = flinkDir.resolve("lib"); + conf = flinkDir.resolve("conf"); + log = flinkDir.resolve("log"); + } + + @Override + protected void before() throws IOException { + defaultConfig = new UnmodifiableConfiguration(GlobalConfiguration.loadConfiguration(conf.toAbsolutePath().toString())); + final Path originalConfig = conf.resolve(FLINK_CONF_YAML); Review comment: I would move lines 95-98 into a separate method (e.g. `backupOriginal/DefaultConfig`) and call it in `before()` as well, because this method can also be useful on its own. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
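The refactor kl0u sketches, pulling the config backup out of `before()` into its own reusable method, might look like this (class, method, and file names are assumptions for illustration, not the PR's code):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/** Backs up a config file so a test can modify it and restore it afterwards. */
final class ConfigBackupSketch {

    /**
     * Copies {@code confDir/fileName} to {@code confDir/fileName.bak} and
     * returns the backup path; restoring is just copying in the other direction.
     */
    static Path backupConfig(Path confDir, String fileName) throws IOException {
        final Path original = confDir.resolve(fileName);
        final Path backup = confDir.resolve(fileName + ".bak");
        Files.copy(original, backup, StandardCopyOption.REPLACE_EXISTING);
        return backup;
    }

    private ConfigBackupSketch() {
    }
}
```

Extracting the copy into a named method also lets other rules or tests snapshot the config without going through the full `before()` lifecycle, which is the reusability kl0u is after.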
[jira] [Commented] (FLINK-10633) End-to-end test: Prometheus reporter
[ https://issues.apache.org/jira/browse/FLINK-10633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673237#comment-16673237 ] ASF GitHub Bot commented on FLINK-10633: kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test URL: https://github.com/apache/flink/pull/7003#discussion_r230401171 ## File path: flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/AutoClosablePath.java ## @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.tests.util; + +import org.apache.flink.util.FileUtils; + +import java.io.IOException; +import java.nio.file.Path; + +/** + * Utility class to delete a given {@link Path} when exiting a try-with-resources statement. + */ +public final class AutoClosablePath implements AutoCloseable { + + private final Path file; + + public AutoClosablePath(final Path file) { + this.file = file; + } + + @Override + public void close() throws IOException { + FileUtils.deleteFileOrDirectory(file.toFile()); + } + + public Path getFile() { Review comment: This is not used anywhere, so why not remove it and add it back if a future test needs it? 
This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
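For context, the quoted class is meant to be used so that the path registered in the try-with-resources header is deleted on exit. A self-contained sketch of the same idea (using `Files.deleteIfExists` in place of Flink's `FileUtils.deleteFileOrDirectory` so it runs standalone):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Deletes the given path when exiting a try-with-resources statement. */
final class AutoClosablePathSketch implements AutoCloseable {

    private final Path file;

    AutoClosablePathSketch(Path file) {
        this.file = file;
    }

    @Override
    public void close() throws IOException {
        // Flink's version delegates to FileUtils.deleteFileOrDirectory,
        // which also handles non-empty directories; this sketch covers files.
        Files.deleteIfExists(file);
    }
}
```

Usage: `try (AutoClosablePathSketch ignored = new AutoClosablePathSketch(tmpFile)) { /* use tmpFile */ }` guarantees cleanup even when the body throws.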
[jira] [Commented] (FLINK-10633) End-to-end test: Prometheus reporter
[ https://issues.apache.org/jira/browse/FLINK-10633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673252#comment-16673252 ] ASF GitHub Bot commented on FLINK-10633: kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test URL: https://github.com/apache/flink/pull/7003#discussion_r230404317 ## File path: flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java ## @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.tests.util; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.UnmodifiableConfiguration; +import org.apache.flink.util.ExceptionUtils; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import org.junit.Assert; +import org.junit.rules.ExternalResource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.function.Function; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * A wrapper around a Flink distribution. + */ +public final class FlinkDistribution extends ExternalResource { Review comment: This is like a `E2ETestHarness` more than a `FlinkDistribution` right? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org