[GitHub] [flink] flinkbot edited a comment on issue #10082: [FLINK-14164][runtime] Add a meter ‘numberOfRestarts’ to show number of restarts as well as its rate
flinkbot edited a comment on issue #10082: [FLINK-14164][runtime] Add a meter ‘numberOfRestarts’ to show number of restarts as well as its rate URL: https://github.com/apache/flink/pull/10082#issuecomment-549698167 ## CI report: * 9ad21545cdce68c8892941e8f33a89147bddb1f6 : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/134980263) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #10060: [FLINK-14546] [flink-json] Support map type in flink-json
flinkbot edited a comment on issue #10060: [FLINK-14546] [flink-json] Support map type in flink-json URL: https://github.com/apache/flink/pull/10060#issuecomment-548313242 ## CI report: * c68dc12d24be4ef19efc1ef42246e6018c4f6e8d : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/134363638) * f65310be9fb96f2446398a718dabeb4bd5a7a874 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/134963469) * 8dc48c4a16d1f4f82c78b14b18c0832fbb235d3d : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/134970076) * 2f6aec8fe5899e7360156efb05df6c3c9b1d9f04 : UNKNOWN
[GitHub] [flink] flinkbot commented on issue #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
flinkbot commented on issue #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs. URL: https://github.com/apache/flink/pull/10083#issuecomment-549704821 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Automated Checks Last check on commit bdb7952a0a48b4e67f51a04db61cd96a1cbecbbc (Tue Nov 05 07:53:06 UTC 2019) **Warnings:** * No documentation files were touched! Remember to keep the Flink docs up to date! Mention the bot in a comment to re-run the automated checks. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process. The bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[jira] [Updated] (FLINK-14472) Implement back-pressure monitor with non-blocking outputs
[ https://issues.apache.org/jira/browse/FLINK-14472?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-14472: --- Labels: pull-request-available (was: ) > Implement back-pressure monitor with non-blocking outputs > - > > Key: FLINK-14472 > URL: https://issues.apache.org/jira/browse/FLINK-14472 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Network >Reporter: zhijiang >Assignee: Yingjie Cao >Priority: Minor > Labels: pull-request-available > Fix For: 1.10.0 > > > Currently back-pressure monitor relies on detecting task threads that are > stuck in `requestBufferBuilderBlocking`. There are actually two cases to > cause back-pressure ATM: > * There are no available buffers in `LocalBufferPool` and all the given > quotas from global pool are also exhausted. Then we need to wait for buffer > recycling to `LocalBufferPool`. > * No available buffers in `LocalBufferPool`, but the quota has not been used > up. While requesting buffer from global pool, it is blocked because of no > available buffers in global pool. Then we need to wait for buffer recycling > to global pool. > We try to implement the non-blocking network output in FLINK-14396, so the > back pressure monitor should be adjusted accordingly after the non-blocking > output is used in practice. > In detail we try to avoid the current monitor way by analyzing the task > thread stack, which has some drawbacks discussed before: > * If the `requestBuffer` is not triggered by task thread, the current > monitor is invalid in practice. > * The current monitor is heavy-weight and fragile because it needs to > understand more details of LocalBufferPool implementation. > We could provide a transparent method for the monitor caller to get the > backpressure result directly, and hide the implementation details in the > LocalBufferPool. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] wsry opened a new pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
wsry opened a new pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs. URL: https://github.com/apache/flink/pull/10083 ## What is the purpose of the change Currently the back-pressure monitor relies on detecting task threads that are stuck in `requestBufferBuilderBlocking`. There are currently two cases that cause back-pressure: - There are no available buffers in `LocalBufferPool` and all the given quotas from the global pool are also exhausted. Then we need to wait for buffers to be recycled to `LocalBufferPool`. - There are no available buffers in `LocalBufferPool`, but the quota has not been used up. While requesting a buffer from the global pool, the request blocks because there are no available buffers in the global pool. Then we need to wait for buffers to be recycled to the global pool. FLINK-14396 implements the non-blocking network output, so the back-pressure monitor should be adjusted accordingly once the non-blocking output is used in practice. In this PR, we implement a new back-pressure monitor which monitors task back pressure by checking the availability of the ResultPartitionWriter, i.e. whether there are free buffers in the BufferPool of the ResultPartitions for output. ## Brief change log - A new back-pressure tracker was implemented which monitors task back pressure by checking the availability of the ResultPartitionWriter, i.e. whether there are free buffers in the BufferPool of the ResultPartitions for output. - The old stack-sampling based back-pressure tracker implementation and relevant code were removed. - New test cases were added to verify the changes. ## Verifying this change Several new test cases are added to verify the changes, including ```BackPressureStatsTrackerImplTest```, ```BackPressureSampleCoordinatorTest```, ```TaskBackPressureSampleServiceTest```, ```TaskTest#testNoBackPressureIfTaskNotStarted```, ```TaskExecutorSubmissionTest#testSampleTaskBackPressure```.
## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)
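The availability-based approach described in this PR can be sketched as follows. This is an illustrative simplification, not Flink's actual implementation; the `OutputAvailability` and `isBackPressured` names are assumptions made for the example:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Illustrative sketch only: infer back pressure from output availability
// instead of sampling task thread stacks. All names are hypothetical.
public class BackPressureSketch {

    /** Models a ResultPartitionWriter-like output exposing an availability future. */
    public interface OutputAvailability {
        CompletableFuture<?> getAvailableFuture();

        /** Available means the output's buffer pool currently has a free buffer. */
        default boolean isAvailable() {
            return getAvailableFuture().isDone();
        }
    }

    /** A task counts as back-pressured when any of its outputs has no free buffers. */
    public static boolean isBackPressured(List<OutputAvailability> outputs) {
        for (OutputAvailability output : outputs) {
            if (!output.isAvailable()) {
                return true;
            }
        }
        return false;
    }
}
```

Compared with stack sampling, this check is cheap and does not depend on which thread requests the buffer, which matches the motivation given in the JIRA issue.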
[GitHub] [flink] zhuzhurk commented on a change in pull request #10076: [FLINK-14465][runtime] Let `StandaloneJobClusterEntrypoint` use user code class loader
zhuzhurk commented on a change in pull request #10076: [FLINK-14465][runtime] Let `StandaloneJobClusterEntrypoint` use user code class loader URL: https://github.com/apache/flink/pull/10076#discussion_r342418215 ## File path: flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java ## @@ -184,8 +184,6 @@ public PackagedProgram(File jarFile, List classpaths, @Nullable String entr } checkJarFile(jarFileUrl); - } else if (!isPython) { - throw new IllegalArgumentException("The jar file must not be null."); Review comment: If the rework is done in another ticket and the constructor stays public in this PR, I think a check is needed to ensure that `entryPointClassName` and `jarFile` are not both null at the same time; otherwise an NPE would happen, as Till mentioned.
[jira] [Created] (FLINK-14610) Add documentation for how to use watermark syntax in DDL
Jark Wu created FLINK-14610: --- Summary: Add documentation for how to use watermark syntax in DDL Key: FLINK-14610 URL: https://issues.apache.org/jira/browse/FLINK-14610 Project: Flink Issue Type: Sub-task Components: Documentation Reporter: Jark Wu Fix For: 1.10.0 Add documentation for how to use watermark syntax in DDL.
[GitHub] [flink] libenchao commented on a change in pull request #10060: [FLINK-14546] [flink-json] Support map type in flink-json
libenchao commented on a change in pull request #10060: [FLINK-14546] [flink-json] Support map type in flink-json URL: https://github.com/apache/flink/pull/10060#discussion_r342414169 ## File path: flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java ## @@ -242,11 +247,25 @@ private DeserializationRuntimeConverter wrapIntoNullableConverter(Deserializatio return Optional.of(createObjectArrayConverter(((BasicArrayTypeInfo) typeInfo).getComponentInfo())); } else if (isPrimitiveByteArray(typeInfo)) { return Optional.of(createByteArrayConverter()); + } else if (typeInfo instanceof MapTypeInfo) { + MapTypeInfo mapTypeInfo = (MapTypeInfo) typeInfo; + return Optional.of(createMapConverter(mapTypeInfo.getKeyTypeInfo(), mapTypeInfo.getValueTypeInfo())); } else { return Optional.empty(); } } + private DeserializationRuntimeConverter createMapConverter(TypeInformation keyType, TypeInformation valueType) { + DeserializationRuntimeConverter valueConverter = createConverter(valueType); + DeserializationRuntimeConverter keyConverter = createConverter(keyType); + + return (mapper, jsonNode) -> StreamSupport.stream( Review comment: done.
[jira] [Created] (FLINK-14609) Add doc for Flink SQL computed columns
Danny Chen created FLINK-14609: -- Summary: Add doc for Flink SQL computed columns Key: FLINK-14609 URL: https://issues.apache.org/jira/browse/FLINK-14609 Project: Flink Issue Type: Sub-task Components: Table SQL / API Affects Versions: 1.9.1 Reporter: Danny Chen Fix For: 1.10.0 1. Add doc to describe the syntax of computed column. 2. Add some demos on the website.
[GitHub] [flink] flinkbot commented on issue #10082: [FLINK-14164][runtime] Add a meter ‘numberOfRestarts’ to show number of restarts as well as its rate
flinkbot commented on issue #10082: [FLINK-14164][runtime] Add a meter ‘numberOfRestarts’ to show number of restarts as well as its rate URL: https://github.com/apache/flink/pull/10082#issuecomment-549698167 ## CI report: * 9ad21545cdce68c8892941e8f33a89147bddb1f6 : UNKNOWN
[GitHub] [flink] libenchao commented on a change in pull request #10060: [FLINK-14546] [flink-json] Support map type in flink-json
libenchao commented on a change in pull request #10060: [FLINK-14546] [flink-json] Support map type in flink-json URL: https://github.com/apache/flink/pull/10060#discussion_r342410101 ## File path: flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java ## @@ -242,11 +247,25 @@ private DeserializationRuntimeConverter wrapIntoNullableConverter(Deserializatio return Optional.of(createObjectArrayConverter(((BasicArrayTypeInfo) typeInfo).getComponentInfo())); } else if (isPrimitiveByteArray(typeInfo)) { return Optional.of(createByteArrayConverter()); + } else if (typeInfo instanceof MapTypeInfo) { + MapTypeInfo mapTypeInfo = (MapTypeInfo) typeInfo; + return Optional.of(createMapConverter(mapTypeInfo.getKeyTypeInfo(), mapTypeInfo.getValueTypeInfo())); } else { return Optional.empty(); } } + private DeserializationRuntimeConverter createMapConverter(TypeInformation keyType, TypeInformation valueType) { + DeserializationRuntimeConverter valueConverter = createConverter(valueType); + DeserializationRuntimeConverter keyConverter = createConverter(keyType); + + return (mapper, jsonNode) -> StreamSupport.stream( Review comment: Yes, it's called per record. I'll change it back according to the style guide.
[jira] [Updated] (FLINK-14608) avoid using Java Streams in JsonRowDeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-14608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kurt Young updated FLINK-14608: --- Description: According to [https://flink.apache.org/contributing/code-style-and-quality-java.html], we should avoid using Java Streams in any performance critical code. Since this `DeserializationRuntimeConverter` will be called per field of each coming record, we should provide a non Java Streams implementation. (was: According to [Flink CodeStyle|[https://flink.apache.org/contributing/code-style-and-quality-java.html]], we should avoid using Java Streams in any performance critical code. Since this `DeserializationRuntimeConverter` will be called per field of each coming record, we should provide a non Java Streams implementation. ) > avoid using Java Streams in JsonRowDeserializationSchema > > > Key: FLINK-14608 > URL: https://issues.apache.org/jira/browse/FLINK-14608 > Project: Flink > Issue Type: Improvement > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) > Affects Versions: 1.10.0 > Reporter: Kurt Young > Priority: Major > > According to > [https://flink.apache.org/contributing/code-style-and-quality-java.html], we > should avoid using Java Streams in any performance critical code. Since this > `DeserializationRuntimeConverter` will be called per field of each coming > record, we should provide a non Java Streams implementation.
[jira] [Created] (FLINK-14608) avoid using Java Streams in JsonRowDeserializationSchema
Kurt Young created FLINK-14608: -- Summary: avoid using Java Streams in JsonRowDeserializationSchema Key: FLINK-14608 URL: https://issues.apache.org/jira/browse/FLINK-14608 Project: Flink Issue Type: Improvement Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) Affects Versions: 1.10.0 Reporter: Kurt Young According to the [Flink code style guide|https://flink.apache.org/contributing/code-style-and-quality-java.html], we should avoid using Java Streams in any performance critical code. Since this `DeserializationRuntimeConverter` will be called per field of each incoming record, we should provide a non Java Streams implementation.
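As a sketch of the guideline discussed in this issue, a per-record map conversion can be written with a plain loop instead of a Java Streams pipeline, avoiding the per-call stream setup on the hot path. This is a hypothetical stand-in converter, not the actual JsonRowDeserializationSchema code:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical converter illustrating the style guideline: iterate the
// entries directly instead of building a Stream pipeline for every record.
public class MapConverterSketch {

    public static <K, V, K2, V2> Map<K2, V2> convertMap(
            Map<K, V> input,
            Function<K, K2> keyConverter,
            Function<V, V2> valueConverter) {
        // Pre-size the result map to avoid rehashing while filling it.
        Map<K2, V2> result = new HashMap<>(input.size());
        for (Map.Entry<K, V> entry : input.entrySet()) {
            result.put(
                    keyConverter.apply(entry.getKey()),
                    valueConverter.apply(entry.getValue()));
        }
        return result;
    }
}
```

The loop form does the same work as a `entrySet().stream().collect(...)` chain but allocates no stream or collector objects per record, which is the concern the code style guide raises for performance-critical paths.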
[jira] [Closed] (FLINK-14498) Introduce NetworkBufferPool#isAvailable() for non-blocking output
[ https://issues.apache.org/jira/browse/FLINK-14498?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yingjie Cao closed FLINK-14498. --- > Introduce NetworkBufferPool#isAvailable() for non-blocking output > - > > Key: FLINK-14498 > URL: https://issues.apache.org/jira/browse/FLINK-14498 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Network > Reporter: zhijiang > Assignee: Yingjie Cao > Priority: Minor > Labels: pull-request-available > Fix For: 1.10.0 > > Time Spent: 20m > Remaining Estimate: 0h > > In order to best-effort implement non-blocking output, we need to further improve the interaction between LocalBufferPool and NetworkBufferPool in a non-blocking way as a supplement to FLINK-14396. > In detail, we provide NetworkBufferPool#isAvailable() to indicate the global pool state, so we can combine it with the LocalBufferPool#isAvailable() state to avoid blocking on global requests during task processing. > Meanwhile we would refactor the process when LocalBufferPool requests a global buffer. If there are no available buffers in NetworkBufferPool, the LocalBufferPool should monitor the global pool's availability future instead of waiting 2 seconds in every retry loop, as it does currently. This solves the wait delay and cleans up the code in a unified way.
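The refactoring described in the issue can be sketched roughly as follows. This is a simplified illustration with hypothetical names; the real pools also handle quotas, memory segments, and recycling listeners:

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Simplified sketch: a global pool exposes an availability future so callers
// can react to a recycled buffer instead of polling on a fixed 2 s interval.
public class GlobalPoolSketch {
    private final Queue<byte[]> segments = new ArrayDeque<>();
    private CompletableFuture<Void> availabilityFuture = new CompletableFuture<>();

    public synchronized boolean isAvailable() {
        return !segments.isEmpty();
    }

    /** Already completed when a buffer is free; otherwise completes on recycle. */
    public synchronized CompletableFuture<Void> getAvailableFuture() {
        return isAvailable() ? CompletableFuture.completedFuture(null) : availabilityFuture;
    }

    public synchronized byte[] poll() {
        return segments.poll();
    }

    public synchronized void recycle(byte[] segment) {
        segments.add(segment);
        // Wake up everyone waiting on availability, then reset for the next round.
        availabilityFuture.complete(null);
        availabilityFuture = new CompletableFuture<>();
    }
}
```

A local pool would chain work onto `getAvailableFuture()` (e.g. with `thenRun`) instead of sleeping and retrying, which is the unified wait-and-notify pattern the issue proposes.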
[GitHub] [flink] guoweiM commented on a change in pull request #10076: [FLINK-14465][runtime] Let `StandaloneJobClusterEntrypoint` use user code class loader
guoweiM commented on a change in pull request #10076: [FLINK-14465][runtime] Let `StandaloneJobClusterEntrypoint` use user code class loader URL: https://github.com/apache/flink/pull/10076#discussion_r342405496 ## File path: flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java ## @@ -74,10 +79,20 @@ private StandaloneJobClusterEntryPoint( } @Override - protected DispatcherResourceManagerComponentFactory createDispatcherResourceManagerComponentFactory(Configuration configuration) { + protected DispatcherResourceManagerComponentFactory createDispatcherResourceManagerComponentFactory(Configuration configuration) throws IOException { + final String flinkHomeDir = System.getenv(ENV_FLINK_HOME_DIR); Review comment: 1. I have verified it (running /test_docker_embedded_job.sh and manually packaging the per-job docker image and running it). I saw this value in `ConfigConstants`, so I thought it was a contract. 1. Since you said this is not a contract, I would like to propose not relying on `FLINK_HOME`. 1. One possible approach is: 1. We could try the ENV_FLINK_USR_LIB_DIR directory first. 1. If this dir does not exist, we could try the UsrLib directory relative to the working dir. By the way, this requires modifying the working dir in docker-entrypoint.sh to support the default behavior.
[GitHub] [flink] KurtYoung commented on a change in pull request #10060: [FLINK-14546] [flink-json] Support map type in flink-json
KurtYoung commented on a change in pull request #10060: [FLINK-14546] [flink-json] Support map type in flink-json URL: https://github.com/apache/flink/pull/10060#discussion_r342405648 ## File path: flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java ## @@ -242,11 +247,25 @@ private DeserializationRuntimeConverter wrapIntoNullableConverter(Deserializatio return Optional.of(createObjectArrayConverter(((BasicArrayTypeInfo) typeInfo).getComponentInfo())); } else if (isPrimitiveByteArray(typeInfo)) { return Optional.of(createByteArrayConverter()); + } else if (typeInfo instanceof MapTypeInfo) { + MapTypeInfo mapTypeInfo = (MapTypeInfo) typeInfo; + return Optional.of(createMapConverter(mapTypeInfo.getKeyTypeInfo(), mapTypeInfo.getValueTypeInfo())); } else { return Optional.empty(); } } + private DeserializationRuntimeConverter createMapConverter(TypeInformation keyType, TypeInformation valueType) { + DeserializationRuntimeConverter valueConverter = createConverter(valueType); + DeserializationRuntimeConverter keyConverter = createConverter(keyType); + + return (mapper, jsonNode) -> StreamSupport.stream( Review comment: According to [flink code style](https://flink.apache.org/contributing/code-style-and-quality-java.html), we should avoid Java Streams in any performance critical code. Since this DeserializationRuntimeConverter will be called in per record fashion, I would suggest to write it without Java Streams. (I know the old codes are written with Java Streams, but we should follow the code style in newly added codes)
[GitHub] [flink] guoweiM commented on a change in pull request #10076: [FLINK-14465][runtime] Let `StandaloneJobClusterEntrypoint` use user code class loader
guoweiM commented on a change in pull request #10076: [FLINK-14465][runtime] Let `StandaloneJobClusterEntrypoint` use user code class loader URL: https://github.com/apache/flink/pull/10076#discussion_r342405496 ## File path: flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java ## @@ -74,10 +79,20 @@ private StandaloneJobClusterEntryPoint( } @Override - protected DispatcherResourceManagerComponentFactory createDispatcherResourceManagerComponentFactory(Configuration configuration) { + protected DispatcherResourceManagerComponentFactory createDispatcherResourceManagerComponentFactory(Configuration configuration) throws IOException { + final String flinkHomeDir = System.getenv(ENV_FLINK_HOME_DIR); Review comment: 1. I have verified it (running /test_docker_embedded_job.sh and manually packaging the per-job docker image and running it). I saw this value in `ConfigConstants`, so I thought it was a contract. 1. Since you said this is not a contract, I would like to propose not relying on `FLINK_HOME`. 1. One possible approach is: 1. The default behavior is to only look for the UsrLib directory relative to the working dir. 1. If it does not exist, we could try the ENV_FLINK_USR_LIB_DIR directory. By the way, this requires modifying the working dir in docker-entrypoint.sh to support the default behavior.
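The lookup order proposed in this comment could look roughly like the helper below. This is a hypothetical sketch, not the actual entry point code; the directory name `usrlib` and the helper's signature are assumptions for illustration:

```java
import java.io.File;
import java.util.Optional;

// Hypothetical helper sketching the proposed lookup order: prefer the usrlib
// directory under the working dir, then fall back to an env-provided path.
public class UsrLibResolver {

    public static Optional<File> resolveUsrLibDir(File workingDir, String envUsrLibDir) {
        // 1. Default behavior: look for usrlib relative to the working dir.
        File relative = new File(workingDir, "usrlib");
        if (relative.isDirectory()) {
            return Optional.of(relative);
        }
        // 2. Fall back to the directory named by the environment variable, if set.
        if (envUsrLibDir != null) {
            File fromEnv = new File(envUsrLibDir);
            if (fromEnv.isDirectory()) {
                return Optional.of(fromEnv);
            }
        }
        // Neither exists: let the caller decide how to fail.
        return Optional.empty();
    }
}
```

Returning `Optional.empty()` instead of throwing keeps the policy decision (fail, or run without user libs) with the entry point rather than the resolver.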
[GitHub] [flink] flinkbot commented on issue #10082: [FLINK-14164][runtime] Add a meter ‘numberOfRestarts’ to show number of restarts as well as its rate
flinkbot commented on issue #10082: [FLINK-14164][runtime] Add a meter ‘numberOfRestarts’ to show number of restarts as well as its rate URL: https://github.com/apache/flink/pull/10082#issuecomment-549688949 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Automated Checks Last check on commit 9ad21545cdce68c8892941e8f33a89147bddb1f6 (Tue Nov 05 06:57:08 UTC 2019) **Warnings:** * No documentation files were touched! Remember to keep the Flink docs up to date! Mention the bot in a comment to re-run the automated checks.
[jira] [Updated] (FLINK-14164) Add a metric to show failover count regarding fine grained recovery
[ https://issues.apache.org/jira/browse/FLINK-14164?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-14164: --- Labels: pull-request-available (was: ) > Add a metric to show failover count regarding fine grained recovery > --- > > Key: FLINK-14164 > URL: https://issues.apache.org/jira/browse/FLINK-14164 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination, Runtime / Metrics > Affects Versions: 1.10.0 > Reporter: Zhu Zhu > Assignee: Zhu Zhu > Priority: Major > Labels: pull-request-available > Fix For: 1.10.0 > > > Previously Flink used the restart-all strategy to recover jobs from failures, and the metric "fullRestart" showed the count of failovers. > However, with fine grained recovery introduced in 1.9.0, the "fullRestart" metric only reveals how many times the entire graph has been restarted, not including the number of fine grained failure recoveries. > As many users want to build their job alerting on failovers, I'd propose to add a new metric {{numberOfRestarts}} which also respects fine grained recoveries. The metric should be a meter (MeterView) so that users can leverage the rate to detect newly occurring failures rather than deriving it themselves. > The MeterView should be added in SchedulerBase to serve both the legacy scheduler and the NG scheduler. > The underlying counter of the MeterView is determined by the scheduler implementation: > 1. for the legacy scheduler, it's the {{ExecutionGraph#numberOfRestartsCounter}} which was added in FLINK-14206 > 2. for the NG scheduler, it's a new counter added in {{ExecutionFailureHandler}} that counts all task and global failures notified to it.
[GitHub] [flink] hequn8128 commented on a change in pull request #10017: [FLINK-14019][python] add support for managing environment and dependencies of Python UDF in Flink Python API
hequn8128 commented on a change in pull request #10017: [FLINK-14019][python] add support for managing environment and dependencies of Python UDF in Flink Python API URL: https://github.com/apache/flink/pull/10017#discussion_r341950747 ## File path: flink-python/pyflink/common/dependency_manager.py ## @@ -0,0 +1,106 @@ + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import os +import uuid + +__all__ = ['DependencyManager'] + + +class DependencyManager(object): +""" +Container class of dependency-related parameters. It collects all the dependency parameters +and transmit them to JVM before executing job. +""" + +PYTHON_FILE_PREFIX = "python_file" +PYTHON_REQUIREMENTS_FILE_PREFIX = "python_requirements_file" +PYTHON_REQUIREMENTS_CACHE_PREFIX = "python_requirements_cache" +PYTHON_ARCHIVE_PREFIX = "python_archive" + +PYTHON_FILE_MAP = "PYTHON_FILE_MAP" +PYTHON_REQUIREMENTS_FILE = "PYTHON_REQUIREMENTS_FILE" +PYTHON_REQUIREMENTS_CACHE = "PYTHON_REQUIREMENTS_CACHE" +PYTHON_ARCHIVES_MAP = "PYTHON_ARCHIVES_MAP" +PYTHON_EXEC = "PYTHON_EXEC" Review comment: As we put these names in config, how about adding prefix to all these names? For example, `PYTHON_EXEC = "python.environment.python_exec"`. 
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
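The prefixing suggested in the comment above could look like the following sketch. Only `python.environment.python_exec` comes from the review itself; every other key string here is a hypothetical placeholder, not necessarily what the PR finally used:

```python
# Sketch of the reviewer's suggestion: namespace the dependency-related
# configuration keys so they cannot collide with other config entries.
# All key strings except "python.environment.python_exec" are illustrative.

class DependencyManager(object):
    """Container class of dependency-related parameters (sketch)."""

    PYTHON_FILE_MAP = "python.environment.python_file_map"
    PYTHON_REQUIREMENTS_FILE = "python.environment.python_requirements_file"
    PYTHON_REQUIREMENTS_CACHE = "python.environment.python_requirements_cache"
    PYTHON_ARCHIVES_MAP = "python.environment.python_archives_map"
    PYTHON_EXEC = "python.environment.python_exec"  # the example from the review
```

With a shared prefix, all dependency-related entries can also be recognized and filtered out of the config in one place on the JVM side.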
[GitHub] [flink] hequn8128 commented on a change in pull request #10017: [FLINK-14019][python] add support for managing environment and dependencies of Python UDF in Flink Python API
hequn8128 commented on a change in pull request #10017: [FLINK-14019][python] add support for managing environment and dependencies of Python UDF in Flink Python API URL: https://github.com/apache/flink/pull/10017#discussion_r342395415 ## File path: flink-python/src/main/java/org/apache/flink/python/ProcessEnvironmentManager.java ## @@ -0,0 +1,317 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.python; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.python.util.UnzipUtil; +import org.apache.flink.util.FileUtils; + +import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.runners.core.construction.Environments; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.file.FileSystems; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +/** + * The ProcessEnvironmentManager used to prepare the working dir of python UDF worker + * and create ProcessEnvironment object of Beam Fn API. It will be created if the python + * function runner is configured to run python UDF in process mode. 
+ */ +@Internal +public class ProcessEnvironmentManager implements PythonEnvironmentManager { + + private static final Logger LOG = LoggerFactory.getLogger(ProcessEnvironmentManager.class); + + static final String PYTHON_REQUIREMENTS_FILE = "_PYTHON_REQUIREMENTS_FILE"; + static final String PYTHON_REQUIREMENTS_CACHE = "_PYTHON_REQUIREMENTS_CACHE"; + static final String PYTHON_REQUIREMENTS_TARGET_DIR = "_PYTHON_REQUIREMENTS_TARGET_DIR"; + static final String PYTHON_WORKING_DIR = "_PYTHON_WORKING_DIR"; + + static final String PYTHON_TMP_DIR_PREFIX = "python_dist_"; + static final String PYTHON_REQUIREMENTS_TARGET_DIR_NAME = "python_requirements_target"; + static final String PYTHON_ARCHIVES_DIR = "python_archives"; + static final String PYTHON_PATH_FILES_DIR = "python_path_files_dir"; + + private PythonDependencyManager dependencyManager; + private String pythonTmpDirectoryBase; + private String requirementsTargetDirPath; + private String pythonWorkingDirectory; + private String pythonPathFilesDirectory; + private Thread shutdownHook; + private Map systemEnv; + + static boolean testCopy = false; + + private ProcessEnvironmentManager( + PythonDependencyManager dependencyManager, + String pythonTmpDirectoryBase, + String pythonPathFilesDirectory, + String pythonWorkingDirectory, + String requirementsTargetDirPath, + Map systemEnv) { + this.dependencyManager = dependencyManager; + this.pythonTmpDirectoryBase = pythonTmpDirectoryBase; + this.pythonPathFilesDirectory = pythonPathFilesDirectory; + this.pythonWorkingDirectory = pythonWorkingDirectory; + this.requirementsTargetDirPath = requirementsTargetDirPath; + this.systemEnv = systemEnv; + } + + @Override + public void cleanup() { + if (shutdownHook != null) { + shutdownHook.run(); + Runtime.getRuntime().removeShutdownHook(shutdownHook); + shutdownHook = null; + } + } + + @Override + public RunnerApi.Environment createEnvironment() { + prepareEnvironment(); + Map generatedEnv = generateEnvironmentVariable(); + String 
flinkHomePath = systemEnv.get(ConfigConstants.ENV_FLINK_HOME_DIR); + String pythonWorkerCommand = + flinkHomePath + File.separator + "bin" + File.separator + "pyflink-udf-runner.sh"; + + return Environments.createProcessEnvironment( +
[GitHub] [flink] hequn8128 commented on a change in pull request #10017: [FLINK-14019][python] add support for managing environment and dependencies of Python UDF in Flink Python API
hequn8128 commented on a change in pull request #10017: [FLINK-14019][python] add support for managing environment and dependencies of Python UDF in Flink Python API URL: https://github.com/apache/flink/pull/10017#discussion_r342050046

## File path: flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java

@@ -227,6 +251,9 @@ public void processWatermark(Watermark mark) throws Exception {
 */
 public abstract PythonFunctionRunner createPythonFunctionRunner();
+
+	public abstract PythonEnvironmentManager createPythonEnvironmentManager(

Review comment: This method can also be removed if we only put `environmentManager` in `AbstractPythonFunctionRunner`.
[GitHub] [flink] hequn8128 commented on a change in pull request #10017: [FLINK-14019][python] add support for managing environment and dependencies of Python UDF in Flink Python API
hequn8128 commented on a change in pull request #10017: [FLINK-14019][python] add support for managing environment and dependencies of Python UDF in Flink Python API URL: https://github.com/apache/flink/pull/10017#discussion_r342403304

## File path: flink-python/src/main/java/org/apache/flink/python/ProcessEnvironmentManager.java

@@ -0,0 +1,317 @@
+@Internal
+public class ProcessEnvironmentManager implements PythonEnvironmentManager {

Review comment: public final?
[GitHub] [flink] hequn8128 commented on a change in pull request #10017: [FLINK-14019][python] add support for managing environment and dependencies of Python UDF in Flink Python API
hequn8128 commented on a change in pull request #10017: [FLINK-14019][python] add support for managing environment and dependencies of Python UDF in Flink Python API URL: https://github.com/apache/flink/pull/10017#discussion_r342010724

## File path: flink-python/pyflink/common/tests/test_dependency_manager.py

@@ -0,0 +1,137 @@
+import json
+import re
+import unittest
+
+from pyflink.common import Configuration
+from pyflink.common.dependency_manager import DependencyManager
+from pyflink.table import TableConfig
+from pyflink.testing.test_case_utils import PyFlinkTestCase
+
+
+def replace_uuid(input_obj):
+    if isinstance(input_obj, str):
+        return re.sub(r'[a-z0-9]{8}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{12}',
+                      '{uuid}', input_obj)
+    elif isinstance(input_obj, dict):
+        input_obj_copy = dict()
+        for key in input_obj:
+            input_obj_copy[replace_uuid(key)] = replace_uuid(input_obj[key])
+        return input_obj_copy
+
+
+class DependencyManagerTests(PyFlinkTestCase):
+
+    def setUp(self):
+        self.j_env = MockedJavaEnv()
+        self.config = Configuration()
+        self.dependency_manager = DependencyManager(self.config, self.j_env)
+
+    def test_add_python_file(self):
+        self.dependency_manager.add_python_file("tmp_dir/test_file1.py")

Review comment: Also test adding a dir.
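The uuid-normalization helper these tests rely on can be sketched stand-alone as follows. This version also passes non-str/non-dict values through unchanged, which the quoted draft leaves implicit (it would return `None` for them):

```python
import re

# uuid4 renders as lowercase hex groups of 8-4-4-4-12 characters.
UUID_PATTERN = re.compile(
    r'[a-z0-9]{8}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{12}')


def replace_uuid(input_obj):
    """Recursively replace uuid4 substrings with '{uuid}' so test
    expectations stay stable across runs."""
    if isinstance(input_obj, str):
        return UUID_PATTERN.sub('{uuid}', input_obj)
    elif isinstance(input_obj, dict):
        return {replace_uuid(k): replace_uuid(v) for k, v in input_obj.items()}
    else:
        # Fall through for ints, lists, None, etc.
        return input_obj
```

Because the generated file keys embed a fresh `uuid.uuid4()` per registration, normalizing them is what makes the serialized `PYTHON_FILE_MAP` comparable against a fixed expected string.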
[GitHub] [flink] hequn8128 commented on a change in pull request #10017: [FLINK-14019][python] add support for managing environment and dependencies of Python UDF in Flink Python API
hequn8128 commented on a change in pull request #10017: [FLINK-14019][python] add support for managing environment and dependencies of Python UDF in Flink Python API URL: https://github.com/apache/flink/pull/10017#discussion_r342031800

## File path: flink-python/pom.xml

@@ -80,6 +80,11 @@ under the License.
 			<version>${project.version}</version>
 			<scope>provided</scope>
 		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-shaded-jackson</artifactId>
+			<scope>provided</scope>
+		</dependency>

Review comment: I find that we can remove this dependency directly because we already have fasterxml dependencies.
[GitHub] [flink] hequn8128 commented on a change in pull request #10017: [FLINK-14019][python] add support for managing environment and dependencies of Python UDF in Flink Python API
hequn8128 commented on a change in pull request #10017: [FLINK-14019][python] add support for managing environment and dependencies of Python UDF in Flink Python API URL: https://github.com/apache/flink/pull/10017#discussion_r342372266

## File path: flink-python/src/main/java/org/apache/flink/python/ProcessEnvironmentManager.java
[GitHub] [flink] hequn8128 commented on a change in pull request #10017: [FLINK-14019][python] add support for managing environment and dependencies of Python UDF in Flink Python API
hequn8128 commented on a change in pull request #10017: [FLINK-14019][python] add support for managing environment and dependencies of Python UDF in Flink Python API URL: https://github.com/apache/flink/pull/10017#discussion_r341476050

## File path: flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java

@@ -110,6 +116,23 @@ public void open() throws Exception {
 LOG.info("The maximum bundle time is configured to {} milliseconds.", this.maxBundleTimeMills);
 }
+	String[] tmpDirectories =
+		getContainingTask().getEnvironment().getTaskManagerInfo().getTmpDirectories();
+	Random rand = new Random();
+	int tmpDirectoryIndex = rand.nextInt() % tmpDirectories.length;
+	String pythonTmpDirectoryRoot = tmpDirectories[tmpDirectoryIndex];
+	ExecutionConfig.GlobalJobParameters globalJobParameters = getExecutionConfig().getGlobalJobParameters();
+	Map parameters;
+	if (globalJobParameters != null) {

Review comment: globalJobParameters cannot be null. Simplify it as

```
PythonDependencyManager dependencyManager = PythonDependencyManager.createDependencyManager(
	getExecutionConfig().getGlobalJobParameters().toMap(),
	getRuntimeContext().getDistributedCache());
```
[GitHub] [flink] hequn8128 commented on a change in pull request #10017: [FLINK-14019][python] add support for managing environment and dependencies of Python UDF in Flink Python API
hequn8128 commented on a change in pull request #10017: [FLINK-14019][python] add support for managing environment and dependencies of Python UDF in Flink Python API URL: https://github.com/apache/flink/pull/10017#discussion_r342373918

## File path: flink-python/src/main/java/org/apache/flink/python/ProcessEnvironmentManager.java

@@ -0,0 +1,317 @@
+	@Override
+	public void cleanup() {
+		if (shutdownHook != null) {
+			shutdownHook.run();
+			Runtime.getRuntime().removeShutdownHook(shutdownHook);
+			shutdownHook = null;
+		}
+	}

Review comment: Call this cleanup in the close method of the Runner. Remove the `environmentManager` member in `AbstractPythonFunctionOperator`.
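The cleanup contract under discussion — run the shutdown hook eagerly on close, then deregister it so it cannot fire a second time at process exit — can be sketched in Python with `atexit`. This is an analogy to the Java `Runtime.addShutdownHook`/`removeShutdownHook` pattern, not PyFlink API; all names are illustrative:

```python
import atexit
import os
import shutil
import tempfile


class ProcessEnvironmentSketch(object):
    """Illustrative stand-in for ProcessEnvironmentManager's cleanup logic."""

    def __init__(self):
        # Working directory for the (hypothetical) python UDF worker.
        self._tmp_dir = tempfile.mkdtemp(prefix="python_dist_")
        # Register a hook so the directory is removed even on abnormal exit.
        self._shutdown_hook = lambda: shutil.rmtree(self._tmp_dir,
                                                    ignore_errors=True)
        atexit.register(self._shutdown_hook)

    def cleanup(self):
        # Mirror the quoted Java cleanup(): run the hook now, then
        # deregister it so close() and process shutdown cannot both
        # attempt to delete the directory.
        if self._shutdown_hook is not None:
            self._shutdown_hook()
            atexit.unregister(self._shutdown_hook)
            self._shutdown_hook = None
```

Nulling the hook field after deregistering makes `cleanup()` idempotent, which matters when a Runner's `close()` may be invoked more than once.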
[GitHub] [flink] hequn8128 commented on a change in pull request #10017: [FLINK-14019][python] add support for managing environment and dependencies of Python UDF in Flink Python API
hequn8128 commented on a change in pull request #10017: [FLINK-14019][python] add support for managing environment and dependencies of Python UDF in Flink Python API URL: https://github.com/apache/flink/pull/10017#discussion_r342009003

## File path: flink-python/pyflink/common/dependency_manager.py

@@ -0,0 +1,106 @@
+    def __init__(self, parameters, j_env):
+        self._parameters = parameters
+        self._j_env = j_env
+        self._python_file_map = dict()  # type: dict[str, str]
+        self._archives_map = dict()  # type: dict[str, str]
+        self._counter_map = dict()  # type: dict[str, int]
+
+    def _generate_file_key(self, prefix):
+        if prefix not in self._counter_map:
+            self._counter_map[prefix] = 0
+        else:
+            self._counter_map[prefix] += 1
+        return "%s_%d_%s" % (prefix, self._counter_map[prefix], uuid.uuid4())
+
+    def add_python_file(self, file_path):
+        key = self._generate_file_key(DependencyManager.PYTHON_FILE_PREFIX)
+        self._python_file_map[key] = os.path.basename(file_path)
+        self._parameters.set_string(
+            DependencyManager.PYTHON_FILE_MAP, json.dumps(self._python_file_map))
+        self.register_file(key, file_path)
+
+    def set_python_requirements(self, requirements_file_path, requirements_cached_dir=None):
+
+        if self._parameters.contains_key(DependencyManager.PYTHON_REQUIREMENTS_FILE):
+            self.remove_file(
+                self._parameters.get_string(DependencyManager.PYTHON_REQUIREMENTS_FILE, ""))
+            self._parameters.remove_config(DependencyManager.PYTHON_REQUIREMENTS_FILE)

Review comment: Add a meaningful method for this? For example, removeIfDelete(), and it can also be used to remove requirements_cached.
[GitHub] [flink] zhuzhurk opened a new pull request #10082: [FLINK-14164][runtime] Add a meter ‘numberOfRestarts’ to show number of restarts as well as its rate
zhuzhurk opened a new pull request #10082: [FLINK-14164][runtime] Add a meter ‘numberOfRestarts’ to show number of restarts as well as its rate URL: https://github.com/apache/flink/pull/10082

## What is the purpose of the change

This PR adds a meter ‘numberOfRestarts’ to show the number of restarts as well as its rate. It serves both the DefaultScheduler and the LegacyScheduler.

## Brief change log

- *Add a meter ‘numberOfRestarts’ in SchedulerBase*
- *Let DefaultScheduler update the counter on restarts*

## Verifying this change

This change added tests and can be verified as follows:

- *Refined DefaultSchedulerTest to test this change*

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (yes / **no**)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
- The serializers: (yes / **no** / don't know)
- The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know)
- The S3 file system connector: (yes / **no** / don't know)

## Documentation

- Does this pull request introduce a new feature? (yes / **no**)
- If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)
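A meter in this sense is a count plus a rate derived from it. The idea behind ‘numberOfRestarts’ can be sketched minimally as follows (modeled loosely on a counter-backed meter; this is not the PR's Java code, and the class name is illustrative):

```python
import time


class RestartMeterSketch(object):
    """Tracks the total number of restarts and their average rate per
    second over the window since the meter was created."""

    def __init__(self, clock=time.monotonic):
        self._clock = clock          # injectable for testing
        self._start = clock()
        self._count = 0

    def mark_restart(self):
        # Called by the scheduler each time the job is restarted.
        self._count += 1

    def get_count(self):
        return self._count

    def get_rate(self):
        # Guard against a zero-length window right after construction.
        elapsed = max(self._clock() - self._start, 1e-9)
        return self._count / elapsed
```

Exposing both values from one metric is what lets dashboards alert on a restart *rate* spike while the raw count remains available for totals.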
[GitHub] [flink] hequn8128 commented on a change in pull request #10017: [FLINK-14019][python] add support for managing environment and dependencies of Python UDF in Flink Python API
hequn8128 commented on a change in pull request #10017: [FLINK-14019][python] add support for managing environment and dependencies of Python UDF in Flink Python API URL: https://github.com/apache/flink/pull/10017#discussion_r342354936

## File path: flink-python/src/main/java/org/apache/flink/python/ProcessEnvironmentManager.java
[GitHub] [flink] hequn8128 commented on a change in pull request #10017: [FLINK-14019][python] add support for managing environment and dependencies of Python UDF in Flink Python API
hequn8128 commented on a change in pull request #10017: [FLINK-14019][python] add support for managing environment and dependencies of Python UDF in Flink Python API URL: https://github.com/apache/flink/pull/10017#discussion_r342049265

## File path: flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java

```java
@@ -75,6 +80,7 @@
 	 * A timer that finishes the current bundle after a fixed amount of time.
 	 */
 	private transient ScheduledFuture<?> checkFinishBundleTimer;
+	protected PythonEnvironmentManager environmentManager;
```

Review comment: I see `environmentManager` has already been declared in `AbstractPythonFunctionRunner`, so it may be unnecessary to also add it here. The related initialization could likewise be moved into `AbstractPythonFunctionRunner`. On the other hand, the environment is currently created in the runner, so keeping `environmentManager` in the runner would also be consistent with that. What do you think?

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards, Apache Git Services
[GitHub] [flink] hequn8128 commented on a change in pull request #10017: [FLINK-14019][python] add support for managing environment and dependencies of Python UDF in Flink Python API
hequn8128 commented on a change in pull request #10017: [FLINK-14019][python] add support for managing environment and dependencies of Python UDF in Flink Python API URL: https://github.com/apache/flink/pull/10017#discussion_r342365852

## File path: flink-python/src/main/java/org/apache/flink/python/ProcessEnvironmentManager.java

@@ -0,0 +1,317 @@ [quoted ProcessEnvironmentManager.java diff omitted; identical to the excerpt quoted earlier in this thread]
[GitHub] [flink] hequn8128 commented on a change in pull request #10017: [FLINK-14019][python] add support for managing environment and dependencies of Python UDF in Flink Python API
hequn8128 commented on a change in pull request #10017: [FLINK-14019][python] add support for managing environment and dependencies of Python UDF in Flink Python API URL: https://github.com/apache/flink/pull/10017#discussion_r342009550

## File path: flink-python/pyflink/common/dependency_manager.py

@@ -0,0 +1,106 @@

```python
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import os
import uuid

__all__ = ['DependencyManager']


class DependencyManager(object):
    """
    Container class of dependency-related parameters. It collects all the dependency parameters
    and transmit them to JVM before executing job.
    """

    PYTHON_FILE_PREFIX = "python_file"
    PYTHON_REQUIREMENTS_FILE_PREFIX = "python_requirements_file"
    PYTHON_REQUIREMENTS_CACHE_PREFIX = "python_requirements_cache"
    PYTHON_ARCHIVE_PREFIX = "python_archive"

    PYTHON_FILE_MAP = "PYTHON_FILE_MAP"
    PYTHON_REQUIREMENTS_FILE = "PYTHON_REQUIREMENTS_FILE"
    PYTHON_REQUIREMENTS_CACHE = "PYTHON_REQUIREMENTS_CACHE"
    PYTHON_ARCHIVES_MAP = "PYTHON_ARCHIVES_MAP"
    PYTHON_EXEC = "PYTHON_EXEC"

    def __init__(self, parameters, j_env):
        self._parameters = parameters
        self._j_env = j_env
        self._python_file_map = dict()  # type: dict[str, str]
        self._archives_map = dict()  # type: dict[str, str]
        self._counter_map = dict()  # type: dict[str, int]

    def _generate_file_key(self, prefix):
        if prefix not in self._counter_map:
            self._counter_map[prefix] = 0
        else:
            self._counter_map[prefix] += 1
        return "%s_%d_%s" % (prefix, self._counter_map[prefix], uuid.uuid4())

    def add_python_file(self, file_path):
        key = self._generate_file_key(DependencyManager.PYTHON_FILE_PREFIX)
        self._python_file_map[key] = os.path.basename(file_path)
        self._parameters.set_string(
            DependencyManager.PYTHON_FILE_MAP, json.dumps(self._python_file_map))
        self.register_file(key, file_path)

    def set_python_requirements(self, requirements_file_path, requirements_cached_dir=None):

        if self._parameters.contains_key(DependencyManager.PYTHON_REQUIREMENTS_FILE):
            self.remove_file(
                self._parameters.get_string(DependencyManager.PYTHON_REQUIREMENTS_FILE, ""))
            self._parameters.remove_config(DependencyManager.PYTHON_REQUIREMENTS_FILE)

        if self._parameters.contains_key(DependencyManager.PYTHON_REQUIREMENTS_CACHE):
            self.remove_file(
                self._parameters.get_string(DependencyManager.PYTHON_REQUIREMENTS_CACHE, ""))
            self._parameters.remove_config(DependencyManager.PYTHON_REQUIREMENTS_CACHE)

        requirements_file_key = "%s_%s" % (
            DependencyManager.PYTHON_REQUIREMENTS_FILE_PREFIX, uuid.uuid4())
        self._parameters.set_string(DependencyManager.PYTHON_REQUIREMENTS_FILE,
                                    requirements_file_key)
        self.register_file(requirements_file_key, requirements_file_path)

        if requirements_cached_dir is not None:
            requirements_cache_key = "%s_%s" % (
                DependencyManager.PYTHON_REQUIREMENTS_CACHE_PREFIX, uuid.uuid4())
            self._parameters.set_string(DependencyManager.PYTHON_REQUIREMENTS_CACHE,
                                        requirements_cache_key)
            self.register_file(requirements_cache_key, requirements_cached_dir)
```

Review comment: Extract a method to avoid code duplication? It also makes the code more readable.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards, Apache Git Services
[GitHub] [flink] hequn8128 commented on a change in pull request #10017: [FLINK-14019][python] add support for managing environment and dependencies of Python UDF in Flink Python API
hequn8128 commented on a change in pull request #10017: [FLINK-14019][python] add support for managing environment and dependencies of Python UDF in Flink Python API URL: https://github.com/apache/flink/pull/10017#discussion_r342368907

## File path: flink-python/src/main/java/org/apache/flink/python/ProcessEnvironmentManager.java

@@ -0,0 +1,317 @@ [quoted ProcessEnvironmentManager.java diff omitted; identical to the excerpt quoted earlier in this thread]
[GitHub] [flink] hequn8128 commented on a change in pull request #10017: [FLINK-14019][python] add support for managing environment and dependencies of Python UDF in Flink Python API
hequn8128 commented on a change in pull request #10017: [FLINK-14019][python] add support for managing environment and dependencies of Python UDF in Flink Python API URL: https://github.com/apache/flink/pull/10017#discussion_r341476368

## File path: flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java

```java
@@ -110,6 +116,23 @@ public void open() throws Exception {
 		LOG.info("The maximum bundle time is configured to {} milliseconds.", this.maxBundleTimeMills);
 	}
+	String[] tmpDirectories =
+		getContainingTask().getEnvironment().getTaskManagerInfo().getTmpDirectories();
+	Random rand = new Random();
+	int tmpDirectoryIndex = rand.nextInt() % tmpDirectories.length;
+	String pythonTmpDirectoryRoot = tmpDirectories[tmpDirectoryIndex];
```

Review comment: Maybe extract this logic into a small method? Putting all of the logic in `open()` makes it a mess.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards, Apache Git Services
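The suggested extraction could be sketched as follows, in Python for illustration (the PR's code is Java, and `choose_python_tmp_directory` is a hypothetical helper name). As a side note, a library "pick a random element" call such as `random.choice` always yields a valid element, whereas the quoted `rand.nextInt() % tmpDirectories.length` can produce a negative index, since Java's no-argument `nextInt()` may return a negative value.

```python
import random


def choose_python_tmp_directory(tmp_directories):
    """Pick one of the configured temp directories for the python UDF worker.

    Isolating the choice in a small named helper keeps the calling
    open() method short and readable.
    """
    if not tmp_directories:
        raise ValueError("no temp directories configured")
    return random.choice(tmp_directories)
```

The caller then reduces to a single line, e.g. `python_tmp_directory_root = choose_python_tmp_directory(tmp_directories)`.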
[GitHub] [flink] hequn8128 commented on a change in pull request #10017: [FLINK-14019][python] add support for managing environment and dependencies of Python UDF in Flink Python API
hequn8128 commented on a change in pull request #10017: [FLINK-14019][python] add support for managing environment and dependencies of Python UDF in Flink Python API URL: https://github.com/apache/flink/pull/10017#discussion_r342395626

## File path: flink-python/src/main/java/org/apache/flink/python/ProcessEnvironmentManager.java

@@ -0,0 +1,317 @@ [quoted ProcessEnvironmentManager.java diff omitted; identical to the excerpt quoted earlier in this thread]
[GitHub] [flink] hequn8128 commented on a change in pull request #10017: [FLINK-14019][python] add support for managing environment and dependencies of Python UDF in Flink Python API
hequn8128 commented on a change in pull request #10017: [FLINK-14019][python] add support for managing environment and dependencies of Python UDF in Flink Python API URL: https://github.com/apache/flink/pull/10017#discussion_r342377276

## File path: flink-python/src/main/java/org/apache/flink/python/ProcessEnvironmentManager.java

@@ -0,0 +1,317 @@ [quoted ProcessEnvironmentManager.java diff omitted; identical to the excerpt quoted earlier in this thread]
[GitHub] [flink] hequn8128 commented on a change in pull request #10017: [FLINK-14019][python] add support for managing environment and dependencies of Python UDF in Flink Python API
hequn8128 commented on a change in pull request #10017: [FLINK-14019][python] add support for managing environment and dependencies of Python UDF in Flink Python API URL: https://github.com/apache/flink/pull/10017#discussion_r342351699

## File path: flink-python/src/main/java/org/apache/flink/python/ProcessEnvironmentManager.java

@@ -0,0 +1,317 @@ [quoted ProcessEnvironmentManager.java diff omitted; identical to the excerpt quoted earlier in this thread]
[GitHub] [flink] hequn8128 commented on a change in pull request #10017: [FLINK-14019][python] add support for managing environment and dependencies of Python UDF in Flink Python API
hequn8128 commented on a change in pull request #10017: [FLINK-14019][python] add support for managing environment and dependencies of Python UDF in Flink Python API URL: https://github.com/apache/flink/pull/10017#discussion_r342006956

## File path: flink-python/pyflink/common/dependency_manager.py

@@ -0,0 +1,106 @@ [quoted dependency_manager.py diff omitted; identical to the excerpt quoted earlier in this thread, through:]

```python
    def set_python_requirements(self, requirements_file_path, requirements_cached_dir=None):
```

Review comment: Add some inline comments for this method?

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #10081: [FLINK-14605][hive] Use Hive-1.1.0 as the profile to test against 1.1.x
flinkbot edited a comment on issue #10081: [FLINK-14605][hive] Use Hive-1.1.0 as the profile to test against 1.1.x URL: https://github.com/apache/flink/pull/10081#issuecomment-549666964 ## CI report: * c8a2505e137a9bdf9eadcc3c562b851285787965 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/134971651) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] KurtYoung commented on a change in pull request #9864: [FLINK-14254][table] Introduce FileSystemOutputFormat for batch
KurtYoung commented on a change in pull request #9864: [FLINK-14254][table] Introduce FileSystemOutputFormat for batch URL: https://github.com/apache/flink/pull/9864#discussion_r342396703

## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/GroupedPartitionWriter.java

## @@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.filesystem;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.io.OutputFormat;
+
+/**
+ * {@link PartitionWriter} for grouped dynamic partition inserting. It will create a new format
+ * when partition changed.
+ *
+ * @param <T> The type of the consumed records.
+ */
+@Internal
+public class GroupedPartitionWriter<T> implements PartitionWriter<T> {
+
+	private final Context context;
+	private final PathGenerator pathGenerator;
+	private final PartitionComputer<T> computer;
+	private final PartitionPathMaker maker;
+
+	private OutputFormat<T> currentFormat;
+	private String currentPartition;
+
+	public GroupedPartitionWriter(
+			Context context,
+			PathGenerator pathGenerator,
+			PartitionComputer<T> computer,
+			PartitionPathMaker maker) {
+		this.context = context;
+		this.pathGenerator = pathGenerator;
+		this.computer = computer;
+		this.maker = maker;
+	}
+
+	@Override
+	public void write(T in) throws Exception {
+		String partition = maker.makePartitionPath(computer.makePartitionValues(in));
+		if (currentPartition == null || !partition.equals(currentPartition)) {

Review comment: partition.equals(null) will be false, no?

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] JingsongLi commented on a change in pull request #9864: [FLINK-14254][table] Introduce FileSystemOutputFormat for batch
JingsongLi commented on a change in pull request #9864: [FLINK-14254][table] Introduce FileSystemOutputFormat for batch URL: https://github.com/apache/flink/pull/9864#discussion_r342394854

## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/GroupedPartitionWriter.java

## @@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.filesystem;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.io.OutputFormat;
+
+/**
+ * {@link PartitionWriter} for grouped dynamic partition inserting. It will create a new format
+ * when partition changed.
+ *
+ * @param <T> The type of the consumed records.
+ */
+@Internal
+public class GroupedPartitionWriter<T> implements PartitionWriter<T> {
+
+	private final Context context;
+	private final PathGenerator pathGenerator;
+	private final PartitionComputer<T> computer;
+	private final PartitionPathMaker maker;
+
+	private OutputFormat<T> currentFormat;
+	private String currentPartition;
+
+	public GroupedPartitionWriter(
+			Context context,
+			PathGenerator pathGenerator,
+			PartitionComputer<T> computer,
+			PartitionPathMaker maker) {
+		this.context = context;
+		this.pathGenerator = pathGenerator;
+		this.computer = computer;
+		this.maker = maker;
+	}
+
+	@Override
+	public void write(T in) throws Exception {
+		String partition = maker.makePartitionPath(computer.makePartitionValues(in));
+		if (currentPartition == null || !partition.equals(currentPartition)) {

Review comment: For grouped input, at the first, the `currentPartition` is null. And then, we need close previous format.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
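The null check the two reviewers discuss above guards only the very first record: `currentPartition` starts as `null`, and thereafter the equality test detects partition boundaries (`partition.equals(null)` would indeed just return false, so the explicit null check is about when to skip closing a not-yet-opened format). A minimal Python sketch of the grouped-writer control flow — names and the "format as list" model are illustrative, not Flink's API:

```python
def write_grouped(records, make_partition):
    """Sketch of GroupedPartitionWriter.write: input arrives grouped by
    partition, so a new output "format" is opened whenever the computed
    partition differs from the previous one."""
    opened = []               # partitions in the order their formats were opened
    current_partition = None  # mirrors the initially-null currentPartition field
    for rec in records:
        partition = make_partition(rec)
        if current_partition is None or partition != current_partition:
            # first record, or partition boundary: close old format, open new one
            opened.append(partition)
            current_partition = partition
    return opened

parts = write_grouped([("p=1", "a"), ("p=1", "b"), ("p=2", "c")], lambda r: r[0])
print(parts)  # ['p=1', 'p=2'] -- exactly one format per partition group
```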
[GitHub] [flink] JingsongLi commented on a change in pull request #9864: [FLINK-14254][table] Introduce FileSystemOutputFormat for batch
JingsongLi commented on a change in pull request #9864: [FLINK-14254][table] Introduce FileSystemOutputFormat for batch URL: https://github.com/apache/flink/pull/9864#discussion_r342394368

## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/PartitionPathMaker.java

## @@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.filesystem;
+
+import java.io.Serializable;
+import java.util.LinkedHashMap;
+
+/**
+ * Interface to make partition path. May need handle escape chars and default partition name
+ * for null values.
+ */
+public interface PartitionPathMaker extends Serializable {

Review comment: I will delete it and use utils.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] wuchong commented on a change in pull request #10039: [FLINK-14262][table] Support referencing function with fully/partially qualified names in SQL
wuchong commented on a change in pull request #10039: [FLINK-14262][table] Support referencing function with fully/partially qualified names in SQL URL: https://github.com/apache/flink/pull/10039#discussion_r342376155

## File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala

## @@ -84,6 +94,41 @@ class CatalogTableITCase(isStreamingMode: Boolean) {
     tableEnv.execute(name)
   }

+  private def testUdf(sql: String): Unit = {
+    val sinkDDL =
+      """
+        |create table sinkT(
+        |  a bigint
+        |) with (
+        |  'connector' = 'COLLECTION'
+        |)
+      """.stripMargin
+    tableEnv.sqlUpdate(sinkDDL)
+    tableEnv.sqlUpdate(sql)
+    tableEnv.execute("")
+    assertEquals(Seq(toRow(2L)), TestCollectionTableFactory.RESULT.sorted)
+  }
+
+  @Test
+  def testUdfWithFullIdentifier(): Unit = {
+    testUdf("insert into sinkT select default_catalog.default_database.myfunc(cast(1 as bigint))")

Review comment: I think we should also test `default_database.myfunc` and `myfunc`. Could you also verify that it works for temporary (system/catalog) functions?

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] wuchong commented on a change in pull request #10039: [FLINK-14262][table] Support referencing function with fully/partially qualified names in SQL
wuchong commented on a change in pull request #10039: [FLINK-14262][table] Support referencing function with fully/partially qualified names in SQL URL: https://github.com/apache/flink/pull/10039#discussion_r342375742

## File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala

## @@ -84,6 +94,41 @@ class CatalogTableITCase(isStreamingMode: Boolean) {
     tableEnv.execute(name)
   }

+  private def testUdf(sql: String): Unit = {
+    val sinkDDL =
+      """
+        |create table sinkT(
+        |  a bigint
+        |) with (
+        |  'connector' = 'COLLECTION'
+        |)
+      """.stripMargin
+    tableEnv.sqlUpdate(sinkDDL)
+    tableEnv.sqlUpdate(sql)
+    tableEnv.execute("")
+    assertEquals(Seq(toRow(2L)), TestCollectionTableFactory.RESULT.sorted)

Review comment: The expected result is hard-coded here, which is not clean. It would be better to pass in the expected result as a parameter.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] wuchong commented on a change in pull request #10039: [FLINK-14262][table] Support referencing function with fully/partially qualified names in SQL
wuchong commented on a change in pull request #10039: [FLINK-14262][table] Support referencing function with fully/partially qualified names in SQL URL: https://github.com/apache/flink/pull/10039#discussion_r342388213

## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/ScalarSqlFunction.scala

## @@ -45,13 +44,13 @@
 import scala.collection.JavaConverters._

 * @param typeFactory type factory for converting between Flink's and Calcite's types
 */
class ScalarSqlFunction(
-    name: String,
+    name: SqlIdentifier,

Review comment: I would suggest to use `FunctionIdentifier` as the name parameter.
1) `FunctionIdentifier` is a more strict interface: either a 3-part full path (UDF) or a 1-part name (built-in).
2) `FunctionIdentifier` has a better utility to construct, rather than using `SqlParserPos.ZERO` here and there.
3) the constructor of `ScalarSqlFunction` is reached not only from the SQL path but also from the Table API path, and it is weird to construct a `SqlIdentifier` in the Table API path.
The same applies to `TableSqlFunction`, `AggregateSqlFunction`, etc... What do you think?

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] wuchong commented on a change in pull request #10039: [FLINK-14262][table] Support referencing function with fully/partially qualified names in SQL
wuchong commented on a change in pull request #10039: [FLINK-14262][table] Support referencing function with fully/partially qualified names in SQL URL: https://github.com/apache/flink/pull/10039#discussion_r342380567

## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/FunctionCatalogOperatorTable.java

## @@ -184,4 +188,21 @@ private boolean isNotUserFunction(SqlFunctionCategory category) {

 	public List<SqlOperator> getOperatorList() {
 		throw new UnsupportedOperationException("This should never be called");
 	}
+
+	public static FunctionIdentifier toFunctionIdentifier(String[] names, CatalogManager catalogManager) {
+		return names.length == 1 ?
+			FunctionIdentifier.of(names[0]) :
+			FunctionIdentifier.of(
+				catalogManager.qualifyIdentifier(UnresolvedIdentifier.of(names)));
+	}
+
+	public static SqlIdentifier createSqlIdentifier(CallExpression call, UserDefinedFunction function) {

Review comment: The `createSqlIdentifier` and `toFunctionIdentifier` utilities are weird in this class; this class is not a utility class. Maybe `UserDefinedFunctionUtils` is a better place.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] wuchong commented on a change in pull request #10039: [FLINK-14262][table] Support referencing function with fully/partially qualified names in SQL
wuchong commented on a change in pull request #10039: [FLINK-14262][table] Support referencing function with fully/partially qualified names in SQL URL: https://github.com/apache/flink/pull/10039#discussion_r342393287

## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/FunctionCatalogOperatorTable.java

## @@ -74,7 +84,7 @@ public void lookupOperatorOverloads(

 		SqlSyntax syntax,
 		List<SqlOperator> operatorList,
 		SqlNameMatcher nameMatcher) {
-	if (!opName.isSimple()) {
+	if (opName.isStar()) {

Review comment: nit: `opName.isStar() && syntax == SqlSyntax.FUNCTION`. I just added a test locally: `f0 is not null` will also go into `lookupFunction`, which should never happen. `FunctionCatalog.lookupFunction` should only handle functions, not prefix or postfix syntax.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
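The `toFunctionIdentifier` logic quoted in the review thread above qualifies partial function names against the current catalog and database: a 1-part name is left alone (candidate built-in/system function), while longer paths become a full 3-part identifier. A hedged Python sketch of that resolution rule — the defaults and the tuple representation are illustrative, not Flink's actual API:

```python
def to_function_identifier(names, current_catalog="default_catalog",
                           current_database="default_database"):
    """Sketch of qualifying a function reference:
    1 part  -> kept as-is (may be a built-in or system function),
    2 parts -> database.name, qualified with the current catalog,
    3 parts -> already a full catalog.database.name path."""
    if len(names) == 1:
        return (names[0],)
    if len(names) == 2:
        return (current_catalog, names[0], names[1])
    if len(names) == 3:
        return tuple(names)
    raise ValueError("function identifier must have 1 to 3 parts")

print(to_function_identifier(["myfunc"]))
# ('myfunc',)
print(to_function_identifier(["default_database", "myfunc"]))
# ('default_catalog', 'default_database', 'myfunc')
print(to_function_identifier(["default_catalog", "default_database", "myfunc"]))
# ('default_catalog', 'default_database', 'myfunc')
```

This mirrors why the PR tests both fully and partially qualified references: all three spellings should resolve to the same function.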
[jira] [Updated] (FLINK-14607) SharedSlot cannot fulfill pending slot requests before it's completely released
[ https://issues.apache.org/jira/browse/FLINK-14607?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhu Zhu updated FLINK-14607:
Description:
Currently a pending request can only be fulfilled when a physical slot ({{AllocatedSlot}}) becomes available in {{SlotPool}}. A shared slot, however, cannot be used to fulfill pending requests even if it becomes qualified. This may lead to resource deadlocks in certain cases.

For example, running job A(parallelism=2) --(pipelined)--> B(parallelism=2) with only 1 slot, all vertices in the same slot sharing group, here's what may happen:
1. Schedule A1 and A2. A1 acquires the only slot; A2's slot request is pending because a slot cannot host 2 instances of the same JobVertex at the same time. Shared slot status: \{A1\}
2. A1 produces data and triggers the scheduling of B1. Shared slot status: \{A1, B1\}
3. A1 finishes. Shared slot status: \{B1\}
4. B1 cannot finish since A2 has not finished, while A2 cannot get launched because no physical slot becomes available, even though the shared slot is now qualified to host it. A resource deadlock happens.

Maybe we should improve {{SlotSharingManager}}. Once a task slot is released, its root {{MultiTaskSlot}} should be used to try to fulfill pending task slots from other pending root slots ({{unresolvedRootSlots}}) in this {{SlotSharingManager}} (i.e. in the same slot sharing group). We need to be careful not to cause any failures, and not to violate colocation constraints.

cc [~trohrmann]

> SharedSlot cannot fulfill pending slot requests before it's completely
> released
> ---
>
> Key: FLINK-14607
> URL: https://issues.apache.org/jira/browse/FLINK-14607
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Coordination
> Affects Versions: 1.10.0, 1.9.1
> Reporter: Zhu Zhu
> Priority: Major

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
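The four steps in the description can be replayed as a toy state trace (pure illustration: the shared slot is modeled as a set of subtasks, and "physical slot released" means that set becoming empty):

```python
# Toy replay of the FLINK-14607 scenario: one shared slot, job A -> B,
# parallelism 2, all vertices in one slot sharing group. A slot may host at
# most one subtask per JobVertex, so A2's request stays pending.
slot = {"A1"}          # step 1: A1 takes the only slot; A2's request is pending
pending = ["A2"]
slot.add("B1")         # step 2: A1's output triggers scheduling of B1
slot.discard("A1")     # step 3: A1 finishes, leaving {B1}
# step 4: the SlotPool only re-checks pending requests when a physical slot is
# fully released, so the now-qualified shared slot never fulfills A2's request.
physical_slot_released = len(slot) == 0
print(sorted(slot))              # ['B1']
print(physical_slot_released)    # False: B1 still occupies the shared slot
print(pending)                   # ['A2'] stays pending while B1 waits on A2
```

The proposed fix is precisely to break this cycle by letting a partially released root `MultiTaskSlot` fulfill pending requests without waiting for full release.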
[jira] [Updated] (FLINK-14607) SharedSlot cannot fulfill pending slot requests before it's completely released
[ https://issues.apache.org/jira/browse/FLINK-14607?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhu Zhu updated FLINK-14607: Description: Currently a pending request can only be fulfilled when a physical slot({{AllocatedSlot}}) becomes available in {{SlotPool}}. A shared slot however, cannot be used to fulfill pending requests even if it becomes qualified. This may lead to resource deadlocks in certain cases. For example, running job A(parallelism=2) --(pipelined)--> B(parallelism=2) with 1 slot only, all vertices are in the same slot sharing group, here's what may happen: 1. Schedule A1 and A2. A1 acquires the only slot, A2's slot request is pending because a slot cannot host 2 instances of the same JobVertex at the same time. Shared slot status: \{A1\} 2. A1 produces data and triggers the scheduling of B1. Shared slot status: \{A1, B1\} 3. A1 finishes. Shared slot status: \{B1\} 4. B1 cannot finish since A2 has not finished, while A2 cannot get launched due to no physical slot becomes available, even though the shred slot is qualified to host it now. A resource deadlock happens. Maybe we should improve {{SlotSharingManager}}. One a task slot is released, its root {{MultiTaskSlot}} should be used to try fulfilling existing pending task slots from other pending root slots({{unresolvedRootSlots}}) in this {{SlotSharingManager}}(means in the same slot sharing group). We need to be careful to not cause any failures, and do not violate colocation constraints. cc [~trohrmann] was: Currently a pending request can only be fulfilled when a physical slot({{AllocatedSlot}}) becomes available in {{SlotPool}}. A shared slot however, cannot be used to fulfill pending requests even if it becomes qualified. This may lead to resource deadlocks in certain cases. For example, running job A(parallelism=2) --(pipelined)--> B(parallelism=2) with 1 slot only, all vertices are in the same slot sharing group, here's what may happen: 1. Schedule A1 and A2. 
A1 acquires the only slot, A2's slot request is pending because a slot cannot host 2 instances of the same JobVertex at the same time. Shared slot status: \{A1\} 2. A1 produces data and triggers the scheduling of B1. Shared slot status: \{A1, B1\} 3. A1 finishes. Shared slot status: \{B1\} 4. B1 cannot finish since A2 has not finished, while A2 cannot get launched due to no physical slot becomes available, even though the slot is qualified for host it now. A resource deadlock happens. Maybe we should improve {{SlotSharingManager}}. One a task slot is released, its root {{MultiTaskSlot}} should be used to try fulfilling existing pending task slots from other pending root slots({{unresolvedRootSlots}}) in this {{SlotSharingManager}}(means in the same slot sharing group). We need to be careful to not cause any failures, and do not violate colocation constraints. cc [~trohrmann] > SharedSlot cannot fulfill pending slot requests before it's completely > released > --- > > Key: FLINK-14607 > URL: https://issues.apache.org/jira/browse/FLINK-14607 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.10.0, 1.9.1 >Reporter: Zhu Zhu >Priority: Major > > Currently a pending request can only be fulfilled when a physical > slot({{AllocatedSlot}}) becomes available in {{SlotPool}}. > A shared slot however, cannot be used to fulfill pending requests even if it > becomes qualified. This may lead to resource deadlocks in certain cases. > For example, running job A(parallelism=2) --(pipelined)--> B(parallelism=2) > with 1 slot only, all vertices are in the same slot sharing group, here's > what may happen: > 1. Schedule A1 and A2. A1 acquires the only slot, A2's slot request is > pending because a slot cannot host 2 instances of the same JobVertex at the > same time. Shared slot status: \{A1\} > 2. A1 produces data and triggers the scheduling of B1. Shared slot status: > \{A1, B1\} > 3. A1 finishes. Shared slot status: \{B1\} > 4. 
B1 cannot finish since A2 has not finished, while A2 cannot get launched > due to no physical slot becomes available, even though the shred slot is > qualified to host it now. A resource deadlock happens. > Maybe we should improve {{SlotSharingManager}}. One a task slot is released, > its root {{MultiTaskSlot}} should be used to try fulfilling existing pending > task slots from other pending root slots({{unresolvedRootSlots}}) in this > {{SlotSharingManager}}(means in the same slot sharing group). > We need to be careful to not cause any failures, and do not violate > colocation constraints. > cc [~trohrmann] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-14607) SharedSlot cannot fulfill pending slot requests before it's completely released
[ https://issues.apache.org/jira/browse/FLINK-14607?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhu Zhu updated FLINK-14607:

Description:
Currently a pending request can only be fulfilled when a physical slot ({{AllocatedSlot}}) becomes available in {{SlotPool}}. A shared slot, however, cannot be used to fulfill pending requests even if it becomes qualified. This may lead to resource deadlocks in certain cases.

For example, running job A (parallelism=2) --(pipelined)--> B (parallelism=2) with only 1 slot, all vertices in the same slot sharing group, here's what may happen:
1. Schedule A1 and A2. A1 acquires the only slot; A2's slot request is pending because a slot cannot host 2 instances of the same JobVertex at the same time. Shared slot status: \{A1\}
2. A1 produces data and triggers the scheduling of B1. Shared slot status: \{A1, B1\}
3. A1 finishes. Shared slot status: \{B1\}
4. B1 cannot finish since A2 has not finished, while A2 cannot be launched because no physical slot becomes available, even though the slot is now qualified to host it. A resource deadlock happens.

Maybe we should improve {{SlotSharingManager}}: once a task slot is released, its root {{MultiTaskSlot}} should be used to try to fulfill pending task slots from other pending root slots ({{unresolvedRootSlots}}) in the same {{SlotSharingManager}} (i.e. in the same slot sharing group). We need to be careful not to cause any failures or violate colocation constraints.

cc [~trohrmann]

> SharedSlot cannot fulfill pending slot requests before it's completely released
> Key: FLINK-14607
> URL: https://issues.apache.org/jira/browse/FLINK-14607
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Coordination
> Affects Versions: 1.10.0, 1.9.1
> Reporter: Zhu Zhu
> Priority: Major

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-14607) SharedSlot cannot fulfill pending slot requests before it's completely released
[ https://issues.apache.org/jira/browse/FLINK-14607?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhu Zhu updated FLINK-14607:

Summary: SharedSlot cannot fulfill pending slot requests before it's completely released
(was: SharedSlot cannot fulfill pending slot requests before it's totally released)

> SharedSlot cannot fulfill pending slot requests before it's completely released
> Key: FLINK-14607
> URL: https://issues.apache.org/jira/browse/FLINK-14607
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Coordination
> Affects Versions: 1.10.0, 1.9.1
> Reporter: Zhu Zhu
> Priority: Major

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-14607) SharedSlot cannot fulfill pending slot requests before it's totally released
Zhu Zhu created FLINK-14607:

Summary: SharedSlot cannot fulfill pending slot requests before it's totally released
Key: FLINK-14607
URL: https://issues.apache.org/jira/browse/FLINK-14607
Project: Flink
Issue Type: Bug
Components: Runtime / Coordination
Affects Versions: 1.9.1, 1.10.0
Reporter: Zhu Zhu

Currently a pending request can only be fulfilled when a physical slot ({{AllocatedSlot}}) becomes available in {{SlotPool}}. A shared slot, however, cannot be used to fulfill pending requests even if it becomes qualified. This may lead to resource deadlocks in certain cases.

For example, running job A (parallelism=2) --(pipelined)--> B (parallelism=2) with only 1 slot, all vertices in the same slot sharing group, here's what may happen:
1. Schedule A1 and A2. A1 acquires the only slot; A2's slot request is pending because a slot cannot host 2 instances of the same JobVertex at the same time. Shared slot status: {A1}
2. A1 produces data and triggers the scheduling of B1. Shared slot status: {A1, B1}
3. A1 finishes. Shared slot status: {B1}
4. B1 cannot finish since A2 has not finished, while A2 cannot be launched because no physical slot becomes available, even though the slot is now qualified to host it. A resource deadlock happens.

Maybe we should improve {{SlotSharingManager}}: once a task slot is released, its root {{MultiTaskSlot}} should be used to try to fulfill pending task slots from other pending root slots ({{unresolvedRootSlots}}) in the same {{SlotSharingManager}} (i.e. in the same slot sharing group). We need to be careful not to cause any failures or violate colocation constraints.

cc [~trohrmann]

-- This message was sent by Atlassian Jira (v8.3.4#803005)
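The four-step scenario above can be condensed into a toy model. All class and method names below are invented for illustration; this is not Flink's actual SlotSharingManager or SlotPool API.

```java
public class SharedSlotDeadlockDemo {

    // A toy shared slot: at most one subtask per JobVertex may occupy it.
    static class ToySharedSlot {
        private final java.util.Set<String> occupyingJobVertices = new java.util.HashSet<>();

        boolean tryAssign(String jobVertex) {
            // Set.add returns false if this JobVertex already occupies the slot.
            return occupyingJobVertices.add(jobVertex);
        }

        void release(String jobVertex) {
            // FLINK-14607: after this, the slot could fulfill a pending request
            // for jobVertex, but the SlotPool is only notified once the whole
            // physical slot (all occupants) is released.
            occupyingJobVertices.remove(jobVertex);
        }

        boolean canHost(String jobVertex) {
            return !occupyingJobVertices.contains(jobVertex);
        }
    }

    public static void main(String[] args) {
        ToySharedSlot slot = new ToySharedSlot();
        boolean a1Placed = slot.tryAssign("A"); // step 1: A1 takes the slot
        boolean a2Placed = slot.tryAssign("A"); // A2 stays pending: same JobVertex
        boolean b1Placed = slot.tryAssign("B"); // step 2: B1 is co-located with A1
        slot.release("A");                      // step 3: A1 finishes
        // Step 4: the slot is now qualified to host A2, but no physical slot was
        // returned to the SlotPool, so A2's pending request is never retried.
        System.out.println(a1Placed + " " + a2Placed + " " + b1Placed
                + " qualifiedForA2=" + slot.canHost("A"));
    }
}
```

Running the demo shows the slot ending up qualified for A2 (`canHost("A")` is true) even though, in the real scheduler, nothing re-triggers the pending request until the entire physical slot is freed — which is exactly the deadlock.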
[GitHub] [flink] flinkbot edited a comment on issue #10081: [FLINK-14605][hive] Use Hive-1.1.0 as the profile to test against 1.1.x
flinkbot edited a comment on issue #10081: [FLINK-14605][hive] Use Hive-1.1.0 as the profile to test against 1.1.x URL: https://github.com/apache/flink/pull/10081#issuecomment-549666964

## CI report:
* c8a2505e137a9bdf9eadcc3c562b851285787965 : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/134971651)

Bot commands
The @flinkbot bot supports the following commands:
- `@flinkbot run travis` re-run the last Travis build

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Created] (FLINK-14606) Simplify params of Execution#processFail
Zhu Zhu created FLINK-14606:

Summary: Simplify params of Execution#processFail
Key: FLINK-14606
URL: https://issues.apache.org/jira/browse/FLINK-14606
Project: Flink
Issue Type: Sub-task
Components: Runtime / Coordination
Affects Versions: 1.10.0
Reporter: Zhu Zhu
Fix For: 1.10.0

The 3 params fromSchedulerNg/releasePartitions/isCallback of Execution#processFail are quite a mess, even though they are correlated. I'd propose to simplify the params of processFail by using a single {{isInternalFailure}} flag to replace those 3 params. {{isInternalFailure}} is true iff the failure is from the TM (strictly speaking, notified from SchedulerBase). This also hardens the handling of the case that a task is successfully deployed but the JM does not realize it (see #3 below).

Here's why these 3 params can be simplified:
1. {{fromSchedulerNg}}, true iff the failure is from the TM and isLegacyScheduling==false. It's only used like this: {{if (!fromSchedulerNg && !isLegacyScheduling())}}. So it's the same to use {{!isInternalFailure}} to replace it.
2. {{releasePartitions}}, true iff the failure is from the TM. Its value is exactly the same as {{isInternalFailure}}, so we can drop it and use {{isInternalFailure}} instead.
3. {{isCallback}}, true iff the failure is from the TM or the task is not deployed. It's only used like this: {{(!isCallback && (current == RUNNING || current == DEPLOYING))}}. So using {{!isInternalFailure}} to replace it would be enough. It is a bit different for the case that a task deployment to a task manager fails, which previously set {{isCallback}} to true. However, it would be safer to signal a cancel call, in case the deployment is actually a success but the response is lost on the network.

cc [~GJL]

-- This message was sent by Atlassian Jira (v8.3.4#803005)
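The three replacements described in points 1–3 can be sketched as pure functions. This is a hypothetical stand-in, not Flink's actual Execution class; only the flag name follows the ticket's proposal.

```java
public class ProcessFailParamsSketch {

    // Old usage 1: if (!fromSchedulerNg && !isLegacyScheduling()) { ... }
    // fromSchedulerNg can be replaced by isInternalFailure at its only read site.
    static boolean shouldNotifyScheduler(boolean isInternalFailure, boolean legacyScheduling) {
        return !isInternalFailure && !legacyScheduling;
    }

    // Old usage 2: releasePartitions was true exactly when the failure came
    // from the TM, so it collapses into isInternalFailure directly.
    static boolean shouldReleasePartitions(boolean isInternalFailure) {
        return isInternalFailure;
    }

    // Old usage 3: if (!isCallback && (current == RUNNING || current == DEPLOYING))
    // a cancel RPC is sent. Using !isInternalFailure here additionally sends the
    // cancel call when a deployment failed, guarding against a deployment that
    // actually succeeded but whose response was lost on the network.
    static boolean shouldSendCancelRpc(boolean isInternalFailure, boolean runningOrDeploying) {
        return !isInternalFailure && runningOrDeploying;
    }
}
```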
[GitHub] [flink] zhuzhurk commented on a change in pull request #10067: [FLINK-14375][runtime] Avoid to notify scheduler about fake or outdated state update
zhuzhurk commented on a change in pull request #10067: [FLINK-14375][runtime] Avoid to notify scheduler about fake or outdated state update URL: https://github.com/apache/flink/pull/10067#discussion_r342386939 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ## @@ -1248,21 +1251,9 @@ private void processFail(Throwable t, boolean isCallback, Map
[GitHub] [flink] flinkbot edited a comment on issue #9885: [FLINK-14344][checkpointing] Snapshots master hook state asynchronously
flinkbot edited a comment on issue #9885: [FLINK-14344][checkpointing] Snapshots master hook state asynchronously URL: https://github.com/apache/flink/pull/9885#issuecomment-541299624

## CI report:
* d0c426e6ff61dcad1de4c7d3e9a2a16416e77e54 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/131633477)
* 22434aac9a08ebb0ea1a61eba28ec12a55149d10 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/133945741)
* cdff164bf39e74bdfd81332d84f12193a09997e5 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/134839623)
* 30f912924c8497d9c55aa62a4987d94e31d409b5 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/134843066)
* 5012d25a071772b683abddd7ef4c8d660e4c164d : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/134966688)
[GitHub] [flink] flinkbot commented on issue #10081: [FLINK-14605][hive] Use Hive-1.1.0 as the profile to test against 1.1.x
flinkbot commented on issue #10081: [FLINK-14605][hive] Use Hive-1.1.0 as the profile to test against 1.1.x URL: https://github.com/apache/flink/pull/10081#issuecomment-549666964

## CI report:
* c8a2505e137a9bdf9eadcc3c562b851285787965 : UNKNOWN
[GitHub] [flink] flinkbot edited a comment on issue #10060: [FLINK-14546] [flink-json] Support map type in flink-json
flinkbot edited a comment on issue #10060: [FLINK-14546] [flink-json] Support map type in flink-json URL: https://github.com/apache/flink/pull/10060#issuecomment-548313242

## CI report:
* c68dc12d24be4ef19efc1ef42246e6018c4f6e8d : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/134363638)
* f65310be9fb96f2446398a718dabeb4bd5a7a874 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/134963469)
* 8dc48c4a16d1f4f82c78b14b18c0832fbb235d3d : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/134970076)
[jira] [Updated] (FLINK-14604) Bump commons-cli to 1.4
[ https://issues.apache.org/jira/browse/FLINK-14604?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wei Zhong updated FLINK-14604:

Issue Type: Task (was: Improvement)

> Bump commons-cli to 1.4
> Key: FLINK-14604
> URL: https://issues.apache.org/jira/browse/FLINK-14604
> Project: Flink
> Issue Type: Task
> Components: Command Line Client
> Reporter: Wei Zhong
> Priority: Major
>
> Currently Flink is using commons-cli 1.3.1. There is a [bug|https://issues.apache.org/jira/projects/CLI/issues/CLI-265] in it which prevents us from using options that accept variable arguments on the command line. To be precise, it prevents us from accepting a short-name option after a varargs option, because there is a problem in the implementation of the DefaultParser#isShortOption() method in commons-cli 1.3.1:
> {code:java}
> /**
>  * Tells if the token looks like a short option.
>  *
>  * @param token
>  */
> private boolean isShortOption(String token)
> {
>     // short options (-S, -SV, -S=V, -SV1=V2, -S1S2)
>     // PROBLEM: It assumes that a short option only has a single character,
>     // but in fact we have many multi-character short options.
>     return token.startsWith("-") && token.length() >= 2 &&
>         options.hasShortOption(token.substring(1, 2));
> }
> {code}
> If we bump the version to 1.4, we can solve this problem.
> I request this change because there are 2 varargs options which hit this bug in the design of the command line options of [Python UDF Dependency Management|https://cwiki.apache.org/confluence/display/FLINK/FLIP-78%3A+Flink+Python+UDF+Environment+and+Dependency+Management]. It would be greatly helpful if we could bump the commons-cli version to 1.4 :).
> commons-cli 1.4 is also a stable version, released in March 2017, and today its usage is greater than 1.3.1's on the [maven central repository|https://mvnrepository.com/artifact/commons-cli/commons-cli].
> I have pushed the change to my own Travis to check if it breaks something. This is the [link|https://travis-ci.org/WeiZhong94/flink/builds/607438208] and it seems that everything is fine.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
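A minimal sketch of the behavior difference follows. The first check mirrors the 1.3.1 snippet quoted above; the "fixed" variant is a simplified stand-in for what commons-cli 1.4 does (matching the full option name), not the library's actual source.

```java
import java.util.Set;

public class ShortOptionCheckSketch {

    // 1.3.1-style check: only looks at the first character after '-', so
    // multi-character short options such as -pyfs are not recognized (CLI-265).
    static boolean isShortOption131(Set<String> shortOptions, String token) {
        return token.startsWith("-") && token.length() >= 2
                && shortOptions.contains(token.substring(1, 2));
    }

    // Simplified 1.4-style check: strip a possible "=value" suffix, then
    // match the whole option name against the registered short options.
    static boolean isShortOptionFixed(Set<String> shortOptions, String token) {
        if (!token.startsWith("-") || token.length() < 2) {
            return false;
        }
        int eq = token.indexOf('=');
        String name = token.substring(1, eq < 0 ? token.length() : eq);
        return shortOptions.contains(name);
    }
}
```

With a multi-character short option such as `-pyfs` registered, the 1.3.1-style check rejects the token (it only tests for an option named "p"), while the fixed check accepts it; single-character options like `-S` pass both.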
[jira] [Commented] (FLINK-12675) Event time synchronization in Kafka consumer
[ https://issues.apache.org/jira/browse/FLINK-12675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16967243#comment-16967243 ] Jiayi Liao commented on FLINK-12675:

[~thw] Since the FLIP-27 design will be updated soon according to the mail thread, I'm going to help review the updated design and continue the work after that.

> Event time synchronization in Kafka consumer
> Key: FLINK-12675
> URL: https://issues.apache.org/jira/browse/FLINK-12675
> Project: Flink
> Issue Type: Sub-task
> Components: Connectors / Kafka
> Reporter: Thomas Weise
> Assignee: Thomas Weise
> Priority: Major
>
> Integrate the source watermark tracking into the Kafka consumer and implement the sync mechanism (different consumer model, compared to Kinesis).

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] flinkbot edited a comment on issue #10067: [FLINK-14375][runtime] Avoid to notify scheduler about fake or outdated state update
flinkbot edited a comment on issue #10067: [FLINK-14375][runtime] Avoid to notify scheduler about fake or outdated state update URL: https://github.com/apache/flink/pull/10067#issuecomment-548812084

## CI report:
* d3341d9f0bff2cb97bd0b5bf507725ae6d670b88 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/134571714)
* 37c17d2be1cdbd72cc8a3554f7c8cf09f2c8a400 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/134655935)
* c290a32e36d7706fd5dd426a3ad85b65d113a6fc : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/134890002)
* 4908a3ce961c7518b10e43831fa5aca2764cbf1a : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/134963479)
[GitHub] [flink] flinkbot edited a comment on issue #10071: [FLINK-14371][runtime, tests] Enable ClassLoaderITCase/WindowCheckpointingITCase/EventTimeWindowCheckpointingITCase for NG scheduling
flinkbot edited a comment on issue #10071: [FLINK-14371][runtime, tests] Enable ClassLoaderITCase/WindowCheckpointingITCase/EventTimeWindowCheckpointingITCase for NG scheduling URL: https://github.com/apache/flink/pull/10071#issuecomment-549017258

## CI report:
* 80b38a072f3c5a415bf3c95e883c358a1648cc37 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/134655999)
* 7e1c5a50ab017261aadda70e97852c298c05633b : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/134963492)
[GitHub] [flink] flinkbot edited a comment on issue #10060: [FLINK-14546] [flink-json] Support map type in flink-json
flinkbot edited a comment on issue #10060: [FLINK-14546] [flink-json] Support map type in flink-json URL: https://github.com/apache/flink/pull/10060#issuecomment-548313242

## CI report:
* c68dc12d24be4ef19efc1ef42246e6018c4f6e8d : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/134363638)
* f65310be9fb96f2446398a718dabeb4bd5a7a874 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/134963469)
* 8dc48c4a16d1f4f82c78b14b18c0832fbb235d3d : UNKNOWN
[GitHub] [flink] flinkbot edited a comment on issue #10042: [FLINK-11466][e2e] Design the e2e java framework so that the Kafka streaming tests and Kafka SQL tests can running on it
flinkbot edited a comment on issue #10042: [FLINK-11466][e2e] Design the e2e java framework so that the Kafka streaming tests and Kafka SQL tests can running on it URL: https://github.com/apache/flink/pull/10042#issuecomment-547792578

## CI report:
* b523eace793d9168a2816e50891d6227183a7175 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/134149629)
* 876f0f86b208f6fce4d74488081791dcc7b89cf8 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/134326879)
* 027d5fe9ca3832c9384a715f3f7e65df3fd64dcf : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/134395038)
* 4108e40950bfcf9f26e8b62603a6c35402252966 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/134965103)
[GitHub] [flink] flinkbot commented on issue #10081: [FLINK-14605][hive] Use Hive-1.1.0 as the profile to test against 1.1.x
flinkbot commented on issue #10081: [FLINK-14605][hive] Use Hive-1.1.0 as the profile to test against 1.1.x URL: https://github.com/apache/flink/pull/10081#issuecomment-549661353 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review.

## Automated Checks
Last check on commit c8a2505e137a9bdf9eadcc3c562b851285787965 (Tue Nov 05 04:48:22 UTC 2019)
**Warnings:**
* **1 pom.xml file was touched**: Check for build and licensing issues.
* No documentation files were touched! Remember to keep the Flink docs up to date!
* **This pull request references an unassigned [Jira ticket](https://issues.apache.org/jira/browse/FLINK-14605).** According to the [code contribution guide](https://flink.apache.org/contributing/contribute-code.html), tickets need to be assigned before starting with the implementation work.

Mention the bot in a comment to re-run the automated checks.

## Review Progress
* ❓ 1. The [description] looks good.
* ❓ 2. There is [consensus] that the contribution should go into Flink.
* ❓ 3. Needs [attention] from.
* ❓ 4. The change fits into the overall [architecture].
* ❓ 5. Overall code [quality] is good.

Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands
The @flinkbot bot supports the following commands:
- `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until `architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
[GitHub] [flink] walterddr commented on a change in pull request #9355: [FLINK-13577][ml] Add an util class to build result row and generate …
walterddr commented on a change in pull request #9355: [FLINK-13577][ml] Add an util class to build result row and generate … URL: https://github.com/apache/flink/pull/9355#discussion_r342379204 ## File path: flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/utils/OutputColsHelper.java ## @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.ml.common.utils; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.types.Row; + +import org.apache.commons.lang3.ArrayUtils; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; + +/** + * Util for generating output schema when doing prediction or transformation. + * + * Input: + * 1) Schema of input data being predicted or transformed. + * 2) Output column names of the prediction/transformation operator. + * 3) Output column types of the prediction/transformation operator. + * 4) Reserved column names, which is a subset of input data's column names that we want to preserve. + * + * Output: + * 1)The result data schema. 
The result data is a combination of the preserved columns and the operator's + * output columns. + * + * Several rules are followed: + * 1) If reserved columns are not given, then all columns of input data is reserved. Review comment: I think this is actually the one default that confuses me: why if reserved columns are not given then preserves all input data? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] walterddr commented on a change in pull request #9355: [FLINK-13577][ml] Add an util class to build result row and generate …
walterddr commented on a change in pull request #9355: [FLINK-13577][ml] Add an util class to build result row and generate … URL: https://github.com/apache/flink/pull/9355#discussion_r342379344 ## File path: flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/utils/OutputColsHelper.java ## @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.ml.common.utils; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.types.Row; + +import org.apache.commons.lang3.ArrayUtils; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; + +/** + * Util for generating output schema when doing prediction or transformation. + * + * Input: + * 1) Schema of input data being predicted or transformed. + * 2) Output column names of the prediction/transformation operator. + * 3) Output column types of the prediction/transformation operator. + * 4) Reserved column names, which is a subset of input data's column names that we want to preserve. + * + * Output: + * 1)The result data schema. 
The result data is a combination of the preserved columns and the operator's + * output columns. + * + * Several rules are followed: + * 1) If reserved columns are not given, then all columns of input data is reserved. + * 2)The reserved columns are arranged ahead of the operator's output columns in the final output. + * 3) If some of the reserved column names overlap with those of operator's output columns, then the operator's Review comment: I think we should throw an exception when output column names and the user-defined reserved column names have conflict (ultimately both are defined by user, this shouldn't occur on the users' perspective) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
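Under the three rules quoted from the javadoc, the schema merge can be sketched as follows. Method and parameter names here are assumptions for illustration, not OutputColsHelper's real API, and the overlap handling implements rule 3 as written (the reviewer argues a conflict should instead throw an exception).

```java
import java.util.ArrayList;
import java.util.List;

public class OutputColsSketch {

    static List<String> mergeOutputCols(List<String> inputCols,
                                        List<String> reservedCols,
                                        List<String> outputCols) {
        // Rule 1: no reserved columns given means "reserve all input columns".
        List<String> reserved = (reservedCols == null) ? inputCols : reservedCols;
        List<String> result = new ArrayList<>();
        for (String col : reserved) {
            // Rule 3: on a name clash, the operator's output column wins,
            // so the reserved copy is dropped here.
            if (!outputCols.contains(col)) {
                result.add(col);
            }
        }
        // Rule 2: reserved columns precede the operator's output columns.
        result.addAll(outputCols);
        return result;
    }
}
```

For example, with input columns `[f0, f1, pred]`, no reserved columns given, and operator output `[pred, details]`, the merged schema is `[f0, f1, pred, details]`, with `pred` taken from the operator's output.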
[jira] [Updated] (FLINK-14605) Use Hive-1.1.0 as the profile to test against 1.1.x
[ https://issues.apache.org/jira/browse/FLINK-14605?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-14605:

Labels: pull-request-available (was: )

> Use Hive-1.1.0 as the profile to test against 1.1.x
> Key: FLINK-14605
> URL: https://issues.apache.org/jira/browse/FLINK-14605
> Project: Flink
> Issue Type: Test
> Components: Connectors / Hive
> Reporter: Rui Li
> Priority: Major
> Labels: pull-request-available
>
> Hive-1.1.1 has the issue that it can't properly handle {{stored as file_format}} syntax. So let's test against 1.1.0.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] walterddr commented on a change in pull request #9355: [FLINK-13577][ml] Add an util class to build result row and generate …
walterddr commented on a change in pull request #9355: [FLINK-13577][ml] Add an util class to build result row and generate … URL: https://github.com/apache/flink/pull/9355#discussion_r342377733 ## File path: flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/utils/OutputColsHelper.java ## @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.ml.common.utils; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.types.Row; + +import org.apache.commons.lang3.ArrayUtils; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; + +/** + * Util for generating output schema when doing prediction or transformation. + * + * Input: + * 1) Schema of input data being predicted or transformed. + * 2) Output column names of the prediction/transformation operator. + * 3) Output column types of the prediction/transformation operator. + * 4) Reserved column names, which is a subset of input data's column names that we want to preserve. + * + * Output: + * 1)The result data schema. 
The result data is a combination of the preserved columns and the operator's + * output columns. + * + * Several rules are followed: + * 1) If reserved columns are not given, then all columns of input data is reserved. + * 2)The reserved columns are arranged ahead of the operator's output columns in the final output. + * 3) If some of the reserved column names overlap with those of operator's output columns, then the operator's + * output columns override the conflicting reserved columns. + * + * For example, if we have input data schema of ["id":INT, "f1":FLOAT, "f2":DOUBLE], and the operator outputs + * a column "label" with type STRING, and we want to preserve the column "id", then we get the output + * schema of ["id":INT, "label":STRING]. + */ +public class OutputColsHelper implements Serializable { Review comment: Based on the usage in #9413 and #9523, this can be private to Flink, yes? (I do not foresee users directly interacting with this helper class.) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
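The schema-merge rules quoted in the review thread above can be sketched as a small, self-contained helper. This is an illustrative sketch only (the class and method names are hypothetical, not the actual OutputColsHelper implementation): reserved columns come first, and the operator's output columns override any conflicting reserved names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class OutputSchemaSketch {

    /**
     * Sketch of the merge rules: reserved columns are arranged ahead of the
     * operator's output columns, and an output column overrides a reserved
     * column with the same name.
     */
    static List<String> mergeColumns(String[] reservedCols, String[] outputCols) {
        Set<String> outputSet = new HashSet<>(Arrays.asList(outputCols));
        List<String> result = new ArrayList<>();
        for (String col : reservedCols) {
            // Rule 3: a conflicting reserved column is dropped here; the
            // operator's column of the same name is appended below instead.
            if (!outputSet.contains(col)) {
                result.add(col);
            }
        }
        result.addAll(Arrays.asList(outputCols));
        return result;
    }

    public static void main(String[] args) {
        // The example from the Javadoc: reserved ["id"], operator output
        // ["label"] -> result columns ["id", "label"].
        System.out.println(mergeColumns(new String[]{"id"}, new String[]{"label"}));
    }
}
```

Where the real helper places an overridden column in the final schema is not spelled out in the quoted Javadoc; this sketch keeps it at the output columns' position.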
[GitHub] [flink] lirui-apache commented on issue #10081: [FLINK-14605][hive] Use Hive-1.1.0 as the profile to test against 1.1.x
lirui-apache commented on issue #10081: [FLINK-14605][hive] Use Hive-1.1.0 as the profile to test against 1.1.x URL: https://github.com/apache/flink/pull/10081#issuecomment-549660903 cc @bowenli86 @xuefuz @zjuwangg This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] lirui-apache opened a new pull request #10081: [FLINK-14605][hive] Use Hive-1.1.0 as the profile to test against 1.1.x
lirui-apache opened a new pull request #10081: [FLINK-14605][hive] Use Hive-1.1.0 as the profile to test against 1.1.x URL: https://github.com/apache/flink/pull/10081 ## What is the purpose of the change Test against Hive-1.1.0. ## Brief change log - Changed the 1.1.x profile to use Hive-1.1.0. ## Verifying this change Manually verified tests pass with 1.1.0. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): yes - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? NA This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] libenchao commented on a change in pull request #10060: [FLINK-14546] [flink-json] Support map type in flink-json
libenchao commented on a change in pull request #10060: [FLINK-14546] [flink-json] Support map type in flink-json URL: https://github.com/apache/flink/pull/10060#discussion_r342379414 ## File path: flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDeserializationSchemaTest.java ## @@ -61,19 +65,21 @@ public void testTypeInfoDeserialization() throws Exception { root.put("id", id); root.put("name", name); root.put("bytes", bytes); + root.putObject("map").put("flink", 123); byte[] serializedJson = objectMapper.writeValueAsBytes(root); JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema.Builder( Types.ROW_NAMED( - new String[]{"id", "name", "bytes"}, - Types.LONG, Types.STRING, Types.PRIMITIVE_ARRAY(Types.BYTE)) + new String[]{"id", "name", "bytes", "map"}, + Types.LONG, Types.STRING, Types.PRIMITIVE_ARRAY(Types.BYTE), Types.MAP(Types.STRING, Types.LONG)) Review comment: done. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Created] (FLINK-14605) Use Hive-1.1.0 as the profile to test against 1.1.x
Rui Li created FLINK-14605: -- Summary: Use Hive-1.1.0 as the profile to test against 1.1.x Key: FLINK-14605 URL: https://issues.apache.org/jira/browse/FLINK-14605 Project: Flink Issue Type: Test Components: Connectors / Hive Reporter: Rui Li Hive-1.1.1 has the issue that it can't properly handle {{stored as file_format}} syntax. So let's test against 1.1.0. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] flinkbot edited a comment on issue #9885: [FLINK-14344][checkpointing] Snapshots master hook state asynchronously
flinkbot edited a comment on issue #9885: [FLINK-14344][checkpointing] Snapshots master hook state asynchronously URL: https://github.com/apache/flink/pull/9885#issuecomment-541299624 ## CI report: * d0c426e6ff61dcad1de4c7d3e9a2a16416e77e54 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/131633477) * 22434aac9a08ebb0ea1a61eba28ec12a55149d10 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/133945741) * cdff164bf39e74bdfd81332d84f12193a09997e5 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/134839623) * 30f912924c8497d9c55aa62a4987d94e31d409b5 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/134843066) * 5012d25a071772b683abddd7ef4c8d660e4c164d : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/134966688) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Created] (FLINK-14604) Bump commons-cli to 1.4
Wei Zhong created FLINK-14604: - Summary: Bump commons-cli to 1.4 Key: FLINK-14604 URL: https://issues.apache.org/jira/browse/FLINK-14604 Project: Flink Issue Type: Improvement Components: Command Line Client Reporter: Wei Zhong Currently Flink is using commons-cli 1.3.1. There is a [bug|https://issues.apache.org/jira/projects/CLI/issues/CLI-265] in it which prevents us from using options that accept variable arguments on the command line. To be precise, it prevents us from accepting a short-name option after a varargs option, because there is a problem in the implementation of the DefaultParser#isShortOption() method in commons-cli 1.3.1:

{code:java}
/**
 * Tells if the token looks like a short option.
 *
 * @param token
 */
private boolean isShortOption(String token)
{
    // short options (-S, -SV, -S=V, -SV1=V2, -S1S2)
    // PROBLEM: It assumes that a short option has only a single character,
    // but in fact we have many multi-character short options.
    return token.startsWith("-") && token.length() >= 2 && options.hasShortOption(token.substring(1, 2));
}
{code}

If we bump the version to 1.4, this problem is solved. I request this change because there are two varargs options which hit this bug in the design of the command line options for [Python UDF Dependency Management|https://cwiki.apache.org/confluence/display/FLINK/FLIP-78%3A+Flink+Python+UDF+Environment+and+Dependency+Management]. It would be greatly helpful if we could bump the commons-cli version to 1.4 :). commons-cli 1.4 is also a stable version, released in March 2017, and today its usage statistics on the [Maven central repository|https://mvnrepository.com/artifact/commons-cli/commons-cli] are greater than those of 1.3.1. I have pushed the change to my own Travis to check whether it breaks anything. This is the [link|https://travis-ci.org/WeiZhong94/flink/builds/607438208] and it seems that everything is fine. -- This message was sent by Atlassian Jira (v8.3.4#803005)
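A minimal, dependency-free sketch of why the flawed check quoted in the issue above breaks multi-character short options. Everything here is illustrative, not commons-cli code: the SHORT_OPTS set, the option name "-pyfs", and the simplified "fixed" check are assumptions standing in for the real Options registry and 1.4 parser.

```java
import java.util.Set;

public class ShortOptionBugDemo {

    // Hypothetical registered short options; "pyfs" stands in for the
    // multi-character short options mentioned in the report.
    static final Set<String> SHORT_OPTS = Set.of("pyfs");

    // Replica of the flawed commons-cli 1.3.1 check quoted above: it only
    // ever inspects the first character after the leading '-'.
    static boolean isShortOption131(String token) {
        return token.startsWith("-") && token.length() >= 2
                && SHORT_OPTS.contains(token.substring(1, 2));
    }

    // Simplified sketch of the corrected behaviour: match against the whole
    // option name (up to an optional '='), not just its first character.
    static boolean isShortOption14(String token) {
        if (!token.startsWith("-") || token.length() < 2) {
            return false;
        }
        int eq = token.indexOf('=');
        String name = (eq == -1) ? token.substring(1) : token.substring(1, eq);
        return SHORT_OPTS.contains(name);
    }

    public static void main(String[] args) {
        // 1.3.1 fails to recognize "-pyfs" as an option (it only checks "p"),
        // so a preceding varargs option would swallow it as just another
        // argument; the whole-name check recognizes it.
        System.out.println(isShortOption131("-pyfs")); // false: not detected
        System.out.println(isShortOption14("-pyfs"));  // true: detected
    }
}
```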
[GitHub] [flink] flinkbot edited a comment on issue #10060: [FLINK-14546] [flink-json] Support map type in flink-json
flinkbot edited a comment on issue #10060: [FLINK-14546] [flink-json] Support map type in flink-json URL: https://github.com/apache/flink/pull/10060#issuecomment-548313242 ## CI report: * c68dc12d24be4ef19efc1ef42246e6018c4f6e8d : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/134363638) * f65310be9fb96f2446398a718dabeb4bd5a7a874 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/134963469) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #10059: [FLINK-14543][table] Support partition for temporary table
flinkbot edited a comment on issue #10059: [FLINK-14543][table] Support partition for temporary table URL: https://github.com/apache/flink/pull/10059#issuecomment-548289939 ## CI report: * 4937b8b139bab4957799947b841fb1ae3758ed48 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/134352112) * 6de7bf454ccb8a05ec596954b7551ea9eddac297 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/134385271) * 177c6c50b0636e6e14bbb6c511c88cff05905318 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/134792783) * ab296376ac94d06f5a94b93dafa9c9aab2e0eab7 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/134963458) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] walterddr commented on a change in pull request #9373: [FLINK-13596][ml] Add two utils for Table transformations
walterddr commented on a change in pull request #9373: [FLINK-13596][ml] Add two utils for Table transformations URL: https://github.com/apache/flink/pull/9373#discussion_r342376557 ## File path: flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/utils/DataSetConversionUtil.java ## @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.ml.common.utils; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.operators.SingleInputUdfOperator; +import org.apache.flink.api.java.operators.TwoInputUdfOperator; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.ml.common.MLEnvironment; +import org.apache.flink.ml.common.MLEnvironmentFactory; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.types.Row; + +/** + * Provide functions of conversions between DataSet and Table. + */ +public class DataSetConversionUtil { + /** +* Convert the given Table to {@link DataSet}<{@link Row}>. 
+* +* @param sessionId the sessionId of {@link MLEnvironmentFactory} +* @param table the Table to convert. +* @return the converted DataSet. +*/ + public static DataSet fromTable(Long sessionId, Table table) { + return MLEnvironmentFactory + .get(sessionId) + .getBatchTableEnvironment() + .toDataSet(table, Row.class); + } + + /** +* Convert the given DataSet into a Table with specified TableSchema. +* +* @param sessionId the sessionId of {@link MLEnvironmentFactory} +* @param data the DataSet to convert. +* @param schema the specified TableSchema. +* @return the converted Table. +*/ + public static Table toTable(Long sessionId, DataSet data, TableSchema schema) { Review comment: Also, if `schema.getFieldTypes()` is deprecated, I would rather we remove this function and just let the user handle the deprecation in user code (this would be much cleaner for future maintenance). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] walterddr commented on a change in pull request #9373: [FLINK-13596][ml] Add two utils for Table transformations
walterddr commented on a change in pull request #9373: [FLINK-13596][ml] Add two utils for Table transformations URL: https://github.com/apache/flink/pull/9373#discussion_r342374624 ## File path: flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/utils/DataSetConversionUtil.java ## @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.ml.common.utils; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.operators.SingleInputUdfOperator; +import org.apache.flink.api.java.operators.TwoInputUdfOperator; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.ml.common.MLEnvironment; +import org.apache.flink.ml.common.MLEnvironmentFactory; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.types.Row; + +/** + * Provide functions of conversions between DataSet and Table. + */ +public class DataSetConversionUtil { + /** +* Convert the given Table to {@link DataSet}<{@link Row}>. 
+* +* @param sessionId the sessionId of {@link MLEnvironmentFactory} +* @param table the Table to convert. +* @return the converted DataSet. +*/ + public static DataSet fromTable(Long sessionId, Table table) { + return MLEnvironmentFactory + .get(sessionId) + .getBatchTableEnvironment() + .toDataSet(table, Row.class); + } + + /** +* Convert the given DataSet into a Table with specified TableSchema. +* +* @param sessionId the sessionId of {@link MLEnvironmentFactory} +* @param data the DataSet to convert. +* @param schema the specified TableSchema. +* @return the converted Table. +*/ + public static Table toTable(Long sessionId, DataSet data, TableSchema schema) { Review comment: This API does not seem to be tested. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] walterddr commented on a change in pull request #9373: [FLINK-13596][ml] Add two utils for Table transformations
walterddr commented on a change in pull request #9373: [FLINK-13596][ml] Add two utils for Table transformations URL: https://github.com/apache/flink/pull/9373#discussion_r342375237 ## File path: flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/utils/DataSetConversionUtil.java ## @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.ml.common.utils; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.operators.SingleInputUdfOperator; +import org.apache.flink.api.java.operators.TwoInputUdfOperator; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.ml.common.MLEnvironment; +import org.apache.flink.ml.common.MLEnvironmentFactory; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.types.Row; + +/** + * Provide functions of conversions between DataSet and Table. + */ +public class DataSetConversionUtil { + /** +* Convert the given Table to {@link DataSet}<{@link Row}>. 
+* +* @param sessionId the sessionId of {@link MLEnvironmentFactory} +* @param table the Table to convert. +* @return the converted DataSet. +*/ + public static DataSet fromTable(Long sessionId, Table table) { + return MLEnvironmentFactory + .get(sessionId) + .getBatchTableEnvironment() + .toDataSet(table, Row.class); + } + + /** +* Convert the given DataSet into a Table with specified TableSchema. +* +* @param sessionId the sessionId of {@link MLEnvironmentFactory} +* @param data the DataSet to convert. +* @param schema the specified TableSchema. +* @return the converted Table. +*/ + public static Table toTable(Long sessionId, DataSet data, TableSchema schema) { + return toTable(sessionId, data, schema.getFieldNames(), schema.getFieldTypes()); + } + + /** +* Convert the given DataSet into a Table with specified colNames and colTypes. +* +* @param sessionId sessionId the sessionId of {@link MLEnvironmentFactory}. +* @param data the DataSet to convert. +* @param colNames the specified colNames. +* @param colTypes the specified colTypes. This variable is used only when the +* DataSet is produced by a function and Flink cannot determine +* automatically what the produced type is. +* @return the converted Table. +*/ + public static Table toTable(Long sessionId, DataSet data, String[] colNames, TypeInformation [] colTypes) { + return toTable(MLEnvironmentFactory.get(sessionId), data, colNames, colTypes); + } + + /** +* Convert the given DataSet into a Table with specified colNames. +* +* @param sessionId sessionId the sessionId of {@link MLEnvironmentFactory}. +* @param data the DataSet to convert. +* @param colNames the specified colNames. +* @return the converted Table. +*/ + public static Table toTable(Long sessionId, DataSet data, String[] colNames) { + return toTable(MLEnvironmentFactory.get(sessionId), data, colNames); + } + + /** +* Convert the given DataSet into a Table with specified colNames and colTypes. 
+* +* @param session the MLEnvironment using to convert DataSet to Table. +* @param data the DataSet to convert. +* @param colNames the specified colNames. +* @param colTypes the specified colTypes. This variable is used only when the +* DataSet is produced by a function and Flink cannot determine +* automatically what the produced type is. +* @return the converted Table. +*/ + public static Table toTable(MLEnvironment session, DataSet
[GitHub] [flink] walterddr commented on a change in pull request #9373: [FLINK-13596][ml] Add two utils for Table transformations
walterddr commented on a change in pull request #9373: [FLINK-13596][ml] Add two utils for Table transformations URL: https://github.com/apache/flink/pull/9373#discussion_r342376183 ## File path: flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/utils/DataSetConversionUtil.java ## @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.ml.common.utils; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.operators.SingleInputUdfOperator; +import org.apache.flink.api.java.operators.TwoInputUdfOperator; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.ml.common.MLEnvironment; +import org.apache.flink.ml.common.MLEnvironmentFactory; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.types.Row; + +/** + * Provide functions of conversions between DataSet and Table. + */ +public class DataSetConversionUtil { + /** +* Convert the given Table to {@link DataSet}<{@link Row}>. 
+* +* @param sessionId the sessionId of {@link MLEnvironmentFactory} +* @param table the Table to convert. +* @return the converted DataSet. +*/ + public static DataSet fromTable(Long sessionId, Table table) { + return MLEnvironmentFactory + .get(sessionId) + .getBatchTableEnvironment() + .toDataSet(table, Row.class); + } + + /** +* Convert the given DataSet into a Table with specified TableSchema. +* +* @param sessionId the sessionId of {@link MLEnvironmentFactory} +* @param data the DataSet to convert. +* @param schema the specified TableSchema. +* @return the converted Table. +*/ + public static Table toTable(Long sessionId, DataSet data, TableSchema schema) { + return toTable(sessionId, data, schema.getFieldNames(), schema.getFieldTypes()); + } + + /** +* Convert the given DataSet into a Table with specified colNames and colTypes. +* +* @param sessionId sessionId the sessionId of {@link MLEnvironmentFactory}. +* @param data the DataSet to convert. +* @param colNames the specified colNames. +* @param colTypes the specified colTypes. This variable is used only when the +* DataSet is produced by a function and Flink cannot determine +* automatically what the produced type is. +* @return the converted Table. +*/ + public static Table toTable(Long sessionId, DataSet data, String[] colNames, TypeInformation [] colTypes) { + return toTable(MLEnvironmentFactory.get(sessionId), data, colNames, colTypes); + } + + /** +* Convert the given DataSet into a Table with specified colNames. +* +* @param sessionId sessionId the sessionId of {@link MLEnvironmentFactory}. +* @param data the DataSet to convert. +* @param colNames the specified colNames. +* @return the converted Table. +*/ + public static Table toTable(Long sessionId, DataSet data, String[] colNames) { + return toTable(MLEnvironmentFactory.get(sessionId), data, colNames); + } + + /** +* Convert the given DataSet into a Table with specified colNames and colTypes. 
+* +* @param session the MLEnvironment using to convert DataSet to Table. +* @param data the DataSet to convert. +* @param colNames the specified colNames. +* @param colTypes the specified colTypes. This variable is used only when the +* DataSet is produced by a function and Flink cannot determine +* automatically what the produced type is. +* @return the converted Table. +*/ + public static Table toTable(MLEnvironment session, DataSet
[GitHub] [flink] walterddr commented on a change in pull request #9373: [FLINK-13596][ml] Add two utils for Table transformations
walterddr commented on a change in pull request #9373: [FLINK-13596][ml] Add two utils for Table transformations URL: https://github.com/apache/flink/pull/9373#discussion_r342375974 ## File path: flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/utils/DataSetConversionUtil.java ## @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.ml.common.utils; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.operators.SingleInputUdfOperator; +import org.apache.flink.api.java.operators.TwoInputUdfOperator; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.ml.common.MLEnvironment; +import org.apache.flink.ml.common.MLEnvironmentFactory; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.types.Row; + +/** + * Provide functions of conversions between DataSet and Table. + */ +public class DataSetConversionUtil { + /** +* Convert the given Table to {@link DataSet}<{@link Row}>. 
+* +* @param sessionId the sessionId of {@link MLEnvironmentFactory} +* @param table the Table to convert. +* @return the converted DataSet. +*/ + public static DataSet fromTable(Long sessionId, Table table) { + return MLEnvironmentFactory + .get(sessionId) + .getBatchTableEnvironment() + .toDataSet(table, Row.class); + } + + /** +* Convert the given DataSet into a Table with specified TableSchema. +* +* @param sessionId the sessionId of {@link MLEnvironmentFactory} +* @param data the DataSet to convert. +* @param schema the specified TableSchema. +* @return the converted Table. +*/ + public static Table toTable(Long sessionId, DataSet data, TableSchema schema) { + return toTable(sessionId, data, schema.getFieldNames(), schema.getFieldTypes()); + } + + /** +* Convert the given DataSet into a Table with specified colNames and colTypes. +* +* @param sessionId sessionId the sessionId of {@link MLEnvironmentFactory}. +* @param data the DataSet to convert. +* @param colNames the specified colNames. +* @param colTypes the specified colTypes. This variable is used only when the +* DataSet is produced by a function and Flink cannot determine +* automatically what the produced type is. +* @return the converted Table. +*/ + public static Table toTable(Long sessionId, DataSet data, String[] colNames, TypeInformation [] colTypes) { + return toTable(MLEnvironmentFactory.get(sessionId), data, colNames, colTypes); + } + + /** +* Convert the given DataSet into a Table with specified colNames. +* +* @param sessionId sessionId the sessionId of {@link MLEnvironmentFactory}. +* @param data the DataSet to convert. +* @param colNames the specified colNames. +* @return the converted Table. +*/ + public static Table toTable(Long sessionId, DataSet data, String[] colNames) { + return toTable(MLEnvironmentFactory.get(sessionId), data, colNames); + } + + /** +* Convert the given DataSet into a Table with specified colNames and colTypes. 
+* +* @param session the MLEnvironment using to convert DataSet to Table. +* @param data the DataSet to convert. +* @param colNames the specified colNames. +* @param colTypes the specified colTypes. This variable is used only when the +* DataSet is produced by a function and Flink cannot determine +* automatically what the produced type is. +* @return the converted Table. +*/ + public static Table toTable(MLEnvironment session, DataSet
[GitHub] [flink] openinx commented on issue #10042: [FLINK-11466][e2e] Design the e2e java framework so that the Kafka streaming tests and Kafka SQL tests can running on it
openinx commented on issue #10042: [FLINK-11466][e2e] Design the e2e java framework so that the Kafka streaming tests and Kafka SQL tests can running on it URL: https://github.com/apache/flink/pull/10042#issuecomment-549655452 Ping @tillrohrmann @zentol for reviewing .. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-14572) BlobsCleanupITCase failed in Travis stage core - scheduler_ng
[ https://issues.apache.org/jira/browse/FLINK-14572?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16967225#comment-16967225 ] Zhu Zhu commented on FLINK-14572:
-
From the root cause, the case fails when trying to get jar files from the blob server, but the file does not exist. This does not seem to be related to the NG scheduler, since the scheduler is not created yet.
From the logic of {{BlobsCleanupITCase#testBlobServerCleanup()}}, a job will only be submitted if the jar upload succeeded via {{BlobClient.uploadFiles}}. So it is highly likely that the uploaded jar was removed unexpectedly before the job was submitted. But I have no idea why the file could be removed, and I cannot reproduce this problem locally with hundreds of re-runs either. [~trohrmann] [~gjy] Do you have any good ideas for it?

Root cause:
{noformat}
07:47:47,787 ERROR org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Failed to submit job 15838a6ef77eb89697d3def42c1a58b0.
java.lang.RuntimeException: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
	at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
	at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:152)
	at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:84)
	at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:381)
	at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
	... 7 more
Caused by: java.lang.Exception: Cannot set up the user code libraries: /tmp/junit6489941706970935338/junit9210755917417967354/blobStore-1474738d-89f8-4ab0-88b2-9df867ba4cc1/incoming/temp-0001
	at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:131)
	... 10 more
Caused by: java.nio.file.NoSuchFileException: /tmp/junit6489941706970935338/junit9210755917417967354/blobStore-1474738d-89f8-4ab0-88b2-9df867ba4cc1/incoming/temp-0001
	at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
	at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
	at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
	at sun.nio.fs.UnixCopyFile.move(UnixCopyFile.java:409)
	at sun.nio.fs.UnixFileSystemProvider.move(UnixFileSystemProvider.java:262)
	at java.nio.file.Files.move(Files.java:1395)
	at org.apache.flink.runtime.blob.BlobUtils.moveTempFileToStore(BlobUtils.java:410)
	at org.apache.flink.runtime.blob.BlobServer.getFileInternal(BlobServer.java:497)
	at org.apache.flink.runtime.blob.BlobServer.getFileInternal(BlobServer.java:444)
	at org.apache.flink.runtime.blob.BlobServer.getFile(BlobServer.java:417)
	at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerTask(BlobLibraryCacheManager.java:120)
	at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerJob(BlobLibraryCacheManager.java:91)
	at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:128)
	... 10 more
{noformat}

> BlobsCleanupITCase failed in Travis stage core - scheduler_ng
> -------------------------------------------------------------
>
>                 Key: FLINK-14572
>                 URL: https://issues.apache.org/jira/browse/FLINK-14572
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination, Tests
>    Affects Versions: 1.10.0
>            Reporter: Gary Yao
>            Priority: Critical
>              Labels: scheduler-ng, test-stability
>             Fix For: 1.10.0
>
> {noformat}
> java.lang.AssertionError:
> Expected: is
>      but: was
> 	at org.apache.flink.runtime.jobmanager.BlobsCleanupITCase.testBlobServerCleanup(BlobsCleanupITCase.java:220)
> 	at org.apache.flink.runtime.jobmanager.BlobsCleanupITCase.testBlobServerCleanupFinishedJob(BlobsCleanupITCase.java:133)
> {noformat}
> https://api.travis-ci.com/v3/job/250445874/log.txt
--
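The root-cause frame in the trace above is {{Files.move}} failing inside {{BlobUtils.moveTempFileToStore}} because the incoming temp file has already disappeared. That failure mode can be reproduced with the JDK alone; the paths below are illustrative, not the test's actual paths:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;

public class MissingTempFileDemo {

    // Returns true if moving a non-existent source fails with NoSuchFileException,
    // the same exception seen in the BlobsCleanupITCase stack trace above.
    static boolean moveMissingSourceFails(Path source, Path target) {
        try {
            Files.move(source, target);
            return false; // move unexpectedly succeeded
        } catch (NoSuchFileException e) {
            return true;  // source was already gone, as in the failing test
        } catch (IOException e) {
            return false; // some other I/O problem
        }
    }

    public static void main(String[] args) {
        // Illustrative paths; the source deliberately does not exist.
        Path missing = Path.of("/tmp/blobStore-demo/incoming/temp-0001");
        Path target = Path.of("/tmp/blobStore-demo/store/temp-0001");
        System.out.println(moveMissingSourceFails(missing, target)); // prints "true"
    }
}
```

This is consistent with the diagnosis in the comment: the move itself is fine, and the question is only why the temp file was deleted before the move ran.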
[GitHub] [flink] flinkbot edited a comment on issue #9885: [FLINK-14344][checkpointing] Snapshots master hook state asynchronously
flinkbot edited a comment on issue #9885: [FLINK-14344][checkpointing] Snapshots master hook state asynchronously
URL: https://github.com/apache/flink/pull/9885#issuecomment-541299624

## CI report:

* d0c426e6ff61dcad1de4c7d3e9a2a16416e77e54 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/131633477)
* 22434aac9a08ebb0ea1a61eba28ec12a55149d10 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/133945741)
* cdff164bf39e74bdfd81332d84f12193a09997e5 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/134839623)
* 30f912924c8497d9c55aa62a4987d94e31d409b5 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/134843066)
* 5012d25a071772b683abddd7ef4c8d660e4c164d : UNKNOWN
[GitHub] [flink] flinkbot edited a comment on issue #10042: [FLINK-11466][e2e] Design the e2e java framework so that the Kafka streaming tests and Kafka SQL tests can running on it
flinkbot edited a comment on issue #10042: [FLINK-11466][e2e] Design the e2e java framework so that the Kafka streaming tests and Kafka SQL tests can running on it
URL: https://github.com/apache/flink/pull/10042#issuecomment-547792578

## CI report:

* b523eace793d9168a2816e50891d6227183a7175 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/134149629)
* 876f0f86b208f6fce4d74488081791dcc7b89cf8 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/134326879)
* 027d5fe9ca3832c9384a715f3f7e65df3fd64dcf : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/134395038)
* 4108e40950bfcf9f26e8b62603a6c35402252966 : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/134965103)
[jira] [Commented] (FLINK-14591) Execute PlannerBase#mergeParameters every time of calling PlannerBase#translate method
[ https://issues.apache.org/jira/browse/FLINK-14591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16967212#comment-16967212 ] Jark Wu commented on FLINK-14591:
-
Hi [~Dillon.], I'm not sure about the fixing approach yet. Hi [~godfreyhe], what do you think about this problem?

> Execute PlannerBase#mergeParameters every time of calling PlannerBase#translate method
> --------------------------------------------------------------------------------------
>
>                 Key: FLINK-14591
>                 URL: https://issues.apache.org/jira/browse/FLINK-14591
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>            Reporter: Wei Zhong
>            Priority: Minor
>
> In the current implementation of the Blink planner, the method "PlannerBase#mergeParameters" is called by the "PlannerBase#translate" method to merge the configuration inside TableConfig into the global job parameters:
> {code:scala}
> override def translate(
>     modifyOperations: util.List[ModifyOperation]): util.List[Transformation[_]] = {
>   if (modifyOperations.isEmpty) {
>     return List.empty[Transformation[_]]
>   }
>   mergeParameters()
>   val relNodes = modifyOperations.map(translateToRel)
>   val optimizedRelNodes = optimize(relNodes)
>   val execNodes = translateToExecNodePlan(optimizedRelNodes)
>   translateToPlan(execNodes)
> }
> {code}
> This translate method is called at every important moment, e.g. execute, toDataStream, insertInto, etc.
> But as shown above, there is a chance that the method returns directly and does not call "mergeParameters".
> In fact, if we set some configurations between the "Table#insertInto" call and the "TableEnvironment#execute" call, those configurations will not be merged into the global job parameters, because the "mergeParameters" method is not called:
> {code:scala}
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val tEnv = StreamTableEnvironment.create(env, EnvironmentSettings.newInstance.useBlinkPlanner.build)
> ...
> ...
> val result = ...
> val sink = TestSinkUtil.configureSink(result, new TestingAppendTableSink)
> tEnv.registerTableSink("MySink", sink)
> tEnv.getConfig.getConfiguration.setString("jobparam1", "value1")
> result.insertInto("MySink")
>
> // the "jobparam2" configuration will be lost
> tEnv.getConfig.getConfiguration.setString("jobparam2", "value2")
> tEnv.execute("test")
> val jobConfig = env.getConfig.getGlobalJobParameters.toMap
>
> assertTrue(jobConfig.get("jobparam1") == "value1")
> // this assertion will fail:
> assertTrue(jobConfig.get("jobparam2") == "value2")
> {code}
> This may bring some confusion to the user. It would be great if we could fix this problem.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
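The bug quoted above is a general hazard of early returns that skip a required side effect: `mergeParameters()` never runs when the operation list is empty, even though later calls depend on the merge having happened. A minimal language-level sketch of the bug shape and the straightforward fix, with hypothetical names (this is not the Blink planner's actual code):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative model: "pending" plays the role of TableConfig's configuration,
// "global" the role of the global job parameters.
public class MergeBeforeReturnDemo {
    final Map<String, String> pending = new HashMap<>();
    final Map<String, String> global = new HashMap<>();

    void mergeParameters() {
        global.putAll(pending);
    }

    // Buggy shape: the early return skips mergeParameters(), so configuration
    // set before a no-op translate() is never merged.
    List<String> translateBuggy(List<String> ops) {
        if (ops.isEmpty()) {
            return List.of();
        }
        mergeParameters();
        return ops;
    }

    // Fixed shape: perform the side effect unconditionally, before any early return.
    List<String> translateFixed(List<String> ops) {
        mergeParameters();
        if (ops.isEmpty()) {
            return List.of();
        }
        return ops;
    }
}
```

With the fixed shape, a parameter set between two translate calls survives even when the second call has nothing to translate, which matches the behavior the reporter expects for "jobparam2".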
[GitHub] [flink] flinkbot edited a comment on issue #10080: [FLINK-14600] Degenerate the current AtomicInteger type of verticesFinished to a normal int type
flinkbot edited a comment on issue #10080: [FLINK-14600] Degenerate the current AtomicInteger type of verticesFinished to a normal int type
URL: https://github.com/apache/flink/pull/10080#issuecomment-549633254

## CI report:

* d854a07f002bd43cab524acb03e4dbd918869827 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/134960303)
[GitHub] [flink] flinkbot edited a comment on issue #10067: [FLINK-14375][runtime] Avoid to notify scheduler about fake or outdated state update
flinkbot edited a comment on issue #10067: [FLINK-14375][runtime] Avoid to notify scheduler about fake or outdated state update
URL: https://github.com/apache/flink/pull/10067#issuecomment-548812084

## CI report:

* d3341d9f0bff2cb97bd0b5bf507725ae6d670b88 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/134571714)
* 37c17d2be1cdbd72cc8a3554f7c8cf09f2c8a400 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/134655935)
* c290a32e36d7706fd5dd426a3ad85b65d113a6fc : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/134890002)
* 4908a3ce961c7518b10e43831fa5aca2764cbf1a : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/134963479)
[GitHub] [flink] flinkbot edited a comment on issue #10071: [FLINK-14371][runtime, tests] Enable ClassLoaderITCase/WindowCheckpointingITCase/EventTimeWindowCheckpointingITCase for NG scheduling
flinkbot edited a comment on issue #10071: [FLINK-14371][runtime, tests] Enable ClassLoaderITCase/WindowCheckpointingITCase/EventTimeWindowCheckpointingITCase for NG scheduling
URL: https://github.com/apache/flink/pull/10071#issuecomment-549017258

## CI report:

* 80b38a072f3c5a415bf3c95e883c358a1648cc37 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/134655999)
* 7e1c5a50ab017261aadda70e97852c298c05633b : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/134963492)
[GitHub] [flink] flinkbot edited a comment on issue #10060: [FLINK-14546] [flink-json] Support map type in flink-json
flinkbot edited a comment on issue #10060: [FLINK-14546] [flink-json] Support map type in flink-json
URL: https://github.com/apache/flink/pull/10060#issuecomment-548313242

## CI report:

* c68dc12d24be4ef19efc1ef42246e6018c4f6e8d : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/134363638)
* f65310be9fb96f2446398a718dabeb4bd5a7a874 : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/134963469)
[GitHub] [flink] flinkbot edited a comment on issue #10059: [FLINK-14543][table] Support partition for temporary table
flinkbot edited a comment on issue #10059: [FLINK-14543][table] Support partition for temporary table
URL: https://github.com/apache/flink/pull/10059#issuecomment-548289939

## CI report:

* 4937b8b139bab4957799947b841fb1ae3758ed48 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/134352112)
* 6de7bf454ccb8a05ec596954b7551ea9eddac297 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/134385271)
* 177c6c50b0636e6e14bbb6c511c88cff05905318 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/134792783)
* ab296376ac94d06f5a94b93dafa9c9aab2e0eab7 : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/134963458)
[GitHub] [flink] flinkbot edited a comment on issue #10042: [FLINK-11466][e2e] Design the e2e java framework so that the Kafka streaming tests and Kafka SQL tests can running on it
flinkbot edited a comment on issue #10042: [FLINK-11466][e2e] Design the e2e java framework so that the Kafka streaming tests and Kafka SQL tests can running on it
URL: https://github.com/apache/flink/pull/10042#issuecomment-547792578

## CI report:

* b523eace793d9168a2816e50891d6227183a7175 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/134149629)
* 876f0f86b208f6fce4d74488081791dcc7b89cf8 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/134326879)
* 027d5fe9ca3832c9384a715f3f7e65df3fd64dcf : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/134395038)
* 4108e40950bfcf9f26e8b62603a6c35402252966 : UNKNOWN
[jira] [Commented] (FLINK-14591) Execute PlannerBase#mergeParameters every time of calling PlannerBase#translate method
[ https://issues.apache.org/jira/browse/FLINK-14591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16967203#comment-16967203 ] Zhanchun Zhang commented on FLINK-14591:
-
Hi [~zhongwei] [~jark], I'm willing to fix this issue, can you assign it to me? Thanks ~

> Execute PlannerBase#mergeParameters every time of calling PlannerBase#translate method
> --------------------------------------------------------------------------------------
>
>                 Key: FLINK-14591
>                 URL: https://issues.apache.org/jira/browse/FLINK-14591
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>            Reporter: Wei Zhong
>            Priority: Minor
>
> (issue description quoted in full in the previous FLINK-14591 message)

--
This message was sent by Atlassian Jira
(v8.3.4#803005)