[GitHub] [flink] flinkbot edited a comment on issue #10082: [FLINK-14164][runtime] Add a meter ‘numberOfRestarts’ to show number of restarts as well as its rate

2019-11-04 Thread GitBox
flinkbot edited a comment on issue #10082: [FLINK-14164][runtime] Add a meter 
‘numberOfRestarts’ to show number of restarts as well as its rate
URL: https://github.com/apache/flink/pull/10082#issuecomment-549698167
 
 
   
   ## CI report:
   
   * 9ad21545cdce68c8892941e8f33a89147bddb1f6 : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/134980263)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   




[GitHub] [flink] flinkbot edited a comment on issue #10060: [FLINK-14546] [flink-json] Support map type in flink-json

2019-11-04 Thread GitBox
flinkbot edited a comment on issue #10060: [FLINK-14546] [flink-json] Support 
map type in flink-json
URL: https://github.com/apache/flink/pull/10060#issuecomment-548313242
 
 
   
   ## CI report:
   
   * c68dc12d24be4ef19efc1ef42246e6018c4f6e8d : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/134363638)
   * f65310be9fb96f2446398a718dabeb4bd5a7a874 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/134963469)
   * 8dc48c4a16d1f4f82c78b14b18c0832fbb235d3d : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/134970076)
   * 2f6aec8fe5899e7360156efb05df6c3c9b1d9f04 : UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   




[GitHub] [flink] flinkbot commented on issue #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.

2019-11-04 Thread GitBox
flinkbot commented on issue #10083: [FLINK-14472][runtime]Implement 
back-pressure monitor with non-blocking outputs.
URL: https://github.com/apache/flink/pull/10083#issuecomment-549704821
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit bdb7952a0a48b4e67f51a04db61cd96a1cbecbbc (Tue Nov 05 
07:53:06 UTC 2019)
   
   **Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up 
to date!
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

 Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   




[jira] [Updated] (FLINK-14472) Implement back-pressure monitor with non-blocking outputs

2019-11-04 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14472?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-14472:
---
Labels: pull-request-available  (was: )

> Implement back-pressure monitor with non-blocking outputs
> ---------------------------------------------------------
>
> Key: FLINK-14472
> URL: https://issues.apache.org/jira/browse/FLINK-14472
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: zhijiang
>Assignee: Yingjie Cao
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>
> Currently the back-pressure monitor relies on detecting task threads that are 
> stuck in `requestBufferBuilderBlocking`. There are actually two cases that 
> cause back-pressure at the moment:
>  * There are no available buffers in `LocalBufferPool` and all the quotas 
> given by the global pool are exhausted. Then we need to wait for buffers to 
> be recycled to the `LocalBufferPool`.
>  * There are no available buffers in `LocalBufferPool`, but the quota has not 
> been used up. The request to the global pool blocks because the global pool 
> has no available buffers either. Then we need to wait for buffers to be 
> recycled to the global pool.
> We try to implement the non-blocking network output in FLINK-14396, so the 
> back-pressure monitor should be adjusted accordingly once the non-blocking 
> output is used in practice.
> In detail, we want to avoid the current approach of monitoring by analyzing 
> the task thread stack, which has some drawbacks discussed before:
>  * If `requestBuffer` is not triggered by the task thread, the current 
> monitor is invalid in practice.
>  * The current monitor is heavyweight and fragile because it needs to 
> understand the internals of the LocalBufferPool implementation.
> We could provide a transparent method for the monitor caller to get the 
> back-pressure result directly, and hide the implementation details in the 
> LocalBufferPool.
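
A minimal sketch of the proposed "transparent method", following the 
availability pattern of FLINK-14396; the names (`AvailabilityProvider`, 
`isBackPressured`) are illustrative, not the final API:

```
import java.util.concurrent.CompletableFuture;

/** Availability contract in the spirit of FLINK-14396 (names illustrative). */
interface AvailabilityProvider {
    CompletableFuture<?> getAvailableFuture();

    /** The pool is available while its current future is already complete. */
    default boolean isAvailable() {
        return getAvailableFuture().isDone();
    }
}

/** The monitor asks the pool directly instead of sampling thread stacks. */
final class BackPressureProbe {
    static boolean isBackPressured(AvailabilityProvider outputBufferPool) {
        return !outputBufferPool.isAvailable();
    }
}
```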





[GitHub] [flink] wsry opened a new pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.

2019-11-04 Thread GitBox
wsry opened a new pull request #10083: [FLINK-14472][runtime]Implement 
back-pressure monitor with non-blocking outputs.
URL: https://github.com/apache/flink/pull/10083
 
 
   ## What is the purpose of the change
   Currently the back-pressure monitor relies on detecting task threads that are 
stuck in `requestBufferBuilderBlocking`. There are actually two cases that 
cause back-pressure at the moment:
   
- There are no available buffers in `LocalBufferPool` and all the quotas given 
by the global pool are exhausted. Then we need to wait for buffers to be 
recycled to the `LocalBufferPool`.
- There are no available buffers in `LocalBufferPool`, but the quota has not 
been used up. The request to the global pool blocks because the global pool 
has no available buffers either. Then we need to wait for buffers to be 
recycled to the global pool.
   
   We try to implement the non-blocking network output in FLINK-14396, so the 
back-pressure monitor should be adjusted accordingly once the non-blocking 
output is used in practice. In this PR, we implement a new back-pressure 
monitor which detects task back pressure by checking the availability of the 
ResultPartitionWriter, i.e. whether there are free buffers in the BufferPool 
of the ResultPartitions for output (see the sketch below).
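
   A rough sketch of that availability-based sampling, assuming a hypothetical 
`isAvailable()` check that stands in for the ResultPartitionWriter API; the 
real tracker aggregates such samples per task:

```
import java.util.List;

/** Illustrative sampler: the fraction of samples in which a task's output
 *  was unavailable approximates its back-pressure ratio. */
final class BackPressureSampler {

    /** Stands in for ResultPartitionWriter availability (assumed API). */
    interface Output {
        boolean isAvailable();
    }

    static double backPressureRatio(List<Output> outputs, int numSamples) throws InterruptedException {
        int pressured = 0;
        for (int i = 0; i < numSamples; i++) {
            // The task counts as back-pressured if any partition has no free buffer.
            boolean blocked = false;
            for (Output output : outputs) {
                if (!output.isAvailable()) {
                    blocked = true;
                    break;
                }
            }
            if (blocked) {
                pressured++;
            }
            Thread.sleep(50); // sampling interval, illustrative only
        }
        return (double) pressured / numSamples;
    }
}
```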
   
   ## Brief change log
 - A new back-pressure tracker was implemented which monitors task back 
pressure by checking the availability of the ResultPartitionWriter, i.e. 
whether there are free buffers in the BufferPool of the ResultPartitions for output.
 - The old stack-sampling-based back-pressure tracker implementation and 
related code were removed.
 - New test cases were added to verify the changes.
   
   
   ## Verifying this change
   Several new test cases are added to verify the changes, including 
```BackPressureStatsTrackerImplTest```, 
```BackPressureSampleCoordinatorTest```, 
```TaskBackPressureSampleServiceTest```, 
```TaskTest#testNoBackPressureIfTaskNotStarted```, 
```TaskExecutorSubmissionTest#testSampleTaskBackPressure```.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)
   




[GitHub] [flink] zhuzhurk commented on a change in pull request #10076: [FLINK-14465][runtime] Let `StandaloneJobClusterEntrypoint` use user code class loader

2019-11-04 Thread GitBox
zhuzhurk commented on a change in pull request #10076: [FLINK-14465][runtime] 
Let `StandaloneJobClusterEntrypoint` use user code class loader
URL: https://github.com/apache/flink/pull/10076#discussion_r342418215
 
 

 ##
 File path: 
flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
 ##
 @@ -184,8 +184,6 @@ public PackagedProgram(File jarFile, List classpaths, 
@Nullable String entr
}
 
checkJarFile(jarFileUrl);
-   } else if (!isPython) {
-   throw new IllegalArgumentException("The jar file must 
not be null.");
 
 Review comment:
   If the rework is done in another ticket and the constructor is kept public in 
this PR, I think a check is needed to ensure that `entryPointClassName` and 
`jarFile` are not both null, otherwise an NPE would happen, as Till mentioned 
(see the sketch below).
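
A minimal sketch of such a guard in plain Java; Flink itself would typically 
use `org.apache.flink.util.Preconditions.checkArgument`, and the message text 
here is illustrative:

```
import java.io.File;

final class PackagedProgramGuard {

    /** At least one of jarFile / entryPointClassName must be provided. */
    static void validate(File jarFile, String entryPointClassName) {
        if (jarFile == null && entryPointClassName == null) {
            throw new IllegalArgumentException(
                "Either a jar file or an entry point class name must be provided.");
        }
    }
}
```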




[jira] [Created] (FLINK-14610) Add documentation for how to use watermark syntax in DDL

2019-11-04 Thread Jark Wu (Jira)
Jark Wu created FLINK-14610:
---

 Summary: Add documentation for how to use watermark syntax in DDL
 Key: FLINK-14610
 URL: https://issues.apache.org/jira/browse/FLINK-14610
 Project: Flink
  Issue Type: Sub-task
  Components: Documentation
Reporter: Jark Wu
 Fix For: 1.10.0


Add documentation for how to use watermark syntax in DDL.
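
For reference, the kind of snippet this documentation would cover; a minimal 
sketch assuming the Flink 1.10 watermark DDL syntax, submitted through the Java 
Table API with abbreviated connector properties:

```
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

final class WatermarkDdlExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
            EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // Declares `ts` as the rowtime attribute with a 5-second bounded watermark.
        tEnv.sqlUpdate(
            "CREATE TABLE orders (" +
            "  order_id BIGINT," +
            "  ts TIMESTAMP(3)," +
            "  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND" +
            ") WITH (" +
            "  'connector.type' = 'kafka'" +
            ")");
    }
}
```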





[GitHub] [flink] libenchao commented on a change in pull request #10060: [FLINK-14546] [flink-json] Support map type in flink-json

2019-11-04 Thread GitBox
libenchao commented on a change in pull request #10060: [FLINK-14546] 
[flink-json] Support map type in flink-json
URL: https://github.com/apache/flink/pull/10060#discussion_r342414169
 
 

 ##
 File path: 
flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java
 ##
 @@ -242,11 +247,25 @@ private DeserializationRuntimeConverter 
wrapIntoNullableConverter(Deserializatio
return 
Optional.of(createObjectArrayConverter(((BasicArrayTypeInfo) 
typeInfo).getComponentInfo()));
} else if (isPrimitiveByteArray(typeInfo)) {
return Optional.of(createByteArrayConverter());
+   } else if (typeInfo instanceof MapTypeInfo) {
+   MapTypeInfo mapTypeInfo = (MapTypeInfo) 
typeInfo;
+   return 
Optional.of(createMapConverter(mapTypeInfo.getKeyTypeInfo(), 
mapTypeInfo.getValueTypeInfo()));
} else {
return Optional.empty();
}
}
 
+   private DeserializationRuntimeConverter 
createMapConverter(TypeInformation keyType, TypeInformation valueType) {
+   DeserializationRuntimeConverter valueConverter = 
createConverter(valueType);
+   DeserializationRuntimeConverter keyConverter = 
createConverter(keyType);
+
+   return (mapper, jsonNode) -> StreamSupport.stream(
 
 Review comment:
   done.




[jira] [Created] (FLINK-14609) Add doc for Flink SQL computed columns

2019-11-04 Thread Danny Chen (Jira)
Danny Chen created FLINK-14609:
--

 Summary: Add doc for Flink SQL computed columns
 Key: FLINK-14609
 URL: https://issues.apache.org/jira/browse/FLINK-14609
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Affects Versions: 1.9.1
Reporter: Danny Chen
 Fix For: 1.10.0


1. Add a doc describing the syntax of computed columns.
2. Add some demos to the website.
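
A sketch of the computed-column syntax to be documented, assuming the Flink 
1.10 DDL with the Blink planner and abbreviated connector properties:

```
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

final class ComputedColumnDdlExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
            EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // `cost` is derived from other columns; `proc` is a processing-time attribute.
        tEnv.sqlUpdate(
            "CREATE TABLE orders (" +
            "  price DECIMAL(10, 2)," +
            "  quantity INT," +
            "  cost AS price * quantity," +
            "  proc AS PROCTIME()" +
            ") WITH (" +
            "  'connector.type' = 'kafka'" +
            ")");
    }
}
```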





[GitHub] [flink] flinkbot commented on issue #10082: [FLINK-14164][runtime] Add a meter ‘numberOfRestarts’ to show number of restarts as well as its rate

2019-11-04 Thread GitBox
flinkbot commented on issue #10082: [FLINK-14164][runtime] Add a meter 
‘numberOfRestarts’ to show number of restarts as well as its rate
URL: https://github.com/apache/flink/pull/10082#issuecomment-549698167
 
 
   
   ## CI report:
   
   * 9ad21545cdce68c8892941e8f33a89147bddb1f6 : UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   




[GitHub] [flink] libenchao commented on a change in pull request #10060: [FLINK-14546] [flink-json] Support map type in flink-json

2019-11-04 Thread GitBox
libenchao commented on a change in pull request #10060: [FLINK-14546] 
[flink-json] Support map type in flink-json
URL: https://github.com/apache/flink/pull/10060#discussion_r342410101
 
 

 ##
 File path: 
flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java
 ##
 @@ -242,11 +247,25 @@ private DeserializationRuntimeConverter 
wrapIntoNullableConverter(Deserializatio
return 
Optional.of(createObjectArrayConverter(((BasicArrayTypeInfo) 
typeInfo).getComponentInfo()));
} else if (isPrimitiveByteArray(typeInfo)) {
return Optional.of(createByteArrayConverter());
+   } else if (typeInfo instanceof MapTypeInfo) {
+   MapTypeInfo mapTypeInfo = (MapTypeInfo) 
typeInfo;
+   return 
Optional.of(createMapConverter(mapTypeInfo.getKeyTypeInfo(), 
mapTypeInfo.getValueTypeInfo()));
} else {
return Optional.empty();
}
}
 
+   private DeserializationRuntimeConverter 
createMapConverter(TypeInformation keyType, TypeInformation valueType) {
+   DeserializationRuntimeConverter valueConverter = 
createConverter(valueType);
+   DeserializationRuntimeConverter keyConverter = 
createConverter(keyType);
+
+   return (mapper, jsonNode) -> StreamSupport.stream(
 
 Review comment:
   Yes, it's called per record. I'll change it back according to the style 
guide.




[jira] [Updated] (FLINK-14608) avoid using Java Streams in JsonRowDeserializationSchema

2019-11-04 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young updated FLINK-14608:
---
Description: According to 
[https://flink.apache.org/contributing/code-style-and-quality-java.html], we 
should avoid using Java Streams in any performance critical code. Since this 
`DeserializationRuntimeConverter` will be called per field of each coming 
record, we should provide a non Java Streams implementation.   (was: According 
to [Flink 
CodeStyle|[https://flink.apache.org/contributing/code-style-and-quality-java.html]],
 we should avoid using Java Streams in any performance critical code. Since 
this `DeserializationRuntimeConverter` will be called per field of each coming 
record, we should provide a non Java Streams implementation. )

> avoid using Java Streams in JsonRowDeserializationSchema
> --------------------------------------------------------
>
> Key: FLINK-14608
> URL: https://issues.apache.org/jira/browse/FLINK-14608
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.10.0
>Reporter: Kurt Young
>Priority: Major
>
> According to 
> [https://flink.apache.org/contributing/code-style-and-quality-java.html], we 
> should avoid using Java Streams in any performance-critical code. Since this 
> `DeserializationRuntimeConverter` will be called per field of each incoming 
> record, we should provide an implementation without Java Streams.





[jira] [Created] (FLINK-14608) avoid using Java Streams in JsonRowDeserializationSchema

2019-11-04 Thread Kurt Young (Jira)
Kurt Young created FLINK-14608:
--

 Summary: avoid using Java Streams in JsonRowDeserializationSchema
 Key: FLINK-14608
 URL: https://issues.apache.org/jira/browse/FLINK-14608
 Project: Flink
  Issue Type: Improvement
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Affects Versions: 1.10.0
Reporter: Kurt Young


According to [Flink 
CodeStyle|[https://flink.apache.org/contributing/code-style-and-quality-java.html]],
 we should avoid using Java Streams in any performance critical code. Since 
this `DeserializationRuntimeConverter` will be called per field of each coming 
record, we should provide a non Java Streams implementation. 





[jira] [Closed] (FLINK-14498) Introduce NetworkBufferPool#isAvailable() for non-blocking output

2019-11-04 Thread Yingjie Cao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14498?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yingjie Cao closed FLINK-14498.
---

> Introduce NetworkBufferPool#isAvailable() for non-blocking output
> -----------------------------------------------------------------
>
> Key: FLINK-14498
> URL: https://issues.apache.org/jira/browse/FLINK-14498
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: zhijiang
>Assignee: Yingjie Cao
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> In order to implement best-effort non-blocking output, we need to further 
> improve the interaction between LocalBufferPool and NetworkBufferPool in a 
> non-blocking way, as a supplement to FLINK-14396.
> In detail, we provide `NetworkBufferPool#isAvailable()` to indicate the state 
> of the global pool; we can then combine its state with the 
> `LocalBufferPool#isAvailable()` method to avoid blocking on global requests 
> while the task is processing.
> Meanwhile, we would refactor the process by which LocalBufferPool requests a 
> global buffer. If there are no available buffers in NetworkBufferPool, the 
> LocalBufferPool should monitor the global pool's availability future instead 
> of waiting 2 seconds in every retry loop as it does today. This removes the 
> wait delay and cleans up the code in a unified way.
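
A sketch of that availability-future hand-off, using `CompletableFuture`-based 
signaling; the method names follow the ticket, the rest is illustrative:

```
import java.util.concurrent.CompletableFuture;

/** Global-pool side of the non-blocking hand-off (illustrative). */
final class GlobalPoolAvailability {

    private volatile CompletableFuture<Void> availableFuture =
        CompletableFuture.completedFuture(null);

    boolean isAvailable() {
        return availableFuture.isDone();
    }

    /** Called when the last global buffer is handed out. */
    synchronized void markUnavailable() {
        if (availableFuture.isDone()) {
            availableFuture = new CompletableFuture<>();
        }
    }

    /** Called when a buffer is recycled; wakes up waiting local pools. */
    synchronized void markAvailable() {
        availableFuture.complete(null);
    }

    /** A local pool registers a callback instead of polling every 2 seconds. */
    void onAvailable(Runnable localPoolCallback) {
        availableFuture.thenRun(localPoolCallback);
    }
}
```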





[GitHub] [flink] guoweiM commented on a change in pull request #10076: [FLINK-14465][runtime] Let `StandaloneJobClusterEntrypoint` use user code class loader

2019-11-04 Thread GitBox
guoweiM commented on a change in pull request #10076: [FLINK-14465][runtime] 
Let `StandaloneJobClusterEntrypoint` use user code class loader
URL: https://github.com/apache/flink/pull/10076#discussion_r342405496
 
 

 ##
 File path: 
flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java
 ##
 @@ -74,10 +79,20 @@ private StandaloneJobClusterEntryPoint(
}
 
@Override
-   protected DispatcherResourceManagerComponentFactory 
createDispatcherResourceManagerComponentFactory(Configuration configuration) {
+   protected DispatcherResourceManagerComponentFactory 
createDispatcherResourceManagerComponentFactory(Configuration configuration) 
throws IOException {
+   final String flinkHomeDir = System.getenv(ENV_FLINK_HOME_DIR);
 
 Review comment:
   1. I have verified it (running /test_docker_embedded_job.sh and manually 
packaging the per-job docker image and running it). I saw this value in 
`ConfigConstants`, so I thought it was a contract.
   
   2. Since you said this is not a contract, I would like to propose not 
relying on `FLINK_HOME`.
   3. One possible approach is (see the sketch after this list):
      1. We could try the ENV_FLINK_USR_LIB_DIR directory first.
      2. If this dir does not exist, we could try the UsrLib directory 
relative to WorkingDir.
   
   By the way, this requires modifying the working dir in docker-entrypoint.sh 
to support the default behavior.
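
A minimal sketch of that lookup order; the env-var name and the `usrlib` 
directory are assumptions taken from this thread, not an established contract:

```
import java.io.File;

final class UserLibResolver {

    // Assumed name from the discussion above, not a fixed contract.
    static final String ENV_FLINK_USR_LIB_DIR = "FLINK_USR_LIB_DIR";

    /** Try the explicit env var first, then fall back to ./usrlib under the working dir. */
    static File resolveUserLibDir() {
        String fromEnv = System.getenv(ENV_FLINK_USR_LIB_DIR);
        if (fromEnv != null && new File(fromEnv).isDirectory()) {
            return new File(fromEnv);
        }
        return new File(System.getProperty("user.dir"), "usrlib");
    }
}
```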




[GitHub] [flink] KurtYoung commented on a change in pull request #10060: [FLINK-14546] [flink-json] Support map type in flink-json

2019-11-04 Thread GitBox
KurtYoung commented on a change in pull request #10060: [FLINK-14546] 
[flink-json] Support map type in flink-json
URL: https://github.com/apache/flink/pull/10060#discussion_r342405648
 
 

 ##
 File path: 
flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java
 ##
 @@ -242,11 +247,25 @@ private DeserializationRuntimeConverter 
wrapIntoNullableConverter(Deserializatio
return 
Optional.of(createObjectArrayConverter(((BasicArrayTypeInfo) 
typeInfo).getComponentInfo()));
} else if (isPrimitiveByteArray(typeInfo)) {
return Optional.of(createByteArrayConverter());
+   } else if (typeInfo instanceof MapTypeInfo) {
+   MapTypeInfo mapTypeInfo = (MapTypeInfo) 
typeInfo;
+   return 
Optional.of(createMapConverter(mapTypeInfo.getKeyTypeInfo(), 
mapTypeInfo.getValueTypeInfo()));
} else {
return Optional.empty();
}
}
 
+   private DeserializationRuntimeConverter 
createMapConverter(TypeInformation keyType, TypeInformation valueType) {
+   DeserializationRuntimeConverter valueConverter = 
createConverter(valueType);
+   DeserializationRuntimeConverter keyConverter = 
createConverter(keyType);
+
+   return (mapper, jsonNode) -> StreamSupport.stream(
 
 Review comment:
   According to the [Flink code 
style](https://flink.apache.org/contributing/code-style-and-quality-java.html), 
we should avoid Java Streams in any performance-critical code. Since this 
DeserializationRuntimeConverter will be called in a per-record fashion, I would 
suggest writing it without Java Streams (see the sketch below).
   (I know the old code is written with Java Streams, but we should follow 
the code style in newly added code.)
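
A stream-free version could be a plain iterator loop like the following sketch; 
`Converter` stands in for the private `DeserializationRuntimeConverter` 
interface, and unshaded Jackson imports are used for illustration:

```
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.TextNode;

final class MapConverterSketch {

    /** Mirrors the shape of DeserializationRuntimeConverter (illustrative). */
    interface Converter {
        Object convert(ObjectMapper mapper, JsonNode node);
    }

    /** Builds a map converter without Java Streams: one plain loop per record. */
    static Converter createMapConverter(Converter keyConverter, Converter valueConverter) {
        return (mapper, node) -> {
            Map<Object, Object> result = new HashMap<>();
            Iterator<Map.Entry<String, JsonNode>> fields = node.fields();
            while (fields.hasNext()) {
                Map.Entry<String, JsonNode> entry = fields.next();
                // JSON object keys arrive as strings; run both converters per entry.
                Object key = keyConverter.convert(mapper, TextNode.valueOf(entry.getKey()));
                Object value = valueConverter.convert(mapper, entry.getValue());
                result.put(key, value);
            }
            return result;
        };
    }
}
```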




[GitHub] [flink] KurtYoung commented on a change in pull request #10060: [FLINK-14546] [flink-json] Support map type in flink-json

2019-11-04 Thread GitBox
KurtYoung commented on a change in pull request #10060: [FLINK-14546] 
[flink-json] Support map type in flink-json
URL: https://github.com/apache/flink/pull/10060#discussion_r342405648
 
 

 ##
 File path: 
flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java
 ##
 @@ -242,11 +247,25 @@ private DeserializationRuntimeConverter 
wrapIntoNullableConverter(Deserializatio
return 
Optional.of(createObjectArrayConverter(((BasicArrayTypeInfo) 
typeInfo).getComponentInfo()));
} else if (isPrimitiveByteArray(typeInfo)) {
return Optional.of(createByteArrayConverter());
+   } else if (typeInfo instanceof MapTypeInfo) {
+   MapTypeInfo mapTypeInfo = (MapTypeInfo) 
typeInfo;
+   return 
Optional.of(createMapConverter(mapTypeInfo.getKeyTypeInfo(), 
mapTypeInfo.getValueTypeInfo()));
} else {
return Optional.empty();
}
}
 
+   private DeserializationRuntimeConverter 
createMapConverter(TypeInformation keyType, TypeInformation valueType) {
+   DeserializationRuntimeConverter valueConverter = 
createConverter(valueType);
+   DeserializationRuntimeConverter keyConverter = 
createConverter(keyType);
+
+   return (mapper, jsonNode) -> StreamSupport.stream(
 
 Review comment:
   According to the [Flink code 
style](https://flink.apache.org/contributing/code-style-and-quality-java.html), 
we should avoid Java Streams in any performance-critical code. Since this 
DeserializationRuntimeConverter will be called in a per-record fashion, I would 
suggest writing it without Java Streams.
   (I know the old code is written with Java Streams, but we should follow 
the code style in newly added code.)




[GitHub] [flink] guoweiM commented on a change in pull request #10076: [FLINK-14465][runtime] Let `StandaloneJobClusterEntrypoint` use user code class loader

2019-11-04 Thread GitBox
guoweiM commented on a change in pull request #10076: [FLINK-14465][runtime] 
Let `StandaloneJobClusterEntrypoint` use user code class loader
URL: https://github.com/apache/flink/pull/10076#discussion_r342405496
 
 

 ##
 File path: 
flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java
 ##
 @@ -74,10 +79,20 @@ private StandaloneJobClusterEntryPoint(
}
 
@Override
-   protected DispatcherResourceManagerComponentFactory 
createDispatcherResourceManagerComponentFactory(Configuration configuration) {
+   protected DispatcherResourceManagerComponentFactory 
createDispatcherResourceManagerComponentFactory(Configuration configuration) 
throws IOException {
+   final String flinkHomeDir = System.getenv(ENV_FLINK_HOME_DIR);
 
 Review comment:
   1. I have verified it (running /test_docker_embedded_job.sh and manually 
packaging the per-job docker image and running it). I saw this value in 
`ConfigConstants`, so I thought it was a contract.
   
   2. Since you said this is not a contract, I would like to propose not 
relying on `FLINK_HOME`.
   3. One possible approach is:
      1. The default behavior is to only look for the UsrLib directory 
relative to WorkingDir.
      2. If it does not exist, we could try the ENV_FLINK_USR_LIB_DIR 
directory.
   
   By the way, this requires modifying the working dir in docker-entrypoint.sh 
to support the default behavior.




[GitHub] [flink] flinkbot commented on issue #10082: [FLINK-14164][runtime] Add a meter ‘numberOfRestarts’ to show number of restarts as well as its rate

2019-11-04 Thread GitBox
flinkbot commented on issue #10082: [FLINK-14164][runtime] Add a meter 
‘numberOfRestarts’ to show number of restarts as well as its rate
URL: https://github.com/apache/flink/pull/10082#issuecomment-549688949
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit 9ad21545cdce68c8892941e8f33a89147bddb1f6 (Tue Nov 05 
06:57:08 UTC 2019)
   
   **Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up 
to date!
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

 Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   




[jira] [Updated] (FLINK-14164) Add a metric to show failover count regarding fine grained recovery

2019-11-04 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14164?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-14164:
---
Labels: pull-request-available  (was: )

> Add a metric to show failover count regarding fine grained recovery
> -------------------------------------------------------------------
>
> Key: FLINK-14164
> URL: https://issues.apache.org/jira/browse/FLINK-14164
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination, Runtime / Metrics
>Affects Versions: 1.10.0
>Reporter: Zhu Zhu
>Assignee: Zhu Zhu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>
> Previously Flink used the restart-all strategy to recover jobs from failures, 
> and the metric "fullRestart" showed the count of failovers.
> However, with fine grained recovery introduced in 1.9.0, the "fullRestart" 
> metric only reveals how many times the entire graph has been restarted, not 
> including the number of fine grained failure recoveries.
> As many users want to build their job alerting on top of failovers, I'd 
> propose to add a new metric {{numberOfRestarts}} which also respects fine 
> grained recoveries. The metric should be a meter (MeterView) so that users 
> can leverage the rate to detect newly happened failures rather than computing 
> the difference themselves.
> The MeterView should be added in SchedulerBase to serve both the legacy 
> scheduler and the NG scheduler.
> The underlying counter of the MeterView is determined by the scheduler 
> implementation:
> 1. for the legacy scheduler, it's the {{ExecutionGraph#numberOfRestartsCounter}} 
> which was added in FLINK-14206
> 2. for the NG scheduler, it's a new counter added in {{ExecutionFailureHandler}} 
> that counts all task and global failures notified to it.
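
A minimal sketch of registering such a meter with Flink's metrics API; the 
wiring into SchedulerBase is simplified and the 60-second rate window is an 
assumption:

```
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.SimpleCounter;

final class RestartMeterSketch {

    /** Registers numberOfRestarts as a meter backed by a plain counter. */
    static Counter registerRestartMeter(MetricGroup jobMetricGroup) {
        // Bumped once per task/global failover handled by the scheduler.
        Counter restarts = new SimpleCounter();
        // MeterView derives an events-per-second rate over a 60s window.
        jobMetricGroup.meter("numberOfRestarts", new MeterView(restarts, 60));
        return restarts;
    }
}
```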





[GitHub] [flink] hequn8128 commented on a change in pull request #10017: [FLINK-14019][python] add support for managing environment and dependencies of Python UDF in Flink Python API

2019-11-04 Thread GitBox
hequn8128 commented on a change in pull request #10017: [FLINK-14019][python] 
add support for managing environment and dependencies of Python UDF in Flink 
Python API
URL: https://github.com/apache/flink/pull/10017#discussion_r341950747
 
 

 ##
 File path: flink-python/pyflink/common/dependency_manager.py
 ##
 @@ -0,0 +1,106 @@
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import os
+import uuid
+
+__all__ = ['DependencyManager']
+
+
+class DependencyManager(object):
+"""
+Container class of dependency-related parameters. It collects all the 
dependency parameters
+and transmits them to the JVM before executing the job.
+"""
+
+PYTHON_FILE_PREFIX = "python_file"
+PYTHON_REQUIREMENTS_FILE_PREFIX = "python_requirements_file"
+PYTHON_REQUIREMENTS_CACHE_PREFIX = "python_requirements_cache"
+PYTHON_ARCHIVE_PREFIX = "python_archive"
+
+PYTHON_FILE_MAP = "PYTHON_FILE_MAP"
+PYTHON_REQUIREMENTS_FILE = "PYTHON_REQUIREMENTS_FILE"
+PYTHON_REQUIREMENTS_CACHE = "PYTHON_REQUIREMENTS_CACHE"
+PYTHON_ARCHIVES_MAP = "PYTHON_ARCHIVES_MAP"
+PYTHON_EXEC = "PYTHON_EXEC"
 
 Review comment:
   As we put these names in the config, how about adding a prefix to all these 
names? For example, `PYTHON_EXEC = "python.environment.python_exec"`.




[GitHub] [flink] hequn8128 commented on a change in pull request #10017: [FLINK-14019][python] add support for managing environment and dependencies of Python UDF in Flink Python API

2019-11-04 Thread GitBox
hequn8128 commented on a change in pull request #10017: [FLINK-14019][python] 
add support for managing environment and dependencies of Python UDF in Flink 
Python API
URL: https://github.com/apache/flink/pull/10017#discussion_r342395415
 
 

 ##
 File path: 
flink-python/src/main/java/org/apache/flink/python/ProcessEnvironmentManager.java
 ##
 @@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.python;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.python.util.UnzipUtil;
+import org.apache.flink.util.FileUtils;
+
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.Environments;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * The ProcessEnvironmentManager is used to prepare the working directory of the python UDF worker
+ * and to create the ProcessEnvironment object of the Beam Fn API. It will be created if the python
+ * function runner is configured to run python UDFs in process mode.
+ */
+@Internal
+public class ProcessEnvironmentManager implements PythonEnvironmentManager {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(ProcessEnvironmentManager.class);
+
+   static final String PYTHON_REQUIREMENTS_FILE = 
"_PYTHON_REQUIREMENTS_FILE";
+   static final String PYTHON_REQUIREMENTS_CACHE = 
"_PYTHON_REQUIREMENTS_CACHE";
+   static final String PYTHON_REQUIREMENTS_TARGET_DIR = 
"_PYTHON_REQUIREMENTS_TARGET_DIR";
+   static final String PYTHON_WORKING_DIR = "_PYTHON_WORKING_DIR";
+
+   static final String PYTHON_TMP_DIR_PREFIX = "python_dist_";
+   static final String PYTHON_REQUIREMENTS_TARGET_DIR_NAME = 
"python_requirements_target";
+   static final String PYTHON_ARCHIVES_DIR = "python_archives";
+   static final String PYTHON_PATH_FILES_DIR = "python_path_files_dir";
+
+   private PythonDependencyManager dependencyManager;
+   private String pythonTmpDirectoryBase;
+   private String requirementsTargetDirPath;
+   private String pythonWorkingDirectory;
+   private String pythonPathFilesDirectory;
+   private Thread shutdownHook;
+   private Map<String, String> systemEnv;
+
+   static boolean testCopy = false;
+
+   private ProcessEnvironmentManager(
+   PythonDependencyManager dependencyManager,
+   String pythonTmpDirectoryBase,
+   String pythonPathFilesDirectory,
+   String pythonWorkingDirectory,
+   String requirementsTargetDirPath,
+   Map<String, String> systemEnv) {
+   this.dependencyManager = dependencyManager;
+   this.pythonTmpDirectoryBase = pythonTmpDirectoryBase;
+   this.pythonPathFilesDirectory = pythonPathFilesDirectory;
+   this.pythonWorkingDirectory = pythonWorkingDirectory;
+   this.requirementsTargetDirPath = requirementsTargetDirPath;
+   this.systemEnv = systemEnv;
+   }
+
+   @Override
+   public void cleanup() {
+   if (shutdownHook != null) {
+   shutdownHook.run();
+   Runtime.getRuntime().removeShutdownHook(shutdownHook);
+   shutdownHook = null;
+   }
+   }
+
+   @Override
+   public RunnerApi.Environment createEnvironment() {
+   prepareEnvironment();
+   Map<String, String> generatedEnv = 
generateEnvironmentVariable();
+   String flinkHomePath = 
systemEnv.get(ConfigConstants.ENV_FLINK_HOME_DIR);
+   String pythonWorkerCommand =
+   flinkHomePath + File.separator + "bin" + File.separator 
+ "pyflink-udf-runner.sh";
+
+   return Environments.createProcessEnvironment(
+ 

[GitHub] [flink] hequn8128 commented on a change in pull request #10017: [FLINK-14019][python] add support for managing environment and dependencies of Python UDF in Flink Python API

2019-11-04 Thread GitBox
hequn8128 commented on a change in pull request #10017: [FLINK-14019][python] 
add support for managing environment and dependencies of Python UDF in Flink 
Python API
URL: https://github.com/apache/flink/pull/10017#discussion_r342050046
 
 

 ##
 File path: 
flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
 ##
 @@ -227,6 +251,9 @@ public void processWatermark(Watermark mark) throws 
Exception {
 */
public abstract PythonFunctionRunner createPythonFunctionRunner();
 
+   public abstract PythonEnvironmentManager createPythonEnvironmentManager(
 
 Review comment:
   This method can also be removed if we only put `environmentManager` in 
`AbstractPythonFunctionRunner`.




[GitHub] [flink] hequn8128 commented on a change in pull request #10017: [FLINK-14019][python] add support for managing environment and dependencies of Python UDF in Flink Python API

2019-11-04 Thread GitBox
hequn8128 commented on a change in pull request #10017: [FLINK-14019][python] 
add support for managing environment and dependencies of Python UDF in Flink 
Python API
URL: https://github.com/apache/flink/pull/10017#discussion_r342403304
 
 

 ##
 File path: 
flink-python/src/main/java/org/apache/flink/python/ProcessEnvironmentManager.java
 ##
 @@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.python;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.python.util.UnzipUtil;
+import org.apache.flink.util.FileUtils;
+
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.Environments;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * The ProcessEnvironmentManager is used to prepare the working directory of the python UDF worker
+ * and to create the ProcessEnvironment object of the Beam Fn API. It will be created if the python
+ * function runner is configured to run python UDFs in process mode.
+ */
+@Internal
+public class ProcessEnvironmentManager implements PythonEnvironmentManager {
 
 Review comment:
   public final?




[GitHub] [flink] hequn8128 commented on a change in pull request #10017: [FLINK-14019][python] add support for managing environment and dependencies of Python UDF in Flink Python API

2019-11-04 Thread GitBox
hequn8128 commented on a change in pull request #10017: [FLINK-14019][python] 
add support for managing environment and dependencies of Python UDF in Flink 
Python API
URL: https://github.com/apache/flink/pull/10017#discussion_r342010724
 
 

 ##
 File path: flink-python/pyflink/common/tests/test_dependency_manager.py
 ##
 @@ -0,0 +1,137 @@
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import re
+import unittest
+
+from pyflink.common import Configuration
+from pyflink.common.dependency_manager import DependencyManager
+from pyflink.table import TableConfig
+from pyflink.testing.test_case_utils import PyFlinkTestCase
+
+
+def replace_uuid(input_obj):
+if isinstance(input_obj, str):
+return 
re.sub(r'[a-z0-9]{8}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{12}',
+  '{uuid}', input_obj)
+elif isinstance(input_obj, dict):
+input_obj_copy = dict()
+for key in input_obj:
+input_obj_copy[replace_uuid(key)] = replace_uuid(input_obj[key])
+return input_obj_copy
+
+
+class DependencyManagerTests(PyFlinkTestCase):
+
+def setUp(self):
+self.j_env = MockedJavaEnv()
+self.config = Configuration()
+self.dependency_manager = DependencyManager(self.config, self.j_env)
+
+def test_add_python_file(self):
+self.dependency_manager.add_python_file("tmp_dir/test_file1.py")
 
 Review comment:
   Also test adding a directory.




[GitHub] [flink] hequn8128 commented on a change in pull request #10017: [FLINK-14019][python] add support for managing environment and dependencies of Python UDF in Flink Python API

2019-11-04 Thread GitBox
hequn8128 commented on a change in pull request #10017: [FLINK-14019][python] 
add support for managing environment and dependencies of Python UDF in Flink 
Python API
URL: https://github.com/apache/flink/pull/10017#discussion_r342031800
 
 

 ##
 File path: flink-python/pom.xml
 ##
 @@ -80,6 +80,11 @@ under the License.
		<version>${project.version}</version>
		<scope>provided</scope>
	</dependency>
+	<dependency>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-shaded-jackson</artifactId>
+		<scope>provided</scope>
+	</dependency>
 
 Review comment:
   I find that we can remove this dependency entirely, because we already have 
the fasterxml dependencies.




[GitHub] [flink] hequn8128 commented on a change in pull request #10017: [FLINK-14019][python] add support for managing environment and dependencies of Python UDF in Flink Python API

2019-11-04 Thread GitBox
hequn8128 commented on a change in pull request #10017: [FLINK-14019][python] 
add support for managing environment and dependencies of Python UDF in Flink 
Python API
URL: https://github.com/apache/flink/pull/10017#discussion_r342372266
 
 

 ##
 File path: 
flink-python/src/main/java/org/apache/flink/python/ProcessEnvironmentManager.java
 ##
 @@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.python;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.python.util.UnzipUtil;
+import org.apache.flink.util.FileUtils;
+
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.Environments;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * The ProcessEnvironmentManager is used to prepare the working directory of the python UDF worker
+ * and to create the ProcessEnvironment object of the Beam Fn API. It will be created if the python
+ * function runner is configured to run python UDFs in process mode.
+ */
+@Internal
+public class ProcessEnvironmentManager implements PythonEnvironmentManager {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(ProcessEnvironmentManager.class);
+
+   static final String PYTHON_REQUIREMENTS_FILE = 
"_PYTHON_REQUIREMENTS_FILE";
+   static final String PYTHON_REQUIREMENTS_CACHE = 
"_PYTHON_REQUIREMENTS_CACHE";
+   static final String PYTHON_REQUIREMENTS_TARGET_DIR = 
"_PYTHON_REQUIREMENTS_TARGET_DIR";
+   static final String PYTHON_WORKING_DIR = "_PYTHON_WORKING_DIR";
+
+   static final String PYTHON_TMP_DIR_PREFIX = "python_dist_";
+   static final String PYTHON_REQUIREMENTS_TARGET_DIR_NAME = 
"python_requirements_target";
+   static final String PYTHON_ARCHIVES_DIR = "python_archives";
+   static final String PYTHON_PATH_FILES_DIR = "python_path_files_dir";
+
+   private PythonDependencyManager dependencyManager;
+   private String pythonTmpDirectoryBase;
+   private String requirementsTargetDirPath;
+   private String pythonWorkingDirectory;
+   private String pythonPathFilesDirectory;
+   private Thread shutdownHook;
+   private Map<String, String> systemEnv;
+
+   static boolean testCopy = false;
+
+   private ProcessEnvironmentManager(
+   PythonDependencyManager dependencyManager,
+   String pythonTmpDirectoryBase,
+   String pythonPathFilesDirectory,
+   String pythonWorkingDirectory,
+   String requirementsTargetDirPath,
+   Map<String, String> systemEnv) {
+   this.dependencyManager = dependencyManager;
+   this.pythonTmpDirectoryBase = pythonTmpDirectoryBase;
+   this.pythonPathFilesDirectory = pythonPathFilesDirectory;
+   this.pythonWorkingDirectory = pythonWorkingDirectory;
+   this.requirementsTargetDirPath = requirementsTargetDirPath;
+   this.systemEnv = systemEnv;
+   }
+
+   @Override
+   public void cleanup() {
+   if (shutdownHook != null) {
+   shutdownHook.run();
+   Runtime.getRuntime().removeShutdownHook(shutdownHook);
+   shutdownHook = null;
+   }
+   }
+
+   @Override
+   public RunnerApi.Environment createEnvironment() {
+   prepareEnvironment();
+   Map<String, String> generatedEnv = 
generateEnvironmentVariable();
+   String flinkHomePath = 
systemEnv.get(ConfigConstants.ENV_FLINK_HOME_DIR);
+   String pythonWorkerCommand =
+   flinkHomePath + File.separator + "bin" + File.separator 
+ "pyflink-udf-runner.sh";
+
+   return Environments.createProcessEnvironment(
+ 

[GitHub] [flink] hequn8128 commented on a change in pull request #10017: [FLINK-14019][python] add support for managing environment and dependencies of Python UDF in Flink Python API

2019-11-04 Thread GitBox
hequn8128 commented on a change in pull request #10017: [FLINK-14019][python] 
add support for managing environment and dependencies of Python UDF in Flink 
Python API
URL: https://github.com/apache/flink/pull/10017#discussion_r341476050
 
 

 ##
 File path: 
flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
 ##
 @@ -110,6 +116,23 @@ public void open() throws Exception {
LOG.info("The maximum bundle time is configured 
to {} milliseconds.", this.maxBundleTimeMills);
}
 
+   String[] tmpDirectories =
+   
getContainingTask().getEnvironment().getTaskManagerInfo().getTmpDirectories();
+   Random rand = new Random();
+   int tmpDirectoryIndex = rand.nextInt() % 
tmpDirectories.length;
+   String pythonTmpDirectoryRoot = 
tmpDirectories[tmpDirectoryIndex];
+   ExecutionConfig.GlobalJobParameters globalJobParameters 
= getExecutionConfig().getGlobalJobParameters();
Map<String, String> parameters;
+   if (globalJobParameters != null) {
 
 Review comment:
   globalJobParameters cannot be null. Simplify it as:
   ```
   PythonDependencyManager dependencyManager = 
PythonDependencyManager.createDependencyManager(
 getExecutionConfig().getGlobalJobParameters().toMap(),
 getRuntimeContext().getDistributedCache());
   ```




[GitHub] [flink] hequn8128 commented on a change in pull request #10017: [FLINK-14019][python] add support for managing environment and dependencies of Python UDF in Flink Python API

2019-11-04 Thread GitBox
hequn8128 commented on a change in pull request #10017: [FLINK-14019][python] 
add support for managing environment and dependencies of Python UDF in Flink 
Python API
URL: https://github.com/apache/flink/pull/10017#discussion_r342373918
 
 

 ##
 File path: 
flink-python/src/main/java/org/apache/flink/python/ProcessEnvironmentManager.java
 ##
 @@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.python;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.python.util.UnzipUtil;
+import org.apache.flink.util.FileUtils;
+
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.Environments;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * The ProcessEnvironmentManager is used to prepare the working directory of the python UDF worker
+ * and to create the ProcessEnvironment object of the Beam Fn API. It will be created if the python
+ * function runner is configured to run python UDFs in process mode.
+ */
+@Internal
+public class ProcessEnvironmentManager implements PythonEnvironmentManager {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(ProcessEnvironmentManager.class);
+
+   static final String PYTHON_REQUIREMENTS_FILE = 
"_PYTHON_REQUIREMENTS_FILE";
+   static final String PYTHON_REQUIREMENTS_CACHE = 
"_PYTHON_REQUIREMENTS_CACHE";
+   static final String PYTHON_REQUIREMENTS_TARGET_DIR = 
"_PYTHON_REQUIREMENTS_TARGET_DIR";
+   static final String PYTHON_WORKING_DIR = "_PYTHON_WORKING_DIR";
+
+   static final String PYTHON_TMP_DIR_PREFIX = "python_dist_";
+   static final String PYTHON_REQUIREMENTS_TARGET_DIR_NAME = 
"python_requirements_target";
+   static final String PYTHON_ARCHIVES_DIR = "python_archives";
+   static final String PYTHON_PATH_FILES_DIR = "python_path_files_dir";
+
+   private PythonDependencyManager dependencyManager;
+   private String pythonTmpDirectoryBase;
+   private String requirementsTargetDirPath;
+   private String pythonWorkingDirectory;
+   private String pythonPathFilesDirectory;
+   private Thread shutdownHook;
+   private Map<String, String> systemEnv;
+
+   static boolean testCopy = false;
+
+   private ProcessEnvironmentManager(
+   PythonDependencyManager dependencyManager,
+   String pythonTmpDirectoryBase,
+   String pythonPathFilesDirectory,
+   String pythonWorkingDirectory,
+   String requirementsTargetDirPath,
+   Map<String, String> systemEnv) {
+   this.dependencyManager = dependencyManager;
+   this.pythonTmpDirectoryBase = pythonTmpDirectoryBase;
+   this.pythonPathFilesDirectory = pythonPathFilesDirectory;
+   this.pythonWorkingDirectory = pythonWorkingDirectory;
+   this.requirementsTargetDirPath = requirementsTargetDirPath;
+   this.systemEnv = systemEnv;
+   }
+
+   @Override
+   public void cleanup() {
 
 Review comment:
   Call this cleanup in the close() method of the Runner, and remove the 
`environmentManager` member from `AbstractPythonFunctionOperator`.
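   
   A minimal sketch of what that could look like, assuming the runner owns the 
manager (the class shape below is illustrative, not the PR's exact code):
   ```
   public abstract class AbstractPythonFunctionRunner<IN> implements PythonFunctionRunner<IN> {
   
       private transient PythonEnvironmentManager environmentManager;
   
       @Override
       public void close() throws Exception {
           // ... release other bundle resources first ...
           if (environmentManager != null) {
               // removes the prepared working dir and deregisters the shutdown hook
               environmentManager.cleanup();
               environmentManager = null;
           }
       }
   }
   ```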


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] hequn8128 commented on a change in pull request #10017: [FLINK-14019][python] add support for managing environment and dependencies of Python UDF in Flink Python API

2019-11-04 Thread GitBox
hequn8128 commented on a change in pull request #10017: [FLINK-14019][python] 
add support for managing environment and dependencies of Python UDF in Flink 
Python API
URL: https://github.com/apache/flink/pull/10017#discussion_r342009003
 
 

 ##
 File path: flink-python/pyflink/common/dependency_manager.py
 ##
 @@ -0,0 +1,106 @@
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import os
+import uuid
+
+__all__ = ['DependencyManager']
+
+
+class DependencyManager(object):
+"""
+Container class of dependency-related parameters. It collects all the 
dependency parameters
+and transmits them to the JVM before executing the job.
+"""
+
+PYTHON_FILE_PREFIX = "python_file"
+PYTHON_REQUIREMENTS_FILE_PREFIX = "python_requirements_file"
+PYTHON_REQUIREMENTS_CACHE_PREFIX = "python_requirements_cache"
+PYTHON_ARCHIVE_PREFIX = "python_archive"
+
+PYTHON_FILE_MAP = "PYTHON_FILE_MAP"
+PYTHON_REQUIREMENTS_FILE = "PYTHON_REQUIREMENTS_FILE"
+PYTHON_REQUIREMENTS_CACHE = "PYTHON_REQUIREMENTS_CACHE"
+PYTHON_ARCHIVES_MAP = "PYTHON_ARCHIVES_MAP"
+PYTHON_EXEC = "PYTHON_EXEC"
+
+def __init__(self, parameters, j_env):
+self._parameters = parameters
+self._j_env = j_env
+self._python_file_map = dict()  # type: dict[str, str]
+self._archives_map = dict()  # type: dict[str, str]
+self._counter_map = dict()  # type: dict[str, int]
+
+def _generate_file_key(self, prefix):
+if prefix not in self._counter_map:
+self._counter_map[prefix] = 0
+else:
+self._counter_map[prefix] += 1
+return "%s_%d_%s" % (prefix, self._counter_map[prefix], uuid.uuid4())
+
+def add_python_file(self, file_path):
+key = self._generate_file_key(DependencyManager.PYTHON_FILE_PREFIX)
+self._python_file_map[key] = os.path.basename(file_path)
+self._parameters.set_string(
+DependencyManager.PYTHON_FILE_MAP, 
json.dumps(self._python_file_map))
+self.register_file(key, file_path)
+
+def set_python_requirements(self, requirements_file_path, 
requirements_cached_dir=None):
+
+if 
self._parameters.contains_key(DependencyManager.PYTHON_REQUIREMENTS_FILE):
+self.remove_file(
+
self._parameters.get_string(DependencyManager.PYTHON_REQUIREMENTS_FILE, ""))
+
self._parameters.remove_config(DependencyManager.PYTHON_REQUIREMENTS_FILE)
 
 Review comment:
   Add a meaningful method for this? For example, `removeIfDelete()`, which can 
also be used to remove `requirements_cached`.
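   
   A rough sketch of such a helper (the name and exact semantics are only a 
suggestion, not the PR's API):
   ```
   # Hypothetical helper: drops the registered file that a config key points
   # to, if the key is currently set, then clears the key itself.
   def _remove_if_exists(self, config_key):
       if self._parameters.contains_key(config_key):
           self.remove_file(self._parameters.get_string(config_key, ""))
           self._parameters.remove_config(config_key)
   ```
   It could then be called for both `PYTHON_REQUIREMENTS_FILE` and 
`PYTHON_REQUIREMENTS_CACHE`.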
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhuzhurk opened a new pull request #10082: [FLINK-14164][runtime] Add a meter ‘numberOfRestarts’ to show number of restarts as well as its rate

2019-11-04 Thread GitBox
zhuzhurk opened a new pull request #10082: [FLINK-14164][runtime] Add a meter 
‘numberOfRestarts’ to show number of restarts as well as its rate
URL: https://github.com/apache/flink/pull/10082
 
 
   
   ## What is the purpose of the change
   
   This PR adds a meter ‘numberOfRestarts’ to show the number of restarts as 
well as its rate.
   It serves both the DefaultScheduler and the LegacyScheduler (a minimal 
registration sketch is appended at the end of this description).
   
   ## Brief change log
   
 - *Add a meter ‘numberOfRestarts’ in SchedulerBase*
 - *Let DefaultScheduler update the counter on restarts*
   
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
 - *Refined DefaultSchedulerTest to test this change*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)
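   
   A minimal sketch of the kind of registration described above (the metric 
types are Flink's, but where exactly the counter is registered and bumped is 
simplified here):
   
   ```
   // register once when the scheduler sets up its job-level metrics
   Counter numberOfRestartsCounter = new SimpleCounter();
   jobManagerJobMetricGroup.meter("numberOfRestarts", new MeterView(numberOfRestartsCounter, 60));
   
   // bump on every restart the scheduler performs
   numberOfRestartsCounter.inc();
   ```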
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] hequn8128 commented on a change in pull request #10017: [FLINK-14019][python] add support for managing environment and dependencies of Python UDF in Flink Python API

2019-11-04 Thread GitBox
hequn8128 commented on a change in pull request #10017: [FLINK-14019][python] 
add support for managing environment and dependencies of Python UDF in Flink 
Python API
URL: https://github.com/apache/flink/pull/10017#discussion_r342354936
 
 

 ##
 File path: 
flink-python/src/main/java/org/apache/flink/python/ProcessEnvironmentManager.java
 ##
 @@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.python;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.python.util.UnzipUtil;
+import org.apache.flink.util.FileUtils;
+
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.Environments;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * The ProcessEnvironmentManager is used to prepare the working directory of the python UDF worker
+ * and to create the ProcessEnvironment object of the Beam Fn API. It will be created if the python
+ * function runner is configured to run python UDFs in process mode.
+ */
+@Internal
+public class ProcessEnvironmentManager implements PythonEnvironmentManager {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(ProcessEnvironmentManager.class);
+
+   static final String PYTHON_REQUIREMENTS_FILE = 
"_PYTHON_REQUIREMENTS_FILE";
+   static final String PYTHON_REQUIREMENTS_CACHE = 
"_PYTHON_REQUIREMENTS_CACHE";
+   static final String PYTHON_REQUIREMENTS_TARGET_DIR = 
"_PYTHON_REQUIREMENTS_TARGET_DIR";
+   static final String PYTHON_WORKING_DIR = "_PYTHON_WORKING_DIR";
+
+   static final String PYTHON_TMP_DIR_PREFIX = "python_dist_";
+   static final String PYTHON_REQUIREMENTS_TARGET_DIR_NAME = 
"python_requirements_target";
+   static final String PYTHON_ARCHIVES_DIR = "python_archives";
+   static final String PYTHON_PATH_FILES_DIR = "python_path_files_dir";
+
+   private PythonDependencyManager dependencyManager;
+   private String pythonTmpDirectoryBase;
+   private String requirementsTargetDirPath;
+   private String pythonWorkingDirectory;
+   private String pythonPathFilesDirectory;
+   private Thread shutdownHook;
+   private Map<String, String> systemEnv;
+
+   static boolean testCopy = false;
+
+   private ProcessEnvironmentManager(
+   PythonDependencyManager dependencyManager,
+   String pythonTmpDirectoryBase,
+   String pythonPathFilesDirectory,
+   String pythonWorkingDirectory,
+   String requirementsTargetDirPath,
+   Map<String, String> systemEnv) {
+   this.dependencyManager = dependencyManager;
+   this.pythonTmpDirectoryBase = pythonTmpDirectoryBase;
+   this.pythonPathFilesDirectory = pythonPathFilesDirectory;
+   this.pythonWorkingDirectory = pythonWorkingDirectory;
+   this.requirementsTargetDirPath = requirementsTargetDirPath;
+   this.systemEnv = systemEnv;
+   }
+
+   @Override
+   public void cleanup() {
+   if (shutdownHook != null) {
+   shutdownHook.run();
+   Runtime.getRuntime().removeShutdownHook(shutdownHook);
+   shutdownHook = null;
+   }
+   }
+
+   @Override
+   public RunnerApi.Environment createEnvironment() {
+   prepareEnvironment();
+   Map<String, String> generatedEnv = 
generateEnvironmentVariable();
+   String flinkHomePath = 
systemEnv.get(ConfigConstants.ENV_FLINK_HOME_DIR);
+   String pythonWorkerCommand =
+   flinkHomePath + File.separator + "bin" + File.separator 
+ "pyflink-udf-runner.sh";
+
+   return Environments.createProcessEnvironment(
+ 

[GitHub] [flink] hequn8128 commented on a change in pull request #10017: [FLINK-14019][python] add support for managing environment and dependencies of Python UDF in Flink Python API

2019-11-04 Thread GitBox
hequn8128 commented on a change in pull request #10017: [FLINK-14019][python] 
add support for managing environment and dependencies of Python UDF in Flink 
Python API
URL: https://github.com/apache/flink/pull/10017#discussion_r342049265
 
 

 ##
 File path: 
flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
 ##
 @@ -75,6 +80,7 @@
 * A timer that finishes the current bundle after a fixed amount of 
time.
 */
private transient ScheduledFuture<?> checkFinishBundleTimer;
+   protected PythonEnvironmentManager environmentManager;
 
 Review comment:
   I see `environmentManager` has already been declared in 
`AbstractPythonFunctionRunner`, so maybe it's unnecessary to also add it here. 
The related initialization can also be moved into 
`AbstractPythonFunctionRunner`.
   
   On the other hand, the environment is currently created in the Runner, so 
putting `environmentManager` in the Runner is also consistent with that.
   
   What do you think?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] hequn8128 commented on a change in pull request #10017: [FLINK-14019][python] add support for managing environment and dependencies of Python UDF in Flink Python API

2019-11-04 Thread GitBox
hequn8128 commented on a change in pull request #10017: [FLINK-14019][python] 
add support for managing environment and dependencies of Python UDF in Flink 
Python API
URL: https://github.com/apache/flink/pull/10017#discussion_r342365852
 
 

 ##
 File path: 
flink-python/src/main/java/org/apache/flink/python/ProcessEnvironmentManager.java
 ##
 @@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.python;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.python.util.UnzipUtil;
+import org.apache.flink.util.FileUtils;
+
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.Environments;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * The ProcessEnvironmentManager is used to prepare the working directory of the python UDF worker
+ * and to create the ProcessEnvironment object of the Beam Fn API. It will be created if the python
+ * function runner is configured to run python UDFs in process mode.
+ */
+@Internal
+public class ProcessEnvironmentManager implements PythonEnvironmentManager {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(ProcessEnvironmentManager.class);
+
+   static final String PYTHON_REQUIREMENTS_FILE = 
"_PYTHON_REQUIREMENTS_FILE";
+   static final String PYTHON_REQUIREMENTS_CACHE = 
"_PYTHON_REQUIREMENTS_CACHE";
+   static final String PYTHON_REQUIREMENTS_TARGET_DIR = 
"_PYTHON_REQUIREMENTS_TARGET_DIR";
+   static final String PYTHON_WORKING_DIR = "_PYTHON_WORKING_DIR";
+
+   static final String PYTHON_TMP_DIR_PREFIX = "python_dist_";
+   static final String PYTHON_REQUIREMENTS_TARGET_DIR_NAME = 
"python_requirements_target";
+   static final String PYTHON_ARCHIVES_DIR = "python_archives";
+   static final String PYTHON_PATH_FILES_DIR = "python_path_files_dir";
+
+   private PythonDependencyManager dependencyManager;
+   private String pythonTmpDirectoryBase;
+   private String requirementsTargetDirPath;
+   private String pythonWorkingDirectory;
+   private String pythonPathFilesDirectory;
+   private Thread shutdownHook;
+   private Map<String, String> systemEnv;
+
+   static boolean testCopy = false;
+
+   private ProcessEnvironmentManager(
+   PythonDependencyManager dependencyManager,
+   String pythonTmpDirectoryBase,
+   String pythonPathFilesDirectory,
+   String pythonWorkingDirectory,
+   String requirementsTargetDirPath,
+   Map<String, String> systemEnv) {
+   this.dependencyManager = dependencyManager;
+   this.pythonTmpDirectoryBase = pythonTmpDirectoryBase;
+   this.pythonPathFilesDirectory = pythonPathFilesDirectory;
+   this.pythonWorkingDirectory = pythonWorkingDirectory;
+   this.requirementsTargetDirPath = requirementsTargetDirPath;
+   this.systemEnv = systemEnv;
+   }
+
+   @Override
+   public void cleanup() {
+   if (shutdownHook != null) {
+   shutdownHook.run();
+   Runtime.getRuntime().removeShutdownHook(shutdownHook);
+   shutdownHook = null;
+   }
+   }
+
+   @Override
+   public RunnerApi.Environment createEnvironment() {
+   prepareEnvironment();
+   Map<String, String> generatedEnv = 
generateEnvironmentVariable();
+   String flinkHomePath = 
systemEnv.get(ConfigConstants.ENV_FLINK_HOME_DIR);
+   String pythonWorkerCommand =
+   flinkHomePath + File.separator + "bin" + File.separator 
+ "pyflink-udf-runner.sh";
+
+   return Environments.createProcessEnvironment(
+ 

[GitHub] [flink] hequn8128 commented on a change in pull request #10017: [FLINK-14019][python] add support for managing environment and dependencies of Python UDF in Flink Python API

2019-11-04 Thread GitBox
hequn8128 commented on a change in pull request #10017: [FLINK-14019][python] 
add support for managing environment and dependencies of Python UDF in Flink 
Python API
URL: https://github.com/apache/flink/pull/10017#discussion_r342009550
 
 

 ##
 File path: flink-python/pyflink/common/dependency_manager.py
 ##
 @@ -0,0 +1,106 @@
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import os
+import uuid
+
+__all__ = ['DependencyManager']
+
+
+class DependencyManager(object):
+"""
+Container class of dependency-related parameters. It collects all the 
dependency parameters
+and transmits them to the JVM before executing the job.
+"""
+
+PYTHON_FILE_PREFIX = "python_file"
+PYTHON_REQUIREMENTS_FILE_PREFIX = "python_requirements_file"
+PYTHON_REQUIREMENTS_CACHE_PREFIX = "python_requirements_cache"
+PYTHON_ARCHIVE_PREFIX = "python_archive"
+
+PYTHON_FILE_MAP = "PYTHON_FILE_MAP"
+PYTHON_REQUIREMENTS_FILE = "PYTHON_REQUIREMENTS_FILE"
+PYTHON_REQUIREMENTS_CACHE = "PYTHON_REQUIREMENTS_CACHE"
+PYTHON_ARCHIVES_MAP = "PYTHON_ARCHIVES_MAP"
+PYTHON_EXEC = "PYTHON_EXEC"
+
+def __init__(self, parameters, j_env):
+self._parameters = parameters
+self._j_env = j_env
+self._python_file_map = dict()  # type: dict[str, str]
+self._archives_map = dict()  # type: dict[str, str]
+self._counter_map = dict()  # type: dict[str, int]
+
+def _generate_file_key(self, prefix):
+if prefix not in self._counter_map:
+self._counter_map[prefix] = 0
+else:
+self._counter_map[prefix] += 1
+return "%s_%d_%s" % (prefix, self._counter_map[prefix], uuid.uuid4())
+
+def add_python_file(self, file_path):
+key = self._generate_file_key(DependencyManager.PYTHON_FILE_PREFIX)
+self._python_file_map[key] = os.path.basename(file_path)
+self._parameters.set_string(
+DependencyManager.PYTHON_FILE_MAP, 
json.dumps(self._python_file_map))
+self.register_file(key, file_path)
+
+def set_python_requirements(self, requirements_file_path, 
requirements_cached_dir=None):
+
+if 
self._parameters.contains_key(DependencyManager.PYTHON_REQUIREMENTS_FILE):
+self.remove_file(
+
self._parameters.get_string(DependencyManager.PYTHON_REQUIREMENTS_FILE, ""))
+
self._parameters.remove_config(DependencyManager.PYTHON_REQUIREMENTS_FILE)
+
+if 
self._parameters.contains_key(DependencyManager.PYTHON_REQUIREMENTS_CACHE):
+self.remove_file(
+
self._parameters.get_string(DependencyManager.PYTHON_REQUIREMENTS_CACHE, ""))
+
self._parameters.remove_config(DependencyManager.PYTHON_REQUIREMENTS_CACHE)
+
+requirements_file_key = "%s_%s" % (
+DependencyManager.PYTHON_REQUIREMENTS_FILE_PREFIX, uuid.uuid4())
+self._parameters.set_string(DependencyManager.PYTHON_REQUIREMENTS_FILE,
+requirements_file_key)
+self.register_file(requirements_file_key, requirements_file_path)
+
+if requirements_cached_dir is not None:
+requirements_cache_key = "%s_%s" % (
+DependencyManager.PYTHON_REQUIREMENTS_CACHE_PREFIX, 
uuid.uuid4())
+
self._parameters.set_string(DependencyManager.PYTHON_REQUIREMENTS_CACHE,
+requirements_cache_key)
+self.register_file(requirements_cache_key, requirements_cached_dir)
+
 
 Review comment:
   Extract a method to avoid code duplication? It also makes the code more 
readable.
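   
   For instance (a hypothetical extraction; the helper name is made up):
   ```
   # Registers file_path under a freshly generated key and records that key
   # under the given config name, so the JVM side can resolve it later.
   def _register_keyed_file(self, config_name, key_prefix, file_path):
       key = "%s_%s" % (key_prefix, uuid.uuid4())
       self._parameters.set_string(config_name, key)
       self.register_file(key, file_path)
   ```
   `set_python_requirements` then boils down to two calls of this helper, one 
for the requirements file and one for the cached dir.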


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] hequn8128 commented on a change in pull request #10017: [FLINK-14019][python] add support for managing environment and dependencies of Python UDF in Flink Python API

2019-11-04 Thread GitBox
hequn8128 commented on a change in pull request #10017: [FLINK-14019][python] 
add support for managing environment and dependencies of Python UDF in Flink 
Python API
URL: https://github.com/apache/flink/pull/10017#discussion_r342368907
 
 

 ##
 File path: 
flink-python/src/main/java/org/apache/flink/python/ProcessEnvironmentManager.java
 ##
 @@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.python;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.python.util.UnzipUtil;
+import org.apache.flink.util.FileUtils;
+
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.Environments;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * The ProcessEnvironmentManager is used to prepare the working directory of the python UDF worker
+ * and to create the ProcessEnvironment object of the Beam Fn API. It will be created if the python
+ * function runner is configured to run python UDFs in process mode.
+ */
+@Internal
+public class ProcessEnvironmentManager implements PythonEnvironmentManager {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(ProcessEnvironmentManager.class);
+
+   static final String PYTHON_REQUIREMENTS_FILE = 
"_PYTHON_REQUIREMENTS_FILE";
+   static final String PYTHON_REQUIREMENTS_CACHE = 
"_PYTHON_REQUIREMENTS_CACHE";
+   static final String PYTHON_REQUIREMENTS_TARGET_DIR = 
"_PYTHON_REQUIREMENTS_TARGET_DIR";
+   static final String PYTHON_WORKING_DIR = "_PYTHON_WORKING_DIR";
+
+   static final String PYTHON_TMP_DIR_PREFIX = "python_dist_";
+   static final String PYTHON_REQUIREMENTS_TARGET_DIR_NAME = 
"python_requirements_target";
+   static final String PYTHON_ARCHIVES_DIR = "python_archives";
+   static final String PYTHON_PATH_FILES_DIR = "python_path_files_dir";
+
+   private PythonDependencyManager dependencyManager;
+   private String pythonTmpDirectoryBase;
+   private String requirementsTargetDirPath;
+   private String pythonWorkingDirectory;
+   private String pythonPathFilesDirectory;
+   private Thread shutdownHook;
+   private Map<String, String> systemEnv;
+
+   static boolean testCopy = false;
+
+   private ProcessEnvironmentManager(
+   PythonDependencyManager dependencyManager,
+   String pythonTmpDirectoryBase,
+   String pythonPathFilesDirectory,
+   String pythonWorkingDirectory,
+   String requirementsTargetDirPath,
+   Map<String, String> systemEnv) {
+   this.dependencyManager = dependencyManager;
+   this.pythonTmpDirectoryBase = pythonTmpDirectoryBase;
+   this.pythonPathFilesDirectory = pythonPathFilesDirectory;
+   this.pythonWorkingDirectory = pythonWorkingDirectory;
+   this.requirementsTargetDirPath = requirementsTargetDirPath;
+   this.systemEnv = systemEnv;
+   }
+
+   @Override
+   public void cleanup() {
+   if (shutdownHook != null) {
+   shutdownHook.run();
+   Runtime.getRuntime().removeShutdownHook(shutdownHook);
+   shutdownHook = null;
+   }
+   }
+
+   @Override
+   public RunnerApi.Environment createEnvironment() {
+   prepareEnvironment();
+   Map<String, String> generatedEnv = 
generateEnvironmentVariable();
+   String flinkHomePath = 
systemEnv.get(ConfigConstants.ENV_FLINK_HOME_DIR);
+   String pythonWorkerCommand =
+   flinkHomePath + File.separator + "bin" + File.separator 
+ "pyflink-udf-runner.sh";
+
+   return Environments.createProcessEnvironment(
+ 

[GitHub] [flink] hequn8128 commented on a change in pull request #10017: [FLINK-14019][python] add support for managing environment and dependencies of Python UDF in Flink Python API

2019-11-04 Thread GitBox
hequn8128 commented on a change in pull request #10017: [FLINK-14019][python] 
add support for managing environment and dependencies of Python UDF in Flink 
Python API
URL: https://github.com/apache/flink/pull/10017#discussion_r341476368
 
 

 ##
 File path: 
flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
 ##
 @@ -110,6 +116,23 @@ public void open() throws Exception {
LOG.info("The maximum bundle time is configured 
to {} milliseconds.", this.maxBundleTimeMills);
}
 
+   String[] tmpDirectories =
+   
getContainingTask().getEnvironment().getTaskManagerInfo().getTmpDirectories();
+   Random rand = new Random();
+   int tmpDirectoryIndex = rand.nextInt() % 
tmpDirectories.length;
+   String pythonTmpDirectoryRoot = 
tmpDirectories[tmpDirectoryIndex];
 
 Review comment:
   Maybe extract this logic into a small method? Putting all of the logic in 
open() makes it a mess.
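   
   For example (a sketch only; the helper name is made up):
   ```
   // Picks one of the task manager's tmp directories at random as the root
   // for the python working directory.
   private String chooseRandomTmpDirectory() {
       String[] tmpDirectories =
           getContainingTask().getEnvironment().getTaskManagerInfo().getTmpDirectories();
       // nextInt(bound) already yields a non-negative index
       return tmpDirectories[new Random().nextInt(tmpDirectories.length)];
   }
   ```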


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] hequn8128 commented on a change in pull request #10017: [FLINK-14019][python] add support for managing environment and dependencies of Python UDF in Flink Python API

2019-11-04 Thread GitBox
hequn8128 commented on a change in pull request #10017: [FLINK-14019][python] 
add support for managing environment and dependencies of Python UDF in Flink 
Python API
URL: https://github.com/apache/flink/pull/10017#discussion_r342395626
 
 

 ##
 File path: 
flink-python/src/main/java/org/apache/flink/python/ProcessEnvironmentManager.java
 ##
 @@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.python;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.python.util.UnzipUtil;
+import org.apache.flink.util.FileUtils;
+
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.Environments;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * The ProcessEnvironmentManager is used to prepare the working directory of the python UDF worker
+ * and to create the ProcessEnvironment object of the Beam Fn API. It will be created if the python
+ * function runner is configured to run python UDFs in process mode.
+ */
+@Internal
+public class ProcessEnvironmentManager implements PythonEnvironmentManager {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(ProcessEnvironmentManager.class);
+
+   static final String PYTHON_REQUIREMENTS_FILE = 
"_PYTHON_REQUIREMENTS_FILE";
+   static final String PYTHON_REQUIREMENTS_CACHE = 
"_PYTHON_REQUIREMENTS_CACHE";
+   static final String PYTHON_REQUIREMENTS_TARGET_DIR = 
"_PYTHON_REQUIREMENTS_TARGET_DIR";
+   static final String PYTHON_WORKING_DIR = "_PYTHON_WORKING_DIR";
+
+   static final String PYTHON_TMP_DIR_PREFIX = "python_dist_";
+   static final String PYTHON_REQUIREMENTS_TARGET_DIR_NAME = 
"python_requirements_target";
+   static final String PYTHON_ARCHIVES_DIR = "python_archives";
+   static final String PYTHON_PATH_FILES_DIR = "python_path_files_dir";
+
+   private PythonDependencyManager dependencyManager;
+   private String pythonTmpDirectoryBase;
+   private String requirementsTargetDirPath;
+   private String pythonWorkingDirectory;
+   private String pythonPathFilesDirectory;
+   private Thread shutdownHook;
+   private Map<String, String> systemEnv;
+
+   static boolean testCopy = false;
+
+   private ProcessEnvironmentManager(
+   PythonDependencyManager dependencyManager,
+   String pythonTmpDirectoryBase,
+   String pythonPathFilesDirectory,
+   String pythonWorkingDirectory,
+   String requirementsTargetDirPath,
+   Map<String, String> systemEnv) {
+   this.dependencyManager = dependencyManager;
+   this.pythonTmpDirectoryBase = pythonTmpDirectoryBase;
+   this.pythonPathFilesDirectory = pythonPathFilesDirectory;
+   this.pythonWorkingDirectory = pythonWorkingDirectory;
+   this.requirementsTargetDirPath = requirementsTargetDirPath;
+   this.systemEnv = systemEnv;
+   }
+
+   @Override
+   public void cleanup() {
+   if (shutdownHook != null) {
+   shutdownHook.run();
+   Runtime.getRuntime().removeShutdownHook(shutdownHook);
+   shutdownHook = null;
+   }
+   }
+
+   @Override
+   public RunnerApi.Environment createEnvironment() {
+   prepareEnvironment();
+   Map<String, String> generatedEnv = 
generateEnvironmentVariable();
+   String flinkHomePath = 
systemEnv.get(ConfigConstants.ENV_FLINK_HOME_DIR);
+   String pythonWorkerCommand =
+   flinkHomePath + File.separator + "bin" + File.separator 
+ "pyflink-udf-runner.sh";
+
+   return Environments.createProcessEnvironment(
+ 

[GitHub] [flink] hequn8128 commented on a change in pull request #10017: [FLINK-14019][python] add support for managing environment and dependencies of Python UDF in Flink Python API

2019-11-04 Thread GitBox
hequn8128 commented on a change in pull request #10017: [FLINK-14019][python] 
add support for managing environment and dependencies of Python UDF in Flink 
Python API
URL: https://github.com/apache/flink/pull/10017#discussion_r342377276
 
 

 ##
 File path: 
flink-python/src/main/java/org/apache/flink/python/ProcessEnvironmentManager.java
 ##
 @@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.python;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.python.util.UnzipUtil;
+import org.apache.flink.util.FileUtils;
+
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.Environments;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * The ProcessEnvironmentManager is used to prepare the working directory of the python UDF worker
+ * and to create the ProcessEnvironment object of the Beam Fn API. It will be created if the python
+ * function runner is configured to run python UDFs in process mode.
+ */
+@Internal
+public class ProcessEnvironmentManager implements PythonEnvironmentManager {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(ProcessEnvironmentManager.class);
+
+   static final String PYTHON_REQUIREMENTS_FILE = 
"_PYTHON_REQUIREMENTS_FILE";
+   static final String PYTHON_REQUIREMENTS_CACHE = 
"_PYTHON_REQUIREMENTS_CACHE";
+   static final String PYTHON_REQUIREMENTS_TARGET_DIR = 
"_PYTHON_REQUIREMENTS_TARGET_DIR";
+   static final String PYTHON_WORKING_DIR = "_PYTHON_WORKING_DIR";
+
+   static final String PYTHON_TMP_DIR_PREFIX = "python_dist_";
+   static final String PYTHON_REQUIREMENTS_TARGET_DIR_NAME = 
"python_requirements_target";
+   static final String PYTHON_ARCHIVES_DIR = "python_archives";
+   static final String PYTHON_PATH_FILES_DIR = "python_path_files_dir";
+
+   private PythonDependencyManager dependencyManager;
+   private String pythonTmpDirectoryBase;
+   private String requirementsTargetDirPath;
+   private String pythonWorkingDirectory;
+   private String pythonPathFilesDirectory;
+   private Thread shutdownHook;
+   private Map<String, String> systemEnv;
+
+   static boolean testCopy = false;
+
+   private ProcessEnvironmentManager(
+   PythonDependencyManager dependencyManager,
+   String pythonTmpDirectoryBase,
+   String pythonPathFilesDirectory,
+   String pythonWorkingDirectory,
+   String requirementsTargetDirPath,
+   Map<String, String> systemEnv) {
+   this.dependencyManager = dependencyManager;
+   this.pythonTmpDirectoryBase = pythonTmpDirectoryBase;
+   this.pythonPathFilesDirectory = pythonPathFilesDirectory;
+   this.pythonWorkingDirectory = pythonWorkingDirectory;
+   this.requirementsTargetDirPath = requirementsTargetDirPath;
+   this.systemEnv = systemEnv;
+   }
+
+   @Override
+   public void cleanup() {
+   if (shutdownHook != null) {
+   shutdownHook.run();
+   Runtime.getRuntime().removeShutdownHook(shutdownHook);
+   shutdownHook = null;
+   }
+   }
+
+   @Override
+   public RunnerApi.Environment createEnvironment() {
+   prepareEnvironment();
+   Map<String, String> generatedEnv = 
generateEnvironmentVariable();
+   String flinkHomePath = 
systemEnv.get(ConfigConstants.ENV_FLINK_HOME_DIR);
+   String pythonWorkerCommand =
+   flinkHomePath + File.separator + "bin" + File.separator 
+ "pyflink-udf-runner.sh";
+
+   return Environments.createProcessEnvironment(
+ 

[GitHub] [flink] hequn8128 commented on a change in pull request #10017: [FLINK-14019][python] add support for managing environment and dependencies of Python UDF in Flink Python API

2019-11-04 Thread GitBox
hequn8128 commented on a change in pull request #10017: [FLINK-14019][python] 
add support for managing environment and dependencies of Python UDF in Flink 
Python API
URL: https://github.com/apache/flink/pull/10017#discussion_r342351699
 
 

 ##
 File path: 
flink-python/src/main/java/org/apache/flink/python/ProcessEnvironmentManager.java
 ##
 @@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.python;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.python.util.UnzipUtil;
+import org.apache.flink.util.FileUtils;
+
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.Environments;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * The ProcessEnvironmentManager is used to prepare the working directory of the python UDF worker
+ * and to create the ProcessEnvironment object of the Beam Fn API. It will be created if the python
+ * function runner is configured to run python UDFs in process mode.
+ */
+@Internal
+public class ProcessEnvironmentManager implements PythonEnvironmentManager {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(ProcessEnvironmentManager.class);
+
+   static final String PYTHON_REQUIREMENTS_FILE = 
"_PYTHON_REQUIREMENTS_FILE";
+   static final String PYTHON_REQUIREMENTS_CACHE = 
"_PYTHON_REQUIREMENTS_CACHE";
+   static final String PYTHON_REQUIREMENTS_TARGET_DIR = 
"_PYTHON_REQUIREMENTS_TARGET_DIR";
+   static final String PYTHON_WORKING_DIR = "_PYTHON_WORKING_DIR";
+
+   static final String PYTHON_TMP_DIR_PREFIX = "python_dist_";
+   static final String PYTHON_REQUIREMENTS_TARGET_DIR_NAME = 
"python_requirements_target";
+   static final String PYTHON_ARCHIVES_DIR = "python_archives";
+   static final String PYTHON_PATH_FILES_DIR = "python_path_files_dir";
+
+   private PythonDependencyManager dependencyManager;
+   private String pythonTmpDirectoryBase;
+   private String requirementsTargetDirPath;
+   private String pythonWorkingDirectory;
+   private String pythonPathFilesDirectory;
+   private Thread shutdownHook;
+   private Map<String, String> systemEnv;
+
+   static boolean testCopy = false;
+
+   private ProcessEnvironmentManager(
+   PythonDependencyManager dependencyManager,
+   String pythonTmpDirectoryBase,
+   String pythonPathFilesDirectory,
+   String pythonWorkingDirectory,
+   String requirementsTargetDirPath,
+   Map<String, String> systemEnv) {
+   this.dependencyManager = dependencyManager;
+   this.pythonTmpDirectoryBase = pythonTmpDirectoryBase;
+   this.pythonPathFilesDirectory = pythonPathFilesDirectory;
+   this.pythonWorkingDirectory = pythonWorkingDirectory;
+   this.requirementsTargetDirPath = requirementsTargetDirPath;
+   this.systemEnv = systemEnv;
+   }
+
+   @Override
+   public void cleanup() {
+   if (shutdownHook != null) {
+   shutdownHook.run();
+   Runtime.getRuntime().removeShutdownHook(shutdownHook);
+   shutdownHook = null;
+   }
+   }
+
+   @Override
+   public RunnerApi.Environment createEnvironment() {
+   prepareEnvironment();
+   Map<String, String> generatedEnv = 
generateEnvironmentVariable();
+   String flinkHomePath = 
systemEnv.get(ConfigConstants.ENV_FLINK_HOME_DIR);
+   String pythonWorkerCommand =
+   flinkHomePath + File.separator + "bin" + File.separator 
+ "pyflink-udf-runner.sh";
+
+   return Environments.createProcessEnvironment(
+ 

[GitHub] [flink] hequn8128 commented on a change in pull request #10017: [FLINK-14019][python] add support for managing environment and dependencies of Python UDF in Flink Python API

2019-11-04 Thread GitBox
hequn8128 commented on a change in pull request #10017: [FLINK-14019][python] 
add support for managing environment and dependencies of Python UDF in Flink 
Python API
URL: https://github.com/apache/flink/pull/10017#discussion_r342006956
 
 

 ##
 File path: flink-python/pyflink/common/dependency_manager.py
 ##
 @@ -0,0 +1,106 @@
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import os
+import uuid
+
+__all__ = ['DependencyManager']
+
+
+class DependencyManager(object):
+"""
+Container class of dependency-related parameters. It collects all the 
dependency parameters
+and transmits them to the JVM before executing the job.
+"""
+
+PYTHON_FILE_PREFIX = "python_file"
+PYTHON_REQUIREMENTS_FILE_PREFIX = "python_requirements_file"
+PYTHON_REQUIREMENTS_CACHE_PREFIX = "python_requirements_cache"
+PYTHON_ARCHIVE_PREFIX = "python_archive"
+
+PYTHON_FILE_MAP = "PYTHON_FILE_MAP"
+PYTHON_REQUIREMENTS_FILE = "PYTHON_REQUIREMENTS_FILE"
+PYTHON_REQUIREMENTS_CACHE = "PYTHON_REQUIREMENTS_CACHE"
+PYTHON_ARCHIVES_MAP = "PYTHON_ARCHIVES_MAP"
+PYTHON_EXEC = "PYTHON_EXEC"
+
+def __init__(self, parameters, j_env):
+self._parameters = parameters
+self._j_env = j_env
+self._python_file_map = dict()  # type: dict[str, str]
+self._archives_map = dict()  # type: dict[str, str]
+self._counter_map = dict()  # type: dict[str, int]
+
+def _generate_file_key(self, prefix):
+if prefix not in self._counter_map:
+self._counter_map[prefix] = 0
+else:
+self._counter_map[prefix] += 1
+return "%s_%d_%s" % (prefix, self._counter_map[prefix], uuid.uuid4())
+
+def add_python_file(self, file_path):
+key = self._generate_file_key(DependencyManager.PYTHON_FILE_PREFIX)
+self._python_file_map[key] = os.path.basename(file_path)
+self._parameters.set_string(
+DependencyManager.PYTHON_FILE_MAP, 
json.dumps(self._python_file_map))
+self.register_file(key, file_path)
+
+def set_python_requirements(self, requirements_file_path, 
requirements_cached_dir=None):
 
 Review comment:
   Add some inline comments for this method?
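   
   For example, a short docstring stating the contract (the wording below is 
only a suggestion):
   ```
   def set_python_requirements(self, requirements_file_path, requirements_cached_dir=None):
       """
       Registers a pip requirements file, and optionally a directory of
       cached packages, for the python UDF workers. Any requirements
       file/cache registered earlier is removed first.
       """
   ```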


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #10081: [FLINK-14605][hive] Use Hive-1.1.0 as the profile to test against 1.1.x

2019-11-04 Thread GitBox
flinkbot edited a comment on issue #10081: [FLINK-14605][hive] Use Hive-1.1.0 
as the profile to test against 1.1.x
URL: https://github.com/apache/flink/pull/10081#issuecomment-549666964
 
 
   
   ## CI report:
   
   * c8a2505e137a9bdf9eadcc3c562b851285787965 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/134971651)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] KurtYoung commented on a change in pull request #9864: [FLINK-14254][table] Introduce FileSystemOutputFormat for batch

2019-11-04 Thread GitBox
KurtYoung commented on a change in pull request #9864: [FLINK-14254][table] 
Introduce FileSystemOutputFormat for batch
URL: https://github.com/apache/flink/pull/9864#discussion_r342396703
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/GroupedPartitionWriter.java
 ##
 @@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.filesystem;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.io.OutputFormat;
+
+/**
+ * {@link PartitionWriter} for grouped dynamic partition inserting. It will 
create a new format
+ * when the partition changes.
+ *
+ * @param <T> The type of the consumed records.
+ */
+@Internal
+public class GroupedPartitionWriter<T> implements PartitionWriter<T> {
+
+   private final Context context;
+   private final PathGenerator pathGenerator;
+   private final PartitionComputer computer;
+   private final PartitionPathMaker maker;
+
+   private OutputFormat<T> currentFormat;
+   private String currentPartition;
+
+   public GroupedPartitionWriter(
+   Context context,
+   PathGenerator pathGenerator,
+   PartitionComputer computer,
+   PartitionPathMaker maker) {
+   this.context = context;
+   this.pathGenerator = pathGenerator;
+   this.computer = computer;
+   this.maker = maker;
+   }
+
+   @Override
+   public void write(T in) throws Exception {
+   String partition = 
maker.makePartitionPath(computer.makePartitionValues(in));
+   if (currentPartition == null || 
!partition.equals(currentPartition)) {
 
 Review comment:
   partition.equals(null) will be false, no?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #9864: [FLINK-14254][table] Introduce FileSystemOutputFormat for batch

2019-11-04 Thread GitBox
JingsongLi commented on a change in pull request #9864: [FLINK-14254][table] 
Introduce FileSystemOutputFormat for batch
URL: https://github.com/apache/flink/pull/9864#discussion_r342394854
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/GroupedPartitionWriter.java
 ##
 @@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.filesystem;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.io.OutputFormat;
+
+/**
+ * {@link PartitionWriter} for grouped dynamic partition inserting. It will 
create a new format
+ * when the partition changes.
+ *
+ * @param <T> The type of the consumed records.
+ */
+@Internal
+public class GroupedPartitionWriter<T> implements PartitionWriter<T> {
+
+   private final Context context;
+   private final PathGenerator pathGenerator;
+   private final PartitionComputer computer;
+   private final PartitionPathMaker maker;
+
+   private OutputFormat<T> currentFormat;
+   private String currentPartition;
+
+   public GroupedPartitionWriter(
+   Context context,
+   PathGenerator pathGenerator,
+   PartitionComputer computer,
+   PartitionPathMaker maker) {
+   this.context = context;
+   this.pathGenerator = pathGenerator;
+   this.computer = computer;
+   this.maker = maker;
+   }
+
+   @Override
+   public void write(T in) throws Exception {
+   String partition = 
maker.makePartitionPath(computer.makePartitionValues(in));
+   if (currentPartition == null || 
!partition.equals(currentPartition)) {
 
 Review comment:
   For grouped input, `currentPartition` is null at first. After that, when 
the partition changes, we need to close the previous format.
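   
   In other words, the intended shape of `write` is roughly (a sketch; 
`createNewOutputFormat` and `generatePath` stand in for the PR's actual calls):
   ```
   if (currentPartition == null || !partition.equals(currentPartition)) {
       if (currentFormat != null) {
           // flush and close the format of the previous partition
           currentFormat.close();
       }
       currentFormat = createNewOutputFormat(generatePath(partition));
       currentPartition = partition;
   }
   currentFormat.writeRecord(in);
   ```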


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #9864: [FLINK-14254][table] Introduce FileSystemOutputFormat for batch

2019-11-04 Thread GitBox
JingsongLi commented on a change in pull request #9864: [FLINK-14254][table] 
Introduce FileSystemOutputFormat for batch
URL: https://github.com/apache/flink/pull/9864#discussion_r342394368
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/PartitionPathMaker.java
 ##
 @@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.filesystem;
+
+import java.io.Serializable;
+import java.util.LinkedHashMap;
+
+/**
+ * Interface to make the partition path. It may need to handle escape chars and the default
+ * partition name for null values.
+ */
+public interface PartitionPathMaker extends Serializable {
 
 Review comment:
   I will delete it and use utils.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #10060: [FLINK-14546] [flink-json] Support map type in flink-json

2019-11-04 Thread GitBox
flinkbot edited a comment on issue #10060: [FLINK-14546] [flink-json] Support 
map type in flink-json
URL: https://github.com/apache/flink/pull/10060#issuecomment-548313242
 
 
   
   ## CI report:
   
   * c68dc12d24be4ef19efc1ef42246e6018c4f6e8d : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/134363638)
   * f65310be9fb96f2446398a718dabeb4bd5a7a874 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/134963469)
   * 8dc48c4a16d1f4f82c78b14b18c0832fbb235d3d : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/134970076)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wuchong commented on a change in pull request #10039: [FLINK-14262][table] Support referencing function with fully/partially qualified names in SQL

2019-11-04 Thread GitBox
wuchong commented on a change in pull request #10039: [FLINK-14262][table]  
Support referencing function with fully/partially qualified names in SQL
URL: https://github.com/apache/flink/pull/10039#discussion_r342376155
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
 ##
 @@ -84,6 +94,41 @@ class CatalogTableITCase(isStreamingMode: Boolean) {
 tableEnv.execute(name)
   }
 
+  private def testUdf(sql: String): Unit = {
+val sinkDDL =
+  """
+|create table sinkT(
+|  a bigint
+|) with (
+|  'connector' = 'COLLECTION'
+|)
+  """.stripMargin
+tableEnv.sqlUpdate(sinkDDL)
+tableEnv.sqlUpdate(sql)
+tableEnv.execute("")
+assertEquals(Seq(toRow(2L)), TestCollectionTableFactory.RESULT.sorted)
+  }
+
+  @Test
+  def testUdfWithFullIdentifier(): Unit = {
+testUdf("insert into sinkT select 
default_catalog.default_database.myfunc(cast(1 as bigint))")
 
 Review comment:
   I think we should also test `default_database.myfunc` and `myfunc`.
   Could you also verify that it works for temporary (system/catalog) functions?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wuchong commented on a change in pull request #10039: [FLINK-14262][table] Support referencing function with fully/partially qualified names in SQL

2019-11-04 Thread GitBox
wuchong commented on a change in pull request #10039: [FLINK-14262][table]  
Support referencing function with fully/partially qualified names in SQL
URL: https://github.com/apache/flink/pull/10039#discussion_r342376155
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
 ##
 @@ -84,6 +94,41 @@ class CatalogTableITCase(isStreamingMode: Boolean) {
 tableEnv.execute(name)
   }
 
+  private def testUdf(sql: String): Unit = {
+val sinkDDL =
+  """
+|create table sinkT(
+|  a bigint
+|) with (
+|  'connector' = 'COLLECTION'
+|)
+  """.stripMargin
+tableEnv.sqlUpdate(sinkDDL)
+tableEnv.sqlUpdate(sql)
+tableEnv.execute("")
+assertEquals(Seq(toRow(2L)), TestCollectionTableFactory.RESULT.sorted)
+  }
+
+  @Test
+  def testUdfWithFullIdentifier(): Unit = {
+testUdf("insert into sinkT select 
default_catalog.default_database.myfunc(cast(1 as bigint))")
 
 Review comment:
   I think we should also test `default_database.myfunc` and `myfunc`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wuchong commented on a change in pull request #10039: [FLINK-14262][table] Support referencing function with fully/partially qualified names in SQL

2019-11-04 Thread GitBox
wuchong commented on a change in pull request #10039: [FLINK-14262][table]  
Support referencing function with fully/partially qualified names in SQL
URL: https://github.com/apache/flink/pull/10039#discussion_r342375742
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
 ##
 @@ -84,6 +94,41 @@ class CatalogTableITCase(isStreamingMode: Boolean) {
 tableEnv.execute(name)
   }
 
+  private def testUdf(sql: String): Unit = {
+val sinkDDL =
+  """
+|create table sinkT(
+|  a bigint
+|) with (
+|  'connector' = 'COLLECTION'
+|)
+  """.stripMargin
+tableEnv.sqlUpdate(sinkDDL)
+tableEnv.sqlUpdate(sql)
+tableEnv.execute("")
+assertEquals(Seq(toRow(2L)), TestCollectionTableFactory.RESULT.sorted)
 
 Review comment:
   The expected result is hard-coded here, which is not clean. It would be better 
to pass the expected result in as a parameter.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wuchong commented on a change in pull request #10039: [FLINK-14262][table] Support referencing function with fully/partially qualified names in SQL

2019-11-04 Thread GitBox
wuchong commented on a change in pull request #10039: [FLINK-14262][table]  
Support referencing function with fully/partially qualified names in SQL
URL: https://github.com/apache/flink/pull/10039#discussion_r342388213
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/ScalarSqlFunction.scala
 ##
 @@ -45,13 +44,13 @@ import scala.collection.JavaConverters._
   * @param typeFactorytype factory for converting Flink's between 
Calcite's types
   */
 class ScalarSqlFunction(
-name: String,
+name: SqlIdentifier,
 
 Review comment:
   I would suggest using `FunctionIdentifier` as the name parameter:
   1) `FunctionIdentifier` is a stricter interface: either a 3-part full path 
(UDF) or a 1-part name (built-in).
   2) `FunctionIdentifier` has better utilities for construction, rather than 
using `SqlParserPos.ZERO` here and there.
   3) `ScalarSqlFunction` is constructed not only from the SQL path but also 
from the Table API path, and it is weird to construct a `SqlIdentifier` in the 
Table API path.
   
   The same applies to `TableSqlFunction`, `AggregateSqlFunction`, etc.
   
   What do you think?
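
   A hedged illustration of the two construction paths (assuming the `FunctionIdentifier`/`ObjectIdentifier` API this PR's diffs already use; the catalog, database, and function names are placeholders):

```java
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.functions.FunctionIdentifier;

class FunctionIdentifierExamples {
    // 1-part simple name: a built-in / system function.
    static final FunctionIdentifier BUILT_IN = FunctionIdentifier.of("concat");

    // 3-part full path: a catalog UDF.
    static final FunctionIdentifier CATALOG_UDF = FunctionIdentifier.of(
            ObjectIdentifier.of("default_catalog", "default_database", "myfunc"));
}
```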


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wuchong commented on a change in pull request #10039: [FLINK-14262][table] Support referencing function with fully/partially qualified names in SQL

2019-11-04 Thread GitBox
wuchong commented on a change in pull request #10039: [FLINK-14262][table]  
Support referencing function with fully/partially qualified names in SQL
URL: https://github.com/apache/flink/pull/10039#discussion_r342380567
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/FunctionCatalogOperatorTable.java
 ##
 @@ -184,4 +188,21 @@ private boolean isNotUserFunction(SqlFunctionCategory 
category) {
public List getOperatorList() {
throw new UnsupportedOperationException("This should never be 
called");
}
+
+   public static FunctionIdentifier toFunctionIdentifier(String[] names, 
CatalogManager catalogManager) {
+   return names.length == 1 ?
+   FunctionIdentifier.of(names[0]) :
+   FunctionIdentifier.of(
+   
catalogManager.qualifyIdentifier(UnresolvedIdentifier.of(names)));
+   }
+
+   public static SqlIdentifier createSqlIdentifier(CallExpression call, 
UserDefinedFunction function) {
 
 Review comment:
   The `createSqlIdentifier` and `toFunctionIdentifier` utilities look out of 
place here; this class is not a utility class. Maybe `UserDefinedFunctionUtils` 
would be a better place.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wuchong commented on a change in pull request #10039: [FLINK-14262][table] Support referencing function with fully/partially qualified names in SQL

2019-11-04 Thread GitBox
wuchong commented on a change in pull request #10039: [FLINK-14262][table]  
Support referencing function with fully/partially qualified names in SQL
URL: https://github.com/apache/flink/pull/10039#discussion_r342393287
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/FunctionCatalogOperatorTable.java
 ##
 @@ -74,7 +84,7 @@ public void lookupOperatorOverloads(
SqlSyntax syntax,
List operatorList,
SqlNameMatcher nameMatcher) {
-   if (!opName.isSimple()) {
+   if (opName.isStar()) {
 
 Review comment:
   nit: `opName.isStar() && syntax == SqlSyntax.FUNCTION`.
   
   I just added a test locally: `f0 is not null` will also go into 
`lookupFunction`, which should never happen. `FunctionCatalog.lookupFunction` 
should only handle functions, not prefix or postfix syntax.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-14607) SharedSlot cannot fulfill pending slot requests before it's completely released

2019-11-04 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14607?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-14607:

Description: 
Currently a pending request can only be fulfilled when a physical 
slot({{AllocatedSlot}}) becomes available in {{SlotPool}}.
A shared slot however, cannot be used to fulfill pending requests even if it 
becomes qualified. This may lead to resource deadlocks in certain cases.

For example, running job A(parallelism=2) --(pipelined)--> B(parallelism=2) 
with 1 slot only, all vertices are in the same slot sharing group, here's what 
may happen:
1. Schedule A1 and A2. A1 acquires the only slot, A2's slot request is pending 
because a slot cannot host 2 instances of the same JobVertex at the same time. 
Shared slot status: \{A1\}
2. A1 produces data and triggers the scheduling of B1. Shared slot status {A1, 
B1}
3. A1 finishes. Shared slot status {B1}
4. B1 cannot finish since A2 has not finished, while A2 cannot be launched 
because no physical slot becomes available, even though the slot is qualified 
to host it now. A resource deadlock happens.

Maybe we should improve {{SlotSharingManager}}. Once a task slot is released, 
its root {{MultiTaskSlot}} should be used to try to fulfill existing pending 
task slots from other pending root slots ({{unresolvedRootSlots}}) in this 
{{SlotSharingManager}} (meaning in the same slot sharing group).
We need to be careful not to cause any failures, and not to violate colocation 
constraints.

cc [~trohrmann]


  was:
Currently a pending request can only be fulfilled when a physical 
slot({{AllocatedSlot}}) becomes available in {{SlotPool}}.
A shared slot however, cannot be used to fulfill pending requests even if it 
becomes qualified. This may lead to resource deadlocks in certain cases.

For example, running job A(parallelism=2) --(pipelined)--> B(parallelism=2) 
with 1 slot only, all vertices are in the same slot sharing group, here's what 
may happen:
1. Schedule A1 and A2. A1 acquires the only slot, A2's slot request is pending 
because a slot cannot host 2 instances of the same JobVertex at the same time. 
Shared slot status {A1}
2. A1 produces data and triggers the scheduling of B1. Shared slot status {A1, 
B1}
3. A1 finishes. Shared slot status {B1}
4. B1 cannot finish since A2 has not finished, while A2 cannot be launched 
because no physical slot becomes available, even though the slot is qualified 
to host it now. A resource deadlock happens.

Maybe we should improve {{SlotSharingManager}}. Once a task slot is released, 
its root {{MultiTaskSlot}} should be used to try to fulfill existing pending 
task slots from other pending root slots ({{unresolvedRootSlots}}) in this 
{{SlotSharingManager}} (meaning in the same slot sharing group).
We need to be careful not to cause any failures, and not to violate colocation 
constraints.

cc [~trohrmann]



> SharedSlot cannot fulfill pending slot requests before it's completely 
> released
> ---
>
> Key: FLINK-14607
> URL: https://issues.apache.org/jira/browse/FLINK-14607
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0, 1.9.1
>Reporter: Zhu Zhu
>Priority: Major
>
> Currently a pending request can only be fulfilled when a physical 
> slot({{AllocatedSlot}}) becomes available in {{SlotPool}}.
> A shared slot however, cannot be used to fulfill pending requests even if it 
> becomes qualified. This may lead to resource deadlocks in certain cases.
> For example, running job A(parallelism=2) --(pipelined)--> B(parallelism=2) 
> with 1 slot only, all vertices are in the same slot sharing group, here's 
> what may happen:
> 1. Schedule A1 and A2. A1 acquires the only slot, A2's slot request is 
> pending because a slot cannot host 2 instances of the same JobVertex at the 
> same time. Shared slot status: \{A1\}
> 2. A1 produces data and triggers the scheduling of B1. Shared slot status 
> {A1, B1}
> 3. A1 finishes. Shared slot status {B1}
> 4. B1 cannot finish since A2 has not finished, while A2 cannot be launched 
> because no physical slot becomes available, even though the slot is qualified 
> to host it now. A resource deadlock happens.
> Maybe we should improve {{SlotSharingManager}}. Once a task slot is released, 
> its root {{MultiTaskSlot}} should be used to try to fulfill existing pending 
> task slots from other pending root slots ({{unresolvedRootSlots}}) in this 
> {{SlotSharingManager}} (meaning in the same slot sharing group).
> We need to be careful not to cause any failures, and not to violate 
> colocation constraints.
> cc [~trohrmann]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-14607) SharedSlot cannot fulfill pending slot requests before it's completely released

2019-11-04 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14607?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-14607:

Description: 
Currently a pending request can only be fulfilled when a physical 
slot({{AllocatedSlot}}) becomes available in {{SlotPool}}.
A shared slot however, cannot be used to fulfill pending requests even if it 
becomes qualified. This may lead to resource deadlocks in certain cases.

For example, running job A(parallelism=2) --(pipelined)--> B(parallelism=2) 
with 1 slot only, all vertices are in the same slot sharing group, here's what 
may happen:
1. Schedule A1 and A2. A1 acquires the only slot, A2's slot request is pending 
because a slot cannot host 2 instances of the same JobVertex at the same time. 
Shared slot status: \{A1\}
2. A1 produces data and triggers the scheduling of B1. Shared slot status: 
\{A1, B1\}
3. A1 finishes. Shared slot status: \{B1\}
4. B1 cannot finish since A2 has not finished, while A2 cannot be launched 
because no physical slot becomes available, even though the shared slot is 
qualified to host it now. A resource deadlock happens.

Maybe we should improve {{SlotSharingManager}}. Once a task slot is released, 
its root {{MultiTaskSlot}} should be used to try to fulfill existing pending 
task slots from other pending root slots ({{unresolvedRootSlots}}) in this 
{{SlotSharingManager}} (meaning in the same slot sharing group).
We need to be careful not to cause any failures, and not to violate colocation 
constraints.

cc [~trohrmann]


  was:
Currently a pending request can only be fulfilled when a physical 
slot({{AllocatedSlot}}) becomes available in {{SlotPool}}.
A shared slot however, cannot be used to fulfill pending requests even if it 
becomes qualified. This may lead to resource deadlocks in certain cases.

For example, running job A(parallelism=2) --(pipelined)--> B(parallelism=2) 
with 1 slot only, all vertices are in the same slot sharing group, here's what 
may happen:
1. Schedule A1 and A2. A1 acquires the only slot, A2's slot request is pending 
because a slot cannot host 2 instances of the same JobVertex at the same time. 
Shared slot status: \{A1\}
2. A1 produces data and triggers the scheduling of B1. Shared slot status: 
\{A1, B1\}
3. A1 finishes. Shared slot status: \{B1\}
4. B1 cannot finish since A2 has not finished, while A2 cannot be launched 
because no physical slot becomes available, even though the slot is qualified 
to host it now. A resource deadlock happens.

Maybe we should improve {{SlotSharingManager}}. Once a task slot is released, 
its root {{MultiTaskSlot}} should be used to try to fulfill existing pending 
task slots from other pending root slots ({{unresolvedRootSlots}}) in this 
{{SlotSharingManager}} (meaning in the same slot sharing group).
We need to be careful not to cause any failures, and not to violate colocation 
constraints.

cc [~trohrmann]



> SharedSlot cannot fulfill pending slot requests before it's completely 
> released
> ---
>
> Key: FLINK-14607
> URL: https://issues.apache.org/jira/browse/FLINK-14607
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0, 1.9.1
>Reporter: Zhu Zhu
>Priority: Major
>
> Currently a pending request can only be fulfilled when a physical 
> slot({{AllocatedSlot}}) becomes available in {{SlotPool}}.
> A shared slot however, cannot be used to fulfill pending requests even if it 
> becomes qualified. This may lead to resource deadlocks in certain cases.
> For example, running job A(parallelism=2) --(pipelined)--> B(parallelism=2) 
> with 1 slot only, all vertices are in the same slot sharing group, here's 
> what may happen:
> 1. Schedule A1 and A2. A1 acquires the only slot, A2's slot request is 
> pending because a slot cannot host 2 instances of the same JobVertex at the 
> same time. Shared slot status: \{A1\}
> 2. A1 produces data and triggers the scheduling of B1. Shared slot status: 
> \{A1, B1\}
> 3. A1 finishes. Shared slot status: \{B1\}
> 4. B1 cannot finish since A2 has not finished, while A2 cannot be launched 
> because no physical slot becomes available, even though the shared slot is 
> qualified to host it now. A resource deadlock happens.
> Maybe we should improve {{SlotSharingManager}}. Once a task slot is released, 
> its root {{MultiTaskSlot}} should be used to try to fulfill existing pending 
> task slots from other pending root slots ({{unresolvedRootSlots}}) in this 
> {{SlotSharingManager}} (meaning in the same slot sharing group).
> We need to be careful not to cause any failures, and not to violate 
> colocation constraints.
> cc [~trohrmann]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-14607) SharedSlot cannot fulfill pending slot requests before it's completely released

2019-11-04 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14607?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-14607:

Description: 
Currently a pending request can only be fulfilled when a physical 
slot({{AllocatedSlot}}) becomes available in {{SlotPool}}.
A shared slot however, cannot be used to fulfill pending requests even if it 
becomes qualified. This may lead to resource deadlocks in certain cases.

For example, running job A(parallelism=2) --(pipelined)--> B(parallelism=2) 
with 1 slot only, all vertices are in the same slot sharing group, here's what 
may happen:
1. Schedule A1 and A2. A1 acquires the only slot, A2's slot request is pending 
because a slot cannot host 2 instances of the same JobVertex at the same time. 
Shared slot status: \{A1\}
2. A1 produces data and triggers the scheduling of B1. Shared slot status: 
\{A1, B1\}
3. A1 finishes. Shared slot status: \{B1\}
4. B1 cannot finish since A2 has not finished, while A2 cannot be launched 
because no physical slot becomes available, even though the slot is qualified 
to host it now. A resource deadlock happens.

Maybe we should improve {{SlotSharingManager}}. Once a task slot is released, 
its root {{MultiTaskSlot}} should be used to try to fulfill existing pending 
task slots from other pending root slots ({{unresolvedRootSlots}}) in this 
{{SlotSharingManager}} (meaning in the same slot sharing group).
We need to be careful not to cause any failures, and not to violate colocation 
constraints.

cc [~trohrmann]


  was:
Currently a pending request can only be fulfilled when a physical 
slot({{AllocatedSlot}}) becomes available in {{SlotPool}}.
A shared slot however, cannot be used to fulfill pending requests even if it 
becomes qualified. This may lead to resource deadlocks in certain cases.

For example, running job A(parallelism=2) --(pipelined)--> B(parallelism=2) 
with 1 slot only, all vertices are in the same slot sharing group, here's what 
may happen:
1. Schedule A1 and A2. A1 acquires the only slot, A2's slot request is pending 
because a slot cannot host 2 instances of the same JobVertex at the same time. 
Shared slot status: \{A1\}
2. A1 produces data and triggers the scheduling of B1. Shared slot status {A1, 
B1}
3. A1 finishes. Shared slot status {B1}
4. B1 cannot finish since A2 has not finished, while A2 cannot be launched 
because no physical slot becomes available, even though the slot is qualified 
to host it now. A resource deadlock happens.

Maybe we should improve {{SlotSharingManager}}. Once a task slot is released, 
its root {{MultiTaskSlot}} should be used to try to fulfill existing pending 
task slots from other pending root slots ({{unresolvedRootSlots}}) in this 
{{SlotSharingManager}} (meaning in the same slot sharing group).
We need to be careful not to cause any failures, and not to violate colocation 
constraints.

cc [~trohrmann]



> SharedSlot cannot fulfill pending slot requests before it's completely 
> released
> ---
>
> Key: FLINK-14607
> URL: https://issues.apache.org/jira/browse/FLINK-14607
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0, 1.9.1
>Reporter: Zhu Zhu
>Priority: Major
>
> Currently a pending request can only be fulfilled when a physical 
> slot({{AllocatedSlot}}) becomes available in {{SlotPool}}.
> A shared slot however, cannot be used to fulfill pending requests even if it 
> becomes qualified. This may lead to resource deadlocks in certain cases.
> For example, running job A(parallelism=2) --(pipelined)--> B(parallelism=2) 
> with 1 slot only, all vertices are in the same slot sharing group, here's 
> what may happen:
> 1. Schedule A1 and A2. A1 acquires the only slot, A2's slot request is 
> pending because a slot cannot host 2 instances of the same JobVertex at the 
> same time. Shared slot status: \{A1\}
> 2. A1 produces data and triggers the scheduling of B1. Shared slot status: 
> \{A1, B1\}
> 3. A1 finishes. Shared slot status: \{B1\}
> 4. B1 cannot finish since A2 has not finished, while A2 cannot be launched 
> because no physical slot becomes available, even though the slot is qualified 
> to host it now. A resource deadlock happens.
> Maybe we should improve {{SlotSharingManager}}. Once a task slot is released, 
> its root {{MultiTaskSlot}} should be used to try to fulfill existing pending 
> task slots from other pending root slots ({{unresolvedRootSlots}}) in this 
> {{SlotSharingManager}} (meaning in the same slot sharing group).
> We need to be careful not to cause any failures, and not to violate 
> colocation constraints.
> cc [~trohrmann]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-14607) SharedSlot cannot fulfill pending slot requests before it's completely released

2019-11-04 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14607?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-14607:

Description: 
Currently a pending request can only be fulfilled when a physical 
slot({{AllocatedSlot}}) becomes available in {{SlotPool}}.
A shared slot however, cannot be used to fulfill pending requests even if it 
becomes qualified. This may lead to resource deadlocks in certain cases.

For example, running job A(parallelism=2) --(pipelined)--> B(parallelism=2) 
with 1 slot only, all vertices are in the same slot sharing group, here's what 
may happen:
1. Schedule A1 and A2. A1 acquires the only slot, A2's slot request is pending 
because a slot cannot host 2 instances of the same JobVertex at the same time. 
Shared slot status {A1}
2. A1 produces data and triggers the scheduling of B1. Shared slot status {A1, 
B1}
3. A1 finishes. Shared slot status {B1}
4. B1 cannot finish since A2 has not finished, while A2 cannot be launched 
because no physical slot becomes available, even though the slot is qualified 
to host it now. A resource deadlock happens.

Maybe we should improve {{SlotSharingManager}}. Once a task slot is released, 
its root {{MultiTaskSlot}} should be used to try to fulfill existing pending 
task slots from other pending root slots ({{unresolvedRootSlots}}) in this 
{{SlotSharingManager}} (meaning in the same slot sharing group).
We need to be careful not to cause any failures, and not to violate colocation 
constraints.

cc [~trohrmann]


  was:
Currently a pending request can only be fulfilled when a physical 
slot({{AllocatedSlot}}) becomes available in {{SlotPool}}.
A shared slot however, cannot be used to fulfill pending requests even if it 
becomes qualified. This may lead to resource deadlocks in certain cases.

For example, running job A(parallelism=2) --(pipelined)--> B(parallelism=2) 
with 1 slot only, all vertices are in the same slot sharing group, here's what 
may happen:
1. Schedule A1 and A2. A1 acquires the only slot, A2's slot request is pending 
because a slot cannot host 2 instances of the same JobVertex at the same time. 
Shared slot status: {A1}
2. A1 produces data and triggers the scheduling of B1. Shared slot status: {A1, 
B1}
3. A1 finishes. Shared slot status: {B1}
4. B1 cannot finish since A2 has not finished, while A2 cannot be launched 
because no physical slot becomes available, even though the slot is qualified 
to host it now. A resource deadlock happens.

Maybe we should improve {{SlotSharingManager}}. Once a task slot is released, 
its root {{MultiTaskSlot}} should be used to try to fulfill existing pending 
task slots from other pending root slots ({{unresolvedRootSlots}}) in this 
{{SlotSharingManager}} (meaning in the same slot sharing group).
We need to be careful not to cause any failures, and not to violate colocation 
constraints.

cc [~trohrmann]



> SharedSlot cannot fulfill pending slot requests before it's completely 
> released
> ---
>
> Key: FLINK-14607
> URL: https://issues.apache.org/jira/browse/FLINK-14607
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0, 1.9.1
>Reporter: Zhu Zhu
>Priority: Major
>
> Currently a pending request can only be fulfilled when a physical 
> slot({{AllocatedSlot}}) becomes available in {{SlotPool}}.
> A shared slot however, cannot be used to fulfill pending requests even if it 
> becomes qualified. This may lead to resource deadlocks in certain cases.
> For example, running job A(parallelism=2) --(pipelined)--> B(parallelism=2) 
> with 1 slot only, all vertices are in the same slot sharing group, here's 
> what may happen:
> 1. Schedule A1 and A2. A1 acquires the only slot, A2's slot request is 
> pending because a slot cannot host 2 instances of the same JobVertex at the 
> same time. Shared slot status {A1}
> 2. A1 produces data and triggers the scheduling of B1. Shared slot status 
> {A1, B1}
> 3. A1 finishes. Shared slot status {B1}
> 4. B1 cannot finish since A2 has not finished, while A2 cannot be launched 
> because no physical slot becomes available, even though the slot is qualified 
> to host it now. A resource deadlock happens.
> Maybe we should improve {{SlotSharingManager}}. Once a task slot is released, 
> its root {{MultiTaskSlot}} should be used to try to fulfill existing pending 
> task slots from other pending root slots ({{unresolvedRootSlots}}) in this 
> {{SlotSharingManager}} (meaning in the same slot sharing group).
> We need to be careful not to cause any failures, and not to violate 
> colocation constraints.
> cc [~trohrmann]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-14607) SharedSlot cannot fulfill pending slot requests before it's completely released

2019-11-04 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14607?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-14607:

Summary: SharedSlot cannot fulfill pending slot requests before it's 
completely released  (was: SharedSlot cannot fulfill pending slot requests 
before it's totally released)

> SharedSlot cannot fulfill pending slot requests before it's completely 
> released
> ---
>
> Key: FLINK-14607
> URL: https://issues.apache.org/jira/browse/FLINK-14607
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0, 1.9.1
>Reporter: Zhu Zhu
>Priority: Major
>
> Currently a pending request can only be fulfilled when a physical 
> slot({{AllocatedSlot}}) becomes available in {{SlotPool}}.
> A shared slot however, cannot be used to fulfill pending requests even if it 
> becomes qualified. This may lead to resource deadlocks in certain cases.
> For example, running job A(parallelism=2) --(pipelined)--> B(parallelism=2) 
> with 1 slot only, all vertices are in the same slot sharing group, here's 
> what may happen:
> 1. Schedule A1 and A2. A1 acquires the only slot, A2's slot request is 
> pending because a slot cannot host 2 instances of the same JobVertex at the 
> same time. Shared slot status: {A1}
> 2. A1 produces data and triggers the scheduling of B1. Shared slot status: 
> {A1, B1}
> 3. A1 finishes. Shared slot status: {B1}
> 4. B1 cannot finish since A2 has not finished, while A2 cannot be launched 
> because no physical slot becomes available, even though the slot is qualified 
> to host it now. A resource deadlock happens.
> Maybe we should improve {{SlotSharingManager}}. Once a task slot is released, 
> its root {{MultiTaskSlot}} should be used to try to fulfill existing pending 
> task slots from other pending root slots ({{unresolvedRootSlots}}) in this 
> {{SlotSharingManager}} (meaning in the same slot sharing group).
> We need to be careful not to cause any failures, and not to violate 
> colocation constraints.
> cc [~trohrmann]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-14607) SharedSlot cannot fulfill pending slot requests before it's totally released

2019-11-04 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-14607:
---

 Summary: SharedSlot cannot fulfill pending slot requests before 
it's totally released
 Key: FLINK-14607
 URL: https://issues.apache.org/jira/browse/FLINK-14607
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.9.1, 1.10.0
Reporter: Zhu Zhu


Currently a pending request can only be fulfilled when a physical 
slot({{AllocatedSlot}}) becomes available in {{SlotPool}}.
A shared slot however, cannot be used to fulfill pending requests even if it 
becomes qualified. This may lead to resource deadlocks in certain cases.

For example, running job A(parallelism=2) --(pipelined)--> B(parallelism=2) 
with 1 slot only, all vertices are in the same slot sharing group, here's what 
may happen:
1. Schedule A1 and A2. A1 acquires the only slot, A2's slot request is pending 
because a slot cannot host 2 instances of the same JobVertex at the same time. 
Shared slot status: {A1}
2. A1 produces data and triggers the scheduling of B1. Shared slot status: {A1, 
B1}
3. A1 finishes. Shared slot status: {B1}
4. B1 cannot finish since A2 has not finished, while A2 cannot be launched 
because no physical slot becomes available, even though the slot is qualified 
to host it now. A resource deadlock happens.

Maybe we should improve {{SlotSharingManager}}. Once a task slot is released, 
its root {{MultiTaskSlot}} should be used to try to fulfill existing pending 
task slots from other pending root slots ({{unresolvedRootSlots}}) in this 
{{SlotSharingManager}} (meaning in the same slot sharing group).
We need to be careful not to cause any failures, and not to violate colocation 
constraints.

cc [~trohrmann]
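
A hedged pseudo-implementation of the idea with stand-in types (everything below is an assumption sketching the proposal above, not Flink's actual {{SlotSharingManager}}):

{code:java}
import java.util.List;

class SlotSharingSketch {
    interface SharedSlot {}

    interface TaskSlotRequest {
        // Fits only if the slot does not already host an instance of the same
        // JobVertex and colocation constraints are not violated.
        boolean canFitInto(SharedSlot slot);
        void fulfillWith(SharedSlot slot);
    }

    // Once a task slot is released, try to move pending requests from other
    // pending roots onto the now-qualified shared slot.
    void onTaskSlotReleased(SharedSlot freedRoot, List<TaskSlotRequest> pendingRequests) {
        for (TaskSlotRequest request : pendingRequests) {
            if (request.canFitInto(freedRoot)) {
                request.fulfillWith(freedRoot);
            }
        }
    }
}
{code}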




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on issue #10081: [FLINK-14605][hive] Use Hive-1.1.0 as the profile to test against 1.1.x

2019-11-04 Thread GitBox
flinkbot edited a comment on issue #10081: [FLINK-14605][hive] Use Hive-1.1.0 
as the profile to test against 1.1.x
URL: https://github.com/apache/flink/pull/10081#issuecomment-549666964
 
 
   
   ## CI report:
   
   * c8a2505e137a9bdf9eadcc3c562b851285787965 : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/134971651)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-14606) Simplify params of Execution#processFail

2019-11-04 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-14606:
---

 Summary: Simplify params of Execution#processFail
 Key: FLINK-14606
 URL: https://issues.apache.org/jira/browse/FLINK-14606
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Affects Versions: 1.10.0
Reporter: Zhu Zhu
 Fix For: 1.10.0


The 3 params fromSchedulerNg/releasePartitions/isCallback of 
Execution#processFail are quite a mess, even though they seem to be correlated. 
I'd propose to simplify the params of processFail by using a single 
{{isInternalError}} flag to replace those 3 params. {{isInternalError}} is true 
iff the failure is from the TM (strictly speaking, notified from SchedulerBase). 
This also hardens the handling of cases where a task is successfully deployed 
but the JM does not realize it (see #3 below).

Here's why these 3 params can be simplified:
1. {{fromSchedulerNg}}, true iff the failure is from the TM and 
isLegacyScheduling==false.
It's only used like this: {{if (!fromSchedulerNg && !isLegacyScheduling())}}. 
So replacing it with {{!isInternalError}} is equivalent.

2. {{releasePartitions}}, true iff the failure is from the TM.
Now its value is exactly the same as {{isInternalError}}, so we can drop it 
and use {{isInternalError}} instead.

3. {{isCallback}}, true iff the failure is from the TM or the task is not deployed.
It's only used like this: {{(!isCallback && (current == RUNNING || current == DEPLOYING))}}.
So replacing it with {{!isInternalError}} would be enough. It is a bit 
different for the case where a task deployment to a task manager fails, which 
previously set {{isCallback}} to true. However, it is safer to signal a cancel 
call in that case, in case the deployment actually succeeded but the response 
was lost on the network.

cc [~GJL]
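
A hypothetical sketch of the simplified shape (the class and body are assumptions, not the actual Execution code):

{code:java}
class ExecutionSketch {
    void processFail(Throwable t, boolean isInternalError) {
        if (!isInternalError) {
            // JM-initiated failure: covers the old
            // (!fromSchedulerNg && !isLegacyScheduling()) notification branch and the
            // (!isCallback && (current == RUNNING || current == DEPLOYING)) cancel branch.
        }
        boolean releasePartitions = isInternalError; // was a separate param with the same value
    }
}
{code}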



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] zhuzhurk commented on a change in pull request #10067: [FLINK-14375][runtime] Avoid to notify scheduler about fake or outdated state update

2019-11-04 Thread GitBox
zhuzhurk commented on a change in pull request #10067: [FLINK-14375][runtime] 
Avoid to notify scheduler about fake or outdated state update
URL: https://github.com/apache/flink/pull/10067#discussion_r342386939
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ##
 @@ -1248,21 +1251,9 @@ private void processFail(Throwable t, boolean 
isCallback, Map

[GitHub] [flink] flinkbot edited a comment on issue #9885: [FLINK-14344][checkpointing] Snapshots master hook state asynchronously

2019-11-04 Thread GitBox
flinkbot edited a comment on issue #9885: [FLINK-14344][checkpointing] 
Snapshots master hook state asynchronously
URL: https://github.com/apache/flink/pull/9885#issuecomment-541299624
 
 
   
   ## CI report:
   
   * d0c426e6ff61dcad1de4c7d3e9a2a16416e77e54 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/131633477)
   * 22434aac9a08ebb0ea1a61eba28ec12a55149d10 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/133945741)
   * cdff164bf39e74bdfd81332d84f12193a09997e5 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/134839623)
   * 30f912924c8497d9c55aa62a4987d94e31d409b5 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/134843066)
   * 5012d25a071772b683abddd7ef4c8d660e4c164d : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/134966688)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #10081: [FLINK-14605][hive] Use Hive-1.1.0 as the profile to test against 1.1.x

2019-11-04 Thread GitBox
flinkbot commented on issue #10081: [FLINK-14605][hive] Use Hive-1.1.0 as the 
profile to test against 1.1.x
URL: https://github.com/apache/flink/pull/10081#issuecomment-549666964
 
 
   
   ## CI report:
   
   * c8a2505e137a9bdf9eadcc3c562b851285787965 : UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #10060: [FLINK-14546] [flink-json] Support map type in flink-json

2019-11-04 Thread GitBox
flinkbot edited a comment on issue #10060: [FLINK-14546] [flink-json] Support 
map type in flink-json
URL: https://github.com/apache/flink/pull/10060#issuecomment-548313242
 
 
   
   ## CI report:
   
   * c68dc12d24be4ef19efc1ef42246e6018c4f6e8d : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/134363638)
   * f65310be9fb96f2446398a718dabeb4bd5a7a874 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/134963469)
   * 8dc48c4a16d1f4f82c78b14b18c0832fbb235d3d : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/134970076)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-14604) Bump commons-cli to 1.4

2019-11-04 Thread Wei Zhong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14604?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wei Zhong updated FLINK-14604:
--
Issue Type: Task  (was: Improvement)

> Bump commons-cli to 1.4
> ---
>
> Key: FLINK-14604
> URL: https://issues.apache.org/jira/browse/FLINK-14604
> Project: Flink
>  Issue Type: Task
>  Components: Command Line Client
>Reporter: Wei Zhong
>Priority: Major
>
> Currently Flink is using commons-cli 1.3.1. There is a 
> [bug|https://issues.apache.org/jira/projects/CLI/issues/CLI-265] in it which 
> prevents us from using options that accept variable arguments on the command line.
> To be precise, it prevents us from accepting a short-name option after a 
> varargs option because of a problem in the implementation of the 
> DefaultParser#isShortOption() method in commons-cli 1.3.1:
> {code:java}
> /**
>  * Tells if the token looks like a short option.
>  * 
>  * @param token
>  */
> private boolean isShortOption(String token)
> {
> // short options (-S, -SV, -S=V, -SV1=V2, -S1S2)
> // PROBLEM: it assumes that a short option has only a single character,
> //  but in fact we have many multi-character short options.
> return token.startsWith("-") && token.length() >= 2 && 
> options.hasShortOption(token.substring(1, 2));
> }
> {code}
> If we bump the version to 1.4, we can solve this problem.
> I request this change because there are 2 varargs options that hit this bug 
> in the design of the command line options for [Python UDF Dependency 
> Management|https://cwiki.apache.org/confluence/display/FLINK/FLIP-78%3A+Flink+Python+UDF+Environment+and+Dependency+Management].
>  It would be of great help if we could bump the commons-cli version to 1.4 :).
> commons-cli 1.4 is also a stable version, released in March 2017, and today 
> its usage is greater than 1.3.1's on the [Maven Central 
> repository|https://mvnrepository.com/artifact/commons-cli/commons-cli].
> I have pushed the change to my own Travis to check whether it breaks anything. 
> Here is the [link|https://travis-ci.org/WeiZhong94/flink/builds/607438208], 
> and everything seems fine.
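
A hedged illustration of the failure mode (the option names below are made up for illustration in the spirit of FLIP-78, not the actual design):

{code:java}
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;

public class VarargsOptionDemo {
    public static void main(String[] args) throws ParseException {
        Options options = new Options();
        options.addOption(Option.builder("py").hasArgs().build());   // varargs option
        options.addOption(Option.builder("pyfs").hasArg().build());  // multi-character "short" option
        CommandLine cli = new DefaultParser().parse(
                options, new String[]{"-py", "a.py", "b.py", "-pyfs", "deps.zip"});
        // With commons-cli 1.3.1, isShortOption("-pyfs") only checks substring(1, 2) == "p",
        // so "-pyfs" and "deps.zip" are swallowed as values of "-py". With 1.4 the line
        // below prints "deps.zip" as intended.
        System.out.println(cli.getOptionValue("pyfs"));
    }
}
{code}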



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-12675) Event time synchronization in Kafka consumer

2019-11-04 Thread Jiayi Liao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-12675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16967243#comment-16967243
 ] 

Jiayi Liao commented on FLINK-12675:


[~thw] Since the FLIP-27 design will be updated soon according to the mail 
thread, I'm going to help review the updated design and continue the work 
after that.

> Event time synchronization in Kafka consumer
> 
>
> Key: FLINK-12675
> URL: https://issues.apache.org/jira/browse/FLINK-12675
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kafka
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>
> Integrate the source watermark tracking into the Kafka consumer and implement 
> the sync mechanism (different consumer model, compared to Kinesis).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on issue #10067: [FLINK-14375][runtime] Avoid to notify scheduler about fake or outdated state update

2019-11-04 Thread GitBox
flinkbot edited a comment on issue #10067: [FLINK-14375][runtime] Avoid to 
notify scheduler about fake or outdated state update
URL: https://github.com/apache/flink/pull/10067#issuecomment-548812084
 
 
   
   ## CI report:
   
   * d3341d9f0bff2cb97bd0b5bf507725ae6d670b88 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/134571714)
   * 37c17d2be1cdbd72cc8a3554f7c8cf09f2c8a400 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/134655935)
   * c290a32e36d7706fd5dd426a3ad85b65d113a6fc : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/134890002)
   * 4908a3ce961c7518b10e43831fa5aca2764cbf1a : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/134963479)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #10071: [FLINK-14371][runtime, tests] Enable ClassLoaderITCase/WindowCheckpointingITCase/EventTimeWindowCheckpointingITCase for NG scheduling

2019-11-04 Thread GitBox
flinkbot edited a comment on issue #10071: [FLINK-14371][runtime, tests] Enable 
ClassLoaderITCase/WindowCheckpointingITCase/EventTimeWindowCheckpointingITCase 
for NG scheduling
URL: https://github.com/apache/flink/pull/10071#issuecomment-549017258
 
 
   
   ## CI report:
   
   * 80b38a072f3c5a415bf3c95e883c358a1648cc37 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/134655999)
   * 7e1c5a50ab017261aadda70e97852c298c05633b : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/134963492)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #10060: [FLINK-14546] [flink-json] Support map type in flink-json

2019-11-04 Thread GitBox
flinkbot edited a comment on issue #10060: [FLINK-14546] [flink-json] Support 
map type in flink-json
URL: https://github.com/apache/flink/pull/10060#issuecomment-548313242
 
 
   
   ## CI report:
   
   * c68dc12d24be4ef19efc1ef42246e6018c4f6e8d : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/134363638)
   * f65310be9fb96f2446398a718dabeb4bd5a7a874 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/134963469)
   * 8dc48c4a16d1f4f82c78b14b18c0832fbb235d3d : UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #10042: [FLINK-11466][e2e] Design the e2e java framework so that the Kafka streaming tests and Kafka SQL tests can running on it

2019-11-04 Thread GitBox
flinkbot edited a comment on issue #10042: [FLINK-11466][e2e] Design the e2e 
java framework so that the Kafka streaming tests and Kafka SQL tests can 
running on it
URL: https://github.com/apache/flink/pull/10042#issuecomment-547792578
 
 
   
   ## CI report:
   
   * b523eace793d9168a2816e50891d6227183a7175 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/134149629)
   * 876f0f86b208f6fce4d74488081791dcc7b89cf8 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/134326879)
   * 027d5fe9ca3832c9384a715f3f7e65df3fd64dcf : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/134395038)
   * 4108e40950bfcf9f26e8b62603a6c35402252966 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/134965103)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #10081: [FLINK-14605][hive] Use Hive-1.1.0 as the profile to test against 1.1.x

2019-11-04 Thread GitBox
flinkbot commented on issue #10081: [FLINK-14605][hive] Use Hive-1.1.0 as the 
profile to test against 1.1.x
URL: https://github.com/apache/flink/pull/10081#issuecomment-549661353
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit c8a2505e137a9bdf9eadcc3c562b851285787965 (Tue Nov 05 
04:48:22 UTC 2019)
   
   **Warnings:**
* **1 pom.xml files were touched**: Check for build and licensing issues.
* No documentation files were touched! Remember to keep the Flink docs up 
to date!
* **This pull request references an unassigned [Jira 
ticket](https://issues.apache.org/jira/browse/FLINK-14605).** According to the 
[code contribution 
guide](https://flink.apache.org/contributing/contribute-code.html), tickets 
need to be assigned before starting with the implementation work.
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] walterddr commented on a change in pull request #9355: [FLINK-13577][ml] Add an util class to build result row and generate …

2019-11-04 Thread GitBox
walterddr commented on a change in pull request #9355: [FLINK-13577][ml] Add an 
util class to build result row and generate …
URL: https://github.com/apache/flink/pull/9355#discussion_r342379204
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/utils/OutputColsHelper.java
 ##
 @@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.common.utils;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+
+/**
+ * Util for generating output schema when doing prediction or transformation.
+ *
+ * Input:
+ * 1) Schema of input data being predicted or transformed.
+ * 2) Output column names of the prediction/transformation operator.
+ * 3) Output column types of the prediction/transformation operator.
+ * 4) Reserved column names, which are a subset of the input data's column 
names that we want to preserve.
+ *
+ * Output:
+ * 1) The result data schema. The result data is a combination of the 
preserved columns and the operator's
+ * output columns.
+ *
+ * Several rules are followed:
+ * 1) If reserved columns are not given, then all columns of the input data 
are reserved.
 
 Review comment:
   I think this is actually the one default that confuses me: why, if reserved 
columns are not given, are all input columns preserved?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] walterddr commented on a change in pull request #9355: [FLINK-13577][ml] Add an util class to build result row and generate …

2019-11-04 Thread GitBox
walterddr commented on a change in pull request #9355: [FLINK-13577][ml] Add an 
util class to build result row and generate …
URL: https://github.com/apache/flink/pull/9355#discussion_r342379344
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/utils/OutputColsHelper.java
 ##
 @@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.common.utils;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+
+/**
+ * Util for generating output schema when doing prediction or transformation.
+ *
+ * Input:
+ * 1) Schema of input data being predicted or transformed.
+ * 2) Output column names of the prediction/transformation operator.
+ * 3) Output column types of the prediction/transformation operator.
+ * 4) Reserved column names, which are a subset of the input data's column 
names that we want to preserve.
+ *
+ * Output:
+ * 1) The result data schema. The result data is a combination of the 
preserved columns and the operator's
+ * output columns.
+ *
+ * Several rules are followed:
+ * 1) If reserved columns are not given, then all columns of the input data 
are reserved.
+ * 2) The reserved columns are arranged ahead of the operator's output 
columns in the final output.
+ * 3) If some of the reserved column names overlap with those of the 
operator's output columns, then the operator's
 
 Review comment:
   I think we should throw an exception when the output column names and the 
user-defined reserved column names conflict (ultimately both are defined by 
the user, so this shouldn't happen from the user's perspective).
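
   A minimal sketch of such a check (the class and method names are made up for illustration):

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

final class ColumnConflictCheck {
    // Throws if any user-specified reserved column collides with an operator output column.
    static void checkNoConflict(String[] reservedColNames, String[] outputColNames) {
        Set<String> overlap = new HashSet<>(Arrays.asList(reservedColNames));
        overlap.retainAll(Arrays.asList(outputColNames));
        if (!overlap.isEmpty()) {
            throw new IllegalArgumentException(
                    "Reserved columns conflict with output columns: " + overlap);
        }
    }
}
```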


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-14605) Use Hive-1.1.0 as the profile to test against 1.1.x

2019-11-04 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14605?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-14605:
---
Labels: pull-request-available  (was: )

> Use Hive-1.1.0 as the profile to test against 1.1.x
> ---
>
> Key: FLINK-14605
> URL: https://issues.apache.org/jira/browse/FLINK-14605
> Project: Flink
>  Issue Type: Test
>  Components: Connectors / Hive
>Reporter: Rui Li
>Priority: Major
>  Labels: pull-request-available
>
> Hive-1.1.1 has the issue that it can't properly handle {{stored as 
> file_format}} syntax. So let's test against 1.1.0.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] walterddr commented on a change in pull request #9355: [FLINK-13577][ml] Add an util class to build result row and generate …

2019-11-04 Thread GitBox
walterddr commented on a change in pull request #9355: [FLINK-13577][ml] Add an 
util class to build result row and generate …
URL: https://github.com/apache/flink/pull/9355#discussion_r342377733
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/utils/OutputColsHelper.java
 ##
 @@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.common.utils;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+
+/**
+ * Util for generating output schema when doing prediction or transformation.
+ *
+ * Input:
+ * 1) Schema of input data being predicted or transformed.
+ * 2) Output column names of the prediction/transformation operator.
+ * 3) Output column types of the prediction/transformation operator.
+ * 4) Reserved column names, which are a subset of the input data's column names that we want to preserve.
+ *
+ * Output:
+ * 1) The result data schema. The result data is a combination of the preserved 
columns and the operator's
+ * output columns.
+ *
+ * Several rules are followed:
+ * 1) If reserved columns are not given, then all columns of the input data are reserved.
+ * 2) The reserved columns are arranged ahead of the operator's output columns in the final output.
+ * 3) If some of the reserved column names overlap with those of the operator's output columns, then the operator's
+ * output columns override the conflicting reserved columns.
+ *
+ * For example, if we have input data schema of ["id":INT, "f1":FLOAT, 
"f2":DOUBLE], and the operator outputs
+ * a column "label" with type STRING, and we want to preserve the column "id", 
then we get the output
+ * schema of ["id":INT, "label":STRING].
+ */
+public class OutputColsHelper implements Serializable {
 
 Review comment:
   Based on the usage in #9413 and #9523, this can be private to Flink, yes? (I 
do not foresee users directly interacting with this helper class.)


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] lirui-apache commented on issue #10081: [FLINK-14605][hive] Use Hive-1.1.0 as the profile to test against 1.1.x

2019-11-04 Thread GitBox
lirui-apache commented on issue #10081: [FLINK-14605][hive] Use Hive-1.1.0 as 
the profile to test against 1.1.x
URL: https://github.com/apache/flink/pull/10081#issuecomment-549660903
 
 
   cc @bowenli86 @xuefuz @zjuwangg 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] lirui-apache opened a new pull request #10081: [FLINK-14605][hive] Use Hive-1.1.0 as the profile to test against 1.1.x

2019-11-04 Thread GitBox
lirui-apache opened a new pull request #10081: [FLINK-14605][hive] Use 
Hive-1.1.0 as the profile to test against 1.1.x
URL: https://github.com/apache/flink/pull/10081
 
 
   
   
   ## What is the purpose of the change
   
   Test against Hive-1.1.0.
   
   ## Brief change log
   
 - Changed the 1.1.x profile to use Hive-1.1.0.
   
   
   ## Verifying this change
   
   Manually verified tests pass with 1.1.0.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): yes
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? NA
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] libenchao commented on a change in pull request #10060: [FLINK-14546] [flink-json] Support map type in flink-json

2019-11-04 Thread GitBox
libenchao commented on a change in pull request #10060: [FLINK-14546] 
[flink-json] Support map type in flink-json
URL: https://github.com/apache/flink/pull/10060#discussion_r342379414
 
 

 ##
 File path: 
flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDeserializationSchemaTest.java
 ##
 @@ -61,19 +65,21 @@ public void testTypeInfoDeserialization() throws Exception 
{
root.put("id", id);
root.put("name", name);
root.put("bytes", bytes);
+   root.putObject("map").put("flink", 123);
 
byte[] serializedJson = objectMapper.writeValueAsBytes(root);
 
JsonRowDeserializationSchema deserializationSchema = new 
JsonRowDeserializationSchema.Builder(
Types.ROW_NAMED(
-   new String[]{"id", "name", "bytes"},
-   Types.LONG, Types.STRING, 
Types.PRIMITIVE_ARRAY(Types.BYTE))
+   new String[]{"id", "name", "bytes", "map"},
+   Types.LONG, Types.STRING, 
Types.PRIMITIVE_ARRAY(Types.BYTE), Types.MAP(Types.STRING, Types.LONG))
 
 Review comment:
   done.
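
   For reference, the map support exercised above can be sketched in isolation 
(an illustrative snippet, error handling omitted):

   ```java
   // Deserialize {"map":{"flink":123}} into a Row with a single MAP field.
   JsonRowDeserializationSchema schema = new JsonRowDeserializationSchema.Builder(
           Types.ROW_NAMED(
                   new String[]{"map"},
                   Types.MAP(Types.STRING, Types.LONG)))
           .build();
   Row row = schema.deserialize("{\"map\":{\"flink\":123}}".getBytes());
   Map<String, Long> map = (Map<String, Long>) row.getField(0); // {flink=123}
   ```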


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-14605) Use Hive-1.1.0 as the profile to test against 1.1.x

2019-11-04 Thread Rui Li (Jira)
Rui Li created FLINK-14605:
--

 Summary: Use Hive-1.1.0 as the profile to test against 1.1.x
 Key: FLINK-14605
 URL: https://issues.apache.org/jira/browse/FLINK-14605
 Project: Flink
  Issue Type: Test
  Components: Connectors / Hive
Reporter: Rui Li


Hive-1.1.1 has the issue that it can't properly handle {{stored as 
file_format}} syntax. So let's test against 1.1.0.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on issue #9885: [FLINK-14344][checkpointing] Snapshots master hook state asynchronously

2019-11-04 Thread GitBox
flinkbot edited a comment on issue #9885: [FLINK-14344][checkpointing] 
Snapshots master hook state asynchronously
URL: https://github.com/apache/flink/pull/9885#issuecomment-541299624
 
 
   
   ## CI report:
   
   * d0c426e6ff61dcad1de4c7d3e9a2a16416e77e54 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/131633477)
   * 22434aac9a08ebb0ea1a61eba28ec12a55149d10 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/133945741)
   * cdff164bf39e74bdfd81332d84f12193a09997e5 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/134839623)
   * 30f912924c8497d9c55aa62a4987d94e31d409b5 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/134843066)
   * 5012d25a071772b683abddd7ef4c8d660e4c164d : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/134966688)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-14604) Bump commons-cli to 1.4

2019-11-04 Thread Wei Zhong (Jira)
Wei Zhong created FLINK-14604:
-

 Summary: Bump commons-cli to 1.4
 Key: FLINK-14604
 URL: https://issues.apache.org/jira/browse/FLINK-14604
 Project: Flink
  Issue Type: Improvement
  Components: Command Line Client
Reporter: Wei Zhong


Currently Flink is using commons-cli 1.3.1. There is a 
[bug|https://issues.apache.org/jira/projects/CLI/issues/CLI-265] in it which 
prevents us from using options that accept variable arguments on the command line.

To be precise, it prevents us from accepting a short-name option after a 
varargs option, because there is a problem in the implementation of the 
DefaultParser#isShortOption() method in commons-cli 1.3.1:
{code:java}
/**
 * Tells if the token looks like a short option.
 * 
 * @param token
 */
private boolean isShortOption(String token)
{
// short options (-S, -SV, -S=V, -SV1=V2, -S1S2)
// PROBLEM: It assumes that a short option has only a single character,
//  but in fact we have many multi-character short options.
return token.startsWith("-") && token.length() >= 2 && 
options.hasShortOption(token.substring(1, 2));
}
{code}
If we bump the version to 1.4, we can solve this problem.

I request this change because there are 2 varargs options in the design of the 
command line options for [Python UDF Dependency 
Management|https://cwiki.apache.org/confluence/display/FLINK/FLIP-78%3A+Flink+Python+UDF+Environment+and+Dependency+Management]
 which hit this bug. It would be greatly helpful if we could bump the commons-cli version to 1.4 :).

commons-cli 1.4 is also a stable version, released in March 2017, and today its 
usage statistics exceed those of 1.3.1 on the [maven central 
repository|https://mvnrepository.com/artifact/commons-cli/commons-cli].

I have pushed the change to my own Travis to check whether it breaks anything. This 
is the [link|https://travis-ci.org/WeiZhong94/flink/builds/607438208] and it 
seems that everything is fine.
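
To make the failure mode concrete, here is a minimal repro sketch (the option 
names -pyfs and -py are hypothetical, chosen to mirror the Python UDF CLI 
design; this is not code from Flink):
{code:java}
import org.apache.commons.cli.*;

public class VarargsOptionRepro {
    public static void main(String[] args) throws ParseException {
        Options options = new Options();
        // a varargs option followed by a multi-character "short" option
        options.addOption(Option.builder("pyfs").hasArgs().desc("python files").build());
        options.addOption(Option.builder("py").hasArg().desc("entry point").build());

        CommandLine cli = new DefaultParser().parse(
                options, new String[]{"-pyfs", "a.py", "b.py", "-py", "main.py"});

        // With 1.3.1, isShortOption("-py") only inspects "p", so "-py main.py"
        // may be swallowed as extra values of -pyfs; with 1.4 it parses as expected.
        System.out.println(java.util.Arrays.toString(cli.getOptionValues("pyfs")));
        System.out.println(cli.getOptionValue("py"));
    }
}
{code}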



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on issue #10060: [FLINK-14546] [flink-json] Support map type in flink-json

2019-11-04 Thread GitBox
flinkbot edited a comment on issue #10060: [FLINK-14546] [flink-json] Support 
map type in flink-json
URL: https://github.com/apache/flink/pull/10060#issuecomment-548313242
 
 
   
   ## CI report:
   
   * c68dc12d24be4ef19efc1ef42246e6018c4f6e8d : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/134363638)
   * f65310be9fb96f2446398a718dabeb4bd5a7a874 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/134963469)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #10059: [FLINK-14543][table] Support partition for temporary table

2019-11-04 Thread GitBox
flinkbot edited a comment on issue #10059: [FLINK-14543][table] Support 
partition for temporary table
URL: https://github.com/apache/flink/pull/10059#issuecomment-548289939
 
 
   
   ## CI report:
   
   * 4937b8b139bab4957799947b841fb1ae3758ed48 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/134352112)
   * 6de7bf454ccb8a05ec596954b7551ea9eddac297 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/134385271)
   * 177c6c50b0636e6e14bbb6c511c88cff05905318 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/134792783)
   * ab296376ac94d06f5a94b93dafa9c9aab2e0eab7 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/134963458)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] walterddr commented on a change in pull request #9373: [FLINK-13596][ml] Add two utils for Table transformations

2019-11-04 Thread GitBox
walterddr commented on a change in pull request #9373: [FLINK-13596][ml] Add 
two utils for Table transformations
URL: https://github.com/apache/flink/pull/9373#discussion_r342376557
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/utils/DataSetConversionUtil.java
 ##
 @@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.common.utils;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.operators.SingleInputUdfOperator;
+import org.apache.flink.api.java.operators.TwoInputUdfOperator;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.common.MLEnvironment;
+import org.apache.flink.ml.common.MLEnvironmentFactory;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.types.Row;
+
+/**
+ * Provides functions for conversions between DataSet and Table.
+ */
+public class DataSetConversionUtil {
+   /**
+* Convert the given Table to {@link DataSet}<{@link Row}>.
+*
+* @param sessionId the sessionId of {@link MLEnvironmentFactory}
+* @param table the Table to convert.
+* @return the converted DataSet.
+*/
+   public static DataSet<Row> fromTable(Long sessionId, Table table) {
+   return MLEnvironmentFactory
+   .get(sessionId)
+   .getBatchTableEnvironment()
+   .toDataSet(table, Row.class);
+   }
+
+   /**
+* Convert the given DataSet into a Table with specified TableSchema.
+*
+* @param sessionId the sessionId of {@link MLEnvironmentFactory}
+* @param data   the DataSet to convert.
+* @param schema the specified TableSchema.
+* @return the converted Table.
+*/
+   public static Table toTable(Long sessionId, DataSet<Row> data, TableSchema schema) {
 
 Review comment:
   Also, since `schema.getFieldTypes()` is deprecated, I would rather we remove 
this function and just allow the user to handle the deprecation in user code. (This 
would be much cleaner for future maintenance.)


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] walterddr commented on a change in pull request #9373: [FLINK-13596][ml] Add two utils for Table transformations

2019-11-04 Thread GitBox
walterddr commented on a change in pull request #9373: [FLINK-13596][ml] Add 
two utils for Table transformations
URL: https://github.com/apache/flink/pull/9373#discussion_r342374624
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/utils/DataSetConversionUtil.java
 ##
 @@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.common.utils;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.operators.SingleInputUdfOperator;
+import org.apache.flink.api.java.operators.TwoInputUdfOperator;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.common.MLEnvironment;
+import org.apache.flink.ml.common.MLEnvironmentFactory;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.types.Row;
+
+/**
+ * Provides functions for conversions between DataSet and Table.
+ */
+public class DataSetConversionUtil {
+   /**
+* Convert the given Table to {@link DataSet}<{@link Row}>.
+*
+* @param sessionId the sessionId of {@link MLEnvironmentFactory}
+* @param table the Table to convert.
+* @return the converted DataSet.
+*/
+   public static DataSet<Row> fromTable(Long sessionId, Table table) {
+   return MLEnvironmentFactory
+   .get(sessionId)
+   .getBatchTableEnvironment()
+   .toDataSet(table, Row.class);
+   }
+
+   /**
+* Convert the given DataSet into a Table with specified TableSchema.
+*
+* @param sessionId the sessionId of {@link MLEnvironmentFactory}
+* @param data   the DataSet to convert.
+* @param schema the specified TableSchema.
+* @return the converted Table.
+*/
+   public static Table toTable(Long sessionId, DataSet<Row> data, TableSchema schema) {
 
 Review comment:
   This API seems to be untested.
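
   For reference, a minimal round-trip exercise of this overload could look like 
the sketch below (assumed bindings: `env` is a batch ExecutionEnvironment and 
`sessionId` a session registered with MLEnvironmentFactory):

   ```java
   DataSet<Row> rows = env
           .fromElements(Tuple2.of(1L, "flink"))
           .map(new MapFunction<Tuple2<Long, String>, Row>() {
               @Override
               public Row map(Tuple2<Long, String> t) {
                   return Row.of(t.f0, t.f1); // row type cannot be inferred here
               }
           });
   TableSchema schema = new TableSchema(
           new String[]{"id", "name"},
           new TypeInformation<?>[]{Types.LONG, Types.STRING});
   Table table = DataSetConversionUtil.toTable(sessionId, rows, schema);
   DataSet<Row> back = DataSetConversionUtil.fromTable(sessionId, table);
   ```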


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] walterddr commented on a change in pull request #9373: [FLINK-13596][ml] Add two utils for Table transformations

2019-11-04 Thread GitBox
walterddr commented on a change in pull request #9373: [FLINK-13596][ml] Add 
two utils for Table transformations
URL: https://github.com/apache/flink/pull/9373#discussion_r342375237
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/utils/DataSetConversionUtil.java
 ##
 @@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.common.utils;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.operators.SingleInputUdfOperator;
+import org.apache.flink.api.java.operators.TwoInputUdfOperator;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.common.MLEnvironment;
+import org.apache.flink.ml.common.MLEnvironmentFactory;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.types.Row;
+
+/**
+ * Provides functions for conversions between DataSet and Table.
+ */
+public class DataSetConversionUtil {
+   /**
+* Convert the given Table to {@link DataSet}<{@link Row}>.
+*
+* @param sessionId the sessionId of {@link MLEnvironmentFactory}
+* @param table the Table to convert.
+* @return the converted DataSet.
+*/
+   public static DataSet<Row> fromTable(Long sessionId, Table table) {
+   return MLEnvironmentFactory
+   .get(sessionId)
+   .getBatchTableEnvironment()
+   .toDataSet(table, Row.class);
+   }
+
+   /**
+* Convert the given DataSet into a Table with specified TableSchema.
+*
+* @param sessionId the sessionId of {@link MLEnvironmentFactory}
+* @param data   the DataSet to convert.
+* @param schema the specified TableSchema.
+* @return the converted Table.
+*/
+   public static Table toTable(Long sessionId, DataSet<Row> data, TableSchema schema) {
+   return toTable(sessionId, data, schema.getFieldNames(), 
schema.getFieldTypes());
+   }
+
+   /**
+* Convert the given DataSet into a Table with specified colNames and 
colTypes.
+*
+* @param sessionId the sessionId of {@link MLEnvironmentFactory}.
+* @param data the DataSet to convert.
+* @param colNames the specified colNames.
+* @param colTypes the specified colTypes. This variable is used only 
when the
+* DataSet is produced by a function and Flink cannot 
determine
+* automatically what the produced type is.
+* @return the converted Table.
+*/
+   public static Table toTable(Long sessionId, DataSet<Row> data, String[] colNames, TypeInformation<?>[] colTypes) {
+   return toTable(MLEnvironmentFactory.get(sessionId), data, 
colNames, colTypes);
+   }
+
+   /**
+* Convert the given DataSet into a Table with specified colNames.
+*
+* @param sessionId the sessionId of {@link MLEnvironmentFactory}.
+* @param data the DataSet to convert.
+* @param colNames the specified colNames.
+* @return the converted Table.
+*/
+   public static Table toTable(Long sessionId, DataSet<Row> data, String[] colNames) {
+   return toTable(MLEnvironmentFactory.get(sessionId), data, 
colNames);
+   }
+
+   /**
+* Convert the given DataSet into a Table with specified colNames and 
colTypes.
+*
+* @param session the MLEnvironment used to convert the DataSet to a Table.
+* @param data the DataSet to convert.
+* @param colNames the specified colNames.
+* @param colTypes the specified colTypes. This variable is used only 
when the
+* DataSet is produced by a function and Flink cannot 
determine
+* automatically what the produced type is.
+* @return the converted Table.
+*/
+   public static Table toTable(MLEnvironment session, DataSet<Row> 

[GitHub] [flink] walterddr commented on a change in pull request #9373: [FLINK-13596][ml] Add two utils for Table transformations

2019-11-04 Thread GitBox
walterddr commented on a change in pull request #9373: [FLINK-13596][ml] Add 
two utils for Table transformations
URL: https://github.com/apache/flink/pull/9373#discussion_r342376183
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/utils/DataSetConversionUtil.java
 ##
 @@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.common.utils;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.operators.SingleInputUdfOperator;
+import org.apache.flink.api.java.operators.TwoInputUdfOperator;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.common.MLEnvironment;
+import org.apache.flink.ml.common.MLEnvironmentFactory;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.types.Row;
+
+/**
+ * Provides functions for conversions between DataSet and Table.
+ */
+public class DataSetConversionUtil {
+   /**
+* Convert the given Table to {@link DataSet}<{@link Row}>.
+*
+* @param sessionId the sessionId of {@link MLEnvironmentFactory}
+* @param table the Table to convert.
+* @return the converted DataSet.
+*/
+   public static DataSet<Row> fromTable(Long sessionId, Table table) {
+   return MLEnvironmentFactory
+   .get(sessionId)
+   .getBatchTableEnvironment()
+   .toDataSet(table, Row.class);
+   }
+
+   /**
+* Convert the given DataSet into a Table with specified TableSchema.
+*
+* @param sessionId the sessionId of {@link MLEnvironmentFactory}
+* @param data   the DataSet to convert.
+* @param schema the specified TableSchema.
+* @return the converted Table.
+*/
+   public static Table toTable(Long sessionId, DataSet<Row> data, TableSchema schema) {
+   return toTable(sessionId, data, schema.getFieldNames(), 
schema.getFieldTypes());
+   }
+
+   /**
+* Convert the given DataSet into a Table with specified colNames and 
colTypes.
+*
+* @param sessionId the sessionId of {@link MLEnvironmentFactory}.
+* @param data the DataSet to convert.
+* @param colNames the specified colNames.
+* @param colTypes the specified colTypes. This variable is used only 
when the
+* DataSet is produced by a function and Flink cannot 
determine
+* automatically what the produced type is.
+* @return the converted Table.
+*/
+   public static Table toTable(Long sessionId, DataSet<Row> data, String[] colNames, TypeInformation<?>[] colTypes) {
+   return toTable(MLEnvironmentFactory.get(sessionId), data, 
colNames, colTypes);
+   }
+
+   /**
+* Convert the given DataSet into a Table with specified colNames.
+*
+* @param sessionId the sessionId of {@link MLEnvironmentFactory}.
+* @param data the DataSet to convert.
+* @param colNames the specified colNames.
+* @return the converted Table.
+*/
+   public static Table toTable(Long sessionId, DataSet<Row> data, String[] colNames) {
+   return toTable(MLEnvironmentFactory.get(sessionId), data, 
colNames);
+   }
+
+   /**
+* Convert the given DataSet into a Table with specified colNames and 
colTypes.
+*
+* @param session the MLEnvironment used to convert the DataSet to a Table.
+* @param data the DataSet to convert.
+* @param colNames the specified colNames.
+* @param colTypes the specified colTypes. This variable is used only 
when the
+* DataSet is produced by a function and Flink cannot 
determine
+* automatically what the produced type is.
+* @return the converted Table.
+*/
+   public static Table toTable(MLEnvironment session, DataSet<Row> 

[GitHub] [flink] walterddr commented on a change in pull request #9373: [FLINK-13596][ml] Add two utils for Table transformations

2019-11-04 Thread GitBox
walterddr commented on a change in pull request #9373: [FLINK-13596][ml] Add 
two utils for Table transformations
URL: https://github.com/apache/flink/pull/9373#discussion_r342375974
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/utils/DataSetConversionUtil.java
 ##
 @@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.common.utils;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.operators.SingleInputUdfOperator;
+import org.apache.flink.api.java.operators.TwoInputUdfOperator;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.common.MLEnvironment;
+import org.apache.flink.ml.common.MLEnvironmentFactory;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.types.Row;
+
+/**
+ * Provides functions for conversions between DataSet and Table.
+ */
+public class DataSetConversionUtil {
+   /**
+* Convert the given Table to {@link DataSet}<{@link Row}>.
+*
+* @param sessionId the sessionId of {@link MLEnvironmentFactory}
+* @param table the Table to convert.
+* @return the converted DataSet.
+*/
+   public static DataSet<Row> fromTable(Long sessionId, Table table) {
+   return MLEnvironmentFactory
+   .get(sessionId)
+   .getBatchTableEnvironment()
+   .toDataSet(table, Row.class);
+   }
+
+   /**
+* Convert the given DataSet into a Table with specified TableSchema.
+*
+* @param sessionId the sessionId of {@link MLEnvironmentFactory}
+* @param data   the DataSet to convert.
+* @param schema the specified TableSchema.
+* @return the converted Table.
+*/
+   public static Table toTable(Long sessionId, DataSet<Row> data, TableSchema schema) {
+   return toTable(sessionId, data, schema.getFieldNames(), 
schema.getFieldTypes());
+   }
+
+   /**
+* Convert the given DataSet into a Table with specified colNames and 
colTypes.
+*
+* @param sessionId the sessionId of {@link MLEnvironmentFactory}.
+* @param data the DataSet to convert.
+* @param colNames the specified colNames.
+* @param colTypes the specified colTypes. This variable is used only 
when the
+* DataSet is produced by a function and Flink cannot 
determine
+* automatically what the produced type is.
+* @return the converted Table.
+*/
+   public static Table toTable(Long sessionId, DataSet<Row> data, String[] colNames, TypeInformation<?>[] colTypes) {
+   return toTable(MLEnvironmentFactory.get(sessionId), data, 
colNames, colTypes);
+   }
+
+   /**
+* Convert the given DataSet into a Table with specified colNames.
+*
+* @param sessionId the sessionId of {@link MLEnvironmentFactory}.
+* @param data the DataSet to convert.
+* @param colNames the specified colNames.
+* @return the converted Table.
+*/
+   public static Table toTable(Long sessionId, DataSet<Row> data, String[] colNames) {
+   return toTable(MLEnvironmentFactory.get(sessionId), data, 
colNames);
+   }
+
+   /**
+* Convert the given DataSet into a Table with specified colNames and 
colTypes.
+*
+* @param session the MLEnvironment used to convert the DataSet to a Table.
+* @param data the DataSet to convert.
+* @param colNames the specified colNames.
+* @param colTypes the specified colTypes. This variable is used only 
when the
+* DataSet is produced by a function and Flink cannot 
determine
+* automatically what the produced type is.
+* @return the converted Table.
+*/
+   public static Table toTable(MLEnvironment session, DataSet<Row> 

[GitHub] [flink] openinx commented on issue #10042: [FLINK-11466][e2e] Design the e2e java framework so that the Kafka streaming tests and Kafka SQL tests can run on it

2019-11-04 Thread GitBox
openinx commented on issue #10042: [FLINK-11466][e2e] Design the e2e java 
framework so that the Kafka streaming tests and Kafka SQL tests can run on it
URL: https://github.com/apache/flink/pull/10042#issuecomment-549655452
 
 
   Ping @tillrohrmann @zentol for reviewing ..


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-14572) BlobsCleanupITCase failed in Travis stage core - scheduler_ng

2019-11-04 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14572?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16967225#comment-16967225
 ] 

Zhu Zhu commented on FLINK-14572:
-

From the root cause, the case fails when trying to get jar files from the blob 
server, but the file does not exist.
This seems not to be related to the NG scheduler, since the scheduler is not created 
yet.

From the logic of {{BlobsCleanupITCase#testBlobServerCleanup()}}, a job will 
only be submitted if the jar upload succeeded via 
{{BlobClient.uploadFiles}}. So it's highly likely that the uploaded jar was 
removed unexpectedly before the job was submitted.
But I have no idea why the file could be removed. 
And I cannot reproduce this problem locally with hundreds of re-runs either.

[~trohrmann][~gjy] Do you have good ideas for it?

Root cause:
07:47:47,787 ERROR org.apache.flink.runtime.dispatcher.StandaloneDispatcher 
 - Failed to submit job 15838a6ef77eb89697d3def42c1a58b0.
java.lang.RuntimeException: 
org.apache.flink.runtime.client.JobExecutionException: Could not set up 
JobManager
   at 
org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
   at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
   at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
   at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
   at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
   at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
   at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set 
up JobManager
   at 
org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:152)
   at 
org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:84)
   at 
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:381)
   at 
org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
   ... 7 more
Caused by: java.lang.Exception: Cannot set up the user code libraries: 
/tmp/junit6489941706970935338/junit9210755917417967354/blobStore-1474738d-89f8-4ab0-88b2-9df867ba4cc1/incoming/temp-0001
   at 
org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:131)
   ... 10 more
Caused by: java.nio.file.NoSuchFileException: 
/tmp/junit6489941706970935338/junit9210755917417967354/blobStore-1474738d-89f8-4ab0-88b2-9df867ba4cc1/incoming/temp-0001
   at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
   at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
   at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
   at sun.nio.fs.UnixCopyFile.move(UnixCopyFile.java:409)
   at 
sun.nio.fs.UnixFileSystemProvider.move(UnixFileSystemProvider.java:262)
   at java.nio.file.Files.move(Files.java:1395)
   at 
org.apache.flink.runtime.blob.BlobUtils.moveTempFileToStore(BlobUtils.java:410)
   at 
org.apache.flink.runtime.blob.BlobServer.getFileInternal(BlobServer.java:497)
   at 
org.apache.flink.runtime.blob.BlobServer.getFileInternal(BlobServer.java:444)
   at org.apache.flink.runtime.blob.BlobServer.getFile(BlobServer.java:417)
   at 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerTask(BlobLibraryCacheManager.java:120)
   at 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerJob(BlobLibraryCacheManager.java:91)
   at 
org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:128)
   ... 10 more

> BlobsCleanupITCase failed in Travis stage core - scheduler_ng
> -
>
> Key: FLINK-14572
> URL: https://issues.apache.org/jira/browse/FLINK-14572
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination, Tests
>Affects Versions: 1.10.0
>Reporter: Gary Yao
>Priority: Critical
>  Labels: scheduler-ng, test-stability
> Fix For: 1.10.0
>
>
> {noformat}
> java.lang.AssertionError: 
> Expected: is 
>  but: was 
>   at 
> org.apache.flink.runtime.jobmanager.BlobsCleanupITCase.testBlobServerCleanup(BlobsCleanupITCase.java:220)
>   at 
> org.apache.flink.runtime.jobmanager.BlobsCleanupITCase.testBlobServerCleanupFinishedJob(BlobsCleanupITCase.java:133)
> {noformat}
> https://api.travis-ci.com/v3/job/250445874/log.txt



--

[GitHub] [flink] flinkbot edited a comment on issue #9885: [FLINK-14344][checkpointing] Snapshots master hook state asynchronously

2019-11-04 Thread GitBox
flinkbot edited a comment on issue #9885: [FLINK-14344][checkpointing] 
Snapshots master hook state asynchronously
URL: https://github.com/apache/flink/pull/9885#issuecomment-541299624
 
 
   
   ## CI report:
   
   * d0c426e6ff61dcad1de4c7d3e9a2a16416e77e54 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/131633477)
   * 22434aac9a08ebb0ea1a61eba28ec12a55149d10 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/133945741)
   * cdff164bf39e74bdfd81332d84f12193a09997e5 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/134839623)
   * 30f912924c8497d9c55aa62a4987d94e31d409b5 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/134843066)
   * 5012d25a071772b683abddd7ef4c8d660e4c164d : UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #10042: [FLINK-11466][e2e] Design the e2e java framework so that the Kafka streaming tests and Kafka SQL tests can run on it

2019-11-04 Thread GitBox
flinkbot edited a comment on issue #10042: [FLINK-11466][e2e] Design the e2e 
java framework so that the Kafka streaming tests and Kafka SQL tests can run on it
URL: https://github.com/apache/flink/pull/10042#issuecomment-547792578
 
 
   
   ## CI report:
   
   * b523eace793d9168a2816e50891d6227183a7175 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/134149629)
   * 876f0f86b208f6fce4d74488081791dcc7b89cf8 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/134326879)
   * 027d5fe9ca3832c9384a715f3f7e65df3fd64dcf : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/134395038)
   * 4108e40950bfcf9f26e8b62603a6c35402252966 : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/134965103)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-14591) Execute PlannerBase#mergeParameters every time of calling PlannerBase#translate method

2019-11-04 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16967212#comment-16967212
 ] 

Jark Wu commented on FLINK-14591:
-

Hi [~Dillon.], I'm not sure about the fixing approach yet. 
Hi [~godfreyhe], what do you think about this problem? 

>  Execute PlannerBase#mergeParameters every time of calling 
> PlannerBase#translate method
> ---
>
> Key: FLINK-14591
> URL: https://issues.apache.org/jira/browse/FLINK-14591
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: Wei Zhong
>Priority: Minor
>
> In the current implementation of the blink planner, the method 
> "PlannerBase#mergeParameter" will be called by the "PlannerBase#translate" method 
> to merge the configuration inside TableConfig into global job parameters:
> {code:scala}
>   override def translate(
>   modifyOperations: util.List[ModifyOperation]): 
> util.List[Transformation[_]] = {
> if (modifyOperations.isEmpty) {
>   return List.empty[Transformation[_]]
> }
> mergeParameters()
> val relNodes = modifyOperations.map(translateToRel)
> val optimizedRelNodes = optimize(relNodes)
> val execNodes = translateToExecNodePlan(optimizedRelNodes)
> translateToPlan(execNodes)
>   }
> {code}
> This translate method is called at every important moment, e.g. execute, 
> toDataStream, insertInto, etc.
> But as shown above, there is a chance that the method returns directly without 
> calling "mergeParameters".
> In fact, if we set some configurations between the "Table#insertInto" method 
> and "TableEnvironment#execute" method, these configurations will not be 
> merged into global job parameters because the "mergeParameters" method is not 
> called:
> {code:scala}
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val tEnv = StreamTableEnvironment.create(env, 
> EnvironmentSettings.newInstance.useBlinkPlanner.build)
> ...
> ...
> val result = ...
> val sink = TestSinkUtil.configureSink(result, new TestingAppendTableSink)
> tEnv.registerTableSink("MySink", sink)
> tEnv.getConfig.getConfiguration.setString("jobparam1", "value1")
> result.insertInto("MySink")
> 
> // the "jobparam2" configuration will loss
> tEnv.getConfig.getConfiguration.setString("jobparam2", "value2")
> tEnv.execute("test")
> val jobConfig = env.getConfig.getGlobalJobParameters.toMap
> 
> assertTrue(jobConfig.get("jobparam1")=="value1")
> // this assertion will fail:
> assertTrue(jobConfig.get("jobparam2")=="value2"){code}
> This may bring some confusion to the user. It would be great if we could fix 
> this problem.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on issue #10080: [FLINK-14600] Degenerate the current AtomicInteger type of verticesFinished to a normal int type

2019-11-04 Thread GitBox
flinkbot edited a comment on issue #10080: [FLINK-14600] Degenerate the current 
AtomicInteger type of verticesFinished to a normal int type
URL: https://github.com/apache/flink/pull/10080#issuecomment-549633254
 
 
   
   ## CI report:
   
   * d854a07f002bd43cab524acb03e4dbd918869827 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/134960303)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #10067: [FLINK-14375][runtime] Avoid to notify scheduler about fake or outdated state update

2019-11-04 Thread GitBox
flinkbot edited a comment on issue #10067: [FLINK-14375][runtime] Avoid to 
notify scheduler about fake or outdated state update
URL: https://github.com/apache/flink/pull/10067#issuecomment-548812084
 
 
   
   ## CI report:
   
   * d3341d9f0bff2cb97bd0b5bf507725ae6d670b88 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/134571714)
   * 37c17d2be1cdbd72cc8a3554f7c8cf09f2c8a400 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/134655935)
   * c290a32e36d7706fd5dd426a3ad85b65d113a6fc : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/134890002)
   * 4908a3ce961c7518b10e43831fa5aca2764cbf1a : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/134963479)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #10071: [FLINK-14371][runtime, tests] Enable ClassLoaderITCase/WindowCheckpointingITCase/EventTimeWindowCheckpointingITCase for NG scheduling

2019-11-04 Thread GitBox
flinkbot edited a comment on issue #10071: [FLINK-14371][runtime, tests] Enable 
ClassLoaderITCase/WindowCheckpointingITCase/EventTimeWindowCheckpointingITCase 
for NG scheduling
URL: https://github.com/apache/flink/pull/10071#issuecomment-549017258
 
 
   
   ## CI report:
   
   * 80b38a072f3c5a415bf3c95e883c358a1648cc37 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/134655999)
   * 7e1c5a50ab017261aadda70e97852c298c05633b : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/134963492)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #10060: [FLINK-14546] [flink-json] Support map type in flink-json

2019-11-04 Thread GitBox
flinkbot edited a comment on issue #10060: [FLINK-14546] [flink-json] Support 
map type in flink-json
URL: https://github.com/apache/flink/pull/10060#issuecomment-548313242
 
 
   
   ## CI report:
   
   * c68dc12d24be4ef19efc1ef42246e6018c4f6e8d : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/134363638)
   * f65310be9fb96f2446398a718dabeb4bd5a7a874 : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/134963469)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #10059: [FLINK-14543][table] Support partition for temporary table

2019-11-04 Thread GitBox
flinkbot edited a comment on issue #10059: [FLINK-14543][table] Support 
partition for temporary table
URL: https://github.com/apache/flink/pull/10059#issuecomment-548289939
 
 
   
   ## CI report:
   
   * 4937b8b139bab4957799947b841fb1ae3758ed48 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/134352112)
   * 6de7bf454ccb8a05ec596954b7551ea9eddac297 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/134385271)
   * 177c6c50b0636e6e14bbb6c511c88cff05905318 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/134792783)
   * ab296376ac94d06f5a94b93dafa9c9aab2e0eab7 : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/134963458)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #10042: [FLINK-11466][e2e] Design the e2e java framework so that the Kafka streaming tests and Kafka SQL tests can run on it

2019-11-04 Thread GitBox
flinkbot edited a comment on issue #10042: [FLINK-11466][e2e] Design the e2e 
java framework so that the Kafka streaming tests and Kafka SQL tests can run on it
URL: https://github.com/apache/flink/pull/10042#issuecomment-547792578
 
 
   
   ## CI report:
   
   * b523eace793d9168a2816e50891d6227183a7175 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/134149629)
   * 876f0f86b208f6fce4d74488081791dcc7b89cf8 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/134326879)
   * 027d5fe9ca3832c9384a715f3f7e65df3fd64dcf : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/134395038)
   * 4108e40950bfcf9f26e8b62603a6c35402252966 : UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-14591) Execute PlannerBase#mergeParameters every time of calling PlannerBase#translate method

2019-11-04 Thread Zhanchun Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16967203#comment-16967203
 ] 

Zhanchun Zhang commented on FLINK-14591:


Hi [~zhongwei][~jark], I'm willing to fix this issue. Can you assign it to me? 
Thanks ~

>  Execute PlannerBase#mergeParameters every time of calling 
> PlannerBase#translate method
> ---
>
> Key: FLINK-14591
> URL: https://issues.apache.org/jira/browse/FLINK-14591
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: Wei Zhong
>Priority: Minor
>
> In the current implementation of the blink planner, the method 
> "PlannerBase#mergeParameter" will be called by the "PlannerBase#translate" method 
> to merge the configuration inside TableConfig into global job parameters:
> {code:scala}
>   override def translate(
>   modifyOperations: util.List[ModifyOperation]): 
> util.List[Transformation[_]] = {
> if (modifyOperations.isEmpty) {
>   return List.empty[Transformation[_]]
> }
> mergeParameters()
> val relNodes = modifyOperations.map(translateToRel)
> val optimizedRelNodes = optimize(relNodes)
> val execNodes = translateToExecNodePlan(optimizedRelNodes)
> translateToPlan(execNodes)
>   }
> {code}
> This translate method is called at every important moment, e.g. execute, 
> toDataStream, insertInto, etc.
> But as shown above, there is a chance that the method returns directly without 
> calling "mergeParameters".
> In fact, if we set some configurations between the "Table#insertInto" method 
> and "TableEnvironment#execute" method, these configurations will not be 
> merged into global job parameters because the "mergeParameters" method is not 
> called:
> {code:scala}
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val tEnv = StreamTableEnvironment.create(env, 
> EnvironmentSettings.newInstance.useBlinkPlanner.build)
> ...
> ...
> val result = ...
> val sink = TestSinkUtil.configureSink(result, new TestingAppendTableSink)
> tEnv.registerTableSink("MySink", sink)
> tEnv.getConfig.getConfiguration.setString("jobparam1", "value1")
> result.insertInto("MySink")
> 
> // the "jobparam2" configuration will loss
> tEnv.getConfig.getConfiguration.setString("jobparam2", "value2")
> tEnv.execute("test")
> val jobConfig = env.getConfig.getGlobalJobParameters.toMap
> 
> assertTrue(jobConfig.get("jobparam1")=="value1")
> // this assertion will fail:
> assertTrue(jobConfig.get("jobparam2")=="value2"){code}
> This may bring some confusion to the user. It would be great if we could fix 
> this problem.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


  1   2   3   4   >