[jira] [Created] (FLINK-16689) kafka connector as source get byte deserialize add support charsets

2020-03-19 Thread xiaodao (Jira)
xiaodao created FLINK-16689:
---

 Summary: kafka connector as source get byte deserialize add 
support charsets
 Key: FLINK-16689
 URL: https://issues.apache.org/jira/browse/FLINK-16689
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Kafka
Affects Versions: 1.10.0
Reporter: xiaodao


Sometimes a Kafka producer sends records serialized with a specific charset, e.g. GBK,

but the consumer has no way to deserialize with that charset.

For example:

org.apache.flink.formats.json.JsonRowDeserializationSchema#deserialize
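
A minimal sketch of the kind of charset-aware schema this would allow, assuming Flink's standard DeserializationSchema interface (the class name and constructor are illustrative, not an existing API):

{code:java}
import java.io.IOException;
import java.nio.charset.Charset;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;

/** Decodes Kafka record bytes into a String using a configurable charset, e.g. GBK. */
public class CharsetDeserializationSchema implements DeserializationSchema<String> {

	// Charset itself is not Serializable, so only the charset name is stored.
	private final String charsetName;

	public CharsetDeserializationSchema(String charsetName) {
		this.charsetName = charsetName;
	}

	@Override
	public String deserialize(byte[] message) throws IOException {
		return new String(message, Charset.forName(charsetName));
	}

	@Override
	public boolean isEndOfStream(String nextElement) {
		return false;
	}

	@Override
	public TypeInformation<String> getProducedType() {
		return Types.STRING;
	}
}
{code}

A format like the JSON schema could accept the same constructor parameter and decode the raw bytes with the configured charset before parsing.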





[GitHub] [flink] flinkbot edited a comment on issue #11458: [FLINK-16625][k8s] Extract BootstrapTools#getEnvironmentVariables to a general utility in ConfigurationUtils

2020-03-19 Thread GitBox
flinkbot edited a comment on issue #11458: [FLINK-16625][k8s] Extract 
BootstrapTools#getEnvironmentVariables to a general utility in 
ConfigurationUtils
URL: https://github.com/apache/flink/pull/11458#issuecomment-601537757
 
 
   
   ## CI report:
   
   * 61e13aca86d70d5279d9ed0e482a5797915b068c Travis: [PENDING](https://travis-ci.com/github/flink-ci/flink/builds/154202409) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=6432)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   




[GitHub] [flink] jinglining commented on a change in pull request #11250: [FLINK-16302][rest]add log list and read log by name for taskmanager

2020-03-19 Thread GitBox
jinglining commented on a change in pull request #11250: [FLINK-16302][rest]add 
log list and read log by name for taskmanager
URL: https://github.com/apache/flink/pull/11250#discussion_r395440082
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java
 ##
 @@ -263,4 +266,8 @@ private void transferFile(ChannelHandlerContext ctx, File file, HttpRequest http
 			throw new FlinkException("Could not transfer file " + file + " to the client.", ioe);
 		}
 	}
+
+	protected String getFileName(HandlerRequest handlerRequest) {
+		return null;
 Review comment:
   If a user requests STDOUT and then LOG, they will not get STDOUT twice.
   Since `fileBlobKeys` is not static, each handler instance has its own cache. I have tested it by:
   ```java
   log.info(String.format(
       "fileBlobKeys [%s] cached file for TaskExecutor [%s] taskManagerId [%s], blobKey [%s]",
       fileBlobKeys.toString(), taskManagerIdAndFileName, taskManagerId, blobKey.toString()));
   ```
   
   The result is:
   ```text
   2020-03-20 11:32:18,582 INFO  org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerLogFileHandler [] - fileBlobKeys [org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalLoadingCache@6cfede43] cached file for TaskExecutor [(c65f774c537555a1fb64c64f9b1ff18b,null)] taskManagerId [c65f774c537555a1fb64c64f9b1ff18b], blobKey [t-a8637ce83ae75216ce99399eb59f07c63c53603b-f018c857f158c354d3018f6d03ae2ad6]
   2020-03-20 11:32:24,202 INFO  org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerStdoutFileHandler [] - fileBlobKeys [org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalLoadingCache@76dd5d49] cached file for TaskExecutor [(c65f774c537555a1fb64c64f9b1ff18b,null)] taskManagerId [c65f774c537555a1fb64c64f9b1ff18b], blobKey [t-da39a3ee5e6b4b0d3255bfef95601890afd80709-89a50af389ae582c598680476d832e0d]
   ```






[GitHub] [flink] flinkbot commented on issue #11458: [FLINK-16625][k8s] Extract BootstrapTools#getEnvironmentVariables to a general utility in ConfigurationUtils

2020-03-19 Thread GitBox
flinkbot commented on issue #11458: [FLINK-16625][k8s] Extract 
BootstrapTools#getEnvironmentVariables to a general utility in 
ConfigurationUtils
URL: https://github.com/apache/flink/pull/11458#issuecomment-601537757
 
 
   
   ## CI report:
   
   * 61e13aca86d70d5279d9ed0e482a5797915b068c UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   




[GitHub] [flink] xintongsong commented on issue #11284: [FLINK-15911][runtime] Make Flink work with NAT.

2020-03-19 Thread GitBox
xintongsong commented on issue #11284: [FLINK-15911][runtime] Make Flink work 
with NAT.
URL: https://github.com/apache/flink/pull/11284#issuecomment-601536922
 
 
   Thanks for the review @tillrohrmann.
   
   > A quick question for my understanding. In the e2e test we give every container an external address which is equal to the host's IP address, right? How can the docker container route the packets for this address if the docker network does not happen to use the same subnet as the host's IP address? Can a docker container talk to services running in the host's network? I guess I don't understand Docker well enough...
   
   As I understand it, all packets whose destination is not in the Docker network are routed to the host. The host's settings determine whether and where the packet is routed further; without any special settings, the host simply forwards it like any other packet it sends. Accessing other services running on the host is no different from accessing any public Internet address from inside a Docker container.




[GitHub] [flink-playgrounds] garyfeng closed pull request #11: Pyflink

2020-03-19 Thread GitBox
garyfeng closed pull request #11: Pyflink
URL: https://github.com/apache/flink-playgrounds/pull/11
 
 
   




[GitHub] [flink-playgrounds] garyfeng commented on issue #11: Pyflink

2020-03-19 Thread GitBox
garyfeng commented on issue #11: Pyflink
URL: https://github.com/apache/flink-playgrounds/pull/11#issuecomment-601535615
 
 
   The PyFlink branch fixed the missing flink-connector and flink-formats issues. Other bugs remain (#3).




[GitHub] [flink-playgrounds] garyfeng opened a new pull request #11: Pyflink

2020-03-19 Thread GitBox
garyfeng opened a new pull request #11: Pyflink
URL: https://github.com/apache/flink-playgrounds/pull/11
 
 
   




[GitHub] [flink] xintongsong commented on a change in pull request #11284: [FLINK-15911][runtime] Make Flink work with NAT.

2020-03-19 Thread GitBox
xintongsong commented on a change in pull request #11284: 
[FLINK-15911][runtime] Make Flink work with NAT.
URL: https://github.com/apache/flink/pull/11284#discussion_r395435948
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
 ##
 @@ -351,21 +351,21 @@ public static TaskExecutor startTaskManager(
 
 		LOG.info("Starting TaskManager with ResourceID: {}", resourceID);
 
-		InetAddress remoteAddress = InetAddress.getByName(rpcService.getAddress());
+		InetAddress externalAddress = InetAddress.getByName(rpcService.getAddress());
 
 Review comment:
   True. I'll try to fix the problem and update the e2e test case.




[jira] [Created] (FLINK-16688) Hive-connector should set SessionState for hive

2020-03-19 Thread Jingsong Lee (Jira)
Jingsong Lee created FLINK-16688:


 Summary: Hive-connector should set SessionState for hive
 Key: FLINK-16688
 URL: https://issues.apache.org/jira/browse/FLINK-16688
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Hive
Affects Versions: 1.10.0
Reporter: Jingsong Lee
 Fix For: 1.10.1, 1.11.0


Without a SessionState, Hive functions like GenericUDFUnixTimeStamp cannot be used.
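
A sketch of the kind of fix this implies, assuming Hive's standard SessionState API (the helper name is illustrative):

{code:java}
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.session.SessionState;

/** Illustrative helper: ensure a thread-local SessionState exists before
 *  invoking session-dependent UDFs such as GenericUDFUnixTimeStamp. */
public final class HiveSessionUtil {

	private HiveSessionUtil() {}

	public static void ensureSessionState(HiveConf hiveConf) {
		if (SessionState.get() == null) {
			SessionState.start(new SessionState(hiveConf));
		}
	}
}
{code}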





[GitHub] [flink] xintongsong commented on a change in pull request #11284: [FLINK-15911][runtime] Make Flink work with NAT.

2020-03-19 Thread GitBox
xintongsong commented on a change in pull request #11284: 
[FLINK-15911][runtime] Make Flink work with NAT.
URL: https://github.com/apache/flink/pull/11284#discussion_r395435813
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
 ##
 @@ -102,21 +102,21 @@ public int getNumTaskManagers() {
 	public String getJobManagerBindAddress() {
 		return commonBindAddress != null ?
 			commonBindAddress :
-			configuration.getString(JobManagerOptions.ADDRESS, "localhost");
+			configuration.getString(JobManagerOptions.BIND_HOST, "localhost");
 	}
 
 	public String getTaskManagerBindAddress() {
 		return commonBindAddress != null ?
 			commonBindAddress :
-			configuration.getString(TaskManagerOptions.HOST, "localhost");
+			configuration.getString(TaskManagerOptions.BIND_HOST, "localhost");
 	}
 
-	public String getJobManagerBindPortRange() {
-		return String.valueOf(configuration.getInteger(JobManagerOptions.PORT, 0));
+	public int getJobManagerBindPort() {
+		return configuration.getInteger(JobManagerOptions.RPC_BIND_PORT, 0);
 	}
 
-	public String getTaskManagerBindPortRange() {
-		return configuration.getString(TaskManagerOptions.RPC_PORT);
+	public int getTaskManagerBindPort() {
+		return configuration.getInteger(TaskManagerOptions.RPC_BIND_PORT, 0);
 Review comment:
   Not sure I fully understand your concern.
   
   I think the RPC services are meant to be used by Flink components internally. How would a user be affected if the configured address is not used internally?
   
   I have no problem preserving the external address/ports if they are configured; I am just trying to understand.




[GitHub] [flink] flinkbot edited a comment on issue #11457: [FLINK-14255][WIP][table] Introduce FileSystemStreamingSinkV2

2020-03-19 Thread GitBox
flinkbot edited a comment on issue #11457: [FLINK-14255][WIP][table] Introduce 
FileSystemStreamingSinkV2
URL: https://github.com/apache/flink/pull/11457#issuecomment-601515074
 
 
   
   ## CI report:
   
   * 2d43f0faeabc760120d3e0cbfc9632e42b2cfab6 Travis: [FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/154199678) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=6431)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   




[GitHub] [flink] flinkbot commented on issue #11458: [FLINK-16625][k8s] Extract BootstrapTools#getEnvironmentVariables to a general utility in ConfigurationUtils

2020-03-19 Thread GitBox
flinkbot commented on issue #11458: [FLINK-16625][k8s] Extract 
BootstrapTools#getEnvironmentVariables to a general utility in 
ConfigurationUtils
URL: https://github.com/apache/flink/pull/11458#issuecomment-601534139
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit 61e13aca86d70d5279d9ed0e482a5797915b068c (Fri Mar 20 
04:31:53 UTC 2020)
   
   **Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up 
to date!
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   




[jira] [Updated] (FLINK-16625) Extract BootstrapTools#getEnvironmentVariables to a general utility in ConfigurationUtils

2020-03-19 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16625?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-16625:
---
Labels: pull-request-available  (was: )

> Extract BootstrapTools#getEnvironmentVariables to a general utility in 
> ConfigurationUtils
> -
>
> Key: FLINK-16625
> URL: https://issues.apache.org/jira/browse/FLINK-16625
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Core
>Reporter: Canbin Zheng
>Assignee: Canbin Zheng
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>
> {{BootstrapTools#getEnvironmentVariables}} actually is a general utility to 
> extract key-value pairs with specified prefix trimmed from the Flink 
> Configuration object. It can not only be used to extract customized 
> environment variables in the YARN setup but also for customized 
> annotations/labels/node-selectors in the Kubernetes setup.
> This ticket proposes to rename it to 
> {{ConfigurationUtils#getPrefixedKeyValuePairs}} and move it to the 
> {{flink-core}} module as a more general utility to share for the 
> YARN/Kubernetes setup.





[GitHub] [flink] zhengcanbin opened a new pull request #11458: [FLINK-16625][k8s] Extract BootstrapTools#getEnvironmentVariables to a general utility in ConfigurationUtils

2020-03-19 Thread GitBox
zhengcanbin opened a new pull request #11458: [FLINK-16625][k8s] Extract 
BootstrapTools#getEnvironmentVariables to a general utility in 
ConfigurationUtils
URL: https://github.com/apache/flink/pull/11458
 
 
   ## What is the purpose of the change
   
   `BootstrapTools#getEnvironmentVariables` is actually a general utility that extracts key-value pairs, with a specified prefix trimmed, from the Flink Configuration object. It can be used not only to extract customized environment variables in the YARN setup but also customized annotations/labels/node-selectors in the Kubernetes setup.
   
   This PR proposes to rename it to `ConfigurationUtils#getPrefixedKeyValuePairs` and move it to the flink-core module as a more general utility shared by the YARN/Kubernetes setups.
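   
   A minimal sketch of the proposed utility under these assumptions (the exact signature in the final PR may differ; the env-variable prefix in the comment is one known use site):
   
   ```java
   // Collect all entries whose keys start with the given prefix, returning them
   // with the prefix trimmed, e.g. "containerized.master.env.FOO" -> "FOO".
   public static Map<String, String> getPrefixedKeyValuePairs(final String prefix, final Configuration configuration) {
   	final Map<String, String> result = new HashMap<>();
   	for (Map.Entry<String, String> entry : configuration.toMap().entrySet()) {
   		if (entry.getKey().startsWith(prefix) && entry.getKey().length() > prefix.length()) {
   			result.put(entry.getKey().substring(prefix.length()), entry.getValue());
   		}
   	}
   	return result;
   }
   ```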
   
   ## Verifying this change
   
   This change added tests.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (yes / **no** / 
don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
   




[GitHub] [flink] zhijiangW commented on a change in pull request #11453: [FLINK-13553][tests] Fix ByteBuf leak in KvStateServerHandlerTest

2020-03-19 Thread GitBox
zhijiangW commented on a change in pull request #11453: [FLINK-13553][tests] 
Fix ByteBuf leak in KvStateServerHandlerTest
URL: https://github.com/apache/flink/pull/11453#discussion_r395433462
 
 

 ##
 File path: flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
 ##
 @@ -544,6 +552,7 @@ public void testIncomingBufferIsRecycled() throws Exception {
 
 		channel.writeInbound(unexpected);
 		assertEquals("Buffer not recycled", 0L, unexpected.refCnt());
+		channel.finishAndReleaseAll();
 
 Review comment:
   I am curious why we add this action only for this test; the allocated buffers in this unit test are already released before this point.




[GitHub] [flink] flinkbot edited a comment on issue #11457: [FLINK-14255][WIP][table] Introduce FileSystemStreamingSinkV2

2020-03-19 Thread GitBox
flinkbot edited a comment on issue #11457: [FLINK-14255][WIP][table] Introduce 
FileSystemStreamingSinkV2
URL: https://github.com/apache/flink/pull/11457#issuecomment-601515074
 
 
   
   ## CI report:
   
   * 2d43f0faeabc760120d3e0cbfc9632e42b2cfab6 Travis: [PENDING](https://travis-ci.com/github/flink-ci/flink/builds/154199678) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=6431)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   




[GitHub] [flink] jiasheng55 commented on issue #11322: [FLINK-16376][yarn] Use consistent method to get Yarn application dir…

2020-03-19 Thread GitBox
jiasheng55 commented on issue #11322: [FLINK-16376][yarn] Use consistent method 
to get Yarn application dir…
URL: https://github.com/apache/flink/pull/11322#issuecomment-601517139
 
 
   @kl0u this PR removed the "targetDir" parameter from the `YarnClusterDescriptor#uploadAndRegisterFiles` method and uses `FileSystem#getHomeDirectory` as the default target directory, so `YarnFileStageTestS3ITCase.testRecursiveUploadForYarnS3a()` failed because there seemed to be a credential error accessing the **"/user/{user.name}"** bucket.
   
   If we choose not to use `mockito`, is it possible to make the bucket **"/user"** accessible?




[GitHub] [flink] ifndef-SleePy commented on a change in pull request #11347: [FLINK-14971][checkpointing] Make all the non-IO operations in CheckpointCoordinator single-threaded

2020-03-19 Thread GitBox
ifndef-SleePy commented on a change in pull request #11347: 
[FLINK-14971][checkpointing] Make all the non-IO operations in 
CheckpointCoordinator single-threaded
URL: https://github.com/apache/flink/pull/11347#discussion_r395431078
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ##
 @@ -151,6 +149,9 @@
 	 * It must be single-threaded. Eventually it will be replaced by main thread executor. */
 	private final ScheduledExecutor timer;
 
+	@Nullable
+	private ComponentMainThreadExecutor mainThreadExecutor;
 
 Review comment:
   Yes, it cannot be a `final` variable given the current implementation of `ExecutionGraph`. I'm just not a fan of the `DummyComponentMainThreadExecutor` trick. But I agree with you that it's not a bad idea to stay consistent with `ExecutionGraph`. Let me think about it a bit more; maybe we can find a better solution.




[GitHub] [flink] flinkbot commented on issue #11457: [FLINK-14255][WIP][table] Introduce FileSystemStreamingSinkV2

2020-03-19 Thread GitBox
flinkbot commented on issue #11457: [FLINK-14255][WIP][table] Introduce 
FileSystemStreamingSinkV2
URL: https://github.com/apache/flink/pull/11457#issuecomment-601515074
 
 
   
   ## CI report:
   
   * 2d43f0faeabc760120d3e0cbfc9632e42b2cfab6 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   




[GitHub] [flink] flinkbot edited a comment on issue #11456: [FLINK-16602][k8s] Rework the internal/external Service

2020-03-19 Thread GitBox
flinkbot edited a comment on issue #11456: [FLINK-16602][k8s] Rework the 
internal/external Service
URL: https://github.com/apache/flink/pull/11456#issuecomment-601501414
 
 
   
   ## CI report:
   
   * e4c3073a6fb379bacb4da0e299245a73ea2a97d4 Travis: [SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/154194249) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=6429)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   




[jira] [Updated] (FLINK-16625) Extract BootstrapTools#getEnvironmentVariables to a general utility in ConfigurationUtils

2020-03-19 Thread Canbin Zheng (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16625?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Canbin Zheng updated FLINK-16625:
-
Summary: Extract BootstrapTools#getEnvironmentVariables to a general 
utility in ConfigurationUtils  (was: Extract 
BootstrapTools#getEnvironmentVariables to a general utility in 
ConfigurationUtil)

> Extract BootstrapTools#getEnvironmentVariables to a general utility in 
> ConfigurationUtils
> -
>
> Key: FLINK-16625
> URL: https://issues.apache.org/jira/browse/FLINK-16625
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Core
>Reporter: Canbin Zheng
>Assignee: Canbin Zheng
>Priority: Minor
> Fix For: 1.11.0
>
>
> {{BootstrapTools#getEnvironmentVariables}} actually is a general utility to 
> extract key-value pairs with specified prefix trimmed from the Flink 
> Configuration object. It can not only be used to extract customized 
> environment variables in the YARN setup but also for customized 
> annotations/labels/node-selectors in the Kubernetes setup.
> This ticket proposes to rename it to 
> {{ConfigurationUtil#getPrefixedKeyValuePairs}} and move it to the 
> {{flink-core}} module as a more general utility to share for the 
> YARN/Kubernetes setup.





[jira] [Updated] (FLINK-16625) Extract BootstrapTools#getEnvironmentVariables to a general utility in ConfigurationUtils

2020-03-19 Thread Canbin Zheng (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16625?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Canbin Zheng updated FLINK-16625:
-
Description: 
{{BootstrapTools#getEnvironmentVariables}} actually is a general utility to 
extract key-value pairs with specified prefix trimmed from the Flink 
Configuration object. It can not only be used to extract customized environment 
variables in the YARN setup but also for customized 
annotations/labels/node-selectors in the Kubernetes setup.

This ticket proposes to rename it to 
{{ConfigurationUtils#getPrefixedKeyValuePairs}} and move it to the 
{{flink-core}} module as a more general utility to share for the 
YARN/Kubernetes setup.

  was:
{{BootstrapTools#getEnvironmentVariables}} actually is a general utility to 
extract key-value pairs with specified prefix trimmed from the Flink 
Configuration object. It can not only be used to extract customized environment 
variables in the YARN setup but also for customized 
annotations/labels/node-selectors in the Kubernetes setup.

This ticket proposes to rename it to 
{{ConfigurationUtil#getPrefixedKeyValuePairs}} and move it to the 
{{flink-core}} module as a more general utility to share for the 
YARN/Kubernetes setup.


> Extract BootstrapTools#getEnvironmentVariables to a general utility in 
> ConfigurationUtils
> -
>
> Key: FLINK-16625
> URL: https://issues.apache.org/jira/browse/FLINK-16625
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Core
>Reporter: Canbin Zheng
>Assignee: Canbin Zheng
>Priority: Minor
> Fix For: 1.11.0
>
>
> {{BootstrapTools#getEnvironmentVariables}} actually is a general utility to 
> extract key-value pairs with specified prefix trimmed from the Flink 
> Configuration object. It can not only be used to extract customized 
> environment variables in the YARN setup but also for customized 
> annotations/labels/node-selectors in the Kubernetes setup.
> This ticket proposes to rename it to 
> {{ConfigurationUtils#getPrefixedKeyValuePairs}} and move it to the 
> {{flink-core}} module as a more general utility to share for the 
> YARN/Kubernetes setup.





[GitHub] [flink] xintongsong commented on a change in pull request #11284: [FLINK-15911][runtime] Make Flink work with NAT.

2020-03-19 Thread GitBox
xintongsong commented on a change in pull request #11284: 
[FLINK-15911][runtime] Make Flink work with NAT.
URL: https://github.com/apache/flink/pull/11284#discussion_r395430171
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
 ##
 @@ -248,10 +247,14 @@ protected void initializeServices(Configuration configuration) throws Exception
 		LOG.info("Initializing cluster services.");
 
 		synchronized (lock) {
-			final String bindAddress = configuration.getString(JobManagerOptions.ADDRESS);
-			final String portRange = getRPCPortRange(configuration);
-
-			commonRpcService = createRpcService(configuration, bindAddress, portRange);
+			commonRpcService = AkkaRpcServiceUtils.createRemoteRpcService(
+				configuration,
+				configuration.getString(JobManagerOptions.ADDRESS),
+				getRPCPortRange(configuration),
+				configuration.getString(JobManagerOptions.BIND_HOST),
+				configuration.contains(JobManagerOptions.RPC_BIND_PORT) ?
+					configuration.getInteger(JobManagerOptions.RPC_BIND_PORT) :
+					null);
 
 Review comment:
   Same here. I also think using Optional would express the contract better. The question is whether we need to strictly follow the code style guide.




[jira] [Assigned] (FLINK-16625) Extract BootstrapTools#getEnvironmentVariables to a general utility in ConfigurationUtil

2020-03-19 Thread Zili Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16625?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zili Chen reassigned FLINK-16625:
-

Assignee: Canbin Zheng

> Extract BootstrapTools#getEnvironmentVariables to a general utility in 
> ConfigurationUtil
> 
>
> Key: FLINK-16625
> URL: https://issues.apache.org/jira/browse/FLINK-16625
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Core
>Reporter: Canbin Zheng
>Assignee: Canbin Zheng
>Priority: Minor
> Fix For: 1.11.0
>
>
> {{BootstrapTools#getEnvironmentVariables}} actually is a general utility to 
> extract key-value pairs with specified prefix trimmed from the Flink 
> Configuration object. It can not only be used to extract customized 
> environment variables in the YARN setup but also for customized 
> annotations/labels/node-selectors in the Kubernetes setup.
> This ticket proposes to rename it to 
> {{ConfigurationUtil#getPrefixedKeyValuePairs}} and move it to the 
> {{flink-core}} module as a more general utility to share for the 
> YARN/Kubernetes setup.





[GitHub] [flink] jiasheng55 commented on a change in pull request #11322: [FLINK-16376][yarn] Use consistent method to get Yarn application dir…

2020-03-19 Thread GitBox
jiasheng55 commented on a change in pull request #11322: [FLINK-16376][yarn] 
Use consistent method to get Yarn application dir…
URL: https://github.com/apache/flink/pull/11322#discussion_r393478330
 
 

 ##
 File path: flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTestS3ITCase.java
 ##
 @@ -160,8 +162,12 @@ private void testRecursiveUploadForYarn(String scheme, String pathSuffix) throws
 
 		assumeFalse(fs.exists(basePath));
 
+		// mock the test bucket path as home dir.
+		org.apache.hadoop.fs.FileSystem spyFileSystem = spy(fs.getHadoopFileSystem());
+		when(spyFileSystem.getHomeDirectory()).thenReturn(new org.apache.hadoop.fs.Path(basePath.toUri()));
 
 Review comment:
   @kl0u you're right, the test path changed from **S3_TEST_BUCKET** to the one returned by `org.apache.hadoop.fs.FileSystem#getHomeDirectory`.
   I added a few lines to mock the S3_TEST_BUCKET as the home directory; what do you think?




[GitHub] [flink] xintongsong commented on a change in pull request #11284: [FLINK-15911][runtime] Make Flink work with NAT.

2020-03-19 Thread GitBox
xintongsong commented on a change in pull request #11284: 
[FLINK-15911][runtime] Make Flink work with NAT.
URL: https://github.com/apache/flink/pull/11284#discussion_r395429730
 
 

 ##
 File path: flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
 ##
 @@ -94,19 +94,28 @@
 			" or if it has been quarantined by another actor system.");
 
 	/**
-	 * The config parameter defining the task manager's hostname.
+	 * The external address of the network interface where the TaskManager is exposed.
 	 * Overrides {@link #HOST_BIND_POLICY} automatic address binding.
 	 */
 	@Documentation.Section({Documentation.Sections.COMMON_HOST_PORT, Documentation.Sections.ALL_TASK_MANAGER})
 	public static final ConfigOption<String> HOST =
 		key("taskmanager.host")
 			.stringType()
 			.noDefaultValue()
-			.withDescription("The address of the network interface that the TaskManager binds to." +
-				" This option can be used to define explicitly a binding address. Because" +
-				" different TaskManagers need different values for this option, usually it is specified in an" +
+			.withDescription("The external address of the network interface where the TaskManager is exposed." +
+				" Because different TaskManagers need different values for this option, usually it is specified in an" +
 				" additional non-shared TaskManager-specific config file.");
 
+	/**
+	 * The local address of the network interface that the task manager binds to.
+	 */
+	public static final ConfigOption<String> BIND_HOST =
+		key("taskmanager.bind-host")
 
 Review comment:
   Same here. The TM address and bind-address are shared by the RPC service, the netty shuffle service, and queryable KV state.
   
   I'm actually thinking about changing "jobmanager.rpc.address" to "jobmanager.host" as a follow-up, so that the config keys consistently use "host".




[jira] [Commented] (FLINK-15447) To improve utilization of the `java.io.tmpdir` for YARN module

2020-03-19 Thread Victor Wong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17063064#comment-17063064
 ] 

Victor Wong commented on FLINK-15447:
-

Currently, we can solve this issue through "env.java.opts: 
-Djava.io.tmpdir=./tmp", closing this issue now.
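
For reference, the workaround quoted above as a flink-conf.yaml entry (a sketch of the same setting; the relative path resolves against the YARN container's working directory, which YARN cleans up automatically):

{code}
# flink-conf.yaml
env.java.opts: -Djava.io.tmpdir=./tmp
{code}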

> To improve utilization of the `java.io.tmpdir` for YARN module
> --
>
> Key: FLINK-15447
> URL: https://issues.apache.org/jira/browse/FLINK-15447
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Affects Versions: 1.9.1
>Reporter: Victor Wong
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> *Background*
> Currently, when running Flink on Yarn, the "java.io.tmpdir" property is set 
> to the default value, which is "/tmp".  
> Sometimes we ran into exceptions caused by a full "/tmp" directory, which 
> would not be cleaned automatically after applications finished.
>  
> *Goal*
> quoted from: [HADOOP-2735|https://issues.apache.org/jira/browse/HADOOP-2735]
> _1) Tasks can utilize all disks when using tmp_
>  _2) Any undeleted tmp files will be deleted by the tasktracker when 
> task(job?) is done._
>  
> *Suggestion*
> I think we can set "java.io.tmpdir" to the "$PWD/tmp" directory, or 
> something similar. "$PWD" will be replaced with the true working 
> directory of JM/TM by Yarn, which will be cleaned automatically.
>  





[jira] [Closed] (FLINK-15447) To improve utilization of the `java.io.tmpdir` for YARN module

2020-03-19 Thread Victor Wong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15447?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Victor Wong closed FLINK-15447.
---
Resolution: Not A Problem

> To improve utilization of the `java.io.tmpdir` for YARN module
> --
>
> Key: FLINK-15447
> URL: https://issues.apache.org/jira/browse/FLINK-15447
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Affects Versions: 1.9.1
>Reporter: Victor Wong
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> *Background*
> Currently, when running Flink on Yarn, the "java.io.tmpdir" property is set 
> to the default value, which is "/tmp".  
> Sometimes we ran into exceptions caused by a full "/tmp" directory, which 
> would not be cleaned automatically after applications finished.
>  
> *Goal*
> quoted from: [HADOOP-2735|https://issues.apache.org/jira/browse/HADOOP-2735]
> _1) Tasks can utilize all disks when using tmp_
>  _2) Any undeleted tmp files will be deleted by the tasktracker when 
> task(job?) is done._
>  
> *Suggestion*
> I think we can set "java.io.tmpdir" to the "$PWD/tmp" directory, or 
> something similar. "$PWD" will be replaced with the true working 
> directory of JM/TM by Yarn, which will be cleaned automatically.
>  





[GitHub] [flink] ifndef-SleePy commented on a change in pull request #11347: [FLINK-14971][checkpointing] Make all the non-IO operations in CheckpointCoordinator single-threaded

2020-03-19 Thread GitBox
ifndef-SleePy commented on a change in pull request #11347: 
[FLINK-14971][checkpointing] Make all the non-IO operations in 
CheckpointCoordinator single-threaded
URL: https://github.com/apache/flink/pull/11347#discussion_r395429422
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ##
 @@ -91,9 +92,6 @@
 
 	// 
 
-	/** Coordinator-wide lock to safeguard the checkpoint updates. */
-	private final Object lock = new Object();
 
 Review comment:
   Sure, nice point!




[GitHub] [flink] ifndef-SleePy commented on a change in pull request #11347: [FLINK-14971][checkpointing] Make all the non-IO operations in CheckpointCoordinator single-threaded

2020-03-19 Thread GitBox
ifndef-SleePy commented on a change in pull request #11347: 
[FLINK-14971][checkpointing] Make all the non-IO operations in 
CheckpointCoordinator single-threaded
URL: https://github.com/apache/flink/pull/11347#discussion_r395429228
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
 ##
 @@ -111,6 +108,14 @@
 	/** The executor for potentially blocking I/O operations, like state disposal. */
 	private final Executor executor;
 
+	/** The executor for non-blocking operations. */
+	private final Executor mainThreadExecutor;
+
+	private final CompletedCheckpointStore completedCheckpointStore;
+
+	/** The lock for avoiding conflict between I/O operations. */
+	private final Object operationLock = new Object();
 
 Review comment:
   Yes, there is a small possibility that the `CheckpointCoordinator` is shut down while a `PendingCheckpoint` is doing finalization. There could be some concurrent conflicts on `operatorStates` and `targetLocation`. It might not be a big deal, because everything is shutting down anyway, and the finalization probably could not finish since the IO executor would also be shut down. However, it is not elegant to push the concurrency issue down into `CheckpointStorageLocation` and `OperatorState`, and it is a bit heavy to make all of those implementations thread-safe just to cover this unlikely case. So I think introducing an external lock here is the better option.
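   
   A rough sketch of the guarding pattern I mean, with the surrounding methods simplified and names illustrative:
   
   ```java
   // Finalization and disposal take the same lock, so a shutdown that races
   // a finalizing PendingCheckpoint cannot dispose state mid-write.
   private final Object operationLock = new Object();
   private boolean discarded;
   
   void finalizeCheckpoint() {
   	synchronized (operationLock) {
   		if (discarded) {
   			throw new IllegalStateException("checkpoint is already discarded");
   		}
   		// ... persist operatorStates to targetLocation ...
   	}
   }
   
   void dispose() {
   	synchronized (operationLock) {
   		discarded = true;
   		// ... dispose operatorStates and release targetLocation ...
   	}
   }
   ```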




[jira] [Assigned] (FLINK-16687) PyFlink Cannot determine simple type name "PythonScalarFunction$0"

2020-03-19 Thread Dian Fu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16687?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu reassigned FLINK-16687:
---

Assignee: Huang Xingbo

> PyFlink Cannot determine simple type name "PythonScalarFunction$0"
> --
>
> Key: FLINK-16687
> URL: https://issues.apache.org/jira/browse/FLINK-16687
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.10.0
>Reporter: mayne wong
>Assignee: Huang Xingbo
>Priority: Major
>
>  
> I tried to run a PyFlink UDF with SQL UNNEST, and the job execution failed. I defined a source from elements and used a UDF to split a string into a list.
> It raises org.codehaus.commons.compiler.CompileException: Cannot determine simple type name "PythonScalarFunction$0".
> {code:python}
> import os
> from pyflink.table.udf import udf
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.table import StreamTableEnvironment, DataTypes, CsvTableSink
>
> @udf(input_types=[DataTypes.STRING()], result_type=DataTypes.ARRAY(DataTypes.STRING()))
> def format_string_to_array(item):
>     return item.replace('[', '').replace(']', '').replace(', ', ',').split(',')
>
> if __name__ == '__main__':
>     env = StreamExecutionEnvironment.get_execution_environment()
>     env.set_parallelism(1)
>     st_env = StreamTableEnvironment.create(env)
>     result_file = "result.csv"
>     if os.path.exists(result_file):
>         os.remove(result_file)
>     st_env.register_table_sink("result_tab", CsvTableSink(["id", "url"], [DataTypes.STRING(), DataTypes.STRING()], result_file))
>     st_env.register_function("format_string_to_array", format_string_to_array)
>     tab = st_env.from_elements([("1", "['www.bing.com', 'www.google.com']"), ("2", "['www.taobao.com']")], ['id', 'urls'])
>     st_env.register_table("temp_table", tab)
>     st_env.sql_query("Select id, A.url from temp_table, UNNEST(format_string_to_array(temp_table.urls)) AS A(url)").insert_into("result_tab")
>     st_env.execute("udf")
> {code}
>  
> When I execute the program, I get the following exception:
>  
> {code:java}
> py4j.protocol.Py4JJavaError: An error occurred while calling o2.execute.
> : java.util.concurrent.ExecutionException: 
> org.apache.flink.client.program.ProgramInvocationException: Job failed 
> (JobID: 5d63838ad2043bf4a5d0bca83623959d)
> at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1640)
> at 
> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:74)
> at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
> at 
> org.apache.flink.table.executor.StreamExecutor.execute(StreamExecutor.java:50)
> at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:643)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> at 
> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> at 
> org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
> at 
> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> at 
> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
> at 
> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.client.program.ProgramInvocationException: Job 
> failed (JobID: 5d63838ad2043bf4a5d0bca83623959d)
> at 
> org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112)
> at 
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
> at 
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
> at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
> at 
> 

[GitHub] [flink] jiasheng55 closed pull request #11331: [FLINK-15447][yarn] Add config to define 'java.io.tmpdir' property of…

2020-03-19 Thread GitBox
jiasheng55 closed pull request #11331: [FLINK-15447][yarn] Add config to define 
'java.io.tmpdir' property of…
URL: https://github.com/apache/flink/pull/11331
 
 
   




[GitHub] [flink] xintongsong commented on a change in pull request #11284: [FLINK-15911][runtime] Make Flink work with NAT.

2020-03-19 Thread GitBox
xintongsong commented on a change in pull request #11284: 
[FLINK-15911][runtime] Make Flink work with NAT.
URL: https://github.com/apache/flink/pull/11284#discussion_r395428193
 
 

 ##
 File path: flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
 ##
 @@ -54,6 +54,16 @@
 			" leader-election service (like ZooKeeper) is used to elect and discover the JobManager" +
 			" leader from potentially multiple standby JobManagers.");
 
+	/**
+	 * The local address of the network interface that the job manager binds to.
+	 */
+	public static final ConfigOption<String> BIND_HOST =
+		key("jobmanager.bind-host")
 
 Review comment:
   This address is not only used by the RPC service, but also shared by the blob server, the rest server, and potentially any other future service that needs to bind to the network interface.
   
   To that end, I think the config option for the TM ("taskmanager.host") makes more sense. I guess that might be the reason we used "host" in the config key in the first place.




[GitHub] [flink] xintongsong commented on a change in pull request #11284: [FLINK-15911][runtime] Make Flink work with NAT.

2020-03-19 Thread GitBox
xintongsong commented on a change in pull request #11284: 
[FLINK-15911][runtime] Make Flink work with NAT.
URL: https://github.com/apache/flink/pull/11284#discussion_r395426701
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
 ##
 @@ -261,26 +258,30 @@ public void start() throws Exception {
 			// bring up all the RPC services
 			LOG.info("Starting RPC Service(s)");
 
-			AkkaRpcServiceConfiguration akkaRpcServiceConfig = AkkaRpcServiceConfiguration.fromConfiguration(configuration);
-
 			final RpcServiceFactory dispatcherResourceManagreComponentRpcServiceFactory;
 
 			if (useSingleRpcService) {
 				// we always need the 'commonRpcService' for auxiliary calls
-				commonRpcService = createRpcService(akkaRpcServiceConfig, false, null);
+				commonRpcService = createLocalRpcService(configuration);
 				final CommonRpcServiceFactory commonRpcServiceFactory = new CommonRpcServiceFactory(commonRpcService);
 				taskManagerRpcServiceFactory = commonRpcServiceFactory;
 				dispatcherResourceManagreComponentRpcServiceFactory = commonRpcServiceFactory;
 			} else {
-				// we always need the 'commonRpcService' for auxiliary calls
-				commonRpcService = createRpcService(akkaRpcServiceConfig, true, null);
 
 				// start a new service per component, possibly with custom bind addresses
 				final String jobManagerBindAddress = miniClusterConfiguration.getJobManagerBindAddress();
 				final String taskManagerBindAddress = miniClusterConfiguration.getTaskManagerBindAddress();
+				final String jobManagerBindPort = miniClusterConfiguration.getJobManagerBindPortRange();
+				final String taskManagerBindPort = miniClusterConfiguration.getTaskManagerBindPortRange();
 
 Review comment:
   Please see my explanation above.




[GitHub] [flink] flinkbot commented on issue #11457: [FLINK-14255][WIP][table] Introduce FileSystemStreamingSinkV2

2020-03-19 Thread GitBox
flinkbot commented on issue #11457: [FLINK-14255][WIP][table] Introduce 
FileSystemStreamingSinkV2
URL: https://github.com/apache/flink/pull/11457#issuecomment-601511488
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit 2d43f0faeabc760120d3e0cbfc9632e42b2cfab6 (Fri Mar 20 
03:39:44 UTC 2020)
   
   **Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up 
to date!
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   




[GitHub] [flink] xintongsong commented on a change in pull request #11284: [FLINK-15911][runtime] Make Flink work with NAT.

2020-03-19 Thread GitBox
xintongsong commented on a change in pull request #11284: 
[FLINK-15911][runtime] Make Flink work with NAT.
URL: https://github.com/apache/flink/pull/11284#discussion_r395426615
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
 ##
 @@ -261,26 +258,30 @@ public void start() throws Exception {
 			// bring up all the RPC services
 			LOG.info("Starting RPC Service(s)");
 
-			AkkaRpcServiceConfiguration akkaRpcServiceConfig = AkkaRpcServiceConfiguration.fromConfiguration(configuration);
-
 			final RpcServiceFactory dispatcherResourceManagreComponentRpcServiceFactory;
 
 			if (useSingleRpcService) {
 				// we always need the 'commonRpcService' for auxiliary calls
-				commonRpcService = createRpcService(akkaRpcServiceConfig, false, null);
+				commonRpcService = createLocalRpcService(configuration);
 				final CommonRpcServiceFactory commonRpcServiceFactory = new CommonRpcServiceFactory(commonRpcService);
 				taskManagerRpcServiceFactory = commonRpcServiceFactory;
 				dispatcherResourceManagreComponentRpcServiceFactory = commonRpcServiceFactory;
 			} else {
-				// we always need the 'commonRpcService' for auxiliary calls
-				commonRpcService = createRpcService(akkaRpcServiceConfig, true, null);
 
 				// start a new service per component, possibly with custom bind addresses
 				final String jobManagerBindAddress = miniClusterConfiguration.getJobManagerBindAddress();
 				final String taskManagerBindAddress = miniClusterConfiguration.getTaskManagerBindAddress();
+				final String jobManagerBindPort = miniClusterConfiguration.getJobManagerBindPortRange();
 
 Review comment:
   No, and I don't think we should.
   
   Whenever the user needs to specify a bind port apart from the external port, 
the user also needs to set port forwarding rules in the environment, e.g. a NAT 
gateway. Each external port can only be forwarded to one internal port. 
Therefore, it does not make sense to support a range of bind ports, because the 
user needs to know which internal port is bound anyway.
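   
   For illustration, a minimal sketch of the bind-port resolution described 
above (the helper name is my own, not actual Flink code). Behind NAT each 
external port maps to exactly one internal port, so a single bind port, with 
the external port as fallback, is all that is needed:
   
   ```java
   // Mirrors the "if negative, the external port will be used" contract from
   // the startRemoteActorSystem javadoc in this PR.
   static int resolveBindPort(int configuredBindPort, int externalPort) {
       // A negative bind port means "bind to the same port that is exposed
       // externally", i.e. no NAT port forwarding is in play.
       return configuredBindPort < 0 ? externalPort : configuredBindPort;
   }
   ```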


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi opened a new pull request #11457: [FLINK-14255][WIP][table] Introduce FileSystemStreamingSinkV2

2020-03-19 Thread GitBox
JingsongLi opened a new pull request #11457: [FLINK-14255][WIP][table] 
Introduce FileSystemStreamingSinkV2
URL: https://github.com/apache/flink/pull/11457
 
 
   ## What is the purpose of the change
   
   Build table streaming file sink based on DataStream StreamingFileSink.
   
   NOTE: Work in progress, for testing.
   
   ## Brief change log
   
   - Introduce FileSystemStreamingSink
   - Integrate hive to FileSystemStreamingSink
   
   ## Verifying this change
   
   FileSystemStreamingSinkTest
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes)
 - If yes, how is the feature documented? JavaDocs
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xintongsong commented on a change in pull request #11284: [FLINK-15911][runtime] Make Flink work with NAT.

2020-03-19 Thread GitBox
xintongsong commented on a change in pull request #11284: 
[FLINK-15911][runtime] Make Flink work with NAT.
URL: https://github.com/apache/flink/pull/11284#discussion_r395425038
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
 ##
 @@ -82,92 +82,80 @@
.build();
 
/**
-* Starts an ActorSystem with the given configuration listening at the 
address/ports.
+* Starts a remote ActorSystem at given address and specific port range.
 * @param configuration The Flink configuration
-* @param listeningAddress The address to listen at.
-* @param portRangeDefinition The port range to choose a port from.
+* @param externalAddress The external address to access the 
ActorSystem.
+* @param externalPortRange The choosing range of the external port to 
access the ActorSystem.
 * @param logger The logger to output log information.
 * @return The ActorSystem which has been started
 * @throws Exception Thrown when actor system cannot be started in 
specified port range
 */
-   public static ActorSystem startActorSystem(
+   @VisibleForTesting
+   public static ActorSystem startRemoteActorSystem(
Configuration configuration,
-   String listeningAddress,
-   String portRangeDefinition,
+   String externalAddress,
+   String externalPortRange,
Logger logger) throws Exception {
-   return startActorSystem(
-   configuration,
-   listeningAddress,
-   portRangeDefinition,
-   logger,
-   
ForkJoinExecutorConfiguration.fromConfiguration(configuration));
-   }
-
-   /**
-* Starts an ActorSystem with the given configuration listening at the 
address/ports.
-*
-* @param configuration The Flink configuration
-* @param listeningAddress The address to listen at.
-* @param portRangeDefinition The port range to choose a port from.
-* @param logger The logger to output log information.
-* @param actorSystemExecutorConfiguration configuration for the 
ActorSystem's underlying executor
-* @return The ActorSystem which has been started
-* @throws Exception Thrown when actor system cannot be started in 
specified port range
-*/
-   public static ActorSystem startActorSystem(
-   Configuration configuration,
-   String listeningAddress,
-   String portRangeDefinition,
-   Logger logger,
-   @Nonnull ActorSystemExecutorConfiguration 
actorSystemExecutorConfiguration) throws Exception {
-   return startActorSystem(
+   return startRemoteActorSystem(
configuration,
AkkaUtils.getFlinkActorSystemName(),
-   listeningAddress,
-   portRangeDefinition,
+   externalAddress,
+   externalPortRange,
+   NetUtils.getWildcardIPAddress(),
+   -1,
logger,
-   actorSystemExecutorConfiguration);
+   
ForkJoinExecutorConfiguration.fromConfiguration(configuration),
+   null);
}
 
/**
-* Starts an ActorSystem with the given configuration listening at the 
address/ports.
+* Starts a remote ActorSystem at given address and specific port range.
 *
 * @param configuration The Flink configuration
 * @param actorSystemName Name of the started {@link ActorSystem}
-* @param listeningAddress The address to listen at.
-* @param portRangeDefinition The port range to choose a port from.
+* @param externalAddress The external address to access the 
ActorSystem.
+* @param externalPortRange The choosing range of the external port to 
access the ActorSystem.
+* @param bindAddress The local address to bind to.
+* @param bindPort The local port to bind to. If negative, external 
port will be used.
 * @param logger The logger to output log information.
 * @param actorSystemExecutorConfiguration configuration for the 
ActorSystem's underlying executor
+* @param customConfig Custom Akka config to be combined with the 
config derived from Flink configuration.
 * @return The ActorSystem which has been started
 * @throws Exception Thrown when actor system cannot be started in 
specified port range
 */
-   public static ActorSystem startActorSystem(
+   public static ActorSystem startRemoteActorSystem(
Configuration configuration,
String 

[GitHub] [flink] flinkbot edited a comment on issue #11456: [FLINK-16602][k8s] Rework the internal/external Service

2020-03-19 Thread GitBox
flinkbot edited a comment on issue #11456: [FLINK-16602][k8s] Rework the 
internal/external Service
URL: https://github.com/apache/flink/pull/11456#issuecomment-601501414
 
 
   
   ## CI report:
   
   * e4c3073a6fb379bacb4da0e299245a73ea2a97d4 Travis: 
[PENDING](https://travis-ci.com/github/flink-ci/flink/builds/154194249) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=6429)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #11415: [FLINK-15667][k8s] Support to mount custom Hadoop configurations

2020-03-19 Thread GitBox
flinkbot edited a comment on issue #11415: [FLINK-15667][k8s] Support to mount 
custom Hadoop configurations
URL: https://github.com/apache/flink/pull/11415#issuecomment-599458187
 
 
   
   ## CI report:
   
   * abc66a0d2605b380ed890dd23e4ff19c9a65ed6a Travis: 
[FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/154195224) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=6430)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] liuzhixing1006 commented on issue #11391: [FLINK-16098] [chinese-translation, Documentation] Translate "Overview" page of "Hive Integration" into Chinese

2020-03-19 Thread GitBox
liuzhixing1006 commented on issue #11391: [FLINK-16098] [chinese-translation, 
Documentation] Translate "Overview" page of "Hive Integration" into Chinese
URL: https://github.com/apache/flink/pull/11391#issuecomment-601505120
 
 
   Thank you @JingsongLi for bringing this problem to my attention. There's 
something wrong with this branch, so I created a 
[new PR](https://github.com/apache/flink/pull/11455).
   I'm sorry to have caused so much trouble.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #11456: [FLINK-16602][k8s] Rework the internal/external Service

2020-03-19 Thread GitBox
flinkbot edited a comment on issue #11456: [FLINK-16602][k8s] Rework the 
internal/external Service
URL: https://github.com/apache/flink/pull/11456#issuecomment-601501414
 
 
   
   ## CI report:
   
   * e4c3073a6fb379bacb4da0e299245a73ea2a97d4 Travis: 
[PENDING](https://travis-ci.com/github/flink-ci/flink/builds/154194249) Azure: 
[PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=6429)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #11415: [FLINK-15667][k8s] Support to mount custom Hadoop configurations

2020-03-19 Thread GitBox
flinkbot edited a comment on issue #11415: [FLINK-15667][k8s] Support to mount 
custom Hadoop configurations
URL: https://github.com/apache/flink/pull/11415#issuecomment-599458187
 
 
   
   ## CI report:
   
   * 64eaf715bef2bfc9492ef197ecec62d3e067d18d Travis: 
[PENDING](https://travis-ci.com/github/flink-ci/flink/builds/154064206) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=6399)
 
   * abc66a0d2605b380ed890dd23e4ff19c9a65ed6a UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] jinglining commented on a change in pull request #11250: [FLINK-16302][rest]add log list and read log by name for taskmanager

2020-03-19 Thread GitBox
jinglining commented on a change in pull request #11250: [FLINK-16302][rest]add 
log list and read log by name for taskmanager
URL: https://github.com/apache/flink/pull/11250#discussion_r395418363
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerLogsHandlerTest.java
 ##
 @@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.taskmanager;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import 
org.apache.flink.runtime.resourcemanager.exceptions.UnknownTaskExecutorException;
+import 
org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.HandlerRequestException;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.taskmanager.LogInfo;
+import org.apache.flink.runtime.rest.messages.taskmanager.LogsInfo;
+import 
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerIdPathParameter;
+import 
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerLogsHeaders;
+import 
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerMessageParameters;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test for the {@link TaskManagerLogsHandler}.
+ */
+public class TaskManagerLogsHandlerTest extends TestLogger {
+
+   private static final ResourceID EXPECTED_TASK_MANAGER_ID = 
ResourceID.generate();
+
+   @Test
+   public void testGetTaskManagerLogsList() throws Exception {
+   final TestingResourceManagerGateway resourceManagerGateway = 
new TestingResourceManagerGateway();
+   final TaskManagerLogsHandler taskManagerLogsHandler = new 
TaskManagerLogsHandler(
+   () -> CompletableFuture.completedFuture(null),
+   TestingUtils.TIMEOUT(),
+   Collections.emptyMap(),
+   TaskManagerLogsHeaders.getInstance(),
+   () -> 
CompletableFuture.completedFuture(resourceManagerGateway));
+   final HandlerRequest handlerRequest = 
createRequest(EXPECTED_TASK_MANAGER_ID);
+   List logsList = new ArrayList<>();
+   logsList.add(new LogInfo("taskmanager.log", 1024L));
+   logsList.add(new LogInfo("taskmanager.out", 1024L));
+   logsList.add(new LogInfo("taskmanager-2.out", 1024L));
+   
resourceManagerGateway.setRequestTaskManagerLogListFunction(EXPECTED_TASK_MANAGER_ID
 -> CompletableFuture.completedFuture(logsList));
+   LogsInfo logsInfo = 
taskManagerLogsHandler.handleRequest(handlerRequest, 
resourceManagerGateway).get();
+   assertEquals(logsInfo.getLogInfos().size(), 
resourceManagerGateway.requestTaskManagerLogList(EXPECTED_TASK_MANAGER_ID, 
TestingUtils.TIMEOUT()).get().size());
+   }
+
+   @Test
+   public void testGetTaskManagerLogsListForUnknownTaskExecutorException() 
throws Exception {
+   final TestingResourceManagerGateway resourceManagerGateway = 
new TestingResourceManagerGateway();
+   final TaskManagerLogsHandler taskManagerLogsHandler = new 
TaskManagerLogsHandler(
+   () -> CompletableFuture.completedFuture(null),
+   TestingUtils.TIMEOUT(),
+   Collections.emptyMap(),
+   TaskManagerLogsHeaders.getInstance(),
+   () -> 
CompletableFuture.completedFuture(resourceManagerGateway));
+   final HandlerRequest handlerRequest = 
createRequest(EXPECTED_TASK_MANAGER_ID);
+   

[jira] [Commented] (FLINK-16482) Flink Job throw CloseException when call the FlinkKafkaConsumer cancel function

2020-03-19 Thread likang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16482?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17063042#comment-17063042
 ] 

likang commented on FLINK-16482:


Thank you very much

> Flink Job throw CloseException when call the FlinkKafkaConsumer cancel 
> function
> ---
>
> Key: FLINK-16482
> URL: https://issues.apache.org/jira/browse/FLINK-16482
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Reporter: likang
>Priority: Critical
>  Labels: pull-request-available
> Attachments: The bug and solution.docx
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
>             *Background:*
>             Today I tried to monitor my Flink job with a timer thread so 
> that the job exits automatically if it has not read any data for a long 
> time. But when the timer detects the read timeout and calls the cancel 
> function of FlinkKafkaConsumer, a CloseException is thrown, and Flink's 
> recovery mechanism then considers the task to have exited abnormally and 
> restarts it.
>            *Bug*:
>           I checked the cancel code of FlinkKafkaConsumer and found that it 
> first calls the cancel of KafkaFetcher, which calls the close() of Handover 
> and then the shutdown() of KafkaConsumerThread. Finally, the 
> KafkaConsumerThread exits its while loop after checking the running flag 
> and calls Handover's close() once more.
>      There are two problems here: 1. A CloseException is thrown when 
> handover.close() is called in the cancel of KafkaFetcher; that call needs 
> to be removed. 2. When the thread in KafkaConsumerThread exits the loop 
> because running = false, it needs to determine whether the exit was normal; 
> handover.close() should not be called on a normal exit, otherwise a 
> CloseException is thrown as well.
>       The full details and solution are in the attachment.
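
A minimal sketch (assumed class and method names, not the actual Flink code) of the guarded shutdown that the description argues for: close the handover only on an abnormal exit of the consumer loop, so that a normal cancel does not trigger a second close() and the resulting CloseException.

{code:java}
// Hypothetical stand-in for Flink's Handover, just enough to be self-contained.
interface Handover {
    void close();
}

// Sketch of a consumer loop whose shutdown path closes the handover at most once.
final class KafkaConsumerLoopSketch implements Runnable {

    private volatile boolean running = true;
    private final Handover handover;

    KafkaConsumerLoopSketch(Handover handover) {
        this.handover = handover;
    }

    @Override
    public void run() {
        try {
            while (running) {
                // poll records from Kafka and hand them over to the fetcher ...
            }
        } finally {
            if (running) {
                // Abnormal exit (exception): wake up the fetcher side by
                // closing the handover.
                handover.close();
            }
            // Normal exit (running == false): the cancel path has already
            // closed everything, so calling close() again here would throw
            // the CloseException described above.
        }
    }

    // Called by the cancel path; flips the flag so the loop exits normally.
    void shutdown() {
        running = false;
    }
}
{code}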



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot commented on issue #11456: [FLINK-16602][k8s] Rework the internal/external Service

2020-03-19 Thread GitBox
flinkbot commented on issue #11456: [FLINK-16602][k8s] Rework the 
internal/external Service
URL: https://github.com/apache/flink/pull/11456#issuecomment-601501414
 
 
   
   ## CI report:
   
   * e4c3073a6fb379bacb4da0e299245a73ea2a97d4 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #11421: [FLINK-15962][network] Reduce the default chunk size to 4M in netty stack

2020-03-19 Thread GitBox
flinkbot edited a comment on issue #11421: [FLINK-15962][network] Reduce the 
default chunk size to 4M in netty stack
URL: https://github.com/apache/flink/pull/11421#issuecomment-599826019
 
 
   
   ## CI report:
   
   * 394192b70d312009146b91bda454b5297ad3b036 Travis: 
[SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/154189440) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=6427)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhengcanbin commented on issue #11415: [FLINK-15667][k8s] Support to mount custom Hadoop configurations

2020-03-19 Thread GitBox
zhengcanbin commented on issue #11415: [FLINK-15667][k8s] Support to mount 
custom Hadoop configurations
URL: https://github.com/apache/flink/pull/11415#issuecomment-601500881
 
 
   @flinkbot run travis


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhengcanbin commented on issue #11415: [FLINK-15667][k8s] Support to mount custom Hadoop configurations

2020-03-19 Thread GitBox
zhengcanbin commented on issue #11415: [FLINK-15667][k8s] Support to mount 
custom Hadoop configurations
URL: https://github.com/apache/flink/pull/11415#issuecomment-601500915
 
 
   @flinkbot run azure


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #11456: [FLINK-16602][k8s] Rework the internal/external Service

2020-03-19 Thread GitBox
flinkbot commented on issue #11456: [FLINK-16602][k8s] Rework the 
internal/external Service
URL: https://github.com/apache/flink/pull/11456#issuecomment-601500094
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit e4c3073a6fb379bacb4da0e299245a73ea2a97d4 (Fri Mar 20 
02:29:50 UTC 2020)
   
   **Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up 
to date!
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-16602) Rework the Service design for Kubernetes deployment

2020-03-19 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-16602:
---
Labels: pull-request-available  (was: )

> Rework the Service design for Kubernetes deployment
> ---
>
> Key: FLINK-16602
> URL: https://issues.apache.org/jira/browse/FLINK-16602
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / Kubernetes
>Affects Versions: 1.10.0
>Reporter: Canbin Zheng
>Assignee: Canbin Zheng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>
> At the moment we usually create two Services for a Flink 
> application: one is the internal Service and the other is the so-called rest 
> Service. The former forwards requests from the TMs to the JM, and the rest 
> Service mainly serves as an external entry point for the Flink application. 
> Here is a summary of the issues:
>  # The functionality boundary of the two Services is not clear 
> enough, since the internal Service could also become the rest Service when 
> its exposed type is ClusterIP.
>  # For the high availability scenario, we create a useless 
> internal Service which does not help forward the internal requests, since 
> the TMs directly communicate with the JM via the IP or hostname of the JM 
> Pod.
>  # A headless service is enough to forward the internal 
> requests from the TMs to the JM. A Service of ClusterIP type adds 
> corresponding rules to the iptables; too many rules in the iptables lower 
> the kube-proxy's efficiency in refreshing iptables when notified of change 
> events, which could possibly cause severe stability problems in a 
> Kubernetes cluster.
>  
> Therefore, we propose some improvements to the current 
> design:
>  # Clarify the functionality boundary of the two Services: 
> the internal Service only serves the internal communication from TMs to JM, 
> while the rest Service makes the Flink cluster accessible from outside. The 
> internal Service only exposes the RPC and BLOB ports, while the external 
> one exposes the REST port.
>  # Do not create the internal Service in the high availability 
> case.
>  # Use the HEADLESS type for the internal Service.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-16687) PyFlink Cannot determine simple type name "PythonScalarFunction$0"

2020-03-19 Thread Dian Fu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16687?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu updated FLINK-16687:

Priority: Major  (was: Blocker)

> PyFlink Cannot determine simple type name "PythonScalarFunction$0"
> --
>
> Key: FLINK-16687
> URL: https://issues.apache.org/jira/browse/FLINK-16687
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.10.0
>Reporter: mayne wong
>Priority: Major
>
>  
> I tried to run a PyFlink UDF with SQL UNNEST, but the job execution failed. I 
> defined a source from elements and used a UDF to split a string into a list.
> It raises org.codehaus.commons.compiler.CompileException: Cannot determine 
> simple type name "PythonScalarFunction$0"
> {code:python}
> import os
> from pyflink.table.udf import udf
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.table import StreamTableEnvironment, DataTypes, CsvTableSink
> @udf(input_types=[DataTypes.STRING()], 
> result_type=DataTypes.ARRAY(DataTypes.STRING()))
> def format_string_to_array(item):
> return item.replace('[', '').replace(']', '').replace(', ', 
> ',').split(',')
> if __name__ == '__main__':
> env = StreamExecutionEnvironment.get_execution_environment()
> env.set_parallelism(1)
> st_env = StreamTableEnvironment.create(env)
> result_file = "result.csv"
> if os.path.exists(result_file):
> os.remove(result_file)
> st_env.register_table_sink("result_tab",  CsvTableSink(["id", "url"], 
> [DataTypes.STRING(), DataTypes.STRING()],  result_file))
> st_env.register_function("format_string_to_array", format_string_to_array)
> tab = st_env.from_elements([("1", "['www.bing.com', 'www.google.com']"), 
> ("2", "['www.taobao.com']")], ['id', 'urls'])
> st_env.register_table("temp_table", tab)
> st_env.sql_query("Select id, A.url from temp_table, 
> UNNEST(format_string_to_array(temp_table.urls)) AS 
> A(url)").insert_into("result_tab")
> st_env.execute("udf")
> {code}
>  
> When I execute the program, I get the following exception:
>  
> {code:java}
> py4j.protocol.Py4JJavaError: An error occurred while calling o2.execute.
> : java.util.concurrent.ExecutionException: 
> org.apache.flink.client.program.ProgramInvocationException: Job failed 
> (JobID: 5d63838ad2043bf4a5d0bca83623959d)
> at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1640)
> at 
> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:74)
> at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
> at 
> org.apache.flink.table.executor.StreamExecutor.execute(StreamExecutor.java:50)
> at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:643)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> at 
> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> at 
> org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
> at 
> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> at 
> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
> at 
> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.client.program.ProgramInvocationException: Job 
> failed (JobID: 5d63838ad2043bf4a5d0bca83623959d)
> at 
> org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112)
> at 
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
> at 
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
> at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
> at 
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
> at 
> 

[GitHub] [flink] zhengcanbin opened a new pull request #11456: [FLINK-16602][k8s] Rework the internal/external Service

2020-03-19 Thread GitBox
zhengcanbin opened a new pull request #11456: [FLINK-16602][k8s] Rework the 
internal/external Service
URL: https://github.com/apache/flink/pull/11456
 
 
   ## What is the purpose of the change
   
    At the moment we usually create two Services for a Flink application: one is 
the internal Service and the other is the so-called rest Service. The former 
forwards requests from the TMs to the JM, and the rest Service mainly serves 
as an external entry point for the Flink application. Here is a summary of 
the issues:
   
    1. The functionality boundary of the two Services is not clear enough, since 
the internal Service could also become the rest Service when its exposed type 
is ClusterIP.
    2. For the high availability scenario, we create a useless internal Service 
which does not help forward the internal requests, since the TMs directly 
communicate with the JM via the IP or hostname of the JM Pod.
    3. A headless service is enough to forward the internal requests from the 
TMs to the JM. A Service of ClusterIP type adds corresponding rules to the 
iptables; too many rules in the iptables lower the kube-proxy's efficiency in 
refreshing iptables when notified of change events, which could possibly cause 
severe stability problems in a Kubernetes cluster.
   
   Therefore, we propose some improvements to the current design:
   
    1. Clarify the functionality boundary of the two Services: the internal 
Service only serves the internal communication from TMs to JM, while the rest 
Service makes the Flink cluster accessible from outside. The internal Service 
only exposes the RPC and BLOB ports, while the external one exposes the REST 
port.
    2. Do not create the internal Service in the high availability case.
    3. Use the HEADLESS type for the internal Service (see the sketch below).
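
    A minimal sketch of what the proposed headless internal Service could look 
like with the fabric8 builder API used by the Kubernetes module (the cluster 
id, the label selector, and the port numbers below are illustrative 
assumptions, not the exact values of this PR):

   ```java
   import io.fabric8.kubernetes.api.model.Service;
   import io.fabric8.kubernetes.api.model.ServiceBuilder;

   import java.util.Collections;

   class HeadlessInternalServiceSketch {
       static Service buildInternalService(String clusterId) {
           return new ServiceBuilder()
               .withNewMetadata()
                   .withName(clusterId)
               .endMetadata()
               .withNewSpec()
                   // HEADLESS: no ClusterIP is allocated, so no iptables
                   // rules are added for this Service.
                   .withClusterIP("None")
                   // Assumed label selector for the JobManager pod.
                   .withSelector(Collections.singletonMap("app", clusterId))
                   // Only the internal ports (RPC and BLOB); the REST port
                   // is exposed by the separate rest Service.
                   .addNewPort().withName("jobmanager-rpc").withPort(6123).endPort()
                   .addNewPort().withName("blobserver").withPort(6124).endPort()
               .endSpec()
               .build();
       }
   }
   ```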
   
   
   ## Verifying this change
   
   This change added unit tests and can be verified on a real K8s cluster as 
follows:
   
    1. Deploy a non-HA session cluster and check that two dedicated Services are 
created, and that the internal one has CLUSTER-IP None.
   2. Deploy an HA session cluster and check only the rest Service is created.
   3. Check that the internal Service only exposes the RPC and BLOB port while 
the external one exposes the REST port.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (yes / **no** / 
don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #11336: [FLINK-16471][jdbc] develop PostgresCatalog

2020-03-19 Thread GitBox
JingsongLi commented on a change in pull request #11336: [FLINK-16471][jdbc] 
develop PostgresCatalog
URL: https://github.com/apache/flink/pull/11336#discussion_r395413626
 
 

 ##
 File path: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/JDBCCatalog.java
 ##
 @@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialect;
+import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialects;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ *
+ */
+@PublicEvolving
+public class JDBCCatalog extends AbstractJDBCCatalog {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(JDBCCatalog.class);
+
+   private final Catalog internal;
+
+   public JDBCCatalog(String catalogName, String defaultDatabase, String 
username, String pwd, String baseUrl) {
+   super(catalogName, defaultDatabase, username, pwd, baseUrl);
+
+   JDBCDialect dialect = JDBCDialects.get(baseUrl).get();
+
+   if (dialect instanceof JDBCDialects.PostgresDialect) {
+   internal = new PostgresCatalog(catalogName, 
defaultDatabase, username, pwd, baseUrl);
 
 Review comment:
   I know this implementation is in the FLIP.
   But this if-else still looks weird to me. Can we add this capability to 
JDBCDialect instead, e.g. a `createCatalog(...)` method on JDBCDialect? 
Otherwise we need to update JDBCCatalog every time a new dialect is added.
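
   A rough sketch of what I mean (the signature is just an illustration, not 
an existing API):

   ```java
   import org.apache.flink.table.catalog.Catalog;

   public interface JDBCDialect {
       // ... existing dialect methods ...

       /**
        * Creates the dialect-specific catalog, so that JDBCCatalog can simply
        * delegate here instead of growing an if-else branch per dialect.
        */
       Catalog createCatalog(
           String catalogName,
           String defaultDatabase,
           String username,
           String pwd,
           String baseUrl);
   }
   ```

   Then the JDBCCatalog constructor could simply call 
`internal = dialect.createCatalog(catalogName, defaultDatabase, username, pwd, baseUrl)` 
and would no longer need to know the concrete dialect types.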


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #11421: [FLINK-15962][network] Reduce the default chunk size to 4M in netty stack

2020-03-19 Thread GitBox
flinkbot edited a comment on issue #11421: [FLINK-15962][network] Reduce the 
default chunk size to 4M in netty stack
URL: https://github.com/apache/flink/pull/11421#issuecomment-599826019
 
 
   
   ## CI report:
   
   * 394192b70d312009146b91bda454b5297ad3b036 Travis: 
[PENDING](https://travis-ci.com/github/flink-ci/flink/builds/154189440) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=6427)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #11336: [FLINK-16471][jdbc] develop PostgresCatalog

2020-03-19 Thread GitBox
JingsongLi commented on a change in pull request #11336: [FLINK-16471][jdbc] 
develop PostgresCatalog
URL: https://github.com/apache/flink/pull/11336#discussion_r395412510
 
 

 ##
 File path: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/JDBCCatalog.java
 ##
 @@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialect;
+import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialects;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ *
 
 Review comment:
   The Javadoc comment is empty here; should it describe the class?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-16687) PyFlink Cannot determine simple type name "PythonScalarFunction$0"

2020-03-19 Thread mayne wong (Jira)
mayne wong created FLINK-16687:
--

 Summary: PyFlink Cannot determine simple type name 
"PythonScalarFunction$0"
 Key: FLINK-16687
 URL: https://issues.apache.org/jira/browse/FLINK-16687
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Affects Versions: 1.10.0
Reporter: mayne wong


 
I tried to run a PyFlink UDF with SQL UNNEST, but the job execution failed. I 
defined a source from elements and used a UDF to split a string into a list.
It raises org.codehaus.commons.compiler.CompileException: Cannot determine 
simple type name "PythonScalarFunction$0"
{code:python}
import os
from pyflink.table.udf import udf
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes, CsvTableSink

@udf(input_types=[DataTypes.STRING()], 
result_type=DataTypes.ARRAY(DataTypes.STRING()))
def format_string_to_array(item):
return item.replace('[', '').replace(']', '').replace(', ', ',').split(',')

if __name__ == '__main__':
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
st_env = StreamTableEnvironment.create(env)
result_file = "result.csv"
if os.path.exists(result_file):
os.remove(result_file)

st_env.register_table_sink("result_tab",  CsvTableSink(["id", "url"], 
[DataTypes.STRING(), DataTypes.STRING()],  result_file))
st_env.register_function("format_string_to_array", format_string_to_array)
tab = st_env.from_elements([("1", "['www.bing.com', 'www.google.com']"), 
("2", "['www.taobao.com']")], ['id', 'urls'])
st_env.register_table("temp_table", tab)
st_env.sql_query("Select id, A.url from temp_table, 
UNNEST(format_string_to_array(temp_table.urls)) AS 
A(url)").insert_into("result_tab")
st_env.execute("udf")

{code}
 

When I execute the program, I get the following exception:

 
{code:java}
py4j.protocol.Py4JJavaError: An error occurred while calling o2.execute.
: java.util.concurrent.ExecutionException: 
org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 
5d63838ad2043bf4a5d0bca83623959d)
at 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1640)
at 
org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:74)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
at 
org.apache.flink.table.executor.StreamExecutor.execute(StreamExecutor.java:50)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:643)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at 
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at 
org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at 
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at 
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at 
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job 
failed (JobID: 5d63838ad2043bf4a5d0bca83623959d)
at 
org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112)
at 
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
at 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at 
org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:874)
at akka.dispatch.OnComplete.internal(Future.scala:264)
at akka.dispatch.OnComplete.internal(Future.scala:261)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
at 

[GitHub] [flink] JingsongLi commented on a change in pull request #11336: [FLINK-16471][jdbc] develop PostgresCatalog

2020-03-19 Thread GitBox
JingsongLi commented on a change in pull request #11336: [FLINK-16471][jdbc] 
develop PostgresCatalog
URL: https://github.com/apache/flink/pull/11336#discussion_r395412120
 
 

 ##
 File path: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/AbstractJDBCCatalog.java
 ##
 @@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc.catalog;
+
+import org.apache.flink.api.java.io.jdbc.JDBCTableSourceSinkFactory;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.AbstractCatalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import 
org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.factories.TableFactory;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Abstract catalog for any JDBC catalogs.
+ */
+public abstract class AbstractJDBCCatalog extends AbstractCatalog {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(AbstractJDBCCatalog.class);
+
+   protected final String username;
+   protected final String pwd;
+   protected final String baseUrl;
+   protected final String defaultUrl;
+
+   public AbstractJDBCCatalog(String catalogName, String defaultDatabase, 
String username, String pwd, String baseUrl) {
+   super(catalogName, defaultDatabase);
+
+   checkArgument(!StringUtils.isNullOrWhitespaceOnly(username));
+   checkArgument(!StringUtils.isNullOrWhitespaceOnly(pwd));
+   checkArgument(!StringUtils.isNullOrWhitespaceOnly(baseUrl));
+
+   JDBCCatalogUtils.validateJDBCUrl(baseUrl);
+
+   this.username = username;
+   this.pwd = pwd;
+   this.baseUrl = baseUrl.endsWith("/") ? baseUrl : baseUrl + "/";
+   this.defaultUrl = baseUrl + defaultDatabase;
+   }
+
+   @Override
+   public void open() throws CatalogException {
+   // test connection, fail early if we cannot connect to database
+   try (Connection conn = DriverManager.getConnection(defaultUrl, 
username, pwd)) {
+   } catch (SQLException e) {
+   throw new ValidationException(
+   String.format("Failed connecting to %s via 
JDBC.", defaultUrl), e);
+   }
+
+   LOG.info("Catalog {} established connection to {}", getName(), 
defaultUrl);

[GitHub] [flink] JingsongLi commented on issue #11391: [FLINK-16098] [chinese-translation, Documentation] Translate "Overview" page of "Hive Integration" into Chinese

2020-03-19 Thread GitBox
JingsongLi commented on issue #11391: [FLINK-16098] [chinese-translation, 
Documentation] Translate "Overview" page of "Hive Integration" into Chinese
URL: https://github.com/apache/flink/pull/11391#issuecomment-601496994
 
 
   Hi @liuzhixing1006, you cannot use merge here; you should use rebase only.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink-statefun] sjwiesman commented on issue #62: [FLINK-16685] Add a k8s example for the Python SDK

2020-03-19 Thread GitBox
sjwiesman commented on issue #62: [FLINK-16685] Add a k8s example for the 
Python SDK
URL: https://github.com/apache/flink-statefun/pull/62#issuecomment-601494204
 
 
   Maybe off topic, but should we add k8s deployment docs?




[jira] [Updated] (FLINK-16557) Document YAML-ized Kafka egresses / ingresses in Stateful Functions documentation

2020-03-19 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16557?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-16557:
---
Labels: pull-request-available  (was: )

> Document YAML-ized Kafka egresses / ingresses in Stateful Functions 
> documentation
> -
>
> Key: FLINK-16557
> URL: https://issues.apache.org/jira/browse/FLINK-16557
> Project: Flink
>  Issue Type: Task
>  Components: Stateful Functions
>Affects Versions: statefun-1.1
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Seth Wiesman
>Priority: Major
>  Labels: pull-request-available
>
> The Stateful Functions documentation is still missing information about 
> YAML-ized egresses / ingresses.





[GitHub] [flink-statefun] sjwiesman opened a new pull request #63: [FLINK-16557][docs] Document YAML-ized Kafka egresses / ingresses in …

2020-03-19 Thread GitBox
sjwiesman opened a new pull request #63: [FLINK-16557][docs] Document YAML-ized 
Kafka egresses / ingresses in …
URL: https://github.com/apache/flink-statefun/pull/63
 
 
   …Stateful Functions documentation




[jira] [Closed] (FLINK-16650) Support LocalZonedTimestampType for Python UDF in blink planner

2020-03-19 Thread Hequn Cheng (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hequn Cheng closed FLINK-16650.
---
Resolution: Resolved

> Support LocalZonedTimestampType for Python UDF in blink planner
> ---
>
> Key: FLINK-16650
> URL: https://issues.apache.org/jira/browse/FLINK-16650
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Python
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>






[jira] [Commented] (FLINK-16650) Support LocalZonedTimestampType for Python UDF in blink planner

2020-03-19 Thread Hequn Cheng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17063034#comment-17063034
 ] 

Hequn Cheng commented on FLINK-16650:
-

Resolved in 1.11.0 via 5ccb16724769becab0003e0299d9c4a63cd52378

> Support LocalZonedTimestampType for Python UDF in blink planner
> ---
>
> Key: FLINK-16650
> URL: https://issues.apache.org/jira/browse/FLINK-16650
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Python
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>






[GitHub] [flink] hequn8128 merged pull request #11439: [FLINK-16650][python] Support LocalZonedTimestampType for Python UDF in blink planner

2020-03-19 Thread GitBox
hequn8128 merged pull request #11439: [FLINK-16650][python] Support 
LocalZonedTimestampType for Python UDF in blink planner
URL: https://github.com/apache/flink/pull/11439
 
 
   




[jira] [Comment Edited] (FLINK-16637) Flink per job mode terminates before serving job cancellation result

2020-03-19 Thread Zili Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17063025#comment-17063025
 ] 

Zili Chen edited comment on FLINK-16637 at 3/20/20, 1:31 AM:
-

[~gjy] after "cancel" command received by the {{JobMaster}}, it causes

1. The job asynchronously cancelled, and later causes cluster shutdown.
2. The response asynchronously sent

I don't find an explicit synchronization between 1 and 2, so is the statement.

But generally 1 takes some time before later it causes the cluster shutdown, 
and thus 2 can nearly always happens before the cluster shutdown.

Possibly we flush outstanding response in rest server before it gets closed.
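
To illustrate, an explicit ordering could look roughly like the sketch below; all names are hypothetical placeholders, not actual Flink APIs:

    import java.util.concurrent.CompletableFuture;

    public class ShutdownOrderingSketch {

        // stands in for the asynchronous job cancellation (1)
        static CompletableFuture<Void> cancelJob() {
            return CompletableFuture.runAsync(() -> System.out.println("job cancelled"));
        }

        // stands in for flushing the REST response to the client (2)
        static CompletableFuture<Void> sendResponse() {
            return CompletableFuture.runAsync(() -> System.out.println("response sent"));
        }

        public static void main(String[] args) {
            // the cluster shutdown only runs after BOTH (1) and (2) have completed
            CompletableFuture.allOf(cancelJob(), sendResponse())
                .thenRun(() -> System.out.println("cluster shutdown"))
                .join();
        }
    }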


was (Author: tison):
[~gjy] after "cancel" command received by the {{JobMaster}}, it causes

1. The job asynchronously cancelled
2. The response asynchronously sent

I don't find an explicit synchronization between 1 and 2, so is the statement.

But generally 1 takes some time before later it causes the cluster shutdown, 
and thus 2 can nearly always happens before the cluster shutdown.

> Flink per job mode terminates before serving job cancellation result
> 
>
> Key: FLINK-16637
> URL: https://issues.apache.org/jira/browse/FLINK-16637
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / YARN
>Affects Versions: 1.10.0
>Reporter: Zili Chen
>Priority: Major
> Fix For: 1.10.1, 1.11.0
>
> Attachments: yarn.log
>
>
> The {{MiniDispatcher}} no longer waits until the REST handler has served the 
> cancellation result before shutting down. This behaviour seems to be 
> introduced with FLINK-15116.
> See also 
> [https://lists.apache.org/x/thread.html/rcadbd6ceede422bac8d4483fd0cdae58659fbff78533a399eb136743@%3Cdev.flink.apache.org%3E]





[GitHub] [flink] flinkbot edited a comment on issue #11421: [FLINK-15962][network] Reduce the default chunk size to 4M in netty stack

2020-03-19 Thread GitBox
flinkbot edited a comment on issue #11421: [FLINK-15962][network] Reduce the 
default chunk size to 4M in netty stack
URL: https://github.com/apache/flink/pull/11421#issuecomment-599826019
 
 
   
   ## CI report:
   
   * dd9c079d6024bcae4216bdc3004660018e41f938 Travis: 
[FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/153624584) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=6315)
 
   * 394192b70d312009146b91bda454b5297ad3b036 Travis: 
[PENDING](https://travis-ci.com/github/flink-ci/flink/builds/154189440) Azure: 
[PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=6427)
 
   
   


[GitHub] [flink] dianfu commented on issue #11439: [FLINK-16650][python] Support LocalZonedTimestampType for Python UDF in blink planner

2020-03-19 Thread GitBox
dianfu commented on issue #11439: [FLINK-16650][python] Support 
LocalZonedTimestampType for Python UDF in blink planner
URL: https://github.com/apache/flink/pull/11439#issuecomment-601487959
 
 
   The failed azure test is a known issue and has already been tracked in  
https://issues.apache.org/jira/browse/FLINK-16676




[jira] [Commented] (FLINK-15852) Job is submitted to the wrong session cluster

2020-03-19 Thread Canbin Zheng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17063021#comment-17063021
 ] 

Canbin Zheng commented on FLINK-15852:
--

Thanks for the fixup [~kkl0u]!

> Job is submitted to the wrong session cluster
> -
>
> Key: FLINK-15852
> URL: https://issues.apache.org/jira/browse/FLINK-15852
> Project: Flink
>  Issue Type: Bug
>  Components: Command Line Client
>Affects Versions: 1.10.0
>Reporter: Canbin Zheng
>Assignee: Kostas Kloudas
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.10.1, 1.11.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Steps to reproduce the problem:
>  # Deploy a YARN session cluster by command {{./bin/yarn-session.sh -d}}
>  # Deploy a Kubernetes session cluster by command 
> {{./bin/kubernetes-session.sh -Dkubernetes.cluster-id=test ...}}
>  # Try to submit a Job to the Kubernetes session cluster by command 
> {{./bin/flink run -d -e kubernetes-session -Dkubernetes.cluster-id=test 
> examples/streaming/WordCount.jar}}
> It's expected that the Job will be submitted to the Kubernetes session 
> cluster whose cluster-id is *test*, however, the job was submitted to the 
> YARN session cluster.
>  





[GitHub] [flink] flinkbot edited a comment on issue #11421: [FLINK-15962][network] Reduce the default chunk size to 4M in netty stack

2020-03-19 Thread GitBox
flinkbot edited a comment on issue #11421: [FLINK-15962][network] Reduce the 
default chunk size to 4M in netty stack
URL: https://github.com/apache/flink/pull/11421#issuecomment-599826019
 
 
   
   ## CI report:
   
   * dd9c079d6024bcae4216bdc3004660018e41f938 Travis: 
[FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/153624584) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=6315)
 
   * 394192b70d312009146b91bda454b5297ad3b036 UNKNOWN
   
   


[GitHub] [flink] liuzhixing1006 edited a comment on issue #11391: [FLINK-16098] [chinese-translation, Documentation] Translate "Overview" page of "Hive Integration" into Chinese

2020-03-19 Thread GitBox
liuzhixing1006 edited a comment on issue #11391: [FLINK-16098] 
[chinese-translation, Documentation] Translate "Overview" page of "Hive 
Integration" into Chinese
URL: https://github.com/apache/flink/pull/11391#issuecomment-601200160
 
 
   @lirui-apache @JingsongLi  Thanks for your advice, I have addressed the issues.
   But I had a problem updating the branch; I ran the following commands in order:
   
   1.git add -A
   2.git commit -am [FLINK-16098] [chinese-translation, Documentation] 
Translate "Overview" page of "Hive Integration" into Chinese
   3.git fetch upstream master
   4.git merge upstream/master
   5.git rebase -i master
   6.git push origin docFix
   
   Then in the pull request there are a lot of unrelated commits.
   Is there something wrong with my workflow? (A rebase-only variant is sketched below.)
   Do I need to close the current pull request and create a new one?
   Thanks~
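
   For reference, a rebase-only variant of the same steps (same branch names as above; this is a sketch, adjust to your remotes) might look like:

   1. git fetch upstream
   2. git rebase upstream/master
   3. git push --force-with-lease origin docFix

   The force push is needed because rebase rewrites the branch history; --force-with-lease refuses to overwrite commits you have not fetched yet.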
   




[GitHub] [flink] flinkbot edited a comment on issue #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-19 Thread GitBox
flinkbot edited a comment on issue #11195: [FLINK-16222][runtime] Use plugins 
mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#issuecomment-590065588
 
 
   
   ## CI report:
   
   * 738a6fd162d5ba99432438d224f3b4890961af56 UNKNOWN
   * 77f24b292a04fde34f4c645151484cf86a13832c UNKNOWN
   * 5dfb306ba6ff07f700b1afc21847641321705d4d UNKNOWN
   * 5b0f219689e81fe11c3511c13c7f4943a997a4ef UNKNOWN
   * 7f6ad172474e37bbf346a7762868c9114510c4c9 UNKNOWN
   * 2590a69417bdd2b6bf2f4765b276a051040a97cf UNKNOWN
   * d07ec693eb90e33071e710be0b774fc995f05867 Travis: 
[CANCELED](https://travis-ci.com/github/flink-ci/flink/builds/154179607) 
   * a1b15671c97e8809c0301fb4c225fdc1a7f78545 UNKNOWN
   
   


[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-19 Thread GitBox
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] 
Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r395376334
 
 

 ##
 File path: flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java
 ##
 @@ -260,16 +262,26 @@ public void submitSQLJob(SQLJobSubmission job) throws IOException {
 	}
 
 	public void copyOptJarsToLib(String jarNamePrefix) throws FileNotFoundException, IOException {
-		final Optional<Path> reporterJarOptional;
-		try (Stream<Path> logFiles = Files.walk(opt)) {
-			reporterJarOptional = logFiles
+		copyOptJars(jarNamePrefix, lib);
+	}
+
+	public void copyOptJarsToPlugins(String jarNamePrefix) throws FileNotFoundException, IOException {
 
 Review comment:
   I assume you mean to modify `FlinkDistribution#mapJarLocationToPath` ?




[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-19 Thread GitBox
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] 
Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r395375550
 
 

 ##
 File path: flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java
 ##
 @@ -260,16 +262,26 @@ public void submitSQLJob(SQLJobSubmission job) throws IOException {
 	}
 
 	public void copyOptJarsToLib(String jarNamePrefix) throws FileNotFoundException, IOException {
-		final Optional<Path> reporterJarOptional;
-		try (Stream<Path> logFiles = Files.walk(opt)) {
-			reporterJarOptional = logFiles
+		copyOptJars(jarNamePrefix, lib);
+	}
+
+	public void copyOptJarsToPlugins(String jarNamePrefix) throws FileNotFoundException, IOException {
 
 Review comment:
   > and modify FlinkDistribution#moveJar to handle this location appropriately.
   
   could you clarify? 




[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-19 Thread GitBox
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] 
Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r395375423
 
 

 ##
 File path: flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java
 ##
 @@ -260,16 +262,26 @@ public void submitSQLJob(SQLJobSubmission job) throws IOException {
 	}
 
 	public void copyOptJarsToLib(String jarNamePrefix) throws FileNotFoundException, IOException {
-		final Optional<Path> reporterJarOptional;
-		try (Stream<Path> logFiles = Files.walk(opt)) {
-			reporterJarOptional = logFiles
+		copyOptJars(jarNamePrefix, lib);
+	}
+
+	public void copyOptJarsToPlugins(String jarNamePrefix) throws FileNotFoundException, IOException {
 
 Review comment:
   But this is not really "copying jars", right? It will actually move the file from opt to lib or plugins. The problem is that one of the cases I would like to test requires the jar to actually be copied to both locations; a copy-based helper is sketched below.
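
   A copy-based helper under those assumptions could look roughly like this; opt, lib and plugins stand for the distribution directories, and all names are illustrative:

       import java.io.IOException;
       import java.nio.file.Files;
       import java.nio.file.Path;
       import java.nio.file.StandardCopyOption;
       import java.util.stream.Stream;

       public class CopyJarsSketch {
           // copies (rather than moves) the first jar matching the prefix into every destination
           static void copyOptJar(Path opt, String jarNamePrefix, Path... destinations) throws IOException {
               try (Stream<Path> files = Files.walk(opt)) {
                   Path jar = files
                       .filter(p -> p.getFileName().toString().startsWith(jarNamePrefix))
                       .findFirst()
                       .orElseThrow(() -> new IllegalArgumentException("no jar with prefix " + jarNamePrefix));
                   for (Path dest : destinations) {
                       Files.copy(jar, dest.resolve(jar.getFileName()), StandardCopyOption.REPLACE_EXISTING);
                   }
               }
           }
       }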




[GitHub] [flink] flinkbot edited a comment on issue #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-19 Thread GitBox
flinkbot edited a comment on issue #11195: [FLINK-16222][runtime] Use plugins 
mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#issuecomment-590065588
 
 
   
   ## CI report:
   
   * 738a6fd162d5ba99432438d224f3b4890961af56 UNKNOWN
   * 77f24b292a04fde34f4c645151484cf86a13832c UNKNOWN
   * 5dfb306ba6ff07f700b1afc21847641321705d4d UNKNOWN
   * 5b0f219689e81fe11c3511c13c7f4943a997a4ef UNKNOWN
   * 7ce9ad87a780aab19b9c97680d7af5f648c13f3b Travis: 
[FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/154170382) 
   * 7f6ad172474e37bbf346a7762868c9114510c4c9 UNKNOWN
   * 2590a69417bdd2b6bf2f4765b276a051040a97cf UNKNOWN
   * d07ec693eb90e33071e710be0b774fc995f05867 Travis: 
[PENDING](https://travis-ci.com/github/flink-ci/flink/builds/154179607) 
   * a1b15671c97e8809c0301fb4c225fdc1a7f78545 UNKNOWN
   
   


[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-19 Thread GitBox
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] 
Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r395348055
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
 ##
 @@ -179,28 +164,82 @@ private static ReporterSetup createReporterSetup(String reporterName, MetricConf
 			metricReporterOptional.ifPresent(reporter -> {
 				MetricConfig metricConfig = new MetricConfig();
 				reporterConfig.addAllToProperties(metricConfig);
-
-				reporterArguments.add(createReporterSetup(reporterName, metricConfig, reporter));
+				reporterSetups.add(createReporterSetup(reporterName, metricConfig, reporter));
 			});
-		}
-		catch (Throwable t) {
+		} catch (Throwable t) {
 			LOG.error("Could not instantiate metrics reporter {}. Metrics might not be exposed/reported.", reporterName, t);
 		}
 	}
-	return reporterArguments;
+	return reporterSetups;
 }
 
-private static Map<String, MetricReporterFactory> loadReporterFactories() {
-	final ServiceLoader<MetricReporterFactory> serviceLoader = ServiceLoader.load(MetricReporterFactory.class);
+private static List<Tuple2<String, Configuration>> loadReporterConfigurations(Configuration configuration, Set<String> namedReporters) {
+	final List<Tuple2<String, Configuration>> reporterConfigurations = new ArrayList<>(namedReporters.size());
 
+	for (String namedReporter: namedReporters) {
+		DelegatingConfiguration delegatingConfiguration = new DelegatingConfiguration(
+			configuration,
+			ConfigConstants.METRICS_REPORTER_PREFIX + namedReporter + '.');
+
+		reporterConfigurations.add(Tuple2.of(namedReporter, delegatingConfiguration));
+	}
+	return reporterConfigurations;
+}
+
+private static Set<String> findEnabledReportersInConfiguration(Configuration configuration, String includedReportersString) {
+	Set<String> includedReporters = reporterListPattern.splitAsStream(includedReportersString)
+		.filter(r -> !r.isEmpty()) // splitting an empty string results in an empty string on jdk9+
+		.collect(Collectors.toSet());
 
 Review comment:
   @AHeise I have applied this refactoring but then realized that I probably did not get what you actually proposed. Could you please clarify?




[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-19 Thread GitBox
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] 
Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r395368073
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
 ##
 @@ -178,36 +164,96 @@ private static ReporterSetup createReporterSetup(String reporterName, MetricConf
 			metricReporterOptional.ifPresent(reporter -> {
 				MetricConfig metricConfig = new MetricConfig();
 				reporterConfig.addAllToProperties(metricConfig);
-
-				reporterArguments.add(createReporterSetup(reporterName, metricConfig, reporter));
+				reporterSetups.add(createReporterSetup(reporterName, metricConfig, reporter));
 			});
-		}
-		catch (Throwable t) {
+		} catch (Throwable t) {
 			LOG.error("Could not instantiate metrics reporter {}. Metrics might not be exposed/reported.", reporterName, t);
 		}
 	}
-	return reporterArguments;
+	return reporterSetups;
 }
 
-private static Map<String, MetricReporterFactory> loadReporterFactories() {
-	final ServiceLoader<MetricReporterFactory> serviceLoader = ServiceLoader.load(MetricReporterFactory.class);
+private static List<Tuple2<String, Configuration>> loadReporterConfigurations(Configuration configuration, Set<String> namedReporters) {
+	final List<Tuple2<String, Configuration>> reporterConfigurations = new ArrayList<>(namedReporters.size());
+
+	for (String namedReporter: namedReporters) {
+		DelegatingConfiguration delegatingConfiguration = new DelegatingConfiguration(
+			configuration,
+			ConfigConstants.METRICS_REPORTER_PREFIX + namedReporter + '.');
 
+		reporterConfigurations.add(Tuple2.of(namedReporter, delegatingConfiguration));
+	}
+	return reporterConfigurations;
+}
+
+private static Set<String> findEnabledReportersInConfiguration(Configuration configuration, String includedReportersString) {
+	Set<String> includedReporters = reporterListPattern.splitAsStream(includedReportersString)
+		.filter(r -> !r.isEmpty()) // splitting an empty string results in an empty string on jdk9+
+		.collect(Collectors.toSet());
+
+	// use a TreeSet to make the reporter order deterministic, which is useful for testing
+	Set<String> namedOrderedReporters = new TreeSet<>(String::compareTo);
+
+	// scan entire configuration for keys starting with METRICS_REPORTER_PREFIX and determine the set of enabled reporters
+	for (String key : configuration.keySet()) {
+		if (key.startsWith(ConfigConstants.METRICS_REPORTER_PREFIX)) {
+			Matcher matcher = reporterClassPattern.matcher(key);
+			if (matcher.matches()) {
+				String reporterName = matcher.group(1);
+				if (includedReporters.isEmpty() || includedReporters.contains(reporterName)) {
+					if (namedOrderedReporters.contains(reporterName)) {
+						LOG.warn("Duplicate class configuration detected for reporter {}.", reporterName);
+					} else {
+						namedOrderedReporters.add(reporterName);
+					}
+				} else {
+					LOG.info("Excluding reporter {}, not configured in reporter list ({}).", reporterName, includedReportersString);
+				}
+			}
+		}
+	}
+	return namedOrderedReporters;
+}
+
+private static Map<String, MetricReporterFactory> loadAvailableReporterFactories(PluginManager pluginManager) {
 	final Map<String, MetricReporterFactory> reporterFactories = new HashMap<>(2);
-	final Iterator<MetricReporterFactory> factoryIterator = serviceLoader.iterator();
+	final Iterator<MetricReporterFactory> factoryIterator = getAllReporterFactories(pluginManager);
+	LOG.debug("All available factories (from both SPIs and Plugins):");
+	getAllReporterFactories(pluginManager).forEachRemaining(i -> LOG.debug(i.toString()));
 	// do not use streams or for-each loops here because they do not allow catching individual ServiceConfigurationErrors
 	// such an error might be caused if the META-INF/services
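
As a standalone illustration of the key-scanning idea in findEnabledReportersInConfiguration, the following sketch extracts reporter names from "metrics.reporter.<name>.class"-style keys; the regex here is illustrative, not the exact reporterClassPattern:

    import java.util.Set;
    import java.util.TreeSet;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class ReporterKeyScanSketch {
        private static final Pattern CLASS_KEY =
            Pattern.compile("metrics\\.reporter\\.([^.]+)\\.(class|factory\\.class)");

        public static void main(String[] args) {
            Set<String> reporters = new TreeSet<>(); // deterministic order, as in the PR
            String[] keys = {
                "metrics.reporter.prom.class",
                "metrics.reporter.jmx.factory.class",
                "metrics.scope.jm" // unrelated key, ignored
            };
            for (String key : keys) {
                Matcher m = CLASS_KEY.matcher(key);
                if (m.matches()) {
                    reporters.add(m.group(1));
                }
            }
            System.out.println(reporters); // [jmx, prom]
        }
    }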

[GitHub] [flink] flinkbot edited a comment on issue #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-19 Thread GitBox
flinkbot edited a comment on issue #11195: [FLINK-16222][runtime] Use plugins 
mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#issuecomment-590065588
 
 
   
   ## CI report:
   
   * 738a6fd162d5ba99432438d224f3b4890961af56 UNKNOWN
   * 77f24b292a04fde34f4c645151484cf86a13832c UNKNOWN
   * 5dfb306ba6ff07f700b1afc21847641321705d4d UNKNOWN
   * 5b0f219689e81fe11c3511c13c7f4943a997a4ef UNKNOWN
   * 7ce9ad87a780aab19b9c97680d7af5f648c13f3b Travis: 
[FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/154170382) 
   * 7f6ad172474e37bbf346a7762868c9114510c4c9 UNKNOWN
   * 2590a69417bdd2b6bf2f4765b276a051040a97cf UNKNOWN
   * d07ec693eb90e33071e710be0b774fc995f05867 UNKNOWN
   
   


[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-19 Thread GitBox
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] 
Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r395366353
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
 ##
 @@ -160,13 +161,14 @@ public void startCluster() throws ClusterEntrypointException {
 		LOG.info("Starting {}.", getClass().getSimpleName());
 
 		try {
-
-			configureFileSystems(configuration);
+			//TODO: push down filesystem initialization into runCluster - initializeServices (?)
 
 Review comment:
   If it is something non-trivial and hard to make a call about, I would 
propose to skip this refactoring for now.




[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-19 Thread GitBox
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] 
Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r395365114
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
 ##
 @@ -210,17 +213,31 @@ private static ReporterSetup createReporterSetup(String reporterName, MetricConf
 		return namedOrderedReporters;
 	}
 
-	private static Map<String, MetricReporterFactory> loadReporterFactories() {
-		final ServiceLoader<MetricReporterFactory> serviceLoader = ServiceLoader.load(MetricReporterFactory.class);
-
+	private static Map<String, MetricReporterFactory> loadAvailableReporterFactories(PluginManager pluginManager) {
 		final Map<String, MetricReporterFactory> reporterFactories = new HashMap<>(2);
-		final Iterator<MetricReporterFactory> factoryIterator = serviceLoader.iterator();
+		final Iterator<MetricReporterFactory> factoryIterator = getAllReporterFactories(pluginManager);
+		LOG.info("Prepare reporter factories (from both SPIs and Plugins):");
 		// do not use streams or for-each loops here because they do not allow catching individual ServiceConfigurationErrors
 		// such an error might be caused if the META-INF/services contains an entry to a non-existing factory class
 		while (factoryIterator.hasNext()) {
 			try {
 				MetricReporterFactory factory = factoryIterator.next();
-				reporterFactories.put(factory.getClass().getName(), factory);
+				String factoryClassName = factory.getClass().getName();
+				MetricReporterFactory existingFactory = reporterFactories.get(factoryClassName);
+				if (existingFactory == null) {
+					reporterFactories.put(factoryClassName, factory);
+					LOG.info("Found reporter factory {} at {}",
+						factoryClassName,
+						new File(factory.getClass().getProtectionDomain().getCodeSource().getLocation().toURI()).getCanonicalPath());
+				} else {
+					//TODO: use path information below, when Plugin Classloader stops always prioritizing factories from /lib
+//					String jarPath1 = new File(existingFactory.getClass().getProtectionDomain().getCodeSource().getLocation().toURI()).getCanonicalPath();
+//					String jarPath2 = new File(factory.getClass().getProtectionDomain().getCodeSource().getLocation().toURI()).getCanonicalPath();
+//					LOG.warn("Multiple implementations of the same reporter were found: \n {} and \n{}", jarPath1, jarPath2);
+					LOG.warn("Multiple implementations of the same reporter were found in 'lib' and/or 'plugins' directories for {}. It is recommended to remove redundant reporter JARs to resolve used versions' ambiguity.", factoryClassName);
ambiguity.", factoryClassName);
 
 Review comment:
   I thing the problem is that we have too many variants to describe with a 
single coordinating conjunction like "or" (plus "or" can unfortunately be both 
inclusive and exclusive). Cases:
   (xx) _ ()
   () _ (xx)
   (x) _ (x)
   (xx) _ (xx)
   
   Alternatives like "a or b or both" also do not work, because in this 
particular case they can be easily be interpreted as if the "(x) _ (x)" case is 
not an issue (because of "Multiple" in the beginning). Seems to me like the 
best case to indicate this ambiguity is is to use "and/or" so that people will 
be aware of multiple ways this can happen.




[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-19 Thread GitBox
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] 
Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r395360427
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/src/test/java/org/apache/flink/metrics/prometheus/tests/PrometheusReporterEndToEndITCase.java
 ##
 @@ -120,24 +122,51 @@ public static void checkOS() {
public final DownloadCache downloadCache = DownloadCache.get();
 
@Test
-   public void testReporter() throws Exception {
-   dist.copyOptJarsToLib("flink-metrics-prometheus");
+   public void reporterWorksWhenFoundInLibsViaReflection() throws 
Exception {
+   dist.copyOptJarsToLib(PROMETHEUS_JAR_PREFIX);
+   testReporter(false);
+   }
+
+   @Test
+   public void reporterWorksWhenFoundInPluginsViaReflection() throws 
Exception {
+   dist.copyOptJarsToPlugins(PROMETHEUS_JAR_PREFIX);
+   testReporter(false);
+   }
+
+   @Test
+   public void reporterWorksWhenFoundInPluginsViaFactories() throws 
Exception {
+   dist.copyOptJarsToPlugins(PROMETHEUS_JAR_PREFIX);
+   testReporter(true);
+   }
 
+   @Test
+   public void reporterWorksWhenFoundBothInPluginsAndLibsViaFactories() 
throws Exception {
+   dist.copyOptJarsToPlugins(PROMETHEUS_JAR_PREFIX);
+   dist.copyOptJarsToLib(PROMETHEUS_JAR_PREFIX);
+   testReporter(true);
+   }
+
+   private void testReporter(boolean useFactory) throws Exception {
final Configuration config = new Configuration();
-   config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"prom." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
PrometheusReporter.class.getCanonicalName());
+
+   if (useFactory) {
+   
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "prom." + 
ConfigConstants.METRICS_REPORTER_FACTORY_CLASS_SUFFIX, 
PrometheusReporterFactory.class.getName());
+   } else {
+   
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "prom." + 
ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
PrometheusReporter.class.getCanonicalName());
+   }
+
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"prom.port", "9000-9100");
 
dist.appendConfiguration(config);
 
final Path tmpPrometheusDir = 
tmp.newFolder().toPath().resolve("prometheus");
-   final Path prometheusArchive = 
tmpPrometheusDir.resolve(PROMETHEUS_FILE_NAME + ".tar.gz");
final Path prometheusBinDir = 
tmpPrometheusDir.resolve(PROMETHEUS_FILE_NAME);
final Path prometheusConfig = 
prometheusBinDir.resolve("prometheus.yml");
final Path prometheusBinary = 
prometheusBinDir.resolve("prometheus");
Files.createDirectory(tmpPrometheusDir);
 
-   downloadCache.getOrDownload(
-   
"https://github.com/prometheus/prometheus/releases/download/v; + 
PROMETHEUS_VERSION + '/' + prometheusArchive.getFileName(),
+   final Path prometheusArchive = downloadCache.getOrDownload(
 
 Review comment:
   Split as requested.
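
   The two configuration styles exercised above boil down to the following sketch; the key names are assumed from the constants referenced in the diff, not verified against a particular Flink version:

       import org.apache.flink.configuration.Configuration;

       public class ReporterConfigSketch {
           public static void main(String[] args) {
               Configuration config = new Configuration();
               // reflection-based instantiation of the reporter:
               config.setString("metrics.reporter.prom.class",
                   "org.apache.flink.metrics.prometheus.PrometheusReporter");
               // alternatively, factory-based instantiation:
               // config.setString("metrics.reporter.prom.factory.class",
               //     "org.apache.flink.metrics.prometheus.PrometheusReporterFactory");
               config.setString("metrics.reporter.prom.port", "9000-9100");
           }
       }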




[GitHub] [flink] flinkbot edited a comment on issue #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-19 Thread GitBox
flinkbot edited a comment on issue #11195: [FLINK-16222][runtime] Use plugins 
mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#issuecomment-590065588
 
 
   
   ## CI report:
   
   * 738a6fd162d5ba99432438d224f3b4890961af56 UNKNOWN
   * 77f24b292a04fde34f4c645151484cf86a13832c UNKNOWN
   * 5dfb306ba6ff07f700b1afc21847641321705d4d UNKNOWN
   * 5b0f219689e81fe11c3511c13c7f4943a997a4ef UNKNOWN
   * 7ce9ad87a780aab19b9c97680d7af5f648c13f3b Travis: 
[FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/154170382) 
   * 7f6ad172474e37bbf346a7762868c9114510c4c9 UNKNOWN
   * 2590a69417bdd2b6bf2f4765b276a051040a97cf UNKNOWN
   
   


[GitHub] [flink] flinkbot edited a comment on issue #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-19 Thread GitBox
flinkbot edited a comment on issue #11195: [FLINK-16222][runtime] Use plugins 
mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#issuecomment-590065588
 
 
   
   ## CI report:
   
   * 738a6fd162d5ba99432438d224f3b4890961af56 UNKNOWN
   * 77f24b292a04fde34f4c645151484cf86a13832c UNKNOWN
   * 5dfb306ba6ff07f700b1afc21847641321705d4d UNKNOWN
   * 5b0f219689e81fe11c3511c13c7f4943a997a4ef UNKNOWN
   * 7ce9ad87a780aab19b9c97680d7af5f648c13f3b Travis: 
[FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/154170382) 
   * 7f6ad172474e37bbf346a7762868c9114510c4c9 UNKNOWN
   
   


[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-19 Thread GitBox
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] 
Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r395348055
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
 ##
 @@ -179,28 +164,82 @@ private static ReporterSetup createReporterSetup(String reporterName, MetricConf
 			metricReporterOptional.ifPresent(reporter -> {
 				MetricConfig metricConfig = new MetricConfig();
 				reporterConfig.addAllToProperties(metricConfig);
-
-				reporterArguments.add(createReporterSetup(reporterName, metricConfig, reporter));
+				reporterSetups.add(createReporterSetup(reporterName, metricConfig, reporter));
 			});
-		}
-		catch (Throwable t) {
+		} catch (Throwable t) {
 			LOG.error("Could not instantiate metrics reporter {}. Metrics might not be exposed/reported.", reporterName, t);
 		}
 	}
-	return reporterArguments;
+	return reporterSetups;
 }
 
-	private static Map<String, MetricReporterFactory> loadReporterFactories() {
-		final ServiceLoader<MetricReporterFactory> serviceLoader = ServiceLoader.load(MetricReporterFactory.class);
+	private static List<Tuple2<String, Configuration>> loadReporterConfigurations(Configuration configuration, Set<String> namedReporters) {
+		final List<Tuple2<String, Configuration>> reporterConfigurations = new ArrayList<>(namedReporters.size());
+
+		for (String namedReporter: namedReporters) {
+			DelegatingConfiguration delegatingConfiguration = new DelegatingConfiguration(
+				configuration,
+				ConfigConstants.METRICS_REPORTER_PREFIX + namedReporter + '.');
+
+			reporterConfigurations.add(Tuple2.of(namedReporter, delegatingConfiguration));
+		}
+		return reporterConfigurations;
+	}
+
+	private static Set<String> findEnabledReportersInConfiguration(Configuration configuration, String includedReportersString) {
+		Set<String> includedReporters = reporterListPattern.splitAsStream(includedReportersString)
+			.filter(r -> !r.isEmpty()) // splitting an empty string results in an empty string on jdk9+
+			.collect(Collectors.toSet());
 
 Review comment:
   @AHeise I have applied this refactoring, but then realized I probably did not understand what you were actually proposing. Could you clarify?
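
A minimal, self-contained sketch of the name parsing that findEnabledReportersInConfiguration performs in the diff above; the LIST_PATTERN regex here is an illustrative assumption standing in for Flink's actual reporterListPattern:

import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class ReporterNameParsing {

	// Illustrative stand-in for ReporterSetup's reporterListPattern
	// (an assumption, not the exact pattern used by Flink).
	private static final Pattern LIST_PATTERN = Pattern.compile("\\s*,\\s*");

	static Set<String> findEnabledReporters(String includedReportersString) {
		return LIST_PATTERN.splitAsStream(includedReportersString)
			// Per the jdk9+ note in the diff, splitting an empty string can
			// yield a single empty element, so empty names are dropped here.
			.filter(r -> !r.isEmpty())
			.collect(Collectors.toSet());
	}

	public static void main(String[] args) {
		System.out.println(findEnabledReporters("prom, jmx")); // e.g. [prom, jmx]
		System.out.println(findEnabledReporters(""));          // []
	}
}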




[jira] [Closed] (FLINK-10424) Inconsistency between JsonSchemaConverter and FlinkTypeFactory

2020-03-19 Thread Jira


 [ 
https://issues.apache.org/jira/browse/FLINK-10424?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dominik Wosiński closed FLINK-10424.

Resolution: Fixed

> Inconsistency between JsonSchemaConverter and FlinkTypeFactory
> ---
>
> Key: FLINK-10424
> URL: https://issues.apache.org/jira/browse/FLINK-10424
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System
>Affects Versions: 1.6.0
>Reporter: Dominik Wosiński
>Assignee: Dominik Wosiński
>Priority: Major
>
> There is still an inconsistency between _JsonSchemaConverter_ and 
> _FlinkTypeFactory_ when a JSON schema contains an _integer_ type field. 
> _JsonSchemaConverter_ returns BigInteger type information for _integer_, 
> but _FlinkTypeFactory_ currently does not support BigInteger type 
> information, so an exception is thrown. 
>  
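
A minimal sketch of the reported mismatch, using only flink-core type APIs; the field name and row layout are illustrative assumptions, not taken from the issue:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.types.Row;

public class IntegerFieldMismatch {
	public static void main(String[] args) {
		// The schema converter maps a JSON "integer" field to BIG_INT
		// (BigInteger) type information, roughly equivalent to:
		TypeInformation<Row> rowType = Types.ROW_NAMED(
			new String[]{"id"},
			Types.BIG_INT);

		// FlinkTypeFactory has no mapping for BigInteger type information,
		// so handing such a row type to the Table API planner throws,
		// per the report above.
		System.out.println(rowType);
	}
}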



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on issue #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-19 Thread GitBox
flinkbot edited a comment on issue #11195: [FLINK-16222][runtime] Use plugins 
mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#issuecomment-590065588
 
 
   
   ## CI report:
   
   * 738a6fd162d5ba99432438d224f3b4890961af56 UNKNOWN
   * 77f24b292a04fde34f4c645151484cf86a13832c UNKNOWN
   * 5dfb306ba6ff07f700b1afc21847641321705d4d UNKNOWN
   * 5b0f219689e81fe11c3511c13c7f4943a997a4ef UNKNOWN
   * 7ce9ad87a780aab19b9c97680d7af5f648c13f3b Travis: 
[FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/154170382) 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   




[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-19 Thread GitBox
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] 
Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r395337241
 
 

 ##
 File path: 
flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporterFactory.java
 ##
 @@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.prometheus;
+
+import org.apache.flink.core.plugin.Plugin;
+import org.apache.flink.metrics.reporter.MetricReporterFactory;
+
+import java.util.Properties;
+
+/**
+ * {@link MetricReporterFactory} for {@link PrometheusReporter}.
+ */
+public class PrometheusReporterFactory implements MetricReporterFactory, Plugin {
 
 Review comment:
   Added.




[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-19 Thread GitBox
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] 
Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r395335333
 
 

 ##
 File path: 
flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporterFactory.java
 ##
 @@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.prometheus;
+
+import org.apache.flink.core.plugin.Plugin;
+import org.apache.flink.metrics.reporter.MetricReporterFactory;
+
+import java.util.Properties;
+
+/**
+ * {@link MetricReporterFactory} for {@link PrometheusReporter}.
+ */
+public class PrometheusReporterFactory implements MetricReporterFactory, Plugin {
+
+   @Override
+   public PrometheusReporter createMetricReporter(Properties properties) {
+   return new PrometheusReporter();
 
 Review comment:
   It seems this change would pull in a rather large refactoring, because of 
the call to `super.open(config)` in the `open` methods and because the 
different configuration containers - `Properties` vs `MetricConfig` - would 
have to be reconciled. I would prefer to address this in a separate 
refactoring PR, if possible.
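
For reference, a hedged sketch of the constructor-injection alternative being discussed; all class names and the "port" property are hypothetical, not Flink's actual reporter API. Resolving configuration inside the factory removes the need for a post-construction open(config) phase, which is exactly where the `Properties` vs `MetricConfig` reconciliation would bite:

import java.util.Properties;

public class FactoryInjectionSketch {

	// Hypothetical minimal reporter; the real MetricReporter also carries
	// open(MetricConfig)/close() lifecycle methods.
	interface Reporter {
		void report();
	}

	static final class HttpReporter implements Reporter {
		private final int port;

		HttpReporter(int port) {
			this.port = port;
		}

		@Override
		public void report() {
			System.out.println("reporting on port " + port);
		}
	}

	static final class HttpReporterFactory {
		// Configuration is resolved here, so the reporter stays immutable
		// and never needs a mutable open(config) step.
		Reporter createMetricReporter(Properties properties) {
			int port = Integer.parseInt(properties.getProperty("port", "9249"));
			return new HttpReporter(port);
		}
	}

	public static void main(String[] args) {
		Properties props = new Properties();
		props.setProperty("port", "9999");
		new HttpReporterFactory().createMetricReporter(props).report();
	}
}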



