[jira] [Created] (FLINK-16689) kafka connector as source get byte deserialize add support charsets
xiaodao created FLINK-16689: --- Summary: kafka connector as source get byte deserialize add support charsets Key: FLINK-16689 URL: https://issues.apache.org/jira/browse/FLINK-16689 Project: Flink Issue Type: Improvement Components: Connectors / Kafka Affects Versions: 1.10.0 Reporter: xiaodao Sometimes a Kafka producer sends records serialized with a specific charset, e.g. GBK, but the consumer does not support deserializing with that charset. See, for example: org.apache.flink.formats.json.JsonRowDeserializationSchema#deserialize -- This message was sent by Atlassian Jira (v8.3.4#803005)
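The requested behavior can be sketched with a small, hypothetical helper (not Flink API, all names are illustrative): decode the raw record bytes with a configurable charset such as GBK before handing the string to a format parser.

```java
import java.nio.charset.Charset;

// Hypothetical sketch of the requested improvement, not actual Flink API:
// decode raw Kafka record bytes with a configurable charset before parsing.
public class CharsetAwareDeserializer {

    private final Charset charset;

    public CharsetAwareDeserializer(String charsetName) {
        // Charset.forName fails fast if the JVM does not support the name.
        this.charset = Charset.forName(charsetName);
    }

    /** Decodes the record bytes with the configured charset instead of assuming UTF-8. */
    public String deserialize(byte[] message) {
        return new String(message, charset);
    }
}
```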
[GitHub] [flink] flinkbot edited a comment on issue #11458: [FLINK-16625][k8s] Extract BootstrapTools#getEnvironmentVariables to a general utility in ConfigurationUtils
flinkbot edited a comment on issue #11458: [FLINK-16625][k8s] Extract BootstrapTools#getEnvironmentVariables to a general utility in ConfigurationUtils URL: https://github.com/apache/flink/pull/11458#issuecomment-601537757 ## CI report: * 61e13aca86d70d5279d9ed0e482a5797915b068c Travis: [PENDING](https://travis-ci.com/github/flink-ci/flink/builds/154202409) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=6432) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] jinglining commented on a change in pull request #11250: [FLINK-16302][rest]add log list and read log by name for taskmanager
jinglining commented on a change in pull request #11250: [FLINK-16302][rest]add log list and read log by name for taskmanager URL: https://github.com/apache/flink/pull/11250#discussion_r395440082 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java ## @@ -263,4 +266,8 @@ private void transferFile(ChannelHandlerContext ctx, File file, HttpRequest http throw new FlinkException("Could not transfer file " + file + " to the client.", ioe); } } + + protected String getFileName(HandlerRequest handlerRequest) { + return null; Review comment: If a user requests STDOUT and then LOG, he will not get STDOUT twice. Since `fileBlobKeys` is not static, it belongs to its own handler instance. I have tested this with: ```java log.info(String.format("fileBlobKeys [%s] cached file for TaskExecutor [%s] taskManagerId [%s], blobKey [%s]", fileBlobKeys.toString(), taskManagerIdAndFileName, taskManagerId, blobKey.toString())); ``` The result is: ```text 2020-03-20 11:32:18,582 INFO org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerLogFileHandler [] - fileBlobKeys [org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalLoadingCache@6cfede43] cached file for TaskExecutor [(c65f774c537555a1fb64c64f9b1ff18b,null)] taskManagerId [c65f774c537555a1fb64c64f9b1ff18b], blobKey [t-a8637ce83ae75216ce99399eb59f07c63c53603b-f018c857f158c354d3018f6d03ae2ad6] 2020-03-20 11:32:24,202 INFO org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerStdoutFileHandler [] - fileBlobKeys [org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalLoadingCache@76dd5d49] cached file for TaskExecutor [(c65f774c537555a1fb64c64f9b1ff18b,null)] taskManagerId [c65f774c537555a1fb64c64f9b1ff18b], blobKey [t-da39a3ee5e6b4b0d3255bfef95601890afd80709-89a50af389ae582c598680476d832e0d] ```
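The point about the non-static cache can be shown with a minimal, self-contained sketch. A plain `HashMap` stands in for the Guava `LoadingCache`, and all names are illustrative: because the cache is an instance field, a STDOUT handler and a LOG handler each keep their own entries and never serve each other's files.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative stand-in for AbstractTaskManagerFileHandler's per-instance
// cache: fileBlobKeys is an instance field, so each handler object
// (e.g. one for LOG, one for STDOUT) caches independently.
class FileHandlerSketch {
    private final Map<String, String> fileBlobKeys = new HashMap<>();

    String getOrLoad(String taskManagerId, String blobKeyIfMissing) {
        // Loads and caches the blob key on first access, like a LoadingCache.
        return fileBlobKeys.computeIfAbsent(taskManagerId, id -> blobKeyIfMissing);
    }
}
```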
[GitHub] [flink] flinkbot commented on issue #11458: [FLINK-16625][k8s] Extract BootstrapTools#getEnvironmentVariables to a general utility in ConfigurationUtils
flinkbot commented on issue #11458: [FLINK-16625][k8s] Extract BootstrapTools#getEnvironmentVariables to a general utility in ConfigurationUtils URL: https://github.com/apache/flink/pull/11458#issuecomment-601537757 ## CI report: * 61e13aca86d70d5279d9ed0e482a5797915b068c UNKNOWN
[GitHub] [flink] xintongsong commented on issue #11284: [FLINK-15911][runtime] Make Flink work with NAT.
xintongsong commented on issue #11284: [FLINK-15911][runtime] Make Flink work with NAT. URL: https://github.com/apache/flink/pull/11284#issuecomment-601536922 Thanks for the review @tillrohrmann. > A quick question for my understanding. In the e2e test we give every container an external address which is equal to the host's IP address, right? How can the docker container route the packets for this address if the docker network does not happen to use the same subnet as the host's IP address? Can a docker container talk to services running in the host's network? I guess I don't understand Docker well enough... As I understand it, all packets whose destination is not within the docker network are routed to the host. Whether and where the host further routes a packet depends on the host's settings; without any special configuration, it simply forwards the packet like any other packet sent from the host. Accessing other services running on the host is no different from accessing any public Internet address from inside a docker container.
[GitHub] [flink-playgrounds] garyfeng closed pull request #11: Pyflink
garyfeng closed pull request #11: Pyflink URL: https://github.com/apache/flink-playgrounds/pull/11
[GitHub] [flink-playgrounds] garyfeng commented on issue #11: Pyflink
garyfeng commented on issue #11: Pyflink URL: https://github.com/apache/flink-playgrounds/pull/11#issuecomment-601535615 The Pyflink branch fixed the missing flink-connector and flink-formats issues. Other bugs remain (#3).
[GitHub] [flink-playgrounds] garyfeng opened a new pull request #11: Pyflink
garyfeng opened a new pull request #11: Pyflink URL: https://github.com/apache/flink-playgrounds/pull/11
[GitHub] [flink] xintongsong commented on a change in pull request #11284: [FLINK-15911][runtime] Make Flink work with NAT.
xintongsong commented on a change in pull request #11284: [FLINK-15911][runtime] Make Flink work with NAT. URL: https://github.com/apache/flink/pull/11284#discussion_r395435948 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java ## @@ -351,21 +351,21 @@ public static TaskExecutor startTaskManager( LOG.info("Starting TaskManager with ResourceID: {}", resourceID); - InetAddress remoteAddress = InetAddress.getByName(rpcService.getAddress()); + InetAddress externalAddress = InetAddress.getByName(rpcService.getAddress()); Review comment: True. I'll try to fix the problem and update the e2e test case.
[jira] [Created] (FLINK-16688) Hive-connector should set SessionState for hive
Jingsong Lee created FLINK-16688: Summary: Hive-connector should set SessionState for hive Key: FLINK-16688 URL: https://issues.apache.org/jira/browse/FLINK-16688 Project: Flink Issue Type: Bug Components: Connectors / Hive Affects Versions: 1.10.0 Reporter: Jingsong Lee Fix For: 1.10.1, 1.11.0 Without SessionState, UDFs like GenericUDFUnixTimeStamp cannot be used.
[GitHub] [flink] xintongsong commented on a change in pull request #11284: [FLINK-15911][runtime] Make Flink work with NAT.
xintongsong commented on a change in pull request #11284: [FLINK-15911][runtime] Make Flink work with NAT. URL: https://github.com/apache/flink/pull/11284#discussion_r395435813 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java ## @@ -102,21 +102,21 @@ public int getNumTaskManagers() { public String getJobManagerBindAddress() { return commonBindAddress != null ? commonBindAddress : - configuration.getString(JobManagerOptions.ADDRESS, "localhost"); + configuration.getString(JobManagerOptions.BIND_HOST, "localhost"); } public String getTaskManagerBindAddress() { return commonBindAddress != null ? commonBindAddress : - configuration.getString(TaskManagerOptions.HOST, "localhost"); + configuration.getString(TaskManagerOptions.BIND_HOST, "localhost"); } - public String getJobManagerBindPortRange() { - return String.valueOf(configuration.getInteger(JobManagerOptions.PORT, 0)); + public int getJobManagerBindPort() { + return configuration.getInteger(JobManagerOptions.RPC_BIND_PORT, 0); } - public String getTaskManagerBindPortRange() { - return configuration.getString(TaskManagerOptions.RPC_PORT); + public int getTaskManagerBindPort() { + return configuration.getInteger(TaskManagerOptions.RPC_BIND_PORT, 0); Review comment: Not sure if I fully understand your concern. I think the RPC services are meant to be used by Flink components internally. How would users be affected if the configured address is not used internally? I have no problem preserving the external address/ports if configured; just trying to understand.
[GitHub] [flink] flinkbot edited a comment on issue #11457: [FLINK-14255][WIP][table] Introduce FileSystemStreamingSinkV2
flinkbot edited a comment on issue #11457: [FLINK-14255][WIP][table] Introduce FileSystemStreamingSinkV2 URL: https://github.com/apache/flink/pull/11457#issuecomment-601515074 ## CI report: * 2d43f0faeabc760120d3e0cbfc9632e42b2cfab6 Travis: [FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/154199678) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=6431)
[GitHub] [flink] flinkbot commented on issue #11458: [FLINK-16625][k8s] Extract BootstrapTools#getEnvironmentVariables to a general utility in ConfigurationUtils
flinkbot commented on issue #11458: [FLINK-16625][k8s] Extract BootstrapTools#getEnvironmentVariables to a general utility in ConfigurationUtils URL: https://github.com/apache/flink/pull/11458#issuecomment-601534139 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Automated Checks Last check on commit 61e13aca86d70d5279d9ed0e482a5797915b068c (Fri Mar 20 04:31:53 UTC 2020) **Warnings:** * No documentation files were touched! Remember to keep the Flink docs up to date! Mention the bot in a comment to re-run the automated checks. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into to Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[jira] [Updated] (FLINK-16625) Extract BootstrapTools#getEnvironmentVariables to a general utility in ConfigurationUtils
[ https://issues.apache.org/jira/browse/FLINK-16625?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-16625: --- Labels: pull-request-available (was: ) > Extract BootstrapTools#getEnvironmentVariables to a general utility in > ConfigurationUtils > - > > Key: FLINK-16625 > URL: https://issues.apache.org/jira/browse/FLINK-16625 > Project: Flink > Issue Type: Improvement > Components: API / Core >Reporter: Canbin Zheng >Assignee: Canbin Zheng >Priority: Minor > Labels: pull-request-available > Fix For: 1.11.0 > > > {{BootstrapTools#getEnvironmentVariables}} actually is a general utility to > extract key-value pairs with specified prefix trimmed from the Flink > Configuration object. It can not only be used to extract customized > environment variables in the YARN setup but also for customized > annotations/labels/node-selectors in the Kubernetes setup. > This ticket proposes to rename it to > {{ConfigurationUtils#getPrefixedKeyValuePairs}} and move it to the > {{flink-core}} module as a more general utility to share for the > YARN/Kubernetes setup.
[GitHub] [flink] zhengcanbin opened a new pull request #11458: [FLINK-16625][k8s] Extract BootstrapTools#getEnvironmentVariables to a general utility in ConfigurationUtils
zhengcanbin opened a new pull request #11458: [FLINK-16625][k8s] Extract BootstrapTools#getEnvironmentVariables to a general utility in ConfigurationUtils URL: https://github.com/apache/flink/pull/11458 ## What is the purpose of the change `BootstrapTools#getEnvironmentVariables` actually is a general utility to extract key-value pairs with specified prefix trimmed from the Flink Configuration object. It can not only be used to extract customized environment variables in the YARN setup but also for customized annotations/labels/node-selectors in the Kubernetes setup. This ticket proposes to rename it to `ConfigurationUtils#getPrefixedKeyValuePairs` and move it to the flink-core module as a more general utility to share for the YARN/Kubernetes setup. ## Verifying this change This change added tests. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**)
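The behavior this PR describes for `ConfigurationUtils#getPrefixedKeyValuePairs` can be sketched roughly as follows. This is an illustration over a plain `Map`, not the actual Flink implementation:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch, not the Flink source: collect every entry whose key
// starts with the given prefix, returning it with the prefix trimmed off.
public final class PrefixedKeyValueSketch {

    public static Map<String, String> getPrefixedKeyValuePairs(String prefix, Map<String, String> config) {
        Map<String, String> result = new HashMap<>();
        for (Map.Entry<String, String> entry : config.entrySet()) {
            String key = entry.getKey();
            if (key.startsWith(prefix) && key.length() > prefix.length()) {
                result.put(key.substring(prefix.length()), entry.getValue());
            }
        }
        return result;
    }
}
```

With a prefix such as `containerized.master.env.`, an entry `containerized.master.env.JAVA_HOME -> /opt/jdk` would come back as `JAVA_HOME -> /opt/jdk`, which covers both the YARN environment-variable case and the Kubernetes annotation/label/node-selector case mentioned above.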
[GitHub] [flink] zhijiangW commented on a change in pull request #11453: [FLINK-13553][tests] Fix ByteBuf leak in KvStateServerHandlerTest
zhijiangW commented on a change in pull request #11453: [FLINK-13553][tests] Fix ByteBuf leak in KvStateServerHandlerTest URL: https://github.com/apache/flink/pull/11453#discussion_r395433462 ## File path: flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java ## @@ -544,6 +552,7 @@ public void testIncomingBufferIsRecycled() throws Exception { channel.writeInbound(unexpected); assertEquals("Buffer not recycled", 0L, unexpected.refCnt()); + channel.finishAndReleaseAll(); Review comment: I am curious why we add this action only for this test, given that the allocated buffers in this unit test are already released before this point.
[GitHub] [flink] flinkbot edited a comment on issue #11457: [FLINK-14255][WIP][table] Introduce FileSystemStreamingSinkV2
flinkbot edited a comment on issue #11457: [FLINK-14255][WIP][table] Introduce FileSystemStreamingSinkV2 URL: https://github.com/apache/flink/pull/11457#issuecomment-601515074 ## CI report: * 2d43f0faeabc760120d3e0cbfc9632e42b2cfab6 Travis: [PENDING](https://travis-ci.com/github/flink-ci/flink/builds/154199678) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=6431)
[GitHub] [flink] jiasheng55 commented on issue #11322: [FLINK-16376][yarn] Use consistent method to get Yarn application dir…
jiasheng55 commented on issue #11322: [FLINK-16376][yarn] Use consistent method to get Yarn application dir… URL: https://github.com/apache/flink/pull/11322#issuecomment-601517139 @kl0u this PR removed the "targetDir" parameter from the `YarnClusterDescriptor#uploadAndRegisterFiles` method and uses `FileSystem#getHomeDirectory` as the default target directory, so `YarnFileStageTestS3ITCase.testRecursiveUploadForYarnS3a()` failed because it seemed to hit a credential error accessing the **"/user/{user.name}"** bucket. If we choose not to use `mockito`, is it possible to make the **"/user"** bucket accessible?
[GitHub] [flink] ifndef-SleePy commented on a change in pull request #11347: [FLINK-14971][checkpointing] Make all the non-IO operations in CheckpointCoordinator single-threaded
ifndef-SleePy commented on a change in pull request #11347: [FLINK-14971][checkpointing] Make all the non-IO operations in CheckpointCoordinator single-threaded URL: https://github.com/apache/flink/pull/11347#discussion_r395431078 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ## @@ -151,6 +149,9 @@ * It must be single-threaded. Eventually it will be replaced by main thread executor. */ private final ScheduledExecutor timer; + @Nullable + private ComponentMainThreadExecutor mainThreadExecutor; Review comment: Yes, it could not be a `final` variable given the current implementation of `ExecutionGraph`. I'm just not a fan of the `DummyComponentMainThreadExecutor` trick. But I agree with you that it's not a bad idea to stay consistent with `ExecutionGraph`. Let me think about it a bit more; maybe we can find a better solution.
[GitHub] [flink] flinkbot commented on issue #11457: [FLINK-14255][WIP][table] Introduce FileSystemStreamingSinkV2
flinkbot commented on issue #11457: [FLINK-14255][WIP][table] Introduce FileSystemStreamingSinkV2 URL: https://github.com/apache/flink/pull/11457#issuecomment-601515074 ## CI report: * 2d43f0faeabc760120d3e0cbfc9632e42b2cfab6 UNKNOWN
[GitHub] [flink] flinkbot edited a comment on issue #11456: [FLINK-16602][k8s] Rework the internal/external Service
flinkbot edited a comment on issue #11456: [FLINK-16602][k8s] Rework the internal/external Service URL: https://github.com/apache/flink/pull/11456#issuecomment-601501414 ## CI report: * e4c3073a6fb379bacb4da0e299245a73ea2a97d4 Travis: [SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/154194249) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=6429)
[jira] [Updated] (FLINK-16625) Extract BootstrapTools#getEnvironmentVariables to a general utility in ConfigurationUtils
[ https://issues.apache.org/jira/browse/FLINK-16625?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Canbin Zheng updated FLINK-16625: - Summary: Extract BootstrapTools#getEnvironmentVariables to a general utility in ConfigurationUtils (was: Extract BootstrapTools#getEnvironmentVariables to a general utility in ConfigurationUtil) > Extract BootstrapTools#getEnvironmentVariables to a general utility in > ConfigurationUtils > - > > Key: FLINK-16625 > URL: https://issues.apache.org/jira/browse/FLINK-16625 > Project: Flink > Issue Type: Improvement > Components: API / Core >Reporter: Canbin Zheng >Assignee: Canbin Zheng >Priority: Minor > Fix For: 1.11.0 > > > {{BootstrapTools#getEnvironmentVariables}} actually is a general utility to > extract key-value pairs with specified prefix trimmed from the Flink > Configuration object. It can not only be used to extract customized > environment variables in the YARN setup but also for customized > annotations/labels/node-selectors in the Kubernetes setup. > This ticket proposes to rename it to > {{ConfigurationUtil#getPrefixedKeyValuePairs}} and move it to the > {{flink-core}} module as a more general utility to share for the > YARN/Kubernetes setup.
[jira] [Updated] (FLINK-16625) Extract BootstrapTools#getEnvironmentVariables to a general utility in ConfigurationUtils
[ https://issues.apache.org/jira/browse/FLINK-16625?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Canbin Zheng updated FLINK-16625: - Description: {{BootstrapTools#getEnvironmentVariables}} actually is a general utility to extract key-value pairs with specified prefix trimmed from the Flink Configuration object. It can not only be used to extract customized environment variables in the YARN setup but also for customized annotations/labels/node-selectors in the Kubernetes setup. This ticket proposes to rename it to {{ConfigurationUtils#getPrefixedKeyValuePairs}} and move it to the {{flink-core}} module as a more general utility to share for the YARN/Kubernetes setup. was: {{BootstrapTools#getEnvironmentVariables}} actually is a general utility to extract key-value pairs with specified prefix trimmed from the Flink Configuration object. It can not only be used to extract customized environment variables in the YARN setup but also for customized annotations/labels/node-selectors in the Kubernetes setup. This ticket proposes to rename it to {{ConfigurationUtil#getPrefixedKeyValuePairs}} and move it to the {{flink-core}} module as a more general utility to share for the YARN/Kubernetes setup. > Extract BootstrapTools#getEnvironmentVariables to a general utility in > ConfigurationUtils > - > > Key: FLINK-16625 > URL: https://issues.apache.org/jira/browse/FLINK-16625 > Project: Flink > Issue Type: Improvement > Components: API / Core >Reporter: Canbin Zheng >Assignee: Canbin Zheng >Priority: Minor > Fix For: 1.11.0 > > > {{BootstrapTools#getEnvironmentVariables}} actually is a general utility to > extract key-value pairs with specified prefix trimmed from the Flink > Configuration object. It can not only be used to extract customized > environment variables in the YARN setup but also for customized > annotations/labels/node-selectors in the Kubernetes setup. 
> This ticket proposes to rename it to > {{ConfigurationUtils#getPrefixedKeyValuePairs}} and move it to the > {{flink-core}} module as a more general utility to share for the > YARN/Kubernetes setup.
[GitHub] [flink] xintongsong commented on a change in pull request #11284: [FLINK-15911][runtime] Make Flink work with NAT.
xintongsong commented on a change in pull request #11284: [FLINK-15911][runtime] Make Flink work with NAT. URL: https://github.com/apache/flink/pull/11284#discussion_r395430171 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java ## @@ -248,10 +247,14 @@ protected void initializeServices(Configuration configuration) throws Exception LOG.info("Initializing cluster services."); synchronized (lock) { - final String bindAddress = configuration.getString(JobManagerOptions.ADDRESS); - final String portRange = getRPCPortRange(configuration); - - commonRpcService = createRpcService(configuration, bindAddress, portRange); + commonRpcService = AkkaRpcServiceUtils.createRemoteRpcService( + configuration, + configuration.getString(JobManagerOptions.ADDRESS), + getRPCPortRange(configuration), + configuration.getString(JobManagerOptions.BIND_HOST), + configuration.contains(JobManagerOptions.RPC_BIND_PORT) ? + configuration.getInteger(JobManagerOptions.RPC_BIND_PORT) : + null); Review comment: Same here. I also think using Optional would express the contract better. The question is whether we need to strictly follow the code style guide.
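The `Optional`-vs-`null` question in this thread can be made concrete with a tiny sketch (names are illustrative, not the Flink API): an `Optional<Integer>` parameter documents at the call site that the bind port may legitimately be absent, whereas a nullable `Integer` only conveys that in javadoc.

```java
import java.util.Optional;

// Illustrative sketch of the contract under discussion, not Flink code:
// the external port is always known; the bind port is genuinely optional
// and falls back to the external port when not configured.
final class BindPortSketch {
    static int resolveBindPort(int externalPort, Optional<Integer> configuredBindPort) {
        return configuredBindPort.orElse(externalPort);
    }
}
```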
[jira] [Assigned] (FLINK-16625) Extract BootstrapTools#getEnvironmentVariables to a general utility in ConfigurationUtil
[ https://issues.apache.org/jira/browse/FLINK-16625?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zili Chen reassigned FLINK-16625: - Assignee: Canbin Zheng > Extract BootstrapTools#getEnvironmentVariables to a general utility in > ConfigurationUtil > > > Key: FLINK-16625 > URL: https://issues.apache.org/jira/browse/FLINK-16625 > Project: Flink > Issue Type: Improvement > Components: API / Core >Reporter: Canbin Zheng >Assignee: Canbin Zheng >Priority: Minor > Fix For: 1.11.0 > > > {{BootstrapTools#getEnvironmentVariables}} actually is a general utility to > extract key-value pairs with specified prefix trimmed from the Flink > Configuration object. It can not only be used to extract customized > environment variables in the YARN setup but also for customized > annotations/labels/node-selectors in the Kubernetes setup. > This ticket proposes to rename it to > {{ConfigurationUtil#getPrefixedKeyValuePairs}} and move it to the > {{flink-core}} module as a more general utility to share for the > YARN/Kubernetes setup.
[GitHub] [flink] jiasheng55 commented on a change in pull request #11322: [FLINK-16376][yarn] Use consistent method to get Yarn application dir…
jiasheng55 commented on a change in pull request #11322: [FLINK-16376][yarn] Use consistent method to get Yarn application dir… URL: https://github.com/apache/flink/pull/11322#discussion_r393478330 ## File path: flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTestS3ITCase.java ## @@ -160,8 +162,12 @@ private void testRecursiveUploadForYarn(String scheme, String pathSuffix) throws assumeFalse(fs.exists(basePath)); + // mock the test bucket path as home dir. + org.apache.hadoop.fs.FileSystem spyFileSystem = spy(fs.getHadoopFileSystem()); + when(spyFileSystem.getHomeDirectory()).thenReturn(new org.apache.hadoop.fs.Path(basePath.toUri())); Review comment: @kl0u you're right, the test path changed from **S3_TEST_BUCKET** to the one returned by `org.apache.hadoop.fs.FileSystem#getHomeDirectory`. I added a few lines to mock the S3_TEST_BUCKET as the home directory; what do you think?
[GitHub] [flink] xintongsong commented on a change in pull request #11284: [FLINK-15911][runtime] Make Flink work with NAT.
xintongsong commented on a change in pull request #11284: [FLINK-15911][runtime] Make Flink work with NAT. URL: https://github.com/apache/flink/pull/11284#discussion_r395429730 ## File path: flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java ## @@ -94,19 +94,28 @@ " or if it has been quarantined by another actor system."); /** -* The config parameter defining the task manager's hostname. +* The external address of the network interface where the TaskManager is exposed. * Overrides {@link #HOST_BIND_POLICY} automatic address binding. */ @Documentation.Section({Documentation.Sections.COMMON_HOST_PORT, Documentation.Sections.ALL_TASK_MANAGER}) public static final ConfigOption HOST = key("taskmanager.host") .stringType() .noDefaultValue() - .withDescription("The address of the network interface that the TaskManager binds to." + - " This option can be used to define explicitly a binding address. Because" + - " different TaskManagers need different values for this option, usually it is specified in an" + + .withDescription("The external address of the network interface where the TaskManager is exposed." + + " Because different TaskManagers need different values for this option, usually it is specified in an" + " additional non-shared TaskManager-specific config file."); + /** +* The local address of the network interface that the task manager binds to. +*/ + public static final ConfigOption BIND_HOST = + key("taskmanager.bind-host") Review comment: Same here. TM address and bind-address are shared by the RPC service, netty shuffle service and queryable kv state. I'm actually thinking about changing "jobmanager.rpc.address" to "jobmanager.host" as a follow-up, to make the config keys consistently use "host".
[jira] [Commented] (FLINK-15447) To improve utilization of the `java.io.tmpdir` for YARN module
[ https://issues.apache.org/jira/browse/FLINK-15447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17063064#comment-17063064 ] Victor Wong commented on FLINK-15447: - Currently, we can solve this issue through "env.java.opts: -Djava.io.tmpdir=./tmp", closing this issue now. > To improve utilization of the `java.io.tmpdir` for YARN module > -- > > Key: FLINK-15447 > URL: https://issues.apache.org/jira/browse/FLINK-15447 > Project: Flink > Issue Type: Improvement > Components: Deployment / YARN >Affects Versions: 1.9.1 >Reporter: Victor Wong >Priority: Major > Labels: pull-request-available > Time Spent: 20m > Remaining Estimate: 0h > > *#Background* > Currently, when running Flink on Yarn, the "java.io.tmpdir" property is set > to the default value, which is "/tmp". > Sometimes we ran into exceptions caused by a full "/tmp" directory, which > would not be cleaned automatically after applications finished. > > #*Goal* > quoted from: [HADOOP-2735|https://issues.apache.org/jira/browse/HADOOP-2735] > _1) Tasks can utilize all disks when using tmp_ > _2) Any undeleted tmp files will be deleted by the tasktracker when > task(job?) is done._ > > #*Suggestion* > I think we can set "java.io.tmpdir" to "PWD/tmp" directory, or > something similar. "PWD" will be replaced with the true working > directory of JM/TM by Yarn, which will be cleaned automatically.
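The effect of the workaround can be illustrated with a small sketch (the `./tmp` directory and file names are illustrative; note that `java.io.tmpdir` must take effect before the JVM first creates a temp file, which is why setting it via the `-D` flag in `env.java.opts` is the reliable approach):

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;

// Demonstrates that temp files created through the standard API land under
// whatever directory java.io.tmpdir points to. On YARN, pointing it at the
// container working directory means YARN cleans the files up with the container.
class TmpDirDemo {
    static String tempFileParent() {
        try {
            File tmp = new File("./tmp");
            tmp.mkdirs();
            System.setProperty("java.io.tmpdir", tmp.getAbsolutePath());
            File f = File.createTempFile("flink-", ".tmp");
            f.deleteOnExit();
            return f.getParent();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(tempFileParent()); // resolves under ./tmp, not the default /tmp
    }
}
```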
[jira] [Closed] (FLINK-15447) To improve utilization of the `java.io.tmpdir` for YARN module
[ https://issues.apache.org/jira/browse/FLINK-15447?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Victor Wong closed FLINK-15447. --- Resolution: Not A Problem > To improve utilization of the `java.io.tmpdir` for YARN module > -- > > Key: FLINK-15447 > URL: https://issues.apache.org/jira/browse/FLINK-15447 > Project: Flink > Issue Type: Improvement > Components: Deployment / YARN >Affects Versions: 1.9.1 >Reporter: Victor Wong >Priority: Major > Labels: pull-request-available > Time Spent: 20m > Remaining Estimate: 0h > > *#Background* > Currently, when running Flink on Yarn, the "java.io.tmpdir" property is set > to the default value, which is "/tmp". > Sometimes we ran into exceptions caused by a full "/tmp" directory, which > would not be cleaned automatically after applications finished. > > #*Goal* > quoted from: [HADOOP-2735|https://issues.apache.org/jira/browse/HADOOP-2735] > _1) Tasks can utilize all disks when using tmp_ > _2) Any undeleted tmp files will be deleted by the tasktracker when > task(job?) is done._ > > #*Suggestion* > I think we can set "java.io.tmpdir" to "PWD/tmp" directory, or > something similar. "PWD" will be replaced with the true working > directory of JM/TM by Yarn, which will be cleaned automatically.
[GitHub] [flink] ifndef-SleePy commented on a change in pull request #11347: [FLINK-14971][checkpointing] Make all the non-IO operations in CheckpointCoordinator single-threaded
ifndef-SleePy commented on a change in pull request #11347: [FLINK-14971][checkpointing] Make all the non-IO operations in CheckpointCoordinator single-threaded URL: https://github.com/apache/flink/pull/11347#discussion_r395429422 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ## @@ -91,9 +92,6 @@ // - /** Coordinator-wide lock to safeguard the checkpoint updates. */ - private final Object lock = new Object(); Review comment: Sure, nice point!
[GitHub] [flink] ifndef-SleePy commented on a change in pull request #11347: [FLINK-14971][checkpointing] Make all the non-IO operations in CheckpointCoordinator single-threaded
ifndef-SleePy commented on a change in pull request #11347: [FLINK-14971][checkpointing] Make all the non-IO operations in CheckpointCoordinator single-threaded URL: https://github.com/apache/flink/pull/11347#discussion_r395429228 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java ## @@ -111,6 +108,14 @@ /** The executor for potentially blocking I/O operations, like state disposal. */ private final Executor executor; + /** The executor for non-blocking operations. */ + private final Executor mainThreadExecutor; + + private final CompletedCheckpointStore completedCheckpointStore; + + /** The lock for avoiding conflict between I/O operations. */ + private final Object operationLock = new Object(); Review comment: Yes, there is a small possibility that the `CheckpointCoordinator` is shut down while a `PendingCheckpoint` is doing finalization. There could be some concurrent conflicts on `operatorStates` and `targetLocation`. It might not be a big deal, because everything would be shut down anyway, and the finalization probably could not finish since the I/O executor would also be shut down. However, it's not so elegant to leave the concurrency issue to the `CheckpointStorageLocation` and `OperatorState`. And it's a bit heavy to make all of these implementations thread-safe just to avoid this unlikely issue. So I think introducing a lock outside is better.
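The pattern being argued for here, a single coordinator-side lock so that finalization and shutdown cannot interleave, can be sketched like this (class and method names are illustrative, not Flink's actual implementation):

```java
// Minimal sketch of guarding finalization against a concurrent shutdown with a
// single operation lock, instead of making every touched object thread-safe.
class PendingCheckpointSketch {
    private final Object operationLock = new Object();
    private boolean disposed;

    /** Called from the I/O executor; returns false if shutdown won the race. */
    boolean finalizeCheckpoint() {
        synchronized (operationLock) {
            if (disposed) {
                return false; // coordinator already shut down, skip finalization
            }
            // ... write checkpoint metadata, register the completed checkpoint ...
            return true;
        }
    }

    /** Called when the CheckpointCoordinator shuts down. */
    void dispose() {
        synchronized (operationLock) {
            disposed = true;
            // ... discard state, cancel the pending checkpoint ...
        }
    }

    static boolean finalizeAfterDispose() {
        PendingCheckpointSketch p = new PendingCheckpointSketch();
        p.dispose();
        return p.finalizeCheckpoint();
    }

    public static void main(String[] args) {
        System.out.println(new PendingCheckpointSketch().finalizeCheckpoint()); // true
        System.out.println(finalizeAfterDispose()); // false
    }
}
```

Whichever of the two operations acquires the lock first runs to completion; the loser observes the flag and backs off, so the shared state is never touched concurrently.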
[jira] [Assigned] (FLINK-16687) PyFlink Cannot determine simple type name "PythonScalarFunction$0"
[ https://issues.apache.org/jira/browse/FLINK-16687?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dian Fu reassigned FLINK-16687: --- Assignee: Huang Xingbo > PyFlink Cannot determine simple type name "PythonScalarFunction$0" > -- > > Key: FLINK-16687 > URL: https://issues.apache.org/jira/browse/FLINK-16687 > Project: Flink > Issue Type: Bug > Components: API / Python >Affects Versions: 1.10.0 >Reporter: mayne wong >Assignee: Huang Xingbo >Priority: Major > > > I try to run PyFlink UDF with SQL UNNEST, execution of job failed, I defined > a source from element, and use UDF split the string to list. > raise org.codehaus.commons.compiler.CompileException: Cannot determine simple > type name "PythonScalarFunction$0" > {code:python} > import os > from pyflink.table.udf import udf > from pyflink.datastream import StreamExecutionEnvironment > from pyflink.table import StreamTableEnvironment, DataTypes, CsvTableSink > @udf(input_types=[DataTypes.STRING()], > result_type=DataTypes.ARRAY(DataTypes.STRING())) > def format_string_to_array(item): > return item.replace('[', '').replace(']', '').replace(', ', > ',').split(',') > if __name__ == '__main__': > env = StreamExecutionEnvironment.get_execution_environment() > env.set_parallelism(1) > st_env = StreamTableEnvironment.create(env) > result_file = "result.csv" > if os.path.exists(result_file): > os.remove(result_file) > st_env.register_table_sink("result_tab", CsvTableSink(["id", "url"], > [DataTypes.STRING(), DataTypes.STRING()], result_file)) > st_env.register_function("format_string_to_array", format_string_to_array) > tab = st_env.from_elements([("1", "['www.bing.com', 'www.google.com']"), > ("2", "['www.taobao.com']")], ['id', 'urls']) > st_env.register_table("temp_table", tab) > st_env.sql_query("Select id, A.url from temp_table, > UNNEST(format_string_to_array(temp_table.urls)) AS > A(url)").insert_into("result_tab") > st_env.execute("udf") > {code} > > When I execute the program, I get the 
following exception: > > {code:java} > py4j.protocol.Py4JJavaError: An error occurred while calling o2.execute. > : java.util.concurrent.ExecutionException: > org.apache.flink.client.program.ProgramInvocationException: Job failed > (JobID: 5d63838ad2043bf4a5d0bca83623959d) > at > java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) > at > java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1640) > at > org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:74) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620) > at > org.apache.flink.table.executor.StreamExecutor.execute(StreamExecutor.java:50) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:643) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) > at > org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) > at > org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282) > at > org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) > at > org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79) > at > org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238) > at java.lang.Thread.run(Thread.java:748) > Caused by: org.apache.flink.client.program.ProgramInvocationException: Job 
> failed (JobID: 5d63838ad2043bf4a5d0bca83623959d) > at > org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112) > at > java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616) > at > java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591) > at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) > at >
[GitHub] [flink] jiasheng55 closed pull request #11331: [FLINK-15447][yarn] Add config to define 'java.io.tmpdir' property of…
jiasheng55 closed pull request #11331: [FLINK-15447][yarn] Add config to define 'java.io.tmpdir' property of… URL: https://github.com/apache/flink/pull/11331
[GitHub] [flink] xintongsong commented on a change in pull request #11284: [FLINK-15911][runtime] Make Flink work with NAT.
xintongsong commented on a change in pull request #11284: [FLINK-15911][runtime] Make Flink work with NAT. URL: https://github.com/apache/flink/pull/11284#discussion_r395428193 ## File path: flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java ## @@ -54,6 +54,16 @@ " leader-election service (like ZooKeeper) is used to elect and discover the JobManager" + " leader from potentially multiple standby JobManagers."); + /** +* The local address of the network interface that the job manager binds to. +*/ + public static final ConfigOption BIND_HOST = + key("jobmanager.bind-host") Review comment: This address is not only used by the RPC service, but also shared by the blob server, rest server, and potentially any other service in the future that needs to bind to the network interface. To that end, I think the config option for TM ("taskmanager.host") makes more sense. I guess that might be the reason we used "host" in the config key in the first place.
[GitHub] [flink] xintongsong commented on a change in pull request #11284: [FLINK-15911][runtime] Make Flink work with NAT.
xintongsong commented on a change in pull request #11284: [FLINK-15911][runtime] Make Flink work with NAT. URL: https://github.com/apache/flink/pull/11284#discussion_r395426701 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java ## @@ -261,26 +258,30 @@ public void start() throws Exception { // bring up all the RPC services LOG.info("Starting RPC Service(s)"); - AkkaRpcServiceConfiguration akkaRpcServiceConfig = AkkaRpcServiceConfiguration.fromConfiguration(configuration); - final RpcServiceFactory dispatcherResourceManagreComponentRpcServiceFactory; if (useSingleRpcService) { // we always need the 'commonRpcService' for auxiliary calls - commonRpcService = createRpcService(akkaRpcServiceConfig, false, null); + commonRpcService = createLocalRpcService(configuration); final CommonRpcServiceFactory commonRpcServiceFactory = new CommonRpcServiceFactory(commonRpcService); taskManagerRpcServiceFactory = commonRpcServiceFactory; dispatcherResourceManagreComponentRpcServiceFactory = commonRpcServiceFactory; } else { - // we always need the 'commonRpcService' for auxiliary calls - commonRpcService = createRpcService(akkaRpcServiceConfig, true, null); // start a new service per component, possibly with custom bind addresses final String jobManagerBindAddress = miniClusterConfiguration.getJobManagerBindAddress(); final String taskManagerBindAddress = miniClusterConfiguration.getTaskManagerBindAddress(); + final String jobManagerBindPort = miniClusterConfiguration.getJobManagerBindPortRange(); + final String taskManagerBindPort = miniClusterConfiguration.getTaskManagerBindPortRange(); Review comment: Please see my explanation above.
[GitHub] [flink] flinkbot commented on issue #11457: [FLINK-14255][WIP][table] Introduce FileSystemStreamingSinkV2
flinkbot commented on issue #11457: [FLINK-14255][WIP][table] Introduce FileSystemStreamingSinkV2 URL: https://github.com/apache/flink/pull/11457#issuecomment-601511488 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Automated Checks Last check on commit 2d43f0faeabc760120d3e0cbfc9632e42b2cfab6 (Fri Mar 20 03:39:44 UTC 2020) **Warnings:** * No documentation files were touched! Remember to keep the Flink docs up to date! Mention the bot in a comment to re-run the automated checks. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[GitHub] [flink] xintongsong commented on a change in pull request #11284: [FLINK-15911][runtime] Make Flink work with NAT.
xintongsong commented on a change in pull request #11284: [FLINK-15911][runtime] Make Flink work with NAT. URL: https://github.com/apache/flink/pull/11284#discussion_r395426615 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java ## @@ -261,26 +258,30 @@ public void start() throws Exception { // bring up all the RPC services LOG.info("Starting RPC Service(s)"); - AkkaRpcServiceConfiguration akkaRpcServiceConfig = AkkaRpcServiceConfiguration.fromConfiguration(configuration); - final RpcServiceFactory dispatcherResourceManagreComponentRpcServiceFactory; if (useSingleRpcService) { // we always need the 'commonRpcService' for auxiliary calls - commonRpcService = createRpcService(akkaRpcServiceConfig, false, null); + commonRpcService = createLocalRpcService(configuration); final CommonRpcServiceFactory commonRpcServiceFactory = new CommonRpcServiceFactory(commonRpcService); taskManagerRpcServiceFactory = commonRpcServiceFactory; dispatcherResourceManagreComponentRpcServiceFactory = commonRpcServiceFactory; } else { - // we always need the 'commonRpcService' for auxiliary calls - commonRpcService = createRpcService(akkaRpcServiceConfig, true, null); // start a new service per component, possibly with custom bind addresses final String jobManagerBindAddress = miniClusterConfiguration.getJobManagerBindAddress(); final String taskManagerBindAddress = miniClusterConfiguration.getTaskManagerBindAddress(); + final String jobManagerBindPort = miniClusterConfiguration.getJobManagerBindPortRange(); Review comment: No, and I don't think we should. Whenever the user needs to specify a bind port apart from the external port, the user also needs to set port forwarding rules in the environment, e.g. NAT gateway. For each external port, it can only be forwarded to one internal port. Therefore, it does not make sense to support a range of bind ports, because the user needs to know which internal port is bound anyway.
[GitHub] [flink] JingsongLi opened a new pull request #11457: [FLINK-14255][WIP][table] Introduce FileSystemStreamingSinkV2
JingsongLi opened a new pull request #11457: [FLINK-14255][WIP][table] Introduce FileSystemStreamingSinkV2 URL: https://github.com/apache/flink/pull/11457 ## What is the purpose of the change Build table streaming file sink based on DataStream StreamingFileSink. NOTE: Work in progress, for testing. ## Brief change log - Introduce FileSystemStreamingSink - Integrate hive to FileSystemStreamingSink ## Verifying this change FileSystemStreamingSinkTest ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? (yes) - If yes, how is the feature documented? JavaDocs
[GitHub] [flink] xintongsong commented on a change in pull request #11284: [FLINK-15911][runtime] Make Flink work with NAT.
xintongsong commented on a change in pull request #11284: [FLINK-15911][runtime] Make Flink work with NAT. URL: https://github.com/apache/flink/pull/11284#discussion_r395425038 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java ## @@ -82,92 +82,80 @@ .build(); /** -* Starts an ActorSystem with the given configuration listening at the address/ports. +* Starts a remote ActorSystem at given address and specific port range. * @param configuration The Flink configuration -* @param listeningAddress The address to listen at. -* @param portRangeDefinition The port range to choose a port from. +* @param externalAddress The external address to access the ActorSystem. +* @param externalPortRange The choosing range of the external port to access the ActorSystem. * @param logger The logger to output log information. * @return The ActorSystem which has been started * @throws Exception Thrown when actor system cannot be started in specified port range */ - public static ActorSystem startActorSystem( + @VisibleForTesting + public static ActorSystem startRemoteActorSystem( Configuration configuration, - String listeningAddress, - String portRangeDefinition, + String externalAddress, + String externalPortRange, Logger logger) throws Exception { - return startActorSystem( - configuration, - listeningAddress, - portRangeDefinition, - logger, - ForkJoinExecutorConfiguration.fromConfiguration(configuration)); - } - - /** -* Starts an ActorSystem with the given configuration listening at the address/ports. -* -* @param configuration The Flink configuration -* @param listeningAddress The address to listen at. -* @param portRangeDefinition The port range to choose a port from. -* @param logger The logger to output log information. 
-* @param actorSystemExecutorConfiguration configuration for the ActorSystem's underlying executor -* @return The ActorSystem which has been started -* @throws Exception Thrown when actor system cannot be started in specified port range -*/ - public static ActorSystem startActorSystem( - Configuration configuration, - String listeningAddress, - String portRangeDefinition, - Logger logger, - @Nonnull ActorSystemExecutorConfiguration actorSystemExecutorConfiguration) throws Exception { - return startActorSystem( + return startRemoteActorSystem( configuration, AkkaUtils.getFlinkActorSystemName(), - listeningAddress, - portRangeDefinition, + externalAddress, + externalPortRange, + NetUtils.getWildcardIPAddress(), + -1, logger, - actorSystemExecutorConfiguration); + ForkJoinExecutorConfiguration.fromConfiguration(configuration), + null); } /** -* Starts an ActorSystem with the given configuration listening at the address/ports. +* Starts a remote ActorSystem at given address and specific port range. * * @param configuration The Flink configuration * @param actorSystemName Name of the started {@link ActorSystem} -* @param listeningAddress The address to listen at. -* @param portRangeDefinition The port range to choose a port from. +* @param externalAddress The external address to access the ActorSystem. +* @param externalPortRange The choosing range of the external port to access the ActorSystem. +* @param bindAddress The local address to bind to. +* @param bindPort The local port to bind to. If negative, external port will be used. * @param logger The logger to output log information. * @param actorSystemExecutorConfiguration configuration for the ActorSystem's underlying executor +* @param customConfig Custom Akka config to be combined with the config derived from Flink configuration. 
* @return The ActorSystem which has been started * @throws Exception Thrown when actor system cannot be started in specified port range */ - public static ActorSystem startActorSystem( + public static ActorSystem startRemoteActorSystem( Configuration configuration, String
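The bind-port fallback described in the Javadoc above ("The local port to bind to. If negative, external port will be used.") reduces to a one-liner; this sketch uses a hypothetical helper name, not a Flink method:

```java
// Sketch of the documented fallback for startRemoteActorSystem's bindPort
// parameter: a negative bind port means "bind to the external port".
class PortFallback {
    static int resolveBindPort(int bindPort, int externalPort) {
        return bindPort < 0 ? externalPort : bindPort;
    }

    public static void main(String[] args) {
        System.out.println(resolveBindPort(-1, 6123));    // no explicit bind port: use 6123
        System.out.println(resolveBindPort(16123, 6123)); // explicit bind port: use 16123
    }
}
```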
[GitHub] [flink] flinkbot edited a comment on issue #11456: [FLINK-16602][k8s] Rework the internal/external Service
flinkbot edited a comment on issue #11456: [FLINK-16602][k8s] Rework the internal/external Service URL: https://github.com/apache/flink/pull/11456#issuecomment-601501414 ## CI report: * e4c3073a6fb379bacb4da0e299245a73ea2a97d4 Travis: [PENDING](https://travis-ci.com/github/flink-ci/flink/builds/154194249) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=6429)
[GitHub] [flink] xintongsong commented on a change in pull request #11284: [FLINK-15911][runtime] Make Flink work with NAT.
xintongsong commented on a change in pull request #11284: [FLINK-15911][runtime] Make Flink work with NAT. URL: https://github.com/apache/flink/pull/11284#discussion_r395425038 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java ## @@ -82,92 +82,80 @@ .build(); /** -* Starts an ActorSystem with the given configuration listening at the address/ports. +* Starts a remote ActorSystem at given address and specific port range. * @param configuration The Flink configuration -* @param listeningAddress The address to listen at. -* @param portRangeDefinition The port range to choose a port from. +* @param externalAddress The external address to access the ActorSystem. +* @param externalPortRange The choosing range of the external port to access the ActorSystem. * @param logger The logger to output log information. * @return The ActorSystem which has been started * @throws Exception Thrown when actor system cannot be started in specified port range */ - public static ActorSystem startActorSystem( + @VisibleForTesting + public static ActorSystem startRemoteActorSystem( Configuration configuration, - String listeningAddress, - String portRangeDefinition, + String externalAddress, + String externalPortRange, Logger logger) throws Exception { - return startActorSystem( - configuration, - listeningAddress, - portRangeDefinition, - logger, - ForkJoinExecutorConfiguration.fromConfiguration(configuration)); - } - - /** -* Starts an ActorSystem with the given configuration listening at the address/ports. -* -* @param configuration The Flink configuration -* @param listeningAddress The address to listen at. -* @param portRangeDefinition The port range to choose a port from. -* @param logger The logger to output log information. 
-* @param actorSystemExecutorConfiguration configuration for the ActorSystem's underlying executor -* @return The ActorSystem which has been started -* @throws Exception Thrown when actor system cannot be started in specified port range -*/ - public static ActorSystem startActorSystem( - Configuration configuration, - String listeningAddress, - String portRangeDefinition, - Logger logger, - @Nonnull ActorSystemExecutorConfiguration actorSystemExecutorConfiguration) throws Exception { - return startActorSystem( + return startRemoteActorSystem( configuration, AkkaUtils.getFlinkActorSystemName(), - listeningAddress, - portRangeDefinition, + externalAddress, + externalPortRange, + NetUtils.getWildcardIPAddress(), + -1, logger, - actorSystemExecutorConfiguration); + ForkJoinExecutorConfiguration.fromConfiguration(configuration), + null); } /** -* Starts an ActorSystem with the given configuration listening at the address/ports. +* Starts a remote ActorSystem at given address and specific port range. * * @param configuration The Flink configuration * @param actorSystemName Name of the started {@link ActorSystem} -* @param listeningAddress The address to listen at. -* @param portRangeDefinition The port range to choose a port from. +* @param externalAddress The external address to access the ActorSystem. +* @param externalPortRange The choosing range of the external port to access the ActorSystem. +* @param bindAddress The local address to bind to. +* @param bindPort The local port to bind to. If negative, external port will be used. * @param logger The logger to output log information. * @param actorSystemExecutorConfiguration configuration for the ActorSystem's underlying executor +* @param customConfig Custom Akka config to be combined with the config derived from Flink configuration. 
* @return The ActorSystem which has been started * @throws Exception Thrown when actor system cannot be started in specified port range */ - public static ActorSystem startActorSystem( + public static ActorSystem startRemoteActorSystem( Configuration configuration, String
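The renamed `startRemoteActorSystem` javadoc above separates the externally advertised address/port range from the local bind address/port, and states that a negative `bindPort` falls back to the external port. A minimal sketch of that fallback contract, using a hypothetical helper name (not Flink's actual implementation):

```python
def resolve_bind_port(external_port: int, bind_port: int) -> int:
    """Pick the local port to bind to.

    Mirrors the javadoc contract quoted above: a negative bind_port means
    "reuse the external port" (the non-NAT case), otherwise bind the given
    local port while advertising external_port to peers.
    """
    return external_port if bind_port < 0 else bind_port

# Behind NAT: advertise 6123 externally, bind 16123 locally.
assert resolve_bind_port(6123, 16123) == 16123
# No NAT: -1 means bind the same port that is advertised.
assert resolve_bind_port(6123, -1) == 6123
```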
[GitHub] [flink] flinkbot edited a comment on issue #11415: [FLINK-15667][k8s] Support to mount custom Hadoop configurations
flinkbot edited a comment on issue #11415: [FLINK-15667][k8s] Support to mount custom Hadoop configurations URL: https://github.com/apache/flink/pull/11415#issuecomment-599458187 ## CI report: * abc66a0d2605b380ed890dd23e4ff19c9a65ed6a Travis: [FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/154195224) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=6430) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] liuzhixing1006 commented on issue #11391: [FLINK-16098] [chinese-translation, Documentation] Translate "Overview" page of "Hive Integration" into Chinese
liuzhixing1006 commented on issue #11391: [FLINK-16098] [chinese-translation, Documentation] Translate "Overview" page of "Hive Integration" into Chinese URL: https://github.com/apache/flink/pull/11391#issuecomment-601505120 Thank you @JingsongLi for bringing this problem to my attention. There was something wrong with this branch, so I created a [new PR](https://github.com/apache/flink/pull/11455). I'm sorry to have caused so much trouble.
[GitHub] [flink] flinkbot edited a comment on issue #11456: [FLINK-16602][k8s] Rework the internal/external Service
flinkbot edited a comment on issue #11456: [FLINK-16602][k8s] Rework the internal/external Service URL: https://github.com/apache/flink/pull/11456#issuecomment-601501414 ## CI report: * e4c3073a6fb379bacb4da0e299245a73ea2a97d4 Travis: [PENDING](https://travis-ci.com/github/flink-ci/flink/builds/154194249) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=6429)
[GitHub] [flink] flinkbot edited a comment on issue #11415: [FLINK-15667][k8s] Support to mount custom Hadoop configurations
flinkbot edited a comment on issue #11415: [FLINK-15667][k8s] Support to mount custom Hadoop configurations URL: https://github.com/apache/flink/pull/11415#issuecomment-599458187 ## CI report: * 64eaf715bef2bfc9492ef197ecec62d3e067d18d Travis: [PENDING](https://travis-ci.com/github/flink-ci/flink/builds/154064206) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=6399) * abc66a0d2605b380ed890dd23e4ff19c9a65ed6a UNKNOWN
[GitHub] [flink] jinglining commented on a change in pull request #11250: [FLINK-16302][rest]add log list and read log by name for taskmanager
jinglining commented on a change in pull request #11250: [FLINK-16302][rest]add log list and read log by name for taskmanager URL: https://github.com/apache/flink/pull/11250#discussion_r395418363 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerLogsHandlerTest.java ## @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest.handler.taskmanager; + +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.resourcemanager.exceptions.UnknownTaskExecutorException; +import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.HandlerRequestException; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.taskmanager.LogInfo; +import org.apache.flink.runtime.rest.messages.taskmanager.LogsInfo; +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerIdPathParameter; +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerLogsHeaders; +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerMessageParameters; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +import static org.junit.Assert.assertEquals; + +/** + * Test for the {@link TaskManagerLogsHandler}. 
+ */ +public class TaskManagerLogsHandlerTest extends TestLogger { + + private static final ResourceID EXPECTED_TASK_MANAGER_ID = ResourceID.generate(); + + @Test + public void testGetTaskManagerLogsList() throws Exception { + final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway(); + final TaskManagerLogsHandler taskManagerLogsHandler = new TaskManagerLogsHandler( + () -> CompletableFuture.completedFuture(null), + TestingUtils.TIMEOUT(), + Collections.emptyMap(), + TaskManagerLogsHeaders.getInstance(), + () -> CompletableFuture.completedFuture(resourceManagerGateway)); + final HandlerRequest handlerRequest = createRequest(EXPECTED_TASK_MANAGER_ID); + List logsList = new ArrayList<>(); + logsList.add(new LogInfo("taskmanager.log", 1024L)); + logsList.add(new LogInfo("taskmanager.out", 1024L)); + logsList.add(new LogInfo("taskmanager-2.out", 1024L)); + resourceManagerGateway.setRequestTaskManagerLogListFunction(EXPECTED_TASK_MANAGER_ID -> CompletableFuture.completedFuture(logsList)); + LogsInfo logsInfo = taskManagerLogsHandler.handleRequest(handlerRequest, resourceManagerGateway).get(); + assertEquals(logsInfo.getLogInfos().size(), resourceManagerGateway.requestTaskManagerLogList(EXPECTED_TASK_MANAGER_ID, TestingUtils.TIMEOUT()).get().size()); + } + + @Test + public void testGetTaskManagerLogsListForUnknownTaskExecutorException() throws Exception { + final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway(); + final TaskManagerLogsHandler taskManagerLogsHandler = new TaskManagerLogsHandler( + () -> CompletableFuture.completedFuture(null), + TestingUtils.TIMEOUT(), + Collections.emptyMap(), + TaskManagerLogsHeaders.getInstance(), + () -> CompletableFuture.completedFuture(resourceManagerGateway)); + final HandlerRequest handlerRequest = createRequest(EXPECTED_TASK_MANAGER_ID); +
[jira] [Commented] (FLINK-16482) Flink Job throw CloseException when call the FlinkKafkaConsumer cancel function
[ https://issues.apache.org/jira/browse/FLINK-16482?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17063042#comment-17063042 ] likang commented on FLINK-16482: Thank you very much > Flink Job throw CloseException when call the FlinkKafkaConsumer cancel > function > --- > > Key: FLINK-16482 > URL: https://issues.apache.org/jira/browse/FLINK-16482 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Reporter: likang >Priority: Critical > Labels: pull-request-available > Attachments: The bug and solution.docx > > Time Spent: 10m > Remaining Estimate: 0h > > *Background:* > Today I tried to detect my Flink job with a timing thread, and if > the job did not read the data for a long time, it automatically exited. But > when I detect the read timeout and call the cancel function of > FlinkKafkaConsumer, I find that a CloseException is thrown, and then Flink's > recovery mechanism considers that it exited abnormally and re-puller the task. > *Bug*: > I checked the Cancel code of the FlinkKafkaConsumer code, and found > that in fact, the Cancel of KafkaFetcher was first called, then the Close () > of Handover was called, and then the shutdown () of KafkaConsumerThread was > called. Finally, the KafkaConsumerThread thread exited the while loop and > called once after detecting the running identifier. Handover's Close (). > There will be several problems here: 1. CloseException will be thrown > when Handover is called in Cancel of KafkaFetcher, here need to remove the > call of handover.close () 2. The thread in KafkaConsumerThread exits because > of running = false After the loop, you need to determine whether to exit > normally. You should not call handover.close () for normal exit, otherwise > you will also throw a CloseException. > Final the details and solutions are in the attachment -- This message was sent by Atlassian Jira (v8.3.4#803005)
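The fix the reporter describes amounts to making the handover's close path idempotent, so that the duplicate close() call (once from KafkaFetcher's cancel, once more from KafkaConsumerThread's exit path) no longer throws a CloseException. A toy sketch of that pattern; the class and return values are illustrative stand-ins, not Flink's real Handover API:

```python
import threading

class Handover:
    """Toy stand-in for the handover object described in the report.

    The point: close() tracks whether it already ran, so a second call
    from another shutdown path is a harmless no-op instead of raising.
    """

    def __init__(self):
        self._lock = threading.Lock()
        self._closed = False

    def close(self):
        with self._lock:
            if self._closed:
                return False  # duplicate close: quietly ignore
            self._closed = True
            # here the real code would wake any producer/consumer
            # blocked on the handover
            return True

h = Handover()
assert h.close() is True    # first close performs the shutdown
assert h.close() is False   # second close is a no-op, no exception
```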
[GitHub] [flink] flinkbot commented on issue #11456: [FLINK-16602][k8s] Rework the internal/external Service
flinkbot commented on issue #11456: [FLINK-16602][k8s] Rework the internal/external Service URL: https://github.com/apache/flink/pull/11456#issuecomment-601501414 ## CI report: * e4c3073a6fb379bacb4da0e299245a73ea2a97d4 UNKNOWN
[GitHub] [flink] flinkbot edited a comment on issue #11421: [FLINK-15962][network] Reduce the default chunk size to 4M in netty stack
flinkbot edited a comment on issue #11421: [FLINK-15962][network] Reduce the default chunk size to 4M in netty stack URL: https://github.com/apache/flink/pull/11421#issuecomment-599826019 ## CI report: * 394192b70d312009146b91bda454b5297ad3b036 Travis: [SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/154189440) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=6427)
[GitHub] [flink] zhengcanbin commented on issue #11415: [FLINK-15667][k8s] Support to mount custom Hadoop configurations
zhengcanbin commented on issue #11415: [FLINK-15667][k8s] Support to mount custom Hadoop configurations URL: https://github.com/apache/flink/pull/11415#issuecomment-601500881 @flinkbot run travis
[GitHub] [flink] zhengcanbin commented on issue #11415: [FLINK-15667][k8s] Support to mount custom Hadoop configurations
zhengcanbin commented on issue #11415: [FLINK-15667][k8s] Support to mount custom Hadoop configurations URL: https://github.com/apache/flink/pull/11415#issuecomment-601500915 @flinkbot run azure
[GitHub] [flink] flinkbot commented on issue #11456: [FLINK-16602][k8s] Rework the internal/external Service
flinkbot commented on issue #11456: [FLINK-16602][k8s] Rework the internal/external Service URL: https://github.com/apache/flink/pull/11456#issuecomment-601500094 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Automated Checks Last check on commit e4c3073a6fb379bacb4da0e299245a73ea2a97d4 (Fri Mar 20 02:29:50 UTC 2020) **Warnings:** * No documentation files were touched! Remember to keep the Flink docs up to date! Mention the bot in a comment to re-run the automated checks. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[jira] [Updated] (FLINK-16602) Rework the Service design for Kubernetes deployment
[ https://issues.apache.org/jira/browse/FLINK-16602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-16602: --- Labels: pull-request-available (was: ) > Rework the Service design for Kubernetes deployment > --- > > Key: FLINK-16602 > URL: https://issues.apache.org/jira/browse/FLINK-16602 > Project: Flink > Issue Type: Improvement > Components: Deployment / Kubernetes > Affects Versions: 1.10.0 > Reporter: Canbin Zheng > Assignee: Canbin Zheng > Priority: Major > Labels: pull-request-available > Fix For: 1.11.0 > > > At the moment we usually create two Services for a Flink application: one is the internal Service and the other is the so-called rest Service. The former aims at forwarding requests from the TMs to the JM, while the rest Service mainly serves as an external service for the Flink application. Here is a summary of the issues: > # The functionality boundary of the two Services is not clear enough, since the internal Service could also become the rest Service when its exposed type is ClusterIP. > # For the high availability scenario, we create a useless internal Service which does not help forward the internal requests, since the TMs directly communicate with the JM via the IP or hostname of the JM Pod. > # A headless Service is enough to help forward the internal requests from the TMs to the JM. A Service of ClusterIP type would add corresponding rules into the iptables; too many rules in the iptables would lower the kube-proxy's efficiency in refreshing iptables when notified of change events, which could possibly cause severe stability problems in a Kubernetes cluster. > > Therefore, we propose some improvements to the current design: > # Clarify the functionality boundary for the two Services: the internal Service only serves the internal communication from TMs to JM, while the rest Service makes the Flink cluster accessible from outside. The internal Service only exposes the RPC and BLOB ports while the external one exposes the REST port. > # Do not create the internal Service in the high availability case. > # Use HEADLESS type for the internal Service.
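For illustration, the proposed internal Service roughly corresponds to a manifest like the following, shown here as a Python dict. The metadata name, labels, and port names are assumptions for the sketch; 6123/6124/8081 are Flink's default RPC/BLOB/REST ports:

```python
# Sketch of the proposed internal Service: headless (clusterIP: None),
# exposing only the JobManager RPC and BLOB ports. Being headless, it adds
# no iptables rules; DNS resolves the Service name directly to the JM pod.
internal_service = {
    "apiVersion": "v1",
    "kind": "Service",
    "metadata": {"name": "my-flink-cluster"},          # assumed cluster name
    "spec": {
        "clusterIP": "None",                           # headless type
        "selector": {"app": "my-flink-cluster", "component": "jobmanager"},
        "ports": [
            {"name": "jobmanager-rpc", "port": 6123},  # RPC
            {"name": "blobserver", "port": 6124},      # BLOB
        ],
    },
}

# The REST port (8081) belongs only to the separate rest/external Service.
assert all(p["port"] != 8081 for p in internal_service["spec"]["ports"])
```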
[jira] [Updated] (FLINK-16687) PyFlink Cannot determine simple type name "PythonScalarFunction$0"
[ https://issues.apache.org/jira/browse/FLINK-16687?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dian Fu updated FLINK-16687: Priority: Major (was: Blocker) > PyFlink Cannot determine simple type name "PythonScalarFunction$0" > -- > > Key: FLINK-16687 > URL: https://issues.apache.org/jira/browse/FLINK-16687 > Project: Flink > Issue Type: Bug > Components: API / Python >Affects Versions: 1.10.0 >Reporter: mayne wong >Priority: Major > > > I try to run PyFlink UDF with SQL UNNEST, execution of job failed, I defined > a source from element, and use UDF split the string to list. > raise org.codehaus.commons.compiler.CompileException: Cannot determine simple > type name "PythonScalarFunction$0" > {code:python} > import os > from pyflink.table.udf import udf > from pyflink.datastream import StreamExecutionEnvironment > from pyflink.table import StreamTableEnvironment, DataTypes, CsvTableSink > @udf(input_types=[DataTypes.STRING()], > result_type=DataTypes.ARRAY(DataTypes.STRING())) > def format_string_to_array(item): > return item.replace('[', '').replace(']', '').replace(', ', > ',').split(',') > if __name__ == '__main__': > env = StreamExecutionEnvironment.get_execution_environment() > env.set_parallelism(1) > st_env = StreamTableEnvironment.create(env) > result_file = "result.csv" > if os.path.exists(result_file): > os.remove(result_file) > st_env.register_table_sink("result_tab", CsvTableSink(["id", "url"], > [DataTypes.STRING(), DataTypes.STRING()], result_file)) > st_env.register_function("format_string_to_array", format_string_to_array) > tab = st_env.from_elements([("1", "['www.bing.com', 'www.google.com']"), > ("2", "['www.taobao.com']")], ['id', 'urls']) > st_env.register_table("temp_table", tab) > st_env.sql_query("Select id, A.url from temp_table, > UNNEST(format_string_to_array(temp_table.urls)) AS > A(url)").insert_into("result_tab") > st_env.execute("udf") > {code} > > When I execute the program, I get the following exception: > > 
{code:java} > py4j.protocol.Py4JJavaError: An error occurred while calling o2.execute. > : java.util.concurrent.ExecutionException: > org.apache.flink.client.program.ProgramInvocationException: Job failed > (JobID: 5d63838ad2043bf4a5d0bca83623959d) > at > java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) > at > java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1640) > at > org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:74) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620) > at > org.apache.flink.table.executor.StreamExecutor.execute(StreamExecutor.java:50) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:643) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) > at > org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) > at > org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282) > at > org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) > at > org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79) > at > org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238) > at java.lang.Thread.run(Thread.java:748) > Caused by: org.apache.flink.client.program.ProgramInvocationException: Job > failed (JobID: 
5d63838ad2043bf4a5d0bca83623959d) > at > org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112) > at > java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616) > at > java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591) > at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) > at > java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) > at >
[GitHub] [flink] zhengcanbin opened a new pull request #11456: [FLINK-16602][k8s] Rework the internal/external Service
zhengcanbin opened a new pull request #11456: [FLINK-16602][k8s] Rework the internal/external Service URL: https://github.com/apache/flink/pull/11456 ## What is the purpose of the change At the moment we usually create two Services for a Flink application, one is the internal Service and the other is the so-called rest Service, the previous aims for forwarding request from the TMs to the JM, and the rest Service mainly serves as an external service for the Flink application. Here is a summary of the issues: 1. The functionality boundary of the two Services is not clear enough since the internal Service could also become the rest Service when its exposed type is ClusterIP. 2. For the high availability scenario, we create a useless internal Service which does not help forward the internal requests since the TMs directly communicate with the JM via the IP or hostname of the JM Pod. 3. Headless service is enough to help forward the internal requests from the TMs to the JM. Service of ClusterIP type would add corresponding rules into the iptables, too many rules in the iptables would lower the kube-proxy's efficiency in refreshing iptables while notified of change events, which could possibly cause severe stability problems in a Kubernetes cluster. Therefore, we propose some improvements to the current design: 1. Clarify the functionality boundary for the two Services, the internal Service only serves the internal communication from TMs to JM, while the rest Service makes the Flink cluster accessible from outside. The internal Service only exposes the RPC and BLOB ports while the external one exposes the REST port. 2. Do not create the internal Service in the high availability case. 3. Use HEADLESS type for the internal Service. ## Verifying this change This change added unit tests and can be verified on a real K8s cluster as follows: 1. Deploy a non-HA session cluster and check there are two dedicated Services created, the internal one has NONE CLUSTER-IP. 2. 
Deploy an HA session cluster and check only the rest Service is created. 3. Check that the internal Service only exposes the RPC and BLOB port while the external one exposes the REST port. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**)
[GitHub] [flink] JingsongLi commented on a change in pull request #11336: [FLINK-16471][jdbc] develop PostgresCatalog
JingsongLi commented on a change in pull request #11336: [FLINK-16471][jdbc] develop PostgresCatalog URL: https://github.com/apache/flink/pull/11336#discussion_r395413626 ## File path: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/JDBCCatalog.java ## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.io.jdbc.catalog; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialect; +import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialects; +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +/** + * + */ +@PublicEvolving +public class JDBCCatalog extends AbstractJDBCCatalog { + + private static final Logger LOG = LoggerFactory.getLogger(JDBCCatalog.class); + + private final Catalog internal; + + public JDBCCatalog(String catalogName, String defaultDatabase, String username, String pwd, String baseUrl) { + super(catalogName, defaultDatabase, username, pwd, baseUrl); + + JDBCDialect dialect = JDBCDialects.get(baseUrl).get(); + + if (dialect instanceof JDBCDialects.PostgresDialect) { + internal = new PostgresCatalog(catalogName, defaultDatabase, username, pwd, baseUrl); Review comment: I know this implementation is in the FLIP, but this if/else still looks weird to me. Can we add this to JDBCDialect instead, e.g. a `createCatalog(...)` method on JDBCDialect, so that JDBCCatalog does not need to be updated every time?
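The reviewer's suggestion is essentially a factory method on the dialect, so JDBCCatalog no longer branches on dialect types with instanceof/if-else. A rough Python sketch of that shape; the class and method names mirror the discussion but are illustrative, not the actual Flink API:

```python
# Sketch of the suggested design: each dialect knows how to create its own
# catalog, so the generic JDBCCatalog just delegates and never needs
# per-dialect if/else branches when a new dialect is added.

class JDBCDialect:
    def create_catalog(self, name, default_db, user, pwd, base_url):
        raise NotImplementedError

class PostgresCatalog:
    def __init__(self, name, default_db, user, pwd, base_url):
        self.name = name
        self.base_url = base_url

class PostgresDialect(JDBCDialect):
    def create_catalog(self, name, default_db, user, pwd, base_url):
        return PostgresCatalog(name, default_db, user, pwd, base_url)

# The generic catalog only asks the dialect -- no instanceof checks:
dialect = PostgresDialect()
catalog = dialect.create_catalog("pg", "postgres", "u", "p", "jdbc:postgresql://host/")
assert isinstance(catalog, PostgresCatalog)
```

Adding a new dialect then only touches the dialect class itself, which is the maintainability point the review comment makes.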
[GitHub] [flink] flinkbot edited a comment on issue #11421: [FLINK-15962][network] Reduce the default chunk size to 4M in netty stack
flinkbot edited a comment on issue #11421: [FLINK-15962][network] Reduce the default chunk size to 4M in netty stack URL: https://github.com/apache/flink/pull/11421#issuecomment-599826019 ## CI report: * 394192b70d312009146b91bda454b5297ad3b036 Travis: [PENDING](https://travis-ci.com/github/flink-ci/flink/builds/154189440) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=6427)
[GitHub] [flink] JingsongLi commented on a change in pull request #11336: [FLINK-16471][jdbc] develop PostgresCatalog
JingsongLi commented on a change in pull request #11336: [FLINK-16471][jdbc] develop PostgresCatalog URL: https://github.com/apache/flink/pull/11336#discussion_r395412510 ## File path: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/JDBCCatalog.java ## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.io.jdbc.catalog; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialect; +import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialects; +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +/** + * Review comment: Comment?
[jira] [Created] (FLINK-16687) PyFlink Cannot determine simple type name "PythonScalarFunction$0"
mayne wong created FLINK-16687: -- Summary: PyFlink Cannot determine simple type name "PythonScalarFunction$0" Key: FLINK-16687 URL: https://issues.apache.org/jira/browse/FLINK-16687 Project: Flink Issue Type: Bug Components: API / Python Affects Versions: 1.10.0 Reporter: mayne wong
I tried to run a PyFlink UDF with SQL UNNEST and the job failed. I defined a source from elements and used a UDF to split a string into a list. Execution raises org.codehaus.commons.compiler.CompileException: Cannot determine simple type name "PythonScalarFunction$0".
{code:python}
import os

from pyflink.table.udf import udf
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes, CsvTableSink


@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.ARRAY(DataTypes.STRING()))
def format_string_to_array(item):
    return item.replace('[', '').replace(']', '').replace(', ', ',').split(',')


if __name__ == '__main__':
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    st_env = StreamTableEnvironment.create(env)
    result_file = "result.csv"
    if os.path.exists(result_file):
        os.remove(result_file)
    st_env.register_table_sink("result_tab",
                               CsvTableSink(["id", "url"],
                                            [DataTypes.STRING(), DataTypes.STRING()],
                                            result_file))
    st_env.register_function("format_string_to_array", format_string_to_array)
    tab = st_env.from_elements([("1", "['www.bing.com', 'www.google.com']"),
                                ("2", "['www.taobao.com']")],
                               ['id', 'urls'])
    st_env.register_table("temp_table", tab)
    st_env.sql_query("Select id, A.url from temp_table, UNNEST(format_string_to_array(temp_table.urls)) AS A(url)").insert_into("result_tab")
    st_env.execute("udf")
{code}
When I execute the program, I get the following exception:
{code:java}
py4j.protocol.Py4JJavaError: An error occurred while calling o2.execute.
: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 5d63838ad2043bf4a5d0bca83623959d)
	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1640)
	at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:74)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
	at org.apache.flink.table.executor.StreamExecutor.execute(StreamExecutor.java:50)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:643)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
	at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
	at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 5d63838ad2043bf4a5d0bca83623959d)
	at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112)
	at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
	at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
	at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:874)
	at akka.dispatch.OnComplete.internal(Future.scala:264)
	at akka.dispatch.OnComplete.internal(Future.scala:261)
	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
	at
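Independent of the CompileException above, the UDF's parsing logic can be exercised standalone without a Flink cluster; this is simply the function body from the report, lifted out of the @udf decorator. Note that the single quotes around each URL survive the parsing:

```python
# The parsing logic from format_string_to_array in the report above, lifted
# out of the @udf decorator so it can be run without PyFlink.
def format_string_to_array(item):
    return item.replace('[', '').replace(']', '').replace(', ', ',').split(',')

# The surrounding single quotes are not stripped by this logic:
print(format_string_to_array("['www.bing.com', 'www.google.com']"))
# -> ["'www.bing.com'", "'www.google.com'"]
```

Whether the retained quotes matter depends on how the unnested URLs are consumed downstream; it is worth noting when comparing expected against actual sink output.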
[GitHub] [flink] JingsongLi commented on a change in pull request #11336: [FLINK-16471][jdbc] develop PostgresCatalog
JingsongLi commented on a change in pull request #11336: [FLINK-16471][jdbc] develop PostgresCatalog URL: https://github.com/apache/flink/pull/11336#discussion_r395412120 ## File path: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/AbstractJDBCCatalog.java ## @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.io.jdbc.catalog; + +import org.apache.flink.api.java.io.jdbc.JDBCTableSourceSinkFactory; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.AbstractCatalog; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.CatalogFunction; +import org.apache.flink.table.catalog.CatalogPartition; +import org.apache.flink.table.catalog.CatalogPartitionSpec; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.FunctionNotExistException; +import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException; +import org.apache.flink.table.catalog.exceptions.PartitionNotExistException; +import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException; +import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException; +import org.apache.flink.table.catalog.exceptions.TablePartitionedException; +import org.apache.flink.table.catalog.stats.CatalogColumnStatistics; +import org.apache.flink.table.catalog.stats.CatalogTableStatistics; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.factories.TableFactory; +import org.apache.flink.util.StringUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import 
java.sql.DriverManager; +import java.sql.SQLException; +import java.util.Collections; +import java.util.List; +import java.util.Optional; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * Abstract catalog for any JDBC catalogs. + */ +public abstract class AbstractJDBCCatalog extends AbstractCatalog { + + private static final Logger LOG = LoggerFactory.getLogger(AbstractJDBCCatalog.class); + + protected final String username; + protected final String pwd; + protected final String baseUrl; + protected final String defaultUrl; + + public AbstractJDBCCatalog(String catalogName, String defaultDatabase, String username, String pwd, String baseUrl) { + super(catalogName, defaultDatabase); + + checkArgument(!StringUtils.isNullOrWhitespaceOnly(username)); + checkArgument(!StringUtils.isNullOrWhitespaceOnly(pwd)); + checkArgument(!StringUtils.isNullOrWhitespaceOnly(baseUrl)); + + JDBCCatalogUtils.validateJDBCUrl(baseUrl); + + this.username = username; + this.pwd = pwd; + this.baseUrl = baseUrl.endsWith("/") ? baseUrl : baseUrl + "/"; + this.defaultUrl = baseUrl + defaultDatabase; + } + + @Override + public void open() throws CatalogException { + // test connection, fail early if we cannot connect to database + try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) { + } catch (SQLException e) { + throw new ValidationException( + String.format("Failed connecting to %s via JDBC.", defaultUrl), e); + } + + LOG.info("Catalog {} established connection to {}", getName(), defaultUrl);
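The constructor in the diff above normalizes baseUrl to end with "/" before building defaultUrl; trailing-slash handling like this is easy to get subtly wrong, so here is the same rule sketched standalone (Python for brevity, since the diff itself is Java; `build_urls` is an illustrative name, not part of the patch):

```python
def build_urls(base_url, default_database):
    # Mirror of the normalization in AbstractJDBCCatalog's constructor:
    # guarantee exactly one trailing '/' before appending the database name.
    normalized = base_url if base_url.endswith("/") else base_url + "/"
    return normalized, normalized + default_database

print(build_urls("jdbc:postgresql://localhost:5432", "postgres"))
# -> ('jdbc:postgresql://localhost:5432/', 'jdbc:postgresql://localhost:5432/postgres')
```

Note that this sketch appends the database name to the normalized URL; the diff's defaultUrl assignment appends to the raw baseUrl parameter, which may be worth double-checking in review for the case where baseUrl lacks the trailing slash.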
[GitHub] [flink] JingsongLi commented on issue #11391: [FLINK-16098] [chinese-translation, Documentation] Translate "Overview" page of "Hive Integration" into Chinese
JingsongLi commented on issue #11391: [FLINK-16098] [chinese-translation, Documentation] Translate "Overview" page of "Hive Integration" into Chinese URL: https://github.com/apache/flink/pull/11391#issuecomment-601496994 Hi @liuzhixing1006, you cannot use merge; you should use rebase only. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink-statefun] sjwiesman commented on issue #62: [FLINK-16685] Add a k8s example for the Python SDK
sjwiesman commented on issue #62: [FLINK-16685] Add a k8s example for the Python SDK URL: https://github.com/apache/flink-statefun/pull/62#issuecomment-601494204 Maybe off topic, but should we add k8s deployment docs? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-16557) Document YAML-ized Kafka egresses / ingresses in Stateful Functions documentation
[ https://issues.apache.org/jira/browse/FLINK-16557?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-16557: --- Labels: pull-request-available (was: ) > Document YAML-ized Kafka egresses / ingresses in Stateful Functions > documentation > - > > Key: FLINK-16557 > URL: https://issues.apache.org/jira/browse/FLINK-16557 > Project: Flink > Issue Type: Task > Components: Stateful Functions >Affects Versions: statefun-1.1 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Seth Wiesman >Priority: Major > Labels: pull-request-available > > The Stateful Functions documentation is still missing information about > YAML-ized egresses / ingresses. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink-statefun] sjwiesman opened a new pull request #63: [FLINK-16557][docs] Document YAML-ized Kafka egresses / ingresses in …
sjwiesman opened a new pull request #63: [FLINK-16557][docs] Document YAML-ized Kafka egresses / ingresses in … URL: https://github.com/apache/flink-statefun/pull/63 …Stateful Functions documentation This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Closed] (FLINK-16650) Support LocalZonedTimestampType for Python UDF in blink planner
[ https://issues.apache.org/jira/browse/FLINK-16650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hequn Cheng closed FLINK-16650. --- Resolution: Resolved > Support LocalZonedTimestampType for Python UDF in blink planner > --- > > Key: FLINK-16650 > URL: https://issues.apache.org/jira/browse/FLINK-16650 > Project: Flink > Issue Type: Improvement > Components: API / Python >Reporter: Dian Fu >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > Fix For: 1.11.0 > > Time Spent: 20m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-16650) Support LocalZonedTimestampType for Python UDF in blink planner
[ https://issues.apache.org/jira/browse/FLINK-16650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17063034#comment-17063034 ] Hequn Cheng commented on FLINK-16650: - Resolved in 1.11.0 via 5ccb16724769becab0003e0299d9c4a63cd52378 > Support LocalZonedTimestampType for Python UDF in blink planner > --- > > Key: FLINK-16650 > URL: https://issues.apache.org/jira/browse/FLINK-16650 > Project: Flink > Issue Type: Improvement > Components: API / Python >Reporter: Dian Fu >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > Fix For: 1.11.0 > > Time Spent: 20m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] hequn8128 merged pull request #11439: [FLINK-16650][python] Support LocalZonedTimestampType for Python UDF in blink planner
hequn8128 merged pull request #11439: [FLINK-16650][python] Support LocalZonedTimestampType for Python UDF in blink planner URL: https://github.com/apache/flink/pull/11439 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Comment Edited] (FLINK-16637) Flink per job mode terminates before serving job cancellation result
[ https://issues.apache.org/jira/browse/FLINK-16637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17063025#comment-17063025 ] Zili Chen edited comment on FLINK-16637 at 3/20/20, 1:31 AM: - [~gjy] after the "cancel" command is received by the {{JobMaster}}, it causes:
1. the job to be asynchronously cancelled, which later causes the cluster shutdown;
2. the response to be asynchronously sent.
I don't find an explicit synchronization between 1 and 2, hence the statement. But 1 generally takes some time before it causes the cluster shutdown, so 2 can nearly always happen before the cluster shutdown. Possibly we should flush outstanding responses in the REST server before it gets closed.
was (Author: tison): [~gjy] after the "cancel" command is received by the {{JobMaster}}, it causes:
1. the job to be asynchronously cancelled;
2. the response to be asynchronously sent.
I don't find an explicit synchronization between 1 and 2, hence the statement. But 1 generally takes some time before it causes the cluster shutdown, so 2 can nearly always happen before the cluster shutdown.
> Flink per job mode terminates before serving job cancellation result > > > Key: FLINK-16637 > URL: https://issues.apache.org/jira/browse/FLINK-16637 > Project: Flink > Issue Type: Bug > Components: Deployment / YARN >Affects Versions: 1.10.0 >Reporter: Zili Chen >Priority: Major > Fix For: 1.10.1, 1.11.0 > > Attachments: yarn.log > > > The {{MiniDispatcher}} no longer waits until the REST handler has served the > cancellation result before shutting down. This behaviour seems to be > introduced with FLINK-15116. > See also > [https://lists.apache.org/x/thread.html/rcadbd6ceede422bac8d4483fd0cdae58659fbff78533a399eb136743@%3Cdev.flink.apache.org%3E] -- This message was sent by Atlassian Jira (v8.3.4#803005)
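The fix Zili suggests, flushing the outstanding response before the server closes, amounts to making the shutdown path explicitly wait on the response future. A minimal sketch of that ordering (illustrative names only, not Flink's actual classes):

```python
import threading
from concurrent.futures import Future

# Illustrative sketch only -- not Flink's actual classes. Two async effects:
# (1) cancel the job, which later triggers cluster shutdown, and (2) send the
# REST response. Without an explicit dependency, (2) winning the race is only
# a timing accident; here shutdown waits on the response future explicitly.
events = []
response_sent = Future()

def send_response():
    events.append("response served")
    response_sent.set_result(None)

def shutdown_cluster():
    # The suggested fix: flush the outstanding response before closing,
    # i.e. block on the response future rather than relying on timing.
    response_sent.result(timeout=5)
    events.append("cluster shutdown")

t1 = threading.Thread(target=shutdown_cluster)
t2 = threading.Thread(target=send_response)
t1.start(); t2.start()
t1.join(); t2.join()
print(events)  # -> ['response served', 'cluster shutdown'], deterministically
```

With the explicit wait, the ordering holds regardless of how quickly the cancellation path reaches shutdown.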
[GitHub] [flink] flinkbot edited a comment on issue #11421: [FLINK-15962][network] Reduce the default chunk size to 4M in netty stack
flinkbot edited a comment on issue #11421: [FLINK-15962][network] Reduce the default chunk size to 4M in netty stack URL: https://github.com/apache/flink/pull/11421#issuecomment-599826019 ## CI report: * dd9c079d6024bcae4216bdc3004660018e41f938 Travis: [FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/153624584) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=6315) * 394192b70d312009146b91bda454b5297ad3b036 Travis: [PENDING](https://travis-ci.com/github/flink-ci/flink/builds/154189440) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=6427) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-16637) Flink per job mode terminates before serving job cancellation result
[ https://issues.apache.org/jira/browse/FLINK-16637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17063025#comment-17063025 ] Zili Chen commented on FLINK-16637: --- [~gjy] after the "cancel" command is received by the {{JobMaster}}, it causes:
1. the job to be asynchronously cancelled;
2. the response to be asynchronously sent.
I don't find an explicit synchronization between 1 and 2, hence the statement. But 1 generally takes some time before it causes the cluster shutdown, so 2 can nearly always happen before the cluster shutdown.
> Flink per job mode terminates before serving job cancellation result > > > Key: FLINK-16637 > URL: https://issues.apache.org/jira/browse/FLINK-16637 > Project: Flink > Issue Type: Bug > Components: Deployment / YARN >Affects Versions: 1.10.0 >Reporter: Zili Chen >Priority: Major > Fix For: 1.10.1, 1.11.0 > > Attachments: yarn.log > > > The {{MiniDispatcher}} no longer waits until the REST handler has served the > cancellation result before shutting down. This behaviour seems to be > introduced with FLINK-15116. > See also > [https://lists.apache.org/x/thread.html/rcadbd6ceede422bac8d4483fd0cdae58659fbff78533a399eb136743@%3Cdev.flink.apache.org%3E] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] dianfu commented on issue #11439: [FLINK-16650][python] Support LocalZonedTimestampType for Python UDF in blink planner
dianfu commented on issue #11439: [FLINK-16650][python] Support LocalZonedTimestampType for Python UDF in blink planner URL: https://github.com/apache/flink/pull/11439#issuecomment-601487959 The failed azure test is a known issue and has already been tracked in https://issues.apache.org/jira/browse/FLINK-16676 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-15852) Job is submitted to the wrong session cluster
[ https://issues.apache.org/jira/browse/FLINK-15852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17063021#comment-17063021 ] Canbin Zheng commented on FLINK-15852: -- Thanks for the fixup [~kkl0u]! > Job is submitted to the wrong session cluster > - > > Key: FLINK-15852 > URL: https://issues.apache.org/jira/browse/FLINK-15852 > Project: Flink > Issue Type: Bug > Components: Command Line Client >Affects Versions: 1.10.0 >Reporter: Canbin Zheng >Assignee: Kostas Kloudas >Priority: Critical > Labels: pull-request-available > Fix For: 1.10.1, 1.11.0 > > Time Spent: 20m > Remaining Estimate: 0h > > Steps to reproduce the problem: > # Deploy a YARN session cluster by command {{./bin/yarn-session.sh -d}} > # Deploy a Kubernetes session cluster by command > {{./bin/kubernetes-session.sh -Dkubernetes.cluster-id=test ...}} > # Try to submit a Job to the Kubernetes session cluster by command > {{./bin/flink run -d -e kubernetes-session -Dkubernetes.cluster-id=test > examples/streaming/WordCount.jar}} > It's expected that the Job will be submitted to the Kubernetes session > cluster whose cluster-id is *test*, however, the job was submitted to the > YARN session cluster. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] flinkbot edited a comment on issue #11421: [FLINK-15962][network] Reduce the default chunk size to 4M in netty stack
flinkbot edited a comment on issue #11421: [FLINK-15962][network] Reduce the default chunk size to 4M in netty stack URL: https://github.com/apache/flink/pull/11421#issuecomment-599826019 ## CI report: * dd9c079d6024bcae4216bdc3004660018e41f938 Travis: [FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/153624584) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=6315) * 394192b70d312009146b91bda454b5297ad3b036 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] liuzhixing1006 edited a comment on issue #11391: [FLINK-16098] [chinese-translation, Documentation] Translate "Overview" page of "Hive Integration" into Chinese
liuzhixing1006 edited a comment on issue #11391: [FLINK-16098] [chinese-translation, Documentation] Translate "Overview" page of "Hive Integration" into Chinese URL: https://github.com/apache/flink/pull/11391#issuecomment-601200160 @lirui-apache @JingsongLi Thanks for your advice, I have addressed the problems. But I had a problem updating the branch. I ran the following, in order:
1. git add -A
2. git commit -am [FLINK-16098] [chinese-translation, Documentation] Translate "Overview" page of "Hive Integration" into Chinese
3. git fetch upstream master
4. git merge upstream/master
5. git rebase -i master
6. git push origin docFix
Then the pull request shows a lot of unrelated commits. Did I do something wrong? Do I need to close the current pull request and create a new one? Thanks~ This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters
flinkbot edited a comment on issue #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters URL: https://github.com/apache/flink/pull/11195#issuecomment-590065588 ## CI report: * 738a6fd162d5ba99432438d224f3b4890961af56 UNKNOWN * 77f24b292a04fde34f4c645151484cf86a13832c UNKNOWN * 5dfb306ba6ff07f700b1afc21847641321705d4d UNKNOWN * 5b0f219689e81fe11c3511c13c7f4943a997a4ef UNKNOWN * 7f6ad172474e37bbf346a7762868c9114510c4c9 UNKNOWN * 2590a69417bdd2b6bf2f4765b276a051040a97cf UNKNOWN * d07ec693eb90e33071e710be0b774fc995f05867 Travis: [CANCELED](https://travis-ci.com/github/flink-ci/flink/builds/154179607) * a1b15671c97e8809c0301fb4c225fdc1a7f78545 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters URL: https://github.com/apache/flink/pull/11195#discussion_r395376334 ## File path: flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java ## @@ -260,16 +262,26 @@ public void submitSQLJob(SQLJobSubmission job) throws IOException { } public void copyOptJarsToLib(String jarNamePrefix) throws FileNotFoundException, IOException { - final Optional reporterJarOptional; - try (Stream logFiles = Files.walk(opt)) { - reporterJarOptional = logFiles + copyOptJars(jarNamePrefix, lib); + } + + public void copyOptJarsToPlugins(String jarNamePrefix) throws FileNotFoundException, IOException { Review comment: I assume you mean to modify `FlinkDistribution#mapJarLocationToPath` ? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters URL: https://github.com/apache/flink/pull/11195#discussion_r395375550 ## File path: flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java ## @@ -260,16 +262,26 @@ public void submitSQLJob(SQLJobSubmission job) throws IOException { } public void copyOptJarsToLib(String jarNamePrefix) throws FileNotFoundException, IOException { - final Optional reporterJarOptional; - try (Stream logFiles = Files.walk(opt)) { - reporterJarOptional = logFiles + copyOptJars(jarNamePrefix, lib); + } + + public void copyOptJarsToPlugins(String jarNamePrefix) throws FileNotFoundException, IOException { Review comment: > and modify FlinkDistribution#moveJar to handle this location appropriately. could you clarify? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters URL: https://github.com/apache/flink/pull/11195#discussion_r395375423 ## File path: flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java ## @@ -260,16 +262,26 @@ public void submitSQLJob(SQLJobSubmission job) throws IOException { } public void copyOptJarsToLib(String jarNamePrefix) throws FileNotFoundException, IOException { - final Optional reporterJarOptional; - try (Stream logFiles = Files.walk(opt)) { - reporterJarOptional = logFiles + copyOptJars(jarNamePrefix, lib); + } + + public void copyOptJarsToPlugins(String jarNamePrefix) throws FileNotFoundException, IOException { Review comment: But this is not really "copying jars", right? It will actually move the file from opt to lib or plugins. The problem is that one of the cases I would like to test requires the jar to actually be copied to both.
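The move-vs-copy distinction raised in this thread can be sketched with `java.nio.file`. The class and method names below (`JarStaging`, `copyOptJars`) are hypothetical stand-ins, not the actual `FlinkDistribution` code; the sketch shows a true copy that leaves the jar in `opt`, so the same jar can be staged into both `lib/` and `plugins/`:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.stream.Stream;

public class JarStaging {

    // Copies (rather than moves) every file under `opt` whose name starts with
    // `jarNamePrefix` into `target`. Because the source file is left in place,
    // a second call can stage the same jar into another directory as well.
    public static void copyOptJars(Path opt, String jarNamePrefix, Path target) throws IOException {
        try (Stream<Path> files = Files.walk(opt)) {
            files.filter(Files::isRegularFile)
                 .filter(p -> p.getFileName().toString().startsWith(jarNamePrefix))
                 .forEach(jar -> {
                     try {
                         Files.createDirectories(target);
                         Files.copy(jar, target.resolve(jar.getFileName()),
                                    StandardCopyOption.REPLACE_EXISTING);
                     } catch (IOException e) {
                         throw new UncheckedIOException(e);
                     }
                 });
        }
    }
}
```

Calling `copyOptJars(opt, prefix, lib)` and then `copyOptJars(opt, prefix, plugins)` stages the jar into both locations, which is the scenario one of the end-to-end tests needs.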
[GitHub] [flink] flinkbot edited a comment on issue #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters
flinkbot edited a comment on issue #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters URL: https://github.com/apache/flink/pull/11195#issuecomment-590065588 ## CI report: * 738a6fd162d5ba99432438d224f3b4890961af56 UNKNOWN * 77f24b292a04fde34f4c645151484cf86a13832c UNKNOWN * 5dfb306ba6ff07f700b1afc21847641321705d4d UNKNOWN * 5b0f219689e81fe11c3511c13c7f4943a997a4ef UNKNOWN * 7ce9ad87a780aab19b9c97680d7af5f648c13f3b Travis: [FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/154170382) * 7f6ad172474e37bbf346a7762868c9114510c4c9 UNKNOWN * 2590a69417bdd2b6bf2f4765b276a051040a97cf UNKNOWN * d07ec693eb90e33071e710be0b774fc995f05867 Travis: [PENDING](https://travis-ci.com/github/flink-ci/flink/builds/154179607) * a1b15671c97e8809c0301fb4c225fdc1a7f78545 UNKNOWN
[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters URL: https://github.com/apache/flink/pull/11195#discussion_r395368073 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java ## @@ -178,36 +164,96 @@ private static ReporterSetup createReporterSetup(String reporterName, MetricConf metricReporterOptional.ifPresent(reporter -> { MetricConfig metricConfig = new MetricConfig(); reporterConfig.addAllToProperties(metricConfig); - - reporterArguments.add(createReporterSetup(reporterName, metricConfig, reporter)); + reporterSetups.add(createReporterSetup(reporterName, metricConfig, reporter)); }); - } - catch (Throwable t) { + } catch (Throwable t) { LOG.error("Could not instantiate metrics reporter {}. Metrics might not be exposed/reported.", reporterName, t); } } - return reporterArguments; + return reporterSetups; } - private static Map loadReporterFactories() { - final ServiceLoader serviceLoader = ServiceLoader.load(MetricReporterFactory.class); + private static List> loadReporterConfigurations(Configuration configuration, Set namedReporters) { + final List> reporterConfigurations = new ArrayList<>(namedReporters.size()); + + for (String namedReporter: namedReporters) { + DelegatingConfiguration delegatingConfiguration = new DelegatingConfiguration( + configuration, + ConfigConstants.METRICS_REPORTER_PREFIX + namedReporter + '.'); + reporterConfigurations.add(Tuple2.of(namedReporter, delegatingConfiguration)); + } + return reporterConfigurations; + } + + private static Set findEnabledReportersInConfiguration(Configuration configuration, String includedReportersString) { + Set includedReporters = reporterListPattern.splitAsStream(includedReportersString) + .filter(r -> !r.isEmpty()) // splitting an empty string results in an empty string on jdk9+ + .collect(Collectors.toSet()); + + // use a TreeSet to make the reporter order deterministic, which is 
useful for testing + Set namedOrderedReporters = new TreeSet<>(String::compareTo); + + // scan entire configuration for keys starting with METRICS_REPORTER_PREFIX and determine the set of enabled reporters + for (String key : configuration.keySet()) { + if (key.startsWith(ConfigConstants.METRICS_REPORTER_PREFIX)) { + Matcher matcher = reporterClassPattern.matcher(key); + if (matcher.matches()) { + String reporterName = matcher.group(1); + if (includedReporters.isEmpty() || includedReporters.contains(reporterName)) { + if (namedOrderedReporters.contains(reporterName)) { + LOG.warn("Duplicate class configuration detected for reporter {}.", reporterName); + } else { + namedOrderedReporters.add(reporterName); + } + } else { + LOG.info("Excluding reporter {}, not configured in reporter list ({}).", reporterName, includedReportersString); + } + } + } + } + return namedOrderedReporters; + } + + private static Map loadAvailableReporterFactories(PluginManager pluginManager) { final Map reporterFactories = new HashMap<>(2); - final Iterator factoryIterator = serviceLoader.iterator(); + final Iterator factoryIterator = getAllReporterFactories(pluginManager); + LOG.debug("All available factories (from both SPIs and Plugins):"); + getAllReporterFactories(pluginManager).forEachRemaining(i -> LOG.debug(i.toString())); // do not use streams or for-each loops here because they do not allow catching individual ServiceConfigurationErrors // such an error might be caused if the META-INF/services
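The TreeSet-plus-regex scan in the hunk above can be illustrated with a standalone sketch. `EnabledReporters` and its simplified `CLASS_PATTERN` (matching only `metrics.reporter.<name>.class` keys) are hypothetical stand-ins for `ReporterSetup`'s `reporterClassPattern`, reduced to show the two ideas the comments call out: deterministic ordering via `TreeSet`, and honoring an optional include list:

```java
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class EnabledReporters {

    private static final String PREFIX = "metrics.reporter.";

    // Hypothetical stand-in for Flink's reporterClassPattern: captures the
    // reporter name from keys like "metrics.reporter.prom.class".
    private static final Pattern CLASS_PATTERN =
            Pattern.compile(Pattern.quote(PREFIX) + "([^.]+)\\.class");

    // A TreeSet keeps the enabled reporter names in a deterministic
    // (alphabetical) order, which the inline comment notes is useful for tests.
    public static Set<String> find(Set<String> configKeys, Set<String> includedReporters) {
        Set<String> namedOrderedReporters = new TreeSet<>(String::compareTo);
        for (String key : configKeys) {
            Matcher matcher = CLASS_PATTERN.matcher(key);
            if (matcher.matches()) {
                String reporterName = matcher.group(1);
                // an empty include list means "enable every configured reporter"
                if (includedReporters.isEmpty() || includedReporters.contains(reporterName)) {
                    namedOrderedReporters.add(reporterName);
                }
            }
        }
        return namedOrderedReporters;
    }
}
```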
[GitHub] [flink] flinkbot edited a comment on issue #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters
flinkbot edited a comment on issue #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters URL: https://github.com/apache/flink/pull/11195#issuecomment-590065588 ## CI report: * 738a6fd162d5ba99432438d224f3b4890961af56 UNKNOWN * 77f24b292a04fde34f4c645151484cf86a13832c UNKNOWN * 5dfb306ba6ff07f700b1afc21847641321705d4d UNKNOWN * 5b0f219689e81fe11c3511c13c7f4943a997a4ef UNKNOWN * 7ce9ad87a780aab19b9c97680d7af5f648c13f3b Travis: [FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/154170382) * 7f6ad172474e37bbf346a7762868c9114510c4c9 UNKNOWN * 2590a69417bdd2b6bf2f4765b276a051040a97cf UNKNOWN * d07ec693eb90e33071e710be0b774fc995f05867 UNKNOWN
[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters URL: https://github.com/apache/flink/pull/11195#discussion_r395366353 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java ## @@ -160,13 +161,14 @@ public void startCluster() throws ClusterEntrypointException { LOG.info("Starting {}.", getClass().getSimpleName()); try { - - configureFileSystems(configuration); + //TODO: push down filesystem initialization into runCluster - initializeServices (?) Review comment: If it is something non-trivial and hard to make a call about, I would propose to skip this refactoring for now.
[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters URL: https://github.com/apache/flink/pull/11195#discussion_r395365114 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java ## @@ -210,17 +213,31 @@ private static ReporterSetup createReporterSetup(String reporterName, MetricConf return namedOrderedReporters; } - private static Map loadReporterFactories() { - final ServiceLoader serviceLoader = ServiceLoader.load(MetricReporterFactory.class); - + private static Map loadAvailableReporterFactories(PluginManager pluginManager) { final Map reporterFactories = new HashMap<>(2); - final Iterator factoryIterator = serviceLoader.iterator(); + final Iterator factoryIterator = getAllReporterFactories(pluginManager); + LOG.info("Prepare reporter factories (from both SPIs and Plugins):"); // do not use streams or for-each loops here because they do not allow catching individual ServiceConfigurationErrors // such an error might be caused if the META-INF/services contains an entry to a non-existing factory class while (factoryIterator.hasNext()) { try { MetricReporterFactory factory = factoryIterator.next(); - reporterFactories.put(factory.getClass().getName(), factory); + String factoryClassName = factory.getClass().getName(); + MetricReporterFactory existingFactory = reporterFactories.get(factoryClassName); + if (existingFactory == null){ + reporterFactories.put(factoryClassName, factory); + LOG.info("Found reporter factory {} at {} ", + factoryClassName, + new File(factory.getClass().getProtectionDomain().getCodeSource().getLocation().toURI()).getCanonicalPath()); + } else { + //TODO: use path information below, when Plugin Classloader stops always prioritizing factories from /lib +// String jarPath1 = new File(existingFactory.getClass().getProtectionDomain().getCodeSource().getLocation() +// .toURI()).getCanonicalPath(); +// String jarPath2 = new 
File(factory.getClass().getProtectionDomain().getCodeSource().getLocation() +// .toURI()).getCanonicalPath(); +// LOG.warn("Multiple implementations of the same reporter were found: \n {} and \n{}", jarPath1, jarPath2); + LOG.warn("Multiple implementations of the same reporter were found in 'lib' and/or 'plugins' directories for {}. It is recommended to remove redundant reporter JARs to resolve used versions' ambiguity.", factoryClassName); Review comment: I think the problem is that we have too many variants to describe with a single coordinating conjunction like "or" (plus "or" can unfortunately be both inclusive and exclusive). Cases: (xx) _ () () _ (xx) (x) _ (x) (xx) _ (xx) Alternatives like "x or y or both" also do not work, because in this particular case they can easily be interpreted as if the "(x) _ (x)" case is not an issue (because of "Multiple" in the beginning). Seems to me like the best way to indicate this ambiguity is to use "and/or" so that people will be aware of what kinds of problems to look for.
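The first-wins de-duplication being discussed (the same reporter factory class discovered in both `lib/` and `plugins/`) can be sketched as follows. `FactoryDedup` is a hypothetical simplification that uses `Object` in place of Flink's `MetricReporterFactory` and a plain `System.err` line in place of the `LOG.warn`:

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public class FactoryDedup {

    // First-wins de-duplication: the first factory registered under a class
    // name is kept; a later duplicate (e.g. the same reporter jar present in
    // both lib/ and plugins/) only triggers a warning instead of replacing
    // the already-registered factory.
    public static Map<String, Object> dedup(Iterator<? extends Object> factories) {
        Map<String, Object> byClassName = new HashMap<>();
        while (factories.hasNext()) {
            Object factory = factories.next();
            String className = factory.getClass().getName();
            if (byClassName.putIfAbsent(className, factory) != null) {
                System.err.println("Multiple implementations of the same reporter were found for "
                        + className + "; remove the redundant reporter JAR.");
            }
        }
        return byClassName;
    }
}
```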
[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters URL: https://github.com/apache/flink/pull/11195#discussion_r395360427 ## File path: flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/src/test/java/org/apache/flink/metrics/prometheus/tests/PrometheusReporterEndToEndITCase.java ## @@ -120,24 +122,51 @@ public static void checkOS() { public final DownloadCache downloadCache = DownloadCache.get(); @Test - public void testReporter() throws Exception { - dist.copyOptJarsToLib("flink-metrics-prometheus"); + public void reporterWorksWhenFoundInLibsViaReflection() throws Exception { + dist.copyOptJarsToLib(PROMETHEUS_JAR_PREFIX); + testReporter(false); + } + + @Test + public void reporterWorksWhenFoundInPluginsViaReflection() throws Exception { + dist.copyOptJarsToPlugins(PROMETHEUS_JAR_PREFIX); + testReporter(false); + } + + @Test + public void reporterWorksWhenFoundInPluginsViaFactories() throws Exception { + dist.copyOptJarsToPlugins(PROMETHEUS_JAR_PREFIX); + testReporter(true); + } + @Test + public void reporterWorksWhenFoundBothInPluginsAndLibsViaFactories() throws Exception { + dist.copyOptJarsToPlugins(PROMETHEUS_JAR_PREFIX); + dist.copyOptJarsToLib(PROMETHEUS_JAR_PREFIX); + testReporter(true); + } + + private void testReporter(boolean useFactory) throws Exception { final Configuration config = new Configuration(); - config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "prom." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, PrometheusReporter.class.getCanonicalName()); + + if (useFactory) { + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "prom." + ConfigConstants.METRICS_REPORTER_FACTORY_CLASS_SUFFIX, PrometheusReporterFactory.class.getName()); + } else { + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "prom." 
+ ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, PrometheusReporter.class.getCanonicalName()); + } + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "prom.port", "9000-9100"); dist.appendConfiguration(config); final Path tmpPrometheusDir = tmp.newFolder().toPath().resolve("prometheus"); - final Path prometheusArchive = tmpPrometheusDir.resolve(PROMETHEUS_FILE_NAME + ".tar.gz"); final Path prometheusBinDir = tmpPrometheusDir.resolve(PROMETHEUS_FILE_NAME); final Path prometheusConfig = prometheusBinDir.resolve("prometheus.yml"); final Path prometheusBinary = prometheusBinDir.resolve("prometheus"); Files.createDirectory(tmpPrometheusDir); - downloadCache.getOrDownload( - "https://github.com/prometheus/prometheus/releases/download/v" + PROMETHEUS_VERSION + '/' + prometheusArchive.getFileName(), + final Path prometheusArchive = downloadCache.getOrDownload( Review comment: Split as requested.
[GitHub] [flink] flinkbot edited a comment on issue #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters
flinkbot edited a comment on issue #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters URL: https://github.com/apache/flink/pull/11195#issuecomment-590065588 ## CI report: * 738a6fd162d5ba99432438d224f3b4890961af56 UNKNOWN * 77f24b292a04fde34f4c645151484cf86a13832c UNKNOWN * 5dfb306ba6ff07f700b1afc21847641321705d4d UNKNOWN * 5b0f219689e81fe11c3511c13c7f4943a997a4ef UNKNOWN * 7ce9ad87a780aab19b9c97680d7af5f648c13f3b Travis: [FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/154170382) * 7f6ad172474e37bbf346a7762868c9114510c4c9 UNKNOWN * 2590a69417bdd2b6bf2f4765b276a051040a97cf UNKNOWN
[GitHub] [flink] flinkbot edited a comment on issue #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters
flinkbot edited a comment on issue #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters URL: https://github.com/apache/flink/pull/11195#issuecomment-590065588 ## CI report: * 738a6fd162d5ba99432438d224f3b4890961af56 UNKNOWN * 77f24b292a04fde34f4c645151484cf86a13832c UNKNOWN * 5dfb306ba6ff07f700b1afc21847641321705d4d UNKNOWN * 5b0f219689e81fe11c3511c13c7f4943a997a4ef UNKNOWN * 7ce9ad87a780aab19b9c97680d7af5f648c13f3b Travis: [FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/154170382) * 7f6ad172474e37bbf346a7762868c9114510c4c9 UNKNOWN
[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r395348055

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
## @@ -179,28 +164,82 @@ private static ReporterSetup createReporterSetup(String reporterName, MetricConf

```java
			metricReporterOptional.ifPresent(reporter -> {
				MetricConfig metricConfig = new MetricConfig();
				reporterConfig.addAllToProperties(metricConfig);
				reporterSetups.add(createReporterSetup(reporterName, metricConfig, reporter));
			});
		} catch (Throwable t) {
			LOG.error("Could not instantiate metrics reporter {}. Metrics might not be exposed/reported.", reporterName, t);
		}
	}
	return reporterSetups;
}

private static List<Tuple2<String, Configuration>> loadReporterConfigurations(Configuration configuration, Set<String> namedReporters) {
	final List<Tuple2<String, Configuration>> reporterConfigurations = new ArrayList<>(namedReporters.size());
	for (String namedReporter : namedReporters) {
		DelegatingConfiguration delegatingConfiguration = new DelegatingConfiguration(
			configuration,
			ConfigConstants.METRICS_REPORTER_PREFIX + namedReporter + '.');

		reporterConfigurations.add(Tuple2.of(namedReporter, delegatingConfiguration));
	}
	return reporterConfigurations;
}

private static Set<String> findEnabledReportersInConfiguration(Configuration configuration, String includedReportersString) {
	Set<String> includedReporters = reporterListPattern.splitAsStream(includedReportersString)
		.filter(r -> !r.isEmpty()) // splitting an empty string results in an empty string on jdk9+
		.collect(Collectors.toSet());
```

Review comment: @AHeise I have applied this refactoring but then understood that I probably did not get what you actually propose. Could you clarify?
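The jdk9+ remark in the diff above can be reproduced in isolation. Below is a minimal, self-contained sketch of the same splitting logic; the pattern definition and method name mirror the diff but are assumptions, not Flink's actual code:

```java
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class ReporterListSplit {

    // Assumed stand-in for the reporterListPattern referenced in the diff:
    // reporter names are comma-separated, optionally surrounded by whitespace.
    private static final Pattern reporterListPattern = Pattern.compile("\\s*,\\s*");

    static Set<String> findEnabledReporters(String includedReportersString) {
        return reporterListPattern.splitAsStream(includedReportersString)
            // On JDK 9+, splitAsStream("") yields a stream containing one empty
            // string (matching Pattern.split), so it must be filtered explicitly.
            .filter(r -> !r.isEmpty())
            .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        // Two enabled reporters, whitespace tolerated.
        System.out.println(findEnabledReporters("prom, jmx"));
        // Empty configuration value: no reporters, not one empty-named reporter.
        System.out.println(findEnabledReporters(""));
    }
}
```

Without the `filter`, an empty `metrics.reporters` value would appear to enable one reporter with an empty name on JDK 9+.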
[jira] [Closed] (FLINK-10424) Inconsistency between JsonSchemaConveerter and FlinkTypeFactory
[ https://issues.apache.org/jira/browse/FLINK-10424?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dominik Wosiński closed FLINK-10424.
Resolution: Fixed

> Inconsistency between JsonSchemaConveerter and FlinkTypeFactory
> ---------------------------------------------------------------
>
> Key: FLINK-10424
> URL: https://issues.apache.org/jira/browse/FLINK-10424
> Project: Flink
> Issue Type: Bug
> Components: API / Type Serialization System
> Affects Versions: 1.6.0
> Reporter: Dominik Wosiński
> Assignee: Dominik Wosiński
> Priority: Major
>
> There is still an inconsistency between _JsonSchemaConverter_ and
> _FlinkTypeFactory_ in case of using a JSON schema with an _integer_ type field.
> _JsonSchemaConverter_ will return BigInteger type information for _integer_,
> but _FlinkTypeFactory_ currently does not support BigInteger type info and
> thus an exception will be thrown.
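The shape of the inconsistency the ticket describes can be illustrated with a self-contained sketch. The mapping table and supported set below are invented stand-ins, not Flink's actual JsonSchemaConverter or FlinkTypeFactory code:

```java
import java.math.BigInteger;
import java.util.Map;
import java.util.Set;

// Illustrative sketch (assumed names, not Flink code): one component maps the
// JSON schema type "integer" to BigInteger, while the type factory's supported
// set does not include BigInteger, so the handoff between them fails.
public class TypeMismatchSketch {

    static final Map<String, Class<?>> JSON_SCHEMA_MAPPING = Map.of(
        "integer", BigInteger.class,
        "number", Double.class,
        "string", String.class);

    static final Set<Class<?>> FACTORY_SUPPORTED =
        Set.of(Long.class, Double.class, String.class);

    static Class<?> toFactoryType(String jsonType) {
        Class<?> mapped = JSON_SCHEMA_MAPPING.get(jsonType);
        if (!FACTORY_SUPPORTED.contains(mapped)) {
            throw new IllegalArgumentException("Type not supported by factory: " + mapped);
        }
        return mapped;
    }

    public static void main(String[] args) {
        System.out.println(toFactoryType("string").getSimpleName());
        try {
            toFactoryType("integer"); // the inconsistency: mapped but unsupported
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```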
[GitHub] [flink] flinkbot edited a comment on issue #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters
flinkbot edited a comment on issue #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#issuecomment-590065588

## CI report:

* 738a6fd162d5ba99432438d224f3b4890961af56 UNKNOWN
* 77f24b292a04fde34f4c645151484cf86a13832c UNKNOWN
* 5dfb306ba6ff07f700b1afc21847641321705d4d UNKNOWN
* 5b0f219689e81fe11c3511c13c7f4943a997a4ef UNKNOWN
* 7ce9ad87a780aab19b9c97680d7af5f648c13f3b Travis: [FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/154170382)

Bot commands

The @flinkbot bot supports the following commands:

- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r395337241

## File path: flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporterFactory.java
## @@ -0,0 +1,34 @@

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.metrics.prometheus;

import org.apache.flink.core.plugin.Plugin;
import org.apache.flink.metrics.reporter.MetricReporterFactory;

import java.util.Properties;

/**
 * {@link MetricReporterFactory} for {@link PrometheusReporter}.
 */
public class PrometheusReporterFactory implements MetricReporterFactory, Plugin {
```

Review comment: Added.
[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r395335333

## File path: flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporterFactory.java
## @@ -0,0 +1,34 @@

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.metrics.prometheus;

import org.apache.flink.core.plugin.Plugin;
import org.apache.flink.metrics.reporter.MetricReporterFactory;

import java.util.Properties;

/**
 * {@link MetricReporterFactory} for {@link PrometheusReporter}.
 */
public class PrometheusReporterFactory implements MetricReporterFactory, Plugin {

	@Override
	public PrometheusReporter createMetricReporter(Properties properties) {
		return new PrometheusReporter();
```

Review comment: It seems that this would pull in a rather large refactoring, because of the call to `super.open(config)` in the `open` methods and because of having to reconcile different configuration containers: `Properties` vs `MetricConfig`. I would prefer to address it in a separate refactoring PR, if possible.
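The reconciliation the review comment mentions is cheap in one direction, since Flink's `MetricConfig` extends `java.util.Properties`. The sketch below uses toy stand-in types (the `Reporter` and `ReporterFactory` interfaces here are assumptions, not Flink's API) to show one way a caller can bridge a `Properties`-taking factory to a `MetricConfig`-taking `open`:

```java
import java.util.Properties;

// Toy stand-in mirroring the fact that Flink's MetricConfig extends Properties.
class MetricConfig extends Properties {
    public int getInteger(String key, int defaultValue) {
        String v = getProperty(key);
        return v == null ? defaultValue : Integer.parseInt(v);
    }
}

// Assumed minimal interfaces for the sketch, not Flink's actual signatures.
interface Reporter {
    void open(MetricConfig config);
}

interface ReporterFactory {
    Reporter createMetricReporter(Properties properties);
}

public class FactoryBridgeSketch {

    // Reconcile the two containers without changing either signature:
    // copy the Properties into a MetricConfig before calling open().
    static Reporter instantiate(ReporterFactory factory, Properties properties) {
        Reporter reporter = factory.createMetricReporter(properties);
        MetricConfig config = new MetricConfig();
        config.putAll(properties);
        reporter.open(config);
        return reporter;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("port", "9249");
        instantiate(
            p -> config -> System.out.println("opened with port " + config.getInteger("port", -1)),
            props);
    }
}
```

Going the other way (handing an existing `MetricConfig` to a factory) needs no copy at all, which is one argument for keeping `Properties` in the factory signature.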