[GitHub] [flink] flinkbot edited a comment on issue #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes.
flinkbot edited a comment on issue #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes. URL: https://github.com/apache/flink/pull/9984#issuecomment-545881191 ## CI report: * 802ebf37e3d932169f9826b40df483bb5e9ac064 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/133366796) * f16938ce2fb38ae216def737d14643b94d6083a1 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/133940607) * 20de5cfc7af9a8ba57080d5218fd0293f393a40e : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/133998545) * 56bfbb65802c1d5c48caa625a152070934bb5d79 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/134010321) * 3ea229382fef64b1046673c79ff845d4689c5db4 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/137145261) * 3169988a33e0126e79cd449740c93d3561296ead : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/138017410) * a0dd858b0b91443fc87895a2d32ebfbbc0b9fe4c : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/138730857) * 86b4537979265a0fbecf7c1841ed8fd2f7ebfd86 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/138770610) * d77f83e133b497e42cd85aeaf95e625411274c92 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/139116897) * f2ed0e4b8d37dc3d5b2b770d943e703f4d893da0 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/139256990) * ce01f742aba3a683b98a0ea7da47e678d30c12be : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/139290769) * 7392ac618203c2544ab4a313a7068e03fcc147a7 : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/139453237) * 3b3e23e69bde65f504ea5d46ef7c2d55ba04c0af : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/139459406) * 71c087913517972655181e0119588c886e4243cb : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/139461705) * 9be9f9f2338a2a953a76cc60057554d7d90157f9 : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/139518666) * bb01ae34a8d6087f4aef4a989c0d14d576df601f : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/139523468) * fd9cdf56812f2d0c9d659b69d9fb243d58ef48a3 : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/139555865) * c647669902f574c3ee23adc7efbabc859d93c0e4 : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/139627616) * cc29fb3058998b1489451d53ad04d9e9886f4193 : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/139635222) * 3d315606bd4fdf826ef823631116a189810dab6a : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/139641767) * 37bfdff9aa766deaac10d9c5407cc244d14d1085 : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/139643868) * daa0ff528a8ec3c2dcbd33927099cee2d8faa113 : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/139650255) * 17ba1baf2383844c1b0ca8aa8ca97a80646e77fb : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/139653134) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #10455: [FLINK-15089][connectors] Puslar catalog
flinkbot edited a comment on issue #10455: [FLINK-15089][connectors] Puslar catalog URL: https://github.com/apache/flink/pull/10455#issuecomment-562462991 ## CI report: * 206bb09e47ff4f36514a93c96f7cadc30f62cd4b : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/139653117) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build
[GitHub] [flink] wangyang0918 commented on a change in pull request #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes.
wangyang0918 commented on a change in pull request #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes. URL: https://github.com/apache/flink/pull/9984#discussion_r354702929 ## File path: flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerTest.java ## @@ -0,0 +1,372 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.kubernetes; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; +import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient; +import org.apache.flink.kubernetes.kubeclient.TaskManagerPodParameter; +import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod; +import org.apache.flink.kubernetes.utils.Constants; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.clusterframework.types.SlotID; +import org.apache.flink.runtime.entrypoint.ClusterInformation; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.instance.HardwareDescription; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; +import org.apache.flink.runtime.registration.RegistrationResponse; +import org.apache.flink.runtime.resourcemanager.JobLeaderIdService; +import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; +import org.apache.flink.runtime.resourcemanager.SlotRequest; +import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; +import org.apache.flink.runtime.resourcemanager.utils.MockResourceManagerRuntimeServices; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.rpc.TestingRpcService; +import org.apache.flink.runtime.taskexecutor.SlotReport; +import 
org.apache.flink.runtime.taskexecutor.SlotStatus; +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; +import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess; +import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder; +import org.apache.flink.runtime.util.TestingFatalErrorHandler; + +import io.fabric8.kubernetes.api.model.Container; +import io.fabric8.kubernetes.api.model.ContainerStateBuilder; +import io.fabric8.kubernetes.api.model.ContainerStatusBuilder; +import io.fabric8.kubernetes.api.model.EnvVarBuilder; +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.PodList; +import io.fabric8.kubernetes.api.model.PodStatusBuilder; +import io.fabric8.kubernetes.client.KubernetesClient; +import org.hamcrest.Matchers; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +import static junit.framework.TestCase.assertEquals; +import static org.hamcrest.Matchers.instanceOf; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * Test cases for {@link KubernetesResourceManager}. + */ +public class KubernetesResourceManagerTest extends KubernetesTestBase { + + private static final Time TIMEOUT = Time.seconds(10L); + + private TestingFatalErrorHandler testingFatalErrorHandler; + + private final String jobManagerHost = "jm-host1"; + + private Configuration flinkConfig; + + private TestingKubernetesResourceManager resourceManager; + + private Fli
[GitHub] [flink] flinkbot commented on issue #10458: [FLINK-14815][rest]Expose network metric in IOMetricsInfo
flinkbot commented on issue #10458: [FLINK-14815][rest]Expose network metric in IOMetricsInfo
URL: https://github.com/apache/flink/pull/10458#issuecomment-562472069

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review.

## Automated Checks
Last check on commit 303f921f58ff61aa3e332c36ffdc07d5f9ceb352 (Fri Dec 06 07:57:08 UTC 2019)

**Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

## Review Progress
* ❓ 1. The [description] looks good.
* ❓ 2. There is [consensus] that the contribution should go into Flink.
* ❓ 3. Needs [attention] from.
* ❓ 4. The change fits into the overall [architecture].
* ❓ 5. Overall code [quality] is good.

Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.

The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands
The @flinkbot bot supports the following commands:
- `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until `architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
[GitHub] [flink] TsReaper commented on issue #10306: [FLINK-13943][table-api] Provide utility method to convert Flink table to Java List
TsReaper commented on issue #10306: [FLINK-13943][table-api] Provide utility method to convert Flink table to Java List URL: https://github.com/apache/flink/pull/10306#issuecomment-562471647 Travis passed: https://travis-ci.com/TsReaper/flink/builds/139638323
[GitHub] [flink] wsry commented on a change in pull request #10375: [FLINK-14845][runtime] Introduce data compression to reduce disk and network IO of shuffle.
wsry commented on a change in pull request #10375: [FLINK-14845][runtime] Introduce data compression to reduce disk and network IO of shuffle.
URL: https://github.com/apache/flink/pull/10375#discussion_r354702467

## File path: flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java

@@ -54,6 +54,28 @@
 .withDescription("Enable SSL support for the taskmanager data transport. This is applicable only when the" + " global flag for internal SSL (" + SecurityOptions.SSL_INTERNAL_ENABLED.key() + ") is set to true");
+ /**
+* Boolean flag indicating whether the shuffle data will be compressed or not.
+*
+* Note: Data is compressed per buffer (may be sliced buffer in pipeline mode) and compression can incur extra
+* CPU overhead so it is more effective for IO bounded scenario when data compression ratio is high.
+*/
+ public static final ConfigOption DATA_COMPRESSION_ENABLED =
+ key("taskmanager.data.compression.enabled")

Review comment: I am a little afraid that when we make it pluggable in the future, the "LZ4" config value may be dropped, and users already using compression may have to change their configurations. Currently, the codec config option is not documented, so we can change it to any other value when we make it pluggable.
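The concern in the review comment above can be sketched roughly as follows: users only set the boolean flag, while the codec stays an internal detail that can change once compression becomes pluggable. This is a minimal, self-contained sketch and not the Flink implementation. The key `taskmanager.data.compression.enabled` comes from the diff; the codec key `taskmanager.data.compression.codec`, the default of `false` for the flag, and the class and method names are assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

class CompressionConfigSketch {
    // Key taken from the diff under review.
    static final String ENABLED_KEY = "taskmanager.data.compression.enabled";
    // Hypothetical, undocumented key: an assumption for this sketch only.
    static final String CODEC_KEY = "taskmanager.data.compression.codec";
    // "LZ4" is the codec value mentioned in the review comment.
    static final String DEFAULT_CODEC = "LZ4";

    // Returns the codec to use, or empty when compression is disabled.
    // Users who only set the boolean flag never reference a codec name,
    // so the default can change later without breaking their configuration.
    static Optional<String> resolveCodec(Map<String, String> conf) {
        // Assumption: compression is off unless explicitly enabled.
        boolean enabled = Boolean.parseBoolean(conf.getOrDefault(ENABLED_KEY, "false"));
        if (!enabled) {
            return Optional.empty();
        }
        return Optional.of(conf.getOrDefault(CODEC_KEY, DEFAULT_CODEC));
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put(ENABLED_KEY, "true");
        System.out.println(resolveCodec(conf).orElse("none"));
    }
}
```

With only the flag set, the sketch falls back to the default codec, which is the property the reviewer wants to preserve for existing users.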
[jira] [Updated] (FLINK-14815) Expose network metric in IOMetricsInfo
[ https://issues.apache.org/jira/browse/FLINK-14815?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-14815:
---
Labels: pull-request-available (was: )

> Expose network metric in IOMetricsInfo
> --
>
> Key: FLINK-14815
> URL: https://issues.apache.org/jira/browse/FLINK-14815
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / Metrics, Runtime / Network, Runtime / REST
> Reporter: lining
> Assignee: lining
> Priority: Major
> Labels: pull-request-available
>
> * SubTask
> ** pool usage: outPoolUsage, inputExclusiveBuffersUsage, inputFloatingBuffersUsage.
> *** If the subtask is not back pressured, but it is causing backpressure (full input, empty output)
> *** By comparing exclusive/floating buffers usage, whether all channels are back-pressured or only some of them
> ** back-pressured: shows whether it is back pressured.
> * Vertex
> ** pool usage: outPoolUsageAvg, inputExclusiveBuffersUsageAvg, inputFloatingBuffersUsageAvg
> ** back-pressured: shows whether it is back pressured (merging all its subtasks)

--
This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] jinglining opened a new pull request #10458: [FLINK-14815][rest]Expose network metric in IOMetricsInfo
jinglining opened a new pull request #10458: [FLINK-14815][rest]Expose network metric in IOMetricsInfo
URL: https://github.com/apache/flink/pull/10458

## What is the purpose of the change
- Expose network metric in IOMetricsInfo
- JobVertex exposes is-backpressed, out-pool-usage-avg, input-exclusive-buffers-usage-avg, input-floating-buffers-usage-avg.
- SubTask exposes is-backpressed, out-pool-usage, input-exclusive-buffers-usage, input-floating-buffers-usage.

## Brief change log
- calculate in MutableIOMetrics#addIOMetrics
- add SubTaskIOMetricsInfo for subtask's metrics in rest api.
- add JobVertexIOMetricsInfo for job vertex's metrics in rest api.

## Verifying this change
This change is already covered by existing tests, such as SubtaskCurrentAttemptDetailsHandlerTest, SubtaskExecutionAttemptDetailsHandlerTest, JobDetailsInfoTest, SubtaskExecutionAttemptDetailsInfoTest, JobVertexDetailsInfoTest.

## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)

## Documentation
- Does this pull request introduce a new feature? (yes)
- If yes, how is the feature documented? (docs)
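The relationship between the subtask metrics and the job vertex metrics described in this pull request can be sketched as below. This is only an illustration under stated assumptions, not the code in `MutableIOMetrics#addIOMetrics`: the class and field names are hypothetical, the averages are plain arithmetic means, and the vertex is assumed to count as back pressured when any of its subtasks is (the PR text does not say how the flag is merged).

```java
import java.util.Arrays;
import java.util.List;

class VertexIoMetricsSketch {
    // Per-subtask values, named after the metrics listed in the PR description.
    static final class SubtaskMetrics {
        final boolean backPressured;
        final double outPoolUsage;
        final double inputExclusiveBuffersUsage;
        final double inputFloatingBuffersUsage;

        SubtaskMetrics(boolean backPressured, double out, double exclusive, double floating) {
            this.backPressured = backPressured;
            this.outPoolUsage = out;
            this.inputExclusiveBuffersUsage = exclusive;
            this.inputFloatingBuffersUsage = floating;
        }
    }

    // Vertex-level view: averages of the subtask values plus a merged flag.
    static final class VertexMetrics {
        final boolean backPressured;
        final double outPoolUsageAvg;
        final double inputExclusiveBuffersUsageAvg;
        final double inputFloatingBuffersUsageAvg;

        VertexMetrics(boolean backPressured, double out, double exclusive, double floating) {
            this.backPressured = backPressured;
            this.outPoolUsageAvg = out;
            this.inputExclusiveBuffersUsageAvg = exclusive;
            this.inputFloatingBuffersUsageAvg = floating;
        }
    }

    static VertexMetrics aggregate(List<SubtaskMetrics> subtasks) {
        if (subtasks.isEmpty()) {
            // Assumption: a vertex without subtasks reports zero usage.
            return new VertexMetrics(false, 0.0, 0.0, 0.0);
        }
        boolean anyBackPressured = false;
        double out = 0.0;
        double exclusive = 0.0;
        double floating = 0.0;
        for (SubtaskMetrics s : subtasks) {
            // Assumption: the vertex flag is the logical OR over its subtasks.
            anyBackPressured |= s.backPressured;
            out += s.outPoolUsage;
            exclusive += s.inputExclusiveBuffersUsage;
            floating += s.inputFloatingBuffersUsage;
        }
        int n = subtasks.size();
        return new VertexMetrics(anyBackPressured, out / n, exclusive / n, floating / n);
    }

    public static void main(String[] args) {
        VertexMetrics v = aggregate(Arrays.asList(
                new SubtaskMetrics(false, 0.5, 1.0, 0.0),
                new SubtaskMetrics(true, 1.0, 1.0, 0.5)));
        System.out.println(v.backPressured + " " + v.outPoolUsageAvg);
    }
}
```

In this example, full exclusive buffers combined with partially used floating buffers on one subtask correspond to the case from the Jira description where only some channels are back pressured.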
[GitHub] [flink] flinkbot edited a comment on issue #10422: [FLINK-15057][tests] Set JM and TM memory config in flink-conf.yaml
flinkbot edited a comment on issue #10422: [FLINK-15057][tests] Set JM and TM memory config in flink-conf.yaml URL: https://github.com/apache/flink/pull/10422#issuecomment-561743112 ## CI report: * 88f6f7790d8bb0b517958be4446e95146605c369 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/139377231) * 5c4f99c16f800edf9f8f9de5889be7e7ee7b297a : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/139563662) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build
[GitHub] [flink] zhijiangW commented on a change in pull request #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes.
zhijiangW commented on a change in pull request #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes. URL: https://github.com/apache/flink/pull/9984#discussion_r354701520 ## File path: flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerTest.java ## @@ -0,0 +1,372 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.kubernetes; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; +import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient; +import org.apache.flink.kubernetes.kubeclient.TaskManagerPodParameter; +import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod; +import org.apache.flink.kubernetes.utils.Constants; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.clusterframework.types.SlotID; +import org.apache.flink.runtime.entrypoint.ClusterInformation; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.instance.HardwareDescription; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; +import org.apache.flink.runtime.registration.RegistrationResponse; +import org.apache.flink.runtime.resourcemanager.JobLeaderIdService; +import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; +import org.apache.flink.runtime.resourcemanager.SlotRequest; +import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; +import org.apache.flink.runtime.resourcemanager.utils.MockResourceManagerRuntimeServices; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.rpc.TestingRpcService; +import org.apache.flink.runtime.taskexecutor.SlotReport; +import 
org.apache.flink.runtime.taskexecutor.SlotStatus; +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; +import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess; +import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder; +import org.apache.flink.runtime.util.TestingFatalErrorHandler; + +import io.fabric8.kubernetes.api.model.Container; +import io.fabric8.kubernetes.api.model.ContainerStateBuilder; +import io.fabric8.kubernetes.api.model.ContainerStatusBuilder; +import io.fabric8.kubernetes.api.model.EnvVarBuilder; +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.PodList; +import io.fabric8.kubernetes.api.model.PodStatusBuilder; +import io.fabric8.kubernetes.client.KubernetesClient; +import org.hamcrest.Matchers; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +import static junit.framework.TestCase.assertEquals; +import static org.hamcrest.Matchers.instanceOf; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * Test cases for {@link KubernetesResourceManager}. + */ +public class KubernetesResourceManagerTest extends KubernetesTestBase { + + private static final Time TIMEOUT = Time.seconds(10L); + + private TestingFatalErrorHandler testingFatalErrorHandler; + + private final String jobManagerHost = "jm-host1"; + + private Configuration flinkConfig; + + private TestingKubernetesResourceManager resourceManager; + + private FlinkK
[jira] [Commented] (FLINK-15087) JobManager is forced to shutdown JVM due to temporary loss of zookeeper connection
[ https://issues.apache.org/jira/browse/FLINK-15087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16989482#comment-16989482 ]

lamber-ken commented on FLINK-15087:

Hi, this may be a duplicate of FLINK-10052.

> JobManager is forced to shutdown JVM due to temporary loss of zookeeper connection
> --
>
> Key: FLINK-15087
> URL: https://issues.apache.org/jira/browse/FLINK-15087
> Project: Flink
> Issue Type: Bug
> Affects Versions: 1.8.2
> Reporter: Abdul Qadeer
> Priority: Major
>
> While testing I found that the loss of connection with zookeeper triggers JVM shutdown for Job Manager, when started through "StandaloneSessionClusterEntrypoint". This happens due to an NPE on "taskManagerHeartbeatManager."
> When JobManagerRunner suspends jobMasterService (as Job manager is no longer leader), "taskManagerHeartbeatManager" is set to null in "stopHeartbeatServices".
> Next, "AkkaRpcActor" stops JobMaster and throws NPE in the following method:
> {code:java}
> @Override
> public CompletableFuture disconnectTaskManager(final ResourceID resourceID, final Exception cause) {
>    log.debug("Disconnect TaskExecutor {} because: {}", resourceID, cause.getMessage());
>    taskManagerHeartbeatManager.unmonitorTarget(resourceID);
>    slotPool.releaseTaskManager(resourceID, cause);
> {code}
>
> This leads to a fatal error finally in "ClusterEntryPoint.onFatalError()" and forces JVM shutdown.
> The stack trace is below: > > {noformat} > {"timeMillis":1575581120723,"thread":"flink-akka.actor.default-dispatcher-93","level":"ERROR","loggerName":"com.Sample","message":"Failed > to take leadership with session id > b4662db5-f065-41d9-aaaf-78625355b251.","thrown":{"commonElementCount":0,"localizedMessage":"Failed > to take leadership with session id > b4662db5-f065-41d9-aaaf-78625355b251.","message":"Failed to take leadership > with session id > b4662db5-f065-41d9-aaaf-78625355b251.","name":"org.apache.flink.runtime.dispatcher.DispatcherException","cause":{"commonElementCount":18,"localizedMessage":"Termination > of previous JobManager for job bbb8c430787d92293e9d45c349231d9c failed. > Cannot submit job under the same job id.","message":"Termination of previous > JobManager for job bbb8c430787d92293e9d45c349231d9c failed. Cannot submit job > under the same job > id.","name":"org.apache.flink.runtime.dispatcher.DispatcherException","cause":{"commonElementCount":6,"localizedMessage":"org.apache.flink.util.FlinkException: > Could not properly shut down the > JobManagerRunner","message":"org.apache.flink.util.FlinkException: Could not > properly shut down the > JobManagerRunner","name":"java.util.concurrent.CompletionException","cause":{"commonElementCount":6,"localizedMessage":"Could > not properly shut down the JobManagerRunner","message":"Could not properly > shut down the > JobManagerRunner","name":"org.apache.flink.util.FlinkException","cause":{"commonElementCount":13,"localizedMessage":"Failure > while stopping RpcEndpoint jobmanager_0.","message":"Failure while stopping > RpcEndpoint > 
jobmanager_0.","name":"org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException","cause":{"commonElementCount":13,"name":"java.lang.NullPointerException","extendedStackTrace":[{"class":"org.apache.flink.runtime.jobmaster.JobMaster","method":"disconnectTaskManager","file":"JobMaster.java","line":629,"exact":false,"location":"flink-runtime_2.11-1.8.2.jar","version":"1.8.2"},{"class":"org.apache.flink.runtime.jobmaster.JobMaster","method":"onStop","file":"JobMaster.java","line":346,"exact":false,"location":"flink-runtime_2.11-1.8.2.jar","version":"1.8.2"},{"class":"org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState","method":"terminate","file":"AkkaRpcActor.java","line":504,"exact":false,"location":"flink-runtime_2.11-1.8.2.jar","version":"1.8.2"},{"class":"org.apache.flink.runtime.rpc.akka.AkkaRpcActor","method":"handleControlMessage","file":"AkkaRpcActor.java","line":170,"exact":false,"location":"flink-runtime_2.11-1.8.2.jar","version":"1.8.2"},{"class":"org.apache.flink.runtime.rpc.akka.AkkaRpcActor","method":"onReceive","file":"AkkaRpcActor.java","line":142,"exact":false,"location":"flink-runtime_2.11-1.8.2.jar","version":"1.8.2"}]},"extendedStackTrace":[{"class":"org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState","method":"terminate","file":"AkkaRpcActor.java","line":508,"exact":false,"location":"flink-runtime_2.11-1.8.2.jar","version":"1.8.2"},{"class":"org.apache.flink.runtime.rpc.akka.AkkaRpcActor","method":"handleControlMessage","file":"AkkaRpcActor.java","line":170,"exact":false,"location":"flink-runtime_2.11-1.8.2.jar","version":"1.8.2"},{"class":"org.apache.flink.runtime.rpc.akka.AkkaRpcActor","method":"onReceive","file":
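The failure sequence reported in this issue (the heartbeat manager is set to null on suspension, then dereferenced while the endpoint stops) can be illustrated with a small stand-alone sketch. It is not the Flink code and not necessarily how the issue was resolved upstream; the classes below are hypothetical stand-ins that borrow the method names from the report, and the null guard is just one defensive option.

```java
import java.util.HashSet;
import java.util.Set;

class NullSafeDisconnectSketch {
    // Hypothetical stand-in for the heartbeat manager named in the report.
    static final class HeartbeatManager {
        final Set<String> monitored = new HashSet<>();

        void monitorTarget(String resourceId) {
            monitored.add(resourceId);
        }

        void unmonitorTarget(String resourceId) {
            monitored.remove(resourceId);
        }
    }

    private HeartbeatManager taskManagerHeartbeatManager = new HeartbeatManager();
    private final Set<String> registeredTaskManagers = new HashSet<>();

    void registerTaskManager(String resourceId) {
        registeredTaskManagers.add(resourceId);
        if (taskManagerHeartbeatManager != null) {
            taskManagerHeartbeatManager.monitorTarget(resourceId);
        }
    }

    // Mirrors the step from the report: suspension drops the heartbeat manager.
    void stopHeartbeatServices() {
        taskManagerHeartbeatManager = null;
    }

    // Guarded variant: read the field once and skip the call when it is null,
    // so stopping after suspension no longer throws a NullPointerException.
    boolean disconnectTaskManager(String resourceId) {
        HeartbeatManager heartbeatManager = taskManagerHeartbeatManager;
        if (heartbeatManager != null) {
            heartbeatManager.unmonitorTarget(resourceId);
        }
        // Returns true if the task manager was registered and is now released.
        return registeredTaskManagers.remove(resourceId);
    }

    public static void main(String[] args) {
        NullSafeDisconnectSketch jobMaster = new NullSafeDisconnectSketch();
        jobMaster.registerTaskManager("tm-1");
        jobMaster.stopHeartbeatServices();
        System.out.println(jobMaster.disconnectTaskManager("tm-1"));
    }
}
```

Without the guard, the call in `main` would fail in the same place as the `disconnectTaskManager` frame at the top of the stack trace.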
[GitHub] [flink] dianfu commented on a change in pull request #10430: [FLINK-14019][python][cli] Add command-line options for managing Python UDF environment and dependencies.
dianfu commented on a change in pull request #10430: [FLINK-14019][python][cli] Add command-line options for managing Python UDF environment and dependencies.
URL: https://github.com/apache/flink/pull/10430#discussion_r354672644

## File path: flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java

@@ -183,13 +213,19 @@
 STOP_AND_DRAIN.setRequired(false);
 PY_OPTION.setRequired(false);
- PY_OPTION.setArgName("python");
+ PY_OPTION.setArgName("pythonFile");
 PYFILES_OPTION.setRequired(false);
- PYFILES_OPTION.setArgName("pyFiles");
+ PYFILES_OPTION.setArgName("pythonFiles");
 PYMODULE_OPTION.setRequired(false);
 PYMODULE_OPTION.setArgName("pyModule");
+
+ PYREQUIREMENTS_OPTION.setRequired(false);
+
+ PYARCHIVE_OPTION.setRequired(false);

Review comment: PYARCHIVE_OPTION.setArgName("pyArchives")
[GitHub] [flink] dianfu commented on a change in pull request #10430: [FLINK-14019][python][cli] Add command-line options for managing Python UDF environment and dependencies.
dianfu commented on a change in pull request #10430: [FLINK-14019][python][cli] Add command-line options for managing Python UDF environment and dependencies. URL: https://github.com/apache/flink/pull/10430#discussion_r354685811 ## File path: flink-python/pyflink/common/tests/test_python_option.py ## @@ -0,0 +1,130 @@ + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import json +import os +import tempfile + +from py4j.protocol import Py4JError +from pyflink.util.utils import to_jarray + +from pyflink.common import Configuration +from pyflink.java_gateway import launch_gateway + +from pyflink.common.dependency_manager import DependencyManager + +from pyflink.testing.test_case_utils import (PyFlinkTestCase, replace_uuid, + TestEnv, encode_to_base64, + create_empty_file) + + +class PythonOptionTests(PyFlinkTestCase): + +def setUp(self): +self.j_env = TestEnv() +self.config = Configuration() +self.dependency_manager = DependencyManager(self.config, self.j_env) + +def test_python_options(self): +dm = DependencyManager + +system_env = dict() +system_env[dm.PYFLINK_PY_FILES] = "/file1.py\nhdfs://file2.zip\nfile3.egg" +system_env[dm.PYFLINK_PY_REQUIREMENTS] = "a.txt\nb_dir" +system_env[dm.PYFLINK_PY_EXECUTABLE] = "/usr/local/bin/python" +system_env[dm.PYFLINK_PY_ARCHIVES] = "/py3.zip\nvenv\n/py3.zip\n\ndata.zip\ndata" + +self.dependency_manager.load_from_env(system_env) + +configs = self.config.to_dict() +python_files = replace_uuid(json.loads(configs[dm.PYTHON_FILES])) +python_requirements_file = replace_uuid(configs[dm.PYTHON_REQUIREMENTS_FILE]) +python_requirements_cache = replace_uuid(configs[dm.PYTHON_REQUIREMENTS_CACHE]) +python_archives = replace_uuid(json.loads(configs[dm.PYTHON_ARCHIVES])) +python_exec = configs[dm.PYTHON_EXEC] +registered_files = replace_uuid(self.j_env.to_dict()) + +self.assertEqual( +{"python_file_0_{uuid}": "file1.py", + "python_file_1_{uuid}": "file2.zip", + "python_file_2_{uuid}": "file3.egg"}, python_files) +self.assertEqual( +{"python_archive_3_{uuid}": "venv", + "python_archive_4_{uuid}": "py3.zip", + "python_archive_5_{uuid}": "data"}, python_archives) +self.assertEqual("python_requirements_file_6_{uuid}", python_requirements_file) +self.assertEqual("python_requirements_cache_7_{uuid}", python_requirements_cache) +self.assertEqual("/usr/local/bin/python", python_exec) +self.assertEqual( 
+{"python_file_0_{uuid}": "/file1.py", + "python_file_1_{uuid}": "hdfs://file2.zip", + "python_file_2_{uuid}": "file3.egg", + "python_archive_3_{uuid}": "/py3.zip", + "python_archive_4_{uuid}": "/py3.zip", + "python_archive_5_{uuid}": "data.zip", + "python_requirements_file_6_{uuid}": "a.txt", + "python_requirements_cache_7_{uuid}": "b_dir"}, registered_files) + +def test_python_options_integrated(self): +tmp_dir = tempfile.mkdtemp(dir=self.tempdir) Review comment: tempdir could be used directly, there is no need to create it again, what do you think? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] dianfu commented on a change in pull request #10430: [FLINK-14019][python][cli] Add command-line options for managing Python UDF environment and dependencies.
dianfu commented on a change in pull request #10430: [FLINK-14019][python][cli] Add command-line options for managing Python UDF environment and dependencies. URL: https://github.com/apache/flink/pull/10430#discussion_r354660340 ## File path: flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java ## @@ -126,14 +126,44 @@ static final Option PYFILES_OPTION = new Option("pyfs", "pyFiles", true, "Attach custom python files for job. " + - "Comma can be used as the separator to specify multiple files. " + - "The standard python resource file suffixes such as .py/.egg/.zip are all supported." + - "(eg: --pyFiles file:///tmp/myresource.zip,hdfs:///$namenode_address/myresource2.zip)"); + "These files will be added to the PYTHONPATH of both the local client and the remote python UDF worker. " + + "The standard python resource file suffixes such as .py/.egg/.zip or directory are all supported. " + + "Comma (',') could be used as the separator to specify multiple files " + + "(e.g.: --pyFiles file:///tmp/myresource.zip,hdfs:///$namenode_address/myresource2.zip)."); static final Option PYMODULE_OPTION = new Option("pym", "pyModule", true, "Python module with the program entry point. " + "This option must be used in conjunction with `--pyFiles`."); + static final Option PYREQUIREMENTS_OPTION = new Option("pyreq", "pyRequirements", true, + "Specify a requirements.txt file which defines the third-party dependencies. " + + "These dependencies will be installed and added to the PYTHONPATH of the python UDF worker. " + + "A directory which contains the installation packages of these dependencies could be specified" + Review comment: Missing empty space at the end This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
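A minimal sketch of why the missing trailing space matters: Java concatenates adjacent string literals verbatim, so a fragment that ends without a space runs straight into the next one. The class and method names are hypothetical, and the strings are shortened from the option description quoted in this thread (the following fragment, "via second parameter", appears in the neighbouring review comments).

```java
// Hypothetical demo class; not part of the pull request.
class HelpTextSpacing {
    // No trailing space before the '+': the two fragments are glued together.
    static String withoutSpace() {
        return "could be specified" +
            "via second parameter.";
    }

    // Trailing space added, as the review comment asks for.
    static String withSpace() {
        return "could be specified " +
            "via second parameter.";
    }

    public static void main(String[] args) {
        System.out.println(withoutSpace()); // could be specifiedvia second parameter.
        System.out.println(withSpace());    // could be specified via second parameter.
    }
}
```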
[GitHub] [flink] dianfu commented on a change in pull request #10430: [FLINK-14019][python][cli] Add command-line options for managing Python UDF environment and dependencies.
dianfu commented on a change in pull request #10430: [FLINK-14019][python][cli] Add command-line options for managing Python UDF environment and dependencies. URL: https://github.com/apache/flink/pull/10430#discussion_r354669358 ## File path: flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java ## @@ -126,14 +126,44 @@ static final Option PYFILES_OPTION = new Option("pyfs", "pyFiles", true, "Attach custom python files for job. " + - "Comma can be used as the separator to specify multiple files. " + - "The standard python resource file suffixes such as .py/.egg/.zip are all supported." + - "(eg: --pyFiles file:///tmp/myresource.zip,hdfs:///$namenode_address/myresource2.zip)"); + "These files will be added to the PYTHONPATH of both the local client and the remote python UDF worker. " + + "The standard python resource file suffixes such as .py/.egg/.zip or directory are all supported. " + + "Comma (',') could be used as the separator to specify multiple files " + + "(e.g.: --pyFiles file:///tmp/myresource.zip,hdfs:///$namenode_address/myresource2.zip)."); static final Option PYMODULE_OPTION = new Option("pym", "pyModule", true, "Python module with the program entry point. " + "This option must be used in conjunction with `--pyFiles`."); + static final Option PYREQUIREMENTS_OPTION = new Option("pyreq", "pyRequirements", true, + "Specify a requirements.txt file which defines the third-party dependencies. " + + "These dependencies will be installed and added to the PYTHONPATH of the python UDF worker. " + + "A directory which contains the installation packages of these dependencies could be specified" + + "via second parameter. Use '#' as the separator of parameters if second parameter exists. " + Review comment: What about "could be specified optionally. Use '#' as the separator if the optional parameter exists. "? This is an automated message from the Apache Git Service. 
[GitHub] [flink] dianfu commented on a change in pull request #10430: [FLINK-14019][python][cli] Add command-line options for managing Python UDF environment and dependencies.
dianfu commented on a change in pull request #10430: [FLINK-14019][python][cli] Add command-line options for managing Python UDF environment and dependencies. URL: https://github.com/apache/flink/pull/10430#discussion_r354672690 ## File path: flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java ## @@ -183,13 +213,19 @@ STOP_AND_DRAIN.setRequired(false); PY_OPTION.setRequired(false); - PY_OPTION.setArgName("python"); + PY_OPTION.setArgName("pythonFile"); PYFILES_OPTION.setRequired(false); - PYFILES_OPTION.setArgName("pyFiles"); + PYFILES_OPTION.setArgName("pythonFiles"); PYMODULE_OPTION.setRequired(false); PYMODULE_OPTION.setArgName("pyModule"); + + PYREQUIREMENTS_OPTION.setRequired(false); + + PYARCHIVE_OPTION.setRequired(false); + + PYEXEC_OPTION.setRequired(false); Review comment: PYEXEC_OPTION.setArgName("pyExecutable") This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] dianfu commented on a change in pull request #10430: [FLINK-14019][python][cli] Add command-line options for managing Python UDF environment and dependencies.
dianfu commented on a change in pull request #10430: [FLINK-14019][python][cli] Add command-line options for managing Python UDF environment and dependencies. URL: https://github.com/apache/flink/pull/10430#discussion_r354675266 ## File path: flink-python/pyflink/common/dependency_manager.py ## @@ -39,6 +39,15 @@ class DependencyManager(object): PYTHON_ARCHIVES = "python.archives" PYTHON_EXEC = "python.exec" +# Environment variable names used to store the dependency settings specified via commandline +# options. +# The "PYFLINK_" prefix indicates that the variable is used to transmit data from external Review comment: Personally I think this paragraph is not necessary.
[GitHub] [flink] dianfu commented on a change in pull request #10430: [FLINK-14019][python][cli] Add command-line options for managing Python UDF environment and dependencies.
dianfu commented on a change in pull request #10430: [FLINK-14019][python][cli] Add command-line options for managing Python UDF environment and dependencies. URL: https://github.com/apache/flink/pull/10430#discussion_r354672570 ## File path: flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java ## @@ -183,13 +213,19 @@ STOP_AND_DRAIN.setRequired(false); PY_OPTION.setRequired(false); - PY_OPTION.setArgName("python"); + PY_OPTION.setArgName("pythonFile"); PYFILES_OPTION.setRequired(false); - PYFILES_OPTION.setArgName("pyFiles"); + PYFILES_OPTION.setArgName("pythonFiles"); PYMODULE_OPTION.setRequired(false); PYMODULE_OPTION.setArgName("pyModule"); + + PYREQUIREMENTS_OPTION.setRequired(false); Review comment: PYREQUIREMENTS_OPTION.setArgName("pyRequirements") This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] dianfu commented on a change in pull request #10430: [FLINK-14019][python][cli] Add command-line options for managing Python UDF environment and dependencies.
dianfu commented on a change in pull request #10430: [FLINK-14019][python][cli] Add command-line options for managing Python UDF environment and dependencies. URL: https://github.com/apache/flink/pull/10430#discussion_r354677710 ## File path: flink-python/src/main/java/org/apache/flink/client/python/PythonDriverEnvUtils.java ## @@ -138,9 +152,34 @@ public static PythonEnvironment preparePythonEnvironment( } env.pythonPath = String.join(File.pathSeparator, pythonPathList); + + if (!pythonDriverOptions.getPyFiles().isEmpty()) { + env.systemEnv.put(PYFLINK_PY_FILES, String.join("\n", pythonDriverOptions.getPyFiles())); + } + if (!pythonDriverOptions.getPyArchives().isEmpty()) { + env.systemEnv.put( + PYFLINK_PY_ARCHIVES, + joinTuples(pythonDriverOptions.getPyArchives().toArray(new Tuple2[0]))); + } + pythonDriverOptions.getPyRequirements().ifPresent( + pyRequirements -> env.systemEnv.put(PYFLINK_PY_REQUIREMENTS, joinTuples(pyRequirements))); + pythonDriverOptions.getPyExecutable().ifPresent( + pyExecutable -> env.systemEnv.put(PYFLINK_PY_EXECUTABLE, pythonDriverOptions.getPyExecutable().get())); return env; } + @SafeVarargs + private static String joinTuples(Tuple2<String, String>... tuples) { Review comment: What about changing the signature to "Collection<Tuple2<String, String>> tuples"?
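The suggestion above can be sketched roughly as follows. This is not the code from the pull request: `Pair` is a hypothetical stand-in for Flink's `Tuple2` so that the example compiles on its own, and the joining rule (each pair flattened into two newline-separated fields, with a missing second field written as an empty string) is an assumption inferred from the `PYFLINK_PY_ARCHIVES` test value quoted earlier in this thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

// Hypothetical sketch class; not part of the pull request.
class JoinTuplesSketch {
    // Stand-in for Flink's Tuple2, defined here only to keep the example self-contained.
    static final class Pair {
        final String f0;
        final String f1;

        Pair(String f0, String f1) {
            this.f0 = f0;
            this.f1 = f1;
        }
    }

    // Assumed behavior: emit both fields of every pair, null as "", joined by '\n'.
    static String joinTuples(Collection<Pair> tuples) {
        List<String> fields = new ArrayList<>();
        for (Pair tuple : tuples) {
            fields.add(tuple.f0 == null ? "" : tuple.f0);
            fields.add(tuple.f1 == null ? "" : tuple.f1);
        }
        return String.join("\n", fields);
    }

    public static void main(String[] args) {
        List<Pair> archives = Arrays.asList(
            new Pair("/py3.zip", "venv"),
            new Pair("/py3.zip", null),
            new Pair("data.zip", "data"));
        // Show the separators explicitly instead of printing real line breaks.
        System.out.println(joinTuples(archives).replace("\n", "\\n"));
    }
}
```

With a `Collection` parameter the caller can pass the list directly, so neither the `toArray(new Tuple2[0])` conversion nor the `@SafeVarargs` annotation is needed.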
[GitHub] [flink] dianfu commented on a change in pull request #10430: [FLINK-14019][python][cli] Add command-line options for managing Python UDF environment and dependencies.
dianfu commented on a change in pull request #10430: [FLINK-14019][python][cli] Add command-line options for managing Python UDF environment and dependencies. URL: https://github.com/apache/flink/pull/10430#discussion_r354671048 ## File path: flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java ## @@ -126,14 +126,44 @@ static final Option PYFILES_OPTION = new Option("pyfs", "pyFiles", true, "Attach custom python files for job. " + - "Comma can be used as the separator to specify multiple files. " + - "The standard python resource file suffixes such as .py/.egg/.zip are all supported." + - "(eg: --pyFiles file:///tmp/myresource.zip,hdfs:///$namenode_address/myresource2.zip)"); + "These files will be added to the PYTHONPATH of both the local client and the remote python UDF worker. " + + "The standard python resource file suffixes such as .py/.egg/.zip or directory are all supported. " + + "Comma (',') could be used as the separator to specify multiple files " + + "(e.g.: --pyFiles file:///tmp/myresource.zip,hdfs:///$namenode_address/myresource2.zip)."); static final Option PYMODULE_OPTION = new Option("pym", "pyModule", true, "Python module with the program entry point. " + "This option must be used in conjunction with `--pyFiles`."); + static final Option PYREQUIREMENTS_OPTION = new Option("pyreq", "pyRequirements", true, + "Specify a requirements.txt file which defines the third-party dependencies. " + + "These dependencies will be installed and added to the PYTHONPATH of the python UDF worker. " + + "A directory which contains the installation packages of these dependencies could be specified" + + "via second parameter. Use '#' as the separator of parameters if second parameter exists. " + + "(e.g.: --pyRequirements file:///tmp/requirements.txt#file:///tmp/cached_dir)"); + + static final Option PYARCHIVE_OPTION = new Option("pyarch", "pyArchives", true, + "Add python archive files for job. " + Review comment: What about keeping the lines at a similar length instead of putting one sentence per line?
[GitHub] [flink] dianfu commented on a change in pull request #10430: [FLINK-14019][python][cli] Add command-line options for managing Python UDF environment and dependencies.
dianfu commented on a change in pull request #10430: [FLINK-14019][python][cli] Add command-line options for managing Python UDF environment and dependencies. URL: https://github.com/apache/flink/pull/10430#discussion_r354671362 ## File path: flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java ## @@ -126,14 +126,44 @@ static final Option PYFILES_OPTION = new Option("pyfs", "pyFiles", true, "Attach custom python files for job. " + - "Comma can be used as the separator to specify multiple files. " + - "The standard python resource file suffixes such as .py/.egg/.zip are all supported." + - "(eg: --pyFiles file:///tmp/myresource.zip,hdfs:///$namenode_address/myresource2.zip)"); + "These files will be added to the PYTHONPATH of both the local client and the remote python UDF worker. " + + "The standard python resource file suffixes such as .py/.egg/.zip or directory are all supported. " + + "Comma (',') could be used as the separator to specify multiple files " + + "(e.g.: --pyFiles file:///tmp/myresource.zip,hdfs:///$namenode_address/myresource2.zip)."); static final Option PYMODULE_OPTION = new Option("pym", "pyModule", true, "Python module with the program entry point. " + "This option must be used in conjunction with `--pyFiles`."); + static final Option PYREQUIREMENTS_OPTION = new Option("pyreq", "pyRequirements", true, + "Specify a requirements.txt file which defines the third-party dependencies. " + + "These dependencies will be installed and added to the PYTHONPATH of the python UDF worker. " + + "A directory which contains the installation packages of these dependencies could be specified" + + "via second parameter. Use '#' as the separator of parameters if second parameter exists. " + + "(e.g.: --pyRequirements file:///tmp/requirements.txt#file:///tmp/cached_dir)"); + + static final Option PYARCHIVE_OPTION = new Option("pyarch", "pyArchives", true, + "Add python archive files for job. 
" + + "The file will be extracted to the working directory of python UDF worker. " + Review comment: file -> files This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] dianfu commented on a change in pull request #10430: [FLINK-14019][python][cli] Add command-line options for managing Python UDF environment and dependencies.
dianfu commented on a change in pull request #10430: [FLINK-14019][python][cli] Add command-line options for managing Python UDF environment and dependencies. URL: https://github.com/apache/flink/pull/10430#discussion_r354674349 ## File path: flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java ## @@ -80,51 +86,24 @@ public ProgramOptions(CommandLine line) throws CliArgsException { isPython = line.hasOption(PY_OPTION.getOpt()) | line.hasOption(PYMODULE_OPTION.getOpt()) | "org.apache.flink.client.python.PythonGatewayServer".equals(entryPointClass); // If specified the option -py(--python) - if (line.hasOption(PY_OPTION.getOpt())) { - // Cannot use option -py and -pym simultaneously. - if (line.hasOption(PYMODULE_OPTION.getOpt())) { - throw new CliArgsException("Cannot use option -py and -pym simultaneously."); - } - // The cli cmd args which will be transferred to PythonDriver will be transformed as follows: - // CLI cmd : -py ${python.py} pyfs [optional] ${py-files} [optional] ${other args}. - // PythonDriver args: py ${python.py} [optional] pyfs [optional] ${py-files} [optional] ${other args}. - // ---transformed--- - // e.g. -py wordcount.py(CLI cmd) ---> py wordcount.py(PythonDriver args) - // e.g. 
-py wordcount.py -pyfs file:///AAA.py,hdfs:///BBB.py --input in.txt --output out.txt(CLI cmd) - // -> -py wordcount.py -pyfs file:///AAA.py,hdfs:///BBB.py --input in.txt --output out.txt(PythonDriver args) - String[] newArgs; - int argIndex; - if (line.hasOption(PYFILES_OPTION.getOpt())) { - newArgs = new String[args.length + 4]; - newArgs[2] = "-" + PYFILES_OPTION.getOpt(); - newArgs[3] = line.getOptionValue(PYFILES_OPTION.getOpt()); - argIndex = 4; - } else { - newArgs = new String[args.length + 2]; - argIndex = 2; - } - newArgs[0] = "-" + PY_OPTION.getOpt(); - newArgs[1] = line.getOptionValue(PY_OPTION.getOpt()); - System.arraycopy(args, 0, newArgs, argIndex, args.length); - args = newArgs; - } - - // If specified the option -pym(--pyModule) - if (line.hasOption(PYMODULE_OPTION.getOpt())) { - // If you specify the option -pym, you should specify the option --pyFiles simultaneously. - if (!line.hasOption(PYFILES_OPTION.getOpt())) { - throw new CliArgsException("-pym must be used in conjunction with `--pyFiles`"); + if (isPython) { Review comment: Nice change! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #10389: [FLINK-14198][python] Add "type" and "rtype" options to flink python API docstrings of table.py and table_environment.py
flinkbot edited a comment on issue #10389: [FLINK-14198][python] Add "type" and "rtype" options to flink python API docstrings of table.py and table_environment.py URL: https://github.com/apache/flink/pull/10389#issuecomment-560984209 ## CI report: * ebc6a4396f71fda053e32ed71994e5ce129835c1 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/139081057) * 1a14a71c2785b0c98831779b1e058f585bdc762c : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/139501852) * b1b324f72f4561126eb1dcbf144947d2b3ef2dc1 : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/139653098) * ed548c842564c3beca8b2afefa0048b6a5b879bb : UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #10315: [FLINK-14552][table] Enable partition statistics in blink planner
flinkbot edited a comment on issue #10315: [FLINK-14552][table] Enable partition statistics in blink planner URL: https://github.com/apache/flink/pull/10315#issuecomment-558221318 ## CI report: * 05b30617c280796e80e8569dae48b726cddb23d8 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/138084321) * 707fc20d39d5a2c82bd4a3024cd02a932413af70 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/138184336) * 748ccd842d5558cc250ab9fd35d7e30e21e57406 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/138268848) * 5edccbaf0cf95635b4a81fe9b16bfefcddf2d07c : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/138883725) * 72ed8881457f007135b6faf4ff5dfeefb6911d7e : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/138972329) * 02498867d43f51e3c583b470cf39ca632737aa5d : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/139141699) * 84e826cef61d4e659669fa169bf6f6a73035048b : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/13975) * 54bd514481c22e6bce418f120e38b91eb361ad42 : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/139442856) * 71eed85744450cd933f0b061108ee7c49017bd9f : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/139464310) * 71acb8878dad3d9966d43f27d3db3dace81ac301 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/139470658) * ea45df4a4dd2cb13dcb37c1ada91d7c202cf708d : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/139635199) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot commented on issue #10456: [FLINK-15091][table-planner-blink] Fix memory overused in SortMergeJoinOperator
flinkbot commented on issue #10456: [FLINK-15091][table-planner-blink] Fix memory overused in SortMergeJoinOperator
URL: https://github.com/apache/flink/pull/10456#issuecomment-562468186

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review.

## Automated Checks
Last check on commit a6e02881eeedc154f73a8e1bd39dd98ce2a89e63 (Fri Dec 06 07:41:46 UTC 2019)

**Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

## Review Progress
* ❓ 1. The [description] looks good.
* ❓ 2. There is [consensus] that the contribution should go into Flink.
* ❓ 3. Needs [attention] from.
* ❓ 4. The change fits into the overall [architecture].
* ❓ 5. Overall code [quality] is good.

Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands
The @flinkbot bot supports the following commands:
- `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until `architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
[GitHub] [flink] TisonKun closed pull request #10457: !FOR TESTING!
TisonKun closed pull request #10457: !FOR TESTING! URL: https://github.com/apache/flink/pull/10457
[GitHub] [flink] TisonKun opened a new pull request #10457: !FOR TESTING!
TisonKun opened a new pull request #10457: !FOR TESTING! URL: https://github.com/apache/flink/pull/10457
[jira] [Updated] (FLINK-15091) JoinITCase.testFullJoinWithNonEquiJoinPred failed in travis
[ https://issues.apache.org/jira/browse/FLINK-15091?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-15091: --- Labels: pull-request-available (was: ) > JoinITCase.testFullJoinWithNonEquiJoinPred failed in travis > --- > > Key: FLINK-15091 > URL: https://issues.apache.org/jira/browse/FLINK-15091 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.10.0 >Reporter: Kurt Young >Assignee: Jingsong Lee >Priority: Major > Labels: pull-request-available > Fix For: 1.10.0 > > > 04:45:22.404 [ERROR] Tests run: 21, Failures: 0, Errors: 1, Skipped: 0, Time > elapsed: 4.909 s <<< FAILURE! - in > org.apache.flink.table.planner.runtime.batch.table.JoinITCase 04:45:22.406 > [ERROR] > testFullJoinWithNonEquiJoinPred(org.apache.flink.table.planner.runtime.batch.table.JoinITCase) > Time elapsed: 0.168 s <<< ERROR! > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. > at > org.apache.flink.table.planner.runtime.batch.table.JoinITCase.testFullJoinWithNonEquiJoinPred(JoinITCase.scala:344) > Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by > NoRestartBackoffTimeStrategy Caused by: > org.apache.flink.runtime.memory.MemoryAllocationException: Could not allocate > 32 pages. Only 0 pages are remaining. > > details: [https://api.travis-ci.org/v3/job/621407747/log.txt] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] JingsongLi opened a new pull request #10456: [FLINK-15091][table-planner-blink] Fix memory overused in SortMergeJoinOperator
JingsongLi opened a new pull request #10456: [FLINK-15091][table-planner-blink] Fix memory overused in SortMergeJoinOperator
URL: https://github.com/apache/flink/pull/10456

## What is the purpose of the change
SortMergeJoinOperator should check whether the join is a full join; if it is, two externalBufferMemory are used.

## Brief change log
Use two externalBufferMemory when SortMergeJoinOperator performs a full join.

## Verifying this change
This change is already covered by existing tests.

## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
- The S3 file system connector: no

## Documentation
- Does this pull request introduce a new feature? no
[GitHub] [flink] KurtYoung commented on a change in pull request #10454: [FLINK-13195][sql-client] Add create(drop) table support for SqlClient
KurtYoung commented on a change in pull request #10454: [FLINK-13195][sql-client] Add create(drop) table support for SqlClient URL: https://github.com/apache/flink/pull/10454#discussion_r354685477 ## File path: flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java ## @@ -154,8 +160,29 @@ public ExecutionContext( ClusterClientServiceLoader clusterClientServiceLoader, Options commandLineOptions, List availableCommandLines) throws FlinkException { + this( + environment, + originalSessionContext, + null, + dependencies, + flinkConfig, + new DefaultClusterClientServiceLoader(), + commandLineOptions, + availableCommandLines); + } + + public ExecutionContext( Review comment: make all constructors `private` if you want to introduce `Builder` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
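A rough sketch of the pattern the comment asks for: once a `Builder` exists, every constructor becomes `private` so the builder is the only way to obtain an instance. The class name, fields and methods below are hypothetical and much simpler than the real `ExecutionContext`; they only illustrate the shape.

```java
// Hypothetical sketch class; not the ExecutionContext from the pull request.
class ExecutionContextSketch {
    private final String sessionId;
    private final boolean useDefaultLoader;

    // Private: callers cannot bypass the Builder.
    private ExecutionContextSketch(String sessionId, boolean useDefaultLoader) {
        this.sessionId = sessionId;
        this.useDefaultLoader = useDefaultLoader;
    }

    // Single entry point; required arguments go here, optional ones on the Builder.
    static Builder builder(String sessionId) {
        return new Builder(sessionId);
    }

    String describe() {
        return sessionId + ":" + useDefaultLoader;
    }

    static final class Builder {
        private final String sessionId;
        private boolean useDefaultLoader = true; // default replaces an overloaded constructor

        private Builder(String sessionId) {
            this.sessionId = sessionId;
        }

        Builder useDefaultLoader(boolean value) {
            this.useDefaultLoader = value;
            return this;
        }

        ExecutionContextSketch build() {
            return new ExecutionContextSketch(sessionId, useDefaultLoader);
        }
    }

    public static void main(String[] args) {
        System.out.println(builder("session-1").build().describe());
        System.out.println(builder("session-2").useDefaultLoader(false).build().describe());
    }
}
```

Defaults on the builder take over the role of the delegating constructors shown in the diff, which is one reason to keep only a single private constructor.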
[GitHub] [flink] wangyang0918 commented on a change in pull request #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes.
wangyang0918 commented on a change in pull request #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes. URL: https://github.com/apache/flink/pull/9984#discussion_r354697346 ## File path: flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerTest.java ## @@ -0,0 +1,372 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.kubernetes; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; +import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient; +import org.apache.flink.kubernetes.kubeclient.TaskManagerPodParameter; +import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod; +import org.apache.flink.kubernetes.utils.Constants; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.clusterframework.types.SlotID; +import org.apache.flink.runtime.entrypoint.ClusterInformation; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.instance.HardwareDescription; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; +import org.apache.flink.runtime.registration.RegistrationResponse; +import org.apache.flink.runtime.resourcemanager.JobLeaderIdService; +import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; +import org.apache.flink.runtime.resourcemanager.SlotRequest; +import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; +import org.apache.flink.runtime.resourcemanager.utils.MockResourceManagerRuntimeServices; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.rpc.TestingRpcService; +import org.apache.flink.runtime.taskexecutor.SlotReport; +import 
org.apache.flink.runtime.taskexecutor.SlotStatus; +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; +import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess; +import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder; +import org.apache.flink.runtime.util.TestingFatalErrorHandler; + +import io.fabric8.kubernetes.api.model.Container; +import io.fabric8.kubernetes.api.model.ContainerStateBuilder; +import io.fabric8.kubernetes.api.model.ContainerStatusBuilder; +import io.fabric8.kubernetes.api.model.EnvVarBuilder; +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.PodList; +import io.fabric8.kubernetes.api.model.PodStatusBuilder; +import io.fabric8.kubernetes.client.KubernetesClient; +import org.hamcrest.Matchers; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +import static junit.framework.TestCase.assertEquals; +import static org.hamcrest.Matchers.instanceOf; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * Test cases for {@link KubernetesResourceManager}. + */ +public class KubernetesResourceManagerTest extends KubernetesTestBase { + + private static final Time TIMEOUT = Time.seconds(10L); + + private TestingFatalErrorHandler testingFatalErrorHandler; + + private final String jobManagerHost = "jm-host1"; + + private Configuration flinkConfig; + + private TestingKubernetesResourceManager resourceManager; + + private Fli
[GitHub] [flink] wangyang0918 commented on a change in pull request #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes.
wangyang0918 commented on a change in pull request #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes. URL: https://github.com/apache/flink/pull/9984#discussion_r354697070 ## File path: flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerTest.java ## @@ -0,0 +1,372 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.kubernetes; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; +import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient; +import org.apache.flink.kubernetes.kubeclient.TaskManagerPodParameter; +import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod; +import org.apache.flink.kubernetes.utils.Constants; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.clusterframework.types.SlotID; +import org.apache.flink.runtime.entrypoint.ClusterInformation; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.instance.HardwareDescription; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; +import org.apache.flink.runtime.registration.RegistrationResponse; +import org.apache.flink.runtime.resourcemanager.JobLeaderIdService; +import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; +import org.apache.flink.runtime.resourcemanager.SlotRequest; +import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; +import org.apache.flink.runtime.resourcemanager.utils.MockResourceManagerRuntimeServices; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.rpc.TestingRpcService; +import org.apache.flink.runtime.taskexecutor.SlotReport; +import 
org.apache.flink.runtime.taskexecutor.SlotStatus; +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; +import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess; +import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder; +import org.apache.flink.runtime.util.TestingFatalErrorHandler; + +import io.fabric8.kubernetes.api.model.Container; +import io.fabric8.kubernetes.api.model.ContainerStateBuilder; +import io.fabric8.kubernetes.api.model.ContainerStatusBuilder; +import io.fabric8.kubernetes.api.model.EnvVarBuilder; +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.PodList; +import io.fabric8.kubernetes.api.model.PodStatusBuilder; +import io.fabric8.kubernetes.client.KubernetesClient; +import org.hamcrest.Matchers; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +import static junit.framework.TestCase.assertEquals; +import static org.hamcrest.Matchers.instanceOf; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * Test cases for {@link KubernetesResourceManager}. + */ +public class KubernetesResourceManagerTest extends KubernetesTestBase { + + private static final Time TIMEOUT = Time.seconds(10L); + + private TestingFatalErrorHandler testingFatalErrorHandler; + + private final String jobManagerHost = "jm-host1"; + + private Configuration flinkConfig; + + private TestingKubernetesResourceManager resourceManager; + + private Fli
[GitHub] [flink] wangyang0918 commented on a change in pull request #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes.
wangyang0918 commented on a change in pull request #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes. URL: https://github.com/apache/flink/pull/9984#discussion_r354697186 ## File path: flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerTest.java ## @@ -0,0 +1,372 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.kubernetes; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; +import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient; +import org.apache.flink.kubernetes.kubeclient.TaskManagerPodParameter; +import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod; +import org.apache.flink.kubernetes.utils.Constants; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.clusterframework.types.SlotID; +import org.apache.flink.runtime.entrypoint.ClusterInformation; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.instance.HardwareDescription; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; +import org.apache.flink.runtime.registration.RegistrationResponse; +import org.apache.flink.runtime.resourcemanager.JobLeaderIdService; +import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; +import org.apache.flink.runtime.resourcemanager.SlotRequest; +import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; +import org.apache.flink.runtime.resourcemanager.utils.MockResourceManagerRuntimeServices; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.rpc.TestingRpcService; +import org.apache.flink.runtime.taskexecutor.SlotReport; +import 
org.apache.flink.runtime.taskexecutor.SlotStatus; +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; +import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess; +import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder; +import org.apache.flink.runtime.util.TestingFatalErrorHandler; + +import io.fabric8.kubernetes.api.model.Container; +import io.fabric8.kubernetes.api.model.ContainerStateBuilder; +import io.fabric8.kubernetes.api.model.ContainerStatusBuilder; +import io.fabric8.kubernetes.api.model.EnvVarBuilder; +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.PodList; +import io.fabric8.kubernetes.api.model.PodStatusBuilder; +import io.fabric8.kubernetes.client.KubernetesClient; +import org.hamcrest.Matchers; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +import static junit.framework.TestCase.assertEquals; +import static org.hamcrest.Matchers.instanceOf; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * Test cases for {@link KubernetesResourceManager}. + */ +public class KubernetesResourceManagerTest extends KubernetesTestBase { + + private static final Time TIMEOUT = Time.seconds(10L); + + private TestingFatalErrorHandler testingFatalErrorHandler; + + private final String jobManagerHost = "jm-host1"; + + private Configuration flinkConfig; + + private TestingKubernetesResourceManager resourceManager; + + private Fli
[GitHub] [flink] wangyang0918 commented on a change in pull request #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes.
wangyang0918 commented on a change in pull request #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes. URL: https://github.com/apache/flink/pull/9984#discussion_r354696896 ## File path: flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerTest.java ## @@ -0,0 +1,372 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.kubernetes; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; +import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient; +import org.apache.flink.kubernetes.kubeclient.TaskManagerPodParameter; +import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod; +import org.apache.flink.kubernetes.utils.Constants; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.clusterframework.types.SlotID; +import org.apache.flink.runtime.entrypoint.ClusterInformation; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.instance.HardwareDescription; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; +import org.apache.flink.runtime.registration.RegistrationResponse; +import org.apache.flink.runtime.resourcemanager.JobLeaderIdService; +import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; +import org.apache.flink.runtime.resourcemanager.SlotRequest; +import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; +import org.apache.flink.runtime.resourcemanager.utils.MockResourceManagerRuntimeServices; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.rpc.TestingRpcService; +import org.apache.flink.runtime.taskexecutor.SlotReport; +import 
org.apache.flink.runtime.taskexecutor.SlotStatus; +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; +import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess; +import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder; +import org.apache.flink.runtime.util.TestingFatalErrorHandler; + +import io.fabric8.kubernetes.api.model.Container; +import io.fabric8.kubernetes.api.model.ContainerStateBuilder; +import io.fabric8.kubernetes.api.model.ContainerStatusBuilder; +import io.fabric8.kubernetes.api.model.EnvVarBuilder; +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.PodList; +import io.fabric8.kubernetes.api.model.PodStatusBuilder; +import io.fabric8.kubernetes.client.KubernetesClient; +import org.hamcrest.Matchers; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +import static junit.framework.TestCase.assertEquals; +import static org.hamcrest.Matchers.instanceOf; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * Test cases for {@link KubernetesResourceManager}. + */ +public class KubernetesResourceManagerTest extends KubernetesTestBase { + + private static final Time TIMEOUT = Time.seconds(10L); + + private TestingFatalErrorHandler testingFatalErrorHandler; + + private final String jobManagerHost = "jm-host1"; + + private Configuration flinkConfig; + + private TestingKubernetesResourceManager resourceManager; + + private Fli
[GitHub] [flink] wangyang0918 commented on a change in pull request #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes.
wangyang0918 commented on a change in pull request #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes. URL: https://github.com/apache/flink/pull/9984#discussion_r354696517 ## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java ## @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.kubernetes; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; +import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient; +import org.apache.flink.kubernetes.kubeclient.KubeClientFactory; +import org.apache.flink.kubernetes.kubeclient.TaskManagerPodParameter; +import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod; +import org.apache.flink.kubernetes.taskmanager.KubernetesTaskExecutorRunner; +import org.apache.flink.kubernetes.utils.Constants; +import org.apache.flink.kubernetes.utils.KubernetesUtils; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.BootstrapTools; +import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.entrypoint.ClusterInformation; +import org.apache.flink.runtime.entrypoint.parser.CommandLineOptions; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup; +import org.apache.flink.runtime.resourcemanager.ActiveResourceManager; +import org.apache.flink.runtime.resourcemanager.JobLeaderIdService; +import org.apache.flink.runtime.resourcemanager.ResourceManager; +import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException; +import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.rpc.RpcService; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + 
+import javax.annotation.Nullable; + +import java.io.File; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Kubernetes specific implementation of the {@link ResourceManager}. + */ +public class KubernetesResourceManager extends ActiveResourceManager + implements FlinkKubeClient.PodCallbackHandler { + + private static final Logger LOG = LoggerFactory.getLogger(KubernetesResourceManager.class); + + /** The taskmanager pod name pattern is {clusterId}-{taskmanager}-{attemptId}-{podIndex}. */ + private static final String TASK_MANAGER_POD_FORMAT = "%s-taskmanager-%d-%d"; + + private final Map workerNodes = new HashMap<>(); + + private final double defaultCpus; + + /** When ResourceManager failover, the max attempt should recover. */ + private final AtomicLong currentMaxAttemptId = new AtomicLong(0); Review comment: Will use long instead. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
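A minimal sketch of what "Will use long instead" could look like for the pod naming scheme quoted above. The format string is copied from the diff. The class name `TaskManagerPodNames`, the method `nextPodName`, and the pod counter field are hypothetical, and the sketch assumes every access happens on a single thread (for example an RPC endpoint's main thread), which is what would make `AtomicLong` unnecessary.

```java
// Hypothetical helper, not code from the pull request.
class TaskManagerPodNames {

    // Same format string as TASK_MANAGER_POD_FORMAT in the quoted diff:
    // {clusterId}-taskmanager-{attemptId}-{podIndex}
    private static final String TASK_MANAGER_POD_FORMAT = "%s-taskmanager-%d-%d";

    private final String clusterId;

    // Plain long values instead of AtomicLong. Assumption: only one thread
    // ever reads or writes these fields, so no atomic operations are needed.
    private final long currentAttemptId;
    private long currentMaxPodId = 0;

    TaskManagerPodNames(String clusterId, long currentAttemptId) {
        this.clusterId = clusterId;
        this.currentAttemptId = currentAttemptId;
    }

    /** Increments the pod index by one and returns the next pod name. */
    String nextPodName() {
        return String.format(
            TASK_MANAGER_POD_FORMAT, clusterId, currentAttemptId, ++currentMaxPodId);
    }

    public static void main(String[] args) {
        TaskManagerPodNames names = new TaskManagerPodNames("flink-cluster", 1L);
        System.out.println(names.nextPodName()); // flink-cluster-taskmanager-1-1
        System.out.println(names.nextPodName()); // flink-cluster-taskmanager-1-2
    }
}
```

If the counters were ever touched from more than one thread, the plain fields would no longer be safe and `AtomicLong` (or confinement to one executor) would be needed again.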
[GitHub] [flink] JingsongLi commented on a change in pull request #10452: [FLINK-15035][table-planner-blink] Introduce unknown memory setting to table in blink planner
JingsongLi commented on a change in pull request #10452: [FLINK-15035][table-planner-blink] Introduce unknown memory setting to table in blink planner
URL: https://github.com/apache/flink/pull/10452#discussion_r354681473

## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/exec/BatchExecNode.scala ##

@@ -32,4 +33,13 @@ trait BatchExecNode[T] extends ExecNode[BatchPlanner, T] with Logging {
 */
 def getDamBehavior: DamBehavior
+ def setManagedMemoryWeight[X](
+ transformation: Transformation[X], memoryBytes: Long): Transformation[X] = {
+// Using Bytes can easily overflow
+// Using MebiBytes to cast to int
+// Careful about zero
+val memoryMB = Math.max(1, (memoryBytes >> 20).toInt)

Review comment:
Yeah, it's a bit hacky. It looks like we wrote this hacky code to work around the runtime default value. Our current `ManagedMemoryWeight` settings are very large, so the default of 1 is very small by comparison and is not a serious problem. I would therefore prefer to just check in `ExecutorUtils#setBatchProperties` and log a warning when the weight is still the default 1.
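The conversion discussed in the quoted diff can be sketched in Java as follows. This is an illustration under stated assumptions, not the code from the pull request: the class name `ManagedMemoryWeightSketch` and the method `toWeightMiB` are hypothetical, and the upper clamp to `Integer.MAX_VALUE` is an addition that the quoted Scala line does not have.

```java
// Hypothetical illustration of the bytes-to-MiB weight conversion.
class ManagedMemoryWeightSketch {

    /**
     * Converts a byte count to whole mebibytes for use as an int weight.
     * The result is never below 1 and never above Integer.MAX_VALUE.
     */
    static int toWeightMiB(long memoryBytes) {
        // Shifting right by 20 divides by 2^20, i.e. bytes -> MiB.
        long mebibytes = memoryBytes >> 20;
        // Lower clamp: anything under 1 MiB would otherwise become 0.
        // Upper clamp (an assumption added here): avoids wrap-around on the cast.
        long clamped = Math.min((long) Integer.MAX_VALUE, Math.max(1L, mebibytes));
        return (int) clamped;
    }

    public static void main(String[] args) {
        // Casting the raw byte count is what overflows: 4 GiB is 2^32 bytes,
        // whose low 32 bits are all zero.
        System.out.println((int) (4L << 30));          // 0
        System.out.println(toWeightMiB(4L << 30));     // 4096
        System.out.println(toWeightMiB(100L));         // 1
    }
}
```

The lower clamp is the "careful about zero" case from the diff comments: a request smaller than 1 MiB still gets a non-zero weight.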
[GitHub] [flink] wangyang0918 commented on a change in pull request #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes.
wangyang0918 commented on a change in pull request #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes. URL: https://github.com/apache/flink/pull/9984#discussion_r354695465 ## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java ## @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.kubernetes; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; +import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient; +import org.apache.flink.kubernetes.kubeclient.KubeClientFactory; +import org.apache.flink.kubernetes.kubeclient.TaskManagerPodParameter; +import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod; +import org.apache.flink.kubernetes.taskmanager.KubernetesTaskExecutorRunner; +import org.apache.flink.kubernetes.utils.Constants; +import org.apache.flink.kubernetes.utils.KubernetesUtils; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.BootstrapTools; +import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.entrypoint.ClusterInformation; +import org.apache.flink.runtime.entrypoint.parser.CommandLineOptions; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup; +import org.apache.flink.runtime.resourcemanager.ActiveResourceManager; +import org.apache.flink.runtime.resourcemanager.JobLeaderIdService; +import org.apache.flink.runtime.resourcemanager.ResourceManager; +import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException; +import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.rpc.RpcService; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + 
+import javax.annotation.Nullable; + +import java.io.File; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Kubernetes specific implementation of the {@link ResourceManager}. + */ +public class KubernetesResourceManager extends ActiveResourceManager + implements FlinkKubeClient.PodCallbackHandler { + + private static final Logger LOG = LoggerFactory.getLogger(KubernetesResourceManager.class); + + /** The taskmanager pod name pattern is {clusterId}-{taskmanager}-{attemptId}-{podIndex}. */ + private static final String TASK_MANAGER_POD_FORMAT = "%s-taskmanager-%d-%d"; + + private final Map workerNodes = new HashMap<>(); + + private final double defaultCpus; + + /** When ResourceManager failover, the max attempt should recover. */ + private final AtomicLong currentMaxAttemptId = new AtomicLong(0); + + /** Current max pod index. When creating a new pod, it should increase one. */ + private final AtomicLong currentMaxPodId = new AtomicLong(0); + + private final String clusterId; + + private final FlinkKubeClient kubeClient; + + private final ContaineredTaskManagerParameters taskManagerParameters; + + private final List taskManagerStartCommand; + + /** The number of pods requested, but not yet granted. */ + private int numPendingPodRequests = 0; + + public KubernetesResourceManager( + RpcService rpcService, + String resourceManagerEndpointId, + ResourceID resourceId, + Configuration flinkConfig, +
[GitHub] [flink] lamber-ken commented on issue #9751: [FLINK-14177] bump curator from 2.12.0 to 4.2.0
lamber-ken commented on issue #9751: [FLINK-14177] bump curator from 2.12.0 to 4.2.0 URL: https://github.com/apache/flink/pull/9751#issuecomment-562464972 @flinkbot run travis
[GitHub] [flink] wangyang0918 commented on a change in pull request #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes.
wangyang0918 commented on a change in pull request #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes. URL: https://github.com/apache/flink/pull/9984#discussion_r354694048 ## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java ## @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.kubernetes; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; +import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient; +import org.apache.flink.kubernetes.kubeclient.KubeClientFactory; +import org.apache.flink.kubernetes.kubeclient.TaskManagerPodParameter; +import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod; +import org.apache.flink.kubernetes.taskmanager.KubernetesTaskExecutorRunner; +import org.apache.flink.kubernetes.utils.Constants; +import org.apache.flink.kubernetes.utils.KubernetesUtils; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.BootstrapTools; +import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.entrypoint.ClusterInformation; +import org.apache.flink.runtime.entrypoint.parser.CommandLineOptions; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup; +import org.apache.flink.runtime.resourcemanager.ActiveResourceManager; +import org.apache.flink.runtime.resourcemanager.JobLeaderIdService; +import org.apache.flink.runtime.resourcemanager.ResourceManager; +import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException; +import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.rpc.RpcService; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + 
+import javax.annotation.Nullable; + +import java.io.File; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Kubernetes specific implementation of the {@link ResourceManager}. + */ +public class KubernetesResourceManager extends ActiveResourceManager + implements FlinkKubeClient.PodCallbackHandler { + + private static final Logger LOG = LoggerFactory.getLogger(KubernetesResourceManager.class); + + /** The taskmanager pod name pattern is {clusterId}-{taskmanager}-{attemptId}-{podIndex}. */ + private static final String TASK_MANAGER_POD_FORMAT = "%s-taskmanager-%d-%d"; + + private final Map workerNodes = new HashMap<>(); + + private final double defaultCpus; + + /** When ResourceManager failover, the max attempt should recover. */ + private final AtomicLong currentMaxAttemptId = new AtomicLong(0); + + /** Current max pod index. When creating a new pod, it should increase one. */ + private final AtomicLong currentMaxPodId = new AtomicLong(0); + + private final String clusterId; + + private final FlinkKubeClient kubeClient; + + private final ContaineredTaskManagerParameters taskManagerParameters; + + private final List taskManagerStartCommand; + + /** The number of pods requested, but not yet granted. */ + private int numPendingPodRequests = 0; + + public KubernetesResourceManager( + RpcService rpcService, + String resourceManagerEndpointId, + ResourceID resourceId, + Configuration flinkConfig, +
[GitHub] [flink] flinkbot edited a comment on issue #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes.
flinkbot edited a comment on issue #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes. URL: https://github.com/apache/flink/pull/9984#issuecomment-545881191 ## CI report: * 802ebf37e3d932169f9826b40df483bb5e9ac064 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/133366796) * f16938ce2fb38ae216def737d14643b94d6083a1 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/133940607) * 20de5cfc7af9a8ba57080d5218fd0293f393a40e : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/133998545) * 56bfbb65802c1d5c48caa625a152070934bb5d79 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/134010321) * 3ea229382fef64b1046673c79ff845d4689c5db4 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/137145261) * 3169988a33e0126e79cd449740c93d3561296ead : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/138017410) * a0dd858b0b91443fc87895a2d32ebfbbc0b9fe4c : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/138730857) * 86b4537979265a0fbecf7c1841ed8fd2f7ebfd86 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/138770610) * d77f83e133b497e42cd85aeaf95e625411274c92 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/139116897) * f2ed0e4b8d37dc3d5b2b770d943e703f4d893da0 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/139256990) * ce01f742aba3a683b98a0ea7da47e678d30c12be : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/139290769) * 7392ac618203c2544ab4a313a7068e03fcc147a7 : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/139453237) * 3b3e23e69bde65f504ea5d46ef7c2d55ba04c0af : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/139459406) * 71c087913517972655181e0119588c886e4243cb : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/139461705) * 9be9f9f2338a2a953a76cc60057554d7d90157f9 : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/139518666) * bb01ae34a8d6087f4aef4a989c0d14d576df601f : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/139523468) * fd9cdf56812f2d0c9d659b69d9fb243d58ef48a3 : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/139555865) * c647669902f574c3ee23adc7efbabc859d93c0e4 : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/139627616) * cc29fb3058998b1489451d53ad04d9e9886f4193 : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/139635222) * 3d315606bd4fdf826ef823631116a189810dab6a : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/139641767) * 37bfdff9aa766deaac10d9c5407cc244d14d1085 : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/139643868) * daa0ff528a8ec3c2dcbd33927099cee2d8faa113 : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/139650255) * 17ba1baf2383844c1b0ca8aa8ca97a80646e77fb : UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] wangyang0918 commented on a change in pull request #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes.
wangyang0918 commented on a change in pull request #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes. URL: https://github.com/apache/flink/pull/9984#discussion_r354693688 ## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java ## @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.kubernetes; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; +import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient; +import org.apache.flink.kubernetes.kubeclient.KubeClientFactory; +import org.apache.flink.kubernetes.kubeclient.TaskManagerPodParameter; +import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod; +import org.apache.flink.kubernetes.taskmanager.KubernetesTaskExecutorRunner; +import org.apache.flink.kubernetes.utils.Constants; +import org.apache.flink.kubernetes.utils.KubernetesUtils; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.BootstrapTools; +import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.entrypoint.ClusterInformation; +import org.apache.flink.runtime.entrypoint.parser.CommandLineOptions; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup; +import org.apache.flink.runtime.resourcemanager.ActiveResourceManager; +import org.apache.flink.runtime.resourcemanager.JobLeaderIdService; +import org.apache.flink.runtime.resourcemanager.ResourceManager; +import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException; +import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.rpc.RpcService; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + 
+import javax.annotation.Nullable; + +import java.io.File; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Kubernetes specific implementation of the {@link ResourceManager}. + */ +public class KubernetesResourceManager extends ActiveResourceManager + implements FlinkKubeClient.PodCallbackHandler { + + private static final Logger LOG = LoggerFactory.getLogger(KubernetesResourceManager.class); + + /** The taskmanager pod name pattern is {clusterId}-{taskmanager}-{attemptId}-{podIndex}. */ + private static final String TASK_MANAGER_POD_FORMAT = "%s-taskmanager-%d-%d"; + + private final Map workerNodes = new HashMap<>(); + + private final double defaultCpus; + + /** When ResourceManager failover, the max attempt should recover. */ + private final AtomicLong currentMaxAttemptId = new AtomicLong(0); + + /** Current max pod index. When creating a new pod, it should increase one. */ + private final AtomicLong currentMaxPodId = new AtomicLong(0); + + private final String clusterId; + + private final FlinkKubeClient kubeClient; + + private final ContaineredTaskManagerParameters taskManagerParameters; + + private final List taskManagerStartCommand; + + /** The number of pods requested, but not yet granted. */ + private int numPendingPodRequests = 0; + + public KubernetesResourceManager( + RpcService rpcService, + String resourceManagerEndpointId, + ResourceID resourceId, + Configuration flinkConfig, +
[GitHub] [flink] flinkbot commented on issue #10455: [FLINK-15089][connectors] Puslar catalog
flinkbot commented on issue #10455: [FLINK-15089][connectors] Puslar catalog URL: https://github.com/apache/flink/pull/10455#issuecomment-562462991 ## CI report: * 206bb09e47ff4f36514a93c96f7cadc30f62cd4b : UNKNOWN
[GitHub] [flink] wangyang0918 commented on a change in pull request #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes.
wangyang0918 commented on a change in pull request #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes. URL: https://github.com/apache/flink/pull/9984#discussion_r354693480 ## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java ## @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.kubernetes; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; +import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient; +import org.apache.flink.kubernetes.kubeclient.KubeClientFactory; +import org.apache.flink.kubernetes.kubeclient.TaskManagerPodParameter; +import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod; +import org.apache.flink.kubernetes.taskmanager.KubernetesTaskExecutorRunner; +import org.apache.flink.kubernetes.utils.Constants; +import org.apache.flink.kubernetes.utils.KubernetesUtils; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.BootstrapTools; +import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.entrypoint.ClusterInformation; +import org.apache.flink.runtime.entrypoint.parser.CommandLineOptions; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup; +import org.apache.flink.runtime.resourcemanager.ActiveResourceManager; +import org.apache.flink.runtime.resourcemanager.JobLeaderIdService; +import org.apache.flink.runtime.resourcemanager.ResourceManager; +import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException; +import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.rpc.RpcService; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + 
+import javax.annotation.Nullable; + +import java.io.File; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Kubernetes specific implementation of the {@link ResourceManager}. + */ +public class KubernetesResourceManager extends ActiveResourceManager + implements FlinkKubeClient.PodCallbackHandler { + + private static final Logger LOG = LoggerFactory.getLogger(KubernetesResourceManager.class); + + /** The taskmanager pod name pattern is {clusterId}-{taskmanager}-{attemptId}-{podIndex}. */ + private static final String TASK_MANAGER_POD_FORMAT = "%s-taskmanager-%d-%d"; + + private final Map workerNodes = new HashMap<>(); + + private final double defaultCpus; + + /** When ResourceManager failover, the max attempt should recover. */ + private final AtomicLong currentMaxAttemptId = new AtomicLong(0); + + /** Current max pod index. When creating a new pod, it should increase one. */ + private final AtomicLong currentMaxPodId = new AtomicLong(0); + + private final String clusterId; + + private final FlinkKubeClient kubeClient; + + private final ContaineredTaskManagerParameters taskManagerParameters; + + private final List taskManagerStartCommand; + + /** The number of pods requested, but not yet granted. */ + private int numPendingPodRequests = 0; + + public KubernetesResourceManager( + RpcService rpcService, + String resourceManagerEndpointId, + ResourceID resourceId, + Configuration flinkConfig, +
[GitHub] [flink] wangyang0918 commented on a change in pull request #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes.
wangyang0918 commented on a change in pull request #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes. URL: https://github.com/apache/flink/pull/9984#discussion_r354692195 ## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java ## @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.kubernetes; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; +import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient; +import org.apache.flink.kubernetes.kubeclient.KubeClientFactory; +import org.apache.flink.kubernetes.kubeclient.TaskManagerPodParameter; +import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod; +import org.apache.flink.kubernetes.taskmanager.KubernetesTaskExecutorRunner; +import org.apache.flink.kubernetes.utils.Constants; +import org.apache.flink.kubernetes.utils.KubernetesUtils; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.BootstrapTools; +import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.entrypoint.ClusterInformation; +import org.apache.flink.runtime.entrypoint.parser.CommandLineOptions; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup; +import org.apache.flink.runtime.resourcemanager.ActiveResourceManager; +import org.apache.flink.runtime.resourcemanager.JobLeaderIdService; +import org.apache.flink.runtime.resourcemanager.ResourceManager; +import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException; +import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.rpc.RpcService; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + 
+import javax.annotation.Nullable; + +import java.io.File; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Kubernetes specific implementation of the {@link ResourceManager}. + */ +public class KubernetesResourceManager extends ActiveResourceManager + implements FlinkKubeClient.PodCallbackHandler { + + private static final Logger LOG = LoggerFactory.getLogger(KubernetesResourceManager.class); + + /** The taskmanager pod name pattern is {clusterId}-{taskmanager}-{attemptId}-{podIndex}. */ + private static final String TASK_MANAGER_POD_FORMAT = "%s-taskmanager-%d-%d"; + + private final Map workerNodes = new HashMap<>(); + + private final double defaultCpus; + + /** When ResourceManager failover, the max attempt should recover. */ + private final AtomicLong currentMaxAttemptId = new AtomicLong(0); + + /** Current max pod index. When creating a new pod, it should increase one. */ + private final AtomicLong currentMaxPodId = new AtomicLong(0); + + private final String clusterId; + + private final FlinkKubeClient kubeClient; + + private final ContaineredTaskManagerParameters taskManagerParameters; + + private final List taskManagerStartCommand; + + /** The number of pods requested, but not yet granted. */ + private int numPendingPodRequests = 0; + + public KubernetesResourceManager( + RpcService rpcService, + String resourceManagerEndpointId, + ResourceID resourceId, + Configuration flinkConfig, +
[GitHub] [flink] flinkbot edited a comment on issue #10403: [FLINK-14645][table] Support to keep nullability and precision when converting DataTypes to properties
flinkbot edited a comment on issue #10403: [FLINK-14645][table] Support to keep nullability and precision when converting DataTypes to properties URL: https://github.com/apache/flink/pull/10403#issuecomment-561208418 ## CI report: * 038052c833940fbaef3111ed92525216f13edee2 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/139161968) * b0cfd6fc06fb9c581fe74f762654cbb1355ff4a8 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/139259047) * a7adb1327e1705666ae0614e1e4361d05256633d : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/139263403) * 73d285470f1bc8475e4563fd23c307b0694dd35e : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/139278723) * 6be2267f8937d80a0e4bb5db664bf56169089a1f : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/139296259) * 4be2bda46373f12c754183c67ad192b2715c37dd : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/139339756) * c5d81539a92ad9d04e03c3ded1c4dcfa3d1b2bd0 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/139446926) * e1aefe79e1afe03398917b8c8bd8b95facab : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/139562400)
[GitHub] [flink] flinkbot edited a comment on issue #10389: [FLINK-14198][python] Add "type" and "rtype" options to flink python API docstrings of table.py and table_environment.py
flinkbot edited a comment on issue #10389: [FLINK-14198][python] Add "type" and "rtype" options to flink python API docstrings of table.py and table_environment.py URL: https://github.com/apache/flink/pull/10389#issuecomment-560984209 ## CI report: * ebc6a4396f71fda053e32ed71994e5ce129835c1 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/139081057) * 1a14a71c2785b0c98831779b1e058f585bdc762c : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/139501852) * b1b324f72f4561126eb1dcbf144947d2b3ef2dc1 : UNKNOWN
[GitHub] [flink] zhijiangW commented on a change in pull request #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes.
zhijiangW commented on a change in pull request #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes. URL: https://github.com/apache/flink/pull/9984#discussion_r354691495 ## File path: flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerTest.java ## @@ -0,0 +1,372 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.kubernetes; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; +import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient; +import org.apache.flink.kubernetes.kubeclient.TaskManagerPodParameter; +import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod; +import org.apache.flink.kubernetes.utils.Constants; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.clusterframework.types.SlotID; +import org.apache.flink.runtime.entrypoint.ClusterInformation; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.instance.HardwareDescription; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; +import org.apache.flink.runtime.registration.RegistrationResponse; +import org.apache.flink.runtime.resourcemanager.JobLeaderIdService; +import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; +import org.apache.flink.runtime.resourcemanager.SlotRequest; +import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; +import org.apache.flink.runtime.resourcemanager.utils.MockResourceManagerRuntimeServices; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.rpc.TestingRpcService; +import org.apache.flink.runtime.taskexecutor.SlotReport; +import 
org.apache.flink.runtime.taskexecutor.SlotStatus; +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; +import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess; +import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder; +import org.apache.flink.runtime.util.TestingFatalErrorHandler; + +import io.fabric8.kubernetes.api.model.Container; +import io.fabric8.kubernetes.api.model.ContainerStateBuilder; +import io.fabric8.kubernetes.api.model.ContainerStatusBuilder; +import io.fabric8.kubernetes.api.model.EnvVarBuilder; +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.PodList; +import io.fabric8.kubernetes.api.model.PodStatusBuilder; +import io.fabric8.kubernetes.client.KubernetesClient; +import org.hamcrest.Matchers; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +import static junit.framework.TestCase.assertEquals; +import static org.hamcrest.Matchers.instanceOf; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * Test cases for {@link KubernetesResourceManager}. + */ +public class KubernetesResourceManagerTest extends KubernetesTestBase { + + private static final Time TIMEOUT = Time.seconds(10L); + + private TestingFatalErrorHandler testingFatalErrorHandler; + + private final String jobManagerHost = "jm-host1"; + + private Configuration flinkConfig; + + private TestingKubernetesResourceManager resourceManager; + + private FlinkK
[GitHub] [flink] wuchong commented on a change in pull request #10403: [FLINK-14645][table] Support to keep nullability and precision when converting DataTypes to properties
wuchong commented on a change in pull request #10403: [FLINK-14645][table] Support to keep nullability and precision when converting DataTypes to properties
URL: https://github.com/apache/flink/pull/10403#discussion_r354691466

## File path: flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/descriptors/OldCsv.java ##
@@ -123,17 +142,41 @@ public OldCsv field(String fieldName, TypeInformation fieldType) {
      * This method can be called multiple times. The call order of this method defines
      * also the order of the fields in the format.
      *
+     * NOTE: the fieldType string should follow the type string defined in {@link LogicalTypeParser}.
+     * This method also keeps compatible with old type string defined in {@link TypeStringUtils} but
+     * will be dropped in future versions as it uses the old type system.
+     *
      * @param fieldName the field name
      * @param fieldType the type string of the field
      */
     public OldCsv field(String fieldName, String fieldType) {
+        if (isLegacyTypeString(fieldType)) {
+            // fallback to legacy parser
+            TypeInformation typeInfo = TypeStringUtils.readTypeInfo(fieldType);
+            return field(fieldName, TypeConversions.fromLegacyInfoToDataType(typeInfo));
+        } else {
+            return addField(fieldName, fieldType);
+        }
+    }
+
+    private OldCsv addField(String fieldName, String fieldType) {
         if (schema.containsKey(fieldName)) {
             throw new ValidationException("Duplicate field name " + fieldName + ".");
         }
         schema.put(fieldName, fieldType);
         return this;
     }

+    private boolean isLegacyTypeString(String fieldType) {
+        try {
+            LogicalTypeParser.parse(fieldType);

Review comment: Sorry, I updated `Schema` but forgot to update `OldCsv`.
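
The diff above tries the new type-string parser first and falls back to the legacy one when parsing fails. The following is a minimal, self-contained sketch of that pattern only. It does not use Flink's `LogicalTypeParser` or `TypeStringUtils`; the two tiny parsers, the type names (`INT`, `STRING`, `OLD_INT`, `OLD_STRING`) and the class name are all hypothetical stand-ins invented for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Sketch of "parse with the new grammar, fall back to the legacy one". Not Flink code. */
class TypeStringFallbackSketch {

    /** Ordered field name -> normalized type string, similar in spirit to the schema map in the diff. */
    private final Map<String, String> schema = new LinkedHashMap<>();

    /** Hypothetical new-style parser: accepts only a tiny made-up grammar. */
    static String parseNewStyle(String type) {
        switch (type) {
            case "INT":
            case "STRING":
                return type;
            default:
                throw new IllegalArgumentException("Not a new-style type string: " + type);
        }
    }

    /** Hypothetical legacy parser: maps made-up old names onto the new ones. */
    static String parseLegacy(String type) {
        switch (type) {
            case "OLD_INT":
                return "INT";
            case "OLD_STRING":
                return "STRING";
            default:
                throw new IllegalArgumentException("Unknown type string: " + type);
        }
    }

    /** A string counts as legacy exactly when the new-style parser rejects it. */
    static boolean isLegacyTypeString(String type) {
        try {
            parseNewStyle(type);
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    /** Adds a field, normalizing legacy type strings before storing them. */
    public TypeStringFallbackSketch field(String name, String type) {
        String normalized = isLegacyTypeString(type) ? parseLegacy(type) : type;
        if (schema.containsKey(name)) {
            throw new IllegalArgumentException("Duplicate field name " + name + ".");
        }
        schema.put(name, normalized);
        return this;
    }

    public Map<String, String> schema() {
        return schema;
    }
}
```

One consequence of this design is that a string rejected by both parsers surfaces the legacy parser's error, which may be worth keeping in mind when wording error messages.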
[GitHub] [flink] wsry commented on a change in pull request #10375: [FLINK-14845][runtime] Introduce data compression to reduce disk and network IO of shuffle.
wsry commented on a change in pull request #10375: [FLINK-14845][runtime] Introduce data compression to reduce disk and network IO of shuffle. URL: https://github.com/apache/flink/pull/10375#discussion_r354691010 ## File path: flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java ## @@ -54,6 +54,28 @@ .withDescription("Enable SSL support for the taskmanager data transport. This is applicable only when the" + " global flag for internal SSL (" + SecurityOptions.SSL_INTERNAL_ENABLED.key() + ") is set to true"); + /** +* Boolean flag indicating whether the shuffle data will be compressed or not. +* +* Note: Data is compressed per buffer (may be sliced buffer in pipeline mode) and compression can incur extra +* CPU overhead so it is more effective for IO bounded scenario when data compression ratio is high. +*/ + public static final ConfigOption<Boolean> DATA_COMPRESSION_ENABLED = + key("taskmanager.data.compression.enabled") Review comment: Thanks for the comments. IMO, configuring compression separately for blocking and pipelined mode is better. I will adopt the proposal. Maybe in the future, we can make some of the network config options configurable per job. For example, by passing the job-level configuration to the create method and letting it override the default cluster configuration when initializing InputGate and ResultPartition in Task.
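The two ideas in this comment (one switch per shuffle mode, and a job-level value overriding the cluster default) can be illustrated with a small sketch. It uses plain maps rather than Flink's `Configuration`, and both option keys below are hypothetical names chosen for illustration; the comment does not say what the final keys will be.

```java
import java.util.HashMap;
import java.util.Map;

class ShuffleCompressionConfigSketch {

    // Hypothetical keys: one switch per shuffle mode instead of a single global flag.
    static final String BLOCKING_KEY = "taskmanager.data.compression.blocking.enabled";
    static final String PIPELINED_KEY = "taskmanager.data.compression.pipelined.enabled";

    // A job-level value wins over the cluster-level value, which wins over the default.
    static boolean resolve(
            Map<String, String> clusterConfig,
            Map<String, String> jobConfig,
            String key,
            boolean defaultValue) {
        String value = jobConfig.containsKey(key) ? jobConfig.get(key) : clusterConfig.get(key);
        return value == null ? defaultValue : Boolean.parseBoolean(value);
    }

    public static void main(String[] args) {
        Map<String, String> clusterConfig = new HashMap<>();
        clusterConfig.put(BLOCKING_KEY, "true");

        Map<String, String> jobConfig = new HashMap<>();
        jobConfig.put(BLOCKING_KEY, "false"); // this job opts out of blocking-mode compression

        System.out.println("blocking:  " + resolve(clusterConfig, jobConfig, BLOCKING_KEY, false));
        System.out.println("pipelined: " + resolve(clusterConfig, jobConfig, PIPELINED_KEY, false));
    }
}
```

Resolving each key at the point where the gate or partition is created keeps the cluster configuration untouched, so jobs without overrides behave exactly as before.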
[GitHub] [flink] zhijiangW commented on a change in pull request #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes.
zhijiangW commented on a change in pull request #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes. URL: https://github.com/apache/flink/pull/9984#discussion_r354690468 ## File path: flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerTest.java ## @@ -0,0 +1,372 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.kubernetes; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; +import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient; +import org.apache.flink.kubernetes.kubeclient.TaskManagerPodParameter; +import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod; +import org.apache.flink.kubernetes.utils.Constants; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.clusterframework.types.SlotID; +import org.apache.flink.runtime.entrypoint.ClusterInformation; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.instance.HardwareDescription; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; +import org.apache.flink.runtime.registration.RegistrationResponse; +import org.apache.flink.runtime.resourcemanager.JobLeaderIdService; +import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; +import org.apache.flink.runtime.resourcemanager.SlotRequest; +import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; +import org.apache.flink.runtime.resourcemanager.utils.MockResourceManagerRuntimeServices; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.rpc.TestingRpcService; +import org.apache.flink.runtime.taskexecutor.SlotReport; +import 
org.apache.flink.runtime.taskexecutor.SlotStatus; +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; +import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess; +import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder; +import org.apache.flink.runtime.util.TestingFatalErrorHandler; + +import io.fabric8.kubernetes.api.model.Container; +import io.fabric8.kubernetes.api.model.ContainerStateBuilder; +import io.fabric8.kubernetes.api.model.ContainerStatusBuilder; +import io.fabric8.kubernetes.api.model.EnvVarBuilder; +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.PodList; +import io.fabric8.kubernetes.api.model.PodStatusBuilder; +import io.fabric8.kubernetes.client.KubernetesClient; +import org.hamcrest.Matchers; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +import static junit.framework.TestCase.assertEquals; +import static org.hamcrest.Matchers.instanceOf; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * Test cases for {@link KubernetesResourceManager}. + */ +public class KubernetesResourceManagerTest extends KubernetesTestBase { + + private static final Time TIMEOUT = Time.seconds(10L); + + private TestingFatalErrorHandler testingFatalErrorHandler; + + private final String jobManagerHost = "jm-host1"; + + private Configuration flinkConfig; + + private TestingKubernetesResourceManager resourceManager; + + private FlinkK
[GitHub] [flink] wangyang0918 commented on a change in pull request #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes.
wangyang0918 commented on a change in pull request #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes. URL: https://github.com/apache/flink/pull/9984#discussion_r354690623 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ActiveResourceManager.java ## @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.resourcemanager; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.runtime.clusterframework.TaskExecutorResourceSpec; +import org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.entrypoint.ClusterInformation; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup; +import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.util.FlinkException; + +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +/** + * Base class for {@link ResourceManager} implementations which contains some common variables and methods. + */ +public abstract class ActiveResourceManager Review comment: I think "active" means that the `ResourceManager` actively and dynamically allocates resources from the resource management cluster. I think `ActiveResourceManagerFactory` exists for the same reason. If you insist, I could rename `ActiveResourceManager` to `ClusterResourceManager`.
[GitHub] [flink] flinkbot edited a comment on issue #10246: [FLINK-10937][dist] Add kubernetes-entry.sh and kubernetes-session.sh.
flinkbot edited a comment on issue #10246: [FLINK-10937][dist] Add kubernetes-entry.sh and kubernetes-session.sh. URL: https://github.com/apache/flink/pull/10246#issuecomment-555375256 ## CI report: * 9f06575f29a7cb1102357c8919b57b3471dc80fd : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/137142048) * ffd0fdc2709bd21ee9574be845fc4a24d938ca45 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/138017483) * 24406e36296a0bf1c9b057811e54a03fb6abab9b : UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build
[GitHub] [flink] zhijiangW commented on a change in pull request #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes.
zhijiangW commented on a change in pull request #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes. URL: https://github.com/apache/flink/pull/9984#discussion_r354690086 ## File path: flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerTest.java ## @@ -0,0 +1,372 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.kubernetes; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; +import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient; +import org.apache.flink.kubernetes.kubeclient.TaskManagerPodParameter; +import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod; +import org.apache.flink.kubernetes.utils.Constants; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.clusterframework.types.SlotID; +import org.apache.flink.runtime.entrypoint.ClusterInformation; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.instance.HardwareDescription; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; +import org.apache.flink.runtime.registration.RegistrationResponse; +import org.apache.flink.runtime.resourcemanager.JobLeaderIdService; +import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; +import org.apache.flink.runtime.resourcemanager.SlotRequest; +import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; +import org.apache.flink.runtime.resourcemanager.utils.MockResourceManagerRuntimeServices; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.rpc.TestingRpcService; +import org.apache.flink.runtime.taskexecutor.SlotReport; +import 
org.apache.flink.runtime.taskexecutor.SlotStatus; +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; +import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess; +import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder; +import org.apache.flink.runtime.util.TestingFatalErrorHandler; + +import io.fabric8.kubernetes.api.model.Container; +import io.fabric8.kubernetes.api.model.ContainerStateBuilder; +import io.fabric8.kubernetes.api.model.ContainerStatusBuilder; +import io.fabric8.kubernetes.api.model.EnvVarBuilder; +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.PodList; +import io.fabric8.kubernetes.api.model.PodStatusBuilder; +import io.fabric8.kubernetes.client.KubernetesClient; +import org.hamcrest.Matchers; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +import static junit.framework.TestCase.assertEquals; +import static org.hamcrest.Matchers.instanceOf; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * Test cases for {@link KubernetesResourceManager}. + */ +public class KubernetesResourceManagerTest extends KubernetesTestBase { + + private static final Time TIMEOUT = Time.seconds(10L); + + private TestingFatalErrorHandler testingFatalErrorHandler; + + private final String jobManagerHost = "jm-host1"; + + private Configuration flinkConfig; + + private TestingKubernetesResourceManager resourceManager; + + private FlinkK
[GitHub] [flink] wangyang0918 commented on a change in pull request #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes.
wangyang0918 commented on a change in pull request #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes. URL: https://github.com/apache/flink/pull/9984#discussion_r354689980 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ActiveResourceManager.java ## @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.resourcemanager; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.runtime.clusterframework.TaskExecutorResourceSpec; +import org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.entrypoint.ClusterInformation; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup; +import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.util.FlinkException; + +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +/** + * Base class for {@link ResourceManager} implementations which contains some common variables and methods. + */ +public abstract class ActiveResourceManager + extends ResourceManager { + + /** The process environment variables. */ + protected final Map env; + + protected final int numSlotsPerTaskManager; + + protected final TaskExecutorResourceSpec taskExecutorResourceSpec; + + protected final int defaultMemoryMB; + + protected final Collection resourceProfilesPerWorker; + + /** +* The updated Flink configuration. The client uploaded configuration may be updated before passed on to +* {@link ResourceManager}. For example, {@link TaskManagerOptions#MANAGED_MEMORY_SIZE}. +*/ + protected final Configuration flinkConfig; + + /** Flink configuration uploaded by client. 
*/ + protected final Configuration flinkClientConfig; + + public ActiveResourceManager( + Configuration flinkConfig, + Map env, + RpcService rpcService, + String resourceManagerEndpointId, + ResourceID resourceId, + HighAvailabilityServices highAvailabilityServices, + HeartbeatServices heartbeatServices, + SlotManager slotManager, + JobLeaderIdService jobLeaderIdService, + ClusterInformation clusterInformation, + FatalErrorHandler fatalErrorHandler, + ResourceManagerMetricGroup resourceManagerMetricGroup) { + super( + rpcService, + resourceManagerEndpointId, + resourceId, + highAvailabilityServices, + heartbeatServices, + slotManager, + jobLeaderIdService, + clusterInformation, + fatalErrorHandler, + resourceManagerMetricGroup); + + this.flinkConfig = flinkConfig; + this.env = env; + + this.numSlotsPerTaskManager = flinkConfig.getInteger(TaskManagerOptions.NUM_TASK_SLOTS); + this.taskExecutorResourceSpec = TaskExecutorResourceUtils.resourceSpecFromConfig(flinkConfig); + this.defaultMemoryMB = taskExecutorResourceSpec.getTotalProcessMemorySize().getMebiBytes(); + + this.resourceProfilesPerWorker = createWorkerSlotProfiles(fli
[jira] [Created] (FLINK-15091) JoinITCase.testFullJoinWithNonEquiJoinPred failed in travis
Kurt Young created FLINK-15091:

Summary: JoinITCase.testFullJoinWithNonEquiJoinPred failed in travis
Key: FLINK-15091
URL: https://issues.apache.org/jira/browse/FLINK-15091
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Affects Versions: 1.10.0
Reporter: Kurt Young
Assignee: Jingsong Lee
Fix For: 1.10.0

04:45:22.404 [ERROR] Tests run: 21, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 4.909 s <<< FAILURE! - in org.apache.flink.table.planner.runtime.batch.table.JoinITCase
04:45:22.406 [ERROR] testFullJoinWithNonEquiJoinPred(org.apache.flink.table.planner.runtime.batch.table.JoinITCase) Time elapsed: 0.168 s <<< ERROR!
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.table.planner.runtime.batch.table.JoinITCase.testFullJoinWithNonEquiJoinPred(JoinITCase.scala:344)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
Caused by: org.apache.flink.runtime.memory.MemoryAllocationException: Could not allocate 32 pages. Only 0 pages are remaining.

details: [https://api.travis-ci.org/v3/job/621407747/log.txt]

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] wangxlong commented on issue #10369: [FLINK-14893][flink-core]Try child classLoader when parent classLoader could not load in parentFirstPatterns
wangxlong commented on issue #10369: [FLINK-14893][flink-core]Try child classLoader when parent classLoader could not load in parentFirstPatterns URL: https://github.com/apache/flink/pull/10369#issuecomment-562458580 > > > @wangxlong thanks for your contribution. Changes look good to me. It would be better if you can write a test for the specific case and I think it is somewhat necessary :-) > > > > > > Hi @TisonKun . I am sorry for replying late. I found ClassLoader#loadClass will invoke findClass when loadClass failed. So it is not necessary to invoke findClass by ourselves. What do you think of it? > > OK I think it is the reason why we tend to add a test for bisect the behavior :-) > > IIUC you mean that with or without this patch, classloading always works as expected, right? > > If so, it'd better we just close this ticket. Yes, you are right. I will close this ticket. Thank you for your review.
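The behaviour discussed in this thread can be checked with a few lines of plain Java. The sketch below is not Flink's class loader; `RecordingClassLoader` is a hypothetical helper that only records calls to `findClass`. It relies on the documented default behaviour of `ClassLoader#loadClass`: check already-loaded classes, delegate to the parent, and call `findClass` only if the parent could not load the class.

```java
import java.util.ArrayList;
import java.util.List;

class LoadClassFallbackSketch {

    // Hypothetical loader that records every findClass call and never finds anything itself.
    static class RecordingClassLoader extends ClassLoader {
        final List<String> findClassCalls = new ArrayList<>();

        RecordingClassLoader(ClassLoader parent) {
            super(parent);
        }

        @Override
        protected Class<?> findClass(String name) throws ClassNotFoundException {
            findClassCalls.add(name);
            throw new ClassNotFoundException(name);
        }
    }

    public static void main(String[] args) throws ClassNotFoundException {
        RecordingClassLoader loader = new RecordingClassLoader(ClassLoader.getSystemClassLoader());

        // The parent can load this class, so findClass is never consulted.
        loader.loadClass("java.lang.String");
        System.out.println("after java.lang.String: " + loader.findClassCalls);

        // The parent fails here, so the default loadClass falls back to findClass.
        try {
            loader.loadClass("com.example.DoesNotExist");
        } catch (ClassNotFoundException expected) {
            System.out.println("after missing class: " + loader.findClassCalls);
        }
    }
}
```

This is the same conclusion the thread reaches: with the default `loadClass`, an explicit extra call to `findClass` after a failed parent lookup is redundant.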
[GitHub] [flink] zhijiangW commented on a change in pull request #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes.
zhijiangW commented on a change in pull request #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes. URL: https://github.com/apache/flink/pull/9984#discussion_r354689510 ## File path: flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerTest.java ## @@ -0,0 +1,372 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.kubernetes; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; +import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient; +import org.apache.flink.kubernetes.kubeclient.TaskManagerPodParameter; +import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod; +import org.apache.flink.kubernetes.utils.Constants; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.clusterframework.types.SlotID; +import org.apache.flink.runtime.entrypoint.ClusterInformation; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.instance.HardwareDescription; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; +import org.apache.flink.runtime.registration.RegistrationResponse; +import org.apache.flink.runtime.resourcemanager.JobLeaderIdService; +import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; +import org.apache.flink.runtime.resourcemanager.SlotRequest; +import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; +import org.apache.flink.runtime.resourcemanager.utils.MockResourceManagerRuntimeServices; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.rpc.TestingRpcService; +import org.apache.flink.runtime.taskexecutor.SlotReport; +import 
org.apache.flink.runtime.taskexecutor.SlotStatus; +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; +import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess; +import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder; +import org.apache.flink.runtime.util.TestingFatalErrorHandler; + +import io.fabric8.kubernetes.api.model.Container; +import io.fabric8.kubernetes.api.model.ContainerStateBuilder; +import io.fabric8.kubernetes.api.model.ContainerStatusBuilder; +import io.fabric8.kubernetes.api.model.EnvVarBuilder; +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.PodList; +import io.fabric8.kubernetes.api.model.PodStatusBuilder; +import io.fabric8.kubernetes.client.KubernetesClient; +import org.hamcrest.Matchers; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +import static junit.framework.TestCase.assertEquals; +import static org.hamcrest.Matchers.instanceOf; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * Test cases for {@link KubernetesResourceManager}. + */ +public class KubernetesResourceManagerTest extends KubernetesTestBase { + + private static final Time TIMEOUT = Time.seconds(10L); + + private TestingFatalErrorHandler testingFatalErrorHandler; + + private final String jobManagerHost = "jm-host1"; + + private Configuration flinkConfig; + + private TestingKubernetesResourceManager resourceManager; + + private FlinkK
[GitHub] [flink] wangxlong closed pull request #10369: [FLINK-14893][flink-core]Try child classLoader when parent classLoader could not load in parentFirstPatterns
wangxlong closed pull request #10369: [FLINK-14893][flink-core]Try child classLoader when parent classLoader could not load in parentFirstPatterns URL: https://github.com/apache/flink/pull/10369
[jira] [Closed] (FLINK-14552) Enable partition statistics in blink planner
[ https://issues.apache.org/jira/browse/FLINK-14552?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kurt Young closed FLINK-14552. -- Fix Version/s: 1.10.0 Resolution: Fixed master: 0d6b43959bb6a576c95549e845f7db5e51229740 > Enable partition statistics in blink planner > > > Key: FLINK-14552 > URL: https://issues.apache.org/jira/browse/FLINK-14552 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Planner >Reporter: Jingsong Lee >Assignee: Jingsong Lee >Priority: Major > Labels: pull-request-available > Fix For: 1.10.0 > > Time Spent: 20m > Remaining Estimate: 0h > > We need to update statistics after partition pruning in > PushPartitionIntoTableSourceScanRule.
[GitHub] [flink] WeiZhong94 commented on issue #10389: [FLINK-14198][python] Add "type" and "rtype" options to flink python API docstrings of table.py and table_environment.py
WeiZhong94 commented on issue #10389: [FLINK-14198][python] Add "type" and "rtype" options to flink python API docstrings of table.py and table_environment.py URL: https://github.com/apache/flink/pull/10389#issuecomment-562457573 @sunjincheng121 Thanks for your review! I have addressed your comments in the latest commit.
[GitHub] [flink] zhijiangW commented on a change in pull request #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes.
zhijiangW commented on a change in pull request #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes. URL: https://github.com/apache/flink/pull/9984#discussion_r354687863 ## File path: flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerTest.java ## @@ -0,0 +1,372 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.kubernetes; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; +import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient; +import org.apache.flink.kubernetes.kubeclient.TaskManagerPodParameter; +import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod; +import org.apache.flink.kubernetes.utils.Constants; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.clusterframework.types.SlotID; +import org.apache.flink.runtime.entrypoint.ClusterInformation; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.instance.HardwareDescription; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; +import org.apache.flink.runtime.registration.RegistrationResponse; +import org.apache.flink.runtime.resourcemanager.JobLeaderIdService; +import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; +import org.apache.flink.runtime.resourcemanager.SlotRequest; +import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; +import org.apache.flink.runtime.resourcemanager.utils.MockResourceManagerRuntimeServices; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.rpc.TestingRpcService; +import org.apache.flink.runtime.taskexecutor.SlotReport; +import 
org.apache.flink.runtime.taskexecutor.SlotStatus; +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; +import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess; +import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder; +import org.apache.flink.runtime.util.TestingFatalErrorHandler; + +import io.fabric8.kubernetes.api.model.Container; +import io.fabric8.kubernetes.api.model.ContainerStateBuilder; +import io.fabric8.kubernetes.api.model.ContainerStatusBuilder; +import io.fabric8.kubernetes.api.model.EnvVarBuilder; +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.PodList; +import io.fabric8.kubernetes.api.model.PodStatusBuilder; +import io.fabric8.kubernetes.client.KubernetesClient; +import org.hamcrest.Matchers; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +import static junit.framework.TestCase.assertEquals; +import static org.hamcrest.Matchers.instanceOf; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * Test cases for {@link KubernetesResourceManager}. + */ +public class KubernetesResourceManagerTest extends KubernetesTestBase { + + private static final Time TIMEOUT = Time.seconds(10L); + + private TestingFatalErrorHandler testingFatalErrorHandler; + + private final String jobManagerHost = "jm-host1"; + + private Configuration flinkConfig; + + private TestingKubernetesResourceManager resourceManager; + + private FlinkK
[GitHub] [flink] flinkbot commented on issue #10455: [FLINK-15089][connectors] Puslar catalog
flinkbot commented on issue #10455: [FLINK-15089][connectors] Puslar catalog URL: https://github.com/apache/flink/pull/10455#issuecomment-562457258 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Automated Checks Last check on commit 206bb09e47ff4f36514a93c96f7cadc30f62cd4b (Fri Dec 06 06:58:15 UTC 2019) **Warnings:** * **2 pom.xml files were touched**: Check for build and licensing issues. * No documentation files were touched! Remember to keep the Flink docs up to date! * **This pull request references an unassigned [Jira ticket](https://issues.apache.org/jira/browse/FLINK-15089).** According to the [code contribution guide](https://flink.apache.org/contributing/contribute-code.html), tickets need to be assigned before starting with the implementation work. Mention the bot in a comment to re-run the automated checks. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. 
For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] KurtYoung merged pull request #10315: [FLINK-14552][table] Enable partition statistics in blink planner
KurtYoung merged pull request #10315: [FLINK-14552][table] Enable partition statistics in blink planner URL: https://github.com/apache/flink/pull/10315
[jira] [Updated] (FLINK-15089) Pulsar Catalog
[ https://issues.apache.org/jira/browse/FLINK-15089?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-15089: --- Labels: pull-request-available (was: ) > Pulsar Catalog > -- > > Key: FLINK-15089 > URL: https://issues.apache.org/jira/browse/FLINK-15089 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Common >Reporter: Yijie Shen >Priority: Major > Labels: pull-request-available > > Per discussion in the mailing list, A Pulsar Catalog implementation is made > to a single task. The design doc is: > [https://docs.google.com/document/d/1LMnABtXn-wQedsmWv8hopvx-B-jbdr8-jHbIiDhdsoE/edit?usp=sharing] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] yjshen opened a new pull request #10455: [FLINK-15089][connectors] Puslar catalog
yjshen opened a new pull request #10455: [FLINK-15089][connectors] Puslar catalog URL: https://github.com/apache/flink/pull/10455

## What is the purpose of the change

This PR implements the Pulsar catalog, which is part of [FLIP-72](https://cwiki.apache.org/confluence/display/FLINK/FLIP-72%3A+Introduce+Pulsar+Connector). The Pulsar catalog supports the following features by extending the `AbstractCatalog` API:

- Open and close the catalog
- Database operations: list, get, exists, create, drop (create and drop should first check if the user has sufficient permissions)
- Table operations: list, get, exists, create, drop (create and drop should first check if the user has sufficient permissions)

## Brief change log

- The main functionality of the catalog lives in `PulsarCatalog` and `PulsarMetadataReader`.
- Pulsar Schema <-> Flink type conversion lives in `SchemaUtils`.

## Verifying this change

This PR will be tested by unit tests and integration tests.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (yes). Adds Pulsar dependencies.
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (yes)
- If yes, how is the feature documented? (https://docs.google.com/document/d/1LMnABtXn-wQedsmWv8hopvx-B-jbdr8-jHbIiDhdsoE/edit?usp=sharing)
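The database operations listed in the description (list, exists, create, drop, with a permission check before create and drop) can be sketched roughly as follows. This is only an illustration under stated assumptions: `CatalogSketch`, its `userMayModify` flag, and the method signatures are hypothetical stand-ins backed by an in-memory set, not the `PulsarCatalog` from the PR and not Flink's `AbstractCatalog` API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical in-memory stand-in; this is not the PulsarCatalog from the PR. */
public class CatalogSketch {

    // Assumption: databases are tracked by name only.
    private final Set<String> databases = new TreeSet<>();

    // Assumption: a single flag stands in for a real permission lookup.
    private final boolean userMayModify;

    public CatalogSketch(boolean userMayModify) {
        this.userMayModify = userMayModify;
    }

    public List<String> listDatabases() {
        return new ArrayList<>(databases);
    }

    public boolean databaseExists(String name) {
        return databases.contains(name);
    }

    public void createDatabase(String name, boolean ignoreIfExists) {
        // Check permissions before touching any state.
        checkPermission("create");
        if (!databases.add(name) && !ignoreIfExists) {
            throw new IllegalStateException("Database already exists: " + name);
        }
    }

    public void dropDatabase(String name, boolean ignoreIfNotExists) {
        // Check permissions before touching any state.
        checkPermission("drop");
        if (!databases.remove(name) && !ignoreIfNotExists) {
            throw new IllegalStateException("Database does not exist: " + name);
        }
    }

    private void checkPermission(String operation) {
        if (!userMayModify) {
            throw new SecurityException("Not permitted to " + operation + " databases");
        }
    }

    public static void main(String[] args) {
        CatalogSketch catalog = new CatalogSketch(true);
        catalog.createDatabase("tenant/namespace", false);
        System.out.println(catalog.listDatabases());
    }
}
```

Running the permission check first means a caller without sufficient rights gets the same error whether or not the database exists, so the check does not reveal catalog contents.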
[jira] [Updated] (FLINK-15090) Reverse the dependency from flink-streaming-java to flink-client
[ https://issues.apache.org/jira/browse/FLINK-15090?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zili Chen updated FLINK-15090: -- Description: After FLIP-73 the dependencies are minor. Tasks I can find are 1. Move {{StreamGraphTranslator}} to {{flink-client}}. 2. Implement similar context environment of streaming as batch env in {{flink-client}}. Set/Unset as context along with {{ExecutionEnvironment}}. After this task we still have a dependency from {{flink-streaming-java}} to {{flink-java}} because of some input formats dependencies. We can break the dependencies as follow-ups. cc [~aljoscha] was: After FLIP-73 the dependencies are minor. Tasks I can find are 1. Move {{StreamGraphTranslator}} to {{flink-client}}. 2. Implement similar context environment of streaming as batch env in {{flink-client}}. Set/Unset as context along with {{ExecutionEnvironment}}. After this task we still have a dependency from {{flink-streaming-java}} to {{flink-java}} because some input formats dependencies. We can break the dependencies as follow-ups. cc [~aljoscha] > Reverse the dependency from flink-streaming-java to flink-client > > > Key: FLINK-15090 > URL: https://issues.apache.org/jira/browse/FLINK-15090 > Project: Flink > Issue Type: Improvement >Reporter: Zili Chen >Priority: Major > Fix For: 1.11.0 > > > After FLIP-73 the dependencies are minor. Tasks I can find are > 1. Move {{StreamGraphTranslator}} to {{flink-client}}. > 2. Implement similar context environment of streaming as batch env in > {{flink-client}}. Set/Unset as context along with {{ExecutionEnvironment}}. > After this task we still have a dependency from {{flink-streaming-java}} to > {{flink-java}} because of some input formats dependencies. We can break the > dependencies as follow-ups. > cc [~aljoscha] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-15090) Reverse the dependency from flink-streaming-java to flink-client
[ https://issues.apache.org/jira/browse/FLINK-15090?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zili Chen updated FLINK-15090: -- Description: After FLIP-73 the dependencies are minor. Tasks I can find are 1. Move {{StreamGraphTranslator}} to {{flink-client}}. 2. Implement similar context environment of streaming as batch env in {{flink-client}}. Set/Unset as context along with {{ExecutionEnvironment}}. After this task we still have a dependency from {{flink-streaming-java}} to {{flink-java}} because some input formats dependencies. We can break the dependencies as follow-ups. cc [~aljoscha] was: After FLIP-73 the dependencies are minor. Tasks I can find are 1. Move {{StreamGraphTranslator}} to {{flink-client}}. 2. Implement similar context environment of streaming as {{flink-java}}. Set/Unset as context along with {{ExecutionEnvironment}}. After this task we still have a dependency from {{flink-streaming-java}} to {{flink-java}} because some input formats dependencies. We can break the dependencies as follow-ups. cc [~aljoscha] > Reverse the dependency from flink-streaming-java to flink-client > > > Key: FLINK-15090 > URL: https://issues.apache.org/jira/browse/FLINK-15090 > Project: Flink > Issue Type: Improvement >Reporter: Zili Chen >Priority: Major > Fix For: 1.11.0 > > > After FLIP-73 the dependencies are minor. Tasks I can find are > 1. Move {{StreamGraphTranslator}} to {{flink-client}}. > 2. Implement similar context environment of streaming as batch env in > {{flink-client}}. Set/Unset as context along with {{ExecutionEnvironment}}. > After this task we still have a dependency from {{flink-streaming-java}} to > {{flink-java}} because some input formats dependencies. We can break the > dependencies as follow-ups. > cc [~aljoscha] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-15090) Reverse the dependency from flink-streaming-java to flink-client
Zili Chen created FLINK-15090: - Summary: Reverse the dependency from flink-streaming-java to flink-client Key: FLINK-15090 URL: https://issues.apache.org/jira/browse/FLINK-15090 Project: Flink Issue Type: Improvement Reporter: Zili Chen Fix For: 1.11.0 After FLIP-73 the dependencies are minor. Tasks I can find are 1. Move {{StreamGraphTranslator}} to {{flink-client}}. 2. Implement similar context environment of streaming as {{flink-java}}. Set/Unset as context along with {{ExecutionEnvironment}}. After this task we still have a dependency from {{flink-streaming-java}} to {{flink-java}} because some input formats dependencies. We can break the dependencies as follow-ups. cc [~aljoscha] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] zhijiangW commented on a change in pull request #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes.
zhijiangW commented on a change in pull request #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes. URL: https://github.com/apache/flink/pull/9984#discussion_r354682702 ## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java ## @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.kubernetes; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; +import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient; +import org.apache.flink.kubernetes.kubeclient.KubeClientFactory; +import org.apache.flink.kubernetes.kubeclient.TaskManagerPodParameter; +import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod; +import org.apache.flink.kubernetes.taskmanager.KubernetesTaskExecutorRunner; +import org.apache.flink.kubernetes.utils.Constants; +import org.apache.flink.kubernetes.utils.KubernetesUtils; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.BootstrapTools; +import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.entrypoint.ClusterInformation; +import org.apache.flink.runtime.entrypoint.parser.CommandLineOptions; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup; +import org.apache.flink.runtime.resourcemanager.ActiveResourceManager; +import org.apache.flink.runtime.resourcemanager.JobLeaderIdService; +import org.apache.flink.runtime.resourcemanager.ResourceManager; +import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException; +import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.rpc.RpcService; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + 
+import javax.annotation.Nullable; + +import java.io.File; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Kubernetes specific implementation of the {@link ResourceManager}. + */ +public class KubernetesResourceManager extends ActiveResourceManager + implements FlinkKubeClient.PodCallbackHandler { + + private static final Logger LOG = LoggerFactory.getLogger(KubernetesResourceManager.class); + + /** The taskmanager pod name pattern is {clusterId}-{taskmanager}-{attemptId}-{podIndex}. */ + private static final String TASK_MANAGER_POD_FORMAT = "%s-taskmanager-%d-%d"; + + private final Map workerNodes = new HashMap<>(); + + private final double defaultCpus; + + /** When ResourceManager failover, the max attempt should recover. */ + private final AtomicLong currentMaxAttemptId = new AtomicLong(0); + + /** Current max pod index. When creating a new pod, it should increase one. */ + private final AtomicLong currentMaxPodId = new AtomicLong(0); + + private final String clusterId; + + private final FlinkKubeClient kubeClient; + + private final ContaineredTaskManagerParameters taskManagerParameters; + + private final List taskManagerStartCommand; + + /** The number of pods requested, but not yet granted. */ + private int numPendingPodRequests = 0; + + public KubernetesResourceManager( + RpcService rpcService, + String resourceManagerEndpointId, + ResourceID resourceId, + Configuration flinkConfig, +
[GitHub] [flink] zhijiangW commented on a change in pull request #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes.
zhijiangW commented on a change in pull request #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes. URL: https://github.com/apache/flink/pull/9984#discussion_r354682441 ## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java ## @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.kubernetes; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; +import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient; +import org.apache.flink.kubernetes.kubeclient.KubeClientFactory; +import org.apache.flink.kubernetes.kubeclient.TaskManagerPodParameter; +import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod; +import org.apache.flink.kubernetes.taskmanager.KubernetesTaskExecutorRunner; +import org.apache.flink.kubernetes.utils.Constants; +import org.apache.flink.kubernetes.utils.KubernetesUtils; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.BootstrapTools; +import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.entrypoint.ClusterInformation; +import org.apache.flink.runtime.entrypoint.parser.CommandLineOptions; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup; +import org.apache.flink.runtime.resourcemanager.ActiveResourceManager; +import org.apache.flink.runtime.resourcemanager.JobLeaderIdService; +import org.apache.flink.runtime.resourcemanager.ResourceManager; +import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException; +import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.rpc.RpcService; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + 
+import javax.annotation.Nullable; + +import java.io.File; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Kubernetes specific implementation of the {@link ResourceManager}. + */ +public class KubernetesResourceManager extends ActiveResourceManager + implements FlinkKubeClient.PodCallbackHandler { + + private static final Logger LOG = LoggerFactory.getLogger(KubernetesResourceManager.class); + + /** The taskmanager pod name pattern is {clusterId}-{taskmanager}-{attemptId}-{podIndex}. */ + private static final String TASK_MANAGER_POD_FORMAT = "%s-taskmanager-%d-%d"; + + private final Map workerNodes = new HashMap<>(); + + private final double defaultCpus; + + /** When ResourceManager failover, the max attempt should recover. */ + private final AtomicLong currentMaxAttemptId = new AtomicLong(0); + + /** Current max pod index. When creating a new pod, it should increase one. */ + private final AtomicLong currentMaxPodId = new AtomicLong(0); + + private final String clusterId; + + private final FlinkKubeClient kubeClient; + + private final ContaineredTaskManagerParameters taskManagerParameters; + + private final List taskManagerStartCommand; + + /** The number of pods requested, but not yet granted. */ + private int numPendingPodRequests = 0; + + public KubernetesResourceManager( + RpcService rpcService, + String resourceManagerEndpointId, + ResourceID resourceId, + Configuration flinkConfig, +
[GitHub] [flink] zhijiangW commented on a change in pull request #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes.
zhijiangW commented on a change in pull request #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes. URL: https://github.com/apache/flink/pull/9984#discussion_r354680696 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ActiveResourceManager.java ## @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.resourcemanager; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.runtime.clusterframework.TaskExecutorResourceSpec; +import org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.entrypoint.ClusterInformation; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup; +import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.util.FlinkException; + +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +/** + * Base class for {@link ResourceManager} implementations which contains some common variables and methods. + */ +public abstract class ActiveResourceManager Review comment: TBH I think there should be a better semantic qualifier than `Active` to distinguish this class from `StandaloneResourceManager`. What we actually want to describe is a resource manager that is based on a resource framework.
[GitHub] [flink] zhijiangW commented on a change in pull request #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes.
zhijiangW commented on a change in pull request #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes. URL: https://github.com/apache/flink/pull/9984#discussion_r354685294 ## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java ## @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.kubernetes; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; +import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient; +import org.apache.flink.kubernetes.kubeclient.KubeClientFactory; +import org.apache.flink.kubernetes.kubeclient.TaskManagerPodParameter; +import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod; +import org.apache.flink.kubernetes.taskmanager.KubernetesTaskExecutorRunner; +import org.apache.flink.kubernetes.utils.Constants; +import org.apache.flink.kubernetes.utils.KubernetesUtils; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.BootstrapTools; +import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.entrypoint.ClusterInformation; +import org.apache.flink.runtime.entrypoint.parser.CommandLineOptions; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup; +import org.apache.flink.runtime.resourcemanager.ActiveResourceManager; +import org.apache.flink.runtime.resourcemanager.JobLeaderIdService; +import org.apache.flink.runtime.resourcemanager.ResourceManager; +import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException; +import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.rpc.RpcService; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + 
+import javax.annotation.Nullable; + +import java.io.File; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Kubernetes specific implementation of the {@link ResourceManager}. + */ +public class KubernetesResourceManager extends ActiveResourceManager + implements FlinkKubeClient.PodCallbackHandler { + + private static final Logger LOG = LoggerFactory.getLogger(KubernetesResourceManager.class); + + /** The taskmanager pod name pattern is {clusterId}-{taskmanager}-{attemptId}-{podIndex}. */ + private static final String TASK_MANAGER_POD_FORMAT = "%s-taskmanager-%d-%d"; + + private final Map workerNodes = new HashMap<>(); + + private final double defaultCpus; + + /** When ResourceManager failover, the max attempt should recover. */ + private final AtomicLong currentMaxAttemptId = new AtomicLong(0); Review comment: If there are no concurrency issues with this variable, we can use a plain `long` instead. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
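A minimal sketch of the suggestion in the review comment above, assuming the counter is only ever touched from a single thread (that is an assumption here; the quoted diff does not confirm it). `PodNameGenerator` and its methods are hypothetical names used for illustration; only the format string is taken from the diff.

```java
/**
 * Hypothetical helper (not part of the PR) that builds task manager pod names
 * with a plain long counter instead of an AtomicLong.
 */
final class PodNameGenerator {

    /** Same pattern as TASK_MANAGER_POD_FORMAT in the quoted diff. */
    private static final String TASK_MANAGER_POD_FORMAT = "%s-taskmanager-%d-%d";

    private final String clusterId;
    private final long attemptId;

    /** Plain long: correct only if every access happens on the same thread. */
    private long currentMaxPodId = 0;

    PodNameGenerator(String clusterId, long attemptId) {
        this.clusterId = clusterId;
        this.attemptId = attemptId;
    }

    /** Increments the pod index and returns the next pod name. */
    String nextPodName() {
        currentMaxPodId++;
        return String.format(TASK_MANAGER_POD_FORMAT, clusterId, attemptId, currentMaxPodId);
    }

    public static void main(String[] args) {
        PodNameGenerator generator = new PodNameGenerator("my-cluster", 1L);
        System.out.println(generator.nextPodName());
        System.out.println(generator.nextPodName());
    }
}
```

If the field were ever read or written from another thread, `AtomicLong` (or some other form of synchronization) would still be required.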
[GitHub] [flink] zhijiangW commented on a change in pull request #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes.
zhijiangW commented on a change in pull request #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes. URL: https://github.com/apache/flink/pull/9984#discussion_r354672522 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ActiveResourceManager.java ## @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.resourcemanager; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.runtime.clusterframework.TaskExecutorResourceSpec; +import org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.entrypoint.ClusterInformation; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup; +import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.util.FlinkException; + +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +/** + * Base class for {@link ResourceManager} implementations which contains some common variables and methods. + */ +public abstract class ActiveResourceManager + extends ResourceManager { + + /** The process environment variables. */ + protected final Map env; + + protected final int numSlotsPerTaskManager; + + protected final TaskExecutorResourceSpec taskExecutorResourceSpec; + + protected final int defaultMemoryMB; + + protected final Collection resourceProfilesPerWorker; + + /** +* The updated Flink configuration. The client uploaded configuration may be updated before passed on to +* {@link ResourceManager}. For example, {@link TaskManagerOptions#MANAGED_MEMORY_SIZE}. +*/ + protected final Configuration flinkConfig; + + /** Flink configuration uploaded by client. 
*/ + protected final Configuration flinkClientConfig; + + public ActiveResourceManager( + Configuration flinkConfig, + Map env, + RpcService rpcService, + String resourceManagerEndpointId, + ResourceID resourceId, + HighAvailabilityServices highAvailabilityServices, + HeartbeatServices heartbeatServices, + SlotManager slotManager, + JobLeaderIdService jobLeaderIdService, + ClusterInformation clusterInformation, + FatalErrorHandler fatalErrorHandler, + ResourceManagerMetricGroup resourceManagerMetricGroup) { + super( + rpcService, + resourceManagerEndpointId, + resourceId, + highAvailabilityServices, + heartbeatServices, + slotManager, + jobLeaderIdService, + clusterInformation, + fatalErrorHandler, + resourceManagerMetricGroup); + + this.flinkConfig = flinkConfig; + this.env = env; + + this.numSlotsPerTaskManager = flinkConfig.getInteger(TaskManagerOptions.NUM_TASK_SLOTS); + this.taskExecutorResourceSpec = TaskExecutorResourceUtils.resourceSpecFromConfig(flinkConfig); + this.defaultMemoryMB = taskExecutorResourceSpec.getTotalProcessMemorySize().getMebiBytes(); + + this.resourceProfilesPerWorker = createWorkerSlotProfiles(flinkC
[GitHub] [flink] zhijiangW commented on a change in pull request #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes.
zhijiangW commented on a change in pull request #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes. URL: https://github.com/apache/flink/pull/9984#discussion_r354685039 ## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java ## @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.kubernetes; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; +import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient; +import org.apache.flink.kubernetes.kubeclient.KubeClientFactory; +import org.apache.flink.kubernetes.kubeclient.TaskManagerPodParameter; +import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod; +import org.apache.flink.kubernetes.taskmanager.KubernetesTaskExecutorRunner; +import org.apache.flink.kubernetes.utils.Constants; +import org.apache.flink.kubernetes.utils.KubernetesUtils; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.BootstrapTools; +import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.entrypoint.ClusterInformation; +import org.apache.flink.runtime.entrypoint.parser.CommandLineOptions; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup; +import org.apache.flink.runtime.resourcemanager.ActiveResourceManager; +import org.apache.flink.runtime.resourcemanager.JobLeaderIdService; +import org.apache.flink.runtime.resourcemanager.ResourceManager; +import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException; +import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.rpc.RpcService; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + 
+import javax.annotation.Nullable; + +import java.io.File; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Kubernetes specific implementation of the {@link ResourceManager}. + */ +public class KubernetesResourceManager extends ActiveResourceManager + implements FlinkKubeClient.PodCallbackHandler { + + private static final Logger LOG = LoggerFactory.getLogger(KubernetesResourceManager.class); + + /** The taskmanager pod name pattern is {clusterId}-{taskmanager}-{attemptId}-{podIndex}. */ + private static final String TASK_MANAGER_POD_FORMAT = "%s-taskmanager-%d-%d"; + + private final Map workerNodes = new HashMap<>(); + + private final double defaultCpus; + + /** When ResourceManager failover, the max attempt should recover. */ + private final AtomicLong currentMaxAttemptId = new AtomicLong(0); + + /** Current max pod index. When creating a new pod, it should increase one. */ + private final AtomicLong currentMaxPodId = new AtomicLong(0); + + private final String clusterId; + + private final FlinkKubeClient kubeClient; + + private final ContaineredTaskManagerParameters taskManagerParameters; + + private final List taskManagerStartCommand; + + /** The number of pods requested, but not yet granted. */ + private int numPendingPodRequests = 0; + + public KubernetesResourceManager( + RpcService rpcService, + String resourceManagerEndpointId, + ResourceID resourceId, + Configuration flinkConfig, +
[GitHub] [flink] zhijiangW commented on a change in pull request #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes.
zhijiangW commented on a change in pull request #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes. URL: https://github.com/apache/flink/pull/9984#discussion_r354682932 ## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java ## @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.kubernetes; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; +import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient; +import org.apache.flink.kubernetes.kubeclient.KubeClientFactory; +import org.apache.flink.kubernetes.kubeclient.TaskManagerPodParameter; +import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod; +import org.apache.flink.kubernetes.taskmanager.KubernetesTaskExecutorRunner; +import org.apache.flink.kubernetes.utils.Constants; +import org.apache.flink.kubernetes.utils.KubernetesUtils; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.BootstrapTools; +import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.entrypoint.ClusterInformation; +import org.apache.flink.runtime.entrypoint.parser.CommandLineOptions; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup; +import org.apache.flink.runtime.resourcemanager.ActiveResourceManager; +import org.apache.flink.runtime.resourcemanager.JobLeaderIdService; +import org.apache.flink.runtime.resourcemanager.ResourceManager; +import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException; +import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.rpc.RpcService; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + 
+import javax.annotation.Nullable; + +import java.io.File; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Kubernetes specific implementation of the {@link ResourceManager}. + */ +public class KubernetesResourceManager extends ActiveResourceManager + implements FlinkKubeClient.PodCallbackHandler { + + private static final Logger LOG = LoggerFactory.getLogger(KubernetesResourceManager.class); + + /** The taskmanager pod name pattern is {clusterId}-{taskmanager}-{attemptId}-{podIndex}. */ + private static final String TASK_MANAGER_POD_FORMAT = "%s-taskmanager-%d-%d"; + + private final Map workerNodes = new HashMap<>(); + + private final double defaultCpus; + + /** When ResourceManager failover, the max attempt should recover. */ + private final AtomicLong currentMaxAttemptId = new AtomicLong(0); + + /** Current max pod index. When creating a new pod, it should increase one. */ + private final AtomicLong currentMaxPodId = new AtomicLong(0); + + private final String clusterId; + + private final FlinkKubeClient kubeClient; + + private final ContaineredTaskManagerParameters taskManagerParameters; + + private final List taskManagerStartCommand; + + /** The number of pods requested, but not yet granted. */ + private int numPendingPodRequests = 0; + + public KubernetesResourceManager( + RpcService rpcService, + String resourceManagerEndpointId, + ResourceID resourceId, + Configuration flinkConfig, +
[GitHub] [flink] zhijiangW commented on a change in pull request #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes.
zhijiangW commented on a change in pull request #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes. URL: https://github.com/apache/flink/pull/9984#discussion_r354685940 ## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java ## @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.kubernetes; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; +import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient; +import org.apache.flink.kubernetes.kubeclient.KubeClientFactory; +import org.apache.flink.kubernetes.kubeclient.TaskManagerPodParameter; +import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod; +import org.apache.flink.kubernetes.taskmanager.KubernetesTaskExecutorRunner; +import org.apache.flink.kubernetes.utils.Constants; +import org.apache.flink.kubernetes.utils.KubernetesUtils; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.BootstrapTools; +import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.entrypoint.ClusterInformation; +import org.apache.flink.runtime.entrypoint.parser.CommandLineOptions; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup; +import org.apache.flink.runtime.resourcemanager.ActiveResourceManager; +import org.apache.flink.runtime.resourcemanager.JobLeaderIdService; +import org.apache.flink.runtime.resourcemanager.ResourceManager; +import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException; +import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.rpc.RpcService; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + 
+import javax.annotation.Nullable; + +import java.io.File; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Kubernetes specific implementation of the {@link ResourceManager}. + */ +public class KubernetesResourceManager extends ActiveResourceManager + implements FlinkKubeClient.PodCallbackHandler { + + private static final Logger LOG = LoggerFactory.getLogger(KubernetesResourceManager.class); + + /** The taskmanager pod name pattern is {clusterId}-{taskmanager}-{attemptId}-{podIndex}. */ + private static final String TASK_MANAGER_POD_FORMAT = "%s-taskmanager-%d-%d"; + + private final Map workerNodes = new HashMap<>(); + + private final double defaultCpus; + + /** When ResourceManager failover, the max attempt should recover. */ + private final AtomicLong currentMaxAttemptId = new AtomicLong(0); + + /** Current max pod index. When creating a new pod, it should increase one. */ + private final AtomicLong currentMaxPodId = new AtomicLong(0); + + private final String clusterId; + + private final FlinkKubeClient kubeClient; + + private final ContaineredTaskManagerParameters taskManagerParameters; + + private final List taskManagerStartCommand; + + /** The number of pods requested, but not yet granted. */ + private int numPendingPodRequests = 0; + + public KubernetesResourceManager( + RpcService rpcService, + String resourceManagerEndpointId, + ResourceID resourceId, + Configuration flinkConfig, +
[GitHub] [flink] KurtYoung commented on a change in pull request #10454: [FLINK-13195][sql-client] Add create(drop) table support for SqlClient
KurtYoung commented on a change in pull request #10454: [FLINK-13195][sql-client] Add create(drop) table support for SqlClient URL: https://github.com/apache/flink/pull/10454#discussion_r354682103 ## File path: flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java ## @@ -534,6 +540,26 @@ private boolean callInsertInto(SqlCommandCall cmdCall) { return true; } + private void callCreateTable(SqlCommandCall cmdCall) { + try { + executor.createTable(sessionId, cmdCall.operands[0]); + printInfo(CliStrings.MESSAGE_TABLE_CREATED); + } catch (SqlExecutionException e) { + printExecutionException(e); + return; + } + terminal.flush(); Review comment: Why flush the terminal here but not in drop table?
[GitHub] [flink] KurtYoung commented on a change in pull request #10454: [FLINK-13195][sql-client] Add create(drop) table support for SqlClient
KurtYoung commented on a change in pull request #10454: [FLINK-13195][sql-client] Add create(drop) table support for SqlClient URL: https://github.com/apache/flink/pull/10454#discussion_r354685477 ## File path: flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java ## @@ -154,8 +160,29 @@ public ExecutionContext( ClusterClientServiceLoader clusterClientServiceLoader, Options commandLineOptions, List availableCommandLines) throws FlinkException { + this( + environment, + originalSessionContext, + null, + dependencies, + flinkConfig, + new DefaultClusterClientServiceLoader(), + commandLineOptions, + availableCommandLines); + } + + public ExecutionContext( Review comment: Make all constructors `private` if you want to introduce a `Builder`.
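The pattern the reviewer asks for might look roughly like the sketch below: once a builder exists, the constructor is private so the builder is the only way to create instances. `SketchContext` and its fields are hypothetical stand-ins, not the PR's `ExecutionContext`.

```java
import java.util.Objects;

/** Hypothetical stand-in for a class created through a builder; not the PR's ExecutionContext. */
final class SketchContext {

    private final String sessionId;
    private final String catalogName; // optional; may be null

    // Private: instances can only be created through Builder#build().
    private SketchContext(Builder builder) {
        this.sessionId = builder.sessionId;
        this.catalogName = builder.catalogName;
    }

    /** Entry point; required values are passed here, optional ones via setters. */
    static Builder builder(String sessionId) {
        return new Builder(sessionId);
    }

    String getSessionId() {
        return sessionId;
    }

    String getCatalogName() {
        return catalogName;
    }

    static final class Builder {
        private final String sessionId; // required
        private String catalogName;     // optional

        private Builder(String sessionId) {
            this.sessionId = Objects.requireNonNull(sessionId, "sessionId");
        }

        Builder catalogName(String catalogName) {
            this.catalogName = catalogName;
            return this;
        }

        SketchContext build() {
            return new SketchContext(this);
        }
    }
}
```

A caller would write `SketchContext.builder("session-1").catalogName("my_catalog").build()`; there is no public constructor left to bypass the builder.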
[GitHub] [flink] KurtYoung commented on a change in pull request #10454: [FLINK-13195][sql-client] Add create(drop) table support for SqlClient
KurtYoung commented on a change in pull request #10454: [FLINK-13195][sql-client] Add create(drop) table support for SqlClient URL: https://github.com/apache/flink/pull/10454#discussion_r354684030 ## File path: flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java ## @@ -428,6 +485,12 @@ private void initializeTableEnvironment() { modules.forEach(tableEnv::loadModule); } + if (catalogManager != null) { + registerFunctions(); Review comment: Why register functions if we already have the catalog manager?
[GitHub] [flink] KurtYoung commented on a change in pull request #10454: [FLINK-13195][sql-client] Add create(drop) table support for SqlClient
KurtYoung commented on a change in pull request #10454: [FLINK-13195][sql-client] Add create(drop) table support for SqlClient URL: https://github.com/apache/flink/pull/10454#discussion_r354684496 ## File path: flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java ## @@ -584,4 +655,69 @@ private Pipeline createPipeline(String name) { return execEnv.createProgramPlan(name); } } + + //~ Inner Class --- + + /** Builder for {@link ExecutionContext}. */ + public static class Builder { + // Required members. + private final SessionContext sessionContext; + private final List dependencies; + private final Configuration configuration; + private final ClusterClientServiceLoader serviceLoader; + private final Options commandLineOptions; + private final List commandLines; + + private Environment defaultEnv; + private Environment env; + + // Optional members. Review comment: Use `@Nullable`?
[GitHub] [flink] KurtYoung commented on a change in pull request #10454: [FLINK-13195][sql-client] Add create(drop) table support for SqlClient
KurtYoung commented on a change in pull request #10454: [FLINK-13195][sql-client] Add create(drop) table support for SqlClient URL: https://github.com/apache/flink/pull/10454#discussion_r354684576 ## File path: flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java ## @@ -584,4 +655,69 @@ private Pipeline createPipeline(String name) { return execEnv.createProgramPlan(name); } } + + //~ Inner Class --- + + /** Builder for {@link ExecutionContext}. */ + public static class Builder { + // Required members. + private final SessionContext sessionContext; + private final List dependencies; + private final Configuration configuration; + private final ClusterClientServiceLoader serviceLoader; + private final Options commandLineOptions; + private final List commandLines; + + private Environment defaultEnv; + private Environment env; Review comment: Use a more meaningful name than `env`.
[GitHub] [flink] KurtYoung commented on a change in pull request #10454: [FLINK-13195][sql-client] Add create(drop) table support for SqlClient
KurtYoung commented on a change in pull request #10454: [FLINK-13195][sql-client] Add create(drop) table support for SqlClient URL: https://github.com/apache/flink/pull/10454#discussion_r354683491 ## File path: flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java ## @@ -428,6 +485,12 @@ private void initializeTableEnvironment() { modules.forEach(tableEnv::loadModule); } + if (catalogManager != null) { + registerFunctions(); + // No need to register the catalogs info if already inherit from the same session. + return; Review comment: step 7 can't be skipped?
[jira] [Commented] (FLINK-14952) Yarn containers can exceed physical memory limits when using BoundedBlockingSubpartition.
[ https://issues.apache.org/jira/browse/FLINK-14952?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16989448#comment-16989448 ] Yingjie Cao commented on FLINK-14952: - [~gjy] By "FILE-FILE" mode, I mean using BoundedBlockingSubpartitionType.FILE. As explained by [~pnowojski], results are written to a file and read from a file using the read/write API of FileChannel. When reading from files, two buffers are needed by each subpartition to store the read data. For MMAP read mode, the extra read buffers are not needed. > Yarn containers can exceed physical memory limits when using > BoundedBlockingSubpartition. > - > > Key: FLINK-14952 > URL: https://issues.apache.org/jira/browse/FLINK-14952 > Project: Flink > Issue Type: Bug > Components: Deployment / YARN, Runtime / Network >Affects Versions: 1.9.1 >Reporter: Piotr Nowojski >Priority: Blocker > Fix For: 1.10.0 > > > As [reported by a user on the user mailing > list|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/CoGroup-SortMerger-performance-degradation-from-1-6-4-1-9-1-td31082.html], > the combination of using {{BoundedBlockingSubpartition}} with yarn containers > can cause the yarn container to exceed memory limits. > {quote}2019-11-19 12:49:23,068 INFO org.apache.flink.yarn.YarnResourceManager > - Closing TaskExecutor connection container_e42_1574076744505_9444_01_04 > because: Container > [pid=42774,containerID=container_e42_1574076744505_9444_01_04] is running > beyond physical memory limits. Current usage: 12.0 GB of 12 GB physical > memory used; 13.9 GB of 25.2 GB virtual memory used. Killing container. > {quote} > This is probably happening because the memory usage of mmap is not capped and not > accounted for by the configured memory limits; however, yarn tracks this memory > usage, and once Flink exceeds some threshold, the container is killed. 
> The workaround is to override the default value and force Flink not to use mmap, by > setting a secret (🤫) config option: > {noformat} > taskmanager.network.bounded-blocking-subpartition-type: file > {noformat} -- This message was sent by Atlassian Jira (v8.3.4#803005)
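To illustrate the difference between the two read modes discussed in the comment above, here is a small sketch using only the JDK's `FileChannel`. It is not Flink's implementation; `ReadModesSketch` and its method names are hypothetical. The buffered read copies file contents into a caller-supplied `ByteBuffer` of fixed size, while the mapped read hands back a view over the file with no extra read buffer.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Hypothetical illustration of buffered vs. memory-mapped file reads; not Flink code. */
final class ReadModesSketch {

    /** Writes the given bytes to a temporary file and returns its path. */
    static Path writeTempFile(byte[] data) {
        try {
            Path file = Files.createTempFile("read-modes", ".bin");
            file.toFile().deleteOnExit();
            return Files.write(file, data);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** FILE-style read: copies data into a caller-owned buffer of bounded size. */
    static int readIntoBuffer(Path file, ByteBuffer buffer) {
        try (FileChannel channel = FileChannel.open(file, StandardOpenOption.READ)) {
            int total = 0;
            while (buffer.hasRemaining()) {
                int n = channel.read(buffer);
                if (n < 0) {
                    break; // end of file
                }
                total += n;
            }
            buffer.flip(); // make the bytes just read available to the caller
            return total;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** MMAP-style read: maps the whole file; no separate read buffer is allocated. */
    static MappedByteBuffer mapReadOnly(Path file) {
        try (FileChannel channel = FileChannel.open(file, StandardOpenOption.READ)) {
            // The mapping stays valid after the channel is closed.
            return channel.map(FileChannel.MapMode.READ_ONLY, 0, channel.size());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In the buffered variant the memory used for reading is bounded by the buffers the caller allocates, which is why that mode needs explicit read buffers per subpartition. In the mapped variant the size of the mapping follows the file size, which is consistent with the issue's remark that mmap usage is not capped by configured limits.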
[GitHub] [flink] KurtYoung commented on a change in pull request #10454: [FLINK-13195][sql-client] Add create(drop) table support for SqlClient
KurtYoung commented on a change in pull request #10454: [FLINK-13195][sql-client] Add create(drop) table support for SqlClient URL: https://github.com/apache/flink/pull/10454#discussion_r354684829 ## File path: flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java ## @@ -198,37 +198,24 @@ public void start() { // nothing to do yet } - /** -* Create a new {@link ExecutionContext} by merging the default environment the the environment in session context. -*/ - private ExecutionContext createExecutionContext(SessionContext sessionContext) { - Environment mergedEnv = Environment.merge(defaultEnvironment, sessionContext.getSessionEnv()); - return createExecutionContext(mergedEnv, sessionContext); - } - - /** -* Create a new {@link ExecutionContext} by using the given environment. -*/ - private ExecutionContext createExecutionContext(Environment environment, SessionContext sessionContext) { - try { - return new ExecutionContext<>( - environment, - sessionContext, - dependencies, - flinkConfig, - clusterClientServiceLoader, - commandLineOptions, - commandLines); - } catch (Throwable t) { - // catch everything such that a configuration does not crash the executor - throw new SqlExecutionException("Could not create execution context.", t); - } + /** Returns ExecutionContext.Builder with given {@link SessionContext} session context. */ + private ExecutionContext.Builder execContextBuilder(SessionContext sessionContext) { Review comment: createExecutionContextBuilder?
[GitHub] [flink] KurtYoung commented on a change in pull request #10454: [FLINK-13195][sql-client] Add create(drop) table support for SqlClient
KurtYoung commented on a change in pull request #10454: [FLINK-13195][sql-client] Add create(drop) table support for SqlClient URL: https://github.com/apache/flink/pull/10454#discussion_r354683869 ## File path: flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java ## @@ -428,6 +485,12 @@ private void initializeTableEnvironment() { modules.forEach(tableEnv::loadModule); } + if (catalogManager != null) { Review comment: I think you can split the logic into two groups: 1. initializing this with a previous catalog manager, and 2. initializing this without a previous catalog manager, and do an if-else here. Steps 3-6 can be extracted into a new init catalog method.
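The if-else split suggested in the review comment above might look roughly like the following. This is only a sketch under stated assumptions: `CatalogManagerStub`, `initWithPrevious`, `initFresh`, `initCatalogs`, and the `default_catalog` name are hypothetical and are not taken from `ExecutionContext.java`, and the "steps 3-6" mentioned in the comment are represented by a single placeholder method.

```java
import java.util.ArrayList;
import java.util.List;

final class InitSplitSketch {
    /** Hypothetical stand-in for a catalog manager; not Flink's class. */
    static final class CatalogManagerStub {
        final List<String> catalogs = new ArrayList<>();
    }

    /** Records which initialization path ran, so the branching is observable. */
    final List<String> trace = new ArrayList<>();
    final CatalogManagerStub catalogManager;

    InitSplitSketch(CatalogManagerStub previous) {
        // One branch per group named in the review comment.
        if (previous != null) {
            catalogManager = initWithPrevious(previous);
        } else {
            catalogManager = initFresh();
        }
    }

    /** Group 1: reuse the catalog manager from an earlier context. */
    private CatalogManagerStub initWithPrevious(CatalogManagerStub previous) {
        trace.add("reuse");
        return previous;
    }

    /** Group 2: no previous manager, so create one and populate it. */
    private CatalogManagerStub initFresh() {
        trace.add("create");
        CatalogManagerStub fresh = new CatalogManagerStub();
        initCatalogs(fresh);
        return fresh;
    }

    /** Placeholder for the steps that would be extracted into one method. */
    private void initCatalogs(CatalogManagerStub manager) {
        trace.add("initCatalogs");
        manager.catalogs.add("default_catalog"); // hypothetical catalog name
    }

    public static void main(String[] args) {
        System.out.println(new InitSplitSketch(null).trace);
        System.out.println(new InitSplitSketch(new CatalogManagerStub()).trace);
    }
}
```

Keeping each path in its own method means the catalog setup only runs on the path that actually needs it, which seems to be the point of the suggestion.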
[GitHub] [flink] flinkbot edited a comment on issue #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes.
flinkbot edited a comment on issue #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes. URL: https://github.com/apache/flink/pull/9984#issuecomment-545881191 ## CI report: * 802ebf37e3d932169f9826b40df483bb5e9ac064 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/133366796) * f16938ce2fb38ae216def737d14643b94d6083a1 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/133940607) * 20de5cfc7af9a8ba57080d5218fd0293f393a40e : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/133998545) * 56bfbb65802c1d5c48caa625a152070934bb5d79 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/134010321) * 3ea229382fef64b1046673c79ff845d4689c5db4 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/137145261) * 3169988a33e0126e79cd449740c93d3561296ead : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/138017410) * a0dd858b0b91443fc87895a2d32ebfbbc0b9fe4c : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/138730857) * 86b4537979265a0fbecf7c1841ed8fd2f7ebfd86 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/138770610) * d77f83e133b497e42cd85aeaf95e625411274c92 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/139116897) * f2ed0e4b8d37dc3d5b2b770d943e703f4d893da0 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/139256990) * ce01f742aba3a683b98a0ea7da47e678d30c12be : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/139290769) * 7392ac618203c2544ab4a313a7068e03fcc147a7 : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/139453237) * 3b3e23e69bde65f504ea5d46ef7c2d55ba04c0af : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/139459406) * 71c087913517972655181e0119588c886e4243cb : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/139461705) * 9be9f9f2338a2a953a76cc60057554d7d90157f9 : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/139518666) * bb01ae34a8d6087f4aef4a989c0d14d576df601f : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/139523468) * fd9cdf56812f2d0c9d659b69d9fb243d58ef48a3 : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/139555865) * c647669902f574c3ee23adc7efbabc859d93c0e4 : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/139627616) * cc29fb3058998b1489451d53ad04d9e9886f4193 : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/139635222) * 3d315606bd4fdf826ef823631116a189810dab6a : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/139641767) * 37bfdff9aa766deaac10d9c5407cc244d14d1085 : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/139643868) * daa0ff528a8ec3c2dcbd33927099cee2d8faa113 : UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build
[GitHub] [flink] flinkbot edited a comment on issue #10453: [FLINK-14026][python] Manage the resource of Python worker properly
flinkbot edited a comment on issue #10453: [FLINK-14026][python] Manage the resource of Python worker properly URL: https://github.com/apache/flink/pull/10453#issuecomment-562427019 ## CI report: * 3204767d620e3f8469819ef3155a6cc559745860 : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/139641761) * 19e8894ca5fc0f90e02694e0400b4e67adbbb385 : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/139643834) * 3650c724e5c3ffe5734569c5e31bfe049c04a6de : UNKNOWN * 93899035a1541efd6a44b3ff85bad868bb87f903 : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/139645881)
[jira] [Created] (FLINK-15089) Pulsar Catalog
Yijie Shen created FLINK-15089: -- Summary: Pulsar Catalog Key: FLINK-15089 URL: https://issues.apache.org/jira/browse/FLINK-15089 Project: Flink Issue Type: Sub-task Components: Connectors / Common Reporter: Yijie Shen Per the discussion on the mailing list, the Pulsar Catalog implementation is tracked as a single task. The design doc is: [https://docs.google.com/document/d/1LMnABtXn-wQedsmWv8hopvx-B-jbdr8-jHbIiDhdsoE/edit?usp=sharing] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] flinkbot edited a comment on issue #10375: [FLINK-14845][runtime] Introduce data compression to reduce disk and network IO of shuffle.
flinkbot edited a comment on issue #10375: [FLINK-14845][runtime] Introduce data compression to reduce disk and network IO of shuffle. URL: https://github.com/apache/flink/pull/10375#issuecomment-560214201 ## CI report: * af7ab17308bafebf22198881ad59ec4516da43e2 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/138883739) * 02ec2e94d3a782c06b4c46c93445a098887b4e55 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/138891350) * a83e8f46acdc7e4a9e9b1c4f0de83758d4a4e77e : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/138893241) * 1c62e5a4d0f73a0150104a5b1587a674e4369a2b : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/138941812) * fc33c72e50a2db55f9ce029d8af558ecedbcc39f : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/139083473) * 2f538ab20262fbe9981d4851b93e9bee04fc6ad0 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/139147407) * 915e45f5d61583e7578751944ac46766505a8c34 : UNKNOWN * 29936b5ac7e15863c1b109a17ac7cc07dd7cc225 : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/139541810) * 055230e7259e53159af46b11be308e88be935d57 : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/139555800) * 727261f2bd508985133bb86703727cd1eb93363d : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/139562347)
[GitHub] [flink] cshuo commented on a change in pull request #10316: [FLINK-14624][table-blink] Support computed column as rowtime attribute
cshuo commented on a change in pull request #10316: [FLINK-14624][table-blink] Support computed column as rowtime attribute URL: https://github.com/apache/flink/pull/10316#discussion_r354680497 ## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/MiniBatchedWatermarkAssignerOperator.java ## @@ -1,208 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.table.runtime.operators.wmassigners; - -import org.apache.flink.api.common.functions.util.FunctionUtils; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.operators.AbstractStreamOperator; -import org.apache.flink.streaming.api.operators.ChainingStrategy; -import org.apache.flink.streaming.api.operators.OneInputStreamOperator; -import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.streamstatus.StreamStatus; -import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer; -import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback; -import org.apache.flink.table.dataformat.BaseRow; -import org.apache.flink.table.runtime.generated.WatermarkGenerator; - -import static org.apache.flink.util.Preconditions.checkArgument; - -/** - * A stream operator that extracts timestamps from stream elements and - * generates watermarks with specified emit latency. - * - * The difference between this operator and {@link WatermarkAssignerOperator} is that: - * - * This operator has an additional parameter {@link #minibatchInterval} which is - * used to buffer watermarks and emit in an aligned interval with following window operators. - * - */ -public class MiniBatchedWatermarkAssignerOperator - extends AbstractStreamOperator - implements OneInputStreamOperator, ProcessingTimeCallback { - - private static final long serialVersionUID = 1L; - - /** The field index of rowtime attribute. */ - private final int rowtimeFieldIndex; - - /** The watermark generator which generates watermark from the input row. */ - private final WatermarkGenerator watermarkGenerator; - - /** The idle timeout for how long it doesn't receive elements to mark this channel idle. */ - private final long idleTimeout; - - /** The event-time interval for emitting watermarks. 
*/ - private final long minibatchInterval; - - /** Current watermark of this operator, but may not be emitted. */ - private transient long currentWatermark; - - /** The next watermark to be emitted. */ - private transient long nextWatermark; - - /** The processing time when the last record is processed. */ - private transient long lastRecordTime; - - /** Channel status maintainer which is used to inactive channel when channel is idle. */ - private transient StreamStatusMaintainer streamStatusMaintainer; - - /** Flag to prevent duplicate function.close() calls in close() and dispose(). */ - private transient boolean functionsClosed = false; - - public MiniBatchedWatermarkAssignerOperator( - int rowtimeFieldIndex, - WatermarkGenerator watermarkGenerator, - long idleTimeout, - long minibatchInterval) { - checkArgument(minibatchInterval > 0, "The inferred emit latency should be larger than 0"); - this.rowtimeFieldIndex = rowtimeFieldIndex; - this.watermarkGenerator = watermarkGenerator; - this.idleTimeout = idleTimeout; - this.minibatchInterval = minibatchInterval; - this.chainingStrategy = ChainingStrategy.ALWAYS; - } - - @Override - public void open() throws Exception { - super.open(); - - currentWatermark = 0; - nextWatermark = getMiniBatchStart(currentWatermark, minibatchInterval) + minibatchInterval - 1; - - if (idleTimeout > 0) { - this.lastRecordTime = getProce
[GitHub] [flink] cshuo commented on a change in pull request #10316: [FLINK-14624][table-blink] Support computed column as rowtime attribute
cshuo commented on a change in pull request #10316: [FLINK-14624][table-blink] Support computed column as rowtime attribute URL: https://github.com/apache/flink/pull/10316#discussion_r354677891 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/MiniBatchIntervalInferRule.scala ## @@ -120,9 +116,22 @@ class MiniBatchIntervalInferRule extends RelOptRule( } } - private def isTableSourceScan(node: RelNode): Boolean = node match { -case _: StreamExecDataStreamScan | _: StreamExecTableSourceScan => true -case _ => false + private def shouldAppendMiniBatchAssignerNode(node: RelNode): Boolean = { +val mode = node.getTraitSet + .getTrait(MiniBatchIntervalTraitDef.INSTANCE) + .getMiniBatchInterval + .mode +node match { + case _: StreamExecDataStreamScan | _: StreamExecTableSourceScan => +// append minibatch node if the mode is not NONE and reach a source leaf node +mode == MiniBatchMode.RowTime || mode == MiniBatchMode.ProcTime + case _: StreamExecWatermarkAssigner => Review comment: IIUC, when the SQL job contains only a group aggregation with a **redundant watermark** defined, minibatch will not work even if the user sets `table.exec.mini-batch.enabled = true`, which confuses users because it is invisible to them. I'm wondering whether some checks could be added to alert the user?
[GitHub] [flink] JingsongLi commented on a change in pull request #10452: [FLINK-15035][table-planner-blink] Introduce unknown memory setting to table in blink planner
JingsongLi commented on a change in pull request #10452: [FLINK-15035][table-planner-blink] Introduce unknown memory setting to table in blink planner URL: https://github.com/apache/flink/pull/10452#discussion_r354681473 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/exec/BatchExecNode.scala ## @@ -32,4 +33,13 @@ trait BatchExecNode[T] extends ExecNode[BatchPlanner, T] with Logging { */ def getDamBehavior: DamBehavior + def setManagedMemoryWeight[X]( + transformation: Transformation[X], memoryBytes: Long): Transformation[X] = { +// Using Bytes can easily overflow +// Using MebiBytes to cast to int +// Careful about zero +val memoryMB = Math.max(1, (memoryBytes >> 20).toInt) Review comment: Yeah, it's a bit hacky; it looks like we wrote this hacky code to work around the runtime default value. Our current `ManagedMemoryWeight` settings are very large, so I think the default of 1 is very small and is not a serious problem. I would prefer to just check in `ExecutorUtils#setBatchProperties` and log a warning when the weight is the default 1.
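The three code comments in the diff above (bytes can overflow, use mebibytes before casting to int, be careful about zero) can be illustrated with a small stand-alone sketch. It mirrors the Scala expression `Math.max(1, (memoryBytes >> 20).toInt)` in Java; the class and method names here are hypothetical and are not part of Flink.

```java
final class ManagedMemoryWeightSketch {
    /** Converts a byte count to a weight in MiB, never returning less than 1. */
    static int toWeightMiB(long memoryBytes) {
        // Shifting right by 20 divides by 2^20 (one MiB) before the narrowing cast,
        // so byte counts far beyond Integer.MAX_VALUE still fit in an int.
        int mebibytes = (int) (memoryBytes >> 20);
        // Anything under one MiB shifts down to 0; clamp it to a minimum weight of 1.
        return Math.max(1, mebibytes);
    }

    public static void main(String[] args) {
        long threeGiB = 3L << 30;
        // Casting the raw byte count to int does not preserve the value.
        System.out.println((int) threeGiB == threeGiB); // false
        System.out.println(toWeightMiB(threeGiB));      // 3072
        System.out.println(toWeightMiB(0L));            // 1
    }
}
```

Clamping to 1 is what makes a computed weight indistinguishable from a small default value, which is presumably why the comment proposes checking for a weight of 1 and logging a warning.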
[GitHub] [flink] flinkbot edited a comment on issue #10449: [FLINK-7289][state-backend] Integrate RocksDB memory use with managed memory
flinkbot edited a comment on issue #10449: [FLINK-7289][state-backend] Integrate RocksDB memory use with managed memory URL: https://github.com/apache/flink/pull/10449#issuecomment-562311621 ## CI report: * 354f5dcf865b2de3b8b426ae40f6633ff1755e4a : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/13956)
[GitHub] [flink] flinkbot edited a comment on issue #10453: [FLINK-14026][python] Manage the resource of Python worker properly
flinkbot edited a comment on issue #10453: [FLINK-14026][python] Manage the resource of Python worker properly URL: https://github.com/apache/flink/pull/10453#issuecomment-562427019 ## CI report: * 3204767d620e3f8469819ef3155a6cc559745860 : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/139641761) * 19e8894ca5fc0f90e02694e0400b4e67adbbb385 : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/139643834) * 3650c724e5c3ffe5734569c5e31bfe049c04a6de : UNKNOWN * 93899035a1541efd6a44b3ff85bad868bb87f903 : UNKNOWN
[GitHub] [flink] flinkbot edited a comment on issue #10445: [FLINK-14407][yarn] Retry S3 tests on failure
flinkbot edited a comment on issue #10445: [FLINK-14407][yarn] Retry S3 tests on failure URL: https://github.com/apache/flink/pull/10445#issuecomment-562177526 ## CI report: * 9709c3ce629fb1ee84c180a92b9077dc56a18ab0 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/139548882)
[GitHub] [flink] TisonKun edited a comment on issue #10369: [FLINK-14893][flink-core]Try child classLoader when parent classLoader could not load in parentFirstPatterns
TisonKun edited a comment on issue #10369: [FLINK-14893][flink-core]Try child classLoader when parent classLoader could not load in parentFirstPatterns URL: https://github.com/apache/flink/pull/10369#issuecomment-562445489 > > @wangxlong thanks for your contribution. Changes look good to me. It would be better if you can write a test for the specific case and I think it is somewhat necessary :-) > > Hi @TisonKun . I am sorry for replying late. I found ClassLoader#loadClass will invoke findClass when loadClass failed. So it is not necessary to invoke findClass by ourselves. What do you think of it? OK, I think that is the reason why we tend to add a test to bisect the behavior :-) IIUC you mean that with or without this patch, classloading always works as expected, right? If so, it would be better to just close this ticket.
[GitHub] [flink] TisonKun commented on issue #10369: [FLINK-14893][flink-core]Try child classLoader when parent classLoader could not load in parentFirstPatterns
TisonKun commented on issue #10369: [FLINK-14893][flink-core]Try child classLoader when parent classLoader could not load in parentFirstPatterns URL: https://github.com/apache/flink/pull/10369#issuecomment-562445489 > > @wangxlong thanks for your contribution. Changes look good to me. It would be better if you can write a test for the specific case and I think it is somewhat necessary :-) > > Hi @TisonKun . I am sorry for replying late. I found ClassLoader#loadClass will invoke findClass when loadClass failed. So it is not necessary to invoke findClass by ourselves. What do you think of it? OK, I think that is the reason why we tend to add a test to bisect the behavior :-) IIUC you mean that with or without this patch, classloading always works as expected, right?
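The behavior discussed in this thread, that the default `ClassLoader#loadClass` falls back to `findClass` once parent delegation fails, can be checked with a short stand-alone sketch. It uses only the JDK's `java.lang.ClassLoader`; `RecordingLoader`, `findClassReached`, and the class name `com.example.DoesNotExist` are hypothetical names chosen for illustration, and the sketch says nothing about how Flink's own class loaders override `loadClass`.

```java
final class LoadClassFallbackSketch {
    /** Hypothetical loader that records the name passed to findClass. */
    static final class RecordingLoader extends ClassLoader {
        String lastFindClassName;

        RecordingLoader(ClassLoader parent) {
            super(parent);
        }

        @Override
        protected Class<?> findClass(String name) throws ClassNotFoundException {
            lastFindClassName = name;
            // This loader has no classes of its own, so every lookup fails here.
            throw new ClassNotFoundException(name);
        }
    }

    /** Returns true if loadClass ended up calling findClass for the given name. */
    static boolean findClassReached(String className) {
        RecordingLoader loader =
                new RecordingLoader(LoadClassFallbackSketch.class.getClassLoader());
        try {
            loader.loadClass(className);
        } catch (ClassNotFoundException expected) {
            // Expected for names that neither the parent nor this loader can resolve.
        }
        return className.equals(loader.lastFindClassName);
    }

    public static void main(String[] args) {
        // The parent resolves java.lang.String, so findClass is never consulted.
        System.out.println(findClassReached("java.lang.String"));         // false
        // The parent cannot resolve this name, so loadClass falls back to findClass.
        System.out.println(findClassReached("com.example.DoesNotExist")); // true
    }
}
```

A test along these lines is one way to pin down whether an explicit `findClass` call adds anything on top of the default delegation.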
[GitHub] [flink] xintongsong edited a comment on issue #10146: [FLINK-14188][runtime] TaskExecutor derive and register with default slot resource profile
xintongsong edited a comment on issue #10146: [FLINK-14188][runtime] TaskExecutor derive and register with default slot resource profile URL: https://github.com/apache/flink/pull/10146#issuecomment-562425217 PR updated. As discussed with Andrey and Till offline, we removed the changes that introduce the default slot fraction, to avoid public API changes that will not actually be used in the upcoming 1.10 release. Travis passed: https://travis-ci.org/xintongsong/flink/builds/621435726